mirror of
https://github.com/openai/codex.git
synced 2026-04-09 00:51:43 +03:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
53520c23e5 | ||
|
|
9ca7a71d05 |
@@ -3,6 +3,7 @@ use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexCompactionEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
@@ -18,6 +19,10 @@ use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CompactionMode;
|
||||
use crate::facts::CompactionStatus;
|
||||
use crate::facts::CompactionTrigger;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::InvocationType;
|
||||
use crate::facts::PluginState;
|
||||
@@ -254,6 +259,86 @@ fn app_used_event_serializes_expected_shape() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compaction_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::Compaction(Box::new(CodexCompactionEventRequest {
|
||||
event_type: "codex_compaction_event",
|
||||
event_params: crate::events::codex_compaction_event_params(
|
||||
CodexAppServerClientMetadata {
|
||||
product_client_id: "CODEX_CLI".to_string(),
|
||||
client_name: Some("codex-tui".to_string()),
|
||||
client_version: Some("1.2.3".to_string()),
|
||||
rpc_transport: AppServerRpcTransport::InProcess,
|
||||
experimental_api_enabled: Some(true),
|
||||
},
|
||||
CodexRuntimeMetadata {
|
||||
codex_rs_version: "0.1.0".to_string(),
|
||||
runtime_os: "macos".to_string(),
|
||||
runtime_os_version: "15.3.1".to_string(),
|
||||
runtime_arch: "aarch64".to_string(),
|
||||
},
|
||||
CodexCompactionEvent {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
trigger: CompactionTrigger::AutoMidTurn,
|
||||
mode: CompactionMode::Remote,
|
||||
status: CompactionStatus::Completed,
|
||||
error: None,
|
||||
started_at: 100,
|
||||
completed_at: 106,
|
||||
duration_ms: Some(6543),
|
||||
input_tokens_before: Some(1000),
|
||||
input_tokens_after: Some(600),
|
||||
estimated_tokens_before: Some(1500),
|
||||
estimated_tokens_after: Some(900),
|
||||
history_items_before: 50,
|
||||
history_items_after: 12,
|
||||
deleted_items_before_remote_compact: Some(3),
|
||||
},
|
||||
),
|
||||
}));
|
||||
|
||||
let payload = serde_json::to_value(&event).expect("serialize compaction event");
|
||||
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!({
|
||||
"event_type": "codex_compaction_event",
|
||||
"event_params": {
|
||||
"thread_id": "thread-1",
|
||||
"turn_id": "turn-1",
|
||||
"app_server_client": {
|
||||
"product_client_id": "CODEX_CLI",
|
||||
"client_name": "codex-tui",
|
||||
"client_version": "1.2.3",
|
||||
"rpc_transport": "in_process",
|
||||
"experimental_api_enabled": true
|
||||
},
|
||||
"runtime": {
|
||||
"codex_rs_version": "0.1.0",
|
||||
"runtime_os": "macos",
|
||||
"runtime_os_version": "15.3.1",
|
||||
"runtime_arch": "aarch64"
|
||||
},
|
||||
"trigger": "auto_mid_turn",
|
||||
"mode": "remote",
|
||||
"status": "completed",
|
||||
"error": null,
|
||||
"started_at": 100,
|
||||
"completed_at": 106,
|
||||
"duration_ms": 6543,
|
||||
"input_tokens_before": 1000,
|
||||
"input_tokens_after": 600,
|
||||
"estimated_tokens_before": 1500,
|
||||
"estimated_tokens_after": 900,
|
||||
"history_items_before": 50,
|
||||
"history_items_after": 12,
|
||||
"deleted_items_before_remote_compact": 3
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn app_used_dedupe_is_keyed_by_turn_and_connector() {
|
||||
let (sender, _receiver) = mpsc::channel(1);
|
||||
@@ -449,6 +534,99 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
assert_eq!(payload[0]["event_params"]["parent_thread_id"], json!(null));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compaction_event_waits_for_thread_connection_metadata() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
|
||||
CodexCompactionEvent {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-compact".to_string(),
|
||||
trigger: CompactionTrigger::Manual,
|
||||
mode: CompactionMode::Local,
|
||||
status: CompactionStatus::Failed,
|
||||
error: Some("context limit exceeded".to_string()),
|
||||
started_at: 100,
|
||||
completed_at: 101,
|
||||
duration_ms: Some(1200),
|
||||
input_tokens_before: Some(2000),
|
||||
input_tokens_after: Some(2000),
|
||||
estimated_tokens_before: Some(2500),
|
||||
estimated_tokens_after: Some(2500),
|
||||
history_items_before: 20,
|
||||
history_items_after: 20,
|
||||
deleted_items_before_remote_compact: None,
|
||||
},
|
||||
))),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
events.is_empty(),
|
||||
"compaction events should wait for client/runtime metadata"
|
||||
);
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: 7,
|
||||
params: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-tui".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
capabilities: Some(InitializeCapabilities {
|
||||
experimental_api: false,
|
||||
opt_out_notification_methods: None,
|
||||
}),
|
||||
},
|
||||
product_client_id: DEFAULT_ORIGINATOR.to_string(),
|
||||
runtime: CodexRuntimeMetadata {
|
||||
codex_rs_version: "0.99.0".to_string(),
|
||||
runtime_os: "linux".to_string(),
|
||||
runtime_os_version: "24.04".to_string(),
|
||||
runtime_arch: "x86_64".to_string(),
|
||||
},
|
||||
rpc_transport: AppServerRpcTransport::Websocket,
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-1", /*ephemeral*/ false, "gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(payload.as_array().expect("events array").len(), 2);
|
||||
assert_eq!(payload[0]["event_type"], "codex_thread_initialized");
|
||||
assert_eq!(payload[1]["event_type"], "codex_compaction_event");
|
||||
assert_eq!(payload[1]["event_params"]["thread_id"], "thread-1");
|
||||
assert_eq!(payload[1]["event_params"]["turn_id"], "turn-compact");
|
||||
assert_eq!(payload[1]["event_params"]["trigger"], "manual");
|
||||
assert_eq!(payload[1]["event_params"]["mode"], "local");
|
||||
assert_eq!(payload[1]["event_params"]["status"], "failed");
|
||||
assert_eq!(
|
||||
payload[1]["event_params"]["app_server_client"]["product_client_id"],
|
||||
DEFAULT_ORIGINATOR
|
||||
);
|
||||
assert_eq!(
|
||||
payload[1]["event_params"]["runtime"]["codex_rs_version"],
|
||||
"0.99.0"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subagent_thread_started_review_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
|
||||
|
||||
@@ -178,6 +178,12 @@ impl AnalyticsEventsClient {
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_compaction(&self, event: crate::facts::CodexCompactionEvent) {
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(
|
||||
Box::new(event),
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::InvocationType;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
@@ -37,6 +38,7 @@ pub(crate) enum TrackEventRequest {
|
||||
ThreadInitialized(ThreadInitializedEvent),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
AppUsed(CodexAppUsedEventRequest),
|
||||
Compaction(Box<CodexCompactionEventRequest>),
|
||||
PluginUsed(CodexPluginUsedEventRequest),
|
||||
PluginInstalled(CodexPluginEventRequest),
|
||||
PluginUninstalled(CodexPluginEventRequest),
|
||||
@@ -122,6 +124,34 @@ pub(crate) struct CodexAppUsedEventRequest {
|
||||
pub(crate) event_params: CodexAppMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexCompactionEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
pub(crate) turn_id: String,
|
||||
pub(crate) app_server_client: CodexAppServerClientMetadata,
|
||||
pub(crate) runtime: CodexRuntimeMetadata,
|
||||
pub(crate) trigger: crate::facts::CompactionTrigger,
|
||||
pub(crate) mode: crate::facts::CompactionMode,
|
||||
pub(crate) status: crate::facts::CompactionStatus,
|
||||
pub(crate) error: Option<String>,
|
||||
pub(crate) started_at: u64,
|
||||
pub(crate) completed_at: u64,
|
||||
pub(crate) duration_ms: Option<u64>,
|
||||
pub(crate) input_tokens_before: Option<i64>,
|
||||
pub(crate) input_tokens_after: Option<i64>,
|
||||
pub(crate) estimated_tokens_before: Option<i64>,
|
||||
pub(crate) estimated_tokens_after: Option<i64>,
|
||||
pub(crate) history_items_before: usize,
|
||||
pub(crate) history_items_after: usize,
|
||||
pub(crate) deleted_items_before_remote_compact: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexCompactionEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexCompactionEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexPluginMetadata {
|
||||
pub(crate) plugin_id: Option<String>,
|
||||
@@ -201,6 +231,33 @@ pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPlu
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn codex_compaction_event_params(
|
||||
app_server_client: CodexAppServerClientMetadata,
|
||||
runtime: CodexRuntimeMetadata,
|
||||
input: CodexCompactionEvent,
|
||||
) -> CodexCompactionEventParams {
|
||||
CodexCompactionEventParams {
|
||||
thread_id: input.thread_id,
|
||||
turn_id: input.turn_id,
|
||||
app_server_client,
|
||||
runtime,
|
||||
trigger: input.trigger,
|
||||
mode: input.mode,
|
||||
status: input.status,
|
||||
error: input.error,
|
||||
started_at: input.started_at,
|
||||
completed_at: input.completed_at,
|
||||
duration_ms: input.duration_ms,
|
||||
input_tokens_before: input.input_tokens_before,
|
||||
input_tokens_after: input.input_tokens_after,
|
||||
estimated_tokens_before: input.estimated_tokens_before,
|
||||
estimated_tokens_after: input.estimated_tokens_after,
|
||||
history_items_before: input.history_items_before,
|
||||
history_items_after: input.history_items_after,
|
||||
deleted_items_before_remote_compact: input.deleted_items_before_remote_compact,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn codex_plugin_used_metadata(
|
||||
tracking: &TrackEventsContext,
|
||||
plugin: PluginTelemetryMetadata,
|
||||
|
||||
@@ -63,6 +63,50 @@ pub struct SubAgentThreadStartedInput {
|
||||
pub created_at: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompactionTrigger {
|
||||
Manual,
|
||||
AutoPreTurn,
|
||||
AutoMidTurn,
|
||||
ModelDownshift,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompactionMode {
|
||||
Local,
|
||||
Remote,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompactionStatus {
|
||||
Completed,
|
||||
Failed,
|
||||
Interrupted,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CodexCompactionEvent {
|
||||
pub thread_id: String,
|
||||
pub turn_id: String,
|
||||
pub trigger: CompactionTrigger,
|
||||
pub mode: CompactionMode,
|
||||
pub status: CompactionStatus,
|
||||
pub error: Option<String>,
|
||||
pub started_at: u64,
|
||||
pub completed_at: u64,
|
||||
pub duration_ms: Option<u64>,
|
||||
pub input_tokens_before: Option<i64>,
|
||||
pub input_tokens_after: Option<i64>,
|
||||
pub estimated_tokens_before: Option<i64>,
|
||||
pub estimated_tokens_after: Option<i64>,
|
||||
pub history_items_before: usize,
|
||||
pub history_items_after: usize,
|
||||
pub deleted_items_before_remote_compact: Option<usize>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) enum AnalyticsFact {
|
||||
Initialize {
|
||||
@@ -89,6 +133,7 @@ pub(crate) enum AnalyticsFact {
|
||||
|
||||
pub(crate) enum CustomAnalyticsFact {
|
||||
SubAgentThreadStarted(SubAgentThreadStartedInput),
|
||||
Compaction(Box<CodexCompactionEvent>),
|
||||
SkillInvoked(SkillInvokedInput),
|
||||
AppMentioned(AppMentionedInput),
|
||||
AppUsed(AppUsedInput),
|
||||
|
||||
@@ -6,6 +6,10 @@ mod reducer;
|
||||
pub use client::AnalyticsEventsClient;
|
||||
pub use events::AppServerRpcTransport;
|
||||
pub use facts::AppInvocation;
|
||||
pub use facts::CodexCompactionEvent;
|
||||
pub use facts::CompactionMode;
|
||||
pub use facts::CompactionStatus;
|
||||
pub use facts::CompactionTrigger;
|
||||
pub use facts::InvocationType;
|
||||
pub use facts::SkillInvocation;
|
||||
pub use facts::SubAgentThreadStartedInput;
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexCompactionEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
@@ -12,6 +13,7 @@ use crate::events::ThreadInitializedEvent;
|
||||
use crate::events::ThreadInitializedEventParams;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::events::codex_app_metadata;
|
||||
use crate::events::codex_compaction_event_params;
|
||||
use crate::events::codex_plugin_metadata;
|
||||
use crate::events::codex_plugin_used_metadata;
|
||||
use crate::events::plugin_state_event_type;
|
||||
@@ -20,6 +22,7 @@ use crate::events::thread_source_name;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::PluginStateChangedInput;
|
||||
@@ -40,6 +43,8 @@ use std::path::Path;
|
||||
#[derive(Default)]
|
||||
pub(crate) struct AnalyticsReducer {
|
||||
connections: HashMap<u64, ConnectionState>,
|
||||
thread_connections: HashMap<String, u64>,
|
||||
pending_compactions: HashMap<String, Vec<CodexCompactionEvent>>,
|
||||
}
|
||||
|
||||
struct ConnectionState {
|
||||
@@ -81,6 +86,9 @@ impl AnalyticsReducer {
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
self.ingest_subagent_thread_started(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::Compaction(input) => {
|
||||
self.ingest_compaction(*input, out);
|
||||
}
|
||||
CustomAnalyticsFact::SkillInvoked(input) => {
|
||||
self.ingest_skill_invoked(input, out).await;
|
||||
}
|
||||
@@ -254,6 +262,9 @@ impl AnalyticsReducer {
|
||||
_ => return,
|
||||
};
|
||||
let thread_source: SessionSource = thread.source.into();
|
||||
let thread_id = thread.id;
|
||||
self.thread_connections
|
||||
.insert(thread_id.clone(), connection_id);
|
||||
let Some(connection_state) = self.connections.get(&connection_id) else {
|
||||
return;
|
||||
};
|
||||
@@ -261,7 +272,7 @@ impl AnalyticsReducer {
|
||||
ThreadInitializedEvent {
|
||||
event_type: "codex_thread_initialized",
|
||||
event_params: ThreadInitializedEventParams {
|
||||
thread_id: thread.id,
|
||||
thread_id: thread_id.clone(),
|
||||
app_server_client: connection_state.app_server_client.clone(),
|
||||
runtime: connection_state.runtime.clone(),
|
||||
model,
|
||||
@@ -274,6 +285,65 @@ impl AnalyticsReducer {
|
||||
},
|
||||
},
|
||||
));
|
||||
self.maybe_emit_pending_compactions(&thread_id, out);
|
||||
}
|
||||
|
||||
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
|
||||
let thread_id = input.thread_id.clone();
|
||||
if self.try_emit_compaction(input.clone(), out) {
|
||||
return;
|
||||
}
|
||||
self.pending_compactions
|
||||
.entry(thread_id)
|
||||
.or_default()
|
||||
.push(input);
|
||||
}
|
||||
|
||||
fn maybe_emit_pending_compactions(
|
||||
&mut self,
|
||||
thread_id: &str,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let Some(pending) = self.pending_compactions.remove(thread_id) else {
|
||||
return;
|
||||
};
|
||||
let mut still_pending = Vec::new();
|
||||
for input in pending {
|
||||
if !self.try_emit_compaction(input.clone(), out) {
|
||||
still_pending.push(input);
|
||||
}
|
||||
}
|
||||
if !still_pending.is_empty() {
|
||||
self.pending_compactions
|
||||
.insert(thread_id.to_string(), still_pending);
|
||||
}
|
||||
}
|
||||
|
||||
fn try_emit_compaction(
|
||||
&mut self,
|
||||
input: CodexCompactionEvent,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) -> bool {
|
||||
let connection_metadata = self
|
||||
.thread_connections
|
||||
.get(&input.thread_id)
|
||||
.and_then(|connection_id| self.connections.get(connection_id))
|
||||
.map(|connection_state| {
|
||||
(
|
||||
connection_state.app_server_client.clone(),
|
||||
connection_state.runtime.clone(),
|
||||
)
|
||||
});
|
||||
let Some((app_server_client, runtime)) = connection_metadata else {
|
||||
return false;
|
||||
};
|
||||
out.push(TrackEventRequest::Compaction(Box::new(
|
||||
CodexCompactionEventRequest {
|
||||
event_type: "codex_compaction_event",
|
||||
event_params: codex_compaction_event_params(app_server_client, runtime, input),
|
||||
},
|
||||
)));
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ use chrono::Local;
|
||||
use chrono::Utc;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_analytics::AppInvocation;
|
||||
use codex_analytics::CompactionTrigger;
|
||||
use codex_analytics::InvocationType;
|
||||
use codex_analytics::SubAgentThreadStartedInput;
|
||||
use codex_analytics::build_track_events_context;
|
||||
@@ -6214,6 +6215,7 @@ pub(crate) async fn run_turn(
|
||||
&sess,
|
||||
&turn_context,
|
||||
InitialContextInjection::BeforeLastUserMessage,
|
||||
CompactionTrigger::AutoMidTurn,
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
@@ -6396,7 +6398,13 @@ async fn run_pre_sampling_compact(
|
||||
.unwrap_or(i64::MAX);
|
||||
// Compact if the total usage tokens are greater than the auto compact limit
|
||||
if total_usage_tokens >= auto_compact_limit {
|
||||
run_auto_compact(sess, turn_context, InitialContextInjection::DoNotInject).await?;
|
||||
run_auto_compact(
|
||||
sess,
|
||||
turn_context,
|
||||
InitialContextInjection::DoNotInject,
|
||||
CompactionTrigger::AutoPreTurn,
|
||||
)
|
||||
.await?;
|
||||
pre_sampling_compacted = true;
|
||||
}
|
||||
Ok(pre_sampling_compacted)
|
||||
@@ -6440,6 +6448,7 @@ async fn maybe_run_previous_model_inline_compact(
|
||||
sess,
|
||||
&previous_model_turn_context,
|
||||
InitialContextInjection::DoNotInject,
|
||||
CompactionTrigger::ModelDownshift,
|
||||
)
|
||||
.await?;
|
||||
return Ok(true);
|
||||
@@ -6451,12 +6460,14 @@ async fn run_auto_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
trigger: CompactionTrigger,
|
||||
) -> CodexResult<()> {
|
||||
if should_use_remote_compact_task(&turn_context.provider) {
|
||||
run_inline_remote_auto_compact_task(
|
||||
Arc::clone(sess),
|
||||
Arc::clone(turn_context),
|
||||
initial_context_injection,
|
||||
trigger,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
@@ -6464,6 +6475,7 @@ async fn run_auto_compact(
|
||||
Arc::clone(sess),
|
||||
Arc::clone(turn_context),
|
||||
initial_context_injection,
|
||||
trigger,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use crate::Prompt;
|
||||
use crate::client::ModelClientSession;
|
||||
@@ -9,6 +12,11 @@ use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex::get_last_assistant_message_from_turn;
|
||||
use crate::util::backoff;
|
||||
use codex_analytics::CodexCompactionEvent;
|
||||
use codex_analytics::CompactionMode;
|
||||
use codex_analytics::CompactionStatus;
|
||||
use codex_analytics::CompactionTrigger;
|
||||
use codex_features::Feature;
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
@@ -55,6 +63,7 @@ pub(crate) async fn run_inline_auto_compact_task(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
trigger: CompactionTrigger,
|
||||
) -> CodexResult<()> {
|
||||
let prompt = turn_context.compact_prompt().to_string();
|
||||
let input = vec![UserInput::Text {
|
||||
@@ -63,7 +72,14 @@ pub(crate) async fn run_inline_auto_compact_task(
|
||||
text_elements: Vec::new(),
|
||||
}];
|
||||
|
||||
run_compact_task_inner(sess, turn_context, input, initial_context_injection).await?;
|
||||
run_compact_task_inner(
|
||||
sess,
|
||||
turn_context,
|
||||
input,
|
||||
initial_context_injection,
|
||||
trigger,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -84,6 +100,7 @@ pub(crate) async fn run_compact_task(
|
||||
turn_context,
|
||||
input,
|
||||
InitialContextInjection::DoNotInject,
|
||||
CompactionTrigger::Manual,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -93,6 +110,39 @@ async fn run_compact_task_inner(
|
||||
turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
trigger: CompactionTrigger,
|
||||
) -> CodexResult<()> {
|
||||
let attempt = CompactionAnalyticsAttempt::begin(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
trigger,
|
||||
CompactionMode::Local,
|
||||
)
|
||||
.await;
|
||||
let result = run_compact_task_inner_impl(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
input,
|
||||
initial_context_injection,
|
||||
)
|
||||
.await;
|
||||
attempt
|
||||
.track(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
compaction_status_from_result(&result),
|
||||
result.as_ref().err().map(ToString::to_string),
|
||||
/*deleted_items_before_remote_compact*/ None,
|
||||
)
|
||||
.await;
|
||||
result
|
||||
}
|
||||
|
||||
async fn run_compact_task_inner_impl(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
) -> CodexResult<()> {
|
||||
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
|
||||
sess.emit_turn_item_started(&turn_context, &compaction_item)
|
||||
@@ -233,6 +283,110 @@ async fn run_compact_task_inner(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) struct CompactionAnalyticsAttempt {
|
||||
enabled: bool,
|
||||
thread_id: String,
|
||||
turn_id: String,
|
||||
trigger: CompactionTrigger,
|
||||
mode: CompactionMode,
|
||||
started_at: u64,
|
||||
start_instant: Instant,
|
||||
before: CompactionAnalyticsSnapshot,
|
||||
}
|
||||
|
||||
struct CompactionAnalyticsSnapshot {
|
||||
input_tokens: Option<i64>,
|
||||
estimated_tokens: Option<i64>,
|
||||
history_items: usize,
|
||||
}
|
||||
|
||||
impl CompactionAnalyticsAttempt {
|
||||
pub(crate) async fn begin(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
trigger: CompactionTrigger,
|
||||
mode: CompactionMode,
|
||||
) -> Self {
|
||||
Self {
|
||||
enabled: sess.enabled(Feature::GeneralAnalytics),
|
||||
thread_id: sess.conversation_id.to_string(),
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
trigger,
|
||||
mode,
|
||||
started_at: now_unix_seconds(),
|
||||
start_instant: Instant::now(),
|
||||
before: CompactionAnalyticsSnapshot::capture(sess, turn_context).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn track(
|
||||
self,
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
status: CompactionStatus,
|
||||
error: Option<String>,
|
||||
deleted_items_before_remote_compact: Option<usize>,
|
||||
) {
|
||||
if !self.enabled {
|
||||
return;
|
||||
}
|
||||
let after = CompactionAnalyticsSnapshot::capture(sess, turn_context).await;
|
||||
sess.services
|
||||
.analytics_events_client
|
||||
.track_compaction(CodexCompactionEvent {
|
||||
thread_id: self.thread_id,
|
||||
turn_id: self.turn_id,
|
||||
trigger: self.trigger,
|
||||
mode: self.mode,
|
||||
status,
|
||||
error,
|
||||
started_at: self.started_at,
|
||||
completed_at: now_unix_seconds(),
|
||||
duration_ms: Some(
|
||||
u64::try_from(self.start_instant.elapsed().as_millis()).unwrap_or(u64::MAX),
|
||||
),
|
||||
input_tokens_before: self.before.input_tokens,
|
||||
input_tokens_after: after.input_tokens,
|
||||
estimated_tokens_before: self.before.estimated_tokens,
|
||||
estimated_tokens_after: after.estimated_tokens,
|
||||
history_items_before: self.before.history_items,
|
||||
history_items_after: after.history_items,
|
||||
deleted_items_before_remote_compact,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactionAnalyticsSnapshot {
|
||||
async fn capture(sess: &Session, turn_context: &TurnContext) -> Self {
|
||||
let input_tokens = sess
|
||||
.total_token_usage()
|
||||
.await
|
||||
.map(|usage| usage.input_tokens);
|
||||
let estimated_tokens = sess.get_estimated_token_count(turn_context).await;
|
||||
let history_items = sess.clone_history().await.raw_items().len();
|
||||
Self {
|
||||
input_tokens,
|
||||
estimated_tokens,
|
||||
history_items,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn compaction_status_from_result<T>(result: &CodexResult<T>) -> CompactionStatus {
|
||||
match result {
|
||||
Ok(_) => CompactionStatus::Completed,
|
||||
Err(CodexErr::Interrupted | CodexErr::TurnAborted) => CompactionStatus::Interrupted,
|
||||
Err(_) => CompactionStatus::Failed,
|
||||
}
|
||||
}
|
||||
|
||||
fn now_unix_seconds() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|duration| duration.as_secs())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
|
||||
let mut pieces = Vec::new();
|
||||
for item in content {
|
||||
|
||||
@@ -5,12 +5,16 @@ use crate::Prompt;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex::built_tools;
|
||||
use crate::compact::CompactionAnalyticsAttempt;
|
||||
use crate::compact::InitialContextInjection;
|
||||
use crate::compact::compaction_status_from_result;
|
||||
use crate::compact::insert_initial_context_before_last_real_user_or_summary;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::context_manager::TotalTokenUsageBreakdown;
|
||||
use crate::context_manager::estimate_response_item_model_visible_bytes;
|
||||
use crate::context_manager::is_codex_generated_item;
|
||||
use codex_analytics::CompactionMode;
|
||||
use codex_analytics::CompactionTrigger;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::items::ContextCompactionItem;
|
||||
@@ -29,8 +33,9 @@ pub(crate) async fn run_inline_remote_auto_compact_task(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
trigger: CompactionTrigger,
|
||||
) -> CodexResult<()> {
|
||||
run_remote_compact_task_inner(&sess, &turn_context, initial_context_injection).await?;
|
||||
run_remote_compact_task_inner(&sess, &turn_context, initial_context_injection, trigger).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -46,17 +51,41 @@ pub(crate) async fn run_remote_compact_task(
|
||||
});
|
||||
sess.send_event(&turn_context, start_event).await;
|
||||
|
||||
run_remote_compact_task_inner(&sess, &turn_context, InitialContextInjection::DoNotInject).await
|
||||
run_remote_compact_task_inner(
|
||||
&sess,
|
||||
&turn_context,
|
||||
InitialContextInjection::DoNotInject,
|
||||
CompactionTrigger::Manual,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn run_remote_compact_task_inner(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
trigger: CompactionTrigger,
|
||||
) -> CodexResult<()> {
|
||||
if let Err(err) =
|
||||
run_remote_compact_task_inner_impl(sess, turn_context, initial_context_injection).await
|
||||
{
|
||||
let attempt = CompactionAnalyticsAttempt::begin(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
trigger,
|
||||
CompactionMode::Remote,
|
||||
)
|
||||
.await;
|
||||
let result =
|
||||
run_remote_compact_task_inner_impl(sess, turn_context, initial_context_injection).await;
|
||||
let deleted_items_before_remote_compact = result.as_ref().ok().copied();
|
||||
attempt
|
||||
.track(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
compaction_status_from_result(&result),
|
||||
result.as_ref().err().map(ToString::to_string),
|
||||
deleted_items_before_remote_compact,
|
||||
)
|
||||
.await;
|
||||
if let Err(err) = result {
|
||||
let event = EventMsg::Error(
|
||||
err.to_error_event(Some("Error running remote compact task".to_string())),
|
||||
);
|
||||
@@ -70,7 +99,7 @@ async fn run_remote_compact_task_inner_impl(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
) -> CodexResult<()> {
|
||||
) -> CodexResult<usize> {
|
||||
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
|
||||
sess.emit_turn_item_started(turn_context, &compaction_item)
|
||||
.await;
|
||||
@@ -163,7 +192,7 @@ async fn run_remote_compact_task_inner_impl(
|
||||
|
||||
sess.emit_turn_item_completed(turn_context, compaction_item)
|
||||
.await;
|
||||
Ok(())
|
||||
Ok(deleted_items)
|
||||
}
|
||||
|
||||
pub(crate) async fn process_compacted_history(
|
||||
|
||||
Reference in New Issue
Block a user