Compare commits

...

6 Commits

Author SHA1 Message Date
starr-openai
580c5eb99d exec-server: cancel pending startup approval on terminate
Co-authored-by: Codex <noreply@openai.com>
2026-04-08 21:10:16 -07:00
starr-openai
da8522119d codex: remove stale exec-server disabled-environment test
Co-authored-by: Codex <noreply@openai.com>
2026-04-08 21:10:16 -07:00
starr-openai
16596276e5 codex: fix sdk follow-up on PR #16937
Co-authored-by: Codex <noreply@openai.com>
2026-04-08 21:10:16 -07:00
starr-openai
bfebabe26b codex: fix follow-up CI failures on PR #16937
Co-authored-by: Codex <noreply@openai.com>
2026-04-08 21:10:15 -07:00
starr-openai
b01ed02390 codex: fix CI failure on PR #16937
Co-authored-by: Codex <noreply@openai.com>
2026-04-08 21:10:15 -07:00
starr-openai
fa2389b61d Surface remote startup exec approvals
Add exec-server startup approval plumbing, wire it into unified exec, and cover the live remote path with focused smoke coverage.

Co-authored-by: Codex <noreply@openai.com>
2026-04-08 21:09:05 -07:00
19 changed files with 1048 additions and 81 deletions

View File

@@ -33,6 +33,7 @@ use crate::unified_exec::NoopSpawnLifecycle;
use crate::unified_exec::UnifiedExecError;
use crate::unified_exec::UnifiedExecProcess;
use crate::unified_exec::UnifiedExecProcessManager;
use codex_exec_server::ExecApprovalRequest as RemoteExecApprovalRequest;
use codex_network_proxy::NetworkProxy;
use codex_protocol::error::CodexErr;
use codex_protocol::error::SandboxErr;
@@ -62,6 +63,7 @@ pub struct UnifiedExecRequest {
pub additional_permissions_preapproved: bool,
pub justification: Option<String>,
pub exec_approval_requirement: ExecApprovalRequirement,
pub remote_startup_exec_approval: Option<RemoteExecApprovalRequest>,
}
/// Cache key for approval decisions that can be reused across equivalent
@@ -173,7 +175,17 @@ impl Approvable<UnifiedExecRequest> for UnifiedExecRuntime<'_> {
&self,
req: &UnifiedExecRequest,
) -> Option<ExecApprovalRequirement> {
Some(req.exec_approval_requirement.clone())
if req.remote_startup_exec_approval.is_some() {
Some(ExecApprovalRequirement::Skip {
bypass_sandbox: false,
proposed_execpolicy_amendment: req
.exec_approval_requirement
.proposed_execpolicy_amendment()
.cloned(),
})
} else {
Some(req.exec_approval_requirement.clone())
}
}
fn sandbox_mode_for_first_attempt(&self, req: &UnifiedExecRequest) -> SandboxOverride {
@@ -256,6 +268,7 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
req.process_id,
&prepared.exec_request,
req.tty,
/*startup_exec_approval*/ None,
prepared.spawn_lifecycle,
environment.as_ref(),
)
@@ -297,6 +310,7 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
req.process_id,
&exec_env,
req.tty,
req.remote_startup_exec_approval.clone(),
Box::new(NoopSpawnLifecycle),
environment.as_ref(),
)

View File

@@ -99,6 +99,7 @@ async fn exec_command_with_tty(
process_id,
&request,
tty,
/*startup_exec_approval*/ None,
Box::new(NoopSpawnLifecycle),
turn.environment.as_ref().expect("turn environment"),
)
@@ -514,6 +515,7 @@ async fn completed_pipe_commands_preserve_exit_code() -> anyhow::Result<()> {
/*process_id*/ 1234,
&request,
/*tty*/ false,
/*startup_exec_approval*/ None,
Box::new(NoopSpawnLifecycle),
&environment,
)
@@ -556,6 +558,7 @@ async fn unified_exec_uses_remote_exec_server_when_configured() -> anyhow::Resul
/*process_id*/ 1234,
&request,
/*tty*/ true,
/*startup_exec_approval*/ None,
Box::new(NoopSpawnLifecycle),
remote_test_env.environment(),
)
@@ -610,6 +613,7 @@ async fn remote_exec_server_rejects_inherited_fd_launches() -> anyhow::Result<()
/*process_id*/ 1234,
&request,
/*tty*/ true,
/*startup_exec_approval*/ None,
Box::new(TestSpawnLifecycle {
inherited_fds: vec![42],
}),

View File

@@ -13,6 +13,7 @@ use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::exec::is_likely_sandbox_denied;
use codex_exec_server::ExecApprovalRequest;
use codex_exec_server::ExecProcess;
use codex_exec_server::ReadResponse as ExecReadResponse;
use codex_exec_server::StartedExecProcess;
@@ -224,6 +225,32 @@ impl UnifiedExecProcess {
self.state_rx.borrow().failure_message.clone()
}
pub(super) async fn take_pending_exec_approval(&self) -> Option<ExecApprovalRequest> {
let state = self.state_rx.borrow().clone();
let pending_exec_approval = state.pending_exec_approval.clone();
let _ = self
.state_tx
.send_replace(state.clear_pending_exec_approval());
pending_exec_approval
}
pub(super) async fn resolve_exec_approval(
&self,
approval_id: String,
decision: codex_app_server_protocol::CommandExecutionApprovalDecision,
) -> Result<(), UnifiedExecError> {
match &self.process_handle {
ProcessHandle::Local(_) => Err(UnifiedExecError::create_process(
"local unified exec process cannot resolve remote exec approvals".to_string(),
)),
ProcessHandle::Remote(process_handle) => process_handle
.resolve_exec_approval(approval_id, decision)
.await
.map_err(|err| UnifiedExecError::create_process(err.to_string()))
.map(|_| ()),
}
}
pub(super) async fn check_for_sandbox_denial(&self) -> Result<(), UnifiedExecError> {
let _ =
tokio::time::timeout(Duration::from_millis(20), self.output_notify.notified()).await;
@@ -349,7 +376,10 @@ impl UnifiedExecProcess {
if tokio::time::timeout(EARLY_EXIT_GRACE_PERIOD, async {
loop {
let state = state_rx.borrow().clone();
if state.has_exited || state.failure_message.is_some() {
if state.has_exited
|| state.failure_message.is_some()
|| state.pending_exec_approval.is_some()
{
break;
}
if state_rx.changed().await.is_err() {
@@ -396,6 +426,7 @@ impl UnifiedExecProcess {
exit_code,
closed,
failure,
exec_approval,
} = response;
for chunk in chunks {
@@ -416,6 +447,13 @@ impl UnifiedExecProcess {
break;
}
if let Some(exec_approval) = exec_approval {
let state = state_tx.borrow().clone();
let _ = state_tx
.send_replace(state.with_pending_exec_approval(exec_approval));
output_notify.notify_waiters();
}
if exited {
let state = state_tx.borrow().clone();
let _ = state_tx.send_replace(state.exited(exit_code));

View File

@@ -2,6 +2,8 @@ use rand::Rng;
use std::cmp::Reverse;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -11,6 +13,7 @@ use tokio::time::Duration;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use crate::command_canonicalization::canonicalize_command_for_approval;
use crate::exec_env::create_env;
use crate::exec_policy::ExecApprovalRequest;
use crate::sandboxing::ExecRequest;
@@ -21,9 +24,11 @@ use crate::tools::events::ToolEventStage;
use crate::tools::network_approval::DeferredNetworkApproval;
use crate::tools::network_approval::finish_deferred_network_approval;
use crate::tools::orchestrator::ToolOrchestrator;
use crate::tools::runtimes::unified_exec::UnifiedExecApprovalKey;
use crate::tools::runtimes::unified_exec::UnifiedExecRequest as UnifiedExecToolRequest;
use crate::tools::runtimes::unified_exec::UnifiedExecRuntime;
use crate::tools::sandboxing::ToolCtx;
use crate::tools::sandboxing::with_cached_approval;
use crate::unified_exec::ExecCommandRequest;
use crate::unified_exec::MAX_UNIFIED_EXEC_PROCESSES;
use crate::unified_exec::MAX_YIELD_TIME_MS;
@@ -47,7 +52,9 @@ use crate::unified_exec::process::OutputBuffer;
use crate::unified_exec::process::OutputHandles;
use crate::unified_exec::process::SpawnLifecycleHandle;
use crate::unified_exec::process::UnifiedExecProcess;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_protocol::protocol::ExecCommandSource;
use codex_protocol::protocol::ReviewDecision;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_output_truncation::approx_token_count;
@@ -107,6 +114,34 @@ fn exec_server_process_id(process_id: i32) -> String {
process_id.to_string()
}
fn startup_exec_approval_request(
context: &UnifiedExecContext,
request: &ExecCommandRequest,
cwd: &AbsolutePathBuf,
exec_approval_requirement: &crate::tools::sandboxing::ExecApprovalRequirement,
) -> Option<codex_exec_server::ExecApprovalRequest> {
match exec_approval_requirement {
crate::tools::sandboxing::ExecApprovalRequirement::NeedsApproval {
reason,
proposed_execpolicy_amendment,
} => Some(codex_exec_server::ExecApprovalRequest {
call_id: context.call_id.clone(),
approval_id: None,
turn_id: context.turn.sub_id.clone(),
command: request.command.clone(),
cwd: cwd.to_path_buf(),
reason: reason.clone().or_else(|| request.justification.clone()),
additional_permissions: request.additional_permissions.clone().map(Into::into),
proposed_execpolicy_amendment: proposed_execpolicy_amendment
.clone()
.map(codex_app_server_protocol::ExecPolicyAmendment::from),
available_decisions: None,
}),
crate::tools::sandboxing::ExecApprovalRequirement::Skip { .. }
| crate::tools::sandboxing::ExecApprovalRequirement::Forbidden { .. } => None,
}
}
impl UnifiedExecProcessManager {
pub(crate) async fn allocate_process_id(&self) -> i32 {
loop {
@@ -230,7 +265,7 @@ impl UnifiedExecProcessManager {
cancellation_token,
} = process.output_handles();
let deadline = start + Duration::from_millis(yield_time_ms);
let collected = Self::collect_output_until_deadline(
let mut collected = Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
@@ -244,6 +279,71 @@ impl UnifiedExecProcessManager {
deadline,
)
.await;
if let Some(exec_approval) = process.take_pending_exec_approval().await {
let call_id = exec_approval.call_id.clone();
let approval_id = exec_approval.approval_id.clone();
let effective_approval_id = approval_id.clone().unwrap_or_else(|| call_id.clone());
let approval_keys = vec![UnifiedExecApprovalKey {
command: canonicalize_command_for_approval(&request.command),
cwd: cwd.clone(),
tty: request.tty,
sandbox_permissions: request.sandbox_permissions,
additional_permissions: request.additional_permissions.clone(),
}];
let decision = with_cached_approval(
&context.session.services,
"unified_exec",
approval_keys,
|| async move {
context
.session
.request_command_approval(
&context.turn,
call_id,
approval_id,
exec_approval.command,
exec_approval.cwd,
exec_approval.reason,
/*network_approval_context*/ None,
exec_approval
.proposed_execpolicy_amendment
.map(codex_app_server_protocol::ExecPolicyAmendment::into_core),
exec_approval.additional_permissions.map(Into::into),
exec_approval.available_decisions.map(|decisions| {
decisions
.into_iter()
.map(command_execution_approval_decision_to_core)
.collect()
}),
)
.await
},
)
.await;
process
.resolve_exec_approval(
effective_approval_id,
command_execution_approval_decision_from_core(decision),
)
.await?;
let remaining_deadline = deadline.max(Instant::now() + Duration::from_millis(1));
let additional_output = Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
&output_closed_notify,
&cancellation_token,
Some(
context
.session
.subscribe_out_of_band_elicitation_pause_state(),
),
remaining_deadline,
)
.await;
collected.extend(additional_output);
}
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();
@@ -584,6 +684,7 @@ impl UnifiedExecProcessManager {
process_id: i32,
request: &ExecRequest,
tty: bool,
startup_exec_approval: Option<codex_exec_server::ExecApprovalRequest>,
mut spawn_lifecycle: SpawnLifecycleHandle,
environment: &codex_exec_server::Environment,
) -> Result<UnifiedExecProcess, UnifiedExecError> {
@@ -609,6 +710,7 @@ impl UnifiedExecProcessManager {
env: request.env.clone(),
tty,
arg0: request.arg0.clone(),
startup_exec_approval,
})
.await
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
@@ -675,6 +777,11 @@ impl UnifiedExecProcessManager {
prefix_rule: request.prefix_rule.clone(),
})
.await;
let remote_startup_exec_approval = if context.turn.environment.exec_server_url().is_some() {
startup_exec_approval_request(context, request, &cwd, &exec_approval_requirement)
} else {
None
};
let req = UnifiedExecToolRequest {
command: request.command.clone(),
process_id: request.process_id,
@@ -689,6 +796,7 @@ impl UnifiedExecProcessManager {
additional_permissions_preapproved: request.additional_permissions_preapproved,
justification: request.justification.clone(),
exec_approval_requirement,
remote_startup_exec_approval,
};
let tool_ctx = ToolCtx {
session: context.session.clone(),
@@ -913,6 +1021,48 @@ enum ProcessStatus {
Unknown,
}
fn command_execution_approval_decision_to_core(
decision: CommandExecutionApprovalDecision,
) -> ReviewDecision {
match decision {
CommandExecutionApprovalDecision::Accept => ReviewDecision::Approved,
CommandExecutionApprovalDecision::AcceptForSession => ReviewDecision::ApprovedForSession,
CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment {
execpolicy_amendment,
} => ReviewDecision::ApprovedExecpolicyAmendment {
proposed_execpolicy_amendment: execpolicy_amendment.into_core(),
},
CommandExecutionApprovalDecision::ApplyNetworkPolicyAmendment {
network_policy_amendment,
} => ReviewDecision::NetworkPolicyAmendment {
network_policy_amendment: network_policy_amendment.into_core(),
},
CommandExecutionApprovalDecision::Decline => ReviewDecision::Denied,
CommandExecutionApprovalDecision::Cancel => ReviewDecision::Abort,
}
}
fn command_execution_approval_decision_from_core(
decision: ReviewDecision,
) -> CommandExecutionApprovalDecision {
match decision {
ReviewDecision::Approved => CommandExecutionApprovalDecision::Accept,
ReviewDecision::ApprovedExecpolicyAmendment {
proposed_execpolicy_amendment,
} => CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment {
execpolicy_amendment: proposed_execpolicy_amendment.into(),
},
ReviewDecision::ApprovedForSession => CommandExecutionApprovalDecision::AcceptForSession,
ReviewDecision::NetworkPolicyAmendment {
network_policy_amendment,
} => CommandExecutionApprovalDecision::ApplyNetworkPolicyAmendment {
network_policy_amendment: network_policy_amendment.into(),
},
ReviewDecision::Denied => CommandExecutionApprovalDecision::Decline,
ReviewDecision::Abort => CommandExecutionApprovalDecision::Cancel,
}
}
#[cfg(test)]
#[path = "process_manager_tests.rs"]
mod tests;

View File

@@ -1,8 +1,11 @@
#[derive(Clone, Debug, Default, Eq, PartialEq)]
use codex_exec_server::ExecApprovalRequest;
#[derive(Clone, Debug, Default, PartialEq)]
pub(crate) struct ProcessState {
pub(crate) has_exited: bool,
pub(crate) exit_code: Option<i32>,
pub(crate) failure_message: Option<String>,
pub(crate) pending_exec_approval: Option<ExecApprovalRequest>,
}
impl ProcessState {
@@ -11,6 +14,7 @@ impl ProcessState {
has_exited: true,
exit_code,
failure_message: self.failure_message.clone(),
pending_exec_approval: self.pending_exec_approval.clone(),
}
}
@@ -19,6 +23,28 @@ impl ProcessState {
has_exited: true,
exit_code: self.exit_code,
failure_message: Some(message),
pending_exec_approval: self.pending_exec_approval.clone(),
}
}
pub(crate) fn with_pending_exec_approval(
&self,
pending_exec_approval: ExecApprovalRequest,
) -> Self {
Self {
has_exited: self.has_exited,
exit_code: self.exit_code,
failure_message: self.failure_message.clone(),
pending_exec_approval: Some(pending_exec_approval),
}
}
pub(crate) fn clear_pending_exec_approval(&self) -> Self {
Self {
has_exited: self.has_exited,
exit_code: self.exit_code,
failure_message: self.failure_message.clone(),
pending_exec_approval: None,
}
}
}

View File

@@ -1,10 +1,13 @@
use super::process::UnifiedExecProcess;
use crate::unified_exec::UnifiedExecError;
use async_trait::async_trait;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_exec_server::ExecApprovalRequest;
use codex_exec_server::ExecProcess;
use codex_exec_server::ExecServerError;
use codex_exec_server::ProcessId;
use codex_exec_server::ReadResponse;
use codex_exec_server::ResolveExecApprovalResponse;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteResponse;
use codex_exec_server::WriteStatus;
@@ -51,6 +54,7 @@ impl ExecProcess for MockExecProcess {
exit_code: None,
closed: false,
failure: None,
exec_approval: None,
}))
}
@@ -61,6 +65,14 @@ impl ExecProcess for MockExecProcess {
async fn terminate(&self) -> Result<(), ExecServerError> {
Ok(())
}
async fn resolve_exec_approval(
&self,
_approval_id: String,
_decision: CommandExecutionApprovalDecision,
) -> Result<ResolveExecApprovalResponse, ExecServerError> {
Ok(ResolveExecApprovalResponse { accepted: true })
}
}
async fn remote_process(write_status: WriteStatus) -> UnifiedExecProcess {
@@ -123,6 +135,7 @@ async fn remote_process_waits_for_early_exit_event() {
exit_code: Some(17),
closed: true,
failure: None,
exec_approval: None,
}])),
wake_tx: wake_tx.clone(),
}),
@@ -140,3 +153,48 @@ async fn remote_process_waits_for_early_exit_event() {
assert!(process.has_exited());
assert_eq!(process.exit_code(), Some(17));
}
#[tokio::test]
async fn remote_process_surfaces_pending_exec_approval() {
let approval = ExecApprovalRequest {
call_id: "call-1".to_string(),
approval_id: Some("approval-1".to_string()),
turn_id: "turn-1".to_string(),
command: vec!["git".to_string(), "status".to_string()],
cwd: "/tmp".into(),
reason: Some("approval required".to_string()),
additional_permissions: None,
proposed_execpolicy_amendment: None,
available_decisions: Some(vec![CommandExecutionApprovalDecision::Accept]),
};
let (wake_tx, _wake_rx) = watch::channel(0);
let started = StartedExecProcess {
process: Arc::new(MockExecProcess {
process_id: "test-process".to_string().into(),
write_response: WriteResponse {
status: WriteStatus::Accepted,
},
read_responses: Mutex::new(VecDeque::from([ReadResponse {
chunks: Vec::new(),
next_seq: 2,
exited: false,
exit_code: None,
closed: false,
failure: None,
exec_approval: Some(approval.clone()),
}])),
wake_tx: wake_tx.clone(),
}),
};
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
let _ = wake_tx.send(1);
});
let process = UnifiedExecProcess::from_remote_started(started, SandboxType::None)
.await
.expect("remote process should start");
assert_eq!(process.take_pending_exec_approval().await, Some(approval));
}

View File

@@ -11,9 +11,11 @@ use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandSource;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::user_input::UserInput;
use core_test_support::assert_regex_match;
use core_test_support::get_remote_test_env;
use core_test_support::process::process_is_alive;
use core_test_support::process::wait_for_pid_file;
use core_test_support::process::wait_for_process_exit;
@@ -390,6 +392,120 @@ async fn unified_exec_emits_exec_command_begin_event() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_unified_exec_smoke_requests_approval_before_running_command() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
if get_remote_test_env().is_none() {
return Ok(());
}
let builder = test_codex().with_model("gpt-5").with_config(|config| {
config.use_experimental_unified_exec_tool = true;
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let server = start_mock_server().await;
let mut builder = builder;
let test = builder.build_remote_aware(&server).await?;
let call_id = "uexec-remote-approval-smoke";
let smoke_target = "/tmp/nonexistent";
let smoke_command = format!("rm -rf {smoke_target}");
let args = json!({
"cmd": smoke_command,
"yield_time_ms": 250,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-1", "approved"),
ev_completed("resp-2"),
]),
];
mount_sse_sequence(&server, responses).await;
let codex = test.codex.clone();
let cwd = test.cwd_path().to_path_buf();
let session_model = test.session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "run remote unified exec approval smoke".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd,
approval_policy: AskForApproval::OnRequest,
approvals_reviewer: None,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
model: session_model,
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
let approval_or_completion = wait_for_event(&codex, |msg| {
matches!(
msg,
EventMsg::ExecApprovalRequest(approval) if approval.call_id == call_id
) || matches!(msg, EventMsg::TurnComplete(_))
})
.await;
let EventMsg::ExecApprovalRequest(approval) = approval_or_completion else {
let bodies: Vec<Value> = server
.received_requests()
.await
.expect("mock server should not fail")
.into_iter()
.filter_map(|req| serde_json::from_slice::<Value>(&req.body).ok())
.collect();
panic!("remote smoke completed without approval event; bodies={bodies:?}");
};
codex
.submit(Op::ExecApproval {
id: approval.effective_approval_id(),
turn_id: None,
decision: ReviewDecision::Approved,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let bodies: Vec<Value> = server
.received_requests()
.await
.expect("mock server should not fail")
.into_iter()
.filter_map(|req| serde_json::from_slice::<Value>(&req.body).ok())
.collect();
let outputs = collect_tool_outputs(&bodies)?;
let output = outputs
.get(call_id)
.expect("missing function call output for remote smoke");
assert!(
output.process_id.is_some() || output.exit_code.is_some(),
"expected approved remote unified exec metadata, got: {output:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_resolves_relative_workdir() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -21,6 +21,7 @@ use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_RESOLVE_APPROVAL_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::ExecClosedNotification;
@@ -55,6 +56,8 @@ use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::ResolveExecApprovalParams;
use crate::protocol::ResolveExecApprovalResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -257,6 +260,26 @@ impl ExecServerClient {
.map_err(Into::into)
}
pub async fn resolve_exec_approval(
&self,
process_id: &ProcessId,
approval_id: String,
decision: codex_app_server_protocol::CommandExecutionApprovalDecision,
) -> Result<ResolveExecApprovalResponse, ExecServerError> {
self.inner
.client
.call(
EXEC_RESOLVE_APPROVAL_METHOD,
&ResolveExecApprovalParams {
process_id: process_id.clone(),
approval_id,
decision,
},
)
.await
.map_err(Into::into)
}
pub async fn fs_read_file(
&self,
params: FsReadFileParams,
@@ -464,6 +487,7 @@ impl SessionState {
exit_code: None,
closed: true,
failure: Some(message),
exec_approval: None,
}
}
}
@@ -516,6 +540,16 @@ impl Session {
Ok(())
}
pub(crate) async fn resolve_exec_approval(
&self,
approval_id: String,
decision: codex_app_server_protocol::CommandExecutionApprovalDecision,
) -> Result<ResolveExecApprovalResponse, ExecServerError> {
self.client
.resolve_exec_approval(&self.process_id, approval_id, decision)
.await
}
pub(crate) async fn unregister(&self) {
self.client.unregister_session(&self.process_id).await;
}

View File

@@ -291,6 +291,7 @@ mod tests {
env: Default::default(),
tty: false,
arg0: None,
startup_exec_approval: None,
})
.await
.expect("start process");

View File

@@ -32,6 +32,7 @@ pub use process::ExecBackend;
pub use process::ExecProcess;
pub use process::StartedExecProcess;
pub use process_id::ProcessId;
pub use protocol::ExecApprovalRequest;
pub use protocol::ExecClosedNotification;
pub use protocol::ExecExitedNotification;
pub use protocol::ExecOutputDeltaNotification;
@@ -57,6 +58,8 @@ pub use protocol::InitializeParams;
pub use protocol::InitializeResponse;
pub use protocol::ReadParams;
pub use protocol::ReadResponse;
pub use protocol::ResolveExecApprovalParams;
pub use protocol::ResolveExecApprovalResponse;
pub use protocol::TerminateParams;
pub use protocol::TerminateResponse;
pub use protocol::WriteParams;

View File

@@ -8,6 +8,7 @@ use std::time::Duration;
use async_trait::async_trait;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::SpawnedPty;
use codex_utils_pty::TerminalSize;
use tokio::sync::Mutex;
use tokio::sync::Notify;
@@ -20,6 +21,7 @@ use crate::ExecServerError;
use crate::ProcessId;
use crate::StartedExecProcess;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::ExecApprovalRequest;
use crate::protocol::ExecClosedNotification;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
@@ -30,6 +32,8 @@ use crate::protocol::InitializeResponse;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::ResolveExecApprovalParams;
use crate::protocol::ResolveExecApprovalResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -56,7 +60,7 @@ struct RetainedOutputChunk {
}
struct RunningProcess {
session: ExecCommandSession,
session: Option<ExecCommandSession>,
tty: bool,
output: VecDeque<RetainedOutputChunk>,
retained_bytes: usize,
@@ -66,6 +70,9 @@ struct RunningProcess {
output_notify: Arc<Notify>,
open_streams: usize,
closed: bool,
failure: Option<String>,
pending_exec_approval: Option<ExecApprovalRequest>,
pending_start: Option<ExecParams>,
}
enum ProcessEntry {
@@ -124,7 +131,9 @@ impl LocalProcess {
.collect::<Vec<_>>()
};
for process in remaining {
process.session.terminate();
if let Some(session) = process.session.as_ref() {
session.terminate();
}
}
}
@@ -162,16 +171,41 @@ impl LocalProcess {
Ok(())
}
async fn spawn_process_session(params: &ExecParams) -> Result<SpawnedPty, JSONRPCErrorError> {
let (program, args) = params
.argv
.split_first()
.ok_or_else(|| invalid_params("argv must not be empty".to_string()))?;
if params.tty {
codex_utils_pty::spawn_pty_process(
program,
args,
params.cwd.as_path(),
&params.env,
&params.arg0,
TerminalSize::default(),
)
.await
.map_err(|err| internal_error(err.to_string()))
} else {
codex_utils_pty::spawn_pipe_process_no_stdin(
program,
args,
params.cwd.as_path(),
&params.env,
&params.arg0,
)
.await
.map_err(|err| internal_error(err.to_string()))
}
}
async fn start_process(
&self,
params: ExecParams,
) -> Result<(ExecResponse, watch::Sender<u64>), JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let process_id = params.process_id.clone();
let (program, args) = params
.argv
.split_first()
.ok_or_else(|| invalid_params("argv must not be empty".to_string()))?;
{
let mut process_map = self.inner.processes.lock().await;
@@ -183,45 +217,14 @@ impl LocalProcess {
process_map.insert(process_id.clone(), ProcessEntry::Starting);
}
let spawned_result = if params.tty {
codex_utils_pty::spawn_pty_process(
program,
args,
params.cwd.as_path(),
&params.env,
&params.arg0,
TerminalSize::default(),
)
.await
} else {
codex_utils_pty::spawn_pipe_process_no_stdin(
program,
args,
params.cwd.as_path(),
&params.env,
&params.arg0,
)
.await
};
let spawned = match spawned_result {
Ok(spawned) => spawned,
Err(err) => {
let mut process_map = self.inner.processes.lock().await;
if matches!(process_map.get(&process_id), Some(ProcessEntry::Starting)) {
process_map.remove(&process_id);
}
return Err(internal_error(err.to_string()));
}
};
let output_notify = Arc::new(Notify::new());
let (wake_tx, _wake_rx) = watch::channel(0);
{
if let Some(startup_exec_approval) = params.startup_exec_approval.clone() {
let mut process_map = self.inner.processes.lock().await;
process_map.insert(
process_id.clone(),
ProcessEntry::Running(Box::new(RunningProcess {
session: spawned.session,
session: None,
tty: params.tty,
output: VecDeque::new(),
retained_bytes: 0,
@@ -229,40 +232,76 @@ impl LocalProcess {
exit_code: None,
wake_tx: wake_tx.clone(),
output_notify: Arc::clone(&output_notify),
open_streams: 2,
open_streams: 0,
closed: false,
failure: None,
pending_exec_approval: Some(startup_exec_approval),
pending_start: Some(params),
})),
);
}
} else {
let spawned = match Self::spawn_process_session(&params).await {
Ok(spawned) => spawned,
Err(err) => {
let mut process_map = self.inner.processes.lock().await;
if matches!(process_map.get(&process_id), Some(ProcessEntry::Starting)) {
process_map.remove(&process_id);
}
return Err(err);
}
};
tokio::spawn(stream_output(
process_id.clone(),
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stdout
},
spawned.stdout_rx,
Arc::clone(&self.inner),
Arc::clone(&output_notify),
));
tokio::spawn(stream_output(
process_id.clone(),
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stderr
},
spawned.stderr_rx,
Arc::clone(&self.inner),
Arc::clone(&output_notify),
));
tokio::spawn(watch_exit(
process_id.clone(),
spawned.exit_rx,
Arc::clone(&self.inner),
output_notify,
));
{
let mut process_map = self.inner.processes.lock().await;
process_map.insert(
process_id.clone(),
ProcessEntry::Running(Box::new(RunningProcess {
session: Some(spawned.session),
tty: params.tty,
output: VecDeque::new(),
retained_bytes: 0,
next_seq: 1,
exit_code: None,
wake_tx: wake_tx.clone(),
output_notify: Arc::clone(&output_notify),
open_streams: 2,
closed: false,
failure: None,
pending_exec_approval: None,
pending_start: None,
})),
);
}
tokio::spawn(stream_output(
process_id.clone(),
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stdout
},
spawned.stdout_rx,
Arc::clone(&self.inner),
Arc::clone(&output_notify),
));
tokio::spawn(stream_output(
process_id.clone(),
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stderr
},
spawned.stderr_rx,
Arc::clone(&self.inner),
Arc::clone(&output_notify),
));
tokio::spawn(watch_exit(
process_id.clone(),
spawned.exit_rx,
Arc::clone(&self.inner),
output_notify,
));
}
Ok((ExecResponse { process_id }, wake_tx))
}
@@ -324,14 +363,18 @@ impl LocalProcess {
exited: process.exit_code.is_some(),
exit_code: process.exit_code,
closed: process.closed,
failure: None,
failure: process.failure.clone(),
exec_approval: process.pending_exec_approval.clone(),
},
Arc::clone(&process.output_notify),
)
};
if !response.chunks.is_empty()
|| response.exec_approval.is_some()
|| response.failure.is_some()
|| response.exited
|| response.closed
|| tokio::time::Instant::now() >= deadline
{
let _total_bytes: usize = response
@@ -374,7 +417,12 @@ impl LocalProcess {
status: WriteStatus::StdinClosed,
});
}
process.session.writer_sender()
let Some(session) = process.session.as_ref() else {
return Ok(WriteResponse {
status: WriteStatus::Starting,
});
};
session.writer_sender()
};
writer_tx
@@ -387,6 +435,151 @@ impl LocalProcess {
})
}
pub(crate) async fn resolve_exec_approval(
&self,
params: ResolveExecApprovalParams,
) -> Result<ResolveExecApprovalResponse, JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let allows_spawn = matches!(
params.decision,
codex_app_server_protocol::CommandExecutionApprovalDecision::Accept
| codex_app_server_protocol::CommandExecutionApprovalDecision::AcceptForSession
| codex_app_server_protocol::CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment { .. }
| codex_app_server_protocol::CommandExecutionApprovalDecision::ApplyNetworkPolicyAmendment { .. }
);
let pending_start = {
let mut process_map = self.inner.processes.lock().await;
let Some(process) = process_map.get_mut(&params.process_id) else {
return Err(invalid_request(format!(
"unknown process id {}",
params.process_id
)));
};
let ProcessEntry::Running(process) = process else {
return Err(invalid_request(format!(
"process id {} is starting",
params.process_id
)));
};
let Some(pending) = process.pending_exec_approval.as_ref() else {
return Err(invalid_request(format!(
"process id {} has no pending exec approval",
params.process_id
)));
};
let effective_approval_id = pending
.approval_id
.as_deref()
.unwrap_or(pending.call_id.as_str());
if effective_approval_id != params.approval_id {
return Err(invalid_request(format!(
"process id {} has no pending approval {}",
params.process_id, params.approval_id
)));
}
process.pending_exec_approval = None;
if allows_spawn {
process.pending_start.take()
} else {
process.failure = Some("rejected by user".to_string());
process.exit_code = Some(1);
process.closed = true;
process.pending_start = None;
process.output_notify.notify_waiters();
let _ = process.wake_tx.send(process.next_seq);
None
}
};
if let Some(pending_start) = pending_start {
let output_notify = {
let process_map = self.inner.processes.lock().await;
let Some(ProcessEntry::Running(process)) = process_map.get(&params.process_id)
else {
return Err(invalid_request(format!(
"process id {} disappeared while resolving approval",
params.process_id
)));
};
Arc::clone(&process.output_notify)
};
match Self::spawn_process_session(&pending_start).await {
Ok(spawned) => {
{
let mut process_map = self.inner.processes.lock().await;
let Some(ProcessEntry::Running(process)) =
process_map.get_mut(&params.process_id)
else {
return Err(invalid_request(format!(
"process id {} disappeared while spawning",
params.process_id
)));
};
process.session = Some(spawned.session);
process.open_streams = 2;
}
let process_id = params.process_id.clone();
let inner = Arc::clone(&self.inner);
tokio::spawn(stream_output(
process_id.clone(),
if pending_start.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stdout
},
spawned.stdout_rx,
Arc::clone(&inner),
Arc::clone(&output_notify),
));
tokio::spawn(stream_output(
process_id.clone(),
if pending_start.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stderr
},
spawned.stderr_rx,
Arc::clone(&inner),
Arc::clone(&output_notify),
));
tokio::spawn(watch_exit(
process_id,
spawned.exit_rx,
inner,
Arc::clone(&output_notify),
));
let process_map = self.inner.processes.lock().await;
let Some(ProcessEntry::Running(process)) = process_map.get(&params.process_id)
else {
return Err(invalid_request(format!(
"process id {} disappeared after spawning",
params.process_id
)));
};
process.output_notify.notify_waiters();
let _ = process.wake_tx.send(process.next_seq);
}
Err(err) => {
let mut process_map = self.inner.processes.lock().await;
let Some(ProcessEntry::Running(process)) =
process_map.get_mut(&params.process_id)
else {
return Err(invalid_request(format!(
"process id {} disappeared after failed spawn",
params.process_id
)));
};
process.failure = Some(err.message.clone());
process.closed = true;
process.output_notify.notify_waiters();
let _ = process.wake_tx.send(process.next_seq);
}
}
}
Ok(ResolveExecApprovalResponse { accepted: true })
}
pub(crate) async fn terminate_process(
&self,
params: TerminateParams,
@@ -394,13 +587,23 @@ impl LocalProcess {
self.require_initialized_for("exec")?;
let _process_id = params.process_id.clone();
let running = {
let process_map = self.inner.processes.lock().await;
match process_map.get(&params.process_id) {
let mut process_map = self.inner.processes.lock().await;
match process_map.get_mut(&params.process_id) {
Some(ProcessEntry::Running(process)) => {
if process.exit_code.is_some() {
if process.exit_code.is_some() || process.closed {
return Ok(TerminateResponse { running: false });
}
process.session.terminate();
if let Some(session) = process.session.as_ref() {
session.terminate();
} else {
process.pending_exec_approval = None;
process.pending_start = None;
process.failure = Some("terminated before process start".to_string());
process.exit_code = Some(1);
process.closed = true;
process.output_notify.notify_waiters();
let _ = process.wake_tx.send(process.next_seq);
}
true
}
Some(ProcessEntry::Starting) | None => false,
@@ -456,6 +659,21 @@ impl ExecProcess for LocalExecProcess {
async fn terminate(&self) -> Result<(), ExecServerError> {
self.backend.terminate(&self.process_id).await
}
async fn resolve_exec_approval(
&self,
approval_id: String,
decision: codex_app_server_protocol::CommandExecutionApprovalDecision,
) -> Result<ResolveExecApprovalResponse, ExecServerError> {
self.backend
.resolve_exec_approval(ResolveExecApprovalParams {
process_id: self.process_id.clone(),
approval_id,
decision,
})
.await
.map_err(map_handler_error)
}
}
impl LocalProcess {

View File

@@ -7,6 +7,7 @@ use crate::ExecServerError;
use crate::ProcessId;
use crate::protocol::ExecParams;
use crate::protocol::ReadResponse;
use crate::protocol::ResolveExecApprovalResponse;
use crate::protocol::WriteResponse;
pub struct StartedExecProcess {
@@ -29,6 +30,12 @@ pub trait ExecProcess: Send + Sync {
async fn write(&self, chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError>;
async fn terminate(&self) -> Result<(), ExecServerError>;
async fn resolve_exec_approval(
&self,
approval_id: String,
decision: codex_app_server_protocol::CommandExecutionApprovalDecision,
) -> Result<ResolveExecApprovalResponse, ExecServerError>;
}
#[async_trait]

View File

@@ -12,6 +12,7 @@ use crate::ProcessId;
pub const INITIALIZE_METHOD: &str = "initialize";
pub const INITIALIZED_METHOD: &str = "initialized";
pub const EXEC_METHOD: &str = "process/start";
pub const EXEC_RESOLVE_APPROVAL_METHOD: &str = "process/resolveApproval";
pub const EXEC_READ_METHOD: &str = "process/read";
pub const EXEC_WRITE_METHOD: &str = "process/write";
pub const EXEC_TERMINATE_METHOD: &str = "process/terminate";
@@ -52,7 +53,7 @@ pub struct InitializeParams {
#[serde(rename_all = "camelCase")]
pub struct InitializeResponse {}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecParams {
/// Client-chosen logical process handle scoped to this connection/session.
@@ -63,6 +64,7 @@ pub struct ExecParams {
pub env: HashMap<String, String>,
pub tty: bool,
pub arg0: Option<String>,
pub startup_exec_approval: Option<ExecApprovalRequest>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -71,6 +73,21 @@ pub struct ExecResponse {
pub process_id: ProcessId,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecApprovalRequest {
pub call_id: String,
pub approval_id: Option<String>,
pub turn_id: String,
pub command: Vec<String>,
pub cwd: PathBuf,
pub reason: Option<String>,
pub additional_permissions: Option<codex_app_server_protocol::AdditionalPermissionProfile>,
pub proposed_execpolicy_amendment: Option<codex_app_server_protocol::ExecPolicyAmendment>,
pub available_decisions:
Option<Vec<codex_app_server_protocol::CommandExecutionApprovalDecision>>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadParams {
@@ -88,7 +105,7 @@ pub struct ProcessOutputChunk {
pub chunk: ByteChunk,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadResponse {
pub chunks: Vec<ProcessOutputChunk>,
@@ -97,6 +114,7 @@ pub struct ReadResponse {
pub exit_code: Option<i32>,
pub closed: bool,
pub failure: Option<String>,
pub exec_approval: Option<ExecApprovalRequest>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -133,6 +151,14 @@ pub struct TerminateResponse {
pub running: bool,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResolveExecApprovalParams {
pub process_id: ProcessId,
pub approval_id: String,
pub decision: codex_app_server_protocol::CommandExecutionApprovalDecision,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FsReadFileParams {
@@ -233,6 +259,12 @@ pub struct FsCopyParams {
#[serde(rename_all = "camelCase")]
pub struct FsCopyResponse {}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResolveExecApprovalResponse {
pub accepted: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ExecOutputStream {

View File

@@ -12,6 +12,7 @@ use crate::client::ExecServerClient;
use crate::client::Session;
use crate::protocol::ExecParams;
use crate::protocol::ReadResponse;
use crate::protocol::ResolveExecApprovalResponse;
use crate::protocol::WriteResponse;
#[derive(Clone)]
@@ -74,6 +75,17 @@ impl ExecProcess for RemoteExecProcess {
trace!("exec process terminate");
self.session.terminate().await
}
async fn resolve_exec_approval(
&self,
approval_id: String,
decision: codex_app_server_protocol::CommandExecutionApprovalDecision,
) -> Result<ResolveExecApprovalResponse, ExecServerError> {
trace!("exec process resolve approval");
self.session
.resolve_exec_approval(approval_id, decision)
.await
}
}
impl Drop for RemoteExecProcess {

View File

@@ -19,6 +19,8 @@ use crate::protocol::FsWriteFileResponse;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::ResolveExecApprovalParams;
use crate::protocol::ResolveExecApprovalResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -64,6 +66,13 @@ impl ExecServerHandler {
self.process.exec_read(params).await
}
pub(crate) async fn resolve_exec_approval(
&self,
params: ResolveExecApprovalParams,
) -> Result<ResolveExecApprovalResponse, JSONRPCErrorError> {
self.process.resolve_exec_approval(params).await
}
pub(crate) async fn exec_write(
&self,
params: WriteParams,

View File

@@ -9,9 +9,12 @@ use super::ExecServerHandler;
use crate::ProcessId;
use crate::protocol::ExecParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ResolveExecApprovalParams;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::rpc::RpcNotificationSender;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
fn exec_params(process_id: &str) -> ExecParams {
let mut env = HashMap::new();
@@ -29,6 +32,7 @@ fn exec_params(process_id: &str) -> ExecParams {
env,
tty: false,
arg0: None,
startup_exec_approval: None,
}
}
@@ -101,3 +105,222 @@ async fn terminate_reports_false_after_process_exit() {
handler.shutdown().await;
}
#[tokio::test]
async fn startup_exec_approval_spawns_only_after_resolution() {
let handler = initialized_handler().await;
let mut params = exec_params("proc-approval");
params.argv = vec![
"bash".to_string(),
"-lc".to_string(),
"printf ready".to_string(),
];
params.startup_exec_approval = Some(crate::protocol::ExecApprovalRequest {
call_id: "call-1".to_string(),
approval_id: None,
turn_id: "turn-1".to_string(),
command: vec![
"bash".to_string(),
"-lc".to_string(),
"printf ready".to_string(),
],
cwd: std::env::current_dir().expect("cwd"),
reason: Some("approval required".to_string()),
additional_permissions: None,
proposed_execpolicy_amendment: None,
available_decisions: Some(vec![CommandExecutionApprovalDecision::Accept]),
});
handler.exec(params).await.expect("start process");
let pending = handler
.exec_read(ReadParams {
process_id: ProcessId::from("proc-approval"),
after_seq: None,
max_bytes: None,
wait_ms: Some(0),
})
.await
.expect("read pending approval");
assert_eq!(pending.chunks, Vec::new());
assert!(pending.exec_approval.is_some());
handler
.resolve_exec_approval(ResolveExecApprovalParams {
process_id: ProcessId::from("proc-approval"),
approval_id: "call-1".to_string(),
decision: CommandExecutionApprovalDecision::Accept,
})
.await
.expect("resolve approval");
let deadline = tokio::time::Instant::now() + Duration::from_secs(1);
loop {
let response = handler
.exec_read(ReadParams {
process_id: ProcessId::from("proc-approval"),
after_seq: None,
max_bytes: None,
wait_ms: Some(0),
})
.await
.expect("read process output");
let output = response
.chunks
.iter()
.flat_map(|chunk| chunk.chunk.0.iter().copied())
.collect::<Vec<_>>();
if String::from_utf8_lossy(&output).contains("ready") {
break;
}
assert!(
tokio::time::Instant::now() < deadline,
"approved process did not produce output within timeout"
);
tokio::time::sleep(Duration::from_millis(25)).await;
}
handler.shutdown().await;
}
#[tokio::test]
async fn startup_exec_approval_decline_returns_failure_without_spawning() {
let handler = initialized_handler().await;
let mut params = exec_params("proc-decline");
params.argv = vec![
"bash".to_string(),
"-lc".to_string(),
"printf should-not-run".to_string(),
];
params.startup_exec_approval = Some(crate::protocol::ExecApprovalRequest {
call_id: "call-2".to_string(),
approval_id: None,
turn_id: "turn-2".to_string(),
command: vec![
"bash".to_string(),
"-lc".to_string(),
"printf should-not-run".to_string(),
],
cwd: std::env::current_dir().expect("cwd"),
reason: Some("approval required".to_string()),
additional_permissions: None,
proposed_execpolicy_amendment: None,
available_decisions: Some(vec![CommandExecutionApprovalDecision::Decline]),
});
handler.exec(params).await.expect("start process");
handler
.resolve_exec_approval(ResolveExecApprovalParams {
process_id: ProcessId::from("proc-decline"),
approval_id: "call-2".to_string(),
decision: CommandExecutionApprovalDecision::Decline,
})
.await
.expect("resolve approval");
let response = handler
.exec_read(ReadParams {
process_id: ProcessId::from("proc-decline"),
after_seq: None,
max_bytes: None,
wait_ms: Some(0),
})
.await
.expect("read declined process");
assert_eq!(response.chunks, Vec::new());
assert_eq!(response.failure.as_deref(), Some("rejected by user"));
assert!(response.closed);
handler.shutdown().await;
}
#[tokio::test]
async fn startup_exec_approval_terminate_cancels_pending_start() {
let handler = initialized_handler().await;
let mut params = exec_params("proc-terminated");
params.argv = vec![
"bash".to_string(),
"-lc".to_string(),
"printf should-not-run".to_string(),
];
params.startup_exec_approval = Some(crate::protocol::ExecApprovalRequest {
call_id: "call-terminate".to_string(),
approval_id: None,
turn_id: "turn-terminate".to_string(),
command: vec![
"bash".to_string(),
"-lc".to_string(),
"printf should-not-run".to_string(),
],
cwd: std::env::current_dir().expect("cwd"),
reason: Some("approval required".to_string()),
additional_permissions: None,
proposed_execpolicy_amendment: None,
available_decisions: Some(vec![CommandExecutionApprovalDecision::Accept]),
});
handler.exec(params).await.expect("start process");
let pending = handler
.exec_read(ReadParams {
process_id: ProcessId::from("proc-terminated"),
after_seq: None,
max_bytes: None,
wait_ms: Some(0),
})
.await
.expect("read pending approval");
assert!(pending.exec_approval.is_some());
assert_eq!(
handler
.terminate(TerminateParams {
process_id: ProcessId::from("proc-terminated"),
})
.await
.expect("terminate response"),
TerminateResponse { running: true }
);
let cancelled = handler
.exec_read(ReadParams {
process_id: ProcessId::from("proc-terminated"),
after_seq: None,
max_bytes: None,
wait_ms: Some(0),
})
.await
.expect("read cancelled process");
assert_eq!(cancelled.chunks, Vec::new());
assert_eq!(
cancelled.failure.as_deref(),
Some("terminated before process start")
);
assert!(cancelled.exec_approval.is_none());
assert!(cancelled.closed);
assert_eq!(cancelled.exit_code, Some(1));
let error = handler
.resolve_exec_approval(ResolveExecApprovalParams {
process_id: ProcessId::from("proc-terminated"),
approval_id: "call-terminate".to_string(),
decision: CommandExecutionApprovalDecision::Accept,
})
.await
.expect_err("terminated process should not accept approval");
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"process id proc-terminated has no pending exec approval"
);
assert_eq!(
handler
.terminate(TerminateParams {
process_id: ProcessId::from("proc-terminated"),
})
.await
.expect("second terminate response"),
TerminateResponse { running: false }
);
handler.shutdown().await;
}

View File

@@ -6,6 +6,8 @@ use crate::protocol::ExecResponse;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::ResolveExecApprovalParams;
use crate::protocol::ResolveExecApprovalResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -54,6 +56,13 @@ impl ProcessHandler {
self.process.exec_read(params).await
}
pub(crate) async fn resolve_exec_approval(
&self,
params: ResolveExecApprovalParams,
) -> Result<ResolveExecApprovalResponse, JSONRPCErrorError> {
self.process.resolve_exec_approval(params).await
}
pub(crate) async fn exec_write(
&self,
params: WriteParams,

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_RESOLVE_APPROVAL_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::ExecParams;
@@ -23,6 +24,7 @@ use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::ReadParams;
use crate::protocol::ResolveExecApprovalParams;
use crate::protocol::TerminateParams;
use crate::protocol::WriteParams;
use crate::rpc::RpcRouter;
@@ -52,6 +54,12 @@ pub(crate) fn build_router() -> RpcRouter<ExecServerHandler> {
handler.exec_read(params).await
},
);
router.request(
EXEC_RESOLVE_APPROVAL_METHOD,
|handler: Arc<ExecServerHandler>, params: ResolveExecApprovalParams| async move {
handler.resolve_exec_approval(params).await
},
);
router.request(
EXEC_WRITE_METHOD,
|handler: Arc<ExecServerHandler>, params: WriteParams| async move {

View File

@@ -54,6 +54,7 @@ async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
env: Default::default(),
tty: false,
arg0: None,
startup_exec_approval: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), "proc-1");
@@ -130,6 +131,7 @@ async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> {
env: Default::default(),
tty: false,
arg0: None,
startup_exec_approval: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
@@ -159,6 +161,7 @@ async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
env: Default::default(),
tty: true,
arg0: None,
startup_exec_approval: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
@@ -195,6 +198,7 @@ async fn assert_exec_process_preserves_queued_events_before_subscribe(
env: Default::default(),
tty: false,
arg0: None,
startup_exec_approval: None,
})
.await?;
@@ -225,6 +229,7 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
env: Default::default(),
tty: false,
arg0: None,
startup_exec_approval: None,
})
.await?;