fix(sqlite): don't hard fail migrator if DB is newer (#16924)

## Description

This PR makes the SQLite state runtime tolerate databases that have
already been migrated by a newer Codex binary.

Today, if an older CLI sees migration versions in `_sqlx_migrations`
that it doesn't know about, startup fails. This change relaxes that
check for the runtime migrators we use in `codex-state` so older
binaries can keep opening the DB in that case.

## Why

We can end up with mixed-version CLIs running against the same local
state DB. In that setup, treating "the database is ahead of me" as a
hard error is unnecessarily strict and breaks the older client even when
the migration history is otherwise fine.

## Follow-up

We still clean up versioned `state_*.sqlite` and `logs_*.sqlite` files
during init, which means an older binary can mistake a newer binary's DB
files for legacy ones and delete them. That cleanup should probably be
tightened separately if we want mixed-version local usage to be fully safe.
This commit is contained in:
Owen Lin
2026-04-06 12:16:31 -07:00
committed by GitHub
parent bd30bad96f
commit 9bb813353e
2 changed files with 104 additions and 6 deletions

View File

@@ -1,4 +1,29 @@
use std::borrow::Cow;
use sqlx::migrate::Migrator;
// Migration set for the primary state database, embedded at compile time.
pub(crate) static STATE_MIGRATOR: Migrator = sqlx::migrate!("./migrations");
// Migration set for the separate logs database.
pub(crate) static LOGS_MIGRATOR: Migrator = sqlx::migrate!("./logs_migrations");
/// Build a migrator that lets an older Codex binary open a database that a
/// newer binary (running side by side) has already migrated.
///
/// Applied migration versions beyond the embedded set are deliberately
/// ignored via `ignore_missing`; every version we do embed is still verified
/// by checksum, so only the "database is ahead of me" case is relaxed.
fn runtime_migrator(embedded: &'static Migrator) -> Migrator {
    // Borrow the embedded migration list rather than cloning it.
    let migrations = Cow::Borrowed(embedded.migrations.as_ref());
    Migrator {
        migrations,
        locking: embedded.locking,
        no_tx: embedded.no_tx,
        ignore_missing: true,
    }
}
/// Runtime migrator for the state DB; tolerates newer applied migrations.
pub(crate) fn runtime_state_migrator() -> Migrator {
    let embedded = &STATE_MIGRATOR;
    runtime_migrator(embedded)
}
/// Runtime migrator for the logs DB; tolerates newer applied migrations.
pub(crate) fn runtime_logs_migrator() -> Migrator {
    let embedded = &LOGS_MIGRATOR;
    runtime_migrator(embedded)
}

View File

@@ -17,8 +17,8 @@ use crate::ThreadMetadata;
use crate::ThreadMetadataBuilder;
use crate::ThreadsPage;
use crate::apply_rollout_item;
use crate::migrations::LOGS_MIGRATOR;
use crate::migrations::STATE_MIGRATOR;
use crate::migrations::runtime_logs_migrator;
use crate::migrations::runtime_state_migrator;
use crate::model::AgentJobRow;
use crate::model::ThreadRow;
use crate::model::anchor_from_item;
@@ -83,6 +83,8 @@ impl StateRuntime {
/// rest of the state store.
pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result<Arc<Self>> {
tokio::fs::create_dir_all(&codex_home).await?;
let state_migrator = runtime_state_migrator();
let logs_migrator = runtime_logs_migrator();
let current_state_name = state_db_filename();
let current_logs_name = logs_db_filename();
remove_legacy_db_files(
@@ -101,14 +103,14 @@ impl StateRuntime {
.await;
let state_path = state_db_path(codex_home.as_path());
let logs_path = logs_db_path(codex_home.as_path());
let pool = match open_state_sqlite(&state_path, &STATE_MIGRATOR).await {
let pool = match open_state_sqlite(&state_path, &state_migrator).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open state db at {}: {err}", state_path.display());
return Err(err);
}
};
let logs_pool = match open_logs_sqlite(&logs_path, &LOGS_MIGRATOR).await {
let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open logs db at {}: {err}", logs_path.display());
@@ -146,7 +148,7 @@ fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
.log_statements(LevelFilter::Off)
}
async fn open_state_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
.max_connections(5)
@@ -172,7 +174,7 @@ async fn open_state_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::
Ok(pool)
}
async fn open_logs_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
.max_connections(5)
@@ -268,3 +270,74 @@ fn should_remove_db_file(file_name: &str, current_name: &str, base_name: &str) -
};
!version_suffix.is_empty() && version_suffix.chars().all(|ch| ch.is_ascii_digit())
}
#[cfg(test)]
mod tests {
    use super::open_state_sqlite;
    use super::runtime_state_migrator;
    use super::state_db_path;
    use super::test_support::unique_temp_dir;
    use crate::migrations::STATE_MIGRATOR;
    use sqlx::SqlitePool;
    use sqlx::migrate::MigrateError;
    use sqlx::sqlite::SqliteConnectOptions;
    use std::path::Path;

    /// Open a pool against an existing DB file. `create_if_missing(false)`
    /// ensures the test fails loudly instead of silently creating a fresh DB.
    async fn open_db_pool(path: &Path) -> SqlitePool {
        SqlitePool::connect_with(
            SqliteConnectOptions::new()
                .filename(path)
                .create_if_missing(false),
        )
        .await
        .expect("open sqlite pool")
    }

    /// End-to-end check of the "database is ahead of me" case:
    /// 1. apply the current embedded schema,
    /// 2. fake a future migration row in `_sqlx_migrations`,
    /// 3. confirm the strict embedded migrator rejects that row, and
    /// 4. confirm the tolerant runtime migrator opens the DB anyway.
    #[tokio::test]
    async fn open_state_sqlite_tolerates_newer_applied_migrations() {
        let codex_home = unique_temp_dir();
        tokio::fs::create_dir_all(&codex_home)
            .await
            .expect("create codex home");
        let state_path = state_db_path(codex_home.as_path());
        // First open may create the file, so allow creation here only.
        let pool = SqlitePool::connect_with(
            SqliteConnectOptions::new()
                .filename(&state_path)
                .create_if_missing(true),
        )
        .await
        .expect("open state db");
        STATE_MIGRATOR
            .run(&pool)
            .await
            .expect("apply current state schema");
        // Simulate a newer binary having run an extra migration by inserting
        // a record with a version (9999) the embedded set doesn't know about.
        sqlx::query(
            "INSERT INTO _sqlx_migrations (version, description, success, checksum, execution_time) VALUES (?, ?, ?, ?, ?)",
        )
        .bind(9_999_i64)
        .bind("future migration")
        .bind(true)
        .bind(vec![1_u8, 2, 3, 4])
        .bind(1_i64)
        .execute(&pool)
        .await
        .expect("insert future migration record");
        pool.close().await;
        // Strict (embedded) migrator: the unknown applied version must be a
        // hard error, reported as VersionMissing.
        let strict_pool = open_db_pool(state_path.as_path()).await;
        let strict_err = STATE_MIGRATOR
            .run(&strict_pool)
            .await
            .expect_err("strict migrator should reject newer applied migrations");
        assert!(matches!(strict_err, MigrateError::VersionMissing(9_999)));
        strict_pool.close().await;
        // Tolerant runtime migrator (ignore_missing = true): same DB opens fine.
        let tolerant_migrator = runtime_state_migrator();
        let tolerant_pool = open_state_sqlite(state_path.as_path(), &tolerant_migrator)
            .await
            .expect("runtime migrator should tolerate newer applied migrations");
        tolerant_pool.close().await;
        // Best-effort cleanup; failure to remove the temp dir is not a test failure.
        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }
}