Compare commits

...

2 Commits

Author SHA1 Message Date
rhan-oai
6c8a79680d [codex-analytics] add tool item event schemas 2026-04-14 18:03:29 -07:00
rhan-oai
845d3dc3b5 [codex-analytics] ingest server requests and responses 2026-04-14 18:03:29 -07:00
16 changed files with 599 additions and 75 deletions

View File

@@ -3,13 +3,20 @@ use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCommandExecutionEventParams;
use crate::events::CodexCommandExecutionEventRequest;
use crate::events::CodexCompactionEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::CodexToolItemEventBase;
use crate::events::CodexTurnEventRequest;
use crate::events::CommandExecutionFamily;
use crate::events::CommandExecutionSourceKind;
use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::ToolItemFinalApprovalOutcome;
use crate::events::ToolItemTerminalStatus;
use crate::events::TrackEventRequest;
use crate::events::codex_app_metadata;
use crate::events::codex_plugin_metadata;
@@ -401,7 +408,7 @@ async fn ingest_rejected_turn_steer(
.await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
@@ -461,7 +468,7 @@ async fn ingest_turn_prerequisites(
ingest_initialize(reducer, out).await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
@@ -475,7 +482,7 @@ async fn ingest_turn_prerequisites(
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
@@ -485,7 +492,7 @@ async fn ingest_turn_prerequisites(
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
@@ -830,6 +837,152 @@ fn thread_initialized_event_serializes_expected_shape() {
);
}
#[test]
fn command_execution_event_serializes_expected_shape() {
let event = TrackEventRequest::CommandExecution(CodexCommandExecutionEventRequest {
event_type: "codex_command_execution_event",
event_params: CodexCommandExecutionEventParams {
base: CodexToolItemEventBase {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
app_server_client: CodexAppServerClientMetadata {
product_client_id: "codex_tui".to_string(),
client_name: Some("codex-tui".to_string()),
client_version: Some("1.2.3".to_string()),
rpc_transport: AppServerRpcTransport::Websocket,
experimental_api_enabled: Some(true),
},
runtime: CodexRuntimeMetadata {
codex_rs_version: "0.99.0".to_string(),
runtime_os: "macos".to_string(),
runtime_os_version: "15.3.1".to_string(),
runtime_arch: "aarch64".to_string(),
},
thread_source: Some("user"),
subagent_source: None,
parent_thread_id: None,
tool_name: "shell".to_string(),
started_at: 123,
completed_at: Some(125),
duration_ms: Some(2000),
execution_started: true,
review_count: 0,
guardian_review_count: 0,
user_review_count: 0,
final_approval_outcome: ToolItemFinalApprovalOutcome::NotNeeded,
terminal_status: ToolItemTerminalStatus::Completed,
failure_kind: None,
requested_additional_permissions: false,
requested_network_access: false,
retry_count: 0,
},
command_execution_source: CommandExecutionSourceKind::Agent,
command_execution_family: CommandExecutionFamily::Shell,
exit_code: Some(0),
command_action_count: Some(1),
},
});
let payload = serde_json::to_value(&event).expect("serialize command execution event");
assert_eq!(
payload,
json!({
"event_type": "codex_command_execution_event",
"event_params": {
"thread_id": "thread-1",
"turn_id": "turn-1",
"item_id": "item-1",
"app_server_client": {
"product_client_id": "codex_tui",
"client_name": "codex-tui",
"client_version": "1.2.3",
"rpc_transport": "websocket",
"experimental_api_enabled": true
},
"runtime": {
"codex_rs_version": "0.99.0",
"runtime_os": "macos",
"runtime_os_version": "15.3.1",
"runtime_arch": "aarch64"
},
"thread_source": "user",
"subagent_source": null,
"parent_thread_id": null,
"tool_name": "shell",
"started_at": 123,
"completed_at": 125,
"duration_ms": 2000,
"execution_started": true,
"review_count": 0,
"guardian_review_count": 0,
"user_review_count": 0,
"final_approval_outcome": "not_needed",
"terminal_status": "completed",
"failure_kind": null,
"requested_additional_permissions": false,
"requested_network_access": false,
"retry_count": 0,
"command_execution_source": "agent",
"command_execution_family": "shell",
"exit_code": 0,
"command_action_count": 1
}
})
);
}
#[test]
fn command_execution_event_allows_null_thread_denormalization() {
let event = TrackEventRequest::CommandExecution(CodexCommandExecutionEventRequest {
event_type: "codex_command_execution_event",
event_params: CodexCommandExecutionEventParams {
base: CodexToolItemEventBase {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
app_server_client: CodexAppServerClientMetadata {
product_client_id: "codex_tui".to_string(),
client_name: Some("codex-tui".to_string()),
client_version: Some("1.2.3".to_string()),
rpc_transport: AppServerRpcTransport::Websocket,
experimental_api_enabled: Some(true),
},
runtime: CodexRuntimeMetadata {
codex_rs_version: "0.99.0".to_string(),
runtime_os: "macos".to_string(),
runtime_os_version: "15.3.1".to_string(),
runtime_arch: "aarch64".to_string(),
},
thread_source: None,
subagent_source: None,
parent_thread_id: None,
tool_name: "shell".to_string(),
started_at: 123,
completed_at: Some(125),
duration_ms: Some(2000),
execution_started: true,
review_count: 0,
guardian_review_count: 0,
user_review_count: 0,
final_approval_outcome: ToolItemFinalApprovalOutcome::NotNeeded,
terminal_status: ToolItemTerminalStatus::Completed,
failure_kind: None,
requested_additional_permissions: false,
requested_network_access: false,
retry_count: 0,
},
command_execution_source: CommandExecutionSourceKind::Agent,
command_execution_family: CommandExecutionFamily::Shell,
exit_code: Some(0),
command_action_count: Some(0),
},
});
let payload = serde_json::to_value(&event).expect("serialize command execution event");
assert_eq!(payload["event_params"]["thread_source"], json!(null));
}
#[tokio::test]
async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialized() {
let mut reducer = AnalyticsReducer::default();
@@ -837,7 +990,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-no-client",
@@ -881,7 +1034,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_resume_response(
"thread-1", /*ephemeral*/ true, "gpt-5",
@@ -961,7 +1114,7 @@ async fn compaction_event_ingests_custom_fact() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_resume_response_with_source(
"thread-1",
@@ -1579,7 +1732,7 @@ async fn accepted_turn_steer_emits_expected_event() {
.await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
@@ -1591,7 +1744,7 @@ async fn accepted_turn_steer_emits_expected_event() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
@@ -1733,7 +1886,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
ingest_initialize(&mut reducer, &mut out).await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
@@ -1757,7 +1910,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
// failed turn/start request and attach request-scoped connection metadata.
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
@@ -1874,7 +2027,7 @@ async fn accepted_steers_increment_turn_steer_count() {
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
@@ -1886,7 +2039,7 @@ async fn accepted_steers_increment_turn_steer_count() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
@@ -1896,7 +2049,7 @@ async fn accepted_steers_increment_turn_steer_count() {
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(5),
request: Box::new(sample_turn_steer_request(
@@ -1920,7 +2073,7 @@ async fn accepted_steers_increment_turn_steer_count() {
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(6),
request: Box::new(sample_turn_steer_request(
@@ -1932,7 +2085,7 @@ async fn accepted_steers_increment_turn_steer_count() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 6)),
},

View File

@@ -24,6 +24,8 @@ use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_login::AuthManager;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
@@ -46,8 +48,7 @@ pub(crate) struct AnalyticsEventsQueue {
#[derive(Clone)]
pub struct AnalyticsEventsClient {
queue: AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
queue: Option<AnalyticsEventsQueue>,
}
impl AnalyticsEventsQueue {
@@ -116,11 +117,15 @@ impl AnalyticsEventsClient {
analytics_enabled: Option<bool>,
) -> Self {
Self {
queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url),
analytics_enabled,
queue: (analytics_enabled != Some(false))
.then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)),
}
}
pub fn disabled() -> Self {
Self { queue: None }
}
pub fn track_skill_invocations(
&self,
tracking: TrackEventsContext,
@@ -175,7 +180,7 @@ impl AnalyticsEventsClient {
}
pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) {
self.record_fact(AnalyticsFact::Request {
self.record_fact(AnalyticsFact::ClientRequest {
connection_id,
request_id,
request: Box::new(request),
@@ -183,7 +188,10 @@ impl AnalyticsEventsClient {
}
pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
if !self.queue.should_enqueue_app_used(&tracking, &app) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_app_used(&tracking, &app) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
@@ -192,7 +200,10 @@ impl AnalyticsEventsClient {
}
pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
@@ -255,14 +266,13 @@ impl AnalyticsEventsClient {
}
pub(crate) fn record_fact(&self, input: AnalyticsFact) {
if self.analytics_enabled == Some(false) {
return;
if let Some(queue) = self.queue.as_ref() {
queue.try_send(input);
}
self.queue.try_send(input);
}
pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
self.record_fact(AnalyticsFact::Response {
self.record_fact(AnalyticsFact::ClientResponse {
connection_id,
response: Box::new(response),
});
@@ -286,6 +296,19 @@ impl AnalyticsEventsClient {
pub fn track_notification(&self, notification: ServerNotification) {
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
self.record_fact(AnalyticsFact::ServerRequest {
connection_id,
request: Box::new(request),
});
}
pub fn track_server_response(&self, response: ServerResponse) {
self.record_fact(AnalyticsFact::ServerResponse {
response: Box::new(response),
});
}
}
async fn send_track_events(

View File

@@ -42,6 +42,14 @@ pub(crate) enum TrackEventRequest {
Compaction(Box<CodexCompactionEventRequest>),
TurnEvent(Box<CodexTurnEventRequest>),
TurnSteer(CodexTurnSteerEventRequest),
#[allow(dead_code)]
CommandExecution(CodexCommandExecutionEventRequest),
FileChange(CodexFileChangeEventRequest),
McpToolCall(CodexMcpToolCallEventRequest),
DynamicToolCall(CodexDynamicToolCallEventRequest),
CollabAgentToolCall(CodexCollabAgentToolCallEventRequest),
WebSearch(CodexWebSearchEventRequest),
ImageGeneration(CodexImageGenerationEventRequest),
PluginUsed(CodexPluginUsedEventRequest),
PluginInstalled(CodexPluginEventRequest),
PluginUninstalled(CodexPluginEventRequest),
@@ -277,6 +285,226 @@ pub(crate) struct GuardianReviewEventPayload {
pub(crate) guardian_review: GuardianReviewEventParams,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolItemFinalApprovalOutcome {
NotNeeded,
ConfigAllowed,
PolicyForbidden,
GuardianApproved,
GuardianDenied,
GuardianAborted,
UserApproved,
UserApprovedForSession,
UserDenied,
UserAborted,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
#[allow(dead_code)]
pub(crate) enum ToolItemTerminalStatus {
Completed,
Failed,
Rejected,
Interrupted,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
#[allow(dead_code)]
pub(crate) enum ToolItemFailureKind {
ToolError,
ApprovalDenied,
ApprovalAborted,
SandboxDenied,
PolicyForbidden,
}
#[derive(Serialize)]
pub(crate) struct CodexToolItemEventBase {
pub(crate) thread_id: String,
pub(crate) turn_id: String,
/// App-server ThreadItem.id. For tool-originated items this generally
/// corresponds to the originating core call_id.
pub(crate) item_id: String,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) thread_source: Option<&'static str>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) tool_name: String,
pub(crate) started_at: u64,
pub(crate) completed_at: Option<u64>,
pub(crate) duration_ms: Option<u64>,
pub(crate) execution_started: bool,
pub(crate) review_count: u64,
pub(crate) guardian_review_count: u64,
pub(crate) user_review_count: u64,
pub(crate) final_approval_outcome: ToolItemFinalApprovalOutcome,
pub(crate) terminal_status: ToolItemTerminalStatus,
pub(crate) failure_kind: Option<ToolItemFailureKind>,
pub(crate) requested_additional_permissions: bool,
pub(crate) requested_network_access: bool,
pub(crate) retry_count: u64,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CommandExecutionFamily {
Shell,
UserShell,
UnifiedExec,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CommandExecutionSourceKind {
Agent,
UserShell,
UnifiedExecStartup,
UnifiedExecInteraction,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CollabAgentToolKind {
SpawnAgent,
SendInput,
ResumeAgent,
Wait,
CloseAgent,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum WebSearchActionKind {
Search,
OpenPage,
FindInPage,
Other,
}
#[derive(Serialize)]
pub(crate) struct CodexCommandExecutionEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) command_execution_source: CommandExecutionSourceKind,
pub(crate) command_execution_family: CommandExecutionFamily,
pub(crate) exit_code: Option<i32>,
pub(crate) command_action_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexCommandExecutionEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexCommandExecutionEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexFileChangeEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) file_change_count: u64,
pub(crate) file_add_count: u64,
pub(crate) file_update_count: u64,
pub(crate) file_delete_count: u64,
pub(crate) file_move_count: u64,
}
#[derive(Serialize)]
pub(crate) struct CodexFileChangeEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexFileChangeEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexMcpToolCallEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) mcp_server_name: String,
pub(crate) mcp_tool_name: String,
pub(crate) mcp_error_present: bool,
pub(crate) mcp_error_code: Option<String>,
}
#[derive(Serialize)]
pub(crate) struct CodexMcpToolCallEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexMcpToolCallEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexDynamicToolCallEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) dynamic_tool_name: String,
pub(crate) success: Option<bool>,
pub(crate) output_content_item_count: Option<u64>,
pub(crate) output_text_item_count: Option<u64>,
pub(crate) output_image_item_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexDynamicToolCallEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexDynamicToolCallEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexCollabAgentToolCallEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) collab_agent_tool: CollabAgentToolKind,
pub(crate) sender_thread_id: String,
pub(crate) receiver_thread_count: u64,
pub(crate) receiver_thread_ids: Option<Vec<String>>,
pub(crate) requested_model: Option<String>,
pub(crate) requested_reasoning_effort: Option<String>,
pub(crate) agent_state_count: Option<u64>,
pub(crate) completed_agent_count: Option<u64>,
pub(crate) failed_agent_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexCollabAgentToolCallEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexCollabAgentToolCallEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexWebSearchEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) web_search_action: Option<WebSearchActionKind>,
pub(crate) query_present: bool,
pub(crate) query_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexWebSearchEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexWebSearchEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexImageGenerationEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) image_generation_status: String,
pub(crate) revised_prompt_present: bool,
pub(crate) saved_path_present: bool,
}
#[derive(Serialize)]
pub(crate) struct CodexImageGenerationEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexImageGenerationEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexAppMetadata {
pub(crate) connector_id: Option<String>,

View File

@@ -7,6 +7,8 @@ use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::ModeKind;
@@ -268,12 +270,12 @@ pub(crate) enum AnalyticsFact {
runtime: CodexRuntimeMetadata,
rpc_transport: AppServerRpcTransport,
},
Request {
ClientRequest {
connection_id: u64,
request_id: RequestId,
request: Box<ClientRequest>,
},
Response {
ClientResponse {
connection_id: u64,
response: Box<ClientResponse>,
},
@@ -283,6 +285,13 @@ pub(crate) enum AnalyticsFact {
error: JSONRPCErrorError,
error_type: Option<AnalyticsJsonRpcError>,
},
ServerRequest {
connection_id: u64,
request: Box<ServerRequest>,
},
ServerResponse {
response: Box<ServerResponse>,
},
Notification(Box<ServerNotification>),
// Facts that do not naturally exist on the app-server protocol surface, or
// would require non-trivial protocol reshaping on this branch.

View File

@@ -168,14 +168,14 @@ impl AnalyticsReducer {
rpc_transport,
);
}
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id,
request_id,
request,
} => {
self.ingest_request(connection_id, request_id, *request);
}
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id,
response,
} => {
@@ -192,6 +192,13 @@ impl AnalyticsReducer {
AnalyticsFact::Notification(notification) => {
self.ingest_notification(*notification, out);
}
AnalyticsFact::ServerRequest {
connection_id: _connection_id,
request: _request,
} => {}
AnalyticsFact::ServerResponse {
response: _response,
} => {}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);

View File

@@ -0,0 +1,16 @@
use std::sync::Arc;
use codex_analytics::AnalyticsEventsClient;
use codex_core::config::Config;
use codex_login::AuthManager;
pub(crate) fn analytics_events_client_from_config(
auth_manager: Arc<AuthManager>,
config: &Config,
) -> AnalyticsEventsClient {
AnalyticsEventsClient::new(
auth_manager,
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
)
}

View File

@@ -3284,7 +3284,7 @@ mod tests {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -3353,7 +3353,7 @@ mod tests {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -3445,7 +3445,7 @@ mod tests {
let thread_state = new_thread_state();
let thread_watch_manager = ThreadWatchManager::new();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -3889,7 +3889,7 @@ mod tests {
let conversation_id = ThreadId::new();
let event_turn_id = "complete1".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -3953,7 +3953,7 @@ mod tests {
)
.await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4001,7 +4001,7 @@ mod tests {
)
.await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4043,7 +4043,7 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4097,7 +4097,7 @@ mod tests {
let conversation_id = ThreadId::new();
let turn_id = "turn-123".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4185,7 +4185,7 @@ mod tests {
let conversation_id = ThreadId::new();
let turn_id = "turn-456".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4256,7 +4256,7 @@ mod tests {
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4512,7 +4512,7 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4550,7 +4550,7 @@ mod tests {
#[tokio::test]
async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
@@ -4576,7 +4576,7 @@ mod tests {
#[tokio::test]
async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let conversation_id = ThreadId::new();
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,

View File

@@ -10369,7 +10369,7 @@ mod tests {
let connection_id = ConnectionId(7);
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(outgoing_tx));
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
vec![connection_id],

View File

@@ -757,7 +757,7 @@ mod tests {
let manager = CommandExecManager::default();
let err = manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)),
request_id: ConnectionRequestId {
connection_id: ConnectionId(1),
request_id: codex_app_server_protocol::RequestId::Integer(42),
@@ -793,7 +793,7 @@ mod tests {
manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)),
request_id: request_id.clone(),
process_id: Some("proc-99".to_string()),
exec_request: windows_sandbox_exec_request(),
@@ -843,7 +843,7 @@ mod tests {
manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)),
request_id: request_id.clone(),
process_id: Some("proc-100".to_string()),
exec_request: ExecRequest::new(

View File

@@ -234,7 +234,7 @@ mod tests {
const OUTGOING_BUFFER: usize = 1;
let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER);
FsWatchManager::new_with_file_watcher(
Arc::new(OutgoingMessageSender::new(tx)),
Arc::new(OutgoingMessageSender::new_for_tests(tx)),
Arc::new(FileWatcher::noop()),
)
}

View File

@@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::analytics_events::analytics_events_client_from_config;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::error_code::OVERLOADED_ERROR_CODE;
@@ -357,7 +358,14 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let runtime_handle = tokio::spawn(async move {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
));
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
let outbound_initialized = Arc::new(AtomicBool::new(false));
@@ -382,12 +390,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
});
let processor_outgoing = Arc::clone(&outgoing_message_sender);
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
let mut processor_handle = tokio::spawn(async move {
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
analytics_events_client,
arg0_paths: args.arg0_paths,
config: args.config,
environment_manager: args.environment_manager,

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use crate::analytics_events::analytics_events_client_from_config;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
use crate::outgoing_message::ConnectionId;
@@ -64,6 +65,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::Registry;
use tracing_subscriber::util::SubscriberInitExt;
mod analytics_events;
mod app_server_tracing;
mod bespoke_event_handling;
mod codex_message_processor;
@@ -646,14 +648,20 @@ pub async fn run_main_with_transport(
});
let processor_handle = tokio::spawn({
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let outbound_control_tx = outbound_control_tx;
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
));
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
let loader_overrides = loader_overrides_for_config_api;
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
analytics_events_client,
arg0_paths,
config: Arc::new(config),
environment_manager,

View File

@@ -225,6 +225,7 @@ impl ConnectionSessionState {
pub(crate) struct MessageProcessorArgs {
pub(crate) outgoing: Arc<OutgoingMessageSender>,
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) arg0_paths: Arg0DispatchPaths,
pub(crate) config: Arc<Config>,
pub(crate) environment_manager: Arc<EnvironmentManager>,
@@ -246,6 +247,7 @@ impl MessageProcessor {
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
let MessageProcessorArgs {
outgoing,
analytics_events_client,
arg0_paths,
config,
environment_manager,
@@ -263,11 +265,6 @@ impl MessageProcessor {
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),
}));
let analytics_events_client = AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),

View File

@@ -1,6 +1,7 @@
use super::ConnectionSessionState;
use super::MessageProcessor;
use super::MessageProcessorArgs;
use crate::analytics_events::analytics_events_client_from_config;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingMessageSender;
use crate::transport::AppServerTransport;
@@ -234,11 +235,14 @@ fn build_test_processor(
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
) {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(outgoing_tx));
let auth_manager =
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false);
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref());
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing,
analytics_events_client,
arg0_paths: Arg0DispatchPaths::default(),
config,
environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),

View File

@@ -4,12 +4,14 @@ use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::ServerResponse;
use codex_otel::span_w3c_trace_context;
use codex_protocol::ThreadId;
use codex_protocol::protocol::W3cTraceContext;
@@ -117,6 +119,7 @@ pub(crate) struct OutgoingMessageSender {
/// We keep them here because this is where responses, errors, and
/// disconnect cleanup all get handled.
request_contexts: Mutex<HashMap<ConnectionRequestId, RequestContext>>,
analytics_events_client: AnalyticsEventsClient,
}
#[derive(Clone)]
@@ -203,15 +206,24 @@ impl ThreadScopedOutgoingMessageSender {
}
impl OutgoingMessageSender {
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
pub(crate) fn new(
sender: mpsc::Sender<OutgoingEnvelope>,
analytics_events_client: AnalyticsEventsClient,
) -> Self {
Self {
next_server_request_id: AtomicI64::new(0),
sender,
request_id_to_callback: Mutex::new(HashMap::new()),
request_contexts: Mutex::new(HashMap::new()),
analytics_events_client,
}
}
#[cfg(test)]
pub(crate) fn new_for_tests(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
Self::new(sender, AnalyticsEventsClient::disabled())
}
pub(crate) async fn register_request_context(&self, request_context: RequestContext) {
let mut request_contexts = self.request_contexts.lock().await;
if request_contexts
@@ -298,7 +310,7 @@ impl OutgoingMessageSender {
);
}
let outgoing_message = OutgoingMessage::Request(request);
let outgoing_message = OutgoingMessage::Request(request.clone());
let send_result = match connection_ids {
None => {
self.sender
@@ -321,6 +333,9 @@ impl OutgoingMessageSender {
{
send_error = Some(err);
break;
} else {
self.analytics_events_client
.track_server_request(connection_id.0, request.clone());
}
}
match send_error {
@@ -364,6 +379,10 @@ impl OutgoingMessageSender {
match entry {
Some((id, entry)) => {
if let Some(response) = server_response_from_result(&entry.request, result.clone())
{
self.analytics_events_client.track_server_response(response);
}
if let Err(err) = entry.callback.send(Ok(result)) {
warn!("could not notify callback for {id:?} due to: {err:?}");
}
@@ -621,6 +640,14 @@ impl OutgoingMessageSender {
}
}
fn server_response_from_result(request: &ServerRequest, result: Result) -> Option<ServerResponse> {
let mut value = serde_json::to_value(request).ok()?;
let object = value.as_object_mut()?;
object.remove("params");
object.insert("response".to_string(), result);
serde_json::from_value(value).ok()
}
/// Outgoing message from the server to the client.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
@@ -654,6 +681,8 @@ mod tests {
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
@@ -838,10 +867,53 @@ mod tests {
);
}
#[test]
fn server_response_from_result_decodes_typed_response_with_original_method() {
let request = ServerRequest::CommandExecutionRequestApproval {
request_id: RequestId::Integer(7),
params: CommandExecutionRequestApprovalParams {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
approval_id: None,
reason: None,
network_approval_context: None,
command: Some("echo hi".to_string()),
cwd: None,
command_actions: None,
additional_permissions: None,
proposed_execpolicy_amendment: None,
proposed_network_policy_amendments: None,
available_decisions: None,
},
};
let response = server_response_from_result(
&request,
json!({
"decision": "acceptForSession",
}),
)
.expect("decode typed server response");
let ServerResponse::CommandExecutionRequestApproval {
request_id,
response,
} = response
else {
panic!("expected command execution approval response");
};
assert_eq!(request_id, RequestId::Integer(7));
assert_eq!(
response.decision,
CommandExecutionApprovalDecision::AcceptForSession
);
}
#[tokio::test]
async fn send_response_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new_for_tests(tx);
let request_id = ConnectionRequestId {
connection_id: ConnectionId(42),
request_id: RequestId::Integer(7),
@@ -876,7 +948,7 @@ mod tests {
#[tokio::test]
async fn send_response_clears_registered_request_context() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new_for_tests(tx);
let request_id = ConnectionRequestId {
connection_id: ConnectionId(42),
request_id: RequestId::Integer(7),
@@ -901,7 +973,7 @@ mod tests {
#[tokio::test]
async fn send_error_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new_for_tests(tx);
let request_id = ConnectionRequestId {
connection_id: ConnectionId(9),
request_id: RequestId::Integer(3),
@@ -939,7 +1011,7 @@ mod tests {
#[tokio::test]
async fn send_server_notification_to_connection_and_wait_tracks_write_completion() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new_for_tests(tx);
let send_task = tokio::spawn(async move {
outgoing
.send_server_notification_to_connection_and_wait(
@@ -983,7 +1055,7 @@ mod tests {
#[tokio::test]
async fn connection_closed_clears_registered_request_contexts() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new_for_tests(tx);
let closed_connection_request = ConnectionRequestId {
connection_id: ConnectionId(9),
request_id: RequestId::Integer(3),
@@ -1017,7 +1089,7 @@ mod tests {
#[tokio::test]
async fn notify_client_error_forwards_error_to_waiter() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let outgoing = OutgoingMessageSender::new_for_tests(tx);
let (request_id, wait_for_result) = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(
@@ -1051,7 +1123,7 @@ mod tests {
#[tokio::test]
async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let thread_id = ThreadId::new();
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
@@ -1108,7 +1180,7 @@ mod tests {
#[tokio::test]
async fn cancel_requests_for_thread_cancels_all_thread_requests() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
let thread_id = ThreadId::new();
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),

View File

@@ -720,9 +720,9 @@ mod tests {
#[tokio::test]
async fn status_change_emits_notification() {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
outgoing_tx,
)));
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(
OutgoingMessageSender::new_for_tests(outgoing_tx),
));
manager
.upsert_thread(test_thread(
@@ -762,9 +762,9 @@ mod tests {
#[tokio::test]
async fn silent_upsert_skips_initial_notification() {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
outgoing_tx,
)));
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(
OutgoingMessageSender::new_for_tests(outgoing_tx),
));
manager
.upsert_thread_silently(test_thread(