mirror of
https://github.com/openai/codex.git
synced 2026-03-21 13:26:30 +03:00
Compare commits
3 Commits
main
...
starr/exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e6c9f12a40 | ||
|
|
f5ffc0373d | ||
|
|
275be5a38d |
@@ -33,6 +33,12 @@ use crate::spawn::StdioPolicy;
|
||||
use crate::spawn::spawn_child_async;
|
||||
use crate::text_encoding::bytes_to_string_smart;
|
||||
use crate::tools::sandboxing::SandboxablePreference;
|
||||
use codex_exec_server::Environment as ExecutorEnvironment;
|
||||
use codex_exec_server::ExecOutputStream as ExecutorOutputStream;
|
||||
use codex_exec_server::ExecProcess;
|
||||
use codex_exec_server::ProcessOutputChunk as ExecutorProcessOutputChunk;
|
||||
use codex_exec_server::ReadParams as ExecutorReadParams;
|
||||
use codex_exec_server::ShellExecParams as ExecutorShellExecParams;
|
||||
use codex_network_proxy::NetworkProxy;
|
||||
#[cfg(any(target_os = "windows", test))]
|
||||
use codex_protocol::permissions::FileSystemSandboxKind;
|
||||
@@ -40,7 +46,6 @@ use codex_protocol::permissions::FileSystemSandboxPolicy;
|
||||
use codex_protocol::permissions::NetworkSandboxPolicy;
|
||||
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
|
||||
use codex_utils_pty::process_group::kill_child_process_group;
|
||||
|
||||
pub const DEFAULT_EXEC_COMMAND_TIMEOUT_MS: u64 = 10_000;
|
||||
|
||||
// Hardcode these since it does not seem worth including the libc crate just
|
||||
@@ -367,6 +372,105 @@ pub(crate) async fn execute_exec_request(
|
||||
finalize_exec_result(raw_output_result, sandbox, duration)
|
||||
}
|
||||
|
||||
pub(crate) async fn execute_exec_request_in_environment(
|
||||
exec_request: ExecRequest,
|
||||
environment: &ExecutorEnvironment,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
after_spawn: Option<Box<dyn FnOnce() + Send>>,
|
||||
) -> Result<ExecToolCallOutput> {
|
||||
if environment.experimental_exec_server_url().is_none() {
|
||||
let effective_policy = exec_request.sandbox_policy.clone();
|
||||
return execute_exec_request(exec_request, &effective_policy, stdout_stream, after_spawn)
|
||||
.await;
|
||||
}
|
||||
|
||||
execute_exec_request_via_environment(exec_request, environment, stdout_stream, after_spawn)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn execute_exec_request_via_environment(
|
||||
exec_request: ExecRequest,
|
||||
environment: &ExecutorEnvironment,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
after_spawn: Option<Box<dyn FnOnce() + Send>>,
|
||||
) -> Result<ExecToolCallOutput> {
|
||||
let ExecRequest {
|
||||
command,
|
||||
cwd,
|
||||
mut env,
|
||||
network,
|
||||
expiration,
|
||||
capture_policy,
|
||||
sandbox,
|
||||
windows_sandbox_level: _,
|
||||
windows_sandbox_private_desktop: _,
|
||||
sandbox_permissions: _,
|
||||
sandbox_policy: _,
|
||||
file_system_sandbox_policy: _,
|
||||
network_sandbox_policy: _,
|
||||
justification: _,
|
||||
arg0,
|
||||
} = exec_request;
|
||||
|
||||
if matches!(expiration, ExecExpiration::Cancellation(_)) {
|
||||
return Err(CodexErr::Io(io::Error::new(
|
||||
io::ErrorKind::Unsupported,
|
||||
"remote shell/exec does not yet support cancellation-backed expiration",
|
||||
)));
|
||||
}
|
||||
|
||||
if let Some(network) = network.as_ref() {
|
||||
network.apply_to_env(&mut env);
|
||||
}
|
||||
|
||||
let params = ExecutorShellExecParams {
|
||||
command,
|
||||
cwd,
|
||||
env,
|
||||
timeout_ms: expiration.timeout_ms(),
|
||||
output_bytes_cap: capture_policy.retained_bytes_cap(),
|
||||
arg0,
|
||||
};
|
||||
|
||||
let start = Instant::now();
|
||||
let response = environment
|
||||
.shell_exec(params)
|
||||
.await
|
||||
.map_err(exec_server_error_to_codex)?;
|
||||
if let Some(after_spawn) = after_spawn {
|
||||
after_spawn();
|
||||
}
|
||||
if let Some(stream) = stdout_stream.as_ref() {
|
||||
let stdout = response.stdout.clone().into_inner();
|
||||
if !stdout.is_empty() {
|
||||
emit_output_delta(Some(stream), /*is_stderr*/ false, stdout).await;
|
||||
}
|
||||
let stderr = response.stderr.clone().into_inner();
|
||||
if !stderr.is_empty() {
|
||||
emit_output_delta(Some(stream), /*is_stderr*/ true, stderr).await;
|
||||
}
|
||||
}
|
||||
|
||||
let raw_output_result = Ok(RawExecToolCallOutput {
|
||||
exit_status: synthetic_exit_status(response.exit_code),
|
||||
stdout: StreamOutput {
|
||||
text: response.stdout.into_inner(),
|
||||
truncated_after_lines: None,
|
||||
},
|
||||
stderr: StreamOutput {
|
||||
text: response.stderr.into_inner(),
|
||||
truncated_after_lines: None,
|
||||
},
|
||||
aggregated_output: StreamOutput {
|
||||
text: response.aggregated_output.into_inner(),
|
||||
truncated_after_lines: None,
|
||||
},
|
||||
timed_out: response.timed_out,
|
||||
});
|
||||
let duration = start.elapsed();
|
||||
finalize_exec_result(raw_output_result, sandbox, duration)
|
||||
}
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
fn extract_create_process_as_user_error_code(err: &str) -> Option<String> {
|
||||
let marker = "CreateProcessAsUserW failed: ";
|
||||
@@ -1027,6 +1131,143 @@ async fn consume_output(
|
||||
})
|
||||
}
|
||||
|
||||
async fn consume_exec_server_output(
|
||||
executor: std::sync::Arc<dyn ExecProcess>,
|
||||
process_id: &str,
|
||||
expiration: ExecExpiration,
|
||||
capture_policy: ExecCapturePolicy,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
) -> Result<RawExecToolCallOutput> {
|
||||
let retained_bytes_cap = capture_policy.retained_bytes_cap();
|
||||
let mut stdout = Vec::with_capacity(
|
||||
retained_bytes_cap.map_or(AGGREGATE_BUFFER_INITIAL_CAPACITY, |max_bytes| {
|
||||
AGGREGATE_BUFFER_INITIAL_CAPACITY.min(max_bytes)
|
||||
}),
|
||||
);
|
||||
let mut stderr = Vec::with_capacity(stdout.capacity());
|
||||
let mut after_seq = None;
|
||||
let mut exit_status = None;
|
||||
let mut timed_out = false;
|
||||
let mut emitted_deltas = 0usize;
|
||||
|
||||
let expiration_wait = async {
|
||||
if capture_policy.uses_expiration() {
|
||||
expiration.wait().await;
|
||||
} else {
|
||||
std::future::pending::<()>().await;
|
||||
}
|
||||
};
|
||||
tokio::pin!(expiration_wait);
|
||||
|
||||
loop {
|
||||
let read_future = executor.read(ExecutorReadParams {
|
||||
process_id: process_id.to_string(),
|
||||
after_seq,
|
||||
max_bytes: Some(READ_CHUNK_SIZE),
|
||||
wait_ms: Some(50),
|
||||
});
|
||||
tokio::pin!(read_future);
|
||||
|
||||
let read_response = tokio::select! {
|
||||
response = &mut read_future => response.map_err(exec_server_error_to_codex)?,
|
||||
_ = &mut expiration_wait => {
|
||||
timed_out = true;
|
||||
let _ = executor.terminate(process_id).await;
|
||||
break;
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
let _ = executor.terminate(process_id).await;
|
||||
exit_status = Some(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE));
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
after_seq = Some(read_response.next_seq.saturating_sub(1));
|
||||
append_exec_server_chunks(
|
||||
read_response.chunks,
|
||||
&mut stdout,
|
||||
&mut stderr,
|
||||
retained_bytes_cap,
|
||||
stdout_stream.as_ref(),
|
||||
&mut emitted_deltas,
|
||||
)
|
||||
.await;
|
||||
|
||||
if read_response.exited {
|
||||
exit_status = Some(synthetic_exit_status(read_response.exit_code.unwrap_or(-1)));
|
||||
loop {
|
||||
let drain_response = executor
|
||||
.read(ExecutorReadParams {
|
||||
process_id: process_id.to_string(),
|
||||
after_seq,
|
||||
max_bytes: Some(READ_CHUNK_SIZE),
|
||||
wait_ms: Some(0),
|
||||
})
|
||||
.await
|
||||
.map_err(exec_server_error_to_codex)?;
|
||||
if drain_response.chunks.is_empty() {
|
||||
break;
|
||||
}
|
||||
after_seq = Some(drain_response.next_seq.saturating_sub(1));
|
||||
append_exec_server_chunks(
|
||||
drain_response.chunks,
|
||||
&mut stdout,
|
||||
&mut stderr,
|
||||
retained_bytes_cap,
|
||||
stdout_stream.as_ref(),
|
||||
&mut emitted_deltas,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let stdout = StreamOutput {
|
||||
text: stdout,
|
||||
truncated_after_lines: None,
|
||||
};
|
||||
let stderr = StreamOutput {
|
||||
text: stderr,
|
||||
truncated_after_lines: None,
|
||||
};
|
||||
let aggregated_output = aggregate_output(&stdout, &stderr, retained_bytes_cap);
|
||||
|
||||
Ok(RawExecToolCallOutput {
|
||||
exit_status: exit_status
|
||||
.unwrap_or_else(|| synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE)),
|
||||
stdout,
|
||||
stderr,
|
||||
aggregated_output,
|
||||
timed_out,
|
||||
})
|
||||
}
|
||||
|
||||
async fn append_exec_server_chunks(
|
||||
chunks: Vec<ExecutorProcessOutputChunk>,
|
||||
stdout: &mut Vec<u8>,
|
||||
stderr: &mut Vec<u8>,
|
||||
retained_bytes_cap: Option<usize>,
|
||||
stdout_stream: Option<&StdoutStream>,
|
||||
emitted_deltas: &mut usize,
|
||||
) {
|
||||
for chunk in chunks {
|
||||
let bytes = chunk.chunk.into_inner();
|
||||
let is_stderr = chunk.stream == ExecutorOutputStream::Stderr;
|
||||
if *emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL {
|
||||
emit_output_delta(stdout_stream, is_stderr, bytes.clone()).await;
|
||||
*emitted_deltas += 1;
|
||||
}
|
||||
|
||||
match chunk.stream {
|
||||
ExecutorOutputStream::Stderr => append_with_cap(stderr, &bytes, retained_bytes_cap),
|
||||
ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => {
|
||||
append_with_cap(stdout, &bytes, retained_bytes_cap)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
|
||||
mut reader: R,
|
||||
stream: Option<StdoutStream>,
|
||||
@@ -1050,22 +1291,7 @@ async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
|
||||
if let Some(stream) = &stream
|
||||
&& emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL
|
||||
{
|
||||
let chunk = tmp[..n].to_vec();
|
||||
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
|
||||
call_id: stream.call_id.clone(),
|
||||
stream: if is_stderr {
|
||||
ExecOutputStream::Stderr
|
||||
} else {
|
||||
ExecOutputStream::Stdout
|
||||
},
|
||||
chunk,
|
||||
});
|
||||
let event = Event {
|
||||
id: stream.sub_id.clone(),
|
||||
msg,
|
||||
};
|
||||
#[allow(clippy::let_unit_value)]
|
||||
let _ = stream.tx_event.send(event).await;
|
||||
emit_output_delta(Some(stream), is_stderr, tmp[..n].to_vec()).await;
|
||||
emitted_deltas += 1;
|
||||
}
|
||||
|
||||
@@ -1083,6 +1309,40 @@ async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
|
||||
})
|
||||
}
|
||||
|
||||
async fn emit_output_delta(stream: Option<&StdoutStream>, is_stderr: bool, chunk: Vec<u8>) {
|
||||
let Some(stream) = stream else {
|
||||
return;
|
||||
};
|
||||
|
||||
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
|
||||
call_id: stream.call_id.clone(),
|
||||
stream: if is_stderr {
|
||||
ExecOutputStream::Stderr
|
||||
} else {
|
||||
ExecOutputStream::Stdout
|
||||
},
|
||||
chunk,
|
||||
});
|
||||
let event = Event {
|
||||
id: stream.sub_id.clone(),
|
||||
msg,
|
||||
};
|
||||
#[allow(clippy::let_unit_value)]
|
||||
let _ = stream.tx_event.send(event).await;
|
||||
}
|
||||
|
||||
fn append_with_cap(dst: &mut Vec<u8>, src: &[u8], max_bytes: Option<usize>) {
|
||||
if let Some(max_bytes) = max_bytes {
|
||||
append_capped(dst, src, max_bytes);
|
||||
} else {
|
||||
dst.extend_from_slice(src);
|
||||
}
|
||||
}
|
||||
|
||||
fn exec_server_error_to_codex(err: codex_exec_server::ExecServerError) -> CodexErr {
|
||||
CodexErr::Io(io::Error::other(err.to_string()))
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn synthetic_exit_status(code: i32) -> ExitStatus {
|
||||
use std::os::unix::process::ExitStatusExt;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use codex_exec_server::Environment as ExecutorEnvironment;
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
@@ -199,6 +200,58 @@ async fn read_output_retains_all_bytes_for_full_buffer_capture() {
|
||||
assert_eq!(out.text.len(), expected_len);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn consume_exec_server_output_collects_split_streams() -> anyhow::Result<()> {
|
||||
let environment = ExecutorEnvironment::create(None).await?;
|
||||
let executor = environment.get_executor();
|
||||
let process_id = format!("exec-test-{}", uuid::Uuid::new_v4());
|
||||
|
||||
#[cfg(windows)]
|
||||
let argv = vec![
|
||||
"powershell.exe".to_string(),
|
||||
"-NonInteractive".to_string(),
|
||||
"-NoLogo".to_string(),
|
||||
"-Command".to_string(),
|
||||
"[Console]::Out.Write('hello'); [Console]::Error.Write('oops')".to_string(),
|
||||
];
|
||||
#[cfg(not(windows))]
|
||||
let argv = vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"printf hello; printf oops >&2".to_string(),
|
||||
];
|
||||
|
||||
executor
|
||||
.start(codex_exec_server::ExecParams {
|
||||
process_id: process_id.clone(),
|
||||
argv,
|
||||
cwd: std::env::current_dir()?,
|
||||
env: HashMap::new(),
|
||||
tty: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let output = consume_exec_server_output(
|
||||
executor,
|
||||
&process_id,
|
||||
5_000.into(),
|
||||
ExecCapturePolicy::ShellTool,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(output.exit_status.code(), Some(0));
|
||||
assert_eq!(bytes_to_string_smart(&output.stdout.text), "hello");
|
||||
assert_eq!(bytes_to_string_smart(&output.stderr.text), "oops");
|
||||
assert_eq!(
|
||||
bytes_to_string_smart(&output.aggregated_output.text),
|
||||
"hellooops"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn aggregate_output_keeps_all_bytes_when_uncapped() {
|
||||
let stdout = StreamOutput {
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::SandboxType;
|
||||
use crate::exec::StdoutStream;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::exec::execute_exec_request;
|
||||
use crate::exec::execute_exec_request_in_environment;
|
||||
use crate::exec_env::create_env;
|
||||
use crate::parse_command::parse_command;
|
||||
use crate::protocol::EventMsg;
|
||||
@@ -187,9 +187,9 @@ pub(crate) async fn execute_user_shell_command(
|
||||
tx_event: session.get_tx_event(),
|
||||
});
|
||||
|
||||
let exec_result = execute_exec_request(
|
||||
let exec_result = execute_exec_request_in_environment(
|
||||
exec_env,
|
||||
&sandbox_policy,
|
||||
session.services.environment.as_ref(),
|
||||
stdout_stream,
|
||||
/*after_spawn*/ None,
|
||||
)
|
||||
|
||||
@@ -10,12 +10,12 @@ pub(crate) mod zsh_fork_backend;
|
||||
|
||||
use crate::command_canonicalization::canonicalize_command_for_approval;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::execute_exec_request_in_environment;
|
||||
use crate::guardian::GuardianApprovalRequest;
|
||||
use crate::guardian::review_approval_request;
|
||||
use crate::guardian::routes_approval_to_guardian;
|
||||
use crate::powershell::prefix_powershell_script_with_utf8;
|
||||
use crate::sandboxing::SandboxPermissions;
|
||||
use crate::sandboxing::execute_env;
|
||||
use crate::shell::ShellType;
|
||||
use crate::tools::network_approval::NetworkApprovalMode;
|
||||
use crate::tools::network_approval::NetworkApprovalSpec;
|
||||
@@ -231,8 +231,10 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
|
||||
} else {
|
||||
command
|
||||
};
|
||||
let environment = ctx.session.services.environment.as_ref();
|
||||
let remote_exec_server_enabled = environment.experimental_exec_server_url().is_some();
|
||||
|
||||
if self.backend == ShellRuntimeBackend::ShellCommandZshFork {
|
||||
if self.backend == ShellRuntimeBackend::ShellCommandZshFork && !remote_exec_server_enabled {
|
||||
match zsh_fork_backend::maybe_run_shell_command(req, attempt, ctx, &command).await? {
|
||||
Some(out) => return Ok(out),
|
||||
None => {
|
||||
@@ -241,6 +243,10 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
|
||||
);
|
||||
}
|
||||
}
|
||||
} else if self.backend == ShellRuntimeBackend::ShellCommandZshFork {
|
||||
tracing::warn!(
|
||||
"ZshFork backend specified, but exec-server environments require the standard shell runtime path; falling back to normal execution",
|
||||
);
|
||||
}
|
||||
|
||||
let spec = build_command_spec(
|
||||
@@ -255,9 +261,14 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
|
||||
let env = attempt
|
||||
.env_for(spec, req.network.as_ref())
|
||||
.map_err(|err| ToolError::Codex(err.into()))?;
|
||||
let out = execute_env(env, Self::stdout_stream(ctx))
|
||||
.await
|
||||
.map_err(ToolError::Codex)?;
|
||||
let out = execute_exec_request_in_environment(
|
||||
env,
|
||||
environment,
|
||||
Self::stdout_stream(ctx),
|
||||
/*after_spawn*/ None,
|
||||
)
|
||||
.await
|
||||
.map_err(ToolError::Codex)?;
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,9 @@ use crate::protocol::InitializeParams;
|
||||
use crate::protocol::InitializeResponse;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::ReadResponse;
|
||||
use crate::protocol::SHELL_EXEC_METHOD;
|
||||
use crate::protocol::ShellExecParams;
|
||||
use crate::protocol::ShellExecResponse;
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteParams;
|
||||
@@ -202,6 +205,17 @@ impl ExecServerClient {
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
pub async fn shell_exec(
|
||||
&self,
|
||||
params: ShellExecParams,
|
||||
) -> Result<ShellExecResponse, ExecServerError> {
|
||||
self.inner
|
||||
.client
|
||||
.call(SHELL_EXEC_METHOD, ¶ms)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
pub async fn write(
|
||||
&self,
|
||||
process_id: &str,
|
||||
|
||||
@@ -3,6 +3,8 @@ use std::sync::Arc;
|
||||
use crate::ExecServerClient;
|
||||
use crate::ExecServerError;
|
||||
use crate::RemoteExecServerConnectArgs;
|
||||
use crate::ShellExecParams;
|
||||
use crate::ShellExecResponse;
|
||||
use crate::file_system::ExecutorFileSystem;
|
||||
use crate::local_file_system::LocalFileSystem;
|
||||
use crate::local_process::LocalProcess;
|
||||
@@ -104,6 +106,16 @@ impl Environment {
|
||||
Arc::new(LocalFileSystem)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn shell_exec(
|
||||
&self,
|
||||
params: ShellExecParams,
|
||||
) -> Result<ShellExecResponse, ExecServerError> {
|
||||
let client = self.remote_exec_server_client.clone().ok_or_else(|| {
|
||||
ExecServerError::Protocol("remote exec-server client is not configured".to_string())
|
||||
})?;
|
||||
client.shell_exec(params).await
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecutorEnvironment for Environment {
|
||||
|
||||
@@ -50,6 +50,8 @@ pub use protocol::InitializeParams;
|
||||
pub use protocol::InitializeResponse;
|
||||
pub use protocol::ReadParams;
|
||||
pub use protocol::ReadResponse;
|
||||
pub use protocol::ShellExecParams;
|
||||
pub use protocol::ShellExecResponse;
|
||||
pub use protocol::TerminateParams;
|
||||
pub use protocol::TerminateResponse;
|
||||
pub use protocol::WriteParams;
|
||||
|
||||
@@ -13,6 +13,7 @@ pub const EXEC_WRITE_METHOD: &str = "process/write";
|
||||
pub const EXEC_TERMINATE_METHOD: &str = "process/terminate";
|
||||
pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output";
|
||||
pub const EXEC_EXITED_METHOD: &str = "process/exited";
|
||||
pub const SHELL_EXEC_METHOD: &str = "shell/exec";
|
||||
pub const FS_READ_FILE_METHOD: &str = "fs/readFile";
|
||||
pub const FS_WRITE_FILE_METHOD: &str = "fs/writeFile";
|
||||
pub const FS_CREATE_DIRECTORY_METHOD: &str = "fs/createDirectory";
|
||||
@@ -140,6 +141,27 @@ pub struct ExecExitedNotification {
|
||||
pub exit_code: i32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ShellExecParams {
|
||||
pub command: Vec<String>,
|
||||
pub cwd: PathBuf,
|
||||
pub env: HashMap<String, String>,
|
||||
pub timeout_ms: Option<u64>,
|
||||
pub output_bytes_cap: Option<usize>,
|
||||
pub arg0: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ShellExecResponse {
|
||||
pub exit_code: i32,
|
||||
pub stdout: ByteChunk,
|
||||
pub stderr: ByteChunk,
|
||||
pub aggregated_output: ByteChunk,
|
||||
pub timed_out: bool,
|
||||
}
|
||||
|
||||
mod base64_bytes {
|
||||
use super::BASE64_STANDARD;
|
||||
use base64::Engine as _;
|
||||
|
||||
@@ -3,6 +3,7 @@ mod handler;
|
||||
mod process_handler;
|
||||
mod processor;
|
||||
mod registry;
|
||||
mod shell_exec_handler;
|
||||
mod transport;
|
||||
|
||||
pub(crate) use handler::ExecServerHandler;
|
||||
|
||||
@@ -19,6 +19,8 @@ use crate::protocol::ExecResponse;
|
||||
use crate::protocol::InitializeResponse;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::ReadResponse;
|
||||
use crate::protocol::ShellExecParams;
|
||||
use crate::protocol::ShellExecResponse;
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteParams;
|
||||
@@ -26,17 +28,21 @@ use crate::protocol::WriteResponse;
|
||||
use crate::rpc::RpcNotificationSender;
|
||||
use crate::server::file_system_handler::FileSystemHandler;
|
||||
use crate::server::process_handler::ProcessHandler;
|
||||
use crate::server::shell_exec_handler::ShellExecHandler;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ExecServerHandler {
|
||||
process: ProcessHandler,
|
||||
shell_exec: ShellExecHandler,
|
||||
file_system: FileSystemHandler,
|
||||
}
|
||||
|
||||
impl ExecServerHandler {
|
||||
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
|
||||
let process = ProcessHandler::new(notifications);
|
||||
Self {
|
||||
process: ProcessHandler::new(notifications),
|
||||
shell_exec: ShellExecHandler::new(process.clone()),
|
||||
process,
|
||||
file_system: FileSystemHandler::default(),
|
||||
}
|
||||
}
|
||||
@@ -78,6 +84,13 @@ impl ExecServerHandler {
|
||||
self.process.terminate(params).await
|
||||
}
|
||||
|
||||
pub(crate) async fn shell_exec(
|
||||
&self,
|
||||
params: ShellExecParams,
|
||||
) -> Result<ShellExecResponse, JSONRPCErrorError> {
|
||||
self.shell_exec.exec(params).await
|
||||
}
|
||||
|
||||
pub(crate) async fn fs_read_file(
|
||||
&self,
|
||||
params: FsReadFileParams,
|
||||
|
||||
@@ -16,6 +16,8 @@ use crate::protocol::INITIALIZE_METHOD;
|
||||
use crate::protocol::INITIALIZED_METHOD;
|
||||
use crate::protocol::InitializeParams;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::SHELL_EXEC_METHOD;
|
||||
use crate::protocol::ShellExecParams;
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::WriteParams;
|
||||
use crate::rpc::RpcRouter;
|
||||
@@ -64,6 +66,12 @@ pub(crate) fn build_router() -> RpcRouter<ExecServerHandler> {
|
||||
handler.terminate(params).await
|
||||
},
|
||||
);
|
||||
router.request(
|
||||
SHELL_EXEC_METHOD,
|
||||
|handler: Arc<ExecServerHandler>, params: ShellExecParams| async move {
|
||||
handler.shell_exec(params).await
|
||||
},
|
||||
);
|
||||
router.request(
|
||||
FS_READ_FILE_METHOD,
|
||||
|handler: Arc<ExecServerHandler>, params: FsReadFileParams| async move {
|
||||
|
||||
191
codex-rs/exec-server/src/server/shell_exec_handler.rs
Normal file
191
codex-rs/exec-server/src/server/shell_exec_handler.rs
Normal file
@@ -0,0 +1,191 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::ShellExecParams;
|
||||
use crate::protocol::ShellExecResponse;
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::rpc::invalid_params;
|
||||
use crate::server::process_handler::ProcessHandler;
|
||||
|
||||
const DEFAULT_EXEC_TIMEOUT_MS: u64 = 10_000;
|
||||
const EXEC_TIMEOUT_EXIT_CODE: i32 = 124;
|
||||
const READ_CHUNK_SIZE: usize = 8192;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ShellExecHandler {
|
||||
process: ProcessHandler,
|
||||
}
|
||||
|
||||
impl ShellExecHandler {
|
||||
pub(crate) fn new(process: ProcessHandler) -> Self {
|
||||
Self { process }
|
||||
}
|
||||
|
||||
pub(crate) async fn exec(
|
||||
&self,
|
||||
params: ShellExecParams,
|
||||
) -> Result<ShellExecResponse, JSONRPCErrorError> {
|
||||
self.process.require_initialized_for("shell execution")?;
|
||||
|
||||
if params.command.is_empty() {
|
||||
return Err(invalid_params("command must not be empty".to_string()));
|
||||
}
|
||||
|
||||
let process_id = format!("shell-exec-{}", Uuid::new_v4());
|
||||
self.process
|
||||
.exec(ExecParams {
|
||||
process_id: process_id.clone(),
|
||||
argv: params.command,
|
||||
cwd: params.cwd,
|
||||
env: params.env,
|
||||
tty: false,
|
||||
arg0: params.arg0,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let retained_bytes_cap = params.output_bytes_cap;
|
||||
let timeout = Duration::from_millis(params.timeout_ms.unwrap_or(DEFAULT_EXEC_TIMEOUT_MS));
|
||||
let expiration_wait = tokio::time::sleep(timeout);
|
||||
tokio::pin!(expiration_wait);
|
||||
|
||||
let mut stdout = Vec::with_capacity(
|
||||
retained_bytes_cap.map_or(READ_CHUNK_SIZE, |max_bytes| READ_CHUNK_SIZE.min(max_bytes)),
|
||||
);
|
||||
let mut stderr = Vec::with_capacity(stdout.capacity());
|
||||
let mut after_seq = None;
|
||||
let mut exit_code = None;
|
||||
let mut timed_out = false;
|
||||
|
||||
loop {
|
||||
let read_future = self.process.exec_read(ReadParams {
|
||||
process_id: process_id.clone(),
|
||||
after_seq,
|
||||
max_bytes: Some(READ_CHUNK_SIZE),
|
||||
wait_ms: Some(50),
|
||||
});
|
||||
tokio::pin!(read_future);
|
||||
|
||||
let read_response = tokio::select! {
|
||||
response = &mut read_future => response?,
|
||||
_ = &mut expiration_wait => {
|
||||
timed_out = true;
|
||||
let _ = self.process.terminate(TerminateParams {
|
||||
process_id: process_id.clone(),
|
||||
}).await;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
after_seq = Some(read_response.next_seq.saturating_sub(1));
|
||||
append_process_output(
|
||||
read_response.chunks,
|
||||
&mut stdout,
|
||||
&mut stderr,
|
||||
retained_bytes_cap,
|
||||
);
|
||||
|
||||
if read_response.exited {
|
||||
exit_code = Some(read_response.exit_code.unwrap_or(-1));
|
||||
loop {
|
||||
let drain_response = self
|
||||
.process
|
||||
.exec_read(ReadParams {
|
||||
process_id: process_id.clone(),
|
||||
after_seq,
|
||||
max_bytes: Some(READ_CHUNK_SIZE),
|
||||
wait_ms: Some(0),
|
||||
})
|
||||
.await?;
|
||||
if drain_response.chunks.is_empty() {
|
||||
break;
|
||||
}
|
||||
after_seq = Some(drain_response.next_seq.saturating_sub(1));
|
||||
append_process_output(
|
||||
drain_response.chunks,
|
||||
&mut stdout,
|
||||
&mut stderr,
|
||||
retained_bytes_cap,
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let aggregated_output = aggregate_output(&stdout, &stderr, retained_bytes_cap);
|
||||
Ok(ShellExecResponse {
|
||||
exit_code: exit_code.unwrap_or(EXEC_TIMEOUT_EXIT_CODE),
|
||||
stdout: stdout.into(),
|
||||
stderr: stderr.into(),
|
||||
aggregated_output: aggregated_output.into(),
|
||||
timed_out,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn append_process_output(
|
||||
chunks: Vec<crate::protocol::ProcessOutputChunk>,
|
||||
stdout: &mut Vec<u8>,
|
||||
stderr: &mut Vec<u8>,
|
||||
retained_bytes_cap: Option<usize>,
|
||||
) {
|
||||
for chunk in chunks {
|
||||
let bytes = chunk.chunk.into_inner();
|
||||
match chunk.stream {
|
||||
ExecOutputStream::Stderr => append_with_cap(stderr, &bytes, retained_bytes_cap),
|
||||
ExecOutputStream::Stdout | ExecOutputStream::Pty => {
|
||||
append_with_cap(stdout, &bytes, retained_bytes_cap)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn append_with_cap(dst: &mut Vec<u8>, src: &[u8], max_bytes: Option<usize>) {
|
||||
if let Some(max_bytes) = max_bytes {
|
||||
append_capped(dst, src, max_bytes);
|
||||
} else {
|
||||
dst.extend_from_slice(src);
|
||||
}
|
||||
}
|
||||
|
||||
fn append_capped(dst: &mut Vec<u8>, src: &[u8], max_bytes: usize) {
|
||||
if dst.len() >= max_bytes {
|
||||
return;
|
||||
}
|
||||
let remaining = max_bytes.saturating_sub(dst.len());
|
||||
let take = remaining.min(src.len());
|
||||
dst.extend_from_slice(&src[..take]);
|
||||
}
|
||||
|
||||
fn aggregate_output(stdout: &[u8], stderr: &[u8], max_bytes: Option<usize>) -> Vec<u8> {
|
||||
let Some(max_bytes) = max_bytes else {
|
||||
let total_len = stdout.len().saturating_add(stderr.len());
|
||||
let mut aggregated = Vec::with_capacity(total_len);
|
||||
aggregated.extend_from_slice(stdout);
|
||||
aggregated.extend_from_slice(stderr);
|
||||
return aggregated;
|
||||
};
|
||||
|
||||
let total_len = stdout.len().saturating_add(stderr.len());
|
||||
let mut aggregated = Vec::with_capacity(total_len.min(max_bytes));
|
||||
|
||||
if total_len <= max_bytes {
|
||||
aggregated.extend_from_slice(stdout);
|
||||
aggregated.extend_from_slice(stderr);
|
||||
return aggregated;
|
||||
}
|
||||
|
||||
let want_stdout = stdout.len().min(max_bytes / 3);
|
||||
let want_stderr = stderr.len();
|
||||
let stderr_take = want_stderr.min(max_bytes.saturating_sub(want_stdout));
|
||||
let remaining = max_bytes.saturating_sub(want_stdout + stderr_take);
|
||||
let stdout_take = want_stdout + remaining.min(stdout.len().saturating_sub(want_stdout));
|
||||
|
||||
aggregated.extend_from_slice(&stdout[..stdout_take]);
|
||||
aggregated.extend_from_slice(&stderr[..stderr_take]);
|
||||
aggregated
|
||||
}
|
||||
Reference in New Issue
Block a user