Compare commits

..

1 Commits

Author SHA1 Message Date
starr-openai
0017779b05 Route app-server command exec through executor backend
Co-authored-by: Codex <noreply@openai.com>
2026-03-18 19:49:14 +00:00
10 changed files with 739 additions and 9 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1428,6 +1428,7 @@ dependencies = [
"codex-cloud-requirements",
"codex-core",
"codex-environment",
"codex-exec-server",
"codex-feedback",
"codex-file-search",
"codex-login",

View File

@@ -33,6 +33,7 @@ codex-arg0 = { workspace = true }
codex-cloud-requirements = { workspace = true }
codex-core = { workspace = true }
codex-environment = { workspace = true }
codex-exec-server = { path = "../exec-server" }
codex-otel = { workspace = true }
codex-shell-command = { workspace = true }
codex-utils-cli = { workspace = true }

View File

@@ -722,6 +722,7 @@ Streaming stdin/stdout uses base64 so PTY sessions can carry arbitrary bytes:
- `command/exec/outputDelta.capReached` is `true` on the final streamed chunk for a stream when `outputBytesCap` truncates that stream; later output on that stream is dropped.
- `command/exec.params.env` overrides the server-computed environment per key; set a key to `null` to unset an inherited variable.
- `command/exec/resize` is only supported for PTY-backed `command/exec` sessions.
- When `experimental_unified_exec_use_exec_server = true`, `command/exec` reuses the selected executor backend. In that mode, streaming exec/write/terminate are supported remotely, while sandboxed execution, initial terminal sizing, `command/exec/resize`, and `closeStdin` remain unsupported and return explicit errors.
### Example: Filesystem utilities

View File

@@ -408,6 +408,7 @@ pub(crate) struct CodexMessageProcessorArgs {
pub(crate) config: Arc<Config>,
pub(crate) cli_overrides: Vec<(String, TomlValue)>,
pub(crate) cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
pub(crate) command_exec_manager: CommandExecManager,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
}
@@ -468,6 +469,7 @@ impl CodexMessageProcessor {
config,
cli_overrides,
cloud_requirements,
command_exec_manager,
feedback,
log_db,
} = args;
@@ -483,7 +485,7 @@ impl CodexMessageProcessor {
pending_thread_unloads: Arc::new(Mutex::new(HashSet::new())),
thread_state_manager: ThreadStateManager::new(),
thread_watch_manager: ThreadWatchManager::new_with_outgoing(outgoing),
command_exec_manager: CommandExecManager::default(),
command_exec_manager,
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())),
background_tasks: TaskTracker::new(),

View File

@@ -1,4 +1,6 @@
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
@@ -25,6 +27,11 @@ use codex_core::exec::ExecExpiration;
use codex_core::exec::IO_DRAIN_TIMEOUT_MS;
use codex_core::exec::SandboxType;
use codex_core::sandboxing::ExecRequest;
use codex_exec_server::ExecServerClient;
use codex_exec_server::ExecServerEvent;
use codex_exec_server::ExecOutputStream as RemoteExecOutputStream;
use codex_exec_server::ExecParams as RemoteExecParams;
use codex_protocol::protocol::SandboxPolicy;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use codex_utils_pty::ProcessHandle;
use codex_utils_pty::SpawnedProcess;
@@ -45,13 +52,211 @@ const EXEC_TIMEOUT_EXIT_CODE: i32 = 124;
#[derive(Clone)]
pub(crate) struct CommandExecManager {
backend: CommandExecBackend,
sessions: Arc<Mutex<HashMap<ConnectionProcessId, CommandExecSession>>>,
next_generated_process_id: Arc<AtomicI64>,
}
#[cfg(test)]
mod remote_backend_tests {
    //! Unit tests for the exec-server-backed (remote) `command/exec` path,
    //! covering the explicit rejections of resize and sandboxed execution.
    use super::*;
    use crate::outgoing_message::{ConnectionId, ConnectionRequestId, OutgoingMessageSender};
    use codex_app_server_protocol::RequestId;
    use codex_core::exec::ExecExpiration;
    use codex_core::sandboxing::SandboxPermissions;
    use codex_exec_server::{ExecServerClient, ExecServerClientConnectOptions};
    use codex_protocol::config_types::WindowsSandboxLevel;
    use codex_protocol::permissions::{FileSystemSandboxPolicy, NetworkSandboxPolicy};
    use codex_protocol::protocol::{ReadOnlyAccess, SandboxPolicy};
    use std::path::PathBuf;
    use tempfile::TempDir;
    use tokio::sync::mpsc;

    /// Builds a long-running (`sleep 30`) request so the process is still
    /// alive when the test issues control operations (resize/terminate).
    fn remote_exec_request(
        sandbox_policy: SandboxPolicy,
        sandbox: SandboxType,
    ) -> ExecRequest {
        let file_system_sandbox_policy = FileSystemSandboxPolicy::from(&sandbox_policy);
        let network_sandbox_policy = NetworkSandboxPolicy::from(&sandbox_policy);
        ExecRequest {
            command: vec!["/bin/sh".to_string(), "-c".to_string(), "sleep 30".to_string()],
            cwd: PathBuf::from("."),
            env: HashMap::new(),
            network: None,
            expiration: ExecExpiration::DefaultTimeout,
            sandbox,
            windows_sandbox_level: WindowsSandboxLevel::Disabled,
            windows_sandbox_private_desktop: false,
            sandbox_permissions: SandboxPermissions::UseDefault,
            sandbox_policy,
            file_system_sandbox_policy,
            network_sandbox_policy,
            justification: None,
            arg0: None,
        }
    }

    /// Builds `StartCommandExecParams` with a client-provided process id and
    /// streaming disabled; `tty` is the only per-test variation.
    fn start_params(
        outgoing: Arc<OutgoingMessageSender>,
        request_id: ConnectionRequestId,
        process_id: &str,
        exec_request: ExecRequest,
        tty: bool,
    ) -> StartCommandExecParams {
        StartCommandExecParams {
            outgoing,
            request_id,
            process_id: Some(process_id.to_string()),
            exec_request,
            started_network_proxy: None,
            tty,
            stream_stdin: false,
            stream_stdout_stderr: false,
            output_bytes_cap: Some(DEFAULT_OUTPUT_BYTES_CAP),
            size: None,
        }
    }

    /// Creates a manager backed by an in-process exec server, with the
    /// tempdir as the local workspace root and no remote root mapping.
    async fn remote_manager(tempdir: &TempDir) -> CommandExecManager {
        let client = ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default())
            .await
            .expect("connect in process exec server");
        CommandExecManager::new(CommandExecBackend::ExecServer(RemoteCommandExecBackend::new(
            client,
            tempdir.path().to_path_buf(),
            None,
        )))
    }

    /// Resize must fail against a remote-backed session (PTY or not), and the
    /// error must use the invalid-request code with a fixed message.
    #[tokio::test]
    async fn command_exec_remote_backend_rejects_resize() {
        let tempdir = TempDir::new().expect("tempdir");
        let manager = remote_manager(&tempdir).await;
        let (tx, _rx) = mpsc::channel(1);
        let request_id = ConnectionRequestId {
            connection_id: ConnectionId(11),
            request_id: RequestId::Integer(2),
        };
        manager
            .start(start_params(
                Arc::new(OutgoingMessageSender::new(tx)),
                request_id.clone(),
                "proc-remote-resize",
                remote_exec_request(SandboxPolicy::DangerFullAccess, SandboxType::None),
                true,
            ))
            .await
            .expect("start remote process");
        let err = manager
            .resize(
                request_id.clone(),
                CommandExecResizeParams {
                    process_id: "proc-remote-resize".to_string(),
                    size: CommandExecTerminalSize {
                        cols: 120,
                        rows: 40,
                    },
                },
            )
            .await
            .expect_err("resize should fail");
        assert_eq!(err.code, INVALID_REQUEST_ERROR_CODE);
        assert_eq!(err.message, "remote command/exec does not support resize");
        // Best-effort cleanup so the spawned `sleep 30` does not outlive the test.
        let _ = manager
            .terminate(
                request_id,
                CommandExecTerminateParams {
                    process_id: "proc-remote-resize".to_string(),
                },
            )
            .await;
    }

    /// Any non-`DangerFullAccess` policy (or non-`None` sandbox type) must be
    /// rejected before the process is ever spawned remotely.
    #[tokio::test]
    async fn command_exec_remote_backend_rejects_sandboxed_execution() {
        let tempdir = TempDir::new().expect("tempdir");
        let manager = remote_manager(&tempdir).await;
        let (tx, _rx) = mpsc::channel(1);
        let request_id = ConnectionRequestId {
            connection_id: ConnectionId(12),
            request_id: RequestId::Integer(5),
        };
        let sandbox_policy = SandboxPolicy::ReadOnly {
            access: ReadOnlyAccess::FullAccess,
            network_access: false,
        };
        let err = manager
            .start(start_params(
                Arc::new(OutgoingMessageSender::new(tx)),
                request_id,
                "proc-remote-sandbox",
                remote_exec_request(sandbox_policy, SandboxType::MacosSeatbelt),
                true,
            ))
            .await
            .expect_err("sandboxed remote exec should fail");
        assert_eq!(err.code, INVALID_REQUEST_ERROR_CODE);
        assert_eq!(
            err.message,
            "remote command/exec does not support sandboxed execution"
        );
        assert_eq!(err.data, None);
    }
}
impl Default for CommandExecManager {
fn default() -> Self {
Self::new(CommandExecBackend::Local)
}
}
/// Selects how `command/exec` requests are executed.
#[derive(Clone)]
pub(crate) enum CommandExecBackend {
    /// Spawn processes directly on this machine.
    Local,
    /// Forward execution to an exec-server instance (possibly remote).
    ExecServer(RemoteCommandExecBackend),
}
/// Connection to an exec server plus the workspace-root mapping used to
/// translate local paths into paths the server understands.
#[derive(Clone)]
pub(crate) struct RemoteCommandExecBackend {
    client: ExecServerClient,
    // Root of the workspace as seen by this process.
    local_workspace_root: PathBuf,
    // Root of the same workspace on the exec server; `None` means paths are
    // passed through unchanged.
    remote_workspace_root: Option<PathBuf>,
}
impl RemoteCommandExecBackend {
    /// Creates a backend for `client`, remembering the local workspace root
    /// and an optional remote root to rebase paths onto.
    pub(crate) fn new(
        client: ExecServerClient,
        local_workspace_root: PathBuf,
        remote_workspace_root: Option<PathBuf>,
    ) -> Self {
        Self {
            client,
            local_workspace_root,
            remote_workspace_root,
        }
    }

    /// Rebases `path` from the local workspace root onto the remote one.
    /// Paths outside the local root — or any path when no remote root is
    /// configured — are returned unchanged.
    fn map_path(&self, path: &Path) -> PathBuf {
        if let Some(remote_root) = &self.remote_workspace_root {
            if let Ok(relative) = path.strip_prefix(&self.local_workspace_root) {
                return remote_root.join(relative);
            }
        }
        path.to_path_buf()
    }
}
impl CommandExecManager {
pub(crate) fn new(backend: CommandExecBackend) -> Self {
Self {
backend,
sessions: Arc::new(Mutex::new(HashMap::new())),
next_generated_process_id: Arc::new(AtomicI64::new(1)),
}
@@ -119,6 +324,20 @@ struct SpawnProcessOutputParams {
output_bytes_cap: Option<usize>,
}
/// Everything `run_remote_command` needs to drive one remote process to
/// completion and answer the originating request.
struct RunRemoteCommandParams {
    outgoing: Arc<OutgoingMessageSender>,
    // The request that initiated `command/exec`; its response carries the
    // final exit code and any buffered output.
    request_id: ConnectionRequestId,
    // Client-supplied process id used in outgoing notifications, if any.
    process_id: Option<String>,
    // Id the exec server knows the process by.
    remote_process_id: String,
    client: ExecServerClient,
    // Receives write/resize/terminate controls for this process.
    control_rx: mpsc::Receiver<CommandControlRequest>,
    stream_stdin: bool,
    stream_stdout_stderr: bool,
    expiration: ExecExpiration,
    output_bytes_cap: Option<usize>,
    // Held alive for the duration of the command; not otherwise used.
    started_network_proxy: Option<StartedNetworkProxy>,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum InternalProcessId {
Generated(i64),
@@ -127,6 +346,7 @@ enum InternalProcessId {
trait InternalProcessIdExt {
fn error_repr(&self) -> String;
fn protocol_repr(&self) -> String;
}
impl InternalProcessIdExt for InternalProcessId {
@@ -136,6 +356,13 @@ impl InternalProcessIdExt for InternalProcessId {
Self::Client(id) => serde_json::to_string(id).unwrap_or_else(|_| format!("{id:?}")),
}
}
    /// Representation used as the protocol-level process id: generated ids
    /// are rendered as bare integers, client-supplied ids verbatim.
    fn protocol_repr(&self) -> String {
        match self {
            Self::Generated(id) => id.to_string(),
            Self::Client(id) => id.clone(),
        }
    }
}
impl CommandExecManager {
@@ -174,6 +401,30 @@ impl CommandExecManager {
process_id: process_id.clone(),
};
if let CommandExecBackend::ExecServer(remote_backend) = &self.backend {
return self
.start_remote(
remote_backend.clone(),
process_key,
StartCommandExecParams {
outgoing,
request_id,
process_id: match process_id {
InternalProcessId::Generated(_) => None,
InternalProcessId::Client(process_id) => Some(process_id),
},
exec_request,
started_network_proxy,
tty,
stream_stdin,
stream_stdout_stderr,
output_bytes_cap,
size,
},
)
.await;
}
if matches!(exec_request.sandbox, SandboxType::WindowsRestrictedToken) {
if tty || stream_stdin || stream_stdout_stderr {
return Err(invalid_request(
@@ -304,6 +555,105 @@ impl CommandExecManager {
Ok(())
}
    /// Starts a `command/exec` process on the exec-server backend instead of
    /// spawning locally: validates remote-mode restrictions, reserves the
    /// session entry, spawns remotely, and hands event handling to a task.
    async fn start_remote(
        &self,
        remote_backend: RemoteCommandExecBackend,
        process_key: ConnectionProcessId,
        params: StartCommandExecParams,
    ) -> Result<(), JSONRPCErrorError> {
        let StartCommandExecParams {
            outgoing,
            request_id,
            process_id,
            exec_request,
            started_network_proxy,
            tty,
            stream_stdin,
            stream_stdout_stderr,
            output_bytes_cap,
            size,
        } = params;
        // Remote execution only supports unsandboxed, full-access commands.
        if exec_request.sandbox != SandboxType::None
            || !matches!(exec_request.sandbox_policy, SandboxPolicy::DangerFullAccess)
        {
            return Err(invalid_request(
                "remote command/exec does not support sandboxed execution".to_string(),
            ));
        }
        // Initial terminal sizing is likewise unsupported remotely.
        if size.is_some() {
            return Err(invalid_request(
                "remote command/exec does not support terminal sizing".to_string(),
            ));
        }
        let remote_process_id = process_key.process_id.protocol_repr();
        let ExecRequest {
            command,
            cwd,
            env,
            expiration,
            sandbox: _sandbox,
            arg0,
            ..
        } = exec_request;
        let (control_tx, control_rx) = mpsc::channel(32);
        // Reserve the process id before spawning so a concurrent start with
        // the same id fails fast instead of racing the remote spawn.
        {
            let mut sessions = self.sessions.lock().await;
            if sessions.contains_key(&process_key) {
                return Err(invalid_request(format!(
                    "duplicate active command/exec process id: {}",
                    process_key.process_id.error_repr(),
                )));
            }
            sessions.insert(
                process_key.clone(),
                CommandExecSession::Active { control_tx },
            );
        }
        // Rebase the working directory into the remote workspace, if mapped.
        let remote_cwd = remote_backend.map_path(cwd.as_path());
        if let Err(err) = remote_backend
            .client
            .exec(RemoteExecParams {
                process_id: remote_process_id.clone(),
                argv: command,
                cwd: remote_cwd,
                env,
                tty,
                arg0,
                sandbox: None,
            })
            .await
        {
            // Spawn failed: release the reserved session entry before reporting.
            self.sessions.lock().await.remove(&process_key);
            return Err(internal_error(format!("failed to spawn command: {err}")));
        }
        let notification_process_id = process_id.clone();
        let client = remote_backend.client.clone();
        let sessions = Arc::clone(&self.sessions);
        tokio::spawn(async move {
            run_remote_command(RunRemoteCommandParams {
                outgoing,
                request_id: request_id.clone(),
                process_id: notification_process_id,
                remote_process_id,
                client,
                control_rx,
                // A TTY implies interactive streaming in both directions.
                stream_stdin: tty || stream_stdin,
                stream_stdout_stderr: tty || stream_stdout_stderr,
                expiration,
                output_bytes_cap,
                started_network_proxy,
            })
            .await;
            // Drop the session entry once the remote command has finished.
            sessions.lock().await.remove(&process_key);
        });
        Ok(())
    }
pub(crate) async fn write(
&self,
request_id: ConnectionRequestId,
@@ -562,6 +912,128 @@ async fn run_command(params: RunCommandParams) {
.await;
}
/// Drives one remote exec-server process to completion: forwards control
/// requests (write/resize/terminate), enforces the expiration policy, relays
/// or buffers output events, then answers the originating request.
async fn run_remote_command(params: RunRemoteCommandParams) {
    let RunRemoteCommandParams {
        outgoing,
        request_id,
        process_id,
        remote_process_id,
        client,
        control_rx,
        stream_stdin,
        stream_stdout_stderr,
        expiration,
        output_bytes_cap,
        started_network_proxy,
    } = params;
    // Held only to keep the proxy alive for the lifetime of the command.
    let _started_network_proxy = started_network_proxy;
    let mut control_rx = control_rx;
    let mut control_open = true;
    // NOTE(review): the subscription is created after the remote spawn (in
    // start_remote); confirm the client buffers/replays events so none are
    // lost in that window.
    let mut events_rx = client.event_receiver();
    // Resolves when the command should expire: fixed timeout, default
    // timeout, or external cancellation.
    let expiration = async {
        match expiration {
            ExecExpiration::Timeout(duration) => tokio::time::sleep(duration).await,
            ExecExpiration::DefaultTimeout => {
                tokio::time::sleep(Duration::from_millis(DEFAULT_EXEC_COMMAND_TIMEOUT_MS)).await;
            }
            ExecExpiration::Cancellation(cancel) => {
                cancel.cancelled().await;
            }
        }
    };
    tokio::pin!(expiration);
    let mut timed_out = false;
    let mut stdout = Vec::new();
    let mut stderr = Vec::new();
    let mut stdout_bytes = 0usize;
    let mut stderr_bytes = 0usize;
    let exit_code = loop {
        tokio::select! {
            control = control_rx.recv(), if control_open => {
                match control {
                    Some(CommandControlRequest { control, response_tx }) => {
                        let result = match control {
                            CommandControl::Write { delta, close_stdin } => {
                                handle_remote_process_write(
                                    &client,
                                    &remote_process_id,
                                    stream_stdin,
                                    delta,
                                    close_stdin,
                                ).await
                            }
                            // Remote sessions cannot change terminal size.
                            CommandControl::Resize { .. } => Err(invalid_request(
                                "remote command/exec does not support resize".to_string(),
                            )),
                            CommandControl::Terminate => {
                                client
                                    .terminate(&remote_process_id)
                                    .await
                                    .map(|_| ())
                                    .map_err(|err| internal_error(format!("failed to terminate remote command: {err}")))
                            }
                        };
                        if let Some(response_tx) = response_tx {
                            let _ = response_tx.send(result);
                        }
                    }
                    None => {
                        // Control channel closed: stop polling it and make a
                        // best-effort attempt to stop the remote process.
                        control_open = false;
                        let _ = client.terminate(&remote_process_id).await;
                    }
                }
            }
            _ = &mut expiration, if !timed_out => {
                // Expired: terminate remotely; the Exited event still arrives
                // below and is reported as the timeout exit code.
                timed_out = true;
                let _ = client.terminate(&remote_process_id).await;
            }
            event = events_rx.recv() => {
                match event {
                    Ok(ExecServerEvent::OutputDelta(notification)) if notification.process_id == remote_process_id => {
                        handle_remote_output_chunk(
                            &outgoing,
                            request_id.connection_id,
                            process_id.as_ref(),
                            notification.stream,
                            notification.chunk.into_inner(),
                            stream_stdout_stderr,
                            output_bytes_cap,
                            &mut stdout,
                            &mut stdout_bytes,
                            &mut stderr,
                            &mut stderr_bytes,
                        ).await;
                    }
                    Ok(ExecServerEvent::Exited(notification)) if notification.process_id == remote_process_id => {
                        break if timed_out { EXEC_TIMEOUT_EXIT_CODE } else { notification.exit_code };
                    }
                    // Events for other processes on the shared stream are ignored.
                    Ok(_) => {}
                    Err(err) => {
                        outgoing
                            .send_error(
                                request_id,
                                internal_error(format!("exec-server event stream closed: {err}")),
                            )
                            .await;
                        return;
                    }
                }
            }
        }
    };
    // When streaming was enabled, output went out as notifications and the
    // buffers here are empty.
    outgoing
        .send_response(
            request_id,
            CommandExecResponse {
                exit_code,
                stdout: bytes_to_string_smart(&stdout),
                stderr: bytes_to_string_smart(&stderr),
            },
        )
        .await;
}
fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHandle<String> {
let SpawnProcessOutputParams {
connection_id,
@@ -620,6 +1092,85 @@ fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHa
})
}
/// Forwards a `command/exec/write` delta to the remote process's stdin.
///
/// Fails with an invalid-request error when stdin streaming is disabled for
/// this session or when the caller asked to close stdin (unsupported
/// remotely). An empty delta is a successful no-op.
async fn handle_remote_process_write(
    client: &ExecServerClient,
    remote_process_id: &str,
    stream_stdin: bool,
    delta: Vec<u8>,
    close_stdin: bool,
) -> Result<(), JSONRPCErrorError> {
    // Validate the request shape before touching the remote process.
    if !stream_stdin {
        return Err(invalid_request(
            "stdin streaming is not enabled for this command/exec".to_string(),
        ));
    }
    if close_stdin {
        return Err(invalid_request(
            "remote command/exec does not support closeStdin".to_string(),
        ));
    }
    if delta.is_empty() {
        return Ok(());
    }
    match client.write(remote_process_id, delta).await {
        Ok(_) => Ok(()),
        Err(err) => Err(internal_error(format!("failed to write remote stdin: {err}"))),
    }
}
/// Handles one output event from the remote process: applies the byte cap,
/// then either streams the chunk to the client as a notification or buffers
/// it for the final response.
async fn handle_remote_output_chunk(
    outgoing: &Arc<OutgoingMessageSender>,
    connection_id: ConnectionId,
    process_id: Option<&String>,
    stream: RemoteExecOutputStream,
    chunk: Vec<u8>,
    stream_output: bool,
    output_bytes_cap: Option<usize>,
    stdout: &mut Vec<u8>,
    stdout_bytes: &mut usize,
    stderr: &mut Vec<u8>,
    stderr_bytes: &mut usize,
) {
    // PTY output is folded into the stdout stream.
    let (stream, buffer, observed_num_bytes) = match stream {
        RemoteExecOutputStream::Stdout | RemoteExecOutputStream::Pty => (
            CommandExecOutputStream::Stdout,
            stdout,
            stdout_bytes,
        ),
        RemoteExecOutputStream::Stderr => (
            CommandExecOutputStream::Stderr,
            stderr,
            stderr_bytes,
        ),
    };
    // Truncate to the remaining byte budget for this stream; once the cap is
    // hit, later chunks truncate to zero length.
    let capped_chunk = match output_bytes_cap {
        Some(output_bytes_cap) => {
            let capped_chunk_len = output_bytes_cap
                .saturating_sub(*observed_num_bytes)
                .min(chunk.len());
            *observed_num_bytes += capped_chunk_len;
            &chunk[0..capped_chunk_len]
        }
        None => chunk.as_slice(),
    };
    // NOTE(review): once the cap is reached this stays true for every
    // subsequent (empty) delta, not only the final truncated chunk — confirm
    // against the documented `capReached` contract.
    let cap_reached = Some(*observed_num_bytes) == output_bytes_cap;
    if let (true, Some(process_id)) = (stream_output, process_id) {
        outgoing
            .send_server_notification_to_connections(
                &[connection_id],
                ServerNotification::CommandExecOutputDelta(CommandExecOutputDeltaNotification {
                    process_id: process_id.clone(),
                    stream,
                    delta_base64: STANDARD.encode(capped_chunk),
                    cap_reached,
                }),
            )
            .await;
    } else if !stream_output {
        // Non-streaming: accumulate for the final CommandExecResponse.
        // NOTE(review): when `stream_output` is true but there is no client
        // process id, the chunk is neither streamed nor buffered — verify
        // that dropping it is intended.
        buffer.extend_from_slice(capped_chunk);
    }
}
async fn handle_process_write(
session: &ProcessHandle,
stream_stdin: bool,

View File

@@ -7,6 +7,9 @@ use std::sync::atomic::Ordering;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::codex_message_processor::CodexMessageProcessorArgs;
use crate::command_exec::CommandExecBackend;
use crate::command_exec::CommandExecManager;
use crate::command_exec::RemoteCommandExecBackend;
use crate::config_api::ConfigApi;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::external_agent_config_api::ExternalAgentConfigApi;
@@ -62,6 +65,7 @@ use codex_core::default_client::USER_AGENT_SUFFIX;
use codex_core::default_client::get_codex_user_agent;
use codex_core::default_client::set_default_client_residency_requirement;
use codex_core::default_client::set_default_originator;
use codex_core::executor_backends_for_config;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use codex_feedback::CodexFeedback;
use codex_protocol::ThreadId;
@@ -181,7 +185,7 @@ pub(crate) struct MessageProcessorArgs {
impl MessageProcessor {
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
/// `Sender` so handlers can enqueue messages to be written to stdout.
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
pub(crate) async fn new(args: MessageProcessorArgs) -> std::io::Result<Self> {
let MessageProcessorArgs {
outgoing,
arg0_paths,
@@ -233,6 +237,29 @@ impl MessageProcessor {
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config);
let cloud_requirements = Arc::new(RwLock::new(cloud_requirements));
let executor_backends = executor_backends_for_config(config.as_ref(), None).await?;
let codex_core::ExecutorBackends {
environment,
exec_server_client,
} = executor_backends;
let local_workspace_root = config
.cwd
.clone()
.try_into()
.expect("config cwd should be absolute");
let command_exec_manager = match exec_server_client {
Some(client) => CommandExecManager::new(CommandExecBackend::ExecServer(
RemoteCommandExecBackend::new(
client,
local_workspace_root,
config
.experimental_unified_exec_exec_server_workspace_root
.clone()
.map(Into::into),
),
)),
None => CommandExecManager::default(),
};
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager: auth_manager.clone(),
thread_manager: Arc::clone(&thread_manager),
@@ -241,6 +268,7 @@ impl MessageProcessor {
config: Arc::clone(&config),
cli_overrides: cli_overrides.clone(),
cloud_requirements: cloud_requirements.clone(),
command_exec_manager,
feedback,
log_db,
});
@@ -253,9 +281,9 @@ impl MessageProcessor {
analytics_events_client,
);
let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.clone());
let fs_api = FsApi::default();
let fs_api = FsApi::new(environment.get_filesystem());
Self {
Ok(Self {
outgoing,
codex_message_processor,
config_api,
@@ -264,7 +292,7 @@ impl MessageProcessor {
auth_manager,
config,
config_warnings: Arc::new(config_warnings),
}
})
}
pub(crate) fn clear_runtime_references(&self) {

View File

@@ -18,6 +18,7 @@ use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::Duration;
use tokio::time::Instant;
@@ -440,6 +441,84 @@ async fn command_exec_streaming_does_not_buffer_output() -> Result<()> {
Ok(())
}
/// End-to-end check of the remote backend over websocket: start a streaming
/// `cat`, write stdin and observe it echoed as an output delta, then
/// terminate and confirm a non-zero exit with empty buffered output (the
/// output was streamed, not buffered).
#[tokio::test]
async fn command_exec_remote_backend_supports_streaming_write_and_terminate() -> Result<()> {
    let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
    let codex_home = TempDir::new()?;
    // `true` enables experimental_unified_exec_use_exec_server in config.toml.
    create_command_exec_config_toml(
        codex_home.path(),
        &server.uri(),
        "never",
        "danger-full-access",
        true,
    )?;
    let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
    let mut ws = connect_websocket(bind_addr).await?;
    send_initialize_request(&mut ws, 1, "remote_command_exec_client").await?;
    read_initialize_response(&mut ws, 1).await?;
    // Start `cat` with both stdin and stdout/stderr streaming enabled; the
    // request (id 2) stays pending until the process exits.
    send_request(
        &mut ws,
        "command/exec",
        2,
        Some(serde_json::to_value(CommandExecParams {
            command: vec!["sh".to_string(), "-lc".to_string(), "cat".to_string()],
            process_id: Some("remote-cat-1".to_string()),
            tty: false,
            stream_stdin: true,
            stream_stdout_stderr: true,
            output_bytes_cap: None,
            disable_output_cap: false,
            disable_timeout: false,
            timeout_ms: None,
            cwd: None,
            env: None,
            size: None,
            sandbox_policy: None,
        })?),
    )
    .await?;
    send_request(
        &mut ws,
        "command/exec/write",
        3,
        Some(serde_json::to_value(CommandExecWriteParams {
            process_id: "remote-cat-1".to_string(),
            delta_base64: Some(STANDARD.encode("remote-stdin\n")),
            close_stdin: false,
        })?),
    )
    .await?;
    let write_response = super::connection_handling_websocket::read_response_for_id(&mut ws, 3).await?;
    assert_eq!(write_response.id, RequestId::Integer(3));
    // `cat` echoes the written bytes back as a streamed output delta.
    let delta = read_command_exec_delta_ws(&mut ws).await?;
    assert_eq!(delta.process_id, "remote-cat-1");
    assert_eq!(String::from_utf8(STANDARD.decode(&delta.delta_base64)?)?, "remote-stdin\n");
    send_request(
        &mut ws,
        "command/exec/terminate",
        4,
        Some(serde_json::to_value(CommandExecTerminateParams {
            process_id: "remote-cat-1".to_string(),
        })?),
    )
    .await?;
    let terminate_response = super::connection_handling_websocket::read_response_for_id(&mut ws, 4).await?;
    assert_eq!(terminate_response.id, RequestId::Integer(4));
    // Terminated process reports a non-zero exit; buffers are empty because
    // all output was streamed.
    let response = super::connection_handling_websocket::read_response_for_id(&mut ws, 2).await?;
    let response: CommandExecResponse = to_response(response)?;
    assert_ne!(response.exit_code, 0);
    assert_eq!(response.stdout, "");
    assert_eq!(response.stderr, "");
    process.kill().await.context("failed to stop websocket app-server process")?;
    Ok(())
}
#[tokio::test]
async fn command_exec_pipe_streams_output_and_accepts_write() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
@@ -884,3 +963,32 @@ fn process_with_marker_exists(marker: &str) -> Result<bool> {
let stdout = String::from_utf8(output.stdout).context("decode ps output")?;
Ok(stdout.lines().any(|line| line.contains(marker)))
}
/// Writes a `config.toml` into `codex_home` pointing at the mock responses
/// server, with the given approval policy and sandbox mode. When
/// `use_exec_server` is set, the experimental exec-server flag is appended.
fn create_command_exec_config_toml(
    codex_home: &Path,
    server_uri: &str,
    approval_policy: &str,
    sandbox_mode: &str,
    use_exec_server: bool,
) -> std::io::Result<()> {
    let base = format!(
        r#"
model = "mock-model"
approval_policy = "{approval_policy}"
sandbox_mode = "{sandbox_mode}"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
    );
    let config = if use_exec_server {
        format!("{base}\nexperimental_unified_exec_use_exec_server = true\n")
    } else {
        base
    };
    std::fs::write(codex_home.join("config.toml"), config)
}

View File

@@ -126,6 +126,7 @@ mod tools;
pub mod turn_diff_tracker;
mod turn_metadata;
mod turn_timing;
pub use unified_exec::ExecutorBackends;
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
pub use rollout::INTERACTIVE_SESSION_SOURCES;
pub use rollout::RolloutRecorder;
@@ -138,6 +139,7 @@ pub use rollout::find_archived_thread_path_by_id_str;
pub use rollout::find_conversation_path_by_id_str;
pub use rollout::find_thread_name_by_id;
pub use rollout::find_thread_path_by_id_str;
pub use unified_exec::executor_backends_for_config;
pub use rollout::find_thread_path_by_name_str;
pub use rollout::list::Cursor;
pub use rollout::list::ThreadItem;

View File

@@ -1,3 +1,4 @@
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
@@ -25,6 +26,12 @@ pub(crate) type UnifiedExecSessionFactoryHandle = Arc<dyn UnifiedExecSessionFact
/// Backends wired into a single session: the unified-exec session factory,
/// the environment abstraction, and — when the exec-server backend is in
/// use — a client handle for remote command execution.
pub(crate) struct SessionExecutionBackends {
    pub(crate) unified_exec_session_factory: UnifiedExecSessionFactoryHandle,
    pub(crate) environment: Arc<Environment>,
    // `Some` only when an exec-server backend was selected for this config.
    pub(crate) exec_server_client: Option<ExecServerClient>,
}
/// Public subset of the session backends exposed to other crates: the
/// environment plus the optional exec-server client.
pub struct ExecutorBackends {
    pub environment: Arc<Environment>,
    // `Some` only when an exec-server backend was selected for this config.
    pub exec_server_client: Option<ExecServerClient>,
}
#[async_trait]
@@ -157,6 +164,7 @@ pub(crate) async fn session_execution_backends_for_config(
return Ok(SessionExecutionBackends {
unified_exec_session_factory: local_unified_exec_session_factory(),
environment: Arc::new(Environment::default()),
exec_server_client: None,
});
}
@@ -179,9 +187,10 @@ pub(crate) async fn session_execution_backends_for_config(
spawn_local_exec_server(command, ExecServerClientConnectOptions::default())
.await
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
return Ok(exec_server_backends_from_spawned_server(Arc::new(
spawned_server,
), path_mapper));
return Ok(exec_server_backends_from_spawned_server(
Arc::new(spawned_server),
path_mapper,
));
}
let client = ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default())
@@ -190,6 +199,29 @@ pub(crate) async fn session_execution_backends_for_config(
Ok(exec_server_backends_from_client(client, path_mapper))
}
/// Resolves the executor backends for `config` and returns only the
/// environment handle, converting any backend error into `io::Error`.
pub async fn executor_environment_for_config(
    config: &Config,
    local_exec_server_command: Option<ExecServerLaunchCommand>,
) -> io::Result<Arc<Environment>> {
    match session_execution_backends_for_config(config, local_exec_server_command).await {
        Ok(backends) => Ok(backends.environment),
        Err(err) => Err(io::Error::other(err.to_string())),
    }
}
/// Resolves the executor backends for `config` and exposes the public subset
/// (environment + optional exec-server client), converting any backend error
/// into `io::Error`.
pub async fn executor_backends_for_config(
    config: &Config,
    local_exec_server_command: Option<ExecServerLaunchCommand>,
) -> io::Result<ExecutorBackends> {
    let backends = session_execution_backends_for_config(config, local_exec_server_command)
        .await
        .map_err(|err| io::Error::other(err.to_string()))?;
    Ok(ExecutorBackends {
        environment: backends.environment,
        exec_server_client: backends.exec_server_client,
    })
}
fn default_local_exec_server_command() -> ExecServerLaunchCommand {
let binary_name = if cfg!(windows) {
"codex-exec-server.exe"
@@ -217,9 +249,10 @@ fn exec_server_backends_from_client(
path_mapper.clone(),
),
environment: Arc::new(Environment::new(Arc::new(ExecServerFileSystem::new(
client,
client.clone(),
path_mapper,
)))),
exec_server_client: Some(client),
}
}
@@ -236,6 +269,7 @@ fn exec_server_backends_from_spawned_server(
spawned_server.client().clone(),
path_mapper,
)))),
exec_server_client: Some(spawned_server.client().clone()),
}
}

View File

@@ -48,6 +48,8 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
process_manager::set_deterministic_process_ids_for_tests(enabled);
}
pub use backend::ExecutorBackends;
pub use backend::executor_backends_for_config;
pub(crate) use backend::UnifiedExecSessionFactoryHandle;
pub(crate) use backend::local_unified_exec_session_factory;
pub(crate) use backend::session_execution_backends_for_config;