Compare commits

...

2 Commits

Author SHA1 Message Date
starr-openai
0ee08bc702 refactor(core): add explicit exec-server unified exec runtime path
Co-authored-by: Codex <noreply@openai.com>
2026-03-21 00:58:05 +00:00
starr-openai
04190334dc Route unified exec through ProcessBackend abstraction
Co-authored-by: Codex <noreply@openai.com>
2026-03-21 00:40:35 +00:00
4 changed files with 608 additions and 70 deletions

View File

@@ -34,7 +34,7 @@ use crate::tools::sandboxing::with_cached_approval;
use crate::tools::spec::UnifiedExecShellMode;
use crate::unified_exec::NoopSpawnLifecycle;
use crate::unified_exec::UnifiedExecError;
use crate::unified_exec::UnifiedExecProcess;
use crate::unified_exec::ProcessBackend;
use crate::unified_exec::UnifiedExecProcessManager;
use codex_network_proxy::NetworkProxy;
use codex_protocol::models::PermissionProfile;
@@ -53,6 +53,8 @@ pub struct UnifiedExecRequest {
pub tty: bool,
pub sandbox_permissions: SandboxPermissions,
pub additional_permissions: Option<PermissionProfile>,
pub process_id: i32,
pub use_exec_server: bool,
#[cfg(unix)]
pub additional_permissions_preapproved: bool,
pub justification: Option<String>,
@@ -71,6 +73,7 @@ pub struct UnifiedExecApprovalKey {
pub struct UnifiedExecRuntime<'a> {
manager: &'a UnifiedExecProcessManager,
shell_mode: UnifiedExecShellMode,
use_exec_server: bool,
}
impl<'a> UnifiedExecRuntime<'a> {
@@ -78,6 +81,18 @@ impl<'a> UnifiedExecRuntime<'a> {
Self {
manager,
shell_mode,
use_exec_server: false,
}
}
pub fn with_exec_server(
manager: &'a UnifiedExecProcessManager,
shell_mode: UnifiedExecShellMode,
) -> Self {
Self {
manager,
shell_mode,
use_exec_server: true,
}
}
}
@@ -172,7 +187,7 @@ impl Approvable<UnifiedExecRequest> for UnifiedExecRuntime<'_> {
}
}
impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRuntime<'a> {
impl<'a> ToolRuntime<UnifiedExecRequest, Arc<ProcessBackend>> for UnifiedExecRuntime<'a> {
fn network_approval_spec(
&self,
req: &UnifiedExecRequest,
@@ -190,7 +205,13 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
req: &UnifiedExecRequest,
attempt: &SandboxAttempt<'_>,
ctx: &ToolCtx,
) -> Result<UnifiedExecProcess, ToolError> {
) -> Result<Arc<ProcessBackend>, ToolError> {
let use_exec_server = if self.use_exec_server {
true
} else {
req.use_exec_server
};
let base_command = &req.command;
let session_shell = ctx.session.user_shell();
let command = maybe_wrap_shell_lc_with_snapshot(
@@ -237,7 +258,10 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
.manager
.open_session_with_exec_env(
&prepared.exec_request,
req.process_id,
req.tty,
use_exec_server,
Some(ctx.session.services.environment.get_executor()),
prepared.spawn_lifecycle,
)
.await
@@ -272,7 +296,18 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
.env_for(spec, req.network.as_ref())
.map_err(|err| ToolError::Codex(err.into()))?;
self.manager
.open_session_with_exec_env(&exec_env, req.tty, Box::new(NoopSpawnLifecycle))
.open_session_with_exec_env(
&exec_env,
req.process_id,
req.tty,
use_exec_server,
if use_exec_server {
Some(ctx.session.services.environment.get_executor())
} else {
None
},
Box::new(NoopSpawnLifecycle),
)
.await
.map_err(|err| match err {
UnifiedExecError::SandboxDenied { output, .. } => {

View File

@@ -18,10 +18,12 @@ use crate::protocol::EventMsg;
use crate::protocol::ExecCommandOutputDeltaEvent;
use crate::protocol::ExecCommandSource;
use crate::protocol::ExecOutputStream;
use crate::unified_exec::process::OutputHandles;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventStage;
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
use crate::unified_exec::ProcessBackend;
pub(crate) const TRAILING_OUTPUT_GRACE: Duration = Duration::from_millis(100);
@@ -37,21 +39,81 @@ const UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES: usize = 8192;
/// shared transcript, and emits ExecCommandOutputDelta events on UTF8
/// boundaries.
pub(crate) fn start_streaming_output(
process: &UnifiedExecProcess,
process: Arc<ProcessBackend>,
context: &UnifiedExecContext,
transcript: Arc<Mutex<HeadTailBuffer>>,
) {
let mut receiver = process.output_receiver();
let output_drained = process.output_drained_notify();
let exit_token = process.cancellation_token();
let session_ref = Arc::clone(&context.session);
let turn_ref = Arc::clone(&context.turn);
let call_id = context.call_id.clone();
let OutputHandles {
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
} = process.output_handles();
if let Some(local_process) = process.as_local_process() {
let mut receiver = local_process.output_receiver();
let output_drained = local_process.output_drained_notify();
tokio::spawn(async move {
use tokio::sync::broadcast::error::RecvError;
let mut pending = Vec::<u8>::new();
let mut emitted_deltas: usize = 0;
let mut grace_sleep: Option<Pin<Box<Sleep>>> = None;
loop {
tokio::select! {
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
}
_ = async {
if let Some(sleep) = grace_sleep.as_mut() {
sleep.as_mut().await;
}
}, if grace_sleep.is_some() => {
output_drained.notify_one();
break;
}
received = receiver.recv() => {
let chunk = match received {
Ok(chunk) => chunk,
Err(RecvError::Lagged(_)) => {
continue;
},
Err(RecvError::Closed) => {
output_drained.notify_one();
break;
}
};
process_chunk(
&mut pending,
&transcript,
&call_id,
&session_ref,
&turn_ref,
&mut emitted_deltas,
chunk,
).await;
}
}
}
});
return;
}
let output_drained = process
.output_drained()
.unwrap_or_else(|| Arc::new(Notify::new()));
tokio::spawn(async move {
use tokio::sync::broadcast::error::RecvError;
let mut pending = Vec::<u8>::new();
let mut emitted_deltas: usize = 0;
@@ -59,7 +121,7 @@ pub(crate) fn start_streaming_output(
loop {
tokio::select! {
_ = exit_token.cancelled(), if grace_sleep.is_none() => {
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
}
@@ -73,27 +135,35 @@ pub(crate) fn start_streaming_output(
break;
}
received = receiver.recv() => {
let chunk = match received {
Ok(chunk) => chunk,
Err(RecvError::Lagged(_)) => {
continue;
},
Err(RecvError::Closed) => {
_ = output_notify.notified() => {
let drained_chunks = {
let mut guard = output_buffer.lock().await;
guard.drain_chunks()
};
if drained_chunks.is_empty() {
if cancellation_token.is_cancelled() && output_closed.load(std::sync::atomic::Ordering::Acquire) {
output_drained.notify_one();
break;
}
};
continue;
}
process_chunk(
&mut pending,
&transcript,
&call_id,
&session_ref,
&turn_ref,
&mut emitted_deltas,
chunk,
).await;
for chunk in drained_chunks {
process_chunk(
&mut pending,
&transcript,
&call_id,
&session_ref,
&turn_ref,
&mut emitted_deltas,
chunk,
).await;
}
continue;
}
_ = output_closed_notify.notified(), if grace_sleep.is_none() => {
grace_sleep.replace(Box::pin(tokio::time::sleep_until(Instant::now() + TRAILING_OUTPUT_GRACE)));
}
}
}
@@ -104,7 +174,7 @@ pub(crate) fn start_streaming_output(
/// single ExecCommandEnd event with the aggregated transcript.
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn_exit_watcher(
process: Arc<UnifiedExecProcess>,
process: Arc<ProcessBackend>,
session_ref: Arc<Session>,
turn_ref: Arc<TurnContext>,
call_id: String,
@@ -114,8 +184,12 @@ pub(crate) fn spawn_exit_watcher(
transcript: Arc<Mutex<HeadTailBuffer>>,
started_at: Instant,
) {
let exit_token = process.cancellation_token();
let output_drained = process.output_drained_notify();
let Some(exit_token) = process.cancellation_token() else {
return;
};
let Some(output_drained) = process.output_drained() else {
return;
};
tokio::spawn(async move {
exit_token.cancelled().await;

View File

@@ -25,17 +25,30 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::Weak;
use std::sync::atomic::Ordering;
use codex_exec_server::process::ExecProcess;
use codex_network_proxy::NetworkProxy;
use codex_protocol::models::PermissionProfile;
use rand::Rng;
use rand::rng;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::sandboxing::SandboxPermissions;
use crate::exec::is_likely_sandbox_denied;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StreamOutput;
use crate::truncate::TruncationPolicy;
use crate::truncate::formatted_truncate_text;
mod async_watcher;
mod errors;
@@ -142,7 +155,7 @@ impl Default for UnifiedExecProcessManager {
}
struct ProcessEntry {
process: Arc<UnifiedExecProcess>,
backend: ProcessBackend,
call_id: String,
process_id: i32,
command: Vec<String>,
@@ -152,6 +165,240 @@ struct ProcessEntry {
last_used: tokio::time::Instant,
}
#[derive(Debug, Clone)]
pub(crate) enum ProcessBackend {
Local {
process: Arc<UnifiedExecProcess>,
},
ExecServer {
process_id: String,
executor: Arc<dyn ExecProcess>,
output_buffer: crate::unified_exec::process::OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
output_closed_notify: Arc<Notify>,
output_drained: Arc<Notify>,
cancellation_token: CancellationToken,
exit_code: Arc<RwLock<Option<i32>>>,
has_exited: Arc<AtomicBool>,
sandbox_type: SandboxType,
output_seq: Arc<AtomicU64>,
},
}
impl ProcessBackend {
pub(crate) fn is_local(&self) -> bool {
matches!(self, Self::Local { .. })
}
pub(crate) fn as_local_process(&self) -> Option<&Arc<UnifiedExecProcess>> {
match self {
Self::Local { process } => Some(process),
Self::ExecServer { .. } => None,
}
}
pub(crate) fn output_handles(
&self,
) -> (
crate::unified_exec::process::OutputBuffer,
Arc<Notify>,
Arc<AtomicBool>,
Arc<Notify>,
CancellationToken,
) {
match self {
Self::Local { process } => {
let handles = process.output_handles();
(
handles.output_buffer,
handles.output_notify,
handles.output_closed,
handles.output_closed_notify,
handles.cancellation_token,
)
}
Self::ExecServer {
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
..
} => (
Arc::clone(output_buffer),
Arc::clone(output_notify),
Arc::clone(output_closed),
Arc::clone(output_closed_notify),
cancellation_token.clone(),
),
}
}
pub(crate) async fn check_for_sandbox_denial_with_text(
&self,
text: &str,
) -> Result<(), UnifiedExecError> {
let sandbox_type = self.sandbox_type();
if sandbox_type == SandboxType::None || !self.has_exited() {
return Ok(());
}
match self {
Self::Local { process } => {
process.check_for_sandbox_denial_with_text(text).await
}
Self::ExecServer { .. } => {
let exit_code = self.exit_code().unwrap_or(-1);
let exec_output = ExecToolCallOutput {
exit_code,
stderr: StreamOutput::new(text.to_string()),
aggregated_output: StreamOutput::new(text.to_string()),
..Default::default()
};
if is_likely_sandbox_denied(sandbox_type, &exec_output) {
let snippet = formatted_truncate_text(
text,
TruncationPolicy::Tokens(UNIFIED_EXEC_OUTPUT_MAX_TOKENS),
);
let message = if snippet.is_empty() {
format!("Process exited with code {exit_code}")
} else {
snippet
};
return Err(UnifiedExecError::sandbox_denied(message, exec_output));
}
Ok(())
}
}
}
fn exit_code(&self) -> Option<i32> {
match self {
Self::Local { process } => process.exit_code(),
Self::ExecServer { exit_code, .. } => *exit_code.read().unwrap_or_else(|err| err.into_inner()),
}
}
pub(crate) fn set_exit_code(&self, exit_code: i32) {
if let Self::ExecServer { exit_code: state, .. } = self {
let mut guard = state.write().unwrap_or_else(|err| err.into_inner());
*guard = Some(exit_code);
}
}
pub(crate) fn mark_exited(&self) {
if let Self::ExecServer { has_exited, .. } = self {
has_exited.store(true, Ordering::Release);
}
}
pub(crate) fn has_exited(&self) -> bool {
match self {
Self::Local { process } => process.has_exited(),
Self::ExecServer { has_exited, .. } => has_exited.load(Ordering::Acquire),
}
}
pub(crate) fn cancellation_token(&self) -> Option<CancellationToken> {
match self {
Self::Local { process } => Some(process.cancellation_token()),
Self::ExecServer {
cancellation_token, ..
} => Some(cancellation_token.clone()),
}
}
pub(crate) fn output_drained(&self) -> Option<Arc<Notify>> {
match self {
Self::Local { process } => Some(process.output_drained_notify()),
Self::ExecServer { output_drained, .. } => Some(Arc::clone(output_drained)),
}
}
pub(crate) fn exit_code_handle(&self) -> Option<Arc<RwLock<Option<i32>>> {
match self {
Self::ExecServer { exit_code, .. } => Some(Arc::clone(exit_code)),
Self::Local { .. } => None,
}
}
pub(crate) fn remote_output_seq(&self) -> Option<Arc<AtomicU64>> {
match self {
Self::ExecServer { output_seq, .. } => Some(Arc::clone(output_seq)),
Self::Local { .. } => None,
}
}
pub(crate) async fn write_stdin(&self, data: &[u8]) -> Result<(), UnifiedExecError> {
match self {
Self::Local { process } => process
.writer_sender()
.send(data.to_vec())
.await
.map_err(|_| UnifiedExecError::WriteToStdin)?,
Self::ExecServer {
process_id,
executor,
..
} => {
let response = executor
.write(process_id.as_str(), data.to_vec())
.await
.map_err(|_| UnifiedExecError::WriteToStdin)?;
if !response.accepted {
return Err(UnifiedExecError::WriteToStdin);
}
}
}
Ok(())
}
pub(crate) async fn terminate(&self) {
match self {
Self::Local { process } => process.terminate(),
Self::ExecServer {
process_id,
executor,
output_closed,
output_closed_notify,
output_drained,
cancellation_token,
has_exited,
..
} => {
has_exited.store(true, Ordering::Release);
output_closed.store(true, Ordering::Release);
output_closed_notify.notify_waiters();
output_drained.notify_one();
cancellation_token.cancel();
let _ = executor.terminate(process_id.as_str()).await;
}
}
}
pub(crate) fn sandbox_type(&self) -> SandboxType {
match self {
Self::Local { process } => process.sandbox_type(),
Self::ExecServer { sandbox_type, .. } => *sandbox_type,
}
}
pub(crate) fn remote_exec_state(&self) -> Option<(&str, &Arc<dyn ExecProcess>)> {
match self {
Self::ExecServer {
process_id,
executor,
..
} => Some((process_id.as_str(), executor)),
Self::Local { .. } => None,
}
}
}
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
yield_time_ms.clamp(MIN_YIELD_TIME_MS, MAX_YIELD_TIME_MS)
}

View File

@@ -4,14 +4,18 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::atomic::AtomicU64;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::time::Duration;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use codex_exec_server::process::ExecProcess;
use codex_exec_server::protocol::ExecParams;
use codex_exec_server::protocol::ReadParams;
use crate::exec_env::create_env;
use crate::exec_policy::ExecApprovalRequest;
@@ -34,6 +38,7 @@ use crate::unified_exec::MAX_YIELD_TIME_MS;
use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
use crate::unified_exec::MIN_YIELD_TIME_MS;
use crate::unified_exec::ProcessEntry;
use crate::unified_exec::ProcessBackend;
use crate::unified_exec::ProcessStore;
use crate::unified_exec::UnifiedExecContext;
use crate::unified_exec::UnifiedExecError;
@@ -49,7 +54,6 @@ use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
use crate::unified_exec::process::OutputBuffer;
use crate::unified_exec::process::OutputHandles;
use crate::unified_exec::process::SpawnLifecycleHandle;
use crate::unified_exec::process::UnifiedExecProcess;
const UNIFIED_EXEC_ENV: [(&str, &str); 10] = [
("NO_COLOR", "1"),
@@ -90,7 +94,7 @@ fn apply_unified_exec_env(mut env: HashMap<String, String>) -> HashMap<String, S
}
struct PreparedProcessHandles {
writer_tx: mpsc::Sender<Vec<u8>>,
process: Arc<ProcessBackend>,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
@@ -166,9 +170,7 @@ impl UnifiedExecProcessManager {
.await;
let (process, mut deferred_network_approval) = match process {
Ok((process, deferred_network_approval)) => {
(Arc::new(process), deferred_network_approval)
}
Ok((process, deferred_network_approval)) => (process, deferred_network_approval),
Err(err) => {
self.release_process_id(request.process_id).await;
return Err(err);
@@ -190,7 +192,7 @@ impl UnifiedExecProcessManager {
);
emitter.emit(event_ctx, ToolEventStage::Begin).await;
start_streaming_output(&process, context, Arc::clone(&transcript));
start_streaming_output(process.clone(), context, Arc::clone(&transcript));
let start = Instant::now();
// Persist live sessions before the initial yield wait so interrupting the
// turn cannot drop the last Arc and terminate the background process.
@@ -312,7 +314,6 @@ impl UnifiedExecProcessManager {
let process_id = request.process_id;
let PreparedProcessHandles {
writer_tx,
output_buffer,
output_notify,
output_closed,
@@ -329,7 +330,7 @@ impl UnifiedExecProcessManager {
if !tty {
return Err(UnifiedExecError::StdinClosed);
}
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
process.write_stdin(request.input.as_bytes()).await?;
// Give the remote process a brief window to react so that we are
// more likely to capture its output in the poll below.
tokio::time::sleep(Duration::from_millis(100)).await;
@@ -407,10 +408,10 @@ impl UnifiedExecProcessManager {
return ProcessStatus::Unknown;
};
let exit_code = entry.process.exit_code();
let exit_code = entry.backend.exit_code();
let process_id = entry.process_id;
if entry.process.has_exited() {
if entry.backend.has_exited() {
let Some(entry) = store.remove(process_id) else {
return ProcessStatus::Unknown;
};
@@ -448,14 +449,13 @@ impl UnifiedExecProcessManager {
output_closed,
output_closed_notify,
cancellation_token,
} = entry.process.output_handles();
} = entry.backend.output_handles();
let pause_state = entry
.session
.upgrade()
.map(|session| session.subscribe_out_of_band_elicitation_pause_state());
Ok(PreparedProcessHandles {
writer_tx: entry.process.writer_sender(),
output_buffer,
output_notify,
output_closed,
@@ -468,20 +468,10 @@ impl UnifiedExecProcessManager {
})
}
async fn send_input(
writer_tx: &mpsc::Sender<Vec<u8>>,
data: &[u8],
) -> Result<(), UnifiedExecError> {
writer_tx
.send(data.to_vec())
.await
.map_err(|_| UnifiedExecError::WriteToStdin)
}
#[allow(clippy::too_many_arguments)]
async fn store_process(
&self,
process: Arc<UnifiedExecProcess>,
process: Arc<ProcessBackend>,
context: &UnifiedExecContext,
command: &[String],
cwd: PathBuf,
@@ -492,7 +482,7 @@ impl UnifiedExecProcessManager {
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
) {
let entry = ProcessEntry {
process: Arc::clone(&process),
backend: Arc::clone(&process),
call_id: context.call_id.clone(),
process_id,
command: command.to_vec(),
@@ -511,7 +501,7 @@ impl UnifiedExecProcessManager {
// network-approval cleanup only after dropping that lock.
if let Some(pruned_entry) = pruned_entry {
Self::unregister_network_approval_for_entry(&pruned_entry).await;
pruned_entry.process.terminate();
pruned_entry.backend.terminate();
}
if number_processes >= WARNING_UNIFIED_EXEC_PROCESSES {
@@ -540,9 +530,29 @@ impl UnifiedExecProcessManager {
pub(crate) async fn open_session_with_exec_env(
&self,
env: &ExecRequest,
process_id: i32,
tty: bool,
use_exec_server: bool,
mut executor: Option<Arc<dyn ExecProcess>>,
mut spawn_lifecycle: SpawnLifecycleHandle,
) -> Result<UnifiedExecProcess, UnifiedExecError> {
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
if use_exec_server {
return self
.open_session_with_exec_server(env, process_id, tty, &mut executor, spawn_lifecycle)
.await;
}
self.open_session_with_local_process(env, process_id, tty, spawn_lifecycle)
.await
}
async fn open_session_with_local_process(
&self,
env: &ExecRequest,
process_id: i32,
tty: bool,
spawn_lifecycle: SpawnLifecycleHandle,
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
let (program, args) = env
.command
.split_first()
@@ -571,10 +581,58 @@ impl UnifiedExecProcessManager {
)
.await
};
let spawned =
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
let spawned = spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
spawn_lifecycle.after_spawn();
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
let process = UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await?;
Ok(Arc::new(ProcessBackend::Local {
process: Arc::new(process),
}))
}
async fn open_session_with_exec_server(
&self,
env: &ExecRequest,
process_id: i32,
tty: bool,
mut executor: &mut Option<Arc<dyn ExecProcess>>,
spawn_lifecycle: SpawnLifecycleHandle,
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
let executor = executor
.take()
.ok_or_else(|| UnifiedExecError::create_process("exec-server unavailable".to_string()))?;
let response = executor
.start(ExecParams {
process_id: process_id.to_string(),
argv: env.command.clone(),
cwd: env.cwd.clone(),
env: env.env.clone(),
tty,
arg0: env.arg0.clone(),
})
.await
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
let _ = response.process_id;
let process = Arc::new(ProcessBackend::ExecServer {
process_id: process_id.to_string(),
executor,
output_buffer: Arc::new(tokio::sync::Mutex::new(HeadTailBuffer::default())),
output_notify: Arc::new(Notify::new()),
output_closed: Arc::new(AtomicBool::new(false)),
output_closed_notify: Arc::new(Notify::new()),
output_drained: Arc::new(Notify::new()),
cancellation_token: CancellationToken::new(),
exit_code: Arc::new(RwLock::new(None)),
has_exited: Arc::new(AtomicBool::new(false)),
sandbox_type: env.sandbox,
output_seq: Arc::new(AtomicU64::new(0)),
});
spawn_lifecycle.after_spawn();
Self::spawn_exec_server_output_watcher(Arc::clone(&process));
Ok(process)
}
pub(super) async fn open_session_with_sandbox(
@@ -582,16 +640,20 @@ impl UnifiedExecProcessManager {
request: &ExecCommandRequest,
cwd: PathBuf,
context: &UnifiedExecContext,
) -> Result<(UnifiedExecProcess, Option<DeferredNetworkApproval>), UnifiedExecError> {
) -> Result<(Arc<ProcessBackend>, Option<DeferredNetworkApproval>), UnifiedExecError> {
let env = apply_unified_exec_env(create_env(
&context.turn.shell_environment_policy,
Some(context.session.conversation_id),
));
let mut orchestrator = ToolOrchestrator::new();
let mut runtime = UnifiedExecRuntime::new(
self,
context.turn.tools_config.unified_exec_shell_mode.clone(),
);
let mut runtime = if context.turn.config.experimental_exec_server_url.is_some() {
UnifiedExecRuntime::with_exec_server(
self,
context.turn.tools_config.unified_exec_shell_mode.clone(),
)
} else {
UnifiedExecRuntime::new(self, context.turn.tools_config.unified_exec_shell_mode.clone())
};
let exec_approval_requirement = context
.session
.services
@@ -618,6 +680,8 @@ impl UnifiedExecProcessManager {
tty: request.tty,
sandbox_permissions: request.sandbox_permissions,
additional_permissions: request.additional_permissions.clone(),
process_id: request.process_id,
use_exec_server: context.turn.config.experimental_exec_server_url.is_some(),
#[cfg(unix)]
additional_permissions_preapproved: request.additional_permissions_preapproved,
justification: request.justification.clone(),
@@ -775,7 +839,7 @@ impl UnifiedExecProcessManager {
let meta: Vec<(i32, Instant, bool)> = store
.processes
.iter()
.map(|(id, entry)| (*id, entry.last_used, entry.process.has_exited()))
.map(|(id, entry)| (*id, entry.last_used, entry.backend.has_exited()))
.collect();
if let Some(process_id) = Self::process_id_to_prune_from_meta(&meta) {
@@ -828,11 +892,129 @@ impl UnifiedExecProcessManager {
for entry in entries {
Self::unregister_network_approval_for_entry(&entry).await;
entry.process.terminate();
entry.backend.terminate();
}
}
}
impl UnifiedExecProcessManager {
fn spawn_exec_server_output_watcher(process: Arc<ProcessBackend>) {
let (
process_id,
executor,
output_buffer,
output_notify,
output_closed,
output_closed_notify,
output_drained,
output_seq,
has_exited,
exit_code,
cancellation_token,
) = {
if let ProcessBackend::ExecServer {
process_id,
executor,
output_buffer,
output_notify,
output_closed,
output_closed_notify,
output_drained,
output_seq,
has_exited,
exit_code,
cancellation_token,
..
} = process.as_ref()
{
(
process_id.clone(),
Arc::clone(executor),
Arc::clone(output_buffer),
Arc::clone(output_notify),
Arc::clone(output_closed),
Arc::clone(output_closed_notify),
Arc::clone(output_drained),
Arc::clone(output_seq),
Arc::clone(has_exited),
Arc::clone(exit_code),
cancellation_token.clone(),
)
} else {
return;
}
};
tokio::spawn(async move {
let mut after_seq = None;
loop {
if cancellation_token.is_cancelled() {
output_closed.store(true, Ordering::Release);
output_closed_notify.notify_waiters();
output_drained.notify_waiters();
break;
}
let response = match executor
.read(ReadParams {
process_id: process_id.clone(),
after_seq,
max_bytes: Some(64 * 1024),
wait_ms: Some(100),
})
.await
{
Ok(response) => response,
Err(_) => {
let mut guard = exit_code.write().unwrap_or_else(|err| err.into_inner());
if guard.is_none() {
*guard = Some(-1);
}
has_exited.store(true, Ordering::Release);
output_closed.store(true, Ordering::Release);
output_closed_notify.notify_waiters();
output_drained.notify_waiters();
cancellation_token.cancel();
break;
}
};
if !response.chunks.is_empty() {
let mut current_seq = output_seq.load(Ordering::Acquire);
for chunk in response.chunks {
if chunk.seq < current_seq {
continue;
}
{
let mut guard = output_buffer.lock().await;
guard.push_chunk(chunk.chunk.into_inner());
}
current_seq = chunk.seq.saturating_add(1);
output_seq.store(current_seq, Ordering::Release);
output_notify.notify_waiters();
}
}
if let Some(code) = response.exit_code {
let mut guard = exit_code.write().unwrap_or_else(|err| err.into_inner());
*guard = Some(code);
}
after_seq = Some(response.next_seq);
if response.exited {
has_exited.store(true, Ordering::Release);
output_closed.store(true, Ordering::Release);
output_closed_notify.notify_waiters();
output_drained.notify_waiters();
cancellation_token.cancel();
break;
}
}
});
}
}
enum ProcessStatus {
Alive {
exit_code: Option<i32>,