mirror of
https://github.com/openai/codex.git
synced 2026-03-23 08:36:30 +03:00
Compare commits
8 Commits
starr/exec
...
owen/fix_r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b0a9ed548 | ||
|
|
6d657e0ebd | ||
|
|
e7ddb311da | ||
|
|
e43edf3caa | ||
|
|
da7ee5ce30 | ||
|
|
cb8147d07d | ||
|
|
638875028d | ||
|
|
7650c6d4da |
@@ -1356,7 +1356,7 @@ pub struct ThreadListParams {
|
||||
pub limit: Option<u32>,
|
||||
/// Optional sort key; defaults to created_at.
|
||||
pub sort_key: Option<ThreadSortKey>,
|
||||
/// Optional provider filter; when set, only sessions recorded under these
|
||||
/// Optional provider filter; when set, only threads recorded under these
|
||||
/// providers are returned. When present but empty, includes all providers.
|
||||
pub model_providers: Option<Vec<String>>,
|
||||
/// Optional source filter; when set, only sessions from these source kinds
|
||||
|
||||
@@ -84,7 +84,7 @@ Example (from OpenAI's official VSCode extension):
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success.
|
||||
- `thread/name/set` — set or update a thread’s user-facing name; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
|
||||
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success.
|
||||
- `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
|
||||
- `thread/rollback` — drop the last N user turns from the thread’s effective history based on the rollout stream. If the rollback crosses a compaction marker, the compaction is undone (pre-compaction turns are restored). Persists a rollback marker so future resumes see the same history; returns the updated `thread` (with `turns` populated) on success.
|
||||
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
|
||||
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
|
||||
- `review/start` — kick off Codex’s automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.
|
||||
|
||||
@@ -168,6 +168,7 @@ use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::RolloutRecorderParams;
|
||||
use crate::rollout::map_session_init_error;
|
||||
use crate::rollout::metadata;
|
||||
use crate::rollout::truncation::apply_rollbacks_to_rollout;
|
||||
use crate::shell;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::skills::SkillError;
|
||||
@@ -1754,8 +1755,9 @@ impl Session {
|
||||
turn_context: &TurnContext,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Vec<ResponseItem> {
|
||||
let rollout_items = apply_rollbacks_to_rollout(rollout_items);
|
||||
let mut history = ContextManager::new();
|
||||
for item in rollout_items {
|
||||
for item in &rollout_items {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(response_item) => {
|
||||
history.record_items(
|
||||
@@ -1776,9 +1778,6 @@ impl Session {
|
||||
history.replace(rebuilt);
|
||||
}
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
|
||||
history.drop_last_n_user_turns(rollback.num_turns);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -2520,7 +2519,10 @@ mod handlers {
|
||||
use crate::mcp::collect_mcp_snapshot_from_manager;
|
||||
use crate::mcp::effective_mcp_servers;
|
||||
use crate::review_prompts::resolve_review_request;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::session_index;
|
||||
use crate::rollout::truncation::apply_rollbacks_to_rollout;
|
||||
use crate::rollout::truncation::truncate_rollout_drop_last_n_user_turns;
|
||||
use crate::tasks::CompactTask;
|
||||
use crate::tasks::RegularTask;
|
||||
use crate::tasks::UndoTask;
|
||||
@@ -2975,13 +2977,36 @@ mod handlers {
|
||||
}
|
||||
|
||||
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
|
||||
let mut replaced = false;
|
||||
sess.flush_rollout().await;
|
||||
|
||||
let mut history = sess.clone_history().await;
|
||||
history.drop_last_n_user_turns(num_turns);
|
||||
let rollout_path = {
|
||||
let rollout = sess.services.rollout.lock().await;
|
||||
rollout.as_ref().map(|rec| rec.rollout_path.clone())
|
||||
};
|
||||
|
||||
// Replace with the raw items. We don't want to replace with a normalized
|
||||
// version of the history.
|
||||
sess.replace_history(history.raw_items().to_vec()).await;
|
||||
if let Some(path) = rollout_path
|
||||
&& let Ok(initial_history) = RolloutRecorder::get_rollout_history(path.as_path()).await
|
||||
{
|
||||
let rollout_items = initial_history.get_rollout_items();
|
||||
let effective = apply_rollbacks_to_rollout(rollout_items.as_slice());
|
||||
let truncated =
|
||||
truncate_rollout_drop_last_n_user_turns(effective.as_slice(), num_turns);
|
||||
let rebuilt = sess
|
||||
.reconstruct_history_from_rollout(turn_context.as_ref(), &truncated)
|
||||
.await;
|
||||
sess.replace_history(rebuilt).await;
|
||||
replaced = true;
|
||||
}
|
||||
|
||||
if !replaced {
|
||||
let mut history = sess.clone_history().await;
|
||||
history.drop_last_n_user_turns(num_turns);
|
||||
|
||||
// Replace with the raw items. We don't want to replace with a normalized
|
||||
// version of the history.
|
||||
sess.replace_history(history.raw_items().to_vec()).await;
|
||||
}
|
||||
sess.recompute_token_usage(turn_context.as_ref()).await;
|
||||
|
||||
sess.send_event_raw_flushed(Event {
|
||||
@@ -4502,6 +4527,8 @@ mod tests {
|
||||
use crate::protocol::TokenCountEvent;
|
||||
use crate::protocol::TokenUsage;
|
||||
use crate::protocol::TokenUsageInfo;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::RolloutRecorderParams;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::SessionTask;
|
||||
use crate::tasks::SessionTaskContext;
|
||||
@@ -4529,6 +4556,7 @@ mod tests {
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration as StdDuration;
|
||||
use tempfile::TempDir;
|
||||
|
||||
struct InstructionsTestCase {
|
||||
slug: &'static str,
|
||||
@@ -4869,6 +4897,82 @@ mod tests {
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_rollback_over_compaction_restores_pre_compact_history() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
let _rollout_home = attach_rollout_recorder(&sess).await;
|
||||
|
||||
let initial_context = sess.build_initial_context(tc.as_ref()).await;
|
||||
sess.record_conversation_items(tc.as_ref(), &initial_context)
|
||||
.await;
|
||||
|
||||
let user1 = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: "turn 1 user".to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
};
|
||||
let assistant1 = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: "turn 1 assistant".to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
};
|
||||
sess.record_conversation_items(tc.as_ref(), &[user1.clone(), assistant1.clone()])
|
||||
.await;
|
||||
|
||||
let history_snapshot = sess.clone_history().await;
|
||||
let user_messages = collect_user_messages(history_snapshot.raw_items());
|
||||
let summary_text = "summary one";
|
||||
let compacted = compact::build_compacted_history(
|
||||
sess.build_initial_context(tc.as_ref()).await,
|
||||
&user_messages,
|
||||
summary_text,
|
||||
);
|
||||
sess.replace_history(compacted).await;
|
||||
sess.recompute_token_usage(tc.as_ref()).await;
|
||||
sess.persist_rollout_items(&[RolloutItem::Compacted(CompactedItem {
|
||||
message: summary_text.to_string(),
|
||||
replacement_history: None,
|
||||
})])
|
||||
.await;
|
||||
|
||||
let user2 = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: "turn 2 user".to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
};
|
||||
let assistant2 = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: "turn 2 assistant".to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
};
|
||||
sess.record_conversation_items(tc.as_ref(), &[user2, assistant2])
|
||||
.await;
|
||||
sess.flush_rollout().await;
|
||||
|
||||
handlers::thread_rollback(&sess, "sub-1".to_string(), 2).await;
|
||||
let rollback_event = wait_for_thread_rolled_back(&rx).await;
|
||||
assert_eq!(rollback_event.num_turns, 2);
|
||||
|
||||
let mut expected = initial_context;
|
||||
expected.push(user1);
|
||||
expected.push(assistant1);
|
||||
|
||||
let history = sess.clone_history().await;
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_rollback_clears_history_when_num_turns_exceeds_existing_turns() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
@@ -5508,6 +5612,31 @@ mod tests {
|
||||
state.initial_context_seeded = true;
|
||||
}
|
||||
|
||||
async fn attach_rollout_recorder(sess: &Arc<Session>) -> TempDir {
|
||||
let codex_home = TempDir::new().expect("create temp dir");
|
||||
let config = build_test_config(codex_home.path()).await;
|
||||
let base_instructions = sess.get_base_instructions().await;
|
||||
let recorder = RolloutRecorder::new(
|
||||
&config,
|
||||
RolloutRecorderParams::new(
|
||||
sess.conversation_id,
|
||||
None,
|
||||
SessionSource::Exec,
|
||||
base_instructions,
|
||||
Vec::new(),
|
||||
),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("create rollout recorder");
|
||||
{
|
||||
let mut rollout = sess.services.rollout.lock().await;
|
||||
*rollout = Some(recorder);
|
||||
}
|
||||
codex_home
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn refresh_mcp_servers_is_deferred_until_next_turn() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
|
||||
@@ -31,7 +31,7 @@ use tracing::error;
|
||||
|
||||
pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt.md");
|
||||
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
|
||||
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
|
||||
pub(crate) const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
|
||||
|
||||
pub(crate) fn should_use_remote_compact_task(
|
||||
session: &Session,
|
||||
|
||||
@@ -3,7 +3,12 @@
|
||||
//! In core, "user turns" are detected by scanning `ResponseItem::Message` items and
|
||||
//! interpreting them via `event_mapping::parse_turn_item(...)`.
|
||||
|
||||
use crate::compact::COMPACT_USER_MESSAGE_MAX_TOKENS;
|
||||
use crate::compact::is_summary_message;
|
||||
use crate::event_mapping;
|
||||
use crate::truncate::TruncationPolicy;
|
||||
use crate::truncate::approx_token_count;
|
||||
use crate::truncate::truncate_text;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
@@ -68,10 +73,187 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
|
||||
items[..cut_idx].to_vec()
|
||||
}
|
||||
|
||||
/// Reference to one entry of the "effective" user-turn list built from a rollout.
#[derive(Clone, Debug)]
struct UserTurnRef {
    /// Index into the rollout item slice of the item this turn was derived from.
    source_index: usize,
    /// Position inside a compaction's `replacement_history` when the turn came from
    /// one; `None` otherwise.
    replacement_item_index: Option<usize>,
    /// Text associated with the turn (a user message or a compaction summary).
    text: String,
}
|
||||
|
||||
fn user_turn_from_item_with_replacement(
|
||||
item: &ResponseItem,
|
||||
source_index: usize,
|
||||
replacement_item_index: Option<usize>,
|
||||
) -> Option<UserTurnRef> {
|
||||
let turn_item = event_mapping::parse_turn_item(item)?;
|
||||
match turn_item {
|
||||
TurnItem::UserMessage(user) => Some(UserTurnRef {
|
||||
source_index,
|
||||
replacement_item_index,
|
||||
text: user.message(),
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn user_turn_from_item(item: &ResponseItem, source_index: usize) -> Option<UserTurnRef> {
|
||||
user_turn_from_item_with_replacement(item, source_index, None)
|
||||
}
|
||||
|
||||
fn user_turn_from_replacement_item(
|
||||
item: &ResponseItem,
|
||||
source_index: usize,
|
||||
replacement_item_index: usize,
|
||||
) -> Option<UserTurnRef> {
|
||||
user_turn_from_item_with_replacement(item, source_index, Some(replacement_item_index))
|
||||
}
|
||||
|
||||
fn select_user_turns_for_compaction(turns: &[UserTurnRef]) -> Vec<UserTurnRef> {
|
||||
let mut selected = Vec::new();
|
||||
if COMPACT_USER_MESSAGE_MAX_TOKENS > 0 {
|
||||
let mut remaining = COMPACT_USER_MESSAGE_MAX_TOKENS;
|
||||
for turn in turns.iter().rev() {
|
||||
if remaining == 0 {
|
||||
break;
|
||||
}
|
||||
let tokens = approx_token_count(&turn.text);
|
||||
if tokens <= remaining {
|
||||
selected.push(turn.clone());
|
||||
remaining = remaining.saturating_sub(tokens);
|
||||
} else {
|
||||
let truncated = truncate_text(&turn.text, TruncationPolicy::Tokens(remaining));
|
||||
selected.push(UserTurnRef {
|
||||
source_index: turn.source_index,
|
||||
replacement_item_index: turn.replacement_item_index,
|
||||
text: truncated,
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
selected.reverse();
|
||||
}
|
||||
selected
|
||||
}
|
||||
|
||||
fn user_turns_from_replacement(
|
||||
replacement: &[ResponseItem],
|
||||
source_index: usize,
|
||||
) -> Vec<UserTurnRef> {
|
||||
replacement
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(idx, item)| user_turn_from_replacement_item(item, source_index, idx))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn effective_user_turns(items: &[RolloutItem]) -> Vec<UserTurnRef> {
|
||||
let mut user_turns = Vec::new();
|
||||
for (idx, item) in items.iter().enumerate() {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) => {
|
||||
if let Some(turn) = user_turn_from_item(item, idx) {
|
||||
user_turns.push(turn);
|
||||
}
|
||||
}
|
||||
RolloutItem::Compacted(compacted) => {
|
||||
user_turns = match &compacted.replacement_history {
|
||||
Some(replacement) => user_turns_from_replacement(replacement, idx),
|
||||
None => {
|
||||
let eligible: Vec<UserTurnRef> = user_turns
|
||||
.iter()
|
||||
.filter(|turn| !is_summary_message(&turn.text))
|
||||
.cloned()
|
||||
.collect();
|
||||
let mut selected = select_user_turns_for_compaction(&eligible);
|
||||
let summary = if compacted.message.is_empty() {
|
||||
"(no summary available)".to_string()
|
||||
} else {
|
||||
compacted.message.clone()
|
||||
};
|
||||
selected.push(UserTurnRef {
|
||||
source_index: idx,
|
||||
replacement_item_index: None,
|
||||
text: summary,
|
||||
});
|
||||
selected
|
||||
}
|
||||
};
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
|
||||
let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX);
|
||||
if num_turns >= user_turns.len() {
|
||||
user_turns.clear();
|
||||
} else {
|
||||
user_turns.truncate(user_turns.len().saturating_sub(num_turns));
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
user_turns
|
||||
}
|
||||
|
||||
/// Drop the last `num_turns` user turns from a rollout stream.
|
||||
///
|
||||
/// This uses the "effective" user-turn list, which includes compaction summaries
|
||||
/// and applies any rollback markers already present in the stream.
|
||||
pub(crate) fn truncate_rollout_drop_last_n_user_turns(
|
||||
items: &[RolloutItem],
|
||||
num_turns: u32,
|
||||
) -> Vec<RolloutItem> {
|
||||
if num_turns == 0 {
|
||||
return items.to_vec();
|
||||
}
|
||||
|
||||
let user_turns = effective_user_turns(items);
|
||||
let Some(_) = user_turns.first() else {
|
||||
return items.to_vec();
|
||||
};
|
||||
|
||||
let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
|
||||
if n_from_end >= user_turns.len() {
|
||||
let Some(first_user_idx) = user_turns.first().map(|turn| turn.source_index) else {
|
||||
return items.to_vec();
|
||||
};
|
||||
return items[..first_user_idx].to_vec();
|
||||
}
|
||||
|
||||
let boundary = &user_turns[user_turns.len().saturating_sub(n_from_end)];
|
||||
if let Some(replacement_item_index) = boundary.replacement_item_index {
|
||||
let compaction_index = boundary.source_index;
|
||||
let mut truncated = items[..=compaction_index].to_vec();
|
||||
if let Some(RolloutItem::Compacted(compacted)) = truncated.last_mut()
|
||||
&& let Some(replacement) = &mut compacted.replacement_history
|
||||
{
|
||||
replacement.truncate(replacement_item_index);
|
||||
}
|
||||
return truncated;
|
||||
}
|
||||
|
||||
let cut_idx = boundary.source_index;
|
||||
|
||||
items[..cut_idx].to_vec()
|
||||
}
|
||||
|
||||
/// Apply any `ThreadRolledBack` markers in a rollout stream and drop them.
|
||||
pub(crate) fn apply_rollbacks_to_rollout(items: &[RolloutItem]) -> Vec<RolloutItem> {
|
||||
let mut effective = Vec::new();
|
||||
for item in items {
|
||||
if let RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) = item {
|
||||
effective = truncate_rollout_drop_last_n_user_turns(&effective, rollback.num_turns);
|
||||
continue;
|
||||
}
|
||||
effective.push(item.clone());
|
||||
}
|
||||
effective
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::codex::make_session_and_context;
|
||||
use crate::compact::SUMMARY_PREFIX;
|
||||
use crate::protocol::CompactedItem;
|
||||
use assert_matches::assert_matches;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
@@ -100,6 +282,28 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn system_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "system".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn user_input_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncates_rollout_from_start_before_nth_user_only() {
|
||||
let items = [
|
||||
@@ -188,6 +392,133 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
fn effective_user_turns_use_replacement_history_for_compaction() {
    // A compaction with replacement history should supersede the turns before it.
    let replacement_history = vec![
        user_input_msg("r1"),
        assistant_msg("ra1"),
        user_input_msg("r2"),
    ];
    let rollout_items = vec![
        RolloutItem::ResponseItem(user_input_msg("u1")),
        RolloutItem::ResponseItem(assistant_msg("a1")),
        RolloutItem::Compacted(CompactedItem {
            message: "ignored summary".to_string(),
            replacement_history: Some(replacement_history),
        }),
        RolloutItem::ResponseItem(user_input_msg("u2")),
    ];

    let turns = effective_user_turns(&rollout_items);

    let texts: Vec<&str> = turns.iter().map(|turn| turn.text.as_str()).collect();
    assert_eq!(texts, ["r1", "r2", "u2"]);

    // Replacement turns are attributed to the compaction item's index.
    let indices: Vec<usize> = turns.iter().map(|turn| turn.source_index).collect();
    assert_eq!(indices, [2, 2, 3]);
}
|
||||
|
||||
#[test]
fn truncate_rollout_drop_last_n_user_turns_oversize_clears_user_history() {
    let system = system_msg("system");
    let compaction = RolloutItem::Compacted(CompactedItem {
        message: format!("{SUMMARY_PREFIX}\nsummary"),
        replacement_history: None,
    });
    let rollout_items = vec![
        RolloutItem::ResponseItem(system.clone()),
        RolloutItem::ResponseItem(user_input_msg("u1")),
        RolloutItem::ResponseItem(assistant_msg("a1")),
        RolloutItem::ResponseItem(user_input_msg("u2")),
        RolloutItem::ResponseItem(assistant_msg("a2")),
        compaction,
    ];

    // Asking for far more turns than exist leaves only what preceded the first turn.
    let truncated = truncate_rollout_drop_last_n_user_turns(&rollout_items, 99);

    let expected = vec![RolloutItem::ResponseItem(system)];
    let actual_json = serde_json::to_value(&truncated).unwrap();
    let expected_json = serde_json::to_value(&expected).unwrap();
    assert_eq!(actual_json, expected_json);
}
|
||||
|
||||
#[test]
fn truncate_rollout_drop_last_n_user_turns_truncates_replacement_history() {
    // Wraps a replacement history in a compaction item with a fixed message.
    let compaction_with = |replacement: Vec<ResponseItem>| {
        RolloutItem::Compacted(CompactedItem {
            message: "ignored".to_string(),
            replacement_history: Some(replacement),
        })
    };

    let rollout_items = vec![
        RolloutItem::ResponseItem(user_input_msg("pre")),
        compaction_with(vec![
            user_input_msg("u1"),
            assistant_msg("a1"),
            user_input_msg("u2"),
            assistant_msg("a2"),
        ]),
        RolloutItem::ResponseItem(user_input_msg("u3")),
        RolloutItem::ResponseItem(assistant_msg("a3")),
    ];

    // Dropping two turns removes `u3` and reaches `u2` inside the replacement history.
    let truncated = truncate_rollout_drop_last_n_user_turns(&rollout_items, 2);

    let expected = vec![
        RolloutItem::ResponseItem(user_input_msg("pre")),
        compaction_with(vec![user_input_msg("u1"), assistant_msg("a1")]),
    ];
    let actual_json = serde_json::to_value(&truncated).unwrap();
    let expected_json = serde_json::to_value(&expected).unwrap();
    assert_eq!(actual_json, expected_json);
}
|
||||
|
||||
#[test]
fn effective_user_turns_skip_prior_compaction_summaries() {
    // Builds a compaction marker without replacement history.
    let compaction = |message: String| {
        RolloutItem::Compacted(CompactedItem {
            message,
            replacement_history: None,
        })
    };
    let summary_one = format!("{SUMMARY_PREFIX}\nsummary one");
    let summary_two = format!("{SUMMARY_PREFIX}\nsummary two");

    let rollout_items = vec![
        RolloutItem::ResponseItem(user_input_msg("u1")),
        compaction(summary_one),
        RolloutItem::ResponseItem(user_input_msg("u2")),
        compaction(summary_two.clone()),
    ];

    let turns = effective_user_turns(&rollout_items);

    // Only the latest summary survives; the first one is filtered out.
    let texts: Vec<&str> = turns.iter().map(|turn| turn.text.as_str()).collect();
    assert_eq!(texts, ["u1", "u2", summary_two.as_str()]);
}
|
||||
|
||||
#[test]
fn effective_user_turns_use_placeholder_when_summary_empty() {
    let empty_summary_compaction = RolloutItem::Compacted(CompactedItem {
        message: String::new(),
        replacement_history: None,
    });
    let rollout_items = vec![
        RolloutItem::ResponseItem(user_input_msg("u1")),
        RolloutItem::ResponseItem(assistant_msg("a1")),
        empty_summary_compaction,
    ];

    let turns = effective_user_turns(&rollout_items);

    // An empty summary is represented by a fixed placeholder text.
    let texts: Vec<&str> = turns.iter().map(|turn| turn.text.as_str()).collect();
    assert_eq!(texts, ["u1", "(no summary available)"]);

    let indices: Vec<usize> = turns.iter().map(|turn| turn.source_index).collect();
    assert_eq!(indices, [0, 2]);
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ignores_session_prefix_messages_when_truncating_rollout_from_start() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
|
||||
@@ -145,6 +145,119 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_compact_rollback_restores_pre_compact_history() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let harness = TestCodexHarness::with_builder(
|
||||
test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(|config| {
|
||||
config.features.enable(Feature::RemoteCompaction);
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let codex = harness.test().codex.clone();
|
||||
|
||||
let responses_mock = responses::mount_sse_sequence(
|
||||
harness.server(),
|
||||
vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("m3", "AFTER_ROLLBACK_REPLY"),
|
||||
responses::ev_completed("resp-3"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let compacted_history = vec![
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: "REMOTE_COMPACTED_SUMMARY".to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
},
|
||||
ResponseItem::Compaction {
|
||||
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
|
||||
},
|
||||
];
|
||||
responses::mount_compact_json_once(
|
||||
harness.server(),
|
||||
serde_json::json!({ "output": compacted_history }),
|
||||
)
|
||||
.await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello remote compact".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex.submit(Op::Compact).await?;
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "after compact".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
codex.submit(Op::ThreadRollback { num_turns: 2 }).await?;
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ThreadRolledBack(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "after rollback".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let after_rollback_body = responses_mock
|
||||
.requests()
|
||||
.last()
|
||||
.expect("after rollback request missing")
|
||||
.body_json()
|
||||
.to_string();
|
||||
assert!(
|
||||
after_rollback_body.contains("FIRST_REMOTE_REPLY"),
|
||||
"expected after-rollback request to include pre-compaction assistant reply"
|
||||
);
|
||||
assert!(
|
||||
!after_rollback_body.contains("REMOTE_COMPACTED_SUMMARY"),
|
||||
"expected after-rollback request to drop compaction summary"
|
||||
);
|
||||
assert!(
|
||||
!after_rollback_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
|
||||
"expected after-rollback request to drop compaction encrypted content"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_compact_runs_automatically() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -34,6 +34,7 @@ use tempfile::TempDir;
|
||||
use wiremock::MockServer;
|
||||
|
||||
const AFTER_SECOND_RESUME: &str = "AFTER_SECOND_RESUME";
|
||||
const AFTER_ROLLBACK: &str = "AFTER_ROLLBACK";
|
||||
|
||||
fn network_disabled() -> bool {
|
||||
std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok()
|
||||
@@ -655,6 +656,39 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
|
||||
assert_eq!(json!(requests), expected);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
/// Scenario: compact a conversation, rollback over the compaction marker, and ensure
|
||||
/// the next turn includes the pre-compaction assistant history.
|
||||
async fn rollback_over_compaction_restores_pre_compact_history() {
|
||||
if network_disabled() {
|
||||
println!("Skipping test because network is disabled in this sandbox");
|
||||
return;
|
||||
}
|
||||
|
||||
let server = MockServer::start().await;
|
||||
let request_log = mount_compact_rollback_flow(&server).await;
|
||||
let (_home, _config, _manager, base) = start_test_conversation(&server, None).await;
|
||||
|
||||
user_turn(&base, "hello world").await;
|
||||
compact_conversation(&base).await;
|
||||
user_turn(&base, "AFTER_COMPACT").await;
|
||||
|
||||
base.submit(Op::ThreadRollback { num_turns: 2 })
|
||||
.await
|
||||
.expect("submit rollback");
|
||||
wait_for_event(&base, |ev| matches!(ev, EventMsg::ThreadRolledBack(_))).await;
|
||||
|
||||
user_turn(&base, AFTER_ROLLBACK).await;
|
||||
|
||||
let requests = gather_request_bodies(&request_log);
|
||||
let after_rollback_body = requests.last().expect("after rollback request");
|
||||
let body_text = after_rollback_body.to_string();
|
||||
assert!(
|
||||
body_text.contains(&json_fragment(FIRST_REPLY)),
|
||||
"expected after-rollback request to include first assistant reply"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
/// Scenario: after the forked branch is compacted, resuming again should reuse
|
||||
/// the compacted history and only append the new user message.
|
||||
@@ -925,6 +959,52 @@ async fn mount_initial_flow(server: &MockServer) -> Vec<ResponseMock> {
|
||||
vec![first, compact, after_compact, after_resume, after_fork]
|
||||
}
|
||||
|
||||
async fn mount_compact_rollback_flow(server: &MockServer) -> Vec<ResponseMock> {
|
||||
let sse1 = sse(vec![
|
||||
ev_assistant_message("m1", FIRST_REPLY),
|
||||
ev_completed("r1"),
|
||||
]);
|
||||
let sse2 = sse(vec![
|
||||
ev_assistant_message("m2", SUMMARY_TEXT),
|
||||
ev_completed("r2"),
|
||||
]);
|
||||
let sse3 = sse(vec![
|
||||
ev_assistant_message("m3", "AFTER_COMPACT_REPLY"),
|
||||
ev_completed("r3"),
|
||||
]);
|
||||
let sse4 = sse(vec![ev_completed("r4")]);
|
||||
|
||||
let match_first = |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
body.contains("\"text\":\"hello world\"")
|
||||
&& !body.contains(&format!("\"text\":\"{SUMMARY_TEXT}\""))
|
||||
&& !body.contains("\"text\":\"AFTER_COMPACT\"")
|
||||
&& !body.contains(&format!("\"text\":\"{AFTER_ROLLBACK}\""))
|
||||
};
|
||||
let first = mount_sse_once_match(server, match_first, sse1).await;
|
||||
|
||||
let match_compact = |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
body_contains_text(body, SUMMARIZATION_PROMPT) || body.contains(&json_fragment(FIRST_REPLY))
|
||||
};
|
||||
let compact = mount_sse_once_match(server, match_compact, sse2).await;
|
||||
|
||||
let match_after_compact = |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
body.contains("\"text\":\"AFTER_COMPACT\"")
|
||||
&& !body.contains(&format!("\"text\":\"{AFTER_ROLLBACK}\""))
|
||||
};
|
||||
let after_compact = mount_sse_once_match(server, match_after_compact, sse3).await;
|
||||
|
||||
let match_after_rollback = |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
body.contains(&format!("\"text\":\"{AFTER_ROLLBACK}\""))
|
||||
};
|
||||
let after_rollback = mount_sse_once_match(server, match_after_rollback, sse4).await;
|
||||
|
||||
vec![first, compact, after_compact, after_rollback]
|
||||
}
|
||||
|
||||
async fn mount_second_compact_flow(server: &MockServer) -> Vec<ResponseMock> {
|
||||
let sse6 = sse(vec![
|
||||
ev_assistant_message("m4", SUMMARY_TEXT),
|
||||
|
||||
Reference in New Issue
Block a user