mirror of
https://github.com/openai/codex.git
synced 2026-04-22 07:21:46 +03:00
Compare commits
22 Commits
dev/steve/
...
jif/sandbo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3eb9355dc | ||
|
|
aeb82f79c0 | ||
|
|
802082f748 | ||
|
|
8327dc047b | ||
|
|
d09165383f | ||
|
|
65be622e9f | ||
|
|
b7e834a02b | ||
|
|
9865a0cfe1 | ||
|
|
6755b3b3d8 | ||
|
|
a55fc6a88f | ||
|
|
b622a1c850 | ||
|
|
53f07c304b | ||
|
|
1e2f6f21e2 | ||
|
|
703c3825ce | ||
|
|
f61c7aa160 | ||
|
|
ecac194a66 | ||
|
|
b3cf2a4b41 | ||
|
|
f5c626b2e0 | ||
|
|
30ae983c3a | ||
|
|
b47d54d57c | ||
|
|
29a2305025 | ||
|
|
3aadc93924 |
@@ -472,6 +472,7 @@ impl Session {
|
||||
turn_context.sandbox_policy.clone(),
|
||||
turn_context.cwd.clone(),
|
||||
config.codex_linux_sandbox_exe.clone(),
|
||||
None,
|
||||
)),
|
||||
};
|
||||
|
||||
@@ -1073,16 +1074,6 @@ impl Session {
|
||||
.map_err(FunctionCallError::RespondToModel)
|
||||
}
|
||||
|
||||
pub(crate) async fn run_unified_exec_request(
|
||||
&self,
|
||||
request: crate::unified_exec::UnifiedExecRequest<'_>,
|
||||
) -> Result<crate::unified_exec::UnifiedExecResult, crate::unified_exec::UnifiedExecError> {
|
||||
self.services
|
||||
.unified_exec_manager
|
||||
.handle_request(request)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn interrupt_task(self: &Arc<Self>) {
|
||||
info!("interrupt received: abort current task, if any");
|
||||
self.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
@@ -2771,6 +2762,7 @@ mod tests {
|
||||
turn_context.sandbox_policy.clone(),
|
||||
turn_context.cwd.clone(),
|
||||
None,
|
||||
None,
|
||||
)),
|
||||
};
|
||||
let session = Session {
|
||||
@@ -2839,6 +2831,7 @@ mod tests {
|
||||
config.sandbox_policy.clone(),
|
||||
config.cwd.clone(),
|
||||
None,
|
||||
None,
|
||||
)),
|
||||
};
|
||||
let session = Arc::new(Session {
|
||||
|
||||
@@ -18,13 +18,14 @@ use tokio::process::Child;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result;
|
||||
use crate::error::SandboxErr;
|
||||
use crate::landlock::spawn_command_under_linux_sandbox;
|
||||
use crate::executor::SandboxLaunch;
|
||||
use crate::executor::SandboxLaunchError;
|
||||
use crate::executor::build_launch_for_sandbox;
|
||||
use crate::protocol::Event;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::ExecCommandOutputDeltaEvent;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::seatbelt::spawn_command_under_seatbelt;
|
||||
use crate::spawn::StdioPolicy;
|
||||
use crate::spawn::spawn_child_async;
|
||||
|
||||
@@ -86,58 +87,36 @@ pub async fn process_exec_tool_call(
|
||||
sandbox_cwd: &Path,
|
||||
codex_linux_sandbox_exe: &Option<PathBuf>,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
) -> Result<ExecToolCallOutput> {
|
||||
let launch = build_launch_for_sandbox(
|
||||
sandbox_type,
|
||||
&params.command,
|
||||
sandbox_policy,
|
||||
sandbox_cwd,
|
||||
codex_linux_sandbox_exe.as_ref(),
|
||||
)
|
||||
.map_err(CodexErr::from)?;
|
||||
execute_sandbox_launch(params, launch, sandbox_type, sandbox_policy, stdout_stream).await
|
||||
}
|
||||
|
||||
pub(crate) async fn execute_sandbox_launch(
|
||||
params: ExecParams,
|
||||
launch: SandboxLaunch,
|
||||
sandbox_type: SandboxType,
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
) -> Result<ExecToolCallOutput> {
|
||||
let start = Instant::now();
|
||||
|
||||
let timeout_duration = params.timeout_duration();
|
||||
|
||||
let raw_output_result: std::result::Result<RawExecToolCallOutput, CodexErr> = match sandbox_type
|
||||
{
|
||||
SandboxType::None => exec(params, sandbox_policy, stdout_stream.clone()).await,
|
||||
SandboxType::MacosSeatbelt => {
|
||||
let ExecParams {
|
||||
command,
|
||||
cwd: command_cwd,
|
||||
env,
|
||||
..
|
||||
} = params;
|
||||
let child = spawn_command_under_seatbelt(
|
||||
command,
|
||||
command_cwd,
|
||||
sandbox_policy,
|
||||
sandbox_cwd,
|
||||
StdioPolicy::RedirectForShellTool,
|
||||
env,
|
||||
)
|
||||
.await?;
|
||||
consume_truncated_output(child, timeout_duration, stdout_stream.clone()).await
|
||||
}
|
||||
SandboxType::LinuxSeccomp => {
|
||||
let ExecParams {
|
||||
command,
|
||||
cwd: command_cwd,
|
||||
env,
|
||||
..
|
||||
} = params;
|
||||
|
||||
let codex_linux_sandbox_exe = codex_linux_sandbox_exe
|
||||
.as_ref()
|
||||
.ok_or(CodexErr::LandlockSandboxExecutableNotProvided)?;
|
||||
let child = spawn_command_under_linux_sandbox(
|
||||
codex_linux_sandbox_exe,
|
||||
command,
|
||||
command_cwd,
|
||||
sandbox_policy,
|
||||
sandbox_cwd,
|
||||
StdioPolicy::RedirectForShellTool,
|
||||
env,
|
||||
)
|
||||
.await?;
|
||||
|
||||
consume_truncated_output(child, timeout_duration, stdout_stream).await
|
||||
}
|
||||
};
|
||||
let raw_output_result = spawn_with_launch(params, launch, sandbox_policy, stdout_stream).await;
|
||||
let duration = start.elapsed();
|
||||
finalize_exec_result(raw_output_result, sandbox_type, duration)
|
||||
}
|
||||
|
||||
fn finalize_exec_result(
|
||||
raw_output_result: std::result::Result<RawExecToolCallOutput, CodexErr>,
|
||||
sandbox_type: SandboxType,
|
||||
duration: Duration,
|
||||
) -> Result<ExecToolCallOutput> {
|
||||
match raw_output_result {
|
||||
Ok(raw_output) => {
|
||||
#[allow(unused_mut)]
|
||||
@@ -192,12 +171,73 @@ pub async fn process_exec_tool_call(
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_with_launch(
|
||||
params: ExecParams,
|
||||
launch: SandboxLaunch,
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
) -> std::result::Result<RawExecToolCallOutput, CodexErr> {
|
||||
let ExecParams {
|
||||
command: _,
|
||||
cwd,
|
||||
timeout_ms,
|
||||
mut env,
|
||||
with_escalated_permissions,
|
||||
justification,
|
||||
} = params;
|
||||
|
||||
let SandboxLaunch {
|
||||
program,
|
||||
args,
|
||||
env: launch_env,
|
||||
..
|
||||
} = launch;
|
||||
env.extend(launch_env);
|
||||
|
||||
let mut command = Vec::with_capacity(1 + args.len());
|
||||
command.push(program);
|
||||
command.extend(args);
|
||||
|
||||
let updated_params = ExecParams {
|
||||
command,
|
||||
cwd,
|
||||
timeout_ms,
|
||||
env,
|
||||
with_escalated_permissions,
|
||||
justification,
|
||||
};
|
||||
|
||||
exec(updated_params, sandbox_policy, stdout_stream).await
|
||||
}
|
||||
|
||||
pub(crate) mod errors {
|
||||
use super::CodexErr;
|
||||
use super::SandboxLaunchError;
|
||||
|
||||
impl From<SandboxLaunchError> for CodexErr {
|
||||
fn from(err: SandboxLaunchError) -> Self {
|
||||
match err {
|
||||
SandboxLaunchError::MissingCommandLine => CodexErr::Io(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
"command args are empty",
|
||||
)),
|
||||
SandboxLaunchError::MissingLinuxSandboxExecutable => {
|
||||
CodexErr::LandlockSandboxExecutableNotProvided
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// We don't have a fully deterministic way to tell if our command failed
|
||||
/// because of the sandbox - a command in the user's zshrc file might hit an
|
||||
/// error, but the command itself might fail or succeed for other reasons.
|
||||
/// For now, we conservatively check for well known command failure exit codes and
|
||||
/// also look for common sandbox denial keywords in the command output.
|
||||
fn is_likely_sandbox_denied(sandbox_type: SandboxType, exec_output: &ExecToolCallOutput) -> bool {
|
||||
pub(crate) fn is_likely_sandbox_denied(
|
||||
sandbox_type: SandboxType,
|
||||
exec_output: &ExecToolCallOutput,
|
||||
) -> bool {
|
||||
if sandbox_type == SandboxType::None || exec_output.exit_code == 0 {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -27,9 +27,12 @@ pub(crate) struct ExecCommandSession {
|
||||
|
||||
/// Tracks whether the underlying process has exited.
|
||||
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||
/// Captures the process exit code once it becomes available.
|
||||
exit_code: std::sync::Arc<StdMutex<Option<i32>>>,
|
||||
}
|
||||
|
||||
impl ExecCommandSession {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
@@ -38,6 +41,7 @@ impl ExecCommandSession {
|
||||
writer_handle: JoinHandle<()>,
|
||||
wait_handle: JoinHandle<()>,
|
||||
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||
exit_code: std::sync::Arc<StdMutex<Option<i32>>>,
|
||||
) -> (Self, broadcast::Receiver<Vec<u8>>) {
|
||||
let initial_output_rx = output_tx.subscribe();
|
||||
(
|
||||
@@ -49,6 +53,7 @@ impl ExecCommandSession {
|
||||
writer_handle: StdMutex::new(Some(writer_handle)),
|
||||
wait_handle: StdMutex::new(Some(wait_handle)),
|
||||
exit_status,
|
||||
exit_code,
|
||||
},
|
||||
initial_output_rx,
|
||||
)
|
||||
@@ -65,6 +70,10 @@ impl ExecCommandSession {
|
||||
pub(crate) fn has_exited(&self) -> bool {
|
||||
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub(crate) fn exit_code(&self) -> Option<i32> {
|
||||
self.exit_code.lock().ok().and_then(|guard| *guard)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ExecCommandSession {
|
||||
|
||||
@@ -1,16 +1,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Read;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
|
||||
use portable_pty::CommandBuilder;
|
||||
use portable_pty::PtySize;
|
||||
use portable_pty::native_pty_system;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
@@ -20,6 +11,7 @@ use crate::exec_command::exec_command_params::ExecCommandParams;
|
||||
use crate::exec_command::exec_command_params::WriteStdinParams;
|
||||
use crate::exec_command::exec_command_session::ExecCommandSession;
|
||||
use crate::exec_command::session_id::SessionId;
|
||||
use crate::pty::spawn_pty_process;
|
||||
use crate::truncate::truncate_middle;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -242,102 +234,12 @@ async fn create_exec_command_session(
|
||||
login,
|
||||
} = params;
|
||||
|
||||
// Use the native pty implementation for the system
|
||||
let pty_system = native_pty_system();
|
||||
|
||||
// Create a new pty
|
||||
let pair = pty_system.openpty(PtySize {
|
||||
rows: 24,
|
||||
cols: 80,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
})?;
|
||||
|
||||
// Spawn a shell into the pty
|
||||
let mut command_builder = CommandBuilder::new(shell);
|
||||
let shell_mode_opt = if login { "-lc" } else { "-c" };
|
||||
command_builder.arg(shell_mode_opt);
|
||||
command_builder.arg(cmd);
|
||||
let args = vec![shell_mode_opt.to_string(), cmd];
|
||||
|
||||
let mut child = pair.slave.spawn_command(command_builder)?;
|
||||
// Obtain a killer that can signal the process independently of `.wait()`.
|
||||
let killer = child.clone_killer();
|
||||
|
||||
// Channel to forward write requests to the PTY writer.
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
// Broadcast for streaming PTY output to readers: subscribers receive from subscription time.
|
||||
let (output_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(256);
|
||||
// Reader task: drain PTY and forward chunks to output channel.
|
||||
let mut reader = pair.master.try_clone_reader()?;
|
||||
let output_tx_clone = output_tx.clone();
|
||||
let reader_handle = tokio::task::spawn_blocking(move || {
|
||||
let mut buf = [0u8; 8192];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => break, // EOF
|
||||
Ok(n) => {
|
||||
// Forward to broadcast; best-effort if there are subscribers.
|
||||
let _ = output_tx_clone.send(buf[..n].to_vec());
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => {
|
||||
// Retry on EINTR
|
||||
continue;
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
|
||||
// We're in a blocking thread; back off briefly and retry.
|
||||
std::thread::sleep(Duration::from_millis(5));
|
||||
continue;
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Writer task: apply stdin writes to the PTY writer.
|
||||
let writer = pair.master.take_writer()?;
|
||||
let writer = Arc::new(StdMutex::new(writer));
|
||||
let writer_handle = tokio::spawn({
|
||||
let writer = writer.clone();
|
||||
async move {
|
||||
while let Some(bytes) = writer_rx.recv().await {
|
||||
let writer = writer.clone();
|
||||
// Perform blocking write on a blocking thread.
|
||||
let _ = tokio::task::spawn_blocking(move || {
|
||||
if let Ok(mut guard) = writer.lock() {
|
||||
use std::io::Write;
|
||||
let _ = guard.write_all(&bytes);
|
||||
let _ = guard.flush();
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Keep the child alive until it exits, then signal exit code.
|
||||
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
|
||||
let exit_status = Arc::new(AtomicBool::new(false));
|
||||
let wait_exit_status = exit_status.clone();
|
||||
let wait_handle = tokio::task::spawn_blocking(move || {
|
||||
let code = match child.wait() {
|
||||
Ok(status) => status.exit_code() as i32,
|
||||
Err(_) => -1,
|
||||
};
|
||||
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
let _ = exit_tx.send(code);
|
||||
});
|
||||
|
||||
// Create and store the session with channels.
|
||||
let (session, initial_output_rx) = ExecCommandSession::new(
|
||||
writer_tx,
|
||||
output_tx,
|
||||
killer,
|
||||
reader_handle,
|
||||
writer_handle,
|
||||
wait_handle,
|
||||
exit_status,
|
||||
);
|
||||
Ok((session, initial_output_rx, exit_rx))
|
||||
let env = HashMap::new();
|
||||
let spawned = spawn_pty_process(&shell, &args, &env).await?;
|
||||
Ok((spawned.session, spawned.output_rx, spawned.exit_rx))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::function_tool::FunctionCallError;
|
||||
|
||||
pub(crate) enum ExecutionMode {
|
||||
Shell,
|
||||
InteractiveShell,
|
||||
ApplyPatch(ApplyPatchExec),
|
||||
}
|
||||
|
||||
@@ -36,7 +37,7 @@ static APPLY_PATCH_BACKEND: ApplyPatchBackend = ApplyPatchBackend;
|
||||
|
||||
pub(crate) fn backend_for_mode(mode: &ExecutionMode) -> &'static dyn ExecutionBackend {
|
||||
match mode {
|
||||
ExecutionMode::Shell => &SHELL_BACKEND,
|
||||
ExecutionMode::Shell | ExecutionMode::InteractiveShell => &SHELL_BACKEND,
|
||||
ExecutionMode::ApplyPatch(_) => &APPLY_PATCH_BACKEND,
|
||||
}
|
||||
}
|
||||
@@ -52,7 +53,7 @@ impl ExecutionBackend for ShellBackend {
|
||||
_config: &ExecutorConfig,
|
||||
) -> Result<ExecParams, FunctionCallError> {
|
||||
match mode {
|
||||
ExecutionMode::Shell => Ok(params),
|
||||
ExecutionMode::Shell | ExecutionMode::InteractiveShell => Ok(params),
|
||||
_ => Err(FunctionCallError::RespondToModel(
|
||||
"shell backend invoked with non-shell mode".to_string(),
|
||||
)),
|
||||
@@ -97,9 +98,11 @@ impl ExecutionBackend for ApplyPatchBackend {
|
||||
justification: params.justification,
|
||||
})
|
||||
}
|
||||
ExecutionMode::Shell => Err(FunctionCallError::RespondToModel(
|
||||
"apply_patch backend invoked without patch context".to_string(),
|
||||
)),
|
||||
ExecutionMode::Shell | ExecutionMode::InteractiveShell => {
|
||||
Err(FunctionCallError::RespondToModel(
|
||||
"apply_patch backend invoked without patch context".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,13 +1,46 @@
|
||||
//! Executor: centralized sandbox policy, approvals, and execution planning.
|
||||
//!
|
||||
//! Purpose and responsibilities
|
||||
//! - Normalizes per‑mode parameters via backends (`backends.rs`).
|
||||
//! - Selects sandbox placement and handles approvals (`sandbox.rs`).
|
||||
//! - Produces an `ExecutionPlan` (single source of truth for policy) that
|
||||
//! callers can either execute directly via `Executor::run` (non‑PTY, piped),
|
||||
//! or consume piecemeal (e.g., Unified Exec) to launch with a PTY while
|
||||
//! retaining consistent policy decisions.
|
||||
//!
|
||||
//! Key types
|
||||
//! - `ExecutionMode`: `Shell`, `InteractiveShell`, `ApplyPatch`.
|
||||
//! - `ExecutionRequest`: inputs + mode + stdout streaming preference.
|
||||
//! - `ExecutionPlan`: immutable snapshot of the policy decision and helpers to
|
||||
//! build a `SandboxLaunch` and retry without a sandbox when approved.
|
||||
//! - `SandboxLaunch`: concrete program/args/env to execute under the chosen
|
||||
//! sandbox.
|
||||
//!
|
||||
//! Typical flows
|
||||
//! - Non‑PTY (piped): `Executor::run(request, …)` handles plan → launch →
|
||||
//! execution and post‑processing, including converting sandbox failures into
|
||||
//! user‑facing messages.
|
||||
//! - PTY (Unified Exec): build the plan with `prepare_execution_plan` and then
|
||||
//! use `ExecutionPlan::attempt_with_retry_if` to drive the spawn with
|
||||
//! `SandboxLaunch`; PTY I/O and buffering remain the caller’s responsibility.
|
||||
//!
|
||||
//! This separation keeps sandbox logic and user interaction consistent while
|
||||
//! allowing different transports (piped vs PTY) to manage their own lifecycles.
|
||||
|
||||
mod backends;
|
||||
mod cache;
|
||||
mod runner;
|
||||
mod sandbox;
|
||||
|
||||
pub(crate) use backends::ExecutionMode;
|
||||
pub(crate) use runner::ExecutionPlan;
|
||||
pub(crate) use runner::ExecutionRequest;
|
||||
pub(crate) use runner::Executor;
|
||||
pub(crate) use runner::ExecutorConfig;
|
||||
pub(crate) use runner::normalize_exec_result;
|
||||
pub(crate) use sandbox::SandboxLaunch;
|
||||
pub(crate) use sandbox::SandboxLaunchError;
|
||||
pub(crate) use sandbox::build_launch_for_sandbox;
|
||||
|
||||
pub(crate) mod linkers {
|
||||
use crate::exec::ExecParams;
|
||||
@@ -45,6 +78,7 @@ pub(crate) mod linkers {
|
||||
|
||||
pub mod errors {
|
||||
use crate::error::CodexErr;
|
||||
use crate::executor::SandboxLaunchError;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use thiserror::Error;
|
||||
|
||||
@@ -61,4 +95,10 @@ pub mod errors {
|
||||
FunctionCallError::RespondToModel(msg.into()).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SandboxLaunchError> for ExecError {
|
||||
fn from(err: SandboxLaunchError) -> Self {
|
||||
CodexErr::from(err).into()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::future::Future;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
@@ -15,21 +16,27 @@ use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::SandboxType;
|
||||
use crate::exec::StdoutStream;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::exec::process_exec_tool_call;
|
||||
use crate::exec::execute_sandbox_launch;
|
||||
use crate::executor::errors::ExecError;
|
||||
use crate::executor::sandbox::RetrySandboxContext;
|
||||
use crate::executor::sandbox::SandboxDecision;
|
||||
use crate::executor::sandbox::SandboxLaunch;
|
||||
use crate::executor::sandbox::SandboxLaunchError;
|
||||
use crate::executor::sandbox::build_launch_for_sandbox;
|
||||
use crate::executor::sandbox::select_sandbox;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::ReviewDecision;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::shell;
|
||||
use crate::tools::context::ExecCommandContext;
|
||||
use codex_otel::otel_event_manager::ToolDecisionSource;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct ExecutorConfig {
|
||||
pub(crate) sandbox_policy: SandboxPolicy,
|
||||
pub(crate) sandbox_cwd: PathBuf,
|
||||
// Path to codex-linux-sandbox executable (Linux-only). Used by initial_launch when selecting Linux sandbox.
|
||||
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
// Path to the codex binary itself (used by apply_patch backend to self-invoke when needed).
|
||||
pub(crate) codex_exe: Option<PathBuf>,
|
||||
}
|
||||
|
||||
@@ -37,16 +44,134 @@ impl ExecutorConfig {
|
||||
pub(crate) fn new(
|
||||
sandbox_policy: SandboxPolicy,
|
||||
sandbox_cwd: PathBuf,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
codex_exe: Option<PathBuf>,
|
||||
) -> Self {
|
||||
let codex_exe = codex_exe.or_else(|| derive_codex_exe(&codex_linux_sandbox_exe));
|
||||
Self {
|
||||
sandbox_policy,
|
||||
sandbox_cwd,
|
||||
codex_linux_sandbox_exe,
|
||||
codex_exe,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn derive_codex_exe(sandbox_exe: &Option<PathBuf>) -> Option<PathBuf> {
|
||||
sandbox_exe.as_ref().and_then(|path| {
|
||||
let stem_matches_sandbox = path
|
||||
.file_stem()
|
||||
.and_then(|stem| stem.to_str())
|
||||
.is_some_and(|stem| stem == "codex-linux-sandbox");
|
||||
if stem_matches_sandbox {
|
||||
None
|
||||
} else {
|
||||
Some(path.clone())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) struct ExecutionPlan {
|
||||
request: ExecutionRequest,
|
||||
config: ExecutorConfig,
|
||||
sandbox_decision: SandboxDecision,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
context: ExecCommandContext,
|
||||
}
|
||||
|
||||
impl ExecutionPlan {
|
||||
pub(crate) fn request(&self) -> &ExecutionRequest {
|
||||
&self.request
|
||||
}
|
||||
|
||||
pub(crate) fn config(&self) -> &ExecutorConfig {
|
||||
&self.config
|
||||
}
|
||||
|
||||
pub(crate) fn stdout_stream(&self) -> Option<StdoutStream> {
|
||||
self.stdout_stream.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn initial_launch(&self) -> Result<SandboxLaunch, SandboxLaunchError> {
|
||||
build_launch_for_sandbox(
|
||||
self.sandbox_decision.initial_sandbox,
|
||||
&self.request.params.command,
|
||||
&self.config.sandbox_policy,
|
||||
&self.config.sandbox_cwd,
|
||||
self.config.codex_linux_sandbox_exe.as_ref(),
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn retry_launch(&self) -> Result<SandboxLaunch, SandboxLaunchError> {
|
||||
build_launch_for_sandbox(
|
||||
SandboxType::None,
|
||||
&self.request.params.command,
|
||||
&self.config.sandbox_policy,
|
||||
&self.config.sandbox_cwd,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn approval_command(&self) -> &[String] {
|
||||
&self.request.approval_command
|
||||
}
|
||||
|
||||
pub(crate) async fn prompt_retry_without_sandbox(
|
||||
&self,
|
||||
session: &Session,
|
||||
failure_message: impl Into<String>,
|
||||
) -> bool {
|
||||
if !self.sandbox_decision.escalate_on_failure {
|
||||
return false;
|
||||
}
|
||||
|
||||
let approval = crate::executor::sandbox::request_retry_without_sandbox(
|
||||
session,
|
||||
failure_message.into(),
|
||||
self.approval_command(),
|
||||
self.request.params.cwd.clone(),
|
||||
RetrySandboxContext {
|
||||
sub_id: &self.context.sub_id,
|
||||
call_id: &self.context.call_id,
|
||||
tool_name: &self.context.tool_name,
|
||||
otel_event_manager: &self.context.otel_event_manager,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
approval.is_some()
|
||||
}
|
||||
|
||||
/// Like `attempt_with_retry`, but only retries if `should_retry(err)` returns true.
|
||||
pub(crate) async fn attempt_with_retry_if<R, E, Fut, Attempt, Should>(
|
||||
&self,
|
||||
session: &Session,
|
||||
attempt: Attempt,
|
||||
should_retry: Should,
|
||||
) -> Result<R, E>
|
||||
where
|
||||
Attempt: Fn(SandboxLaunch) -> Fut,
|
||||
Fut: Future<Output = Result<R, E>>,
|
||||
Should: Fn(&E) -> bool,
|
||||
E: From<SandboxLaunchError> + std::fmt::Display,
|
||||
{
|
||||
let initial_launch = self.initial_launch().map_err(E::from)?;
|
||||
match attempt(initial_launch).await {
|
||||
Ok(result) => Ok(result),
|
||||
Err(err) if self.sandbox_decision.escalate_on_failure && should_retry(&err) => {
|
||||
let failure = format!("Execution failed: {err}");
|
||||
if self.prompt_retry_without_sandbox(session, failure).await {
|
||||
let retry_launch = self.retry_launch().map_err(E::from)?;
|
||||
attempt(retry_launch).await
|
||||
} else {
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Coordinates sandbox selection, backend-specific preparation, and command
|
||||
/// execution for tool calls requested by the model.
|
||||
pub(crate) struct Executor {
|
||||
@@ -62,26 +187,21 @@ impl Executor {
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the sandbox policy and working directory used for future
|
||||
/// executions without recreating the executor.
|
||||
pub(crate) fn update_environment(&self, sandbox_policy: SandboxPolicy, sandbox_cwd: PathBuf) {
|
||||
if let Ok(mut cfg) = self.config.write() {
|
||||
cfg.sandbox_policy = sandbox_policy;
|
||||
cfg.sandbox_cwd = sandbox_cwd;
|
||||
}
|
||||
pub(crate) fn record_session_approval(&self, command: Vec<String>) {
|
||||
self.approval_cache.insert(command);
|
||||
}
|
||||
|
||||
/// Runs a prepared execution request end-to-end: prepares parameters, decides on
|
||||
/// sandbox placement (prompting the user when necessary), launches the command,
|
||||
/// and lets the backend post-process the final output.
|
||||
pub(crate) async fn run(
|
||||
pub(crate) async fn prepare_execution_plan(
|
||||
&self,
|
||||
mut request: ExecutionRequest,
|
||||
session: &Session,
|
||||
approval_policy: AskForApproval,
|
||||
context: &ExecCommandContext,
|
||||
) -> Result<ExecToolCallOutput, ExecError> {
|
||||
if matches!(request.mode, ExecutionMode::Shell) {
|
||||
) -> Result<ExecutionPlan, ExecError> {
|
||||
if matches!(
|
||||
request.mode,
|
||||
ExecutionMode::Shell | ExecutionMode::InteractiveShell
|
||||
) {
|
||||
request.params =
|
||||
maybe_translate_shell_command(request.params, session, request.use_shell_profile);
|
||||
}
|
||||
@@ -104,6 +224,12 @@ impl Executor {
|
||||
.prepare(request.params, &request.mode, &config)
|
||||
.map_err(ExecError::from)?;
|
||||
|
||||
let config = self
|
||||
.config
|
||||
.read()
|
||||
.map_err(|_| ExecError::rejection("executor config poisoned"))?
|
||||
.clone();
|
||||
|
||||
// Step 3: Decide sandbox placement, prompting for approval when needed.
|
||||
let sandbox_decision = select_sandbox(
|
||||
&request,
|
||||
@@ -120,118 +246,71 @@ impl Executor {
|
||||
self.approval_cache.insert(request.approval_command.clone());
|
||||
}
|
||||
|
||||
// Step 4: Launch the command within the chosen sandbox.
|
||||
let first_attempt = self
|
||||
.spawn(
|
||||
request.params.clone(),
|
||||
sandbox_decision.initial_sandbox,
|
||||
&config,
|
||||
stdout_stream.clone(),
|
||||
Ok(ExecutionPlan {
|
||||
request,
|
||||
config,
|
||||
sandbox_decision,
|
||||
stdout_stream,
|
||||
context: context.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Updates the sandbox policy and working directory used for future
|
||||
/// executions without recreating the executor.
|
||||
pub(crate) fn update_environment(&self, sandbox_policy: SandboxPolicy, sandbox_cwd: PathBuf) {
|
||||
if let Ok(mut cfg) = self.config.write() {
|
||||
cfg.sandbox_policy = sandbox_policy;
|
||||
cfg.sandbox_cwd = sandbox_cwd;
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a prepared execution request end-to-end: prepares parameters, decides on
|
||||
/// sandbox placement (prompting the user when necessary), launches the command,
|
||||
/// and lets the backend post-process the final output.
|
||||
pub(crate) async fn run(
|
||||
&self,
|
||||
request: ExecutionRequest,
|
||||
session: &Session,
|
||||
approval_policy: AskForApproval,
|
||||
context: &ExecCommandContext,
|
||||
) -> Result<ExecToolCallOutput, ExecError> {
|
||||
let plan = self
|
||||
.prepare_execution_plan(request, session, approval_policy, context)
|
||||
.await?;
|
||||
|
||||
let stdout_stream = plan.stdout_stream();
|
||||
let sandbox_policy = plan.config().sandbox_policy.clone();
|
||||
|
||||
// Drive attempts via the shared helper, but do not retry on timeouts.
|
||||
let result: Result<ExecToolCallOutput, CodexErr> = plan
|
||||
.attempt_with_retry_if(
|
||||
session,
|
||||
|launch| {
|
||||
let params = plan.request().params.clone();
|
||||
let sandbox = plan.sandbox_decision.initial_sandbox;
|
||||
let policy = sandbox_policy.clone();
|
||||
let stream = stdout_stream.clone();
|
||||
async move { execute_sandbox_launch(params, launch, sandbox, &policy, stream).await }
|
||||
},
|
||||
|err: &CodexErr| { !matches!(err, CodexErr::Sandbox(SandboxErr::Timeout { .. })) },
|
||||
)
|
||||
.await;
|
||||
|
||||
// Step 5: Handle sandbox outcomes, optionally escalating to an unsandboxed retry.
|
||||
match first_attempt {
|
||||
match result {
|
||||
Ok(output) => Ok(output),
|
||||
Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => {
|
||||
Err(CodexErr::Sandbox(SandboxErr::Timeout { output }).into())
|
||||
Err(ExecError::Codex(CodexErr::Sandbox(SandboxErr::Timeout {
|
||||
output,
|
||||
})))
|
||||
}
|
||||
Err(CodexErr::Sandbox(error)) => {
|
||||
if sandbox_decision.escalate_on_failure {
|
||||
self.retry_without_sandbox(
|
||||
&request,
|
||||
&config,
|
||||
session,
|
||||
context,
|
||||
stdout_stream,
|
||||
error,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
let message = sandbox_failure_message(error);
|
||||
Err(ExecError::rejection(message))
|
||||
}
|
||||
// Convert non-timeout sandbox errors into user-facing rejection messages.
|
||||
let message = sandbox_failure_message(error);
|
||||
Err(ExecError::rejection(message))
|
||||
}
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Fallback path invoked when a sandboxed run is denied so the user can
|
||||
/// approve rerunning without isolation.
|
||||
async fn retry_without_sandbox(
|
||||
&self,
|
||||
request: &ExecutionRequest,
|
||||
config: &ExecutorConfig,
|
||||
session: &Session,
|
||||
context: &ExecCommandContext,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
sandbox_error: SandboxErr,
|
||||
) -> Result<ExecToolCallOutput, ExecError> {
|
||||
session
|
||||
.notify_background_event(
|
||||
&context.sub_id,
|
||||
format!("Execution failed: {sandbox_error}"),
|
||||
)
|
||||
.await;
|
||||
let decision = session
|
||||
.request_command_approval(
|
||||
context.sub_id.to_string(),
|
||||
context.call_id.to_string(),
|
||||
request.approval_command.clone(),
|
||||
request.params.cwd.clone(),
|
||||
Some("command failed; retry without sandbox?".to_string()),
|
||||
)
|
||||
.await;
|
||||
|
||||
context.otel_event_manager.tool_decision(
|
||||
&context.tool_name,
|
||||
&context.call_id,
|
||||
decision,
|
||||
ToolDecisionSource::User,
|
||||
);
|
||||
match decision {
|
||||
ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {
|
||||
if matches!(decision, ReviewDecision::ApprovedForSession) {
|
||||
self.approval_cache.insert(request.approval_command.clone());
|
||||
}
|
||||
session
|
||||
.notify_background_event(&context.sub_id, "retrying command without sandbox")
|
||||
.await;
|
||||
|
||||
let retry_output = self
|
||||
.spawn(
|
||||
request.params.clone(),
|
||||
SandboxType::None,
|
||||
config,
|
||||
stdout_stream,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(retry_output)
|
||||
}
|
||||
ReviewDecision::Denied | ReviewDecision::Abort => {
|
||||
Err(ExecError::rejection("exec command rejected by user"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn(
|
||||
&self,
|
||||
params: ExecParams,
|
||||
sandbox: SandboxType,
|
||||
config: &ExecutorConfig,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
) -> Result<ExecToolCallOutput, CodexErr> {
|
||||
process_exec_tool_call(
|
||||
params,
|
||||
sandbox,
|
||||
&config.sandbox_policy,
|
||||
&config.sandbox_cwd,
|
||||
&config.codex_exe,
|
||||
stdout_stream,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_translate_shell_command(
|
||||
|
||||
@@ -5,14 +5,103 @@ use crate::executor::ExecutionMode;
|
||||
use crate::executor::ExecutionRequest;
|
||||
use crate::executor::ExecutorConfig;
|
||||
use crate::executor::errors::ExecError;
|
||||
use crate::landlock::create_linux_sandbox_command_args;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::safety::SafetyCheck;
|
||||
use crate::safety::assess_command_safety;
|
||||
use crate::safety::assess_patch_safety;
|
||||
use crate::seatbelt::MACOS_PATH_TO_SEATBELT_EXECUTABLE;
|
||||
use crate::seatbelt::create_seatbelt_command_args;
|
||||
use crate::spawn::CODEX_SANDBOX_ENV_VAR;
|
||||
use crate::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use codex_otel::otel_event_manager::ToolDecisionSource;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SandboxLaunch {
|
||||
pub sandbox_type: SandboxType,
|
||||
pub program: String,
|
||||
pub args: Vec<String>,
|
||||
pub env: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum SandboxLaunchError {
|
||||
#[error("missing command line for sandbox launch")]
|
||||
MissingCommandLine,
|
||||
#[error("missing codex-linux-sandbox executable path")]
|
||||
MissingLinuxSandboxExecutable,
|
||||
}
|
||||
|
||||
pub(crate) fn build_launch_for_sandbox(
|
||||
sandbox: SandboxType,
|
||||
command: &[String],
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
sandbox_policy_cwd: &Path,
|
||||
codex_linux_sandbox_exe: Option<&PathBuf>,
|
||||
) -> Result<SandboxLaunch, SandboxLaunchError> {
|
||||
let mut env = HashMap::new();
|
||||
if !sandbox_policy.has_full_network_access() {
|
||||
env.insert(
|
||||
CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR.to_string(),
|
||||
"1".to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
match sandbox {
|
||||
SandboxType::None => {
|
||||
let (program, args) = command
|
||||
.split_first()
|
||||
.ok_or(SandboxLaunchError::MissingCommandLine)?;
|
||||
Ok(SandboxLaunch {
|
||||
sandbox_type: SandboxType::None,
|
||||
program: program.clone(),
|
||||
args: args.to_vec(),
|
||||
env,
|
||||
})
|
||||
}
|
||||
SandboxType::MacosSeatbelt => {
|
||||
env.insert(CODEX_SANDBOX_ENV_VAR.to_string(), "seatbelt".to_string());
|
||||
let args =
|
||||
create_seatbelt_command_args(command.to_vec(), sandbox_policy, sandbox_policy_cwd);
|
||||
Ok(SandboxLaunch {
|
||||
sandbox_type: SandboxType::MacosSeatbelt,
|
||||
program: MACOS_PATH_TO_SEATBELT_EXECUTABLE.to_string(),
|
||||
args,
|
||||
env,
|
||||
})
|
||||
}
|
||||
SandboxType::LinuxSeccomp => {
|
||||
let exe =
|
||||
codex_linux_sandbox_exe.ok_or(SandboxLaunchError::MissingLinuxSandboxExecutable)?;
|
||||
let args = create_linux_sandbox_command_args(
|
||||
command.to_vec(),
|
||||
sandbox_policy,
|
||||
sandbox_policy_cwd,
|
||||
);
|
||||
Ok(SandboxLaunch {
|
||||
sandbox_type: SandboxType::LinuxSeccomp,
|
||||
program: exe.to_string_lossy().to_string(),
|
||||
args,
|
||||
env,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RetrySandboxContext<'a> {
|
||||
pub sub_id: &'a str,
|
||||
pub call_id: &'a str,
|
||||
pub tool_name: &'a str,
|
||||
pub otel_event_manager: &'a OtelEventManager,
|
||||
}
|
||||
|
||||
/// Sandbox placement options selected for an execution run, including whether
|
||||
/// to escalate after failures and whether approvals should persist.
|
||||
@@ -50,6 +139,53 @@ fn should_escalate_on_failure(approval: AskForApproval, sandbox: SandboxType) ->
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn request_retry_without_sandbox(
|
||||
session: &Session,
|
||||
failure_message: impl Into<String>,
|
||||
command: &[String],
|
||||
cwd: PathBuf,
|
||||
ctx: RetrySandboxContext<'_>,
|
||||
) -> Option<ReviewDecision> {
|
||||
session
|
||||
.notify_background_event(ctx.sub_id, failure_message.into())
|
||||
.await;
|
||||
|
||||
let approval_command = command.to_vec();
|
||||
let decision = session
|
||||
.request_command_approval(
|
||||
ctx.sub_id.to_string(),
|
||||
ctx.call_id.to_string(),
|
||||
approval_command.clone(),
|
||||
cwd,
|
||||
Some("command failed; retry without sandbox?".to_string()),
|
||||
)
|
||||
.await;
|
||||
|
||||
ctx.otel_event_manager.tool_decision(
|
||||
ctx.tool_name,
|
||||
ctx.call_id,
|
||||
decision,
|
||||
ToolDecisionSource::User,
|
||||
);
|
||||
|
||||
match decision {
|
||||
ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {
|
||||
if matches!(decision, ReviewDecision::ApprovedForSession) {
|
||||
session
|
||||
.services
|
||||
.executor
|
||||
.record_session_approval(approval_command);
|
||||
}
|
||||
|
||||
session
|
||||
.notify_background_event(ctx.sub_id, "retrying command without sandbox")
|
||||
.await;
|
||||
Some(decision)
|
||||
}
|
||||
ReviewDecision::Denied | ReviewDecision::Abort => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines how a command should be sandboxed, prompting the user when
|
||||
/// policy requires explicit approval.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -64,7 +200,7 @@ pub async fn select_sandbox(
|
||||
otel_event_manager: &OtelEventManager,
|
||||
) -> Result<SandboxDecision, ExecError> {
|
||||
match &request.mode {
|
||||
ExecutionMode::Shell => {
|
||||
ExecutionMode::Shell | ExecutionMode::InteractiveShell => {
|
||||
select_shell_sandbox(
|
||||
request,
|
||||
approval_policy,
|
||||
@@ -207,7 +343,7 @@ mod tests {
|
||||
action,
|
||||
user_explicitly_approved_this_action: true,
|
||||
};
|
||||
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None);
|
||||
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None, None);
|
||||
let request = ExecutionRequest {
|
||||
params: ExecParams {
|
||||
command: vec!["apply_patch".into()],
|
||||
@@ -250,7 +386,12 @@ mod tests {
|
||||
action,
|
||||
user_explicitly_approved_this_action: false,
|
||||
};
|
||||
let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None);
|
||||
let cfg = ExecutorConfig::new(
|
||||
SandboxPolicy::DangerFullAccess,
|
||||
std::env::temp_dir(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let request = ExecutionRequest {
|
||||
params: ExecParams {
|
||||
command: vec!["apply_patch".into()],
|
||||
@@ -278,9 +419,8 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.expect("ok");
|
||||
// On platforms with a sandbox, DangerFullAccess still prefers it
|
||||
let expected = crate::safety::get_platform_sandbox().unwrap_or(SandboxType::None);
|
||||
assert_eq!(decision.initial_sandbox, expected);
|
||||
// DangerFullAccess bypasses sandboxing entirely.
|
||||
assert_eq!(decision.initial_sandbox, SandboxType::None);
|
||||
assert_eq!(decision.escalate_on_failure, false);
|
||||
}
|
||||
|
||||
@@ -294,7 +434,7 @@ mod tests {
|
||||
action,
|
||||
user_explicitly_approved_this_action: false,
|
||||
};
|
||||
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None);
|
||||
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None, None);
|
||||
let request = ExecutionRequest {
|
||||
params: ExecParams {
|
||||
command: vec!["apply_patch".into()],
|
||||
@@ -333,7 +473,12 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn select_shell_autoapprove_in_danger_mode() {
|
||||
let (session, ctx) = make_session_and_context();
|
||||
let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None);
|
||||
let cfg = ExecutorConfig::new(
|
||||
SandboxPolicy::DangerFullAccess,
|
||||
std::env::temp_dir(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let request = ExecutionRequest {
|
||||
params: ExecParams {
|
||||
command: vec!["some-unknown".into()],
|
||||
@@ -369,7 +514,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn select_shell_escalates_on_failure_with_platform_sandbox() {
|
||||
let (session, ctx) = make_session_and_context();
|
||||
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None);
|
||||
let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None, None);
|
||||
let request = ExecutionRequest {
|
||||
params: ExecParams {
|
||||
// Unknown command => untrusted but not flagged dangerous
|
||||
|
||||
@@ -40,7 +40,7 @@ where
|
||||
}
|
||||
|
||||
/// Converts the sandbox policy into the CLI invocation for `codex-linux-sandbox`.
|
||||
fn create_linux_sandbox_command_args(
|
||||
pub(crate) fn create_linux_sandbox_command_args(
|
||||
command: Vec<String>,
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
sandbox_policy_cwd: &Path,
|
||||
|
||||
@@ -39,6 +39,7 @@ mod mcp_tool_call;
|
||||
mod message_history;
|
||||
mod model_provider_info;
|
||||
pub mod parse_command;
|
||||
mod pty;
|
||||
mod truncate;
|
||||
mod unified_exec;
|
||||
mod user_instructions;
|
||||
|
||||
130
codex-rs/core/src/pty.rs
Normal file
130
codex-rs/core/src/pty.rs
Normal file
@@ -0,0 +1,130 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use portable_pty::CommandBuilder;
|
||||
use portable_pty::PtySize;
|
||||
use portable_pty::native_pty_system;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
|
||||
use crate::exec_command::ExecCommandSession;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SpawnedPty {
|
||||
pub session: ExecCommandSession,
|
||||
pub output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
pub exit_rx: oneshot::Receiver<i32>,
|
||||
}
|
||||
|
||||
/// Spawn a PTY-based process and return the interactive session along with
|
||||
/// receivers for streaming output and exit status.
|
||||
pub(crate) async fn spawn_pty_process(
|
||||
program: &str,
|
||||
args: &[String],
|
||||
env: &HashMap<String, String>,
|
||||
) -> anyhow::Result<SpawnedPty> {
|
||||
if program.is_empty() {
|
||||
anyhow::bail!("missing program for PTY spawn");
|
||||
}
|
||||
|
||||
let pty_system = native_pty_system();
|
||||
let pair = pty_system.openpty(PtySize {
|
||||
rows: 24,
|
||||
cols: 80,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
})?;
|
||||
|
||||
let mut command_builder = CommandBuilder::new(program);
|
||||
for arg in args {
|
||||
command_builder.arg(arg.clone());
|
||||
}
|
||||
for (key, value) in env {
|
||||
command_builder.env(key.clone(), value.clone());
|
||||
}
|
||||
|
||||
let mut child = pair.slave.spawn_command(command_builder)?;
|
||||
let killer = child.clone_killer();
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
|
||||
|
||||
let mut reader = pair.master.try_clone_reader()?;
|
||||
let output_tx_clone = output_tx.clone();
|
||||
let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
|
||||
let mut buf = [0u8; 8192];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let _ = output_tx_clone.send(buf[..n].to_vec());
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
|
||||
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
|
||||
std::thread::sleep(Duration::from_millis(5));
|
||||
continue;
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let writer = pair.master.take_writer()?;
|
||||
let writer = Arc::new(StdMutex::new(writer));
|
||||
let writer_handle: JoinHandle<()> = tokio::spawn({
|
||||
let writer = writer.clone();
|
||||
async move {
|
||||
while let Some(bytes) = writer_rx.recv().await {
|
||||
let writer = writer.clone();
|
||||
let _ = tokio::task::spawn_blocking(move || {
|
||||
if let Ok(mut guard) = writer.lock() {
|
||||
use std::io::Write;
|
||||
let _ = guard.write_all(&bytes);
|
||||
let _ = guard.flush();
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
|
||||
let exit_status = Arc::new(AtomicBool::new(false));
|
||||
let wait_exit_status = exit_status.clone();
|
||||
let exit_code = Arc::new(StdMutex::new(None));
|
||||
let wait_exit_code = exit_code.clone();
|
||||
let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
|
||||
let code = match child.wait() {
|
||||
Ok(status) => status.exit_code() as i32,
|
||||
Err(_) => -1,
|
||||
};
|
||||
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
if let Ok(mut guard) = wait_exit_code.lock() {
|
||||
*guard = Some(code);
|
||||
}
|
||||
let _ = exit_tx.send(code);
|
||||
});
|
||||
|
||||
let (session, output_rx) = ExecCommandSession::new(
|
||||
writer_tx,
|
||||
output_tx,
|
||||
killer,
|
||||
reader_handle,
|
||||
writer_handle,
|
||||
wait_handle,
|
||||
exit_status,
|
||||
exit_code,
|
||||
);
|
||||
|
||||
Ok(SpawnedPty {
|
||||
session,
|
||||
output_rx,
|
||||
exit_rx,
|
||||
})
|
||||
}
|
||||
@@ -55,23 +55,23 @@ pub fn assess_patch_safety(
|
||||
if is_write_patch_constrained_to_writable_paths(action, sandbox_policy, cwd)
|
||||
|| policy == AskForApproval::OnFailure
|
||||
{
|
||||
// Only auto‑approve when we can actually enforce a sandbox. Otherwise
|
||||
// fall back to asking the user because the patch may touch arbitrary
|
||||
// paths outside the project.
|
||||
match get_platform_sandbox() {
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove {
|
||||
sandbox_type,
|
||||
if matches!(sandbox_policy, SandboxPolicy::DangerFullAccess) {
|
||||
// DangerFullAccess is intended to bypass sandboxing entirely.
|
||||
SafetyCheck::AutoApprove {
|
||||
sandbox_type: SandboxType::None,
|
||||
user_explicitly_approved: false,
|
||||
},
|
||||
None if sandbox_policy == &SandboxPolicy::DangerFullAccess => {
|
||||
// If the user has explicitly requested DangerFullAccess, then
|
||||
// we can auto-approve even without a sandbox.
|
||||
SafetyCheck::AutoApprove {
|
||||
sandbox_type: SandboxType::None,
|
||||
user_explicitly_approved: false,
|
||||
}
|
||||
}
|
||||
None => SafetyCheck::AskUser,
|
||||
} else {
|
||||
// Only auto‑approve when we can actually enforce a sandbox. Otherwise
|
||||
// fall back to asking the user because the patch may touch arbitrary
|
||||
// paths outside the project.
|
||||
match get_platform_sandbox() {
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove {
|
||||
sandbox_type,
|
||||
user_explicitly_approved: false,
|
||||
},
|
||||
None => SafetyCheck::AskUser,
|
||||
}
|
||||
}
|
||||
} else if policy == AskForApproval::Never {
|
||||
SafetyCheck::Reject {
|
||||
|
||||
@@ -14,7 +14,7 @@ const MACOS_SEATBELT_BASE_POLICY: &str = include_str!("seatbelt_base_policy.sbpl
|
||||
/// to defend against an attacker trying to inject a malicious version on the
|
||||
/// PATH. If /usr/bin/sandbox-exec has been tampered with, then the attacker
|
||||
/// already has root access.
|
||||
const MACOS_PATH_TO_SEATBELT_EXECUTABLE: &str = "/usr/bin/sandbox-exec";
|
||||
pub(crate) const MACOS_PATH_TO_SEATBELT_EXECUTABLE: &str = "/usr/bin/sandbox-exec";
|
||||
|
||||
pub async fn spawn_command_under_seatbelt(
|
||||
command: Vec<String>,
|
||||
@@ -39,7 +39,7 @@ pub async fn spawn_command_under_seatbelt(
|
||||
.await
|
||||
}
|
||||
|
||||
fn create_seatbelt_command_args(
|
||||
pub(crate) fn create_seatbelt_command_args(
|
||||
command: Vec<String>,
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
sandbox_policy_cwd: &Path,
|
||||
|
||||
@@ -35,7 +35,13 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
|
||||
async fn handle(&self, invocation: ToolInvocation) -> Result<ToolOutput, FunctionCallError> {
|
||||
let ToolInvocation {
|
||||
session, payload, ..
|
||||
session,
|
||||
turn,
|
||||
sub_id,
|
||||
call_id,
|
||||
tool_name,
|
||||
payload,
|
||||
..
|
||||
} = invocation;
|
||||
|
||||
let args = match payload {
|
||||
@@ -73,13 +79,24 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
};
|
||||
|
||||
let request = UnifiedExecRequest {
|
||||
session_id: parsed_session_id,
|
||||
input_chunks: &input,
|
||||
timeout_ms,
|
||||
};
|
||||
|
||||
let value = session
|
||||
.run_unified_exec_request(request)
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.handle_request(
|
||||
request,
|
||||
crate::unified_exec::UnifiedExecContext {
|
||||
session: &session,
|
||||
turn: turn.as_ref(),
|
||||
sub_id: &sub_id,
|
||||
call_id: &call_id,
|
||||
tool_name: &tool_name,
|
||||
session_id: parsed_session_id,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!("unified exec failed: {err:?}"))
|
||||
|
||||
@@ -1,22 +1,39 @@
|
||||
use crate::executor::SandboxLaunchError;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum UnifiedExecError {
|
||||
#[error("Failed to create unified exec session: {pty_error}")]
|
||||
CreateSession {
|
||||
#[source]
|
||||
pty_error: anyhow::Error,
|
||||
},
|
||||
#[error("Failed to create unified exec session: {message}")]
|
||||
CreateSession { message: String },
|
||||
#[error("Unknown session id {session_id}")]
|
||||
UnknownSessionId { session_id: i32 },
|
||||
#[error("failed to write to stdin")]
|
||||
WriteToStdin,
|
||||
#[error("missing command line for unified exec request")]
|
||||
MissingCommandLine,
|
||||
#[error("missing codex-linux-sandbox executable path")]
|
||||
MissingLinuxSandboxExecutable,
|
||||
#[error("Command denied by sandbox: {message}")]
|
||||
SandboxDenied { message: String },
|
||||
}
|
||||
|
||||
impl UnifiedExecError {
|
||||
pub(crate) fn create_session(error: anyhow::Error) -> Self {
|
||||
Self::CreateSession { pty_error: error }
|
||||
pub(crate) fn create_session(message: String) -> Self {
|
||||
Self::CreateSession { message }
|
||||
}
|
||||
|
||||
pub(crate) fn sandbox_denied(message: String) -> Self {
|
||||
Self::SandboxDenied { message }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SandboxLaunchError> for UnifiedExecError {
|
||||
fn from(err: SandboxLaunchError) -> Self {
|
||||
match err {
|
||||
SandboxLaunchError::MissingCommandLine => UnifiedExecError::MissingCommandLine,
|
||||
SandboxLaunchError::MissingLinuxSandboxExecutable => {
|
||||
UnifiedExecError::MissingLinuxSandboxExecutable
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,23 +1,61 @@
|
||||
use portable_pty::CommandBuilder;
|
||||
use portable_pty::PtySize;
|
||||
use portable_pty::native_pty_system;
|
||||
//! Unified Exec: interactive PTY execution with session management.
|
||||
//!
|
||||
//! Purpose and responsibilities
|
||||
//! - Manages interactive PTY sessions (create, reuse, buffer output with caps).
|
||||
//! - Delegates sandbox selection, approvals, and policy decisions to the
|
||||
//! executor (single source of truth) via `prepare_execution_plan`.
|
||||
//! - Spawns the PTY using the `SandboxLaunch` produced by the plan and reuses
|
||||
//! `ExecutionPlan::attempt_with_retry_if` to optionally retry without a
|
||||
//! sandbox when policy allows and the user approves.
|
||||
//! - After process exit, classifies sandbox denials using the shared
|
||||
//! `is_likely_sandbox_denied` heuristic so denial messages stay consistent.
|
||||
//!
|
||||
//! Why not call `executor.run`?
|
||||
//! `executor.run` drives a non‑PTY (piped) execution flow end‑to‑end. Unified
|
||||
//! Exec needs an interactive PTY that persists across requests and supports
|
||||
//! streaming I/O. To keep policy logic centralized while still owning the PTY
|
||||
//! lifecycle, Unified Exec builds an `ExecutionRequest` with
|
||||
//! `ExecutionMode::InteractiveShell`, asks the executor for an
|
||||
//! `ExecutionPlan`, then performs the PTY spawn itself with the plan’s
|
||||
//! sandboxed command and environment.
|
||||
//!
|
||||
//! Handoff at a glance
|
||||
//! 1) Build `ExecutionRequest` (interactive shell).
|
||||
//! 2) `executor.update_environment(turn.sandbox_policy, turn.cwd)`.
|
||||
//! 3) `plan = executor.prepare_execution_plan(request, …)`.
|
||||
//! 4) `plan.attempt_with_retry_if(|launch| spawn_pty_process(launch), retry_on_denied)`.
|
||||
//! 5) Buffer+stream output, apply timeouts, and return `UnifiedExecResult`.
|
||||
//!
|
||||
//! This structure ensures Unified Exec benefits from the same approval,
|
||||
//! sandbox selection, and escalation rules as regular exec, while keeping the
|
||||
//! PTY/session concerns isolated here.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Read;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::AtomicI32;
|
||||
use std::sync::atomic::Ordering;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::exec::ExecParams;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::SandboxType;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::exec::is_likely_sandbox_denied;
|
||||
use crate::exec_command::ExecCommandSession;
|
||||
use crate::executor::ExecutionMode;
|
||||
use crate::executor::ExecutionPlan;
|
||||
use crate::executor::ExecutionRequest;
|
||||
use crate::pty::SpawnedPty;
|
||||
use crate::tools::context::ExecCommandContext;
|
||||
use crate::truncate::truncate_middle;
|
||||
|
||||
mod errors;
|
||||
@@ -28,9 +66,17 @@ const DEFAULT_TIMEOUT_MS: u64 = 1_000;
|
||||
const MAX_TIMEOUT_MS: u64 = 60_000;
|
||||
const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 128 * 1024; // 128 KiB
|
||||
|
||||
pub(crate) struct UnifiedExecContext<'a> {
|
||||
pub session: &'a Session,
|
||||
pub turn: &'a TurnContext,
|
||||
pub sub_id: &'a str,
|
||||
pub call_id: &'a str,
|
||||
pub tool_name: &'a str,
|
||||
pub session_id: Option<i32>,
|
||||
}
|
||||
|
||||
/// Payload of a single unified exec call.
#[derive(Debug)]
pub(crate) struct UnifiedExecRequest<'a> {
    /// Session id supplied by the caller, if any.
    // NOTE(review): `handle_request` reads the id from
    // `UnifiedExecContext::session_id` in this change — confirm whether this
    // field is still needed.
    pub session_id: Option<i32>,

    /// Command line for a new session, or input written to an existing
    /// session's stdin.
    pub input_chunks: &'a [String],

    /// Requested timeout in milliseconds; `None` selects `DEFAULT_TIMEOUT_MS`
    /// and values above `MAX_TIMEOUT_MS` are handled specially by
    /// `handle_request`.
    pub timeout_ms: Option<u64>,
}
|
||||
@@ -44,17 +90,19 @@ pub(crate) struct UnifiedExecResult {
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct UnifiedExecSessionManager {
|
||||
next_session_id: AtomicI32,
|
||||
sessions: Mutex<HashMap<i32, ManagedUnifiedExecSession>>,
|
||||
sessions: Mutex<HashMap<i32, UnifiedExecSession>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ManagedUnifiedExecSession {
|
||||
/// Wraps a PTY session with buffered output and sandbox metadata for unified exec.
|
||||
struct UnifiedExecSession {
|
||||
session: ExecCommandSession,
|
||||
output_buffer: OutputBuffer,
|
||||
/// Notifies waiters whenever new output has been appended to
|
||||
/// `output_buffer`, allowing clients to poll for fresh data.
|
||||
output_notify: Arc<Notify>,
|
||||
output_task: JoinHandle<()>,
|
||||
sandbox_type: SandboxType,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -94,15 +142,20 @@ impl OutputBufferState {
|
||||
self.total_bytes = 0;
|
||||
drained
|
||||
}
|
||||
|
||||
fn snapshot(&self) -> Vec<Vec<u8>> {
|
||||
self.chunks.iter().cloned().collect()
|
||||
}
|
||||
}
|
||||
|
||||
type OutputBuffer = Arc<Mutex<OutputBufferState>>;
|
||||
type OutputHandles = (OutputBuffer, Arc<Notify>);
|
||||
|
||||
impl ManagedUnifiedExecSession {
|
||||
impl UnifiedExecSession {
|
||||
fn new(
|
||||
session: ExecCommandSession,
|
||||
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
sandbox_type: SandboxType,
|
||||
) -> Self {
|
||||
let output_buffer = Arc::new(Mutex::new(OutputBufferState::default()));
|
||||
let output_notify = Arc::new(Notify::new());
|
||||
@@ -134,6 +187,7 @@ impl ManagedUnifiedExecSession {
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_task,
|
||||
sandbox_type,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,18 +205,158 @@ impl ManagedUnifiedExecSession {
|
||||
fn has_exited(&self) -> bool {
|
||||
self.session.has_exited()
|
||||
}
|
||||
|
||||
fn exit_code(&self) -> Option<i32> {
|
||||
self.session.exit_code()
|
||||
}
|
||||
|
||||
async fn snapshot_output(&self) -> Vec<Vec<u8>> {
|
||||
let guard = self.output_buffer.lock().await;
|
||||
guard.snapshot()
|
||||
}
|
||||
|
||||
fn sandbox_type(&self) -> SandboxType {
|
||||
self.sandbox_type
|
||||
}
|
||||
|
||||
async fn check_for_sandbox_denial(&self) -> Result<(), UnifiedExecError> {
|
||||
if self.sandbox_type() == SandboxType::None || !self.has_exited() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Give the reader task a brief moment to flush any final PTY bytes after exit.
|
||||
let _ =
|
||||
tokio::time::timeout(Duration::from_millis(20), self.output_notify.notified()).await;
|
||||
|
||||
let collected_chunks = self.snapshot_output().await;
|
||||
let mut aggregated: Vec<u8> = Vec::new();
|
||||
for chunk in collected_chunks {
|
||||
aggregated.extend_from_slice(&chunk);
|
||||
}
|
||||
let aggregated_text = String::from_utf8_lossy(&aggregated).to_string();
|
||||
let exit_code = self.exit_code().unwrap_or(-1);
|
||||
|
||||
let exec_output = ExecToolCallOutput {
|
||||
exit_code,
|
||||
stdout: StreamOutput::new(aggregated_text.clone()),
|
||||
stderr: StreamOutput::new(String::new()),
|
||||
aggregated_output: StreamOutput::new(aggregated_text.clone()),
|
||||
duration: Duration::ZERO,
|
||||
timed_out: false,
|
||||
};
|
||||
|
||||
if is_likely_sandbox_denied(self.sandbox_type(), &exec_output) {
|
||||
let (snippet, _) = truncate_middle(&aggregated_text, UNIFIED_EXEC_OUTPUT_MAX_BYTES);
|
||||
let message = if snippet.is_empty() {
|
||||
format!("exit code {exit_code}")
|
||||
} else {
|
||||
snippet
|
||||
};
|
||||
return Err(UnifiedExecError::sandbox_denied(message));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn from_spawned(
|
||||
spawned: SpawnedPty,
|
||||
sandbox_type: SandboxType,
|
||||
) -> Result<Self, UnifiedExecError> {
|
||||
let SpawnedPty {
|
||||
session,
|
||||
output_rx,
|
||||
mut exit_rx,
|
||||
} = spawned;
|
||||
let managed = Self::new(session, output_rx, sandbox_type);
|
||||
|
||||
let exit_ready = match exit_rx.try_recv() {
|
||||
Ok(_) | Err(TryRecvError::Closed) => true,
|
||||
Err(TryRecvError::Empty) => false,
|
||||
};
|
||||
|
||||
if exit_ready {
|
||||
managed.check_for_sandbox_denial().await?;
|
||||
}
|
||||
|
||||
Ok(managed)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ManagedUnifiedExecSession {
|
||||
impl Drop for UnifiedExecSession {
|
||||
fn drop(&mut self) {
|
||||
self.output_task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
impl UnifiedExecSessionManager {
|
||||
async fn open_session_with_sandbox(
|
||||
&self,
|
||||
command: Vec<String>,
|
||||
context: &UnifiedExecContext<'_>,
|
||||
) -> Result<UnifiedExecSession, UnifiedExecError> {
|
||||
let executor = &context.session.services.executor;
|
||||
let otel_event_manager = context.turn.client.get_otel_event_manager();
|
||||
let approval_command = command.clone();
|
||||
let exec_context = ExecCommandContext {
|
||||
sub_id: context.sub_id.to_string(),
|
||||
call_id: context.call_id.to_string(),
|
||||
command_for_display: approval_command.clone(),
|
||||
cwd: context.turn.cwd.clone(),
|
||||
apply_patch: None,
|
||||
tool_name: context.tool_name.to_string(),
|
||||
otel_event_manager,
|
||||
};
|
||||
|
||||
let execution_request = ExecutionRequest {
|
||||
params: ExecParams {
|
||||
command,
|
||||
cwd: context.turn.cwd.clone(),
|
||||
timeout_ms: None,
|
||||
env: HashMap::new(),
|
||||
with_escalated_permissions: None,
|
||||
justification: None,
|
||||
},
|
||||
approval_command,
|
||||
mode: ExecutionMode::InteractiveShell,
|
||||
stdout_stream: None,
|
||||
use_shell_profile: false,
|
||||
};
|
||||
|
||||
// Ensure the executor's environment reflects this turn before planning
|
||||
executor.update_environment(
|
||||
context.turn.sandbox_policy.clone(),
|
||||
context.turn.cwd.clone(),
|
||||
);
|
||||
|
||||
let plan: ExecutionPlan = executor
|
||||
.prepare_execution_plan(
|
||||
execution_request,
|
||||
context.session,
|
||||
context.turn.approval_policy,
|
||||
&exec_context,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| UnifiedExecError::create_session(err.to_string()))?;
|
||||
|
||||
plan.attempt_with_retry_if(
|
||||
context.session,
|
||||
|launch| async move {
|
||||
let sandbox_type = launch.sandbox_type;
|
||||
let spawned =
|
||||
crate::pty::spawn_pty_process(&launch.program, &launch.args, &launch.env)
|
||||
.await
|
||||
.map_err(|err| UnifiedExecError::create_session(err.to_string()))?;
|
||||
UnifiedExecSession::from_spawned(spawned, sandbox_type).await
|
||||
},
|
||||
|err: &UnifiedExecError| matches!(err, UnifiedExecError::SandboxDenied { .. }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn handle_request(
|
||||
&self,
|
||||
request: UnifiedExecRequest<'_>,
|
||||
context: UnifiedExecContext<'_>,
|
||||
) -> Result<UnifiedExecResult, UnifiedExecError> {
|
||||
let (timeout_ms, timeout_warning) = match request.timeout_ms {
|
||||
Some(requested) if requested > MAX_TIMEOUT_MS => (
|
||||
@@ -175,13 +369,13 @@ impl UnifiedExecSessionManager {
|
||||
None => (DEFAULT_TIMEOUT_MS, None),
|
||||
};
|
||||
|
||||
let mut new_session: Option<ManagedUnifiedExecSession> = None;
|
||||
let mut new_session: Option<UnifiedExecSession> = None;
|
||||
let session_id;
|
||||
let writer_tx;
|
||||
let output_buffer;
|
||||
let output_notify;
|
||||
|
||||
if let Some(existing_id) = request.session_id {
|
||||
if let Some(existing_id) = context.session_id {
|
||||
let mut sessions = self.sessions.lock().await;
|
||||
match sessions.get(&existing_id) {
|
||||
Some(session) => {
|
||||
@@ -207,8 +401,7 @@ impl UnifiedExecSessionManager {
|
||||
} else {
|
||||
let command = request.input_chunks.to_vec();
|
||||
let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
|
||||
let (session, initial_output_rx) = create_unified_exec_session(&command).await?;
|
||||
let managed_session = ManagedUnifiedExecSession::new(session, initial_output_rx);
|
||||
let managed_session = self.open_session_with_sandbox(command, &context).await?;
|
||||
let (buffer, notify) = managed_session.output_handles();
|
||||
writer_tx = managed_session.writer_sender();
|
||||
output_buffer = buffer;
|
||||
@@ -217,11 +410,35 @@ impl UnifiedExecSessionManager {
|
||||
new_session = Some(managed_session);
|
||||
};
|
||||
|
||||
if request.session_id.is_some() {
|
||||
let joined_input = request.input_chunks.join(" ");
|
||||
if !joined_input.is_empty() && writer_tx.send(joined_input.into_bytes()).await.is_err()
|
||||
{
|
||||
return Err(UnifiedExecError::WriteToStdin);
|
||||
if context.session_id.is_some() {
|
||||
let mut trailing_whitespace = true;
|
||||
for chunk in request.input_chunks {
|
||||
if chunk.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let leading_whitespace = chunk
|
||||
.chars()
|
||||
.next()
|
||||
.map(char::is_whitespace)
|
||||
.unwrap_or(true);
|
||||
|
||||
if !trailing_whitespace
|
||||
&& !leading_whitespace
|
||||
&& writer_tx.send(vec![b' ']).await.is_err()
|
||||
{
|
||||
return Err(UnifiedExecError::WriteToStdin);
|
||||
}
|
||||
|
||||
if writer_tx.send(chunk.as_bytes().to_vec()).await.is_err() {
|
||||
return Err(UnifiedExecError::WriteToStdin);
|
||||
}
|
||||
|
||||
trailing_whitespace = chunk
|
||||
.chars()
|
||||
.next_back()
|
||||
.map(char::is_whitespace)
|
||||
.unwrap_or(trailing_whitespace);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,7 +493,7 @@ impl UnifiedExecSessionManager {
|
||||
|
||||
let should_store_session = if let Some(session) = new_session.as_ref() {
|
||||
!session.has_exited()
|
||||
} else if request.session_id.is_some() {
|
||||
} else if context.session_id.is_some() {
|
||||
let mut sessions = self.sessions.lock().await;
|
||||
if let Some(existing) = sessions.get(&session_id) {
|
||||
if existing.has_exited() {
|
||||
@@ -309,114 +526,55 @@ impl UnifiedExecSessionManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns `command` on a newly opened 24x80 PTY and wraps it in an
/// `ExecCommandSession`.
///
/// `command[0]` is the program and the remaining elements are its arguments.
///
/// Returns the session together with the broadcast receiver handed back by
/// `ExecCommandSession::new`.
///
/// # Errors
///
/// * `UnifiedExecError::MissingCommandLine` when `command` is empty.
/// * The error built by `UnifiedExecError::create_session` when opening the
///   PTY, spawning the child, cloning the reader, or taking the writer fails.
async fn create_unified_exec_session(
|
||||
command: &[String],
|
||||
) -> Result<
|
||||
(
|
||||
ExecCommandSession,
|
||||
tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
),
|
||||
UnifiedExecError,
|
||||
> {
|
||||
if command.is_empty() {
|
||||
return Err(UnifiedExecError::MissingCommandLine);
|
||||
}
|
||||
|
||||
let pty_system = native_pty_system();
|
||||
|
||||
let pair = pty_system
|
||||
.openpty(PtySize {
|
||||
rows: 24,
|
||||
cols: 80,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
})
|
||||
.map_err(UnifiedExecError::create_session)?;
|
||||
|
||||
// Indexing `command[0]` and slicing `command[1..]` cannot panic: the
// empty-command case already returned at the top of the function.
|
||||
let mut command_builder = CommandBuilder::new(command[0].clone());
|
||||
for arg in &command[1..] {
|
||||
command_builder.arg(arg);
|
||||
}
|
||||
|
||||
let mut child = pair
|
||||
.slave
|
||||
.spawn_command(command_builder)
|
||||
.map_err(UnifiedExecError::create_session)?;
|
||||
// Grab the killer before `child` is moved into the wait task below.
let killer = child.clone_killer();
|
||||
|
||||
// `writer_tx` carries bytes destined for the PTY writer; `output_tx`
// broadcasts the chunks read back from the PTY.
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let (output_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(256);
|
||||
|
||||
let mut reader = pair
|
||||
.master
|
||||
.try_clone_reader()
|
||||
.map_err(UnifiedExecError::create_session)?;
|
||||
let output_tx_clone = output_tx.clone();
|
||||
// Reader task: blocking read loop that forwards each chunk to the broadcast
// channel. It stops on EOF (`Ok(0)`) or on any error other than
// `Interrupted` / `WouldBlock`; send failures are ignored.
let reader_handle = tokio::task::spawn_blocking(move || {
|
||||
let mut buf = [0u8; 8192];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let _ = output_tx_clone.send(buf[..n].to_vec());
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
|
||||
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
|
||||
// Back off briefly before retrying the read.
std::thread::sleep(Duration::from_millis(5));
|
||||
continue;
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let writer = pair
|
||||
.master
|
||||
.take_writer()
|
||||
.map_err(UnifiedExecError::create_session)?;
|
||||
let writer = Arc::new(StdMutex::new(writer));
|
||||
// Writer task: drains `writer_rx` until the channel closes, performing each
// write + flush on a blocking thread. Write, flush and lock failures are
// silently ignored.
let writer_handle = tokio::spawn({
|
||||
let writer = writer.clone();
|
||||
async move {
|
||||
while let Some(bytes) = writer_rx.recv().await {
|
||||
let writer = writer.clone();
|
||||
let _ = tokio::task::spawn_blocking(move || {
|
||||
if let Ok(mut guard) = writer.lock() {
|
||||
use std::io::Write;
|
||||
let _ = guard.write_all(&bytes);
|
||||
let _ = guard.flush();
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Flag that starts `false` and is set to `true` once `child.wait()` returns
// (the wait result itself is discarded).
let exit_status = Arc::new(AtomicBool::new(false));
|
||||
let wait_exit_status = Arc::clone(&exit_status);
|
||||
let wait_handle = tokio::task::spawn_blocking(move || {
|
||||
let _ = child.wait();
|
||||
wait_exit_status.store(true, Ordering::SeqCst);
|
||||
});
|
||||
|
||||
// Hand the channel ends, the killer, the three task handles and the exit
// flag over to the session.
let (session, initial_output_rx) = ExecCommandSession::new(
|
||||
writer_tx,
|
||||
output_tx,
|
||||
killer,
|
||||
reader_handle,
|
||||
writer_handle,
|
||||
wait_handle,
|
||||
exit_status,
|
||||
);
|
||||
Ok((session, initial_output_rx))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(unix)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
#[cfg(unix)]
|
||||
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex::make_session_and_context;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use core_test_support::skip_if_sandbox;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Builds the session/turn pair shared by the tests in this module.
///
/// Starts from `make_session_and_context()` and overrides the turn's approval
/// policy to `AskForApproval::Never` and its sandbox policy to
/// `SandboxPolicy::DangerFullAccess`, then wraps both values in `Arc`.
fn test_session_and_turn() -> (Arc<Session>, Arc<TurnContext>) {
|
||||
let (session, mut turn) = make_session_and_context();
|
||||
turn.approval_policy = AskForApproval::Never;
|
||||
turn.sandbox_policy = SandboxPolicy::DangerFullAccess;
|
||||
(Arc::new(session), Arc::new(turn))
|
||||
}
|
||||
|
||||
/// Test helper: sends `input` through the session's own
/// `unified_exec_manager` and returns whatever `handle_request` returns.
///
/// The request is built from `input` and `timeout_ms`; the context uses the
/// fixed identifiers `"sub"`, `"call"` and `"unified_exec"` together with the
/// supplied `session_id`. As used by the tests in this module, `None` starts
/// a new command and `Some(id)` sends input to a previously opened session.
async fn run_unified_exec_request(
|
||||
session: &Arc<Session>,
|
||||
turn: &Arc<TurnContext>,
|
||||
session_id: Option<i32>,
|
||||
input: Vec<String>,
|
||||
timeout_ms: Option<u64>,
|
||||
) -> Result<UnifiedExecResult, UnifiedExecError> {
|
||||
// Keep the owned input alive for as long as `request` borrows it.
let request_input = input;
|
||||
let request = UnifiedExecRequest {
|
||||
input_chunks: &request_input,
|
||||
timeout_ms,
|
||||
};
|
||||
|
||||
session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.handle_request(
|
||||
request,
|
||||
UnifiedExecContext {
|
||||
session,
|
||||
turn: turn.as_ref(),
|
||||
sub_id: "sub",
|
||||
call_id: "call",
|
||||
tool_name: "unified_exec",
|
||||
session_id,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn push_chunk_trims_only_excess_bytes() {
|
||||
@@ -435,158 +593,160 @@ mod tests {
|
||||
assert_eq!(buffer.chunks.pop_back().unwrap(), vec![b'b']);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_persists_across_requests_jif() -> Result<(), UnifiedExecError> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let (session, turn) = test_session_and_turn();
|
||||
|
||||
let open_shell = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: None,
|
||||
input_chunks: &["bash".to_string(), "-i".to_string()],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
let open_shell = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
None,
|
||||
vec!["bash".to_string(), "-i".to_string()],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
let session_id = open_shell.session_id.expect("expected session_id");
|
||||
|
||||
manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: Some(session_id),
|
||||
input_chunks: &[
|
||||
"export".to_string(),
|
||||
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
|
||||
],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
Some(session_id),
|
||||
vec![
|
||||
"export".to_string(),
|
||||
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
|
||||
],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let out_2 = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: Some(session_id),
|
||||
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
let out_2 = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
Some(session_id),
|
||||
vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
assert!(out_2.output.contains("codex"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn multi_unified_exec_sessions() -> Result<(), UnifiedExecError> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let (session, turn) = test_session_and_turn();
|
||||
|
||||
let shell_a = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: None,
|
||||
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
let shell_a = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
None,
|
||||
vec!["/bin/bash".to_string(), "-i".to_string()],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
let session_a = shell_a.session_id.expect("expected session id");
|
||||
|
||||
manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: Some(session_a),
|
||||
input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
Some(session_a),
|
||||
vec!["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let out_2 = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: None,
|
||||
input_chunks: &[
|
||||
"echo".to_string(),
|
||||
"$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(),
|
||||
],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
let out_2 = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
None,
|
||||
vec![
|
||||
"echo".to_string(),
|
||||
"$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(),
|
||||
],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
assert!(!out_2.output.contains("codex"));
|
||||
|
||||
let out_3 = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: Some(session_a),
|
||||
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
let out_3 = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
Some(session_a),
|
||||
vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
assert!(out_3.output.contains("codex"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn unified_exec_timeouts() -> Result<(), UnifiedExecError> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let (session, turn) = test_session_and_turn();
|
||||
|
||||
let open_shell = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: None,
|
||||
input_chunks: &["bash".to_string(), "-i".to_string()],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
let open_shell = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
None,
|
||||
vec!["bash".to_string(), "-i".to_string()],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
let session_id = open_shell.session_id.expect("expected session id");
|
||||
|
||||
manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: Some(session_id),
|
||||
input_chunks: &[
|
||||
"export".to_string(),
|
||||
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
|
||||
],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
Some(session_id),
|
||||
vec![
|
||||
"export".to_string(),
|
||||
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
|
||||
],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let out_2 = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: Some(session_id),
|
||||
input_chunks: &["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
|
||||
timeout_ms: Some(10),
|
||||
})
|
||||
.await?;
|
||||
let out_2 = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
Some(session_id),
|
||||
vec!["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
|
||||
Some(10),
|
||||
)
|
||||
.await?;
|
||||
assert!(!out_2.output.contains("codex"));
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(7)).await;
|
||||
|
||||
let empty = Vec::new();
|
||||
let out_3 = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: Some(session_id),
|
||||
input_chunks: &empty,
|
||||
timeout_ms: Some(100),
|
||||
})
|
||||
.await?;
|
||||
let out_3 =
|
||||
run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100))
|
||||
.await?;
|
||||
|
||||
assert!(out_3.output.contains("codex"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
#[ignore] // Ignored while we have a better way to test this.
|
||||
async fn requests_with_large_timeout_are_capped() -> Result<(), UnifiedExecError> {
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let (session, turn) = test_session_and_turn();
|
||||
|
||||
let result = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: None,
|
||||
input_chunks: &["echo".to_string(), "codex".to_string()],
|
||||
timeout_ms: Some(120_000),
|
||||
})
|
||||
.await?;
|
||||
let result = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
None,
|
||||
vec!["echo".to_string(), "codex".to_string()],
|
||||
Some(120_000),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert!(result.output.starts_with(
|
||||
"Warning: requested timeout 120000ms exceeds maximum of 60000ms; clamping to 60000ms.\n"
|
||||
@@ -596,61 +756,66 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
#[ignore] // Ignored while we have a better way to test this.
|
||||
async fn completed_commands_do_not_persist_sessions() -> Result<(), UnifiedExecError> {
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let result = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: None,
|
||||
input_chunks: &["/bin/echo".to_string(), "codex".to_string()],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
let (session, turn) = test_session_and_turn();
|
||||
let result = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
None,
|
||||
vec!["/bin/echo".to_string(), "codex".to_string()],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert!(result.session_id.is_none());
|
||||
assert!(result.output.contains("codex"));
|
||||
|
||||
assert!(manager.sessions.lock().await.is_empty());
|
||||
assert!(
|
||||
session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.sessions
|
||||
.lock()
|
||||
.await
|
||||
.is_empty()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn reusing_completed_session_returns_unknown_session() -> Result<(), UnifiedExecError> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let manager = UnifiedExecSessionManager::default();
|
||||
let (session, turn) = test_session_and_turn();
|
||||
|
||||
let open_shell = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: None,
|
||||
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
let open_shell = run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
None,
|
||||
vec!["/bin/bash".to_string(), "-i".to_string()],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
let session_id = open_shell.session_id.expect("expected session id");
|
||||
|
||||
manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: Some(session_id),
|
||||
input_chunks: &["exit\n".to_string()],
|
||||
timeout_ms: Some(2_500),
|
||||
})
|
||||
.await?;
|
||||
run_unified_exec_request(
|
||||
&session,
|
||||
&turn,
|
||||
Some(session_id),
|
||||
vec!["exit\n".to_string()],
|
||||
Some(2_500),
|
||||
)
|
||||
.await?;
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
let err = manager
|
||||
.handle_request(UnifiedExecRequest {
|
||||
session_id: Some(session_id),
|
||||
input_chunks: &[],
|
||||
timeout_ms: Some(100),
|
||||
})
|
||||
.await
|
||||
.expect_err("expected unknown session error");
|
||||
let err =
|
||||
run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100))
|
||||
.await
|
||||
.expect_err("expected unknown session error");
|
||||
|
||||
match err {
|
||||
UnifiedExecError::UnknownSessionId { session_id: err_id } => {
|
||||
@@ -659,7 +824,15 @@ mod tests {
|
||||
other => panic!("expected UnknownSessionId, got {other:?}"),
|
||||
}
|
||||
|
||||
assert!(!manager.sessions.lock().await.contains_key(&session_id));
|
||||
assert!(
|
||||
!session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.sessions
|
||||
.lock()
|
||||
.await
|
||||
.contains_key(&session_id)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user