This commit is contained in:
Roy Han
2026-03-26 00:00:51 -07:00
4 changed files with 180 additions and 60 deletions

View File

@@ -147,7 +147,8 @@ pub struct AppInvocation {
pub enum AnalyticsInput {
CodexThreadInitialized(CodexThreadInitializedInput),
TurnEvent(TurnEventInput),
TurnStarted(TurnStartedInput),
TurnCompleted(TurnCompletedInput),
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),
AppUsed(AppUsedInput),
@@ -155,10 +156,15 @@ pub enum AnalyticsInput {
PluginStateChanged(PluginStateChangedInput),
}
pub struct TurnEventInput {
pub struct TurnStartedInput {
pub tracking: TrackEventsContext,
pub turn_event: CodexTurnEvent,
}
pub struct TurnCompletedInput {
pub turn_id: String,
}
pub struct SkillInvokedInput {
pub tracking: TrackEventsContext,
pub invocations: Vec<SkillInvocation>,
@@ -195,12 +201,17 @@ pub enum PluginState {
#[derive(Default)]
pub struct AnalyticsReducer {
threads: HashMap<String, ThreadState>,
turns: HashMap<String, TurnState>,
}
struct ThreadState {
_initialized_input: CodexThreadInitializedInput,
}
struct TurnState {
started_input: TurnStartedInput,
}
#[derive(Clone)]
pub(crate) struct AnalyticsEventsQueue {
sender: mpsc::Sender<AnalyticsInput>,
@@ -326,13 +337,19 @@ impl AnalyticsEventsClient {
}));
}
pub fn track_turn_event(&self, tracking: TrackEventsContext, turn_event: CodexTurnEvent) {
self.record(AnalyticsInput::TurnEvent(TurnEventInput {
pub fn track_turn_started(&self, tracking: TrackEventsContext, turn_event: CodexTurnEvent) {
self.record(AnalyticsInput::TurnStarted(TurnStartedInput {
tracking,
turn_event,
}));
}
pub fn track_turn_completed(&self, turn_id: String) {
self.record(AnalyticsInput::TurnCompleted(TurnCompletedInput {
turn_id,
}));
}
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
self.record(AnalyticsInput::PluginStateChanged(
PluginStateChangedInput {
@@ -545,8 +562,11 @@ impl AnalyticsReducer {
AnalyticsInput::CodexThreadInitialized(input) => {
self.ingest_thread_initialized(input, out);
}
AnalyticsInput::TurnEvent(input) => {
self.ingest_turn_event(input, out);
AnalyticsInput::TurnStarted(input) => {
self.ingest_turn_started(input);
}
AnalyticsInput::TurnCompleted(input) => {
self.ingest_turn_completed(input, out);
}
AnalyticsInput::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
@@ -582,11 +602,27 @@ impl AnalyticsReducer {
));
}
fn ingest_turn_event(&mut self, input: TurnEventInput, out: &mut Vec<TrackEventRequest>) {
let TurnEventInput {
fn ingest_turn_started(&mut self, input: TurnStartedInput) {
self.turns.insert(
input.tracking.turn_id.clone(),
TurnState {
started_input: input,
},
);
}
fn ingest_turn_completed(
&mut self,
input: TurnCompletedInput,
out: &mut Vec<TrackEventRequest>,
) {
let Some(TurnState { started_input }) = self.turns.remove(&input.turn_id) else {
return;
};
let TurnStartedInput {
tracking,
turn_event,
} = input;
} = started_input;
out.push(TrackEventRequest::TurnEvent(Box::new(
CodexTurnEventRequest {
event_type: "codex_turn_event",

View File

@@ -1,4 +1,6 @@
use super::AnalyticsEventsQueue;
use super::AnalyticsInput;
use super::AnalyticsReducer;
use super::AppInvocation;
use super::CodexAppMentionedEventRequest;
use super::CodexAppUsedEventRequest;
@@ -12,6 +14,9 @@ use super::InitializationMode;
use super::InvocationType;
use super::TrackEventRequest;
use super::TrackEventsContext;
use super::TurnCompletedInput;
use super::TurnStartedInput;
use super::TurnSubmissionType;
use super::TurnSubmissionType;
use super::codex_app_metadata;
use super::codex_plugin_metadata;
@@ -300,6 +305,77 @@ fn turn_event_serializes_expected_shape() {
);
}
#[tokio::test]
async fn turn_started_then_completed_emits_turn_event() {
let tracking = TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-2".to_string(),
turn_id: "turn-2".to_string(),
};
let mut reducer = AnalyticsReducer::default();
let mut out = Vec::new();
reducer
.ingest(
AnalyticsInput::TurnStarted(TurnStartedInput {
tracking: tracking.clone(),
turn_event: CodexTurnEvent {
submission_type: None,
model_provider: "openai".to_string(),
sandbox_policy: SandboxPolicy::new_read_only_policy(),
reasoning_effort: Some(ReasoningEffort::High),
reasoning_summary: Some(ReasoningSummary::Detailed),
service_tier: Some(ServiceTier::Flex),
approval_policy: AskForApproval::OnRequest,
approvals_reviewer: ApprovalsReviewer::GuardianSubagent,
sandbox_network_access: true,
collaboration_mode: ModeKind::Plan,
personality: Some(Personality::Pragmatic),
num_input_images: 2,
is_first_turn: true,
status: None,
turn_error: None,
steer_count: None,
total_tool_call_count: None,
shell_command_count: None,
file_change_count: None,
mcp_tool_call_count: None,
dynamic_tool_call_count: None,
subagent_tool_call_count: None,
web_search_count: None,
image_generation_count: None,
input_tokens: None,
cached_input_tokens: None,
output_tokens: None,
reasoning_output_tokens: None,
total_tokens: None,
duration_ms: None,
started_at: None,
completed_at: None,
},
}),
&mut out,
)
.await;
assert!(out.is_empty());
reducer
.ingest(
AnalyticsInput::TurnCompleted(TurnCompletedInput {
turn_id: tracking.turn_id.clone(),
}),
&mut out,
)
.await;
assert_eq!(out.len(), 1);
let payload = serde_json::to_value(&out[0]).expect("serialize turn event");
assert_eq!(payload["event_type"], json!("codex_turn_event"));
assert_eq!(payload["event_params"]["thread_id"], json!("thread-2"));
assert_eq!(payload["event_params"]["turn_id"], json!("turn-2"));
}
#[test]
fn thread_initialized_event_serializes_expected_shape() {
let event = TrackEventRequest::CodexThreadInitialized(codex_thread_initialized_event_request(

View File

@@ -17,6 +17,8 @@ pub use analytics_client::PluginUsedInput;
pub use analytics_client::SkillInvocation;
pub use analytics_client::SkillInvokedInput;
pub use analytics_client::TrackEventsContext;
pub use analytics_client::TurnCompletedInput;
pub use analytics_client::TurnStartedInput;
pub use analytics_client::TurnStatus;
pub use analytics_client::TurnSubmissionType;
pub use analytics_client::build_track_events_context;

View File

@@ -840,7 +840,7 @@ fn session_source_parent_thread_id(session_source: &SessionSource) -> Option<Str
fn turn_submission_type(submission_type: SubmissionType) -> TurnSubmissionType {
match submission_type {
SubmissionType::Prompt => TurnSubmissionType::Default,
SubmissionType::QueuedPrompt => TurnSubmissionType::Queued,
SubmissionType::PromptQueued => TurnSubmissionType::Queued,
}
}
@@ -5892,6 +5892,59 @@ pub(crate) async fn run_turn(
.await;
}
if !input.is_empty() {
let is_first_turn = {
let mut state = sess.state.lock().await;
state.take_next_turn_is_first()
};
let submission_type = turn_context
.submission_type
.map(turn_submission_type)
.unwrap_or(TurnSubmissionType::Default);
sess.services.analytics_events_client.track_turn_started(
tracking.clone(),
CodexTurnEvent {
submission_type: Some(submission_type),
model_provider: turn_context.config.model_provider_id.clone(),
sandbox_policy: turn_context.sandbox_policy.get().clone(),
reasoning_effort: turn_context.reasoning_effort,
reasoning_summary: Some(turn_context.reasoning_summary),
service_tier: turn_context.config.service_tier,
approval_policy: turn_context.approval_policy.value(),
approvals_reviewer: turn_context.config.approvals_reviewer,
sandbox_network_access: turn_context.network_sandbox_policy.is_enabled(),
collaboration_mode: turn_context.collaboration_mode.mode,
personality: turn_context.personality,
num_input_images: input
.iter()
.filter(|item| {
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
})
.count(),
is_first_turn,
status: None,
turn_error: None,
steer_count: None,
total_tool_call_count: None,
shell_command_count: None,
file_change_count: None,
mcp_tool_call_count: None,
dynamic_tool_call_count: None,
subagent_tool_call_count: None,
web_search_count: None,
image_generation_count: None,
input_tokens: None,
cached_input_tokens: None,
output_tokens: None,
reasoning_output_tokens: None,
total_tokens: None,
duration_ms: None,
started_at: None,
completed_at: None,
},
);
}
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
.await;
@@ -6175,56 +6228,9 @@ pub(crate) async fn run_turn(
}
if !input.is_empty() {
let submission_type = turn_context
.submission_type
.map(turn_submission_type)
.unwrap_or(TurnSubmissionType::Default);
let is_first_turn = {
let mut state = sess.state.lock().await;
state.take_next_turn_is_first()
};
sess.services.analytics_events_client.track_turn_event(
tracking,
CodexTurnEvent {
submission_type: Some(submission_type),
model_provider: turn_context.config.model_provider_id.clone(),
sandbox_policy: turn_context.sandbox_policy.get().clone(),
reasoning_effort: turn_context.reasoning_effort,
reasoning_summary: Some(turn_context.reasoning_summary),
service_tier: turn_context.config.service_tier,
approval_policy: turn_context.approval_policy.value(),
approvals_reviewer: turn_context.config.approvals_reviewer,
sandbox_network_access: turn_context.network_sandbox_policy.is_enabled(),
collaboration_mode: turn_context.collaboration_mode.mode,
personality: turn_context.personality,
num_input_images: input
.iter()
.filter(|item| {
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
})
.count(),
is_first_turn,
status: None,
turn_error: None,
steer_count: None,
total_tool_call_count: None,
shell_command_count: None,
file_change_count: None,
mcp_tool_call_count: None,
dynamic_tool_call_count: None,
subagent_tool_call_count: None,
web_search_count: None,
image_generation_count: None,
input_tokens: None,
cached_input_tokens: None,
output_tokens: None,
reasoning_output_tokens: None,
total_tokens: None,
duration_ms: None,
started_at: None,
completed_at: None,
},
);
sess.services
.analytics_events_client
.track_turn_completed(tracking.turn_id.clone());
}
last_agent_message