mirror of
https://github.com/openai/codex.git
synced 2026-04-30 11:21:34 +03:00
Preserve in-process request ordering for app-server clients
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -92,7 +92,24 @@ const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
/// Default bounded channel capacity for in-process runtime queues.
|
||||
pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY;
|
||||
|
||||
type PendingClientRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
|
||||
/// JSON-RPC application response returned by in-process client requests.
|
||||
pub type InProcessRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
|
||||
|
||||
/// Handle for a request that has been enqueued but whose response has not yet been awaited.
|
||||
pub struct PendingInProcessRequest {
|
||||
response_rx: oneshot::Receiver<InProcessRequestResponse>,
|
||||
}
|
||||
|
||||
impl PendingInProcessRequest {
|
||||
pub async fn recv(self) -> IoResult<InProcessRequestResponse> {
|
||||
self.response_rx.await.map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
format!("in-process request response channel closed: {err}"),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
|
||||
matches!(notification, ServerNotification::TurnCompleted(_))
|
||||
@@ -171,7 +188,7 @@ pub enum InProcessServerEvent {
|
||||
enum InProcessClientMessage {
|
||||
Request {
|
||||
request: Box<ClientRequest>,
|
||||
response_tx: oneshot::Sender<PendingClientRequestResponse>,
|
||||
response_tx: oneshot::Sender<InProcessRequestResponse>,
|
||||
},
|
||||
Notification {
|
||||
notification: ClientNotification,
|
||||
@@ -200,18 +217,17 @@ pub struct InProcessClientSender {
|
||||
}
|
||||
|
||||
impl InProcessClientSender {
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
|
||||
pub fn start_request(&self, request: ClientRequest) -> IoResult<PendingInProcessRequest> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.try_send_client_message(InProcessClientMessage::Request {
|
||||
request: Box::new(request),
|
||||
response_tx,
|
||||
})?;
|
||||
response_rx.await.map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
format!("in-process request response channel closed: {err}"),
|
||||
)
|
||||
})
|
||||
Ok(PendingInProcessRequest { response_rx })
|
||||
}
|
||||
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<InProcessRequestResponse> {
|
||||
self.start_request(request)?.recv().await
|
||||
}
|
||||
|
||||
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
||||
@@ -263,6 +279,11 @@ pub struct InProcessClientHandle {
|
||||
}
|
||||
|
||||
impl InProcessClientHandle {
|
||||
/// Enqueues a typed client request and returns a handle for awaiting the response later.
|
||||
pub fn start_request(&self, request: ClientRequest) -> IoResult<PendingInProcessRequest> {
|
||||
self.client.start_request(request)
|
||||
}
|
||||
|
||||
/// Sends a typed client request into the in-process runtime.
|
||||
///
|
||||
/// The returned value is a transport-level `IoResult` containing either a
|
||||
@@ -270,7 +291,7 @@ impl InProcessClientHandle {
|
||||
/// request IDs unique among concurrent requests; reusing an in-flight ID
|
||||
/// produces an `INVALID_REQUEST` response and can make request routing
|
||||
/// ambiguous in the caller.
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<InProcessRequestResponse> {
|
||||
self.client.request(request).await
|
||||
}
|
||||
|
||||
@@ -490,7 +511,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
processor.shutdown_threads().await;
|
||||
});
|
||||
let mut pending_request_responses =
|
||||
HashMap::<RequestId, oneshot::Sender<PendingClientRequestResponse>>::new();
|
||||
HashMap::<RequestId, oneshot::Sender<InProcessRequestResponse>>::new();
|
||||
let mut shutdown_ack = None;
|
||||
|
||||
loop {
|
||||
@@ -732,6 +753,8 @@ mod tests {
|
||||
use super::*;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ConfigRequirementsReadResponse;
|
||||
use codex_app_server_protocol::FuzzyFileSearchSessionStartParams;
|
||||
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateParams;
|
||||
use codex_app_server_protocol::SessionSource as ApiSessionSource;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
@@ -740,6 +763,7 @@ mod tests {
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
async fn build_test_config() -> Config {
|
||||
match ConfigBuilder::default().build().await {
|
||||
@@ -857,6 +881,50 @@ mod tests {
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_request_preserves_enqueue_order_for_dependent_requests() {
|
||||
let client = start_test_client(SessionSource::Cli).await;
|
||||
let root = TempDir::new().expect("temp dir should be created");
|
||||
std::fs::write(root.path().join("alpha.txt"), "contents")
|
||||
.expect("fixture file should be written");
|
||||
let root_path = root.path().to_string_lossy().to_string();
|
||||
|
||||
let start_request = client
|
||||
.start_request(ClientRequest::FuzzyFileSearchSessionStart {
|
||||
request_id: RequestId::Integer(10),
|
||||
params: FuzzyFileSearchSessionStartParams {
|
||||
session_id: "session-1".to_string(),
|
||||
roots: vec![root_path],
|
||||
},
|
||||
})
|
||||
.expect("session start should enqueue");
|
||||
let update_request = client
|
||||
.start_request(ClientRequest::FuzzyFileSearchSessionUpdate {
|
||||
request_id: RequestId::Integer(11),
|
||||
params: FuzzyFileSearchSessionUpdateParams {
|
||||
session_id: "session-1".to_string(),
|
||||
query: "alpha".to_string(),
|
||||
},
|
||||
})
|
||||
.expect("session update should enqueue");
|
||||
|
||||
start_request
|
||||
.recv()
|
||||
.await
|
||||
.expect("session start transport should succeed")
|
||||
.expect("session start should succeed");
|
||||
update_request
|
||||
.recv()
|
||||
.await
|
||||
.expect("session update transport should succeed")
|
||||
.expect("session update should succeed");
|
||||
|
||||
client
|
||||
.shutdown()
|
||||
.await
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
|
||||
assert!(server_notification_requires_delivery(
|
||||
|
||||
Reference in New Issue
Block a user