Compare commits

...

1 Commit

Author SHA1 Message Date
Charles Cunningham
de6fb6767c Cache state DB runtime for repeated lookups
Reuse a cached StateRuntime for ad hoc core state DB lookups, but keep state_db::init() on its own runtime so session-owned work does not share a single SQLite pool. Allow no-provider open_if_present() callers to initialize and cache an ad hoc runtime so rollout path fallback and DB read-repair still work, and treat empty-provider cached runtimes as reusable for later ad hoc lookups. Prune dead cached runtimes on insert so one-off sqlite homes do not accumulate stale cache keys for the process lifetime.

Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:43:59 -07:00
2 changed files with 169 additions and 23 deletions

View File

@@ -14,33 +14,36 @@ use codex_protocol::protocol::SessionSource;
pub use codex_state::LogEntry;
use codex_state::ThreadMetadataBuilder;
use serde_json::Value;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::Mutex as StdMutex;
use std::sync::Weak;
use tracing::warn;
use uuid::Uuid;
/// Core-facing handle to the SQLite-backed state runtime.
pub type StateDbHandle = Arc<codex_state::StateRuntime>;
/// One entry of the process-wide state runtime cache.
///
/// The entry does not own the runtime: it only remembers which provider the runtime was
/// created for and keeps a weak reference that can be upgraded while some other owner
/// still holds the runtime.
#[derive(Clone)]
struct CachedStateDb {
/// Provider id passed when the runtime was created. `cached_runtime` treats an empty
/// string here as compatible with any requested provider.
default_provider: String,
/// Weak reference to the runtime; upgrading fails once every strong handle is dropped.
runtime: Weak<codex_state::StateRuntime>,
}
/// Process-wide cache of state runtimes, keyed by the home directory each runtime was
/// created for.
///
/// Entries hold only weak references, so the cache alone never keeps a runtime alive. The
/// map starts out empty and is built lazily on first access.
static STATE_DB_CACHE: LazyLock<StdMutex<HashMap<PathBuf, CachedStateDb>>> =
    LazyLock::new(Default::default);
/// Initialize the state runtime for thread state persistence and backfill checks. To only be used
/// inside `core`. The initialization should not be done anywhere else.
pub(crate) async fn init(config: &Config) -> Option<StateDbHandle> {
let runtime = match codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
let runtime = create_runtime(
config.sqlite_home.as_path(),
config.model_provider_id.as_str(),
)
.await
{
Ok(runtime) => runtime,
Err(err) => {
warn!(
"failed to initialize state runtime at {}: {err}",
config.sqlite_home.display()
);
return None;
}
};
.await?;
let backfill_state = match runtime.get_backfill_state().await {
Ok(state) => state,
Err(err) => {
@@ -67,12 +70,11 @@ pub async fn get_state_db(config: &Config) -> Option<StateDbHandle> {
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
return None;
}
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
let runtime = get_or_init_runtime(
config.sqlite_home.as_path(),
config.model_provider_id.as_str(),
)
.await
.ok()?;
.await?;
require_backfill_complete(runtime, config.sqlite_home.as_path()).await
}
@@ -84,13 +86,73 @@ pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Optio
if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
return None;
}
let runtime =
codex_state::StateRuntime::init(codex_home.to_path_buf(), default_provider.to_string())
.await
.ok()?;
let runtime = get_or_init_runtime(codex_home, default_provider).await?;
require_backfill_complete(runtime, codex_home).await
}
/// Looks up a still-alive cached runtime for `codex_home`.
///
/// Returns `None` when:
/// - nothing is cached for `codex_home`;
/// - the cached runtime has already been dropped (the dead entry is evicted on the way out);
/// - both the requested and the cached provider are non-empty and differ.
///
/// A missing or empty provider on either side is treated as compatible.
fn cached_runtime(codex_home: &Path, default_provider: Option<&str>) -> Option<StateDbHandle> {
    // A poisoned lock is recovered instead of propagated.
    let mut guard = STATE_DB_CACHE
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    let entry = guard.get(codex_home)?;
    let Some(live) = entry.runtime.upgrade() else {
        // Every strong handle is gone, so the entry is useless: drop it from the map.
        guard.remove(codex_home);
        return None;
    };

    let provider_conflict = default_provider.is_some_and(|requested| {
        !requested.is_empty()
            && !entry.default_provider.is_empty()
            && entry.default_provider != requested
    });

    if provider_conflict { None } else { Some(live) }
}
/// Drops every cache entry whose runtime no longer has any strong handle.
fn prune_stale_cached_runtimes(cache: &mut HashMap<PathBuf, CachedStateDb>) {
    cache.retain(|_home, entry| entry.runtime.strong_count() != 0);
}
/// Returns the cached runtime for `codex_home` when `cached_runtime` accepts it for
/// `default_provider`; otherwise creates a fresh runtime through `create_runtime`.
///
/// Returns `None` only when a fresh runtime had to be created and that creation failed.
async fn get_or_init_runtime(codex_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
    match cached_runtime(codex_home, Some(default_provider)) {
        Some(existing) => Some(existing),
        None => create_runtime(codex_home, default_provider).await,
    }
}
/// Initializes a new state runtime for `codex_home` and records it in the cache.
///
/// On initialization failure a warning is logged and `None` is returned; the cache is left
/// untouched. On success, stale cache entries are pruned and the entry for `codex_home` is
/// inserted (replacing any previous one) before the new handle is returned.
async fn create_runtime(codex_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
    let handle = codex_state::StateRuntime::init(
        codex_home.to_path_buf(),
        default_provider.to_string(),
    )
    .await
    .inspect_err(|err| {
        warn!(
            "failed to initialize state runtime at {}: {err}",
            codex_home.display()
        );
    })
    .ok()?;

    let entry = CachedStateDb {
        default_provider: default_provider.to_string(),
        runtime: Arc::downgrade(&handle),
    };

    {
        // The lock is taken only after the await above; a poisoned lock is recovered.
        let mut guard = STATE_DB_CACHE
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        prune_stale_cached_runtimes(&mut guard);
        guard.insert(codex_home.to_path_buf(), entry);
    }

    Some(handle)
}
async fn require_backfill_complete(
runtime: StateDbHandle,
codex_home: &Path,

View File

@@ -1,6 +1,8 @@
use super::*;
use crate::config::test_config;
use crate::rollout::list::parse_cursor;
use pretty_assertions::assert_eq;
use std::sync::Arc;
#[test]
fn cursor_to_anchor_normalizes_timestamp_format() {
@@ -19,3 +21,85 @@ fn cursor_to_anchor_normalizes_timestamp_format() {
assert_eq!(anchor.id, uuid);
assert_eq!(anchor.ts, expected_ts);
}
/// A runtime created by `init` must be the same allocation that `get_state_db` returns later.
#[tokio::test]
async fn get_state_db_reuses_cached_runtime() {
    let cfg = test_config();

    let session_runtime = init(&cfg).await.expect("initialize state db");
    // `get_state_db` passes the runtime through `require_backfill_complete`, so mark the
    // backfill complete before looking it up.
    session_runtime
        .mark_backfill_complete(None)
        .await
        .expect("mark backfill complete");

    let looked_up = get_state_db(&cfg).await.expect("get cached runtime");

    assert!(Arc::ptr_eq(&session_runtime, &looked_up));
}
/// Two `init` calls for the same config must not hand back the same runtime allocation.
#[tokio::test]
async fn init_creates_distinct_runtime_for_session_owned_pool() {
    let cfg = test_config();

    let earlier = init(&cfg).await.expect("initialize first state db");
    earlier
        .mark_backfill_complete(None)
        .await
        .expect("mark backfill complete");

    // `earlier` is still alive here, so a shared handle would be pointer-equal.
    let later = init(&cfg).await.expect("initialize second state db");

    assert!(!Arc::ptr_eq(&earlier, &later));
}
/// `open_if_present` with an empty provider must create a runtime that a later
/// `get_state_db` call (with the config's provider) reuses.
#[tokio::test]
async fn open_if_present_without_provider_initializes_and_caches_runtime() {
    let cfg = test_config();

    // Seed the database directly, bypassing the cache, then drop the handle so only the
    // on-disk state remains.
    {
        let seed = codex_state::StateRuntime::init(
            cfg.sqlite_home.clone(),
            cfg.model_provider_id.clone(),
        )
        .await
        .expect("initialize seeded runtime");
        seed.mark_backfill_complete(None)
            .await
            .expect("mark backfill complete");
        drop(seed);
    }

    let first_open = open_if_present(cfg.sqlite_home.as_path(), "")
        .await
        .expect("initialize uncached runtime");
    let second_open = get_state_db(&cfg)
        .await
        .expect("reuse initialized runtime");

    assert!(Arc::ptr_eq(&first_open, &second_open));
}
/// Creating a runtime must evict cache entries whose runtime has already been dropped.
#[tokio::test]
async fn create_runtime_prunes_stale_cache_entries() {
    let stale_cfg = test_config();
    let live_cfg = test_config();

    // Create the first runtime and drop it immediately, leaving a dead weak entry behind.
    drop(
        create_runtime(
            stale_cfg.sqlite_home.as_path(),
            stale_cfg.model_provider_id.as_str(),
        )
        .await
        .expect("initialize first runtime"),
    );

    // Keep this handle alive until the assertions so its entry stays valid.
    let _live_runtime = create_runtime(
        live_cfg.sqlite_home.as_path(),
        live_cfg.model_provider_id.as_str(),
    )
    .await
    .expect("initialize second runtime");

    let snapshot = STATE_DB_CACHE
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    assert!(!snapshot.contains_key(stale_cfg.sqlite_home.as_path()));
    assert!(snapshot.contains_key(live_cfg.sqlite_home.as_path()));
}