Compare commits

...

1 Commits

Author SHA1 Message Date
pakrym-oai
8fb9f9f1dd Add abort support to TypeScript SDK runs 2025-10-23 21:01:19 -07:00
6 changed files with 200 additions and 15 deletions

View File

@@ -50,6 +50,27 @@ for await (const event of events) {
}
```
### Aborting a turn
Both `run()` and `runStreamed()` accept an `AbortSignal` via `TurnOptions.signal`. Attach a signal from an
`AbortController` to stop an in-flight turn.
```typescript
const controller = new AbortController();
const turnPromise = thread.run("Perform a health check", { signal: controller.signal });
setTimeout(() => controller.abort(), 1000);
try {
await turnPromise;
} catch (error) {
if ((error as Error).name === "AbortError") {
console.log("Turn cancelled");
}
}
```
### Structured output
The Codex agent can produce a JSON response that conforms to a specified schema. The schema can be provided for each turn as a plain JSON object.

View File

@@ -22,6 +22,7 @@ export type CodexExecArgs = {
skipGitRepoCheck?: boolean;
// --output-schema
outputSchemaFile?: string;
signal?: AbortSignal;
};
const INTERNAL_ORIGINATOR_ENV = "CODEX_INTERNAL_ORIGINATOR_OVERRIDE";
@@ -34,6 +35,9 @@ export class CodexExec {
}
async *run(args: CodexExecArgs): AsyncGenerator<string> {
if (args.signal?.aborted) {
throw createAbortError(args.signal);
}
const commandArgs: string[] = ["exec", "--experimental-json"];
if (args.model) {
@@ -98,6 +102,23 @@ export class CodexExec {
throw new Error("Child process has no stdout");
}
const stderrChunks: Buffer[] = [];
let abortError: unknown | null = null;
const exitPromise: Promise<void> = new Promise((resolve, reject) => {
child.once("exit", (code, signal) => {
if (abortError) {
reject(abortError);
return;
}
if (code === 0) {
resolve();
return;
}
const stderrBuffer = Buffer.concat(stderrChunks);
const exitDescription = signal ? `signal ${signal}` : `code ${code}`;
reject(new Error(`Codex Exec exited with ${exitDescription}: ${stderrBuffer.toString("utf8")}`));
});
});
if (child.stderr) {
child.stderr.on("data", (data) => {
@@ -110,28 +131,34 @@ export class CodexExec {
crlfDelay: Infinity,
});
const abortHandler = () => {
abortError = createAbortError(args.signal);
try {
rl.close();
} catch {
// ignore
}
try {
if (!child.killed) child.kill();
} catch {
// ignore
}
};
if (args.signal) {
args.signal.addEventListener("abort", abortHandler, { once: true });
}
try {
for await (const line of rl) {
// `line` is a string (Node sets default encoding to utf8 for readline)
yield line as string;
}
const exitCode = new Promise((resolve, reject) => {
child.once("exit", (code) => {
if (code === 0) {
resolve(code);
} else {
const stderrBuffer = Buffer.concat(stderrChunks);
reject(
new Error(`Codex Exec exited with code ${code}: ${stderrBuffer.toString("utf8")}`),
);
}
});
});
if (spawnError) throw spawnError;
await exitCode;
await exitPromise;
} finally {
if (args.signal) {
args.signal.removeEventListener("abort", abortHandler);
}
rl.close();
child.removeAllListeners();
try {
@@ -143,6 +170,15 @@ export class CodexExec {
}
}
function createAbortError(signal?: AbortSignal): unknown {
if (signal?.reason !== undefined) {
return signal.reason;
}
const error = new Error("Codex turn aborted");
error.name = "AbortError";
return error;
}
const scriptFileName = fileURLToPath(import.meta.url);
const scriptDirName = path.dirname(scriptFileName);

View File

@@ -85,6 +85,7 @@ export class Thread {
workingDirectory: options?.workingDirectory,
skipGitRepoCheck: options?.skipGitRepoCheck,
outputSchemaFile: schemaPath,
signal: turnOptions.signal,
});
try {
for await (const item of generator) {

View File

@@ -1,4 +1,6 @@
export type TurnOptions = {
/** JSON schema describing the expected agent output. */
outputSchema?: unknown;
/** Abort signal that cancels an in-flight turn. */
signal?: AbortSignal;
};

View File

@@ -0,0 +1,67 @@
import { describe, expect, it } from "@jest/globals";
import type { CodexExec, CodexExecArgs } from "../src/exec";
import type { CodexOptions } from "../src/codexOptions";
import { Thread } from "../src/thread";
import type { ThreadOptions } from "../src/threadOptions";
class AbortAwareExec {
public calls: CodexExecArgs[] = [];
async *run(args: CodexExecArgs): AsyncGenerator<string> {
this.calls.push(args);
yield JSON.stringify({ type: "thread.started", thread_id: "thread_123" });
yield JSON.stringify({ type: "turn.started" });
await waitForAbort(args.signal);
}
}
function waitForAbort(signal?: AbortSignal): Promise<never> {
return new Promise((_, reject) => {
if (!signal) {
reject(new Error("Missing abort signal"));
return;
}
signal.addEventListener(
"abort",
() => {
reject(signal.reason ?? new Error("aborted"));
},
{ once: true },
);
});
}
describe("Thread turn cancellation", () => {
const codexOptions = {} as CodexOptions;
const threadOptions = {} as ThreadOptions;
it("rejects run when the abort signal is triggered", async () => {
const exec = new AbortAwareExec();
const thread = new Thread(exec as unknown as CodexExec, codexOptions, threadOptions);
const controller = new AbortController();
const abortError = new Error("stop");
const promise = thread.run("Hello", { signal: controller.signal });
controller.abort(abortError);
await expect(promise).rejects.toBe(abortError);
expect(exec.calls[0]?.signal).toBe(controller.signal);
});
it("rejects runStreamed iterators when the abort signal is triggered", async () => {
const exec = new AbortAwareExec();
const thread = new Thread(exec as unknown as CodexExec, codexOptions, threadOptions);
const controller = new AbortController();
const abortError = new Error("stop");
const { events } = await thread.runStreamed("Hello", { signal: controller.signal });
await events.next();
await events.next();
const pending = events.next();
controller.abort(abortError);
await expect(pending).rejects.toBe(abortError);
expect(exec.calls[0]?.signal).toBe(controller.signal);
});
});

View File

@@ -0,0 +1,58 @@
import { beforeEach, describe, expect, it } from "@jest/globals";
import type { ChildProcess } from "node:child_process";
import * as child_process from "node:child_process";
import { EventEmitter } from "node:events";
import { PassThrough } from "node:stream";
import { CodexExec } from "../src/exec";
jest.mock("node:child_process", () => {
const actual = jest.requireActual<typeof import("node:child_process")>("node:child_process");
return { ...actual, spawn: jest.fn() };
});
const actualChildProcess =
jest.requireActual<typeof import("node:child_process")>("node:child_process");
const spawnMock = child_process.spawn as jest.MockedFunction<typeof actualChildProcess.spawn>;
class FakeChildProcess extends EventEmitter {
public stdin: PassThrough | null = new PassThrough();
public stdout: PassThrough | null = new PassThrough();
public stderr: PassThrough | null = new PassThrough();
public killed = false;
public kill = jest.fn((_signal?: NodeJS.Signals | number) => {
this.killed = true;
return true;
});
}
describe("CodexExec", () => {
beforeEach(() => {
spawnMock.mockReset();
});
it("rejects pending iterations when aborted", async () => {
const fakeChild = new FakeChildProcess();
spawnMock.mockReturnValue(fakeChild as unknown as ChildProcess);
const exec = new CodexExec("/fake/path");
const controller = new AbortController();
const generator = exec.run({ input: "hello", signal: controller.signal });
const first = generator.next();
fakeChild.stdout?.write("{\"type\":\"turn.started\"}\n");
await expect(first).resolves.toEqual({
value: "{\"type\":\"turn.started\"}",
done: false,
});
const pending = generator.next();
const abortError = new Error("stop");
controller.abort(abortError);
fakeChild.stdout?.end();
fakeChild.emit("exit", null, "SIGTERM");
await expect(pending).rejects.toBe(abortError);
expect(fakeChild.kill).toHaveBeenCalled();
});
});