establish reducer structure

This commit is contained in:
Roy Han
2026-03-25 15:05:08 -07:00
parent 4a163ee2a9
commit b41bc22e45
2 changed files with 257 additions and 424 deletions

View File

@@ -10,6 +10,7 @@ use codex_protocol::protocol::SubAgentSource;
use serde::Serialize;
use sha1::Digest;
use sha1::Sha1;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
@@ -78,9 +79,64 @@ pub struct AppInvocation {
pub invocation_type: Option<InvocationType>,
}
pub enum AnalyticsInput {
ThreadInitialized(ThreadInitializedInput),
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),
AppUsed(AppUsedInput),
PluginUsed(PluginUsedInput),
PluginStateChanged(PluginStateChangedInput),
}
pub struct ThreadInitializedInput {
pub thread_event: CodexThreadInitializedEvent,
}
pub struct SkillInvokedInput {
pub tracking: TrackEventsContext,
pub invocations: Vec<SkillInvocation>,
}
pub struct AppMentionedInput {
pub tracking: TrackEventsContext,
pub mentions: Vec<AppInvocation>,
}
pub struct AppUsedInput {
pub tracking: TrackEventsContext,
pub app: AppInvocation,
}
pub struct PluginUsedInput {
pub tracking: TrackEventsContext,
pub plugin: PluginTelemetryMetadata,
}
pub struct PluginStateChangedInput {
pub plugin: PluginTelemetryMetadata,
pub state: PluginState,
}
#[derive(Clone, Copy)]
pub enum PluginState {
Installed,
Uninstalled,
Enabled,
Disabled,
}
#[derive(Default)]
pub struct AnalyticsReducer {
threads: HashMap<String, ThreadState>,
}
struct ThreadState {
_initialized_event: CodexThreadInitializedEvent,
}
#[derive(Clone)]
pub(crate) struct AnalyticsEventsQueue {
sender: mpsc::Sender<TrackEventsJob>,
sender: mpsc::Sender<AnalyticsInput>,
app_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
plugin_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
}
@@ -95,36 +151,11 @@ impl AnalyticsEventsQueue {
pub(crate) fn new(auth_manager: Arc<AuthManager>, base_url: String) -> Self {
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
tokio::spawn(async move {
let mut reducer = AnalyticsReducer::default();
while let Some(job) = receiver.recv().await {
match job {
TrackEventsJob::SkillInvocations(job) => {
send_track_skill_invocations(&auth_manager, &base_url, job).await;
}
TrackEventsJob::ThreadInitialized(job) => {
send_track_thread_initialized(&auth_manager, &base_url, job).await;
}
TrackEventsJob::AppMentioned(job) => {
send_track_app_mentioned(&auth_manager, &base_url, job).await;
}
TrackEventsJob::AppUsed(job) => {
send_track_app_used(&auth_manager, &base_url, job).await;
}
TrackEventsJob::PluginUsed(job) => {
send_track_plugin_used(&auth_manager, &base_url, job).await;
}
TrackEventsJob::PluginInstalled(job) => {
send_track_plugin_installed(&auth_manager, &base_url, job).await;
}
TrackEventsJob::PluginUninstalled(job) => {
send_track_plugin_uninstalled(&auth_manager, &base_url, job).await;
}
TrackEventsJob::PluginEnabled(job) => {
send_track_plugin_enabled(&auth_manager, &base_url, job).await;
}
TrackEventsJob::PluginDisabled(job) => {
send_track_plugin_disabled(&auth_manager, &base_url, job).await;
}
}
let mut events = Vec::new();
reducer.ingest(job, &mut events).await;
send_track_events(&auth_manager, &base_url, events).await;
}
});
Self {
@@ -134,8 +165,8 @@ impl AnalyticsEventsQueue {
}
}
fn try_send(&self, job: TrackEventsJob) {
if self.sender.try_send(job).is_err() {
fn try_send(&self, input: AnalyticsInput) {
if self.sender.try_send(input).is_err() {
//TODO: add a metric for this
tracing::warn!("dropping analytics events: queue is full");
}
@@ -188,124 +219,90 @@ impl AnalyticsEventsClient {
tracking: TrackEventsContext,
invocations: Vec<SkillInvocation>,
) {
track_skill_invocations(
&self.queue,
self.analytics_enabled,
Some(tracking),
if invocations.is_empty() {
return;
}
self.record(AnalyticsInput::SkillInvoked(SkillInvokedInput {
tracking,
invocations,
);
}));
}
pub fn track_thread_initialized(&self, thread_event: CodexThreadInitializedEvent) {
track_thread_initialized(&self.queue, self.analytics_enabled, thread_event);
self.record(AnalyticsInput::ThreadInitialized(ThreadInitializedInput {
thread_event,
}));
}
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
track_app_mentioned(
&self.queue,
self.analytics_enabled,
Some(tracking),
if mentions.is_empty() {
return;
}
self.record(AnalyticsInput::AppMentioned(AppMentionedInput {
tracking,
mentions,
);
}));
}
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
track_app_used(&self.queue, self.analytics_enabled, Some(tracking), app);
if !self.queue.should_enqueue_app_used(&tracking, &app) {
return;
}
self.record(AnalyticsInput::AppUsed(AppUsedInput { tracking, app }));
}
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
track_plugin_used(&self.queue, self.analytics_enabled, Some(tracking), plugin);
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
self.record(AnalyticsInput::PluginUsed(PluginUsedInput {
tracking,
plugin,
}));
}
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
track_plugin_management(
&self.queue,
self.analytics_enabled,
PluginManagementEventType::Installed,
plugin,
);
self.record(AnalyticsInput::PluginStateChanged(
PluginStateChangedInput {
plugin,
state: PluginState::Installed,
},
));
}
pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) {
track_plugin_management(
&self.queue,
self.analytics_enabled,
PluginManagementEventType::Uninstalled,
plugin,
);
self.record(AnalyticsInput::PluginStateChanged(
PluginStateChangedInput {
plugin,
state: PluginState::Uninstalled,
},
));
}
pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) {
track_plugin_management(
&self.queue,
self.analytics_enabled,
PluginManagementEventType::Enabled,
plugin,
);
self.record(AnalyticsInput::PluginStateChanged(
PluginStateChangedInput {
plugin,
state: PluginState::Enabled,
},
));
}
pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) {
track_plugin_management(
&self.queue,
self.analytics_enabled,
PluginManagementEventType::Disabled,
plugin,
);
self.record(AnalyticsInput::PluginStateChanged(
PluginStateChangedInput {
plugin,
state: PluginState::Disabled,
},
));
}
}
enum TrackEventsJob {
SkillInvocations(TrackSkillInvocationsJob),
ThreadInitialized(TrackThreadInitializedJob),
AppMentioned(TrackAppMentionedJob),
AppUsed(TrackAppUsedJob),
PluginUsed(TrackPluginUsedJob),
PluginInstalled(TrackPluginManagementJob),
PluginUninstalled(TrackPluginManagementJob),
PluginEnabled(TrackPluginManagementJob),
PluginDisabled(TrackPluginManagementJob),
}
struct TrackSkillInvocationsJob {
analytics_enabled: Option<bool>,
tracking: TrackEventsContext,
invocations: Vec<SkillInvocation>,
}
struct TrackThreadInitializedJob {
analytics_enabled: Option<bool>,
thread_event: CodexThreadInitializedEvent,
}
struct TrackAppMentionedJob {
analytics_enabled: Option<bool>,
tracking: TrackEventsContext,
mentions: Vec<AppInvocation>,
}
struct TrackAppUsedJob {
analytics_enabled: Option<bool>,
tracking: TrackEventsContext,
app: AppInvocation,
}
struct TrackPluginUsedJob {
analytics_enabled: Option<bool>,
tracking: TrackEventsContext,
plugin: PluginTelemetryMetadata,
}
struct TrackPluginManagementJob {
analytics_enabled: Option<bool>,
plugin: PluginTelemetryMetadata,
}
#[derive(Clone, Copy)]
enum PluginManagementEventType {
Installed,
Uninstalled,
Enabled,
Disabled,
pub fn record(&self, input: AnalyticsInput) {
if self.analytics_enabled == Some(false) {
return;
}
self.queue.try_send(input);
}
}
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
@@ -423,320 +420,151 @@ struct CodexPluginUsedEventRequest {
event_params: CodexPluginUsedMetadata,
}
pub(crate) fn track_skill_invocations(
queue: &AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
tracking: Option<TrackEventsContext>,
invocations: Vec<SkillInvocation>,
) {
if analytics_enabled == Some(false) {
return;
impl AnalyticsReducer {
async fn ingest(&mut self, input: AnalyticsInput, out: &mut Vec<TrackEventRequest>) {
match input {
AnalyticsInput::ThreadInitialized(input) => {
self.ingest_thread_initialized(input, out);
}
AnalyticsInput::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
AnalyticsInput::AppMentioned(input) => {
self.ingest_app_mentioned(input, out);
}
AnalyticsInput::AppUsed(input) => {
self.ingest_app_used(input, out);
}
AnalyticsInput::PluginUsed(input) => {
self.ingest_plugin_used(input, out);
}
AnalyticsInput::PluginStateChanged(input) => {
self.ingest_plugin_state_changed(input, out);
}
}
}
let Some(tracking) = tracking else {
return;
};
if invocations.is_empty() {
return;
}
let job = TrackEventsJob::SkillInvocations(TrackSkillInvocationsJob {
analytics_enabled,
tracking,
invocations,
});
queue.try_send(job);
}
pub(crate) fn track_thread_initialized(
queue: &AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
thread_event: CodexThreadInitializedEvent,
) {
if analytics_enabled == Some(false) {
return;
}
let job = TrackEventsJob::ThreadInitialized(TrackThreadInitializedJob {
analytics_enabled,
thread_event,
});
queue.try_send(job);
}
pub(crate) fn track_app_mentioned(
queue: &AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
tracking: Option<TrackEventsContext>,
mentions: Vec<AppInvocation>,
) {
if analytics_enabled == Some(false) {
return;
}
let Some(tracking) = tracking else {
return;
};
if mentions.is_empty() {
return;
}
let job = TrackEventsJob::AppMentioned(TrackAppMentionedJob {
analytics_enabled,
tracking,
mentions,
});
queue.try_send(job);
}
pub(crate) fn track_app_used(
queue: &AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
tracking: Option<TrackEventsContext>,
app: AppInvocation,
) {
if analytics_enabled == Some(false) {
return;
}
let Some(tracking) = tracking else {
return;
};
if !queue.should_enqueue_app_used(&tracking, &app) {
return;
}
let job = TrackEventsJob::AppUsed(TrackAppUsedJob {
analytics_enabled,
tracking,
app,
});
queue.try_send(job);
}
pub(crate) fn track_plugin_used(
queue: &AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
tracking: Option<TrackEventsContext>,
plugin: PluginTelemetryMetadata,
) {
if analytics_enabled == Some(false) {
return;
}
let Some(tracking) = tracking else {
return;
};
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
let job = TrackEventsJob::PluginUsed(TrackPluginUsedJob {
analytics_enabled,
tracking,
plugin,
});
queue.try_send(job);
}
fn track_plugin_management(
queue: &AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
event_type: PluginManagementEventType,
plugin: PluginTelemetryMetadata,
) {
if analytics_enabled == Some(false) {
return;
}
let job = TrackPluginManagementJob {
analytics_enabled,
plugin,
};
let job = match event_type {
PluginManagementEventType::Installed => TrackEventsJob::PluginInstalled(job),
PluginManagementEventType::Uninstalled => TrackEventsJob::PluginUninstalled(job),
PluginManagementEventType::Enabled => TrackEventsJob::PluginEnabled(job),
PluginManagementEventType::Disabled => TrackEventsJob::PluginDisabled(job),
};
queue.try_send(job);
}
async fn send_track_skill_invocations(
auth_manager: &AuthManager,
base_url: &str,
job: TrackSkillInvocationsJob,
) {
let TrackSkillInvocationsJob {
analytics_enabled,
tracking,
invocations,
} = job;
let mut events = Vec::with_capacity(invocations.len());
for invocation in invocations {
let skill_scope = match invocation.skill_scope {
SkillScope::User => "user",
SkillScope::Repo => "repo",
SkillScope::System => "system",
SkillScope::Admin => "admin",
};
let repo_root = get_git_repo_root(invocation.skill_path.as_path());
let repo_url = if let Some(root) = repo_root.as_ref() {
collect_git_info(root)
.await
.and_then(|info| info.repository_url)
} else {
None
};
let skill_id = skill_id_for_local_skill(
repo_url.as_deref(),
repo_root.as_deref(),
invocation.skill_path.as_path(),
invocation.skill_name.as_str(),
fn ingest_thread_initialized(
&mut self,
input: ThreadInitializedInput,
out: &mut Vec<TrackEventRequest>,
) {
self.threads.insert(
input.thread_event.thread_id.clone(),
ThreadState {
_initialized_event: input.thread_event.clone(),
},
);
events.push(TrackEventRequest::SkillInvocation(
SkillInvocationEventRequest {
event_type: "skill_invocation",
skill_id,
skill_name: invocation.skill_name.clone(),
event_params: SkillInvocationEventParams {
thread_id: Some(tracking.thread_id.clone()),
invoke_type: Some(invocation.invocation_type),
model_slug: Some(tracking.model_slug.clone()),
product_client_id: Some(originator().value),
repo_url,
skill_scope: Some(skill_scope.to_string()),
},
out.push(TrackEventRequest::ThreadInitialized(
CodexThreadInitializedEventRequest {
event_type: "codex_thread_initialized",
event_params: codex_thread_initialized_event_params(input.thread_event),
},
));
}
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
}
async fn ingest_skill_invoked(
&mut self,
input: SkillInvokedInput,
out: &mut Vec<TrackEventRequest>,
) {
let SkillInvokedInput {
tracking,
invocations,
} = input;
for invocation in invocations {
let skill_scope = match invocation.skill_scope {
SkillScope::User => "user",
SkillScope::Repo => "repo",
SkillScope::System => "system",
SkillScope::Admin => "admin",
};
let repo_root = get_git_repo_root(invocation.skill_path.as_path());
let repo_url = if let Some(root) = repo_root.as_ref() {
collect_git_info(root)
.await
.and_then(|info| info.repository_url)
} else {
None
};
let skill_id = skill_id_for_local_skill(
repo_url.as_deref(),
repo_root.as_deref(),
invocation.skill_path.as_path(),
invocation.skill_name.as_str(),
);
out.push(TrackEventRequest::SkillInvocation(
SkillInvocationEventRequest {
event_type: "skill_invocation",
skill_id,
skill_name: invocation.skill_name.clone(),
event_params: SkillInvocationEventParams {
thread_id: Some(tracking.thread_id.clone()),
invoke_type: Some(invocation.invocation_type),
model_slug: Some(tracking.model_slug.clone()),
product_client_id: Some(originator().value),
repo_url,
skill_scope: Some(skill_scope.to_string()),
},
},
));
}
}
async fn send_track_thread_initialized(
auth_manager: &AuthManager,
base_url: &str,
job: TrackThreadInitializedJob,
) {
let TrackThreadInitializedJob {
analytics_enabled,
thread_event,
} = job;
let events = vec![TrackEventRequest::ThreadInitialized(
CodexThreadInitializedEventRequest {
event_type: "codex_thread_initialized",
event_params: codex_thread_initialized_event_params(thread_event),
},
)];
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
}
async fn send_track_app_mentioned(
auth_manager: &AuthManager,
base_url: &str,
job: TrackAppMentionedJob,
) {
let TrackAppMentionedJob {
analytics_enabled,
tracking,
mentions,
} = job;
let events = mentions
.into_iter()
.map(|mention| {
fn ingest_app_mentioned(&mut self, input: AppMentionedInput, out: &mut Vec<TrackEventRequest>) {
let AppMentionedInput { tracking, mentions } = input;
out.extend(mentions.into_iter().map(|mention| {
let event_params = codex_app_metadata(&tracking, mention);
TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest {
event_type: "codex_app_mentioned",
event_params,
})
})
.collect::<Vec<_>>();
}));
}
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
fn ingest_app_used(&mut self, input: AppUsedInput, out: &mut Vec<TrackEventRequest>) {
let AppUsedInput { tracking, app } = input;
let event_params = codex_app_metadata(&tracking, app);
out.push(TrackEventRequest::AppUsed(CodexAppUsedEventRequest {
event_type: "codex_app_used",
event_params,
}));
}
fn ingest_plugin_used(&mut self, input: PluginUsedInput, out: &mut Vec<TrackEventRequest>) {
let PluginUsedInput { tracking, plugin } = input;
out.push(TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
event_type: "codex_plugin_used",
event_params: codex_plugin_used_metadata(&tracking, plugin),
}));
}
fn ingest_plugin_state_changed(
&mut self,
input: PluginStateChangedInput,
out: &mut Vec<TrackEventRequest>,
) {
let PluginStateChangedInput { plugin, state } = input;
let event = CodexPluginEventRequest {
event_type: plugin_state_event_type(state),
event_params: codex_plugin_metadata(plugin),
};
out.push(match state {
PluginState::Installed => TrackEventRequest::PluginInstalled(event),
PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event),
PluginState::Enabled => TrackEventRequest::PluginEnabled(event),
PluginState::Disabled => TrackEventRequest::PluginDisabled(event),
});
}
}
async fn send_track_app_used(auth_manager: &AuthManager, base_url: &str, job: TrackAppUsedJob) {
let TrackAppUsedJob {
analytics_enabled,
tracking,
app,
} = job;
let event_params = codex_app_metadata(&tracking, app);
let events = vec![TrackEventRequest::AppUsed(CodexAppUsedEventRequest {
event_type: "codex_app_used",
event_params,
})];
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
}
async fn send_track_plugin_used(
auth_manager: &AuthManager,
base_url: &str,
job: TrackPluginUsedJob,
) {
let TrackPluginUsedJob {
analytics_enabled,
tracking,
plugin,
} = job;
let events = vec![TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
event_type: "codex_plugin_used",
event_params: codex_plugin_used_metadata(&tracking, plugin),
})];
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
}
async fn send_track_plugin_installed(
auth_manager: &AuthManager,
base_url: &str,
job: TrackPluginManagementJob,
) {
send_track_plugin_management_event(auth_manager, base_url, job, "codex_plugin_installed").await;
}
async fn send_track_plugin_uninstalled(
auth_manager: &AuthManager,
base_url: &str,
job: TrackPluginManagementJob,
) {
send_track_plugin_management_event(auth_manager, base_url, job, "codex_plugin_uninstalled")
.await;
}
async fn send_track_plugin_enabled(
auth_manager: &AuthManager,
base_url: &str,
job: TrackPluginManagementJob,
) {
send_track_plugin_management_event(auth_manager, base_url, job, "codex_plugin_enabled").await;
}
async fn send_track_plugin_disabled(
auth_manager: &AuthManager,
base_url: &str,
job: TrackPluginManagementJob,
) {
send_track_plugin_management_event(auth_manager, base_url, job, "codex_plugin_disabled").await;
}
async fn send_track_plugin_management_event(
auth_manager: &AuthManager,
base_url: &str,
job: TrackPluginManagementJob,
event_type: &'static str,
) {
let TrackPluginManagementJob {
analytics_enabled,
plugin,
} = job;
let event_params = codex_plugin_metadata(plugin);
let event = CodexPluginEventRequest {
event_type,
event_params,
};
let events = vec![match event_type {
"codex_plugin_installed" => TrackEventRequest::PluginInstalled(event),
"codex_plugin_uninstalled" => TrackEventRequest::PluginUninstalled(event),
"codex_plugin_enabled" => TrackEventRequest::PluginEnabled(event),
"codex_plugin_disabled" => TrackEventRequest::PluginDisabled(event),
_ => unreachable!("unknown plugin management event type"),
}];
send_track_events(auth_manager, analytics_enabled, base_url, events).await;
fn plugin_state_event_type(state: PluginState) -> &'static str {
match state {
PluginState::Installed => "codex_plugin_installed",
PluginState::Uninstalled => "codex_plugin_uninstalled",
PluginState::Enabled => "codex_plugin_enabled",
PluginState::Disabled => "codex_plugin_disabled",
}
}
fn codex_app_metadata(tracking: &TrackEventsContext, app: AppInvocation) -> CodexAppMetadata {
@@ -822,13 +650,9 @@ fn subagent_source_name(subagent_source: SubAgentSource) -> String {
async fn send_track_events(
auth_manager: &AuthManager,
analytics_enabled: Option<bool>,
base_url: &str,
events: Vec<TrackEventRequest>,
) {
if analytics_enabled == Some(false) {
return;
}
if events.is_empty() {
return;
}