This commit is contained in:
Owen Lin
2025-11-02 13:15:53 -08:00
parent afd59a99af
commit d8075b923c
6 changed files with 424 additions and 13 deletions

View File

@@ -1,3 +1,5 @@
mod thread_archive;
mod thread_list;
mod thread_resume;
mod thread_start;
mod turn_start;

View File

@@ -52,11 +52,17 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
.path()
.join(SESSIONS_SUBDIR)
.join(format!("{}.jsonl", thread.id));
assert!(rollout_path.exists(), "expected {} to exist", rollout_path.display());
assert!(
rollout_path.exists(),
"expected {} to exist",
rollout_path.display()
);
// Archive the thread.
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams { thread: thread.clone() })
.send_thread_archive_request(ThreadArchiveParams {
thread: thread.clone(),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
@@ -93,4 +99,3 @@ approval_policy = "never"
sandbox_mode = "read-only"
"#
}

View File

@@ -0,0 +1,209 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::UserInput as V2UserInput;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn turn_start_emits_notification_and_completes_without_overrides() -> Result<()> {
// Provide a mock server and config so model wiring is valid.
// Two Codex turns hit the mock model (session start + turn/start).
let responses = vec![
create_final_assistant_message_sse_response("Done")?,
create_final_assistant_message_sse_response("Done")?,
];
let server = create_mock_chat_completions_server(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a thread (v2) and capture its id.
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
model_provider: None,
profile: None,
cwd: None,
approval_policy: None,
sandbox: None,
config: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
include_apply_patch_tool: None,
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(thread_resp)?;
// Add legacy listener to get task_complete signal.
let conversation_id = codex_protocol::ConversationId::from_string(&thread.id)?;
let add_listener_id = mcp
.send_add_conversation_listener_request(AddConversationListenerParams {
conversation_id,
experimental_raw_events: false,
})
.await?;
let add_listener_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
)
.await??;
let AddConversationSubscriptionResponse { .. } = to_response::<_>(add_listener_resp)?;
// Start a turn with only input and thread_id set.
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text("Hello".to_string())],
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
assert!(!turn.id.is_empty());
// Expect a turn/started notification.
let notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
let started: TurnStartedNotification =
serde_json::from_value(notif.params.expect("params must be present"))?;
assert_eq!(
started.turn.status,
codex_app_server_protocol::TurnStatus::InProgress
);
// And we should see task_complete for the legacy stream as the turn finishes.
let _task_done: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await??;
drop(server);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn turn_start_accepts_local_image_input() -> Result<()> {
// Two Codex turns hit the mock model (session start + turn/start).
let responses = vec![
create_final_assistant_message_sse_response("Done")?,
create_final_assistant_message_sse_response("Done")?,
];
let server = create_mock_chat_completions_server(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
model_provider: None,
profile: None,
cwd: None,
approval_policy: None,
sandbox: None,
config: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
include_apply_patch_tool: None,
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(thread_resp)?;
let image_path = codex_home.path().join("image.png");
// No need to actually write the file; we just exercise the input path.
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::LocalImage { path: image_path }],
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
assert!(!turn.id.is_empty());
drop(server);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}