Centralize subagent completion bookkeeping

This commit is contained in:
Friel
2025-11-23 11:50:26 -08:00
parent b4bccfb10a
commit c6b2af27a8

View File

@@ -1,8 +1,12 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
@@ -94,8 +98,16 @@ pub struct SubagentManager {
struct WatchdogHandle {
cancel: tokio_util::sync::CancellationToken,
#[allow(dead_code)]
task: tokio::task::JoinHandle<()>,
}
struct NeverFuture;
impl Future for NeverFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
@@ -1149,7 +1161,7 @@ impl SubagentManager {
let cancel_child = cancel.clone();
let manager = self.clone();
let msg_clone = message.clone();
let task = tokio::spawn(async move {
let _task = tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel_child.cancelled() => break,
@@ -1214,7 +1226,7 @@ impl SubagentManager {
.watchdogs
.write()
.await
.insert(key, WatchdogHandle { cancel, task })
.insert(key, WatchdogHandle { cancel })
.is_some();
Ok(if replaced {
WatchdogAction::Replaced
@@ -1379,6 +1391,55 @@ impl SubagentManager {
}
}
fn timeout_future(duration: Option<Duration>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
match duration {
Some(duration) => Box::pin(tokio::time::sleep(duration)),
None => Box::pin(NeverFuture),
}
}
async fn record_completion_and_metadata(
&self,
session_id: &ConversationId,
completion: &SubagentCompletion,
) -> Result<SubagentMetadata, SubagentManagerError> {
{
let mut completions = self.completions.write().await;
completions.insert(*session_id, completion.clone());
}
let desired_status = status_from_completion(completion);
let mut metadata = self
.registry
.get(session_id)
.await
.ok_or(SubagentManagerError::NotFound)?;
if metadata.status != desired_status {
self.update_status_and_emit(session_id, desired_status)
.await;
metadata.status = desired_status;
}
Ok(metadata)
}
async fn completion_result(
&self,
session_id: &ConversationId,
completion: SubagentCompletion,
) -> Result<AwaitInboxResult, SubagentManagerError> {
let metadata = self
.record_completion_and_metadata(session_id, &completion)
.await?;
Ok(AwaitInboxResult {
metadata,
completion: Some(completion),
messages: Vec::new(),
})
}
pub async fn await_completion(
&self,
session_id: &ConversationId,
@@ -1388,25 +1449,9 @@ impl SubagentManager {
let completions = self.completions.read().await;
completions.get(session_id).cloned()
} {
let desired_status = status_from_completion(&completion);
let metadata = self
.registry
.get(session_id)
.await
.ok_or(SubagentManagerError::NotFound)?;
if metadata.status != desired_status {
self.update_status_and_emit(session_id, desired_status)
.await;
let metadata = self
.registry
.get(session_id)
.await
.ok_or(SubagentManagerError::NotFound)?;
return Ok(AwaitResult {
metadata,
completion,
});
}
.record_completion_and_metadata(session_id, &completion)
.await?;
return Ok(AwaitResult {
metadata,
completion,
@@ -1425,10 +1470,8 @@ impl SubagentManager {
completions.get(session_id).cloned()
} {
let metadata = self
.registry
.get(session_id)
.await
.ok_or(SubagentManagerError::NotFound)?;
.record_completion_and_metadata(session_id, &completion)
.await?;
return Ok(AwaitResult {
metadata,
completion,
@@ -1442,10 +1485,8 @@ impl SubagentManager {
if let Some(completion) = current_completion(&receiver) {
let metadata = self
.registry
.get(session_id)
.await
.ok_or(SubagentManagerError::NotFound)?;
.record_completion_and_metadata(session_id, &completion)
.await?;
return Ok(AwaitResult {
metadata,
completion,
@@ -1497,24 +1538,9 @@ impl SubagentManager {
.ok_or(SubagentManagerError::NotFound)?
};
{
let mut completions = self.completions.write().await;
completions.insert(*session_id, completion.clone());
}
let desired_status = status_from_completion(&completion);
if let Some(current) = self.registry.get(session_id).await
&& current.status != desired_status
{
self.update_status_and_emit(session_id, desired_status)
.await;
}
let metadata = self
.registry
.get(session_id)
.await
.ok_or(SubagentManagerError::NotFound)?;
.record_completion_and_metadata(session_id, &completion)
.await?;
Ok(AwaitResult {
metadata,
completion,
@@ -1567,8 +1593,14 @@ impl SubagentManager {
let mut completion_opt = current_completion(&receiver);
if completion_opt.is_some() || !messages.is_empty() {
if let Some(ref completion) = completion_opt {
self.ensure_completion_recorded(session_id, completion)
let metadata = self
.record_completion_and_metadata(session_id, completion)
.await?;
return Ok(AwaitInboxResult {
metadata,
completion: completion_opt,
messages,
});
}
let metadata = self
.registry
@@ -1624,8 +1656,14 @@ impl SubagentManager {
// Finalize completion state if we observed a terminal result.
if let Some(ref completion) = completion_opt {
self.ensure_completion_recorded(session_id, completion)
let metadata = self
.record_completion_and_metadata(session_id, completion)
.await?;
return Ok(AwaitInboxResult {
metadata,
completion: completion_opt,
messages,
});
}
let metadata = self