Compare commits

...

42 Commits

Author SHA1 Message Date
Ahmed Ibrahim
e60c8f66b1 chore: ci green proof tail 2
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 16:15:16 -08:00
Ahmed Ibrahim
7fd5979333 chore: ci green proof tail 1
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 16:15:13 -08:00
Ahmed Ibrahim
aa1654df96 Log non-audio realtime events 2026-03-04 16:14:35 -08:00
Ahmed Ibrahim
4d21c1f5b8 chore: ci green proof after revert 4
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 15:55:11 -08:00
Ahmed Ibrahim
fc48f4ddc0 chore: ci green proof after revert 3
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 15:55:08 -08:00
Ahmed Ibrahim
f1660ff3a8 chore: ci green proof after revert 2
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 15:55:06 -08:00
Ahmed Ibrahim
0ab5ac94ff chore: ci green proof after revert 1
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 15:55:01 -08:00
Ahmed Ibrahim
56cd0a699d Revert "Add richer logging for paste and event tracing"
This reverts commit b747312f7a.
2026-03-04 15:38:56 -08:00
Ahmed Ibrahim
b747312f7a Add richer logging for paste and event tracing 2026-03-04 14:35:04 -08:00
Ahmed Ibrahim
7374a805fb chore: ci green proof run 4
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:31:54 -08:00
Ahmed Ibrahim
861457fdc2 chore: ci green proof run 3
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:31:52 -08:00
Ahmed Ibrahim
dbd356b39b chore: ci green proof run 2
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:31:48 -08:00
Ahmed Ibrahim
4815a04306 chore: ci green proof run 1
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:31:45 -08:00
Ahmed Ibrahim
cbedebdad5 fix: avoid stdio-to-uds socket read deadlock in test
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:17:04 -08:00
Ahmed Ibrahim
fd620b4fbb chore: data-point commit 6
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:01:23 -08:00
Ahmed Ibrahim
3fc06fde14 chore: data-point commit 5
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:01:19 -08:00
Ahmed Ibrahim
2c84206154 chore: data-point commit 4
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:01:16 -08:00
Ahmed Ibrahim
8bcf6897ff chore: data-point commit 3
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:01:12 -08:00
Ahmed Ibrahim
de5cafaf43 chore: data-point commit 2
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:01:08 -08:00
Ahmed Ibrahim
2378388ed6 chore: data-point commit 1
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 14:01:04 -08:00
Ahmed Ibrahim
9622840751 chore: retrigger ci after cancelled run
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:58:28 -08:00
Ahmed Ibrahim
8729c59558 chore: ci probe marker remove 10
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:11 -08:00
Ahmed Ibrahim
549dd4da66 chore: ci probe marker add 10
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
72ab1249e3 chore: ci probe marker remove 9
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
debd2ff239 chore: ci probe marker add 9
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
0fce5b72e4 chore: ci probe marker remove 8
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
0725aac484 chore: ci probe marker add 8
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
0444499d4a chore: ci probe marker remove 7
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
181c98ca0c chore: ci probe marker add 7
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
31b629dd4b chore: ci probe marker remove 6
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
86cebd5d0e chore: ci probe marker add 6
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
e5552599ec chore: ci probe marker remove 5
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
7e37683e85 chore: ci probe marker add 5
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
3edde7e2ed chore: ci probe marker remove 4
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
782b021c07 chore: ci probe marker add 4
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
52e33b41b3 chore: ci probe marker remove 3
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:10 -08:00
Ahmed Ibrahim
a19b30e5c0 chore: ci probe marker add 3
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:09 -08:00
Ahmed Ibrahim
b5c275be3c chore: ci probe marker remove 2
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:09 -08:00
Ahmed Ibrahim
b94ac7b982 chore: ci probe marker add 2
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:09 -08:00
Ahmed Ibrahim
344bf7e31b chore: ci probe marker remove 1
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:09 -08:00
Ahmed Ibrahim
f9581760b8 chore: ci probe marker add 1
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:09 -08:00
Ahmed Ibrahim
2354f861e7 Fix flaky app-server/core tests and remove temporary logging
Co-authored-by: Codex <noreply@openai.com>
2026-03-04 13:55:09 -08:00
13 changed files with 91 additions and 40 deletions

View File

@@ -188,8 +188,9 @@ pub(crate) async fn apply_bespoke_event_handling(
} = event;
match msg {
EventMsg::TurnStarted(payload) => {
// While not technically necessary as it was already done on TurnComplete, be extra cautious and abort any pending server requests.
outgoing.abort_pending_server_requests().await;
// Do not abort pending server requests here. In practice, approval requests can be
// emitted very close to turn-start handling and event ordering may vary by platform.
// Aborting on TurnStarted can race with those approval requests and drop callbacks.
thread_watch_manager
.note_turn_started(&conversation_id.to_string())
.await;
@@ -213,6 +214,9 @@ pub(crate) async fn apply_bespoke_event_handling(
}
}
EventMsg::TurnComplete(_ev) => {
if should_ignore_turn_complete(&event_turn_id, &thread_state).await {
return;
}
// All per-thread requests are bound to a turn, so abort them.
outgoing.abort_pending_server_requests().await;
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some();
@@ -1423,6 +1427,7 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing.abort_pending_server_requests().await;
let pending = {
let mut state = thread_state.lock().await;
state.interrupted_turn_ids.insert(event_turn_id.clone());
std::mem::take(&mut state.pending_interrupts)
};
if !pending.is_empty() {
@@ -1716,6 +1721,14 @@ async fn find_and_remove_turn_summary(
std::mem::take(&mut state.turn_summary)
}
/// Reports whether a `TurnComplete` event for `event_turn_id` should be skipped.
///
/// Returns `true` when `event_turn_id` is present in the thread state's
/// `interrupted_turn_ids` set. The id is removed as part of the check, so a
/// second call for the same turn returns `false`.
async fn should_ignore_turn_complete(
    event_turn_id: &str,
    thread_state: &Arc<Mutex<ThreadState>>,
) -> bool {
    // `HashSet::remove` doubles as the membership test: it yields `true` only
    // if the id was actually stored.
    thread_state
        .lock()
        .await
        .interrupted_turn_ids
        .remove(event_turn_id)
}
async fn handle_turn_complete(
conversation_id: ThreadId,
event_turn_id: String,
@@ -2493,6 +2506,20 @@ mod tests {
Ok(())
}
/// Checks that an id stored in `interrupted_turn_ids` causes exactly one
/// `should_ignore_turn_complete` call to return `true`.
#[tokio::test]
async fn test_should_ignore_turn_complete_for_interrupted_turn() -> Result<()> {
    let turn_id = "interrupt_then_complete";
    let thread_state = new_thread_state();

    // Seed the state with the turn id as an interrupted turn.
    thread_state
        .lock()
        .await
        .interrupted_turn_ids
        .insert(turn_id.to_string());

    // The first check finds the id and consumes it.
    let first_check = should_ignore_turn_complete(turn_id, &thread_state).await;
    assert!(first_check);

    // The id is gone now, so the same turn is no longer ignored.
    let second_check = should_ignore_turn_complete(turn_id, &thread_state).await;
    assert!(!second_check);

    Ok(())
}
#[tokio::test]
async fn test_handle_turn_interrupted_emits_interrupted_with_error() -> Result<()> {
let conversation_id = ThreadId::new();

View File

@@ -51,6 +51,7 @@ pub(crate) struct TurnSummary {
#[derive(Default)]
pub(crate) struct ThreadState {
pub(crate) pending_interrupts: PendingInterruptQueue,
pub(crate) interrupted_turn_ids: HashSet<String>,
pub(crate) pending_rollbacks: Option<ConnectionRequestId>,
pub(crate) turn_summary: TurnSummary,
pub(crate) cancel_tx: Option<oneshot::Sender<()>>,

View File

@@ -101,7 +101,7 @@ impl McpProcess {
cmd.stderr(Stdio::piped());
cmd.current_dir(codex_home);
cmd.env("CODEX_HOME", codex_home);
cmd.env("RUST_LOG", "info");
cmd.env("RUST_LOG", "warn");
cmd.env_remove(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR);
for (k, v) in env_overrides {
@@ -795,7 +795,6 @@ impl McpProcess {
}
async fn send_jsonrpc_message(&mut self, message: JSONRPCMessage) -> anyhow::Result<()> {
eprintln!("writing message to stdin: {message:?}");
let Some(stdin) = self.stdin.as_mut() else {
anyhow::bail!("mcp stdin closed");
};
@@ -810,13 +809,10 @@ impl McpProcess {
let mut line = String::new();
self.stdout.read_line(&mut line).await?;
let message = serde_json::from_str::<JSONRPCMessage>(&line)?;
eprintln!("read message from stdout: {message:?}");
Ok(message)
}
pub async fn read_stream_until_request_message(&mut self) -> anyhow::Result<ServerRequest> {
eprintln!("in read_stream_until_request_message()");
let message = self
.read_stream_until_message(|message| matches!(message, JSONRPCMessage::Request(_)))
.await?;
@@ -833,8 +829,6 @@ impl McpProcess {
&mut self,
request_id: RequestId,
) -> anyhow::Result<JSONRPCResponse> {
eprintln!("in read_stream_until_response_message({request_id:?})");
let message = self
.read_stream_until_message(|message| {
Self::message_request_id(message) == Some(&request_id)
@@ -867,8 +861,6 @@ impl McpProcess {
&mut self,
method: &str,
) -> anyhow::Result<JSONRPCNotification> {
eprintln!("in read_stream_until_notification_message({method})");
let message = self
.read_stream_until_message(|message| {
matches!(

View File

@@ -18,11 +18,11 @@ use core_test_support::fs_wait;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const NOTIFY_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
#[tokio::test]
async fn initialize_uses_client_info_name_as_originator() -> Result<()> {
@@ -261,7 +261,7 @@ tmp_path.replace(payload_path)
)
.await??;
fs_wait::wait_for_path_exists(&notify_file, Duration::from_secs(5)).await?;
fs_wait::wait_for_path_exists(&notify_file, NOTIFY_FILE_TIMEOUT).await?;
let payload_raw = tokio::fs::read_to_string(&notify_file).await?;
let payload: Value = serde_json::from_str(&payload_raw)?;
assert_eq!(payload["client"], "xcode");

View File

@@ -21,6 +21,7 @@ use codex_protocol::openai_models::ReasoningEffort;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const STARTUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn request_user_input_round_trip() -> Result<()> {
@@ -33,7 +34,7 @@ async fn request_user_input_round_trip() -> Result<()> {
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
timeout(STARTUP_TIMEOUT, mcp.initialize()).await??;
let thread_start_id = mcp
.send_thread_start_request(ThreadStartParams {

View File

@@ -31,6 +31,7 @@ use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const STARTUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test]
@@ -232,7 +233,7 @@ async fn review_start_rejects_empty_base_branch() -> Result<()> {
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
timeout(STARTUP_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_default_thread(&mut mcp).await?;
let request_id = mcp
@@ -340,7 +341,7 @@ async fn review_start_rejects_empty_commit_sha() -> Result<()> {
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
timeout(STARTUP_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_default_thread(&mut mcp).await?;
let request_id = mcp

View File

@@ -3,6 +3,7 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
@@ -19,6 +20,7 @@ use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::responses;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -139,12 +141,19 @@ async fn turn_interrupt_resolves_pending_command_approval_request() -> Result<()
let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory)?;
let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep_approval",
)?])
let no_op_response = responses::sse(vec![
responses::ev_response_created("resp-2"),
responses::ev_completed("resp-2"),
]);
let server = create_mock_responses_server_sequence_unchecked(vec![
create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep_approval",
)?,
no_op_response,
])
.await;
create_config_toml(&codex_home, &server.uri(), "untrusted")?;

View File

@@ -1423,11 +1423,13 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
collaboration_mode: None,
})
.await?;
timeout(
let second_turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_turn)),
)
.await??;
let TurnStartResponse { turn, .. } = to_response::<TurnStartResponse>(second_turn_resp)?;
let second_turn_id = turn.id;
let command_exec_item = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
@@ -1440,7 +1442,10 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
.expect("item/started params");
let item_started: ItemStartedNotification =
serde_json::from_value(params).expect("deserialize item/started notification");
if matches!(item_started.item, ThreadItem::CommandExecution { .. }) {
if item_started.turn_id == second_turn_id
&& let ThreadItem::CommandExecution { id, .. } = &item_started.item
&& id == "call-second"
{
return Ok::<ThreadItem, anyhow::Error>(item_started.item);
}
}

View File

@@ -547,11 +547,17 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
let second_file_str = second_file.to_string_lossy().into_owned();
let parent_shell_hint = format!("&& {}", &first_file_str);
while target_decision_index < target_decisions.len() || !saw_parent_approval {
let server_req = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(),
)
.await??;
let wait_timeout = if target_decision_index < target_decisions.len() {
DEFAULT_READ_TIMEOUT
} else {
std::time::Duration::from_secs(2)
};
let server_req = match timeout(wait_timeout, mcp.read_stream_until_request_message()).await
{
Ok(server_req) => server_req?,
Err(_) if target_decision_index >= target_decisions.len() => break,
Err(error) => return Err(error.into()),
};
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req
else {
panic!("expected CommandExecutionRequestApproval request");

View File

@@ -326,7 +326,13 @@ pub(crate) async fn handle_start(
msg,
};
while let Ok(event) = events_rx.recv().await {
debug!(conversation_id = %sess_clone.conversation_id, "received realtime conversation event");
// if not audio out, log the event
if !matches!(event, RealtimeEvent::AudioOut(_)) {
info!(
event = ?event,
"received realtime conversation event"
);
}
let maybe_routed_text = match &event {
RealtimeEvent::HandoffRequested(handoff) => {
realtime_text_from_handoff_request(handoff)

View File

@@ -653,14 +653,17 @@ async fn expect_patch_approval(
}
async fn wait_for_completion_without_approval(test: &TestCodex) {
let event = wait_for_event(&test.codex, |event| {
matches!(
event,
EventMsg::ExecApprovalRequest(_) | EventMsg::TurnComplete(_)
)
})
let event = wait_for_event_with_timeout(
&test.codex,
|event| {
matches!(
event,
EventMsg::ExecApprovalRequest(_) | EventMsg::TurnComplete(_)
)
},
std::time::Duration::from_secs(10),
)
.await;
match event {
EventMsg::TurnComplete(_) => {}
EventMsg::ExecApprovalRequest(event) => {

View File

@@ -841,7 +841,7 @@ async fn shell_command_output_is_not_truncated_over_10k_bytes() -> Result<()> {
let call_id = "shell-command";
let args = json!({
"command": "perl -e 'print \"1\" x 10001'",
"timeout_ms": 1000,
"timeout_ms": 5000,
});
let responses = vec![
sse(vec![

View File

@@ -35,9 +35,9 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
let (mut connection, _) = listener
.accept()
.context("failed to accept test connection")?;
let mut received = Vec::new();
let mut received = vec![0_u8; b"request".len()];
connection
.read_to_end(&mut received)
.read_exact(&mut received)
.context("failed to read data from client")?;
tx.send(received)
.map_err(|_| anyhow::anyhow!("failed to send received bytes to test thread"))?;