diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 17f6701063..73f5c6ed7d 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4047,29 +4047,39 @@ impl Session { } pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) { - self.set_mailbox_delivery_phase(sub_id, MailboxDeliveryPhase::NextTurn) - .await; - } - - pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) { - self.set_mailbox_delivery_phase(sub_id, MailboxDeliveryPhase::CurrentTurn) - .await; - } - - async fn set_mailbox_delivery_phase(&self, sub_id: &str, phase: MailboxDeliveryPhase) { - let turn_state = { - let active = self.active_turn.lock().await; - active.as_ref().and_then(|active_turn| { - active_turn - .tasks - .contains_key(sub_id) - .then(|| Arc::clone(&active_turn.turn_state)) - }) - }; + let turn_state = self.turn_state_for_sub_id(sub_id).await; let Some(turn_state) = turn_state else { return; }; - turn_state.lock().await.set_mailbox_delivery_phase(phase); + let mut turn_state = turn_state.lock().await; + if turn_state.has_pending_input() { + return; + } + turn_state.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn); + } + + pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) { + let turn_state = self.turn_state_for_sub_id(sub_id).await; + let Some(turn_state) = turn_state else { + return; + }; + turn_state + .lock() + .await + .set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn); + } + + async fn turn_state_for_sub_id( + &self, + sub_id: &str, + ) -> Option>> { + let active = self.active_turn.lock().await; + active.as_ref().and_then(|active_turn| { + active_turn + .tasks + .contains_key(sub_id) + .then(|| Arc::clone(&active_turn.turn_state)) + }) } pub(crate) fn subscribe_mailbox_seq(&self) -> watch::Receiver { diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index f7d343a81e..4ff31e8bd0 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -4896,6 +4896,52 @@ async fn steered_input_reopens_mailbox_delivery_for_current_turn() { ); } +#[tokio::test] +async fn stale_defer_mailbox_delivery_does_not_override_steered_input() { + let (sess, tc, _rx) = make_session_and_context_with_rx().await; + let communication = InterAgentCommunication::new( + AgentPath::try_from("/root/worker").expect("worker path should parse"), + AgentPath::root(), + Vec::new(), + "queued child update".to_string(), + /*trigger_turn*/ false, + ); + sess.spawn_task( + Arc::clone(&tc), + Vec::new(), + NeverEndingTask { + kind: TaskKind::Regular, + listen_to_cancellation_token: true, + }, + ) + .await; + + sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; + sess.enqueue_mailbox_communication(communication.clone()); + sess.steer_input( + vec![UserInput::Text { + text: "follow up".to_string(), + text_elements: Vec::new(), + }], + Some(&tc.sub_id), + ) + .await + .expect("steered input should be accepted"); + + sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; + + assert_eq!( + sess.get_pending_input().await, + vec![ + ResponseInputItem::from(vec![UserInput::Text { + text: "follow up".to_string(), + text_elements: Vec::new(), + }]), + communication.to_response_input_item(), + ], + ); +} + #[tokio::test] async fn tool_calls_reopen_mailbox_delivery_for_current_turn() { let (sess, tc, _rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/state/mod.rs b/codex-rs/core/src/state/mod.rs index af318d8017..f3ebc7225d 100644 --- a/codex-rs/core/src/state/mod.rs +++ b/codex-rs/core/src/state/mod.rs @@ -8,3 +8,4 @@ pub(crate) use turn::ActiveTurn; pub(crate) use turn::MailboxDeliveryPhase; pub(crate) use turn::RunningTask; pub(crate) use turn::TaskKind; +pub(crate) use turn::TurnState; diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 223edbe7a6..8cfd8555e6 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -214,10 +214,6 @@ impl TurnState { !self.pending_input.is_empty() } - pub(crate) fn defer_mailbox_delivery_to_next_turn(&mut self) { - self.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn); - } - pub(crate) fn accept_mailbox_delivery_for_current_turn(&mut self) { self.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn); }