This commit is contained in:
jif-oai
2026-03-31 11:22:38 +02:00
committed by GitHub
parent 873e466549
commit 25fbd7e40e
3 changed files with 99 additions and 73 deletions

View File

@@ -191,64 +191,68 @@ impl Session {
let cancellation_token = CancellationToken::new();
let done = Arc::new(Notify::new());
let done_clone = Arc::clone(&done);
let handle = {
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
let ctx = Arc::clone(&turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
// Task-owned turn spans keep a core-owned span open for the
// full task lifecycle after the submission dispatch span ends.
let task_span = info_span!(
"turn",
otel.name = span_name,
thread.id = %self.conversation_id,
turn.id = %turn_context.sub_id,
model = %turn_context.model_info.slug,
);
tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);
let last_agent_message = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
input,
task_cancellation_token.child_token(),
)
.await;
let sess = session_ctx.clone_session();
sess.flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
.await;
}
done_clone.notify_waiters();
}
.instrument(task_span),
)
};
let queued_response_items = self.take_queued_response_items_for_next_turn().await;
let mailbox_items = self.get_pending_input().await;
let mut active = self.active_turn.lock().await;
let mut turn = ActiveTurn::default();
let mut turn_state = turn.turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
for item in queued_response_items {
turn_state.push_pending_input(item);
let turn_state = {
let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
debug_assert!(turn.tasks.is_empty());
Arc::clone(&turn.turn_state)
};
{
let mut turn_state = turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
for item in queued_response_items {
turn_state.push_pending_input(item);
}
for item in mailbox_items {
turn_state.push_pending_input(item);
}
}
for item in mailbox_items {
turn_state.push_pending_input(item);
}
drop(turn_state);
let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
debug_assert!(turn.tasks.is_empty());
let done_clone = Arc::clone(&done);
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
let ctx = Arc::clone(&turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
// Task-owned turn spans keep a core-owned span open for the
// full task lifecycle after the submission dispatch span ends.
let task_span = info_span!(
"turn",
otel.name = span_name,
thread.id = %self.conversation_id,
turn.id = %turn_context.sub_id,
model = %turn_context.model_info.slug,
);
let handle = tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);
let last_agent_message = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
input,
task_cancellation_token.child_token(),
)
.await;
let sess = session_ctx.clone_session();
sess.flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
.await;
}
done_clone.notify_waiters();
}
.instrument(task_span),
);
let timer = turn_context
.session_telemetry
.start_timer(TURN_E2E_DURATION_METRIC, &[])
.ok();
let running_task = RunningTask {
done,
handle: Arc::new(AbortOnDropHandle::new(handle)),
@@ -259,24 +263,26 @@ impl Session {
_timer: timer,
};
turn.add_task(running_task);
*active = Some(turn);
}
/// Starts a regular turn when queued next-turn items or trigger-turn mailbox mail are waiting.
/// Starts a regular turn when the session is idle and pending work is waiting.
///
/// Pending work currently includes queued next-turn items and mailbox mail marked with
/// `trigger_turn`.
///
/// This helper generates a fresh sub-id for the synthetic turn before delegating to the
/// explicit-sub-id variant.
pub(crate) async fn ensure_task_for_pending_inputs(self: &Arc<Self>) {
self.ensure_task_for_pending_inputs_with_sub_id(uuid::Uuid::new_v4().to_string())
pub(crate) async fn maybe_start_turn_for_pending_work(self: &Arc<Self>) {
self.maybe_start_turn_for_pending_work_with_sub_id(uuid::Uuid::new_v4().to_string())
.await;
}
/// Starts a regular turn with the provided sub-id when pending input should wake an idle
/// Starts a regular turn with the provided sub-id when pending work should wake an idle
/// session.
///
/// The turn is created only when there are queued next-turn items or mailbox mail marked with
/// `trigger_turn`, and only if the session is currently idle.
pub(crate) async fn ensure_task_for_pending_inputs_with_sub_id(
pub(crate) async fn maybe_start_turn_for_pending_work_with_sub_id(
self: &Arc<Self>,
sub_id: String,
) {
@@ -286,8 +292,12 @@ impl Session {
return;
}
if self.active_turn.lock().await.is_some() {
return;
{
let mut active_turn = self.active_turn.lock().await;
if active_turn.is_some() {
return;
}
*active_turn = Some(ActiveTurn::default());
}
let turn_context = self.new_default_turn_with_sub_id(sub_id).await;
@@ -307,7 +317,7 @@ impl Session {
active_turn.clear_pending().await;
}
if reason == TurnAbortReason::Interrupted {
self.ensure_task_for_pending_inputs().await;
self.maybe_start_turn_for_pending_work().await;
}
}
@@ -320,24 +330,31 @@ impl Session {
.turn_metadata_state
.cancel_git_enrichment_task();
let mut active = self.active_turn.lock().await;
let mut pending_input = Vec::<ResponseInputItem>::new();
let mut should_clear_active_turn = false;
let mut token_usage_at_turn_start = None;
let mut turn_tool_calls = 0_u64;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
{
let mut ts = at.turn_state.lock().await;
let turn_state = {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
{
should_clear_active_turn = true;
let turn_state = Arc::clone(&at.turn_state);
if should_clear_active_turn {
*active = None;
}
Some(turn_state)
} else {
None
}
};
if let Some(turn_state) = turn_state {
let mut ts = turn_state.lock().await;
pending_input = ts.take_pending_input();
turn_tool_calls = ts.tool_calls;
token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone());
should_clear_active_turn = true;
}
if should_clear_active_turn {
*active = None;
}
drop(active);
if !pending_input.is_empty() {
for pending_input_item in pending_input {
match inspect_pending_input(self, &turn_context, pending_input_item).await {
@@ -436,6 +453,15 @@ impl Session {
last_agent_message,
});
self.send_event(turn_context.as_ref(), event).await;
if should_clear_active_turn {
let session = Arc::clone(self);
let _scheduler = tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
session.maybe_start_turn_for_pending_work().await;
});
});
}
}
async fn take_active_turn(&self) -> Option<ActiveTurn> {