Add plan collaboration MVP to SDK and exec

This commit is contained in:
Charles Cunningham
2026-02-19 20:49:36 -08:00
parent e4456840f5
commit 08a1f30ca2
15 changed files with 549 additions and 64 deletions

View File

@@ -11,6 +11,7 @@ pub mod event_processor_with_jsonl_output;
pub mod exec_events;
pub use cli::Cli;
pub use cli::CollaborationModeCliArg;
pub use cli::Command;
pub use cli::ReviewArgs;
use codex_cloud_requirements::cloud_requirements_loader;
@@ -40,7 +41,11 @@ use codex_core::protocol::ReviewRequest;
use codex_core::protocol::ReviewTarget;
use codex_core::protocol::SessionSource;
use codex_protocol::approvals::ElicitationAction;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::config_types::Settings;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::user_input::UserInput;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_oss::ensure_oss_provider_ready;
@@ -54,6 +59,7 @@ use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use supports_color::Stream;
use tokio::io::AsyncBufReadExt;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::error;
@@ -88,6 +94,15 @@ struct ThreadEventEnvelope {
event: Event,
}
#[derive(Debug, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ControlMessage {
UserInputAnswer {
id: String,
response: RequestUserInputResponse,
},
}
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
if let Err(err) = set_default_originator("codex_exec".to_string()) {
tracing::warn!(?err, "Failed to set codex exec originator override {err:?}");
@@ -97,6 +112,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
command,
images,
model: model_cli_arg,
collaboration_mode: collaboration_mode_cli_arg,
oss,
oss_provider,
config_profile,
@@ -379,6 +395,20 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
.get_models_manager()
.get_default_model(&config.model, RefreshStrategy::OnlineIfUncached)
.await;
let default_collaboration_mode = collaboration_mode_cli_arg.map(|mode| {
let collaboration_mode = CollaborationMode {
mode: mode.into(),
settings: Settings {
model: default_model.clone(),
reasoning_effort: default_effort,
developer_instructions: None,
},
};
normalize_collaboration_mode_with_preset(
collaboration_mode,
thread_manager.list_collaboration_modes(),
)
});
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewThread {
@@ -479,6 +509,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
});
}
let mut control_lines = if json_mode {
Some(tokio::io::BufReader::new(tokio::io::stdin()).lines())
} else {
None
};
{
let thread_manager = Arc::clone(&thread_manager);
let attached_threads = Arc::clone(&attached_threads);
@@ -525,7 +561,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
effort: default_effort,
summary: default_summary,
final_output_json_schema: output_schema,
collaboration_mode: None,
collaboration_mode: default_collaboration_mode.clone(),
personality: None,
})
.await?;
@@ -568,6 +604,31 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
})
.await?;
}
if !json_mode && let EventMsg::RequestUserInput(ev) = &event.msg {
// Non-JSON exec has no interactive request_user_input UI. Return an empty response
// so the turn can continue instead of hanging indefinitely.
thread
.submit(Op::UserInputAnswer {
id: ev.turn_id.clone(),
response: RequestUserInputResponse {
answers: Default::default(),
},
})
.await?;
}
if json_mode && let EventMsg::RequestUserInput(ev) = &event.msg {
let response = read_user_input_answer(&mut control_lines, &ev.turn_id)
.await
.unwrap_or_else(|| RequestUserInputResponse {
answers: Default::default(),
});
thread
.submit(Op::UserInputAnswer {
id: ev.turn_id.clone(),
response,
})
.await?;
}
if let EventMsg::McpStartupUpdate(update) = &event.msg
&& required_mcp_servers.contains(&update.server)
&& let codex_core::protocol::McpStartupStatus::Failed { error } = &update.status
@@ -645,6 +706,69 @@ fn spawn_thread_listener(
});
}
type ControlLines = tokio::io::Lines<tokio::io::BufReader<tokio::io::Stdin>>;
async fn read_user_input_answer(
control_lines: &mut Option<ControlLines>,
expected_id: &str,
) -> Option<RequestUserInputResponse> {
let Some(control_lines) = control_lines.as_mut() else {
return None;
};
loop {
match control_lines.next_line().await {
Ok(Some(line)) => {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
match serde_json::from_str::<ControlMessage>(trimmed) {
Ok(ControlMessage::UserInputAnswer { id, response }) => {
if id == expected_id {
return Some(response);
}
warn!("ignoring user_input_answer for id {id}; expected {expected_id}");
}
Err(err) => {
warn!("invalid control message: {err}");
}
}
}
Ok(None) => return None,
Err(err) => {
warn!("failed reading control stdin: {err}");
return None;
}
}
}
}
fn normalize_collaboration_mode_with_preset(
mut collaboration_mode: CollaborationMode,
presets: Vec<codex_protocol::config_types::CollaborationModeMask>,
) -> CollaborationMode {
if collaboration_mode.settings.developer_instructions.is_none()
&& let Some(instructions) = presets
.into_iter()
.find(|preset| preset.mode == Some(collaboration_mode.mode))
.and_then(|preset| preset.developer_instructions.flatten())
.filter(|instructions| !instructions.is_empty())
{
collaboration_mode.settings.developer_instructions = Some(instructions);
}
collaboration_mode
}
impl From<CollaborationModeCliArg> for ModeKind {
fn from(value: CollaborationModeCliArg) -> Self {
match value {
CollaborationModeCliArg::Default => ModeKind::Default,
CollaborationModeCliArg::Plan => ModeKind::Plan,
}
}
}
async fn resolve_resume_path(
config: &Config,
args: &crate::cli::ResumeArgs,