diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 50da272375..3149e75bd9 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -332,6 +332,7 @@ use codex_protocol::config_types::ServiceTier; use codex_protocol::config_types::WindowsSandboxLevel; use codex_protocol::models::ContentItem; use codex_protocol::models::DeveloperInstructions; +use codex_protocol::models::MessagePhase; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; @@ -7426,6 +7427,25 @@ async fn try_run_sampling_request( cancellation_token: cancellation_token.child_token(), }; + let preempt_for_mailbox_mail = match &item { + ResponseItem::Message { role, phase, .. } => { + role == "assistant" && matches!(phase, Some(MessagePhase::Commentary)) + } + ResponseItem::Reasoning { .. } => true, + ResponseItem::LocalShellCall { .. } + | ResponseItem::FunctionCall { .. } + | ResponseItem::ToolSearchCall { .. } + | ResponseItem::FunctionCallOutput { .. } + | ResponseItem::CustomToolCall { .. } + | ResponseItem::CustomToolCallOutput { .. } + | ResponseItem::ToolSearchOutput { .. } + | ResponseItem::WebSearchCall { .. } + | ResponseItem::ImageGenerationCall { .. } + | ResponseItem::GhostSnapshot { .. } + | ResponseItem::Compaction { .. } + | ResponseItem::Other => false, + }; + let output_result = handle_output_item_done(&mut ctx, item, previously_active_item) .instrument(handle_responses) .await?; @@ -7436,6 +7456,13 @@ async fn try_run_sampling_request( last_agent_message = Some(agent_message); } needs_follow_up |= output_result.needs_follow_up; + // todo: remove before stabilizing multi-agent v2 + if preempt_for_mailbox_mail && sess.mailbox_rx.lock().await.has_pending() { + break Ok(SamplingRequestResult { + needs_follow_up: true, + last_agent_message, + }); + } } ResponseEvent::OutputItemAdded(item) => { if let Some(turn_item) = handle_non_tool_response_item( diff --git a/codex-rs/core/tests/suite/pending_input.rs b/codex-rs/core/tests/suite/pending_input.rs index 30b0d9d50b..2fc5bf4e34 100644 --- a/codex-rs/core/tests/suite/pending_input.rs +++ b/codex-rs/core/tests/suite/pending_input.rs @@ -1,17 +1,31 @@ +use std::sync::Arc; + +use codex_core::CodexThread; +use codex_protocol::AgentPath; +use codex_protocol::items::TurnItem; use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::InterAgentCommunication; use codex_protocol::protocol::Op; use codex_protocol::user_input::UserInput; +use core_test_support::context_snapshot; +use core_test_support::context_snapshot::ContextSnapshotOptions; use core_test_support::responses; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_function_call; use core_test_support::responses::ev_message_item_added; use core_test_support::responses::ev_output_text_delta; +use core_test_support::responses::ev_reasoning_item; +use core_test_support::responses::ev_reasoning_item_added; use core_test_support::responses::ev_response_created; use core_test_support::streaming_sse::StreamingSseChunk; +use core_test_support::streaming_sse::StreamingSseServer; use core_test_support::streaming_sse::start_streaming_sse_server; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use pretty_assertions::assert_eq; use serde_json::Value; +use serde_json::from_slice; +use serde_json::json; use tokio::sync::oneshot; fn ev_message_item_done(id: &str, text: &str) -> Value { @@ -44,6 +58,115 @@ fn message_input_texts(body: &Value, role: &str) -> Vec { .collect() } +fn chunk(event: Value) -> StreamingSseChunk { + StreamingSseChunk { + gate: None, + body: responses::sse(vec![event]), + } +} + +fn gated_chunk(gate: oneshot::Receiver<()>, events: Vec) -> StreamingSseChunk { + StreamingSseChunk { + gate: Some(gate), + body: responses::sse(events), + } +} + +fn response_completed_chunks(response_id: &str) -> Vec { + vec![ + chunk(ev_response_created(response_id)), + chunk(ev_completed(response_id)), + ] +} + +async fn build_codex(server: &StreamingSseServer) -> Arc { + test_codex() + .with_model("gpt-5.1") + .build_with_streaming_server(server) + .await + .unwrap_or_else(|err| panic!("build streaming Codex test session: {err}")) + .codex +} + +async fn submit_user_input(codex: &CodexThread, text: &str) { + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: text.to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .unwrap_or_else(|err| panic!("submit user input: {err}")); +} + +async fn submit_queue_only_agent_mail(codex: &CodexThread, text: &str) { + codex + .submit(Op::InterAgentCommunication { + communication: InterAgentCommunication::new( + AgentPath::try_from("/root/worker") + .unwrap_or_else(|err| panic!("worker path should parse: {err}")), + AgentPath::root(), + Vec::new(), + text.to_string(), + /*trigger_turn*/ false, + ), + }) + .await + .unwrap_or_else(|err| panic!("submit queue-only agent mail: {err}")); +} + +async fn wait_for_reasoning_item_started(codex: &CodexThread) { + wait_for_event(codex, |event| { + matches!( + event, + EventMsg::ItemStarted(item_started) + if matches!(&item_started.item, TurnItem::Reasoning(_)) + ) + }) + .await; +} + +async fn wait_for_agent_message(codex: &CodexThread, text: &str) { + let final_message = wait_for_event( + codex, + |event| matches!(event, EventMsg::AgentMessage(message) if message.message == text), + ) + .await; + assert!(matches!(final_message, EventMsg::AgentMessage(_))); +} + +async fn wait_for_turn_complete(codex: &CodexThread) { + wait_for_event(codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; +} + +fn assert_two_responses_input_snapshot(snapshot_name: &str, requests: &[Vec]) { + assert_eq!(requests.len(), 2); + let options = ContextSnapshotOptions::default().strip_capability_instructions(); + let first: Value = + from_slice(&requests[0]).unwrap_or_else(|err| panic!("parse first request: {err}")); + let second: Value = + from_slice(&requests[1]).unwrap_or_else(|err| panic!("parse second request: {err}")); + let first_items = first["input"] + .as_array() + .unwrap_or_else(|| panic!("first request input")) + .clone(); + let second_items = second["input"] + .as_array() + .unwrap_or_else(|| panic!("second request input")) + .clone(); + let snapshot = context_snapshot::format_labeled_items_snapshot( + "/responses POST bodies (input only, redacted like other suite snapshots)", + &[ + ("First request", first_items.as_slice()), + ("Second request", second_items.as_slice()), + ], + &options, + ); + insta::assert_snapshot!(snapshot_name, snapshot); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore = "TODO(aibrahim): flaky"] async fn injected_user_input_triggers_follow_up_request_with_deltas() { @@ -144,3 +267,162 @@ async fn injected_user_input_triggers_follow_up_request_with_deltas() { server.shutdown().await; } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn queued_inter_agent_mail_triggers_follow_up_after_reasoning_item() { + let (gate_reasoning_done_tx, gate_reasoning_done_rx) = oneshot::channel(); + + let first_chunks = vec![ + chunk(ev_response_created("resp-1")), + chunk(ev_reasoning_item_added("reason-1", &["thinking"])), + gated_chunk( + gate_reasoning_done_rx, + vec![ + ev_reasoning_item("reason-1", &["thinking"], &[]), + ev_function_call( + "call-stale", + "shell", + r#"{"command":"echo stale tool call"}"#, + ), + ev_message_item_added("msg-stale", ""), + ev_output_text_delta("stale final"), + ev_message_item_done("msg-stale", "stale final"), + ev_completed("resp-1"), + ], + ), + ]; + + let (server, _completions) = + start_streaming_sse_server(vec![first_chunks, response_completed_chunks("resp-2")]).await; + + let codex = build_codex(&server).await; + + submit_user_input(&codex, "first prompt").await; + + wait_for_reasoning_item_started(&codex).await; + + submit_queue_only_agent_mail(&codex, "queued child update").await; + + let _ = gate_reasoning_done_tx.send(()); + + wait_for_turn_complete(&codex).await; + + let requests = server.requests().await; + assert_two_responses_input_snapshot("pending_input_queued_mail_after_reasoning", &requests); + + server.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn queued_inter_agent_mail_triggers_follow_up_after_commentary_message_item() { + let (gate_message_done_tx, gate_message_done_rx) = oneshot::channel(); + + let first_chunks = vec![ + chunk(ev_response_created("resp-1")), + chunk(ev_message_item_added("msg-1", "")), + gated_chunk( + gate_message_done_rx, + vec![ + ev_output_text_delta("first answer"), + json!({ + "type": "response.output_item.done", + "item": { + "type": "message", + "role": "assistant", + "id": "msg-1", + "content": [{"type": "output_text", "text": "first answer"}], + "phase": "commentary", + } + }), + ev_function_call( + "call-stale", + "shell", + r#"{"command":"echo stale tool call"}"#, + ), + ev_message_item_added("msg-stale", ""), + ev_output_text_delta("stale final"), + ev_message_item_done("msg-stale", "stale final"), + ev_completed("resp-1"), + ], + ), + ]; + + let (server, _completions) = + start_streaming_sse_server(vec![first_chunks, response_completed_chunks("resp-2")]).await; + + let codex = build_codex(&server).await; + + submit_user_input(&codex, "first prompt").await; + + wait_for_event(&codex, |event| { + matches!( + event, + EventMsg::ItemStarted(item_started) + if matches!(&item_started.item, TurnItem::AgentMessage(_)) + ) + }) + .await; + + submit_queue_only_agent_mail(&codex, "queued child update").await; + + let _ = gate_message_done_tx.send(()); + + wait_for_agent_message(&codex, "first answer").await; + + wait_for_turn_complete(&codex).await; + + let requests = server.requests().await; + assert_two_responses_input_snapshot("pending_input_queued_mail_after_commentary", &requests); + + server.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn user_input_does_not_preempt_after_reasoning_item() { + let (gate_reasoning_done_tx, gate_reasoning_done_rx) = oneshot::channel(); + + let first_chunks = vec![ + chunk(ev_response_created("resp-1")), + chunk(ev_reasoning_item_added("reason-1", &["thinking"])), + gated_chunk( + gate_reasoning_done_rx, + vec![ + ev_reasoning_item("reason-1", &["thinking"], &[]), + ev_function_call( + "call-preserved", + "shell", + r#"{"command":"echo preserved tool call"}"#, + ), + ev_message_item_added("msg-1", ""), + ev_output_text_delta("first answer"), + ev_message_item_done("msg-1", "first answer"), + ev_completed("resp-1"), + ], + ), + ]; + + let (server, _completions) = + start_streaming_sse_server(vec![first_chunks, response_completed_chunks("resp-2")]).await; + + let codex = build_codex(&server).await; + + submit_user_input(&codex, "first prompt").await; + + wait_for_reasoning_item_started(&codex).await; + + submit_user_input(&codex, "second prompt").await; + + let _ = gate_reasoning_done_tx.send(()); + + wait_for_agent_message(&codex, "first answer").await; + + wait_for_turn_complete(&codex).await; + + let requests = server.requests().await; + assert_two_responses_input_snapshot( + "pending_input_user_input_no_preempt_after_reasoning", + &requests, + ); + + server.shutdown().await; +} diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__pending_input__pending_input_queued_mail_after_commentary.snap b/codex-rs/core/tests/suite/snapshots/all__suite__pending_input__pending_input_queued_mail_after_commentary.snap new file mode 100644 index 0000000000..e65a5f34f0 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__pending_input__pending_input_queued_mail_after_commentary.snap @@ -0,0 +1,17 @@ +--- +source: core/tests/suite/pending_input.rs +expression: snapshot +--- +Scenario: /responses POST bodies (input only, redacted like other suite snapshots) + +## First request +00:message/developer: +01:message/user:> +02:message/user:first prompt + +## Second request +00:message/developer: +01:message/user:> +02:message/user:first prompt +03:message/assistant:first answer +04:message/assistant:{"author":"/root/worker","recipient":"/root","other_recipients":[],"content":"queued child update","trigger_turn":false} diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__pending_input__pending_input_queued_mail_after_reasoning.snap b/codex-rs/core/tests/suite/snapshots/all__suite__pending_input__pending_input_queued_mail_after_reasoning.snap new file mode 100644 index 0000000000..004196f97a --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__pending_input__pending_input_queued_mail_after_reasoning.snap @@ -0,0 +1,17 @@ +--- +source: core/tests/suite/pending_input.rs +expression: snapshot +--- +Scenario: /responses POST bodies (input only, redacted like other suite snapshots) + +## First request +00:message/developer: +01:message/user:> +02:message/user:first prompt + +## Second request +00:message/developer: +01:message/user:> +02:message/user:first prompt +03:reasoning:summary=thinking:encrypted=true +04:message/assistant:{"author":"/root/worker","recipient":"/root","other_recipients":[],"content":"queued child update","trigger_turn":false} diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__pending_input__pending_input_user_input_no_preempt_after_reasoning.snap b/codex-rs/core/tests/suite/snapshots/all__suite__pending_input__pending_input_user_input_no_preempt_after_reasoning.snap new file mode 100644 index 0000000000..724efd706a --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__pending_input__pending_input_user_input_no_preempt_after_reasoning.snap @@ -0,0 +1,20 @@ +--- +source: core/tests/suite/pending_input.rs +expression: snapshot +--- +Scenario: /responses POST bodies (input only, redacted like other suite snapshots) + +## First request +00:message/developer: +01:message/user:> +02:message/user:first prompt + +## Second request +00:message/developer: +01:message/user:> +02:message/user:first prompt +03:reasoning:summary=thinking:encrypted=true +04:function_call/shell +05:message/assistant:first answer +06:function_call_output:failed to parse function arguments: invalid type: string "echo preserved tool call", expected a sequence at line 1 column 37 +07:message/user:second prompt