diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 73ea42d760..eb511483a4 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -353,7 +353,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-no-client", @@ -397,7 +397,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_resume_response( "thread-1", /*ephemeral*/ true, "gpt-5", diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index dd300a7bd6..e8ff85584a 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -16,6 +16,8 @@ use crate::facts::TrackEventsContext; use crate::reducer::AnalyticsReducer; use codex_app_server_protocol::ClientResponse; use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ServerResponse; use codex_login::AuthManager; use codex_login::default_client::create_client; use codex_plugin::PluginTelemetryMetadata; @@ -113,6 +115,18 @@ impl AnalyticsEventsClient { } } + pub fn new_disabled() -> Self { + let (sender, _receiver) = mpsc::channel(1); + Self { + queue: AnalyticsEventsQueue { + sender, + app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), + plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), + }, + analytics_enabled: Some(false), + } + } + pub fn track_skill_invocations( &self, tracking: TrackEventsContext, @@ -222,11 +236,24 @@ impl AnalyticsEventsClient { } pub fn track_response(&self, connection_id: u64, response: ClientResponse) { - self.record_fact(AnalyticsFact::Response { + self.record_fact(AnalyticsFact::ClientResponse { connection_id, response: Box::new(response), }); } + + pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) { + self.record_fact(AnalyticsFact::ServerRequest { + connection_id, + request: Box::new(request), + }); + } + + pub fn track_server_response(&self, response: ServerResponse) { + self.record_fact(AnalyticsFact::ServerResponse { + response: Box::new(response), + }); + } } async fn send_track_events( diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index e19d15d847..f1ede6a162 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -5,6 +5,8 @@ use codex_app_server_protocol::ClientResponse; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ServerResponse; use codex_plugin::PluginTelemetryMetadata; use codex_protocol::protocol::SkillScope; use codex_protocol::protocol::SubAgentSource; @@ -72,15 +74,22 @@ pub(crate) enum AnalyticsFact { runtime: CodexRuntimeMetadata, rpc_transport: AppServerRpcTransport, }, - Request { + ClientRequest { connection_id: u64, request_id: RequestId, request: Box, }, - Response { + ClientResponse { connection_id: u64, response: Box, }, + ServerRequest { + connection_id: u64, + request: Box, + }, + ServerResponse { + response: Box, + }, Notification(Box), // Facts that do not naturally exist on the app-server protocol surface, or // would require non-trivial protocol reshaping on this branch. diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 63b9c3d5be..62bf19e99a 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -65,17 +65,24 @@ impl AnalyticsReducer { rpc_transport, ); } - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: _connection_id, request_id: _request_id, request: _request, } => {} - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id, response, } => { self.ingest_response(connection_id, *response, out); } + AnalyticsFact::ServerRequest { + connection_id: _connection_id, + request: _request, + } => {} + AnalyticsFact::ServerResponse { + response: _response, + } => {} AnalyticsFact::Notification(_notification) => {} AnalyticsFact::Custom(input) => match input { CustomAnalyticsFact::SubAgentThreadStarted(input) => { diff --git a/codex-rs/app-server/src/analytics_events.rs b/codex-rs/app-server/src/analytics_events.rs new file mode 100644 index 0000000000..24ed12d2ad --- /dev/null +++ b/codex-rs/app-server/src/analytics_events.rs @@ -0,0 +1,16 @@ +use std::sync::Arc; + +use codex_analytics::AnalyticsEventsClient; +use codex_core::config::Config; +use codex_login::AuthManager; + +pub(crate) fn analytics_events_client_from_config( + auth_manager: Arc, + config: &Config, +) -> AnalyticsEventsClient { + AnalyticsEventsClient::new( + auth_manager, + config.chatgpt_base_url.trim_end_matches('/').to_string(), + config.analytics_enabled, + ) +} diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 976518465b..d0e925c7d3 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -3166,7 +3166,7 @@ mod tests { let conversation_id = ThreadId::new(); let thread_state = new_thread_state(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3235,7 +3235,7 @@ mod tests { let conversation_id = ThreadId::new(); let thread_state = new_thread_state(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3327,7 +3327,7 @@ mod tests { let thread_state = new_thread_state(); let thread_watch_manager = ThreadWatchManager::new(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3722,7 +3722,7 @@ mod tests { let conversation_id = ThreadId::new(); let event_turn_id = "complete1".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3785,7 +3785,7 @@ mod tests { ) .await; let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3832,7 +3832,7 @@ mod tests { ) .await; let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3873,7 +3873,7 @@ mod tests { #[tokio::test] async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3927,7 +3927,7 @@ mod tests { let conversation_id = ThreadId::new(); let turn_id = "turn-123".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4015,7 +4015,7 @@ mod tests { let conversation_id = ThreadId::new(); let turn_id = "turn-456".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4086,7 +4086,7 @@ mod tests { let thread_state = new_thread_state(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4339,7 +4339,7 @@ mod tests { #[tokio::test] async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4377,7 +4377,7 @@ mod tests { #[tokio::test] async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4403,7 +4403,7 @@ mod tests { #[tokio::test] async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let conversation_id = ThreadId::new(); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index dafaffca3c..a261b608ac 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -9638,7 +9638,7 @@ mod tests { let connection_id = ConnectionId(7); let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8); - let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(outgoing_tx)); let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), vec![connection_id], diff --git a/codex-rs/app-server/src/command_exec.rs b/codex-rs/app-server/src/command_exec.rs index b72c84e906..7f76cda5c3 100644 --- a/codex-rs/app-server/src/command_exec.rs +++ b/codex-rs/app-server/src/command_exec.rs @@ -757,7 +757,7 @@ mod tests { let manager = CommandExecManager::default(); let err = manager .start(StartCommandExecParams { - outgoing: Arc::new(OutgoingMessageSender::new(tx)), + outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)), request_id: ConnectionRequestId { connection_id: ConnectionId(1), request_id: codex_app_server_protocol::RequestId::Integer(42), @@ -793,7 +793,7 @@ mod tests { manager .start(StartCommandExecParams { - outgoing: Arc::new(OutgoingMessageSender::new(tx)), + outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)), request_id: request_id.clone(), process_id: Some("proc-99".to_string()), exec_request: windows_sandbox_exec_request(), @@ -843,7 +843,7 @@ mod tests { manager .start(StartCommandExecParams { - outgoing: Arc::new(OutgoingMessageSender::new(tx)), + outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)), request_id: request_id.clone(), process_id: Some("proc-100".to_string()), exec_request: ExecRequest::new( diff --git a/codex-rs/app-server/src/fs_watch.rs b/codex-rs/app-server/src/fs_watch.rs index 3a5b226248..41b2d97ce7 100644 --- a/codex-rs/app-server/src/fs_watch.rs +++ b/codex-rs/app-server/src/fs_watch.rs @@ -235,7 +235,7 @@ mod tests { const OUTGOING_BUFFER: usize = 1; let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER); FsWatchManager::new_with_file_watcher( - Arc::new(OutgoingMessageSender::new(tx)), + Arc::new(OutgoingMessageSender::new_for_tests(tx)), Arc::new(FileWatcher::noop()), ) } diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 0ae8690096..a865891940 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; +use crate::analytics_events::analytics_events_client_from_config; use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; use crate::error_code::OVERLOADED_ERROR_CODE; @@ -357,7 +358,14 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { let runtime_handle = tokio::spawn(async move { let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(channel_capacity); - let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let auth_manager = + AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env); + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref()); + let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + analytics_events_client.clone(), + )); let (writer_tx, mut writer_rx) = mpsc::channel::(channel_capacity); let outbound_initialized = Arc::new(AtomicBool::new(false)); @@ -382,12 +390,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { }); let processor_outgoing = Arc::clone(&outgoing_message_sender); - let auth_manager = - AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env); let (processor_tx, mut processor_rx) = mpsc::channel::(channel_capacity); let mut processor_handle = tokio::spawn(async move { let mut processor = MessageProcessor::new(MessageProcessorArgs { outgoing: Arc::clone(&processor_outgoing), + analytics_events_client, arg0_paths: args.arg0_paths, config: args.config, environment_manager: args.environment_manager, diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index a837d9c754..f49efb135e 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::AtomicBool; +use crate::analytics_events::analytics_events_client_from_config; use crate::message_processor::MessageProcessor; use crate::message_processor::MessageProcessorArgs; use crate::outgoing_message::ConnectionId; @@ -63,6 +64,7 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::Registry; use tracing_subscriber::util::SubscriberInitExt; +mod analytics_events; mod app_server_tracing; mod bespoke_event_handling; mod codex_message_processor; @@ -640,14 +642,20 @@ pub async fn run_main_with_transport( }); let processor_handle = tokio::spawn({ - let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); let outbound_control_tx = outbound_control_tx; let auth_manager = AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false); + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), &config); + let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + analytics_events_client.clone(), + )); let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone(); let loader_overrides = loader_overrides_for_config_api; let mut processor = MessageProcessor::new(MessageProcessorArgs { outgoing: outgoing_message_sender, + analytics_events_client, arg0_paths, config: Arc::new(config), environment_manager, diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index fbc1bd6d10..2e21079530 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -185,6 +185,7 @@ pub(crate) struct ConnectionSessionState { pub(crate) struct MessageProcessorArgs { pub(crate) outgoing: Arc, + pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) arg0_paths: Arg0DispatchPaths, pub(crate) config: Arc, pub(crate) environment_manager: Arc, @@ -206,6 +207,7 @@ impl MessageProcessor { pub(crate) fn new(args: MessageProcessorArgs) -> Self { let MessageProcessorArgs { outgoing, + analytics_events_client, arg0_paths, config, environment_manager, @@ -234,11 +236,6 @@ impl MessageProcessor { }, environment_manager, )); - let analytics_events_client = AnalyticsEventsClient::new( - Arc::clone(&auth_manager), - config.chatgpt_base_url.trim_end_matches('/').to_string(), - config.analytics_enabled, - ); thread_manager .plugins_manager() .set_analytics_events_client(analytics_events_client.clone()); diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index ef88364a3a..71ead804a6 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -1,6 +1,7 @@ use super::ConnectionSessionState; use super::MessageProcessor; use super::MessageProcessorArgs; +use crate::analytics_events::analytics_events_client_from_config; use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingMessageSender; use crate::transport::AppServerTransport; @@ -234,11 +235,14 @@ fn build_test_processor( mpsc::Receiver, ) { let (outgoing_tx, outgoing_rx) = mpsc::channel(16); - let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(outgoing_tx)); let auth_manager = AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false); + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref()); let processor = MessageProcessor::new(MessageProcessorArgs { outgoing, + analytics_events_client, arg0_paths: Arg0DispatchPaths::default(), config, environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)), diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index d4e4bde063..3306af159c 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -4,12 +4,14 @@ use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; +use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::Result; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ServerRequestPayload; +use codex_app_server_protocol::ServerResponse; use codex_otel::span_w3c_trace_context; use codex_protocol::ThreadId; use codex_protocol::protocol::W3cTraceContext; @@ -117,6 +119,7 @@ pub(crate) struct OutgoingMessageSender { /// We keep them here because this is where responses, errors, and /// disconnect cleanup all get handled. request_contexts: Mutex>, + analytics_events_client: AnalyticsEventsClient, } #[derive(Clone)] @@ -203,15 +206,24 @@ impl ThreadScopedOutgoingMessageSender { } impl OutgoingMessageSender { - pub(crate) fn new(sender: mpsc::Sender) -> Self { + pub(crate) fn new( + sender: mpsc::Sender, + analytics_events_client: AnalyticsEventsClient, + ) -> Self { Self { next_server_request_id: AtomicI64::new(0), sender, request_id_to_callback: Mutex::new(HashMap::new()), request_contexts: Mutex::new(HashMap::new()), + analytics_events_client, } } + #[cfg(test)] + pub(crate) fn new_for_tests(sender: mpsc::Sender) -> Self { + Self::new(sender, AnalyticsEventsClient::new_disabled()) + } + pub(crate) async fn register_request_context(&self, request_context: RequestContext) { let mut request_contexts = self.request_contexts.lock().await; if request_contexts @@ -298,7 +310,7 @@ impl OutgoingMessageSender { ); } - let outgoing_message = OutgoingMessage::Request(request); + let outgoing_message = OutgoingMessage::Request(request.clone()); let send_result = match connection_ids { None => { self.sender @@ -321,6 +333,9 @@ impl OutgoingMessageSender { { send_error = Some(err); break; + } else { + self.analytics_events_client + .track_server_request(connection_id.0, request.clone()); } } match send_error { @@ -364,6 +379,10 @@ impl OutgoingMessageSender { match entry { Some((id, entry)) => { + if let Some(response) = server_response_from_result(&entry.request, result.clone()) + { + self.analytics_events_client.track_server_response(response); + } if let Err(err) = entry.callback.send(Ok(result)) { warn!("could not notify callback for {id:?} due to: {err:?}"); } @@ -621,6 +640,14 @@ impl OutgoingMessageSender { } } +fn server_response_from_result(request: &ServerRequest, result: Result) -> Option { + let mut value = serde_json::to_value(request).ok()?; + let object = value.as_object_mut()?; + object.remove("params"); + object.insert("response".to_string(), result); + serde_json::from_value(value).ok() +} + /// Outgoing message from the server to the client. #[derive(Debug, Clone, Serialize)] #[serde(untagged)] @@ -654,6 +681,8 @@ mod tests { use codex_app_server_protocol::AccountUpdatedNotification; use codex_app_server_protocol::ApplyPatchApprovalParams; use codex_app_server_protocol::AuthMode; + use codex_app_server_protocol::CommandExecutionApprovalDecision; + use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::DynamicToolCallParams; use codex_app_server_protocol::FileChangeRequestApprovalParams; @@ -838,10 +867,53 @@ mod tests { ); } + #[test] + fn server_response_from_result_decodes_typed_response_with_original_method() { + let request = ServerRequest::CommandExecutionRequestApproval { + request_id: RequestId::Integer(7), + params: CommandExecutionRequestApprovalParams { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + item_id: "item-1".to_string(), + approval_id: None, + reason: None, + network_approval_context: None, + command: Some("echo hi".to_string()), + cwd: None, + command_actions: None, + additional_permissions: None, + proposed_execpolicy_amendment: None, + proposed_network_policy_amendments: None, + available_decisions: None, + }, + }; + + let response = server_response_from_result( + &request, + json!({ + "decision": "acceptForSession", + }), + ) + .expect("decode typed server response"); + + let ServerResponse::CommandExecutionRequestApproval { + request_id, + response, + } = response + else { + panic!("expected command execution approval response"); + }; + assert_eq!(request_id, RequestId::Integer(7)); + assert_eq!( + response.decision, + CommandExecutionApprovalDecision::AcceptForSession + ); + } + #[tokio::test] async fn send_response_routes_to_target_connection() { let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = OutgoingMessageSender::new_for_tests(tx); let request_id = ConnectionRequestId { connection_id: ConnectionId(42), request_id: RequestId::Integer(7), @@ -876,7 +948,7 @@ mod tests { #[tokio::test] async fn send_response_clears_registered_request_context() { let (tx, _rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = OutgoingMessageSender::new_for_tests(tx); let request_id = ConnectionRequestId { connection_id: ConnectionId(42), request_id: RequestId::Integer(7), @@ -901,7 +973,7 @@ mod tests { #[tokio::test] async fn send_error_routes_to_target_connection() { let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = OutgoingMessageSender::new_for_tests(tx); let request_id = ConnectionRequestId { connection_id: ConnectionId(9), request_id: RequestId::Integer(3), @@ -939,7 +1011,7 @@ mod tests { #[tokio::test] async fn send_server_notification_to_connection_and_wait_tracks_write_completion() { let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = OutgoingMessageSender::new_for_tests(tx); let send_task = tokio::spawn(async move { outgoing .send_server_notification_to_connection_and_wait( @@ -983,7 +1055,7 @@ mod tests { #[tokio::test] async fn connection_closed_clears_registered_request_contexts() { let (tx, _rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = OutgoingMessageSender::new_for_tests(tx); let closed_connection_request = ConnectionRequestId { connection_id: ConnectionId(9), request_id: RequestId::Integer(3), @@ -1017,7 +1089,7 @@ mod tests { #[tokio::test] async fn notify_client_error_forwards_error_to_waiter() { let (tx, _rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = OutgoingMessageSender::new_for_tests(tx); let (request_id, wait_for_result) = outgoing .send_request(ServerRequestPayload::ApplyPatchApproval( @@ -1051,7 +1123,7 @@ mod tests { #[tokio::test] async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() { let (tx, _rx) = mpsc::channel::(8); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let thread_id = ThreadId::new(); let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), @@ -1108,7 +1180,7 @@ mod tests { #[tokio::test] async fn cancel_requests_for_thread_cancels_all_thread_requests() { let (tx, _rx) = mpsc::channel::(8); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx)); let thread_id = ThreadId::new(); let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index 802f7e197c..a562d53289 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -674,9 +674,9 @@ mod tests { #[tokio::test] async fn status_change_emits_notification() { let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8); - let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new( - outgoing_tx, - ))); + let manager = ThreadWatchManager::new_with_outgoing(Arc::new( + OutgoingMessageSender::new_for_tests(outgoing_tx), + )); manager .upsert_thread(test_thread( @@ -716,9 +716,9 @@ mod tests { #[tokio::test] async fn silent_upsert_skips_initial_notification() { let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8); - let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new( - outgoing_tx, - ))); + let manager = ThreadWatchManager::new_with_outgoing(Arc::new( + OutgoingMessageSender::new_for_tests(outgoing_tx), + )); manager .upsert_thread_silently(test_thread(