mirror of
https://github.com/openai/codex.git
synced 2026-04-09 00:51:43 +03:00
Compare commits
1 Commits
pr17089
...
latest-alp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
abbea13020 |
@@ -92,7 +92,7 @@ members = [
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.0.0"
|
||||
version = "0.119.0-alpha.23"
|
||||
# Track the edition for all workspace crates in one place. Individual
|
||||
# crates can still override this value, but keeping it here means new
|
||||
# crates created with `cargo new -w ...` automatically inherit the 2024
|
||||
|
||||
@@ -6,14 +6,9 @@ use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::CodexToolCallEventParams;
|
||||
use crate::events::CodexToolCallEventRequest;
|
||||
use crate::events::ThreadInitializationMode;
|
||||
use crate::events::ThreadInitializedEvent;
|
||||
use crate::events::ThreadInitializedEventParams;
|
||||
use crate::events::ToolCallFinalReviewOutcome;
|
||||
use crate::events::ToolCallTerminalStatus;
|
||||
use crate::events::ToolKind;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::events::codex_app_metadata;
|
||||
use crate::events::codex_plugin_metadata;
|
||||
@@ -351,89 +346,6 @@ fn thread_initialized_event_serializes_expected_shape() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tool_call_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::ToolCall(CodexToolCallEventRequest {
|
||||
event_type: "codex_tool_call_event",
|
||||
event_params: CodexToolCallEventParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
tool_call_id: "tool-call-1".to_string(),
|
||||
app_server_client: CodexAppServerClientMetadata {
|
||||
product_client_id: "codex_tui".to_string(),
|
||||
client_name: Some("codex-tui".to_string()),
|
||||
client_version: Some("1.2.3".to_string()),
|
||||
rpc_transport: AppServerRpcTransport::Websocket,
|
||||
experimental_api_enabled: Some(true),
|
||||
},
|
||||
runtime: CodexRuntimeMetadata {
|
||||
codex_rs_version: "0.99.0".to_string(),
|
||||
runtime_os: "macos".to_string(),
|
||||
runtime_os_version: "15.3.1".to_string(),
|
||||
runtime_arch: "aarch64".to_string(),
|
||||
},
|
||||
tool_name: "shell".to_string(),
|
||||
tool_kind: ToolKind::Shell,
|
||||
started_at: 123,
|
||||
completed_at: Some(125),
|
||||
duration_ms: Some(2000),
|
||||
execution_started: true,
|
||||
review_count: 0,
|
||||
guardian_review_count: 0,
|
||||
user_review_count: 0,
|
||||
final_review_outcome: ToolCallFinalReviewOutcome::NotNeeded,
|
||||
terminal_status: ToolCallTerminalStatus::Completed,
|
||||
failure_kind: None,
|
||||
exit_code: Some(0),
|
||||
requested_additional_permissions: false,
|
||||
requested_network_access: false,
|
||||
retry_count: 0,
|
||||
},
|
||||
});
|
||||
|
||||
let payload = serde_json::to_value(&event).expect("serialize tool call event");
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!({
|
||||
"event_type": "codex_tool_call_event",
|
||||
"event_params": {
|
||||
"thread_id": "thread-1",
|
||||
"turn_id": "turn-1",
|
||||
"tool_call_id": "tool-call-1",
|
||||
"app_server_client": {
|
||||
"product_client_id": "codex_tui",
|
||||
"client_name": "codex-tui",
|
||||
"client_version": "1.2.3",
|
||||
"rpc_transport": "websocket",
|
||||
"experimental_api_enabled": true
|
||||
},
|
||||
"runtime": {
|
||||
"codex_rs_version": "0.99.0",
|
||||
"runtime_os": "macos",
|
||||
"runtime_os_version": "15.3.1",
|
||||
"runtime_arch": "aarch64"
|
||||
},
|
||||
"tool_name": "shell",
|
||||
"tool_kind": "shell",
|
||||
"started_at": 123,
|
||||
"completed_at": 125,
|
||||
"duration_ms": 2000,
|
||||
"execution_started": true,
|
||||
"review_count": 0,
|
||||
"guardian_review_count": 0,
|
||||
"user_review_count": 0,
|
||||
"final_review_outcome": "not_needed",
|
||||
"terminal_status": "completed",
|
||||
"failure_kind": null,
|
||||
"exit_code": 0,
|
||||
"requested_additional_permissions": false,
|
||||
"requested_network_access": false,
|
||||
"retry_count": 0
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialized() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
@@ -441,7 +353,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-no-client",
|
||||
@@ -485,7 +397,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_resume_response(
|
||||
"thread-1", /*ephemeral*/ true, "gpt-5",
|
||||
|
||||
@@ -16,8 +16,6 @@ use crate::facts::TrackEventsContext;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
@@ -115,18 +113,6 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_disabled() -> Self {
|
||||
let (sender, _receiver) = mpsc::channel(1);
|
||||
Self {
|
||||
queue: AnalyticsEventsQueue {
|
||||
sender,
|
||||
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
|
||||
},
|
||||
analytics_enabled: Some(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn track_skill_invocations(
|
||||
&self,
|
||||
tracking: TrackEventsContext,
|
||||
@@ -236,24 +222,11 @@ impl AnalyticsEventsClient {
|
||||
}
|
||||
|
||||
pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
|
||||
self.record_fact(AnalyticsFact::ClientResponse {
|
||||
self.record_fact(AnalyticsFact::Response {
|
||||
connection_id,
|
||||
response: Box::new(response),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
|
||||
self.record_fact(AnalyticsFact::ServerRequest {
|
||||
connection_id,
|
||||
request: Box::new(request),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_server_response(&self, response: ServerResponse) {
|
||||
self.record_fact(AnalyticsFact::ServerResponse {
|
||||
response: Box::new(response),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_track_events(
|
||||
|
||||
@@ -37,7 +37,6 @@ pub(crate) enum TrackEventRequest {
|
||||
ThreadInitialized(ThreadInitializedEvent),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
AppUsed(CodexAppUsedEventRequest),
|
||||
ToolCall(CodexToolCallEventRequest),
|
||||
PluginUsed(CodexPluginUsedEventRequest),
|
||||
PluginInstalled(CodexPluginEventRequest),
|
||||
PluginUninstalled(CodexPluginEventRequest),
|
||||
@@ -100,81 +99,6 @@ pub(crate) struct ThreadInitializedEvent {
|
||||
pub(crate) event_params: ThreadInitializedEventParams,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum ToolKind {
|
||||
Shell,
|
||||
UnifiedExec,
|
||||
ApplyPatch,
|
||||
Mcp,
|
||||
Dynamic,
|
||||
Other,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum ToolCallFinalReviewOutcome {
|
||||
NotNeeded,
|
||||
GuardianApproved,
|
||||
GuardianDenied,
|
||||
GuardianAborted,
|
||||
UserApproved,
|
||||
UserApprovedForSession,
|
||||
UserDenied,
|
||||
UserAborted,
|
||||
ConfigAllowed,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum ToolCallTerminalStatus {
|
||||
Completed,
|
||||
Failed,
|
||||
Rejected,
|
||||
Interrupted,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum ToolCallFailureKind {
|
||||
ToolError,
|
||||
ApprovalDenied,
|
||||
ApprovalAborted,
|
||||
SandboxDenied,
|
||||
PolicyForbidden,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexToolCallEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
pub(crate) turn_id: String,
|
||||
pub(crate) tool_call_id: String,
|
||||
pub(crate) app_server_client: CodexAppServerClientMetadata,
|
||||
pub(crate) runtime: CodexRuntimeMetadata,
|
||||
pub(crate) tool_name: String,
|
||||
pub(crate) tool_kind: ToolKind,
|
||||
pub(crate) started_at: u64,
|
||||
pub(crate) completed_at: Option<u64>,
|
||||
pub(crate) duration_ms: Option<u64>,
|
||||
pub(crate) execution_started: bool,
|
||||
pub(crate) review_count: u64,
|
||||
pub(crate) guardian_review_count: u64,
|
||||
pub(crate) user_review_count: u64,
|
||||
pub(crate) final_review_outcome: ToolCallFinalReviewOutcome,
|
||||
pub(crate) terminal_status: ToolCallTerminalStatus,
|
||||
pub(crate) failure_kind: Option<ToolCallFailureKind>,
|
||||
pub(crate) exit_code: Option<i32>,
|
||||
pub(crate) requested_additional_permissions: bool,
|
||||
pub(crate) requested_network_access: bool,
|
||||
pub(crate) retry_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexToolCallEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexToolCallEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexAppMetadata {
|
||||
pub(crate) connector_id: Option<String>,
|
||||
|
||||
@@ -5,8 +5,6 @@ use codex_app_server_protocol::ClientResponse;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::protocol::SkillScope;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
@@ -74,22 +72,15 @@ pub(crate) enum AnalyticsFact {
|
||||
runtime: CodexRuntimeMetadata,
|
||||
rpc_transport: AppServerRpcTransport,
|
||||
},
|
||||
ClientRequest {
|
||||
Request {
|
||||
connection_id: u64,
|
||||
request_id: RequestId,
|
||||
request: Box<ClientRequest>,
|
||||
},
|
||||
ClientResponse {
|
||||
Response {
|
||||
connection_id: u64,
|
||||
response: Box<ClientResponse>,
|
||||
},
|
||||
ServerRequest {
|
||||
connection_id: u64,
|
||||
request: Box<ServerRequest>,
|
||||
},
|
||||
ServerResponse {
|
||||
response: Box<ServerResponse>,
|
||||
},
|
||||
Notification(Box<ServerNotification>),
|
||||
// Facts that do not naturally exist on the app-server protocol surface, or
|
||||
// would require non-trivial protocol reshaping on this branch.
|
||||
|
||||
@@ -65,24 +65,17 @@ impl AnalyticsReducer {
|
||||
rpc_transport,
|
||||
);
|
||||
}
|
||||
AnalyticsFact::ClientRequest {
|
||||
AnalyticsFact::Request {
|
||||
connection_id: _connection_id,
|
||||
request_id: _request_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
AnalyticsFact::ClientResponse {
|
||||
AnalyticsFact::Response {
|
||||
connection_id,
|
||||
response,
|
||||
} => {
|
||||
self.ingest_response(connection_id, *response, out);
|
||||
}
|
||||
AnalyticsFact::ServerRequest {
|
||||
connection_id: _connection_id,
|
||||
request: _request,
|
||||
} => {}
|
||||
AnalyticsFact::ServerResponse {
|
||||
response: _response,
|
||||
} => {}
|
||||
AnalyticsFact::Notification(_notification) => {}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_core::config::Config;
|
||||
use codex_login::AuthManager;
|
||||
|
||||
pub(crate) fn analytics_events_client_from_config(
|
||||
auth_manager: Arc<AuthManager>,
|
||||
config: &Config,
|
||||
) -> AnalyticsEventsClient {
|
||||
AnalyticsEventsClient::new(
|
||||
auth_manager,
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
)
|
||||
}
|
||||
@@ -3166,7 +3166,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3235,7 +3235,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let thread_state = new_thread_state();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3327,7 +3327,7 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
let thread_watch_manager = ThreadWatchManager::new();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3722,7 +3722,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let event_turn_id = "complete1".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3785,7 +3785,7 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3832,7 +3832,7 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3873,7 +3873,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -3927,7 +3927,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-123".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4015,7 +4015,7 @@ mod tests {
|
||||
let conversation_id = ThreadId::new();
|
||||
let turn_id = "turn-456".to_string();
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4086,7 +4086,7 @@ mod tests {
|
||||
let thread_state = new_thread_state();
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4339,7 +4339,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4377,7 +4377,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
vec![ConnectionId(1)],
|
||||
@@ -4403,7 +4403,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> {
|
||||
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let conversation_id = ThreadId::new();
|
||||
let outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing,
|
||||
|
||||
@@ -9638,7 +9638,7 @@ mod tests {
|
||||
let connection_id = ConnectionId(7);
|
||||
|
||||
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(outgoing_tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
vec![connection_id],
|
||||
|
||||
@@ -757,7 +757,7 @@ mod tests {
|
||||
let manager = CommandExecManager::default();
|
||||
let err = manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
request_id: ConnectionRequestId {
|
||||
connection_id: ConnectionId(1),
|
||||
request_id: codex_app_server_protocol::RequestId::Integer(42),
|
||||
@@ -793,7 +793,7 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-99".to_string()),
|
||||
exec_request: windows_sandbox_exec_request(),
|
||||
@@ -843,7 +843,7 @@ mod tests {
|
||||
|
||||
manager
|
||||
.start(StartCommandExecParams {
|
||||
outgoing: Arc::new(OutgoingMessageSender::new_for_tests(tx)),
|
||||
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
|
||||
request_id: request_id.clone(),
|
||||
process_id: Some("proc-100".to_string()),
|
||||
exec_request: ExecRequest::new(
|
||||
|
||||
@@ -235,7 +235,7 @@ mod tests {
|
||||
const OUTGOING_BUFFER: usize = 1;
|
||||
let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER);
|
||||
FsWatchManager::new_with_file_watcher(
|
||||
Arc::new(OutgoingMessageSender::new_for_tests(tx)),
|
||||
Arc::new(OutgoingMessageSender::new(tx)),
|
||||
Arc::new(FileWatcher::noop()),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -50,7 +50,6 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::analytics_events::analytics_events_client_from_config;
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::error_code::OVERLOADED_ERROR_CODE;
|
||||
@@ -358,14 +357,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
|
||||
let runtime_handle = tokio::spawn(async move {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
|
||||
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
||||
@@ -390,11 +382,12 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
});
|
||||
|
||||
let processor_outgoing = Arc::clone(&outgoing_message_sender);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
|
||||
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
|
||||
let mut processor_handle = tokio::spawn(async move {
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: Arc::clone(&processor_outgoing),
|
||||
analytics_events_client,
|
||||
arg0_paths: args.arg0_paths,
|
||||
config: args.config,
|
||||
environment_manager: args.environment_manager,
|
||||
|
||||
@@ -18,7 +18,6 @@ use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use crate::analytics_events::analytics_events_client_from_config;
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::message_processor::MessageProcessorArgs;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
@@ -64,7 +63,6 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::registry::Registry;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
mod analytics_events;
|
||||
mod app_server_tracing;
|
||||
mod bespoke_event_handling;
|
||||
mod codex_message_processor;
|
||||
@@ -642,20 +640,14 @@ pub async fn run_main_with_transport(
|
||||
});
|
||||
|
||||
let processor_handle = tokio::spawn({
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outbound_control_tx = outbound_control_tx;
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
));
|
||||
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
|
||||
let loader_overrides = loader_overrides_for_config_api;
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config: Arc::new(config),
|
||||
environment_manager,
|
||||
|
||||
@@ -185,7 +185,6 @@ pub(crate) struct ConnectionSessionState {
|
||||
|
||||
pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) outgoing: Arc<OutgoingMessageSender>,
|
||||
pub(crate) analytics_events_client: AnalyticsEventsClient,
|
||||
pub(crate) arg0_paths: Arg0DispatchPaths,
|
||||
pub(crate) config: Arc<Config>,
|
||||
pub(crate) environment_manager: Arc<EnvironmentManager>,
|
||||
@@ -207,7 +206,6 @@ impl MessageProcessor {
|
||||
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
|
||||
let MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config,
|
||||
environment_manager,
|
||||
@@ -236,6 +234,11 @@ impl MessageProcessor {
|
||||
},
|
||||
environment_manager,
|
||||
));
|
||||
let analytics_events_client = AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
);
|
||||
thread_manager
|
||||
.plugins_manager()
|
||||
.set_analytics_events_client(analytics_events_client.clone());
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use super::ConnectionSessionState;
|
||||
use super::MessageProcessor;
|
||||
use super::MessageProcessorArgs;
|
||||
use crate::analytics_events::analytics_events_client_from_config;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::transport::AppServerTransport;
|
||||
@@ -235,14 +234,11 @@ fn build_test_processor(
|
||||
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
|
||||
) {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(outgoing_tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false);
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref());
|
||||
let processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config,
|
||||
environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),
|
||||
|
||||
@@ -4,14 +4,12 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_otel::span_w3c_trace_context;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
@@ -119,7 +117,6 @@ pub(crate) struct OutgoingMessageSender {
|
||||
/// We keep them here because this is where responses, errors, and
|
||||
/// disconnect cleanup all get handled.
|
||||
request_contexts: Mutex<HashMap<ConnectionRequestId, RequestContext>>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -206,24 +203,15 @@ impl ThreadScopedOutgoingMessageSender {
|
||||
}
|
||||
|
||||
impl OutgoingMessageSender {
|
||||
pub(crate) fn new(
|
||||
sender: mpsc::Sender<OutgoingEnvelope>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
) -> Self {
|
||||
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
|
||||
Self {
|
||||
next_server_request_id: AtomicI64::new(0),
|
||||
sender,
|
||||
request_id_to_callback: Mutex::new(HashMap::new()),
|
||||
request_contexts: Mutex::new(HashMap::new()),
|
||||
analytics_events_client,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn new_for_tests(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
|
||||
Self::new(sender, AnalyticsEventsClient::new_disabled())
|
||||
}
|
||||
|
||||
pub(crate) async fn register_request_context(&self, request_context: RequestContext) {
|
||||
let mut request_contexts = self.request_contexts.lock().await;
|
||||
if request_contexts
|
||||
@@ -310,7 +298,7 @@ impl OutgoingMessageSender {
|
||||
);
|
||||
}
|
||||
|
||||
let outgoing_message = OutgoingMessage::Request(request.clone());
|
||||
let outgoing_message = OutgoingMessage::Request(request);
|
||||
let send_result = match connection_ids {
|
||||
None => {
|
||||
self.sender
|
||||
@@ -333,9 +321,6 @@ impl OutgoingMessageSender {
|
||||
{
|
||||
send_error = Some(err);
|
||||
break;
|
||||
} else {
|
||||
self.analytics_events_client
|
||||
.track_server_request(connection_id.0, request.clone());
|
||||
}
|
||||
}
|
||||
match send_error {
|
||||
@@ -379,10 +364,6 @@ impl OutgoingMessageSender {
|
||||
|
||||
match entry {
|
||||
Some((id, entry)) => {
|
||||
if let Some(response) = server_response_from_result(&entry.request, result.clone())
|
||||
{
|
||||
self.analytics_events_client.track_server_response(response);
|
||||
}
|
||||
if let Err(err) = entry.callback.send(Ok(result)) {
|
||||
warn!("could not notify callback for {id:?} due to: {err:?}");
|
||||
}
|
||||
@@ -640,14 +621,6 @@ impl OutgoingMessageSender {
|
||||
}
|
||||
}
|
||||
|
||||
fn server_response_from_result(request: &ServerRequest, result: Result) -> Option<ServerResponse> {
|
||||
let mut value = serde_json::to_value(request).ok()?;
|
||||
let object = value.as_object_mut()?;
|
||||
object.remove("params");
|
||||
object.insert("response".to_string(), result);
|
||||
serde_json::from_value(value).ok()
|
||||
}
|
||||
|
||||
/// Outgoing message from the server to the client.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(untagged)]
|
||||
@@ -681,8 +654,6 @@ mod tests {
|
||||
use codex_app_server_protocol::AccountUpdatedNotification;
|
||||
use codex_app_server_protocol::ApplyPatchApprovalParams;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::DynamicToolCallParams;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalParams;
|
||||
@@ -867,53 +838,10 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_response_from_result_decodes_typed_response_with_original_method() {
|
||||
let request = ServerRequest::CommandExecutionRequestApproval {
|
||||
request_id: RequestId::Integer(7),
|
||||
params: CommandExecutionRequestApprovalParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: "item-1".to_string(),
|
||||
approval_id: None,
|
||||
reason: None,
|
||||
network_approval_context: None,
|
||||
command: Some("echo hi".to_string()),
|
||||
cwd: None,
|
||||
command_actions: None,
|
||||
additional_permissions: None,
|
||||
proposed_execpolicy_amendment: None,
|
||||
proposed_network_policy_amendments: None,
|
||||
available_decisions: None,
|
||||
},
|
||||
};
|
||||
|
||||
let response = server_response_from_result(
|
||||
&request,
|
||||
json!({
|
||||
"decision": "acceptForSession",
|
||||
}),
|
||||
)
|
||||
.expect("decode typed server response");
|
||||
|
||||
let ServerResponse::CommandExecutionRequestApproval {
|
||||
request_id,
|
||||
response,
|
||||
} = response
|
||||
else {
|
||||
panic!("expected command execution approval response");
|
||||
};
|
||||
assert_eq!(request_id, RequestId::Integer(7));
|
||||
assert_eq!(
|
||||
response.decision,
|
||||
CommandExecutionApprovalDecision::AcceptForSession
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_response_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
@@ -948,7 +876,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_response_clears_registered_request_context() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(42),
|
||||
request_id: RequestId::Integer(7),
|
||||
@@ -973,7 +901,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_error_routes_to_target_connection() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -1011,7 +939,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn send_server_notification_to_connection_and_wait_tracks_write_completion() {
|
||||
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let send_task = tokio::spawn(async move {
|
||||
outgoing
|
||||
.send_server_notification_to_connection_and_wait(
|
||||
@@ -1055,7 +983,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn connection_closed_clears_registered_request_contexts() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
let closed_connection_request = ConnectionRequestId {
|
||||
connection_id: ConnectionId(9),
|
||||
request_id: RequestId::Integer(3),
|
||||
@@ -1089,7 +1017,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn notify_client_error_forwards_error_to_waiter() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new_for_tests(tx);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
|
||||
let (request_id, wait_for_result) = outgoing
|
||||
.send_request(ServerRequestPayload::ApplyPatchApproval(
|
||||
@@ -1123,7 +1051,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
@@ -1180,7 +1108,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn cancel_requests_for_thread_cancels_all_thread_requests() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(8);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new_for_tests(tx));
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
|
||||
let thread_id = ThreadId::new();
|
||||
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
|
||||
outgoing.clone(),
|
||||
|
||||
@@ -674,9 +674,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn status_change_emits_notification() {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(
|
||||
OutgoingMessageSender::new_for_tests(outgoing_tx),
|
||||
));
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
)));
|
||||
|
||||
manager
|
||||
.upsert_thread(test_thread(
|
||||
@@ -716,9 +716,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn silent_upsert_skips_initial_notification() {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(
|
||||
OutgoingMessageSender::new_for_tests(outgoing_tx),
|
||||
));
|
||||
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
)));
|
||||
|
||||
manager
|
||||
.upsert_thread_silently(test_thread(
|
||||
|
||||
Reference in New Issue
Block a user