Compare commits

...

4 Commits

Author SHA1 Message Date
Ahmed Ibrahim
2c00009dba Merge branch 'main' into dev/flaky-subagent-completion-watcher-revival 2026-03-09 12:07:28 -07:00
Ahmed Ibrahim
da738b559c codex: start watcher after send_input succeeds 2026-03-09 11:47:43 -07:00
Ahmed Ibrahim
91de3ac397 codex: remove subagent watcher debug logs 2026-03-09 11:45:27 -07:00
Ahmed Ibrahim
0bd20db915 Attach subagent completion watcher before input 2026-03-07 23:40:14 -08:00

View File

@@ -211,9 +211,22 @@ impl AgentControl {
// to subscribe or drain this newly created thread.
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
let completion_status_rx = if matches!(
notification_source.as_ref(),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. }))
) {
self.subscribe_status(new_thread.thread_id).await.ok()
} else {
None
};
self.send_input(new_thread.thread_id, items).await?;
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
self.maybe_start_completion_watcher(
new_thread.thread_id,
notification_source,
completion_status_rx,
)
.await;
Ok(new_thread.thread_id)
}
@@ -288,7 +301,12 @@ impl AgentControl {
// Resumed threads are re-registered in-memory and need the same listener
// attachment path as freshly spawned threads.
state.notify_thread_created(resumed_thread.thread_id);
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
self.maybe_start_completion_watcher(
resumed_thread.thread_id,
Some(notification_source),
None,
)
.await;
Ok(resumed_thread.thread_id)
}
@@ -418,10 +436,11 @@ impl AgentControl {
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
/// can receive completion notifications.
fn maybe_start_completion_watcher(
async fn maybe_start_completion_watcher(
&self,
child_thread_id: ThreadId,
session_source: Option<SessionSource>,
status_rx: Option<watch::Receiver<AgentStatus>>,
) {
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
@@ -429,10 +448,15 @@ impl AgentControl {
else {
return;
};
let status_rx = match status_rx {
Some(status_rx) => Some(status_rx),
None => self.subscribe_status(child_thread_id).await.ok(),
};
let control = self.clone();
tokio::spawn(async move {
let status = match control.subscribe_status(child_thread_id).await {
Ok(mut status_rx) => {
let status = match status_rx {
Some(mut status_rx) => {
let mut status = status_rx.borrow().clone();
while !is_final(&status) {
if status_rx.changed().await.is_err() {
@@ -443,7 +467,7 @@ impl AgentControl {
}
status
}
Err(_) => control.get_status(child_thread_id).await,
None => control.get_status(child_thread_id).await,
};
if !is_final(&status) {
return;
@@ -1331,15 +1355,19 @@ mod tests {
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = ThreadId::new();
harness.control.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
);
harness
.control
.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
None,
)
.await;
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);