Compare commits

...

3 Commits

Author SHA1 Message Date
Owen Lin
f80ad0f5da feat(guardian): emit guardian review analytics 2026-04-07 16:15:41 -07:00
Owen Lin
409a3b3b32 feat(guardian): collect review session metrics 2026-04-07 16:13:10 -07:00
Owen Lin
823938ff87 feat(analytics): add guardian review event schema 2026-04-07 16:06:57 -07:00
14 changed files with 1286 additions and 69 deletions

View File

@@ -20,6 +20,7 @@ codex-plugin = { workspace = true }
codex-protocol = { workspace = true }
os_info = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha1 = { workspace = true }
tokio = { workspace = true, features = [
"macros",
@@ -29,4 +30,3 @@ tracing = { workspace = true, features = ["log"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
serde_json = { workspace = true }

View File

@@ -19,6 +19,15 @@ use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CustomAnalyticsFact;
use crate::facts::GuardianReviewDecision;
use crate::facts::GuardianReviewEventParams;
use crate::facts::GuardianReviewFailureKind;
use crate::facts::GuardianReviewRiskLevel;
use crate::facts::GuardianReviewSessionKind;
use crate::facts::GuardianReviewTerminalStatus;
use crate::facts::GuardianReviewTrigger;
use crate::facts::GuardianReviewedAction;
use crate::facts::GuardianToolCallCounts;
use crate::facts::InvocationType;
use crate::facts::PluginState;
use crate::facts::PluginStateChangedInput;
@@ -823,6 +832,127 @@ async fn reducer_ingests_plugin_state_changed_fact() {
);
}
#[tokio::test]
async fn reducer_ingests_guardian_review_fact() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
let tool_counts = GuardianToolCallCounts {
shell: 1,
mcp: 2,
..Default::default()
};
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(Box::new(
GuardianReviewEventParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
review_id: "review-1".to_string(),
target_item_id: "tool-1".to_string(),
product_client_id: Some("codex_app".to_string()),
trigger: GuardianReviewTrigger::McpToolCall,
retry_reason: Some("requires approval".to_string()),
delegated_review: true,
reviewed_action: GuardianReviewedAction::McpToolCall {
server: "github".to_string(),
tool_name: "create_pr".to_string(),
arguments: Some(json!({"title": "Guardian analytics"})),
connector_id: Some("github".to_string()),
connector_name: Some("GitHub".to_string()),
tool_title: Some("Create PR".to_string()),
},
reviewed_action_truncated: false,
decision: GuardianReviewDecision::Denied,
terminal_status: GuardianReviewTerminalStatus::FailedClosed,
failure_kind: Some(GuardianReviewFailureKind::ParseError),
risk_score: Some(100),
risk_level: Some(GuardianReviewRiskLevel::High),
rationale: Some("Automatic approval review failed".to_string()),
guardian_thread_id: Some("guardian-thread-1".to_string()),
guardian_session_kind: Some(GuardianReviewSessionKind::EphemeralForked),
guardian_model: Some("gpt-5.4".to_string()),
guardian_reasoning_effort: Some("low".to_string()),
had_prior_review_context: Some(true),
review_timeout_ms: 90_000,
guardian_tool_call_count: tool_counts.total(),
guardian_tool_call_counts: tool_counts,
guardian_time_to_first_token_ms: Some(123),
guardian_completion_latency_ms: Some(456),
started_at: 1_716_000_000,
completed_at: Some(1_716_000_001),
input_tokens: Some(10),
cached_input_tokens: Some(2),
output_tokens: Some(3),
reasoning_output_tokens: Some(1),
total_tokens: Some(13),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize guardian review event");
assert_eq!(
payload,
json!([{
"event_type": "codex_guardian_review",
"event_params": {
"thread_id": "thread-1",
"turn_id": "turn-1",
"review_id": "review-1",
"target_item_id": "tool-1",
"product_client_id": "codex_app",
"trigger": "mcp_tool_call",
"retry_reason": "requires approval",
"delegated_review": true,
"reviewed_action": {
"type": "mcp_tool_call",
"server": "github",
"tool_name": "create_pr",
"arguments": {"title": "Guardian analytics"},
"connector_id": "github",
"connector_name": "GitHub",
"tool_title": "Create PR"
},
"reviewed_action_truncated": false,
"decision": "denied",
"terminal_status": "failed_closed",
"failure_kind": "parse_error",
"risk_score": 100,
"risk_level": "high",
"rationale": "Automatic approval review failed",
"guardian_thread_id": "guardian-thread-1",
"guardian_session_kind": "ephemeral_forked",
"guardian_model": "gpt-5.4",
"guardian_reasoning_effort": "low",
"had_prior_review_context": true,
"review_timeout_ms": 90000,
"guardian_tool_call_count": 3,
"guardian_tool_call_counts": {
"shell": 1,
"unified_exec": 0,
"mcp": 2,
"dynamic": 0,
"apply_patch": 0,
"web_search": 0,
"image_generation": 0,
"view_image": 0
},
"guardian_time_to_first_token_ms": 123,
"guardian_completion_latency_ms": 456,
"started_at": 1716000000,
"completed_at": 1716000001,
"input_tokens": 10,
"cached_input_tokens": 2,
"output_tokens": 3,
"reasoning_output_tokens": 1,
"total_tokens": 13
}
}])
);
}
fn sample_plugin_metadata() -> PluginTelemetryMetadata {
PluginTelemetryMetadata {
plugin_id: PluginId::parse("sample@test").expect("valid plugin id"),

View File

@@ -7,6 +7,7 @@ use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CustomAnalyticsFact;
use crate::facts::GuardianReviewEventParams;
use crate::facts::PluginState;
use crate::facts::PluginStateChangedInput;
use crate::facts::SkillInvocation;
@@ -151,6 +152,12 @@ impl AnalyticsEventsClient {
));
}
pub fn track_guardian_review(&self, input: GuardianReviewEventParams) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(
Box::new(input),
)));
}
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
if mentions.is_empty() {
return;

View File

@@ -1,4 +1,5 @@
use crate::facts::AppInvocation;
use crate::facts::GuardianReviewEventParams;
use crate::facts::InvocationType;
use crate::facts::PluginState;
use crate::facts::SubAgentThreadStartedInput;
@@ -35,6 +36,7 @@ pub(crate) struct TrackEventsRequest {
pub(crate) enum TrackEventRequest {
SkillInvocation(SkillInvocationEventRequest),
ThreadInitialized(ThreadInitializedEvent),
GuardianReview(Box<GuardianReviewEventRequest>),
AppMentioned(CodexAppMentionedEventRequest),
AppUsed(CodexAppUsedEventRequest),
PluginUsed(CodexPluginUsedEventRequest),
@@ -99,6 +101,12 @@ pub(crate) struct ThreadInitializedEvent {
pub(crate) event_params: ThreadInitializedEventParams,
}
#[derive(Serialize)]
pub(crate) struct GuardianReviewEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: GuardianReviewEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexAppMetadata {
pub(crate) connector_id: Option<String>,

View File

@@ -6,6 +6,9 @@ use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::approvals::NetworkApprovalProtocol;
use codex_protocol::models::PermissionProfile;
use codex_protocol::models::SandboxPermissions;
use codex_protocol::protocol::SkillScope;
use codex_protocol::protocol::SubAgentSource;
use serde::Serialize;
@@ -89,6 +92,7 @@ pub(crate) enum AnalyticsFact {
pub(crate) enum CustomAnalyticsFact {
SubAgentThreadStarted(SubAgentThreadStartedInput),
GuardianReview(Box<GuardianReviewEventParams>),
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),
AppUsed(AppUsedInput),
@@ -128,3 +132,175 @@ pub(crate) enum PluginState {
Enabled,
Disabled,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewTrigger {
Shell,
UnifiedExec,
Execve,
ApplyPatch,
NetworkAccess,
McpToolCall,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewDecision {
Approved,
Denied,
Aborted,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewTerminalStatus {
Approved,
Denied,
Aborted,
TimedOut,
FailedClosed,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewFailureKind {
Timeout,
Cancelled,
PromptBuildError,
SessionError,
ParseError,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewSessionKind {
TrunkSpawned,
TrunkReused,
EphemeralForked,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuardianReviewRiskLevel {
Low,
Medium,
High,
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct GuardianToolCallCounts {
pub shell: u64,
pub unified_exec: u64,
pub mcp: u64,
pub dynamic: u64,
pub apply_patch: u64,
pub web_search: u64,
pub image_generation: u64,
pub view_image: u64,
}
impl GuardianToolCallCounts {
pub fn total(&self) -> u64 {
self.shell
+ self.unified_exec
+ self.mcp
+ self.dynamic
+ self.apply_patch
+ self.web_search
+ self.image_generation
+ self.view_image
}
}
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum GuardianReviewedAction {
Shell {
command: Vec<String>,
command_display: String,
cwd: String,
sandbox_permissions: SandboxPermissions,
additional_permissions: Option<PermissionProfile>,
justification: Option<String>,
},
UnifiedExec {
command: Vec<String>,
command_display: String,
cwd: String,
sandbox_permissions: SandboxPermissions,
additional_permissions: Option<PermissionProfile>,
justification: Option<String>,
tty: bool,
},
Execve {
source: GuardianCommandSource,
program: String,
argv: Vec<String>,
cwd: String,
additional_permissions: Option<PermissionProfile>,
},
ApplyPatch {
cwd: String,
files: Vec<String>,
patch: Option<String>,
},
NetworkAccess {
target: String,
host: String,
protocol: NetworkApprovalProtocol,
port: u16,
},
McpToolCall {
server: String,
tool_name: String,
arguments: Option<serde_json::Value>,
connector_id: Option<String>,
connector_name: Option<String>,
tool_title: Option<String>,
},
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianCommandSource {
Shell,
UnifiedExec,
}
#[derive(Clone, Debug, Serialize)]
pub struct GuardianReviewEventParams {
pub thread_id: String,
pub turn_id: String,
pub review_id: String,
pub target_item_id: String,
pub product_client_id: Option<String>,
pub trigger: GuardianReviewTrigger,
pub retry_reason: Option<String>,
pub delegated_review: bool,
pub reviewed_action: GuardianReviewedAction,
pub reviewed_action_truncated: bool,
pub decision: GuardianReviewDecision,
pub terminal_status: GuardianReviewTerminalStatus,
pub failure_kind: Option<GuardianReviewFailureKind>,
pub risk_score: Option<u8>,
pub risk_level: Option<GuardianReviewRiskLevel>,
pub rationale: Option<String>,
pub guardian_thread_id: Option<String>,
pub guardian_session_kind: Option<GuardianReviewSessionKind>,
pub guardian_model: Option<String>,
pub guardian_reasoning_effort: Option<String>,
pub had_prior_review_context: Option<bool>,
pub review_timeout_ms: u64,
pub guardian_tool_call_count: u64,
pub guardian_tool_call_counts: GuardianToolCallCounts,
pub guardian_time_to_first_token_ms: Option<u64>,
pub guardian_completion_latency_ms: Option<u64>,
pub started_at: u64,
pub completed_at: Option<u64>,
pub input_tokens: Option<i64>,
pub cached_input_tokens: Option<i64>,
pub output_tokens: Option<i64>,
pub reasoning_output_tokens: Option<i64>,
pub total_tokens: Option<i64>,
}

View File

@@ -6,6 +6,16 @@ mod reducer;
pub use client::AnalyticsEventsClient;
pub use events::AppServerRpcTransport;
pub use facts::AppInvocation;
pub use facts::GuardianCommandSource;
pub use facts::GuardianReviewDecision;
pub use facts::GuardianReviewEventParams;
pub use facts::GuardianReviewFailureKind;
pub use facts::GuardianReviewRiskLevel;
pub use facts::GuardianReviewSessionKind;
pub use facts::GuardianReviewTerminalStatus;
pub use facts::GuardianReviewTrigger;
pub use facts::GuardianReviewedAction;
pub use facts::GuardianToolCallCounts;
pub use facts::InvocationType;
pub use facts::SkillInvocation;
pub use facts::SubAgentThreadStartedInput;

View File

@@ -5,6 +5,7 @@ use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::GuardianReviewEventRequest;
use crate::events::SkillInvocationEventParams;
use crate::events::SkillInvocationEventRequest;
use crate::events::ThreadInitializationMode;
@@ -81,6 +82,9 @@ impl AnalyticsReducer {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
}
CustomAnalyticsFact::GuardianReview(input) => {
self.ingest_guardian_review(*input, out);
}
CustomAnalyticsFact::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
@@ -135,6 +139,19 @@ impl AnalyticsReducer {
));
}
fn ingest_guardian_review(
&mut self,
input: crate::facts::GuardianReviewEventParams,
out: &mut Vec<TrackEventRequest>,
) {
out.push(TrackEventRequest::GuardianReview(Box::new(
GuardianReviewEventRequest {
event_type: "codex_guardian_review",
event_params: input,
},
)));
}
async fn ingest_skill_invoked(
&mut self,
input: SkillInvokedInput,

View File

@@ -39,7 +39,7 @@ use crate::codex::TurnContext;
use crate::codex::emit_subagent_session_started;
use crate::config::Config;
use crate::guardian::GuardianApprovalRequest;
use crate::guardian::review_approval_request_with_cancel;
use crate::guardian::review_delegated_approval_request_with_cancel;
use crate::guardian::routes_approval_to_guardian;
use crate::mcp_tool_call::MCP_TOOL_APPROVAL_ACCEPT;
use crate::mcp_tool_call::MCP_TOOL_APPROVAL_ACCEPT_FOR_SESSION;
@@ -739,7 +739,7 @@ fn spawn_guardian_review(
let _ = tx.send(ReviewDecision::Denied);
return;
};
let decision = runtime.block_on(review_approval_request_with_cancel(
let decision = runtime.block_on(review_delegated_approval_request_with_cancel(
&session,
&turn,
request,

View File

@@ -14,7 +14,9 @@
mod approval_request;
mod prompt;
mod review;
mod review_analytics;
mod review_session;
mod review_session_analytics;
use std::time::Duration;
@@ -27,7 +29,9 @@ pub(crate) use approval_request::guardian_approval_request_to_json;
pub(crate) use review::GUARDIAN_REJECTION_MESSAGE;
pub(crate) use review::is_guardian_reviewer_source;
pub(crate) use review::review_approval_request;
#[cfg(test)]
pub(crate) use review::review_approval_request_with_cancel;
pub(crate) use review::review_delegated_approval_request_with_cancel;
pub(crate) use review::routes_approval_to_guardian;
pub(crate) use review_session::GuardianReviewSessionManager;

View File

@@ -1,5 +1,9 @@
use std::sync::Arc;
use std::time::Instant;
use codex_analytics::GuardianReviewDecision;
use codex_analytics::GuardianReviewFailureKind;
use codex_analytics::GuardianReviewTerminalStatus;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
@@ -24,9 +28,15 @@ use super::approval_request::guardian_request_turn_id;
use super::prompt::build_guardian_prompt_items;
use super::prompt::guardian_output_schema;
use super::prompt::parse_guardian_assessment;
use super::review_analytics::GuardianReviewAnalyticsInput;
use super::review_analytics::duration_millis_u64;
use super::review_analytics::guardian_reviewed_action;
use super::review_analytics::now_unix_timestamp_secs;
use super::review_analytics::track_guardian_review;
use super::review_session::GuardianReviewSessionOutcome;
use super::review_session::GuardianReviewSessionParams;
use super::review_session::build_guardian_review_session_config;
use super::review_session_analytics::GuardianReviewSessionReport;
pub(crate) const GUARDIAN_REJECTION_MESSAGE: &str = concat!(
"This action was rejected due to unacceptable risk. ",
@@ -39,9 +49,17 @@ pub(crate) const GUARDIAN_REJECTION_MESSAGE: &str = concat!(
#[derive(Debug)]
pub(super) enum GuardianReviewOutcome {
Completed(anyhow::Result<GuardianAssessment>),
TimedOut,
Aborted,
Completed {
result: anyhow::Result<GuardianAssessment>,
report: Option<GuardianReviewSessionReport>,
failure_kind: Option<GuardianReviewFailureKind>,
},
TimedOut {
report: Option<GuardianReviewSessionReport>,
},
Aborted {
report: Option<GuardianReviewSessionReport>,
},
}
fn guardian_risk_level_str(level: GuardianRiskLevel) -> &'static str {
@@ -78,10 +96,15 @@ async fn run_guardian_review(
request: GuardianApprovalRequest,
retry_reason: Option<String>,
external_cancel: Option<CancellationToken>,
delegated_review: bool,
) -> ReviewDecision {
let review_started_at = Instant::now();
let started_at = now_unix_timestamp_secs();
let assessment_id = guardian_request_id(&request).to_string();
let assessment_turn_id = guardian_request_turn_id(&request, &turn.sub_id).to_string();
let action_summary = guardian_assessment_action(&request);
let (trigger, reviewed_action, reviewed_action_truncated) = guardian_reviewed_action(&request);
let retry_reason_for_analytics = retry_reason.clone();
session
.send_event(
turn.as_ref(),
@@ -101,6 +124,28 @@ async fn run_guardian_review(
.as_ref()
.is_some_and(CancellationToken::is_cancelled)
{
track_guardian_review(
session.as_ref(),
GuardianReviewAnalyticsInput {
review_id: assessment_id.clone(),
target_item_id: assessment_id.clone(),
turn_id: assessment_turn_id.clone(),
trigger,
retry_reason: retry_reason_for_analytics,
delegated_review,
reviewed_action,
reviewed_action_truncated,
decision: GuardianReviewDecision::Aborted,
terminal_status: GuardianReviewTerminalStatus::Aborted,
failure_kind: Some(GuardianReviewFailureKind::Cancelled),
assessment: None,
report: None,
started_at,
completed_at: Some(now_unix_timestamp_secs()),
completion_latency_ms: Some(duration_millis_u64(review_started_at.elapsed())),
},
)
.await;
session
.send_event(
turn.as_ref(),
@@ -131,26 +176,68 @@ async fn run_guardian_review(
)
.await
}
Err(err) => GuardianReviewOutcome::Completed(Err(err.into())),
Err(err) => GuardianReviewOutcome::Completed {
result: Err(err.into()),
report: None,
failure_kind: Some(GuardianReviewFailureKind::PromptBuildError),
},
};
let assessment = match outcome {
GuardianReviewOutcome::Completed(Ok(assessment)) => assessment,
GuardianReviewOutcome::Completed(Err(err)) => GuardianAssessment {
risk_level: GuardianRiskLevel::High,
risk_score: 100,
rationale: format!("Automatic approval review failed: {err}"),
evidence: vec![],
},
GuardianReviewOutcome::TimedOut => GuardianAssessment {
risk_level: GuardianRiskLevel::High,
risk_score: 100,
rationale:
"Automatic approval review timed out while evaluating the requested approval."
.to_string(),
evidence: vec![],
},
GuardianReviewOutcome::Aborted => {
let (assessment, report, failure_kind) = match outcome {
GuardianReviewOutcome::Completed {
result: Ok(assessment),
report,
failure_kind,
} => (assessment, report, failure_kind),
GuardianReviewOutcome::Completed {
result: Err(err),
report,
failure_kind,
} => (
GuardianAssessment {
risk_level: GuardianRiskLevel::High,
risk_score: 100,
rationale: format!("Automatic approval review failed: {err}"),
evidence: vec![],
},
report,
failure_kind.or(Some(GuardianReviewFailureKind::SessionError)),
),
GuardianReviewOutcome::TimedOut { report } => (
GuardianAssessment {
risk_level: GuardianRiskLevel::High,
risk_score: 100,
rationale:
"Automatic approval review timed out while evaluating the requested approval."
.to_string(),
evidence: vec![],
},
report,
Some(GuardianReviewFailureKind::Timeout),
),
GuardianReviewOutcome::Aborted { report } => {
track_guardian_review(
session.as_ref(),
GuardianReviewAnalyticsInput {
review_id: assessment_id.clone(),
target_item_id: assessment_id.clone(),
turn_id: assessment_turn_id.clone(),
trigger,
retry_reason: retry_reason_for_analytics,
delegated_review,
reviewed_action,
reviewed_action_truncated,
decision: GuardianReviewDecision::Aborted,
terminal_status: GuardianReviewTerminalStatus::Aborted,
failure_kind: Some(GuardianReviewFailureKind::Cancelled),
assessment: None,
report,
started_at,
completed_at: Some(now_unix_timestamp_secs()),
completion_latency_ms: Some(duration_millis_u64(review_started_at.elapsed())),
},
)
.await;
session
.send_event(
turn.as_ref(),
@@ -170,6 +257,42 @@ async fn run_guardian_review(
};
let approved = assessment.risk_score < GUARDIAN_APPROVAL_RISK_THRESHOLD;
let decision = if approved {
GuardianReviewDecision::Approved
} else {
GuardianReviewDecision::Denied
};
let terminal_status = if approved {
GuardianReviewTerminalStatus::Approved
} else if matches!(failure_kind, Some(GuardianReviewFailureKind::Timeout)) {
GuardianReviewTerminalStatus::TimedOut
} else if failure_kind.is_some() {
GuardianReviewTerminalStatus::FailedClosed
} else {
GuardianReviewTerminalStatus::Denied
};
track_guardian_review(
session.as_ref(),
GuardianReviewAnalyticsInput {
review_id: assessment_id.clone(),
target_item_id: assessment_id.clone(),
turn_id: assessment_turn_id.clone(),
trigger,
retry_reason: retry_reason_for_analytics,
delegated_review,
reviewed_action,
reviewed_action_truncated,
decision,
terminal_status,
failure_kind,
assessment: Some(&assessment),
report,
started_at,
completed_at: Some(now_unix_timestamp_secs()),
completion_latency_ms: Some(duration_millis_u64(review_started_at.elapsed())),
},
)
.await;
let verdict = if approved { "approved" } else { "denied" };
let warning = format!(
"Automatic approval review {verdict} (risk: {}): {}",
@@ -222,10 +345,12 @@ pub(crate) async fn review_approval_request(
request,
retry_reason,
/*external_cancel*/ None,
/*delegated_review*/ false,
)
.await
}
#[cfg(test)]
pub(crate) async fn review_approval_request_with_cancel(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
@@ -239,6 +364,25 @@ pub(crate) async fn review_approval_request_with_cancel(
request,
retry_reason,
Some(cancel_token),
/*delegated_review*/ false,
)
.await
}
pub(crate) async fn review_delegated_approval_request_with_cancel(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
request: GuardianApprovalRequest,
retry_reason: Option<String>,
cancel_token: CancellationToken,
) -> ReviewDecision {
run_guardian_review(
Arc::clone(session),
Arc::clone(turn),
request,
retry_reason,
Some(cancel_token),
/*delegated_review*/ true,
)
.await
}
@@ -267,7 +411,13 @@ pub(super) async fn run_guardian_review_session(
let live_network_config = match session.services.network_proxy.as_ref() {
Some(network_proxy) => match network_proxy.proxy().current_cfg().await {
Ok(config) => Some(config),
Err(err) => return GuardianReviewOutcome::Completed(Err(err)),
Err(err) => {
return GuardianReviewOutcome::Completed {
result: Err(err),
report: None,
failure_kind: Some(GuardianReviewFailureKind::SessionError),
};
}
},
None => None,
};
@@ -317,7 +467,13 @@ pub(super) async fn run_guardian_review_session(
);
let guardian_config = match guardian_config {
Ok(config) => config,
Err(err) => return GuardianReviewOutcome::Completed(Err(err)),
Err(err) => {
return GuardianReviewOutcome::Completed {
result: Err(err),
report: None,
failure_kind: Some(GuardianReviewFailureKind::SessionError),
};
}
};
match session
@@ -336,15 +492,32 @@ pub(super) async fn run_guardian_review_session(
})
.await
{
GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) => {
GuardianReviewOutcome::Completed(parse_guardian_assessment(
last_agent_message.as_deref(),
))
GuardianReviewSessionOutcome::Completed { result, report } => match result {
Ok(last_agent_message) => {
match parse_guardian_assessment(last_agent_message.as_deref()) {
Ok(assessment) => GuardianReviewOutcome::Completed {
result: Ok(assessment),
report,
failure_kind: None,
},
Err(err) => GuardianReviewOutcome::Completed {
result: Err(err),
report,
failure_kind: Some(GuardianReviewFailureKind::ParseError),
},
}
}
Err(err) => GuardianReviewOutcome::Completed {
result: Err(err),
report,
failure_kind: Some(GuardianReviewFailureKind::SessionError),
},
},
GuardianReviewSessionOutcome::TimedOut { report } => {
GuardianReviewOutcome::TimedOut { report }
}
GuardianReviewSessionOutcome::Completed(Err(err)) => {
GuardianReviewOutcome::Completed(Err(err))
GuardianReviewSessionOutcome::Aborted { report } => {
GuardianReviewOutcome::Aborted { report }
}
GuardianReviewSessionOutcome::TimedOut => GuardianReviewOutcome::TimedOut,
GuardianReviewSessionOutcome::Aborted => GuardianReviewOutcome::Aborted,
}
}

View File

@@ -0,0 +1,334 @@
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use codex_analytics::GuardianCommandSource as AnalyticsGuardianCommandSource;
use codex_analytics::GuardianReviewDecision;
use codex_analytics::GuardianReviewEventParams;
use codex_analytics::GuardianReviewFailureKind;
use codex_analytics::GuardianReviewRiskLevel as AnalyticsGuardianRiskLevel;
use codex_analytics::GuardianReviewTerminalStatus;
use codex_analytics::GuardianReviewTrigger;
use codex_analytics::GuardianReviewedAction;
use codex_features::Feature;
use codex_protocol::protocol::GuardianCommandSource;
use codex_protocol::protocol::GuardianRiskLevel;
use codex_shell_command::parse_command::shlex_join;
use crate::codex::Session;
use super::GUARDIAN_MAX_ACTION_STRING_TOKENS;
use super::GUARDIAN_REVIEW_TIMEOUT;
use super::GuardianApprovalRequest;
use super::GuardianAssessment;
use super::prompt::guardian_truncate_text;
use super::review_session_analytics::GuardianReviewSessionReport;
pub(super) struct GuardianReviewAnalyticsInput<'a> {
pub(super) review_id: String,
pub(super) target_item_id: String,
pub(super) turn_id: String,
pub(super) trigger: GuardianReviewTrigger,
pub(super) retry_reason: Option<String>,
pub(super) delegated_review: bool,
pub(super) reviewed_action: GuardianReviewedAction,
pub(super) reviewed_action_truncated: bool,
pub(super) decision: GuardianReviewDecision,
pub(super) terminal_status: GuardianReviewTerminalStatus,
pub(super) failure_kind: Option<GuardianReviewFailureKind>,
pub(super) assessment: Option<&'a GuardianAssessment>,
pub(super) report: Option<GuardianReviewSessionReport>,
pub(super) started_at: u64,
pub(super) completed_at: Option<u64>,
pub(super) completion_latency_ms: Option<u64>,
}
pub(super) async fn track_guardian_review(
session: &Session,
input: GuardianReviewAnalyticsInput<'_>,
) {
if !session.enabled(Feature::GeneralAnalytics) {
return;
}
let client_metadata = session.app_server_client_metadata().await;
let GuardianReviewAnalyticsInput {
review_id,
target_item_id,
turn_id,
trigger,
retry_reason,
delegated_review,
reviewed_action,
reviewed_action_truncated,
decision,
terminal_status,
failure_kind,
assessment,
report,
started_at,
completed_at,
completion_latency_ms,
} = input;
let (risk_score, risk_level, rationale) = assessment.map_or((None, None, None), |assessment| {
(
Some(assessment.risk_score),
Some(analytics_risk_level(assessment.risk_level)),
Some(assessment.rationale.clone()),
)
});
let (
guardian_thread_id,
guardian_session_kind,
guardian_model,
guardian_reasoning_effort,
had_prior_review_context,
guardian_tool_call_counts,
guardian_time_to_first_token_ms,
token_usage,
) = match report {
Some(report) => (
Some(report.guardian_thread_id),
Some(report.session_kind),
report.guardian_model,
report.guardian_reasoning_effort,
Some(report.had_prior_review_context),
report.tool_call_counts,
report.time_to_first_token_ms,
report.token_usage,
),
None => (None, None, None, None, None, Default::default(), None, None),
};
let guardian_tool_call_count = guardian_tool_call_counts.total();
session
.services
.analytics_events_client
.track_guardian_review(GuardianReviewEventParams {
thread_id: session.conversation_id.to_string(),
turn_id,
review_id,
target_item_id,
product_client_id: client_metadata.client_name,
trigger,
retry_reason,
delegated_review,
reviewed_action,
reviewed_action_truncated,
decision,
terminal_status,
failure_kind,
risk_score,
risk_level,
rationale,
guardian_thread_id,
guardian_session_kind,
guardian_model,
guardian_reasoning_effort,
had_prior_review_context,
review_timeout_ms: duration_millis_u64(GUARDIAN_REVIEW_TIMEOUT),
guardian_tool_call_count,
guardian_tool_call_counts,
guardian_time_to_first_token_ms,
guardian_completion_latency_ms: completion_latency_ms,
started_at,
completed_at,
input_tokens: token_usage.as_ref().map(|usage| usage.input_tokens),
cached_input_tokens: token_usage.as_ref().map(|usage| usage.cached_input_tokens),
output_tokens: token_usage.as_ref().map(|usage| usage.output_tokens),
reasoning_output_tokens: token_usage
.as_ref()
.map(|usage| usage.reasoning_output_tokens),
total_tokens: token_usage.as_ref().map(|usage| usage.total_tokens),
});
}
pub(super) fn guardian_reviewed_action(
request: &GuardianApprovalRequest,
) -> (GuardianReviewTrigger, GuardianReviewedAction, bool) {
match request {
GuardianApprovalRequest::Shell {
command,
cwd,
sandbox_permissions,
additional_permissions,
justification,
..
} => (
GuardianReviewTrigger::Shell,
GuardianReviewedAction::Shell {
command: command.clone(),
command_display: shlex_join(command),
cwd: cwd.display().to_string(),
sandbox_permissions: *sandbox_permissions,
additional_permissions: additional_permissions.clone(),
justification: justification.clone(),
},
false,
),
GuardianApprovalRequest::ExecCommand {
command,
cwd,
sandbox_permissions,
additional_permissions,
justification,
tty,
..
} => (
GuardianReviewTrigger::UnifiedExec,
GuardianReviewedAction::UnifiedExec {
command: command.clone(),
command_display: shlex_join(command),
cwd: cwd.display().to_string(),
sandbox_permissions: *sandbox_permissions,
additional_permissions: additional_permissions.clone(),
justification: justification.clone(),
tty: *tty,
},
false,
),
#[cfg(unix)]
GuardianApprovalRequest::Execve {
source,
program,
argv,
cwd,
additional_permissions,
..
} => (
GuardianReviewTrigger::Execve,
GuardianReviewedAction::Execve {
source: analytics_command_source(*source),
program: program.clone(),
argv: argv.clone(),
cwd: cwd.display().to_string(),
additional_permissions: additional_permissions.clone(),
},
false,
),
GuardianApprovalRequest::ApplyPatch {
cwd, files, patch, ..
} => {
let (patch, truncated) = truncate_analytics_string(patch);
(
GuardianReviewTrigger::ApplyPatch,
GuardianReviewedAction::ApplyPatch {
cwd: cwd.display().to_string(),
files: files
.iter()
.map(|path| path.to_path_buf().display().to_string())
.collect(),
patch: Some(patch),
},
truncated,
)
}
GuardianApprovalRequest::NetworkAccess {
target,
host,
protocol,
port,
..
} => (
GuardianReviewTrigger::NetworkAccess,
GuardianReviewedAction::NetworkAccess {
target: target.clone(),
host: host.clone(),
protocol: *protocol,
port: *port,
},
false,
),
GuardianApprovalRequest::McpToolCall {
server,
tool_name,
arguments,
connector_id,
connector_name,
tool_title,
..
} => {
let (arguments, truncated) = arguments
.clone()
.map(truncate_analytics_json_value)
.map_or((None, false), |(value, truncated)| (Some(value), truncated));
(
GuardianReviewTrigger::McpToolCall,
GuardianReviewedAction::McpToolCall {
server: server.clone(),
tool_name: tool_name.clone(),
arguments,
connector_id: connector_id.clone(),
connector_name: connector_name.clone(),
tool_title: tool_title.clone(),
},
truncated,
)
}
}
}
pub(super) fn now_unix_timestamp_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
pub(super) fn duration_millis_u64(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
fn analytics_risk_level(risk_level: GuardianRiskLevel) -> AnalyticsGuardianRiskLevel {
match risk_level {
GuardianRiskLevel::Low => AnalyticsGuardianRiskLevel::Low,
GuardianRiskLevel::Medium => AnalyticsGuardianRiskLevel::Medium,
GuardianRiskLevel::High => AnalyticsGuardianRiskLevel::High,
}
}
fn analytics_command_source(source: GuardianCommandSource) -> AnalyticsGuardianCommandSource {
match source {
GuardianCommandSource::Shell => AnalyticsGuardianCommandSource::Shell,
GuardianCommandSource::UnifiedExec => AnalyticsGuardianCommandSource::UnifiedExec,
}
}
fn truncate_analytics_json_value(value: serde_json::Value) -> (serde_json::Value, bool) {
match value {
serde_json::Value::String(text) => {
let (text, truncated) = truncate_analytics_string(&text);
(serde_json::Value::String(text), truncated)
}
serde_json::Value::Array(values) => {
let mut truncated = false;
let values = values
.into_iter()
.map(|value| {
let (value, value_truncated) = truncate_analytics_json_value(value);
truncated |= value_truncated;
value
})
.collect();
(serde_json::Value::Array(values), truncated)
}
serde_json::Value::Object(values) => {
let mut truncated = false;
let values = values
.into_iter()
.map(|(key, value)| {
let (value, value_truncated) = truncate_analytics_json_value(value);
truncated |= value_truncated;
(key, value)
})
.collect();
(serde_json::Value::Object(values), truncated)
}
value => (value, false),
}
}
fn truncate_analytics_string(text: &str) -> (String, bool) {
let truncated = guardian_truncate_text(text, GUARDIAN_MAX_ACTION_STRING_TOKENS);
let was_truncated = truncated != text;
(truncated, was_truncated)
}

View File

@@ -5,8 +5,11 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use anyhow::anyhow;
use codex_analytics::GuardianReviewSessionKind;
use codex_analytics::GuardianToolCallCounts;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::DeveloperInstructions;
@@ -19,6 +22,7 @@ use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use serde_json::Value;
use tokio::sync::Mutex;
@@ -42,6 +46,10 @@ use codex_model_provider_info::ModelProviderInfo;
use super::GUARDIAN_REVIEW_TIMEOUT;
use super::GUARDIAN_REVIEWER_NAME;
use super::prompt::guardian_policy_prompt;
use super::review_session_analytics::GuardianReviewSessionReport;
use super::review_session_analytics::duration_millis_u64;
use super::review_session_analytics::guardian_event_records_time_to_first_token;
use super::review_session_analytics::record_guardian_tool_call_count;
const GUARDIAN_INTERRUPT_DRAIN_TIMEOUT: Duration = Duration::from_secs(5);
const GUARDIAN_FOLLOWUP_REVIEW_REMINDER: &str = concat!(
@@ -53,9 +61,16 @@ const GUARDIAN_FOLLOWUP_REVIEW_REMINDER: &str = concat!(
#[derive(Debug)]
pub(crate) enum GuardianReviewSessionOutcome {
Completed(anyhow::Result<Option<String>>),
TimedOut,
Aborted,
Completed {
result: anyhow::Result<Option<String>>,
report: Option<GuardianReviewSessionReport>,
},
TimedOut {
report: Option<GuardianReviewSessionReport>,
},
Aborted {
report: Option<GuardianReviewSessionReport>,
},
}
pub(crate) struct GuardianReviewSessionParams {
@@ -248,9 +263,11 @@ impl GuardianReviewSessionManager {
&self,
params: GuardianReviewSessionParams,
) -> GuardianReviewSessionOutcome {
let request_started_at = Instant::now();
let deadline = tokio::time::Instant::now() + GUARDIAN_REVIEW_TIMEOUT;
let next_reuse_key = GuardianReviewSessionReuseKey::from_spawn_config(&params.spawn_config);
let mut stale_trunk_to_shutdown = None;
let mut trunk_session_kind = GuardianReviewSessionKind::TrunkReused;
let trunk_candidate = match run_before_review_deadline(
deadline,
params.external_cancel.as_ref(),
@@ -284,11 +301,15 @@ impl GuardianReviewSessionManager {
{
Ok(Ok(review_session)) => Arc::new(review_session),
Ok(Err(err)) => {
return GuardianReviewSessionOutcome::Completed(Err(err));
return GuardianReviewSessionOutcome::Completed {
result: Err(err),
report: None,
};
}
Err(outcome) => return outcome,
};
state.trunk = Some(Arc::clone(&review_session));
trunk_session_kind = GuardianReviewSessionKind::TrunkSpawned;
}
state.trunk.as_ref().cloned()
@@ -301,9 +322,12 @@ impl GuardianReviewSessionManager {
}
let Some(trunk) = trunk_candidate else {
return GuardianReviewSessionOutcome::Completed(Err(anyhow!(
"guardian review session was not available after spawn"
)));
return GuardianReviewSessionOutcome::Completed {
result: Err(anyhow!(
"guardian review session was not available after spawn"
)),
report: None,
};
};
if trunk.reuse_key != next_reuse_key {
@@ -313,6 +337,7 @@ impl GuardianReviewSessionManager {
next_reuse_key,
deadline,
/*initial_history*/ None,
request_started_at,
)
.await;
}
@@ -322,14 +347,34 @@ impl GuardianReviewSessionManager {
Err(_) => {
let initial_history = trunk.fork_initial_history().await;
return self
.run_ephemeral_review(params, next_reuse_key, deadline, initial_history)
.run_ephemeral_review(
params,
next_reuse_key,
deadline,
initial_history,
request_started_at,
)
.await;
}
};
let (outcome, keep_review_session) =
run_review_on_session(trunk.as_ref(), &params, deadline).await;
if keep_review_session && matches!(outcome, GuardianReviewSessionOutcome::Completed(_)) {
let (outcome, keep_review_session) = run_review_on_session(
trunk.as_ref(),
&params,
deadline,
trunk_session_kind,
request_started_at,
)
.await;
if keep_review_session
&& matches!(
outcome,
GuardianReviewSessionOutcome::Completed {
result: _,
report: _
}
)
{
trunk.refresh_last_committed_rollout_items().await;
}
drop(trunk_guard);
@@ -420,6 +465,7 @@ impl GuardianReviewSessionManager {
reuse_key: GuardianReviewSessionReuseKey,
deadline: tokio::time::Instant,
initial_history: Option<InitialHistory>,
request_started_at: Instant,
) -> GuardianReviewSessionOutcome {
let spawn_cancel_token = CancellationToken::new();
let mut fork_config = params.spawn_config.clone();
@@ -439,7 +485,12 @@ impl GuardianReviewSessionManager {
.await
{
Ok(Ok(review_session)) => Arc::new(review_session),
Ok(Err(err)) => return GuardianReviewSessionOutcome::Completed(Err(err)),
Ok(Err(err)) => {
return GuardianReviewSessionOutcome::Completed {
result: Err(err),
report: None,
};
}
Err(outcome) => return outcome,
};
self.register_active_ephemeral(Arc::clone(&review_session))
@@ -447,7 +498,14 @@ impl GuardianReviewSessionManager {
let mut cleanup =
EphemeralReviewCleanup::new(Arc::clone(&self.state), Arc::clone(&review_session));
let (outcome, _) = run_review_on_session(review_session.as_ref(), &params, deadline).await;
let (outcome, _) = run_review_on_session(
review_session.as_ref(),
&params,
deadline,
GuardianReviewSessionKind::EphemeralForked,
request_started_at,
)
.await;
if let Some(review_session) = self.take_active_ephemeral(&review_session).await {
cleanup.disarm();
review_session.shutdown_in_background();
@@ -490,8 +548,11 @@ async fn run_review_on_session(
review_session: &GuardianReviewSession,
params: &GuardianReviewSessionParams,
deadline: tokio::time::Instant,
session_kind: GuardianReviewSessionKind,
request_started_at: Instant,
) -> (GuardianReviewSessionOutcome, bool) {
if review_session.has_prior_review.load(Ordering::Relaxed) {
let had_prior_review_context = review_session.has_prior_review.load(Ordering::Relaxed);
if had_prior_review_context {
append_guardian_followup_reminder(review_session).await;
}
@@ -534,14 +595,37 @@ async fn run_review_on_session(
};
if let Err(err) = submit_result {
return (
GuardianReviewSessionOutcome::Completed(Err(err.into())),
GuardianReviewSessionOutcome::Completed {
result: Err(err.into()),
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
GuardianToolCallCounts::default(),
/*time_to_first_token_ms*/ None,
/*token_usage*/ None,
)),
},
false,
);
}
let outcome =
wait_for_guardian_review(review_session, deadline, params.external_cancel.as_ref()).await;
if matches!(outcome.0, GuardianReviewSessionOutcome::Completed(_)) {
let outcome = wait_for_guardian_review(
review_session,
deadline,
params.external_cancel.as_ref(),
session_kind,
had_prior_review_context,
request_started_at,
)
.await;
if matches!(
outcome.0,
GuardianReviewSessionOutcome::Completed {
result: _,
report: _
}
) {
review_session
.has_prior_review
.store(true, Ordering::Relaxed);
@@ -575,16 +659,34 @@ async fn wait_for_guardian_review(
review_session: &GuardianReviewSession,
deadline: tokio::time::Instant,
external_cancel: Option<&CancellationToken>,
session_kind: GuardianReviewSessionKind,
had_prior_review_context: bool,
request_started_at: Instant,
) -> (GuardianReviewSessionOutcome, bool) {
let timeout = tokio::time::sleep_until(deadline);
tokio::pin!(timeout);
let mut last_error_message: Option<String> = None;
let mut tool_call_counts = GuardianToolCallCounts::default();
let mut time_to_first_token_ms = None;
let mut token_usage = None;
loop {
tokio::select! {
_ = &mut timeout => {
let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok();
return (GuardianReviewSessionOutcome::TimedOut, keep_review_session);
return (
GuardianReviewSessionOutcome::TimedOut {
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
keep_review_session,
);
}
_ = async {
if let Some(cancel_token) = external_cancel {
@@ -594,22 +696,69 @@ async fn wait_for_guardian_review(
}
} => {
let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok();
return (GuardianReviewSessionOutcome::Aborted, keep_review_session);
return (
GuardianReviewSessionOutcome::Aborted {
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
keep_review_session,
);
}
event = review_session.codex.next_event() => {
match event {
Ok(event) => match event.msg {
Ok(event) => {
if time_to_first_token_ms.is_none()
&& guardian_event_records_time_to_first_token(&event.msg)
{
time_to_first_token_ms = Some(duration_millis_u64(
request_started_at.elapsed(),
));
}
record_guardian_tool_call_count(&event.msg, &mut tool_call_counts);
if let EventMsg::TokenCount(token_count) = &event.msg
&& let Some(info) = token_count.info.as_ref()
{
token_usage = Some(info.last_token_usage.clone());
}
match event.msg {
EventMsg::TurnComplete(turn_complete) => {
if turn_complete.last_agent_message.is_none()
&& let Some(error_message) = last_error_message
{
return (
GuardianReviewSessionOutcome::Completed(Err(anyhow!(error_message))),
GuardianReviewSessionOutcome::Completed {
result: Err(anyhow!(error_message)),
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
true,
);
}
return (
GuardianReviewSessionOutcome::Completed(Ok(turn_complete.last_agent_message)),
GuardianReviewSessionOutcome::Completed {
result: Ok(turn_complete.last_agent_message),
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
true,
);
}
@@ -617,13 +766,36 @@ async fn wait_for_guardian_review(
last_error_message = Some(error.message);
}
EventMsg::TurnAborted(_) => {
return (GuardianReviewSessionOutcome::Aborted, true);
return (
GuardianReviewSessionOutcome::Aborted {
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
true,
);
}
_ => {}
},
}
}
Err(err) => {
return (
GuardianReviewSessionOutcome::Completed(Err(err.into())),
GuardianReviewSessionOutcome::Completed {
result: Err(err.into()),
report: Some(guardian_review_session_report(
review_session,
session_kind,
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
)),
},
false,
);
}
@@ -633,6 +805,29 @@ async fn wait_for_guardian_review(
}
}
fn guardian_review_session_report(
review_session: &GuardianReviewSession,
session_kind: GuardianReviewSessionKind,
had_prior_review_context: bool,
tool_call_counts: GuardianToolCallCounts,
time_to_first_token_ms: Option<u64>,
token_usage: Option<TokenUsage>,
) -> GuardianReviewSessionReport {
GuardianReviewSessionReport {
guardian_thread_id: review_session.codex.session.conversation_id.to_string(),
session_kind,
guardian_model: review_session.reuse_key.model.clone(),
guardian_reasoning_effort: review_session
.reuse_key
.model_reasoning_effort
.map(|effort| effort.to_string()),
had_prior_review_context,
tool_call_counts,
time_to_first_token_ms,
token_usage,
}
}
pub(crate) fn build_guardian_review_session_config(
parent_config: &Config,
live_network_config: Option<codex_network_proxy::NetworkProxyConfig>,
@@ -694,7 +889,9 @@ async fn run_before_review_deadline<T>(
future: impl Future<Output = T>,
) -> Result<T, GuardianReviewSessionOutcome> {
tokio::select! {
_ = tokio::time::sleep_until(deadline) => Err(GuardianReviewSessionOutcome::TimedOut),
_ = tokio::time::sleep_until(deadline) => Err(GuardianReviewSessionOutcome::TimedOut {
report: None,
}),
result = future => Ok(result),
_ = async {
if let Some(cancel_token) = external_cancel {
@@ -702,7 +899,9 @@ async fn run_before_review_deadline<T>(
} else {
std::future::pending::<()>().await;
}
} => Err(GuardianReviewSessionOutcome::Aborted),
} => Err(GuardianReviewSessionOutcome::Aborted {
report: None,
}),
}
}
@@ -788,7 +987,7 @@ mod tests {
assert!(matches!(
outcome,
Err(GuardianReviewSessionOutcome::TimedOut)
Err(GuardianReviewSessionOutcome::TimedOut { report: None })
));
}
@@ -810,7 +1009,7 @@ mod tests {
assert!(matches!(
outcome,
Err(GuardianReviewSessionOutcome::Aborted)
Err(GuardianReviewSessionOutcome::Aborted { report: None })
));
}
@@ -830,7 +1029,7 @@ mod tests {
assert!(matches!(
outcome,
Err(GuardianReviewSessionOutcome::TimedOut)
Err(GuardianReviewSessionOutcome::TimedOut { report: None })
));
assert!(cancel_token.is_cancelled());
}
@@ -855,7 +1054,7 @@ mod tests {
assert!(matches!(
outcome,
Err(GuardianReviewSessionOutcome::Aborted)
Err(GuardianReviewSessionOutcome::Aborted { report: None })
));
assert!(cancel_token.is_cancelled());
}

View File

@@ -0,0 +1,147 @@
use std::time::Duration;
use codex_analytics::GuardianReviewSessionKind;
use codex_analytics::GuardianToolCallCounts;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandSource;
use codex_protocol::protocol::TokenUsage;
#[derive(Debug, Clone)]
pub(crate) struct GuardianReviewSessionReport {
pub(crate) guardian_thread_id: String,
pub(crate) session_kind: GuardianReviewSessionKind,
pub(crate) guardian_model: Option<String>,
pub(crate) guardian_reasoning_effort: Option<String>,
pub(crate) had_prior_review_context: bool,
pub(crate) tool_call_counts: GuardianToolCallCounts,
pub(crate) time_to_first_token_ms: Option<u64>,
pub(crate) token_usage: Option<TokenUsage>,
}
pub(super) fn duration_millis_u64(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
pub(super) fn guardian_event_records_time_to_first_token(event: &EventMsg) -> bool {
matches!(
event,
EventMsg::AgentMessage(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::DynamicToolCallRequest(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::WebSearchBegin(_)
| EventMsg::ImageGenerationBegin(_)
| EventMsg::ViewImageToolCall(_)
)
}
pub(super) fn record_guardian_tool_call_count(
event: &EventMsg,
counts: &mut GuardianToolCallCounts,
) {
match event {
EventMsg::ExecCommandBegin(begin) => match begin.source {
ExecCommandSource::Agent | ExecCommandSource::UserShell => {
counts.shell += 1;
}
ExecCommandSource::UnifiedExecStartup | ExecCommandSource::UnifiedExecInteraction => {
counts.unified_exec += 1;
}
},
EventMsg::McpToolCallBegin(_) => {
counts.mcp += 1;
}
EventMsg::DynamicToolCallRequest(_) => {
counts.dynamic += 1;
}
EventMsg::PatchApplyBegin(_) => {
counts.apply_patch += 1;
}
EventMsg::WebSearchBegin(_) => {
counts.web_search += 1;
}
EventMsg::ImageGenerationBegin(_) => {
counts.image_generation += 1;
}
EventMsg::ViewImageToolCall(_) => {
counts.view_image += 1;
}
EventMsg::Error(_)
| EventMsg::Warning(_)
| EventMsg::RealtimeConversationStarted(_)
| EventMsg::RealtimeConversationRealtime(_)
| EventMsg::RealtimeConversationClosed(_)
| EventMsg::ModelReroute(_)
| EventMsg::ContextCompacted(_)
| EventMsg::ThreadRolledBack(_)
| EventMsg::TurnStarted(_)
| EventMsg::TurnComplete(_)
| EventMsg::TokenCount(_)
| EventMsg::AgentMessage(_)
| EventMsg::UserMessage(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::AgentReasoningSectionBreak(_)
| EventMsg::SessionConfigured(_)
| EventMsg::ThreadNameUpdated(_)
| EventMsg::McpStartupUpdate(_)
| EventMsg::McpStartupComplete(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::WebSearchEnd(_)
| EventMsg::ImageGenerationEnd(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::TerminalInteraction(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::ExecApprovalRequest(_)
| EventMsg::RequestPermissions(_)
| EventMsg::RequestUserInput(_)
| EventMsg::DynamicToolCallResponse(_)
| EventMsg::ElicitationRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::GuardianAssessment(_)
| EventMsg::DeprecationNotice(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::UndoStarted(_)
| EventMsg::UndoCompleted(_)
| EventMsg::StreamError(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListSkillsResponse(_)
| EventMsg::SkillsUpdateAvailable
| EventMsg::PlanUpdate(_)
| EventMsg::TurnAborted(_)
| EventMsg::ShutdownComplete
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::RawResponseItem(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::HookStarted(_)
| EventMsg::HookCompleted(_)
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::PlanDelta(_)
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::CollabAgentSpawnBegin(_)
| EventMsg::CollabAgentSpawnEnd(_)
| EventMsg::CollabAgentInteractionBegin(_)
| EventMsg::CollabAgentInteractionEnd(_)
| EventMsg::CollabWaitingBegin(_)
| EventMsg::CollabWaitingEnd(_)
| EventMsg::CollabCloseBegin(_)
| EventMsg::CollabCloseEnd(_)
| EventMsg::CollabResumeBegin(_)
| EventMsg::CollabResumeEnd(_) => {}
}
}

View File

@@ -603,7 +603,11 @@ async fn guardian_review_request_layout_matches_model_visible_request_snapshot()
/*external_cancel*/ None,
)
.await;
let GuardianReviewOutcome::Completed(Ok(assessment)) = outcome else {
let GuardianReviewOutcome::Completed {
result: Ok(assessment),
..
} = outcome
else {
panic!("expected guardian assessment");
};
assert_eq!(assessment.risk_score, 35);
@@ -707,10 +711,18 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow:
)
.await;
let GuardianReviewOutcome::Completed(Ok(first_assessment)) = first_outcome else {
let GuardianReviewOutcome::Completed {
result: Ok(first_assessment),
..
} = first_outcome
else {
panic!("expected first guardian assessment");
};
let GuardianReviewOutcome::Completed(Ok(second_assessment)) = second_outcome else {
let GuardianReviewOutcome::Completed {
result: Ok(second_assessment),
..
} = second_outcome
else {
panic!("expected second guardian assessment");
};
assert_eq!(first_assessment.risk_score, 5);