feat: new memory prompts (#11439)

* Update prompt
* Wire CWD in the prompt
* Handle the no-output case
jif-oai authored 2026-02-11 13:57:52 +00:00, committed by GitHub
parent 8b7f8af343, commit f5d4a21098
14 changed files with 581 additions and 142 deletions

View File

@@ -22,8 +22,6 @@ use std::path::PathBuf;
const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation";
const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
const MEMORY_REGISTRY_FILENAME: &str = "MEMORY.md";
const SKILLS_SUBDIR: &str = "skills";
/// Maximum number of rollout candidates processed per startup pass.
const MAX_ROLLOUTS_PER_STARTUP: usize = 64;
/// Concurrency cap for startup memory extraction and consolidation scheduling.
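These two constants bound how much memory work a single startup pass can schedule. As a rough illustration of how a candidate cap and a concurrency cap combine, here is a minimal sketch using the `futures` crate; the helper names and the concurrency value are illustrative assumptions, not taken from this commit:

```rust
use futures::stream::{self, StreamExt};

const MAX_ROLLOUTS_PER_STARTUP: usize = 64;
const STARTUP_CONCURRENCY: usize = 4; // assumed value, for illustration only

async fn process_rollout(path: String) {
    // Placeholder for stage-1 extraction work on one rollout.
    let _ = path;
}

async fn run_startup_pass(candidates: Vec<String>) {
    // Take at most 64 candidates, keep at most 4 extractions in flight.
    stream::iter(candidates.into_iter().take(MAX_ROLLOUTS_PER_STARTUP))
        .map(process_rollout)
        .buffer_unordered(STARTUP_CONCURRENCY)
        .collect::<Vec<()>>()
        .await;
}
```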
@@ -55,6 +53,12 @@ struct StageOneOutput {
/// Compact summary line used for routing and indexing.
#[serde(rename = "rollout_summary")]
rollout_summary: String,
/// Optional slug accepted from stage-1 output for forward compatibility.
///
/// This is currently ignored by downstream storage and naming, which remain
/// thread-id based.
#[serde(default, rename = "rollout_slug")]
_rollout_slug: Option<String>,
}
fn memory_root(codex_home: &Path) -> PathBuf {
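Because the slug field is `#[serde(default)]`, payloads from older stage-1 prompts that omit it still deserialize, and newer payloads that include it parse without the value being stored anywhere. A standalone sketch of that round-trip, assuming only `serde` and `serde_json`:

```rust
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct StageOneOutput {
    rollout_summary: String,
    raw_memory: String,
    /// Absent in older payloads; `default` makes the field optional on the wire.
    #[serde(default, rename = "rollout_slug")]
    _rollout_slug: Option<String>,
}

fn main() {
    let old = r#"{"rollout_summary":"s","raw_memory":"m"}"#;
    let new = r#"{"rollout_summary":"s","raw_memory":"m","rollout_slug":"my-slug"}"#;
    let old: StageOneOutput = serde_json::from_str(old).expect("old payload parses");
    let new: StageOneOutput = serde_json::from_str(new).expect("new payload parses");
    assert_eq!(old._rollout_slug, None);
    assert_eq!(new._rollout_slug.as_deref(), Some("my-slug"));
}
```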

View File

@@ -5,7 +5,8 @@ use tracing::warn;
use super::text::prefix_at_char_boundary;
use super::text::suffix_at_char_boundary;
const MAX_ROLLOUT_BYTES_FOR_PROMPT: usize = 1_000_000;
// TODO(jif) use proper truncation
const MAX_ROLLOUT_BYTES_FOR_PROMPT: usize = 100_000;
#[derive(Template)]
#[template(path = "memories/consolidation.md", escape = "none")]
@@ -17,6 +18,7 @@ struct ConsolidationPromptTemplate<'a> {
#[template(path = "memories/stage_one_input.md", escape = "none")]
struct StageOneInputTemplate<'a> {
rollout_path: &'a str,
rollout_cwd: &'a str,
rollout_contents: &'a str,
}
@@ -42,7 +44,11 @@ pub(super) fn build_consolidation_prompt(memory_root: &Path) -> String {
///
/// Large rollout payloads are truncated to a bounded byte budget while keeping
/// both head and tail context.
pub(super) fn build_stage_one_input_message(rollout_path: &Path, rollout_contents: &str) -> String {
pub(super) fn build_stage_one_input_message(
rollout_path: &Path,
rollout_cwd: &Path,
rollout_contents: &str,
) -> String {
let (rollout_contents, truncated) = truncate_rollout_for_prompt(rollout_contents);
if truncated {
warn!(
@@ -53,16 +59,20 @@ pub(super) fn build_stage_one_input_message(rollout_path: &Path, rollout_content
}
let rollout_path = rollout_path.display().to_string();
let rollout_cwd = rollout_cwd.display().to_string();
let template = StageOneInputTemplate {
rollout_path: &rollout_path,
rollout_cwd: &rollout_cwd,
rollout_contents: &rollout_contents,
};
// TODO(jif) use askama
match template.render() {
Ok(prompt) => prompt,
Err(err) => {
warn!("failed to render memories stage-one input template: {err}");
include_str!("../../templates/memories/stage_one_input.md")
.replace("{{ rollout_path }}", &rollout_path)
.replace("{{ rollout_cwd }}", &rollout_cwd)
.replace("{{ rollout_contents }}", &rollout_contents)
}
}
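The truncation keeps the first and last halves of the byte budget and drops the middle, snapping both cut points to UTF-8 character boundaries. A simplified sketch of that behavior; the real implementation goes through `prefix_at_char_boundary` and `suffix_at_char_boundary`, whose exact semantics may differ:

```rust
const MAX_ROLLOUT_BYTES_FOR_PROMPT: usize = 100_000;

/// Largest index <= `idx` that is a valid char boundary in `s`.
fn floor_char_boundary(s: &str, mut idx: usize) -> usize {
    while idx > 0 && !s.is_char_boundary(idx) {
        idx -= 1;
    }
    idx
}

fn truncate_rollout_for_prompt(contents: &str) -> (String, bool) {
    if contents.len() <= MAX_ROLLOUT_BYTES_FOR_PROMPT {
        return (contents.to_string(), false);
    }
    let half = MAX_ROLLOUT_BYTES_FOR_PROMPT / 2;
    let head_end = floor_char_boundary(contents, half);
    let tail_start = floor_char_boundary(contents, contents.len() - half);
    let truncated = format!(
        "{}\n[... rollout truncated ...]\n{}",
        &contents[..head_end],
        &contents[tail_start..]
    );
    (truncated, true)
}
```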

View File

@@ -29,6 +29,7 @@ pub(super) fn stage_one_output_schema() -> Value {
"type": "object",
"properties": {
"rollout_summary": { "type": "string" },
"rollout_slug": { "type": "string" },
"raw_memory": { "type": "string" }
},
"required": ["rollout_summary", "raw_memory"],
@@ -96,6 +97,15 @@ fn parse_json_object_loose(raw: &str) -> Result<Value> {
fn normalize_stage_one_output(mut output: StageOneOutput) -> Result<StageOneOutput> {
output.raw_memory = output.raw_memory.trim().to_string();
output.rollout_summary = output.rollout_summary.trim().to_string();
output._rollout_slug = output
._rollout_slug
.map(|slug| slug.trim().to_string())
.filter(|slug| !slug.is_empty());
if output.raw_memory.is_empty() && output.rollout_summary.is_empty() {
// Empty pair is a deliberate "no meaningful signal" sentinel.
return Ok(output);
}
if output.raw_memory.is_empty() {
return Err(CodexErr::InvalidRequest(
@@ -189,6 +199,7 @@ mod tests {
let output = StageOneOutput {
raw_memory: "Token: sk-abcdefghijklmnopqrstuvwxyz123456\nBearer abcdefghijklmnopqrstuvwxyz012345".to_string(),
rollout_summary: "password = mysecret123456\n\nsmall".to_string(),
_rollout_slug: None,
};
let normalized = normalize_stage_one_output(output).expect("normalized");
@@ -210,4 +221,29 @@ mod tests {
assert!(normalized.contains("Outcome: uncertain"));
assert!(normalized.contains("loose notes only"));
}
#[test]
fn normalize_stage_one_output_allows_empty_pair_for_skip() {
let output = StageOneOutput {
raw_memory: String::new(),
rollout_summary: String::new(),
_rollout_slug: None,
};
let normalized = normalize_stage_one_output(output).expect("normalized");
assert_eq!(normalized.raw_memory, "");
assert_eq!(normalized.rollout_summary, "");
}
#[test]
fn normalize_stage_one_output_rejects_partial_empty_values() {
let output = StageOneOutput {
raw_memory: String::new(),
rollout_summary: "summary".to_string(),
_rollout_slug: None,
};
let err = normalize_stage_one_output(output).expect_err("should reject");
assert_eq!(err.to_string(), "stage-1 memory output missing raw_memory");
}
}
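Taken together, the normalization rules encode an in-band option: both fields empty means "skip", exactly one empty is an error, and both present is a real memory. A hedged sketch of the same contract as an explicit type (the missing-`rollout_summary` error text is assumed by symmetry with the `raw_memory` one above):

```rust
enum StageOneResult {
    /// The deliberate "no meaningful signal" sentinel: both fields empty.
    Skip,
    Memory { raw_memory: String, rollout_summary: String },
}

fn classify(raw_memory: &str, rollout_summary: &str) -> Result<StageOneResult, String> {
    match (raw_memory.trim(), rollout_summary.trim()) {
        ("", "") => Ok(StageOneResult::Skip),
        ("", _) => Err("stage-1 memory output missing raw_memory".to_string()),
        (_, "") => Err("stage-1 memory output missing rollout_summary".to_string()),
        (memory, summary) => Ok(StageOneResult::Memory {
            raw_memory: memory.to_string(),
            rollout_summary: summary.to_string(),
        }),
    }
}
```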

View File

@@ -16,9 +16,20 @@ use super::super::PHASE_TWO_JOB_RETRY_DELAY_SECONDS;
use super::super::prompts::build_consolidation_prompt;
use super::super::storage::rebuild_raw_memories_file_from_memories;
use super::super::storage::sync_rollout_summaries_from_memories;
use super::super::storage::wipe_consolidation_outputs;
use super::phase2::spawn_phase2_completion_task;
fn completion_watermark(
claimed_watermark: i64,
latest_memories: &[codex_state::Stage1Output],
) -> i64 {
latest_memories
.iter()
.map(|memory| memory.source_updated_at.timestamp())
.max()
.unwrap_or(claimed_watermark)
.max(claimed_watermark)
}
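// Worked example: with claimed_watermark = 1_000 and memory timestamps
// [123, 900], the maximum memory timestamp is 900, and max(900, 1_000)
// keeps the result at 1_000; the watermark only moves past the claim
// when some memory is strictly newer.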
pub(super) async fn run_global_memory_consolidation(
session: &Arc<Session>,
config: Arc<Config>,
@@ -70,27 +81,14 @@ pub(super) async fn run_global_memory_consolidation(
return false;
}
};
if latest_memories.is_empty() {
debug!("memory phase-2 has no stage-1 outputs; skipping global consolidation");
let _ = state_db
.mark_global_phase2_job_succeeded(&ownership_token, claimed_watermark)
.await;
return false;
};
let root = memory_root(&config.codex_home);
let materialized_watermark = latest_memories
.iter()
.map(|memory| memory.source_updated_at.timestamp())
.max()
.unwrap_or(claimed_watermark);
let completion_watermark = completion_watermark(claimed_watermark, &latest_memories);
if let Err(err) = sync_rollout_summaries_from_memories(&root, &latest_memories).await {
warn!("failed syncing phase-1 rollout summaries for global consolidation: {err}");
warn!("failed syncing local memory artifacts for global consolidation: {err}");
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
"failed syncing phase-1 rollout summaries",
"failed syncing local memory artifacts",
PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
)
.await;
@@ -108,15 +106,10 @@ pub(super) async fn run_global_memory_consolidation(
.await;
return false;
}
if let Err(err) = wipe_consolidation_outputs(&root).await {
warn!("failed to wipe previous global consolidation outputs: {err}");
if latest_memories.is_empty() {
debug!("memory phase-2 has no stage-1 outputs; finalized local memory artifacts");
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
"failed to wipe previous consolidation outputs",
PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
)
.mark_global_phase2_job_succeeded(&ownership_token, completion_watermark)
.await;
return false;
}
@@ -145,7 +138,7 @@ pub(super) async fn run_global_memory_consolidation(
spawn_phase2_completion_task(
session.as_ref(),
ownership_token,
materialized_watermark,
completion_watermark,
consolidation_agent_id,
);
true
@@ -166,6 +159,8 @@ pub(super) async fn run_global_memory_consolidation(
#[cfg(test)]
mod tests {
use super::completion_watermark;
use super::memory_root;
use super::run_global_memory_consolidation;
use crate::CodexAuth;
use crate::ThreadManager;
@@ -174,11 +169,14 @@ mod tests {
use crate::codex::make_session_and_context;
use crate::config::Config;
use crate::config::test_config;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_state::Phase2JobClaimOutcome;
use codex_state::Stage1Output;
use codex_state::ThreadMetadataBuilder;
use pretty_assertions::assert_eq;
use std::sync::Arc;
@@ -291,6 +289,22 @@ mod tests {
}
}
#[test]
fn completion_watermark_never_regresses_below_claimed_input_watermark() {
let stage1_output = Stage1Output {
thread_id: ThreadId::new(),
source_updated_at: chrono::DateTime::<Utc>::from_timestamp(123, 0)
.expect("valid source_updated_at timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "rollout summary".to_string(),
generated_at: chrono::DateTime::<Utc>::from_timestamp(124, 0)
.expect("valid generated_at timestamp"),
};
let completion = completion_watermark(1_000, &[stage1_output]);
assert_eq!(completion, 1_000);
}
#[tokio::test]
async fn dispatch_reclaims_stale_global_lock_and_starts_consolidation() {
let harness = DispatchHarness::new().await;
@@ -379,6 +393,94 @@ mod tests {
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() {
let harness = DispatchHarness::new().await;
let root = memory_root(&harness.config.codex_home);
let summaries_dir = rollout_summaries_dir(&root);
tokio::fs::create_dir_all(&summaries_dir)
.await
.expect("create rollout summaries dir");
let stale_summary_path = summaries_dir.join(format!("{}.md", ThreadId::new()));
tokio::fs::write(&stale_summary_path, "stale summary\n")
.await
.expect("write stale rollout summary");
let raw_memories_path = raw_memories_file(&root);
tokio::fs::write(&raw_memories_path, "stale raw memories\n")
.await
.expect("write stale raw memories");
let memory_index_path = root.join("MEMORY.md");
tokio::fs::write(&memory_index_path, "stale memory index\n")
.await
.expect("write stale memory index");
let memory_summary_path = root.join("memory_summary.md");
tokio::fs::write(&memory_summary_path, "stale memory summary\n")
.await
.expect("write stale memory summary");
let stale_skill_file = root.join("skills/demo/SKILL.md");
tokio::fs::create_dir_all(
stale_skill_file
.parent()
.expect("skills subdirectory parent should exist"),
)
.await
.expect("create stale skills dir");
tokio::fs::write(&stale_skill_file, "stale skill\n")
.await
.expect("write stale skill");
harness
.state_db
.enqueue_global_consolidation(999)
.await
.expect("enqueue global consolidation");
let scheduled =
run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
assert!(
!scheduled,
"dispatch should skip subagent spawn when no stage-1 outputs are available"
);
assert!(
!tokio::fs::try_exists(&stale_summary_path)
.await
.expect("check stale summary existence"),
"empty consolidation should prune stale rollout summary files"
);
let raw_memories = tokio::fs::read_to_string(&raw_memories_path)
.await
.expect("read rebuilt raw memories");
assert_eq!(raw_memories, "# Raw Memories\n\nNo raw memories yet.\n");
assert!(
!tokio::fs::try_exists(&memory_index_path)
.await
.expect("check memory index existence"),
"empty consolidation should remove stale MEMORY.md"
);
assert!(
!tokio::fs::try_exists(&memory_summary_path)
.await
.expect("check memory summary existence"),
"empty consolidation should remove stale memory_summary.md"
);
assert!(
!tokio::fs::try_exists(&stale_skill_file)
.await
.expect("check stale skill existence"),
"empty consolidation should remove stale skills artifacts"
);
assert!(
!tokio::fs::try_exists(root.join("skills"))
.await
.expect("check skills dir existence"),
"empty consolidation should remove stale skills directory"
);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_marks_job_for_retry_when_spawn_agent_fails() {
let codex_home = tempfile::tempdir().expect("create temp codex home");

View File

@@ -24,6 +24,7 @@ use std::path::Path;
pub(super) async fn extract_stage_one_output(
session: &Session,
rollout_path: &Path,
rollout_cwd: &Path,
stage_one_context: &StageOneRequestContext,
) -> Result<StageOneOutput, &'static str> {
let (rollout_items, _thread_id, parse_errors) =
@@ -63,7 +64,7 @@ pub(super) async fn extract_stage_one_output(
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: build_stage_one_input_message(rollout_path, &rollout_contents),
text: build_stage_one_input_message(rollout_path, rollout_cwd, &rollout_contents),
}],
end_turn: None,
phase: None,

View File

@@ -127,6 +127,7 @@ pub(super) async fn run_memories_startup_pipeline(
let stage_one_output = match extract::extract_stage_one_output(
session.as_ref(),
&thread.rollout_path,
&thread.cwd,
&stage_one_context,
)
.await
@@ -151,6 +152,15 @@ pub(super) async fn run_memories_startup_pipeline(
return false;
};
if stage_one_output.raw_memory.is_empty()
&& stage_one_output.rollout_summary.is_empty()
{
return state_db
.mark_stage1_job_succeeded_no_output(thread.id, &claim.ownership_token)
.await
.unwrap_or(false);
}
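// Recording a no-output success here (instead of a failure) presumably keeps
// the skipped rollout from being re-claimed and re-extracted on a later
// startup pass while storing no stage-1 artifacts for it.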
state_db
.mark_stage1_job_succeeded(
thread.id,

View File

@@ -109,7 +109,12 @@ async fn run_phase2_completion_task(
}
};
if is_phase2_success(&final_status) {
let phase2_success = is_phase2_success(&final_status);
info!(
"memory phase-2 global consolidation complete: agent_id={consolidation_agent_id} success={phase2_success} final_status={final_status:?}"
);
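// Logging once before the branch records the agent id, success flag, and
// final status for both the success and failure paths in a single line.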
if phase2_success {
match state_db
.mark_global_phase2_job_succeeded(&ownership_token, completion_watermark)
.await
@@ -126,9 +131,6 @@ async fn run_phase2_completion_task(
);
}
}
info!(
"memory phase-2 global consolidation agent finished: agent_id={consolidation_agent_id} final_status={final_status:?}"
);
return;
}

View File

@@ -5,8 +5,6 @@ use std::path::Path;
use tracing::warn;
use super::MAX_RAW_MEMORIES_FOR_GLOBAL;
use super::MEMORY_REGISTRY_FILENAME;
use super::SKILLS_SUBDIR;
use super::ensure_layout;
use super::raw_memories_file;
use super::rollout_summaries_dir;
@@ -38,34 +36,26 @@ pub(super) async fn sync_rollout_summaries_from_memories(
.collect::<BTreeSet<_>>();
prune_rollout_summaries(root, &keep).await?;
for memory in retained {
for memory in &retained {
write_rollout_summary_for_thread(root, memory).await?;
}
Ok(())
}
/// Clears consolidation outputs so a fresh consolidation run can regenerate them.
///
/// Phase-1 artifacts (`rollout_summaries/` and `raw_memories.md`) are preserved.
pub(super) async fn wipe_consolidation_outputs(root: &Path) -> std::io::Result<()> {
let path = root.join(MEMORY_REGISTRY_FILENAME);
if let Err(err) = tokio::fs::remove_file(&path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
warn!(
"failed removing consolidation file {}: {err}",
path.display()
);
}
if retained.is_empty() {
for file_name in ["MEMORY.md", "memory_summary.md"] {
let path = root.join(file_name);
if let Err(err) = tokio::fs::remove_file(path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
return Err(err);
}
}
let skills_dir = root.join(SKILLS_SUBDIR);
if let Err(err) = tokio::fs::remove_dir_all(&skills_dir).await
&& err.kind() != std::io::ErrorKind::NotFound
{
warn!(
"failed removing consolidation skills directory {}: {err}",
skills_dir.display()
);
let skills_dir = root.join("skills");
if let Err(err) = tokio::fs::remove_dir_all(skills_dir).await
&& err.kind() != std::io::ErrorKind::NotFound
{
return Err(err);
}
}
Ok(())
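The remove-then-ignore-`NotFound` pattern above appears for both plain files and the skills directory. One way to read it, factored into standalone helpers with hypothetical names (the commit itself inlines the pattern):

```rust
use std::path::Path;

/// Remove a file, treating "already gone" as success.
async fn remove_file_if_exists(path: &Path) -> std::io::Result<()> {
    match tokio::fs::remove_file(path).await {
        Err(err) if err.kind() != std::io::ErrorKind::NotFound => Err(err),
        _ => Ok(()),
    }
}

/// Remove a directory tree, treating "already gone" as success.
async fn remove_dir_all_if_exists(path: &Path) -> std::io::Result<()> {
    match tokio::fs::remove_dir_all(path).await {
        Err(err) if err.kind() != std::io::ErrorKind::NotFound => Err(err),
        _ => Ok(()),
    }
}
```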

View File

@@ -4,7 +4,6 @@ use super::rollout::serialize_filtered_rollout_response_items;
use super::stage_one::parse_stage_one_output;
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
use super::storage::wipe_consolidation_outputs;
use crate::memories::ensure_layout;
use crate::memories::memory_root;
use crate::memories::raw_memories_file;
@@ -41,6 +40,23 @@ fn parse_stage_one_output_rejects_legacy_keys() {
assert!(parse_stage_one_output(raw).is_err());
}
#[test]
fn parse_stage_one_output_accepts_empty_pair_for_skip() {
let raw = r#"{"raw_memory":"","rollout_summary":""}"#;
let parsed = parse_stage_one_output(raw).expect("parsed");
assert_eq!(parsed.raw_memory, "");
assert_eq!(parsed.rollout_summary, "");
}
#[test]
fn parse_stage_one_output_accepts_optional_rollout_slug() {
let raw = r#"{"raw_memory":"abc","rollout_summary":"short","rollout_slug":"my-slug"}"#;
let parsed = parse_stage_one_output(raw).expect("parsed");
assert!(parsed.raw_memory.contains("abc"));
assert_eq!(parsed.rollout_summary, "short");
assert_eq!(parsed._rollout_slug, Some("my-slug".to_string()));
}
#[test]
fn serialize_filtered_rollout_response_items_keeps_response_and_compacted() {
let input = vec![
@@ -182,27 +198,3 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
assert!(raw_memories.contains("raw memory"));
assert!(raw_memories.contains(&keep_id));
}
#[tokio::test]
async fn wipe_consolidation_outputs_removes_registry_and_skills() {
let dir = tempdir().expect("tempdir");
let root = dir.path().join("memory");
ensure_layout(&root).await.expect("ensure layout");
let memory_registry = root.join("MEMORY.md");
let skills_dir = root.join("skills").join("example");
tokio::fs::create_dir_all(&skills_dir)
.await
.expect("create skills dir");
tokio::fs::write(&memory_registry, "memory")
.await
.expect("write memory registry");
wipe_consolidation_outputs(&root)
.await
.expect("wipe consolidation outputs");
assert!(!memory_registry.exists());
assert!(!root.join("skills").exists());
}