Compare commits

...

1 Commits

Author SHA1 Message Date
rhan-oai
d877e6acb5 [codex-analytics] centralize thread analytics state 2026-04-30 13:58:01 -07:00
2 changed files with 323 additions and 81 deletions

View File

@@ -303,10 +303,10 @@ fn sample_turn_completed_notification(
})
}
fn sample_turn_resolved_config(turn_id: &str) -> TurnResolvedConfigFact {
fn sample_turn_resolved_config(thread_id: &str, turn_id: &str) -> TurnResolvedConfigFact {
TurnResolvedConfigFact {
turn_id: turn_id.to_string(),
thread_id: "thread-2".to_string(),
thread_id: thread_id.to_string(),
num_input_images: 1,
submission_type: None,
ephemeral: false,
@@ -419,6 +419,38 @@ async fn ingest_rejected_turn_steer(
/*include_started*/ false, /*include_token_usage*/ false,
)
.await;
reducer
.ingest(
AnalyticsFact::Initialize {
connection_id: 8,
params: InitializeParams {
client_info: ClientInfo {
name: "codex-web".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: None,
},
product_client_id: "codex-web".to_string(),
runtime: sample_runtime_metadata(),
rpc_transport: AppServerRpcTransport::Stdio,
},
out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 8,
request_id: RequestId::Integer(6),
response: Box::new(sample_thread_resume_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
},
out,
)
.await;
out.clear();
reducer
.ingest(
AnalyticsFact::ClientRequest {
@@ -519,7 +551,7 @@ async fn ingest_turn_prerequisites(
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new(
sample_turn_resolved_config("turn-2"),
sample_turn_resolved_config("thread-2", "turn-2"),
))),
out,
)
@@ -1436,6 +1468,110 @@ async fn subagent_thread_started_publishes_without_initialize() {
assert_eq!(payload[0]["event_params"]["subagent_source"], "review");
}
#[tokio::test]
async fn subagent_thread_started_inherits_parent_connection_for_new_thread() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
let parent_thread_id =
codex_protocol::ThreadId::from_string("44444444-4444-4444-4444-444444444444")
.expect("valid parent thread id");
let parent_thread_id_string = parent_thread_id.to_string();
reducer
.ingest(
AnalyticsFact::Initialize {
connection_id: 7,
params: InitializeParams {
client_info: ClientInfo {
name: "parent-client".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: None,
},
product_client_id: "parent-client".to_string(),
runtime: sample_runtime_metadata(),
rpc_transport: AppServerRpcTransport::Stdio,
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(1),
response: Box::new(sample_thread_start_response(
&parent_thread_id_string,
/*ephemeral*/ false,
"gpt-5",
)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
SubAgentThreadStartedInput {
thread_id: "thread-review".to_string(),
parent_thread_id: None,
product_client_id: "parent-client".to_string(),
client_name: "parent-client".to_string(),
client_version: "1.0.0".to_string(),
model: "gpt-5".to_string(),
ephemeral: false,
subagent_source: SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
},
created_at: 130,
},
)),
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
CodexCompactionEvent {
thread_id: "thread-review".to_string(),
turn_id: "turn-compact".to_string(),
trigger: CompactionTrigger::Manual,
reason: CompactionReason::UserRequested,
implementation: CompactionImplementation::Responses,
phase: CompactionPhase::StandaloneTurn,
strategy: CompactionStrategy::Memento,
status: CompactionStatus::Completed,
error: None,
active_context_tokens_before: 131_000,
active_context_tokens_after: 64_000,
started_at: 100,
completed_at: 101,
duration_ms: Some(1200),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(
payload[0]["event_params"]["app_server_client"]["product_client_id"],
"parent-client"
);
assert_eq!(
payload[0]["event_params"]["parent_thread_id"],
"44444444-4444-4444-4444-444444444444"
);
}
#[test]
fn plugin_used_event_serializes_expected_shape() {
let tracking = TrackEventsContext {
@@ -2130,7 +2266,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new(
sample_turn_resolved_config("turn-2"),
sample_turn_resolved_config("thread-2", "turn-2"),
))),
&mut out,
)

View File

@@ -74,8 +74,7 @@ pub(crate) struct AnalyticsReducer {
requests: HashMap<(u64, RequestId), RequestState>,
turns: HashMap<String, TurnState>,
connections: HashMap<u64, ConnectionState>,
thread_connections: HashMap<String, u64>,
thread_metadata: HashMap<String, ThreadMetadataState>,
threads: HashMap<String, ThreadAnalyticsState>,
}
struct ConnectionState {
@@ -83,6 +82,69 @@ struct ConnectionState {
runtime: CodexRuntimeMetadata,
}
#[derive(Default)]
struct ThreadAnalyticsState {
connection_id: Option<u64>,
metadata: Option<ThreadMetadataState>,
}
#[derive(Clone, Copy)]
struct AnalyticsDropSite<'a> {
event_name: &'static str,
thread_id: &'a str,
turn_id: Option<&'a str>,
review_id: Option<&'a str>,
item_id: Option<&'a str>,
}
impl<'a> AnalyticsDropSite<'a> {
fn guardian(input: &'a GuardianReviewEventParams) -> Self {
Self {
event_name: "guardian",
thread_id: &input.thread_id,
turn_id: Some(&input.turn_id),
review_id: Some(&input.review_id),
item_id: None,
}
}
fn compaction(input: &'a CodexCompactionEvent) -> Self {
Self {
event_name: "compaction",
thread_id: &input.thread_id,
turn_id: Some(&input.turn_id),
review_id: None,
item_id: None,
}
}
fn turn_steer(thread_id: &'a str) -> Self {
Self {
event_name: "turn steer",
thread_id,
turn_id: None,
review_id: None,
item_id: None,
}
}
fn turn(thread_id: &'a str, turn_id: &'a str) -> Self {
Self {
event_name: "turn",
thread_id,
turn_id: Some(turn_id),
review_id: None,
item_id: None,
}
}
}
enum MissingAnalyticsContext {
ThreadConnection,
Connection { connection_id: u64 },
ThreadMetadata,
}
#[derive(Clone)]
struct ThreadMetadataState {
thread_source: Option<&'static str>,
@@ -274,6 +336,26 @@ impl AnalyticsReducer {
input: SubAgentThreadStartedInput,
out: &mut Vec<TrackEventRequest>,
) {
let parent_thread_id = input
.parent_thread_id
.clone()
.or_else(|| subagent_parent_thread_id(&input.subagent_source));
let parent_connection_id = parent_thread_id
.as_ref()
.and_then(|parent_thread_id| self.threads.get(parent_thread_id))
.and_then(|thread| thread.connection_id);
let thread_state = self.threads.entry(input.thread_id.clone()).or_default();
thread_state
.metadata
.get_or_insert_with(|| ThreadMetadataState {
thread_source: Some("subagent"),
initialization_mode: ThreadInitializationMode::New,
subagent_source: Some(subagent_source_name(&input.subagent_source)),
parent_thread_id,
});
if thread_state.connection_id.is_none() {
thread_state.connection_id = parent_connection_id;
}
out.push(TrackEventRequest::ThreadInitialized(
subagent_thread_started_event_request(input),
));
@@ -284,23 +366,9 @@ impl AnalyticsReducer {
input: GuardianReviewEventParams,
out: &mut Vec<TrackEventRequest>,
) {
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
review_id = %input.review_id,
"dropping guardian analytics event: missing thread connection metadata"
);
return;
};
let Some(connection_state) = self.connections.get(connection_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
review_id = %input.review_id,
connection_id,
"dropping guardian analytics event: missing connection metadata"
);
let Some(connection_state) =
self.thread_connection_or_warn(AnalyticsDropSite::guardian(&input))
else {
return;
};
out.push(TrackEventRequest::GuardianReview(Box::new(
@@ -686,10 +754,13 @@ impl AnalyticsReducer {
};
let thread_metadata =
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
self.thread_connections
.insert(thread_id.clone(), connection_id);
self.thread_metadata
.insert(thread_id.clone(), thread_metadata.clone());
self.threads.insert(
thread_id.clone(),
ThreadAnalyticsState {
connection_id: Some(connection_id),
metadata: Some(thread_metadata.clone()),
},
);
out.push(TrackEventRequest::ThreadInitialized(
ThreadInitializedEvent {
event_type: "codex_thread_initialized",
@@ -710,29 +781,9 @@ impl AnalyticsReducer {
}
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
"dropping compaction analytics event: missing thread connection metadata"
);
return;
};
let Some(connection_state) = self.connections.get(connection_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
connection_id,
"dropping compaction analytics event: missing connection metadata"
);
return;
};
let Some(thread_metadata) = self.thread_metadata.get(&input.thread_id) else {
tracing::warn!(
thread_id = %input.thread_id,
turn_id = %input.turn_id,
"dropping compaction analytics event: missing thread lifecycle metadata"
);
let Some((connection_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::compaction(&input))
else {
return;
};
out.push(TrackEventRequest::Compaction(Box::new(
@@ -787,11 +838,13 @@ impl AnalyticsReducer {
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
let Some(thread_metadata) = self.thread_metadata.get(&pending_request.thread_id) else {
tracing::warn!(
thread_id = %pending_request.thread_id,
"dropping turn steer analytics event: missing thread lifecycle metadata"
);
let drop_site = AnalyticsDropSite::turn_steer(&pending_request.thread_id);
let Some(thread_metadata) = self
.threads
.get(drop_site.thread_id)
.and_then(|thread| thread.metadata.as_ref())
else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return;
};
out.push(TrackEventRequest::TurnSteer(CodexTurnSteerEventRequest {
@@ -824,42 +877,34 @@ impl AnalyticsReducer {
{
return;
}
let connection_metadata = turn_state
.connection_id
.and_then(|connection_id| self.connections.get(&connection_id))
.map(|connection_state| {
(
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
)
});
let Some((app_server_client, runtime)) = connection_metadata else {
if let Some(connection_id) = turn_state.connection_id {
tracing::warn!(
turn_id,
connection_id,
"dropping turn analytics event: missing connection metadata"
);
}
return;
};
let Some(thread_id) = turn_state.thread_id.as_ref() else {
return;
};
let Some(thread_metadata) = self.thread_metadata.get(thread_id) else {
tracing::warn!(
thread_id,
turn_id,
"dropping turn analytics event: missing thread lifecycle metadata"
let Some(connection_id) = turn_state.connection_id else {
return;
};
let Some(connection_state) = self.connections.get(&connection_id) else {
warn_missing_analytics_context(
&AnalyticsDropSite::turn(thread_id, turn_id),
MissingAnalyticsContext::Connection { connection_id },
);
return;
};
let drop_site = AnalyticsDropSite::turn(thread_id, turn_id);
let Some(thread_metadata) = self
.threads
.get(drop_site.thread_id)
.and_then(|thread| thread.metadata.as_ref())
else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return;
};
out.push(TrackEventRequest::TurnEvent(Box::new(
CodexTurnEventRequest {
event_type: "codex_turn_event",
event_params: codex_turn_event_params(
app_server_client,
runtime,
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
turn_id.to_string(),
turn_state,
thread_metadata,
@@ -868,6 +913,67 @@ impl AnalyticsReducer {
)));
self.turns.remove(turn_id);
}
fn thread_connection_or_warn(
&self,
drop_site: AnalyticsDropSite<'_>,
) -> Option<&ConnectionState> {
let Some(thread_state) = self.threads.get(drop_site.thread_id) else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadConnection);
return None;
};
let Some(connection_id) = thread_state.connection_id else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadConnection);
return None;
};
let Some(connection_state) = self.connections.get(&connection_id) else {
warn_missing_analytics_context(
&drop_site,
MissingAnalyticsContext::Connection { connection_id },
);
return None;
};
Some(connection_state)
}
fn thread_context_or_warn(
&self,
drop_site: AnalyticsDropSite<'_>,
) -> Option<(&ConnectionState, &ThreadMetadataState)> {
let connection_state = self.thread_connection_or_warn(drop_site)?;
let Some(thread_metadata) = self
.threads
.get(drop_site.thread_id)
.and_then(|thread| thread.metadata.as_ref())
else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return None;
};
Some((connection_state, thread_metadata))
}
}
fn warn_missing_analytics_context(
drop_site: &AnalyticsDropSite<'_>,
missing: MissingAnalyticsContext,
) {
let (missing_context, connection_id) = match missing {
MissingAnalyticsContext::ThreadConnection => ("thread_connection", None),
MissingAnalyticsContext::Connection { connection_id } => {
("connection", Some(connection_id))
}
MissingAnalyticsContext::ThreadMetadata => ("thread_metadata", None),
};
tracing::warn!(
thread_id = %drop_site.thread_id,
turn_id = ?drop_site.turn_id,
review_id = ?drop_site.review_id,
item_id = ?drop_site.item_id,
missing_context,
connection_id,
"dropping {} analytics event: missing analytics context",
drop_site.event_name
);
}
fn codex_turn_event_params(