refactor js_repl to reuse managed child process infra

git-stack-id: fjord/js_repl_seq---4hxc_-tczpgzls
git-stack-title: refactor js_repl to reuse managed child process infra
This commit is contained in:
Curtis 'Fjord' Hawthorne
2026-03-02 09:41:02 -08:00
parent 5df2191637
commit d502cb1121
11 changed files with 484 additions and 278 deletions

View File

@@ -2,15 +2,12 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::fmt;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
@@ -19,16 +16,12 @@ use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use thiserror::Error;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::process::Child;
use tokio::process::ChildStdin;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::OnceCell;
use tokio::sync::RwLock;
use tokio::sync::Semaphore;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::trace;
@@ -57,9 +50,13 @@ use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventFailure;
use crate::tools::events::ToolEventStage;
use crate::tools::sandboxing::SandboxAttempt;
use crate::tools::sandboxing::SandboxOverride;
use crate::tools::sandboxing::SandboxablePreference;
use crate::truncate::TruncationPolicy;
use crate::truncate::truncate_text;
use crate::unified_exec::ManagedSplitProcess;
use crate::unified_exec::UnifiedExecProcess;
pub(crate) const JS_REPL_PRAGMA_PREFIX: &str = "// codex-js-repl:";
const KERNEL_SOURCE: &str = include_str!("kernel.js");
@@ -182,9 +179,9 @@ pub struct JsExecPollResult {
#[derive(Clone)]
struct KernelState {
child: Arc<Mutex<Child>>,
process: Arc<UnifiedExecProcess>,
recent_stderr: Arc<Mutex<VecDeque<String>>>,
stdin: Arc<Mutex<ChildStdin>>,
stdin: tokio::sync::mpsc::Sender<Vec<u8>>,
pending_execs: Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<ExecResultMessage>>>>,
exec_contexts: Arc<Mutex<HashMap<String, ExecContext>>>,
shutdown: CancellationToken,
@@ -440,7 +437,6 @@ struct ExecCompletionEvent {
enum KernelStreamEnd {
Shutdown,
StdoutEof,
StdoutReadError(String),
}
impl KernelStreamEnd {
@@ -448,15 +444,11 @@ impl KernelStreamEnd {
match self {
Self::Shutdown => "shutdown",
Self::StdoutEof => "stdout_eof",
Self::StdoutReadError(_) => "stdout_read_error",
}
}
fn error(&self) -> Option<&str> {
match self {
Self::StdoutReadError(err) => Some(err),
_ => None,
}
None
}
}
@@ -466,17 +458,6 @@ struct KernelDebugSnapshot {
stderr_tail: String,
}
fn format_exit_status(status: std::process::ExitStatus) -> String {
if let Some(code) = status.code() {
return format!("code={code}");
}
#[cfg(unix)]
if let Some(signal) = status.signal() {
return format!("signal={signal}");
}
"unknown".to_string()
}
fn format_stderr_tail(lines: &VecDeque<String>) -> String {
if lines.is_empty() {
return "<empty>".to_string();
@@ -1145,7 +1126,7 @@ impl JsReplManager {
};
if let Some(state) = state {
state.shutdown.cancel();
Self::kill_kernel_child(&state.child, "reset").await;
Self::kill_kernel_child(&state.process, "reset").await;
}
}
@@ -1168,7 +1149,7 @@ impl JsReplManager {
self.mark_exec_host_terminating(exec_id).await;
}
state.kernel.shutdown.cancel();
Self::kill_kernel_child(&state.kernel.child, kill_reason).await;
Self::kill_kernel_child(&state.kernel.process, kill_reason).await;
if let Some(exec_id) = active_exec {
self.exec_to_session.lock().await.remove(&exec_id);
Self::cancel_exec_tool_calls_map(&self.exec_tool_calls, &exec_id).await;
@@ -1230,7 +1211,7 @@ impl JsReplManager {
let mut kernel = self.kernel.lock().await;
if kernel.is_none() {
let state = self
.start_kernel(Arc::clone(&turn), Some(session.conversation_id), None)
.start_kernel(Arc::clone(&session), Arc::clone(&turn), None)
.await
.map_err(JsReplExecuteError::RespondToModel)?;
*kernel = Some(state);
@@ -1245,10 +1226,10 @@ impl JsReplManager {
}
};
(
Arc::clone(&state.stdin),
state.stdin.clone(),
Arc::clone(&state.pending_execs),
Arc::clone(&state.exec_contexts),
Arc::clone(&state.child),
Arc::clone(&state.process),
Arc::clone(&state.recent_stderr),
)
};
@@ -1433,13 +1414,13 @@ impl JsReplManager {
{
if let Some(state) = pruned_idle_session {
state.kernel.shutdown.cancel();
Self::kill_kernel_child(&state.kernel.child, "poll_prune_idle_session").await;
Self::kill_kernel_child(&state.kernel.process, "poll_prune_idle_session").await;
}
let mut new_kernel = Some(
self.start_kernel(
Arc::clone(&session),
Arc::clone(&turn),
Some(session.conversation_id),
Some(session_id.clone()),
)
.await
@@ -1466,11 +1447,11 @@ impl JsReplManager {
}
if let Some(kernel) = stale_kernel {
kernel.shutdown.cancel();
Self::kill_kernel_child(&kernel.child, "poll_submit_session_race").await;
Self::kill_kernel_child(&kernel.process, "poll_submit_session_race").await;
}
if let Some(kernel) = capacity_kernel {
kernel.shutdown.cancel();
Self::kill_kernel_child(&kernel.child, "poll_submit_capacity_race").await;
Self::kill_kernel_child(&kernel.process, "poll_submit_capacity_race").await;
return Err(max_sessions_error());
}
}
@@ -1489,9 +1470,9 @@ impl JsReplManager {
state.active_exec = Some(req_id.clone());
state.last_used = Instant::now();
(
Arc::clone(&state.kernel.stdin),
state.kernel.stdin.clone(),
Arc::clone(&state.kernel.exec_contexts),
Arc::clone(&state.kernel.child),
Arc::clone(&state.kernel.process),
Arc::clone(&state.kernel.recent_stderr),
)
};
@@ -1543,7 +1524,7 @@ impl JsReplManager {
};
if let Some(state) = removed_state {
state.kernel.shutdown.cancel();
Self::kill_kernel_child(&state.kernel.child, "poll_submit_write_failed").await;
Self::kill_kernel_child(&state.kernel.process, "poll_submit_write_failed").await;
}
let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await;
let err_message = err.to_string();
@@ -1626,15 +1607,18 @@ impl JsReplManager {
}
async fn start_kernel(
&self,
session: Arc<Session>,
turn: Arc<TurnContext>,
thread_id: Option<ThreadId>,
poll_session_id: Option<String>,
) -> Result<KernelState, String> {
let node_path = resolve_compatible_node(self.node_path.as_deref()).await?;
let kernel_path = self.kernel_script_path.clone();
let mut env = create_env(&turn.shell_environment_policy, thread_id);
let mut env = create_env(
&turn.shell_environment_policy,
Some(session.conversation_id),
);
env.insert(
"CODEX_JS_TMP_DIR".to_string(),
self.tmp_dir.path().to_string_lossy().to_string(),
@@ -1664,68 +1648,27 @@ impl JsReplManager {
};
let sandbox = SandboxManager::new();
let has_managed_network_requirements = turn
.config
.config_layer_stack
.requirements_toml()
.network
.is_some();
let sandbox_type = sandbox.select_initial(
&turn.sandbox_policy,
let attempt = SandboxAttempt::initial_for_turn(
&sandbox,
turn.as_ref(),
SandboxablePreference::Auto,
turn.windows_sandbox_level,
has_managed_network_requirements,
SandboxOverride::NoOverride,
);
let exec_env = sandbox
.transform(crate::sandboxing::SandboxTransformRequest {
spec,
policy: &turn.sandbox_policy,
sandbox: sandbox_type,
enforce_managed_network: has_managed_network_requirements,
network: None,
sandbox_policy_cwd: &turn.cwd,
#[cfg(target_os = "macos")]
macos_seatbelt_profile_extensions: None,
codex_linux_sandbox_exe: turn.codex_linux_sandbox_exe.as_ref(),
use_linux_sandbox_bwrap: turn
.features
.enabled(crate::features::Feature::UseLinuxSandboxBwrap),
windows_sandbox_level: turn.windows_sandbox_level,
})
let exec_env = attempt
.env_for(spec, None)
.map_err(|err| format!("failed to configure sandbox for js_repl: {err}"))?;
let mut cmd =
tokio::process::Command::new(exec_env.command.first().cloned().unwrap_or_default());
if exec_env.command.len() > 1 {
cmd.args(&exec_env.command[1..]);
}
#[cfg(unix)]
cmd.arg0(
exec_env
.arg0
.clone()
.unwrap_or_else(|| exec_env.command.first().cloned().unwrap_or_default()),
);
cmd.current_dir(&exec_env.cwd);
cmd.env_clear();
cmd.envs(exec_env.env);
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true);
let mut child = cmd
.spawn()
let ManagedSplitProcess {
process,
stdin,
stdout_rx,
stderr_rx,
} = session
.services
.unified_exec_manager
.open_split_pipe_session_with_exec_env(&exec_env)
.await
.map_err(|err| format!("failed to start Node runtime: {err}"))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| "js_repl kernel missing stdout".to_string())?;
let stderr = child.stderr.take();
let stdin = child
.stdin
.take()
.ok_or_else(|| "js_repl kernel missing stdin".to_string())?;
let process = Arc::new(process);
let shutdown = CancellationToken::new();
let pending_execs: Arc<
@@ -1733,15 +1676,13 @@ impl JsReplManager {
> = Arc::new(Mutex::new(HashMap::new()));
let exec_contexts: Arc<Mutex<HashMap<String, ExecContext>>> =
Arc::new(Mutex::new(HashMap::new()));
let stdin_arc = Arc::new(Mutex::new(stdin));
let child = Arc::new(Mutex::new(child));
let recent_stderr = Arc::new(Mutex::new(VecDeque::with_capacity(
JS_REPL_STDERR_TAIL_LINE_LIMIT,
)));
tokio::spawn(Self::read_stdout(
stdout,
Arc::clone(&child),
stdout_rx,
Arc::clone(&process),
Arc::clone(&self.kernel),
Arc::clone(&recent_stderr),
Arc::clone(&pending_execs),
@@ -1750,24 +1691,20 @@ impl JsReplManager {
Arc::clone(&self.exec_store),
Arc::clone(&self.poll_sessions),
Arc::clone(&self.exec_to_session),
Arc::clone(&stdin_arc),
stdin.clone(),
poll_session_id,
shutdown.clone(),
));
if let Some(stderr) = stderr {
tokio::spawn(Self::read_stderr(
stderr,
Arc::clone(&recent_stderr),
shutdown.clone(),
));
} else {
warn!("js_repl kernel missing stderr");
}
tokio::spawn(Self::read_stderr(
stderr_rx,
Arc::clone(&recent_stderr),
shutdown.clone(),
));
Ok(KernelState {
child,
process,
recent_stderr,
stdin: stdin_arc,
stdin,
pending_execs,
exec_contexts,
shutdown,
@@ -1783,19 +1720,17 @@ impl JsReplManager {
}
async fn write_message(
stdin: &Arc<Mutex<ChildStdin>>,
stdin: &tokio::sync::mpsc::Sender<Vec<u8>>,
msg: &HostToKernel,
) -> Result<(), FunctionCallError> {
let encoded = serde_json::to_string(msg).map_err(|err| {
FunctionCallError::RespondToModel(format!("failed to serialize kernel message: {err}"))
})?;
let mut guard = stdin.lock().await;
guard.write_all(encoded.as_bytes()).await.map_err(|err| {
let mut bytes = encoded.into_bytes();
bytes.push(b'\n');
stdin.send(bytes).await.map_err(|err| {
FunctionCallError::RespondToModel(format!("failed to write to kernel: {err}"))
})?;
guard.write_all(b"\n").await.map_err(|err| {
FunctionCallError::RespondToModel(format!("failed to flush kernel message: {err}"))
})?;
Ok(())
}
@@ -1805,18 +1740,17 @@ impl JsReplManager {
}
async fn kernel_debug_snapshot(
child: &Arc<Mutex<Child>>,
process: &Arc<UnifiedExecProcess>,
recent_stderr: &Arc<Mutex<VecDeque<String>>>,
) -> KernelDebugSnapshot {
let (pid, status) = {
let mut guard = child.lock().await;
let pid = guard.id();
let status = match guard.try_wait() {
Ok(Some(status)) => format!("exited({})", format_exit_status(status)),
Ok(None) => "running".to_string(),
Err(err) => format!("unknown ({err})"),
};
(pid, status)
let pid = process.pid();
let status = if process.has_exited() {
match process.exit_code() {
Some(code) => format!("exited({code})"),
None => "exited(unknown)".to_string(),
}
} else {
"running".to_string()
};
let stderr_tail = {
let tail = recent_stderr.lock().await;
@@ -1829,50 +1763,18 @@ impl JsReplManager {
}
}
async fn kill_kernel_child(child: &Arc<Mutex<Child>>, reason: &'static str) {
let mut guard = child.lock().await;
let pid = guard.id();
match guard.try_wait() {
Ok(Some(_)) => return,
Ok(None) => {}
Err(err) => {
warn!(
kernel_pid = ?pid,
kill_reason = reason,
error = %err,
"failed to inspect js_repl kernel before kill"
);
}
}
if let Err(err) = guard.start_kill() {
warn!(
kernel_pid = ?pid,
kill_reason = reason,
error = %err,
"failed to send kill signal to js_repl kernel"
);
async fn kill_kernel_child(process: &Arc<UnifiedExecProcess>, reason: &'static str) {
if process.has_exited() {
return;
}
match tokio::time::timeout(Duration::from_secs(2), guard.wait()).await {
Ok(Ok(_status)) => {}
Ok(Err(err)) => {
warn!(
kernel_pid = ?pid,
kill_reason = reason,
error = %err,
"failed while waiting for js_repl kernel exit"
);
}
Err(_) => {
warn!(
kernel_pid = ?pid,
kill_reason = reason,
"timed out waiting for js_repl kernel to exit after kill"
);
}
}
let pid = process.pid();
process.terminate();
warn!(
kernel_pid = ?pid,
kill_reason = reason,
"terminated js_repl kernel process"
);
}
fn truncate_id_list(ids: &[String]) -> Vec<String> {
@@ -1886,8 +1788,8 @@ impl JsReplManager {
#[allow(clippy::too_many_arguments)]
async fn read_stdout(
stdout: tokio::process::ChildStdout,
child: Arc<Mutex<Child>>,
mut stdout: broadcast::Receiver<Vec<u8>>,
process: Arc<UnifiedExecProcess>,
manager_kernel: Arc<Mutex<Option<KernelState>>>,
recent_stderr: Arc<Mutex<VecDeque<String>>>,
pending_execs: Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<ExecResultMessage>>>>,
@@ -1896,19 +1798,41 @@ impl JsReplManager {
exec_store: Arc<Mutex<HashMap<String, ExecBuffer>>>,
poll_sessions: Arc<Mutex<HashMap<String, PollSessionState>>>,
exec_to_session: Arc<Mutex<HashMap<String, String>>>,
stdin: Arc<Mutex<ChildStdin>>,
stdin: tokio::sync::mpsc::Sender<Vec<u8>>,
poll_session_id: Option<String>,
shutdown: CancellationToken,
) {
let mut reader = BufReader::new(stdout).lines();
let end_reason = loop {
let line = tokio::select! {
_ = shutdown.cancelled() => break KernelStreamEnd::Shutdown,
res = reader.next_line() => match res {
Ok(Some(line)) => line,
Ok(None) => break KernelStreamEnd::StdoutEof,
Err(err) => break KernelStreamEnd::StdoutReadError(err.to_string()),
},
let mut pending_line = Vec::new();
let mut ready_lines = VecDeque::new();
let end_reason = 'outer: loop {
let line = if let Some(line) = ready_lines.pop_front() {
line
} else {
loop {
let chunk = tokio::select! {
_ = shutdown.cancelled() => break 'outer KernelStreamEnd::Shutdown,
res = stdout.recv() => match res {
Ok(chunk) => chunk,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => {
if let Some(line) = finish_broadcast_line(&mut pending_line) {
break line;
}
break 'outer KernelStreamEnd::StdoutEof;
}
},
};
pending_line.extend_from_slice(&chunk);
let lines = drain_broadcast_lines(&mut pending_line);
if lines.is_empty() {
continue;
}
ready_lines.extend(lines);
let Some(line) = ready_lines.pop_front() else {
continue;
};
break line;
}
};
let parsed: Result<KernelToHost, _> = serde_json::from_str(&line);
@@ -2037,7 +1961,8 @@ impl JsReplManager {
});
if let Err(err) = JsReplManager::write_message(&stdin, &payload).await {
let snapshot =
JsReplManager::kernel_debug_snapshot(&child, &recent_stderr).await;
JsReplManager::kernel_debug_snapshot(&process, &recent_stderr)
.await;
warn!(
exec_id = %exec_id,
tool_call_id = %tool_call_id,
@@ -2050,7 +1975,7 @@ impl JsReplManager {
}
continue;
};
let stdin_clone = Arc::clone(&stdin);
let stdin_clone = stdin.clone();
let exec_contexts = Arc::clone(&exec_contexts);
let exec_tool_calls_for_task = Arc::clone(&exec_tool_calls);
let recent_stderr = Arc::clone(&recent_stderr);
@@ -2117,7 +2042,7 @@ impl JsReplManager {
let unexpected_snapshot = if matches!(end_reason, KernelStreamEnd::Shutdown) {
None
} else {
Some(Self::kernel_debug_snapshot(&child, &recent_stderr).await)
Some(Self::kernel_debug_snapshot(&process, &recent_stderr).await)
};
let kernel_failure_message = unexpected_snapshot.as_ref().map(|snapshot| {
with_model_kernel_failure_message(
@@ -2135,7 +2060,7 @@ impl JsReplManager {
let mut kernel = manager_kernel.lock().await;
let should_clear = kernel
.as_ref()
.is_some_and(|state| Arc::ptr_eq(&state.child, &child));
.is_some_and(|state| Arc::ptr_eq(&state.process, &process));
if should_clear {
kernel.take();
}
@@ -2156,7 +2081,7 @@ impl JsReplManager {
let mut sessions = poll_sessions.lock().await;
let should_remove = sessions
.get(poll_session_id)
.is_some_and(|state| Arc::ptr_eq(&state.kernel.child, &child));
.is_some_and(|state| Arc::ptr_eq(&state.kernel.process, &process));
if should_remove {
sessions.remove(poll_session_id)
} else {
@@ -2330,23 +2255,42 @@ impl JsReplManager {
}
async fn read_stderr(
stderr: tokio::process::ChildStderr,
mut stderr: broadcast::Receiver<Vec<u8>>,
recent_stderr: Arc<Mutex<VecDeque<String>>>,
shutdown: CancellationToken,
) {
let mut reader = BufReader::new(stderr).lines();
let mut pending_line = Vec::new();
let mut ready_lines = VecDeque::new();
loop {
let line = tokio::select! {
_ = shutdown.cancelled() => break,
res = reader.next_line() => match res {
Ok(Some(line)) => line,
Ok(None) => break,
Err(err) => {
warn!("js_repl kernel stderr ended: {err}");
break;
let line = if let Some(line) = ready_lines.pop_front() {
line
} else {
loop {
let chunk = tokio::select! {
_ = shutdown.cancelled() => return,
res = stderr.recv() => match res {
Ok(chunk) => chunk,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => {
if let Some(line) = finish_broadcast_line(&mut pending_line) {
break line;
}
return;
}
},
};
pending_line.extend_from_slice(&chunk);
let lines = drain_broadcast_lines(&mut pending_line);
if lines.is_empty() {
continue;
}
},
ready_lines.extend(lines);
let Some(line) = ready_lines.pop_front() else {
continue;
};
break line;
}
};
let trimmed = line.trim();
if !trimmed.is_empty() {
@@ -2363,6 +2307,31 @@ impl JsReplManager {
}
}
fn drain_broadcast_lines(buffer: &mut Vec<u8>) -> Vec<String> {
let mut lines = Vec::new();
loop {
let Some(pos) = buffer.iter().position(|byte| *byte == b'\n') else {
break;
};
let line = buffer.drain(..=pos).collect::<Vec<_>>();
lines.push(decode_broadcast_line(&line));
}
lines
}
/// Flush whatever partial line remains in `buffer` once the stream closes,
/// leaving the buffer empty. Returns `None` when there is nothing pending.
fn finish_broadcast_line(buffer: &mut Vec<u8>) -> Option<String> {
    if buffer.is_empty() {
        return None;
    }
    let remainder = std::mem::take(buffer);
    Some(decode_broadcast_line(&remainder))
}
/// Decode one raw line from a broadcast chunk: lossy UTF-8 conversion
/// (invalid bytes become U+FFFD), then strip any trailing `\n`/`\r`.
fn decode_broadcast_line(line: &[u8]) -> String {
    let decoded = String::from_utf8_lossy(line);
    let stripped = decoded.trim_end_matches(|c| c == '\n' || c == '\r');
    stripped.to_owned()
}
fn response_content_items(
response: &ResponseInputItem,
) -> Option<Vec<FunctionCallOutputContentItem>> {
@@ -3235,10 +3204,10 @@ mod tests {
)
.await?;
let child = {
let process = {
let guard = manager.kernel.lock().await;
let state = guard.as_ref().expect("kernel should exist after warmup");
Arc::clone(&state.child)
Arc::clone(&state.process)
};
let result = manager
@@ -3258,12 +3227,8 @@ mod tests {
assert_eq!(result, JsReplExecuteError::TimedOut);
let exit_state = {
let mut child = child.lock().await;
child.try_wait()?
};
assert!(
exit_state.is_some(),
process.has_exited(),
"timed out js_repl execution should kill previous kernel process"
);
Ok(())
@@ -3295,19 +3260,19 @@ mod tests {
)
.await?;
let child = {
let process = {
let guard = manager.kernel.lock().await;
let state = guard.as_ref().expect("kernel should exist after warmup");
Arc::clone(&state.child)
Arc::clone(&state.process)
};
JsReplManager::kill_kernel_child(&child, "test_crash").await;
JsReplManager::kill_kernel_child(&process, "test_crash").await;
tokio::time::timeout(Duration::from_secs(1), async {
loop {
let cleared = {
let guard = manager.kernel.lock().await;
guard
.as_ref()
.is_none_or(|state| !Arc::ptr_eq(&state.child, &child))
.is_none_or(|state| !Arc::ptr_eq(&state.process, &process))
};
if cleared {
return;
@@ -3361,10 +3326,10 @@ mod tests {
)
.await?;
let child = {
let process = {
let guard = manager.kernel.lock().await;
let state = guard.as_ref().expect("kernel should exist after warmup");
Arc::clone(&state.child)
Arc::clone(&state.process)
};
let err = tokio::time::timeout(
@@ -3393,11 +3358,7 @@ mod tests {
tokio::time::timeout(Duration::from_secs(1), async {
loop {
let exited = {
let mut child = child.lock().await;
child.try_wait()?.is_some()
};
if exited {
if process.has_exited() {
return Ok::<(), anyhow::Error>(());
}
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -3412,7 +3373,7 @@ mod tests {
let guard = manager.kernel.lock().await;
guard
.as_ref()
.is_none_or(|state| !Arc::ptr_eq(&state.child, &child))
.is_none_or(|state| !Arc::ptr_eq(&state.process, &process))
};
if cleared {
return;
@@ -4227,7 +4188,7 @@ console.log(out.type);
let turn = Arc::new(turn);
let manager = turn.js_repl.manager().await?;
let template_kernel = manager
.start_kernel(Arc::clone(&turn), Some(session.conversation_id), None)
.start_kernel(Arc::clone(&session), Arc::clone(&turn), None)
.await
.map_err(anyhow::Error::msg)?;

View File

@@ -9,7 +9,6 @@ caching).
use crate::error::CodexErr;
use crate::error::SandboxErr;
use crate::exec::ExecToolCallOutput;
use crate::features::Feature;
use crate::network_policy_decision::network_approval_context_from_payload;
use crate::sandboxing::SandboxManager;
use crate::tools::network_approval::DeferredNetworkApproval;
@@ -20,11 +19,11 @@ use crate::tools::network_approval::finish_immediate_network_approval;
use crate::tools::sandboxing::ApprovalCtx;
use crate::tools::sandboxing::ExecApprovalRequirement;
use crate::tools::sandboxing::SandboxAttempt;
use crate::tools::sandboxing::SandboxOverride;
use crate::tools::sandboxing::ToolCtx;
use crate::tools::sandboxing::ToolError;
use crate::tools::sandboxing::ToolRuntime;
use crate::tools::sandboxing::default_exec_approval_requirement;
use crate::tools::sandboxing::has_managed_network_requirements;
use codex_otel::ToolDecisionSource;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::NetworkPolicyRuleAction;
@@ -160,35 +159,13 @@ impl ToolOrchestrator {
}
// 2) First attempt under the selected sandbox.
let has_managed_network_requirements = turn_ctx
.config
.config_layer_stack
.requirements_toml()
.network
.is_some();
let initial_sandbox = match tool.sandbox_mode_for_first_attempt(req) {
SandboxOverride::BypassSandboxFirstAttempt => crate::exec::SandboxType::None,
SandboxOverride::NoOverride => self.sandbox.select_initial(
&turn_ctx.sandbox_policy,
tool.sandbox_preference(),
turn_ctx.windows_sandbox_level,
has_managed_network_requirements,
),
};
// Platform-specific flag gating is handled by SandboxManager::select_initial
// via crate::safety::get_platform_sandbox(..).
let use_linux_sandbox_bwrap = turn_ctx.features.enabled(Feature::UseLinuxSandboxBwrap);
let initial_attempt = SandboxAttempt {
sandbox: initial_sandbox,
policy: &turn_ctx.sandbox_policy,
enforce_managed_network: has_managed_network_requirements,
manager: &self.sandbox,
sandbox_cwd: &turn_ctx.cwd,
codex_linux_sandbox_exe: turn_ctx.codex_linux_sandbox_exe.as_ref(),
use_linux_sandbox_bwrap,
windows_sandbox_level: turn_ctx.windows_sandbox_level,
};
let has_managed_network_requirements = has_managed_network_requirements(turn_ctx);
let initial_attempt = SandboxAttempt::initial_for_turn(
&self.sandbox,
turn_ctx,
tool.sandbox_preference(),
tool.sandbox_mode_for_first_attempt(req),
);
let (first_result, first_deferred_network_approval) = Self::run_attempt(
tool,
@@ -300,7 +277,7 @@ impl ToolOrchestrator {
manager: &self.sandbox,
sandbox_cwd: &turn_ctx.cwd,
codex_linux_sandbox_exe: None,
use_linux_sandbox_bwrap,
use_linux_sandbox_bwrap: initial_attempt.use_linux_sandbox_bwrap,
windows_sandbox_level: turn_ctx.windows_sandbox_level,
};

View File

@@ -7,6 +7,7 @@
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::error::CodexErr;
use crate::features::Feature;
use crate::protocol::SandboxPolicy;
use crate::sandboxing::CommandSpec;
use crate::sandboxing::SandboxManager;
@@ -326,8 +327,46 @@ pub(crate) struct SandboxAttempt<'a> {
pub windows_sandbox_level: codex_protocol::config_types::WindowsSandboxLevel,
}
/// Whether the turn's layered configuration declares a managed-network
/// requirement (a `network` entry in the requirements TOML).
pub(crate) fn has_managed_network_requirements(turn_ctx: &TurnContext) -> bool {
    let requirements = turn_ctx.config.config_layer_stack.requirements_toml();
    requirements.network.is_some()
}
impl<'a> SandboxAttempt<'a> {
pub fn env_for(
/// Build the first-attempt sandbox configuration for a turn.
///
/// `sandbox_override` can force the first attempt to run unsandboxed
/// (`BypassSandboxFirstAttempt`); otherwise the manager selects the initial
/// sandbox type from the turn's policy, the tool's preference, the Windows
/// sandbox level, and any managed-network requirement.
pub(crate) fn initial_for_turn(
manager: &'a SandboxManager,
turn_ctx: &'a TurnContext,
preference: SandboxablePreference,
sandbox_override: SandboxOverride,
) -> Self {
// Computed once: it feeds both sandbox selection and the attempt itself.
let enforce_managed_network = has_managed_network_requirements(turn_ctx);
let sandbox = match sandbox_override {
SandboxOverride::BypassSandboxFirstAttempt => crate::exec::SandboxType::None,
SandboxOverride::NoOverride => manager.select_initial(
&turn_ctx.sandbox_policy,
preference,
turn_ctx.windows_sandbox_level,
enforce_managed_network,
),
};
Self {
sandbox,
policy: &turn_ctx.sandbox_policy,
enforce_managed_network,
manager,
sandbox_cwd: &turn_ctx.cwd,
codex_linux_sandbox_exe: turn_ctx.codex_linux_sandbox_exe.as_ref(),
use_linux_sandbox_bwrap: turn_ctx.features.enabled(Feature::UseLinuxSandboxBwrap),
windows_sandbox_level: turn_ctx.windows_sandbox_level,
}
}
pub(crate) fn env_for(
&self,
spec: CommandSpec,
network: Option<&NetworkProxy>,

View File

@@ -49,6 +49,7 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
}
pub(crate) use errors::UnifiedExecError;
pub(crate) use process::ManagedSplitProcess;
pub(crate) use process::UnifiedExecProcess;
pub(crate) const MIN_YIELD_TIME_MS: u64 = 250;

View File

@@ -5,7 +5,9 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tokio::time::Duration;
@@ -46,10 +48,17 @@ pub(crate) struct UnifiedExecProcess {
sandbox_type: SandboxType,
}
/// A spawned child process whose stdout and stderr are delivered as separate
/// streams, bundled with a sender for writing to the child's stdin.
pub(crate) struct ManagedSplitProcess {
/// Managed handle for the child (exit state, termination, buffered output).
pub(crate) process: UnifiedExecProcess,
/// Channel for queueing raw bytes to the child's stdin.
pub(crate) stdin: mpsc::Sender<Vec<u8>>,
/// Receiver of raw stdout chunks.
pub(crate) stdout_rx: broadcast::Receiver<Vec<u8>>,
/// Receiver of raw stderr chunks.
pub(crate) stderr_rx: broadcast::Receiver<Vec<u8>>,
}
impl UnifiedExecProcess {
pub(super) fn new(
pub(crate) fn new(
process_handle: ExecCommandSession,
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
initial_output_rx: broadcast::Receiver<Vec<u8>>,
sandbox_type: SandboxType,
) -> Self {
let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default()));
@@ -95,7 +104,7 @@ impl UnifiedExecProcess {
}
}
pub(super) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
pub(crate) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
self.process_handle.writer_sender()
}
@@ -113,7 +122,7 @@ impl UnifiedExecProcess {
self.process_handle.output_receiver()
}
pub(super) fn cancellation_token(&self) -> CancellationToken {
pub(crate) fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}
@@ -121,15 +130,15 @@ impl UnifiedExecProcess {
Arc::clone(&self.output_drained)
}
pub(super) fn has_exited(&self) -> bool {
pub(crate) fn has_exited(&self) -> bool {
self.process_handle.has_exited()
}
pub(super) fn exit_code(&self) -> Option<i32> {
pub(crate) fn exit_code(&self) -> Option<i32> {
self.process_handle.exit_code()
}
pub(super) fn terminate(&self) {
pub(crate) fn terminate(&self) {
self.output_closed.store(true, Ordering::Release);
self.output_closed_notify.notify_waiters();
self.process_handle.terminate();
@@ -137,6 +146,10 @@ impl UnifiedExecProcess {
self.output_task.abort();
}
/// OS process id of the underlying child, if available.
pub(crate) fn pid(&self) -> Option<u32> {
self.process_handle.pid()
}
async fn snapshot_output(&self) -> Vec<Vec<u8>> {
let guard = self.output_buffer.lock().await;
guard.snapshot_chunks()
@@ -200,8 +213,17 @@ impl UnifiedExecProcess {
let SpawnedPty {
session: process_handle,
output_rx,
mut exit_rx,
exit_rx,
} = spawned;
Self::from_process_parts(process_handle, output_rx, exit_rx, sandbox_type).await
}
pub(crate) async fn from_process_parts(
process_handle: ExecCommandSession,
output_rx: broadcast::Receiver<Vec<u8>>,
mut exit_rx: oneshot::Receiver<i32>,
sandbox_type: SandboxType,
) -> Result<Self, UnifiedExecError> {
let managed = Self::new(process_handle, output_rx, sandbox_type);
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));

View File

@@ -33,6 +33,7 @@ use crate::unified_exec::MAX_UNIFIED_EXEC_PROCESSES;
use crate::unified_exec::MAX_YIELD_TIME_MS;
use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
use crate::unified_exec::MIN_YIELD_TIME_MS;
use crate::unified_exec::ManagedSplitProcess;
use crate::unified_exec::ProcessEntry;
use crate::unified_exec::ProcessStore;
use crate::unified_exec::UnifiedExecContext;
@@ -102,6 +103,17 @@ struct PreparedProcessHandles {
tty: bool,
}
#[derive(Clone, Copy)]
/// How to wire up a child's output streams when spawning from an exec
/// environment.
enum ExecEnvSpawnMode {
/// Stdout and stderr merged into one stream; `tty` selects a PTY spawn
/// vs plain pipes.
Merged { tty: bool },
/// Stdout and stderr kept as separate streams over plain pipes (no PTY).
SplitPipe,
}
/// Result of a spawn from an exec environment, matching the requested
/// `ExecEnvSpawnMode`.
enum SpawnedExecEnvProcess {
/// Merged-output spawn (PTY or pipe).
Merged(codex_utils_pty::SpawnedPty),
/// Split stdout/stderr spawn (pipe only).
Split(codex_utils_pty::SpawnedProcessSplit),
}
impl UnifiedExecProcessManager {
pub(crate) async fn allocate_process_id(&self) -> String {
loop {
@@ -529,13 +541,61 @@ impl UnifiedExecProcessManager {
env: &ExecRequest,
tty: bool,
) -> Result<UnifiedExecProcess, UnifiedExecError> {
match Self::spawn_exec_env_process(env, ExecEnvSpawnMode::Merged { tty }).await? {
SpawnedExecEnvProcess::Merged(spawned) => {
UnifiedExecProcess::from_spawned(spawned, env.sandbox).await
}
SpawnedExecEnvProcess::Split(_) => {
unreachable!("merged spawn mode returned split process")
}
}
}
/// Spawn a child process for `env` with separate stdout/stderr pipes and
/// wrap it in the managed-process infrastructure.
///
/// Returns a `ManagedSplitProcess` bundling the managed handle, a stdin
/// sender, and per-stream output receivers.
pub(crate) async fn open_split_pipe_session_with_exec_env(
&self,
env: &ExecRequest,
) -> Result<ManagedSplitProcess, UnifiedExecError> {
match Self::spawn_exec_env_process(env, ExecEnvSpawnMode::SplitPipe).await? {
SpawnedExecEnvProcess::Merged(_) => {
// SplitPipe mode only ever yields a Split result, so this arm is
// unreachable by construction.
unreachable!("split pipe spawn mode returned merged process")
}
SpawnedExecEnvProcess::Split(spawned) => {
let codex_utils_pty::SpawnedProcessSplit {
session: process_handle,
stdout_rx,
stderr_rx,
exit_rx,
} = spawned;
// Grab the stdin writer and the combined output receiver before
// ownership of the handle moves into UnifiedExecProcess.
let stdin = process_handle.writer_sender();
let output_rx = process_handle.output_receiver();
let process = UnifiedExecProcess::from_process_parts(
process_handle,
output_rx,
exit_rx,
env.sandbox,
)
.await?;
Ok(ManagedSplitProcess {
process,
stdin,
stdout_rx,
stderr_rx,
})
}
}
}
async fn spawn_exec_env_process(
env: &ExecRequest,
mode: ExecEnvSpawnMode,
) -> Result<SpawnedExecEnvProcess, UnifiedExecError> {
let (program, args) = env
.command
.split_first()
.ok_or(UnifiedExecError::MissingCommandLine)?;
let spawn_result = if tty {
codex_utils_pty::pty::spawn_process(
let spawn_result = match mode {
ExecEnvSpawnMode::Merged { tty: true } => codex_utils_pty::pty::spawn_process(
program,
args,
env.cwd.as_path(),
@@ -543,8 +603,19 @@ impl UnifiedExecProcessManager {
&env.arg0,
)
.await
} else {
codex_utils_pty::pipe::spawn_process_no_stdin(
.map(SpawnedExecEnvProcess::Merged),
ExecEnvSpawnMode::Merged { tty: false } => {
codex_utils_pty::pipe::spawn_process_no_stdin(
program,
args,
env.cwd.as_path(),
&env.env,
&env.arg0,
)
.await
.map(SpawnedExecEnvProcess::Merged)
}
ExecEnvSpawnMode::SplitPipe => codex_utils_pty::pipe::spawn_process_split(
program,
args,
env.cwd.as_path(),
@@ -552,10 +623,10 @@ impl UnifiedExecProcessManager {
&env.arg0,
)
.await
.map(SpawnedExecEnvProcess::Split),
};
let spawned =
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
UnifiedExecProcess::from_spawned(spawned, env.sandbox).await
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))
}
pub(super) async fn open_session_with_sandbox(

View File

@@ -15,10 +15,14 @@ pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin;
pub use process::ProcessHandle;
/// Bundle of process handles plus output and exit receivers returned by spawn helpers.
pub use process::SpawnedProcess;
/// Bundle of process handles plus split stdout/stderr receivers returned by pipe spawn helpers.
pub use process::SpawnedProcessSplit;
/// Backwards-compatible alias for ProcessHandle.
pub type ExecCommandSession = ProcessHandle;
/// Backwards-compatible alias for SpawnedProcess.
pub type SpawnedPty = SpawnedProcess;
/// Spawn a non-interactive process using regular pipes and preserve split stdout/stderr streams.
pub use pipe::spawn_process_split as spawn_pipe_process_split;
/// Report whether ConPTY is available on this platform (Windows only).
pub use pty::conpty_supported;
/// Spawn a process attached to a PTY for interactive use.

View File

@@ -21,6 +21,7 @@ use tokio::task::JoinHandle;
use crate::process::ChildTerminator;
use crate::process::ProcessHandle;
use crate::process::SpawnedProcess;
use crate::process::SpawnedProcessSplit;
#[cfg(target_os = "linux")]
use libc;
@@ -73,8 +74,11 @@ fn kill_process(pid: u32) -> io::Result<()> {
}
}
async fn read_output_stream<R>(mut reader: R, output_tx: broadcast::Sender<Vec<u8>>)
where
async fn read_output_stream<R>(
mut reader: R,
merged_output_tx: broadcast::Sender<Vec<u8>>,
stream_output_tx: broadcast::Sender<Vec<u8>>,
) where
R: AsyncRead + Unpin,
{
let mut buf = vec![0u8; 8_192];
@@ -82,7 +86,9 @@ where
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
let _ = output_tx.send(buf[..n].to_vec());
let chunk = buf[..n].to_vec();
let _ = merged_output_tx.send(chunk.clone());
let _ = stream_output_tx.send(chunk);
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(_) => break,
@@ -103,7 +109,7 @@ async fn spawn_process_with_stdin_mode(
env: &HashMap<String, String>,
arg0: &Option<String>,
stdin_mode: PipeStdinMode,
) -> Result<SpawnedProcess> {
) -> Result<SpawnedProcessSplit> {
if program.is_empty() {
anyhow::bail!("missing program for pipe spawn");
}
@@ -159,6 +165,10 @@ async fn spawn_process_with_stdin_mode(
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
let initial_output_rx = output_tx.subscribe();
let (stdout_tx, _) = broadcast::channel::<Vec<u8>>(256);
let initial_stdout_rx = stdout_tx.subscribe();
let (stderr_tx, _) = broadcast::channel::<Vec<u8>>(256);
let initial_stderr_rx = stderr_tx.subscribe();
let writer_handle = if let Some(stdin) = stdin {
let writer = Arc::new(tokio::sync::Mutex::new(stdin));
@@ -176,14 +186,16 @@ async fn spawn_process_with_stdin_mode(
let stdout_handle = stdout.map(|stdout| {
let output_tx = output_tx.clone();
let stdout_tx = stdout_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stdout), output_tx).await;
read_output_stream(BufReader::new(stdout), output_tx, stdout_tx).await;
})
});
let stderr_handle = stderr.map(|stderr| {
let output_tx = output_tx.clone();
let stderr_tx = stderr_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stderr), output_tx).await;
read_output_stream(BufReader::new(stderr), output_tx, stderr_tx).await;
})
});
let mut reader_abort_handles = Vec::new();
@@ -223,6 +235,7 @@ async fn spawn_process_with_stdin_mode(
writer_tx,
output_tx,
initial_output_rx,
Some(pid),
Box::new(PipeChildTerminator {
#[cfg(windows)]
pid,
@@ -238,9 +251,11 @@ async fn spawn_process_with_stdin_mode(
None,
);
Ok(SpawnedProcess {
let _ = output_rx;
Ok(SpawnedProcessSplit {
session: handle,
output_rx,
stdout_rx: initial_stdout_rx,
stderr_rx: initial_stderr_rx,
exit_rx,
})
}
@@ -253,7 +268,14 @@ pub async fn spawn_process(
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await
let spawned =
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await?;
let output_rx = spawned.session.output_receiver();
Ok(SpawnedProcess {
session: spawned.session,
output_rx,
exit_rx: spawned.exit_rx,
})
}
/// Spawn a process using regular pipes, but close stdin immediately.
@@ -264,5 +286,23 @@ pub async fn spawn_process_no_stdin(
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Null).await
let spawned =
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Null).await?;
let output_rx = spawned.session.output_receiver();
Ok(SpawnedProcess {
session: spawned.session,
output_rx,
exit_rx: spawned.exit_rx,
})
}
/// Spawn a process using regular pipes (no PTY), preserving split stdout/stderr streams.
pub async fn spawn_process_split(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcessSplit> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await
}

View File

@@ -31,6 +31,7 @@ impl fmt::Debug for PtyHandles {
pub struct ProcessHandle {
writer_tx: mpsc::Sender<Vec<u8>>,
output_tx: broadcast::Sender<Vec<u8>>,
pid: Option<u32>,
killer: StdMutex<Option<Box<dyn ChildTerminator>>>,
reader_handle: StdMutex<Option<JoinHandle<()>>>,
reader_abort_handles: StdMutex<Vec<AbortHandle>>,
@@ -55,6 +56,7 @@ impl ProcessHandle {
writer_tx: mpsc::Sender<Vec<u8>>,
output_tx: broadcast::Sender<Vec<u8>>,
initial_output_rx: broadcast::Receiver<Vec<u8>>,
pid: Option<u32>,
killer: Box<dyn ChildTerminator>,
reader_handle: JoinHandle<()>,
reader_abort_handles: Vec<AbortHandle>,
@@ -68,6 +70,7 @@ impl ProcessHandle {
Self {
writer_tx,
output_tx,
pid,
killer: StdMutex::new(Some(killer)),
reader_handle: StdMutex::new(Some(reader_handle)),
reader_abort_handles: StdMutex::new(reader_abort_handles),
@@ -91,6 +94,11 @@ impl ProcessHandle {
self.output_tx.subscribe()
}
/// Returns the child process ID when available.
pub fn pid(&self) -> Option<u32> {
self.pid
}
/// True if the child process has exited.
pub fn has_exited(&self) -> bool {
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
@@ -108,6 +116,14 @@ impl ProcessHandle {
let _ = killer.kill();
}
}
self.exit_status
.store(true, std::sync::atomic::Ordering::SeqCst);
match self.exit_code.lock() {
Ok(mut guard) if guard.is_none() => {
*guard = Some(-1);
}
Ok(_) | Err(_) => {}
}
if let Ok(mut h) = self.reader_handle.lock() {
if let Some(handle) = h.take() {
@@ -145,3 +161,12 @@ pub struct SpawnedProcess {
pub output_rx: broadcast::Receiver<Vec<u8>>,
pub exit_rx: oneshot::Receiver<i32>,
}
/// Return value from split-output spawn helpers.
#[derive(Debug)]
pub struct SpawnedProcessSplit {
pub session: ProcessHandle,
pub stdout_rx: broadcast::Receiver<Vec<u8>>,
pub stderr_rx: broadcast::Receiver<Vec<u8>>,
pub exit_rx: oneshot::Receiver<i32>,
}

View File

@@ -103,6 +103,7 @@ pub async fn spawn_process(
}
let mut child = pair.slave.spawn_command(command_builder)?;
let pid = child.process_id();
#[cfg(unix)]
// portable-pty establishes the spawned PTY child as a new session leader on
// Unix, so PID == PGID and we can reuse the pipe backend's process-group
@@ -178,6 +179,7 @@ pub async fn spawn_process(
writer_tx,
output_tx,
initial_output_rx,
pid,
Box::new(PtyChildTerminator {
killer,
#[cfg(unix)]

View File

@@ -4,6 +4,7 @@ use std::path::Path;
use pretty_assertions::assert_eq;
use crate::spawn_pipe_process;
use crate::spawn_pipe_process_split;
use crate::spawn_pty_process;
fn find_python() -> Option<String> {
@@ -93,6 +94,29 @@ async fn collect_output_until_exit(
}
}
async fn collect_output_until_closed(
mut output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
timeout_ms: u64,
) -> Vec<u8> {
let mut collected = Vec::new();
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
loop {
let now = tokio::time::Instant::now();
if now >= deadline {
return collected;
}
let remaining = deadline.saturating_duration_since(now);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => collected.extend_from_slice(&chunk),
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) | Err(_) => {
return collected;
}
}
}
}
async fn wait_for_python_repl_ready(
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
@@ -275,6 +299,46 @@ async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_split_preserves_stdout_and_stderr() -> anyhow::Result<()> {
let Some(python) = find_python() else {
eprintln!("python not found; skipping pipe_process_split_preserves_stdout_and_stderr");
return Ok(());
};
let args = vec![
"-u".to_string(),
"-c".to_string(),
"import sys; print('stdout-line'); sys.stderr.write('stderr-line\\n'); sys.stderr.flush()"
.to_string(),
];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pipe_process_split(&python, &args, Path::new("."), &env_map, &None).await?;
let merged_rx = spawned.session.output_receiver();
let stdout_task =
tokio::spawn(async move { collect_output_until_closed(spawned.stdout_rx, 5_000).await });
let stderr_task =
tokio::spawn(async move { collect_output_until_closed(spawned.stderr_rx, 5_000).await });
let merged_task =
tokio::spawn(
async move { collect_output_until_exit(merged_rx, spawned.exit_rx, 5_000).await },
);
let stdout = stdout_task.await?;
let stderr = stderr_task.await?;
let (merged, code) = merged_task.await?;
assert_eq!(code, 0, "expected python -c to exit cleanly");
assert_eq!(String::from_utf8_lossy(&stdout), "stdout-line\n");
assert_eq!(String::from_utf8_lossy(&stderr), "stderr-line\n");
let merged = String::from_utf8_lossy(&merged);
assert!(merged.contains("stdout-line"));
assert!(merged.contains("stderr-line"));
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> {