Compare commits

...

7 Commits

Author SHA1 Message Date
Michael Zeng
65c80faaf5 [exec-server] remove added HTTP upgrade tests 2026-05-11 15:31:01 -07:00
Ruslan Nigmatullin
81f5eabfc1 [exec-server] add HTTP-upgrade listen mode 2026-05-11 22:11:36 +00:00
Ruslan Nigmatullin
686953b63b [exec-server] preserve raw websocket listen mode 2026-05-11 22:08:49 +00:00
Ruslan Nigmatullin
a4a0322224 [exec-server] drop binary websocket handling 2026-05-11 22:07:02 +00:00
Ruslan Nigmatullin
a5a031cde0 [exec-server] add readiness endpoint 2026-05-11 21:33:10 +00:00
Ruslan Nigmatullin
734af5228e [exec-server] keep websocket URL surface unchanged 2026-05-11 21:16:39 +00:00
Ruslan Nigmatullin
6f40914087 [exec-server] add HTTP health endpoints 2026-05-09 21:48:53 +00:00
9 changed files with 213 additions and 74 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -2741,6 +2741,7 @@ dependencies = [
"anyhow",
"arc-swap",
"async-trait",
"axum",
"base64 0.22.1",
"bytes",
"codex-app-server-protocol",
@@ -2751,6 +2752,7 @@ dependencies = [
"codex-test-binary-support",
"codex-utils-absolute-path",
"codex-utils-pty",
"codex-utils-rustls-provider",
"ctor 0.6.3",
"futures",
"pretty_assertions",

View File

@@ -13,6 +13,7 @@ workspace = true
[dependencies]
arc-swap = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true, features = ["http1", "tokio", "ws"] }
base64 = { workspace = true }
bytes = { workspace = true }
codex-app-server-protocol = { workspace = true }
@@ -22,6 +23,7 @@ codex-protocol = { workspace = true }
codex-sandboxing = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-pty = { workspace = true }
codex-utils-rustls-provider = { workspace = true }
futures = { workspace = true }
reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] }
serde = { workspace = true, features = ["derive"] }

View File

@@ -22,6 +22,7 @@ the wire.
The CLI entrypoint supports:
- `ws://IP:PORT` (default)
- `ws+http://IP:PORT` for HTTP-upgrade websocket connections plus `/readyz`
- `--remote URL --executor-id ID [--name NAME]`
Remote mode registers the local exec-server with the executor registry,

View File

@@ -7,6 +7,8 @@ use tokio_tungstenite::connect_async;
use tracing::debug;
use tracing::warn;
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::client_api::RemoteExecServerConnectArgs;
@@ -53,18 +55,23 @@ impl ExecServerClient {
pub async fn connect_websocket(
args: RemoteExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
ensure_rustls_crypto_provider();
let websocket_url = args.websocket_url.clone();
let websocket_connect_url = websocket_connect_url(&websocket_url);
let connect_timeout = args.connect_timeout;
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
})?;
let (stream, _) = timeout(
connect_timeout,
connect_async(websocket_connect_url.as_str()),
)
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
})?;
Self::connect(
JsonRpcConnection::from_websocket(
@@ -117,6 +124,12 @@ impl ExecServerClient {
}
}
fn websocket_connect_url(websocket_url: &str) -> String {
websocket_url
.strip_prefix("ws+http://")
.map_or_else(|| websocket_url.to_string(), |url| format!("ws://{url}"))
}
fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command {
let mut command = Command::new(&stdio_command.program);
command.args(&stdio_command.args);

View File

@@ -3,8 +3,12 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use axum::extract::ws::Message as AxumWebSocketMessage;
use axum::extract::ws::WebSocket as AxumWebSocket;
use codex_app_server_protocol::JSONRPCMessage;
use futures::Sink;
use futures::SinkExt;
use futures::Stream;
use futures::StreamExt;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
@@ -309,11 +313,30 @@ impl JsonRpcConnection {
pub(crate) fn from_websocket<S>(stream: WebSocketStream<S>, connection_label: String) -> Self
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let (websocket_writer, websocket_reader) = stream.split();
Self::from_websocket_parts(websocket_writer, websocket_reader, connection_label)
}
pub(crate) fn from_axum_websocket(stream: AxumWebSocket, connection_label: String) -> Self {
let (websocket_writer, websocket_reader) = stream.split();
Self::from_websocket_parts(websocket_writer, websocket_reader, connection_label)
}
fn from_websocket_parts<W, R, M, E>(
mut websocket_writer: W,
mut websocket_reader: R,
connection_label: String,
) -> Self
where
W: Sink<M, Error = E> + Unpin + Send + 'static,
R: Stream<Item = Result<M, E>> + Unpin + Send + 'static,
M: JsonRpcWebSocketMessage,
E: std::fmt::Display + Send + 'static,
{
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (disconnected_tx, disconnected_rx) = watch::channel(false);
let (mut websocket_writer, mut websocket_reader) = stream.split();
let reader_label = connection_label.clone();
let incoming_tx_for_reader = incoming_tx.clone();
@@ -321,61 +344,36 @@ impl JsonRpcConnection {
let reader_task = tokio::spawn(async move {
loop {
match websocket_reader.next().await {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<JSONRPCMessage>(text.as_ref()) {
Ok(message) => {
if incoming_tx_for_reader
.send(JsonRpcConnectionEvent::Message(message))
.await
.is_err()
{
break;
}
}
Err(err) => {
send_malformed_message(
&incoming_tx_for_reader,
Some(format!(
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
Some(Ok(message)) => match message.parse_jsonrpc_frame() {
Ok(JsonRpcWebSocketFrame::Message(message)) => {
if incoming_tx_for_reader
.send(JsonRpcConnectionEvent::Message(message))
.await
.is_err()
{
break;
}
}
}
Some(Ok(Message::Binary(bytes))) => {
match serde_json::from_slice::<JSONRPCMessage>(bytes.as_ref()) {
Ok(message) => {
if incoming_tx_for_reader
.send(JsonRpcConnectionEvent::Message(message))
.await
.is_err()
{
break;
}
}
Err(err) => {
send_malformed_message(
&incoming_tx_for_reader,
Some(format!(
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
}
Err(err) => {
send_malformed_message(
&incoming_tx_for_reader,
Some(format!(
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
)),
)
.await;
}
}
Some(Ok(Message::Close(_))) => {
send_disconnected(
&incoming_tx_for_reader,
&disconnected_tx_for_reader,
/*reason*/ None,
)
.await;
break;
}
Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => {}
Some(Ok(_)) => {}
Ok(JsonRpcWebSocketFrame::Close) => {
send_disconnected(
&incoming_tx_for_reader,
&disconnected_tx_for_reader,
/*reason*/ None,
)
.await;
break;
}
Ok(JsonRpcWebSocketFrame::Ignore) => {}
},
Some(Err(err)) => {
send_disconnected(
&incoming_tx_for_reader,
@@ -404,8 +402,7 @@ impl JsonRpcConnection {
while let Some(message) = outgoing_rx.recv().await {
match serialize_jsonrpc_message(&message) {
Ok(encoded) => {
if let Err(err) = websocket_writer.send(Message::Text(encoded.into())).await
{
if let Err(err) = websocket_writer.send(M::from_text(encoded)).await {
send_disconnected(
&incoming_tx,
&disconnected_tx,
@@ -447,6 +444,53 @@ impl JsonRpcConnection {
}
}
enum JsonRpcWebSocketFrame {
Message(JSONRPCMessage),
Close,
Ignore,
}
trait JsonRpcWebSocketMessage: Send + 'static {
fn parse_jsonrpc_frame(self) -> Result<JsonRpcWebSocketFrame, serde_json::Error>;
fn from_text(text: String) -> Self;
}
impl JsonRpcWebSocketMessage for Message {
fn parse_jsonrpc_frame(self) -> Result<JsonRpcWebSocketFrame, serde_json::Error> {
match self {
Message::Text(text) => {
serde_json::from_str(text.as_ref()).map(JsonRpcWebSocketFrame::Message)
}
Message::Close(_) => Ok(JsonRpcWebSocketFrame::Close),
Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {
Ok(JsonRpcWebSocketFrame::Ignore)
}
}
}
fn from_text(text: String) -> Self {
Self::Text(text.into())
}
}
impl JsonRpcWebSocketMessage for AxumWebSocketMessage {
fn parse_jsonrpc_frame(self) -> Result<JsonRpcWebSocketFrame, serde_json::Error> {
match self {
AxumWebSocketMessage::Text(text) => {
serde_json::from_str(text.as_ref()).map(JsonRpcWebSocketFrame::Message)
}
AxumWebSocketMessage::Close(_) => Ok(JsonRpcWebSocketFrame::Close),
AxumWebSocketMessage::Binary(_)
| AxumWebSocketMessage::Ping(_)
| AxumWebSocketMessage::Pong(_) => Ok(JsonRpcWebSocketFrame::Ignore),
}
}
fn from_text(text: String) -> Self {
Self::Text(text.into())
}
}
async fn send_disconnected(
incoming_tx: &mpsc::Sender<JsonRpcConnectionEvent>,
disconnected_tx: &watch::Sender<bool>,

View File

@@ -276,12 +276,15 @@ fn validate_websocket_url(url: String) -> Result<String, ExecServerError> {
"environment url cannot be empty".to_string(),
));
}
if !url.starts_with("ws://") && !url.starts_with("wss://") {
if !url.starts_with("ws://") && !url.starts_with("wss://") && !url.starts_with("ws+http://") {
return Err(ExecServerError::Protocol(format!(
"environment url `{url}` must use ws:// or wss://"
"environment url `{url}` must use ws://, wss://, or ws+http://"
)));
}
url.into_client_request().map_err(|err| {
let websocket_connect_url = url
.strip_prefix("ws+http://")
.map_or_else(|| url.to_string(), |url| format!("ws://{url}"));
websocket_connect_url.into_client_request().map_err(|err| {
ExecServerError::Protocol(format!("environment url `{url}` is invalid: {err}"))
})?;
Ok(url.to_string())
@@ -438,7 +441,7 @@ mod tests {
url: Some("http://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment url `http://127.0.0.1:8765` must use ws:// or wss://",
"environment url `http://127.0.0.1:8765` must use ws://, wss://, or ws+http://",
),
(
EnvironmentToml {

View File

@@ -7,6 +7,8 @@ use tokio::time::sleep;
use tokio_tungstenite::connect_async;
use tracing::warn;
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::connection::JsonRpcConnection;
@@ -133,6 +135,7 @@ pub async fn run_remote_executor(
config: RemoteExecutorConfig,
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), ExecServerError> {
ensure_rustls_crypto_provider();
let client = ExecutorRegistryClient::new(config.base_url.clone(), config.bearer_token.clone())?;
let processor = ConnectionProcessor::new(runtime_paths);
let mut backoff = Duration::from_secs(1);

View File

@@ -1,3 +1,11 @@
use axum::Router;
use axum::extract::ConnectInfo;
use axum::extract::State;
use axum::extract::ws::WebSocketUpgrade;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::any;
use axum::routing::get;
use std::io::Write as _;
use std::net::SocketAddr;
use tokio::io;
@@ -5,6 +13,7 @@ use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tracing::info;
use tracing::warn;
use crate::ExecServerRuntimePaths;
@@ -16,6 +25,7 @@ pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) enum ExecServerListenTransport {
WebSocket(SocketAddr),
HttpUpgradeWebSocket(SocketAddr),
Stdio,
}
@@ -30,11 +40,11 @@ impl std::fmt::Display for ExecServerListenUrlParseError {
match self {
ExecServerListenUrlParseError::UnsupportedListenUrl(listen_url) => write!(
f,
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT` or `stdio`"
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`, `ws+http://IP:PORT`, or `stdio`"
),
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url) => write!(
f,
"invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`"
"invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT` or `ws+http://IP:PORT`"
),
}
}
@@ -58,6 +68,15 @@ pub(crate) fn parse_listen_url(
});
}
if let Some(socket_addr) = listen_url.strip_prefix("ws+http://") {
return socket_addr
.parse::<SocketAddr>()
.map(ExecServerListenTransport::HttpUpgradeWebSocket)
.map_err(|_| {
ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string())
});
}
Err(ExecServerListenUrlParseError::UnsupportedListenUrl(
listen_url.to_string(),
))
@@ -71,6 +90,9 @@ pub(crate) async fn run_transport(
ExecServerListenTransport::WebSocket(bind_address) => {
run_websocket_listener(bind_address, runtime_paths).await
}
ExecServerListenTransport::HttpUpgradeWebSocket(bind_address) => {
run_http_upgrade_websocket_listener(bind_address, runtime_paths).await
}
ExecServerListenTransport::Stdio => run_stdio_connection(runtime_paths).await,
}
}
@@ -109,7 +131,7 @@ async fn run_websocket_listener(
let listener = TcpListener::bind(bind_address).await?;
let local_addr = listener.local_addr()?;
let processor = ConnectionProcessor::new(runtime_paths);
tracing::info!("codex-exec-server listening on ws://{local_addr}");
info!("codex-exec-server listening on ws://{local_addr}");
println!("ws://{local_addr}");
std::io::stdout().flush()?;
@@ -136,6 +158,55 @@ async fn run_websocket_listener(
}
}
async fn run_http_upgrade_websocket_listener(
bind_address: SocketAddr,
runtime_paths: ExecServerRuntimePaths,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(bind_address).await?;
let local_addr = listener.local_addr()?;
let processor = ConnectionProcessor::new(runtime_paths);
info!("codex-exec-server listening on ws+http://{local_addr}");
println!("ws+http://{local_addr}");
std::io::stdout().flush()?;
let router = Router::new()
.route("/", any(websocket_upgrade_handler))
.route("/readyz", get(readiness_handler))
.with_state(ExecServerWebSocketState { processor });
axum::serve(
listener,
router.into_make_service_with_connect_info::<SocketAddr>(),
)
.await?;
Ok(())
}
#[derive(Clone)]
struct ExecServerWebSocketState {
processor: ConnectionProcessor,
}
async fn readiness_handler() -> StatusCode {
StatusCode::OK
}
async fn websocket_upgrade_handler(
websocket: WebSocketUpgrade,
ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
State(state): State<ExecServerWebSocketState>,
) -> impl IntoResponse {
info!(%peer_addr, "exec-server HTTP-upgrade websocket client connected");
websocket.on_upgrade(move |stream| async move {
state
.processor
.run_connection(JsonRpcConnection::from_axum_websocket(
stream,
format!("exec-server HTTP-upgrade websocket {peer_addr}"),
))
.await;
})
}
#[cfg(test)]
#[path = "transport_tests.rs"]
mod transport_tests;

View File

@@ -131,7 +131,7 @@ fn parse_listen_url_rejects_invalid_websocket_url() {
.expect_err("hostname bind address should be rejected");
assert_eq!(
err.to_string(),
"invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`"
"invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT` or `ws+http://IP:PORT`"
);
}
@@ -141,7 +141,7 @@ fn parse_listen_url_rejects_unsupported_url() {
parse_listen_url("http://127.0.0.1:1234").expect_err("unsupported scheme should fail");
assert_eq!(
err.to_string(),
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT` or `stdio`"
"unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`, `ws+http://IP:PORT`, or `stdio`"
);
}