This commit is contained in:
Ahmed Ibrahim
2025-10-29 17:59:31 -07:00
parent bf35105af6
commit 0af13d5c6a
4 changed files with 212 additions and 173 deletions

View File

@@ -2,28 +2,15 @@ use std::sync::Arc;
use super::Session;
use super::TurnContext;
use super::get_last_assistant_message_from_turn;
use crate::Prompt;
use crate::client_common::ResponseEvent;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::AgentMessageEvent;
use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
use crate::protocol::EventMsg;
use crate::protocol::TaskStartedEvent;
use crate::protocol::TurnContextItem;
use crate::truncate::truncate_middle;
use crate::util::backoff;
use askama::Template;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use futures::prelude::*;
use tracing::error;
pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
@@ -39,136 +26,54 @@ pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
persist_turn_context_rollout(&sess, &turn_context).await;
let input = vec![UserInput::Text {
text: SUMMARIZATION_PROMPT.to_string(),
}];
run_compact_task_inner(sess, turn_context, input).await;
}
/// Entry point for a compaction task driven by caller-supplied `input`.
///
/// Emits a `TaskStarted` event carrying the client's model context window,
/// then hands off to `run_compact_task_inner`. This function never yields a
/// final message of its own: the return value is always `None`.
pub(crate) async fn run_compact_task(
    sess: Arc<Session>,
    turn_context: Arc<TurnContext>,
    input: Vec<UserInput>,
) -> Option<String> {
    // Announce the task before any compaction work begins.
    let model_context_window = turn_context.client.get_model_context_window();
    sess.send_event(
        &turn_context,
        EventMsg::TaskStarted(TaskStartedEvent {
            model_context_window,
        }),
    )
    .await;

    // The inner routine takes its own handle to the session.
    run_compact_task_inner(Arc::clone(&sess), turn_context, input).await;

    None
}
// Build forked history from parent to seed sub-agent.
let history_snapshot = sess.clone_history().await.get_history();
let forked: Vec<RolloutItem> = history_snapshot
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
history.record_items(&[initial_input_for_turn.into()]);
let mut truncated_count = 0usize;
let max_retries = turn_context.client.get_provider().stream_max_retries();
let mut retries = 0;
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
sandbox_policy: turn_context.sandbox_policy.clone(),
model: turn_context.client.get_model(),
effort: turn_context.client.get_reasoning_effort(),
summary: turn_context.client.get_reasoning_summary(),
});
sess.persist_rollout_items(&[rollout_item]).await;
loop {
let turn_input = history.get_history_for_prompt();
let prompt = Prompt {
input: turn_input.clone(),
..Default::default()
};
let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await;
match attempt_result {
Ok(()) => {
if truncated_count > 0 {
sess.notify_background_event(
turn_context.as_ref(),
format!(
"Trimmed {truncated_count} older conversation item(s) before compacting so the prompt fits the model context window."
),
)
.await;
}
// Launch sub-agent one-shot; drain to completion and capture summary.
let config = turn_context.client.config().as_ref().clone();
let cancel = tokio_util::sync::CancellationToken::new();
if let Ok(io) = crate::codex_delegate::run_codex_conversation_one_shot_with(
config,
sess.services.auth_manager.clone(),
codex_protocol::protocol::InitialHistory::Forked(forked),
codex_protocol::protocol::SubAgentSource::Compact,
input,
Arc::clone(&sess),
Arc::clone(&turn_context),
cancel,
)
.await
{
let mut summary_text: Option<String> = None;
while let Ok(event) = io.next_event().await {
if let crate::protocol::EventMsg::TaskComplete(tc) = event.msg {
summary_text = tc.last_agent_message;
break;
}
Err(CodexErr::Interrupted) => {
return;
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input.len() > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
error!(
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
);
history.remove_first_item();
truncated_count += 1;
retries = 0;
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(ErrorEvent {
message: e.to_string(),
});
sess.send_event(&turn_context, event).await;
return;
}
Err(e) => {
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
sess.notify_stream_error(
turn_context.as_ref(),
format!("Reconnecting... {retries}/{max_retries}"),
)
.await;
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(ErrorEvent {
message: e.to_string(),
});
sess.send_event(&turn_context, event).await;
return;
}
if matches!(event.msg, crate::protocol::EventMsg::TurnAborted(_)) {
break;
}
}
if let Some(summary) = summary_text {
apply_compaction(sess, turn_context, &summary).await;
let event =
crate::protocol::EventMsg::AgentMessage(crate::protocol::AgentMessageEvent {
message: "Compact task completed".to_string(),
});
sess.send_event(&Arc::clone(&turn_context), event).await;
}
}
let history_snapshot = sess.clone_history().await.get_history();
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
let user_messages = collect_user_messages(&history_snapshot);
let initial_context = sess.build_initial_context(turn_context.as_ref());
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
let ghost_snapshots: Vec<ResponseItem> = history_snapshot
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned()
.collect();
new_history.extend(ghost_snapshots);
sess.replace_history(new_history).await;
let rollout_item = RolloutItem::Compacted(CompactedItem {
message: summary_text.clone(),
});
sess.persist_rollout_items(&[rollout_item]).await;
let event = EventMsg::AgentMessage(AgentMessageEvent {
message: "Compact task completed".to_string(),
});
sess.send_event(&turn_context, event).await;
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
@@ -249,36 +154,45 @@ fn build_compacted_history_with_limit(
history
}
async fn drain_to_completed(
sess: &Session,
turn_context: &TurnContext,
prompt: &Prompt,
) -> CodexResult<()> {
let mut stream = turn_context.client.clone().stream(prompt).await?;
loop {
let maybe_event = stream.next().await;
let Some(event) = maybe_event else {
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
};
match event {
Ok(ResponseEvent::OutputItemDone(item)) => {
sess.record_into_history(std::slice::from_ref(&item)).await;
}
Ok(ResponseEvent::RateLimits(snapshot)) => {
sess.update_rate_limits(turn_context, snapshot).await;
}
Ok(ResponseEvent::Completed { token_usage, .. }) => {
sess.update_token_usage_info(turn_context, token_usage.as_ref())
.await;
return Ok(());
}
Ok(_) => continue,
Err(e) => return Err(e),
}
}
// streaming helpers removed; compact now uses the Codex delegate for sampling.
/// Rewrite the parent session's history around `summary_text` and record the
/// compaction in the rollout.
///
/// In order, this:
/// 1. snapshots the current history and collects its user messages;
/// 2. builds the replacement history via `build_compacted_history` from the
///    session's initial context, those user messages, and the summary;
/// 3. appends every `ResponseItem::GhostSnapshot` found in the snapshot;
/// 4. replaces the session history with the result; and
/// 5. persists a `RolloutItem::Compacted` entry holding the summary text.
pub(crate) async fn apply_compaction(
    sess: Arc<Session>,
    turn_context: Arc<TurnContext>,
    summary_text: &str,
) {
    let snapshot = sess.clone_history().await.get_history();
    let prior_user_messages = collect_user_messages(&snapshot);

    let mut compacted = build_compacted_history(
        sess.build_initial_context(turn_context.as_ref()),
        &prior_user_messages,
        summary_text,
    );

    // Carry ghost snapshots over, keeping their relative order, after the
    // rebuilt items.
    compacted.extend(
        snapshot
            .iter()
            .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
            .cloned(),
    );

    sess.replace_history(compacted).await;

    let compacted_entry = CompactedItem {
        message: summary_text.to_owned(),
    };
    sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_entry)])
        .await;
}
/// Write a `RolloutItem::TurnContext` entry for this turn to the session's
/// rollout.
///
/// The entry records the turn's working directory, approval policy, and
/// sandbox policy, together with the model, reasoning effort, and reasoning
/// summary reported by the turn's client.
pub(crate) async fn persist_turn_context_rollout(sess: &Session, turn_context: &TurnContext) {
    let context_item = TurnContextItem {
        cwd: turn_context.cwd.clone(),
        approval_policy: turn_context.approval_policy,
        sandbox_policy: turn_context.sandbox_policy.clone(),
        model: turn_context.client.get_model(),
        effort: turn_context.client.get_reasoning_effort(),
        summary: turn_context.client.get_reasoning_summary(),
    };

    sess.persist_rollout_items(&[RolloutItem::TurnContext(context_item)])
        .await;
}
#[cfg(test)]

View File

@@ -36,6 +36,28 @@ pub(crate) async fn run_codex_conversation_interactive(
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
run_codex_conversation_interactive_with(
config,
auth_manager,
InitialHistory::New,
SubAgentSource::Review,
parent_session,
parent_ctx,
cancel_token,
)
.await
}
/// Start an interactive sub-Codex conversation with custom initial history and source.
pub(crate) async fn run_codex_conversation_interactive_with(
config: Config,
auth_manager: Arc<AuthManager>,
initial_history: InitialHistory,
sub_source: SubAgentSource,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
@@ -43,8 +65,8 @@ pub(crate) async fn run_codex_conversation_interactive(
let CodexSpawnOk { codex, .. } = Codex::spawn(
config,
auth_manager,
InitialHistory::New,
SessionSource::SubAgent(SubAgentSource::Review),
initial_history,
SessionSource::SubAgent(sub_source),
)
.await?;
let codex = Arc::new(codex);
@@ -93,13 +115,39 @@ pub(crate) async fn run_codex_conversation_one_shot(
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
run_codex_conversation_one_shot_with(
config,
auth_manager,
InitialHistory::New,
SubAgentSource::Review,
input,
parent_session,
parent_ctx,
cancel_token,
)
.await
}
/// One-shot variant with custom initial history and source.
pub(crate) async fn run_codex_conversation_one_shot_with(
config: Config,
auth_manager: Arc<AuthManager>,
initial_history: InitialHistory,
sub_source: SubAgentSource,
input: Vec<UserInput>,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
// Use a child token so we can stop the delegate after completion without
// requiring the caller to cancel the parent token.
let child_cancel = cancel_token.child_token();
let io = run_codex_conversation_interactive(
let io = run_codex_conversation_interactive_with(
config,
auth_manager,
initial_history,
sub_source,
parent_session,
parent_ctx,
child_cancel.clone(),

View File

@@ -5,8 +5,13 @@ use tokio_util::sync::CancellationToken;
use crate::codex::TurnContext;
use crate::codex::compact;
use crate::codex_delegate::run_codex_conversation_one_shot_with;
use crate::protocol::EventMsg;
use crate::protocol::SubAgentSource;
use crate::state::TaskKind;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
@@ -25,8 +30,68 @@ impl SessionTask for CompactTask {
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
_cancellation_token: CancellationToken,
cancellation_token: CancellationToken,
) -> Option<String> {
compact::run_compact_task(session.clone_session(), ctx, input).await
// Persist a TurnContext entry in the parent rollout so manual compact
// still appears as a separate API turn in rollout, matching prior behavior.
crate::codex::compact::persist_turn_context_rollout(
session.clone_session().as_ref(),
ctx.as_ref(),
)
.await;
// Build initial forked history from parent so the sub-agent sees the
// same context without mutating the parent transcript.
let parent_history: Vec<ResponseItem> =
session.clone_session().clone_history().await.get_history();
let forked: Vec<RolloutItem> = parent_history
.into_iter()
.map(RolloutItem::ResponseItem)
.collect();
// Start sub-agent one-shot conversation for summarization.
let config = ctx.client.config().as_ref().clone();
let io = run_codex_conversation_one_shot_with(
config,
session.auth_manager(),
codex_protocol::protocol::InitialHistory::Forked(forked),
SubAgentSource::Compact,
input,
session.clone_session(),
ctx.clone(),
cancellation_token.clone(),
)
.await;
// Drain events and capture last_agent_message from TaskComplete.
let mut summary_text: Option<String> = None;
if let Ok(io) = io {
while let Ok(event) = io.next_event().await {
match event.msg {
EventMsg::TaskComplete(done) => {
summary_text = done.last_agent_message;
break;
}
EventMsg::TurnAborted(_) => break,
_ => {}
}
}
}
// Apply compaction into the parent session if we captured a summary.
if let Some(summary_text) = summary_text {
compact::apply_compaction(session.clone_session(), ctx.clone(), &summary_text).await;
// Inform users that compaction finished.
session
.clone_session()
.send_event(
ctx.as_ref(),
EventMsg::AgentMessage(crate::protocol::AgentMessageEvent {
message: "Compact task completed".to_string(),
}),
)
.await;
}
None
}
}

View File

@@ -146,7 +146,19 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
.unwrap_or_default()
.to_string();
let tool_calls = json!(requests[0]["tools"].as_array());
let prompt_cache_key = requests[0]["prompt_cache_key"]
let prompt_cache_key_first = requests[0]["prompt_cache_key"]
.as_str()
.unwrap_or_default()
.to_string();
let prompt_cache_key_compact = requests[1]["prompt_cache_key"]
.as_str()
.unwrap_or_default()
.to_string();
let prompt_cache_key_after_compact = requests[2]["prompt_cache_key"]
.as_str()
.unwrap_or_default()
.to_string();
let prompt_cache_key_after_resume = requests[3]["prompt_cache_key"]
.as_str()
.unwrap_or_default()
.to_string();
@@ -202,7 +214,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
"include": [
"reasoning.encrypted_content"
],
"prompt_cache_key": prompt_cache_key
"prompt_cache_key": prompt_cache_key_first
});
let compact_1 = json!(
{
@@ -271,7 +283,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
"include": [
"reasoning.encrypted_content"
],
"prompt_cache_key": prompt_cache_key
"prompt_cache_key": prompt_cache_key_compact
});
let user_turn_2_after_compact = json!(
{
@@ -336,7 +348,7 @@ SUMMARY_ONLY_CONTEXT"
"include": [
"reasoning.encrypted_content"
],
"prompt_cache_key": prompt_cache_key
"prompt_cache_key": prompt_cache_key_after_compact
});
let usert_turn_3_after_resume = json!(
{
@@ -421,7 +433,7 @@ SUMMARY_ONLY_CONTEXT"
"include": [
"reasoning.encrypted_content"
],
"prompt_cache_key": prompt_cache_key
"prompt_cache_key": prompt_cache_key_after_resume
});
let user_turn_3_after_fork = json!(
{