diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 73ea42d760..0fced56907 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -6,6 +6,7 @@ use crate::events::CodexAppUsedEventRequest; use crate::events::CodexPluginEventRequest; use crate::events::CodexPluginUsedEventRequest; use crate::events::CodexRuntimeMetadata; +use crate::events::CodexTurnEventRequest; use crate::events::ThreadInitializationMode; use crate::events::ThreadInitializedEvent; use crate::events::ThreadInitializedEventParams; @@ -27,28 +28,43 @@ use crate::facts::SkillInvocation; use crate::facts::SkillInvokedInput; use crate::facts::SubAgentThreadStartedInput; use crate::facts::TrackEventsContext; +use crate::facts::TurnResolvedConfigFact; +use crate::facts::TurnStatus; use crate::reducer::AnalyticsReducer; use crate::reducer::normalize_path_for_skill_id; use crate::reducer::skill_id_for_local_skill; use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer; use codex_app_server_protocol::AskForApproval as AppServerAskForApproval; use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ClientResponse; use codex_app_server_protocol::InitializeCapabilities; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy; +use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::SessionSource as AppServerSessionSource; use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus; +use codex_app_server_protocol::Turn; +use codex_app_server_protocol::TurnCompletedNotification; +use codex_app_server_protocol::TurnError as AppServerTurnError; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartedNotification; +use codex_app_server_protocol::TurnStatus as AppServerTurnStatus; +use codex_app_server_protocol::UserInput; use codex_login::default_client::DEFAULT_ORIGINATOR; use codex_login::default_client::originator; use codex_plugin::AppConnectorId; use codex_plugin::PluginCapabilitySummary; use codex_plugin::PluginId; use codex_plugin::PluginTelemetryMetadata; +use codex_protocol::config_types::ApprovalsReviewer; +use codex_protocol::config_types::ModeKind; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SubAgentSource; use pretty_assertions::assert_eq; use serde_json::json; @@ -114,6 +130,179 @@ fn sample_thread_resume_response(thread_id: &str, ephemeral: bool, model: &str) } } +fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest { + ClientRequest::TurnStart { + request_id: RequestId::Integer(request_id), + params: TurnStartParams { + thread_id: thread_id.to_string(), + input: vec![ + UserInput::Text { + text: "hello".to_string(), + text_elements: vec![], + }, + UserInput::Image { + url: "https://example.com/a.png".to_string(), + }, + ], + ..Default::default() + }, + } +} + +fn sample_turn_start_response(turn_id: &str, request_id: i64) -> ClientResponse { + ClientResponse::TurnStart { + request_id: RequestId::Integer(request_id), + response: codex_app_server_protocol::TurnStartResponse { + turn: Turn { + id: turn_id.to_string(), + items: vec![], + status: AppServerTurnStatus::InProgress, + error: None, + started_at: None, + completed_at: None, + duration_ms: None, + }, + }, + } +} + +fn sample_turn_started_notification(thread_id: &str, turn_id: &str) -> ServerNotification { + ServerNotification::TurnStarted(TurnStartedNotification { + thread_id: thread_id.to_string(), + turn: Turn { + id: turn_id.to_string(), + items: vec![], + status: AppServerTurnStatus::InProgress, + error: None, + started_at: Some(455), + completed_at: None, + duration_ms: None, + }, + }) +} + +fn sample_turn_completed_notification( + thread_id: &str, + turn_id: &str, + status: AppServerTurnStatus, + codex_error_info: Option, +) -> ServerNotification { + ServerNotification::TurnCompleted(TurnCompletedNotification { + thread_id: thread_id.to_string(), + turn: Turn { + id: turn_id.to_string(), + items: vec![], + status, + error: codex_error_info.map(|codex_error_info| AppServerTurnError { + message: "turn failed".to_string(), + codex_error_info: Some(codex_error_info), + additional_details: None, + }), + started_at: None, + completed_at: Some(456), + duration_ms: Some(1234), + }, + }) +} + +fn sample_turn_resolved_config(turn_id: &str) -> TurnResolvedConfigFact { + TurnResolvedConfigFact { + turn_id: turn_id.to_string(), + thread_id: "thread-2".to_string(), + num_input_images: 1, + submission_type: None, + model: "gpt-5".to_string(), + model_provider: "openai".to_string(), + sandbox_policy: SandboxPolicy::new_read_only_policy(), + reasoning_effort: None, + reasoning_summary: None, + service_tier: None, + approval_policy: AskForApproval::OnRequest, + approvals_reviewer: ApprovalsReviewer::GuardianSubagent, + sandbox_network_access: true, + collaboration_mode: ModeKind::Plan, + personality: None, + is_first_turn: true, + } +} + +async fn ingest_turn_prerequisites( + reducer: &mut AnalyticsReducer, + out: &mut Vec, + include_initialize: bool, + include_resolved_config: bool, + include_started: bool, +) { + if include_initialize { + reducer + .ingest( + AnalyticsFact::Initialize { + connection_id: 7, + params: InitializeParams { + client_info: ClientInfo { + name: "codex-tui".to_string(), + title: None, + version: "1.0.0".to_string(), + }, + capabilities: None, + }, + product_client_id: "codex-tui".to_string(), + runtime: CodexRuntimeMetadata { + codex_rs_version: "0.1.0".to_string(), + runtime_os: "macos".to_string(), + runtime_os_version: "15.3.1".to_string(), + runtime_arch: "aarch64".to_string(), + }, + rpc_transport: AppServerRpcTransport::Stdio, + }, + out, + ) + .await; + } + + reducer + .ingest( + AnalyticsFact::Request { + connection_id: 7, + request_id: RequestId::Integer(3), + request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)), + }, + out, + ) + .await; + reducer + .ingest( + AnalyticsFact::Response { + connection_id: 7, + response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)), + }, + out, + ) + .await; + + if include_resolved_config { + reducer + .ingest( + AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new( + sample_turn_resolved_config("turn-2"), + ))), + out, + ) + .await; + } + + if include_started { + reducer + .ingest( + AnalyticsFact::Notification(Box::new(sample_turn_started_notification( + "thread-2", "turn-2", + ))), + out, + ) + .await; + } +} + fn expected_absolute_path(path: &PathBuf) -> String { std::fs::canonicalize(path) .unwrap_or_else(|_| path.to_path_buf()) @@ -823,6 +1012,215 @@ async fn reducer_ingests_plugin_state_changed_fact() { ); } +#[test] +fn turn_event_serializes_expected_shape() { + let event = TrackEventRequest::TurnEvent(Box::new(CodexTurnEventRequest { + event_type: "codex_turn_event", + event_params: crate::events::CodexTurnEventParams { + thread_id: "thread-2".to_string(), + turn_id: "turn-2".to_string(), + product_client_id: "codex-tui".to_string(), + submission_type: None, + model: Some("gpt-5".to_string()), + model_provider: "openai".to_string(), + sandbox_policy: Some("read_only"), + reasoning_effort: Some("high".to_string()), + reasoning_summary: Some("detailed".to_string()), + service_tier: "flex".to_string(), + approval_policy: "on-request".to_string(), + approvals_reviewer: "guardian_subagent".to_string(), + sandbox_network_access: true, + collaboration_mode: Some("plan"), + personality: Some("pragmatic".to_string()), + num_input_images: 2, + is_first_turn: true, + status: Some(TurnStatus::Completed), + turn_error: None, + steer_count: None, + total_tool_call_count: None, + shell_command_count: None, + file_change_count: None, + mcp_tool_call_count: None, + dynamic_tool_call_count: None, + subagent_tool_call_count: None, + web_search_count: None, + image_generation_count: None, + duration_ms: Some(1234), + started_at: Some(455), + completed_at: Some(456), + }, + })); + + let payload = serde_json::to_value(&event).expect("serialize turn event"); + + assert_eq!( + payload, + json!({ + "event_type": "codex_turn_event", + "event_params": { + "thread_id": "thread-2", + "turn_id": "turn-2", + "product_client_id": "codex-tui", + "submission_type": null, + "model": "gpt-5", + "model_provider": "openai", + "sandbox_policy": "read_only", + "reasoning_effort": "high", + "reasoning_summary": "detailed", + "service_tier": "flex", + "approval_policy": "on-request", + "approvals_reviewer": "guardian_subagent", + "sandbox_network_access": true, + "collaboration_mode": "plan", + "personality": "pragmatic", + "num_input_images": 2, + "is_first_turn": true, + "status": "completed", + "turn_error": null, + "steer_count": null, + "total_tool_call_count": null, + "shell_command_count": null, + "file_change_count": null, + "mcp_tool_call_count": null, + "dynamic_tool_call_count": null, + "subagent_tool_call_count": null, + "web_search_count": null, + "image_generation_count": null, + "duration_ms": 1234, + "created_at": 455, + "completed_at": 456 + } + }) + ); +} + +#[tokio::test] +async fn turn_lifecycle_emits_turn_event() { + let mut reducer = AnalyticsReducer::default(); + let mut out = Vec::new(); + + ingest_turn_prerequisites( + &mut reducer, + &mut out, + /*include_initialize*/ true, + /*include_resolved_config*/ true, + /*include_started*/ true, + ) + .await; + reducer + .ingest( + AnalyticsFact::Notification(Box::new(sample_turn_completed_notification( + "thread-2", + "turn-2", + AppServerTurnStatus::Completed, + /*codex_error_info*/ None, + ))), + &mut out, + ) + .await; + + assert_eq!(out.len(), 1); + let payload = serde_json::to_value(&out[0]).expect("serialize turn event"); + assert_eq!(payload["event_type"], json!("codex_turn_event")); + assert_eq!(payload["event_params"]["thread_id"], json!("thread-2")); + assert_eq!(payload["event_params"]["turn_id"], json!("turn-2")); + assert_eq!( + payload["event_params"]["product_client_id"], + json!("codex-tui") + ); + assert_eq!(payload["event_params"]["num_input_images"], json!(1)); + assert_eq!(payload["event_params"]["status"], json!("completed")); + assert_eq!(payload["event_params"]["started_at"], json!(455)); + assert_eq!(payload["event_params"]["completed_at"], json!(456)); + assert_eq!(payload["event_params"]["duration_ms"], json!(1234)); +} + +#[tokio::test] +async fn turn_does_not_emit_without_required_prerequisites() { + let mut reducer = AnalyticsReducer::default(); + let mut out = Vec::new(); + + ingest_turn_prerequisites( + &mut reducer, + &mut out, + /*include_initialize*/ false, + /*include_resolved_config*/ true, + /*include_started*/ false, + ) + .await; + reducer + .ingest( + AnalyticsFact::Notification(Box::new(sample_turn_completed_notification( + "thread-2", + "turn-2", + AppServerTurnStatus::Completed, + /*codex_error_info*/ None, + ))), + &mut out, + ) + .await; + assert_eq!(out.len(), 1); + let payload = serde_json::to_value(&out[0]).expect("serialize turn event"); + assert_eq!( + payload["event_params"]["product_client_id"], + json!(originator().value) + ); + + let mut reducer = AnalyticsReducer::default(); + let mut out = Vec::new(); + + ingest_turn_prerequisites( + &mut reducer, + &mut out, + /*include_initialize*/ true, + /*include_resolved_config*/ false, + /*include_started*/ false, + ) + .await; + reducer + .ingest( + AnalyticsFact::Notification(Box::new(sample_turn_completed_notification( + "thread-2", + "turn-2", + AppServerTurnStatus::Completed, + /*codex_error_info*/ None, + ))), + &mut out, + ) + .await; + assert!(out.is_empty()); +} + +#[tokio::test] +async fn turn_completed_without_started_notification_emits_null_started_at() { + let mut reducer = AnalyticsReducer::default(); + let mut out = Vec::new(); + + ingest_turn_prerequisites( + &mut reducer, + &mut out, + /*include_initialize*/ true, + /*include_resolved_config*/ true, + /*include_started*/ false, + ) + .await; + reducer + .ingest( + AnalyticsFact::Notification(Box::new(sample_turn_completed_notification( + "thread-2", + "turn-2", + AppServerTurnStatus::Completed, + /*codex_error_info*/ None, + ))), + &mut out, + ) + .await; + + let payload = serde_json::to_value(&out[0]).expect("serialize turn event"); + assert_eq!(payload["event_params"]["started_at"], json!(null)); + assert_eq!(payload["event_params"]["duration_ms"], json!(1234)); +} + fn sample_plugin_metadata() -> PluginTelemetryMetadata { PluginTelemetryMetadata { plugin_id: PluginId::parse("sample@test").expect("valid plugin id"), diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index dd300a7bd6..e61affe76c 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -13,9 +13,13 @@ use crate::facts::SkillInvocation; use crate::facts::SkillInvokedInput; use crate::facts::SubAgentThreadStartedInput; use crate::facts::TrackEventsContext; +use crate::facts::TurnResolvedConfigFact; use crate::reducer::AnalyticsReducer; +use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ClientResponse; use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerNotification; use codex_login::AuthManager; use codex_login::default_client::create_client; use codex_plugin::PluginTelemetryMetadata; @@ -160,6 +164,14 @@ impl AnalyticsEventsClient { ))); } + pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) { + self.record_fact(AnalyticsFact::Request { + connection_id, + request_id, + request: Box::new(request), + }); + } + pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) { if !self.queue.should_enqueue_app_used(&tracking, &app) { return; @@ -178,6 +190,12 @@ impl AnalyticsEventsClient { ))); } + pub fn track_turn_resolved_config(&self, fact: TurnResolvedConfigFact) { + self.record_fact(AnalyticsFact::Custom( + CustomAnalyticsFact::TurnResolvedConfig(Box::new(fact)), + )); + } + pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) { self.record_fact(AnalyticsFact::Custom( CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput { @@ -227,6 +245,10 @@ impl AnalyticsEventsClient { response: Box::new(response), }); } + + pub fn track_notification(&self, notification: ServerNotification) { + self.record_fact(AnalyticsFact::Notification(Box::new(notification))); + } } async fn send_track_events( diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs index 885e93bbb9..1947fd506d 100644 --- a/codex-rs/analytics/src/events.rs +++ b/codex-rs/analytics/src/events.rs @@ -3,6 +3,9 @@ use crate::facts::InvocationType; use crate::facts::PluginState; use crate::facts::SubAgentThreadStartedInput; use crate::facts::TrackEventsContext; +use crate::facts::TurnStatus; +use crate::facts::TurnSubmissionType; +use codex_app_server_protocol::CodexErrorInfo; use codex_login::default_client::originator; use codex_plugin::PluginTelemetryMetadata; use codex_protocol::protocol::SessionSource; @@ -37,6 +40,7 @@ pub(crate) enum TrackEventRequest { ThreadInitialized(ThreadInitializedEvent), AppMentioned(CodexAppMentionedEventRequest), AppUsed(CodexAppUsedEventRequest), + TurnEvent(Box), PluginUsed(CodexPluginUsedEventRequest), PluginInstalled(CodexPluginEventRequest), PluginUninstalled(CodexPluginEventRequest), @@ -122,6 +126,47 @@ pub(crate) struct CodexAppUsedEventRequest { pub(crate) event_params: CodexAppMetadata, } +#[derive(Serialize)] +pub(crate) struct CodexTurnEventParams { + pub(crate) thread_id: String, + pub(crate) turn_id: String, + pub(crate) product_client_id: String, + pub(crate) submission_type: Option, + pub(crate) model: Option, + pub(crate) model_provider: String, + pub(crate) sandbox_policy: Option<&'static str>, + pub(crate) reasoning_effort: Option, + pub(crate) reasoning_summary: Option, + pub(crate) service_tier: String, + pub(crate) approval_policy: String, + pub(crate) approvals_reviewer: String, + pub(crate) sandbox_network_access: bool, + pub(crate) collaboration_mode: Option<&'static str>, + pub(crate) personality: Option, + pub(crate) num_input_images: usize, + pub(crate) is_first_turn: bool, + pub(crate) status: Option, + pub(crate) turn_error: Option, + pub(crate) steer_count: Option, + pub(crate) total_tool_call_count: Option, + pub(crate) shell_command_count: Option, + pub(crate) file_change_count: Option, + pub(crate) mcp_tool_call_count: Option, + pub(crate) dynamic_tool_call_count: Option, + pub(crate) subagent_tool_call_count: Option, + pub(crate) web_search_count: Option, + pub(crate) image_generation_count: Option, + pub(crate) duration_ms: Option, + pub(crate) started_at: Option, + pub(crate) completed_at: Option, +} + +#[derive(Serialize)] +pub(crate) struct CodexTurnEventRequest { + pub(crate) event_type: &'static str, + pub(crate) event_params: CodexTurnEventParams, +} + #[derive(Serialize)] pub(crate) struct CodexPluginMetadata { pub(crate) plugin_id: Option, diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index e19d15d847..fdc45cb9b3 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -6,6 +6,14 @@ use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_plugin::PluginTelemetryMetadata; +use codex_protocol::config_types::ApprovalsReviewer; +use codex_protocol::config_types::ModeKind; +use codex_protocol::config_types::Personality; +use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::config_types::ServiceTier; +use codex_protocol::openai_models::ReasoningEffort; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SkillScope; use codex_protocol::protocol::SubAgentSource; use serde::Serialize; @@ -30,6 +38,41 @@ pub fn build_track_events_context( } } +#[derive(Clone)] +pub struct TurnResolvedConfigFact { + pub turn_id: String, + pub thread_id: String, + pub num_input_images: usize, + pub submission_type: Option, + pub model: String, + pub model_provider: String, + pub sandbox_policy: SandboxPolicy, + pub reasoning_effort: Option, + pub reasoning_summary: Option, + pub service_tier: Option, + pub approval_policy: AskForApproval, + pub approvals_reviewer: ApprovalsReviewer, + pub sandbox_network_access: bool, + pub collaboration_mode: ModeKind, + pub personality: Option, + pub is_first_turn: bool, +} + +#[derive(Clone, Copy, Debug, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum TurnSubmissionType { + Default, + Queued, +} + +#[derive(Clone, Copy, Debug, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum TurnStatus { + Completed, + Failed, + Interrupted, +} + #[derive(Clone, Debug)] pub struct SkillInvocation { pub skill_name: String, @@ -89,6 +132,7 @@ pub(crate) enum AnalyticsFact { pub(crate) enum CustomAnalyticsFact { SubAgentThreadStarted(SubAgentThreadStartedInput), + TurnResolvedConfig(Box), SkillInvoked(SkillInvokedInput), AppMentioned(AppMentionedInput), AppUsed(AppUsedInput), diff --git a/codex-rs/analytics/src/lib.rs b/codex-rs/analytics/src/lib.rs index f2f76ca8cf..d3193c6fd1 100644 --- a/codex-rs/analytics/src/lib.rs +++ b/codex-rs/analytics/src/lib.rs @@ -10,6 +10,9 @@ pub use facts::InvocationType; pub use facts::SkillInvocation; pub use facts::SubAgentThreadStartedInput; pub use facts::TrackEventsContext; +pub use facts::TurnResolvedConfigFact; +pub use facts::TurnStatus; +pub use facts::TurnSubmissionType; pub use facts::build_track_events_context; #[cfg(test)] diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 63b9c3d5be..80e6786af5 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -5,6 +5,8 @@ use crate::events::CodexAppUsedEventRequest; use crate::events::CodexPluginEventRequest; use crate::events::CodexPluginUsedEventRequest; use crate::events::CodexRuntimeMetadata; +use crate::events::CodexTurnEventParams; +use crate::events::CodexTurnEventRequest; use crate::events::SkillInvocationEventParams; use crate::events::SkillInvocationEventRequest; use crate::events::ThreadInitializationMode; @@ -26,11 +28,22 @@ use crate::facts::PluginStateChangedInput; use crate::facts::PluginUsedInput; use crate::facts::SkillInvokedInput; use crate::facts::SubAgentThreadStartedInput; +use crate::facts::TurnResolvedConfigFact; +use crate::facts::TurnStatus; +use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ClientResponse; +use codex_app_server_protocol::CodexErrorInfo; use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::UserInput; use codex_git_utils::collect_git_info; use codex_git_utils::get_git_repo_root; use codex_login::default_client::originator; +use codex_protocol::config_types::ModeKind; +use codex_protocol::config_types::Personality; +use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SkillScope; use sha1::Digest; @@ -39,6 +52,8 @@ use std::path::Path; #[derive(Default)] pub(crate) struct AnalyticsReducer { + requests: HashMap<(u64, RequestId), RequestState>, + turns: HashMap, connections: HashMap, } @@ -47,6 +62,32 @@ struct ConnectionState { runtime: CodexRuntimeMetadata, } +enum RequestState { + TurnStart(PendingTurnStartState), +} + +struct PendingTurnStartState { + thread_id: String, + num_input_images: usize, +} + +#[derive(Clone)] +struct CompletedTurnState { + status: Option, + turn_error: Option, + completed_at: u64, + duration_ms: Option, +} + +struct TurnState { + connection_id: Option, + thread_id: Option, + num_input_images: Option, + resolved_config: Option, + started_at: Option, + completed: Option, +} + impl AnalyticsReducer { pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec) { match input { @@ -66,21 +107,28 @@ impl AnalyticsReducer { ); } AnalyticsFact::Request { - connection_id: _connection_id, - request_id: _request_id, - request: _request, - } => {} + connection_id, + request_id, + request, + } => { + self.ingest_request(connection_id, request_id, *request); + } AnalyticsFact::Response { connection_id, response, } => { self.ingest_response(connection_id, *response, out); } - AnalyticsFact::Notification(_notification) => {} + AnalyticsFact::Notification(notification) => { + self.ingest_notification(*notification, out); + } AnalyticsFact::Custom(input) => match input { CustomAnalyticsFact::SubAgentThreadStarted(input) => { self.ingest_subagent_thread_started(input, out); } + CustomAnalyticsFact::TurnResolvedConfig(input) => { + self.ingest_turn_resolved_config(*input, out); + } CustomAnalyticsFact::SkillInvoked(input) => { self.ingest_skill_invoked(input, out).await; } @@ -135,6 +183,52 @@ impl AnalyticsReducer { )); } + fn ingest_request( + &mut self, + connection_id: u64, + request_id: RequestId, + request: ClientRequest, + ) { + let ClientRequest::TurnStart { params, .. } = request else { + return; + }; + self.requests.insert( + (connection_id, request_id), + RequestState::TurnStart(PendingTurnStartState { + thread_id: params.thread_id, + num_input_images: params + .input + .iter() + .filter(|item| { + matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. }) + }) + .count(), + }), + ); + } + + fn ingest_turn_resolved_config( + &mut self, + input: TurnResolvedConfigFact, + out: &mut Vec, + ) { + let turn_id = input.turn_id.clone(); + let thread_id = input.thread_id.clone(); + let num_input_images = input.num_input_images; + let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState { + connection_id: None, + thread_id: None, + num_input_images: None, + resolved_config: None, + started_at: None, + completed: None, + }); + turn_state.thread_id = Some(thread_id); + turn_state.num_input_images = Some(num_input_images); + turn_state.resolved_config = Some(input); + self.maybe_emit_turn_event(&turn_id, out); + } + async fn ingest_skill_invoked( &mut self, input: SkillInvokedInput, @@ -235,24 +329,124 @@ impl AnalyticsReducer { response: ClientResponse, out: &mut Vec, ) { - let (thread, model, initialization_mode) = match response { - ClientResponse::ThreadStart { response, .. } => ( - response.thread, - response.model, - ThreadInitializationMode::New, - ), - ClientResponse::ThreadResume { response, .. } => ( - response.thread, - response.model, - ThreadInitializationMode::Resumed, - ), - ClientResponse::ThreadFork { response, .. } => ( - response.thread, - response.model, - ThreadInitializationMode::Forked, - ), - _ => return, - }; + match response { + ClientResponse::ThreadStart { response, .. } => { + self.emit_thread_initialized( + connection_id, + response.thread, + response.model, + ThreadInitializationMode::New, + out, + ); + } + ClientResponse::ThreadResume { response, .. } => { + self.emit_thread_initialized( + connection_id, + response.thread, + response.model, + ThreadInitializationMode::Resumed, + out, + ); + } + ClientResponse::ThreadFork { response, .. } => { + self.emit_thread_initialized( + connection_id, + response.thread, + response.model, + ThreadInitializationMode::Forked, + out, + ); + } + ClientResponse::TurnStart { + request_id, + response, + } => { + let turn_id = response.turn.id; + let Some(RequestState::TurnStart(pending_request)) = + self.requests.remove(&(connection_id, request_id)) + else { + return; + }; + let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState { + connection_id: None, + thread_id: None, + num_input_images: None, + resolved_config: None, + started_at: None, + completed: None, + }); + turn_state.connection_id = Some(connection_id); + turn_state.thread_id = Some(pending_request.thread_id); + turn_state.num_input_images = Some(pending_request.num_input_images); + self.maybe_emit_turn_event(&turn_id, out); + } + _ => {} + } + } + + fn ingest_notification( + &mut self, + notification: ServerNotification, + out: &mut Vec, + ) { + match notification { + ServerNotification::TurnStarted(notification) => { + let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState { + connection_id: None, + thread_id: None, + num_input_images: None, + resolved_config: None, + started_at: None, + completed: None, + }); + turn_state.started_at = notification + .turn + .started_at + .and_then(|started_at| u64::try_from(started_at).ok()); + } + ServerNotification::TurnCompleted(notification) => { + let turn_state = + self.turns + .entry(notification.turn.id.clone()) + .or_insert(TurnState { + connection_id: None, + thread_id: None, + num_input_images: None, + resolved_config: None, + started_at: None, + completed: None, + }); + turn_state.completed = Some(CompletedTurnState { + status: analytics_turn_status(notification.turn.status), + turn_error: notification + .turn + .error + .and_then(|error| error.codex_error_info), + completed_at: notification + .turn + .completed_at + .and_then(|completed_at| u64::try_from(completed_at).ok()) + .unwrap_or_default(), + duration_ms: notification + .turn + .duration_ms + .and_then(|duration_ms| u64::try_from(duration_ms).ok()), + }); + let turn_id = notification.turn.id; + self.maybe_emit_turn_event(&turn_id, out); + } + _ => {} + } + } + + fn emit_thread_initialized( + &mut self, + connection_id: u64, + thread: codex_app_server_protocol::Thread, + model: String, + initialization_mode: ThreadInitializationMode, + out: &mut Vec, + ) { let thread_source: SessionSource = thread.source.into(); let Some(connection_state) = self.connections.get(&connection_id) else { return; @@ -275,6 +469,143 @@ impl AnalyticsReducer { }, )); } + + fn maybe_emit_turn_event(&mut self, turn_id: &str, out: &mut Vec) { + let Some(turn_state) = self.turns.get(turn_id) else { + return; + }; + if turn_state.thread_id.is_none() + || turn_state.num_input_images.is_none() + || turn_state.resolved_config.is_none() + || turn_state.completed.is_none() + { + return; + } + let product_client_id = turn_state + .connection_id + .and_then(|connection_id| self.connections.get(&connection_id)) + .map(|connection_state| connection_state.app_server_client.product_client_id.clone()) + .unwrap_or_else(|| originator().value); + out.push(TrackEventRequest::TurnEvent(Box::new( + CodexTurnEventRequest { + event_type: "codex_turn_event", + event_params: codex_turn_event_params( + product_client_id, + turn_id.to_string(), + turn_state, + ), + }, + ))); + self.turns.remove(turn_id); + } +} + +fn codex_turn_event_params( + product_client_id: String, + turn_id: String, + turn_state: &TurnState, +) -> CodexTurnEventParams { + let (Some(thread_id), Some(num_input_images), Some(resolved_config), Some(completed)) = ( + turn_state.thread_id.clone(), + turn_state.num_input_images, + turn_state.resolved_config.clone(), + turn_state.completed.clone(), + ) else { + unreachable!("turn event params require a fully populated turn state"); + }; + let started_at = turn_state.started_at; + let TurnResolvedConfigFact { + turn_id: _resolved_turn_id, + thread_id: _resolved_thread_id, + num_input_images: _resolved_num_input_images, + submission_type, + model, + model_provider, + sandbox_policy, + reasoning_effort, + reasoning_summary, + service_tier, + approval_policy, + approvals_reviewer, + sandbox_network_access, + collaboration_mode, + personality, + is_first_turn, + } = resolved_config; + CodexTurnEventParams { + thread_id, + turn_id, + product_client_id, + submission_type, + model: Some(model), + model_provider, + sandbox_policy: Some(sandbox_policy_mode(&sandbox_policy)), + reasoning_effort: reasoning_effort.map(|value| value.to_string()), + reasoning_summary: reasoning_summary_mode(reasoning_summary), + service_tier: service_tier + .map(|value| value.to_string()) + .unwrap_or_else(|| "default".to_string()), + approval_policy: approval_policy.to_string(), + approvals_reviewer: approvals_reviewer.to_string(), + sandbox_network_access, + collaboration_mode: Some(collaboration_mode_mode(collaboration_mode)), + personality: personality_mode(personality), + num_input_images, + is_first_turn, + status: completed.status, + turn_error: completed.turn_error, + steer_count: None, + total_tool_call_count: None, + shell_command_count: None, + file_change_count: None, + mcp_tool_call_count: None, + dynamic_tool_call_count: None, + subagent_tool_call_count: None, + web_search_count: None, + image_generation_count: None, + duration_ms: completed.duration_ms, + started_at, + completed_at: Some(completed.completed_at), + } +} + +fn sandbox_policy_mode(sandbox_policy: &SandboxPolicy) -> &'static str { + match sandbox_policy { + SandboxPolicy::DangerFullAccess => "full_access", + SandboxPolicy::ReadOnly { .. } => "read_only", + SandboxPolicy::WorkspaceWrite { .. } => "workspace_write", + SandboxPolicy::ExternalSandbox { .. } => "external_sandbox", + } +} + +fn collaboration_mode_mode(mode: ModeKind) -> &'static str { + match mode { + ModeKind::Plan => "plan", + ModeKind::Default | ModeKind::PairProgramming | ModeKind::Execute => "default", + } +} + +fn reasoning_summary_mode(summary: Option) -> Option { + match summary { + Some(ReasoningSummary::None) | None => None, + Some(summary) => Some(summary.to_string()), + } +} + +fn personality_mode(personality: Option) -> Option { + match personality { + Some(Personality::None) | None => None, + Some(personality) => Some(personality.to_string()), + } +} + +fn analytics_turn_status(status: codex_app_server_protocol::TurnStatus) -> Option { + match status { + codex_app_server_protocol::TurnStatus::Completed => Some(TurnStatus::Completed), + codex_app_server_protocol::TurnStatus::Failed => Some(TurnStatus::Failed), + codex_app_server_protocol::TurnStatus::Interrupted => Some(TurnStatus::Interrupted), + codex_app_server_protocol::TurnStatus::InProgress => None, + } } pub(crate) fn skill_id_for_local_skill( diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 4fd5bd4f95..f5d7443c15 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -12,6 +12,7 @@ use crate::thread_state::TurnSummary; use crate::thread_state::resolve_server_request_on_thread_listener; use crate::thread_status::ThreadWatchActiveGuard; use crate::thread_status::ThreadWatchManager; +use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::AccountRateLimitsUpdatedNotification; use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermissionProfile; use codex_app_server_protocol::AgentMessageDeltaNotification; @@ -167,6 +168,7 @@ pub(crate) async fn apply_bespoke_event_handling( conversation_id: ThreadId, conversation: Arc, thread_manager: Arc, + analytics_events_client: AnalyticsEventsClient, outgoing: ThreadScopedOutgoingMessageSender, thread_state: Arc>, thread_watch_manager: ThreadWatchManager, @@ -202,6 +204,8 @@ pub(crate) async fn apply_bespoke_event_handling( thread_id: conversation_id.to_string(), turn, }; + analytics_events_client + .track_notification(ServerNotification::TurnStarted(notification.clone())); outgoing .send_server_notification(ServerNotification::TurnStarted(notification)) .await; @@ -219,6 +223,7 @@ pub(crate) async fn apply_bespoke_event_handling( event_turn_id, turn_complete_event, terminal_turn, + Some(&analytics_events_client), &outgoing, &thread_state, ) @@ -1723,6 +1728,7 @@ pub(crate) async fn apply_bespoke_event_handling( event_turn_id, turn_aborted_event, terminal_turn, + Some(&analytics_events_client), &outgoing, &thread_state, ) @@ -1900,6 +1906,7 @@ async fn emit_turn_completed_with_status( conversation_id: ThreadId, event_turn_id: String, turn_completion_metadata: TurnCompletionMetadata, + analytics_events_client: Option<&AnalyticsEventsClient>, outgoing: &ThreadScopedOutgoingMessageSender, ) { let notification = TurnCompletedNotification { @@ -1914,6 +1921,10 @@ async fn emit_turn_completed_with_status( duration_ms: turn_completion_metadata.duration_ms, }, }; + if let Some(analytics_events_client) = analytics_events_client { + analytics_events_client + .track_notification(ServerNotification::TurnCompleted(notification.clone())); + } outgoing .send_server_notification(ServerNotification::TurnCompleted(notification)) .await; @@ -2107,6 +2118,7 @@ async fn handle_turn_complete( event_turn_id: String, turn_complete_event: TurnCompleteEvent, terminal_turn: Option, + analytics_events_client: Option<&AnalyticsEventsClient>, outgoing: &ThreadScopedOutgoingMessageSender, thread_state: &Arc>, ) { @@ -2127,6 +2139,7 @@ async fn handle_turn_complete( completed_at: turn_complete_event.completed_at, duration_ms: turn_complete_event.duration_ms, }, + analytics_events_client, outgoing, ) .await; @@ -2137,6 +2150,7 @@ async fn handle_turn_interrupted( event_turn_id: String, turn_aborted_event: TurnAbortedEvent, terminal_turn: Option, + analytics_events_client: Option<&AnalyticsEventsClient>, outgoing: &ThreadScopedOutgoingMessageSender, thread_state: &Arc>, ) { @@ -2152,6 +2166,7 @@ async fn handle_turn_interrupted( completed_at: turn_aborted_event.completed_at, duration_ms: turn_aborted_event.duration_ms, }, + analytics_events_client, outgoing, ) .await; @@ -3743,6 +3758,7 @@ mod tests { event_turn_id.clone(), turn_complete_event(&event_turn_id), terminal_turn, + /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -3792,6 +3808,7 @@ mod tests { event_turn_id.clone(), turn_aborted_event(&event_turn_id), /*terminal_turn*/ None, + /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -3840,6 +3857,7 @@ mod tests { event_turn_id.clone(), turn_complete_event(&event_turn_id), /*terminal_turn*/ None, + /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -4107,6 +4125,7 @@ mod tests { a_turn1.clone(), turn_complete_event(&a_turn1), /*terminal_turn*/ None, + /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -4129,6 +4148,7 @@ mod tests { b_turn1.clone(), turn_complete_event(&b_turn1), /*terminal_turn*/ None, + /*analytics_events_client*/ None, &outgoing, &thread_state, ) @@ -4141,6 +4161,7 @@ mod tests { a_turn2.clone(), turn_complete_event(&a_turn2), /*terminal_turn*/ None, + /*analytics_events_client*/ None, &outgoing, &thread_state, ) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index f65d205ffc..e583c6253f 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -6617,6 +6617,15 @@ impl CodexMessageProcessor { }; let response = TurnStartResponse { turn }; + if self.config.features.enabled(Feature::GeneralAnalytics) { + self.analytics_events_client.track_response( + request_id.connection_id.0, + ClientResponse::TurnStart { + request_id: request_id.request_id.clone(), + response: response.clone(), + }, + ); + } self.outgoing.send_response(request_id, response).await; } Err(err) => { @@ -7394,6 +7403,7 @@ impl CodexMessageProcessor { conversation_id, conversation.clone(), thread_manager.clone(), + listener_task_context.analytics_events_client.clone(), thread_outgoing, thread_state.clone(), thread_watch_manager.clone(), diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 02b948bc1f..78cb5b994f 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -224,7 +224,12 @@ impl MessageProcessor { outgoing: outgoing.clone(), }), ); - let thread_manager = Arc::new(ThreadManager::new( + let analytics_events_client = AnalyticsEventsClient::new( + Arc::clone(&auth_manager), + config.chatgpt_base_url.trim_end_matches('/').to_string(), + config.analytics_enabled, + ); + let thread_manager = Arc::new(ThreadManager::new_with_analytics_events_client( config.as_ref(), auth_manager.clone(), session_source, @@ -234,13 +239,9 @@ impl MessageProcessor { .enabled(Feature::DefaultModeRequestUserInput), }, environment_manager, + Some(analytics_events_client.clone()), )); auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone()); - let analytics_events_client = AnalyticsEventsClient::new( - Arc::clone(&auth_manager), - config.chatgpt_base_url.trim_end_matches('/').to_string(), - config.analytics_enabled, - ); thread_manager .plugins_manager() .set_analytics_events_client(analytics_events_client.clone()); @@ -677,6 +678,15 @@ impl MessageProcessor { self.outgoing.send_error(connection_request_id, error).await; return; } + if self.config.features.enabled(Feature::GeneralAnalytics) + && let ClientRequest::TurnStart { request_id, .. } = &codex_request + { + self.analytics_events_client.track_request( + connection_id.0, + request_id.clone(), + codex_request.clone(), + ); + } match codex_request { ClientRequest::ConfigRead { request_id, params } => { diff --git a/codex-rs/app-server/tests/common/config.rs b/codex-rs/app-server/tests/common/config.rs index deb16c6322..1ac2572fa2 100644 --- a/codex-rs/app-server/tests/common/config.rs +++ b/codex-rs/app-server/tests/common/config.rs @@ -78,3 +78,31 @@ model_provider = "{model_provider_id}" ), ) } + +pub fn write_mock_responses_config_toml_with_chatgpt_base_url( + codex_home: &Path, + server_uri: &str, + chatgpt_base_url: &str, +) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" +chatgpt_base_url = "{chatgpt_base_url}" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/app-server/tests/common/lib.rs b/codex-rs/app-server/tests/common/lib.rs index 3f89765851..90553760d9 100644 --- a/codex-rs/app-server/tests/common/lib.rs +++ b/codex-rs/app-server/tests/common/lib.rs @@ -14,6 +14,7 @@ pub use auth_fixtures::encode_id_token; pub use auth_fixtures::write_chatgpt_auth; use codex_app_server_protocol::JSONRPCResponse; pub use config::write_mock_responses_config_toml; +pub use config::write_mock_responses_config_toml_with_chatgpt_base_url; pub use core_test_support::format_with_current_shell; pub use core_test_support::format_with_current_shell_display; pub use core_test_support::format_with_current_shell_display_non_login; diff --git a/codex-rs/app-server/tests/suite/v2/analytics.rs b/codex-rs/app-server/tests/suite/v2/analytics.rs index 8e8e328a84..bb2bdb8f0c 100644 --- a/codex-rs/app-server/tests/suite/v2/analytics.rs +++ b/codex-rs/app-server/tests/suite/v2/analytics.rs @@ -120,6 +120,41 @@ pub(crate) async fn wait_for_analytics_payload( serde_json::from_slice(&body).map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}")) } +pub(crate) async fn wait_for_analytics_event( + server: &MockServer, + read_timeout: Duration, + event_type: &str, +) -> Result { + timeout(read_timeout, async { + loop { + let Some(requests) = server.received_requests().await else { + tokio::time::sleep(Duration::from_millis(25)).await; + continue; + }; + for request in &requests { + if request.method != "POST" + || request.url.path() != "/codex/analytics-events/events" + { + continue; + } + let payload: Value = serde_json::from_slice(&request.body) + .map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))?; + let Some(events) = payload["events"].as_array() else { + continue; + }; + if let Some(event) = events + .iter() + .find(|event| event["event_type"] == event_type) + { + return Ok::(event.clone()); + } + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + }) + .await? +} + pub(crate) fn thread_initialized_event(payload: &Value) -> Result<&Value> { let events = payload["events"] .as_array() diff --git a/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs index 8f944dca07..108166a4f2 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs @@ -3,8 +3,10 @@ use anyhow::Result; use app_test_support::McpProcess; use app_test_support::create_mock_responses_server_sequence; +use app_test_support::create_mock_responses_server_sequence_unchecked; use app_test_support::create_shell_command_sse_response; use app_test_support::to_response; +use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; @@ -22,6 +24,9 @@ use codex_app_server_protocol::UserInput as V2UserInput; use tempfile::TempDir; use tokio::time::timeout; +use super::analytics::enable_analytics_capture; +use super::analytics::wait_for_analytics_event; + const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); #[tokio::test] @@ -43,14 +48,20 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> { std::fs::create_dir(&working_directory)?; // Mock server: long-running shell command then (after abort) nothing else needed. - let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response( - shell_command.clone(), - Some(&working_directory), - Some(10_000), - "call_sleep", - )?]) - .await; - create_config_toml(&codex_home, &server.uri(), "never")?; + let server = + create_mock_responses_server_sequence_unchecked(vec![create_shell_command_sse_response( + shell_command.clone(), + Some(&working_directory), + Some(10_000), + "call_sleep", + )?]) + .await; + write_mock_responses_config_toml_with_chatgpt_base_url( + &codex_home, + &server.uri(), + &server.uri(), + )?; + enable_analytics_capture(&server, &codex_home).await?; let mut mcp = McpProcess::new(&codex_home).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; @@ -87,6 +98,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> { ) .await??; let TurnStartResponse { turn } = to_response::(turn_resp)?; + let turn_id = turn.id.clone(); // Give the command a brief moment to start. tokio::time::sleep(std::time::Duration::from_secs(1)).await; @@ -96,7 +108,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> { let interrupt_id = mcp .send_turn_interrupt_request(TurnInterruptParams { thread_id: thread_id.clone(), - turn_id: turn.id, + turn_id: turn_id.clone(), }) .await?; let interrupt_resp: JSONRPCResponse = timeout( @@ -119,6 +131,12 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> { assert_eq!(completed.thread_id, thread_id); assert_eq!(completed.turn.status, TurnStatus::Interrupted); + let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?; + assert_eq!(event["event_params"]["thread_id"], thread_id); + assert_eq!(event["event_params"]["turn_id"], turn_id); + assert_eq!(event["event_params"]["status"], "interrupted"); + assert_eq!(event["event_params"]["turn_error"], serde_json::Value::Null); + Ok(()) } @@ -131,7 +149,11 @@ async fn turn_interrupt_resolves_pending_command_approval_request() -> Result<() "Start-Sleep -Seconds 10".to_string(), ]; #[cfg(not(target_os = "windows"))] - let shell_command = vec!["sleep".to_string(), "10".to_string()]; + let shell_command = vec![ + "python3".to_string(), + "-c".to_string(), + "import time; time.sleep(10)".to_string(), + ]; let tmp = TempDir::new()?; let codex_home = tmp.path().join("codex_home"); diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index b99d1cb73e..95b662f840 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use app_test_support::DEFAULT_CLIENT_NAME; use app_test_support::McpProcess; use app_test_support::create_apply_patch_sse_response; use app_test_support::create_exec_command_sse_response; @@ -9,6 +10,7 @@ use app_test_support::create_mock_responses_server_sequence_unchecked; use app_test_support::create_shell_command_sse_response; use app_test_support::format_with_current_shell_display; use app_test_support::to_response; +use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url; use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE; use codex_app_server::INVALID_PARAMS_ERROR_CODE; use codex_app_server_protocol::ByteRange; @@ -64,6 +66,9 @@ use std::path::Path; use tempfile::TempDir; use tokio::time::timeout; +use super::analytics::enable_analytics_capture; +use super::analytics::wait_for_analytics_event; + #[cfg(windows)] const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25); #[cfg(not(windows))] @@ -238,6 +243,143 @@ async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> { Ok(()) } +#[tokio::test] +async fn turn_start_tracks_turn_event_analytics() -> Result<()> { + let responses = vec![create_final_assistant_message_sse_response("Done")?]; + let server = create_mock_responses_server_sequence_unchecked(responses).await; + + let codex_home = TempDir::new()?; + write_mock_responses_config_toml_with_chatgpt_base_url( + codex_home.path(), + &server.uri(), + &server.uri(), + )?; + enable_analytics_capture(&server, codex_home.path()).await?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Image { + url: "https://example.com/a.png".to_string(), + }], + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?; + assert_eq!(event["event_params"]["thread_id"], thread.id); + assert_eq!(event["event_params"]["turn_id"], turn.id); + assert_eq!( + event["event_params"]["product_client_id"], + DEFAULT_CLIENT_NAME + ); + assert_eq!(event["event_params"]["model"], "mock-model"); + assert_eq!(event["event_params"]["model_provider"], "mock_provider"); + assert_eq!(event["event_params"]["sandbox_policy"], "read_only"); + assert_eq!(event["event_params"]["num_input_images"], 1); + assert_eq!(event["event_params"]["status"], "completed"); + assert!(event["event_params"]["started_at"].as_u64().is_some()); + assert!(event["event_params"]["completed_at"].as_u64().is_some()); + assert!(event["event_params"]["duration_ms"].as_u64().is_some()); + + Ok(()) +} + +#[tokio::test] +async fn turn_start_tracks_failed_turn_event_analytics() -> Result<()> { + let server = create_mock_responses_server_sequence(vec![String::new()]).await; + + let codex_home = TempDir::new()?; + write_mock_responses_config_toml_with_chatgpt_base_url( + codex_home.path(), + &server.uri(), + &server.uri(), + )?; + enable_analytics_capture(&server, codex_home.path()).await?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "trigger failed turn".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + let completed_notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + let completed: TurnCompletedNotification = serde_json::from_value( + completed_notif + .params + .expect("turn/completed params must be present"), + )?; + assert_eq!(completed.turn.status, TurnStatus::Failed); + assert!(completed.turn.error.is_some()); + + let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?; + assert_eq!(event["event_params"]["thread_id"], thread.id); + assert_eq!(event["event_params"]["turn_id"], turn.id); + assert_eq!(event["event_params"]["status"], "failed"); + assert_ne!(event["event_params"]["turn_error"], serde_json::Value::Null); + + Ok(()) +} + #[tokio::test] async fn turn_start_accepts_text_at_limit_with_mention_item() -> Result<()> { let responses = vec![create_final_assistant_message_sse_response("Done")?]; diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 758b592047..1b316af971 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -51,6 +51,7 @@ use codex_analytics::AnalyticsEventsClient; use codex_analytics::AppInvocation; use codex_analytics::InvocationType; use codex_analytics::SubAgentThreadStartedInput; +use codex_analytics::TurnResolvedConfigFact; use codex_analytics::build_track_events_context; use codex_app_server_protocol::McpServerElicitationRequest; use codex_app_server_protocol::McpServerElicitationRequestParams; @@ -185,6 +186,7 @@ use crate::config::StartedNetworkProxy; use crate::config::resolve_web_search_mode_for_turn; use crate::context_manager::ContextManager; use crate::context_manager::TotalTokenUsageBreakdown; +use crate::context_manager::is_user_turn_boundary; use crate::environment_context::EnvironmentContext; use codex_config::CONFIG_TOML_FILE; use codex_config::types::McpServerConfig; @@ -430,6 +432,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) inherited_exec_policy: Option>, pub(crate) user_shell_override: Option, pub(crate) parent_trace: Option, + pub(crate) analytics_events_client: Option, } pub(crate) const INITIAL_SUBMIT_ID: &str = ""; @@ -484,6 +487,7 @@ impl Codex { user_shell_override, inherited_exec_policy, parent_trace: _, + analytics_events_client, } = args; let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let (tx_event, rx_event) = async_channel::unbounded(); @@ -670,6 +674,7 @@ impl Codex { mcp_manager.clone(), skills_watcher, agent_control, + analytics_events_client, ) .await .map_err(|e| { @@ -809,6 +814,17 @@ pub(crate) fn session_loop_termination_from_handle( .shared() } +fn initial_history_has_prior_user_turns(conversation_history: &InitialHistory) -> bool { + conversation_history.scan_rollout_items(rollout_item_is_user_turn_boundary) +} + +fn rollout_item_is_user_turn_boundary(item: &RolloutItem) -> bool { + match item { + RolloutItem::ResponseItem(item) => is_user_turn_boundary(item), + _ => false, + } +} + /// Context for an initialized model agent /// /// A session has at most 1 running task at a time, and can be interrupted by user input. @@ -1517,6 +1533,7 @@ impl Session { mcp_manager: Arc, skills_watcher: Arc, agent_control: AgentControl, + analytics_events_client: Option, ) -> anyhow::Result> { debug!( "Configuring session: model={}; provider={:?}", @@ -1920,11 +1937,13 @@ impl Session { ), shell_zsh_path: config.zsh_path.clone(), main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(), - analytics_events_client: AnalyticsEventsClient::new( - Arc::clone(&auth_manager), - config.chatgpt_base_url.trim_end_matches('/').to_string(), - config.analytics_enabled, - ), + analytics_events_client: analytics_events_client.unwrap_or_else(|| { + AnalyticsEventsClient::new( + Arc::clone(&auth_manager), + config.chatgpt_base_url.trim_end_matches('/').to_string(), + config.analytics_enabled, + ) + }), hooks, rollout: Mutex::new(rollout_recorder), user_shell: Arc::new(default_shell), @@ -2238,6 +2257,11 @@ impl Session { SessionSource::SubAgent(_) ) }; + let has_prior_user_turns = initial_history_has_prior_user_turns(&conversation_history); + { + let mut state = self.state.lock().await; + state.set_next_turn_is_first(!has_prior_user_turns); + } match conversation_history { InitialHistory::New => { // Defer initial context insertion until the first real turn starts so @@ -6005,6 +6029,8 @@ pub(crate) async fn run_turn( .await; } + track_turn_resolved_config_analytics(&sess, &turn_context, &input).await; + let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref()); sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token()) .await; @@ -6291,6 +6317,42 @@ pub(crate) async fn run_turn( last_agent_message } +async fn track_turn_resolved_config_analytics( + sess: &Session, + turn_context: &TurnContext, + input: &[UserInput], +) { + let is_first_turn = { + let mut state = sess.state.lock().await; + state.take_next_turn_is_first() + }; + sess.services + .analytics_events_client + .track_turn_resolved_config(TurnResolvedConfigFact { + turn_id: turn_context.sub_id.clone(), + thread_id: sess.conversation_id.to_string(), + num_input_images: input + .iter() + .filter(|item| { + matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. }) + }) + .count(), + submission_type: None, + model: turn_context.model_info.slug.clone(), + model_provider: turn_context.config.model_provider_id.clone(), + sandbox_policy: turn_context.sandbox_policy.get().clone(), + reasoning_effort: turn_context.reasoning_effort, + reasoning_summary: Some(turn_context.reasoning_summary), + service_tier: turn_context.config.service_tier, + approval_policy: turn_context.approval_policy.value(), + approvals_reviewer: turn_context.config.approvals_reviewer, + sandbox_network_access: turn_context.network_sandbox_policy.is_enabled(), + collaboration_mode: turn_context.collaboration_mode.mode, + personality: turn_context.personality, + is_first_turn, + }); +} + async fn run_pre_sampling_compact( sess: &Arc, turn_context: &Arc, diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 93d3eba021..28ecd4e359 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -95,6 +95,7 @@ pub(crate) async fn run_codex_thread_interactive( user_shell_override: None, inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)), parent_trace: None, + analytics_events_client: Some(parent_session.services.analytics_events_client.clone()), }) .await?; if parent_session.enabled(codex_features::Feature::GeneralAnalytics) { diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 3a7ceb6f8c..a9190aea81 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -2620,6 +2620,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { mcp_manager, Arc::new(SkillsWatcher::noop()), AgentControl::default(), + /*analytics_events_client*/ None, ) .await; diff --git a/codex-rs/core/src/codex_tests_guardian.rs b/codex-rs/core/src/codex_tests_guardian.rs index 4f60c2f28e..7738cbbe91 100644 --- a/codex-rs/core/src/codex_tests_guardian.rs +++ b/codex-rs/core/src/codex_tests_guardian.rs @@ -457,6 +457,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { inherited_exec_policy: Some(Arc::new(parent_exec_policy)), user_shell_override: None, parent_trace: None, + analytics_events_client: None, }) .await .expect("spawn guardian subagent"); diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 206f75060c..4360b16de4 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -33,6 +33,7 @@ pub(crate) struct SessionState { pub(crate) active_connector_selection: HashSet, pub(crate) pending_session_start_source: Option, granted_permissions: Option, + next_turn_is_first: bool, } impl SessionState { @@ -51,6 +52,7 @@ impl SessionState { active_connector_selection: HashSet::new(), pending_session_start_source: None, granted_permissions: None, + next_turn_is_first: true, } } @@ -73,6 +75,16 @@ impl SessionState { self.previous_turn_settings = previous_turn_settings; } + pub(crate) fn set_next_turn_is_first(&mut self, value: bool) { + self.next_turn_is_first = value; + } + + pub(crate) fn take_next_turn_is_first(&mut self) -> bool { + let is_first_turn = self.next_turn_is_first; + self.next_turn_is_first = false; + is_first_turn + } + pub(crate) fn clone_history(&self) -> ContextManager { self.history.clone() } diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 6d93a1a329..8941c14c58 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -15,6 +15,7 @@ use crate::shell_snapshot::ShellSnapshot; use crate::skills_watcher::SkillsWatcher; use crate::skills_watcher::SkillsWatcherEvent; use crate::tasks::interrupted_turn_history_marker; +use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::TurnStatus; use codex_exec_server::EnvironmentManager; @@ -208,6 +209,7 @@ pub(crate) struct ThreadManagerState { mcp_manager: Arc, skills_watcher: Arc, session_source: SessionSource, + analytics_events_client: Option, // Captures submitted ops for testing purpose when test mode is enabled. ops_log: Option, } @@ -219,6 +221,24 @@ impl ThreadManager { session_source: SessionSource, collaboration_modes_config: CollaborationModesConfig, environment_manager: Arc, + ) -> Self { + Self::new_with_analytics_events_client( + config, + auth_manager, + session_source, + collaboration_modes_config, + environment_manager, + /*analytics_events_client*/ None, + ) + } + + pub fn new_with_analytics_events_client( + config: &Config, + auth_manager: Arc, + session_source: SessionSource, + collaboration_modes_config: CollaborationModesConfig, + environment_manager: Arc, + analytics_events_client: Option, ) -> Self { let codex_home = config.codex_home.clone(); let restriction_product = session_source.restriction_product(); @@ -257,6 +277,7 @@ impl ThreadManager { skills_watcher, auth_manager, session_source, + analytics_events_client, ops_log: should_use_test_thread_manager_behavior() .then(|| Arc::new(std::sync::Mutex::new(Vec::new()))), }), @@ -326,6 +347,7 @@ impl ThreadManager { skills_watcher, auth_manager, session_source: SessionSource::Exec, + analytics_events_client: None, ops_log: should_use_test_thread_manager_behavior() .then(|| Arc::new(std::sync::Mutex::new(Vec::new()))), }), @@ -867,6 +889,7 @@ impl ThreadManagerState { inherited_exec_policy, user_shell_override, parent_trace, + analytics_events_client: self.analytics_events_client.clone(), }) .await?; self.finalize_thread_spawn(codex, thread_id, watch_registration) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 7116878b47..961e76d79e 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2275,6 +2275,14 @@ pub enum InitialHistory { } impl InitialHistory { + pub fn scan_rollout_items(&self, mut predicate: impl FnMut(&RolloutItem) -> bool) -> bool { + match self { + InitialHistory::New => false, + InitialHistory::Resumed(resumed) => resumed.history.iter().any(&mut predicate), + InitialHistory::Forked(items) => items.iter().any(predicate), + } + } + pub fn forked_from_id(&self) -> Option { match self { InitialHistory::New => None,