mirror of
https://github.com/openai/codex.git
synced 2026-04-05 06:51:44 +03:00
Compare commits
19 Commits
codex-appl
...
ccunningha
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37cd45f0fb | ||
|
|
8b801eb996 | ||
|
|
f3b01e150b | ||
|
|
846f01d1b2 | ||
|
|
0909852df1 | ||
|
|
d2f1845358 | ||
|
|
a3426a1aba | ||
|
|
c67dacf1c0 | ||
|
|
17f743eb86 | ||
|
|
96fe0ed92d | ||
|
|
ab1eb798ff | ||
|
|
cb98305eab | ||
|
|
a704387e8e | ||
|
|
91487b7b9e | ||
|
|
f2099ad6b9 | ||
|
|
e5e3a1788c | ||
|
|
095e8bcf2f | ||
|
|
51ee2891f5 | ||
|
|
756f4a3ecf |
@@ -74,6 +74,8 @@ pub struct ThreadHistoryBuilder {
|
||||
turns: Vec<Turn>,
|
||||
current_turn: Option<PendingTurn>,
|
||||
next_item_index: i64,
|
||||
current_rollout_index: usize,
|
||||
next_rollout_index: usize,
|
||||
}
|
||||
|
||||
impl Default for ThreadHistoryBuilder {
|
||||
@@ -88,6 +90,8 @@ impl ThreadHistoryBuilder {
|
||||
turns: Vec::new(),
|
||||
current_turn: None,
|
||||
next_item_index: 1,
|
||||
current_rollout_index: 0,
|
||||
next_rollout_index: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,6 +115,19 @@ impl ThreadHistoryBuilder {
|
||||
self.current_turn.is_some()
|
||||
}
|
||||
|
||||
pub fn active_turn_id_if_explicit(&self) -> Option<String> {
|
||||
self.current_turn
|
||||
.as_ref()
|
||||
.filter(|turn| turn.opened_explicitly)
|
||||
.map(|turn| turn.id.clone())
|
||||
}
|
||||
|
||||
pub fn active_turn_start_index(&self) -> Option<usize> {
|
||||
self.current_turn
|
||||
.as_ref()
|
||||
.map(|turn| turn.rollout_start_index)
|
||||
}
|
||||
|
||||
/// Shared reducer for persisted rollout replay and in-memory current-turn
|
||||
/// tracking used by running thread resume/rejoin.
|
||||
///
|
||||
@@ -182,6 +199,8 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
pub fn handle_rollout_item(&mut self, item: &RolloutItem) {
|
||||
self.current_rollout_index = self.next_rollout_index;
|
||||
self.next_rollout_index += 1;
|
||||
match item {
|
||||
RolloutItem::EventMsg(event) => self.handle_event(event),
|
||||
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
|
||||
@@ -974,6 +993,7 @@ impl ThreadHistoryBuilder {
|
||||
status: TurnStatus::Completed,
|
||||
opened_explicitly: false,
|
||||
saw_compaction: false,
|
||||
rollout_start_index: self.current_rollout_index,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1137,6 +1157,8 @@ struct PendingTurn {
|
||||
/// True when this turn includes a persisted `RolloutItem::Compacted`, which
|
||||
/// should keep the turn from being dropped even without normal items.
|
||||
saw_compaction: bool,
|
||||
/// Index of the rollout item that opened this turn during replay.
|
||||
rollout_start_index: usize,
|
||||
}
|
||||
|
||||
impl PendingTurn {
|
||||
|
||||
@@ -125,7 +125,7 @@ Example with notification opt-out:
|
||||
|
||||
- `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread.
|
||||
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/loaded/list` — list the thread ids currently loaded in memory.
|
||||
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
@@ -240,7 +240,7 @@ Example:
|
||||
{ "id": 11, "result": { "thread": { "id": "thr_123", … } } }
|
||||
```
|
||||
|
||||
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. Pass `ephemeral: true` when the fork should stay in-memory only:
|
||||
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. If the source thread is actively running, the fork snapshots it as if the current turn had been interrupted first. Pass `ephemeral: true` when the fork should stay in-memory only:
|
||||
|
||||
```json
|
||||
{ "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123", "ephemeral": true } }
|
||||
|
||||
@@ -183,6 +183,7 @@ use codex_core::AuthManager;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::Cursor as RolloutCursor;
|
||||
use codex_core::ForkSnapshot;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::RolloutRecorder;
|
||||
use codex_core::SessionMeta;
|
||||
@@ -4039,7 +4040,7 @@ impl CodexMessageProcessor {
|
||||
} = match self
|
||||
.thread_manager
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
ForkSnapshot::Interrupted,
|
||||
config,
|
||||
rollout_path.clone(),
|
||||
persist_extended_history,
|
||||
@@ -6508,7 +6509,7 @@ impl CodexMessageProcessor {
|
||||
} = self
|
||||
.thread_manager
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
ForkSnapshot::Interrupted,
|
||||
config,
|
||||
rollout_path,
|
||||
/*persist_extended_history*/ false,
|
||||
|
||||
@@ -97,6 +97,7 @@ mod seatbelt_permissions;
|
||||
mod thread_manager;
|
||||
pub mod web_search;
|
||||
pub mod windows_sandbox_read_grants;
|
||||
pub use thread_manager::ForkSnapshot;
|
||||
pub use thread_manager::NewThread;
|
||||
pub use thread_manager::ThreadManager;
|
||||
#[deprecated(note = "use ThreadManager")]
|
||||
|
||||
@@ -46,8 +46,8 @@ pub(crate) fn user_message_positions_in_rollout(items: &[RolloutItem]) -> Vec<us
|
||||
/// a prefix that excludes the first user message and everything after it).
|
||||
///
|
||||
/// If `n_from_start` is `usize::MAX`, this returns the full rollout (no truncation).
|
||||
/// If fewer than or equal to `n_from_start` user messages exist, this returns an empty
|
||||
/// vector (out of range).
|
||||
/// If fewer than or equal to `n_from_start` user messages exist, this returns the full
|
||||
/// rollout unchanged.
|
||||
pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
|
||||
items: &[RolloutItem],
|
||||
n_from_start: usize,
|
||||
@@ -58,9 +58,9 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
|
||||
|
||||
let user_positions = user_message_positions_in_rollout(items);
|
||||
|
||||
// If fewer than or equal to n user messages exist, treat as empty (out of range).
|
||||
// If fewer than or equal to n user messages exist, keep the full rollout.
|
||||
if user_positions.len() <= n_from_start {
|
||||
return Vec::new();
|
||||
return items.to_vec();
|
||||
}
|
||||
|
||||
// Cut strictly before the nth user message (do not keep the nth itself).
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use super::*;
|
||||
use crate::codex::make_session_and_context;
|
||||
use assert_matches::assert_matches;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
@@ -74,7 +73,10 @@ fn truncates_rollout_from_start_before_nth_user_only() {
|
||||
);
|
||||
|
||||
let truncated2 = truncate_rollout_before_nth_user_message_from_start(&rollout, 2);
|
||||
assert_matches!(truncated2.as_slice(), []);
|
||||
assert_eq!(
|
||||
serde_json::to_value(&truncated2).unwrap(),
|
||||
serde_json::to_value(&rollout).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -22,6 +22,7 @@ use tracing::warn;
|
||||
use crate::AuthManager;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::contextual_user_message::TURN_ABORTED_CLOSE_TAG;
|
||||
use crate::contextual_user_message::TURN_ABORTED_OPEN_TAG;
|
||||
use crate::hook_runtime::PendingInputHookDisposition;
|
||||
use crate::hook_runtime::inspect_pending_input;
|
||||
@@ -60,6 +61,22 @@ pub(crate) use user_shell::execute_user_shell_command;
|
||||
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
|
||||
const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed; verify current state before retrying.";
|
||||
|
||||
/// Shared model-visible marker used by both the real interrupt path and
|
||||
/// interrupted fork snapshots.
|
||||
pub(crate) fn interrupted_turn_history_marker() -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: format!(
|
||||
"{TURN_ABORTED_OPEN_TAG}\n{TURN_ABORTED_INTERRUPTED_GUIDANCE}\n{TURN_ABORTED_CLOSE_TAG}"
|
||||
),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_turn_network_proxy_metric(
|
||||
session_telemetry: &SessionTelemetry,
|
||||
network_proxy_active: bool,
|
||||
@@ -457,17 +474,7 @@ impl Session {
|
||||
if reason == TurnAbortReason::Interrupted {
|
||||
self.cleanup_after_interrupt(&task.turn_context).await;
|
||||
|
||||
let marker = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: format!(
|
||||
"{TURN_ABORTED_OPEN_TAG}\n{TURN_ABORTED_INTERRUPTED_GUIDANCE}\n</turn_aborted>"
|
||||
),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
};
|
||||
let marker = interrupted_turn_history_marker();
|
||||
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
|
||||
.await;
|
||||
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])
|
||||
|
||||
@@ -24,6 +24,9 @@ use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::truncation;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::skills::SkillsManager;
|
||||
use crate::tasks::interrupted_turn_history_marker;
|
||||
use codex_app_server_protocol::ThreadHistoryBuilder;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
#[cfg(test)]
|
||||
@@ -34,6 +37,8 @@ use codex_protocol::protocol::McpServerRefreshConfig;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use futures::StreamExt;
|
||||
use futures::stream::FuturesUnordered;
|
||||
@@ -126,6 +131,45 @@ pub struct NewThread {
|
||||
pub session_configured: SessionConfiguredEvent,
|
||||
}
|
||||
|
||||
// TODO(ccunningham): Add an explicit non-interrupting live-turn snapshot once
|
||||
// core can represent sampling boundaries directly instead of relying on
|
||||
// whichever items happened to be persisted mid-turn.
|
||||
//
|
||||
// Two likely future variants:
|
||||
// - `TruncateToLastSamplingBoundary` for callers that want a coherent fork from
|
||||
// the last stable model boundary without synthesizing an interrupt.
|
||||
// - `WaitUntilNextSamplingBoundary` (or similar) for callers that prefer to
|
||||
// fork after the next sampling boundary rather than interrupting immediately.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ForkSnapshot {
|
||||
/// Fork a committed prefix ending strictly before the nth user message.
|
||||
///
|
||||
/// When `n` is within range, this cuts before that 0-based user-message
|
||||
/// boundary. When `n` is out of range and the source thread is currently
|
||||
/// mid-turn, this instead cuts before the active turn's opening boundary
|
||||
/// so the fork drops the unfinished turn suffix. When `n` is out of range
|
||||
/// and the source thread is already at a turn boundary, this returns the
|
||||
/// full committed history unchanged.
|
||||
TruncateBeforeNthUserMessage(usize),
|
||||
|
||||
/// Fork the current persisted history as if the source thread had been
|
||||
/// interrupted now.
|
||||
///
|
||||
/// If the persisted snapshot ends mid-turn, this appends the same
|
||||
/// `<turn_aborted>` marker produced by a real interrupt. If the snapshot is
|
||||
/// already at a turn boundary, this returns the current persisted history
|
||||
/// unchanged.
|
||||
Interrupted,
|
||||
}
|
||||
|
||||
/// Preserve legacy `fork_thread(usize, ...)` callsites by mapping them to the
|
||||
/// existing truncate-before-nth-user-message snapshot mode.
|
||||
impl From<usize> for ForkSnapshot {
|
||||
fn from(value: usize) -> Self {
|
||||
Self::TruncateBeforeNthUserMessage(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, PartialEq, Eq)]
|
||||
pub struct ThreadShutdownReport {
|
||||
pub completed: Vec<ThreadId>,
|
||||
@@ -541,20 +585,41 @@ impl ThreadManager {
|
||||
report
|
||||
}
|
||||
|
||||
/// Fork an existing thread by taking messages up to the given position (not including
|
||||
/// the message at the given position) and starting a new thread with identical
|
||||
/// configuration (unless overridden by the caller's `config`). The new thread will have
|
||||
/// a fresh id. Pass `usize::MAX` to keep the full rollout history.
|
||||
pub async fn fork_thread(
|
||||
/// Fork an existing thread by snapshotting rollout history according to
|
||||
/// `snapshot` and starting a new thread with identical configuration
|
||||
/// (unless overridden by the caller's `config`). The new thread will have
|
||||
/// a fresh id.
|
||||
pub async fn fork_thread<S>(
|
||||
&self,
|
||||
nth_user_message: usize,
|
||||
snapshot: S,
|
||||
config: Config,
|
||||
path: PathBuf,
|
||||
persist_extended_history: bool,
|
||||
parent_trace: Option<W3cTraceContext>,
|
||||
) -> CodexResult<NewThread> {
|
||||
) -> CodexResult<NewThread>
|
||||
where
|
||||
S: Into<ForkSnapshot>,
|
||||
{
|
||||
let snapshot = snapshot.into();
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
let history = truncate_before_nth_user_message(history, nth_user_message);
|
||||
let snapshot_state = snapshot_turn_state(&history);
|
||||
let history = match snapshot {
|
||||
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => {
|
||||
truncate_before_nth_user_message(history, nth_user_message, &snapshot_state)
|
||||
}
|
||||
ForkSnapshot::Interrupted => {
|
||||
let history = match history {
|
||||
InitialHistory::New => InitialHistory::New,
|
||||
InitialHistory::Forked(history) => InitialHistory::Forked(history),
|
||||
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
|
||||
};
|
||||
if snapshot_state.ends_mid_turn {
|
||||
append_interrupted_boundary(history, snapshot_state.active_turn_id)
|
||||
} else {
|
||||
history
|
||||
}
|
||||
}
|
||||
};
|
||||
Box::pin(self.state.spawn_thread(
|
||||
config,
|
||||
history,
|
||||
@@ -838,11 +903,31 @@ impl ThreadManagerState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a prefix of `items` obtained by cutting strictly before the nth user message
|
||||
/// (0-based) and all items that follow it.
|
||||
fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> InitialHistory {
|
||||
/// Return a fork snapshot cut strictly before the nth user message (0-based).
|
||||
///
|
||||
/// Out-of-range values keep the full committed history at a turn boundary, but
|
||||
/// when the source thread is currently mid-turn they fall back to cutting
|
||||
/// before the active turn's opening boundary so the fork omits the unfinished
|
||||
/// suffix entirely.
|
||||
fn truncate_before_nth_user_message(
|
||||
history: InitialHistory,
|
||||
n: usize,
|
||||
snapshot_state: &SnapshotTurnState,
|
||||
) -> InitialHistory {
|
||||
let items: Vec<RolloutItem> = history.get_rollout_items();
|
||||
let rolled = truncation::truncate_rollout_before_nth_user_message_from_start(&items, n);
|
||||
let user_positions = truncation::user_message_positions_in_rollout(&items);
|
||||
let rolled = if snapshot_state.ends_mid_turn && n >= user_positions.len() {
|
||||
if let Some(cut_idx) = snapshot_state
|
||||
.active_turn_start_index
|
||||
.or_else(|| user_positions.last().copied())
|
||||
{
|
||||
items[..cut_idx].to_vec()
|
||||
} else {
|
||||
items
|
||||
}
|
||||
} else {
|
||||
truncation::truncate_rollout_before_nth_user_message_from_start(&items, n)
|
||||
};
|
||||
|
||||
if rolled.is_empty() {
|
||||
InitialHistory::New
|
||||
@@ -851,6 +936,107 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
struct SnapshotTurnState {
|
||||
ends_mid_turn: bool,
|
||||
active_turn_id: Option<String>,
|
||||
active_turn_start_index: Option<usize>,
|
||||
}
|
||||
|
||||
fn snapshot_turn_state(history: &InitialHistory) -> SnapshotTurnState {
|
||||
let rollout_items = history.get_rollout_items();
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
for item in &rollout_items {
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
let active_turn_id = builder.active_turn_id_if_explicit();
|
||||
if builder.has_active_turn() && active_turn_id.is_some() {
|
||||
let active_turn_snapshot = builder.active_turn_snapshot();
|
||||
if active_turn_snapshot
|
||||
.as_ref()
|
||||
.is_some_and(|turn| turn.status != TurnStatus::InProgress)
|
||||
{
|
||||
return SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
};
|
||||
}
|
||||
|
||||
return SnapshotTurnState {
|
||||
ends_mid_turn: true,
|
||||
active_turn_id,
|
||||
active_turn_start_index: builder.active_turn_start_index(),
|
||||
};
|
||||
}
|
||||
|
||||
let Some(last_user_position) = truncation::user_message_positions_in_rollout(&rollout_items)
|
||||
.last()
|
||||
.copied()
|
||||
else {
|
||||
return SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
};
|
||||
};
|
||||
|
||||
// Synthetic fork/resume histories can contain user/assistant response items
|
||||
// without explicit turn lifecycle events. If the persisted snapshot has no
|
||||
// terminating boundary after its last user message, treat it as mid-turn.
|
||||
SnapshotTurnState {
|
||||
ends_mid_turn: !rollout_items[last_user_position + 1..].iter().any(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_))
|
||||
)
|
||||
}),
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Append the same persisted interrupt boundary used by the live interrupt path
|
||||
/// to an existing fork snapshot after the source thread has been confirmed to
|
||||
/// be mid-turn.
|
||||
fn append_interrupted_boundary(history: InitialHistory) -> InitialHistory {
|
||||
let turn_id = {
|
||||
let rollout_items = history.get_rollout_items();
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
for item in &rollout_items {
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
if builder.has_active_turn() {
|
||||
builder.active_turn_snapshot().map(|turn| turn.id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
let aborted_event = RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id,
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
}));
|
||||
|
||||
match history {
|
||||
InitialHistory::New => InitialHistory::Forked(vec![
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
|
||||
aborted_event,
|
||||
]),
|
||||
InitialHistory::Forked(mut history) => {
|
||||
history.push(RolloutItem::ResponseItem(interrupted_turn_history_marker()));
|
||||
history.push(aborted_event);
|
||||
InitialHistory::Forked(history)
|
||||
}
|
||||
InitialHistory::Resumed(mut resumed) => {
|
||||
resumed
|
||||
.history
|
||||
.push(RolloutItem::ResponseItem(interrupted_turn_history_marker()));
|
||||
resumed.history.push(aborted_event);
|
||||
InitialHistory::Forked(resumed.history)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "thread_manager_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -3,11 +3,15 @@ use crate::codex::make_session_and_context;
|
||||
use crate::config::test_config;
|
||||
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use crate::models_manager::manager::RefreshStrategy;
|
||||
use assert_matches::assert_matches;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::tasks::interrupted_turn_history_marker;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ModelsResponse;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use core_test_support::responses::mount_models_once;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Duration;
|
||||
@@ -38,7 +42,7 @@ fn assistant_msg(text: &str) -> ResponseItem {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drops_from_last_user_only() {
|
||||
fn truncates_before_requested_user_message() {
|
||||
let items = [
|
||||
user_msg("u1"),
|
||||
assistant_msg("a1"),
|
||||
@@ -68,7 +72,15 @@ fn drops_from_last_user_only() {
|
||||
.cloned()
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1);
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
InitialHistory::Forked(initial),
|
||||
1,
|
||||
&SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
let got_items = truncated.get_rollout_items();
|
||||
let expected_items = vec![
|
||||
RolloutItem::ResponseItem(items[0].clone()),
|
||||
@@ -85,8 +97,99 @@ fn drops_from_last_user_only() {
|
||||
.cloned()
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2);
|
||||
assert_matches!(truncated2, InitialHistory::New);
|
||||
let truncated2 = truncate_before_nth_user_message(
|
||||
InitialHistory::Forked(initial2.clone()),
|
||||
2,
|
||||
&SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(truncated2.get_rollout_items()).unwrap(),
|
||||
serde_json::to_value(initial2).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn out_of_range_truncation_drops_only_unfinished_suffix_mid_turn() {
|
||||
let items = vec![
|
||||
RolloutItem::ResponseItem(user_msg("u1")),
|
||||
RolloutItem::ResponseItem(assistant_msg("a1")),
|
||||
RolloutItem::ResponseItem(user_msg("u2")),
|
||||
RolloutItem::ResponseItem(assistant_msg("partial")),
|
||||
];
|
||||
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
InitialHistory::Forked(items.clone()),
|
||||
usize::MAX,
|
||||
&SnapshotTurnState {
|
||||
ends_mid_turn: true,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(truncated.get_rollout_items()).unwrap(),
|
||||
serde_json::to_value(items[..2].to_vec()).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fork_thread_accepts_legacy_usize_snapshot_argument() {
|
||||
fn assert_legacy_snapshot_callsite(
|
||||
manager: &ThreadManager,
|
||||
config: Config,
|
||||
path: std::path::PathBuf,
|
||||
) {
|
||||
let _future = manager.fork_thread(
|
||||
usize::MAX,
|
||||
config,
|
||||
path,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
);
|
||||
}
|
||||
|
||||
let _: fn(&ThreadManager, Config, std::path::PathBuf) = assert_legacy_snapshot_callsite;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
|
||||
let items = vec![
|
||||
RolloutItem::ResponseItem(user_msg("u1")),
|
||||
RolloutItem::ResponseItem(assistant_msg("a1")),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-2".to_string(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
})),
|
||||
RolloutItem::ResponseItem(user_msg("u2")),
|
||||
RolloutItem::ResponseItem(assistant_msg("partial")),
|
||||
];
|
||||
|
||||
let snapshot_state = snapshot_turn_state(&InitialHistory::Forked(items.clone()));
|
||||
assert_eq!(
|
||||
snapshot_state,
|
||||
SnapshotTurnState {
|
||||
ends_mid_turn: true,
|
||||
active_turn_id: Some("turn-2".to_string()),
|
||||
active_turn_start_index: Some(2),
|
||||
},
|
||||
);
|
||||
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
InitialHistory::Forked(items.clone()),
|
||||
usize::MAX,
|
||||
&snapshot_state,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(truncated.get_rollout_items()).unwrap(),
|
||||
serde_json::to_value(items[..2].to_vec()).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -104,7 +207,15 @@ async fn ignores_session_prefix_messages_when_truncating() {
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
|
||||
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1);
|
||||
let truncated = truncate_before_nth_user_message(
|
||||
InitialHistory::Forked(rollout_items),
|
||||
1,
|
||||
&SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
let got_items = truncated.get_rollout_items();
|
||||
|
||||
let expected: Vec<RolloutItem> = vec![
|
||||
@@ -185,3 +296,417 @@ async fn new_uses_configured_openai_provider_for_model_refresh() {
|
||||
let _ = manager.list_models(RefreshStrategy::Online).await;
|
||||
assert_eq!(models_mock.requests().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interrupted_fork_snapshot_appends_interrupt_boundary() {
|
||||
let committed_history =
|
||||
InitialHistory::Forked(vec![RolloutItem::ResponseItem(user_msg("hello"))]);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(append_interrupted_boundary(committed_history).get_rollout_items())
|
||||
.expect("serialize interrupted fork history"),
|
||||
serde_json::to_value(vec![
|
||||
RolloutItem::ResponseItem(user_msg("hello")),
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: None,
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
})),
|
||||
])
|
||||
.expect("serialize expected interrupted fork history"),
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(append_interrupted_boundary(InitialHistory::New).get_rollout_items())
|
||||
.expect("serialize interrupted empty fork history"),
|
||||
serde_json::to_value(vec![
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: None,
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
})),
|
||||
])
|
||||
.expect("serialize expected interrupted empty history"),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interrupted_snapshot_is_not_mid_turn() {
|
||||
let interrupted_history = InitialHistory::Forked(vec![
|
||||
RolloutItem::ResponseItem(user_msg("hello")),
|
||||
RolloutItem::ResponseItem(assistant_msg("partial")),
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: Some("turn-1".to_string()),
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
})),
|
||||
]);
|
||||
|
||||
assert_eq!(
|
||||
snapshot_turn_state(&interrupted_history),
|
||||
SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn completed_legacy_event_history_is_not_mid_turn() {
|
||||
let completed_history = InitialHistory::Forked(vec![
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "hello".to_string(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "done".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
})),
|
||||
]);
|
||||
|
||||
assert_eq!(
|
||||
snapshot_turn_state(&completed_history),
|
||||
SnapshotTurnState {
|
||||
ends_mid_turn: false,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mixed_response_and_legacy_user_event_history_is_mid_turn() {
|
||||
let mixed_history = InitialHistory::Forked(vec![
|
||||
RolloutItem::ResponseItem(user_msg("hello")),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "hello".to_string(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
]);
|
||||
|
||||
assert_eq!(
|
||||
snapshot_turn_state(&mixed_history),
|
||||
SnapshotTurnState {
|
||||
ends_mid_turn: true,
|
||||
active_turn_id: None,
|
||||
active_turn_start_index: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_history() {
|
||||
let temp_dir = tempdir().expect("tempdir");
|
||||
let mut config = test_config();
|
||||
config.codex_home = temp_dir.path().join("codex-home");
|
||||
config.cwd = config.codex_home.clone();
|
||||
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
|
||||
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let manager = ThreadManager::new(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
CollaborationModesConfig::default(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
.resume_thread_with_history(
|
||||
config.clone(),
|
||||
InitialHistory::Forked(vec![
|
||||
RolloutItem::ResponseItem(user_msg("hello")),
|
||||
RolloutItem::ResponseItem(assistant_msg("partial")),
|
||||
]),
|
||||
auth_manager,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("create source thread from completed history");
|
||||
let source_path = source
|
||||
.thread
|
||||
.rollout_path()
|
||||
.expect("source rollout path should exist");
|
||||
let source_history = RolloutRecorder::get_rollout_history(&source_path)
|
||||
.await
|
||||
.expect("read source rollout history");
|
||||
let source_snapshot_state = snapshot_turn_state(&source_history);
|
||||
assert!(source_snapshot_state.ends_mid_turn);
|
||||
let expected_turn_id = source_snapshot_state.active_turn_id.clone();
|
||||
assert_eq!(expected_turn_id, None);
|
||||
|
||||
let forked = manager
|
||||
.fork_thread(
|
||||
ForkSnapshot::Interrupted,
|
||||
config,
|
||||
source_path,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("fork interrupted snapshot");
|
||||
let forked_path = forked
|
||||
.thread
|
||||
.rollout_path()
|
||||
.expect("forked rollout path should exist");
|
||||
let history = RolloutRecorder::get_rollout_history(&forked_path)
|
||||
.await
|
||||
.expect("read forked rollout history");
|
||||
assert!(!snapshot_turn_state(&history).ends_mid_turn);
|
||||
let rollout_items: Vec<_> = history
|
||||
.get_rollout_items()
|
||||
.into_iter()
|
||||
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
|
||||
.collect();
|
||||
let interrupted_marker_json =
|
||||
serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker()))
|
||||
.expect("serialize interrupted marker");
|
||||
let interrupted_abort_json = serde_json::to_value(RolloutItem::EventMsg(
|
||||
EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: expected_turn_id,
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
}),
|
||||
))
|
||||
.expect("serialize interrupted abort event");
|
||||
assert_eq!(
|
||||
rollout_items
|
||||
.iter()
|
||||
.filter(|item| {
|
||||
serde_json::to_value(item).expect("serialize rollout item")
|
||||
== interrupted_marker_json
|
||||
})
|
||||
.count(),
|
||||
1,
|
||||
);
|
||||
assert_eq!(
|
||||
rollout_items
|
||||
.iter()
|
||||
.filter(|item| {
|
||||
serde_json::to_value(item).expect("serialize rollout item")
|
||||
== interrupted_abort_json
|
||||
})
|
||||
.count(),
|
||||
1,
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
|
||||
let temp_dir = tempdir().expect("tempdir");
|
||||
let mut config = test_config();
|
||||
config.codex_home = temp_dir.path().join("codex-home");
|
||||
config.cwd = config.codex_home.clone();
|
||||
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
|
||||
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let manager = ThreadManager::new(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
CollaborationModesConfig::default(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
.resume_thread_with_history(
|
||||
config.clone(),
|
||||
InitialHistory::Forked(vec![
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-explicit".to_string(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
})),
|
||||
RolloutItem::ResponseItem(user_msg("hello")),
|
||||
RolloutItem::ResponseItem(assistant_msg("partial")),
|
||||
]),
|
||||
auth_manager,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("create source thread from explicit partial history");
|
||||
let source_path = source
|
||||
.thread
|
||||
.rollout_path()
|
||||
.expect("source rollout path should exist");
|
||||
let source_history = RolloutRecorder::get_rollout_history(&source_path)
|
||||
.await
|
||||
.expect("read source rollout history");
|
||||
let source_snapshot_state = snapshot_turn_state(&source_history);
|
||||
assert_eq!(
|
||||
source_snapshot_state,
|
||||
SnapshotTurnState {
|
||||
ends_mid_turn: true,
|
||||
active_turn_id: Some("turn-explicit".to_string()),
|
||||
active_turn_start_index: Some(1),
|
||||
},
|
||||
);
|
||||
|
||||
let forked = manager
|
||||
.fork_thread(
|
||||
ForkSnapshot::Interrupted,
|
||||
config,
|
||||
source_path,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("fork interrupted snapshot");
|
||||
let forked_path = forked
|
||||
.thread
|
||||
.rollout_path()
|
||||
.expect("forked rollout path should exist");
|
||||
let history = RolloutRecorder::get_rollout_history(&forked_path)
|
||||
.await
|
||||
.expect("read forked rollout history");
|
||||
let rollout_items: Vec<_> = history
|
||||
.get_rollout_items()
|
||||
.into_iter()
|
||||
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
|
||||
.collect();
|
||||
|
||||
assert!(rollout_items.iter().any(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: Some(turn_id),
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
})) if turn_id == "turn-explicit"
|
||||
)
|
||||
}));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_source() {
|
||||
let temp_dir = tempdir().expect("tempdir");
|
||||
let mut config = test_config();
|
||||
config.codex_home = temp_dir.path().join("codex-home");
|
||||
config.cwd = config.codex_home.clone();
|
||||
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
|
||||
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let manager = ThreadManager::new(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
CollaborationModesConfig::default(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
.resume_thread_with_history(
|
||||
config.clone(),
|
||||
InitialHistory::Forked(vec![
|
||||
RolloutItem::ResponseItem(user_msg("hello")),
|
||||
RolloutItem::ResponseItem(assistant_msg("partial")),
|
||||
]),
|
||||
auth_manager,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("create source thread from partial history");
|
||||
let source_path = source
|
||||
.thread
|
||||
.rollout_path()
|
||||
.expect("source rollout path should exist");
|
||||
let source_history = RolloutRecorder::get_rollout_history(&source_path)
|
||||
.await
|
||||
.expect("read source rollout history");
|
||||
assert!(snapshot_turn_state(&source_history).ends_mid_turn);
|
||||
manager.remove_thread(&source.thread_id).await;
|
||||
|
||||
let forked = manager
|
||||
.fork_thread(
|
||||
ForkSnapshot::Interrupted,
|
||||
config.clone(),
|
||||
source_path,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("fork interrupted snapshot");
|
||||
let forked_path = forked
|
||||
.thread
|
||||
.rollout_path()
|
||||
.expect("forked rollout path should exist");
|
||||
let history = RolloutRecorder::get_rollout_history(&forked_path)
|
||||
.await
|
||||
.expect("read forked rollout history");
|
||||
assert!(!snapshot_turn_state(&history).ends_mid_turn);
|
||||
|
||||
let forked_rollout_items: Vec<_> = history
|
||||
.get_rollout_items()
|
||||
.into_iter()
|
||||
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
|
||||
.collect();
|
||||
let interrupted_marker_json =
|
||||
serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker()))
|
||||
.expect("serialize interrupted marker");
|
||||
assert_eq!(
|
||||
forked_rollout_items
|
||||
.iter()
|
||||
.filter(|item| {
|
||||
serde_json::to_value(item).expect("serialize forked rollout item")
|
||||
== interrupted_marker_json
|
||||
})
|
||||
.count(),
|
||||
1,
|
||||
);
|
||||
|
||||
manager.remove_thread(&forked.thread_id).await;
|
||||
let reforked = manager
|
||||
.fork_thread(
|
||||
ForkSnapshot::Interrupted,
|
||||
config,
|
||||
forked_path,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("re-fork interrupted snapshot");
|
||||
let reforked_path = reforked
|
||||
.thread
|
||||
.rollout_path()
|
||||
.expect("re-forked rollout path should exist");
|
||||
let reforked_history = RolloutRecorder::get_rollout_history(&reforked_path)
|
||||
.await
|
||||
.expect("read re-forked rollout history");
|
||||
let reforked_rollout_items: Vec<_> = reforked_history
|
||||
.get_rollout_items()
|
||||
.into_iter()
|
||||
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
|
||||
.collect();
|
||||
|
||||
assert_eq!(
|
||||
reforked_rollout_items
|
||||
.iter()
|
||||
.filter(|item| {
|
||||
serde_json::to_value(item).expect("serialize re-forked rollout item")
|
||||
== interrupted_marker_json
|
||||
})
|
||||
.count(),
|
||||
1,
|
||||
);
|
||||
assert_eq!(
|
||||
reforked_rollout_items
|
||||
.iter()
|
||||
.filter(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
..
|
||||
}))
|
||||
)
|
||||
})
|
||||
.count(),
|
||||
1,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use super::compact::FIRST_REPLY;
|
||||
use super::compact::SUMMARY_TEXT;
|
||||
use anyhow::Result;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::ForkSnapshot;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::compact::SUMMARIZATION_PROMPT;
|
||||
use codex_core::config::Config;
|
||||
@@ -383,8 +384,13 @@ async fn compact_resume_after_second_compaction_preserves_history() -> Result<()
|
||||
let seeded_user_prefix = &first_request_user_texts[..first_turn_user_index];
|
||||
let summary_after_second_compact =
|
||||
extract_summary_user_text(&requests[requests.len() - 3], SUMMARY_TEXT);
|
||||
let mut expected_after_second_compact_user_texts =
|
||||
vec!["AFTER_FORK".to_string(), summary_after_second_compact];
|
||||
let mut expected_after_second_compact_user_texts = vec![
|
||||
"hello world".to_string(),
|
||||
"AFTER_COMPACT".to_string(),
|
||||
"AFTER_RESUME".to_string(),
|
||||
"AFTER_FORK".to_string(),
|
||||
summary_after_second_compact,
|
||||
];
|
||||
expected_after_second_compact_user_texts.extend_from_slice(seeded_user_prefix);
|
||||
expected_after_second_compact_user_texts.push("AFTER_COMPACT_2".to_string());
|
||||
let final_user_texts = json_message_input_texts(&requests[requests.len() - 1], "user");
|
||||
@@ -841,8 +847,14 @@ async fn fork_thread(
|
||||
path: std::path::PathBuf,
|
||||
nth_user_message: usize,
|
||||
) -> Arc<CodexThread> {
|
||||
Box::pin(manager.fork_thread(nth_user_message, config.clone(), path, false, None))
|
||||
.await
|
||||
.expect("fork conversation")
|
||||
.thread
|
||||
Box::pin(manager.fork_thread(
|
||||
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message),
|
||||
config.clone(),
|
||||
path,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
))
|
||||
.await
|
||||
.expect("fork conversation")
|
||||
.thread
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use codex_core::ForkSnapshot;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::parse_turn_item;
|
||||
use codex_protocol::items::TurnItem;
|
||||
@@ -110,7 +111,13 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
thread: codex_fork1,
|
||||
..
|
||||
} = thread_manager
|
||||
.fork_thread(1, config_for_fork.clone(), base_path.clone(), false, None)
|
||||
.fork_thread(
|
||||
ForkSnapshot::TruncateBeforeNthUserMessage(1),
|
||||
config_for_fork.clone(),
|
||||
base_path.clone(),
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("fork 1");
|
||||
|
||||
@@ -129,7 +136,13 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
thread: codex_fork2,
|
||||
..
|
||||
} = thread_manager
|
||||
.fork_thread(0, config_for_fork.clone(), fork1_path.clone(), false, None)
|
||||
.fork_thread(
|
||||
ForkSnapshot::TruncateBeforeNthUserMessage(0),
|
||||
config_for_fork.clone(),
|
||||
fork1_path.clone(),
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("fork 2");
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use codex_core::ForkSnapshot;
|
||||
use codex_core::config::Constrained;
|
||||
use codex_execpolicy::Policy;
|
||||
use codex_protocol::models::DeveloperInstructions;
|
||||
@@ -419,7 +420,13 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> {
|
||||
fork_config.permissions.approval_policy = Constrained::allow_any(AskForApproval::UnlessTrusted);
|
||||
let forked = initial
|
||||
.thread_manager
|
||||
.fork_thread(usize::MAX, fork_config, rollout_path, false, None)
|
||||
.fork_thread(
|
||||
ForkSnapshot::Interrupted,
|
||||
fork_config,
|
||||
rollout_path,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await?;
|
||||
forked
|
||||
.thread
|
||||
|
||||
@@ -531,7 +531,9 @@ async fn shell_command_snapshot_still_intercepts_apply_patch() -> Result<()> {
|
||||
let script = "apply_patch <<'EOF'\n*** Begin Patch\n*** Add File: snapshot-apply.txt\n+hello from snapshot\n*** End Patch\nEOF\n";
|
||||
let args = json!({
|
||||
"command": script,
|
||||
"timeout_ms": 1_000,
|
||||
// The intercepted apply_patch path self-invokes codex, which can take
|
||||
// longer than a second in Bazel macOS test environments.
|
||||
"timeout_ms": 5_000,
|
||||
});
|
||||
let call_id = "shell-snapshot-apply-patch";
|
||||
let responses = vec![
|
||||
|
||||
@@ -57,6 +57,7 @@ use codex_app_server_protocol::RequestId;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::ForkSnapshot;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
@@ -2502,7 +2503,7 @@ impl App {
|
||||
);
|
||||
let forked = thread_manager
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
ForkSnapshot::Interrupted,
|
||||
config.clone(),
|
||||
target_session.path.clone(),
|
||||
/*persist_extended_history*/ false,
|
||||
@@ -2925,7 +2926,7 @@ impl App {
|
||||
match self
|
||||
.server
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
ForkSnapshot::Interrupted,
|
||||
self.config.clone(),
|
||||
path.clone(),
|
||||
/*persist_extended_history*/ false,
|
||||
|
||||
Reference in New Issue
Block a user