mirror of
https://github.com/openai/codex.git
synced 2026-05-01 11:52:10 +03:00
tests: centralize in-flight turn cleanup helper (#12271)
## Why Several tests intentionally exercise behavior while a turn is still active. The cleanup sequence for those tests (`turn/interrupt` + waiting for `codex/event/turn_aborted`) was duplicated across files, which made the rationale easy to lose and the pattern easy to apply inconsistently. This change centralizes that cleanup in one place with a single explanatory doc comment. ## What Changed ### Added shared helper In `codex-rs/app-server/tests/common/mcp_process.rs`: - Added `McpProcess::interrupt_turn_and_wait_for_aborted(...)`. - Added a doc comment explaining why explicit interrupt + terminal wait is required for tests that intentionally leave a turn in-flight. ### Migrated call sites Replaced duplicated interrupt/aborted blocks with the helper in: - `codex-rs/app-server/tests/suite/v2/thread_resume.rs` - `thread_resume_rejects_history_when_thread_is_running` - `thread_resume_rejects_mismatched_path_when_thread_is_running` - `codex-rs/app-server/tests/suite/v2/turn_start_zsh_fork.rs` - `turn_start_shell_zsh_fork_executes_command_v2` - `turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2` - `codex-rs/app-server/tests/suite/v2/turn_steer.rs` - `turn_steer_returns_active_turn_id` ### Existing cleanup retained In `codex-rs/app-server/tests/suite/v2/turn_start.rs`: - `turn_start_accepts_local_image_input` continues to explicitly wait for `turn/completed` so the turn lifecycle is fully drained before test exit. ## Verification - `cargo test -p codex-app-server`
This commit is contained in:
@@ -61,6 +61,7 @@ use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadRollbackParams;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadUnarchiveParams;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnInterruptParams;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnSteerParams;
|
||||
@@ -572,6 +573,63 @@ impl McpProcess {
|
||||
self.send_request("turn/interrupt", params).await
|
||||
}
|
||||
|
||||
/// Deterministically clean up an intentionally in-flight turn.
|
||||
///
|
||||
/// Some tests assert behavior while a turn is still running. Returning from those tests
|
||||
/// without an explicit interrupt + `codex/event/turn_aborted` wait can leave in-flight work
|
||||
/// racing teardown and intermittently show up as `LEAK` in nextest.
|
||||
///
|
||||
/// In rare races, the turn can also fail or complete on its own after we send
|
||||
/// `turn/interrupt` but before the server emits the interrupt response. The helper treats a
|
||||
/// buffered matching `turn/completed` notification as sufficient terminal cleanup in that
|
||||
/// case so teardown does not flap on timing.
|
||||
pub async fn interrupt_turn_and_wait_for_aborted(
|
||||
&mut self,
|
||||
thread_id: String,
|
||||
turn_id: String,
|
||||
read_timeout: std::time::Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
let interrupt_request_id = self
|
||||
.send_turn_interrupt_request(TurnInterruptParams {
|
||||
thread_id: thread_id.clone(),
|
||||
turn_id: turn_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
match tokio::time::timeout(
|
||||
read_timeout,
|
||||
self.read_stream_until_response_message(RequestId::Integer(interrupt_request_id)),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => {
|
||||
result.with_context(|| "failed while waiting for turn interrupt response")?;
|
||||
}
|
||||
Err(err) => {
|
||||
if self.pending_turn_completed_notification(&thread_id, &turn_id) {
|
||||
return Ok(());
|
||||
}
|
||||
return Err(err).with_context(|| "timed out waiting for turn interrupt response");
|
||||
}
|
||||
}
|
||||
match tokio::time::timeout(
|
||||
read_timeout,
|
||||
self.read_stream_until_notification_message("codex/event/turn_aborted"),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => {
|
||||
result.with_context(|| "failed while waiting for turn aborted notification")?;
|
||||
}
|
||||
Err(err) => {
|
||||
if self.pending_turn_completed_notification(&thread_id, &turn_id) {
|
||||
return Ok(());
|
||||
}
|
||||
return Err(err).with_context(|| "timed out waiting for turn aborted notification");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a `turn/steer` JSON-RPC request (v2).
|
||||
pub async fn send_turn_steer_request(
|
||||
&mut self,
|
||||
@@ -940,6 +998,25 @@ impl McpProcess {
|
||||
None
|
||||
}
|
||||
|
||||
fn pending_turn_completed_notification(&self, thread_id: &str, turn_id: &str) -> bool {
|
||||
self.pending_messages.iter().any(|message| {
|
||||
let JSONRPCMessage::Notification(notification) = message else {
|
||||
return false;
|
||||
};
|
||||
if notification.method != "turn/completed" {
|
||||
return false;
|
||||
}
|
||||
let Some(params) = notification.params.as_ref() else {
|
||||
return false;
|
||||
};
|
||||
let Ok(payload) = serde_json::from_value::<TurnCompletedNotification>(params.clone())
|
||||
else {
|
||||
return false;
|
||||
};
|
||||
payload.thread_id == thread_id && payload.turn.id == turn_id
|
||||
})
|
||||
}
|
||||
|
||||
fn message_request_id(message: &JSONRPCMessage) -> Option<&RequestId> {
|
||||
match message {
|
||||
JSONRPCMessage::Request(request) => Some(&request.id),
|
||||
|
||||
@@ -15,8 +15,6 @@ use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::TurnInterruptParams;
|
||||
use codex_app_server_protocol::TurnInterruptResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
@@ -317,9 +315,14 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]);
|
||||
let second_response = responses::sse_response(responses::sse(vec![
|
||||
responses::ev_response_created("resp-2"),
|
||||
responses::ev_assistant_message("msg-2", "Done"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]))
|
||||
.set_delay(std::time::Duration::from_millis(500));
|
||||
let _first_response_mock = responses::mount_sse_once(&server, first_body).await;
|
||||
let _second_response_mock = responses::mount_sse_once(&server, second_body).await;
|
||||
let _second_response_mock = responses::mount_response_once(&server, second_response).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -413,28 +416,9 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
|
||||
resume_err.error.message
|
||||
);
|
||||
|
||||
// This test intentionally keeps a turn running to exercise the resume error path.
|
||||
// Keep this explicit interrupt + turn_aborted wait so teardown does not leave
|
||||
// in-flight work behind (which can show up as LEAK in nextest).
|
||||
let interrupt_id = primary
|
||||
.send_turn_interrupt_request(TurnInterruptParams {
|
||||
thread_id,
|
||||
turn_id: running_turn.id,
|
||||
})
|
||||
primary
|
||||
.interrupt_turn_and_wait_for_aborted(thread_id, running_turn.id, DEFAULT_READ_TIMEOUT)
|
||||
.await?;
|
||||
let interrupt_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)),
|
||||
)
|
||||
.await??;
|
||||
let _turn_interrupt_response: TurnInterruptResponse =
|
||||
to_response::<TurnInterruptResponse>(interrupt_resp)?;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("codex/event/turn_aborted"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -447,9 +431,14 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]);
|
||||
let second_response = responses::sse_response(responses::sse(vec![
|
||||
responses::ev_response_created("resp-2"),
|
||||
responses::ev_assistant_message("msg-2", "Done"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]))
|
||||
.set_delay(std::time::Duration::from_millis(500));
|
||||
let _first_response_mock = responses::mount_sse_once(&server, first_body).await;
|
||||
let _second_response_mock = responses::mount_sse_once(&server, second_body).await;
|
||||
let _second_response_mock = responses::mount_response_once(&server, second_response).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -533,28 +522,9 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
|
||||
resume_err.error.message
|
||||
);
|
||||
|
||||
// This test intentionally keeps a turn running to exercise the resume error path.
|
||||
// Keep this explicit interrupt + turn_aborted wait so teardown does not leave
|
||||
// in-flight work behind (which can show up as LEAK in nextest).
|
||||
let interrupt_id = primary
|
||||
.send_turn_interrupt_request(TurnInterruptParams {
|
||||
thread_id,
|
||||
turn_id: running_turn.id,
|
||||
})
|
||||
primary
|
||||
.interrupt_turn_and_wait_for_aborted(thread_id, running_turn.id, DEFAULT_READ_TIMEOUT)
|
||||
.await?;
|
||||
let interrupt_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_response_message(RequestId::Integer(interrupt_id)),
|
||||
)
|
||||
.await??;
|
||||
let _turn_interrupt_response: TurnInterruptResponse =
|
||||
to_response::<TurnInterruptResponse>(interrupt_resp)?;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("codex/event/turn_aborted"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -747,7 +747,12 @@ async fn turn_start_accepts_local_image_input() -> Result<()> {
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
assert!(!turn.id.is_empty());
|
||||
|
||||
// This test only validates that turn/start responds and returns a turn.
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_core::features::FEATURES;
|
||||
@@ -100,7 +101,7 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
|
||||
let turn_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id,
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "run echo hi".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
@@ -114,11 +115,12 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
let started_command_execution = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
@@ -149,6 +151,9 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
assert!(command.contains(" -lc 'echo hi'"));
|
||||
assert_eq!(cwd, workspace);
|
||||
|
||||
mcp.interrupt_turn_and_wait_for_aborted(thread.id, turn.id, DEFAULT_READ_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -504,11 +509,12 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
let mut approval_ids = Vec::new();
|
||||
for decision in [
|
||||
@@ -577,6 +583,9 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
assert_eq!(approval_ids.len(), 2);
|
||||
assert_ne!(approval_ids[0], approval_ids[1]);
|
||||
|
||||
mcp.interrupt_turn_and_wait_for_aborted(thread.id, turn.id, DEFAULT_READ_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -136,7 +136,7 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
|
||||
|
||||
let steer_req = mcp
|
||||
.send_turn_steer_request(TurnSteerParams {
|
||||
thread_id: thread.id,
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "steer".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
@@ -152,6 +152,9 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
|
||||
let steer: TurnSteerResponse = to_response::<TurnSteerResponse>(steer_resp)?;
|
||||
assert_eq!(steer.turn_id, turn.id);
|
||||
|
||||
mcp.interrupt_turn_and_wait_for_aborted(thread.id, steer.turn_id, DEFAULT_READ_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user