mirror of
https://github.com/openai/codex.git
synced 2026-03-23 08:36:30 +03:00
Compare commits
1 Commits
dev/cc/mul
...
mbolin/exe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
08446a0c15 |
876
codex-rs/exec-server/src/client.rs
Normal file
876
codex-rs/exec-server/src/client.rs
Normal file
@@ -0,0 +1,876 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
#[cfg(test)]
|
||||
use std::sync::Mutex as StdMutex;
|
||||
#[cfg(test)]
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_app_server_protocol::FsCopyParams;
|
||||
use codex_app_server_protocol::FsCopyResponse;
|
||||
use codex_app_server_protocol::FsCreateDirectoryParams;
|
||||
use codex_app_server_protocol::FsCreateDirectoryResponse;
|
||||
use codex_app_server_protocol::FsGetMetadataParams;
|
||||
use codex_app_server_protocol::FsGetMetadataResponse;
|
||||
use codex_app_server_protocol::FsReadDirectoryParams;
|
||||
use codex_app_server_protocol::FsReadDirectoryResponse;
|
||||
use codex_app_server_protocol::FsReadFileParams;
|
||||
use codex_app_server_protocol::FsReadFileResponse;
|
||||
use codex_app_server_protocol::FsRemoveParams;
|
||||
use codex_app_server_protocol::FsRemoveResponse;
|
||||
use codex_app_server_protocol::FsWriteFileParams;
|
||||
use codex_app_server_protocol::FsWriteFileResponse;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tracing::debug;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::client_api::ExecServerClientConnectOptions;
|
||||
use crate::client_api::ExecServerEvent;
|
||||
use crate::client_api::RemoteExecServerConnectArgs;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::connection::JsonRpcConnectionEvent;
|
||||
use crate::protocol::EXEC_EXITED_METHOD;
|
||||
use crate::protocol::EXEC_METHOD;
|
||||
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
|
||||
use crate::protocol::EXEC_READ_METHOD;
|
||||
use crate::protocol::EXEC_TERMINATE_METHOD;
|
||||
use crate::protocol::EXEC_WRITE_METHOD;
|
||||
use crate::protocol::ExecExitedNotification;
|
||||
use crate::protocol::ExecOutputDeltaNotification;
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::ExecResponse;
|
||||
use crate::protocol::FS_COPY_METHOD;
|
||||
use crate::protocol::FS_CREATE_DIRECTORY_METHOD;
|
||||
use crate::protocol::FS_GET_METADATA_METHOD;
|
||||
use crate::protocol::FS_READ_DIRECTORY_METHOD;
|
||||
use crate::protocol::FS_READ_FILE_METHOD;
|
||||
use crate::protocol::FS_REMOVE_METHOD;
|
||||
use crate::protocol::FS_WRITE_FILE_METHOD;
|
||||
use crate::protocol::INITIALIZE_METHOD;
|
||||
use crate::protocol::INITIALIZED_METHOD;
|
||||
use crate::protocol::InitializeParams;
|
||||
use crate::protocol::InitializeResponse;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::ReadResponse;
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteParams;
|
||||
use crate::protocol::WriteResponse;
|
||||
use crate::server::ExecServerHandler;
|
||||
use crate::server::ExecServerServerNotification;
|
||||
|
||||
impl Default for ExecServerClientConnectOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
client_name: "codex-core".to_string(),
|
||||
auth_token: None,
|
||||
initialize_timeout: INITIALIZE_TIMEOUT,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RemoteExecServerConnectArgs> for ExecServerClientConnectOptions {
|
||||
fn from(value: RemoteExecServerConnectArgs) -> Self {
|
||||
Self {
|
||||
client_name: value.client_name,
|
||||
auth_token: value.auth_token,
|
||||
initialize_timeout: value.initialize_timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
impl RemoteExecServerConnectArgs {
|
||||
pub fn new(websocket_url: String, client_name: String) -> Self {
|
||||
Self {
|
||||
websocket_url,
|
||||
client_name,
|
||||
auth_token: None,
|
||||
connect_timeout: CONNECT_TIMEOUT,
|
||||
initialize_timeout: INITIALIZE_TIMEOUT,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct ExecServerOutput {
|
||||
stream: crate::protocol::ExecOutputStream,
|
||||
chunk: Vec<u8>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
struct ExecServerProcess {
|
||||
process_id: String,
|
||||
output_rx: broadcast::Receiver<ExecServerOutput>,
|
||||
status: Arc<RemoteProcessStatus>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl ExecServerProcess {
|
||||
fn output_receiver(&self) -> broadcast::Receiver<ExecServerOutput> {
|
||||
self.output_rx.resubscribe()
|
||||
}
|
||||
|
||||
fn has_exited(&self) -> bool {
|
||||
self.status.has_exited()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
struct RemoteProcessStatus {
|
||||
exited: AtomicBool,
|
||||
exit_code: StdMutex<Option<i32>>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl RemoteProcessStatus {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
exited: AtomicBool::new(false),
|
||||
exit_code: StdMutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn has_exited(&self) -> bool {
|
||||
self.exited.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
fn mark_exited(&self, exit_code: Option<i32>) {
|
||||
self.exited.store(true, Ordering::SeqCst);
|
||||
if let Ok(mut guard) = self.exit_code.lock() {
|
||||
*guard = exit_code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum PendingRequest {
|
||||
Initialize(oneshot::Sender<Result<InitializeResponse, JSONRPCErrorError>>),
|
||||
Exec(oneshot::Sender<Result<ExecResponse, JSONRPCErrorError>>),
|
||||
Read(oneshot::Sender<Result<ReadResponse, JSONRPCErrorError>>),
|
||||
Write(oneshot::Sender<Result<WriteResponse, JSONRPCErrorError>>),
|
||||
Terminate(oneshot::Sender<Result<TerminateResponse, JSONRPCErrorError>>),
|
||||
FsReadFile(oneshot::Sender<Result<FsReadFileResponse, JSONRPCErrorError>>),
|
||||
FsWriteFile(oneshot::Sender<Result<FsWriteFileResponse, JSONRPCErrorError>>),
|
||||
FsCreateDirectory(oneshot::Sender<Result<FsCreateDirectoryResponse, JSONRPCErrorError>>),
|
||||
FsGetMetadata(oneshot::Sender<Result<FsGetMetadataResponse, JSONRPCErrorError>>),
|
||||
FsReadDirectory(oneshot::Sender<Result<FsReadDirectoryResponse, JSONRPCErrorError>>),
|
||||
FsRemove(oneshot::Sender<Result<FsRemoveResponse, JSONRPCErrorError>>),
|
||||
FsCopy(oneshot::Sender<Result<FsCopyResponse, JSONRPCErrorError>>),
|
||||
}
|
||||
|
||||
impl PendingRequest {
|
||||
fn resolve_json(self, result: Value) -> Result<(), ExecServerError> {
|
||||
match self {
|
||||
PendingRequest::Initialize(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::Exec(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::Read(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::Write(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::Terminate(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::FsReadFile(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::FsWriteFile(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::FsCreateDirectory(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::FsGetMetadata(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::FsReadDirectory(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::FsRemove(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
PendingRequest::FsCopy(tx) => {
|
||||
let _ = tx.send(Ok(serde_json::from_value(result)?));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn resolve_error(self, error: JSONRPCErrorError) {
|
||||
match self {
|
||||
PendingRequest::Initialize(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::Exec(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::Read(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::Write(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::Terminate(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::FsReadFile(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::FsWriteFile(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::FsCreateDirectory(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::FsGetMetadata(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::FsReadDirectory(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::FsRemove(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
PendingRequest::FsCopy(tx) => {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ClientBackend {
|
||||
JsonRpc {
|
||||
write_tx: mpsc::Sender<JSONRPCMessage>,
|
||||
},
|
||||
InProcess {
|
||||
handler: Arc<Mutex<ExecServerHandler>>,
|
||||
},
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
backend: ClientBackend,
|
||||
pending: Mutex<HashMap<RequestId, PendingRequest>>,
|
||||
events_tx: broadcast::Sender<ExecServerEvent>,
|
||||
next_request_id: AtomicI64,
|
||||
transport_tasks: Vec<JoinHandle<()>>,
|
||||
reader_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Drop for Inner {
|
||||
fn drop(&mut self) {
|
||||
if let ClientBackend::InProcess { handler } = &self.backend
|
||||
&& let Ok(handle) = tokio::runtime::Handle::try_current()
|
||||
{
|
||||
let handler = Arc::clone(handler);
|
||||
handle.spawn(async move {
|
||||
handler.lock().await.shutdown().await;
|
||||
});
|
||||
}
|
||||
for task in &self.transport_tasks {
|
||||
task.abort();
|
||||
}
|
||||
self.reader_task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ExecServerClient {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ExecServerError {
|
||||
#[error("failed to spawn exec-server: {0}")]
|
||||
Spawn(#[source] std::io::Error),
|
||||
#[error("timed out connecting to exec-server websocket `{url}` after {timeout:?}")]
|
||||
WebSocketConnectTimeout { url: String, timeout: Duration },
|
||||
#[error("failed to connect to exec-server websocket `{url}`: {source}")]
|
||||
WebSocketConnect {
|
||||
url: String,
|
||||
#[source]
|
||||
source: tokio_tungstenite::tungstenite::Error,
|
||||
},
|
||||
#[error("timed out waiting for exec-server initialize handshake after {timeout:?}")]
|
||||
InitializeTimedOut { timeout: Duration },
|
||||
#[error("exec-server transport closed")]
|
||||
Closed,
|
||||
#[error("failed to serialize or deserialize exec-server JSON: {0}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
#[error("exec-server protocol error: {0}")]
|
||||
Protocol(String),
|
||||
#[error("exec-server rejected request ({code}): {message}")]
|
||||
Server { code: i64, message: String },
|
||||
}
|
||||
|
||||
impl ExecServerClient {
|
||||
pub async fn connect_in_process(
|
||||
options: ExecServerClientConnectOptions,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let (notification_tx, mut notification_rx) =
|
||||
mpsc::channel::<ExecServerServerNotification>(256);
|
||||
let handler = Arc::new(Mutex::new(ExecServerHandler::new(notification_tx, None)));
|
||||
|
||||
let inner = Arc::new_cyclic(|weak| {
|
||||
let weak = weak.clone();
|
||||
let reader_task = tokio::spawn(async move {
|
||||
while let Some(notification) = notification_rx.recv().await {
|
||||
if let Some(inner) = weak.upgrade() {
|
||||
handle_in_process_notification(&inner, notification).await;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(inner) = weak.upgrade() {
|
||||
handle_transport_shutdown(&inner).await;
|
||||
}
|
||||
});
|
||||
|
||||
Inner {
|
||||
backend: ClientBackend::InProcess { handler },
|
||||
pending: Mutex::new(HashMap::new()),
|
||||
events_tx: broadcast::channel(256).0,
|
||||
next_request_id: AtomicI64::new(1),
|
||||
transport_tasks: Vec::new(),
|
||||
reader_task,
|
||||
}
|
||||
});
|
||||
|
||||
let client = Self { inner };
|
||||
client.initialize(options).await?;
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
pub async fn connect_websocket(
|
||||
args: RemoteExecServerConnectArgs,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let websocket_url = args.websocket_url.clone();
|
||||
let connect_timeout = args.connect_timeout;
|
||||
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
|
||||
.await
|
||||
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
|
||||
url: websocket_url.clone(),
|
||||
timeout: connect_timeout,
|
||||
})?
|
||||
.map_err(|source| ExecServerError::WebSocketConnect {
|
||||
url: websocket_url.clone(),
|
||||
source,
|
||||
})?;
|
||||
|
||||
Self::connect(
|
||||
JsonRpcConnection::from_websocket(
|
||||
stream,
|
||||
format!("exec-server websocket {websocket_url}"),
|
||||
),
|
||||
args.into(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn connect(
|
||||
connection: JsonRpcConnection,
|
||||
options: ExecServerClientConnectOptions,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let (write_tx, mut incoming_rx, transport_tasks) = connection.into_parts();
|
||||
let inner = Arc::new_cyclic(|weak| {
|
||||
let weak = weak.clone();
|
||||
let reader_task = tokio::spawn(async move {
|
||||
while let Some(event) = incoming_rx.recv().await {
|
||||
match event {
|
||||
JsonRpcConnectionEvent::Message(message) => {
|
||||
if let Some(inner) = weak.upgrade()
|
||||
&& let Err(err) = handle_server_message(&inner, message).await
|
||||
{
|
||||
warn!("exec-server client closing after protocol error: {err}");
|
||||
handle_transport_shutdown(&inner).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
JsonRpcConnectionEvent::Disconnected { reason } => {
|
||||
if let Some(reason) = reason {
|
||||
warn!("exec-server client transport disconnected: {reason}");
|
||||
}
|
||||
if let Some(inner) = weak.upgrade() {
|
||||
handle_transport_shutdown(&inner).await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(inner) = weak.upgrade() {
|
||||
handle_transport_shutdown(&inner).await;
|
||||
}
|
||||
});
|
||||
|
||||
Inner {
|
||||
backend: ClientBackend::JsonRpc { write_tx },
|
||||
pending: Mutex::new(HashMap::new()),
|
||||
events_tx: broadcast::channel(256).0,
|
||||
next_request_id: AtomicI64::new(1),
|
||||
transport_tasks,
|
||||
reader_task,
|
||||
}
|
||||
});
|
||||
|
||||
let client = Self { inner };
|
||||
client.initialize(options).await?;
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
pub fn event_receiver(&self) -> broadcast::Receiver<ExecServerEvent> {
|
||||
self.inner.events_tx.subscribe()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
async fn start_process(
|
||||
&self,
|
||||
params: ExecParams,
|
||||
) -> Result<ExecServerProcess, ExecServerError> {
|
||||
let response = self.exec(params).await?;
|
||||
let process_id = response.process_id;
|
||||
let status = Arc::new(RemoteProcessStatus::new());
|
||||
let (output_tx, output_rx) = broadcast::channel(256);
|
||||
let mut events_rx = self.event_receiver();
|
||||
let status_watcher = Arc::clone(&status);
|
||||
let watch_process_id = process_id.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Ok(event) = events_rx.recv().await {
|
||||
match event {
|
||||
ExecServerEvent::OutputDelta(notification)
|
||||
if notification.process_id == watch_process_id =>
|
||||
{
|
||||
let _ = output_tx.send(ExecServerOutput {
|
||||
stream: notification.stream,
|
||||
chunk: notification.chunk.into_inner(),
|
||||
});
|
||||
}
|
||||
ExecServerEvent::Exited(notification)
|
||||
if notification.process_id == watch_process_id =>
|
||||
{
|
||||
status_watcher.mark_exited(Some(notification.exit_code));
|
||||
break;
|
||||
}
|
||||
ExecServerEvent::OutputDelta(_) | ExecServerEvent::Exited(_) => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(ExecServerProcess {
|
||||
process_id,
|
||||
output_rx,
|
||||
status,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn exec(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
|
||||
self.request_exec(params).await
|
||||
}
|
||||
|
||||
pub async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
|
||||
self.request_read(params).await
|
||||
}
|
||||
|
||||
pub async fn write(
|
||||
&self,
|
||||
process_id: &str,
|
||||
chunk: Vec<u8>,
|
||||
) -> Result<WriteResponse, ExecServerError> {
|
||||
self.write_process(WriteParams {
|
||||
process_id: process_id.to_string(),
|
||||
chunk: chunk.into(),
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
|
||||
self.terminate_session(process_id).await
|
||||
}
|
||||
|
||||
pub async fn fs_read_file(
|
||||
&self,
|
||||
params: FsReadFileParams,
|
||||
) -> Result<FsReadFileResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.fs_read_file(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(FS_READ_FILE_METHOD, ¶ms, PendingRequest::FsReadFile)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn fs_write_file(
|
||||
&self,
|
||||
params: FsWriteFileParams,
|
||||
) -> Result<FsWriteFileResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.fs_write_file(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(FS_WRITE_FILE_METHOD, ¶ms, PendingRequest::FsWriteFile)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn fs_create_directory(
|
||||
&self,
|
||||
params: FsCreateDirectoryParams,
|
||||
) -> Result<FsCreateDirectoryResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.fs_create_directory(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(
|
||||
FS_CREATE_DIRECTORY_METHOD,
|
||||
¶ms,
|
||||
PendingRequest::FsCreateDirectory,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn fs_get_metadata(
|
||||
&self,
|
||||
params: FsGetMetadataParams,
|
||||
) -> Result<FsGetMetadataResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.fs_get_metadata(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(
|
||||
FS_GET_METADATA_METHOD,
|
||||
¶ms,
|
||||
PendingRequest::FsGetMetadata,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn fs_read_directory(
|
||||
&self,
|
||||
params: FsReadDirectoryParams,
|
||||
) -> Result<FsReadDirectoryResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.fs_read_directory(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(
|
||||
FS_READ_DIRECTORY_METHOD,
|
||||
¶ms,
|
||||
PendingRequest::FsReadDirectory,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn fs_remove(
|
||||
&self,
|
||||
params: FsRemoveParams,
|
||||
) -> Result<FsRemoveResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.fs_remove(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(FS_REMOVE_METHOD, ¶ms, PendingRequest::FsRemove)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn fs_copy(&self, params: FsCopyParams) -> Result<FsCopyResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.fs_copy(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(FS_COPY_METHOD, ¶ms, PendingRequest::FsCopy)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn initialize(
|
||||
&self,
|
||||
options: ExecServerClientConnectOptions,
|
||||
) -> Result<(), ExecServerError> {
|
||||
let ExecServerClientConnectOptions {
|
||||
client_name,
|
||||
auth_token,
|
||||
initialize_timeout,
|
||||
} = options;
|
||||
timeout(initialize_timeout, async {
|
||||
let _: InitializeResponse = self
|
||||
.request_initialize(InitializeParams {
|
||||
client_name,
|
||||
auth_token,
|
||||
})
|
||||
.await?;
|
||||
self.notify(INITIALIZED_METHOD, &serde_json::json!({}))
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(|_| ExecServerError::InitializeTimedOut {
|
||||
timeout: initialize_timeout,
|
||||
})?
|
||||
}
|
||||
|
||||
async fn request_exec(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.exec(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(EXEC_METHOD, ¶ms, PendingRequest::Exec)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn write_process(&self, params: WriteParams) -> Result<WriteResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.write(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(EXEC_WRITE_METHOD, ¶ms, PendingRequest::Write)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn request_read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.read(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(EXEC_READ_METHOD, ¶ms, PendingRequest::Read)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn terminate_session(
|
||||
&self,
|
||||
process_id: &str,
|
||||
) -> Result<TerminateResponse, ExecServerError> {
|
||||
let params = TerminateParams {
|
||||
process_id: process_id.to_string(),
|
||||
};
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.terminate(params).await);
|
||||
}
|
||||
|
||||
self.send_pending_request(EXEC_TERMINATE_METHOD, ¶ms, PendingRequest::Terminate)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn notify<P: Serialize>(&self, method: &str, params: &P) -> Result<(), ExecServerError> {
|
||||
match &self.inner.backend {
|
||||
ClientBackend::JsonRpc { write_tx } => {
|
||||
let params = serde_json::to_value(params)?;
|
||||
write_tx
|
||||
.send(JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: method.to_string(),
|
||||
params: Some(params),
|
||||
}))
|
||||
.await
|
||||
.map_err(|_| ExecServerError::Closed)
|
||||
}
|
||||
ClientBackend::InProcess { handler } => match method {
|
||||
INITIALIZED_METHOD => handler
|
||||
.lock()
|
||||
.await
|
||||
.initialized()
|
||||
.map_err(ExecServerError::Protocol),
|
||||
other => Err(ExecServerError::Protocol(format!(
|
||||
"unsupported in-process notification method `{other}`"
|
||||
))),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_initialize(
|
||||
&self,
|
||||
params: InitializeParams,
|
||||
) -> Result<InitializeResponse, ExecServerError> {
|
||||
if let ClientBackend::InProcess { handler } = &self.inner.backend {
|
||||
return server_result_to_client(handler.lock().await.initialize(params));
|
||||
}
|
||||
|
||||
self.send_pending_request(INITIALIZE_METHOD, ¶ms, PendingRequest::Initialize)
|
||||
.await
|
||||
}
|
||||
|
||||
fn next_request_id(&self) -> RequestId {
|
||||
RequestId::Integer(self.inner.next_request_id.fetch_add(1, Ordering::SeqCst))
|
||||
}
|
||||
|
||||
async fn send_pending_request<P, T>(
|
||||
&self,
|
||||
method: &str,
|
||||
params: &P,
|
||||
build_pending: impl FnOnce(oneshot::Sender<Result<T, JSONRPCErrorError>>) -> PendingRequest,
|
||||
) -> Result<T, ExecServerError>
|
||||
where
|
||||
P: Serialize,
|
||||
{
|
||||
let request_id = self.next_request_id();
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.inner
|
||||
.pending
|
||||
.lock()
|
||||
.await
|
||||
.insert(request_id.clone(), build_pending(response_tx));
|
||||
let ClientBackend::JsonRpc { write_tx } = &self.inner.backend else {
|
||||
unreachable!("in-process requests return before JSON-RPC setup");
|
||||
};
|
||||
let send_result = send_jsonrpc_request(write_tx, request_id.clone(), method, params).await;
|
||||
self.finish_request(request_id, send_result, response_rx)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn finish_request<T>(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
send_result: Result<(), ExecServerError>,
|
||||
response_rx: oneshot::Receiver<Result<T, JSONRPCErrorError>>,
|
||||
) -> Result<T, ExecServerError> {
|
||||
if let Err(err) = send_result {
|
||||
self.inner.pending.lock().await.remove(&request_id);
|
||||
return Err(err);
|
||||
}
|
||||
receive_typed_response(response_rx).await
|
||||
}
|
||||
}
|
||||
|
||||
async fn receive_typed_response<T>(
|
||||
response_rx: oneshot::Receiver<Result<T, JSONRPCErrorError>>,
|
||||
) -> Result<T, ExecServerError> {
|
||||
let result = response_rx.await.map_err(|_| ExecServerError::Closed)?;
|
||||
match result {
|
||||
Ok(response) => Ok(response),
|
||||
Err(error) => Err(ExecServerError::Server {
|
||||
code: error.code,
|
||||
message: error.message,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn server_result_to_client<T>(result: Result<T, JSONRPCErrorError>) -> Result<T, ExecServerError> {
|
||||
match result {
|
||||
Ok(response) => Ok(response),
|
||||
Err(error) => Err(ExecServerError::Server {
|
||||
code: error.code,
|
||||
message: error.message,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_jsonrpc_request<P: Serialize>(
|
||||
write_tx: &mpsc::Sender<JSONRPCMessage>,
|
||||
request_id: RequestId,
|
||||
method: &str,
|
||||
params: &P,
|
||||
) -> Result<(), ExecServerError> {
|
||||
let params = serde_json::to_value(params)?;
|
||||
write_tx
|
||||
.send(JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: request_id,
|
||||
method: method.to_string(),
|
||||
params: Some(params),
|
||||
trace: None,
|
||||
}))
|
||||
.await
|
||||
.map_err(|_| ExecServerError::Closed)
|
||||
}
|
||||
|
||||
async fn handle_in_process_notification(
|
||||
inner: &Arc<Inner>,
|
||||
notification: ExecServerServerNotification,
|
||||
) {
|
||||
match notification {
|
||||
ExecServerServerNotification::OutputDelta(params) => {
|
||||
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
|
||||
}
|
||||
ExecServerServerNotification::Exited(params) => {
|
||||
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_server_message(
|
||||
inner: &Arc<Inner>,
|
||||
message: JSONRPCMessage,
|
||||
) -> Result<(), ExecServerError> {
|
||||
match message {
|
||||
JSONRPCMessage::Response(JSONRPCResponse { id, result }) => {
|
||||
if let Some(pending) = inner.pending.lock().await.remove(&id) {
|
||||
pending.resolve_json(result)?;
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Error(JSONRPCError { id, error }) => {
|
||||
if let Some(pending) = inner.pending.lock().await.remove(&id) {
|
||||
pending.resolve_error(error);
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
handle_server_notification(inner, notification).await?;
|
||||
}
|
||||
JSONRPCMessage::Request(request) => {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"unexpected exec-server request from remote server: {}",
|
||||
request.method
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_server_notification(
|
||||
inner: &Arc<Inner>,
|
||||
notification: JSONRPCNotification,
|
||||
) -> Result<(), ExecServerError> {
|
||||
match notification.method.as_str() {
|
||||
EXEC_OUTPUT_DELTA_METHOD => {
|
||||
let params: ExecOutputDeltaNotification =
|
||||
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
|
||||
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
|
||||
}
|
||||
EXEC_EXITED_METHOD => {
|
||||
let params: ExecExitedNotification =
|
||||
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
|
||||
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
|
||||
}
|
||||
other => {
|
||||
debug!("ignoring unknown exec-server notification: {other}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_transport_shutdown(inner: &Arc<Inner>) {
|
||||
let pending = {
|
||||
let mut pending = inner.pending.lock().await;
|
||||
pending
|
||||
.drain()
|
||||
.map(|(_, pending)| pending)
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
for pending in pending {
|
||||
pending.resolve_error(JSONRPCErrorError {
|
||||
code: -32000,
|
||||
data: None,
|
||||
message: "exec-server transport closed".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
37
codex-rs/exec-server/src/server.rs
Normal file
37
codex-rs/exec-server/src/server.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
mod filesystem;
|
||||
mod handler;
|
||||
mod jsonrpc;
|
||||
mod processor;
|
||||
mod transport;
|
||||
|
||||
pub(crate) use handler::ExecServerHandler;
|
||||
pub(crate) use handler::ExecServerServerNotification;
|
||||
pub(crate) use jsonrpc::internal_error;
|
||||
pub(crate) use jsonrpc::invalid_params;
|
||||
pub(crate) use jsonrpc::invalid_request;
|
||||
pub(crate) use jsonrpc::unauthorized;
|
||||
pub use transport::ExecServerTransport;
|
||||
pub use transport::ExecServerTransportParseError;
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq)]
|
||||
pub struct ExecServerConfig {
|
||||
pub auth_token: Option<String>,
|
||||
}
|
||||
|
||||
pub async fn run_main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
run_main_with_transport_and_config(ExecServerTransport::default(), ExecServerConfig::default())
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn run_main_with_transport(
|
||||
transport: ExecServerTransport,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
run_main_with_transport_and_config(transport, ExecServerConfig::default()).await
|
||||
}
|
||||
|
||||
pub async fn run_main_with_transport_and_config(
|
||||
transport: ExecServerTransport,
|
||||
config: ExecServerConfig,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
transport::run_transport(transport, config).await
|
||||
}
|
||||
170
codex-rs/exec-server/src/server/filesystem.rs
Normal file
170
codex-rs/exec-server/src/server/filesystem.rs
Normal file
@@ -0,0 +1,170 @@
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
|
||||
use base64::Engine as _;
|
||||
use base64::engine::general_purpose::STANDARD;
|
||||
use codex_app_server_protocol::FsCopyParams;
|
||||
use codex_app_server_protocol::FsCopyResponse;
|
||||
use codex_app_server_protocol::FsCreateDirectoryParams;
|
||||
use codex_app_server_protocol::FsCreateDirectoryResponse;
|
||||
use codex_app_server_protocol::FsGetMetadataParams;
|
||||
use codex_app_server_protocol::FsGetMetadataResponse;
|
||||
use codex_app_server_protocol::FsReadDirectoryEntry;
|
||||
use codex_app_server_protocol::FsReadDirectoryParams;
|
||||
use codex_app_server_protocol::FsReadDirectoryResponse;
|
||||
use codex_app_server_protocol::FsReadFileParams;
|
||||
use codex_app_server_protocol::FsReadFileResponse;
|
||||
use codex_app_server_protocol::FsRemoveParams;
|
||||
use codex_app_server_protocol::FsRemoveResponse;
|
||||
use codex_app_server_protocol::FsWriteFileParams;
|
||||
use codex_app_server_protocol::FsWriteFileResponse;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_environment::CopyOptions;
|
||||
use codex_environment::CreateDirectoryOptions;
|
||||
use codex_environment::Environment;
|
||||
use codex_environment::ExecutorFileSystem;
|
||||
use codex_environment::RemoveOptions;
|
||||
|
||||
use crate::server::internal_error;
|
||||
use crate::server::invalid_request;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ExecServerFileSystem {
|
||||
file_system: Arc<dyn ExecutorFileSystem>,
|
||||
}
|
||||
|
||||
impl Default for ExecServerFileSystem {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
file_system: Environment::default().get_filesystem(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecServerFileSystem {
|
||||
pub(crate) async fn read_file(
|
||||
&self,
|
||||
params: FsReadFileParams,
|
||||
) -> Result<FsReadFileResponse, JSONRPCErrorError> {
|
||||
let bytes = self
|
||||
.file_system
|
||||
.read_file(¶ms.path)
|
||||
.await
|
||||
.map_err(map_fs_error)?;
|
||||
Ok(FsReadFileResponse {
|
||||
data_base64: STANDARD.encode(bytes),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn write_file(
|
||||
&self,
|
||||
params: FsWriteFileParams,
|
||||
) -> Result<FsWriteFileResponse, JSONRPCErrorError> {
|
||||
let bytes = STANDARD.decode(params.data_base64).map_err(|err| {
|
||||
invalid_request(format!(
|
||||
"fs/writeFile requires valid base64 dataBase64: {err}"
|
||||
))
|
||||
})?;
|
||||
self.file_system
|
||||
.write_file(¶ms.path, bytes)
|
||||
.await
|
||||
.map_err(map_fs_error)?;
|
||||
Ok(FsWriteFileResponse {})
|
||||
}
|
||||
|
||||
pub(crate) async fn create_directory(
|
||||
&self,
|
||||
params: FsCreateDirectoryParams,
|
||||
) -> Result<FsCreateDirectoryResponse, JSONRPCErrorError> {
|
||||
self.file_system
|
||||
.create_directory(
|
||||
¶ms.path,
|
||||
CreateDirectoryOptions {
|
||||
recursive: params.recursive.unwrap_or(true),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(map_fs_error)?;
|
||||
Ok(FsCreateDirectoryResponse {})
|
||||
}
|
||||
|
||||
pub(crate) async fn get_metadata(
|
||||
&self,
|
||||
params: FsGetMetadataParams,
|
||||
) -> Result<FsGetMetadataResponse, JSONRPCErrorError> {
|
||||
let metadata = self
|
||||
.file_system
|
||||
.get_metadata(¶ms.path)
|
||||
.await
|
||||
.map_err(map_fs_error)?;
|
||||
Ok(FsGetMetadataResponse {
|
||||
is_directory: metadata.is_directory,
|
||||
is_file: metadata.is_file,
|
||||
created_at_ms: metadata.created_at_ms,
|
||||
modified_at_ms: metadata.modified_at_ms,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn read_directory(
|
||||
&self,
|
||||
params: FsReadDirectoryParams,
|
||||
) -> Result<FsReadDirectoryResponse, JSONRPCErrorError> {
|
||||
let entries = self
|
||||
.file_system
|
||||
.read_directory(¶ms.path)
|
||||
.await
|
||||
.map_err(map_fs_error)?;
|
||||
Ok(FsReadDirectoryResponse {
|
||||
entries: entries
|
||||
.into_iter()
|
||||
.map(|entry| FsReadDirectoryEntry {
|
||||
file_name: entry.file_name,
|
||||
is_directory: entry.is_directory,
|
||||
is_file: entry.is_file,
|
||||
})
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn remove(
|
||||
&self,
|
||||
params: FsRemoveParams,
|
||||
) -> Result<FsRemoveResponse, JSONRPCErrorError> {
|
||||
self.file_system
|
||||
.remove(
|
||||
¶ms.path,
|
||||
RemoveOptions {
|
||||
recursive: params.recursive.unwrap_or(true),
|
||||
force: params.force.unwrap_or(true),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(map_fs_error)?;
|
||||
Ok(FsRemoveResponse {})
|
||||
}
|
||||
|
||||
pub(crate) async fn copy(
|
||||
&self,
|
||||
params: FsCopyParams,
|
||||
) -> Result<FsCopyResponse, JSONRPCErrorError> {
|
||||
self.file_system
|
||||
.copy(
|
||||
¶ms.source_path,
|
||||
¶ms.destination_path,
|
||||
CopyOptions {
|
||||
recursive: params.recursive,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(map_fs_error)?;
|
||||
Ok(FsCopyResponse {})
|
||||
}
|
||||
}
|
||||
|
||||
fn map_fs_error(err: io::Error) -> JSONRPCErrorError {
|
||||
if err.kind() == io::ErrorKind::InvalidInput {
|
||||
invalid_request(err.to_string())
|
||||
} else {
|
||||
internal_error(err.to_string())
|
||||
}
|
||||
}
|
||||
496
codex-rs/exec-server/src/server/handler.rs
Normal file
496
codex-rs/exec-server/src/server/handler.rs
Normal file
@@ -0,0 +1,496 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_app_server_protocol::FsCopyParams;
|
||||
use codex_app_server_protocol::FsCopyResponse;
|
||||
use codex_app_server_protocol::FsCreateDirectoryParams;
|
||||
use codex_app_server_protocol::FsCreateDirectoryResponse;
|
||||
use codex_app_server_protocol::FsGetMetadataParams;
|
||||
use codex_app_server_protocol::FsGetMetadataResponse;
|
||||
use codex_app_server_protocol::FsReadDirectoryParams;
|
||||
use codex_app_server_protocol::FsReadDirectoryResponse;
|
||||
use codex_app_server_protocol::FsReadFileParams;
|
||||
use codex_app_server_protocol::FsReadFileResponse;
|
||||
use codex_app_server_protocol::FsRemoveParams;
|
||||
use codex_app_server_protocol::FsRemoveResponse;
|
||||
use codex_app_server_protocol::FsWriteFileParams;
|
||||
use codex_app_server_protocol::FsWriteFileResponse;
|
||||
use codex_utils_pty::ExecCommandSession;
|
||||
use codex_utils_pty::TerminalSize;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::protocol::ExecExitedNotification;
|
||||
use crate::protocol::ExecOutputDeltaNotification;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::protocol::ExecResponse;
|
||||
use crate::protocol::ExecSandboxMode;
|
||||
use crate::protocol::InitializeResponse;
|
||||
use crate::protocol::PROTOCOL_VERSION;
|
||||
use crate::protocol::ProcessOutputChunk;
|
||||
use crate::protocol::ReadResponse;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteResponse;
|
||||
use crate::server::filesystem::ExecServerFileSystem;
|
||||
use crate::server::internal_error;
|
||||
use crate::server::invalid_params;
|
||||
use crate::server::invalid_request;
|
||||
use crate::server::unauthorized;
|
||||
|
||||
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum ExecServerServerNotification {
|
||||
OutputDelta(ExecOutputDeltaNotification),
|
||||
Exited(ExecExitedNotification),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RetainedOutputChunk {
|
||||
seq: u64,
|
||||
stream: ExecOutputStream,
|
||||
chunk: Vec<u8>,
|
||||
}
|
||||
|
||||
struct RunningProcess {
|
||||
session: ExecCommandSession,
|
||||
tty: bool,
|
||||
output: VecDeque<RetainedOutputChunk>,
|
||||
retained_bytes: usize,
|
||||
next_seq: u64,
|
||||
exit_code: Option<i32>,
|
||||
output_notify: Arc<Notify>,
|
||||
}
|
||||
|
||||
pub(crate) struct ExecServerHandler {
|
||||
notification_tx: mpsc::Sender<ExecServerServerNotification>,
|
||||
file_system: ExecServerFileSystem,
|
||||
required_auth_token: Option<String>,
|
||||
// Keyed by client-chosen logical `processId` scoped to this connection.
|
||||
// This is a protocol handle, not an OS pid.
|
||||
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
|
||||
initialize_requested: bool,
|
||||
initialized: bool,
|
||||
}
|
||||
|
||||
impl ExecServerHandler {
|
||||
pub(crate) fn new(
|
||||
notification_tx: mpsc::Sender<ExecServerServerNotification>,
|
||||
required_auth_token: Option<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
notification_tx,
|
||||
file_system: ExecServerFileSystem::default(),
|
||||
required_auth_token,
|
||||
processes: Arc::new(Mutex::new(HashMap::new())),
|
||||
initialize_requested: false,
|
||||
initialized: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
let remaining = {
|
||||
let mut processes = self.processes.lock().await;
|
||||
processes
|
||||
.drain()
|
||||
.map(|(_, process)| process)
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
for process in remaining {
|
||||
process.session.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn initialized(&mut self) -> Result<(), String> {
|
||||
if !self.initialize_requested {
|
||||
return Err("received `initialized` notification before `initialize`".into());
|
||||
}
|
||||
self.initialized = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn initialize(
|
||||
&mut self,
|
||||
params: crate::protocol::InitializeParams,
|
||||
) -> Result<InitializeResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
if self.initialize_requested {
|
||||
return Err(invalid_request(
|
||||
"initialize may only be sent once per connection".to_string(),
|
||||
));
|
||||
}
|
||||
if let Some(required_auth_token) = &self.required_auth_token
|
||||
&& params.auth_token.as_deref() != Some(required_auth_token.as_str())
|
||||
{
|
||||
return Err(unauthorized("invalid exec-server auth token".to_string()));
|
||||
}
|
||||
self.initialize_requested = true;
|
||||
Ok(InitializeResponse {
|
||||
protocol_version: PROTOCOL_VERSION.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
fn require_initialized(&self) -> Result<(), codex_app_server_protocol::JSONRPCErrorError> {
|
||||
if !self.initialize_requested {
|
||||
return Err(invalid_request(
|
||||
"client must call initialize before using exec methods".to_string(),
|
||||
));
|
||||
}
|
||||
if !self.initialized {
|
||||
return Err(invalid_request(
|
||||
"client must send initialized before using exec methods".to_string(),
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn exec(
|
||||
&self,
|
||||
params: crate::protocol::ExecParams,
|
||||
) -> Result<ExecResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
let process_id = params.process_id.clone();
|
||||
// Same-connection requests are serialized by the RPC processor, and the
|
||||
// in-process client holds the handler mutex across this full call. That
|
||||
// makes this pre-spawn duplicate check safe for the current entrypoints.
|
||||
{
|
||||
let process_map = self.processes.lock().await;
|
||||
if process_map.contains_key(&process_id) {
|
||||
return Err(invalid_request(format!(
|
||||
"process {process_id} already exists"
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
if matches!(
|
||||
params.sandbox.as_ref().map(|sandbox| sandbox.mode),
|
||||
Some(ExecSandboxMode::HostDefault)
|
||||
) {
|
||||
return Err(invalid_request(
|
||||
"sandbox mode `hostDefault` is not supported by exec-server yet".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let (program, args) = params
|
||||
.argv
|
||||
.split_first()
|
||||
.ok_or_else(|| invalid_params("argv must not be empty".to_string()))?;
|
||||
|
||||
let spawned = if params.tty {
|
||||
codex_utils_pty::spawn_pty_process(
|
||||
program,
|
||||
args,
|
||||
params.cwd.as_path(),
|
||||
¶ms.env,
|
||||
¶ms.arg0,
|
||||
TerminalSize::default(),
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
codex_utils_pty::spawn_pipe_process_no_stdin(
|
||||
program,
|
||||
args,
|
||||
params.cwd.as_path(),
|
||||
¶ms.env,
|
||||
¶ms.arg0,
|
||||
)
|
||||
.await
|
||||
}
|
||||
.map_err(|err| internal_error(err.to_string()))?;
|
||||
|
||||
let output_notify = Arc::new(Notify::new());
|
||||
{
|
||||
let mut process_map = self.processes.lock().await;
|
||||
process_map.insert(
|
||||
process_id.clone(),
|
||||
RunningProcess {
|
||||
session: spawned.session,
|
||||
tty: params.tty,
|
||||
output: std::collections::VecDeque::new(),
|
||||
retained_bytes: 0,
|
||||
next_seq: 1,
|
||||
exit_code: None,
|
||||
output_notify: Arc::clone(&output_notify),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
tokio::spawn(stream_output(
|
||||
process_id.clone(),
|
||||
if params.tty {
|
||||
ExecOutputStream::Pty
|
||||
} else {
|
||||
ExecOutputStream::Stdout
|
||||
},
|
||||
spawned.stdout_rx,
|
||||
self.notification_tx.clone(),
|
||||
Arc::clone(&self.processes),
|
||||
Arc::clone(&output_notify),
|
||||
));
|
||||
tokio::spawn(stream_output(
|
||||
process_id.clone(),
|
||||
if params.tty {
|
||||
ExecOutputStream::Pty
|
||||
} else {
|
||||
ExecOutputStream::Stderr
|
||||
},
|
||||
spawned.stderr_rx,
|
||||
self.notification_tx.clone(),
|
||||
Arc::clone(&self.processes),
|
||||
Arc::clone(&output_notify),
|
||||
));
|
||||
tokio::spawn(watch_exit(
|
||||
process_id.clone(),
|
||||
spawned.exit_rx,
|
||||
self.notification_tx.clone(),
|
||||
Arc::clone(&self.processes),
|
||||
output_notify,
|
||||
));
|
||||
|
||||
Ok(ExecResponse { process_id })
|
||||
}
|
||||
|
||||
pub(crate) async fn fs_read_file(
|
||||
&self,
|
||||
params: FsReadFileParams,
|
||||
) -> Result<FsReadFileResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
self.file_system.read_file(params).await
|
||||
}
|
||||
|
||||
pub(crate) async fn fs_write_file(
|
||||
&self,
|
||||
params: FsWriteFileParams,
|
||||
) -> Result<FsWriteFileResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
self.file_system.write_file(params).await
|
||||
}
|
||||
|
||||
pub(crate) async fn fs_create_directory(
|
||||
&self,
|
||||
params: FsCreateDirectoryParams,
|
||||
) -> Result<FsCreateDirectoryResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
self.file_system.create_directory(params).await
|
||||
}
|
||||
|
||||
pub(crate) async fn fs_get_metadata(
|
||||
&self,
|
||||
params: FsGetMetadataParams,
|
||||
) -> Result<FsGetMetadataResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
self.file_system.get_metadata(params).await
|
||||
}
|
||||
|
||||
pub(crate) async fn fs_read_directory(
|
||||
&self,
|
||||
params: FsReadDirectoryParams,
|
||||
) -> Result<FsReadDirectoryResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
self.file_system.read_directory(params).await
|
||||
}
|
||||
|
||||
pub(crate) async fn fs_remove(
|
||||
&self,
|
||||
params: FsRemoveParams,
|
||||
) -> Result<FsRemoveResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
self.file_system.remove(params).await
|
||||
}
|
||||
|
||||
pub(crate) async fn fs_copy(
|
||||
&self,
|
||||
params: FsCopyParams,
|
||||
) -> Result<FsCopyResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
self.file_system.copy(params).await
|
||||
}
|
||||
|
||||
pub(crate) async fn read(
|
||||
&self,
|
||||
params: crate::protocol::ReadParams,
|
||||
) -> Result<ReadResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
let after_seq = params.after_seq.unwrap_or(0);
|
||||
let max_bytes = params.max_bytes.unwrap_or(usize::MAX);
|
||||
let wait = Duration::from_millis(params.wait_ms.unwrap_or(0));
|
||||
let deadline = tokio::time::Instant::now() + wait;
|
||||
|
||||
loop {
|
||||
let (response, output_notify) = {
|
||||
let process_map = self.processes.lock().await;
|
||||
let process = process_map.get(¶ms.process_id).ok_or_else(|| {
|
||||
invalid_request(format!("unknown process id {}", params.process_id))
|
||||
})?;
|
||||
|
||||
let mut chunks = Vec::new();
|
||||
let mut total_bytes = 0;
|
||||
let mut next_seq = process.next_seq;
|
||||
for retained in process.output.iter().filter(|chunk| chunk.seq > after_seq) {
|
||||
let chunk_len = retained.chunk.len();
|
||||
if !chunks.is_empty() && total_bytes + chunk_len > max_bytes {
|
||||
break;
|
||||
}
|
||||
total_bytes += chunk_len;
|
||||
chunks.push(ProcessOutputChunk {
|
||||
seq: retained.seq,
|
||||
stream: retained.stream,
|
||||
chunk: retained.chunk.clone().into(),
|
||||
});
|
||||
next_seq = retained.seq + 1;
|
||||
if total_bytes >= max_bytes {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
(
|
||||
ReadResponse {
|
||||
chunks,
|
||||
next_seq,
|
||||
exited: process.exit_code.is_some(),
|
||||
exit_code: process.exit_code,
|
||||
},
|
||||
Arc::clone(&process.output_notify),
|
||||
)
|
||||
};
|
||||
|
||||
if !response.chunks.is_empty()
|
||||
|| response.exited
|
||||
|| tokio::time::Instant::now() >= deadline
|
||||
{
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
||||
if remaining.is_zero() {
|
||||
return Ok(response);
|
||||
}
|
||||
let _ = tokio::time::timeout(remaining, output_notify.notified()).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn write(
|
||||
&self,
|
||||
params: crate::protocol::WriteParams,
|
||||
) -> Result<WriteResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
let writer_tx = {
|
||||
let process_map = self.processes.lock().await;
|
||||
let process = process_map.get(¶ms.process_id).ok_or_else(|| {
|
||||
invalid_request(format!("unknown process id {}", params.process_id))
|
||||
})?;
|
||||
if !process.tty {
|
||||
return Err(invalid_request(format!(
|
||||
"stdin is closed for process {}",
|
||||
params.process_id
|
||||
)));
|
||||
}
|
||||
process.session.writer_sender()
|
||||
};
|
||||
|
||||
writer_tx
|
||||
.send(params.chunk.into_inner())
|
||||
.await
|
||||
.map_err(|_| internal_error("failed to write to process stdin".to_string()))?;
|
||||
|
||||
Ok(WriteResponse { accepted: true })
|
||||
}
|
||||
|
||||
pub(crate) async fn terminate(
|
||||
&self,
|
||||
params: crate::protocol::TerminateParams,
|
||||
) -> Result<TerminateResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
self.require_initialized()?;
|
||||
let running = {
|
||||
let process_map = self.processes.lock().await;
|
||||
if let Some(process) = process_map.get(¶ms.process_id) {
|
||||
process.session.terminate();
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
Ok(TerminateResponse { running })
|
||||
}
|
||||
}
|
||||
|
||||
async fn stream_output(
|
||||
process_id: String,
|
||||
stream: ExecOutputStream,
|
||||
mut receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
|
||||
notification_tx: mpsc::Sender<ExecServerServerNotification>,
|
||||
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
|
||||
output_notify: Arc<Notify>,
|
||||
) {
|
||||
while let Some(chunk) = receiver.recv().await {
|
||||
let notification = {
|
||||
let mut processes = processes.lock().await;
|
||||
let Some(process) = processes.get_mut(&process_id) else {
|
||||
break;
|
||||
};
|
||||
let seq = process.next_seq;
|
||||
process.next_seq += 1;
|
||||
process.retained_bytes += chunk.len();
|
||||
process.output.push_back(RetainedOutputChunk {
|
||||
seq,
|
||||
stream,
|
||||
chunk: chunk.clone(),
|
||||
});
|
||||
while process.retained_bytes > RETAINED_OUTPUT_BYTES_PER_PROCESS {
|
||||
let Some(evicted) = process.output.pop_front() else {
|
||||
break;
|
||||
};
|
||||
process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len());
|
||||
warn!(
|
||||
"retained output cap exceeded for process {process_id}; dropping oldest output"
|
||||
);
|
||||
}
|
||||
ExecOutputDeltaNotification {
|
||||
process_id: process_id.clone(),
|
||||
stream,
|
||||
chunk: chunk.into(),
|
||||
}
|
||||
};
|
||||
output_notify.notify_waiters();
|
||||
|
||||
if notification_tx
|
||||
.send(ExecServerServerNotification::OutputDelta(notification))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn watch_exit(
|
||||
process_id: String,
|
||||
exit_rx: tokio::sync::oneshot::Receiver<i32>,
|
||||
notification_tx: mpsc::Sender<ExecServerServerNotification>,
|
||||
processes: Arc<Mutex<HashMap<String, RunningProcess>>>,
|
||||
output_notify: Arc<Notify>,
|
||||
) {
|
||||
let exit_code = exit_rx.await.unwrap_or(-1);
|
||||
{
|
||||
let mut processes = processes.lock().await;
|
||||
if let Some(process) = processes.get_mut(&process_id) {
|
||||
process.exit_code = Some(exit_code);
|
||||
}
|
||||
}
|
||||
output_notify.notify_waiters();
|
||||
let _ = notification_tx
|
||||
.send(ExecServerServerNotification::Exited(
|
||||
ExecExitedNotification {
|
||||
process_id,
|
||||
exit_code,
|
||||
},
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
226
codex-rs/exec-server/src/server/handler/tests.rs
Normal file
226
codex-rs/exec-server/src/server/handler/tests.rs
Normal file
@@ -0,0 +1,226 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::ExecServerHandler;
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::ExecSandboxConfig;
|
||||
use crate::protocol::ExecSandboxMode;
|
||||
use crate::protocol::InitializeParams;
|
||||
use crate::protocol::PROTOCOL_VERSION;
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::WriteParams;
|
||||
|
||||
fn exec_params(process_id: &str) -> ExecParams {
|
||||
ExecParams {
|
||||
process_id: process_id.to_string(),
|
||||
argv: vec![
|
||||
"bash".to_string(),
|
||||
"-lc".to_string(),
|
||||
"sleep 30".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir().expect("cwd"),
|
||||
env: HashMap::new(),
|
||||
tty: false,
|
||||
arg0: None,
|
||||
sandbox: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn initialized_handler() -> ExecServerHandler {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(8);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, None);
|
||||
let response = handler
|
||||
.initialize(InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
auth_token: None,
|
||||
})
|
||||
.expect("initialize should succeed");
|
||||
assert_eq!(response.protocol_version, PROTOCOL_VERSION);
|
||||
handler
|
||||
.initialized()
|
||||
.expect("initialized notification should succeed");
|
||||
handler
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_reports_protocol_version() {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, None);
|
||||
|
||||
let response = handler
|
||||
.initialize(InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
auth_token: None,
|
||||
})
|
||||
.expect("initialize should succeed");
|
||||
|
||||
assert_eq!(response.protocol_version, PROTOCOL_VERSION);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn exec_methods_require_initialize() {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
|
||||
let handler = ExecServerHandler::new(notification_tx, None);
|
||||
|
||||
let error = handler
|
||||
.exec(exec_params("proc-1"))
|
||||
.await
|
||||
.expect_err("exec should fail before initialize");
|
||||
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(
|
||||
error.message,
|
||||
"client must call initialize before using exec methods"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn exec_methods_require_initialized_notification_after_initialize() {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, None);
|
||||
let _ = handler
|
||||
.initialize(InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
auth_token: None,
|
||||
})
|
||||
.expect("initialize should succeed");
|
||||
|
||||
let error = handler
|
||||
.exec(exec_params("proc-1"))
|
||||
.await
|
||||
.expect_err("exec should fail before initialized notification");
|
||||
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(
|
||||
error.message,
|
||||
"client must send initialized before using exec methods"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialized_before_initialize_is_a_protocol_error() {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, None);
|
||||
|
||||
let error = handler
|
||||
.initialized()
|
||||
.expect_err("expected protocol error for early initialized notification");
|
||||
|
||||
assert_eq!(
|
||||
error,
|
||||
"received `initialized` notification before `initialize`"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_may_only_be_sent_once_per_connection() {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, None);
|
||||
let _ = handler
|
||||
.initialize(InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
auth_token: None,
|
||||
})
|
||||
.expect("first initialize should succeed");
|
||||
|
||||
let error = handler
|
||||
.initialize(InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
auth_token: None,
|
||||
})
|
||||
.expect_err("duplicate initialize should fail");
|
||||
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(
|
||||
error.message,
|
||||
"initialize may only be sent once per connection"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_rejects_invalid_auth_token() {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, Some("secret-token".to_string()));
|
||||
|
||||
let error = handler
|
||||
.initialize(InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
auth_token: Some("wrong-token".to_string()),
|
||||
})
|
||||
.expect_err("invalid auth token should fail");
|
||||
|
||||
assert_eq!(error.code, -32001);
|
||||
assert_eq!(error.message, "invalid exec-server auth token");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn exec_rejects_host_default_sandbox_mode() {
|
||||
let handler = initialized_handler().await;
|
||||
|
||||
let error = handler
|
||||
.exec(ExecParams {
|
||||
sandbox: Some(ExecSandboxConfig {
|
||||
mode: ExecSandboxMode::HostDefault,
|
||||
}),
|
||||
..exec_params("proc-1")
|
||||
})
|
||||
.await
|
||||
.expect_err("hostDefault sandbox should be rejected");
|
||||
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(
|
||||
error.message,
|
||||
"sandbox mode `hostDefault` is not supported by exec-server yet"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn exec_rejects_duplicate_process_ids() {
|
||||
let handler = initialized_handler().await;
|
||||
let first = handler
|
||||
.exec(exec_params("proc-1"))
|
||||
.await
|
||||
.expect("first exec should succeed");
|
||||
assert_eq!(first.process_id, "proc-1");
|
||||
|
||||
let error = handler
|
||||
.exec(exec_params("proc-1"))
|
||||
.await
|
||||
.expect_err("duplicate process id should fail");
|
||||
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(error.message, "process proc-1 already exists");
|
||||
|
||||
handler.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_rejects_unknown_process_ids() {
|
||||
let handler = initialized_handler().await;
|
||||
|
||||
let error = handler
|
||||
.write(WriteParams {
|
||||
process_id: "missing".to_string(),
|
||||
chunk: b"input".to_vec().into(),
|
||||
})
|
||||
.await
|
||||
.expect_err("writing to an unknown process should fail");
|
||||
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(error.message, "unknown process id missing");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminate_reports_missing_processes_as_not_running() {
|
||||
let handler = initialized_handler().await;
|
||||
|
||||
let response = handler
|
||||
.terminate(TerminateParams {
|
||||
process_id: "missing".to_string(),
|
||||
})
|
||||
.await
|
||||
.expect("terminate should succeed");
|
||||
|
||||
assert_eq!(response.running, false);
|
||||
}
|
||||
33
codex-rs/exec-server/src/server/jsonrpc.rs
Normal file
33
codex-rs/exec-server/src/server/jsonrpc.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
|
||||
pub(crate) fn invalid_request(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32600,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn invalid_params(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32602,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn internal_error(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32603,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn unauthorized(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32001,
|
||||
data: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
407
codex-rs/exec-server/src/server/processor.rs
Normal file
407
codex-rs/exec-server/src/server/processor.rs
Normal file
@@ -0,0 +1,407 @@
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::debug;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::connection::CHANNEL_CAPACITY;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::connection::JsonRpcConnectionEvent;
|
||||
use crate::protocol::EXEC_EXITED_METHOD;
|
||||
use crate::protocol::EXEC_METHOD;
|
||||
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
|
||||
use crate::protocol::EXEC_READ_METHOD;
|
||||
use crate::protocol::EXEC_TERMINATE_METHOD;
|
||||
use crate::protocol::EXEC_WRITE_METHOD;
|
||||
use crate::protocol::FS_COPY_METHOD;
|
||||
use crate::protocol::FS_CREATE_DIRECTORY_METHOD;
|
||||
use crate::protocol::FS_GET_METADATA_METHOD;
|
||||
use crate::protocol::FS_READ_DIRECTORY_METHOD;
|
||||
use crate::protocol::FS_READ_FILE_METHOD;
|
||||
use crate::protocol::FS_REMOVE_METHOD;
|
||||
use crate::protocol::FS_WRITE_FILE_METHOD;
|
||||
use crate::protocol::INITIALIZE_METHOD;
|
||||
use crate::protocol::INITIALIZED_METHOD;
|
||||
use crate::server::ExecServerConfig;
|
||||
use crate::server::ExecServerHandler;
|
||||
use crate::server::ExecServerServerNotification;
|
||||
use crate::server::internal_error;
|
||||
use crate::server::invalid_params;
|
||||
use crate::server::invalid_request;
|
||||
|
||||
pub(crate) async fn run_connection(connection: JsonRpcConnection, config: ExecServerConfig) {
|
||||
let (json_outgoing_tx, mut incoming_rx, _connection_tasks) = connection.into_parts();
|
||||
let json_outgoing_tx_for_notifications = json_outgoing_tx.clone();
|
||||
let (notification_tx, mut notification_rx) =
|
||||
mpsc::channel::<ExecServerServerNotification>(CHANNEL_CAPACITY);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, config.auth_token);
|
||||
|
||||
let outbound_task = tokio::spawn(async move {
|
||||
while let Some(notification) = notification_rx.recv().await {
|
||||
let json_message = match notification_message(notification) {
|
||||
Ok(json_message) => json_message,
|
||||
Err(err) => {
|
||||
warn!("failed to serialize exec-server notification: {err}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
if json_outgoing_tx_for_notifications
|
||||
.send(json_message)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
while let Some(event) = incoming_rx.recv().await {
|
||||
match event {
|
||||
JsonRpcConnectionEvent::Message(message) => {
|
||||
let maybe_response = match handle_connection_message(&mut handler, message).await {
|
||||
Ok(maybe_response) => maybe_response,
|
||||
Err(err) => {
|
||||
warn!("closing exec-server connection after protocol error: {err}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
if let Some(response) = maybe_response
|
||||
&& json_outgoing_tx.send(response).await.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
JsonRpcConnectionEvent::Disconnected { reason } => {
|
||||
if let Some(reason) = reason {
|
||||
debug!("exec-server connection disconnected: {reason}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
handler.shutdown().await;
|
||||
drop(handler);
|
||||
let _ = outbound_task.await;
|
||||
}
|
||||
|
||||
async fn handle_connection_message(
|
||||
handler: &mut ExecServerHandler,
|
||||
message: JSONRPCMessage,
|
||||
) -> Result<Option<JSONRPCMessage>, String> {
|
||||
match message {
|
||||
JSONRPCMessage::Request(request) => Ok(Some(dispatch_request(handler, request).await)),
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
handle_notification(handler, notification)?;
|
||||
Ok(None)
|
||||
}
|
||||
JSONRPCMessage::Response(response) => Err(format!(
|
||||
"unexpected client response for request id {:?}",
|
||||
response.id
|
||||
)),
|
||||
JSONRPCMessage::Error(error) => Err(format!(
|
||||
"unexpected client error for request id {:?}",
|
||||
error.id
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn dispatch_request(
|
||||
handler: &mut ExecServerHandler,
|
||||
request: JSONRPCRequest,
|
||||
) -> JSONRPCMessage {
|
||||
let JSONRPCRequest {
|
||||
id,
|
||||
method,
|
||||
params,
|
||||
trace: _,
|
||||
} = request;
|
||||
let params = params.unwrap_or(serde_json::Value::Null);
|
||||
|
||||
match method.as_str() {
|
||||
INITIALIZE_METHOD => request_response(
|
||||
id,
|
||||
parse_params(params).and_then(|params| handler.initialize(params)),
|
||||
),
|
||||
EXEC_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.exec(params)).await,
|
||||
),
|
||||
EXEC_READ_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.read(params)).await,
|
||||
),
|
||||
EXEC_WRITE_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.write(params)).await,
|
||||
),
|
||||
EXEC_TERMINATE_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.terminate(params)).await,
|
||||
),
|
||||
FS_READ_FILE_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.fs_read_file(params)).await,
|
||||
),
|
||||
FS_WRITE_FILE_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.fs_write_file(params)).await,
|
||||
),
|
||||
FS_CREATE_DIRECTORY_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.fs_create_directory(params)).await,
|
||||
),
|
||||
FS_GET_METADATA_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.fs_get_metadata(params)).await,
|
||||
),
|
||||
FS_READ_DIRECTORY_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.fs_read_directory(params)).await,
|
||||
),
|
||||
FS_REMOVE_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.fs_remove(params)).await,
|
||||
),
|
||||
FS_COPY_METHOD => request_response(
|
||||
id,
|
||||
dispatch_async_request(params, |params| handler.fs_copy(params)).await,
|
||||
),
|
||||
other => jsonrpc_error_response(id, invalid_request(format!("unknown method: {other}"))),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_notification(
|
||||
handler: &mut ExecServerHandler,
|
||||
notification: JSONRPCNotification,
|
||||
) -> Result<(), String> {
|
||||
match notification.method.as_str() {
|
||||
INITIALIZED_METHOD => handler.initialized(),
|
||||
other => Err(format!("unexpected notification method: {other}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_params<P>(params: serde_json::Value) -> Result<P, JSONRPCErrorError>
|
||||
where
|
||||
P: DeserializeOwned,
|
||||
{
|
||||
serde_json::from_value(params).map_err(|err| invalid_params(err.to_string()))
|
||||
}
|
||||
|
||||
async fn dispatch_async_request<P, T, F, Fut>(
|
||||
params: serde_json::Value,
|
||||
f: F,
|
||||
) -> Result<T, JSONRPCErrorError>
|
||||
where
|
||||
P: DeserializeOwned,
|
||||
F: FnOnce(P) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<T, JSONRPCErrorError>>,
|
||||
{
|
||||
match parse_params(params) {
|
||||
Ok(params) => f(params).await,
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
fn request_response<T>(
|
||||
request_id: RequestId,
|
||||
result: Result<T, JSONRPCErrorError>,
|
||||
) -> JSONRPCMessage
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
match result.and_then(serialize_response) {
|
||||
Ok(result) => JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: request_id,
|
||||
result,
|
||||
}),
|
||||
Err(error) => JSONRPCMessage::Error(JSONRPCError {
|
||||
id: request_id,
|
||||
error,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_response<T>(response: T) -> Result<serde_json::Value, JSONRPCErrorError>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
serde_json::to_value(response).map_err(|err| internal_error(err.to_string()))
|
||||
}
|
||||
|
||||
fn jsonrpc_error_response(request_id: RequestId, error: JSONRPCErrorError) -> JSONRPCMessage {
|
||||
JSONRPCMessage::Error(JSONRPCError {
|
||||
id: request_id,
|
||||
error,
|
||||
})
|
||||
}
|
||||
|
||||
fn notification_message(
|
||||
notification: ExecServerServerNotification,
|
||||
) -> Result<JSONRPCMessage, serde_json::Error> {
|
||||
match notification {
|
||||
ExecServerServerNotification::OutputDelta(params) => {
|
||||
typed_notification(EXEC_OUTPUT_DELTA_METHOD, params)
|
||||
}
|
||||
ExecServerServerNotification::Exited(params) => {
|
||||
typed_notification(EXEC_EXITED_METHOD, params)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn typed_notification<T>(method: &str, params: T) -> Result<JSONRPCMessage, serde_json::Error>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
Ok(JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: method.to_string(),
|
||||
params: Some(serde_json::to_value(params)?),
|
||||
}))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::dispatch_request;
|
||||
use super::handle_connection_message;
|
||||
use super::notification_message;
|
||||
use crate::protocol::EXEC_METHOD;
|
||||
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
|
||||
use crate::protocol::ExecOutputDeltaNotification;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::protocol::INITIALIZE_METHOD;
|
||||
use crate::protocol::INITIALIZED_METHOD;
|
||||
use crate::protocol::InitializeParams;
|
||||
use crate::protocol::PROTOCOL_VERSION;
|
||||
use crate::server::ExecServerHandler;
|
||||
use crate::server::ExecServerServerNotification;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
|
||||
#[tokio::test]
|
||||
async fn dispatch_initialize_returns_initialize_response() {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, None);
|
||||
|
||||
let message = dispatch_request(
|
||||
&mut handler,
|
||||
JSONRPCRequest {
|
||||
id: RequestId::Integer(1),
|
||||
method: INITIALIZE_METHOD.to_string(),
|
||||
params: Some(
|
||||
serde_json::to_value(InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
auth_token: None,
|
||||
})
|
||||
.expect("serialize initialize params"),
|
||||
),
|
||||
trace: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
message,
|
||||
JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: RequestId::Integer(1),
|
||||
result: serde_json::json!({
|
||||
"protocolVersion": PROTOCOL_VERSION,
|
||||
}),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dispatch_exec_returns_invalid_request_before_initialize() {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, None);
|
||||
|
||||
let message = dispatch_request(
|
||||
&mut handler,
|
||||
JSONRPCRequest {
|
||||
id: RequestId::Integer(7),
|
||||
method: EXEC_METHOD.to_string(),
|
||||
params: Some(serde_json::json!({
|
||||
"processId": "proc-1",
|
||||
"argv": ["bash", "-lc", "true"],
|
||||
"cwd": std::env::current_dir().expect("cwd"),
|
||||
"env": HashMap::<String, String>::new(),
|
||||
"tty": true,
|
||||
"arg0": null,
|
||||
"sandbox": null,
|
||||
})),
|
||||
trace: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let JSONRPCMessage::Error(JSONRPCError { id, error }) = message else {
|
||||
panic!("expected invalid-request error");
|
||||
};
|
||||
assert_eq!(id, RequestId::Integer(7));
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(
|
||||
error.message,
|
||||
"client must call initialize before using exec methods"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialized_notification_before_initialize_is_protocol_error() {
|
||||
let (notification_tx, _notification_rx) = tokio::sync::mpsc::channel(1);
|
||||
let mut handler = ExecServerHandler::new(notification_tx, None);
|
||||
|
||||
let err = handle_connection_message(
|
||||
&mut handler,
|
||||
JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: INITIALIZED_METHOD.to_string(),
|
||||
params: Some(serde_json::json!({})),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect_err("expected early initialized to fail");
|
||||
|
||||
assert_eq!(
|
||||
err,
|
||||
"received `initialized` notification before `initialize`"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notification_message_serializes_process_output() {
|
||||
let message = notification_message(ExecServerServerNotification::OutputDelta(
|
||||
ExecOutputDeltaNotification {
|
||||
process_id: "proc-1".to_string(),
|
||||
stream: ExecOutputStream::Stdout,
|
||||
chunk: b"hello".to_vec().into(),
|
||||
},
|
||||
))
|
||||
.expect("serialize notification");
|
||||
|
||||
assert_eq!(
|
||||
message,
|
||||
JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
|
||||
params: Some(serde_json::json!({
|
||||
"processId": "proc-1",
|
||||
"stream": "stdout",
|
||||
"chunk": "aGVsbG8=",
|
||||
})),
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
161
codex-rs/exec-server/src/server/transport.rs
Normal file
161
codex-rs/exec-server/src/server/transport.rs
Normal file
@@ -0,0 +1,161 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::str::FromStr;
|
||||
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_tungstenite::accept_async;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::server::ExecServerConfig;
|
||||
use crate::server::processor::run_connection;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub enum ExecServerTransport {
|
||||
WebSocket { bind_address: SocketAddr },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum ExecServerTransportParseError {
|
||||
UnsupportedListenUrl(String),
|
||||
InvalidWebSocketListenUrl(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ExecServerTransportParseError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ExecServerTransportParseError::UnsupportedListenUrl(listen_url) => write!(
|
||||
f,
|
||||
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`"
|
||||
),
|
||||
ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url) => write!(
|
||||
f,
|
||||
"invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ExecServerTransportParseError {}
|
||||
|
||||
impl ExecServerTransport {
|
||||
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
|
||||
|
||||
pub fn from_listen_url(listen_url: &str) -> Result<Self, ExecServerTransportParseError> {
|
||||
if let Some(socket_addr) = listen_url.strip_prefix("ws://") {
|
||||
let bind_address = socket_addr.parse::<SocketAddr>().map_err(|_| {
|
||||
ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url.to_string())
|
||||
})?;
|
||||
return Ok(Self::WebSocket { bind_address });
|
||||
}
|
||||
|
||||
Err(ExecServerTransportParseError::UnsupportedListenUrl(
|
||||
listen_url.to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ExecServerTransport {
|
||||
fn default() -> Self {
|
||||
Self::WebSocket {
|
||||
bind_address: "127.0.0.1:0".parse().unwrap_or_else(|err| {
|
||||
panic!("default exec-server websocket bind address should parse: {err}")
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for ExecServerTransport {
|
||||
type Err = ExecServerTransportParseError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
Self::from_listen_url(s)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run_transport(
|
||||
transport: ExecServerTransport,
|
||||
config: ExecServerConfig,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
match transport {
|
||||
ExecServerTransport::WebSocket { bind_address } => {
|
||||
run_websocket_listener(bind_address, config).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_websocket_listener(
|
||||
bind_address: SocketAddr,
|
||||
config: ExecServerConfig,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let listener = TcpListener::bind(bind_address).await?;
|
||||
let local_addr = listener.local_addr()?;
|
||||
print_websocket_startup_banner(local_addr);
|
||||
|
||||
loop {
|
||||
let (stream, peer_addr) = listener.accept().await?;
|
||||
let config = config.clone();
|
||||
tokio::spawn(async move {
|
||||
match accept_async(stream).await {
|
||||
Ok(websocket) => {
|
||||
run_connection(
|
||||
JsonRpcConnection::from_websocket(
|
||||
websocket,
|
||||
format!("exec-server websocket {peer_addr}"),
|
||||
),
|
||||
config,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to accept exec-server websocket connection from {peer_addr}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::print_stderr)]
|
||||
fn print_websocket_startup_banner(addr: SocketAddr) {
|
||||
eprintln!("codex-exec-server listening on ws://{addr}");
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::ExecServerTransport;
|
||||
|
||||
#[test]
|
||||
fn exec_server_transport_parses_websocket_listen_url() {
|
||||
let transport = ExecServerTransport::from_listen_url("ws://127.0.0.1:1234")
|
||||
.expect("websocket listen URL should parse");
|
||||
assert_eq!(
|
||||
transport,
|
||||
ExecServerTransport::WebSocket {
|
||||
bind_address: "127.0.0.1:1234".parse().expect("valid socket address"),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exec_server_transport_rejects_invalid_websocket_listen_url() {
|
||||
let err = ExecServerTransport::from_listen_url("ws://localhost:1234")
|
||||
.expect_err("hostname bind address should be rejected");
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exec_server_transport_rejects_unsupported_listen_url() {
|
||||
let err = ExecServerTransport::from_listen_url("http://127.0.0.1:1234")
|
||||
.expect_err("unsupported scheme should fail");
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`"
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user