diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 3cd7b9ad97..c40cc3f92e 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1841,6 +1841,8 @@ dependencies = [ "codex-client", "codex-config", "codex-connectors", + "codex-environment", + "codex-exec-server", "codex-execpolicy", "codex-file-search", "codex-git", diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index ef6b8a0132..d116d66e14 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -34,6 +34,8 @@ codex-async-utils = { workspace = true } codex-client = { workspace = true } codex-connectors = { workspace = true } codex-config = { workspace = true } +codex-environment = { workspace = true } +codex-exec-server = { path = "../exec-server" } codex-shell-command = { workspace = true } codex-skills = { workspace = true } codex-execpolicy = { workspace = true } diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 2235315d43..fc3bf1f3ed 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -321,6 +321,12 @@ "experimental_compact_prompt_file": { "$ref": "#/definitions/AbsolutePathBuf" }, + "experimental_unified_exec_spawn_local_exec_server": { + "type": "boolean" + }, + "experimental_unified_exec_use_exec_server": { + "type": "boolean" + }, "experimental_use_freeform_apply_patch": { "type": "boolean" }, @@ -1873,6 +1879,14 @@ "description": "Experimental / do not use. Replaces the synthesized realtime startup context appended to websocket session instructions. An empty string disables startup context injection entirely.", "type": "string" }, + "experimental_unified_exec_spawn_local_exec_server": { + "description": "When `true`, start a session-scoped local `codex-exec-server` subprocess during session startup and route unified-exec calls through it.", + "type": "boolean" + }, + "experimental_unified_exec_use_exec_server": { + "description": "When `true`, route unified-exec process launches through `codex-exec-server` instead of spawning them directly in-process.", + "type": "boolean" + }, "experimental_use_freeform_apply_patch": { "type": "boolean" }, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index d30e5a3eaf..9c2f47d520 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -308,6 +308,7 @@ use crate::turn_timing::TurnTimingState; use crate::turn_timing::record_turn_ttfm_metric; use crate::turn_timing::record_turn_ttft_metric; use crate::unified_exec::UnifiedExecProcessManager; +use crate::unified_exec::unified_exec_session_factory_for_config; use crate::util::backoff; use crate::windows_sandbox::WindowsSandboxLevelExt; use codex_async_utils::OrCancelExt; @@ -1741,6 +1742,8 @@ impl Session { }); } + let unified_exec_session_factory = + unified_exec_session_factory_for_config(config.as_ref(), None).await?; let services = SessionServices { // Initialize the MCP connection manager with an uninitialized // instance. It will be replaced with one created via @@ -1753,8 +1756,9 @@ impl Session { &config.permissions.approval_policy, ))), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), - unified_exec_manager: UnifiedExecProcessManager::new( + unified_exec_manager: UnifiedExecProcessManager::with_session_factory( config.background_terminal_max_timeout, + unified_exec_session_factory, ), shell_zsh_path: config.zsh_path.clone(), main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(), diff --git a/codex-rs/core/src/config/config_tests.rs b/codex-rs/core/src/config/config_tests.rs index 7e82bd7da9..7316212db9 100644 --- a/codex-rs/core/src/config/config_tests.rs +++ b/codex-rs/core/src/config/config_tests.rs @@ -1695,6 +1695,29 @@ fn legacy_toggles_map_to_features() -> std::io::Result<()> { assert!(config.include_apply_patch_tool); assert!(config.use_experimental_unified_exec_tool); + assert!(!config.experimental_unified_exec_use_exec_server); + assert!(!config.experimental_unified_exec_spawn_local_exec_server); + + Ok(()) +} + +#[test] +fn unified_exec_exec_server_flags_load_from_config() -> std::io::Result<()> { + let codex_home = TempDir::new()?; + let cfg = ConfigToml { + experimental_unified_exec_use_exec_server: Some(true), + experimental_unified_exec_spawn_local_exec_server: Some(true), + ..Default::default() + }; + + let config = Config::load_from_base_config_with_overrides( + cfg, + ConfigOverrides::default(), + codex_home.path().to_path_buf(), + )?; + + assert!(config.experimental_unified_exec_use_exec_server); + assert!(config.experimental_unified_exec_spawn_local_exec_server); Ok(()) } @@ -4262,6 +4285,8 @@ fn test_precedence_fixture_with_o3_profile() -> std::io::Result<()> { web_search_mode: Constrained::allow_any(WebSearchMode::Cached), web_search_config: None, use_experimental_unified_exec_tool: !cfg!(windows), + experimental_unified_exec_use_exec_server: false, + experimental_unified_exec_spawn_local_exec_server: false, background_terminal_max_timeout: DEFAULT_MAX_BACKGROUND_TERMINAL_TIMEOUT_MS, ghost_snapshot: GhostSnapshotConfig::default(), features: Features::with_defaults().into(), @@ -4401,6 +4426,8 @@ fn test_precedence_fixture_with_gpt3_profile() -> std::io::Result<()> { web_search_mode: Constrained::allow_any(WebSearchMode::Cached), web_search_config: None, use_experimental_unified_exec_tool: !cfg!(windows), + experimental_unified_exec_use_exec_server: false, + experimental_unified_exec_spawn_local_exec_server: false, background_terminal_max_timeout: DEFAULT_MAX_BACKGROUND_TERMINAL_TIMEOUT_MS, ghost_snapshot: GhostSnapshotConfig::default(), features: Features::with_defaults().into(), @@ -4538,6 +4565,8 @@ fn test_precedence_fixture_with_zdr_profile() -> std::io::Result<()> { web_search_mode: Constrained::allow_any(WebSearchMode::Cached), web_search_config: None, use_experimental_unified_exec_tool: !cfg!(windows), + experimental_unified_exec_use_exec_server: false, + experimental_unified_exec_spawn_local_exec_server: false, background_terminal_max_timeout: DEFAULT_MAX_BACKGROUND_TERMINAL_TIMEOUT_MS, ghost_snapshot: GhostSnapshotConfig::default(), features: Features::with_defaults().into(), @@ -4661,6 +4690,8 @@ fn test_precedence_fixture_with_gpt5_profile() -> std::io::Result<()> { web_search_mode: Constrained::allow_any(WebSearchMode::Cached), web_search_config: None, use_experimental_unified_exec_tool: !cfg!(windows), + experimental_unified_exec_use_exec_server: false, + experimental_unified_exec_spawn_local_exec_server: false, background_terminal_max_timeout: DEFAULT_MAX_BACKGROUND_TERMINAL_TIMEOUT_MS, ghost_snapshot: GhostSnapshotConfig::default(), features: Features::with_defaults().into(), diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index 04c06ca8d7..dbb2fb1344 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -515,6 +515,14 @@ pub struct Config { /// If set to `true`, used only the experimental unified exec tool. pub use_experimental_unified_exec_tool: bool, + /// When `true`, route unified-exec process launches through `codex-exec-server` + /// instead of spawning them directly in-process. + pub experimental_unified_exec_use_exec_server: bool, + + /// When `true`, start a session-scoped local `codex-exec-server` subprocess + /// during session startup and route unified-exec calls through it. + pub experimental_unified_exec_spawn_local_exec_server: bool, + /// Maximum poll window for background terminal output (`write_stdin`), in milliseconds. /// Default: `300000` (5 minutes). pub background_terminal_max_timeout: u64, @@ -1298,6 +1306,14 @@ pub struct ConfigToml { /// Default: `300000` (5 minutes). pub background_terminal_max_timeout: Option, + /// When `true`, route unified-exec process launches through `codex-exec-server` + /// instead of spawning them directly in-process. + pub experimental_unified_exec_use_exec_server: Option, + + /// When `true`, start a session-scoped local `codex-exec-server` subprocess + /// during session startup and route unified-exec calls through it. + pub experimental_unified_exec_spawn_local_exec_server: Option, + /// Optional absolute path to the Node runtime used by `js_repl`. pub js_repl_node_path: Option, @@ -2426,6 +2442,14 @@ impl Config { let include_apply_patch_tool_flag = features.enabled(Feature::ApplyPatchFreeform); let use_experimental_unified_exec_tool = features.enabled(Feature::UnifiedExec); + let experimental_unified_exec_use_exec_server = config_profile + .experimental_unified_exec_use_exec_server + .or(cfg.experimental_unified_exec_use_exec_server) + .unwrap_or(false); + let experimental_unified_exec_spawn_local_exec_server = config_profile + .experimental_unified_exec_spawn_local_exec_server + .or(cfg.experimental_unified_exec_spawn_local_exec_server) + .unwrap_or(false); let forced_chatgpt_workspace_id = cfg.forced_chatgpt_workspace_id.as_ref().and_then(|value| { @@ -2717,6 +2741,8 @@ impl Config { web_search_mode: constrained_web_search_mode.value, web_search_config, use_experimental_unified_exec_tool, + experimental_unified_exec_use_exec_server, + experimental_unified_exec_spawn_local_exec_server, background_terminal_max_timeout, ghost_snapshot, features, diff --git a/codex-rs/core/src/config/profile.rs b/codex-rs/core/src/config/profile.rs index 743830ab32..fc986779ae 100644 --- a/codex-rs/core/src/config/profile.rs +++ b/codex-rs/core/src/config/profile.rs @@ -49,6 +49,8 @@ pub struct ConfigProfile { pub experimental_compact_prompt_file: Option, pub include_apply_patch_tool: Option, pub experimental_use_unified_exec_tool: Option, + pub experimental_unified_exec_use_exec_server: Option, + pub experimental_unified_exec_spawn_local_exec_server: Option, pub experimental_use_freeform_apply_patch: Option, pub tools_view_image: Option, pub tools: Option, diff --git a/codex-rs/core/src/tools/runtimes/unified_exec.rs b/codex-rs/core/src/tools/runtimes/unified_exec.rs index 7f0032c8d4..178c519769 100644 --- a/codex-rs/core/src/tools/runtimes/unified_exec.rs +++ b/codex-rs/core/src/tools/runtimes/unified_exec.rs @@ -46,6 +46,7 @@ use std::path::PathBuf; #[derive(Clone, Debug)] pub struct UnifiedExecRequest { + pub process_id: i32, pub command: Vec, pub cwd: PathBuf, pub env: HashMap, @@ -239,6 +240,7 @@ impl<'a> ToolRuntime for UnifiedExecRunt return self .manager .open_session_with_exec_env( + req.process_id, &prepared.exec_request, req.tty, prepared.spawn_lifecycle, @@ -275,7 +277,12 @@ impl<'a> ToolRuntime for UnifiedExecRunt .env_for(spec, req.network.as_ref()) .map_err(|err| ToolError::Codex(err.into()))?; self.manager - .open_session_with_exec_env(&exec_env, req.tty, Box::new(NoopSpawnLifecycle)) + .open_session_with_exec_env( + req.process_id, + &exec_env, + req.tty, + Box::new(NoopSpawnLifecycle), + ) .await .map_err(|err| match err { UnifiedExecError::SandboxDenied { output, .. } => { diff --git a/codex-rs/core/src/unified_exec/backend.rs b/codex-rs/core/src/unified_exec/backend.rs index 14cacb7a81..6892d887cd 100644 --- a/codex-rs/core/src/unified_exec/backend.rs +++ b/codex-rs/core/src/unified_exec/backend.rs @@ -1,7 +1,16 @@ +use std::path::PathBuf; use std::sync::Arc; use async_trait::async_trait; +use codex_exec_server::ExecServerClient; +use codex_exec_server::ExecServerClientConnectOptions; +use codex_exec_server::ExecServerLaunchCommand; +use codex_exec_server::SpawnedExecServer; +use codex_exec_server::spawn_local_exec_server; +use tracing::debug; +use crate::config::Config; +use crate::exec::SandboxType; use crate::sandboxing::ExecRequest; use crate::unified_exec::SpawnLifecycleHandle; use crate::unified_exec::UnifiedExecError; @@ -13,6 +22,7 @@ pub(crate) type UnifiedExecSessionFactoryHandle = Arc UnifiedExecSessionFactoryH impl UnifiedExecSessionFactory for LocalUnifiedExecSessionFactory { async fn open_session( &self, + _process_id: i32, env: &ExecRequest, tty: bool, spawn_lifecycle: SpawnLifecycleHandle, @@ -38,6 +49,117 @@ impl UnifiedExecSessionFactory for LocalUnifiedExecSessionFactory { } } +pub(crate) struct ExecServerUnifiedExecSessionFactory { + client: ExecServerClient, + _spawned_server: Option>, +} + +impl std::fmt::Debug for ExecServerUnifiedExecSessionFactory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExecServerUnifiedExecSessionFactory") + .field("owns_spawned_server", &self._spawned_server.is_some()) + .finish_non_exhaustive() + } +} + +impl ExecServerUnifiedExecSessionFactory { + pub(crate) fn from_client(client: ExecServerClient) -> UnifiedExecSessionFactoryHandle { + Arc::new(Self { + client, + _spawned_server: None, + }) + } + + pub(crate) fn from_spawned_server( + spawned_server: Arc, + ) -> UnifiedExecSessionFactoryHandle { + Arc::new(Self { + client: spawned_server.client().clone(), + _spawned_server: Some(spawned_server), + }) + } +} + +#[async_trait] +impl UnifiedExecSessionFactory for ExecServerUnifiedExecSessionFactory { + async fn open_session( + &self, + process_id: i32, + env: &ExecRequest, + tty: bool, + spawn_lifecycle: SpawnLifecycleHandle, + ) -> Result { + let inherited_fds = spawn_lifecycle.inherited_fds(); + if !inherited_fds.is_empty() { + debug!( + process_id, + inherited_fd_count = inherited_fds.len(), + "falling back to local unified-exec backend because exec-server does not support inherited fds", + ); + return open_local_session(env, tty, spawn_lifecycle).await; + } + + if env.sandbox == SandboxType::WindowsRestrictedToken { + debug!( + process_id, + "falling back to local unified-exec backend because Windows restricted-token execution is not modeled by exec-server", + ); + return open_local_session(env, tty, spawn_lifecycle).await; + } + + UnifiedExecProcess::from_exec_server( + self.client.clone(), + process_id, + env, + tty, + spawn_lifecycle, + ) + .await + } +} + +pub(crate) async fn unified_exec_session_factory_for_config( + config: &Config, + local_exec_server_command: Option, +) -> Result { + if !config.experimental_unified_exec_use_exec_server { + return Ok(local_unified_exec_session_factory()); + } + + if config.experimental_unified_exec_spawn_local_exec_server { + let command = local_exec_server_command.unwrap_or_else(default_local_exec_server_command); + let spawned_server = + spawn_local_exec_server(command, ExecServerClientConnectOptions::default()) + .await + .map_err(|err| UnifiedExecError::create_process(err.to_string()))?; + return Ok(ExecServerUnifiedExecSessionFactory::from_spawned_server( + Arc::new(spawned_server), + )); + } + + let client = ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default()) + .await + .map_err(|err| UnifiedExecError::create_process(err.to_string()))?; + Ok(ExecServerUnifiedExecSessionFactory::from_client(client)) +} + +fn default_local_exec_server_command() -> ExecServerLaunchCommand { + let binary_name = if cfg!(windows) { + "codex-exec-server.exe" + } else { + "codex-exec-server" + }; + let program = std::env::current_exe() + .ok() + .map(|current_exe| current_exe.with_file_name(binary_name)) + .filter(|candidate| candidate.exists()) + .unwrap_or_else(|| PathBuf::from(binary_name)); + ExecServerLaunchCommand { + program, + args: Vec::new(), + } +} + async fn open_local_session( env: &ExecRequest, tty: bool, diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index baf03a329d..d797ff90dc 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -50,6 +50,7 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) { pub(crate) use backend::UnifiedExecSessionFactoryHandle; pub(crate) use backend::local_unified_exec_session_factory; +pub(crate) use backend::unified_exec_session_factory_for_config; pub(crate) use errors::UnifiedExecError; pub(crate) use process::NoopSpawnLifecycle; #[cfg(unix)] diff --git a/codex-rs/core/src/unified_exec/mod_tests.rs b/codex-rs/core/src/unified_exec/mod_tests.rs index c81d1329d5..271f7103b5 100644 --- a/codex-rs/core/src/unified_exec/mod_tests.rs +++ b/codex-rs/core/src/unified_exec/mod_tests.rs @@ -3,14 +3,27 @@ use super::*; use crate::codex::Session; use crate::codex::TurnContext; use crate::codex::make_session_and_context; +use crate::config::ConfigBuilder; +use crate::config::ConfigOverrides; +use crate::exec::ExecExpiration; use crate::protocol::AskForApproval; use crate::protocol::SandboxPolicy; +use crate::sandboxing::ExecRequest; use crate::tools::context::ExecCommandToolOutput; use crate::unified_exec::ExecCommandRequest; use crate::unified_exec::WriteStdinRequest; +use codex_exec_server::ExecServerLaunchCommand; +use codex_protocol::config_types::WindowsSandboxLevel; +use codex_protocol::permissions::FileSystemSandboxPolicy; +use codex_protocol::permissions::NetworkSandboxPolicy; use core_test_support::skip_if_sandbox; +use std::collections::HashMap; +use std::path::PathBuf; +use std::process::Command; use std::sync::Arc; +use tempfile::TempDir; use tokio::time::Duration; +use toml::Value as TomlValue; async fn test_session_and_turn() -> (Arc, Arc) { let (session, mut turn) = make_session_and_context().await; @@ -82,6 +95,28 @@ async fn write_stdin( .await } +fn test_exec_request(command: Vec, cwd: &std::path::Path) -> ExecRequest { + let sandbox_policy = SandboxPolicy::DangerFullAccess; + let file_system_sandbox_policy = FileSystemSandboxPolicy::from(&sandbox_policy); + let network_sandbox_policy = NetworkSandboxPolicy::from(&sandbox_policy); + ExecRequest { + command, + cwd: cwd.to_path_buf(), + env: HashMap::new(), + network: None, + expiration: ExecExpiration::Timeout(Duration::from_secs(5)), + sandbox: crate::exec::SandboxType::None, + windows_sandbox_level: WindowsSandboxLevel::default(), + windows_sandbox_private_desktop: false, + sandbox_permissions: SandboxPermissions::UseDefault, + sandbox_policy, + file_system_sandbox_policy, + network_sandbox_policy, + justification: None, + arg0: None, + } +} + #[test] fn push_chunk_preserves_prefix_and_suffix() { let mut buffer = HeadTailBuffer::default(); @@ -233,6 +268,93 @@ async fn unified_exec_timeouts() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_can_spawn_a_local_exec_server_backend() -> anyhow::Result<()> { + skip_if_sandbox!(Ok(())); + + let codex_home = TempDir::new()?; + let cwd = TempDir::new()?; + let config = ConfigBuilder::default() + .codex_home(codex_home.path().to_path_buf()) + .cli_overrides(vec![ + ( + "experimental_unified_exec_use_exec_server".to_string(), + TomlValue::Boolean(true), + ), + ( + "experimental_unified_exec_spawn_local_exec_server".to_string(), + TomlValue::Boolean(true), + ), + ]) + .harness_overrides(ConfigOverrides { + cwd: Some(cwd.path().to_path_buf()), + ..Default::default() + }) + .build() + .await?; + let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .expect("core crate should be under codex-rs") + .to_path_buf(); + let cargo = PathBuf::from(env!("CARGO")); + let build_status = Command::new(&cargo) + .current_dir(&workspace_root) + .args([ + "build", + "-p", + "codex-exec-server", + "--bin", + "codex-exec-server", + ]) + .status()?; + assert!(build_status.success(), "failed to build codex-exec-server"); + let target_dir = std::env::var_os("CARGO_TARGET_DIR") + .map(PathBuf::from) + .unwrap_or_else(|| workspace_root.join("target")); + let binary_name = if cfg!(windows) { + "codex-exec-server.exe" + } else { + "codex-exec-server" + }; + let session_factory = unified_exec_session_factory_for_config( + &config, + Some(ExecServerLaunchCommand { + program: target_dir.join("debug").join(binary_name), + args: Vec::new(), + }), + ) + .await?; + let manager = UnifiedExecProcessManager::with_session_factory( + DEFAULT_MAX_BACKGROUND_TERMINAL_TIMEOUT_MS, + session_factory, + ); + let process = manager + .open_session_with_exec_env( + 1000, + &test_exec_request( + vec![ + "bash".to_string(), + "-c".to_string(), + "printf unified_exec_spawned_exec_server_backend_marker".to_string(), + ], + cwd.path(), + ), + false, + Box::new(NoopSpawnLifecycle), + ) + .await?; + let mut output_rx = process.output_receiver(); + let chunk = tokio::time::timeout(Duration::from_secs(5), output_rx.recv()).await??; + + assert_eq!( + String::from_utf8_lossy(&chunk), + "unified_exec_spawned_exec_server_backend_marker" + ); + + process.terminate(); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unified_exec_pause_blocks_yield_timeout() -> anyhow::Result<()> { skip_if_sandbox!(Ok(())); diff --git a/codex-rs/core/src/unified_exec/process.rs b/codex-rs/core/src/unified_exec/process.rs index 6da7c739ec..0ebfe1b98e 100644 --- a/codex-rs/core/src/unified_exec/process.rs +++ b/codex-rs/core/src/unified_exec/process.rs @@ -1,6 +1,7 @@ #![allow(clippy::module_inception)] use std::sync::Arc; +use std::sync::Mutex as StdMutex; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use tokio::sync::Mutex; @@ -16,8 +17,12 @@ use crate::exec::ExecToolCallOutput; use crate::exec::SandboxType; use crate::exec::StreamOutput; use crate::exec::is_likely_sandbox_denied; +use crate::sandboxing::ExecRequest; use crate::truncate::TruncationPolicy; use crate::truncate::formatted_truncate_text; +use codex_exec_server::ExecParams; +use codex_exec_server::ExecServerClient; +use codex_exec_server::ExecServerEvent; use codex_utils_pty::ExecCommandSession; use codex_utils_pty::SpawnedPty; @@ -56,7 +61,7 @@ pub(crate) struct OutputHandles { #[derive(Debug)] pub(crate) struct UnifiedExecProcess { - process_handle: ExecCommandSession, + process_handle: ProcessBackend, output_rx: broadcast::Receiver>, output_buffer: OutputBuffer, output_notify: Arc, @@ -69,9 +74,45 @@ pub(crate) struct UnifiedExecProcess { _spawn_lifecycle: SpawnLifecycleHandle, } +enum ProcessBackend { + Local(ExecCommandSession), + Remote(RemoteExecSession), +} + +impl std::fmt::Debug for ProcessBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Local(process_handle) => f.debug_tuple("Local").field(process_handle).finish(), + Self::Remote(process_handle) => f.debug_tuple("Remote").field(process_handle).finish(), + } + } +} + +#[derive(Clone)] +struct RemoteExecSession { + process_key: String, + client: ExecServerClient, + writer_tx: mpsc::Sender>, + exited: Arc, + exit_code: Arc>>, +} + +impl std::fmt::Debug for RemoteExecSession { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RemoteExecSession") + .field("process_key", &self.process_key) + .field("exited", &self.exited.load(Ordering::SeqCst)) + .field( + "exit_code", + &self.exit_code.lock().ok().and_then(|guard| *guard), + ) + .finish_non_exhaustive() + } +} + impl UnifiedExecProcess { - pub(super) fn new( - process_handle: ExecCommandSession, + fn new( + process_handle: ProcessBackend, initial_output_rx: tokio::sync::broadcast::Receiver>, sandbox_type: SandboxType, spawn_lifecycle: SpawnLifecycleHandle, @@ -123,7 +164,10 @@ impl UnifiedExecProcess { } pub(super) fn writer_sender(&self) -> mpsc::Sender> { - self.process_handle.writer_sender() + match &self.process_handle { + ProcessBackend::Local(process_handle) => process_handle.writer_sender(), + ProcessBackend::Remote(process_handle) => process_handle.writer_tx.clone(), + } } pub(super) fn output_handles(&self) -> OutputHandles { @@ -149,17 +193,38 @@ impl UnifiedExecProcess { } pub(super) fn has_exited(&self) -> bool { - self.process_handle.has_exited() + match &self.process_handle { + ProcessBackend::Local(process_handle) => process_handle.has_exited(), + ProcessBackend::Remote(process_handle) => process_handle.exited.load(Ordering::SeqCst), + } } pub(super) fn exit_code(&self) -> Option { - self.process_handle.exit_code() + match &self.process_handle { + ProcessBackend::Local(process_handle) => process_handle.exit_code(), + ProcessBackend::Remote(process_handle) => process_handle + .exit_code + .lock() + .ok() + .and_then(|guard| *guard), + } } pub(super) fn terminate(&self) { self.output_closed.store(true, Ordering::Release); self.output_closed_notify.notify_waiters(); - self.process_handle.terminate(); + match &self.process_handle { + ProcessBackend::Local(process_handle) => process_handle.terminate(), + ProcessBackend::Remote(process_handle) => { + let client = process_handle.client.clone(); + let process_key = process_handle.process_key.clone(); + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.spawn(async move { + let _ = client.terminate(&process_key).await; + }); + } + } + } self.cancellation_token.cancel(); self.output_task.abort(); } @@ -232,7 +297,12 @@ impl UnifiedExecProcess { mut exit_rx, } = spawned; let output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx); - let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle); + let managed = Self::new( + ProcessBackend::Local(process_handle), + output_rx, + sandbox_type, + spawn_lifecycle, + ); let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed)); @@ -262,6 +332,89 @@ impl UnifiedExecProcess { Ok(managed) } + pub(super) async fn from_exec_server( + client: ExecServerClient, + process_id: i32, + env: &ExecRequest, + tty: bool, + spawn_lifecycle: SpawnLifecycleHandle, + ) -> Result { + let process_key = process_id.to_string(); + let mut events_rx = client.event_receiver(); + client + .exec(ExecParams { + process_id: process_key.clone(), + argv: env.command.clone(), + cwd: env.cwd.clone(), + env: env.env.clone(), + tty, + arg0: env.arg0.clone(), + sandbox: None, + }) + .await + .map_err(|err| UnifiedExecError::create_process(err.to_string()))?; + + let (output_tx, output_rx) = broadcast::channel(256); + let (writer_tx, mut writer_rx) = mpsc::channel::>(256); + let exited = Arc::new(AtomicBool::new(false)); + let exit_code = Arc::new(StdMutex::new(None)); + + let managed = Self::new( + ProcessBackend::Remote(RemoteExecSession { + process_key: process_key.clone(), + client: client.clone(), + writer_tx, + exited: Arc::clone(&exited), + exit_code: Arc::clone(&exit_code), + }), + output_rx, + env.sandbox, + spawn_lifecycle, + ); + + { + let client = client.clone(); + tokio::spawn(async move { + while let Some(chunk) = writer_rx.recv().await { + if client.write(&process_key, chunk).await.is_err() { + break; + } + } + }); + } + + { + let process_key = process_id.to_string(); + let exited = Arc::clone(&exited); + let exit_code = Arc::clone(&exit_code); + let cancellation_token = managed.cancellation_token(); + tokio::spawn(async move { + while let Ok(event) = events_rx.recv().await { + match event { + ExecServerEvent::OutputDelta(notification) + if notification.process_id == process_key => + { + let _ = output_tx.send(notification.chunk.into_inner()); + } + ExecServerEvent::Exited(notification) + if notification.process_id == process_key => + { + exited.store(true, Ordering::SeqCst); + if let Ok(mut guard) = exit_code.lock() { + *guard = Some(notification.exit_code); + } + cancellation_token.cancel(); + break; + } + ExecServerEvent::OutputDelta(_) | ExecServerEvent::Exited(_) => {} + } + } + }); + } + + Ok(managed) + } + fn signal_exit(&self) { self.cancellation_token.cancel(); } diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index f0df2e0416..c12006d34a 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -539,12 +539,13 @@ impl UnifiedExecProcessManager { pub(crate) async fn open_session_with_exec_env( &self, + process_id: i32, env: &ExecRequest, tty: bool, spawn_lifecycle: SpawnLifecycleHandle, ) -> Result { self.session_factory - .open_session(env, tty, spawn_lifecycle) + .open_session(process_id, env, tty, spawn_lifecycle) .await } @@ -581,6 +582,7 @@ impl UnifiedExecProcessManager { }) .await; let req = UnifiedExecToolRequest { + process_id: request.process_id, command: request.command.clone(), cwd, env, diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 848e777502..45da183f45 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -269,6 +269,78 @@ async fn unified_exec_intercepts_apply_patch_exec_command() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_can_route_through_in_process_exec_server() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); + + let builder = test_codex().with_config(|config| { + config.use_experimental_unified_exec_tool = true; + config.experimental_unified_exec_use_exec_server = true; + config + .features + .enable(Feature::UnifiedExec) + .expect("test config should allow feature update"); + }); + let harness = TestCodexHarness::with_builder(builder).await?; + + let call_id = "uexec-exec-server-inprocess"; + let marker = "unified_exec_exec_server_inprocess_marker"; + let args = json!({ + "cmd": format!("printf {marker}"), + "yield_time_ms": 250, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]), + ]; + mount_sse_sequence(harness.server(), responses).await; + + let test = harness.test(); + let codex = test.codex.clone(); + let cwd = test.cwd_path().to_path_buf(); + let session_model = test.session_configured.model.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "route unified exec through the in-process exec-server".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd, + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: None, + service_tier: None, + collaboration_mode: None, + personality: None, + }) + .await?; + + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + let output = harness.function_call_stdout(call_id).await; + assert!( + output.contains(marker), + "expected unified exec output from exec-server backend, got: {output:?}" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unified_exec_emits_exec_command_begin_event() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/docs/config.md b/docs/config.md index d03fb98434..63c42be333 100644 --- a/docs/config.md +++ b/docs/config.md @@ -78,4 +78,13 @@ developer message Codex inserts when realtime becomes active. It only affects the realtime start message in prompt history and does not change websocket backend prompt settings or the realtime end/inactive message. +## Unified exec over exec-server + +`experimental_unified_exec_use_exec_server` routes `exec_command` and +`write_stdin` process launches through `codex-exec-server` instead of spawning +them directly in-process. When +`experimental_unified_exec_spawn_local_exec_server` is also enabled, Codex +starts a session-scoped local `codex-exec-server` subprocess on startup and +uses that connection for unified-exec calls. + Ctrl+C/Ctrl+D quitting uses a ~1 second double-press hint (`ctrl + c again to quit`).