Compare commits

...

5 Commits

Author SHA1 Message Date
jif-oai
068ab89f96 fix 2026-04-02 12:22:51 +02:00
jif-oai
32ff8b94d7 fix 2026-04-02 12:04:58 +02:00
jif-oai
c7c47d0b2d fix 2026-04-02 11:57:48 +02:00
jif-oai
c7bc168db7 fix 2026-04-02 11:50:52 +02:00
jif-oai
6175239980 fix: pending input race 2026-04-02 11:20:21 +02:00
6 changed files with 156 additions and 21 deletions

View File

@@ -19,6 +19,11 @@ pub(crate) struct MailboxReceiver {
pending_mails: VecDeque<InterAgentCommunication>,
}
pub(crate) struct MailboxDrain {
pub(crate) mails: Vec<InterAgentCommunication>,
pub(crate) saw_trigger_turn: bool,
}
impl Mailbox {
pub(crate) fn new() -> (Self, MailboxReceiver) {
let (tx, rx) = mpsc::unbounded_channel();
@@ -65,9 +70,14 @@ impl MailboxReceiver {
self.pending_mails.iter().any(|mail| mail.trigger_turn)
}
pub(crate) fn drain(&mut self) -> Vec<InterAgentCommunication> {
pub(crate) fn drain(&mut self) -> MailboxDrain {
self.sync_pending_mails();
self.pending_mails.drain(..).collect()
let mails = self.pending_mails.drain(..).collect::<Vec<_>>();
let saw_trigger_turn = mails.iter().any(|mail| mail.trigger_turn);
MailboxDrain {
mails,
saw_trigger_turn,
}
}
}
@@ -134,28 +144,38 @@ mod tests {
mailbox.send(mail_one.clone());
mailbox.send(mail_two.clone());
assert_eq!(receiver.drain(), vec![mail_one, mail_two]);
let drain = receiver.drain();
assert_eq!(drain.mails, vec![mail_one, mail_two]);
assert!(!drain.saw_trigger_turn);
assert!(!receiver.has_pending());
}
#[tokio::test]
async fn mailbox_tracks_pending_trigger_turn_mail() {
async fn mailbox_drain_tracks_pending_trigger_turn_mail() {
let (mailbox, mut receiver) = Mailbox::new();
mailbox.send(make_mail(
let queue_only_mail = make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"queued",
/*trigger_turn*/ false,
));
assert!(!receiver.has_pending_trigger_turn());
);
mailbox.send(queue_only_mail.clone());
mailbox.send(make_mail(
let drain = receiver.drain();
assert_eq!(drain.mails, vec![queue_only_mail]);
assert!(!drain.saw_trigger_turn);
let trigger_turn_mail = make_mail(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
"wake",
/*trigger_turn*/ true,
));
assert!(receiver.has_pending_trigger_turn());
);
mailbox.send(trigger_turn_mail.clone());
let drain = receiver.drain();
assert_eq!(drain.mails, vec![trigger_turn_mail]);
assert!(drain.saw_trigger_turn);
}
}

View File

@@ -8,6 +8,7 @@ pub(crate) mod status;
pub(crate) use codex_protocol::protocol::AgentStatus;
pub(crate) use control::AgentControl;
pub(crate) use mailbox::Mailbox;
pub(crate) use mailbox::MailboxDrain;
pub(crate) use mailbox::MailboxReceiver;
pub(crate) use registry::exceeds_thread_spawn_depth_limit;
pub(crate) use registry::next_thread_spawn_depth;

View File

@@ -9,6 +9,7 @@ use std::sync::atomic::AtomicU64;
use crate::agent::AgentControl;
use crate::agent::AgentStatus;
use crate::agent::Mailbox;
use crate::agent::MailboxDrain;
use crate::agent::MailboxReceiver;
use crate::agent::agent_status_from_event;
use crate::agent::status::is_final;
@@ -4034,6 +4035,9 @@ impl Session {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
if at.tasks.is_empty() && !at.accepts_input_without_running_tasks {
return Err(input);
}
let mut ts = at.turn_state.lock().await;
for item in input {
ts.push_pending_input(item);
@@ -4052,6 +4056,10 @@ impl Session {
self.mailbox.send(communication);
}
pub(crate) async fn drain_mailbox(&self) -> MailboxDrain {
self.mailbox_rx.lock().await.drain()
}
pub(crate) async fn has_trigger_turn_mailbox_items(&self) -> bool {
self.mailbox_rx.lock().await.has_pending_trigger_turn()
}
@@ -4080,9 +4088,9 @@ impl Session {
}
};
let mailbox_items = {
let mut mailbox_rx = self.mailbox_rx.lock().await;
mailbox_rx
.drain()
self.drain_mailbox()
.await
.mails
.into_iter()
.map(|mail| mail.to_response_input_item())
.collect::<Vec<_>>()
@@ -4099,7 +4107,6 @@ impl Session {
}
/// Queue response items to be injected into the next active turn created for this session.
#[cfg(test)]
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
if items.is_empty() {
return;

View File

@@ -54,6 +54,7 @@ use codex_execpolicy::NetworkRuleProtocol;
use codex_execpolicy::Policy;
use codex_network_proxy::NetworkProxyConfig;
use codex_otel::TelemetryAuthMode;
use codex_protocol::AgentPath;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
@@ -69,6 +70,7 @@ use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::CreditsSnapshot;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::NetworkApprovalProtocol;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
@@ -4781,6 +4783,79 @@ async fn queued_response_items_for_next_turn_move_into_next_active_turn() {
assert_eq!(sess.get_pending_input().await, vec![queued_item]);
}
#[tokio::test]
async fn inject_response_items_accepts_pending_turn_start_reservation() {
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;
*sess.active_turn.lock().await = Some(ActiveTurn::default());
let pending_item = ResponseInputItem::Message {
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "queued during turn-complete".to_string(),
}],
};
sess.inject_response_items(vec![pending_item.clone()])
.await
.expect("starting active turn reservation should accept injected items");
assert_eq!(sess.get_pending_input().await, vec![pending_item]);
}
#[tokio::test]
async fn inject_response_items_rejects_finalizing_turn_reservation() {
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;
let mut finalizing_turn = ActiveTurn::default();
finalizing_turn.accepts_input_without_running_tasks = false;
*sess.active_turn.lock().await = Some(finalizing_turn);
let pending_item = ResponseInputItem::Message {
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "queued during turn-complete".to_string(),
}],
};
let err = sess
.inject_response_items(vec![pending_item.clone()])
.await
.expect_err("finalizing active turn reservation should reject injected items");
assert_eq!(err, vec![pending_item]);
}
#[tokio::test]
async fn on_task_finished_records_queue_only_mailbox_mail_without_restart() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
sess.spawn_task(
Arc::clone(&tc),
Vec::new(),
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: false,
},
)
.await;
let communication = InterAgentCommunication::new(
AgentPath::try_from("/root/worker").expect("worker path should parse"),
AgentPath::root(),
Vec::new(),
"queued completion".to_string(),
/*trigger_turn*/ false,
);
sess.enqueue_mailbox_communication(communication.clone());
sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None)
.await;
sleep(Duration::from_millis(50)).await;
assert!(!sess.has_pending_input().await);
assert!(sess.active_turn.lock().await.is_none());
assert_eq!(
sess.clone_history().await.raw_items(),
&[ResponseItem::from(communication.to_response_input_item())],
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;

View File

@@ -27,6 +27,7 @@ use codex_protocol::protocol::TokenUsage;
pub(crate) struct ActiveTurn {
pub(crate) tasks: IndexMap<String, RunningTask>,
pub(crate) turn_state: Arc<Mutex<TurnState>>,
pub(crate) accepts_input_without_running_tasks: bool,
}
impl Default for ActiveTurn {
@@ -34,6 +35,7 @@ impl Default for ActiveTurn {
Self {
tasks: IndexMap::new(),
turn_state: Arc::new(Mutex::new(TurnState::default())),
accepts_input_without_running_tasks: true,
}
}
}

View File

@@ -198,6 +198,7 @@ impl Session {
let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
debug_assert!(turn.tasks.is_empty());
turn.accepts_input_without_running_tasks = true;
Arc::clone(&turn.turn_state)
};
{
@@ -335,27 +336,42 @@ impl Session {
let mut should_clear_active_turn = false;
let mut token_usage_at_turn_start = None;
let mut turn_tool_calls = 0_u64;
let turn_state = {
let finalizing_turn_state = {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
{
should_clear_active_turn = true;
let turn_state = Arc::clone(&at.turn_state);
if should_clear_active_turn {
*active = None;
}
Some(turn_state)
at.accepts_input_without_running_tasks = false;
Some(Arc::clone(&at.turn_state))
} else {
None
}
};
if let Some(turn_state) = turn_state {
if let Some(turn_state) = finalizing_turn_state.as_ref() {
let mut ts = turn_state.lock().await;
pending_input = ts.take_pending_input();
turn_tool_calls = ts.tool_calls;
token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone());
}
let mailbox_drain = self.drain_mailbox().await;
let mut next_turn_mailbox_input = Vec::new();
let mut queue_mail_for_next_turn = false;
for mail in mailbox_drain.mails {
let mailbox_input = mail.to_response_input_item();
queue_mail_for_next_turn |= mail.trigger_turn;
if queue_mail_for_next_turn {
next_turn_mailbox_input.push(mailbox_input);
} else {
pending_input.push(mailbox_input);
}
}
if mailbox_drain.saw_trigger_turn {
self.queue_response_items_for_next_turn(next_turn_mailbox_input)
.await;
}
if !pending_input.is_empty() {
for pending_input_item in pending_input {
match inspect_pending_input(self, &turn_context, pending_input_item).await {
@@ -370,6 +386,7 @@ impl Session {
}
}
}
// Emit token usage metrics.
if let Some(token_usage_at_turn_start) = token_usage_at_turn_start {
// TODO(jif): drop this
@@ -449,6 +466,19 @@ impl Session {
&[("token_type", "reasoning_output"), tmp_mem],
);
}
if should_clear_active_turn
&& let Some(turn_state) = finalizing_turn_state.as_ref()
{
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_ref()
&& Arc::ptr_eq(&at.turn_state, turn_state)
&& at.tasks.is_empty()
&& !at.accepts_input_without_running_tasks
{
*active = None;
}
}
let event = EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_context.sub_id.clone(),
last_agent_message,