diff --git a/codex-rs/core/src/codex/timer_runtime.rs b/codex-rs/core/src/codex/timer_runtime.rs index adac1929c1..e7fbf85e66 100644 --- a/codex-rs/core/src/codex/timer_runtime.rs +++ b/codex-rs/core/src/codex/timer_runtime.rs @@ -20,6 +20,7 @@ use crate::pending_input::GeneratedMessageInput; use crate::pending_input::PendingInputItem; use crate::timers::ClaimedTimer; use crate::timers::CreateTimer; +use crate::timers::IdleRecurringTimerPolicy; use crate::timers::PersistedTimer; use crate::timers::RestoredTimerTask; use crate::timers::TIMER_FIRED_BACKGROUND_EVENT_PREFIX; @@ -194,18 +195,31 @@ impl Session { } pub(crate) async fn maybe_start_pending_timer(self: &Arc) { - if !self.try_start_pending_timer().await { - self.maybe_start_pending_message().await; + if self + .try_start_pending_timer(IdleRecurringTimerPolicy::IncludeOnlyNeverRun) + .await + { + return; } + if self.maybe_start_pending_message().await { + return; + } + self.try_start_pending_timer(IdleRecurringTimerPolicy::IncludeAll) + .await; } - async fn try_start_pending_timer(self: &Arc) -> bool { + async fn try_start_pending_timer( + self: &Arc, + idle_recurring_timer_policy: IdleRecurringTimerPolicy, + ) -> bool { let Some(ClaimedTimer { timer, context, deleted_one_shot_timer, .. - }) = self.claim_next_timer_for_delivery().await + }) = self + .claim_next_timer_for_delivery(idle_recurring_timer_policy) + .await else { return false; }; @@ -237,9 +251,9 @@ impl Session { true } - async fn maybe_start_pending_message(self: &Arc) { + async fn maybe_start_pending_message(self: &Arc) -> bool { let Some((input_item, delivery)) = self.claim_next_message_for_delivery().await else { - return; + return false; }; match delivery { @@ -260,6 +274,7 @@ impl Session { } } *self.timer_start_in_progress.lock().await = false; + true } async fn claim_next_message_for_delivery( @@ -349,7 +364,10 @@ impl Session { } } - async fn claim_next_timer_for_delivery(self: &Arc) -> Option { + async fn claim_next_timer_for_delivery( + self: &Arc, + idle_recurring_timer_policy: IdleRecurringTimerPolicy, + ) -> Option { if !self.timers_feature_enabled() { return None; } @@ -377,6 +395,7 @@ impl Session { Utc::now(), can_after_turn, active_turn_is_regular, + idle_recurring_timer_policy, ); let Some(claimed) = claimed else { *self.timer_start_in_progress.lock().await = false; diff --git a/codex-rs/core/src/timers.rs b/codex-rs/core/src/timers.rs index cc7e59903b..377d3ee9d0 100644 --- a/codex-rs/core/src/timers.rs +++ b/codex-rs/core/src/timers.rs @@ -83,6 +83,12 @@ pub(crate) struct ClaimedTimer { pub(crate) previous_last_run_at: Option, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum IdleRecurringTimerPolicy { + IncludeOnlyNeverRun, + IncludeAll, +} + #[derive(Debug)] pub(crate) struct CreateTimer { pub(crate) id: String, @@ -357,6 +363,7 @@ impl TimersState { now: chrono::DateTime, can_after_turn: bool, can_steer_current_turn: bool, + idle_recurring_timer_policy: IdleRecurringTimerPolicy, ) -> Option { let (next_timer_id, actual_delivery) = self .timers @@ -364,6 +371,11 @@ impl TimersState { .filter(|runtime| runtime.pending_run) .filter_map(|runtime| { if runtime.timer.trigger.is_idle_recurring() { + if idle_recurring_timer_policy == IdleRecurringTimerPolicy::IncludeOnlyNeverRun + && runtime.timer.last_run_at.is_some() + { + return None; + } if can_after_turn { return Some((runtime, TimerDelivery::AfterTurn)); } @@ -532,6 +544,7 @@ fn timer_message_instructions(timer: &TimerInvocationContext) -> Option #[cfg(test)] mod tests { use super::CreateTimer; + use super::IdleRecurringTimerPolicy; use super::MAX_ACTIVE_TIMERS_PER_THREAD; use super::PersistedTimer; use super::ThreadTimer; @@ -581,7 +594,10 @@ mod tests { let claimed = timers .claim_next_timer( - now, /*can_after_turn*/ true, /*can_steer_current_turn*/ true, + now, + /*can_after_turn*/ true, + /*can_steer_current_turn*/ true, + IdleRecurringTimerPolicy::IncludeAll, ) .expect("timer should be claimed"); assert_eq!(claimed.context.current_timer_id, "timer-1"); @@ -636,6 +652,7 @@ mod tests { first_claimed_at, /*can_after_turn*/ true, /*can_steer_current_turn*/ true, + IdleRecurringTimerPolicy::IncludeAll, ) .expect("first timer should be claimed"); assert_eq!(first.context.current_timer_id, "timer-1"); @@ -645,6 +662,7 @@ mod tests { second_claimed_at, /*can_after_turn*/ true, /*can_steer_current_turn*/ true, + IdleRecurringTimerPolicy::IncludeAll, ) .expect("second timer should be claimed"); assert_eq!(second.context.current_timer_id, "timer-2"); @@ -674,7 +692,10 @@ mod tests { let claimed = timers .claim_next_timer( - now, /*can_after_turn*/ true, /*can_steer_current_turn*/ true, + now, + /*can_after_turn*/ true, + /*can_steer_current_turn*/ true, + IdleRecurringTimerPolicy::IncludeAll, ) .expect("timer should be claimed"); assert!(!claimed.deleted_one_shot_timer); @@ -713,18 +734,66 @@ mod tests { assert_eq!( timers.claim_next_timer( - now, /*can_after_turn*/ false, /*can_steer_current_turn*/ true, + now, + /*can_after_turn*/ false, + /*can_steer_current_turn*/ true, + IdleRecurringTimerPolicy::IncludeAll, ), None ); let claimed = timers .claim_next_timer( - now, /*can_after_turn*/ true, /*can_steer_current_turn*/ false, + now, + /*can_after_turn*/ true, + /*can_steer_current_turn*/ false, + IdleRecurringTimerPolicy::IncludeAll, ) .expect("timer should be claimed when idle"); assert_eq!(claimed.context.delivery, TimerDelivery::AfterTurn); } + #[test] + fn idle_recurring_policy_can_exclude_timer_that_already_ran() { + let now = Utc.timestamp_opt(100, 0).single().expect("valid timestamp"); + let mut timers = TimersState::default(); + timers + .create_timer( + CreateTimer { + id: "timer-1".to_string(), + trigger: delay(ZERO_SECONDS, Some(true)), + payload: MessagePayload { + content: "keep going".to_string(), + instructions: None, + meta: BTreeMap::new(), + }, + delivery: TimerDelivery::AfterTurn, + now, + }, + /*timer_cancel*/ None, + ) + .expect("timer should be created"); + + let claimed = timers + .claim_next_timer( + now, + /*can_after_turn*/ true, + /*can_steer_current_turn*/ true, + IdleRecurringTimerPolicy::IncludeOnlyNeverRun, + ) + .expect("never-run idle timer should be claimed"); + assert_eq!(claimed.context.current_timer_id, "timer-1"); + + assert_eq!( + timers.claim_next_timer( + now, + /*can_after_turn*/ true, + /*can_steer_current_turn*/ true, + IdleRecurringTimerPolicy::IncludeOnlyNeverRun, + ), + None + ); + } + #[test] fn create_timer_rejects_more_than_maximum_active_timers() { let now = Utc.timestamp_opt(100, 0).single().expect("valid timestamp"); diff --git a/codex-rs/core/tests/suite/timers.rs b/codex-rs/core/tests/suite/timers.rs index d8abb4f009..5046cdb63d 100644 --- a/codex-rs/core/tests/suite/timers.rs +++ b/codex-rs/core/tests/suite/timers.rs @@ -310,6 +310,109 @@ async fn queued_messages_feature_consumes_messages_without_timers() -> Result<() Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn queued_message_runs_after_idle_recurring_timer() -> Result<()> { + let server = start_mock_server().await; + let mock = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "timer turn"), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-2", "queued turn"), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_assistant_message("msg-3", "next timer turn"), + ev_completed("resp-3"), + ]), + ], + ) + .await; + + let mut builder = test_codex().with_config(|config| { + config + .features + .enable(Feature::Timers) + .unwrap_or_else(|err| panic!("test config should allow feature update: {err}")); + config + .features + .enable(Feature::QueuedMessages) + .unwrap_or_else(|err| panic!("test config should allow feature update: {err}")); + config + .features + .enable(Feature::Sqlite) + .unwrap_or_else(|err| panic!("test config should allow feature update: {err}")); + }); + let test = builder.build(&server).await?; + let db = test.codex.state_db().expect("state db enabled"); + let thread_id = test.session_configured.session_id.to_string(); + db.create_thread_message(&codex_state::ThreadMessageCreateParams::new( + thread_id, + "external".to_string(), + "queued hello".to_string(), + /*instructions*/ None, + "{}".to_string(), + TimerDelivery::AfterTurn.as_str().to_string(), + Utc::now().timestamp(), + )) + .await?; + + test.codex + .create_timer( + ThreadTimerTrigger::Delay { + seconds: 0, + repeat: Some(true), + }, + MessagePayload { + content: "keep going".to_string(), + instructions: None, + meta: Default::default(), + }, + TimerDelivery::AfterTurn, + ) + .await + .map_err(|err| anyhow!("{err}"))?; + + wait_for_event_with_timeout( + &test.codex, + |event| matches!(event, EventMsg::TurnComplete(_)), + Duration::from_secs(20), + ) + .await; + wait_for_event_with_timeout( + &test.codex, + |event| matches!(event, EventMsg::TurnComplete(_)), + Duration::from_secs(20), + ) + .await; + + let requests = mock.requests(); + assert!( + requests.len() >= 2, + "expected timer and queued-message turns to run" + ); + assert!( + requests[0] + .message_input_texts("user") + .iter() + .any(|message| message.contains("\nTimer fired: keep going\n")) + ); + assert!( + requests[1] + .message_input_texts("user") + .iter() + .any(|message| message.contains("\nqueued hello\n")) + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn queued_messages_feature_disabled_leaves_messages_queued() -> Result<()> { let server = start_mock_server().await;