Compare commits

...

2 Commits

Author SHA1 Message Date
daniel-oai
3423a0b23d Seed legacy turn fallback ids from thread identity 2026-04-08 18:40:32 -07:00
daniel-oai
4c73b17957 Stabilize legacy thread replay turn ids 2026-04-08 03:26:50 -07:00
5 changed files with 421 additions and 19 deletions

View File

@@ -41,9 +41,11 @@ use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::PatchApplyEndEvent;
use codex_protocol::protocol::ReviewOutputEvent;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::protocol::ViewImageToolCallEvent;
@@ -51,7 +53,6 @@ use codex_protocol::protocol::WebSearchBeginEvent;
use codex_protocol::protocol::WebSearchEndEvent;
use std::collections::HashMap;
use tracing::warn;
use uuid::Uuid;
#[cfg(test)]
use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus;
@@ -63,7 +64,22 @@ use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
/// resumed/rebuilt thread history preserves the original turn identifiers.
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
let mut builder = ThreadHistoryBuilder::new();
let mut builder = ThreadHistoryBuilder::for_replay(/*thread_id*/ None);
for item in items {
builder.handle_rollout_item(item);
}
builder.finish()
}
/// Replay persisted rollout items for a known thread.
///
/// Passing the thread id explicitly keeps deterministic legacy fallback ids stable
/// even if the rollout is missing `session_meta`.
pub fn build_turns_from_rollout_items_for_thread(
thread_id: &str,
items: &[RolloutItem],
) -> Vec<Turn> {
let mut builder = ThreadHistoryBuilder::for_replay(Some(thread_id.to_owned()));
for item in items {
builder.handle_rollout_item(item);
}
@@ -73,6 +89,9 @@ pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
pub struct ThreadHistoryBuilder {
turns: Vec<Turn>,
current_turn: Option<PendingTurn>,
thread_id_seed: Option<String>,
pending_implicit_turn_id: Option<String>,
next_synthetic_turn_index: usize,
next_item_index: i64,
current_rollout_index: usize,
next_rollout_index: usize,
@@ -86,9 +105,24 @@ impl Default for ThreadHistoryBuilder {
impl ThreadHistoryBuilder {
pub fn new() -> Self {
Self::for_replay(/*thread_id*/ None)
}
pub fn for_replay(thread_id: Option<String>) -> Self {
Self::with_thread_id_seed(thread_id)
}
pub fn for_live(thread_id: String) -> Self {
Self::with_thread_id_seed(Some(thread_id))
}
fn with_thread_id_seed(thread_id_seed: Option<String>) -> Self {
Self {
turns: Vec::new(),
current_turn: None,
thread_id_seed,
pending_implicit_turn_id: None,
next_synthetic_turn_index: 0,
next_item_index: 1,
current_rollout_index: 0,
next_rollout_index: 0,
@@ -96,7 +130,12 @@ impl ThreadHistoryBuilder {
}
pub fn reset(&mut self) {
*self = Self::new();
self.turns.clear();
self.current_turn = None;
self.pending_implicit_turn_id = None;
self.next_item_index = 1;
self.current_rollout_index = 0;
self.next_rollout_index = 0;
}
pub fn finish(mut self) -> Vec<Turn> {
@@ -205,10 +244,53 @@ impl ThreadHistoryBuilder {
RolloutItem::EventMsg(event) => self.handle_event(event),
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
RolloutItem::TurnContext(payload) => self.handle_turn_context(payload),
RolloutItem::SessionMeta(payload) => self.handle_session_meta(payload),
}
}
fn handle_session_meta(&mut self, payload: &SessionMetaLine) {
let session_thread_id = payload.meta.id.to_string();
match self.thread_id_seed.as_deref() {
Some(thread_id_seed) if thread_id_seed != session_thread_id => {
warn!(
thread_id_seed,
session_meta_thread_id = session_thread_id.as_str(),
"thread history builder saw mismatched session_meta thread id"
);
}
Some(_) => {}
None => self.thread_id_seed = Some(session_thread_id),
}
}
fn handle_turn_context(&mut self, payload: &TurnContextItem) {
let Some(turn_id) = payload.turn_id.as_ref() else {
return;
};
if let Some(current_turn) = self.current_turn.as_mut() {
if current_turn.opened_explicitly {
if current_turn.id != *turn_id {
warn!(
active_turn_id = current_turn.id.as_str(),
context_turn_id = turn_id,
"ignoring mismatched turn_context turn_id for explicit turn"
);
}
} else {
current_turn.id.clone_from(turn_id);
}
return;
}
if self.turns.last().is_some_and(|turn| turn.id == *turn_id) {
return;
}
self.pending_implicit_turn_id = Some(turn_id.clone());
}
fn handle_response_item(&mut self, item: &codex_protocol::models::ResponseItem) {
let codex_protocol::models::ResponseItem::Message {
role, content, id, ..
@@ -912,6 +994,7 @@ impl ThreadHistoryBuilder {
fn handle_turn_started(&mut self, payload: &TurnStartedEvent) {
self.finish_current_turn();
self.pending_implicit_turn_id = None;
self.current_turn = Some(
self.new_turn(Some(payload.turn_id.clone()))
.with_status(TurnStatus::InProgress)
@@ -987,7 +1070,7 @@ impl ThreadHistoryBuilder {
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
PendingTurn {
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
id: id.unwrap_or_else(|| self.take_next_implicit_turn_id()),
items: Vec::new(),
error: None,
status: TurnStatus::Completed,
@@ -1010,6 +1093,28 @@ impl ThreadHistoryBuilder {
unreachable!("current turn must exist after initialization");
}
fn take_next_implicit_turn_id(&mut self) -> String {
// Newer legacy-history entries persist a turn_context before the events that need an
// implicit turn. Prefer that real ID over synthesizing one.
if let Some(turn_id) = self.pending_implicit_turn_id.take() {
return turn_id;
}
let synthetic_turn_index = self.next_synthetic_turn_index;
self.next_synthetic_turn_index += 1;
self.synthetic_turn_id(synthetic_turn_index)
}
fn synthetic_turn_id(&self, synthetic_turn_index: usize) -> String {
// Old rollout entries may have renderable items but no persisted TurnStarted or
// turn_context ID. Seed the fallback from the known thread id when we have one; otherwise
// fall back to a per-history counter without inventing a misleading placeholder id.
match self.thread_id_seed.as_deref() {
Some(thread_id) => format!("codex-legacy-turn:{thread_id}:{synthetic_turn_index}"),
None => format!("codex-legacy-turn:{synthetic_turn_index}"),
}
}
fn upsert_item_in_turn_id(&mut self, turn_id: &str, item: ThreadItem) {
if let Some(turn) = self.current_turn.as_mut()
&& turn.id == turn_id
@@ -1200,6 +1305,7 @@ mod tests {
use super::*;
use crate::protocol::v2::CommandExecutionSource;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem;
use codex_protocol::items::HookPromptFragment as CoreHookPromptFragment;
use codex_protocol::items::TurnItem as CoreTurnItem;
@@ -1212,6 +1318,7 @@ mod tests {
use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::DynamicToolCallResponseEvent;
@@ -1221,17 +1328,66 @@ mod tests {
use codex_protocol::protocol::McpInvocation;
use codex_protocol::protocol::McpToolCallEndEvent;
use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::protocol::WebSearchEndEvent;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use std::time::Duration;
use uuid::Uuid;
fn build_session_meta_line(thread_id: ThreadId) -> SessionMetaLine {
SessionMetaLine {
meta: SessionMeta {
id: thread_id,
forked_from_id: None,
timestamp: "2025-01-05T12:00:00Z".into(),
cwd: PathBuf::from("/"),
originator: "codex".into(),
cli_version: "0.0.0".into(),
source: SessionSource::Cli,
agent_path: None,
agent_nickname: None,
agent_role: None,
model_provider: Some("mock_provider".into()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
},
git: None,
}
}
fn build_turn_context(turn_id: Option<&str>) -> TurnContextItem {
TurnContextItem {
turn_id: turn_id.map(str::to_string),
trace_id: None,
cwd: PathBuf::from("/"),
current_date: None,
timezone: None,
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
network: None,
model: "mock-model".into(),
personality: None,
collaboration_mode: None,
realtime_active: None,
effort: None,
summary: ReasoningSummaryConfig::Auto,
user_instructions: None,
developer_instructions: None,
final_output_json_schema: None,
truncation_policy: None,
}
}
#[test]
fn builds_multiple_turns_with_reasoning_items() {
@@ -1274,7 +1430,7 @@ mod tests {
assert_eq!(turns.len(), 2);
let first = &turns[0];
assert!(Uuid::parse_str(&first.id).is_ok());
assert_eq!(first.id, "codex-legacy-turn:0");
assert_eq!(first.status, TurnStatus::Completed);
assert_eq!(first.items.len(), 3);
assert_eq!(
@@ -1311,8 +1467,7 @@ mod tests {
);
let second = &turns[1];
assert!(Uuid::parse_str(&second.id).is_ok());
assert_ne!(first.id, second.id);
assert_eq!(second.id, "codex-legacy-turn:1");
assert_eq!(second.items.len(), 2);
assert_eq!(
second.items[0],
@@ -1335,6 +1490,149 @@ mod tests {
);
}
#[test]
fn uses_turn_context_id_for_legacy_implicit_turns() {
let items = vec![
RolloutItem::SessionMeta(build_session_meta_line(ThreadId::new())),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "legacy turn".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::TurnContext(build_turn_context(Some("legacy-turn-id"))),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "completed".into(),
phase: None,
memory_citation: None,
})),
];
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
assert_eq!(turns[0].id, "legacy-turn-id");
}
#[test]
fn replays_legacy_implicit_turns_with_stable_ids() {
let items = vec![
RolloutItem::SessionMeta(build_session_meta_line(ThreadId::new())),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "first".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A1".into(),
phase: None,
memory_citation: None,
})),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "second".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A2".into(),
phase: None,
memory_citation: None,
})),
];
let first_replay = build_turns_from_rollout_items(&items);
let second_replay = build_turns_from_rollout_items(&items);
assert_eq!(first_replay, second_replay);
assert_ne!(first_replay[0].id, first_replay[1].id);
}
#[test]
fn caller_seeded_thread_id_is_used_for_legacy_implicit_turns() {
let items = vec![
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "seeded".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "reply".into(),
phase: None,
memory_citation: None,
})),
];
let turns = build_turns_from_rollout_items_for_thread("thread-seed", &items);
assert_eq!(turns.len(), 1);
assert_eq!(turns[0].id, "codex-legacy-turn:thread-seed:0");
}
#[test]
fn reset_preserves_live_thread_id_seed_and_synthetic_counter() {
let mut builder = ThreadHistoryBuilder::for_live("thread-seed".into());
builder.handle_event(&EventMsg::UserMessage(UserMessageEvent {
message: "first".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}));
let first_turn_id = builder
.active_turn_snapshot()
.expect("active turn snapshot")
.id;
assert_eq!(first_turn_id, "codex-legacy-turn:thread-seed:0");
builder.handle_event(&EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: first_turn_id,
last_agent_message: None,
}));
assert!(!builder.has_active_turn());
builder.reset();
builder.handle_event(&EventMsg::UserMessage(UserMessageEvent {
message: "second".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}));
let second_turn_id = builder
.active_turn_snapshot()
.expect("active turn snapshot")
.id;
assert_eq!(second_turn_id, "codex-legacy-turn:thread-seed:1");
}
#[test]
fn preserves_explicit_turn_started_id_when_turn_context_differs() {
let items = vec![
RolloutItem::SessionMeta(build_session_meta_line(ThreadId::new())),
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "explicit-turn-id".into(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
})),
RolloutItem::TurnContext(build_turn_context(Some("different-turn-id"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "explicit".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "explicit-turn-id".into(),
last_agent_message: None,
})),
];
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
assert_eq!(turns[0].id, "explicit-turn-id");
}
#[test]
fn ignores_non_plan_item_lifecycle_events() {
let turn_id = "turn-1";
@@ -1646,8 +1944,8 @@ mod tests {
.collect::<Vec<_>>();
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 2);
assert!(Uuid::parse_str(&turns[0].id).is_ok());
assert!(Uuid::parse_str(&turns[1].id).is_ok());
assert_eq!(turns[0].id, "codex-legacy-turn:0");
assert_eq!(turns[1].id, "codex-legacy-turn:2");
assert_ne!(turns[0].id, turns[1].id);
assert_eq!(turns[0].status, TurnStatus::Completed);
assert_eq!(turns[1].status, TurnStatus::Completed);

View File

@@ -102,7 +102,7 @@ use codex_app_server_protocol::TurnPlanStep;
use codex_app_server_protocol::TurnPlanUpdatedNotification;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_app_server_protocol::build_turns_from_rollout_items_for_thread;
use codex_app_server_protocol::convert_patch_changes;
use codex_core::CodexThread;
use codex_core::ThreadManager;
@@ -1807,7 +1807,10 @@ pub(crate) async fn apply_bespoke_event_handling(
let mut thread = summary_to_thread(summary);
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
thread.turns = build_turns_from_rollout_items_for_thread(
thread.id.as_str(),
&items,
);
thread.status = thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await;

View File

@@ -174,7 +174,7 @@ use codex_app_server_protocol::WindowsSandboxSetupCompletedNotification;
use codex_app_server_protocol::WindowsSandboxSetupMode;
use codex_app_server_protocol::WindowsSandboxSetupStartParams;
use codex_app_server_protocol::WindowsSandboxSetupStartResponse;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_app_server_protocol::build_turns_from_rollout_items_for_thread;
use codex_arg0::Arg0DispatchPaths;
use codex_backend_client::Client as BackendClient;
use codex_chatgpt::connectors;
@@ -3507,7 +3507,8 @@ impl CodexMessageProcessor {
if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
thread.turns =
build_turns_from_rollout_items_for_thread(thread.id.as_str(), &items);
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
self.send_invalid_request_error(
@@ -7096,7 +7097,7 @@ impl CodexMessageProcessor {
if thread_state.listener_matches(&conversation) {
return;
}
thread_state.set_listener(cancel_tx, &conversation)
thread_state.set_listener(cancel_tx, &conversation, &conversation_id)
};
let ListenerTaskContext {
outgoing,
@@ -7689,7 +7690,7 @@ async fn populate_thread_turns(
ThreadTurnSource::RolloutPath(rollout_path) => {
read_rollout_items_from_rollout(rollout_path)
.await
.map(|items| build_turns_from_rollout_items(&items))
.map(|items| build_turns_from_rollout_items_for_thread(thread.id.as_str(), &items))
.map_err(|err| {
format!(
"failed to load rollout `{}` for thread {}: {err}",
@@ -7698,7 +7699,9 @@ async fn populate_thread_turns(
)
})?
}
ThreadTurnSource::HistoryItems(items) => build_turns_from_rollout_items(items),
ThreadTurnSource::HistoryItems(items) => {
build_turns_from_rollout_items_for_thread(thread.id.as_str(), items)
}
};
if let Some(active_turn) = active_turn {
merge_turn_history_with_active_turn(&mut turns, active_turn.clone());

View File

@@ -74,6 +74,7 @@ impl ThreadState {
&mut self,
cancel_tx: oneshot::Sender<()>,
conversation: &Arc<CodexThread>,
conversation_id: &ThreadId,
) -> (mpsc::UnboundedReceiver<ThreadListenerCommand>, u64) {
if let Some(previous) = self.cancel_tx.replace(cancel_tx) {
let _ = previous.send(());
@@ -81,6 +82,7 @@ impl ThreadState {
self.listener_generation = self.listener_generation.wrapping_add(1);
let (listener_command_tx, listener_command_rx) = mpsc::unbounded_channel();
self.listener_command_tx = Some(listener_command_tx);
self.current_turn_history = ThreadHistoryBuilder::for_live(conversation_id.to_string());
self.listener_thread = Some(Arc::downgrade(conversation));
(listener_command_rx, self.listener_generation)
}
@@ -90,7 +92,7 @@ impl ThreadState {
let _ = cancel_tx.send(());
}
self.listener_command_tx = None;
self.current_turn_history.reset();
self.current_turn_history = ThreadHistoryBuilder::new();
self.listener_thread = None;
}

View File

@@ -42,13 +42,17 @@ use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::auth::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR;
use codex_protocol::ThreadId;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary as CoreReasoningSummary;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AskForApproval as CoreAskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::SandboxPolicy as CoreSandboxPolicy;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource as RolloutSessionSource;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::user_input::ByteRange;
use codex_protocol::user_input::TextElement;
@@ -224,6 +228,98 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_resume_and_read_use_persisted_turn_context_ids_for_legacy_history() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let filename_ts = "2025-01-05T12-00-00";
let meta_rfc3339 = "2025-01-05T12:00:00Z";
let conversation_id = create_fake_rollout_with_text_elements(
codex_home.path(),
filename_ts,
meta_rfc3339,
"Saved user message",
Vec::new(),
Some("mock_provider"),
/*git_info*/ None,
)?;
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
let appended_turn_context = json!({
"timestamp": meta_rfc3339,
"type": "turn_context",
"payload": serde_json::to_value(TurnContextItem {
turn_id: Some("legacy-turn-id".into()),
trace_id: None,
cwd: PathBuf::from("/"),
current_date: None,
timezone: None,
approval_policy: CoreAskForApproval::Never,
sandbox_policy: CoreSandboxPolicy::new_read_only_policy(),
network: None,
model: "mock-model".into(),
personality: None,
collaboration_mode: None,
realtime_active: None,
effort: None,
summary: CoreReasoningSummary::Auto,
user_instructions: None,
developer_instructions: None,
final_output_json_schema: None,
truncation_policy: None,
})?,
})
.to_string();
std::fs::write(
&rollout_file_path,
format!("{persisted_rollout}{appended_turn_context}\n"),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: conversation_id.clone(),
include_turns: true,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse {
thread: read_thread,
} = to_response::<ThreadReadResponse>(read_resp)?;
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: conversation_id,
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed_thread,
..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(read_thread.turns.len(), 1);
assert_eq!(resumed_thread.turns.len(), 1);
assert_eq!(read_thread.turns[0].id, "legacy-turn-id");
assert_eq!(resumed_thread.turns[0].id, "legacy-turn-id");
assert_eq!(read_thread.turns, resumed_thread.turns);
Ok(())
}
#[tokio::test]
async fn thread_resume_prefers_persisted_git_metadata_for_local_threads() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;