Compare commits

...

12 Commits

12 changed files with 1462 additions and 132 deletions

View File

@@ -76,6 +76,11 @@ pub struct ThreadHistoryBuilder {
next_item_index: i64,
current_rollout_index: usize,
next_rollout_index: usize,
// Current streams emit per-attempt spawn ids (`call_id`, `call_id#2`, ...); legacy rollouts
// reused the raw tool call_id for each retry, so replay still synthesizes stable per-attempt
// item ids when needed to preserve each attempt row.
current_spawn_attempt_ids: HashMap<String, String>,
spawn_attempt_counts: HashMap<String, usize>,
}
impl Default for ThreadHistoryBuilder {
@@ -92,6 +97,8 @@ impl ThreadHistoryBuilder {
next_item_index: 1,
current_rollout_index: 0,
next_rollout_index: 0,
current_spawn_attempt_ids: HashMap::new(),
spawn_attempt_counts: HashMap::new(),
}
}
@@ -608,8 +615,9 @@ impl ThreadHistoryBuilder {
&mut self,
payload: &codex_protocol::protocol::CollabAgentSpawnBeginEvent,
) {
let item_id = self.next_collab_spawn_attempt_item_id(&payload.call_id);
let item = ThreadItem::CollabAgentToolCall {
id: payload.call_id.clone(),
id: item_id,
tool: CollabAgentTool::SpawnAgent,
status: CollabAgentToolCallStatus::InProgress,
sender_thread_id: payload.sender_thread_id.to_string(),
@@ -626,6 +634,10 @@ impl ThreadHistoryBuilder {
&mut self,
payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
) {
let item_id = self
.current_spawn_attempt_ids
.remove(&payload.call_id)
.unwrap_or_else(|| payload.call_id.clone());
let has_receiver = payload.new_thread_id.is_some();
let status = match &payload.status {
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
@@ -644,7 +656,7 @@ impl ThreadHistoryBuilder {
None => (Vec::new(), HashMap::new()),
};
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
id: payload.call_id.clone(),
id: item_id,
tool: CollabAgentTool::SpawnAgent,
status,
sender_thread_id: payload.sender_thread_id.to_string(),
@@ -977,6 +989,8 @@ impl ThreadHistoryBuilder {
}
fn finish_current_turn(&mut self) {
self.current_spawn_attempt_ids.clear();
self.spawn_attempt_counts.clear();
if let Some(turn) = self.current_turn.take() {
if turn.items.is_empty() && !turn.opened_explicitly && !turn.saw_compaction {
return;
@@ -1040,6 +1054,22 @@ impl ThreadHistoryBuilder {
id
}
fn next_collab_spawn_attempt_item_id(&mut self, call_id: &str) -> String {
    // Bump the per-call_id attempt counter; a missing entry counts as attempt 1.
    let counter = self
        .spawn_attempt_counts
        .entry(call_id.to_string())
        .or_insert(0);
    *counter += 1;
    // Attempt 1 keeps the raw call_id so single-attempt rollouts replay with the
    // same item id as before; retries are suffixed (`call_id#2`, `call_id#3`, ...)
    // so each attempt stays a distinct row.
    let item_id = match *counter {
        1 => call_id.to_string(),
        attempt => format!("{call_id}#{attempt}"),
    };
    // Remember the id so the matching spawn-end handler can resolve it later.
    self.current_spawn_attempt_ids
        .insert(call_id.to_string(), item_id.clone());
    item_id
}
fn build_user_inputs(&self, payload: &UserMessageEvent) -> Vec<UserInput> {
let mut content = Vec::new();
if !payload.message.trim().is_empty() {
@@ -2562,6 +2592,157 @@ mod tests {
);
}
// A spawn-end event with no `new_thread_id` means the child never came up; replay
// must surface that attempt as a Failed tool call even though the recorded status
// is still `PendingInit`.
#[test]
fn reconstructs_collab_spawn_end_without_receiver_as_failed_spawn_attempt() {
let sender_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
.expect("valid sender thread id");
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
message: "spawn agent".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::CollabAgentSpawnBegin(codex_protocol::protocol::CollabAgentSpawnBeginEvent {
call_id: "spawn-1".into(),
sender_thread_id,
prompt: "inspect the repo".into(),
model: "gpt-5.4-mini".into(),
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
}),
// End event carries no receiver identity: the spawn attempt never produced a child.
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
call_id: "spawn-1".into(),
sender_thread_id,
new_thread_id: None,
new_agent_nickname: None,
new_agent_role: None,
prompt: "inspect the repo".into(),
model: "gpt-5.4-mini".into(),
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
status: AgentStatus::PendingInit,
}),
];
let items = events
.into_iter()
.map(RolloutItem::EventMsg)
.collect::<Vec<_>>();
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
// Item 0 is the user message; item 1 is the reconstructed spawn attempt.
assert_eq!(turns[0].items.len(), 2);
assert_eq!(
turns[0].items[1],
ThreadItem::CollabAgentToolCall {
// First attempt keeps the raw call_id as its item id.
id: "spawn-1".into(),
tool: CollabAgentTool::SpawnAgent,
status: CollabAgentToolCallStatus::Failed,
sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
receiver_thread_ids: Vec::new(),
prompt: Some("inspect the repo".into()),
model: Some("gpt-5.4-mini".into()),
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Medium),
agents_states: HashMap::new(),
}
);
}
// Two begin/end pairs reusing the same `call_id` (legacy retry encoding) must
// replay as two distinct items: the failed first attempt keeps the raw id and
// the successful retry is suffixed `#2`.
#[test]
fn reconstructs_collab_spawn_retries_as_distinct_attempt_items() {
let sender_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
.expect("valid sender thread id");
let spawned_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000002")
.expect("valid receiver thread id");
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
message: "spawn agent".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
// Attempt 1: errors out with a quota failure.
EventMsg::CollabAgentSpawnBegin(codex_protocol::protocol::CollabAgentSpawnBeginEvent {
call_id: "spawn-1".into(),
sender_thread_id,
prompt: "inspect the repo".into(),
model: "gpt-5.4-mini".into(),
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Low,
}),
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
call_id: "spawn-1".into(),
sender_thread_id,
new_thread_id: None,
new_agent_nickname: None,
new_agent_role: None,
prompt: "inspect the repo".into(),
model: "gpt-5.4-mini".into(),
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Low,
status: AgentStatus::Errored("insufficient_quota".into()),
}),
// Attempt 2: same call_id, different model, succeeds with a running child.
EventMsg::CollabAgentSpawnBegin(codex_protocol::protocol::CollabAgentSpawnBeginEvent {
call_id: "spawn-1".into(),
sender_thread_id,
prompt: "inspect the repo".into(),
model: "gpt-5".into(),
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
}),
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
call_id: "spawn-1".into(),
sender_thread_id,
new_thread_id: Some(spawned_thread_id),
new_agent_nickname: Some("Scout".into()),
new_agent_role: Some("explorer".into()),
prompt: "inspect the repo".into(),
model: "gpt-5".into(),
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
status: AgentStatus::Running,
}),
];
let items = events
.into_iter()
.map(RolloutItem::EventMsg)
.collect::<Vec<_>>();
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
// User message + two attempt rows.
assert_eq!(turns[0].items.len(), 3);
assert_eq!(
turns[0].items[1],
ThreadItem::CollabAgentToolCall {
// Attempt 1 keeps the raw call_id.
id: "spawn-1".into(),
tool: CollabAgentTool::SpawnAgent,
status: CollabAgentToolCallStatus::Failed,
sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
receiver_thread_ids: Vec::new(),
prompt: Some("inspect the repo".into()),
model: Some("gpt-5.4-mini".into()),
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Low),
agents_states: HashMap::new(),
}
);
assert_eq!(
turns[0].items[2],
ThreadItem::CollabAgentToolCall {
// Attempt 2 is disambiguated with the `#2` suffix.
id: "spawn-1#2".into(),
tool: CollabAgentTool::SpawnAgent,
status: CollabAgentToolCallStatus::Completed,
sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
receiver_thread_ids: vec!["00000000-0000-0000-0000-000000000002".into()],
prompt: Some("inspect the repo".into()),
model: Some("gpt-5".into()),
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Medium),
agents_states: [(
"00000000-0000-0000-0000-000000000002".into(),
CollabAgentState {
status: crate::protocol::v2::CollabAgentStatus::Running,
message: None,
},
)]
.into_iter()
.collect(),
}
);
}
#[test]
fn reconstructs_interrupted_send_input_as_completed_collab_call() {
// `send_input(interrupt=true)` first stops the child's active turn, then redirects it with

View File

@@ -309,7 +309,7 @@ where
D: Deserializer<'de>,
T: Deserialize<'de>,
{
Option::<Vec<T>>::deserialize(deserializer).map(Option::unwrap_or_default)
Option::<Vec<T>>::deserialize(deserializer).map(std::option::Option::unwrap_or_default)
}
#[derive(Clone, Debug, Deserialize)]

View File

@@ -24,7 +24,6 @@ use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
use codex_protocol::protocol::CollabAgentRef;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabCloseBeginEvent;
use codex_protocol::protocol::CollabCloseEndEvent;

View File

@@ -47,63 +47,137 @@ impl ToolHandler for Handler {
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
let config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
let mut candidates_to_try = collect_spawn_agent_model_candidates(
args.model_fallback_list.as_ref(),
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
);
if candidates_to_try.is_empty() {
candidates_to_try.push(SpawnAgentModelCandidate {
model: None,
reasoning_effort: None,
});
}
let result = session
.services
.agent_control
.spawn_agent_with_metadata(
config,
input_items,
Some(thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
/*task_name*/ None,
)?),
SpawnAgentOptions {
fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()),
fork_mode: args.fork_context.then_some(SpawnAgentForkMode::FullHistory),
},
let mut spawn_result = None;
for (idx, candidate) in candidates_to_try.iter().enumerate() {
let attempt_call_id = spawn_attempt_event_call_id(&call_id, idx);
let candidate_model = candidate.model.clone().unwrap_or_default();
let candidate_reasoning_effort = candidate.reasoning_effort.unwrap_or_default();
send_collab_agent_spawn_begin_event(
&session,
&turn,
attempt_call_id.clone(),
prompt.clone(),
candidate_model.clone(),
candidate_reasoning_effort,
)
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
.await;
let mut candidate_config = config.clone();
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut candidate_config,
candidate.model.as_deref(),
candidate.reasoning_effort,
)
.await?;
apply_role_to_config(&mut candidate_config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
apply_spawn_agent_runtime_overrides(&mut candidate_config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut candidate_config, child_depth);
let attempt_result = session
.services
.agent_control
.spawn_agent_with_metadata(
candidate_config,
input_items.clone(),
Some(thread_spawn_source(
session.conversation_id,
&turn.session_source,
child_depth,
role_name,
/*task_name*/ None,
)?),
SpawnAgentOptions {
fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()),
fork_mode: args.fork_context.then_some(SpawnAgentForkMode::FullHistory),
},
)
.await;
match attempt_result {
Ok(spawned_agent) => {
let status = if idx + 1 < candidates_to_try.len() {
match probe_spawn_attempt_for_async_quota_exhaustion(
spawned_agent.status.clone(),
spawned_agent.thread_id,
&session.services.agent_control,
)
.await
{
SpawnAttemptRetryDecision::Accept(status) => status,
SpawnAttemptRetryDecision::Retry(retry_status) => {
match close_quota_exhausted_spawn_attempt(
&session.services.agent_control,
spawned_agent.thread_id,
retry_status,
)
.await
{
SpawnAttemptRetryDecision::Accept(status) => status,
SpawnAttemptRetryDecision::Retry(status) => {
send_collab_agent_spawn_retry_preempted_event(
&session,
&turn,
attempt_call_id,
prompt.clone(),
candidate_model,
candidate_reasoning_effort,
status,
)
.await;
continue;
}
}
}
}
} else {
spawned_agent.status.clone()
};
spawn_result = Some((spawned_agent, status, attempt_call_id));
break;
}
Err(err) => {
send_collab_agent_spawn_error_event(
&session,
&turn,
attempt_call_id,
prompt.clone(),
candidate_model,
candidate_reasoning_effort,
&err,
)
.await;
if spawn_should_retry_on_quota_exhaustion(&err)
&& idx + 1 < candidates_to_try.len()
{
continue;
}
return Err(collab_spawn_error(err));
}
}
}
let Some((spawned_agent, status, spawn_event_call_id)) = spawn_result else {
return Err(FunctionCallError::RespondToModel(
"No spawn attempts were executed".to_string(),
));
};
let new_thread_id = Some(spawned_agent.thread_id);
let new_agent_metadata = Some(spawned_agent.metadata.clone());
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
@@ -141,7 +215,7 @@ impl ToolHandler for Handler {
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
call_id: spawn_event_call_id,
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
@@ -154,7 +228,7 @@ impl ToolHandler for Handler {
.into(),
)
.await;
let new_thread_id = result?.thread_id;
let new_thread_id = spawned_agent.thread_id;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
@@ -175,6 +249,7 @@ struct SpawnAgentArgs {
items: Option<Vec<UserInput>>,
agent_type: Option<String>,
model: Option<String>,
model_fallback_list: Option<Vec<SpawnAgentModelFallbackCandidate>>,
reasoning_effort: Option<ReasoningEffort>,
#[serde(default)]
fork_context: bool,

View File

@@ -16,19 +16,39 @@ use codex_protocol::models::ResponseInputItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::openai_models::ReasoningEffortPreset;
use codex_protocol::protocol::CollabAgentRef;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabAgentStatusEntry;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use tokio::time::Duration;
use tokio::time::Instant;
use tokio::time::timeout;
/// Minimum wait timeout to prevent tight polling loops from burning CPU.
pub(crate) const MIN_WAIT_TIMEOUT_MS: i64 = 10_000;
pub(crate) const DEFAULT_WAIT_TIMEOUT_MS: i64 = 30_000;
pub(crate) const MAX_WAIT_TIMEOUT_MS: i64 = 3600 * 1000;
const ASYNC_QUOTA_EXHAUSTION_STATUS_TIMEOUT: Duration = Duration::from_secs(2);
/// Outcome of inspecting a spawn attempt's status when deciding whether to
/// fall back to the next model candidate.
pub(crate) enum SpawnAttemptRetryDecision {
/// Keep this attempt; carries the status to report for it.
Accept(AgentStatus),
/// Abandon this attempt and try the next candidate; carries the status to
/// report for the preempted attempt.
Retry(AgentStatus),
}
/// Event-level call id for spawn attempt `attempt_index` (0-based): the first
/// attempt reuses `call_id` verbatim; later attempts append `#N` where
/// N = attempt_index + 1 (so index 1 yields `call_id#2`).
pub(crate) fn spawn_attempt_event_call_id(call_id: &str, attempt_index: usize) -> String {
    match attempt_index {
        0 => call_id.to_string(),
        idx => format!("{call_id}#{}", idx + 1),
    }
}
pub(crate) fn function_arguments(payload: ToolPayload) -> Result<String, FunctionCallError> {
match payload {
@@ -71,6 +91,177 @@ where
})
}
/// One model/effort combination to try when spawning a child agent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SpawnAgentModelCandidate {
// `None` defers to the session's default model.
pub(crate) model: Option<String>,
// `None` defers to the model's default reasoning effort.
pub(crate) reasoning_effort: Option<ReasoningEffort>,
}
/// Wire shape of one `model_fallback_list` entry in the spawn_agent tool args.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
pub(crate) struct SpawnAgentModelFallbackCandidate {
pub(crate) model: String,
// Optional per-entry override; absent entries inherit the request-level effort.
#[serde(default)]
pub(crate) reasoning_effort: Option<ReasoningEffort>,
}
/// Builds the ordered list of model candidates for a spawn attempt.
///
/// An explicit `model_fallback_list` takes precedence and fully replaces the
/// legacy `model`/`reasoning_effort` args; each entry inherits the requested
/// effort unless it specifies its own. Without a fallback list, the legacy args
/// yield at most one candidate; with neither present the result is empty.
pub(crate) fn collect_spawn_agent_model_candidates(
    model_fallback_list: Option<&Vec<SpawnAgentModelFallbackCandidate>>,
    requested_model: Option<&str>,
    requested_reasoning_effort: Option<ReasoningEffort>,
) -> Vec<SpawnAgentModelCandidate> {
    match model_fallback_list {
        Some(list) => list
            .iter()
            .map(|entry| SpawnAgentModelCandidate {
                model: Some(entry.model.clone()),
                reasoning_effort: entry.reasoning_effort.or(requested_reasoning_effort),
            })
            .collect(),
        None if requested_model.is_some() || requested_reasoning_effort.is_some() => {
            vec![SpawnAgentModelCandidate {
                model: requested_model.map(str::to_string),
                reasoning_effort: requested_reasoning_effort,
            }]
        }
        None => Vec::new(),
    }
}
/// Re-checks a presumed quota-exhausted spawn attempt and, if it is still stuck
/// in `PendingInit`, closes the child thread so a fallback candidate can be
/// tried. Returns `Accept` unchanged when the recheck shows the child actually
/// made progress.
pub(crate) async fn close_quota_exhausted_spawn_attempt(
agent_control: &crate::agent::control::AgentControl,
thread_id: ThreadId,
retry_status: AgentStatus,
) -> SpawnAttemptRetryDecision {
let retry_decision =
recheck_spawn_attempt_retry_decision(retry_status, thread_id, agent_control).await;
let SpawnAttemptRetryDecision::Retry(status) = retry_decision else {
return retry_decision;
};
// There is still a narrow TOCTOU window: a child can leave `PendingInit` after the final
// status read above and before `close_agent` runs. `AgentControl` does not currently expose
// a compare-and-close primitive, so this is the strongest local mitigation available.
if let Err(err) = agent_control.close_agent(thread_id).await
&& !matches!(
err,
CodexErr::ThreadNotFound(_) | CodexErr::InternalAgentDied
)
{
// Already-gone children are fine; anything else is logged but does not block the retry.
tracing::warn!("failed to close quota-exhausted spawn attempt {thread_id}: {err}");
}
SpawnAttemptRetryDecision::Retry(status)
}
/// True when a spawn error is a quota/usage-limit failure, i.e. worth retrying
/// with the next fallback model candidate.
pub(crate) fn spawn_should_retry_on_quota_exhaustion(error: &CodexErr) -> bool {
matches!(
error,
CodexErr::QuotaExceeded | CodexErr::UsageLimitReached(_)
)
}
/// Watches a freshly spawned child briefly (up to
/// `ASYNC_QUOTA_EXHAUSTION_STATUS_TIMEOUT`) to catch quota failures that only
/// surface asynchronously after spawn returns. Final statuses are mapped via
/// `retry_decision_for_final_spawn_status`; a child still stuck in
/// `PendingInit` at the deadline is treated as retryable.
pub(crate) async fn probe_spawn_attempt_for_async_quota_exhaustion(
thread_status: AgentStatus,
thread_id: ThreadId,
agent_control: &crate::agent::control::AgentControl,
) -> SpawnAttemptRetryDecision {
// Already-final status: no need to subscribe at all.
match thread_status {
AgentStatus::Completed(_)
| AgentStatus::Errored(_)
| AgentStatus::Shutdown
| AgentStatus::NotFound => {
return retry_decision_for_final_spawn_status(thread_status);
}
AgentStatus::PendingInit | AgentStatus::Running | AgentStatus::Interrupted => {}
}
// Subscription failure: accept children that were already making progress,
// otherwise treat the attempt as still pending (retryable).
let Ok(mut status_rx) = agent_control.subscribe_status(thread_id).await else {
return match thread_status {
AgentStatus::Running | AgentStatus::Interrupted => {
SpawnAttemptRetryDecision::Accept(thread_status)
}
_ => SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit),
};
};
let deadline = Instant::now() + ASYNC_QUOTA_EXHAUSTION_STATUS_TIMEOUT;
loop {
// `borrow_and_update` marks the current value as seen so the next
// `changed().await` only wakes on a genuinely newer status.
let status = status_rx.borrow_and_update().clone();
match status {
AgentStatus::Completed(_)
| AgentStatus::Errored(_)
| AgentStatus::Shutdown
| AgentStatus::NotFound => {
return retry_decision_for_final_spawn_status(status);
}
AgentStatus::PendingInit | AgentStatus::Running | AgentStatus::Interrupted => {}
}
// Deadline reached: decide from the last observed status.
let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
return match status {
AgentStatus::PendingInit => {
SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit)
}
AgentStatus::Running | AgentStatus::Interrupted => {
SpawnAttemptRetryDecision::Accept(status)
}
AgentStatus::Completed(_)
| AgentStatus::Errored(_)
| AgentStatus::Shutdown
| AgentStatus::NotFound => retry_decision_for_final_spawn_status(status),
};
};
match timeout(remaining, status_rx.changed()).await {
Ok(Ok(())) => {}
// Sender dropped or wait timed out: the child never left limbo in time.
Ok(Err(_)) => return SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit),
Err(_) => return SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit),
}
}
}
/// Takes a tentative retry status and re-reads the child's live status before
/// committing to a retry: only a child still in `PendingInit` is re-polled; a
/// child found `Running`/`Interrupted` is accepted instead, and final statuses
/// are mapped through `retry_decision_for_final_spawn_status`.
pub(crate) async fn recheck_spawn_attempt_retry_decision(
status: AgentStatus,
thread_id: ThreadId,
agent_control: &crate::agent::control::AgentControl,
) -> SpawnAttemptRetryDecision {
// Non-pending statuses were already judged retryable by the caller.
if !matches!(status, AgentStatus::PendingInit) {
return SpawnAttemptRetryDecision::Retry(status);
}
let latest_status = agent_control.get_status(thread_id).await;
match latest_status {
AgentStatus::Running | AgentStatus::Interrupted => {
SpawnAttemptRetryDecision::Accept(latest_status)
}
AgentStatus::Completed(_)
| AgentStatus::Errored(_)
| AgentStatus::Shutdown
| AgentStatus::NotFound => retry_decision_for_final_spawn_status(latest_status),
AgentStatus::PendingInit => SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit),
}
}
/// Maps a terminal child status to a decision: quota-exhaustion errors retry,
/// every other final status is accepted as-is.
fn retry_decision_for_final_spawn_status(status: AgentStatus) -> SpawnAttemptRetryDecision {
if spawn_should_retry_on_quota_exhaustion_status(&status) {
SpawnAttemptRetryDecision::Retry(status)
} else {
SpawnAttemptRetryDecision::Accept(status)
}
}
/// True when a terminal `AgentStatus` carries a quota/usage-exhaustion error
/// message, i.e. the spawn attempt should fall back to the next candidate.
fn spawn_should_retry_on_quota_exhaustion_status(status: &AgentStatus) -> bool {
    match status {
        AgentStatus::Errored(message) => {
            // Case-insensitive substring probe. `"quota"` already subsumes
            // `"insufficient_quota"`, so a single quota check suffices.
            let message = message.to_lowercase();
            message.contains("quota") || message.contains("usage limit")
        }
        // Every other final status (NotFound, Completed, Shutdown, ...) is not
        // quota-related and must not trigger a fallback retry.
        _ => false,
    }
}
pub(crate) fn build_wait_agent_statuses(
statuses: &HashMap<ThreadId, AgentStatus>,
receiver_agents: &[CollabAgentRef],
@@ -118,6 +309,88 @@ pub(crate) fn collab_spawn_error(err: CodexErr) -> FunctionCallError {
}
}
/// Emits a `CollabAgentSpawnEndEvent` for an attempt that failed before any
/// child existed: thread identity fields are `None` and the status is derived
/// from the error (`ThreadNotFound` maps to `NotFound`, everything else to
/// `Errored` with the error's display text).
pub(crate) async fn send_collab_agent_spawn_error_event(
session: &Session,
turn: &TurnContext,
call_id: String,
prompt: String,
model: String,
reasoning_effort: ReasoningEffort,
err: &CodexErr,
) {
session
.send_event(
turn,
CollabAgentSpawnEndEvent {
call_id,
sender_thread_id: session.conversation_id,
new_thread_id: None,
new_agent_nickname: None,
new_agent_role: None,
prompt,
model,
reasoning_effort,
status: match err {
CodexErr::ThreadNotFound(_) => AgentStatus::NotFound,
err => AgentStatus::Errored(err.to_string()),
},
}
.into(),
)
.await;
}
/// Emits the `CollabAgentSpawnBeginEvent` announcing one spawn attempt; the
/// `call_id` is the per-attempt id (see `spawn_attempt_event_call_id`) so each
/// retry produces its own begin/end pair.
pub(crate) async fn send_collab_agent_spawn_begin_event(
session: &Session,
turn: &TurnContext,
call_id: String,
prompt: String,
model: String,
reasoning_effort: ReasoningEffort,
) {
session
.send_event(
turn,
CollabAgentSpawnBeginEvent {
call_id,
sender_thread_id: session.conversation_id,
prompt,
model,
reasoning_effort,
}
.into(),
)
.await;
}
/// Emits a `CollabAgentSpawnEndEvent` for an attempt that was preempted by the
/// quota-fallback logic: the child (if any) has been closed, so thread identity
/// fields are intentionally `None` and `status` is whatever the retry decision
/// recorded for the abandoned attempt.
pub(crate) async fn send_collab_agent_spawn_retry_preempted_event(
session: &Session,
turn: &TurnContext,
call_id: String,
prompt: String,
model: String,
reasoning_effort: ReasoningEffort,
status: AgentStatus,
) {
session
.send_event(
turn,
CollabAgentSpawnEndEvent {
call_id,
sender_thread_id: session.conversation_id,
new_thread_id: None,
new_agent_nickname: None,
new_agent_role: None,
prompt,
model,
reasoning_effort,
status,
}
.into(),
)
.await;
}
pub(crate) fn collab_agent_error(agent_id: ThreadId, err: CodexErr) -> FunctionCallError {
match err {
CodexErr::ThreadNotFound(id) => {
@@ -363,3 +636,111 @@ fn validate_spawn_agent_reasoning_effort(
"Reasoning effort `{requested_reasoning_effort}` is not supported for model `{model}`. Supported reasoning efforts: {supported}"
)))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::UsageLimitReachedError;
use crate::protocol::AgentStatus;
// An explicit fallback list fully replaces the legacy model/effort args, and
// per-entry efforts win over the request-level one.
#[test]
fn collect_spawn_agent_model_candidates_prefers_fallback_list() {
let candidates = collect_spawn_agent_model_candidates(
Some(&vec![
SpawnAgentModelFallbackCandidate {
model: "fallback-a".to_string(),
reasoning_effort: Some(ReasoningEffort::High),
},
SpawnAgentModelFallbackCandidate {
model: "fallback-b".to_string(),
reasoning_effort: Some(ReasoningEffort::Minimal),
},
]),
Some("legacy-model"),
Some(ReasoningEffort::Low),
);
assert_eq!(
candidates,
vec![
SpawnAgentModelCandidate {
model: Some("fallback-a".to_string()),
reasoning_effort: Some(ReasoningEffort::High),
},
SpawnAgentModelCandidate {
model: Some("fallback-b".to_string()),
reasoning_effort: Some(ReasoningEffort::Minimal),
},
]
);
}
// Without a fallback list the legacy args collapse to a single candidate.
#[test]
fn collect_spawn_agent_model_candidates_falls_back_to_legacy_args() {
let candidates = collect_spawn_agent_model_candidates(
/*model_fallback_list*/ None,
Some("legacy-model"),
Some(ReasoningEffort::Minimal),
);
assert_eq!(
candidates,
vec![SpawnAgentModelCandidate {
model: Some("legacy-model".to_string()),
reasoning_effort: Some(ReasoningEffort::Minimal),
}]
);
}
// No model and no effort requested: no candidates (callers inject a default).
#[test]
fn collect_spawn_agent_model_candidates_empty_when_no_model_is_set() {
let candidates = collect_spawn_agent_model_candidates(
/*model_fallback_list*/ None, /*requested_model*/ None,
/*requested_reasoning_effort*/ None,
);
assert_eq!(candidates, Vec::new());
}
// Only quota/usage-limit errors are retryable; unrelated errors are not.
#[test]
fn spawn_should_retry_on_quota_exhaustion_checks_expected_error_variants() {
assert!(spawn_should_retry_on_quota_exhaustion(
&CodexErr::QuotaExceeded
));
assert!(spawn_should_retry_on_quota_exhaustion(
&CodexErr::UsageLimitReached(UsageLimitReachedError {
plan_type: None,
resets_at: None,
rate_limits: None,
promo_message: None,
})
));
assert!(!spawn_should_retry_on_quota_exhaustion(
&CodexErr::UnsupportedOperation("thread manager dropped".to_string())
));
}
// The dropped-manager error is translated into a model-facing message.
#[test]
fn collab_spawn_error_handles_thread_manager_drop() {
assert_eq!(
collab_spawn_error(CodexErr::UnsupportedOperation(
"thread manager dropped".to_string()
)),
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
);
}
// Statuses with no matching receiver entry still appear, sorted by thread id.
// NOTE(review): asserting `thread_a` sorts before `thread_b` assumes
// `ThreadId::new()` yields monotonically increasing ids (e.g. UUIDv7); with
// fully random ids this test would be order-flaky — confirm.
#[test]
fn build_wait_agent_statuses_includes_extras_in_sorted_order() {
let receiver_agents = vec![];
let mut statuses = HashMap::new();
let thread_a = ThreadId::new();
let thread_b = ThreadId::new();
statuses.insert(thread_b, AgentStatus::Completed(Some("done".to_string())));
statuses.insert(thread_a, AgentStatus::Completed(Some("done".to_string())));
let entries = build_wait_agent_statuses(&statuses, &receiver_agents);
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].thread_id, thread_a);
assert_eq!(entries[1].thread_id, thread_b);
}
}

View File

@@ -2,6 +2,7 @@ use super::*;
use crate::ThreadManager;
use crate::built_in_model_providers;
use crate::codex::make_session_and_context;
use crate::codex::make_session_and_context_with_rx;
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
use crate::config::types::ShellEnvironmentPolicy;
use crate::function_tool::FunctionCallError;
@@ -27,8 +28,11 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::FileSystemSandboxPolicy;
use codex_protocol::protocol::InitialHistory;
@@ -163,6 +167,20 @@ where
}
}
// Drains the event channel until a `CollabAgentSpawnEnd` arrives, skipping any
// other event kinds; panics if no such event shows up within one second.
async fn wait_for_collab_spawn_end_event(
rx: &async_channel::Receiver<Event>,
) -> CollabAgentSpawnEndEvent {
loop {
let event = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("collab spawn-end event timed out")
.expect("collab spawn-end event missing");
if let EventMsg::CollabAgentSpawnEnd(event) = event.msg {
return event;
}
}
}
#[derive(Debug, Deserialize)]
struct ListAgentsResult {
agents: Vec<ListedAgentResult>,
@@ -397,22 +415,175 @@ async fn multi_agent_v2_spawn_rejects_legacy_items_field() {
#[tokio::test]
async fn spawn_agent_errors_when_manager_dropped() {
let (session, turn) = make_session_and_context().await;
let (session, turn, rx) = make_session_and_context_with_rx().await;
let invocation = invocation(
Arc::new(session),
Arc::new(turn),
session.clone(),
turn.clone(),
"spawn_agent",
function_payload(json!({"message": "hello"})),
);
let Err(err) = SpawnAgentHandler.handle(invocation).await else {
panic!("spawn should fail without a manager");
};
let spawn_end_event = wait_for_collab_spawn_end_event(&rx).await;
assert_eq!(spawn_end_event.call_id, "call-1");
assert_eq!(spawn_end_event.sender_thread_id, session.conversation_id);
assert_eq!(spawn_end_event.new_thread_id, None);
assert_eq!(spawn_end_event.new_agent_nickname, None);
assert_eq!(spawn_end_event.new_agent_role, None);
assert_eq!(spawn_end_event.prompt, "hello");
assert_eq!(spawn_end_event.model, "");
assert_eq!(spawn_end_event.reasoning_effort, ReasoningEffort::default());
assert!(matches!(
spawn_end_event.status,
AgentStatus::Errored(ref message) if message.contains("thread manager dropped")
));
assert_eq!(
err,
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
);
}
// V2 handler path: with the thread manager gone, the spawn must both emit a
// spawn-end event describing the failure (no thread identity, Errored status)
// and return the model-facing "collab manager unavailable" error.
#[tokio::test]
async fn multi_agent_v2_spawn_agent_errors_when_manager_dropped() {
let (session, mut turn, rx) = make_session_and_context_with_rx().await;
// Enable the MultiAgentV2 feature on a cloned config; `Arc::get_mut` works
// because this test holds the only reference to the turn context.
let turn_context = Arc::get_mut(&mut turn).expect("single turn context ref");
let mut config = (*turn_context.config).clone();
config
.features
.enable(Feature::MultiAgentV2)
.expect("test config should allow feature update");
turn_context.config = Arc::new(config);
let invocation = invocation(
session.clone(),
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "inspect this repo",
"task_name": "worker"
})),
);
let Err(err) = SpawnAgentHandlerV2.handle(invocation).await else {
panic!("spawn should fail without a manager");
};
// The failure must still be observable as a spawn-end event.
let spawn_end_event = wait_for_collab_spawn_end_event(&rx).await;
assert_eq!(spawn_end_event.call_id, "call-1");
assert_eq!(spawn_end_event.sender_thread_id, session.conversation_id);
assert_eq!(spawn_end_event.new_thread_id, None);
assert_eq!(spawn_end_event.new_agent_nickname, None);
assert_eq!(spawn_end_event.new_agent_role, None);
assert_eq!(spawn_end_event.prompt, "inspect this repo");
assert_eq!(spawn_end_event.model, "");
assert_eq!(spawn_end_event.reasoning_effort, ReasoningEffort::default());
assert!(matches!(
spawn_end_event.status,
AgentStatus::Errored(ref message) if message.contains("thread manager dropped")
));
assert_eq!(
err,
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
);
}
// A preempted-retry spawn-end event must carry no thread identity (the child
// was closed) while echoing back the attempt's call_id, prompt, model, effort
// and recorded status.
#[tokio::test]
async fn spawn_retry_preempted_event_omits_thread_identity() {
let (session, turn, rx) = make_session_and_context_with_rx().await;
send_collab_agent_spawn_retry_preempted_event(
session.as_ref(),
turn.as_ref(),
"call-1".to_string(),
"inspect this repo".to_string(),
"gpt-5.4-mini".to_string(),
ReasoningEffort::Medium,
AgentStatus::PendingInit,
)
.await;
let spawn_end_event = wait_for_collab_spawn_end_event(&rx).await;
assert_eq!(spawn_end_event.call_id, "call-1");
assert_eq!(spawn_end_event.sender_thread_id, session.conversation_id);
assert_eq!(spawn_end_event.new_thread_id, None);
assert_eq!(spawn_end_event.new_agent_nickname, None);
assert_eq!(spawn_end_event.new_agent_role, None);
assert_eq!(spawn_end_event.prompt, "inspect this repo");
assert_eq!(spawn_end_event.model, "gpt-5.4-mini");
assert_eq!(spawn_end_event.reasoning_effort, ReasoningEffort::Medium);
assert_eq!(spawn_end_event.status, AgentStatus::PendingInit);
}
// A child already reported `Running` is accepted immediately — here the status
// subscription on a default AgentControl fails, exercising the
// subscription-error branch of the probe.
#[tokio::test]
async fn spawn_async_quota_probe_accepts_running_child() {
let decision = probe_spawn_attempt_for_async_quota_exhaustion(
AgentStatus::Running,
ThreadId::default(),
&crate::agent::control::AgentControl::default(),
)
.await;
assert!(matches!(
decision,
SpawnAttemptRetryDecision::Accept(AgentStatus::Running)
));
}
// Covers the TOCTOU recheck: a child that left `PendingInit` and started
// running between the caller's status read and the close must be accepted and
// must NOT be closed.
#[tokio::test]
async fn close_quota_exhausted_spawn_attempt_accepts_child_that_started_running() {
let (_session, turn) = make_session_and_context().await;
let manager = thread_manager();
let thread = manager
.start_thread((*turn.config).clone())
.await
.expect("child thread should start");
// Drive the child with a never-finishing task so it stays `Running`.
let active_turn = thread.thread.codex.session.new_default_turn().await;
thread
.thread
.codex
.session
.spawn_task(
Arc::clone(&active_turn),
vec![UserInput::Text {
text: "working".to_string(),
text_elements: Vec::new(),
}],
NeverEndingTask,
)
.await;
// Poll until the child is actually observed as Running (status updates are async).
timeout(Duration::from_secs(1), async {
loop {
if manager.agent_control().get_status(thread.thread_id).await == AgentStatus::Running {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("child should reach running");
// Caller saw PendingInit, but the recheck must discover the running child.
let decision = close_quota_exhausted_spawn_attempt(
&manager.agent_control(),
thread.thread_id,
AgentStatus::PendingInit,
)
.await;
assert!(matches!(
decision,
SpawnAttemptRetryDecision::Accept(AgentStatus::Running)
));
// The child must not have been closed by the accepted decision.
assert_eq!(
manager.agent_control().get_status(thread.thread_id).await,
AgentStatus::Running
);
let _ = thread
.thread
.submit(Op::Shutdown {})
.await
.expect("shutdown should submit");
}
#[tokio::test]
async fn multi_agent_v2_spawn_returns_path_and_send_message_accepts_relative_path() {
#[derive(Debug, Deserialize)]

View File

@@ -17,7 +17,6 @@ use codex_protocol::models::ResponseInputItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabCloseBeginEvent;
use codex_protocol::protocol::CollabCloseEndEvent;

View File

@@ -40,7 +40,8 @@ impl ToolHandler for Handler {
.map(str::trim)
.filter(|role| !role.is_empty());
let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?;
let initial_operation =
parse_collab_input(Some(args.message.clone()), /*items*/ None)?;
let prompt = render_input_preview(&initial_operation);
let session_source = turn.session_source.clone();
@@ -51,34 +52,8 @@ impl ToolHandler for Handler {
"Agent depth limit reached. Solve the task yourself.".to_string(),
));
}
session
.send_event(
&turn,
CollabAgentSpawnBeginEvent {
call_id: call_id.clone(),
sender_thread_id: session.conversation_id,
prompt: prompt.clone(),
model: args.model.clone().unwrap_or_default(),
reasoning_effort: args.reasoning_effort.unwrap_or_default(),
}
.into(),
)
.await;
let mut config =
let config =
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut config,
args.model.as_deref(),
args.reasoning_effort,
)
.await?;
apply_role_to_config(&mut config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut config, child_depth);
let spawn_source = thread_spawn_source(
session.conversation_id,
@@ -87,47 +62,148 @@ impl ToolHandler for Handler {
role_name,
Some(args.task_name.clone()),
)?;
let result = session
.services
.agent_control
.spawn_agent_with_metadata(
config,
match (spawn_source.get_agent_path(), initial_operation) {
(Some(recipient), Op::UserInput { items, .. })
if items
.iter()
.all(|item| matches!(item, UserInput::Text { .. })) =>
{
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
turn.session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root),
recipient,
Vec::new(),
prompt.clone(),
/*trigger_turn*/ true,
),
}
}
(_, initial_operation) => initial_operation,
},
Some(spawn_source),
SpawnAgentOptions {
fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()),
fork_mode,
},
)
.await
.map_err(collab_spawn_error);
let (new_thread_id, new_agent_metadata, status) = match &result {
Ok(spawned_agent) => (
Some(spawned_agent.thread_id),
Some(spawned_agent.metadata.clone()),
spawned_agent.status.clone(),
),
Err(_) => (None, None, AgentStatus::NotFound),
let initial_agent_op = match (spawn_source.get_agent_path(), initial_operation) {
(Some(recipient), Op::UserInput { items, .. })
if items
.iter()
.all(|item| matches!(item, UserInput::Text { .. })) =>
{
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
turn.session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root),
recipient,
Vec::new(),
prompt.clone(),
/*trigger_turn*/ true,
),
}
}
(_, initial_operation) => initial_operation,
};
let mut candidates_to_try = collect_spawn_agent_model_candidates(
args.model_fallback_list.as_ref(),
args.model.as_deref(),
args.reasoning_effort,
);
if candidates_to_try.is_empty() {
candidates_to_try.push(SpawnAgentModelCandidate {
model: None,
reasoning_effort: None,
});
}
let mut spawn_result = None;
for (idx, candidate) in candidates_to_try.iter().enumerate() {
let attempt_call_id = spawn_attempt_event_call_id(&call_id, idx);
let candidate_model = candidate.model.clone().unwrap_or_default();
let candidate_reasoning_effort = candidate.reasoning_effort.unwrap_or_default();
send_collab_agent_spawn_begin_event(
&session,
&turn,
attempt_call_id.clone(),
prompt.clone(),
candidate_model.clone(),
candidate_reasoning_effort,
)
.await;
let mut candidate_config = config.clone();
apply_requested_spawn_agent_model_overrides(
&session,
turn.as_ref(),
&mut candidate_config,
candidate.model.as_deref(),
candidate.reasoning_effort,
)
.await?;
apply_role_to_config(&mut candidate_config, role_name)
.await
.map_err(FunctionCallError::RespondToModel)?;
apply_spawn_agent_runtime_overrides(&mut candidate_config, turn.as_ref())?;
apply_spawn_agent_overrides(&mut candidate_config, child_depth);
let attempt_result = session
.services
.agent_control
.spawn_agent_with_metadata(
candidate_config,
initial_agent_op.clone(),
Some(spawn_source.clone()),
SpawnAgentOptions {
fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()),
fork_mode: fork_mode.clone(),
},
)
.await;
match attempt_result {
Ok(spawned_agent) => {
let status = if idx + 1 < candidates_to_try.len() {
match probe_spawn_attempt_for_async_quota_exhaustion(
spawned_agent.status.clone(),
spawned_agent.thread_id,
&session.services.agent_control,
)
.await
{
SpawnAttemptRetryDecision::Accept(status) => status,
SpawnAttemptRetryDecision::Retry(retry_status) => {
match close_quota_exhausted_spawn_attempt(
&session.services.agent_control,
spawned_agent.thread_id,
retry_status,
)
.await
{
SpawnAttemptRetryDecision::Accept(status) => status,
SpawnAttemptRetryDecision::Retry(status) => {
send_collab_agent_spawn_retry_preempted_event(
&session,
&turn,
attempt_call_id,
prompt.clone(),
candidate_model,
candidate_reasoning_effort,
status,
)
.await;
continue;
}
}
}
}
} else {
spawned_agent.status.clone()
};
spawn_result = Some((spawned_agent, status, attempt_call_id));
break;
}
Err(err) => {
send_collab_agent_spawn_error_event(
&session,
&turn,
attempt_call_id,
prompt.clone(),
candidate_model,
candidate_reasoning_effort,
&err,
)
.await;
if spawn_should_retry_on_quota_exhaustion(&err)
&& idx + 1 < candidates_to_try.len()
{
continue;
}
return Err(collab_spawn_error(err));
}
}
}
let Some((spawned_agent, status, spawn_event_call_id)) = spawn_result else {
return Err(FunctionCallError::RespondToModel(
"No spawn attempts were executed".to_string(),
));
};
let new_thread_id = Some(spawned_agent.thread_id);
let new_agent_metadata = Some(spawned_agent.metadata.clone());
let agent_snapshot = match new_thread_id {
Some(thread_id) => {
session
@@ -165,7 +241,7 @@ impl ToolHandler for Handler {
.send_event(
&turn,
CollabAgentSpawnEndEvent {
call_id,
call_id: spawn_event_call_id,
sender_thread_id: session.conversation_id,
new_thread_id,
new_agent_nickname,
@@ -178,7 +254,6 @@ impl ToolHandler for Handler {
.into(),
)
.await;
let _ = result?;
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
turn.session_telemetry.counter(
"codex.multi_agent.spawn",
@@ -206,6 +281,7 @@ struct SpawnAgentArgs {
task_name: String,
agent_type: Option<String>,
model: Option<String>,
model_fallback_list: Option<Vec<SpawnAgentModelFallbackCandidate>>,
reasoning_effort: Option<ReasoningEffort>,
fork_turns: Option<String>,
fork_context: Option<bool>,

View File

@@ -4,6 +4,14 @@ use codex_core::config::AgentRoleConfig;
use codex_features::Feature;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
@@ -17,7 +25,10 @@ use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::time::Duration;
use tokio::time::Instant;
@@ -36,6 +47,10 @@ const REQUESTED_MODEL: &str = "gpt-5.1";
const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low;
const ROLE_MODEL: &str = "gpt-5.1-codex-max";
const ROLE_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::High;
const FALLBACK_MODEL_A: &str = "gpt-5.1";
const FALLBACK_REASONING_EFFORT_A: ReasoningEffort = ReasoningEffort::Low;
const FALLBACK_MODEL_B: &str = "gpt-5.2-codex";
const FALLBACK_REASONING_EFFORT_B: ReasoningEffort = ReasoningEffort::Medium;
fn body_contains(req: &wiremock::Request, text: &str) -> bool {
let is_zstd = req
@@ -57,6 +72,57 @@ fn body_contains(req: &wiremock::Request, text: &str) -> bool {
.is_some_and(|body| body.contains(text))
}
/// Returns true when the (possibly zstd-compressed) JSON request body names
/// exactly `model` and `reasoning.effort` equals `reasoning_effort`.
///
/// Any decode or parse failure is treated as a non-match rather than an error,
/// since this is used as a wiremock matcher predicate.
fn request_uses_model_and_effort(
    req: &wiremock::Request,
    model: &str,
    reasoning_effort: &str,
) -> bool {
    // Honor the content-encoding header: bodies may arrive zstd-compressed.
    let is_zstd = req
        .headers
        .get("content-encoding")
        .and_then(|value| value.to_str().ok())
        .is_some_and(|value| {
            value
                .split(',')
                .any(|entry| entry.trim().eq_ignore_ascii_case("zstd"))
        });
    // Parse directly from the (possibly decompressed) bytes. The plain path
    // borrows `req.body` instead of cloning the full body as before.
    let body: Option<Value> = if is_zstd {
        zstd::stream::decode_all(std::io::Cursor::new(&req.body))
            .ok()
            .and_then(|bytes| serde_json::from_slice(&bytes).ok())
    } else {
        serde_json::from_slice(&req.body).ok()
    };
    body.is_some_and(|body| {
        body.get("model").and_then(Value::as_str) == Some(model)
            && body
                .get("reasoning")
                .and_then(|reasoning| reasoning.get("effort"))
                .and_then(Value::as_str)
                == Some(reasoning_effort)
    })
}
/// Returns true when the (possibly zstd-compressed) JSON request body names
/// exactly `model`, ignoring reasoning effort.
///
/// Decode/parse failures are treated as non-matches (matcher predicate).
fn request_uses_model(req: &wiremock::Request, model: &str) -> bool {
    // Honor the content-encoding header: bodies may arrive zstd-compressed.
    let is_zstd = req
        .headers
        .get("content-encoding")
        .and_then(|value| value.to_str().ok())
        .is_some_and(|value| {
            value
                .split(',')
                .any(|entry| entry.trim().eq_ignore_ascii_case("zstd"))
        });
    // Parse directly from the (possibly decompressed) bytes. The plain path
    // borrows `req.body` instead of cloning the full body as before.
    let body: Option<Value> = if is_zstd {
        zstd::stream::decode_all(std::io::Cursor::new(&req.body))
            .ok()
            .and_then(|bytes| serde_json::from_slice(&bytes).ok())
    } else {
        serde_json::from_slice(&req.body).ok()
    };
    body.is_some_and(|body| body.get("model").and_then(Value::as_str) == Some(model))
}
fn has_subagent_notification(req: &ResponsesRequest) -> bool {
req.message_input_texts("user")
.iter()
@@ -102,7 +168,7 @@ fn role_block(description: &str, role_name: &str) -> Option<String> {
}
async fn wait_for_spawned_thread_id(test: &TestCodex) -> Result<String> {
let deadline = Instant::now() + Duration::from_secs(2);
let deadline = Instant::now() + Duration::from_secs(5);
loop {
let ids = test.thread_manager.list_thread_ids().await;
if let Some(spawned_id) = ids
@@ -134,6 +200,61 @@ async fn wait_for_requests(
}
}
/// Submits a user turn with `prompt` and collects each
/// (`CollabAgentSpawnBeginEvent`, `CollabAgentSpawnEndEvent`) pair emitted
/// while the turn runs, returning them in arrival order.
///
/// Errors if an end event arrives without a preceding begin, or if a begin is
/// still unmatched when the turn completes. Asserts that exactly
/// `expected_attempts` pairs were observed.
async fn submit_turn_and_wait_for_spawn_attempt_events(
    test: &TestCodex,
    prompt: &str,
    expected_attempts: usize,
) -> Result<Vec<(CollabAgentSpawnBeginEvent, CollabAgentSpawnEndEvent)>> {
    test.codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: prompt.to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: test.cwd_path().to_path_buf(),
            approval_policy: AskForApproval::Never,
            approvals_reviewer: None,
            sandbox_policy: SandboxPolicy::DangerFullAccess,
            model: test.session_configured.model.clone(),
            effort: None,
            summary: None,
            service_tier: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    // Remember the turn id so the loop only terminates on OUR TurnComplete.
    let turn_id = wait_for_event_match(&test.codex, |event| match event {
        EventMsg::TurnStarted(event) => Some(event.turn_id.clone()),
        _ => None,
    })
    .await;
    let mut spawn_events = Vec::with_capacity(expected_attempts);
    // Holds the most recent begin event until its matching end arrives.
    // NOTE(review): a second begin arriving before an end would silently
    // overwrite the first; spawn attempts appear to be strictly sequential in
    // the retry flow — confirm if that invariant ever changes.
    let mut pending_begin = None;
    loop {
        let event = wait_for_event(&test.codex, |_| true).await;
        match event {
            EventMsg::CollabAgentSpawnBegin(event) => {
                pending_begin = Some(event);
            }
            EventMsg::CollabAgentSpawnEnd(event) => {
                let begin_event = pending_begin
                    .take()
                    .ok_or_else(|| anyhow::anyhow!("spawn end event without matching begin"))?;
                spawn_events.push((begin_event, event));
            }
            EventMsg::TurnComplete(event) if event.turn_id == turn_id => break,
            _ => {}
        }
    }
    if let Some(begin_event) = pending_begin {
        anyhow::bail!("spawn begin event without matching end: {begin_event:?}");
    }
    assert_eq!(spawn_events.len(), expected_attempts);
    Ok(spawn_events)
}
async fn setup_turn_one_with_spawned_child(
server: &MockServer,
child_response_delay: Option<Duration>,
@@ -480,6 +601,229 @@ async fn spawn_agent_role_overrides_requested_model_and_reasoning_settings() ->
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_agent_model_fallback_list_retries_after_quota_exhaustion() -> Result<()> {
    // End-to-end: a spawn_agent call with a two-entry model_fallback_list must
    // retry with the second candidate (with a `#2`-suffixed call id) after the
    // first candidate's child fails with a quota-exhaustion error.
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    // Tool-call args: candidate A (expected to hit the quota error) followed
    // by candidate B (expected to be retried and succeed).
    let spawn_args = serde_json::to_string(&json!({
        "message": CHILD_PROMPT,
        "model_fallback_list": [
            {
                "model": FALLBACK_MODEL_A,
                "reasoning_effort": FALLBACK_REASONING_EFFORT_A,
            },
            {
                "model": FALLBACK_MODEL_B,
                "reasoning_effort": FALLBACK_REASONING_EFFORT_B,
            }
        ]
    }))?;
    // Parent turn 1: the model responds with a spawn_agent tool call.
    mount_sse_once_match(
        &server,
        |req: &wiremock::Request| body_contains(req, TURN_1_PROMPT),
        sse(vec![
            ev_response_created("resp-turn1-1"),
            ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
            ev_completed("resp-turn1-1"),
        ]),
    )
    .await;
    // First child attempt (model A, effort "low"): responds with an
    // insufficient_quota failure, which should trigger the fallback retry.
    // The `!body_contains(req, SPAWN_CALL_ID)` guard keeps this mock from
    // matching the parent's follow-up request.
    let quota_child_attempt = mount_sse_once_match(
        &server,
        |req: &wiremock::Request| {
            body_contains(req, CHILD_PROMPT)
                && request_uses_model_and_effort(req, FALLBACK_MODEL_A, "low")
                && !body_contains(req, SPAWN_CALL_ID)
        },
        sse(vec![
            ev_response_created("resp-child-quota"),
            json!({
                "type": "response.failed",
                "response": {
                    "id": "resp-child-quota",
                    "error": {
                        "code": "insufficient_quota",
                        "message": "You exceeded your current quota, please check your plan and billing details."
                    }
                }
            }),
        ]),
    )
    .await;
    // Second child attempt (fallback model B): succeeds normally.
    let fallback_child_attempt = mount_sse_once_match(
        &server,
        |req: &wiremock::Request| {
            body_contains(req, CHILD_PROMPT)
                && request_uses_model(req, FALLBACK_MODEL_B)
                && !body_contains(req, SPAWN_CALL_ID)
        },
        sse(vec![
            ev_response_created("resp-child-fallback"),
            ev_assistant_message("msg-child-fallback", "child done"),
            ev_completed("resp-child-fallback"),
        ]),
    )
    .await;
    // Parent follow-up after the tool output (identified by SPAWN_CALL_ID in
    // the request body).
    let _turn1_followup = mount_sse_once_match(
        &server,
        |req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
        sse(vec![
            ev_response_created("resp-turn1-2"),
            ev_assistant_message("msg-turn1-2", "parent done"),
            ev_completed("resp-turn1-2"),
        ]),
    )
    .await;
    // Parent session inherits a model/effort distinct from both fallback
    // candidates, so candidate selection is observable in the requests.
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::Collab)
            .expect("test config should allow feature update");
        config.model = Some(INHERITED_MODEL.to_string());
        config.model_reasoning_effort = Some(INHERITED_REASONING_EFFORT);
    });
    let test = builder.build(&server).await?;
    // Exactly two begin/end attempt pairs: the failed quota attempt and the
    // successful fallback attempt.
    let spawn_events = submit_turn_and_wait_for_spawn_attempt_events(
        &test,
        TURN_1_PROMPT,
        /*expected_attempts*/ 2,
    )
    .await?;
    // Attempt 1 carries the raw call id and candidate A's model/effort, and
    // ends without a child thread.
    let (quota_begin_event, quota_end_event) = &spawn_events[0];
    assert_eq!(quota_begin_event.call_id, SPAWN_CALL_ID);
    assert_eq!(quota_begin_event.prompt, CHILD_PROMPT);
    assert_eq!(quota_begin_event.model, FALLBACK_MODEL_A);
    assert_eq!(
        quota_begin_event.reasoning_effort,
        FALLBACK_REASONING_EFFORT_A
    );
    assert_eq!(quota_end_event.call_id, SPAWN_CALL_ID);
    assert_eq!(quota_end_event.new_thread_id, None);
    assert_eq!(quota_end_event.new_agent_nickname, None);
    assert_eq!(quota_end_event.new_agent_role, None);
    assert_eq!(quota_end_event.prompt, CHILD_PROMPT);
    assert_eq!(quota_end_event.model, FALLBACK_MODEL_A);
    assert_eq!(
        quota_end_event.reasoning_effort,
        FALLBACK_REASONING_EFFORT_A
    );
    // The failed attempt may surface either as a still-pending child or as an
    // explicit quota error, depending on how fast the failure propagates.
    match &quota_end_event.status {
        AgentStatus::PendingInit => {}
        AgentStatus::Errored(message) if message.to_lowercase().contains("quota") => {}
        status => panic!("unexpected first-attempt retry status: {status:?}"),
    }
    // Attempt 2 uses the `#2`-suffixed call id and candidate B's settings.
    let (fallback_begin_event, fallback_end_event) = &spawn_events[1];
    assert_eq!(fallback_begin_event.call_id, format!("{SPAWN_CALL_ID}#2"));
    assert_eq!(fallback_begin_event.prompt, CHILD_PROMPT);
    assert_eq!(fallback_begin_event.model, FALLBACK_MODEL_B);
    assert_eq!(
        fallback_begin_event.reasoning_effort,
        FALLBACK_REASONING_EFFORT_B
    );
    assert_eq!(fallback_end_event.call_id, format!("{SPAWN_CALL_ID}#2"));
    assert_eq!(fallback_end_event.prompt, CHILD_PROMPT);
    assert_eq!(fallback_end_event.model, FALLBACK_MODEL_B);
    assert_eq!(
        fallback_end_event.reasoning_effort,
        FALLBACK_REASONING_EFFORT_B
    );
    // Verify the quota attempt actually hit the wire with candidate A's
    // model and reasoning effort.
    let quota_requests = quota_child_attempt
        .requests()
        .into_iter()
        .filter(|request| {
            request.body_json().get("model").and_then(Value::as_str) == Some(FALLBACK_MODEL_A)
        })
        .collect::<Vec<_>>();
    assert!(!quota_requests.is_empty());
    for quota_request in &quota_requests {
        let body = quota_request.body_json();
        assert_eq!(
            body.get("model").and_then(Value::as_str),
            Some(FALLBACK_MODEL_A)
        );
        assert_eq!(
            body.get("reasoning")
                .and_then(|reasoning| reasoning.get("effort"))
                .and_then(Value::as_str),
            Some("low")
        );
    }
    // Verify the retry hit the wire with candidate B; effort is only asserted
    // when present, since some request shapes omit the reasoning block.
    let fallback_requests = wait_for_requests(&fallback_child_attempt)
        .await?
        .into_iter()
        .filter(|request| {
            request.body_json().get("model").and_then(Value::as_str) == Some(FALLBACK_MODEL_B)
        })
        .collect::<Vec<_>>();
    assert!(!fallback_requests.is_empty());
    for fallback_request in &fallback_requests {
        let fallback_body = fallback_request.body_json();
        assert_eq!(
            fallback_body.get("model").and_then(Value::as_str),
            Some(FALLBACK_MODEL_B)
        );
        if let Some(effort) = fallback_body
            .get("reasoning")
            .and_then(|reasoning| reasoning.get("effort"))
            .and_then(Value::as_str)
        {
            assert_eq!(effort, "medium");
        }
    }
    // Finally, poll (up to 2s) for a spawned child whose config snapshot
    // matches candidate B — proving the surviving child is the fallback one.
    let deadline = Instant::now() + Duration::from_secs(2);
    let child_snapshot = loop {
        let spawned_ids = test
            .thread_manager
            .list_thread_ids()
            .await
            .into_iter()
            .filter(|id| *id != test.session_configured.session_id)
            .collect::<Vec<_>>();
        let mut matching_snapshot = None;
        for thread_id in spawned_ids {
            let snapshot = test
                .thread_manager
                .get_thread(thread_id)
                .await?
                .config_snapshot()
                .await;
            if snapshot.model == FALLBACK_MODEL_B
                && snapshot.reasoning_effort == Some(FALLBACK_REASONING_EFFORT_B)
            {
                matching_snapshot = Some(snapshot);
                break;
            }
        }
        if let Some(snapshot) = matching_snapshot {
            break snapshot;
        }
        if Instant::now() >= deadline {
            anyhow::bail!("timed out waiting for fallback child snapshot");
        }
        sleep(Duration::from_millis(10)).await;
    };
    assert_eq!(child_snapshot.model, FALLBACK_MODEL_B);
    assert_eq!(
        child_snapshot.reasoning_effort,
        Some(FALLBACK_REASONING_EFFORT_B)
    );
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_agent_tool_description_mentions_role_locked_settings() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -545,6 +545,41 @@ fn create_collab_input_items_schema() -> JsonSchema {
}
}
/// Schema for the ordered `model_fallback_list` parameter of `spawn_agent`:
/// an array of candidate objects, each requiring a `model` slug and allowing
/// an optional `reasoning_effort` override.
fn spawn_agent_model_fallback_list_schema() -> JsonSchema {
    let mut item_properties = BTreeMap::new();
    item_properties.insert(
        "model".to_string(),
        JsonSchema::String {
            description: Some(
                "Model to try. Must be a model slug from the current model picker list."
                    .to_string(),
            ),
        },
    );
    item_properties.insert(
        "reasoning_effort".to_string(),
        JsonSchema::String {
            description: Some(
                "Optional reasoning effort override for this candidate. Replaces the inherited reasoning effort."
                    .to_string(),
            ),
        },
    );
    // Candidates are closed objects: only `model` is required, and no extra
    // keys are accepted.
    let item_schema = JsonSchema::Object {
        properties: item_properties,
        required: Some(vec!["model".to_string()]),
        additional_properties: Some(false.into()),
    };
    JsonSchema::Array {
        items: Box::new(item_schema),
        description: Some(
            "Ordered model candidates for fallback retries. Each entry may include an optional reasoning effort."
                .to_string(),
        ),
    }
}
fn spawn_agent_common_properties_v1(agent_type_description: &str) -> BTreeMap<String, JsonSchema> {
BTreeMap::from([
(
@@ -590,6 +625,10 @@ fn spawn_agent_common_properties_v1(agent_type_description: &str) -> BTreeMap<St
),
},
),
(
"model_fallback_list".to_string(),
spawn_agent_model_fallback_list_schema(),
),
])
}
@@ -634,6 +673,10 @@ fn spawn_agent_common_properties_v2(agent_type_description: &str) -> BTreeMap<St
),
},
),
(
"model_fallback_list".to_string(),
spawn_agent_model_fallback_list_schema(),
),
])
}

View File

@@ -70,12 +70,44 @@ fn spawn_agent_tool_v2_requires_task_name_and_lists_visible_models() {
required,
Some(vec!["task_name".to_string(), "message".to_string()])
);
let Some(JsonSchema::Array { items, .. }) = properties.get("model_fallback_list") else {
panic!("spawn_agent v2 should define model_fallback_list as an array");
};
let JsonSchema::Object {
properties: model_fallback_item_properties,
required: Some(model_fallback_item_required),
..
} = items.as_ref()
else {
panic!("spawn_agent v2 model_fallback_list items should be objects");
};
assert!(model_fallback_item_properties.contains_key("model"));
assert!(model_fallback_item_properties.contains_key("reasoning_effort"));
assert_eq!(model_fallback_item_required, &vec!["model".to_string()]);
assert_eq!(
output_schema.expect("spawn_agent output schema")["required"],
json!(["agent_id", "task_name", "nickname"])
);
}
#[test]
fn spawn_agent_tool_v1_includes_model_fallback_list() {
    // The v1 spawn_agent tool must expose `model_fallback_list` as an array
    // parameter so callers can request quota-exhaustion retries.
    let tool = create_spawn_agent_tool_v1(SpawnAgentToolOptions {
        available_models: &[],
        agent_type_description: "role help".to_string(),
    });
    let parameters = match tool {
        ToolSpec::Function(ResponsesApiTool { parameters, .. }) => parameters,
        _ => panic!("spawn_agent should be a function tool"),
    };
    let properties = match parameters {
        JsonSchema::Object { properties, .. } => properties,
        _ => panic!("spawn_agent should use object params"),
    };
    assert!(
        matches!(
            properties.get("model_fallback_list"),
            Some(JsonSchema::Array { .. })
        ),
        "spawn_agent v1 should define model_fallback_list as an array"
    );
}
#[test]
fn spawn_agent_tool_v1_keeps_legacy_fork_context_field() {
let tool = create_spawn_agent_tool_v1(SpawnAgentToolOptions {

View File

@@ -769,6 +769,35 @@ mod tests {
assert_eq!(title.spans[6].style.fg, Some(Color::Magenta));
}
#[test]
fn spawn_end_without_receiver_renders_failed_spawn_attempt() {
    // A spawn-end event with no `new_thread_id` means no child was created;
    // the history cell must render it as a failed spawn attempt.
    let sender_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000001")
        .expect("valid sender thread id");
    let event = CollabAgentSpawnEndEvent {
        call_id: "call-spawn".to_string(),
        sender_thread_id,
        new_thread_id: None,
        new_agent_nickname: None,
        new_agent_role: None,
        prompt: "inspect the repo".to_string(),
        model: "gpt-5".to_string(),
        reasoning_effort: ReasoningEffortConfig::High,
        status: AgentStatus::PendingInit,
    };
    let summary = SpawnRequestSummary {
        model: "gpt-5".to_string(),
        reasoning_effort: ReasoningEffortConfig::High,
    };
    let cell = spawn_end(event, Some(&summary));
    let expected = "• Agent spawn failed\n └ inspect the repo";
    assert_eq!(cell_to_text(&cell), expected);
}
#[test]
fn collab_resume_interrupted_snapshot() {
let sender_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000001")