Split timer and queued message flags

This commit is contained in:
Eric Traut
2026-04-10 14:43:56 -07:00
parent 2a6526c9fd
commit cbe466866c
11 changed files with 350 additions and 98 deletions

View File

@@ -1,20 +1,20 @@
//! Implementation for the `codex queue` command.
//!
//! The top-level CLI module owns command routing; this module owns the
//! queue-specific policy for resolving target threads, validating message
//! metadata, and writing either immediate messages or one-shot timers into the
//! SQLite state database.
//! queue-specific policy for resolving target threads and writing either
//! immediate messages or one-shot timers into the SQLite state database.
use clap::Parser;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::messages::MessagePayload;
use codex_core::messages::validate_meta_key;
use codex_core::timers::ThreadTimerStorageCreateParams;
use codex_core::timers::ThreadTimerTrigger;
use codex_core::timers::TimerDelivery;
use codex_core::timers::build_thread_timer_create_params;
use codex_core::timers::normalize_thread_timer_dtstart_input;
use codex_features::Feature;
use codex_features::Features;
use codex_protocol::ThreadId;
use codex_state::StateRuntime;
use codex_tui::Cli as TuiCli;
@@ -32,10 +32,6 @@ pub(crate) struct QueueCommand {
#[arg(long = "message", value_name = "TEXT")]
message: String,
/// Message metadata as key=value. May be repeated.
#[arg(long = "meta", value_name = "KEY=VALUE")]
meta: Vec<String>,
/// Queue a one-shot timer for a local datetime or time, e.g. 2026-04-10T09:30:00 or 09:30.
#[arg(long = "at", value_name = "WHEN")]
at: Option<String>,
@@ -46,7 +42,6 @@ pub(crate) async fn run_queue_command(
root_config_overrides: &CliConfigOverrides,
interactive: &TuiCli,
) -> anyhow::Result<()> {
let meta = parse_queue_meta(&cmd.meta)?;
let cli_kv_overrides = root_config_overrides
.parse_overrides()
.map_err(anyhow::Error::msg)?;
@@ -56,6 +51,12 @@ pub(crate) async fn run_queue_command(
};
let config =
Config::load_with_cli_overrides_and_harness_overrides(cli_kv_overrides, overrides).await?;
let queue_mode = if cmd.at.is_some() {
QueueMode::Timer
} else {
QueueMode::ImmediateMessage
};
validate_queue_feature_flags(&config.features, queue_mode)?;
let thread_id = resolve_queue_thread_id(config.codex_home.as_path(), &cmd.thread).await?;
let state_db =
StateRuntime::init(config.sqlite_home.clone(), config.model_provider_id.clone()).await?;
@@ -74,7 +75,7 @@ pub(crate) async fn run_queue_command(
payload: MessagePayload {
content: cmd.message,
instructions: None,
meta,
meta: BTreeMap::new(),
},
delivery,
})
@@ -92,13 +93,12 @@ pub(crate) async fn run_queue_command(
return Ok(());
}
let meta_json = serde_json::to_string(&meta)?;
let message_params = codex_state::ThreadMessageCreateParams::new(
thread_id,
"external".to_string(),
cmd.message,
None,
meta_json,
"{}".to_string(),
delivery.as_str().to_string(),
unix_timestamp_now()?,
);
@@ -110,6 +110,22 @@ pub(crate) async fn run_queue_command(
Ok(())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QueueMode {
ImmediateMessage,
Timer,
}
fn validate_queue_feature_flags(features: &Features, mode: QueueMode) -> anyhow::Result<()> {
if !features.enabled(Feature::QueuedMessages) {
anyhow::bail!("codex queue requires the queued_messages feature");
}
if mode == QueueMode::Timer && !features.enabled(Feature::Timers) {
anyhow::bail!("codex queue --at requires the timers feature");
}
Ok(())
}
fn queue_timer_success_message(
timer_id: &str,
thread_id: &str,
@@ -146,20 +162,6 @@ fn unix_timestamp_now() -> anyhow::Result<i64> {
i64::try_from(duration.as_secs()).map_err(|_| anyhow::anyhow!("current time is out of range"))
}
fn parse_queue_meta(entries: &[String]) -> anyhow::Result<BTreeMap<String, String>> {
let mut meta = BTreeMap::new();
for entry in entries {
let Some((key, value)) = entry.split_once('=') else {
anyhow::bail!("metadata entry `{entry}` must use key=value syntax");
};
validate_meta_key(key).map_err(anyhow::Error::msg)?;
if meta.insert(key.to_string(), value.to_string()).is_some() {
anyhow::bail!("duplicate metadata key `{key}`");
}
}
Ok(meta)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -177,8 +179,6 @@ mod tests {
"thread-1",
"--message",
"do work",
"--meta",
"ticket=ABC_123",
])
.expect("parse");
let Some(Subcommand::Queue(cmd)) = cli.subcommand else {
@@ -187,7 +187,6 @@ mod tests {
assert_eq!(cmd.thread, "thread-1");
assert_eq!(cmd.message, "do work");
assert_eq!(cmd.meta, vec!["ticket=ABC_123".to_string()]);
assert_eq!(cmd.at, None);
}
@@ -214,9 +213,20 @@ mod tests {
}
#[test]
fn queue_meta_rejects_invalid_and_duplicate_keys() {
assert!(parse_queue_meta(&["bad-key=value".to_string()]).is_err());
assert!(parse_queue_meta(&["ticket=one".to_string(), "ticket=two".to_string()]).is_err());
fn queue_rejects_meta_flag() {
assert!(
MultitoolCli::try_parse_from([
"codex",
"queue",
"--thread",
"thread-1",
"--message",
"do work",
"--meta",
"ticket=ABC_123",
])
.is_err()
);
}
#[test]
@@ -272,6 +282,41 @@ mod tests {
assert_eq!(cmd.at, Some("2026-04-10T12:00:00".to_string()));
}
#[test]
fn queue_requires_queued_messages_feature() {
let mut features = Features::with_defaults();
let err = validate_queue_feature_flags(&features, QueueMode::ImmediateMessage)
.expect_err("queue should require queued_messages");
assert_eq!(
err.to_string(),
"codex queue requires the queued_messages feature"
);
features.enable(Feature::QueuedMessages);
validate_queue_feature_flags(&features, QueueMode::ImmediateMessage)
.expect("queued messages feature should permit immediate queue command");
}
#[test]
fn queue_at_requires_timers_feature() {
let mut features = Features::with_defaults();
features.enable(Feature::QueuedMessages);
let err = validate_queue_feature_flags(&features, QueueMode::Timer)
.expect_err("queue --at should require timers");
assert_eq!(
err.to_string(),
"codex queue --at requires the timers feature"
);
features.enable(Feature::Timers);
validate_queue_feature_flags(&features, QueueMode::Timer)
.expect("timers feature should permit queue --at");
}
#[test]
fn queued_timer_output_reports_fire_time() {
assert_eq!(