Compare commits

...

1 Commits

Author SHA1 Message Date
Charles Cunningham
36d8c0676e core: make compact turns finish and interrupt promptly
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 15:18:50 -07:00
2 changed files with 80 additions and 7 deletions

View File

@@ -1,8 +1,10 @@
use std::future::Future;
use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
use crate::codex::TurnContext;
use crate::error::Result as CodexResult;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_protocol::user_input::UserInput;
@@ -11,6 +13,18 @@ use tokio_util::sync::CancellationToken;
#[derive(Clone, Copy, Default)]
pub(crate) struct CompactTask;
async fn await_compaction_or_cancellation<F>(
cancellation_token: CancellationToken,
compact_future: F,
) where
F: Future<Output = CodexResult<()>>,
{
tokio::select! {
_ = cancellation_token.cancelled() => {}
_ = compact_future => {}
}
}
#[async_trait]
impl SessionTask for CompactTask {
fn kind(&self) -> TaskKind {
@@ -26,24 +40,81 @@ impl SessionTask for CompactTask {
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
_cancellation_token: CancellationToken,
cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();
let _ = if crate::compact::should_use_remote_compact_task(&ctx.provider) {
if crate::compact::should_use_remote_compact_task(&ctx.provider) {
let _ = session.services.session_telemetry.counter(
"codex.task.compact",
/*inc*/ 1,
&[("type", "remote")],
);
crate::compact_remote::run_remote_compact_task(session.clone(), ctx).await
await_compaction_or_cancellation(
cancellation_token,
crate::compact_remote::run_remote_compact_task(session.clone(), ctx),
)
.await;
} else {
let _ = session.services.session_telemetry.counter(
"codex.task.compact",
/*inc*/ 1,
&[("type", "local")],
);
crate::compact::run_compact_task(session.clone(), ctx, input).await
};
await_compaction_or_cancellation(
cancellation_token,
crate::compact::run_compact_task(session.clone(), ctx, input),
)
.await;
}
None
}
}
#[cfg(test)]
mod tests {
use std::future::pending;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use super::await_compaction_or_cancellation;
use crate::error::Result as CodexResult;
use tokio_util::sync::CancellationToken;
#[tokio::test]
async fn await_compaction_or_cancellation_returns_when_future_finishes() {
let finished = Arc::new(Notify::new());
let finished_clone = Arc::clone(&finished);
let task = tokio::spawn(async move {
await_compaction_or_cancellation(CancellationToken::new(), async move {
finished_clone.notify_waiters();
Ok(())
})
.await;
});
tokio::time::timeout(Duration::from_secs(1), finished.notified())
.await
.expect("compaction future should be awaited");
task.await.expect("task should complete");
}
#[tokio::test]
async fn await_compaction_or_cancellation_returns_when_cancelled() {
let cancellation_token = CancellationToken::new();
let child_token = cancellation_token.child_token();
let task = tokio::spawn(async move {
await_compaction_or_cancellation(child_token, pending::<CodexResult<()>>()).await;
});
cancellation_token.cancel();
tokio::time::timeout(Duration::from_secs(1), task)
.await
.expect("cancellation should unblock compaction waiting")
.expect("task should complete cleanly");
}
}

View File

@@ -199,12 +199,14 @@ impl Session {
)
.await;
let sess = session_ctx.clone_session();
sess.flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
// Emit completion uniformly from spawn site so all tasks share the same
// lifecycle. Do this before flushing rollout durability so the UI is not
// stuck in `Working` after the task's visible work already finished.
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
.await;
}
sess.flush_rollout().await;
done_clone.notify_waiters();
}
.instrument(task_span),