Compare commits

...

22 Commits

Author SHA1 Message Date
jif-oai
b3eb9355dc Comments 2025-10-16 18:46:19 +01:00
jif-oai
aeb82f79c0 Adding a bit of docs 2025-10-16 12:56:02 +01:00
jif-oai
802082f748 R3 2025-10-16 12:49:03 +01:00
jif-oai
8327dc047b R2 2025-10-16 12:28:22 +01:00
jif-oai
d09165383f R1 2025-10-16 12:28:02 +01:00
jif-oai
65be622e9f Fix tests 2 2025-10-15 15:18:26 +01:00
jif-oai
b7e834a02b Fix tests 2025-10-15 12:55:22 +01:00
jif-oai
9865a0cfe1 RV7 2025-10-15 12:26:08 +01:00
jif-oai
6755b3b3d8 Merge remote-tracking branch 'origin/main' into jif/sandbox-unified-exec
# Conflicts:
#	codex-rs/core/src/executor/runner.rs
2025-10-15 12:20:34 +01:00
jif-oai
a55fc6a88f RV6 2025-10-15 12:19:53 +01:00
jif-oai
b622a1c850 RV5 2025-10-15 12:10:32 +01:00
jif-oai
53f07c304b RV4 2025-10-15 11:49:55 +01:00
jif-oai
1e2f6f21e2 RV3 2025-10-15 11:46:06 +01:00
jif-oai
703c3825ce RV2 2025-10-15 11:40:07 +01:00
jif-oai
f61c7aa160 RV1 2025-10-15 11:15:52 +01:00
jif-oai
ecac194a66 Merge branch 'main' into jif/sandbox-unified-exec 2025-10-14 18:20:23 +01:00
jimmyfraiture
b3cf2a4b41 Fix 3 2025-10-09 14:37:45 +01:00
jimmyfraiture
f5c626b2e0 Fix 2 2025-10-09 14:31:47 +01:00
jimmyfraiture
30ae983c3a Fix 1 2025-10-09 14:21:11 +01:00
jimmyfraiture
b47d54d57c V3 2025-10-09 14:15:20 +01:00
jimmyfraiture
29a2305025 V2 2025-10-09 13:51:25 +01:00
jimmyfraiture
3aadc93924 feat: sandboxing for unified exec 2025-10-09 13:34:17 +01:00
16 changed files with 1136 additions and 587 deletions

View File

@@ -472,6 +472,7 @@ impl Session {
turn_context.sandbox_policy.clone(),
turn_context.cwd.clone(),
config.codex_linux_sandbox_exe.clone(),
None,
)),
};
@@ -1073,16 +1074,6 @@ impl Session {
.map_err(FunctionCallError::RespondToModel)
}
pub(crate) async fn run_unified_exec_request(
&self,
request: crate::unified_exec::UnifiedExecRequest<'_>,
) -> Result<crate::unified_exec::UnifiedExecResult, crate::unified_exec::UnifiedExecError> {
self.services
.unified_exec_manager
.handle_request(request)
.await
}
pub async fn interrupt_task(self: &Arc<Self>) {
info!("interrupt received: abort current task, if any");
self.abort_all_tasks(TurnAbortReason::Interrupted).await;
@@ -2771,6 +2762,7 @@ mod tests {
turn_context.sandbox_policy.clone(),
turn_context.cwd.clone(),
None,
None,
)),
};
let session = Session {
@@ -2839,6 +2831,7 @@ mod tests {
config.sandbox_policy.clone(),
config.cwd.clone(),
None,
None,
)),
};
let session = Arc::new(Session {

View File

@@ -18,13 +18,14 @@ use tokio::process::Child;
use crate::error::CodexErr;
use crate::error::Result;
use crate::error::SandboxErr;
use crate::landlock::spawn_command_under_linux_sandbox;
use crate::executor::SandboxLaunch;
use crate::executor::SandboxLaunchError;
use crate::executor::build_launch_for_sandbox;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecCommandOutputDeltaEvent;
use crate::protocol::ExecOutputStream;
use crate::protocol::SandboxPolicy;
use crate::seatbelt::spawn_command_under_seatbelt;
use crate::spawn::StdioPolicy;
use crate::spawn::spawn_child_async;
@@ -86,58 +87,36 @@ pub async fn process_exec_tool_call(
sandbox_cwd: &Path,
codex_linux_sandbox_exe: &Option<PathBuf>,
stdout_stream: Option<StdoutStream>,
) -> Result<ExecToolCallOutput> {
let launch = build_launch_for_sandbox(
sandbox_type,
&params.command,
sandbox_policy,
sandbox_cwd,
codex_linux_sandbox_exe.as_ref(),
)
.map_err(CodexErr::from)?;
execute_sandbox_launch(params, launch, sandbox_type, sandbox_policy, stdout_stream).await
}
pub(crate) async fn execute_sandbox_launch(
params: ExecParams,
launch: SandboxLaunch,
sandbox_type: SandboxType,
sandbox_policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
) -> Result<ExecToolCallOutput> {
let start = Instant::now();
let timeout_duration = params.timeout_duration();
let raw_output_result: std::result::Result<RawExecToolCallOutput, CodexErr> = match sandbox_type
{
SandboxType::None => exec(params, sandbox_policy, stdout_stream.clone()).await,
SandboxType::MacosSeatbelt => {
let ExecParams {
command,
cwd: command_cwd,
env,
..
} = params;
let child = spawn_command_under_seatbelt(
command,
command_cwd,
sandbox_policy,
sandbox_cwd,
StdioPolicy::RedirectForShellTool,
env,
)
.await?;
consume_truncated_output(child, timeout_duration, stdout_stream.clone()).await
}
SandboxType::LinuxSeccomp => {
let ExecParams {
command,
cwd: command_cwd,
env,
..
} = params;
let codex_linux_sandbox_exe = codex_linux_sandbox_exe
.as_ref()
.ok_or(CodexErr::LandlockSandboxExecutableNotProvided)?;
let child = spawn_command_under_linux_sandbox(
codex_linux_sandbox_exe,
command,
command_cwd,
sandbox_policy,
sandbox_cwd,
StdioPolicy::RedirectForShellTool,
env,
)
.await?;
consume_truncated_output(child, timeout_duration, stdout_stream).await
}
};
let raw_output_result = spawn_with_launch(params, launch, sandbox_policy, stdout_stream).await;
let duration = start.elapsed();
finalize_exec_result(raw_output_result, sandbox_type, duration)
}
fn finalize_exec_result(
raw_output_result: std::result::Result<RawExecToolCallOutput, CodexErr>,
sandbox_type: SandboxType,
duration: Duration,
) -> Result<ExecToolCallOutput> {
match raw_output_result {
Ok(raw_output) => {
#[allow(unused_mut)]
@@ -192,12 +171,73 @@ pub async fn process_exec_tool_call(
}
}
async fn spawn_with_launch(
params: ExecParams,
launch: SandboxLaunch,
sandbox_policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
) -> std::result::Result<RawExecToolCallOutput, CodexErr> {
let ExecParams {
command: _,
cwd,
timeout_ms,
mut env,
with_escalated_permissions,
justification,
} = params;
let SandboxLaunch {
program,
args,
env: launch_env,
..
} = launch;
env.extend(launch_env);
let mut command = Vec::with_capacity(1 + args.len());
command.push(program);
command.extend(args);
let updated_params = ExecParams {
command,
cwd,
timeout_ms,
env,
with_escalated_permissions,
justification,
};
exec(updated_params, sandbox_policy, stdout_stream).await
}
pub(crate) mod errors {
use super::CodexErr;
use super::SandboxLaunchError;
impl From<SandboxLaunchError> for CodexErr {
fn from(err: SandboxLaunchError) -> Self {
match err {
SandboxLaunchError::MissingCommandLine => CodexErr::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"command args are empty",
)),
SandboxLaunchError::MissingLinuxSandboxExecutable => {
CodexErr::LandlockSandboxExecutableNotProvided
}
}
}
}
}
/// We don't have a fully deterministic way to tell if our command failed
/// because of the sandbox - a command in the user's zshrc file might hit an
/// error, but the command itself might fail or succeed for other reasons.
/// For now, we conservatively check for well known command failure exit codes and
/// also look for common sandbox denial keywords in the command output.
fn is_likely_sandbox_denied(sandbox_type: SandboxType, exec_output: &ExecToolCallOutput) -> bool {
pub(crate) fn is_likely_sandbox_denied(
sandbox_type: SandboxType,
exec_output: &ExecToolCallOutput,
) -> bool {
if sandbox_type == SandboxType::None || exec_output.exit_code == 0 {
return false;
}

View File

@@ -27,9 +27,12 @@ pub(crate) struct ExecCommandSession {
/// Tracks whether the underlying process has exited.
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Captures the process exit code once it becomes available.
exit_code: std::sync::Arc<StdMutex<Option<i32>>>,
}
impl ExecCommandSession {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
writer_tx: mpsc::Sender<Vec<u8>>,
output_tx: broadcast::Sender<Vec<u8>>,
@@ -38,6 +41,7 @@ impl ExecCommandSession {
writer_handle: JoinHandle<()>,
wait_handle: JoinHandle<()>,
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
exit_code: std::sync::Arc<StdMutex<Option<i32>>>,
) -> (Self, broadcast::Receiver<Vec<u8>>) {
let initial_output_rx = output_tx.subscribe();
(
@@ -49,6 +53,7 @@ impl ExecCommandSession {
writer_handle: StdMutex::new(Some(writer_handle)),
wait_handle: StdMutex::new(Some(wait_handle)),
exit_status,
exit_code,
},
initial_output_rx,
)
@@ -65,6 +70,10 @@ impl ExecCommandSession {
pub(crate) fn has_exited(&self) -> bool {
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
}
pub(crate) fn exit_code(&self) -> Option<i32> {
self.exit_code.lock().ok().and_then(|guard| *guard)
}
}
impl Drop for ExecCommandSession {

View File

@@ -1,16 +1,7 @@
use std::collections::HashMap;
use std::io::ErrorKind;
use std::io::Read;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU32;
use portable_pty::CommandBuilder;
use portable_pty::PtySize;
use portable_pty::native_pty_system;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Duration;
use tokio::time::Instant;
@@ -20,6 +11,7 @@ use crate::exec_command::exec_command_params::ExecCommandParams;
use crate::exec_command::exec_command_params::WriteStdinParams;
use crate::exec_command::exec_command_session::ExecCommandSession;
use crate::exec_command::session_id::SessionId;
use crate::pty::spawn_pty_process;
use crate::truncate::truncate_middle;
#[derive(Debug, Default)]
@@ -242,102 +234,12 @@ async fn create_exec_command_session(
login,
} = params;
// Use the native pty implementation for the system
let pty_system = native_pty_system();
// Create a new pty
let pair = pty_system.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})?;
// Spawn a shell into the pty
let mut command_builder = CommandBuilder::new(shell);
let shell_mode_opt = if login { "-lc" } else { "-c" };
command_builder.arg(shell_mode_opt);
command_builder.arg(cmd);
let args = vec![shell_mode_opt.to_string(), cmd];
let mut child = pair.slave.spawn_command(command_builder)?;
// Obtain a killer that can signal the process independently of `.wait()`.
let killer = child.clone_killer();
// Channel to forward write requests to the PTY writer.
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
// Broadcast for streaming PTY output to readers: subscribers receive from subscription time.
let (output_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(256);
// Reader task: drain PTY and forward chunks to output channel.
let mut reader = pair.master.try_clone_reader()?;
let output_tx_clone = output_tx.clone();
let reader_handle = tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break, // EOF
Ok(n) => {
// Forward to broadcast; best-effort if there are subscribers.
let _ = output_tx_clone.send(buf[..n].to_vec());
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => {
// Retry on EINTR
continue;
}
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
// We're in a blocking thread; back off briefly and retry.
std::thread::sleep(Duration::from_millis(5));
continue;
}
Err(_) => break,
}
}
});
// Writer task: apply stdin writes to the PTY writer.
let writer = pair.master.take_writer()?;
let writer = Arc::new(StdMutex::new(writer));
let writer_handle = tokio::spawn({
let writer = writer.clone();
async move {
while let Some(bytes) = writer_rx.recv().await {
let writer = writer.clone();
// Perform blocking write on a blocking thread.
let _ = tokio::task::spawn_blocking(move || {
if let Ok(mut guard) = writer.lock() {
use std::io::Write;
let _ = guard.write_all(&bytes);
let _ = guard.flush();
}
})
.await;
}
}
});
// Keep the child alive until it exits, then signal exit code.
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = exit_status.clone();
let wait_handle = tokio::task::spawn_blocking(move || {
let code = match child.wait() {
Ok(status) => status.exit_code() as i32,
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
let _ = exit_tx.send(code);
});
// Create and store the session with channels.
let (session, initial_output_rx) = ExecCommandSession::new(
writer_tx,
output_tx,
killer,
reader_handle,
writer_handle,
wait_handle,
exit_status,
);
Ok((session, initial_output_rx, exit_rx))
let env = HashMap::new();
let spawned = spawn_pty_process(&shell, &args, &env).await?;
Ok((spawned.session, spawned.output_rx, spawned.exit_rx))
}
#[cfg(test)]

View File

@@ -11,6 +11,7 @@ use crate::function_tool::FunctionCallError;
pub(crate) enum ExecutionMode {
Shell,
InteractiveShell,
ApplyPatch(ApplyPatchExec),
}
@@ -36,7 +37,7 @@ static APPLY_PATCH_BACKEND: ApplyPatchBackend = ApplyPatchBackend;
pub(crate) fn backend_for_mode(mode: &ExecutionMode) -> &'static dyn ExecutionBackend {
match mode {
ExecutionMode::Shell => &SHELL_BACKEND,
ExecutionMode::Shell | ExecutionMode::InteractiveShell => &SHELL_BACKEND,
ExecutionMode::ApplyPatch(_) => &APPLY_PATCH_BACKEND,
}
}
@@ -52,7 +53,7 @@ impl ExecutionBackend for ShellBackend {
_config: &ExecutorConfig,
) -> Result<ExecParams, FunctionCallError> {
match mode {
ExecutionMode::Shell => Ok(params),
ExecutionMode::Shell | ExecutionMode::InteractiveShell => Ok(params),
_ => Err(FunctionCallError::RespondToModel(
"shell backend invoked with non-shell mode".to_string(),
)),
@@ -97,9 +98,11 @@ impl ExecutionBackend for ApplyPatchBackend {
justification: params.justification,
})
}
ExecutionMode::Shell => Err(FunctionCallError::RespondToModel(
"apply_patch backend invoked without patch context".to_string(),
)),
ExecutionMode::Shell | ExecutionMode::InteractiveShell => {
Err(FunctionCallError::RespondToModel(
"apply_patch backend invoked without patch context".to_string(),
))
}
}
}

View File

@@ -1,13 +1,46 @@
//! Executor: centralized sandbox policy, approvals, and execution planning.
//!
//! Purpose and responsibilities
//! - Normalizes permode parameters via backends (`backends.rs`).
//! - Selects sandbox placement and handles approvals (`sandbox.rs`).
//! - Produces an `ExecutionPlan` (single source of truth for policy) that
//! callers can either execute directly via `Executor::run` (nonPTY, piped),
//! or consume piecemeal (e.g., Unified Exec) to launch with a PTY while
//! retaining consistent policy decisions.
//!
//! Key types
//! - `ExecutionMode`: `Shell`, `InteractiveShell`, `ApplyPatch`.
//! - `ExecutionRequest`: inputs + mode + stdout streaming preference.
//! - `ExecutionPlan`: immutable snapshot of the policy decision and helpers to
//! build a `SandboxLaunch` and retry without a sandbox when approved.
//! - `SandboxLaunch`: concrete program/args/env to execute under the chosen
//! sandbox.
//!
//! Typical flows
//! - NonPTY (piped): `Executor::run(request, …)` handles plan → launch →
//! execution and postprocessing, including converting sandbox failures into
//! userfacing messages.
//! - PTY (Unified Exec): build the plan with `prepare_execution_plan` and then
//! use `ExecutionPlan::attempt_with_retry_if` to drive the spawn with
//! `SandboxLaunch`; PTY I/O and buffering remain the callers responsibility.
//!
//! This separation keeps sandbox logic and user interaction consistent while
//! allowing different transports (piped vs PTY) to manage their own lifecycles.
mod backends;
mod cache;
mod runner;
mod sandbox;
pub(crate) use backends::ExecutionMode;
pub(crate) use runner::ExecutionPlan;
pub(crate) use runner::ExecutionRequest;
pub(crate) use runner::Executor;
pub(crate) use runner::ExecutorConfig;
pub(crate) use runner::normalize_exec_result;
pub(crate) use sandbox::SandboxLaunch;
pub(crate) use sandbox::SandboxLaunchError;
pub(crate) use sandbox::build_launch_for_sandbox;
pub(crate) mod linkers {
use crate::exec::ExecParams;
@@ -45,6 +78,7 @@ pub(crate) mod linkers {
pub mod errors {
use crate::error::CodexErr;
use crate::executor::SandboxLaunchError;
use crate::function_tool::FunctionCallError;
use thiserror::Error;
@@ -61,4 +95,10 @@ pub mod errors {
FunctionCallError::RespondToModel(msg.into()).into()
}
}
impl From<SandboxLaunchError> for ExecError {
fn from(err: SandboxLaunchError) -> Self {
CodexErr::from(err).into()
}
}
}

View File

@@ -1,3 +1,4 @@
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
@@ -15,21 +16,27 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::StreamOutput;
use crate::exec::process_exec_tool_call;
use crate::exec::execute_sandbox_launch;
use crate::executor::errors::ExecError;
use crate::executor::sandbox::RetrySandboxContext;
use crate::executor::sandbox::SandboxDecision;
use crate::executor::sandbox::SandboxLaunch;
use crate::executor::sandbox::SandboxLaunchError;
use crate::executor::sandbox::build_launch_for_sandbox;
use crate::executor::sandbox::select_sandbox;
use crate::function_tool::FunctionCallError;
use crate::protocol::AskForApproval;
use crate::protocol::ReviewDecision;
use crate::protocol::SandboxPolicy;
use crate::shell;
use crate::tools::context::ExecCommandContext;
use codex_otel::otel_event_manager::ToolDecisionSource;
#[derive(Clone, Debug)]
pub(crate) struct ExecutorConfig {
pub(crate) sandbox_policy: SandboxPolicy,
pub(crate) sandbox_cwd: PathBuf,
// Path to codex-linux-sandbox executable (Linux-only). Used by initial_launch when selecting Linux sandbox.
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
// Path to the codex binary itself (used by apply_patch backend to self-invoke when needed).
pub(crate) codex_exe: Option<PathBuf>,
}
@@ -37,16 +44,134 @@ impl ExecutorConfig {
pub(crate) fn new(
sandbox_policy: SandboxPolicy,
sandbox_cwd: PathBuf,
codex_linux_sandbox_exe: Option<PathBuf>,
codex_exe: Option<PathBuf>,
) -> Self {
let codex_exe = codex_exe.or_else(|| derive_codex_exe(&codex_linux_sandbox_exe));
Self {
sandbox_policy,
sandbox_cwd,
codex_linux_sandbox_exe,
codex_exe,
}
}
}
fn derive_codex_exe(sandbox_exe: &Option<PathBuf>) -> Option<PathBuf> {
sandbox_exe.as_ref().and_then(|path| {
let stem_matches_sandbox = path
.file_stem()
.and_then(|stem| stem.to_str())
.is_some_and(|stem| stem == "codex-linux-sandbox");
if stem_matches_sandbox {
None
} else {
Some(path.clone())
}
})
}
pub(crate) struct ExecutionPlan {
request: ExecutionRequest,
config: ExecutorConfig,
sandbox_decision: SandboxDecision,
stdout_stream: Option<StdoutStream>,
context: ExecCommandContext,
}
impl ExecutionPlan {
pub(crate) fn request(&self) -> &ExecutionRequest {
&self.request
}
pub(crate) fn config(&self) -> &ExecutorConfig {
&self.config
}
pub(crate) fn stdout_stream(&self) -> Option<StdoutStream> {
self.stdout_stream.clone()
}
pub(crate) fn initial_launch(&self) -> Result<SandboxLaunch, SandboxLaunchError> {
build_launch_for_sandbox(
self.sandbox_decision.initial_sandbox,
&self.request.params.command,
&self.config.sandbox_policy,
&self.config.sandbox_cwd,
self.config.codex_linux_sandbox_exe.as_ref(),
)
}
pub(crate) fn retry_launch(&self) -> Result<SandboxLaunch, SandboxLaunchError> {
build_launch_for_sandbox(
SandboxType::None,
&self.request.params.command,
&self.config.sandbox_policy,
&self.config.sandbox_cwd,
None,
)
}
pub(crate) fn approval_command(&self) -> &[String] {
&self.request.approval_command
}
pub(crate) async fn prompt_retry_without_sandbox(
&self,
session: &Session,
failure_message: impl Into<String>,
) -> bool {
if !self.sandbox_decision.escalate_on_failure {
return false;
}
let approval = crate::executor::sandbox::request_retry_without_sandbox(
session,
failure_message.into(),
self.approval_command(),
self.request.params.cwd.clone(),
RetrySandboxContext {
sub_id: &self.context.sub_id,
call_id: &self.context.call_id,
tool_name: &self.context.tool_name,
otel_event_manager: &self.context.otel_event_manager,
},
)
.await;
approval.is_some()
}
/// Like `attempt_with_retry`, but only retries if `should_retry(err)` returns true.
pub(crate) async fn attempt_with_retry_if<R, E, Fut, Attempt, Should>(
&self,
session: &Session,
attempt: Attempt,
should_retry: Should,
) -> Result<R, E>
where
Attempt: Fn(SandboxLaunch) -> Fut,
Fut: Future<Output = Result<R, E>>,
Should: Fn(&E) -> bool,
E: From<SandboxLaunchError> + std::fmt::Display,
{
let initial_launch = self.initial_launch().map_err(E::from)?;
match attempt(initial_launch).await {
Ok(result) => Ok(result),
Err(err) if self.sandbox_decision.escalate_on_failure && should_retry(&err) => {
let failure = format!("Execution failed: {err}");
if self.prompt_retry_without_sandbox(session, failure).await {
let retry_launch = self.retry_launch().map_err(E::from)?;
attempt(retry_launch).await
} else {
Err(err)
}
}
Err(err) => Err(err),
}
}
}
/// Coordinates sandbox selection, backend-specific preparation, and command
/// execution for tool calls requested by the model.
pub(crate) struct Executor {
@@ -62,26 +187,21 @@ impl Executor {
}
}
/// Updates the sandbox policy and working directory used for future
/// executions without recreating the executor.
pub(crate) fn update_environment(&self, sandbox_policy: SandboxPolicy, sandbox_cwd: PathBuf) {
if let Ok(mut cfg) = self.config.write() {
cfg.sandbox_policy = sandbox_policy;
cfg.sandbox_cwd = sandbox_cwd;
}
pub(crate) fn record_session_approval(&self, command: Vec<String>) {
self.approval_cache.insert(command);
}
/// Runs a prepared execution request end-to-end: prepares parameters, decides on
/// sandbox placement (prompting the user when necessary), launches the command,
/// and lets the backend post-process the final output.
pub(crate) async fn run(
pub(crate) async fn prepare_execution_plan(
&self,
mut request: ExecutionRequest,
session: &Session,
approval_policy: AskForApproval,
context: &ExecCommandContext,
) -> Result<ExecToolCallOutput, ExecError> {
if matches!(request.mode, ExecutionMode::Shell) {
) -> Result<ExecutionPlan, ExecError> {
if matches!(
request.mode,
ExecutionMode::Shell | ExecutionMode::InteractiveShell
) {
request.params =
maybe_translate_shell_command(request.params, session, request.use_shell_profile);
}
@@ -104,6 +224,12 @@ impl Executor {
.prepare(request.params, &request.mode, &config)
.map_err(ExecError::from)?;
let config = self
.config
.read()
.map_err(|_| ExecError::rejection("executor config poisoned"))?
.clone();
// Step 3: Decide sandbox placement, prompting for approval when needed.
let sandbox_decision = select_sandbox(
&request,
@@ -120,118 +246,71 @@ impl Executor {
self.approval_cache.insert(request.approval_command.clone());
}
// Step 4: Launch the command within the chosen sandbox.
let first_attempt = self
.spawn(
request.params.clone(),
sandbox_decision.initial_sandbox,
&config,
stdout_stream.clone(),
Ok(ExecutionPlan {
request,
config,
sandbox_decision,
stdout_stream,
context: context.clone(),
})
}
/// Updates the sandbox policy and working directory used for future
/// executions without recreating the executor.
pub(crate) fn update_environment(&self, sandbox_policy: SandboxPolicy, sandbox_cwd: PathBuf) {
if let Ok(mut cfg) = self.config.write() {
cfg.sandbox_policy = sandbox_policy;
cfg.sandbox_cwd = sandbox_cwd;
}
}
/// Runs a prepared execution request end-to-end: prepares parameters, decides on
/// sandbox placement (prompting the user when necessary), launches the command,
/// and lets the backend post-process the final output.
pub(crate) async fn run(
&self,
request: ExecutionRequest,
session: &Session,
approval_policy: AskForApproval,
context: &ExecCommandContext,
) -> Result<ExecToolCallOutput, ExecError> {
let plan = self
.prepare_execution_plan(request, session, approval_policy, context)
.await?;
let stdout_stream = plan.stdout_stream();
let sandbox_policy = plan.config().sandbox_policy.clone();
// Drive attempts via the shared helper, but do not retry on timeouts.
let result: Result<ExecToolCallOutput, CodexErr> = plan
.attempt_with_retry_if(
session,
|launch| {
let params = plan.request().params.clone();
let sandbox = plan.sandbox_decision.initial_sandbox;
let policy = sandbox_policy.clone();
let stream = stdout_stream.clone();
async move { execute_sandbox_launch(params, launch, sandbox, &policy, stream).await }
},
|err: &CodexErr| { !matches!(err, CodexErr::Sandbox(SandboxErr::Timeout { .. })) },
)
.await;
// Step 5: Handle sandbox outcomes, optionally escalating to an unsandboxed retry.
match first_attempt {
match result {
Ok(output) => Ok(output),
Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => {
Err(CodexErr::Sandbox(SandboxErr::Timeout { output }).into())
Err(ExecError::Codex(CodexErr::Sandbox(SandboxErr::Timeout {
output,
})))
}
Err(CodexErr::Sandbox(error)) => {
if sandbox_decision.escalate_on_failure {
self.retry_without_sandbox(
&request,
&config,
session,
context,
stdout_stream,
error,
)
.await
} else {
let message = sandbox_failure_message(error);
Err(ExecError::rejection(message))
}
// Convert non-timeout sandbox errors into user-facing rejection messages.
let message = sandbox_failure_message(error);
Err(ExecError::rejection(message))
}
Err(err) => Err(err.into()),
}
}
/// Fallback path invoked when a sandboxed run is denied so the user can
/// approve rerunning without isolation.
async fn retry_without_sandbox(
&self,
request: &ExecutionRequest,
config: &ExecutorConfig,
session: &Session,
context: &ExecCommandContext,
stdout_stream: Option<StdoutStream>,
sandbox_error: SandboxErr,
) -> Result<ExecToolCallOutput, ExecError> {
session
.notify_background_event(
&context.sub_id,
format!("Execution failed: {sandbox_error}"),
)
.await;
let decision = session
.request_command_approval(
context.sub_id.to_string(),
context.call_id.to_string(),
request.approval_command.clone(),
request.params.cwd.clone(),
Some("command failed; retry without sandbox?".to_string()),
)
.await;
context.otel_event_manager.tool_decision(
&context.tool_name,
&context.call_id,
decision,
ToolDecisionSource::User,
);
match decision {
ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {
if matches!(decision, ReviewDecision::ApprovedForSession) {
self.approval_cache.insert(request.approval_command.clone());
}
session
.notify_background_event(&context.sub_id, "retrying command without sandbox")
.await;
let retry_output = self
.spawn(
request.params.clone(),
SandboxType::None,
config,
stdout_stream,
)
.await?;
Ok(retry_output)
}
ReviewDecision::Denied | ReviewDecision::Abort => {
Err(ExecError::rejection("exec command rejected by user"))
}
}
}
async fn spawn(
&self,
params: ExecParams,
sandbox: SandboxType,
config: &ExecutorConfig,
stdout_stream: Option<StdoutStream>,
) -> Result<ExecToolCallOutput, CodexErr> {
process_exec_tool_call(
params,
sandbox,
&config.sandbox_policy,
&config.sandbox_cwd,
&config.codex_exe,
stdout_stream,
)
.await
}
}
fn maybe_translate_shell_command(

View File

@@ -5,14 +5,103 @@ use crate::executor::ExecutionMode;
use crate::executor::ExecutionRequest;
use crate::executor::ExecutorConfig;
use crate::executor::errors::ExecError;
use crate::landlock::create_linux_sandbox_command_args;
use crate::protocol::SandboxPolicy;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_patch_safety;
use crate::seatbelt::MACOS_PATH_TO_SEATBELT_EXECUTABLE;
use crate::seatbelt::create_seatbelt_command_args;
use crate::spawn::CODEX_SANDBOX_ENV_VAR;
use crate::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_otel::otel_event_manager::ToolDecisionSource;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::ReviewDecision;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use thiserror::Error;
#[derive(Debug)]
pub(crate) struct SandboxLaunch {
pub sandbox_type: SandboxType,
pub program: String,
pub args: Vec<String>,
pub env: HashMap<String, String>,
}
#[derive(Debug, Error)]
pub(crate) enum SandboxLaunchError {
#[error("missing command line for sandbox launch")]
MissingCommandLine,
#[error("missing codex-linux-sandbox executable path")]
MissingLinuxSandboxExecutable,
}
pub(crate) fn build_launch_for_sandbox(
sandbox: SandboxType,
command: &[String],
sandbox_policy: &SandboxPolicy,
sandbox_policy_cwd: &Path,
codex_linux_sandbox_exe: Option<&PathBuf>,
) -> Result<SandboxLaunch, SandboxLaunchError> {
let mut env = HashMap::new();
if !sandbox_policy.has_full_network_access() {
env.insert(
CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR.to_string(),
"1".to_string(),
);
}
match sandbox {
SandboxType::None => {
let (program, args) = command
.split_first()
.ok_or(SandboxLaunchError::MissingCommandLine)?;
Ok(SandboxLaunch {
sandbox_type: SandboxType::None,
program: program.clone(),
args: args.to_vec(),
env,
})
}
SandboxType::MacosSeatbelt => {
env.insert(CODEX_SANDBOX_ENV_VAR.to_string(), "seatbelt".to_string());
let args =
create_seatbelt_command_args(command.to_vec(), sandbox_policy, sandbox_policy_cwd);
Ok(SandboxLaunch {
sandbox_type: SandboxType::MacosSeatbelt,
program: MACOS_PATH_TO_SEATBELT_EXECUTABLE.to_string(),
args,
env,
})
}
SandboxType::LinuxSeccomp => {
let exe =
codex_linux_sandbox_exe.ok_or(SandboxLaunchError::MissingLinuxSandboxExecutable)?;
let args = create_linux_sandbox_command_args(
command.to_vec(),
sandbox_policy,
sandbox_policy_cwd,
);
Ok(SandboxLaunch {
sandbox_type: SandboxType::LinuxSeccomp,
program: exe.to_string_lossy().to_string(),
args,
env,
})
}
}
}
pub(crate) struct RetrySandboxContext<'a> {
pub sub_id: &'a str,
pub call_id: &'a str,
pub tool_name: &'a str,
pub otel_event_manager: &'a OtelEventManager,
}
/// Sandbox placement options selected for an execution run, including whether
/// to escalate after failures and whether approvals should persist.
@@ -50,6 +139,53 @@ fn should_escalate_on_failure(approval: AskForApproval, sandbox: SandboxType) ->
)
}
pub(crate) async fn request_retry_without_sandbox(
session: &Session,
failure_message: impl Into<String>,
command: &[String],
cwd: PathBuf,
ctx: RetrySandboxContext<'_>,
) -> Option<ReviewDecision> {
session
.notify_background_event(ctx.sub_id, failure_message.into())
.await;
let approval_command = command.to_vec();
let decision = session
.request_command_approval(
ctx.sub_id.to_string(),
ctx.call_id.to_string(),
approval_command.clone(),
cwd,
Some("command failed; retry without sandbox?".to_string()),
)
.await;
ctx.otel_event_manager.tool_decision(
ctx.tool_name,
ctx.call_id,
decision,
ToolDecisionSource::User,
);
match decision {
ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {
if matches!(decision, ReviewDecision::ApprovedForSession) {
session
.services
.executor
.record_session_approval(approval_command);
}
session
.notify_background_event(ctx.sub_id, "retrying command without sandbox")
.await;
Some(decision)
}
ReviewDecision::Denied | ReviewDecision::Abort => None,
}
}
/// Determines how a command should be sandboxed, prompting the user when
/// policy requires explicit approval.
#[allow(clippy::too_many_arguments)]
@@ -64,7 +200,7 @@ pub async fn select_sandbox(
otel_event_manager: &OtelEventManager,
) -> Result<SandboxDecision, ExecError> {
match &request.mode {
ExecutionMode::Shell => {
ExecutionMode::Shell | ExecutionMode::InteractiveShell => {
select_shell_sandbox(
request,
approval_policy,
@@ -207,7 +343,7 @@ mod tests {
action,
user_explicitly_approved_this_action: true,
};
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None);
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None, None);
let request = ExecutionRequest {
params: ExecParams {
command: vec!["apply_patch".into()],
@@ -250,7 +386,12 @@ mod tests {
action,
user_explicitly_approved_this_action: false,
};
let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None);
let cfg = ExecutorConfig::new(
SandboxPolicy::DangerFullAccess,
std::env::temp_dir(),
None,
None,
);
let request = ExecutionRequest {
params: ExecParams {
command: vec!["apply_patch".into()],
@@ -278,9 +419,8 @@ mod tests {
)
.await
.expect("ok");
// On platforms with a sandbox, DangerFullAccess still prefers it
let expected = crate::safety::get_platform_sandbox().unwrap_or(SandboxType::None);
assert_eq!(decision.initial_sandbox, expected);
// DangerFullAccess bypasses sandboxing entirely.
assert_eq!(decision.initial_sandbox, SandboxType::None);
assert_eq!(decision.escalate_on_failure, false);
}
@@ -294,7 +434,7 @@ mod tests {
action,
user_explicitly_approved_this_action: false,
};
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None);
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None, None);
let request = ExecutionRequest {
params: ExecParams {
command: vec!["apply_patch".into()],
@@ -333,7 +473,12 @@ mod tests {
#[tokio::test]
async fn select_shell_autoapprove_in_danger_mode() {
let (session, ctx) = make_session_and_context();
let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None);
let cfg = ExecutorConfig::new(
SandboxPolicy::DangerFullAccess,
std::env::temp_dir(),
None,
None,
);
let request = ExecutionRequest {
params: ExecParams {
command: vec!["some-unknown".into()],
@@ -369,7 +514,7 @@ mod tests {
#[tokio::test]
async fn select_shell_escalates_on_failure_with_platform_sandbox() {
let (session, ctx) = make_session_and_context();
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None);
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None, None);
let request = ExecutionRequest {
params: ExecParams {
// Unknown command => untrusted but not flagged dangerous

View File

@@ -40,7 +40,7 @@ where
}
/// Converts the sandbox policy into the CLI invocation for `codex-linux-sandbox`.
fn create_linux_sandbox_command_args(
pub(crate) fn create_linux_sandbox_command_args(
command: Vec<String>,
sandbox_policy: &SandboxPolicy,
sandbox_policy_cwd: &Path,

View File

@@ -39,6 +39,7 @@ mod mcp_tool_call;
mod message_history;
mod model_provider_info;
pub mod parse_command;
mod pty;
mod truncate;
mod unified_exec;
mod user_instructions;

130
codex-rs/core/src/pty.rs Normal file
View File

@@ -0,0 +1,130 @@
use std::collections::HashMap;
use std::io::ErrorKind;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use portable_pty::CommandBuilder;
use portable_pty::PtySize;
use portable_pty::native_pty_system;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use crate::exec_command::ExecCommandSession;
#[derive(Debug)]
pub(crate) struct SpawnedPty {
pub session: ExecCommandSession,
pub output_rx: broadcast::Receiver<Vec<u8>>,
pub exit_rx: oneshot::Receiver<i32>,
}
/// Spawn a PTY-based process and return the interactive session along with
/// receivers for streaming output and exit status.
pub(crate) async fn spawn_pty_process(
program: &str,
args: &[String],
env: &HashMap<String, String>,
) -> anyhow::Result<SpawnedPty> {
if program.is_empty() {
anyhow::bail!("missing program for PTY spawn");
}
let pty_system = native_pty_system();
let pair = pty_system.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})?;
let mut command_builder = CommandBuilder::new(program);
for arg in args {
command_builder.arg(arg.clone());
}
for (key, value) in env {
command_builder.env(key.clone(), value.clone());
}
let mut child = pair.slave.spawn_command(command_builder)?;
let killer = child.clone_killer();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
let mut reader = pair.master.try_clone_reader()?;
let output_tx_clone = output_tx.clone();
let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let _ = output_tx_clone.send(buf[..n].to_vec());
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
std::thread::sleep(Duration::from_millis(5));
continue;
}
Err(_) => break,
}
}
});
let writer = pair.master.take_writer()?;
let writer = Arc::new(StdMutex::new(writer));
let writer_handle: JoinHandle<()> = tokio::spawn({
let writer = writer.clone();
async move {
while let Some(bytes) = writer_rx.recv().await {
let writer = writer.clone();
let _ = tokio::task::spawn_blocking(move || {
if let Ok(mut guard) = writer.lock() {
use std::io::Write;
let _ = guard.write_all(&bytes);
let _ = guard.flush();
}
})
.await;
}
}
});
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = exit_status.clone();
let exit_code = Arc::new(StdMutex::new(None));
let wait_exit_code = exit_code.clone();
let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
let code = match child.wait() {
Ok(status) => status.exit_code() as i32,
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
if let Ok(mut guard) = wait_exit_code.lock() {
*guard = Some(code);
}
let _ = exit_tx.send(code);
});
let (session, output_rx) = ExecCommandSession::new(
writer_tx,
output_tx,
killer,
reader_handle,
writer_handle,
wait_handle,
exit_status,
exit_code,
);
Ok(SpawnedPty {
session,
output_rx,
exit_rx,
})
}

View File

@@ -55,23 +55,23 @@ pub fn assess_patch_safety(
if is_write_patch_constrained_to_writable_paths(action, sandbox_policy, cwd)
|| policy == AskForApproval::OnFailure
{
// Only autoapprove when we can actually enforce a sandbox. Otherwise
// fall back to asking the user because the patch may touch arbitrary
// paths outside the project.
match get_platform_sandbox() {
Some(sandbox_type) => SafetyCheck::AutoApprove {
sandbox_type,
if matches!(sandbox_policy, SandboxPolicy::DangerFullAccess) {
// DangerFullAccess is intended to bypass sandboxing entirely.
SafetyCheck::AutoApprove {
sandbox_type: SandboxType::None,
user_explicitly_approved: false,
},
None if sandbox_policy == &SandboxPolicy::DangerFullAccess => {
// If the user has explicitly requested DangerFullAccess, then
// we can auto-approve even without a sandbox.
SafetyCheck::AutoApprove {
sandbox_type: SandboxType::None,
user_explicitly_approved: false,
}
}
None => SafetyCheck::AskUser,
} else {
// Only autoapprove when we can actually enforce a sandbox. Otherwise
// fall back to asking the user because the patch may touch arbitrary
// paths outside the project.
match get_platform_sandbox() {
Some(sandbox_type) => SafetyCheck::AutoApprove {
sandbox_type,
user_explicitly_approved: false,
},
None => SafetyCheck::AskUser,
}
}
} else if policy == AskForApproval::Never {
SafetyCheck::Reject {

View File

@@ -14,7 +14,7 @@ const MACOS_SEATBELT_BASE_POLICY: &str = include_str!("seatbelt_base_policy.sbpl
/// to defend against an attacker trying to inject a malicious version on the
/// PATH. If /usr/bin/sandbox-exec has been tampered with, then the attacker
/// already has root access.
const MACOS_PATH_TO_SEATBELT_EXECUTABLE: &str = "/usr/bin/sandbox-exec";
pub(crate) const MACOS_PATH_TO_SEATBELT_EXECUTABLE: &str = "/usr/bin/sandbox-exec";
pub async fn spawn_command_under_seatbelt(
command: Vec<String>,
@@ -39,7 +39,7 @@ pub async fn spawn_command_under_seatbelt(
.await
}
fn create_seatbelt_command_args(
pub(crate) fn create_seatbelt_command_args(
command: Vec<String>,
sandbox_policy: &SandboxPolicy,
sandbox_policy_cwd: &Path,

View File

@@ -35,7 +35,13 @@ impl ToolHandler for UnifiedExecHandler {
async fn handle(&self, invocation: ToolInvocation) -> Result<ToolOutput, FunctionCallError> {
let ToolInvocation {
session, payload, ..
session,
turn,
sub_id,
call_id,
tool_name,
payload,
..
} = invocation;
let args = match payload {
@@ -73,13 +79,24 @@ impl ToolHandler for UnifiedExecHandler {
};
let request = UnifiedExecRequest {
session_id: parsed_session_id,
input_chunks: &input,
timeout_ms,
};
let value = session
.run_unified_exec_request(request)
.services
.unified_exec_manager
.handle_request(
request,
crate::unified_exec::UnifiedExecContext {
session: &session,
turn: turn.as_ref(),
sub_id: &sub_id,
call_id: &call_id,
tool_name: &tool_name,
session_id: parsed_session_id,
},
)
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!("unified exec failed: {err:?}"))

View File

@@ -1,22 +1,39 @@
use crate::executor::SandboxLaunchError;
use thiserror::Error;
#[derive(Debug, Error)]
pub(crate) enum UnifiedExecError {
#[error("Failed to create unified exec session: {pty_error}")]
CreateSession {
#[source]
pty_error: anyhow::Error,
},
#[error("Failed to create unified exec session: {message}")]
CreateSession { message: String },
#[error("Unknown session id {session_id}")]
UnknownSessionId { session_id: i32 },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("missing command line for unified exec request")]
MissingCommandLine,
#[error("missing codex-linux-sandbox executable path")]
MissingLinuxSandboxExecutable,
#[error("Command denied by sandbox: {message}")]
SandboxDenied { message: String },
}
impl UnifiedExecError {
pub(crate) fn create_session(error: anyhow::Error) -> Self {
Self::CreateSession { pty_error: error }
pub(crate) fn create_session(message: String) -> Self {
Self::CreateSession { message }
}
pub(crate) fn sandbox_denied(message: String) -> Self {
Self::SandboxDenied { message }
}
}
impl From<SandboxLaunchError> for UnifiedExecError {
fn from(err: SandboxLaunchError) -> Self {
match err {
SandboxLaunchError::MissingCommandLine => UnifiedExecError::MissingCommandLine,
SandboxLaunchError::MissingLinuxSandboxExecutable => {
UnifiedExecError::MissingLinuxSandboxExecutable
}
}
}
}

View File

@@ -1,23 +1,61 @@
use portable_pty::CommandBuilder;
use portable_pty::PtySize;
use portable_pty::native_pty_system;
//! Unified Exec: interactive PTY execution with session management.
//!
//! Purpose and responsibilities
//! - Manages interactive PTY sessions (create, reuse, buffer output with caps).
//! - Delegates sandbox selection, approvals, and policy decisions to the
//! executor (single source of truth) via `prepare_execution_plan`.
//! - Spawns the PTY using the `SandboxLaunch` produced by the plan and reuses
//! `ExecutionPlan::attempt_with_retry_if` to optionally retry without a
//! sandbox when policy allows and the user approves.
//! - After process exit, classifies sandbox denials using the shared
//! `is_likely_sandbox_denied` heuristic so denial messages stay consistent.
//!
//! Why not call `executor.run`?
//! `executor.run` drives a nonPTY (piped) execution flow endtoend. Unified
//! Exec needs an interactive PTY that persists across requests and supports
//! streaming I/O. To keep policy logic centralized while still owning the PTY
//! lifecycle, Unified Exec builds an `ExecutionRequest` with
//! `ExecutionMode::InteractiveShell`, asks the executor for an
//! `ExecutionPlan`, then performs the PTY spawn itself with the plans
//! sandboxed command and environment.
//!
//! Handoff at a glance
//! 1) Build `ExecutionRequest` (interactive shell).
//! 2) `executor.update_environment(turn.sandbox_policy, turn.cwd)`.
//! 3) `plan = executor.prepare_execution_plan(request, …)`.
//! 4) `plan.attempt_with_retry_if(|launch| spawn_pty_process(launch), retry_on_denied)`.
//! 5) Buffer+stream output, apply timeouts, and return `UnifiedExecResult`.
//!
//! This structure ensures Unified Exec benefits from the same approval,
//! sandbox selection, and escalation rules as regular exec, while keeping the
//! PTY/session concerns isolated here.
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::ErrorKind;
use std::io::Read;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::exec::ExecParams;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StreamOutput;
use crate::exec::is_likely_sandbox_denied;
use crate::exec_command::ExecCommandSession;
use crate::executor::ExecutionMode;
use crate::executor::ExecutionPlan;
use crate::executor::ExecutionRequest;
use crate::pty::SpawnedPty;
use crate::tools::context::ExecCommandContext;
use crate::truncate::truncate_middle;
mod errors;
@@ -28,9 +66,17 @@ const DEFAULT_TIMEOUT_MS: u64 = 1_000;
const MAX_TIMEOUT_MS: u64 = 60_000;
const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 128 * 1024; // 128 KiB
pub(crate) struct UnifiedExecContext<'a> {
pub session: &'a Session,
pub turn: &'a TurnContext,
pub sub_id: &'a str,
pub call_id: &'a str,
pub tool_name: &'a str,
pub session_id: Option<i32>,
}
#[derive(Debug)]
pub(crate) struct UnifiedExecRequest<'a> {
pub session_id: Option<i32>,
pub input_chunks: &'a [String],
pub timeout_ms: Option<u64>,
}
@@ -44,17 +90,19 @@ pub(crate) struct UnifiedExecResult {
#[derive(Debug, Default)]
pub(crate) struct UnifiedExecSessionManager {
next_session_id: AtomicI32,
sessions: Mutex<HashMap<i32, ManagedUnifiedExecSession>>,
sessions: Mutex<HashMap<i32, UnifiedExecSession>>,
}
#[derive(Debug)]
struct ManagedUnifiedExecSession {
/// Wraps a PTY session with buffered output and sandbox metadata for unified exec.
struct UnifiedExecSession {
session: ExecCommandSession,
output_buffer: OutputBuffer,
/// Notifies waiters whenever new output has been appended to
/// `output_buffer`, allowing clients to poll for fresh data.
output_notify: Arc<Notify>,
output_task: JoinHandle<()>,
sandbox_type: SandboxType,
}
#[derive(Debug, Default)]
@@ -94,15 +142,20 @@ impl OutputBufferState {
self.total_bytes = 0;
drained
}
fn snapshot(&self) -> Vec<Vec<u8>> {
self.chunks.iter().cloned().collect()
}
}
type OutputBuffer = Arc<Mutex<OutputBufferState>>;
type OutputHandles = (OutputBuffer, Arc<Notify>);
impl ManagedUnifiedExecSession {
impl UnifiedExecSession {
fn new(
session: ExecCommandSession,
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
sandbox_type: SandboxType,
) -> Self {
let output_buffer = Arc::new(Mutex::new(OutputBufferState::default()));
let output_notify = Arc::new(Notify::new());
@@ -134,6 +187,7 @@ impl ManagedUnifiedExecSession {
output_buffer,
output_notify,
output_task,
sandbox_type,
}
}
@@ -151,18 +205,158 @@ impl ManagedUnifiedExecSession {
fn has_exited(&self) -> bool {
self.session.has_exited()
}
fn exit_code(&self) -> Option<i32> {
self.session.exit_code()
}
async fn snapshot_output(&self) -> Vec<Vec<u8>> {
let guard = self.output_buffer.lock().await;
guard.snapshot()
}
fn sandbox_type(&self) -> SandboxType {
self.sandbox_type
}
async fn check_for_sandbox_denial(&self) -> Result<(), UnifiedExecError> {
if self.sandbox_type() == SandboxType::None || !self.has_exited() {
return Ok(());
}
// Give the reader task a brief moment to flush any final PTY bytes after exit.
let _ =
tokio::time::timeout(Duration::from_millis(20), self.output_notify.notified()).await;
let collected_chunks = self.snapshot_output().await;
let mut aggregated: Vec<u8> = Vec::new();
for chunk in collected_chunks {
aggregated.extend_from_slice(&chunk);
}
let aggregated_text = String::from_utf8_lossy(&aggregated).to_string();
let exit_code = self.exit_code().unwrap_or(-1);
let exec_output = ExecToolCallOutput {
exit_code,
stdout: StreamOutput::new(aggregated_text.clone()),
stderr: StreamOutput::new(String::new()),
aggregated_output: StreamOutput::new(aggregated_text.clone()),
duration: Duration::ZERO,
timed_out: false,
};
if is_likely_sandbox_denied(self.sandbox_type(), &exec_output) {
let (snippet, _) = truncate_middle(&aggregated_text, UNIFIED_EXEC_OUTPUT_MAX_BYTES);
let message = if snippet.is_empty() {
format!("exit code {exit_code}")
} else {
snippet
};
return Err(UnifiedExecError::sandbox_denied(message));
}
Ok(())
}
async fn from_spawned(
spawned: SpawnedPty,
sandbox_type: SandboxType,
) -> Result<Self, UnifiedExecError> {
let SpawnedPty {
session,
output_rx,
mut exit_rx,
} = spawned;
let managed = Self::new(session, output_rx, sandbox_type);
let exit_ready = match exit_rx.try_recv() {
Ok(_) | Err(TryRecvError::Closed) => true,
Err(TryRecvError::Empty) => false,
};
if exit_ready {
managed.check_for_sandbox_denial().await?;
}
Ok(managed)
}
}
impl Drop for ManagedUnifiedExecSession {
impl Drop for UnifiedExecSession {
fn drop(&mut self) {
self.output_task.abort();
}
}
impl UnifiedExecSessionManager {
async fn open_session_with_sandbox(
&self,
command: Vec<String>,
context: &UnifiedExecContext<'_>,
) -> Result<UnifiedExecSession, UnifiedExecError> {
let executor = &context.session.services.executor;
let otel_event_manager = context.turn.client.get_otel_event_manager();
let approval_command = command.clone();
let exec_context = ExecCommandContext {
sub_id: context.sub_id.to_string(),
call_id: context.call_id.to_string(),
command_for_display: approval_command.clone(),
cwd: context.turn.cwd.clone(),
apply_patch: None,
tool_name: context.tool_name.to_string(),
otel_event_manager,
};
let execution_request = ExecutionRequest {
params: ExecParams {
command,
cwd: context.turn.cwd.clone(),
timeout_ms: None,
env: HashMap::new(),
with_escalated_permissions: None,
justification: None,
},
approval_command,
mode: ExecutionMode::InteractiveShell,
stdout_stream: None,
use_shell_profile: false,
};
// Ensure the executor's environment reflects this turn before planning
executor.update_environment(
context.turn.sandbox_policy.clone(),
context.turn.cwd.clone(),
);
let plan: ExecutionPlan = executor
.prepare_execution_plan(
execution_request,
context.session,
context.turn.approval_policy,
&exec_context,
)
.await
.map_err(|err| UnifiedExecError::create_session(err.to_string()))?;
plan.attempt_with_retry_if(
context.session,
|launch| async move {
let sandbox_type = launch.sandbox_type;
let spawned =
crate::pty::spawn_pty_process(&launch.program, &launch.args, &launch.env)
.await
.map_err(|err| UnifiedExecError::create_session(err.to_string()))?;
UnifiedExecSession::from_spawned(spawned, sandbox_type).await
},
|err: &UnifiedExecError| matches!(err, UnifiedExecError::SandboxDenied { .. }),
)
.await
}
pub async fn handle_request(
&self,
request: UnifiedExecRequest<'_>,
context: UnifiedExecContext<'_>,
) -> Result<UnifiedExecResult, UnifiedExecError> {
let (timeout_ms, timeout_warning) = match request.timeout_ms {
Some(requested) if requested > MAX_TIMEOUT_MS => (
@@ -175,13 +369,13 @@ impl UnifiedExecSessionManager {
None => (DEFAULT_TIMEOUT_MS, None),
};
let mut new_session: Option<ManagedUnifiedExecSession> = None;
let mut new_session: Option<UnifiedExecSession> = None;
let session_id;
let writer_tx;
let output_buffer;
let output_notify;
if let Some(existing_id) = request.session_id {
if let Some(existing_id) = context.session_id {
let mut sessions = self.sessions.lock().await;
match sessions.get(&existing_id) {
Some(session) => {
@@ -207,8 +401,7 @@ impl UnifiedExecSessionManager {
} else {
let command = request.input_chunks.to_vec();
let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
let (session, initial_output_rx) = create_unified_exec_session(&command).await?;
let managed_session = ManagedUnifiedExecSession::new(session, initial_output_rx);
let managed_session = self.open_session_with_sandbox(command, &context).await?;
let (buffer, notify) = managed_session.output_handles();
writer_tx = managed_session.writer_sender();
output_buffer = buffer;
@@ -217,11 +410,35 @@ impl UnifiedExecSessionManager {
new_session = Some(managed_session);
};
if request.session_id.is_some() {
let joined_input = request.input_chunks.join(" ");
if !joined_input.is_empty() && writer_tx.send(joined_input.into_bytes()).await.is_err()
{
return Err(UnifiedExecError::WriteToStdin);
if context.session_id.is_some() {
let mut trailing_whitespace = true;
for chunk in request.input_chunks {
if chunk.is_empty() {
continue;
}
let leading_whitespace = chunk
.chars()
.next()
.map(char::is_whitespace)
.unwrap_or(true);
if !trailing_whitespace
&& !leading_whitespace
&& writer_tx.send(vec![b' ']).await.is_err()
{
return Err(UnifiedExecError::WriteToStdin);
}
if writer_tx.send(chunk.as_bytes().to_vec()).await.is_err() {
return Err(UnifiedExecError::WriteToStdin);
}
trailing_whitespace = chunk
.chars()
.next_back()
.map(char::is_whitespace)
.unwrap_or(trailing_whitespace);
}
}
@@ -276,7 +493,7 @@ impl UnifiedExecSessionManager {
let should_store_session = if let Some(session) = new_session.as_ref() {
!session.has_exited()
} else if request.session_id.is_some() {
} else if context.session_id.is_some() {
let mut sessions = self.sessions.lock().await;
if let Some(existing) = sessions.get(&session_id) {
if existing.has_exited() {
@@ -309,114 +526,55 @@ impl UnifiedExecSessionManager {
}
}
async fn create_unified_exec_session(
command: &[String],
) -> Result<
(
ExecCommandSession,
tokio::sync::broadcast::Receiver<Vec<u8>>,
),
UnifiedExecError,
> {
if command.is_empty() {
return Err(UnifiedExecError::MissingCommandLine);
}
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.map_err(UnifiedExecError::create_session)?;
// Safe thanks to the check at the top of the function.
let mut command_builder = CommandBuilder::new(command[0].clone());
for arg in &command[1..] {
command_builder.arg(arg);
}
let mut child = pair
.slave
.spawn_command(command_builder)
.map_err(UnifiedExecError::create_session)?;
let killer = child.clone_killer();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (output_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(256);
let mut reader = pair
.master
.try_clone_reader()
.map_err(UnifiedExecError::create_session)?;
let output_tx_clone = output_tx.clone();
let reader_handle = tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let _ = output_tx_clone.send(buf[..n].to_vec());
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
std::thread::sleep(Duration::from_millis(5));
continue;
}
Err(_) => break,
}
}
});
let writer = pair
.master
.take_writer()
.map_err(UnifiedExecError::create_session)?;
let writer = Arc::new(StdMutex::new(writer));
let writer_handle = tokio::spawn({
let writer = writer.clone();
async move {
while let Some(bytes) = writer_rx.recv().await {
let writer = writer.clone();
let _ = tokio::task::spawn_blocking(move || {
if let Ok(mut guard) = writer.lock() {
use std::io::Write;
let _ = guard.write_all(&bytes);
let _ = guard.flush();
}
})
.await;
}
}
});
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = Arc::clone(&exit_status);
let wait_handle = tokio::task::spawn_blocking(move || {
let _ = child.wait();
wait_exit_status.store(true, Ordering::SeqCst);
});
let (session, initial_output_rx) = ExecCommandSession::new(
writer_tx,
output_tx,
killer,
reader_handle,
writer_handle,
wait_handle,
exit_status,
);
Ok((session, initial_output_rx))
}
#[cfg(test)]
#[cfg(unix)]
mod tests {
use super::*;
#[cfg(unix)]
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::make_session_and_context;
use crate::protocol::AskForApproval;
use crate::protocol::SandboxPolicy;
use core_test_support::skip_if_sandbox;
use std::sync::Arc;
fn test_session_and_turn() -> (Arc<Session>, Arc<TurnContext>) {
let (session, mut turn) = make_session_and_context();
turn.approval_policy = AskForApproval::Never;
turn.sandbox_policy = SandboxPolicy::DangerFullAccess;
(Arc::new(session), Arc::new(turn))
}
async fn run_unified_exec_request(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
session_id: Option<i32>,
input: Vec<String>,
timeout_ms: Option<u64>,
) -> Result<UnifiedExecResult, UnifiedExecError> {
let request_input = input;
let request = UnifiedExecRequest {
input_chunks: &request_input,
timeout_ms,
};
session
.services
.unified_exec_manager
.handle_request(
request,
UnifiedExecContext {
session,
turn: turn.as_ref(),
sub_id: "sub",
call_id: "call",
tool_name: "unified_exec",
session_id,
},
)
.await
}
#[test]
fn push_chunk_trims_only_excess_bytes() {
@@ -435,158 +593,160 @@ mod tests {
assert_eq!(buffer.chunks.pop_back().unwrap(), vec![b'b']);
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_persists_across_requests_jif() -> Result<(), UnifiedExecError> {
skip_if_sandbox!(Ok(()));
let manager = UnifiedExecSessionManager::default();
let (session, turn) = test_session_and_turn();
let open_shell = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["bash".to_string(), "-i".to_string()],
timeout_ms: Some(2_500),
})
.await?;
let open_shell = run_unified_exec_request(
&session,
&turn,
None,
vec!["bash".to_string(), "-i".to_string()],
Some(2_500),
)
.await?;
let session_id = open_shell.session_id.expect("expected session_id");
manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &[
"export".to_string(),
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
],
timeout_ms: Some(2_500),
})
.await?;
run_unified_exec_request(
&session,
&turn,
Some(session_id),
vec![
"export".to_string(),
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
],
Some(2_500),
)
.await?;
let out_2 = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(2_500),
})
.await?;
let out_2 = run_unified_exec_request(
&session,
&turn,
Some(session_id),
vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
Some(2_500),
)
.await?;
assert!(out_2.output.contains("codex"));
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multi_unified_exec_sessions() -> Result<(), UnifiedExecError> {
skip_if_sandbox!(Ok(()));
let manager = UnifiedExecSessionManager::default();
let (session, turn) = test_session_and_turn();
let shell_a = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(2_500),
})
.await?;
let shell_a = run_unified_exec_request(
&session,
&turn,
None,
vec!["/bin/bash".to_string(), "-i".to_string()],
Some(2_500),
)
.await?;
let session_a = shell_a.session_id.expect("expected session id");
manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_a),
input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
timeout_ms: Some(2_500),
})
.await?;
run_unified_exec_request(
&session,
&turn,
Some(session_a),
vec!["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
Some(2_500),
)
.await?;
let out_2 = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &[
"echo".to_string(),
"$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(),
],
timeout_ms: Some(2_500),
})
.await?;
let out_2 = run_unified_exec_request(
&session,
&turn,
None,
vec![
"echo".to_string(),
"$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(),
],
Some(2_500),
)
.await?;
assert!(!out_2.output.contains("codex"));
let out_3 = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_a),
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(2_500),
})
.await?;
let out_3 = run_unified_exec_request(
&session,
&turn,
Some(session_a),
vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
Some(2_500),
)
.await?;
assert!(out_3.output.contains("codex"));
Ok(())
}
#[cfg(unix)]
#[tokio::test]
async fn unified_exec_timeouts() -> Result<(), UnifiedExecError> {
skip_if_sandbox!(Ok(()));
let manager = UnifiedExecSessionManager::default();
let (session, turn) = test_session_and_turn();
let open_shell = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["bash".to_string(), "-i".to_string()],
timeout_ms: Some(2_500),
})
.await?;
let open_shell = run_unified_exec_request(
&session,
&turn,
None,
vec!["bash".to_string(), "-i".to_string()],
Some(2_500),
)
.await?;
let session_id = open_shell.session_id.expect("expected session id");
manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &[
"export".to_string(),
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
],
timeout_ms: Some(2_500),
})
.await?;
run_unified_exec_request(
&session,
&turn,
Some(session_id),
vec![
"export".to_string(),
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
],
Some(2_500),
)
.await?;
let out_2 = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(10),
})
.await?;
let out_2 = run_unified_exec_request(
&session,
&turn,
Some(session_id),
vec!["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
Some(10),
)
.await?;
assert!(!out_2.output.contains("codex"));
tokio::time::sleep(Duration::from_secs(7)).await;
let empty = Vec::new();
let out_3 = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &empty,
timeout_ms: Some(100),
})
.await?;
let out_3 =
run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100))
.await?;
assert!(out_3.output.contains("codex"));
Ok(())
}
#[cfg(unix)]
#[tokio::test]
#[ignore] // Ignored while we have a better way to test this.
async fn requests_with_large_timeout_are_capped() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let (session, turn) = test_session_and_turn();
let result = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["echo".to_string(), "codex".to_string()],
timeout_ms: Some(120_000),
})
.await?;
let result = run_unified_exec_request(
&session,
&turn,
None,
vec!["echo".to_string(), "codex".to_string()],
Some(120_000),
)
.await?;
assert!(result.output.starts_with(
"Warning: requested timeout 120000ms exceeds maximum of 60000ms; clamping to 60000ms.\n"
@@ -596,61 +756,66 @@ mod tests {
Ok(())
}
#[cfg(unix)]
#[tokio::test]
#[ignore] // Ignored while we have a better way to test this.
async fn completed_commands_do_not_persist_sessions() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let result = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/echo".to_string(), "codex".to_string()],
timeout_ms: Some(2_500),
})
.await?;
let (session, turn) = test_session_and_turn();
let result = run_unified_exec_request(
&session,
&turn,
None,
vec!["/bin/echo".to_string(), "codex".to_string()],
Some(2_500),
)
.await?;
assert!(result.session_id.is_none());
assert!(result.output.contains("codex"));
assert!(manager.sessions.lock().await.is_empty());
assert!(
session
.services
.unified_exec_manager
.sessions
.lock()
.await
.is_empty()
);
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reusing_completed_session_returns_unknown_session() -> Result<(), UnifiedExecError> {
skip_if_sandbox!(Ok(()));
let manager = UnifiedExecSessionManager::default();
let (session, turn) = test_session_and_turn();
let open_shell = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(2_500),
})
.await?;
let open_shell = run_unified_exec_request(
&session,
&turn,
None,
vec!["/bin/bash".to_string(), "-i".to_string()],
Some(2_500),
)
.await?;
let session_id = open_shell.session_id.expect("expected session id");
manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["exit\n".to_string()],
timeout_ms: Some(2_500),
})
.await?;
run_unified_exec_request(
&session,
&turn,
Some(session_id),
vec!["exit\n".to_string()],
Some(2_500),
)
.await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let err = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &[],
timeout_ms: Some(100),
})
.await
.expect_err("expected unknown session error");
let err =
run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100))
.await
.expect_err("expected unknown session error");
match err {
UnifiedExecError::UnknownSessionId { session_id: err_id } => {
@@ -659,7 +824,15 @@ mod tests {
other => panic!("expected UnknownSessionId, got {other:?}"),
}
assert!(!manager.sessions.lock().await.contains_key(&session_id));
assert!(
!session
.services
.unified_exec_manager
.sessions
.lock()
.await
.contains_key(&session_id)
);
Ok(())
}