diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index 6b07e2458c..e42bdb1755 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -2,28 +2,15 @@ use std::sync::Arc; use super::Session; use super::TurnContext; -use super::get_last_assistant_message_from_turn; -use crate::Prompt; -use crate::client_common::ResponseEvent; -use crate::error::CodexErr; -use crate::error::Result as CodexResult; -use crate::protocol::AgentMessageEvent; use crate::protocol::CompactedItem; -use crate::protocol::ErrorEvent; -use crate::protocol::EventMsg; -use crate::protocol::TaskStartedEvent; use crate::protocol::TurnContextItem; use crate::truncate::truncate_middle; -use crate::util::backoff; use askama::Template; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; -use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::RolloutItem; use codex_protocol::user_input::UserInput; -use futures::prelude::*; -use tracing::error; pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md"); const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000; @@ -39,136 +26,54 @@ pub(crate) async fn run_inline_auto_compact_task( sess: Arc, turn_context: Arc, ) { + persist_turn_context_rollout(&sess, &turn_context).await; + let input = vec![UserInput::Text { text: SUMMARIZATION_PROMPT.to_string(), }]; - run_compact_task_inner(sess, turn_context, input).await; -} -pub(crate) async fn run_compact_task( - sess: Arc, - turn_context: Arc, - input: Vec, -) -> Option { - let start_event = EventMsg::TaskStarted(TaskStartedEvent { - model_context_window: turn_context.client.get_model_context_window(), - }); - sess.send_event(&turn_context, start_event).await; - run_compact_task_inner(sess.clone(), turn_context, input).await; - None -} + // Build forked history from parent to seed sub-agent. + let history_snapshot = sess.clone_history().await.get_history(); + let forked: Vec = history_snapshot + .iter() + .cloned() + .map(RolloutItem::ResponseItem) + .collect(); -async fn run_compact_task_inner( - sess: Arc, - turn_context: Arc, - input: Vec, -) { - let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); - - let mut history = sess.clone_history().await; - history.record_items(&[initial_input_for_turn.into()]); - - let mut truncated_count = 0usize; - - let max_retries = turn_context.client.get_provider().stream_max_retries(); - let mut retries = 0; - - let rollout_item = RolloutItem::TurnContext(TurnContextItem { - cwd: turn_context.cwd.clone(), - approval_policy: turn_context.approval_policy, - sandbox_policy: turn_context.sandbox_policy.clone(), - model: turn_context.client.get_model(), - effort: turn_context.client.get_reasoning_effort(), - summary: turn_context.client.get_reasoning_summary(), - }); - sess.persist_rollout_items(&[rollout_item]).await; - - loop { - let turn_input = history.get_history_for_prompt(); - let prompt = Prompt { - input: turn_input.clone(), - ..Default::default() - }; - let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await; - - match attempt_result { - Ok(()) => { - if truncated_count > 0 { - sess.notify_background_event( - turn_context.as_ref(), - format!( - "Trimmed {truncated_count} older conversation item(s) before compacting so the prompt fits the model context window." - ), - ) - .await; - } + // Launch sub-agent one-shot; drain to completion and capture summary. + let config = turn_context.client.config().as_ref().clone(); + let cancel = tokio_util::sync::CancellationToken::new(); + if let Ok(io) = crate::codex_delegate::run_codex_conversation_one_shot_with( + config, + sess.services.auth_manager.clone(), + codex_protocol::protocol::InitialHistory::Forked(forked), + codex_protocol::protocol::SubAgentSource::Compact, + input, + Arc::clone(&sess), + Arc::clone(&turn_context), + cancel, + ) + .await + { + let mut summary_text: Option = None; + while let Ok(event) = io.next_event().await { + if let crate::protocol::EventMsg::TaskComplete(tc) = event.msg { + summary_text = tc.last_agent_message; break; } - Err(CodexErr::Interrupted) => { - return; - } - Err(e @ CodexErr::ContextWindowExceeded) => { - if turn_input.len() > 1 { - // Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact. - error!( - "Context window exceeded while compacting; removing oldest history item. Error: {e}" - ); - history.remove_first_item(); - truncated_count += 1; - retries = 0; - continue; - } - sess.set_total_tokens_full(turn_context.as_ref()).await; - let event = EventMsg::Error(ErrorEvent { - message: e.to_string(), - }); - sess.send_event(&turn_context, event).await; - return; - } - Err(e) => { - if retries < max_retries { - retries += 1; - let delay = backoff(retries); - sess.notify_stream_error( - turn_context.as_ref(), - format!("Reconnecting... {retries}/{max_retries}"), - ) - .await; - tokio::time::sleep(delay).await; - continue; - } else { - let event = EventMsg::Error(ErrorEvent { - message: e.to_string(), - }); - sess.send_event(&turn_context, event).await; - return; - } + if matches!(event.msg, crate::protocol::EventMsg::TurnAborted(_)) { + break; } } + if let Some(summary) = summary_text { + apply_compaction(sess, turn_context, &summary).await; + let event = + crate::protocol::EventMsg::AgentMessage(crate::protocol::AgentMessageEvent { + message: "Compact task completed".to_string(), + }); + sess.send_event(&Arc::clone(&turn_context), event).await; + } } - - let history_snapshot = sess.clone_history().await.get_history(); - let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default(); - let user_messages = collect_user_messages(&history_snapshot); - let initial_context = sess.build_initial_context(turn_context.as_ref()); - let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text); - let ghost_snapshots: Vec = history_snapshot - .iter() - .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) - .cloned() - .collect(); - new_history.extend(ghost_snapshots); - sess.replace_history(new_history).await; - - let rollout_item = RolloutItem::Compacted(CompactedItem { - message: summary_text.clone(), - }); - sess.persist_rollout_items(&[rollout_item]).await; - - let event = EventMsg::AgentMessage(AgentMessageEvent { - message: "Compact task completed".to_string(), - }); - sess.send_event(&turn_context, event).await; } pub fn content_items_to_text(content: &[ContentItem]) -> Option { @@ -249,36 +154,45 @@ fn build_compacted_history_with_limit( history } -async fn drain_to_completed( - sess: &Session, - turn_context: &TurnContext, - prompt: &Prompt, -) -> CodexResult<()> { - let mut stream = turn_context.client.clone().stream(prompt).await?; - loop { - let maybe_event = stream.next().await; - let Some(event) = maybe_event else { - return Err(CodexErr::Stream( - "stream closed before response.completed".into(), - None, - )); - }; - match event { - Ok(ResponseEvent::OutputItemDone(item)) => { - sess.record_into_history(std::slice::from_ref(&item)).await; - } - Ok(ResponseEvent::RateLimits(snapshot)) => { - sess.update_rate_limits(turn_context, snapshot).await; - } - Ok(ResponseEvent::Completed { token_usage, .. }) => { - sess.update_token_usage_info(turn_context, token_usage.as_ref()) - .await; - return Ok(()); - } - Ok(_) => continue, - Err(e) => return Err(e), - } - } +// streaming helpers removed; compact now uses the Codex delegate for sampling. + +/// Apply compaction to the parent session given a summary text: rebuild the +/// conversation with a bridge message, preserve ghost snapshots, persist the +/// Compacted rollout entry, and replace history. +pub(crate) async fn apply_compaction( + sess: Arc, + turn_context: Arc, + summary_text: &str, +) { + let history_snapshot = sess.clone_history().await.get_history(); + let user_messages = collect_user_messages(&history_snapshot); + let initial_context = sess.build_initial_context(turn_context.as_ref()); + let mut new_history = build_compacted_history(initial_context, &user_messages, summary_text); + let ghost_snapshots: Vec = history_snapshot + .iter() + .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) + .cloned() + .collect(); + new_history.extend(ghost_snapshots); + sess.replace_history(new_history).await; + + let rollout_item = RolloutItem::Compacted(CompactedItem { + message: summary_text.to_string(), + }); + sess.persist_rollout_items(&[rollout_item]).await; +} + +/// Persist a TurnContext rollout entry capturing the model/session context. +pub(crate) async fn persist_turn_context_rollout(sess: &Session, turn_context: &TurnContext) { + let rollout_item = RolloutItem::TurnContext(TurnContextItem { + cwd: turn_context.cwd.clone(), + approval_policy: turn_context.approval_policy, + sandbox_policy: turn_context.sandbox_policy.clone(), + model: turn_context.client.get_model(), + effort: turn_context.client.get_reasoning_effort(), + summary: turn_context.client.get_reasoning_summary(), + }); + sess.persist_rollout_items(&[rollout_item]).await; } #[cfg(test)] diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index b123a8a3ff..a935273015 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -36,6 +36,28 @@ pub(crate) async fn run_codex_conversation_interactive( parent_session: Arc, parent_ctx: Arc, cancel_token: CancellationToken, +) -> Result { + run_codex_conversation_interactive_with( + config, + auth_manager, + InitialHistory::New, + SubAgentSource::Review, + parent_session, + parent_ctx, + cancel_token, + ) + .await +} + +/// Start an interactive sub-Codex conversation with custom initial history and source. +pub(crate) async fn run_codex_conversation_interactive_with( + config: Config, + auth_manager: Arc, + initial_history: InitialHistory, + sub_source: SubAgentSource, + parent_session: Arc, + parent_ctx: Arc, + cancel_token: CancellationToken, ) -> Result { let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); @@ -43,8 +65,8 @@ pub(crate) async fn run_codex_conversation_interactive( let CodexSpawnOk { codex, .. } = Codex::spawn( config, auth_manager, - InitialHistory::New, - SessionSource::SubAgent(SubAgentSource::Review), + initial_history, + SessionSource::SubAgent(sub_source), ) .await?; let codex = Arc::new(codex); @@ -93,13 +115,39 @@ pub(crate) async fn run_codex_conversation_one_shot( parent_session: Arc, parent_ctx: Arc, cancel_token: CancellationToken, +) -> Result { + run_codex_conversation_one_shot_with( + config, + auth_manager, + InitialHistory::New, + SubAgentSource::Review, + input, + parent_session, + parent_ctx, + cancel_token, + ) + .await +} + +/// One-shot variant with custom initial history and source. +pub(crate) async fn run_codex_conversation_one_shot_with( + config: Config, + auth_manager: Arc, + initial_history: InitialHistory, + sub_source: SubAgentSource, + input: Vec, + parent_session: Arc, + parent_ctx: Arc, + cancel_token: CancellationToken, ) -> Result { // Use a child token so we can stop the delegate after completion without // requiring the caller to cancel the parent token. let child_cancel = cancel_token.child_token(); - let io = run_codex_conversation_interactive( + let io = run_codex_conversation_interactive_with( config, auth_manager, + initial_history, + sub_source, parent_session, parent_ctx, child_cancel.clone(), diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index 64b2a9d2b4..04669e1e9e 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -5,8 +5,13 @@ use tokio_util::sync::CancellationToken; use crate::codex::TurnContext; use crate::codex::compact; +use crate::codex_delegate::run_codex_conversation_one_shot_with; +use crate::protocol::EventMsg; +use crate::protocol::SubAgentSource; use crate::state::TaskKind; +use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; +use std::sync::Arc; use super::SessionTask; use super::SessionTaskContext; @@ -25,8 +30,68 @@ impl SessionTask for CompactTask { session: Arc, ctx: Arc, input: Vec, - _cancellation_token: CancellationToken, + cancellation_token: CancellationToken, ) -> Option { - compact::run_compact_task(session.clone_session(), ctx, input).await + // Persist a TurnContext entry in the parent rollout so manual compact + // still appears as a separate API turn in rollout, matching prior behavior. + crate::codex::compact::persist_turn_context_rollout( + session.clone_session().as_ref(), + ctx.as_ref(), + ) + .await; + + // Build initial forked history from parent so the sub-agent sees the + // same context without mutating the parent transcript. + let parent_history: Vec = + session.clone_session().clone_history().await.get_history(); + let forked: Vec = parent_history + .into_iter() + .map(RolloutItem::ResponseItem) + .collect(); + + // Start sub-agent one-shot conversation for summarization. + let config = ctx.client.config().as_ref().clone(); + let io = run_codex_conversation_one_shot_with( + config, + session.auth_manager(), + codex_protocol::protocol::InitialHistory::Forked(forked), + SubAgentSource::Compact, + input, + session.clone_session(), + ctx.clone(), + cancellation_token.clone(), + ) + .await; + + // Drain events and capture last_agent_message from TaskComplete. + let mut summary_text: Option = None; + if let Ok(io) = io { + while let Ok(event) = io.next_event().await { + match event.msg { + EventMsg::TaskComplete(done) => { + summary_text = done.last_agent_message; + break; + } + EventMsg::TurnAborted(_) => break, + _ => {} + } + } + } + + // Apply compaction into the parent session if we captured a summary. + if let Some(summary_text) = summary_text { + compact::apply_compaction(session.clone_session(), ctx.clone(), &summary_text).await; + // Inform users that compaction finished. + session + .clone_session() + .send_event( + ctx.as_ref(), + EventMsg::AgentMessage(crate::protocol::AgentMessageEvent { + message: "Compact task completed".to_string(), + }), + ) + .await; + } + None } } diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index b13c6e14fd..d4cdeb0b66 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -146,7 +146,19 @@ async fn compact_resume_and_fork_preserve_model_history_view() { .unwrap_or_default() .to_string(); let tool_calls = json!(requests[0]["tools"].as_array()); - let prompt_cache_key = requests[0]["prompt_cache_key"] + let prompt_cache_key_first = requests[0]["prompt_cache_key"] + .as_str() + .unwrap_or_default() + .to_string(); + let prompt_cache_key_compact = requests[1]["prompt_cache_key"] + .as_str() + .unwrap_or_default() + .to_string(); + let prompt_cache_key_after_compact = requests[2]["prompt_cache_key"] + .as_str() + .unwrap_or_default() + .to_string(); + let prompt_cache_key_after_resume = requests[3]["prompt_cache_key"] .as_str() .unwrap_or_default() .to_string(); @@ -202,7 +214,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() { "include": [ "reasoning.encrypted_content" ], - "prompt_cache_key": prompt_cache_key + "prompt_cache_key": prompt_cache_key_first }); let compact_1 = json!( { @@ -271,7 +283,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() { "include": [ "reasoning.encrypted_content" ], - "prompt_cache_key": prompt_cache_key + "prompt_cache_key": prompt_cache_key_compact }); let user_turn_2_after_compact = json!( { @@ -336,7 +348,7 @@ SUMMARY_ONLY_CONTEXT" "include": [ "reasoning.encrypted_content" ], - "prompt_cache_key": prompt_cache_key + "prompt_cache_key": prompt_cache_key_after_compact }); let usert_turn_3_after_resume = json!( { @@ -421,7 +433,7 @@ SUMMARY_ONLY_CONTEXT" "include": [ "reasoning.encrypted_content" ], - "prompt_cache_key": prompt_cache_key + "prompt_cache_key": prompt_cache_key_after_resume }); let user_turn_3_after_fork = json!( {