Test generic exec-server RPC response matching

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-18 14:43:39 -07:00
parent 66f49ea604
commit 43b112c263

View File

@@ -440,3 +440,123 @@ async fn drain_pending(pending: &Mutex<HashMap<RequestId, PendingRequest>>) {
}));
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use pretty_assertions::assert_eq;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::time::timeout;
use super::RpcClient;
use crate::connection::JsonRpcConnection;
async fn read_jsonrpc_line<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> JSONRPCMessage
where
R: tokio::io::AsyncRead + Unpin,
{
let next_line = timeout(Duration::from_secs(1), lines.next_line()).await;
let line_result = match next_line {
Ok(line_result) => line_result,
Err(err) => panic!("timed out waiting for JSON-RPC line: {err}"),
};
let maybe_line = match line_result {
Ok(maybe_line) => maybe_line,
Err(err) => panic!("failed to read JSON-RPC line: {err}"),
};
let line = match maybe_line {
Some(line) => line,
None => panic!("server connection closed before JSON-RPC line arrived"),
};
match serde_json::from_str::<JSONRPCMessage>(&line) {
Ok(message) => message,
Err(err) => panic!("failed to parse JSON-RPC line: {err}"),
}
}
async fn write_jsonrpc_line<W>(writer: &mut W, message: JSONRPCMessage)
where
W: tokio::io::AsyncWrite + Unpin,
{
let encoded = match serde_json::to_string(&message) {
Ok(encoded) => encoded,
Err(err) => panic!("failed to encode JSON-RPC message: {err}"),
};
if let Err(err) = writer.write_all(format!("{encoded}\n").as_bytes()).await {
panic!("failed to write JSON-RPC line: {err}");
}
}
#[tokio::test]
async fn rpc_client_matches_out_of_order_responses_by_request_id() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
let (client, _events_rx) = RpcClient::new(JsonRpcConnection::from_stdio(
client_stdout,
client_stdin,
"test-rpc".to_string(),
));
let server = tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let first = read_jsonrpc_line(&mut lines).await;
let second = read_jsonrpc_line(&mut lines).await;
let (slow_request, fast_request) = match (first, second) {
(
JSONRPCMessage::Request(first_request),
JSONRPCMessage::Request(second_request),
) if first_request.method == "slow" && second_request.method == "fast" => {
(first_request, second_request)
}
(
JSONRPCMessage::Request(first_request),
JSONRPCMessage::Request(second_request),
) if first_request.method == "fast" && second_request.method == "slow" => {
(second_request, first_request)
}
_ => panic!("expected slow and fast requests"),
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: fast_request.id,
result: serde_json::json!({ "value": "fast" }),
}),
)
.await;
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: slow_request.id,
result: serde_json::json!({ "value": "slow" }),
}),
)
.await;
});
let slow_params = serde_json::json!({ "n": 1 });
let fast_params = serde_json::json!({ "n": 2 });
let (slow, fast) = tokio::join!(
client.call::<_, serde_json::Value>("slow", &slow_params),
client.call::<_, serde_json::Value>("fast", &fast_params),
);
let slow = slow.unwrap_or_else(|err| panic!("slow request failed: {err:?}"));
let fast = fast.unwrap_or_else(|err| panic!("fast request failed: {err:?}"));
assert_eq!(slow, serde_json::json!({ "value": "slow" }));
assert_eq!(fast, serde_json::json!({ "value": "fast" }));
assert_eq!(client.pending_request_count().await, 0);
if let Err(err) = server.await {
panic!("server task failed: {err}");
}
}
}