refactor(exec-server): split transports from client launch

Separate the transport-neutral JSON-RPC connection and server processor from
local process spawning, add websocket support, and document the new API
shape.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-17 01:33:51 +00:00
parent 5ce96df361
commit 632cd242bc
12 changed files with 1293 additions and 531 deletions

View File

@@ -3,6 +3,7 @@
use std::process::Stdio;
use std::time::Duration;
use anyhow::Context;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
@@ -10,9 +11,12 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_exec_server::ExecParams;
use codex_exec_server::ExecServerClient;
use codex_exec_server::ExecServerClientConnectOptions;
use codex_exec_server::ExecServerLaunchCommand;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use codex_exec_server::RemoteExecServerConnectArgs;
use codex_exec_server::spawn_local_exec_server;
use codex_utils_cargo_bin::cargo_bin;
use pretty_assertions::assert_eq;
use tokio::io::AsyncBufReadExt;
@@ -76,13 +80,19 @@ async fn exec_server_client_streams_output_and_accepts_writes() -> anyhow::Resul
env.insert("PATH".to_string(), path.to_string_lossy().into_owned());
}
let client = ExecServerClient::spawn(ExecServerLaunchCommand {
program: cargo_bin("codex-exec-server")?,
args: Vec::new(),
})
let server = spawn_local_exec_server(
ExecServerLaunchCommand {
program: cargo_bin("codex-exec-server")?,
args: Vec::new(),
},
ExecServerClientConnectOptions {
client_name: "exec-server-test".to_string(),
},
)
.await?;
let process = client
let process = server
.client()
.start_process(ExecParams {
process_id: "2001".to_string(),
argv: vec![
@@ -124,6 +134,86 @@ async fn exec_server_client_streams_output_and_accepts_writes() -> anyhow::Resul
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_client_connects_over_websocket() -> anyhow::Result<()> {
let mut env = std::collections::HashMap::new();
if let Some(path) = std::env::var_os("PATH") {
env.insert("PATH".to_string(), path.to_string_lossy().into_owned());
}
let binary = cargo_bin("codex-exec-server")?;
let mut child = Command::new(binary);
child.args(["--listen", "ws://127.0.0.1:0"]);
child.stdin(Stdio::null());
child.stdout(Stdio::null());
child.stderr(Stdio::piped());
let mut child = child.spawn()?;
let stderr = child.stderr.take().expect("stderr");
let mut stderr_lines = BufReader::new(stderr).lines();
let websocket_url = read_websocket_url(&mut stderr_lines).await?;
let client = ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
websocket_url,
client_name: "exec-server-test".to_string(),
})
.await?;
let process = client
.start_process(ExecParams {
process_id: "2002".to_string(),
argv: vec![
"bash".to_string(),
"-lc".to_string(),
"printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"
.to_string(),
],
cwd: std::env::current_dir()?,
env,
tty: true,
output_bytes_cap: 4096,
arg0: None,
})
.await?;
let mut output = process.output_receiver();
assert!(
recv_until_contains(&mut output, "ready")
.await?
.contains("ready"),
"expected initial ready output"
);
process
.writer_sender()
.send(b"hello\n".to_vec())
.await
.expect("write should succeed");
assert!(
recv_until_contains(&mut output, "echo:hello")
.await?
.contains("echo:hello"),
"expected echoed output"
);
process.terminate();
child.start_kill()?;
Ok(())
}
async fn read_websocket_url<R>(lines: &mut tokio::io::Lines<BufReader<R>>) -> anyhow::Result<String>
where
R: tokio::io::AsyncRead + Unpin,
{
let line = timeout(Duration::from_secs(5), lines.next_line()).await??;
let line = line.context("missing websocket startup banner")?;
let websocket_url = line
.split_whitespace()
.find(|part| part.starts_with("ws://"))
.context("missing websocket URL in startup banner")?;
Ok(websocket_url.to_string())
}
async fn recv_until_contains(
output: &mut broadcast::Receiver<Vec<u8>>,
needle: &str,