mirror of
https://github.com/openai/codex.git
synced 2026-04-28 02:11:08 +03:00
1. Use requirement-resolved config.features as the plugin gate. 2. Guard plugin/list, plugin/read, and related flows behind that gate. 3. Skip bad marketplace.json files instead of failing the whole list. 4. Simplify plugin state and caching.
1575 lines
60 KiB
Rust
1575 lines
60 KiB
Rust
//! Shared in-process app-server client facade for CLI surfaces.
|
|
//!
|
|
//! This crate wraps [`codex_app_server::in_process`] behind a single async API
|
|
//! used by surfaces like TUI and exec. It centralizes:
|
|
//!
|
|
//! - Runtime startup and initialize-capabilities handshake.
|
|
//! - Typed caller-provided startup identity (`SessionSource` + client name).
|
|
//! - Typed and raw request/notification dispatch.
|
|
//! - Server request resolution and rejection.
|
|
//! - Event consumption with backpressure signaling ([`InProcessServerEvent::Lagged`]).
|
|
//! - Bounded graceful shutdown with abort fallback.
|
|
//!
|
|
//! The facade interposes a worker task between the caller and the underlying
|
|
//! [`InProcessClientHandle`](codex_app_server::in_process::InProcessClientHandle),
|
|
//! bridging async `mpsc` channels on both sides. Queues are bounded so overload
|
|
//! surfaces as channel-full errors rather than unbounded memory growth.
|
|
|
|
mod remote;
|
|
|
|
use std::error::Error;
|
|
use std::fmt;
|
|
use std::io::Error as IoError;
|
|
use std::io::ErrorKind;
|
|
use std::io::Result as IoResult;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
|
pub use codex_app_server::in_process::InProcessServerEvent;
|
|
use codex_app_server::in_process::InProcessStartArgs;
|
|
use codex_app_server_protocol::ClientInfo;
|
|
use codex_app_server_protocol::ClientNotification;
|
|
use codex_app_server_protocol::ClientRequest;
|
|
use codex_app_server_protocol::ConfigWarningNotification;
|
|
use codex_app_server_protocol::InitializeCapabilities;
|
|
use codex_app_server_protocol::InitializeParams;
|
|
use codex_app_server_protocol::JSONRPCErrorError;
|
|
use codex_app_server_protocol::JSONRPCNotification;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::Result as JsonRpcResult;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::ServerRequest;
|
|
use codex_arg0::Arg0DispatchPaths;
|
|
use codex_core::AuthManager;
|
|
use codex_core::ThreadManager;
|
|
use codex_core::config::Config;
|
|
use codex_core::config_loader::CloudRequirementsLoader;
|
|
use codex_core::config_loader::LoaderOverrides;
|
|
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
|
use codex_feedback::CodexFeedback;
|
|
use codex_protocol::protocol::SessionSource;
|
|
use serde::de::DeserializeOwned;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::oneshot;
|
|
use tokio::time::timeout;
|
|
use toml::Value as TomlValue;
|
|
use tracing::warn;
|
|
|
|
pub use crate::remote::RemoteAppServerClient;
|
|
pub use crate::remote::RemoteAppServerConnectArgs;
|
|
|
|
/// Upper bound applied to each wait performed by
/// `InProcessAppServerClient::shutdown`: first while waiting for the worker's
/// shutdown acknowledgement, then while waiting for the worker task to finish.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
|
|
|
/// Raw app-server request result for typed in-process requests.
|
|
///
|
|
/// Even on the in-process path, successful responses still travel back through
|
|
/// the same JSON-RPC result envelope used by socket/stdio transports because
|
|
/// `MessageProcessor` continues to produce that shape internally.
|
|
pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub enum AppServerEvent {
|
|
Lagged { skipped: usize },
|
|
ServerNotification(ServerNotification),
|
|
LegacyNotification(JSONRPCNotification),
|
|
ServerRequest(ServerRequest),
|
|
Disconnected { message: String },
|
|
}
|
|
|
|
impl From<InProcessServerEvent> for AppServerEvent {
|
|
fn from(value: InProcessServerEvent) -> Self {
|
|
match value {
|
|
InProcessServerEvent::Lagged { skipped } => Self::Lagged { skipped },
|
|
InProcessServerEvent::ServerNotification(notification) => {
|
|
Self::ServerNotification(notification)
|
|
}
|
|
InProcessServerEvent::LegacyNotification(notification) => {
|
|
Self::LegacyNotification(notification)
|
|
}
|
|
InProcessServerEvent::ServerRequest(request) => Self::ServerRequest(request),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
|
|
// These terminal events drive surface shutdown/completion state. Dropping
|
|
// them under backpressure can leave exec/TUI waiting forever even though
|
|
// the underlying turn has already ended.
|
|
match event {
|
|
InProcessServerEvent::ServerNotification(
|
|
codex_app_server_protocol::ServerNotification::TurnCompleted(_),
|
|
) => true,
|
|
InProcessServerEvent::LegacyNotification(notification) => matches!(
|
|
notification
|
|
.method
|
|
.strip_prefix("codex/event/")
|
|
.unwrap_or(¬ification.method),
|
|
"task_complete" | "turn_aborted" | "shutdown_complete"
|
|
),
|
|
_ => false,
|
|
}
|
|
}
|
|
|
|
/// Layered error for [`InProcessAppServerClient::request_typed`].
|
|
///
|
|
/// This keeps transport failures, server-side JSON-RPC failures, and response
|
|
/// decode failures distinct so callers can decide whether to retry, surface a
|
|
/// server error, or treat the response as an internal request/response mismatch.
|
|
#[derive(Debug)]
|
|
pub enum TypedRequestError {
|
|
Transport {
|
|
method: String,
|
|
source: IoError,
|
|
},
|
|
Server {
|
|
method: String,
|
|
source: JSONRPCErrorError,
|
|
},
|
|
Deserialize {
|
|
method: String,
|
|
source: serde_json::Error,
|
|
},
|
|
}
|
|
|
|
impl fmt::Display for TypedRequestError {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
match self {
|
|
Self::Transport { method, source } => {
|
|
write!(f, "{method} transport error: {source}")
|
|
}
|
|
Self::Server { method, source } => {
|
|
write!(f, "{method} failed: {}", source.message)
|
|
}
|
|
Self::Deserialize { method, source } => {
|
|
write!(f, "{method} response decode error: {source}")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Error for TypedRequestError {
|
|
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
|
match self {
|
|
Self::Transport { source, .. } => Some(source),
|
|
Self::Server { .. } => None,
|
|
Self::Deserialize { source, .. } => Some(source),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct SharedCoreManagers {
|
|
// Temporary bootstrap escape hatch for embedders that still need direct
|
|
// core handles during the in-process app-server migration. Once TUI/exec
|
|
// stop depending on direct manager access, remove this wrapper and keep
|
|
// manager ownership entirely inside the app-server runtime.
|
|
auth_manager: Arc<AuthManager>,
|
|
thread_manager: Arc<ThreadManager>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct InProcessClientStartArgs {
|
|
/// Resolved argv0 dispatch paths used by command execution internals.
|
|
pub arg0_paths: Arg0DispatchPaths,
|
|
/// Shared config used to initialize app-server runtime.
|
|
pub config: Arc<Config>,
|
|
/// CLI config overrides that are already parsed into TOML values.
|
|
pub cli_overrides: Vec<(String, TomlValue)>,
|
|
/// Loader override knobs used by config API paths.
|
|
pub loader_overrides: LoaderOverrides,
|
|
/// Preloaded cloud requirements provider.
|
|
pub cloud_requirements: CloudRequirementsLoader,
|
|
/// Feedback sink used by app-server/core telemetry and logs.
|
|
pub feedback: CodexFeedback,
|
|
/// Startup warnings emitted after initialize succeeds.
|
|
pub config_warnings: Vec<ConfigWarningNotification>,
|
|
/// Session source recorded in app-server thread metadata.
|
|
pub session_source: SessionSource,
|
|
/// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
|
|
pub enable_codex_api_key_env: bool,
|
|
/// Client name reported during initialize.
|
|
pub client_name: String,
|
|
/// Client version reported during initialize.
|
|
pub client_version: String,
|
|
/// Whether experimental APIs are requested at initialize time.
|
|
pub experimental_api: bool,
|
|
/// Notification methods this client opts out of receiving.
|
|
pub opt_out_notification_methods: Vec<String>,
|
|
/// Queue capacity for command/event channels (clamped to at least 1).
|
|
pub channel_capacity: usize,
|
|
}
|
|
|
|
impl InProcessClientStartArgs {
|
|
fn shared_core_managers(&self) -> SharedCoreManagers {
|
|
let auth_manager = AuthManager::shared(
|
|
self.config.codex_home.clone(),
|
|
self.enable_codex_api_key_env,
|
|
self.config.cli_auth_credentials_store_mode,
|
|
);
|
|
let thread_manager = Arc::new(ThreadManager::new(
|
|
self.config.as_ref(),
|
|
auth_manager.clone(),
|
|
self.session_source.clone(),
|
|
CollaborationModesConfig {
|
|
default_mode_request_user_input: self
|
|
.config
|
|
.features
|
|
.enabled(codex_core::features::Feature::DefaultModeRequestUserInput),
|
|
},
|
|
));
|
|
|
|
SharedCoreManagers {
|
|
auth_manager,
|
|
thread_manager,
|
|
}
|
|
}
|
|
|
|
/// Builds initialize params from caller-provided metadata.
|
|
pub fn initialize_params(&self) -> InitializeParams {
|
|
let capabilities = InitializeCapabilities {
|
|
experimental_api: self.experimental_api,
|
|
opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() {
|
|
None
|
|
} else {
|
|
Some(self.opt_out_notification_methods.clone())
|
|
},
|
|
};
|
|
|
|
InitializeParams {
|
|
client_info: ClientInfo {
|
|
name: self.client_name.clone(),
|
|
title: None,
|
|
version: self.client_version.clone(),
|
|
},
|
|
capabilities: Some(capabilities),
|
|
}
|
|
}
|
|
|
|
fn into_runtime_start_args(self, shared_core: &SharedCoreManagers) -> InProcessStartArgs {
|
|
let initialize = self.initialize_params();
|
|
InProcessStartArgs {
|
|
arg0_paths: self.arg0_paths,
|
|
config: self.config,
|
|
cli_overrides: self.cli_overrides,
|
|
loader_overrides: self.loader_overrides,
|
|
cloud_requirements: self.cloud_requirements,
|
|
auth_manager: Some(shared_core.auth_manager.clone()),
|
|
thread_manager: Some(shared_core.thread_manager.clone()),
|
|
feedback: self.feedback,
|
|
config_warnings: self.config_warnings,
|
|
session_source: self.session_source,
|
|
enable_codex_api_key_env: self.enable_codex_api_key_env,
|
|
initialize,
|
|
channel_capacity: self.channel_capacity,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Internal command sent from public facade methods to the worker task.
|
|
///
|
|
/// Each variant carries a oneshot sender so the caller can `await` the
|
|
/// result without holding a mutable reference to the client.
|
|
enum ClientCommand {
|
|
Request {
|
|
request: Box<ClientRequest>,
|
|
response_tx: oneshot::Sender<IoResult<RequestResult>>,
|
|
},
|
|
Notify {
|
|
notification: ClientNotification,
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
ResolveServerRequest {
|
|
request_id: RequestId,
|
|
result: JsonRpcResult,
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
RejectServerRequest {
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
Shutdown {
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
}
|
|
|
|
/// Async facade over the in-process app-server runtime.
|
|
///
|
|
/// This type owns a worker task that bridges between:
|
|
/// - caller-facing async `mpsc` channels used by TUI/exec
|
|
/// - [`codex_app_server::in_process::InProcessClientHandle`], which speaks to
|
|
/// the embedded `MessageProcessor`
|
|
///
|
|
/// The facade intentionally preserves the server's request/notification/event
|
|
/// model instead of exposing direct core runtime handles. That keeps in-process
|
|
/// callers aligned with app-server behavior while still avoiding a process
|
|
/// boundary.
|
|
pub struct InProcessAppServerClient {
|
|
command_tx: mpsc::Sender<ClientCommand>,
|
|
event_rx: mpsc::Receiver<InProcessServerEvent>,
|
|
worker_handle: tokio::task::JoinHandle<()>,
|
|
auth_manager: Arc<AuthManager>,
|
|
thread_manager: Arc<ThreadManager>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct InProcessAppServerRequestHandle {
|
|
command_tx: mpsc::Sender<ClientCommand>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub enum AppServerRequestHandle {
|
|
InProcess(InProcessAppServerRequestHandle),
|
|
Remote(crate::remote::RemoteAppServerRequestHandle),
|
|
}
|
|
|
|
pub enum AppServerClient {
|
|
InProcess(InProcessAppServerClient),
|
|
Remote(RemoteAppServerClient),
|
|
}
|
|
|
|
impl InProcessAppServerClient {
|
|
/// Starts the in-process runtime and facade worker task.
|
|
///
|
|
/// The returned client is ready for requests and event consumption. If the
|
|
/// internal event queue is saturated later, server requests are rejected
|
|
/// with overload error instead of being silently dropped.
|
|
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
|
|
let channel_capacity = args.channel_capacity.max(1);
|
|
let shared_core = args.shared_core_managers();
|
|
let mut handle =
|
|
codex_app_server::in_process::start(args.into_runtime_start_args(&shared_core)).await?;
|
|
let request_sender = handle.sender();
|
|
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
|
|
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
|
|
|
let worker_handle = tokio::spawn(async move {
|
|
let mut event_stream_enabled = true;
|
|
let mut skipped_events = 0usize;
|
|
loop {
|
|
tokio::select! {
|
|
command = command_rx.recv() => {
|
|
match command {
|
|
Some(ClientCommand::Request { request, response_tx }) => {
|
|
let request_sender = request_sender.clone();
|
|
// Request waits happen on a detached task so
|
|
// this loop can keep draining runtime events
|
|
// while the request is blocked on client input.
|
|
tokio::spawn(async move {
|
|
let result = request_sender.request(*request).await;
|
|
let _ = response_tx.send(result);
|
|
});
|
|
}
|
|
Some(ClientCommand::Notify {
|
|
notification,
|
|
response_tx,
|
|
}) => {
|
|
let result = request_sender.notify(notification);
|
|
let _ = response_tx.send(result);
|
|
}
|
|
Some(ClientCommand::ResolveServerRequest {
|
|
request_id,
|
|
result,
|
|
response_tx,
|
|
}) => {
|
|
let send_result =
|
|
request_sender.respond_to_server_request(request_id, result);
|
|
let _ = response_tx.send(send_result);
|
|
}
|
|
Some(ClientCommand::RejectServerRequest {
|
|
request_id,
|
|
error,
|
|
response_tx,
|
|
}) => {
|
|
let send_result = request_sender.fail_server_request(request_id, error);
|
|
let _ = response_tx.send(send_result);
|
|
}
|
|
Some(ClientCommand::Shutdown { response_tx }) => {
|
|
let shutdown_result = handle.shutdown().await;
|
|
let _ = response_tx.send(shutdown_result);
|
|
break;
|
|
}
|
|
None => {
|
|
let _ = handle.shutdown().await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
event = handle.next_event(), if event_stream_enabled => {
|
|
let Some(event) = event else {
|
|
break;
|
|
};
|
|
|
|
if skipped_events > 0 {
|
|
if event_requires_delivery(&event) {
|
|
// Surface lag before the terminal event, but
|
|
// do not let the lag marker itself cause us to
|
|
// drop the completion/abort notification that
|
|
// the caller is blocked on.
|
|
if event_tx
|
|
.send(InProcessServerEvent::Lagged {
|
|
skipped: skipped_events,
|
|
})
|
|
.await
|
|
.is_err()
|
|
{
|
|
event_stream_enabled = false;
|
|
continue;
|
|
}
|
|
skipped_events = 0;
|
|
} else {
|
|
match event_tx.try_send(InProcessServerEvent::Lagged {
|
|
skipped: skipped_events,
|
|
}) {
|
|
Ok(()) => {
|
|
skipped_events = 0;
|
|
}
|
|
Err(mpsc::error::TrySendError::Full(_)) => {
|
|
skipped_events = skipped_events.saturating_add(1);
|
|
warn!(
|
|
"dropping in-process app-server event because consumer queue is full"
|
|
);
|
|
if let InProcessServerEvent::ServerRequest(request) = event {
|
|
let _ = request_sender.fail_server_request(
|
|
request.id().clone(),
|
|
JSONRPCErrorError {
|
|
code: -32001,
|
|
message: "in-process app-server event queue is full".to_string(),
|
|
data: None,
|
|
},
|
|
);
|
|
}
|
|
continue;
|
|
}
|
|
Err(mpsc::error::TrySendError::Closed(_)) => {
|
|
event_stream_enabled = false;
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if event_requires_delivery(&event) {
|
|
// Block until the consumer catches up for
|
|
// terminal notifications; this preserves the
|
|
// completion signal even when the queue is
|
|
// otherwise saturated.
|
|
if event_tx.send(event).await.is_err() {
|
|
event_stream_enabled = false;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
match event_tx.try_send(event) {
|
|
Ok(()) => {}
|
|
Err(mpsc::error::TrySendError::Full(event)) => {
|
|
skipped_events = skipped_events.saturating_add(1);
|
|
warn!("dropping in-process app-server event because consumer queue is full");
|
|
if let InProcessServerEvent::ServerRequest(request) = event {
|
|
let _ = request_sender.fail_server_request(
|
|
request.id().clone(),
|
|
JSONRPCErrorError {
|
|
code: -32001,
|
|
message: "in-process app-server event queue is full".to_string(),
|
|
data: None,
|
|
},
|
|
);
|
|
}
|
|
}
|
|
Err(mpsc::error::TrySendError::Closed(_)) => {
|
|
event_stream_enabled = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(Self {
|
|
command_tx,
|
|
event_rx,
|
|
worker_handle,
|
|
auth_manager: shared_core.auth_manager,
|
|
thread_manager: shared_core.thread_manager,
|
|
})
|
|
}
|
|
|
|
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
|
|
pub fn auth_manager(&self) -> Arc<AuthManager> {
|
|
self.auth_manager.clone()
|
|
}
|
|
|
|
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
|
|
pub fn thread_manager(&self) -> Arc<ThreadManager> {
|
|
self.thread_manager.clone()
|
|
}
|
|
|
|
pub fn request_handle(&self) -> InProcessAppServerRequestHandle {
|
|
InProcessAppServerRequestHandle {
|
|
command_tx: self.command_tx.clone(),
|
|
}
|
|
}
|
|
|
|
/// Sends a typed client request and returns raw JSON-RPC result.
|
|
///
|
|
/// Callers that expect a concrete response type should usually prefer
|
|
/// [`request_typed`](Self::request_typed).
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::Request {
|
|
request: Box::new(request),
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server request channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Sends a typed client request and decodes the successful response body.
|
|
///
|
|
/// This still deserializes from a JSON value produced by app-server's
|
|
/// JSON-RPC result envelope. Because the caller chooses `T`, `Deserialize`
|
|
/// failures indicate an internal request/response mismatch at the call site
|
|
/// (or an in-process bug), not transport skew from an external client.
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
let method = request_method_name(&request);
|
|
let response =
|
|
self.request(request)
|
|
.await
|
|
.map_err(|source| TypedRequestError::Transport {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
let result = response.map_err(|source| TypedRequestError::Server {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
serde_json::from_value(result)
|
|
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
|
}
|
|
|
|
/// Sends a typed client notification.
|
|
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::Notify {
|
|
notification,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server notify channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Resolves a pending server request.
|
|
///
|
|
/// This should only be called with request IDs obtained from the current
|
|
/// client's event stream.
|
|
pub async fn resolve_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
result: JsonRpcResult,
|
|
) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::ResolveServerRequest {
|
|
request_id,
|
|
result,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server resolve channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Rejects a pending server request with JSON-RPC error payload.
|
|
pub async fn reject_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::RejectServerRequest {
|
|
request_id,
|
|
error,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server reject channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Returns the next in-process event, or `None` when worker exits.
|
|
///
|
|
/// Callers are expected to drain this stream promptly. If they fall behind,
|
|
/// the worker emits [`InProcessServerEvent::Lagged`] markers and may reject
|
|
/// pending server requests rather than letting approval flows hang.
|
|
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
|
|
self.event_rx.recv().await
|
|
}
|
|
|
|
/// Shuts down worker and in-process runtime with bounded wait.
|
|
///
|
|
/// If graceful shutdown exceeds timeout, the worker task is aborted to
|
|
/// avoid leaking background tasks in embedding callers.
|
|
pub async fn shutdown(self) -> IoResult<()> {
|
|
let Self {
|
|
command_tx,
|
|
event_rx,
|
|
worker_handle,
|
|
auth_manager: _,
|
|
thread_manager: _,
|
|
} = self;
|
|
let mut worker_handle = worker_handle;
|
|
// Drop the caller-facing receiver before asking the worker to shut
|
|
// down. That unblocks any pending must-deliver `event_tx.send(..)`
|
|
// so the worker can reach `handle.shutdown()` instead of timing out
|
|
// and getting aborted with the runtime still attached.
|
|
drop(event_rx);
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
if command_tx
|
|
.send(ClientCommand::Shutdown { response_tx })
|
|
.await
|
|
.is_ok()
|
|
&& let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await
|
|
{
|
|
command_result.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server shutdown channel is closed",
|
|
)
|
|
})??;
|
|
}
|
|
|
|
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await {
|
|
worker_handle.abort();
|
|
let _ = worker_handle.await;
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl InProcessAppServerRequestHandle {
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::Request {
|
|
request: Box::new(request),
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server request channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
let method = request_method_name(&request);
|
|
let response =
|
|
self.request(request)
|
|
.await
|
|
.map_err(|source| TypedRequestError::Transport {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
let result = response.map_err(|source| TypedRequestError::Server {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
serde_json::from_value(result)
|
|
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
|
}
|
|
}
|
|
|
|
impl AppServerRequestHandle {
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
match self {
|
|
Self::InProcess(handle) => handle.request(request).await,
|
|
Self::Remote(handle) => handle.request(request).await,
|
|
}
|
|
}
|
|
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
match self {
|
|
Self::InProcess(handle) => handle.request_typed(request).await,
|
|
Self::Remote(handle) => handle.request_typed(request).await,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl AppServerClient {
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
match self {
|
|
Self::InProcess(client) => client.request(request).await,
|
|
Self::Remote(client) => client.request(request).await,
|
|
}
|
|
}
|
|
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
match self {
|
|
Self::InProcess(client) => client.request_typed(request).await,
|
|
Self::Remote(client) => client.request_typed(request).await,
|
|
}
|
|
}
|
|
|
|
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
|
match self {
|
|
Self::InProcess(client) => client.notify(notification).await,
|
|
Self::Remote(client) => client.notify(notification).await,
|
|
}
|
|
}
|
|
|
|
pub async fn resolve_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
result: JsonRpcResult,
|
|
) -> IoResult<()> {
|
|
match self {
|
|
Self::InProcess(client) => client.resolve_server_request(request_id, result).await,
|
|
Self::Remote(client) => client.resolve_server_request(request_id, result).await,
|
|
}
|
|
}
|
|
|
|
pub async fn reject_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
) -> IoResult<()> {
|
|
match self {
|
|
Self::InProcess(client) => client.reject_server_request(request_id, error).await,
|
|
Self::Remote(client) => client.reject_server_request(request_id, error).await,
|
|
}
|
|
}
|
|
|
|
pub async fn next_event(&mut self) -> Option<AppServerEvent> {
|
|
match self {
|
|
Self::InProcess(client) => client.next_event().await.map(Into::into),
|
|
Self::Remote(client) => client.next_event().await,
|
|
}
|
|
}
|
|
|
|
pub async fn shutdown(self) -> IoResult<()> {
|
|
match self {
|
|
Self::InProcess(client) => client.shutdown().await,
|
|
Self::Remote(client) => client.shutdown().await,
|
|
}
|
|
}
|
|
|
|
pub fn request_handle(&self) -> AppServerRequestHandle {
|
|
match self {
|
|
Self::InProcess(client) => AppServerRequestHandle::InProcess(client.request_handle()),
|
|
Self::Remote(client) => AppServerRequestHandle::Remote(client.request_handle()),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Extracts the JSON-RPC method name for diagnostics without extending the
|
|
/// protocol crate with in-process-only helpers.
|
|
pub(crate) fn request_method_name(request: &ClientRequest) -> String {
|
|
serde_json::to_value(request)
|
|
.ok()
|
|
.and_then(|value| {
|
|
value
|
|
.get("method")
|
|
.and_then(serde_json::Value::as_str)
|
|
.map(ToOwned::to_owned)
|
|
})
|
|
.unwrap_or_else(|| "<unknown>".to_string())
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use codex_app_server_protocol::AccountUpdatedNotification;
|
|
use codex_app_server_protocol::ConfigRequirementsReadResponse;
|
|
use codex_app_server_protocol::GetAccountResponse;
|
|
use codex_app_server_protocol::JSONRPCMessage;
|
|
use codex_app_server_protocol::JSONRPCRequest;
|
|
use codex_app_server_protocol::JSONRPCResponse;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::SessionSource as ApiSessionSource;
|
|
use codex_app_server_protocol::ThreadStartParams;
|
|
use codex_app_server_protocol::ThreadStartResponse;
|
|
use codex_app_server_protocol::ToolRequestUserInputParams;
|
|
use codex_app_server_protocol::ToolRequestUserInputQuestion;
|
|
use codex_core::AuthManager;
|
|
use codex_core::ThreadManager;
|
|
use codex_core::config::ConfigBuilder;
|
|
use futures::SinkExt;
|
|
use futures::StreamExt;
|
|
use pretty_assertions::assert_eq;
|
|
use tokio::net::TcpListener;
|
|
use tokio::time::Duration;
|
|
use tokio::time::timeout;
|
|
use tokio_tungstenite::accept_async;
|
|
use tokio_tungstenite::tungstenite::Message;
|
|
|
|
async fn build_test_config() -> Config {
|
|
match ConfigBuilder::default().build().await {
|
|
Ok(config) => config,
|
|
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
|
|
.expect("default config should load"),
|
|
}
|
|
}
|
|
|
|
async fn start_test_client_with_capacity(
|
|
session_source: SessionSource,
|
|
channel_capacity: usize,
|
|
) -> InProcessAppServerClient {
|
|
InProcessAppServerClient::start(InProcessClientStartArgs {
|
|
arg0_paths: Arg0DispatchPaths::default(),
|
|
config: Arc::new(build_test_config().await),
|
|
cli_overrides: Vec::new(),
|
|
loader_overrides: LoaderOverrides::default(),
|
|
cloud_requirements: CloudRequirementsLoader::default(),
|
|
feedback: CodexFeedback::new(),
|
|
config_warnings: Vec::new(),
|
|
session_source,
|
|
enable_codex_api_key_env: false,
|
|
client_name: "codex-app-server-client-test".to_string(),
|
|
client_version: "0.0.0-test".to_string(),
|
|
experimental_api: true,
|
|
opt_out_notification_methods: Vec::new(),
|
|
channel_capacity,
|
|
})
|
|
.await
|
|
.expect("in-process app-server client should start")
|
|
}
|
|
|
|
async fn start_test_client(session_source: SessionSource) -> InProcessAppServerClient {
|
|
start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
|
|
}
|
|
|
|
async fn start_test_remote_server<F, Fut>(handler: F) -> String
|
|
where
|
|
F: FnOnce(tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>) -> Fut
|
|
+ Send
|
|
+ 'static,
|
|
Fut: std::future::Future<Output = ()> + Send + 'static,
|
|
{
|
|
let listener = TcpListener::bind("127.0.0.1:0")
|
|
.await
|
|
.expect("listener should bind");
|
|
let addr = listener.local_addr().expect("listener address");
|
|
tokio::spawn(async move {
|
|
let (stream, _) = listener.accept().await.expect("accept should succeed");
|
|
let websocket = accept_async(stream)
|
|
.await
|
|
.expect("websocket upgrade should succeed");
|
|
handler(websocket).await;
|
|
});
|
|
format!("ws://{addr}")
|
|
}
|
|
|
|
async fn expect_remote_initialize(
|
|
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
|
) {
|
|
let JSONRPCMessage::Request(request) = read_websocket_message(websocket).await else {
|
|
panic!("expected initialize request");
|
|
};
|
|
assert_eq!(request.method, "initialize");
|
|
write_websocket_message(
|
|
websocket,
|
|
JSONRPCMessage::Response(JSONRPCResponse {
|
|
id: request.id,
|
|
result: serde_json::json!({}),
|
|
}),
|
|
)
|
|
.await;
|
|
|
|
let JSONRPCMessage::Notification(notification) = read_websocket_message(websocket).await
|
|
else {
|
|
panic!("expected initialized notification");
|
|
};
|
|
assert_eq!(notification.method, "initialized");
|
|
}
|
|
|
|
async fn read_websocket_message(
|
|
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
|
) -> JSONRPCMessage {
|
|
loop {
|
|
let frame = websocket
|
|
.next()
|
|
.await
|
|
.expect("frame should be available")
|
|
.expect("frame should decode");
|
|
match frame {
|
|
Message::Text(text) => {
|
|
return serde_json::from_str::<JSONRPCMessage>(&text)
|
|
.expect("text frame should be valid JSON-RPC");
|
|
}
|
|
Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {
|
|
continue;
|
|
}
|
|
Message::Close(_) => panic!("unexpected close frame"),
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn write_websocket_message(
|
|
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
|
message: JSONRPCMessage,
|
|
) {
|
|
websocket
|
|
.send(Message::Text(
|
|
serde_json::to_string(&message)
|
|
.expect("message should serialize")
|
|
.into(),
|
|
))
|
|
.await
|
|
.expect("message should send");
|
|
}
|
|
|
|
fn test_remote_connect_args(websocket_url: String) -> RemoteAppServerConnectArgs {
|
|
RemoteAppServerConnectArgs {
|
|
websocket_url,
|
|
client_name: "codex-app-server-client-test".to_string(),
|
|
client_version: "0.0.0-test".to_string(),
|
|
experimental_api: true,
|
|
opt_out_notification_methods: Vec::new(),
|
|
channel_capacity: 8,
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn typed_request_roundtrip_works() {
|
|
let client = start_test_client(SessionSource::Exec).await;
|
|
let _response: ConfigRequirementsReadResponse = client
|
|
.request_typed(ClientRequest::ConfigRequirementsRead {
|
|
request_id: RequestId::Integer(1),
|
|
params: None,
|
|
})
|
|
.await
|
|
.expect("typed request should succeed");
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn typed_request_reports_json_rpc_errors() {
|
|
let client = start_test_client(SessionSource::Exec).await;
|
|
let err = client
|
|
.request_typed::<ConfigRequirementsReadResponse>(ClientRequest::ThreadRead {
|
|
request_id: RequestId::Integer(99),
|
|
params: codex_app_server_protocol::ThreadReadParams {
|
|
thread_id: "missing-thread".to_string(),
|
|
include_turns: false,
|
|
},
|
|
})
|
|
.await
|
|
.expect_err("missing thread should return a JSON-RPC error");
|
|
assert!(
|
|
err.to_string().starts_with("thread/read failed:"),
|
|
"expected method-qualified JSON-RPC failure message"
|
|
);
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn caller_provided_session_source_is_applied() {
|
|
for (session_source, expected_source) in [
|
|
(SessionSource::Exec, ApiSessionSource::Exec),
|
|
(SessionSource::Cli, ApiSessionSource::Cli),
|
|
(
|
|
SessionSource::Custom("atlas".to_string()),
|
|
ApiSessionSource::Custom("atlas".to_string()),
|
|
),
|
|
] {
|
|
let client = start_test_client(session_source).await;
|
|
let parsed: ThreadStartResponse = client
|
|
.request_typed(ClientRequest::ThreadStart {
|
|
request_id: RequestId::Integer(2),
|
|
params: ThreadStartParams {
|
|
ephemeral: Some(true),
|
|
..ThreadStartParams::default()
|
|
},
|
|
})
|
|
.await
|
|
.expect("thread/start should succeed");
|
|
assert_eq!(parsed.thread.source, expected_source);
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn shared_thread_manager_tracks_threads_started_via_app_server() {
|
|
let client = start_test_client(SessionSource::Cli).await;
|
|
|
|
let response: ThreadStartResponse = client
|
|
.request_typed(ClientRequest::ThreadStart {
|
|
request_id: RequestId::Integer(3),
|
|
params: ThreadStartParams {
|
|
ephemeral: Some(true),
|
|
..ThreadStartParams::default()
|
|
},
|
|
})
|
|
.await
|
|
.expect("thread/start should succeed");
|
|
let created_thread_id = codex_protocol::ThreadId::from_string(&response.thread.id)
|
|
.expect("thread id should parse");
|
|
timeout(
|
|
Duration::from_secs(2),
|
|
client.thread_manager().get_thread(created_thread_id),
|
|
)
|
|
.await
|
|
.expect("timed out waiting for retained thread manager to observe started thread")
|
|
.expect("started thread should be visible through the shared thread manager");
|
|
let thread_ids = client.thread_manager().list_thread_ids().await;
|
|
assert!(thread_ids.contains(&created_thread_id));
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn tiny_channel_capacity_still_supports_request_roundtrip() {
|
|
let client = start_test_client_with_capacity(SessionSource::Exec, 1).await;
|
|
let _response: ConfigRequirementsReadResponse = client
|
|
.request_typed(ClientRequest::ConfigRequirementsRead {
|
|
request_id: RequestId::Integer(1),
|
|
params: None,
|
|
})
|
|
.await
|
|
.expect("typed request should succeed");
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_typed_request_roundtrip_works() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected account/read request");
|
|
};
|
|
assert_eq!(request.method, "account/read");
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Response(JSONRPCResponse {
|
|
id: request.id,
|
|
result: serde_json::to_value(GetAccountResponse {
|
|
account: None,
|
|
requires_openai_auth: false,
|
|
})
|
|
.expect("response should serialize"),
|
|
}),
|
|
)
|
|
.await;
|
|
})
|
|
.await;
|
|
let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let response: GetAccountResponse = client
|
|
.request_typed(ClientRequest::GetAccount {
|
|
request_id: RequestId::Integer(1),
|
|
params: codex_app_server_protocol::GetAccountParams {
|
|
refresh_token: false,
|
|
},
|
|
})
|
|
.await
|
|
.expect("typed request should succeed");
|
|
assert_eq!(response.account, None);
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_duplicate_request_id_keeps_original_waiter() {
|
|
let (first_request_seen_tx, first_request_seen_rx) = tokio::sync::oneshot::channel();
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected account/read request");
|
|
};
|
|
assert_eq!(request.method, "account/read");
|
|
first_request_seen_tx
|
|
.send(request.id.clone())
|
|
.expect("request id should send");
|
|
assert!(
|
|
timeout(
|
|
Duration::from_millis(100),
|
|
read_websocket_message(&mut websocket)
|
|
)
|
|
.await
|
|
.is_err(),
|
|
"duplicate request should not be forwarded to the server"
|
|
);
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Response(JSONRPCResponse {
|
|
id: request.id,
|
|
result: serde_json::to_value(GetAccountResponse {
|
|
account: None,
|
|
requires_openai_auth: false,
|
|
})
|
|
.expect("response should serialize"),
|
|
}),
|
|
)
|
|
.await;
|
|
let _ = websocket.next().await;
|
|
})
|
|
.await;
|
|
let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
let first_request_handle = client.request_handle();
|
|
let second_request_handle = first_request_handle.clone();
|
|
|
|
let first_request = tokio::spawn(async move {
|
|
first_request_handle
|
|
.request_typed::<GetAccountResponse>(ClientRequest::GetAccount {
|
|
request_id: RequestId::Integer(1),
|
|
params: codex_app_server_protocol::GetAccountParams {
|
|
refresh_token: false,
|
|
},
|
|
})
|
|
.await
|
|
});
|
|
|
|
let first_request_id = first_request_seen_rx
|
|
.await
|
|
.expect("server should observe the first request");
|
|
assert_eq!(first_request_id, RequestId::Integer(1));
|
|
|
|
let second_err = second_request_handle
|
|
.request_typed::<GetAccountResponse>(ClientRequest::GetAccount {
|
|
request_id: RequestId::Integer(1),
|
|
params: codex_app_server_protocol::GetAccountParams {
|
|
refresh_token: false,
|
|
},
|
|
})
|
|
.await
|
|
.expect_err("duplicate request id should be rejected");
|
|
assert_eq!(
|
|
second_err.to_string(),
|
|
"account/read transport error: duplicate remote app-server request id `1`"
|
|
);
|
|
|
|
let first_response = first_request
|
|
.await
|
|
.expect("first request task should join")
|
|
.expect("first request should succeed");
|
|
assert_eq!(
|
|
first_response,
|
|
GetAccountResponse {
|
|
account: None,
|
|
requires_openai_auth: false,
|
|
}
|
|
);
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_notifications_arrive_over_websocket() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Notification(
|
|
serde_json::from_value(
|
|
serde_json::to_value(ServerNotification::AccountUpdated(
|
|
AccountUpdatedNotification {
|
|
auth_mode: None,
|
|
plan_type: None,
|
|
},
|
|
))
|
|
.expect("notification should serialize"),
|
|
)
|
|
.expect("notification should convert to JSON-RPC"),
|
|
),
|
|
)
|
|
.await;
|
|
})
|
|
.await;
|
|
let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let event = client.next_event().await.expect("event should arrive");
|
|
assert!(matches!(
|
|
event,
|
|
AppServerEvent::ServerNotification(ServerNotification::AccountUpdated(_))
|
|
));
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_server_request_resolution_roundtrip_works() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
let request_id = RequestId::String("srv-1".to_string());
|
|
let server_request = JSONRPCRequest {
|
|
id: request_id.clone(),
|
|
method: "item/tool/requestUserInput".to_string(),
|
|
params: Some(
|
|
serde_json::to_value(ToolRequestUserInputParams {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
item_id: "call-1".to_string(),
|
|
questions: vec![ToolRequestUserInputQuestion {
|
|
id: "question-1".to_string(),
|
|
header: "Mode".to_string(),
|
|
question: "Pick one".to_string(),
|
|
is_other: false,
|
|
is_secret: false,
|
|
options: Some(vec![]),
|
|
}],
|
|
})
|
|
.expect("params should serialize"),
|
|
),
|
|
trace: None,
|
|
};
|
|
write_websocket_message(&mut websocket, JSONRPCMessage::Request(server_request)).await;
|
|
|
|
let JSONRPCMessage::Response(response) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected server request response");
|
|
};
|
|
assert_eq!(response.id, request_id);
|
|
})
|
|
.await;
|
|
let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let AppServerEvent::ServerRequest(request) = client
|
|
.next_event()
|
|
.await
|
|
.expect("request event should arrive")
|
|
else {
|
|
panic!("expected server request event");
|
|
};
|
|
client
|
|
.resolve_server_request(request.id().clone(), serde_json::json!({}))
|
|
.await
|
|
.expect("server request should resolve");
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_server_request_received_during_initialize_is_delivered() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected initialize request");
|
|
};
|
|
assert_eq!(request.method, "initialize");
|
|
|
|
let request_id = RequestId::String("srv-init".to_string());
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Request(JSONRPCRequest {
|
|
id: request_id.clone(),
|
|
method: "item/tool/requestUserInput".to_string(),
|
|
params: Some(
|
|
serde_json::to_value(ToolRequestUserInputParams {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
item_id: "call-1".to_string(),
|
|
questions: vec![ToolRequestUserInputQuestion {
|
|
id: "question-1".to_string(),
|
|
header: "Mode".to_string(),
|
|
question: "Pick one".to_string(),
|
|
is_other: false,
|
|
is_secret: false,
|
|
options: Some(vec![]),
|
|
}],
|
|
})
|
|
.expect("params should serialize"),
|
|
),
|
|
trace: None,
|
|
}),
|
|
)
|
|
.await;
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Response(JSONRPCResponse {
|
|
id: request.id,
|
|
result: serde_json::json!({}),
|
|
}),
|
|
)
|
|
.await;
|
|
|
|
let JSONRPCMessage::Notification(notification) =
|
|
read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected initialized notification");
|
|
};
|
|
assert_eq!(notification.method, "initialized");
|
|
|
|
let JSONRPCMessage::Response(response) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected server request response");
|
|
};
|
|
assert_eq!(response.id, request_id);
|
|
})
|
|
.await;
|
|
let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let AppServerEvent::ServerRequest(request) = client
|
|
.next_event()
|
|
.await
|
|
.expect("request event should arrive")
|
|
else {
|
|
panic!("expected server request event");
|
|
};
|
|
client
|
|
.resolve_server_request(request.id().clone(), serde_json::json!({}))
|
|
.await
|
|
.expect("server request should resolve");
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_unknown_server_request_is_rejected() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
let request_id = RequestId::String("srv-unknown".to_string());
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Request(JSONRPCRequest {
|
|
id: request_id.clone(),
|
|
method: "thread/unknown".to_string(),
|
|
params: None,
|
|
trace: None,
|
|
}),
|
|
)
|
|
.await;
|
|
|
|
let JSONRPCMessage::Error(response) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected JSON-RPC error response");
|
|
};
|
|
assert_eq!(response.id, request_id);
|
|
assert_eq!(response.error.code, -32601);
|
|
assert_eq!(
|
|
response.error.message,
|
|
"unsupported remote app-server request `thread/unknown`"
|
|
);
|
|
})
|
|
.await;
|
|
let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_disconnect_surfaces_as_event() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
websocket.close(None).await.expect("close should succeed");
|
|
})
|
|
.await;
|
|
let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let event = client
|
|
.next_event()
|
|
.await
|
|
.expect("disconnect event should arrive");
|
|
assert!(matches!(event, AppServerEvent::Disconnected { .. }));
|
|
}
|
|
|
|
/// Checks which [`TypedRequestError`] variants report an underlying cause
/// through `std::error::Error::source`: the `Transport` and `Deserialize`
/// variants are expected to, the `Server` variant is not.
#[test]
fn typed_request_error_exposes_sources() {
    let transport = TypedRequestError::Transport {
        method: "config/read".to_string(),
        source: IoError::new(ErrorKind::BrokenPipe, "closed"),
    };
    // Assert on the boolean directly instead of comparing it to a literal.
    assert!(
        std::error::Error::source(&transport).is_some(),
        "transport variant should report a source"
    );

    let server = TypedRequestError::Server {
        method: "thread/read".to_string(),
        source: JSONRPCErrorError {
            code: -32603,
            data: None,
            message: "internal".to_string(),
        },
    };
    assert!(
        std::error::Error::source(&server).is_none(),
        "server variant should not report a source"
    );

    let deserialize = TypedRequestError::Deserialize {
        method: "thread/start".to_string(),
        source: serde_json::from_str::<u32>("\"nope\"")
            .expect_err("invalid integer should return deserialize error"),
    };
    assert!(
        std::error::Error::source(&deserialize).is_some(),
        "deserialize variant should report a source"
    );
}
|
|
|
|
#[tokio::test]
|
|
async fn next_event_surfaces_lagged_markers() {
|
|
let (command_tx, _command_rx) = mpsc::channel(1);
|
|
let (event_tx, event_rx) = mpsc::channel(1);
|
|
let worker_handle = tokio::spawn(async {});
|
|
let config = build_test_config().await;
|
|
let auth_manager = AuthManager::shared(
|
|
config.codex_home.clone(),
|
|
false,
|
|
config.cli_auth_credentials_store_mode,
|
|
);
|
|
let thread_manager = Arc::new(ThreadManager::new(
|
|
&config,
|
|
auth_manager.clone(),
|
|
SessionSource::Exec,
|
|
CollaborationModesConfig {
|
|
default_mode_request_user_input: config
|
|
.features
|
|
.enabled(codex_core::features::Feature::DefaultModeRequestUserInput),
|
|
},
|
|
));
|
|
event_tx
|
|
.send(InProcessServerEvent::Lagged { skipped: 3 })
|
|
.await
|
|
.expect("lagged marker should enqueue");
|
|
drop(event_tx);
|
|
|
|
let mut client = InProcessAppServerClient {
|
|
command_tx,
|
|
event_rx,
|
|
worker_handle,
|
|
auth_manager,
|
|
thread_manager,
|
|
};
|
|
|
|
let event = timeout(Duration::from_secs(2), client.next_event())
|
|
.await
|
|
.expect("lagged marker should arrive before timeout");
|
|
assert!(matches!(
|
|
event,
|
|
Some(InProcessServerEvent::Lagged { skipped: 3 })
|
|
));
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
/// `event_requires_delivery` should be true for a `TurnCompleted` server
/// notification and for a legacy `codex/event/turn_aborted` notification,
/// and false for a `Lagged` marker.
#[test]
fn event_requires_delivery_marks_terminal_events() {
    let turn_completed = InProcessServerEvent::ServerNotification(
        codex_app_server_protocol::ServerNotification::TurnCompleted(
            codex_app_server_protocol::TurnCompletedNotification {
                thread_id: "thread".to_string(),
                turn: codex_app_server_protocol::Turn {
                    id: "turn".to_string(),
                    items: Vec::new(),
                    status: codex_app_server_protocol::TurnStatus::Completed,
                    error: None,
                },
            },
        ),
    );
    assert!(event_requires_delivery(&turn_completed));

    let legacy_turn_aborted =
        InProcessServerEvent::LegacyNotification(codex_app_server_protocol::JSONRPCNotification {
            method: "codex/event/turn_aborted".to_string(),
            params: None,
        });
    assert!(event_requires_delivery(&legacy_turn_aborted));

    let lagged = InProcessServerEvent::Lagged { skipped: 1 };
    assert!(!event_requires_delivery(&lagged));
}
|
|
|
|
#[tokio::test]
|
|
async fn accessors_expose_retained_shared_managers() {
|
|
let client = start_test_client(SessionSource::Cli).await;
|
|
|
|
assert!(
|
|
Arc::ptr_eq(&client.auth_manager(), &client.auth_manager()),
|
|
"auth_manager accessor should clone the retained shared manager"
|
|
);
|
|
assert!(
|
|
Arc::ptr_eq(&client.thread_manager(), &client.thread_manager()),
|
|
"thread_manager accessor should clone the retained shared manager"
|
|
);
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn shutdown_completes_promptly_with_retained_shared_managers() {
|
|
let client = start_test_client(SessionSource::Cli).await;
|
|
|
|
timeout(Duration::from_secs(1), client.shutdown())
|
|
.await
|
|
.expect("shutdown should not wait for the 5s fallback timeout")
|
|
.expect("shutdown should complete");
|
|
}
|
|
}
|