Compare commits

...

6 Commits

Author SHA1 Message Date
alexsong-oai
4daf209709 minor 2026-03-25 19:16:16 -07:00
alexsong-oai
d19ca96cb4 rename 2026-03-25 19:12:31 -07:00
alexsong-oai
1f615494c8 clean 2026-03-25 18:50:03 -07:00
alexsong-oai
1fac4b3be6 updates 2026-03-25 18:44:05 -07:00
alexsong-oai
e272b5bade update 2026-03-25 16:37:17 -07:00
alexsong-oai
bc9ad48874 Add monitoring metrics to analytics events 2026-03-25 13:29:21 -07:00
2 changed files with 287 additions and 4 deletions

View File

@@ -8,6 +8,7 @@ use codex_protocol::protocol::SkillScope;
use serde::Serialize;
use sha1::Digest;
use sha1::Sha1;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
@@ -15,6 +16,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
#[derive(Clone)]
pub(crate) struct TrackEventsContext {
@@ -110,9 +112,22 @@ impl AnalyticsEventsQueue {
}
fn try_send(&self, job: TrackEventsJob) {
if self.sender.try_send(job).is_err() {
//TODO: add a metric for this
tracing::warn!("dropping analytics events: queue is full");
if let Err(err) = self.sender.try_send(job) {
let (reason, job) = match &err {
TrySendError::Full(job) => ("queue_full", job),
TrySendError::Closed(job) => ("queue_closed", job),
};
emit_analytics_events_failure_count(reason, &[]);
emit_analytics_events_failed_events_count(
reason,
job.job_type(),
job.event_count(),
&[],
);
tracing::warn!(
"dropping analytics events job: reason={reason} job_type={}",
job.job_type()
);
}
}
@@ -245,6 +260,100 @@ enum TrackEventsJob {
PluginDisabled(TrackPluginManagementJob),
}
const SKILL_INVOCATIONS_JOB_TYPE: &str = "skill_invocations";
const APP_MENTIONED_JOB_TYPE: &str = "app_mentioned";
const APP_USED_JOB_TYPE: &str = "app_used";
const PLUGIN_USED_JOB_TYPE: &str = "plugin_used";
const PLUGIN_INSTALLED_JOB_TYPE: &str = "plugin_installed";
const PLUGIN_UNINSTALLED_JOB_TYPE: &str = "plugin_uninstalled";
const PLUGIN_ENABLED_JOB_TYPE: &str = "plugin_enabled";
const PLUGIN_DISABLED_JOB_TYPE: &str = "plugin_disabled";
impl TrackEventsJob {
fn job_type(&self) -> &'static str {
match self {
Self::SkillInvocations(_) => SKILL_INVOCATIONS_JOB_TYPE,
Self::AppMentioned(_) => APP_MENTIONED_JOB_TYPE,
Self::AppUsed(_) => APP_USED_JOB_TYPE,
Self::PluginUsed(_) => PLUGIN_USED_JOB_TYPE,
Self::PluginInstalled(_) => PLUGIN_INSTALLED_JOB_TYPE,
Self::PluginUninstalled(_) => PLUGIN_UNINSTALLED_JOB_TYPE,
Self::PluginEnabled(_) => PLUGIN_ENABLED_JOB_TYPE,
Self::PluginDisabled(_) => PLUGIN_DISABLED_JOB_TYPE,
}
}
fn event_count(&self) -> usize {
match self {
Self::SkillInvocations(job) => job.invocations.len(),
Self::AppMentioned(job) => job.mentions.len(),
Self::AppUsed(_)
| Self::PluginUsed(_)
| Self::PluginInstalled(_)
| Self::PluginUninstalled(_)
| Self::PluginEnabled(_)
| Self::PluginDisabled(_) => 1,
}
}
}
fn emit_analytics_events_failure_count(reason: &'static str, extra_tags: &[(&str, &str)]) {
if let Some(metrics) = codex_otel::metrics::global() {
let mut tags = Vec::with_capacity(1 + extra_tags.len());
tags.push(("reason", reason));
tags.extend(extra_tags.iter().copied());
let increment = 1;
let _ = metrics.counter("codex.analytics_events.emit.failure", increment, &tags);
}
}
fn emit_analytics_events_failed_events_count(
reason: &'static str,
job_type: &'static str,
event_count: usize,
extra_tags: &[(&str, &str)],
) {
if let Some(metrics) = codex_otel::metrics::global() {
let mut tags = Vec::with_capacity(2 + extra_tags.len());
tags.push(("reason", reason));
tags.push(("job_type", job_type));
tags.extend(extra_tags.iter().copied());
let event_count = event_count.min(i64::MAX as usize) as i64;
let _ = metrics.counter(
"codex.analytics_events.emit.failure_events",
event_count,
&tags,
);
}
}
fn emit_analytics_events_request_failure_counts(
reason: &'static str,
events: &[TrackEventRequest],
extra_tags: &[(&str, &str)],
) {
emit_analytics_events_failure_count(reason, extra_tags);
let mut counts = BTreeMap::new();
for event in events {
let job_type = match event {
TrackEventRequest::SkillInvocation(_) => SKILL_INVOCATIONS_JOB_TYPE,
TrackEventRequest::AppMentioned(_) => APP_MENTIONED_JOB_TYPE,
TrackEventRequest::AppUsed(_) => APP_USED_JOB_TYPE,
TrackEventRequest::PluginUsed(_) => PLUGIN_USED_JOB_TYPE,
TrackEventRequest::PluginInstalled(_) => PLUGIN_INSTALLED_JOB_TYPE,
TrackEventRequest::PluginUninstalled(_) => PLUGIN_UNINSTALLED_JOB_TYPE,
TrackEventRequest::PluginEnabled(_) => PLUGIN_ENABLED_JOB_TYPE,
TrackEventRequest::PluginDisabled(_) => PLUGIN_DISABLED_JOB_TYPE,
};
*counts.entry(job_type).or_insert(0) += 1;
}
for (job_type, event_count) in counts {
emit_analytics_events_failed_events_count(reason, job_type, event_count, extra_tags);
}
}
struct TrackSkillInvocationsJob {
config: Arc<Config>,
tracking: TrackEventsContext,
@@ -679,16 +788,22 @@ async fn send_track_events(
return;
}
let Some(auth) = auth_manager.auth().await else {
emit_analytics_events_request_failure_counts("auth_missing", &events, &[]);
return;
};
if !auth.is_chatgpt_auth() {
emit_analytics_events_request_failure_counts("non_chatgpt_auth", &events, &[]);
return;
}
let access_token = match auth.get_token() {
Ok(token) => token,
Err(_) => return,
Err(_) => {
emit_analytics_events_request_failure_counts("token_error", &events, &[]);
return;
}
};
let Some(account_id) = auth.get_account_id() else {
emit_analytics_events_request_failure_counts("account_id_missing", &events, &[]);
return;
};
@@ -710,10 +825,16 @@ async fn send_track_events(
Ok(response) if response.status().is_success() => {}
Ok(response) => {
let status = response.status();
emit_analytics_events_request_failure_counts(
"http_status",
&payload.events,
&[("status_code", status.as_str())],
);
let body = response.text().await.unwrap_or_default();
tracing::warn!("events failed with status {status}: {body}");
}
Err(err) => {
emit_analytics_events_request_failure_counts("request_error", &payload.events, &[]);
tracing::warn!("failed to send events request: {err}");
}
}

View File

@@ -5,12 +5,19 @@ use super::CodexAppUsedEventRequest;
use super::CodexPluginEventRequest;
use super::CodexPluginUsedEventRequest;
use super::InvocationType;
use super::TrackAppMentionedJob;
use super::TrackAppUsedJob;
use super::TrackEventRequest;
use super::TrackEventsContext;
use super::TrackEventsJob;
use super::TrackPluginManagementJob;
use super::TrackPluginUsedJob;
use super::TrackSkillInvocationsJob;
use super::codex_app_metadata;
use super::codex_plugin_metadata;
use super::codex_plugin_used_metadata;
use super::normalize_path_for_skill_id;
use crate::config::ConfigBuilder;
use crate::plugins::AppConnectorId;
use crate::plugins::PluginCapabilitySummary;
use crate::plugins::PluginId;
@@ -21,6 +28,7 @@ use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use tempfile::TempDir;
use tokio::sync::mpsc;
fn expected_absolute_path(path: &PathBuf) -> String {
@@ -271,6 +279,146 @@ fn plugin_used_dedupe_is_keyed_by_turn_and_plugin() {
assert_eq!(queue.should_enqueue_plugin_used(&turn_2, &plugin), true);
}
#[test]
fn track_events_job_type_uses_expected_tag_values() {
let codex_home = TempDir::new().expect("tempdir should create");
let config = Arc::new(load_test_config(codex_home.path()));
let tracking = TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
};
let cases = vec![
(
TrackEventsJob::SkillInvocations(TrackSkillInvocationsJob {
config: Arc::clone(&config),
tracking: tracking.clone(),
invocations: Vec::new(),
}),
"skill_invocations",
),
(
TrackEventsJob::AppMentioned(TrackAppMentionedJob {
config: Arc::clone(&config),
tracking: tracking.clone(),
mentions: Vec::new(),
}),
"app_mentioned",
),
(
TrackEventsJob::AppUsed(TrackAppUsedJob {
config: Arc::clone(&config),
tracking: tracking.clone(),
app: AppInvocation {
connector_id: None,
app_name: None,
invocation_type: None,
},
}),
"app_used",
),
(
TrackEventsJob::PluginUsed(TrackPluginUsedJob {
config: Arc::clone(&config),
tracking: tracking.clone(),
plugin: sample_plugin_metadata(),
}),
"plugin_used",
),
(
TrackEventsJob::PluginInstalled(TrackPluginManagementJob {
config: Arc::clone(&config),
plugin: sample_plugin_metadata(),
}),
"plugin_installed",
),
(
TrackEventsJob::PluginUninstalled(TrackPluginManagementJob {
config: Arc::clone(&config),
plugin: sample_plugin_metadata(),
}),
"plugin_uninstalled",
),
(
TrackEventsJob::PluginEnabled(TrackPluginManagementJob {
config: Arc::clone(&config),
plugin: sample_plugin_metadata(),
}),
"plugin_enabled",
),
(
TrackEventsJob::PluginDisabled(TrackPluginManagementJob {
config,
plugin: sample_plugin_metadata(),
}),
"plugin_disabled",
),
];
for (job, expected) in cases {
assert_eq!(job.job_type(), expected);
}
}
#[test]
fn track_events_job_event_count_matches_underlying_payloads() {
let codex_home = TempDir::new().expect("tempdir should create");
let config = Arc::new(load_test_config(codex_home.path()));
let tracking = TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
};
let skill_job = TrackEventsJob::SkillInvocations(TrackSkillInvocationsJob {
config: Arc::clone(&config),
tracking: tracking.clone(),
invocations: vec![
super::SkillInvocation {
skill_name: "doc".to_string(),
skill_scope: codex_protocol::protocol::SkillScope::Repo,
skill_path: codex_home.path().join("SKILL.md"),
invocation_type: InvocationType::Explicit,
},
super::SkillInvocation {
skill_name: "doc-2".to_string(),
skill_scope: codex_protocol::protocol::SkillScope::Repo,
skill_path: codex_home.path().join("SKILL-2.md"),
invocation_type: InvocationType::Implicit,
},
],
});
let app_mentioned_job = TrackEventsJob::AppMentioned(TrackAppMentionedJob {
config: Arc::clone(&config),
tracking: tracking.clone(),
mentions: vec![
AppInvocation {
connector_id: Some("drive".to_string()),
app_name: Some("Drive".to_string()),
invocation_type: Some(InvocationType::Explicit),
},
AppInvocation {
connector_id: Some("calendar".to_string()),
app_name: Some("Calendar".to_string()),
invocation_type: Some(InvocationType::Implicit),
},
AppInvocation {
connector_id: Some("gmail".to_string()),
app_name: Some("Gmail".to_string()),
invocation_type: None,
},
],
});
let plugin_job = TrackEventsJob::PluginUsed(TrackPluginUsedJob {
config,
tracking,
plugin: sample_plugin_metadata(),
});
assert_eq!(skill_job.event_count(), 2);
assert_eq!(app_mentioned_job.event_count(), 3);
assert_eq!(plugin_job.event_count(), 1);
}
fn sample_plugin_metadata() -> PluginTelemetryMetadata {
PluginTelemetryMetadata {
plugin_id: PluginId::parse("sample@test").expect("valid plugin id"),
@@ -287,3 +435,17 @@ fn sample_plugin_metadata() -> PluginTelemetryMetadata {
}),
}
}
fn load_test_config(codex_home: &std::path::Path) -> crate::config::Config {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime should build")
.block_on(
ConfigBuilder::default()
.codex_home(codex_home.to_path_buf())
.fallback_cwd(Some(codex_home.to_path_buf()))
.build(),
)
.expect("config should load")
}