introduce generic enum for app-server-protocol constructs

This commit is contained in:
Roy Han
2026-03-26 14:32:55 -07:00
parent 57194fb349
commit bb61e20675
7 changed files with 157 additions and 108 deletions

View File

@@ -1,3 +1,7 @@
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_git_utils::collect_git_info;
use codex_git_utils::get_git_repo_root;
use codex_login::AuthManager;
@@ -26,12 +30,6 @@ pub struct TrackEventsContext {
pub turn_id: String,
}
#[derive(Clone)]
pub struct InitializeInput {
pub connection_id: u64,
pub product_client_id: String,
}
#[derive(Clone)]
pub struct ThreadInitializeInput {
pub connection_id: u64,
@@ -83,8 +81,28 @@ pub struct AppInvocation {
pub invocation_type: Option<InvocationType>,
}
pub enum AnalyticsInput {
Initialize(InitializeInput),
pub enum AnalyticsFact {
Initialize {
connection_id: u64,
params: InitializeParams,
},
Request {
connection_id: u64,
request_id: RequestId,
request: ClientRequest,
},
Notification {
connection_id: u64,
notification: ServerNotification,
},
// Facts that do not naturally exist on the app-server protocol surface, or
// would require non-trivial protocol reshaping on this branch.
Custom(CustomAnalyticsFact),
}
pub enum CustomAnalyticsFact {
// This remains custom on this branch because app-server-protocol does not
// yet expose a generic client response enum we can reduce over directly.
ThreadInitialized(ThreadInitializeInput),
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),
@@ -137,7 +155,7 @@ struct ClientState {
#[derive(Clone)]
pub(crate) struct AnalyticsEventsQueue {
sender: mpsc::Sender<AnalyticsInput>,
sender: mpsc::Sender<AnalyticsFact>,
app_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
plugin_used_emitted_keys: Arc<Mutex<HashSet<(String, String)>>>,
}
@@ -166,7 +184,7 @@ impl AnalyticsEventsQueue {
}
}
fn try_send(&self, input: AnalyticsInput) {
fn try_send(&self, input: AnalyticsFact) {
if self.sender.try_send(input).is_err() {
//TODO: add a metric for this
tracing::warn!("dropping analytics events: queue is full");
@@ -223,84 +241,91 @@ impl AnalyticsEventsClient {
if invocations.is_empty() {
return;
}
self.record(AnalyticsInput::SkillInvoked(SkillInvokedInput {
tracking,
invocations,
}));
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(
SkillInvokedInput {
tracking,
invocations,
},
)));
}
pub fn track_initialize(&self, input: InitializeInput) {
self.record(AnalyticsInput::Initialize(input));
pub fn track_initialize(&self, connection_id: u64, params: InitializeParams) {
self.record_fact(AnalyticsFact::Initialize {
connection_id,
params,
});
}
pub fn track_thread_initialized(&self, input: ThreadInitializeInput) {
self.record(AnalyticsInput::ThreadInitialized(input));
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::ThreadInitialized(input),
));
}
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
if mentions.is_empty() {
return;
}
self.record(AnalyticsInput::AppMentioned(AppMentionedInput {
tracking,
mentions,
}));
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(
AppMentionedInput { tracking, mentions },
)));
}
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
if !self.queue.should_enqueue_app_used(&tracking, &app) {
return;
}
self.record(AnalyticsInput::AppUsed(AppUsedInput { tracking, app }));
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
AppUsedInput { tracking, app },
)));
}
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
self.record(AnalyticsInput::PluginUsed(PluginUsedInput {
tracking,
plugin,
}));
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
PluginUsedInput { tracking, plugin },
)));
}
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
self.record(AnalyticsInput::PluginStateChanged(
PluginStateChangedInput {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
plugin,
state: PluginState::Installed,
},
}),
));
}
pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) {
self.record(AnalyticsInput::PluginStateChanged(
PluginStateChangedInput {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
plugin,
state: PluginState::Uninstalled,
},
}),
));
}
pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) {
self.record(AnalyticsInput::PluginStateChanged(
PluginStateChangedInput {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
plugin,
state: PluginState::Enabled,
},
}),
));
}
pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) {
self.record(AnalyticsInput::PluginStateChanged(
PluginStateChangedInput {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
plugin,
state: PluginState::Disabled,
},
}),
));
}
pub fn record(&self, input: AnalyticsInput) {
pub fn record_fact(&self, input: AnalyticsFact) {
if self.analytics_enabled == Some(false) {
return;
}
@@ -424,37 +449,51 @@ struct CodexPluginUsedEventRequest {
}
impl AnalyticsReducer {
async fn ingest(&mut self, input: AnalyticsInput, out: &mut Vec<TrackEventRequest>) {
async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
match input {
AnalyticsInput::Initialize(input) => {
self.ingest_initialize(input);
}
AnalyticsInput::ThreadInitialized(input) => {
self.ingest_thread_initialized(input, out);
}
AnalyticsInput::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
AnalyticsInput::AppMentioned(input) => {
self.ingest_app_mentioned(input, out);
}
AnalyticsInput::AppUsed(input) => {
self.ingest_app_used(input, out);
}
AnalyticsInput::PluginUsed(input) => {
self.ingest_plugin_used(input, out);
}
AnalyticsInput::PluginStateChanged(input) => {
self.ingest_plugin_state_changed(input, out);
AnalyticsFact::Initialize {
connection_id,
params,
} => {
self.ingest_initialize(connection_id, params);
}
AnalyticsFact::Request {
connection_id: _connection_id,
request_id: _request_id,
request: _request,
} => {}
AnalyticsFact::Notification {
connection_id: _connection_id,
notification: _notification,
} => {}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::ThreadInitialized(input) => {
self.ingest_thread_initialized(input, out);
}
CustomAnalyticsFact::SkillInvoked(input) => {
self.ingest_skill_invoked(input, out).await;
}
CustomAnalyticsFact::AppMentioned(input) => {
self.ingest_app_mentioned(input, out);
}
CustomAnalyticsFact::AppUsed(input) => {
self.ingest_app_used(input, out);
}
CustomAnalyticsFact::PluginUsed(input) => {
self.ingest_plugin_used(input, out);
}
CustomAnalyticsFact::PluginStateChanged(input) => {
self.ingest_plugin_state_changed(input, out);
}
},
}
}
fn ingest_initialize(&mut self, input: InitializeInput) {
fn ingest_initialize(&mut self, connection_id: u64, params: InitializeParams) {
self.clients.insert(
input.connection_id,
connection_id,
ClientState {
product_client_id: input.product_client_id,
product_client_id: params.client_info.name,
},
);
}

View File

@@ -1,13 +1,13 @@
use super::AnalyticsEventsQueue;
use super::AnalyticsInput;
use super::AnalyticsFact;
use super::AnalyticsReducer;
use super::AppInvocation;
use super::CodexAppMentionedEventRequest;
use super::CodexAppUsedEventRequest;
use super::CodexPluginEventRequest;
use super::CodexPluginUsedEventRequest;
use super::CustomAnalyticsFact;
use super::InitializationMode;
use super::InitializeInput;
use super::InvocationType;
use super::ThreadInitializeInput;
use super::TrackEventRequest;
@@ -17,6 +17,8 @@ use super::codex_plugin_metadata;
use super::codex_plugin_used_metadata;
use super::codex_thread_initialized_event_request;
use super::normalize_path_for_skill_id;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::InitializeParams;
use codex_login::default_client::originator;
use codex_plugin::AppConnectorId;
use codex_plugin::PluginCapabilitySummary;
@@ -197,7 +199,7 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() {
#[test]
fn thread_initialized_event_serializes_expected_shape() {
let event = TrackEventRequest::CodexThreadInitialized(codex_thread_initialized_event_request(
originator().value,
"codex-tui".to_string(),
ThreadInitializeInput {
connection_id: 1,
thread_id: "thread-0".to_string(),
@@ -216,7 +218,7 @@ fn thread_initialized_event_serializes_expected_shape() {
"event_type": "codex_thread_initialized",
"event_params": {
"thread_id": "thread-0",
"product_client_id": originator().value,
"product_client_id": "codex-tui",
"model": "gpt-5",
"ephemeral": true,
"session_source": "user",
@@ -233,7 +235,7 @@ fn thread_initialized_event_serializes_expected_shape() {
#[test]
fn thread_initialized_event_serializes_subagent_source() {
let event = TrackEventRequest::CodexThreadInitialized(codex_thread_initialized_event_request(
originator().value,
"codex-tui".to_string(),
ThreadInitializeInput {
connection_id: 1,
thread_id: "thread-1".to_string(),
@@ -257,14 +259,16 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsInput::ThreadInitialized(ThreadInitializeInput {
connection_id: 7,
thread_id: "thread-no-client".to_string(),
model: "gpt-5".to_string(),
ephemeral: false,
session_source: SessionSource::Exec,
initialization_mode: InitializationMode::New,
}),
AnalyticsFact::Custom(CustomAnalyticsFact::ThreadInitialized(
ThreadInitializeInput {
connection_id: 7,
thread_id: "thread-no-client".to_string(),
model: "gpt-5".to_string(),
ephemeral: false,
session_source: SessionSource::Exec,
initialization_mode: InitializationMode::New,
},
)),
&mut events,
)
.await;
@@ -272,10 +276,17 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsInput::Initialize(InitializeInput {
AnalyticsFact::Initialize {
connection_id: 7,
product_client_id: "codex-app-server-tests".to_string(),
}),
params: InitializeParams {
client_info: ClientInfo {
name: "codex-tui".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: None,
},
},
&mut events,
)
.await;
@@ -283,23 +294,25 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsInput::ThreadInitialized(ThreadInitializeInput {
connection_id: 7,
thread_id: "thread-1".to_string(),
model: "gpt-5".to_string(),
ephemeral: true,
session_source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: codex_protocol::ThreadId::from_string(
"11111111-1111-1111-1111-111111111111",
)
.expect("valid thread id"),
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
initialization_mode: InitializationMode::Resumed,
}),
AnalyticsFact::Custom(CustomAnalyticsFact::ThreadInitialized(
ThreadInitializeInput {
connection_id: 7,
thread_id: "thread-1".to_string(),
model: "gpt-5".to_string(),
ephemeral: true,
session_source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: codex_protocol::ThreadId::from_string(
"11111111-1111-1111-1111-111111111111",
)
.expect("valid thread id"),
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
initialization_mode: InitializationMode::Resumed,
},
)),
&mut events,
)
.await;
@@ -307,10 +320,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_thread_initialized");
assert_eq!(
payload[0]["event_params"]["product_client_id"],
"codex-app-server-tests"
);
assert_eq!(payload[0]["event_params"]["product_client_id"], "codex-tui");
assert_eq!(payload[0]["event_params"]["initialization_mode"], "resumed");
assert_eq!(payload[0]["event_params"]["session_source"], "subagent");
assert_eq!(

View File

@@ -1,13 +1,13 @@
mod analytics_client;
pub use analytics_client::AnalyticsEventsClient;
pub use analytics_client::AnalyticsInput;
pub use analytics_client::AnalyticsFact;
pub use analytics_client::AnalyticsReducer;
pub use analytics_client::AppInvocation;
pub use analytics_client::AppMentionedInput;
pub use analytics_client::AppUsedInput;
pub use analytics_client::CustomAnalyticsFact;
pub use analytics_client::InitializationMode;
pub use analytics_client::InitializeInput;
pub use analytics_client::InvocationType;
pub use analytics_client::PluginState;
pub use analytics_client::PluginStateChangedInput;