Compare commits

...

1 Commits

Author SHA1 Message Date
Owen Lin
e75fa2e460 fix(guardian): fix ordering of guardian events 2026-03-31 14:37:28 -07:00

View File

@@ -135,6 +135,7 @@ use codex_protocol::request_permissions::RequestPermissionsResponse as CoreReque
use codex_protocol::request_user_input::RequestUserInputAnswer as CoreRequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputResponse as CoreRequestUserInputResponse;
use codex_sandboxing::policy_transforms::intersect_permission_profiles;
use codex_shell_command::parse_command::parse_command;
use codex_shell_command::parse_command::shlex_join;
use std::collections::HashMap;
use std::convert::TryFrom;
@@ -159,6 +160,116 @@ struct CommandExecutionCompletionItem {
command_actions: Vec<V2ParsedCommand>,
}
fn synthetic_command_execution_item(
item_id: String,
command: String,
cwd: PathBuf,
command_actions: Vec<V2ParsedCommand>,
source: CommandExecutionSource,
status: CommandExecutionStatus,
) -> ThreadItem {
ThreadItem::CommandExecution {
id: item_id,
command,
cwd,
process_id: None,
source,
status,
command_actions,
aggregated_output: None,
exit_code: None,
duration_ms: None,
}
}
#[allow(clippy::too_many_arguments)]
async fn start_command_execution_item_once(
conversation_id: &ThreadId,
turn_id: String,
item_id: String,
command: String,
cwd: PathBuf,
command_actions: Vec<V2ParsedCommand>,
source: CommandExecutionSource,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) -> bool {
let first_start = {
let mut state = thread_state.lock().await;
state
.turn_summary
.command_execution_started
.insert(item_id.clone())
};
if first_start {
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id,
item: synthetic_command_execution_item(
item_id,
command,
cwd,
command_actions,
source,
CommandExecutionStatus::InProgress,
),
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
first_start
}
fn guardian_pending_command_execution_item(
assessment: &GuardianAssessmentEvent,
) -> Option<CommandExecutionCompletionItem> {
let action = assessment.action.as_ref()?;
let tool = action.get("tool")?.as_str()?;
if !matches!(tool, "shell" | "exec_command") {
return None;
}
let command = action.get("command")?;
let (command, command_actions) = match command {
JsonValue::String(command) => (
command.clone(),
vec![V2ParsedCommand::Unknown {
command: command.clone(),
}],
),
JsonValue::Array(args) => {
let argv = args
.iter()
.map(JsonValue::as_str)
.collect::<Option<Vec<_>>>()?
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let command = shlex_join(&argv);
let parsed_cmd = parse_command(&argv);
let command_actions = if parsed_cmd.is_empty() {
vec![V2ParsedCommand::Unknown {
command: command.clone(),
}]
} else {
parsed_cmd
.into_iter()
.map(V2ParsedCommand::from)
.collect::<Vec<_>>()
};
(command, command_actions)
}
_ => return None,
};
let cwd = action.get("cwd")?.as_str().map(PathBuf::from)?;
Some(CommandExecutionCompletionItem {
command,
cwd,
command_actions,
})
}
async fn resolve_server_request_on_thread_listener(
thread_state: &Arc<Mutex<ThreadState>>,
request_id: RequestId,
@@ -343,12 +454,57 @@ pub(crate) async fn apply_bespoke_event_handling(
EventMsg::Warning(_warning_event) => {}
EventMsg::GuardianAssessment(assessment) => {
if let ApiVersion::V2 = api_version {
let pending_command_execution =
guardian_pending_command_execution_item(&assessment);
let assessment_turn_id = if assessment.turn_id.is_empty() {
event_turn_id.clone()
} else {
assessment.turn_id.clone()
};
if assessment.status
== codex_protocol::protocol::GuardianAssessmentStatus::InProgress
&& let Some(completion_item) = pending_command_execution.as_ref()
{
start_command_execution_item_once(
&conversation_id,
assessment_turn_id.clone(),
assessment.id.clone(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
}
let notification = guardian_auto_approval_review_notification(
&conversation_id,
&event_turn_id,
&assessment,
);
outgoing.send_server_notification(notification).await;
if matches!(
assessment.status,
codex_protocol::protocol::GuardianAssessmentStatus::Denied
| codex_protocol::protocol::GuardianAssessmentStatus::Aborted
) && let Some(completion_item) = pending_command_execution
{
complete_command_execution_item(
&conversation_id,
assessment_turn_id,
assessment.id.clone(),
completion_item.command,
completion_item.cwd,
/*process_id*/ None,
CommandExecutionSource::Agent,
completion_item.command_actions,
CommandExecutionStatus::Declined,
&outgoing,
&thread_state,
)
.await;
}
}
}
EventMsg::ModelReroute(event) => {
@@ -667,6 +823,22 @@ pub(crate) async fn apply_bespoke_event_handling(
Some(completion_item),
),
};
if approval_id.is_none()
&& let Some(completion_item) = completion_item.as_ref()
{
start_command_execution_item_once(
&conversation_id,
event_turn_id.clone(),
call_id.clone(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
}
let proposed_execpolicy_amendment_v2 =
proposed_execpolicy_amendment.map(V2ExecPolicyAmendment::from);
let proposed_network_policy_amendments_v2 = proposed_network_policy_amendments
@@ -1607,35 +1779,35 @@ pub(crate) async fn apply_bespoke_event_handling(
let command = shlex_join(&exec_command_begin_event.command);
let cwd = exec_command_begin_event.cwd;
let process_id = exec_command_begin_event.process_id;
{
let first_start = {
let mut state = thread_state.lock().await;
state
.turn_summary
.command_execution_started
.insert(item_id.clone());
.insert(item_id.clone())
};
if first_start {
let item = ThreadItem::CommandExecution {
id: item_id,
command,
cwd,
process_id,
source: exec_command_begin_event.source.into(),
status: CommandExecutionStatus::InProgress,
command_actions,
aggregated_output: None,
exit_code: None,
duration_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
let item = ThreadItem::CommandExecution {
id: item_id,
command,
cwd,
process_id,
source: exec_command_begin_event.source.into(),
status: CommandExecutionStatus::InProgress,
command_actions,
aggregated_output: None,
exit_code: None,
duration_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
EventMsg::ExecCommandOutputDelta(exec_command_output_delta_event) => {
let item_id = exec_command_output_delta_event.call_id.clone();
@@ -1992,7 +2164,7 @@ async fn complete_file_change_item(
#[allow(clippy::too_many_arguments)]
async fn complete_command_execution_item(
conversation_id: ThreadId,
conversation_id: &ThreadId,
turn_id: String,
item_id: String,
command: String,
@@ -2002,7 +2174,18 @@ async fn complete_command_execution_item(
command_actions: Vec<V2ParsedCommand>,
status: CommandExecutionStatus,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
let mut state = thread_state.lock().await;
let should_emit = state
.turn_summary
.command_execution_started
.remove(&item_id);
drop(state);
if !should_emit {
return;
}
let item = ThreadItem::CommandExecution {
id: item_id,
command,
@@ -2709,7 +2892,7 @@ async fn on_command_execution_request_approval_response(
&& let Some(completion_item) = completion_item
{
complete_command_execution_item(
conversation_id,
&conversation_id,
event_turn_id.clone(),
item_id.clone(),
completion_item.command,
@@ -2719,6 +2902,7 @@ async fn on_command_execution_request_approval_response(
completion_item.command_actions,
status,
&outgoing,
&thread_state,
)
.await;
}
@@ -2880,6 +3064,7 @@ mod tests {
use rmcp::model::Content;
use serde_json::Value as JsonValue;
use serde_json::json;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
@@ -2901,6 +3086,16 @@ mod tests {
}
}
fn command_execution_completion_item(command: &str) -> CommandExecutionCompletionItem {
CommandExecutionCompletionItem {
command: command.to_string(),
cwd: PathBuf::from("/tmp"),
command_actions: vec![V2ParsedCommand::Unknown {
command: command.to_string(),
}],
}
}
#[test]
fn guardian_assessment_started_uses_event_turn_id_fallback() {
let conversation_id = ThreadId::new();
@@ -3015,6 +3210,351 @@ mod tests {
}
}
#[tokio::test]
async fn command_execution_started_helper_emits_once() -> Result<()> {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let completion_item = command_execution_completion_item("printf hi");
let first_start = start_command_execution_item_once(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
assert!(first_start);
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::ItemStarted(payload)) => {
assert_eq!(payload.thread_id, conversation_id.to_string());
assert_eq!(payload.turn_id, "turn-1");
assert_eq!(
payload.item,
synthetic_command_execution_item(
"cmd-1".to_string(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
CommandExecutionStatus::InProgress,
)
);
}
other => bail!("unexpected message: {other:?}"),
}
let second_start = start_command_execution_item_once(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
assert!(!second_start);
assert!(rx.try_recv().is_err(), "duplicate start should not emit");
Ok(())
}
#[tokio::test]
async fn complete_command_execution_item_emits_declined_once_for_pending_command() -> Result<()>
{
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let completion_item = command_execution_completion_item("printf hi");
start_command_execution_item_once(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
let _started = recv_broadcast_message(&mut rx).await?;
complete_command_execution_item(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command.clone(),
completion_item.cwd.clone(),
/*process_id*/ None,
CommandExecutionSource::Agent,
completion_item.command_actions.clone(),
CommandExecutionStatus::Declined,
&outgoing,
&thread_state,
)
.await;
let completed = recv_broadcast_message(&mut rx).await?;
match completed {
OutgoingMessage::AppServerNotification(ServerNotification::ItemCompleted(payload)) => {
let ThreadItem::CommandExecution { id, status, .. } = payload.item else {
bail!("expected command execution completion");
};
assert_eq!(id, "cmd-1");
assert_eq!(status, CommandExecutionStatus::Declined);
}
other => bail!("unexpected message: {other:?}"),
}
complete_command_execution_item(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command,
completion_item.cwd,
/*process_id*/ None,
CommandExecutionSource::Agent,
completion_item.command_actions,
CommandExecutionStatus::Declined,
&outgoing,
&thread_state,
)
.await;
assert!(
rx.try_recv().is_err(),
"completion should not emit after the pending item is cleared"
);
Ok(())
}
#[tokio::test]
async fn guardian_command_execution_notifications_wrap_review_lifecycle() -> Result<()> {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let in_progress = GuardianAssessmentEvent {
id: "cmd-guardian".to_string(),
turn_id: "turn-guardian".to_string(),
status: codex_protocol::protocol::GuardianAssessmentStatus::InProgress,
risk_score: None,
risk_level: None,
rationale: None,
action: Some(json!({
"tool": "exec_command",
"command": "/bin/zsh -lc 'rm -f /tmp/file.sqlite'",
"cwd": "/tmp",
})),
};
let completion_item =
guardian_pending_command_execution_item(&in_progress).expect("command-like action");
start_command_execution_item_once(
&conversation_id,
in_progress.turn_id.clone(),
in_progress.id.clone(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
outgoing
.send_server_notification(guardian_auto_approval_review_notification(
&conversation_id,
&in_progress.turn_id,
&in_progress,
))
.await;
let first = recv_broadcast_message(&mut rx).await?;
match first {
OutgoingMessage::AppServerNotification(ServerNotification::ItemStarted(payload)) => {
assert_eq!(payload.turn_id, "turn-guardian");
let ThreadItem::CommandExecution { id, status, .. } = payload.item else {
bail!("expected synthetic command execution item");
};
assert_eq!(id, "cmd-guardian");
assert_eq!(status, CommandExecutionStatus::InProgress);
}
other => bail!("unexpected message: {other:?}"),
}
let second = recv_broadcast_message(&mut rx).await?;
match second {
OutgoingMessage::AppServerNotification(
ServerNotification::ItemGuardianApprovalReviewStarted(payload),
) => {
assert_eq!(payload.target_item_id, "cmd-guardian");
assert_eq!(
payload.review.status,
GuardianApprovalReviewStatus::InProgress
);
}
other => bail!("unexpected message: {other:?}"),
}
let denied = GuardianAssessmentEvent {
id: "cmd-guardian".to_string(),
turn_id: "turn-guardian".to_string(),
status: codex_protocol::protocol::GuardianAssessmentStatus::Denied,
risk_score: Some(88),
risk_level: Some(codex_protocol::protocol::GuardianRiskLevel::High),
rationale: Some("too risky".to_string()),
action: in_progress.action.clone(),
};
outgoing
.send_server_notification(guardian_auto_approval_review_notification(
&conversation_id,
&denied.turn_id,
&denied,
))
.await;
complete_command_execution_item(
&conversation_id,
denied.turn_id.clone(),
denied.id.clone(),
completion_item.command,
completion_item.cwd,
/*process_id*/ None,
CommandExecutionSource::Agent,
completion_item.command_actions,
CommandExecutionStatus::Declined,
&outgoing,
&thread_state,
)
.await;
let third = recv_broadcast_message(&mut rx).await?;
match third {
OutgoingMessage::AppServerNotification(
ServerNotification::ItemGuardianApprovalReviewCompleted(payload),
) => {
assert_eq!(payload.target_item_id, "cmd-guardian");
assert_eq!(payload.review.status, GuardianApprovalReviewStatus::Denied);
}
other => bail!("unexpected message: {other:?}"),
}
let fourth = recv_broadcast_message(&mut rx).await?;
match fourth {
OutgoingMessage::AppServerNotification(ServerNotification::ItemCompleted(payload)) => {
let ThreadItem::CommandExecution { id, status, .. } = payload.item else {
bail!("expected synthetic command execution completion");
};
assert_eq!(id, "cmd-guardian");
assert_eq!(status, CommandExecutionStatus::Declined);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn guardian_approved_review_does_not_complete_synthetic_command_item() -> Result<()> {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let assessment = GuardianAssessmentEvent {
id: "cmd-approved".to_string(),
turn_id: "turn-approved".to_string(),
status: codex_protocol::protocol::GuardianAssessmentStatus::InProgress,
risk_score: None,
risk_level: None,
rationale: None,
action: Some(json!({
"tool": "shell",
"command": "rm -f /tmp/guardian-approved.sqlite",
"cwd": "/tmp",
})),
};
let completion_item =
guardian_pending_command_execution_item(&assessment).expect("command-like action");
start_command_execution_item_once(
&conversation_id,
assessment.turn_id.clone(),
assessment.id.clone(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
let approved = GuardianAssessmentEvent {
id: assessment.id.clone(),
turn_id: assessment.turn_id.clone(),
status: codex_protocol::protocol::GuardianAssessmentStatus::Approved,
risk_score: Some(12),
risk_level: Some(codex_protocol::protocol::GuardianRiskLevel::Low),
rationale: Some("ok".to_string()),
action: assessment.action.clone(),
};
outgoing
.send_server_notification(guardian_auto_approval_review_notification(
&conversation_id,
&approved.turn_id,
&approved,
))
.await;
let _ = recv_broadcast_message(&mut rx).await?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(
ServerNotification::ItemGuardianApprovalReviewCompleted(payload),
) => {
assert_eq!(
payload.review.status,
GuardianApprovalReviewStatus::Approved
);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(
rx.try_recv().is_err(),
"approved review should not complete the item"
);
Ok(())
}
#[test]
fn file_change_accept_for_session_maps_to_approved_for_session() {
let (decision, completion_status) =