[codex-analytics] feature plumbing and emittance

This commit is contained in:
rhan-oai
2026-04-06 12:26:33 -07:00
parent 6d273dd348
commit 031eb880e7
21 changed files with 1264 additions and 44 deletions

View File

@@ -120,6 +120,41 @@ pub(crate) async fn wait_for_analytics_payload(
serde_json::from_slice(&body).map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))
}
pub(crate) async fn wait_for_analytics_event(
server: &MockServer,
read_timeout: Duration,
event_type: &str,
) -> Result<Value> {
timeout(read_timeout, async {
loop {
let Some(requests) = server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
for request in &requests {
if request.method != "POST"
|| request.url.path() != "/codex/analytics-events/events"
{
continue;
}
let payload: Value = serde_json::from_slice(&request.body)
.map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"))?;
let Some(events) = payload["events"].as_array() else {
continue;
};
if let Some(event) = events
.iter()
.find(|event| event["event_type"] == event_type)
{
return Ok::<Value, anyhow::Error>(event.clone());
}
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?
}
pub(crate) fn thread_initialized_event(payload: &Value) -> Result<&Value> {
let events = payload["events"]
.as_array()

View File

@@ -3,8 +3,10 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -22,6 +24,9 @@ use codex_app_server_protocol::UserInput as V2UserInput;
use tempfile::TempDir;
use tokio::time::timeout;
use super::analytics::enable_analytics_capture;
use super::analytics::wait_for_analytics_event;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
@@ -43,14 +48,20 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
std::fs::create_dir(&working_directory)?;
// Mock server: long-running shell command then (after abort) nothing else needed.
let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep",
)?])
.await;
create_config_toml(&codex_home, &server.uri(), "never")?;
let server =
create_mock_responses_server_sequence_unchecked(vec![create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep",
)?])
.await;
write_mock_responses_config_toml_with_chatgpt_base_url(
&codex_home,
&server.uri(),
&server.uri(),
)?;
enable_analytics_capture(&server, &codex_home).await?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -87,6 +98,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let turn_id = turn.id.clone();
// Give the command a brief moment to start.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
@@ -96,7 +108,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
let interrupt_id = mcp
.send_turn_interrupt_request(TurnInterruptParams {
thread_id: thread_id.clone(),
turn_id: turn.id,
turn_id: turn_id.clone(),
})
.await?;
let interrupt_resp: JSONRPCResponse = timeout(
@@ -119,6 +131,12 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
assert_eq!(completed.thread_id, thread_id);
assert_eq!(completed.turn.status, TurnStatus::Interrupted);
let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?;
assert_eq!(event["event_params"]["thread_id"], thread_id);
assert_eq!(event["event_params"]["turn_id"], turn_id);
assert_eq!(event["event_params"]["status"], "interrupted");
assert_eq!(event["event_params"]["turn_error"], serde_json::Value::Null);
Ok(())
}
@@ -131,7 +149,11 @@ async fn turn_interrupt_resolves_pending_command_approval_request() -> Result<()
"Start-Sleep -Seconds 10".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "10".to_string()];
let shell_command = vec![
"python3".to_string(),
"-c".to_string(),
"import time; time.sleep(10)".to_string(),
];
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");

View File

@@ -1,4 +1,5 @@
use anyhow::Result;
use app_test_support::DEFAULT_CLIENT_NAME;
use app_test_support::McpProcess;
use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_exec_command_sse_response;
@@ -9,6 +10,7 @@ use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::format_with_current_shell_display;
use app_test_support::to_response;
use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url;
use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE;
use codex_app_server::INVALID_PARAMS_ERROR_CODE;
use codex_app_server_protocol::ByteRange;
@@ -64,6 +66,9 @@ use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
use super::analytics::enable_analytics_capture;
use super::analytics::wait_for_analytics_event;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
#[cfg(not(windows))]
@@ -238,6 +243,143 @@ async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml_with_chatgpt_base_url(
codex_home.path(),
&server.uri(),
&server.uri(),
)?;
enable_analytics_capture(&server, codex_home.path()).await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Image {
url: "https://example.com/a.png".to_string(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?;
assert_eq!(event["event_params"]["thread_id"], thread.id);
assert_eq!(event["event_params"]["turn_id"], turn.id);
assert_eq!(
event["event_params"]["product_client_id"],
DEFAULT_CLIENT_NAME
);
assert_eq!(event["event_params"]["model"], "mock-model");
assert_eq!(event["event_params"]["model_provider"], "mock_provider");
assert_eq!(event["event_params"]["sandbox_policy"], "read_only");
assert_eq!(event["event_params"]["num_input_images"], 1);
assert_eq!(event["event_params"]["status"], "completed");
assert!(event["event_params"]["started_at"].as_u64().is_some());
assert!(event["event_params"]["completed_at"].as_u64().is_some());
assert!(event["event_params"]["duration_ms"].as_u64().is_some());
Ok(())
}
#[tokio::test]
async fn turn_start_tracks_failed_turn_event_analytics() -> Result<()> {
let server = create_mock_responses_server_sequence(vec![String::new()]).await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml_with_chatgpt_base_url(
codex_home.path(),
&server.uri(),
&server.uri(),
)?;
enable_analytics_capture(&server, codex_home.path()).await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "trigger failed turn".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let completed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let completed: TurnCompletedNotification = serde_json::from_value(
completed_notif
.params
.expect("turn/completed params must be present"),
)?;
assert_eq!(completed.turn.status, TurnStatus::Failed);
assert!(completed.turn.error.is_some());
let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?;
assert_eq!(event["event_params"]["thread_id"], thread.id);
assert_eq!(event["event_params"]["turn_id"], turn.id);
assert_eq!(event["event_params"]["status"], "failed");
assert_ne!(event["event_params"]["turn_error"], serde_json::Value::Null);
Ok(())
}
#[tokio::test]
async fn turn_start_accepts_text_at_limit_with_mention_item() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];