mirror of
https://github.com/openai/codex.git
synced 2026-04-05 06:51:44 +03:00
Compare commits
1 Commits
mstar/remo
...
jif/fix-on
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
68bf31f704 |
@@ -3,7 +3,6 @@ use crate::agent::registry::AgentMetadata;
|
||||
use crate::agent::registry::AgentRegistry;
|
||||
use crate::agent::role::DEFAULT_ROLE_NAME;
|
||||
use crate::agent::role::resolve_role_config;
|
||||
use crate::agent::status::is_final;
|
||||
use crate::codex_thread::ThreadConfigSnapshot;
|
||||
use crate::context_manager::is_user_turn_boundary;
|
||||
use crate::error::CodexErr;
|
||||
@@ -271,17 +270,6 @@ impl AgentControl {
|
||||
.await;
|
||||
|
||||
self.send_input(new_thread.thread_id, items).await?;
|
||||
let child_reference = agent_metadata
|
||||
.agent_path
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_else(|| new_thread.thread_id.to_string());
|
||||
self.maybe_start_completion_watcher(
|
||||
new_thread.thread_id,
|
||||
notification_source,
|
||||
child_reference,
|
||||
agent_metadata.agent_path.clone(),
|
||||
);
|
||||
|
||||
Ok(LiveAgent {
|
||||
thread_id: new_thread.thread_id,
|
||||
@@ -443,17 +431,6 @@ impl AgentControl {
|
||||
// Resumed threads are re-registered in-memory and need the same listener
|
||||
// attachment path as freshly spawned threads.
|
||||
state.notify_thread_created(resumed_thread.thread_id);
|
||||
let child_reference = agent_metadata
|
||||
.agent_path
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_else(|| resumed_thread.thread_id.to_string());
|
||||
self.maybe_start_completion_watcher(
|
||||
resumed_thread.thread_id,
|
||||
Some(notification_source.clone()),
|
||||
child_reference,
|
||||
agent_metadata.agent_path.clone(),
|
||||
);
|
||||
self.persist_thread_spawn_edge_for_source(
|
||||
resumed_thread.thread.as_ref(),
|
||||
resumed_thread.thread_id,
|
||||
@@ -780,83 +757,53 @@ impl AgentControl {
|
||||
Ok(agents)
|
||||
}
|
||||
|
||||
/// Starts a detached watcher for sub-agents spawned from another thread.
|
||||
///
|
||||
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
|
||||
/// can receive completion notifications.
|
||||
fn maybe_start_completion_watcher(
|
||||
pub(crate) async fn forward_child_completion_to_parent(
|
||||
&self,
|
||||
parent_thread_id: ThreadId,
|
||||
child_thread_id: ThreadId,
|
||||
session_source: Option<SessionSource>,
|
||||
child_reference: String,
|
||||
child_agent_path: Option<AgentPath>,
|
||||
status: AgentStatus,
|
||||
) {
|
||||
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id, ..
|
||||
})) = session_source
|
||||
else {
|
||||
let Ok(state) = self.upgrade() else {
|
||||
return;
|
||||
};
|
||||
let control = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let status = match control.subscribe_status(child_thread_id).await {
|
||||
Ok(mut status_rx) => {
|
||||
let mut status = status_rx.borrow().clone();
|
||||
while !is_final(&status) {
|
||||
if status_rx.changed().await.is_err() {
|
||||
status = control.get_status(child_thread_id).await;
|
||||
break;
|
||||
}
|
||||
status = status_rx.borrow().clone();
|
||||
}
|
||||
status
|
||||
}
|
||||
Err(_) => control.get_status(child_thread_id).await,
|
||||
};
|
||||
if !is_final(&status) {
|
||||
return;
|
||||
}
|
||||
|
||||
let Ok(state) = control.upgrade() else {
|
||||
let child_thread = state.get_thread(child_thread_id).await.ok();
|
||||
let message = format_subagent_notification_message(child_reference.as_str(), &status);
|
||||
if child_agent_path.is_some()
|
||||
&& child_thread
|
||||
.as_ref()
|
||||
.map(|thread| thread.enabled(Feature::MultiAgentV2))
|
||||
.unwrap_or(true)
|
||||
{
|
||||
let Some(child_agent_path) = child_agent_path else {
|
||||
return;
|
||||
};
|
||||
let child_thread = state.get_thread(child_thread_id).await.ok();
|
||||
let message = format_subagent_notification_message(child_reference.as_str(), &status);
|
||||
if child_agent_path.is_some()
|
||||
&& child_thread
|
||||
.as_ref()
|
||||
.map(|thread| thread.enabled(Feature::MultiAgentV2))
|
||||
.unwrap_or(true)
|
||||
{
|
||||
let Some(child_agent_path) = child_agent_path.clone() else {
|
||||
return;
|
||||
};
|
||||
let Some(parent_agent_path) = child_agent_path
|
||||
.as_str()
|
||||
.rsplit_once('/')
|
||||
.and_then(|(parent, _)| AgentPath::try_from(parent).ok())
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let communication = InterAgentCommunication::new(
|
||||
child_agent_path,
|
||||
parent_agent_path,
|
||||
Vec::new(),
|
||||
message,
|
||||
/*trigger_turn*/ false,
|
||||
);
|
||||
let _ = control
|
||||
.send_inter_agent_communication(parent_thread_id, communication)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {
|
||||
let Some(parent_agent_path) = child_agent_path
|
||||
.as_str()
|
||||
.rsplit_once('/')
|
||||
.and_then(|(parent, _)| AgentPath::try_from(parent).ok())
|
||||
else {
|
||||
return;
|
||||
};
|
||||
parent_thread
|
||||
.inject_user_message_without_turn(message)
|
||||
let communication = InterAgentCommunication::new(
|
||||
child_agent_path,
|
||||
parent_agent_path,
|
||||
Vec::new(),
|
||||
message,
|
||||
/*trigger_turn*/ false,
|
||||
);
|
||||
let _ = self
|
||||
.send_inter_agent_communication(parent_thread_id, communication)
|
||||
.await;
|
||||
});
|
||||
return;
|
||||
}
|
||||
let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {
|
||||
return;
|
||||
};
|
||||
parent_thread
|
||||
.inject_user_message_without_turn(message)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
||||
@@ -1008,7 +1008,7 @@ async fn resume_agent_releases_slot_after_resume_failure() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_child_completion_notifies_parent_history() {
|
||||
async fn spawn_child_turn_completion_notifies_parent_history() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
|
||||
@@ -1033,10 +1033,18 @@ async fn spawn_child_completion_notifies_parent_history() {
|
||||
.get_thread(child_thread_id)
|
||||
.await
|
||||
.expect("child thread should exist");
|
||||
let _ = child_thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("child shutdown should submit");
|
||||
let child_turn = child_thread.codex.session.new_default_turn().await;
|
||||
child_thread
|
||||
.codex
|
||||
.session
|
||||
.send_event(
|
||||
child_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: child_turn.sub_id.clone(),
|
||||
last_agent_message: Some("done".to_string()),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
|
||||
}
|
||||
@@ -1149,9 +1157,22 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
|
||||
let (worker_thread_id, _worker_thread) = harness.start_thread().await;
|
||||
let mut tester_config = harness.config.clone();
|
||||
let _ = tester_config.features.enable(Feature::MultiAgentV2);
|
||||
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
|
||||
let tester_path = worker_path.join("tester").expect("tester path");
|
||||
let tester_thread_id = harness
|
||||
.manager
|
||||
.start_thread(tester_config)
|
||||
.control
|
||||
.spawn_agent_with_metadata(
|
||||
tester_config,
|
||||
text_input("seed task"),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: worker_thread_id,
|
||||
depth: 2,
|
||||
agent_path: Some(tester_path.clone()),
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
SpawnAgentOptions::default(),
|
||||
)
|
||||
.await
|
||||
.expect("tester thread should start")
|
||||
.thread_id;
|
||||
@@ -1160,20 +1181,6 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
|
||||
.get_thread(tester_thread_id)
|
||||
.await
|
||||
.expect("tester thread should exist");
|
||||
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
|
||||
let tester_path = worker_path.join("tester").expect("tester path");
|
||||
harness.control.maybe_start_completion_watcher(
|
||||
tester_thread_id,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: worker_thread_id,
|
||||
depth: 2,
|
||||
agent_path: Some(tester_path.clone()),
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
tester_path.to_string(),
|
||||
Some(tester_path.clone()),
|
||||
);
|
||||
let tester_turn = tester_thread.codex.session.new_default_turn().await;
|
||||
tester_thread
|
||||
.codex
|
||||
@@ -1218,7 +1225,7 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("completion watcher should queue a direct-parent message");
|
||||
.expect("turn completion should queue a direct-parent message");
|
||||
|
||||
let root_history_items = root_thread
|
||||
.codex
|
||||
@@ -1240,44 +1247,118 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn completion_watcher_notifies_parent_when_child_is_missing() {
|
||||
async fn multi_agent_v2_turn_completion_notifies_parent_for_later_turns() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
let child_thread_id = ThreadId::new();
|
||||
let (_root_thread_id, _root_thread) = harness.start_thread().await;
|
||||
let (worker_thread_id, _worker_thread) = harness.start_thread().await;
|
||||
let mut tester_config = harness.config.clone();
|
||||
let _ = tester_config.features.enable(Feature::MultiAgentV2);
|
||||
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
|
||||
let tester_path = worker_path.join("tester").expect("tester path");
|
||||
let tester_thread_id = harness
|
||||
.control
|
||||
.spawn_agent_with_metadata(
|
||||
tester_config,
|
||||
text_input("seed task"),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: worker_thread_id,
|
||||
depth: 2,
|
||||
agent_path: Some(tester_path.clone()),
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
SpawnAgentOptions::default(),
|
||||
)
|
||||
.await
|
||||
.expect("tester thread should start")
|
||||
.thread_id;
|
||||
let tester_thread = harness
|
||||
.manager
|
||||
.get_thread(tester_thread_id)
|
||||
.await
|
||||
.expect("tester thread should exist");
|
||||
|
||||
harness.control.maybe_start_completion_watcher(
|
||||
child_thread_id,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
child_thread_id.to_string(),
|
||||
None,
|
||||
);
|
||||
|
||||
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
|
||||
|
||||
let history_items = parent_thread
|
||||
let first_turn = tester_thread.codex.session.new_default_turn().await;
|
||||
tester_thread
|
||||
.codex
|
||||
.session
|
||||
.clone_history()
|
||||
.await
|
||||
.raw_items()
|
||||
.to_vec();
|
||||
assert_eq!(
|
||||
history_contains_text(
|
||||
&history_items,
|
||||
&format!("\"agent_path\":\"{child_thread_id}\"")
|
||||
.send_event(
|
||||
first_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: first_turn.sub_id.clone(),
|
||||
last_agent_message: Some("done once".to_string()),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let second_turn = tester_thread.codex.session.new_default_turn().await;
|
||||
tester_thread
|
||||
.codex
|
||||
.session
|
||||
.send_event(
|
||||
second_turn.as_ref(),
|
||||
EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: second_turn.sub_id.clone(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: ModeKind::Default,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
tester_thread
|
||||
.codex
|
||||
.session
|
||||
.send_event(
|
||||
second_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: second_turn.sub_id.clone(),
|
||||
last_agent_message: Some("done twice".to_string()),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected_messages = [
|
||||
crate::session_prefix::format_subagent_notification_message(
|
||||
tester_path.as_str(),
|
||||
&AgentStatus::Completed(Some("done once".to_string())),
|
||||
),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
history_contains_text(&history_items, "\"status\":\"not_found\""),
|
||||
true
|
||||
);
|
||||
crate::session_prefix::format_subagent_notification_message(
|
||||
tester_path.as_str(),
|
||||
&AgentStatus::Completed(Some("done twice".to_string())),
|
||||
),
|
||||
];
|
||||
|
||||
timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
let count = harness
|
||||
.manager
|
||||
.captured_ops()
|
||||
.into_iter()
|
||||
.filter(|(thread_id, op)| {
|
||||
*thread_id == worker_thread_id
|
||||
&& expected_messages.iter().any(|message| {
|
||||
matches!(
|
||||
op,
|
||||
Op::InterAgentCommunication { communication }
|
||||
if communication
|
||||
== &InterAgentCommunication::new(
|
||||
tester_path.clone(),
|
||||
worker_path.clone(),
|
||||
Vec::new(),
|
||||
message.clone(),
|
||||
false,
|
||||
)
|
||||
)
|
||||
})
|
||||
})
|
||||
.count();
|
||||
if count == expected_messages.len() {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("turn completion should notify the parent for each completed turn");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -2629,6 +2629,7 @@ impl Session {
|
||||
msg,
|
||||
};
|
||||
self.send_event_raw(event).await;
|
||||
self.maybe_start_parent_turn_completion_notification(turn_context, &legacy_source);
|
||||
self.maybe_mirror_event_text_to_realtime(&legacy_source)
|
||||
.await;
|
||||
self.maybe_clear_realtime_handoff_for_event(&legacy_source)
|
||||
@@ -2644,6 +2645,45 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_start_parent_turn_completion_notification(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
msg: &EventMsg,
|
||||
) {
|
||||
let Some(status @ (AgentStatus::Completed(_) | AgentStatus::Errored(_))) =
|
||||
agent_status_from_event(msg)
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
agent_path,
|
||||
..
|
||||
}) = &turn_context.session_source
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let child_reference = agent_path
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_else(|| self.conversation_id.to_string());
|
||||
let agent_control = self.services.agent_control.clone();
|
||||
let parent_thread_id = *parent_thread_id;
|
||||
let child_thread_id = self.conversation_id;
|
||||
let child_agent_path = agent_path.clone();
|
||||
tokio::spawn(async move {
|
||||
agent_control
|
||||
.forward_child_completion_to_parent(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
child_reference,
|
||||
child_agent_path,
|
||||
status,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn maybe_mirror_event_text_to_realtime(&self, msg: &EventMsg) {
|
||||
let Some(text) = realtime_text_for_event(msg) else {
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user