diff --git a/codex-rs/.codex/pro/1757202005/agent-a b/codex-rs/.codex/pro/1757202005/agent-a new file mode 160000 index 0000000000..6e743e8496 --- /dev/null +++ b/codex-rs/.codex/pro/1757202005/agent-a @@ -0,0 +1 @@ +Subproject commit 6e743e84964bbb4e3aaa66abf16c3d00172cf578 diff --git a/codex-rs/.codex/pro/1757202005/agent-b b/codex-rs/.codex/pro/1757202005/agent-b new file mode 160000 index 0000000000..6e743e8496 --- /dev/null +++ b/codex-rs/.codex/pro/1757202005/agent-b @@ -0,0 +1 @@ +Subproject commit 6e743e84964bbb4e3aaa66abf16c3d00172cf578 diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 5a304e4a3a..756c1edd33 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1409,7 +1409,12 @@ async fn submission_loop( id: sub_id.clone(), msg: EventMsg::ConversationHistory(ConversationPathResponseEvent { conversation_id: sess.conversation_id, - entries: sess.state.lock_unchecked().history.contents(), + path: sess + .rollout + .lock_unchecked() + .as_ref() + .map(|r| r.path().to_path_buf()) + .unwrap_or_default(), }), }; sess.send_event(event).await; diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index b8e61514a1..cf8733ffa4 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -156,16 +156,19 @@ impl ConversationManager { /// caller's `config`). The new conversation will have a fresh id. pub async fn fork_conversation( &self, - conversation_history: Vec, + conversation_path: PathBuf, + conversation_id: ConversationId, num_messages_to_drop: usize, config: Config, ) -> CodexResult { // Compute the prefix up to the cut point. - let items: Vec = conversation_history - .into_iter() - .map(RolloutItem::from) - .collect(); - let history = truncate_after_dropping_last_messages(items, num_messages_to_drop); + let initial_history = RolloutRecorder::get_rollout_history(&conversation_path).await?; + let conversation_history = match initial_history { + InitialHistory::Resumed(items) => items, + InitialHistory::New => return Err(CodexErr::ConversationNotFound(conversation_id)), + }; + let history = + truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop); // Spawn a new conversation with the computed initial history. let auth_manager = self.auth_manager.clone(); diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 56462ab04b..7d73fc50ad 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -138,6 +138,9 @@ impl RolloutRecorderParams { } impl RolloutRecorder { + pub fn path(&self) -> &Path { + &self.path + } #[allow(dead_code)] /// List conversations (rollout files) under the provided Codex home directory. pub async fn list_conversations( @@ -314,10 +317,6 @@ impl RolloutRecorder { })) } - pub fn path(&self) -> &Path { - &self.path - } - pub async fn shutdown(&self) -> std::io::Result<()> { let (tx_done, rx_done) = oneshot::channel(); match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await { diff --git a/codex-rs/core/tests/suite/fork_conversation.rs b/codex-rs/core/tests/suite/fork_conversation.rs index d3efbc8c21..bc9a1ec368 100644 --- a/codex-rs/core/tests/suite/fork_conversation.rs +++ b/codex-rs/core/tests/suite/fork_conversation.rs @@ -77,13 +77,43 @@ async fn fork_conversation_twice_drops_to_first_message() { let base_history = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await; - // Capture entries from the base history and compute expected prefixes after each fork. - let entries_after_three: Vec = match &base_history { - EventMsg::ConversationHistory(ConversationPathResponseEvent { entries, .. }) => { - entries.clone() - } + // Capture path/id from the base history and compute expected prefixes after each fork. + let (base_conv_id, base_path) = match &base_history { + EventMsg::ConversationHistory(ConversationPathResponseEvent { + conversation_id, + path, + }) => (*conversation_id, path.clone()), _ => panic!("expected ConversationHistory event"), }; + + // Read entries from rollout file. + async fn read_response_entries(path: &std::path::Path) -> Vec { + let text = tokio::fs::read_to_string(path).await.unwrap_or_default(); + let mut out = Vec::new(); + for line in text.lines() { + if line.trim().is_empty() { + continue; + } + if let Ok(item) = serde_json::from_str::(line) { + out.push(item); + } + } + out + } + async fn read_response_entries_with_retry( + path: &std::path::Path, + min_len: usize, + ) -> Vec { + for _ in 0..50u32 { + let entries = read_response_entries(path).await; + if entries.len() >= min_len { + return entries; + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + read_response_entries(path).await + } + let entries_after_three: Vec = read_response_entries(&base_path).await; // History layout for this test: // [0] user instructions, // [1] environment context, @@ -114,7 +144,7 @@ async fn fork_conversation_twice_drops_to_first_message() { conversation: codex_fork1, .. } = conversation_manager - .fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone()) + .fork_conversation(base_path.clone(), base_conv_id, 1, config_for_fork.clone()) .await .expect("fork 1"); @@ -123,20 +153,23 @@ async fn fork_conversation_twice_drops_to_first_message() { matches!(ev, EventMsg::ConversationHistory(_)) }) .await; - let entries_after_first_fork: Vec = match &fork1_history { - EventMsg::ConversationHistory(ConversationPathResponseEvent { entries, .. }) => { - assert_eq!(entries, &expected_after_first); - entries.clone() - } + let (fork1_id, fork1_path) = match &fork1_history { + EventMsg::ConversationHistory(ConversationPathResponseEvent { + conversation_id, + path, + }) => (*conversation_id, path.clone()), _ => panic!("expected ConversationHistory event after first fork"), }; + let entries_after_first_fork: Vec = + read_response_entries_with_retry(&fork1_path, expected_after_first.len()).await; + assert_eq!(entries_after_first_fork, expected_after_first); // Fork again with n=1 → drops the (new) last user message, leaving only the first. let NewConversation { conversation: codex_fork2, .. } = conversation_manager - .fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone()) + .fork_conversation(fork1_path.clone(), fork1_id, 1, config_for_fork.clone()) .await .expect("fork 2"); @@ -145,10 +178,14 @@ async fn fork_conversation_twice_drops_to_first_message() { matches!(ev, EventMsg::ConversationHistory(_)) }) .await; - match &fork2_history { - EventMsg::ConversationHistory(ConversationPathResponseEvent { entries, .. }) => { - assert_eq!(entries, &expected_after_second); - } + let (_fork2_id, fork2_path) = match &fork2_history { + EventMsg::ConversationHistory(ConversationPathResponseEvent { + conversation_id, + path, + }) => (*conversation_id, path.clone()), _ => panic!("expected ConversationHistory event after second fork"), }; + let entries_after_second_fork: Vec = + read_response_entries_with_retry(&fork2_path, expected_after_second.len()).await; + assert_eq!(entries_after_second_fork, expected_after_second); } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 7157ea80f2..918dfb1f8f 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -801,7 +801,7 @@ pub struct WebSearchEndEvent { #[derive(Debug, Clone, Deserialize, Serialize, TS)] pub struct ConversationPathResponseEvent { pub conversation_id: ConversationId, - pub entries: Vec, + pub path: PathBuf, } #[derive(Debug, Clone, Deserialize, Serialize, TS)] diff --git a/codex-rs/tui/src/app_backtrack.rs b/codex-rs/tui/src/app_backtrack.rs index a36f3d4c6a..31aae3cbcc 100644 --- a/codex-rs/tui/src/app_backtrack.rs +++ b/codex-rs/tui/src/app_backtrack.rs @@ -288,7 +288,7 @@ impl App { let cfg = self.chat_widget.config_ref().clone(); // Perform the fork via a thin wrapper for clarity/testability. let result = self - .perform_fork(ev.entries.clone(), drop_count, cfg.clone()) + .perform_fork(ev.path.clone(), ev.conversation_id, drop_count, cfg.clone()) .await; // We aren't using the initial history UI replay in session configured because we have more accurate version of the history. match result { @@ -302,12 +302,13 @@ impl App { /// Thin wrapper around ConversationManager::fork_conversation. async fn perform_fork( &self, - conversation_history: Vec, + conversation_path: std::path::PathBuf, + conversation_id: codex_protocol::mcp_protocol::ConversationId, drop_count: usize, cfg: codex_core::config::Config, ) -> codex_core::error::Result { self.server - .fork_conversation(conversation_history, drop_count, cfg) + .fork_conversation(conversation_path, conversation_id, drop_count, cfg) .await }