mirror of
https://github.com/openai/codex.git
synced 2026-04-23 07:51:51 +03:00
Compare commits
3 Commits
cconger/sy
...
jif/use-la
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
de11fbb068 | ||
|
|
fa7c298f68 | ||
|
|
a0e7beaa1f |
@@ -630,6 +630,11 @@
|
||||
"minimum": 0.0,
|
||||
"type": "integer"
|
||||
},
|
||||
"max_unused_days": {
|
||||
"description": "Maximum age of a phase-1 memory's last use before phase 2 stops selecting it.",
|
||||
"format": "int64",
|
||||
"type": "integer"
|
||||
},
|
||||
"min_rollout_idle_hours": {
|
||||
"description": "Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.",
|
||||
"format": "int64",
|
||||
|
||||
@@ -2466,6 +2466,7 @@ persistence = "none"
|
||||
let memories = r#"
|
||||
[memories]
|
||||
max_raw_memories_for_global = 512
|
||||
max_unused_days = 21
|
||||
max_rollout_age_days = 42
|
||||
max_rollouts_per_startup = 9
|
||||
min_rollout_idle_hours = 24
|
||||
@@ -2477,6 +2478,7 @@ phase_2_model = "gpt-5"
|
||||
assert_eq!(
|
||||
Some(MemoriesToml {
|
||||
max_raw_memories_for_global: Some(512),
|
||||
max_unused_days: Some(21),
|
||||
max_rollout_age_days: Some(42),
|
||||
max_rollouts_per_startup: Some(9),
|
||||
min_rollout_idle_hours: Some(24),
|
||||
@@ -2496,6 +2498,7 @@ phase_2_model = "gpt-5"
|
||||
config.memories,
|
||||
MemoriesConfig {
|
||||
max_raw_memories_for_global: 512,
|
||||
max_unused_days: 21,
|
||||
max_rollout_age_days: 42,
|
||||
max_rollouts_per_startup: 9,
|
||||
min_rollout_idle_hours: 24,
|
||||
|
||||
@@ -27,6 +27,7 @@ pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16;
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
|
||||
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
|
||||
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024;
|
||||
pub const DEFAULT_MEMORIES_MAX_UNUSED_DAYS: i64 = 30;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
@@ -363,6 +364,8 @@ pub struct FeedbackConfigToml {
|
||||
pub struct MemoriesToml {
|
||||
/// Maximum number of recent raw memories retained for global consolidation.
|
||||
pub max_raw_memories_for_global: Option<usize>,
|
||||
/// Maximum age of a phase-1 memory's last use before phase 2 stops selecting it.
|
||||
pub max_unused_days: Option<i64>,
|
||||
/// Maximum age of the threads used for memories.
|
||||
pub max_rollout_age_days: Option<i64>,
|
||||
/// Maximum number of rollout candidates processed per pass.
|
||||
@@ -379,6 +382,7 @@ pub struct MemoriesToml {
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct MemoriesConfig {
|
||||
pub max_raw_memories_for_global: usize,
|
||||
pub max_unused_days: i64,
|
||||
pub max_rollout_age_days: i64,
|
||||
pub max_rollouts_per_startup: usize,
|
||||
pub min_rollout_idle_hours: i64,
|
||||
@@ -390,6 +394,7 @@ impl Default for MemoriesConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_raw_memories_for_global: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
max_unused_days: DEFAULT_MEMORIES_MAX_UNUSED_DAYS,
|
||||
max_rollout_age_days: DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS,
|
||||
max_rollouts_per_startup: DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
|
||||
min_rollout_idle_hours: DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS,
|
||||
@@ -407,6 +412,10 @@ impl From<MemoriesToml> for MemoriesConfig {
|
||||
.max_raw_memories_for_global
|
||||
.unwrap_or(defaults.max_raw_memories_for_global)
|
||||
.min(4096),
|
||||
max_unused_days: toml
|
||||
.max_unused_days
|
||||
.unwrap_or(defaults.max_unused_days)
|
||||
.clamp(0, 365),
|
||||
max_rollout_age_days: toml
|
||||
.max_rollout_age_days
|
||||
.unwrap_or(defaults.max_rollout_age_days)
|
||||
|
||||
@@ -59,10 +59,15 @@ Phase 2 consolidates the latest stage-1 outputs into the filesystem memory artif
|
||||
What it does:
|
||||
|
||||
- claims a single global phase-2 job (so only one consolidation runs at a time)
|
||||
- loads a bounded set of the most recent stage-1 outputs from the state DB (the per-rollout memories produced by Phase 1, used as the consolidation input set)
|
||||
- loads a bounded set of stage-1 outputs from the state DB using phase-2 selection rules:
|
||||
- ignore memories that have not been used within the configured recency window (`max_unused_days`)
|
||||
- for memories never used before, fall back to `generated_at` so fresh memories can still be selected
|
||||
- rank eligible memories by `usage_count` first, then by the most recent `last_usage` / `generated_at`
|
||||
- stores that selected set back in SQLite via `selected_for_phase2`, clearing the flag for all non-selected rows
|
||||
- logs the diff against the prior `selected_for_phase2` set so added/removed rollout summaries are visible between runs
|
||||
- computes a completion watermark from the claimed watermark + newest input timestamps
|
||||
- syncs local memory artifacts under the memories root:
|
||||
- `raw_memories.md` (merged raw memories, latest first)
|
||||
- `raw_memories.md` (merged raw memories in phase-2 selection order)
|
||||
- `rollout_summaries/` (one summary file per retained rollout)
|
||||
- prunes stale rollout summaries that are no longer retained
|
||||
- if there are no inputs, marks the job successful and exits
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::memories::metrics;
|
||||
use crate::memories::phase_two;
|
||||
use crate::memories::prompts::build_consolidation_prompt;
|
||||
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
|
||||
use crate::memories::storage::rollout_summary_file_stem;
|
||||
use crate::memories::storage::sync_rollout_summaries_from_memories;
|
||||
use codex_config::Constrained;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -19,6 +20,7 @@ use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_state::StateRuntime;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use std::collections::BTreeSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::watch;
|
||||
@@ -73,14 +75,22 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
|
||||
};
|
||||
|
||||
// 3. Query the memories
|
||||
let raw_memories = match db.list_stage1_outputs_for_global(max_raw_memories).await {
|
||||
Ok(memories) => memories,
|
||||
let phase2_selection = match db
|
||||
.select_stage1_outputs_for_phase2(max_raw_memories, config.memories.max_unused_days)
|
||||
.await
|
||||
{
|
||||
Ok(selection) => selection,
|
||||
Err(err) => {
|
||||
tracing::error!("failed to list stage1 outputs from global: {}", err);
|
||||
tracing::error!("failed to select stage1 outputs for phase 2: {err}");
|
||||
job::failed(session, db, &claim, "failed_load_stage1_outputs").await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
log_selection_diff(
|
||||
&phase2_selection.previously_selected,
|
||||
&phase2_selection.selected,
|
||||
);
|
||||
let raw_memories = phase2_selection.selected;
|
||||
let new_watermark = get_watermark(claim.watermark, &raw_memories);
|
||||
|
||||
// 4. Update the file system by syncing the raw memories with the one extracted from DB at
|
||||
@@ -400,6 +410,35 @@ pub(super) fn get_watermark(
|
||||
.max(claimed_watermark) // todo double check the claimed here.
|
||||
}
|
||||
|
||||
fn log_selection_diff(
|
||||
previous: &[codex_state::Stage1Output],
|
||||
current: &[codex_state::Stage1Output],
|
||||
) {
|
||||
let previous_rollouts = previous
|
||||
.iter()
|
||||
.map(|memory| format!("{}.md", rollout_summary_file_stem(memory)))
|
||||
.collect::<BTreeSet<_>>();
|
||||
let current_rollouts = current
|
||||
.iter()
|
||||
.map(|memory| format!("{}.md", rollout_summary_file_stem(memory)))
|
||||
.collect::<BTreeSet<_>>();
|
||||
let added = current_rollouts
|
||||
.difference(&previous_rollouts)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
let removed = previous_rollouts
|
||||
.difference(&current_rollouts)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
tracing::info!(
|
||||
selected = current.len(),
|
||||
added = ?added,
|
||||
removed = ?removed,
|
||||
"updated stage-1 outputs selected for phase 2"
|
||||
);
|
||||
}
|
||||
|
||||
fn emit_metrics(session: &Arc<Session>, counters: Counters) {
|
||||
let otel = session.services.otel_manager.clone();
|
||||
if counters.input > 0 {
|
||||
|
||||
@@ -72,7 +72,7 @@ async fn rebuild_raw_memories_file(
|
||||
return tokio::fs::write(raw_memories_file(root), body).await;
|
||||
}
|
||||
|
||||
body.push_str("Merged stage-1 raw memories (latest first):\n\n");
|
||||
body.push_str("Merged stage-1 raw memories (phase-2 selection order):\n\n");
|
||||
for memory in retained {
|
||||
writeln!(body, "## Thread `{}`", memory.thread_id).map_err(raw_memories_format_error)?;
|
||||
writeln!(
|
||||
|
||||
@@ -13,7 +13,6 @@ use crate::error::Result;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::memories::citations::get_thread_id_from_citations;
|
||||
use crate::parse_turn_item;
|
||||
use crate::state_db;
|
||||
use crate::tools::parallel::ToolCallRuntime;
|
||||
use crate::tools::router::ToolRouter;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
@@ -58,13 +57,10 @@ pub(crate) async fn record_completed_response_item(
|
||||
) {
|
||||
sess.record_conversation_items(turn_context, std::slice::from_ref(item))
|
||||
.await;
|
||||
record_stage1_output_usage_for_completed_item(turn_context, item).await;
|
||||
record_stage1_output_usage_for_completed_item(sess, item).await;
|
||||
}
|
||||
|
||||
async fn record_stage1_output_usage_for_completed_item(
|
||||
turn_context: &TurnContext,
|
||||
item: &ResponseItem,
|
||||
) {
|
||||
async fn record_stage1_output_usage_for_completed_item(sess: &Session, item: &ResponseItem) {
|
||||
let Some(raw_text) = raw_assistant_output_text_from_item(item) else {
|
||||
return;
|
||||
};
|
||||
@@ -75,7 +71,7 @@ async fn record_stage1_output_usage_for_completed_item(
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(db) = state_db::get_state_db(turn_context.config.as_ref(), None).await {
|
||||
if let Some(db) = sess.services.state_db.as_deref() {
|
||||
let _ = db.record_stage1_output_usage(&thread_ids).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_core::features::Feature;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::dynamic_tools::DynamicToolSpec;
|
||||
@@ -10,11 +12,14 @@ use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
@@ -253,6 +258,117 @@ async fn user_messages_persist_in_state_db() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn assistant_memory_citations_update_usage_and_reorder_phase2_selection() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::Sqlite);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let owner = test.session_configured.session_id;
|
||||
|
||||
let cited_thread = ThreadId::new();
|
||||
let uncited_thread = ThreadId::new();
|
||||
seed_stage1_output(&test, cited_thread, owner, "workspace-cited", 100).await?;
|
||||
seed_stage1_output(&test, uncited_thread, owner, "workspace-uncited", 200).await?;
|
||||
|
||||
let initial_selection = db
|
||||
.select_stage1_outputs_for_phase2(1, test.config.memories.max_unused_days)
|
||||
.await?;
|
||||
assert_eq!(
|
||||
initial_selection
|
||||
.selected
|
||||
.iter()
|
||||
.map(|memory| memory.thread_id)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![uncited_thread]
|
||||
);
|
||||
|
||||
mark_phase2_clean(db.as_ref(), owner).await?;
|
||||
|
||||
let assistant_text = format!(
|
||||
"Using a prior memory.\n<oai-mem-citation>\n<citation_entries>\nrollout_summaries/test.md:1-2|note=[integration]\n</citation_entries>\n<rollout_ids>\n{cited_thread}\n</rollout_ids>\n</oai-mem-citation>"
|
||||
);
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", &assistant_text),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("answer using memory").await?;
|
||||
|
||||
let usage_only_claim = db.try_claim_global_phase2_job(owner, 3600).await?;
|
||||
let (ownership_token, input_watermark) = match usage_only_claim {
|
||||
codex_state::Phase2JobClaimOutcome::Claimed {
|
||||
ownership_token,
|
||||
input_watermark,
|
||||
} => (ownership_token, input_watermark),
|
||||
other => panic!("expected usage-only phase2 claim, got {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
db.mark_global_phase2_job_succeeded(ownership_token.as_str(), input_watermark)
|
||||
.await?,
|
||||
"usage-only citation should re-dirty phase 2"
|
||||
);
|
||||
|
||||
let updated_selection = db
|
||||
.select_stage1_outputs_for_phase2(1, test.config.memories.max_unused_days)
|
||||
.await?;
|
||||
assert_eq!(
|
||||
updated_selection
|
||||
.selected
|
||||
.iter()
|
||||
.map(|memory| memory.thread_id)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![cited_thread]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn assistant_messages_without_memory_citations_do_not_redirty_phase2() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::Sqlite);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let owner = test.session_configured.session_id;
|
||||
|
||||
let thread_id = ThreadId::new();
|
||||
seed_stage1_output(&test, thread_id, owner, "workspace-plain", 100).await?;
|
||||
mark_phase2_clean(db.as_ref(), owner).await?;
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "No memory references here."),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("answer without memory").await?;
|
||||
|
||||
let phase2_claim = db.try_claim_global_phase2_job(owner, 3600).await?;
|
||||
assert!(
|
||||
matches!(
|
||||
phase2_claim,
|
||||
codex_state::Phase2JobClaimOutcome::SkippedNotDirty
|
||||
),
|
||||
"non-citation assistant output should not enqueue phase 2"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn tool_call_logs_include_thread_id() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
@@ -326,3 +442,80 @@ async fn tool_call_logs_include_thread_id() -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn seed_stage1_output(
|
||||
test: &TestCodex,
|
||||
thread_id: ThreadId,
|
||||
owner: ThreadId,
|
||||
workspace_name: &str,
|
||||
source_updated_at: i64,
|
||||
) -> Result<()> {
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let metadata = seeded_thread_metadata(
|
||||
test,
|
||||
thread_id,
|
||||
workspace_name,
|
||||
DateTime::<Utc>::from_timestamp(source_updated_at, 0).expect("timestamp"),
|
||||
);
|
||||
db.upsert_thread(&metadata).await?;
|
||||
|
||||
let claim = db
|
||||
.try_claim_stage1_job(thread_id, owner, source_updated_at, 3600, 64)
|
||||
.await?;
|
||||
let ownership_token = match claim {
|
||||
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("expected stage1 claim, got {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
db.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
ownership_token.as_str(),
|
||||
source_updated_at,
|
||||
"raw memory",
|
||||
"rollout summary",
|
||||
None,
|
||||
)
|
||||
.await?,
|
||||
"stage1 success should persist output"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mark_phase2_clean(db: &codex_state::StateRuntime, owner: ThreadId) -> Result<()> {
|
||||
let claim = db.try_claim_global_phase2_job(owner, 3600).await?;
|
||||
let (ownership_token, input_watermark) = match claim {
|
||||
codex_state::Phase2JobClaimOutcome::Claimed {
|
||||
ownership_token,
|
||||
input_watermark,
|
||||
} => (ownership_token, input_watermark),
|
||||
other => panic!("expected phase2 claim, got {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
db.mark_global_phase2_job_succeeded(ownership_token.as_str(), input_watermark)
|
||||
.await?,
|
||||
"phase2 success should clear dirty state"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn seeded_thread_metadata(
|
||||
test: &TestCodex,
|
||||
thread_id: ThreadId,
|
||||
workspace_name: &str,
|
||||
timestamp: DateTime<Utc>,
|
||||
) -> codex_state::ThreadMetadata {
|
||||
let mut builder = codex_state::ThreadMetadataBuilder::new(
|
||||
thread_id,
|
||||
test.config
|
||||
.codex_home
|
||||
.join(format!("sessions/seed-{thread_id}.jsonl")),
|
||||
timestamp,
|
||||
SessionSource::default(),
|
||||
);
|
||||
builder.updated_at = Some(timestamp);
|
||||
builder.cwd = test.config.codex_home.join(workspace_name);
|
||||
builder.cli_version = Some("test".to_string());
|
||||
builder.build(&test.config.model_provider_id)
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ mod runtime;
|
||||
pub use model::LogEntry;
|
||||
pub use model::LogQuery;
|
||||
pub use model::LogRow;
|
||||
pub use model::Phase2InputSelection;
|
||||
pub use model::Phase2JobClaimOutcome;
|
||||
/// Preferred entrypoint: owns configuration and metrics.
|
||||
pub use runtime::StateRuntime;
|
||||
|
||||
@@ -20,6 +20,13 @@ pub struct Stage1Output {
|
||||
pub generated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Phase-2 memory selection result for global consolidation.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default)]
|
||||
pub struct Phase2InputSelection {
|
||||
pub selected: Vec<Stage1Output>,
|
||||
pub previously_selected: Vec<Stage1Output>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Stage1OutputRow {
|
||||
thread_id: String,
|
||||
|
||||
@@ -16,6 +16,7 @@ pub use backfill_state::BackfillStatus;
|
||||
pub use log::LogEntry;
|
||||
pub use log::LogQuery;
|
||||
pub use log::LogRow;
|
||||
pub use memories::Phase2InputSelection;
|
||||
pub use memories::Phase2JobClaimOutcome;
|
||||
pub use memories::Stage1JobClaim;
|
||||
pub use memories::Stage1JobClaimOutcome;
|
||||
|
||||
@@ -3124,6 +3124,447 @@ VALUES (?, ?, ?, ?, ?)
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn select_stage1_outputs_for_phase2_applies_recency_ranking_and_selection_flags() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
let now = Utc::now().timestamp();
|
||||
|
||||
let recent_tiebreak_loser =
|
||||
seed_stage1_output(&runtime, &codex_home, owner, "workspace-a", 100).await;
|
||||
let recent_tiebreak_winner =
|
||||
seed_stage1_output(&runtime, &codex_home, owner, "workspace-b", 101).await;
|
||||
let never_used_recent =
|
||||
seed_stage1_output(&runtime, &codex_home, owner, "workspace-c", 102).await;
|
||||
let stale_used = seed_stage1_output(&runtime, &codex_home, owner, "workspace-d", 103).await;
|
||||
let stale_never_used =
|
||||
seed_stage1_output(&runtime, &codex_home, owner, "workspace-e", 104).await;
|
||||
|
||||
set_phase2_selection_metadata(
|
||||
&runtime,
|
||||
recent_tiebreak_loser,
|
||||
Some(5),
|
||||
Some((Utc::now() - Duration::days(10)).timestamp()),
|
||||
now - 20,
|
||||
)
|
||||
.await;
|
||||
set_phase2_selection_metadata(
|
||||
&runtime,
|
||||
recent_tiebreak_winner,
|
||||
Some(5),
|
||||
Some((Utc::now() - Duration::days(5)).timestamp()),
|
||||
now - 10,
|
||||
)
|
||||
.await;
|
||||
set_phase2_selection_metadata(
|
||||
&runtime,
|
||||
never_used_recent,
|
||||
Some(0),
|
||||
None,
|
||||
(Utc::now() - Duration::days(2)).timestamp(),
|
||||
)
|
||||
.await;
|
||||
set_phase2_selection_metadata(
|
||||
&runtime,
|
||||
stale_used,
|
||||
Some(9),
|
||||
Some((Utc::now() - Duration::days(40)).timestamp()),
|
||||
(Utc::now() - Duration::days(40)).timestamp(),
|
||||
)
|
||||
.await;
|
||||
set_phase2_selection_metadata(
|
||||
&runtime,
|
||||
stale_never_used,
|
||||
Some(0),
|
||||
None,
|
||||
(Utc::now() - Duration::days(40)).timestamp(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let selection = runtime
|
||||
.select_stage1_outputs_for_phase2(3, 30)
|
||||
.await
|
||||
.expect("select stage1 outputs for phase2");
|
||||
|
||||
assert_eq!(selection.previously_selected, Vec::new());
|
||||
assert_eq!(
|
||||
selection
|
||||
.selected
|
||||
.iter()
|
||||
.map(|memory| memory.thread_id)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
recent_tiebreak_winner,
|
||||
recent_tiebreak_loser,
|
||||
never_used_recent,
|
||||
]
|
||||
);
|
||||
|
||||
for (thread_id, expected_flag) in [
|
||||
(recent_tiebreak_winner, 1_i64),
|
||||
(recent_tiebreak_loser, 1_i64),
|
||||
(never_used_recent, 1_i64),
|
||||
(stale_used, 0_i64),
|
||||
(stale_never_used, 0_i64),
|
||||
] {
|
||||
let row =
|
||||
sqlx::query("SELECT selected_for_phase2 FROM stage1_outputs WHERE thread_id = ?")
|
||||
.bind(thread_id.to_string())
|
||||
.fetch_one(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("load selected_for_phase2 row");
|
||||
assert_eq!(
|
||||
row.try_get::<i64, _>("selected_for_phase2")
|
||||
.expect("selected_for_phase2"),
|
||||
expected_flag
|
||||
);
|
||||
}
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn select_stage1_outputs_for_phase2_returns_previous_selection_before_replacing_it() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
|
||||
let memory_a = seed_stage1_output(&runtime, &codex_home, owner, "workspace-a", 100).await;
|
||||
let memory_b = seed_stage1_output(&runtime, &codex_home, owner, "workspace-b", 101).await;
|
||||
let memory_c = seed_stage1_output(&runtime, &codex_home, owner, "workspace-c", 102).await;
|
||||
|
||||
set_phase2_selection_metadata(
|
||||
&runtime,
|
||||
memory_a,
|
||||
Some(3),
|
||||
Some((Utc::now() - Duration::days(3)).timestamp()),
|
||||
(Utc::now() - Duration::days(3)).timestamp(),
|
||||
)
|
||||
.await;
|
||||
set_phase2_selection_metadata(
|
||||
&runtime,
|
||||
memory_b,
|
||||
Some(2),
|
||||
Some((Utc::now() - Duration::days(2)).timestamp()),
|
||||
(Utc::now() - Duration::days(2)).timestamp(),
|
||||
)
|
||||
.await;
|
||||
set_phase2_selection_metadata(
|
||||
&runtime,
|
||||
memory_c,
|
||||
Some(1),
|
||||
Some((Utc::now() - Duration::days(1)).timestamp()),
|
||||
(Utc::now() - Duration::days(1)).timestamp(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let first = runtime
|
||||
.select_stage1_outputs_for_phase2(2, 30)
|
||||
.await
|
||||
.expect("first phase2 selection");
|
||||
assert_eq!(first.previously_selected, Vec::new());
|
||||
assert_eq!(
|
||||
first
|
||||
.selected
|
||||
.iter()
|
||||
.map(|memory| memory.thread_id)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![memory_a, memory_b]
|
||||
);
|
||||
|
||||
set_phase2_selection_metadata(
|
||||
&runtime,
|
||||
memory_c,
|
||||
Some(4),
|
||||
Some(Utc::now().timestamp()),
|
||||
Utc::now().timestamp(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let second = runtime
|
||||
.select_stage1_outputs_for_phase2(2, 30)
|
||||
.await
|
||||
.expect("second phase2 selection");
|
||||
assert_eq!(second.previously_selected, first.selected);
|
||||
assert_eq!(
|
||||
second
|
||||
.selected
|
||||
.iter()
|
||||
.map(|memory| memory.thread_id)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![memory_c, memory_a]
|
||||
);
|
||||
|
||||
for (thread_id, expected_flag) in [(memory_a, 1_i64), (memory_b, 0_i64), (memory_c, 1_i64)]
|
||||
{
|
||||
let row =
|
||||
sqlx::query("SELECT selected_for_phase2 FROM stage1_outputs WHERE thread_id = ?")
|
||||
.bind(thread_id.to_string())
|
||||
.fetch_one(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("load selected_for_phase2 row");
|
||||
assert_eq!(
|
||||
row.try_get::<i64, _>("selected_for_phase2")
|
||||
.expect("selected_for_phase2"),
|
||||
expected_flag
|
||||
);
|
||||
}
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn select_stage1_outputs_for_phase2_batches_flag_updates_for_large_selections() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
let generated_at = Utc::now().timestamp();
|
||||
let mut thread_ids = Vec::new();
|
||||
|
||||
for idx in 0..1_000 {
|
||||
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id,
|
||||
codex_home.join(format!("workspace-{idx}")),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread");
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO stage1_outputs (
|
||||
thread_id,
|
||||
source_updated_at,
|
||||
raw_memory,
|
||||
rollout_summary,
|
||||
generated_at,
|
||||
usage_count
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id.to_string())
|
||||
.bind(generated_at + i64::from(idx))
|
||||
.bind("raw memory")
|
||||
.bind("rollout summary")
|
||||
.bind(generated_at)
|
||||
.bind(1_i64)
|
||||
.execute(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("insert stage1 output");
|
||||
thread_ids.push(thread_id);
|
||||
}
|
||||
|
||||
let selection = runtime
|
||||
.select_stage1_outputs_for_phase2(1_000, 30)
|
||||
.await
|
||||
.expect("select large phase2 set");
|
||||
assert_eq!(selection.selected.len(), 1_000);
|
||||
|
||||
let selected_count = sqlx::query(
|
||||
"SELECT COUNT(*) AS count FROM stage1_outputs WHERE selected_for_phase2 = 1",
|
||||
)
|
||||
.fetch_one(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("count selected rows")
|
||||
.try_get::<i64, _>("count")
|
||||
.expect("selected row count");
|
||||
assert_eq!(selected_count, 1_000);
|
||||
|
||||
let sample_row =
|
||||
sqlx::query("SELECT selected_for_phase2 FROM stage1_outputs WHERE thread_id = ?")
|
||||
.bind(thread_ids[0].to_string())
|
||||
.fetch_one(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("load sample selected row");
|
||||
assert_eq!(
|
||||
sample_row
|
||||
.try_get::<i64, _>("selected_for_phase2")
|
||||
.expect("selected_for_phase2"),
|
||||
1
|
||||
);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn select_stage1_outputs_for_phase2_keeps_legacy_rows_with_missing_usage_metadata() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
let old_generated_at = (Utc::now() - Duration::days(40)).timestamp();
|
||||
|
||||
let legacy_thread_id =
|
||||
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("legacy thread id");
|
||||
let never_used_thread_id =
|
||||
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("never-used thread id");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
legacy_thread_id,
|
||||
codex_home.join("workspace-legacy"),
|
||||
))
|
||||
.await
|
||||
.expect("upsert legacy thread");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
never_used_thread_id,
|
||||
codex_home.join("workspace-never-used"),
|
||||
))
|
||||
.await
|
||||
.expect("upsert never-used thread");
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO stage1_outputs (
|
||||
thread_id,
|
||||
source_updated_at,
|
||||
raw_memory,
|
||||
rollout_summary,
|
||||
generated_at
|
||||
) VALUES (?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(legacy_thread_id.to_string())
|
||||
.bind(old_generated_at)
|
||||
.bind("legacy raw memory")
|
||||
.bind("legacy summary")
|
||||
.bind(old_generated_at)
|
||||
.execute(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("insert legacy stage1 output");
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO stage1_outputs (
|
||||
thread_id,
|
||||
source_updated_at,
|
||||
raw_memory,
|
||||
rollout_summary,
|
||||
generated_at,
|
||||
usage_count
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(never_used_thread_id.to_string())
|
||||
.bind(old_generated_at)
|
||||
.bind("never-used raw memory")
|
||||
.bind("never-used summary")
|
||||
.bind(old_generated_at)
|
||||
.bind(0_i64)
|
||||
.execute(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("insert never-used stage1 output");
|
||||
|
||||
let selection = runtime
|
||||
.select_stage1_outputs_for_phase2(10, 30)
|
||||
.await
|
||||
.expect("select phase2 outputs");
|
||||
assert_eq!(
|
||||
selection
|
||||
.selected
|
||||
.iter()
|
||||
.map(|memory| memory.thread_id)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![legacy_thread_id]
|
||||
);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mark_stage1_job_succeeded_clears_selected_for_phase2_on_upsert() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
|
||||
let thread_id = seed_stage1_output(&runtime, &codex_home, owner, "workspace-a", 100).await;
|
||||
|
||||
let first_selection = runtime
|
||||
.select_stage1_outputs_for_phase2(1, 30)
|
||||
.await
|
||||
.expect("first phase2 selection");
|
||||
assert_eq!(first_selection.selected.len(), 1);
|
||||
let stale_last_usage = (Utc::now() - Duration::days(40)).timestamp();
|
||||
sqlx::query(
|
||||
"UPDATE stage1_outputs SET usage_count = ?, last_usage = ? WHERE thread_id = ?",
|
||||
)
|
||||
.bind(7_i64)
|
||||
.bind(stale_last_usage)
|
||||
.bind(thread_id.to_string())
|
||||
.execute(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("seed prior usage metadata");
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id, owner, 101, 3600, 64)
|
||||
.await
|
||||
.expect("claim updated stage1");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
ownership_token.as_str(),
|
||||
101,
|
||||
"raw memory updated",
|
||||
"rollout summary updated",
|
||||
Some("updated-rollout"),
|
||||
)
|
||||
.await
|
||||
.expect("mark updated stage1 succeeded"),
|
||||
"updated stage1 success should persist output"
|
||||
);
|
||||
|
||||
let row = sqlx::query(
|
||||
"SELECT selected_for_phase2, source_updated_at, usage_count, last_usage FROM stage1_outputs WHERE thread_id = ?",
|
||||
)
|
||||
.bind(thread_id.to_string())
|
||||
.fetch_one(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("load updated stage1 row");
|
||||
assert_eq!(
|
||||
row.try_get::<i64, _>("selected_for_phase2")
|
||||
.expect("selected_for_phase2"),
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
row.try_get::<i64, _>("source_updated_at")
|
||||
.expect("source_updated_at"),
|
||||
101
|
||||
);
|
||||
assert_eq!(
|
||||
row.try_get::<i64, _>("usage_count").expect("usage_count"),
|
||||
7
|
||||
);
|
||||
assert_eq!(
|
||||
row.try_get::<Option<i64>, _>("last_usage")
|
||||
.expect("last_usage"),
|
||||
Some(stale_last_usage)
|
||||
);
|
||||
|
||||
let second_selection = runtime
|
||||
.select_stage1_outputs_for_phase2(1, 30)
|
||||
.await
|
||||
.expect("second phase2 selection after refresh");
|
||||
assert_eq!(second_selection.selected.len(), 1);
|
||||
assert_eq!(second_selection.selected[0].thread_id, thread_id);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_stage1_output_usage_updates_usage_metadata() {
|
||||
let codex_home = unique_temp_dir();
|
||||
@@ -3182,6 +3623,24 @@ VALUES (?, ?, ?, ?, ?)
|
||||
.await
|
||||
.expect("mark stage1 succeeded b")
|
||||
);
|
||||
let phase2_claim = runtime
|
||||
.try_claim_global_phase2_job(owner, 3600)
|
||||
.await
|
||||
.expect("claim phase2 after stage1 success");
|
||||
let (phase2_token, phase2_input_watermark) = match phase2_claim {
|
||||
Phase2JobClaimOutcome::Claimed {
|
||||
ownership_token,
|
||||
input_watermark,
|
||||
} => (ownership_token, input_watermark),
|
||||
other => panic!("unexpected phase2 claim after stage1 success: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_global_phase2_job_succeeded(phase2_token.as_str(), phase2_input_watermark)
|
||||
.await
|
||||
.expect("mark phase2 success after stage1"),
|
||||
"phase2 success should clear dirty state before usage-only update"
|
||||
);
|
||||
|
||||
let updated_rows = runtime
|
||||
.record_stage1_output_usage(&[thread_a, thread_a, thread_b, missing])
|
||||
@@ -3219,6 +3678,17 @@ VALUES (?, ?, ?, ?, ?)
|
||||
let last_usage_b = row_b.try_get::<i64, _>("last_usage").expect("last_usage b");
|
||||
assert_eq!(last_usage_a, last_usage_b);
|
||||
assert!(last_usage_a > 0);
|
||||
let usage_only_phase2_claim = runtime
|
||||
.try_claim_global_phase2_job(owner, 3600)
|
||||
.await
|
||||
.expect("claim phase2 after usage-only update");
|
||||
assert!(
|
||||
matches!(
|
||||
usage_only_phase2_claim,
|
||||
Phase2JobClaimOutcome::Claimed { .. }
|
||||
),
|
||||
"usage updates should enqueue phase2 when selection inputs change"
|
||||
);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
@@ -3872,6 +4342,72 @@ VALUES (?, ?, ?, ?, ?)
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
async fn seed_stage1_output(
|
||||
runtime: &StateRuntime,
|
||||
codex_home: &Path,
|
||||
owner: ThreadId,
|
||||
cwd_name: &str,
|
||||
source_updated_at: i64,
|
||||
) -> ThreadId {
|
||||
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
codex_home,
|
||||
thread_id,
|
||||
codex_home.join(cwd_name),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread");
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id, owner, source_updated_at, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
ownership_token.as_str(),
|
||||
source_updated_at,
|
||||
"raw memory",
|
||||
"rollout summary",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("mark stage1 succeeded"),
|
||||
"stage1 success should persist output"
|
||||
);
|
||||
|
||||
thread_id
|
||||
}
|
||||
|
||||
async fn set_phase2_selection_metadata(
|
||||
runtime: &StateRuntime,
|
||||
thread_id: ThreadId,
|
||||
usage_count: Option<i64>,
|
||||
last_usage: Option<i64>,
|
||||
generated_at: i64,
|
||||
) {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE stage1_outputs
|
||||
SET usage_count = ?, last_usage = ?, generated_at = ?
|
||||
WHERE thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(usage_count)
|
||||
.bind(last_usage)
|
||||
.bind(generated_at)
|
||||
.bind(thread_id.to_string())
|
||||
.execute(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("update phase2 selection metadata");
|
||||
}
|
||||
|
||||
fn test_thread_metadata(
|
||||
codex_home: &Path,
|
||||
thread_id: ThreadId,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use crate::model::Phase2InputSelection;
|
||||
use crate::model::Phase2JobClaimOutcome;
|
||||
use crate::model::Stage1JobClaim;
|
||||
use crate::model::Stage1JobClaimOutcome;
|
||||
@@ -52,7 +53,9 @@ WHERE kind = ? OR kind = ?
|
||||
/// Record usage for cited stage-1 outputs.
|
||||
///
|
||||
/// Each thread id increments `usage_count` by one and sets `last_usage` to
|
||||
/// the current Unix timestamp. Missing rows are ignored.
|
||||
/// the current Unix timestamp. Missing rows are ignored. When at least one
|
||||
/// row changes, this also re-enqueues global phase 2 so usage-driven
|
||||
/// selection changes refresh the memory artifacts.
|
||||
pub async fn record_stage1_output_usage(
|
||||
&self,
|
||||
thread_ids: &[ThreadId],
|
||||
@@ -82,6 +85,10 @@ WHERE thread_id = ?
|
||||
.rows_affected() as usize;
|
||||
}
|
||||
|
||||
if updated_rows > 0 {
|
||||
enqueue_global_consolidation_with_executor(&mut *tx, now).await?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(updated_rows)
|
||||
}
|
||||
@@ -214,6 +221,143 @@ LEFT JOIN jobs
|
||||
Ok(claimed)
|
||||
}
|
||||
|
||||
/// Selects the stage-1 outputs used as phase-2 consolidation input.
|
||||
///
|
||||
/// Query behavior:
|
||||
/// - snapshots the prior `selected_for_phase2 = 1` set for diff logging
|
||||
/// - filters current candidates to rows where:
|
||||
/// - at least one of `raw_memory` / `rollout_summary` is non-empty
|
||||
/// - either:
|
||||
/// - usage metadata is still missing from the pre-0016 migration state,
|
||||
/// or
|
||||
/// - the fresher of `last_usage` / `generated_at` is within
|
||||
/// `max_unused_days`
|
||||
/// - orders candidates by:
|
||||
/// - `COALESCE(usage_count, 0) DESC`
|
||||
/// - the fresher of `last_usage` / `generated_at` DESC
|
||||
/// - `source_updated_at DESC, thread_id DESC`
|
||||
/// - applies `LIMIT max_selected`
|
||||
/// - clears `selected_for_phase2` on all rows, then marks only the selected
|
||||
/// rows as `1`
|
||||
pub async fn select_stage1_outputs_for_phase2(
|
||||
&self,
|
||||
max_selected: usize,
|
||||
max_unused_days: i64,
|
||||
) -> anyhow::Result<Phase2InputSelection> {
|
||||
let recency_cutoff = (Utc::now() - Duration::days(max_unused_days.max(0))).timestamp();
|
||||
let mut tx = self.pool.begin_with("BEGIN IMMEDIATE").await?;
|
||||
|
||||
let previously_selected = sqlx::query(
|
||||
r#"
|
||||
SELECT
|
||||
so.thread_id,
|
||||
so.source_updated_at,
|
||||
so.raw_memory,
|
||||
so.rollout_summary,
|
||||
so.rollout_slug,
|
||||
so.generated_at,
|
||||
COALESCE(t.cwd, '') AS cwd
|
||||
FROM stage1_outputs AS so
|
||||
LEFT JOIN threads AS t
|
||||
ON t.id = so.thread_id
|
||||
WHERE so.selected_for_phase2 = 1
|
||||
ORDER BY
|
||||
COALESCE(so.usage_count, 0) DESC,
|
||||
COALESCE(so.last_usage, so.generated_at) DESC,
|
||||
so.source_updated_at DESC,
|
||||
so.thread_id DESC
|
||||
"#,
|
||||
)
|
||||
.fetch_all(&mut *tx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| Stage1OutputRow::try_from_row(&row).and_then(Stage1Output::try_from))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let selected = if max_selected == 0 {
|
||||
Vec::new()
|
||||
} else {
|
||||
sqlx::query(
|
||||
r#"
|
||||
SELECT
|
||||
so.thread_id,
|
||||
so.source_updated_at,
|
||||
so.raw_memory,
|
||||
so.rollout_summary,
|
||||
so.rollout_slug,
|
||||
so.generated_at,
|
||||
COALESCE(t.cwd, '') AS cwd
|
||||
FROM stage1_outputs AS so
|
||||
LEFT JOIN threads AS t
|
||||
ON t.id = so.thread_id
|
||||
WHERE
|
||||
(length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0)
|
||||
AND (
|
||||
(so.usage_count IS NULL AND so.last_usage IS NULL)
|
||||
OR CASE
|
||||
WHEN so.last_usage IS NULL THEN so.generated_at
|
||||
WHEN so.generated_at > so.last_usage THEN so.generated_at
|
||||
ELSE so.last_usage
|
||||
END >= ?
|
||||
)
|
||||
ORDER BY
|
||||
COALESCE(so.usage_count, 0) DESC,
|
||||
CASE
|
||||
WHEN so.last_usage IS NULL THEN so.generated_at
|
||||
WHEN so.generated_at > so.last_usage THEN so.generated_at
|
||||
ELSE so.last_usage
|
||||
END DESC,
|
||||
so.source_updated_at DESC,
|
||||
so.thread_id DESC
|
||||
LIMIT ?
|
||||
"#,
|
||||
)
|
||||
.bind(recency_cutoff)
|
||||
.bind(max_selected as i64)
|
||||
.fetch_all(&mut *tx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| Stage1OutputRow::try_from_row(&row).and_then(Stage1Output::try_from))
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
};
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE stage1_outputs
|
||||
SET selected_for_phase2 = 0
|
||||
WHERE selected_for_phase2 != 0
|
||||
"#,
|
||||
)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
if !selected.is_empty() {
|
||||
for chunk in selected.chunks(900) {
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
r#"
|
||||
UPDATE stage1_outputs
|
||||
SET selected_for_phase2 = 1
|
||||
WHERE thread_id IN (
|
||||
"#,
|
||||
);
|
||||
let mut separated = builder.separated(", ");
|
||||
for memory in chunk {
|
||||
separated.push_bind(memory.thread_id.to_string());
|
||||
}
|
||||
separated.push_unseparated(")");
|
||||
|
||||
builder.build().execute(&mut *tx).await?;
|
||||
}
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(Phase2InputSelection {
|
||||
selected,
|
||||
previously_selected,
|
||||
})
|
||||
}
|
||||
|
||||
/// Lists the most recent non-empty stage-1 outputs for global consolidation.
|
||||
///
|
||||
/// Query behavior:
|
||||
@@ -453,6 +597,8 @@ WHERE kind = ? AND job_key = ?
|
||||
/// - sets `status='done'` and `last_success_watermark = input_watermark`
|
||||
/// - upserts `stage1_outputs` for the thread, replacing existing output only
|
||||
/// when `source_updated_at` is newer or equal
|
||||
/// - inserts new rows with `usage_count = 0` so "never used" is distinct
|
||||
/// from legacy rows that still have missing usage metadata
|
||||
/// - persists optional `rollout_slug` for rollout summary artifact naming
|
||||
/// - enqueues/advances the global phase-2 job watermark using
|
||||
/// `source_updated_at`
|
||||
@@ -503,14 +649,17 @@ INSERT INTO stage1_outputs (
|
||||
raw_memory,
|
||||
rollout_summary,
|
||||
rollout_slug,
|
||||
generated_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
generated_at,
|
||||
usage_count
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(thread_id) DO UPDATE SET
|
||||
source_updated_at = excluded.source_updated_at,
|
||||
raw_memory = excluded.raw_memory,
|
||||
rollout_summary = excluded.rollout_summary,
|
||||
rollout_slug = excluded.rollout_slug,
|
||||
generated_at = excluded.generated_at
|
||||
generated_at = excluded.generated_at,
|
||||
usage_count = COALESCE(stage1_outputs.usage_count, excluded.usage_count),
|
||||
selected_for_phase2 = 0
|
||||
WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
|
||||
"#,
|
||||
)
|
||||
@@ -520,6 +669,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
|
||||
.bind(rollout_summary)
|
||||
.bind(rollout_slug)
|
||||
.bind(now)
|
||||
.bind(0_i64)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user