Compare commits

...

1 Commit

Author SHA1 Message Date
Ruslan Nigmatullin
58a5bd9c55 app-server: harden command/exec drain and lifecycle races
Close the remaining post-exit and control-path races by draining tail output with a quiet window plus a bounded final ready pass, and by returning consistent not-running errors after a process has already exited.

Add a dedicated helper binary and focused tests so the lifecycle behavior is reviewable on its own, and reject Windows disableTimeout requests that the buffered fallback cannot honor.
2026-03-06 17:32:47 -08:00
5 changed files with 508 additions and 57 deletions

View File

@@ -8,6 +8,10 @@ license.workspace = true
name = "codex-app-server"
path = "src/main.rs"
[[bin]]
name = "codex-app-server-command-exec-test-helper"
path = "src/bin/command_exec_test_helper.rs"
[lib]
name = "codex_app_server"
path = "src/lib.rs"

View File

@@ -648,6 +648,7 @@ Notes:
- `size` is only valid when `tty: true`. It sets the initial PTY size in character cells.
- Buffered Windows sandbox execution accepts `processId` for correlation, but `command/exec/write` and `command/exec/terminate` are still unsupported for those requests.
- Buffered Windows sandbox execution also requires the default output cap; custom `outputBytesCap` and `disableOutputCap` are unsupported there.
- Buffered Windows sandbox execution does not support `disableTimeout`.
- `tty`, `streamStdin`, and `streamStdoutStderr` are optional booleans. Legacy requests that omit them continue to use buffered execution.
- `tty: true` implies PTY mode plus `streamStdin: true` and `streamStdoutStderr: true`.
- `tty` and `streamStdin` do not disable the timeout on their own; omit `timeoutMs` to use the server default timeout, or set `disableTimeout: true` to keep the process alive until exit or explicit termination.

View File

@@ -0,0 +1,30 @@
use std::env;
use std::io::Write;
use std::process::Command;
use std::process::Stdio;
use std::thread;
use std::time::Duration;
const HOLD_STDOUT_OPEN_ARG: &str = "--hold-stdout-open";
/// Test helper binary for `command/exec` lifecycle tests.
///
/// With no arguments, it re-execs itself with `--hold-stdout-open` and
/// returns immediately without waiting on the child, deliberately leaving
/// an orphan process that still holds the inherited stdout/stderr pipes
/// after the parent has exited.
///
/// In `--hold-stdout-open` mode it prints "ta" then "il" in two flushed
/// chunks 30ms apart, then sleeps for a second before exiting, so trailing
/// output arrives after the parent's exit and the pipe stays open well
/// past it (exercising the server's post-exit drain).
fn main() -> anyhow::Result<()> {
    if env::args().nth(1).as_deref() == Some(HOLD_STDOUT_OPEN_ARG) {
        // Child mode: emit the "tail" bytes in two flushed chunks with a
        // short gap, then keep stdout open for a while before exiting.
        print!("ta");
        std::io::stdout().flush()?;
        thread::sleep(Duration::from_millis(30));
        print!("il");
        std::io::stdout().flush()?;
        thread::sleep(Duration::from_secs(1));
        return Ok(());
    }
    // Parent mode: spawn the child with inherited stdout/stderr and exit
    // without waiting — the missing `wait()` is intentional.
    let current_exe = env::current_exe()?;
    Command::new(current_exe)
        .arg(HOLD_STDOUT_OPEN_ARG)
        .stdin(Stdio::null())
        .stdout(Stdio::inherit())
        .stderr(Stdio::inherit())
        .spawn()?;
    Ok(())
}

View File

@@ -31,8 +31,10 @@ use codex_utils_pty::SpawnedProcess;
use codex_utils_pty::TerminalSize;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::time::Instant;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_PARAMS_ERROR_CODE;
@@ -42,6 +44,8 @@ use crate::outgoing_message::ConnectionRequestId;
use crate::outgoing_message::OutgoingMessageSender;
const EXEC_TIMEOUT_EXIT_CODE: i32 = 124;
const POST_EXIT_QUIET_PERIOD_MS: u64 = 50;
const POST_EXIT_READY_DRAIN_LIMIT: usize = 256;
#[derive(Clone)]
pub(crate) struct CommandExecManager {
@@ -112,7 +116,7 @@ struct SpawnProcessOutputParams {
connection_id: ConnectionId,
process_id: Option<String>,
output_rx: mpsc::Receiver<Vec<u8>>,
stdio_timeout_rx: watch::Receiver<bool>,
process_exited_rx: watch::Receiver<bool>,
outgoing: Arc<OutgoingMessageSender>,
stream: CommandExecOutputStream,
stream_output: bool,
@@ -175,6 +179,11 @@ impl CommandExecManager {
};
if matches!(exec_request.sandbox, SandboxType::WindowsRestrictedToken) {
if matches!(&exec_request.expiration, ExecExpiration::Cancellation(_)) {
return Err(invalid_request(
"disableTimeout is not supported with windows sandbox".to_string(),
));
}
if tty || stream_stdin || stream_stdout_stderr {
return Err(invalid_request(
"streaming command/exec is not supported with windows sandbox".to_string(),
@@ -470,13 +479,13 @@ async fn run_command(params: RunCommandParams) {
} = spawned;
tokio::pin!(exit_rx);
let mut timed_out = false;
let (stdio_timeout_tx, stdio_timeout_rx) = watch::channel(false);
let (process_exited_tx, process_exited_rx) = watch::channel(false);
let stdout_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id: process_id.clone(),
output_rx: stdout_rx,
stdio_timeout_rx: stdio_timeout_rx.clone(),
process_exited_rx: process_exited_rx.clone(),
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stdout,
stream_output: stream_stdout_stderr,
@@ -486,7 +495,7 @@ async fn run_command(params: RunCommandParams) {
connection_id: request_id.connection_id,
process_id,
output_rx: stderr_rx,
stdio_timeout_rx,
process_exited_rx,
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stderr,
stream_output: stream_stdout_stderr,
@@ -539,14 +548,13 @@ async fn run_command(params: RunCommandParams) {
}
};
let timeout_handle = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(IO_DRAIN_TIMEOUT_MS)).await;
let _ = stdio_timeout_tx.send(true);
});
// The child has exited, so future control RPCs should fail immediately
// instead of waiting for the post-exit output drain to finish.
drop(control_rx);
let _ = process_exited_tx.send(true);
let stdout = stdout_handle.await.unwrap_or_default();
let stderr = stderr_handle.await.unwrap_or_default();
timeout_handle.abort();
outgoing
.send_response(
@@ -561,61 +569,181 @@ async fn run_command(params: RunCommandParams) {
}
/// Spawns the output collector task for one stream (stdout or stderr) and
/// returns a handle that resolves to the collected non-streamed output.
///
/// The production post-exit quiet window (`POST_EXIT_QUIET_PERIOD_MS`) and
/// hard drain cap (`IO_DRAIN_TIMEOUT_MS`) are bound here; tests call
/// `collect_process_output` directly with shorter durations.
fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHandle<String> {
    tokio::spawn(async move {
        collect_process_output(
            params,
            Duration::from_millis(POST_EXIT_QUIET_PERIOD_MS),
            Duration::from_millis(IO_DRAIN_TIMEOUT_MS),
        )
        .await
    })
}
/// Per-stream state shared by every chunk handled in the collect loop;
/// bundled so `process_output_chunk` takes one borrowed context instead of
/// six separate arguments per chunk.
struct ProcessOutputChunkContext<'a> {
    outgoing: &'a OutgoingMessageSender,
    connection_id: ConnectionId,
    // `None` when the request carried no `processId`; output deltas are
    // only streamed when an id is present.
    process_id: Option<&'a String>,
    // Which stream (stdout/stderr) this collector is draining.
    stream: CommandExecOutputStream,
    // When true, chunks are streamed as notifications; when false they are
    // buffered for the final response.
    stream_output: bool,
    // Cumulative byte cap for forwarded/buffered output; `None` = uncapped.
    output_bytes_cap: Option<usize>,
}
async fn collect_process_output(
params: SpawnProcessOutputParams,
post_exit_quiet_period: Duration,
post_exit_hard_cap: Duration,
) -> String {
let SpawnProcessOutputParams {
connection_id,
process_id,
mut output_rx,
mut stdio_timeout_rx,
mut process_exited_rx,
outgoing,
stream,
stream_output,
output_bytes_cap,
} = params;
tokio::spawn(async move {
let mut buffer: Vec<u8> = Vec::new();
let mut observed_num_bytes = 0usize;
loop {
let chunk = tokio::select! {
chunk = output_rx.recv() => match chunk {
Some(chunk) => chunk,
None => break,
},
_ = stdio_timeout_rx.wait_for(|&v| v) => break,
};
let capped_chunk = match output_bytes_cap {
Some(output_bytes_cap) => {
let capped_chunk_len = output_bytes_cap
.saturating_sub(observed_num_bytes)
.min(chunk.len());
observed_num_bytes += capped_chunk_len;
&chunk[0..capped_chunk_len]
}
None => chunk.as_slice(),
};
let cap_reached = Some(observed_num_bytes) == output_bytes_cap;
if let (true, Some(process_id)) = (stream_output, process_id.as_ref()) {
outgoing
.send_server_notification_to_connections(
&[connection_id],
ServerNotification::CommandExecOutputDelta(
CommandExecOutputDeltaNotification {
process_id: process_id.clone(),
stream,
delta_base64: STANDARD.encode(capped_chunk),
cap_reached,
},
),
let mut buffer = Vec::new();
let mut observed_num_bytes = 0usize;
let mut post_exit_deadlines = (*process_exited_rx.borrow()).then(|| {
let now = Instant::now();
(now + post_exit_quiet_period, now + post_exit_hard_cap)
});
let chunk_context = ProcessOutputChunkContext {
outgoing: &outgoing,
connection_id,
process_id: process_id.as_ref(),
stream,
stream_output,
output_bytes_cap,
};
loop {
let mut drained_any = false;
// Once the hard cap elapses, do one last bounded `try_recv` pass so already-ready
// tail bytes win over the timeout path instead of being dropped by a select race.
let mut ready_drain_limit = post_exit_deadlines
.as_ref()
.filter(|(_, hard_deadline)| Instant::now() >= *hard_deadline)
.map(|_| POST_EXIT_READY_DRAIN_LIMIT);
while !matches!(ready_drain_limit, Some(0)) {
match output_rx.try_recv() {
Ok(chunk) => {
drained_any = true;
if process_output_chunk(
&chunk_context,
&mut observed_num_bytes,
&mut buffer,
chunk,
)
.await;
} else if !stream_output {
buffer.extend_from_slice(capped_chunk);
}
if cap_reached {
break;
.await
{
return bytes_to_string_smart(&buffer);
}
if let Some(limit) = ready_drain_limit.as_mut() {
*limit = limit.saturating_sub(1);
} else if post_exit_deadlines
.as_ref()
.is_some_and(|(_, hard_deadline)| Instant::now() >= *hard_deadline)
{
ready_drain_limit = Some(POST_EXIT_READY_DRAIN_LIMIT);
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return bytes_to_string_smart(&buffer),
}
}
bytes_to_string_smart(&buffer)
})
// Process exit starts the post-exit quiet window, but we still wait a bit longer for
// trailing bytes until either the stream goes quiet or the hard cap expires.
if post_exit_deadlines.is_none() && *process_exited_rx.borrow() {
let now = Instant::now();
post_exit_deadlines = Some((now + post_exit_quiet_period, now + post_exit_hard_cap));
}
if drained_any {
if let Some((quiet_deadline, _)) = post_exit_deadlines.as_mut() {
*quiet_deadline = Instant::now() + post_exit_quiet_period;
}
continue;
}
let next_deadline = post_exit_deadlines
.as_ref()
.map(|(quiet_deadline, hard_deadline)| (*quiet_deadline).min(*hard_deadline));
if next_deadline.is_some_and(|deadline| Instant::now() >= deadline) {
break;
}
tokio::select! {
chunk = output_rx.recv() => match chunk {
Some(chunk) => {
if process_output_chunk(
&chunk_context,
&mut observed_num_bytes,
&mut buffer,
chunk,
)
.await {
break;
}
if let Some((quiet_deadline, _)) = post_exit_deadlines.as_mut() {
*quiet_deadline = Instant::now() + post_exit_quiet_period;
}
}
None => break,
},
// Before exit we wait for the exit signal; after exit we swap that branch for the
// next post-exit deadline so the same select handles both phases.
_ = tokio::time::sleep_until(next_deadline.unwrap_or_else(Instant::now)),
if next_deadline.is_some() => {}
changed = process_exited_rx.changed(), if next_deadline.is_none() => {
if changed.is_err() || *process_exited_rx.borrow() {
let now = Instant::now();
post_exit_deadlines = Some((now + post_exit_quiet_period, now + post_exit_hard_cap));
}
}
}
}
bytes_to_string_smart(&buffer)
}
/// Applies the cumulative byte cap to one output chunk, then either streams
/// it as a `CommandExecOutputDelta` notification (when streaming with a
/// process id) or appends it to `buffer` for the final response.
///
/// Returns `true` once `observed_num_bytes` has reached the cap, signalling
/// the caller to stop collecting further output.
async fn process_output_chunk(
    context: &ProcessOutputChunkContext<'_>,
    observed_num_bytes: &mut usize,
    buffer: &mut Vec<u8>,
    chunk: Vec<u8>,
) -> bool {
    let capped_chunk = match context.output_bytes_cap {
        Some(output_bytes_cap) => {
            // Truncate the chunk so the running total never exceeds the cap.
            let capped_chunk_len = output_bytes_cap
                .saturating_sub(*observed_num_bytes)
                .min(chunk.len());
            *observed_num_bytes += capped_chunk_len;
            &chunk[0..capped_chunk_len]
        }
        None => chunk.as_slice(),
    };
    let cap_reached = Some(*observed_num_bytes) == context.output_bytes_cap;
    if let (true, Some(process_id)) = (context.stream_output, context.process_id) {
        context
            .outgoing
            .send_server_notification_to_connections(
                &[context.connection_id],
                ServerNotification::CommandExecOutputDelta(CommandExecOutputDeltaNotification {
                    process_id: process_id.clone(),
                    stream: context.stream,
                    delta_base64: STANDARD.encode(capped_chunk),
                    cap_reached,
                }),
            )
            .await;
    } else if !context.stream_output {
        buffer.extend_from_slice(capped_chunk);
    }
    // NOTE(review): when `stream_output` is true but `process_id` is `None`,
    // the chunk is neither streamed nor buffered and is dropped — confirm
    // this is intentional for id-less streaming requests.
    cap_reached
}
async fn handle_process_write(
@@ -704,20 +832,21 @@ mod tests {
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::protocol::ReadOnlyAccess;
use codex_protocol::protocol::SandboxPolicy;
use codex_utils_cargo_bin::cargo_bin;
use pretty_assertions::assert_eq;
#[cfg(not(target_os = "windows"))]
use tokio::time::Duration;
#[cfg(not(target_os = "windows"))]
use tokio::time::timeout;
#[cfg(not(target_os = "windows"))]
use tokio_util::sync::CancellationToken;
use super::*;
#[cfg(not(target_os = "windows"))]
use crate::outgoing_message::OutgoingEnvelope;
#[cfg(not(target_os = "windows"))]
use crate::outgoing_message::OutgoingMessage;
fn test_outgoing() -> Arc<OutgoingMessageSender> {
let (tx, _rx) = mpsc::channel(4);
Arc::new(OutgoingMessageSender::new(tx))
}
fn windows_sandbox_exec_request() -> ExecRequest {
ExecRequest {
command: vec!["cmd".to_string()],
@@ -767,6 +896,39 @@ mod tests {
);
}
#[tokio::test]
async fn windows_sandbox_disable_timeout_is_rejected() {
    // `ExecExpiration::Cancellation` is the `disableTimeout: true` path;
    // the buffered Windows sandbox fallback cannot honor it, so `start`
    // must reject the combination up front with an invalid-request error.
    let (tx, _rx) = mpsc::channel(1);
    let manager = CommandExecManager::default();
    let err = manager
        .start(StartCommandExecParams {
            outgoing: Arc::new(OutgoingMessageSender::new(tx)),
            request_id: ConnectionRequestId {
                connection_id: ConnectionId(14),
                request_id: codex_app_server_protocol::RequestId::Integer(43),
            },
            process_id: Some("proc-43".to_string()),
            exec_request: ExecRequest {
                expiration: ExecExpiration::Cancellation(CancellationToken::new()),
                ..windows_sandbox_exec_request()
            },
            started_network_proxy: None,
            tty: false,
            stream_stdin: false,
            stream_stdout_stderr: false,
            output_bytes_cap: Some(DEFAULT_OUTPUT_BYTES_CAP),
            size: None,
        })
        .await
        .expect_err("disableTimeout windows sandbox exec should be rejected");
    assert_eq!(err.code, INVALID_REQUEST_ERROR_CODE);
    assert_eq!(
        err.message,
        "disableTimeout is not supported with windows sandbox"
    );
}
#[cfg(not(target_os = "windows"))]
#[tokio::test]
async fn windows_sandbox_non_streaming_exec_uses_execution_path() {
@@ -1001,4 +1163,207 @@ mod tests {
assert_eq!(err.code, INVALID_REQUEST_ERROR_CODE);
assert_eq!(err.message, "command/exec \"proc-13\" is no longer running");
}
#[tokio::test]
async fn process_output_drains_ready_bytes_even_after_quiet_deadline_elapsed() {
    // Queue a chunk before the collector starts and mark the process as
    // already exited, so both post-exit deadlines are effectively elapsed
    // from the first loop iteration.
    let (output_tx, output_rx) = mpsc::channel(4);
    output_tx
        .send(b"tail".to_vec())
        .await
        .expect("queue ready output");
    let (process_exited_tx, process_exited_rx) = watch::channel(true);
    // Even with a zero quiet window and a 1ms hard cap, already-ready bytes
    // must be drained rather than lost to a select race with the timeout.
    let output = collect_process_output(
        SpawnProcessOutputParams {
            connection_id: ConnectionId(21),
            process_id: None,
            output_rx,
            process_exited_rx,
            outgoing: test_outgoing(),
            stream: CommandExecOutputStream::Stdout,
            stream_output: false,
            output_bytes_cap: None,
        },
        Duration::ZERO,
        Duration::from_millis(1),
    )
    .await;
    // Keep the watch sender alive until collection has finished.
    drop(process_exited_tx);
    assert_eq!(output, "tail");
}
#[tokio::test]
async fn process_output_resets_quiet_window_when_more_tail_bytes_arrive() {
    let (output_tx, output_rx) = mpsc::channel(4);
    let (process_exited_tx, process_exited_rx) = watch::channel(false);
    // After signalling exit, send two trailing chunks 10ms apart. Each gap
    // is shorter than the 15ms quiet window, so the second chunk is only
    // collected if the first one reset the window.
    let send_task = tokio::spawn(async move {
        process_exited_tx
            .send(true)
            .expect("signal process exit to collector");
        tokio::time::sleep(Duration::from_millis(10)).await;
        output_tx
            .send(b"one".to_vec())
            .await
            .expect("send first trailing chunk");
        tokio::time::sleep(Duration::from_millis(10)).await;
        output_tx
            .send(b"two".to_vec())
            .await
            .expect("send second trailing chunk");
    });
    let output = collect_process_output(
        SpawnProcessOutputParams {
            connection_id: ConnectionId(22),
            process_id: None,
            output_rx,
            process_exited_rx,
            outgoing: test_outgoing(),
            stream: CommandExecOutputStream::Stdout,
            stream_output: false,
            output_bytes_cap: None,
        },
        Duration::from_millis(15),
        Duration::from_millis(100),
    )
    .await;
    send_task.await.expect("sender task should complete");
    // Both trailing chunks made it into the drained output.
    assert_eq!(output, "onetwo");
}
#[tokio::test]
async fn process_output_stops_after_hard_cap_when_stream_stays_open() {
    // Keep the sender alive (but silent) so the output channel never
    // disconnects; only the 20ms hard cap can stop the collector.
    let (_output_tx, output_rx) = mpsc::channel(4);
    let (process_exited_tx, process_exited_rx) = watch::channel(true);
    // The outer 100ms timeout fails the test if the collector waits forever.
    let output = timeout(
        Duration::from_millis(100),
        collect_process_output(
            SpawnProcessOutputParams {
                connection_id: ConnectionId(23),
                process_id: None,
                output_rx,
                process_exited_rx,
                outgoing: test_outgoing(),
                stream: CommandExecOutputStream::Stdout,
                stream_output: false,
                output_bytes_cap: None,
            },
            Duration::from_millis(10),
            Duration::from_millis(20),
        ),
    )
    .await
    .expect("collector should stop without waiting forever");
    drop(process_exited_tx);
    assert_eq!(output, "");
}
#[tokio::test]
async fn run_command_closes_control_channel_before_sending_drained_response() {
    // Events observed from the run, in arrival order.
    enum ObservedEvent {
        ControlClosed,
        Response(OutgoingEnvelope),
    }
    // The helper's parent exits immediately while an orphan child keeps
    // stdout open and prints "tail", so the post-exit drain outlives exit.
    let helper = cargo_bin("codex-app-server-command-exec-test-helper")
        .expect("should find command_exec test helper");
    let spawned = codex_utils_pty::spawn_pipe_process_no_stdin(
        &helper.to_string_lossy(),
        &Vec::new(),
        PathBuf::from(".").as_path(),
        &HashMap::new(),
        &None,
    )
    .await
    .expect("helper process should spawn");
    let (control_tx, control_rx) = mpsc::channel(4);
    let (tx, mut rx) = mpsc::channel(4);
    let request_id = ConnectionRequestId {
        connection_id: ConnectionId(24),
        request_id: codex_app_server_protocol::RequestId::Integer(4),
    };
    let run_task = tokio::spawn(run_command(RunCommandParams {
        outgoing: Arc::new(OutgoingMessageSender::new(tx)),
        request_id: request_id.clone(),
        process_id: Some("proc-24".to_string()),
        spawned,
        control_rx,
        stream_stdin: false,
        stream_stdout_stderr: false,
        expiration: ExecExpiration::DefaultTimeout,
        output_bytes_cap: None,
    }));
    // Observe both the control channel closing and the final response so
    // their relative order can be asserted.
    let (event_tx, mut event_rx) = mpsc::channel(2);
    let closed_event_tx = event_tx.clone();
    let closed_control_tx = control_tx.clone();
    let closed_observer = tokio::spawn(async move {
        closed_control_tx.closed().await;
        let _ = closed_event_tx.send(ObservedEvent::ControlClosed).await;
    });
    let response_observer = tokio::spawn(async move {
        if let Some(envelope) = rx.recv().await {
            let _ = event_tx.send(ObservedEvent::Response(envelope)).await;
        }
    });
    // The control channel must close first: control RPCs fail fast once
    // the child exits, even while tail output is still draining.
    let first_event = timeout(Duration::from_secs(1), event_rx.recv())
        .await
        .expect("timed out waiting for control close or drained response")
        .expect("observer channel closed before first event");
    assert!(
        matches!(first_event, ObservedEvent::ControlClosed),
        "drained response arrived before control receiver closed",
    );
    // A control send after exit must fail immediately instead of queueing.
    let (response_tx, _response_rx) = oneshot::channel();
    let send_result = control_tx
        .send(CommandControlRequest {
            control: CommandControl::Terminate,
            response_tx: Some(response_tx),
        })
        .await;
    assert!(send_result.is_err(), "post-exit control send should fail");
    let second_event = timeout(Duration::from_secs(1), event_rx.recv())
        .await
        .expect("timed out waiting for drained response")
        .expect("observer channel closed before drained response");
    let ObservedEvent::Response(envelope) = second_event else {
        panic!("expected drained response after control receiver closed");
    };
    let OutgoingEnvelope::ToConnection {
        connection_id,
        message,
    } = envelope
    else {
        panic!("expected connection-scoped outgoing message");
    };
    assert_eq!(connection_id, request_id.connection_id);
    let OutgoingMessage::Response(response) = message else {
        panic!("expected command/exec response after drain");
    };
    assert_eq!(response.id, request_id.request_id);
    // The drained response carries the helper child's full "tail" output.
    let response: CommandExecResponse =
        serde_json::from_value(response.result).expect("deserialize command/exec response");
    assert_eq!(
        response,
        CommandExecResponse {
            exit_code: 0,
            stdout: "tail".to_string(),
            stderr: String::new(),
        }
    );
    run_task.await.expect("run_command task should complete");
    closed_observer
        .await
        .expect("closed observer task should complete");
    response_observer
        .await
        .expect("response observer task should complete");
}
}

View File

@@ -16,6 +16,7 @@ use codex_app_server_protocol::CommandExecWriteParams;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_utils_cargo_bin::cargo_bin;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use tempfile::TempDir;
@@ -83,6 +84,56 @@ async fn command_exec_without_streams_can_be_terminated() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn command_exec_response_drains_tail_output_after_parent_exit() -> Result<()> {
    let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri(), "never")?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    // The helper's parent exits immediately while an orphan child keeps
    // stdout open and prints "tail" in delayed chunks after parent exit.
    let helper = cargo_bin("codex-app-server-command-exec-test-helper")
        .context("should find command_exec test helper")?;
    let process_id = "post-exit-1".to_string();
    let command_request_id = mcp
        .send_command_exec_request(CommandExecParams {
            command: vec![helper.to_string_lossy().to_string()],
            process_id: Some(process_id.clone()),
            tty: false,
            stream_stdin: false,
            stream_stdout_stderr: false,
            output_bytes_cap: None,
            disable_output_cap: false,
            disable_timeout: false,
            timeout_ms: None,
            cwd: None,
            env: None,
            size: None,
            sandbox_policy: None,
        })
        .await?;
    // The response must stay pending while the post-exit drain is running.
    assert!(
        timeout(
            Duration::from_millis(20),
            mcp.read_stream_until_response_message(RequestId::Integer(command_request_id)),
        )
        .await
        .is_err(),
        "response should stay pending while post-exit tail bytes are still draining",
    );
    let response = mcp
        .read_stream_until_response_message(RequestId::Integer(command_request_id))
        .await?;
    let response: CommandExecResponse = to_response(response)?;
    // Once drained, the response includes the child's full trailing output.
    assert_eq!(response.exit_code, 0);
    assert_eq!(response.stdout, "tail");
    assert_eq!(response.stderr, "");
    Ok(())
}
#[tokio::test]
async fn command_exec_without_process_id_keeps_buffered_compatibility() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;