mirror of
https://github.com/openai/codex.git
synced 2026-05-01 11:52:10 +03:00
Reapply "Add app-server transport layer with websocket support" (#11370)
Reapply "Add app-server transport layer with websocket support" with
additional fixes from https://github.com/openai/codex/pull/11313/changes
to avoid deadlocking.
This reverts commit 47356ff83c.
## Summary
To avoid deadlocking when queues are full, we maintain separate tokio
tasks dedicated to incoming vs outgoing event handling
- split the app-server main loop into two tasks in
`run_main_with_transport`
- inbound handling (`transport_event_rx`)
- outbound handling (`outgoing_rx` + `thread_created_rx`)
- separate incoming and outgoing websocket tasks
## Validation
Integration tests, testing thoroughly e2e in codex app w/ >10 concurrent
requests
<img width="1365" height="979" alt="Screenshot 2026-02-10 at 2 54 22 PM"
src="https://github.com/user-attachments/assets/47ca2c13-f322-4e5c-bedd-25859cbdc45f"
/>
---------
Co-authored-by: jif-oai <jif@openai.com>
This commit is contained in:
@@ -8,14 +8,29 @@ use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::ConfigLayerStackOrdering;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Result as IoResult;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::message_processor::MessageProcessorArgs;
|
||||
use crate::outgoing_message::OutgoingMessage;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingEnvelope;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::transport::CHANNEL_CAPACITY;
|
||||
use crate::transport::ConnectionState;
|
||||
use crate::transport::OutboundConnectionState;
|
||||
use crate::transport::TransportEvent;
|
||||
use crate::transport::has_initialized_connections;
|
||||
use crate::transport::route_outgoing_envelope;
|
||||
use crate::transport::start_stdio_connection;
|
||||
use crate::transport::start_websocket_acceptor;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
@@ -26,13 +41,9 @@ use codex_core::check_execpolicy_for_warnings;
|
||||
use codex_core::config_loader::ConfigLoadError;
|
||||
use codex_core::config_loader::TextRange as CoreTextRange;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::io::{self};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
@@ -51,11 +62,30 @@ mod fuzzy_file_search;
|
||||
mod message_processor;
|
||||
mod models;
|
||||
mod outgoing_message;
|
||||
mod transport;
|
||||
|
||||
/// Size of the bounded channels used to communicate between tasks. The value
|
||||
/// is a balance between throughput and memory usage – 128 messages should be
|
||||
/// plenty for an interactive CLI.
|
||||
const CHANNEL_CAPACITY: usize = 128;
|
||||
pub use crate::transport::AppServerTransport;
|
||||
|
||||
/// Control-plane messages from the processor/transport side to the outbound router task.
|
||||
///
|
||||
/// `run_main_with_transport` now uses two loops/tasks:
|
||||
/// - processor loop: handles incoming JSON-RPC and request dispatch
|
||||
/// - outbound loop: performs potentially slow writes to per-connection writers
|
||||
///
|
||||
/// `OutboundControlEvent` keeps those loops coordinated without sharing mutable
|
||||
/// connection state directly. In particular, the outbound loop needs to know
|
||||
/// when a connection opens/closes so it can route messages correctly.
|
||||
enum OutboundControlEvent {
|
||||
/// Register a new writer for an opened connection.
|
||||
Opened {
|
||||
connection_id: ConnectionId,
|
||||
writer: mpsc::Sender<crate::outgoing_message::OutgoingMessage>,
|
||||
initialized: Arc<AtomicBool>,
|
||||
opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
|
||||
},
|
||||
/// Remove state for a closed/disconnected connection.
|
||||
Closed { connection_id: ConnectionId },
|
||||
}
|
||||
|
||||
fn config_warning_from_error(
|
||||
summary: impl Into<String>,
|
||||
@@ -173,32 +203,41 @@ pub async fn run_main(
|
||||
loader_overrides: LoaderOverrides,
|
||||
default_analytics_enabled: bool,
|
||||
) -> IoResult<()> {
|
||||
// Set up channels.
|
||||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
|
||||
run_main_with_transport(
|
||||
codex_linux_sandbox_exe,
|
||||
cli_config_overrides,
|
||||
loader_overrides,
|
||||
default_analytics_enabled,
|
||||
AppServerTransport::Stdio,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
// Task: read from stdin, push to `incoming_tx`.
|
||||
let stdin_reader_handle = tokio::spawn({
|
||||
async move {
|
||||
let stdin = io::stdin();
|
||||
let reader = BufReader::new(stdin);
|
||||
let mut lines = reader.lines();
|
||||
pub async fn run_main_with_transport(
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
cli_config_overrides: CliConfigOverrides,
|
||||
loader_overrides: LoaderOverrides,
|
||||
default_analytics_enabled: bool,
|
||||
transport: AppServerTransport,
|
||||
) -> IoResult<()> {
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
let (outbound_control_tx, mut outbound_control_rx) =
|
||||
mpsc::channel::<OutboundControlEvent>(CHANNEL_CAPACITY);
|
||||
|
||||
while let Some(line) = lines.next_line().await.unwrap_or_default() {
|
||||
match serde_json::from_str::<JSONRPCMessage>(&line) {
|
||||
Ok(msg) => {
|
||||
if incoming_tx.send(msg).await.is_err() {
|
||||
// Receiver gone – nothing left to do.
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
debug!("stdin reader finished (EOF)");
|
||||
let mut stdio_handles = Vec::<JoinHandle<()>>::new();
|
||||
let mut websocket_accept_handle = None;
|
||||
match transport {
|
||||
AppServerTransport::Stdio => {
|
||||
start_stdio_connection(transport_event_tx.clone(), &mut stdio_handles).await?;
|
||||
}
|
||||
});
|
||||
AppServerTransport::WebSocket { bind_address } => {
|
||||
websocket_accept_handle =
|
||||
Some(start_websocket_acceptor(bind_address, transport_event_tx.clone()).await?);
|
||||
}
|
||||
}
|
||||
let shutdown_when_no_connections = matches!(transport, AppServerTransport::Stdio);
|
||||
|
||||
// Parse CLI overrides once and derive the base Config eagerly so later
|
||||
// components do not need to work with raw TOML values.
|
||||
@@ -329,15 +368,76 @@ pub async fn run_main(
|
||||
}
|
||||
}
|
||||
|
||||
// Task: process incoming messages.
|
||||
let transport_event_tx_for_outbound = transport_event_tx.clone();
|
||||
let outbound_handle = tokio::spawn(async move {
|
||||
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
|
||||
let mut pending_closed_connections = VecDeque::<ConnectionId>::new();
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
event = outbound_control_rx.recv() => {
|
||||
let Some(event) = event else {
|
||||
break;
|
||||
};
|
||||
match event {
|
||||
OutboundControlEvent::Opened {
|
||||
connection_id,
|
||||
writer,
|
||||
initialized,
|
||||
opted_out_notification_methods,
|
||||
} => {
|
||||
outbound_connections.insert(
|
||||
connection_id,
|
||||
OutboundConnectionState::new(
|
||||
writer,
|
||||
initialized,
|
||||
opted_out_notification_methods,
|
||||
),
|
||||
);
|
||||
}
|
||||
OutboundControlEvent::Closed { connection_id } => {
|
||||
outbound_connections.remove(&connection_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
envelope = outgoing_rx.recv() => {
|
||||
let Some(envelope) = envelope else {
|
||||
break;
|
||||
};
|
||||
let disconnected_connections =
|
||||
route_outgoing_envelope(&mut outbound_connections, envelope).await;
|
||||
pending_closed_connections.extend(disconnected_connections);
|
||||
}
|
||||
}
|
||||
|
||||
while let Some(connection_id) = pending_closed_connections.front().copied() {
|
||||
match transport_event_tx_for_outbound
|
||||
.try_send(TransportEvent::ConnectionClosed { connection_id })
|
||||
{
|
||||
Ok(()) => {
|
||||
pending_closed_connections.pop_front();
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Full(_)) => {
|
||||
break;
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("outbound router task exited (channel closed)");
|
||||
});
|
||||
|
||||
let processor_handle = tokio::spawn({
|
||||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outbound_control_tx = outbound_control_tx;
|
||||
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
|
||||
let loader_overrides = loader_overrides_for_config_api;
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
codex_linux_sandbox_exe,
|
||||
config: std::sync::Arc::new(config),
|
||||
config: Arc::new(config),
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
@@ -345,25 +445,107 @@ pub async fn run_main(
|
||||
config_warnings,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let mut connections = HashMap::<ConnectionId, ConnectionState>::new();
|
||||
async move {
|
||||
let mut listen_for_threads = true;
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = incoming_rx.recv() => {
|
||||
let Some(msg) = msg else {
|
||||
event = transport_event_rx.recv() => {
|
||||
let Some(event) = event else {
|
||||
break;
|
||||
};
|
||||
match msg {
|
||||
JSONRPCMessage::Request(r) => processor.process_request(r).await,
|
||||
JSONRPCMessage::Response(r) => processor.process_response(r).await,
|
||||
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
|
||||
JSONRPCMessage::Error(e) => processor.process_error(e).await,
|
||||
match event {
|
||||
TransportEvent::ConnectionOpened { connection_id, writer } => {
|
||||
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
||||
let outbound_opted_out_notification_methods =
|
||||
Arc::new(RwLock::new(HashSet::new()));
|
||||
if outbound_control_tx
|
||||
.send(OutboundControlEvent::Opened {
|
||||
connection_id,
|
||||
writer,
|
||||
initialized: Arc::clone(&outbound_initialized),
|
||||
opted_out_notification_methods: Arc::clone(
|
||||
&outbound_opted_out_notification_methods,
|
||||
),
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
connections.insert(
|
||||
connection_id,
|
||||
ConnectionState::new(
|
||||
outbound_initialized,
|
||||
outbound_opted_out_notification_methods,
|
||||
),
|
||||
);
|
||||
}
|
||||
TransportEvent::ConnectionClosed { connection_id } => {
|
||||
if outbound_control_tx
|
||||
.send(OutboundControlEvent::Closed { connection_id })
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
connections.remove(&connection_id);
|
||||
if shutdown_when_no_connections && connections.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
TransportEvent::IncomingMessage { connection_id, message } => {
|
||||
match message {
|
||||
JSONRPCMessage::Request(request) => {
|
||||
let Some(connection_state) = connections.get_mut(&connection_id) else {
|
||||
warn!("dropping request from unknown connection: {:?}", connection_id);
|
||||
continue;
|
||||
};
|
||||
let was_initialized = connection_state.session.initialized;
|
||||
processor
|
||||
.process_request(
|
||||
connection_id,
|
||||
request,
|
||||
&mut connection_state.session,
|
||||
&connection_state.outbound_initialized,
|
||||
)
|
||||
.await;
|
||||
if let Ok(mut opted_out_notification_methods) = connection_state
|
||||
.outbound_opted_out_notification_methods
|
||||
.write()
|
||||
{
|
||||
*opted_out_notification_methods = connection_state
|
||||
.session
|
||||
.opted_out_notification_methods
|
||||
.clone();
|
||||
} else {
|
||||
warn!(
|
||||
"failed to update outbound opted-out notifications"
|
||||
);
|
||||
}
|
||||
if !was_initialized && connection_state.session.initialized {
|
||||
processor.send_initialize_notifications().await;
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Response(response) => {
|
||||
processor.process_response(response).await;
|
||||
}
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
processor.process_notification(notification).await;
|
||||
}
|
||||
JSONRPCMessage::Error(err) => {
|
||||
processor.process_error(err).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
created = thread_created_rx.recv(), if listen_for_threads => {
|
||||
match created {
|
||||
Ok(thread_id) => {
|
||||
processor.try_attach_thread_listener(thread_id).await;
|
||||
if has_initialized_connections(&connections) {
|
||||
processor.try_attach_thread_listener(thread_id).await;
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
||||
// TODO(jif) handle lag.
|
||||
@@ -384,33 +566,18 @@ pub async fn run_main(
|
||||
}
|
||||
});
|
||||
|
||||
// Task: write outgoing messages to stdout.
|
||||
let stdout_writer_handle = tokio::spawn(async move {
|
||||
let mut stdout = io::stdout();
|
||||
while let Some(outgoing_message) = outgoing_rx.recv().await {
|
||||
let Ok(value) = serde_json::to_value(outgoing_message) else {
|
||||
error!("Failed to convert OutgoingMessage to JSON value");
|
||||
continue;
|
||||
};
|
||||
match serde_json::to_string(&value) {
|
||||
Ok(mut json) => {
|
||||
json.push('\n');
|
||||
if let Err(e) = stdout.write_all(json.as_bytes()).await {
|
||||
error!("Failed to write to stdout: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"),
|
||||
}
|
||||
}
|
||||
drop(transport_event_tx);
|
||||
|
||||
info!("stdout writer exited (channel closed)");
|
||||
});
|
||||
let _ = processor_handle.await;
|
||||
let _ = outbound_handle.await;
|
||||
|
||||
// Wait for all tasks to finish. The typical exit path is the stdin reader
|
||||
// hitting EOF which, once it drops `incoming_tx`, propagates shutdown to
|
||||
// the processor and then to the stdout task.
|
||||
let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle);
|
||||
if let Some(handle) = websocket_accept_handle {
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
for handle in stdio_handles {
|
||||
let _ = handle.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user