Compare commits

...

1 Commits

Author SHA1 Message Date
jif-oai
68bf31f704 fix: multi-agent one shot issue 2026-03-27 18:05:07 +01:00
3 changed files with 211 additions and 143 deletions

View File

@@ -3,7 +3,6 @@ use crate::agent::registry::AgentMetadata;
use crate::agent::registry::AgentRegistry;
use crate::agent::role::DEFAULT_ROLE_NAME;
use crate::agent::role::resolve_role_config;
use crate::agent::status::is_final;
use crate::codex_thread::ThreadConfigSnapshot;
use crate::context_manager::is_user_turn_boundary;
use crate::error::CodexErr;
@@ -271,17 +270,6 @@ impl AgentControl {
.await;
self.send_input(new_thread.thread_id, items).await?;
let child_reference = agent_metadata
.agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| new_thread.thread_id.to_string());
self.maybe_start_completion_watcher(
new_thread.thread_id,
notification_source,
child_reference,
agent_metadata.agent_path.clone(),
);
Ok(LiveAgent {
thread_id: new_thread.thread_id,
@@ -443,17 +431,6 @@ impl AgentControl {
// Resumed threads are re-registered in-memory and need the same listener
// attachment path as freshly spawned threads.
state.notify_thread_created(resumed_thread.thread_id);
let child_reference = agent_metadata
.agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| resumed_thread.thread_id.to_string());
self.maybe_start_completion_watcher(
resumed_thread.thread_id,
Some(notification_source.clone()),
child_reference,
agent_metadata.agent_path.clone(),
);
self.persist_thread_spawn_edge_for_source(
resumed_thread.thread.as_ref(),
resumed_thread.thread_id,
@@ -780,83 +757,53 @@ impl AgentControl {
Ok(agents)
}
/// Starts a detached watcher for sub-agents spawned from another thread.
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
/// can receive completion notifications.
fn maybe_start_completion_watcher(
pub(crate) async fn forward_child_completion_to_parent(
&self,
parent_thread_id: ThreadId,
child_thread_id: ThreadId,
session_source: Option<SessionSource>,
child_reference: String,
child_agent_path: Option<AgentPath>,
status: AgentStatus,
) {
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
})) = session_source
else {
let Ok(state) = self.upgrade() else {
return;
};
let control = self.clone();
tokio::spawn(async move {
let status = match control.subscribe_status(child_thread_id).await {
Ok(mut status_rx) => {
let mut status = status_rx.borrow().clone();
while !is_final(&status) {
if status_rx.changed().await.is_err() {
status = control.get_status(child_thread_id).await;
break;
}
status = status_rx.borrow().clone();
}
status
}
Err(_) => control.get_status(child_thread_id).await,
};
if !is_final(&status) {
return;
}
let Ok(state) = control.upgrade() else {
let child_thread = state.get_thread(child_thread_id).await.ok();
let message = format_subagent_notification_message(child_reference.as_str(), &status);
if child_agent_path.is_some()
&& child_thread
.as_ref()
.map(|thread| thread.enabled(Feature::MultiAgentV2))
.unwrap_or(true)
{
let Some(child_agent_path) = child_agent_path else {
return;
};
let child_thread = state.get_thread(child_thread_id).await.ok();
let message = format_subagent_notification_message(child_reference.as_str(), &status);
if child_agent_path.is_some()
&& child_thread
.as_ref()
.map(|thread| thread.enabled(Feature::MultiAgentV2))
.unwrap_or(true)
{
let Some(child_agent_path) = child_agent_path.clone() else {
return;
};
let Some(parent_agent_path) = child_agent_path
.as_str()
.rsplit_once('/')
.and_then(|(parent, _)| AgentPath::try_from(parent).ok())
else {
return;
};
let communication = InterAgentCommunication::new(
child_agent_path,
parent_agent_path,
Vec::new(),
message,
/*trigger_turn*/ false,
);
let _ = control
.send_inter_agent_communication(parent_thread_id, communication)
.await;
return;
}
let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {
let Some(parent_agent_path) = child_agent_path
.as_str()
.rsplit_once('/')
.and_then(|(parent, _)| AgentPath::try_from(parent).ok())
else {
return;
};
parent_thread
.inject_user_message_without_turn(message)
let communication = InterAgentCommunication::new(
child_agent_path,
parent_agent_path,
Vec::new(),
message,
/*trigger_turn*/ false,
);
let _ = self
.send_inter_agent_communication(parent_thread_id, communication)
.await;
});
return;
}
let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {
return;
};
parent_thread
.inject_user_message_without_turn(message)
.await;
}
#[allow(clippy::too_many_arguments)]

View File

@@ -1008,7 +1008,7 @@ async fn resume_agent_releases_slot_after_resume_failure() {
}
#[tokio::test]
async fn spawn_child_completion_notifies_parent_history() {
async fn spawn_child_turn_completion_notifies_parent_history() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
@@ -1033,10 +1033,18 @@ async fn spawn_child_completion_notifies_parent_history() {
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let _ = child_thread
.submit(Op::Shutdown {})
.await
.expect("child shutdown should submit");
let child_turn = child_thread.codex.session.new_default_turn().await;
child_thread
.codex
.session
.send_event(
child_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: child_turn.sub_id.clone(),
last_agent_message: Some("done".to_string()),
}),
)
.await;
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
}
@@ -1149,9 +1157,22 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
let (worker_thread_id, _worker_thread) = harness.start_thread().await;
let mut tester_config = harness.config.clone();
let _ = tester_config.features.enable(Feature::MultiAgentV2);
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let tester_path = worker_path.join("tester").expect("tester path");
let tester_thread_id = harness
.manager
.start_thread(tester_config)
.control
.spawn_agent_with_metadata(
tester_config,
text_input("seed task"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
SpawnAgentOptions::default(),
)
.await
.expect("tester thread should start")
.thread_id;
@@ -1160,20 +1181,6 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
.get_thread(tester_thread_id)
.await
.expect("tester thread should exist");
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let tester_path = worker_path.join("tester").expect("tester path");
harness.control.maybe_start_completion_watcher(
tester_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
tester_path.to_string(),
Some(tester_path.clone()),
);
let tester_turn = tester_thread.codex.session.new_default_turn().await;
tester_thread
.codex
@@ -1218,7 +1225,7 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
}
})
.await
.expect("completion watcher should queue a direct-parent message");
.expect("turn completion should queue a direct-parent message");
let root_history_items = root_thread
.codex
@@ -1240,44 +1247,118 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
}
#[tokio::test]
async fn completion_watcher_notifies_parent_when_child_is_missing() {
async fn multi_agent_v2_turn_completion_notifies_parent_for_later_turns() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = ThreadId::new();
let (_root_thread_id, _root_thread) = harness.start_thread().await;
let (worker_thread_id, _worker_thread) = harness.start_thread().await;
let mut tester_config = harness.config.clone();
let _ = tester_config.features.enable(Feature::MultiAgentV2);
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let tester_path = worker_path.join("tester").expect("tester path");
let tester_thread_id = harness
.control
.spawn_agent_with_metadata(
tester_config,
text_input("seed task"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
SpawnAgentOptions::default(),
)
.await
.expect("tester thread should start")
.thread_id;
let tester_thread = harness
.manager
.get_thread(tester_thread_id)
.await
.expect("tester thread should exist");
harness.control.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
child_thread_id.to_string(),
None,
);
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
let history_items = parent_thread
let first_turn = tester_thread.codex.session.new_default_turn().await;
tester_thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
assert_eq!(
history_contains_text(
&history_items,
&format!("\"agent_path\":\"{child_thread_id}\"")
.send_event(
first_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: first_turn.sub_id.clone(),
last_agent_message: Some("done once".to_string()),
}),
)
.await;
let second_turn = tester_thread.codex.session.new_default_turn().await;
tester_thread
.codex
.session
.send_event(
second_turn.as_ref(),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: second_turn.sub_id.clone(),
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
}),
)
.await;
tester_thread
.codex
.session
.send_event(
second_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: second_turn.sub_id.clone(),
last_agent_message: Some("done twice".to_string()),
}),
)
.await;
let expected_messages = [
crate::session_prefix::format_subagent_notification_message(
tester_path.as_str(),
&AgentStatus::Completed(Some("done once".to_string())),
),
true
);
assert_eq!(
history_contains_text(&history_items, "\"status\":\"not_found\""),
true
);
crate::session_prefix::format_subagent_notification_message(
tester_path.as_str(),
&AgentStatus::Completed(Some("done twice".to_string())),
),
];
timeout(Duration::from_secs(5), async {
loop {
let count = harness
.manager
.captured_ops()
.into_iter()
.filter(|(thread_id, op)| {
*thread_id == worker_thread_id
&& expected_messages.iter().any(|message| {
matches!(
op,
Op::InterAgentCommunication { communication }
if communication
== &InterAgentCommunication::new(
tester_path.clone(),
worker_path.clone(),
Vec::new(),
message.clone(),
false,
)
)
})
})
.count();
if count == expected_messages.len() {
break;
}
sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("turn completion should notify the parent for each completed turn");
}
#[tokio::test]

View File

@@ -2629,6 +2629,7 @@ impl Session {
msg,
};
self.send_event_raw(event).await;
self.maybe_start_parent_turn_completion_notification(turn_context, &legacy_source);
self.maybe_mirror_event_text_to_realtime(&legacy_source)
.await;
self.maybe_clear_realtime_handoff_for_event(&legacy_source)
@@ -2644,6 +2645,45 @@ impl Session {
}
}
fn maybe_start_parent_turn_completion_notification(
&self,
turn_context: &TurnContext,
msg: &EventMsg,
) {
let Some(status @ (AgentStatus::Completed(_) | AgentStatus::Errored(_))) =
agent_status_from_event(msg)
else {
return;
};
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
agent_path,
..
}) = &turn_context.session_source
else {
return;
};
let child_reference = agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| self.conversation_id.to_string());
let agent_control = self.services.agent_control.clone();
let parent_thread_id = *parent_thread_id;
let child_thread_id = self.conversation_id;
let child_agent_path = agent_path.clone();
tokio::spawn(async move {
agent_control
.forward_child_completion_to_parent(
parent_thread_id,
child_thread_id,
child_reference,
child_agent_path,
status,
)
.await;
});
}
async fn maybe_mirror_event_text_to_realtime(&self, msg: &EventMsg) {
let Some(text) = realtime_text_for_event(msg) else {
return;