Compare commits

...

2 Commits

Author SHA1 Message Date
Eric Traut
591efdc95a codex: address PR review feedback (#16365) 2026-03-31 14:52:38 -06:00
Eric Traut
0e4c922344 Avoid blocking on app-server request resolution 2026-03-31 14:24:20 -06:00
2 changed files with 186 additions and 73 deletions

View File

@@ -342,12 +342,10 @@ enum ClientCommand {
ResolveServerRequest {
request_id: RequestId,
result: JsonRpcResult,
response_tx: oneshot::Sender<IoResult<()>>,
},
RejectServerRequest {
request_id: RequestId,
error: JSONRPCErrorError,
response_tx: oneshot::Sender<IoResult<()>>,
},
Shutdown {
response_tx: oneshot::Sender<IoResult<()>>,
@@ -428,19 +426,22 @@ impl InProcessAppServerClient {
Some(ClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
}) => {
let send_result =
request_sender.respond_to_server_request(request_id, result);
let _ = response_tx.send(send_result);
if let Err(err) =
request_sender.respond_to_server_request(request_id, result)
{
warn!("failed to resolve in-process server request: {err}");
}
}
Some(ClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
}) => {
let send_result = request_sender.fail_server_request(request_id, error);
let _ = response_tx.send(send_result);
if let Err(err) =
request_sender.fail_server_request(request_id, error)
{
warn!("failed to reject in-process server request: {err}");
}
}
Some(ClientCommand::Shutdown { response_tx }) => {
let shutdown_result = handle.shutdown().await;
@@ -595,61 +596,44 @@ impl InProcessAppServerClient {
/// Resolves a pending server request.
///
/// This should only be called with request IDs obtained from the current
/// client's event stream.
/// This returns after enqueueing the resolution so the caller can keep
/// draining `next_event()` even if the worker is currently blocked on a
/// must-deliver notification.
pub async fn resolve_server_request(
&self,
request_id: RequestId,
result: JsonRpcResult,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(ClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
})
.send(ClientCommand::ResolveServerRequest { request_id, result })
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server resolve channel is closed",
)
})?
})
}
/// Rejects a pending server request with JSON-RPC error payload.
///
/// This returns after enqueueing the rejection so the caller can keep
/// draining `next_event()` even if the worker is currently blocked on a
/// must-deliver notification.
pub async fn reject_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(ClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
})
.send(ClientCommand::RejectServerRequest { request_id, error })
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server reject channel is closed",
)
})?
})
}
/// Returns the next in-process event, or `None` when worker exits.
@@ -1264,6 +1248,67 @@ mod tests {
));
}
#[tokio::test]
async fn resolve_server_request_returns_after_enqueueing_command() {
let (command_tx, mut command_rx) = mpsc::channel(1);
let (_event_tx, event_rx) = mpsc::channel(1);
let client = InProcessAppServerClient {
command_tx,
event_rx,
worker_handle: tokio::spawn(async {}),
};
timeout(
Duration::from_secs(1),
client.resolve_server_request(RequestId::Integer(7), serde_json::json!({ "ok": true })),
)
.await
.expect("resolve should return before timeout")
.expect("resolve should enqueue successfully");
let Some(ClientCommand::ResolveServerRequest { request_id, result }) =
command_rx.recv().await
else {
panic!("expected resolve command");
};
assert_eq!(request_id, RequestId::Integer(7));
assert_eq!(result, serde_json::json!({ "ok": true }));
}
#[tokio::test]
async fn reject_server_request_returns_after_enqueueing_command() {
let (command_tx, mut command_rx) = mpsc::channel(1);
let (_event_tx, event_rx) = mpsc::channel(1);
let client = InProcessAppServerClient {
command_tx,
event_rx,
worker_handle: tokio::spawn(async {}),
};
let error = JSONRPCErrorError {
code: -32000,
message: "declined".to_string(),
data: None,
};
timeout(
Duration::from_secs(1),
client.reject_server_request(RequestId::Integer(9), error.clone()),
)
.await
.expect("reject should return before timeout")
.expect("reject should enqueue successfully");
let Some(ClientCommand::RejectServerRequest {
request_id,
error: queued_error,
}) = command_rx.recv().await
else {
panic!("expected reject command");
};
assert_eq!(request_id, RequestId::Integer(9));
assert_eq!(queued_error, error);
}
#[tokio::test]
async fn remote_typed_request_roundtrip_works() {
let websocket_url = start_test_remote_server(|mut websocket| async move {

View File

@@ -111,12 +111,10 @@ enum RemoteClientCommand {
ResolveServerRequest {
request_id: RequestId,
result: JsonRpcResult,
response_tx: oneshot::Sender<IoResult<()>>,
},
RejectServerRequest {
request_id: RequestId,
error: JSONRPCErrorError,
response_tx: oneshot::Sender<IoResult<()>>,
},
Shutdown {
response_tx: oneshot::Sender<IoResult<()>>,
@@ -255,9 +253,8 @@ impl RemoteAppServerClient {
RemoteClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
} => {
let result = write_jsonrpc_message(
if let Err(err) = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Response(JSONRPCResponse {
id: request_id,
@@ -265,15 +262,28 @@ impl RemoteAppServerClient {
}),
&websocket_url,
)
.await;
let _ = response_tx.send(result);
.await
{
let err_message = err.to_string();
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` write failed: {err_message}"
),
},
&mut stream,
)
.await;
break;
}
}
RemoteClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
} => {
let result = write_jsonrpc_message(
if let Err(err) = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Error(JSONRPCError {
error,
@@ -281,8 +291,22 @@ impl RemoteAppServerClient {
}),
&websocket_url,
)
.await;
let _ = response_tx.send(result);
.await
{
let err_message = err.to_string();
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` write failed: {err_message}"
),
},
&mut stream,
)
.await;
break;
}
}
RemoteClientCommand::Shutdown { response_tx } => {
let close_result = stream.close(None).await.map_err(|err| {
@@ -540,26 +564,15 @@ impl RemoteAppServerClient {
request_id: RequestId,
result: JsonRpcResult,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
})
.send(RemoteClientCommand::ResolveServerRequest { request_id, result })
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server resolve channel is closed",
)
})?
})
}
pub async fn reject_server_request(
@@ -567,26 +580,15 @@ impl RemoteAppServerClient {
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
})
.send(RemoteClientCommand::RejectServerRequest { request_id, error })
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server reject channel is closed",
)
})?
})
}
pub async fn next_event(&mut self) -> Option<AppServerEvent> {
@@ -947,6 +949,9 @@ async fn write_jsonrpc_message(
#[cfg(test)]
mod tests {
use super::*;
use tokio::sync::mpsc;
use tokio::time::Duration;
use tokio::time::timeout;
#[test]
fn event_requires_delivery_marks_transcript_and_disconnect_events() {
@@ -979,4 +984,67 @@ mod tests {
skipped: 1
}));
}
#[tokio::test]
async fn resolve_server_request_returns_after_enqueueing_command() {
let (command_tx, mut command_rx) = mpsc::channel(1);
let (_event_tx, event_rx) = mpsc::channel(1);
let client = RemoteAppServerClient {
command_tx,
event_rx,
pending_events: VecDeque::new(),
worker_handle: tokio::spawn(async {}),
};
timeout(
Duration::from_secs(1),
client.resolve_server_request(RequestId::Integer(3), serde_json::json!({ "ok": true })),
)
.await
.expect("resolve should return before timeout")
.expect("resolve should enqueue successfully");
let Some(RemoteClientCommand::ResolveServerRequest { request_id, result }) =
command_rx.recv().await
else {
panic!("expected resolve command");
};
assert_eq!(request_id, RequestId::Integer(3));
assert_eq!(result, serde_json::json!({ "ok": true }));
}
#[tokio::test]
async fn reject_server_request_returns_after_enqueueing_command() {
let (command_tx, mut command_rx) = mpsc::channel(1);
let (_event_tx, event_rx) = mpsc::channel(1);
let client = RemoteAppServerClient {
command_tx,
event_rx,
pending_events: VecDeque::new(),
worker_handle: tokio::spawn(async {}),
};
let error = JSONRPCErrorError {
code: -32000,
message: "declined".to_string(),
data: None,
};
timeout(
Duration::from_secs(1),
client.reject_server_request(RequestId::Integer(4), error.clone()),
)
.await
.expect("reject should return before timeout")
.expect("reject should enqueue successfully");
let Some(RemoteClientCommand::RejectServerRequest {
request_id,
error: queued_error,
}) = command_rx.recv().await
else {
panic!("expected reject command");
};
assert_eq!(request_id, RequestId::Integer(4));
assert_eq!(queued_error, error);
}
}