mirror of
https://github.com/openai/codex.git
synced 2026-04-30 03:12:20 +03:00
Add in-process app server and wire up exec to use it (#14005)
This is a subset of PR #13636. See that PR for a full overview of the architectural change. This PR implements the in-process app server and modifies the non-interactive "exec" entry point to use the app server. --------- Co-authored-by: Felipe Coury <felipe.coury@gmail.com>
This commit is contained in:
884
codex-rs/app-server/src/in_process.rs
Normal file
884
codex-rs/app-server/src/in_process.rs
Normal file
@@ -0,0 +1,884 @@
|
||||
//! In-process app-server runtime host for local embedders.
|
||||
//!
|
||||
//! This module runs the existing [`MessageProcessor`] and outbound routing logic
|
||||
//! on Tokio tasks, but replaces socket/stdio transports with bounded in-memory
|
||||
//! channels. The intent is to preserve app-server semantics while avoiding a
|
||||
//! process boundary for CLI surfaces that run in the same process.
|
||||
//!
|
||||
//! # Lifecycle
|
||||
//!
|
||||
//! 1. Construct runtime state with [`InProcessStartArgs`].
|
||||
//! 2. Call [`start`], which performs the `initialize` / `initialized` handshake
|
||||
//! internally and returns a ready-to-use [`InProcessClientHandle`].
|
||||
//! 3. Send requests via [`InProcessClientHandle::request`], notifications via
|
||||
//! [`InProcessClientHandle::notify`], and consume events via
|
||||
//! [`InProcessClientHandle::next_event`].
|
||||
//! 4. Terminate with [`InProcessClientHandle::shutdown`].
|
||||
//!
|
||||
//! # Transport model
|
||||
//!
|
||||
//! The runtime is transport-local but not protocol-free. Incoming requests are
|
||||
//! typed [`ClientRequest`] values, yet responses still come back through the
|
||||
//! same JSON-RPC result envelope that `MessageProcessor` uses for stdio and
|
||||
//! websocket transports. This keeps in-process behavior aligned with
|
||||
//! app-server rather than creating a second execution contract.
|
||||
//!
|
||||
//! # Backpressure
|
||||
//!
|
||||
//! Command submission uses `try_send` and can return `WouldBlock`, while event
|
||||
//! fanout may drop notifications under saturation. Server requests are never
|
||||
//! silently abandoned: if they cannot be queued they are failed back into
|
||||
//! `MessageProcessor` with overload or internal errors so approval flows do
|
||||
//! not hang indefinitely.
|
||||
//!
|
||||
//! # Relationship to `codex-app-server-client`
|
||||
//!
|
||||
//! This module provides the low-level runtime handle ([`InProcessClientHandle`]).
|
||||
//! Higher-level callers (TUI, exec) should go through `codex-app-server-client`,
|
||||
//! which wraps this module behind a worker task with async request/response
|
||||
//! helpers, surface-specific startup policy, and bounded shutdown.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::io::Error as IoError;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Result as IoResult;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::error_code::OVERLOADED_ERROR_CODE;
|
||||
use crate::message_processor::ConnectionSessionState;
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::message_processor::MessageProcessorArgs;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingEnvelope;
|
||||
use crate::outgoing_message::OutgoingMessage;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::transport::CHANNEL_CAPACITY;
|
||||
use crate::transport::OutboundConnectionState;
|
||||
use crate::transport::route_outgoing_envelope;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::warn;
|
||||
|
||||
const IN_PROCESS_CONNECTION_ID: ConnectionId = ConnectionId(0);
|
||||
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
/// Default bounded channel capacity for in-process runtime queues.
|
||||
pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY;
|
||||
|
||||
type PendingClientRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
|
||||
|
||||
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
|
||||
matches!(notification, ServerNotification::TurnCompleted(_))
|
||||
}
|
||||
|
||||
fn legacy_notification_requires_delivery(notification: &JSONRPCNotification) -> bool {
|
||||
matches!(
|
||||
notification
|
||||
.method
|
||||
.strip_prefix("codex/event/")
|
||||
.unwrap_or(¬ification.method),
|
||||
"task_complete" | "turn_aborted" | "shutdown_complete"
|
||||
)
|
||||
}
|
||||
|
||||
/// Input needed to start an in-process app-server runtime.
|
||||
///
|
||||
/// These fields mirror the pieces of ambient process state that stdio and
|
||||
/// websocket transports normally assemble before `MessageProcessor` starts.
|
||||
#[derive(Clone)]
|
||||
pub struct InProcessStartArgs {
|
||||
/// Resolved argv0 dispatch paths used by command execution internals.
|
||||
pub arg0_paths: Arg0DispatchPaths,
|
||||
/// Shared base config used to initialize core components.
|
||||
pub config: Arc<Config>,
|
||||
/// CLI config overrides that are already parsed into TOML values.
|
||||
pub cli_overrides: Vec<(String, TomlValue)>,
|
||||
/// Loader override knobs used by config API paths.
|
||||
pub loader_overrides: LoaderOverrides,
|
||||
/// Preloaded cloud requirements provider.
|
||||
pub cloud_requirements: CloudRequirementsLoader,
|
||||
/// Feedback sink used by app-server/core telemetry and logs.
|
||||
pub feedback: CodexFeedback,
|
||||
/// Startup warnings emitted after initialize succeeds.
|
||||
pub config_warnings: Vec<ConfigWarningNotification>,
|
||||
/// Session source stamped into thread/session metadata.
|
||||
pub session_source: SessionSource,
|
||||
/// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
|
||||
pub enable_codex_api_key_env: bool,
|
||||
/// Initialize params used for initial handshake.
|
||||
pub initialize: InitializeParams,
|
||||
/// Capacity used for all runtime queues (clamped to at least 1).
|
||||
pub channel_capacity: usize,
|
||||
}
|
||||
|
||||
/// Event emitted from the app-server to the in-process client.
|
||||
///
|
||||
/// The stream carries three event families because CLI surfaces are mid-migration
|
||||
/// from the legacy `codex_protocol::Event` model to the typed app-server
|
||||
/// notification model. Once all surfaces consume only [`ServerNotification`],
|
||||
/// [`LegacyNotification`](Self::LegacyNotification) can be removed.
|
||||
///
|
||||
/// [`Lagged`](Self::Lagged) is a transport health marker, not an application
|
||||
/// event — it signals that the consumer fell behind and some events were dropped.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum InProcessServerEvent {
|
||||
/// Server request that requires client response/rejection.
|
||||
ServerRequest(ServerRequest),
|
||||
/// App-server notification directed to the embedded client.
|
||||
ServerNotification(ServerNotification),
|
||||
/// Legacy JSON-RPC notification from core event bridge.
|
||||
LegacyNotification(JSONRPCNotification),
|
||||
/// Indicates one or more events were dropped due to backpressure.
|
||||
Lagged { skipped: usize },
|
||||
}
|
||||
|
||||
/// Internal message sent from [`InProcessClientHandle`] methods to the runtime task.
|
||||
///
|
||||
/// Requests carry a oneshot sender for the response; notifications and server-request
|
||||
/// replies are fire-and-forget from the caller's perspective (transport errors are
|
||||
/// caught by `try_send` on the outer channel).
|
||||
enum InProcessClientMessage {
|
||||
Request {
|
||||
request: Box<ClientRequest>,
|
||||
response_tx: oneshot::Sender<PendingClientRequestResponse>,
|
||||
},
|
||||
Notification {
|
||||
notification: ClientNotification,
|
||||
},
|
||||
ServerRequestResponse {
|
||||
request_id: RequestId,
|
||||
result: Result,
|
||||
},
|
||||
ServerRequestError {
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
},
|
||||
Shutdown {
|
||||
done_tx: oneshot::Sender<()>,
|
||||
},
|
||||
}
|
||||
|
||||
enum ProcessorCommand {
|
||||
Request(Box<ClientRequest>),
|
||||
Notification(ClientNotification),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InProcessClientSender {
|
||||
client_tx: mpsc::Sender<InProcessClientMessage>,
|
||||
}
|
||||
|
||||
impl InProcessClientSender {
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.try_send_client_message(InProcessClientMessage::Request {
|
||||
request: Box::new(request),
|
||||
response_tx,
|
||||
})?;
|
||||
response_rx.await.map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
format!("in-process request response channel closed: {err}"),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
||||
self.try_send_client_message(InProcessClientMessage::Notification { notification })
|
||||
}
|
||||
|
||||
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
|
||||
self.try_send_client_message(InProcessClientMessage::ServerRequestResponse {
|
||||
request_id,
|
||||
result,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn fail_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
) -> IoResult<()> {
|
||||
self.try_send_client_message(InProcessClientMessage::ServerRequestError {
|
||||
request_id,
|
||||
error,
|
||||
})
|
||||
}
|
||||
|
||||
fn try_send_client_message(&self, message: InProcessClientMessage) -> IoResult<()> {
|
||||
match self.client_tx.try_send(message) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(mpsc::error::TrySendError::Full(_)) => Err(IoError::new(
|
||||
ErrorKind::WouldBlock,
|
||||
"in-process app-server client queue is full",
|
||||
)),
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"in-process app-server runtime is closed",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle used by an in-process client to call app-server and consume events.
|
||||
///
|
||||
/// This is the low-level runtime handle. Higher-level callers should usually go
|
||||
/// through `codex-app-server-client`, which adds worker-task buffering,
|
||||
/// request/response helpers, and surface-specific startup policy.
|
||||
pub struct InProcessClientHandle {
|
||||
client: InProcessClientSender,
|
||||
event_rx: mpsc::Receiver<InProcessServerEvent>,
|
||||
runtime_handle: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl InProcessClientHandle {
|
||||
/// Sends a typed client request into the in-process runtime.
|
||||
///
|
||||
/// The returned value is a transport-level `IoResult` containing either a
|
||||
/// JSON-RPC success payload or JSON-RPC error payload. Callers must keep
|
||||
/// request IDs unique among concurrent requests; reusing an in-flight ID
|
||||
/// produces an `INVALID_REQUEST` response and can make request routing
|
||||
/// ambiguous in the caller.
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
|
||||
self.client.request(request).await
|
||||
}
|
||||
|
||||
/// Sends a typed client notification into the in-process runtime.
|
||||
///
|
||||
/// Notifications do not have an application-level response. Transport
|
||||
/// errors indicate queue saturation or closed runtime.
|
||||
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
||||
self.client.notify(notification)
|
||||
}
|
||||
|
||||
/// Resolves a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
|
||||
///
|
||||
/// This should be used only with request IDs received from the current
|
||||
/// runtime event stream; sending arbitrary IDs has no effect on app-server
|
||||
/// state and can mask a stuck approval flow in the caller.
|
||||
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
|
||||
self.client.respond_to_server_request(request_id, result)
|
||||
}
|
||||
|
||||
/// Rejects a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
|
||||
///
|
||||
/// Use this when the embedder cannot satisfy a server request; leaving
|
||||
/// requests unanswered can stall turn progress.
|
||||
pub fn fail_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
) -> IoResult<()> {
|
||||
self.client.fail_server_request(request_id, error)
|
||||
}
|
||||
|
||||
/// Receives the next server event from the in-process runtime.
|
||||
///
|
||||
/// Returns `None` when the runtime task exits and no more events are
|
||||
/// available.
|
||||
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
|
||||
self.event_rx.recv().await
|
||||
}
|
||||
|
||||
/// Requests runtime shutdown and waits for worker termination.
|
||||
///
|
||||
/// Shutdown is bounded by internal timeouts and may abort background tasks
|
||||
/// if graceful drain does not complete in time.
|
||||
pub async fn shutdown(self) -> IoResult<()> {
|
||||
let mut runtime_handle = self.runtime_handle;
|
||||
let (done_tx, done_rx) = oneshot::channel();
|
||||
|
||||
if self
|
||||
.client
|
||||
.client_tx
|
||||
.send(InProcessClientMessage::Shutdown { done_tx })
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
let _ = timeout(SHUTDOWN_TIMEOUT, done_rx).await;
|
||||
}
|
||||
|
||||
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut runtime_handle).await {
|
||||
runtime_handle.abort();
|
||||
let _ = runtime_handle.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn sender(&self) -> InProcessClientSender {
|
||||
self.client.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts an in-process app-server runtime and performs initialize handshake.
|
||||
///
|
||||
/// This function sends `initialize` followed by `initialized` before returning
|
||||
/// the handle, so callers receive a ready-to-use runtime. If initialize fails,
|
||||
/// the runtime is shut down and an `InvalidData` error is returned.
|
||||
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
|
||||
let initialize = args.initialize.clone();
|
||||
let client = start_uninitialized(args);
|
||||
|
||||
let initialize_response = client
|
||||
.request(ClientRequest::Initialize {
|
||||
request_id: RequestId::Integer(0),
|
||||
params: initialize,
|
||||
})
|
||||
.await?;
|
||||
if let Err(error) = initialize_response {
|
||||
let _ = client.shutdown().await;
|
||||
return Err(IoError::new(
|
||||
ErrorKind::InvalidData,
|
||||
format!("in-process initialize failed: {}", error.message),
|
||||
));
|
||||
}
|
||||
client.notify(ClientNotification::Initialized)?;
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
|
||||
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
||||
|
||||
let runtime_handle = tokio::spawn(async move {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<OutgoingMessage>(channel_capacity);
|
||||
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
||||
let outbound_experimental_api_enabled = Arc::new(AtomicBool::new(false));
|
||||
let outbound_opted_out_notification_methods = Arc::new(RwLock::new(HashSet::new()));
|
||||
|
||||
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
|
||||
outbound_connections.insert(
|
||||
IN_PROCESS_CONNECTION_ID,
|
||||
OutboundConnectionState::new(
|
||||
writer_tx,
|
||||
Arc::clone(&outbound_initialized),
|
||||
Arc::clone(&outbound_experimental_api_enabled),
|
||||
Arc::clone(&outbound_opted_out_notification_methods),
|
||||
None,
|
||||
),
|
||||
);
|
||||
let mut outbound_handle = tokio::spawn(async move {
|
||||
while let Some(envelope) = outgoing_rx.recv().await {
|
||||
route_outgoing_envelope(&mut outbound_connections, envelope).await;
|
||||
}
|
||||
});
|
||||
|
||||
let processor_outgoing = Arc::clone(&outgoing_message_sender);
|
||||
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
|
||||
let mut processor_handle = tokio::spawn(async move {
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: Arc::clone(&processor_outgoing),
|
||||
arg0_paths: args.arg0_paths,
|
||||
config: args.config,
|
||||
cli_overrides: args.cli_overrides,
|
||||
loader_overrides: args.loader_overrides,
|
||||
cloud_requirements: args.cloud_requirements,
|
||||
feedback: args.feedback,
|
||||
log_db: None,
|
||||
config_warnings: args.config_warnings,
|
||||
session_source: args.session_source,
|
||||
enable_codex_api_key_env: args.enable_codex_api_key_env,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let mut session = ConnectionSessionState::default();
|
||||
let mut listen_for_threads = true;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
command = processor_rx.recv() => {
|
||||
match command {
|
||||
Some(ProcessorCommand::Request(request)) => {
|
||||
let was_initialized = session.initialized;
|
||||
processor
|
||||
.process_client_request(
|
||||
IN_PROCESS_CONNECTION_ID,
|
||||
*request,
|
||||
&mut session,
|
||||
&outbound_initialized,
|
||||
)
|
||||
.await;
|
||||
if let Ok(mut opted_out_notification_methods) =
|
||||
outbound_opted_out_notification_methods.write()
|
||||
{
|
||||
*opted_out_notification_methods =
|
||||
session.opted_out_notification_methods.clone();
|
||||
} else {
|
||||
warn!("failed to update outbound opted-out notifications");
|
||||
}
|
||||
outbound_experimental_api_enabled.store(
|
||||
session.experimental_api_enabled,
|
||||
Ordering::Release,
|
||||
);
|
||||
if !was_initialized && session.initialized {
|
||||
processor.send_initialize_notifications().await;
|
||||
}
|
||||
}
|
||||
Some(ProcessorCommand::Notification(notification)) => {
|
||||
processor.process_client_notification(notification).await;
|
||||
}
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
created = thread_created_rx.recv(), if listen_for_threads => {
|
||||
match created {
|
||||
Ok(thread_id) => {
|
||||
let connection_ids = if session.initialized {
|
||||
vec![IN_PROCESS_CONNECTION_ID]
|
||||
} else {
|
||||
Vec::<ConnectionId>::new()
|
||||
};
|
||||
processor
|
||||
.try_attach_thread_listener(thread_id, connection_ids)
|
||||
.await;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
||||
warn!("thread_created receiver lagged; skipping resync");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
listen_for_threads = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
processor.connection_closed(IN_PROCESS_CONNECTION_ID).await;
|
||||
});
|
||||
let mut pending_request_responses =
|
||||
HashMap::<RequestId, oneshot::Sender<PendingClientRequestResponse>>::new();
|
||||
let mut shutdown_ack = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
message = client_rx.recv() => {
|
||||
match message {
|
||||
Some(InProcessClientMessage::Request { request, response_tx }) => {
|
||||
let request = *request;
|
||||
let request_id = request.id().clone();
|
||||
match pending_request_responses.entry(request_id.clone()) {
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(response_tx);
|
||||
}
|
||||
Entry::Occupied(_) => {
|
||||
let _ = response_tx.send(Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("duplicate request id: {request_id:?}"),
|
||||
data: None,
|
||||
}));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
match processor_tx.try_send(ProcessorCommand::Request(Box::new(request))) {
|
||||
Ok(()) => {}
|
||||
Err(mpsc::error::TrySendError::Full(_)) => {
|
||||
if let Some(response_tx) =
|
||||
pending_request_responses.remove(&request_id)
|
||||
{
|
||||
let _ = response_tx.send(Err(JSONRPCErrorError {
|
||||
code: OVERLOADED_ERROR_CODE,
|
||||
message: "in-process app-server request queue is full"
|
||||
.to_string(),
|
||||
data: None,
|
||||
}));
|
||||
}
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => {
|
||||
if let Some(response_tx) =
|
||||
pending_request_responses.remove(&request_id)
|
||||
{
|
||||
let _ = response_tx.send(Err(JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message:
|
||||
"in-process app-server request processor is closed"
|
||||
.to_string(),
|
||||
data: None,
|
||||
}));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(InProcessClientMessage::Notification { notification }) => {
|
||||
match processor_tx.try_send(ProcessorCommand::Notification(notification)) {
|
||||
Ok(()) => {}
|
||||
Err(mpsc::error::TrySendError::Full(_)) => {
|
||||
warn!("dropping in-process client notification (queue full)");
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => {
|
||||
outgoing_message_sender
|
||||
.notify_client_response(request_id, result)
|
||||
.await;
|
||||
}
|
||||
Some(InProcessClientMessage::ServerRequestError { request_id, error }) => {
|
||||
outgoing_message_sender
|
||||
.notify_client_error(request_id, error)
|
||||
.await;
|
||||
}
|
||||
Some(InProcessClientMessage::Shutdown { done_tx }) => {
|
||||
shutdown_ack = Some(done_tx);
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
outgoing_message = writer_rx.recv() => {
|
||||
let Some(outgoing_message) = outgoing_message else {
|
||||
break;
|
||||
};
|
||||
match outgoing_message {
|
||||
OutgoingMessage::Response(response) => {
|
||||
if let Some(response_tx) = pending_request_responses.remove(&response.id) {
|
||||
let _ = response_tx.send(Ok(response.result));
|
||||
} else {
|
||||
warn!(
|
||||
request_id = ?response.id,
|
||||
"dropping unmatched in-process response"
|
||||
);
|
||||
}
|
||||
}
|
||||
OutgoingMessage::Error(error) => {
|
||||
if let Some(response_tx) = pending_request_responses.remove(&error.id) {
|
||||
let _ = response_tx.send(Err(error.error));
|
||||
} else {
|
||||
warn!(
|
||||
request_id = ?error.id,
|
||||
"dropping unmatched in-process error response"
|
||||
);
|
||||
}
|
||||
}
|
||||
OutgoingMessage::Request(request) => {
|
||||
// Send directly to avoid cloning; on failure the
|
||||
// original value is returned inside the error.
|
||||
if let Err(send_error) = event_tx
|
||||
.try_send(InProcessServerEvent::ServerRequest(request))
|
||||
{
|
||||
let (code, message, inner) = match send_error {
|
||||
mpsc::error::TrySendError::Full(inner) => (
|
||||
OVERLOADED_ERROR_CODE,
|
||||
"in-process server request queue is full",
|
||||
inner,
|
||||
),
|
||||
mpsc::error::TrySendError::Closed(inner) => (
|
||||
INTERNAL_ERROR_CODE,
|
||||
"in-process server request consumer is closed",
|
||||
inner,
|
||||
),
|
||||
};
|
||||
let request_id = match inner {
|
||||
InProcessServerEvent::ServerRequest(req) => req.id().clone(),
|
||||
_ => unreachable!("we just sent a ServerRequest variant"),
|
||||
};
|
||||
outgoing_message_sender
|
||||
.notify_client_error(
|
||||
request_id,
|
||||
JSONRPCErrorError {
|
||||
code,
|
||||
message: message.to_string(),
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
OutgoingMessage::AppServerNotification(notification) => {
|
||||
if server_notification_requires_delivery(¬ification) {
|
||||
if event_tx
|
||||
.send(InProcessServerEvent::ServerNotification(notification))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
} else if let Err(send_error) =
|
||||
event_tx.try_send(InProcessServerEvent::ServerNotification(notification))
|
||||
{
|
||||
match send_error {
|
||||
mpsc::error::TrySendError::Full(_) => {
|
||||
warn!("dropping in-process server notification (queue full)");
|
||||
}
|
||||
mpsc::error::TrySendError::Closed(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
OutgoingMessage::Notification(notification) => {
|
||||
let notification = JSONRPCNotification {
|
||||
method: notification.method,
|
||||
params: notification.params,
|
||||
};
|
||||
if legacy_notification_requires_delivery(¬ification) {
|
||||
if event_tx
|
||||
.send(InProcessServerEvent::LegacyNotification(notification))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
} else if let Err(send_error) =
|
||||
event_tx.try_send(InProcessServerEvent::LegacyNotification(notification))
|
||||
{
|
||||
match send_error {
|
||||
mpsc::error::TrySendError::Full(_) => {
|
||||
warn!("dropping in-process legacy notification (queue full)");
|
||||
}
|
||||
mpsc::error::TrySendError::Closed(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(writer_rx);
|
||||
drop(processor_tx);
|
||||
outgoing_message_sender
|
||||
.cancel_all_requests(Some(JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: "in-process app-server runtime is shutting down".to_string(),
|
||||
data: None,
|
||||
}))
|
||||
.await;
|
||||
// Drop the runtime's last sender before awaiting the router task so
|
||||
// `outgoing_rx.recv()` can observe channel closure and exit cleanly.
|
||||
drop(outgoing_message_sender);
|
||||
for (_, response_tx) in pending_request_responses {
|
||||
let _ = response_tx.send(Err(JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: "in-process app-server runtime is shutting down".to_string(),
|
||||
data: None,
|
||||
}));
|
||||
}
|
||||
|
||||
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut processor_handle).await {
|
||||
processor_handle.abort();
|
||||
let _ = processor_handle.await;
|
||||
}
|
||||
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut outbound_handle).await {
|
||||
outbound_handle.abort();
|
||||
let _ = outbound_handle.await;
|
||||
}
|
||||
|
||||
if let Some(done_tx) = shutdown_ack {
|
||||
let _ = done_tx.send(());
|
||||
}
|
||||
});
|
||||
|
||||
InProcessClientHandle {
|
||||
client: InProcessClientSender { client_tx },
|
||||
event_rx,
|
||||
runtime_handle,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ConfigRequirementsReadResponse;
|
||||
use codex_app_server_protocol::SessionSource as ApiSessionSource;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
async fn build_test_config() -> Config {
|
||||
match ConfigBuilder::default().build().await {
|
||||
Ok(config) => config,
|
||||
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
|
||||
.expect("default config should load"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_test_client_with_capacity(
|
||||
session_source: SessionSource,
|
||||
channel_capacity: usize,
|
||||
) -> InProcessClientHandle {
|
||||
let args = InProcessStartArgs {
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config: Arc::new(build_test_config().await),
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
feedback: CodexFeedback::new(),
|
||||
config_warnings: Vec::new(),
|
||||
session_source,
|
||||
enable_codex_api_key_env: false,
|
||||
initialize: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-in-process-test".to_string(),
|
||||
title: None,
|
||||
version: "0.0.0".to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
},
|
||||
channel_capacity,
|
||||
};
|
||||
start(args).await.expect("in-process runtime should start")
|
||||
}
|
||||
|
||||
async fn start_test_client(session_source: SessionSource) -> InProcessClientHandle {
|
||||
start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_process_start_initializes_and_handles_typed_v2_request() {
|
||||
let client = start_test_client(SessionSource::Cli).await;
|
||||
let response = client
|
||||
.request(ClientRequest::ConfigRequirementsRead {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: None,
|
||||
})
|
||||
.await
|
||||
.expect("request transport should work")
|
||||
.expect("request should succeed");
|
||||
assert!(response.is_object());
|
||||
|
||||
let _parsed: ConfigRequirementsReadResponse =
|
||||
serde_json::from_value(response).expect("response should match v2 schema");
|
||||
client
|
||||
.shutdown()
|
||||
.await
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_process_start_uses_requested_session_source_for_thread_start() {
|
||||
for (requested_source, expected_source) in [
|
||||
(SessionSource::Cli, ApiSessionSource::Cli),
|
||||
(SessionSource::Exec, ApiSessionSource::Exec),
|
||||
] {
|
||||
let client = start_test_client(requested_source).await;
|
||||
let response = client
|
||||
.request(ClientRequest::ThreadStart {
|
||||
request_id: RequestId::Integer(2),
|
||||
params: ThreadStartParams {
|
||||
ephemeral: Some(true),
|
||||
..ThreadStartParams::default()
|
||||
},
|
||||
})
|
||||
.await
|
||||
.expect("request transport should work")
|
||||
.expect("thread/start should succeed");
|
||||
let parsed: ThreadStartResponse =
|
||||
serde_json::from_value(response).expect("thread/start response should parse");
|
||||
assert_eq!(parsed.thread.source, expected_source);
|
||||
client
|
||||
.shutdown()
|
||||
.await
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_process_start_clamps_zero_channel_capacity() {
|
||||
let client = start_test_client_with_capacity(SessionSource::Cli, 0).await;
|
||||
let response = loop {
|
||||
match client
|
||||
.request(ClientRequest::ConfigRequirementsRead {
|
||||
request_id: RequestId::Integer(4),
|
||||
params: None,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(response) => break response.expect("request should succeed"),
|
||||
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Err(err) => panic!("request transport should work: {err}"),
|
||||
}
|
||||
};
|
||||
let _parsed: ConfigRequirementsReadResponse =
|
||||
serde_json::from_value(response).expect("response should match v2 schema");
|
||||
client
|
||||
.shutdown()
|
||||
.await
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
|
||||
assert!(server_notification_requires_delivery(
|
||||
&ServerNotification::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
})
|
||||
));
|
||||
|
||||
assert!(legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/task_complete".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
assert!(legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/turn_aborted".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
assert!(legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/shutdown_complete".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
assert!(!legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/item_started".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user