Materialize rollout state on-demand for v2 thread APIs

### Change summary

Defer rollout file creation until needed.
* Add a core API to force rollout persistence for loaded non-ephemeral threads:
* seeds initial context if needed
* flushes rollout and returns persisted path

Add concurrency guard to make lazy rollout initialization idempotent under concurrent calls.

Add centralized app-server rollout-path resolver that:
* uses in-memory thread state when loaded
* forces persistence on demand for rollout-dependent calls
* falls back to on-disk lookup for unloaded threads
* maps ephemeral threads to invalid-request errors for rollout-dependent operations

Route rollout-dependent endpoints through the resolver (v2 + shared legacy surfaces), including:
* thread/archive
* thread/resume (thread-id path)
* thread/fork (thread-id path)
* resumeConversation
* forkConversation
* thread summary by thread id
* detached review parent-thread path resolution
* feedback include_logs rollout resolution

Remove stale cached rollout-path assumptions in rollback/detached-review flows by resolving via thread id when needed.

No wire-schema changes; behavior-only change.

v1 compatibility is not expanded in this PR.

### Tests updated/added

* thread_start: assert rollout is absent immediately after thread/start; created after first completed turn.
* thread_resume: resume by thread id succeeds for just-started thread via on-demand persistence; path-vs-thread-id precedence test updated.
* thread_fork: fork by thread id succeeds for just-started thread.
* thread_archive: archive succeeds for just-started thread and materializes before archive.
* thread_unarchive: adjusted for deferred creation timing.
* thread_rollback: rollback path no longer depends on stale cached rollout path.
* Detached review targeted test verified for lazy path behavior.
* Core tests for new persistence API
This commit is contained in:
Eric Traut
2026-02-06 11:19:11 -08:00
11 changed files with 688 additions and 230 deletions

View File

@@ -1,5 +1,6 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -18,7 +19,8 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
#[tokio::test]
async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let server = create_mock_responses_server_repeating_assistant("Done").await;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -38,16 +40,6 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
assert!(!thread.id.is_empty());
// Locate the rollout path recorded for this thread id.
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected rollout path for thread id to exist");
assert!(
rollout_path.exists(),
"expected {} to exist",
rollout_path.display()
);
// Archive the thread.
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
@@ -61,16 +53,17 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
// Locate the rollout path recorded for this thread id.
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
assert!(
rollout_path.is_none(),
"archived thread should no longer have an active rollout path"
);
// Verify file moved.
let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR);
// The archived file keeps the original filename (rollout-...-<id>.jsonl).
let archived_rollout_path =
archived_directory.join(rollout_path.file_name().expect("rollout file name"));
assert!(
!rollout_path.exists(),
"expected rollout path {} to be moved",
rollout_path.display()
);
find_archived_rollout_by_thread_id(&archived_directory, &thread.id)?;
assert!(
archived_rollout_path.exists(),
"expected archived rollout path {} to exist",
@@ -80,14 +73,45 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
Ok(())
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(config_toml, config_contents())
std::fs::write(config_toml, config_contents(server_uri))
}
fn config_contents() -> &'static str {
r#"model = "mock-model"
fn config_contents(server_uri: &str) -> String {
format!(
r#"model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
)
}
fn find_archived_rollout_by_thread_id(
archived_directory: &Path,
thread_id: &str,
) -> std::io::Result<std::path::PathBuf> {
let entries = std::fs::read_dir(archived_directory)?;
for entry in entries {
let path = entry?.path();
if let Some(file_name) = path.file_name().and_then(|name| name.to_str())
&& file_name.ends_with(&format!("{thread_id}.jsonl"))
{
return Ok(path);
}
}
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("no archived rollout found for thread id {thread_id}"),
))
}