[codex-analytics] guardian review thread and token metadata

This commit is contained in:
rhan-oai
2026-04-14 17:30:30 -07:00
parent 8de41717a0
commit e243d74e9d
3 changed files with 221 additions and 58 deletions

View File

@@ -37,6 +37,7 @@ use super::approval_request::guardian_request_target_item_id;
use super::approval_request::guardian_request_turn_id;
use super::prompt::guardian_output_schema;
use super::prompt::parse_guardian_assessment;
use super::review_session::GuardianReviewSessionMetadata;
use super::review_session::GuardianReviewSessionOutcome;
use super::review_session::GuardianReviewSessionParams;
use super::review_session::build_guardian_review_session_config;
@@ -86,10 +87,13 @@ pub(crate) fn guardian_timeout_message() -> String {
#[derive(Debug)]
pub(super) enum GuardianReviewOutcome {
Completed(anyhow::Result<GuardianAssessment>),
Failed(GuardianReviewFailure),
TimedOut,
Aborted,
Completed(
anyhow::Result<GuardianAssessment>,
Option<GuardianReviewSessionMetadata>,
),
Failed(GuardianReviewFailure, Option<GuardianReviewSessionMetadata>),
TimedOut(Option<GuardianReviewSessionMetadata>),
Aborted(Option<GuardianReviewSessionMetadata>),
}
#[derive(Debug)]
@@ -220,6 +224,24 @@ struct GuardianReviewMetadataFields {
time_to_first_token_ms: Option<u64>,
}
fn guardian_review_metadata_fields(
metadata: Option<GuardianReviewSessionMetadata>,
) -> GuardianReviewMetadataFields {
match metadata {
Some(metadata) => GuardianReviewMetadataFields {
guardian_thread_id: Some(metadata.guardian_thread_id),
guardian_session_kind: Some(metadata.guardian_session_kind),
guardian_model: Some(metadata.guardian_model),
guardian_reasoning_effort: metadata.guardian_reasoning_effort,
had_prior_review_context: Some(metadata.had_prior_review_context),
reviewed_action_truncated: false,
token_usage: metadata.token_usage,
time_to_first_token_ms: None,
},
None => GuardianReviewMetadataFields::default(),
}
}
impl GuardianReviewAnalyticsContext {
fn track(
&self,
@@ -404,8 +426,8 @@ async fn run_guardian_review(
let completed_at = now_unix_seconds();
let assessment = match outcome {
GuardianReviewOutcome::Completed(Ok(assessment)) => {
let metadata = GuardianReviewMetadataFields::default();
GuardianReviewOutcome::Completed(Ok(assessment), metadata) => {
let metadata = guardian_review_metadata_fields(metadata);
analytics_context.track(
session.as_ref(),
turn.as_ref(),
@@ -440,8 +462,8 @@ async fn run_guardian_review(
);
assessment
}
GuardianReviewOutcome::Completed(Err(err)) => {
let metadata = GuardianReviewMetadataFields::default();
GuardianReviewOutcome::Completed(Err(err), metadata) => {
let metadata = guardian_review_metadata_fields(metadata);
let rationale = format!("Automatic approval review failed: {err}");
analytics_context.track(
session.as_ref(),
@@ -471,8 +493,8 @@ async fn run_guardian_review(
rationale,
}
}
GuardianReviewOutcome::Failed(failure) => {
let metadata = GuardianReviewMetadataFields::default();
GuardianReviewOutcome::Failed(failure, metadata) => {
let metadata = guardian_review_metadata_fields(metadata);
let rationale = format!("Automatic approval review failed: {}", failure.error());
analytics_context.track(
session.as_ref(),
@@ -502,8 +524,8 @@ async fn run_guardian_review(
rationale,
}
}
GuardianReviewOutcome::TimedOut => {
let metadata = GuardianReviewMetadataFields::default();
GuardianReviewOutcome::TimedOut(metadata) => {
let metadata = guardian_review_metadata_fields(metadata);
let rationale =
"Automatic approval review timed out while evaluating the requested approval."
.to_string();
@@ -554,8 +576,8 @@ async fn run_guardian_review(
.await;
return ReviewDecision::TimedOut;
}
GuardianReviewOutcome::Aborted => {
let metadata = GuardianReviewMetadataFields::default();
GuardianReviewOutcome::Aborted(metadata) => {
let metadata = guardian_review_metadata_fields(metadata);
analytics_context.track(
session.as_ref(),
turn.as_ref(),
@@ -748,7 +770,10 @@ pub(super) async fn run_guardian_review_session(
Some(network_proxy) => match network_proxy.proxy().current_cfg().await {
Ok(config) => Some(config),
Err(err) => {
return GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err));
return GuardianReviewOutcome::Failed(
GuardianReviewFailure::PromptBuild(err),
None,
);
}
},
None => None,
@@ -799,10 +824,12 @@ pub(super) async fn run_guardian_review_session(
);
let guardian_config = match guardian_config {
Ok(config) => config,
Err(err) => return GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err)),
Err(err) => {
return GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err), None);
}
};
match session
let (session_outcome, session_metadata) = session
.guardian_review_session
.run_review(GuardianReviewSessionParams {
parent_session: Arc::clone(&session),
@@ -817,25 +844,37 @@ pub(super) async fn run_guardian_review_session(
personality: turn.personality,
external_cancel,
})
.await
{
.await;
match session_outcome {
GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) => match last_agent_message
{
Some(last_agent_message) => {
match parse_guardian_assessment(Some(&last_agent_message)) {
Ok(assessment) => GuardianReviewOutcome::Completed(Ok(assessment)),
Err(err) => GuardianReviewOutcome::Failed(GuardianReviewFailure::Parse(err)),
Ok(assessment) => {
GuardianReviewOutcome::Completed(Ok(assessment), session_metadata)
}
Err(err) => GuardianReviewOutcome::Failed(
GuardianReviewFailure::Parse(err),
session_metadata,
),
}
}
None => GuardianReviewOutcome::Failed(GuardianReviewFailure::Session(anyhow::anyhow!(
"guardian review completed without an assessment payload"
))),
None => GuardianReviewOutcome::Failed(
GuardianReviewFailure::Session(anyhow::anyhow!(
"guardian review completed without an assessment payload"
)),
session_metadata,
),
},
GuardianReviewSessionOutcome::Completed(Err(err)) => {
GuardianReviewOutcome::Failed(GuardianReviewFailure::Session(err))
GuardianReviewOutcome::Failed(GuardianReviewFailure::Session(err), session_metadata)
}
GuardianReviewSessionOutcome::TimedOut => GuardianReviewOutcome::TimedOut,
GuardianReviewSessionOutcome::Aborted => GuardianReviewOutcome::Aborted,
GuardianReviewSessionOutcome::PromptBuildFailed(err) => {
GuardianReviewOutcome::Failed(GuardianReviewFailure::PromptBuild(err), session_metadata)
}
GuardianReviewSessionOutcome::TimedOut => GuardianReviewOutcome::TimedOut(session_metadata),
GuardianReviewSessionOutcome::Aborted => GuardianReviewOutcome::Aborted(session_metadata),
}
}

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use codex_analytics::GuardianReviewSessionKind;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::DeveloperInstructions;
@@ -17,6 +18,7 @@ use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
@@ -58,10 +60,21 @@ const GUARDIAN_FOLLOWUP_REVIEW_REMINDER: &str = concat!(
#[derive(Debug)]
pub(crate) enum GuardianReviewSessionOutcome {
Completed(anyhow::Result<Option<String>>),
PromptBuildFailed(anyhow::Error),
TimedOut,
Aborted,
}
#[derive(Debug, Clone)]
pub(crate) struct GuardianReviewSessionMetadata {
pub(crate) guardian_thread_id: String,
pub(crate) guardian_session_kind: GuardianReviewSessionKind,
pub(crate) guardian_model: String,
pub(crate) guardian_reasoning_effort: Option<String>,
pub(crate) had_prior_review_context: bool,
pub(crate) token_usage: Option<TokenUsage>,
}
pub(crate) struct GuardianReviewSessionParams {
pub(crate) parent_session: Arc<Session>,
pub(crate) parent_turn: Arc<TurnContext>,
@@ -101,6 +114,21 @@ struct GuardianReviewState {
last_committed_fork_snapshot: Option<GuardianReviewForkSnapshot>,
}
fn had_prior_review_context(prompt_mode: &GuardianPromptMode) -> bool {
matches!(prompt_mode, GuardianPromptMode::Delta { .. })
}
fn token_usage_delta(start: &TokenUsage, end: &TokenUsage) -> TokenUsage {
TokenUsage {
input_tokens: (end.input_tokens - start.input_tokens).max(0),
cached_input_tokens: (end.cached_input_tokens - start.cached_input_tokens).max(0),
output_tokens: (end.output_tokens - start.output_tokens).max(0),
reasoning_output_tokens: (end.reasoning_output_tokens - start.reasoning_output_tokens)
.max(0),
total_tokens: (end.total_tokens - start.total_tokens).max(0),
}
}
struct EphemeralReviewCleanup {
state: Arc<Mutex<GuardianReviewSessionState>>,
review_session: Option<Arc<GuardianReviewSession>>,
@@ -267,10 +295,14 @@ impl GuardianReviewSessionManager {
pub(crate) async fn run_review(
&self,
params: GuardianReviewSessionParams,
) -> GuardianReviewSessionOutcome {
) -> (
GuardianReviewSessionOutcome,
Option<GuardianReviewSessionMetadata>,
) {
let deadline = tokio::time::Instant::now() + GUARDIAN_REVIEW_TIMEOUT;
let next_reuse_key = GuardianReviewSessionReuseKey::from_spawn_config(&params.spawn_config);
let mut stale_trunk_to_shutdown = None;
let mut spawned_trunk = false;
let trunk_candidate = match run_before_review_deadline(
deadline,
params.external_cancel.as_ref(),
@@ -304,16 +336,17 @@ impl GuardianReviewSessionManager {
{
Ok(Ok(review_session)) => Arc::new(review_session),
Ok(Err(err)) => {
return GuardianReviewSessionOutcome::Completed(Err(err));
return (GuardianReviewSessionOutcome::PromptBuildFailed(err), None);
}
Err(outcome) => return outcome,
Err(outcome) => return (outcome, None),
};
state.trunk = Some(Arc::clone(&review_session));
spawned_trunk = true;
}
state.trunk.as_ref().cloned()
}
Err(outcome) => return outcome,
Err(outcome) => return (outcome, None),
};
if let Some(review_session) = stale_trunk_to_shutdown {
@@ -321,9 +354,12 @@ impl GuardianReviewSessionManager {
}
let Some(trunk) = trunk_candidate else {
return GuardianReviewSessionOutcome::Completed(Err(anyhow!(
"guardian review session was not available after spawn"
)));
return (
GuardianReviewSessionOutcome::Completed(Err(anyhow!(
"guardian review session was not available after spawn"
))),
None,
);
};
if trunk.reuse_key != next_reuse_key {
@@ -351,20 +387,25 @@ impl GuardianReviewSessionManager {
}
};
let (outcome, keep_review_session) =
run_review_on_session(trunk.as_ref(), &params, deadline).await;
let guardian_session_kind = if spawned_trunk {
GuardianReviewSessionKind::TrunkNew
} else {
GuardianReviewSessionKind::TrunkReused
};
let (outcome, keep_review_session, metadata) =
run_review_on_session(trunk.as_ref(), &params, guardian_session_kind, deadline).await;
if keep_review_session && matches!(outcome, GuardianReviewSessionOutcome::Completed(_)) {
trunk.refresh_last_committed_fork_snapshot().await;
}
drop(trunk_guard);
if keep_review_session {
outcome
(outcome, Some(metadata))
} else {
if let Some(review_session) = self.remove_trunk_if_current(&trunk).await {
review_session.shutdown_in_background();
}
outcome
(outcome, Some(metadata))
}
}
@@ -461,7 +502,10 @@ impl GuardianReviewSessionManager {
reuse_key: GuardianReviewSessionReuseKey,
deadline: tokio::time::Instant,
fork_snapshot: Option<GuardianReviewForkSnapshot>,
) -> GuardianReviewSessionOutcome {
) -> (
GuardianReviewSessionOutcome,
Option<GuardianReviewSessionMetadata>,
) {
let spawn_cancel_token = CancellationToken::new();
let mut fork_config = params.spawn_config.clone();
fork_config.ephemeral = true;
@@ -480,20 +524,26 @@ impl GuardianReviewSessionManager {
.await
{
Ok(Ok(review_session)) => Arc::new(review_session),
Ok(Err(err)) => return GuardianReviewSessionOutcome::Completed(Err(err)),
Err(outcome) => return outcome,
Ok(Err(err)) => return (GuardianReviewSessionOutcome::PromptBuildFailed(err), None),
Err(outcome) => return (outcome, None),
};
self.register_active_ephemeral(Arc::clone(&review_session))
.await;
let mut cleanup =
EphemeralReviewCleanup::new(Arc::clone(&self.state), Arc::clone(&review_session));
let (outcome, _) = run_review_on_session(review_session.as_ref(), &params, deadline).await;
let (outcome, _, metadata) = run_review_on_session(
review_session.as_ref(),
&params,
GuardianReviewSessionKind::EphemeralForked,
deadline,
)
.await;
if let Some(review_session) = self.take_active_ephemeral(&review_session).await {
cleanup.disarm();
review_session.shutdown_in_background();
}
outcome
(outcome, Some(metadata))
}
}
@@ -540,8 +590,13 @@ async fn spawn_guardian_review_session(
async fn run_review_on_session(
review_session: &GuardianReviewSession,
params: &GuardianReviewSessionParams,
guardian_session_kind: GuardianReviewSessionKind,
deadline: tokio::time::Instant,
) -> (GuardianReviewSessionOutcome, bool) {
) -> (
GuardianReviewSessionOutcome,
bool,
GuardianReviewSessionMetadata,
) {
let (send_followup_reminder, prompt_mode) = {
let state = review_session.state.lock().await;
@@ -556,6 +611,14 @@ async fn run_review_on_session(
(send_followup_reminder, prompt_mode)
};
let mut guardian_metadata = GuardianReviewSessionMetadata {
guardian_thread_id: review_session.codex.session.conversation_id.to_string(),
guardian_session_kind,
guardian_model: params.model.clone(),
guardian_reasoning_effort: params.reasoning_effort.map(|effort| effort.to_string()),
had_prior_review_context: had_prior_review_context(&prompt_mode),
token_usage: None,
};
if send_followup_reminder {
append_guardian_followup_reminder(review_session).await;
}
@@ -580,6 +643,8 @@ async fn run_review_on_session(
prompt_mode,
)
.await?;
let token_usage_at_review_start =
review_session.codex.session.total_token_usage().await;
review_session
.codex
@@ -599,29 +664,45 @@ async fn run_review_on_session(
})
.await?;
Ok::<GuardianTranscriptCursor, anyhow::Error>(prompt_items.transcript_cursor)
Ok::<(GuardianTranscriptCursor, Option<TokenUsage>), anyhow::Error>((
prompt_items.transcript_cursor,
token_usage_at_review_start,
))
}),
)
.await;
let submit_result = match submit_result {
Ok(submit_result) => submit_result,
Err(outcome) => return (outcome, false),
Err(outcome) => return (outcome, false, guardian_metadata),
};
let transcript_cursor = match submit_result {
Ok(transcript_cursor) => transcript_cursor,
let (transcript_cursor, token_usage_at_review_start) = match submit_result {
Ok(submit_result) => submit_result,
Err(err) => {
return (GuardianReviewSessionOutcome::Completed(Err(err)), false);
return (
GuardianReviewSessionOutcome::PromptBuildFailed(err),
false,
guardian_metadata,
);
}
};
let outcome =
wait_for_guardian_review(review_session, deadline, params.external_cancel.as_ref()).await;
if matches!(outcome.0, GuardianReviewSessionOutcome::Completed(_)) {
if outcome.2
&& let Some(token_usage_at_review_start) = token_usage_at_review_start
&& let Some(total_token_usage) = review_session.codex.session.total_token_usage().await
{
guardian_metadata.token_usage = Some(token_usage_delta(
&token_usage_at_review_start,
&total_token_usage,
));
}
let mut state = review_session.state.lock().await;
state.prior_review_count = state.prior_review_count.saturating_add(1);
state.last_reviewed_transcript_cursor = Some(transcript_cursor);
}
outcome
(outcome.0, outcome.1, guardian_metadata)
}
async fn append_guardian_followup_reminder(review_session: &GuardianReviewSession) {
@@ -650,7 +731,7 @@ async fn wait_for_guardian_review(
review_session: &GuardianReviewSession,
deadline: tokio::time::Instant,
external_cancel: Option<&CancellationToken>,
) -> (GuardianReviewSessionOutcome, bool) {
) -> (GuardianReviewSessionOutcome, bool, bool) {
let timeout = tokio::time::sleep_until(deadline);
tokio::pin!(timeout);
let mut last_error_message: Option<String> = None;
@@ -659,7 +740,7 @@ async fn wait_for_guardian_review(
tokio::select! {
_ = &mut timeout => {
let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok();
return (GuardianReviewSessionOutcome::TimedOut, keep_review_session);
return (GuardianReviewSessionOutcome::TimedOut, keep_review_session, false);
}
_ = async {
if let Some(cancel_token) = external_cancel {
@@ -669,7 +750,7 @@ async fn wait_for_guardian_review(
}
} => {
let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok();
return (GuardianReviewSessionOutcome::Aborted, keep_review_session);
return (GuardianReviewSessionOutcome::Aborted, keep_review_session, false);
}
event = review_session.codex.next_event() => {
match event {
@@ -681,18 +762,20 @@ async fn wait_for_guardian_review(
return (
GuardianReviewSessionOutcome::Completed(Err(anyhow!(error_message))),
true,
true,
);
}
return (
GuardianReviewSessionOutcome::Completed(Ok(turn_complete.last_agent_message)),
true,
true,
);
}
EventMsg::Error(error) => {
last_error_message = Some(error.message);
}
EventMsg::TurnAborted(_) => {
return (GuardianReviewSessionOutcome::Aborted, true);
return (GuardianReviewSessionOutcome::Aborted, true, false);
}
_ => {}
},
@@ -700,6 +783,7 @@ async fn wait_for_guardian_review(
return (
GuardianReviewSessionOutcome::Completed(Err(err.into())),
false,
false,
);
}
}
@@ -951,4 +1035,44 @@ mod tests {
assert_eq!(outcome.unwrap(), 42);
assert!(!cancel_token.is_cancelled());
}
#[test]
fn had_prior_review_context_tracks_prompt_mode() {
assert!(!had_prior_review_context(&GuardianPromptMode::Full));
assert!(had_prior_review_context(&GuardianPromptMode::Delta {
cursor: GuardianTranscriptCursor {
parent_history_version: 7,
transcript_entry_count: 42,
}
}));
}
#[test]
fn token_usage_delta_never_reports_negative_usage() {
let start = TokenUsage {
input_tokens: 10,
cached_input_tokens: 8,
output_tokens: 6,
reasoning_output_tokens: 4,
total_tokens: 28,
};
let end = TokenUsage {
input_tokens: 15,
cached_input_tokens: 7,
output_tokens: 10,
reasoning_output_tokens: 2,
total_tokens: 34,
};
assert_eq!(
token_usage_delta(&start, &end),
TokenUsage {
input_tokens: 5,
cached_input_tokens: 0,
output_tokens: 4,
reasoning_output_tokens: 0,
total_tokens: 6,
}
);
}
}

View File

@@ -916,7 +916,7 @@ async fn guardian_review_request_layout_matches_model_visible_request_snapshot()
/*external_cancel*/ None,
)
.await;
let GuardianReviewOutcome::Completed(Ok(assessment)) = outcome else {
let GuardianReviewOutcome::Completed(Ok(assessment), _) = outcome else {
panic!("expected guardian assessment");
};
assert_eq!(assessment.outcome, GuardianAssessmentOutcome::Allow);
@@ -1125,13 +1125,13 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow:
)
.await;
let GuardianReviewOutcome::Completed(Ok(first_assessment)) = first_outcome else {
let GuardianReviewOutcome::Completed(Ok(first_assessment), _) = first_outcome else {
panic!("expected first guardian assessment");
};
let GuardianReviewOutcome::Completed(Ok(second_assessment)) = second_outcome else {
let GuardianReviewOutcome::Completed(Ok(second_assessment), _) = second_outcome else {
panic!("expected second guardian assessment");
};
let GuardianReviewOutcome::Completed(Ok(third_assessment)) = third_outcome else {
let GuardianReviewOutcome::Completed(Ok(third_assessment), _) = third_outcome else {
panic!("expected third guardian assessment");
};
assert_eq!(first_assessment.outcome, GuardianAssessmentOutcome::Allow);