mirror of
https://github.com/openai/codex.git
synced 2026-05-04 13:21:54 +03:00
Adds the core runtime behavior for active goals on top of the model tools from PR 3. ## Why A long-running goal should be a core runtime concern, not something every client has to implement. Core owns the turn lifecycle, tool completion boundaries, interruptions, resume behavior, and token usage, so it is the right place to account progress, enforce budgets, and decide when to continue work. ## What changed - Centralized goal lifecycle side effects behind `Session::goal_runtime_apply(GoalRuntimeEvent::...)`. - Starts goal continuation turns only when the session is idle; pending user input and mailbox work take priority. - Accounts token and wall-clock usage at turn, tool, mutation, interrupt, and resume boundaries; `get_thread_goal` remains read-only. - Preserves sub-second wall-clock remainder across accounting boundaries so long-running goals do not drift downward over time. - Treats token budget exhaustion as a soft stop by marking the goal `budget_limited` and injecting wrap-up steering instead of aborting the active turn. - Suppresses budget steering when `update_goal` marks a goal complete. - Pauses active goals on interrupt and auto-reactivates paused goals when a thread resumes outside plan mode. - Suppresses repeated automatic continuation when a continuation turn makes no tool calls. - Added continuation and budget-limit prompt templates. ## Verification - Added focused core coverage for continuation scheduling, accounting boundaries, budget-limit steering, completion accounting, interrupt pause behavior, resume auto-activation, and wall-clock remainder accounting.
3008 lines
99 KiB
Rust
3008 lines
99 KiB
Rust
use anyhow::Result;
|
|
use app_test_support::ChatGptAuthFixture;
|
|
use app_test_support::McpProcess;
|
|
use app_test_support::create_apply_patch_sse_response;
|
|
use app_test_support::create_fake_rollout;
|
|
use app_test_support::create_fake_rollout_with_text_elements;
|
|
use app_test_support::create_fake_rollout_with_token_usage;
|
|
use app_test_support::create_final_assistant_message_sse_response;
|
|
use app_test_support::create_mock_responses_server_repeating_assistant;
|
|
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
|
use app_test_support::create_shell_command_sse_response;
|
|
use app_test_support::rollout_path;
|
|
use app_test_support::test_absolute_path;
|
|
use app_test_support::to_response;
|
|
use app_test_support::write_chatgpt_auth;
|
|
use chrono::Utc;
|
|
use codex_app_server_protocol::AskForApproval;
|
|
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
|
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
|
use codex_app_server_protocol::FileChangeApprovalDecision;
|
|
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
|
|
use codex_app_server_protocol::ItemStartedNotification;
|
|
use codex_app_server_protocol::JSONRPCError;
|
|
use codex_app_server_protocol::JSONRPCResponse;
|
|
use codex_app_server_protocol::PatchApplyStatus;
|
|
use codex_app_server_protocol::PatchChangeKind;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::ServerRequest;
|
|
use codex_app_server_protocol::SessionSource;
|
|
use codex_app_server_protocol::ThreadGoalClearResponse;
|
|
use codex_app_server_protocol::ThreadGoalSetResponse;
|
|
use codex_app_server_protocol::ThreadGoalStatus;
|
|
use codex_app_server_protocol::ThreadItem;
|
|
use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams;
|
|
use codex_app_server_protocol::ThreadMetadataUpdateParams;
|
|
use codex_app_server_protocol::ThreadReadParams;
|
|
use codex_app_server_protocol::ThreadReadResponse;
|
|
use codex_app_server_protocol::ThreadResumeParams;
|
|
use codex_app_server_protocol::ThreadResumeResponse;
|
|
use codex_app_server_protocol::ThreadStartParams;
|
|
use codex_app_server_protocol::ThreadStartResponse;
|
|
use codex_app_server_protocol::ThreadStatus;
|
|
use codex_app_server_protocol::TurnStartParams;
|
|
use codex_app_server_protocol::TurnStartResponse;
|
|
use codex_app_server_protocol::TurnStatus;
|
|
use codex_app_server_protocol::UserInput;
|
|
use codex_config::types::AuthCredentialsStoreMode;
|
|
use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR;
|
|
use codex_protocol::ThreadId;
|
|
use codex_protocol::config_types::Personality;
|
|
use codex_protocol::models::ContentItem;
|
|
use codex_protocol::models::ResponseItem;
|
|
use codex_protocol::protocol::AgentMessageEvent;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::SessionMeta;
|
|
use codex_protocol::protocol::SessionMetaLine;
|
|
use codex_protocol::protocol::SessionSource as RolloutSessionSource;
|
|
use codex_protocol::protocol::TokenCountEvent;
|
|
use codex_protocol::protocol::TokenUsage;
|
|
use codex_protocol::protocol::TokenUsageInfo;
|
|
use codex_protocol::protocol::TurnAbortReason;
|
|
use codex_protocol::protocol::TurnAbortedEvent;
|
|
use codex_protocol::protocol::TurnStartedEvent;
|
|
use codex_protocol::user_input::ByteRange;
|
|
use codex_protocol::user_input::TextElement;
|
|
use codex_state::StateRuntime;
|
|
use core_test_support::responses;
|
|
use core_test_support::skip_if_no_network;
|
|
use pretty_assertions::assert_eq;
|
|
use serde_json::json;
|
|
use std::fs::FileTimes;
|
|
use std::path::Path;
|
|
use std::path::PathBuf;
|
|
use std::process::Command;
|
|
use tempfile::TempDir;
|
|
use tokio::time::timeout;
|
|
use uuid::Uuid;
|
|
use wiremock::Mock;
|
|
use wiremock::MockServer;
|
|
use wiremock::ResponseTemplate;
|
|
use wiremock::matchers::method;
|
|
use wiremock::matchers::path;
|
|
|
|
use super::analytics::assert_basic_thread_initialized_event;
|
|
use super::analytics::enable_analytics_capture;
|
|
use super::analytics::thread_initialized_event;
|
|
use super::analytics::wait_for_analytics_payload;
|
|
|
|
#[cfg(windows)]
|
|
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
|
|
#[cfg(not(windows))]
|
|
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
|
const INTERNAL_ERROR_CODE: i64 = -32603;
|
|
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
|
|
|
|
async fn wait_for_responses_request_count(
|
|
server: &wiremock::MockServer,
|
|
expected_count: usize,
|
|
) -> Result<()> {
|
|
timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let Some(requests) = server.received_requests().await else {
|
|
anyhow::bail!("wiremock did not record requests");
|
|
};
|
|
let responses_request_count = requests
|
|
.iter()
|
|
.filter(|request| {
|
|
request.method == "POST" && request.url.path().ends_with("/responses")
|
|
})
|
|
.count();
|
|
if responses_request_count == expected_count {
|
|
return Ok::<(), anyhow::Error>(());
|
|
}
|
|
if responses_request_count > expected_count {
|
|
anyhow::bail!(
|
|
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
|
|
);
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
|
}
|
|
})
|
|
.await??;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
// Start a thread.
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
// Resume should fail before the first user message materializes rollout storage.
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id.clone(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_err: JSONRPCError = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_error_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
assert!(
|
|
resume_err
|
|
.error
|
|
.message
|
|
.contains("no rollout found for thread id"),
|
|
"unexpected resume error: {}",
|
|
resume_err.error.message
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_goal_get_rejects_unmaterialized_thread() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
let config_path = codex_home.path().join("config.toml");
|
|
let config = std::fs::read_to_string(&config_path)?;
|
|
std::fs::write(
|
|
&config_path,
|
|
config.replace(
|
|
"general_analytics = true\n",
|
|
"general_analytics = true\ngoals = true\n",
|
|
),
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.2-codex".to_string()),
|
|
ephemeral: Some(true),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let goal_id = mcp
|
|
.send_raw_request(
|
|
"thread/goal/get",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
})),
|
|
)
|
|
.await?;
|
|
let goal_err: JSONRPCError = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_error_message(RequestId::Integer(goal_id)),
|
|
)
|
|
.await??;
|
|
assert!(
|
|
goal_err
|
|
.error
|
|
.message
|
|
.contains("ephemeral thread does not support goals"),
|
|
"unexpected goal/get error: {}",
|
|
goal_err.error.message
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml_with_chatgpt_base_url(
|
|
codex_home.path(),
|
|
&server.uri(),
|
|
&server.uri(),
|
|
/*general_analytics_enabled*/ true,
|
|
)?;
|
|
enable_analytics_capture(&server, codex_home.path()).await?;
|
|
|
|
let conversation_id = create_fake_rollout_with_text_elements(
|
|
codex_home.path(),
|
|
"2025-01-05T12-00-00",
|
|
"2025-01-05T12:00:00Z",
|
|
"Saved user message",
|
|
Vec::new(),
|
|
Some("mock_provider"),
|
|
/*git_info*/ None,
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?;
|
|
let event = thread_initialized_event(&payload)?;
|
|
assert_basic_thread_initialized_event(event, &thread.id, "gpt-5.3-codex", "resumed");
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_returns_rollout_history() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let preview = "Saved user message";
|
|
let text_elements = vec![TextElement::new(
|
|
ByteRange { start: 0, end: 5 },
|
|
Some("<note>".into()),
|
|
)];
|
|
let conversation_id = create_fake_rollout_with_text_elements(
|
|
codex_home.path(),
|
|
"2025-01-05T12-00-00",
|
|
"2025-01-05T12:00:00Z",
|
|
preview,
|
|
text_elements
|
|
.iter()
|
|
.map(|elem| serde_json::to_value(elem).expect("serialize text element"))
|
|
.collect(),
|
|
Some("mock_provider"),
|
|
/*git_info*/ None,
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id.clone(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
assert_eq!(thread.id, conversation_id);
|
|
assert_eq!(thread.preview, preview);
|
|
assert_eq!(thread.model_provider, "mock_provider");
|
|
assert!(thread.path.as_ref().expect("thread path").is_absolute());
|
|
assert_eq!(thread.cwd, test_absolute_path("/"));
|
|
assert_eq!(thread.cli_version, "0.0.0");
|
|
assert_eq!(thread.source, SessionSource::Cli);
|
|
assert_eq!(thread.git_info, None);
|
|
assert_eq!(thread.status, ThreadStatus::Idle);
|
|
|
|
assert_eq!(
|
|
thread.turns.len(),
|
|
1,
|
|
"expected rollouts to include one turn"
|
|
);
|
|
let turn = &thread.turns[0];
|
|
assert_eq!(turn.status, TurnStatus::Completed);
|
|
assert_eq!(turn.items.len(), 1, "expected user message item");
|
|
match &turn.items[0] {
|
|
ThreadItem::UserMessage { content, .. } => {
|
|
assert_eq!(
|
|
content,
|
|
&vec![UserInput::Text {
|
|
text: preview.to_string(),
|
|
text_elements: text_elements.clone().into_iter().map(Into::into).collect(),
|
|
}]
|
|
);
|
|
}
|
|
other => panic!("expected user message item, got {other:?}"),
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_can_skip_turns_for_metadata_only_resume() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let conversation_id = create_fake_rollout_with_text_elements(
|
|
codex_home.path(),
|
|
"2025-01-05T12-00-00",
|
|
"2025-01-05T12:00:00Z",
|
|
"Saved user message",
|
|
Vec::new(),
|
|
Some("mock_provider"),
|
|
/*git_info*/ None,
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id.clone(),
|
|
exclude_turns: true,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
assert_eq!(thread.id, conversation_id);
|
|
assert!(thread.turns.is_empty());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_emits_active_goal_update_before_continuation() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
let config_path = codex_home.path().join("config.toml");
|
|
let config = std::fs::read_to_string(&config_path)?;
|
|
std::fs::write(
|
|
&config_path,
|
|
config.replace(
|
|
"general_analytics = true\n",
|
|
"general_analytics = true\ngoals = true\n",
|
|
),
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.2-codex".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "materialize this thread".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let _turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let goal_id = mcp
|
|
.send_raw_request(
|
|
"thread/goal/set",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
"objective": "keep polishing",
|
|
"status": "paused",
|
|
})),
|
|
)
|
|
.await?;
|
|
let goal_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(goal_id)),
|
|
)
|
|
.await??;
|
|
let _goal: ThreadGoalSetResponse = to_response(goal_resp)?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/goal/updated"),
|
|
)
|
|
.await??;
|
|
mcp.clear_message_buffer();
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id.clone(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let _resume: ThreadResumeResponse = to_response(resume_resp)?;
|
|
let notification = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/goal/updated"),
|
|
)
|
|
.await??;
|
|
let notification: ServerNotification = notification.try_into()?;
|
|
let ServerNotification::ThreadGoalUpdated(notification) = notification else {
|
|
anyhow::bail!("expected thread goal update notification");
|
|
};
|
|
assert_eq!(notification.goal.status, ThreadGoalStatus::Active);
|
|
assert!(
|
|
!mcp.pending_notification_methods()
|
|
.iter()
|
|
.any(|method| method == "turn/started"),
|
|
"goal continuation should start only after the resume goal snapshot"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_goal_set_preserves_budget_limited_same_objective() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
let config_path = codex_home.path().join("config.toml");
|
|
let config = std::fs::read_to_string(&config_path)?;
|
|
std::fs::write(
|
|
&config_path,
|
|
config.replace(
|
|
"general_analytics = true\n",
|
|
"general_analytics = true\ngoals = true\n",
|
|
),
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.2-codex".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "materialize this thread".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let _turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let goal_id = mcp
|
|
.send_raw_request(
|
|
"thread/goal/set",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
"objective": "keep polishing",
|
|
"status": "budgetLimited",
|
|
"tokenBudget": 10,
|
|
})),
|
|
)
|
|
.await?;
|
|
let goal_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(goal_id)),
|
|
)
|
|
.await??;
|
|
let goal: ThreadGoalSetResponse = to_response(goal_resp)?;
|
|
assert_eq!(goal.goal.status, ThreadGoalStatus::BudgetLimited);
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/goal/updated"),
|
|
)
|
|
.await??;
|
|
|
|
let replacement_id = mcp
|
|
.send_raw_request(
|
|
"thread/goal/set",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
"objective": "keep polishing",
|
|
})),
|
|
)
|
|
.await?;
|
|
let replacement_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(replacement_id)),
|
|
)
|
|
.await??;
|
|
let replacement: ThreadGoalSetResponse = to_response(replacement_resp)?;
|
|
|
|
assert_eq!(replacement.goal.status, ThreadGoalStatus::BudgetLimited);
|
|
assert_eq!(replacement.goal.token_budget, Some(10));
|
|
assert_eq!(replacement.goal.tokens_used, 0);
|
|
assert_eq!(replacement.goal.time_used_seconds, 0);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
let config_path = codex_home.path().join("config.toml");
|
|
let config = std::fs::read_to_string(&config_path)?;
|
|
std::fs::write(
|
|
&config_path,
|
|
config.replace(
|
|
"general_analytics = true\n",
|
|
"general_analytics = true\ngoals = true\n",
|
|
),
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.2-codex".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "materialize this thread".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let _turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let goal_id = mcp
|
|
.send_raw_request(
|
|
"thread/goal/set",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
"objective": "keep polishing",
|
|
})),
|
|
)
|
|
.await?;
|
|
let goal_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(goal_id)),
|
|
)
|
|
.await??;
|
|
let _goal: ThreadGoalSetResponse = to_response(goal_resp)?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/goal/updated"),
|
|
)
|
|
.await??;
|
|
|
|
let clear_id = mcp
|
|
.send_raw_request(
|
|
"thread/goal/clear",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
})),
|
|
)
|
|
.await?;
|
|
let clear_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(clear_id)),
|
|
)
|
|
.await??;
|
|
let clear: ThreadGoalClearResponse = to_response(clear_resp)?;
|
|
assert!(clear.cleared);
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/goal/cleared"),
|
|
)
|
|
.await??;
|
|
|
|
let get_id = mcp
|
|
.send_raw_request(
|
|
"thread/goal/get",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
})),
|
|
)
|
|
.await?;
|
|
let get_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(get_id)),
|
|
)
|
|
.await??;
|
|
let get: codex_app_server_protocol::ThreadGoalGetResponse = to_response(get_resp)?;
|
|
assert_eq!(None, get.goal);
|
|
|
|
let clear_again_id = mcp
|
|
.send_raw_request(
|
|
"thread/goal/clear",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
})),
|
|
)
|
|
.await?;
|
|
let clear_again_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(clear_again_id)),
|
|
)
|
|
.await??;
|
|
let clear_again: ThreadGoalClearResponse = to_response(clear_again_resp)?;
|
|
assert!(!clear_again.cleared);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_by_path_uses_remote_thread_store_error() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml_with_remote_thread_store(codex_home.path(), &server.uri())?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: "ignored-when-path-is-present".to_string(),
|
|
path: Some(PathBuf::from("sessions/2025/01/05/rollout.jsonl")),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_err: JSONRPCError = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_error_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
|
|
assert_eq!(resume_err.error.code, INTERNAL_ERROR_CODE);
|
|
assert_eq!(
|
|
resume_err.error.message,
|
|
"failed to read thread: thread-store internal error: remote thread store does not support read_thread_by_rollout_path"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_emits_restored_token_usage_before_next_turn() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let conversation_id = create_fake_rollout_with_token_usage(
|
|
codex_home.path(),
|
|
"2025-01-05T12-00-00",
|
|
"2025-01-05T12:00:00Z",
|
|
"Saved user message",
|
|
Some("mock_provider"),
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
let note = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
|
)
|
|
.await??;
|
|
let parsed: ServerNotification = note.try_into()?;
|
|
let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else {
|
|
panic!("expected thread/tokenUsage/updated notification");
|
|
};
|
|
|
|
assert_eq!(notification.thread_id, thread.id);
|
|
assert_eq!(notification.turn_id, thread.turns[0].id);
|
|
assert_eq!(notification.token_usage.total.total_tokens, 150);
|
|
assert_eq!(notification.token_usage.total.input_tokens, 120);
|
|
assert_eq!(notification.token_usage.total.cached_input_tokens, 20);
|
|
assert_eq!(notification.token_usage.total.output_tokens, 30);
|
|
assert_eq!(notification.token_usage.total.reasoning_output_tokens, 10);
|
|
assert_eq!(notification.token_usage.last.total_tokens, 90);
|
|
assert_eq!(notification.token_usage.model_context_window, Some(200_000));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_skips_restored_token_usage_when_turns_are_excluded() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let conversation_id = create_fake_rollout_with_token_usage(
|
|
codex_home.path(),
|
|
"2025-01-05T12-00-00",
|
|
"2025-01-05T12:00:00Z",
|
|
"Saved user message",
|
|
Some("mock_provider"),
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let first_resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id.clone(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let first_resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(first_resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } =
|
|
to_response::<ThreadResumeResponse>(first_resume_resp)?;
|
|
let expected_turn_id = thread.turns[0].id.clone();
|
|
|
|
let first_note = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
|
)
|
|
.await??;
|
|
let parsed: ServerNotification = first_note.try_into()?;
|
|
let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else {
|
|
panic!("expected thread/tokenUsage/updated notification");
|
|
};
|
|
assert_eq!(notification.turn_id, expected_turn_id);
|
|
|
|
let second_resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id,
|
|
exclude_turns: true,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let second_resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(second_resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed_again,
|
|
..
|
|
} = to_response::<ThreadResumeResponse>(second_resume_resp)?;
|
|
assert!(resumed_again.turns.is_empty());
|
|
|
|
let second_note = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
|
)
|
|
.await;
|
|
assert!(
|
|
second_note.is_err(),
|
|
"excludeTurns=true should not replay token usage"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_token_usage_replay_ignores_stale_interrupted_tail_turn() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let filename_ts = "2025-01-05T12-00-00";
|
|
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
|
let conversation_id = create_fake_rollout_with_token_usage(
|
|
codex_home.path(),
|
|
filename_ts,
|
|
meta_rfc3339,
|
|
"Saved user message",
|
|
Some("mock_provider"),
|
|
)?;
|
|
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
|
|
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
|
|
let stale_turn_id = "incomplete-turn-after-token-usage";
|
|
let appended_rollout = [
|
|
json!({
|
|
"timestamp": meta_rfc3339,
|
|
"type": "event_msg",
|
|
"payload": serde_json::to_value(EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: stale_turn_id.to_string(),
|
|
started_at: None,
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}))?,
|
|
})
|
|
.to_string(),
|
|
json!({
|
|
"timestamp": meta_rfc3339,
|
|
"type": "event_msg",
|
|
"payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Still running".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}))?,
|
|
})
|
|
.to_string(),
|
|
]
|
|
.join("\n");
|
|
std::fs::write(
|
|
&rollout_file_path,
|
|
format!("{persisted_rollout}{appended_rollout}\n"),
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
assert_eq!(thread.turns.len(), 2);
|
|
assert_eq!(thread.turns[0].status, TurnStatus::Completed);
|
|
assert_eq!(thread.turns[1].id, stale_turn_id);
|
|
assert_eq!(thread.turns[1].status, TurnStatus::Interrupted);
|
|
|
|
let note = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
|
)
|
|
.await??;
|
|
let parsed: ServerNotification = note.try_into()?;
|
|
let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else {
|
|
panic!("expected thread/tokenUsage/updated notification");
|
|
};
|
|
|
|
assert_eq!(notification.thread_id, thread.id);
|
|
assert_eq!(notification.turn_id, thread.turns[0].id);
|
|
assert_ne!(notification.turn_id, stale_turn_id);
|
|
assert_eq!(notification.token_usage.total.total_tokens, 150);
|
|
assert_eq!(notification.token_usage.last.total_tokens, 90);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_token_usage_replay_can_belong_to_interrupted_turn() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let filename_ts = "2025-01-05T12-00-00";
|
|
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
|
let conversation_id = create_fake_rollout_with_token_usage(
|
|
codex_home.path(),
|
|
filename_ts,
|
|
meta_rfc3339,
|
|
"Saved user message",
|
|
Some("mock_provider"),
|
|
)?;
|
|
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
|
|
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
|
|
let interrupted_turn_id = "interrupted-turn-with-token-usage";
|
|
let appended_rollout = [
|
|
json!({
|
|
"timestamp": meta_rfc3339,
|
|
"type": "event_msg",
|
|
"payload": serde_json::to_value(EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: interrupted_turn_id.to_string(),
|
|
started_at: None,
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}))?,
|
|
})
|
|
.to_string(),
|
|
json!({
|
|
"timestamp": meta_rfc3339,
|
|
"type": "event_msg",
|
|
"payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Interrupted after usage".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}))?,
|
|
})
|
|
.to_string(),
|
|
json!({
|
|
"timestamp": meta_rfc3339,
|
|
"type": "event_msg",
|
|
"payload": serde_json::to_value(EventMsg::TokenCount(TokenCountEvent {
|
|
info: Some(TokenUsageInfo {
|
|
total_token_usage: TokenUsage {
|
|
input_tokens: 180,
|
|
cached_input_tokens: 40,
|
|
output_tokens: 50,
|
|
reasoning_output_tokens: 15,
|
|
total_tokens: 230,
|
|
},
|
|
last_token_usage: TokenUsage {
|
|
input_tokens: 90,
|
|
cached_input_tokens: 30,
|
|
output_tokens: 40,
|
|
reasoning_output_tokens: 12,
|
|
total_tokens: 130,
|
|
},
|
|
model_context_window: Some(200_000),
|
|
}),
|
|
rate_limits: None,
|
|
}))?,
|
|
})
|
|
.to_string(),
|
|
json!({
|
|
"timestamp": meta_rfc3339,
|
|
"type": "event_msg",
|
|
"payload": serde_json::to_value(EventMsg::TurnAborted(TurnAbortedEvent {
|
|
turn_id: Some(interrupted_turn_id.to_string()),
|
|
reason: TurnAbortReason::Interrupted,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
}))?,
|
|
})
|
|
.to_string(),
|
|
]
|
|
.join("\n");
|
|
std::fs::write(
|
|
&rollout_file_path,
|
|
format!("{persisted_rollout}{appended_rollout}\n"),
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
assert_eq!(thread.turns.len(), 2);
|
|
assert_eq!(thread.turns[0].status, TurnStatus::Completed);
|
|
assert_eq!(thread.turns[1].id, interrupted_turn_id);
|
|
assert_eq!(thread.turns[1].status, TurnStatus::Interrupted);
|
|
|
|
let note = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
|
)
|
|
.await??;
|
|
let parsed: ServerNotification = note.try_into()?;
|
|
let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else {
|
|
panic!("expected thread/tokenUsage/updated notification");
|
|
};
|
|
|
|
assert_eq!(notification.thread_id, thread.id);
|
|
assert_eq!(notification.turn_id, interrupted_turn_id);
|
|
assert_eq!(notification.token_usage.total.total_tokens, 230);
|
|
assert_eq!(notification.token_usage.last.total_tokens, 130);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_prefers_persisted_git_metadata_for_local_threads() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
let config_toml = codex_home.path().join("config.toml");
|
|
std::fs::write(
|
|
&config_toml,
|
|
format!(
|
|
r#"
|
|
model = "gpt-5.3-codex"
|
|
approval_policy = "never"
|
|
sandbox_mode = "read-only"
|
|
|
|
model_provider = "mock_provider"
|
|
|
|
[features]
|
|
personality = true
|
|
sqlite = true
|
|
|
|
[model_providers.mock_provider]
|
|
name = "Mock provider for test"
|
|
base_url = "{}/v1"
|
|
wire_api = "responses"
|
|
request_max_retries = 0
|
|
stream_max_retries = 0
|
|
"#,
|
|
server.uri()
|
|
),
|
|
)?;
|
|
|
|
let repo_path = codex_home.path().join("repo");
|
|
std::fs::create_dir_all(&repo_path)?;
|
|
assert!(
|
|
Command::new("git")
|
|
.args(["init"])
|
|
.arg(&repo_path)
|
|
.status()?
|
|
.success()
|
|
);
|
|
assert!(
|
|
Command::new("git")
|
|
.current_dir(&repo_path)
|
|
.args(["checkout", "-B", "master"])
|
|
.status()?
|
|
.success()
|
|
);
|
|
assert!(
|
|
Command::new("git")
|
|
.current_dir(&repo_path)
|
|
.args(["config", "user.name", "Test User"])
|
|
.status()?
|
|
.success()
|
|
);
|
|
assert!(
|
|
Command::new("git")
|
|
.current_dir(&repo_path)
|
|
.args(["config", "user.email", "test@example.com"])
|
|
.status()?
|
|
.success()
|
|
);
|
|
std::fs::write(repo_path.join("README.md"), "test\n")?;
|
|
assert!(
|
|
Command::new("git")
|
|
.current_dir(&repo_path)
|
|
.args(["add", "README.md"])
|
|
.status()?
|
|
.success()
|
|
);
|
|
assert!(
|
|
Command::new("git")
|
|
.current_dir(&repo_path)
|
|
.args(["commit", "-m", "initial"])
|
|
.status()?
|
|
.success()
|
|
);
|
|
let head_branch = Command::new("git")
|
|
.current_dir(&repo_path)
|
|
.args(["branch", "--show-current"])
|
|
.output()?;
|
|
assert_eq!(
|
|
String::from_utf8(head_branch.stdout)?.trim(),
|
|
"master",
|
|
"test repo should stay on master to verify resume ignores live HEAD"
|
|
);
|
|
|
|
let thread_id = Uuid::new_v4().to_string();
|
|
let conversation_id = ThreadId::from_string(&thread_id)?;
|
|
let rollout_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &thread_id);
|
|
let rollout_dir = rollout_path.parent().expect("rollout parent directory");
|
|
std::fs::create_dir_all(rollout_dir)?;
|
|
let session_meta = SessionMeta {
|
|
id: conversation_id,
|
|
forked_from_id: None,
|
|
timestamp: "2025-01-05T12:00:00Z".to_string(),
|
|
cwd: repo_path.clone(),
|
|
originator: "codex".to_string(),
|
|
cli_version: "0.0.0".to_string(),
|
|
source: RolloutSessionSource::Cli,
|
|
agent_path: None,
|
|
agent_nickname: None,
|
|
agent_role: None,
|
|
model_provider: Some("mock_provider".to_string()),
|
|
base_instructions: None,
|
|
dynamic_tools: None,
|
|
memory_mode: None,
|
|
};
|
|
std::fs::write(
|
|
&rollout_path,
|
|
[
|
|
json!({
|
|
"timestamp": "2025-01-05T12:00:00Z",
|
|
"type": "session_meta",
|
|
"payload": serde_json::to_value(SessionMetaLine {
|
|
meta: session_meta,
|
|
git: None,
|
|
})?,
|
|
})
|
|
.to_string(),
|
|
json!({
|
|
"timestamp": "2025-01-05T12:00:00Z",
|
|
"type": "response_item",
|
|
"payload": {
|
|
"type": "message",
|
|
"role": "user",
|
|
"content": [{"type": "input_text", "text": "Saved user message"}]
|
|
}
|
|
})
|
|
.to_string(),
|
|
json!({
|
|
"timestamp": "2025-01-05T12:00:00Z",
|
|
"type": "event_msg",
|
|
"payload": {
|
|
"type": "user_message",
|
|
"message": "Saved user message",
|
|
"kind": "plain"
|
|
}
|
|
})
|
|
.to_string(),
|
|
]
|
|
.join("\n")
|
|
+ "\n",
|
|
)?;
|
|
let state_db =
|
|
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
|
state_db
|
|
.mark_backfill_complete(/*last_watermark*/ None)
|
|
.await?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let update_id = mcp
|
|
.send_thread_metadata_update_request(ThreadMetadataUpdateParams {
|
|
thread_id: thread_id.clone(),
|
|
git_info: Some(ThreadMetadataGitInfoUpdateParams {
|
|
sha: None,
|
|
branch: Some(Some("feature/pr-branch".to_string())),
|
|
origin_url: None,
|
|
}),
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(update_id)),
|
|
)
|
|
.await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
assert_eq!(
|
|
thread
|
|
.git_info
|
|
.as_ref()
|
|
.and_then(|git| git.branch.as_deref()),
|
|
Some("feature/pr-branch")
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is_idle() -> Result<()>
|
|
{
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let filename_ts = "2025-01-05T12-00-00";
|
|
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
|
let conversation_id = create_fake_rollout_with_text_elements(
|
|
codex_home.path(),
|
|
filename_ts,
|
|
meta_rfc3339,
|
|
"Saved user message",
|
|
Vec::new(),
|
|
Some("mock_provider"),
|
|
/*git_info*/ None,
|
|
)?;
|
|
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
|
|
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
|
|
let turn_id = "incomplete-turn";
|
|
let appended_rollout = [
|
|
json!({
|
|
"timestamp": meta_rfc3339,
|
|
"type": "event_msg",
|
|
"payload": serde_json::to_value(EventMsg::TurnStarted(TurnStartedEvent {
|
|
turn_id: turn_id.to_string(),
|
|
started_at: None,
|
|
model_context_window: None,
|
|
collaboration_mode_kind: Default::default(),
|
|
}))?,
|
|
})
|
|
.to_string(),
|
|
json!({
|
|
"timestamp": meta_rfc3339,
|
|
"type": "event_msg",
|
|
"payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent {
|
|
message: "Still running".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}))?,
|
|
})
|
|
.to_string(),
|
|
]
|
|
.join("\n");
|
|
std::fs::write(
|
|
&rollout_file_path,
|
|
format!("{persisted_rollout}{appended_rollout}\n"),
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
assert_eq!(thread.status, ThreadStatus::Idle);
|
|
assert_eq!(thread.turns.len(), 2);
|
|
assert_eq!(thread.turns[0].status, TurnStatus::Completed);
|
|
assert_eq!(thread.turns[1].id, turn_id);
|
|
assert_eq!(thread.turns[1].status, TurnStatus::Interrupted);
|
|
|
|
let second_resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id.clone(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let second_resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(second_resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed_again,
|
|
..
|
|
} = to_response::<ThreadResumeResponse>(second_resume_resp)?;
|
|
|
|
assert_eq!(resumed_again.status, ThreadStatus::Idle);
|
|
assert_eq!(resumed_again.turns.len(), 2);
|
|
assert_eq!(resumed_again.turns[1].id, turn_id);
|
|
assert_eq!(resumed_again.turns[1].status, TurnStatus::Interrupted);
|
|
|
|
let read_id = mcp
|
|
.send_thread_read_request(ThreadReadParams {
|
|
thread_id: resumed_again.id,
|
|
include_turns: true,
|
|
})
|
|
.await?;
|
|
let read_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
|
)
|
|
.await??;
|
|
let ThreadReadResponse {
|
|
thread: read_thread,
|
|
..
|
|
} = to_response::<ThreadReadResponse>(read_resp)?;
|
|
|
|
assert_eq!(read_thread.status, ThreadStatus::Idle);
|
|
assert_eq!(read_thread.turns.len(), 2);
|
|
assert_eq!(read_thread.turns[1].id, turn_id);
|
|
assert_eq!(read_thread.turns[1].status, TurnStatus::Interrupted);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?;
|
|
let thread_id = rollout.conversation_id.clone();
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let read_id = mcp
|
|
.send_thread_read_request(ThreadReadParams {
|
|
thread_id: thread_id.clone(),
|
|
include_turns: false,
|
|
})
|
|
.await?;
|
|
let read_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
|
)
|
|
.await??;
|
|
let ThreadReadResponse {
|
|
thread: before_resume,
|
|
..
|
|
} = to_response::<ThreadReadResponse>(read_resp)?;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread_id.clone(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
assert_eq!(thread.updated_at, before_resume.updated_at);
|
|
assert_eq!(thread.status, ThreadStatus::Idle);
|
|
|
|
let after_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
|
|
assert_eq!(after_modified, rollout.before_modified);
|
|
|
|
let turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id,
|
|
input: vec![UserInput::Text {
|
|
text: "Hello".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let after_turn_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
|
|
assert!(after_turn_modified > rollout.before_modified);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_keeps_in_flight_turn_streaming() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut primary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
|
|
|
|
let start_id = primary
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let seed_turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "seed history".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
primary.clear_message_buffer();
|
|
|
|
let mut secondary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??;
|
|
|
|
let turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "respond with docs".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
|
|
let resume_id = secondary
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
secondary.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed_thread,
|
|
..
|
|
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
assert_ne!(resumed_thread.status, ThreadStatus::NotLoaded);
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
|
|
let server = responses::start_mock_server().await;
|
|
let first_body = responses::sse(vec![
|
|
responses::ev_response_created("resp-1"),
|
|
responses::ev_assistant_message("msg-1", "Done"),
|
|
responses::ev_completed("resp-1"),
|
|
]);
|
|
let second_response = responses::sse_response(responses::sse(vec![
|
|
responses::ev_response_created("resp-2"),
|
|
responses::ev_assistant_message("msg-2", "Done"),
|
|
responses::ev_completed("resp-2"),
|
|
]))
|
|
.set_delay(std::time::Duration::from_millis(500));
|
|
let _first_response_mock = responses::mount_sse_once(&server, first_body).await;
|
|
let _second_response_mock = responses::mount_response_once(&server, second_response).await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut primary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
|
|
|
|
let start_id = primary
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let seed_turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "seed history".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
primary.clear_message_buffer();
|
|
|
|
let thread_id = thread.id.clone();
|
|
let running_turn_request_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread_id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "keep running".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let running_turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(running_turn_request_id)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn: running_turn } =
|
|
to_response::<TurnStartResponse>(running_turn_resp)?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
|
|
let resume_id = primary
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread_id.clone(),
|
|
history: Some(vec![ResponseItem::Message {
|
|
id: None,
|
|
role: "user".to_string(),
|
|
content: vec![ContentItem::InputText {
|
|
text: "history override".to_string(),
|
|
}],
|
|
end_turn: None,
|
|
phase: None,
|
|
}]),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_err: JSONRPCError = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_error_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
assert!(
|
|
resume_err.error.message.contains("cannot resume thread")
|
|
&& resume_err.error.message.contains("with history")
|
|
&& resume_err.error.message.contains("running"),
|
|
"unexpected resume error: {}",
|
|
resume_err.error.message
|
|
);
|
|
|
|
primary
|
|
.interrupt_turn_and_wait_for_aborted(thread_id, running_turn.id, DEFAULT_READ_TIMEOUT)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Result<()> {
|
|
let server = responses::start_mock_server().await;
|
|
let first_body = responses::sse(vec![
|
|
responses::ev_response_created("resp-1"),
|
|
responses::ev_assistant_message("msg-1", "Done"),
|
|
responses::ev_completed("resp-1"),
|
|
]);
|
|
let second_response = responses::sse_response(responses::sse(vec![
|
|
responses::ev_response_created("resp-2"),
|
|
responses::ev_assistant_message("msg-2", "Done"),
|
|
responses::ev_completed("resp-2"),
|
|
]))
|
|
.set_delay(std::time::Duration::from_millis(500));
|
|
let _first_response_mock = responses::mount_sse_once(&server, first_body).await;
|
|
let _second_response_mock = responses::mount_response_once(&server, second_response).await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut primary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
|
|
|
|
let start_id = primary
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let seed_turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "seed history".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
primary.clear_message_buffer();
|
|
|
|
let thread_id = thread.id.clone();
|
|
let running_turn_request_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread_id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "keep running".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let running_turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(running_turn_request_id)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn: running_turn } =
|
|
to_response::<TurnStartResponse>(running_turn_resp)?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
|
|
let resume_id = primary
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread_id.clone(),
|
|
path: Some(PathBuf::from("/tmp/does-not-match-running-rollout.jsonl")),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_err: JSONRPCError = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_error_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
assert!(
|
|
resume_err.error.message.contains("mismatched path"),
|
|
"unexpected resume error: {}",
|
|
resume_err.error.message
|
|
);
|
|
|
|
primary
|
|
.interrupt_turn_and_wait_for_aborted(thread_id, running_turn.id, DEFAULT_READ_TIMEOUT)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> Result<()> {
|
|
let server = responses::start_mock_server().await;
|
|
let first_response = responses::sse_response(responses::sse(vec![
|
|
responses::ev_response_created("resp-1"),
|
|
responses::ev_assistant_message("msg-1", "Done"),
|
|
responses::ev_completed("resp-1"),
|
|
]));
|
|
let second_response = responses::sse_response(responses::sse(vec![
|
|
responses::ev_response_created("resp-2"),
|
|
responses::ev_assistant_message("msg-2", "Done"),
|
|
responses::ev_completed("resp-2"),
|
|
]))
|
|
.set_delay(std::time::Duration::from_millis(500));
|
|
let _response_mock =
|
|
responses::mount_response_sequence(&server, vec![first_response, second_response]).await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut primary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
|
|
|
|
let start_id = primary
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let seed_turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "seed history".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
primary.clear_message_buffer();
|
|
|
|
let running_turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "keep running".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
|
|
let resume_id = primary
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id.clone(),
|
|
model: Some("not-the-running-model".to_string()),
|
|
cwd: Some("/tmp".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse { thread, model, .. } =
|
|
to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
assert_eq!(model, "gpt-5.4");
|
|
// The running-thread resume response is queued onto the thread listener task.
|
|
// If the in-flight turn completes before that queued command runs, the response
|
|
// can legitimately observe the thread as idle.
|
|
match &thread.status {
|
|
ThreadStatus::Active { active_flags } => assert!(active_flags.is_empty()),
|
|
ThreadStatus::Idle => {}
|
|
status => panic!("unexpected thread status after running resume: {status:?}"),
|
|
}
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_can_skip_turns_when_thread_is_running() -> Result<()> {
|
|
let server = responses::start_mock_server().await;
|
|
let _response_mock = responses::mount_sse_once(
|
|
&server,
|
|
responses::sse(vec![
|
|
responses::ev_response_created("resp-1"),
|
|
responses::ev_assistant_message("msg-1", "Done"),
|
|
responses::ev_completed("resp-1"),
|
|
]),
|
|
)
|
|
.await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut primary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
|
|
|
|
let start_id = primary
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "seed history".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let mut secondary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??;
|
|
|
|
let resume_id = secondary
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id.clone(),
|
|
exclude_turns: true,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
secondary.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed, ..
|
|
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
assert_eq!(resumed.id, thread.id);
|
|
assert_eq!(resumed.status, ThreadStatus::Idle);
|
|
assert!(resumed.turns.is_empty());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_replays_pending_command_execution_request_approval() -> Result<()> {
|
|
let responses = vec![
|
|
create_final_assistant_message_sse_response("seeded")?,
|
|
create_shell_command_sse_response(
|
|
vec![
|
|
"python3".to_string(),
|
|
"-c".to_string(),
|
|
"print(42)".to_string(),
|
|
],
|
|
/*workdir*/ None,
|
|
Some(5000),
|
|
"call-1",
|
|
)?,
|
|
create_final_assistant_message_sse_response("done")?,
|
|
];
|
|
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut primary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
|
|
|
|
let start_id = primary
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let seed_turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "seed history".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
primary.clear_message_buffer();
|
|
|
|
let running_turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "run command".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
approval_policy: Some(AskForApproval::UnlessTrusted),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
|
|
)
|
|
.await??;
|
|
|
|
let original_request = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_request_message(),
|
|
)
|
|
.await??;
|
|
let ServerRequest::CommandExecutionRequestApproval { .. } = &original_request else {
|
|
panic!("expected CommandExecutionRequestApproval request, got {original_request:?}");
|
|
};
|
|
|
|
let resume_id = primary
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id.clone(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed_thread,
|
|
..
|
|
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
assert_eq!(resumed_thread.id, thread.id);
|
|
assert!(
|
|
resumed_thread
|
|
.turns
|
|
.iter()
|
|
.any(|turn| matches!(turn.status, TurnStatus::InProgress))
|
|
);
|
|
|
|
let replayed_request = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_request_message(),
|
|
)
|
|
.await??;
|
|
pretty_assertions::assert_eq!(replayed_request, original_request);
|
|
|
|
let ServerRequest::CommandExecutionRequestApproval { request_id, .. } = replayed_request else {
|
|
panic!("expected CommandExecutionRequestApproval request");
|
|
};
|
|
primary
|
|
.send_response(
|
|
request_id,
|
|
serde_json::to_value(CommandExecutionRequestApprovalResponse {
|
|
decision: CommandExecutionApprovalDecision::Accept,
|
|
})?,
|
|
)
|
|
.await?;
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
wait_for_responses_request_count(&server, /*expected_count*/ 3).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_replays_pending_file_change_request_approval() -> Result<()> {
|
|
let tmp = TempDir::new()?;
|
|
let codex_home = tmp.path().join("codex_home");
|
|
std::fs::create_dir(&codex_home)?;
|
|
let workspace = tmp.path().join("workspace");
|
|
std::fs::create_dir(&workspace)?;
|
|
|
|
let patch = r#"*** Begin Patch
|
|
*** Add File: README.md
|
|
+new line
|
|
*** End Patch
|
|
"#;
|
|
let responses = vec![
|
|
create_final_assistant_message_sse_response("seeded")?,
|
|
create_apply_patch_sse_response(patch, "patch-call")?,
|
|
create_final_assistant_message_sse_response("done")?,
|
|
];
|
|
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
|
create_config_toml(&codex_home, &server.uri())?;
|
|
|
|
let mut primary = McpProcess::new(&codex_home).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
|
|
|
|
let start_id = primary
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
cwd: Some(workspace.to_string_lossy().into_owned()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let seed_turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "seed history".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
cwd: Some(workspace.clone()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
primary.clear_message_buffer();
|
|
|
|
let running_turn_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "apply patch".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
cwd: Some(workspace.clone()),
|
|
approval_policy: Some(AskForApproval::UnlessTrusted),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
|
|
)
|
|
.await??;
|
|
|
|
let original_started = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let notification = primary
|
|
.read_stream_until_notification_message("item/started")
|
|
.await?;
|
|
let started: ItemStartedNotification =
|
|
serde_json::from_value(notification.params.clone().expect("item/started params"))?;
|
|
if let ThreadItem::FileChange { .. } = started.item {
|
|
return Ok::<ThreadItem, anyhow::Error>(started.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let expected_readme_path = workspace.join("README.md");
|
|
let expected_file_change = ThreadItem::FileChange {
|
|
id: "patch-call".to_string(),
|
|
changes: vec![codex_app_server_protocol::FileUpdateChange {
|
|
path: expected_readme_path.to_string_lossy().into_owned(),
|
|
kind: PatchChangeKind::Add,
|
|
diff: "new line\n".to_string(),
|
|
}],
|
|
status: PatchApplyStatus::InProgress,
|
|
};
|
|
assert_eq!(original_started, expected_file_change);
|
|
|
|
let original_request = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_request_message(),
|
|
)
|
|
.await??;
|
|
let ServerRequest::FileChangeRequestApproval { .. } = &original_request else {
|
|
panic!("expected FileChangeRequestApproval request, got {original_request:?}");
|
|
};
|
|
primary.clear_message_buffer();
|
|
|
|
let resume_id = primary
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id.clone(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed_thread,
|
|
..
|
|
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
assert_eq!(resumed_thread.id, thread.id);
|
|
assert!(
|
|
resumed_thread
|
|
.turns
|
|
.iter()
|
|
.any(|turn| matches!(turn.status, TurnStatus::InProgress))
|
|
);
|
|
|
|
let replayed_request = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_request_message(),
|
|
)
|
|
.await??;
|
|
assert_eq!(replayed_request, original_request);
|
|
|
|
let ServerRequest::FileChangeRequestApproval { request_id, .. } = replayed_request else {
|
|
panic!("expected FileChangeRequestApproval request");
|
|
};
|
|
primary
|
|
.send_response(
|
|
request_id,
|
|
serde_json::to_value(FileChangeRequestApprovalResponse {
|
|
decision: FileChangeApprovalDecision::Accept,
|
|
})?,
|
|
)
|
|
.await?;
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
wait_for_responses_request_count(&server, /*expected_count*/ 3).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let RestartedThreadFixture {
|
|
mut mcp,
|
|
thread_id,
|
|
rollout_file_path,
|
|
updated_at,
|
|
} = start_materialized_thread_and_restart(codex_home.path(), "materialize").await?;
|
|
let expected_updated_at_rfc3339 = "2025-01-07T00:00:00Z";
|
|
set_rollout_mtime(rollout_file_path.as_path(), expected_updated_at_rfc3339)?;
|
|
let before_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id,
|
|
model: Some("mock-model".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed_thread,
|
|
..
|
|
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
|
|
assert_eq!(resumed_thread.updated_at, updated_at);
|
|
assert_eq!(resumed_thread.status, ThreadStatus::Idle);
|
|
|
|
let after_resume_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
|
|
assert_eq!(after_resume_modified, before_modified);
|
|
|
|
let turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: resumed_thread.id,
|
|
input: vec![UserInput::Text {
|
|
text: "Hello".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let after_turn_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
|
|
assert!(after_turn_modified > before_modified);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_fails_when_required_mcp_server_fails_to_initialize() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?;
|
|
create_config_toml_with_required_broken_mcp(codex_home.path(), &server.uri())?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: rollout.conversation_id,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let err: JSONRPCError = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_error_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
|
|
assert!(
|
|
err.error
|
|
.message
|
|
.contains("required MCP servers failed to initialize"),
|
|
"unexpected error message: {}",
|
|
err.error.message
|
|
);
|
|
assert!(
|
|
err.error.message.contains("required_broken"),
|
|
"unexpected error message: {}",
|
|
err.error.message
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_surfaces_cloud_requirements_load_errors() -> Result<()> {
|
|
let server = MockServer::start().await;
|
|
Mock::given(method("GET"))
|
|
.and(path("/backend-api/wham/config/requirements"))
|
|
.respond_with(
|
|
ResponseTemplate::new(401)
|
|
.insert_header("content-type", "text/html")
|
|
.set_body_string("<html>nope</html>"),
|
|
)
|
|
.mount(&server)
|
|
.await;
|
|
Mock::given(method("POST"))
|
|
.and(path("/oauth/token"))
|
|
.respond_with(ResponseTemplate::new(401).set_body_json(json!({
|
|
"error": { "code": "refresh_token_invalidated" }
|
|
})))
|
|
.mount(&server)
|
|
.await;
|
|
|
|
let codex_home = TempDir::new()?;
|
|
let model_server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let chatgpt_base_url = format!("{}/backend-api", server.uri());
|
|
create_config_toml_with_chatgpt_base_url(
|
|
codex_home.path(),
|
|
&model_server.uri(),
|
|
&chatgpt_base_url,
|
|
/*general_analytics_enabled*/ false,
|
|
)?;
|
|
write_chatgpt_auth(
|
|
codex_home.path(),
|
|
ChatGptAuthFixture::new("chatgpt-token")
|
|
.refresh_token("stale-refresh-token")
|
|
.plan_type("business")
|
|
.chatgpt_user_id("user-123")
|
|
.chatgpt_account_id("account-123")
|
|
.account_id("account-123"),
|
|
AuthCredentialsStoreMode::File,
|
|
)?;
|
|
let conversation_id = create_fake_rollout_with_text_elements(
|
|
codex_home.path(),
|
|
"2025-01-05T12-00-00",
|
|
"2025-01-05T12:00:00Z",
|
|
"Saved user message",
|
|
Vec::new(),
|
|
Some("mock_provider"),
|
|
/*git_info*/ None,
|
|
)?;
|
|
let refresh_token_url = format!("{}/oauth/token", server.uri());
|
|
let mut mcp = McpProcess::new_with_env(
|
|
codex_home.path(),
|
|
&[
|
|
("OPENAI_API_KEY", None),
|
|
(
|
|
REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR,
|
|
Some(refresh_token_url.as_str()),
|
|
),
|
|
],
|
|
)
|
|
.await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: conversation_id,
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let err: JSONRPCError = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_error_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
|
|
assert!(
|
|
err.error.message.contains("failed to load configuration"),
|
|
"unexpected error message: {}",
|
|
err.error.message
|
|
);
|
|
assert_eq!(
|
|
err.error.data,
|
|
Some(json!({
|
|
"reason": "cloudRequirements",
|
|
"errorCode": "Auth",
|
|
"action": "relogin",
|
|
"statusCode": 401,
|
|
"detail": "Your access token could not be refreshed because your refresh token was revoked. Please log out and sign in again.",
|
|
}))
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "materialize".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let thread_path = thread.path.clone().expect("thread path");
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: "not-a-valid-thread-id".to_string(),
|
|
path: Some(thread_path.to_path_buf()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed, ..
|
|
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
assert_eq!(resumed.id, thread.id);
|
|
assert_eq!(resumed.path, thread.path);
|
|
assert_eq!(resumed.status, ThreadStatus::Idle);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_can_load_source_by_external_path() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
let external_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
let thread_id = create_fake_rollout(
|
|
external_home.path(),
|
|
"2025-01-05T12-00-00",
|
|
"2025-01-05T12:00:00Z",
|
|
"external path history",
|
|
Some("mock_provider"),
|
|
/*git_info*/ None,
|
|
)?;
|
|
let thread_path = rollout_path(external_home.path(), "2025-01-05T12-00-00", &thread_id);
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: "not-a-valid-thread-id".to_string(),
|
|
path: Some(thread_path.clone()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed, ..
|
|
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
let expected_thread_path = std::fs::canonicalize(&thread_path)?;
|
|
assert_eq!(resumed.id, thread_id);
|
|
assert_eq!(resumed.path, Some(expected_thread_path));
|
|
assert_eq!(resumed.preview, "external path history");
|
|
assert_eq!(resumed.status, ThreadStatus::Idle);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_supports_history_and_overrides() -> Result<()> {
|
|
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let RestartedThreadFixture {
|
|
mut mcp, thread_id, ..
|
|
} = start_materialized_thread_and_restart(codex_home.path(), "seed history").await?;
|
|
|
|
let history_text = "Hello from history";
|
|
let history = vec![ResponseItem::Message {
|
|
id: None,
|
|
role: "user".to_string(),
|
|
content: vec![ContentItem::InputText {
|
|
text: history_text.to_string(),
|
|
}],
|
|
end_turn: None,
|
|
phase: None,
|
|
}];
|
|
|
|
// Resume with explicit history and override the model.
|
|
let resume_id = mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id,
|
|
history: Some(history),
|
|
model: Some("mock-model".to_string()),
|
|
model_provider: Some("mock_provider".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed,
|
|
model_provider,
|
|
..
|
|
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
assert!(!resumed.id.is_empty());
|
|
assert_eq!(model_provider, "mock_provider");
|
|
assert_eq!(resumed.preview, history_text);
|
|
assert_eq!(resumed.status, ThreadStatus::Idle);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
struct RestartedThreadFixture {
|
|
mcp: McpProcess,
|
|
thread_id: String,
|
|
rollout_file_path: PathBuf,
|
|
updated_at: i64,
|
|
}
|
|
|
|
async fn start_materialized_thread_and_restart(
|
|
codex_home: &Path,
|
|
seed_text: &str,
|
|
) -> Result<RestartedThreadFixture> {
|
|
let mut first_mcp = McpProcess::new(codex_home).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, first_mcp.initialize()).await??;
|
|
|
|
let start_id = first_mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.4".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
first_mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let materialize_turn_id = first_mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: seed_text.to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
first_mcp.read_stream_until_response_message(RequestId::Integer(materialize_turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
first_mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let read_id = first_mcp
|
|
.send_thread_read_request(ThreadReadParams {
|
|
thread_id: thread.id.clone(),
|
|
include_turns: false,
|
|
})
|
|
.await?;
|
|
let read_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
first_mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
|
)
|
|
.await??;
|
|
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
|
|
|
let thread_id = thread.id;
|
|
let rollout_file_path = thread
|
|
.path
|
|
.ok_or_else(|| anyhow::anyhow!("thread path missing from thread/start response"))?;
|
|
let updated_at = thread.updated_at;
|
|
|
|
drop(first_mcp);
|
|
|
|
let mut second_mcp = McpProcess::new(codex_home).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, second_mcp.initialize()).await??;
|
|
|
|
Ok(RestartedThreadFixture {
|
|
mcp: second_mcp,
|
|
thread_id,
|
|
rollout_file_path: rollout_file_path.to_path_buf(),
|
|
updated_at,
|
|
})
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_resume_accepts_personality_override() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = responses::start_mock_server().await;
|
|
let first_body = responses::sse(vec![
|
|
responses::ev_response_created("resp-1"),
|
|
responses::ev_assistant_message("msg-1", "Done"),
|
|
responses::ev_completed("resp-1"),
|
|
]);
|
|
let second_body = responses::sse(vec![
|
|
responses::ev_response_created("resp-2"),
|
|
responses::ev_assistant_message("msg-2", "Done"),
|
|
responses::ev_completed("resp-2"),
|
|
]);
|
|
let response_mock = responses::mount_sse_sequence(&server, vec![first_body, second_body]).await;
|
|
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut primary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
|
|
|
|
let start_id = primary
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("gpt-5.3-codex".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let materialize_id = primary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![UserInput::Text {
|
|
text: "seed history".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_response_message(RequestId::Integer(materialize_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
primary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let mut secondary = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??;
|
|
|
|
let resume_id = secondary
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id,
|
|
model: Some("gpt-5.3-codex".to_string()),
|
|
personality: Some(Personality::Friendly),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
secondary.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let resume: ThreadResumeResponse = to_response::<ThreadResumeResponse>(resume_resp)?;
|
|
assert_eq!(resume.thread.status, ThreadStatus::Idle);
|
|
|
|
let turn_id = secondary
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: resume.thread.id,
|
|
input: vec![UserInput::Text {
|
|
text: "Hello".to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
secondary.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
secondary.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let requests = response_mock.requests();
|
|
let request = requests
|
|
.last()
|
|
.expect("expected request for resumed thread turn");
|
|
let developer_texts = request.message_input_texts("developer");
|
|
assert!(
|
|
developer_texts
|
|
.iter()
|
|
.any(|text| text.contains("<personality_spec>")),
|
|
"expected a personality update message in developer input, got {developer_texts:?}"
|
|
);
|
|
let instructions_text = request.instructions_text();
|
|
assert!(
|
|
instructions_text.contains(CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT),
|
|
"expected default base instructions from history, got {instructions_text:?}"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// Helper to create a config.toml pointing at the mock model server.
|
|
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
|
|
let config_toml = codex_home.join("config.toml");
|
|
std::fs::write(
|
|
config_toml,
|
|
format!(
|
|
r#"
|
|
model = "gpt-5.3-codex"
|
|
approval_policy = "never"
|
|
sandbox_mode = "read-only"
|
|
|
|
model_provider = "mock_provider"
|
|
|
|
[features]
|
|
personality = true
|
|
general_analytics = true
|
|
|
|
[model_providers.mock_provider]
|
|
name = "Mock provider for test"
|
|
base_url = "{server_uri}/v1"
|
|
wire_api = "responses"
|
|
request_max_retries = 0
|
|
stream_max_retries = 0
|
|
"#
|
|
),
|
|
)
|
|
}
|
|
|
|
fn create_config_toml_with_remote_thread_store(
|
|
codex_home: &std::path::Path,
|
|
server_uri: &str,
|
|
) -> std::io::Result<()> {
|
|
let config_toml = codex_home.join("config.toml");
|
|
std::fs::write(
|
|
config_toml,
|
|
format!(
|
|
r#"
|
|
model = "gpt-5.3-codex"
|
|
approval_policy = "never"
|
|
sandbox_mode = "read-only"
|
|
experimental_thread_store_endpoint = "http://127.0.0.1:1"
|
|
|
|
model_provider = "mock_provider"
|
|
|
|
[features]
|
|
personality = true
|
|
general_analytics = true
|
|
|
|
[model_providers.mock_provider]
|
|
name = "Mock provider for test"
|
|
base_url = "{server_uri}/v1"
|
|
wire_api = "responses"
|
|
request_max_retries = 0
|
|
stream_max_retries = 0
|
|
"#
|
|
),
|
|
)
|
|
}
|
|
|
|
fn create_config_toml_with_chatgpt_base_url(
|
|
codex_home: &std::path::Path,
|
|
server_uri: &str,
|
|
chatgpt_base_url: &str,
|
|
general_analytics_enabled: bool,
|
|
) -> std::io::Result<()> {
|
|
let general_analytics_toml = if general_analytics_enabled {
|
|
"\ngeneral_analytics = true".to_string()
|
|
} else {
|
|
"\ngeneral_analytics = false".to_string()
|
|
};
|
|
let config_toml = codex_home.join("config.toml");
|
|
std::fs::write(
|
|
config_toml,
|
|
format!(
|
|
r#"
|
|
model = "gpt-5.3-codex"
|
|
approval_policy = "never"
|
|
sandbox_mode = "read-only"
|
|
chatgpt_base_url = "{chatgpt_base_url}"
|
|
|
|
model_provider = "mock_provider"
|
|
|
|
[features]
|
|
personality = true
|
|
{general_analytics_toml}
|
|
|
|
[model_providers.mock_provider]
|
|
name = "Mock provider for test"
|
|
base_url = "{server_uri}/v1"
|
|
wire_api = "responses"
|
|
request_max_retries = 0
|
|
stream_max_retries = 0
|
|
"#
|
|
),
|
|
)
|
|
}
|
|
|
|
fn create_config_toml_with_required_broken_mcp(
|
|
codex_home: &std::path::Path,
|
|
server_uri: &str,
|
|
) -> std::io::Result<()> {
|
|
let config_toml = codex_home.join("config.toml");
|
|
std::fs::write(
|
|
config_toml,
|
|
format!(
|
|
r#"
|
|
model = "gpt-5.3-codex"
|
|
approval_policy = "never"
|
|
sandbox_mode = "read-only"
|
|
|
|
model_provider = "mock_provider"
|
|
|
|
[features]
|
|
personality = true
|
|
|
|
[model_providers.mock_provider]
|
|
name = "Mock provider for test"
|
|
base_url = "{server_uri}/v1"
|
|
wire_api = "responses"
|
|
request_max_retries = 0
|
|
stream_max_retries = 0
|
|
|
|
[mcp_servers.required_broken]
|
|
command = "codex-definitely-not-a-real-binary"
|
|
required = true
|
|
"#
|
|
),
|
|
)
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
fn set_rollout_mtime(path: &Path, updated_at_rfc3339: &str) -> Result<()> {
|
|
let parsed = chrono::DateTime::parse_from_rfc3339(updated_at_rfc3339)?.with_timezone(&Utc);
|
|
let times = FileTimes::new().set_modified(parsed.into());
|
|
std::fs::OpenOptions::new()
|
|
.append(true)
|
|
.open(path)?
|
|
.set_times(times)?;
|
|
Ok(())
|
|
}
|
|
|
|
struct RolloutFixture {
|
|
conversation_id: String,
|
|
rollout_file_path: PathBuf,
|
|
before_modified: std::time::SystemTime,
|
|
}
|
|
|
|
fn setup_rollout_fixture(codex_home: &Path, server_uri: &str) -> Result<RolloutFixture> {
|
|
create_config_toml(codex_home, server_uri)?;
|
|
|
|
let preview = "Saved user message";
|
|
let filename_ts = "2025-01-05T12-00-00";
|
|
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
|
let expected_updated_at_rfc3339 = "2025-01-07T00:00:00Z";
|
|
let conversation_id = create_fake_rollout_with_text_elements(
|
|
codex_home,
|
|
filename_ts,
|
|
meta_rfc3339,
|
|
preview,
|
|
Vec::new(),
|
|
Some("mock_provider"),
|
|
/*git_info*/ None,
|
|
)?;
|
|
let rollout_file_path = rollout_path(codex_home, filename_ts, &conversation_id);
|
|
set_rollout_mtime(rollout_file_path.as_path(), expected_updated_at_rfc3339)?;
|
|
let before_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
|
|
Ok(RolloutFixture {
|
|
conversation_id,
|
|
rollout_file_path,
|
|
before_modified,
|
|
})
|
|
}
|