diff --git a/codex-rs/core/src/guardian/review.rs b/codex-rs/core/src/guardian/review.rs index 215ab2b159..60dbde6ecd 100644 --- a/codex-rs/core/src/guardian/review.rs +++ b/codex-rs/core/src/guardian/review.rs @@ -37,6 +37,7 @@ use super::approval_request::guardian_request_target_item_id; use super::approval_request::guardian_request_turn_id; use super::prompt::guardian_output_schema; use super::prompt::parse_guardian_assessment; +use super::review_session::GuardianReviewSessionMetadata; use super::review_session::GuardianReviewSessionOutcome; use super::review_session::GuardianReviewSessionParams; use super::review_session::build_guardian_review_session_config; @@ -86,10 +87,13 @@ pub(crate) fn guardian_timeout_message() -> String { #[derive(Debug)] pub(super) enum GuardianReviewOutcome { - Completed(anyhow::Result), - Failed(GuardianReviewFailure), - TimedOut, - Aborted, + Completed( + anyhow::Result, + Option, + ), + Failed(GuardianReviewFailure, Option), + TimedOut(Option), + Aborted(Option), } #[derive(Debug)] @@ -220,6 +224,24 @@ struct GuardianReviewMetadataFields { time_to_first_token_ms: Option, } +fn guardian_review_metadata_fields( + metadata: Option, +) -> GuardianReviewMetadataFields { + match metadata { + Some(metadata) => GuardianReviewMetadataFields { + guardian_thread_id: Some(metadata.guardian_thread_id), + guardian_session_kind: Some(metadata.guardian_session_kind), + guardian_model: Some(metadata.guardian_model), + guardian_reasoning_effort: metadata.guardian_reasoning_effort, + had_prior_review_context: Some(metadata.had_prior_review_context), + reviewed_action_truncated: false, + token_usage: metadata.token_usage, + time_to_first_token_ms: None, + }, + None => GuardianReviewMetadataFields::default(), + } +} + impl GuardianReviewAnalyticsContext { fn track( &self, @@ -404,8 +426,8 @@ async fn run_guardian_review( let completed_at = now_unix_seconds(); let assessment = match outcome { - GuardianReviewOutcome::Completed(Ok(assessment)) => { - let metadata = GuardianReviewMetadataFields::default(); + GuardianReviewOutcome::Completed(Ok(assessment), metadata) => { + let metadata = guardian_review_metadata_fields(metadata); analytics_context.track( session.as_ref(), turn.as_ref(), @@ -440,8 +462,8 @@ async fn run_guardian_review( ); assessment } - GuardianReviewOutcome::Completed(Err(err)) => { - let metadata = GuardianReviewMetadataFields::default(); + GuardianReviewOutcome::Completed(Err(err), metadata) => { + let metadata = guardian_review_metadata_fields(metadata); let rationale = format!("Automatic approval review failed: {err}"); analytics_context.track( session.as_ref(), @@ -471,8 +493,8 @@ async fn run_guardian_review( rationale, } } - GuardianReviewOutcome::Failed(failure) => { - let metadata = GuardianReviewMetadataFields::default(); + GuardianReviewOutcome::Failed(failure, metadata) => { + let metadata = guardian_review_metadata_fields(metadata); let rationale = format!("Automatic approval review failed: {}", failure.error()); analytics_context.track( session.as_ref(), @@ -502,8 +524,8 @@ async fn run_guardian_review( rationale, } } - GuardianReviewOutcome::TimedOut => { - let metadata = GuardianReviewMetadataFields::default(); + GuardianReviewOutcome::TimedOut(metadata) => { + let metadata = guardian_review_metadata_fields(metadata); let rationale = "Automatic approval review timed out while evaluating the requested approval." .to_string(); @@ -554,8 +576,8 @@ async fn run_guardian_review( .await; return ReviewDecision::TimedOut; } - GuardianReviewOutcome::Aborted => { - let metadata = GuardianReviewMetadataFields::default(); + GuardianReviewOutcome::Aborted(metadata) => { + let metadata = guardian_review_metadata_fields(metadata); analytics_context.track( session.as_ref(), turn.as_ref(), @@ -748,7 +770,10 @@ pub(super) async fn run_guardian_review_session( Some(network_proxy) => match network_proxy.proxy().current_cfg().await { Ok(config) => Some(config), Err(err) => { - return GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err)); + return GuardianReviewOutcome::Failed( + GuardianReviewFailure::PromptBuild(err), + None, + ); } }, None => None, @@ -799,10 +824,12 @@ pub(super) async fn run_guardian_review_session( ); let guardian_config = match guardian_config { Ok(config) => config, - Err(err) => return GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err)), + Err(err) => { + return GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err), None); + } }; - match session + let (session_outcome, session_metadata) = session .guardian_review_session .run_review(GuardianReviewSessionParams { parent_session: Arc::clone(&session), @@ -817,25 +844,37 @@ pub(super) async fn run_guardian_review_session( personality: turn.personality, external_cancel, }) - .await - { + .await; + + match session_outcome { GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) => match last_agent_message { Some(last_agent_message) => { match parse_guardian_assessment(Some(&last_agent_message)) { - Ok(assessment) => GuardianReviewOutcome::Completed(Ok(assessment)), - Err(err) => GuardianReviewOutcome::Failed(GuardianReviewFailure::Parse(err)), + Ok(assessment) => { + GuardianReviewOutcome::Completed(Ok(assessment), session_metadata) + } + Err(err) => GuardianReviewOutcome::Failed( + GuardianReviewFailure::Parse(err), + session_metadata, + ), } } - None => GuardianReviewOutcome::Failed(GuardianReviewFailure::Session(anyhow::anyhow!( - "guardian review completed without an assessment payload" - ))), + None => GuardianReviewOutcome::Failed( + GuardianReviewFailure::Session(anyhow::anyhow!( + "guardian review completed without an assessment payload" + )), + session_metadata, + ), }, GuardianReviewSessionOutcome::Completed(Err(err)) => { - GuardianReviewOutcome::Failed(GuardianReviewFailure::Session(err)) + GuardianReviewOutcome::Failed(GuardianReviewFailure::Session(err), session_metadata) } - GuardianReviewSessionOutcome::TimedOut => GuardianReviewOutcome::TimedOut, - GuardianReviewSessionOutcome::Aborted => GuardianReviewOutcome::Aborted, + GuardianReviewSessionOutcome::PromptBuildFailed(err) => { + GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err), session_metadata) + } + GuardianReviewSessionOutcome::TimedOut => GuardianReviewOutcome::TimedOut(session_metadata), + GuardianReviewSessionOutcome::Aborted => GuardianReviewOutcome::Aborted(session_metadata), } } diff --git a/codex-rs/core/src/guardian/review_session.rs b/codex-rs/core/src/guardian/review_session.rs index 08825cfde2..0384e27f12 100644 --- a/codex-rs/core/src/guardian/review_session.rs +++ b/codex-rs/core/src/guardian/review_session.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; +use codex_analytics::GuardianReviewSessionKind; use codex_protocol::config_types::Personality; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::models::DeveloperInstructions; @@ -17,6 +18,7 @@ use codex_protocol::protocol::Op; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::TokenUsage; use serde_json::Value; use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; @@ -58,10 +60,21 @@ const GUARDIAN_FOLLOWUP_REVIEW_REMINDER: &str = concat!( #[derive(Debug)] pub(crate) enum GuardianReviewSessionOutcome { Completed(anyhow::Result>), + PromptBuildFailed(anyhow::Error), TimedOut, Aborted, } +#[derive(Debug, Clone)] +pub(crate) struct GuardianReviewSessionMetadata { + pub(crate) guardian_thread_id: String, + pub(crate) guardian_session_kind: GuardianReviewSessionKind, + pub(crate) guardian_model: String, + pub(crate) guardian_reasoning_effort: Option, + pub(crate) had_prior_review_context: bool, + pub(crate) token_usage: Option, +} + pub(crate) struct GuardianReviewSessionParams { pub(crate) parent_session: Arc, pub(crate) parent_turn: Arc, @@ -101,6 +114,21 @@ struct GuardianReviewState { last_committed_fork_snapshot: Option, } +fn had_prior_review_context(prompt_mode: &GuardianPromptMode) -> bool { + matches!(prompt_mode, GuardianPromptMode::Delta { .. }) +} + +fn token_usage_delta(start: &TokenUsage, end: &TokenUsage) -> TokenUsage { + TokenUsage { + input_tokens: (end.input_tokens - start.input_tokens).max(0), + cached_input_tokens: (end.cached_input_tokens - start.cached_input_tokens).max(0), + output_tokens: (end.output_tokens - start.output_tokens).max(0), + reasoning_output_tokens: (end.reasoning_output_tokens - start.reasoning_output_tokens) + .max(0), + total_tokens: (end.total_tokens - start.total_tokens).max(0), + } +} + struct EphemeralReviewCleanup { state: Arc>, review_session: Option>, @@ -267,10 +295,14 @@ impl GuardianReviewSessionManager { pub(crate) async fn run_review( &self, params: GuardianReviewSessionParams, - ) -> GuardianReviewSessionOutcome { + ) -> ( + GuardianReviewSessionOutcome, + Option, + ) { let deadline = tokio::time::Instant::now() + GUARDIAN_REVIEW_TIMEOUT; let next_reuse_key = GuardianReviewSessionReuseKey::from_spawn_config(¶ms.spawn_config); let mut stale_trunk_to_shutdown = None; + let mut spawned_trunk = false; let trunk_candidate = match run_before_review_deadline( deadline, params.external_cancel.as_ref(), @@ -304,16 +336,17 @@ impl GuardianReviewSessionManager { { Ok(Ok(review_session)) => Arc::new(review_session), Ok(Err(err)) => { - return GuardianReviewSessionOutcome::Completed(Err(err)); + return (GuardianReviewSessionOutcome::PromptBuildFailed(err), None); } - Err(outcome) => return outcome, + Err(outcome) => return (outcome, None), }; state.trunk = Some(Arc::clone(&review_session)); + spawned_trunk = true; } state.trunk.as_ref().cloned() } - Err(outcome) => return outcome, + Err(outcome) => return (outcome, None), }; if let Some(review_session) = stale_trunk_to_shutdown { @@ -321,9 +354,12 @@ impl GuardianReviewSessionManager { } let Some(trunk) = trunk_candidate else { - return GuardianReviewSessionOutcome::Completed(Err(anyhow!( - "guardian review session was not available after spawn" - ))); + return ( + GuardianReviewSessionOutcome::Completed(Err(anyhow!( + "guardian review session was not available after spawn" + ))), + None, + ); }; if trunk.reuse_key != next_reuse_key { @@ -351,20 +387,25 @@ impl GuardianReviewSessionManager { } }; - let (outcome, keep_review_session) = - run_review_on_session(trunk.as_ref(), ¶ms, deadline).await; + let guardian_session_kind = if spawned_trunk { + GuardianReviewSessionKind::TrunkNew + } else { + GuardianReviewSessionKind::TrunkReused + }; + let (outcome, keep_review_session, metadata) = + run_review_on_session(trunk.as_ref(), ¶ms, guardian_session_kind, deadline).await; if keep_review_session && matches!(outcome, GuardianReviewSessionOutcome::Completed(_)) { trunk.refresh_last_committed_fork_snapshot().await; } drop(trunk_guard); if keep_review_session { - outcome + (outcome, Some(metadata)) } else { if let Some(review_session) = self.remove_trunk_if_current(&trunk).await { review_session.shutdown_in_background(); } - outcome + (outcome, Some(metadata)) } } @@ -461,7 +502,10 @@ impl GuardianReviewSessionManager { reuse_key: GuardianReviewSessionReuseKey, deadline: tokio::time::Instant, fork_snapshot: Option, - ) -> GuardianReviewSessionOutcome { + ) -> ( + GuardianReviewSessionOutcome, + Option, + ) { let spawn_cancel_token = CancellationToken::new(); let mut fork_config = params.spawn_config.clone(); fork_config.ephemeral = true; @@ -480,20 +524,26 @@ impl GuardianReviewSessionManager { .await { Ok(Ok(review_session)) => Arc::new(review_session), - Ok(Err(err)) => return GuardianReviewSessionOutcome::Completed(Err(err)), - Err(outcome) => return outcome, + Ok(Err(err)) => return (GuardianReviewSessionOutcome::PromptBuildFailed(err), None), + Err(outcome) => return (outcome, None), }; self.register_active_ephemeral(Arc::clone(&review_session)) .await; let mut cleanup = EphemeralReviewCleanup::new(Arc::clone(&self.state), Arc::clone(&review_session)); - let (outcome, _) = run_review_on_session(review_session.as_ref(), ¶ms, deadline).await; + let (outcome, _, metadata) = run_review_on_session( + review_session.as_ref(), + ¶ms, + GuardianReviewSessionKind::EphemeralForked, + deadline, + ) + .await; if let Some(review_session) = self.take_active_ephemeral(&review_session).await { cleanup.disarm(); review_session.shutdown_in_background(); } - outcome + (outcome, Some(metadata)) } } @@ -540,8 +590,13 @@ async fn spawn_guardian_review_session( async fn run_review_on_session( review_session: &GuardianReviewSession, params: &GuardianReviewSessionParams, + guardian_session_kind: GuardianReviewSessionKind, deadline: tokio::time::Instant, -) -> (GuardianReviewSessionOutcome, bool) { +) -> ( + GuardianReviewSessionOutcome, + bool, + GuardianReviewSessionMetadata, +) { let (send_followup_reminder, prompt_mode) = { let state = review_session.state.lock().await; @@ -556,6 +611,14 @@ async fn run_review_on_session( (send_followup_reminder, prompt_mode) }; + let mut guardian_metadata = GuardianReviewSessionMetadata { + guardian_thread_id: review_session.codex.session.conversation_id.to_string(), + guardian_session_kind, + guardian_model: params.model.clone(), + guardian_reasoning_effort: params.reasoning_effort.map(|effort| effort.to_string()), + had_prior_review_context: had_prior_review_context(&prompt_mode), + token_usage: None, + }; if send_followup_reminder { append_guardian_followup_reminder(review_session).await; } @@ -580,6 +643,8 @@ async fn run_review_on_session( prompt_mode, ) .await?; + let token_usage_at_review_start = + review_session.codex.session.total_token_usage().await; review_session .codex @@ -599,29 +664,45 @@ async fn run_review_on_session( }) .await?; - Ok::(prompt_items.transcript_cursor) + Ok::<(GuardianTranscriptCursor, Option), anyhow::Error>(( + prompt_items.transcript_cursor, + token_usage_at_review_start, + )) }), ) .await; let submit_result = match submit_result { Ok(submit_result) => submit_result, - Err(outcome) => return (outcome, false), + Err(outcome) => return (outcome, false, guardian_metadata), }; - let transcript_cursor = match submit_result { - Ok(transcript_cursor) => transcript_cursor, + let (transcript_cursor, token_usage_at_review_start) = match submit_result { + Ok(submit_result) => submit_result, Err(err) => { - return (GuardianReviewSessionOutcome::Completed(Err(err)), false); + return ( + GuardianReviewSessionOutcome::PromptBuildFailed(err), + false, + guardian_metadata, + ); } }; let outcome = wait_for_guardian_review(review_session, deadline, params.external_cancel.as_ref()).await; if matches!(outcome.0, GuardianReviewSessionOutcome::Completed(_)) { + if outcome.2 + && let Some(token_usage_at_review_start) = token_usage_at_review_start + && let Some(total_token_usage) = review_session.codex.session.total_token_usage().await + { + guardian_metadata.token_usage = Some(token_usage_delta( + &token_usage_at_review_start, + &total_token_usage, + )); + } let mut state = review_session.state.lock().await; state.prior_review_count = state.prior_review_count.saturating_add(1); state.last_reviewed_transcript_cursor = Some(transcript_cursor); } - outcome + (outcome.0, outcome.1, guardian_metadata) } async fn append_guardian_followup_reminder(review_session: &GuardianReviewSession) { @@ -650,7 +731,7 @@ async fn wait_for_guardian_review( review_session: &GuardianReviewSession, deadline: tokio::time::Instant, external_cancel: Option<&CancellationToken>, -) -> (GuardianReviewSessionOutcome, bool) { +) -> (GuardianReviewSessionOutcome, bool, bool) { let timeout = tokio::time::sleep_until(deadline); tokio::pin!(timeout); let mut last_error_message: Option = None; @@ -659,7 +740,7 @@ async fn wait_for_guardian_review( tokio::select! { _ = &mut timeout => { let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok(); - return (GuardianReviewSessionOutcome::TimedOut, keep_review_session); + return (GuardianReviewSessionOutcome::TimedOut, keep_review_session, false); } _ = async { if let Some(cancel_token) = external_cancel { @@ -669,7 +750,7 @@ async fn wait_for_guardian_review( } } => { let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok(); - return (GuardianReviewSessionOutcome::Aborted, keep_review_session); + return (GuardianReviewSessionOutcome::Aborted, keep_review_session, false); } event = review_session.codex.next_event() => { match event { @@ -681,18 +762,20 @@ async fn wait_for_guardian_review( return ( GuardianReviewSessionOutcome::Completed(Err(anyhow!(error_message))), true, + true, ); } return ( GuardianReviewSessionOutcome::Completed(Ok(turn_complete.last_agent_message)), true, + true, ); } EventMsg::Error(error) => { last_error_message = Some(error.message); } EventMsg::TurnAborted(_) => { - return (GuardianReviewSessionOutcome::Aborted, true); + return (GuardianReviewSessionOutcome::Aborted, true, false); } _ => {} }, @@ -700,6 +783,7 @@ async fn wait_for_guardian_review( return ( GuardianReviewSessionOutcome::Completed(Err(err.into())), false, + false, ); } } @@ -951,4 +1035,44 @@ mod tests { assert_eq!(outcome.unwrap(), 42); assert!(!cancel_token.is_cancelled()); } + + #[test] + fn had_prior_review_context_tracks_prompt_mode() { + assert!(!had_prior_review_context(&GuardianPromptMode::Full)); + assert!(had_prior_review_context(&GuardianPromptMode::Delta { + cursor: GuardianTranscriptCursor { + parent_history_version: 7, + transcript_entry_count: 42, + } + })); + } + + #[test] + fn token_usage_delta_never_reports_negative_usage() { + let start = TokenUsage { + input_tokens: 10, + cached_input_tokens: 8, + output_tokens: 6, + reasoning_output_tokens: 4, + total_tokens: 28, + }; + let end = TokenUsage { + input_tokens: 15, + cached_input_tokens: 7, + output_tokens: 10, + reasoning_output_tokens: 2, + total_tokens: 34, + }; + + assert_eq!( + token_usage_delta(&start, &end), + TokenUsage { + input_tokens: 5, + cached_input_tokens: 0, + output_tokens: 4, + reasoning_output_tokens: 0, + total_tokens: 6, + } + ); + } } diff --git a/codex-rs/core/src/guardian/tests.rs b/codex-rs/core/src/guardian/tests.rs index 5e44aa972a..881e6f9e53 100644 --- a/codex-rs/core/src/guardian/tests.rs +++ b/codex-rs/core/src/guardian/tests.rs @@ -916,7 +916,7 @@ async fn guardian_review_request_layout_matches_model_visible_request_snapshot() /*external_cancel*/ None, ) .await; - let GuardianReviewOutcome::Completed(Ok(assessment)) = outcome else { + let GuardianReviewOutcome::Completed(Ok(assessment), _) = outcome else { panic!("expected guardian assessment"); }; assert_eq!(assessment.outcome, GuardianAssessmentOutcome::Allow); @@ -1125,13 +1125,13 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow: ) .await; - let GuardianReviewOutcome::Completed(Ok(first_assessment)) = first_outcome else { + let GuardianReviewOutcome::Completed(Ok(first_assessment), _) = first_outcome else { panic!("expected first guardian assessment"); }; - let GuardianReviewOutcome::Completed(Ok(second_assessment)) = second_outcome else { + let GuardianReviewOutcome::Completed(Ok(second_assessment), _) = second_outcome else { panic!("expected second guardian assessment"); }; - let GuardianReviewOutcome::Completed(Ok(third_assessment)) = third_outcome else { + let GuardianReviewOutcome::Completed(Ok(third_assessment), _) = third_outcome else { panic!("expected third guardian assessment"); }; assert_eq!(first_assessment.outcome, GuardianAssessmentOutcome::Allow);