From aee456ef18c23bd2227f9e42d04caf35b17b4512 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 10 Feb 2026 14:08:48 +0000 Subject: [PATCH] Document app-server backpressure --- codex-rs/app-server/README.md | 6 + codex-rs/app-server/src/error_code.rs | 1 + codex-rs/app-server/src/lib.rs | 106 +++++++++++++++-- codex-rs/app-server/src/transport.rs | 158 +++++++++++++++++++++++--- 4 files changed, 248 insertions(+), 23 deletions(-) diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 66d4a501ec..c40693c601 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -28,6 +28,12 @@ Supported transports: Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads. +Backpressure behavior: + +- The server uses bounded queues between transport ingress, request processing, and outbound writes. +- When request ingress is saturated, new requests are rejected with a JSON-RPC error code `-32001` and message `"Server overloaded; retry later."`. +- Clients should treat this as retryable and use exponential backoff with jitter. + ## Message Schema Currently, you can dump a TypeScript version of the schema using `codex app-server generate-ts`, or a JSON Schema bundle via `codex app-server generate-json-schema`. Each output is specific to the version of Codex you used to run the command, so the generated artifacts are guaranteed to match that version. 
diff --git a/codex-rs/app-server/src/error_code.rs b/codex-rs/app-server/src/error_code.rs index 1ffd889d40..ca93b2f2d3 100644 --- a/codex-rs/app-server/src/error_code.rs +++ b/codex-rs/app-server/src/error_code.rs @@ -1,2 +1,3 @@ pub(crate) const INVALID_REQUEST_ERROR_CODE: i64 = -32600; pub(crate) const INTERNAL_ERROR_CODE: i64 = -32603; +pub(crate) const OVERLOADED_ERROR_CODE: i64 = -32001; diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index ad049ad305..0de4ddbf8e 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -21,6 +21,7 @@ use crate::outgoing_message::OutgoingEnvelope; use crate::outgoing_message::OutgoingMessageSender; use crate::transport::CHANNEL_CAPACITY; use crate::transport::ConnectionState; +use crate::transport::OutboundConnectionState; use crate::transport::TransportEvent; use crate::transport::has_initialized_connections; use crate::transport::route_outgoing_envelope; @@ -37,6 +38,7 @@ use codex_core::config_loader::ConfigLoadError; use codex_core::config_loader::TextRange as CoreTextRange; use codex_feedback::CodexFeedback; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio::task::JoinHandle; use toml::Value as TomlValue; use tracing::error; @@ -61,6 +63,29 @@ mod transport; pub use crate::transport::AppServerTransport; +/// Control-plane messages from the processor/transport side to the outbound router task. +/// +/// `run_main_with_transport` now uses two loops/tasks: +/// - processor loop: handles incoming JSON-RPC and request dispatch +/// - outbound loop: performs potentially slow writes to per-connection writers +/// +/// `OutboundControlEvent` keeps those loops coordinated without sharing mutable +/// connection state directly. 
In particular, the outbound loop needs to know: +/// - when a connection opens/closes so it can route messages correctly +/// - when a connection becomes initialized so broadcast semantics remain unchanged +enum OutboundControlEvent { + /// Register a new writer for an opened connection. + Opened { + connection_id: ConnectionId, + writer: mpsc::Sender<OutgoingMessage>, + ready: oneshot::Sender<()>, + }, + /// Remove state for a closed/disconnected connection. + Closed { connection_id: ConnectionId }, + /// Mark the connection as initialized, enabling broadcast delivery. + Initialized { connection_id: ConnectionId }, +} + fn config_warning_from_error( summary: impl Into<String>, err: &std::io::Error, @@ -197,6 +222,8 @@ pub async fn run_main_with_transport( let (transport_event_tx, mut transport_event_rx) = mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY); let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY); + let (outbound_control_tx, mut outbound_control_rx) = + mpsc::channel::<OutboundControlEvent>(CHANNEL_CAPACITY); let mut stdio_handles = Vec::<JoinHandle<()>>::new(); let mut websocket_accept_handle = None; @@ -336,8 +363,47 @@ pub async fn run_main_with_transport( } } + let outbound_handle = tokio::spawn(async move { + let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new(); + loop { + tokio::select!
{ + envelope = outgoing_rx.recv() => { + let Some(envelope) = envelope else { + break; + }; + route_outgoing_envelope(&mut outbound_connections, envelope).await; + } + event = outbound_control_rx.recv() => { + let Some(event) = event else { + break; + }; + match event { + OutboundControlEvent::Opened { + connection_id, + writer, + ready, + } => { + outbound_connections.insert(connection_id, OutboundConnectionState::new(writer)); + let _ = ready.send(()); + } + OutboundControlEvent::Closed { connection_id } => { + outbound_connections.remove(&connection_id); + } + OutboundControlEvent::Initialized { connection_id } => { + if let Some(connection_state) = outbound_connections.get_mut(&connection_id) { + connection_state.initialized = true; + } + } + } + } + } + } + info!("outbound router task exited (channel closed)"); + }); + let processor_handle = tokio::spawn({ let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let outbound_control_tx = outbound_control_tx; let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone(); let loader_overrides = loader_overrides_for_config_api; let mut processor = MessageProcessor::new(MessageProcessorArgs { @@ -362,9 +428,31 @@ pub async fn run_main_with_transport( }; match event { TransportEvent::ConnectionOpened { connection_id, writer } => { - connections.insert(connection_id, ConnectionState::new(writer)); + let (ready_tx, ready_rx) = oneshot::channel(); + if outbound_control_tx + .send(OutboundControlEvent::Opened { + connection_id, + writer: writer.clone(), + ready: ready_tx, + }) + .await + .is_err() + { + break; + } + if ready_rx.await.is_err() { + break; + } + connections.insert(connection_id, ConnectionState::new()); } TransportEvent::ConnectionClosed { connection_id } => { + if outbound_control_tx + .send(OutboundControlEvent::Closed { connection_id }) + .await + .is_err() + { + break; + } connections.remove(&connection_id); if shutdown_when_no_connections && connections.is_empty() { 
break; @@ -377,6 +465,7 @@ pub async fn run_main_with_transport( warn!("dropping request from unknown connection: {:?}", connection_id); continue; }; + let was_initialized = connection_state.session.initialized; processor .process_request( connection_id, @@ -384,6 +473,14 @@ pub async fn run_main_with_transport( &mut connection_state.session, ) .await; + if !was_initialized && connection_state.session.initialized { + let send_result = outbound_control_tx + .send(OutboundControlEvent::Initialized { connection_id }) + .await; + if send_result.is_err() { + break; + } + } } JSONRPCMessage::Response(response) => { processor.process_response(response).await; @@ -398,12 +495,6 @@ pub async fn run_main_with_transport( } } } - envelope = outgoing_rx.recv() => { - let Some(envelope) = envelope else { - break; - }; - route_outgoing_envelope(&mut connections, envelope).await; - } created = thread_created_rx.recv(), if listen_for_threads => { match created { Ok(thread_id) => { @@ -433,6 +524,7 @@ pub async fn run_main_with_transport( drop(transport_event_tx); let _ = processor_handle.await; + let _ = outbound_handle.await; if let Some(handle) = websocket_accept_handle { handle.abort(); diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index 39fd13212c..80a7c3424e 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -1,8 +1,12 @@ +use crate::error_code::OVERLOADED_ERROR_CODE; use crate::message_processor::ConnectionSessionState; use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingEnvelope; +use crate::outgoing_message::OutgoingError; use crate::outgoing_message::OutgoingMessage; +use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCRequest; use futures::SinkExt; use futures::StreamExt; use owo_colors::OwoColorize; @@ -140,15 +144,27 @@ pub(crate) enum TransportEvent { } pub(crate) struct 
ConnectionState { - pub(crate) writer: mpsc::Sender<OutgoingMessage>, pub(crate) session: ConnectionSessionState, } impl ConnectionState { + pub(crate) fn new() -> Self { + Self { + session: ConnectionSessionState::default(), + } + } +} + +pub(crate) struct OutboundConnectionState { + pub(crate) writer: mpsc::Sender<OutgoingMessage>, + pub(crate) initialized: bool, +} + +impl OutboundConnectionState { pub(crate) fn new(writer: mpsc::Sender<OutgoingMessage>) -> Self { Self { writer, - session: ConnectionSessionState::default(), + initialized: false, } } } @@ -159,6 +175,7 @@ pub(crate) async fn start_stdio_connection( ) -> IoResult<()> { let connection_id = ConnectionId(0); let (writer_tx, mut writer_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY); + let writer_tx_for_reader = writer_tx.clone(); transport_event_tx .send(TransportEvent::ConnectionOpened { connection_id, @@ -178,11 +195,10 @@ pub(crate) async fn start_stdio_connection( Ok(Some(line)) => { if !forward_incoming_message( &transport_event_tx_for_reader, + &writer_tx_for_reader, connection_id, &line, - ) - .await - { + ) { break; } } @@ -267,6 +283,7 @@ async fn run_websocket_connection( }; let (writer_tx, mut writer_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY); + let writer_tx_for_reader = writer_tx.clone(); if transport_event_tx .send(TransportEvent::ConnectionOpened { connection_id, @@ -295,7 +312,12 @@ async fn run_websocket_connection( incoming_message = websocket_reader.next() => { match incoming_message { Some(Ok(WebSocketMessage::Text(text))) => { - if !forward_incoming_message(&transport_event_tx, connection_id, &text).await { + if !forward_incoming_message( + &transport_event_tx, + &writer_tx_for_reader, + connection_id, + &text, + ) { break; } } @@ -324,19 +346,14 @@ async fn run_websocket_connection( .await; } -async fn forward_incoming_message( +fn forward_incoming_message( transport_event_tx: &mpsc::Sender<TransportEvent>, + writer: &mpsc::Sender<OutgoingMessage>, connection_id: ConnectionId, payload: &str, ) -> bool { match serde_json::from_str::<JSONRPCMessage>(payload) { - Ok(message) =>
transport_event_tx - .send(TransportEvent::IncomingMessage { - connection_id, - message, - }) - .await - .is_ok(), + Ok(message) => enqueue_incoming_message(transport_event_tx, writer, connection_id, message), Err(err) => { error!("Failed to deserialize JSONRPCMessage: {err}"); true @@ -344,6 +361,50 @@ async fn forward_incoming_message( } } +fn enqueue_incoming_message( + transport_event_tx: &mpsc::Sender<TransportEvent>, + writer: &mpsc::Sender<OutgoingMessage>, + connection_id: ConnectionId, + message: JSONRPCMessage, +) -> bool { + let event = TransportEvent::IncomingMessage { + connection_id, + message, + }; + match transport_event_tx.try_send(event) { + Ok(()) => true, + Err(mpsc::error::TrySendError::Closed(_)) => false, + Err(mpsc::error::TrySendError::Full(TransportEvent::IncomingMessage { + connection_id, + message: JSONRPCMessage::Request(request), + })) => { + if writer + .try_send(overloaded_error_for_request(request)) + .is_err() + { + warn!("failed to enqueue overload response for connection: {connection_id:?}"); + } + true + } + Err(mpsc::error::TrySendError::Full(TransportEvent::IncomingMessage { ..
})) => { + warn!("dropping non-request incoming message because processor queue is full"); + true + } + Err(mpsc::error::TrySendError::Full(_)) => true, + } +} + +fn overloaded_error_for_request(request: JSONRPCRequest) -> OutgoingMessage { + OutgoingMessage::Error(OutgoingError { + id: request.id, + error: JSONRPCErrorError { + code: OVERLOADED_ERROR_CODE, + message: "Server overloaded; retry later.".to_string(), + data: None, + }, + }) +} + fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option<String> { let value = match serde_json::to_value(outgoing_message) { Ok(value) => value, @@ -362,7 +423,7 @@ fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option<Strin pub(crate) async fn route_outgoing_envelope( - connections: &mut HashMap<ConnectionId, ConnectionState>, + connections: &mut HashMap<ConnectionId, OutboundConnectionState>, envelope: OutgoingEnvelope, ) { match envelope { @@ -385,7 +446,7 @@ pub(crate) async fn route_outgoing_envelope( let target_connections: Vec<ConnectionId> = connections .iter() .filter_map(|(connection_id, connection_state)| { - if connection_state.session.initialized { + if connection_state.initialized { Some(*connection_id) } else { None } }) @@ -416,7 +477,9 @@ pub(crate) fn has_initialized_connections( #[cfg(test)] mod tests { use super::*; + use crate::error_code::OVERLOADED_ERROR_CODE; use pretty_assertions::assert_eq; + use serde_json::json; #[test] fn app_server_transport_parses_stdio_listen_url() { @@ -456,4 +519,67 @@ mod tests { "unsupported --listen URL `http://127.0.0.1:1234`; expected `stdio://` or `ws://IP:PORT`" ); } + + #[tokio::test] + async fn enqueue_incoming_request_returns_overload_error_when_queue_is_full() { + let connection_id = ConnectionId(42); + let (transport_event_tx, mut transport_event_rx) = mpsc::channel(1); + let (writer_tx, mut writer_rx) = mpsc::channel(1); + + let first_message = + JSONRPCMessage::Notification(codex_app_server_protocol::JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }); + transport_event_tx + .send(TransportEvent::IncomingMessage { + connection_id, + message: first_message.clone(), + })
+ .await + .expect("queue should accept first message"); + + let request = JSONRPCMessage::Request(codex_app_server_protocol::JSONRPCRequest { + id: codex_app_server_protocol::RequestId::Integer(7), + method: "config/read".to_string(), + params: Some(json!({ "includeLayers": false })), + }); + assert!(enqueue_incoming_message( + &transport_event_tx, + &writer_tx, + connection_id, + request + )); + + let queued_event = transport_event_rx + .recv() + .await + .expect("first event should stay queued"); + match queued_event { + TransportEvent::IncomingMessage { + connection_id: queued_connection_id, + message, + } => { + assert_eq!(queued_connection_id, connection_id); + assert_eq!(message, first_message); + } + _ => panic!("expected queued incoming message"), + } + + let overload = writer_rx + .recv() + .await + .expect("request should receive overload error"); + let overload_json = serde_json::to_value(overload).expect("serialize overload error"); + assert_eq!( + overload_json, + json!({ + "id": 7, + "error": { + "code": OVERLOADED_ERROR_CODE, + "message": "Server overloaded; retry later." + } + }) + ); + } }