diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index dcfaed6893..c10ec3a74f 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -75,6 +75,7 @@ async fn run_compact_task_inner( let extra_items: Vec = vec![initial_input_for_turn.clone().into()]; let mut turn_input = sess.turn_input_with_history(extra_items.clone()).await; let mut truncated_count = 0usize; + let mut trimmed_tails: Vec> = Vec::new(); let max_retries = turn_context.client.get_provider().stream_max_retries(); let mut retries = 0; @@ -120,8 +121,9 @@ async fn run_compact_task_inner( let mut prompt_items = turn_input.split_off(history_len); let trimmed = trim_recent_history_to_previous_user_message(&mut turn_input); turn_input.append(&mut prompt_items); - if trimmed > 0 { - truncated_count += trimmed; + if !trimmed.is_empty() { + truncated_count += trimmed.len(); + trimmed_tails.push(trimmed); retries = 0; continue; } @@ -166,7 +168,10 @@ async fn run_compact_task_inner( let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default(); let user_messages = collect_user_messages(&history_snapshot); let initial_context = sess.build_initial_context(turn_context.as_ref()); - let new_history = build_compacted_history(initial_context, &user_messages, &summary_text); + let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text); + for mut trimmed in trimmed_tails.into_iter().rev() { + new_history.append(&mut trimmed); + } sess.replace_history(new_history).await; let rollout_item = RolloutItem::Compacted(CompactedItem { @@ -184,22 +189,24 @@ async fn run_compact_task_inner( } /// Trim conversation history back to the previous user message boundary, removing that user turn. -fn trim_recent_history_to_previous_user_message(turn_input: &mut Vec) -> usize { +/// +/// Returns the removed items in their original order so they can be restored later. +fn trim_recent_history_to_previous_user_message( + turn_input: &mut Vec, +) -> Vec { if turn_input.is_empty() { - return 0; + return Vec::new(); } - let original_len = turn_input.len(); if let Some(last_user_index) = turn_input.iter().rposition(|item| { matches!( item, ResponseItem::Message { role, .. } if role == "user" ) }) { - turn_input.truncate(last_user_index); + turn_input.split_off(last_user_index) } else { - turn_input.clear(); + turn_input.drain(..).collect() } - original_len.saturating_sub(turn_input.len()) } pub fn content_items_to_text(content: &[ContentItem]) -> Option { diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index a59e5172fd..a16f4d5734 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -943,6 +943,181 @@ async fn manual_compact_trims_last_user_turn_with_function_calls_on_context_erro ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn manual_compact_restores_trimmed_tail_after_retry() { + skip_if_no_network!(); + + const FIRST_USER_MSG: &str = "first user turn"; + const FIRST_ASSISTANT_MSG: &str = "first assistant reply"; + const SECOND_USER_MSG: &str = "second user turn"; + const SECOND_ASSISTANT_MSG: &str = "second assistant reply"; + const THIRD_USER_MSG: &str = "post compact user"; + const THIRD_ASSISTANT_MSG: &str = "post compact assistant"; + + let server = start_mock_server().await; + + let first_turn = sse(vec![ + ev_assistant_message("assistant-first", FIRST_ASSISTANT_MSG), + ev_completed("resp-first"), + ]); + let second_turn = sse(vec![ + ev_assistant_message("assistant-second", SECOND_ASSISTANT_MSG), + ev_completed("resp-second"), + ]); + let compact_failed = sse_failed( + "resp-fail", + "context_length_exceeded", + CONTEXT_LIMIT_MESSAGE, + ); + let compact_retry = sse(vec![ + ev_assistant_message("assistant-summary", SUMMARY_TEXT), + ev_completed("resp-summary"), + ]); + let third_turn = sse(vec![ + ev_assistant_message("assistant-third", THIRD_ASSISTANT_MSG), + ev_completed("resp-third"), + ]); + + let request_log = mount_sse_sequence( + &server, + vec![ + first_turn, + second_turn, + compact_failed, + compact_retry, + third_turn, + ], + ) + .await; + + let model_provider = ModelProviderInfo { + base_url: Some(format!("{}/v1", server.uri())), + ..built_in_model_providers()["openai"].clone() + }; + + let home = TempDir::new().unwrap(); + let mut config = load_default_config_for_test(&home); + config.model_provider = model_provider; + config.model_auto_compact_token_limit = Some(200_000); + let codex = ConversationManager::with_auth(CodexAuth::from_api_key("dummy")) + .new_conversation(config) + .await + .unwrap() + .conversation; + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: FIRST_USER_MSG.into(), + }], + }) + .await + .unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: SECOND_USER_MSG.into(), + }], + }) + .await + .unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex.submit(Op::Compact).await.unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: THIRD_USER_MSG.into(), + }], + }) + .await + .unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + let requests = request_log.requests(); + assert_eq!( + requests.len(), + 5, + "expected two user turns, two compact attempts, and a post-compact turn", + ); + + let retry_request = &requests[3]; + let retry_body = retry_request.body_json(); + let retry_input = retry_body + .get("input") + .and_then(Value::as_array) + .unwrap_or_else(|| panic!("retry request missing input array: {retry_body}")); + assert!( + retry_input.iter().all(|item| { + item.get("content") + .and_then(Value::as_array) + .and_then(|entries| entries.first()) + .and_then(|entry| entry.get("text")) + .and_then(Value::as_str) + .map(|text| text != SECOND_USER_MSG && text != SECOND_ASSISTANT_MSG) + .unwrap_or(true) + }), + "retry compact input should omit trimmed second turn", + ); + + let final_request = &requests[4]; + let body = final_request.body_json(); + let input_items = body + .get("input") + .and_then(Value::as_array) + .unwrap_or_else(|| panic!("final request missing input array: {body}")); + + fn message_index(items: &[Value], needle: &str) -> Option { + items.iter().position(|item| { + item.get("type").and_then(Value::as_str) == Some("message") + && item + .get("content") + .and_then(Value::as_array) + .and_then(|entries| entries.first()) + .and_then(|entry| entry.get("text")) + .and_then(Value::as_str) + .is_some_and(|text| text == needle) + }) + } + + let summary_index = input_items + .iter() + .position(|item| { + item.get("content") + .and_then(Value::as_array) + .and_then(|entries| entries.first()) + .and_then(|entry| entry.get("text")) + .and_then(Value::as_str) + .map(|text| text.contains(SUMMARY_TEXT)) + .unwrap_or(false) + }) + .expect("final request should include summary bridge"); + + let second_user_index = message_index(input_items, SECOND_USER_MSG) + .expect("trimmed second user message should remain in history"); + let second_assistant_index = message_index(input_items, SECOND_ASSISTANT_MSG) + .expect("trimmed assistant reply should remain in history"); + let third_user_index = message_index(input_items, THIRD_USER_MSG) + .expect("post-compact user turn should be present"); + + assert!( + summary_index < second_user_index, + "summary bridge should precede restored user message" + ); + assert!( + second_user_index < second_assistant_index, + "restored user message should precede assistant reply" + ); + assert!( + second_assistant_index < third_user_index, + "restored assistant reply should precede new user turn" + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_events() { skip_if_no_network!();