diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 3358f11934..0183482f6a 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -360,12 +360,19 @@ impl ExecServerClient { && let Err(err) = handle_server_notification(&inner, notification).await { - let _ = err; + fail_all_sessions( + &inner, + format!("exec-server notification handling failed: {err}"), + ) + .await; return; } } RpcClientEvent::Disconnected { reason } => { - let _ = reason; + if let Some(inner) = weak.upgrade() { + fail_all_sessions(&inner, disconnected_message(reason.as_deref())) + .await; + } return; } } @@ -407,6 +414,32 @@ impl From for ExecServerError { } } +fn disconnected_message(reason: Option<&str>) -> String { + match reason { + Some(reason) => format!("exec-server transport disconnected: {reason}"), + None => "exec-server transport disconnected".to_string(), + } +} + +async fn fail_all_sessions(inner: &Arc, message: String) { + let sessions = { + let _sessions_write_guard = inner.sessions_write_lock.lock().await; + let sessions = inner.sessions.load(); + let drained_sessions = sessions.as_ref().clone(); + inner.sessions.store(Arc::new(HashMap::new())); + drained_sessions + }; + + for (_, events_tx) in sessions { + // Do not block disconnect handling behind a full bounded queue. Best + // effort deliver a terminal failure event, then drop the sender so + // receivers still observe EOF if the queue was already saturated. + let _ = events_tx.try_send(ExecSessionEvent::Failed { + message: message.clone(), + }); + } +} + async fn handle_server_notification( inner: &Arc, notification: JSONRPCNotification, diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs index a8a89cdfc9..2c89dd2838 100644 --- a/codex-rs/exec-server/src/process.rs +++ b/codex-rs/exec-server/src/process.rs @@ -25,6 +25,9 @@ pub enum ExecSessionEvent { Closed { seq: u64, }, + Failed { + message: String, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index 7647ef63d6..f92e8edfe3 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -22,7 +22,7 @@ use common::exec_server::exec_server; struct ProcessContext { backend: Arc, - _server: Option, + server: Option, } async fn create_process_context(use_remote: bool) -> Result { @@ -31,13 +31,13 @@ async fn create_process_context(use_remote: bool) -> Result { let environment = Environment::create(Some(server.websocket_url().to_string())).await?; Ok(ProcessContext { backend: environment.get_exec_backend(), - _server: Some(server), + server: Some(server), }) } else { let environment = Environment::create(None).await?; Ok(ProcessContext { backend: environment.get_exec_backend(), - _server: None, + server: None, }) } } @@ -67,6 +67,9 @@ async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> { } => exit_code = Some(code), ExecSessionEvent::Closed { .. } => break, ExecSessionEvent::Output { .. } => {} + ExecSessionEvent::Failed { message } => { + anyhow::bail!("process failed before Closed event: {message}") + } }, None => anyhow::bail!("event stream closed before Closed event"), } @@ -94,6 +97,9 @@ async fn collect_process_output_from_events( ExecSessionEvent::Closed { .. } => { break; } + ExecSessionEvent::Failed { message } => { + anyhow::bail!("process failed before Closed event: {message}"); + } }, None => { anyhow::bail!("event stream closed before Closed event"); @@ -196,6 +202,52 @@ async fn assert_exec_process_preserves_queued_events_before_subscribe( Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_exec_process_reports_transport_disconnect() -> Result<()> { + let mut context = create_process_context(/*use_remote*/ true).await?; + let session = context + .backend + .start(ExecParams { + process_id: "proc-disconnect".to_string(), + argv: vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "sleep 10".to_string(), + ], + cwd: std::env::current_dir()?, + env: Default::default(), + tty: false, + arg0: None, + }) + .await?; + + let server = context + .server + .as_mut() + .expect("remote context should include exec-server harness"); + server.shutdown().await?; + + let mut events = session.events; + loop { + match timeout(Duration::from_secs(2), events.recv()).await? { + Some(ExecSessionEvent::Failed { message }) => { + assert!( + message.starts_with("exec-server transport disconnected"), + "unexpected failure message: {message}" + ); + break; + } + Some(ExecSessionEvent::Output { .. } | ExecSessionEvent::Exited { .. }) => {} + Some(ExecSessionEvent::Closed { .. }) => { + anyhow::bail!("received Closed instead of transport failure") + } + None => anyhow::bail!("event stream closed before Failed event"), + } + } + + Ok(()) +} + #[test_case(false ; "local")] #[test_case(true ; "remote")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]