diff --git a/codex-rs/app-server-client/src/lib.rs b/codex-rs/app-server-client/src/lib.rs index 115e9808f0..1ea9f6fd87 100644 --- a/codex-rs/app-server-client/src/lib.rs +++ b/codex-rs/app-server-client/src/lib.rs @@ -85,17 +85,128 @@ impl From<InProcessServerEvent> for AppServerEvent { } fn event_requires_delivery(event: &InProcessServerEvent) -> bool { - // These terminal events drive surface shutdown/completion state. Dropping - // them under backpressure can leave exec/TUI waiting forever even though - // the underlying turn has already ended. + // These transcript and terminal events must remain lossless. Dropping + // streamed assistant text or the authoritative completed item can leave + // the TUI with permanently corrupted markdown, while dropping completion + // notifications can leave surfaces waiting forever. + match event { + InProcessServerEvent::ServerNotification(notification) => { + server_notification_requires_delivery(notification) + } + _ => false, + } +} + +/// Returns `true` for notifications that must survive backpressure. +/// +/// Transcript events (`AgentMessageDelta`, `PlanDelta`, reasoning deltas) and +/// the authoritative `ItemCompleted` / `TurnCompleted` form the lossless tier +/// of the event stream. Dropping any of these corrupts the visible assistant +/// output or leaves surfaces waiting for a completion signal that already +/// fired. Everything else (`CommandExecutionOutputDelta`, progress, etc.) is +/// best-effort and may be dropped with only cosmetic impact. +/// +/// Both the in-process and remote transports delegate to this function so the +/// classification stays in sync. 
+pub(crate) fn server_notification_requires_delivery(notification: &ServerNotification) -> bool { matches!( - event, - InProcessServerEvent::ServerNotification( - codex_app_server_protocol::ServerNotification::TurnCompleted(_), - ) + notification, + ServerNotification::TurnCompleted(_) + | ServerNotification::ItemCompleted(_) + | ServerNotification::AgentMessageDelta(_) + | ServerNotification::PlanDelta(_) + | ServerNotification::ReasoningSummaryTextDelta(_) + | ServerNotification::ReasoningTextDelta(_) ) } +/// Outcome of attempting to forward a single event to the consumer channel. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ForwardEventResult { + /// The event was delivered (or intentionally dropped); the stream is healthy. + Continue, + /// The consumer channel is closed; the caller should stop producing events. + DisableStream, +} + +/// Forwards a single in-process event to the consumer, respecting the +/// lossless/best-effort split. +/// +/// Lossless events (transcript deltas, item/turn completions) block until the +/// consumer drains capacity. Best-effort events use `try_send` and increment +/// `skipped_events` on failure. When a lag marker needs to be flushed before a +/// lossless event, the flush itself blocks so the marker is never lost. +/// +/// If a dropped event is a `ServerRequest`, `reject_server_request` is called +/// so the server does not wait for a response that will never come. +async fn forward_in_process_event<F>( + event_tx: &mpsc::Sender<InProcessServerEvent>, + skipped_events: &mut usize, + event: InProcessServerEvent, + mut reject_server_request: F, +) -> ForwardEventResult +where + F: FnMut(ServerRequest), +{ + if *skipped_events > 0 { + if event_requires_delivery(&event) { + // Surface lag before the lossless event, but do not let the lag marker itself cause + // us to drop the transcript/completion notification the caller is blocked on. 
+ if event_tx + .send(InProcessServerEvent::Lagged { + skipped: *skipped_events, + }) + .await + .is_err() + { + return ForwardEventResult::DisableStream; + } + *skipped_events = 0; + } else { + match event_tx.try_send(InProcessServerEvent::Lagged { + skipped: *skipped_events, + }) { + Ok(()) => { + *skipped_events = 0; + } + Err(mpsc::error::TrySendError::Full(_)) => { + *skipped_events = skipped_events.saturating_add(1); + warn!("dropping in-process app-server event because consumer queue is full"); + if let InProcessServerEvent::ServerRequest(request) = event { + reject_server_request(request); + } + return ForwardEventResult::Continue; + } + Err(mpsc::error::TrySendError::Closed(_)) => { + return ForwardEventResult::DisableStream; + } + } + } + } + + if event_requires_delivery(&event) { + // Block until the consumer catches up for transcript/completion notifications; this + // preserves the visible assistant output even when the queue is otherwise saturated. + if event_tx.send(event).await.is_err() { + return ForwardEventResult::DisableStream; + } + return ForwardEventResult::Continue; + } + + match event_tx.try_send(event) { + Ok(()) => ForwardEventResult::Continue, + Err(mpsc::error::TrySendError::Full(event)) => { + *skipped_events = skipped_events.saturating_add(1); + warn!("dropping in-process app-server event because consumer queue is full"); + if let InProcessServerEvent::ServerRequest(request) = event { + reject_server_request(request); + } + ForwardEventResult::Continue + } + Err(mpsc::error::TrySendError::Closed(_)) => ForwardEventResult::DisableStream, + } +} + /// Layered error for [`InProcessAppServerClient::request_typed`]. 
/// /// This keeps transport failures, server-side JSON-RPC failures, and response @@ -366,83 +477,26 @@ impl InProcessAppServerClient { continue; } - if skipped_events > 0 { - if event_requires_delivery(&event) { - // Surface lag before the terminal event, but - // do not let the lag marker itself cause us to - // drop the completion/abort notification that - // the caller is blocked on. - if event_tx - .send(InProcessServerEvent::Lagged { - skipped: skipped_events, - }) - .await - .is_err() - { - event_stream_enabled = false; - continue; - } - skipped_events = 0; - } else { - match event_tx.try_send(InProcessServerEvent::Lagged { - skipped: skipped_events, - }) { - Ok(()) => { - skipped_events = 0; - } - Err(mpsc::error::TrySendError::Full(_)) => { - skipped_events = skipped_events.saturating_add(1); - warn!( - "dropping in-process app-server event because consumer queue is full" - ); - if let InProcessServerEvent::ServerRequest(request) = event { - let _ = request_sender.fail_server_request( - request.id().clone(), - JSONRPCErrorError { - code: -32001, - message: "in-process app-server event queue is full".to_string(), - data: None, - }, - ); - } - continue; - } - Err(mpsc::error::TrySendError::Closed(_)) => { - event_stream_enabled = false; - continue; - } - } - } - } - - if event_requires_delivery(&event) { - // Block until the consumer catches up for - // terminal notifications; this preserves the - // completion signal even when the queue is - // otherwise saturated. 
- if event_tx.send(event).await.is_err() { - event_stream_enabled = false; - } - continue; - } - - match event_tx.try_send(event) { - Ok(()) => {} - Err(mpsc::error::TrySendError::Full(event)) => { - skipped_events = skipped_events.saturating_add(1); - warn!("dropping in-process app-server event because consumer queue is full"); - if let InProcessServerEvent::ServerRequest(request) = event { - let _ = request_sender.fail_server_request( - request.id().clone(), - JSONRPCErrorError { - code: -32001, - message: "in-process app-server event queue is full".to_string(), - data: None, - }, - ); - } - } - Err(mpsc::error::TrySendError::Closed(_)) => { + match forward_in_process_event( + &event_tx, + &mut skipped_events, + event, + |request| { + let _ = request_sender.fail_server_request( + request.id().clone(), + JSONRPCErrorError { + code: -32001, + message: "in-process app-server event queue is full" + .to_string(), + data: None, + }, + ); + }, + ) + .await + { + ForwardEventResult::Continue => {} + ForwardEventResult::DisableStream => { event_stream_enabled = false; } } @@ -933,6 +987,53 @@ mod tests { .expect("message should send"); } + fn command_execution_output_delta_notification(delta: &str) -> ServerNotification { + ServerNotification::CommandExecutionOutputDelta( + codex_app_server_protocol::CommandExecutionOutputDeltaNotification { + thread_id: "thread".to_string(), + turn_id: "turn".to_string(), + item_id: "item".to_string(), + delta: delta.to_string(), + }, + ) + } + + fn agent_message_delta_notification(delta: &str) -> ServerNotification { + ServerNotification::AgentMessageDelta( + codex_app_server_protocol::AgentMessageDeltaNotification { + thread_id: "thread".to_string(), + turn_id: "turn".to_string(), + item_id: "item".to_string(), + delta: delta.to_string(), + }, + ) + } + + fn item_completed_notification(text: &str) -> ServerNotification { + ServerNotification::ItemCompleted(codex_app_server_protocol::ItemCompletedNotification { + thread_id: 
"thread".to_string(), + turn_id: "turn".to_string(), + item: codex_app_server_protocol::ThreadItem::AgentMessage { + id: "item".to_string(), + text: text.to_string(), + phase: None, + memory_citation: None, + }, + }) + } + + fn turn_completed_notification() -> ServerNotification { + ServerNotification::TurnCompleted(codex_app_server_protocol::TurnCompletedNotification { + thread_id: "thread".to_string(), + turn: codex_app_server_protocol::Turn { + id: "turn".to_string(), + items: Vec::new(), + status: codex_app_server_protocol::TurnStatus::Completed, + error: None, + }, + }) + } + fn test_remote_connect_args(websocket_url: String) -> RemoteAppServerConnectArgs { RemoteAppServerConnectArgs { websocket_url, @@ -1043,6 +1144,94 @@ mod tests { client.shutdown().await.expect("shutdown should complete"); } + #[tokio::test] + async fn forward_in_process_event_preserves_transcript_notifications_under_backpressure() { + let (event_tx, mut event_rx) = mpsc::channel(1); + event_tx + .send(InProcessServerEvent::ServerNotification( + command_execution_output_delta_notification("stdout-1"), + )) + .await + .expect("initial event should enqueue"); + + let mut skipped_events = 0usize; + let result = forward_in_process_event( + &event_tx, + &mut skipped_events, + InProcessServerEvent::ServerNotification(command_execution_output_delta_notification( + "stdout-2", + )), + |_| {}, + ) + .await; + assert_eq!(result, ForwardEventResult::Continue); + assert_eq!(skipped_events, 1); + + let receive_task = tokio::spawn(async move { + let mut events = Vec::new(); + for _ in 0..5 { + events.push( + timeout(Duration::from_secs(2), event_rx.recv()) + .await + .expect("event should arrive before timeout") + .expect("event stream should stay open"), + ); + } + events + }); + + for notification in [ + agent_message_delta_notification("hello"), + item_completed_notification("hello"), + turn_completed_notification(), + ] { + let result = forward_in_process_event( + &event_tx, + &mut skipped_events, + 
InProcessServerEvent::ServerNotification(notification), + |_| {}, + ) + .await; + assert_eq!(result, ForwardEventResult::Continue); + } + assert_eq!(skipped_events, 0); + + let events = receive_task + .await + .expect("receiver task should join successfully"); + assert!(matches!( + &events[0], + InProcessServerEvent::ServerNotification( + ServerNotification::CommandExecutionOutputDelta(notification) + ) if notification.delta == "stdout-1" + )); + assert!(matches!( + &events[1], + InProcessServerEvent::Lagged { skipped: 1 } + )); + assert!(matches!( + &events[2], + InProcessServerEvent::ServerNotification(ServerNotification::AgentMessageDelta( + notification + )) if notification.delta == "hello" + )); + assert!(matches!( + &events[3], + InProcessServerEvent::ServerNotification(ServerNotification::ItemCompleted( + notification + )) if matches!( + &notification.item, + codex_app_server_protocol::ThreadItem::AgentMessage { text, .. } if text == "hello" + ) + )); + assert!(matches!( + &events[4], + InProcessServerEvent::ServerNotification(ServerNotification::TurnCompleted( + notification + )) if notification.turn.status == codex_app_server_protocol::TurnStatus::Completed + )); + } + #[tokio::test] async fn remote_typed_request_roundtrip_works() { let websocket_url = start_test_remote_server(|mut websocket| async move { @@ -1207,6 +1396,107 @@ mod tests { client.shutdown().await.expect("shutdown should complete"); } + #[tokio::test] + async fn remote_backpressure_preserves_transcript_notifications() { + let (done_tx, done_rx) = tokio::sync::oneshot::channel(); + let websocket_url = start_test_remote_server(|mut websocket| async move { + expect_remote_initialize(&mut websocket).await; + for notification in [ + command_execution_output_delta_notification("stdout-1"), + command_execution_output_delta_notification("stdout-2"), + agent_message_delta_notification("hello"), + item_completed_notification("hello"), + turn_completed_notification(), + ] { + write_websocket_message( + 
&mut websocket, + JSONRPCMessage::Notification( + serde_json::from_value( + serde_json::to_value(notification) + .expect("notification should serialize"), + ) + .expect("notification should convert to JSON-RPC"), + ), + ) + .await; + } + let _ = done_rx.await; + }) + .await; + let mut client = RemoteAppServerClient::connect(RemoteAppServerConnectArgs { + websocket_url, + client_name: "codex-app-server-client-test".to_string(), + client_version: "0.0.0-test".to_string(), + experimental_api: true, + opt_out_notification_methods: Vec::new(), + channel_capacity: 1, + }) + .await + .expect("remote client should connect"); + + let first_event = timeout(Duration::from_secs(2), client.next_event()) + .await + .expect("first event should arrive before timeout") + .expect("event stream should stay open"); + assert!(matches!( + first_event, + AppServerEvent::ServerNotification(ServerNotification::CommandExecutionOutputDelta( + notification + )) if notification.delta == "stdout-1" + )); + + let mut remaining_events = Vec::new(); + for _ in 0..4 { + remaining_events.push( + timeout(Duration::from_secs(2), client.next_event()) + .await + .expect("event should arrive before timeout") + .expect("event stream should stay open"), + ); + } + + let mut transcript_event_names = Vec::new(); + for event in &remaining_events { + match event { + AppServerEvent::Lagged { skipped: 1 } => {} + AppServerEvent::ServerNotification( + ServerNotification::CommandExecutionOutputDelta(notification), + ) if notification.delta == "stdout-2" => {} + AppServerEvent::ServerNotification(ServerNotification::AgentMessageDelta( + notification, + )) if notification.delta == "hello" => { + transcript_event_names.push("agent_message_delta"); + } + AppServerEvent::ServerNotification(ServerNotification::ItemCompleted( + notification, + )) if matches!( + &notification.item, + codex_app_server_protocol::ThreadItem::AgentMessage { text, .. 
} if text == "hello" + ) => + { + transcript_event_names.push("item_completed"); + } + AppServerEvent::ServerNotification(ServerNotification::TurnCompleted( + notification, + )) if notification.turn.status + == codex_app_server_protocol::TurnStatus::Completed => + { + transcript_event_names.push("turn_completed"); + } + _ => panic!("unexpected remaining event: {event:?}"), + } + } + assert_eq!( + transcript_event_names, + vec!["agent_message_delta", "item_completed", "turn_completed"] + ); + + done_tx + .send(()) + .expect("server completion signal should send"); + client.shutdown().await.expect("shutdown should complete"); + } + #[tokio::test] async fn remote_server_request_resolution_roundtrip_works() { let websocket_url = start_test_remote_server(|mut websocket| async move { @@ -1446,7 +1736,7 @@ mod tests { } #[test] - fn event_requires_delivery_marks_terminal_events() { + fn event_requires_delivery_marks_transcript_and_terminal_events() { assert!(event_requires_delivery( &InProcessServerEvent::ServerNotification( codex_app_server_protocol::ServerNotification::TurnCompleted( @@ -1462,9 +1752,49 @@ mod tests { ) ) )); + assert!(event_requires_delivery( + &InProcessServerEvent::ServerNotification( + codex_app_server_protocol::ServerNotification::AgentMessageDelta( + codex_app_server_protocol::AgentMessageDeltaNotification { + thread_id: "thread".to_string(), + turn_id: "turn".to_string(), + item_id: "item".to_string(), + delta: "hello".to_string(), + } + ) + ) + )); + assert!(event_requires_delivery( + &InProcessServerEvent::ServerNotification( + codex_app_server_protocol::ServerNotification::ItemCompleted( + codex_app_server_protocol::ItemCompletedNotification { + thread_id: "thread".to_string(), + turn_id: "turn".to_string(), + item: codex_app_server_protocol::ThreadItem::AgentMessage { + id: "item".to_string(), + text: "hello".to_string(), + phase: None, + memory_citation: None, + }, + } + ) + ) + )); 
assert!(!event_requires_delivery(&InProcessServerEvent::Lagged { skipped: 1 })); + assert!(!event_requires_delivery( + &InProcessServerEvent::ServerNotification( + codex_app_server_protocol::ServerNotification::CommandExecutionOutputDelta( + codex_app_server_protocol::CommandExecutionOutputDeltaNotification { + thread_id: "thread".to_string(), + turn_id: "turn".to_string(), + item_id: "item".to_string(), + delta: "stdout".to_string(), + } + ) + ) + )); } #[tokio::test] diff --git a/codex-rs/app-server-client/src/remote.rs b/codex-rs/app-server-client/src/remote.rs index b9179716fe..9cf37e262f 100644 --- a/codex-rs/app-server-client/src/remote.rs +++ b/codex-rs/app-server-client/src/remote.rs @@ -21,6 +21,7 @@ use crate::RequestResult; use crate::SHUTDOWN_TIMEOUT; use crate::TypedRequestError; use crate::request_method_name; +use crate::server_notification_requires_delivery; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientRequest; @@ -854,11 +855,11 @@ async fn reject_if_server_request_dropped( fn event_requires_delivery(event: &AppServerEvent) -> bool { match event { - AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => true, + AppServerEvent::ServerNotification(notification) => { + server_notification_requires_delivery(notification) + } AppServerEvent::Disconnected { .. } => true, - AppServerEvent::Lagged { .. } - | AppServerEvent::ServerNotification(_) - | AppServerEvent::ServerRequest(_) => false, + AppServerEvent::Lagged { .. 
} | AppServerEvent::ServerRequest(_) => false, } } @@ -905,3 +906,40 @@ async fn write_jsonrpc_message( )) }) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn event_requires_delivery_marks_transcript_and_disconnect_events() { + assert!(event_requires_delivery( + &AppServerEvent::ServerNotification(ServerNotification::AgentMessageDelta( + codex_app_server_protocol::AgentMessageDeltaNotification { + thread_id: "thread".to_string(), + turn_id: "turn".to_string(), + item_id: "item".to_string(), + delta: "hello".to_string(), + }, + ),) + )); + assert!(event_requires_delivery( + &AppServerEvent::ServerNotification(ServerNotification::ItemCompleted( + codex_app_server_protocol::ItemCompletedNotification { + thread_id: "thread".to_string(), + turn_id: "turn".to_string(), + item: codex_app_server_protocol::ThreadItem::Plan { + id: "item".to_string(), + text: "step".to_string(), + }, + } + ),) + )); + assert!(event_requires_delivery(&AppServerEvent::Disconnected { + message: "closed".to_string(), + })); + assert!(!event_requires_delivery(&AppServerEvent::Lagged { + skipped: 1 + })); + } +}