mirror of
https://github.com/openai/codex.git
synced 2026-03-16 19:06:30 +03:00
Compare commits
1 Commits
dev/friel/
...
ruslan/exe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58a5bd9c55 |
@@ -8,6 +8,10 @@ license.workspace = true
|
||||
name = "codex-app-server"
|
||||
path = "src/main.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "codex-app-server-command-exec-test-helper"
|
||||
path = "src/bin/command_exec_test_helper.rs"
|
||||
|
||||
[lib]
|
||||
name = "codex_app_server"
|
||||
path = "src/lib.rs"
|
||||
|
||||
@@ -648,6 +648,7 @@ Notes:
|
||||
- `size` is only valid when `tty: true`. It sets the initial PTY size in character cells.
|
||||
- Buffered Windows sandbox execution accepts `processId` for correlation, but `command/exec/write` and `command/exec/terminate` are still unsupported for those requests.
|
||||
- Buffered Windows sandbox execution also requires the default output cap; custom `outputBytesCap` and `disableOutputCap` are unsupported there.
|
||||
- Buffered Windows sandbox execution does not support `disableTimeout`.
|
||||
- `tty`, `streamStdin`, and `streamStdoutStderr` are optional booleans. Legacy requests that omit them continue to use buffered execution.
|
||||
- `tty: true` implies PTY mode plus `streamStdin: true` and `streamStdoutStderr: true`.
|
||||
- `tty` and `streamStdin` do not disable the timeout on their own; omit `timeoutMs` to use the server default timeout, or set `disableTimeout: true` to keep the process alive until exit or explicit termination.
|
||||
|
||||
30
codex-rs/app-server/src/bin/command_exec_test_helper.rs
Normal file
30
codex-rs/app-server/src/bin/command_exec_test_helper.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use std::env;
|
||||
use std::io::Write;
|
||||
use std::process::Command;
|
||||
use std::process::Stdio;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
const HOLD_STDOUT_OPEN_ARG: &str = "--hold-stdout-open";
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
if env::args().nth(1).as_deref() == Some(HOLD_STDOUT_OPEN_ARG) {
|
||||
print!("ta");
|
||||
std::io::stdout().flush()?;
|
||||
thread::sleep(Duration::from_millis(30));
|
||||
print!("il");
|
||||
std::io::stdout().flush()?;
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let current_exe = env::current_exe()?;
|
||||
Command::new(current_exe)
|
||||
.arg(HOLD_STDOUT_OPEN_ARG)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::inherit())
|
||||
.stderr(Stdio::inherit())
|
||||
.spawn()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -31,8 +31,10 @@ use codex_utils_pty::SpawnedProcess;
|
||||
use codex_utils_pty::TerminalSize;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::watch;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_PARAMS_ERROR_CODE;
|
||||
@@ -42,6 +44,8 @@ use crate::outgoing_message::ConnectionRequestId;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
|
||||
const EXEC_TIMEOUT_EXIT_CODE: i32 = 124;
|
||||
const POST_EXIT_QUIET_PERIOD_MS: u64 = 50;
|
||||
const POST_EXIT_READY_DRAIN_LIMIT: usize = 256;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct CommandExecManager {
|
||||
@@ -112,7 +116,7 @@ struct SpawnProcessOutputParams {
|
||||
connection_id: ConnectionId,
|
||||
process_id: Option<String>,
|
||||
output_rx: mpsc::Receiver<Vec<u8>>,
|
||||
stdio_timeout_rx: watch::Receiver<bool>,
|
||||
process_exited_rx: watch::Receiver<bool>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
stream: CommandExecOutputStream,
|
||||
stream_output: bool,
|
||||
@@ -175,6 +179,11 @@ impl CommandExecManager {
|
||||
};
|
||||
|
||||
if matches!(exec_request.sandbox, SandboxType::WindowsRestrictedToken) {
|
||||
if matches!(&exec_request.expiration, ExecExpiration::Cancellation(_)) {
|
||||
return Err(invalid_request(
|
||||
"disableTimeout is not supported with windows sandbox".to_string(),
|
||||
));
|
||||
}
|
||||
if tty || stream_stdin || stream_stdout_stderr {
|
||||
return Err(invalid_request(
|
||||
"streaming command/exec is not supported with windows sandbox".to_string(),
|
||||
@@ -470,13 +479,13 @@ async fn run_command(params: RunCommandParams) {
|
||||
} = spawned;
|
||||
tokio::pin!(exit_rx);
|
||||
let mut timed_out = false;
|
||||
let (stdio_timeout_tx, stdio_timeout_rx) = watch::channel(false);
|
||||
let (process_exited_tx, process_exited_rx) = watch::channel(false);
|
||||
|
||||
let stdout_handle = spawn_process_output(SpawnProcessOutputParams {
|
||||
connection_id: request_id.connection_id,
|
||||
process_id: process_id.clone(),
|
||||
output_rx: stdout_rx,
|
||||
stdio_timeout_rx: stdio_timeout_rx.clone(),
|
||||
process_exited_rx: process_exited_rx.clone(),
|
||||
outgoing: Arc::clone(&outgoing),
|
||||
stream: CommandExecOutputStream::Stdout,
|
||||
stream_output: stream_stdout_stderr,
|
||||
@@ -486,7 +495,7 @@ async fn run_command(params: RunCommandParams) {
|
||||
connection_id: request_id.connection_id,
|
||||
process_id,
|
||||
output_rx: stderr_rx,
|
||||
stdio_timeout_rx,
|
||||
process_exited_rx,
|
||||
outgoing: Arc::clone(&outgoing),
|
||||
stream: CommandExecOutputStream::Stderr,
|
||||
stream_output: stream_stdout_stderr,
|
||||
@@ -539,14 +548,13 @@ async fn run_command(params: RunCommandParams) {
|
||||
}
|
||||
};
|
||||
|
||||
let timeout_handle = tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(IO_DRAIN_TIMEOUT_MS)).await;
|
||||
let _ = stdio_timeout_tx.send(true);
|
||||
});
|
||||
// The child has exited, so future control RPCs should fail immediately
|
||||
// instead of waiting for the post-exit output drain to finish.
|
||||
drop(control_rx);
|
||||
|
||||
let _ = process_exited_tx.send(true);
|
||||
let stdout = stdout_handle.await.unwrap_or_default();
|
||||
let stderr = stderr_handle.await.unwrap_or_default();
|
||||
timeout_handle.abort();
|
||||
|
||||
outgoing
|
||||
.send_response(
|
||||
@@ -561,61 +569,181 @@ async fn run_command(params: RunCommandParams) {
|
||||
}
|
||||
|
||||
fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHandle<String> {
|
||||
tokio::spawn(async move {
|
||||
collect_process_output(
|
||||
params,
|
||||
Duration::from_millis(POST_EXIT_QUIET_PERIOD_MS),
|
||||
Duration::from_millis(IO_DRAIN_TIMEOUT_MS),
|
||||
)
|
||||
.await
|
||||
})
|
||||
}
|
||||
|
||||
struct ProcessOutputChunkContext<'a> {
|
||||
outgoing: &'a OutgoingMessageSender,
|
||||
connection_id: ConnectionId,
|
||||
process_id: Option<&'a String>,
|
||||
stream: CommandExecOutputStream,
|
||||
stream_output: bool,
|
||||
output_bytes_cap: Option<usize>,
|
||||
}
|
||||
|
||||
async fn collect_process_output(
|
||||
params: SpawnProcessOutputParams,
|
||||
post_exit_quiet_period: Duration,
|
||||
post_exit_hard_cap: Duration,
|
||||
) -> String {
|
||||
let SpawnProcessOutputParams {
|
||||
connection_id,
|
||||
process_id,
|
||||
mut output_rx,
|
||||
mut stdio_timeout_rx,
|
||||
mut process_exited_rx,
|
||||
outgoing,
|
||||
stream,
|
||||
stream_output,
|
||||
output_bytes_cap,
|
||||
} = params;
|
||||
tokio::spawn(async move {
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
let mut observed_num_bytes = 0usize;
|
||||
loop {
|
||||
let chunk = tokio::select! {
|
||||
chunk = output_rx.recv() => match chunk {
|
||||
Some(chunk) => chunk,
|
||||
None => break,
|
||||
},
|
||||
_ = stdio_timeout_rx.wait_for(|&v| v) => break,
|
||||
};
|
||||
let capped_chunk = match output_bytes_cap {
|
||||
Some(output_bytes_cap) => {
|
||||
let capped_chunk_len = output_bytes_cap
|
||||
.saturating_sub(observed_num_bytes)
|
||||
.min(chunk.len());
|
||||
observed_num_bytes += capped_chunk_len;
|
||||
&chunk[0..capped_chunk_len]
|
||||
}
|
||||
None => chunk.as_slice(),
|
||||
};
|
||||
let cap_reached = Some(observed_num_bytes) == output_bytes_cap;
|
||||
if let (true, Some(process_id)) = (stream_output, process_id.as_ref()) {
|
||||
outgoing
|
||||
.send_server_notification_to_connections(
|
||||
&[connection_id],
|
||||
ServerNotification::CommandExecOutputDelta(
|
||||
CommandExecOutputDeltaNotification {
|
||||
process_id: process_id.clone(),
|
||||
stream,
|
||||
delta_base64: STANDARD.encode(capped_chunk),
|
||||
cap_reached,
|
||||
},
|
||||
),
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
let mut observed_num_bytes = 0usize;
|
||||
let mut post_exit_deadlines = (*process_exited_rx.borrow()).then(|| {
|
||||
let now = Instant::now();
|
||||
(now + post_exit_quiet_period, now + post_exit_hard_cap)
|
||||
});
|
||||
let chunk_context = ProcessOutputChunkContext {
|
||||
outgoing: &outgoing,
|
||||
connection_id,
|
||||
process_id: process_id.as_ref(),
|
||||
stream,
|
||||
stream_output,
|
||||
output_bytes_cap,
|
||||
};
|
||||
|
||||
loop {
|
||||
let mut drained_any = false;
|
||||
// Once the hard cap elapses, do one last bounded `try_recv` pass so already-ready
|
||||
// tail bytes win over the timeout path instead of being dropped by a select race.
|
||||
let mut ready_drain_limit = post_exit_deadlines
|
||||
.as_ref()
|
||||
.filter(|(_, hard_deadline)| Instant::now() >= *hard_deadline)
|
||||
.map(|_| POST_EXIT_READY_DRAIN_LIMIT);
|
||||
|
||||
while !matches!(ready_drain_limit, Some(0)) {
|
||||
match output_rx.try_recv() {
|
||||
Ok(chunk) => {
|
||||
drained_any = true;
|
||||
if process_output_chunk(
|
||||
&chunk_context,
|
||||
&mut observed_num_bytes,
|
||||
&mut buffer,
|
||||
chunk,
|
||||
)
|
||||
.await;
|
||||
} else if !stream_output {
|
||||
buffer.extend_from_slice(capped_chunk);
|
||||
}
|
||||
if cap_reached {
|
||||
break;
|
||||
.await
|
||||
{
|
||||
return bytes_to_string_smart(&buffer);
|
||||
}
|
||||
if let Some(limit) = ready_drain_limit.as_mut() {
|
||||
*limit = limit.saturating_sub(1);
|
||||
} else if post_exit_deadlines
|
||||
.as_ref()
|
||||
.is_some_and(|(_, hard_deadline)| Instant::now() >= *hard_deadline)
|
||||
{
|
||||
ready_drain_limit = Some(POST_EXIT_READY_DRAIN_LIMIT);
|
||||
}
|
||||
}
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => return bytes_to_string_smart(&buffer),
|
||||
}
|
||||
}
|
||||
bytes_to_string_smart(&buffer)
|
||||
})
|
||||
|
||||
// Process exit starts the post-exit quiet window, but we still wait a bit longer for
|
||||
// trailing bytes until either the stream goes quiet or the hard cap expires.
|
||||
if post_exit_deadlines.is_none() && *process_exited_rx.borrow() {
|
||||
let now = Instant::now();
|
||||
post_exit_deadlines = Some((now + post_exit_quiet_period, now + post_exit_hard_cap));
|
||||
}
|
||||
|
||||
if drained_any {
|
||||
if let Some((quiet_deadline, _)) = post_exit_deadlines.as_mut() {
|
||||
*quiet_deadline = Instant::now() + post_exit_quiet_period;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let next_deadline = post_exit_deadlines
|
||||
.as_ref()
|
||||
.map(|(quiet_deadline, hard_deadline)| (*quiet_deadline).min(*hard_deadline));
|
||||
if next_deadline.is_some_and(|deadline| Instant::now() >= deadline) {
|
||||
break;
|
||||
}
|
||||
tokio::select! {
|
||||
chunk = output_rx.recv() => match chunk {
|
||||
Some(chunk) => {
|
||||
if process_output_chunk(
|
||||
&chunk_context,
|
||||
&mut observed_num_bytes,
|
||||
&mut buffer,
|
||||
chunk,
|
||||
)
|
||||
.await {
|
||||
break;
|
||||
}
|
||||
if let Some((quiet_deadline, _)) = post_exit_deadlines.as_mut() {
|
||||
*quiet_deadline = Instant::now() + post_exit_quiet_period;
|
||||
}
|
||||
}
|
||||
None => break,
|
||||
},
|
||||
// Before exit we wait for the exit signal; after exit we swap that branch for the
|
||||
// next post-exit deadline so the same select handles both phases.
|
||||
_ = tokio::time::sleep_until(next_deadline.unwrap_or_else(Instant::now)),
|
||||
if next_deadline.is_some() => {}
|
||||
changed = process_exited_rx.changed(), if next_deadline.is_none() => {
|
||||
if changed.is_err() || *process_exited_rx.borrow() {
|
||||
let now = Instant::now();
|
||||
post_exit_deadlines = Some((now + post_exit_quiet_period, now + post_exit_hard_cap));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bytes_to_string_smart(&buffer)
|
||||
}
|
||||
|
||||
async fn process_output_chunk(
|
||||
context: &ProcessOutputChunkContext<'_>,
|
||||
observed_num_bytes: &mut usize,
|
||||
buffer: &mut Vec<u8>,
|
||||
chunk: Vec<u8>,
|
||||
) -> bool {
|
||||
let capped_chunk = match context.output_bytes_cap {
|
||||
Some(output_bytes_cap) => {
|
||||
let capped_chunk_len = output_bytes_cap
|
||||
.saturating_sub(*observed_num_bytes)
|
||||
.min(chunk.len());
|
||||
*observed_num_bytes += capped_chunk_len;
|
||||
&chunk[0..capped_chunk_len]
|
||||
}
|
||||
None => chunk.as_slice(),
|
||||
};
|
||||
let cap_reached = Some(*observed_num_bytes) == context.output_bytes_cap;
|
||||
if let (true, Some(process_id)) = (context.stream_output, context.process_id) {
|
||||
context
|
||||
.outgoing
|
||||
.send_server_notification_to_connections(
|
||||
&[context.connection_id],
|
||||
ServerNotification::CommandExecOutputDelta(CommandExecOutputDeltaNotification {
|
||||
process_id: process_id.clone(),
|
||||
stream: context.stream,
|
||||
delta_base64: STANDARD.encode(capped_chunk),
|
||||
cap_reached,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
} else if !context.stream_output {
|
||||
buffer.extend_from_slice(capped_chunk);
|
||||
}
|
||||
cap_reached
|
||||
}
|
||||
|
||||
async fn handle_process_write(
|
||||
@@ -704,20 +832,21 @@ mod tests {
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
use codex_protocol::protocol::ReadOnlyAccess;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_utils_cargo_bin::cargo_bin;
|
||||
use pretty_assertions::assert_eq;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use tokio::time::Duration;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use tokio::time::timeout;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use super::*;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use crate::outgoing_message::OutgoingEnvelope;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use crate::outgoing_message::OutgoingMessage;
|
||||
|
||||
fn test_outgoing() -> Arc<OutgoingMessageSender> {
|
||||
let (tx, _rx) = mpsc::channel(4);
|
||||
Arc::new(OutgoingMessageSender::new(tx))
|
||||
}
|
||||
|
||||
fn windows_sandbox_exec_request() -> ExecRequest {
|
||||
ExecRequest {
|
||||
command: vec!["cmd".to_string()],
|
||||
@@ -767,6 +896,39 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn windows_sandbox_disable_timeout_is_rejected() {
|
||||
let (tx, _rx) = mpsc::channel(1);
|
||||
let manager = CommandExecManager::default();
|
||||
let err = manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
request_id: ConnectionRequestId {
|
||||
connection_id: ConnectionId(14),
|
||||
request_id: codex_app_server_protocol::RequestId::Integer(43),
|
||||
},
|
||||
process_id: Some("proc-43".to_string()),
|
||||
exec_request: ExecRequest {
|
||||
expiration: ExecExpiration::Cancellation(CancellationToken::new()),
|
||||
..windows_sandbox_exec_request()
|
||||
},
|
||||
started_network_proxy: None,
|
||||
tty: false,
|
||||
stream_stdin: false,
|
||||
stream_stdout_stderr: false,
|
||||
output_bytes_cap: Some(DEFAULT_OUTPUT_BYTES_CAP),
|
||||
size: None,
|
||||
})
|
||||
.await
|
||||
.expect_err("disableTimeout windows sandbox exec should be rejected");
|
||||
|
||||
assert_eq!(err.code, INVALID_REQUEST_ERROR_CODE);
|
||||
assert_eq!(
|
||||
err.message,
|
||||
"disableTimeout is not supported with windows sandbox"
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
#[tokio::test]
|
||||
async fn windows_sandbox_non_streaming_exec_uses_execution_path() {
|
||||
@@ -1001,4 +1163,207 @@ mod tests {
|
||||
assert_eq!(err.code, INVALID_REQUEST_ERROR_CODE);
|
||||
assert_eq!(err.message, "command/exec \"proc-13\" is no longer running");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn process_output_drains_ready_bytes_even_after_quiet_deadline_elapsed() {
|
||||
let (output_tx, output_rx) = mpsc::channel(4);
|
||||
output_tx
|
||||
.send(b"tail".to_vec())
|
||||
.await
|
||||
.expect("queue ready output");
|
||||
let (process_exited_tx, process_exited_rx) = watch::channel(true);
|
||||
let output = collect_process_output(
|
||||
SpawnProcessOutputParams {
|
||||
connection_id: ConnectionId(21),
|
||||
process_id: None,
|
||||
output_rx,
|
||||
process_exited_rx,
|
||||
outgoing: test_outgoing(),
|
||||
stream: CommandExecOutputStream::Stdout,
|
||||
stream_output: false,
|
||||
output_bytes_cap: None,
|
||||
},
|
||||
Duration::ZERO,
|
||||
Duration::from_millis(1),
|
||||
)
|
||||
.await;
|
||||
|
||||
drop(process_exited_tx);
|
||||
assert_eq!(output, "tail");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn process_output_resets_quiet_window_when_more_tail_bytes_arrive() {
|
||||
let (output_tx, output_rx) = mpsc::channel(4);
|
||||
let (process_exited_tx, process_exited_rx) = watch::channel(false);
|
||||
|
||||
let send_task = tokio::spawn(async move {
|
||||
process_exited_tx
|
||||
.send(true)
|
||||
.expect("signal process exit to collector");
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
output_tx
|
||||
.send(b"one".to_vec())
|
||||
.await
|
||||
.expect("send first trailing chunk");
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
output_tx
|
||||
.send(b"two".to_vec())
|
||||
.await
|
||||
.expect("send second trailing chunk");
|
||||
});
|
||||
|
||||
let output = collect_process_output(
|
||||
SpawnProcessOutputParams {
|
||||
connection_id: ConnectionId(22),
|
||||
process_id: None,
|
||||
output_rx,
|
||||
process_exited_rx,
|
||||
outgoing: test_outgoing(),
|
||||
stream: CommandExecOutputStream::Stdout,
|
||||
stream_output: false,
|
||||
output_bytes_cap: None,
|
||||
},
|
||||
Duration::from_millis(15),
|
||||
Duration::from_millis(100),
|
||||
)
|
||||
.await;
|
||||
|
||||
send_task.await.expect("sender task should complete");
|
||||
assert_eq!(output, "onetwo");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn process_output_stops_after_hard_cap_when_stream_stays_open() {
|
||||
let (_output_tx, output_rx) = mpsc::channel(4);
|
||||
let (process_exited_tx, process_exited_rx) = watch::channel(true);
|
||||
let output = timeout(
|
||||
Duration::from_millis(100),
|
||||
collect_process_output(
|
||||
SpawnProcessOutputParams {
|
||||
connection_id: ConnectionId(23),
|
||||
process_id: None,
|
||||
output_rx,
|
||||
process_exited_rx,
|
||||
outgoing: test_outgoing(),
|
||||
stream: CommandExecOutputStream::Stdout,
|
||||
stream_output: false,
|
||||
output_bytes_cap: None,
|
||||
},
|
||||
Duration::from_millis(10),
|
||||
Duration::from_millis(20),
|
||||
),
|
||||
)
|
||||
.await
|
||||
.expect("collector should stop without waiting forever");
|
||||
|
||||
drop(process_exited_tx);
|
||||
assert_eq!(output, "");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn run_command_closes_control_channel_before_sending_drained_response() {
|
||||
enum ObservedEvent {
|
||||
ControlClosed,
|
||||
Response(OutgoingEnvelope),
|
||||
}
|
||||
|
||||
let helper = cargo_bin("codex-app-server-command-exec-test-helper")
|
||||
.expect("should find command_exec test helper");
|
||||
let spawned = codex_utils_pty::spawn_pipe_process_no_stdin(
|
||||
&helper.to_string_lossy(),
|
||||
&Vec::new(),
|
||||
PathBuf::from(".").as_path(),
|
||||
&HashMap::new(),
|
||||
&None,
|
||||
)
|
||||
.await
|
||||
.expect("helper process should spawn");
|
||||
let (control_tx, control_rx) = mpsc::channel(4);
|
||||
let (tx, mut rx) = mpsc::channel(4);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(24),
|
||||
request_id: codex_app_server_protocol::RequestId::Integer(4),
|
||||
};
|
||||
|
||||
let run_task = tokio::spawn(run_command(RunCommandParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-24".to_string()),
|
||||
spawned,
|
||||
control_rx,
|
||||
stream_stdin: false,
|
||||
stream_stdout_stderr: false,
|
||||
expiration: ExecExpiration::DefaultTimeout,
|
||||
output_bytes_cap: None,
|
||||
}));
|
||||
|
||||
let (event_tx, mut event_rx) = mpsc::channel(2);
|
||||
let closed_event_tx = event_tx.clone();
|
||||
let closed_control_tx = control_tx.clone();
|
||||
let closed_observer = tokio::spawn(async move {
|
||||
closed_control_tx.closed().await;
|
||||
let _ = closed_event_tx.send(ObservedEvent::ControlClosed).await;
|
||||
});
|
||||
let response_observer = tokio::spawn(async move {
|
||||
if let Some(envelope) = rx.recv().await {
|
||||
let _ = event_tx.send(ObservedEvent::Response(envelope)).await;
|
||||
}
|
||||
});
|
||||
|
||||
let first_event = timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
.expect("timed out waiting for control close or drained response")
|
||||
.expect("observer channel closed before first event");
|
||||
assert!(
|
||||
matches!(first_event, ObservedEvent::ControlClosed),
|
||||
"drained response arrived before control receiver closed",
|
||||
);
|
||||
let (response_tx, _response_rx) = oneshot::channel();
|
||||
let send_result = control_tx
|
||||
.send(CommandControlRequest {
|
||||
control: CommandControl::Terminate,
|
||||
response_tx: Some(response_tx),
|
||||
})
|
||||
.await;
|
||||
assert!(send_result.is_err(), "post-exit control send should fail");
|
||||
|
||||
let second_event = timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
.expect("timed out waiting for drained response")
|
||||
.expect("observer channel closed before drained response");
|
||||
let ObservedEvent::Response(envelope) = second_event else {
|
||||
panic!("expected drained response after control receiver closed");
|
||||
};
|
||||
let OutgoingEnvelope::ToConnection {
|
||||
connection_id,
|
||||
message,
|
||||
} = envelope
|
||||
else {
|
||||
panic!("expected connection-scoped outgoing message");
|
||||
};
|
||||
assert_eq!(connection_id, request_id.connection_id);
|
||||
let OutgoingMessage::Response(response) = message else {
|
||||
panic!("expected command/exec response after drain");
|
||||
};
|
||||
assert_eq!(response.id, request_id.request_id);
|
||||
let response: CommandExecResponse =
|
||||
serde_json::from_value(response.result).expect("deserialize command/exec response");
|
||||
assert_eq!(
|
||||
response,
|
||||
CommandExecResponse {
|
||||
exit_code: 0,
|
||||
stdout: "tail".to_string(),
|
||||
stderr: String::new(),
|
||||
}
|
||||
);
|
||||
|
||||
run_task.await.expect("run_command task should complete");
|
||||
closed_observer
|
||||
.await
|
||||
.expect("closed observer task should complete");
|
||||
response_observer
|
||||
.await
|
||||
.expect("response observer task should complete");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ use codex_app_server_protocol::CommandExecWriteParams;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_utils_cargo_bin::cargo_bin;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use tempfile::TempDir;
|
||||
@@ -83,6 +84,56 @@ async fn command_exec_without_streams_can_be_terminated() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn command_exec_response_drains_tail_output_after_parent_exit() -> Result<()> {
|
||||
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
let helper = cargo_bin("codex-app-server-command-exec-test-helper")
|
||||
.context("should find command_exec test helper")?;
|
||||
|
||||
let process_id = "post-exit-1".to_string();
|
||||
let command_request_id = mcp
|
||||
.send_command_exec_request(CommandExecParams {
|
||||
command: vec![helper.to_string_lossy().to_string()],
|
||||
process_id: Some(process_id.clone()),
|
||||
tty: false,
|
||||
stream_stdin: false,
|
||||
stream_stdout_stderr: false,
|
||||
output_bytes_cap: None,
|
||||
disable_output_cap: false,
|
||||
disable_timeout: false,
|
||||
timeout_ms: None,
|
||||
cwd: None,
|
||||
env: None,
|
||||
size: None,
|
||||
sandbox_policy: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert!(
|
||||
timeout(
|
||||
Duration::from_millis(20),
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(command_request_id)),
|
||||
)
|
||||
.await
|
||||
.is_err(),
|
||||
"response should stay pending while post-exit tail bytes are still draining",
|
||||
);
|
||||
|
||||
let response = mcp
|
||||
.read_stream_until_response_message(RequestId::Integer(command_request_id))
|
||||
.await?;
|
||||
let response: CommandExecResponse = to_response(response)?;
|
||||
assert_eq!(response.exit_code, 0);
|
||||
assert_eq!(response.stdout, "tail");
|
||||
assert_eq!(response.stderr, "");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn command_exec_without_process_id_keeps_buffered_compatibility() -> Result<()> {
|
||||
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
|
||||
|
||||
Reference in New Issue
Block a user