use std::collections::HashMap; use std::path::Path; use pretty_assertions::assert_eq; use crate::combine_output_receivers; use crate::spawn_pipe_process; use crate::spawn_pipe_process_no_stdin; use crate::spawn_pty_process; use crate::SpawnedProcess; use crate::TerminalSize; fn find_python() -> Option { for candidate in ["python3", "python"] { if let Ok(output) = std::process::Command::new(candidate) .arg("--version") .output() { if output.status.success() { return Some(candidate.to_string()); } } } None } fn setsid_available() -> bool { if cfg!(windows) { return false; } std::process::Command::new("setsid") .arg("true") .status() .map(|status| status.success()) .unwrap_or(false) } fn shell_command(program: &str) -> (String, Vec) { if cfg!(windows) { let cmd = std::env::var("COMSPEC").unwrap_or_else(|_| "cmd.exe".to_string()); (cmd, vec!["/C".to_string(), program.to_string()]) } else { ( "/bin/sh".to_string(), vec!["-c".to_string(), program.to_string()], ) } } fn echo_sleep_command(marker: &str) -> String { if cfg!(windows) { format!("echo {marker} & ping -n 2 127.0.0.1 > NUL") } else { format!("echo {marker}; sleep 0.05") } } fn split_stdout_stderr_command() -> String { if cfg!(windows) { // Keep this in cmd.exe syntax so the test does not depend on a runner-local // PowerShell/Python setup just to produce deterministic split output. "(echo split-out)&(>&2 echo split-err)".to_string() } else { "printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string() } } async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver>) -> Vec { let mut collected = Vec::new(); while let Some(chunk) = output_rx.recv().await { collected.extend_from_slice(&chunk); } collected } fn combine_spawned_output( spawned: SpawnedProcess, ) -> ( crate::ProcessHandle, tokio::sync::broadcast::Receiver>, tokio::sync::oneshot::Receiver, ) { let SpawnedProcess { session, stdout_rx, stderr_rx, exit_rx, } = spawned; ( session, combine_output_receivers(stdout_rx, stderr_rx), exit_rx, ) } async fn collect_output_until_exit( mut output_rx: tokio::sync::broadcast::Receiver>, exit_rx: tokio::sync::oneshot::Receiver, timeout_ms: u64, ) -> (Vec, i32) { let mut collected = Vec::new(); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms); tokio::pin!(exit_rx); loop { tokio::select! { res = output_rx.recv() => { if let Ok(chunk) = res { collected.extend_from_slice(&chunk); } } res = &mut exit_rx => { let code = res.unwrap_or(-1); // On Windows (ConPTY in particular), it's possible to observe the exit notification // before the final bytes are drained from the PTY reader thread. Drain for a brief // "quiet" window to make output assertions deterministic. let (quiet_ms, max_ms) = if cfg!(windows) { (200, 2_000) } else { (50, 500) }; let quiet = tokio::time::Duration::from_millis(quiet_ms); let max_deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(max_ms); while tokio::time::Instant::now() < max_deadline { match tokio::time::timeout(quiet, output_rx.recv()).await { Ok(Ok(chunk)) => collected.extend_from_slice(&chunk), Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue, Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => break, Err(_) => break, } } return (collected, code); } _ = tokio::time::sleep_until(deadline) => { return (collected, -1); } } } } async fn wait_for_python_repl_ready( output_rx: &mut tokio::sync::broadcast::Receiver>, timeout_ms: u64, ready_marker: &str, ) -> anyhow::Result> { let mut collected = Vec::new(); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms); while tokio::time::Instant::now() < deadline { let now = tokio::time::Instant::now(); let remaining = deadline.saturating_duration_since(now); match tokio::time::timeout(remaining, output_rx.recv()).await { Ok(Ok(chunk)) => { collected.extend_from_slice(&chunk); if String::from_utf8_lossy(&collected).contains(ready_marker) { return Ok(collected); } } Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue, Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => { anyhow::bail!( "PTY output closed while waiting for Python REPL readiness: {:?}", String::from_utf8_lossy(&collected) ); } Err(_) => break, } } anyhow::bail!( "timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}", String::from_utf8_lossy(&collected) ); } #[cfg(unix)] fn process_exists(pid: i32) -> anyhow::Result { let result = unsafe { libc::kill(pid, 0) }; if result == 0 { return Ok(true); } let err = std::io::Error::last_os_error(); match err.raw_os_error() { Some(libc::ESRCH) => Ok(false), Some(libc::EPERM) => Ok(true), _ => Err(err.into()), } } #[cfg(unix)] async fn wait_for_marker_pid( output_rx: &mut tokio::sync::broadcast::Receiver>, marker: &str, timeout_ms: u64, ) -> anyhow::Result { let mut collected = Vec::new(); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms); loop { let now = tokio::time::Instant::now(); if now >= deadline { anyhow::bail!( "timed out waiting for marker {marker:?} in PTY output: {:?}", String::from_utf8_lossy(&collected) ); } let remaining = deadline.saturating_duration_since(now); let chunk = tokio::time::timeout(remaining, output_rx.recv()) .await .map_err(|_| anyhow::anyhow!("timeout waiting for PTY output"))??; collected.extend_from_slice(&chunk); let text = String::from_utf8_lossy(&collected); if let Some(marker_idx) = text.find(marker) { let suffix = &text[marker_idx + marker.len()..]; let digits: String = suffix .chars() .skip_while(|ch| !ch.is_ascii_digit()) .take_while(char::is_ascii_digit) .collect(); if !digits.is_empty() { return Ok(digits.parse()?); } } } } #[cfg(unix)] async fn wait_for_process_exit(pid: i32, timeout_ms: u64) -> anyhow::Result { let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms); loop { if !process_exists(pid)? { return Ok(true); } if tokio::time::Instant::now() >= deadline { return Ok(false); } tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> { let Some(python) = find_python() else { eprintln!("python not found; skipping pty_python_repl_emits_output_and_exits"); return Ok(()); }; let ready_marker = "__codex_pty_ready__"; let args = vec![ "-i".to_string(), "-q".to_string(), "-c".to_string(), format!("print('{ready_marker}')"), ]; let env_map: HashMap = std::env::vars().collect(); let spawned = spawn_pty_process( &python, &args, Path::new("."), &env_map, &None, TerminalSize::default(), ) .await?; let (session, mut output_rx, exit_rx) = combine_spawned_output(spawned); let writer = session.writer_sender(); let newline = if cfg!(windows) { "\r\n" } else { "\n" }; let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 }; let mut output = wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?; writer .send(format!("print('hello from pty'){newline}").into_bytes()) .await?; writer.send(format!("exit(){newline}").into_bytes()).await?; let timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 }; let (remaining_output, code) = collect_output_until_exit(output_rx, exit_rx, timeout_ms).await; output.extend_from_slice(&remaining_output); let text = String::from_utf8_lossy(&output); assert!( text.contains("hello from pty"), "expected python output in PTY: {text:?}" ); assert_eq!(code, 0, "expected python to exit cleanly"); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> { let Some(python) = find_python() else { eprintln!("python not found; skipping pipe_process_round_trips_stdin"); return Ok(()); }; let args = vec![ "-u".to_string(), "-c".to_string(), "import sys; print(sys.stdin.readline().strip());".to_string(), ]; let env_map: HashMap = std::env::vars().collect(); let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?; let (session, output_rx, exit_rx) = combine_spawned_output(spawned); let writer = session.writer_sender(); writer.send(b"roundtrip\n".to_vec()).await?; let (output, code) = collect_output_until_exit(output_rx, exit_rx, 5_000).await; let text = String::from_utf8_lossy(&output); assert!( text.contains("roundtrip"), "expected pipe process to echo stdin: {text:?}" ); assert_eq!(code, 0, "expected python -c to exit cleanly"); Ok(()) } #[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> { let parent_sid = unsafe { libc::getsid(0) }; if parent_sid == -1 { anyhow::bail!("failed to read parent session id"); } let env_map: HashMap = std::env::vars().collect(); let script = "echo $$; sleep 0.2"; let (program, args) = shell_command(script); let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?; let (_session, mut output_rx, exit_rx) = combine_spawned_output(spawned); let pid_bytes = tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv()).await??; let pid_text = String::from_utf8_lossy(&pid_bytes); let child_pid: i32 = pid_text .split_whitespace() .next() .ok_or_else(|| anyhow::anyhow!("missing child pid output: {pid_text:?}"))? .parse()?; let child_sid = unsafe { libc::getsid(child_pid) }; if child_sid == -1 { anyhow::bail!("failed to read child session id"); } assert_eq!(child_sid, child_pid, "expected child to be session leader"); assert_ne!( child_sid, parent_sid, "expected child to be detached from parent session" ); let exit_code = exit_rx.await.unwrap_or(-1); assert_eq!( exit_code, 0, "expected detached pipe process to exit cleanly" ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pipe_and_pty_share_interface() -> anyhow::Result<()> { let env_map: HashMap = std::env::vars().collect(); let (pipe_program, pipe_args) = shell_command(&echo_sleep_command("pipe_ok")); let (pty_program, pty_args) = shell_command(&echo_sleep_command("pty_ok")); let pipe = spawn_pipe_process(&pipe_program, &pipe_args, Path::new("."), &env_map, &None).await?; let pty = spawn_pty_process( &pty_program, &pty_args, Path::new("."), &env_map, &None, TerminalSize::default(), ) .await?; let (_pipe_session, pipe_output_rx, pipe_exit_rx) = combine_spawned_output(pipe); let (_pty_session, pty_output_rx, pty_exit_rx) = combine_spawned_output(pty); let timeout_ms = if cfg!(windows) { 10_000 } else { 3_000 }; let (pipe_out, pipe_code) = collect_output_until_exit(pipe_output_rx, pipe_exit_rx, timeout_ms).await; let (pty_out, pty_code) = collect_output_until_exit(pty_output_rx, pty_exit_rx, timeout_ms).await; assert_eq!(pipe_code, 0); assert_eq!(pty_code, 0); assert!( String::from_utf8_lossy(&pipe_out).contains("pipe_ok"), "pipe output mismatch: {pipe_out:?}" ); assert!( String::from_utf8_lossy(&pty_out).contains("pty_ok"), "pty output mismatch: {pty_out:?}" ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> { let Some(python) = find_python() else { eprintln!("python not found; skipping pipe_drains_stderr_without_stdout_activity"); return Ok(()); }; let script = "import sys\nchunk = 'E' * 65536\nfor _ in range(64):\n sys.stderr.write(chunk)\n sys.stderr.flush()\n"; let args = vec!["-c".to_string(), script.to_string()]; let env_map: HashMap = std::env::vars().collect(); let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?; let (_session, output_rx, exit_rx) = combine_spawned_output(spawned); let (output, code) = collect_output_until_exit(output_rx, exit_rx, 10_000).await; assert_eq!(code, 0, "expected python to exit cleanly"); assert!(!output.is_empty(), "expected stderr output to be drained"); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> { let env_map: HashMap = std::env::vars().collect(); let (program, args) = shell_command(&split_stdout_stderr_command()); let spawned = spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?; let SpawnedProcess { session: _session, stdout_rx, stderr_rx, exit_rx, } = spawned; let timeout_ms = if cfg!(windows) { 10_000 } else { 2_000 }; let timeout = tokio::time::Duration::from_millis(timeout_ms); let stdout_task = tokio::spawn(async move { collect_split_output(stdout_rx).await }); let stderr_task = tokio::spawn(async move { collect_split_output(stderr_rx).await }); let code = tokio::time::timeout(timeout, exit_rx) .await .map_err(|_| anyhow::anyhow!("timed out waiting for split process exit"))? .unwrap_or(-1); let stdout = tokio::time::timeout(timeout, stdout_task) .await .map_err(|_| anyhow::anyhow!("timed out waiting to drain split stdout"))??; let stderr = tokio::time::timeout(timeout, stderr_task) .await .map_err(|_| anyhow::anyhow!("timed out waiting to drain split stderr"))??; let expected_stdout = if cfg!(windows) { b"split-out\r\n".to_vec() } else { b"split-out\n".to_vec() }; let expected_stderr = if cfg!(windows) { b"split-err\r\n".to_vec() } else { b"split-err\n".to_vec() }; assert_eq!(stdout, expected_stdout); assert_eq!(stderr, expected_stderr); assert_eq!(code, 0); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pipe_terminate_aborts_detached_readers() -> anyhow::Result<()> { if !setsid_available() { eprintln!("setsid not available; skipping pipe_terminate_aborts_detached_readers"); return Ok(()); } let env_map: HashMap = std::env::vars().collect(); let script = "setsid sh -c 'i=0; while [ $i -lt 200 ]; do echo tick; sleep 0.01; i=$((i+1)); done' &"; let (program, args) = shell_command(script); let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?; let (session, mut output_rx, _exit_rx) = combine_spawned_output(spawned); let _ = tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv()) .await .map_err(|_| anyhow::anyhow!("expected detached output before terminate"))??; session.terminate(); let mut post_rx = output_rx.resubscribe(); let post_terminate = tokio::time::timeout(tokio::time::Duration::from_millis(200), post_rx.recv()).await; match post_terminate { Err(_) => Ok(()), Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => Ok(()), Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => { anyhow::bail!("unexpected output after terminate (lagged)") } Ok(Ok(chunk)) => anyhow::bail!( "unexpected output after terminate: {:?}", String::from_utf8_lossy(&chunk) ), } } #[cfg(unix)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pty_terminate_kills_background_children_in_same_process_group() -> anyhow::Result<()> { let env_map: HashMap = std::env::vars().collect(); let marker = "__codex_bg_pid:"; let script = format!("sleep 1000 & bg=$!; echo {marker}$bg; wait"); let (program, args) = shell_command(&script); let spawned = spawn_pty_process( &program, &args, Path::new("."), &env_map, &None, TerminalSize::default(), ) .await?; let (session, mut output_rx, _exit_rx) = combine_spawned_output(spawned); let bg_pid = match wait_for_marker_pid(&mut output_rx, marker, 2_000).await { Ok(pid) => pid, Err(err) => { session.terminate(); return Err(err); } }; assert!( process_exists(bg_pid)?, "expected background child pid {bg_pid} to exist before terminate" ); session.terminate(); let exited = wait_for_process_exit(bg_pid, 3_000).await?; if !exited { let _ = unsafe { libc::kill(bg_pid, libc::SIGKILL) }; } assert!( exited, "background child pid {bg_pid} survived PTY terminate()" ); Ok(()) }