Agent jobs (spawn_agents_on_csv) + progress UI (#10935)

## Summary
- Add agent job support: spawn a batch of sub-agents from CSV, auto-run,
auto-export, and store results in SQLite.
- Simplify workflow: remove run/resume/get-status/export tools; spawn is
deterministic and completes in one call.
- Improve exec UX: stable, single-line progress bar with ETA; suppress
sub-agent chatter in exec.

## Why
Enables map-reduce style workflows over arbitrarily large repos using
the existing Codex orchestrator. This addresses review feedback about
overly complex job controls and non-deterministic monitoring.

## Demo (progress bar)
```
./codex-rs/target/debug/codex exec \
  --enable collab \
  --enable sqlite \
  --full-auto \
  --progress-cursor \
  -c agents.max_threads=16 \
  -C /Users/daveaitel/code/codex \
  - <<'PROMPT'
Create /tmp/agent_job_progress_demo.csv with columns: path,area and 30 rows:
path = item-01..item-30, area = test.

Then call spawn_agents_on_csv with:
- csv_path: /tmp/agent_job_progress_demo.csv
- instruction: "Run `python - <<'PY'` to sleep a random 0.3–1.2s, then output JSON with keys: path, score (int). Set score = 1."
- output_csv_path: /tmp/agent_job_progress_demo_out.csv
PROMPT
```

## Review feedback addressed
- Auto-start jobs on spawn; removed run/resume/status/export tools.
- Auto-export on success.
- More descriptive tool spec + clearer prompts.
- Avoid deadlocks on spawn failure; pending/running handled safely.
- Progress bar no longer scrolls; stable single-line redraw.

## Tests
- `cd codex-rs && cargo test -p codex-exec`
- `cd codex-rs && cargo build -p codex-cli`
This commit is contained in:
daveaitel-openai
2026-02-24 16:00:19 -05:00
committed by GitHub
parent bd192b54cd
commit dcab40123f
36 changed files with 3370 additions and 50 deletions

View File

@@ -0,0 +1,424 @@
use anyhow::Result;
use codex_core::features::Feature;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
use regex_lite::Regex;
use serde_json::Value;
use serde_json::json;
use std::fs;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use wiremock::Mock;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path_regex;
/// Mock model responder for the happy path: answers the first main turn with
/// a `spawn_agents_on_csv` call, answers each worker prompt with a
/// `report_agent_job_result` call, and completes every other turn empty.
struct AgentJobsResponder {
    // Pre-serialized JSON arguments for the spawn_agents_on_csv tool call.
    spawn_args_json: String,
    // Flipped once the spawn call has been emitted for the main turn.
    seen_main: AtomicBool,
    // Mints unique call ids ("call-worker-N") for successive worker turns.
    call_counter: AtomicUsize,
}
impl AgentJobsResponder {
    /// Creates a responder primed with the serialized `spawn_agents_on_csv`
    /// arguments; the main-turn flag and worker counter start at zero state.
    fn new(spawn_args_json: String) -> Self {
        Self {
            seen_main: AtomicBool::new(false),
            call_counter: AtomicUsize::new(0),
            spawn_args_json,
        }
    }
}
/// Variant of the happy-path responder: the very first worker reply carries
/// `stop: true`, so the job should cancel before remaining items run.
struct StopAfterFirstResponder {
    // Pre-serialized JSON arguments for the spawn_agents_on_csv tool call.
    spawn_args_json: String,
    // Flipped once the spawn call has been emitted for the main turn.
    seen_main: AtomicBool,
    // Shared with the test so it can assert how many worker turns were served.
    worker_calls: Arc<AtomicUsize>,
}
impl StopAfterFirstResponder {
    /// Creates a responder that shares `worker_calls` with the test, letting
    /// the test observe how many worker prompts the mock actually answered.
    fn new(spawn_args_json: String, worker_calls: Arc<AtomicUsize>) -> Self {
        let seen_main = AtomicBool::new(false);
        Self {
            spawn_args_json,
            worker_calls,
            seen_main,
        }
    }
}
impl Respond for StopAfterFirstResponder {
    /// Dispatches each mocked `/responses` POST in priority order:
    /// tool-output turns complete immediately; worker prompts get a
    /// `report_agent_job_result` call (the first one with `stop: true`);
    /// the first remaining turn gets the `spawn_agents_on_csv` call;
    /// anything later completes empty.
    fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
        let payload: Value =
            serde_json::from_slice(&decode_body_bytes(request)).unwrap_or(Value::Null);

        if has_function_call_output(&payload) {
            return sse_response(sse(vec![
                ev_response_created("resp-tool"),
                ev_completed("resp-tool"),
            ]));
        }

        if let Some((job_id, item_id)) = extract_job_and_item(&payload) {
            let call_index = self.worker_calls.fetch_add(1, Ordering::SeqCst);
            let args = json!({
                "job_id": job_id,
                "item_id": item_id,
                "result": { "item_id": item_id },
                // Only the first worker asks the job to stop.
                "stop": call_index == 0,
            });
            let args_json = serde_json::to_string(&args)
                .unwrap_or_else(|err| panic!("worker args serialize: {err}"));
            return sse_response(sse(vec![
                ev_response_created("resp-worker"),
                ev_function_call(
                    &format!("call-worker-{call_index}"),
                    "report_agent_job_result",
                    &args_json,
                ),
                ev_completed("resp-worker"),
            ]));
        }

        if self.seen_main.swap(true, Ordering::SeqCst) {
            sse_response(sse(vec![
                ev_response_created("resp-default"),
                ev_completed("resp-default"),
            ]))
        } else {
            sse_response(sse(vec![
                ev_response_created("resp-main"),
                ev_function_call("call-spawn", "spawn_agents_on_csv", &self.spawn_args_json),
                ev_completed("resp-main"),
            ]))
        }
    }
}
impl Respond for AgentJobsResponder {
    /// Dispatches each mocked `/responses` POST in priority order:
    /// tool-output turns complete immediately; worker prompts get a
    /// `report_agent_job_result` call; the first remaining turn gets the
    /// `spawn_agents_on_csv` call; anything later completes empty.
    fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
        let payload: Value =
            serde_json::from_slice(&decode_body_bytes(request)).unwrap_or(Value::Null);

        if has_function_call_output(&payload) {
            return sse_response(sse(vec![
                ev_response_created("resp-tool"),
                ev_completed("resp-tool"),
            ]));
        }

        if let Some((job_id, item_id)) = extract_job_and_item(&payload) {
            // Each worker turn gets a fresh, unique call id.
            let call_id = format!(
                "call-worker-{}",
                self.call_counter.fetch_add(1, Ordering::SeqCst)
            );
            let args = json!({
                "job_id": job_id,
                "item_id": item_id,
                "result": { "item_id": item_id }
            });
            let args_json = serde_json::to_string(&args)
                .unwrap_or_else(|err| panic!("worker args serialize: {err}"));
            return sse_response(sse(vec![
                ev_response_created("resp-worker"),
                ev_function_call(&call_id, "report_agent_job_result", &args_json),
                ev_completed("resp-worker"),
            ]));
        }

        if self.seen_main.swap(true, Ordering::SeqCst) {
            sse_response(sse(vec![
                ev_response_created("resp-default"),
                ev_completed("resp-default"),
            ]))
        } else {
            sse_response(sse(vec![
                ev_response_created("resp-main"),
                ev_function_call("call-spawn", "spawn_agents_on_csv", &self.spawn_args_json),
                ev_completed("resp-main"),
            ]))
        }
    }
}
/// Returns the request body, transparently zstd-decompressing it when the
/// `content-encoding` header lists `zstd`. Any header or decode failure
/// falls back to the raw bytes (best-effort, test-only helper).
fn decode_body_bytes(request: &wiremock::Request) -> Vec<u8> {
    let is_zstd = request
        .headers
        .get("content-encoding")
        .and_then(|value| value.to_str().ok())
        .is_some_and(|encoding| {
            encoding
                .split(',')
                .any(|entry| entry.trim().eq_ignore_ascii_case("zstd"))
        });
    if !is_zstd {
        return request.body.clone();
    }
    zstd::stream::decode_all(std::io::Cursor::new(&request.body))
        .unwrap_or_else(|_| request.body.clone())
}
/// True when any entry of the request's `input` array is a
/// `function_call_output` item (i.e. this turn is delivering a tool result).
fn has_function_call_output(body: &Value) -> bool {
    match body.get("input").and_then(Value::as_array) {
        Some(items) => items
            .iter()
            .filter_map(|item| item.get("type").and_then(Value::as_str))
            .any(|kind| kind == "function_call_output"),
        None => false,
    }
}
/// Detects a worker prompt and extracts its `(job_id, item_id)` pair.
///
/// Worker prompts carry the identifiers in either the message input texts or
/// the turn `instructions`, so both are concatenated before matching.
/// Returns `None` for any turn that is not a worker prompt.
fn extract_job_and_item(body: &Value) -> Option<(String, String)> {
    let mut haystack = message_input_texts(body).join("\n");
    if let Some(instructions) = body.get("instructions").and_then(Value::as_str) {
        haystack.push('\n');
        haystack.push_str(instructions);
    }
    if !haystack.contains("You are processing one item for a generic agent job.") {
        return None;
    }
    // Pulls the first capture group of `pattern` out of the combined text,
    // trimmed of surrounding whitespace.
    let capture = |pattern: &str| -> Option<String> {
        Regex::new(pattern)
            .ok()?
            .captures(&haystack)
            .and_then(|caps| caps.get(1))
            .map(|m| m.as_str().trim().to_string())
    };
    Some((
        capture(r"Job ID:\s*([^\n]+)")?,
        capture(r"Item ID:\s*([^\n]+)")?,
    ))
}
/// Collects every `input_text` span from every `message` item in the
/// request's `input` array, in order. Missing or malformed structure yields
/// an empty vector rather than an error.
fn message_input_texts(body: &Value) -> Vec<String> {
    let mut texts = Vec::new();
    let Some(items) = body.get("input").and_then(Value::as_array) else {
        return texts;
    };
    for item in items {
        if item.get("type").and_then(Value::as_str) != Some("message") {
            continue;
        }
        let Some(content) = item.get("content").and_then(Value::as_array) else {
            continue;
        };
        for span in content {
            if span.get("type").and_then(Value::as_str) != Some("input_text") {
                continue;
            }
            if let Some(text) = span.get("text").and_then(Value::as_str) {
                texts.push(text.to_string());
            }
        }
    }
    texts
}
/// Naive CSV field splitter for test fixtures: splits on every comma and
/// deliberately ignores quoting/escaping (the fixtures never use them).
fn parse_simple_csv_line(line: &str) -> Vec<String> {
    line.split(',').map(|field| field.to_owned()).collect()
}
/// A result reported against an item from a thread id other than the one
/// recorded for that item must be rejected by the state DB.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn report_agent_job_result_rejects_wrong_thread() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::Collab);
        config.features.enable(Feature::Sqlite);
    });
    let test = builder.build(&server).await?;
    let input_path = test.cwd_path().join("agent_jobs_wrong_thread.csv");
    let output_path = test.cwd_path().join("agent_jobs_wrong_thread_out.csv");
    // Single-row CSV so the job creates exactly one item.
    fs::write(&input_path, "path\nfile-1\n")?;
    let args = json!({
        "csv_path": input_path.display().to_string(),
        "instruction": "Return {path}",
        "output_csv_path": output_path.display().to_string(),
    });
    let args_json = serde_json::to_string(&args)?;
    let responder = AgentJobsResponder::new(args_json);
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    // The mocked model spawns the job and reports the item; by the time the
    // turn returns, the output CSV is expected to exist (auto-export).
    test.submit_turn("run job").await?;
    let db = test.codex.state_db().expect("state db");
    let output = fs::read_to_string(&output_path)?;
    let rows: Vec<&str> = output.lines().skip(1).collect();
    assert_eq!(rows.len(), 1);
    // Recover the job id from the exported row: the only 36-char field is
    // assumed to be the job UUID — fragile heuristic, test-only.
    let job_id = rows
        .first()
        .and_then(|line| {
            parse_simple_csv_line(line)
                .iter()
                .find(|value| value.len() == 36)
                .cloned()
        })
        .expect("job_id from csv");
    let job = db.get_agent_job(job_id.as_str()).await?.expect("job");
    let items = db
        .list_agent_job_items(job.id.as_str(), None, Some(10))
        .await?;
    let item = items.first().expect("item");
    // Re-report the item's result from a bogus thread id: the DB must refuse
    // to accept/overwrite it.
    let wrong_thread_id = "00000000-0000-0000-0000-000000000000";
    let accepted = db
        .report_agent_job_item_result(
            job.id.as_str(),
            item.item_id.as_str(),
            wrong_thread_id,
            &json!({ "wrong": true }),
        )
        .await?;
    assert!(!accepted);
    Ok(())
}
/// Happy path: a two-item CSV job runs to completion in a single turn and
/// auto-exports an output CSV containing the per-item result columns.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_agents_on_csv_runs_and_exports() -> Result<()> {
    let server = start_mock_server().await;
    let mut harness_builder = test_codex().with_config(|config| {
        config.features.enable(Feature::Collab);
        config.features.enable(Feature::Sqlite);
    });
    let harness = harness_builder.build(&server).await?;

    let input_path = harness.cwd_path().join("agent_jobs_input.csv");
    let output_path = harness.cwd_path().join("agent_jobs_output.csv");
    fs::write(&input_path, "path,area\nfile-1,test\nfile-2,test\n")?;

    let spawn_args = serde_json::to_string(&json!({
        "csv_path": input_path.display().to_string(),
        "instruction": "Return {path}",
        "output_csv_path": output_path.display().to_string(),
    }))?;
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(AgentJobsResponder::new(spawn_args))
        .mount(&server)
        .await;

    harness.submit_turn("run batch job").await?;

    // The exported CSV must carry the result columns, including the quoted
    // item_id key embedded in the JSON result payload.
    let exported = fs::read_to_string(&output_path)?;
    for expected in ["result_json", "item_id", "\"item_id\""] {
        assert!(exported.contains(expected));
    }
    Ok(())
}
/// Two CSV rows share the id-column value "foo": the job must disambiguate
/// the second item as "foo-2" instead of dropping or overwriting it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_agents_on_csv_dedupes_item_ids() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::Collab);
        config.features.enable(Feature::Sqlite);
    });
    let test = builder.build(&server).await?;
    let input_path = test.cwd_path().join("agent_jobs_dupe.csv");
    let output_path = test.cwd_path().join("agent_jobs_dupe_out.csv");
    fs::write(&input_path, "id,path\nfoo,alpha\nfoo,beta\n")?;
    let args_json = serde_json::to_string(&json!({
        "csv_path": input_path.display().to_string(),
        "instruction": "Return {path}",
        "id_column": "id",
        "output_csv_path": output_path.display().to_string(),
    }))?;
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(AgentJobsResponder::new(args_json))
        .mount(&server)
        .await;
    test.submit_turn("run batch job with duplicate ids").await?;

    let output = fs::read_to_string(&output_path)?;
    let mut lines = output.lines();
    // Locate the item_id column from the header row.
    let header_cols = parse_simple_csv_line(lines.next().expect("csv headers"));
    let item_id_index = header_cols
        .iter()
        .position(|header| header == "item_id")
        .expect("item_id column");
    // Collect the item_id column, then sort+dedup to prove uniqueness.
    let mut item_ids: Vec<String> = lines
        .map(|line| parse_simple_csv_line(line)[item_id_index].clone())
        .collect();
    item_ids.sort();
    item_ids.dedup();
    assert_eq!(item_ids.len(), 2);
    assert!(item_ids.contains(&"foo".to_string()));
    assert!(item_ids.contains(&"foo-2".to_string()));
    Ok(())
}
/// A worker setting `stop: true` cancels the job: the remaining pending
/// items never run, yet every row still appears in the exported CSV.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_agents_on_csv_stop_halts_future_items() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::Collab);
        config.features.enable(Feature::Sqlite);
    });
    let test = builder.build(&server).await?;
    let input_path = test.cwd_path().join("agent_jobs_stop.csv");
    let output_path = test.cwd_path().join("agent_jobs_stop_out.csv");
    fs::write(&input_path, "path\nfile-1\nfile-2\nfile-3\n")?;
    let args = json!({
        "csv_path": input_path.display().to_string(),
        "instruction": "Return {path}",
        "output_csv_path": output_path.display().to_string(),
        // Serial execution makes the stop deterministic: item 1 finishes
        // (and requests the stop) before items 2 and 3 can start.
        "max_concurrency": 1,
    });
    let args_json = serde_json::to_string(&args)?;
    let worker_calls = Arc::new(AtomicUsize::new(0));
    let responder = StopAfterFirstResponder::new(args_json, worker_calls.clone());
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    test.submit_turn("run job").await?;
    // Export still contains all three items (header + 3 rows) even though
    // only the first one completed.
    let output = fs::read_to_string(&output_path)?;
    let rows: Vec<&str> = output.lines().skip(1).collect();
    assert_eq!(rows.len(), 3);
    // Recover the job id from the exported row: the only 36-char field is
    // assumed to be the job UUID — fragile heuristic, test-only.
    let job_id = rows
        .first()
        .and_then(|line| {
            parse_simple_csv_line(line)
                .iter()
                .find(|value| value.len() == 36)
                .cloned()
        })
        .expect("job_id from csv");
    let db = test.codex.state_db().expect("state db");
    let job = db.get_agent_job(job_id.as_str()).await?.expect("job");
    assert_eq!(job.status, codex_state::AgentJobStatus::Cancelled);
    // One completed, none failed/running, and the other two never started.
    let progress = db.get_agent_job_progress(job_id.as_str()).await?;
    assert_eq!(progress.total_items, 3);
    assert_eq!(progress.completed_items, 1);
    assert_eq!(progress.failed_items, 0);
    assert_eq!(progress.running_items, 0);
    assert_eq!(progress.pending_items, 2);
    // Exactly one worker turn was served by the mock responder.
    assert_eq!(worker_calls.load(Ordering::SeqCst), 1);
    Ok(())
}

View File

@@ -56,6 +56,7 @@ pub static CODEX_ALIASES_TEMP_DIR: TestCodexAliasesGuard = unsafe {
#[cfg(not(target_os = "windows"))]
mod abort_tasks;
mod agent_jobs;
mod agent_websocket;
mod apply_patch_cli;
#[cfg(not(target_os = "windows"))]

View File

@@ -52,10 +52,16 @@ async fn wait_for_snapshot(codex_home: &Path) -> Result<PathBuf> {
let snapshot_dir = codex_home.join("shell_snapshots");
let deadline = Instant::now() + Duration::from_secs(5);
loop {
if let Ok(mut entries) = fs::read_dir(&snapshot_dir).await
&& let Some(entry) = entries.next_entry().await?
{
return Ok(entry.path());
if let Ok(mut entries) = fs::read_dir(&snapshot_dir).await {
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
let Some(extension) = path.extension().and_then(|ext| ext.to_str()) else {
continue;
};
if extension == "sh" || extension == "ps1" {
return Ok(path);
}
}
}
if Instant::now() >= deadline {

View File

@@ -33,7 +33,7 @@ async fn new_thread_is_recorded_in_state_db() -> Result<()> {
let thread_id = test.session_configured.session_id;
let rollout_path = test.codex.rollout_path().expect("rollout path");
let db_path = codex_state::state_db_path(test.config.codex_home.as_path());
let db_path = codex_state::state_db_path(test.config.sqlite_home.as_path());
for _ in 0..100 {
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
@@ -161,7 +161,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
let test = builder.build(&server).await?;
let db_path = codex_state::state_db_path(test.config.codex_home.as_path());
let db_path = codex_state::state_db_path(test.config.sqlite_home.as_path());
let rollout_path = test.config.codex_home.join(&rollout_rel_path);
let default_provider = test.config.model_provider_id.clone();
@@ -220,7 +220,7 @@ async fn user_messages_persist_in_state_db() -> Result<()> {
});
let test = builder.build(&server).await?;
let db_path = codex_state::state_db_path(test.config.codex_home.as_path());
let db_path = codex_state::state_db_path(test.config.sqlite_home.as_path());
for _ in 0..100 {
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
break;