use std::sync::Arc;

use crate::ModelProviderInfo;
use crate::Prompt;
use crate::client::ModelClientSession;
use crate::client_common::ResponseEvent;
#[cfg(test)]
use crate::codex::PreviousTurnSettings;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::get_last_assistant_message_from_turn;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::CompactedItem;
use crate::protocol::EventMsg;
use crate::protocol::TurnStartedEvent;
use crate::protocol::WarningEvent;
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
use crate::util::backoff;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
use futures::prelude::*;
use tracing::error;

/// Compaction (summarization) prompt text, embedded at compile time from
/// `templates/compact/prompt.md`.
// NOTE(review): not referenced elsewhere in this file; presumably the default value behind
// `TurnContext::compact_prompt()` — confirm against the caller.
pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt.md");
/// Prefix that, when followed by `\n`, marks a user message as a compaction summary.
///
/// Compaction writes summaries as `"{SUMMARY_PREFIX}\n{summary}"`, and `is_summary_message`
/// recognizes them by this prefix. Embedded at compile time from
/// `templates/compact/summary_prefix.md`.
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
/// Approximate token budget for the most recent user messages that are carried verbatim
/// into a compacted history (consumed by `build_compacted_history`).
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;

/// Controls whether compaction replacement history must include initial context.
///
/// Pre-turn/manual compaction variants use `DoNotInject`: they replace history with a summary and
/// clear `reference_context_item`, so the next regular turn will fully reinject initial context
/// after compaction.
///
/// Mid-turn compaction must use `BeforeLastUserMessage` because the model is trained to see the
/// compaction summary as the last item in history after mid-turn compaction; we therefore inject
/// initial context into the replacement history just above the last real user message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub(crate) enum InitialContextInjection { BeforeLastUserMessage, DoNotInject, } pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bool { provider.is_openai() } pub(crate) async fn run_inline_auto_compact_task( sess: Arc, turn_context: Arc, initial_context_injection: InitialContextInjection, ) -> CodexResult<()> { let prompt = turn_context.compact_prompt().to_string(); let input = vec![UserInput::Text { text: prompt, // Compaction prompt is synthesized; no UI element ranges to preserve. text_elements: Vec::new(), }]; run_compact_task_inner(sess, turn_context, input, initial_context_injection).await?; Ok(()) } pub(crate) async fn run_compact_task( sess: Arc, turn_context: Arc, input: Vec, ) -> CodexResult<()> { let start_event = EventMsg::TurnStarted(TurnStartedEvent { turn_id: turn_context.sub_id.clone(), model_context_window: turn_context.model_context_window(), collaboration_mode_kind: turn_context.collaboration_mode.mode, }); sess.send_event(&turn_context, start_event).await; run_compact_task_inner( sess.clone(), turn_context, input, InitialContextInjection::DoNotInject, ) .await } async fn run_compact_task_inner( sess: Arc, turn_context: Arc, input: Vec, initial_context_injection: InitialContextInjection, ) -> CodexResult<()> { let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); sess.emit_turn_item_started(&turn_context, &compaction_item) .await; let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); let mut history = sess.clone_history().await; history.record_items( &[initial_input_for_turn.into()], turn_context.truncation_policy, ); let mut truncated_count = 0usize; let max_retries = turn_context.provider.stream_max_retries(); let mut retries = 0; let mut client_session = sess.services.model_client.new_session(); // Reuse one client session so turn-scoped state (sticky routing, websocket append tracking) // survives retries within this 
compact turn. loop { // Clone is required because of the loop let turn_input = history .clone() .for_prompt(&turn_context.model_info.input_modalities); let turn_input_len = turn_input.len(); let prompt = Prompt { input: turn_input, base_instructions: sess.get_base_instructions().await, personality: turn_context.personality, ..Default::default() }; let turn_metadata_header = turn_context.turn_metadata_state.current_header_value(); let attempt_result = drain_to_completed( &sess, turn_context.as_ref(), &mut client_session, turn_metadata_header.as_deref(), &prompt, ) .await; match attempt_result { Ok(()) => { if truncated_count > 0 { sess.notify_background_event( turn_context.as_ref(), format!( "Trimmed {truncated_count} older thread item(s) before compacting so the prompt fits the model context window." ), ) .await; } break; } Err(CodexErr::Interrupted) => { return Err(CodexErr::Interrupted); } Err(e @ CodexErr::ContextWindowExceeded) => { if turn_input_len > 1 { // Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact. error!( "Context window exceeded while compacting; removing oldest history item. Error: {e}" ); history.remove_first_item(); truncated_count += 1; retries = 0; continue; } sess.set_total_tokens_full(turn_context.as_ref()).await; let event = EventMsg::Error(e.to_error_event(None)); sess.send_event(&turn_context, event).await; return Err(e); } Err(e) => { if retries < max_retries { retries += 1; let delay = backoff(retries); sess.notify_stream_error( turn_context.as_ref(), format!("Reconnecting... 
{retries}/{max_retries}"), e, ) .await; tokio::time::sleep(delay).await; continue; } else { let event = EventMsg::Error(e.to_error_event(None)); sess.send_event(&turn_context, event).await; return Err(e); } } } } let history_snapshot = sess.clone_history().await; let history_items = history_snapshot.raw_items(); let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default(); let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}"); let user_messages = collect_user_messages(history_items); let mut new_history = build_compacted_history(Vec::new(), &user_messages, &summary_text); if matches!( initial_context_injection, InitialContextInjection::BeforeLastUserMessage ) { let initial_context = sess.build_initial_context(turn_context.as_ref()).await; new_history = insert_initial_context_before_last_real_user_or_summary(new_history, initial_context); } let ghost_snapshots: Vec = history_items .iter() .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) .cloned() .collect(); new_history.extend(ghost_snapshots); let reference_context_item = match initial_context_injection { InitialContextInjection::DoNotInject => None, InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()), }; let compacted_item = CompactedItem { message: summary_text.clone(), replacement_history: Some(new_history.clone()), }; sess.replace_compacted_history(new_history, reference_context_item, compacted_item) .await; sess.recompute_token_usage(&turn_context).await; sess.emit_turn_item_completed(&turn_context, compaction_item) .await; let warning = EventMsg::Warning(WarningEvent { message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. 
Start a new thread when possible to keep threads small and targeted.".to_string(), }); sess.send_event(&turn_context, warning).await; Ok(()) } pub fn content_items_to_text(content: &[ContentItem]) -> Option { let mut pieces = Vec::new(); for item in content { match item { ContentItem::InputText { text } | ContentItem::OutputText { text } => { if !text.is_empty() { pieces.push(text.as_str()); } } ContentItem::InputImage { .. } => {} } } if pieces.is_empty() { None } else { Some(pieces.join("\n")) } } pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec { items .iter() .filter_map(|item| match crate::event_mapping::parse_turn_item(item) { Some(TurnItem::UserMessage(user)) => { if is_summary_message(&user.message()) { None } else { Some(user.message()) } } _ => None, }) .collect() } pub(crate) fn is_summary_message(message: &str) -> bool { message.starts_with(format!("{SUMMARY_PREFIX}\n").as_str()) } /// Inserts canonical initial context into compacted replacement history at the /// model-expected boundary. /// /// Placement rules: /// - Prefer immediately before the last real user message. /// - If no real user messages remain, insert before the compaction summary so /// the summary stays last. /// - If there are no user messages, insert before the last compaction item so /// that item remains last (remote compaction may return only compaction items). /// - If there are no user messages or compaction items, append the context. 
pub(crate) fn insert_initial_context_before_last_real_user_or_summary( mut compacted_history: Vec, initial_context: Vec, ) -> Vec { let mut last_user_or_summary_index = None; let mut last_real_user_index = None; for (i, item) in compacted_history.iter().enumerate().rev() { let Some(TurnItem::UserMessage(user)) = crate::event_mapping::parse_turn_item(item) else { continue; }; // Compaction summaries are encoded as user messages, so track both: // the last real user message (preferred insertion point) and the last // user-message-like item (fallback summary insertion point). last_user_or_summary_index.get_or_insert(i); if !is_summary_message(&user.message()) { last_real_user_index = Some(i); break; } } let last_compaction_index = compacted_history .iter() .enumerate() .rev() .find_map(|(i, item)| matches!(item, ResponseItem::Compaction { .. }).then_some(i)); let insertion_index = last_real_user_index .or(last_user_or_summary_index) .or(last_compaction_index); // Re-inject canonical context from the current session since we stripped it // from the pre-compaction history. Prefer placing it before the last real // user message; if there is no real user message left, place it before the // summary or compaction item so the compaction item remains last. 
if let Some(insertion_index) = insertion_index { compacted_history.splice(insertion_index..insertion_index, initial_context); } else { compacted_history.extend(initial_context); } compacted_history } pub(crate) fn build_compacted_history( initial_context: Vec, user_messages: &[String], summary_text: &str, ) -> Vec { build_compacted_history_with_limit( initial_context, user_messages, summary_text, COMPACT_USER_MESSAGE_MAX_TOKENS, ) } fn build_compacted_history_with_limit( mut history: Vec, user_messages: &[String], summary_text: &str, max_tokens: usize, ) -> Vec { let mut selected_messages: Vec = Vec::new(); if max_tokens > 0 { let mut remaining = max_tokens; for message in user_messages.iter().rev() { if remaining == 0 { break; } let tokens = approx_token_count(message); if tokens <= remaining { selected_messages.push(message.clone()); remaining = remaining.saturating_sub(tokens); } else { let truncated = truncate_text(message, TruncationPolicy::Tokens(remaining)); selected_messages.push(truncated); break; } } selected_messages.reverse(); } for message in &selected_messages { history.push(ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: message.clone(), }], end_turn: None, phase: None, }); } let summary_text = if summary_text.is_empty() { "(no summary available)".to_string() } else { summary_text.to_string() }; history.push(ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: summary_text }], end_turn: None, phase: None, }); history } async fn drain_to_completed( sess: &Session, turn_context: &TurnContext, client_session: &mut ModelClientSession, turn_metadata_header: Option<&str>, prompt: &Prompt, ) -> CodexResult<()> { let mut stream = client_session .stream( prompt, &turn_context.model_info, &turn_context.otel_manager, turn_context.reasoning_effort, turn_context.reasoning_summary, turn_context.config.service_tier, turn_metadata_header, ) .await?; loop { 
let maybe_event = stream.next().await; let Some(event) = maybe_event else { return Err(CodexErr::Stream( "stream closed before response.completed".into(), None, )); }; match event { Ok(ResponseEvent::OutputItemDone(item)) => { sess.record_into_history(std::slice::from_ref(&item), turn_context) .await; } Ok(ResponseEvent::ServerReasoningIncluded(included)) => { sess.set_server_reasoning_included(included).await; } Ok(ResponseEvent::RateLimits(snapshot)) => { sess.update_rate_limits(turn_context, snapshot).await; } Ok(ResponseEvent::Completed { token_usage, .. }) => { sess.update_token_usage_info(turn_context, token_usage.as_ref()) .await; return Ok(()); } Ok(_) => continue, Err(e) => return Err(e), } } } #[cfg(test)] mod tests { use super::*; use pretty_assertions::assert_eq; async fn process_compacted_history_with_test_session( compacted_history: Vec, previous_turn_settings: Option<&PreviousTurnSettings>, ) -> (Vec, Vec) { let (session, turn_context) = crate::codex::make_session_and_context().await; session .set_previous_turn_settings(previous_turn_settings.cloned()) .await; let initial_context = session.build_initial_context(&turn_context).await; let refreshed = crate::compact_remote::process_compacted_history( &session, &turn_context, compacted_history, InitialContextInjection::BeforeLastUserMessage, ) .await; (refreshed, initial_context) } #[test] fn content_items_to_text_joins_non_empty_segments() { let items = vec![ ContentItem::InputText { text: "hello".to_string(), }, ContentItem::OutputText { text: String::new(), }, ContentItem::OutputText { text: "world".to_string(), }, ]; let joined = content_items_to_text(&items); assert_eq!(Some("hello\nworld".to_string()), joined); } #[test] fn content_items_to_text_ignores_image_only_content() { let items = vec![ContentItem::InputImage { image_url: "file://image.png".to_string(), }]; let joined = content_items_to_text(&items); assert_eq!(None, joined); } #[test] fn collect_user_messages_extracts_user_text_only() { let 
items = vec![ ResponseItem::Message { id: Some("assistant".to_string()), role: "assistant".to_string(), content: vec![ContentItem::OutputText { text: "ignored".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: Some("user".to_string()), role: "user".to_string(), content: vec![ContentItem::InputText { text: "first".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Other, ]; let collected = collect_user_messages(&items); assert_eq!(vec!["first".to_string()], collected); } #[test] fn collect_user_messages_filters_session_prefix_entries() { let items = vec![ ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: r#"# AGENTS.md instructions for project do things "# .to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "cwd=/tmp".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "real user message".to_string(), }], end_turn: None, phase: None, }, ]; let collected = collect_user_messages(&items); assert_eq!(vec!["real user message".to_string()], collected); } #[test] fn build_token_limited_compacted_history_truncates_overlong_user_messages() { // Use a small truncation limit so the test remains fast while still validating // that oversized user content is truncated. let max_tokens = 16; let big = "word ".repeat(200); let history = super::build_compacted_history_with_limit( Vec::new(), std::slice::from_ref(&big), "SUMMARY", max_tokens, ); assert_eq!(history.len(), 2); let truncated_message = &history[0]; let summary_message = &history[1]; let truncated_text = match truncated_message { ResponseItem::Message { role, content, .. 
} if role == "user" => { content_items_to_text(content).unwrap_or_default() } other => panic!("unexpected item in history: {other:?}"), }; assert!( truncated_text.contains("tokens truncated"), "expected truncation marker in truncated user message" ); assert!( !truncated_text.contains(&big), "truncated user message should not include the full oversized user text" ); let summary_text = match summary_message { ResponseItem::Message { role, content, .. } if role == "user" => { content_items_to_text(content).unwrap_or_default() } other => panic!("unexpected item in history: {other:?}"), }; assert_eq!(summary_text, "SUMMARY"); } #[test] fn build_token_limited_compacted_history_appends_summary_message() { let initial_context: Vec = Vec::new(); let user_messages = vec!["first user message".to_string()]; let summary_text = "summary text"; let history = build_compacted_history(initial_context, &user_messages, summary_text); assert!( !history.is_empty(), "expected compacted history to include summary" ); let last = history.last().expect("history should have a summary entry"); let summary = match last { ResponseItem::Message { role, content, .. 
} if role == "user" => { content_items_to_text(content).unwrap_or_default() } other => panic!("expected summary message, found {other:?}"), }; assert_eq!(summary, summary_text); } #[tokio::test] async fn process_compacted_history_replaces_developer_messages() { let compacted_history = vec![ ResponseItem::Message { id: None, role: "developer".to_string(), content: vec![ContentItem::InputText { text: "stale permissions".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "summary".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "developer".to_string(), content: vec![ContentItem::InputText { text: "stale personality".to_string(), }], end_turn: None, phase: None, }, ]; let (refreshed, mut expected) = process_compacted_history_with_test_session(compacted_history, None).await; expected.push(ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "summary".to_string(), }], end_turn: None, phase: None, }); assert_eq!(refreshed, expected); } #[tokio::test] async fn process_compacted_history_reinjects_full_initial_context() { let compacted_history = vec![ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "summary".to_string(), }], end_turn: None, phase: None, }]; let (refreshed, mut expected) = process_compacted_history_with_test_session(compacted_history, None).await; expected.push(ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "summary".to_string(), }], end_turn: None, phase: None, }); assert_eq!(refreshed, expected); } #[tokio::test] async fn process_compacted_history_drops_non_user_content_messages() { let compacted_history = vec![ ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: r#"# AGENTS.md instructions for /repo 
keep me updated "# .to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: r#" /repo zsh "# .to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: r#" turn-1 interrupted "# .to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "summary".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "developer".to_string(), content: vec![ContentItem::InputText { text: "stale developer instructions".to_string(), }], end_turn: None, phase: None, }, ]; let (refreshed, mut expected) = process_compacted_history_with_test_session(compacted_history, None).await; expected.push(ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "summary".to_string(), }], end_turn: None, phase: None, }); assert_eq!(refreshed, expected); } #[tokio::test] async fn process_compacted_history_inserts_context_before_last_real_user_message_only() { let compacted_history = vec![ ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "older user".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: format!("{SUMMARY_PREFIX}\nsummary text"), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "latest user".to_string(), }], end_turn: None, phase: None, }, ]; let (refreshed, initial_context) = process_compacted_history_with_test_session(compacted_history, None).await; let mut expected = vec![ ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: 
"older user".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: format!("{SUMMARY_PREFIX}\nsummary text"), }], end_turn: None, phase: None, }, ]; expected.extend(initial_context); expected.push(ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "latest user".to_string(), }], end_turn: None, phase: None, }); assert_eq!(refreshed, expected); } #[tokio::test] async fn process_compacted_history_reinjects_model_switch_message() { let compacted_history = vec![ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "summary".to_string(), }], end_turn: None, phase: None, }]; let previous_turn_settings = PreviousTurnSettings { model: "previous-regular-model".to_string(), realtime_active: None, }; let (refreshed, initial_context) = process_compacted_history_with_test_session( compacted_history, Some(&previous_turn_settings), ) .await; let ResponseItem::Message { role, content, .. } = &initial_context[0] else { panic!("expected developer message"); }; assert_eq!(role, "developer"); let [ContentItem::InputText { text }, ..] 
= content.as_slice() else { panic!("expected developer text"); }; assert!(text.contains("")); let mut expected = initial_context; expected.push(ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "summary".to_string(), }], end_turn: None, phase: None, }); assert_eq!(refreshed, expected); } #[test] fn insert_initial_context_before_last_real_user_or_summary_keeps_summary_last() { let compacted_history = vec![ ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "older user".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "latest user".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: format!("{SUMMARY_PREFIX}\nsummary text"), }], end_turn: None, phase: None, }, ]; let initial_context = vec![ResponseItem::Message { id: None, role: "developer".to_string(), content: vec![ContentItem::InputText { text: "fresh permissions".to_string(), }], end_turn: None, phase: None, }]; let refreshed = insert_initial_context_before_last_real_user_or_summary( compacted_history, initial_context, ); let expected = vec![ ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "older user".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "developer".to_string(), content: vec![ContentItem::InputText { text: "fresh permissions".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "latest user".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: format!("{SUMMARY_PREFIX}\nsummary text"), }], 
end_turn: None, phase: None, }, ]; assert_eq!(refreshed, expected); } #[test] fn insert_initial_context_before_last_real_user_or_summary_keeps_compaction_last() { let compacted_history = vec![ResponseItem::Compaction { encrypted_content: "encrypted".to_string(), }]; let initial_context = vec![ResponseItem::Message { id: None, role: "developer".to_string(), content: vec![ContentItem::InputText { text: "fresh permissions".to_string(), }], end_turn: None, phase: None, }]; let refreshed = insert_initial_context_before_last_real_user_or_summary( compacted_history, initial_context, ); let expected = vec![ ResponseItem::Message { id: None, role: "developer".to_string(), content: vec![ContentItem::InputText { text: "fresh permissions".to_string(), }], end_turn: None, phase: None, }, ResponseItem::Compaction { encrypted_content: "encrypted".to_string(), }, ]; assert_eq!(refreshed, expected); } }