mirror of
https://github.com/openai/codex.git
synced 2026-04-08 00:21:43 +03:00
Compare commits
42 Commits
dev/window
...
fix-flaky-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e60c8f66b1 | ||
|
|
7fd5979333 | ||
|
|
aa1654df96 | ||
|
|
4d21c1f5b8 | ||
|
|
fc48f4ddc0 | ||
|
|
f1660ff3a8 | ||
|
|
0ab5ac94ff | ||
|
|
56cd0a699d | ||
|
|
b747312f7a | ||
|
|
7374a805fb | ||
|
|
861457fdc2 | ||
|
|
dbd356b39b | ||
|
|
4815a04306 | ||
|
|
cbedebdad5 | ||
|
|
fd620b4fbb | ||
|
|
3fc06fde14 | ||
|
|
2c84206154 | ||
|
|
8bcf6897ff | ||
|
|
de5cafaf43 | ||
|
|
2378388ed6 | ||
|
|
9622840751 | ||
|
|
8729c59558 | ||
|
|
549dd4da66 | ||
|
|
72ab1249e3 | ||
|
|
debd2ff239 | ||
|
|
0fce5b72e4 | ||
|
|
0725aac484 | ||
|
|
0444499d4a | ||
|
|
181c98ca0c | ||
|
|
31b629dd4b | ||
|
|
86cebd5d0e | ||
|
|
e5552599ec | ||
|
|
7e37683e85 | ||
|
|
3edde7e2ed | ||
|
|
782b021c07 | ||
|
|
52e33b41b3 | ||
|
|
a19b30e5c0 | ||
|
|
b5c275be3c | ||
|
|
b94ac7b982 | ||
|
|
344bf7e31b | ||
|
|
f9581760b8 | ||
|
|
2354f861e7 |
@@ -188,8 +188,9 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
} = event;
|
||||
match msg {
|
||||
EventMsg::TurnStarted(payload) => {
|
||||
// While not technically necessary as it was already done on TurnComplete, be extra cautios and abort any pending server requests.
|
||||
outgoing.abort_pending_server_requests().await;
|
||||
// Do not abort pending server requests here. In practice, approval requests can be
|
||||
// emitted very close to turn-start handling and event ordering may vary by platform.
|
||||
// Aborting on TurnStarted can race with those approval requests and drop callbacks.
|
||||
thread_watch_manager
|
||||
.note_turn_started(&conversation_id.to_string())
|
||||
.await;
|
||||
@@ -213,6 +214,9 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
}
|
||||
}
|
||||
EventMsg::TurnComplete(_ev) => {
|
||||
if should_ignore_turn_complete(&event_turn_id, &thread_state).await {
|
||||
return;
|
||||
}
|
||||
// All per-thread requests are bound to a turn, so abort them.
|
||||
outgoing.abort_pending_server_requests().await;
|
||||
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some();
|
||||
@@ -1423,6 +1427,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
outgoing.abort_pending_server_requests().await;
|
||||
let pending = {
|
||||
let mut state = thread_state.lock().await;
|
||||
state.interrupted_turn_ids.insert(event_turn_id.clone());
|
||||
std::mem::take(&mut state.pending_interrupts)
|
||||
};
|
||||
if !pending.is_empty() {
|
||||
@@ -1716,6 +1721,14 @@ async fn find_and_remove_turn_summary(
|
||||
std::mem::take(&mut state.turn_summary)
|
||||
}
|
||||
|
||||
async fn should_ignore_turn_complete(
|
||||
event_turn_id: &str,
|
||||
thread_state: &Arc<Mutex<ThreadState>>,
|
||||
) -> bool {
|
||||
let mut state = thread_state.lock().await;
|
||||
state.interrupted_turn_ids.remove(event_turn_id)
|
||||
}
|
||||
|
||||
async fn handle_turn_complete(
|
||||
conversation_id: ThreadId,
|
||||
event_turn_id: String,
|
||||
@@ -2493,6 +2506,20 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_should_ignore_turn_complete_for_interrupted_turn() -> Result<()> {
|
||||
let thread_state = new_thread_state();
|
||||
let turn_id = "interrupt_then_complete".to_string();
|
||||
{
|
||||
let mut state = thread_state.lock().await;
|
||||
state.interrupted_turn_ids.insert(turn_id.clone());
|
||||
}
|
||||
|
||||
assert!(should_ignore_turn_complete(&turn_id, &thread_state).await);
|
||||
assert!(!should_ignore_turn_complete(&turn_id, &thread_state).await);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_interrupted_emits_interrupted_with_error() -> Result<()> {
|
||||
let conversation_id = ThreadId::new();
|
||||
|
||||
@@ -51,6 +51,7 @@ pub(crate) struct TurnSummary {
|
||||
#[derive(Default)]
|
||||
pub(crate) struct ThreadState {
|
||||
pub(crate) pending_interrupts: PendingInterruptQueue,
|
||||
pub(crate) interrupted_turn_ids: HashSet<String>,
|
||||
pub(crate) pending_rollbacks: Option<ConnectionRequestId>,
|
||||
pub(crate) turn_summary: TurnSummary,
|
||||
pub(crate) cancel_tx: Option<oneshot::Sender<()>>,
|
||||
|
||||
@@ -101,7 +101,7 @@ impl McpProcess {
|
||||
cmd.stderr(Stdio::piped());
|
||||
cmd.current_dir(codex_home);
|
||||
cmd.env("CODEX_HOME", codex_home);
|
||||
cmd.env("RUST_LOG", "info");
|
||||
cmd.env("RUST_LOG", "warn");
|
||||
cmd.env_remove(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR);
|
||||
|
||||
for (k, v) in env_overrides {
|
||||
@@ -795,7 +795,6 @@ impl McpProcess {
|
||||
}
|
||||
|
||||
async fn send_jsonrpc_message(&mut self, message: JSONRPCMessage) -> anyhow::Result<()> {
|
||||
eprintln!("writing message to stdin: {message:?}");
|
||||
let Some(stdin) = self.stdin.as_mut() else {
|
||||
anyhow::bail!("mcp stdin closed");
|
||||
};
|
||||
@@ -810,13 +809,10 @@ impl McpProcess {
|
||||
let mut line = String::new();
|
||||
self.stdout.read_line(&mut line).await?;
|
||||
let message = serde_json::from_str::<JSONRPCMessage>(&line)?;
|
||||
eprintln!("read message from stdout: {message:?}");
|
||||
Ok(message)
|
||||
}
|
||||
|
||||
pub async fn read_stream_until_request_message(&mut self) -> anyhow::Result<ServerRequest> {
|
||||
eprintln!("in read_stream_until_request_message()");
|
||||
|
||||
let message = self
|
||||
.read_stream_until_message(|message| matches!(message, JSONRPCMessage::Request(_)))
|
||||
.await?;
|
||||
@@ -833,8 +829,6 @@ impl McpProcess {
|
||||
&mut self,
|
||||
request_id: RequestId,
|
||||
) -> anyhow::Result<JSONRPCResponse> {
|
||||
eprintln!("in read_stream_until_response_message({request_id:?})");
|
||||
|
||||
let message = self
|
||||
.read_stream_until_message(|message| {
|
||||
Self::message_request_id(message) == Some(&request_id)
|
||||
@@ -867,8 +861,6 @@ impl McpProcess {
|
||||
&mut self,
|
||||
method: &str,
|
||||
) -> anyhow::Result<JSONRPCNotification> {
|
||||
eprintln!("in read_stream_until_notification_message({method})");
|
||||
|
||||
let message = self
|
||||
.read_stream_until_message(|message| {
|
||||
matches!(
|
||||
|
||||
@@ -18,11 +18,11 @@ use core_test_support::fs_wait;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const NOTIFY_FILE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_uses_client_info_name_as_originator() -> Result<()> {
|
||||
@@ -261,7 +261,7 @@ tmp_path.replace(payload_path)
|
||||
)
|
||||
.await??;
|
||||
|
||||
fs_wait::wait_for_path_exists(¬ify_file, Duration::from_secs(5)).await?;
|
||||
fs_wait::wait_for_path_exists(¬ify_file, NOTIFY_FILE_TIMEOUT).await?;
|
||||
let payload_raw = tokio::fs::read_to_string(¬ify_file).await?;
|
||||
let payload: Value = serde_json::from_str(&payload_raw)?;
|
||||
assert_eq!(payload["client"], "xcode");
|
||||
|
||||
@@ -21,6 +21,7 @@ use codex_protocol::openai_models::ReasoningEffort;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const STARTUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn request_user_input_round_trip() -> Result<()> {
|
||||
@@ -33,7 +34,7 @@ async fn request_user_input_round_trip() -> Result<()> {
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
timeout(STARTUP_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_start_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
|
||||
@@ -31,6 +31,7 @@ use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const STARTUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
|
||||
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -232,7 +233,7 @@ async fn review_start_rejects_empty_base_branch() -> Result<()> {
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
timeout(STARTUP_TIMEOUT, mcp.initialize()).await??;
|
||||
let thread_id = start_default_thread(&mut mcp).await?;
|
||||
|
||||
let request_id = mcp
|
||||
@@ -340,7 +341,7 @@ async fn review_start_rejects_empty_commit_sha() -> Result<()> {
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
timeout(STARTUP_TIMEOUT, mcp.initialize()).await??;
|
||||
let thread_id = start_default_thread(&mut mcp).await?;
|
||||
|
||||
let request_id = mcp
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
@@ -19,6 +20,7 @@ use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use core_test_support::responses;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
@@ -139,12 +141,19 @@ async fn turn_interrupt_resolves_pending_command_approval_request() -> Result<()
|
||||
let working_directory = tmp.path().join("workdir");
|
||||
std::fs::create_dir(&working_directory)?;
|
||||
|
||||
let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response(
|
||||
shell_command.clone(),
|
||||
Some(&working_directory),
|
||||
Some(10_000),
|
||||
"call_sleep_approval",
|
||||
)?])
|
||||
let no_op_response = responses::sse(vec![
|
||||
responses::ev_response_created("resp-2"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]);
|
||||
let server = create_mock_responses_server_sequence_unchecked(vec![
|
||||
create_shell_command_sse_response(
|
||||
shell_command.clone(),
|
||||
Some(&working_directory),
|
||||
Some(10_000),
|
||||
"call_sleep_approval",
|
||||
)?,
|
||||
no_op_response,
|
||||
])
|
||||
.await;
|
||||
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
|
||||
|
||||
|
||||
@@ -1423,11 +1423,13 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
collaboration_mode: None,
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
let second_turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(second_turn)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn, .. } = to_response::<TurnStartResponse>(second_turn_resp)?;
|
||||
let second_turn_id = turn.id;
|
||||
|
||||
let command_exec_item = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
@@ -1440,7 +1442,10 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
.expect("item/started params");
|
||||
let item_started: ItemStartedNotification =
|
||||
serde_json::from_value(params).expect("deserialize item/started notification");
|
||||
if matches!(item_started.item, ThreadItem::CommandExecution { .. }) {
|
||||
if item_started.turn_id == second_turn_id
|
||||
&& let ThreadItem::CommandExecution { id, .. } = &item_started.item
|
||||
&& id == "call-second"
|
||||
{
|
||||
return Ok::<ThreadItem, anyhow::Error>(item_started.item);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -547,11 +547,17 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
let second_file_str = second_file.to_string_lossy().into_owned();
|
||||
let parent_shell_hint = format!("&& {}", &first_file_str);
|
||||
while target_decision_index < target_decisions.len() || !saw_parent_approval {
|
||||
let server_req = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_request_message(),
|
||||
)
|
||||
.await??;
|
||||
let wait_timeout = if target_decision_index < target_decisions.len() {
|
||||
DEFAULT_READ_TIMEOUT
|
||||
} else {
|
||||
std::time::Duration::from_secs(2)
|
||||
};
|
||||
let server_req = match timeout(wait_timeout, mcp.read_stream_until_request_message()).await
|
||||
{
|
||||
Ok(server_req) => server_req?,
|
||||
Err(_) if target_decision_index >= target_decisions.len() => break,
|
||||
Err(error) => return Err(error.into()),
|
||||
};
|
||||
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req
|
||||
else {
|
||||
panic!("expected CommandExecutionRequestApproval request");
|
||||
|
||||
@@ -326,7 +326,13 @@ pub(crate) async fn handle_start(
|
||||
msg,
|
||||
};
|
||||
while let Ok(event) = events_rx.recv().await {
|
||||
debug!(conversation_id = %sess_clone.conversation_id, "received realtime conversation event");
|
||||
// if not audio out, log the event
|
||||
if !matches!(event, RealtimeEvent::AudioOut(_)) {
|
||||
info!(
|
||||
event = ?event,
|
||||
"received realtime conversation event"
|
||||
);
|
||||
}
|
||||
let maybe_routed_text = match &event {
|
||||
RealtimeEvent::HandoffRequested(handoff) => {
|
||||
realtime_text_from_handoff_request(handoff)
|
||||
|
||||
@@ -653,14 +653,17 @@ async fn expect_patch_approval(
|
||||
}
|
||||
|
||||
async fn wait_for_completion_without_approval(test: &TestCodex) {
|
||||
let event = wait_for_event(&test.codex, |event| {
|
||||
matches!(
|
||||
event,
|
||||
EventMsg::ExecApprovalRequest(_) | EventMsg::TurnComplete(_)
|
||||
)
|
||||
})
|
||||
let event = wait_for_event_with_timeout(
|
||||
&test.codex,
|
||||
|event| {
|
||||
matches!(
|
||||
event,
|
||||
EventMsg::ExecApprovalRequest(_) | EventMsg::TurnComplete(_)
|
||||
)
|
||||
},
|
||||
std::time::Duration::from_secs(10),
|
||||
)
|
||||
.await;
|
||||
|
||||
match event {
|
||||
EventMsg::TurnComplete(_) => {}
|
||||
EventMsg::ExecApprovalRequest(event) => {
|
||||
|
||||
@@ -841,7 +841,7 @@ async fn shell_command_output_is_not_truncated_over_10k_bytes() -> Result<()> {
|
||||
let call_id = "shell-command";
|
||||
let args = json!({
|
||||
"command": "perl -e 'print \"1\" x 10001'",
|
||||
"timeout_ms": 1000,
|
||||
"timeout_ms": 5000,
|
||||
});
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
|
||||
@@ -35,9 +35,9 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
let (mut connection, _) = listener
|
||||
.accept()
|
||||
.context("failed to accept test connection")?;
|
||||
let mut received = Vec::new();
|
||||
let mut received = vec![0_u8; b"request".len()];
|
||||
connection
|
||||
.read_to_end(&mut received)
|
||||
.read_exact(&mut received)
|
||||
.context("failed to read data from client")?;
|
||||
tx.send(received)
|
||||
.map_err(|_| anyhow::anyhow!("failed to send received bytes to test thread"))?;
|
||||
|
||||
Reference in New Issue
Block a user