mirror of
https://github.com/openai/codex.git
synced 2026-04-14 11:31:42 +03:00
Compare commits
3 Commits
dev/cc/pro
...
pr17090
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
94b0702e14 | ||
|
|
fa813617b5 | ||
|
|
c852a44669 |
@@ -3,12 +3,19 @@ use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexCommandExecutionEventParams;
|
||||
use crate::events::CodexCommandExecutionEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::CodexToolItemEventBase;
|
||||
use crate::events::CommandExecutionFamily;
|
||||
use crate::events::CommandExecutionSourceKind;
|
||||
use crate::events::ThreadInitializationMode;
|
||||
use crate::events::ThreadInitializedEvent;
|
||||
use crate::events::ThreadInitializedEventParams;
|
||||
use crate::events::ToolItemFinalApprovalOutcome;
|
||||
use crate::events::ToolItemTerminalStatus;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::events::codex_app_metadata;
|
||||
use crate::events::codex_plugin_metadata;
|
||||
@@ -34,12 +41,18 @@ use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer;
|
||||
use codex_app_server_protocol::AskForApproval as AppServerAskForApproval;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::CommandExecutionSource;
|
||||
use codex_app_server_protocol::CommandExecutionStatus;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ItemStartedNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
|
||||
@@ -114,6 +127,50 @@ fn sample_thread_resume_response(thread_id: &str, ephemeral: bool, model: &str)
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_initialize_fact(connection_id: u64) -> AnalyticsFact {
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id,
|
||||
params: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-tui".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
capabilities: Some(InitializeCapabilities {
|
||||
experimental_api: false,
|
||||
opt_out_notification_methods: None,
|
||||
}),
|
||||
},
|
||||
product_client_id: DEFAULT_ORIGINATOR.to_string(),
|
||||
runtime: CodexRuntimeMetadata {
|
||||
codex_rs_version: "0.99.0".to_string(),
|
||||
runtime_os: "linux".to_string(),
|
||||
runtime_os_version: "24.04".to_string(),
|
||||
runtime_arch: "x86_64".to_string(),
|
||||
},
|
||||
rpc_transport: AppServerRpcTransport::Websocket,
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_command_execution_item(
|
||||
status: CommandExecutionStatus,
|
||||
exit_code: Option<i32>,
|
||||
duration_ms: Option<i64>,
|
||||
) -> ThreadItem {
|
||||
ThreadItem::CommandExecution {
|
||||
id: "item-1".to_string(),
|
||||
command: "echo hi".to_string(),
|
||||
cwd: PathBuf::from("/tmp"),
|
||||
process_id: Some("pid-1".to_string()),
|
||||
source: CommandExecutionSource::Agent,
|
||||
status,
|
||||
command_actions: Vec::new(),
|
||||
aggregated_output: None,
|
||||
exit_code,
|
||||
duration_ms,
|
||||
}
|
||||
}
|
||||
|
||||
fn expected_absolute_path(path: &PathBuf) -> String {
|
||||
std::fs::canonicalize(path)
|
||||
.unwrap_or_else(|_| path.to_path_buf())
|
||||
@@ -346,6 +403,155 @@ fn thread_initialized_event_serializes_expected_shape() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn command_execution_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::CommandExecution(CodexCommandExecutionEventRequest {
|
||||
event_type: "codex_command_execution_event",
|
||||
event_params: CodexCommandExecutionEventParams {
|
||||
base: CodexToolItemEventBase {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "item-1".to_string(),
|
||||
app_server_client: CodexAppServerClientMetadata {
|
||||
product_client_id: "codex_tui".to_string(),
|
||||
client_name: Some("codex-tui".to_string()),
|
||||
client_version: Some("1.2.3".to_string()),
|
||||
rpc_transport: AppServerRpcTransport::Websocket,
|
||||
experimental_api_enabled: Some(true),
|
||||
},
|
||||
runtime: CodexRuntimeMetadata {
|
||||
codex_rs_version: "0.99.0".to_string(),
|
||||
runtime_os: "macos".to_string(),
|
||||
runtime_os_version: "15.3.1".to_string(),
|
||||
runtime_arch: "aarch64".to_string(),
|
||||
},
|
||||
thread_source: Some("user"),
|
||||
subagent_source: None,
|
||||
parent_thread_id: None,
|
||||
tool_name: "shell".to_string(),
|
||||
started_at: 123,
|
||||
completed_at: Some(125),
|
||||
duration_ms: Some(2000),
|
||||
execution_started: true,
|
||||
review_count: 0,
|
||||
guardian_review_count: 0,
|
||||
user_review_count: 0,
|
||||
final_approval_outcome: ToolItemFinalApprovalOutcome::NotNeeded,
|
||||
terminal_status: ToolItemTerminalStatus::Completed,
|
||||
failure_kind: None,
|
||||
requested_additional_permissions: false,
|
||||
requested_network_access: false,
|
||||
retry_count: 0,
|
||||
},
|
||||
command_execution_source: CommandExecutionSourceKind::Agent,
|
||||
command_execution_family: CommandExecutionFamily::Shell,
|
||||
process_id: Some("pid-1".to_string()),
|
||||
exit_code: Some(0),
|
||||
command_action_count: 1,
|
||||
},
|
||||
});
|
||||
|
||||
let payload = serde_json::to_value(&event).expect("serialize command execution event");
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!({
|
||||
"event_type": "codex_command_execution_event",
|
||||
"event_params": {
|
||||
"thread_id": "thread-1",
|
||||
"turn_id": "turn-1",
|
||||
"item_id": "item-1",
|
||||
"app_server_client": {
|
||||
"product_client_id": "codex_tui",
|
||||
"client_name": "codex-tui",
|
||||
"client_version": "1.2.3",
|
||||
"rpc_transport": "websocket",
|
||||
"experimental_api_enabled": true
|
||||
},
|
||||
"runtime": {
|
||||
"codex_rs_version": "0.99.0",
|
||||
"runtime_os": "macos",
|
||||
"runtime_os_version": "15.3.1",
|
||||
"runtime_arch": "aarch64"
|
||||
},
|
||||
"thread_source": "user",
|
||||
"subagent_source": null,
|
||||
"parent_thread_id": null,
|
||||
"tool_name": "shell",
|
||||
"started_at": 123,
|
||||
"completed_at": 125,
|
||||
"duration_ms": 2000,
|
||||
"execution_started": true,
|
||||
"review_count": 0,
|
||||
"guardian_review_count": 0,
|
||||
"user_review_count": 0,
|
||||
"final_approval_outcome": "not_needed",
|
||||
"terminal_status": "completed",
|
||||
"failure_kind": null,
|
||||
"requested_additional_permissions": false,
|
||||
"requested_network_access": false,
|
||||
"retry_count": 0,
|
||||
"command_execution_source": "agent",
|
||||
"command_execution_family": "shell",
|
||||
"process_id": "pid-1",
|
||||
"exit_code": 0,
|
||||
"command_action_count": 1
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn command_execution_event_allows_null_thread_denormalization() {
|
||||
let event = TrackEventRequest::CommandExecution(CodexCommandExecutionEventRequest {
|
||||
event_type: "codex_command_execution_event",
|
||||
event_params: CodexCommandExecutionEventParams {
|
||||
base: CodexToolItemEventBase {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "item-1".to_string(),
|
||||
app_server_client: CodexAppServerClientMetadata {
|
||||
product_client_id: "codex_tui".to_string(),
|
||||
client_name: Some("codex-tui".to_string()),
|
||||
client_version: Some("1.2.3".to_string()),
|
||||
rpc_transport: AppServerRpcTransport::Websocket,
|
||||
experimental_api_enabled: Some(true),
|
||||
},
|
||||
runtime: CodexRuntimeMetadata {
|
||||
codex_rs_version: "0.99.0".to_string(),
|
||||
runtime_os: "macos".to_string(),
|
||||
runtime_os_version: "15.3.1".to_string(),
|
||||
runtime_arch: "aarch64".to_string(),
|
||||
},
|
||||
thread_source: None,
|
||||
subagent_source: None,
|
||||
parent_thread_id: None,
|
||||
tool_name: "shell".to_string(),
|
||||
started_at: 123,
|
||||
completed_at: Some(125),
|
||||
duration_ms: Some(2000),
|
||||
execution_started: true,
|
||||
review_count: 0,
|
||||
guardian_review_count: 0,
|
||||
user_review_count: 0,
|
||||
final_approval_outcome: ToolItemFinalApprovalOutcome::NotNeeded,
|
||||
terminal_status: ToolItemTerminalStatus::Completed,
|
||||
failure_kind: None,
|
||||
requested_additional_permissions: false,
|
||||
requested_network_access: false,
|
||||
retry_count: 0,
|
||||
},
|
||||
command_execution_source: CommandExecutionSourceKind::Agent,
|
||||
command_execution_family: CommandExecutionFamily::Shell,
|
||||
process_id: None,
|
||||
exit_code: Some(0),
|
||||
command_action_count: 0,
|
||||
},
|
||||
});
|
||||
|
||||
let payload = serde_json::to_value(&event).expect("serialize command execution event");
|
||||
assert_eq!(payload["event_params"]["thread_source"], json!(null));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialized() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
@@ -353,7 +559,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-no-client",
|
||||
@@ -397,7 +603,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_resume_response(
|
||||
"thread-1", /*ephemeral*/ true, "gpt-5",
|
||||
@@ -449,6 +655,87 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
assert_eq!(payload[0]["event_params"]["parent_thread_id"], json!(null));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn item_lifecycle_notifications_publish_command_execution_event() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(sample_initialize_fact(/*connection_id*/ 7), &mut events)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification {
|
||||
connection_id: 7,
|
||||
notification: Box::new(ServerNotification::ItemStarted(ItemStartedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item: sample_command_execution_item(
|
||||
CommandExecutionStatus::InProgress,
|
||||
/*exit_code*/ None,
|
||||
/*duration_ms*/ None,
|
||||
),
|
||||
})),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
events.is_empty(),
|
||||
"tool item event should emit on completion"
|
||||
);
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification {
|
||||
connection_id: 7,
|
||||
notification: Box::new(ServerNotification::ItemCompleted(
|
||||
ItemCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item: sample_command_execution_item(
|
||||
CommandExecutionStatus::Completed,
|
||||
Some(0),
|
||||
Some(42),
|
||||
),
|
||||
},
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(payload.as_array().expect("events array").len(), 1);
|
||||
assert_eq!(payload[0]["event_type"], "codex_command_execution_event");
|
||||
assert_eq!(payload[0]["event_params"]["thread_id"], "thread-1");
|
||||
assert_eq!(payload[0]["event_params"]["turn_id"], "turn-1");
|
||||
assert_eq!(payload[0]["event_params"]["item_id"], "item-1");
|
||||
assert_eq!(payload[0]["event_params"]["tool_name"], "shell");
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["command_execution_source"],
|
||||
"agent"
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["command_execution_family"],
|
||||
"shell"
|
||||
);
|
||||
assert_eq!(payload[0]["event_params"]["terminal_status"], "completed");
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["failure_kind"],
|
||||
serde_json::Value::Null
|
||||
);
|
||||
assert_eq!(payload[0]["event_params"]["exit_code"], 0);
|
||||
assert_eq!(payload[0]["event_params"]["process_id"], "pid-1");
|
||||
assert_eq!(payload[0]["event_params"]["duration_ms"], 42);
|
||||
assert_eq!(payload[0]["event_params"]["execution_started"], true);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["app_server_client"]["client_name"],
|
||||
"codex-tui"
|
||||
);
|
||||
assert_eq!(payload[0]["event_params"]["thread_source"], json!(null));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subagent_thread_started_review_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
|
||||
|
||||
@@ -16,6 +16,9 @@ use crate::facts::TrackEventsContext;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
@@ -38,8 +41,7 @@ pub(crate) struct AnalyticsEventsQueue {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AnalyticsEventsClient {
|
||||
queue: AnalyticsEventsQueue,
|
||||
analytics_enabled: Option<bool>,
|
||||
queue: Option<AnalyticsEventsQueue>,
|
||||
}
|
||||
|
||||
impl AnalyticsEventsQueue {
|
||||
@@ -108,11 +110,15 @@ impl AnalyticsEventsClient {
|
||||
analytics_enabled: Option<bool>,
|
||||
) -> Self {
|
||||
Self {
|
||||
queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url),
|
||||
analytics_enabled,
|
||||
queue: (analytics_enabled != Some(false))
|
||||
.then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disabled() -> Self {
|
||||
Self { queue: None }
|
||||
}
|
||||
|
||||
pub fn track_skill_invocations(
|
||||
&self,
|
||||
tracking: TrackEventsContext,
|
||||
@@ -161,7 +167,10 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
|
||||
if !self.queue.should_enqueue_app_used(&tracking, &app) {
|
||||
let Some(queue) = self.queue.as_ref() else {
|
||||
return;
|
||||
};
|
||||
if !queue.should_enqueue_app_used(&tracking, &app) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
|
||||
@@ -170,7 +179,10 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
|
||||
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
let Some(queue) = self.queue.as_ref() else {
|
||||
return;
|
||||
};
|
||||
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
|
||||
@@ -215,18 +227,37 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub(crate) fn record_fact(&self, input: AnalyticsFact) {
|
||||
if self.analytics_enabled == Some(false) {
|
||||
return;
|
||||
if let Some(queue) = self.queue.as_ref() {
|
||||
queue.try_send(input);
|
||||
}
|
||||
self.queue.try_send(input);
|
||||
}
|
||||
|
||||
pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
|
||||
self.record_fact(AnalyticsFact::Response {
|
||||
self.record_fact(AnalyticsFact::ClientResponse {
|
||||
connection_id,
|
||||
response: Box::new(response),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
|
||||
self.record_fact(AnalyticsFact::ServerRequest {
|
||||
connection_id,
|
||||
request: Box::new(request),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_server_response(&self, response: ServerResponse) {
|
||||
self.record_fact(AnalyticsFact::ServerResponse {
|
||||
response: Box::new(response),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_notification(&self, connection_id: u64, notification: ServerNotification) {
|
||||
self.record_fact(AnalyticsFact::Notification {
|
||||
connection_id,
|
||||
notification: Box::new(notification),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_track_events(
|
||||
|
||||
@@ -37,6 +37,13 @@ pub(crate) enum TrackEventRequest {
|
||||
ThreadInitialized(ThreadInitializedEvent),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
AppUsed(CodexAppUsedEventRequest),
|
||||
CommandExecution(CodexCommandExecutionEventRequest),
|
||||
FileChange(CodexFileChangeEventRequest),
|
||||
McpToolCall(CodexMcpToolCallEventRequest),
|
||||
DynamicToolCall(CodexDynamicToolCallEventRequest),
|
||||
CollabAgentToolCall(CodexCollabAgentToolCallEventRequest),
|
||||
WebSearch(CodexWebSearchEventRequest),
|
||||
ImageGeneration(CodexImageGenerationEventRequest),
|
||||
PluginUsed(CodexPluginUsedEventRequest),
|
||||
PluginInstalled(CodexPluginEventRequest),
|
||||
PluginUninstalled(CodexPluginEventRequest),
|
||||
@@ -99,6 +106,224 @@ pub(crate) struct ThreadInitializedEvent {
|
||||
pub(crate) event_params: ThreadInitializedEventParams,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum ToolItemFinalApprovalOutcome {
|
||||
NotNeeded,
|
||||
ConfigAllowed,
|
||||
PolicyForbidden,
|
||||
GuardianApproved,
|
||||
GuardianDenied,
|
||||
GuardianAborted,
|
||||
UserApproved,
|
||||
UserApprovedForSession,
|
||||
UserDenied,
|
||||
UserAborted,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum ToolItemTerminalStatus {
|
||||
Completed,
|
||||
Failed,
|
||||
Rejected,
|
||||
Interrupted,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum ToolItemFailureKind {
|
||||
ToolError,
|
||||
ApprovalDenied,
|
||||
ApprovalAborted,
|
||||
SandboxDenied,
|
||||
PolicyForbidden,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexToolItemEventBase {
|
||||
pub(crate) thread_id: String,
|
||||
pub(crate) turn_id: String,
|
||||
/// App-server ThreadItem.id. For tool-originated items this generally
|
||||
/// corresponds to the originating core call_id.
|
||||
pub(crate) item_id: String,
|
||||
pub(crate) app_server_client: CodexAppServerClientMetadata,
|
||||
pub(crate) runtime: CodexRuntimeMetadata,
|
||||
pub(crate) thread_source: Option<&'static str>,
|
||||
pub(crate) subagent_source: Option<String>,
|
||||
pub(crate) parent_thread_id: Option<String>,
|
||||
pub(crate) tool_name: String,
|
||||
pub(crate) started_at: u64,
|
||||
pub(crate) completed_at: Option<u64>,
|
||||
pub(crate) duration_ms: Option<u64>,
|
||||
pub(crate) execution_started: bool,
|
||||
pub(crate) review_count: u64,
|
||||
pub(crate) guardian_review_count: u64,
|
||||
pub(crate) user_review_count: u64,
|
||||
pub(crate) final_approval_outcome: ToolItemFinalApprovalOutcome,
|
||||
pub(crate) terminal_status: ToolItemTerminalStatus,
|
||||
pub(crate) failure_kind: Option<ToolItemFailureKind>,
|
||||
pub(crate) requested_additional_permissions: bool,
|
||||
pub(crate) requested_network_access: bool,
|
||||
pub(crate) retry_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum CommandExecutionFamily {
|
||||
Shell,
|
||||
UserShell,
|
||||
UnifiedExec,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum CommandExecutionSourceKind {
|
||||
Agent,
|
||||
UserShell,
|
||||
UnifiedExecStartup,
|
||||
UnifiedExecInteraction,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum CollabAgentToolKind {
|
||||
SpawnAgent,
|
||||
SendInput,
|
||||
ResumeAgent,
|
||||
Wait,
|
||||
CloseAgent,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum WebSearchActionKind {
|
||||
Search,
|
||||
OpenPage,
|
||||
FindInPage,
|
||||
Other,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexCommandExecutionEventParams {
|
||||
#[serde(flatten)]
|
||||
pub(crate) base: CodexToolItemEventBase,
|
||||
pub(crate) command_execution_source: CommandExecutionSourceKind,
|
||||
pub(crate) command_execution_family: CommandExecutionFamily,
|
||||
pub(crate) process_id: Option<String>,
|
||||
pub(crate) exit_code: Option<i32>,
|
||||
pub(crate) command_action_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexCommandExecutionEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexCommandExecutionEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexFileChangeEventParams {
|
||||
#[serde(flatten)]
|
||||
pub(crate) base: CodexToolItemEventBase,
|
||||
pub(crate) file_change_count: u64,
|
||||
pub(crate) file_add_count: u64,
|
||||
pub(crate) file_update_count: u64,
|
||||
pub(crate) file_delete_count: u64,
|
||||
pub(crate) file_move_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexFileChangeEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexFileChangeEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexMcpToolCallEventParams {
|
||||
#[serde(flatten)]
|
||||
pub(crate) base: CodexToolItemEventBase,
|
||||
pub(crate) mcp_server_name: String,
|
||||
pub(crate) mcp_tool_name: String,
|
||||
pub(crate) mcp_error_present: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexMcpToolCallEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexMcpToolCallEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexDynamicToolCallEventParams {
|
||||
#[serde(flatten)]
|
||||
pub(crate) base: CodexToolItemEventBase,
|
||||
pub(crate) dynamic_tool_name: String,
|
||||
pub(crate) success: Option<bool>,
|
||||
pub(crate) output_content_item_count: Option<u64>,
|
||||
pub(crate) output_text_item_count: Option<u64>,
|
||||
pub(crate) output_image_item_count: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexDynamicToolCallEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexDynamicToolCallEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexCollabAgentToolCallEventParams {
|
||||
#[serde(flatten)]
|
||||
pub(crate) base: CodexToolItemEventBase,
|
||||
pub(crate) collab_agent_tool: CollabAgentToolKind,
|
||||
pub(crate) sender_thread_id: String,
|
||||
pub(crate) receiver_thread_count: u64,
|
||||
pub(crate) receiver_thread_ids: Vec<String>,
|
||||
pub(crate) requested_model: Option<String>,
|
||||
pub(crate) requested_reasoning_effort: Option<String>,
|
||||
pub(crate) agent_state_count: u64,
|
||||
pub(crate) completed_agent_count: u64,
|
||||
pub(crate) failed_agent_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexCollabAgentToolCallEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexCollabAgentToolCallEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexWebSearchEventParams {
|
||||
#[serde(flatten)]
|
||||
pub(crate) base: CodexToolItemEventBase,
|
||||
pub(crate) web_search_action: Option<WebSearchActionKind>,
|
||||
pub(crate) query_present: bool,
|
||||
pub(crate) query_count: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexWebSearchEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexWebSearchEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexImageGenerationEventParams {
|
||||
#[serde(flatten)]
|
||||
pub(crate) base: CodexToolItemEventBase,
|
||||
pub(crate) image_generation_status: String,
|
||||
pub(crate) revised_prompt_present: bool,
|
||||
pub(crate) saved_path_present: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexImageGenerationEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexImageGenerationEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexAppMetadata {
|
||||
pub(crate) connector_id: Option<String>,
|
||||
|
||||
@@ -5,6 +5,8 @@ use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::protocol::SkillScope;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
@@ -72,16 +74,26 @@ pub(crate) enum AnalyticsFact {
|
||||
runtime: CodexRuntimeMetadata,
|
||||
rpc_transport: AppServerRpcTransport,
|
||||
},
|
||||
Request {
|
||||
ClientRequest {
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
request: Box<ClientRequest>,
|
||||
},
|
||||
Response {
|
||||
ClientResponse {
|
||||
connection_id: u64,
|
||||
response: Box<ClientResponse>,
|
||||
},
|
||||
Notification(Box<ServerNotification>),
|
||||
ServerRequest {
|
||||
connection_id: u64,
|
||||
request: Box<ServerRequest>,
|
||||
},
|
||||
ServerResponse {
|
||||
response: Box<ServerResponse>,
|
||||
},
|
||||
Notification {
|
||||
connection_id: u64,
|
||||
notification: Box<ServerNotification>,
|
||||
},
|
||||
// Facts that do not naturally exist on the app-server protocol surface, or
|
||||
// would require non-trivial protocol reshaping on this branch.
|
||||
Custom(CustomAnalyticsFact),
|
||||
|
||||
@@ -2,15 +2,37 @@ use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexCollabAgentToolCallEventParams;
|
||||
use crate::events::CodexCollabAgentToolCallEventRequest;
|
||||
use crate::events::CodexCommandExecutionEventParams;
|
||||
use crate::events::CodexCommandExecutionEventRequest;
|
||||
use crate::events::CodexDynamicToolCallEventParams;
|
||||
use crate::events::CodexDynamicToolCallEventRequest;
|
||||
use crate::events::CodexFileChangeEventParams;
|
||||
use crate::events::CodexFileChangeEventRequest;
|
||||
use crate::events::CodexImageGenerationEventParams;
|
||||
use crate::events::CodexImageGenerationEventRequest;
|
||||
use crate::events::CodexMcpToolCallEventParams;
|
||||
use crate::events::CodexMcpToolCallEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::CodexToolItemEventBase;
|
||||
use crate::events::CodexWebSearchEventParams;
|
||||
use crate::events::CodexWebSearchEventRequest;
|
||||
use crate::events::CollabAgentToolKind;
|
||||
use crate::events::CommandExecutionFamily;
|
||||
use crate::events::CommandExecutionSourceKind;
|
||||
use crate::events::SkillInvocationEventParams;
|
||||
use crate::events::SkillInvocationEventRequest;
|
||||
use crate::events::ThreadInitializationMode;
|
||||
use crate::events::ThreadInitializedEvent;
|
||||
use crate::events::ThreadInitializedEventParams;
|
||||
use crate::events::ToolItemFailureKind;
|
||||
use crate::events::ToolItemFinalApprovalOutcome;
|
||||
use crate::events::ToolItemTerminalStatus;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::events::WebSearchActionKind;
|
||||
use crate::events::codex_app_metadata;
|
||||
use crate::events::codex_plugin_metadata;
|
||||
use crate::events::codex_plugin_used_metadata;
|
||||
@@ -27,7 +49,20 @@ use crate::facts::PluginUsedInput;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::CollabAgentStatus;
|
||||
use codex_app_server_protocol::CollabAgentTool;
|
||||
use codex_app_server_protocol::CollabAgentToolCallStatus;
|
||||
use codex_app_server_protocol::CommandExecutionSource;
|
||||
use codex_app_server_protocol::CommandExecutionStatus;
|
||||
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
|
||||
use codex_app_server_protocol::DynamicToolCallStatus;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::McpToolCallStatus;
|
||||
use codex_app_server_protocol::PatchApplyStatus;
|
||||
use codex_app_server_protocol::PatchChangeKind;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::WebSearchAction;
|
||||
use codex_git_utils::collect_git_info;
|
||||
use codex_git_utils::get_git_repo_root;
|
||||
use codex_login::default_client::originator;
|
||||
@@ -36,10 +71,14 @@ use codex_protocol::protocol::SkillScope;
|
||||
use sha1::Digest;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct AnalyticsReducer {
|
||||
connections: HashMap<u64, ConnectionState>,
|
||||
threads: HashMap<String, ThreadMetadataState>,
|
||||
tool_items: HashMap<String, ToolItemState>,
|
||||
}
|
||||
|
||||
struct ConnectionState {
|
||||
@@ -47,6 +86,18 @@ struct ConnectionState {
|
||||
runtime: CodexRuntimeMetadata,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct ThreadMetadataState {
|
||||
thread_source: Option<&'static str>,
|
||||
subagent_source: Option<String>,
|
||||
parent_thread_id: Option<String>,
|
||||
}
|
||||
|
||||
struct ToolItemState {
|
||||
connection_id: u64,
|
||||
started_at: u64,
|
||||
}
|
||||
|
||||
impl AnalyticsReducer {
|
||||
pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec<TrackEventRequest>) {
|
||||
match input {
|
||||
@@ -65,18 +116,30 @@ impl AnalyticsReducer {
|
||||
rpc_transport,
|
||||
);
|
||||
}
|
||||
AnalyticsFact::Request {
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: _connection_id,
|
||||
request_id: _request_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
AnalyticsFact::Response {
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id,
|
||||
response,
|
||||
} => {
|
||||
self.ingest_response(connection_id, *response, out);
|
||||
}
|
||||
AnalyticsFact::Notification(_notification) => {}
|
||||
AnalyticsFact::ServerRequest {
|
||||
connection_id: _connection_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
AnalyticsFact::ServerResponse {
|
||||
response: _response,
|
||||
} => {}
|
||||
AnalyticsFact::Notification {
|
||||
connection_id,
|
||||
notification,
|
||||
} => {
|
||||
self.ingest_notification(connection_id, *notification, out);
|
||||
}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
self.ingest_subagent_thread_started(input, out);
|
||||
@@ -130,9 +193,16 @@ impl AnalyticsReducer {
|
||||
input: SubAgentThreadStartedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
out.push(TrackEventRequest::ThreadInitialized(
|
||||
subagent_thread_started_event_request(input),
|
||||
));
|
||||
let event = subagent_thread_started_event_request(input);
|
||||
self.threads.insert(
|
||||
event.event_params.thread_id.clone(),
|
||||
ThreadMetadataState {
|
||||
thread_source: event.event_params.thread_source,
|
||||
subagent_source: event.event_params.subagent_source.clone(),
|
||||
parent_thread_id: event.event_params.parent_thread_id.clone(),
|
||||
},
|
||||
);
|
||||
out.push(TrackEventRequest::ThreadInitialized(event));
|
||||
}
|
||||
|
||||
async fn ingest_skill_invoked(
|
||||
@@ -257,16 +327,26 @@ impl AnalyticsReducer {
|
||||
let Some(connection_state) = self.connections.get(&connection_id) else {
|
||||
return;
|
||||
};
|
||||
let thread_id = thread.id.clone();
|
||||
let thread_source = thread_source_name(&thread_source);
|
||||
self.threads.insert(
|
||||
thread_id.clone(),
|
||||
ThreadMetadataState {
|
||||
thread_source,
|
||||
subagent_source: None,
|
||||
parent_thread_id: None,
|
||||
},
|
||||
);
|
||||
out.push(TrackEventRequest::ThreadInitialized(
|
||||
ThreadInitializedEvent {
|
||||
event_type: "codex_thread_initialized",
|
||||
event_params: ThreadInitializedEventParams {
|
||||
thread_id: thread.id,
|
||||
thread_id,
|
||||
app_server_client: connection_state.app_server_client.clone(),
|
||||
runtime: connection_state.runtime.clone(),
|
||||
model,
|
||||
ephemeral: thread.ephemeral,
|
||||
thread_source: thread_source_name(&thread_source),
|
||||
thread_source,
|
||||
initialization_mode,
|
||||
subagent_source: None,
|
||||
parent_thread_id: None,
|
||||
@@ -275,6 +355,662 @@ impl AnalyticsReducer {
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
fn ingest_notification(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
notification: ServerNotification,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
match notification {
|
||||
ServerNotification::ItemStarted(notification) => {
|
||||
if let Some(item_id) = tool_item_id(¬ification.item) {
|
||||
self.tool_items
|
||||
.entry(item_id.to_string())
|
||||
.or_insert_with(|| ToolItemState {
|
||||
connection_id,
|
||||
started_at: now_unix_secs(),
|
||||
});
|
||||
}
|
||||
}
|
||||
ServerNotification::ItemCompleted(notification) => {
|
||||
let Some(item_id) = tool_item_id(¬ification.item) else {
|
||||
return;
|
||||
};
|
||||
let Some(started) = self.tool_items.remove(item_id) else {
|
||||
return;
|
||||
};
|
||||
let Some(connection_state) = self.connections.get(&started.connection_id) else {
|
||||
return;
|
||||
};
|
||||
let completed_at = now_unix_secs();
|
||||
if let Some(event) = tool_item_event(
|
||||
¬ification.thread_id,
|
||||
¬ification.turn_id,
|
||||
¬ification.item,
|
||||
started.started_at,
|
||||
completed_at,
|
||||
connection_state,
|
||||
self.threads.get(¬ification.thread_id),
|
||||
) {
|
||||
out.push(event);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn now_unix_secs() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|duration| duration.as_secs())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn tool_item_id(item: &ThreadItem) -> Option<&str> {
|
||||
match item {
|
||||
ThreadItem::CommandExecution { id, .. }
|
||||
| ThreadItem::FileChange { id, .. }
|
||||
| ThreadItem::McpToolCall { id, .. }
|
||||
| ThreadItem::DynamicToolCall { id, .. }
|
||||
| ThreadItem::CollabAgentToolCall { id, .. }
|
||||
| ThreadItem::WebSearch { id, .. }
|
||||
| ThreadItem::ImageGeneration { id, .. } => Some(id),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn tool_item_event(
|
||||
thread_id: &str,
|
||||
turn_id: &str,
|
||||
item: &ThreadItem,
|
||||
started_at: u64,
|
||||
completed_at: u64,
|
||||
connection_state: &ConnectionState,
|
||||
thread_metadata: Option<&ThreadMetadataState>,
|
||||
) -> Option<TrackEventRequest> {
|
||||
match item {
|
||||
ThreadItem::CommandExecution {
|
||||
id,
|
||||
process_id,
|
||||
source,
|
||||
status,
|
||||
command_actions,
|
||||
exit_code,
|
||||
duration_ms,
|
||||
..
|
||||
} => {
|
||||
let (terminal_status, failure_kind) = command_execution_outcome(status)?;
|
||||
let base = tool_item_base(
|
||||
thread_id,
|
||||
turn_id,
|
||||
id.clone(),
|
||||
command_execution_tool_name(*source).to_string(),
|
||||
ToolItemOutcome {
|
||||
terminal_status,
|
||||
failure_kind,
|
||||
duration_ms: completed_duration_ms(
|
||||
option_i64_to_u64(*duration_ms),
|
||||
started_at,
|
||||
completed_at,
|
||||
),
|
||||
},
|
||||
ToolItemContext {
|
||||
started_at,
|
||||
completed_at,
|
||||
connection_state,
|
||||
thread_metadata,
|
||||
},
|
||||
);
|
||||
Some(TrackEventRequest::CommandExecution(
|
||||
CodexCommandExecutionEventRequest {
|
||||
event_type: "codex_command_execution_event",
|
||||
event_params: CodexCommandExecutionEventParams {
|
||||
base,
|
||||
command_execution_source: command_execution_source_kind(*source),
|
||||
command_execution_family: command_execution_family(*source),
|
||||
process_id: process_id.clone(),
|
||||
exit_code: *exit_code,
|
||||
command_action_count: usize_to_u64(command_actions.len()),
|
||||
},
|
||||
},
|
||||
))
|
||||
}
|
||||
ThreadItem::FileChange {
|
||||
id,
|
||||
changes,
|
||||
status,
|
||||
} => {
|
||||
let (terminal_status, failure_kind) = patch_apply_outcome(status)?;
|
||||
let counts = file_change_counts(changes);
|
||||
let base = tool_item_base(
|
||||
thread_id,
|
||||
turn_id,
|
||||
id.clone(),
|
||||
"apply_patch".to_string(),
|
||||
ToolItemOutcome {
|
||||
terminal_status,
|
||||
failure_kind,
|
||||
duration_ms: completed_duration_ms(None, started_at, completed_at),
|
||||
},
|
||||
ToolItemContext {
|
||||
started_at,
|
||||
completed_at,
|
||||
connection_state,
|
||||
thread_metadata,
|
||||
},
|
||||
);
|
||||
Some(TrackEventRequest::FileChange(CodexFileChangeEventRequest {
|
||||
event_type: "codex_file_change_event",
|
||||
event_params: CodexFileChangeEventParams {
|
||||
base,
|
||||
file_change_count: usize_to_u64(changes.len()),
|
||||
file_add_count: counts.add,
|
||||
file_update_count: counts.update,
|
||||
file_delete_count: counts.delete,
|
||||
file_move_count: counts.move_,
|
||||
},
|
||||
}))
|
||||
}
|
||||
ThreadItem::McpToolCall {
|
||||
id,
|
||||
server,
|
||||
tool,
|
||||
status,
|
||||
error,
|
||||
duration_ms,
|
||||
..
|
||||
} => {
|
||||
let (terminal_status, failure_kind) = mcp_tool_call_outcome(status)?;
|
||||
let base = tool_item_base(
|
||||
thread_id,
|
||||
turn_id,
|
||||
id.clone(),
|
||||
tool.clone(),
|
||||
ToolItemOutcome {
|
||||
terminal_status,
|
||||
failure_kind,
|
||||
duration_ms: completed_duration_ms(
|
||||
option_i64_to_u64(*duration_ms),
|
||||
started_at,
|
||||
completed_at,
|
||||
),
|
||||
},
|
||||
ToolItemContext {
|
||||
started_at,
|
||||
completed_at,
|
||||
connection_state,
|
||||
thread_metadata,
|
||||
},
|
||||
);
|
||||
Some(TrackEventRequest::McpToolCall(
|
||||
CodexMcpToolCallEventRequest {
|
||||
event_type: "codex_mcp_tool_call_event",
|
||||
event_params: CodexMcpToolCallEventParams {
|
||||
base,
|
||||
mcp_server_name: server.clone(),
|
||||
mcp_tool_name: tool.clone(),
|
||||
mcp_error_present: error.is_some(),
|
||||
},
|
||||
},
|
||||
))
|
||||
}
|
||||
ThreadItem::DynamicToolCall {
|
||||
id,
|
||||
tool,
|
||||
status,
|
||||
content_items,
|
||||
success,
|
||||
duration_ms,
|
||||
..
|
||||
} => {
|
||||
let (terminal_status, failure_kind) = dynamic_tool_call_outcome(status)?;
|
||||
let counts = content_items
|
||||
.as_ref()
|
||||
.map(|items| dynamic_content_counts(items));
|
||||
let base = tool_item_base(
|
||||
thread_id,
|
||||
turn_id,
|
||||
id.clone(),
|
||||
tool.clone(),
|
||||
ToolItemOutcome {
|
||||
terminal_status,
|
||||
failure_kind,
|
||||
duration_ms: completed_duration_ms(
|
||||
option_i64_to_u64(*duration_ms),
|
||||
started_at,
|
||||
completed_at,
|
||||
),
|
||||
},
|
||||
ToolItemContext {
|
||||
started_at,
|
||||
completed_at,
|
||||
connection_state,
|
||||
thread_metadata,
|
||||
},
|
||||
);
|
||||
Some(TrackEventRequest::DynamicToolCall(
|
||||
CodexDynamicToolCallEventRequest {
|
||||
event_type: "codex_dynamic_tool_call_event",
|
||||
event_params: CodexDynamicToolCallEventParams {
|
||||
base,
|
||||
dynamic_tool_name: tool.clone(),
|
||||
success: *success,
|
||||
output_content_item_count: counts.map(|counts| counts.total),
|
||||
output_text_item_count: counts.map(|counts| counts.text),
|
||||
output_image_item_count: counts.map(|counts| counts.image),
|
||||
},
|
||||
},
|
||||
))
|
||||
}
|
||||
ThreadItem::CollabAgentToolCall {
|
||||
id,
|
||||
tool,
|
||||
status,
|
||||
sender_thread_id,
|
||||
receiver_thread_ids,
|
||||
model,
|
||||
reasoning_effort,
|
||||
agents_states,
|
||||
..
|
||||
} => {
|
||||
let (terminal_status, failure_kind) = collab_tool_call_outcome(status)?;
|
||||
let base = tool_item_base(
|
||||
thread_id,
|
||||
turn_id,
|
||||
id.clone(),
|
||||
collab_agent_tool_name(tool).to_string(),
|
||||
ToolItemOutcome {
|
||||
terminal_status,
|
||||
failure_kind,
|
||||
duration_ms: completed_duration_ms(None, started_at, completed_at),
|
||||
},
|
||||
ToolItemContext {
|
||||
started_at,
|
||||
completed_at,
|
||||
connection_state,
|
||||
thread_metadata,
|
||||
},
|
||||
);
|
||||
Some(TrackEventRequest::CollabAgentToolCall(
|
||||
CodexCollabAgentToolCallEventRequest {
|
||||
event_type: "codex_collab_agent_tool_call_event",
|
||||
event_params: CodexCollabAgentToolCallEventParams {
|
||||
base,
|
||||
collab_agent_tool: collab_agent_tool_kind(tool),
|
||||
sender_thread_id: sender_thread_id.clone(),
|
||||
receiver_thread_count: usize_to_u64(receiver_thread_ids.len()),
|
||||
receiver_thread_ids: receiver_thread_ids.clone(),
|
||||
requested_model: model.clone(),
|
||||
requested_reasoning_effort: reasoning_effort
|
||||
.as_ref()
|
||||
.and_then(serialize_enum_as_string),
|
||||
agent_state_count: usize_to_u64(agents_states.len()),
|
||||
completed_agent_count: usize_to_u64(
|
||||
agents_states
|
||||
.values()
|
||||
.filter(|state| state.status == CollabAgentStatus::Completed)
|
||||
.count(),
|
||||
),
|
||||
failed_agent_count: usize_to_u64(
|
||||
agents_states
|
||||
.values()
|
||||
.filter(|state| {
|
||||
matches!(
|
||||
state.status,
|
||||
CollabAgentStatus::Errored
|
||||
| CollabAgentStatus::Shutdown
|
||||
| CollabAgentStatus::NotFound
|
||||
)
|
||||
})
|
||||
.count(),
|
||||
),
|
||||
},
|
||||
},
|
||||
))
|
||||
}
|
||||
ThreadItem::WebSearch { id, query, action } => {
|
||||
let base = tool_item_base(
|
||||
thread_id,
|
||||
turn_id,
|
||||
id.clone(),
|
||||
"web_search".to_string(),
|
||||
ToolItemOutcome {
|
||||
terminal_status: ToolItemTerminalStatus::Completed,
|
||||
failure_kind: None,
|
||||
duration_ms: completed_duration_ms(None, started_at, completed_at),
|
||||
},
|
||||
ToolItemContext {
|
||||
started_at,
|
||||
completed_at,
|
||||
connection_state,
|
||||
thread_metadata,
|
||||
},
|
||||
);
|
||||
Some(TrackEventRequest::WebSearch(CodexWebSearchEventRequest {
|
||||
event_type: "codex_web_search_event",
|
||||
event_params: CodexWebSearchEventParams {
|
||||
base,
|
||||
web_search_action: action.as_ref().map(web_search_action_kind),
|
||||
query_present: !query.trim().is_empty(),
|
||||
query_count: web_search_query_count(query, action.as_ref()),
|
||||
},
|
||||
}))
|
||||
}
|
||||
ThreadItem::ImageGeneration {
|
||||
id,
|
||||
status,
|
||||
revised_prompt,
|
||||
saved_path,
|
||||
..
|
||||
} => {
|
||||
let (terminal_status, failure_kind) = image_generation_outcome(status.as_str());
|
||||
let base = tool_item_base(
|
||||
thread_id,
|
||||
turn_id,
|
||||
id.clone(),
|
||||
"image_generation".to_string(),
|
||||
ToolItemOutcome {
|
||||
terminal_status,
|
||||
failure_kind,
|
||||
duration_ms: completed_duration_ms(None, started_at, completed_at),
|
||||
},
|
||||
ToolItemContext {
|
||||
started_at,
|
||||
completed_at,
|
||||
connection_state,
|
||||
thread_metadata,
|
||||
},
|
||||
);
|
||||
Some(TrackEventRequest::ImageGeneration(
|
||||
CodexImageGenerationEventRequest {
|
||||
event_type: "codex_image_generation_event",
|
||||
event_params: CodexImageGenerationEventParams {
|
||||
base,
|
||||
image_generation_status: status.clone(),
|
||||
revised_prompt_present: revised_prompt.is_some(),
|
||||
saved_path_present: saved_path.is_some(),
|
||||
},
|
||||
},
|
||||
))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
struct ToolItemOutcome {
|
||||
terminal_status: ToolItemTerminalStatus,
|
||||
failure_kind: Option<ToolItemFailureKind>,
|
||||
duration_ms: Option<u64>,
|
||||
}
|
||||
|
||||
struct ToolItemContext<'a> {
|
||||
started_at: u64,
|
||||
completed_at: u64,
|
||||
connection_state: &'a ConnectionState,
|
||||
thread_metadata: Option<&'a ThreadMetadataState>,
|
||||
}
|
||||
|
||||
fn tool_item_base(
|
||||
thread_id: &str,
|
||||
turn_id: &str,
|
||||
item_id: String,
|
||||
tool_name: String,
|
||||
outcome: ToolItemOutcome,
|
||||
context: ToolItemContext<'_>,
|
||||
) -> CodexToolItemEventBase {
|
||||
let thread_metadata = context.thread_metadata.cloned().unwrap_or_default();
|
||||
CodexToolItemEventBase {
|
||||
thread_id: thread_id.to_string(),
|
||||
turn_id: turn_id.to_string(),
|
||||
item_id,
|
||||
app_server_client: context.connection_state.app_server_client.clone(),
|
||||
runtime: context.connection_state.runtime.clone(),
|
||||
thread_source: thread_metadata.thread_source,
|
||||
subagent_source: thread_metadata.subagent_source,
|
||||
parent_thread_id: thread_metadata.parent_thread_id,
|
||||
tool_name,
|
||||
started_at: context.started_at,
|
||||
completed_at: Some(context.completed_at),
|
||||
duration_ms: outcome.duration_ms,
|
||||
execution_started: true,
|
||||
review_count: 0,
|
||||
guardian_review_count: 0,
|
||||
user_review_count: 0,
|
||||
final_approval_outcome: ToolItemFinalApprovalOutcome::NotNeeded,
|
||||
terminal_status: outcome.terminal_status,
|
||||
failure_kind: outcome.failure_kind,
|
||||
requested_additional_permissions: false,
|
||||
requested_network_access: false,
|
||||
retry_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn completed_duration_ms(
|
||||
item_duration_ms: Option<u64>,
|
||||
started_at: u64,
|
||||
completed_at: u64,
|
||||
) -> Option<u64> {
|
||||
item_duration_ms.or_else(|| {
|
||||
completed_at
|
||||
.checked_sub(started_at)
|
||||
.map(|duration_secs| duration_secs.saturating_mul(1000))
|
||||
})
|
||||
}
|
||||
|
||||
fn command_execution_source_kind(source: CommandExecutionSource) -> CommandExecutionSourceKind {
|
||||
match source {
|
||||
CommandExecutionSource::Agent => CommandExecutionSourceKind::Agent,
|
||||
CommandExecutionSource::UserShell => CommandExecutionSourceKind::UserShell,
|
||||
CommandExecutionSource::UnifiedExecStartup => {
|
||||
CommandExecutionSourceKind::UnifiedExecStartup
|
||||
}
|
||||
CommandExecutionSource::UnifiedExecInteraction => {
|
||||
CommandExecutionSourceKind::UnifiedExecInteraction
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn command_execution_family(source: CommandExecutionSource) -> CommandExecutionFamily {
|
||||
match source {
|
||||
CommandExecutionSource::Agent => CommandExecutionFamily::Shell,
|
||||
CommandExecutionSource::UserShell => CommandExecutionFamily::UserShell,
|
||||
CommandExecutionSource::UnifiedExecStartup
|
||||
| CommandExecutionSource::UnifiedExecInteraction => CommandExecutionFamily::UnifiedExec,
|
||||
}
|
||||
}
|
||||
|
||||
fn command_execution_tool_name(source: CommandExecutionSource) -> &'static str {
|
||||
match source {
|
||||
CommandExecutionSource::UnifiedExecStartup
|
||||
| CommandExecutionSource::UnifiedExecInteraction => "unified_exec",
|
||||
CommandExecutionSource::UserShell => "user_shell",
|
||||
CommandExecutionSource::Agent => "shell",
|
||||
}
|
||||
}
|
||||
|
||||
fn command_execution_outcome(
|
||||
status: &CommandExecutionStatus,
|
||||
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
|
||||
match status {
|
||||
CommandExecutionStatus::InProgress => None,
|
||||
CommandExecutionStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
|
||||
CommandExecutionStatus::Failed => Some((
|
||||
ToolItemTerminalStatus::Failed,
|
||||
Some(ToolItemFailureKind::ToolError),
|
||||
)),
|
||||
CommandExecutionStatus::Declined => Some((
|
||||
ToolItemTerminalStatus::Rejected,
|
||||
Some(ToolItemFailureKind::ApprovalDenied),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn patch_apply_outcome(
|
||||
status: &PatchApplyStatus,
|
||||
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
|
||||
match status {
|
||||
PatchApplyStatus::InProgress => None,
|
||||
PatchApplyStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
|
||||
PatchApplyStatus::Failed => Some((
|
||||
ToolItemTerminalStatus::Failed,
|
||||
Some(ToolItemFailureKind::ToolError),
|
||||
)),
|
||||
PatchApplyStatus::Declined => Some((
|
||||
ToolItemTerminalStatus::Rejected,
|
||||
Some(ToolItemFailureKind::ApprovalDenied),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn mcp_tool_call_outcome(
|
||||
status: &McpToolCallStatus,
|
||||
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
|
||||
match status {
|
||||
McpToolCallStatus::InProgress => None,
|
||||
McpToolCallStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
|
||||
McpToolCallStatus::Failed => Some((
|
||||
ToolItemTerminalStatus::Failed,
|
||||
Some(ToolItemFailureKind::ToolError),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn dynamic_tool_call_outcome(
|
||||
status: &DynamicToolCallStatus,
|
||||
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
|
||||
match status {
|
||||
DynamicToolCallStatus::InProgress => None,
|
||||
DynamicToolCallStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
|
||||
DynamicToolCallStatus::Failed => Some((
|
||||
ToolItemTerminalStatus::Failed,
|
||||
Some(ToolItemFailureKind::ToolError),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn collab_tool_call_outcome(
|
||||
status: &CollabAgentToolCallStatus,
|
||||
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
|
||||
match status {
|
||||
CollabAgentToolCallStatus::InProgress => None,
|
||||
CollabAgentToolCallStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
|
||||
CollabAgentToolCallStatus::Failed => Some((
|
||||
ToolItemTerminalStatus::Failed,
|
||||
Some(ToolItemFailureKind::ToolError),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn image_generation_outcome(status: &str) -> (ToolItemTerminalStatus, Option<ToolItemFailureKind>) {
|
||||
match status {
|
||||
"failed" | "error" => (
|
||||
ToolItemTerminalStatus::Failed,
|
||||
Some(ToolItemFailureKind::ToolError),
|
||||
),
|
||||
_ => (ToolItemTerminalStatus::Completed, None),
|
||||
}
|
||||
}
|
||||
|
||||
fn collab_agent_tool_name(tool: &CollabAgentTool) -> &'static str {
|
||||
match tool {
|
||||
CollabAgentTool::SpawnAgent => "spawn_agent",
|
||||
CollabAgentTool::SendInput => "send_input",
|
||||
CollabAgentTool::ResumeAgent => "resume_agent",
|
||||
CollabAgentTool::Wait => "wait_agent",
|
||||
CollabAgentTool::CloseAgent => "close_agent",
|
||||
}
|
||||
}
|
||||
|
||||
fn collab_agent_tool_kind(tool: &CollabAgentTool) -> CollabAgentToolKind {
|
||||
match tool {
|
||||
CollabAgentTool::SpawnAgent => CollabAgentToolKind::SpawnAgent,
|
||||
CollabAgentTool::SendInput => CollabAgentToolKind::SendInput,
|
||||
CollabAgentTool::ResumeAgent => CollabAgentToolKind::ResumeAgent,
|
||||
CollabAgentTool::Wait => CollabAgentToolKind::Wait,
|
||||
CollabAgentTool::CloseAgent => CollabAgentToolKind::CloseAgent,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct FileChangeCounts {
|
||||
add: u64,
|
||||
update: u64,
|
||||
delete: u64,
|
||||
move_: u64,
|
||||
}
|
||||
|
||||
fn file_change_counts(changes: &[codex_app_server_protocol::FileUpdateChange]) -> FileChangeCounts {
|
||||
let mut counts = FileChangeCounts::default();
|
||||
for change in changes {
|
||||
match &change.kind {
|
||||
PatchChangeKind::Add => counts.add += 1,
|
||||
PatchChangeKind::Delete => counts.delete += 1,
|
||||
PatchChangeKind::Update { move_path: Some(_) } => counts.move_ += 1,
|
||||
PatchChangeKind::Update { move_path: None } => counts.update += 1,
|
||||
}
|
||||
}
|
||||
counts
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct DynamicContentCounts {
|
||||
total: u64,
|
||||
text: u64,
|
||||
image: u64,
|
||||
}
|
||||
|
||||
fn dynamic_content_counts(items: &[DynamicToolCallOutputContentItem]) -> DynamicContentCounts {
|
||||
let mut text = 0;
|
||||
let mut image = 0;
|
||||
for item in items {
|
||||
match item {
|
||||
DynamicToolCallOutputContentItem::InputText { .. } => text += 1,
|
||||
DynamicToolCallOutputContentItem::InputImage { .. } => image += 1,
|
||||
}
|
||||
}
|
||||
DynamicContentCounts {
|
||||
total: usize_to_u64(items.len()),
|
||||
text,
|
||||
image,
|
||||
}
|
||||
}
|
||||
|
||||
fn web_search_action_kind(action: &WebSearchAction) -> WebSearchActionKind {
|
||||
match action {
|
||||
WebSearchAction::Search { .. } => WebSearchActionKind::Search,
|
||||
WebSearchAction::OpenPage { .. } => WebSearchActionKind::OpenPage,
|
||||
WebSearchAction::FindInPage { .. } => WebSearchActionKind::FindInPage,
|
||||
WebSearchAction::Other => WebSearchActionKind::Other,
|
||||
}
|
||||
}
|
||||
|
||||
fn web_search_query_count(query: &str, action: Option<&WebSearchAction>) -> Option<u64> {
|
||||
match action {
|
||||
Some(WebSearchAction::Search { query, queries }) => queries
|
||||
.as_ref()
|
||||
.map(|queries| usize_to_u64(queries.len()))
|
||||
.or_else(|| query.as_ref().map(|_| 1)),
|
||||
Some(WebSearchAction::OpenPage { .. })
|
||||
| Some(WebSearchAction::FindInPage { .. })
|
||||
| Some(WebSearchAction::Other) => None,
|
||||
None => (!query.trim().is_empty()).then_some(1),
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_enum_as_string<T: serde::Serialize>(value: &T) -> Option<String> {
|
||||
serde_json::to_value(value)
|
||||
.ok()
|
||||
.and_then(|value| value.as_str().map(str::to_string))
|
||||
}
|
||||
|
||||
fn usize_to_u64(value: usize) -> u64 {
|
||||
u64::try_from(value).unwrap_or(u64::MAX)
|
||||
}
|
||||
|
||||
fn option_i64_to_u64(value: Option<i64>) -> Option<u64> {
|
||||
value.and_then(|value| u64::try_from(value).ok())
|
||||
}
|
||||
|
||||
pub(crate) fn skill_id_for_local_skill(
|
||||
|
||||
16
codex-rs/app-server/src/analytics_events.rs
Normal file
16
codex-rs/app-server/src/analytics_events.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_core::config::Config;
|
||||
use codex_login::AuthManager;
|
||||
|
||||
pub(crate) fn analytics_events_client_from_config(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
config: &Config,
|
||||
) -> AnalyticsEventsClient {
|
||||
AnalyticsEventsClient::new(
|
||||
auth_manager,
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
)
|
||||
}
|
||||
@@ -3169,7 +3169,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3238,7 +3238,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3330,7 +3330,7 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
let thread_watch_manager = ThreadWatchManager::new();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3725,7 +3725,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let event_turn_id = "complete1".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3788,7 +3788,7 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3835,7 +3835,7 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3876,7 +3876,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3930,7 +3930,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-123".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4018,7 +4018,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-456".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4089,7 +4089,7 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4342,7 +4342,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4380,7 +4380,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4406,7 +4406,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let conversation_id = ThreadId::new();
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
|
||||
@@ -9661,7 +9661,7 @@ mod tests {
|
||||
let connection_id = ConnectionId(7);
|
||||
|
||||
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(outgoing_tx));
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
vec![connection_id],
|
||||
|
||||
@@ -757,7 +757,7 @@ mod tests {
|
||||
let manager = CommandExecManager::default();
|
||||
let err = manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)),
|
||||
request_id: ConnectionRequestId {
|
||||
connection_id: ConnectionId(1),
|
||||
request_id: codex_app_server_protocol::RequestId::Integer(42),
|
||||
@@ -793,7 +793,7 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-99".to_string()),
|
||||
exec_request: windows_sandbox_exec_request(),
|
||||
@@ -843,7 +843,7 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-100".to_string()),
|
||||
exec_request: ExecRequest::new(
|
||||
|
||||
@@ -235,7 +235,7 @@ mod tests {
|
||||
const OUTGOING_BUFFER: usize = 1;
|
||||
let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER);
|
||||
FsWatchManager::new_with_file_watcher(
|
||||
Arc::new(OutgoingMessageSender::new(tx)),
|
||||
Arc::new(OutgoingMessageSender::new_for_tests(tx)),
|
||||
Arc::new(FileWatcher::noop()),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::analytics_events::analytics_events_client_from_config;
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::error_code::OVERLOADED_ERROR_CODE;
|
||||
@@ -357,7 +358,14 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
|
||||
let runtime_handle = tokio::spawn(async move {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
|
||||
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
||||
@@ -382,12 +390,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
});
|
||||
|
||||
let processor_outgoing = Arc::clone(&outgoing_message_sender);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
|
||||
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
|
||||
let mut processor_handle = tokio::spawn(async move {
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: Arc::clone(&processor_outgoing),
|
||||
analytics_events_client,
|
||||
arg0_paths: args.arg0_paths,
|
||||
config: args.config,
|
||||
environment_manager: args.environment_manager,
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use crate::analytics_events::analytics_events_client_from_config;
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::message_processor::MessageProcessorArgs;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
@@ -63,6 +64,7 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::registry::Registry;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
mod analytics_events;
|
||||
mod app_server_tracing;
|
||||
mod bespoke_event_handling;
|
||||
mod codex_message_processor;
|
||||
@@ -640,14 +642,20 @@ pub async fn run_main_with_transport(
|
||||
});
|
||||
|
||||
let processor_handle = tokio::spawn({
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outbound_control_tx = outbound_control_tx;
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
|
||||
let loader_overrides = loader_overrides_for_config_api;
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config: Arc::new(config),
|
||||
environment_manager,
|
||||
|
||||
@@ -185,6 +185,7 @@ pub(crate) struct ConnectionSessionState {
|
||||
|
||||
pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) outgoing: Arc<OutgoingMessageSender>,
|
||||
pub(crate) analytics_events_client: AnalyticsEventsClient,
|
||||
pub(crate) arg0_paths: Arg0DispatchPaths,
|
||||
pub(crate) config: Arc<Config>,
|
||||
pub(crate) environment_manager: Arc<EnvironmentManager>,
|
||||
@@ -206,6 +207,7 @@ impl MessageProcessor {
|
||||
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
|
||||
let MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config,
|
||||
environment_manager,
|
||||
@@ -234,11 +236,6 @@ impl MessageProcessor {
|
||||
},
|
||||
environment_manager,
|
||||
));
|
||||
let analytics_events_client = AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
);
|
||||
thread_manager
|
||||
.plugins_manager()
|
||||
.set_analytics_events_client(analytics_events_client.clone());
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::ConnectionSessionState;
|
||||
use super::MessageProcessor;
|
||||
use super::MessageProcessorArgs;
|
||||
use crate::analytics_events::analytics_events_client_from_config;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::transport::AppServerTransport;
|
||||
@@ -234,11 +235,14 @@ fn build_test_processor(
|
||||
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
|
||||
) {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(outgoing_tx));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref());
|
||||
let processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config,
|
||||
environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),
|
||||
|
||||
@@ -4,12 +4,14 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_otel::span_w3c_trace_context;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
@@ -117,6 +119,7 @@ pub(crate) struct OutgoingMessageSender {
|
||||
/// We keep them here because this is where responses, errors, and
|
||||
/// disconnect cleanup all get handled.
|
||||
request_contexts: Mutex<HashMap<ConnectionRequestId, RequestContext>>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -203,15 +206,24 @@ impl ThreadScopedOutgoingMessageSender {
|
||||
}
|
||||
|
||||
impl OutgoingMessageSender {
|
||||
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
|
||||
pub(crate) fn new(
|
||||
sender: mpsc::Sender<OutgoingEnvelope>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
) -> Self {
|
||||
Self {
|
||||
next_server_request_id: AtomicI64::new(0),
|
||||
sender,
|
||||
request_id_to_callback: Mutex::new(HashMap::new()),
|
||||
request_contexts: Mutex::new(HashMap::new()),
|
||||
analytics_events_client,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn new_for_tests(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
|
||||
Self::new(sender, AnalyticsEventsClient::disabled())
|
||||
}
|
||||
|
||||
pub(crate) async fn register_request_context(&self, request_context: RequestContext) {
|
||||
let mut request_contexts = self.request_contexts.lock().await;
|
||||
if request_contexts
|
||||
@@ -298,7 +310,7 @@ impl OutgoingMessageSender {
|
||||
);
|
||||
}
|
||||
|
||||
let outgoing_message = OutgoingMessage::Request(request);
|
||||
let outgoing_message = OutgoingMessage::Request(request.clone());
|
||||
let send_result = match connection_ids {
|
||||
None => {
|
||||
self.sender
|
||||
@@ -321,6 +333,9 @@ impl OutgoingMessageSender {
|
||||
{
|
||||
send_error = Some(err);
|
||||
break;
|
||||
} else {
|
||||
self.analytics_events_client
|
||||
.track_server_request(connection_id.0, request.clone());
|
||||
}
|
||||
}
|
||||
match send_error {
|
||||
@@ -364,6 +379,10 @@ impl OutgoingMessageSender {
|
||||
|
||||
match entry {
|
||||
Some((id, entry)) => {
|
||||
if let Some(response) = server_response_from_result(&entry.request, result.clone())
|
||||
{
|
||||
self.analytics_events_client.track_server_response(response);
|
||||
}
|
||||
if let Err(err) = entry.callback.send(Ok(result)) {
|
||||
warn!("could not notify callback for {id:?} due to: {err:?}");
|
||||
}
|
||||
@@ -518,7 +537,7 @@ impl OutgoingMessageSender {
|
||||
targeted_connections = connection_ids.len(),
|
||||
"app-server event: {notification}"
|
||||
);
|
||||
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
|
||||
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
|
||||
if connection_ids.is_empty() {
|
||||
if let Err(err) = self
|
||||
.sender
|
||||
@@ -542,6 +561,9 @@ impl OutgoingMessageSender {
|
||||
.await
|
||||
{
|
||||
warn!("failed to send server notification to client: {err:?}");
|
||||
} else {
|
||||
self.analytics_events_client
|
||||
.track_notification(connection_id.0, notification.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -552,7 +574,7 @@ impl OutgoingMessageSender {
|
||||
notification: ServerNotification,
|
||||
) {
|
||||
tracing::trace!("app-server event: {notification}");
|
||||
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
|
||||
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
|
||||
let (write_complete_tx, write_complete_rx) = oneshot::channel();
|
||||
if let Err(err) = self
|
||||
.sender
|
||||
@@ -564,6 +586,9 @@ impl OutgoingMessageSender {
|
||||
.await
|
||||
{
|
||||
warn!("failed to send server notification to client: {err:?}");
|
||||
} else {
|
||||
self.analytics_events_client
|
||||
.track_notification(connection_id.0, notification);
|
||||
}
|
||||
let _ = write_complete_rx.await;
|
||||
}
|
||||
@@ -621,6 +646,14 @@ impl OutgoingMessageSender {
|
||||
}
|
||||
}
|
||||
|
||||
fn server_response_from_result(request: &ServerRequest, result: Result) -> Option<ServerResponse> {
|
||||
let mut value = serde_json::to_value(request).ok()?;
|
||||
let object = value.as_object_mut()?;
|
||||
object.remove("params");
|
||||
object.insert("response".to_string(), result);
|
||||
serde_json::from_value(value).ok()
|
||||
}
|
||||
|
||||
/// Outgoing message from the server to the client.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(untagged)]
|
||||
@@ -654,6 +687,8 @@ mod tests {
|
||||
use codex_app_server_protocol::AccountUpdatedNotification;
|
||||
use codex_app_server_protocol::ApplyPatchApprovalParams;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::DynamicToolCallParams;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalParams;
|
||||
@@ -838,10 +873,53 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_response_from_result_decodes_typed_response_with_original_method() {
|
||||
let request = ServerRequest::CommandExecutionRequestApproval {
|
||||
request_id: RequestId::Integer(7),
|
||||
params: CommandExecutionRequestApprovalParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "item-1".to_string(),
|
||||
approval_id: None,
|
||||
reason: None,
|
||||
network_approval_context: None,
|
||||
command: Some("echo hi".to_string()),
|
||||
cwd: None,
|
||||
command_actions: None,
|
||||
additional_permissions: None,
|
||||
proposed_execpolicy_amendment: None,
|
||||
proposed_network_policy_amendments: None,
|
||||
available_decisions: None,
|
||||
},
|
||||
};
|
||||
|
||||
let response = server_response_from_result(
|
||||
&request,
|
||||
json!({
|
||||
"decision": "acceptForSession",
|
||||
}),
|
||||
)
|
||||
.expect("decode typed server response");
|
||||
|
||||
let ServerResponse::CommandExecutionRequestApproval {
|
||||
request_id,
|
||||
response,
|
||||
} = response
|
||||
else {
|
||||
panic!("expected command execution approval response");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(7));
|
||||
assert_eq!(
|
||||
response.decision,
|
||||
CommandExecutionApprovalDecision::AcceptForSession
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_response_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
@@ -876,7 +954,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_response_clears_registered_request_context() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
@@ -901,7 +979,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_error_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -939,7 +1017,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_server_notification_to_connection_and_wait_tracks_write_completion() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let send_task = tokio::spawn(async move {
|
||||
outgoing
|
||||
.send_server_notification_to_connection_and_wait(
|
||||
@@ -983,7 +1061,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn connection_closed_clears_registered_request_contexts() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let closed_connection_request = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -1017,7 +1095,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn notify_client_error_forwards_error_to_waiter() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
|
||||
let (request_id, wait_for_result) = outgoing
|
||||
.send_request(ServerRequestPayload::ApplyPatchApproval(
|
||||
@@ -1051,7 +1129,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
@@ -1108,7 +1186,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn cancel_requests_for_thread_cancels_all_thread_requests() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
|
||||
@@ -674,9 +674,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn status_change_emits_notification() {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
)));
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(
|
||||
OutgoingMessageSender::new_for_tests(outgoing_tx),
|
||||
));
|
||||
|
||||
manager
|
||||
.upsert_thread(test_thread(
|
||||
@@ -716,9 +716,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn silent_upsert_skips_initial_notification() {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
)));
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(
|
||||
OutgoingMessageSender::new_for_tests(outgoing_tx),
|
||||
));
|
||||
|
||||
manager
|
||||
.upsert_thread_silently(test_thread(
|
||||
|
||||
Reference in New Issue
Block a user