mirror of
https://github.com/openai/codex.git
synced 2026-04-09 09:01:44 +03:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2fc7d7d375 | ||
|
|
595a74a809 | ||
|
|
0b2428f63d | ||
|
|
1d27971725 | ||
|
|
03544aff7b | ||
|
|
61122ffaac | ||
|
|
6636449efa | ||
|
|
4754a36da7 | ||
|
|
53520c23e5 | ||
|
|
9ca7a71d05 |
@@ -3,6 +3,7 @@ use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexCompactionEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
@@ -18,6 +19,13 @@ use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CompactionImplementation;
|
||||
use crate::facts::CompactionPhase;
|
||||
use crate::facts::CompactionReason;
|
||||
use crate::facts::CompactionStatus;
|
||||
use crate::facts::CompactionStrategy;
|
||||
use crate::facts::CompactionTrigger;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::InvocationType;
|
||||
use crate::facts::PluginState;
|
||||
@@ -254,6 +262,54 @@ fn app_used_event_serializes_expected_shape() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compaction_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::Compaction(Box::new(CodexCompactionEventRequest {
|
||||
event_type: "codex_compaction_event",
|
||||
event_params: crate::events::codex_compaction_event_params(CodexCompactionEvent {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
trigger: CompactionTrigger::Auto,
|
||||
reason: CompactionReason::TokenLimit,
|
||||
implementation: CompactionImplementation::ResponsesCompact,
|
||||
phase: CompactionPhase::MidTurn,
|
||||
strategy: CompactionStrategy::Memento,
|
||||
status: CompactionStatus::Completed,
|
||||
error: None,
|
||||
active_context_tokens_before: 120_000,
|
||||
active_context_tokens_after: 18_000,
|
||||
started_at: 100,
|
||||
completed_at: 106,
|
||||
duration_ms: Some(6543),
|
||||
}),
|
||||
}));
|
||||
|
||||
let payload = serde_json::to_value(&event).expect("serialize compaction event");
|
||||
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!({
|
||||
"event_type": "codex_compaction_event",
|
||||
"event_params": {
|
||||
"thread_id": "thread-1",
|
||||
"turn_id": "turn-1",
|
||||
"trigger": "auto",
|
||||
"reason": "token_limit",
|
||||
"implementation": "responses_compact",
|
||||
"phase": "mid_turn",
|
||||
"strategy": "memento",
|
||||
"status": "completed",
|
||||
"error": null,
|
||||
"active_context_tokens_before": 120000,
|
||||
"active_context_tokens_after": 18000,
|
||||
"started_at": 100,
|
||||
"completed_at": 106,
|
||||
"duration_ms": 6543
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn app_used_dedupe_is_keyed_by_turn_and_connector() {
|
||||
let (sender, _receiver) = mpsc::channel(1);
|
||||
@@ -449,6 +505,48 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
assert_eq!(payload[0]["event_params"]["parent_thread_id"], json!(null));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compaction_event_ingests_custom_fact() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
|
||||
CodexCompactionEvent {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-compact".to_string(),
|
||||
trigger: CompactionTrigger::Manual,
|
||||
reason: CompactionReason::UserRequested,
|
||||
implementation: CompactionImplementation::Responses,
|
||||
phase: CompactionPhase::StandaloneTurn,
|
||||
strategy: CompactionStrategy::Memento,
|
||||
status: CompactionStatus::Failed,
|
||||
error: Some("context limit exceeded".to_string()),
|
||||
active_context_tokens_before: 131_000,
|
||||
active_context_tokens_after: 131_000,
|
||||
started_at: 100,
|
||||
completed_at: 101,
|
||||
duration_ms: Some(1200),
|
||||
},
|
||||
))),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(payload.as_array().expect("events array").len(), 1);
|
||||
assert_eq!(payload[0]["event_type"], "codex_compaction_event");
|
||||
assert_eq!(payload[0]["event_params"]["thread_id"], "thread-1");
|
||||
assert_eq!(payload[0]["event_params"]["turn_id"], "turn-compact");
|
||||
assert_eq!(payload[0]["event_params"]["trigger"], "manual");
|
||||
assert_eq!(payload[0]["event_params"]["reason"], "user_requested");
|
||||
assert_eq!(payload[0]["event_params"]["implementation"], "responses");
|
||||
assert_eq!(payload[0]["event_params"]["phase"], "standalone_turn");
|
||||
assert_eq!(payload[0]["event_params"]["strategy"], "memento");
|
||||
assert_eq!(payload[0]["event_params"]["status"], "failed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subagent_thread_started_review_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
|
||||
|
||||
@@ -178,6 +178,12 @@ impl AnalyticsEventsClient {
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_compaction(&self, event: crate::facts::CodexCompactionEvent) {
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(
|
||||
Box::new(event),
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::InvocationType;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
@@ -37,6 +38,7 @@ pub(crate) enum TrackEventRequest {
|
||||
ThreadInitialized(ThreadInitializedEvent),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
AppUsed(CodexAppUsedEventRequest),
|
||||
Compaction(Box<CodexCompactionEventRequest>),
|
||||
PluginUsed(CodexPluginUsedEventRequest),
|
||||
PluginInstalled(CodexPluginEventRequest),
|
||||
PluginUninstalled(CodexPluginEventRequest),
|
||||
@@ -122,6 +124,30 @@ pub(crate) struct CodexAppUsedEventRequest {
|
||||
pub(crate) event_params: CodexAppMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexCompactionEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
pub(crate) turn_id: String,
|
||||
pub(crate) trigger: crate::facts::CompactionTrigger,
|
||||
pub(crate) reason: crate::facts::CompactionReason,
|
||||
pub(crate) implementation: crate::facts::CompactionImplementation,
|
||||
pub(crate) phase: crate::facts::CompactionPhase,
|
||||
pub(crate) strategy: crate::facts::CompactionStrategy,
|
||||
pub(crate) status: crate::facts::CompactionStatus,
|
||||
pub(crate) error: Option<String>,
|
||||
pub(crate) active_context_tokens_before: i64,
|
||||
pub(crate) active_context_tokens_after: i64,
|
||||
pub(crate) started_at: u64,
|
||||
pub(crate) completed_at: u64,
|
||||
pub(crate) duration_ms: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexCompactionEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexCompactionEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexPluginMetadata {
|
||||
pub(crate) plugin_id: Option<String>,
|
||||
@@ -201,6 +227,27 @@ pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPlu
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn codex_compaction_event_params(
|
||||
input: CodexCompactionEvent,
|
||||
) -> CodexCompactionEventParams {
|
||||
CodexCompactionEventParams {
|
||||
thread_id: input.thread_id,
|
||||
turn_id: input.turn_id,
|
||||
trigger: input.trigger,
|
||||
reason: input.reason,
|
||||
implementation: input.implementation,
|
||||
phase: input.phase,
|
||||
strategy: input.strategy,
|
||||
status: input.status,
|
||||
error: input.error,
|
||||
active_context_tokens_before: input.active_context_tokens_before,
|
||||
active_context_tokens_after: input.active_context_tokens_after,
|
||||
started_at: input.started_at,
|
||||
completed_at: input.completed_at,
|
||||
duration_ms: input.duration_ms,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn codex_plugin_used_metadata(
|
||||
tracking: &TrackEventsContext,
|
||||
plugin: PluginTelemetryMetadata,
|
||||
|
||||
@@ -63,6 +63,69 @@ pub struct SubAgentThreadStartedInput {
|
||||
pub created_at: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompactionTrigger {
|
||||
Manual,
|
||||
Auto,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompactionReason {
|
||||
UserRequested,
|
||||
TokenLimit,
|
||||
ModelDownshift,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompactionImplementation {
|
||||
Responses,
|
||||
ResponsesCompact,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompactionPhase {
|
||||
StandaloneTurn,
|
||||
PreTurn,
|
||||
MidTurn,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompactionStrategy {
|
||||
Memento,
|
||||
PrefixCompaction,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CompactionStatus {
|
||||
Completed,
|
||||
Failed,
|
||||
Interrupted,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CodexCompactionEvent {
|
||||
pub thread_id: String,
|
||||
pub turn_id: String,
|
||||
pub trigger: CompactionTrigger,
|
||||
pub reason: CompactionReason,
|
||||
pub implementation: CompactionImplementation,
|
||||
pub phase: CompactionPhase,
|
||||
pub strategy: CompactionStrategy,
|
||||
pub status: CompactionStatus,
|
||||
pub error: Option<String>,
|
||||
pub active_context_tokens_before: i64,
|
||||
pub active_context_tokens_after: i64,
|
||||
pub started_at: u64,
|
||||
pub completed_at: u64,
|
||||
pub duration_ms: Option<u64>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) enum AnalyticsFact {
|
||||
Initialize {
|
||||
@@ -89,6 +152,7 @@ pub(crate) enum AnalyticsFact {
|
||||
|
||||
pub(crate) enum CustomAnalyticsFact {
|
||||
SubAgentThreadStarted(SubAgentThreadStartedInput),
|
||||
Compaction(Box<CodexCompactionEvent>),
|
||||
SkillInvoked(SkillInvokedInput),
|
||||
AppMentioned(AppMentionedInput),
|
||||
AppUsed(AppUsedInput),
|
||||
|
||||
@@ -6,6 +6,13 @@ mod reducer;
|
||||
pub use client::AnalyticsEventsClient;
|
||||
pub use events::AppServerRpcTransport;
|
||||
pub use facts::AppInvocation;
|
||||
pub use facts::CodexCompactionEvent;
|
||||
pub use facts::CompactionImplementation;
|
||||
pub use facts::CompactionPhase;
|
||||
pub use facts::CompactionReason;
|
||||
pub use facts::CompactionStatus;
|
||||
pub use facts::CompactionStrategy;
|
||||
pub use facts::CompactionTrigger;
|
||||
pub use facts::InvocationType;
|
||||
pub use facts::SkillInvocation;
|
||||
pub use facts::SubAgentThreadStartedInput;
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexCompactionEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
@@ -12,6 +13,7 @@ use crate::events::ThreadInitializedEvent;
|
||||
use crate::events::ThreadInitializedEventParams;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::events::codex_app_metadata;
|
||||
use crate::events::codex_compaction_event_params;
|
||||
use crate::events::codex_plugin_metadata;
|
||||
use crate::events::codex_plugin_used_metadata;
|
||||
use crate::events::plugin_state_event_type;
|
||||
@@ -20,6 +22,7 @@ use crate::events::thread_source_name;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::PluginStateChangedInput;
|
||||
@@ -81,6 +84,9 @@ impl AnalyticsReducer {
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
self.ingest_subagent_thread_started(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::Compaction(input) => {
|
||||
self.ingest_compaction(*input, out);
|
||||
}
|
||||
CustomAnalyticsFact::SkillInvoked(input) => {
|
||||
self.ingest_skill_invoked(input, out).await;
|
||||
}
|
||||
@@ -254,6 +260,7 @@ impl AnalyticsReducer {
|
||||
_ => return,
|
||||
};
|
||||
let thread_source: SessionSource = thread.source.into();
|
||||
let thread_id = thread.id;
|
||||
let Some(connection_state) = self.connections.get(&connection_id) else {
|
||||
return;
|
||||
};
|
||||
@@ -261,7 +268,7 @@ impl AnalyticsReducer {
|
||||
ThreadInitializedEvent {
|
||||
event_type: "codex_thread_initialized",
|
||||
event_params: ThreadInitializedEventParams {
|
||||
thread_id: thread.id,
|
||||
thread_id,
|
||||
app_server_client: connection_state.app_server_client.clone(),
|
||||
runtime: connection_state.runtime.clone(),
|
||||
model,
|
||||
@@ -275,6 +282,15 @@ impl AnalyticsReducer {
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
|
||||
out.push(TrackEventRequest::Compaction(Box::new(
|
||||
CodexCompactionEventRequest {
|
||||
event_type: "codex_compaction_event",
|
||||
event_params: codex_compaction_event_params(input),
|
||||
},
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn skill_id_for_local_skill(
|
||||
|
||||
@@ -50,6 +50,8 @@ use chrono::Local;
|
||||
use chrono::Utc;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_analytics::AppInvocation;
|
||||
use codex_analytics::CompactionPhase;
|
||||
use codex_analytics::CompactionReason;
|
||||
use codex_analytics::InvocationType;
|
||||
use codex_analytics::SubAgentThreadStartedInput;
|
||||
use codex_analytics::build_track_events_context;
|
||||
@@ -6217,6 +6219,8 @@ pub(crate) async fn run_turn(
|
||||
&sess,
|
||||
&turn_context,
|
||||
InitialContextInjection::BeforeLastUserMessage,
|
||||
CompactionReason::TokenLimit,
|
||||
CompactionPhase::MidTurn,
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
@@ -6399,7 +6403,14 @@ async fn run_pre_sampling_compact(
|
||||
.unwrap_or(i64::MAX);
|
||||
// Compact if the total usage tokens are greater than the auto compact limit
|
||||
if total_usage_tokens >= auto_compact_limit {
|
||||
run_auto_compact(sess, turn_context, InitialContextInjection::DoNotInject).await?;
|
||||
run_auto_compact(
|
||||
sess,
|
||||
turn_context,
|
||||
InitialContextInjection::DoNotInject,
|
||||
CompactionReason::TokenLimit,
|
||||
CompactionPhase::PreTurn,
|
||||
)
|
||||
.await?;
|
||||
pre_sampling_compacted = true;
|
||||
}
|
||||
Ok(pre_sampling_compacted)
|
||||
@@ -6443,6 +6454,8 @@ async fn maybe_run_previous_model_inline_compact(
|
||||
sess,
|
||||
&previous_model_turn_context,
|
||||
InitialContextInjection::DoNotInject,
|
||||
CompactionReason::ModelDownshift,
|
||||
CompactionPhase::PreTurn,
|
||||
)
|
||||
.await?;
|
||||
return Ok(true);
|
||||
@@ -6454,12 +6467,16 @@ async fn run_auto_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
reason: CompactionReason,
|
||||
phase: CompactionPhase,
|
||||
) -> CodexResult<()> {
|
||||
if should_use_remote_compact_task(&turn_context.provider) {
|
||||
run_inline_remote_auto_compact_task(
|
||||
Arc::clone(sess),
|
||||
Arc::clone(turn_context),
|
||||
initial_context_injection,
|
||||
reason,
|
||||
phase,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
@@ -6467,6 +6484,8 @@ async fn run_auto_compact(
|
||||
Arc::clone(sess),
|
||||
Arc::clone(turn_context),
|
||||
initial_context_injection,
|
||||
reason,
|
||||
phase,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use crate::Prompt;
|
||||
use crate::client::ModelClientSession;
|
||||
@@ -9,6 +12,14 @@ use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex::get_last_assistant_message_from_turn;
|
||||
use crate::util::backoff;
|
||||
use codex_analytics::CodexCompactionEvent;
|
||||
use codex_analytics::CompactionImplementation;
|
||||
use codex_analytics::CompactionPhase;
|
||||
use codex_analytics::CompactionReason;
|
||||
use codex_analytics::CompactionStatus;
|
||||
use codex_analytics::CompactionStrategy;
|
||||
use codex_analytics::CompactionTrigger;
|
||||
use codex_features::Feature;
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
@@ -55,6 +66,8 @@ pub(crate) async fn run_inline_auto_compact_task(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
reason: CompactionReason,
|
||||
phase: CompactionPhase,
|
||||
) -> CodexResult<()> {
|
||||
let prompt = turn_context.compact_prompt().to_string();
|
||||
let input = vec![UserInput::Text {
|
||||
@@ -63,7 +76,16 @@ pub(crate) async fn run_inline_auto_compact_task(
|
||||
text_elements: Vec::new(),
|
||||
}];
|
||||
|
||||
run_compact_task_inner(sess, turn_context, input, initial_context_injection).await?;
|
||||
run_compact_task_inner(
|
||||
sess,
|
||||
turn_context,
|
||||
input,
|
||||
initial_context_injection,
|
||||
CompactionTrigger::Auto,
|
||||
reason,
|
||||
phase,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -84,6 +106,9 @@ pub(crate) async fn run_compact_task(
|
||||
turn_context,
|
||||
input,
|
||||
InitialContextInjection::DoNotInject,
|
||||
CompactionTrigger::Manual,
|
||||
CompactionReason::UserRequested,
|
||||
CompactionPhase::StandaloneTurn,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -93,6 +118,41 @@ async fn run_compact_task_inner(
|
||||
turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
trigger: CompactionTrigger,
|
||||
reason: CompactionReason,
|
||||
phase: CompactionPhase,
|
||||
) -> CodexResult<()> {
|
||||
let attempt = CompactionAnalyticsAttempt::begin(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
trigger,
|
||||
reason,
|
||||
CompactionImplementation::Responses,
|
||||
phase,
|
||||
)
|
||||
.await;
|
||||
let result = run_compact_task_inner_impl(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
input,
|
||||
initial_context_injection,
|
||||
)
|
||||
.await;
|
||||
attempt
|
||||
.track(
|
||||
sess.as_ref(),
|
||||
compaction_status_from_result(&result),
|
||||
result.as_ref().err().map(ToString::to_string),
|
||||
)
|
||||
.await;
|
||||
result
|
||||
}
|
||||
|
||||
async fn run_compact_task_inner_impl(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
) -> CodexResult<()> {
|
||||
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
|
||||
sess.emit_turn_item_started(&turn_context, &compaction_item)
|
||||
@@ -233,6 +293,96 @@ async fn run_compact_task_inner(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) struct CompactionAnalyticsAttempt {
|
||||
enabled: bool,
|
||||
thread_id: String,
|
||||
turn_id: String,
|
||||
trigger: CompactionTrigger,
|
||||
reason: CompactionReason,
|
||||
implementation: CompactionImplementation,
|
||||
phase: CompactionPhase,
|
||||
active_context_tokens_before: i64,
|
||||
started_at: u64,
|
||||
start_instant: Instant,
|
||||
}
|
||||
|
||||
impl CompactionAnalyticsAttempt {
|
||||
pub(crate) async fn begin(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
trigger: CompactionTrigger,
|
||||
reason: CompactionReason,
|
||||
implementation: CompactionImplementation,
|
||||
phase: CompactionPhase,
|
||||
) -> Self {
|
||||
let enabled = sess.enabled(Feature::GeneralAnalytics);
|
||||
let active_context_tokens_before = if enabled {
|
||||
sess.get_total_token_usage().await
|
||||
} else {
|
||||
0
|
||||
};
|
||||
Self {
|
||||
enabled,
|
||||
thread_id: sess.conversation_id.to_string(),
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
trigger,
|
||||
reason,
|
||||
implementation,
|
||||
phase,
|
||||
active_context_tokens_before,
|
||||
started_at: now_unix_seconds(),
|
||||
start_instant: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn track(
|
||||
self,
|
||||
sess: &Session,
|
||||
status: CompactionStatus,
|
||||
error: Option<String>,
|
||||
) {
|
||||
if !self.enabled {
|
||||
return;
|
||||
}
|
||||
let active_context_tokens_after = sess.get_total_token_usage().await;
|
||||
sess.services
|
||||
.analytics_events_client
|
||||
.track_compaction(CodexCompactionEvent {
|
||||
thread_id: self.thread_id,
|
||||
turn_id: self.turn_id,
|
||||
trigger: self.trigger,
|
||||
reason: self.reason,
|
||||
implementation: self.implementation,
|
||||
phase: self.phase,
|
||||
strategy: CompactionStrategy::Memento,
|
||||
status,
|
||||
error,
|
||||
active_context_tokens_before: self.active_context_tokens_before,
|
||||
active_context_tokens_after,
|
||||
started_at: self.started_at,
|
||||
completed_at: now_unix_seconds(),
|
||||
duration_ms: Some(
|
||||
u64::try_from(self.start_instant.elapsed().as_millis()).unwrap_or(u64::MAX),
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn compaction_status_from_result<T>(result: &CodexResult<T>) -> CompactionStatus {
|
||||
match result {
|
||||
Ok(_) => CompactionStatus::Completed,
|
||||
Err(CodexErr::Interrupted | CodexErr::TurnAborted) => CompactionStatus::Interrupted,
|
||||
Err(_) => CompactionStatus::Failed,
|
||||
}
|
||||
}
|
||||
|
||||
fn now_unix_seconds() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|duration| duration.as_secs())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
|
||||
let mut pieces = Vec::new();
|
||||
for item in content {
|
||||
|
||||
@@ -5,12 +5,18 @@ use crate::Prompt;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex::built_tools;
|
||||
use crate::compact::CompactionAnalyticsAttempt;
|
||||
use crate::compact::InitialContextInjection;
|
||||
use crate::compact::compaction_status_from_result;
|
||||
use crate::compact::insert_initial_context_before_last_real_user_or_summary;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::context_manager::TotalTokenUsageBreakdown;
|
||||
use crate::context_manager::estimate_response_item_model_visible_bytes;
|
||||
use crate::context_manager::is_codex_generated_item;
|
||||
use codex_analytics::CompactionImplementation;
|
||||
use codex_analytics::CompactionPhase;
|
||||
use codex_analytics::CompactionReason;
|
||||
use codex_analytics::CompactionTrigger;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::items::ContextCompactionItem;
|
||||
@@ -29,8 +35,18 @@ pub(crate) async fn run_inline_remote_auto_compact_task(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
reason: CompactionReason,
|
||||
phase: CompactionPhase,
|
||||
) -> CodexResult<()> {
|
||||
run_remote_compact_task_inner(&sess, &turn_context, initial_context_injection).await?;
|
||||
run_remote_compact_task_inner(
|
||||
&sess,
|
||||
&turn_context,
|
||||
initial_context_injection,
|
||||
CompactionTrigger::Auto,
|
||||
reason,
|
||||
phase,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -46,17 +62,44 @@ pub(crate) async fn run_remote_compact_task(
|
||||
});
|
||||
sess.send_event(&turn_context, start_event).await;
|
||||
|
||||
run_remote_compact_task_inner(&sess, &turn_context, InitialContextInjection::DoNotInject).await
|
||||
run_remote_compact_task_inner(
|
||||
&sess,
|
||||
&turn_context,
|
||||
InitialContextInjection::DoNotInject,
|
||||
CompactionTrigger::Manual,
|
||||
CompactionReason::UserRequested,
|
||||
CompactionPhase::StandaloneTurn,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn run_remote_compact_task_inner(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
initial_context_injection: InitialContextInjection,
|
||||
trigger: CompactionTrigger,
|
||||
reason: CompactionReason,
|
||||
phase: CompactionPhase,
|
||||
) -> CodexResult<()> {
|
||||
if let Err(err) =
|
||||
run_remote_compact_task_inner_impl(sess, turn_context, initial_context_injection).await
|
||||
{
|
||||
let attempt = CompactionAnalyticsAttempt::begin(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
trigger,
|
||||
reason,
|
||||
CompactionImplementation::ResponsesCompact,
|
||||
phase,
|
||||
)
|
||||
.await;
|
||||
let result =
|
||||
run_remote_compact_task_inner_impl(sess, turn_context, initial_context_injection).await;
|
||||
attempt
|
||||
.track(
|
||||
sess.as_ref(),
|
||||
compaction_status_from_result(&result),
|
||||
result.as_ref().err().map(ToString::to_string),
|
||||
)
|
||||
.await;
|
||||
if let Err(err) = result {
|
||||
let event = EventMsg::Error(
|
||||
err.to_error_event(Some("Error running remote compact task".to_string())),
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user