Compare commits

...

3 Commits

Author SHA1 Message Date
jif-oai
9aaa6311ef nit 2 2026-02-17 18:23:58 +00:00
jif-oai
2f90b473a3 nit 2026-02-17 18:12:45 +00:00
jif-oai
c9a9c2028a feat: add phase 2 cooldown 2026-02-17 18:04:27 +00:00
2 changed files with 93 additions and 1 deletions

View File

@@ -2020,6 +2020,14 @@ WHERE kind = 'memory_stage1'
.await
.expect("enqueue global consolidation again");
sqlx::query("UPDATE jobs SET started_at = ? WHERE kind = ? AND job_key = ?")
.bind(Utc::now().timestamp() - (2 * 3_600) - 1)
.bind("memory_consolidate_global")
.bind("global")
.execute(runtime.pool.as_ref())
.await
.expect("set phase2 started_at outside cooldown");
let claim_rerun = runtime
.try_claim_global_phase2_job(owner, 3600)
.await
@@ -2032,6 +2040,70 @@ WHERE kind = 'memory_stage1'
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Exercises the phase-2 claim cooldown on the global consolidation job.
///
/// Flow: claim and successfully finish one phase-2 run, enqueue new work,
/// check that an immediate re-claim is skipped, then rewind the job's
/// `started_at` and check that the claim goes through again.
#[tokio::test]
async fn phase2_global_consolidation_claim_respects_cooldown() {
    let codex_home = unique_temp_dir();
    let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
        .await
        .expect("initialize runtime");
    let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");

    // First cycle: enqueue, claim, and mark the run as succeeded.
    runtime
        .enqueue_global_consolidation(100)
        .await
        .expect("enqueue global consolidation");
    let first_claim = runtime
        .try_claim_global_phase2_job(owner, 3600)
        .await
        .expect("claim phase2");
    let (token, watermark) = match first_claim {
        Phase2JobClaimOutcome::Claimed {
            ownership_token,
            input_watermark,
        } => (ownership_token, input_watermark),
        other => panic!("unexpected phase2 claim outcome: {other:?}"),
    };
    let finalized = runtime
        .mark_global_phase2_job_succeeded(token.as_str(), watermark)
        .await
        .expect("mark phase2 succeeded");
    assert!(
        finalized,
        "phase2 success should finalize for current token"
    );

    // Second cycle: new input is enqueued right away, but the claim is
    // expected to be skipped because the previous run started moments ago.
    runtime
        .enqueue_global_consolidation(101)
        .await
        .expect("enqueue global consolidation again");
    let claim_during_cooldown = runtime
        .try_claim_global_phase2_job(owner, 3600)
        .await
        .expect("claim phase2 during cooldown");
    assert_eq!(claim_during_cooldown, Phase2JobClaimOutcome::SkippedNotDirty);

    // Rewind `started_at` to one second more than two hours ago, which this
    // test treats as outside the cooldown window.
    let rewound_started_at = Utc::now().timestamp() - (2 * 3_600) - 1;
    sqlx::query("UPDATE jobs SET started_at = ? WHERE kind = ? AND job_key = ?")
        .bind(rewound_started_at)
        .bind("memory_consolidate_global")
        .bind("global")
        .execute(runtime.pool.as_ref())
        .await
        .expect("set phase2 started_at outside cooldown");

    // With the start time rewound, the same owner can claim again.
    let claim_post_cooldown = runtime
        .try_claim_global_phase2_job(owner, 3600)
        .await
        .expect("claim phase2 after cooldown");
    assert!(
        matches!(claim_post_cooldown, Phase2JobClaimOutcome::Claimed { .. }),
        "phase2 should claim after cooldown expires"
    );

    // Best-effort cleanup; a failure to remove the temp dir is ignored.
    let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn list_stage1_outputs_for_global_returns_latest_outputs() {
let codex_home = unique_temp_dir();
@@ -2419,6 +2491,14 @@ VALUES (?, ?, ?, ?, ?)
.await
.expect("enqueue backfilled consolidation");
sqlx::query("UPDATE jobs SET started_at = ? WHERE kind = ? AND job_key = ?")
.bind(Utc::now().timestamp() - (2 * 3_600) - 1)
.bind("memory_consolidate_global")
.bind("global")
.execute(runtime.pool.as_ref())
.await
.expect("set phase2 started_at outside cooldown");
let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner b");
let claim_b = runtime
.try_claim_global_phase2_job(owner_b, 3_600)

View File

@@ -16,6 +16,7 @@ const JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL: &str = "memory_consolidate_global";
/// `job_key` value for the global memory-consolidation job row.
/// NOTE(review): presumably there is exactly one such row — confirm.
const MEMORY_CONSOLIDATION_JOB_KEY: &str = "global";
/// NOTE(review): presumably the initial `retry_remaining` budget given to a
/// job; its use is not visible here — confirm.
const DEFAULT_RETRY_REMAINING: i64 = 3;
/// Cooldown window, in seconds (two hours), applied when claiming the global
/// phase-2 job: a job that is not `running`, has no `last_error`, and whose
/// `started_at` is more recent than `now - PHASE2_CLAIM_COOLDOWN_SECONDS` is
/// not claimed and the claim returns `Phase2JobClaimOutcome::SkippedNotDirty`.
const PHASE2_CLAIM_COOLDOWN_SECONDS: i64 = 2 * 3_600;
impl StateRuntime {
/// Deletes all persisted memory state in one transaction.
@@ -640,7 +641,7 @@ WHERE kind = ? AND job_key = ?
let existing_job = sqlx::query(
r#"
SELECT status, lease_until, retry_at, retry_remaining, input_watermark, last_success_watermark
SELECT status, lease_until, retry_at, retry_remaining, input_watermark, last_success_watermark, started_at, last_error
FROM jobs
WHERE kind = ? AND job_key = ?
"#,
@@ -667,6 +668,8 @@ WHERE kind = ? AND job_key = ?
let existing_lease_until: Option<i64> = existing_job.try_get("lease_until")?;
let retry_at: Option<i64> = existing_job.try_get("retry_at")?;
let retry_remaining: i64 = existing_job.try_get("retry_remaining")?;
let started_at: Option<i64> = existing_job.try_get("started_at")?;
let last_error: Option<String> = existing_job.try_get("last_error")?;
if retry_remaining <= 0 {
tx.commit().await?;
@@ -681,6 +684,15 @@ WHERE kind = ? AND job_key = ?
tx.commit().await?;
return Ok(Phase2JobClaimOutcome::SkippedRunning);
}
if status != "running"
&& last_error.is_none()
&& started_at.is_some_and(|started_at| {
started_at > now.saturating_sub(PHASE2_CLAIM_COOLDOWN_SECONDS)
})
{
tx.commit().await?;
return Ok(Phase2JobClaimOutcome::SkippedNotDirty);
}
let rows_affected = sqlx::query(
r#"