Compare commits

...

1 Commits

Author SHA1 Message Date
rhan-oai
8356585f11 [codex-analytics] centralize thread analytics state 2026-04-30 11:40:26 -07:00
2 changed files with 560 additions and 93 deletions

View File

@@ -1436,6 +1436,375 @@ async fn subagent_thread_started_publishes_without_initialize() {
assert_eq!(payload[0]["event_params"]["subagent_source"], "review");
}
#[tokio::test]
async fn subagent_thread_started_preserves_existing_connection_when_parent_lookup_fails() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
let parent_thread_id =
codex_protocol::ThreadId::from_string("22222222-2222-2222-2222-222222222222")
.expect("valid parent thread id");
ingest_initialize(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(2),
response: Box::new(sample_thread_resume_response_with_source(
"thread-review",
/*ephemeral*/ false,
"gpt-5",
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
)),
},
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
SubAgentThreadStartedInput {
thread_id: "thread-review".to_string(),
parent_thread_id: Some("missing-parent".to_string()),
product_client_id: "codex-tui".to_string(),
client_name: "codex-tui".to_string(),
client_version: "1.0.0".to_string(),
model: "gpt-5".to_string(),
ephemeral: false,
subagent_source: SubAgentSource::Other("guardian".to_string()),
created_at: 128,
},
)),
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
CodexCompactionEvent {
thread_id: "thread-review".to_string(),
turn_id: "turn-compact".to_string(),
trigger: CompactionTrigger::Manual,
reason: CompactionReason::UserRequested,
implementation: CompactionImplementation::Responses,
phase: CompactionPhase::StandaloneTurn,
strategy: CompactionStrategy::Memento,
status: CompactionStatus::Failed,
error: Some("context limit exceeded".to_string()),
active_context_tokens_before: 131_000,
active_context_tokens_after: 131_000,
started_at: 100,
completed_at: 101,
duration_ms: Some(1200),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_compaction_event");
assert_eq!(
payload[0]["event_params"]["app_server_client"]["product_client_id"],
"codex-tui"
);
assert_eq!(
payload[0]["event_params"]["subagent_source"],
"thread_spawn"
);
}
#[tokio::test]
async fn subagent_thread_started_preserves_existing_connection_when_parent_lookup_succeeds() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
let parent_thread_id =
codex_protocol::ThreadId::from_string("44444444-4444-4444-4444-444444444444")
.expect("valid parent thread id");
let parent_thread_id_string = parent_thread_id.to_string();
reducer
.ingest(
AnalyticsFact::Initialize {
connection_id: 7,
params: InitializeParams {
client_info: ClientInfo {
name: "parent-client".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: None,
},
product_client_id: "parent-client".to_string(),
runtime: sample_runtime_metadata(),
rpc_transport: AppServerRpcTransport::Stdio,
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Initialize {
connection_id: 8,
params: InitializeParams {
client_info: ClientInfo {
name: "child-client".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: None,
},
product_client_id: "child-client".to_string(),
runtime: sample_runtime_metadata(),
rpc_transport: AppServerRpcTransport::Stdio,
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(1),
response: Box::new(sample_thread_start_response(
&parent_thread_id_string,
/*ephemeral*/ false,
"gpt-5",
)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 8,
request_id: RequestId::Integer(2),
response: Box::new(sample_thread_resume_response_with_source(
"thread-review",
/*ephemeral*/ false,
"gpt-5",
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
)),
},
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
SubAgentThreadStartedInput {
thread_id: "thread-review".to_string(),
parent_thread_id: Some(parent_thread_id_string),
product_client_id: "child-client".to_string(),
client_name: "child-client".to_string(),
client_version: "1.0.0".to_string(),
model: "gpt-5".to_string(),
ephemeral: false,
subagent_source: SubAgentSource::Other("guardian".to_string()),
created_at: 130,
},
)),
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
CodexCompactionEvent {
thread_id: "thread-review".to_string(),
turn_id: "turn-compact".to_string(),
trigger: CompactionTrigger::Manual,
reason: CompactionReason::UserRequested,
implementation: CompactionImplementation::Responses,
phase: CompactionPhase::StandaloneTurn,
strategy: CompactionStrategy::Memento,
status: CompactionStatus::Failed,
error: Some("context limit exceeded".to_string()),
active_context_tokens_before: 131_000,
active_context_tokens_after: 131_000,
started_at: 100,
completed_at: 101,
duration_ms: Some(1200),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_compaction_event");
assert_eq!(
payload[0]["event_params"]["app_server_client"]["product_client_id"],
"child-client"
);
}
#[tokio::test]
async fn subagent_thread_started_preserves_resumed_thread_metadata() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
let parent_thread_id =
codex_protocol::ThreadId::from_string("33333333-3333-3333-3333-333333333333")
.expect("valid parent thread id");
let parent_thread_id_string = parent_thread_id.to_string();
ingest_initialize(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(1),
response: Box::new(sample_thread_start_response(
&parent_thread_id_string,
/*ephemeral*/ false,
"gpt-5",
)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(2),
response: Box::new(sample_thread_resume_response_with_source(
"thread-review",
/*ephemeral*/ false,
"gpt-5",
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
)),
},
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
SubAgentThreadStartedInput {
thread_id: "thread-review".to_string(),
parent_thread_id: Some(parent_thread_id_string.clone()),
product_client_id: "codex-tui".to_string(),
client_name: "codex-tui".to_string(),
client_version: "1.0.0".to_string(),
model: "gpt-5".to_string(),
ephemeral: false,
subagent_source: SubAgentSource::Other("guardian".to_string()),
created_at: 129,
},
)),
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request(
"thread-review",
/*request_id*/ 3,
)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-review")),
},
&mut events,
)
.await;
let mut resolved_config = sample_turn_resolved_config("turn-review");
resolved_config.thread_id = "thread-review".to_string();
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new(
resolved_config,
))),
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(sample_turn_started_notification(
"thread-review",
"turn-review",
))),
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::TurnTokenUsage(Box::new(
sample_turn_token_usage_fact("thread-review", "turn-review"),
))),
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
"thread-review",
"turn-review",
AppServerTurnStatus::Completed,
/*codex_error_info*/ None,
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_turn_event");
assert_eq!(payload[0]["event_params"]["initialization_mode"], "resumed");
assert_eq!(
payload[0]["event_params"]["subagent_source"],
"thread_spawn"
);
assert_eq!(
payload[0]["event_params"]["parent_thread_id"],
parent_thread_id_string
);
}
#[test]
fn plugin_used_event_serializes_expected_shape() {
let tracking = TrackEventsContext {
@@ -1989,7 +2358,7 @@ async fn accepted_turn_steer_emits_expected_event() {
}
#[tokio::test]
async fn rejected_turn_steer_uses_request_connection_metadata() {
async fn rejected_turn_steer_uses_thread_connection_metadata() {
let mut reducer = AnalyticsReducer::default();
let mut out = Vec::new();
let payload = ingest_rejected_turn_steer(

View File

@@ -74,8 +74,7 @@ pub(crate) struct AnalyticsReducer {
requests: HashMap<(u64, RequestId), RequestState>,
turns: HashMap<String, TurnState>,
connections: HashMap<u64, ConnectionState>,
thread_connections: HashMap<String, u64>,
thread_metadata: HashMap<String, ThreadMetadataState>,
threads: HashMap<String, ThreadAnalyticsState>,
}
struct ConnectionState {
@@ -83,6 +82,69 @@ struct ConnectionState {
runtime: CodexRuntimeMetadata,
}
#[derive(Default)]
struct ThreadAnalyticsState {
connection_id: Option<u64>,
metadata: Option<ThreadMetadataState>,
}
#[derive(Clone, Copy)]
struct AnalyticsDropSite<'a> {
event_name: &'static str,
thread_id: &'a str,
turn_id: Option<&'a str>,
review_id: Option<&'a str>,
item_id: Option<&'a str>,
}
impl<'a> AnalyticsDropSite<'a> {
fn guardian(input: &'a GuardianReviewEventParams) -> Self {
Self {
event_name: "guardian",
thread_id: &input.thread_id,
turn_id: Some(&input.turn_id),
review_id: Some(&input.review_id),
item_id: None,
}
}
fn compaction(input: &'a CodexCompactionEvent) -> Self {
Self {
event_name: "compaction",
thread_id: &input.thread_id,
turn_id: Some(&input.turn_id),
review_id: None,
item_id: None,
}
}
fn turn_steer(thread_id: &'a str) -> Self {
Self {
event_name: "turn steer",
thread_id,
turn_id: None,
review_id: None,
item_id: None,
}
}
fn turn(thread_id: &'a str, turn_id: &'a str) -> Self {
Self {
event_name: "turn",
thread_id,
turn_id: Some(turn_id),
review_id: None,
item_id: None,
}
}
}
enum MissingAnalyticsContext {
ThreadConnection,
Connection { connection_id: u64 },
ThreadMetadata,
}
#[derive(Clone)]
struct ThreadMetadataState {
thread_source: Option<&'static str>,
@@ -144,7 +206,6 @@ struct CompletedTurnState {
}
struct TurnState {
connection_id: Option<u64>,
thread_id: Option<String>,
num_input_images: Option<usize>,
resolved_config: Option<TurnResolvedConfigFact>,
@@ -274,6 +335,26 @@ impl AnalyticsReducer {
input: SubAgentThreadStartedInput,
out: &mut Vec<TrackEventRequest>,
) {
let parent_thread_id = input
.parent_thread_id
.clone()
.or_else(|| subagent_parent_thread_id(&input.subagent_source));
let parent_connection_id = parent_thread_id
.as_ref()
.and_then(|parent_thread_id| self.threads.get(parent_thread_id))
.and_then(|thread| thread.connection_id);
let thread_state = self.threads.entry(input.thread_id.clone()).or_default();
thread_state
.metadata
.get_or_insert_with(|| ThreadMetadataState {
thread_source: Some("subagent"),
initialization_mode: ThreadInitializationMode::New,
subagent_source: Some(subagent_source_name(&input.subagent_source)),
parent_thread_id,
});
if thread_state.connection_id.is_none() {
thread_state.connection_id = parent_connection_id;
}
out.push(TrackEventRequest::ThreadInitialized(
subagent_thread_started_event_request(input),
));
@@ -284,23 +365,9 @@ impl AnalyticsReducer {
input: GuardianReviewEventParams,
out: &mut Vec<TrackEventRequest>,
) {
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
review_id = %input.review_id,
"dropping guardian analytics event: missing thread connection metadata"
);
return;
};
let Some(connection_state) = self.connections.get(connection_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
review_id = %input.review_id,
connection_id,
"dropping guardian analytics event: missing connection metadata"
);
let Some(connection_state) =
self.thread_connection_or_warn(AnalyticsDropSite::guardian(&input))
else {
return;
};
out.push(TrackEventRequest::GuardianReview(Box::new(
@@ -355,7 +422,6 @@ impl AnalyticsReducer {
let thread_id = input.thread_id.clone();
let num_input_images = input.num_input_images;
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
@@ -377,7 +443,6 @@ impl AnalyticsReducer {
) {
let turn_id = input.turn_id.clone();
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
@@ -538,7 +603,6 @@ impl AnalyticsReducer {
return;
};
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
@@ -547,7 +611,6 @@ impl AnalyticsReducer {
completed: None,
steer_count: 0,
});
turn_state.connection_id = Some(connection_id);
turn_state.thread_id = Some(pending_request.thread_id);
turn_state.num_input_images = Some(pending_request.num_input_images);
self.maybe_emit_turn_event(&turn_id, out);
@@ -597,13 +660,12 @@ impl AnalyticsReducer {
fn ingest_turn_steer_error_response(
&mut self,
connection_id: u64,
_connection_id: u64,
pending_request: PendingTurnSteerState,
error_type: Option<AnalyticsJsonRpcError>,
out: &mut Vec<TrackEventRequest>,
) {
self.emit_turn_steer_event(
connection_id,
pending_request,
/*accepted_turn_id*/ None,
TurnSteerResult::Rejected,
@@ -620,7 +682,6 @@ impl AnalyticsReducer {
match notification {
ServerNotification::TurnStarted(notification) => {
let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
@@ -639,7 +700,6 @@ impl AnalyticsReducer {
self.turns
.entry(notification.turn.id.clone())
.or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
@@ -686,10 +746,13 @@ impl AnalyticsReducer {
};
let thread_metadata =
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
self.thread_connections
.insert(thread_id.clone(), connection_id);
self.thread_metadata
.insert(thread_id.clone(), thread_metadata.clone());
self.threads.insert(
thread_id.clone(),
ThreadAnalyticsState {
connection_id: Some(connection_id),
metadata: Some(thread_metadata.clone()),
},
);
out.push(TrackEventRequest::ThreadInitialized(
ThreadInitializedEvent {
event_type: "codex_thread_initialized",
@@ -710,29 +773,9 @@ impl AnalyticsReducer {
}
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
"dropping compaction analytics event: missing thread connection metadata"
);
return;
};
let Some(connection_state) = self.connections.get(connection_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
connection_id,
"dropping compaction analytics event: missing connection metadata"
);
return;
};
let Some(thread_metadata) = self.thread_metadata.get(&input.thread_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
"dropping compaction analytics event: missing thread lifecycle metadata"
);
let Some((connection_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::compaction(&input))
else {
return;
};
out.push(TrackEventRequest::Compaction(Box::new(
@@ -766,7 +809,6 @@ impl AnalyticsReducer {
turn_state.steer_count += 1;
}
self.emit_turn_steer_event(
connection_id,
pending_request,
Some(response.turn_id),
TurnSteerResult::Accepted,
@@ -777,21 +819,15 @@ impl AnalyticsReducer {
fn emit_turn_steer_event(
&mut self,
connection_id: u64,
pending_request: PendingTurnSteerState,
accepted_turn_id: Option<String>,
result: TurnSteerResult,
rejection_reason: Option<TurnSteerRejectionReason>,
out: &mut Vec<TrackEventRequest>,
) {
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
let Some(thread_metadata) = self.thread_metadata.get(&pending_request.thread_id) else {
tracing::warn!(
thread_id = %pending_request.thread_id,
"dropping turn steer analytics event: missing thread lifecycle metadata"
);
let Some((connection_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::turn_steer(&pending_request.thread_id))
else {
return;
};
out.push(TrackEventRequest::TurnSteer(CodexTurnSteerEventRequest {
@@ -824,42 +860,20 @@ impl AnalyticsReducer {
{
return;
}
let connection_metadata = turn_state
.connection_id
.and_then(|connection_id| self.connections.get(&connection_id))
.map(|connection_state| {
(
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
)
});
let Some((app_server_client, runtime)) = connection_metadata else {
if let Some(connection_id) = turn_state.connection_id {
tracing::warn!(
turn_id,
connection_id,
"dropping turn analytics event: missing connection metadata"
);
}
return;
};
let Some(thread_id) = turn_state.thread_id.as_ref() else {
return;
};
let Some(thread_metadata) = self.thread_metadata.get(thread_id) else {
tracing::warn!(
thread_id,
turn_id,
"dropping turn analytics event: missing thread lifecycle metadata"
);
let Some((connection_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::turn(thread_id, turn_id))
else {
return;
};
out.push(TrackEventRequest::TurnEvent(Box::new(
CodexTurnEventRequest {
event_type: "codex_turn_event",
event_params: codex_turn_event_params(
app_server_client,
runtime,
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
turn_id.to_string(),
turn_state,
thread_metadata,
@@ -868,6 +882,90 @@ impl AnalyticsReducer {
)));
self.turns.remove(turn_id);
}
fn thread_connection_or_warn(
&self,
drop_site: AnalyticsDropSite<'_>,
) -> Option<&ConnectionState> {
let connection_id = self.thread_connection_id_or_warn(&drop_site)?;
let Some(connection_state) = self.connections.get(&connection_id) else {
warn_missing_analytics_context(
&drop_site,
MissingAnalyticsContext::Connection { connection_id },
);
return None;
};
Some(connection_state)
}
fn thread_context_or_warn(
&self,
drop_site: AnalyticsDropSite<'_>,
) -> Option<(&ConnectionState, &ThreadMetadataState)> {
let connection_state = self.thread_connection_or_warn(drop_site)?;
let Some(thread_metadata) = self.thread_metadata(drop_site.thread_id) else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return None;
};
Some((connection_state, thread_metadata))
}
fn thread_connection_id_or_warn(&self, drop_site: &AnalyticsDropSite<'_>) -> Option<u64> {
let Some(thread_state) = self.threads.get(drop_site.thread_id) else {
warn_missing_analytics_context(drop_site, MissingAnalyticsContext::ThreadConnection);
return None;
};
let Some(connection_id) = thread_state.connection_id else {
warn_missing_analytics_context(drop_site, MissingAnalyticsContext::ThreadConnection);
return None;
};
Some(connection_id)
}
fn thread_metadata(&self, thread_id: &str) -> Option<&ThreadMetadataState> {
self.threads
.get(thread_id)
.and_then(|thread| thread.metadata.as_ref())
}
}
fn warn_missing_analytics_context(
drop_site: &AnalyticsDropSite<'_>,
missing: MissingAnalyticsContext,
) {
match missing {
MissingAnalyticsContext::ThreadConnection => {
tracing::warn!(
thread_id = %drop_site.thread_id,
turn_id = ?drop_site.turn_id,
review_id = ?drop_site.review_id,
item_id = ?drop_site.item_id,
"dropping {} analytics event: missing thread connection metadata",
drop_site.event_name
);
}
MissingAnalyticsContext::Connection { connection_id } => {
tracing::warn!(
thread_id = %drop_site.thread_id,
turn_id = ?drop_site.turn_id,
review_id = ?drop_site.review_id,
item_id = ?drop_site.item_id,
connection_id,
"dropping {} analytics event: missing connection metadata",
drop_site.event_name
);
}
MissingAnalyticsContext::ThreadMetadata => {
tracing::warn!(
thread_id = %drop_site.thread_id,
turn_id = ?drop_site.turn_id,
review_id = ?drop_site.review_id,
item_id = ?drop_site.item_id,
"dropping {} analytics event: missing thread lifecycle metadata",
drop_site.event_name
);
}
}
}
fn codex_turn_event_params(