diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 9372cab575..047404f385 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -810,6 +810,13 @@ impl Session { .await; } + // Seed usage info from the recorded rollout so UIs can show token counts + // immediately on resume/fork. + if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) { + let mut state = self.state.lock().await; + state.set_token_info(Some(info)); + } + // If persisting, persist all rollout items as-is (recorder filters) if persist && !rollout_items.is_empty() { self.persist_rollout_items(&rollout_items).await; @@ -820,6 +827,13 @@ impl Session { } } + fn last_token_info_from_rollout(rollout_items: &[RolloutItem]) -> Option { + rollout_items.iter().rev().find_map(|item| match item { + RolloutItem::EventMsg(EventMsg::TokenCount(ev)) => ev.info.clone(), + _ => None, + }) + } + pub(crate) async fn update_settings( &self, updates: SessionSettingsUpdate, @@ -2759,6 +2773,9 @@ mod tests { use crate::protocol::RateLimitSnapshot; use crate::protocol::RateLimitWindow; use crate::protocol::ResumedHistory; + use crate::protocol::TokenCountEvent; + use crate::protocol::TokenUsage; + use crate::protocol::TokenUsageInfo; use crate::state::TaskKind; use crate::tasks::SessionTask; use crate::tasks::SessionTaskContext; @@ -2813,6 +2830,83 @@ mod tests { assert_eq!(expected, actual); } + #[tokio::test] + async fn record_initial_history_seeds_token_info_from_rollout() { + let (session, turn_context) = make_session_and_context().await; + let (mut rollout_items, _expected) = sample_rollout(&session, &turn_context); + + let info1 = TokenUsageInfo { + total_token_usage: TokenUsage { + input_tokens: 10, + cached_input_tokens: 0, + output_tokens: 20, + reasoning_output_tokens: 0, + total_tokens: 30, + }, + last_token_usage: TokenUsage { + input_tokens: 3, + cached_input_tokens: 0, + output_tokens: 4, + reasoning_output_tokens: 0, + total_tokens: 7, + }, + model_context_window: Some(1_000), + }; + let info2 = TokenUsageInfo { + total_token_usage: TokenUsage { + input_tokens: 100, + cached_input_tokens: 50, + output_tokens: 200, + reasoning_output_tokens: 25, + total_tokens: 375, + }, + last_token_usage: TokenUsage { + input_tokens: 10, + cached_input_tokens: 0, + output_tokens: 20, + reasoning_output_tokens: 5, + total_tokens: 35, + }, + model_context_window: Some(2_000), + }; + + rollout_items.push(RolloutItem::EventMsg(EventMsg::TokenCount( + TokenCountEvent { + info: Some(info1), + rate_limits: None, + }, + ))); + rollout_items.push(RolloutItem::EventMsg(EventMsg::TokenCount( + TokenCountEvent { + info: None, + rate_limits: None, + }, + ))); + rollout_items.push(RolloutItem::EventMsg(EventMsg::TokenCount( + TokenCountEvent { + info: Some(info2.clone()), + rate_limits: None, + }, + ))); + rollout_items.push(RolloutItem::EventMsg(EventMsg::TokenCount( + TokenCountEvent { + info: None, + rate_limits: None, + }, + ))); + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ConversationId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + let actual = session.state.lock().await.token_info(); + assert_eq!(actual, Some(info2)); + } + #[tokio::test] async fn record_initial_history_reconstructs_forked_transcript() { let (session, turn_context) = make_session_and_context().await; diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 9d60684b7a..a7e70ff234 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -184,6 +184,10 @@ async fn forward_events( id: _, msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_), } => {} + Event { + id: _, + msg: EventMsg::TokenCount(_), + } => {} Event { id: _, msg: EventMsg::SessionConfigured(_), diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 4f57330a28..c7556e3388 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -8,11 +8,14 @@ use codex_core::compact::SUMMARIZATION_PROMPT; use codex_core::compact::SUMMARY_PREFIX; use codex_core::config::Config; use codex_core::features::Feature; +use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; +use codex_core::protocol::SandboxPolicy; use codex_core::protocol::WarningEvent; +use codex_protocol::config_types::ReasoningSummary; use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::responses::ev_local_shell_call; @@ -1228,6 +1231,117 @@ async fn auto_compact_runs_after_token_limit_hit() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() { + skip_if_no_network!(); + + let server = start_mock_server().await; + + let limit = 200_000; + let over_limit_tokens = 250_000; + let remote_summary = "REMOTE_COMPACT_SUMMARY"; + + let compacted_history = vec![ + codex_protocol::models::ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![codex_protocol::models::ContentItem::OutputText { + text: remote_summary.to_string(), + }], + }, + codex_protocol::models::ResponseItem::Compaction { + encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), + }, + ]; + let compact_mock = + mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await; + + let mut builder = test_codex().with_config(move |config| { + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(limit); + config.features.enable(Feature::RemoteCompaction); + }); + let initial = builder.build(&server).await.unwrap(); + let home = initial.home.clone(); + let rollout_path = initial.session_configured.rollout_path.clone(); + + // A single over-limit completion should not auto-compact until the next user message. + mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("m1", FIRST_REPLY), + ev_completed_with_tokens("r1", over_limit_tokens), + ]), + ) + .await; + initial.submit_turn("OVER_LIMIT_TURN").await.unwrap(); + + assert!( + compact_mock.requests().is_empty(), + "remote compaction should not run before the next user message" + ); + + let mut resume_builder = test_codex().with_config(move |config| { + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(limit); + config.features.enable(Feature::RemoteCompaction); + }); + let resumed = resume_builder + .resume(&server, home, rollout_path) + .await + .unwrap(); + + let follow_up_user = "AFTER_RESUME_USER"; + let sse_follow_up = sse(vec![ + ev_assistant_message("m2", FINAL_REPLY), + ev_completed("r2"), + ]); + + let follow_up_matcher = move |req: &wiremock::Request| { + let body = std::str::from_utf8(&req.body).unwrap_or(""); + body.contains(follow_up_user) && body.contains(remote_summary) + }; + mount_sse_once_match(&server, follow_up_matcher, sse_follow_up).await; + + resumed + .codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: follow_up_user.into(), + }], + final_output_json_schema: None, + cwd: resumed.cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: resumed.session_configured.model.clone(), + effort: None, + summary: ReasoningSummary::Auto, + }) + .await + .unwrap(); + + wait_for_event(&resumed.codex, |event| { + matches!(event, EventMsg::ContextCompacted(_)) + }) + .await; + wait_for_event(&resumed.codex, |event| { + matches!(event, EventMsg::TaskComplete(_)) + }) + .await; + + let compact_requests = compact_mock.requests(); + assert_eq!( + compact_requests.len(), + 1, + "remote compaction should run once after resume" + ); + assert_eq!( + compact_requests[0].path(), + "/v1/responses/compact", + "remote compaction should hit the compact endpoint" + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn auto_compact_persists_rollout_entries() { skip_if_no_network!(); diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index cff2a8ad98..4896108629 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -844,7 +844,7 @@ pub struct TaskStartedEvent { pub model_context_window: Option, } -#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema, TS)] +#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, Eq, JsonSchema, TS)] pub struct TokenUsage { #[ts(type = "number")] pub input_tokens: i64, @@ -858,7 +858,7 @@ pub struct TokenUsage { pub total_tokens: i64, } -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] pub struct TokenUsageInfo { pub total_token_usage: TokenUsage, pub last_token_usage: TokenUsage,