Compare commits

...

3 Commits

Author SHA1 Message Date
starr-openai
e6c9f12a40 Add a high-level remote shell exec RPC
Co-authored-by: Codex <noreply@openai.com>
2026-03-20 17:40:35 -07:00
starr-openai
f5ffc0373d Cap remote shell output deltas
Co-authored-by: Codex <noreply@openai.com>
2026-03-20 17:31:43 -07:00
starr-openai
275be5a38d Route shell commands through exec environments
Co-authored-by: Codex <noreply@openai.com>
2026-03-20 16:53:19 -07:00
12 changed files with 613 additions and 26 deletions

View File

@@ -33,6 +33,12 @@ use crate::spawn::StdioPolicy;
use crate::spawn::spawn_child_async;
use crate::text_encoding::bytes_to_string_smart;
use crate::tools::sandboxing::SandboxablePreference;
use codex_exec_server::Environment as ExecutorEnvironment;
use codex_exec_server::ExecOutputStream as ExecutorOutputStream;
use codex_exec_server::ExecProcess;
use codex_exec_server::ProcessOutputChunk as ExecutorProcessOutputChunk;
use codex_exec_server::ReadParams as ExecutorReadParams;
use codex_exec_server::ShellExecParams as ExecutorShellExecParams;
use codex_network_proxy::NetworkProxy;
#[cfg(any(target_os = "windows", test))]
use codex_protocol::permissions::FileSystemSandboxKind;
@@ -40,7 +46,6 @@ use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::NetworkSandboxPolicy;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use codex_utils_pty::process_group::kill_child_process_group;
pub const DEFAULT_EXEC_COMMAND_TIMEOUT_MS: u64 = 10_000;
// Hardcode these since it does not seem worth including the libc crate just
@@ -367,6 +372,105 @@ pub(crate) async fn execute_exec_request(
finalize_exec_result(raw_output_result, sandbox, duration)
}
/// Executes `exec_request` either locally or through a remote exec-server
/// environment, depending on whether `environment` is configured with an
/// experimental exec-server URL.
///
/// With no exec-server URL this is a thin wrapper around
/// `execute_exec_request`, using the request's own sandbox policy.
pub(crate) async fn execute_exec_request_in_environment(
    exec_request: ExecRequest,
    environment: &ExecutorEnvironment,
    stdout_stream: Option<StdoutStream>,
    after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> Result<ExecToolCallOutput> {
    if environment.experimental_exec_server_url().is_none() {
        // Local path: the policy must be cloned out of the request because the
        // request itself is moved into `execute_exec_request` below.
        let effective_policy = exec_request.sandbox_policy.clone();
        return execute_exec_request(exec_request, &effective_policy, stdout_stream, after_spawn)
            .await;
    }
    execute_exec_request_via_environment(exec_request, environment, stdout_stream, after_spawn)
        .await
}
/// Runs `exec_request` remotely through the environment's single-shot
/// `shell_exec` RPC and adapts the response into the local
/// `ExecToolCallOutput` shape.
///
/// Behavior visible in this body:
/// - All sandbox-related request fields are discarded (`_` bindings);
///   presumably sandboxing is enforced on the remote exec server — TODO
///   confirm.
/// - Cancellation-backed expirations are rejected up front: the RPC has no
///   way to interrupt a command already running remotely.
/// - `after_spawn` and the stdout/stderr delta events only fire after the
///   RPC returns, i.e. once the command has already completed.
async fn execute_exec_request_via_environment(
    exec_request: ExecRequest,
    environment: &ExecutorEnvironment,
    stdout_stream: Option<StdoutStream>,
    after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> Result<ExecToolCallOutput> {
    let ExecRequest {
        command,
        cwd,
        mut env,
        network,
        expiration,
        capture_policy,
        sandbox,
        windows_sandbox_level: _,
        windows_sandbox_private_desktop: _,
        sandbox_permissions: _,
        sandbox_policy: _,
        file_system_sandbox_policy: _,
        network_sandbox_policy: _,
        justification: _,
        arg0,
    } = exec_request;
    if matches!(expiration, ExecExpiration::Cancellation(_)) {
        return Err(CodexErr::Io(io::Error::new(
            io::ErrorKind::Unsupported,
            "remote shell/exec does not yet support cancellation-backed expiration",
        )));
    }
    // Fold any network proxy settings into the command's environment.
    if let Some(network) = network.as_ref() {
        network.apply_to_env(&mut env);
    }
    let params = ExecutorShellExecParams {
        command,
        cwd,
        env,
        timeout_ms: expiration.timeout_ms(),
        output_bytes_cap: capture_policy.retained_bytes_cap(),
        arg0,
    };
    let start = Instant::now();
    let response = environment
        .shell_exec(params)
        .await
        .map_err(exec_server_error_to_codex)?;
    // The command has already finished when the RPC returns, so "after
    // spawn" here effectively means "after completion".
    if let Some(after_spawn) = after_spawn {
        after_spawn();
    }
    // Replay the captured output as at most one stdout + one stderr delta
    // event for listeners that expect streaming updates.
    if let Some(stream) = stdout_stream.as_ref() {
        let stdout = response.stdout.clone().into_inner();
        if !stdout.is_empty() {
            emit_output_delta(Some(stream), /*is_stderr*/ false, stdout).await;
        }
        let stderr = response.stderr.clone().into_inner();
        if !stderr.is_empty() {
            emit_output_delta(Some(stream), /*is_stderr*/ true, stderr).await;
        }
    }
    let raw_output_result = Ok(RawExecToolCallOutput {
        exit_status: synthetic_exit_status(response.exit_code),
        stdout: StreamOutput {
            text: response.stdout.into_inner(),
            truncated_after_lines: None,
        },
        stderr: StreamOutput {
            text: response.stderr.into_inner(),
            truncated_after_lines: None,
        },
        aggregated_output: StreamOutput {
            text: response.aggregated_output.into_inner(),
            truncated_after_lines: None,
        },
        timed_out: response.timed_out,
    });
    // Measured duration covers the whole RPC round trip, not just the
    // remote command's runtime.
    let duration = start.elapsed();
    finalize_exec_result(raw_output_result, sandbox, duration)
}
#[cfg(target_os = "windows")]
fn extract_create_process_as_user_error_code(err: &str) -> Option<String> {
let marker = "CreateProcessAsUserW failed: ";
@@ -1027,6 +1131,143 @@ async fn consume_output(
})
}
/// Polls an exec-server process until it exits, the expiration fires, or
/// Ctrl-C is received, collecting stdout/stderr (capped per the capture
/// policy) and forwarding a bounded number of streaming delta events.
async fn consume_exec_server_output(
    executor: std::sync::Arc<dyn ExecProcess>,
    process_id: &str,
    expiration: ExecExpiration,
    capture_policy: ExecCapturePolicy,
    stdout_stream: Option<StdoutStream>,
) -> Result<RawExecToolCallOutput> {
    let retained_bytes_cap = capture_policy.retained_bytes_cap();
    // Start with a modest buffer, but never reserve more than the cap.
    let mut stdout = Vec::with_capacity(
        retained_bytes_cap.map_or(AGGREGATE_BUFFER_INITIAL_CAPACITY, |max_bytes| {
            AGGREGATE_BUFFER_INITIAL_CAPACITY.min(max_bytes)
        }),
    );
    let mut stderr = Vec::with_capacity(stdout.capacity());
    let mut after_seq = None;
    let mut exit_status = None;
    let mut timed_out = false;
    let mut emitted_deltas = 0usize;
    // Only arm the expiration when the capture policy opts into it;
    // otherwise this branch never resolves.
    let expiration_wait = async {
        if capture_policy.uses_expiration() {
            expiration.wait().await;
        } else {
            std::future::pending::<()>().await;
        }
    };
    tokio::pin!(expiration_wait);
    loop {
        // Short poll (50 ms) keeps the expiration / Ctrl-C arms responsive.
        let read_future = executor.read(ExecutorReadParams {
            process_id: process_id.to_string(),
            after_seq,
            max_bytes: Some(READ_CHUNK_SIZE),
            wait_ms: Some(50),
        });
        tokio::pin!(read_future);
        let read_response = tokio::select! {
            response = &mut read_future => response.map_err(exec_server_error_to_codex)?,
            _ = &mut expiration_wait => {
                timed_out = true;
                // Best-effort termination; the result is deliberately ignored.
                let _ = executor.terminate(process_id).await;
                break;
            }
            // NOTE(review): this registers a process-wide Ctrl-C listener on
            // every call — confirm this is intended rather than session-level
            // signal handling.
            _ = tokio::signal::ctrl_c() => {
                let _ = executor.terminate(process_id).await;
                exit_status = Some(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE));
                break;
            }
        };
        // Track the last consumed sequence number (`next_seq` - 1).
        after_seq = Some(read_response.next_seq.saturating_sub(1));
        append_exec_server_chunks(
            read_response.chunks,
            &mut stdout,
            &mut stderr,
            retained_bytes_cap,
            stdout_stream.as_ref(),
            &mut emitted_deltas,
        )
        .await;
        if read_response.exited {
            exit_status = Some(synthetic_exit_status(read_response.exit_code.unwrap_or(-1)));
            // Drain any output buffered after the exit notification, using
            // non-blocking reads (wait_ms = 0).
            loop {
                let drain_response = executor
                    .read(ExecutorReadParams {
                        process_id: process_id.to_string(),
                        after_seq,
                        max_bytes: Some(READ_CHUNK_SIZE),
                        wait_ms: Some(0),
                    })
                    .await
                    .map_err(exec_server_error_to_codex)?;
                if drain_response.chunks.is_empty() {
                    break;
                }
                after_seq = Some(drain_response.next_seq.saturating_sub(1));
                append_exec_server_chunks(
                    drain_response.chunks,
                    &mut stdout,
                    &mut stderr,
                    retained_bytes_cap,
                    stdout_stream.as_ref(),
                    &mut emitted_deltas,
                )
                .await;
            }
            break;
        }
    }
    let stdout = StreamOutput {
        text: stdout,
        truncated_after_lines: None,
    };
    let stderr = StreamOutput {
        text: stderr,
        truncated_after_lines: None,
    };
    let aggregated_output = aggregate_output(&stdout, &stderr, retained_bytes_cap);
    Ok(RawExecToolCallOutput {
        // Reaching here without a recorded status means the timeout arm
        // fired; report a synthetic timeout signal exit.
        exit_status: exit_status
            .unwrap_or_else(|| synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE)),
        stdout,
        stderr,
        aggregated_output,
        timed_out,
    })
}
/// Appends exec-server output `chunks` to the retained stdout/stderr buffers
/// (bounded by `retained_bytes_cap`) and, when a stream is attached, forwards
/// up to `MAX_EXEC_OUTPUT_DELTAS_PER_CALL` chunks as streaming delta events.
async fn append_exec_server_chunks(
    chunks: Vec<ExecutorProcessOutputChunk>,
    stdout: &mut Vec<u8>,
    stderr: &mut Vec<u8>,
    retained_bytes_cap: Option<usize>,
    stdout_stream: Option<&StdoutStream>,
    emitted_deltas: &mut usize,
) {
    for chunk in chunks {
        let bytes = chunk.chunk.into_inner();
        let is_stderr = chunk.stream == ExecutorOutputStream::Stderr;
        // Only pay for the clone when a listener is attached and the
        // per-call delta budget is not yet exhausted. (Previously the bytes
        // were cloned — and the budget consumed — even with no stream.)
        if stdout_stream.is_some() && *emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL {
            emit_output_delta(stdout_stream, is_stderr, bytes.clone()).await;
            *emitted_deltas += 1;
        }
        // PTY output is folded into the stdout buffer alongside stdout.
        match chunk.stream {
            ExecutorOutputStream::Stderr => append_with_cap(stderr, &bytes, retained_bytes_cap),
            ExecutorOutputStream::Stdout | ExecutorOutputStream::Pty => {
                append_with_cap(stdout, &bytes, retained_bytes_cap)
            }
        }
    }
}
async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
mut reader: R,
stream: Option<StdoutStream>,
@@ -1050,22 +1291,7 @@ async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
if let Some(stream) = &stream
&& emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL
{
let chunk = tmp[..n].to_vec();
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: stream.call_id.clone(),
stream: if is_stderr {
ExecOutputStream::Stderr
} else {
ExecOutputStream::Stdout
},
chunk,
});
let event = Event {
id: stream.sub_id.clone(),
msg,
};
#[allow(clippy::let_unit_value)]
let _ = stream.tx_event.send(event).await;
emit_output_delta(Some(stream), is_stderr, tmp[..n].to_vec()).await;
emitted_deltas += 1;
}
@@ -1083,6 +1309,40 @@ async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
})
}
/// Emits one `ExecCommandOutputDelta` event on the session channel when a
/// stream target is attached; a send failure (receiver gone) is ignored.
async fn emit_output_delta(stream: Option<&StdoutStream>, is_stderr: bool, chunk: Vec<u8>) {
    if let Some(target) = stream {
        let direction = if is_stderr {
            ExecOutputStream::Stderr
        } else {
            ExecOutputStream::Stdout
        };
        let event = Event {
            id: target.sub_id.clone(),
            msg: EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
                call_id: target.call_id.clone(),
                stream: direction,
                chunk,
            }),
        };
        #[allow(clippy::let_unit_value)]
        let _ = target.tx_event.send(event).await;
    }
}
/// Appends `src` to `dst`; with `Some(cap)` the final length of `dst` never
/// exceeds `cap`, with `None` everything is copied.
fn append_with_cap(dst: &mut Vec<u8>, src: &[u8], max_bytes: Option<usize>) {
    match max_bytes {
        Some(cap) => append_capped(dst, src, cap),
        None => dst.extend_from_slice(src),
    }
}
fn exec_server_error_to_codex(err: codex_exec_server::ExecServerError) -> CodexErr {
CodexErr::Io(io::Error::other(err.to_string()))
}
#[cfg(unix)]
fn synthetic_exit_status(code: i32) -> ExitStatus {
use std::os::unix::process::ExitStatusExt;

View File

@@ -1,4 +1,5 @@
use super::*;
use codex_exec_server::Environment as ExecutorEnvironment;
use codex_protocol::config_types::WindowsSandboxLevel;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
@@ -199,6 +200,58 @@ async fn read_output_retains_all_bytes_for_full_buffer_capture() {
assert_eq!(out.text.len(), expected_len);
}
#[tokio::test]
async fn consume_exec_server_output_collects_split_streams() -> anyhow::Result<()> {
    // Spin up a local executor environment (no remote exec-server URL).
    let environment = ExecutorEnvironment::create(None).await?;
    let executor = environment.get_executor();
    let process_id = format!("exec-test-{}", uuid::Uuid::new_v4());
    // Platform-specific command writing "hello" to stdout and "oops" to
    // stderr, both without trailing newlines.
    #[cfg(windows)]
    let argv = vec![
        "powershell.exe".to_string(),
        "-NonInteractive".to_string(),
        "-NoLogo".to_string(),
        "-Command".to_string(),
        "[Console]::Out.Write('hello'); [Console]::Error.Write('oops')".to_string(),
    ];
    #[cfg(not(windows))]
    let argv = vec![
        "/bin/sh".to_string(),
        "-c".to_string(),
        "printf hello; printf oops >&2".to_string(),
    ];
    executor
        .start(codex_exec_server::ExecParams {
            process_id: process_id.clone(),
            argv,
            cwd: std::env::current_dir()?,
            env: HashMap::new(),
            tty: false,
            arg0: None,
        })
        .await?;
    let output = consume_exec_server_output(
        executor,
        &process_id,
        5_000.into(),
        ExecCapturePolicy::ShellTool,
        None,
    )
    .await?;
    // Streams must be captured separately, and the aggregate must be
    // stdout followed by stderr.
    assert_eq!(output.exit_status.code(), Some(0));
    assert_eq!(bytes_to_string_smart(&output.stdout.text), "hello");
    assert_eq!(bytes_to_string_smart(&output.stderr.text), "oops");
    assert_eq!(
        bytes_to_string_smart(&output.aggregated_output.text),
        "hellooops"
    );
    Ok(())
}
#[test]
fn aggregate_output_keeps_all_bytes_when_uncapped() {
let stdout = StreamOutput {

View File

@@ -15,7 +15,7 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::StreamOutput;
use crate::exec::execute_exec_request;
use crate::exec::execute_exec_request_in_environment;
use crate::exec_env::create_env;
use crate::parse_command::parse_command;
use crate::protocol::EventMsg;
@@ -187,9 +187,9 @@ pub(crate) async fn execute_user_shell_command(
tx_event: session.get_tx_event(),
});
let exec_result = execute_exec_request(
let exec_result = execute_exec_request_in_environment(
exec_env,
&sandbox_policy,
session.services.environment.as_ref(),
stdout_stream,
/*after_spawn*/ None,
)

View File

@@ -10,12 +10,12 @@ pub(crate) mod zsh_fork_backend;
use crate::command_canonicalization::canonicalize_command_for_approval;
use crate::exec::ExecToolCallOutput;
use crate::exec::execute_exec_request_in_environment;
use crate::guardian::GuardianApprovalRequest;
use crate::guardian::review_approval_request;
use crate::guardian::routes_approval_to_guardian;
use crate::powershell::prefix_powershell_script_with_utf8;
use crate::sandboxing::SandboxPermissions;
use crate::sandboxing::execute_env;
use crate::shell::ShellType;
use crate::tools::network_approval::NetworkApprovalMode;
use crate::tools::network_approval::NetworkApprovalSpec;
@@ -231,8 +231,10 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
} else {
command
};
let environment = ctx.session.services.environment.as_ref();
let remote_exec_server_enabled = environment.experimental_exec_server_url().is_some();
if self.backend == ShellRuntimeBackend::ShellCommandZshFork {
if self.backend == ShellRuntimeBackend::ShellCommandZshFork && !remote_exec_server_enabled {
match zsh_fork_backend::maybe_run_shell_command(req, attempt, ctx, &command).await? {
Some(out) => return Ok(out),
None => {
@@ -241,6 +243,10 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
);
}
}
} else if self.backend == ShellRuntimeBackend::ShellCommandZshFork {
tracing::warn!(
"ZshFork backend specified, but exec-server environments require the standard shell runtime path; falling back to normal execution",
);
}
let spec = build_command_spec(
@@ -255,9 +261,14 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
let env = attempt
.env_for(spec, req.network.as_ref())
.map_err(|err| ToolError::Codex(err.into()))?;
let out = execute_env(env, Self::stdout_stream(ctx))
.await
.map_err(ToolError::Codex)?;
let out = execute_exec_request_in_environment(
env,
environment,
Self::stdout_stream(ctx),
/*after_spawn*/ None,
)
.await
.map_err(ToolError::Codex)?;
Ok(out)
}
}

View File

@@ -50,6 +50,9 @@ use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::SHELL_EXEC_METHOD;
use crate::protocol::ShellExecParams;
use crate::protocol::ShellExecResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -202,6 +205,17 @@ impl ExecServerClient {
.map_err(Into::into)
}
/// Issues the `shell/exec` RPC: run a command to completion on the server
/// and receive its full captured output in a single response.
pub async fn shell_exec(
    &self,
    params: ShellExecParams,
) -> Result<ShellExecResponse, ExecServerError> {
    self.inner
        .client
        .call(SHELL_EXEC_METHOD, &params)
        .await
        .map_err(Into::into)
}
pub async fn write(
&self,
process_id: &str,

View File

@@ -3,6 +3,8 @@ use std::sync::Arc;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::RemoteExecServerConnectArgs;
use crate::ShellExecParams;
use crate::ShellExecResponse;
use crate::file_system::ExecutorFileSystem;
use crate::local_file_system::LocalFileSystem;
use crate::local_process::LocalProcess;
@@ -104,6 +106,16 @@ impl Environment {
Arc::new(LocalFileSystem)
}
}
/// Forwards a `shell/exec` request to the configured remote exec-server.
///
/// Returns a protocol error when no remote client is configured, so callers
/// should gate on an exec-server URL being present first.
pub async fn shell_exec(
    &self,
    params: ShellExecParams,
) -> Result<ShellExecResponse, ExecServerError> {
    let client = self.remote_exec_server_client.clone().ok_or_else(|| {
        ExecServerError::Protocol("remote exec-server client is not configured".to_string())
    })?;
    client.shell_exec(params).await
}
}
impl ExecutorEnvironment for Environment {

View File

@@ -50,6 +50,8 @@ pub use protocol::InitializeParams;
pub use protocol::InitializeResponse;
pub use protocol::ReadParams;
pub use protocol::ReadResponse;
pub use protocol::ShellExecParams;
pub use protocol::ShellExecResponse;
pub use protocol::TerminateParams;
pub use protocol::TerminateResponse;
pub use protocol::WriteParams;

View File

@@ -13,6 +13,7 @@ pub const EXEC_WRITE_METHOD: &str = "process/write";
pub const EXEC_TERMINATE_METHOD: &str = "process/terminate";
pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output";
pub const EXEC_EXITED_METHOD: &str = "process/exited";
pub const SHELL_EXEC_METHOD: &str = "shell/exec";
pub const FS_READ_FILE_METHOD: &str = "fs/readFile";
pub const FS_WRITE_FILE_METHOD: &str = "fs/writeFile";
pub const FS_CREATE_DIRECTORY_METHOD: &str = "fs/createDirectory";
@@ -140,6 +141,27 @@ pub struct ExecExitedNotification {
pub exit_code: i32,
}
/// Parameters for the `shell/exec` RPC: run `command` to completion and
/// return its captured output in a single response.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ShellExecParams {
    // Argv to execute; the server rejects an empty vector.
    pub command: Vec<String>,
    // Working directory for the command.
    pub cwd: PathBuf,
    // Environment variables for the spawned process.
    pub env: HashMap<String, String>,
    // Wall-clock timeout; the server applies its default when `None`.
    pub timeout_ms: Option<u64>,
    // Per-stream cap on retained output bytes; `None` retains everything.
    pub output_bytes_cap: Option<usize>,
    // Passed through to process spawn — presumably an argv[0] override;
    // confirm against the executor implementation.
    pub arg0: Option<String>,
}
/// Result of a `shell/exec` RPC.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ShellExecResponse {
    // Process exit code; the server reports 124 when the command timed out.
    pub exit_code: i32,
    // Captured stdout (PTY output is folded in here as well).
    pub stdout: ByteChunk,
    // Captured stderr.
    pub stderr: ByteChunk,
    // stdout followed by stderr, subject to the requested byte cap.
    pub aggregated_output: ByteChunk,
    // True when the command was terminated by the server-side timeout.
    pub timed_out: bool,
}
mod base64_bytes {
use super::BASE64_STANDARD;
use base64::Engine as _;

View File

@@ -3,6 +3,7 @@ mod handler;
mod process_handler;
mod processor;
mod registry;
mod shell_exec_handler;
mod transport;
pub(crate) use handler::ExecServerHandler;

View File

@@ -19,6 +19,8 @@ use crate::protocol::ExecResponse;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::ShellExecParams;
use crate::protocol::ShellExecResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -26,17 +28,21 @@ use crate::protocol::WriteResponse;
use crate::rpc::RpcNotificationSender;
use crate::server::file_system_handler::FileSystemHandler;
use crate::server::process_handler::ProcessHandler;
use crate::server::shell_exec_handler::ShellExecHandler;
/// Top-level RPC dispatcher state, split by concern: raw process control,
/// one-shot shell execution, and file-system operations.
#[derive(Clone)]
pub(crate) struct ExecServerHandler {
    process: ProcessHandler,
    // Built on top of the same ProcessHandler so shell-exec processes live
    // in the shared registry.
    shell_exec: ShellExecHandler,
    file_system: FileSystemHandler,
}
impl ExecServerHandler {
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
let process = ProcessHandler::new(notifications);
Self {
process: ProcessHandler::new(notifications),
shell_exec: ShellExecHandler::new(process.clone()),
process,
file_system: FileSystemHandler::default(),
}
}
@@ -78,6 +84,13 @@ impl ExecServerHandler {
self.process.terminate(params).await
}
/// Handles the `shell/exec` RPC by delegating to the shell-exec handler.
pub(crate) async fn shell_exec(
    &self,
    params: ShellExecParams,
) -> Result<ShellExecResponse, JSONRPCErrorError> {
    self.shell_exec.exec(params).await
}
pub(crate) async fn fs_read_file(
&self,
params: FsReadFileParams,

View File

@@ -16,6 +16,8 @@ use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::ReadParams;
use crate::protocol::SHELL_EXEC_METHOD;
use crate::protocol::ShellExecParams;
use crate::protocol::TerminateParams;
use crate::protocol::WriteParams;
use crate::rpc::RpcRouter;
@@ -64,6 +66,12 @@ pub(crate) fn build_router() -> RpcRouter<ExecServerHandler> {
handler.terminate(params).await
},
);
router.request(
SHELL_EXEC_METHOD,
|handler: Arc<ExecServerHandler>, params: ShellExecParams| async move {
handler.shell_exec(params).await
},
);
router.request(
FS_READ_FILE_METHOD,
|handler: Arc<ExecServerHandler>, params: FsReadFileParams| async move {

View File

@@ -0,0 +1,191 @@
use std::time::Duration;
use codex_app_server_protocol::JSONRPCErrorError;
use uuid::Uuid;
use crate::protocol::ExecOutputStream;
use crate::protocol::ExecParams;
use crate::protocol::ReadParams;
use crate::protocol::ShellExecParams;
use crate::protocol::ShellExecResponse;
use crate::protocol::TerminateParams;
use crate::rpc::invalid_params;
use crate::server::process_handler::ProcessHandler;
// Fallback timeout applied when `ShellExecParams.timeout_ms` is absent.
const DEFAULT_EXEC_TIMEOUT_MS: u64 = 10_000;
// Exit code reported on timeout (matches the coreutils `timeout` convention).
const EXEC_TIMEOUT_EXIT_CODE: i32 = 124;
// Maximum bytes requested per `process/read` poll.
const READ_CHUNK_SIZE: usize = 8192;
/// Implements the one-shot `shell/exec` RPC on top of the streaming process
/// API provided by `ProcessHandler`.
#[derive(Clone)]
pub(crate) struct ShellExecHandler {
    process: ProcessHandler,
}
impl ShellExecHandler {
    pub(crate) fn new(process: ProcessHandler) -> Self {
        Self { process }
    }

    /// Runs a command to completion and returns its captured output in one
    /// response.
    ///
    /// Starts the command through the shared `ProcessHandler` under a
    /// synthetic `shell-exec-<uuid>` process id, then polls `exec_read` until
    /// the process exits or the timeout elapses. On timeout the process is
    /// terminated (best effort) and `timed_out` is set; the reported exit
    /// code falls back to `EXEC_TIMEOUT_EXIT_CODE`.
    pub(crate) async fn exec(
        &self,
        params: ShellExecParams,
    ) -> Result<ShellExecResponse, JSONRPCErrorError> {
        self.process.require_initialized_for("shell execution")?;
        if params.command.is_empty() {
            return Err(invalid_params("command must not be empty".to_string()));
        }
        // Synthetic id keeps shell-exec processes distinct from client-named
        // process handles.
        let process_id = format!("shell-exec-{}", Uuid::new_v4());
        self.process
            .exec(ExecParams {
                process_id: process_id.clone(),
                argv: params.command,
                cwd: params.cwd,
                env: params.env,
                tty: false,
                arg0: params.arg0,
            })
            .await?;
        let retained_bytes_cap = params.output_bytes_cap;
        let timeout = Duration::from_millis(params.timeout_ms.unwrap_or(DEFAULT_EXEC_TIMEOUT_MS));
        let expiration_wait = tokio::time::sleep(timeout);
        tokio::pin!(expiration_wait);
        // Start small but never reserve above the retention cap; the buffers
        // grow on demand.
        let mut stdout = Vec::with_capacity(
            retained_bytes_cap.map_or(READ_CHUNK_SIZE, |max_bytes| READ_CHUNK_SIZE.min(max_bytes)),
        );
        let mut stderr = Vec::with_capacity(stdout.capacity());
        let mut after_seq = None;
        let mut exit_code = None;
        let mut timed_out = false;
        loop {
            // Short poll (50 ms) so the timeout arm gets a chance to fire
            // between reads.
            let read_future = self.process.exec_read(ReadParams {
                process_id: process_id.clone(),
                after_seq,
                max_bytes: Some(READ_CHUNK_SIZE),
                wait_ms: Some(50),
            });
            tokio::pin!(read_future);
            let read_response = tokio::select! {
                // NOTE(review): a read error returns via `?` without
                // terminating the process — confirm the process registry
                // reaps it.
                response = &mut read_future => response?,
                _ = &mut expiration_wait => {
                    timed_out = true;
                    // Best-effort kill; the termination result is ignored.
                    let _ = self.process.terminate(TerminateParams {
                        process_id: process_id.clone(),
                    }).await;
                    break;
                }
            };
            // Track the last consumed sequence number (`next_seq` - 1).
            after_seq = Some(read_response.next_seq.saturating_sub(1));
            append_process_output(
                read_response.chunks,
                &mut stdout,
                &mut stderr,
                retained_bytes_cap,
            );
            if read_response.exited {
                exit_code = Some(read_response.exit_code.unwrap_or(-1));
                // Drain any output buffered after the exit notification with
                // non-blocking reads (wait_ms = 0).
                loop {
                    let drain_response = self
                        .process
                        .exec_read(ReadParams {
                            process_id: process_id.clone(),
                            after_seq,
                            max_bytes: Some(READ_CHUNK_SIZE),
                            wait_ms: Some(0),
                        })
                        .await?;
                    if drain_response.chunks.is_empty() {
                        break;
                    }
                    after_seq = Some(drain_response.next_seq.saturating_sub(1));
                    append_process_output(
                        drain_response.chunks,
                        &mut stdout,
                        &mut stderr,
                        retained_bytes_cap,
                    );
                }
                break;
            }
        }
        let aggregated_output = aggregate_output(&stdout, &stderr, retained_bytes_cap);
        Ok(ShellExecResponse {
            // No observed exit code means the timeout arm broke the loop.
            exit_code: exit_code.unwrap_or(EXEC_TIMEOUT_EXIT_CODE),
            stdout: stdout.into(),
            stderr: stderr.into(),
            aggregated_output: aggregated_output.into(),
            timed_out,
        })
    }
}
/// Routes each output chunk into the stdout or stderr retention buffer,
/// honoring the optional byte cap. PTY output is folded into stdout.
fn append_process_output(
    chunks: Vec<crate::protocol::ProcessOutputChunk>,
    stdout: &mut Vec<u8>,
    stderr: &mut Vec<u8>,
    retained_bytes_cap: Option<usize>,
) {
    for chunk in chunks {
        let payload = chunk.chunk.into_inner();
        let sink = if matches!(chunk.stream, ExecOutputStream::Stderr) {
            &mut *stderr
        } else {
            // Stdout and Pty output both land in the stdout buffer.
            &mut *stdout
        };
        append_with_cap(sink, &payload, retained_bytes_cap);
    }
}
/// Copies `src` into `dst`; a `Some` cap bounds the final length of `dst`,
/// while `None` means unbounded.
fn append_with_cap(dst: &mut Vec<u8>, src: &[u8], max_bytes: Option<usize>) {
    if let Some(cap) = max_bytes {
        append_capped(dst, src, cap);
        return;
    }
    dst.extend_from_slice(src);
}
/// Appends as many bytes of `src` as fit without pushing `dst` past
/// `max_bytes`; excess bytes are silently dropped.
fn append_capped(dst: &mut Vec<u8>, src: &[u8], max_bytes: usize) {
    let room = max_bytes.saturating_sub(dst.len());
    if room == 0 {
        return;
    }
    let take = src.len().min(room);
    dst.extend_from_slice(&src[..take]);
}
/// Concatenates stdout followed by stderr, optionally capped at `max_bytes`.
///
/// When the combined output exceeds the cap, stderr is prioritized: stdout
/// is initially limited to a third of the budget, stderr claims what it
/// needs from the remainder, and any unused budget is handed back to stdout.
fn aggregate_output(stdout: &[u8], stderr: &[u8], max_bytes: Option<usize>) -> Vec<u8> {
    let total_len = stdout.len().saturating_add(stderr.len());
    let Some(cap) = max_bytes else {
        // No cap: plain concatenation.
        return [stdout, stderr].concat();
    };
    if total_len <= cap {
        return [stdout, stderr].concat();
    }
    // Over budget: reserve up to a third of the cap for stdout, let stderr
    // take the rest, then return leftover budget to stdout.
    let reserved_stdout = stdout.len().min(cap / 3);
    let stderr_take = stderr.len().min(cap.saturating_sub(reserved_stdout));
    let leftover = cap.saturating_sub(reserved_stdout + stderr_take);
    let stdout_take =
        reserved_stdout + leftover.min(stdout.len().saturating_sub(reserved_stdout));
    let mut combined = Vec::with_capacity(stdout_take + stderr_take);
    combined.extend_from_slice(&stdout[..stdout_take]);
    combined.extend_from_slice(&stderr[..stderr_take]);
    combined
}
}