Compare commits

...

3 Commits

Author SHA1 Message Date
jif-oai
de11fbb068 add tests 2026-02-25 16:36:55 +00:00
jif-oai
fa7c298f68 Merge remote-tracking branch 'origin/main' into jif/use-last-used 2026-02-25 16:26:59 +00:00
jif-oai
a0e7beaa1f feat: used memory frequency usage 2026-02-25 15:06:04 +00:00
13 changed files with 962 additions and 17 deletions

View File

@@ -630,6 +630,11 @@
"minimum": 0.0,
"type": "integer"
},
"max_unused_days": {
"description": "Maximum age of a phase-1 memory's last use before phase 2 stops selecting it.",
"format": "int64",
"type": "integer"
},
"min_rollout_idle_hours": {
"description": "Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.",
"format": "int64",

View File

@@ -2466,6 +2466,7 @@ persistence = "none"
let memories = r#"
[memories]
max_raw_memories_for_global = 512
max_unused_days = 21
max_rollout_age_days = 42
max_rollouts_per_startup = 9
min_rollout_idle_hours = 24
@@ -2477,6 +2478,7 @@ phase_2_model = "gpt-5"
assert_eq!(
Some(MemoriesToml {
max_raw_memories_for_global: Some(512),
max_unused_days: Some(21),
max_rollout_age_days: Some(42),
max_rollouts_per_startup: Some(9),
min_rollout_idle_hours: Some(24),
@@ -2496,6 +2498,7 @@ phase_2_model = "gpt-5"
config.memories,
MemoriesConfig {
max_raw_memories_for_global: 512,
max_unused_days: 21,
max_rollout_age_days: 42,
max_rollouts_per_startup: 9,
min_rollout_idle_hours: 24,

View File

@@ -27,6 +27,7 @@ pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16;
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024;
pub const DEFAULT_MEMORIES_MAX_UNUSED_DAYS: i64 = 30;
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "kebab-case")]
@@ -363,6 +364,8 @@ pub struct FeedbackConfigToml {
pub struct MemoriesToml {
/// Maximum number of recent raw memories retained for global consolidation.
pub max_raw_memories_for_global: Option<usize>,
/// Maximum age of a phase-1 memory's last use before phase 2 stops selecting it.
pub max_unused_days: Option<i64>,
/// Maximum age of the threads used for memories.
pub max_rollout_age_days: Option<i64>,
/// Maximum number of rollout candidates processed per pass.
@@ -379,6 +382,7 @@ pub struct MemoriesToml {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoriesConfig {
pub max_raw_memories_for_global: usize,
pub max_unused_days: i64,
pub max_rollout_age_days: i64,
pub max_rollouts_per_startup: usize,
pub min_rollout_idle_hours: i64,
@@ -390,6 +394,7 @@ impl Default for MemoriesConfig {
fn default() -> Self {
Self {
max_raw_memories_for_global: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
max_unused_days: DEFAULT_MEMORIES_MAX_UNUSED_DAYS,
max_rollout_age_days: DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS,
max_rollouts_per_startup: DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
min_rollout_idle_hours: DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS,
@@ -407,6 +412,10 @@ impl From<MemoriesToml> for MemoriesConfig {
.max_raw_memories_for_global
.unwrap_or(defaults.max_raw_memories_for_global)
.min(4096),
max_unused_days: toml
.max_unused_days
.unwrap_or(defaults.max_unused_days)
.clamp(0, 365),
max_rollout_age_days: toml
.max_rollout_age_days
.unwrap_or(defaults.max_rollout_age_days)

View File

@@ -59,10 +59,15 @@ Phase 2 consolidates the latest stage-1 outputs into the filesystem memory artif
What it does:
- claims a single global phase-2 job (so only one consolidation runs at a time)
- loads a bounded set of the most recent stage-1 outputs from the state DB (the per-rollout memories produced by Phase 1, used as the consolidation input set)
- loads a bounded set of stage-1 outputs from the state DB using phase-2 selection rules:
- ignore memories that have not been used within the configured recency window (`max_unused_days`)
- for memories never used before, fall back to `generated_at` so fresh memories can still be selected
- rank eligible memories by `usage_count` first, then by the most recent `last_usage` / `generated_at`
- stores that selected set back in SQLite via `selected_for_phase2`, clearing the flag for all non-selected rows
- logs the diff against the prior `selected_for_phase2` set so added/removed rollout summaries are visible between runs
- computes a completion watermark from the claimed watermark + newest input timestamps
- syncs local memory artifacts under the memories root:
- `raw_memories.md` (merged raw memories, latest first)
- `raw_memories.md` (merged raw memories in phase-2 selection order)
- `rollout_summaries/` (one summary file per retained rollout)
- prunes stale rollout summaries that are no longer retained
- if there are no inputs, marks the job successful and exits

View File

@@ -8,6 +8,7 @@ use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::rollout_summary_file_stem;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use codex_config::Constrained;
use codex_protocol::ThreadId;
@@ -19,6 +20,7 @@ use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use codex_state::StateRuntime;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
@@ -73,14 +75,22 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
};
// 3. Query the memories
let raw_memories = match db.list_stage1_outputs_for_global(max_raw_memories).await {
Ok(memories) => memories,
let phase2_selection = match db
.select_stage1_outputs_for_phase2(max_raw_memories, config.memories.max_unused_days)
.await
{
Ok(selection) => selection,
Err(err) => {
tracing::error!("failed to list stage1 outputs from global: {}", err);
tracing::error!("failed to select stage1 outputs for phase 2: {err}");
job::failed(session, db, &claim, "failed_load_stage1_outputs").await;
return;
}
};
log_selection_diff(
&phase2_selection.previously_selected,
&phase2_selection.selected,
);
let raw_memories = phase2_selection.selected;
let new_watermark = get_watermark(claim.watermark, &raw_memories);
// 4. Update the file system by syncing the raw memories with the one extracted from DB at
@@ -400,6 +410,35 @@ pub(super) fn get_watermark(
.max(claimed_watermark) // todo double check the claimed here.
}
/// Logs which rollout-summary files entered or left the phase-2 selection
/// between consecutive consolidation runs.
fn log_selection_diff(
    previous: &[codex_state::Stage1Output],
    current: &[codex_state::Stage1Output],
) {
    // Key both sets by the rollout-summary filename so the logged diff lines
    // up with the artifacts written under `rollout_summaries/`.
    let summary_names = |memories: &[codex_state::Stage1Output]| {
        memories
            .iter()
            .map(|memory| format!("{}.md", rollout_summary_file_stem(memory)))
            .collect::<BTreeSet<_>>()
    };
    let before = summary_names(previous);
    let after = summary_names(current);
    let added: Vec<_> = after.difference(&before).cloned().collect();
    let removed: Vec<_> = before.difference(&after).cloned().collect();
    tracing::info!(
        selected = current.len(),
        added = ?added,
        removed = ?removed,
        "updated stage-1 outputs selected for phase 2"
    );
}
fn emit_metrics(session: &Arc<Session>, counters: Counters) {
let otel = session.services.otel_manager.clone();
if counters.input > 0 {

View File

@@ -72,7 +72,7 @@ async fn rebuild_raw_memories_file(
return tokio::fs::write(raw_memories_file(root), body).await;
}
body.push_str("Merged stage-1 raw memories (latest first):\n\n");
body.push_str("Merged stage-1 raw memories (phase-2 selection order):\n\n");
for memory in retained {
writeln!(body, "## Thread `{}`", memory.thread_id).map_err(raw_memories_format_error)?;
writeln!(

View File

@@ -13,7 +13,6 @@ use crate::error::Result;
use crate::function_tool::FunctionCallError;
use crate::memories::citations::get_thread_id_from_citations;
use crate::parse_turn_item;
use crate::state_db;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::router::ToolRouter;
use codex_protocol::models::FunctionCallOutputBody;
@@ -58,13 +57,10 @@ pub(crate) async fn record_completed_response_item(
) {
sess.record_conversation_items(turn_context, std::slice::from_ref(item))
.await;
record_stage1_output_usage_for_completed_item(turn_context, item).await;
record_stage1_output_usage_for_completed_item(sess, item).await;
}
async fn record_stage1_output_usage_for_completed_item(
turn_context: &TurnContext,
item: &ResponseItem,
) {
async fn record_stage1_output_usage_for_completed_item(sess: &Session, item: &ResponseItem) {
let Some(raw_text) = raw_assistant_output_text_from_item(item) else {
return;
};
@@ -75,7 +71,7 @@ async fn record_stage1_output_usage_for_completed_item(
return;
}
if let Some(db) = state_db::get_state_db(turn_context.config.as_ref(), None).await {
if let Some(db) = sess.services.state_db.as_deref() {
let _ = db.record_stage1_output_usage(&thread_ids).await;
}
}

View File

@@ -1,4 +1,6 @@
use anyhow::Result;
use chrono::DateTime;
use chrono::Utc;
use codex_core::features::Feature;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
@@ -10,11 +12,14 @@ use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::UserMessageEvent;
use core_test_support::responses;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
use serde_json::json;
@@ -253,6 +258,117 @@ async fn user_messages_persist_in_state_db() -> Result<()> {
Ok(())
}
/// End-to-end check that an assistant reply containing an `<oai-mem-citation>`
/// block records usage for the cited thread, re-dirties the global phase-2
/// job, and reorders the phase-2 selection (limit 1) in favor of the cited
/// memory.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn assistant_memory_citations_update_usage_and_reorder_phase2_selection() -> Result<()> {
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::Sqlite);
});
let test = builder.build(&server).await?;
let db = test.codex.state_db().expect("state db enabled");
let owner = test.session_configured.session_id;
let cited_thread = ThreadId::new();
let uncited_thread = ThreadId::new();
// Seed two never-used memories; `uncited_thread` has the newer
// source_updated_at (200 vs 100) and should win the initial tiebreak.
seed_stage1_output(&test, cited_thread, owner, "workspace-cited", 100).await?;
seed_stage1_output(&test, uncited_thread, owner, "workspace-uncited", 200).await?;
let initial_selection = db
.select_stage1_outputs_for_phase2(1, test.config.memories.max_unused_days)
.await?;
assert_eq!(
initial_selection
.selected
.iter()
.map(|memory| memory.thread_id)
.collect::<Vec<_>>(),
vec![uncited_thread]
);
// Clear the phase-2 dirty flag so a later successful claim proves the
// citation alone re-enqueued the job.
mark_phase2_clean(db.as_ref(), owner).await?;
let assistant_text = format!(
"Using a prior memory.\n<oai-mem-citation>\n<citation_entries>\nrollout_summaries/test.md:1-2|note=[integration]\n</citation_entries>\n<rollout_ids>\n{cited_thread}\n</rollout_ids>\n</oai-mem-citation>"
);
mount_sse_once(
&server,
responses::sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", &assistant_text),
ev_completed("resp-1"),
]),
)
.await;
// Driving the turn makes the session process the mocked assistant message,
// which carries the citation for `cited_thread`.
test.submit_turn("answer using memory").await?;
let usage_only_claim = db.try_claim_global_phase2_job(owner, 3600).await?;
let (ownership_token, input_watermark) = match usage_only_claim {
codex_state::Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => (ownership_token, input_watermark),
other => panic!("expected usage-only phase2 claim, got {other:?}"),
};
assert!(
db.mark_global_phase2_job_succeeded(ownership_token.as_str(), input_watermark)
.await?,
"usage-only citation should re-dirty phase 2"
);
// With usage recorded, the cited thread now outranks the uncited one.
let updated_selection = db
.select_stage1_outputs_for_phase2(1, test.config.memories.max_unused_days)
.await?;
assert_eq!(
updated_selection
.selected
.iter()
.map(|memory| memory.thread_id)
.collect::<Vec<_>>(),
vec![cited_thread]
);
Ok(())
}
/// Verifies that an assistant reply without any `<oai-mem-citation>` block
/// does not re-enqueue the global phase-2 job once it has been marked clean.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn assistant_messages_without_memory_citations_do_not_redirty_phase2() -> Result<()> {
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::Sqlite);
});
let test = builder.build(&server).await?;
let db = test.codex.state_db().expect("state db enabled");
let owner = test.session_configured.session_id;
let thread_id = ThreadId::new();
seed_stage1_output(&test, thread_id, owner, "workspace-plain", 100).await?;
// Clear the dirty state left behind by seeding, so any later claim could
// only come from the assistant turn below.
mark_phase2_clean(db.as_ref(), owner).await?;
mount_sse_once(
&server,
responses::sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "No memory references here."),
ev_completed("resp-1"),
]),
)
.await;
test.submit_turn("answer without memory").await?;
// A citation-free reply must leave phase 2 clean: the claim is skipped.
let phase2_claim = db.try_claim_global_phase2_job(owner, 3600).await?;
assert!(
matches!(
phase2_claim,
codex_state::Phase2JobClaimOutcome::SkippedNotDirty
),
"non-citation assistant output should not enqueue phase 2"
);
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn tool_call_logs_include_thread_id() -> Result<()> {
let server = start_mock_server().await;
@@ -326,3 +442,80 @@ async fn tool_call_logs_include_thread_id() -> Result<()> {
Ok(())
}
/// Seeds a completed stage-1 output for `thread_id`: upserts thread metadata
/// with `source_updated_at` as its timestamp, then runs the real
/// claim -> succeed stage-1 job flow so the row lands in `stage1_outputs`.
async fn seed_stage1_output(
test: &TestCodex,
thread_id: ThreadId,
owner: ThreadId,
workspace_name: &str,
source_updated_at: i64,
) -> Result<()> {
let db = test.codex.state_db().expect("state db enabled");
let metadata = seeded_thread_metadata(
test,
thread_id,
workspace_name,
DateTime::<Utc>::from_timestamp(source_updated_at, 0).expect("timestamp"),
);
db.upsert_thread(&metadata).await?;
// Claim the stage-1 job exactly as production code would, so the seeded
// output goes through the same persistence path.
let claim = db
.try_claim_stage1_job(thread_id, owner, source_updated_at, 3600, 64)
.await?;
let ownership_token = match claim {
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("expected stage1 claim, got {other:?}"),
};
assert!(
db.mark_stage1_job_succeeded(
thread_id,
ownership_token.as_str(),
source_updated_at,
"raw memory",
"rollout summary",
None,
)
.await?,
"stage1 success should persist output"
);
Ok(())
}
/// Claims and immediately completes the global phase-2 job so its dirty flag
/// is cleared before a test exercises the behavior under scrutiny.
async fn mark_phase2_clean(db: &codex_state::StateRuntime, owner: ThreadId) -> Result<()> {
    let outcome = db.try_claim_global_phase2_job(owner, 3600).await?;
    let (token, watermark) = match outcome {
        codex_state::Phase2JobClaimOutcome::Claimed {
            ownership_token,
            input_watermark,
        } => (ownership_token, input_watermark),
        other => panic!("expected phase2 claim, got {other:?}"),
    };
    let cleared = db
        .mark_global_phase2_job_succeeded(token.as_str(), watermark)
        .await?;
    assert!(cleared, "phase2 success should clear dirty state");
    Ok(())
}
/// Builds thread metadata for a seeded rollout, rooting both the rollout file
/// and the workspace cwd under the test codex home.
fn seeded_thread_metadata(
    test: &TestCodex,
    thread_id: ThreadId,
    workspace_name: &str,
    timestamp: DateTime<Utc>,
) -> codex_state::ThreadMetadata {
    let home = &test.config.codex_home;
    let rollout_path = home.join(format!("sessions/seed-{thread_id}.jsonl"));
    let mut metadata = codex_state::ThreadMetadataBuilder::new(
        thread_id,
        rollout_path,
        timestamp,
        SessionSource::default(),
    );
    metadata.updated_at = Some(timestamp);
    metadata.cwd = home.join(workspace_name);
    metadata.cli_version = Some("test".to_string());
    metadata.build(&test.config.model_provider_id)
}

View File

@@ -14,6 +14,7 @@ mod runtime;
pub use model::LogEntry;
pub use model::LogQuery;
pub use model::LogRow;
pub use model::Phase2InputSelection;
pub use model::Phase2JobClaimOutcome;
/// Preferred entrypoint: owns configuration and metrics.
pub use runtime::StateRuntime;

View File

@@ -20,6 +20,13 @@ pub struct Stage1Output {
pub generated_at: DateTime<Utc>,
}
/// Phase-2 memory selection result for global consolidation.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Phase2InputSelection {
/// Stage-1 outputs chosen for this consolidation pass, in ranking order.
pub selected: Vec<Stage1Output>,
/// Snapshot of the prior `selected_for_phase2 = 1` set, taken before it was
/// replaced; used for diff logging between runs.
pub previously_selected: Vec<Stage1Output>,
}
#[derive(Debug)]
pub(crate) struct Stage1OutputRow {
thread_id: String,

View File

@@ -16,6 +16,7 @@ pub use backfill_state::BackfillStatus;
pub use log::LogEntry;
pub use log::LogQuery;
pub use log::LogRow;
pub use memories::Phase2InputSelection;
pub use memories::Phase2JobClaimOutcome;
pub use memories::Stage1JobClaim;
pub use memories::Stage1JobClaimOutcome;

View File

@@ -3124,6 +3124,447 @@ VALUES (?, ?, ?, ?, ?)
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Checks phase-2 selection ranking and flag persistence: rows rank by
/// `usage_count` first, then by the fresher of `last_usage`/`generated_at`;
/// rows whose freshest timestamp is older than the 30-day window are dropped;
/// and `selected_for_phase2` is written as 1 only for the selected rows.
#[tokio::test]
async fn select_stage1_outputs_for_phase2_applies_recency_ranking_and_selection_flags() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
let now = Utc::now().timestamp();
let recent_tiebreak_loser =
seed_stage1_output(&runtime, &codex_home, owner, "workspace-a", 100).await;
let recent_tiebreak_winner =
seed_stage1_output(&runtime, &codex_home, owner, "workspace-b", 101).await;
let never_used_recent =
seed_stage1_output(&runtime, &codex_home, owner, "workspace-c", 102).await;
let stale_used = seed_stage1_output(&runtime, &codex_home, owner, "workspace-d", 103).await;
let stale_never_used =
seed_stage1_output(&runtime, &codex_home, owner, "workspace-e", 104).await;
// Equal usage_count (5): last_usage 10 days ago loses the recency tiebreak.
set_phase2_selection_metadata(
&runtime,
recent_tiebreak_loser,
Some(5),
Some((Utc::now() - Duration::days(10)).timestamp()),
now - 20,
)
.await;
// Equal usage_count (5): last_usage 5 days ago wins the recency tiebreak.
set_phase2_selection_metadata(
&runtime,
recent_tiebreak_winner,
Some(5),
Some((Utc::now() - Duration::days(5)).timestamp()),
now - 10,
)
.await;
// Never used (count 0, last_usage NULL) but generated recently: eligible,
// ranked below the used rows.
set_phase2_selection_metadata(
&runtime,
never_used_recent,
Some(0),
None,
(Utc::now() - Duration::days(2)).timestamp(),
)
.await;
// High usage but both timestamps 40 days old: outside the 30-day window.
set_phase2_selection_metadata(
&runtime,
stale_used,
Some(9),
Some((Utc::now() - Duration::days(40)).timestamp()),
(Utc::now() - Duration::days(40)).timestamp(),
)
.await;
// Never used and generated 40 days ago: also outside the window.
set_phase2_selection_metadata(
&runtime,
stale_never_used,
Some(0),
None,
(Utc::now() - Duration::days(40)).timestamp(),
)
.await;
let selection = runtime
.select_stage1_outputs_for_phase2(3, 30)
.await
.expect("select stage1 outputs for phase2");
assert_eq!(selection.previously_selected, Vec::new());
assert_eq!(
selection
.selected
.iter()
.map(|memory| memory.thread_id)
.collect::<Vec<_>>(),
vec![
recent_tiebreak_winner,
recent_tiebreak_loser,
never_used_recent,
]
);
// The flags persisted in SQLite must mirror the returned selection.
for (thread_id, expected_flag) in [
(recent_tiebreak_winner, 1_i64),
(recent_tiebreak_loser, 1_i64),
(never_used_recent, 1_i64),
(stale_used, 0_i64),
(stale_never_used, 0_i64),
] {
let row =
sqlx::query("SELECT selected_for_phase2 FROM stage1_outputs WHERE thread_id = ?")
.bind(thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await
.expect("load selected_for_phase2 row");
assert_eq!(
row.try_get::<i64, _>("selected_for_phase2")
.expect("selected_for_phase2"),
expected_flag
);
}
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Checks that a second selection run returns the first run's selected set as
/// `previously_selected` (for diff logging) before replacing the stored
/// `selected_for_phase2` flags with the new selection.
#[tokio::test]
async fn select_stage1_outputs_for_phase2_returns_previous_selection_before_replacing_it() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
let memory_a = seed_stage1_output(&runtime, &codex_home, owner, "workspace-a", 100).await;
let memory_b = seed_stage1_output(&runtime, &codex_home, owner, "workspace-b", 101).await;
let memory_c = seed_stage1_output(&runtime, &codex_home, owner, "workspace-c", 102).await;
// Initial ranking by usage_count: a (3) > b (2) > c (1).
set_phase2_selection_metadata(
&runtime,
memory_a,
Some(3),
Some((Utc::now() - Duration::days(3)).timestamp()),
(Utc::now() - Duration::days(3)).timestamp(),
)
.await;
set_phase2_selection_metadata(
&runtime,
memory_b,
Some(2),
Some((Utc::now() - Duration::days(2)).timestamp()),
(Utc::now() - Duration::days(2)).timestamp(),
)
.await;
set_phase2_selection_metadata(
&runtime,
memory_c,
Some(1),
Some((Utc::now() - Duration::days(1)).timestamp()),
(Utc::now() - Duration::days(1)).timestamp(),
)
.await;
let first = runtime
.select_stage1_outputs_for_phase2(2, 30)
.await
.expect("first phase2 selection");
assert_eq!(first.previously_selected, Vec::new());
assert_eq!(
first
.selected
.iter()
.map(|memory| memory.thread_id)
.collect::<Vec<_>>(),
vec![memory_a, memory_b]
);
// Bump c above a and b (usage_count 4, fresh last_usage) so the second
// selection displaces b.
set_phase2_selection_metadata(
&runtime,
memory_c,
Some(4),
Some(Utc::now().timestamp()),
Utc::now().timestamp(),
)
.await;
let second = runtime
.select_stage1_outputs_for_phase2(2, 30)
.await
.expect("second phase2 selection");
assert_eq!(second.previously_selected, first.selected);
assert_eq!(
second
.selected
.iter()
.map(|memory| memory.thread_id)
.collect::<Vec<_>>(),
vec![memory_c, memory_a]
);
// Flags after the second run: b was cleared, c was newly set.
for (thread_id, expected_flag) in [(memory_a, 1_i64), (memory_b, 0_i64), (memory_c, 1_i64)]
{
let row =
sqlx::query("SELECT selected_for_phase2 FROM stage1_outputs WHERE thread_id = ?")
.bind(thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await
.expect("load selected_for_phase2 row");
assert_eq!(
row.try_get::<i64, _>("selected_for_phase2")
.expect("selected_for_phase2"),
expected_flag
);
}
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Inserts 1,000 eligible rows so flagging the selection must span multiple
/// 900-id batches of the chunked `UPDATE ... WHERE thread_id IN (...)`, then
/// verifies every row ends up with `selected_for_phase2 = 1`.
#[tokio::test]
async fn select_stage1_outputs_for_phase2_batches_flag_updates_for_large_selections() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let generated_at = Utc::now().timestamp();
let mut thread_ids = Vec::new();
for idx in 0..1_000 {
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
thread_id,
codex_home.join(format!("workspace-{idx}")),
))
.await
.expect("upsert thread");
// Insert directly (bypassing the job flow) to keep seeding 1,000 rows fast.
sqlx::query(
r#"
INSERT INTO stage1_outputs (
thread_id,
source_updated_at,
raw_memory,
rollout_summary,
generated_at,
usage_count
) VALUES (?, ?, ?, ?, ?, ?)
"#,
)
.bind(generated_at + i64::from(idx))
.bind(thread_id.to_string())
.bind("raw memory")
.bind("rollout summary")
.bind(generated_at)
.bind(1_i64)
.execute(runtime.pool.as_ref())
.await
.expect("insert stage1 output");
thread_ids.push(thread_id);
}
let selection = runtime
.select_stage1_outputs_for_phase2(1_000, 30)
.await
.expect("select large phase2 set");
assert_eq!(selection.selected.len(), 1_000);
// All 1,000 rows must be flagged, proving no batch was dropped.
let selected_count = sqlx::query(
"SELECT COUNT(*) AS count FROM stage1_outputs WHERE selected_for_phase2 = 1",
)
.fetch_one(runtime.pool.as_ref())
.await
.expect("count selected rows")
.try_get::<i64, _>("count")
.expect("selected row count");
assert_eq!(selected_count, 1_000);
let sample_row =
sqlx::query("SELECT selected_for_phase2 FROM stage1_outputs WHERE thread_id = ?")
.bind(thread_ids[0].to_string())
.fetch_one(runtime.pool.as_ref())
.await
.expect("load sample selected row");
assert_eq!(
sample_row
.try_get::<i64, _>("selected_for_phase2")
.expect("selected_for_phase2"),
1
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Checks the legacy-row carve-out: a pre-migration row with NULL
/// `usage_count`/`last_usage` stays eligible even though its timestamps are
/// 40 days old, while an explicit never-used row (`usage_count = 0`) of the
/// same age is filtered by the 30-day recency window.
#[tokio::test]
async fn select_stage1_outputs_for_phase2_keeps_legacy_rows_with_missing_usage_metadata() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let old_generated_at = (Utc::now() - Duration::days(40)).timestamp();
let legacy_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("legacy thread id");
let never_used_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("never-used thread id");
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
legacy_thread_id,
codex_home.join("workspace-legacy"),
))
.await
.expect("upsert legacy thread");
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
never_used_thread_id,
codex_home.join("workspace-never-used"),
))
.await
.expect("upsert never-used thread");
// Legacy row: usage_count column omitted, so usage metadata stays NULL.
sqlx::query(
r#"
INSERT INTO stage1_outputs (
thread_id,
source_updated_at,
raw_memory,
rollout_summary,
generated_at
) VALUES (?, ?, ?, ?, ?)
"#,
)
.bind(legacy_thread_id.to_string())
.bind(old_generated_at)
.bind("legacy raw memory")
.bind("legacy summary")
.bind(old_generated_at)
.execute(runtime.pool.as_ref())
.await
.expect("insert legacy stage1 output");
// Post-migration row: usage_count = 0 makes "never used" explicit, so the
// recency window applies to it.
sqlx::query(
r#"
INSERT INTO stage1_outputs (
thread_id,
source_updated_at,
raw_memory,
rollout_summary,
generated_at,
usage_count
) VALUES (?, ?, ?, ?, ?, ?)
"#,
)
.bind(never_used_thread_id.to_string())
.bind(old_generated_at)
.bind("never-used raw memory")
.bind("never-used summary")
.bind(old_generated_at)
.bind(0_i64)
.execute(runtime.pool.as_ref())
.await
.expect("insert never-used stage1 output");
let selection = runtime
.select_stage1_outputs_for_phase2(10, 30)
.await
.expect("select phase2 outputs");
assert_eq!(
selection
.selected
.iter()
.map(|memory| memory.thread_id)
.collect::<Vec<_>>(),
vec![legacy_thread_id]
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Checks the stage-1 upsert path: re-running stage 1 for a thread resets
/// `selected_for_phase2` to 0 and refreshes `source_updated_at`, while
/// preserving the existing `usage_count`/`last_usage`, and the row remains
/// selectable afterwards.
#[tokio::test]
async fn mark_stage1_job_succeeded_clears_selected_for_phase2_on_upsert() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
let thread_id = seed_stage1_output(&runtime, &codex_home, owner, "workspace-a", 100).await;
let first_selection = runtime
.select_stage1_outputs_for_phase2(1, 30)
.await
.expect("first phase2 selection");
assert_eq!(first_selection.selected.len(), 1);
// Seed prior usage so we can verify the upsert preserves it (40-day-old
// last_usage is deliberately stale).
let stale_last_usage = (Utc::now() - Duration::days(40)).timestamp();
sqlx::query(
"UPDATE stage1_outputs SET usage_count = ?, last_usage = ? WHERE thread_id = ?",
)
.bind(7_i64)
.bind(stale_last_usage)
.bind(thread_id.to_string())
.execute(runtime.pool.as_ref())
.await
.expect("seed prior usage metadata")
;
let claim = runtime
.try_claim_stage1_job(thread_id, owner, 101, 3600, 64)
.await
.expect("claim updated stage1");
let ownership_token = match claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage1 claim outcome: {other:?}"),
};
assert!(
runtime
.mark_stage1_job_succeeded(
thread_id,
ownership_token.as_str(),
101,
"raw memory updated",
"rollout summary updated",
Some("updated-rollout"),
)
.await
.expect("mark updated stage1 succeeded"),
"updated stage1 success should persist output"
);
let row = sqlx::query(
"SELECT selected_for_phase2, source_updated_at, usage_count, last_usage FROM stage1_outputs WHERE thread_id = ?",
)
.bind(thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await
.expect("load updated stage1 row");
// The upsert cleared the selection flag...
assert_eq!(
row.try_get::<i64, _>("selected_for_phase2")
.expect("selected_for_phase2"),
0
);
// ...advanced source_updated_at to the re-run's value...
assert_eq!(
row.try_get::<i64, _>("source_updated_at")
.expect("source_updated_at"),
101
);
// ...and kept the pre-existing usage metadata untouched.
assert_eq!(
row.try_get::<i64, _>("usage_count").expect("usage_count"),
7
);
assert_eq!(
row.try_get::<Option<i64>, _>("last_usage")
.expect("last_usage"),
Some(stale_last_usage)
);
let second_selection = runtime
.select_stage1_outputs_for_phase2(1, 30)
.await
.expect("second phase2 selection after refresh");
assert_eq!(second_selection.selected.len(), 1);
assert_eq!(second_selection.selected[0].thread_id, thread_id);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn record_stage1_output_usage_updates_usage_metadata() {
let codex_home = unique_temp_dir();
@@ -3182,6 +3623,24 @@ VALUES (?, ?, ?, ?, ?)
.await
.expect("mark stage1 succeeded b")
);
let phase2_claim = runtime
.try_claim_global_phase2_job(owner, 3600)
.await
.expect("claim phase2 after stage1 success");
let (phase2_token, phase2_input_watermark) = match phase2_claim {
Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => (ownership_token, input_watermark),
other => panic!("unexpected phase2 claim after stage1 success: {other:?}"),
};
assert!(
runtime
.mark_global_phase2_job_succeeded(phase2_token.as_str(), phase2_input_watermark)
.await
.expect("mark phase2 success after stage1"),
"phase2 success should clear dirty state before usage-only update"
);
let updated_rows = runtime
.record_stage1_output_usage(&[thread_a, thread_a, thread_b, missing])
@@ -3219,6 +3678,17 @@ VALUES (?, ?, ?, ?, ?)
let last_usage_b = row_b.try_get::<i64, _>("last_usage").expect("last_usage b");
assert_eq!(last_usage_a, last_usage_b);
assert!(last_usage_a > 0);
let usage_only_phase2_claim = runtime
.try_claim_global_phase2_job(owner, 3600)
.await
.expect("claim phase2 after usage-only update");
assert!(
matches!(
usage_only_phase2_claim,
Phase2JobClaimOutcome::Claimed { .. }
),
"usage updates should enqueue phase2 when selection inputs change"
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
@@ -3872,6 +4342,72 @@ VALUES (?, ?, ?, ?, ?)
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Seeds one completed stage-1 output against the runtime via the real
/// claim -> succeed job flow and returns the freshly generated thread id.
async fn seed_stage1_output(
runtime: &StateRuntime,
codex_home: &Path,
owner: ThreadId,
cwd_name: &str,
source_updated_at: i64,
) -> ThreadId {
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
runtime
.upsert_thread(&test_thread_metadata(
codex_home,
thread_id,
codex_home.join(cwd_name),
))
.await
.expect("upsert thread");
// Run the production claim/succeed path so the row is persisted exactly as
// it would be outside of tests.
let claim = runtime
.try_claim_stage1_job(thread_id, owner, source_updated_at, 3600, 64)
.await
.expect("claim stage1");
let ownership_token = match claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage1 claim outcome: {other:?}"),
};
assert!(
runtime
.mark_stage1_job_succeeded(
thread_id,
ownership_token.as_str(),
source_updated_at,
"raw memory",
"rollout summary",
None,
)
.await
.expect("mark stage1 succeeded"),
"stage1 success should persist output"
);
thread_id
}
/// Overwrites the usage/recency columns that drive phase-2 ranking for one
/// seeded stage-1 output row.
async fn set_phase2_selection_metadata(
    runtime: &StateRuntime,
    thread_id: ThreadId,
    usage_count: Option<i64>,
    last_usage: Option<i64>,
    generated_at: i64,
) {
    let update = sqlx::query(
        r#"
UPDATE stage1_outputs
SET usage_count = ?, last_usage = ?, generated_at = ?
WHERE thread_id = ?
"#,
    )
    .bind(usage_count)
    .bind(last_usage)
    .bind(generated_at)
    .bind(thread_id.to_string());
    update
        .execute(runtime.pool.as_ref())
        .await
        .expect("update phase2 selection metadata");
}
fn test_thread_metadata(
codex_home: &Path,
thread_id: ThreadId,

View File

@@ -1,4 +1,5 @@
use super::*;
use crate::model::Phase2InputSelection;
use crate::model::Phase2JobClaimOutcome;
use crate::model::Stage1JobClaim;
use crate::model::Stage1JobClaimOutcome;
@@ -52,7 +53,9 @@ WHERE kind = ? OR kind = ?
/// Record usage for cited stage-1 outputs.
///
/// Each thread id increments `usage_count` by one and sets `last_usage` to
/// the current Unix timestamp. Missing rows are ignored.
/// the current Unix timestamp. Missing rows are ignored. When at least one
/// row changes, this also re-enqueues global phase 2 so usage-driven
/// selection changes refresh the memory artifacts.
pub async fn record_stage1_output_usage(
&self,
thread_ids: &[ThreadId],
@@ -82,6 +85,10 @@ WHERE thread_id = ?
.rows_affected() as usize;
}
if updated_rows > 0 {
enqueue_global_consolidation_with_executor(&mut *tx, now).await?;
}
tx.commit().await?;
Ok(updated_rows)
}
@@ -214,6 +221,143 @@ LEFT JOIN jobs
Ok(claimed)
}
/// Selects the stage-1 outputs used as phase-2 consolidation input.
///
/// Query behavior:
/// - snapshots the prior `selected_for_phase2 = 1` set for diff logging
/// - filters current candidates to rows where:
///   - at least one of `raw_memory` / `rollout_summary` is non-empty
///   - either:
///     - usage metadata is still missing from the pre-0016 migration state, or
///     - the fresher of `last_usage` / `generated_at` is within
///       `max_unused_days`
/// - orders candidates by:
///   - `COALESCE(usage_count, 0) DESC`
///   - the fresher of `last_usage` / `generated_at` DESC
///   - `source_updated_at DESC, thread_id DESC`
/// - applies `LIMIT max_selected`
/// - clears `selected_for_phase2` on all rows, then marks only the selected
///   rows as `1`
pub async fn select_stage1_outputs_for_phase2(
&self,
max_selected: usize,
max_unused_days: i64,
) -> anyhow::Result<Phase2InputSelection> {
// Rows whose freshest activity timestamp predates this cutoff are dropped;
// max(0) guards against a negative configured window.
let recency_cutoff = (Utc::now() - Duration::days(max_unused_days.max(0))).timestamp();
// BEGIN IMMEDIATE takes the write lock up front so the snapshot and the
// flag rewrite below happen atomically with respect to other writers.
let mut tx = self.pool.begin_with("BEGIN IMMEDIATE").await?;
// Snapshot the prior selection (same ordering as the candidate query) so
// callers can log an added/removed diff.
let previously_selected = sqlx::query(
r#"
SELECT
so.thread_id,
so.source_updated_at,
so.raw_memory,
so.rollout_summary,
so.rollout_slug,
so.generated_at,
COALESCE(t.cwd, '') AS cwd
FROM stage1_outputs AS so
LEFT JOIN threads AS t
ON t.id = so.thread_id
WHERE so.selected_for_phase2 = 1
ORDER BY
COALESCE(so.usage_count, 0) DESC,
COALESCE(so.last_usage, so.generated_at) DESC,
so.source_updated_at DESC,
so.thread_id DESC
"#,
)
.fetch_all(&mut *tx)
.await?
.into_iter()
.map(|row| Stage1OutputRow::try_from_row(&row).and_then(Stage1Output::try_from))
.collect::<Result<Vec<_>, _>>()?;
let selected = if max_selected == 0 {
Vec::new()
} else {
// The CASE expressions pick the fresher of last_usage/generated_at both
// for the recency filter and for the ranking tiebreak; rows with both
// usage columns NULL (pre-0016 legacy rows) bypass the recency filter.
sqlx::query(
r#"
SELECT
so.thread_id,
so.source_updated_at,
so.raw_memory,
so.rollout_summary,
so.rollout_slug,
so.generated_at,
COALESCE(t.cwd, '') AS cwd
FROM stage1_outputs AS so
LEFT JOIN threads AS t
ON t.id = so.thread_id
WHERE
(length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0)
AND (
(so.usage_count IS NULL AND so.last_usage IS NULL)
OR CASE
WHEN so.last_usage IS NULL THEN so.generated_at
WHEN so.generated_at > so.last_usage THEN so.generated_at
ELSE so.last_usage
END >= ?
)
ORDER BY
COALESCE(so.usage_count, 0) DESC,
CASE
WHEN so.last_usage IS NULL THEN so.generated_at
WHEN so.generated_at > so.last_usage THEN so.generated_at
ELSE so.last_usage
END DESC,
so.source_updated_at DESC,
so.thread_id DESC
LIMIT ?
"#,
)
.bind(recency_cutoff)
.bind(max_selected as i64)
.fetch_all(&mut *tx)
.await?
.into_iter()
.map(|row| Stage1OutputRow::try_from_row(&row).and_then(Stage1Output::try_from))
.collect::<Result<Vec<_>, _>>()?
};
// Reset every flag first, then re-mark only the current selection.
sqlx::query(
r#"
UPDATE stage1_outputs
SET selected_for_phase2 = 0
WHERE selected_for_phase2 != 0
"#,
)
.execute(&mut *tx)
.await?;
if !selected.is_empty() {
// Chunks of 900 ids keep each statement's bind count below SQLite's
// default 999 host-parameter limit.
for chunk in selected.chunks(900) {
let mut builder = QueryBuilder::<Sqlite>::new(
r#"
UPDATE stage1_outputs
SET selected_for_phase2 = 1
WHERE thread_id IN (
"#,
);
let mut separated = builder.separated(", ");
for memory in chunk {
separated.push_bind(memory.thread_id.to_string());
}
separated.push_unseparated(")");
builder.build().execute(&mut *tx).await?;
}
}
tx.commit().await?;
Ok(Phase2InputSelection {
selected,
previously_selected,
})
}
/// Lists the most recent non-empty stage-1 outputs for global consolidation.
///
/// Query behavior:
@@ -453,6 +597,8 @@ WHERE kind = ? AND job_key = ?
/// - sets `status='done'` and `last_success_watermark = input_watermark`
/// - upserts `stage1_outputs` for the thread, replacing existing output only
/// when `source_updated_at` is newer or equal
/// - inserts new rows with `usage_count = 0` so "never used" is distinct
/// from legacy rows that still have missing usage metadata
/// - persists optional `rollout_slug` for rollout summary artifact naming
/// - enqueues/advances the global phase-2 job watermark using
/// `source_updated_at`
@@ -503,14 +649,17 @@ INSERT INTO stage1_outputs (
raw_memory,
rollout_summary,
rollout_slug,
generated_at
) VALUES (?, ?, ?, ?, ?, ?)
generated_at,
usage_count
) VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(thread_id) DO UPDATE SET
source_updated_at = excluded.source_updated_at,
raw_memory = excluded.raw_memory,
rollout_summary = excluded.rollout_summary,
rollout_slug = excluded.rollout_slug,
generated_at = excluded.generated_at
generated_at = excluded.generated_at,
usage_count = COALESCE(stage1_outputs.usage_count, excluded.usage_count),
selected_for_phase2 = 0
WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
"#,
)
@@ -520,6 +669,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
.bind(rollout_summary)
.bind(rollout_slug)
.bind(now)
.bind(0_i64)
.execute(&mut *tx)
.await?;