Compare commits

...

2 Commits

Author SHA1 Message Date
Ahmed Ibrahim
03d1bb9f03 Enable experimental API in request-order regression test
Co-authored-by: Codex <noreply@openai.com>
2026-03-17 20:55:59 +00:00
Ahmed Ibrahim
7b5094f288 Preserve in-process request ordering for app-server clients
Co-authored-by: Codex <noreply@openai.com>
2026-03-17 18:22:04 +00:00
2 changed files with 122 additions and 19 deletions

View File

@@ -352,14 +352,21 @@ impl InProcessAppServerClient {
command = command_rx.recv() => {
match command {
Some(ClientCommand::Request { request, response_tx }) => {
let request_sender = request_sender.clone();
// Request waits happen on a detached task so
// this loop can keep draining runtime events
// while the request is blocked on client input.
tokio::spawn(async move {
let result = request_sender.request(*request).await;
let _ = response_tx.send(result);
});
// Enqueue synchronously so the embedded runtime
// observes request order matching the caller's
// command order, then detach only the response
// wait so this loop can keep draining events.
match request_sender.start_request(*request) {
Ok(pending_request) => {
tokio::spawn(async move {
let result = pending_request.recv().await;
let _ = response_tx.send(result);
});
}
Err(err) => {
let _ = response_tx.send(Err(err));
}
}
}
Some(ClientCommand::Notify {
notification,

View File

@@ -92,7 +92,24 @@ const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Default bounded channel capacity for in-process runtime queues.
pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY;
type PendingClientRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
/// JSON-RPC application response returned by in-process client requests.
pub type InProcessRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
/// Handle for a request that has been enqueued but whose response has not yet been awaited.
pub struct PendingInProcessRequest {
response_rx: oneshot::Receiver<InProcessRequestResponse>,
}
impl PendingInProcessRequest {
pub async fn recv(self) -> IoResult<InProcessRequestResponse> {
self.response_rx.await.map_err(|err| {
IoError::new(
ErrorKind::BrokenPipe,
format!("in-process request response channel closed: {err}"),
)
})
}
}
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
matches!(notification, ServerNotification::TurnCompleted(_))
@@ -171,7 +188,7 @@ pub enum InProcessServerEvent {
enum InProcessClientMessage {
Request {
request: Box<ClientRequest>,
response_tx: oneshot::Sender<PendingClientRequestResponse>,
response_tx: oneshot::Sender<InProcessRequestResponse>,
},
Notification {
notification: ClientNotification,
@@ -200,18 +217,17 @@ pub struct InProcessClientSender {
}
impl InProcessClientSender {
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
pub fn start_request(&self, request: ClientRequest) -> IoResult<PendingInProcessRequest> {
let (response_tx, response_rx) = oneshot::channel();
self.try_send_client_message(InProcessClientMessage::Request {
request: Box::new(request),
response_tx,
})?;
response_rx.await.map_err(|err| {
IoError::new(
ErrorKind::BrokenPipe,
format!("in-process request response channel closed: {err}"),
)
})
Ok(PendingInProcessRequest { response_rx })
}
pub async fn request(&self, request: ClientRequest) -> IoResult<InProcessRequestResponse> {
self.start_request(request)?.recv().await
}
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
@@ -263,6 +279,11 @@ pub struct InProcessClientHandle {
}
impl InProcessClientHandle {
/// Enqueues a typed client request and returns a handle for awaiting the response later.
pub fn start_request(&self, request: ClientRequest) -> IoResult<PendingInProcessRequest> {
self.client.start_request(request)
}
/// Sends a typed client request into the in-process runtime.
///
/// The returned value is a transport-level `IoResult` containing either a
@@ -270,7 +291,7 @@ impl InProcessClientHandle {
/// request IDs unique among concurrent requests; reusing an in-flight ID
/// produces an `INVALID_REQUEST` response and can make request routing
/// ambiguous in the caller.
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
pub async fn request(&self, request: ClientRequest) -> IoResult<InProcessRequestResponse> {
self.client.request(request).await
}
@@ -490,7 +511,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
processor.shutdown_threads().await;
});
let mut pending_request_responses =
HashMap::<RequestId, oneshot::Sender<PendingClientRequestResponse>>::new();
HashMap::<RequestId, oneshot::Sender<InProcessRequestResponse>>::new();
let mut shutdown_ack = None;
loop {
@@ -732,6 +753,10 @@ mod tests {
use super::*;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ConfigRequirementsReadResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionStartParams;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateParams;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::SessionSource as ApiSessionSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
@@ -740,6 +765,7 @@ mod tests {
use codex_app_server_protocol::TurnStatus;
use codex_core::config::ConfigBuilder;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
async fn build_test_config() -> Config {
match ConfigBuilder::default().build().await {
@@ -857,6 +883,76 @@ mod tests {
.expect("in-process runtime should shutdown cleanly");
}
#[tokio::test]
async fn start_request_preserves_enqueue_order_for_dependent_requests() {
let client = start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(build_test_config().await),
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
auth_manager: None,
thread_manager: None,
feedback: CodexFeedback::new(),
config_warnings: Vec::new(),
session_source: SessionSource::Cli,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-in-process-test".to_string(),
title: None,
version: "0.0.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: true,
..Default::default()
}),
},
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await
.expect("in-process runtime should start");
let root = TempDir::new().expect("temp dir should be created");
std::fs::write(root.path().join("alpha.txt"), "contents")
.expect("fixture file should be written");
let root_path = root.path().to_string_lossy().to_string();
let start_request = client
.start_request(ClientRequest::FuzzyFileSearchSessionStart {
request_id: RequestId::Integer(10),
params: FuzzyFileSearchSessionStartParams {
session_id: "session-1".to_string(),
roots: vec![root_path],
},
})
.expect("session start should enqueue");
let update_request = client
.start_request(ClientRequest::FuzzyFileSearchSessionUpdate {
request_id: RequestId::Integer(11),
params: FuzzyFileSearchSessionUpdateParams {
session_id: "session-1".to_string(),
query: "alpha".to_string(),
},
})
.expect("session update should enqueue");
start_request
.recv()
.await
.expect("session start transport should succeed")
.expect("session start should succeed");
update_request
.recv()
.await
.expect("session update transport should succeed")
.expect("session update should succeed");
client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}
#[test]
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
assert!(server_notification_requires_delivery(