adding a test and renaming jobs to tasks

This commit is contained in:
pap
2025-07-22 16:06:21 -07:00
parent bb0befc8db
commit ed9de08465
8 changed files with 262 additions and 151 deletions

View File

@@ -1,6 +1,7 @@
use std::fs::File;
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::io::Write; // added for write_all / flush
use anyhow::Context;
use codex_common::ApprovalModeCliArg;
@@ -98,29 +99,41 @@ pub fn maybe_spawn_concurrent(
};
// Unique job id for this concurrent run (used for log file naming instead of slug).
let job_id = uuid::Uuid::new_v4().to_string();
let task_id = uuid::Uuid::new_v4().to_string();
// Prepare log file path early so we can write pre-spawn logs (e.g. worktree creation output) into it.
let log_dir = match codex_base_dir() {
Ok(base) => {
let d = base.join("log");
let _ = std::fs::create_dir_all(&d);
d
}
Err(_) => PathBuf::from("/tmp"),
};
let log_path = log_dir.join(format!("codex-logs-{}.log", task_id));
// If user did NOT specify an explicit cwd, create an isolated git worktree.
let mut created_worktree: Option<(PathBuf, String)> = None; // (path, branch)
let mut original_branch: Option<String> = None;
let mut original_commit: Option<String> = None;
let mut pre_spawn_logs = String::new();
if tui_cli.cwd.is_none() {
// Capture original branch & commit (best-effort).
original_branch = git_capture(["rev-parse", "--abbrev-ref", "HEAD"]).ok();
original_commit = git_capture(["rev-parse", "HEAD"]).ok();
// Use branch_name_effective for branch/worktree name.
match create_concurrent_worktree(&branch_name_effective) {
Ok(Some((worktree_path, branch_name))) => {
println!(
"Created git worktree at {} (branch {}) for concurrent run",
worktree_path.display(), branch_name
);
Ok(Some(info)) => {
exec_args.push("--cd".into());
exec_args.push(worktree_path.display().to_string());
created_worktree = Some((worktree_path, branch_name));
exec_args.push(info.worktree_path.display().to_string());
created_worktree = Some((info.worktree_path, info.branch_name.clone()));
// Keep the original git output plus a concise created line (for log file only).
pre_spawn_logs.push_str(&info.logs);
pre_spawn_logs.push_str(&format!(
"Created git worktree at {} (branch {}) for concurrent run\n",
created_worktree.as_ref().unwrap().0.display(), info.branch_name
));
}
Ok(None) => {
eprintln!("Warning: Not a git repository (skipping worktree creation); running in current directory.");
// Silence console noise: do not warn here to keep stdout clean; we still proceed.
}
Err(e) => {
eprintln!("Error: failed to create git worktree for --concurrent: {e}");
@@ -136,16 +149,20 @@ pub fn maybe_spawn_concurrent(
// Prompt (safe to unwrap due to earlier validation).
if let Some(prompt) = tui_cli.prompt.clone() { exec_args.push(prompt); }
// Prepare log file path using stable job id (UUID) rather than prompt slug.
let log_dir = match codex_base_dir() {
Ok(base) => {
let d = base.join("log");
let _ = std::fs::create_dir_all(&d);
d
// Create (or truncate) the log file and write any pre-spawn logs we captured.
let file = match File::create(&log_path) {
Ok(mut f) => {
if !pre_spawn_logs.is_empty() {
let _ = f.write_all(pre_spawn_logs.as_bytes());
let _ = f.flush();
}
f
}
Err(e) => {
eprintln!("Failed to create log file {}: {e}. Falling back to interactive mode.", log_path.display());
return Ok(false);
}
Err(_) => PathBuf::from("/tmp"),
};
let log_path = log_dir.join(format!("codex-logs-{}.log", job_id));
match File::create(&log_path) {
Ok(file) => {
@@ -164,26 +181,18 @@ pub fn maybe_spawn_concurrent(
if let Some(oc) = &original_commit { cmd.env("CODEX_ORIGINAL_COMMIT", oc); }
if let Ok(orig_root) = std::env::current_dir() { cmd.env("CODEX_ORIGINAL_ROOT", orig_root); }
}
// Provide job id so child process can emit token_count updates to tasks.jsonl.
cmd.env("CODEX_JOB_ID", &job_id);
// Provide task id so child process can emit token_count updates to tasks.jsonl.
cmd.env("CODEX_TASK_ID", &task_id);
cmd.stdout(Stdio::from(file));
if let Some(f2) = file_err { cmd.stderr(Stdio::from(f2)); }
match cmd.spawn() {
Ok(child) => {
if let Some((wt_path, wt_branch)) = &created_worktree {
println!(
"Background Codex exec started in worktree. PID={} job_id={} log={} worktree={} branch={} original_branch={} automerge={}",
child.id(), job_id, log_path.display(), wt_path.display(), wt_branch,
original_branch.as_deref().unwrap_or("?"), effective_automerge
);
} else {
println!(
"Background Codex exec started. PID={} job_id={} log={} automerge={}",
child.id(), job_id, log_path.display(), effective_automerge
);
}
// Previous verbose status lines replaced with a single short id output for scripting.
let _ = child;
let short_task_id = &task_id[..8];
println!("{}", short_task_id);
// Record job metadata to CODEX_HOME/jobs.jsonl (JSON Lines file).
// Record task metadata to CODEX_HOME/tasks.jsonl (JSON Lines file).
let record_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
@@ -191,7 +200,7 @@ pub fn maybe_spawn_concurrent(
if let Ok(base) = codex_base_dir() {
let tasks_path = base.join("tasks.jsonl");
let record = serde_json::json!({
"job_id": job_id,
"task_id": task_id,
"pid": child.id(),
"worktree": created_worktree.as_ref().map(|(p, _)| p.display().to_string()),
"branch": created_worktree.as_ref().map(|(_, b)| b.clone()),
@@ -246,9 +255,9 @@ fn codex_base_dir() -> anyhow::Result<PathBuf> {
Ok(base)
}
/// Attempt to create a git worktree for an isolated concurrent run.
/// Returns Ok(Some((worktree_path, branch_name))) on success, Ok(None) if not a git repo, and Err on failure.
fn create_concurrent_worktree(branch_name: &str) -> anyhow::Result<Option<(PathBuf, String)>> {
/// Attempt to create a git worktree for an isolated concurrent run capturing git output.
struct WorktreeInfo { worktree_path: PathBuf, branch_name: String, logs: String }
fn create_concurrent_worktree(branch_name: &str) -> anyhow::Result<Option<WorktreeInfo>> {
// Determine repository root.
let output = Command::new("git").arg("rev-parse").arg("--show-toplevel").output();
let repo_root = match output {
@@ -285,16 +294,15 @@ fn create_concurrent_worktree(branch_name: &str) -> anyhow::Result<Option<(PathB
std::fs::create_dir_all(&base_dir)?;
let mut worktree_path = base_dir.join(branch_name.replace('/', "-"));
// Ensure uniqueness if path already exists.
if worktree_path.exists() {
for i in 1..1000 { // arbitrary cap
for i in 1..1000 {
let candidate = base_dir.join(format!("{}-{}", branch_name.replace('/', "-"), i));
if !candidate.exists() { worktree_path = candidate; break; }
}
}
// Run: git worktree add -b <branch_name> <path> HEAD
let status = Command::new("git")
// Run git worktree add capturing output (stdout+stderr).
let add_out = Command::new("git")
.current_dir(&repo_root)
.arg("worktree")
.arg("add")
@@ -302,13 +310,15 @@ fn create_concurrent_worktree(branch_name: &str) -> anyhow::Result<Option<(PathB
.arg(&branch_name)
.arg(&worktree_path)
.arg("HEAD")
.status()?;
if !status.success() {
anyhow::bail!("git worktree add failed with status {status}");
.output()?;
if !add_out.status.success() {
anyhow::bail!("git worktree add failed with status {}", add_out.status);
}
let mut logs = String::new();
if !add_out.stdout.is_empty() { logs.push_str(&String::from_utf8_lossy(&add_out.stdout)); }
if !add_out.stderr.is_empty() { logs.push_str(&String::from_utf8_lossy(&add_out.stderr)); }
Ok(Some((worktree_path, branch_name.to_string())))
Ok(Some(WorktreeInfo { worktree_path, branch_name: branch_name.to_string(), logs }))
}
/// Helper: capture trimmed stdout of a git command.

View File

@@ -7,7 +7,7 @@ use std::fs;
#[derive(Debug, Parser)]
pub struct InspectCli {
/// Job identifier (full/short job id or exact branch name)
/// Task identifier (full/short task id or exact branch name)
pub id: String,
/// Output JSON instead of human table
#[arg(long)]
@@ -16,7 +16,7 @@ pub struct InspectCli {
#[derive(Debug, Deserialize)]
struct RawRecord {
job_id: Option<String>,
task_id: Option<String>,
pid: Option<u64>,
worktree: Option<String>,
branch: Option<String>,
@@ -36,8 +36,8 @@ struct RawRecord {
}
#[derive(Debug, serde::Serialize, Default, Clone)]
struct JobFull {
job_id: String,
struct TaskFull {
task_id: String,
pid: Option<u64>,
branch: Option<String>,
worktree: Option<String>,
@@ -61,25 +61,25 @@ struct JobFull {
pub fn run_inspect(cli: InspectCli) -> anyhow::Result<()> {
let id = cli.id.to_lowercase();
let jobs = load_job_records()?;
let matches: Vec<JobFull> = jobs
let tasks = load_task_records()?;
let matches: Vec<TaskFull> = tasks
.into_iter()
.filter(|j| j.job_id.starts_with(&id) || j.branch.as_deref().map(|b| b == id).unwrap_or(false))
.filter(|t| t.task_id.starts_with(&id) || t.branch.as_deref().map(|b| b == id).unwrap_or(false))
.collect();
if matches.is_empty() {
eprintln!("No job matches identifier '{}'.", id);
eprintln!("No task matches identifier '{}'.", id);
return Ok(());
}
if matches.len() > 1 {
eprintln!("Identifier '{}' is ambiguous; matches: {}", id, matches.iter().map(|m| &m.job_id[..8]).collect::<Vec<_>>().join(", "));
eprintln!("Identifier '{}' is ambiguous; matches: {}", id, matches.iter().map(|m| &m.task_id[..8]).collect::<Vec<_>>().join(", "));
return Ok(());
}
let job = &matches[0];
let task = &matches[0];
if cli.json {
println!("{}", serde_json::to_string_pretty(job)?);
println!("{}", serde_json::to_string_pretty(task)?);
return Ok(());
}
print_human(job);
print_human(task);
Ok(())
}
@@ -89,8 +89,8 @@ fn base_dir() -> Option<PathBuf> {
Some(PathBuf::from(home).join(".codex"))
}
fn load_job_records() -> anyhow::Result<Vec<JobFull>> {
let mut map: std::collections::HashMap<String, JobFull> = std::collections::HashMap::new();
fn load_task_records() -> anyhow::Result<Vec<TaskFull>> {
let mut map: std::collections::HashMap<String, TaskFull> = std::collections::HashMap::new();
let Some(base) = base_dir() else { return Ok(vec![]); };
let tasks = base.join("tasks.jsonl");
if !tasks.exists() { return Ok(vec![]); }
@@ -101,8 +101,8 @@ fn load_job_records() -> anyhow::Result<Vec<JobFull>> {
if line.trim().is_empty() { continue; }
let Ok(val) = serde_json::from_str::<serde_json::Value>(&line) else { continue };
let Ok(rec) = serde_json::from_value::<RawRecord>(val) else { continue };
let Some(job_id) = rec.job_id.clone() else { continue };
let entry = map.entry(job_id.clone()).or_insert_with(|| JobFull { job_id: job_id.clone(), ..Default::default() });
let Some(task_id) = rec.task_id.clone() else { continue };
let entry = map.entry(task_id.clone()).or_insert_with(|| TaskFull { task_id: task_id.clone(), ..Default::default() });
// Initial metadata fields
if rec.start_time.is_some() {
entry.pid = rec.pid.or(entry.pid);
@@ -130,32 +130,32 @@ fn load_job_records() -> anyhow::Result<Vec<JobFull>> {
}
}
// Compute duration
for j in map.values_mut() {
if let (Some(s), Some(e)) = (j.start_time, j.end_time) { j.duration_secs = Some(e.saturating_sub(s)); }
for t in map.values_mut() {
if let (Some(s), Some(e)) = (t.start_time, t.end_time) { t.duration_secs = Some(e.saturating_sub(s)); }
}
Ok(map.into_values().collect())
}
fn print_human(job: &JobFull) {
println!("Job {}", job.job_id);
println!("State: {}", job.state.as_deref().unwrap_or("?"));
if let Some(model) = &job.model { println!("Model: {}", model); } else { println!("Model: {}", resolve_default_model()); }
if let Some(branch) = &job.branch { println!("Branch: {}", branch); }
if let Some(wt) = &job.worktree { println!("Worktree: {}", wt); }
if let Some(ob) = &job.original_branch { println!("Original branch: {}", ob); }
if let Some(oc) = &job.original_commit { println!("Original commit: {}", oc); }
if let Some(start) = job.start_time { println!("Start: {}", format_epoch(start)); }
if let Some(end) = job.end_time { println!("End: {}", format_epoch(end)); }
if let Some(d) = job.duration_secs { println!("Duration: {}s", d); }
if let Some(pid) = job.pid { println!("PID: {}", pid); }
if let Some(log) = &job.log_path { println!("Log: {}", log); }
if let Some(am) = job.automerge { println!("Automerge: {}", am); }
if let Some(exp) = &job.explicit_branch_name { println!("Explicit branch name: {}", exp); }
if let Some(total) = job.total_tokens { println!("Total tokens: {}", total); }
if job.input_tokens.is_some() || job.output_tokens.is_some() {
println!(" Input: {:?} Output: {:?} Reasoning: {:?}", job.input_tokens, job.output_tokens, job.reasoning_output_tokens);
fn print_human(task: &TaskFull) {
println!("Task {}", task.task_id);
println!("State: {}", task.state.as_deref().unwrap_or("?"));
if let Some(model) = &task.model { println!("Model: {}", model); } else { println!("Model: {}", resolve_default_model()); }
if let Some(branch) = &task.branch { println!("Branch: {}", branch); }
if let Some(wt) = &task.worktree { println!("Worktree: {}", wt); }
if let Some(ob) = &task.original_branch { println!("Original branch: {}", ob); }
if let Some(oc) = &task.original_commit { println!("Original commit: {}", oc); }
if let Some(start) = task.start_time { println!("Start: {}", format_epoch(start)); }
if let Some(end) = task.end_time { println!("End: {}", format_epoch(end)); }
if let Some(d) = task.duration_secs { println!("Duration: {}s", d); }
if let Some(pid) = task.pid { println!("PID: {}", pid); }
if let Some(log) = &task.log_path { println!("Log: {}", log); }
if let Some(am) = task.automerge { println!("Automerge: {}", am); }
if let Some(exp) = &task.explicit_branch_name { println!("Explicit branch name: {}", exp); }
if let Some(total) = task.total_tokens { println!("Total tokens: {}", total); }
if task.input_tokens.is_some() || task.output_tokens.is_some() {
println!(" Input: {:?} Output: {:?} Reasoning: {:?}", task.input_tokens, task.output_tokens, task.reasoning_output_tokens);
}
if let Some(p) = &job.prompt { println!("Prompt:\n{}", p); }
if let Some(p) = &task.prompt { println!("Prompt:\n{}", p); }
}
fn format_epoch(secs: u64) -> String {

View File

@@ -3,7 +3,7 @@ pub mod debug_sandbox;
mod exit_status;
pub mod login;
pub mod proto;
pub mod jobs;
pub mod tasks;
pub mod logs;
pub mod inspect;

View File

@@ -9,7 +9,7 @@ use std::time::Duration;
#[derive(Debug, Parser)]
pub struct LogsCli {
/// Job identifier: full/short job UUID or branch name
/// Task identifier: full/short task UUID or branch name
pub id: String,
/// Follow log output (stream new lines)
#[arg(short = 'f', long = "follow")]
@@ -21,15 +21,15 @@ pub struct LogsCli {
#[derive(Debug, Deserialize)]
struct RawRecord {
job_id: Option<String>,
task_id: Option<String>,
branch: Option<String>,
log_path: Option<String>,
start_time: Option<u64>,
}
#[derive(Debug, Clone)]
struct JobMeta {
job_id: String,
struct TaskMeta {
task_id: String,
branch: Option<String>,
log_path: String,
start_time: Option<u64>,
@@ -37,27 +37,27 @@ struct JobMeta {
pub fn run_logs(cli: LogsCli) -> anyhow::Result<()> {
let id = cli.id.to_lowercase();
let jobs = load_jobs_index()?;
if jobs.is_empty() {
eprintln!("No jobs found in tasks.jsonl");
let tasks = load_tasks_index()?;
if tasks.is_empty() {
eprintln!("No tasks found in tasks.jsonl");
return Ok(());
}
let matches: Vec<&JobMeta> = jobs
let matches: Vec<&TaskMeta> = tasks
.values()
.filter(|meta| {
meta.job_id.starts_with(&id) || meta.branch.as_deref().map(|b| b == id).unwrap_or(false)
meta.task_id.starts_with(&id) || meta.branch.as_deref().map(|b| b == id).unwrap_or(false)
})
.collect();
if matches.is_empty() {
eprintln!("No job matches identifier '{}'.", id);
eprintln!("No task matches identifier '{}'.", id);
return Ok(());
}
if matches.len() > 1 {
eprintln!("Identifier '{}' is ambiguous; matches: {}", id, matches.iter().map(|m| &m.job_id[..8]).collect::<Vec<_>>().join(", "));
eprintln!("Identifier '{}' is ambiguous; matches: {}", id, matches.iter().map(|m| &m.task_id[..8]).collect::<Vec<_>>().join(", "));
return Ok(());
}
let job = matches[0];
let path = PathBuf::from(&job.log_path);
let task = matches[0];
let path = PathBuf::from(&task.log_path);
if !path.exists() {
eprintln!("Log file not found at {}", path.display());
return Ok(());
@@ -77,8 +77,8 @@ fn base_dir() -> Option<PathBuf> {
Some(PathBuf::from(home).join(".codex"))
}
fn load_jobs_index() -> anyhow::Result<HashMap<String, JobMeta>> {
let mut map: HashMap<String, JobMeta> = HashMap::new();
fn load_tasks_index() -> anyhow::Result<HashMap<String, TaskMeta>> {
let mut map: HashMap<String, TaskMeta> = HashMap::new();
let Some(base) = base_dir() else { return Ok(map); };
let tasks = base.join("tasks.jsonl");
if !tasks.exists() { return Ok(map); }
@@ -89,10 +89,10 @@ fn load_jobs_index() -> anyhow::Result<HashMap<String, JobMeta>> {
if line.trim().is_empty() { continue; }
let Ok(val) = serde_json::from_str::<serde_json::Value>(&line) else { continue };
let Ok(rec) = serde_json::from_value::<RawRecord>(val) else { continue };
let (Some(job_id), Some(log_path)) = (rec.job_id.clone(), rec.log_path.clone()) else { continue };
let (Some(task_id), Some(log_path)) = (rec.task_id.clone(), rec.log_path.clone()) else { continue };
// Insert or update only if not already present (we just need initial metadata)
map.entry(job_id.clone()).or_insert(JobMeta {
job_id,
map.entry(task_id.clone()).or_insert(TaskMeta {
task_id,
branch: rec.branch,
log_path,
start_time: rec.start_time,

View File

@@ -78,13 +78,13 @@ enum Subcommand {
#[clap(visible_alias = "a")]
Apply(ApplyCommand),
/// Manage / inspect concurrent background jobs.
Jobs(codex_cli::jobs::JobsCli),
/// Manage / inspect concurrent background tasks.
Tasks(codex_cli::tasks::TasksCli),
/// Show or follow logs for a specific job.
/// Show or follow logs for a specific task.
Logs(codex_cli::logs::LogsCli),
/// Inspect full metadata for a job.
/// Inspect full metadata for a task.
Inspect(codex_cli::inspect::InspectCli),
}
@@ -185,8 +185,8 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
prepend_config_flags(&mut apply_cli.config_overrides, cli.config_overrides);
run_apply_command(apply_cli).await?;
}
Some(Subcommand::Jobs(jobs_cli)) => {
codex_cli::jobs::run_jobs(jobs_cli)?;
Some(Subcommand::Tasks(tasks_cli)) => {
codex_cli::tasks::run_tasks(tasks_cli)?;
}
Some(Subcommand::Logs(logs_cli)) => {
codex_cli::logs::run_logs(logs_cli)?;

View File

@@ -6,26 +6,26 @@ use std::io::{BufRead, BufReader};
use std::fs;
#[derive(Debug, Parser)]
pub struct JobsCli {
pub struct TasksCli {
#[command(subcommand)]
pub cmd: JobsCommand,
pub cmd: TasksCommand,
}
#[derive(Debug, Subcommand)]
pub enum JobsCommand {
/// List background concurrent jobs (from ~/.codex/tasks.jsonl)
Ls(JobsListArgs),
pub enum TasksCommand {
/// List background concurrent tasks (from ~/.codex/tasks.jsonl)
Ls(TasksListArgs),
}
#[derive(Debug, Parser)]
pub struct JobsListArgs {
pub struct TasksListArgs {
/// Output raw JSON instead of table
#[arg(long)]
pub json: bool,
/// Limit number of jobs displayed (most recent first)
/// Limit number of tasks displayed (most recent first)
#[arg(long)]
pub limit: Option<usize>,
/// Show completed jobs as well (by default only running jobs)
/// Show completed tasks as well (by default only running tasks)
#[arg(short = 'a', long = "all")]
pub all: bool,
/// Show all columns including prompt text
@@ -35,7 +35,7 @@ pub struct JobsListArgs {
#[derive(Debug, Deserialize)]
struct RawRecord {
job_id: Option<String>,
task_id: Option<String>,
pid: Option<u64>,
worktree: Option<String>,
branch: Option<String>,
@@ -53,8 +53,8 @@ struct RawRecord {
}
#[derive(Debug, Serialize, Default, Clone)]
struct JobAggregate {
job_id: String,
struct TaskAggregate {
task_id: String,
pid: Option<u64>,
branch: Option<String>,
worktree: Option<String>,
@@ -67,9 +67,9 @@ struct JobAggregate {
end_time: Option<u64>,
}
pub fn run_jobs(cmd: JobsCli) -> anyhow::Result<()> {
pub fn run_tasks(cmd: TasksCli) -> anyhow::Result<()> {
match cmd.cmd {
JobsCommand::Ls(args) => list_jobs(args),
TasksCommand::Ls(args) => list_tasks(args),
}
}
@@ -80,28 +80,28 @@ fn base_dir() -> Option<std::path::PathBuf> {
Some(base)
}
fn list_jobs(args: JobsListArgs) -> anyhow::Result<()> {
fn list_tasks(args: TasksListArgs) -> anyhow::Result<()> {
let Some(base) = base_dir() else {
println!("No home directory found; cannot locate tasks.jsonl");
return Ok(());
};
let path = base.join("tasks.jsonl");
if !path.exists() {
println!("No tasks.jsonl found (no concurrent jobs recorded yet)");
println!("No tasks.jsonl found (no concurrent tasks recorded yet)");
return Ok(());
}
let f = File::open(&path)?;
let reader = BufReader::new(f);
let mut agg: HashMap<String, JobAggregate> = HashMap::new();
let mut agg: HashMap<String, TaskAggregate> = HashMap::new();
for line_res in reader.lines() {
let line = match line_res { Ok(l) => l, Err(_) => continue };
if line.trim().is_empty() { continue; }
let raw: serde_json::Value = match serde_json::from_str(&line) { Ok(v) => v, Err(_) => continue };
let rec: RawRecord = match serde_json::from_value(raw) { Ok(r) => r, Err(_) => continue };
let Some(job_id) = rec.job_id.clone() else { continue }; // must have job_id
let entry = agg.entry(job_id.clone()).or_insert_with(|| JobAggregate { job_id: job_id.clone(), ..Default::default() });
let Some(task_id) = rec.task_id.clone() else { continue }; // must have task_id
let entry = agg.entry(task_id.clone()).or_insert_with(|| TaskAggregate { task_id: task_id.clone(), ..Default::default() });
if rec.start_time.is_some() { // initial metadata line
entry.pid = rec.pid.or(entry.pid);
entry.branch = rec.branch.or(entry.branch.clone());
@@ -119,48 +119,48 @@ fn list_jobs(args: JobsListArgs) -> anyhow::Result<()> {
}
// Collect and sort by start_time desc
let mut jobs: Vec<JobAggregate> = agg.into_values().collect();
jobs.sort_by_key(|j| std::cmp::Reverse(j.start_time.unwrap_or(0)));
let mut tasks: Vec<TaskAggregate> = agg.into_values().collect();
tasks.sort_by_key(|j| std::cmp::Reverse(j.start_time.unwrap_or(0)));
if !args.all { jobs.retain(|j| j.state.as_deref() != Some("done")); }
if let Some(limit) = args.limit { jobs.truncate(limit); }
if !args.all { tasks.retain(|j| j.state.as_deref() != Some("done")); }
if let Some(limit) = args.limit { tasks.truncate(limit); }
if args.json {
println!("{}", serde_json::to_string_pretty(&jobs)?);
println!("{}", serde_json::to_string_pretty(&tasks)?);
return Ok(());
}
if jobs.is_empty() {
println!("No jobs found");
if tasks.is_empty() {
println!("No tasks found");
return Ok(());
}
// Table header
if args.all_columns {
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12} {}", "JOB_ID", "PID", "BRANCH", "START", "STATE", "TOKENS", "MODEL", "PROMPT");
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12} {}", "TASK_ID", "PID", "BRANCH", "START", "STATE", "TOKENS", "MODEL", "PROMPT");
} else {
// Widened branch column to 22 chars for better readability.
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12}", "JOB_ID", "PID", "BRANCH", "START", "STATE", "TOKENS", "MODEL");
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12}", "TASK_ID", "PID", "BRANCH", "START", "STATE", "TOKENS", "MODEL");
}
for j in jobs {
let job_short = if j.job_id.len() > 8 { &j.job_id[..8] } else { &j.job_id };
let pid_str = j.pid.map(|p| p.to_string()).unwrap_or_default();
let mut branch = j.branch.clone().unwrap_or_default();
for t in tasks {
let task_short = if t.task_id.len() > 8 { &t.task_id[..8] } else { &t.task_id };
let pid_str = t.pid.map(|p| p.to_string()).unwrap_or_default();
let mut branch = t.branch.clone().unwrap_or_default();
let branch_limit = if args.all_columns { 22 } else { 22 }; // unified width
if branch.len() > branch_limit { branch.truncate(branch_limit); }
let start = j.start_time.map(format_epoch_short).unwrap_or_default();
let tokens = j.total_tokens.map(|t| t.to_string()).unwrap_or_default();
let state = j.state.clone().unwrap_or_else(|| "?".into());
let mut model = j.model.clone().unwrap_or_default();
let start = t.start_time.map(format_epoch_short).unwrap_or_default();
let tokens = t.total_tokens.map(|t| t.to_string()).unwrap_or_default();
let state = t.state.clone().unwrap_or_else(|| "?".into());
let mut model = t.model.clone().unwrap_or_default();
if model.trim().is_empty() { model = resolve_default_model(); }
if model.is_empty() { model.push('-'); }
if model.len() > 12 { model.truncate(12); }
if args.all_columns {
let mut prompt = j.prompt.clone().unwrap_or_default().replace('\n', " ");
let mut prompt = t.prompt.clone().unwrap_or_default().replace('\n', " ");
if prompt.len() > 60 { prompt.truncate(60); }
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12} {}", job_short, pid_str, branch, start, state, tokens, model, prompt);
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12} {}", task_short, pid_str, branch, start, state, tokens, model, prompt);
} else {
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12}", job_short, pid_str, branch, start, state, tokens, model);
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12}", task_short, pid_str, branch, start, state, tokens, model);
}
}

View File

@@ -0,0 +1,101 @@
// Minimal integration test for --concurrent background spawning.
// Verifies that invoking the top-level CLI with --concurrent records a task entry
// in CODEX_HOME/tasks.jsonl and that multiple invocations append distinct task_ids.
use std::fs;
use std::io::Write;
use std::process::Command;
use std::time::{Duration, Instant};
use tempfile::TempDir;
// Skip helper when sandbox network disabled (mirrors existing tests' behavior).
fn network_disabled() -> bool {
std::env::var(codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok()
}
#[test]
fn concurrent_creates_task_records() {
if network_disabled() {
eprintln!("Skipping concurrent_creates_task_records due to sandbox network-disabled env");
return;
}
// Temp home (CODEX_HOME) and separate temp git repo.
let home = TempDir::new().expect("temp home");
let repo = TempDir::new().expect("temp repo");
// Initialize a minimal git repository (needed for --concurrent worktree logic).
assert!(Command::new("git").arg("init").current_dir(repo.path()).status().unwrap().success());
fs::write(repo.path().join("README.md"), "# temp\n").unwrap();
assert!(Command::new("git").arg("add").arg(".").current_dir(repo.path()).status().unwrap().success());
assert!(Command::new("git")
.args(["commit", "-m", "init"]) // may warn about user/email; allow non-zero if commit already exists
.current_dir(repo.path())
.status()
.map(|s| s.success())
.unwrap_or(true));
// SSE fixture so the spawned background exec does not perform a real network call.
let fixture = home.path().join("fixture.sse");
let mut f = fs::File::create(&fixture).unwrap();
writeln!(f, "data: {{\"choices\":[{{\"delta\":{{\"content\":\"ok\"}}}}]}}\n").unwrap();
writeln!(f, "data: {{\"choices\":[{{\"delta\":{{}}}}]}}\n").unwrap();
writeln!(f, "data: [DONE]\n").unwrap();
// Helper to run one concurrent invocation with a given prompt.
let run_once = |prompt: &str| {
let mut cmd = Command::new("cargo");
cmd.arg("run")
.arg("-p")
.arg("codex-cli")
.arg("--quiet")
.arg("--")
.arg("--concurrent")
.arg("--full-auto")
.arg("-C")
.arg(repo.path())
.arg(prompt);
cmd.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local");
let output = cmd.output().expect("spawn codex");
assert!(output.status.success(), "concurrent codex run failed: stderr={}", String::from_utf8_lossy(&output.stderr));
};
run_once("Add a cat in ASCII");
run_once("Add hello world comment");
// Wait for tasks.jsonl to contain at least two lines with task records.
let tasks_path = home.path().join("tasks.jsonl");
let deadline = Instant::now() + Duration::from_secs(10);
let mut lines: Vec<String> = Vec::new();
while Instant::now() < deadline {
if tasks_path.exists() {
let content = fs::read_to_string(&tasks_path).unwrap_or_default();
lines = content.lines().filter(|l| !l.trim().is_empty()).map(|s| s.to_string()).collect();
if lines.len() >= 2 { break; }
}
std::thread::sleep(Duration::from_millis(100));
}
assert!(lines.len() >= 2, "Expected at least 2 task records, got {}", lines.len());
// Parse JSON and ensure distinct task_ids and prompts present.
let mut task_ids = std::collections::HashSet::new();
let mut saw_cat = false;
let mut saw_hello = false;
for line in &lines {
if let Ok(val) = serde_json::from_str::<serde_json::Value>(line) {
if let Some(tid) = val.get("task_id").and_then(|v| v.as_str()) { task_ids.insert(tid.to_string()); }
if let Some(p) = val.get("prompt").and_then(|v| v.as_str()) {
if p.contains("cat") { saw_cat = true; }
if p.contains("hello") { saw_hello = true; }
}
assert_eq!(val.get("state").and_then(|v| v.as_str()), Some("started"), "task record missing started state");
}
}
assert!(task_ids.len() >= 2, "Expected distinct task_ids, got {:?}", task_ids);
assert!(saw_cat, "Did not find cat prompt in tasks.jsonl");
assert!(saw_hello, "Did not find hello prompt in tasks.jsonl");
}

View File

@@ -192,7 +192,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
EventMsg::TaskComplete(_) => {
// On completion, append a final state entry with last token count snapshot.
if let Ok(job_id) = std::env::var("CODEX_JOB_ID") {
if let Ok(task_id) = std::env::var("CODEX_TASK_ID") {
if let Some(base) = codex_base_dir_for_logging() {
let tasks_path = base.join("tasks.jsonl");
let ts = std::time::SystemTime::now()
@@ -207,7 +207,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
"total_tokens": u.total_tokens,
}));
let mut obj = serde_json::json!({
"job_id": job_id,
"task_id": task_id,
"completion_time": ts,
"end_time": ts,
"state": "done",
@@ -220,7 +220,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::TokenCount(token_usage_full) => {
self.last_token_usage = Some(token_usage_full.clone());
ts_println!(self, "tokens used: {}", token_usage_full.total_tokens);
if let Ok(job_id) = std::env::var("CODEX_JOB_ID") {
if let Ok(task_id) = std::env::var("CODEX_TASK_ID") {
if let Some(base) = codex_base_dir_for_logging() {
let tasks_path = base.join("tasks.jsonl");
let ts = std::time::SystemTime::now()
@@ -228,7 +228,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
.map(|d| d.as_secs())
.unwrap_or(0);
let full = serde_json::json!({
"job_id": job_id,
"task_id": task_id,
"update_time": ts,
"token_count": {
"input_tokens": token_usage_full.input_tokens,