mirror of
https://github.com/openai/codex.git
synced 2026-05-05 05:42:33 +03:00
Added remote capability
This commit is contained in:
@@ -18,11 +18,14 @@ codex-arg0 = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-feedback = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync", "time", "rt"] }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
toml = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
url = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
//! bridging async `mpsc` channels on both sides. Queues are bounded so overload
|
||||
//! surfaces as channel-full errors rather than unbounded memory growth.
|
||||
|
||||
mod remote;
|
||||
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::io::Error as IoError;
|
||||
@@ -35,8 +37,11 @@ use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result as JsonRpcResult;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::ThreadManager;
|
||||
@@ -53,6 +58,9 @@ use tokio::time::timeout;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::warn;
|
||||
|
||||
pub use crate::remote::RemoteAppServerClient;
|
||||
pub use crate::remote::RemoteAppServerConnectArgs;
|
||||
|
||||
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Raw app-server request result for typed in-process requests.
|
||||
@@ -62,6 +70,30 @@ const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
/// `MessageProcessor` continues to produce that shape internally.
|
||||
pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AppServerEvent {
|
||||
Lagged { skipped: usize },
|
||||
ServerNotification(ServerNotification),
|
||||
LegacyNotification(JSONRPCNotification),
|
||||
ServerRequest(ServerRequest),
|
||||
Disconnected { message: String },
|
||||
}
|
||||
|
||||
impl From<InProcessServerEvent> for AppServerEvent {
|
||||
fn from(value: InProcessServerEvent) -> Self {
|
||||
match value {
|
||||
InProcessServerEvent::Lagged { skipped } => Self::Lagged { skipped },
|
||||
InProcessServerEvent::ServerNotification(notification) => {
|
||||
Self::ServerNotification(notification)
|
||||
}
|
||||
InProcessServerEvent::LegacyNotification(notification) => {
|
||||
Self::LegacyNotification(notification)
|
||||
}
|
||||
InProcessServerEvent::ServerRequest(request) => Self::ServerRequest(request),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn local_external_chatgpt_tokens(
|
||||
config: &Config,
|
||||
) -> Result<ChatgptAuthTokensRefreshResponse, String> {
|
||||
@@ -709,6 +741,113 @@ impl InProcessAppServerClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub enum AppServerClient {
|
||||
InProcess(InProcessAppServerClient),
|
||||
Remote(RemoteAppServerClient),
|
||||
}
|
||||
|
||||
impl AppServerClient {
|
||||
pub fn in_process(client: InProcessAppServerClient) -> Self {
|
||||
Self::InProcess(client)
|
||||
}
|
||||
|
||||
pub async fn connect_remote(args: RemoteAppServerConnectArgs) -> IoResult<Self> {
|
||||
RemoteAppServerClient::connect(args).await.map(Self::Remote)
|
||||
}
|
||||
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
||||
match self {
|
||||
Self::InProcess(client) => client.request(request).await,
|
||||
Self::Remote(client) => client.request(request).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
||||
where
|
||||
T: DeserializeOwned,
|
||||
{
|
||||
match self {
|
||||
Self::InProcess(client) => client.request_typed(request).await,
|
||||
Self::Remote(client) => client.request_typed(request).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
||||
match self {
|
||||
Self::InProcess(client) => client.notify(notification).await,
|
||||
Self::Remote(client) => client.notify(notification).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn resolve_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
result: JsonRpcResult,
|
||||
) -> IoResult<()> {
|
||||
match self {
|
||||
Self::InProcess(client) => client.resolve_server_request(request_id, result).await,
|
||||
Self::Remote(client) => client.resolve_server_request(request_id, result).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reject_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
) -> IoResult<()> {
|
||||
match self {
|
||||
Self::InProcess(client) => client.reject_server_request(request_id, error).await,
|
||||
Self::Remote(client) => client.reject_server_request(request_id, error).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn next_event(&mut self) -> Option<AppServerEvent> {
|
||||
match self {
|
||||
Self::InProcess(client) => client.next_event().await.map(Into::into),
|
||||
Self::Remote(client) => client.next_event().await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn next_typed_event(&mut self) -> Option<AppServerEvent> {
|
||||
loop {
|
||||
match self.next_event().await {
|
||||
Some(AppServerEvent::LegacyNotification(notification)) => {
|
||||
warn!(
|
||||
notification.method = %notification.method,
|
||||
"dropping legacy app-server notification"
|
||||
);
|
||||
}
|
||||
Some(event) => return Some(event),
|
||||
None => return None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn submit_legacy_thread_op(
|
||||
&self,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
op: codex_protocol::protocol::Op,
|
||||
) -> IoResult<()> {
|
||||
match self {
|
||||
Self::InProcess(client) => client.submit_legacy_thread_op(thread_id, op).await,
|
||||
Self::Remote(_) => Err(IoError::other(
|
||||
"legacy TUI operation is not supported over remote app-server transport",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn shutdown(self) -> IoResult<()> {
|
||||
match self {
|
||||
Self::InProcess(client) => client.shutdown().await,
|
||||
Self::Remote(client) => client.shutdown().await,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_remote(&self) -> bool {
|
||||
matches!(self, Self::Remote(_))
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the JSON-RPC method name for diagnostics without extending the
|
||||
/// protocol crate with in-process-only helpers.
|
||||
fn request_method_name(request: &ClientRequest) -> String {
|
||||
|
||||
807
codex-rs/app-server-client/src/remote.rs
Normal file
807
codex-rs/app-server-client/src/remote.rs
Normal file
@@ -0,0 +1,807 @@
|
||||
/*
|
||||
This module implements the websocket-backed app-server client transport.
|
||||
|
||||
It owns the remote connection lifecycle, including the initialize/initialized
|
||||
handshake, JSON-RPC request/response routing, server-request resolution, and
|
||||
notification streaming.
|
||||
*/
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::io::Error as IoError;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Result as IoResult;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::AppServerEvent;
|
||||
use crate::RequestResult;
|
||||
use crate::SHUTDOWN_TIMEOUT;
|
||||
use crate::TypedRequestError;
|
||||
use crate::request_method_name;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result as JsonRpcResult;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::MaybeTlsStream;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::warn;
|
||||
use url::Url;
|
||||
|
||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RemoteAppServerConnectArgs {
|
||||
pub websocket_url: String,
|
||||
pub client_name: String,
|
||||
pub client_version: String,
|
||||
pub experimental_api: bool,
|
||||
pub opt_out_notification_methods: Vec<String>,
|
||||
pub channel_capacity: usize,
|
||||
}
|
||||
|
||||
impl RemoteAppServerConnectArgs {
|
||||
fn initialize_params(&self) -> InitializeParams {
|
||||
let capabilities = InitializeCapabilities {
|
||||
experimental_api: self.experimental_api,
|
||||
opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.opt_out_notification_methods.clone())
|
||||
},
|
||||
};
|
||||
|
||||
InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: self.client_name.clone(),
|
||||
title: None,
|
||||
version: self.client_version.clone(),
|
||||
},
|
||||
capabilities: Some(capabilities),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum RemoteClientCommand {
|
||||
Request {
|
||||
request: Box<ClientRequest>,
|
||||
response_tx: oneshot::Sender<IoResult<RequestResult>>,
|
||||
},
|
||||
Notify {
|
||||
notification: ClientNotification,
|
||||
response_tx: oneshot::Sender<IoResult<()>>,
|
||||
},
|
||||
ResolveServerRequest {
|
||||
request_id: RequestId,
|
||||
result: JsonRpcResult,
|
||||
response_tx: oneshot::Sender<IoResult<()>>,
|
||||
},
|
||||
RejectServerRequest {
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
response_tx: oneshot::Sender<IoResult<()>>,
|
||||
},
|
||||
Shutdown {
|
||||
response_tx: oneshot::Sender<IoResult<()>>,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct RemoteAppServerClient {
|
||||
command_tx: mpsc::Sender<RemoteClientCommand>,
|
||||
event_rx: mpsc::Receiver<AppServerEvent>,
|
||||
worker_handle: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl RemoteAppServerClient {
|
||||
pub async fn connect(args: RemoteAppServerConnectArgs) -> IoResult<Self> {
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let websocket_url = args.websocket_url.clone();
|
||||
let url = Url::parse(&websocket_url).map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid websocket URL `{websocket_url}`: {err}"),
|
||||
)
|
||||
})?;
|
||||
let stream = timeout(CONNECT_TIMEOUT, connect_async(url.as_str()))
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::TimedOut,
|
||||
format!("timed out connecting to remote app server at `{websocket_url}`"),
|
||||
)
|
||||
})?
|
||||
.map(|(stream, _response)| stream)
|
||||
.map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"failed to connect to remote app server at `{websocket_url}`: {err}"
|
||||
))
|
||||
})?;
|
||||
let mut stream = stream;
|
||||
initialize_remote_connection(
|
||||
&mut stream,
|
||||
&websocket_url,
|
||||
args.initialize_params(),
|
||||
INITIALIZE_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (command_tx, mut command_rx) = mpsc::channel::<RemoteClientCommand>(channel_capacity);
|
||||
let (event_tx, event_rx) = mpsc::channel::<AppServerEvent>(channel_capacity);
|
||||
let worker_handle = tokio::spawn(async move {
|
||||
let mut pending_requests =
|
||||
HashMap::<RequestId, oneshot::Sender<IoResult<RequestResult>>>::new();
|
||||
let mut skipped_events = 0usize;
|
||||
loop {
|
||||
tokio::select! {
|
||||
command = command_rx.recv() => {
|
||||
let Some(command) = command else {
|
||||
let _ = stream.close(None).await;
|
||||
break;
|
||||
};
|
||||
match command {
|
||||
RemoteClientCommand::Request { request, response_tx } => {
|
||||
let request_id = request_id_from_client_request(&request);
|
||||
if let Some(previous) = pending_requests.insert(request_id.clone(), response_tx) {
|
||||
let _ = previous.send(Err(IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("duplicate remote app-server request id `{request_id}`"),
|
||||
)));
|
||||
}
|
||||
if let Err(err) = write_jsonrpc_message(
|
||||
&mut stream,
|
||||
JSONRPCMessage::Request(jsonrpc_request_from_client_request(*request)),
|
||||
&websocket_url,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let err_message = err.to_string();
|
||||
if let Some(response_tx) = pending_requests.remove(&request_id) {
|
||||
let _ = response_tx.send(Err(err));
|
||||
}
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
AppServerEvent::Disconnected {
|
||||
message: format!(
|
||||
"remote app server at `{websocket_url}` write failed: {err_message}"
|
||||
),
|
||||
},
|
||||
&mut stream,
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
RemoteClientCommand::Notify { notification, response_tx } => {
|
||||
let result = write_jsonrpc_message(
|
||||
&mut stream,
|
||||
JSONRPCMessage::Notification(
|
||||
jsonrpc_notification_from_client_notification(notification),
|
||||
),
|
||||
&websocket_url,
|
||||
)
|
||||
.await;
|
||||
let _ = response_tx.send(result);
|
||||
}
|
||||
RemoteClientCommand::ResolveServerRequest {
|
||||
request_id,
|
||||
result,
|
||||
response_tx,
|
||||
} => {
|
||||
let result = write_jsonrpc_message(
|
||||
&mut stream,
|
||||
JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: request_id,
|
||||
result,
|
||||
}),
|
||||
&websocket_url,
|
||||
)
|
||||
.await;
|
||||
let _ = response_tx.send(result);
|
||||
}
|
||||
RemoteClientCommand::RejectServerRequest {
|
||||
request_id,
|
||||
error,
|
||||
response_tx,
|
||||
} => {
|
||||
let result = write_jsonrpc_message(
|
||||
&mut stream,
|
||||
JSONRPCMessage::Error(JSONRPCError {
|
||||
error,
|
||||
id: request_id,
|
||||
}),
|
||||
&websocket_url,
|
||||
)
|
||||
.await;
|
||||
let _ = response_tx.send(result);
|
||||
}
|
||||
RemoteClientCommand::Shutdown { response_tx } => {
|
||||
let close_result = stream.close(None).await.map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"failed to close websocket app server `{websocket_url}`: {err}"
|
||||
))
|
||||
});
|
||||
let _ = response_tx.send(close_result);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
message = stream.next() => {
|
||||
match message {
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
match serde_json::from_str::<JSONRPCMessage>(&text) {
|
||||
Ok(JSONRPCMessage::Response(response)) => {
|
||||
if let Some(response_tx) = pending_requests.remove(&response.id) {
|
||||
let _ = response_tx.send(Ok(Ok(response.result)));
|
||||
}
|
||||
}
|
||||
Ok(JSONRPCMessage::Error(error)) => {
|
||||
if let Some(response_tx) = pending_requests.remove(&error.id) {
|
||||
let _ = response_tx.send(Ok(Err(error.error)));
|
||||
}
|
||||
}
|
||||
Ok(JSONRPCMessage::Notification(notification)) => {
|
||||
let event = match ServerNotification::try_from(notification.clone()) {
|
||||
Ok(notification) => AppServerEvent::ServerNotification(notification),
|
||||
Err(_) => AppServerEvent::LegacyNotification(notification),
|
||||
};
|
||||
if let Err(err) = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
event,
|
||||
&mut stream,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(%err, "failed to deliver remote app-server event");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(JSONRPCMessage::Request(request)) => {
|
||||
let request_id = request.id.clone();
|
||||
let method = request.method.clone();
|
||||
match ServerRequest::try_from(request) {
|
||||
Ok(request) => {
|
||||
if let Err(err) = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
AppServerEvent::ServerRequest(request),
|
||||
&mut stream,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(%err, "failed to deliver remote app-server server request");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(%err, method, "rejecting unknown remote app-server request");
|
||||
if let Err(reject_err) = write_jsonrpc_message(
|
||||
&mut stream,
|
||||
JSONRPCMessage::Error(JSONRPCError {
|
||||
error: JSONRPCErrorError {
|
||||
code: -32601,
|
||||
message: format!(
|
||||
"unsupported remote app-server request `{method}`"
|
||||
),
|
||||
data: None,
|
||||
},
|
||||
id: request_id,
|
||||
}),
|
||||
&websocket_url,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let err_message = reject_err.to_string();
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
AppServerEvent::Disconnected {
|
||||
message: format!(
|
||||
"remote app server at `{websocket_url}` write failed: {err_message}"
|
||||
),
|
||||
},
|
||||
&mut stream,
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
AppServerEvent::Disconnected {
|
||||
message: format!(
|
||||
"remote app server at `{websocket_url}` sent invalid JSON-RPC: {err}"
|
||||
),
|
||||
},
|
||||
&mut stream,
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Close(frame))) => {
|
||||
let reason = frame
|
||||
.as_ref()
|
||||
.map(|frame| frame.reason.to_string())
|
||||
.filter(|reason| !reason.is_empty())
|
||||
.unwrap_or_else(|| "connection closed".to_string());
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
AppServerEvent::Disconnected {
|
||||
message: format!(
|
||||
"remote app server at `{websocket_url}` disconnected: {reason}"
|
||||
),
|
||||
},
|
||||
&mut stream,
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
Some(Ok(Message::Binary(_)))
|
||||
| Some(Ok(Message::Ping(_)))
|
||||
| Some(Ok(Message::Pong(_)))
|
||||
| Some(Ok(Message::Frame(_))) => {}
|
||||
Some(Err(err)) => {
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
AppServerEvent::Disconnected {
|
||||
message: format!(
|
||||
"remote app server at `{websocket_url}` transport failed: {err}"
|
||||
),
|
||||
},
|
||||
&mut stream,
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
AppServerEvent::Disconnected {
|
||||
message: format!(
|
||||
"remote app server at `{websocket_url}` closed the connection"
|
||||
),
|
||||
},
|
||||
&mut stream,
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let err = IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server worker channel is closed",
|
||||
);
|
||||
for (_, response_tx) in pending_requests {
|
||||
let _ = response_tx.send(Err(IoError::new(err.kind(), err.to_string())));
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
command_tx,
|
||||
event_rx,
|
||||
worker_handle,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.command_tx
|
||||
.send(RemoteClientCommand::Request {
|
||||
request: Box::new(request),
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server worker channel is closed",
|
||||
)
|
||||
})?;
|
||||
response_rx.await.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server request channel is closed",
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
||||
where
|
||||
T: DeserializeOwned,
|
||||
{
|
||||
let method = request_method_name(&request);
|
||||
let response =
|
||||
self.request(request)
|
||||
.await
|
||||
.map_err(|source| TypedRequestError::Transport {
|
||||
method: method.clone(),
|
||||
source,
|
||||
})?;
|
||||
let result = response.map_err(|source| TypedRequestError::Server {
|
||||
method: method.clone(),
|
||||
source,
|
||||
})?;
|
||||
serde_json::from_value(result)
|
||||
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
||||
}
|
||||
|
||||
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.command_tx
|
||||
.send(RemoteClientCommand::Notify {
|
||||
notification,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server worker channel is closed",
|
||||
)
|
||||
})?;
|
||||
response_rx.await.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server notify channel is closed",
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
pub async fn resolve_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
result: JsonRpcResult,
|
||||
) -> IoResult<()> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.command_tx
|
||||
.send(RemoteClientCommand::ResolveServerRequest {
|
||||
request_id,
|
||||
result,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server worker channel is closed",
|
||||
)
|
||||
})?;
|
||||
response_rx.await.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server resolve channel is closed",
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
pub async fn reject_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
) -> IoResult<()> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.command_tx
|
||||
.send(RemoteClientCommand::RejectServerRequest {
|
||||
request_id,
|
||||
error,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server worker channel is closed",
|
||||
)
|
||||
})?;
|
||||
response_rx.await.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server reject channel is closed",
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
pub async fn next_event(&mut self) -> Option<AppServerEvent> {
|
||||
self.event_rx.recv().await
|
||||
}
|
||||
|
||||
pub async fn shutdown(self) -> IoResult<()> {
|
||||
let Self {
|
||||
command_tx,
|
||||
event_rx,
|
||||
worker_handle,
|
||||
} = self;
|
||||
let mut worker_handle = worker_handle;
|
||||
drop(event_rx);
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
if command_tx
|
||||
.send(RemoteClientCommand::Shutdown { response_tx })
|
||||
.await
|
||||
.is_ok()
|
||||
&& let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await
|
||||
{
|
||||
command_result.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server shutdown channel is closed",
|
||||
)
|
||||
})??;
|
||||
}
|
||||
|
||||
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await {
|
||||
worker_handle.abort();
|
||||
let _ = worker_handle.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn initialize_remote_connection(
|
||||
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||
websocket_url: &str,
|
||||
params: InitializeParams,
|
||||
initialize_timeout: Duration,
|
||||
) -> IoResult<()> {
|
||||
let initialize_request_id = RequestId::String("initialize".to_string());
|
||||
write_jsonrpc_message(
|
||||
stream,
|
||||
JSONRPCMessage::Request(jsonrpc_request_from_client_request(
|
||||
ClientRequest::Initialize {
|
||||
request_id: initialize_request_id.clone(),
|
||||
params,
|
||||
},
|
||||
)),
|
||||
websocket_url,
|
||||
)
|
||||
.await?;
|
||||
|
||||
timeout(initialize_timeout, async {
|
||||
loop {
|
||||
match stream.next().await {
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
let message = serde_json::from_str::<JSONRPCMessage>(&text).map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"remote app server at `{websocket_url}` sent invalid initialize response: {err}"
|
||||
))
|
||||
})?;
|
||||
match message {
|
||||
JSONRPCMessage::Response(response) if response.id == initialize_request_id => {
|
||||
break Ok(());
|
||||
}
|
||||
JSONRPCMessage::Error(error) if error.id == initialize_request_id => {
|
||||
break Err(IoError::other(format!(
|
||||
"remote app server at `{websocket_url}` rejected initialize: {}",
|
||||
error.error.message
|
||||
)));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Binary(_)))
|
||||
| Some(Ok(Message::Ping(_)))
|
||||
| Some(Ok(Message::Pong(_)))
|
||||
| Some(Ok(Message::Frame(_))) => {}
|
||||
Some(Ok(Message::Close(frame))) => {
|
||||
let reason = frame
|
||||
.as_ref()
|
||||
.map(|frame| frame.reason.to_string())
|
||||
.filter(|reason| !reason.is_empty())
|
||||
.unwrap_or_else(|| "connection closed during initialize".to_string());
|
||||
break Err(IoError::new(
|
||||
ErrorKind::ConnectionAborted,
|
||||
format!(
|
||||
"remote app server at `{websocket_url}` closed during initialize: {reason}"
|
||||
),
|
||||
));
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
break Err(IoError::other(format!(
|
||||
"remote app server at `{websocket_url}` transport failed during initialize: {err}"
|
||||
)));
|
||||
}
|
||||
None => {
|
||||
break Err(IoError::new(
|
||||
ErrorKind::UnexpectedEof,
|
||||
format!("remote app server at `{websocket_url}` closed during initialize"),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::TimedOut,
|
||||
format!("timed out waiting for initialize response from `{websocket_url}`"),
|
||||
)
|
||||
})??;
|
||||
|
||||
write_jsonrpc_message(
|
||||
stream,
|
||||
JSONRPCMessage::Notification(jsonrpc_notification_from_client_notification(
|
||||
ClientNotification::Initialized,
|
||||
)),
|
||||
websocket_url,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn deliver_event(
|
||||
event_tx: &mpsc::Sender<AppServerEvent>,
|
||||
skipped_events: &mut usize,
|
||||
event: AppServerEvent,
|
||||
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||
) -> IoResult<()> {
|
||||
if *skipped_events > 0 {
|
||||
if event_requires_delivery(&event) {
|
||||
if event_tx
|
||||
.send(AppServerEvent::Lagged {
|
||||
skipped: *skipped_events,
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return Err(IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server event consumer channel is closed",
|
||||
));
|
||||
}
|
||||
*skipped_events = 0;
|
||||
} else {
|
||||
match event_tx.try_send(AppServerEvent::Lagged {
|
||||
skipped: *skipped_events,
|
||||
}) {
|
||||
Ok(()) => *skipped_events = 0,
|
||||
Err(mpsc::error::TrySendError::Full(_)) => {
|
||||
*skipped_events = (*skipped_events).saturating_add(1);
|
||||
reject_if_server_request_dropped(stream, &event).await?;
|
||||
return Ok(());
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => {
|
||||
return Err(IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server event consumer channel is closed",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if event_requires_delivery(&event) {
|
||||
event_tx.send(event).await.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server event consumer channel is closed",
|
||||
)
|
||||
})?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match event_tx.try_send(event) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(mpsc::error::TrySendError::Full(event)) => {
|
||||
*skipped_events = (*skipped_events).saturating_add(1);
|
||||
reject_if_server_request_dropped(stream, &event).await
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server event consumer channel is closed",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn reject_if_server_request_dropped(
|
||||
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||
event: &AppServerEvent,
|
||||
) -> IoResult<()> {
|
||||
let AppServerEvent::ServerRequest(request) = event else {
|
||||
return Ok(());
|
||||
};
|
||||
write_jsonrpc_message(
|
||||
stream,
|
||||
JSONRPCMessage::Error(JSONRPCError {
|
||||
error: JSONRPCErrorError {
|
||||
code: -32001,
|
||||
message: "remote app-server event queue is full".to_string(),
|
||||
data: None,
|
||||
},
|
||||
id: request.id().clone(),
|
||||
}),
|
||||
"<remote-app-server>",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn event_requires_delivery(event: &AppServerEvent) -> bool {
|
||||
match event {
|
||||
AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => true,
|
||||
AppServerEvent::LegacyNotification(notification) => matches!(
|
||||
notification
|
||||
.method
|
||||
.strip_prefix("codex/event/")
|
||||
.unwrap_or(¬ification.method),
|
||||
"task_complete" | "turn_aborted" | "shutdown_complete"
|
||||
),
|
||||
AppServerEvent::Disconnected { .. } => true,
|
||||
AppServerEvent::Lagged { .. }
|
||||
| AppServerEvent::ServerNotification(_)
|
||||
| AppServerEvent::ServerRequest(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn request_id_from_client_request(request: &ClientRequest) -> RequestId {
|
||||
jsonrpc_request_from_client_request(request.clone()).id
|
||||
}
|
||||
|
||||
fn jsonrpc_request_from_client_request(request: ClientRequest) -> JSONRPCRequest {
|
||||
let value = match serde_json::to_value(request) {
|
||||
Ok(value) => value,
|
||||
Err(err) => panic!("client request should serialize: {err}"),
|
||||
};
|
||||
match serde_json::from_value(value) {
|
||||
Ok(request) => request,
|
||||
Err(err) => panic!("client request should encode as JSON-RPC request: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn jsonrpc_notification_from_client_notification(
|
||||
notification: ClientNotification,
|
||||
) -> JSONRPCNotification {
|
||||
let value = match serde_json::to_value(notification) {
|
||||
Ok(value) => value,
|
||||
Err(err) => panic!("client notification should serialize: {err}"),
|
||||
};
|
||||
match serde_json::from_value(value) {
|
||||
Ok(notification) => notification,
|
||||
Err(err) => panic!("client notification should encode as JSON-RPC notification: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_jsonrpc_message(
|
||||
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||
message: JSONRPCMessage,
|
||||
websocket_url: &str,
|
||||
) -> IoResult<()> {
|
||||
let payload = serde_json::to_string(&message).map_err(IoError::other)?;
|
||||
stream
|
||||
.send(Message::Text(payload.into()))
|
||||
.await
|
||||
.map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"failed to write websocket message to `{websocket_url}`: {err}"
|
||||
))
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user