mirror of
https://github.com/openai/codex.git
synced 2026-04-28 02:11:08 +03:00
Merge branch 'main' into dev/ws-terminal-error-split
This commit is contained in:
@@ -8,6 +8,10 @@ license.workspace = true
|
||||
name = "codex-app-server"
|
||||
path = "src/main.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "codex-app-server-test-notify-capture"
|
||||
path = "src/bin/notify_capture.rs"
|
||||
|
||||
[lib]
|
||||
name = "codex_app_server"
|
||||
path = "src/lib.rs"
|
||||
|
||||
44
codex-rs/app-server/src/bin/notify_capture.rs
Normal file
44
codex-rs/app-server/src/bin/notify_capture.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::bail;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let mut args = env::args_os();
|
||||
let _program = args.next();
|
||||
let output_path = PathBuf::from(
|
||||
args.next()
|
||||
.ok_or_else(|| anyhow!("expected output path as first argument"))?,
|
||||
);
|
||||
let payload = args
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("expected payload as final argument"))?;
|
||||
|
||||
if args.next().is_some() {
|
||||
bail!("expected payload as final argument");
|
||||
}
|
||||
|
||||
let payload = payload.to_string_lossy();
|
||||
let temp_path = PathBuf::from(format!("{}.tmp", output_path.display()));
|
||||
let mut file = File::create(&temp_path)
|
||||
.with_context(|| format!("failed to create {}", temp_path.display()))?;
|
||||
file.write_all(payload.as_bytes())
|
||||
.with_context(|| format!("failed to write {}", temp_path.display()))?;
|
||||
file.sync_all()
|
||||
.with_context(|| format!("failed to sync {}", temp_path.display()))?;
|
||||
fs::rename(&temp_path, &output_path).with_context(|| {
|
||||
format!(
|
||||
"failed to move {} into {}",
|
||||
temp_path.display(),
|
||||
output_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -29,7 +29,6 @@ use super::connection_handling_websocket::assert_no_message;
|
||||
use super::connection_handling_websocket::connect_websocket;
|
||||
use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_jsonrpc_message;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
@@ -712,8 +711,7 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
let mut ws2 = connect_websocket(bind_addr).await?;
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::path::Path;
|
||||
use std::process::Stdio;
|
||||
use tempfile::TempDir;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::process::Child;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::Duration;
|
||||
@@ -40,8 +41,7 @@ async fn websocket_transport_routes_per_connection_handshake_and_responses() ->
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
let mut ws2 = connect_websocket(bind_addr).await?;
|
||||
@@ -86,8 +86,7 @@ async fn websocket_transport_serves_health_endpoints_on_same_listener() -> Resul
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let readyz = http_get(&client, bind_addr, "/readyz").await?;
|
||||
@@ -108,15 +107,12 @@ async fn websocket_transport_serves_health_endpoints_on_same_listener() -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn spawn_websocket_server(
|
||||
codex_home: &Path,
|
||||
bind_addr: SocketAddr,
|
||||
) -> Result<Child> {
|
||||
pub(super) async fn spawn_websocket_server(codex_home: &Path) -> Result<(Child, SocketAddr)> {
|
||||
let program = codex_utils_cargo_bin::cargo_bin("codex-app-server")
|
||||
.context("should find app-server binary")?;
|
||||
let mut cmd = Command::new(program);
|
||||
cmd.arg("--listen")
|
||||
.arg(format!("ws://{bind_addr}"))
|
||||
.arg("ws://127.0.0.1:0")
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::piped())
|
||||
@@ -127,23 +123,57 @@ pub(super) async fn spawn_websocket_server(
|
||||
.spawn()
|
||||
.context("failed to spawn websocket app-server process")?;
|
||||
|
||||
if let Some(stderr) = process.stderr.take() {
|
||||
let mut stderr_reader = tokio::io::BufReader::new(stderr).lines();
|
||||
tokio::spawn(async move {
|
||||
while let Ok(Some(line)) = stderr_reader.next_line().await {
|
||||
eprintln!("[websocket app-server stderr] {line}");
|
||||
let stderr = process
|
||||
.stderr
|
||||
.take()
|
||||
.context("failed to capture websocket app-server stderr")?;
|
||||
let mut stderr_reader = BufReader::new(stderr).lines();
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
let bind_addr = loop {
|
||||
let line = timeout(
|
||||
deadline.saturating_duration_since(Instant::now()),
|
||||
stderr_reader.next_line(),
|
||||
)
|
||||
.await
|
||||
.context("timed out waiting for websocket app-server to report bound websocket address")?
|
||||
.context("failed to read websocket app-server stderr")?
|
||||
.context("websocket app-server exited before reporting bound websocket address")?;
|
||||
eprintln!("[websocket app-server stderr] {line}");
|
||||
|
||||
let stripped_line = {
|
||||
let mut stripped = String::with_capacity(line.len());
|
||||
let mut chars = line.chars().peekable();
|
||||
while let Some(ch) = chars.next() {
|
||||
if ch == '\u{1b}' && matches!(chars.peek(), Some(&'[')) {
|
||||
chars.next();
|
||||
for next in chars.by_ref() {
|
||||
if ('@'..='~').contains(&next) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
stripped.push(ch);
|
||||
}
|
||||
});
|
||||
}
|
||||
stripped
|
||||
};
|
||||
|
||||
Ok(process)
|
||||
}
|
||||
if let Some(bind_addr) = stripped_line
|
||||
.split_whitespace()
|
||||
.find_map(|token| token.strip_prefix("ws://"))
|
||||
.and_then(|addr| addr.parse::<SocketAddr>().ok())
|
||||
{
|
||||
break bind_addr;
|
||||
}
|
||||
};
|
||||
|
||||
pub(super) fn reserve_local_addr() -> Result<SocketAddr> {
|
||||
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
|
||||
let addr = listener.local_addr()?;
|
||||
drop(listener);
|
||||
Ok(addr)
|
||||
tokio::spawn(async move {
|
||||
while let Ok(Some(line)) = stderr_reader.next_line().await {
|
||||
eprintln!("[websocket app-server stderr] {line}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok((process, bind_addr))
|
||||
}
|
||||
|
||||
pub(super) async fn connect_websocket(bind_addr: SocketAddr) -> Result<WsClient> {
|
||||
|
||||
@@ -3,7 +3,6 @@ use super::connection_handling_websocket::WsClient;
|
||||
use super::connection_handling_websocket::connect_websocket;
|
||||
use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_response_for_id;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
@@ -154,8 +153,7 @@ async fn start_ctrl_c_restart_fixture(turn_delay: Duration) -> Result<GracefulCt
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
let mut ws = connect_websocket(bind_addr).await?;
|
||||
|
||||
send_initialize_request(&mut ws, 1, "ws_graceful_shutdown").await?;
|
||||
|
||||
@@ -193,13 +193,13 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
let notify_file = codex_home.path().join("notify.json");
|
||||
let notify_capture = cargo_bin("test_notify_capture")?;
|
||||
let notify_capture = cargo_bin("codex-app-server-test-notify-capture")?;
|
||||
let notify_capture = notify_capture
|
||||
.to_str()
|
||||
.expect("notify capture path should be valid UTF-8");
|
||||
let notify_file = notify_file
|
||||
let notify_file_str = notify_file
|
||||
.to_str()
|
||||
.expect("notify output path should be valid UTF-8");
|
||||
.expect("notify file path should be valid UTF-8");
|
||||
create_config_toml_with_extra(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
@@ -207,7 +207,7 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
|
||||
&format!(
|
||||
"notify = [{}, {}]",
|
||||
toml_basic_string(notify_capture),
|
||||
toml_basic_string(notify_file)
|
||||
toml_basic_string(notify_file_str)
|
||||
),
|
||||
)?;
|
||||
|
||||
@@ -255,9 +255,8 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
|
||||
)
|
||||
.await??;
|
||||
|
||||
let notify_file = Path::new(notify_file);
|
||||
fs_wait::wait_for_path_exists(notify_file, Duration::from_secs(5)).await?;
|
||||
let payload_raw = tokio::fs::read_to_string(notify_file).await?;
|
||||
fs_wait::wait_for_path_exists(¬ify_file, Duration::from_secs(5)).await?;
|
||||
let payload_raw = tokio::fs::read_to_string(¬ify_file).await?;
|
||||
let payload: Value = serde_json::from_str(&payload_raw)?;
|
||||
assert_eq!(payload["client"], "xcode");
|
||||
|
||||
@@ -292,6 +291,9 @@ model_provider = "mock_provider"
|
||||
|
||||
{extra}
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
|
||||
@@ -6,7 +6,6 @@ use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_notification_for_method;
|
||||
use super::connection_handling_websocket::read_response_and_notification_for_method;
|
||||
use super::connection_handling_websocket::read_response_for_id;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
@@ -34,8 +33,7 @@ async fn thread_name_updated_broadcasts_for_loaded_threads() -> Result<()> {
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-00-00")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let result = async {
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
@@ -96,8 +94,7 @@ async fn thread_name_updated_broadcasts_for_not_loaded_threads() -> Result<()> {
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-05-00")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let result = async {
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
|
||||
@@ -35,6 +35,51 @@ use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
async fn wait_for_responses_request_count_to_stabilize(
|
||||
server: &wiremock::MockServer,
|
||||
expected_count: usize,
|
||||
settle_duration: std::time::Duration,
|
||||
) -> Result<()> {
|
||||
timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
let mut stable_since: Option<tokio::time::Instant> = None;
|
||||
loop {
|
||||
let requests = server
|
||||
.received_requests()
|
||||
.await
|
||||
.context("failed to fetch received requests")?;
|
||||
let responses_request_count = requests
|
||||
.iter()
|
||||
.filter(|request| {
|
||||
request.method == "POST" && request.url.path().ends_with("/responses")
|
||||
})
|
||||
.count();
|
||||
|
||||
if responses_request_count > expected_count {
|
||||
anyhow::bail!(
|
||||
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
|
||||
);
|
||||
}
|
||||
|
||||
if responses_request_count == expected_count {
|
||||
match stable_since {
|
||||
Some(stable_since) if stable_since.elapsed() >= settle_duration => {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
None => stable_since = Some(tokio::time::Instant::now()),
|
||||
Some(_) => {}
|
||||
}
|
||||
} else {
|
||||
stable_since = None;
|
||||
}
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
@@ -172,6 +217,13 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
|
||||
};
|
||||
assert_eq!(payload.thread_id, thread_id);
|
||||
|
||||
wait_for_responses_request_count_to_stabilize(
|
||||
&server,
|
||||
1,
|
||||
std::time::Duration::from_millis(200),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user