use std::collections::HashSet; use std::future::Future; use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use crate::codex_message_processor::CodexMessageProcessor; use crate::codex_message_processor::CodexMessageProcessorArgs; use crate::config_api::ConfigApi; use crate::error_code::INVALID_REQUEST_ERROR_CODE; use crate::external_agent_config_api::ExternalAgentConfigApi; use crate::outgoing_message::ConnectionId; use crate::outgoing_message::ConnectionRequestId; use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::RequestContext; use crate::transport::AppServerTransport; use async_trait::async_trait; use codex_app_server_protocol::ChatgptAuthTokensRefreshParams; use codex_app_server_protocol::ChatgptAuthTokensRefreshReason; use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ConfigBatchWriteParams; use codex_app_server_protocol::ConfigReadParams; use codex_app_server_protocol::ConfigValueWriteParams; use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::ExperimentalApi; use codex_app_server_protocol::ExternalAgentConfigDetectParams; use codex_app_server_protocol::ExternalAgentConfigImportParams; use codex_app_server_protocol::InitializeResponse; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::experimental_required_message; use codex_arg0::Arg0DispatchPaths; use codex_core::AnalyticsEventsClient; use codex_core::AuthManager; use codex_core::ThreadManager; use codex_core::auth::ExternalAuthRefreshContext; use codex_core::auth::ExternalAuthRefreshReason; use codex_core::auth::ExternalAuthRefresher; use codex_core::auth::ExternalAuthTokens; use codex_core::config::Config; use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::LoaderOverrides; use codex_core::default_client::SetOriginatorError; use codex_core::default_client::USER_AGENT_SUFFIX; use codex_core::default_client::get_codex_user_agent; use codex_core::default_client::set_default_client_residency_requirement; use codex_core::default_client::set_default_originator; use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig; use codex_feedback::CodexFeedback; use codex_protocol::ThreadId; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::W3cTraceContext; use codex_state::log_db::LogDbLayer; use futures::FutureExt; use tokio::sync::broadcast; use tokio::sync::watch; use tokio::time::Duration; use tokio::time::timeout; use toml::Value as TomlValue; use tracing::Instrument; const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10); #[derive(Clone)] struct ExternalAuthRefreshBridge { outgoing: Arc, } impl ExternalAuthRefreshBridge { fn map_reason(reason: ExternalAuthRefreshReason) -> ChatgptAuthTokensRefreshReason { match reason { ExternalAuthRefreshReason::Unauthorized => ChatgptAuthTokensRefreshReason::Unauthorized, } } } #[async_trait] impl ExternalAuthRefresher for ExternalAuthRefreshBridge { async fn refresh( &self, context: ExternalAuthRefreshContext, ) -> std::io::Result { let params = ChatgptAuthTokensRefreshParams { reason: Self::map_reason(context.reason), previous_account_id: context.previous_account_id, }; let (request_id, rx) = self .outgoing .send_request(ServerRequestPayload::ChatgptAuthTokensRefresh(params)) .await; let result = match timeout(EXTERNAL_AUTH_REFRESH_TIMEOUT, rx).await { Ok(result) => { // Two failure scenarios: // 1) `oneshot::Receiver` failed (sender dropped) => request canceled/channel closed. // 2) client answered with JSON-RPC error payload => propagate code/message. let result = result.map_err(|err| { std::io::Error::other(format!("auth refresh request canceled: {err}")) })?; result.map_err(|err| { std::io::Error::other(format!( "auth refresh request failed: code={} message={}", err.code, err.message )) })? } Err(_) => { let _canceled = self.outgoing.cancel_request(&request_id).await; return Err(std::io::Error::other(format!( "auth refresh request timed out after {}s", EXTERNAL_AUTH_REFRESH_TIMEOUT.as_secs() ))); } }; let response: ChatgptAuthTokensRefreshResponse = serde_json::from_value(result).map_err(std::io::Error::other)?; Ok(ExternalAuthTokens { access_token: response.access_token, chatgpt_account_id: response.chatgpt_account_id, chatgpt_plan_type: response.chatgpt_plan_type, }) } } pub(crate) struct MessageProcessor { outgoing: Arc, codex_message_processor: CodexMessageProcessor, config_api: ConfigApi, external_agent_config_api: ExternalAgentConfigApi, auth_manager: Arc, config: Arc, config_warnings: Arc>, } #[derive(Clone, Debug, Default)] pub(crate) struct ConnectionSessionState { pub(crate) initialized: bool, pub(crate) experimental_api_enabled: bool, pub(crate) opted_out_notification_methods: HashSet, pub(crate) app_server_client_name: Option, pub(crate) client_version: Option, } pub(crate) struct MessageProcessorArgs { pub(crate) outgoing: Arc, pub(crate) arg0_paths: Arg0DispatchPaths, pub(crate) config: Arc, pub(crate) cli_overrides: Vec<(String, TomlValue)>, pub(crate) loader_overrides: LoaderOverrides, pub(crate) cloud_requirements: CloudRequirementsLoader, pub(crate) auth_manager: Option>, pub(crate) thread_manager: Option>, pub(crate) feedback: CodexFeedback, pub(crate) log_db: Option, pub(crate) config_warnings: Vec, pub(crate) session_source: SessionSource, pub(crate) enable_codex_api_key_env: bool, } impl MessageProcessor { /// Create a new `MessageProcessor`, retaining a handle to the outgoing /// `Sender` so handlers can enqueue messages to be written to stdout. pub(crate) fn new(args: MessageProcessorArgs) -> Self { let MessageProcessorArgs { outgoing, arg0_paths, config, cli_overrides, loader_overrides, cloud_requirements, auth_manager, thread_manager, feedback, log_db, config_warnings, session_source, enable_codex_api_key_env, } = args; let (auth_manager, thread_manager) = match (auth_manager, thread_manager) { (Some(auth_manager), Some(thread_manager)) => (auth_manager, thread_manager), (None, None) => { let auth_manager = AuthManager::shared( config.codex_home.clone(), enable_codex_api_key_env, config.cli_auth_credentials_store_mode, ); let thread_manager = Arc::new(ThreadManager::new( config.as_ref(), auth_manager.clone(), session_source, CollaborationModesConfig { default_mode_request_user_input: config .features .enabled(codex_core::features::Feature::DefaultModeRequestUserInput), }, )); (auth_manager, thread_manager) } _ => panic!("MessageProcessorArgs must provide both auth_manager and thread_manager"), }; auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone()); auth_manager.set_external_auth_refresher(Arc::new(ExternalAuthRefreshBridge { outgoing: outgoing.clone(), })); let analytics_events_client = AnalyticsEventsClient::new(Arc::clone(&config), Arc::clone(&auth_manager)); thread_manager .plugins_manager() .set_analytics_events_client(analytics_events_client.clone()); // TODO(xl): Move into PluginManager once this no longer depends on config feature gating. thread_manager .plugins_manager() .maybe_start_curated_repo_sync_for_config(&config); let cloud_requirements = Arc::new(RwLock::new(cloud_requirements)); let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs { auth_manager: auth_manager.clone(), thread_manager: Arc::clone(&thread_manager), outgoing: outgoing.clone(), arg0_paths, config: Arc::clone(&config), cli_overrides: cli_overrides.clone(), cloud_requirements: cloud_requirements.clone(), feedback, log_db, }); let config_api = ConfigApi::new( config.codex_home.clone(), cli_overrides, loader_overrides, cloud_requirements, thread_manager, analytics_events_client, ); let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.clone()); Self { outgoing, codex_message_processor, config_api, external_agent_config_api, auth_manager, config, config_warnings: Arc::new(config_warnings), } } pub(crate) fn clear_runtime_references(&self) { self.auth_manager.clear_external_auth_refresher(); } pub(crate) async fn process_request( &mut self, connection_id: ConnectionId, request: JSONRPCRequest, transport: AppServerTransport, session: &mut ConnectionSessionState, ) { let request_method = request.method.as_str(); tracing::trace!( ?connection_id, request_id = ?request.id, "app-server request: {request_method}" ); let request_id = ConnectionRequestId { connection_id, request_id: request.id.clone(), }; let request_span = crate::app_server_tracing::request_span(&request, transport, connection_id, session); let request_trace = request.trace.as_ref().map(|trace| W3cTraceContext { traceparent: trace.traceparent.clone(), tracestate: trace.tracestate.clone(), }); let request_context = RequestContext::new(request_id.clone(), request_span, request_trace); Self::run_request_with_context( Arc::clone(&self.outgoing), request_context.clone(), async { let request_json = match serde_json::to_value(&request) { Ok(request_json) => request_json, Err(err) => { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: format!("Invalid request: {err}"), data: None, }; self.outgoing.send_error(request_id.clone(), error).await; return; } }; let codex_request = match serde_json::from_value::(request_json) { Ok(codex_request) => codex_request, Err(err) => { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: format!("Invalid request: {err}"), data: None, }; self.outgoing.send_error(request_id.clone(), error).await; return; } }; // Websocket callers finalize outbound readiness in lib.rs after mirroring // session state into outbound state and sending initialize notifications to // this specific connection. Passing `None` avoids marking the connection // ready too early from inside the shared request handler. self.handle_client_request( request_id.clone(), codex_request, session, None, request_context.clone(), ) .await; }, ) .await; } /// Handles a typed request path used by in-process embedders. /// /// This bypasses JSON request deserialization but keeps identical request /// semantics by delegating to `handle_client_request`. pub(crate) async fn process_client_request( &mut self, connection_id: ConnectionId, request: ClientRequest, session: &mut ConnectionSessionState, outbound_initialized: &AtomicBool, ) { let request_id = ConnectionRequestId { connection_id, request_id: request.id().clone(), }; let request_span = crate::app_server_tracing::typed_request_span(&request, connection_id, session); let request_context = RequestContext::new(request_id.clone(), request_span, None); tracing::trace!( ?connection_id, request_id = ?request_id.request_id, "app-server typed request" ); Self::run_request_with_context( Arc::clone(&self.outgoing), request_context.clone(), async { // In-process clients do not have the websocket transport loop that performs // post-initialize bookkeeping, so they still finalize outbound readiness in // the shared request handler. self.handle_client_request( request_id.clone(), request, session, Some(outbound_initialized), request_context.clone(), ) .await; }, ) .await; } pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) { // Currently, we do not expect to receive any notifications from the // client, so we just log them. tracing::info!("<- notification: {:?}", notification); } /// Handles typed notifications from in-process clients. pub(crate) async fn process_client_notification(&self, notification: ClientNotification) { // Currently, we do not expect to receive any typed notifications from // in-process clients, so we just log them. tracing::info!("<- typed notification: {:?}", notification); } async fn run_request_with_context( outgoing: Arc, request_context: RequestContext, request_fut: F, ) where F: Future, { outgoing .register_request_context(request_context.clone()) .await; request_fut.instrument(request_context.span()).await; } pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver { self.codex_message_processor.thread_created_receiver() } pub(crate) async fn send_initialize_notifications_to_connection( &self, connection_id: ConnectionId, ) { for notification in self.config_warnings.iter().cloned() { self.outgoing .send_server_notification_to_connections( &[connection_id], ServerNotification::ConfigWarning(notification), ) .await; } } pub(crate) async fn connection_initialized(&self, connection_id: ConnectionId) { self.codex_message_processor .connection_initialized(connection_id) .await; } pub(crate) async fn send_initialize_notifications(&self) { for notification in self.config_warnings.iter().cloned() { self.outgoing .send_server_notification(ServerNotification::ConfigWarning(notification)) .await; } } pub(crate) async fn try_attach_thread_listener( &mut self, thread_id: ThreadId, connection_ids: Vec, ) { self.codex_message_processor .try_attach_thread_listener(thread_id, connection_ids) .await; } pub(crate) async fn drain_background_tasks(&self) { self.codex_message_processor.drain_background_tasks().await; } pub(crate) async fn shutdown_threads(&self) { self.codex_message_processor.shutdown_threads().await; } pub(crate) async fn connection_closed(&mut self, connection_id: ConnectionId) { self.outgoing.connection_closed(connection_id).await; self.codex_message_processor .connection_closed(connection_id) .await; } pub(crate) fn subscribe_running_assistant_turn_count(&self) -> watch::Receiver { self.codex_message_processor .subscribe_running_assistant_turn_count() } /// Handle a standalone JSON-RPC response originating from the peer. pub(crate) async fn process_response(&mut self, response: JSONRPCResponse) { tracing::info!("<- response: {:?}", response); let JSONRPCResponse { id, result, .. } = response; self.outgoing.notify_client_response(id, result).await } /// Handle an error object received from the peer. pub(crate) async fn process_error(&mut self, err: JSONRPCError) { tracing::error!("<- error: {:?}", err); self.outgoing.notify_client_error(err.id, err.error).await; } async fn handle_client_request( &mut self, connection_request_id: ConnectionRequestId, codex_request: ClientRequest, session: &mut ConnectionSessionState, // `Some(...)` means the caller wants initialize to immediately mark the // connection outbound-ready. Websocket JSON-RPC calls pass `None` so // lib.rs can deliver connection-scoped initialize notifications first. outbound_initialized: Option<&AtomicBool>, request_context: RequestContext, ) { let connection_id = connection_request_id.connection_id; match codex_request { // Handle Initialize internally so CodexMessageProcessor does not have to concern // itself with the `initialized` bool. ClientRequest::Initialize { request_id, params } => { let connection_request_id = ConnectionRequestId { connection_id, request_id, }; if session.initialized { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: "Already initialized".to_string(), data: None, }; self.outgoing.send_error(connection_request_id, error).await; return; } // TODO(maxj): Revisit capability scoping for `experimental_api_enabled`. // Current behavior is per-connection. Reviewer feedback notes this can // create odd cross-client behavior (for example dynamic tool calls on a // shared thread when another connected client did not opt into // experimental API). Proposed direction is instance-global first-write-wins // with initialize-time mismatch rejection. let (experimental_api_enabled, opt_out_notification_methods) = match params.capabilities { Some(capabilities) => ( capabilities.experimental_api, capabilities .opt_out_notification_methods .unwrap_or_default(), ), None => (false, Vec::new()), }; session.experimental_api_enabled = experimental_api_enabled; session.opted_out_notification_methods = opt_out_notification_methods.into_iter().collect(); let ClientInfo { name, title: _title, version, } = params.client_info; session.app_server_client_name = Some(name.clone()); session.client_version = Some(version.clone()); if let Err(error) = set_default_originator(name.clone()) { match error { SetOriginatorError::InvalidHeaderValue => { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: format!( "Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value." ), data: None, }; self.outgoing .send_error(connection_request_id.clone(), error) .await; return; } SetOriginatorError::AlreadyInitialized => { // No-op. This is expected to happen if the originator is already set via env var. // TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE, // this will be an unexpected state and we can return a JSON-RPC error indicating // internal server error. } } } set_default_client_residency_requirement(self.config.enforce_residency.value()); let user_agent_suffix = format!("{name}; {version}"); if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() { *suffix = Some(user_agent_suffix); } let user_agent = get_codex_user_agent(); let response = InitializeResponse { user_agent }; self.outgoing .send_response(connection_request_id, response) .await; session.initialized = true; if let Some(outbound_initialized) = outbound_initialized { // In-process clients can complete readiness immediately here. The // websocket path defers this until lib.rs finishes transport-layer // initialize handling for the specific connection. outbound_initialized.store(true, Ordering::Release); self.codex_message_processor .connection_initialized(connection_id) .await; } return; } _ => { if !session.initialized { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: "Not initialized".to_string(), data: None, }; self.outgoing.send_error(connection_request_id, error).await; return; } } } if let Some(reason) = codex_request.experimental_reason() && !session.experimental_api_enabled { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: experimental_required_message(reason), data: None, }; self.outgoing.send_error(connection_request_id, error).await; return; } match codex_request { ClientRequest::ConfigRead { request_id, params } => { self.handle_config_read( ConnectionRequestId { connection_id, request_id, }, params, ) .await; } ClientRequest::ExternalAgentConfigDetect { request_id, params } => { self.handle_external_agent_config_detect( ConnectionRequestId { connection_id, request_id, }, params, ) .await; } ClientRequest::ExternalAgentConfigImport { request_id, params } => { self.handle_external_agent_config_import( ConnectionRequestId { connection_id, request_id, }, params, ) .await; } ClientRequest::ConfigValueWrite { request_id, params } => { self.handle_config_value_write( ConnectionRequestId { connection_id, request_id, }, params, ) .await; } ClientRequest::ConfigBatchWrite { request_id, params } => { self.handle_config_batch_write( ConnectionRequestId { connection_id, request_id, }, params, ) .await; } ClientRequest::ConfigRequirementsRead { request_id, params: _, } => { self.handle_config_requirements_read(ConnectionRequestId { connection_id, request_id, }) .await; } other => { // Box the delegated future so this wrapper's async state machine does not // inline the full `CodexMessageProcessor::process_request` future, which // can otherwise push worker-thread stack usage over the edge. self.codex_message_processor .process_request( connection_id, other, session.app_server_client_name.clone(), request_context, ) .boxed() .await; } } } async fn handle_config_read(&self, request_id: ConnectionRequestId, params: ConfigReadParams) { match self.config_api.read(params).await { Ok(response) => self.outgoing.send_response(request_id, response).await, Err(error) => self.outgoing.send_error(request_id, error).await, } } async fn handle_config_value_write( &self, request_id: ConnectionRequestId, params: ConfigValueWriteParams, ) { match self.config_api.write_value(params).await { Ok(response) => { self.codex_message_processor.clear_plugin_related_caches(); self.codex_message_processor .maybe_start_curated_repo_sync_for_latest_config() .await; self.outgoing.send_response(request_id, response).await; } Err(error) => self.outgoing.send_error(request_id, error).await, } } async fn handle_config_batch_write( &self, request_id: ConnectionRequestId, params: ConfigBatchWriteParams, ) { match self.config_api.batch_write(params).await { Ok(response) => { self.codex_message_processor.clear_plugin_related_caches(); self.codex_message_processor .maybe_start_curated_repo_sync_for_latest_config() .await; self.outgoing.send_response(request_id, response).await; } Err(error) => self.outgoing.send_error(request_id, error).await, } } async fn handle_config_requirements_read(&self, request_id: ConnectionRequestId) { match self.config_api.config_requirements_read().await { Ok(response) => self.outgoing.send_response(request_id, response).await, Err(error) => self.outgoing.send_error(request_id, error).await, } } async fn handle_external_agent_config_detect( &self, request_id: ConnectionRequestId, params: ExternalAgentConfigDetectParams, ) { match self.external_agent_config_api.detect(params).await { Ok(response) => self.outgoing.send_response(request_id, response).await, Err(error) => self.outgoing.send_error(request_id, error).await, } } async fn handle_external_agent_config_import( &self, request_id: ConnectionRequestId, params: ExternalAgentConfigImportParams, ) { match self.external_agent_config_api.import(params).await { Ok(response) => self.outgoing.send_response(request_id, response).await, Err(error) => self.outgoing.send_error(request_id, error).await, } } } #[cfg(test)] mod tracing_tests;