Compare commits

...

1 Commits

Author SHA1 Message Date
Charles Cunningham
08a1f30ca2 Add plan collaboration MVP to SDK and exec 2026-02-19 20:49:36 -08:00
15 changed files with 549 additions and 64 deletions

View File

@@ -26,6 +26,10 @@ pub struct Cli {
#[arg(long, short = 'm', global = true)]
pub model: Option<String>,
/// Collaboration mode to run for this turn.
#[arg(long = "collaboration-mode", value_enum, global = true)]
pub collaboration_mode: Option<CollaborationModeCliArg>,
/// Use open-source provider.
#[arg(long = "oss", default_value_t = false)]
pub oss: bool,
@@ -249,6 +253,13 @@ pub enum Color {
Auto,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
#[value(rename_all = "kebab-case")]
pub enum CollaborationModeCliArg {
Default,
Plan,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -25,7 +25,10 @@ use crate::exec_events::McpToolCallItemResult;
use crate::exec_events::McpToolCallStatus;
use crate::exec_events::PatchApplyStatus;
use crate::exec_events::PatchChangeKind;
use crate::exec_events::PlanDeltaEvent;
use crate::exec_events::PlanItem;
use crate::exec_events::ReasoningItem;
use crate::exec_events::RequestUserInputEvent;
use crate::exec_events::ThreadErrorEvent;
use crate::exec_events::ThreadEvent;
use crate::exec_events::ThreadItem;
@@ -121,12 +124,30 @@ impl EventProcessorWithJsonOutput {
protocol::EventMsg::SessionConfigured(ev) => self.handle_session_configured(ev),
protocol::EventMsg::ThreadNameUpdated(_) => Vec::new(),
protocol::EventMsg::AgentMessage(ev) => self.handle_agent_message(ev),
protocol::EventMsg::ItemStarted(protocol::ItemStartedEvent {
item: codex_protocol::items::TurnItem::Plan(item),
..
}) => {
let item = ThreadItem {
id: item.id.clone(),
details: ThreadItemDetails::Plan(PlanItem {
text: item.text.clone(),
}),
};
vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })]
}
protocol::EventMsg::ItemCompleted(protocol::ItemCompletedEvent {
item: codex_protocol::items::TurnItem::Plan(item),
..
}) => {
self.last_proposed_plan = Some(item.text.clone());
Vec::new()
let item = ThreadItem {
id: item.id.clone(),
details: ThreadItemDetails::Plan(PlanItem {
text: item.text.clone(),
}),
};
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
protocol::EventMsg::AgentReasoning(ev) => self.handle_reasoning_event(ev),
protocol::EventMsg::ExecCommandBegin(ev) => self.handle_exec_command_begin(ev),
@@ -161,6 +182,17 @@ impl EventProcessorWithJsonOutput {
}
protocol::EventMsg::TurnStarted(ev) => self.handle_task_started(ev),
protocol::EventMsg::TurnComplete(_) => self.handle_task_complete(),
protocol::EventMsg::PlanDelta(ev) => vec![ThreadEvent::PlanDelta(PlanDeltaEvent {
item_id: ev.item_id.clone(),
delta: ev.delta.clone(),
})],
protocol::EventMsg::RequestUserInput(ev) => {
vec![ThreadEvent::RequestUserInput(RequestUserInputEvent {
id: ev.turn_id.clone(),
call_id: ev.call_id.clone(),
questions: ev.questions.clone(),
})]
}
protocol::EventMsg::Error(ev) => {
let error = ThreadErrorEvent {
message: ev.message.clone(),

View File

@@ -1,4 +1,5 @@
use codex_protocol::models::WebSearchAction;
use codex_protocol::request_user_input::RequestUserInputQuestion;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
@@ -31,6 +32,12 @@ pub enum ThreadEvent {
/// Signals that an item has reached a terminal state—either success or failure.
#[serde(rename = "item.completed")]
ItemCompleted(ItemCompletedEvent),
/// Emitted when a plan item streams text deltas.
#[serde(rename = "item.plan.delta")]
PlanDelta(PlanDeltaEvent),
/// Emitted when the model requests input via the request_user_input tool.
#[serde(rename = "request_user_input")]
RequestUserInput(RequestUserInputEvent),
/// Represents an unrecoverable error emitted directly by the event stream.
#[serde(rename = "error")]
Error(ThreadErrorEvent),
@@ -82,6 +89,21 @@ pub struct ItemUpdatedEvent {
pub item: ThreadItem,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
pub struct PlanDeltaEvent {
pub item_id: String,
pub delta: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
pub struct RequestUserInputEvent {
/// Turn id for the in-flight request.
pub id: String,
/// Tool call id from the model stream.
pub call_id: String,
pub questions: Vec<RequestUserInputQuestion>,
}
/// Fatal error emitted by the stream.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
pub struct ThreadErrorEvent {
@@ -103,6 +125,8 @@ pub enum ThreadItemDetails {
/// Response from the agent.
/// Either a natural-language response or a JSON string when structured output is requested.
AgentMessage(AgentMessageItem),
/// Proposed implementation plan emitted in plan mode.
Plan(PlanItem),
/// Agent's reasoning summary.
Reasoning(ReasoningItem),
/// Tracks a command executed by the agent. The item starts when the command is
@@ -134,6 +158,11 @@ pub struct AgentMessageItem {
pub text: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
pub struct PlanItem {
pub text: String,
}
/// Agent's reasoning summary.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
pub struct ReasoningItem {

View File

@@ -11,6 +11,7 @@ pub mod event_processor_with_jsonl_output;
pub mod exec_events;
pub use cli::Cli;
pub use cli::CollaborationModeCliArg;
pub use cli::Command;
pub use cli::ReviewArgs;
use codex_cloud_requirements::cloud_requirements_loader;
@@ -40,7 +41,11 @@ use codex_core::protocol::ReviewRequest;
use codex_core::protocol::ReviewTarget;
use codex_core::protocol::SessionSource;
use codex_protocol::approvals::ElicitationAction;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::config_types::Settings;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::user_input::UserInput;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_oss::ensure_oss_provider_ready;
@@ -54,6 +59,7 @@ use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use supports_color::Stream;
use tokio::io::AsyncBufReadExt;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::error;
@@ -88,6 +94,15 @@ struct ThreadEventEnvelope {
event: Event,
}
#[derive(Debug, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ControlMessage {
UserInputAnswer {
id: String,
response: RequestUserInputResponse,
},
}
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
if let Err(err) = set_default_originator("codex_exec".to_string()) {
tracing::warn!(?err, "Failed to set codex exec originator override {err:?}");
@@ -97,6 +112,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
command,
images,
model: model_cli_arg,
collaboration_mode: collaboration_mode_cli_arg,
oss,
oss_provider,
config_profile,
@@ -379,6 +395,20 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
.get_models_manager()
.get_default_model(&config.model, RefreshStrategy::OnlineIfUncached)
.await;
let default_collaboration_mode = collaboration_mode_cli_arg.map(|mode| {
let collaboration_mode = CollaborationMode {
mode: mode.into(),
settings: Settings {
model: default_model.clone(),
reasoning_effort: default_effort,
developer_instructions: None,
},
};
normalize_collaboration_mode_with_preset(
collaboration_mode,
thread_manager.list_collaboration_modes(),
)
});
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewThread {
@@ -479,6 +509,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
});
}
let mut control_lines = if json_mode {
Some(tokio::io::BufReader::new(tokio::io::stdin()).lines())
} else {
None
};
{
let thread_manager = Arc::clone(&thread_manager);
let attached_threads = Arc::clone(&attached_threads);
@@ -525,7 +561,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
effort: default_effort,
summary: default_summary,
final_output_json_schema: output_schema,
collaboration_mode: None,
collaboration_mode: default_collaboration_mode.clone(),
personality: None,
})
.await?;
@@ -568,6 +604,31 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
})
.await?;
}
if !json_mode && let EventMsg::RequestUserInput(ev) = &event.msg {
// Non-JSON exec has no interactive request_user_input UI. Return an empty response
// so the turn can continue instead of hanging indefinitely.
thread
.submit(Op::UserInputAnswer {
id: ev.turn_id.clone(),
response: RequestUserInputResponse {
answers: Default::default(),
},
})
.await?;
}
if json_mode && let EventMsg::RequestUserInput(ev) = &event.msg {
let response = read_user_input_answer(&mut control_lines, &ev.turn_id)
.await
.unwrap_or_else(|| RequestUserInputResponse {
answers: Default::default(),
});
thread
.submit(Op::UserInputAnswer {
id: ev.turn_id.clone(),
response,
})
.await?;
}
if let EventMsg::McpStartupUpdate(update) = &event.msg
&& required_mcp_servers.contains(&update.server)
&& let codex_core::protocol::McpStartupStatus::Failed { error } = &update.status
@@ -645,6 +706,69 @@ fn spawn_thread_listener(
});
}
type ControlLines = tokio::io::Lines<tokio::io::BufReader<tokio::io::Stdin>>;
async fn read_user_input_answer(
control_lines: &mut Option<ControlLines>,
expected_id: &str,
) -> Option<RequestUserInputResponse> {
let Some(control_lines) = control_lines.as_mut() else {
return None;
};
loop {
match control_lines.next_line().await {
Ok(Some(line)) => {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
match serde_json::from_str::<ControlMessage>(trimmed) {
Ok(ControlMessage::UserInputAnswer { id, response }) => {
if id == expected_id {
return Some(response);
}
warn!("ignoring user_input_answer for id {id}; expected {expected_id}");
}
Err(err) => {
warn!("invalid control message: {err}");
}
}
}
Ok(None) => return None,
Err(err) => {
warn!("failed reading control stdin: {err}");
return None;
}
}
}
}
fn normalize_collaboration_mode_with_preset(
mut collaboration_mode: CollaborationMode,
presets: Vec<codex_protocol::config_types::CollaborationModeMask>,
) -> CollaborationMode {
if collaboration_mode.settings.developer_instructions.is_none()
&& let Some(instructions) = presets
.into_iter()
.find(|preset| preset.mode == Some(collaboration_mode.mode))
.and_then(|preset| preset.developer_instructions.flatten())
.filter(|instructions| !instructions.is_empty())
{
collaboration_mode.settings.developer_instructions = Some(instructions);
}
collaboration_mode
}
impl From<CollaborationModeCliArg> for ModeKind {
fn from(value: CollaborationModeCliArg) -> Self {
match value {
CollaborationModeCliArg::Default => ModeKind::Default,
CollaborationModeCliArg::Plan => ModeKind::Plan,
}
}
}
async fn resolve_resume_path(
config: &Config,
args: &crate::cli::ResumeArgs,

View File

@@ -43,7 +43,10 @@ use codex_exec::exec_events::McpToolCallItemResult;
use codex_exec::exec_events::McpToolCallStatus;
use codex_exec::exec_events::PatchApplyStatus;
use codex_exec::exec_events::PatchChangeKind;
use codex_exec::exec_events::PlanDeltaEvent as ExecPlanDeltaEvent;
use codex_exec::exec_events::PlanItem as ExecPlanItem;
use codex_exec::exec_events::ReasoningItem;
use codex_exec::exec_events::RequestUserInputEvent as ExecRequestUserInputEvent;
use codex_exec::exec_events::ThreadErrorEvent;
use codex_exec::exec_events::ThreadEvent;
use codex_exec::exec_events::ThreadItem;
@@ -66,6 +69,8 @@ use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ExecCommandOutputDeltaEvent;
use codex_protocol::protocol::ExecOutputStream;
use codex_protocol::request_user_input::RequestUserInputQuestion;
use codex_protocol::request_user_input::RequestUserInputQuestionOption;
use pretty_assertions::assert_eq;
use rmcp::model::Content;
use serde_json::json;
@@ -345,6 +350,114 @@ fn plan_update_emits_todo_list_started_updated_and_completed() {
);
}
#[test]
fn plan_item_and_plan_delta_emit_json_events() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let out_started = ep.collect_thread_events(&event(
"plan-started",
EventMsg::ItemStarted(codex_core::protocol::ItemStartedEvent {
thread_id: ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap(),
turn_id: "turn-1".to_string(),
item: codex_protocol::items::TurnItem::Plan(codex_protocol::items::PlanItem {
id: "turn-1-plan".to_string(),
text: String::new(),
}),
}),
));
assert_eq!(
out_started,
vec![ThreadEvent::ItemStarted(ItemStartedEvent {
item: ThreadItem {
id: "turn-1-plan".to_string(),
details: ThreadItemDetails::Plan(ExecPlanItem {
text: String::new(),
}),
},
})]
);
let out_delta = ep.collect_thread_events(&event(
"plan-delta",
EventMsg::PlanDelta(codex_core::protocol::PlanDeltaEvent {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "turn-1-plan".to_string(),
delta: "- step 1\n".to_string(),
}),
));
assert_eq!(
out_delta,
vec![ThreadEvent::PlanDelta(ExecPlanDeltaEvent {
item_id: "turn-1-plan".to_string(),
delta: "- step 1\n".to_string(),
})]
);
let out_completed = ep.collect_thread_events(&event(
"plan-completed",
EventMsg::ItemCompleted(codex_core::protocol::ItemCompletedEvent {
thread_id: ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap(),
turn_id: "turn-1".to_string(),
item: codex_protocol::items::TurnItem::Plan(codex_protocol::items::PlanItem {
id: "turn-1-plan".to_string(),
text: "# Plan\n- step 1".to_string(),
}),
}),
));
assert_eq!(
out_completed,
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
item: ThreadItem {
id: "turn-1-plan".to_string(),
details: ThreadItemDetails::Plan(ExecPlanItem {
text: "# Plan\n- step 1".to_string(),
}),
},
})]
);
}
#[test]
fn request_user_input_event_emits_json_event() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let out = ep.collect_thread_events(&event(
"rui-1",
EventMsg::RequestUserInput(codex_protocol::request_user_input::RequestUserInputEvent {
call_id: "call-123".to_string(),
turn_id: "turn-123".to_string(),
questions: vec![RequestUserInputQuestion {
id: "decision".to_string(),
header: "Decision".to_string(),
question: "Choose one".to_string(),
is_other: true,
is_secret: false,
options: Some(vec![RequestUserInputQuestionOption {
label: "A".to_string(),
description: "First".to_string(),
}]),
}],
}),
));
assert_eq!(
out,
vec![ThreadEvent::RequestUserInput(ExecRequestUserInputEvent {
id: "turn-123".to_string(),
call_id: "call-123".to_string(),
questions: vec![RequestUserInputQuestion {
id: "decision".to_string(),
header: "Decision".to_string(),
question: "Choose one".to_string(),
is_other: true,
is_secret: false,
options: Some(vec![RequestUserInputQuestionOption {
label: "A".to_string(),
description: "First".to_string(),
}]),
}],
})]
);
}
#[test]
fn mcp_tool_call_begin_and_end_emit_item_events() {
let mut ep = EventProcessorWithJsonOutput::new(None);

View File

@@ -57,6 +57,44 @@ export type ItemCompletedEvent = {
item: ThreadItem;
};
/** A single option for a request_user_input question. */
export type RequestUserInputQuestionOption = {
label: string;
description: string;
};
/** One user-facing prompt emitted by the request_user_input tool. */
export type RequestUserInputQuestion = {
id: string;
header: string;
question: string;
isOther: boolean;
isSecret: boolean;
options?: RequestUserInputQuestionOption[];
};
/** Response shape for answering a request_user_input prompt. */
export type RequestUserInputResponse = {
answers: Record<string, { answers: string[] }>;
};
/** Emitted when the agent invokes the request_user_input tool and awaits an answer. */
export type RequestUserInputEvent = {
type: "request_user_input";
/** Turn id for the in-flight request. */
id: string;
/** Tool call id from the model stream. */
call_id: string;
questions: RequestUserInputQuestion[];
};
/** Emitted when a plan item streams partial text updates. */
export type PlanDeltaEvent = {
type: "item.plan.delta";
item_id: string;
delta: string;
};
/** Fatal error emitted by the stream. */
export type ThreadError = {
message: string;
@@ -77,4 +115,6 @@ export type ThreadEvent =
| ItemStartedEvent
| ItemUpdatedEvent
| ItemCompletedEvent
| RequestUserInputEvent
| PlanDeltaEvent
| ThreadErrorEvent;

View File

@@ -4,7 +4,14 @@ import readline from "node:readline";
import { createRequire } from "node:module";
import type { CodexConfigObject, CodexConfigValue } from "./codexOptions";
import { SandboxMode, ModelReasoningEffort, ApprovalMode, WebSearchMode } from "./threadOptions";
import {
SandboxMode,
ModelReasoningEffort,
ApprovalMode,
WebSearchMode,
CollaborationMode,
} from "./threadOptions";
import type { RequestUserInputResponse } from "./events";
export type CodexExecArgs = {
input: string;
@@ -37,6 +44,18 @@ export type CodexExecArgs = {
webSearchEnabled?: boolean;
// --config approval_policy
approvalPolicy?: ApprovalMode;
// --collaboration-mode
collaborationMode?: CollaborationMode;
};
type CodexControlMessage = {
type: "user_input_answer";
id: string;
response: RequestUserInputResponse;
};
export type CodexExecRun = AsyncIterable<string> & {
sendControl: (message: CodexControlMessage) => void;
};
const INTERNAL_ORIGINATOR_ENV = "CODEX_INTERNAL_ORIGINATOR_OVERRIDE";
@@ -69,7 +88,7 @@ export class CodexExec {
this.configOverrides = configOverrides;
}
async *run(args: CodexExecArgs): AsyncGenerator<string> {
run(args: CodexExecArgs): CodexExecRun {
const commandArgs: string[] = ["exec", "--experimental-json"];
if (this.configOverrides) {
@@ -127,6 +146,10 @@ export class CodexExec {
commandArgs.push("--config", `approval_policy="${args.approvalPolicy}"`);
}
if (args.collaborationMode) {
commandArgs.push("--collaboration-mode", args.collaborationMode);
}
if (args.threadId) {
commandArgs.push("resume", args.threadId);
}
@@ -137,6 +160,9 @@ export class CodexExec {
}
}
// Keep stdin available for control messages while passing the initial prompt as a positional arg.
commandArgs.push("--", args.input);
const env: Record<string, string> = {};
if (this.envOverride) {
Object.assign(env, this.envOverride);
@@ -169,9 +195,6 @@ export class CodexExec {
child.kill();
throw new Error("Child process has no stdin");
}
child.stdin.write(args.input);
child.stdin.end();
if (!child.stdout) {
child.kill();
throw new Error("Child process has no stdout");
@@ -197,28 +220,51 @@ export class CodexExec {
crlfDelay: Infinity,
});
try {
for await (const line of rl) {
// `line` is a string (Node sets default encoding to utf8 for readline)
yield line as string;
}
if (spawnError) throw spawnError;
const { code, signal } = await exitPromise;
if (code !== 0 || signal) {
const stderrBuffer = Buffer.concat(stderrChunks);
const detail = signal ? `signal ${signal}` : `code ${code ?? 1}`;
throw new Error(`Codex Exec exited with ${detail}: ${stderrBuffer.toString("utf8")}`);
}
} finally {
rl.close();
child.removeAllListeners();
const generator = (async function* () {
try {
if (!child.killed) child.kill();
} catch {
// ignore
for await (const line of rl) {
// `line` is a string (Node sets default encoding to utf8 for readline)
yield line as string;
}
if (spawnError) throw spawnError;
const { code, signal } = await exitPromise;
if (code !== 0 || signal) {
const stderrBuffer = Buffer.concat(stderrChunks);
const detail = signal ? `signal ${signal}` : `code ${code ?? 1}`;
throw new Error(`Codex Exec exited with ${detail}: ${stderrBuffer.toString("utf8")}`);
}
} finally {
rl.close();
child.removeAllListeners();
try {
if (child.stdin && !child.stdin.destroyed) {
child.stdin.end();
}
} catch {
// ignore
}
try {
if (!child.killed) child.kill();
} catch {
// ignore
}
}
}
})();
const sendControl = (message: CodexControlMessage): void => {
if (!child.stdin || child.stdin.destroyed || child.killed) {
throw new Error("Codex Exec is not accepting control messages");
}
child.stdin.write(`${JSON.stringify(message)}\n`);
};
return {
sendControl,
[Symbol.asyncIterator](): AsyncGenerator<string> {
return generator;
},
};
}
}

View File

@@ -7,6 +7,11 @@ export type {
ItemStartedEvent,
ItemUpdatedEvent,
ItemCompletedEvent,
RequestUserInputEvent,
RequestUserInputQuestion,
RequestUserInputQuestionOption,
RequestUserInputResponse,
PlanDeltaEvent,
ThreadError,
ThreadErrorEvent,
Usage,
@@ -14,6 +19,7 @@ export type {
export type {
ThreadItem,
AgentMessageItem,
PlanItem,
ReasoningItem,
CommandExecutionItem,
FileChangeItem,
@@ -36,5 +42,6 @@ export type {
SandboxMode,
ModelReasoningEffort,
WebSearchMode,
CollaborationMode,
} from "./threadOptions";
export type { TurnOptions } from "./turnOptions";

View File

@@ -78,6 +78,13 @@ export type AgentMessageItem = {
text: string;
};
/** Proposed implementation plan emitted in plan mode. */
export type PlanItem = {
id: string;
type: "plan";
text: string;
};
/** Agent's reasoning summary. */
export type ReasoningItem = {
id: string;
@@ -118,6 +125,7 @@ export type TodoListItem = {
/** Canonical union of thread items and their type-specific payloads. */
export type ThreadItem =
| AgentMessageItem
| PlanItem
| ReasoningItem
| CommandExecutionItem
| FileChangeItem

View File

@@ -1,5 +1,5 @@
import { CodexOptions } from "./codexOptions";
import { ThreadEvent, ThreadError, Usage } from "./events";
import { RequestUserInputResponse, ThreadEvent, ThreadError, Usage } from "./events";
import { CodexExec } from "./exec";
import { ThreadItem } from "./items";
import { ThreadOptions } from "./threadOptions";
@@ -43,6 +43,8 @@ export class Thread {
private _options: CodexOptions;
private _id: string | null;
private _threadOptions: ThreadOptions;
private _sendUserInputAnswer: ((id: string, response: RequestUserInputResponse) => void) | null =
null;
/** Returns the ID of the thread. Populated after the first turn starts. */
public get id(): string | null {
@@ -67,6 +69,14 @@ export class Thread {
return { events: this.runStreamedInternal(input, turnOptions) };
}
/** Sends an answer for an in-flight request_user_input prompt. */
answerUserInput(id: string, response: RequestUserInputResponse): void {
if (!this._sendUserInputAnswer) {
throw new Error("No active turn is awaiting request_user_input answers");
}
this._sendUserInputAnswer(id, response);
}
private async *runStreamedInternal(
input: Input,
turnOptions: TurnOptions = {},
@@ -74,7 +84,7 @@ export class Thread {
const { schemaPath, cleanup } = await createOutputSchemaFile(turnOptions.outputSchema);
const options = this._threadOptions;
const { prompt, images } = normalizeInput(input);
const generator = this._exec.run({
const run = this._exec.run({
input: prompt,
baseUrl: this._options.baseUrl,
apiKey: this._options.apiKey,
@@ -92,9 +102,17 @@ export class Thread {
webSearchEnabled: options?.webSearchEnabled,
approvalPolicy: options?.approvalPolicy,
additionalDirectories: options?.additionalDirectories,
collaborationMode: options?.collaborationMode,
});
this._sendUserInputAnswer = (id: string, response: RequestUserInputResponse) => {
run.sendControl({
type: "user_input_answer",
id,
response,
});
};
try {
for await (const item of generator) {
for await (const item of run) {
let parsed: ThreadEvent;
try {
parsed = JSON.parse(item) as ThreadEvent;
@@ -107,6 +125,7 @@ export class Thread {
yield parsed;
}
} finally {
this._sendUserInputAnswer = null;
await cleanup();
}
}
@@ -119,9 +138,23 @@ export class Thread {
let usage: Usage | null = null;
let turnFailure: ThreadError | null = null;
for await (const event of generator) {
if (event.type === "item.completed") {
if (event.type === "request_user_input") {
if (!turnOptions.onRequestUserInput) {
throw new Error(
"Turn requested user input but no onRequestUserInput handler was provided",
);
}
const response = await turnOptions.onRequestUserInput({
id: event.id,
call_id: event.call_id,
questions: event.questions,
});
this.answerUserInput(event.id, response);
} else if (event.type === "item.completed") {
if (event.item.type === "agent_message") {
finalResponse = event.item.text;
} else if (event.item.type === "plan" && finalResponse === "") {
finalResponse = event.item.text;
}
items.push(event.item);
} else if (event.type === "turn.completed") {

View File

@@ -6,6 +6,8 @@ export type ModelReasoningEffort = "minimal" | "low" | "medium" | "high" | "xhig
export type WebSearchMode = "disabled" | "cached" | "live";
export type CollaborationMode = "default" | "plan";
export type ThreadOptions = {
model?: string;
sandboxMode?: SandboxMode;
@@ -17,4 +19,5 @@ export type ThreadOptions = {
webSearchEnabled?: boolean;
approvalPolicy?: ApprovalMode;
additionalDirectories?: string[];
collaborationMode?: CollaborationMode;
};

View File

@@ -1,6 +1,15 @@
import type { RequestUserInputEvent, RequestUserInputResponse } from "./events";
export type TurnOptions = {
/** JSON schema describing the expected agent output. */
outputSchema?: unknown;
/** AbortSignal to cancel the turn. */
signal?: AbortSignal;
/**
* Optional handler for request_user_input prompts received during `run()`.
* If omitted and the turn asks for input, `run()` throws.
*/
onRequestUserInput?: (
request: Omit<RequestUserInputEvent, "type">,
) => Promise<RequestUserInputResponse>;
};

View File

@@ -157,7 +157,11 @@ describe("AbortSignal support", () => {
const result = await thread.run("Hello, world!", { signal: controller.signal });
expect(result.finalResponse).toBe("Hi!");
expect(result.items).toHaveLength(1);
expect(result.items).toContainEqual({
id: expect.any(String),
type: "agent_message",
text: "Hi!",
});
} finally {
await close();
}

View File

@@ -32,14 +32,11 @@ describe("Codex", () => {
const thread = client.startThread();
const result = await thread.run("Hello, world!");
const expectedItems = [
{
id: expect.any(String),
type: "agent_message",
text: "Hi!",
},
];
expect(result.items).toEqual(expectedItems);
expect(result.items).toContainEqual({
id: expect.any(String),
type: "agent_message",
text: "Hi!",
});
expect(result.usage).toEqual({
cached_input_tokens: 12,
input_tokens: 42,
@@ -49,7 +46,7 @@ describe("Codex", () => {
} finally {
await close();
}
});
}, 15000);
it("sends previous items when run is called twice", async () => {
const { url, close, requests } = await startResponsesTestProxy({
@@ -410,6 +407,37 @@ describe("Codex", () => {
}
});
it("passes collaborationMode to exec", async () => {
const { url, close } = await startResponsesTestProxy({
statusCode: 200,
responseBodies: [
sse(
responseStarted("response_1"),
assistantMessage("Collaboration mode set", "item_1"),
responseCompleted("response_1"),
),
],
});
const { args: spawnArgs, restore } = codexExecSpy();
try {
const client = new Codex({ codexPathOverride: codexExecPath, baseUrl: url, apiKey: "test" });
const thread = client.startThread({
collaborationMode: "plan",
});
await thread.run("test collaboration mode");
const commandArgs = spawnArgs[0];
expect(commandArgs).toBeDefined();
expectPair(commandArgs, ["--collaboration-mode", "plan"]);
} finally {
restore();
await close();
}
});
it("passes CodexOptions config overrides as TOML --config flags", async () => {
const { url, close } = await startResponsesTestProxy({
statusCode: 200,

View File

@@ -33,31 +33,29 @@ describe("Codex", () => {
events.push(event);
}
expect(events).toEqual([
{
type: "thread.started",
thread_id: expect.any(String),
expect(events).toContainEqual({
type: "thread.started",
thread_id: expect.any(String),
});
expect(events).toContainEqual({
type: "turn.started",
});
expect(events).toContainEqual({
type: "item.completed",
item: {
id: expect.any(String),
type: "agent_message",
text: "Hi!",
},
{
type: "turn.started",
});
expect(events).toContainEqual({
type: "turn.completed",
usage: {
cached_input_tokens: 12,
input_tokens: 42,
output_tokens: 5,
},
{
type: "item.completed",
item: {
id: "item_0",
type: "agent_message",
text: "Hi!",
},
},
{
type: "turn.completed",
usage: {
cached_input_tokens: 12,
input_tokens: 42,
output_tokens: 5,
},
},
]);
});
expect(thread.id).toEqual(expect.any(String));
} finally {
await close();