Compare commits

...

8 Commits

Author SHA1 Message Date
Owen Lin
6b0a9ed548 fix lint 2026-01-30 16:29:32 -08:00
Owen Lin
6d657e0ebd fix again 2026-01-30 16:05:34 -08:00
Owen Lin
e7ddb311da fix test 2026-01-30 15:48:56 -08:00
Owen Lin
e43edf3caa fix imports 2026-01-30 15:31:41 -08:00
Owen Lin
da7ee5ce30 address 2026-01-30 15:26:06 -08:00
Owen Lin
cb8147d07d pr feedback 2026-01-30 15:11:42 -08:00
Owen Lin
638875028d fix 2026-01-30 14:46:46 -08:00
Owen Lin
7650c6d4da fix(app-server, core): fix thread rollbacks after compaction has occurred 2026-01-30 14:46:46 -08:00
7 changed files with 665 additions and 12 deletions

View File

@@ -1356,7 +1356,7 @@ pub struct ThreadListParams {
pub limit: Option<u32>,
/// Optional sort key; defaults to created_at.
pub sort_key: Option<ThreadSortKey>,
/// Optional provider filter; when set, only sessions recorded under these
/// Optional provider filter; when set, only threads recorded under these
/// providers are returned. When present but empty, includes all providers.
pub model_providers: Option<Vec<String>>,
/// Optional source filter; when set, only sessions from these source kinds

View File

@@ -84,7 +84,7 @@ Example (from OpenAI's official VSCode extension):
- `thread/archive` — move a threads rollout file into the archived directory; returns `{}` on success.
- `thread/name/set` — set or update a threads user-facing name; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `thread/rollback` — drop the last N user turns from the threads effective history based on the rollout stream. If the rollback crosses a compaction marker, the compaction is undone (pre-compaction turns are restored). Persists a rollback marker so future resumes see the same history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `review/start` — kick off Codexs automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.

View File

@@ -168,6 +168,7 @@ use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
use crate::rollout::map_session_init_error;
use crate::rollout::metadata;
use crate::rollout::truncation::apply_rollbacks_to_rollout;
use crate::shell;
use crate::shell_snapshot::ShellSnapshot;
use crate::skills::SkillError;
@@ -1754,8 +1755,9 @@ impl Session {
turn_context: &TurnContext,
rollout_items: &[RolloutItem],
) -> Vec<ResponseItem> {
let rollout_items = apply_rollbacks_to_rollout(rollout_items);
let mut history = ContextManager::new();
for item in rollout_items {
for item in &rollout_items {
match item {
RolloutItem::ResponseItem(response_item) => {
history.record_items(
@@ -1776,9 +1778,6 @@ impl Session {
history.replace(rebuilt);
}
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
history.drop_last_n_user_turns(rollback.num_turns);
}
_ => {}
}
}
@@ -2520,7 +2519,10 @@ mod handlers {
use crate::mcp::collect_mcp_snapshot_from_manager;
use crate::mcp::effective_mcp_servers;
use crate::review_prompts::resolve_review_request;
use crate::rollout::RolloutRecorder;
use crate::rollout::session_index;
use crate::rollout::truncation::apply_rollbacks_to_rollout;
use crate::rollout::truncation::truncate_rollout_drop_last_n_user_turns;
use crate::tasks::CompactTask;
use crate::tasks::RegularTask;
use crate::tasks::UndoTask;
@@ -2975,13 +2977,36 @@ mod handlers {
}
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
let mut replaced = false;
sess.flush_rollout().await;
let mut history = sess.clone_history().await;
history.drop_last_n_user_turns(num_turns);
let rollout_path = {
let rollout = sess.services.rollout.lock().await;
rollout.as_ref().map(|rec| rec.rollout_path.clone())
};
// Replace with the raw items. We don't want to replace with a normalized
// version of the history.
sess.replace_history(history.raw_items().to_vec()).await;
if let Some(path) = rollout_path
&& let Ok(initial_history) = RolloutRecorder::get_rollout_history(path.as_path()).await
{
let rollout_items = initial_history.get_rollout_items();
let effective = apply_rollbacks_to_rollout(rollout_items.as_slice());
let truncated =
truncate_rollout_drop_last_n_user_turns(effective.as_slice(), num_turns);
let rebuilt = sess
.reconstruct_history_from_rollout(turn_context.as_ref(), &truncated)
.await;
sess.replace_history(rebuilt).await;
replaced = true;
}
if !replaced {
let mut history = sess.clone_history().await;
history.drop_last_n_user_turns(num_turns);
// Replace with the raw items. We don't want to replace with a normalized
// version of the history.
sess.replace_history(history.raw_items().to_vec()).await;
}
sess.recompute_token_usage(turn_context.as_ref()).await;
sess.send_event_raw_flushed(Event {
@@ -4502,6 +4527,8 @@ mod tests {
use crate::protocol::TokenCountEvent;
use crate::protocol::TokenUsage;
use crate::protocol::TokenUsageInfo;
use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
@@ -4529,6 +4556,7 @@ mod tests {
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration as StdDuration;
use tempfile::TempDir;
struct InstructionsTestCase {
slug: &'static str,
@@ -4869,6 +4897,82 @@ mod tests {
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
async fn thread_rollback_over_compaction_restores_pre_compact_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let _rollout_home = attach_rollout_recorder(&sess).await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
sess.record_conversation_items(tc.as_ref(), &initial_context)
.await;
let user1 = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "turn 1 user".to_string(),
}],
end_turn: None,
};
let assistant1 = ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "turn 1 assistant".to_string(),
}],
end_turn: None,
};
sess.record_conversation_items(tc.as_ref(), &[user1.clone(), assistant1.clone()])
.await;
let history_snapshot = sess.clone_history().await;
let user_messages = collect_user_messages(history_snapshot.raw_items());
let summary_text = "summary one";
let compacted = compact::build_compacted_history(
sess.build_initial_context(tc.as_ref()).await,
&user_messages,
summary_text,
);
sess.replace_history(compacted).await;
sess.recompute_token_usage(tc.as_ref()).await;
sess.persist_rollout_items(&[RolloutItem::Compacted(CompactedItem {
message: summary_text.to_string(),
replacement_history: None,
})])
.await;
let user2 = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "turn 2 user".to_string(),
}],
end_turn: None,
};
let assistant2 = ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "turn 2 assistant".to_string(),
}],
end_turn: None,
};
sess.record_conversation_items(tc.as_ref(), &[user2, assistant2])
.await;
sess.flush_rollout().await;
handlers::thread_rollback(&sess, "sub-1".to_string(), 2).await;
let rollback_event = wait_for_thread_rolled_back(&rx).await;
assert_eq!(rollback_event.num_turns, 2);
let mut expected = initial_context;
expected.push(user1);
expected.push(assistant1);
let history = sess.clone_history().await;
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
async fn thread_rollback_clears_history_when_num_turns_exceeds_existing_turns() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
@@ -5508,6 +5612,31 @@ mod tests {
state.initial_context_seeded = true;
}
async fn attach_rollout_recorder(sess: &Arc<Session>) -> TempDir {
let codex_home = TempDir::new().expect("create temp dir");
let config = build_test_config(codex_home.path()).await;
let base_instructions = sess.get_base_instructions().await;
let recorder = RolloutRecorder::new(
&config,
RolloutRecorderParams::new(
sess.conversation_id,
None,
SessionSource::Exec,
base_instructions,
Vec::new(),
),
None,
None,
)
.await
.expect("create rollout recorder");
{
let mut rollout = sess.services.rollout.lock().await;
*rollout = Some(recorder);
}
codex_home
}
#[tokio::test]
async fn refresh_mcp_servers_is_deferred_until_next_turn() {
let (session, turn_context) = make_session_and_context().await;

View File

@@ -31,7 +31,7 @@ use tracing::error;
pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt.md");
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
pub(crate) const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
pub(crate) fn should_use_remote_compact_task(
session: &Session,

View File

@@ -3,7 +3,12 @@
//! In core, "user turns" are detected by scanning `ResponseItem::Message` items and
//! interpreting them via `event_mapping::parse_turn_item(...)`.
use crate::compact::COMPACT_USER_MESSAGE_MAX_TOKENS;
use crate::compact::is_summary_message;
use crate::event_mapping;
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
@@ -68,10 +73,187 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
items[..cut_idx].to_vec()
}
#[derive(Debug, Clone)]
struct UserTurnRef {
source_index: usize,
replacement_item_index: Option<usize>,
text: String,
}
fn user_turn_from_item_with_replacement(
item: &ResponseItem,
source_index: usize,
replacement_item_index: Option<usize>,
) -> Option<UserTurnRef> {
let turn_item = event_mapping::parse_turn_item(item)?;
match turn_item {
TurnItem::UserMessage(user) => Some(UserTurnRef {
source_index,
replacement_item_index,
text: user.message(),
}),
_ => None,
}
}
fn user_turn_from_item(item: &ResponseItem, source_index: usize) -> Option<UserTurnRef> {
user_turn_from_item_with_replacement(item, source_index, None)
}
fn user_turn_from_replacement_item(
item: &ResponseItem,
source_index: usize,
replacement_item_index: usize,
) -> Option<UserTurnRef> {
user_turn_from_item_with_replacement(item, source_index, Some(replacement_item_index))
}
fn select_user_turns_for_compaction(turns: &[UserTurnRef]) -> Vec<UserTurnRef> {
let mut selected = Vec::new();
if COMPACT_USER_MESSAGE_MAX_TOKENS > 0 {
let mut remaining = COMPACT_USER_MESSAGE_MAX_TOKENS;
for turn in turns.iter().rev() {
if remaining == 0 {
break;
}
let tokens = approx_token_count(&turn.text);
if tokens <= remaining {
selected.push(turn.clone());
remaining = remaining.saturating_sub(tokens);
} else {
let truncated = truncate_text(&turn.text, TruncationPolicy::Tokens(remaining));
selected.push(UserTurnRef {
source_index: turn.source_index,
replacement_item_index: turn.replacement_item_index,
text: truncated,
});
break;
}
}
selected.reverse();
}
selected
}
fn user_turns_from_replacement(
replacement: &[ResponseItem],
source_index: usize,
) -> Vec<UserTurnRef> {
replacement
.iter()
.enumerate()
.filter_map(|(idx, item)| user_turn_from_replacement_item(item, source_index, idx))
.collect()
}
fn effective_user_turns(items: &[RolloutItem]) -> Vec<UserTurnRef> {
let mut user_turns = Vec::new();
for (idx, item) in items.iter().enumerate() {
match item {
RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) => {
if let Some(turn) = user_turn_from_item(item, idx) {
user_turns.push(turn);
}
}
RolloutItem::Compacted(compacted) => {
user_turns = match &compacted.replacement_history {
Some(replacement) => user_turns_from_replacement(replacement, idx),
None => {
let eligible: Vec<UserTurnRef> = user_turns
.iter()
.filter(|turn| !is_summary_message(&turn.text))
.cloned()
.collect();
let mut selected = select_user_turns_for_compaction(&eligible);
let summary = if compacted.message.is_empty() {
"(no summary available)".to_string()
} else {
compacted.message.clone()
};
selected.push(UserTurnRef {
source_index: idx,
replacement_item_index: None,
text: summary,
});
selected
}
};
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX);
if num_turns >= user_turns.len() {
user_turns.clear();
} else {
user_turns.truncate(user_turns.len().saturating_sub(num_turns));
}
}
_ => {}
}
}
user_turns
}
/// Drop the last `num_turns` user turns from a rollout stream.
///
/// This uses the "effective" user-turn list, which includes compaction summaries
/// and applies any rollback markers already present in the stream.
pub(crate) fn truncate_rollout_drop_last_n_user_turns(
items: &[RolloutItem],
num_turns: u32,
) -> Vec<RolloutItem> {
if num_turns == 0 {
return items.to_vec();
}
let user_turns = effective_user_turns(items);
let Some(_) = user_turns.first() else {
return items.to_vec();
};
let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
if n_from_end >= user_turns.len() {
let Some(first_user_idx) = user_turns.first().map(|turn| turn.source_index) else {
return items.to_vec();
};
return items[..first_user_idx].to_vec();
}
let boundary = &user_turns[user_turns.len().saturating_sub(n_from_end)];
if let Some(replacement_item_index) = boundary.replacement_item_index {
let compaction_index = boundary.source_index;
let mut truncated = items[..=compaction_index].to_vec();
if let Some(RolloutItem::Compacted(compacted)) = truncated.last_mut()
&& let Some(replacement) = &mut compacted.replacement_history
{
replacement.truncate(replacement_item_index);
}
return truncated;
}
let cut_idx = boundary.source_index;
items[..cut_idx].to_vec()
}
/// Apply any `ThreadRolledBack` markers in a rollout stream and drop them.
pub(crate) fn apply_rollbacks_to_rollout(items: &[RolloutItem]) -> Vec<RolloutItem> {
let mut effective = Vec::new();
for item in items {
if let RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) = item {
effective = truncate_rollout_drop_last_n_user_turns(&effective, rollback.num_turns);
continue;
}
effective.push(item.clone());
}
effective
}
#[cfg(test)]
mod tests {
use super::*;
use crate::codex::make_session_and_context;
use crate::compact::SUMMARY_PREFIX;
use crate::protocol::CompactedItem;
use assert_matches::assert_matches;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
@@ -100,6 +282,28 @@ mod tests {
}
}
fn system_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "system".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
}
}
fn user_input_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
}
}
#[test]
fn truncates_rollout_from_start_before_nth_user_only() {
let items = [
@@ -188,6 +392,133 @@ mod tests {
);
}
#[test]
fn effective_user_turns_use_replacement_history_for_compaction() {
let rollout_items = vec![
RolloutItem::ResponseItem(user_input_msg("u1")),
RolloutItem::ResponseItem(assistant_msg("a1")),
RolloutItem::Compacted(CompactedItem {
message: "ignored summary".to_string(),
replacement_history: Some(vec![
user_input_msg("r1"),
assistant_msg("ra1"),
user_input_msg("r2"),
]),
}),
RolloutItem::ResponseItem(user_input_msg("u2")),
];
let turns = effective_user_turns(&rollout_items);
let texts: Vec<String> = turns.iter().map(|turn| turn.text.clone()).collect();
assert_eq!(
texts,
vec!["r1".to_string(), "r2".to_string(), "u2".to_string()]
);
let indices: Vec<usize> = turns.iter().map(|turn| turn.source_index).collect();
assert_eq!(indices, vec![2, 2, 3]);
}
#[test]
fn truncate_rollout_drop_last_n_user_turns_oversize_clears_user_history() {
let summary = format!("{SUMMARY_PREFIX}\nsummary");
let system = system_msg("system");
let rollout_items = vec![
RolloutItem::ResponseItem(system.clone()),
RolloutItem::ResponseItem(user_input_msg("u1")),
RolloutItem::ResponseItem(assistant_msg("a1")),
RolloutItem::ResponseItem(user_input_msg("u2")),
RolloutItem::ResponseItem(assistant_msg("a2")),
RolloutItem::Compacted(CompactedItem {
message: summary,
replacement_history: None,
}),
];
let truncated = truncate_rollout_drop_last_n_user_turns(&rollout_items, 99);
let expected = vec![RolloutItem::ResponseItem(system)];
assert_eq!(
serde_json::to_value(&truncated).unwrap(),
serde_json::to_value(&expected).unwrap()
);
}
#[test]
fn truncate_rollout_drop_last_n_user_turns_truncates_replacement_history() {
let replacement = vec![
user_input_msg("u1"),
assistant_msg("a1"),
user_input_msg("u2"),
assistant_msg("a2"),
];
let rollout_items = vec![
RolloutItem::ResponseItem(user_input_msg("pre")),
RolloutItem::Compacted(CompactedItem {
message: "ignored".to_string(),
replacement_history: Some(replacement),
}),
RolloutItem::ResponseItem(user_input_msg("u3")),
RolloutItem::ResponseItem(assistant_msg("a3")),
];
let truncated = truncate_rollout_drop_last_n_user_turns(&rollout_items, 2);
let expected = vec![
RolloutItem::ResponseItem(user_input_msg("pre")),
RolloutItem::Compacted(CompactedItem {
message: "ignored".to_string(),
replacement_history: Some(vec![user_input_msg("u1"), assistant_msg("a1")]),
}),
];
assert_eq!(
serde_json::to_value(&truncated).unwrap(),
serde_json::to_value(&expected).unwrap()
);
}
#[test]
fn effective_user_turns_skip_prior_compaction_summaries() {
let summary_one = format!("{SUMMARY_PREFIX}\nsummary one");
let summary_two = format!("{SUMMARY_PREFIX}\nsummary two");
let rollout_items = vec![
RolloutItem::ResponseItem(user_input_msg("u1")),
RolloutItem::Compacted(CompactedItem {
message: summary_one,
replacement_history: None,
}),
RolloutItem::ResponseItem(user_input_msg("u2")),
RolloutItem::Compacted(CompactedItem {
message: summary_two.clone(),
replacement_history: None,
}),
];
let turns = effective_user_turns(&rollout_items);
let texts: Vec<String> = turns.iter().map(|turn| turn.text.clone()).collect();
assert_eq!(texts, vec!["u1".to_string(), "u2".to_string(), summary_two]);
}
#[test]
fn effective_user_turns_use_placeholder_when_summary_empty() {
let rollout_items = vec![
RolloutItem::ResponseItem(user_input_msg("u1")),
RolloutItem::ResponseItem(assistant_msg("a1")),
RolloutItem::Compacted(CompactedItem {
message: String::new(),
replacement_history: None,
}),
];
let turns = effective_user_turns(&rollout_items);
let texts: Vec<String> = turns.iter().map(|turn| turn.text.clone()).collect();
assert_eq!(
texts,
vec!["u1".to_string(), "(no summary available)".to_string()]
);
let indices: Vec<usize> = turns.iter().map(|turn| turn.source_index).collect();
assert_eq!(indices, vec![0, 2]);
}
#[tokio::test]
async fn ignores_session_prefix_messages_when_truncating_rollout_from_start() {
let (session, turn_context) = make_session_and_context().await;

View File

@@ -145,6 +145,119 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_rollback_restores_pre_compact_history() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.features.enable(Feature::RemoteCompaction);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed("resp-2"),
]),
responses::sse(vec![
responses::ev_assistant_message("m3", "AFTER_ROLLBACK_REPLY"),
responses::ev_completed("resp-3"),
]),
],
)
.await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "REMOTE_COMPACTED_SUMMARY".to_string(),
}],
end_turn: None,
},
ResponseItem::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
},
];
responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::ThreadRollback { num_turns: 2 }).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ThreadRolledBack(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after rollback".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let after_rollback_body = responses_mock
.requests()
.last()
.expect("after rollback request missing")
.body_json()
.to_string();
assert!(
after_rollback_body.contains("FIRST_REMOTE_REPLY"),
"expected after-rollback request to include pre-compaction assistant reply"
);
assert!(
!after_rollback_body.contains("REMOTE_COMPACTED_SUMMARY"),
"expected after-rollback request to drop compaction summary"
);
assert!(
!after_rollback_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
"expected after-rollback request to drop compaction encrypted content"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_runs_automatically() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -34,6 +34,7 @@ use tempfile::TempDir;
use wiremock::MockServer;
const AFTER_SECOND_RESUME: &str = "AFTER_SECOND_RESUME";
const AFTER_ROLLBACK: &str = "AFTER_ROLLBACK";
fn network_disabled() -> bool {
std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok()
@@ -655,6 +656,39 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
assert_eq!(json!(requests), expected);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
/// Scenario: compact a conversation, rollback over the compaction marker, and ensure
/// the next turn includes the pre-compaction assistant history.
async fn rollback_over_compaction_restores_pre_compact_history() {
if network_disabled() {
println!("Skipping test because network is disabled in this sandbox");
return;
}
let server = MockServer::start().await;
let request_log = mount_compact_rollback_flow(&server).await;
let (_home, _config, _manager, base) = start_test_conversation(&server, None).await;
user_turn(&base, "hello world").await;
compact_conversation(&base).await;
user_turn(&base, "AFTER_COMPACT").await;
base.submit(Op::ThreadRollback { num_turns: 2 })
.await
.expect("submit rollback");
wait_for_event(&base, |ev| matches!(ev, EventMsg::ThreadRolledBack(_))).await;
user_turn(&base, AFTER_ROLLBACK).await;
let requests = gather_request_bodies(&request_log);
let after_rollback_body = requests.last().expect("after rollback request");
let body_text = after_rollback_body.to_string();
assert!(
body_text.contains(&json_fragment(FIRST_REPLY)),
"expected after-rollback request to include first assistant reply"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
/// Scenario: after the forked branch is compacted, resuming again should reuse
/// the compacted history and only append the new user message.
@@ -925,6 +959,52 @@ async fn mount_initial_flow(server: &MockServer) -> Vec<ResponseMock> {
vec![first, compact, after_compact, after_resume, after_fork]
}
async fn mount_compact_rollback_flow(server: &MockServer) -> Vec<ResponseMock> {
let sse1 = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed("r1"),
]);
let sse2 = sse(vec![
ev_assistant_message("m2", SUMMARY_TEXT),
ev_completed("r2"),
]);
let sse3 = sse(vec![
ev_assistant_message("m3", "AFTER_COMPACT_REPLY"),
ev_completed("r3"),
]);
let sse4 = sse(vec![ev_completed("r4")]);
let match_first = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains("\"text\":\"hello world\"")
&& !body.contains(&format!("\"text\":\"{SUMMARY_TEXT}\""))
&& !body.contains("\"text\":\"AFTER_COMPACT\"")
&& !body.contains(&format!("\"text\":\"{AFTER_ROLLBACK}\""))
};
let first = mount_sse_once_match(server, match_first, sse1).await;
let match_compact = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body_contains_text(body, SUMMARIZATION_PROMPT) || body.contains(&json_fragment(FIRST_REPLY))
};
let compact = mount_sse_once_match(server, match_compact, sse2).await;
let match_after_compact = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains("\"text\":\"AFTER_COMPACT\"")
&& !body.contains(&format!("\"text\":\"{AFTER_ROLLBACK}\""))
};
let after_compact = mount_sse_once_match(server, match_after_compact, sse3).await;
let match_after_rollback = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(&format!("\"text\":\"{AFTER_ROLLBACK}\""))
};
let after_rollback = mount_sse_once_match(server, match_after_rollback, sse4).await;
vec![first, compact, after_compact, after_rollback]
}
async fn mount_second_compact_flow(server: &MockServer) -> Vec<ResponseMock> {
let sse6 = sse(vec![
ev_assistant_message("m4", SUMMARY_TEXT),