mirror of
https://github.com/openai/codex.git
synced 2026-04-22 07:21:46 +03:00
Compare commits
2 Commits
exec-env-p
...
zuxin/summ
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dcf6141900 | ||
|
|
c53205179a |
@@ -7,6 +7,7 @@ use crate::memories::metrics;
|
||||
use crate::memories::phase_two;
|
||||
use crate::memories::prompts::build_consolidation_prompt;
|
||||
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
|
||||
use crate::memories::storage::resolve_rollout_summary_files;
|
||||
use crate::memories::storage::sync_rollout_summaries_from_memories;
|
||||
use codex_config::Constrained;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -74,25 +75,43 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
|
||||
}
|
||||
};
|
||||
let new_watermark = get_watermark(claim.watermark, &raw_memories);
|
||||
let resolved = resolve_rollout_summary_files(&raw_memories, max_raw_memories);
|
||||
|
||||
// 4. Update the file system by syncing the raw memories with the one extracted from DB at
|
||||
// step 3
|
||||
// [`rollout_summaries/`]
|
||||
if let Err(err) =
|
||||
sync_rollout_summaries_from_memories(&root, &raw_memories, max_raw_memories).await
|
||||
{
|
||||
if let Err(err) = sync_rollout_summaries_from_memories(&root, &raw_memories, &resolved).await {
|
||||
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
|
||||
job::failed(session, db, &claim, "failed_sync_artifacts").await;
|
||||
return;
|
||||
}
|
||||
// [`raw_memories.md`]
|
||||
if let Err(err) =
|
||||
rebuild_raw_memories_file_from_memories(&root, &raw_memories, max_raw_memories).await
|
||||
if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &raw_memories, &resolved).await
|
||||
{
|
||||
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
|
||||
job::failed(session, db, &claim, "failed_rebuild_raw_memories").await;
|
||||
return;
|
||||
}
|
||||
let filename_rows = resolved
|
||||
.iter()
|
||||
.map(|item| (item.thread_id, item.file_name.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
if let Err(err) = db
|
||||
.set_rollout_summary_filenames_for_global(filename_rows.as_slice())
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
"failed persisting rollout summary filename mapping for global consolidation: {err}"
|
||||
);
|
||||
job::failed(
|
||||
session,
|
||||
db,
|
||||
&claim,
|
||||
"failed_persist_rollout_summary_filenames",
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
if raw_memories.is_empty() {
|
||||
// We check only after sync of the file system.
|
||||
job::succeed(session, db, &claim, new_watermark, "succeeded_no_input").await;
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_state::Stage1Output;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Write as _;
|
||||
use std::path::Path;
|
||||
@@ -7,37 +11,102 @@ use tracing::warn;
|
||||
use crate::memories::ensure_layout;
|
||||
use crate::memories::raw_memories_file;
|
||||
use crate::memories::rollout_summaries_dir;
|
||||
use crate::rollout::list::parse_timestamp_uuid_from_filename;
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub(super) struct ResolvedRolloutSummary {
|
||||
pub(super) thread_id: ThreadId,
|
||||
pub(super) file_stem: String,
|
||||
pub(super) file_name: String,
|
||||
}
|
||||
|
||||
/// Resolves rollout summary filenames for retained stage-1 outputs.
|
||||
pub(super) fn resolve_rollout_summary_files(
|
||||
memories: &[Stage1Output],
|
||||
max_raw_memories_for_global: usize,
|
||||
) -> Vec<ResolvedRolloutSummary> {
|
||||
let retained = retained_memories(memories, max_raw_memories_for_global);
|
||||
let mut base_name_counts = HashMap::<String, usize>::new();
|
||||
let mut resolved = Vec::with_capacity(retained.len());
|
||||
|
||||
for memory in retained {
|
||||
let timestamp = if let Some(file_name) = memory
|
||||
.rollout_path
|
||||
.file_name()
|
||||
.and_then(|name| name.to_str())
|
||||
&& let Some((parsed_timestamp, _)) = parse_timestamp_uuid_from_filename(file_name)
|
||||
&& let Some(parsed) =
|
||||
DateTime::<Utc>::from_timestamp(parsed_timestamp.unix_timestamp(), 0)
|
||||
{
|
||||
parsed.format("%Y-%m-%dT%H-%M-%S").to_string()
|
||||
} else {
|
||||
memory
|
||||
.source_updated_at
|
||||
.format("%Y-%m-%dT%H-%M-%S")
|
||||
.to_string()
|
||||
};
|
||||
let slug = normalize_rollout_slug(memory.rollout_slug.as_deref());
|
||||
let base_stem = format!("{timestamp}-{slug}");
|
||||
|
||||
let counter = base_name_counts.entry(base_stem.clone()).or_default();
|
||||
*counter += 1;
|
||||
let file_stem = if *counter == 1 {
|
||||
base_stem
|
||||
} else {
|
||||
format!("{base_stem}-{counter}")
|
||||
};
|
||||
let file_name = format!("{file_stem}.md");
|
||||
|
||||
resolved.push(ResolvedRolloutSummary {
|
||||
thread_id: memory.thread_id,
|
||||
file_stem,
|
||||
file_name,
|
||||
});
|
||||
}
|
||||
|
||||
resolved
|
||||
}
|
||||
|
||||
/// Rebuild `raw_memories.md` from DB-backed stage-1 outputs.
|
||||
pub(super) async fn rebuild_raw_memories_file_from_memories(
|
||||
root: &Path,
|
||||
memories: &[Stage1Output],
|
||||
max_raw_memories_for_global: usize,
|
||||
resolved: &[ResolvedRolloutSummary],
|
||||
) -> std::io::Result<()> {
|
||||
ensure_layout(root).await?;
|
||||
rebuild_raw_memories_file(root, memories, max_raw_memories_for_global).await
|
||||
rebuild_raw_memories_file(root, memories, resolved).await
|
||||
}
|
||||
|
||||
/// Syncs canonical rollout summary files from DB-backed stage-1 output rows.
|
||||
pub(super) async fn sync_rollout_summaries_from_memories(
|
||||
root: &Path,
|
||||
memories: &[Stage1Output],
|
||||
max_raw_memories_for_global: usize,
|
||||
resolved: &[ResolvedRolloutSummary],
|
||||
) -> std::io::Result<()> {
|
||||
ensure_layout(root).await?;
|
||||
|
||||
let retained = retained_memories(memories, max_raw_memories_for_global);
|
||||
let keep = retained
|
||||
let keep = resolved
|
||||
.iter()
|
||||
.map(rollout_summary_file_stem)
|
||||
.map(|item| item.file_stem.clone())
|
||||
.collect::<HashSet<_>>();
|
||||
prune_rollout_summaries(root, &keep).await?;
|
||||
|
||||
for memory in retained {
|
||||
write_rollout_summary_for_thread(root, memory).await?;
|
||||
let memory_by_thread = memories
|
||||
.iter()
|
||||
.map(|memory| (memory.thread_id.to_string(), memory))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
for item in resolved {
|
||||
let Some(memory) = memory_by_thread.get(&item.thread_id.to_string()) else {
|
||||
return Err(std::io::Error::other(format!(
|
||||
"missing stage1 output for thread {} while syncing rollout summaries",
|
||||
item.thread_id
|
||||
)));
|
||||
};
|
||||
write_rollout_summary_for_thread(root, memory, &item.file_stem).await?;
|
||||
}
|
||||
|
||||
if retained.is_empty() {
|
||||
if resolved.is_empty() {
|
||||
for file_name in ["MEMORY.md", "memory_summary.md"] {
|
||||
let path = root.join(file_name);
|
||||
if let Err(err) = tokio::fs::remove_file(path).await
|
||||
@@ -61,18 +130,29 @@ pub(super) async fn sync_rollout_summaries_from_memories(
|
||||
async fn rebuild_raw_memories_file(
|
||||
root: &Path,
|
||||
memories: &[Stage1Output],
|
||||
max_raw_memories_for_global: usize,
|
||||
resolved: &[ResolvedRolloutSummary],
|
||||
) -> std::io::Result<()> {
|
||||
let retained = retained_memories(memories, max_raw_memories_for_global);
|
||||
let mut body = String::from("# Raw Memories\n\n");
|
||||
|
||||
if retained.is_empty() {
|
||||
if resolved.is_empty() {
|
||||
body.push_str("No raw memories yet.\n");
|
||||
return tokio::fs::write(raw_memories_file(root), body).await;
|
||||
}
|
||||
|
||||
let memory_by_thread = memories
|
||||
.iter()
|
||||
.map(|memory| (memory.thread_id.to_string(), memory))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
body.push_str("Merged stage-1 raw memories (latest first):\n\n");
|
||||
for memory in retained {
|
||||
for item in resolved {
|
||||
let Some(memory) = memory_by_thread.get(&item.thread_id.to_string()) else {
|
||||
return Err(std::io::Error::other(format!(
|
||||
"missing stage1 output for thread {} while rebuilding raw memories",
|
||||
item.thread_id
|
||||
)));
|
||||
};
|
||||
|
||||
writeln!(body, "## Thread `{}`", memory.thread_id).map_err(raw_memories_format_error)?;
|
||||
writeln!(
|
||||
body,
|
||||
@@ -81,6 +161,8 @@ async fn rebuild_raw_memories_file(
|
||||
)
|
||||
.map_err(raw_memories_format_error)?;
|
||||
writeln!(body, "cwd: {}", memory.cwd.display()).map_err(raw_memories_format_error)?;
|
||||
writeln!(body, "rollout_summary_file_name: {}", item.file_name)
|
||||
.map_err(raw_memories_format_error)?;
|
||||
writeln!(body).map_err(raw_memories_format_error)?;
|
||||
body.push_str(memory.raw_memory.trim());
|
||||
body.push_str("\n\n");
|
||||
@@ -122,8 +204,8 @@ async fn prune_rollout_summaries(root: &Path, keep: &HashSet<String>) -> std::io
|
||||
async fn write_rollout_summary_for_thread(
|
||||
root: &Path,
|
||||
memory: &Stage1Output,
|
||||
file_stem: &str,
|
||||
) -> std::io::Result<()> {
|
||||
let file_stem = rollout_summary_file_stem(memory);
|
||||
let path = rollout_summaries_dir(root).join(format!("{file_stem}.md"));
|
||||
|
||||
let mut body = String::new();
|
||||
@@ -157,41 +239,48 @@ fn rollout_summary_format_error(err: std::fmt::Error) -> std::io::Error {
|
||||
std::io::Error::other(format!("format rollout summary: {err}"))
|
||||
}
|
||||
|
||||
fn rollout_summary_file_stem(memory: &Stage1Output) -> String {
|
||||
const ROLLOUT_SLUG_MAX_LEN: usize = 20;
|
||||
fn normalize_rollout_slug(raw_slug: Option<&str>) -> String {
|
||||
const ROLLOUT_SLUG_MAX_LEN: usize = 60;
|
||||
|
||||
let thread_id = memory.thread_id.to_string();
|
||||
let Some(raw_slug) = memory.rollout_slug.as_deref() else {
|
||||
return thread_id;
|
||||
};
|
||||
let mut normalized = String::with_capacity(ROLLOUT_SLUG_MAX_LEN);
|
||||
for ch in raw_slug.unwrap_or_default().chars() {
|
||||
let mapped = if ch.is_ascii_alphanumeric() {
|
||||
ch.to_ascii_lowercase()
|
||||
} else if ch == '_' || ch == '-' {
|
||||
ch
|
||||
} else {
|
||||
'_'
|
||||
};
|
||||
|
||||
let mut slug = String::with_capacity(ROLLOUT_SLUG_MAX_LEN);
|
||||
for ch in raw_slug.chars() {
|
||||
if slug.len() >= ROLLOUT_SLUG_MAX_LEN {
|
||||
let mapped_is_sep = mapped == '_' || mapped == '-';
|
||||
let prev_is_sep = normalized
|
||||
.chars()
|
||||
.last()
|
||||
.is_some_and(|previous| previous == '_' || previous == '-');
|
||||
if mapped_is_sep && prev_is_sep {
|
||||
continue;
|
||||
}
|
||||
|
||||
normalized.push(mapped);
|
||||
if normalized.len() == ROLLOUT_SLUG_MAX_LEN {
|
||||
break;
|
||||
}
|
||||
|
||||
if ch.is_ascii_alphanumeric() {
|
||||
slug.push(ch.to_ascii_lowercase());
|
||||
} else {
|
||||
slug.push('_');
|
||||
}
|
||||
}
|
||||
|
||||
while slug.ends_with('_') {
|
||||
slug.pop();
|
||||
}
|
||||
|
||||
if slug.is_empty() {
|
||||
thread_id
|
||||
let trimmed = normalized
|
||||
.trim_matches(|ch| ch == '_' || ch == '-')
|
||||
.to_string();
|
||||
if trimmed.is_empty() {
|
||||
"unknown".to_string()
|
||||
} else {
|
||||
format!("{thread_id}-{slug}")
|
||||
trimmed
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::rollout_summary_file_stem;
|
||||
use super::normalize_rollout_slug;
|
||||
use super::resolve_rollout_summary_files;
|
||||
use chrono::TimeZone;
|
||||
use chrono::Utc;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -199,43 +288,82 @@ mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
|
||||
fn stage1_output_with_slug(rollout_slug: Option<&str>) -> Stage1Output {
|
||||
fn stage1_output_with_slug_and_path(
|
||||
rollout_slug: Option<&str>,
|
||||
rollout_path: &str,
|
||||
) -> Stage1Output {
|
||||
Stage1Output {
|
||||
thread_id: ThreadId::new(),
|
||||
source_updated_at: Utc.timestamp_opt(123, 0).single().expect("timestamp"),
|
||||
raw_memory: "raw memory".to_string(),
|
||||
rollout_summary: "summary".to_string(),
|
||||
rollout_slug: rollout_slug.map(ToString::to_string),
|
||||
rollout_summary_filename: None,
|
||||
rollout_path: PathBuf::from(rollout_path),
|
||||
cwd: PathBuf::from("/tmp/workspace"),
|
||||
generated_at: Utc.timestamp_opt(124, 0).single().expect("timestamp"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rollout_summary_file_stem_uses_thread_id_when_slug_missing() {
|
||||
let memory = stage1_output_with_slug(None);
|
||||
let thread_id = memory.thread_id.to_string();
|
||||
|
||||
assert_eq!(rollout_summary_file_stem(&memory), thread_id);
|
||||
fn normalize_rollout_slug_applies_capping_and_separator_rules() {
|
||||
let value = normalize_rollout_slug(Some(
|
||||
"--Unsafe Slug//With---Spaces&&Symbols____________________01234567890123456789012345",
|
||||
));
|
||||
assert_eq!(
|
||||
value,
|
||||
"unsafe_slug_with-spaces_symbols_01234567890123456789012345"
|
||||
);
|
||||
assert!(value.len() <= 60);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rollout_summary_file_stem_sanitizes_and_truncates_slug() {
|
||||
let memory =
|
||||
stage1_output_with_slug(Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345"));
|
||||
let thread_id = memory.thread_id.to_string();
|
||||
fn normalize_rollout_slug_uses_unknown_for_empty_result() {
|
||||
assert_eq!(normalize_rollout_slug(Some("!!!")), "unknown");
|
||||
assert_eq!(normalize_rollout_slug(Some("")), "unknown");
|
||||
assert_eq!(normalize_rollout_slug(None), "unknown");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_rollout_summary_files_uses_timestamp_and_suffixes_collisions() {
|
||||
let first = stage1_output_with_slug_and_path(
|
||||
Some("Unsafe Slug/With Spaces & Symbols"),
|
||||
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-00000000-0000-0000-0000-000000000001.jsonl",
|
||||
);
|
||||
let second = Stage1Output {
|
||||
thread_id: ThreadId::new(),
|
||||
source_updated_at: Utc.timestamp_opt(124, 0).single().expect("timestamp"),
|
||||
raw_memory: "raw memory 2".to_string(),
|
||||
rollout_summary: "summary 2".to_string(),
|
||||
rollout_slug: Some("Unsafe Slug/With Spaces & Symbols".to_string()),
|
||||
rollout_summary_filename: None,
|
||||
rollout_path: PathBuf::from(
|
||||
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-00000000-0000-0000-0000-000000000002.jsonl",
|
||||
),
|
||||
cwd: PathBuf::from("/tmp/workspace"),
|
||||
generated_at: Utc.timestamp_opt(125, 0).single().expect("timestamp"),
|
||||
};
|
||||
|
||||
let resolved = resolve_rollout_summary_files(&[first, second], 8);
|
||||
assert_eq!(resolved.len(), 2);
|
||||
assert_eq!(
|
||||
rollout_summary_file_stem(&memory),
|
||||
format!("{thread_id}-unsafe_slug_with_spa")
|
||||
resolved[0].file_name,
|
||||
"2026-02-17T19-22-07-unsafe_slug_with_spaces_symbols.md"
|
||||
);
|
||||
assert_eq!(
|
||||
resolved[1].file_name,
|
||||
"2026-02-17T19-22-07-unsafe_slug_with_spaces_symbols-2.md"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rollout_summary_file_stem_uses_thread_id_when_slug_is_empty() {
|
||||
let memory = stage1_output_with_slug(Some(""));
|
||||
let thread_id = memory.thread_id.to_string();
|
||||
fn resolve_rollout_summary_files_falls_back_to_source_updated_at_when_rollout_timestamp_is_missing()
|
||||
{
|
||||
let memory =
|
||||
stage1_output_with_slug_and_path(Some("alpha"), "sessions/rollout-not-parseable.jsonl");
|
||||
let resolved = resolve_rollout_summary_files(&[memory], 8);
|
||||
|
||||
assert_eq!(rollout_summary_file_stem(&memory), thread_id);
|
||||
assert_eq!(resolved.len(), 1);
|
||||
assert_eq!(resolved[0].file_name, "1970-01-01T00-02-03-alpha.md");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::storage::rebuild_raw_memories_file_from_memories;
|
||||
use super::storage::resolve_rollout_summary_files;
|
||||
use super::storage::sync_rollout_summaries_from_memories;
|
||||
use crate::config::types::DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL;
|
||||
use crate::memories::ensure_layout;
|
||||
@@ -69,10 +70,25 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
|
||||
let root = dir.path().join("memory");
|
||||
ensure_layout(&root).await.expect("ensure layout");
|
||||
|
||||
let keep_id = ThreadId::default().to_string();
|
||||
let drop_id = ThreadId::default().to_string();
|
||||
let keep_path = rollout_summaries_dir(&root).join(format!("{keep_id}.md"));
|
||||
let drop_path = rollout_summaries_dir(&root).join(format!("{drop_id}.md"));
|
||||
let keep_id = ThreadId::new();
|
||||
|
||||
let memories = vec![Stage1Output {
|
||||
thread_id: keep_id,
|
||||
source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"),
|
||||
raw_memory: "raw memory".to_string(),
|
||||
rollout_summary: "short summary".to_string(),
|
||||
rollout_slug: None,
|
||||
rollout_summary_filename: None,
|
||||
rollout_path: PathBuf::from(format!(
|
||||
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-{keep_id}.jsonl"
|
||||
)),
|
||||
cwd: PathBuf::from("/tmp/workspace"),
|
||||
generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
|
||||
}];
|
||||
let resolved =
|
||||
resolve_rollout_summary_files(&memories, DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL);
|
||||
let keep_path = rollout_summaries_dir(&root).join(&resolved[0].file_name);
|
||||
let drop_path = rollout_summaries_dir(&root).join("obsolete-summary.md");
|
||||
tokio::fs::write(&keep_path, "keep")
|
||||
.await
|
||||
.expect("write keep");
|
||||
@@ -80,30 +96,12 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
|
||||
.await
|
||||
.expect("write drop");
|
||||
|
||||
let memories = vec![Stage1Output {
|
||||
thread_id: ThreadId::try_from(keep_id.clone()).expect("thread id"),
|
||||
source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"),
|
||||
raw_memory: "raw memory".to_string(),
|
||||
rollout_summary: "short summary".to_string(),
|
||||
rollout_slug: None,
|
||||
cwd: PathBuf::from("/tmp/workspace"),
|
||||
generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
|
||||
}];
|
||||
|
||||
sync_rollout_summaries_from_memories(
|
||||
&root,
|
||||
&memories,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
)
|
||||
.await
|
||||
.expect("sync rollout summaries");
|
||||
rebuild_raw_memories_file_from_memories(
|
||||
&root,
|
||||
&memories,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
)
|
||||
.await
|
||||
.expect("rebuild raw memories");
|
||||
sync_rollout_summaries_from_memories(&root, &memories, &resolved)
|
||||
.await
|
||||
.expect("sync rollout summaries");
|
||||
rebuild_raw_memories_file_from_memories(&root, &memories, &resolved)
|
||||
.await
|
||||
.expect("rebuild raw memories");
|
||||
|
||||
assert!(keep_path.is_file());
|
||||
assert!(!drop_path.exists());
|
||||
@@ -112,12 +110,16 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
|
||||
.await
|
||||
.expect("read raw memories");
|
||||
assert!(raw_memories.contains("raw memory"));
|
||||
assert!(raw_memories.contains(&keep_id));
|
||||
assert!(raw_memories.contains(&keep_id.to_string()));
|
||||
assert!(raw_memories.contains("cwd: /tmp/workspace"));
|
||||
assert!(raw_memories.contains(&format!(
|
||||
"rollout_summary_file_name: {}",
|
||||
resolved[0].file_name
|
||||
)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() {
|
||||
async fn sync_rollout_summaries_uses_timestamp_and_sanitized_slug_filename() {
|
||||
let dir = tempdir().expect("tempdir");
|
||||
let root = dir.path().join("memory");
|
||||
ensure_layout(&root).await.expect("ensure layout");
|
||||
@@ -139,17 +141,19 @@ async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() {
|
||||
raw_memory: "raw memory".to_string(),
|
||||
rollout_summary: "short summary".to_string(),
|
||||
rollout_slug: Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345".to_string()),
|
||||
rollout_summary_filename: None,
|
||||
rollout_path: PathBuf::from(format!(
|
||||
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-{thread_id}.jsonl"
|
||||
)),
|
||||
cwd: PathBuf::from("/tmp/workspace"),
|
||||
generated_at: Utc.timestamp_opt(201, 0).single().expect("timestamp"),
|
||||
}];
|
||||
let resolved =
|
||||
resolve_rollout_summary_files(&memories, DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL);
|
||||
|
||||
sync_rollout_summaries_from_memories(
|
||||
&root,
|
||||
&memories,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
)
|
||||
.await
|
||||
.expect("sync rollout summaries");
|
||||
sync_rollout_summaries_from_memories(&root, &memories, &resolved)
|
||||
.await
|
||||
.expect("sync rollout summaries");
|
||||
|
||||
let mut dir = tokio::fs::read_dir(rollout_summaries_dir(&root))
|
||||
.await
|
||||
@@ -162,17 +166,22 @@ async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() {
|
||||
|
||||
assert_eq!(files.len(), 1);
|
||||
let file_name = &files[0];
|
||||
assert_eq!(file_name, &resolved[0].file_name);
|
||||
let stem = file_name
|
||||
.strip_suffix(".md")
|
||||
.expect("rollout summary file should end with .md");
|
||||
.expect("summary should end with .md");
|
||||
assert!(
|
||||
stem.starts_with("2026-02-17T19-22-07-"),
|
||||
"rollout summary filename should include rollout timestamp"
|
||||
);
|
||||
let slug = stem
|
||||
.strip_prefix(&format!("{thread_id}-"))
|
||||
.expect("rollout summary filename should include thread id and slug");
|
||||
assert!(slug.len() <= 20, "slug should be capped at 20 chars");
|
||||
.strip_prefix("2026-02-17T19-22-07-")
|
||||
.expect("summary stem should include timestamp prefix");
|
||||
assert!(slug.len() <= 60, "slug should be capped at 60 chars");
|
||||
assert!(
|
||||
slug.chars()
|
||||
.all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'),
|
||||
"slug should be file-safe lowercase ascii with underscores"
|
||||
.all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_' || ch == '-'),
|
||||
"slug should be file-safe lowercase ascii with underscores/hyphens"
|
||||
);
|
||||
|
||||
let summary = tokio::fs::read_to_string(rollout_summaries_dir(&root).join(file_name))
|
||||
@@ -220,13 +229,18 @@ mod phase2 {
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn stage1_output_with_source_updated_at(source_updated_at: i64) -> Stage1Output {
|
||||
let thread_id = ThreadId::new();
|
||||
Stage1Output {
|
||||
thread_id: ThreadId::new(),
|
||||
thread_id,
|
||||
source_updated_at: chrono::DateTime::<Utc>::from_timestamp(source_updated_at, 0)
|
||||
.expect("valid source_updated_at timestamp"),
|
||||
raw_memory: "raw memory".to_string(),
|
||||
rollout_summary: "rollout summary".to_string(),
|
||||
rollout_slug: None,
|
||||
rollout_summary_filename: None,
|
||||
rollout_path: PathBuf::from(format!(
|
||||
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-{thread_id}.jsonl"
|
||||
)),
|
||||
cwd: PathBuf::from("/tmp/workspace"),
|
||||
generated_at: chrono::DateTime::<Utc>::from_timestamp(source_updated_at + 1, 0)
|
||||
.expect("valid generated_at timestamp"),
|
||||
@@ -281,7 +295,7 @@ mod phase2 {
|
||||
thread_id,
|
||||
self.config
|
||||
.codex_home
|
||||
.join(format!("rollout-{thread_id}.jsonl")),
|
||||
.join(format!("rollout-2026-02-17T19-22-07-{thread_id}.jsonl")),
|
||||
Utc::now(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
@@ -457,6 +471,55 @@ mod phase2 {
|
||||
harness.shutdown_threads().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dispatch_persists_rollout_summary_filename_mapping_and_raw_memories_header() {
|
||||
let harness = DispatchHarness::new().await;
|
||||
harness.seed_stage1_output(100).await;
|
||||
|
||||
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
|
||||
|
||||
let outputs = harness
|
||||
.state_db
|
||||
.list_stage1_outputs_for_global(10)
|
||||
.await
|
||||
.expect("list stage1 outputs");
|
||||
assert_eq!(outputs.len(), 1);
|
||||
let file_name = outputs[0]
|
||||
.rollout_summary_filename
|
||||
.clone()
|
||||
.expect("phase2 should persist rollout summary filename");
|
||||
assert!(
|
||||
file_name.starts_with("2026-02-17T19-22-07-"),
|
||||
"filename should include rollout timestamp"
|
||||
);
|
||||
|
||||
let lookup = harness
|
||||
.state_db
|
||||
.get_stage1_output_by_rollout_summary_filename(file_name.as_str())
|
||||
.await
|
||||
.expect("lookup stage1 output by rollout summary filename")
|
||||
.expect("lookup should resolve persisted filename");
|
||||
assert_eq!(lookup.thread_id, outputs[0].thread_id);
|
||||
assert_eq!(lookup.rollout_summary_filename, Some(file_name.clone()));
|
||||
|
||||
let root = memory_root(&harness.config.codex_home);
|
||||
let raw_memories = tokio::fs::read_to_string(raw_memories_file(&root))
|
||||
.await
|
||||
.expect("read raw memories");
|
||||
assert!(
|
||||
raw_memories.contains(&format!("rollout_summary_file_name: {file_name}")),
|
||||
"raw_memories.md should include rollout summary filename header"
|
||||
);
|
||||
assert!(
|
||||
tokio::fs::try_exists(rollout_summaries_dir(&root).join(&file_name))
|
||||
.await
|
||||
.expect("check rollout summary file"),
|
||||
"rollout summary file should exist under resolved filename"
|
||||
);
|
||||
|
||||
harness.shutdown_threads().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() {
|
||||
let harness = DispatchHarness::new().await;
|
||||
|
||||
@@ -25,7 +25,7 @@ Folder structure (under {{ memory_root }}/):
|
||||
- Temporary file: merged raw memories from Phase 1. Input for Phase 2.
|
||||
- skills/<skill-name>/
|
||||
- Reusable procedures. Entrypoint: SKILL.md; may include scripts/, templates/, examples/.
|
||||
- rollout_summaries/<rollout_slug>.md
|
||||
- rollout_summaries/<timestamp>-<slug>.md
|
||||
- Recap of the rollout, including lessons learned, reusable knowledge,
|
||||
pointers/references, and pruned raw evidence snippets. Distilled version of
|
||||
everything valuable from the raw rollout.
|
||||
|
||||
@@ -235,7 +235,6 @@ shows or why it matters>:
|
||||
|
||||
The schema is below.
|
||||
---
|
||||
rollout_summary_file: <file.md>
|
||||
description: brief description of the task and outcome
|
||||
keywords: k1, k2, k3, ... <searchable handles (tool names, error names, repo concepts, contracts)>
|
||||
---
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
ALTER TABLE stage1_outputs
|
||||
ADD COLUMN rollout_summary_filename TEXT;
|
||||
|
||||
CREATE UNIQUE INDEX idx_stage1_outputs_rollout_summary_filename
|
||||
ON stage1_outputs(rollout_summary_filename)
|
||||
WHERE rollout_summary_filename IS NOT NULL;
|
||||
@@ -16,6 +16,8 @@ pub struct Stage1Output {
|
||||
pub raw_memory: String,
|
||||
pub rollout_summary: String,
|
||||
pub rollout_slug: Option<String>,
|
||||
pub rollout_summary_filename: Option<String>,
|
||||
pub rollout_path: PathBuf,
|
||||
pub cwd: PathBuf,
|
||||
pub generated_at: DateTime<Utc>,
|
||||
}
|
||||
@@ -27,6 +29,8 @@ pub(crate) struct Stage1OutputRow {
|
||||
raw_memory: String,
|
||||
rollout_summary: String,
|
||||
rollout_slug: Option<String>,
|
||||
rollout_summary_filename: Option<String>,
|
||||
rollout_path: String,
|
||||
cwd: String,
|
||||
generated_at: i64,
|
||||
}
|
||||
@@ -39,6 +43,8 @@ impl Stage1OutputRow {
|
||||
raw_memory: row.try_get("raw_memory")?,
|
||||
rollout_summary: row.try_get("rollout_summary")?,
|
||||
rollout_slug: row.try_get("rollout_slug")?,
|
||||
rollout_summary_filename: row.try_get("rollout_summary_filename")?,
|
||||
rollout_path: row.try_get("rollout_path")?,
|
||||
cwd: row.try_get("cwd")?,
|
||||
generated_at: row.try_get("generated_at")?,
|
||||
})
|
||||
@@ -55,6 +61,8 @@ impl TryFrom<Stage1OutputRow> for Stage1Output {
|
||||
raw_memory: row.raw_memory,
|
||||
rollout_summary: row.rollout_summary,
|
||||
rollout_slug: row.rollout_slug,
|
||||
rollout_summary_filename: row.rollout_summary_filename,
|
||||
rollout_path: PathBuf::from(row.rollout_path),
|
||||
cwd: PathBuf::from(row.cwd),
|
||||
generated_at: epoch_seconds_to_datetime(row.generated_at)?,
|
||||
})
|
||||
|
||||
@@ -2120,10 +2120,20 @@ WHERE kind = 'memory_stage1'
|
||||
assert_eq!(outputs[0].thread_id, thread_id_b);
|
||||
assert_eq!(outputs[0].rollout_summary, "summary b");
|
||||
assert_eq!(outputs[0].rollout_slug.as_deref(), Some("rollout-b"));
|
||||
assert_eq!(outputs[0].rollout_summary_filename, None);
|
||||
assert_eq!(
|
||||
outputs[0].rollout_path,
|
||||
codex_home.join(format!("rollout-{thread_id_b}.jsonl"))
|
||||
);
|
||||
assert_eq!(outputs[0].cwd, codex_home.join("workspace-b"));
|
||||
assert_eq!(outputs[1].thread_id, thread_id_a);
|
||||
assert_eq!(outputs[1].rollout_summary, "summary a");
|
||||
assert_eq!(outputs[1].rollout_slug, None);
|
||||
assert_eq!(outputs[1].rollout_summary_filename, None);
|
||||
assert_eq!(
|
||||
outputs[1].rollout_path,
|
||||
codex_home.join(format!("rollout-{thread_id_a}.jsonl"))
|
||||
);
|
||||
assert_eq!(outputs[1].cwd, codex_home.join("workspace-a"));
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
@@ -2193,11 +2203,304 @@ VALUES (?, ?, ?, ?, ?)
|
||||
assert_eq!(outputs.len(), 1);
|
||||
assert_eq!(outputs[0].thread_id, thread_id_non_empty);
|
||||
assert_eq!(outputs[0].rollout_summary, "summary");
|
||||
assert_eq!(outputs[0].rollout_summary_filename, None);
|
||||
assert_eq!(
|
||||
outputs[0].rollout_path,
|
||||
codex_home.join(format!("rollout-{thread_id_non_empty}.jsonl"))
|
||||
);
|
||||
assert_eq!(outputs[0].cwd, codex_home.join("workspace-non-empty"));
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_rollout_summary_filenames_for_global_updates_and_clears_stale_rows() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let thread_id_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let thread_id_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id_a,
|
||||
codex_home.join("workspace-a"),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread a");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id_b,
|
||||
codex_home.join("workspace-b"),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread b");
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id_a, owner, 100, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1 a");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id_a,
|
||||
ownership_token.as_str(),
|
||||
100,
|
||||
"raw memory a",
|
||||
"summary a",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("mark stage1 succeeded a"),
|
||||
"stage1 success should persist output a"
|
||||
);
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id_b, owner, 101, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1 b");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id_b,
|
||||
ownership_token.as_str(),
|
||||
101,
|
||||
"raw memory b",
|
||||
"summary b",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("mark stage1 succeeded b"),
|
||||
"stage1 success should persist output b"
|
||||
);
|
||||
|
||||
runtime
|
||||
.set_rollout_summary_filenames_for_global(&[
|
||||
(thread_id_a, "2026-02-17T19-22-07-alpha.md".to_string()),
|
||||
(thread_id_b, "2026-02-17T19-22-08-beta.md".to_string()),
|
||||
])
|
||||
.await
|
||||
.expect("set rollout summary filenames");
|
||||
|
||||
let first = runtime
|
||||
.list_stage1_outputs_for_global(10)
|
||||
.await
|
||||
.expect("list outputs after first filename update");
|
||||
assert_eq!(first.len(), 2);
|
||||
assert_eq!(
|
||||
first[0].rollout_summary_filename.as_deref(),
|
||||
Some("2026-02-17T19-22-08-beta.md")
|
||||
);
|
||||
assert_eq!(
|
||||
first[1].rollout_summary_filename.as_deref(),
|
||||
Some("2026-02-17T19-22-07-alpha.md")
|
||||
);
|
||||
|
||||
runtime
|
||||
.set_rollout_summary_filenames_for_global(&[(
|
||||
thread_id_b,
|
||||
"2026-02-17T19-22-08-beta-renamed.md".to_string(),
|
||||
)])
|
||||
.await
|
||||
.expect("set rollout summary filename for retained row only");
|
||||
|
||||
let second = runtime
|
||||
.list_stage1_outputs_for_global(10)
|
||||
.await
|
||||
.expect("list outputs after clearing stale filename");
|
||||
assert_eq!(second.len(), 2);
|
||||
assert_eq!(
|
||||
second[0].rollout_summary_filename.as_deref(),
|
||||
Some("2026-02-17T19-22-08-beta-renamed.md")
|
||||
);
|
||||
assert_eq!(second[1].rollout_summary_filename, None);
|
||||
|
||||
runtime
|
||||
.set_rollout_summary_filenames_for_global(&[])
|
||||
.await
|
||||
.expect("clear rollout summary filenames");
|
||||
|
||||
let cleared = runtime
|
||||
.list_stage1_outputs_for_global(10)
|
||||
.await
|
||||
.expect("list outputs after clearing all filenames");
|
||||
assert_eq!(cleared[0].rollout_summary_filename, None);
|
||||
assert_eq!(cleared[1].rollout_summary_filename, None);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_stage1_output_by_rollout_summary_filename_returns_matching_row() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id,
|
||||
codex_home.join("workspace-a"),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread");
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id, owner, 100, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
ownership_token.as_str(),
|
||||
100,
|
||||
"raw memory",
|
||||
"summary",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("mark stage1 succeeded"),
|
||||
"stage1 success should persist output"
|
||||
);
|
||||
|
||||
let file_name = "2026-02-17T19-22-07-summary.md".to_string();
|
||||
runtime
|
||||
.set_rollout_summary_filenames_for_global(&[(thread_id, file_name.clone())])
|
||||
.await
|
||||
.expect("set rollout summary filename");
|
||||
|
||||
let output = runtime
|
||||
.get_stage1_output_by_rollout_summary_filename(file_name.as_str())
|
||||
.await
|
||||
.expect("lookup by rollout summary filename")
|
||||
.expect("row should exist");
|
||||
assert_eq!(output.thread_id, thread_id);
|
||||
assert_eq!(output.rollout_summary, "summary");
|
||||
assert_eq!(
|
||||
output.rollout_summary_filename.as_deref(),
|
||||
Some(file_name.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
output.rollout_path,
|
||||
codex_home.join(format!("rollout-{thread_id}.jsonl"))
|
||||
);
|
||||
|
||||
let missing = runtime
|
||||
.get_stage1_output_by_rollout_summary_filename("missing.md")
|
||||
.await
|
||||
.expect("lookup missing rollout summary filename");
|
||||
assert_eq!(missing, None);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_rollout_summary_filenames_for_global_rejects_duplicate_file_names() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let thread_id_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let thread_id_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id_a,
|
||||
codex_home.join("workspace-a"),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread a");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id_b,
|
||||
codex_home.join("workspace-b"),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread b");
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id_a, owner, 100, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1 a");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id_a,
|
||||
ownership_token.as_str(),
|
||||
100,
|
||||
"raw memory a",
|
||||
"summary a",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("mark stage1 succeeded a"),
|
||||
"stage1 success should persist output a"
|
||||
);
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id_b, owner, 101, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1 b");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id_b,
|
||||
ownership_token.as_str(),
|
||||
101,
|
||||
"raw memory b",
|
||||
"summary b",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("mark stage1 succeeded b"),
|
||||
"stage1 success should persist output b"
|
||||
);
|
||||
|
||||
let result = runtime
|
||||
.set_rollout_summary_filenames_for_global(&[
|
||||
(thread_id_a, "2026-02-17T19-22-07-duplicate.md".to_string()),
|
||||
(thread_id_b, "2026-02-17T19-22-07-duplicate.md".to_string()),
|
||||
])
|
||||
.await;
|
||||
assert!(
|
||||
result.is_err(),
|
||||
"duplicate filenames should violate the unique index"
|
||||
);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mark_stage1_job_succeeded_enqueues_global_consolidation() {
|
||||
let codex_home = unique_temp_dir();
|
||||
|
||||
@@ -197,7 +197,9 @@ SELECT
|
||||
so.raw_memory,
|
||||
so.rollout_summary,
|
||||
so.rollout_slug,
|
||||
so.rollout_summary_filename,
|
||||
so.generated_at
|
||||
, COALESCE(t.rollout_path, '') AS rollout_path
|
||||
, COALESCE(t.cwd, '') AS cwd
|
||||
FROM stage1_outputs AS so
|
||||
LEFT JOIN threads AS t
|
||||
@@ -216,6 +218,98 @@ LIMIT ?
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
}
|
||||
|
||||
/// Persists the current rollout summary filename mapping for retained rows.
|
||||
///
|
||||
/// Query behavior:
|
||||
/// - clears `rollout_summary_filename` for rows not in `rows`
|
||||
/// - updates each retained row to its current emitted filename
|
||||
pub async fn set_rollout_summary_filenames_for_global(
|
||||
&self,
|
||||
rows: &[(ThreadId, String)],
|
||||
) -> anyhow::Result<()> {
|
||||
let rows = rows
|
||||
.iter()
|
||||
.map(|(thread_id, file_name)| (thread_id.to_string(), file_name.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
if rows.is_empty() {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE stage1_outputs
|
||||
SET rollout_summary_filename = NULL
|
||||
"#,
|
||||
)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
tx.commit().await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut clear_builder = QueryBuilder::<Sqlite>::new(
|
||||
r#"
|
||||
UPDATE stage1_outputs
|
||||
SET rollout_summary_filename = NULL
|
||||
WHERE thread_id NOT IN (
|
||||
"#,
|
||||
);
|
||||
let mut separated = clear_builder.separated(", ");
|
||||
for (thread_id, _) in &rows {
|
||||
separated.push_bind(thread_id.as_str());
|
||||
}
|
||||
separated.push_unseparated("\n)");
|
||||
clear_builder.build().execute(&mut *tx).await?;
|
||||
|
||||
for (thread_id, file_name) in &rows {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE stage1_outputs
|
||||
SET rollout_summary_filename = ?
|
||||
WHERE thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(file_name)
|
||||
.bind(thread_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finds one stage-1 output row by rollout summary filename.
|
||||
pub async fn get_stage1_output_by_rollout_summary_filename(
|
||||
&self,
|
||||
file_name: &str,
|
||||
) -> anyhow::Result<Option<Stage1Output>> {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT
|
||||
so.thread_id,
|
||||
so.source_updated_at,
|
||||
so.raw_memory,
|
||||
so.rollout_summary,
|
||||
so.rollout_slug,
|
||||
so.rollout_summary_filename,
|
||||
so.generated_at
|
||||
, COALESCE(t.rollout_path, '') AS rollout_path
|
||||
, COALESCE(t.cwd, '') AS cwd
|
||||
FROM stage1_outputs AS so
|
||||
LEFT JOIN threads AS t
|
||||
ON t.id = so.thread_id
|
||||
WHERE so.rollout_summary_filename = ?
|
||||
LIMIT 1
|
||||
"#,
|
||||
)
|
||||
.bind(file_name)
|
||||
.fetch_optional(self.pool.as_ref())
|
||||
.await?;
|
||||
|
||||
row.map(|row| Stage1OutputRow::try_from_row(&row).and_then(Stage1Output::try_from))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Attempts to claim a stage-1 job for a thread at `source_updated_at`.
|
||||
///
|
||||
/// Claim semantics:
|
||||
@@ -463,13 +557,15 @@ INSERT INTO stage1_outputs (
|
||||
raw_memory,
|
||||
rollout_summary,
|
||||
rollout_slug,
|
||||
rollout_summary_filename,
|
||||
generated_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
) VALUES (?, ?, ?, ?, ?, NULL, ?)
|
||||
ON CONFLICT(thread_id) DO UPDATE SET
|
||||
source_updated_at = excluded.source_updated_at,
|
||||
raw_memory = excluded.raw_memory,
|
||||
rollout_summary = excluded.rollout_summary,
|
||||
rollout_slug = excluded.rollout_slug,
|
||||
rollout_summary_filename = excluded.rollout_summary_filename,
|
||||
generated_at = excluded.generated_at
|
||||
WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
|
||||
"#,
|
||||
|
||||
Reference in New Issue
Block a user