mirror of
https://github.com/openai/codex.git
synced 2026-05-02 04:11:39 +03:00
483 lines
15 KiB
Rust
483 lines
15 KiB
Rust
use anyhow::Result;
|
|
use anyhow::anyhow;
|
|
use chrono::Utc;
|
|
use codex_core::messages::MessagePayload;
|
|
use codex_core::timers::TIMER_FIRED_BACKGROUND_EVENT_PREFIX;
|
|
use codex_core::timers::ThreadTimer;
|
|
use codex_core::timers::ThreadTimerTrigger;
|
|
use codex_core::timers::TimerDelivery;
|
|
use codex_features::Feature;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use core_test_support::responses::ev_assistant_message;
|
|
use core_test_support::responses::ev_completed;
|
|
use core_test_support::responses::ev_response_created;
|
|
use core_test_support::responses::mount_sse_once;
|
|
use core_test_support::responses::mount_sse_sequence;
|
|
use core_test_support::responses::sse;
|
|
use core_test_support::responses::start_mock_server;
|
|
use core_test_support::test_codex::test_codex;
|
|
use core_test_support::wait_for_event;
|
|
use core_test_support::wait_for_event_match;
|
|
use core_test_support::wait_for_event_with_timeout;
|
|
use pretty_assertions::assert_eq;
|
|
use std::time::Duration;
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn create_timer_emits_fired_background_event_when_timer_starts() -> Result<()> {
|
|
assert_after_turn_timer_starts_and_emits_fired_event().await
|
|
}
|
|
|
|
#[tokio::test(flavor = "current_thread")]
|
|
async fn create_timer_starts_on_current_thread_runtime() -> Result<()> {
|
|
assert_after_turn_timer_starts_and_emits_fired_event().await
|
|
}
|
|
|
|
async fn assert_after_turn_timer_starts_and_emits_fired_event() -> Result<()> {
|
|
let server = start_mock_server().await;
|
|
let mock = mount_sse_once(
|
|
&server,
|
|
sse(vec![
|
|
ev_response_created("resp-1"),
|
|
ev_assistant_message("msg-1", "timer ran"),
|
|
ev_completed("resp-1"),
|
|
]),
|
|
)
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config
|
|
.features
|
|
.enable(Feature::Timers)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
config
|
|
.features
|
|
.enable(Feature::Sqlite)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
|
|
let created = test
|
|
.codex
|
|
.create_timer(
|
|
ThreadTimerTrigger::Delay {
|
|
seconds: 0,
|
|
repeat: None,
|
|
},
|
|
MessagePayload {
|
|
content: "run timer".to_string(),
|
|
instructions: None,
|
|
meta: Default::default(),
|
|
},
|
|
TimerDelivery::AfterTurn,
|
|
)
|
|
.await
|
|
.map_err(|err| anyhow!("{err}"))?;
|
|
|
|
let payload = wait_for_event_match(&test.codex, |event| match event {
|
|
EventMsg::BackgroundEvent(event) => event
|
|
.message
|
|
.strip_prefix(TIMER_FIRED_BACKGROUND_EVENT_PREFIX)
|
|
.map(str::to_owned),
|
|
_ => None,
|
|
})
|
|
.await;
|
|
let fired: ThreadTimer = serde_json::from_str(&payload)?;
|
|
assert_eq!(fired, created);
|
|
|
|
wait_for_event(&test.codex, |event| {
|
|
matches!(event, EventMsg::TurnComplete(_))
|
|
})
|
|
.await;
|
|
|
|
let user_messages = mock.single_request().message_input_texts("user");
|
|
let timer_messages = user_messages
|
|
.iter()
|
|
.filter(|message| message.contains("<codex_message>"))
|
|
.collect::<Vec<_>>();
|
|
assert_eq!(timer_messages.len(), 1);
|
|
let timer_message = timer_messages[0];
|
|
assert!(timer_message.contains(&format!("<source>timer {}</source>", created.id)));
|
|
assert!(timer_message.contains("<content>\nTimer fired: run timer\n</content>"));
|
|
assert!(timer_message.contains("<instructions>\nrun timer\n\nThis one-shot timer"));
|
|
assert!(timer_message.contains("<meta />"));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn create_timer_persists_source_and_client_metadata() -> Result<()> {
|
|
let server = start_mock_server().await;
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config
|
|
.features
|
|
.enable(Feature::Timers)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
config
|
|
.features
|
|
.enable(Feature::Sqlite)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
|
|
let created = test
|
|
.codex
|
|
.create_timer(
|
|
ThreadTimerTrigger::Delay {
|
|
seconds: 60,
|
|
repeat: Some(true),
|
|
},
|
|
MessagePayload {
|
|
content: "run timer".to_string(),
|
|
instructions: None,
|
|
meta: Default::default(),
|
|
},
|
|
TimerDelivery::AfterTurn,
|
|
)
|
|
.await
|
|
.map_err(|err| anyhow!("{err}"))?;
|
|
|
|
let db = test.codex.state_db().expect("state db enabled");
|
|
let timers = db
|
|
.list_thread_timers(&test.session_configured.session_id.to_string())
|
|
.await?;
|
|
|
|
assert_eq!(timers.len(), 1);
|
|
assert_eq!(timers[0].id, created.id);
|
|
assert_eq!(timers[0].source, "agent");
|
|
assert_eq!(timers[0].client_id, "codex-cli");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn create_timer_lazily_opens_sqlite_for_ephemeral_thread() -> Result<()> {
|
|
let server = start_mock_server().await;
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config.ephemeral = true;
|
|
config
|
|
.features
|
|
.enable(Feature::Timers)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
|
|
let created = test
|
|
.codex
|
|
.create_timer(
|
|
ThreadTimerTrigger::Delay {
|
|
seconds: 60,
|
|
repeat: Some(true),
|
|
},
|
|
MessagePayload {
|
|
content: "run timer".to_string(),
|
|
instructions: None,
|
|
meta: Default::default(),
|
|
},
|
|
TimerDelivery::AfterTurn,
|
|
)
|
|
.await
|
|
.map_err(|err| anyhow!("{err}"))?;
|
|
|
|
assert_eq!(
|
|
test.codex.list_timers().await,
|
|
vec![created],
|
|
"ephemeral threads should still open sqlite timer storage lazily"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn list_timers_discovers_externally_inserted_timer() -> Result<()> {
|
|
let server = start_mock_server().await;
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config
|
|
.features
|
|
.enable(Feature::Timers)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
config
|
|
.features
|
|
.enable(Feature::Sqlite)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
let db = test.codex.state_db().expect("state db enabled");
|
|
let created_at = Utc::now().timestamp();
|
|
|
|
db.create_thread_timer(&codex_state::ThreadTimerCreateParams {
|
|
id: "external-timer".to_string(),
|
|
thread_id: test.session_configured.session_id.to_string(),
|
|
source: "client".to_string(),
|
|
client_id: "external-client".to_string(),
|
|
trigger_json: r#"{"kind":"delay","seconds":60,"repeat":true}"#.to_string(),
|
|
content: "external timer".to_string(),
|
|
instructions: None,
|
|
meta_json: "{}".to_string(),
|
|
delivery: "after-turn".to_string(),
|
|
created_at,
|
|
next_run_at: Some(created_at + 60),
|
|
last_run_at: None,
|
|
pending_run: false,
|
|
})
|
|
.await?;
|
|
|
|
let timers = test.codex.list_timers().await;
|
|
|
|
assert_eq!(timers.len(), 1);
|
|
assert_eq!(timers[0].id, "external-timer");
|
|
assert_eq!(
|
|
timers[0].trigger,
|
|
ThreadTimerTrigger::Delay {
|
|
seconds: 60,
|
|
repeat: Some(true),
|
|
}
|
|
);
|
|
assert_eq!(timers[0].content, "external timer");
|
|
assert_eq!(timers[0].delivery, TimerDelivery::AfterTurn);
|
|
assert_eq!(timers[0].created_at, created_at);
|
|
assert_eq!(timers[0].last_run_at, None);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn queued_messages_feature_consumes_messages_without_timers() -> Result<()> {
|
|
let server = start_mock_server().await;
|
|
let mock = mount_sse_sequence(
|
|
&server,
|
|
vec![
|
|
sse(vec![
|
|
ev_response_created("resp-1"),
|
|
ev_assistant_message("msg-1", "first turn"),
|
|
ev_completed("resp-1"),
|
|
]),
|
|
sse(vec![
|
|
ev_response_created("resp-2"),
|
|
ev_assistant_message("msg-2", "queued turn"),
|
|
ev_completed("resp-2"),
|
|
]),
|
|
],
|
|
)
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config
|
|
.features
|
|
.enable(Feature::QueuedMessages)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
config
|
|
.features
|
|
.enable(Feature::Sqlite)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
let db = test.codex.state_db().expect("state db enabled");
|
|
let thread_id = test.session_configured.session_id.to_string();
|
|
db.create_thread_message(&codex_state::ThreadMessageCreateParams::new(
|
|
thread_id,
|
|
"external".to_string(),
|
|
"queued hello".to_string(),
|
|
/*instructions*/ None,
|
|
"{}".to_string(),
|
|
TimerDelivery::AfterTurn.as_str().to_string(),
|
|
Utc::now().timestamp(),
|
|
))
|
|
.await?;
|
|
|
|
test.submit_turn("start").await?;
|
|
wait_for_event_with_timeout(
|
|
&test.codex,
|
|
|event| matches!(event, EventMsg::InjectedMessage(_)),
|
|
Duration::from_secs(20),
|
|
)
|
|
.await;
|
|
wait_for_event_with_timeout(
|
|
&test.codex,
|
|
|event| matches!(event, EventMsg::TurnComplete(_)),
|
|
Duration::from_secs(20),
|
|
)
|
|
.await;
|
|
|
|
let requests = mock.requests();
|
|
assert_eq!(requests.len(), 2);
|
|
assert!(
|
|
requests[1]
|
|
.message_input_texts("user")
|
|
.iter()
|
|
.any(|message| message.contains("<content>\nqueued hello\n</content>"))
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn queued_message_runs_after_idle_recurring_timer() -> Result<()> {
|
|
let server = start_mock_server().await;
|
|
let mock = mount_sse_sequence(
|
|
&server,
|
|
vec![
|
|
sse(vec![
|
|
ev_response_created("resp-1"),
|
|
ev_assistant_message("msg-1", "timer turn"),
|
|
ev_completed("resp-1"),
|
|
]),
|
|
sse(vec![
|
|
ev_response_created("resp-2"),
|
|
ev_assistant_message("msg-2", "queued turn"),
|
|
ev_completed("resp-2"),
|
|
]),
|
|
],
|
|
)
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config
|
|
.features
|
|
.enable(Feature::Timers)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
config
|
|
.features
|
|
.enable(Feature::QueuedMessages)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
config
|
|
.features
|
|
.enable(Feature::Sqlite)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
let db = test.codex.state_db().expect("state db enabled");
|
|
let timer = test
|
|
.codex
|
|
.create_timer(
|
|
ThreadTimerTrigger::Delay {
|
|
seconds: 0,
|
|
repeat: Some(true),
|
|
},
|
|
MessagePayload {
|
|
content: "keep going".to_string(),
|
|
instructions: None,
|
|
meta: Default::default(),
|
|
},
|
|
TimerDelivery::AfterTurn,
|
|
)
|
|
.await
|
|
.map_err(|err| anyhow!("{err}"))?;
|
|
wait_for_event_with_timeout(
|
|
&test.codex,
|
|
|event| match event {
|
|
EventMsg::InjectedMessage(event) => event.source == format!("timer {}", timer.id),
|
|
_ => false,
|
|
},
|
|
Duration::from_secs(20),
|
|
)
|
|
.await;
|
|
let thread_id = test.session_configured.session_id.to_string();
|
|
db.create_thread_message(&codex_state::ThreadMessageCreateParams::new(
|
|
thread_id,
|
|
"external".to_string(),
|
|
"queued hello".to_string(),
|
|
/*instructions*/ None,
|
|
"{}".to_string(),
|
|
TimerDelivery::AfterTurn.as_str().to_string(),
|
|
Utc::now().timestamp(),
|
|
))
|
|
.await?;
|
|
|
|
wait_for_event_with_timeout(
|
|
&test.codex,
|
|
|event| matches!(event, EventMsg::TurnComplete(_)),
|
|
Duration::from_secs(20),
|
|
)
|
|
.await;
|
|
assert!(
|
|
test.codex
|
|
.delete_timer(&timer.id)
|
|
.await
|
|
.map_err(|err| anyhow!("{err}"))?,
|
|
"test should delete the idle recurring timer before it can schedule another turn"
|
|
);
|
|
wait_for_event_with_timeout(
|
|
&test.codex,
|
|
|event| match event {
|
|
EventMsg::InjectedMessage(event) => event.source == "external",
|
|
_ => false,
|
|
},
|
|
Duration::from_secs(20),
|
|
)
|
|
.await;
|
|
wait_for_event_with_timeout(
|
|
&test.codex,
|
|
|event| matches!(event, EventMsg::TurnComplete(_)),
|
|
Duration::from_secs(20),
|
|
)
|
|
.await;
|
|
|
|
let requests = mock.requests();
|
|
assert!(
|
|
requests.len() >= 2,
|
|
"expected timer and queued-message turns to run"
|
|
);
|
|
assert!(
|
|
requests[0]
|
|
.message_input_texts("user")
|
|
.iter()
|
|
.any(|message| message.contains("<content>\nTimer fired: keep going\n</content>"))
|
|
);
|
|
assert!(
|
|
requests[1]
|
|
.message_input_texts("user")
|
|
.iter()
|
|
.any(|message| message.contains("<content>\nqueued hello\n</content>"))
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn queued_messages_feature_disabled_leaves_messages_queued() -> Result<()> {
|
|
let server = start_mock_server().await;
|
|
let mock = mount_sse_once(
|
|
&server,
|
|
sse(vec![
|
|
ev_response_created("resp-1"),
|
|
ev_assistant_message("msg-1", "first turn"),
|
|
ev_completed("resp-1"),
|
|
]),
|
|
)
|
|
.await;
|
|
|
|
let mut builder = test_codex().with_config(|config| {
|
|
config
|
|
.features
|
|
.enable(Feature::Sqlite)
|
|
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
|
});
|
|
let test = builder.build(&server).await?;
|
|
let db = test.codex.state_db().expect("state db enabled");
|
|
let thread_id = test.session_configured.session_id.to_string();
|
|
db.create_thread_message(&codex_state::ThreadMessageCreateParams::new(
|
|
thread_id.clone(),
|
|
"external".to_string(),
|
|
"queued hello".to_string(),
|
|
/*instructions*/ None,
|
|
"{}".to_string(),
|
|
TimerDelivery::AfterTurn.as_str().to_string(),
|
|
Utc::now().timestamp(),
|
|
))
|
|
.await?;
|
|
|
|
test.submit_turn("start").await?;
|
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
|
|
assert_eq!(mock.requests().len(), 1);
|
|
assert!(
|
|
db.claim_next_thread_message(
|
|
&thread_id, /*can_after_turn*/ true, /*can_steer_current_turn*/ true,
|
|
)
|
|
.await?
|
|
.is_some()
|
|
);
|
|
|
|
Ok(())
|
|
}
|