Compare commits

...

2 Commits

Author SHA1 Message Date
rhan-oai
53520c23e5 Merge branch 'main' into pr17155 2026-04-08 14:17:58 -07:00
rhan-oai
9ca7a71d05 [codex-analytics] add compaction analytics event 2026-04-08 13:10:14 -07:00
9 changed files with 565 additions and 10 deletions

View File

@@ -3,6 +3,7 @@ use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCompactionEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
@@ -18,6 +19,10 @@ use crate::facts::AnalyticsFact;
use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CodexCompactionEvent;
use crate::facts::CompactionMode;
use crate::facts::CompactionStatus;
use crate::facts::CompactionTrigger;
use crate::facts::CustomAnalyticsFact;
use crate::facts::InvocationType;
use crate::facts::PluginState;
@@ -254,6 +259,86 @@ fn app_used_event_serializes_expected_shape() {
);
}
#[test]
fn compaction_event_serializes_expected_shape() {
let event = TrackEventRequest::Compaction(Box::new(CodexCompactionEventRequest {
event_type: "codex_compaction_event",
event_params: crate::events::codex_compaction_event_params(
CodexAppServerClientMetadata {
product_client_id: "CODEX_CLI".to_string(),
client_name: Some("codex-tui".to_string()),
client_version: Some("1.2.3".to_string()),
rpc_transport: AppServerRpcTransport::InProcess,
experimental_api_enabled: Some(true),
},
CodexRuntimeMetadata {
codex_rs_version: "0.1.0".to_string(),
runtime_os: "macos".to_string(),
runtime_os_version: "15.3.1".to_string(),
runtime_arch: "aarch64".to_string(),
},
CodexCompactionEvent {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
trigger: CompactionTrigger::AutoMidTurn,
mode: CompactionMode::Remote,
status: CompactionStatus::Completed,
error: None,
started_at: 100,
completed_at: 106,
duration_ms: Some(6543),
input_tokens_before: Some(1000),
input_tokens_after: Some(600),
estimated_tokens_before: Some(1500),
estimated_tokens_after: Some(900),
history_items_before: 50,
history_items_after: 12,
deleted_items_before_remote_compact: Some(3),
},
),
}));
let payload = serde_json::to_value(&event).expect("serialize compaction event");
assert_eq!(
payload,
json!({
"event_type": "codex_compaction_event",
"event_params": {
"thread_id": "thread-1",
"turn_id": "turn-1",
"app_server_client": {
"product_client_id": "CODEX_CLI",
"client_name": "codex-tui",
"client_version": "1.2.3",
"rpc_transport": "in_process",
"experimental_api_enabled": true
},
"runtime": {
"codex_rs_version": "0.1.0",
"runtime_os": "macos",
"runtime_os_version": "15.3.1",
"runtime_arch": "aarch64"
},
"trigger": "auto_mid_turn",
"mode": "remote",
"status": "completed",
"error": null,
"started_at": 100,
"completed_at": 106,
"duration_ms": 6543,
"input_tokens_before": 1000,
"input_tokens_after": 600,
"estimated_tokens_before": 1500,
"estimated_tokens_after": 900,
"history_items_before": 50,
"history_items_after": 12,
"deleted_items_before_remote_compact": 3
}
})
);
}
#[test]
fn app_used_dedupe_is_keyed_by_turn_and_connector() {
let (sender, _receiver) = mpsc::channel(1);
@@ -449,6 +534,99 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
assert_eq!(payload[0]["event_params"]["parent_thread_id"], json!(null));
}
#[tokio::test]
async fn compaction_event_waits_for_thread_connection_metadata() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
CodexCompactionEvent {
thread_id: "thread-1".to_string(),
turn_id: "turn-compact".to_string(),
trigger: CompactionTrigger::Manual,
mode: CompactionMode::Local,
status: CompactionStatus::Failed,
error: Some("context limit exceeded".to_string()),
started_at: 100,
completed_at: 101,
duration_ms: Some(1200),
input_tokens_before: Some(2000),
input_tokens_after: Some(2000),
estimated_tokens_before: Some(2500),
estimated_tokens_after: Some(2500),
history_items_before: 20,
history_items_after: 20,
deleted_items_before_remote_compact: None,
},
))),
&mut events,
)
.await;
assert!(
events.is_empty(),
"compaction events should wait for client/runtime metadata"
);
reducer
.ingest(
AnalyticsFact::Initialize {
connection_id: 7,
params: InitializeParams {
client_info: ClientInfo {
name: "codex-tui".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: false,
opt_out_notification_methods: None,
}),
},
product_client_id: DEFAULT_ORIGINATOR.to_string(),
runtime: CodexRuntimeMetadata {
codex_rs_version: "0.99.0".to_string(),
runtime_os: "linux".to_string(),
runtime_os_version: "24.04".to_string(),
runtime_arch: "x86_64".to_string(),
},
rpc_transport: AppServerRpcTransport::Websocket,
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-1", /*ephemeral*/ false, "gpt-5",
)),
},
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 2);
assert_eq!(payload[0]["event_type"], "codex_thread_initialized");
assert_eq!(payload[1]["event_type"], "codex_compaction_event");
assert_eq!(payload[1]["event_params"]["thread_id"], "thread-1");
assert_eq!(payload[1]["event_params"]["turn_id"], "turn-compact");
assert_eq!(payload[1]["event_params"]["trigger"], "manual");
assert_eq!(payload[1]["event_params"]["mode"], "local");
assert_eq!(payload[1]["event_params"]["status"], "failed");
assert_eq!(
payload[1]["event_params"]["app_server_client"]["product_client_id"],
DEFAULT_ORIGINATOR
);
assert_eq!(
payload[1]["event_params"]["runtime"]["codex_rs_version"],
"0.99.0"
);
}
#[test]
fn subagent_thread_started_review_serializes_expected_shape() {
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(

View File

@@ -178,6 +178,12 @@ impl AnalyticsEventsClient {
)));
}
pub fn track_compaction(&self, event: crate::facts::CodexCompactionEvent) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(
Box::new(event),
)));
}
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {

View File

@@ -1,4 +1,5 @@
use crate::facts::AppInvocation;
use crate::facts::CodexCompactionEvent;
use crate::facts::InvocationType;
use crate::facts::PluginState;
use crate::facts::SubAgentThreadStartedInput;
@@ -37,6 +38,7 @@ pub(crate) enum TrackEventRequest {
ThreadInitialized(ThreadInitializedEvent),
AppMentioned(CodexAppMentionedEventRequest),
AppUsed(CodexAppUsedEventRequest),
Compaction(Box<CodexCompactionEventRequest>),
PluginUsed(CodexPluginUsedEventRequest),
PluginInstalled(CodexPluginEventRequest),
PluginUninstalled(CodexPluginEventRequest),
@@ -122,6 +124,34 @@ pub(crate) struct CodexAppUsedEventRequest {
pub(crate) event_params: CodexAppMetadata,
}
#[derive(Serialize)]
pub(crate) struct CodexCompactionEventParams {
pub(crate) thread_id: String,
pub(crate) turn_id: String,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) trigger: crate::facts::CompactionTrigger,
pub(crate) mode: crate::facts::CompactionMode,
pub(crate) status: crate::facts::CompactionStatus,
pub(crate) error: Option<String>,
pub(crate) started_at: u64,
pub(crate) completed_at: u64,
pub(crate) duration_ms: Option<u64>,
pub(crate) input_tokens_before: Option<i64>,
pub(crate) input_tokens_after: Option<i64>,
pub(crate) estimated_tokens_before: Option<i64>,
pub(crate) estimated_tokens_after: Option<i64>,
pub(crate) history_items_before: usize,
pub(crate) history_items_after: usize,
pub(crate) deleted_items_before_remote_compact: Option<usize>,
}
#[derive(Serialize)]
pub(crate) struct CodexCompactionEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexCompactionEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexPluginMetadata {
pub(crate) plugin_id: Option<String>,
@@ -201,6 +231,33 @@ pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPlu
}
}
pub(crate) fn codex_compaction_event_params(
app_server_client: CodexAppServerClientMetadata,
runtime: CodexRuntimeMetadata,
input: CodexCompactionEvent,
) -> CodexCompactionEventParams {
CodexCompactionEventParams {
thread_id: input.thread_id,
turn_id: input.turn_id,
app_server_client,
runtime,
trigger: input.trigger,
mode: input.mode,
status: input.status,
error: input.error,
started_at: input.started_at,
completed_at: input.completed_at,
duration_ms: input.duration_ms,
input_tokens_before: input.input_tokens_before,
input_tokens_after: input.input_tokens_after,
estimated_tokens_before: input.estimated_tokens_before,
estimated_tokens_after: input.estimated_tokens_after,
history_items_before: input.history_items_before,
history_items_after: input.history_items_after,
deleted_items_before_remote_compact: input.deleted_items_before_remote_compact,
}
}
pub(crate) fn codex_plugin_used_metadata(
tracking: &TrackEventsContext,
plugin: PluginTelemetryMetadata,

View File

@@ -63,6 +63,50 @@ pub struct SubAgentThreadStartedInput {
pub created_at: u64,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionTrigger {
Manual,
AutoPreTurn,
AutoMidTurn,
ModelDownshift,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionMode {
Local,
Remote,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionStatus {
Completed,
Failed,
Interrupted,
}
#[derive(Clone)]
pub struct CodexCompactionEvent {
pub thread_id: String,
pub turn_id: String,
pub trigger: CompactionTrigger,
pub mode: CompactionMode,
pub status: CompactionStatus,
pub error: Option<String>,
pub started_at: u64,
pub completed_at: u64,
pub duration_ms: Option<u64>,
pub input_tokens_before: Option<i64>,
pub input_tokens_after: Option<i64>,
pub estimated_tokens_before: Option<i64>,
pub estimated_tokens_after: Option<i64>,
pub history_items_before: usize,
pub history_items_after: usize,
pub deleted_items_before_remote_compact: Option<usize>,
}
#[allow(dead_code)]
pub(crate) enum AnalyticsFact {
Initialize {
@@ -89,6 +133,7 @@ pub(crate) enum AnalyticsFact {
pub(crate) enum CustomAnalyticsFact {
SubAgentThreadStarted(SubAgentThreadStartedInput),
Compaction(Box<CodexCompactionEvent>),
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),
AppUsed(AppUsedInput),

View File

@@ -6,6 +6,10 @@ mod reducer;
pub use client::AnalyticsEventsClient;
pub use events::AppServerRpcTransport;
pub use facts::AppInvocation;
pub use facts::CodexCompactionEvent;
pub use facts::CompactionMode;
pub use facts::CompactionStatus;
pub use facts::CompactionTrigger;
pub use facts::InvocationType;
pub use facts::SkillInvocation;
pub use facts::SubAgentThreadStartedInput;

View File

@@ -2,6 +2,7 @@ use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCompactionEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
@@ -12,6 +13,7 @@ use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::TrackEventRequest;
use crate::events::codex_app_metadata;
use crate::events::codex_compaction_event_params;
use crate::events::codex_plugin_metadata;
use crate::events::codex_plugin_used_metadata;
use crate::events::plugin_state_event_type;
@@ -20,6 +22,7 @@ use crate::events::thread_source_name;
use crate::facts::AnalyticsFact;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CodexCompactionEvent;
use crate::facts::CustomAnalyticsFact;
use crate::facts::PluginState;
use crate::facts::PluginStateChangedInput;
@@ -40,6 +43,8 @@ use std::path::Path;
#[derive(Default)]
pub(crate) struct AnalyticsReducer {
connections: HashMap<u64, ConnectionState>,
thread_connections: HashMap<String, u64>,
pending_compactions: HashMap<String, Vec<CodexCompactionEvent>>,
}
struct ConnectionState {
@@ -81,6 +86,9 @@ impl AnalyticsReducer {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
}
CustomAnalyticsFact::Compaction(input) => {
self.ingest_compaction(*input, out);
}
CustomAnalyticsFact::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
@@ -254,6 +262,9 @@ impl AnalyticsReducer {
_ => return,
};
let thread_source: SessionSource = thread.source.into();
let thread_id = thread.id;
self.thread_connections
.insert(thread_id.clone(), connection_id);
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
@@ -261,7 +272,7 @@ impl AnalyticsReducer {
ThreadInitializedEvent {
event_type: "codex_thread_initialized",
event_params: ThreadInitializedEventParams {
thread_id: thread.id,
thread_id: thread_id.clone(),
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
model,
@@ -274,6 +285,65 @@ impl AnalyticsReducer {
},
},
));
self.maybe_emit_pending_compactions(&thread_id, out);
}
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
let thread_id = input.thread_id.clone();
if self.try_emit_compaction(input.clone(), out) {
return;
}
self.pending_compactions
.entry(thread_id)
.or_default()
.push(input);
}
fn maybe_emit_pending_compactions(
&mut self,
thread_id: &str,
out: &mut Vec<TrackEventRequest>,
) {
let Some(pending) = self.pending_compactions.remove(thread_id) else {
return;
};
let mut still_pending = Vec::new();
for input in pending {
if !self.try_emit_compaction(input.clone(), out) {
still_pending.push(input);
}
}
if !still_pending.is_empty() {
self.pending_compactions
.insert(thread_id.to_string(), still_pending);
}
}
fn try_emit_compaction(
&mut self,
input: CodexCompactionEvent,
out: &mut Vec<TrackEventRequest>,
) -> bool {
let connection_metadata = self
.thread_connections
.get(&input.thread_id)
.and_then(|connection_id| self.connections.get(connection_id))
.map(|connection_state| {
(
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
)
});
let Some((app_server_client, runtime)) = connection_metadata else {
return false;
};
out.push(TrackEventRequest::Compaction(Box::new(
CodexCompactionEventRequest {
event_type: "codex_compaction_event",
event_params: codex_compaction_event_params(app_server_client, runtime, input),
},
)));
true
}
}

View File

@@ -50,6 +50,7 @@ use chrono::Local;
use chrono::Utc;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AppInvocation;
use codex_analytics::CompactionTrigger;
use codex_analytics::InvocationType;
use codex_analytics::SubAgentThreadStartedInput;
use codex_analytics::build_track_events_context;
@@ -6214,6 +6215,7 @@ pub(crate) async fn run_turn(
&sess,
&turn_context,
InitialContextInjection::BeforeLastUserMessage,
CompactionTrigger::AutoMidTurn,
)
.await
.is_err()
@@ -6396,7 +6398,13 @@ async fn run_pre_sampling_compact(
.unwrap_or(i64::MAX);
// Compact if the total usage tokens are greater than the auto compact limit
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(sess, turn_context, InitialContextInjection::DoNotInject).await?;
run_auto_compact(
sess,
turn_context,
InitialContextInjection::DoNotInject,
CompactionTrigger::AutoPreTurn,
)
.await?;
pre_sampling_compacted = true;
}
Ok(pre_sampling_compacted)
@@ -6440,6 +6448,7 @@ async fn maybe_run_previous_model_inline_compact(
sess,
&previous_model_turn_context,
InitialContextInjection::DoNotInject,
CompactionTrigger::ModelDownshift,
)
.await?;
return Ok(true);
@@ -6451,12 +6460,14 @@ async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
trigger: CompactionTrigger,
) -> CodexResult<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(
Arc::clone(sess),
Arc::clone(turn_context),
initial_context_injection,
trigger,
)
.await?;
} else {
@@ -6464,6 +6475,7 @@ async fn run_auto_compact(
Arc::clone(sess),
Arc::clone(turn_context),
initial_context_injection,
trigger,
)
.await?;
}

View File

@@ -1,4 +1,7 @@
use std::sync::Arc;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use crate::Prompt;
use crate::client::ModelClientSession;
@@ -9,6 +12,11 @@ use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::get_last_assistant_message_from_turn;
use crate::util::backoff;
use codex_analytics::CodexCompactionEvent;
use codex_analytics::CompactionMode;
use codex_analytics::CompactionStatus;
use codex_analytics::CompactionTrigger;
use codex_features::Feature;
use codex_model_provider_info::ModelProviderInfo;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
@@ -55,6 +63,7 @@ pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
trigger: CompactionTrigger,
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
@@ -63,7 +72,14 @@ pub(crate) async fn run_inline_auto_compact_task(
text_elements: Vec::new(),
}];
run_compact_task_inner(sess, turn_context, input, initial_context_injection).await?;
run_compact_task_inner(
sess,
turn_context,
input,
initial_context_injection,
trigger,
)
.await?;
Ok(())
}
@@ -84,6 +100,7 @@ pub(crate) async fn run_compact_task(
turn_context,
input,
InitialContextInjection::DoNotInject,
CompactionTrigger::Manual,
)
.await
}
@@ -93,6 +110,39 @@ async fn run_compact_task_inner(
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
initial_context_injection: InitialContextInjection,
trigger: CompactionTrigger,
) -> CodexResult<()> {
let attempt = CompactionAnalyticsAttempt::begin(
sess.as_ref(),
turn_context.as_ref(),
trigger,
CompactionMode::Local,
)
.await;
let result = run_compact_task_inner_impl(
Arc::clone(&sess),
Arc::clone(&turn_context),
input,
initial_context_injection,
)
.await;
attempt
.track(
sess.as_ref(),
turn_context.as_ref(),
compaction_status_from_result(&result),
result.as_ref().err().map(ToString::to_string),
/*deleted_items_before_remote_compact*/ None,
)
.await;
result
}
async fn run_compact_task_inner_impl(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
initial_context_injection: InitialContextInjection,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
@@ -233,6 +283,110 @@ async fn run_compact_task_inner(
Ok(())
}
pub(crate) struct CompactionAnalyticsAttempt {
enabled: bool,
thread_id: String,
turn_id: String,
trigger: CompactionTrigger,
mode: CompactionMode,
started_at: u64,
start_instant: Instant,
before: CompactionAnalyticsSnapshot,
}
struct CompactionAnalyticsSnapshot {
input_tokens: Option<i64>,
estimated_tokens: Option<i64>,
history_items: usize,
}
impl CompactionAnalyticsAttempt {
pub(crate) async fn begin(
sess: &Session,
turn_context: &TurnContext,
trigger: CompactionTrigger,
mode: CompactionMode,
) -> Self {
Self {
enabled: sess.enabled(Feature::GeneralAnalytics),
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
trigger,
mode,
started_at: now_unix_seconds(),
start_instant: Instant::now(),
before: CompactionAnalyticsSnapshot::capture(sess, turn_context).await,
}
}
pub(crate) async fn track(
self,
sess: &Session,
turn_context: &TurnContext,
status: CompactionStatus,
error: Option<String>,
deleted_items_before_remote_compact: Option<usize>,
) {
if !self.enabled {
return;
}
let after = CompactionAnalyticsSnapshot::capture(sess, turn_context).await;
sess.services
.analytics_events_client
.track_compaction(CodexCompactionEvent {
thread_id: self.thread_id,
turn_id: self.turn_id,
trigger: self.trigger,
mode: self.mode,
status,
error,
started_at: self.started_at,
completed_at: now_unix_seconds(),
duration_ms: Some(
u64::try_from(self.start_instant.elapsed().as_millis()).unwrap_or(u64::MAX),
),
input_tokens_before: self.before.input_tokens,
input_tokens_after: after.input_tokens,
estimated_tokens_before: self.before.estimated_tokens,
estimated_tokens_after: after.estimated_tokens,
history_items_before: self.before.history_items,
history_items_after: after.history_items,
deleted_items_before_remote_compact,
});
}
}
impl CompactionAnalyticsSnapshot {
async fn capture(sess: &Session, turn_context: &TurnContext) -> Self {
let input_tokens = sess
.total_token_usage()
.await
.map(|usage| usage.input_tokens);
let estimated_tokens = sess.get_estimated_token_count(turn_context).await;
let history_items = sess.clone_history().await.raw_items().len();
Self {
input_tokens,
estimated_tokens,
history_items,
}
}
}
pub(crate) fn compaction_status_from_result<T>(result: &CodexResult<T>) -> CompactionStatus {
match result {
Ok(_) => CompactionStatus::Completed,
Err(CodexErr::Interrupted | CodexErr::TurnAborted) => CompactionStatus::Interrupted,
Err(_) => CompactionStatus::Failed,
}
}
fn now_unix_seconds() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs())
.unwrap_or_default()
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
let mut pieces = Vec::new();
for item in content {

View File

@@ -5,12 +5,16 @@ use crate::Prompt;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::built_tools;
use crate::compact::CompactionAnalyticsAttempt;
use crate::compact::InitialContextInjection;
use crate::compact::compaction_status_from_result;
use crate::compact::insert_initial_context_before_last_real_user_or_summary;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::context_manager::estimate_response_item_model_visible_bytes;
use crate::context_manager::is_codex_generated_item;
use codex_analytics::CompactionMode;
use codex_analytics::CompactionTrigger;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
@@ -29,8 +33,9 @@ pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
trigger: CompactionTrigger,
) -> CodexResult<()> {
run_remote_compact_task_inner(&sess, &turn_context, initial_context_injection).await?;
run_remote_compact_task_inner(&sess, &turn_context, initial_context_injection, trigger).await?;
Ok(())
}
@@ -46,17 +51,41 @@ pub(crate) async fn run_remote_compact_task(
});
sess.send_event(&turn_context, start_event).await;
run_remote_compact_task_inner(&sess, &turn_context, InitialContextInjection::DoNotInject).await
run_remote_compact_task_inner(
&sess,
&turn_context,
InitialContextInjection::DoNotInject,
CompactionTrigger::Manual,
)
.await
}
async fn run_remote_compact_task_inner(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
trigger: CompactionTrigger,
) -> CodexResult<()> {
if let Err(err) =
run_remote_compact_task_inner_impl(sess, turn_context, initial_context_injection).await
{
let attempt = CompactionAnalyticsAttempt::begin(
sess.as_ref(),
turn_context.as_ref(),
trigger,
CompactionMode::Remote,
)
.await;
let result =
run_remote_compact_task_inner_impl(sess, turn_context, initial_context_injection).await;
let deleted_items_before_remote_compact = result.as_ref().ok().copied();
attempt
.track(
sess.as_ref(),
turn_context.as_ref(),
compaction_status_from_result(&result),
result.as_ref().err().map(ToString::to_string),
deleted_items_before_remote_compact,
)
.await;
if let Err(err) = result {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
@@ -70,7 +99,7 @@ async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
) -> CodexResult<()> {
) -> CodexResult<usize> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(turn_context, &compaction_item)
.await;
@@ -163,7 +192,7 @@ async fn run_remote_compact_task_inner_impl(
sess.emit_turn_item_completed(turn_context, compaction_item)
.await;
Ok(())
Ok(deleted_items)
}
pub(crate) async fn process_compacted_history(