mirror of
https://github.com/openai/codex.git
synced 2026-04-25 00:41:46 +03:00
Compare commits
2 Commits
dev/window
...
daniels-oa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3423a0b23d | ||
|
|
4c73b17957 |
@@ -41,9 +41,11 @@ use codex_protocol::protocol::PatchApplyBeginEvent;
|
||||
use codex_protocol::protocol::PatchApplyEndEvent;
|
||||
use codex_protocol::protocol::ReviewOutputEvent;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use codex_protocol::protocol::ViewImageToolCallEvent;
|
||||
@@ -51,7 +53,6 @@ use codex_protocol::protocol::WebSearchBeginEvent;
|
||||
use codex_protocol::protocol::WebSearchEndEvent;
|
||||
use std::collections::HashMap;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus;
|
||||
@@ -63,7 +64,22 @@ use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
|
||||
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
|
||||
/// resumed/rebuilt thread history preserves the original turn identifiers.
|
||||
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
let mut builder = ThreadHistoryBuilder::for_replay(/*thread_id*/ None);
|
||||
for item in items {
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
builder.finish()
|
||||
}
|
||||
|
||||
/// Replay persisted rollout items for a known thread.
|
||||
///
|
||||
/// Passing the thread id explicitly keeps deterministic legacy fallback ids stable
|
||||
/// even if the rollout is missing `session_meta`.
|
||||
pub fn build_turns_from_rollout_items_for_thread(
|
||||
thread_id: &str,
|
||||
items: &[RolloutItem],
|
||||
) -> Vec<Turn> {
|
||||
let mut builder = ThreadHistoryBuilder::for_replay(Some(thread_id.to_owned()));
|
||||
for item in items {
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
@@ -73,6 +89,9 @@ pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
|
||||
pub struct ThreadHistoryBuilder {
|
||||
turns: Vec<Turn>,
|
||||
current_turn: Option<PendingTurn>,
|
||||
thread_id_seed: Option<String>,
|
||||
pending_implicit_turn_id: Option<String>,
|
||||
next_synthetic_turn_index: usize,
|
||||
next_item_index: i64,
|
||||
current_rollout_index: usize,
|
||||
next_rollout_index: usize,
|
||||
@@ -86,9 +105,24 @@ impl Default for ThreadHistoryBuilder {
|
||||
|
||||
impl ThreadHistoryBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self::for_replay(/*thread_id*/ None)
|
||||
}
|
||||
|
||||
pub fn for_replay(thread_id: Option<String>) -> Self {
|
||||
Self::with_thread_id_seed(thread_id)
|
||||
}
|
||||
|
||||
pub fn for_live(thread_id: String) -> Self {
|
||||
Self::with_thread_id_seed(Some(thread_id))
|
||||
}
|
||||
|
||||
fn with_thread_id_seed(thread_id_seed: Option<String>) -> Self {
|
||||
Self {
|
||||
turns: Vec::new(),
|
||||
current_turn: None,
|
||||
thread_id_seed,
|
||||
pending_implicit_turn_id: None,
|
||||
next_synthetic_turn_index: 0,
|
||||
next_item_index: 1,
|
||||
current_rollout_index: 0,
|
||||
next_rollout_index: 0,
|
||||
@@ -96,7 +130,12 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
*self = Self::new();
|
||||
self.turns.clear();
|
||||
self.current_turn = None;
|
||||
self.pending_implicit_turn_id = None;
|
||||
self.next_item_index = 1;
|
||||
self.current_rollout_index = 0;
|
||||
self.next_rollout_index = 0;
|
||||
}
|
||||
|
||||
pub fn finish(mut self) -> Vec<Turn> {
|
||||
@@ -205,10 +244,53 @@ impl ThreadHistoryBuilder {
|
||||
RolloutItem::EventMsg(event) => self.handle_event(event),
|
||||
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
|
||||
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
|
||||
RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
|
||||
RolloutItem::TurnContext(payload) => self.handle_turn_context(payload),
|
||||
RolloutItem::SessionMeta(payload) => self.handle_session_meta(payload),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_session_meta(&mut self, payload: &SessionMetaLine) {
|
||||
let session_thread_id = payload.meta.id.to_string();
|
||||
match self.thread_id_seed.as_deref() {
|
||||
Some(thread_id_seed) if thread_id_seed != session_thread_id => {
|
||||
warn!(
|
||||
thread_id_seed,
|
||||
session_meta_thread_id = session_thread_id.as_str(),
|
||||
"thread history builder saw mismatched session_meta thread id"
|
||||
);
|
||||
}
|
||||
Some(_) => {}
|
||||
None => self.thread_id_seed = Some(session_thread_id),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_turn_context(&mut self, payload: &TurnContextItem) {
|
||||
let Some(turn_id) = payload.turn_id.as_ref() else {
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(current_turn) = self.current_turn.as_mut() {
|
||||
if current_turn.opened_explicitly {
|
||||
if current_turn.id != *turn_id {
|
||||
warn!(
|
||||
active_turn_id = current_turn.id.as_str(),
|
||||
context_turn_id = turn_id,
|
||||
"ignoring mismatched turn_context turn_id for explicit turn"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
current_turn.id.clone_from(turn_id);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if self.turns.last().is_some_and(|turn| turn.id == *turn_id) {
|
||||
return;
|
||||
}
|
||||
|
||||
self.pending_implicit_turn_id = Some(turn_id.clone());
|
||||
}
|
||||
|
||||
fn handle_response_item(&mut self, item: &codex_protocol::models::ResponseItem) {
|
||||
let codex_protocol::models::ResponseItem::Message {
|
||||
role, content, id, ..
|
||||
@@ -912,6 +994,7 @@ impl ThreadHistoryBuilder {
|
||||
|
||||
fn handle_turn_started(&mut self, payload: &TurnStartedEvent) {
|
||||
self.finish_current_turn();
|
||||
self.pending_implicit_turn_id = None;
|
||||
self.current_turn = Some(
|
||||
self.new_turn(Some(payload.turn_id.clone()))
|
||||
.with_status(TurnStatus::InProgress)
|
||||
@@ -987,7 +1070,7 @@ impl ThreadHistoryBuilder {
|
||||
|
||||
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
|
||||
PendingTurn {
|
||||
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
|
||||
id: id.unwrap_or_else(|| self.take_next_implicit_turn_id()),
|
||||
items: Vec::new(),
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
@@ -1010,6 +1093,28 @@ impl ThreadHistoryBuilder {
|
||||
unreachable!("current turn must exist after initialization");
|
||||
}
|
||||
|
||||
fn take_next_implicit_turn_id(&mut self) -> String {
|
||||
// Newer legacy-history entries persist a turn_context before the events that need an
|
||||
// implicit turn. Prefer that real ID over synthesizing one.
|
||||
if let Some(turn_id) = self.pending_implicit_turn_id.take() {
|
||||
return turn_id;
|
||||
}
|
||||
|
||||
let synthetic_turn_index = self.next_synthetic_turn_index;
|
||||
self.next_synthetic_turn_index += 1;
|
||||
self.synthetic_turn_id(synthetic_turn_index)
|
||||
}
|
||||
|
||||
fn synthetic_turn_id(&self, synthetic_turn_index: usize) -> String {
|
||||
// Old rollout entries may have renderable items but no persisted TurnStarted or
|
||||
// turn_context ID. Seed the fallback from the known thread id when we have one; otherwise
|
||||
// fall back to a per-history counter without inventing a misleading placeholder id.
|
||||
match self.thread_id_seed.as_deref() {
|
||||
Some(thread_id) => format!("codex-legacy-turn:{thread_id}:{synthetic_turn_index}"),
|
||||
None => format!("codex-legacy-turn:{synthetic_turn_index}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn upsert_item_in_turn_id(&mut self, turn_id: &str, item: ThreadItem) {
|
||||
if let Some(turn) = self.current_turn.as_mut()
|
||||
&& turn.id == turn_id
|
||||
@@ -1200,6 +1305,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::v2::CommandExecutionSource;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem;
|
||||
use codex_protocol::items::HookPromptFragment as CoreHookPromptFragment;
|
||||
use codex_protocol::items::TurnItem as CoreTurnItem;
|
||||
@@ -1212,6 +1318,7 @@ mod tests {
|
||||
use codex_protocol::protocol::AgentReasoningEvent;
|
||||
use codex_protocol::protocol::AgentReasoningRawContentEvent;
|
||||
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::DynamicToolCallResponseEvent;
|
||||
@@ -1221,17 +1328,66 @@ mod tests {
|
||||
use codex_protocol::protocol::McpInvocation;
|
||||
use codex_protocol::protocol::McpToolCallEndEvent;
|
||||
use codex_protocol::protocol::PatchApplyBeginEvent;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use codex_protocol::protocol::WebSearchEndEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
fn build_session_meta_line(thread_id: ThreadId) -> SessionMetaLine {
|
||||
SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: thread_id,
|
||||
forked_from_id: None,
|
||||
timestamp: "2025-01-05T12:00:00Z".into(),
|
||||
cwd: PathBuf::from("/"),
|
||||
originator: "codex".into(),
|
||||
cli_version: "0.0.0".into(),
|
||||
source: SessionSource::Cli,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
model_provider: Some("mock_provider".into()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn build_turn_context(turn_id: Option<&str>) -> TurnContextItem {
|
||||
TurnContextItem {
|
||||
turn_id: turn_id.map(str::to_string),
|
||||
trace_id: None,
|
||||
cwd: PathBuf::from("/"),
|
||||
current_date: None,
|
||||
timezone: None,
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
network: None,
|
||||
model: "mock-model".into(),
|
||||
personality: None,
|
||||
collaboration_mode: None,
|
||||
realtime_active: None,
|
||||
effort: None,
|
||||
summary: ReasoningSummaryConfig::Auto,
|
||||
user_instructions: None,
|
||||
developer_instructions: None,
|
||||
final_output_json_schema: None,
|
||||
truncation_policy: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn builds_multiple_turns_with_reasoning_items() {
|
||||
@@ -1274,7 +1430,7 @@ mod tests {
|
||||
assert_eq!(turns.len(), 2);
|
||||
|
||||
let first = &turns[0];
|
||||
assert!(Uuid::parse_str(&first.id).is_ok());
|
||||
assert_eq!(first.id, "codex-legacy-turn:0");
|
||||
assert_eq!(first.status, TurnStatus::Completed);
|
||||
assert_eq!(first.items.len(), 3);
|
||||
assert_eq!(
|
||||
@@ -1311,8 +1467,7 @@ mod tests {
|
||||
);
|
||||
|
||||
let second = &turns[1];
|
||||
assert!(Uuid::parse_str(&second.id).is_ok());
|
||||
assert_ne!(first.id, second.id);
|
||||
assert_eq!(second.id, "codex-legacy-turn:1");
|
||||
assert_eq!(second.items.len(), 2);
|
||||
assert_eq!(
|
||||
second.items[0],
|
||||
@@ -1335,6 +1490,149 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn uses_turn_context_id_for_legacy_implicit_turns() {
|
||||
let items = vec![
|
||||
RolloutItem::SessionMeta(build_session_meta_line(ThreadId::new())),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "legacy turn".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::TurnContext(build_turn_context(Some("legacy-turn-id"))),
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "completed".into(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
})),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].id, "legacy-turn-id");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replays_legacy_implicit_turns_with_stable_ids() {
|
||||
let items = vec![
|
||||
RolloutItem::SessionMeta(build_session_meta_line(ThreadId::new())),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "first".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "A1".into(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "second".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "A2".into(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
})),
|
||||
];
|
||||
|
||||
let first_replay = build_turns_from_rollout_items(&items);
|
||||
let second_replay = build_turns_from_rollout_items(&items);
|
||||
|
||||
assert_eq!(first_replay, second_replay);
|
||||
assert_ne!(first_replay[0].id, first_replay[1].id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn caller_seeded_thread_id_is_used_for_legacy_implicit_turns() {
|
||||
let items = vec![
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "seeded".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "reply".into(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
})),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_rollout_items_for_thread("thread-seed", &items);
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].id, "codex-legacy-turn:thread-seed:0");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reset_preserves_live_thread_id_seed_and_synthetic_counter() {
|
||||
let mut builder = ThreadHistoryBuilder::for_live("thread-seed".into());
|
||||
builder.handle_event(&EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "first".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
}));
|
||||
|
||||
let first_turn_id = builder
|
||||
.active_turn_snapshot()
|
||||
.expect("active turn snapshot")
|
||||
.id;
|
||||
assert_eq!(first_turn_id, "codex-legacy-turn:thread-seed:0");
|
||||
|
||||
builder.handle_event(&EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: first_turn_id,
|
||||
last_agent_message: None,
|
||||
}));
|
||||
assert!(!builder.has_active_turn());
|
||||
|
||||
builder.reset();
|
||||
builder.handle_event(&EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "second".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
}));
|
||||
|
||||
let second_turn_id = builder
|
||||
.active_turn_snapshot()
|
||||
.expect("active turn snapshot")
|
||||
.id;
|
||||
assert_eq!(second_turn_id, "codex-legacy-turn:thread-seed:1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preserves_explicit_turn_started_id_when_turn_context_differs() {
|
||||
let items = vec![
|
||||
RolloutItem::SessionMeta(build_session_meta_line(ThreadId::new())),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "explicit-turn-id".into(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
})),
|
||||
RolloutItem::TurnContext(build_turn_context(Some("different-turn-id"))),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "explicit".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "explicit-turn-id".into(),
|
||||
last_agent_message: None,
|
||||
})),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].id, "explicit-turn-id");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ignores_non_plan_item_lifecycle_events() {
|
||||
let turn_id = "turn-1";
|
||||
@@ -1646,8 +1944,8 @@ mod tests {
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 2);
|
||||
assert!(Uuid::parse_str(&turns[0].id).is_ok());
|
||||
assert!(Uuid::parse_str(&turns[1].id).is_ok());
|
||||
assert_eq!(turns[0].id, "codex-legacy-turn:0");
|
||||
assert_eq!(turns[1].id, "codex-legacy-turn:2");
|
||||
assert_ne!(turns[0].id, turns[1].id);
|
||||
assert_eq!(turns[0].status, TurnStatus::Completed);
|
||||
assert_eq!(turns[1].status, TurnStatus::Completed);
|
||||
|
||||
@@ -102,7 +102,7 @@ use codex_app_server_protocol::TurnPlanStep;
|
||||
use codex_app_server_protocol::TurnPlanUpdatedNotification;
|
||||
use codex_app_server_protocol::TurnStartedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::build_turns_from_rollout_items;
|
||||
use codex_app_server_protocol::build_turns_from_rollout_items_for_thread;
|
||||
use codex_app_server_protocol::convert_patch_changes;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::ThreadManager;
|
||||
@@ -1807,7 +1807,10 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
let mut thread = summary_to_thread(summary);
|
||||
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
|
||||
Ok(items) => {
|
||||
thread.turns = build_turns_from_rollout_items(&items);
|
||||
thread.turns = build_turns_from_rollout_items_for_thread(
|
||||
thread.id.as_str(),
|
||||
&items,
|
||||
);
|
||||
thread.status = thread_watch_manager
|
||||
.loaded_status_for_thread(&thread.id)
|
||||
.await;
|
||||
|
||||
@@ -174,7 +174,7 @@ use codex_app_server_protocol::WindowsSandboxSetupCompletedNotification;
|
||||
use codex_app_server_protocol::WindowsSandboxSetupMode;
|
||||
use codex_app_server_protocol::WindowsSandboxSetupStartParams;
|
||||
use codex_app_server_protocol::WindowsSandboxSetupStartResponse;
|
||||
use codex_app_server_protocol::build_turns_from_rollout_items;
|
||||
use codex_app_server_protocol::build_turns_from_rollout_items_for_thread;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_backend_client::Client as BackendClient;
|
||||
use codex_chatgpt::connectors;
|
||||
@@ -3507,7 +3507,8 @@ impl CodexMessageProcessor {
|
||||
if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
|
||||
match read_rollout_items_from_rollout(rollout_path).await {
|
||||
Ok(items) => {
|
||||
thread.turns = build_turns_from_rollout_items(&items);
|
||||
thread.turns =
|
||||
build_turns_from_rollout_items_for_thread(thread.id.as_str(), &items);
|
||||
}
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
self.send_invalid_request_error(
|
||||
@@ -7096,7 +7097,7 @@ impl CodexMessageProcessor {
|
||||
if thread_state.listener_matches(&conversation) {
|
||||
return;
|
||||
}
|
||||
thread_state.set_listener(cancel_tx, &conversation)
|
||||
thread_state.set_listener(cancel_tx, &conversation, &conversation_id)
|
||||
};
|
||||
let ListenerTaskContext {
|
||||
outgoing,
|
||||
@@ -7689,7 +7690,7 @@ async fn populate_thread_turns(
|
||||
ThreadTurnSource::RolloutPath(rollout_path) => {
|
||||
read_rollout_items_from_rollout(rollout_path)
|
||||
.await
|
||||
.map(|items| build_turns_from_rollout_items(&items))
|
||||
.map(|items| build_turns_from_rollout_items_for_thread(thread.id.as_str(), &items))
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {}: {err}",
|
||||
@@ -7698,7 +7699,9 @@ async fn populate_thread_turns(
|
||||
)
|
||||
})?
|
||||
}
|
||||
ThreadTurnSource::HistoryItems(items) => build_turns_from_rollout_items(items),
|
||||
ThreadTurnSource::HistoryItems(items) => {
|
||||
build_turns_from_rollout_items_for_thread(thread.id.as_str(), items)
|
||||
}
|
||||
};
|
||||
if let Some(active_turn) = active_turn {
|
||||
merge_turn_history_with_active_turn(&mut turns, active_turn.clone());
|
||||
|
||||
@@ -74,6 +74,7 @@ impl ThreadState {
|
||||
&mut self,
|
||||
cancel_tx: oneshot::Sender<()>,
|
||||
conversation: &Arc<CodexThread>,
|
||||
conversation_id: &ThreadId,
|
||||
) -> (mpsc::UnboundedReceiver<ThreadListenerCommand>, u64) {
|
||||
if let Some(previous) = self.cancel_tx.replace(cancel_tx) {
|
||||
let _ = previous.send(());
|
||||
@@ -81,6 +82,7 @@ impl ThreadState {
|
||||
self.listener_generation = self.listener_generation.wrapping_add(1);
|
||||
let (listener_command_tx, listener_command_rx) = mpsc::unbounded_channel();
|
||||
self.listener_command_tx = Some(listener_command_tx);
|
||||
self.current_turn_history = ThreadHistoryBuilder::for_live(conversation_id.to_string());
|
||||
self.listener_thread = Some(Arc::downgrade(conversation));
|
||||
(listener_command_rx, self.listener_generation)
|
||||
}
|
||||
@@ -90,7 +92,7 @@ impl ThreadState {
|
||||
let _ = cancel_tx.send(());
|
||||
}
|
||||
self.listener_command_tx = None;
|
||||
self.current_turn_history.reset();
|
||||
self.current_turn_history = ThreadHistoryBuilder::new();
|
||||
self.listener_thread = None;
|
||||
}
|
||||
|
||||
|
||||
@@ -42,13 +42,17 @@ use codex_core::auth::AuthCredentialsStoreMode;
|
||||
use codex_core::auth::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::config_types::ReasoningSummary as CoreReasoningSummary;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::AskForApproval as CoreAskForApproval;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::SandboxPolicy as CoreSandboxPolicy;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource as RolloutSessionSource;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::user_input::ByteRange;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
@@ -224,6 +228,98 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_and_read_use_persisted_turn_context_ids_for_legacy_history() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let filename_ts = "2025-01-05T12-00-00";
|
||||
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
||||
let conversation_id = create_fake_rollout_with_text_elements(
|
||||
codex_home.path(),
|
||||
filename_ts,
|
||||
meta_rfc3339,
|
||||
"Saved user message",
|
||||
Vec::new(),
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
|
||||
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
|
||||
let appended_turn_context = json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "turn_context",
|
||||
"payload": serde_json::to_value(TurnContextItem {
|
||||
turn_id: Some("legacy-turn-id".into()),
|
||||
trace_id: None,
|
||||
cwd: PathBuf::from("/"),
|
||||
current_date: None,
|
||||
timezone: None,
|
||||
approval_policy: CoreAskForApproval::Never,
|
||||
sandbox_policy: CoreSandboxPolicy::new_read_only_policy(),
|
||||
network: None,
|
||||
model: "mock-model".into(),
|
||||
personality: None,
|
||||
collaboration_mode: None,
|
||||
realtime_active: None,
|
||||
effort: None,
|
||||
summary: CoreReasoningSummary::Auto,
|
||||
user_instructions: None,
|
||||
developer_instructions: None,
|
||||
final_output_json_schema: None,
|
||||
truncation_policy: None,
|
||||
})?,
|
||||
})
|
||||
.to_string();
|
||||
std::fs::write(
|
||||
&rollout_file_path,
|
||||
format!("{persisted_rollout}{appended_turn_context}\n"),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
include_turns: true,
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse {
|
||||
thread: read_thread,
|
||||
} = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: conversation_id,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse {
|
||||
thread: resumed_thread,
|
||||
..
|
||||
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
|
||||
assert_eq!(read_thread.turns.len(), 1);
|
||||
assert_eq!(resumed_thread.turns.len(), 1);
|
||||
assert_eq!(read_thread.turns[0].id, "legacy-turn-id");
|
||||
assert_eq!(resumed_thread.turns[0].id, "legacy-turn-id");
|
||||
assert_eq!(read_thread.turns, resumed_thread.turns);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_prefers_persisted_git_metadata_for_local_threads() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
Reference in New Issue
Block a user