feat: add debug clear-memories command to hard-wipe memories state (#13085)

#### what
adds a `codex debug clear-memories` command to help with clearing all
memories state from disk, sqlite db, and marking threads as
`memory_mode=disabled` so they don't get resummarized when the
`memories` feature is re-enabled.

#### tests
add tests
This commit is contained in:
sayan-oai
2026-02-27 17:45:55 -08:00
committed by GitHub
parent 8c1e3f3e64
commit 033ef9cb9d
5 changed files with 344 additions and 0 deletions

View File

@@ -30,6 +30,19 @@ impl StateRuntime {
/// stage-1 (`memory_stage1`) and phase-2 (`memory_consolidate_global`)
/// memory pipelines.
pub async fn clear_memory_data(&self) -> anyhow::Result<()> {
self.clear_memory_data_inner(false).await
}
/// Resets persisted memory state for a clean-slate local start.
///
/// In addition to clearing persisted stage-1 outputs and memory pipeline
/// jobs, this disables memory generation for all existing threads so
/// historical rollouts are not immediately picked up again.
pub async fn reset_memory_data_for_fresh_start(&self) -> anyhow::Result<()> {
self.clear_memory_data_inner(true).await
}
async fn clear_memory_data_inner(&self, disable_existing_threads: bool) -> anyhow::Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
@@ -51,6 +64,18 @@ WHERE kind = ? OR kind = ?
.execute(&mut *tx)
.await?;
if disable_existing_threads {
sqlx::query(
r#"
UPDATE threads
SET memory_mode = 'disabled'
WHERE memory_mode = 'enabled'
"#,
)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
@@ -1158,6 +1183,8 @@ ON CONFLICT(kind, job_key) DO UPDATE SET
#[cfg(test)]
mod tests {
use super::JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL;
use super::JOB_KIND_MEMORY_STAGE1;
use super::StateRuntime;
use super::test_support::test_thread_metadata;
use super::test_support::unique_temp_dir;
@@ -1664,6 +1691,115 @@ mod tests {
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn reset_memory_data_for_fresh_start_clears_rows_and_disables_threads() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let now = Utc::now() - Duration::hours(13);
let worker_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("worker id");
let enabled_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("enabled thread id");
let disabled_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("disabled thread id");
let mut enabled =
test_thread_metadata(&codex_home, enabled_thread_id, codex_home.join("enabled"));
enabled.created_at = now;
enabled.updated_at = now;
runtime
.upsert_thread(&enabled)
.await
.expect("upsert enabled thread");
let claim = runtime
.try_claim_stage1_job(
enabled_thread_id,
worker_id,
enabled.updated_at.timestamp(),
3600,
64,
)
.await
.expect("claim enabled thread");
let ownership_token = match claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected claim outcome: {other:?}"),
};
assert!(
runtime
.mark_stage1_job_succeeded(
enabled_thread_id,
ownership_token.as_str(),
enabled.updated_at.timestamp(),
"raw",
"summary",
None,
)
.await
.expect("mark enabled thread succeeded"),
"stage1 success should be recorded"
);
runtime
.enqueue_global_consolidation(enabled.updated_at.timestamp())
.await
.expect("enqueue global consolidation");
let mut disabled =
test_thread_metadata(&codex_home, disabled_thread_id, codex_home.join("disabled"));
disabled.created_at = now;
disabled.updated_at = now;
runtime
.upsert_thread(&disabled)
.await
.expect("upsert disabled thread");
sqlx::query("UPDATE threads SET memory_mode = 'disabled' WHERE id = ?")
.bind(disabled_thread_id.to_string())
.execute(runtime.pool.as_ref())
.await
.expect("disable existing thread");
runtime
.reset_memory_data_for_fresh_start()
.await
.expect("reset memory data");
let stage1_outputs_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM stage1_outputs")
.fetch_one(runtime.pool.as_ref())
.await
.expect("count stage1 outputs");
assert_eq!(stage1_outputs_count, 0);
let memory_jobs_count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM jobs WHERE kind = ? OR kind = ?")
.bind(JOB_KIND_MEMORY_STAGE1)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
.fetch_one(runtime.pool.as_ref())
.await
.expect("count memory jobs");
assert_eq!(memory_jobs_count, 0);
let enabled_memory_mode: String =
sqlx::query_scalar("SELECT memory_mode FROM threads WHERE id = ?")
.bind(enabled_thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await
.expect("read enabled thread memory mode");
assert_eq!(enabled_memory_mode, "disabled");
let disabled_memory_mode: String =
sqlx::query_scalar("SELECT memory_mode FROM threads WHERE id = ?")
.bind(disabled_thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await
.expect("read disabled thread memory mode");
assert_eq!(disabled_memory_mode, "disabled");
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn claim_stage1_jobs_enforces_global_running_cap() {
let codex_home = unique_temp_dir();