Files
codex/codex-rs/app-server/tests/suite/v2/thread_resume.rs
Owen Lin 1924500250 [app-server] populate thread>turns>items on thread/resume (#6848)
This PR allows clients to render historical messages when resuming a
thread via `thread/resume` by reading from the list of `EventMsg`
payloads loaded from the rollout, and then transforming them into Turns
and ThreadItems to be returned on the `Thread` object.

This is implemented by leveraging `SessionConfiguredNotification` which
returns this list of `EventMsg` objects when resuming a conversation,
and then applying a stateful `ThreadHistoryBuilder` that parses from
this EventMsg log and transforms it into Turns and ThreadItems.

Note that we only persist a subset of `EventMsg`s in a rollout as
defined in `policy.rs`, so we lose fidelity whenever we resume a thread
compared to when we streamed the thread's turns originally. However,
this behavior is at parity with the legacy API.
2025-11-19 15:58:09 +00:00

250 lines
7.9 KiB
Rust

use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_resume_returns_original_thread() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a thread.
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("arcticfox".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
// Resume it via v2 API.
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed, ..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resumed, thread);
Ok(())
}
#[tokio::test]
async fn thread_resume_returns_rollout_history() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let preview = "Saved user message";
let conversation_id = create_fake_rollout(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
preview,
Some("mock_provider"),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: conversation_id.clone(),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(thread.id, conversation_id);
assert_eq!(thread.preview, preview);
assert_eq!(thread.model_provider, "mock_provider");
assert!(thread.path.is_absolute());
assert_eq!(
thread.turns.len(),
1,
"expected rollouts to include one turn"
);
let turn = &thread.turns[0];
assert_eq!(turn.status, TurnStatus::Completed);
assert_eq!(turn.items.len(), 1, "expected user message item");
match &turn.items[0] {
ThreadItem::UserMessage { content, .. } => {
assert_eq!(
content,
&vec![UserInput::Text {
text: preview.to_string()
}]
);
}
other => panic!("expected user message item, got {other:?}"),
}
Ok(())
}
#[tokio::test]
async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("arcticfox".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let thread_path = thread.path.clone();
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: "not-a-valid-thread-id".to_string(),
path: Some(thread_path),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed, ..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resumed, thread);
Ok(())
}
#[tokio::test]
async fn thread_resume_supports_history_and_overrides() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a thread.
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("arcticfox".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let history_text = "Hello from history";
let history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: history_text.to_string(),
}],
}];
// Resume with explicit history and override the model.
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id,
history: Some(history),
model: Some("mock-model".to_string()),
model_provider: Some("mock_provider".to_string()),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed,
model_provider,
..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert!(!resumed.id.is_empty());
assert_eq!(model_provider, "mock_provider");
assert_eq!(resumed.preview, history_text);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}