diff --git a/codex-rs/state/src/migrations.rs b/codex-rs/state/src/migrations.rs
index 6e7b9d363c..883129a943 100644
--- a/codex-rs/state/src/migrations.rs
+++ b/codex-rs/state/src/migrations.rs
@@ -1,4 +1,29 @@
+use std::borrow::Cow;
+
 use sqlx::migrate::Migrator;
 
 pub(crate) static STATE_MIGRATOR: Migrator = sqlx::migrate!("./migrations");
 pub(crate) static LOGS_MIGRATOR: Migrator = sqlx::migrate!("./logs_migrations");
+
+/// Allow an older Codex binary to open a database that has already been
+/// migrated by a newer binary running in parallel.
+///
+/// We intentionally ignore applied migration versions that are newer than the
+/// embedded migration set. Known migration versions are still validated by
+/// checksum, so this only relaxes the "database is ahead of me" case.
+fn runtime_migrator(base: &'static Migrator) -> Migrator {
+    Migrator {
+        migrations: Cow::Borrowed(base.migrations.as_ref()),
+        ignore_missing: true,
+        locking: base.locking,
+        no_tx: base.no_tx,
+    }
+}
+
+pub(crate) fn runtime_state_migrator() -> Migrator {
+    runtime_migrator(&STATE_MIGRATOR)
+}
+
+pub(crate) fn runtime_logs_migrator() -> Migrator {
+    runtime_migrator(&LOGS_MIGRATOR)
+}
diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs
index 4d3af301b2..d3d81d87d3 100644
--- a/codex-rs/state/src/runtime.rs
+++ b/codex-rs/state/src/runtime.rs
@@ -17,8 +17,8 @@ use crate::ThreadMetadata;
 use crate::ThreadMetadataBuilder;
 use crate::ThreadsPage;
 use crate::apply_rollout_item;
-use crate::migrations::LOGS_MIGRATOR;
-use crate::migrations::STATE_MIGRATOR;
+use crate::migrations::runtime_logs_migrator;
+use crate::migrations::runtime_state_migrator;
 use crate::model::AgentJobRow;
 use crate::model::ThreadRow;
 use crate::model::anchor_from_item;
@@ -83,6 +83,8 @@ impl StateRuntime {
     /// rest of the state store.
     pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result<Arc<Self>> {
         tokio::fs::create_dir_all(&codex_home).await?;
+        let state_migrator = runtime_state_migrator();
+        let logs_migrator = runtime_logs_migrator();
         let current_state_name = state_db_filename();
         let current_logs_name = logs_db_filename();
         remove_legacy_db_files(
@@ -101,14 +103,14 @@ impl StateRuntime {
         .await;
         let state_path = state_db_path(codex_home.as_path());
         let logs_path = logs_db_path(codex_home.as_path());
-        let pool = match open_state_sqlite(&state_path, &STATE_MIGRATOR).await {
+        let pool = match open_state_sqlite(&state_path, &state_migrator).await {
             Ok(db) => Arc::new(db),
             Err(err) => {
                 warn!("failed to open state db at {}: {err}", state_path.display());
                 return Err(err);
             }
         };
-        let logs_pool = match open_logs_sqlite(&logs_path, &LOGS_MIGRATOR).await {
+        let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator).await {
             Ok(db) => Arc::new(db),
             Err(err) => {
                 warn!("failed to open logs db at {}: {err}", logs_path.display());
@@ -146,7 +148,7 @@ fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
         .log_statements(LevelFilter::Off)
 }
 
-async fn open_state_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
+async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
     let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
     let pool = SqlitePoolOptions::new()
         .max_connections(5)
@@ -172,7 +174,7 @@ async fn open_state_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::
     Ok(pool)
 }
 
-async fn open_logs_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
+async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
     let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
     let pool = SqlitePoolOptions::new()
         .max_connections(5)
@@ -268,3 +270,74 @@ fn should_remove_db_file(file_name: &str, current_name: &str, base_name: &str) -
     };
     !version_suffix.is_empty() && version_suffix.chars().all(|ch| ch.is_ascii_digit())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::open_state_sqlite;
+    use super::runtime_state_migrator;
+    use super::state_db_path;
+    use super::test_support::unique_temp_dir;
+    use crate::migrations::STATE_MIGRATOR;
+    use sqlx::SqlitePool;
+    use sqlx::migrate::MigrateError;
+    use sqlx::sqlite::SqliteConnectOptions;
+    use std::path::Path;
+
+    async fn open_db_pool(path: &Path) -> SqlitePool {
+        SqlitePool::connect_with(
+            SqliteConnectOptions::new()
+                .filename(path)
+                .create_if_missing(false),
+        )
+        .await
+        .expect("open sqlite pool")
+    }
+
+    #[tokio::test]
+    async fn open_state_sqlite_tolerates_newer_applied_migrations() {
+        let codex_home = unique_temp_dir();
+        tokio::fs::create_dir_all(&codex_home)
+            .await
+            .expect("create codex home");
+        let state_path = state_db_path(codex_home.as_path());
+        let pool = SqlitePool::connect_with(
+            SqliteConnectOptions::new()
+                .filename(&state_path)
+                .create_if_missing(true),
+        )
+        .await
+        .expect("open state db");
+        STATE_MIGRATOR
+            .run(&pool)
+            .await
+            .expect("apply current state schema");
+        sqlx::query(
+            "INSERT INTO _sqlx_migrations (version, description, success, checksum, execution_time) VALUES (?, ?, ?, ?, ?)",
+        )
+        .bind(9_999_i64)
+        .bind("future migration")
+        .bind(true)
+        .bind(vec![1_u8, 2, 3, 4])
+        .bind(1_i64)
+        .execute(&pool)
+        .await
+        .expect("insert future migration record");
+        pool.close().await;
+
+        let strict_pool = open_db_pool(state_path.as_path()).await;
+        let strict_err = STATE_MIGRATOR
+            .run(&strict_pool)
+            .await
+            .expect_err("strict migrator should reject newer applied migrations");
+        assert!(matches!(strict_err, MigrateError::VersionMissing(9_999)));
+        strict_pool.close().await;
+
+        let tolerant_migrator = runtime_state_migrator();
+        let tolerant_pool = open_state_sqlite(state_path.as_path(), &tolerant_migrator)
+            .await
+            .expect("runtime migrator should tolerate newer applied migrations");
+        tolerant_pool.close().await;
+
+        let _ = tokio::fs::remove_dir_all(codex_home).await;
+    }
+}