diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 3f75e67d3d..d4b5fb6431 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1476,12 +1476,15 @@ dependencies = [ "codex-core", "codex-feedback", "codex-protocol", + "futures", "pretty_assertions", "serde", "serde_json", "tokio", + "tokio-tungstenite", "toml 0.9.11+spec-1.1.0", "tracing", + "url", ] [[package]] diff --git a/codex-rs/app-server-client/Cargo.toml b/codex-rs/app-server-client/Cargo.toml index addde4e529..a0b98c0fec 100644 --- a/codex-rs/app-server-client/Cargo.toml +++ b/codex-rs/app-server-client/Cargo.toml @@ -18,11 +18,14 @@ codex-arg0 = { workspace = true } codex-core = { workspace = true } codex-feedback = { workspace = true } codex-protocol = { workspace = true } +futures = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["sync", "time", "rt"] } +tokio-tungstenite = { workspace = true } toml = { workspace = true } tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/codex-rs/app-server-client/src/lib.rs b/codex-rs/app-server-client/src/lib.rs index cd488832c9..fe85a4de98 100644 --- a/codex-rs/app-server-client/src/lib.rs +++ b/codex-rs/app-server-client/src/lib.rs @@ -15,6 +15,8 @@ //! bridging async `mpsc` channels on both sides. Queues are bounded so overload //! surfaces as channel-full errors rather than unbounded memory growth. +mod remote; + use std::error::Error; use std::fmt; use std::io::Error as IoError; @@ -35,8 +37,11 @@ use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::InitializeCapabilities; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCErrorError; +use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::Result as JsonRpcResult; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; use codex_arg0::Arg0DispatchPaths; use codex_core::AuthManager; use codex_core::ThreadManager; @@ -53,6 +58,9 @@ use tokio::time::timeout; use toml::Value as TomlValue; use tracing::warn; +pub use crate::remote::RemoteAppServerClient; +pub use crate::remote::RemoteAppServerConnectArgs; + const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); /// Raw app-server request result for typed in-process requests. @@ -62,6 +70,30 @@ const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); /// `MessageProcessor` continues to produce that shape internally. pub type RequestResult = std::result::Result; +#[derive(Debug, Clone)] +pub enum AppServerEvent { + Lagged { skipped: usize }, + ServerNotification(ServerNotification), + LegacyNotification(JSONRPCNotification), + ServerRequest(ServerRequest), + Disconnected { message: String }, +} + +impl From for AppServerEvent { + fn from(value: InProcessServerEvent) -> Self { + match value { + InProcessServerEvent::Lagged { skipped } => Self::Lagged { skipped }, + InProcessServerEvent::ServerNotification(notification) => { + Self::ServerNotification(notification) + } + InProcessServerEvent::LegacyNotification(notification) => { + Self::LegacyNotification(notification) + } + InProcessServerEvent::ServerRequest(request) => Self::ServerRequest(request), + } + } +} + pub fn local_external_chatgpt_tokens( config: &Config, ) -> Result { @@ -709,6 +741,113 @@ impl InProcessAppServerClient { } } +pub enum AppServerClient { + InProcess(InProcessAppServerClient), + Remote(RemoteAppServerClient), +} + +impl AppServerClient { + pub fn in_process(client: InProcessAppServerClient) -> Self { + Self::InProcess(client) + } + + pub async fn connect_remote(args: RemoteAppServerConnectArgs) -> IoResult { + RemoteAppServerClient::connect(args).await.map(Self::Remote) + } + + pub async fn request(&self, request: ClientRequest) -> IoResult { + match self { + Self::InProcess(client) => client.request(request).await, + Self::Remote(client) => client.request(request).await, + } + } + + pub async fn request_typed(&self, request: ClientRequest) -> Result + where + T: DeserializeOwned, + { + match self { + Self::InProcess(client) => client.request_typed(request).await, + Self::Remote(client) => client.request_typed(request).await, + } + } + + pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> { + match self { + Self::InProcess(client) => client.notify(notification).await, + Self::Remote(client) => client.notify(notification).await, + } + } + + pub async fn resolve_server_request( + &self, + request_id: RequestId, + result: JsonRpcResult, + ) -> IoResult<()> { + match self { + Self::InProcess(client) => client.resolve_server_request(request_id, result).await, + Self::Remote(client) => client.resolve_server_request(request_id, result).await, + } + } + + pub async fn reject_server_request( + &self, + request_id: RequestId, + error: JSONRPCErrorError, + ) -> IoResult<()> { + match self { + Self::InProcess(client) => client.reject_server_request(request_id, error).await, + Self::Remote(client) => client.reject_server_request(request_id, error).await, + } + } + + pub async fn next_event(&mut self) -> Option { + match self { + Self::InProcess(client) => client.next_event().await.map(Into::into), + Self::Remote(client) => client.next_event().await, + } + } + + pub async fn next_typed_event(&mut self) -> Option { + loop { + match self.next_event().await { + Some(AppServerEvent::LegacyNotification(notification)) => { + warn!( + notification.method = %notification.method, + "dropping legacy app-server notification" + ); + } + Some(event) => return Some(event), + None => return None, + } + } + } + + pub async fn submit_legacy_thread_op( + &self, + thread_id: codex_protocol::ThreadId, + op: codex_protocol::protocol::Op, + ) -> IoResult<()> { + match self { + Self::InProcess(client) => client.submit_legacy_thread_op(thread_id, op).await, + Self::Remote(_) => Err(IoError::other( + "legacy TUI operation is not supported over remote app-server transport", + )), + } + } + + pub async fn shutdown(self) -> IoResult<()> { + match self { + Self::InProcess(client) => client.shutdown().await, + Self::Remote(client) => client.shutdown().await, + } + } + + pub fn is_remote(&self) -> bool { + matches!(self, Self::Remote(_)) + } +} + /// Extracts the JSON-RPC method name for diagnostics without extending the /// protocol crate with in-process-only helpers. fn request_method_name(request: &ClientRequest) -> String { diff --git a/codex-rs/app-server-client/src/remote.rs b/codex-rs/app-server-client/src/remote.rs new file mode 100644 index 0000000000..eb3b2fe665 --- /dev/null +++ b/codex-rs/app-server-client/src/remote.rs @@ -0,0 +1,807 @@ +/* +This module implements the websocket-backed app-server client transport. + +It owns the remote connection lifecycle, including the initialize/initialized +handshake, JSON-RPC request/response routing, server-request resolution, and +notification streaming. +*/ + +use std::collections::HashMap; +use std::io::Error as IoError; +use std::io::ErrorKind; +use std::io::Result as IoResult; +use std::time::Duration; + +use crate::AppServerEvent; +use crate::RequestResult; +use crate::SHUTDOWN_TIMEOUT; +use crate::TypedRequestError; +use crate::request_method_name; +use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::ClientNotification; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::InitializeCapabilities; +use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCErrorError; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::Result as JsonRpcResult; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use futures::SinkExt; +use futures::StreamExt; +use serde::de::DeserializeOwned; +use tokio::net::TcpStream; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_tungstenite::MaybeTlsStream; +use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; +use tracing::warn; +use url::Url; + +const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); +const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Debug, Clone)] +pub struct RemoteAppServerConnectArgs { + pub websocket_url: String, + pub client_name: String, + pub client_version: String, + pub experimental_api: bool, + pub opt_out_notification_methods: Vec, + pub channel_capacity: usize, +} + +impl RemoteAppServerConnectArgs { + fn initialize_params(&self) -> InitializeParams { + let capabilities = InitializeCapabilities { + experimental_api: self.experimental_api, + opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() { + None + } else { + Some(self.opt_out_notification_methods.clone()) + }, + }; + + InitializeParams { + client_info: ClientInfo { + name: self.client_name.clone(), + title: None, + version: self.client_version.clone(), + }, + capabilities: Some(capabilities), + } + } +} + +enum RemoteClientCommand { + Request { + request: Box, + response_tx: oneshot::Sender>, + }, + Notify { + notification: ClientNotification, + response_tx: oneshot::Sender>, + }, + ResolveServerRequest { + request_id: RequestId, + result: JsonRpcResult, + response_tx: oneshot::Sender>, + }, + RejectServerRequest { + request_id: RequestId, + error: JSONRPCErrorError, + response_tx: oneshot::Sender>, + }, + Shutdown { + response_tx: oneshot::Sender>, + }, +} + +pub struct RemoteAppServerClient { + command_tx: mpsc::Sender, + event_rx: mpsc::Receiver, + worker_handle: tokio::task::JoinHandle<()>, +} + +impl RemoteAppServerClient { + pub async fn connect(args: RemoteAppServerConnectArgs) -> IoResult { + let channel_capacity = args.channel_capacity.max(1); + let websocket_url = args.websocket_url.clone(); + let url = Url::parse(&websocket_url).map_err(|err| { + IoError::new( + ErrorKind::InvalidInput, + format!("invalid websocket URL `{websocket_url}`: {err}"), + ) + })?; + let stream = timeout(CONNECT_TIMEOUT, connect_async(url.as_str())) + .await + .map_err(|_| { + IoError::new( + ErrorKind::TimedOut, + format!("timed out connecting to remote app server at `{websocket_url}`"), + ) + })? + .map(|(stream, _response)| stream) + .map_err(|err| { + IoError::other(format!( + "failed to connect to remote app server at `{websocket_url}`: {err}" + )) + })?; + let mut stream = stream; + initialize_remote_connection( + &mut stream, + &websocket_url, + args.initialize_params(), + INITIALIZE_TIMEOUT, + ) + .await?; + + let (command_tx, mut command_rx) = mpsc::channel::(channel_capacity); + let (event_tx, event_rx) = mpsc::channel::(channel_capacity); + let worker_handle = tokio::spawn(async move { + let mut pending_requests = + HashMap::>>::new(); + let mut skipped_events = 0usize; + loop { + tokio::select! { + command = command_rx.recv() => { + let Some(command) = command else { + let _ = stream.close(None).await; + break; + }; + match command { + RemoteClientCommand::Request { request, response_tx } => { + let request_id = request_id_from_client_request(&request); + if let Some(previous) = pending_requests.insert(request_id.clone(), response_tx) { + let _ = previous.send(Err(IoError::new( + ErrorKind::InvalidInput, + format!("duplicate remote app-server request id `{request_id}`"), + ))); + } + if let Err(err) = write_jsonrpc_message( + &mut stream, + JSONRPCMessage::Request(jsonrpc_request_from_client_request(*request)), + &websocket_url, + ) + .await + { + let err_message = err.to_string(); + if let Some(response_tx) = pending_requests.remove(&request_id) { + let _ = response_tx.send(Err(err)); + } + let _ = deliver_event( + &event_tx, + &mut skipped_events, + AppServerEvent::Disconnected { + message: format!( + "remote app server at `{websocket_url}` write failed: {err_message}" + ), + }, + &mut stream, + ) + .await; + break; + } + } + RemoteClientCommand::Notify { notification, response_tx } => { + let result = write_jsonrpc_message( + &mut stream, + JSONRPCMessage::Notification( + jsonrpc_notification_from_client_notification(notification), + ), + &websocket_url, + ) + .await; + let _ = response_tx.send(result); + } + RemoteClientCommand::ResolveServerRequest { + request_id, + result, + response_tx, + } => { + let result = write_jsonrpc_message( + &mut stream, + JSONRPCMessage::Response(JSONRPCResponse { + id: request_id, + result, + }), + &websocket_url, + ) + .await; + let _ = response_tx.send(result); + } + RemoteClientCommand::RejectServerRequest { + request_id, + error, + response_tx, + } => { + let result = write_jsonrpc_message( + &mut stream, + JSONRPCMessage::Error(JSONRPCError { + error, + id: request_id, + }), + &websocket_url, + ) + .await; + let _ = response_tx.send(result); + } + RemoteClientCommand::Shutdown { response_tx } => { + let close_result = stream.close(None).await.map_err(|err| { + IoError::other(format!( + "failed to close websocket app server `{websocket_url}`: {err}" + )) + }); + let _ = response_tx.send(close_result); + break; + } + } + } + message = stream.next() => { + match message { + Some(Ok(Message::Text(text))) => { + match serde_json::from_str::(&text) { + Ok(JSONRPCMessage::Response(response)) => { + if let Some(response_tx) = pending_requests.remove(&response.id) { + let _ = response_tx.send(Ok(Ok(response.result))); + } + } + Ok(JSONRPCMessage::Error(error)) => { + if let Some(response_tx) = pending_requests.remove(&error.id) { + let _ = response_tx.send(Ok(Err(error.error))); + } + } + Ok(JSONRPCMessage::Notification(notification)) => { + let event = match ServerNotification::try_from(notification.clone()) { + Ok(notification) => AppServerEvent::ServerNotification(notification), + Err(_) => AppServerEvent::LegacyNotification(notification), + }; + if let Err(err) = deliver_event( + &event_tx, + &mut skipped_events, + event, + &mut stream, + ) + .await + { + warn!(%err, "failed to deliver remote app-server event"); + break; + } + } + Ok(JSONRPCMessage::Request(request)) => { + let request_id = request.id.clone(); + let method = request.method.clone(); + match ServerRequest::try_from(request) { + Ok(request) => { + if let Err(err) = deliver_event( + &event_tx, + &mut skipped_events, + AppServerEvent::ServerRequest(request), + &mut stream, + ) + .await + { + warn!(%err, "failed to deliver remote app-server server request"); + break; + } + } + Err(err) => { + warn!(%err, method, "rejecting unknown remote app-server request"); + if let Err(reject_err) = write_jsonrpc_message( + &mut stream, + JSONRPCMessage::Error(JSONRPCError { + error: JSONRPCErrorError { + code: -32601, + message: format!( + "unsupported remote app-server request `{method}`" + ), + data: None, + }, + id: request_id, + }), + &websocket_url, + ) + .await + { + let err_message = reject_err.to_string(); + let _ = deliver_event( + &event_tx, + &mut skipped_events, + AppServerEvent::Disconnected { + message: format!( + "remote app server at `{websocket_url}` write failed: {err_message}" + ), + }, + &mut stream, + ) + .await; + break; + } + } + } + } + Err(err) => { + let _ = deliver_event( + &event_tx, + &mut skipped_events, + AppServerEvent::Disconnected { + message: format!( + "remote app server at `{websocket_url}` sent invalid JSON-RPC: {err}" + ), + }, + &mut stream, + ) + .await; + break; + } + } + } + Some(Ok(Message::Close(frame))) => { + let reason = frame + .as_ref() + .map(|frame| frame.reason.to_string()) + .filter(|reason| !reason.is_empty()) + .unwrap_or_else(|| "connection closed".to_string()); + let _ = deliver_event( + &event_tx, + &mut skipped_events, + AppServerEvent::Disconnected { + message: format!( + "remote app server at `{websocket_url}` disconnected: {reason}" + ), + }, + &mut stream, + ) + .await; + break; + } + Some(Ok(Message::Binary(_))) + | Some(Ok(Message::Ping(_))) + | Some(Ok(Message::Pong(_))) + | Some(Ok(Message::Frame(_))) => {} + Some(Err(err)) => { + let _ = deliver_event( + &event_tx, + &mut skipped_events, + AppServerEvent::Disconnected { + message: format!( + "remote app server at `{websocket_url}` transport failed: {err}" + ), + }, + &mut stream, + ) + .await; + break; + } + None => { + let _ = deliver_event( + &event_tx, + &mut skipped_events, + AppServerEvent::Disconnected { + message: format!( + "remote app server at `{websocket_url}` closed the connection" + ), + }, + &mut stream, + ) + .await; + break; + } + } + } + } + } + + let err = IoError::new( + ErrorKind::BrokenPipe, + "remote app-server worker channel is closed", + ); + for (_, response_tx) in pending_requests { + let _ = response_tx.send(Err(IoError::new(err.kind(), err.to_string()))); + } + }); + + Ok(Self { + command_tx, + event_rx, + worker_handle, + }) + } + + pub async fn request(&self, request: ClientRequest) -> IoResult { + let (response_tx, response_rx) = oneshot::channel(); + self.command_tx + .send(RemoteClientCommand::Request { + request: Box::new(request), + response_tx, + }) + .await + .map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server worker channel is closed", + ) + })?; + response_rx.await.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server request channel is closed", + ) + })? + } + + pub async fn request_typed(&self, request: ClientRequest) -> Result + where + T: DeserializeOwned, + { + let method = request_method_name(&request); + let response = + self.request(request) + .await + .map_err(|source| TypedRequestError::Transport { + method: method.clone(), + source, + })?; + let result = response.map_err(|source| TypedRequestError::Server { + method: method.clone(), + source, + })?; + serde_json::from_value(result) + .map_err(|source| TypedRequestError::Deserialize { method, source }) + } + + pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> { + let (response_tx, response_rx) = oneshot::channel(); + self.command_tx + .send(RemoteClientCommand::Notify { + notification, + response_tx, + }) + .await + .map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server worker channel is closed", + ) + })?; + response_rx.await.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server notify channel is closed", + ) + })? + } + + pub async fn resolve_server_request( + &self, + request_id: RequestId, + result: JsonRpcResult, + ) -> IoResult<()> { + let (response_tx, response_rx) = oneshot::channel(); + self.command_tx + .send(RemoteClientCommand::ResolveServerRequest { + request_id, + result, + response_tx, + }) + .await + .map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server worker channel is closed", + ) + })?; + response_rx.await.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server resolve channel is closed", + ) + })? + } + + pub async fn reject_server_request( + &self, + request_id: RequestId, + error: JSONRPCErrorError, + ) -> IoResult<()> { + let (response_tx, response_rx) = oneshot::channel(); + self.command_tx + .send(RemoteClientCommand::RejectServerRequest { + request_id, + error, + response_tx, + }) + .await + .map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server worker channel is closed", + ) + })?; + response_rx.await.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server reject channel is closed", + ) + })? + } + + pub async fn next_event(&mut self) -> Option { + self.event_rx.recv().await + } + + pub async fn shutdown(self) -> IoResult<()> { + let Self { + command_tx, + event_rx, + worker_handle, + } = self; + let mut worker_handle = worker_handle; + drop(event_rx); + let (response_tx, response_rx) = oneshot::channel(); + if command_tx + .send(RemoteClientCommand::Shutdown { response_tx }) + .await + .is_ok() + && let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await + { + command_result.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server shutdown channel is closed", + ) + })??; + } + + if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await { + worker_handle.abort(); + let _ = worker_handle.await; + } + Ok(()) + } +} + +async fn initialize_remote_connection( + stream: &mut WebSocketStream>, + websocket_url: &str, + params: InitializeParams, + initialize_timeout: Duration, +) -> IoResult<()> { + let initialize_request_id = RequestId::String("initialize".to_string()); + write_jsonrpc_message( + stream, + JSONRPCMessage::Request(jsonrpc_request_from_client_request( + ClientRequest::Initialize { + request_id: initialize_request_id.clone(), + params, + }, + )), + websocket_url, + ) + .await?; + + timeout(initialize_timeout, async { + loop { + match stream.next().await { + Some(Ok(Message::Text(text))) => { + let message = serde_json::from_str::(&text).map_err(|err| { + IoError::other(format!( + "remote app server at `{websocket_url}` sent invalid initialize response: {err}" + )) + })?; + match message { + JSONRPCMessage::Response(response) if response.id == initialize_request_id => { + break Ok(()); + } + JSONRPCMessage::Error(error) if error.id == initialize_request_id => { + break Err(IoError::other(format!( + "remote app server at `{websocket_url}` rejected initialize: {}", + error.error.message + ))); + } + _ => {} + } + } + Some(Ok(Message::Binary(_))) + | Some(Ok(Message::Ping(_))) + | Some(Ok(Message::Pong(_))) + | Some(Ok(Message::Frame(_))) => {} + Some(Ok(Message::Close(frame))) => { + let reason = frame + .as_ref() + .map(|frame| frame.reason.to_string()) + .filter(|reason| !reason.is_empty()) + .unwrap_or_else(|| "connection closed during initialize".to_string()); + break Err(IoError::new( + ErrorKind::ConnectionAborted, + format!( + "remote app server at `{websocket_url}` closed during initialize: {reason}" + ), + )); + } + Some(Err(err)) => { + break Err(IoError::other(format!( + "remote app server at `{websocket_url}` transport failed during initialize: {err}" + ))); + } + None => { + break Err(IoError::new( + ErrorKind::UnexpectedEof, + format!("remote app server at `{websocket_url}` closed during initialize"), + )); + } + } + } + }) + .await + .map_err(|_| { + IoError::new( + ErrorKind::TimedOut, + format!("timed out waiting for initialize response from `{websocket_url}`"), + ) + })??; + + write_jsonrpc_message( + stream, + JSONRPCMessage::Notification(jsonrpc_notification_from_client_notification( + ClientNotification::Initialized, + )), + websocket_url, + ) + .await +} + +async fn deliver_event( + event_tx: &mpsc::Sender, + skipped_events: &mut usize, + event: AppServerEvent, + stream: &mut WebSocketStream>, +) -> IoResult<()> { + if *skipped_events > 0 { + if event_requires_delivery(&event) { + if event_tx + .send(AppServerEvent::Lagged { + skipped: *skipped_events, + }) + .await + .is_err() + { + return Err(IoError::new( + ErrorKind::BrokenPipe, + "remote app-server event consumer channel is closed", + )); + } + *skipped_events = 0; + } else { + match event_tx.try_send(AppServerEvent::Lagged { + skipped: *skipped_events, + }) { + Ok(()) => *skipped_events = 0, + Err(mpsc::error::TrySendError::Full(_)) => { + *skipped_events = (*skipped_events).saturating_add(1); + reject_if_server_request_dropped(stream, &event).await?; + return Ok(()); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + return Err(IoError::new( + ErrorKind::BrokenPipe, + "remote app-server event consumer channel is closed", + )); + } + } + } + } + + if event_requires_delivery(&event) { + event_tx.send(event).await.map_err(|_| { + IoError::new( + ErrorKind::BrokenPipe, + "remote app-server event consumer channel is closed", + ) + })?; + return Ok(()); + } + + match event_tx.try_send(event) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Full(event)) => { + *skipped_events = (*skipped_events).saturating_add(1); + reject_if_server_request_dropped(stream, &event).await + } + Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new( + ErrorKind::BrokenPipe, + "remote app-server event consumer channel is closed", + )), + } +} + +async fn reject_if_server_request_dropped( + stream: &mut WebSocketStream>, + event: &AppServerEvent, +) -> IoResult<()> { + let AppServerEvent::ServerRequest(request) = event else { + return Ok(()); + }; + write_jsonrpc_message( + stream, + JSONRPCMessage::Error(JSONRPCError { + error: JSONRPCErrorError { + code: -32001, + message: "remote app-server event queue is full".to_string(), + data: None, + }, + id: request.id().clone(), + }), + "", + ) + .await +} + +fn event_requires_delivery(event: &AppServerEvent) -> bool { + match event { + AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => true, + AppServerEvent::LegacyNotification(notification) => matches!( + notification + .method + .strip_prefix("codex/event/") + .unwrap_or(¬ification.method), + "task_complete" | "turn_aborted" | "shutdown_complete" + ), + AppServerEvent::Disconnected { .. } => true, + AppServerEvent::Lagged { .. } + | AppServerEvent::ServerNotification(_) + | AppServerEvent::ServerRequest(_) => false, + } +} + +fn request_id_from_client_request(request: &ClientRequest) -> RequestId { + jsonrpc_request_from_client_request(request.clone()).id +} + +fn jsonrpc_request_from_client_request(request: ClientRequest) -> JSONRPCRequest { + let value = match serde_json::to_value(request) { + Ok(value) => value, + Err(err) => panic!("client request should serialize: {err}"), + }; + match serde_json::from_value(value) { + Ok(request) => request, + Err(err) => panic!("client request should encode as JSON-RPC request: {err}"), + } +} + +fn jsonrpc_notification_from_client_notification( + notification: ClientNotification, +) -> JSONRPCNotification { + let value = match serde_json::to_value(notification) { + Ok(value) => value, + Err(err) => panic!("client notification should serialize: {err}"), + }; + match serde_json::from_value(value) { + Ok(notification) => notification, + Err(err) => panic!("client notification should encode as JSON-RPC notification: {err}"), + } +} + +async fn write_jsonrpc_message( + stream: &mut WebSocketStream>, + message: JSONRPCMessage, + websocket_url: &str, +) -> IoResult<()> { + let payload = serde_json::to_string(&message).map_err(IoError::other)?; + stream + .send(Message::Text(payload.into())) + .await + .map_err(|err| { + IoError::other(format!( + "failed to write websocket message to `{websocket_url}`: {err}" + )) + }) +} diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index d6f2681e53..ba372cf1b0 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -74,6 +74,9 @@ struct MultitoolCli { #[clap(flatten)] pub feature_toggles: FeatureToggles, + #[clap(flatten)] + remote: InteractiveRemoteOptions, + #[clap(flatten)] interactive: TuiCli, @@ -204,6 +207,9 @@ struct ResumeCommand { #[arg(long = "all", default_value_t = false)] all: bool, + #[clap(flatten)] + remote: InteractiveRemoteOptions, + #[clap(flatten)] config_overrides: TuiCli, } @@ -223,10 +229,20 @@ struct ForkCommand { #[arg(long = "all", default_value_t = false)] all: bool, + #[clap(flatten)] + remote: InteractiveRemoteOptions, + #[clap(flatten)] config_overrides: TuiCli, } +#[derive(Debug, Args, Clone, Default)] +struct InteractiveRemoteOptions { + /// Connect the interactive TUI to a remote app server over websocket. + #[arg(long = "remote", value_name = "ADDR")] + remote: Option, +} + #[derive(Debug, Parser)] struct SandboxArgs { #[command(subcommand)] @@ -561,6 +577,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { let MultitoolCli { config_overrides: mut root_config_overrides, feature_toggles, + remote, mut interactive, subcommand, } = MultitoolCli::parse(); @@ -575,10 +592,12 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { &mut interactive.config_overrides, root_config_overrides.clone(), ); - let exit_info = run_interactive_tui(interactive, arg0_paths.clone()).await?; + let exit_info = + run_interactive_tui(interactive, arg0_paths.clone(), remote.remote.clone()).await?; handle_app_exit(exit_info)?; } Some(Subcommand::Exec(mut exec_cli)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "exec")?; prepend_config_flags( &mut exec_cli.config_overrides, root_config_overrides.clone(), @@ -586,6 +605,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { codex_exec::run_main(exec_cli, arg0_paths.clone()).await?; } Some(Subcommand::Review(review_args)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "review")?; let mut exec_cli = ExecCli::try_parse_from(["codex", "exec"])?; exec_cli.command = Some(ExecCommand::Review(review_args)); prepend_config_flags( @@ -595,15 +615,18 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { codex_exec::run_main(exec_cli, arg0_paths.clone()).await?; } Some(Subcommand::McpServer) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "mcp-server")?; codex_mcp_server::run_main(arg0_paths.clone(), root_config_overrides).await?; } Some(Subcommand::Mcp(mut mcp_cli)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "mcp")?; // Propagate any root-level config overrides (e.g. `-c key=value`). prepend_config_flags(&mut mcp_cli.config_overrides, root_config_overrides.clone()); mcp_cli.run().await?; } Some(Subcommand::AppServer(app_server_cli)) => match app_server_cli.subcommand { None => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "app-server")?; let transport = app_server_cli.listen; codex_app_server::run_main_with_transport( arg0_paths.clone(), @@ -615,6 +638,10 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { .await?; } Some(AppServerSubcommand::GenerateTs(gen_cli)) => { + reject_remote_mode_for_subcommand( + remote.remote.as_deref(), + "app-server generate-ts", + )?; let options = codex_app_server_protocol::GenerateTsOptions { experimental_api: gen_cli.experimental, ..Default::default() @@ -626,6 +653,10 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { )?; } Some(AppServerSubcommand::GenerateJsonSchema(gen_cli)) => { + reject_remote_mode_for_subcommand( + remote.remote.as_deref(), + "app-server generate-json-schema", + )?; codex_app_server_protocol::generate_json_with_experimental( &gen_cli.out_dir, gen_cli.experimental, @@ -634,12 +665,14 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { }, #[cfg(target_os = "macos")] Some(Subcommand::App(app_cli)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "app")?; app_cmd::run_app(app_cli).await?; } Some(Subcommand::Resume(ResumeCommand { session_id, last, all, + remote, config_overrides, })) => { interactive = finalize_resume_interactive( @@ -650,13 +683,15 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { all, config_overrides, ); - let exit_info = run_interactive_tui(interactive, arg0_paths.clone()).await?; + let exit_info = + run_interactive_tui(interactive, arg0_paths.clone(), remote.remote).await?; handle_app_exit(exit_info)?; } Some(Subcommand::Fork(ForkCommand { session_id, last, all, + remote, config_overrides, })) => { interactive = finalize_fork_interactive( @@ -667,10 +702,12 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { all, config_overrides, ); - let exit_info = run_interactive_tui(interactive, arg0_paths.clone()).await?; + let exit_info = + run_interactive_tui(interactive, arg0_paths.clone(), remote.remote).await?; handle_app_exit(exit_info)?; } Some(Subcommand::Login(mut login_cli)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "login")?; prepend_config_flags( &mut login_cli.config_overrides, root_config_overrides.clone(), @@ -702,6 +739,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { } } Some(Subcommand::Logout(mut logout_cli)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "logout")?; prepend_config_flags( &mut logout_cli.config_overrides, root_config_overrides.clone(), @@ -709,9 +747,11 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { run_logout(logout_cli.config_overrides).await; } Some(Subcommand::Completion(completion_cli)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "completion")?; print_completion(completion_cli); } Some(Subcommand::Cloud(mut cloud_cli)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "cloud")?; prepend_config_flags( &mut cloud_cli.config_overrides, root_config_overrides.clone(), @@ -721,6 +761,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { } Some(Subcommand::Sandbox(sandbox_args)) => match sandbox_args.cmd { SandboxCommand::Macos(mut seatbelt_cli) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "sandbox macos")?; prepend_config_flags( &mut seatbelt_cli.config_overrides, root_config_overrides.clone(), @@ -732,6 +773,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { .await?; } SandboxCommand::Linux(mut landlock_cli) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "sandbox linux")?; prepend_config_flags( &mut landlock_cli.config_overrides, root_config_overrides.clone(), @@ -743,6 +785,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { .await?; } SandboxCommand::Windows(mut windows_cli) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "sandbox windows")?; prepend_config_flags( &mut windows_cli.config_overrides, root_config_overrides.clone(), @@ -756,16 +799,25 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { }, Some(Subcommand::Debug(DebugCommand { subcommand })) => match subcommand { DebugSubcommand::AppServer(cmd) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "debug app-server")?; run_debug_app_server_command(cmd).await?; } DebugSubcommand::ClearMemories => { + reject_remote_mode_for_subcommand( + remote.remote.as_deref(), + "debug clear-memories", + )?; run_debug_clear_memories_command(&root_config_overrides, &interactive).await?; } }, Some(Subcommand::Execpolicy(ExecpolicyCommand { sub })) => match sub { - ExecpolicySubcommand::Check(cmd) => run_execpolicycheck(cmd)?, + ExecpolicySubcommand::Check(cmd) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "execpolicy check")?; + run_execpolicycheck(cmd)? + } }, Some(Subcommand::Apply(mut apply_cli)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "apply")?; prepend_config_flags( &mut apply_cli.config_overrides, root_config_overrides.clone(), @@ -773,16 +825,19 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { run_apply_command(apply_cli, None).await?; } Some(Subcommand::ResponsesApiProxy(args)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "responses-api-proxy")?; tokio::task::spawn_blocking(move || codex_responses_api_proxy::run_main(args)) .await??; } Some(Subcommand::StdioToUds(cmd)) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "stdio-to-uds")?; let socket_path = cmd.socket_path; tokio::task::spawn_blocking(move || codex_stdio_to_uds::run(socket_path.as_path())) .await??; } Some(Subcommand::Features(FeaturesCli { sub })) => match sub { FeaturesSubcommand::List => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "features list")?; // Respect root-level `-c` overrides plus top-level flags like `--profile`. let mut cli_kv_overrides = root_config_overrides .parse_overrides() @@ -825,9 +880,11 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { } } FeaturesSubcommand::Enable(FeatureSetArgs { feature }) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "features enable")?; enable_feature_in_config(&interactive, &feature).await?; } FeaturesSubcommand::Disable(FeatureSetArgs { feature }) => { + reject_remote_mode_for_subcommand(remote.remote.as_deref(), "features disable")?; disable_feature_in_config(&interactive, &feature).await?; } }, @@ -952,6 +1009,7 @@ fn prepend_config_flags( async fn run_interactive_tui( mut interactive: TuiCli, arg0_paths: Arg0DispatchPaths, + remote: Option, ) -> std::io::Result { if let Some(prompt) = interactive.prompt.take() { // Normalize CRLF/CR to LF so CLI-provided text can't leak `\r` into TUI state. @@ -980,10 +1038,20 @@ async fn run_interactive_tui( interactive, arg0_paths, codex_core::config_loader::LoaderOverrides::default(), + remote, ) .await } +fn reject_remote_mode_for_subcommand(remote: Option<&str>, subcommand: &str) -> anyhow::Result<()> { + if let Some(addr) = remote { + anyhow::bail!( + "--remote {addr} is only supported for interactive TUI commands, not `codex {subcommand}`" + ); + } + Ok(()) +} + fn confirm(prompt: &str) -> std::io::Result { eprintln!("{prompt}"); @@ -1114,12 +1182,14 @@ mod tests { config_overrides: root_overrides, subcommand, feature_toggles: _, + remote: _, } = cli; let Subcommand::Resume(ResumeCommand { session_id, last, all, + remote: _, config_overrides: resume_cli, }) = subcommand.expect("resume present") else { @@ -1143,12 +1213,14 @@ mod tests { config_overrides: root_overrides, subcommand, feature_toggles: _, + remote: _, } = cli; let Subcommand::Fork(ForkCommand { session_id, last, all, + remote: _, config_overrides: fork_cli, }) = subcommand.expect("fork present") else { @@ -1158,6 +1230,44 @@ mod tests { finalize_fork_interactive(interactive, root_overrides, session_id, last, all, fork_cli) } + #[test] + fn root_cli_parses_remote_for_interactive_mode() { + let cli = MultitoolCli::try_parse_from(["codex", "--remote", "ws://example.test/socket"]) + .expect("parse should succeed"); + assert_eq!( + cli.remote.remote.as_deref(), + Some("ws://example.test/socket") + ); + assert!(cli.subcommand.is_none()); + } + + #[test] + fn resume_subcommand_parses_remote() { + let cli = MultitoolCli::try_parse_from([ + "codex", + "resume", + "--remote", + "wss://example.test/socket", + "--last", + ]) + .expect("parse should succeed"); + let Some(Subcommand::Resume(ResumeCommand { remote, last, .. })) = cli.subcommand else { + panic!("expected resume subcommand"); + }; + assert_eq!(remote.remote.as_deref(), Some("wss://example.test/socket")); + assert!(last); + } + + #[test] + fn reject_remote_mode_reports_noninteractive_subcommand() { + let err = reject_remote_mode_for_subcommand(Some("ws://example.test/socket"), "exec") + .expect_err("remote exec should be rejected"); + assert_eq!( + err.to_string(), + "--remote ws://example.test/socket is only supported for interactive TUI commands, not `codex exec`" + ); + } + #[test] fn exec_resume_last_accepts_prompt_positional() { let cli = diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 9654557b77..cda6317adf 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -43,6 +43,8 @@ use crate::tui::TuiEvent; use crate::update_action::UpdateAction; use crate::version::CODEX_CLI_VERSION; use codex_ansi_escape::ansi_escape_line; +use codex_app_server_client::AppServerClient; +#[cfg(test)] use codex_app_server_client::InProcessAppServerClient; use codex_app_server_protocol::ConfigLayerSource; use codex_app_server_protocol::RequestId; @@ -1188,7 +1190,7 @@ impl App { self.backtrack_render_pending = false; } - async fn shutdown_current_thread(&mut self, app_server: &InProcessAppServerClient) { + async fn shutdown_current_thread(&mut self, app_server: &AppServerClient) { if let Some(thread_id) = self.chat_widget.thread_id() { // Clear any in-flight rollback guard when switching threads. self.backtrack.pending_rollback = None; @@ -1515,7 +1517,7 @@ impl App { async fn submit_op_to_thread( &mut self, - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, thread_id: ThreadId, op: Op, ) { @@ -1832,7 +1834,7 @@ impl App { async fn start_fresh_session_with_summary_hint( &mut self, tui: &mut tui::Tui, - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, ) { // Start a fresh in-memory session while preserving resumability via persisted rollout // history. @@ -2007,7 +2009,7 @@ impl App { #[allow(clippy::too_many_arguments)] pub async fn run( tui: &mut tui::Tui, - mut app_server: InProcessAppServerClient, + mut app_server: AppServerClient, mut config: Config, cli_kv_overrides: Vec<(String, TomlValue)>, harness_overrides: ConfigOverrides, @@ -2215,7 +2217,8 @@ impl App { app.thread_resume_via_app_server( &app_server, &resume_config, - target_session.path.clone(), + target_session.thread_id, + target_session.rollout_path.clone(), ) .await .map_err(|err| { @@ -2233,7 +2236,8 @@ impl App { app.thread_fork_via_app_server( &app_server, &fork_config, - target_session.path.clone(), + target_session.thread_id, + target_session.rollout_path.clone(), ) .await .map_err(|err| { @@ -2346,9 +2350,11 @@ impl App { None => { listen_for_app_server_events = false; tracing::warn!("app-server event stream closed"); + AppRunControl::Exit(ExitReason::Fatal( + "App server event stream closed unexpectedly".to_string(), + )) } } - AppRunControl::Continue } }; if App::should_stop_waiting_for_initial_session( @@ -2452,7 +2458,7 @@ impl App { async fn handle_event( &mut self, tui: &mut tui::Tui, - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, event: AppEvent, ) -> Result { match event { @@ -2468,7 +2474,18 @@ impl App { .await; } AppEvent::OpenResumePicker => { - match crate::resume_picker::run_resume_picker(tui, &self.config, false).await? { + let selection = if app_server.is_remote() { + crate::resume_picker::run_remote_resume_picker( + tui, + app_server, + &self.config, + false, + ) + .await? + } else { + crate::resume_picker::run_resume_picker(tui, &self.config, false).await? + }; + match selection { SessionSelection::Resume(target_session) => { let current_cwd = self.config.cwd.clone(); let resume_cwd = match crate::resolve_cwd_for_resume_or_fork( @@ -2476,7 +2493,8 @@ impl App { &self.config, ¤t_cwd, target_session.thread_id, - &target_session.path, + target_session.cwd.as_deref(), + target_session.rollout_path.as_deref(), CwdPromptAction::Resume, true, ) @@ -2510,7 +2528,8 @@ impl App { .thread_resume_via_app_server( app_server, &resume_config, - target_session.path.clone(), + target_session.thread_id, + target_session.rollout_path.clone(), ) .await { @@ -2583,8 +2602,19 @@ impl App { // materialized lazily on first user message. if path.exists() { let fork_config = self.config.clone(); + let Some(thread_id) = self.chat_widget.thread_id() else { + self.chat_widget.add_error_message( + "Failed to determine the current thread to fork.".to_string(), + ); + return Ok(AppRunControl::Continue); + }; match self - .thread_fork_via_app_server(app_server, &fork_config, path.clone()) + .thread_fork_via_app_server( + app_server, + &fork_config, + thread_id, + Some(path.clone()), + ) .await { Ok(session_configured) => { @@ -3847,7 +3877,7 @@ impl App { async fn handle_exit_mode( &mut self, - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, mode: ExitMode, ) -> AppRunControl { match mode { @@ -4424,24 +4454,26 @@ mod tests { }) } - async fn start_test_app_server(config: Config) -> InProcessAppServerClient { - InProcessAppServerClient::start(codex_app_server_client::InProcessClientStartArgs { - arg0_paths: Arg0DispatchPaths::default(), - config: Arc::new(config), - cli_overrides: Vec::new(), - loader_overrides: LoaderOverrides::default(), - feedback: codex_feedback::CodexFeedback::new(), - config_warnings: Vec::new(), - session_source: SessionSource::Cli, - enable_codex_api_key_env: false, - client_name: "codex-tui-test".to_string(), - client_version: "0.0.0-test".to_string(), - experimental_api: true, - opt_out_notification_methods: Vec::new(), - channel_capacity: codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, - }) - .await - .expect("in-process app server should start") + async fn start_test_app_server(config: Config) -> AppServerClient { + let client = + InProcessAppServerClient::start(codex_app_server_client::InProcessClientStartArgs { + arg0_paths: Arg0DispatchPaths::default(), + config: Arc::new(config), + cli_overrides: Vec::new(), + loader_overrides: LoaderOverrides::default(), + feedback: codex_feedback::CodexFeedback::new(), + config_warnings: Vec::new(), + session_source: SessionSource::Cli, + enable_codex_api_key_env: false, + client_name: "codex-tui-test".to_string(), + client_version: "0.0.0-test".to_string(), + experimental_api: true, + opt_out_notification_methods: Vec::new(), + channel_capacity: codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }) + .await + .expect("in-process app server should start"); + AppServerClient::in_process(client) } #[test] @@ -4477,7 +4509,9 @@ mod tests { App::should_wait_for_initial_session(&SessionSelection::Resume( crate::resume_picker::SessionTarget { path: PathBuf::from("/tmp/restore"), + rollout_path: Some(PathBuf::from("/tmp/restore")), thread_id: ThreadId::new(), + cwd: None, } )), false @@ -4486,7 +4520,9 @@ mod tests { App::should_wait_for_initial_session(&SessionSelection::Fork( crate::resume_picker::SessionTarget { path: PathBuf::from("/tmp/fork"), + rollout_path: Some(PathBuf::from("/tmp/fork")), thread_id: ThreadId::new(), + cwd: None, } )), false @@ -4526,7 +4562,9 @@ mod tests { let wait_for_resume = App::should_wait_for_initial_session(&SessionSelection::Resume( crate::resume_picker::SessionTarget { path: PathBuf::from("/tmp/restore"), + rollout_path: Some(PathBuf::from("/tmp/restore")), thread_id: ThreadId::new(), + cwd: None, }, )); assert_eq!( @@ -4536,7 +4574,9 @@ mod tests { let wait_for_fork = App::should_wait_for_initial_session(&SessionSelection::Fork( crate::resume_picker::SessionTarget { path: PathBuf::from("/tmp/fork"), + rollout_path: Some(PathBuf::from("/tmp/fork")), thread_id: ThreadId::new(), + cwd: None, }, )); assert_eq!( diff --git a/codex-rs/tui/src/app/app_server_adapter.rs b/codex-rs/tui/src/app/app_server_adapter.rs index 202b463fa7..1398b5cd64 100644 --- a/codex-rs/tui/src/app/app_server_adapter.rs +++ b/codex-rs/tui/src/app/app_server_adapter.rs @@ -3,8 +3,10 @@ use std::path::Path; use std::path::PathBuf; use super::App; -use codex_app_server_client::InProcessAppServerClient; -use codex_app_server_client::InProcessServerEvent; +use super::AppRunControl; +use super::ExitReason; +use codex_app_server_client::AppServerClient; +use codex_app_server_client::AppServerEvent; use codex_app_server_client::local_external_chatgpt_tokens; use codex_app_server_protocol::AppsListParams; use codex_app_server_protocol::AppsListResponse; @@ -94,7 +96,7 @@ use crate::thread_update::ThreadUpdate; impl App { pub(super) async fn list_models_via_app_server( - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, ) -> Result, String> { let response: ModelListResponse = send_request_with_response( app_server_client, @@ -113,7 +115,7 @@ impl App { } pub(super) async fn list_collaboration_modes_via_app_server( - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, ) -> Result, String> { let response: CollaborationModeListResponse = send_request_with_response( app_server_client, @@ -132,7 +134,7 @@ impl App { } pub(super) async fn read_account_via_app_server( - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, ) -> Result, String> { let response: GetAccountResponse = send_request_with_response( app_server_client, @@ -149,7 +151,7 @@ impl App { } pub(super) async fn read_account_rate_limits_via_app_server( - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, ) -> Result { send_request_with_response( app_server_client, @@ -163,7 +165,7 @@ impl App { } pub(super) async fn list_apps_via_app_server( - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, thread_id: Option, force_refetch: bool, ) -> Result, String> { @@ -185,7 +187,7 @@ impl App { } pub(super) async fn upload_feedback_via_app_server( - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, request_id: RequestId, classification: String, reason: Option, @@ -219,7 +221,7 @@ impl App { pub(super) async fn thread_start_via_app_server( &mut self, - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, config: &Config, ) -> Result { let response: ThreadStartResponse = send_request_with_response( @@ -231,7 +233,8 @@ impl App { "thread/start", ) .await?; - let (history_log_id, history_entry_count) = history_metadata(config).await; + let (history_log_id, history_entry_count) = + history_metadata(config, app_server_client).await; session_configured_from_thread_start_response( &response, history_log_id, @@ -241,20 +244,22 @@ impl App { pub(super) async fn thread_resume_via_app_server( &mut self, - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, config: &Config, - path: PathBuf, + thread_id: ThreadId, + path: Option, ) -> Result { let response: ThreadResumeResponse = send_request_with_response( app_server_client, ClientRequest::ThreadResume { request_id: self.next_app_server_request_id(), - params: thread_resume_params_from_config(config, path), + params: thread_resume_params_from_config(config, thread_id, path), }, "thread/resume", ) .await?; - let (history_log_id, history_entry_count) = history_metadata(config).await; + let (history_log_id, history_entry_count) = + history_metadata(config, app_server_client).await; session_configured_from_thread_resume_response( &response, history_log_id, @@ -264,26 +269,28 @@ impl App { pub(super) async fn thread_fork_via_app_server( &mut self, - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, config: &Config, - path: PathBuf, + thread_id: ThreadId, + path: Option, ) -> Result { let response: ThreadForkResponse = send_request_with_response( app_server_client, ClientRequest::ThreadFork { request_id: self.next_app_server_request_id(), - params: thread_fork_params_from_config(config, path), + params: thread_fork_params_from_config(config, thread_id, path), }, "thread/fork", ) .await?; - let (history_log_id, history_entry_count) = history_metadata(config).await; + let (history_log_id, history_entry_count) = + history_metadata(config, app_server_client).await; session_configured_from_thread_fork_response(&response, history_log_id, history_entry_count) } pub(super) async fn unsubscribe_thread_via_app_server( &mut self, - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, thread_id: ThreadId, ) -> Result<(), String> { let _: ThreadUnsubscribeResponse = send_request_with_response( @@ -303,7 +310,7 @@ impl App { pub(super) async fn submit_app_server_op( &mut self, - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, thread_id: ThreadId, op: Op, ) -> bool { @@ -319,7 +326,7 @@ impl App { async fn submit_app_server_op_inner( &mut self, - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, thread_id: ThreadId, op: Op, ) -> Result<(), String> { @@ -729,6 +736,15 @@ impl App { | Op::GetHistoryEntryRequest { .. } | Op::ListMcpTools | Op::OverrideTurnContext { .. } => { + if app_server_client.is_remote() { + match remote_legacy_op_behavior(&op) { + RemoteLegacyOpBehavior::Ignore => return Ok(()), + RemoteLegacyOpBehavior::UserFacingError(message) => { + return Err(message.to_string()); + } + RemoteLegacyOpBehavior::UseLegacyRuntime => {} + } + } // TODO(app-server): migrate these legacy-only TUI features once app-server grows // equivalent APIs. Until then, keep routing the still-emitted TUI ops through the // shared in-process thread runtime so existing behavior does not regress. @@ -843,7 +859,7 @@ impl App { async fn handle_server_notification( &mut self, - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, notification: ServerNotification, ) { match notification { @@ -1049,11 +1065,11 @@ impl App { pub(super) async fn handle_app_server_event( &mut self, - app_server_client: &InProcessAppServerClient, - event: InProcessServerEvent, - ) { + app_server_client: &AppServerClient, + event: AppServerEvent, + ) -> AppRunControl { match event { - InProcessServerEvent::Lagged { skipped } => { + AppServerEvent::Lagged { skipped } => { tracing::warn!( skipped, "app-server event consumer lagged; dropping ignored events" @@ -1061,14 +1077,14 @@ impl App { self.chat_widget .add_error_message(lagged_event_warning_message(skipped)); } - InProcessServerEvent::ServerNotification(notification) => { + AppServerEvent::ServerNotification(notification) => { self.handle_server_notification(app_server_client, notification) .await; } - InProcessServerEvent::LegacyNotification(notification) => { + AppServerEvent::LegacyNotification(notification) => { self.handle_legacy_notification(notification); } - InProcessServerEvent::ServerRequest(request) => { + AppServerEvent::ServerRequest(request) => { self.note_server_request(&request); match request.clone() { ServerRequest::CommandExecutionRequestApproval { request_id, params } => { @@ -1144,7 +1160,13 @@ impl App { } } } + AppServerEvent::Disconnected { message } => { + return AppRunControl::Exit(ExitReason::Fatal(format!( + "Remote app server disconnected: {message}" + ))); + } } + AppRunControl::Continue } fn handle_legacy_notification(&mut self, notification: JSONRPCNotification) { @@ -1183,7 +1205,7 @@ impl App { async fn reject_app_server_request( &self, - app_server_client: &InProcessAppServerClient, + app_server_client: &AppServerClient, request_id: RequestId, reason: String, ) -> Result<(), String> { @@ -1202,7 +1224,7 @@ impl App { } async fn send_request_with_response( - client: &InProcessAppServerClient, + client: &AppServerClient, request: ClientRequest, method: &str, ) -> Result @@ -1240,11 +1262,15 @@ fn thread_start_params_from_config(config: &Config) -> ThreadStartParams { } } -fn thread_resume_params_from_config(config: &Config, path: PathBuf) -> ThreadResumeParams { +fn thread_resume_params_from_config( + config: &Config, + thread_id: ThreadId, + path: Option, +) -> ThreadResumeParams { ThreadResumeParams { - thread_id: "resume".to_string(), + thread_id: thread_id.to_string(), history: None, - path: Some(path), + path, model: config.model.clone(), model_provider: Some(config.model_provider_id.clone()), service_tier: config.service_tier.map(Some), @@ -1260,10 +1286,14 @@ fn thread_resume_params_from_config(config: &Config, path: PathBuf) -> ThreadRes } } -fn thread_fork_params_from_config(config: &Config, path: PathBuf) -> ThreadForkParams { +fn thread_fork_params_from_config( + config: &Config, + thread_id: ThreadId, + path: Option, +) -> ThreadForkParams { ThreadForkParams { - thread_id: "fork".to_string(), - path: Some(path), + thread_id: thread_id.to_string(), + path, model: config.model.clone(), model_provider: Some(config.model_provider_id.clone()), service_tier: config.service_tier.map(Some), @@ -1403,7 +1433,10 @@ fn session_configured_from_thread_response( }) } -async fn history_metadata(config: &Config) -> (u64, usize) { +async fn history_metadata(config: &Config, app_server_client: &AppServerClient) -> (u64, usize) { + if app_server_client.is_remote() { + return (0, 0); + } let path = history_filepath(config); history_metadata_for_file(&path).await } @@ -1412,6 +1445,40 @@ fn history_filepath(config: &Config) -> PathBuf { config.codex_home.join("history.jsonl") } +enum RemoteLegacyOpBehavior { + Ignore, + UserFacingError(&'static str), + UseLegacyRuntime, +} + +fn remote_legacy_op_behavior(op: &Op) -> RemoteLegacyOpBehavior { + match op { + Op::AddToHistory { .. } | Op::ListCustomPrompts => RemoteLegacyOpBehavior::Ignore, + Op::GetHistoryEntryRequest { .. } => RemoteLegacyOpBehavior::UserFacingError( + "Persistent prompt history is not available when connected to a remote app server yet. Reconnect without `--remote` to use shared history.", + ), + Op::ListMcpTools => RemoteLegacyOpBehavior::UserFacingError( + "The `/mcp` tools listing is not available when connected to a remote app server yet. Reconnect without `--remote` to use this command.", + ), + Op::RunUserShellCommand { .. } => RemoteLegacyOpBehavior::UserFacingError( + "User shell commands are not available when connected to a remote app server yet. Reconnect without `--remote` to use `!cmd`.", + ), + Op::DropMemories | Op::UpdateMemories => RemoteLegacyOpBehavior::UserFacingError( + "Memory management is not available when connected to a remote app server yet. Reconnect without `--remote` to use `/memory`.", + ), + Op::ReloadUserConfig => RemoteLegacyOpBehavior::UserFacingError( + "Reloading the active session config is not available when connected to a remote app server yet. Reconnect without `--remote` to use this action.", + ), + Op::OverrideTurnContext { .. } => RemoteLegacyOpBehavior::UserFacingError( + "Changing live turn settings is not available when connected to a remote app server yet. Reconnect without `--remote` to use this action.", + ), + Op::Undo => RemoteLegacyOpBehavior::UserFacingError( + "Undo is not available when connected to a remote app server yet. Reconnect without `--remote` to use this action.", + ), + _ => RemoteLegacyOpBehavior::UseLegacyRuntime, + } +} + async fn history_metadata_for_file(path: &Path) -> (u64, usize) { let log_id = match fs::metadata(path).await { Ok(metadata) => history_log_id(&metadata).unwrap_or(0), @@ -1602,7 +1669,7 @@ fn credits_snapshot_from_api( } async fn resolve_server_request( - client: &InProcessAppServerClient, + client: &AppServerClient, request_id: RequestId, value: T, method: &str, @@ -1657,3 +1724,53 @@ fn legacy_op_name(op: &Op) -> &'static str { _ => "unknown", } } + +#[cfg(test)] +mod tests { + use super::RemoteLegacyOpBehavior; + use super::history_metadata_for_file; + use super::remote_legacy_op_behavior; + use codex_protocol::protocol::Op; + use pretty_assertions::assert_eq; + use tempfile::NamedTempFile; + + #[test] + fn remote_mode_ignores_startup_only_legacy_ops() { + assert!(matches!( + remote_legacy_op_behavior(&Op::ListCustomPrompts), + RemoteLegacyOpBehavior::Ignore + )); + assert!(matches!( + remote_legacy_op_behavior(&Op::AddToHistory { + text: "hello".to_string(), + }), + RemoteLegacyOpBehavior::Ignore + )); + } + + #[test] + fn remote_mode_returns_targeted_message_for_unsupported_legacy_ops() { + let behavior = remote_legacy_op_behavior(&Op::RunUserShellCommand { + command: "pwd".to_string(), + }); + let RemoteLegacyOpBehavior::UserFacingError(message) = behavior else { + panic!("expected targeted remote-mode error"); + }; + assert_eq!( + message, + "User shell commands are not available when connected to a remote app server yet. Reconnect without `--remote` to use `!cmd`." + ); + } + + #[tokio::test] + async fn history_metadata_counts_lines_for_existing_file() { + let file = NamedTempFile::new().expect("tempfile"); + tokio::fs::write(file.path(), b"{\"text\":\"one\"}\n{\"text\":\"two\"}\n") + .await + .expect("history write"); + + let (_, count) = history_metadata_for_file(file.path()).await; + + assert_eq!(count, 2); + } +} diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 808f2d6f60..be3b86be66 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -7,9 +7,11 @@ use additional_dirs::add_dir_warning_message; use app::App; pub use app::AppExitInfo; pub use app::ExitReason; +use codex_app_server_client::AppServerClient; use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY; use codex_app_server_client::InProcessAppServerClient; use codex_app_server_client::InProcessClientStartArgs; +use codex_app_server_client::RemoteAppServerConnectArgs; use codex_app_server_client::shared_cloud_requirements_loader; use codex_app_server_protocol::ConfigWarningNotification; use codex_core::INTERACTIVE_SESSION_SOURCES; @@ -297,10 +299,25 @@ where Ok(client) } +async fn connect_remote_app_server(websocket_url: &str) -> color_eyre::Result { + AppServerClient::connect_remote(RemoteAppServerConnectArgs { + websocket_url: websocket_url.to_string(), + client_name: "codex-tui".to_string(), + client_version: env!("CARGO_PKG_VERSION").to_string(), + experimental_api: true, + opt_out_notification_methods: Vec::new(), + channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }) + .await + .map_err(color_eyre::Report::from) + .wrap_err("failed to connect to remote app server") +} + pub async fn run_main( mut cli: Cli, arg0_paths: Arg0DispatchPaths, loader_overrides: LoaderOverrides, + remote_url: Option, ) -> std::io::Result { let (sandbox_mode, approval_policy) = if cli.full_auto { ( @@ -591,6 +608,7 @@ pub async fn run_main( cli, arg0_paths, loader_overrides, + remote_url, config, overrides, cli_kv_overrides, @@ -606,6 +624,7 @@ async fn run_ratatui_app( cli: Cli, arg0_paths: Arg0DispatchPaths, loader_overrides: LoaderOverrides, + remote_url: Option, initial_config: Config, overrides: ConfigOverrides, cli_kv_overrides: Vec<(String, toml::Value)>, @@ -656,8 +675,16 @@ async fn run_ratatui_app( session_log::maybe_init(&initial_config); let mut onboarding_app_server = if initial_config.model_provider.requires_openai_auth { - Some( - match start_embedded_app_server( + Some(match remote_url.as_deref() { + Some(websocket_url) => match connect_remote_app_server(websocket_url).await { + Ok(app_server) => app_server, + Err(err) => { + restore(); + session_log::log_session_end(); + return Err(err); + } + }, + None => match start_embedded_app_server( arg0_paths.clone(), initial_config.clone(), cli_kv_overrides.clone(), @@ -666,14 +693,14 @@ async fn run_ratatui_app( ) .await { - Ok(app_server) => app_server, + Ok(app_server) => AppServerClient::in_process(app_server), Err(err) => { restore(); session_log::log_session_end(); return Err(err); } }, - ) + }) } else { None }; @@ -694,6 +721,7 @@ async fn run_ratatui_app( show_login_screen, show_trust_screen: should_show_trust_screen_flag, login_status, + allow_device_code_login: remote_url.is_none(), config: initial_config.clone(), }, &mut tui, @@ -773,7 +801,104 @@ async fn run_ratatui_app( }; let use_fork = cli.fork_picker || cli.fork_last || cli.fork_session_id.is_some(); - let session_selection = if use_fork { + let mut remote_session_app_server = if let Some(websocket_url) = remote_url.as_deref() { + Some(match connect_remote_app_server(websocket_url).await { + Ok(app_server) => app_server, + Err(err) => { + restore(); + session_log::log_session_end(); + return Err(err); + } + }) + } else { + None + }; + let session_selection = if let Some(app_server) = remote_session_app_server.as_ref() { + if use_fork { + if let Some(id_str) = cli.fork_session_id.as_deref() { + match resume_picker::find_remote_session_target(app_server, &config, id_str).await? + { + Some(target_session) => resume_picker::SessionSelection::Fork(target_session), + None => return missing_session_exit(id_str, "fork"), + } + } else if cli.fork_last { + match resume_picker::latest_remote_session_target( + app_server, + &config, + cli.fork_show_all, + ) + .await? + { + Some(target_session) => resume_picker::SessionSelection::Fork(target_session), + None => resume_picker::SessionSelection::StartFresh, + } + } else if cli.fork_picker { + match resume_picker::run_remote_fork_picker( + &mut tui, + app_server, + &config, + cli.fork_show_all, + ) + .await? + { + resume_picker::SessionSelection::Exit => { + restore(); + session_log::log_session_end(); + return Ok(AppExitInfo { + token_usage: codex_protocol::protocol::TokenUsage::default(), + thread_id: None, + thread_name: None, + update_action: None, + exit_reason: ExitReason::UserRequested, + }); + } + other => other, + } + } else { + resume_picker::SessionSelection::StartFresh + } + } else if let Some(id_str) = cli.resume_session_id.as_deref() { + match resume_picker::find_remote_session_target(app_server, &config, id_str).await? { + Some(target_session) => resume_picker::SessionSelection::Resume(target_session), + None => return missing_session_exit(id_str, "resume"), + } + } else if cli.resume_last { + match resume_picker::latest_remote_session_target( + app_server, + &config, + cli.resume_show_all, + ) + .await? + { + Some(target_session) => resume_picker::SessionSelection::Resume(target_session), + None => resume_picker::SessionSelection::StartFresh, + } + } else if cli.resume_picker { + match resume_picker::run_remote_resume_picker( + &mut tui, + app_server, + &config, + cli.resume_show_all, + ) + .await? + { + resume_picker::SessionSelection::Exit => { + restore(); + session_log::log_session_end(); + return Ok(AppExitInfo { + token_usage: codex_protocol::protocol::TokenUsage::default(), + thread_id: None, + thread_name: None, + update_action: None, + exit_reason: ExitReason::UserRequested, + }); + } + other => other, + } + } else { + resume_picker::SessionSelection::StartFresh + } + } else if use_fork { if let Some(id_str) = cli.fork_session_id.as_deref() { let is_uuid = Uuid::parse_str(id_str).is_ok(); let path = if is_uuid { @@ -791,8 +916,10 @@ async fn run_ratatui_app( None => return missing_session_exit(id_str, "fork"), }; resume_picker::SessionSelection::Fork(resume_picker::SessionTarget { - path, + path: path.clone(), + rollout_path: Some(path.clone()), thread_id, + cwd: None, }) } None => return missing_session_exit(id_str, "fork"), @@ -817,7 +944,9 @@ async fn run_ratatui_app( Some(thread_id) => resume_picker::SessionSelection::Fork( resume_picker::SessionTarget { path: item.path.clone(), + rollout_path: Some(item.path.clone()), thread_id, + cwd: item.cwd.clone(), }, ), None => { @@ -881,8 +1010,10 @@ async fn run_ratatui_app( None => return missing_session_exit(id_str, "resume"), }; resume_picker::SessionSelection::Resume(resume_picker::SessionTarget { - path, + path: path.clone(), + rollout_path: Some(path.clone()), thread_id, + cwd: None, }) } None => return missing_session_exit(id_str, "resume"), @@ -909,8 +1040,10 @@ async fn run_ratatui_app( Ok(Some(path)) => match resolve_session_thread_id(path.as_path(), None).await { Some(thread_id) => { resume_picker::SessionSelection::Resume(resume_picker::SessionTarget { - path, + path: path.clone(), + rollout_path: Some(path.clone()), thread_id, + cwd: None, }) } None => { @@ -969,7 +1102,8 @@ async fn run_ratatui_app( &config, ¤t_cwd, target_session.thread_id, - &target_session.path, + target_session.cwd.as_deref(), + target_session.rollout_path.as_deref(), action, allow_prompt, ) @@ -1031,21 +1165,24 @@ async fn run_ratatui_app( let use_alt_screen = determine_alt_screen_mode(no_alt_screen, config.tui_alternate_screen); tui.set_alt_screen_enabled(use_alt_screen); - let app_server = match start_embedded_app_server( - arg0_paths, - config.clone(), - cli_kv_overrides.clone(), - loader_overrides, - feedback.clone(), - ) - .await - { - Ok(app_server) => app_server, - Err(err) => { - restore(); - session_log::log_session_end(); - return Err(err); - } + let app_server = match remote_session_app_server.take() { + Some(app_server) => app_server, + None => match start_embedded_app_server( + arg0_paths, + config.clone(), + cli_kv_overrides.clone(), + loader_overrides, + feedback.clone(), + ) + .await + { + Ok(app_server) => AppServerClient::in_process(app_server), + Err(err) => { + restore(); + session_log::log_session_end(); + return Err(err); + } + }, }; let app_result = App::run( @@ -1149,16 +1286,28 @@ pub(crate) enum ResolveCwdOutcome { Exit, } +#[expect( + clippy::too_many_arguments, + reason = "resume/fork cwd resolution needs explicit inputs" +)] pub(crate) async fn resolve_cwd_for_resume_or_fork( tui: &mut Tui, config: &Config, current_cwd: &Path, thread_id: ThreadId, - path: &Path, + session_cwd_hint: Option<&Path>, + path: Option<&Path>, action: CwdPromptAction, allow_prompt: bool, ) -> color_eyre::Result { - let Some(history_cwd) = read_session_cwd(config, thread_id, path).await else { + let history_cwd = if let Some(session_cwd_hint) = session_cwd_hint { + Some(session_cwd_hint.to_path_buf()) + } else if let Some(path) = path { + read_session_cwd(config, thread_id, path).await + } else { + None + }; + let Some(history_cwd) = history_cwd else { return Ok(ResolveCwdOutcome::Continue(None)); }; if allow_prompt && cwds_differ(current_cwd, &history_cwd) { diff --git a/codex-rs/tui/src/main.rs b/codex-rs/tui/src/main.rs index 3fe279df3f..e21590c9ca 100644 --- a/codex-rs/tui/src/main.rs +++ b/codex-rs/tui/src/main.rs @@ -10,10 +10,19 @@ struct TopCli { #[clap(flatten)] config_overrides: CliConfigOverrides, + #[clap(flatten)] + remote: InteractiveRemoteOptions, + #[clap(flatten)] inner: Cli, } +#[derive(Parser, Debug, Default)] +struct InteractiveRemoteOptions { + #[arg(long = "remote", value_name = "ADDR")] + remote: Option, +} + fn main() -> anyhow::Result<()> { arg0_dispatch_or_else(|arg0_paths: Arg0DispatchPaths| async move { let top_cli = TopCli::parse(); @@ -26,6 +35,7 @@ fn main() -> anyhow::Result<()> { inner, arg0_paths, codex_core::config_loader::LoaderOverrides::default(), + top_cli.remote.remote, ) .await?; let token_usage = exit_info.token_usage; diff --git a/codex-rs/tui/src/onboarding/account_login.rs b/codex-rs/tui/src/onboarding/account_login.rs index 16831ba297..aa941037d1 100644 --- a/codex-rs/tui/src/onboarding/account_login.rs +++ b/codex-rs/tui/src/onboarding/account_login.rs @@ -1,4 +1,4 @@ -use codex_app_server_client::InProcessAppServerClient; +use codex_app_server_client::AppServerClient; use codex_app_server_protocol::Account; use codex_app_server_protocol::CancelLoginAccountParams; use codex_app_server_protocol::CancelLoginAccountResponse; @@ -30,7 +30,7 @@ pub(crate) struct OnboardingAccountApi { impl OnboardingAccountApi { pub(crate) async fn read_account( &mut self, - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, ) -> Result { send_request_with_response( app_server, @@ -47,7 +47,7 @@ impl OnboardingAccountApi { pub(crate) async fn start_api_key_login( &mut self, - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, api_key: String, ) -> Result { send_request_with_response( @@ -63,7 +63,7 @@ impl OnboardingAccountApi { pub(crate) async fn start_chatgpt_login( &mut self, - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, ) -> Result { send_request_with_response( app_server, @@ -78,7 +78,7 @@ impl OnboardingAccountApi { pub(crate) async fn cancel_chatgpt_login( &mut self, - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, login_id: String, ) -> Result { send_request_with_response( @@ -120,15 +120,13 @@ fn login_status_from_account_read_result( } } -pub(crate) async fn read_login_status_via_app_server( - app_server: &InProcessAppServerClient, -) -> LoginStatus { +pub(crate) async fn read_login_status_via_app_server(app_server: &AppServerClient) -> LoginStatus { let mut api = OnboardingAccountApi::default(); login_status_from_account_read_result(api.read_account(app_server).await) } async fn send_request_with_response( - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, request: ClientRequest, method: &str, ) -> Result diff --git a/codex-rs/tui/src/onboarding/auth.rs b/codex-rs/tui/src/onboarding/auth.rs index 4402095a4c..e1d492a43d 100644 --- a/codex-rs/tui/src/onboarding/auth.rs +++ b/codex-rs/tui/src/onboarding/auth.rs @@ -200,6 +200,7 @@ pub(crate) struct AuthModeWidget { pub login_status: LoginStatus, pub forced_chatgpt_workspace_id: Option, pub forced_login_method: Option, + pub allow_device_code_login: bool, pub animations_enabled: bool, } @@ -214,7 +215,7 @@ impl AuthModeWidget { fn displayed_sign_in_options(&self) -> Vec { let mut options = vec![SignInOption::ChatGpt]; - if self.is_chatgpt_login_allowed() { + if self.is_chatgpt_login_allowed() && self.allow_device_code_login { options.push(SignInOption::DeviceCode); } if self.is_api_login_allowed() { @@ -881,6 +882,7 @@ mod tests { login_status: LoginStatus::NotAuthenticated, forced_chatgpt_workspace_id: None, forced_login_method, + allow_device_code_login: true, animations_enabled: true, } } @@ -1014,6 +1016,17 @@ mod tests { assert_snapshot!("device_code_login_pending", format!("{buf:?}")); } + #[test] + fn device_code_option_hidden_when_not_allowed() { + let mut widget = auth_widget(None, SignInOption::ChatGpt); + widget.allow_device_code_login = false; + + assert_eq!( + widget.displayed_sign_in_options(), + vec![SignInOption::ChatGpt, SignInOption::ApiKey] + ); + } + #[test] fn mark_url_hyperlink_wraps_cyan_underlined_cells() { let url = "https://example.com"; diff --git a/codex-rs/tui/src/onboarding/onboarding_screen.rs b/codex-rs/tui/src/onboarding/onboarding_screen.rs index 81360ee957..85701b5014 100644 --- a/codex-rs/tui/src/onboarding/onboarding_screen.rs +++ b/codex-rs/tui/src/onboarding/onboarding_screen.rs @@ -1,5 +1,5 @@ -use codex_app_server_client::InProcessAppServerClient; -use codex_app_server_client::InProcessServerEvent; +use codex_app_server_client::AppServerClient; +use codex_app_server_client::AppServerEvent; use codex_app_server_protocol::LoginAccountResponse; use codex_app_server_protocol::ServerNotification; use codex_core::config::Config; @@ -71,6 +71,7 @@ pub(crate) struct OnboardingScreenArgs { pub show_trust_screen: bool, pub show_login_screen: bool, pub login_status: LoginStatus, + pub allow_device_code_login: bool, pub config: Config, } @@ -89,6 +90,7 @@ impl OnboardingScreen { show_trust_screen, show_login_screen, login_status, + allow_device_code_login, config, } = args; let cwd = config.cwd.clone(); @@ -119,6 +121,7 @@ impl OnboardingScreen { login_status, forced_chatgpt_workspace_id: config.forced_chatgpt_workspace_id.clone(), forced_login_method, + allow_device_code_login, animations_enabled: config.animations, })) } @@ -412,7 +415,7 @@ impl WidgetRef for Step { pub(crate) async fn run_onboarding_app( args: OnboardingScreenArgs, tui: &mut Tui, - app_server: Option<&mut InProcessAppServerClient>, + app_server: Option<&mut AppServerClient>, ) -> Result { use tokio_stream::StreamExt; @@ -458,7 +461,7 @@ pub(crate) async fn run_onboarding_app( app_server, &mut account_api, &mut onboarding_screen, - ).await; + ).await?; } else { return Err(eyre!( "onboarding app server event stream closed before onboarding completed" @@ -538,7 +541,7 @@ fn handle_tui_event( async fn handle_auth_command( command: AuthCommand, - app_server: &InProcessAppServerClient, + app_server: &AppServerClient, account_api: &mut OnboardingAccountApi, onboarding_screen: &mut OnboardingScreen, ) { @@ -587,13 +590,13 @@ async fn handle_auth_command( } async fn handle_app_server_event( - event: InProcessServerEvent, - app_server: &InProcessAppServerClient, + event: AppServerEvent, + app_server: &AppServerClient, account_api: &mut OnboardingAccountApi, onboarding_screen: &mut OnboardingScreen, -) { +) -> Result<()> { match event { - InProcessServerEvent::ServerNotification(ServerNotification::AccountUpdated(_)) => { + AppServerEvent::ServerNotification(ServerNotification::AccountUpdated(_)) => { match account_api.read_account(app_server).await { Ok(response) => { if let Some(auth_widget) = onboarding_screen.auth_widget_mut() { @@ -603,16 +606,20 @@ async fn handle_app_server_event( Err(err) => tracing::warn!("failed to refresh onboarding account state: {err}"), } } - InProcessServerEvent::ServerNotification(ServerNotification::AccountLoginCompleted( - payload, - )) => { + AppServerEvent::ServerNotification(ServerNotification::AccountLoginCompleted(payload)) => { if let Some(auth_widget) = onboarding_screen.auth_widget_mut() { auth_widget.apply_login_completed(payload); } } - InProcessServerEvent::ServerNotification(_) - | InProcessServerEvent::ServerRequest(_) - | InProcessServerEvent::Lagged { .. } => {} - _ => {} + AppServerEvent::Disconnected { message } => { + return Err(eyre!( + "onboarding app server disconnected before onboarding completed: {message}" + )); + } + AppServerEvent::ServerNotification(_) + | AppServerEvent::ServerRequest(_) + | AppServerEvent::Lagged { .. } + | AppServerEvent::LegacyNotification(_) => {} } + Ok(()) } diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index 793e0b947d..9c11dd20ab 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -12,6 +12,16 @@ use crate::tui::Tui; use crate::tui::TuiEvent; use chrono::DateTime; use chrono::Utc; +use codex_app_server_client::AppServerClient; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::Thread; +use codex_app_server_protocol::ThreadListParams; +use codex_app_server_protocol::ThreadListResponse; +use codex_app_server_protocol::ThreadReadParams; +use codex_app_server_protocol::ThreadReadResponse; +use codex_app_server_protocol::ThreadSortKey as ApiThreadSortKey; +use codex_app_server_protocol::ThreadSourceKind; use codex_core::Cursor; use codex_core::INTERACTIVE_SESSION_SOURCES; use codex_core::RolloutRecorder; @@ -43,7 +53,9 @@ const LOAD_NEAR_THRESHOLD: usize = 5; #[derive(Debug, Clone)] pub struct SessionTarget { pub path: PathBuf, + pub rollout_path: Option, pub thread_id: ThreadId, + pub cwd: Option, } #[derive(Debug, Clone)] @@ -75,8 +87,19 @@ impl SessionPickerAction { } } - fn selection(self, path: PathBuf, thread_id: ThreadId) -> SessionSelection { - let target_session = SessionTarget { path, thread_id }; + fn selection( + self, + path: PathBuf, + rollout_path: Option, + thread_id: ThreadId, + cwd: Option, + ) -> SessionSelection { + let target_session = SessionTarget { + path, + rollout_path, + thread_id, + cwd, + }; match self { SessionPickerAction::Resume => SessionSelection::Resume(target_session), SessionPickerAction::Fork => SessionSelection::Fork(target_session), @@ -135,6 +158,31 @@ pub async fn run_fork_picker( run_session_picker(tui, config, show_all, SessionPickerAction::Fork).await } +pub async fn run_remote_resume_picker( + tui: &mut Tui, + app_server: &AppServerClient, + config: &Config, + show_all: bool, +) -> Result { + run_remote_session_picker( + tui, + app_server, + config, + show_all, + SessionPickerAction::Resume, + ) + .await +} + +pub async fn run_remote_fork_picker( + tui: &mut Tui, + app_server: &AppServerClient, + config: &Config, + show_all: bool, +) -> Result { + run_remote_session_picker(tui, app_server, config, show_all, SessionPickerAction::Fork).await +} + async fn run_session_picker( tui: &mut Tui, config: &Config, @@ -227,6 +275,62 @@ async fn run_session_picker( Ok(SessionSelection::StartFresh) } +async fn run_remote_session_picker( + tui: &mut Tui, + app_server: &AppServerClient, + config: &Config, + show_all: bool, + action: SessionPickerAction, +) -> Result { + let alt = AltScreenGuard::enter(tui); + let filter_cwd = if show_all { + None + } else { + Some(config.cwd.to_path_buf()) + }; + let threads = fetch_remote_threads(app_server, config, show_all, None).await?; + + let no_op_loader: PageLoader = Arc::new(|_request: PageLoadRequest| {}); + let mut state = PickerState::new( + config.codex_home.clone(), + alt.tui.frame_requester(), + no_op_loader, + config.model_provider_id.to_string(), + show_all, + filter_cwd, + action, + ); + state.all_rows = threads.into_iter().map(remote_thread_to_row).collect(); + state.apply_filter(); + state.update_thread_names().await; + state.request_frame(); + + let mut tui_events = alt.tui.event_stream().fuse(); + loop { + match tui_events.next().await { + Some(TuiEvent::Key(key)) => { + if matches!(key.kind, KeyEventKind::Release) { + continue; + } + if let Some(selection) = state.handle_key(key).await? { + return Ok(selection); + } + } + Some(TuiEvent::Draw) => { + if let Ok(size) = alt.tui.terminal.size() { + let list_height = size.height.saturating_sub(4) as usize; + state.update_view_rows(list_height); + } + draw_picker(alt.tui, &state)?; + } + Some(_) => {} + None => break, + } + } + + Ok(SessionSelection::StartFresh) +} + /// Returns the human-readable column header for the given sort key. fn sort_key_label(sort_key: ThreadSortKey) -> &'static str { match sort_key { @@ -329,6 +433,7 @@ impl SearchState { #[derive(Clone)] struct Row { path: PathBuf, + rollout_path: Option, preview: String, thread_id: Option, thread_name: Option, @@ -419,7 +524,12 @@ impl PickerState { None => crate::resolve_session_thread_id(path.as_path(), None).await, }; if let Some(thread_id) = thread_id { - return Ok(Some(self.action.selection(path, thread_id))); + return Ok(Some(self.action.selection( + path, + row.rollout_path.clone(), + thread_id, + row.cwd.clone(), + ))); } self.inline_error = Some(format!( "Failed to read session metadata from {}", @@ -837,6 +947,7 @@ fn head_to_row(item: &ThreadItem) -> Row { Row { path: item.path.clone(), + rollout_path: Some(item.path.clone()), preview, thread_id: item.thread_id, thread_name: None, @@ -863,6 +974,112 @@ fn parse_timestamp_str(ts: &str) -> Option> { .ok() } +fn remote_thread_to_row(thread: Thread) -> Row { + let thread_id = ThreadId::from_string(&thread.id).ok(); + let rollout_path = thread.path.clone(); + let path = rollout_path + .clone() + .unwrap_or_else(|| PathBuf::from(format!(".codex-remote-threads/{}", thread.id))); + Row { + path, + rollout_path, + preview: if thread.preview.trim().is_empty() { + "(no message yet)".to_string() + } else { + thread.preview + }, + thread_id, + thread_name: thread.name, + created_at: DateTime::::from_timestamp(thread.created_at, 0), + updated_at: DateTime::::from_timestamp(thread.updated_at, 0), + cwd: Some(thread.cwd), + git_branch: thread.git_info.and_then(|info| info.branch), + } +} + +async fn fetch_remote_threads( + app_server: &AppServerClient, + config: &Config, + show_all: bool, + search_term: Option, +) -> Result> { + let response: ThreadListResponse = app_server + .request_typed(ClientRequest::ThreadList { + request_id: RequestId::Integer(0), + params: ThreadListParams { + cursor: None, + limit: Some(100), + sort_key: Some(ApiThreadSortKey::UpdatedAt), + model_providers: Some(vec![config.model_provider_id.to_string()]), + source_kinds: Some(vec![ThreadSourceKind::Cli]), + archived: Some(false), + cwd: (!show_all).then(|| config.cwd.to_string_lossy().to_string()), + search_term, + }, + }) + .await + .map_err(color_eyre::Report::from)?; + Ok(response.data) +} + +pub async fn find_remote_session_target( + app_server: &AppServerClient, + config: &Config, + id_or_name: &str, +) -> Result> { + if let Ok(thread_id) = ThreadId::from_string(id_or_name) { + let response: ThreadReadResponse = app_server + .request_typed(ClientRequest::ThreadRead { + request_id: RequestId::Integer(0), + params: ThreadReadParams { + thread_id: thread_id.to_string(), + include_turns: false, + }, + }) + .await + .map_err(color_eyre::Report::from)?; + let row = remote_thread_to_row(response.thread); + return Ok(Some(SessionTarget { + path: row.path, + rollout_path: row.rollout_path, + thread_id, + cwd: row.cwd, + })); + } + + let threads = + fetch_remote_threads(app_server, config, true, Some(id_or_name.to_string())).await?; + Ok(threads.into_iter().find_map(|thread| { + if thread.name.as_deref() != Some(id_or_name) { + return None; + } + let row = remote_thread_to_row(thread); + Some(SessionTarget { + path: row.path, + rollout_path: row.rollout_path, + thread_id: row.thread_id?, + cwd: row.cwd, + }) + })) +} + +pub async fn latest_remote_session_target( + app_server: &AppServerClient, + config: &Config, + show_all: bool, +) -> Result> { + let threads = fetch_remote_threads(app_server, config, show_all, None).await?; + Ok(threads.into_iter().next().and_then(|thread| { + let row = remote_thread_to_row(thread); + Some(SessionTarget { + path: row.path, + rollout_path: row.rollout_path, + thread_id: row.thread_id?, + cwd: row.cwd, + }) + })) +} + fn draw_picker(tui: &mut Tui, state: &PickerState) -> std::io::Result<()> { // Render full-screen overlay let height = tui.terminal.size()?.height; @@ -1543,6 +1760,7 @@ mod tests { fn row_display_preview_prefers_thread_name() { let row = Row { path: PathBuf::from("/tmp/a.jsonl"), + rollout_path: Some(PathBuf::from("/tmp/a.jsonl")), preview: String::from("first message"), thread_id: None, thread_name: Some(String::from("My session")), @@ -1577,6 +1795,7 @@ mod tests { let rows = vec![ Row { path: PathBuf::from("/tmp/a.jsonl"), + rollout_path: Some(PathBuf::from("/tmp/a.jsonl")), preview: String::from("Fix resume picker timestamps"), thread_id: None, thread_name: None, @@ -1587,6 +1806,7 @@ mod tests { }, Row { path: PathBuf::from("/tmp/b.jsonl"), + rollout_path: Some(PathBuf::from("/tmp/b.jsonl")), preview: String::from("Investigate lazy pagination cap"), thread_id: None, thread_name: None, @@ -1597,6 +1817,7 @@ mod tests { }, Row { path: PathBuf::from("/tmp/c.jsonl"), + rollout_path: Some(PathBuf::from("/tmp/c.jsonl")), preview: String::from("Explain the codebase"), thread_id: None, thread_name: None, @@ -1891,6 +2112,7 @@ mod tests { let rows = vec![ Row { path: PathBuf::from("/tmp/a.jsonl"), + rollout_path: Some(PathBuf::from("/tmp/a.jsonl")), preview: String::from("First message preview"), thread_id: Some(id1), thread_name: None, @@ -1901,6 +2123,7 @@ mod tests { }, Row { path: PathBuf::from("/tmp/b.jsonl"), + rollout_path: Some(PathBuf::from("/tmp/b.jsonl")), preview: String::from("Second message preview"), thread_id: Some(id2), thread_name: None, @@ -2181,6 +2404,7 @@ mod tests { let row = Row { path: PathBuf::from("/tmp/missing.jsonl"), + rollout_path: Some(PathBuf::from("/tmp/missing.jsonl")), preview: String::from("missing metadata"), thread_id: None, thread_name: None,