fix exec-server processor ordering and task cleanup

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-19 01:54:00 +00:00
parent 963d109cff
commit 62450991e1
7 changed files with 34 additions and 25 deletions

View File

@@ -11,7 +11,6 @@ use tokio::sync::mpsc;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Message;
pub(crate) const CHANNEL_CAPACITY: usize = 128;
#[derive(Debug)]

View File

@@ -1,7 +1,7 @@
mod filesystem;
mod handler;
mod registry;
mod processor;
mod registry;
mod transport;
pub(crate) use handler::ExecServerHandler;

View File

@@ -17,7 +17,7 @@ use crate::server::registry::build_router;
pub(crate) async fn run_connection(connection: JsonRpcConnection) {
let router = Arc::new(build_router());
let (json_outgoing_tx, mut incoming_rx, _connection_tasks) = connection.into_parts();
let (json_outgoing_tx, mut incoming_rx, connection_tasks) = connection.into_parts();
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
let notifications = RpcNotificationSender::new(outgoing_tx.clone());
@@ -38,6 +38,7 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) {
}
});
// Process inbound events sequentially to preserve initialize/initialized ordering.
while let Some(event) = incoming_rx.recv().await {
match event {
JsonRpcConnectionEvent::MalformedMessage { reason } => {
@@ -114,5 +115,9 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) {
handler.shutdown().await;
drop(outgoing_tx);
for task in connection_tasks {
task.abort();
let _ = task.await;
}
let _ = outbound_task.await;
}

View File

@@ -1,7 +1,7 @@
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::io;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tracing::warn;

View File

@@ -5,8 +5,8 @@ use super::parse_listen_url;
#[test]
fn parse_listen_url_accepts_default_websocket_url() {
let bind_address = parse_listen_url(DEFAULT_LISTEN_URL)
.expect("default listen URL should parse");
let bind_address =
parse_listen_url(DEFAULT_LISTEN_URL).expect("default listen URL should parse");
assert_eq!(
bind_address,
super::ListenAddress::Websocket("127.0.0.1:0".parse().expect("valid socket address"))
@@ -21,8 +21,8 @@ fn parse_listen_url_accepts_stdio_url() {
#[test]
fn parse_listen_url_accepts_websocket_url() {
let bind_address = parse_listen_url("ws://127.0.0.1:1234")
.expect("websocket listen URL should parse");
let bind_address =
parse_listen_url("ws://127.0.0.1:1234").expect("websocket listen URL should parse");
assert_eq!(
bind_address,
super::ListenAddress::Websocket("127.0.0.1:1234".parse().expect("valid socket address"))

View File

@@ -1,3 +1,5 @@
#![allow(dead_code)]
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
@@ -6,16 +8,17 @@ use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::RequestId;
use codex_utils_cargo_bin::cargo_bin;
use futures::{SinkExt, StreamExt};
use futures::SinkExt;
use futures::StreamExt;
use std::process::Stdio;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Command;
use std::process::Stdio;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
enum OutgoingMessage {
Json(JSONRPCMessage),
@@ -113,7 +116,10 @@ pub mod exec_server {
child.stderr(Stdio::piped());
let mut child = child.spawn()?;
let stderr = child.stderr.take().expect("stderr should be piped");
let stderr = child
.stderr
.take()
.ok_or_else(|| anyhow::anyhow!("stderr should be piped"))?;
let mut stderr_lines = BufReader::new(stderr).lines();
let websocket_url = read_websocket_url(&mut stderr_lines).await?;
@@ -132,7 +138,9 @@ pub mod exec_server {
Message::Binary(bytes) => serde_json::from_slice::<JSONRPCMessage>(&bytes),
_ => continue,
};
if let Ok(message) = outgoing && let Err(_err) = incoming_tx.send(message).await {
if let Ok(message) = outgoing
&& let Err(_err) = incoming_tx.send(message).await
{
break;
}
}
@@ -141,12 +149,10 @@ pub mod exec_server {
let writer_task = tokio::spawn(async move {
while let Some(message) = outgoing_rx.recv().await {
let outgoing = match message {
OutgoingMessage::Json(message) => {
match serde_json::to_string(&message) {
Ok(json) => Message::Text(json.into()),
Err(_) => continue,
}
}
OutgoingMessage::Json(message) => match serde_json::to_string(&message) {
Ok(json) => Message::Text(json.into()),
Err(_) => continue,
},
OutgoingMessage::RawText(message) => Message::Text(message.into()),
};
if outgoing_ws.send(outgoing).await.is_err() {
@@ -165,7 +171,9 @@ pub mod exec_server {
})
}
async fn read_websocket_url<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> anyhow::Result<String>
async fn read_websocket_url<R>(
lines: &mut tokio::io::Lines<BufReader<R>>,
) -> anyhow::Result<String>
where
R: tokio::io::AsyncRead + Unpin,
{

View File

@@ -4,8 +4,8 @@ mod common;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_exec_server::InitializeParams;
use codex_exec_server::ExecResponse;
use codex_exec_server::InitializeParams;
use common::exec_server::exec_server;
use pretty_assertions::assert_eq;
@@ -30,10 +30,7 @@ async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()>
.await?;
server
.send_notification(
"initialized",
serde_json::json!({}),
)
.send_notification("initialized", serde_json::json!({}))
.await?;
let process_start_id = server