mirror of
https://github.com/openai/codex.git
synced 2026-03-23 16:46:32 +03:00
Compare commits
1 Commits
starr/exec
...
starr/exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0017779b05 |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1428,6 +1428,7 @@ dependencies = [
|
||||
"codex-cloud-requirements",
|
||||
"codex-core",
|
||||
"codex-environment",
|
||||
"codex-exec-server",
|
||||
"codex-feedback",
|
||||
"codex-file-search",
|
||||
"codex-login",
|
||||
|
||||
@@ -33,6 +33,7 @@ codex-arg0 = { workspace = true }
|
||||
codex-cloud-requirements = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-environment = { workspace = true }
|
||||
codex-exec-server = { path = "../exec-server" }
|
||||
codex-otel = { workspace = true }
|
||||
codex-shell-command = { workspace = true }
|
||||
codex-utils-cli = { workspace = true }
|
||||
|
||||
@@ -722,6 +722,7 @@ Streaming stdin/stdout uses base64 so PTY sessions can carry arbitrary bytes:
|
||||
- `command/exec/outputDelta.capReached` is `true` on the final streamed chunk for a stream when `outputBytesCap` truncates that stream; later output on that stream is dropped.
|
||||
- `command/exec.params.env` overrides the server-computed environment per key; set a key to `null` to unset an inherited variable.
|
||||
- `command/exec/resize` is only supported for PTY-backed `command/exec` sessions.
|
||||
- When `experimental_unified_exec_use_exec_server = true`, `command/exec` reuses the selected executor backend. In that mode, streaming exec/write/terminate are supported remotely, while sandboxed execution, initial terminal sizing, `command/exec/resize`, and `closeStdin` remain unsupported and return explicit errors.
|
||||
|
||||
### Example: Filesystem utilities
|
||||
|
||||
|
||||
@@ -408,6 +408,7 @@ pub(crate) struct CodexMessageProcessorArgs {
|
||||
pub(crate) config: Arc<Config>,
|
||||
pub(crate) cli_overrides: Vec<(String, TomlValue)>,
|
||||
pub(crate) cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
pub(crate) command_exec_manager: CommandExecManager,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
}
|
||||
@@ -468,6 +469,7 @@ impl CodexMessageProcessor {
|
||||
config,
|
||||
cli_overrides,
|
||||
cloud_requirements,
|
||||
command_exec_manager,
|
||||
feedback,
|
||||
log_db,
|
||||
} = args;
|
||||
@@ -483,7 +485,7 @@ impl CodexMessageProcessor {
|
||||
pending_thread_unloads: Arc::new(Mutex::new(HashSet::new())),
|
||||
thread_state_manager: ThreadStateManager::new(),
|
||||
thread_watch_manager: ThreadWatchManager::new_with_outgoing(outgoing),
|
||||
command_exec_manager: CommandExecManager::default(),
|
||||
command_exec_manager,
|
||||
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
|
||||
fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())),
|
||||
background_tasks: TaskTracker::new(),
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
@@ -25,6 +27,11 @@ use codex_core::exec::ExecExpiration;
|
||||
use codex_core::exec::IO_DRAIN_TIMEOUT_MS;
|
||||
use codex_core::exec::SandboxType;
|
||||
use codex_core::sandboxing::ExecRequest;
|
||||
use codex_exec_server::ExecServerClient;
|
||||
use codex_exec_server::ExecServerEvent;
|
||||
use codex_exec_server::ExecOutputStream as RemoteExecOutputStream;
|
||||
use codex_exec_server::ExecParams as RemoteExecParams;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
|
||||
use codex_utils_pty::ProcessHandle;
|
||||
use codex_utils_pty::SpawnedProcess;
|
||||
@@ -45,13 +52,211 @@ const EXEC_TIMEOUT_EXIT_CODE: i32 = 124;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct CommandExecManager {
|
||||
backend: CommandExecBackend,
|
||||
sessions: Arc<Mutex<HashMap<ConnectionProcessId, CommandExecSession>>>,
|
||||
next_generated_process_id: Arc<AtomicI64>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod remote_backend_tests {
|
||||
use super::*;
|
||||
use crate::outgoing_message::{ConnectionId, ConnectionRequestId, OutgoingMessageSender};
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_core::exec::ExecExpiration;
|
||||
use codex_core::sandboxing::SandboxPermissions;
|
||||
use codex_exec_server::{ExecServerClient, ExecServerClientConnectOptions};
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
use codex_protocol::permissions::{FileSystemSandboxPolicy, NetworkSandboxPolicy};
|
||||
use codex_protocol::protocol::{ReadOnlyAccess, SandboxPolicy};
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
fn remote_exec_request(
|
||||
sandbox_policy: SandboxPolicy,
|
||||
sandbox: SandboxType,
|
||||
) -> ExecRequest {
|
||||
let file_system_sandbox_policy = FileSystemSandboxPolicy::from(&sandbox_policy);
|
||||
let network_sandbox_policy = NetworkSandboxPolicy::from(&sandbox_policy);
|
||||
ExecRequest {
|
||||
command: vec!["/bin/sh".to_string(), "-c".to_string(), "sleep 30".to_string()],
|
||||
cwd: PathBuf::from("."),
|
||||
env: HashMap::new(),
|
||||
network: None,
|
||||
expiration: ExecExpiration::DefaultTimeout,
|
||||
sandbox,
|
||||
windows_sandbox_level: WindowsSandboxLevel::Disabled,
|
||||
windows_sandbox_private_desktop: false,
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
sandbox_policy,
|
||||
file_system_sandbox_policy,
|
||||
network_sandbox_policy,
|
||||
justification: None,
|
||||
arg0: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn start_params(
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
request_id: ConnectionRequestId,
|
||||
process_id: &str,
|
||||
exec_request: ExecRequest,
|
||||
tty: bool,
|
||||
) -> StartCommandExecParams {
|
||||
StartCommandExecParams {
|
||||
outgoing,
|
||||
request_id,
|
||||
process_id: Some(process_id.to_string()),
|
||||
exec_request,
|
||||
started_network_proxy: None,
|
||||
tty,
|
||||
stream_stdin: false,
|
||||
stream_stdout_stderr: false,
|
||||
output_bytes_cap: Some(DEFAULT_OUTPUT_BYTES_CAP),
|
||||
size: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn remote_manager(tempdir: &TempDir) -> CommandExecManager {
|
||||
let client = ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default())
|
||||
.await
|
||||
.expect("connect in process exec server");
|
||||
CommandExecManager::new(CommandExecBackend::ExecServer(RemoteCommandExecBackend::new(
|
||||
client,
|
||||
tempdir.path().to_path_buf(),
|
||||
None,
|
||||
)))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn command_exec_remote_backend_rejects_resize() {
|
||||
let tempdir = TempDir::new().expect("tempdir");
|
||||
let manager = remote_manager(&tempdir).await;
|
||||
let (tx, _rx) = mpsc::channel(1);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(11),
|
||||
request_id: RequestId::Integer(2),
|
||||
};
|
||||
|
||||
manager
|
||||
.start(start_params(
|
||||
Arc::new(OutgoingMessageSender::new(tx)),
|
||||
request_id.clone(),
|
||||
"proc-remote-resize",
|
||||
remote_exec_request(SandboxPolicy::DangerFullAccess, SandboxType::None),
|
||||
true,
|
||||
))
|
||||
.await
|
||||
.expect("start remote process");
|
||||
|
||||
let err = manager
|
||||
.resize(
|
||||
request_id.clone(),
|
||||
CommandExecResizeParams {
|
||||
process_id: "proc-remote-resize".to_string(),
|
||||
size: CommandExecTerminalSize {
|
||||
cols: 120,
|
||||
rows: 40,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect_err("resize should fail");
|
||||
|
||||
assert_eq!(err.code, INVALID_REQUEST_ERROR_CODE);
|
||||
assert_eq!(err.message, "remote command/exec does not support resize");
|
||||
|
||||
let _ = manager
|
||||
.terminate(
|
||||
request_id,
|
||||
CommandExecTerminateParams {
|
||||
process_id: "proc-remote-resize".to_string(),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn command_exec_remote_backend_rejects_sandboxed_execution() {
|
||||
let tempdir = TempDir::new().expect("tempdir");
|
||||
let manager = remote_manager(&tempdir).await;
|
||||
let (tx, _rx) = mpsc::channel(1);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(12),
|
||||
request_id: RequestId::Integer(5),
|
||||
};
|
||||
let sandbox_policy = SandboxPolicy::ReadOnly {
|
||||
access: ReadOnlyAccess::FullAccess,
|
||||
network_access: false,
|
||||
};
|
||||
|
||||
let err = manager
|
||||
.start(start_params(
|
||||
Arc::new(OutgoingMessageSender::new(tx)),
|
||||
request_id,
|
||||
"proc-remote-sandbox",
|
||||
remote_exec_request(sandbox_policy, SandboxType::MacosSeatbelt),
|
||||
true,
|
||||
))
|
||||
.await
|
||||
.expect_err("sandboxed remote exec should fail");
|
||||
|
||||
assert_eq!(err.code, INVALID_REQUEST_ERROR_CODE);
|
||||
assert_eq!(
|
||||
err.message,
|
||||
"remote command/exec does not support sandboxed execution"
|
||||
);
|
||||
assert_eq!(err.data, None);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CommandExecManager {
|
||||
fn default() -> Self {
|
||||
Self::new(CommandExecBackend::Local)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum CommandExecBackend {
|
||||
Local,
|
||||
ExecServer(RemoteCommandExecBackend),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RemoteCommandExecBackend {
|
||||
client: ExecServerClient,
|
||||
local_workspace_root: PathBuf,
|
||||
remote_workspace_root: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl RemoteCommandExecBackend {
|
||||
pub(crate) fn new(
|
||||
client: ExecServerClient,
|
||||
local_workspace_root: PathBuf,
|
||||
remote_workspace_root: Option<PathBuf>,
|
||||
) -> Self {
|
||||
Self {
|
||||
client,
|
||||
local_workspace_root,
|
||||
remote_workspace_root,
|
||||
}
|
||||
}
|
||||
|
||||
fn map_path(&self, path: &Path) -> PathBuf {
|
||||
match &self.remote_workspace_root {
|
||||
Some(remote_workspace_root) => match path.strip_prefix(&self.local_workspace_root) {
|
||||
Ok(relative) => remote_workspace_root.join(relative),
|
||||
Err(_) => path.to_path_buf(),
|
||||
},
|
||||
None => path.to_path_buf(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CommandExecManager {
|
||||
pub(crate) fn new(backend: CommandExecBackend) -> Self {
|
||||
Self {
|
||||
backend,
|
||||
sessions: Arc::new(Mutex::new(HashMap::new())),
|
||||
next_generated_process_id: Arc::new(AtomicI64::new(1)),
|
||||
}
|
||||
@@ -119,6 +324,20 @@ struct SpawnProcessOutputParams {
|
||||
output_bytes_cap: Option<usize>,
|
||||
}
|
||||
|
||||
struct RunRemoteCommandParams {
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
request_id: ConnectionRequestId,
|
||||
process_id: Option<String>,
|
||||
remote_process_id: String,
|
||||
client: ExecServerClient,
|
||||
control_rx: mpsc::Receiver<CommandControlRequest>,
|
||||
stream_stdin: bool,
|
||||
stream_stdout_stderr: bool,
|
||||
expiration: ExecExpiration,
|
||||
output_bytes_cap: Option<usize>,
|
||||
started_network_proxy: Option<StartedNetworkProxy>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
enum InternalProcessId {
|
||||
Generated(i64),
|
||||
@@ -127,6 +346,7 @@ enum InternalProcessId {
|
||||
|
||||
trait InternalProcessIdExt {
|
||||
fn error_repr(&self) -> String;
|
||||
fn protocol_repr(&self) -> String;
|
||||
}
|
||||
|
||||
impl InternalProcessIdExt for InternalProcessId {
|
||||
@@ -136,6 +356,13 @@ impl InternalProcessIdExt for InternalProcessId {
|
||||
Self::Client(id) => serde_json::to_string(id).unwrap_or_else(|_| format!("{id:?}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn protocol_repr(&self) -> String {
|
||||
match self {
|
||||
Self::Generated(id) => id.to_string(),
|
||||
Self::Client(id) => id.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CommandExecManager {
|
||||
@@ -174,6 +401,30 @@ impl CommandExecManager {
|
||||
process_id: process_id.clone(),
|
||||
};
|
||||
|
||||
if let CommandExecBackend::ExecServer(remote_backend) = &self.backend {
|
||||
return self
|
||||
.start_remote(
|
||||
remote_backend.clone(),
|
||||
process_key,
|
||||
StartCommandExecParams {
|
||||
outgoing,
|
||||
request_id,
|
||||
process_id: match process_id {
|
||||
InternalProcessId::Generated(_) => None,
|
||||
InternalProcessId::Client(process_id) => Some(process_id),
|
||||
},
|
||||
exec_request,
|
||||
started_network_proxy,
|
||||
tty,
|
||||
stream_stdin,
|
||||
stream_stdout_stderr,
|
||||
output_bytes_cap,
|
||||
size,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
if matches!(exec_request.sandbox, SandboxType::WindowsRestrictedToken) {
|
||||
if tty || stream_stdin || stream_stdout_stderr {
|
||||
return Err(invalid_request(
|
||||
@@ -304,6 +555,105 @@ impl CommandExecManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_remote(
|
||||
&self,
|
||||
remote_backend: RemoteCommandExecBackend,
|
||||
process_key: ConnectionProcessId,
|
||||
params: StartCommandExecParams,
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
let StartCommandExecParams {
|
||||
outgoing,
|
||||
request_id,
|
||||
process_id,
|
||||
exec_request,
|
||||
started_network_proxy,
|
||||
tty,
|
||||
stream_stdin,
|
||||
stream_stdout_stderr,
|
||||
output_bytes_cap,
|
||||
size,
|
||||
} = params;
|
||||
|
||||
if exec_request.sandbox != SandboxType::None
|
||||
|| !matches!(exec_request.sandbox_policy, SandboxPolicy::DangerFullAccess)
|
||||
{
|
||||
return Err(invalid_request(
|
||||
"remote command/exec does not support sandboxed execution".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if size.is_some() {
|
||||
return Err(invalid_request(
|
||||
"remote command/exec does not support terminal sizing".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let remote_process_id = process_key.process_id.protocol_repr();
|
||||
let ExecRequest {
|
||||
command,
|
||||
cwd,
|
||||
env,
|
||||
expiration,
|
||||
sandbox: _sandbox,
|
||||
arg0,
|
||||
..
|
||||
} = exec_request;
|
||||
let (control_tx, control_rx) = mpsc::channel(32);
|
||||
{
|
||||
let mut sessions = self.sessions.lock().await;
|
||||
if sessions.contains_key(&process_key) {
|
||||
return Err(invalid_request(format!(
|
||||
"duplicate active command/exec process id: {}",
|
||||
process_key.process_id.error_repr(),
|
||||
)));
|
||||
}
|
||||
sessions.insert(
|
||||
process_key.clone(),
|
||||
CommandExecSession::Active { control_tx },
|
||||
);
|
||||
}
|
||||
|
||||
let remote_cwd = remote_backend.map_path(cwd.as_path());
|
||||
if let Err(err) = remote_backend
|
||||
.client
|
||||
.exec(RemoteExecParams {
|
||||
process_id: remote_process_id.clone(),
|
||||
argv: command,
|
||||
cwd: remote_cwd,
|
||||
env,
|
||||
tty,
|
||||
arg0,
|
||||
sandbox: None,
|
||||
})
|
||||
.await
|
||||
{
|
||||
self.sessions.lock().await.remove(&process_key);
|
||||
return Err(internal_error(format!("failed to spawn command: {err}")));
|
||||
}
|
||||
|
||||
let notification_process_id = process_id.clone();
|
||||
let client = remote_backend.client.clone();
|
||||
let sessions = Arc::clone(&self.sessions);
|
||||
tokio::spawn(async move {
|
||||
run_remote_command(RunRemoteCommandParams {
|
||||
outgoing,
|
||||
request_id: request_id.clone(),
|
||||
process_id: notification_process_id,
|
||||
remote_process_id,
|
||||
client,
|
||||
control_rx,
|
||||
stream_stdin: tty || stream_stdin,
|
||||
stream_stdout_stderr: tty || stream_stdout_stderr,
|
||||
expiration,
|
||||
output_bytes_cap,
|
||||
started_network_proxy,
|
||||
})
|
||||
.await;
|
||||
sessions.lock().await.remove(&process_key);
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn write(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
@@ -562,6 +912,128 @@ async fn run_command(params: RunCommandParams) {
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn run_remote_command(params: RunRemoteCommandParams) {
|
||||
let RunRemoteCommandParams {
|
||||
outgoing,
|
||||
request_id,
|
||||
process_id,
|
||||
remote_process_id,
|
||||
client,
|
||||
control_rx,
|
||||
stream_stdin,
|
||||
stream_stdout_stderr,
|
||||
expiration,
|
||||
output_bytes_cap,
|
||||
started_network_proxy,
|
||||
} = params;
|
||||
let _started_network_proxy = started_network_proxy;
|
||||
let mut control_rx = control_rx;
|
||||
let mut control_open = true;
|
||||
let mut events_rx = client.event_receiver();
|
||||
let expiration = async {
|
||||
match expiration {
|
||||
ExecExpiration::Timeout(duration) => tokio::time::sleep(duration).await,
|
||||
ExecExpiration::DefaultTimeout => {
|
||||
tokio::time::sleep(Duration::from_millis(DEFAULT_EXEC_COMMAND_TIMEOUT_MS)).await;
|
||||
}
|
||||
ExecExpiration::Cancellation(cancel) => {
|
||||
cancel.cancelled().await;
|
||||
}
|
||||
}
|
||||
};
|
||||
tokio::pin!(expiration);
|
||||
let mut timed_out = false;
|
||||
let mut stdout = Vec::new();
|
||||
let mut stderr = Vec::new();
|
||||
let mut stdout_bytes = 0usize;
|
||||
let mut stderr_bytes = 0usize;
|
||||
let exit_code = loop {
|
||||
tokio::select! {
|
||||
control = control_rx.recv(), if control_open => {
|
||||
match control {
|
||||
Some(CommandControlRequest { control, response_tx }) => {
|
||||
let result = match control {
|
||||
CommandControl::Write { delta, close_stdin } => {
|
||||
handle_remote_process_write(
|
||||
&client,
|
||||
&remote_process_id,
|
||||
stream_stdin,
|
||||
delta,
|
||||
close_stdin,
|
||||
).await
|
||||
}
|
||||
CommandControl::Resize { .. } => Err(invalid_request(
|
||||
"remote command/exec does not support resize".to_string(),
|
||||
)),
|
||||
CommandControl::Terminate => {
|
||||
client
|
||||
.terminate(&remote_process_id)
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|err| internal_error(format!("failed to terminate remote command: {err}")))
|
||||
}
|
||||
};
|
||||
if let Some(response_tx) = response_tx {
|
||||
let _ = response_tx.send(result);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
control_open = false;
|
||||
let _ = client.terminate(&remote_process_id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = &mut expiration, if !timed_out => {
|
||||
timed_out = true;
|
||||
let _ = client.terminate(&remote_process_id).await;
|
||||
}
|
||||
event = events_rx.recv() => {
|
||||
match event {
|
||||
Ok(ExecServerEvent::OutputDelta(notification)) if notification.process_id == remote_process_id => {
|
||||
handle_remote_output_chunk(
|
||||
&outgoing,
|
||||
request_id.connection_id,
|
||||
process_id.as_ref(),
|
||||
notification.stream,
|
||||
notification.chunk.into_inner(),
|
||||
stream_stdout_stderr,
|
||||
output_bytes_cap,
|
||||
&mut stdout,
|
||||
&mut stdout_bytes,
|
||||
&mut stderr,
|
||||
&mut stderr_bytes,
|
||||
).await;
|
||||
}
|
||||
Ok(ExecServerEvent::Exited(notification)) if notification.process_id == remote_process_id => {
|
||||
break if timed_out { EXEC_TIMEOUT_EXIT_CODE } else { notification.exit_code };
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
outgoing
|
||||
.send_error(
|
||||
request_id,
|
||||
internal_error(format!("exec-server event stream closed: {err}")),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
CommandExecResponse {
|
||||
exit_code,
|
||||
stdout: bytes_to_string_smart(&stdout),
|
||||
stderr: bytes_to_string_smart(&stderr),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHandle<String> {
|
||||
let SpawnProcessOutputParams {
|
||||
connection_id,
|
||||
@@ -620,6 +1092,85 @@ fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHa
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_remote_process_write(
|
||||
client: &ExecServerClient,
|
||||
remote_process_id: &str,
|
||||
stream_stdin: bool,
|
||||
delta: Vec<u8>,
|
||||
close_stdin: bool,
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
if !stream_stdin {
|
||||
return Err(invalid_request(
|
||||
"stdin streaming is not enabled for this command/exec".to_string(),
|
||||
));
|
||||
}
|
||||
if close_stdin {
|
||||
return Err(invalid_request(
|
||||
"remote command/exec does not support closeStdin".to_string(),
|
||||
));
|
||||
}
|
||||
if !delta.is_empty() {
|
||||
client
|
||||
.write(remote_process_id, delta)
|
||||
.await
|
||||
.map_err(|err| internal_error(format!("failed to write remote stdin: {err}")))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_remote_output_chunk(
|
||||
outgoing: &Arc<OutgoingMessageSender>,
|
||||
connection_id: ConnectionId,
|
||||
process_id: Option<&String>,
|
||||
stream: RemoteExecOutputStream,
|
||||
chunk: Vec<u8>,
|
||||
stream_output: bool,
|
||||
output_bytes_cap: Option<usize>,
|
||||
stdout: &mut Vec<u8>,
|
||||
stdout_bytes: &mut usize,
|
||||
stderr: &mut Vec<u8>,
|
||||
stderr_bytes: &mut usize,
|
||||
) {
|
||||
let (stream, buffer, observed_num_bytes) = match stream {
|
||||
RemoteExecOutputStream::Stdout | RemoteExecOutputStream::Pty => (
|
||||
CommandExecOutputStream::Stdout,
|
||||
stdout,
|
||||
stdout_bytes,
|
||||
),
|
||||
RemoteExecOutputStream::Stderr => (
|
||||
CommandExecOutputStream::Stderr,
|
||||
stderr,
|
||||
stderr_bytes,
|
||||
),
|
||||
};
|
||||
let capped_chunk = match output_bytes_cap {
|
||||
Some(output_bytes_cap) => {
|
||||
let capped_chunk_len = output_bytes_cap
|
||||
.saturating_sub(*observed_num_bytes)
|
||||
.min(chunk.len());
|
||||
*observed_num_bytes += capped_chunk_len;
|
||||
&chunk[0..capped_chunk_len]
|
||||
}
|
||||
None => chunk.as_slice(),
|
||||
};
|
||||
let cap_reached = Some(*observed_num_bytes) == output_bytes_cap;
|
||||
if let (true, Some(process_id)) = (stream_output, process_id) {
|
||||
outgoing
|
||||
.send_server_notification_to_connections(
|
||||
&[connection_id],
|
||||
ServerNotification::CommandExecOutputDelta(CommandExecOutputDeltaNotification {
|
||||
process_id: process_id.clone(),
|
||||
stream,
|
||||
delta_base64: STANDARD.encode(capped_chunk),
|
||||
cap_reached,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
} else if !stream_output {
|
||||
buffer.extend_from_slice(capped_chunk);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_process_write(
|
||||
session: &ProcessHandle,
|
||||
stream_stdin: bool,
|
||||
|
||||
@@ -7,6 +7,9 @@ use std::sync::atomic::Ordering;
|
||||
|
||||
use crate::codex_message_processor::CodexMessageProcessor;
|
||||
use crate::codex_message_processor::CodexMessageProcessorArgs;
|
||||
use crate::command_exec::CommandExecBackend;
|
||||
use crate::command_exec::CommandExecManager;
|
||||
use crate::command_exec::RemoteCommandExecBackend;
|
||||
use crate::config_api::ConfigApi;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::external_agent_config_api::ExternalAgentConfigApi;
|
||||
@@ -62,6 +65,7 @@ use codex_core::default_client::USER_AGENT_SUFFIX;
|
||||
use codex_core::default_client::get_codex_user_agent;
|
||||
use codex_core::default_client::set_default_client_residency_requirement;
|
||||
use codex_core::default_client::set_default_originator;
|
||||
use codex_core::executor_backends_for_config;
|
||||
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -181,7 +185,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
impl MessageProcessor {
|
||||
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
|
||||
/// `Sender` so handlers can enqueue messages to be written to stdout.
|
||||
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
|
||||
pub(crate) async fn new(args: MessageProcessorArgs) -> std::io::Result<Self> {
|
||||
let MessageProcessorArgs {
|
||||
outgoing,
|
||||
arg0_paths,
|
||||
@@ -233,6 +237,29 @@ impl MessageProcessor {
|
||||
.plugins_manager()
|
||||
.maybe_start_curated_repo_sync_for_config(&config);
|
||||
let cloud_requirements = Arc::new(RwLock::new(cloud_requirements));
|
||||
let executor_backends = executor_backends_for_config(config.as_ref(), None).await?;
|
||||
let codex_core::ExecutorBackends {
|
||||
environment,
|
||||
exec_server_client,
|
||||
} = executor_backends;
|
||||
let local_workspace_root = config
|
||||
.cwd
|
||||
.clone()
|
||||
.try_into()
|
||||
.expect("config cwd should be absolute");
|
||||
let command_exec_manager = match exec_server_client {
|
||||
Some(client) => CommandExecManager::new(CommandExecBackend::ExecServer(
|
||||
RemoteCommandExecBackend::new(
|
||||
client,
|
||||
local_workspace_root,
|
||||
config
|
||||
.experimental_unified_exec_exec_server_workspace_root
|
||||
.clone()
|
||||
.map(Into::into),
|
||||
),
|
||||
)),
|
||||
None => CommandExecManager::default(),
|
||||
};
|
||||
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
|
||||
auth_manager: auth_manager.clone(),
|
||||
thread_manager: Arc::clone(&thread_manager),
|
||||
@@ -241,6 +268,7 @@ impl MessageProcessor {
|
||||
config: Arc::clone(&config),
|
||||
cli_overrides: cli_overrides.clone(),
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
command_exec_manager,
|
||||
feedback,
|
||||
log_db,
|
||||
});
|
||||
@@ -253,9 +281,9 @@ impl MessageProcessor {
|
||||
analytics_events_client,
|
||||
);
|
||||
let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.clone());
|
||||
let fs_api = FsApi::default();
|
||||
let fs_api = FsApi::new(environment.get_filesystem());
|
||||
|
||||
Self {
|
||||
Ok(Self {
|
||||
outgoing,
|
||||
codex_message_processor,
|
||||
config_api,
|
||||
@@ -264,7 +292,7 @@ impl MessageProcessor {
|
||||
auth_manager,
|
||||
config,
|
||||
config_warnings: Arc::new(config_warnings),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn clear_runtime_references(&self) {
|
||||
|
||||
@@ -18,6 +18,7 @@ use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
@@ -440,6 +441,84 @@ async fn command_exec_streaming_does_not_buffer_output() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn command_exec_remote_backend_supports_streaming_write_and_terminate() -> Result<()> {
|
||||
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_command_exec_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
"danger-full-access",
|
||||
true,
|
||||
)?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
let mut ws = connect_websocket(bind_addr).await?;
|
||||
send_initialize_request(&mut ws, 1, "remote_command_exec_client").await?;
|
||||
read_initialize_response(&mut ws, 1).await?;
|
||||
|
||||
send_request(
|
||||
&mut ws,
|
||||
"command/exec",
|
||||
2,
|
||||
Some(serde_json::to_value(CommandExecParams {
|
||||
command: vec!["sh".to_string(), "-lc".to_string(), "cat".to_string()],
|
||||
process_id: Some("remote-cat-1".to_string()),
|
||||
tty: false,
|
||||
stream_stdin: true,
|
||||
stream_stdout_stderr: true,
|
||||
output_bytes_cap: None,
|
||||
disable_output_cap: false,
|
||||
disable_timeout: false,
|
||||
timeout_ms: None,
|
||||
cwd: None,
|
||||
env: None,
|
||||
size: None,
|
||||
sandbox_policy: None,
|
||||
})?),
|
||||
)
|
||||
.await?;
|
||||
|
||||
send_request(
|
||||
&mut ws,
|
||||
"command/exec/write",
|
||||
3,
|
||||
Some(serde_json::to_value(CommandExecWriteParams {
|
||||
process_id: "remote-cat-1".to_string(),
|
||||
delta_base64: Some(STANDARD.encode("remote-stdin\n")),
|
||||
close_stdin: false,
|
||||
})?),
|
||||
)
|
||||
.await?;
|
||||
let write_response = super::connection_handling_websocket::read_response_for_id(&mut ws, 3).await?;
|
||||
assert_eq!(write_response.id, RequestId::Integer(3));
|
||||
|
||||
let delta = read_command_exec_delta_ws(&mut ws).await?;
|
||||
assert_eq!(delta.process_id, "remote-cat-1");
|
||||
assert_eq!(String::from_utf8(STANDARD.decode(&delta.delta_base64)?)?, "remote-stdin\n");
|
||||
|
||||
send_request(
|
||||
&mut ws,
|
||||
"command/exec/terminate",
|
||||
4,
|
||||
Some(serde_json::to_value(CommandExecTerminateParams {
|
||||
process_id: "remote-cat-1".to_string(),
|
||||
})?),
|
||||
)
|
||||
.await?;
|
||||
let terminate_response = super::connection_handling_websocket::read_response_for_id(&mut ws, 4).await?;
|
||||
assert_eq!(terminate_response.id, RequestId::Integer(4));
|
||||
|
||||
let response = super::connection_handling_websocket::read_response_for_id(&mut ws, 2).await?;
|
||||
let response: CommandExecResponse = to_response(response)?;
|
||||
assert_ne!(response.exit_code, 0);
|
||||
assert_eq!(response.stdout, "");
|
||||
assert_eq!(response.stderr, "");
|
||||
|
||||
process.kill().await.context("failed to stop websocket app-server process")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn command_exec_pipe_streams_output_and_accepts_write() -> Result<()> {
|
||||
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
|
||||
@@ -884,3 +963,32 @@ fn process_with_marker_exists(marker: &str) -> Result<bool> {
|
||||
let stdout = String::from_utf8(output.stdout).context("decode ps output")?;
|
||||
Ok(stdout.lines().any(|line| line.contains(marker)))
|
||||
}
|
||||
|
||||
fn create_command_exec_config_toml(
|
||||
codex_home: &Path,
|
||||
server_uri: &str,
|
||||
approval_policy: &str,
|
||||
sandbox_mode: &str,
|
||||
use_exec_server: bool,
|
||||
) -> std::io::Result<()> {
|
||||
let mut config = format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "{approval_policy}"
|
||||
sandbox_mode = "{sandbox_mode}"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
);
|
||||
if use_exec_server {
|
||||
config.push_str("\nexperimental_unified_exec_use_exec_server = true\n");
|
||||
}
|
||||
std::fs::write(codex_home.join("config.toml"), config)
|
||||
}
|
||||
|
||||
@@ -126,6 +126,7 @@ mod tools;
|
||||
pub mod turn_diff_tracker;
|
||||
mod turn_metadata;
|
||||
mod turn_timing;
|
||||
pub use unified_exec::ExecutorBackends;
|
||||
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
|
||||
pub use rollout::INTERACTIVE_SESSION_SOURCES;
|
||||
pub use rollout::RolloutRecorder;
|
||||
@@ -138,6 +139,7 @@ pub use rollout::find_archived_thread_path_by_id_str;
|
||||
pub use rollout::find_conversation_path_by_id_str;
|
||||
pub use rollout::find_thread_name_by_id;
|
||||
pub use rollout::find_thread_path_by_id_str;
|
||||
pub use unified_exec::executor_backends_for_config;
|
||||
pub use rollout::find_thread_path_by_name_str;
|
||||
pub use rollout::list::Cursor;
|
||||
pub use rollout::list::ThreadItem;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -25,6 +26,12 @@ pub(crate) type UnifiedExecSessionFactoryHandle = Arc<dyn UnifiedExecSessionFact
|
||||
pub(crate) struct SessionExecutionBackends {
|
||||
pub(crate) unified_exec_session_factory: UnifiedExecSessionFactoryHandle,
|
||||
pub(crate) environment: Arc<Environment>,
|
||||
pub(crate) exec_server_client: Option<ExecServerClient>,
|
||||
}
|
||||
|
||||
pub struct ExecutorBackends {
|
||||
pub environment: Arc<Environment>,
|
||||
pub exec_server_client: Option<ExecServerClient>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -157,6 +164,7 @@ pub(crate) async fn session_execution_backends_for_config(
|
||||
return Ok(SessionExecutionBackends {
|
||||
unified_exec_session_factory: local_unified_exec_session_factory(),
|
||||
environment: Arc::new(Environment::default()),
|
||||
exec_server_client: None,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -179,9 +187,10 @@ pub(crate) async fn session_execution_backends_for_config(
|
||||
spawn_local_exec_server(command, ExecServerClientConnectOptions::default())
|
||||
.await
|
||||
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
return Ok(exec_server_backends_from_spawned_server(Arc::new(
|
||||
spawned_server,
|
||||
), path_mapper));
|
||||
return Ok(exec_server_backends_from_spawned_server(
|
||||
Arc::new(spawned_server),
|
||||
path_mapper,
|
||||
));
|
||||
}
|
||||
|
||||
let client = ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default())
|
||||
@@ -190,6 +199,29 @@ pub(crate) async fn session_execution_backends_for_config(
|
||||
Ok(exec_server_backends_from_client(client, path_mapper))
|
||||
}
|
||||
|
||||
pub async fn executor_environment_for_config(
|
||||
config: &Config,
|
||||
local_exec_server_command: Option<ExecServerLaunchCommand>,
|
||||
) -> io::Result<Arc<Environment>> {
|
||||
session_execution_backends_for_config(config, local_exec_server_command)
|
||||
.await
|
||||
.map(|backends| backends.environment)
|
||||
.map_err(|err| io::Error::other(err.to_string()))
|
||||
}
|
||||
|
||||
pub async fn executor_backends_for_config(
|
||||
config: &Config,
|
||||
local_exec_server_command: Option<ExecServerLaunchCommand>,
|
||||
) -> io::Result<ExecutorBackends> {
|
||||
session_execution_backends_for_config(config, local_exec_server_command)
|
||||
.await
|
||||
.map(|backends| ExecutorBackends {
|
||||
environment: backends.environment,
|
||||
exec_server_client: backends.exec_server_client,
|
||||
})
|
||||
.map_err(|err| io::Error::other(err.to_string()))
|
||||
}
|
||||
|
||||
fn default_local_exec_server_command() -> ExecServerLaunchCommand {
|
||||
let binary_name = if cfg!(windows) {
|
||||
"codex-exec-server.exe"
|
||||
@@ -217,9 +249,10 @@ fn exec_server_backends_from_client(
|
||||
path_mapper.clone(),
|
||||
),
|
||||
environment: Arc::new(Environment::new(Arc::new(ExecServerFileSystem::new(
|
||||
client,
|
||||
client.clone(),
|
||||
path_mapper,
|
||||
)))),
|
||||
exec_server_client: Some(client),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,6 +269,7 @@ fn exec_server_backends_from_spawned_server(
|
||||
spawned_server.client().clone(),
|
||||
path_mapper,
|
||||
)))),
|
||||
exec_server_client: Some(spawned_server.client().clone()),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,6 +48,8 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
|
||||
process_manager::set_deterministic_process_ids_for_tests(enabled);
|
||||
}
|
||||
|
||||
pub use backend::ExecutorBackends;
|
||||
pub use backend::executor_backends_for_config;
|
||||
pub(crate) use backend::UnifiedExecSessionFactoryHandle;
|
||||
pub(crate) use backend::local_unified_exec_session_factory;
|
||||
pub(crate) use backend::session_execution_backends_for_config;
|
||||
|
||||
Reference in New Issue
Block a user