Compare commits

...

4 Commits

Author SHA1 Message Date
Charles Cunningham
0e248815b2 Fix process_compacted_history TurnContext borrow 2026-02-06 13:12:03 -08:00
Charles Cunningham
15c838bf27 Merge remote-tracking branch 'origin/main' into fail-agent-loop-on-auto-compaction-failure 2026-02-06 13:11:02 -08:00
Charles Cunningham
14bb780e86 Propagate inline auto-compact failures and simplify local compaction 2026-02-06 12:32:02 -08:00
Charles Cunningham
fbaee85aa5 Fail agent loop eagerly on auto compaction failure 2026-02-06 12:08:16 -08:00
7 changed files with 333 additions and 40 deletions

View File

@@ -17,7 +17,7 @@ use crate::analytics_client::build_track_events_context;
use crate::compact;
use crate::compact::run_inline_auto_compact_task;
use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::compact_remote::run_remote_compaction;
use crate::connectors;
use crate::exec_policy::ExecPolicyManager;
use crate::features::FEATURES;
@@ -3672,8 +3672,11 @@ pub(crate) async fn run_turn(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, event).await;
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(&sess, &turn_context).await;
if total_usage_tokens >= auto_compact_limit
&& let Err(err) = run_auto_compact(&sess, &turn_context).await
{
emit_auto_compact_error(&sess, &turn_context, err).await;
return None;
}
let skills_outcome = Some(
@@ -3855,7 +3858,10 @@ pub(crate) async fn run_turn(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
run_auto_compact(&sess, &turn_context).await;
if let Err(err) = run_auto_compact(&sess, &turn_context).await {
emit_auto_compact_error(&sess, &turn_context, err).await;
break;
}
continue;
}
@@ -3913,14 +3919,27 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> crate::error::Result<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
run_remote_compaction(Arc::clone(sess), Arc::clone(turn_context)).await
} else {
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await
}
}
async fn emit_auto_compact_error(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
err: CodexErr,
) {
let event =
EventMsg::Error(err.to_error_event(Some("Error running auto compact task".to_string())));
sess.send_event(turn_context, event).await;
}
fn filter_connectors_for_input(
connectors: Vec<connectors::AppInfo>,
input: &[ResponseItem],

View File

@@ -39,7 +39,7 @@ pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bo
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
text: prompt,
@@ -47,10 +47,10 @@ pub(crate) async fn run_inline_auto_compact_task(
text_elements: Vec::new(),
}];
run_compact_task_inner(sess, turn_context, input).await;
run_local_compaction(sess, turn_context, input).await
}
pub(crate) async fn run_compact_task(
pub(crate) async fn run_user_requested_local_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
@@ -60,14 +60,17 @@ pub(crate) async fn run_compact_task(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(sess.clone(), turn_context, input).await;
if let Err(err) = run_local_compaction(sess.clone(), turn_context.clone(), input).await {
let event = EventMsg::Error(err.to_error_event(None));
sess.send_event(&turn_context, event).await;
}
}
async fn run_compact_task_inner(
async fn run_local_compaction(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
@@ -142,7 +145,7 @@ async fn run_compact_task_inner(
break;
}
Err(CodexErr::Interrupted) => {
return;
return Ok(());
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
@@ -156,9 +159,7 @@ async fn run_compact_task_inner(
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return;
return Err(e);
}
Err(e) => {
if retries < max_retries {
@@ -173,9 +174,7 @@ async fn run_compact_task_inner(
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return;
return Err(e);
}
}
}
@@ -210,6 +209,7 @@ async fn run_compact_task_inner(
message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(),
});
sess.send_event(&turn_context, warning).await;
Ok(())
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {

View File

@@ -16,38 +16,32 @@ use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ResponseItem;
use tracing::info;
pub(crate) async fn run_inline_remote_auto_compact_task(
pub(crate) async fn run_user_requested_remote_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
run_remote_compact_task_inner(&sess, &turn_context).await;
}
pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Arc<TurnContext>) {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_remote_compact_task_inner(&sess, &turn_context).await;
}
async fn run_remote_compact_task_inner(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
if let Err(err) = run_remote_compaction(Arc::clone(&sess), Arc::clone(&turn_context)).await {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
sess.send_event(turn_context, event).await;
// This path is for manual `/compact`, where we report the error and keep the session
// alive. Auto-compact callers handle the error themselves to fail early.
sess.send_event(&turn_context, event).await;
}
}
async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
pub(crate) async fn run_remote_compaction(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(turn_context, &compaction_item)
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
let mut history = sess.clone_history().await;
let base_instructions = sess.get_base_instructions().await;
@@ -91,14 +85,14 @@ async fn run_remote_compact_task_inner_impl(
)
.await?;
new_history = sess
.process_compacted_history(turn_context, new_history)
.process_compacted_history(turn_context.as_ref(), new_history)
.await;
if !ghost_snapshots.is_empty() {
new_history.extend(ghost_snapshots);
}
sess.replace_history(new_history.clone()).await;
sess.recompute_token_usage(turn_context).await;
sess.recompute_token_usage(&turn_context).await;
let compacted_item = CompactedItem {
message: String::new(),
@@ -107,7 +101,7 @@ async fn run_remote_compact_task_inner_impl(
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
sess.emit_turn_item_completed(turn_context, compaction_item)
sess.emit_turn_item_completed(&turn_context, compaction_item)
.await;
Ok(())
}

View File

@@ -31,14 +31,14 @@ impl SessionTask for CompactTask {
1,
&[("type", "remote")],
);
crate::compact_remote::run_remote_compact_task(session, ctx).await
crate::compact_remote::run_user_requested_remote_compact_task(session, ctx).await
} else {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "local")],
);
crate::compact::run_compact_task(session, ctx, input).await
crate::compact::run_user_requested_local_compact_task(session, ctx, input).await
}
None

View File

@@ -816,6 +816,19 @@ pub async fn mount_compact_json_once(server: &MockServer, body: serde_json::Valu
response_mock
}
/// Mounts `response` on `server` using the mock produced by `compact_mock()`,
/// answering at most `max_times` matching requests.
///
/// Returns the `ResponseMock` half of the `compact_mock()` pair, which the
/// tests use to inspect the requests that were received.
pub async fn mount_compact_response(
    server: &MockServer,
    response: ResponseTemplate,
    max_times: u64,
) -> ResponseMock {
    let (builder, recorder) = compact_mock();
    // Cap the number of times this mock responds before registering it.
    let capped = builder.respond_with(response).up_to_n_times(max_times);
    capped.mount(server).await;
    recorder
}
pub async fn mount_models_once(server: &MockServer, body: ModelsResponse) -> ModelsMock {
let (mock, models_mock) = models_mock();
mock.respond_with(

View File

@@ -30,6 +30,7 @@ use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::mount_compact_json_once;
use core_test_support::responses::mount_response_once_match;
use core_test_support::responses::mount_response_sequence;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_once_match;
@@ -41,6 +42,8 @@ use core_test_support::responses::start_mock_server;
use pretty_assertions::assert_eq;
use serde_json::json;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::body_string_contains;
// --- Test helpers -----------------------------------------------------------
pub(super) const FIRST_REPLY: &str = "FIRST_REPLY";
@@ -1433,6 +1436,80 @@ async fn auto_compact_starts_after_turn_started() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
// Verifies that, for a non-OpenAI provider, a failing auto-compaction request surfaces an
// "Error running auto compact task" error event and the turn completes without any further
// request being recorded by the mocks.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_failure_aborts_turn_before_sampling_for_local_provider() {
skip_if_no_network!();
let server = start_mock_server().await;
let second_turn_text = "this turn should fail before sampling";
// The seed turn reports 1_000_000 tokens, far above the 200-token auto-compact limit
// configured below, so the next turn is expected to attempt compaction first.
let first_turn = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 1_000_000),
]);
let first_turn_mock = mount_sse_once(&server, first_turn).await;
// Answer the request whose body contains the summarization prompt with an HTTP 500.
let compact_failure_mock = mount_response_once_match(
&server,
body_string_contains(SUMMARIZATION_PROMPT),
ResponseTemplate::new(500)
.insert_header("content-type", "application/json")
.set_body_json(json!({
"error": {"type": "internal_server_error", "message": "synthetic compaction failure"}
})),
)
.await;
let model_provider = non_openai_model_provider(&server);
let mut builder = test_codex().with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200);
// NOTE(review): presumably disables retries so the failing compact request is sent once.
config.model_provider.stream_max_retries = Some(0);
});
let codex = builder.build(&server).await.unwrap().codex;
// First turn: seeds the reported token usage.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "seed turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
// Second turn: expected to fail during auto-compaction.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_turn_text.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
let EventMsg::Error(error) =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::Error(_))).await
else {
panic!("expected error event");
};
assert!(
error.message.contains("Error running auto compact task"),
"expected auto-compact failure prefix in error message: {}",
error.message
);
// The turn must still reach completion after the error is reported.
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(first_turn_mock.requests().len(), 1);
// The compact mock receives any unmatched `/responses` calls. Keeping this at 1
// verifies we only attempted the compact request and never proceeded to sampling.
assert_eq!(compact_failure_mock.requests().len(), 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() {
skip_if_no_network!();

View File

@@ -23,6 +23,8 @@ use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use wiremock::ResponseTemplate;
use wiremock::matchers::body_string_contains;
fn approx_token_count(text: &str) -> i64 {
i64::try_from(text.len().saturating_add(3) / 4).unwrap_or(i64::MAX)
@@ -228,6 +230,194 @@ async fn remote_compact_runs_automatically() -> Result<()> {
Ok(())
}
// Verifies that when the compact endpoint keeps returning HTTP 500, the turn reports an
// "Error running auto compact task" error, the follow-up sampling mock is never hit, and the
// compact mock sees exactly `compact_retries + 1` requests.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_auto_compact_500_fails_turn_before_followup_sampling() -> Result<()> {
skip_if_no_network!(Ok(()));
let compact_retries = 2;
let second_turn_text = "this turn should fail before sampling";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(move |config| {
config.model_auto_compact_token_limit = Some(200);
config.model_provider.request_max_retries = Some(compact_retries);
config.model_provider.supports_websockets = false;
}),
)
.await?;
let codex = harness.test().codex.clone();
// The seed turn reports 1_000_000 tokens, far above the 200-token auto-compact limit.
let initial_sampling = mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("m1", "initial"),
responses::ev_completed_with_tokens("resp-1", 1_000_000),
]),
)
.await;
// Matches only requests whose body contains the second turn's text; asserted empty below.
let follow_up_sampling = responses::mount_sse_once_match(
harness.server(),
body_string_contains(second_turn_text),
sse(vec![
responses::ev_assistant_message("m2", "should not run"),
responses::ev_completed("resp-2"),
]),
)
.await;
// Serve the 500 for the initial attempt plus every configured retry.
let compact_failure = responses::mount_compact_response(
harness.server(),
ResponseTemplate::new(500)
.insert_header("content-type", "application/json")
.set_body_json(serde_json::json!({
"error": {"type": "internal_server_error", "message": "synthetic 500"}
})),
compact_retries + 1,
)
.await;
// First turn: seeds the reported token usage.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "seed turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
// Second turn: expected to fail during auto-compaction.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_turn_text.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let EventMsg::Error(error) =
wait_for_event(&codex, |event| matches!(event, EventMsg::Error(_))).await
else {
panic!("expected error event");
};
assert!(
error.message.contains("Error running auto compact task"),
"expected auto-compact failure prefix in error message: {}",
error.message
);
// The turn must still reach completion after the error is reported.
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert_eq!(initial_sampling.requests().len(), 1);
assert!(
follow_up_sampling.requests().is_empty(),
"follow-up sampling should never run when auto compact fails"
);
assert_eq!(
compact_failure.requests().len(),
usize::try_from(compact_retries + 1).expect("retry count fits usize"),
"retryable compact failures should use the request retry budget"
);
Ok(())
}
// Verifies that a `context_length_exceeded` (HTTP 400) response from the compact endpoint
// is reported as an "Error running auto compact task" error, is requested exactly once
// despite a retry budget of 4, and is not followed by sampling for the second turn.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_auto_compact_context_error_does_not_retry_or_sample() -> Result<()> {
skip_if_no_network!(Ok(()));
let second_turn_text = "this turn should fail before sampling with context error";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
// A non-zero retry budget, so a single recorded request shows retries were skipped.
config.model_provider.request_max_retries = Some(4);
config.model_provider.supports_websockets = false;
}),
)
.await?;
let codex = harness.test().codex.clone();
// The seed turn reports 1_000_000 tokens, far above the 200-token auto-compact limit.
let initial_sampling = mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("m1", "initial"),
responses::ev_completed_with_tokens("resp-1", 1_000_000),
]),
)
.await;
// Matches only requests whose body contains the second turn's text; asserted empty below.
let follow_up_sampling = responses::mount_sse_once_match(
harness.server(),
body_string_contains(second_turn_text),
sse(vec![
responses::ev_assistant_message("m2", "should not run"),
responses::ev_completed("resp-2"),
]),
)
.await;
// The mock may answer up to 10 times, so the "exactly 1 request" assertion below is
// not satisfied merely by the mock running out of responses.
let compact_failure = responses::mount_compact_response(
harness.server(),
ResponseTemplate::new(400)
.insert_header("content-type", "application/json")
.set_body_json(serde_json::json!({
"error": {
"type": "context_length_exceeded",
"message": "Context window exceeded while compacting"
}
})),
10,
)
.await;
// First turn: seeds the reported token usage.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "seed turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
// Second turn: expected to fail during auto-compaction.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_turn_text.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let EventMsg::Error(error) =
wait_for_event(&codex, |event| matches!(event, EventMsg::Error(_))).await
else {
panic!("expected error event");
};
assert!(
error.message.contains("Error running auto compact task"),
"expected auto-compact failure prefix in error message: {}",
error.message
);
// The turn must still reach completion after the error is reported.
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert_eq!(initial_sampling.requests().len(), 1);
assert!(
follow_up_sampling.requests().is_empty(),
"follow-up sampling should never run when auto compact fails"
);
assert_eq!(
compact_failure.requests().len(),
1,
"non-retryable compact failures should not be retried"
);
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {