Compare commits

...

19 Commits

Author SHA1 Message Date
Charles Cunningham
37cd45f0fb codex: fix Bazel macOS shell snapshot timeout on PR #15239
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:05:15 -07:00
Charles Cunningham
8b801eb996 Fix explicit turn start index test
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:05:15 -07:00
Charles Cunningham
f3b01e150b Handle implicit mid-turn fork snapshots
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:05:15 -07:00
Charles Cunningham
846f01d1b2 Fix fork snapshot truncation boundary
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:05:15 -07:00
Charles Cunningham
0909852df1 Only treat in-progress snapshots as mid-turn
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:05:14 -07:00
Charles Cunningham
d2f1845358 Avoid synthetic turn ids in interrupted fork snapshots
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:05:12 -07:00
Charles Cunningham
a3426a1aba Treat interrupted fork snapshots as closed
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:05:12 -07:00
Charles Cunningham
c67dacf1c0 dd extraneous diff 2026-03-23 19:05:12 -07:00
Charles Cunningham
17f743eb86 Derive interrupted fork turn ids from persisted history
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:05:12 -07:00
Charles Cunningham
96fe0ed92d Persist abort boundary in interrupted fork snapshots
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:03:48 -07:00
Charles Cunningham
ab1eb798ff Fix interrupted fork snapshot detection
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:03:48 -07:00
Charles Cunningham
cb98305eab address review 2026-03-23 19:03:48 -07:00
Charles Cunningham
a704387e8e Fix test 2026-03-23 19:03:48 -07:00
Charles Cunningham
91487b7b9e Add comment 2026-03-23 19:03:48 -07:00
Charles Cunningham
f2099ad6b9 Use interrupted snapshots for full thread forks
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:03:48 -07:00
Charles Cunningham
e5e3a1788c Replace ForkSnapshotMode with ForkSnapshot
Encode valid fork snapshot semantics directly in the API by separating committed-prefix truncation from the interrupted snapshot case.

Validation:
- cargo check -p codex-core -p codex-app-server -p codex-tui
- cargo test -p codex-core interrupted_fork_snapshot_appends_interrupt_marker
- just fmt

Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:03:48 -07:00
Charles Cunningham
095e8bcf2f Relax resumed snapshot invariant
Keep the local truncate invariant visible in debug builds, but fall back to forked history instead of panicking if a resumed history ever reaches snapshot_fork_history.

Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:03:48 -07:00
Charles Cunningham
51ee2891f5 Tighten empty interrupted fork snapshots
Keep interrupted fork snapshots empty when truncation leaves no history, so we do not manufacture a standalone <turn_aborted> marker without a prior turn boundary.

Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:03:47 -07:00
Charles Cunningham
756f4a3ecf Add fork snapshot modes
Introduce ForkSnapshotMode so callers can choose between a committed fork and one that appends the same in-distribution interrupt marker used by the live interrupt path.

Co-authored-by: Codex <noreply@openai.com>
2026-03-23 19:03:47 -07:00
14 changed files with 830 additions and 51 deletions

View File

@@ -74,6 +74,8 @@ pub struct ThreadHistoryBuilder {
turns: Vec<Turn>,
current_turn: Option<PendingTurn>,
next_item_index: i64,
current_rollout_index: usize,
next_rollout_index: usize,
}
impl Default for ThreadHistoryBuilder {
@@ -88,6 +90,8 @@ impl ThreadHistoryBuilder {
turns: Vec::new(),
current_turn: None,
next_item_index: 1,
current_rollout_index: 0,
next_rollout_index: 0,
}
}
@@ -111,6 +115,19 @@ impl ThreadHistoryBuilder {
self.current_turn.is_some()
}
/// Returns a copy of the active turn's id, but only when that turn is flagged
/// as `opened_explicitly`. Yields `None` when there is no active turn or the
/// active turn was not opened explicitly.
pub fn active_turn_id_if_explicit(&self) -> Option<String> {
    match &self.current_turn {
        Some(pending) if pending.opened_explicitly => Some(pending.id.clone()),
        _ => None,
    }
}
/// Returns the `rollout_start_index` recorded on the active turn, or `None`
/// when no turn is currently active.
pub fn active_turn_start_index(&self) -> Option<usize> {
    let pending = self.current_turn.as_ref()?;
    Some(pending.rollout_start_index)
}
/// Shared reducer for persisted rollout replay and in-memory current-turn
/// tracking used by running thread resume/rejoin.
///
@@ -182,6 +199,8 @@ impl ThreadHistoryBuilder {
}
pub fn handle_rollout_item(&mut self, item: &RolloutItem) {
self.current_rollout_index = self.next_rollout_index;
self.next_rollout_index += 1;
match item {
RolloutItem::EventMsg(event) => self.handle_event(event),
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
@@ -974,6 +993,7 @@ impl ThreadHistoryBuilder {
status: TurnStatus::Completed,
opened_explicitly: false,
saw_compaction: false,
rollout_start_index: self.current_rollout_index,
}
}
@@ -1137,6 +1157,8 @@ struct PendingTurn {
/// True when this turn includes a persisted `RolloutItem::Compacted`, which
/// should keep the turn from being dropped even without normal items.
saw_compaction: bool,
/// Index of the rollout item that opened this turn during replay.
rollout_start_index: usize,
}
impl PendingTurn {

View File

@@ -125,7 +125,7 @@ Example with notification opt-out:
- `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread.
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread.
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
@@ -240,7 +240,7 @@ Example:
{ "id": 11, "result": { "thread": { "id": "thr_123", … } } }
```
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. Pass `ephemeral: true` when the fork should stay in-memory only:
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. If the source thread is actively running, the fork snapshots it as if the current turn had been interrupted first. Pass `ephemeral: true` when the fork should stay in-memory only:
```json
{ "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123", "ephemeral": true } }

View File

@@ -183,6 +183,7 @@ use codex_core::AuthManager;
use codex_core::CodexAuth;
use codex_core::CodexThread;
use codex_core::Cursor as RolloutCursor;
use codex_core::ForkSnapshot;
use codex_core::NewThread;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
@@ -4039,7 +4040,7 @@ impl CodexMessageProcessor {
} = match self
.thread_manager
.fork_thread(
usize::MAX,
ForkSnapshot::Interrupted,
config,
rollout_path.clone(),
persist_extended_history,
@@ -6508,7 +6509,7 @@ impl CodexMessageProcessor {
} = self
.thread_manager
.fork_thread(
usize::MAX,
ForkSnapshot::Interrupted,
config,
rollout_path,
/*persist_extended_history*/ false,

View File

@@ -97,6 +97,7 @@ mod seatbelt_permissions;
mod thread_manager;
pub mod web_search;
pub mod windows_sandbox_read_grants;
pub use thread_manager::ForkSnapshot;
pub use thread_manager::NewThread;
pub use thread_manager::ThreadManager;
#[deprecated(note = "use ThreadManager")]

View File

@@ -46,8 +46,8 @@ pub(crate) fn user_message_positions_in_rollout(items: &[RolloutItem]) -> Vec<us
/// a prefix that excludes the first user message and everything after it).
///
/// If `n_from_start` is `usize::MAX`, this returns the full rollout (no truncation).
/// If fewer than or equal to `n_from_start` user messages exist, this returns an empty
/// vector (out of range).
/// If fewer than or equal to `n_from_start` user messages exist, this returns the full
/// rollout unchanged.
pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
items: &[RolloutItem],
n_from_start: usize,
@@ -58,9 +58,9 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
let user_positions = user_message_positions_in_rollout(items);
// If fewer than or equal to n user messages exist, treat as empty (out of range).
// If fewer than or equal to n user messages exist, keep the full rollout.
if user_positions.len() <= n_from_start {
return Vec::new();
return items.to_vec();
}
// Cut strictly before the nth user message (do not keep the nth itself).

View File

@@ -1,6 +1,5 @@
use super::*;
use crate::codex::make_session_and_context;
use assert_matches::assert_matches;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::protocol::ThreadRolledBackEvent;
@@ -74,7 +73,10 @@ fn truncates_rollout_from_start_before_nth_user_only() {
);
let truncated2 = truncate_rollout_before_nth_user_message_from_start(&rollout, 2);
assert_matches!(truncated2.as_slice(), []);
assert_eq!(
serde_json::to_value(&truncated2).unwrap(),
serde_json::to_value(&rollout).unwrap()
);
}
#[test]

View File

@@ -22,6 +22,7 @@ use tracing::warn;
use crate::AuthManager;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::contextual_user_message::TURN_ABORTED_CLOSE_TAG;
use crate::contextual_user_message::TURN_ABORTED_OPEN_TAG;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::inspect_pending_input;
@@ -60,6 +61,22 @@ pub(crate) use user_shell::execute_user_shell_command;
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed; verify current state before retrying.";
/// Builds the user-role history message that records an interrupted turn.
///
/// The text is the interruption guidance wrapped in the turn-aborted open and
/// close tags, one per line. Both the live interrupt path and interrupted fork
/// snapshots use this constructor so the marker text stays identical.
pub(crate) fn interrupted_turn_history_marker() -> ResponseItem {
    let marker_text = format!(
        "{TURN_ABORTED_OPEN_TAG}\n{TURN_ABORTED_INTERRUPTED_GUIDANCE}\n{TURN_ABORTED_CLOSE_TAG}"
    );
    let content = vec![ContentItem::InputText { text: marker_text }];
    ResponseItem::Message {
        id: None,
        role: String::from("user"),
        content,
        end_turn: None,
        phase: None,
    }
}
fn emit_turn_network_proxy_metric(
session_telemetry: &SessionTelemetry,
network_proxy_active: bool,
@@ -457,17 +474,7 @@ impl Session {
if reason == TurnAbortReason::Interrupted {
self.cleanup_after_interrupt(&task.turn_context).await;
let marker = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!(
"{TURN_ABORTED_OPEN_TAG}\n{TURN_ABORTED_INTERRUPTED_GUIDANCE}\n</turn_aborted>"
),
}],
end_turn: None,
phase: None,
};
let marker = interrupted_turn_history_marker();
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
.await;
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])

View File

@@ -24,6 +24,9 @@ use crate::rollout::RolloutRecorder;
use crate::rollout::truncation;
use crate::shell_snapshot::ShellSnapshot;
use crate::skills::SkillsManager;
use crate::tasks::interrupted_turn_history_marker;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::TurnStatus;
use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationModeMask;
#[cfg(test)]
@@ -34,6 +37,8 @@ use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::W3cTraceContext;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
@@ -126,6 +131,45 @@ pub struct NewThread {
pub session_configured: SessionConfiguredEvent,
}
// TODO(ccunningham): Add an explicit non-interrupting live-turn snapshot once
// core can represent sampling boundaries directly instead of relying on
// whichever items happened to be persisted mid-turn.
//
// Two likely future variants:
// - `TruncateToLastSamplingBoundary` for callers that want a coherent fork from
// the last stable model boundary without synthesizing an interrupt.
// - `WaitUntilNextSamplingBoundary` (or similar) for callers that prefer to
// fork after the next sampling boundary rather than interrupting immediately.
/// Selects how `fork_thread` snapshots the source rollout before spawning the
/// forked thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForkSnapshot {
    /// Keep only the committed prefix that ends strictly before the nth
    /// (0-based) user message.
    ///
    /// - `n` in range: cut before that user-message boundary.
    /// - `n` out of range while the source is mid-turn: cut before the active
    ///   turn's opening boundary, dropping the unfinished turn suffix.
    /// - `n` out of range while the source is at a turn boundary: keep the
    ///   full committed history unchanged.
    TruncateBeforeNthUserMessage(usize),

    /// Snapshot the persisted history as though the source thread were
    /// interrupted at this moment.
    ///
    /// A snapshot that ends mid-turn gets the same `<turn_aborted>` marker a
    /// real interrupt would produce; a snapshot already at a turn boundary is
    /// returned unchanged.
    Interrupted,
}

/// Lets legacy `fork_thread(usize, ...)` callers keep passing a bare index: the
/// value is interpreted as a truncate-before-nth-user-message snapshot.
impl From<usize> for ForkSnapshot {
    fn from(nth_user_message: usize) -> Self {
        ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message)
    }
}
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ThreadShutdownReport {
pub completed: Vec<ThreadId>,
@@ -541,20 +585,41 @@ impl ThreadManager {
report
}
/// Fork an existing thread by taking messages up to the given position (not including
/// the message at the given position) and starting a new thread with identical
/// configuration (unless overridden by the caller's `config`). The new thread will have
/// a fresh id. Pass `usize::MAX` to keep the full rollout history.
pub async fn fork_thread(
/// Fork an existing thread by snapshotting rollout history according to
/// `snapshot` and starting a new thread with identical configuration
/// (unless overridden by the caller's `config`). The new thread will have
/// a fresh id.
pub async fn fork_thread<S>(
&self,
nth_user_message: usize,
snapshot: S,
config: Config,
path: PathBuf,
persist_extended_history: bool,
parent_trace: Option<W3cTraceContext>,
) -> CodexResult<NewThread> {
) -> CodexResult<NewThread>
where
S: Into<ForkSnapshot>,
{
let snapshot = snapshot.into();
let history = RolloutRecorder::get_rollout_history(&path).await?;
let history = truncate_before_nth_user_message(history, nth_user_message);
let snapshot_state = snapshot_turn_state(&history);
let history = match snapshot {
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => {
truncate_before_nth_user_message(history, nth_user_message, &snapshot_state)
}
ForkSnapshot::Interrupted => {
let history = match history {
InitialHistory::New => InitialHistory::New,
InitialHistory::Forked(history) => InitialHistory::Forked(history),
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
};
if snapshot_state.ends_mid_turn {
append_interrupted_boundary(history, snapshot_state.active_turn_id)
} else {
history
}
}
};
Box::pin(self.state.spawn_thread(
config,
history,
@@ -838,11 +903,31 @@ impl ThreadManagerState {
}
}
/// Return a prefix of `items` obtained by cutting strictly before the nth user message
/// (0-based) and all items that follow it.
fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> InitialHistory {
/// Return a fork snapshot cut strictly before the nth user message (0-based).
///
/// Out-of-range values keep the full committed history at a turn boundary, but
/// when the source thread is currently mid-turn they fall back to cutting
/// before the active turn's opening boundary so the fork omits the unfinished
/// suffix entirely.
fn truncate_before_nth_user_message(
history: InitialHistory,
n: usize,
snapshot_state: &SnapshotTurnState,
) -> InitialHistory {
let items: Vec<RolloutItem> = history.get_rollout_items();
let rolled = truncation::truncate_rollout_before_nth_user_message_from_start(&items, n);
let user_positions = truncation::user_message_positions_in_rollout(&items);
let rolled = if snapshot_state.ends_mid_turn && n >= user_positions.len() {
if let Some(cut_idx) = snapshot_state
.active_turn_start_index
.or_else(|| user_positions.last().copied())
{
items[..cut_idx].to_vec()
} else {
items
}
} else {
truncation::truncate_rollout_before_nth_user_message_from_start(&items, n)
};
if rolled.is_empty() {
InitialHistory::New
@@ -851,6 +936,107 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia
}
}
/// Summary of where a persisted rollout stops relative to turn boundaries, as
/// computed by `snapshot_turn_state`.
#[derive(Debug, Eq, PartialEq)]
struct SnapshotTurnState {
/// True when the snapshot is considered to end inside an unfinished turn.
ends_mid_turn: bool,
/// Id of the active turn when the history builder reports it as explicitly
/// opened and the snapshot is mid-turn; `None` otherwise.
active_turn_id: Option<String>,
/// Rollout index recorded for the start of that active explicit turn;
/// `None` whenever `active_turn_id` is not populated.
active_turn_start_index: Option<usize>,
}
/// Replays `history` through a `ThreadHistoryBuilder` and reports whether the
/// persisted snapshot stops in the middle of a turn.
///
/// Two signals are consulted, in order:
/// 1. An active, explicitly opened turn in the builder. It counts as mid-turn
///    unless its snapshot reports a status other than `InProgress`.
/// 2. Otherwise, the items after the last user message: the snapshot is
///    mid-turn when none of them is a `TurnComplete` or `TurnAborted` event.
fn snapshot_turn_state(history: &InitialHistory) -> SnapshotTurnState {
    let at_turn_boundary = || SnapshotTurnState {
        ends_mid_turn: false,
        active_turn_id: None,
        active_turn_start_index: None,
    };

    let items = history.get_rollout_items();
    let mut replay = ThreadHistoryBuilder::new();
    items
        .iter()
        .for_each(|item| replay.handle_rollout_item(item));

    let explicit_turn_id = replay.active_turn_id_if_explicit();
    if replay.has_active_turn() && explicit_turn_id.is_some() {
        // A missing snapshot is treated the same as an in-progress one.
        let in_progress = replay
            .active_turn_snapshot()
            .map_or(true, |turn| turn.status == TurnStatus::InProgress);
        return if in_progress {
            SnapshotTurnState {
                ends_mid_turn: true,
                active_turn_id: explicit_turn_id,
                active_turn_start_index: replay.active_turn_start_index(),
            }
        } else {
            at_turn_boundary()
        };
    }

    let user_positions = truncation::user_message_positions_in_rollout(&items);
    let Some(&last_user_idx) = user_positions.last() else {
        return at_turn_boundary();
    };

    // Synthetic fork/resume histories may hold user/assistant response items
    // with no turn lifecycle events at all. Without a terminating event after
    // the last user message, the snapshot is considered mid-turn.
    let closed_after_last_user = items[last_user_idx + 1..].iter().any(|item| {
        matches!(
            item,
            RolloutItem::EventMsg(EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_))
        )
    });
    SnapshotTurnState {
        ends_mid_turn: !closed_after_last_user,
        ..at_turn_boundary()
    }
}
/// Append the same persisted interrupt boundary used by the live interrupt path
/// to an existing fork snapshot after the source thread has been confirmed to
/// be mid-turn.
///
/// Two items are appended, in order: the model-visible interrupted-turn marker
/// and a `TurnAborted` event with reason `Interrupted`. The result is always
/// `InitialHistory::Forked`, including when `history` is `New` or `Resumed`.
///
/// The `TurnAborted` event carries a turn id only when replaying `history`
/// leaves an explicitly opened active turn, matching how `snapshot_turn_state`
/// derives its `active_turn_id`. Turns the builder opened implicitly have no
/// persisted id, so the event's `turn_id` is `None` for them.
///
/// NOTE(review): `fork_thread` calls this with a second argument (the turn id
/// from `snapshot_turn_state`), while the unit tests call it with `history`
/// only. Confirm which signature is intended before changing the arity.
fn append_interrupted_boundary(history: InitialHistory) -> InitialHistory {
    let turn_id = {
        let mut builder = ThreadHistoryBuilder::new();
        for item in &history.get_rollout_items() {
            builder.handle_rollout_item(item);
        }
        builder.active_turn_id_if_explicit()
    };

    // Every input variant ends up as a forked item list with the boundary
    // appended, so unwrap to the raw items once instead of per variant.
    let mut items = match history {
        InitialHistory::New => Vec::new(),
        InitialHistory::Forked(items) => items,
        InitialHistory::Resumed(resumed) => resumed.history,
    };
    items.extend([
        RolloutItem::ResponseItem(interrupted_turn_history_marker()),
        RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
            turn_id,
            reason: TurnAbortReason::Interrupted,
        })),
    ]);
    InitialHistory::Forked(items)
}
#[cfg(test)]
#[path = "thread_manager_tests.rs"]
mod tests;

View File

@@ -3,11 +3,15 @@ use crate::codex::make_session_and_context;
use crate::config::test_config;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::models_manager::manager::RefreshStrategy;
use assert_matches::assert_matches;
use crate::rollout::RolloutRecorder;
use crate::tasks::interrupted_turn_history_marker;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelsResponse;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use core_test_support::responses::mount_models_once;
use pretty_assertions::assert_eq;
use std::time::Duration;
@@ -38,7 +42,7 @@ fn assistant_msg(text: &str) -> ResponseItem {
}
#[test]
fn drops_from_last_user_only() {
fn truncates_before_requested_user_message() {
let items = [
user_msg("u1"),
assistant_msg("a1"),
@@ -68,7 +72,15 @@ fn drops_from_last_user_only() {
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1);
let truncated = truncate_before_nth_user_message(
InitialHistory::Forked(initial),
1,
&SnapshotTurnState {
ends_mid_turn: false,
active_turn_id: None,
active_turn_start_index: None,
},
);
let got_items = truncated.get_rollout_items();
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
@@ -85,8 +97,99 @@ fn drops_from_last_user_only() {
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2);
assert_matches!(truncated2, InitialHistory::New);
let truncated2 = truncate_before_nth_user_message(
InitialHistory::Forked(initial2.clone()),
2,
&SnapshotTurnState {
ends_mid_turn: false,
active_turn_id: None,
active_turn_start_index: None,
},
);
assert_eq!(
serde_json::to_value(truncated2.get_rollout_items()).unwrap(),
serde_json::to_value(initial2).unwrap()
);
}
#[test]
fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() {
    let rollout = vec![
        RolloutItem::ResponseItem(user_msg("u1")),
        RolloutItem::ResponseItem(assistant_msg("a1")),
        RolloutItem::ResponseItem(user_msg("u2")),
        RolloutItem::ResponseItem(assistant_msg("partial")),
    ];
    // Mid-turn, but with no recorded turn-start index to cut at.
    let mid_turn_without_start_index = SnapshotTurnState {
        ends_mid_turn: true,
        active_turn_id: None,
        active_turn_start_index: None,
    };

    let forked = truncate_before_nth_user_message(
        InitialHistory::Forked(rollout.clone()),
        usize::MAX,
        &mid_turn_without_start_index,
    );

    // Only the committed `u1`/`a1` exchange should survive the cut.
    let committed_prefix = rollout[..2].to_vec();
    assert_eq!(
        serde_json::to_value(forked.get_rollout_items()).unwrap(),
        serde_json::to_value(committed_prefix).unwrap()
    );
}
#[test]
fn fork_thread_accepts_legacy_usize_snapshot_argument() {
    // Compile-time check only: the future is constructed but never polled, so
    // this passes as long as a bare `usize` is accepted as the snapshot.
    type LegacyCallsite = fn(&ThreadManager, Config, std::path::PathBuf);

    fn assert_legacy_snapshot_callsite(
        manager: &ThreadManager,
        config: Config,
        path: std::path::PathBuf,
    ) {
        let _unpolled = manager.fork_thread(
            usize::MAX,
            config,
            path,
            /*persist_extended_history*/ false,
            /*parent_trace*/ None,
        );
    }

    let _: LegacyCallsite = assert_legacy_snapshot_callsite;
}
#[test]
fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
    // The explicit `TurnStarted` event sits at index 2, ahead of the turn's
    // own user message, so the cut is expected at the event rather than `u2`.
    let turn_started = RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
        turn_id: "turn-2".to_string(),
        model_context_window: None,
        collaboration_mode_kind: Default::default(),
    }));
    let rollout = vec![
        RolloutItem::ResponseItem(user_msg("u1")),
        RolloutItem::ResponseItem(assistant_msg("a1")),
        turn_started,
        RolloutItem::ResponseItem(user_msg("u2")),
        RolloutItem::ResponseItem(assistant_msg("partial")),
    ];

    let state = snapshot_turn_state(&InitialHistory::Forked(rollout.clone()));
    let expected_state = SnapshotTurnState {
        ends_mid_turn: true,
        active_turn_id: Some("turn-2".to_string()),
        active_turn_start_index: Some(2),
    };
    assert_eq!(state, expected_state,);

    let forked = truncate_before_nth_user_message(
        InitialHistory::Forked(rollout.clone()),
        usize::MAX,
        &state,
    );
    let committed_prefix = rollout[..2].to_vec();
    assert_eq!(
        serde_json::to_value(forked.get_rollout_items()).unwrap(),
        serde_json::to_value(committed_prefix).unwrap()
    );
}
#[tokio::test]
@@ -104,7 +207,15 @@ async fn ignores_session_prefix_messages_when_truncating() {
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1);
let truncated = truncate_before_nth_user_message(
InitialHistory::Forked(rollout_items),
1,
&SnapshotTurnState {
ends_mid_turn: false,
active_turn_id: None,
active_turn_start_index: None,
},
);
let got_items = truncated.get_rollout_items();
let expected: Vec<RolloutItem> = vec![
@@ -185,3 +296,417 @@ async fn new_uses_configured_openai_provider_for_model_refresh() {
let _ = manager.list_models(RefreshStrategy::Online).await;
assert_eq!(models_mock.requests().len(), 1);
}
#[test]
fn interrupted_fork_snapshot_appends_interrupt_boundary() {
    // Builders for the two boundary items every interrupted snapshot ends with.
    let marker = || RolloutItem::ResponseItem(interrupted_turn_history_marker());
    let abort_without_turn_id = || {
        RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
            turn_id: None,
            reason: TurnAbortReason::Interrupted,
        }))
    };

    // Non-empty history: the boundary is appended after the existing items.
    let from_committed = append_interrupted_boundary(InitialHistory::Forked(vec![
        RolloutItem::ResponseItem(user_msg("hello")),
    ]));
    assert_eq!(
        serde_json::to_value(from_committed.get_rollout_items())
            .expect("serialize interrupted fork history"),
        serde_json::to_value(vec![
            RolloutItem::ResponseItem(user_msg("hello")),
            marker(),
            abort_without_turn_id(),
        ])
        .expect("serialize expected interrupted fork history"),
    );

    // Empty history: the result holds just the boundary pair.
    let from_empty = append_interrupted_boundary(InitialHistory::New);
    assert_eq!(
        serde_json::to_value(from_empty.get_rollout_items())
            .expect("serialize interrupted empty fork history"),
        serde_json::to_value(vec![marker(), abort_without_turn_id()])
            .expect("serialize expected interrupted empty history"),
    );
}
#[test]
fn interrupted_snapshot_is_not_mid_turn() {
    // A history that already ends with the interrupt marker and a
    // `TurnAborted` event has a terminating boundary after its user message.
    let already_aborted = RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
        turn_id: Some("turn-1".to_string()),
        reason: TurnAbortReason::Interrupted,
    }));
    let history = InitialHistory::Forked(vec![
        RolloutItem::ResponseItem(user_msg("hello")),
        RolloutItem::ResponseItem(assistant_msg("partial")),
        RolloutItem::ResponseItem(interrupted_turn_history_marker()),
        already_aborted,
    ]);

    let at_turn_boundary = SnapshotTurnState {
        ends_mid_turn: false,
        active_turn_id: None,
        active_turn_start_index: None,
    };
    assert_eq!(snapshot_turn_state(&history), at_turn_boundary,);
}
#[test]
fn completed_legacy_event_history_is_not_mid_turn() {
    // Event-only history: a user message event followed by an agent message
    // event, with no turn lifecycle events and no response items.
    let user_event = RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
        message: "hello".to_string(),
        images: None,
        text_elements: Vec::new(),
        local_images: Vec::new(),
    }));
    let agent_event = RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
        message: "done".to_string(),
        phase: None,
        memory_citation: None,
    }));
    let history = InitialHistory::Forked(vec![user_event, agent_event]);

    let at_turn_boundary = SnapshotTurnState {
        ends_mid_turn: false,
        active_turn_id: None,
        active_turn_start_index: None,
    };
    assert_eq!(snapshot_turn_state(&history), at_turn_boundary,);
}
#[test]
fn mixed_response_and_legacy_user_event_history_is_mid_turn() {
    // A user response item followed only by a legacy user-message event:
    // nothing after the user message terminates the turn.
    let legacy_user_event = RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
        message: "hello".to_string(),
        images: None,
        text_elements: Vec::new(),
        local_images: Vec::new(),
    }));
    let history = InitialHistory::Forked(vec![
        RolloutItem::ResponseItem(user_msg("hello")),
        legacy_user_event,
    ]);

    let mid_turn_without_explicit_id = SnapshotTurnState {
        ends_mid_turn: true,
        active_turn_id: None,
        active_turn_start_index: None,
    };
    assert_eq!(snapshot_turn_state(&history), mid_turn_without_explicit_id,);
}
// End-to-end check that forking a mid-turn source whose history has no explicit
// turn id yields exactly one interrupt marker and exactly one `TurnAborted`
// event, and that the event carries no turn id.
#[tokio::test]
async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_history() {
// Isolated codex home under a temp dir; `temp_dir` stays bound for the whole test.
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config();
config.codex_home = temp_dir.path().join("codex-home");
config.cwd = config.codex_home.clone();
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
CollaborationModesConfig::default(),
);
// Source thread seeded with response items only: a user message and a
// partial assistant reply, with no turn lifecycle events.
let source = manager
.resume_thread_with_history(
config.clone(),
InitialHistory::Forked(vec![
RolloutItem::ResponseItem(user_msg("hello")),
RolloutItem::ResponseItem(assistant_msg("partial")),
]),
auth_manager,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("create source thread from completed history");
let source_path = source
.thread
.rollout_path()
.expect("source rollout path should exist");
let source_history = RolloutRecorder::get_rollout_history(&source_path)
.await
.expect("read source rollout history");
// Precondition: the persisted source is mid-turn and has no explicit turn id.
let source_snapshot_state = snapshot_turn_state(&source_history);
assert!(source_snapshot_state.ends_mid_turn);
let expected_turn_id = source_snapshot_state.active_turn_id.clone();
assert_eq!(expected_turn_id, None);
let forked = manager
.fork_thread(
ForkSnapshot::Interrupted,
config,
source_path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("fork interrupted snapshot");
let forked_path = forked
.thread
.rollout_path()
.expect("forked rollout path should exist");
let history = RolloutRecorder::get_rollout_history(&forked_path)
.await
.expect("read forked rollout history");
// The fork itself must no longer look mid-turn once the boundary is appended.
assert!(!snapshot_turn_state(&history).ends_mid_turn);
// Session metadata is not part of the comparison.
let rollout_items: Vec<_> = history
.get_rollout_items()
.into_iter()
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
.collect();
// Items are compared through their JSON form.
let interrupted_marker_json =
serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker()))
.expect("serialize interrupted marker");
let interrupted_abort_json = serde_json::to_value(RolloutItem::EventMsg(
EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: expected_turn_id,
reason: TurnAbortReason::Interrupted,
}),
))
.expect("serialize interrupted abort event");
// Exactly one interrupt marker.
assert_eq!(
rollout_items
.iter()
.filter(|item| {
serde_json::to_value(item).expect("serialize rollout item")
== interrupted_marker_json
})
.count(),
1,
);
// Exactly one abort event, and it matches the id-less expectation.
assert_eq!(
rollout_items
.iter()
.filter(|item| {
serde_json::to_value(item).expect("serialize rollout item")
== interrupted_abort_json
})
.count(),
1,
);
}
// End-to-end check that when the source history opens its turn with an explicit
// `TurnStarted` event, the interrupted fork's `TurnAborted` event carries that
// same turn id.
#[tokio::test]
async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
// Isolated codex home under a temp dir; `temp_dir` stays bound for the whole test.
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config();
config.codex_home = temp_dir.path().join("codex-home");
config.cwd = config.codex_home.clone();
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
CollaborationModesConfig::default(),
);
// Source thread seeded with an explicit turn start followed by a user
// message and a partial assistant reply, with no terminating event.
let source = manager
.resume_thread_with_history(
config.clone(),
InitialHistory::Forked(vec![
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-explicit".to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
})),
RolloutItem::ResponseItem(user_msg("hello")),
RolloutItem::ResponseItem(assistant_msg("partial")),
]),
auth_manager,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("create source thread from explicit partial history");
let source_path = source
.thread
.rollout_path()
.expect("source rollout path should exist");
let source_history = RolloutRecorder::get_rollout_history(&source_path)
.await
.expect("read source rollout history");
// Precondition: the persisted source is mid-turn with the explicit id. The
// expected start index is 1 rather than 0.
// NOTE(review): presumably a session-meta item is persisted at index 0 — confirm.
let source_snapshot_state = snapshot_turn_state(&source_history);
assert_eq!(
source_snapshot_state,
SnapshotTurnState {
ends_mid_turn: true,
active_turn_id: Some("turn-explicit".to_string()),
active_turn_start_index: Some(1),
},
);
let forked = manager
.fork_thread(
ForkSnapshot::Interrupted,
config,
source_path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("fork interrupted snapshot");
let forked_path = forked
.thread
.rollout_path()
.expect("forked rollout path should exist");
let history = RolloutRecorder::get_rollout_history(&forked_path)
.await
.expect("read forked rollout history");
// Session metadata is not part of the comparison.
let rollout_items: Vec<_> = history
.get_rollout_items()
.into_iter()
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
.collect();
// At least one interrupted `TurnAborted` event must name the explicit turn.
assert!(rollout_items.iter().any(|item| {
matches!(
item,
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some(turn_id),
reason: TurnAbortReason::Interrupted,
})) if turn_id == "turn-explicit"
)
}));
}
#[tokio::test]
async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_source() {
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config();
config.codex_home = temp_dir.path().join("codex-home");
config.cwd = config.codex_home.clone();
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
CollaborationModesConfig::default(),
);
let source = manager
.resume_thread_with_history(
config.clone(),
InitialHistory::Forked(vec![
RolloutItem::ResponseItem(user_msg("hello")),
RolloutItem::ResponseItem(assistant_msg("partial")),
]),
auth_manager,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("create source thread from partial history");
let source_path = source
.thread
.rollout_path()
.expect("source rollout path should exist");
let source_history = RolloutRecorder::get_rollout_history(&source_path)
.await
.expect("read source rollout history");
assert!(snapshot_turn_state(&source_history).ends_mid_turn);
manager.remove_thread(&source.thread_id).await;
let forked = manager
.fork_thread(
ForkSnapshot::Interrupted,
config.clone(),
source_path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("fork interrupted snapshot");
let forked_path = forked
.thread
.rollout_path()
.expect("forked rollout path should exist");
let history = RolloutRecorder::get_rollout_history(&forked_path)
.await
.expect("read forked rollout history");
assert!(!snapshot_turn_state(&history).ends_mid_turn);
let forked_rollout_items: Vec<_> = history
.get_rollout_items()
.into_iter()
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
.collect();
let interrupted_marker_json =
serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker()))
.expect("serialize interrupted marker");
assert_eq!(
forked_rollout_items
.iter()
.filter(|item| {
serde_json::to_value(item).expect("serialize forked rollout item")
== interrupted_marker_json
})
.count(),
1,
);
manager.remove_thread(&forked.thread_id).await;
let reforked = manager
.fork_thread(
ForkSnapshot::Interrupted,
config,
forked_path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("re-fork interrupted snapshot");
let reforked_path = reforked
.thread
.rollout_path()
.expect("re-forked rollout path should exist");
let reforked_history = RolloutRecorder::get_rollout_history(&reforked_path)
.await
.expect("read re-forked rollout history");
let reforked_rollout_items: Vec<_> = reforked_history
.get_rollout_items()
.into_iter()
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
.collect();
assert_eq!(
reforked_rollout_items
.iter()
.filter(|item| {
serde_json::to_value(item).expect("serialize re-forked rollout item")
== interrupted_marker_json
})
.count(),
1,
);
assert_eq!(
reforked_rollout_items
.iter()
.filter(|item| {
matches!(
item,
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
..
}))
)
})
.count(),
1,
);
}

View File

@@ -12,6 +12,7 @@ use super::compact::FIRST_REPLY;
use super::compact::SUMMARY_TEXT;
use anyhow::Result;
use codex_core::CodexThread;
use codex_core::ForkSnapshot;
use codex_core::ThreadManager;
use codex_core::compact::SUMMARIZATION_PROMPT;
use codex_core::config::Config;
@@ -383,8 +384,13 @@ async fn compact_resume_after_second_compaction_preserves_history() -> Result<()
let seeded_user_prefix = &first_request_user_texts[..first_turn_user_index];
let summary_after_second_compact =
extract_summary_user_text(&requests[requests.len() - 3], SUMMARY_TEXT);
let mut expected_after_second_compact_user_texts =
vec!["AFTER_FORK".to_string(), summary_after_second_compact];
let mut expected_after_second_compact_user_texts = vec![
"hello world".to_string(),
"AFTER_COMPACT".to_string(),
"AFTER_RESUME".to_string(),
"AFTER_FORK".to_string(),
summary_after_second_compact,
];
expected_after_second_compact_user_texts.extend_from_slice(seeded_user_prefix);
expected_after_second_compact_user_texts.push("AFTER_COMPACT_2".to_string());
let final_user_texts = json_message_input_texts(&requests[requests.len() - 1], "user");
@@ -841,8 +847,14 @@ async fn fork_thread(
path: std::path::PathBuf,
nth_user_message: usize,
) -> Arc<CodexThread> {
Box::pin(manager.fork_thread(nth_user_message, config.clone(), path, false, None))
.await
.expect("fork conversation")
.thread
Box::pin(manager.fork_thread(
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message),
config.clone(),
path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
))
.await
.expect("fork conversation")
.thread
}

View File

@@ -1,3 +1,4 @@
use codex_core::ForkSnapshot;
use codex_core::NewThread;
use codex_core::parse_turn_item;
use codex_protocol::items::TurnItem;
@@ -110,7 +111,13 @@ async fn fork_thread_twice_drops_to_first_message() {
thread: codex_fork1,
..
} = thread_manager
.fork_thread(1, config_for_fork.clone(), base_path.clone(), false, None)
.fork_thread(
ForkSnapshot::TruncateBeforeNthUserMessage(1),
config_for_fork.clone(),
base_path.clone(),
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("fork 1");
@@ -129,7 +136,13 @@ async fn fork_thread_twice_drops_to_first_message() {
thread: codex_fork2,
..
} = thread_manager
.fork_thread(0, config_for_fork.clone(), fork1_path.clone(), false, None)
.fork_thread(
ForkSnapshot::TruncateBeforeNthUserMessage(0),
config_for_fork.clone(),
fork1_path.clone(),
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("fork 2");

View File

@@ -1,4 +1,5 @@
use anyhow::Result;
use codex_core::ForkSnapshot;
use codex_core::config::Constrained;
use codex_execpolicy::Policy;
use codex_protocol::models::DeveloperInstructions;
@@ -419,7 +420,13 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> {
fork_config.permissions.approval_policy = Constrained::allow_any(AskForApproval::UnlessTrusted);
let forked = initial
.thread_manager
.fork_thread(usize::MAX, fork_config, rollout_path, false, None)
.fork_thread(
ForkSnapshot::Interrupted,
fork_config,
rollout_path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await?;
forked
.thread

View File

@@ -531,7 +531,9 @@ async fn shell_command_snapshot_still_intercepts_apply_patch() -> Result<()> {
let script = "apply_patch <<'EOF'\n*** Begin Patch\n*** Add File: snapshot-apply.txt\n+hello from snapshot\n*** End Patch\nEOF\n";
let args = json!({
"command": script,
"timeout_ms": 1_000,
// The intercepted apply_patch path self-invokes codex, which can take
// longer than a second in Bazel macOS test environments.
"timeout_ms": 5_000,
});
let call_id = "shell-snapshot-apply-patch";
let responses = vec![

View File

@@ -57,6 +57,7 @@ use codex_app_server_protocol::RequestId;
use codex_arg0::Arg0DispatchPaths;
use codex_core::AuthManager;
use codex_core::CodexAuth;
use codex_core::ForkSnapshot;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::config::ConfigBuilder;
@@ -2502,7 +2503,7 @@ impl App {
);
let forked = thread_manager
.fork_thread(
usize::MAX,
ForkSnapshot::Interrupted,
config.clone(),
target_session.path.clone(),
/*persist_extended_history*/ false,
@@ -2925,7 +2926,7 @@ impl App {
match self
.server
.fork_thread(
usize::MAX,
ForkSnapshot::Interrupted,
self.config.clone(),
path.clone(),
/*persist_extended_history*/ false,