Add resume_agent collab tool (#10903)

Summary
- add the new resume_agent collab tool path through core, protocol, and
the app server API, including the resume events
- update the schema/TypeScript definitions plus docs so resume_agent
appears in generated artifacts and README
- note that resumed agents rehydrate rollout history without overwriting
their base instructions

Testing
- Not run (not requested)
This commit is contained in:
jif-oai
2026-02-07 17:31:45 +01:00
committed by GitHub
parent 4cd0c42a28
commit 62605fa471
38 changed files with 1456 additions and 13 deletions

View File

@@ -5,7 +5,9 @@ use crate::error::Result as CodexResult;
use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::watch;
@@ -39,7 +41,7 @@ impl AgentControl {
&self,
config: crate::config::Config,
prompt: String,
session_source: Option<codex_protocol::protocol::SessionSource>,
session_source: Option<SessionSource>,
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
@@ -65,6 +67,32 @@ impl AgentControl {
Ok(new_thread.thread_id)
}
/// Resume an existing agent thread from a recorded rollout file.
pub(crate) async fn resume_agent_from_rollout(
&self,
config: crate::config::Config,
rollout_path: PathBuf,
session_source: SessionSource,
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
let resumed_thread = state
.resume_thread_from_rollout_with_source(
config,
rollout_path,
self.clone(),
session_source,
)
.await?;
reservation.commit(resumed_thread.thread_id);
// Resumed threads are re-registered in-memory and need the same listener
// attachment path as freshly spawned threads.
state.notify_thread_created(resumed_thread.thread_id);
Ok(resumed_thread.thread_id)
}
/// Send a `user` prompt to an existing agent thread.
pub(crate) async fn send_prompt(
&self,
@@ -287,6 +315,24 @@ mod tests {
);
}
#[tokio::test]
async fn resume_agent_errors_when_manager_dropped() {
let control = AgentControl::default();
let (_home, config) = test_config().await;
let err = control
.resume_agent_from_rollout(
config,
PathBuf::from("/tmp/missing-rollout.jsonl"),
SessionSource::Exec,
)
.await
.expect_err("resume_agent should fail without a manager");
assert_eq!(
err.to_string(),
"unsupported operation: thread manager dropped"
);
}
#[tokio::test]
async fn send_prompt_errors_when_thread_missing() {
let harness = AgentControlHarness::new().await;
@@ -518,4 +564,88 @@ mod tests {
.await
.expect("shutdown agent");
}
#[tokio::test]
async fn resume_agent_respects_max_threads_limit() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let control = manager.agent_control();
let resumable_id = control
.spawn_agent(config.clone(), "hello".to_string(), None)
.await
.expect("spawn_agent should succeed");
let rollout_path = manager
.get_thread(resumable_id)
.await
.expect("thread should exist")
.rollout_path()
.expect("rollout path should exist");
let _ = control
.shutdown_agent(resumable_id)
.await
.expect("shutdown resumable thread");
let active_id = control
.spawn_agent(config.clone(), "occupy".to_string(), None)
.await
.expect("spawn_agent should succeed for active slot");
let err = control
.resume_agent_from_rollout(config, rollout_path, SessionSource::Exec)
.await
.expect_err("resume should respect max threads");
let CodexErr::AgentLimitReached {
max_threads: seen_max_threads,
} = err
else {
panic!("expected CodexErr::AgentLimitReached");
};
assert_eq!(seen_max_threads, max_threads);
let _ = control
.shutdown_agent(active_id)
.await
.expect("shutdown active thread");
}
#[tokio::test]
async fn resume_agent_releases_slot_after_resume_failure() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let control = manager.agent_control();
let missing_rollout = config.codex_home.join("sessions/missing-rollout.jsonl");
let _ = control
.resume_agent_from_rollout(config.clone(), missing_rollout, SessionSource::Exec)
.await
.expect_err("resume should fail for missing rollout path");
let resumed_id = control
.spawn_agent(config, "hello".to_string(), None)
.await
.expect("spawn should succeed after failed resume");
let _ = control
.shutdown_agent(resumed_id)
.await
.expect("shutdown resumed thread");
}
}