mirror of
https://github.com/openai/codex.git
synced 2026-05-05 05:42:33 +03:00
Compare commits
1 Commits
automation
...
pr20460
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8356585f11 |
@@ -1436,6 +1436,375 @@ async fn subagent_thread_started_publishes_without_initialize() {
|
||||
assert_eq!(payload[0]["event_params"]["subagent_source"], "review");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subagent_thread_started_preserves_existing_connection_when_parent_lookup_fails() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
let parent_thread_id =
|
||||
codex_protocol::ThreadId::from_string("22222222-2222-2222-2222-222222222222")
|
||||
.expect("valid parent thread id");
|
||||
|
||||
ingest_initialize(&mut reducer, &mut events).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
response: Box::new(sample_thread_resume_response_with_source(
|
||||
"thread-review",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
events.clear();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
|
||||
SubAgentThreadStartedInput {
|
||||
thread_id: "thread-review".to_string(),
|
||||
parent_thread_id: Some("missing-parent".to_string()),
|
||||
product_client_id: "codex-tui".to_string(),
|
||||
client_name: "codex-tui".to_string(),
|
||||
client_version: "1.0.0".to_string(),
|
||||
model: "gpt-5".to_string(),
|
||||
ephemeral: false,
|
||||
subagent_source: SubAgentSource::Other("guardian".to_string()),
|
||||
created_at: 128,
|
||||
},
|
||||
)),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
events.clear();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
|
||||
CodexCompactionEvent {
|
||||
thread_id: "thread-review".to_string(),
|
||||
turn_id: "turn-compact".to_string(),
|
||||
trigger: CompactionTrigger::Manual,
|
||||
reason: CompactionReason::UserRequested,
|
||||
implementation: CompactionImplementation::Responses,
|
||||
phase: CompactionPhase::StandaloneTurn,
|
||||
strategy: CompactionStrategy::Memento,
|
||||
status: CompactionStatus::Failed,
|
||||
error: Some("context limit exceeded".to_string()),
|
||||
active_context_tokens_before: 131_000,
|
||||
active_context_tokens_after: 131_000,
|
||||
started_at: 100,
|
||||
completed_at: 101,
|
||||
duration_ms: Some(1200),
|
||||
},
|
||||
))),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(payload.as_array().expect("events array").len(), 1);
|
||||
assert_eq!(payload[0]["event_type"], "codex_compaction_event");
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["app_server_client"]["product_client_id"],
|
||||
"codex-tui"
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["subagent_source"],
|
||||
"thread_spawn"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subagent_thread_started_preserves_existing_connection_when_parent_lookup_succeeds() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
let parent_thread_id =
|
||||
codex_protocol::ThreadId::from_string("44444444-4444-4444-4444-444444444444")
|
||||
.expect("valid parent thread id");
|
||||
let parent_thread_id_string = parent_thread_id.to_string();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: 7,
|
||||
params: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "parent-client".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
},
|
||||
product_client_id: "parent-client".to_string(),
|
||||
runtime: sample_runtime_metadata(),
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: 8,
|
||||
params: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "child-client".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
},
|
||||
product_client_id: "child-client".to_string(),
|
||||
runtime: sample_runtime_metadata(),
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(1),
|
||||
response: Box::new(sample_thread_start_response(
|
||||
&parent_thread_id_string,
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 8,
|
||||
request_id: RequestId::Integer(2),
|
||||
response: Box::new(sample_thread_resume_response_with_source(
|
||||
"thread-review",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
events.clear();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
|
||||
SubAgentThreadStartedInput {
|
||||
thread_id: "thread-review".to_string(),
|
||||
parent_thread_id: Some(parent_thread_id_string),
|
||||
product_client_id: "child-client".to_string(),
|
||||
client_name: "child-client".to_string(),
|
||||
client_version: "1.0.0".to_string(),
|
||||
model: "gpt-5".to_string(),
|
||||
ephemeral: false,
|
||||
subagent_source: SubAgentSource::Other("guardian".to_string()),
|
||||
created_at: 130,
|
||||
},
|
||||
)),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
events.clear();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
|
||||
CodexCompactionEvent {
|
||||
thread_id: "thread-review".to_string(),
|
||||
turn_id: "turn-compact".to_string(),
|
||||
trigger: CompactionTrigger::Manual,
|
||||
reason: CompactionReason::UserRequested,
|
||||
implementation: CompactionImplementation::Responses,
|
||||
phase: CompactionPhase::StandaloneTurn,
|
||||
strategy: CompactionStrategy::Memento,
|
||||
status: CompactionStatus::Failed,
|
||||
error: Some("context limit exceeded".to_string()),
|
||||
active_context_tokens_before: 131_000,
|
||||
active_context_tokens_after: 131_000,
|
||||
started_at: 100,
|
||||
completed_at: 101,
|
||||
duration_ms: Some(1200),
|
||||
},
|
||||
))),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(payload.as_array().expect("events array").len(), 1);
|
||||
assert_eq!(payload[0]["event_type"], "codex_compaction_event");
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["app_server_client"]["product_client_id"],
|
||||
"child-client"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subagent_thread_started_preserves_resumed_thread_metadata() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
let parent_thread_id =
|
||||
codex_protocol::ThreadId::from_string("33333333-3333-3333-3333-333333333333")
|
||||
.expect("valid parent thread id");
|
||||
let parent_thread_id_string = parent_thread_id.to_string();
|
||||
|
||||
ingest_initialize(&mut reducer, &mut events).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(1),
|
||||
response: Box::new(sample_thread_start_response(
|
||||
&parent_thread_id_string,
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
response: Box::new(sample_thread_resume_response_with_source(
|
||||
"thread-review",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
events.clear();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
|
||||
SubAgentThreadStartedInput {
|
||||
thread_id: "thread-review".to_string(),
|
||||
parent_thread_id: Some(parent_thread_id_string.clone()),
|
||||
product_client_id: "codex-tui".to_string(),
|
||||
client_name: "codex-tui".to_string(),
|
||||
client_version: "1.0.0".to_string(),
|
||||
model: "gpt-5".to_string(),
|
||||
ephemeral: false,
|
||||
subagent_source: SubAgentSource::Other("guardian".to_string()),
|
||||
created_at: 129,
|
||||
},
|
||||
)),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
events.clear();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
request: Box::new(sample_turn_start_request(
|
||||
"thread-review",
|
||||
/*request_id*/ 3,
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
response: Box::new(sample_turn_start_response("turn-review")),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
let mut resolved_config = sample_turn_resolved_config("turn-review");
|
||||
resolved_config.thread_id = "thread-review".to_string();
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new(
|
||||
resolved_config,
|
||||
))),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_started_notification(
|
||||
"thread-review",
|
||||
"turn-review",
|
||||
))),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::TurnTokenUsage(Box::new(
|
||||
sample_turn_token_usage_fact("thread-review", "turn-review"),
|
||||
))),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
|
||||
"thread-review",
|
||||
"turn-review",
|
||||
AppServerTurnStatus::Completed,
|
||||
/*codex_error_info*/ None,
|
||||
))),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(payload.as_array().expect("events array").len(), 1);
|
||||
assert_eq!(payload[0]["event_type"], "codex_turn_event");
|
||||
assert_eq!(payload[0]["event_params"]["initialization_mode"], "resumed");
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["subagent_source"],
|
||||
"thread_spawn"
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["parent_thread_id"],
|
||||
parent_thread_id_string
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn plugin_used_event_serializes_expected_shape() {
|
||||
let tracking = TrackEventsContext {
|
||||
@@ -1989,7 +2358,7 @@ async fn accepted_turn_steer_emits_expected_event() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejected_turn_steer_uses_request_connection_metadata() {
|
||||
async fn rejected_turn_steer_uses_thread_connection_metadata() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
let payload = ingest_rejected_turn_steer(
|
||||
|
||||
@@ -74,8 +74,7 @@ pub(crate) struct AnalyticsReducer {
|
||||
requests: HashMap<(u64, RequestId), RequestState>,
|
||||
turns: HashMap<String, TurnState>,
|
||||
connections: HashMap<u64, ConnectionState>,
|
||||
thread_connections: HashMap<String, u64>,
|
||||
thread_metadata: HashMap<String, ThreadMetadataState>,
|
||||
threads: HashMap<String, ThreadAnalyticsState>,
|
||||
}
|
||||
|
||||
struct ConnectionState {
|
||||
@@ -83,6 +82,69 @@ struct ConnectionState {
|
||||
runtime: CodexRuntimeMetadata,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ThreadAnalyticsState {
|
||||
connection_id: Option<u64>,
|
||||
metadata: Option<ThreadMetadataState>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct AnalyticsDropSite<'a> {
|
||||
event_name: &'static str,
|
||||
thread_id: &'a str,
|
||||
turn_id: Option<&'a str>,
|
||||
review_id: Option<&'a str>,
|
||||
item_id: Option<&'a str>,
|
||||
}
|
||||
|
||||
impl<'a> AnalyticsDropSite<'a> {
|
||||
fn guardian(input: &'a GuardianReviewEventParams) -> Self {
|
||||
Self {
|
||||
event_name: "guardian",
|
||||
thread_id: &input.thread_id,
|
||||
turn_id: Some(&input.turn_id),
|
||||
review_id: Some(&input.review_id),
|
||||
item_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn compaction(input: &'a CodexCompactionEvent) -> Self {
|
||||
Self {
|
||||
event_name: "compaction",
|
||||
thread_id: &input.thread_id,
|
||||
turn_id: Some(&input.turn_id),
|
||||
review_id: None,
|
||||
item_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn turn_steer(thread_id: &'a str) -> Self {
|
||||
Self {
|
||||
event_name: "turn steer",
|
||||
thread_id,
|
||||
turn_id: None,
|
||||
review_id: None,
|
||||
item_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn turn(thread_id: &'a str, turn_id: &'a str) -> Self {
|
||||
Self {
|
||||
event_name: "turn",
|
||||
thread_id,
|
||||
turn_id: Some(turn_id),
|
||||
review_id: None,
|
||||
item_id: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum MissingAnalyticsContext {
|
||||
ThreadConnection,
|
||||
Connection { connection_id: u64 },
|
||||
ThreadMetadata,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ThreadMetadataState {
|
||||
thread_source: Option<&'static str>,
|
||||
@@ -144,7 +206,6 @@ struct CompletedTurnState {
|
||||
}
|
||||
|
||||
struct TurnState {
|
||||
connection_id: Option<u64>,
|
||||
thread_id: Option<String>,
|
||||
num_input_images: Option<usize>,
|
||||
resolved_config: Option<TurnResolvedConfigFact>,
|
||||
@@ -274,6 +335,26 @@ impl AnalyticsReducer {
|
||||
input: SubAgentThreadStartedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let parent_thread_id = input
|
||||
.parent_thread_id
|
||||
.clone()
|
||||
.or_else(|| subagent_parent_thread_id(&input.subagent_source));
|
||||
let parent_connection_id = parent_thread_id
|
||||
.as_ref()
|
||||
.and_then(|parent_thread_id| self.threads.get(parent_thread_id))
|
||||
.and_then(|thread| thread.connection_id);
|
||||
let thread_state = self.threads.entry(input.thread_id.clone()).or_default();
|
||||
thread_state
|
||||
.metadata
|
||||
.get_or_insert_with(|| ThreadMetadataState {
|
||||
thread_source: Some("subagent"),
|
||||
initialization_mode: ThreadInitializationMode::New,
|
||||
subagent_source: Some(subagent_source_name(&input.subagent_source)),
|
||||
parent_thread_id,
|
||||
});
|
||||
if thread_state.connection_id.is_none() {
|
||||
thread_state.connection_id = parent_connection_id;
|
||||
}
|
||||
out.push(TrackEventRequest::ThreadInitialized(
|
||||
subagent_thread_started_event_request(input),
|
||||
));
|
||||
@@ -284,23 +365,9 @@ impl AnalyticsReducer {
|
||||
input: GuardianReviewEventParams,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
|
||||
tracing::warn!(
|
||||
thread_id = %input.thread_id,
|
||||
turn_id = %input.turn_id,
|
||||
review_id = %input.review_id,
|
||||
"dropping guardian analytics event: missing thread connection metadata"
|
||||
);
|
||||
return;
|
||||
};
|
||||
let Some(connection_state) = self.connections.get(connection_id) else {
|
||||
tracing::warn!(
|
||||
thread_id = %input.thread_id,
|
||||
turn_id = %input.turn_id,
|
||||
review_id = %input.review_id,
|
||||
connection_id,
|
||||
"dropping guardian analytics event: missing connection metadata"
|
||||
);
|
||||
let Some(connection_state) =
|
||||
self.thread_connection_or_warn(AnalyticsDropSite::guardian(&input))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
out.push(TrackEventRequest::GuardianReview(Box::new(
|
||||
@@ -355,7 +422,6 @@ impl AnalyticsReducer {
|
||||
let thread_id = input.thread_id.clone();
|
||||
let num_input_images = input.num_input_images;
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
@@ -377,7 +443,6 @@ impl AnalyticsReducer {
|
||||
) {
|
||||
let turn_id = input.turn_id.clone();
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
@@ -538,7 +603,6 @@ impl AnalyticsReducer {
|
||||
return;
|
||||
};
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
@@ -547,7 +611,6 @@ impl AnalyticsReducer {
|
||||
completed: None,
|
||||
steer_count: 0,
|
||||
});
|
||||
turn_state.connection_id = Some(connection_id);
|
||||
turn_state.thread_id = Some(pending_request.thread_id);
|
||||
turn_state.num_input_images = Some(pending_request.num_input_images);
|
||||
self.maybe_emit_turn_event(&turn_id, out);
|
||||
@@ -597,13 +660,12 @@ impl AnalyticsReducer {
|
||||
|
||||
fn ingest_turn_steer_error_response(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
_connection_id: u64,
|
||||
pending_request: PendingTurnSteerState,
|
||||
error_type: Option<AnalyticsJsonRpcError>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
self.emit_turn_steer_event(
|
||||
connection_id,
|
||||
pending_request,
|
||||
/*accepted_turn_id*/ None,
|
||||
TurnSteerResult::Rejected,
|
||||
@@ -620,7 +682,6 @@ impl AnalyticsReducer {
|
||||
match notification {
|
||||
ServerNotification::TurnStarted(notification) => {
|
||||
let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
@@ -639,7 +700,6 @@ impl AnalyticsReducer {
|
||||
self.turns
|
||||
.entry(notification.turn.id.clone())
|
||||
.or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
@@ -686,10 +746,13 @@ impl AnalyticsReducer {
|
||||
};
|
||||
let thread_metadata =
|
||||
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
|
||||
self.thread_connections
|
||||
.insert(thread_id.clone(), connection_id);
|
||||
self.thread_metadata
|
||||
.insert(thread_id.clone(), thread_metadata.clone());
|
||||
self.threads.insert(
|
||||
thread_id.clone(),
|
||||
ThreadAnalyticsState {
|
||||
connection_id: Some(connection_id),
|
||||
metadata: Some(thread_metadata.clone()),
|
||||
},
|
||||
);
|
||||
out.push(TrackEventRequest::ThreadInitialized(
|
||||
ThreadInitializedEvent {
|
||||
event_type: "codex_thread_initialized",
|
||||
@@ -710,29 +773,9 @@ impl AnalyticsReducer {
|
||||
}
|
||||
|
||||
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
|
||||
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
|
||||
tracing::warn!(
|
||||
thread_id = %input.thread_id,
|
||||
turn_id = %input.turn_id,
|
||||
"dropping compaction analytics event: missing thread connection metadata"
|
||||
);
|
||||
return;
|
||||
};
|
||||
let Some(connection_state) = self.connections.get(connection_id) else {
|
||||
tracing::warn!(
|
||||
thread_id = %input.thread_id,
|
||||
turn_id = %input.turn_id,
|
||||
connection_id,
|
||||
"dropping compaction analytics event: missing connection metadata"
|
||||
);
|
||||
return;
|
||||
};
|
||||
let Some(thread_metadata) = self.thread_metadata.get(&input.thread_id) else {
|
||||
tracing::warn!(
|
||||
thread_id = %input.thread_id,
|
||||
turn_id = %input.turn_id,
|
||||
"dropping compaction analytics event: missing thread lifecycle metadata"
|
||||
);
|
||||
let Some((connection_state, thread_metadata)) =
|
||||
self.thread_context_or_warn(AnalyticsDropSite::compaction(&input))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
out.push(TrackEventRequest::Compaction(Box::new(
|
||||
@@ -766,7 +809,6 @@ impl AnalyticsReducer {
|
||||
turn_state.steer_count += 1;
|
||||
}
|
||||
self.emit_turn_steer_event(
|
||||
connection_id,
|
||||
pending_request,
|
||||
Some(response.turn_id),
|
||||
TurnSteerResult::Accepted,
|
||||
@@ -777,21 +819,15 @@ impl AnalyticsReducer {
|
||||
|
||||
fn emit_turn_steer_event(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
pending_request: PendingTurnSteerState,
|
||||
accepted_turn_id: Option<String>,
|
||||
result: TurnSteerResult,
|
||||
rejection_reason: Option<TurnSteerRejectionReason>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let Some(connection_state) = self.connections.get(&connection_id) else {
|
||||
return;
|
||||
};
|
||||
let Some(thread_metadata) = self.thread_metadata.get(&pending_request.thread_id) else {
|
||||
tracing::warn!(
|
||||
thread_id = %pending_request.thread_id,
|
||||
"dropping turn steer analytics event: missing thread lifecycle metadata"
|
||||
);
|
||||
let Some((connection_state, thread_metadata)) =
|
||||
self.thread_context_or_warn(AnalyticsDropSite::turn_steer(&pending_request.thread_id))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
out.push(TrackEventRequest::TurnSteer(CodexTurnSteerEventRequest {
|
||||
@@ -824,42 +860,20 @@ impl AnalyticsReducer {
|
||||
{
|
||||
return;
|
||||
}
|
||||
let connection_metadata = turn_state
|
||||
.connection_id
|
||||
.and_then(|connection_id| self.connections.get(&connection_id))
|
||||
.map(|connection_state| {
|
||||
(
|
||||
connection_state.app_server_client.clone(),
|
||||
connection_state.runtime.clone(),
|
||||
)
|
||||
});
|
||||
let Some((app_server_client, runtime)) = connection_metadata else {
|
||||
if let Some(connection_id) = turn_state.connection_id {
|
||||
tracing::warn!(
|
||||
turn_id,
|
||||
connection_id,
|
||||
"dropping turn analytics event: missing connection metadata"
|
||||
);
|
||||
}
|
||||
return;
|
||||
};
|
||||
let Some(thread_id) = turn_state.thread_id.as_ref() else {
|
||||
return;
|
||||
};
|
||||
let Some(thread_metadata) = self.thread_metadata.get(thread_id) else {
|
||||
tracing::warn!(
|
||||
thread_id,
|
||||
turn_id,
|
||||
"dropping turn analytics event: missing thread lifecycle metadata"
|
||||
);
|
||||
let Some((connection_state, thread_metadata)) =
|
||||
self.thread_context_or_warn(AnalyticsDropSite::turn(thread_id, turn_id))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
out.push(TrackEventRequest::TurnEvent(Box::new(
|
||||
CodexTurnEventRequest {
|
||||
event_type: "codex_turn_event",
|
||||
event_params: codex_turn_event_params(
|
||||
app_server_client,
|
||||
runtime,
|
||||
connection_state.app_server_client.clone(),
|
||||
connection_state.runtime.clone(),
|
||||
turn_id.to_string(),
|
||||
turn_state,
|
||||
thread_metadata,
|
||||
@@ -868,6 +882,90 @@ impl AnalyticsReducer {
|
||||
)));
|
||||
self.turns.remove(turn_id);
|
||||
}
|
||||
|
||||
fn thread_connection_or_warn(
|
||||
&self,
|
||||
drop_site: AnalyticsDropSite<'_>,
|
||||
) -> Option<&ConnectionState> {
|
||||
let connection_id = self.thread_connection_id_or_warn(&drop_site)?;
|
||||
let Some(connection_state) = self.connections.get(&connection_id) else {
|
||||
warn_missing_analytics_context(
|
||||
&drop_site,
|
||||
MissingAnalyticsContext::Connection { connection_id },
|
||||
);
|
||||
return None;
|
||||
};
|
||||
Some(connection_state)
|
||||
}
|
||||
|
||||
fn thread_context_or_warn(
|
||||
&self,
|
||||
drop_site: AnalyticsDropSite<'_>,
|
||||
) -> Option<(&ConnectionState, &ThreadMetadataState)> {
|
||||
let connection_state = self.thread_connection_or_warn(drop_site)?;
|
||||
let Some(thread_metadata) = self.thread_metadata(drop_site.thread_id) else {
|
||||
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
|
||||
return None;
|
||||
};
|
||||
Some((connection_state, thread_metadata))
|
||||
}
|
||||
|
||||
fn thread_connection_id_or_warn(&self, drop_site: &AnalyticsDropSite<'_>) -> Option<u64> {
|
||||
let Some(thread_state) = self.threads.get(drop_site.thread_id) else {
|
||||
warn_missing_analytics_context(drop_site, MissingAnalyticsContext::ThreadConnection);
|
||||
return None;
|
||||
};
|
||||
let Some(connection_id) = thread_state.connection_id else {
|
||||
warn_missing_analytics_context(drop_site, MissingAnalyticsContext::ThreadConnection);
|
||||
return None;
|
||||
};
|
||||
Some(connection_id)
|
||||
}
|
||||
|
||||
fn thread_metadata(&self, thread_id: &str) -> Option<&ThreadMetadataState> {
|
||||
self.threads
|
||||
.get(thread_id)
|
||||
.and_then(|thread| thread.metadata.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
fn warn_missing_analytics_context(
|
||||
drop_site: &AnalyticsDropSite<'_>,
|
||||
missing: MissingAnalyticsContext,
|
||||
) {
|
||||
match missing {
|
||||
MissingAnalyticsContext::ThreadConnection => {
|
||||
tracing::warn!(
|
||||
thread_id = %drop_site.thread_id,
|
||||
turn_id = ?drop_site.turn_id,
|
||||
review_id = ?drop_site.review_id,
|
||||
item_id = ?drop_site.item_id,
|
||||
"dropping {} analytics event: missing thread connection metadata",
|
||||
drop_site.event_name
|
||||
);
|
||||
}
|
||||
MissingAnalyticsContext::Connection { connection_id } => {
|
||||
tracing::warn!(
|
||||
thread_id = %drop_site.thread_id,
|
||||
turn_id = ?drop_site.turn_id,
|
||||
review_id = ?drop_site.review_id,
|
||||
item_id = ?drop_site.item_id,
|
||||
connection_id,
|
||||
"dropping {} analytics event: missing connection metadata",
|
||||
drop_site.event_name
|
||||
);
|
||||
}
|
||||
MissingAnalyticsContext::ThreadMetadata => {
|
||||
tracing::warn!(
|
||||
thread_id = %drop_site.thread_id,
|
||||
turn_id = ?drop_site.turn_id,
|
||||
review_id = ?drop_site.review_id,
|
||||
item_id = ?drop_site.item_id,
|
||||
"dropping {} analytics event: missing thread lifecycle metadata",
|
||||
drop_site.event_name
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn codex_turn_event_params(
|
||||
|
||||
Reference in New Issue
Block a user