Files
codex/codex-rs/app-server/tests/suite/v2/thread_start.rs
Eric Traut e6ade2d748 Materialize rollout state on-demand for v2 thread APIs
### Change summary

Defer rollout file creation until needed.
* Add a core API to force rollout persistence for loaded non-ephemeral threads:
  * seeds initial context if needed
  * flushes rollout and returns persisted path

Add concurrency guard to make lazy rollout initialization idempotent under concurrent calls.

Add centralized app-server rollout-path resolver that:
* uses in-memory thread state when loaded
* forces persistence on demand for rollout-dependent calls
* falls back to on-disk lookup for unloaded threads
* maps ephemeral threads to invalid-request errors for rollout-dependent operations

Route rollout-dependent endpoints through the resolver (v2 + shared legacy surfaces), including:
* thread/archive
* thread/resume (thread-id path)
* thread/fork (thread-id path)
* resumeConversation
* forkConversation
* thread summary by thread id
* detached review parent-thread path resolution
* feedback include_logs rollout resolution

Remove stale cached rollout-path assumptions in rollback/detached-review flows by resolving via thread id when needed.

No wire-schema changes; behavior-only change.

v1 compatibility is not expanded in this PR.

### Tests updated/added

* thread_start: assert rollout is absent immediately after thread/start; created after first completed turn.
* thread_resume: resume by thread id succeeds for just-started thread via on-demand persistence; path-vs-thread-id precedence test updated.
* thread_fork: fork by thread id succeeds for just-started thread.
* thread_archive: archive succeeds for just-started thread and materializes before archive.
* thread_unarchive: adjusted for deferred creation timing.
* thread_rollback: rollback path no longer depends on stale cached rollout path.
* Detached review targeted test verified for lazy path behavior.
* Core tests for new persistence API.
2026-02-06 11:19:11 -08:00

241 lines
7.3 KiB
Rust

use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput;
use codex_core::config::set_project_trust_level;
use codex_core::find_thread_path_by_id_str;
use codex_protocol::config_types::TrustLevel;
use codex_protocol::openai_models::ReasoningEffort;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
    // Wire up a mock model server plus a config file that points at it, so
    // the model provider plumbing is valid end to end.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;

    // Boot the app server and complete the initialize handshake.
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Kick off a v2 thread, overriding the model explicitly.
    let start_req = mcp
        .send_thread_start_request(ThreadStartParams {
            model: Some("gpt-5.1".to_string()),
            ..Default::default()
        })
        .await?;

    // The JSON-RPC reply must carry a populated thread descriptor.
    let start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(start_req)),
    )
    .await??;
    let ThreadStartResponse {
        thread,
        model_provider,
        ..
    } = to_response::<ThreadStartResponse>(start_resp)?;
    assert!(!thread.id.is_empty(), "thread id should not be empty");
    assert!(
        thread.preview.is_empty(),
        "new threads should start with an empty preview"
    );
    assert_eq!(model_provider, "mock_provider");
    assert!(
        thread.created_at > 0,
        "created_at should be a positive UNIX timestamp"
    );

    // Lazy materialization: no rollout file may exist before the first turn.
    let before_first_turn = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
    assert!(
        before_first_turn.is_none(),
        "fresh threads should not create rollout files until first turn"
    );

    // The server must also broadcast a matching thread/started notification.
    let started_notif: JSONRPCNotification = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("thread/started"),
    )
    .await??;
    let started: ThreadStartedNotification =
        serde_json::from_value(started_notif.params.expect("params must be present"))?;
    assert_eq!(started.thread, thread);

    // Driving one turn to completion should materialize the rollout on disk.
    let turn_req = mcp
        .send_turn_start_request(TurnStartParams {
            thread_id: thread.id.clone(),
            input: vec![UserInput::Text {
                text: "Hello".to_string(),
                text_elements: Vec::new(),
            }],
            ..Default::default()
        })
        .await?;
    let _: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
    )
    .await??;
    let _: JSONRPCNotification = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("turn/completed"),
    )
    .await??;

    let after_first_turn = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
    assert!(
        after_first_turn.is_some(),
        "first completed turn should create rollout file"
    );
    Ok(())
}
#[tokio::test]
async fn thread_start_respects_project_config_from_cwd() -> Result<()> {
    // Mock model server plus a base config in a fresh CODEX_HOME.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;

    // Lay down a project-level .codex/config.toml inside the workspace that
    // overrides the reasoning effort.
    let workspace = TempDir::new()?;
    let project_config_dir = workspace.path().join(".codex");
    std::fs::create_dir_all(&project_config_dir)?;
    std::fs::write(
        project_config_dir.join("config.toml"),
        r#"
model_reasoning_effort = "high"
"#,
    )?;
    // Mark the workspace trusted so the project config is honored.
    set_project_trust_level(codex_home.path(), workspace.path(), TrustLevel::Trusted)?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Starting a thread with cwd inside the workspace should surface the
    // project-level override in the response.
    let request_id = mcp
        .send_thread_start_request(ThreadStartParams {
            cwd: Some(workspace.path().to_string_lossy().into_owned()),
            ..Default::default()
        })
        .await?;
    let response: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
    )
    .await??;
    let ThreadStartResponse {
        reasoning_effort, ..
    } = to_response::<ThreadStartResponse>(response)?;
    assert_eq!(reasoning_effort, Some(ReasoningEffort::High));
    Ok(())
}
#[tokio::test]
async fn thread_start_fails_when_required_mcp_server_fails_to_initialize() -> Result<()> {
    // The config declares a required MCP server whose command cannot exist.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml_with_required_broken_mcp(codex_home.path(), &server.uri())?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // thread/start must surface a JSON-RPC error rather than a response.
    let request_id = mcp
        .send_thread_start_request(ThreadStartParams::default())
        .await?;
    let rpc_error: JSONRPCError = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
    )
    .await??;

    // The failure message should name both the condition and the offending
    // server so the client can report something actionable.
    let message = &rpc_error.error.message;
    assert!(
        message.contains("required MCP servers failed to initialize"),
        "unexpected error message: {}",
        message
    );
    assert!(
        message.contains("required_broken"),
        "unexpected error message: {}",
        message
    );
    Ok(())
}
/// Write a `config.toml` into `codex_home` that routes all model traffic to
/// the mock responses server at `server_uri` (via the `mock_provider` entry)
/// and disables retries so failures surface immediately in tests.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
    );
    std::fs::write(codex_home.join("config.toml"), contents)
}
/// Like `create_config_toml`, but additionally declares a required MCP server
/// (`required_broken`) whose command deliberately does not resolve, so server
/// startup paths that depend on MCP initialization can be exercised.
fn create_config_toml_with_required_broken_mcp(
    codex_home: &Path,
    server_uri: &str,
) -> std::io::Result<()> {
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
[mcp_servers.required_broken]
command = "codex-definitely-not-a-real-binary"
required = true
"#
    );
    std::fs::write(codex_home.join("config.toml"), contents)
}