mirror of
https://github.com/openai/codex.git
synced 2026-05-08 07:11:10 +03:00
Compare commits
1 Commits
main
...
owen/sqlit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9ddf828d4c |
@@ -80,7 +80,7 @@ pub fn build_provider(
|
||||
let service_name = service_name_override.unwrap_or(originator.value.as_str());
|
||||
let runtime_metrics = config.features.enabled(Feature::RuntimeMetrics);
|
||||
|
||||
OtelProvider::from(&OtelSettings {
|
||||
let provider = OtelProvider::from(&OtelSettings {
|
||||
service_name: service_name.to_string(),
|
||||
service_version: service_version.to_string(),
|
||||
codex_home: config.codex_home.to_path_buf(),
|
||||
@@ -91,7 +91,15 @@ pub fn build_provider(
|
||||
runtime_metrics,
|
||||
span_attributes: config.otel.span_attributes.clone(),
|
||||
tracestate: config.otel.tracestate.clone(),
|
||||
})
|
||||
})?;
|
||||
|
||||
if let Some(provider) = provider.as_ref()
|
||||
&& let Some(metrics) = provider.metrics()
|
||||
{
|
||||
let _ = codex_otel::record_process_start_once(metrics, originator.value.as_str());
|
||||
}
|
||||
|
||||
Ok(provider)
|
||||
}
|
||||
|
||||
/// Filter predicate for exporting only Codex-owned events via OTEL.
|
||||
|
||||
@@ -200,10 +200,15 @@ pub async fn run_main(
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_config::types::OtelExporterKind;
|
||||
use codex_config::types::OtelHttpProtocol;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use tempfile::TempDir;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
|
||||
#[test]
|
||||
fn mcp_server_defaults_analytics_to_enabled() {
|
||||
@@ -212,14 +217,21 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn mcp_server_builds_otel_provider_with_logs_traces_and_metrics() -> anyhow::Result<()> {
|
||||
let collector = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.mount(&collector)
|
||||
.await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let mut config = ConfigBuilder::default()
|
||||
.codex_home(codex_home.path().to_path_buf())
|
||||
.build()
|
||||
.await?;
|
||||
let exporter = OtelExporterKind::OtlpGrpc {
|
||||
endpoint: "http://localhost:4317".to_string(),
|
||||
let exporter = OtelExporterKind::OtlpHttp {
|
||||
endpoint: collector.uri(),
|
||||
headers: HashMap::new(),
|
||||
protocol: OtelHttpProtocol::Binary,
|
||||
tls: None,
|
||||
};
|
||||
config.otel.exporter = exporter.clone();
|
||||
|
||||
@@ -41,6 +41,7 @@ use std::time::Duration;
|
||||
use tracing::debug;
|
||||
|
||||
const ENV_ATTRIBUTE: &str = "env";
|
||||
const ARCH_ATTRIBUTE: &str = "arch";
|
||||
const METER_NAME: &str = "codex";
|
||||
const DURATION_UNIT: &str = "ms";
|
||||
const DURATION_DESCRIPTION: &str = "Duration in milliseconds.";
|
||||
@@ -198,13 +199,13 @@ impl MetricsClient {
|
||||
|
||||
validate_tags(&default_tags)?;
|
||||
|
||||
let mut resource_attributes = Vec::with_capacity(4);
|
||||
let mut resource_attributes = Vec::with_capacity(5);
|
||||
resource_attributes.push(KeyValue::new(
|
||||
semconv::attribute::SERVICE_VERSION,
|
||||
service_version,
|
||||
));
|
||||
resource_attributes.push(KeyValue::new(ENV_ATTRIBUTE, environment));
|
||||
resource_attributes.extend(os_resource_attributes());
|
||||
resource_attributes.extend(platform_resource_attributes());
|
||||
|
||||
let resource = Resource::builder()
|
||||
.with_service_name(service_name)
|
||||
@@ -290,12 +291,13 @@ impl MetricsClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn os_resource_attributes() -> Vec<KeyValue> {
|
||||
fn platform_resource_attributes() -> Vec<KeyValue> {
|
||||
let os_info = os_info::get();
|
||||
let os_type_raw = os_info.os_type().to_string();
|
||||
let os_type = sanitize_metric_tag_value(os_type_raw.as_str());
|
||||
let os_version_raw = os_info.version().to_string();
|
||||
let os_version = sanitize_metric_tag_value(os_version_raw.as_str());
|
||||
let arch = sanitize_metric_tag_value(std::env::consts::ARCH);
|
||||
let mut attributes = Vec::new();
|
||||
if os_type != "unspecified" {
|
||||
attributes.push(KeyValue::new("os", os_type));
|
||||
@@ -303,6 +305,9 @@ fn os_resource_attributes() -> Vec<KeyValue> {
|
||||
if os_version != "unspecified" {
|
||||
attributes.push(KeyValue::new("os_version", os_version));
|
||||
}
|
||||
if arch != "unspecified" {
|
||||
attributes.push(KeyValue::new(ARCH_ATTRIBUTE, arch));
|
||||
}
|
||||
attributes
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ mod client;
|
||||
mod config;
|
||||
mod error;
|
||||
pub(crate) mod names;
|
||||
mod process;
|
||||
pub(crate) mod runtime_metrics;
|
||||
pub(crate) mod tags;
|
||||
pub(crate) mod timer;
|
||||
@@ -13,9 +14,12 @@ pub use crate::metrics::config::MetricsConfig;
|
||||
pub use crate::metrics::config::MetricsExporter;
|
||||
pub use crate::metrics::error::MetricsError;
|
||||
pub use crate::metrics::error::Result;
|
||||
pub use crate::metrics::process::record_process_start_once;
|
||||
pub use names::*;
|
||||
use std::sync::OnceLock;
|
||||
pub use tags::ORIGINATOR_TAG;
|
||||
pub use tags::SessionMetricTagValues;
|
||||
pub use tags::bounded_originator_tag_value;
|
||||
|
||||
static GLOBAL_METRICS: OnceLock<MetricsClient> = OnceLock::new();
|
||||
static GLOBAL_STATSIG_METRICS_SETTINGS: OnceLock<StatsigMetricsSettings> = OnceLock::new();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
pub const TOOL_CALL_COUNT_METRIC: &str = "codex.tool.call";
|
||||
pub const TOOL_CALL_DURATION_METRIC: &str = "codex.tool.call.duration_ms";
|
||||
pub const TOOL_CALL_UNIFIED_EXEC_METRIC: &str = "codex.tool.unified_exec";
|
||||
pub const PROCESS_START_METRIC: &str = "codex.process.start";
|
||||
pub const API_CALL_COUNT_METRIC: &str = "codex.api_request";
|
||||
pub const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms";
|
||||
pub const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event";
|
||||
|
||||
27
codex-rs/otel/src/metrics/process.rs
Normal file
27
codex-rs/otel/src/metrics/process.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use super::client::MetricsClient;
|
||||
use super::error::Result;
|
||||
use super::names::PROCESS_START_METRIC;
|
||||
use super::tags::ORIGINATOR_TAG;
|
||||
use super::tags::bounded_originator_tag_value;
|
||||
|
||||
static PROCESS_START_RECORDED: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
/// Record the process start counter at most once for this process.
|
||||
pub fn record_process_start_once(metrics: &MetricsClient, originator: &str) -> Result<bool> {
|
||||
if PROCESS_START_RECORDED
|
||||
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_err()
|
||||
{
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
metrics.counter(
|
||||
PROCESS_START_METRIC,
|
||||
/*inc*/ 1,
|
||||
&[(ORIGINATOR_TAG, bounded_originator_tag_value(originator))],
|
||||
)?;
|
||||
Ok(true)
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::metrics::Result;
|
||||
use crate::metrics::validation::validate_tag_key;
|
||||
use crate::metrics::validation::validate_tag_value;
|
||||
use codex_utils_string::sanitize_metric_tag_value;
|
||||
|
||||
pub const APP_VERSION_TAG: &str = "app.version";
|
||||
pub const AUTH_MODE_TAG: &str = "auth_mode";
|
||||
@@ -9,6 +10,24 @@ pub const ORIGINATOR_TAG: &str = "originator";
|
||||
pub const SERVICE_NAME_TAG: &str = "service_name";
|
||||
pub const SESSION_SOURCE_TAG: &str = "session_source";
|
||||
|
||||
const OTHER_ORIGINATOR_TAG_VALUE: &str = "other";
|
||||
|
||||
/// Returns a sanitized, low-cardinality originator value that is safe to use as a metric tag.
|
||||
pub fn bounded_originator_tag_value(originator: &str) -> &'static str {
|
||||
match sanitize_metric_tag_value(originator).as_str() {
|
||||
"codex_desktop" => "codex_desktop",
|
||||
"codex_cli_rs" => "codex_cli_rs",
|
||||
"codex-tui" => "codex-tui",
|
||||
"codex_vscode" => "codex_vscode",
|
||||
"none" => "none",
|
||||
"codex_exec" => "codex_exec",
|
||||
"codex-cli" => "codex-cli",
|
||||
"codex_sdk_ts" => "codex_sdk_ts",
|
||||
"codex-app-server-sdk" => "codex-app-server-sdk",
|
||||
_ => OTHER_ORIGINATOR_TAG_VALUE,
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SessionMetricTagValues<'a> {
|
||||
pub auth_mode: Option<&'a str>,
|
||||
pub session_source: &'a str,
|
||||
|
||||
@@ -10,6 +10,7 @@ pub(crate) mod metadata;
|
||||
pub(crate) mod policy;
|
||||
pub(crate) mod recorder;
|
||||
pub(crate) mod session_index;
|
||||
pub(crate) mod sqlite_metrics;
|
||||
pub mod state_db;
|
||||
|
||||
pub(crate) mod default_client {
|
||||
|
||||
@@ -1279,6 +1279,7 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
tracing::warn!(
|
||||
"state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path"
|
||||
);
|
||||
crate::sqlite_metrics::record_fallback("find_thread_path", "mismatch");
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::debug!(
|
||||
@@ -1296,6 +1297,7 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
tracing::warn!(
|
||||
"state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path"
|
||||
);
|
||||
crate::sqlite_metrics::record_fallback("find_thread_path", "stale_path");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1323,6 +1325,12 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
tracing::warn!(
|
||||
"state db discrepancy during find_thread_path_by_id_str_in_subdir: falling_back"
|
||||
);
|
||||
let reason = if state_db_ctx.is_some() {
|
||||
"missing_row"
|
||||
} else {
|
||||
"db_unavailable"
|
||||
};
|
||||
crate::sqlite_metrics::record_fallback("find_thread_path", reason);
|
||||
state_db::read_repair_rollout_path(
|
||||
state_db_ctx,
|
||||
thread_id,
|
||||
|
||||
@@ -450,6 +450,7 @@ impl RolloutRecorder {
|
||||
if state_db_ctx.is_none() {
|
||||
// Keep legacy behavior when SQLite is unavailable: return filesystem results
|
||||
// at the requested page size.
|
||||
crate::sqlite_metrics::record_fallback("list_threads", "db_unavailable");
|
||||
return Ok(page_from_filesystem_scan(
|
||||
fs_page,
|
||||
sort_direction,
|
||||
@@ -569,6 +570,7 @@ impl RolloutRecorder {
|
||||
}
|
||||
if listing_has_metadata_filters {
|
||||
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
|
||||
crate::sqlite_metrics::record_fallback("list_threads", "db_error");
|
||||
return Ok(fill_missing_thread_item_metadata_from_state_db(
|
||||
state_db_ctx.as_deref(),
|
||||
page,
|
||||
@@ -578,6 +580,7 @@ impl RolloutRecorder {
|
||||
// If SQLite listing still fails, return the filesystem page rather than failing the list.
|
||||
tracing::error!("Falling back on rollout system");
|
||||
tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back");
|
||||
crate::sqlite_metrics::record_fallback("list_threads", "db_error");
|
||||
Ok(page_from_filesystem_scan(
|
||||
fs_page,
|
||||
sort_direction,
|
||||
|
||||
49
codex-rs/rollout/src/sqlite_metrics.rs
Normal file
49
codex-rs/rollout/src/sqlite_metrics.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_otel::ORIGINATOR_TAG;
|
||||
use codex_otel::bounded_originator_tag_value;
|
||||
use codex_state::DbMetricsRecorder;
|
||||
use codex_state::DbMetricsRecorderHandle;
|
||||
|
||||
use crate::default_client::originator;
|
||||
|
||||
struct OtelDbMetrics {
|
||||
metrics: codex_otel::MetricsClient,
|
||||
originator: &'static str,
|
||||
}
|
||||
|
||||
impl DbMetricsRecorder for OtelDbMetrics {
|
||||
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) {
|
||||
let tags = sqlite_originator_tags(tags, self.originator);
|
||||
let _ = self.metrics.counter(name, inc, &tags);
|
||||
}
|
||||
|
||||
fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) {
|
||||
let tags = sqlite_originator_tags(tags, self.originator);
|
||||
let _ = self.metrics.record_duration(name, duration, &tags);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn global() -> Option<DbMetricsRecorderHandle> {
|
||||
codex_otel::global().map(|metrics| {
|
||||
Arc::new(OtelDbMetrics {
|
||||
metrics,
|
||||
originator: bounded_originator_tag_value(originator().value.as_str()),
|
||||
}) as DbMetricsRecorderHandle
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn record_fallback(caller: &'static str, reason: &'static str) {
|
||||
let metrics = global();
|
||||
codex_state::record_db_fallback_metric(metrics.as_deref(), caller, reason);
|
||||
}
|
||||
|
||||
fn sqlite_originator_tags<'a>(
|
||||
tags: &[(&'a str, &'a str)],
|
||||
originator: &'static str,
|
||||
) -> Vec<(&'a str, &'a str)> {
|
||||
let mut tags = tags.to_vec();
|
||||
tags.push((ORIGINATOR_TAG, originator));
|
||||
tags
|
||||
}
|
||||
@@ -4,6 +4,7 @@ use crate::list::Cursor;
|
||||
use crate::list::SortDirection;
|
||||
use crate::list::ThreadSortKey;
|
||||
use crate::metadata;
|
||||
use crate::sqlite_metrics;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -106,52 +107,80 @@ async fn try_init_with_roots_inner(
|
||||
default_model_provider_id: String,
|
||||
backfill_lease_seconds: Option<i64>,
|
||||
) -> anyhow::Result<StateDbHandle> {
|
||||
let runtime =
|
||||
codex_state::StateRuntime::init(sqlite_home.clone(), default_model_provider_id.clone())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
"failed to initialize state runtime at {}: {err}",
|
||||
sqlite_home.display()
|
||||
)
|
||||
})?;
|
||||
let metrics = sqlite_metrics::global();
|
||||
let runtime = codex_state::StateRuntime::init_with_metrics(
|
||||
sqlite_home.clone(),
|
||||
default_model_provider_id.clone(),
|
||||
metrics.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
"failed to initialize state runtime at {}: {err}",
|
||||
sqlite_home.display()
|
||||
)
|
||||
})?;
|
||||
let backfill_gate_started = Instant::now();
|
||||
let backfill_gate_result = wait_for_startup_backfill(
|
||||
runtime.as_ref(),
|
||||
codex_home.as_path(),
|
||||
default_model_provider_id.as_str(),
|
||||
backfill_lease_seconds,
|
||||
)
|
||||
.await;
|
||||
codex_state::record_db_init_backfill_gate_metric(
|
||||
metrics.as_deref(),
|
||||
backfill_gate_started.elapsed(),
|
||||
&backfill_gate_result,
|
||||
);
|
||||
backfill_gate_result?;
|
||||
Ok(runtime)
|
||||
}
|
||||
|
||||
async fn wait_for_startup_backfill(
|
||||
runtime: &codex_state::StateRuntime,
|
||||
codex_home: &Path,
|
||||
default_model_provider_id: &str,
|
||||
backfill_lease_seconds: Option<i64>,
|
||||
) -> anyhow::Result<()> {
|
||||
let wait_started = Instant::now();
|
||||
let mut reported_wait = false;
|
||||
loop {
|
||||
let backfill_state = runtime.get_backfill_state().await.map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
"failed to read backfill state at {}: {err}",
|
||||
codex_home.display()
|
||||
)
|
||||
})?;
|
||||
let backfill_state = match runtime.get_backfill_state().await {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to read backfill state at {}: {err}",
|
||||
codex_home.display()
|
||||
));
|
||||
}
|
||||
};
|
||||
if backfill_state.status == codex_state::BackfillStatus::Complete {
|
||||
return Ok(runtime);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(backfill_lease_seconds) = backfill_lease_seconds {
|
||||
metadata::backfill_sessions_with_lease(
|
||||
runtime.as_ref(),
|
||||
codex_home.as_path(),
|
||||
default_model_provider_id.as_str(),
|
||||
runtime,
|
||||
codex_home,
|
||||
default_model_provider_id,
|
||||
backfill_lease_seconds,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
metadata::backfill_sessions(
|
||||
runtime.as_ref(),
|
||||
codex_home.as_path(),
|
||||
default_model_provider_id.as_str(),
|
||||
)
|
||||
.await;
|
||||
metadata::backfill_sessions(runtime, codex_home, default_model_provider_id).await;
|
||||
}
|
||||
let backfill_state = runtime.get_backfill_state().await.map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
"failed to read backfill state at {} after startup backfill: {err}",
|
||||
codex_home.display()
|
||||
)
|
||||
})?;
|
||||
let backfill_state = match runtime.get_backfill_state().await {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to read backfill state at {} after startup backfill: {err}",
|
||||
codex_home.display()
|
||||
));
|
||||
}
|
||||
};
|
||||
if backfill_state.status == codex_state::BackfillStatus::Complete {
|
||||
return Ok(runtime);
|
||||
return Ok(());
|
||||
}
|
||||
if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT {
|
||||
return Err(anyhow::anyhow!(
|
||||
@@ -193,22 +222,36 @@ fn emit_startup_warning(message: &str) {
|
||||
/// Unlike [`init`], this helper does not run rollout backfill. It is for
|
||||
/// optional local reads from non-owning contexts such as remote app-server mode.
|
||||
pub async fn get_state_db(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
|
||||
let metrics = sqlite_metrics::global();
|
||||
let state_path = codex_state::state_db_path(config.sqlite_home());
|
||||
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
|
||||
codex_state::record_db_fallback_metric(
|
||||
metrics.as_deref(),
|
||||
"get_state_db",
|
||||
"db_unavailable",
|
||||
);
|
||||
return None;
|
||||
}
|
||||
let runtime = codex_state::StateRuntime::init(
|
||||
let runtime = match codex_state::StateRuntime::init_with_metrics(
|
||||
config.sqlite_home().to_path_buf(),
|
||||
config.model_provider_id().to_string(),
|
||||
metrics.clone(),
|
||||
)
|
||||
.await
|
||||
.ok()?;
|
||||
require_backfill_complete(runtime, config.sqlite_home()).await
|
||||
{
|
||||
Ok(runtime) => runtime,
|
||||
Err(_) => {
|
||||
codex_state::record_db_fallback_metric(metrics.as_deref(), "get_state_db", "db_error");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
require_backfill_complete(runtime, config.sqlite_home(), metrics.as_deref()).await
|
||||
}
|
||||
|
||||
async fn require_backfill_complete(
|
||||
runtime: StateDbHandle,
|
||||
codex_home: &Path,
|
||||
metrics: Option<&dyn codex_state::DbMetricsRecorder>,
|
||||
) -> Option<StateDbHandle> {
|
||||
match runtime.get_backfill_state().await {
|
||||
Ok(state) if state.status == codex_state::BackfillStatus::Complete => Some(runtime),
|
||||
@@ -218,6 +261,7 @@ async fn require_backfill_complete(
|
||||
codex_home.display(),
|
||||
state.status.as_str()
|
||||
);
|
||||
codex_state::record_db_fallback_metric(metrics, "get_state_db", "backfill_incomplete");
|
||||
None
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -225,6 +269,7 @@ async fn require_backfill_complete(
|
||||
"failed to read backfill state at {}: {err}",
|
||||
codex_home.display()
|
||||
);
|
||||
codex_state::record_db_fallback_metric(metrics, "get_state_db", "db_error");
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,12 +10,13 @@ mod migrations;
|
||||
mod model;
|
||||
mod paths;
|
||||
mod runtime;
|
||||
mod telemetry;
|
||||
|
||||
pub use model::LogEntry;
|
||||
pub use model::LogQuery;
|
||||
pub use model::LogRow;
|
||||
pub use model::Phase2JobClaimOutcome;
|
||||
/// Preferred entrypoint: owns configuration and metrics.
|
||||
/// Preferred entrypoint: owns SQLite configuration and optional metrics injection.
|
||||
pub use runtime::StateRuntime;
|
||||
|
||||
/// Low-level storage engine: useful for focused tests.
|
||||
@@ -56,6 +57,8 @@ pub use runtime::logs_db_filename;
|
||||
pub use runtime::logs_db_path;
|
||||
pub use runtime::state_db_filename;
|
||||
pub use runtime::state_db_path;
|
||||
pub use telemetry::DbMetricsRecorder;
|
||||
pub use telemetry::DbMetricsRecorderHandle;
|
||||
|
||||
/// Environment variable for overriding the SQLite state database home directory.
|
||||
pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
|
||||
@@ -71,3 +74,25 @@ pub const DB_ERROR_METRIC: &str = "codex.db.error";
|
||||
pub const DB_METRIC_BACKFILL: &str = "codex.db.backfill";
|
||||
/// Metrics on backfill duration. Tags: [status]
|
||||
pub const DB_METRIC_BACKFILL_DURATION_MS: &str = "codex.db.backfill.duration_ms";
|
||||
/// SQLite startup initialization attempts. Tags: [status, phase, db, error]
|
||||
pub const DB_INIT_METRIC: &str = "codex.sqlite.init.count";
|
||||
/// SQLite startup initialization duration. Tags: [status, phase, db, error]
|
||||
pub const DB_INIT_DURATION_METRIC: &str = "codex.sqlite.init.duration_ms";
|
||||
/// Filesystem fallback after SQLite could not serve a request. Tags: [caller, reason]
|
||||
pub const DB_FALLBACK_METRIC: &str = "codex.sqlite.fallback.count";
|
||||
|
||||
pub fn record_db_fallback_metric(
|
||||
metrics: Option<&dyn DbMetricsRecorder>,
|
||||
caller: &'static str,
|
||||
reason: &'static str,
|
||||
) {
|
||||
telemetry::record_fallback(metrics, caller, reason);
|
||||
}
|
||||
|
||||
pub fn record_db_init_backfill_gate_metric(
|
||||
metrics: Option<&dyn DbMetricsRecorder>,
|
||||
duration: std::time::Duration,
|
||||
result: &anyhow::Result<()>,
|
||||
) {
|
||||
telemetry::record_init_backfill_gate(metrics, duration, result);
|
||||
}
|
||||
|
||||
@@ -27,6 +27,9 @@ use crate::model::datetime_to_epoch_millis;
|
||||
use crate::model::datetime_to_epoch_seconds;
|
||||
use crate::model::epoch_millis_to_datetime;
|
||||
use crate::paths::file_modified_time_utc;
|
||||
use crate::telemetry::DbKind;
|
||||
use crate::telemetry::DbMetricsRecorder;
|
||||
use crate::telemetry::DbMetricsRecorderHandle;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -52,6 +55,7 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tracing::warn;
|
||||
|
||||
mod agent_jobs;
|
||||
@@ -93,8 +97,18 @@ impl StateRuntime {
|
||||
///
|
||||
/// This opens (and migrates) the SQLite databases under `codex_home`,
|
||||
/// keeping logs in a dedicated file to reduce lock contention with the
|
||||
/// rest of the state store.
|
||||
/// rest of the state store. Use [`Self::init_with_metrics`] when the caller
|
||||
/// has a metrics sink to attach.
|
||||
pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result<Arc<Self>> {
|
||||
Self::init_with_metrics(codex_home, default_provider, /*metrics*/ None).await
|
||||
}
|
||||
|
||||
/// Initialize the state runtime with an explicit metrics client.
|
||||
pub async fn init_with_metrics(
|
||||
codex_home: PathBuf,
|
||||
default_provider: String,
|
||||
metrics: Option<DbMetricsRecorderHandle>,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
tokio::fs::create_dir_all(&codex_home).await?;
|
||||
let state_migrator = runtime_state_migrator();
|
||||
let logs_migrator = runtime_logs_migrator();
|
||||
@@ -116,24 +130,45 @@ impl StateRuntime {
|
||||
.await;
|
||||
let state_path = state_db_path(codex_home.as_path());
|
||||
let logs_path = logs_db_path(codex_home.as_path());
|
||||
let pool = match open_state_sqlite(&state_path, &state_migrator).await {
|
||||
let pool = match open_state_sqlite(&state_path, &state_migrator, metrics.as_deref()).await {
|
||||
Ok(db) => Arc::new(db),
|
||||
Err(err) => {
|
||||
warn!("failed to open state db at {}: {err}", state_path.display());
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator).await {
|
||||
let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator, metrics.as_deref()).await
|
||||
{
|
||||
Ok(db) => Arc::new(db),
|
||||
Err(err) => {
|
||||
warn!("failed to open logs db at {}: {err}", logs_path.display());
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
let thread_updated_at_millis: Option<i64> =
|
||||
let started = Instant::now();
|
||||
let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await;
|
||||
crate::telemetry::record_init_result(
|
||||
metrics.as_deref(),
|
||||
DbKind::State,
|
||||
"ensure_backfill_state",
|
||||
started.elapsed(),
|
||||
&backfill_state_result,
|
||||
);
|
||||
backfill_state_result?;
|
||||
let started = Instant::now();
|
||||
let thread_updated_at_millis_result: anyhow::Result<Option<i64>> =
|
||||
sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads")
|
||||
.fetch_one(pool.as_ref())
|
||||
.await?;
|
||||
.await
|
||||
.map_err(anyhow::Error::from);
|
||||
crate::telemetry::record_init_result(
|
||||
metrics.as_deref(),
|
||||
DbKind::State,
|
||||
"post_init_query",
|
||||
started.elapsed(),
|
||||
&thread_updated_at_millis_result,
|
||||
);
|
||||
let thread_updated_at_millis = thread_updated_at_millis_result?;
|
||||
let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0);
|
||||
let runtime = Arc::new(Self {
|
||||
pool,
|
||||
@@ -165,29 +200,90 @@ fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.busy_timeout(Duration::from_secs(5))
|
||||
.log_statements(LevelFilter::Off)
|
||||
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(250))
|
||||
}
|
||||
|
||||
async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
|
||||
async fn open_state_sqlite(
|
||||
path: &Path,
|
||||
migrator: &Migrator,
|
||||
metrics: Option<&dyn DbMetricsRecorder>,
|
||||
) -> anyhow::Result<SqlitePool> {
|
||||
// New state DBs should use incremental auto-vacuum, but retrofitting an
|
||||
// existing DB requires a full VACUUM. Do not attempt that during process
|
||||
// startup: it is maintenance work that can contend with foreground writers.
|
||||
open_sqlite(
|
||||
path,
|
||||
migrator,
|
||||
metrics,
|
||||
DbKind::State,
|
||||
"open_state",
|
||||
"migrate_state",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn open_logs_sqlite(
|
||||
path: &Path,
|
||||
migrator: &Migrator,
|
||||
metrics: Option<&dyn DbMetricsRecorder>,
|
||||
) -> anyhow::Result<SqlitePool> {
|
||||
open_sqlite(
|
||||
path,
|
||||
migrator,
|
||||
metrics,
|
||||
DbKind::Logs,
|
||||
"open_logs",
|
||||
"migrate_logs",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn open_sqlite(
|
||||
path: &Path,
|
||||
migrator: &Migrator,
|
||||
metrics: Option<&dyn DbMetricsRecorder>,
|
||||
db: DbKind,
|
||||
open_phase: &'static str,
|
||||
migrate_phase: &'static str,
|
||||
) -> anyhow::Result<SqlitePool> {
|
||||
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
|
||||
let pool = SqlitePoolOptions::new()
|
||||
let started = Instant::now();
|
||||
let pool_result = SqlitePoolOptions::new()
|
||||
.max_connections(5)
|
||||
.acquire_slow_level(LevelFilter::Warn)
|
||||
.acquire_slow_threshold(Duration::from_millis(250))
|
||||
.connect_with(options)
|
||||
.await?;
|
||||
migrator.run(&pool).await?;
|
||||
.await
|
||||
.map_err(anyhow::Error::from);
|
||||
crate::telemetry::record_init_result(metrics, db, open_phase, started.elapsed(), &pool_result);
|
||||
let pool = pool_result?;
|
||||
let started = Instant::now();
|
||||
let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from);
|
||||
crate::telemetry::record_init_result(
|
||||
metrics,
|
||||
db,
|
||||
migrate_phase,
|
||||
started.elapsed(),
|
||||
&migrate_result,
|
||||
);
|
||||
migrate_result?;
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
|
||||
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
|
||||
let pool = SqlitePoolOptions::new()
|
||||
.max_connections(5)
|
||||
.connect_with(options)
|
||||
.await?;
|
||||
migrator.run(&pool).await?;
|
||||
Ok(pool)
|
||||
async fn ensure_backfill_state_row_in_pool(pool: &sqlx::SqlitePool) -> anyhow::Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
|
||||
VALUES (?, ?, NULL, NULL, ?)
|
||||
ON CONFLICT(id) DO NOTHING
|
||||
"#,
|
||||
)
|
||||
.bind(1_i64)
|
||||
.bind(crate::BackfillStatus::Pending.as_str())
|
||||
.bind(Utc::now().timestamp())
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn db_filename(base_name: &str, version: u32) -> String {
|
||||
@@ -355,9 +451,13 @@ mod tests {
|
||||
strict_pool.close().await;
|
||||
|
||||
let tolerant_migrator = runtime_state_migrator();
|
||||
let tolerant_pool = open_state_sqlite(state_path.as_path(), &tolerant_migrator)
|
||||
.await
|
||||
.expect("runtime migrator should tolerate newer applied migrations");
|
||||
let tolerant_pool = open_state_sqlite(
|
||||
state_path.as_path(),
|
||||
&tolerant_migrator,
|
||||
/*metrics*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("runtime migrator should tolerate newer applied migrations");
|
||||
tolerant_pool.close().await;
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
|
||||
@@ -2,7 +2,6 @@ use super::*;
|
||||
|
||||
impl StateRuntime {
|
||||
pub async fn get_backfill_state(&self) -> anyhow::Result<crate::BackfillState> {
|
||||
self.ensure_backfill_state_row().await?;
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT status, last_watermark, last_success_at
|
||||
|
||||
261
codex-rs/state/src/telemetry.rs
Normal file
261
codex-rs/state/src/telemetry.rs
Normal file
@@ -0,0 +1,261 @@
|
||||
use std::borrow::Cow;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::DB_FALLBACK_METRIC;
|
||||
use crate::DB_INIT_DURATION_METRIC;
|
||||
use crate::DB_INIT_METRIC;
|
||||
|
||||
/// Low-cardinality metrics sink used by the SQLite state runtime.
|
||||
///
|
||||
/// Implementations should ignore recording errors locally. Database operations
|
||||
/// must never fail because telemetry delivery failed.
|
||||
pub trait DbMetricsRecorder: Send + Sync + 'static {
|
||||
/// Increment a counter metric by `inc` with low-cardinality tags.
|
||||
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]);
|
||||
|
||||
/// Record an elapsed duration metric with low-cardinality tags.
|
||||
fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]);
|
||||
}
|
||||
|
||||
/// Shared recorder handle stored by rollout SQLite telemetry plumbing.
|
||||
pub type DbMetricsRecorderHandle = Arc<dyn DbMetricsRecorder>;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub(crate) enum DbKind {
|
||||
State,
|
||||
Logs,
|
||||
}
|
||||
|
||||
impl DbKind {
|
||||
fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::State => "state",
|
||||
Self::Logs => "logs",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn record_init_result<T>(
|
||||
metrics: Option<&dyn DbMetricsRecorder>,
|
||||
db: DbKind,
|
||||
phase: &'static str,
|
||||
duration: Duration,
|
||||
result: &anyhow::Result<T>,
|
||||
) {
|
||||
let outcome = DbOutcomeTags::from_result(result);
|
||||
let tags = [
|
||||
("status", outcome.status),
|
||||
("phase", phase),
|
||||
("db", db.as_str()),
|
||||
("error", outcome.error),
|
||||
];
|
||||
record_counter(metrics, DB_INIT_METRIC, &tags);
|
||||
record_duration(metrics, DB_INIT_DURATION_METRIC, duration, &tags);
|
||||
}
|
||||
|
||||
pub fn record_fallback(
|
||||
metrics: Option<&dyn DbMetricsRecorder>,
|
||||
caller: &'static str,
|
||||
reason: &'static str,
|
||||
) {
|
||||
let tags = [("caller", caller), ("reason", reason)];
|
||||
record_counter(metrics, DB_FALLBACK_METRIC, &tags);
|
||||
}
|
||||
|
||||
/// Record init metrics for the state DB's `"backfill_gate"` phase.
///
/// Thin wrapper over [`record_init_result`] that pins the database to
/// [`DbKind::State`] and the phase tag to `"backfill_gate"`. No-op when
/// `metrics` is `None`.
pub fn record_init_backfill_gate(
    metrics: Option<&dyn DbMetricsRecorder>,
    duration: Duration,
    result: &anyhow::Result<()>,
) {
    record_init_result(metrics, DbKind::State, "backfill_gate", duration, result);
}
|
||||
|
||||
pub(crate) fn classify_error(err: &anyhow::Error) -> &'static str {
|
||||
for cause in err.chain() {
|
||||
if let Some(sqlx_err) = cause.downcast_ref::<sqlx::Error>() {
|
||||
return classify_sqlx_error(sqlx_err);
|
||||
}
|
||||
if cause
|
||||
.downcast_ref::<sqlx::migrate::MigrateError>()
|
||||
.is_some()
|
||||
{
|
||||
return "migration";
|
||||
}
|
||||
if cause.downcast_ref::<serde_json::Error>().is_some() {
|
||||
return "serde";
|
||||
}
|
||||
if cause.downcast_ref::<std::io::Error>().is_some() {
|
||||
return "io";
|
||||
}
|
||||
}
|
||||
|
||||
"unknown"
|
||||
}
|
||||
|
||||
/// Map a SQLite result code (as decimal text) to a low-cardinality label.
///
/// Extended result codes are reduced to their primary code via `& 0xff`
/// (e.g. `2067` -> `19` -> `"constraint"`). Unparseable or unrecognized
/// codes map to `"unknown"`.
pub(crate) fn classify_sqlite_code(code: &str) -> &'static str {
    let Ok(raw) = code.parse::<i32>() else {
        return "unknown";
    };
    // Low byte is the SQLite primary result code.
    match raw & 0xff {
        5 => "busy",
        6 => "locked",
        8 => "readonly",
        10 => "io",
        11 => "corrupt",
        13 => "full",
        14 => "cantopen",
        17 => "schema",
        19 => "constraint",
        _ => "unknown",
    }
}
|
||||
|
||||
/// Tag-value pair derived from an init result: a `status` label
/// (`"success"` / `"failed"`) and an `error` classification
/// (`"none"` for `Ok` results).
struct DbOutcomeTags {
    status: &'static str,
    error: &'static str,
}
|
||||
|
||||
impl DbOutcomeTags {
|
||||
fn from_result<T>(result: &anyhow::Result<T>) -> Self {
|
||||
match result {
|
||||
Ok(_) => Self {
|
||||
status: "success",
|
||||
error: "none",
|
||||
},
|
||||
Err(err) => Self {
|
||||
status: "failed",
|
||||
error: classify_error(err),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn classify_sqlx_error(err: &sqlx::Error) -> &'static str {
|
||||
match err {
|
||||
sqlx::Error::Database(database_error) => {
|
||||
let code = database_error
|
||||
.code()
|
||||
.unwrap_or(Cow::Borrowed("none"))
|
||||
.to_string();
|
||||
classify_sqlite_code(code.as_str())
|
||||
}
|
||||
sqlx::Error::PoolTimedOut => "pool_timeout",
|
||||
sqlx::Error::Io(_) => "io",
|
||||
sqlx::Error::ColumnDecode { source, .. } if source.is::<serde_json::Error>() => "serde",
|
||||
sqlx::Error::Decode(source) if source.is::<serde_json::Error>() => "serde",
|
||||
_ => "unknown",
|
||||
}
|
||||
}
|
||||
|
||||
fn record_counter(metrics: Option<&dyn DbMetricsRecorder>, name: &str, tags: &[(&str, &str)]) {
|
||||
if let Some(metrics) = metrics {
|
||||
metrics.counter(name, /*inc*/ 1, tags);
|
||||
}
|
||||
}
|
||||
|
||||
fn record_duration(
|
||||
metrics: Option<&dyn DbMetricsRecorder>,
|
||||
name: &str,
|
||||
duration: Duration,
|
||||
tags: &[(&str, &str)],
|
||||
) {
|
||||
if let Some(metrics) = metrics {
|
||||
metrics.record_duration(name, duration, tags);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DB_FALLBACK_METRIC;
    use pretty_assertions::assert_eq;
    use std::collections::BTreeMap;
    use std::sync::Mutex;

    /// One counter call captured by [`TestMetrics`].
    #[derive(Debug, Eq, PartialEq)]
    struct MetricEvent {
        name: String,
        tags: BTreeMap<String, String>,
    }

    /// In-memory recorder that stores every counter call for assertions.
    #[derive(Default)]
    struct TestMetrics {
        events: Mutex<Vec<MetricEvent>>,
    }

    impl TestMetrics {
        /// Snapshot of all recorded counter events, in recording order.
        fn events(&self) -> Vec<MetricEvent> {
            self.events
                .lock()
                .expect("metrics lock")
                .iter()
                .map(|recorded| MetricEvent {
                    name: recorded.name.clone(),
                    tags: recorded.tags.clone(),
                })
                .collect()
        }
    }

    impl DbMetricsRecorder for TestMetrics {
        fn counter(&self, name: &str, _inc: i64, tags: &[(&str, &str)]) {
            let event = MetricEvent {
                name: name.to_string(),
                tags: tags_to_map(tags),
            };
            self.events.lock().expect("metrics lock").push(event);
        }

        // Durations are irrelevant to these tests; drop them.
        fn record_duration(&self, _name: &str, _duration: Duration, _tags: &[(&str, &str)]) {}
    }

    /// Convert a tag slice into an order-independent map for comparison.
    fn tags_to_map(tags: &[(&str, &str)]) -> BTreeMap<String, String> {
        tags.iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    #[test]
    fn classifies_sqlite_primary_codes() {
        let cases = [
            ("5", "busy"),
            ("6", "locked"),
            ("14", "cantopen"),
            // Extended code 2067 reduces to primary code 19 (constraint).
            ("2067", "constraint"),
        ];
        for (code, expected) in cases {
            assert_eq!(classify_sqlite_code(code), expected);
        }
    }

    #[test]
    fn classifies_non_sqlite_errors() {
        let io_error = anyhow::Error::new(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "missing",
        ));
        assert_eq!(classify_error(&io_error), "io");

        let serde_error = anyhow::Error::new(
            serde_json::from_str::<serde_json::Value>("not-json").unwrap_err(),
        );
        assert_eq!(classify_error(&serde_error), "serde");

        assert_eq!(classify_error(&anyhow::anyhow!("plain failure")), "unknown");
    }

    #[test]
    fn classifies_sqlx_pool_timeout() {
        let pool_error = anyhow::Error::new(sqlx::Error::PoolTimedOut);
        assert_eq!(classify_error(&pool_error), "pool_timeout");
    }

    #[test]
    fn records_fallback_metric_with_reason() {
        let metrics = TestMetrics::default();

        record_fallback(Some(&metrics), "list_threads", "db_error");

        let expected = vec![MetricEvent {
            name: DB_FALLBACK_METRIC.to_string(),
            tags: BTreeMap::from([
                ("caller".to_string(), "list_threads".to_string()),
                ("reason".to_string(), "db_error".to_string()),
            ]),
        }];
        assert_eq!(metrics.events(), expected);
    }
}
|
||||
Reference in New Issue
Block a user