Add thread message delivery

This commit is contained in:
Eric Traut
2026-04-09 21:03:57 -07:00
parent a4d5112b37
commit 54f5a4b0fa
51 changed files with 5358 additions and 40 deletions

View File

@@ -17,6 +17,13 @@ use codex_cli::run_login_with_chatgpt;
use codex_cli::run_login_with_device_code;
use codex_cli::run_logout;
use codex_cloud_tasks::Cli as CloudTasksCli;
use codex_core::messages::MessagePayload;
use codex_core::messages::validate_meta_key;
use codex_core::timers::ThreadTimerStorageCreateParams;
use codex_core::timers::ThreadTimerTrigger;
use codex_core::timers::TimerDelivery;
use codex_core::timers::build_thread_timer_create_params;
use codex_core::timers::normalize_thread_timer_dtstart_input;
use codex_exec::Cli as ExecCli;
use codex_exec::Command as ExecCommand;
use codex_exec::ReviewArgs;
@@ -30,7 +37,9 @@ use codex_tui::ExitReason;
use codex_tui::UpdateAction;
use codex_utils_cli::CliConfigOverrides;
use owo_colors::OwoColorize;
use std::collections::BTreeMap;
use std::io::IsTerminal;
use std::path::Path;
use std::path::PathBuf;
use supports_color::Stream;
@@ -53,6 +62,7 @@ use codex_core::config::find_codex_home;
use codex_features::FEATURES;
use codex_features::Stage;
use codex_features::is_known_feature_key;
use codex_protocol::ThreadId;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::user_input::UserInput;
use codex_terminal_detection::TerminalName;
@@ -140,6 +150,9 @@ enum Subcommand {
/// Resume a previous interactive session (picker by default; use --last to continue the most recent).
Resume(ResumeCommand),
/// Queue a message to an existing thread.
Queue(QueueCommand),
/// Fork a previous interactive session (picker by default; use --last to fork the most recent).
Fork(ForkCommand),
@@ -217,6 +230,33 @@ struct DebugPromptInputCommand {
images: Vec<PathBuf>,
}
#[derive(Debug, Parser)]
struct QueueCommand {
/// Target thread id.
#[arg(long = "thread", value_name = "THREAD_ID")]
thread: String,
/// Message content.
#[arg(long = "content", value_name = "TEXT")]
content: String,
/// Optional instructions for immediate messages. Not allowed with --at.
#[arg(long = "instructions", value_name = "TEXT")]
instructions: Option<String>,
/// Message metadata as key=value. May be repeated.
#[arg(long = "meta", value_name = "KEY=VALUE")]
meta: Vec<String>,
/// Deliver by steering the current turn when possible.
#[arg(long = "steer")]
steer: bool,
/// Queue a one-shot timer for a local datetime or time, e.g. 2026-04-10T09:30:00 or 09:30.
#[arg(long = "at", value_name = "WHEN")]
at: Option<String>,
}
#[derive(Debug, Parser)]
struct ResumeCommand {
/// Conversation/session id (UUID) or thread name. UUIDs take precedence if it parses.
@@ -807,6 +847,14 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
.await?;
handle_app_exit(exit_info)?;
}
Some(Subcommand::Queue(queue_cli)) => {
reject_remote_mode_for_subcommand(
root_remote.as_deref(),
root_remote_auth_token_env.as_deref(),
"queue",
)?;
run_queue_command(queue_cli, &root_config_overrides, &interactive).await?;
}
Some(Subcommand::Fork(ForkCommand {
session_id,
last,
@@ -1274,6 +1322,137 @@ async fn run_debug_clear_memories_command(
Ok(())
}
async fn run_queue_command(
cmd: QueueCommand,
root_config_overrides: &CliConfigOverrides,
interactive: &TuiCli,
) -> anyhow::Result<()> {
validate_queue_command(&cmd)?;
let meta = parse_queue_meta(&cmd.meta)?;
let cli_kv_overrides = root_config_overrides
.parse_overrides()
.map_err(anyhow::Error::msg)?;
let overrides = ConfigOverrides {
config_profile: interactive.config_profile.clone(),
..Default::default()
};
let config =
Config::load_with_cli_overrides_and_harness_overrides(cli_kv_overrides, overrides).await?;
let thread_id = resolve_queue_thread_id(config.codex_home.as_path(), &cmd.thread).await?;
let state_db =
StateRuntime::init(config.sqlite_home.clone(), config.model_provider_id.clone()).await?;
let delivery = if cmd.steer {
TimerDelivery::SteerCurrentTurn
} else {
TimerDelivery::AfterTurn
};
if let Some(at) = cmd.at {
let dtstart = normalize_thread_timer_dtstart_input(&at).map_err(anyhow::Error::msg)?;
let timer_params = build_thread_timer_create_params(ThreadTimerStorageCreateParams {
thread_id,
source: "external".to_string(),
client_id: "codex-cli".to_string(),
trigger: ThreadTimerTrigger::Schedule {
dtstart: Some(dtstart.clone()),
rrule: None,
},
payload: MessagePayload {
content: cmd.content,
instructions: None,
meta,
},
delivery,
})
.map_err(anyhow::Error::msg)?;
state_db.create_thread_timer(&timer_params).await?;
println!(
"{}",
queue_timer_success_message(
&timer_params.id,
&timer_params.thread_id,
&dtstart,
timer_params.pending_run,
)
);
return Ok(());
}
let meta_json = serde_json::to_string(&meta)?;
let message_params = codex_state::ThreadMessageCreateParams::new(
thread_id,
"external".to_string(),
cmd.content,
cmd.instructions,
meta_json,
delivery.as_str().to_string(),
unix_timestamp_now()?,
);
state_db.create_thread_message(&message_params).await?;
println!(
"Queued message {} for thread {}.",
message_params.id, message_params.thread_id
);
Ok(())
}
fn queue_timer_success_message(
timer_id: &str,
thread_id: &str,
dtstart: &str,
pending_run: bool,
) -> String {
if pending_run {
format!("Queued timer {timer_id} for thread {thread_id}; it is due now.")
} else {
let local_time = dtstart.replace('T', " ");
format!(
"Queued timer {timer_id} for thread {thread_id}; it will fire at {local_time} local time."
)
}
}
async fn resolve_queue_thread_id(codex_home: &Path, target: &str) -> anyhow::Result<String> {
if let Ok(thread_id) = ThreadId::from_string(target) {
return Ok(thread_id.to_string());
}
let thread_ids = codex_core::find_thread_ids_by_name(codex_home, target).await?;
match thread_ids.as_slice() {
[] => anyhow::bail!("no thread named `{target}`"),
[thread_id] => Ok(thread_id.to_string()),
_ => anyhow::bail!("more than one thread is named `{target}`; use a thread id instead"),
}
}
fn validate_queue_command(cmd: &QueueCommand) -> anyhow::Result<()> {
if cmd.at.is_some() && cmd.instructions.is_some() {
anyhow::bail!("--instructions is not supported with --at");
}
Ok(())
}
fn unix_timestamp_now() -> anyhow::Result<i64> {
let duration = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|err| anyhow::anyhow!("system clock is before unix epoch: {err}"))?;
i64::try_from(duration.as_secs()).map_err(|_| anyhow::anyhow!("current time is out of range"))
}
fn parse_queue_meta(entries: &[String]) -> anyhow::Result<BTreeMap<String, String>> {
let mut meta = BTreeMap::new();
for entry in entries {
let Some((key, value)) = entry.split_once('=') else {
anyhow::bail!("metadata entry `{entry}` must use key=value syntax");
};
validate_meta_key(key).map_err(anyhow::Error::msg)?;
if meta.insert(key.to_string(), value.to_string()).is_some() {
anyhow::bail!("duplicate metadata key `{key}`");
}
}
Ok(meta)
}
/// Prepend root-level overrides so they have lower precedence than
/// CLI-specific ones specified after the subcommand (if any).
fn prepend_config_flags(
@@ -1517,6 +1696,7 @@ mod tests {
use codex_protocol::ThreadId;
use codex_protocol::protocol::TokenUsage;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
fn finalize_resume_from_args(args: &[&str]) -> TuiCli {
let cli = MultitoolCli::try_parse_from(args).expect("parse");
@@ -1575,6 +1755,123 @@ mod tests {
finalize_fork_interactive(interactive, root_overrides, session_id, last, all, fork_cli)
}
#[test]
fn queue_command_parses_immediate_message() {
let cli = MultitoolCli::try_parse_from([
"codex",
"queue",
"--thread",
"thread-1",
"--content",
"do work",
"--instructions",
"be brief",
"--meta",
"ticket=ABC_123",
"--steer",
])
.expect("parse");
let Some(Subcommand::Queue(cmd)) = cli.subcommand else {
unreachable!()
};
assert_eq!(cmd.thread, "thread-1");
assert_eq!(cmd.content, "do work");
assert_eq!(cmd.instructions, Some("be brief".to_string()));
assert_eq!(cmd.meta, vec!["ticket=ABC_123".to_string()]);
assert!(cmd.steer);
assert_eq!(cmd.at, None);
}
#[test]
fn queue_meta_rejects_invalid_and_duplicate_keys() {
assert!(parse_queue_meta(&["bad-key=value".to_string()]).is_err());
assert!(parse_queue_meta(&["ticket=one".to_string(), "ticket=two".to_string()]).is_err());
}
#[test]
fn queue_at_rejects_instructions() {
let cli = MultitoolCli::try_parse_from([
"codex",
"queue",
"--thread",
"thread-1",
"--content",
"do work",
"--instructions",
"be brief",
"--at",
"2026-04-10T12:00:00",
])
.expect("parse");
let Some(Subcommand::Queue(cmd)) = cli.subcommand else {
unreachable!()
};
let err = validate_queue_command(&cmd).expect_err("queue command should be rejected");
assert_eq!(err.to_string(), "--instructions is not supported with --at");
}
#[test]
fn queued_timer_output_reports_fire_time() {
assert_eq!(
queue_timer_success_message("timer-1", "thread-1", "2026-04-10T08:59:00", false),
"Queued timer timer-1 for thread thread-1; it will fire at 2026-04-10 08:59:00 local time."
);
}
#[test]
fn queued_timer_output_reports_due_now() {
assert_eq!(
queue_timer_success_message("timer-1", "thread-1", "2026-04-10T08:59:00", true),
"Queued timer timer-1 for thread thread-1; it is due now."
);
}
#[tokio::test]
async fn queue_thread_resolves_thread_name() {
let temp = TempDir::new().expect("tempdir");
let thread_id = ThreadId::new();
codex_core::append_thread_name(temp.path(), thread_id, "named-thread")
.await
.expect("append thread name");
assert_eq!(
resolve_queue_thread_id(temp.path(), "named-thread")
.await
.expect("resolve"),
thread_id.to_string()
);
}
#[tokio::test]
async fn queue_thread_name_rejects_missing_and_ambiguous_names() {
let temp = TempDir::new().expect("tempdir");
let first = ThreadId::new();
let second = ThreadId::new();
codex_core::append_thread_name(temp.path(), first, "same")
.await
.expect("append first name");
codex_core::append_thread_name(temp.path(), second, "same")
.await
.expect("append second name");
assert_eq!(
resolve_queue_thread_id(temp.path(), "missing")
.await
.expect_err("missing name should fail")
.to_string(),
"no thread named `missing`"
);
assert_eq!(
resolve_queue_thread_id(temp.path(), "same")
.await
.expect_err("ambiguous name should fail")
.to_string(),
"more than one thread is named `same`; use a thread id instead"
);
}
#[test]
fn exec_resume_last_accepts_prompt_positional() {
let cli =