mirror of
https://github.com/openai/codex.git
synced 2026-05-02 04:11:39 +03:00
[codex-analytics] thread events (#15690)
- add event for thread initialization - thread/start, thread/fork, thread/resume - feature flagged behind `FeatureFlag::GeneralAnalytics` - does not yet support threads started by subagents PR stack: - --> [[telemetry] thread events #15690](https://github.com/openai/codex/pull/15690) - [[telemetry] subagent events #15915](https://github.com/openai/codex/pull/15915) - [[telemetry] turn events #15591](https://github.com/openai/codex/pull/15591) - [[telemetry] steer events #15697](https://github.com/openai/codex/pull/15697) - [[telemetry] queued prompt data #15804](https://github.com/openai/codex/pull/15804) Sample extracted logs in Codex-backend ``` INFO | 2026-03-29 16:39:37 | codex_backend.routers.analytics_events | analytics_events.track_analytics_events:398 | Tracked analytics event codex_thread_initialized thread_id=019d3bf7-9f5f-7f82-9877-6d48d1052531 product_surface=codex product_client_id=CODEX_CLI client_name=codex-tui client_version=0.0.0 rpc_transport=in_process experimental_api_enabled=True codex_rs_version=0.0.0 runtime_os=macos runtime_os_version=26.4.0 runtime_arch=aarch64 model=gpt-5.3-codex ephemeral=False thread_source=user initialization_mode=new subagent_source=None parent_thread_id=None created_at=1774827577 | INFO | 2026-03-29 16:45:46 | codex_backend.routers.analytics_events | analytics_events.track_analytics_events:398 | Tracked analytics event codex_thread_initialized thread_id=019d3b84-5731-79d0-9b3b-9c6efe5f5066 product_surface=codex product_client_id=CODEX_CLI client_name=codex-tui client_version=0.0.0 rpc_transport=in_process experimental_api_enabled=True codex_rs_version=0.0.0 runtime_os=macos runtime_os_version=26.4.0 runtime_arch=aarch64 model=gpt-5.3-codex ephemeral=False thread_source=user initialization_mode=resumed subagent_source=None parent_thread_id=None created_at=1774820022 | INFO | 2026-03-29 16:45:49 | codex_backend.routers.analytics_events | analytics_events.track_analytics_events:398 | Tracked analytics event codex_thread_initialized thread_id=019d3bfd-4cd6-7c12-a13e-48cef02e8c4d product_surface=codex product_client_id=CODEX_CLI client_name=codex-tui client_version=0.0.0 rpc_transport=in_process experimental_api_enabled=True codex_rs_version=0.0.0 runtime_os=macos runtime_os_version=26.4.0 runtime_arch=aarch64 model=gpt-5.3-codex ephemeral=False thread_source=user initialization_mode=forked subagent_source=None parent_thread_id=None created_at=1774827949 | INFO | 2026-03-29 17:20:29 | codex_backend.routers.analytics_events | analytics_events.track_analytics_events:398 | Tracked analytics event codex_thread_initialized thread_id=019d3c1d-0412-7ed2-ad24-c9c0881a36b0 product_surface=codex product_client_id=CODEX_SERVICE_EXEC client_name=codex_exec client_version=0.0.0 rpc_transport=in_process experimental_api_enabled=True codex_rs_version=0.0.0 runtime_os=macos runtime_os_version=26.4.0 runtime_arch=aarch64 model=gpt-5.3-codex ephemeral=False thread_source=user initialization_mode=new subagent_source=None parent_thread_id=None created_at=1774830027 | ``` Notes - `product_client_id` gets canonicalized in codex-backend - subagent threads are addressed in a following pr
This commit is contained in:
@@ -1,671 +0,0 @@
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_git_utils::collect_git_info;
|
||||
use codex_git_utils::get_git_repo_root;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::protocol::SkillScope;
|
||||
use serde::Serialize;
|
||||
use sha1::Digest;
|
||||
use sha1::Sha1;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TrackEventsContext {
|
||||
pub model_slug: String,
|
||||
pub thread_id: String,
|
||||
pub turn_id: String,
|
||||
}
|
||||
|
||||
pub fn build_track_events_context(
|
||||
model_slug: String,
|
||||
thread_id: String,
|
||||
turn_id: String,
|
||||
) -> TrackEventsContext {
|
||||
TrackEventsContext {
|
||||
model_slug,
|
||||
thread_id,
|
||||
turn_id,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SkillInvocation {
|
||||
pub skill_name: String,
|
||||
pub skill_scope: SkillScope,
|
||||
pub skill_path: PathBuf,
|
||||
pub invocation_type: InvocationType,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum InvocationType {
|
||||
Explicit,
|
||||
Implicit,
|
||||
}
|
||||
|
||||
pub struct AppInvocation {
|
||||
pub connector_id: Option<String>,
|
||||
pub app_name: Option<String>,
|
||||
pub invocation_type: Option<InvocationType>,
|
||||
}
|
||||
|
||||
pub enum AnalyticsFact {
|
||||
Initialize {
|
||||
connection_id: u64,
|
||||
params: InitializeParams,
|
||||
},
|
||||
Request {
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
request: Box<ClientRequest>,
|
||||
},
|
||||
Response {
|
||||
connection_id: u64,
|
||||
response: Box<ClientResponse>,
|
||||
},
|
||||
Notification(Box<ServerNotification>),
|
||||
// Facts that do not naturally exist on the app-server protocol surface, or
|
||||
// would require non-trivial protocol reshaping on this branch.
|
||||
Custom(CustomAnalyticsFact),
|
||||
}
|
||||
|
||||
pub enum CustomAnalyticsFact {
|
||||
SkillInvoked(SkillInvokedInput),
|
||||
AppMentioned(AppMentionedInput),
|
||||
AppUsed(AppUsedInput),
|
||||
PluginUsed(PluginUsedInput),
|
||||
PluginStateChanged(PluginStateChangedInput),
|
||||
}
|
||||
|
||||
pub struct SkillInvokedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub invocations: Vec<SkillInvocation>,
|
||||
}
|
||||
|
||||
pub struct AppMentionedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub mentions: Vec<AppInvocation>,
|
||||
}
|
||||
|
||||
pub struct AppUsedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub app: AppInvocation,
|
||||
}
|
||||
|
||||
pub struct PluginUsedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub plugin: PluginTelemetryMetadata,
|
||||
}
|
||||
|
||||
pub struct PluginStateChangedInput {
|
||||
pub plugin: PluginTelemetryMetadata,
|
||||
pub state: PluginState,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum PluginState {
|
||||
Installed,
|
||||
Uninstalled,
|
||||
Enabled,
|
||||
Disabled,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct AnalyticsReducer;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct AnalyticsEventsQueue {
|
||||
sender: mpsc::Sender<AnalyticsFact>,
|
||||
app_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
|
||||
plugin_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AnalyticsEventsClient {
|
||||
queue: AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
impl AnalyticsEventsQueue {
|
||||
pub(crate) fn new(auth_manager: Arc<AuthManager>, base_url: String) -> Self {
|
||||
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
|
||||
tokio::spawn(async move {
|
||||
let mut reducer = AnalyticsReducer;
|
||||
while let Some(input) = receiver.recv().await {
|
||||
let mut events = Vec::new();
|
||||
reducer.ingest(input, &mut events).await;
|
||||
send_track_events(&auth_manager, &base_url, events).await;
|
||||
}
|
||||
});
|
||||
Self {
|
||||
sender,
|
||||
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, input: AnalyticsFact) {
|
||||
if self.sender.try_send(input).is_err() {
|
||||
//TODO: add a metric for this
|
||||
tracing::warn!("dropping analytics events: queue is full");
|
||||
}
|
||||
}
|
||||
|
||||
fn should_enqueue_app_used(&self, tracking: &TrackEventsContext, app: &AppInvocation) -> bool {
|
||||
let Some(connector_id) = app.connector_id.as_ref() else {
|
||||
return true;
|
||||
};
|
||||
let mut emitted = self
|
||||
.app_used_emitted_keys
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS {
|
||||
emitted.clear();
|
||||
}
|
||||
emitted.insert((tracking.turn_id.clone(), connector_id.clone()))
|
||||
}
|
||||
|
||||
fn should_enqueue_plugin_used(
|
||||
&self,
|
||||
tracking: &TrackEventsContext,
|
||||
plugin: &PluginTelemetryMetadata,
|
||||
) -> bool {
|
||||
let mut emitted = self
|
||||
.plugin_used_emitted_keys
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS {
|
||||
emitted.clear();
|
||||
}
|
||||
emitted.insert((tracking.turn_id.clone(), plugin.plugin_id.as_key()))
|
||||
}
|
||||
}
|
||||
|
||||
impl AnalyticsEventsClient {
|
||||
pub fn new(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
base_url: String,
|
||||
analytics_enabled: Option<bool>,
|
||||
) -> Self {
|
||||
Self {
|
||||
queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url),
|
||||
analytics_enabled,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn track_skill_invocations(
|
||||
&self,
|
||||
tracking: TrackEventsContext,
|
||||
invocations: Vec<SkillInvocation>,
|
||||
) {
|
||||
if invocations.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(
|
||||
SkillInvokedInput {
|
||||
tracking,
|
||||
invocations,
|
||||
},
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
|
||||
if mentions.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(
|
||||
AppMentionedInput { tracking, mentions },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
|
||||
if !self.queue.should_enqueue_app_used(&tracking, &app) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
|
||||
AppUsedInput { tracking, app },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
|
||||
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
|
||||
PluginUsedInput { tracking, plugin },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Installed,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Uninstalled,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Enabled,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Disabled,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
fn record_fact(&self, input: AnalyticsFact) {
|
||||
if self.analytics_enabled == Some(false) {
|
||||
return;
|
||||
}
|
||||
self.queue.try_send(input);
|
||||
}
|
||||
}
|
||||
|
||||
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
|
||||
const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const ANALYTICS_EVENT_DEDUPE_MAX_KEYS: usize = 4096;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct TrackEventsRequest {
|
||||
events: Vec<TrackEventRequest>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(untagged)]
|
||||
enum TrackEventRequest {
|
||||
SkillInvocation(SkillInvocationEventRequest),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
AppUsed(CodexAppUsedEventRequest),
|
||||
PluginUsed(CodexPluginUsedEventRequest),
|
||||
PluginInstalled(CodexPluginEventRequest),
|
||||
PluginUninstalled(CodexPluginEventRequest),
|
||||
PluginEnabled(CodexPluginEventRequest),
|
||||
PluginDisabled(CodexPluginEventRequest),
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SkillInvocationEventRequest {
|
||||
event_type: &'static str,
|
||||
skill_id: String,
|
||||
skill_name: String,
|
||||
event_params: SkillInvocationEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SkillInvocationEventParams {
|
||||
product_client_id: Option<String>,
|
||||
skill_scope: Option<String>,
|
||||
repo_url: Option<String>,
|
||||
thread_id: Option<String>,
|
||||
invoke_type: Option<InvocationType>,
|
||||
model_slug: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct CodexAppMetadata {
|
||||
connector_id: Option<String>,
|
||||
thread_id: Option<String>,
|
||||
turn_id: Option<String>,
|
||||
app_name: Option<String>,
|
||||
product_client_id: Option<String>,
|
||||
invoke_type: Option<InvocationType>,
|
||||
model_slug: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct CodexAppMentionedEventRequest {
|
||||
event_type: &'static str,
|
||||
event_params: CodexAppMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct CodexAppUsedEventRequest {
|
||||
event_type: &'static str,
|
||||
event_params: CodexAppMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct CodexPluginMetadata {
|
||||
plugin_id: Option<String>,
|
||||
plugin_name: Option<String>,
|
||||
marketplace_name: Option<String>,
|
||||
has_skills: Option<bool>,
|
||||
mcp_server_count: Option<usize>,
|
||||
connector_ids: Option<Vec<String>>,
|
||||
product_client_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct CodexPluginUsedMetadata {
|
||||
#[serde(flatten)]
|
||||
plugin: CodexPluginMetadata,
|
||||
thread_id: Option<String>,
|
||||
turn_id: Option<String>,
|
||||
model_slug: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct CodexPluginEventRequest {
|
||||
event_type: &'static str,
|
||||
event_params: CodexPluginMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct CodexPluginUsedEventRequest {
|
||||
event_type: &'static str,
|
||||
event_params: CodexPluginUsedMetadata,
|
||||
}
|
||||
|
||||
impl AnalyticsReducer {
|
||||
async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
|
||||
match input {
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: _connection_id,
|
||||
params: _params,
|
||||
} => {}
|
||||
AnalyticsFact::Request {
|
||||
connection_id: _connection_id,
|
||||
request_id: _request_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
AnalyticsFact::Response {
|
||||
connection_id: _connection_id,
|
||||
response: _response,
|
||||
} => {}
|
||||
AnalyticsFact::Notification(_notification) => {}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::SkillInvoked(input) => {
|
||||
self.ingest_skill_invoked(input, out).await;
|
||||
}
|
||||
CustomAnalyticsFact::AppMentioned(input) => {
|
||||
self.ingest_app_mentioned(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::AppUsed(input) => {
|
||||
self.ingest_app_used(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::PluginUsed(input) => {
|
||||
self.ingest_plugin_used(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::PluginStateChanged(input) => {
|
||||
self.ingest_plugin_state_changed(input, out);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn ingest_skill_invoked(
|
||||
&mut self,
|
||||
input: SkillInvokedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let SkillInvokedInput {
|
||||
tracking,
|
||||
invocations,
|
||||
} = input;
|
||||
for invocation in invocations {
|
||||
let skill_scope = match invocation.skill_scope {
|
||||
SkillScope::User => "user",
|
||||
SkillScope::Repo => "repo",
|
||||
SkillScope::System => "system",
|
||||
SkillScope::Admin => "admin",
|
||||
};
|
||||
let repo_root = get_git_repo_root(invocation.skill_path.as_path());
|
||||
let repo_url = if let Some(root) = repo_root.as_ref() {
|
||||
collect_git_info(root)
|
||||
.await
|
||||
.and_then(|info| info.repository_url)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let skill_id = skill_id_for_local_skill(
|
||||
repo_url.as_deref(),
|
||||
repo_root.as_deref(),
|
||||
invocation.skill_path.as_path(),
|
||||
invocation.skill_name.as_str(),
|
||||
);
|
||||
out.push(TrackEventRequest::SkillInvocation(
|
||||
SkillInvocationEventRequest {
|
||||
event_type: "skill_invocation",
|
||||
skill_id,
|
||||
skill_name: invocation.skill_name.clone(),
|
||||
event_params: SkillInvocationEventParams {
|
||||
thread_id: Some(tracking.thread_id.clone()),
|
||||
invoke_type: Some(invocation.invocation_type),
|
||||
model_slug: Some(tracking.model_slug.clone()),
|
||||
product_client_id: Some(originator().value),
|
||||
repo_url,
|
||||
skill_scope: Some(skill_scope.to_string()),
|
||||
},
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest_app_mentioned(&mut self, input: AppMentionedInput, out: &mut Vec<TrackEventRequest>) {
|
||||
let AppMentionedInput { tracking, mentions } = input;
|
||||
out.extend(mentions.into_iter().map(|mention| {
|
||||
let event_params = codex_app_metadata(&tracking, mention);
|
||||
TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest {
|
||||
event_type: "codex_app_mentioned",
|
||||
event_params,
|
||||
})
|
||||
}));
|
||||
}
|
||||
|
||||
fn ingest_app_used(&mut self, input: AppUsedInput, out: &mut Vec<TrackEventRequest>) {
|
||||
let AppUsedInput { tracking, app } = input;
|
||||
let event_params = codex_app_metadata(&tracking, app);
|
||||
out.push(TrackEventRequest::AppUsed(CodexAppUsedEventRequest {
|
||||
event_type: "codex_app_used",
|
||||
event_params,
|
||||
}));
|
||||
}
|
||||
|
||||
fn ingest_plugin_used(&mut self, input: PluginUsedInput, out: &mut Vec<TrackEventRequest>) {
|
||||
let PluginUsedInput { tracking, plugin } = input;
|
||||
out.push(TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
|
||||
event_type: "codex_plugin_used",
|
||||
event_params: codex_plugin_used_metadata(&tracking, plugin),
|
||||
}));
|
||||
}
|
||||
|
||||
fn ingest_plugin_state_changed(
|
||||
&mut self,
|
||||
input: PluginStateChangedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let PluginStateChangedInput { plugin, state } = input;
|
||||
let event = CodexPluginEventRequest {
|
||||
event_type: plugin_state_event_type(state),
|
||||
event_params: codex_plugin_metadata(plugin),
|
||||
};
|
||||
out.push(match state {
|
||||
PluginState::Installed => TrackEventRequest::PluginInstalled(event),
|
||||
PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event),
|
||||
PluginState::Enabled => TrackEventRequest::PluginEnabled(event),
|
||||
PluginState::Disabled => TrackEventRequest::PluginDisabled(event),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn plugin_state_event_type(state: PluginState) -> &'static str {
|
||||
match state {
|
||||
PluginState::Installed => "codex_plugin_installed",
|
||||
PluginState::Uninstalled => "codex_plugin_uninstalled",
|
||||
PluginState::Enabled => "codex_plugin_enabled",
|
||||
PluginState::Disabled => "codex_plugin_disabled",
|
||||
}
|
||||
}
|
||||
|
||||
fn codex_app_metadata(tracking: &TrackEventsContext, app: AppInvocation) -> CodexAppMetadata {
|
||||
CodexAppMetadata {
|
||||
connector_id: app.connector_id,
|
||||
thread_id: Some(tracking.thread_id.clone()),
|
||||
turn_id: Some(tracking.turn_id.clone()),
|
||||
app_name: app.app_name,
|
||||
product_client_id: Some(originator().value),
|
||||
invoke_type: app.invocation_type,
|
||||
model_slug: Some(tracking.model_slug.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPluginMetadata {
|
||||
let capability_summary = plugin.capability_summary;
|
||||
CodexPluginMetadata {
|
||||
plugin_id: Some(plugin.plugin_id.as_key()),
|
||||
plugin_name: Some(plugin.plugin_id.plugin_name),
|
||||
marketplace_name: Some(plugin.plugin_id.marketplace_name),
|
||||
has_skills: capability_summary
|
||||
.as_ref()
|
||||
.map(|summary| summary.has_skills),
|
||||
mcp_server_count: capability_summary
|
||||
.as_ref()
|
||||
.map(|summary| summary.mcp_server_names.len()),
|
||||
connector_ids: capability_summary.map(|summary| {
|
||||
summary
|
||||
.app_connector_ids
|
||||
.into_iter()
|
||||
.map(|connector_id| connector_id.0)
|
||||
.collect()
|
||||
}),
|
||||
product_client_id: Some(originator().value),
|
||||
}
|
||||
}
|
||||
|
||||
fn codex_plugin_used_metadata(
|
||||
tracking: &TrackEventsContext,
|
||||
plugin: PluginTelemetryMetadata,
|
||||
) -> CodexPluginUsedMetadata {
|
||||
CodexPluginUsedMetadata {
|
||||
plugin: codex_plugin_metadata(plugin),
|
||||
thread_id: Some(tracking.thread_id.clone()),
|
||||
turn_id: Some(tracking.turn_id.clone()),
|
||||
model_slug: Some(tracking.model_slug.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_track_events(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
events: Vec<TrackEventRequest>,
|
||||
) {
|
||||
if events.is_empty() {
|
||||
return;
|
||||
}
|
||||
let Some(auth) = auth_manager.auth().await else {
|
||||
return;
|
||||
};
|
||||
if !auth.is_chatgpt_auth() {
|
||||
return;
|
||||
}
|
||||
let access_token = match auth.get_token() {
|
||||
Ok(token) => token,
|
||||
Err(_) => return,
|
||||
};
|
||||
let Some(account_id) = auth.get_account_id() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let base_url = base_url.trim_end_matches('/');
|
||||
let url = format!("{base_url}/codex/analytics-events/events");
|
||||
let payload = TrackEventsRequest { events };
|
||||
|
||||
let response = create_client()
|
||||
.post(&url)
|
||||
.timeout(ANALYTICS_EVENTS_TIMEOUT)
|
||||
.bearer_auth(&access_token)
|
||||
.header("chatgpt-account-id", &account_id)
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&payload)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
match response {
|
||||
Ok(response) if response.status().is_success() => {}
|
||||
Ok(response) => {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
tracing::warn!("events failed with status {status}: {body}");
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!("failed to send events request: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn skill_id_for_local_skill(
|
||||
repo_url: Option<&str>,
|
||||
repo_root: Option<&Path>,
|
||||
skill_path: &Path,
|
||||
skill_name: &str,
|
||||
) -> String {
|
||||
let path = normalize_path_for_skill_id(repo_url, repo_root, skill_path);
|
||||
let prefix = if let Some(url) = repo_url {
|
||||
format!("repo_{url}")
|
||||
} else {
|
||||
"personal".to_string()
|
||||
};
|
||||
let raw_id = format!("{prefix}_{path}_{skill_name}");
|
||||
let mut hasher = Sha1::new();
|
||||
hasher.update(raw_id.as_bytes());
|
||||
format!("{:x}", hasher.finalize())
|
||||
}
|
||||
|
||||
/// Returns a normalized path for skill ID construction.
|
||||
///
|
||||
/// - Repo-scoped skills use a path relative to the repo root.
|
||||
/// - User/admin/system skills use an absolute path.
|
||||
fn normalize_path_for_skill_id(
|
||||
repo_url: Option<&str>,
|
||||
repo_root: Option<&Path>,
|
||||
skill_path: &Path,
|
||||
) -> String {
|
||||
let resolved_path =
|
||||
std::fs::canonicalize(skill_path).unwrap_or_else(|_| skill_path.to_path_buf());
|
||||
match (repo_url, repo_root) {
|
||||
(Some(_), Some(root)) => {
|
||||
let resolved_root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf());
|
||||
resolved_path
|
||||
.strip_prefix(&resolved_root)
|
||||
.unwrap_or(resolved_path.as_path())
|
||||
.to_string_lossy()
|
||||
.replace('\\', "/")
|
||||
}
|
||||
_ => resolved_path.to_string_lossy().replace('\\', "/"),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "analytics_client_tests.rs"]
|
||||
mod tests;
|
||||
@@ -1,26 +1,47 @@
|
||||
use super::AnalyticsEventsQueue;
|
||||
use super::AnalyticsFact;
|
||||
use super::AnalyticsReducer;
|
||||
use super::AppInvocation;
|
||||
use super::AppMentionedInput;
|
||||
use super::AppUsedInput;
|
||||
use super::CodexAppMentionedEventRequest;
|
||||
use super::CodexAppUsedEventRequest;
|
||||
use super::CodexPluginEventRequest;
|
||||
use super::CodexPluginUsedEventRequest;
|
||||
use super::CustomAnalyticsFact;
|
||||
use super::InvocationType;
|
||||
use super::PluginState;
|
||||
use super::PluginStateChangedInput;
|
||||
use super::PluginUsedInput;
|
||||
use super::SkillInvocation;
|
||||
use super::SkillInvokedInput;
|
||||
use super::TrackEventRequest;
|
||||
use super::TrackEventsContext;
|
||||
use super::codex_app_metadata;
|
||||
use super::codex_plugin_metadata;
|
||||
use super::codex_plugin_used_metadata;
|
||||
use super::normalize_path_for_skill_id;
|
||||
use crate::client::AnalyticsEventsQueue;
|
||||
use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::ThreadInitializationMode;
|
||||
use crate::events::ThreadInitializedEvent;
|
||||
use crate::events::ThreadInitializedEventParams;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::events::codex_app_metadata;
|
||||
use crate::events::codex_plugin_metadata;
|
||||
use crate::events::codex_plugin_used_metadata;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::InvocationType;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::PluginStateChangedInput;
|
||||
use crate::facts::PluginUsedInput;
|
||||
use crate::facts::SkillInvocation;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use crate::reducer::normalize_path_for_skill_id;
|
||||
use crate::reducer::skill_id_for_local_skill;
|
||||
use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer;
|
||||
use codex_app_server_protocol::AskForApproval as AppServerAskForApproval;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
|
||||
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
|
||||
use codex_login::default_client::DEFAULT_ORIGINATOR;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_plugin::AppConnectorId;
|
||||
use codex_plugin::PluginCapabilitySummary;
|
||||
@@ -34,6 +55,61 @@ use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
fn sample_thread(thread_id: &str, ephemeral: bool) -> Thread {
|
||||
Thread {
|
||||
id: thread_id.to_string(),
|
||||
preview: "first prompt".to_string(),
|
||||
ephemeral,
|
||||
model_provider: "openai".to_string(),
|
||||
created_at: 1,
|
||||
updated_at: 2,
|
||||
status: AppServerThreadStatus::Idle,
|
||||
path: None,
|
||||
cwd: PathBuf::from("/tmp"),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: AppServerSessionSource::Exec,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
git_info: None,
|
||||
name: None,
|
||||
turns: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_start_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
|
||||
ClientResponse::ThreadStart {
|
||||
request_id: RequestId::Integer(1),
|
||||
response: ThreadStartResponse {
|
||||
thread: sample_thread(thread_id, ephemeral),
|
||||
model: model.to_string(),
|
||||
model_provider: "openai".to_string(),
|
||||
service_tier: None,
|
||||
cwd: PathBuf::from("/tmp"),
|
||||
approval_policy: AppServerAskForApproval::OnFailure,
|
||||
approvals_reviewer: AppServerApprovalsReviewer::User,
|
||||
sandbox: AppServerSandboxPolicy::DangerFullAccess,
|
||||
reasoning_effort: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_resume_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
|
||||
ClientResponse::ThreadResume {
|
||||
request_id: RequestId::Integer(2),
|
||||
response: ThreadResumeResponse {
|
||||
thread: sample_thread(thread_id, ephemeral),
|
||||
model: model.to_string(),
|
||||
model_provider: "openai".to_string(),
|
||||
service_tier: None,
|
||||
cwd: PathBuf::from("/tmp"),
|
||||
approval_policy: AppServerAskForApproval::OnFailure,
|
||||
approvals_reviewer: AppServerApprovalsReviewer::User,
|
||||
sandbox: AppServerSandboxPolicy::DangerFullAccess,
|
||||
reasoning_effort: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn expected_absolute_path(path: &PathBuf) -> String {
|
||||
std::fs::canonicalize(path)
|
||||
.unwrap_or_else(|_| path.to_path_buf())
|
||||
@@ -204,6 +280,171 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() {
|
||||
assert_eq!(queue.should_enqueue_app_used(&turn_2, &app), true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_initialized_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::ThreadInitialized(ThreadInitializedEvent {
|
||||
event_type: "codex_thread_initialized",
|
||||
event_params: ThreadInitializedEventParams {
|
||||
thread_id: "thread-0".to_string(),
|
||||
app_server_client: CodexAppServerClientMetadata {
|
||||
product_client_id: DEFAULT_ORIGINATOR.to_string(),
|
||||
client_name: Some("codex-tui".to_string()),
|
||||
client_version: Some("1.0.0".to_string()),
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
experimental_api_enabled: Some(true),
|
||||
},
|
||||
runtime: CodexRuntimeMetadata {
|
||||
codex_rs_version: "0.1.0".to_string(),
|
||||
runtime_os: "macos".to_string(),
|
||||
runtime_os_version: "15.3.1".to_string(),
|
||||
runtime_arch: "aarch64".to_string(),
|
||||
},
|
||||
model: "gpt-5".to_string(),
|
||||
ephemeral: true,
|
||||
thread_source: Some("user"),
|
||||
initialization_mode: ThreadInitializationMode::New,
|
||||
subagent_source: None,
|
||||
parent_thread_id: None,
|
||||
created_at: 1,
|
||||
},
|
||||
});
|
||||
|
||||
let payload = serde_json::to_value(&event).expect("serialize thread initialized event");
|
||||
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!({
|
||||
"event_type": "codex_thread_initialized",
|
||||
"event_params": {
|
||||
"thread_id": "thread-0",
|
||||
"app_server_client": {
|
||||
"product_client_id": DEFAULT_ORIGINATOR,
|
||||
"client_name": "codex-tui",
|
||||
"client_version": "1.0.0",
|
||||
"rpc_transport": "stdio",
|
||||
"experimental_api_enabled": true
|
||||
},
|
||||
"runtime": {
|
||||
"codex_rs_version": "0.1.0",
|
||||
"runtime_os": "macos",
|
||||
"runtime_os_version": "15.3.1",
|
||||
"runtime_arch": "aarch64"
|
||||
},
|
||||
"model": "gpt-5",
|
||||
"ephemeral": true,
|
||||
"thread_source": "user",
|
||||
"initialization_mode": "new",
|
||||
"subagent_source": null,
|
||||
"parent_thread_id": null,
|
||||
"created_at": 1
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialized() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-no-client",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
assert!(events.is_empty(), "thread events should require initialize");
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: 7,
|
||||
params: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-tui".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
capabilities: Some(InitializeCapabilities {
|
||||
experimental_api: false,
|
||||
opt_out_notification_methods: None,
|
||||
}),
|
||||
},
|
||||
product_client_id: DEFAULT_ORIGINATOR.to_string(),
|
||||
runtime: CodexRuntimeMetadata {
|
||||
codex_rs_version: "0.99.0".to_string(),
|
||||
runtime_os: "linux".to_string(),
|
||||
runtime_os_version: "24.04".to_string(),
|
||||
runtime_arch: "x86_64".to_string(),
|
||||
},
|
||||
rpc_transport: AppServerRpcTransport::Websocket,
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
assert!(events.is_empty(), "initialize should not publish by itself");
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_resume_response(
|
||||
"thread-1", /*ephemeral*/ true, "gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(payload.as_array().expect("events array").len(), 1);
|
||||
assert_eq!(payload[0]["event_type"], "codex_thread_initialized");
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["app_server_client"]["product_client_id"],
|
||||
DEFAULT_ORIGINATOR
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["app_server_client"]["client_name"],
|
||||
"codex-tui"
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["app_server_client"]["client_version"],
|
||||
"1.0.0"
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["app_server_client"]["rpc_transport"],
|
||||
"websocket"
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["app_server_client"]["experimental_api_enabled"],
|
||||
false
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["runtime"]["codex_rs_version"],
|
||||
"0.99.0"
|
||||
);
|
||||
assert_eq!(payload[0]["event_params"]["runtime"]["runtime_os"], "linux");
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["runtime"]["runtime_os_version"],
|
||||
"24.04"
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["runtime"]["runtime_arch"],
|
||||
"x86_64"
|
||||
);
|
||||
assert_eq!(payload[0]["event_params"]["initialization_mode"], "resumed");
|
||||
assert_eq!(payload[0]["event_params"]["thread_source"], "user");
|
||||
assert_eq!(payload[0]["event_params"]["subagent_source"], json!(null));
|
||||
assert_eq!(payload[0]["event_params"]["parent_thread_id"], json!(null));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn plugin_used_event_serializes_expected_shape() {
|
||||
let tracking = TrackEventsContext {
|
||||
@@ -292,7 +533,7 @@ fn plugin_used_dedupe_is_keyed_by_turn_and_plugin() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn reducer_ingests_skill_invoked_fact() {
|
||||
let mut reducer = AnalyticsReducer;
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
let tracking = TrackEventsContext {
|
||||
model_slug: "gpt-5".to_string(),
|
||||
@@ -300,7 +541,7 @@ async fn reducer_ingests_skill_invoked_fact() {
|
||||
turn_id: "turn-1".to_string(),
|
||||
};
|
||||
let skill_path = PathBuf::from("/Users/abc/.codex/skills/doc/SKILL.md");
|
||||
let expected_skill_id = super::skill_id_for_local_skill(
|
||||
let expected_skill_id = skill_id_for_local_skill(
|
||||
/*repo_url*/ None,
|
||||
/*repo_root*/ None,
|
||||
skill_path.as_path(),
|
||||
@@ -343,7 +584,7 @@ async fn reducer_ingests_skill_invoked_fact() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn reducer_ingests_app_and_plugin_facts() {
|
||||
let mut reducer = AnalyticsReducer;
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
let tracking = TrackEventsContext {
|
||||
model_slug: "gpt-5".to_string(),
|
||||
@@ -396,7 +637,7 @@ async fn reducer_ingests_app_and_plugin_facts() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn reducer_ingests_plugin_state_changed_fact() {
|
||||
let mut reducer = AnalyticsReducer;
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
|
||||
272
codex-rs/analytics/src/client.rs
Normal file
272
codex-rs/analytics/src/client.rs
Normal file
@@ -0,0 +1,272 @@
|
||||
use crate::events::AppServerRpcTransport;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::events::TrackEventsRequest;
|
||||
use crate::events::current_runtime_metadata;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::PluginStateChangedInput;
|
||||
use crate::facts::SkillInvocation;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
|
||||
const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const ANALYTICS_EVENT_DEDUPE_MAX_KEYS: usize = 4096;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct AnalyticsEventsQueue {
|
||||
pub(crate) sender: mpsc::Sender<AnalyticsFact>,
|
||||
pub(crate) app_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
|
||||
pub(crate) plugin_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AnalyticsEventsClient {
|
||||
queue: AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
impl AnalyticsEventsQueue {
|
||||
pub(crate) fn new(auth_manager: Arc<AuthManager>, base_url: String) -> Self {
|
||||
let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE);
|
||||
tokio::spawn(async move {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
while let Some(input) = receiver.recv().await {
|
||||
let mut events = Vec::new();
|
||||
reducer.ingest(input, &mut events).await;
|
||||
send_track_events(&auth_manager, &base_url, events).await;
|
||||
}
|
||||
});
|
||||
Self {
|
||||
sender,
|
||||
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, input: AnalyticsFact) {
|
||||
if self.sender.try_send(input).is_err() {
|
||||
//TODO: add a metric for this
|
||||
tracing::warn!("dropping analytics events: queue is full");
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn should_enqueue_app_used(
|
||||
&self,
|
||||
tracking: &TrackEventsContext,
|
||||
app: &AppInvocation,
|
||||
) -> bool {
|
||||
let Some(connector_id) = app.connector_id.as_ref() else {
|
||||
return true;
|
||||
};
|
||||
let mut emitted = self
|
||||
.app_used_emitted_keys
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS {
|
||||
emitted.clear();
|
||||
}
|
||||
emitted.insert((tracking.turn_id.clone(), connector_id.clone()))
|
||||
}
|
||||
|
||||
pub(crate) fn should_enqueue_plugin_used(
|
||||
&self,
|
||||
tracking: &TrackEventsContext,
|
||||
plugin: &PluginTelemetryMetadata,
|
||||
) -> bool {
|
||||
let mut emitted = self
|
||||
.plugin_used_emitted_keys
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS {
|
||||
emitted.clear();
|
||||
}
|
||||
emitted.insert((tracking.turn_id.clone(), plugin.plugin_id.as_key()))
|
||||
}
|
||||
}
|
||||
|
||||
impl AnalyticsEventsClient {
|
||||
pub fn new(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
base_url: String,
|
||||
analytics_enabled: Option<bool>,
|
||||
) -> Self {
|
||||
Self {
|
||||
queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url),
|
||||
analytics_enabled,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn track_skill_invocations(
|
||||
&self,
|
||||
tracking: TrackEventsContext,
|
||||
invocations: Vec<SkillInvocation>,
|
||||
) {
|
||||
if invocations.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(
|
||||
SkillInvokedInput {
|
||||
tracking,
|
||||
invocations,
|
||||
},
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_initialize(
|
||||
&self,
|
||||
connection_id: u64,
|
||||
params: InitializeParams,
|
||||
product_client_id: String,
|
||||
rpc_transport: AppServerRpcTransport,
|
||||
) {
|
||||
self.record_fact(AnalyticsFact::Initialize {
|
||||
connection_id,
|
||||
params,
|
||||
product_client_id,
|
||||
runtime: current_runtime_metadata(),
|
||||
rpc_transport,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
|
||||
if mentions.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(
|
||||
AppMentionedInput { tracking, mentions },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
|
||||
if !self.queue.should_enqueue_app_used(&tracking, &app) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
|
||||
AppUsedInput { tracking, app },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
|
||||
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
|
||||
crate::facts::PluginUsedInput { tracking, plugin },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Installed,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Uninstalled,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Enabled,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
plugin,
|
||||
state: PluginState::Disabled,
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub(crate) fn record_fact(&self, input: AnalyticsFact) {
|
||||
if self.analytics_enabled == Some(false) {
|
||||
return;
|
||||
}
|
||||
self.queue.try_send(input);
|
||||
}
|
||||
|
||||
pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
|
||||
self.record_fact(AnalyticsFact::Response {
|
||||
connection_id,
|
||||
response: Box::new(response),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_track_events(
|
||||
auth_manager: &AuthManager,
|
||||
base_url: &str,
|
||||
events: Vec<TrackEventRequest>,
|
||||
) {
|
||||
if events.is_empty() {
|
||||
return;
|
||||
}
|
||||
let Some(auth) = auth_manager.auth().await else {
|
||||
return;
|
||||
};
|
||||
if !auth.is_chatgpt_auth() {
|
||||
return;
|
||||
}
|
||||
let access_token = match auth.get_token() {
|
||||
Ok(token) => token,
|
||||
Err(_) => return,
|
||||
};
|
||||
let Some(account_id) = auth.get_account_id() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let base_url = base_url.trim_end_matches('/');
|
||||
let url = format!("{base_url}/codex/analytics-events/events");
|
||||
let payload = TrackEventsRequest { events };
|
||||
|
||||
let response = create_client()
|
||||
.post(&url)
|
||||
.timeout(ANALYTICS_EVENTS_TIMEOUT)
|
||||
.bearer_auth(&access_token)
|
||||
.header("chatgpt-account-id", &account_id)
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&payload)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
match response {
|
||||
Ok(response) if response.status().is_success() => {}
|
||||
Ok(response) => {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
tracing::warn!("events failed with status {status}: {body}");
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!("failed to send events request: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
230
codex-rs/analytics/src/events.rs
Normal file
230
codex-rs/analytics/src/events.rs
Normal file
@@ -0,0 +1,230 @@
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::InvocationType;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum AppServerRpcTransport {
|
||||
Stdio,
|
||||
Websocket,
|
||||
InProcess,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum ThreadInitializationMode {
|
||||
New,
|
||||
Forked,
|
||||
Resumed,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct TrackEventsRequest {
|
||||
pub(crate) events: Vec<TrackEventRequest>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub(crate) enum TrackEventRequest {
|
||||
SkillInvocation(SkillInvocationEventRequest),
|
||||
ThreadInitialized(ThreadInitializedEvent),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
AppUsed(CodexAppUsedEventRequest),
|
||||
PluginUsed(CodexPluginUsedEventRequest),
|
||||
PluginInstalled(CodexPluginEventRequest),
|
||||
PluginUninstalled(CodexPluginEventRequest),
|
||||
PluginEnabled(CodexPluginEventRequest),
|
||||
PluginDisabled(CodexPluginEventRequest),
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct SkillInvocationEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) skill_id: String,
|
||||
pub(crate) skill_name: String,
|
||||
pub(crate) event_params: SkillInvocationEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct SkillInvocationEventParams {
|
||||
pub(crate) product_client_id: Option<String>,
|
||||
pub(crate) skill_scope: Option<String>,
|
||||
pub(crate) repo_url: Option<String>,
|
||||
pub(crate) thread_id: Option<String>,
|
||||
pub(crate) invoke_type: Option<InvocationType>,
|
||||
pub(crate) model_slug: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(crate) struct CodexAppServerClientMetadata {
|
||||
pub(crate) product_client_id: String,
|
||||
pub(crate) client_name: Option<String>,
|
||||
pub(crate) client_version: Option<String>,
|
||||
pub(crate) rpc_transport: AppServerRpcTransport,
|
||||
pub(crate) experimental_api_enabled: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(crate) struct CodexRuntimeMetadata {
|
||||
pub(crate) codex_rs_version: String,
|
||||
pub(crate) runtime_os: String,
|
||||
pub(crate) runtime_os_version: String,
|
||||
pub(crate) runtime_arch: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct ThreadInitializedEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
pub(crate) app_server_client: CodexAppServerClientMetadata,
|
||||
pub(crate) runtime: CodexRuntimeMetadata,
|
||||
pub(crate) model: String,
|
||||
pub(crate) ephemeral: bool,
|
||||
pub(crate) thread_source: Option<&'static str>,
|
||||
pub(crate) initialization_mode: ThreadInitializationMode,
|
||||
pub(crate) subagent_source: Option<String>,
|
||||
pub(crate) parent_thread_id: Option<String>,
|
||||
pub(crate) created_at: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct ThreadInitializedEvent {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: ThreadInitializedEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexAppMetadata {
|
||||
pub(crate) connector_id: Option<String>,
|
||||
pub(crate) thread_id: Option<String>,
|
||||
pub(crate) turn_id: Option<String>,
|
||||
pub(crate) app_name: Option<String>,
|
||||
pub(crate) product_client_id: Option<String>,
|
||||
pub(crate) invoke_type: Option<InvocationType>,
|
||||
pub(crate) model_slug: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexAppMentionedEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexAppMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexAppUsedEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexAppMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexPluginMetadata {
|
||||
pub(crate) plugin_id: Option<String>,
|
||||
pub(crate) plugin_name: Option<String>,
|
||||
pub(crate) marketplace_name: Option<String>,
|
||||
pub(crate) has_skills: Option<bool>,
|
||||
pub(crate) mcp_server_count: Option<usize>,
|
||||
pub(crate) connector_ids: Option<Vec<String>>,
|
||||
pub(crate) product_client_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexPluginUsedMetadata {
|
||||
#[serde(flatten)]
|
||||
pub(crate) plugin: CodexPluginMetadata,
|
||||
pub(crate) thread_id: Option<String>,
|
||||
pub(crate) turn_id: Option<String>,
|
||||
pub(crate) model_slug: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexPluginEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexPluginMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexPluginUsedEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexPluginUsedMetadata,
|
||||
}
|
||||
|
||||
pub(crate) fn plugin_state_event_type(state: PluginState) -> &'static str {
|
||||
match state {
|
||||
PluginState::Installed => "codex_plugin_installed",
|
||||
PluginState::Uninstalled => "codex_plugin_uninstalled",
|
||||
PluginState::Enabled => "codex_plugin_enabled",
|
||||
PluginState::Disabled => "codex_plugin_disabled",
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn codex_app_metadata(
|
||||
tracking: &TrackEventsContext,
|
||||
app: AppInvocation,
|
||||
) -> CodexAppMetadata {
|
||||
CodexAppMetadata {
|
||||
connector_id: app.connector_id,
|
||||
thread_id: Some(tracking.thread_id.clone()),
|
||||
turn_id: Some(tracking.turn_id.clone()),
|
||||
app_name: app.app_name,
|
||||
product_client_id: Some(originator().value),
|
||||
invoke_type: app.invocation_type,
|
||||
model_slug: Some(tracking.model_slug.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPluginMetadata {
|
||||
let capability_summary = plugin.capability_summary;
|
||||
CodexPluginMetadata {
|
||||
plugin_id: Some(plugin.plugin_id.as_key()),
|
||||
plugin_name: Some(plugin.plugin_id.plugin_name),
|
||||
marketplace_name: Some(plugin.plugin_id.marketplace_name),
|
||||
has_skills: capability_summary
|
||||
.as_ref()
|
||||
.map(|summary| summary.has_skills),
|
||||
mcp_server_count: capability_summary
|
||||
.as_ref()
|
||||
.map(|summary| summary.mcp_server_names.len()),
|
||||
connector_ids: capability_summary.map(|summary| {
|
||||
summary
|
||||
.app_connector_ids
|
||||
.into_iter()
|
||||
.map(|connector_id| connector_id.0)
|
||||
.collect()
|
||||
}),
|
||||
product_client_id: Some(originator().value),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn codex_plugin_used_metadata(
|
||||
tracking: &TrackEventsContext,
|
||||
plugin: PluginTelemetryMetadata,
|
||||
) -> CodexPluginUsedMetadata {
|
||||
CodexPluginUsedMetadata {
|
||||
plugin: codex_plugin_metadata(plugin),
|
||||
thread_id: Some(tracking.thread_id.clone()),
|
||||
turn_id: Some(tracking.turn_id.clone()),
|
||||
model_slug: Some(tracking.model_slug.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn thread_source_name(thread_source: &SessionSource) -> Option<&'static str> {
|
||||
match thread_source {
|
||||
SessionSource::Cli | SessionSource::VSCode | SessionSource::Exec => Some("user"),
|
||||
SessionSource::SubAgent(_) => Some("subagent"),
|
||||
SessionSource::Mcp | SessionSource::Custom(_) | SessionSource::Unknown => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn current_runtime_metadata() -> CodexRuntimeMetadata {
|
||||
let os_info = os_info::get();
|
||||
CodexRuntimeMetadata {
|
||||
codex_rs_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
runtime_os: std::env::consts::OS.to_string(),
|
||||
runtime_os_version: os_info.version().to_string(),
|
||||
runtime_arch: std::env::consts::ARCH.to_string(),
|
||||
}
|
||||
}
|
||||
116
codex-rs/analytics/src/facts.rs
Normal file
116
codex-rs/analytics/src/facts.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::protocol::SkillScope;
|
||||
use serde::Serialize;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TrackEventsContext {
|
||||
pub model_slug: String,
|
||||
pub thread_id: String,
|
||||
pub turn_id: String,
|
||||
}
|
||||
|
||||
pub fn build_track_events_context(
|
||||
model_slug: String,
|
||||
thread_id: String,
|
||||
turn_id: String,
|
||||
) -> TrackEventsContext {
|
||||
TrackEventsContext {
|
||||
model_slug,
|
||||
thread_id,
|
||||
turn_id,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SkillInvocation {
|
||||
pub skill_name: String,
|
||||
pub skill_scope: SkillScope,
|
||||
pub skill_path: PathBuf,
|
||||
pub invocation_type: InvocationType,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum InvocationType {
|
||||
Explicit,
|
||||
Implicit,
|
||||
}
|
||||
|
||||
pub struct AppInvocation {
|
||||
pub connector_id: Option<String>,
|
||||
pub app_name: Option<String>,
|
||||
pub invocation_type: Option<InvocationType>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) enum AnalyticsFact {
|
||||
Initialize {
|
||||
connection_id: u64,
|
||||
params: InitializeParams,
|
||||
product_client_id: String,
|
||||
runtime: CodexRuntimeMetadata,
|
||||
rpc_transport: AppServerRpcTransport,
|
||||
},
|
||||
Request {
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
request: Box<ClientRequest>,
|
||||
},
|
||||
Response {
|
||||
connection_id: u64,
|
||||
response: Box<ClientResponse>,
|
||||
},
|
||||
Notification(Box<ServerNotification>),
|
||||
// Facts that do not naturally exist on the app-server protocol surface, or
|
||||
// would require non-trivial protocol reshaping on this branch.
|
||||
Custom(CustomAnalyticsFact),
|
||||
}
|
||||
|
||||
pub(crate) enum CustomAnalyticsFact {
|
||||
SkillInvoked(SkillInvokedInput),
|
||||
AppMentioned(AppMentionedInput),
|
||||
AppUsed(AppUsedInput),
|
||||
PluginUsed(PluginUsedInput),
|
||||
PluginStateChanged(PluginStateChangedInput),
|
||||
}
|
||||
|
||||
pub(crate) struct SkillInvokedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub invocations: Vec<SkillInvocation>,
|
||||
}
|
||||
|
||||
pub(crate) struct AppMentionedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub mentions: Vec<AppInvocation>,
|
||||
}
|
||||
|
||||
pub(crate) struct AppUsedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub app: AppInvocation,
|
||||
}
|
||||
|
||||
pub(crate) struct PluginUsedInput {
|
||||
pub tracking: TrackEventsContext,
|
||||
pub plugin: PluginTelemetryMetadata,
|
||||
}
|
||||
|
||||
pub(crate) struct PluginStateChangedInput {
|
||||
pub plugin: PluginTelemetryMetadata,
|
||||
pub state: PluginState,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub(crate) enum PluginState {
|
||||
Installed,
|
||||
Uninstalled,
|
||||
Enabled,
|
||||
Disabled,
|
||||
}
|
||||
@@ -1,17 +1,15 @@
|
||||
mod analytics_client;
|
||||
mod client;
|
||||
mod events;
|
||||
mod facts;
|
||||
mod reducer;
|
||||
|
||||
pub use analytics_client::AnalyticsEventsClient;
|
||||
pub use analytics_client::AnalyticsFact;
|
||||
pub use analytics_client::AnalyticsReducer;
|
||||
pub use analytics_client::AppInvocation;
|
||||
pub use analytics_client::AppMentionedInput;
|
||||
pub use analytics_client::AppUsedInput;
|
||||
pub use analytics_client::CustomAnalyticsFact;
|
||||
pub use analytics_client::InvocationType;
|
||||
pub use analytics_client::PluginState;
|
||||
pub use analytics_client::PluginStateChangedInput;
|
||||
pub use analytics_client::PluginUsedInput;
|
||||
pub use analytics_client::SkillInvocation;
|
||||
pub use analytics_client::SkillInvokedInput;
|
||||
pub use analytics_client::TrackEventsContext;
|
||||
pub use analytics_client::build_track_events_context;
|
||||
pub use client::AnalyticsEventsClient;
|
||||
pub use events::AppServerRpcTransport;
|
||||
pub use facts::AppInvocation;
|
||||
pub use facts::InvocationType;
|
||||
pub use facts::SkillInvocation;
|
||||
pub use facts::TrackEventsContext;
|
||||
pub use facts::build_track_events_context;
|
||||
|
||||
#[cfg(test)]
|
||||
mod analytics_client_tests;
|
||||
|
||||
305
codex-rs/analytics/src/reducer.rs
Normal file
305
codex-rs/analytics/src/reducer.rs
Normal file
@@ -0,0 +1,305 @@
|
||||
use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::SkillInvocationEventParams;
|
||||
use crate::events::SkillInvocationEventRequest;
|
||||
use crate::events::ThreadInitializationMode;
|
||||
use crate::events::ThreadInitializedEvent;
|
||||
use crate::events::ThreadInitializedEventParams;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::events::codex_app_metadata;
|
||||
use crate::events::codex_plugin_metadata;
|
||||
use crate::events::codex_plugin_used_metadata;
|
||||
use crate::events::plugin_state_event_type;
|
||||
use crate::events::thread_source_name;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::PluginStateChangedInput;
|
||||
use crate::facts::PluginUsedInput;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_git_utils::collect_git_info;
|
||||
use codex_git_utils::get_git_repo_root;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SkillScope;
|
||||
use sha1::Digest;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct AnalyticsReducer {
|
||||
connections: HashMap<u64, ConnectionState>,
|
||||
}
|
||||
|
||||
struct ConnectionState {
|
||||
app_server_client: CodexAppServerClientMetadata,
|
||||
runtime: CodexRuntimeMetadata,
|
||||
}
|
||||
|
||||
impl AnalyticsReducer {
|
||||
pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
|
||||
match input {
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id,
|
||||
params,
|
||||
product_client_id,
|
||||
runtime,
|
||||
rpc_transport,
|
||||
} => {
|
||||
self.ingest_initialize(
|
||||
connection_id,
|
||||
params,
|
||||
product_client_id,
|
||||
runtime,
|
||||
rpc_transport,
|
||||
);
|
||||
}
|
||||
AnalyticsFact::Request {
|
||||
connection_id: _connection_id,
|
||||
request_id: _request_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
AnalyticsFact::Response {
|
||||
connection_id,
|
||||
response,
|
||||
} => {
|
||||
self.ingest_response(connection_id, *response, out);
|
||||
}
|
||||
AnalyticsFact::Notification(_notification) => {}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::SkillInvoked(input) => {
|
||||
self.ingest_skill_invoked(input, out).await;
|
||||
}
|
||||
CustomAnalyticsFact::AppMentioned(input) => {
|
||||
self.ingest_app_mentioned(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::AppUsed(input) => {
|
||||
self.ingest_app_used(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::PluginUsed(input) => {
|
||||
self.ingest_plugin_used(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::PluginStateChanged(input) => {
|
||||
self.ingest_plugin_state_changed(input, out);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest_initialize(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
params: InitializeParams,
|
||||
product_client_id: String,
|
||||
runtime: CodexRuntimeMetadata,
|
||||
rpc_transport: AppServerRpcTransport,
|
||||
) {
|
||||
self.connections.insert(
|
||||
connection_id,
|
||||
ConnectionState {
|
||||
app_server_client: CodexAppServerClientMetadata {
|
||||
product_client_id,
|
||||
client_name: Some(params.client_info.name),
|
||||
client_version: Some(params.client_info.version),
|
||||
rpc_transport,
|
||||
experimental_api_enabled: params
|
||||
.capabilities
|
||||
.map(|capabilities| capabilities.experimental_api),
|
||||
},
|
||||
runtime,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
async fn ingest_skill_invoked(
|
||||
&mut self,
|
||||
input: SkillInvokedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let SkillInvokedInput {
|
||||
tracking,
|
||||
invocations,
|
||||
} = input;
|
||||
for invocation in invocations {
|
||||
let skill_scope = match invocation.skill_scope {
|
||||
SkillScope::User => "user",
|
||||
SkillScope::Repo => "repo",
|
||||
SkillScope::System => "system",
|
||||
SkillScope::Admin => "admin",
|
||||
};
|
||||
let repo_root = get_git_repo_root(invocation.skill_path.as_path());
|
||||
let repo_url = if let Some(root) = repo_root.as_ref() {
|
||||
collect_git_info(root)
|
||||
.await
|
||||
.and_then(|info| info.repository_url)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let skill_id = skill_id_for_local_skill(
|
||||
repo_url.as_deref(),
|
||||
repo_root.as_deref(),
|
||||
invocation.skill_path.as_path(),
|
||||
invocation.skill_name.as_str(),
|
||||
);
|
||||
out.push(TrackEventRequest::SkillInvocation(
|
||||
SkillInvocationEventRequest {
|
||||
event_type: "skill_invocation",
|
||||
skill_id,
|
||||
skill_name: invocation.skill_name.clone(),
|
||||
event_params: SkillInvocationEventParams {
|
||||
thread_id: Some(tracking.thread_id.clone()),
|
||||
invoke_type: Some(invocation.invocation_type),
|
||||
model_slug: Some(tracking.model_slug.clone()),
|
||||
product_client_id: Some(originator().value),
|
||||
repo_url,
|
||||
skill_scope: Some(skill_scope.to_string()),
|
||||
},
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest_app_mentioned(&mut self, input: AppMentionedInput, out: &mut Vec<TrackEventRequest>) {
|
||||
let AppMentionedInput { tracking, mentions } = input;
|
||||
out.extend(mentions.into_iter().map(|mention| {
|
||||
let event_params = codex_app_metadata(&tracking, mention);
|
||||
TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest {
|
||||
event_type: "codex_app_mentioned",
|
||||
event_params,
|
||||
})
|
||||
}));
|
||||
}
|
||||
|
||||
fn ingest_app_used(&mut self, input: AppUsedInput, out: &mut Vec<TrackEventRequest>) {
|
||||
let AppUsedInput { tracking, app } = input;
|
||||
let event_params = codex_app_metadata(&tracking, app);
|
||||
out.push(TrackEventRequest::AppUsed(CodexAppUsedEventRequest {
|
||||
event_type: "codex_app_used",
|
||||
event_params,
|
||||
}));
|
||||
}
|
||||
|
||||
fn ingest_plugin_used(&mut self, input: PluginUsedInput, out: &mut Vec<TrackEventRequest>) {
|
||||
let PluginUsedInput { tracking, plugin } = input;
|
||||
out.push(TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
|
||||
event_type: "codex_plugin_used",
|
||||
event_params: codex_plugin_used_metadata(&tracking, plugin),
|
||||
}));
|
||||
}
|
||||
|
||||
fn ingest_plugin_state_changed(
|
||||
&mut self,
|
||||
input: PluginStateChangedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let PluginStateChangedInput { plugin, state } = input;
|
||||
let event = CodexPluginEventRequest {
|
||||
event_type: plugin_state_event_type(state),
|
||||
event_params: codex_plugin_metadata(plugin),
|
||||
};
|
||||
out.push(match state {
|
||||
PluginState::Installed => TrackEventRequest::PluginInstalled(event),
|
||||
PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event),
|
||||
PluginState::Enabled => TrackEventRequest::PluginEnabled(event),
|
||||
PluginState::Disabled => TrackEventRequest::PluginDisabled(event),
|
||||
});
|
||||
}
|
||||
|
||||
fn ingest_response(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
response: ClientResponse,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let (thread, model, initialization_mode) = match response {
|
||||
ClientResponse::ThreadStart { response, .. } => (
|
||||
response.thread,
|
||||
response.model,
|
||||
ThreadInitializationMode::New,
|
||||
),
|
||||
ClientResponse::ThreadResume { response, .. } => (
|
||||
response.thread,
|
||||
response.model,
|
||||
ThreadInitializationMode::Resumed,
|
||||
),
|
||||
ClientResponse::ThreadFork { response, .. } => (
|
||||
response.thread,
|
||||
response.model,
|
||||
ThreadInitializationMode::Forked,
|
||||
),
|
||||
_ => return,
|
||||
};
|
||||
let thread_source: SessionSource = thread.source.into();
|
||||
let Some(connection_state) = self.connections.get(&connection_id) else {
|
||||
return;
|
||||
};
|
||||
out.push(TrackEventRequest::ThreadInitialized(
|
||||
ThreadInitializedEvent {
|
||||
event_type: "codex_thread_initialized",
|
||||
event_params: ThreadInitializedEventParams {
|
||||
thread_id: thread.id,
|
||||
app_server_client: connection_state.app_server_client.clone(),
|
||||
runtime: connection_state.runtime.clone(),
|
||||
model,
|
||||
ephemeral: thread.ephemeral,
|
||||
thread_source: thread_source_name(&thread_source),
|
||||
initialization_mode,
|
||||
subagent_source: None,
|
||||
parent_thread_id: None,
|
||||
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
|
||||
},
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn skill_id_for_local_skill(
|
||||
repo_url: Option<&str>,
|
||||
repo_root: Option<&Path>,
|
||||
skill_path: &Path,
|
||||
skill_name: &str,
|
||||
) -> String {
|
||||
let path = normalize_path_for_skill_id(repo_url, repo_root, skill_path);
|
||||
let prefix = if let Some(url) = repo_url {
|
||||
format!("repo_{url}")
|
||||
} else {
|
||||
"personal".to_string()
|
||||
};
|
||||
let raw_id = format!("{prefix}_{path}_{skill_name}");
|
||||
let mut hasher = sha1::Sha1::new();
|
||||
sha1::Digest::update(&mut hasher, raw_id.as_bytes());
|
||||
format!("{:x}", sha1::Digest::finalize(hasher))
|
||||
}
|
||||
|
||||
/// Returns a normalized path for skill ID construction.
|
||||
///
|
||||
/// - Repo-scoped skills use a path relative to the repo root.
|
||||
/// - User/admin/system skills use an absolute path.
|
||||
pub(crate) fn normalize_path_for_skill_id(
|
||||
repo_url: Option<&str>,
|
||||
repo_root: Option<&Path>,
|
||||
skill_path: &Path,
|
||||
) -> String {
|
||||
let resolved_path =
|
||||
std::fs::canonicalize(skill_path).unwrap_or_else(|_| skill_path.to_path_buf());
|
||||
match (repo_url, repo_root) {
|
||||
(Some(_), Some(root)) => {
|
||||
let resolved_root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf());
|
||||
resolved_path
|
||||
.strip_prefix(&resolved_root)
|
||||
.unwrap_or(resolved_path.as_path())
|
||||
.to_string_lossy()
|
||||
.replace('\\', "/")
|
||||
}
|
||||
_ => resolved_path.to_string_lossy().replace('\\', "/"),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user