feat: mem v2 - PR1 (#11364)

# Memories migration plan (simplified global workflow)

## Target behavior

- One shared memory root only: `~/.codex/memories/`.
- No per-cwd memory buckets, no cwd hash handling.
- Phase 1 candidate rules (sketched in code after this list):
  - Not currently being processed unless the job lease is stale.
  - Rollout updated within the max-age window (currently 30 days).
  - Rollout idle for at least 12 hours (new constant).
  - Global cap: at most 64 stage-1 jobs in `running` state at any time
    (new invariant).
- Stage-1 model output shape (new):
  - `rollout_slug` (accepted but ignored for now).
  - `rollout_summary`.
  - `raw_memory`.
- Phase-1 artifacts written under the shared root:
  - `rollout_summaries/<thread_id>.md` for each rollout summary.
  - `raw_memories.md` containing appended/merged raw memory paragraphs.
- Phase 2 runs one consolidation agent for the shared `memories/`
directory.
- Phase-2 lock is DB-backed with a 1-hour lease and heartbeat/expiry.
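
A minimal Rust sketch of the phase-1 candidate rules above, assuming an in-memory view of one thread row; apart from `PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS` (introduced in PR 1), every name here is an illustrative stand-in rather than the actual codebase API.

```rust
use chrono::{Duration, Utc};

// Values from the plan; only PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS is a name this
// plan actually introduces, the other identifiers are illustrative stand-ins.
const PHASE_ONE_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
const PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS: i64 = 12;
const PHASE_ONE_MAX_RUNNING_JOBS: i64 = 64;

/// Hypothetical view of one candidate thread; real selection happens in SQL.
struct ThreadCandidate {
    updated_at_epoch: i64,
    lease_until_epoch: Option<i64>, // None = no stage-1 job currently running
}

fn is_phase_one_candidate(t: &ThreadCandidate, fresh_running_jobs: i64) -> bool {
    let now = Utc::now().timestamp();
    let max_age_cutoff = (Utc::now() - Duration::days(PHASE_ONE_MAX_ROLLOUT_AGE_DAYS)).timestamp();
    let idle_cutoff = (Utc::now() - Duration::hours(PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS)).timestamp();
    // Not currently being processed, unless the existing job lease is stale.
    let lease_is_fresh = t.lease_until_epoch.is_some_and(|lease| lease > now);
    !lease_is_fresh
        // Rollout updated within the 30-day max-age window.
        && t.updated_at_epoch >= max_age_cutoff
        // Rollout idle for at least 12 hours.
        && t.updated_at_epoch <= idle_cutoff
        // Global cap: at most 64 fresh stage-1 jobs running at once.
        && fresh_running_jobs < PHASE_ONE_MAX_RUNNING_JOBS
}
```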

## Current code map

- Core startup pipeline: `core/src/memories/startup/mod.rs`.
- Stage-1 request+parse: `core/src/memories/startup/extract.rs`,
`core/src/memories/stage_one.rs`, templates in
`core/templates/memories/`.
- File materialization: `core/src/memories/storage.rs`,
`core/src/memories/layout.rs`.
- Scope routing (cwd/user): `core/src/memories/scope.rs`,
`core/src/memories/startup/mod.rs`.
- DB job lifecycle and scope queueing: `state/src/runtime/memory.rs`.

## PR plan

## PR 1: Correct phase-1 selection invariants (no behavior-breaking layout changes yet)

- Add `PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS: i64 = 12` in
  `core/src/memories/mod.rs`.
- Thread this into `state::claim_stage1_jobs_for_startup(...)`.
- Enforce the idle-time filter in DB selection logic (not only in-memory
  filtering after `scan_limit`) so eligible threads are not starved by
  very recent threads.
- Enforce the global running cap of 64 at claim time in DB logic (sketched
  after this list):
  - Count fresh `memory_stage1` running jobs.
  - Only allow new claims while count < cap.
  - Keep stale-lease takeover behavior intact.
- Add/adjust tests in `state/src/runtime.rs`:
  - Idle-filter inclusion/exclusion around the 12h boundary.
  - Global running-cap guarantee.
  - Existing stale/fresh ownership behavior still passes.
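
A sketch of the claim-time cap check, assuming a sqlx transaction over the existing `jobs` table with the columns used in the diff at the end of this page (`kind`, `status`, `lease_until`); the function shape and parameter names are illustrative.

```rust
use chrono::Utc;
use sqlx::{Sqlite, Transaction};

/// Illustrative gate: count stage-1 jobs whose lease is still fresh and refuse
/// new claims once the global cap (64 in PR 1) is reached. Stale leases are
/// excluded from the count, so stale-lease takeover keeps working.
async fn under_stage1_running_cap(
    tx: &mut Transaction<'_, Sqlite>,
    job_kind: &str,        // e.g. the memory_stage1 job-kind constant
    max_running_jobs: i64, // 64 per this plan
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let fresh_running_jobs: i64 = sqlx::query_scalar(
        r#"
SELECT COUNT(*)
FROM jobs
WHERE kind = ?
  AND status = 'running'
  AND lease_until IS NOT NULL
  AND lease_until > ?
"#,
    )
    .bind(job_kind)
    .bind(now)
    .fetch_one(&mut **tx)
    .await?;
    Ok(fresh_running_jobs < max_running_jobs)
}
```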

Acceptance criteria:
- Startup never creates more than 64 fresh `memory_stage1` running jobs.
- Threads updated <12h ago are skipped.
- Threads older than 30d are skipped.

## PR 2: Stage-1 output contract + storage artifacts (forward-compatible)

- Update parser/types to accept the new structured output while keeping
  backward compatibility (sketched after this list):
  - Add `rollout_slug` (optional for now).
  - Add `rollout_summary`.
  - Keep alias support for legacy `summary` and `rawMemory` until the prompt
    swap completes.
- Update the stage-1 schema generator in `core/src/memories/stage_one.rs` to
  include the new keys.
- Update prompt templates:
  - `core/templates/memories/stage_one_system.md`.
  - `core/templates/memories/stage_one_input.md`.
- Replace the storage model in `core/src/memories/storage.rs`:
  - Introduce a `rollout_summaries/` directory writer (`<thread_id>.md`
    files).
  - Introduce a `raw_memories.md` aggregator writer fed from DB rows.
  - Keep deterministic rebuild behavior from DB outputs so files can
    always be regenerated.
- Update the consolidation prompt template to reference `rollout_summaries/`
  and `raw_memories.md` inputs.
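
A sketch of the backward-compatible output type, assuming stage-1 parsing goes through serde; the real type lives in `core/src/memories/stage_one.rs`, so the struct and test below are illustrative rather than the actual definitions.

```rust
use serde::Deserialize;

/// Illustrative stage-1 output: accepts the new keys while still accepting the
/// legacy `summary` / `rawMemory` keys during the migration window.
#[derive(Debug, Deserialize)]
struct Stage1Output {
    /// New key; optional for now and ignored downstream.
    #[serde(default)]
    rollout_slug: Option<String>,
    /// New key, with the legacy `summary` key accepted as an alias.
    #[serde(alias = "summary")]
    rollout_summary: String,
    /// New key, with the legacy camelCase `rawMemory` key accepted as an alias.
    #[serde(alias = "rawMemory")]
    raw_memory: String,
}

#[cfg(test)]
mod tests {
    use super::Stage1Output;

    #[test]
    fn accepts_old_and_new_keys() {
        let new_style = r#"{"rollout_slug":"fix-login","rollout_summary":"s","raw_memory":"m"}"#;
        let old_style = r#"{"summary":"s","rawMemory":"m"}"#;
        assert!(serde_json::from_str::<Stage1Output>(new_style).is_ok());
        assert!(serde_json::from_str::<Stage1Output>(old_style).is_ok());
    }
}
```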

Acceptance criteria:
- Stage-1 accepts both old and new output keys during migration.
- Phase-1 artifacts are generated in new format from DB state.
- No dependence on per-thread files in `raw_memories/`.

## PR 3: Remove per-cwd memories and move to one global memory root

- Simplify layout in `core/src/memories/layout.rs`:
  - Single root: `codex_home/memories`.
  - Remove cwd-hash bucket helpers and the normalization logic used only for
    memory pathing.
- Remove scope branching from the startup phase-2 dispatch path:
  - No cwd/user mapping in `core/src/memories/startup/mod.rs`.
  - One target root for consolidation.
- In `state/src/runtime/memory.rs`, stop enqueueing/handling the cwd
  consolidation scope.
- Keep one logical consolidation scope/job key (global/user) to avoid a risky
  schema rewrite in the same PR.
- Add a one-time migration helper (core side) to preserve current shared
  memory output (sketched after this list):
  - If `~/.codex/memories/user/memory` exists and the new root is empty,
    move/copy its contents into `~/.codex/memories`.
  - Leave old hashed cwd buckets untouched for now (safe, non-destructive
    migration).
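
A sketch of the migration helper, with one loudly labeled assumption: because the legacy shared location nests inside the new root, "new root is empty" is read here as "no new-layout artifacts exist yet". The function name and artifact checks are illustrative.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Illustrative one-time migration from the legacy user-shared location to the
/// single global memory root. Hashed cwd buckets are deliberately not touched.
fn migrate_user_memory_to_global_root(codex_home: &Path) -> io::Result<()> {
    let new_root = codex_home.join("memories");
    let legacy_root = new_root.join("user").join("memory");
    if !legacy_root.is_dir() {
        return Ok(()); // nothing to migrate
    }
    // Assumed new-layout artifacts (see PR 2); skip if any already exist.
    if new_root.join("raw_memories.md").exists()
        || new_root.join("rollout_summaries").exists()
    {
        return Ok(());
    }
    // Copy top-level files so the legacy tree stays intact (non-destructive).
    for entry in fs::read_dir(&legacy_root)? {
        let entry = entry?;
        if entry.file_type()?.is_file() {
            fs::copy(entry.path(), new_root.join(entry.file_name()))?;
        }
    }
    Ok(())
}
```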

Acceptance criteria:
- New runs only read/write `~/.codex/memories`.
- No new cwd-scoped consolidation jobs are enqueued.
- Existing user-shared memory content is preserved.

## PR 4: Phase-2 global lock simplification and cleanup

- Replace multi-scope dispatch with a single global consolidation claim path
  (sketched after this list):
  - Either reuse the jobs table with one fixed key, or add a tiny dedicated
    lock helper; keep the 1h lease.
  - Ensure at most one consolidation agent can run at once.
- Keep heartbeat + stale-lock recovery semantics in
  `core/src/memories/startup/watch.rs`.
- Remove dead scope code and legacy constants no longer used.
- Update tests:
  - One-agent-at-a-time behavior.
  - Lock expiry allows takeover after a stale lease.
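
A sketch of the "reuse the jobs table with one fixed key" option, assuming a unique index on `(kind, job_key)` and a simplified column set; the kind/key strings and column names are illustrative, and only the 1-hour lease comes from this plan.

```rust
use chrono::Utc;
use sqlx::SqlitePool;

const CONSOLIDATION_LEASE_SECONDS: i64 = 60 * 60; // 1-hour lease, per the plan

/// Illustrative global lock: one fixed (kind, job_key) row. A claim succeeds
/// only if no row exists yet or the existing lease has expired, so at most one
/// consolidation agent holds the lock and a stale lease can be taken over.
async fn try_claim_global_consolidation(
    pool: &SqlitePool,
    worker_id: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let lease_until = now + CONSOLIDATION_LEASE_SECONDS;
    let rows_affected = sqlx::query(
        r#"
INSERT INTO jobs (kind, job_key, status, worker_id, lease_until)
VALUES ('memory_consolidation', 'global', 'running', ?, ?)
ON CONFLICT(kind, job_key) DO UPDATE SET
    status = 'running',
    worker_id = excluded.worker_id,
    lease_until = excluded.lease_until
WHERE jobs.status != 'running' OR jobs.lease_until <= ?
"#,
    )
    .bind(worker_id)
    .bind(lease_until)
    .bind(now)
    .execute(pool)
    .await?
    .rows_affected();
    Ok(rows_affected == 1)
}
```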

Acceptance criteria:
- Exactly one phase-2 consolidation agent can be active cluster-wide
(per local DB).
- Stale lock recovers automatically.

## PR 5: Final cleanup and docs

- Remove legacy artifacts and references:
  - `raw_memories/` and `memory_summary.md` assumptions from
    prompts/comments/tests.
  - Scope constants for cwd memory pathing in core/state, if fully unused.
- Update docs under `docs/` for the memory workflow and directory layout.
- Add a brief operator note for rollout: the compatibility window for old
  stage-1 JSON keys and when to remove the aliases.

Acceptance criteria:
- Code and docs reflect only the simplified global workflow.
- No stale references to per-cwd memory buckets.

## Notes on sequencing

- PR 1 is safest first because it improves correctness without changing
external artifact layout.
- PR 2 keeps parser compatibility so prompt deployment can happen
independently.
- PR 3 and PR 4 split filesystem/scope simplification from locking
simplification to reduce blast radius.
- PR 5 is intentionally cleanup-only.
Commit 07da740c8a (parent a6e9469fa4), authored by jif-oai on
2026-02-10 21:29:06 +00:00 and committed via GitHub.
5 changed files with 322 additions and 74 deletions.

@@ -1,8 +1,10 @@
use super::*;
use crate::Stage1Output;
use crate::model::Stage1OutputRow;
use crate::model::ThreadRow;
use chrono::Duration;
use sqlx::Executor;
use sqlx::QueryBuilder;
use sqlx::Sqlite;
use std::collections::HashSet;
use std::path::Path;
@@ -38,47 +40,87 @@ impl StateRuntime {
pub async fn claim_stage1_jobs_for_startup(
&self,
current_thread_id: ThreadId,
scan_limit: usize,
max_claimed: usize,
max_age_days: i64,
allowed_sources: &[String],
lease_seconds: i64,
params: Stage1StartupClaimParams<'_>,
) -> anyhow::Result<Vec<Stage1JobClaim>> {
let Stage1StartupClaimParams {
scan_limit,
max_claimed,
max_age_days,
min_rollout_idle_hours,
allowed_sources,
lease_seconds,
} = params;
if scan_limit == 0 || max_claimed == 0 {
return Ok(Vec::new());
}
let page = self
.list_threads(
scan_limit,
None,
SortKey::UpdatedAt,
allowed_sources,
None,
false,
)
.await?;
let worker_id = current_thread_id;
let current_thread_id = worker_id.to_string();
let max_age_cutoff = (Utc::now() - Duration::days(max_age_days.max(0))).timestamp();
let idle_cutoff = (Utc::now() - Duration::hours(min_rollout_idle_hours.max(0))).timestamp();
let mut builder = QueryBuilder::<Sqlite>::new(
r#"
SELECT
id,
rollout_path,
created_at,
updated_at,
source,
model_provider,
cwd,
cli_version,
title,
sandbox_policy,
approval_mode,
tokens_used,
first_user_message,
archived_at,
git_sha,
git_branch,
git_origin_url
FROM threads
"#,
);
push_thread_filters(
&mut builder,
false,
allowed_sources,
None,
None,
SortKey::UpdatedAt,
);
builder
.push(" AND id != ")
.push_bind(current_thread_id.as_str());
builder
.push(" AND updated_at >= ")
.push_bind(max_age_cutoff);
builder.push(" AND updated_at <= ").push_bind(idle_cutoff);
push_thread_order_and_limit(&mut builder, SortKey::UpdatedAt, scan_limit);
let items = builder
.build()
.fetch_all(self.pool.as_ref())
.await?
.into_iter()
.map(|row| ThreadRow::try_from_row(&row).and_then(ThreadMetadata::try_from))
.collect::<Result<Vec<_>, _>>()?;
let cutoff = Utc::now() - Duration::days(max_age_days.max(0));
let mut claimed = Vec::new();
for item in page.items {
for item in items {
if claimed.len() >= max_claimed {
break;
}
if item.id == current_thread_id {
continue;
}
if item.updated_at < cutoff {
continue;
}
if let Stage1JobClaimOutcome::Claimed { ownership_token } = self
.try_claim_stage1_job(
item.id,
current_thread_id,
worker_id,
item.updated_at.timestamp(),
lease_seconds,
max_claimed,
)
.await?
{
@@ -202,9 +244,11 @@ LIMIT ?
worker_id: ThreadId,
source_updated_at: i64,
lease_seconds: i64,
max_running_jobs: usize,
) -> anyhow::Result<Stage1JobClaimOutcome> {
let now = Utc::now().timestamp();
let lease_until = now.saturating_add(lease_seconds.max(0));
let max_running_jobs = max_running_jobs as i64;
let ownership_token = Uuid::new_v4().to_string();
let thread_id = thread_id.to_string();
let worker_id = worker_id.to_string();
@@ -241,7 +285,53 @@ WHERE kind = ? AND job_key = ?
.fetch_optional(&mut *tx)
.await?;
let Some(existing_job) = existing_job else {
let should_insert = if let Some(existing_job) = existing_job {
let status: String = existing_job.try_get("status")?;
let existing_lease_until: Option<i64> = existing_job.try_get("lease_until")?;
let retry_at: Option<i64> = existing_job.try_get("retry_at")?;
let retry_remaining: i64 = existing_job.try_get("retry_remaining")?;
if retry_remaining <= 0 {
tx.commit().await?;
return Ok(Stage1JobClaimOutcome::SkippedRetryExhausted);
}
if retry_at.is_some_and(|retry_at| retry_at > now) {
tx.commit().await?;
return Ok(Stage1JobClaimOutcome::SkippedRetryBackoff);
}
if status == "running"
&& existing_lease_until.is_some_and(|lease_until| lease_until > now)
{
tx.commit().await?;
return Ok(Stage1JobClaimOutcome::SkippedRunning);
}
false
} else {
true
};
let fresh_running_jobs = sqlx::query(
r#"
SELECT COUNT(*) AS count
FROM jobs
WHERE kind = ?
AND status = 'running'
AND lease_until IS NOT NULL
AND lease_until > ?
"#,
)
.bind(JOB_KIND_MEMORY_STAGE1)
.bind(now)
.fetch_one(&mut *tx)
.await?
.try_get::<i64, _>("count")?;
if fresh_running_jobs >= max_running_jobs {
tx.commit().await?;
return Ok(Stage1JobClaimOutcome::SkippedRunning);
}
if should_insert {
sqlx::query(
r#"
INSERT INTO jobs (
@@ -273,25 +363,6 @@ INSERT INTO jobs (
.await?;
tx.commit().await?;
return Ok(Stage1JobClaimOutcome::Claimed { ownership_token });
};
let status: String = existing_job.try_get("status")?;
let existing_lease_until: Option<i64> = existing_job.try_get("lease_until")?;
let retry_at: Option<i64> = existing_job.try_get("retry_at")?;
let retry_remaining: i64 = existing_job.try_get("retry_remaining")?;
if retry_remaining <= 0 {
tx.commit().await?;
return Ok(Stage1JobClaimOutcome::SkippedRetryExhausted);
}
if retry_at.is_some_and(|retry_at| retry_at > now) {
tx.commit().await?;
return Ok(Stage1JobClaimOutcome::SkippedRetryBackoff);
}
if status == "running" && existing_lease_until.is_some_and(|lease_until| lease_until > now)
{
tx.commit().await?;
return Ok(Stage1JobClaimOutcome::SkippedRunning);
}
let rows_affected = sqlx::query(