mirror of
https://github.com/openai/codex.git
synced 2026-03-24 09:06:33 +03:00
Compare commits
1 Commits
starr/exec
...
cc/plan-mo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
08a1f30ca2 |
@@ -26,6 +26,10 @@ pub struct Cli {
|
||||
#[arg(long, short = 'm', global = true)]
|
||||
pub model: Option<String>,
|
||||
|
||||
/// Collaboration mode to run for this turn.
|
||||
#[arg(long = "collaboration-mode", value_enum, global = true)]
|
||||
pub collaboration_mode: Option<CollaborationModeCliArg>,
|
||||
|
||||
/// Use open-source provider.
|
||||
#[arg(long = "oss", default_value_t = false)]
|
||||
pub oss: bool,
|
||||
@@ -249,6 +253,13 @@ pub enum Color {
|
||||
Auto,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
|
||||
#[value(rename_all = "kebab-case")]
|
||||
pub enum CollaborationModeCliArg {
|
||||
Default,
|
||||
Plan,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -25,7 +25,10 @@ use crate::exec_events::McpToolCallItemResult;
|
||||
use crate::exec_events::McpToolCallStatus;
|
||||
use crate::exec_events::PatchApplyStatus;
|
||||
use crate::exec_events::PatchChangeKind;
|
||||
use crate::exec_events::PlanDeltaEvent;
|
||||
use crate::exec_events::PlanItem;
|
||||
use crate::exec_events::ReasoningItem;
|
||||
use crate::exec_events::RequestUserInputEvent;
|
||||
use crate::exec_events::ThreadErrorEvent;
|
||||
use crate::exec_events::ThreadEvent;
|
||||
use crate::exec_events::ThreadItem;
|
||||
@@ -121,12 +124,30 @@ impl EventProcessorWithJsonOutput {
|
||||
protocol::EventMsg::SessionConfigured(ev) => self.handle_session_configured(ev),
|
||||
protocol::EventMsg::ThreadNameUpdated(_) => Vec::new(),
|
||||
protocol::EventMsg::AgentMessage(ev) => self.handle_agent_message(ev),
|
||||
protocol::EventMsg::ItemStarted(protocol::ItemStartedEvent {
|
||||
item: codex_protocol::items::TurnItem::Plan(item),
|
||||
..
|
||||
}) => {
|
||||
let item = ThreadItem {
|
||||
id: item.id.clone(),
|
||||
details: ThreadItemDetails::Plan(PlanItem {
|
||||
text: item.text.clone(),
|
||||
}),
|
||||
};
|
||||
vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })]
|
||||
}
|
||||
protocol::EventMsg::ItemCompleted(protocol::ItemCompletedEvent {
|
||||
item: codex_protocol::items::TurnItem::Plan(item),
|
||||
..
|
||||
}) => {
|
||||
self.last_proposed_plan = Some(item.text.clone());
|
||||
Vec::new()
|
||||
let item = ThreadItem {
|
||||
id: item.id.clone(),
|
||||
details: ThreadItemDetails::Plan(PlanItem {
|
||||
text: item.text.clone(),
|
||||
}),
|
||||
};
|
||||
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
|
||||
}
|
||||
protocol::EventMsg::AgentReasoning(ev) => self.handle_reasoning_event(ev),
|
||||
protocol::EventMsg::ExecCommandBegin(ev) => self.handle_exec_command_begin(ev),
|
||||
@@ -161,6 +182,17 @@ impl EventProcessorWithJsonOutput {
|
||||
}
|
||||
protocol::EventMsg::TurnStarted(ev) => self.handle_task_started(ev),
|
||||
protocol::EventMsg::TurnComplete(_) => self.handle_task_complete(),
|
||||
protocol::EventMsg::PlanDelta(ev) => vec![ThreadEvent::PlanDelta(PlanDeltaEvent {
|
||||
item_id: ev.item_id.clone(),
|
||||
delta: ev.delta.clone(),
|
||||
})],
|
||||
protocol::EventMsg::RequestUserInput(ev) => {
|
||||
vec![ThreadEvent::RequestUserInput(RequestUserInputEvent {
|
||||
id: ev.turn_id.clone(),
|
||||
call_id: ev.call_id.clone(),
|
||||
questions: ev.questions.clone(),
|
||||
})]
|
||||
}
|
||||
protocol::EventMsg::Error(ev) => {
|
||||
let error = ThreadErrorEvent {
|
||||
message: ev.message.clone(),
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use codex_protocol::models::WebSearchAction;
|
||||
use codex_protocol::request_user_input::RequestUserInputQuestion;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
@@ -31,6 +32,12 @@ pub enum ThreadEvent {
|
||||
/// Signals that an item has reached a terminal state—either success or failure.
|
||||
#[serde(rename = "item.completed")]
|
||||
ItemCompleted(ItemCompletedEvent),
|
||||
/// Emitted when a plan item streams text deltas.
|
||||
#[serde(rename = "item.plan.delta")]
|
||||
PlanDelta(PlanDeltaEvent),
|
||||
/// Emitted when the model requests input via the request_user_input tool.
|
||||
#[serde(rename = "request_user_input")]
|
||||
RequestUserInput(RequestUserInputEvent),
|
||||
/// Represents an unrecoverable error emitted directly by the event stream.
|
||||
#[serde(rename = "error")]
|
||||
Error(ThreadErrorEvent),
|
||||
@@ -82,6 +89,21 @@ pub struct ItemUpdatedEvent {
|
||||
pub item: ThreadItem,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
|
||||
pub struct PlanDeltaEvent {
|
||||
pub item_id: String,
|
||||
pub delta: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
|
||||
pub struct RequestUserInputEvent {
|
||||
/// Turn id for the in-flight request.
|
||||
pub id: String,
|
||||
/// Tool call id from the model stream.
|
||||
pub call_id: String,
|
||||
pub questions: Vec<RequestUserInputQuestion>,
|
||||
}
|
||||
|
||||
/// Fatal error emitted by the stream.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
|
||||
pub struct ThreadErrorEvent {
|
||||
@@ -103,6 +125,8 @@ pub enum ThreadItemDetails {
|
||||
/// Response from the agent.
|
||||
/// Either a natural-language response or a JSON string when structured output is requested.
|
||||
AgentMessage(AgentMessageItem),
|
||||
/// Proposed implementation plan emitted in plan mode.
|
||||
Plan(PlanItem),
|
||||
/// Agent's reasoning summary.
|
||||
Reasoning(ReasoningItem),
|
||||
/// Tracks a command executed by the agent. The item starts when the command is
|
||||
@@ -134,6 +158,11 @@ pub struct AgentMessageItem {
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
|
||||
pub struct PlanItem {
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
/// Agent's reasoning summary.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
|
||||
pub struct ReasoningItem {
|
||||
|
||||
@@ -11,6 +11,7 @@ pub mod event_processor_with_jsonl_output;
|
||||
pub mod exec_events;
|
||||
|
||||
pub use cli::Cli;
|
||||
pub use cli::CollaborationModeCliArg;
|
||||
pub use cli::Command;
|
||||
pub use cli::ReviewArgs;
|
||||
use codex_cloud_requirements::cloud_requirements_loader;
|
||||
@@ -40,7 +41,11 @@ use codex_core::protocol::ReviewRequest;
|
||||
use codex_core::protocol::ReviewTarget;
|
||||
use codex_core::protocol::SessionSource;
|
||||
use codex_protocol::approvals::ElicitationAction;
|
||||
use codex_protocol::config_types::CollaborationMode;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use codex_protocol::config_types::Settings;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_oss::ensure_oss_provider_ready;
|
||||
@@ -54,6 +59,7 @@ use std::io::Read;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use supports_color::Stream;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
@@ -88,6 +94,15 @@ struct ThreadEventEnvelope {
|
||||
event: Event,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
enum ControlMessage {
|
||||
UserInputAnswer {
|
||||
id: String,
|
||||
response: RequestUserInputResponse,
|
||||
},
|
||||
}
|
||||
|
||||
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
|
||||
if let Err(err) = set_default_originator("codex_exec".to_string()) {
|
||||
tracing::warn!(?err, "Failed to set codex exec originator override {err:?}");
|
||||
@@ -97,6 +112,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
command,
|
||||
images,
|
||||
model: model_cli_arg,
|
||||
collaboration_mode: collaboration_mode_cli_arg,
|
||||
oss,
|
||||
oss_provider,
|
||||
config_profile,
|
||||
@@ -379,6 +395,20 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
.get_models_manager()
|
||||
.get_default_model(&config.model, RefreshStrategy::OnlineIfUncached)
|
||||
.await;
|
||||
let default_collaboration_mode = collaboration_mode_cli_arg.map(|mode| {
|
||||
let collaboration_mode = CollaborationMode {
|
||||
mode: mode.into(),
|
||||
settings: Settings {
|
||||
model: default_model.clone(),
|
||||
reasoning_effort: default_effort,
|
||||
developer_instructions: None,
|
||||
},
|
||||
};
|
||||
normalize_collaboration_mode_with_preset(
|
||||
collaboration_mode,
|
||||
thread_manager.list_collaboration_modes(),
|
||||
)
|
||||
});
|
||||
|
||||
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
|
||||
let NewThread {
|
||||
@@ -479,6 +509,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
});
|
||||
}
|
||||
|
||||
let mut control_lines = if json_mode {
|
||||
Some(tokio::io::BufReader::new(tokio::io::stdin()).lines())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
{
|
||||
let thread_manager = Arc::clone(&thread_manager);
|
||||
let attached_threads = Arc::clone(&attached_threads);
|
||||
@@ -525,7 +561,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
effort: default_effort,
|
||||
summary: default_summary,
|
||||
final_output_json_schema: output_schema,
|
||||
collaboration_mode: None,
|
||||
collaboration_mode: default_collaboration_mode.clone(),
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
@@ -568,6 +604,31 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
if !json_mode && let EventMsg::RequestUserInput(ev) = &event.msg {
|
||||
// Non-JSON exec has no interactive request_user_input UI. Return an empty response
|
||||
// so the turn can continue instead of hanging indefinitely.
|
||||
thread
|
||||
.submit(Op::UserInputAnswer {
|
||||
id: ev.turn_id.clone(),
|
||||
response: RequestUserInputResponse {
|
||||
answers: Default::default(),
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
if json_mode && let EventMsg::RequestUserInput(ev) = &event.msg {
|
||||
let response = read_user_input_answer(&mut control_lines, &ev.turn_id)
|
||||
.await
|
||||
.unwrap_or_else(|| RequestUserInputResponse {
|
||||
answers: Default::default(),
|
||||
});
|
||||
thread
|
||||
.submit(Op::UserInputAnswer {
|
||||
id: ev.turn_id.clone(),
|
||||
response,
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
if let EventMsg::McpStartupUpdate(update) = &event.msg
|
||||
&& required_mcp_servers.contains(&update.server)
|
||||
&& let codex_core::protocol::McpStartupStatus::Failed { error } = &update.status
|
||||
@@ -645,6 +706,69 @@ fn spawn_thread_listener(
|
||||
});
|
||||
}
|
||||
|
||||
type ControlLines = tokio::io::Lines<tokio::io::BufReader<tokio::io::Stdin>>;
|
||||
|
||||
async fn read_user_input_answer(
|
||||
control_lines: &mut Option<ControlLines>,
|
||||
expected_id: &str,
|
||||
) -> Option<RequestUserInputResponse> {
|
||||
let Some(control_lines) = control_lines.as_mut() else {
|
||||
return None;
|
||||
};
|
||||
|
||||
loop {
|
||||
match control_lines.next_line().await {
|
||||
Ok(Some(line)) => {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
match serde_json::from_str::<ControlMessage>(trimmed) {
|
||||
Ok(ControlMessage::UserInputAnswer { id, response }) => {
|
||||
if id == expected_id {
|
||||
return Some(response);
|
||||
}
|
||||
warn!("ignoring user_input_answer for id {id}; expected {expected_id}");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("invalid control message: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => return None,
|
||||
Err(err) => {
|
||||
warn!("failed reading control stdin: {err}");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_collaboration_mode_with_preset(
|
||||
mut collaboration_mode: CollaborationMode,
|
||||
presets: Vec<codex_protocol::config_types::CollaborationModeMask>,
|
||||
) -> CollaborationMode {
|
||||
if collaboration_mode.settings.developer_instructions.is_none()
|
||||
&& let Some(instructions) = presets
|
||||
.into_iter()
|
||||
.find(|preset| preset.mode == Some(collaboration_mode.mode))
|
||||
.and_then(|preset| preset.developer_instructions.flatten())
|
||||
.filter(|instructions| !instructions.is_empty())
|
||||
{
|
||||
collaboration_mode.settings.developer_instructions = Some(instructions);
|
||||
}
|
||||
collaboration_mode
|
||||
}
|
||||
|
||||
impl From<CollaborationModeCliArg> for ModeKind {
|
||||
fn from(value: CollaborationModeCliArg) -> Self {
|
||||
match value {
|
||||
CollaborationModeCliArg::Default => ModeKind::Default,
|
||||
CollaborationModeCliArg::Plan => ModeKind::Plan,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn resolve_resume_path(
|
||||
config: &Config,
|
||||
args: &crate::cli::ResumeArgs,
|
||||
|
||||
@@ -43,7 +43,10 @@ use codex_exec::exec_events::McpToolCallItemResult;
|
||||
use codex_exec::exec_events::McpToolCallStatus;
|
||||
use codex_exec::exec_events::PatchApplyStatus;
|
||||
use codex_exec::exec_events::PatchChangeKind;
|
||||
use codex_exec::exec_events::PlanDeltaEvent as ExecPlanDeltaEvent;
|
||||
use codex_exec::exec_events::PlanItem as ExecPlanItem;
|
||||
use codex_exec::exec_events::ReasoningItem;
|
||||
use codex_exec::exec_events::RequestUserInputEvent as ExecRequestUserInputEvent;
|
||||
use codex_exec::exec_events::ThreadErrorEvent;
|
||||
use codex_exec::exec_events::ThreadEvent;
|
||||
use codex_exec::exec_events::ThreadItem;
|
||||
@@ -66,6 +69,8 @@ use codex_protocol::plan_tool::UpdatePlanArgs;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::ExecCommandOutputDeltaEvent;
|
||||
use codex_protocol::protocol::ExecOutputStream;
|
||||
use codex_protocol::request_user_input::RequestUserInputQuestion;
|
||||
use codex_protocol::request_user_input::RequestUserInputQuestionOption;
|
||||
use pretty_assertions::assert_eq;
|
||||
use rmcp::model::Content;
|
||||
use serde_json::json;
|
||||
@@ -345,6 +350,114 @@ fn plan_update_emits_todo_list_started_updated_and_completed() {
|
||||
);
|
||||
}
|
||||
|
||||
/// A plan item's start, streamed delta and completion each surface as exactly
/// one JSON thread event carrying the same id and text.
#[test]
fn plan_item_and_plan_delta_emit_json_events() {
    let mut processor = EventProcessorWithJsonOutput::new(None);

    let turn_id = "turn-1";
    let plan_id = "turn-1-plan";
    let delta_text = "- step 1\n";
    let final_text = "# Plan\n- step 1";

    // Item started: the plan begins with empty text.
    let started_msg = EventMsg::ItemStarted(codex_core::protocol::ItemStartedEvent {
        thread_id: ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap(),
        turn_id: turn_id.to_string(),
        item: codex_protocol::items::TurnItem::Plan(codex_protocol::items::PlanItem {
            id: plan_id.to_string(),
            text: String::new(),
        }),
    });
    let out_started = processor.collect_thread_events(&event("plan-started", started_msg));
    let expected_started = vec![ThreadEvent::ItemStarted(ItemStartedEvent {
        item: ThreadItem {
            id: plan_id.to_string(),
            details: ThreadItemDetails::Plan(ExecPlanItem {
                text: String::new(),
            }),
        },
    })];
    assert_eq!(out_started, expected_started);

    // Plan delta: forwarded with its item id and delta text.
    let delta_msg = EventMsg::PlanDelta(codex_core::protocol::PlanDeltaEvent {
        thread_id: "thread-1".to_string(),
        turn_id: turn_id.to_string(),
        item_id: plan_id.to_string(),
        delta: delta_text.to_string(),
    });
    let out_delta = processor.collect_thread_events(&event("plan-delta", delta_msg));
    let expected_delta = vec![ThreadEvent::PlanDelta(ExecPlanDeltaEvent {
        item_id: plan_id.to_string(),
        delta: delta_text.to_string(),
    })];
    assert_eq!(out_delta, expected_delta);

    // Item completed: carries the final plan text.
    let completed_msg = EventMsg::ItemCompleted(codex_core::protocol::ItemCompletedEvent {
        thread_id: ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap(),
        turn_id: turn_id.to_string(),
        item: codex_protocol::items::TurnItem::Plan(codex_protocol::items::PlanItem {
            id: plan_id.to_string(),
            text: final_text.to_string(),
        }),
    });
    let out_completed = processor.collect_thread_events(&event("plan-completed", completed_msg));
    let expected_completed = vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
        item: ThreadItem {
            id: plan_id.to_string(),
            details: ThreadItemDetails::Plan(ExecPlanItem {
                text: final_text.to_string(),
            }),
        },
    })];
    assert_eq!(out_completed, expected_completed);
}
|
||||
|
||||
/// A `RequestUserInput` protocol event becomes a single JSON
/// `request_user_input` thread event whose `id` is the turn id.
#[test]
fn request_user_input_event_emits_json_event() {
    // Builds the one question used both as input and as the expected output.
    let make_question = || RequestUserInputQuestion {
        id: "decision".to_string(),
        header: "Decision".to_string(),
        question: "Choose one".to_string(),
        is_other: true,
        is_secret: false,
        options: Some(vec![RequestUserInputQuestionOption {
            label: "A".to_string(),
            description: "First".to_string(),
        }]),
    };

    let mut processor = EventProcessorWithJsonOutput::new(None);
    let incoming =
        EventMsg::RequestUserInput(codex_protocol::request_user_input::RequestUserInputEvent {
            call_id: "call-123".to_string(),
            turn_id: "turn-123".to_string(),
            questions: vec![make_question()],
        });
    let out = processor.collect_thread_events(&event("rui-1", incoming));

    let expected = vec![ThreadEvent::RequestUserInput(ExecRequestUserInputEvent {
        id: "turn-123".to_string(),
        call_id: "call-123".to_string(),
        questions: vec![make_question()],
    })];
    assert_eq!(out, expected);
}
|
||||
|
||||
#[test]
|
||||
fn mcp_tool_call_begin_and_end_emit_item_events() {
|
||||
let mut ep = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
@@ -57,6 +57,44 @@ export type ItemCompletedEvent = {
|
||||
item: ThreadItem;
|
||||
};
|
||||
|
||||
/** One selectable choice offered by a request_user_input question. */
export type RequestUserInputQuestionOption = {
  /** Label of the option. */
  label: string;
  /** Description accompanying the label. */
  description: string;
};
|
||||
|
||||
/** A single user-facing prompt produced by the request_user_input tool. */
export type RequestUserInputQuestion = {
  id: string;
  header: string;
  question: string;
  isOther: boolean;
  isSecret: boolean;
  /** Choices offered for this question, when any are provided. */
  options?: Array<RequestUserInputQuestionOption>;
};
|
||||
|
||||
/** Shape of the reply sent back for a request_user_input prompt. */
export type RequestUserInputResponse = {
  /** String-keyed map; each entry holds a list of answer strings. */
  answers: Record<string, { answers: string[] }>;
};
|
||||
|
||||
/** Event raised when the agent calls the request_user_input tool and waits for a reply. */
export type RequestUserInputEvent = {
  type: "request_user_input";
  /** Turn id for the in-flight request. */
  id: string;
  /** Tool call id from the model stream. */
  call_id: string;
  /** The prompts that need an answer. */
  questions: Array<RequestUserInputQuestion>;
};
|
||||
|
||||
/** Event carrying one partial text update streamed for a plan item. */
export type PlanDeltaEvent = {
  type: "item.plan.delta";
  /** Id of the plan item being streamed. */
  item_id: string;
  /** The partial text carried by this update. */
  delta: string;
};
|
||||
|
||||
/** Fatal error emitted by the stream. */
|
||||
export type ThreadError = {
|
||||
message: string;
|
||||
@@ -77,4 +115,6 @@ export type ThreadEvent =
|
||||
| ItemStartedEvent
|
||||
| ItemUpdatedEvent
|
||||
| ItemCompletedEvent
|
||||
| RequestUserInputEvent
|
||||
| PlanDeltaEvent
|
||||
| ThreadErrorEvent;
|
||||
|
||||
@@ -4,7 +4,14 @@ import readline from "node:readline";
|
||||
import { createRequire } from "node:module";
|
||||
|
||||
import type { CodexConfigObject, CodexConfigValue } from "./codexOptions";
|
||||
import { SandboxMode, ModelReasoningEffort, ApprovalMode, WebSearchMode } from "./threadOptions";
|
||||
import {
|
||||
SandboxMode,
|
||||
ModelReasoningEffort,
|
||||
ApprovalMode,
|
||||
WebSearchMode,
|
||||
CollaborationMode,
|
||||
} from "./threadOptions";
|
||||
import type { RequestUserInputResponse } from "./events";
|
||||
|
||||
export type CodexExecArgs = {
|
||||
input: string;
|
||||
@@ -37,6 +44,18 @@ export type CodexExecArgs = {
|
||||
webSearchEnabled?: boolean;
|
||||
// --config approval_policy
|
||||
approvalPolicy?: ApprovalMode;
|
||||
// --collaboration-mode
|
||||
collaborationMode?: CollaborationMode;
|
||||
};
|
||||
|
||||
/**
 * Control message written to the child process's stdin as one JSON line.
 * Currently the only kind is an answer to a request_user_input prompt.
 */
type CodexControlMessage = {
  type: "user_input_answer";
  /** Id of the request being answered. */
  id: string;
  /** The answers for that request. */
  response: RequestUserInputResponse;
};
|
||||
|
||||
/**
 * Handle returned by `CodexExec.run`: an async iterable of output lines
 * plus a function for sending control messages to the running process.
 */
export type CodexExecRun = AsyncIterable<string> & {
  /** Sends one control message to the running process. */
  sendControl: (message: CodexControlMessage) => void;
};
|
||||
|
||||
const INTERNAL_ORIGINATOR_ENV = "CODEX_INTERNAL_ORIGINATOR_OVERRIDE";
|
||||
@@ -69,7 +88,7 @@ export class CodexExec {
|
||||
this.configOverrides = configOverrides;
|
||||
}
|
||||
|
||||
async *run(args: CodexExecArgs): AsyncGenerator<string> {
|
||||
run(args: CodexExecArgs): CodexExecRun {
|
||||
const commandArgs: string[] = ["exec", "--experimental-json"];
|
||||
|
||||
if (this.configOverrides) {
|
||||
@@ -127,6 +146,10 @@ export class CodexExec {
|
||||
commandArgs.push("--config", `approval_policy="${args.approvalPolicy}"`);
|
||||
}
|
||||
|
||||
if (args.collaborationMode) {
|
||||
commandArgs.push("--collaboration-mode", args.collaborationMode);
|
||||
}
|
||||
|
||||
if (args.threadId) {
|
||||
commandArgs.push("resume", args.threadId);
|
||||
}
|
||||
@@ -137,6 +160,9 @@ export class CodexExec {
|
||||
}
|
||||
}
|
||||
|
||||
// Keep stdin available for control messages while passing the initial prompt as a positional arg.
|
||||
commandArgs.push("--", args.input);
|
||||
|
||||
const env: Record<string, string> = {};
|
||||
if (this.envOverride) {
|
||||
Object.assign(env, this.envOverride);
|
||||
@@ -169,9 +195,6 @@ export class CodexExec {
|
||||
child.kill();
|
||||
throw new Error("Child process has no stdin");
|
||||
}
|
||||
child.stdin.write(args.input);
|
||||
child.stdin.end();
|
||||
|
||||
if (!child.stdout) {
|
||||
child.kill();
|
||||
throw new Error("Child process has no stdout");
|
||||
@@ -197,28 +220,51 @@ export class CodexExec {
|
||||
crlfDelay: Infinity,
|
||||
});
|
||||
|
||||
try {
|
||||
for await (const line of rl) {
|
||||
// `line` is a string (Node sets default encoding to utf8 for readline)
|
||||
yield line as string;
|
||||
}
|
||||
|
||||
if (spawnError) throw spawnError;
|
||||
const { code, signal } = await exitPromise;
|
||||
if (code !== 0 || signal) {
|
||||
const stderrBuffer = Buffer.concat(stderrChunks);
|
||||
const detail = signal ? `signal ${signal}` : `code ${code ?? 1}`;
|
||||
throw new Error(`Codex Exec exited with ${detail}: ${stderrBuffer.toString("utf8")}`);
|
||||
}
|
||||
} finally {
|
||||
rl.close();
|
||||
child.removeAllListeners();
|
||||
const generator = (async function* () {
|
||||
try {
|
||||
if (!child.killed) child.kill();
|
||||
} catch {
|
||||
// ignore
|
||||
for await (const line of rl) {
|
||||
// `line` is a string (Node sets default encoding to utf8 for readline)
|
||||
yield line as string;
|
||||
}
|
||||
|
||||
if (spawnError) throw spawnError;
|
||||
const { code, signal } = await exitPromise;
|
||||
if (code !== 0 || signal) {
|
||||
const stderrBuffer = Buffer.concat(stderrChunks);
|
||||
const detail = signal ? `signal ${signal}` : `code ${code ?? 1}`;
|
||||
throw new Error(`Codex Exec exited with ${detail}: ${stderrBuffer.toString("utf8")}`);
|
||||
}
|
||||
} finally {
|
||||
rl.close();
|
||||
child.removeAllListeners();
|
||||
try {
|
||||
if (child.stdin && !child.stdin.destroyed) {
|
||||
child.stdin.end();
|
||||
}
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
if (!child.killed) child.kill();
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
const sendControl = (message: CodexControlMessage): void => {
|
||||
if (!child.stdin || child.stdin.destroyed || child.killed) {
|
||||
throw new Error("Codex Exec is not accepting control messages");
|
||||
}
|
||||
child.stdin.write(`${JSON.stringify(message)}\n`);
|
||||
};
|
||||
|
||||
return {
|
||||
sendControl,
|
||||
[Symbol.asyncIterator](): AsyncGenerator<string> {
|
||||
return generator;
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,11 @@ export type {
|
||||
ItemStartedEvent,
|
||||
ItemUpdatedEvent,
|
||||
ItemCompletedEvent,
|
||||
RequestUserInputEvent,
|
||||
RequestUserInputQuestion,
|
||||
RequestUserInputQuestionOption,
|
||||
RequestUserInputResponse,
|
||||
PlanDeltaEvent,
|
||||
ThreadError,
|
||||
ThreadErrorEvent,
|
||||
Usage,
|
||||
@@ -14,6 +19,7 @@ export type {
|
||||
export type {
|
||||
ThreadItem,
|
||||
AgentMessageItem,
|
||||
PlanItem,
|
||||
ReasoningItem,
|
||||
CommandExecutionItem,
|
||||
FileChangeItem,
|
||||
@@ -36,5 +42,6 @@ export type {
|
||||
SandboxMode,
|
||||
ModelReasoningEffort,
|
||||
WebSearchMode,
|
||||
CollaborationMode,
|
||||
} from "./threadOptions";
|
||||
export type { TurnOptions } from "./turnOptions";
|
||||
|
||||
@@ -78,6 +78,13 @@ export type AgentMessageItem = {
|
||||
text: string;
|
||||
};
|
||||
|
||||
/** Thread item holding the implementation plan proposed in plan mode. */
export type PlanItem = {
  id: string;
  type: "plan";
  /** Text of the plan. */
  text: string;
};
|
||||
|
||||
/** Agent's reasoning summary. */
|
||||
export type ReasoningItem = {
|
||||
id: string;
|
||||
@@ -118,6 +125,7 @@ export type TodoListItem = {
|
||||
/** Canonical union of thread items and their type-specific payloads. */
|
||||
export type ThreadItem =
|
||||
| AgentMessageItem
|
||||
| PlanItem
|
||||
| ReasoningItem
|
||||
| CommandExecutionItem
|
||||
| FileChangeItem
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { CodexOptions } from "./codexOptions";
|
||||
import { ThreadEvent, ThreadError, Usage } from "./events";
|
||||
import { RequestUserInputResponse, ThreadEvent, ThreadError, Usage } from "./events";
|
||||
import { CodexExec } from "./exec";
|
||||
import { ThreadItem } from "./items";
|
||||
import { ThreadOptions } from "./threadOptions";
|
||||
@@ -43,6 +43,8 @@ export class Thread {
|
||||
private _options: CodexOptions;
|
||||
private _id: string | null;
|
||||
private _threadOptions: ThreadOptions;
|
||||
private _sendUserInputAnswer: ((id: string, response: RequestUserInputResponse) => void) | null =
|
||||
null;
|
||||
|
||||
/** Returns the ID of the thread. Populated after the first turn starts. */
|
||||
public get id(): string | null {
|
||||
@@ -67,6 +69,14 @@ export class Thread {
|
||||
return { events: this.runStreamedInternal(input, turnOptions) };
|
||||
}
|
||||
|
||||
/**
 * Delivers the answer for a pending request_user_input prompt.
 *
 * @param id - Id of the request being answered.
 * @param response - Answers to send back.
 * @throws Error when no sender is registered, i.e. no turn is currently streaming.
 */
answerUserInput(id: string, response: RequestUserInputResponse): void {
  const send = this._sendUserInputAnswer;
  if (!send) {
    throw new Error("No active turn is awaiting request_user_input answers");
  }
  send(id, response);
}
|
||||
|
||||
private async *runStreamedInternal(
|
||||
input: Input,
|
||||
turnOptions: TurnOptions = {},
|
||||
@@ -74,7 +84,7 @@ export class Thread {
|
||||
const { schemaPath, cleanup } = await createOutputSchemaFile(turnOptions.outputSchema);
|
||||
const options = this._threadOptions;
|
||||
const { prompt, images } = normalizeInput(input);
|
||||
const generator = this._exec.run({
|
||||
const run = this._exec.run({
|
||||
input: prompt,
|
||||
baseUrl: this._options.baseUrl,
|
||||
apiKey: this._options.apiKey,
|
||||
@@ -92,9 +102,17 @@ export class Thread {
|
||||
webSearchEnabled: options?.webSearchEnabled,
|
||||
approvalPolicy: options?.approvalPolicy,
|
||||
additionalDirectories: options?.additionalDirectories,
|
||||
collaborationMode: options?.collaborationMode,
|
||||
});
|
||||
this._sendUserInputAnswer = (id: string, response: RequestUserInputResponse) => {
|
||||
run.sendControl({
|
||||
type: "user_input_answer",
|
||||
id,
|
||||
response,
|
||||
});
|
||||
};
|
||||
try {
|
||||
for await (const item of generator) {
|
||||
for await (const item of run) {
|
||||
let parsed: ThreadEvent;
|
||||
try {
|
||||
parsed = JSON.parse(item) as ThreadEvent;
|
||||
@@ -107,6 +125,7 @@ export class Thread {
|
||||
yield parsed;
|
||||
}
|
||||
} finally {
|
||||
this._sendUserInputAnswer = null;
|
||||
await cleanup();
|
||||
}
|
||||
}
|
||||
@@ -119,9 +138,23 @@ export class Thread {
|
||||
let usage: Usage | null = null;
|
||||
let turnFailure: ThreadError | null = null;
|
||||
for await (const event of generator) {
|
||||
if (event.type === "item.completed") {
|
||||
if (event.type === "request_user_input") {
|
||||
if (!turnOptions.onRequestUserInput) {
|
||||
throw new Error(
|
||||
"Turn requested user input but no onRequestUserInput handler was provided",
|
||||
);
|
||||
}
|
||||
const response = await turnOptions.onRequestUserInput({
|
||||
id: event.id,
|
||||
call_id: event.call_id,
|
||||
questions: event.questions,
|
||||
});
|
||||
this.answerUserInput(event.id, response);
|
||||
} else if (event.type === "item.completed") {
|
||||
if (event.item.type === "agent_message") {
|
||||
finalResponse = event.item.text;
|
||||
} else if (event.item.type === "plan" && finalResponse === "") {
|
||||
finalResponse = event.item.text;
|
||||
}
|
||||
items.push(event.item);
|
||||
} else if (event.type === "turn.completed") {
|
||||
|
||||
@@ -6,6 +6,8 @@ export type ModelReasoningEffort = "minimal" | "low" | "medium" | "high" | "xhig
|
||||
|
||||
export type WebSearchMode = "disabled" | "cached" | "live";
|
||||
|
||||
/** Collaboration mode forwarded to the CLI through `--collaboration-mode`. */
export type CollaborationMode = "default" | "plan";
|
||||
|
||||
export type ThreadOptions = {
|
||||
model?: string;
|
||||
sandboxMode?: SandboxMode;
|
||||
@@ -17,4 +19,5 @@ export type ThreadOptions = {
|
||||
webSearchEnabled?: boolean;
|
||||
approvalPolicy?: ApprovalMode;
|
||||
additionalDirectories?: string[];
|
||||
collaborationMode?: CollaborationMode;
|
||||
};
|
||||
|
||||
@@ -1,6 +1,15 @@
|
||||
import type { RequestUserInputEvent, RequestUserInputResponse } from "./events";
|
||||
|
||||
/** Per-turn options. */
export type TurnOptions = {
  /** JSON schema the agent's output is expected to follow. */
  outputSchema?: unknown;
  /** AbortSignal used to cancel the turn. */
  signal?: AbortSignal;
  /**
   * Callback invoked by `run()` for each request_user_input prompt; it receives
   * the event without its `type` field and resolves to the response to send.
   * When it is not supplied and the turn asks for input, `run()` throws.
   */
  onRequestUserInput?: (
    request: Omit<RequestUserInputEvent, "type">,
  ) => Promise<RequestUserInputResponse>;
};
|
||||
|
||||
@@ -157,7 +157,11 @@ describe("AbortSignal support", () => {
|
||||
const result = await thread.run("Hello, world!", { signal: controller.signal });
|
||||
|
||||
expect(result.finalResponse).toBe("Hi!");
|
||||
expect(result.items).toHaveLength(1);
|
||||
expect(result.items).toContainEqual({
|
||||
id: expect.any(String),
|
||||
type: "agent_message",
|
||||
text: "Hi!",
|
||||
});
|
||||
} finally {
|
||||
await close();
|
||||
}
|
||||
|
||||
@@ -32,14 +32,11 @@ describe("Codex", () => {
|
||||
const thread = client.startThread();
|
||||
const result = await thread.run("Hello, world!");
|
||||
|
||||
const expectedItems = [
|
||||
{
|
||||
id: expect.any(String),
|
||||
type: "agent_message",
|
||||
text: "Hi!",
|
||||
},
|
||||
];
|
||||
expect(result.items).toEqual(expectedItems);
|
||||
expect(result.items).toContainEqual({
|
||||
id: expect.any(String),
|
||||
type: "agent_message",
|
||||
text: "Hi!",
|
||||
});
|
||||
expect(result.usage).toEqual({
|
||||
cached_input_tokens: 12,
|
||||
input_tokens: 42,
|
||||
@@ -49,7 +46,7 @@ describe("Codex", () => {
|
||||
} finally {
|
||||
await close();
|
||||
}
|
||||
});
|
||||
}, 15000);
|
||||
|
||||
it("sends previous items when run is called twice", async () => {
|
||||
const { url, close, requests } = await startResponsesTestProxy({
|
||||
@@ -410,6 +407,37 @@ describe("Codex", () => {
|
||||
}
|
||||
});
|
||||
|
||||
// The collaborationMode thread option must reach the CLI as `--collaboration-mode <mode>`.
it("passes collaborationMode to exec", async () => {
  const proxy = await startResponsesTestProxy({
    statusCode: 200,
    responseBodies: [
      sse(
        responseStarted("response_1"),
        assistantMessage("Collaboration mode set", "item_1"),
        responseCompleted("response_1"),
      ),
    ],
  });
  const spy = codexExecSpy();

  try {
    const client = new Codex({
      codexPathOverride: codexExecPath,
      baseUrl: proxy.url,
      apiKey: "test",
    });
    const thread = client.startThread({ collaborationMode: "plan" });

    await thread.run("test collaboration mode");

    // The first recorded spawn holds the argument list handed to the CLI.
    const commandArgs = spy.args[0];
    expect(commandArgs).toBeDefined();
    expectPair(commandArgs, ["--collaboration-mode", "plan"]);
  } finally {
    spy.restore();
    await proxy.close();
  }
});
|
||||
|
||||
it("passes CodexOptions config overrides as TOML --config flags", async () => {
|
||||
const { url, close } = await startResponsesTestProxy({
|
||||
statusCode: 200,
|
||||
|
||||
@@ -33,31 +33,29 @@ describe("Codex", () => {
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
expect(events).toEqual([
|
||||
{
|
||||
type: "thread.started",
|
||||
thread_id: expect.any(String),
|
||||
expect(events).toContainEqual({
|
||||
type: "thread.started",
|
||||
thread_id: expect.any(String),
|
||||
});
|
||||
expect(events).toContainEqual({
|
||||
type: "turn.started",
|
||||
});
|
||||
expect(events).toContainEqual({
|
||||
type: "item.completed",
|
||||
item: {
|
||||
id: expect.any(String),
|
||||
type: "agent_message",
|
||||
text: "Hi!",
|
||||
},
|
||||
{
|
||||
type: "turn.started",
|
||||
});
|
||||
expect(events).toContainEqual({
|
||||
type: "turn.completed",
|
||||
usage: {
|
||||
cached_input_tokens: 12,
|
||||
input_tokens: 42,
|
||||
output_tokens: 5,
|
||||
},
|
||||
{
|
||||
type: "item.completed",
|
||||
item: {
|
||||
id: "item_0",
|
||||
type: "agent_message",
|
||||
text: "Hi!",
|
||||
},
|
||||
},
|
||||
{
|
||||
type: "turn.completed",
|
||||
usage: {
|
||||
cached_input_tokens: 12,
|
||||
input_tokens: 42,
|
||||
output_tokens: 5,
|
||||
},
|
||||
},
|
||||
]);
|
||||
});
|
||||
expect(thread.id).toEqual(expect.any(String));
|
||||
} finally {
|
||||
await close();
|
||||
|
||||
Reference in New Issue
Block a user