diff --git a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs index 9bb7b6055c..3e9360d9e7 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs @@ -53,55 +53,94 @@ impl ToolHandler for Handler { call_id: call_id.clone(), sender_thread_id: session.conversation_id, prompt: prompt.clone(), - model: args.model.clone().unwrap_or_default(), - reasoning_effort: args.reasoning_effort.unwrap_or_default(), + model: args + .model_fallback_list + .as_ref() + .and_then(|list| list.first()) + .map(|candidate| candidate.model.clone()) + .unwrap_or_else(|| args.model.clone().unwrap_or_default()), + reasoning_effort: args + .model_fallback_list + .as_ref() + .and_then(|list| list.first()) + .and_then(|candidate| candidate.reasoning_effort) + .unwrap_or_else(|| args.reasoning_effort.unwrap_or_default()), } .into(), ) .await; - let mut config = + let config = build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?; - apply_requested_spawn_agent_model_overrides( - &session, - turn.as_ref(), - &mut config, + + let mut candidates_to_try = collect_spawn_agent_model_candidates( + args.model_fallback_list.as_ref(), args.model.as_deref(), args.reasoning_effort, - ) - .await?; - apply_role_to_config(&mut config, role_name) - .await - .map_err(FunctionCallError::RespondToModel)?; - apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?; - apply_spawn_agent_overrides(&mut config, child_depth); + ); + if candidates_to_try.is_empty() { + candidates_to_try.push(SpawnAgentModelCandidate { + model: None, + reasoning_effort: None, + }); + } - let result = session - .services - .agent_control - .spawn_agent_with_metadata( - config, - input_items, - Some(thread_spawn_source( - session.conversation_id, - &turn.session_source, - child_depth, - role_name, - /*task_name*/ None, - )?), - SpawnAgentOptions { - fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()), - }, + let mut spawn_result = None; + for (idx, candidate) in candidates_to_try.iter().enumerate() { + let mut candidate_config = config.clone(); + apply_requested_spawn_agent_model_overrides( + &session, + turn.as_ref(), + &mut candidate_config, + candidate.model.as_deref(), + candidate.reasoning_effort, ) - .await - .map_err(collab_spawn_error); - let (new_thread_id, new_agent_metadata, status) = match &result { - Ok(spawned_agent) => ( - Some(spawned_agent.thread_id), - Some(spawned_agent.metadata.clone()), - spawned_agent.status.clone(), - ), - Err(_) => (None, None, AgentStatus::NotFound), + .await?; + apply_role_to_config(&mut candidate_config, role_name) + .await + .map_err(FunctionCallError::RespondToModel)?; + apply_spawn_agent_runtime_overrides(&mut candidate_config, turn.as_ref())?; + apply_spawn_agent_overrides(&mut candidate_config, child_depth); + let attempt_result = session + .services + .agent_control + .spawn_agent_with_metadata( + candidate_config, + input_items.clone(), + Some(thread_spawn_source( + session.conversation_id, + &turn.session_source, + child_depth, + role_name, + /*task_name*/ None, + )?), + SpawnAgentOptions { + fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()), + }, + ) + .await; + match attempt_result { + Ok(spawned_agent) => { + spawn_result = Some(spawned_agent); + break; + } + Err(err) => { + if spawn_should_retry_on_quota_exhaustion(&err) + && idx + 1 < candidates_to_try.len() + { + continue; + } + return Err(collab_spawn_error(err)); + } + } + } + let Some(spawned_agent) = spawn_result else { + return Err(FunctionCallError::RespondToModel( + "No spawn attempts were executed".to_string(), + )); }; + let new_thread_id = Some(spawned_agent.thread_id); + let new_agent_metadata = Some(spawned_agent.metadata.clone()); + let status = spawned_agent.status.clone(); let agent_snapshot = match new_thread_id { Some(thread_id) => { session @@ -152,7 +191,7 @@ impl ToolHandler for Handler { .into(), ) .await; - let new_thread_id = result?.thread_id; + let new_thread_id = spawned_agent.thread_id; let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); turn.session_telemetry.counter( "codex.multi_agent.spawn", @@ -173,6 +212,7 @@ struct SpawnAgentArgs { items: Option>, agent_type: Option, model: Option, + model_fallback_list: Option>, reasoning_effort: Option, #[serde(default)] fork_context: bool, diff --git a/codex-rs/core/src/tools/handlers/multi_agents_common.rs b/codex-rs/core/src/tools/handlers/multi_agents_common.rs index dd68465a7f..de7a703363 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_common.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_common.rs @@ -21,6 +21,7 @@ use codex_protocol::protocol::Op; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::user_input::UserInput; +use serde::Deserialize; use serde::Serialize; use serde_json::Value as JsonValue; use std::collections::HashMap; @@ -71,6 +72,51 @@ where }) } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct SpawnAgentModelCandidate { + pub(crate) model: Option, + pub(crate) reasoning_effort: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +pub(crate) struct SpawnAgentModelFallbackCandidate { + pub(crate) model: String, + #[serde(default)] + pub(crate) reasoning_effort: Option, +} + +pub(crate) fn collect_spawn_agent_model_candidates( + model_fallback_list: Option<&Vec>, + requested_model: Option<&str>, + requested_reasoning_effort: Option, +) -> Vec { + if let Some(model_fallback_list) = model_fallback_list { + return model_fallback_list + .iter() + .map(|candidate| SpawnAgentModelCandidate { + model: Some(candidate.model.clone()), + reasoning_effort: candidate.reasoning_effort, + }) + .collect(); + } + + let mut candidates = Vec::new(); + if requested_model.is_some() || requested_reasoning_effort.is_some() { + candidates.push(SpawnAgentModelCandidate { + model: requested_model.map(ToString::to_string), + reasoning_effort: requested_reasoning_effort, + }); + } + candidates +} + +pub(crate) fn spawn_should_retry_on_quota_exhaustion(error: &CodexErr) -> bool { + matches!( + error, + CodexErr::QuotaExceeded | CodexErr::UsageLimitReached(_) + ) +} + pub(crate) fn build_wait_agent_statuses( statuses: &HashMap, receiver_agents: &[CollabAgentRef], @@ -363,3 +409,111 @@ fn validate_spawn_agent_reasoning_effort( "Reasoning effort `{requested_reasoning_effort}` is not supported for model `{model}`. Supported reasoning efforts: {supported}" ))) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::UsageLimitReachedError; + use crate::protocol::AgentStatus; + + #[test] + fn collect_spawn_agent_model_candidates_prefers_fallback_list() { + let candidates = collect_spawn_agent_model_candidates( + Some(&vec![ + SpawnAgentModelFallbackCandidate { + model: "fallback-a".to_string(), + reasoning_effort: Some(ReasoningEffort::High), + }, + SpawnAgentModelFallbackCandidate { + model: "fallback-b".to_string(), + reasoning_effort: Some(ReasoningEffort::Minimal), + }, + ]), + Some("legacy-model"), + Some(ReasoningEffort::Low), + ); + + assert_eq!( + candidates, + vec![ + SpawnAgentModelCandidate { + model: Some("fallback-a".to_string()), + reasoning_effort: Some(ReasoningEffort::High), + }, + SpawnAgentModelCandidate { + model: Some("fallback-b".to_string()), + reasoning_effort: Some(ReasoningEffort::Minimal), + }, + ] + ); + } + + #[test] + fn collect_spawn_agent_model_candidates_falls_back_to_legacy_args() { + let candidates = collect_spawn_agent_model_candidates( + /*model_fallback_list*/ None, + Some("legacy-model"), + Some(ReasoningEffort::Minimal), + ); + assert_eq!( + candidates, + vec![SpawnAgentModelCandidate { + model: Some("legacy-model".to_string()), + reasoning_effort: Some(ReasoningEffort::Minimal), + }] + ); + } + + #[test] + fn collect_spawn_agent_model_candidates_empty_when_no_model_is_set() { + let candidates = collect_spawn_agent_model_candidates( + /*model_fallback_list*/ None, /*requested_model*/ None, + /*requested_reasoning_effort*/ None, + ); + assert_eq!(candidates, Vec::new()); + } + + #[test] + fn spawn_should_retry_on_quota_exhaustion_checks_expected_error_variants() { + assert!(spawn_should_retry_on_quota_exhaustion( + &CodexErr::QuotaExceeded + )); + assert!(spawn_should_retry_on_quota_exhaustion( + &CodexErr::UsageLimitReached(UsageLimitReachedError { + plan_type: None, + resets_at: None, + rate_limits: None, + promo_message: None, + }) + )); + assert!(!spawn_should_retry_on_quota_exhaustion( + &CodexErr::UnsupportedOperation("thread manager dropped".to_string()) + )); + } + + #[test] + fn collab_spawn_error_handles_thread_manager_drop() { + assert_eq!( + collab_spawn_error(CodexErr::UnsupportedOperation( + "thread manager dropped".to_string() + )), + FunctionCallError::RespondToModel("collab manager unavailable".to_string()) + ); + } + + #[test] + fn build_wait_agent_statuses_includes_extras_in_sorted_order() { + let receiver_agents = vec![]; + let mut statuses = HashMap::new(); + let thread_a = ThreadId::new(); + let thread_b = ThreadId::new(); + statuses.insert(thread_b, AgentStatus::Completed(Some("done".to_string()))); + statuses.insert(thread_a, AgentStatus::Completed(Some("done".to_string()))); + + let entries = build_wait_agent_statuses(&statuses, &receiver_agents); + + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].thread_id, thread_a); + assert_eq!(entries[1].thread_id, thread_b); + } +} diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs index ffe128b43e..eff49c0447 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs @@ -56,27 +56,24 @@ impl ToolHandler for Handler { call_id: call_id.clone(), sender_thread_id: session.conversation_id, prompt: prompt.clone(), - model: args.model.clone().unwrap_or_default(), - reasoning_effort: args.reasoning_effort.unwrap_or_default(), + model: args + .model_fallback_list + .as_ref() + .and_then(|list| list.first()) + .map(|candidate| candidate.model.clone()) + .unwrap_or_else(|| args.model.clone().unwrap_or_default()), + reasoning_effort: args + .model_fallback_list + .as_ref() + .and_then(|list| list.first()) + .and_then(|candidate| candidate.reasoning_effort) + .unwrap_or_else(|| args.reasoning_effort.unwrap_or_default()), } .into(), ) .await; - let mut config = + let config = build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?; - apply_requested_spawn_agent_model_overrides( - &session, - turn.as_ref(), - &mut config, - args.model.as_deref(), - args.reasoning_effort, - ) - .await?; - apply_role_to_config(&mut config, role_name) - .await - .map_err(FunctionCallError::RespondToModel)?; - apply_spawn_agent_runtime_overrides(&mut config, turn.as_ref())?; - apply_spawn_agent_overrides(&mut config, child_depth); let spawn_source = thread_spawn_source( session.conversation_id, @@ -85,46 +82,89 @@ impl ToolHandler for Handler { role_name, Some(args.task_name.clone()), )?; - let result = session - .services - .agent_control - .spawn_agent_with_metadata( - config, - match (spawn_source.get_agent_path(), initial_operation) { - (Some(recipient), Op::UserInput { items, .. }) - if items - .iter() - .all(|item| matches!(item, UserInput::Text { .. })) => - { - Op::InterAgentCommunication { - communication: InterAgentCommunication::new( - turn.session_source - .get_agent_path() - .unwrap_or_else(AgentPath::root), - recipient, - Vec::new(), - prompt.clone(), - /*trigger_turn*/ true, - ), - } - } - (_, initial_operation) => initial_operation, - }, - Some(spawn_source), - SpawnAgentOptions { - fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()), - }, - ) - .await - .map_err(collab_spawn_error); - let (new_thread_id, new_agent_metadata, status) = match &result { - Ok(spawned_agent) => ( - Some(spawned_agent.thread_id), - Some(spawned_agent.metadata.clone()), - spawned_agent.status.clone(), - ), - Err(_) => (None, None, AgentStatus::NotFound), + let initial_agent_op = match (spawn_source.get_agent_path(), initial_operation) { + (Some(recipient), Op::UserInput { items, .. }) + if items + .iter() + .all(|item| matches!(item, UserInput::Text { .. })) => + { + Op::InterAgentCommunication { + communication: InterAgentCommunication::new( + turn.session_source + .get_agent_path() + .unwrap_or_else(AgentPath::root), + recipient, + Vec::new(), + prompt.clone(), + /*trigger_turn*/ true, + ), + } + } + (_, initial_operation) => initial_operation, }; + let mut candidates_to_try = collect_spawn_agent_model_candidates( + args.model_fallback_list.as_ref(), + args.model.as_deref(), + args.reasoning_effort, + ); + if candidates_to_try.is_empty() { + candidates_to_try.push(SpawnAgentModelCandidate { + model: None, + reasoning_effort: None, + }); + } + + let mut spawn_result = None; + for (idx, candidate) in candidates_to_try.iter().enumerate() { + let mut candidate_config = config.clone(); + apply_requested_spawn_agent_model_overrides( + &session, + turn.as_ref(), + &mut candidate_config, + candidate.model.as_deref(), + candidate.reasoning_effort, + ) + .await?; + apply_role_to_config(&mut candidate_config, role_name) + .await + .map_err(FunctionCallError::RespondToModel)?; + apply_spawn_agent_runtime_overrides(&mut candidate_config, turn.as_ref())?; + apply_spawn_agent_overrides(&mut candidate_config, child_depth); + let attempt_result = session + .services + .agent_control + .spawn_agent_with_metadata( + candidate_config, + initial_agent_op.clone(), + Some(spawn_source.clone()), + SpawnAgentOptions { + fork_parent_spawn_call_id: args.fork_context.then(|| call_id.clone()), + }, + ) + .await; + match attempt_result { + Ok(spawned_agent) => { + spawn_result = Some(spawned_agent); + break; + } + Err(err) => { + if spawn_should_retry_on_quota_exhaustion(&err) + && idx + 1 < candidates_to_try.len() + { + continue; + } + return Err(collab_spawn_error(err)); + } + } + } + let Some(spawned_agent) = spawn_result else { + return Err(FunctionCallError::RespondToModel( + "No spawn attempts were executed".to_string(), + )); + }; + let new_thread_id = Some(spawned_agent.thread_id); + let new_agent_metadata = Some(spawned_agent.metadata.clone()); + let status = spawned_agent.status.clone(); let agent_snapshot = match new_thread_id { Some(thread_id) => { session @@ -175,7 +215,6 @@ impl ToolHandler for Handler { .into(), ) .await; - let _ = result?; let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); turn.session_telemetry.counter( "codex.multi_agent.spawn", @@ -203,6 +242,7 @@ struct SpawnAgentArgs { task_name: String, agent_type: Option, model: Option, + model_fallback_list: Option>, reasoning_effort: Option, #[serde(default)] fork_context: bool, diff --git a/codex-rs/tools/src/agent_tool.rs b/codex-rs/tools/src/agent_tool.rs index 87f5bbd9e2..75747c8fd9 100644 --- a/codex-rs/tools/src/agent_tool.rs +++ b/codex-rs/tools/src/agent_tool.rs @@ -545,6 +545,27 @@ fn create_collab_input_items_schema() -> JsonSchema { } fn spawn_agent_common_properties(agent_type_description: &str) -> BTreeMap { + let model_fallback_item_properties = BTreeMap::from([ + ( + "model".to_string(), + JsonSchema::String { + description: Some( + "Model to try. Must be a model slug from the current model picker list." + .to_string(), + ), + }, + ), + ( + "reasoning_effort".to_string(), + JsonSchema::String { + description: Some( + "Optional reasoning effort override for this candidate. Replaces the inherited reasoning effort." + .to_string(), + ), + }, + ), + ]); + BTreeMap::from([ ( "message".to_string(), @@ -580,6 +601,20 @@ fn spawn_agent_common_properties(agent_type_description: &str) -> BTreeMap