Compare commits

...

2 Commits

Author SHA1 Message Date
zuxin-oai
dcf6141900 Merge branch 'main' into zuxin/summary_naming 2026-02-17 18:20:04 -08:00
Zuxin Liu
c53205179a memories: switch rollout summary names to timestamp+slug 2026-02-17 18:03:21 -08:00
9 changed files with 728 additions and 106 deletions

View File

@@ -7,6 +7,7 @@ use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::resolve_rollout_summary_files;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use codex_config::Constrained;
use codex_protocol::ThreadId;
@@ -74,25 +75,43 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
}
};
let new_watermark = get_watermark(claim.watermark, &raw_memories);
let resolved = resolve_rollout_summary_files(&raw_memories, max_raw_memories);
// 4. Update the file system by syncing the raw memories with the one extracted from DB at
// step 3
// [`rollout_summaries/`]
if let Err(err) =
sync_rollout_summaries_from_memories(&root, &raw_memories, max_raw_memories).await
{
if let Err(err) = sync_rollout_summaries_from_memories(&root, &raw_memories, &resolved).await {
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_sync_artifacts").await;
return;
}
// [`raw_memories.md`]
if let Err(err) =
rebuild_raw_memories_file_from_memories(&root, &raw_memories, max_raw_memories).await
if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &raw_memories, &resolved).await
{
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_rebuild_raw_memories").await;
return;
}
let filename_rows = resolved
.iter()
.map(|item| (item.thread_id, item.file_name.clone()))
.collect::<Vec<_>>();
if let Err(err) = db
.set_rollout_summary_filenames_for_global(filename_rows.as_slice())
.await
{
tracing::error!(
"failed persisting rollout summary filename mapping for global consolidation: {err}"
);
job::failed(
session,
db,
&claim,
"failed_persist_rollout_summary_filenames",
)
.await;
return;
}
if raw_memories.is_empty() {
// We check only after sync of the file system.
job::succeed(session, db, &claim, new_watermark, "succeeded_no_input").await;

View File

@@ -1,4 +1,8 @@
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_state::Stage1Output;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Write as _;
use std::path::Path;
@@ -7,37 +11,102 @@ use tracing::warn;
use crate::memories::ensure_layout;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
use crate::rollout::list::parse_timestamp_uuid_from_filename;
/// Filename resolved for one retained stage-1 output.
///
/// Produced by `resolve_rollout_summary_files`; the same value is used for the
/// on-disk rollout summary, the `raw_memories.md` header and the persisted
/// thread-to-filename mapping.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(super) struct ResolvedRolloutSummary {
// Thread whose stage-1 output this entry names.
pub(super) thread_id: ThreadId,
// Filename without extension: `<timestamp>-<slug>`, plus a numeric suffix on collisions.
pub(super) file_stem: String,
// `file_stem` with the `.md` extension appended.
pub(super) file_name: String,
}
/// Resolves rollout summary filenames for retained stage-1 outputs.
pub(super) fn resolve_rollout_summary_files(
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
) -> Vec<ResolvedRolloutSummary> {
let retained = retained_memories(memories, max_raw_memories_for_global);
let mut base_name_counts = HashMap::<String, usize>::new();
let mut resolved = Vec::with_capacity(retained.len());
for memory in retained {
let timestamp = if let Some(file_name) = memory
.rollout_path
.file_name()
.and_then(|name| name.to_str())
&& let Some((parsed_timestamp, _)) = parse_timestamp_uuid_from_filename(file_name)
&& let Some(parsed) =
DateTime::<Utc>::from_timestamp(parsed_timestamp.unix_timestamp(), 0)
{
parsed.format("%Y-%m-%dT%H-%M-%S").to_string()
} else {
memory
.source_updated_at
.format("%Y-%m-%dT%H-%M-%S")
.to_string()
};
let slug = normalize_rollout_slug(memory.rollout_slug.as_deref());
let base_stem = format!("{timestamp}-{slug}");
let counter = base_name_counts.entry(base_stem.clone()).or_default();
*counter += 1;
let file_stem = if *counter == 1 {
base_stem
} else {
format!("{base_stem}-{counter}")
};
let file_name = format!("{file_stem}.md");
resolved.push(ResolvedRolloutSummary {
thread_id: memory.thread_id,
file_stem,
file_name,
});
}
resolved
}
/// Rebuild `raw_memories.md` from DB-backed stage-1 outputs.
pub(super) async fn rebuild_raw_memories_file_from_memories(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
resolved: &[ResolvedRolloutSummary],
) -> std::io::Result<()> {
ensure_layout(root).await?;
rebuild_raw_memories_file(root, memories, max_raw_memories_for_global).await
rebuild_raw_memories_file(root, memories, resolved).await
}
/// Syncs canonical rollout summary files from DB-backed stage-1 output rows.
pub(super) async fn sync_rollout_summaries_from_memories(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
resolved: &[ResolvedRolloutSummary],
) -> std::io::Result<()> {
ensure_layout(root).await?;
let retained = retained_memories(memories, max_raw_memories_for_global);
let keep = retained
let keep = resolved
.iter()
.map(rollout_summary_file_stem)
.map(|item| item.file_stem.clone())
.collect::<HashSet<_>>();
prune_rollout_summaries(root, &keep).await?;
for memory in retained {
write_rollout_summary_for_thread(root, memory).await?;
let memory_by_thread = memories
.iter()
.map(|memory| (memory.thread_id.to_string(), memory))
.collect::<HashMap<_, _>>();
for item in resolved {
let Some(memory) = memory_by_thread.get(&item.thread_id.to_string()) else {
return Err(std::io::Error::other(format!(
"missing stage1 output for thread {} while syncing rollout summaries",
item.thread_id
)));
};
write_rollout_summary_for_thread(root, memory, &item.file_stem).await?;
}
if retained.is_empty() {
if resolved.is_empty() {
for file_name in ["MEMORY.md", "memory_summary.md"] {
let path = root.join(file_name);
if let Err(err) = tokio::fs::remove_file(path).await
@@ -61,18 +130,29 @@ pub(super) async fn sync_rollout_summaries_from_memories(
async fn rebuild_raw_memories_file(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
resolved: &[ResolvedRolloutSummary],
) -> std::io::Result<()> {
let retained = retained_memories(memories, max_raw_memories_for_global);
let mut body = String::from("# Raw Memories\n\n");
if retained.is_empty() {
if resolved.is_empty() {
body.push_str("No raw memories yet.\n");
return tokio::fs::write(raw_memories_file(root), body).await;
}
let memory_by_thread = memories
.iter()
.map(|memory| (memory.thread_id.to_string(), memory))
.collect::<HashMap<_, _>>();
body.push_str("Merged stage-1 raw memories (latest first):\n\n");
for memory in retained {
for item in resolved {
let Some(memory) = memory_by_thread.get(&item.thread_id.to_string()) else {
return Err(std::io::Error::other(format!(
"missing stage1 output for thread {} while rebuilding raw memories",
item.thread_id
)));
};
writeln!(body, "## Thread `{}`", memory.thread_id).map_err(raw_memories_format_error)?;
writeln!(
body,
@@ -81,6 +161,8 @@ async fn rebuild_raw_memories_file(
)
.map_err(raw_memories_format_error)?;
writeln!(body, "cwd: {}", memory.cwd.display()).map_err(raw_memories_format_error)?;
writeln!(body, "rollout_summary_file_name: {}", item.file_name)
.map_err(raw_memories_format_error)?;
writeln!(body).map_err(raw_memories_format_error)?;
body.push_str(memory.raw_memory.trim());
body.push_str("\n\n");
@@ -122,8 +204,8 @@ async fn prune_rollout_summaries(root: &Path, keep: &HashSet<String>) -> std::io
async fn write_rollout_summary_for_thread(
root: &Path,
memory: &Stage1Output,
file_stem: &str,
) -> std::io::Result<()> {
let file_stem = rollout_summary_file_stem(memory);
let path = rollout_summaries_dir(root).join(format!("{file_stem}.md"));
let mut body = String::new();
@@ -157,41 +239,48 @@ fn rollout_summary_format_error(err: std::fmt::Error) -> std::io::Error {
std::io::Error::other(format!("format rollout summary: {err}"))
}
fn rollout_summary_file_stem(memory: &Stage1Output) -> String {
const ROLLOUT_SLUG_MAX_LEN: usize = 20;
fn normalize_rollout_slug(raw_slug: Option<&str>) -> String {
const ROLLOUT_SLUG_MAX_LEN: usize = 60;
let thread_id = memory.thread_id.to_string();
let Some(raw_slug) = memory.rollout_slug.as_deref() else {
return thread_id;
};
let mut normalized = String::with_capacity(ROLLOUT_SLUG_MAX_LEN);
for ch in raw_slug.unwrap_or_default().chars() {
let mapped = if ch.is_ascii_alphanumeric() {
ch.to_ascii_lowercase()
} else if ch == '_' || ch == '-' {
ch
} else {
'_'
};
let mut slug = String::with_capacity(ROLLOUT_SLUG_MAX_LEN);
for ch in raw_slug.chars() {
if slug.len() >= ROLLOUT_SLUG_MAX_LEN {
let mapped_is_sep = mapped == '_' || mapped == '-';
let prev_is_sep = normalized
.chars()
.last()
.is_some_and(|previous| previous == '_' || previous == '-');
if mapped_is_sep && prev_is_sep {
continue;
}
normalized.push(mapped);
if normalized.len() == ROLLOUT_SLUG_MAX_LEN {
break;
}
if ch.is_ascii_alphanumeric() {
slug.push(ch.to_ascii_lowercase());
} else {
slug.push('_');
}
}
while slug.ends_with('_') {
slug.pop();
}
if slug.is_empty() {
thread_id
let trimmed = normalized
.trim_matches(|ch| ch == '_' || ch == '-')
.to_string();
if trimmed.is_empty() {
"unknown".to_string()
} else {
format!("{thread_id}-{slug}")
trimmed
}
}
#[cfg(test)]
mod tests {
use super::rollout_summary_file_stem;
use super::normalize_rollout_slug;
use super::resolve_rollout_summary_files;
use chrono::TimeZone;
use chrono::Utc;
use codex_protocol::ThreadId;
@@ -199,43 +288,82 @@ mod tests {
use pretty_assertions::assert_eq;
use std::path::PathBuf;
fn stage1_output_with_slug(rollout_slug: Option<&str>) -> Stage1Output {
fn stage1_output_with_slug_and_path(
rollout_slug: Option<&str>,
rollout_path: &str,
) -> Stage1Output {
Stage1Output {
thread_id: ThreadId::new(),
source_updated_at: Utc.timestamp_opt(123, 0).single().expect("timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "summary".to_string(),
rollout_slug: rollout_slug.map(ToString::to_string),
rollout_summary_filename: None,
rollout_path: PathBuf::from(rollout_path),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(124, 0).single().expect("timestamp"),
}
}
#[test]
fn rollout_summary_file_stem_uses_thread_id_when_slug_missing() {
let memory = stage1_output_with_slug(None);
let thread_id = memory.thread_id.to_string();
assert_eq!(rollout_summary_file_stem(&memory), thread_id);
fn normalize_rollout_slug_applies_capping_and_separator_rules() {
let value = normalize_rollout_slug(Some(
"--Unsafe Slug//With---Spaces&&Symbols____________________01234567890123456789012345",
));
assert_eq!(
value,
"unsafe_slug_with-spaces_symbols_01234567890123456789012345"
);
assert!(value.len() <= 60);
}
#[test]
fn rollout_summary_file_stem_sanitizes_and_truncates_slug() {
let memory =
stage1_output_with_slug(Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345"));
let thread_id = memory.thread_id.to_string();
fn normalize_rollout_slug_uses_unknown_for_empty_result() {
assert_eq!(normalize_rollout_slug(Some("!!!")), "unknown");
assert_eq!(normalize_rollout_slug(Some("")), "unknown");
assert_eq!(normalize_rollout_slug(None), "unknown");
}
#[test]
fn resolve_rollout_summary_files_uses_timestamp_and_suffixes_collisions() {
let first = stage1_output_with_slug_and_path(
Some("Unsafe Slug/With Spaces & Symbols"),
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-00000000-0000-0000-0000-000000000001.jsonl",
);
let second = Stage1Output {
thread_id: ThreadId::new(),
source_updated_at: Utc.timestamp_opt(124, 0).single().expect("timestamp"),
raw_memory: "raw memory 2".to_string(),
rollout_summary: "summary 2".to_string(),
rollout_slug: Some("Unsafe Slug/With Spaces & Symbols".to_string()),
rollout_summary_filename: None,
rollout_path: PathBuf::from(
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-00000000-0000-0000-0000-000000000002.jsonl",
),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(125, 0).single().expect("timestamp"),
};
let resolved = resolve_rollout_summary_files(&[first, second], 8);
assert_eq!(resolved.len(), 2);
assert_eq!(
rollout_summary_file_stem(&memory),
format!("{thread_id}-unsafe_slug_with_spa")
resolved[0].file_name,
"2026-02-17T19-22-07-unsafe_slug_with_spaces_symbols.md"
);
assert_eq!(
resolved[1].file_name,
"2026-02-17T19-22-07-unsafe_slug_with_spaces_symbols-2.md"
);
}
#[test]
fn rollout_summary_file_stem_uses_thread_id_when_slug_is_empty() {
let memory = stage1_output_with_slug(Some(""));
let thread_id = memory.thread_id.to_string();
fn resolve_rollout_summary_files_falls_back_to_source_updated_at_when_rollout_timestamp_is_missing()
{
let memory =
stage1_output_with_slug_and_path(Some("alpha"), "sessions/rollout-not-parseable.jsonl");
let resolved = resolve_rollout_summary_files(&[memory], 8);
assert_eq!(rollout_summary_file_stem(&memory), thread_id);
assert_eq!(resolved.len(), 1);
assert_eq!(resolved[0].file_name, "1970-01-01T00-02-03-alpha.md");
}
}

View File

@@ -1,4 +1,5 @@
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::resolve_rollout_summary_files;
use super::storage::sync_rollout_summaries_from_memories;
use crate::config::types::DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL;
use crate::memories::ensure_layout;
@@ -69,10 +70,25 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
let root = dir.path().join("memory");
ensure_layout(&root).await.expect("ensure layout");
let keep_id = ThreadId::default().to_string();
let drop_id = ThreadId::default().to_string();
let keep_path = rollout_summaries_dir(&root).join(format!("{keep_id}.md"));
let drop_path = rollout_summaries_dir(&root).join(format!("{drop_id}.md"));
let keep_id = ThreadId::new();
let memories = vec![Stage1Output {
thread_id: keep_id,
source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "short summary".to_string(),
rollout_slug: None,
rollout_summary_filename: None,
rollout_path: PathBuf::from(format!(
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-{keep_id}.jsonl"
)),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
}];
let resolved =
resolve_rollout_summary_files(&memories, DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL);
let keep_path = rollout_summaries_dir(&root).join(&resolved[0].file_name);
let drop_path = rollout_summaries_dir(&root).join("obsolete-summary.md");
tokio::fs::write(&keep_path, "keep")
.await
.expect("write keep");
@@ -80,30 +96,12 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
.await
.expect("write drop");
let memories = vec![Stage1Output {
thread_id: ThreadId::try_from(keep_id.clone()).expect("thread id"),
source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "short summary".to_string(),
rollout_slug: None,
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
}];
sync_rollout_summaries_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
)
.await
.expect("sync rollout summaries");
rebuild_raw_memories_file_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
)
.await
.expect("rebuild raw memories");
sync_rollout_summaries_from_memories(&root, &memories, &resolved)
.await
.expect("sync rollout summaries");
rebuild_raw_memories_file_from_memories(&root, &memories, &resolved)
.await
.expect("rebuild raw memories");
assert!(keep_path.is_file());
assert!(!drop_path.exists());
@@ -112,12 +110,16 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
.await
.expect("read raw memories");
assert!(raw_memories.contains("raw memory"));
assert!(raw_memories.contains(&keep_id));
assert!(raw_memories.contains(&keep_id.to_string()));
assert!(raw_memories.contains("cwd: /tmp/workspace"));
assert!(raw_memories.contains(&format!(
"rollout_summary_file_name: {}",
resolved[0].file_name
)));
}
#[tokio::test]
async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() {
async fn sync_rollout_summaries_uses_timestamp_and_sanitized_slug_filename() {
let dir = tempdir().expect("tempdir");
let root = dir.path().join("memory");
ensure_layout(&root).await.expect("ensure layout");
@@ -139,17 +141,19 @@ async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() {
raw_memory: "raw memory".to_string(),
rollout_summary: "short summary".to_string(),
rollout_slug: Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345".to_string()),
rollout_summary_filename: None,
rollout_path: PathBuf::from(format!(
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-{thread_id}.jsonl"
)),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(201, 0).single().expect("timestamp"),
}];
let resolved =
resolve_rollout_summary_files(&memories, DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL);
sync_rollout_summaries_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
)
.await
.expect("sync rollout summaries");
sync_rollout_summaries_from_memories(&root, &memories, &resolved)
.await
.expect("sync rollout summaries");
let mut dir = tokio::fs::read_dir(rollout_summaries_dir(&root))
.await
@@ -162,17 +166,22 @@ async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() {
assert_eq!(files.len(), 1);
let file_name = &files[0];
assert_eq!(file_name, &resolved[0].file_name);
let stem = file_name
.strip_suffix(".md")
.expect("rollout summary file should end with .md");
.expect("summary should end with .md");
assert!(
stem.starts_with("2026-02-17T19-22-07-"),
"rollout summary filename should include rollout timestamp"
);
let slug = stem
.strip_prefix(&format!("{thread_id}-"))
.expect("rollout summary filename should include thread id and slug");
assert!(slug.len() <= 20, "slug should be capped at 20 chars");
.strip_prefix("2026-02-17T19-22-07-")
.expect("summary stem should include timestamp prefix");
assert!(slug.len() <= 60, "slug should be capped at 60 chars");
assert!(
slug.chars()
.all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'),
"slug should be file-safe lowercase ascii with underscores"
.all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_' || ch == '-'),
"slug should be file-safe lowercase ascii with underscores/hyphens"
);
let summary = tokio::fs::read_to_string(rollout_summaries_dir(&root).join(file_name))
@@ -220,13 +229,18 @@ mod phase2 {
use tempfile::TempDir;
fn stage1_output_with_source_updated_at(source_updated_at: i64) -> Stage1Output {
let thread_id = ThreadId::new();
Stage1Output {
thread_id: ThreadId::new(),
thread_id,
source_updated_at: chrono::DateTime::<Utc>::from_timestamp(source_updated_at, 0)
.expect("valid source_updated_at timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "rollout summary".to_string(),
rollout_slug: None,
rollout_summary_filename: None,
rollout_path: PathBuf::from(format!(
"sessions/2026/02/17/rollout-2026-02-17T19-22-07-{thread_id}.jsonl"
)),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: chrono::DateTime::<Utc>::from_timestamp(source_updated_at + 1, 0)
.expect("valid generated_at timestamp"),
@@ -281,7 +295,7 @@ mod phase2 {
thread_id,
self.config
.codex_home
.join(format!("rollout-{thread_id}.jsonl")),
.join(format!("rollout-2026-02-17T19-22-07-{thread_id}.jsonl")),
Utc::now(),
SessionSource::Cli,
);
@@ -457,6 +471,55 @@ mod phase2 {
harness.shutdown_threads().await;
}
// End-to-end check: after a phase-2 run the resolved rollout summary filename
// is stored in the state DB, can be used to look the row up again, is written
// into `raw_memories.md`, and names an existing file in the summaries directory.
#[tokio::test]
async fn dispatch_persists_rollout_summary_filename_mapping_and_raw_memories_header() {
let harness = DispatchHarness::new().await;
// NOTE(review): the argument is presumably the `source_updated_at` of the seeded row — confirm against the harness.
harness.seed_stage1_output(100).await;
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
let outputs = harness
.state_db
.list_stage1_outputs_for_global(10)
.await
.expect("list stage1 outputs");
assert_eq!(outputs.len(), 1);
// The filename must have been written back to the row by the phase-2 run.
let file_name = outputs[0]
.rollout_summary_filename
.clone()
.expect("phase2 should persist rollout summary filename");
assert!(
file_name.starts_with("2026-02-17T19-22-07-"),
"filename should include rollout timestamp"
);
// Reverse lookup: filename -> stage-1 output row.
let lookup = harness
.state_db
.get_stage1_output_by_rollout_summary_filename(file_name.as_str())
.await
.expect("lookup stage1 output by rollout summary filename")
.expect("lookup should resolve persisted filename");
assert_eq!(lookup.thread_id, outputs[0].thread_id);
assert_eq!(lookup.rollout_summary_filename, Some(file_name.clone()));
// The on-disk artifacts must agree with the persisted filename.
let root = memory_root(&harness.config.codex_home);
let raw_memories = tokio::fs::read_to_string(raw_memories_file(&root))
.await
.expect("read raw memories");
assert!(
raw_memories.contains(&format!("rollout_summary_file_name: {file_name}")),
"raw_memories.md should include rollout summary filename header"
);
assert!(
tokio::fs::try_exists(rollout_summaries_dir(&root).join(&file_name))
.await
.expect("check rollout summary file"),
"rollout summary file should exist under resolved filename"
);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() {
let harness = DispatchHarness::new().await;

View File

@@ -25,7 +25,7 @@ Folder structure (under {{ memory_root }}/):
- Temporary file: merged raw memories from Phase 1. Input for Phase 2.
- skills/<skill-name>/
- Reusable procedures. Entrypoint: SKILL.md; may include scripts/, templates/, examples/.
- rollout_summaries/<rollout_slug>.md
- rollout_summaries/<timestamp>-<slug>.md
- Recap of the rollout, including lessons learned, reusable knowledge,
pointers/references, and pruned raw evidence snippets. Distilled version of
everything valuable from the raw rollout.

View File

@@ -235,7 +235,6 @@ shows or why it matters>:
The schema is below.
---
rollout_summary_file: <file.md>
description: brief description of the task and outcome
keywords: k1, k2, k3, ... <searchable handles (tool names, error names, repo concepts, contracts)>
---

View File

@@ -0,0 +1,6 @@
ALTER TABLE stage1_outputs
ADD COLUMN rollout_summary_filename TEXT;
CREATE UNIQUE INDEX idx_stage1_outputs_rollout_summary_filename
ON stage1_outputs(rollout_summary_filename)
WHERE rollout_summary_filename IS NOT NULL;

View File

@@ -16,6 +16,8 @@ pub struct Stage1Output {
pub raw_memory: String,
pub rollout_summary: String,
pub rollout_slug: Option<String>,
pub rollout_summary_filename: Option<String>,
pub rollout_path: PathBuf,
pub cwd: PathBuf,
pub generated_at: DateTime<Utc>,
}
@@ -27,6 +29,8 @@ pub(crate) struct Stage1OutputRow {
raw_memory: String,
rollout_summary: String,
rollout_slug: Option<String>,
rollout_summary_filename: Option<String>,
rollout_path: String,
cwd: String,
generated_at: i64,
}
@@ -39,6 +43,8 @@ impl Stage1OutputRow {
raw_memory: row.try_get("raw_memory")?,
rollout_summary: row.try_get("rollout_summary")?,
rollout_slug: row.try_get("rollout_slug")?,
rollout_summary_filename: row.try_get("rollout_summary_filename")?,
rollout_path: row.try_get("rollout_path")?,
cwd: row.try_get("cwd")?,
generated_at: row.try_get("generated_at")?,
})
@@ -55,6 +61,8 @@ impl TryFrom<Stage1OutputRow> for Stage1Output {
raw_memory: row.raw_memory,
rollout_summary: row.rollout_summary,
rollout_slug: row.rollout_slug,
rollout_summary_filename: row.rollout_summary_filename,
rollout_path: PathBuf::from(row.rollout_path),
cwd: PathBuf::from(row.cwd),
generated_at: epoch_seconds_to_datetime(row.generated_at)?,
})

View File

@@ -2120,10 +2120,20 @@ WHERE kind = 'memory_stage1'
assert_eq!(outputs[0].thread_id, thread_id_b);
assert_eq!(outputs[0].rollout_summary, "summary b");
assert_eq!(outputs[0].rollout_slug.as_deref(), Some("rollout-b"));
assert_eq!(outputs[0].rollout_summary_filename, None);
assert_eq!(
outputs[0].rollout_path,
codex_home.join(format!("rollout-{thread_id_b}.jsonl"))
);
assert_eq!(outputs[0].cwd, codex_home.join("workspace-b"));
assert_eq!(outputs[1].thread_id, thread_id_a);
assert_eq!(outputs[1].rollout_summary, "summary a");
assert_eq!(outputs[1].rollout_slug, None);
assert_eq!(outputs[1].rollout_summary_filename, None);
assert_eq!(
outputs[1].rollout_path,
codex_home.join(format!("rollout-{thread_id_a}.jsonl"))
);
assert_eq!(outputs[1].cwd, codex_home.join("workspace-a"));
let _ = tokio::fs::remove_dir_all(codex_home).await;
@@ -2193,11 +2203,304 @@ VALUES (?, ?, ?, ?, ?)
assert_eq!(outputs.len(), 1);
assert_eq!(outputs[0].thread_id, thread_id_non_empty);
assert_eq!(outputs[0].rollout_summary, "summary");
assert_eq!(outputs[0].rollout_summary_filename, None);
assert_eq!(
outputs[0].rollout_path,
codex_home.join(format!("rollout-{thread_id_non_empty}.jsonl"))
);
assert_eq!(outputs[0].cwd, codex_home.join("workspace-non-empty"));
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// Verifies the three behaviors of `set_rollout_summary_filenames_for_global`:
// 1. listed rows receive their filename,
// 2. rows that are no longer listed are reset to NULL,
// 3. an empty input clears every stored filename.
#[tokio::test]
async fn set_rollout_summary_filenames_for_global_updates_and_clears_stale_rows() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let thread_id_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
let thread_id_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
// Setup: two threads, each with a successfully completed stage-1 job.
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
thread_id_a,
codex_home.join("workspace-a"),
))
.await
.expect("upsert thread a");
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
thread_id_b,
codex_home.join("workspace-b"),
))
.await
.expect("upsert thread b");
let claim = runtime
.try_claim_stage1_job(thread_id_a, owner, 100, 3600, 64)
.await
.expect("claim stage1 a");
let ownership_token = match claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage1 claim outcome: {other:?}"),
};
assert!(
runtime
.mark_stage1_job_succeeded(
thread_id_a,
ownership_token.as_str(),
100,
"raw memory a",
"summary a",
None,
)
.await
.expect("mark stage1 succeeded a"),
"stage1 success should persist output a"
);
let claim = runtime
.try_claim_stage1_job(thread_id_b, owner, 101, 3600, 64)
.await
.expect("claim stage1 b");
let ownership_token = match claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage1 claim outcome: {other:?}"),
};
assert!(
runtime
.mark_stage1_job_succeeded(
thread_id_b,
ownership_token.as_str(),
101,
"raw memory b",
"summary b",
None,
)
.await
.expect("mark stage1 succeeded b"),
"stage1 success should persist output b"
);
// Step 1: assign a filename to both rows.
runtime
.set_rollout_summary_filenames_for_global(&[
(thread_id_a, "2026-02-17T19-22-07-alpha.md".to_string()),
(thread_id_b, "2026-02-17T19-22-08-beta.md".to_string()),
])
.await
.expect("set rollout summary filenames");
let first = runtime
.list_stage1_outputs_for_global(10)
.await
.expect("list outputs after first filename update");
assert_eq!(first.len(), 2);
// The listing returns thread b's row before thread a's row.
assert_eq!(
first[0].rollout_summary_filename.as_deref(),
Some("2026-02-17T19-22-08-beta.md")
);
assert_eq!(
first[1].rollout_summary_filename.as_deref(),
Some("2026-02-17T19-22-07-alpha.md")
);
// Step 2: only thread b is listed, so thread a's filename must be cleared.
runtime
.set_rollout_summary_filenames_for_global(&[(
thread_id_b,
"2026-02-17T19-22-08-beta-renamed.md".to_string(),
)])
.await
.expect("set rollout summary filename for retained row only");
let second = runtime
.list_stage1_outputs_for_global(10)
.await
.expect("list outputs after clearing stale filename");
assert_eq!(second.len(), 2);
assert_eq!(
second[0].rollout_summary_filename.as_deref(),
Some("2026-02-17T19-22-08-beta-renamed.md")
);
assert_eq!(second[1].rollout_summary_filename, None);
// Step 3: an empty mapping clears every filename.
runtime
.set_rollout_summary_filenames_for_global(&[])
.await
.expect("clear rollout summary filenames");
let cleared = runtime
.list_stage1_outputs_for_global(10)
.await
.expect("list outputs after clearing all filenames");
assert_eq!(cleared[0].rollout_summary_filename, None);
assert_eq!(cleared[1].rollout_summary_filename, None);
// Best-effort cleanup of the temporary directory.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// Verifies that a stage-1 output can be looked up by its persisted rollout
// summary filename, and that an unknown filename resolves to `None`.
#[tokio::test]
async fn get_stage1_output_by_rollout_summary_filename_returns_matching_row() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
// Setup: one thread with a successfully completed stage-1 job.
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
thread_id,
codex_home.join("workspace-a"),
))
.await
.expect("upsert thread");
let claim = runtime
.try_claim_stage1_job(thread_id, owner, 100, 3600, 64)
.await
.expect("claim stage1");
let ownership_token = match claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage1 claim outcome: {other:?}"),
};
assert!(
runtime
.mark_stage1_job_succeeded(
thread_id,
ownership_token.as_str(),
100,
"raw memory",
"summary",
None,
)
.await
.expect("mark stage1 succeeded"),
"stage1 success should persist output"
);
// Persist a filename for the row, then look the row up through it.
let file_name = "2026-02-17T19-22-07-summary.md".to_string();
runtime
.set_rollout_summary_filenames_for_global(&[(thread_id, file_name.clone())])
.await
.expect("set rollout summary filename");
let output = runtime
.get_stage1_output_by_rollout_summary_filename(file_name.as_str())
.await
.expect("lookup by rollout summary filename")
.expect("row should exist");
assert_eq!(output.thread_id, thread_id);
assert_eq!(output.rollout_summary, "summary");
assert_eq!(
output.rollout_summary_filename.as_deref(),
Some(file_name.as_str())
);
assert_eq!(
output.rollout_path,
codex_home.join(format!("rollout-{thread_id}.jsonl"))
);
// A filename that was never stored must not match any row.
let missing = runtime
.get_stage1_output_by_rollout_summary_filename("missing.md")
.await
.expect("lookup missing rollout summary filename");
assert_eq!(missing, None);
// Best-effort cleanup of the temporary directory.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// Verifies that assigning the same rollout summary filename to two different
// threads is reported as an error instead of being stored.
#[tokio::test]
async fn set_rollout_summary_filenames_for_global_rejects_duplicate_file_names() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let thread_id_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
let thread_id_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
// Setup: two threads, each with a successfully completed stage-1 job.
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
thread_id_a,
codex_home.join("workspace-a"),
))
.await
.expect("upsert thread a");
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
thread_id_b,
codex_home.join("workspace-b"),
))
.await
.expect("upsert thread b");
let claim = runtime
.try_claim_stage1_job(thread_id_a, owner, 100, 3600, 64)
.await
.expect("claim stage1 a");
let ownership_token = match claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage1 claim outcome: {other:?}"),
};
assert!(
runtime
.mark_stage1_job_succeeded(
thread_id_a,
ownership_token.as_str(),
100,
"raw memory a",
"summary a",
None,
)
.await
.expect("mark stage1 succeeded a"),
"stage1 success should persist output a"
);
let claim = runtime
.try_claim_stage1_job(thread_id_b, owner, 101, 3600, 64)
.await
.expect("claim stage1 b");
let ownership_token = match claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage1 claim outcome: {other:?}"),
};
assert!(
runtime
.mark_stage1_job_succeeded(
thread_id_b,
ownership_token.as_str(),
101,
"raw memory b",
"summary b",
None,
)
.await
.expect("mark stage1 succeeded b"),
"stage1 success should persist output b"
);
// Both threads are given the identical filename; the call must fail.
let result = runtime
.set_rollout_summary_filenames_for_global(&[
(thread_id_a, "2026-02-17T19-22-07-duplicate.md".to_string()),
(thread_id_b, "2026-02-17T19-22-07-duplicate.md".to_string()),
])
.await;
assert!(
result.is_err(),
"duplicate filenames should violate the unique index"
);
// Best-effort cleanup of the temporary directory.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn mark_stage1_job_succeeded_enqueues_global_consolidation() {
let codex_home = unique_temp_dir();

View File

@@ -197,7 +197,9 @@ SELECT
so.raw_memory,
so.rollout_summary,
so.rollout_slug,
so.rollout_summary_filename,
so.generated_at
, COALESCE(t.rollout_path, '') AS rollout_path
, COALESCE(t.cwd, '') AS cwd
FROM stage1_outputs AS so
LEFT JOIN threads AS t
@@ -216,6 +218,98 @@ LIMIT ?
.collect::<Result<Vec<_>, _>>()
}
/// Persists the current rollout summary filename mapping for retained rows.
///
/// Query behavior (all inside one transaction):
/// - clears `rollout_summary_filename` on every row that currently has one
/// - assigns each row listed in `rows` its current emitted filename
///
/// The resulting state is the same as "clear rows not in `rows`, update the
/// rest". Clearing everything first matters because filenames are covered by
/// a unique index: updating retained rows one by one while other retained rows
/// still hold their previous names would reject valid mappings in which a name
/// moves from one thread to another.
///
/// # Errors
///
/// Returns an error when any statement fails, including when `rows` assigns
/// the same filename to more than one thread. The transaction is not
/// committed in that case.
pub async fn set_rollout_summary_filenames_for_global(
    &self,
    rows: &[(ThreadId, String)],
) -> anyhow::Result<()> {
    let mut tx = self.pool.begin().await?;

    // Reset the whole mapping so the assignments below never collide with a
    // stale name left over from the previous run.
    sqlx::query(
        r#"
UPDATE stage1_outputs
SET rollout_summary_filename = NULL
WHERE rollout_summary_filename IS NOT NULL
"#,
    )
    .execute(&mut *tx)
    .await?;

    for (thread_id, file_name) in rows {
        sqlx::query(
            r#"
UPDATE stage1_outputs
SET rollout_summary_filename = ?
WHERE thread_id = ?
"#,
        )
        .bind(file_name.as_str())
        .bind(thread_id.to_string())
        .execute(&mut *tx)
        .await?;
    }

    tx.commit().await?;
    Ok(())
}
/// Looks up the stage-1 output whose stored rollout summary filename equals
/// `file_name`.
///
/// Returns `Ok(None)` when no row carries that filename. Thread columns
/// (`rollout_path`, `cwd`) come from a left join and default to an empty
/// string when the thread row is absent.
pub async fn get_stage1_output_by_rollout_summary_filename(
    &self,
    file_name: &str,
) -> anyhow::Result<Option<Stage1Output>> {
    let lookup = sqlx::query(
        r#"
SELECT
so.thread_id,
so.source_updated_at,
so.raw_memory,
so.rollout_summary,
so.rollout_slug,
so.rollout_summary_filename,
so.generated_at
, COALESCE(t.rollout_path, '') AS rollout_path
, COALESCE(t.cwd, '') AS cwd
FROM stage1_outputs AS so
LEFT JOIN threads AS t
ON t.id = so.thread_id
WHERE so.rollout_summary_filename = ?
LIMIT 1
"#,
    )
    .bind(file_name);

    // No matching row is a regular outcome, not an error.
    let Some(found) = lookup.fetch_optional(self.pool.as_ref()).await? else {
        return Ok(None);
    };

    let output = Stage1OutputRow::try_from_row(&found).and_then(Stage1Output::try_from)?;
    Ok(Some(output))
}
/// Attempts to claim a stage-1 job for a thread at `source_updated_at`.
///
/// Claim semantics:
@@ -463,13 +557,15 @@ INSERT INTO stage1_outputs (
raw_memory,
rollout_summary,
rollout_slug,
rollout_summary_filename,
generated_at
) VALUES (?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, NULL, ?)
ON CONFLICT(thread_id) DO UPDATE SET
source_updated_at = excluded.source_updated_at,
raw_memory = excluded.raw_memory,
rollout_summary = excluded.rollout_summary,
rollout_slug = excluded.rollout_slug,
rollout_summary_filename = excluded.rollout_summary_filename,
generated_at = excluded.generated_at
WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
"#,