Compare commits

...

1 Commits

Author SHA1 Message Date
starr-openai
08446a0c15 exec-server: simplify transport and dispatch shape
Co-authored-by: Codex <noreply@openai.com>
2026-03-18 23:14:02 +00:00
8 changed files with 2406 additions and 0 deletions

View File

@@ -0,0 +1,876 @@
use std::collections::HashMap;
use std::sync::Arc;
#[cfg(test)]
use std::sync::Mutex as StdMutex;
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
use codex_app_server_protocol::FsCreateDirectoryResponse;
use codex_app_server_protocol::FsGetMetadataParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryParams;
use codex_app_server_protocol::FsReadDirectoryResponse;
use codex_app_server_protocol::FsReadFileParams;
use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsRemoveParams;
use codex_app_server_protocol::FsRemoveResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use serde::Serialize;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::debug;
use tracing::warn;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::ExecServerEvent;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::FS_COPY_METHOD;
use crate::protocol::FS_CREATE_DIRECTORY_METHOD;
use crate::protocol::FS_GET_METADATA_METHOD;
use crate::protocol::FS_READ_DIRECTORY_METHOD;
use crate::protocol::FS_READ_FILE_METHOD;
use crate::protocol::FS_REMOVE_METHOD;
use crate::protocol::FS_WRITE_FILE_METHOD;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use crate::server::ExecServerHandler;
use crate::server::ExecServerServerNotification;
impl Default for ExecServerClientConnectOptions {
fn default() -> Self {
Self {
client_name: "codex-core".to_string(),
auth_token: None,
initialize_timeout: INITIALIZE_TIMEOUT,
}
}
}
impl From<RemoteExecServerConnectArgs> for ExecServerClientConnectOptions {
fn from(value: RemoteExecServerConnectArgs) -> Self {
Self {
client_name: value.client_name,
auth_token: value.auth_token,
initialize_timeout: value.initialize_timeout,
}
}
}
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
impl RemoteExecServerConnectArgs {
pub fn new(websocket_url: String, client_name: String) -> Self {
Self {
websocket_url,
client_name,
auth_token: None,
connect_timeout: CONNECT_TIMEOUT,
initialize_timeout: INITIALIZE_TIMEOUT,
}
}
}
#[cfg(test)]
#[derive(Debug, Clone, PartialEq, Eq)]
struct ExecServerOutput {
stream: crate::protocol::ExecOutputStream,
chunk: Vec<u8>,
}
#[cfg(test)]
struct ExecServerProcess {
process_id: String,
output_rx: broadcast::Receiver<ExecServerOutput>,
status: Arc<RemoteProcessStatus>,
}
#[cfg(test)]
impl ExecServerProcess {
fn output_receiver(&self) -> broadcast::Receiver<ExecServerOutput> {
self.output_rx.resubscribe()
}
fn has_exited(&self) -> bool {
self.status.has_exited()
}
}
#[cfg(test)]
struct RemoteProcessStatus {
exited: AtomicBool,
exit_code: StdMutex<Option<i32>>,
}
#[cfg(test)]
impl RemoteProcessStatus {
fn new() -> Self {
Self {
exited: AtomicBool::new(false),
exit_code: StdMutex::new(None),
}
}
fn has_exited(&self) -> bool {
self.exited.load(Ordering::SeqCst)
}
fn mark_exited(&self, exit_code: Option<i32>) {
self.exited.store(true, Ordering::SeqCst);
if let Ok(mut guard) = self.exit_code.lock() {
*guard = exit_code;
}
}
}
enum PendingRequest {
Initialize(oneshot::Sender<Result<InitializeResponse, JSONRPCErrorError>>),
Exec(oneshot::Sender<Result<ExecResponse, JSONRPCErrorError>>),
Read(oneshot::Sender<Result<ReadResponse, JSONRPCErrorError>>),
Write(oneshot::Sender<Result<WriteResponse, JSONRPCErrorError>>),
Terminate(oneshot::Sender<Result<TerminateResponse, JSONRPCErrorError>>),
FsReadFile(oneshot::Sender<Result<FsReadFileResponse, JSONRPCErrorError>>),
FsWriteFile(oneshot::Sender<Result<FsWriteFileResponse, JSONRPCErrorError>>),
FsCreateDirectory(oneshot::Sender<Result<FsCreateDirectoryResponse, JSONRPCErrorError>>),
FsGetMetadata(oneshot::Sender<Result<FsGetMetadataResponse, JSONRPCErrorError>>),
FsReadDirectory(oneshot::Sender<Result<FsReadDirectoryResponse, JSONRPCErrorError>>),
FsRemove(oneshot::Sender<Result<FsRemoveResponse, JSONRPCErrorError>>),
FsCopy(oneshot::Sender<Result<FsCopyResponse, JSONRPCErrorError>>),
}
impl PendingRequest {
fn resolve_json(self, result: Value) -> Result<(), ExecServerError> {
match self {
PendingRequest::Initialize(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::Exec(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::Read(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::Write(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::Terminate(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::FsReadFile(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::FsWriteFile(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::FsCreateDirectory(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::FsGetMetadata(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::FsReadDirectory(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::FsRemove(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
PendingRequest::FsCopy(tx) => {
let _ = tx.send(Ok(serde_json::from_value(result)?));
}
}
Ok(())
}
fn resolve_error(self, error: JSONRPCErrorError) {
match self {
PendingRequest::Initialize(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::Exec(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::Read(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::Write(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::Terminate(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::FsReadFile(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::FsWriteFile(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::FsCreateDirectory(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::FsGetMetadata(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::FsReadDirectory(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::FsRemove(tx) => {
let _ = tx.send(Err(error));
}
PendingRequest::FsCopy(tx) => {
let _ = tx.send(Err(error));
}
}
}
}
enum ClientBackend {
JsonRpc {
write_tx: mpsc::Sender<JSONRPCMessage>,
},
InProcess {
handler: Arc<Mutex<ExecServerHandler>>,
},
}
struct Inner {
backend: ClientBackend,
pending: Mutex<HashMap<RequestId, PendingRequest>>,
events_tx: broadcast::Sender<ExecServerEvent>,
next_request_id: AtomicI64,
transport_tasks: Vec<JoinHandle<()>>,
reader_task: JoinHandle<()>,
}
impl Drop for Inner {
fn drop(&mut self) {
if let ClientBackend::InProcess { handler } = &self.backend
&& let Ok(handle) = tokio::runtime::Handle::try_current()
{
let handler = Arc::clone(handler);
handle.spawn(async move {
handler.lock().await.shutdown().await;
});
}
for task in &self.transport_tasks {
task.abort();
}
self.reader_task.abort();
}
}
#[derive(Clone)]
pub struct ExecServerClient {
inner: Arc<Inner>,
}
#[derive(Debug, thiserror::Error)]
pub enum ExecServerError {
#[error("failed to spawn exec-server: {0}")]
Spawn(#[source] std::io::Error),
#[error("timed out connecting to exec-server websocket `{url}` after {timeout:?}")]
WebSocketConnectTimeout { url: String, timeout: Duration },
#[error("failed to connect to exec-server websocket `{url}`: {source}")]
WebSocketConnect {
url: String,
#[source]
source: tokio_tungstenite::tungstenite::Error,
},
#[error("timed out waiting for exec-server initialize handshake after {timeout:?}")]
InitializeTimedOut { timeout: Duration },
#[error("exec-server transport closed")]
Closed,
#[error("failed to serialize or deserialize exec-server JSON: {0}")]
Json(#[from] serde_json::Error),
#[error("exec-server protocol error: {0}")]
Protocol(String),
#[error("exec-server rejected request ({code}): {message}")]
Server { code: i64, message: String },
}
impl ExecServerClient {
pub async fn connect_in_process(
options: ExecServerClientConnectOptions,
) -> Result<Self, ExecServerError> {
let (notification_tx, mut notification_rx) =
mpsc::channel::<ExecServerServerNotification>(256);
let handler = Arc::new(Mutex::new(ExecServerHandler::new(notification_tx, None)));
let inner = Arc::new_cyclic(|weak| {
let weak = weak.clone();
let reader_task = tokio::spawn(async move {
while let Some(notification) = notification_rx.recv().await {
if let Some(inner) = weak.upgrade() {
handle_in_process_notification(&inner, notification).await;
}
}
if let Some(inner) = weak.upgrade() {
handle_transport_shutdown(&inner).await;
}
});
Inner {
backend: ClientBackend::InProcess { handler },
pending: Mutex::new(HashMap::new()),
events_tx: broadcast::channel(256).0,
next_request_id: AtomicI64::new(1),
transport_tasks: Vec::new(),
reader_task,
}
});
let client = Self { inner };
client.initialize(options).await?;
Ok(client)
}
pub async fn connect_websocket(
args: RemoteExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
let websocket_url = args.websocket_url.clone();
let connect_timeout = args.connect_timeout;
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
})?;
Self::connect(
JsonRpcConnection::from_websocket(
stream,
format!("exec-server websocket {websocket_url}"),
),
args.into(),
)
.await
}
async fn connect(
connection: JsonRpcConnection,
options: ExecServerClientConnectOptions,
) -> Result<Self, ExecServerError> {
let (write_tx, mut incoming_rx, transport_tasks) = connection.into_parts();
let inner = Arc::new_cyclic(|weak| {
let weak = weak.clone();
let reader_task = tokio::spawn(async move {
while let Some(event) = incoming_rx.recv().await {
match event {
JsonRpcConnectionEvent::Message(message) => {
if let Some(inner) = weak.upgrade()
&& let Err(err) = handle_server_message(&inner, message).await
{
warn!("exec-server client closing after protocol error: {err}");
handle_transport_shutdown(&inner).await;
return;
}
}
JsonRpcConnectionEvent::Disconnected { reason } => {
if let Some(reason) = reason {
warn!("exec-server client transport disconnected: {reason}");
}
if let Some(inner) = weak.upgrade() {
handle_transport_shutdown(&inner).await;
}
return;
}
}
}
if let Some(inner) = weak.upgrade() {
handle_transport_shutdown(&inner).await;
}
});
Inner {
backend: ClientBackend::JsonRpc { write_tx },
pending: Mutex::new(HashMap::new()),
events_tx: broadcast::channel(256).0,
next_request_id: AtomicI64::new(1),
transport_tasks,
reader_task,
}
});
let client = Self { inner };
client.initialize(options).await?;
Ok(client)
}
pub fn event_receiver(&self) -> broadcast::Receiver<ExecServerEvent> {
self.inner.events_tx.subscribe()
}
#[cfg(test)]
async fn start_process(
&self,
params: ExecParams,
) -> Result<ExecServerProcess, ExecServerError> {
let response = self.exec(params).await?;
let process_id = response.process_id;
let status = Arc::new(RemoteProcessStatus::new());
let (output_tx, output_rx) = broadcast::channel(256);
let mut events_rx = self.event_receiver();
let status_watcher = Arc::clone(&status);
let watch_process_id = process_id.clone();
tokio::spawn(async move {
while let Ok(event) = events_rx.recv().await {
match event {
ExecServerEvent::OutputDelta(notification)
if notification.process_id == watch_process_id =>
{
let _ = output_tx.send(ExecServerOutput {
stream: notification.stream,
chunk: notification.chunk.into_inner(),
});
}
ExecServerEvent::Exited(notification)
if notification.process_id == watch_process_id =>
{
status_watcher.mark_exited(Some(notification.exit_code));
break;
}
ExecServerEvent::OutputDelta(_) | ExecServerEvent::Exited(_) => {}
}
}
});
Ok(ExecServerProcess {
process_id,
output_rx,
status,
})
}
pub async fn exec(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
self.request_exec(params).await
}
pub async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
self.request_read(params).await
}
pub async fn write(
&self,
process_id: &str,
chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError> {
self.write_process(WriteParams {
process_id: process_id.to_string(),
chunk: chunk.into(),
})
.await
}
pub async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
self.terminate_session(process_id).await
}
pub async fn fs_read_file(
&self,
params: FsReadFileParams,
) -> Result<FsReadFileResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.fs_read_file(params).await);
}
self.send_pending_request(FS_READ_FILE_METHOD, &params, PendingRequest::FsReadFile)
.await
}
pub async fn fs_write_file(
&self,
params: FsWriteFileParams,
) -> Result<FsWriteFileResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.fs_write_file(params).await);
}
self.send_pending_request(FS_WRITE_FILE_METHOD, &params, PendingRequest::FsWriteFile)
.await
}
pub async fn fs_create_directory(
&self,
params: FsCreateDirectoryParams,
) -> Result<FsCreateDirectoryResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.fs_create_directory(params).await);
}
self.send_pending_request(
FS_CREATE_DIRECTORY_METHOD,
&params,
PendingRequest::FsCreateDirectory,
)
.await
}
pub async fn fs_get_metadata(
&self,
params: FsGetMetadataParams,
) -> Result<FsGetMetadataResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.fs_get_metadata(params).await);
}
self.send_pending_request(
FS_GET_METADATA_METHOD,
&params,
PendingRequest::FsGetMetadata,
)
.await
}
pub async fn fs_read_directory(
&self,
params: FsReadDirectoryParams,
) -> Result<FsReadDirectoryResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.fs_read_directory(params).await);
}
self.send_pending_request(
FS_READ_DIRECTORY_METHOD,
&params,
PendingRequest::FsReadDirectory,
)
.await
}
pub async fn fs_remove(
&self,
params: FsRemoveParams,
) -> Result<FsRemoveResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.fs_remove(params).await);
}
self.send_pending_request(FS_REMOVE_METHOD, &params, PendingRequest::FsRemove)
.await
}
pub async fn fs_copy(&self, params: FsCopyParams) -> Result<FsCopyResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.fs_copy(params).await);
}
self.send_pending_request(FS_COPY_METHOD, &params, PendingRequest::FsCopy)
.await
}
async fn initialize(
&self,
options: ExecServerClientConnectOptions,
) -> Result<(), ExecServerError> {
let ExecServerClientConnectOptions {
client_name,
auth_token,
initialize_timeout,
} = options;
timeout(initialize_timeout, async {
let _: InitializeResponse = self
.request_initialize(InitializeParams {
client_name,
auth_token,
})
.await?;
self.notify(INITIALIZED_METHOD, &serde_json::json!({}))
.await
})
.await
.map_err(|_| ExecServerError::InitializeTimedOut {
timeout: initialize_timeout,
})?
}
async fn request_exec(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.exec(params).await);
}
self.send_pending_request(EXEC_METHOD, &params, PendingRequest::Exec)
.await
}
async fn write_process(&self, params: WriteParams) -> Result<WriteResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.write(params).await);
}
self.send_pending_request(EXEC_WRITE_METHOD, &params, PendingRequest::Write)
.await
}
async fn request_read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.read(params).await);
}
self.send_pending_request(EXEC_READ_METHOD, &params, PendingRequest::Read)
.await
}
async fn terminate_session(
&self,
process_id: &str,
) -> Result<TerminateResponse, ExecServerError> {
let params = TerminateParams {
process_id: process_id.to_string(),
};
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.terminate(params).await);
}
self.send_pending_request(EXEC_TERMINATE_METHOD, &params, PendingRequest::Terminate)
.await
}
async fn notify<P: Serialize>(&self, method: &str, params: &P) -> Result<(), ExecServerError> {
match &self.inner.backend {
ClientBackend::JsonRpc { write_tx } => {
let params = serde_json::to_value(params)?;
write_tx
.send(JSONRPCMessage::Notification(JSONRPCNotification {
method: method.to_string(),
params: Some(params),
}))
.await
.map_err(|_| ExecServerError::Closed)
}
ClientBackend::InProcess { handler } => match method {
INITIALIZED_METHOD => handler
.lock()
.await
.initialized()
.map_err(ExecServerError::Protocol),
other => Err(ExecServerError::Protocol(format!(
"unsupported in-process notification method `{other}`"
))),
},
}
}
async fn request_initialize(
&self,
params: InitializeParams,
) -> Result<InitializeResponse, ExecServerError> {
if let ClientBackend::InProcess { handler } = &self.inner.backend {
return server_result_to_client(handler.lock().await.initialize(params));
}
self.send_pending_request(INITIALIZE_METHOD, &params, PendingRequest::Initialize)
.await
}
fn next_request_id(&self) -> RequestId {
RequestId::Integer(self.inner.next_request_id.fetch_add(1, Ordering::SeqCst))
}
async fn send_pending_request<P, T>(
&self,
method: &str,
params: &P,
build_pending: impl FnOnce(oneshot::Sender<Result<T, JSONRPCErrorError>>) -> PendingRequest,
) -> Result<T, ExecServerError>
where
P: Serialize,
{
let request_id = self.next_request_id();
let (response_tx, response_rx) = oneshot::channel();
self.inner
.pending
.lock()
.await
.insert(request_id.clone(), build_pending(response_tx));
let ClientBackend::JsonRpc { write_tx } = &self.inner.backend else {
unreachable!("in-process requests return before JSON-RPC setup");
};
let send_result = send_jsonrpc_request(write_tx, request_id.clone(), method, params).await;
self.finish_request(request_id, send_result, response_rx)
.await
}
async fn finish_request<T>(
&self,
request_id: RequestId,
send_result: Result<(), ExecServerError>,
response_rx: oneshot::Receiver<Result<T, JSONRPCErrorError>>,
) -> Result<T, ExecServerError> {
if let Err(err) = send_result {
self.inner.pending.lock().await.remove(&request_id);
return Err(err);
}
receive_typed_response(response_rx).await
}
}
async fn receive_typed_response<T>(
response_rx: oneshot::Receiver<Result<T, JSONRPCErrorError>>,
) -> Result<T, ExecServerError> {
let result = response_rx.await.map_err(|_| ExecServerError::Closed)?;
match result {
Ok(response) => Ok(response),
Err(error) => Err(ExecServerError::Server {
code: error.code,
message: error.message,
}),
}
}
fn server_result_to_client<T>(result: Result<T, JSONRPCErrorError>) -> Result<T, ExecServerError> {
match result {
Ok(response) => Ok(response),
Err(error) => Err(ExecServerError::Server {
code: error.code,
message: error.message,
}),
}
}
async fn send_jsonrpc_request<P: Serialize>(
write_tx: &mpsc::Sender<JSONRPCMessage>,
request_id: RequestId,
method: &str,
params: &P,
) -> Result<(), ExecServerError> {
let params = serde_json::to_value(params)?;
write_tx
.send(JSONRPCMessage::Request(JSONRPCRequest {
id: request_id,
method: method.to_string(),
params: Some(params),
trace: None,
}))
.await
.map_err(|_| ExecServerError::Closed)
}
async fn handle_in_process_notification(
inner: &Arc<Inner>,
notification: ExecServerServerNotification,
) {
match notification {
ExecServerServerNotification::OutputDelta(params) => {
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
}
ExecServerServerNotification::Exited(params) => {
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
}
}
}
async fn handle_server_message(
inner: &Arc<Inner>,
message: JSONRPCMessage,
) -> Result<(), ExecServerError> {
match message {
JSONRPCMessage::Response(JSONRPCResponse { id, result }) => {
if let Some(pending) = inner.pending.lock().await.remove(&id) {
pending.resolve_json(result)?;
}
}
JSONRPCMessage::Error(JSONRPCError { id, error }) => {
if let Some(pending) = inner.pending.lock().await.remove(&id) {
pending.resolve_error(error);
}
}
JSONRPCMessage::Notification(notification) => {
handle_server_notification(inner, notification).await?;
}
JSONRPCMessage::Request(request) => {
return Err(ExecServerError::Protocol(format!(
"unexpected exec-server request from remote server: {}",
request.method
)));
}
}
Ok(())
}
async fn handle_server_notification(
inner: &Arc<Inner>,
notification: JSONRPCNotification,
) -> Result<(), ExecServerError> {
match notification.method.as_str() {
EXEC_OUTPUT_DELTA_METHOD => {
let params: ExecOutputDeltaNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
}
EXEC_EXITED_METHOD => {
let params: ExecExitedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
}
other => {
debug!("ignoring unknown exec-server notification: {other}");
}
}
Ok(())
}
async fn handle_transport_shutdown(inner: &Arc<Inner>) {
let pending = {
let mut pending = inner.pending.lock().await;
pending
.drain()
.map(|(_, pending)| pending)
.collect::<Vec<_>>()
};
for pending in pending {
pending.resolve_error(JSONRPCErrorError {
code: -32000,
data: None,
message: "exec-server transport closed".to_string(),
});
}
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,37 @@
mod filesystem;
mod handler;
mod jsonrpc;
mod processor;
mod transport;
pub(crate) use handler::ExecServerHandler;
pub(crate) use handler::ExecServerServerNotification;
pub(crate) use jsonrpc::internal_error;
pub(crate) use jsonrpc::invalid_params;
pub(crate) use jsonrpc::invalid_request;
pub(crate) use jsonrpc::unauthorized;
pub use transport::ExecServerTransport;
pub use transport::ExecServerTransportParseError;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExecServerConfig {
pub auth_token: Option<String>,
}
pub async fn run_main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
run_main_with_transport_and_config(ExecServerTransport::default(), ExecServerConfig::default())
.await
}
pub async fn run_main_with_transport(
transport: ExecServerTransport,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
run_main_with_transport_and_config(transport, ExecServerConfig::default()).await
}
pub async fn run_main_with_transport_and_config(
transport: ExecServerTransport,
config: ExecServerConfig,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
transport::run_transport(transport, config).await
}

View File

@@ -0,0 +1,170 @@
use std::io;
use std::sync::Arc;
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
use codex_app_server_protocol::FsCreateDirectoryResponse;
use codex_app_server_protocol::FsGetMetadataParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryEntry;
use codex_app_server_protocol::FsReadDirectoryParams;
use codex_app_server_protocol::FsReadDirectoryResponse;
use codex_app_server_protocol::FsReadFileParams;
use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsRemoveParams;
use codex_app_server_protocol::FsRemoveResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_environment::CopyOptions;
use codex_environment::CreateDirectoryOptions;
use codex_environment::Environment;
use codex_environment::ExecutorFileSystem;
use codex_environment::RemoveOptions;
use crate::server::internal_error;
use crate::server::invalid_request;
#[derive(Clone)]
pub(crate) struct ExecServerFileSystem {
file_system: Arc<dyn ExecutorFileSystem>,
}
impl Default for ExecServerFileSystem {
fn default() -> Self {
Self {
file_system: Environment::default().get_filesystem(),
}
}
}
impl ExecServerFileSystem {
pub(crate) async fn read_file(
&self,
params: FsReadFileParams,
) -> Result<FsReadFileResponse, JSONRPCErrorError> {
let bytes = self
.file_system
.read_file(&params.path)
.await
.map_err(map_fs_error)?;
Ok(FsReadFileResponse {
data_base64: STANDARD.encode(bytes),
})
}
pub(crate) async fn write_file(
&self,
params: FsWriteFileParams,
) -> Result<FsWriteFileResponse, JSONRPCErrorError> {
let bytes = STANDARD.decode(params.data_base64).map_err(|err| {
invalid_request(format!(
"fs/writeFile requires valid base64 dataBase64: {err}"
))
})?;
self.file_system
.write_file(&params.path, bytes)
.await
.map_err(map_fs_error)?;
Ok(FsWriteFileResponse {})
}
pub(crate) async fn create_directory(
&self,
params: FsCreateDirectoryParams,
) -> Result<FsCreateDirectoryResponse, JSONRPCErrorError> {
self.file_system
.create_directory(
&params.path,
CreateDirectoryOptions {
recursive: params.recursive.unwrap_or(true),
},
)
.await
.map_err(map_fs_error)?;
Ok(FsCreateDirectoryResponse {})
}
pub(crate) async fn get_metadata(
&self,
params: FsGetMetadataParams,
) -> Result<FsGetMetadataResponse, JSONRPCErrorError> {
let metadata = self
.file_system
.get_metadata(&params.path)
.await
.map_err(map_fs_error)?;
Ok(FsGetMetadataResponse {
is_directory: metadata.is_directory,
is_file: metadata.is_file,
created_at_ms: metadata.created_at_ms,
modified_at_ms: metadata.modified_at_ms,
})
}
pub(crate) async fn read_directory(
&self,
params: FsReadDirectoryParams,
) -> Result<FsReadDirectoryResponse, JSONRPCErrorError> {
let entries = self
.file_system
.read_directory(&params.path)
.await
.map_err(map_fs_error)?;
Ok(FsReadDirectoryResponse {
entries: entries
.into_iter()
.map(|entry| FsReadDirectoryEntry {
file_name: entry.file_name,
is_directory: entry.is_directory,
is_file: entry.is_file,
})
.collect(),
})
}
pub(crate) async fn remove(
&self,
params: FsRemoveParams,
) -> Result<FsRemoveResponse, JSONRPCErrorError> {
self.file_system
.remove(
&params.path,
RemoveOptions {
recursive: params.recursive.unwrap_or(true),
force: params.force.unwrap_or(true),
},
)
.await
.map_err(map_fs_error)?;
Ok(FsRemoveResponse {})
}
pub(crate) async fn copy(
&self,
params: FsCopyParams,
) -> Result<FsCopyResponse, JSONRPCErrorError> {
self.file_system
.copy(
&params.source_path,
&params.destination_path,
CopyOptions {
recursive: params.recursive,
},
)
.await
.map_err(map_fs_error)?;
Ok(FsCopyResponse {})
}
}
fn map_fs_error(err: io::Error) -> JSONRPCErrorError {
if err.kind() == io::ErrorKind::InvalidInput {
invalid_request(err.to_string())
} else {
internal_error(err.to_string())
}
}

View File

@@ -0,0 +1,496 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
use codex_app_server_protocol::FsCreateDirectoryResponse;
use codex_app_server_protocol::FsGetMetadataParams;
use codex_app_server_protocol::FsGetMetadataResponse;
use codex_app_server_protocol::FsReadDirectoryParams;
use codex_app_server_protocol::FsReadDirectoryResponse;
use codex_app_server_protocol::FsReadFileParams;
use codex_app_server_protocol::FsReadFileResponse;
use codex_app_server_protocol::FsRemoveParams;
use codex_app_server_protocol::FsRemoveResponse;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::TerminalSize;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tracing::warn;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecOutputStream;
use crate::protocol::ExecResponse;
use crate::protocol::ExecSandboxMode;
use crate::protocol::InitializeResponse;
use crate::protocol::PROTOCOL_VERSION;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteResponse;
use crate::server::filesystem::ExecServerFileSystem;
use crate::server::internal_error;
use crate::server::invalid_params;
use crate::server::invalid_request;
use crate::server::unauthorized;
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ExecServerServerNotification {
OutputDelta(ExecOutputDeltaNotification),
Exited(ExecExitedNotification),
}
#[derive(Clone)]
struct RetainedOutputChunk {
seq: u64,
stream: ExecOutputStream,
chunk: Vec<u8>,
}
struct RunningProcess {
session: ExecCommandSession,
tty: bool,
output: VecDeque<RetainedOutputChunk>,
retained_bytes: usize,
next_seq: u64,
exit_code: Option<i32>,
output_notify: Arc<Notify>,
}
pub(crate) struct ExecServerHandler {
notification_tx: mpsc::Sender<ExecServerServerNotification>,
file_system: ExecServerFileSystem,
required_auth_token: Option<String>,
// Keyed by client-chosen logical `processId` scoped to this connection.
// This is a protocol handle, not an OS pid.
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
initialize_requested: bool,
initialized: bool,
}
impl ExecServerHandler {
pub(crate) fn new(
notification_tx: mpsc::Sender<ExecServerServerNotification>,
required_auth_token: Option<String>,
) -> Self {
Self {
notification_tx,
file_system: ExecServerFileSystem::default(),
required_auth_token,
processes: Arc::new(Mutex::new(HashMap::new())),
initialize_requested: false,
initialized: false,
}
}
pub(crate) async fn shutdown(&self) {
let remaining = {
let mut processes = self.processes.lock().await;
processes
.drain()
.map(|(_, process)| process)
.collect::<Vec<_>>()
};
for process in remaining {
process.session.terminate();
}
}
pub(crate) fn initialized(&mut self) -> Result<(), String> {
if !self.initialize_requested {
return Err("received `initialized` notification before `initialize`".into());
}
self.initialized = true;
Ok(())
}
pub(crate) fn initialize(
&mut self,
params: crate::protocol::InitializeParams,
) -> Result<InitializeResponse, codex_app_server_protocol::JSONRPCErrorError> {
if self.initialize_requested {
return Err(invalid_request(
"initialize may only be sent once per connection".to_string(),
));
}
if let Some(required_auth_token) = &self.required_auth_token
&& params.auth_token.as_deref() != Some(required_auth_token.as_str())
{
return Err(unauthorized("invalid exec-server auth token".to_string()));
}
self.initialize_requested = true;
Ok(InitializeResponse {
protocol_version: PROTOCOL_VERSION.to_string(),
})
}
fn require_initialized(&self) -> Result<(), codex_app_server_protocol::JSONRPCErrorError> {
if !self.initialize_requested {
return Err(invalid_request(
"client must call initialize before using exec methods".to_string(),
));
}
if !self.initialized {
return Err(invalid_request(
"client must send initialized before using exec methods".to_string(),
));
}
Ok(())
}
pub(crate) async fn exec(
&self,
params: crate::protocol::ExecParams,
) -> Result<ExecResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
let process_id = params.process_id.clone();
// Same-connection requests are serialized by the RPC processor, and the
// in-process client holds the handler mutex across this full call. That
// makes this pre-spawn duplicate check safe for the current entrypoints.
{
let process_map = self.processes.lock().await;
if process_map.contains_key(&process_id) {
return Err(invalid_request(format!(
"process {process_id} already exists"
)));
}
}
if matches!(
params.sandbox.as_ref().map(|sandbox| sandbox.mode),
Some(ExecSandboxMode::HostDefault)
) {
return Err(invalid_request(
"sandbox mode `hostDefault` is not supported by exec-server yet".to_string(),
));
}
let (program, args) = params
.argv
.split_first()
.ok_or_else(|| invalid_params("argv must not be empty".to_string()))?;
let spawned = if params.tty {
codex_utils_pty::spawn_pty_process(
program,
args,
params.cwd.as_path(),
&params.env,
&params.arg0,
TerminalSize::default(),
)
.await
} else {
codex_utils_pty::spawn_pipe_process_no_stdin(
program,
args,
params.cwd.as_path(),
&params.env,
&params.arg0,
)
.await
}
.map_err(|err| internal_error(err.to_string()))?;
let output_notify = Arc::new(Notify::new());
{
let mut process_map = self.processes.lock().await;
process_map.insert(
process_id.clone(),
RunningProcess {
session: spawned.session,
tty: params.tty,
output: std::collections::VecDeque::new(),
retained_bytes: 0,
next_seq: 1,
exit_code: None,
output_notify: Arc::clone(&output_notify),
},
);
}
tokio::spawn(stream_output(
process_id.clone(),
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stdout
},
spawned.stdout_rx,
self.notification_tx.clone(),
Arc::clone(&self.processes),
Arc::clone(&output_notify),
));
tokio::spawn(stream_output(
process_id.clone(),
if params.tty {
ExecOutputStream::Pty
} else {
ExecOutputStream::Stderr
},
spawned.stderr_rx,
self.notification_tx.clone(),
Arc::clone(&self.processes),
Arc::clone(&output_notify),
));
tokio::spawn(watch_exit(
process_id.clone(),
spawned.exit_rx,
self.notification_tx.clone(),
Arc::clone(&self.processes),
output_notify,
));
Ok(ExecResponse { process_id })
}
pub(crate) async fn fs_read_file(
&self,
params: FsReadFileParams,
) -> Result<FsReadFileResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.read_file(params).await
}
pub(crate) async fn fs_write_file(
&self,
params: FsWriteFileParams,
) -> Result<FsWriteFileResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.write_file(params).await
}
pub(crate) async fn fs_create_directory(
&self,
params: FsCreateDirectoryParams,
) -> Result<FsCreateDirectoryResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.create_directory(params).await
}
pub(crate) async fn fs_get_metadata(
&self,
params: FsGetMetadataParams,
) -> Result<FsGetMetadataResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.get_metadata(params).await
}
pub(crate) async fn fs_read_directory(
&self,
params: FsReadDirectoryParams,
) -> Result<FsReadDirectoryResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.read_directory(params).await
}
pub(crate) async fn fs_remove(
&self,
params: FsRemoveParams,
) -> Result<FsRemoveResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.remove(params).await
}
pub(crate) async fn fs_copy(
&self,
params: FsCopyParams,
) -> Result<FsCopyResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
self.file_system.copy(params).await
}
pub(crate) async fn read(
&self,
params: crate::protocol::ReadParams,
) -> Result<ReadResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
let after_seq = params.after_seq.unwrap_or(0);
let max_bytes = params.max_bytes.unwrap_or(usize::MAX);
let wait = Duration::from_millis(params.wait_ms.unwrap_or(0));
let deadline = tokio::time::Instant::now() + wait;
loop {
let (response, output_notify) = {
let process_map = self.processes.lock().await;
let process = process_map.get(&params.process_id).ok_or_else(|| {
invalid_request(format!("unknown process id {}", params.process_id))
})?;
let mut chunks = Vec::new();
let mut total_bytes = 0;
let mut next_seq = process.next_seq;
for retained in process.output.iter().filter(|chunk| chunk.seq > after_seq) {
let chunk_len = retained.chunk.len();
if !chunks.is_empty() && total_bytes + chunk_len > max_bytes {
break;
}
total_bytes += chunk_len;
chunks.push(ProcessOutputChunk {
seq: retained.seq,
stream: retained.stream,
chunk: retained.chunk.clone().into(),
});
next_seq = retained.seq + 1;
if total_bytes >= max_bytes {
break;
}
}
(
ReadResponse {
chunks,
next_seq,
exited: process.exit_code.is_some(),
exit_code: process.exit_code,
},
Arc::clone(&process.output_notify),
)
};
if !response.chunks.is_empty()
|| response.exited
|| tokio::time::Instant::now() >= deadline
{
return Ok(response);
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Ok(response);
}
let _ = tokio::time::timeout(remaining, output_notify.notified()).await;
}
}
pub(crate) async fn write(
&self,
params: crate::protocol::WriteParams,
) -> Result<WriteResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
let writer_tx = {
let process_map = self.processes.lock().await;
let process = process_map.get(&params.process_id).ok_or_else(|| {
invalid_request(format!("unknown process id {}", params.process_id))
})?;
if !process.tty {
return Err(invalid_request(format!(
"stdin is closed for process {}",
params.process_id
)));
}
process.session.writer_sender()
};
writer_tx
.send(params.chunk.into_inner())
.await
.map_err(|_| internal_error("failed to write to process stdin".to_string()))?;
Ok(WriteResponse { accepted: true })
}
pub(crate) async fn terminate(
&self,
params: crate::protocol::TerminateParams,
) -> Result<TerminateResponse, codex_app_server_protocol::JSONRPCErrorError> {
self.require_initialized()?;
let running = {
let process_map = self.processes.lock().await;
if let Some(process) = process_map.get(&params.process_id) {
process.session.terminate();
true
} else {
false
}
};
Ok(TerminateResponse { running })
}
}
async fn stream_output(
process_id: String,
stream: ExecOutputStream,
mut receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
notification_tx: mpsc::Sender<ExecServerServerNotification>,
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
output_notify: Arc<Notify>,
) {
while let Some(chunk) = receiver.recv().await {
let notification = {
let mut processes = processes.lock().await;
let Some(process) = processes.get_mut(&process_id) else {
break;
};
let seq = process.next_seq;
process.next_seq += 1;
process.retained_bytes += chunk.len();
process.output.push_back(RetainedOutputChunk {
seq,
stream,
chunk: chunk.clone(),
});
while process.retained_bytes > RETAINED_OUTPUT_BYTES_PER_PROCESS {
let Some(evicted) = process.output.pop_front() else {
break;
};
process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len());
warn!(
"retained output cap exceeded for process {process_id}; dropping oldest output"
);
}
ExecOutputDeltaNotification {
process_id: process_id.clone(),
stream,
chunk: chunk.into(),
}
};
output_notify.notify_waiters();
if notification_tx
.send(ExecServerServerNotification::OutputDelta(notification))
.await
.is_err()
{
break;
}
}
}
async fn watch_exit(
process_id: String,
exit_rx: tokio::sync::oneshot::Receiver<i32>,
notification_tx: mpsc::Sender<ExecServerServerNotification>,
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
output_notify: Arc<Notify>,
) {
let exit_code = exit_rx.await.unwrap_or(-1);
{
let mut processes = processes.lock().await;
if let Some(process) = processes.get_mut(&process_id) {
process.exit_code = Some(exit_code);
}
}
output_notify.notify_waiters();
let _ = notification_tx
.send(ExecServerServerNotification::Exited(
ExecExitedNotification {
process_id,
exit_code,
},
))
.await;
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,226 @@
use std::collections::HashMap;
use pretty_assertions::assert_eq;
use super::ExecServerHandler;
use crate::protocol::ExecParams;
use crate::protocol::ExecSandboxConfig;
use crate::protocol::ExecSandboxMode;
use crate::protocol::InitializeParams;
use crate::protocol::PROTOCOL_VERSION;
use crate::protocol::TerminateParams;
use crate::protocol::WriteParams;
fn exec_params(process_id: &str) -> ExecParams {
ExecParams {
process_id: process_id.to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
"sleep 30".to_string(),
],
cwd: std::env::current_dir().expect("cwd"),
env: HashMap::new(),
tty: false,
arg0: None,
sandbox: None,
}
}
async fn initialized_handler() -> ExecServerHandler {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(8);
let mut handler = ExecServerHandler::new(notification_tx, None);
let response = handler
.initialize(InitializeParams {
client_name: "test".to_string(),
auth_token: None,
})
.expect("initialize should succeed");
assert_eq!(response.protocol_version, PROTOCOL_VERSION);
handler
.initialized()
.expect("initialized notification should succeed");
handler
}
#[tokio::test]
async fn initialize_reports_protocol_version() {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
let mut handler = ExecServerHandler::new(notification_tx, None);
let response = handler
.initialize(InitializeParams {
client_name: "test".to_string(),
auth_token: None,
})
.expect("initialize should succeed");
assert_eq!(response.protocol_version, PROTOCOL_VERSION);
}
#[tokio::test]
async fn exec_methods_require_initialize() {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
let handler = ExecServerHandler::new(notification_tx, None);
let error = handler
.exec(exec_params("proc-1"))
.await
.expect_err("exec should fail before initialize");
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"client must call initialize before using exec methods"
);
}
#[tokio::test]
async fn exec_methods_require_initialized_notification_after_initialize() {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
let mut handler = ExecServerHandler::new(notification_tx, None);
let _ = handler
.initialize(InitializeParams {
client_name: "test".to_string(),
auth_token: None,
})
.expect("initialize should succeed");
let error = handler
.exec(exec_params("proc-1"))
.await
.expect_err("exec should fail before initialized notification");
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"client must send initialized before using exec methods"
);
}
#[tokio::test]
async fn initialized_before_initialize_is_a_protocol_error() {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
let mut handler = ExecServerHandler::new(notification_tx, None);
let error = handler
.initialized()
.expect_err("expected protocol error for early initialized notification");
assert_eq!(
error,
"received `initialized` notification before `initialize`"
);
}
#[tokio::test]
async fn initialize_may_only_be_sent_once_per_connection() {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
let mut handler = ExecServerHandler::new(notification_tx, None);
let _ = handler
.initialize(InitializeParams {
client_name: "test".to_string(),
auth_token: None,
})
.expect("first initialize should succeed");
let error = handler
.initialize(InitializeParams {
client_name: "test".to_string(),
auth_token: None,
})
.expect_err("duplicate initialize should fail");
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"initialize may only be sent once per connection"
);
}
#[tokio::test]
async fn initialize_rejects_invalid_auth_token() {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
let mut handler = ExecServerHandler::new(notification_tx, Some("secret-token".to_string()));
let error = handler
.initialize(InitializeParams {
client_name: "test".to_string(),
auth_token: Some("wrong-token".to_string()),
})
.expect_err("invalid auth token should fail");
assert_eq!(error.code, -32001);
assert_eq!(error.message, "invalid exec-server auth token");
}
#[tokio::test]
async fn exec_rejects_host_default_sandbox_mode() {
let handler = initialized_handler().await;
let error = handler
.exec(ExecParams {
sandbox: Some(ExecSandboxConfig {
mode: ExecSandboxMode::HostDefault,
}),
..exec_params("proc-1")
})
.await
.expect_err("hostDefault sandbox should be rejected");
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"sandbox mode `hostDefault` is not supported by exec-server yet"
);
}
#[tokio::test]
async fn exec_rejects_duplicate_process_ids() {
let handler = initialized_handler().await;
let first = handler
.exec(exec_params("proc-1"))
.await
.expect("first exec should succeed");
assert_eq!(first.process_id, "proc-1");
let error = handler
.exec(exec_params("proc-1"))
.await
.expect_err("duplicate process id should fail");
assert_eq!(error.code, -32600);
assert_eq!(error.message, "process proc-1 already exists");
handler.shutdown().await;
}
#[tokio::test]
async fn write_rejects_unknown_process_ids() {
let handler = initialized_handler().await;
let error = handler
.write(WriteParams {
process_id: "missing".to_string(),
chunk: b"input".to_vec().into(),
})
.await
.expect_err("writing to an unknown process should fail");
assert_eq!(error.code, -32600);
assert_eq!(error.message, "unknown process id missing");
}
#[tokio::test]
async fn terminate_reports_missing_processes_as_not_running() {
let handler = initialized_handler().await;
let response = handler
.terminate(TerminateParams {
process_id: "missing".to_string(),
})
.await
.expect("terminate should succeed");
assert_eq!(response.running, false);
}

View File

@@ -0,0 +1,33 @@
use codex_app_server_protocol::JSONRPCErrorError;
pub(crate) fn invalid_request(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32600,
data: None,
message,
}
}
pub(crate) fn invalid_params(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32602,
data: None,
message,
}
}
pub(crate) fn internal_error(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32603,
data: None,
message,
}
}
pub(crate) fn unauthorized(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32001,
data: None,
message,
}
}

View File

@@ -0,0 +1,407 @@
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::warn;
use crate::connection::CHANNEL_CAPACITY;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::FS_COPY_METHOD;
use crate::protocol::FS_CREATE_DIRECTORY_METHOD;
use crate::protocol::FS_GET_METADATA_METHOD;
use crate::protocol::FS_READ_DIRECTORY_METHOD;
use crate::protocol::FS_READ_FILE_METHOD;
use crate::protocol::FS_REMOVE_METHOD;
use crate::protocol::FS_WRITE_FILE_METHOD;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::server::ExecServerConfig;
use crate::server::ExecServerHandler;
use crate::server::ExecServerServerNotification;
use crate::server::internal_error;
use crate::server::invalid_params;
use crate::server::invalid_request;
pub(crate) async fn run_connection(connection: JsonRpcConnection, config: ExecServerConfig) {
let (json_outgoing_tx, mut incoming_rx, _connection_tasks) = connection.into_parts();
let json_outgoing_tx_for_notifications = json_outgoing_tx.clone();
let (notification_tx, mut notification_rx) =
mpsc::channel::<ExecServerServerNotification>(CHANNEL_CAPACITY);
let mut handler = ExecServerHandler::new(notification_tx, config.auth_token);
let outbound_task = tokio::spawn(async move {
while let Some(notification) = notification_rx.recv().await {
let json_message = match notification_message(notification) {
Ok(json_message) => json_message,
Err(err) => {
warn!("failed to serialize exec-server notification: {err}");
break;
}
};
if json_outgoing_tx_for_notifications
.send(json_message)
.await
.is_err()
{
break;
}
}
});
while let Some(event) = incoming_rx.recv().await {
match event {
JsonRpcConnectionEvent::Message(message) => {
let maybe_response = match handle_connection_message(&mut handler, message).await {
Ok(maybe_response) => maybe_response,
Err(err) => {
warn!("closing exec-server connection after protocol error: {err}");
break;
}
};
if let Some(response) = maybe_response
&& json_outgoing_tx.send(response).await.is_err()
{
break;
}
}
JsonRpcConnectionEvent::Disconnected { reason } => {
if let Some(reason) = reason {
debug!("exec-server connection disconnected: {reason}");
}
break;
}
}
}
handler.shutdown().await;
drop(handler);
let _ = outbound_task.await;
}
async fn handle_connection_message(
handler: &mut ExecServerHandler,
message: JSONRPCMessage,
) -> Result<Option<JSONRPCMessage>, String> {
match message {
JSONRPCMessage::Request(request) => Ok(Some(dispatch_request(handler, request).await)),
JSONRPCMessage::Notification(notification) => {
handle_notification(handler, notification)?;
Ok(None)
}
JSONRPCMessage::Response(response) => Err(format!(
"unexpected client response for request id {:?}",
response.id
)),
JSONRPCMessage::Error(error) => Err(format!(
"unexpected client error for request id {:?}",
error.id
)),
}
}
async fn dispatch_request(
handler: &mut ExecServerHandler,
request: JSONRPCRequest,
) -> JSONRPCMessage {
let JSONRPCRequest {
id,
method,
params,
trace: _,
} = request;
let params = params.unwrap_or(serde_json::Value::Null);
match method.as_str() {
INITIALIZE_METHOD => request_response(
id,
parse_params(params).and_then(|params| handler.initialize(params)),
),
EXEC_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.exec(params)).await,
),
EXEC_READ_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.read(params)).await,
),
EXEC_WRITE_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.write(params)).await,
),
EXEC_TERMINATE_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.terminate(params)).await,
),
FS_READ_FILE_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.fs_read_file(params)).await,
),
FS_WRITE_FILE_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.fs_write_file(params)).await,
),
FS_CREATE_DIRECTORY_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.fs_create_directory(params)).await,
),
FS_GET_METADATA_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.fs_get_metadata(params)).await,
),
FS_READ_DIRECTORY_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.fs_read_directory(params)).await,
),
FS_REMOVE_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.fs_remove(params)).await,
),
FS_COPY_METHOD => request_response(
id,
dispatch_async_request(params, |params| handler.fs_copy(params)).await,
),
other => jsonrpc_error_response(id, invalid_request(format!("unknown method: {other}"))),
}
}
fn handle_notification(
handler: &mut ExecServerHandler,
notification: JSONRPCNotification,
) -> Result<(), String> {
match notification.method.as_str() {
INITIALIZED_METHOD => handler.initialized(),
other => Err(format!("unexpected notification method: {other}")),
}
}
fn parse_params<P>(params: serde_json::Value) -> Result<P, JSONRPCErrorError>
where
P: DeserializeOwned,
{
serde_json::from_value(params).map_err(|err| invalid_params(err.to_string()))
}
async fn dispatch_async_request<P, T, F, Fut>(
params: serde_json::Value,
f: F,
) -> Result<T, JSONRPCErrorError>
where
P: DeserializeOwned,
F: FnOnce(P) -> Fut,
Fut: std::future::Future<Output = Result<T, JSONRPCErrorError>>,
{
match parse_params(params) {
Ok(params) => f(params).await,
Err(err) => Err(err),
}
}
fn request_response<T>(
request_id: RequestId,
result: Result<T, JSONRPCErrorError>,
) -> JSONRPCMessage
where
T: Serialize,
{
match result.and_then(serialize_response) {
Ok(result) => JSONRPCMessage::Response(JSONRPCResponse {
id: request_id,
result,
}),
Err(error) => JSONRPCMessage::Error(JSONRPCError {
id: request_id,
error,
}),
}
}
fn serialize_response<T>(response: T) -> Result<serde_json::Value, JSONRPCErrorError>
where
T: Serialize,
{
serde_json::to_value(response).map_err(|err| internal_error(err.to_string()))
}
fn jsonrpc_error_response(request_id: RequestId, error: JSONRPCErrorError) -> JSONRPCMessage {
JSONRPCMessage::Error(JSONRPCError {
id: request_id,
error,
})
}
fn notification_message(
notification: ExecServerServerNotification,
) -> Result<JSONRPCMessage, serde_json::Error> {
match notification {
ExecServerServerNotification::OutputDelta(params) => {
typed_notification(EXEC_OUTPUT_DELTA_METHOD, params)
}
ExecServerServerNotification::Exited(params) => {
typed_notification(EXEC_EXITED_METHOD, params)
}
}
}
fn typed_notification<T>(method: &str, params: T) -> Result<JSONRPCMessage, serde_json::Error>
where
T: Serialize,
{
Ok(JSONRPCMessage::Notification(JSONRPCNotification {
method: method.to_string(),
params: Some(serde_json::to_value(params)?),
}))
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use pretty_assertions::assert_eq;
use super::dispatch_request;
use super::handle_connection_message;
use super::notification_message;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecOutputStream;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::PROTOCOL_VERSION;
use crate::server::ExecServerHandler;
use crate::server::ExecServerServerNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
#[tokio::test]
async fn dispatch_initialize_returns_initialize_response() {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
let mut handler = ExecServerHandler::new(notification_tx, None);
let message = dispatch_request(
&mut handler,
JSONRPCRequest {
id: RequestId::Integer(1),
method: INITIALIZE_METHOD.to_string(),
params: Some(
serde_json::to_value(InitializeParams {
client_name: "test".to_string(),
auth_token: None,
})
.expect("serialize initialize params"),
),
trace: None,
},
)
.await;
assert_eq!(
message,
JSONRPCMessage::Response(JSONRPCResponse {
id: RequestId::Integer(1),
result: serde_json::json!({
"protocolVersion": PROTOCOL_VERSION,
}),
})
);
}
#[tokio::test]
async fn dispatch_exec_returns_invalid_request_before_initialize() {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
let mut handler = ExecServerHandler::new(notification_tx, None);
let message = dispatch_request(
&mut handler,
JSONRPCRequest {
id: RequestId::Integer(7),
method: EXEC_METHOD.to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"argv": ["bash", "-lc", "true"],
"cwd": std::env::current_dir().expect("cwd"),
"env": HashMap::<String, String>::new(),
"tty": true,
"arg0": null,
"sandbox": null,
})),
trace: None,
},
)
.await;
let JSONRPCMessage::Error(JSONRPCError { id, error }) = message else {
panic!("expected invalid-request error");
};
assert_eq!(id, RequestId::Integer(7));
assert_eq!(error.code, -32600);
assert_eq!(
error.message,
"client must call initialize before using exec methods"
);
}
#[tokio::test]
async fn initialized_notification_before_initialize_is_protocol_error() {
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
let mut handler = ExecServerHandler::new(notification_tx, None);
let err = handle_connection_message(
&mut handler,
JSONRPCMessage::Notification(JSONRPCNotification {
method: INITIALIZED_METHOD.to_string(),
params: Some(serde_json::json!({})),
}),
)
.await
.expect_err("expected early initialized to fail");
assert_eq!(
err,
"received `initialized` notification before `initialize`"
);
}
#[test]
fn notification_message_serializes_process_output() {
let message = notification_message(ExecServerServerNotification::OutputDelta(
ExecOutputDeltaNotification {
process_id: "proc-1".to_string(),
stream: ExecOutputStream::Stdout,
chunk: b"hello".to_vec().into(),
},
))
.expect("serialize notification");
assert_eq!(
message,
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"stream": "stdout",
"chunk": "aGVsbG8=",
})),
})
);
}
}

View File

@@ -0,0 +1,161 @@
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tracing::warn;
use crate::connection::JsonRpcConnection;
use crate::server::ExecServerConfig;
use crate::server::processor::run_connection;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecServerTransport {
WebSocket { bind_address: SocketAddr },
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ExecServerTransportParseError {
UnsupportedListenUrl(String),
InvalidWebSocketListenUrl(String),
}
impl std::fmt::Display for ExecServerTransportParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ExecServerTransportParseError::UnsupportedListenUrl(listen_url) => write!(
f,
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`"
),
ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url) => write!(
f,
"invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`"
),
}
}
}
impl std::error::Error for ExecServerTransportParseError {}
impl ExecServerTransport {
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
pub fn from_listen_url(listen_url: &str) -> Result<Self, ExecServerTransportParseError> {
if let Some(socket_addr) = listen_url.strip_prefix("ws://") {
let bind_address = socket_addr.parse::<SocketAddr>().map_err(|_| {
ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url.to_string())
})?;
return Ok(Self::WebSocket { bind_address });
}
Err(ExecServerTransportParseError::UnsupportedListenUrl(
listen_url.to_string(),
))
}
}
impl Default for ExecServerTransport {
fn default() -> Self {
Self::WebSocket {
bind_address: "127.0.0.1:0".parse().unwrap_or_else(|err| {
panic!("default exec-server websocket bind address should parse: {err}")
}),
}
}
}
impl FromStr for ExecServerTransport {
type Err = ExecServerTransportParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::from_listen_url(s)
}
}
pub(crate) async fn run_transport(
transport: ExecServerTransport,
config: ExecServerConfig,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match transport {
ExecServerTransport::WebSocket { bind_address } => {
run_websocket_listener(bind_address, config).await
}
}
}
async fn run_websocket_listener(
bind_address: SocketAddr,
config: ExecServerConfig,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(bind_address).await?;
let local_addr = listener.local_addr()?;
print_websocket_startup_banner(local_addr);
loop {
let (stream, peer_addr) = listener.accept().await?;
let config = config.clone();
tokio::spawn(async move {
match accept_async(stream).await {
Ok(websocket) => {
run_connection(
JsonRpcConnection::from_websocket(
websocket,
format!("exec-server websocket {peer_addr}"),
),
config,
)
.await;
}
Err(err) => {
warn!(
"failed to accept exec-server websocket connection from {peer_addr}: {err}"
);
}
}
});
}
}
#[allow(clippy::print_stderr)]
fn print_websocket_startup_banner(addr: SocketAddr) {
eprintln!("codex-exec-server listening on ws://{addr}");
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use super::ExecServerTransport;
#[test]
fn exec_server_transport_parses_websocket_listen_url() {
let transport = ExecServerTransport::from_listen_url("ws://127.0.0.1:1234")
.expect("websocket listen URL should parse");
assert_eq!(
transport,
ExecServerTransport::WebSocket {
bind_address: "127.0.0.1:1234".parse().expect("valid socket address"),
}
);
}
#[test]
fn exec_server_transport_rejects_invalid_websocket_listen_url() {
let err = ExecServerTransport::from_listen_url("ws://localhost:1234")
.expect_err("hostname bind address should be rejected");
assert_eq!(
err.to_string(),
"invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`"
);
}
#[test]
fn exec_server_transport_rejects_unsupported_listen_url() {
let err = ExecServerTransport::from_listen_url("http://127.0.0.1:1234")
.expect_err("unsupported scheme should fail");
assert_eq!(
err.to_string(),
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`"
);
}
}