Compare commits

...

1 Commits

Author SHA1 Message Date
Owen Lin
9ddf828d4c Add SQLite init and fallback telemetry 2026-05-07 19:24:28 -07:00
16 changed files with 631 additions and 64 deletions

View File

@@ -80,7 +80,7 @@ pub fn build_provider(
let service_name = service_name_override.unwrap_or(originator.value.as_str());
let runtime_metrics = config.features.enabled(Feature::RuntimeMetrics);
OtelProvider::from(&OtelSettings {
let provider = OtelProvider::from(&OtelSettings {
service_name: service_name.to_string(),
service_version: service_version.to_string(),
codex_home: config.codex_home.to_path_buf(),
@@ -91,7 +91,15 @@ pub fn build_provider(
runtime_metrics,
span_attributes: config.otel.span_attributes.clone(),
tracestate: config.otel.tracestate.clone(),
})
})?;
if let Some(provider) = provider.as_ref()
&& let Some(metrics) = provider.metrics()
{
let _ = codex_otel::record_process_start_once(metrics, originator.value.as_str());
}
Ok(provider)
}
/// Filter predicate for exporting only Codex-owned events via OTEL.

View File

@@ -200,10 +200,15 @@ pub async fn run_main(
mod tests {
use super::*;
use codex_config::types::OtelExporterKind;
use codex_config::types::OtelHttpProtocol;
use codex_core::config::ConfigBuilder;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use tempfile::TempDir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
#[test]
fn mcp_server_defaults_analytics_to_enabled() {
@@ -212,14 +217,21 @@ mod tests {
#[tokio::test]
async fn mcp_server_builds_otel_provider_with_logs_traces_and_metrics() -> anyhow::Result<()> {
let collector = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(ResponseTemplate::new(200))
.mount(&collector)
.await;
let codex_home = TempDir::new()?;
let mut config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.build()
.await?;
let exporter = OtelExporterKind::OtlpGrpc {
endpoint: "http://localhost:4317".to_string(),
let exporter = OtelExporterKind::OtlpHttp {
endpoint: collector.uri(),
headers: HashMap::new(),
protocol: OtelHttpProtocol::Binary,
tls: None,
};
config.otel.exporter = exporter.clone();

View File

@@ -41,6 +41,7 @@ use std::time::Duration;
use tracing::debug;
const ENV_ATTRIBUTE: &str = "env";
const ARCH_ATTRIBUTE: &str = "arch";
const METER_NAME: &str = "codex";
const DURATION_UNIT: &str = "ms";
const DURATION_DESCRIPTION: &str = "Duration in milliseconds.";
@@ -198,13 +199,13 @@ impl MetricsClient {
validate_tags(&default_tags)?;
let mut resource_attributes = Vec::with_capacity(4);
let mut resource_attributes = Vec::with_capacity(5);
resource_attributes.push(KeyValue::new(
semconv::attribute::SERVICE_VERSION,
service_version,
));
resource_attributes.push(KeyValue::new(ENV_ATTRIBUTE, environment));
resource_attributes.extend(os_resource_attributes());
resource_attributes.extend(platform_resource_attributes());
let resource = Resource::builder()
.with_service_name(service_name)
@@ -290,12 +291,13 @@ impl MetricsClient {
}
}
fn os_resource_attributes() -> Vec<KeyValue> {
fn platform_resource_attributes() -> Vec<KeyValue> {
let os_info = os_info::get();
let os_type_raw = os_info.os_type().to_string();
let os_type = sanitize_metric_tag_value(os_type_raw.as_str());
let os_version_raw = os_info.version().to_string();
let os_version = sanitize_metric_tag_value(os_version_raw.as_str());
let arch = sanitize_metric_tag_value(std::env::consts::ARCH);
let mut attributes = Vec::new();
if os_type != "unspecified" {
attributes.push(KeyValue::new("os", os_type));
@@ -303,6 +305,9 @@ fn os_resource_attributes() -> Vec<KeyValue> {
if os_version != "unspecified" {
attributes.push(KeyValue::new("os_version", os_version));
}
if arch != "unspecified" {
attributes.push(KeyValue::new(ARCH_ATTRIBUTE, arch));
}
attributes
}

View File

@@ -2,6 +2,7 @@ mod client;
mod config;
mod error;
pub(crate) mod names;
mod process;
pub(crate) mod runtime_metrics;
pub(crate) mod tags;
pub(crate) mod timer;
@@ -13,9 +14,12 @@ pub use crate::metrics::config::MetricsConfig;
pub use crate::metrics::config::MetricsExporter;
pub use crate::metrics::error::MetricsError;
pub use crate::metrics::error::Result;
pub use crate::metrics::process::record_process_start_once;
pub use names::*;
use std::sync::OnceLock;
pub use tags::ORIGINATOR_TAG;
pub use tags::SessionMetricTagValues;
pub use tags::bounded_originator_tag_value;
static GLOBAL_METRICS: OnceLock<MetricsClient> = OnceLock::new();
static GLOBAL_STATSIG_METRICS_SETTINGS: OnceLock<StatsigMetricsSettings> = OnceLock::new();

View File

@@ -1,6 +1,7 @@
pub const TOOL_CALL_COUNT_METRIC: &str = "codex.tool.call";
pub const TOOL_CALL_DURATION_METRIC: &str = "codex.tool.call.duration_ms";
pub const TOOL_CALL_UNIFIED_EXEC_METRIC: &str = "codex.tool.unified_exec";
pub const PROCESS_START_METRIC: &str = "codex.process.start";
pub const API_CALL_COUNT_METRIC: &str = "codex.api_request";
pub const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms";
pub const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event";

View File

@@ -0,0 +1,27 @@
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use super::client::MetricsClient;
use super::error::Result;
use super::names::PROCESS_START_METRIC;
use super::tags::ORIGINATOR_TAG;
use super::tags::bounded_originator_tag_value;

/// One-shot latch: flipped to `true` by the first caller that claims it.
static PROCESS_START_RECORDED: AtomicBool = AtomicBool::new(false);

/// Record the process start counter at most once for this process.
///
/// Returns `Ok(true)` when this call performed the recording, `Ok(false)`
/// when an earlier call already claimed it, and an error only if emitting
/// the counter itself failed.
pub fn record_process_start_once(metrics: &MetricsClient, originator: &str) -> Result<bool> {
    // `swap` atomically claims the latch; a `true` previous value means some
    // other call got here first. Relaxed ordering suffices because the flag
    // guards no other shared state.
    let already_recorded = PROCESS_START_RECORDED.swap(true, Ordering::Relaxed);
    if already_recorded {
        return Ok(false);
    }
    let originator_tag = (ORIGINATOR_TAG, bounded_originator_tag_value(originator));
    metrics.counter(PROCESS_START_METRIC, /*inc*/ 1, &[originator_tag])?;
    Ok(true)
}

View File

@@ -1,6 +1,7 @@
use crate::metrics::Result;
use crate::metrics::validation::validate_tag_key;
use crate::metrics::validation::validate_tag_value;
use codex_utils_string::sanitize_metric_tag_value;
pub const APP_VERSION_TAG: &str = "app.version";
pub const AUTH_MODE_TAG: &str = "auth_mode";
@@ -9,6 +10,24 @@ pub const ORIGINATOR_TAG: &str = "originator";
pub const SERVICE_NAME_TAG: &str = "service_name";
pub const SESSION_SOURCE_TAG: &str = "session_source";
const OTHER_ORIGINATOR_TAG_VALUE: &str = "other";
/// Returns a sanitized, low-cardinality originator value that is safe to use as a metric tag.
pub fn bounded_originator_tag_value(originator: &str) -> &'static str {
    // Closed allowlist: any originator outside this set collapses to "other",
    // which keeps the metric tag's cardinality bounded no matter what callers
    // pass in.
    const KNOWN_ORIGINATORS: [&str; 9] = [
        "codex_desktop",
        "codex_cli_rs",
        "codex-tui",
        "codex_vscode",
        "none",
        "codex_exec",
        "codex-cli",
        "codex_sdk_ts",
        "codex-app-server-sdk",
    ];
    let sanitized = sanitize_metric_tag_value(originator);
    KNOWN_ORIGINATORS
        .into_iter()
        .find(|known| *known == sanitized.as_str())
        .unwrap_or(OTHER_ORIGINATOR_TAG_VALUE)
}
pub struct SessionMetricTagValues<'a> {
pub auth_mode: Option<&'a str>,
pub session_source: &'a str,

View File

@@ -10,6 +10,7 @@ pub(crate) mod metadata;
pub(crate) mod policy;
pub(crate) mod recorder;
pub(crate) mod session_index;
pub(crate) mod sqlite_metrics;
pub mod state_db;
pub(crate) mod default_client {

View File

@@ -1279,6 +1279,7 @@ async fn find_thread_path_by_id_str_in_subdir(
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path"
);
crate::sqlite_metrics::record_fallback("find_thread_path", "mismatch");
}
Err(err) => {
tracing::debug!(
@@ -1296,6 +1297,7 @@ async fn find_thread_path_by_id_str_in_subdir(
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path"
);
crate::sqlite_metrics::record_fallback("find_thread_path", "stale_path");
}
}
@@ -1323,6 +1325,12 @@ async fn find_thread_path_by_id_str_in_subdir(
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: falling_back"
);
let reason = if state_db_ctx.is_some() {
"missing_row"
} else {
"db_unavailable"
};
crate::sqlite_metrics::record_fallback("find_thread_path", reason);
state_db::read_repair_rollout_path(
state_db_ctx,
thread_id,

View File

@@ -450,6 +450,7 @@ impl RolloutRecorder {
if state_db_ctx.is_none() {
// Keep legacy behavior when SQLite is unavailable: return filesystem results
// at the requested page size.
crate::sqlite_metrics::record_fallback("list_threads", "db_unavailable");
return Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
@@ -569,6 +570,7 @@ impl RolloutRecorder {
}
if listing_has_metadata_filters {
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
crate::sqlite_metrics::record_fallback("list_threads", "db_error");
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
page,
@@ -578,6 +580,7 @@ impl RolloutRecorder {
// If SQLite listing still fails, return the filesystem page rather than failing the list.
tracing::error!("Falling back on rollout system");
tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back");
crate::sqlite_metrics::record_fallback("list_threads", "db_error");
Ok(page_from_filesystem_scan(
fs_page,
sort_direction,

View File

@@ -0,0 +1,49 @@
use std::sync::Arc;
use std::time::Duration;
use codex_otel::ORIGINATOR_TAG;
use codex_otel::bounded_originator_tag_value;
use codex_state::DbMetricsRecorder;
use codex_state::DbMetricsRecorderHandle;
use crate::default_client::originator;
/// Bridges the state-db metrics trait onto the process-wide OTEL client,
/// stamping every event with this process's bounded originator tag.
struct OtelDbMetrics {
    metrics: codex_otel::MetricsClient,
    originator: &'static str,
}

impl DbMetricsRecorder for OtelDbMetrics {
    fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) {
        let tagged = sqlite_originator_tags(tags, self.originator);
        // Telemetry is best-effort: recording failures are deliberately dropped.
        let _ = self.metrics.counter(name, inc, &tagged);
    }

    fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) {
        let tagged = sqlite_originator_tags(tags, self.originator);
        // Telemetry is best-effort: recording failures are deliberately dropped.
        let _ = self.metrics.record_duration(name, duration, &tagged);
    }
}
/// Build a shared recorder backed by the global OTEL metrics client, if one
/// has been installed for this process.
pub(crate) fn global() -> Option<DbMetricsRecorderHandle> {
    let metrics = codex_otel::global()?;
    let recorder = OtelDbMetrics {
        metrics,
        originator: bounded_originator_tag_value(originator().value.as_str()),
    };
    Some(Arc::new(recorder) as DbMetricsRecorderHandle)
}
/// Emit a SQLite fallback counter for `caller`/`reason` through the global
/// recorder; a no-op when no OTEL metrics client is installed.
pub(crate) fn record_fallback(caller: &'static str, reason: &'static str) {
    codex_state::record_db_fallback_metric(global().as_deref(), caller, reason);
}
/// Copy `tags` and append the originator tag so every SQLite metric this
/// module emits carries it.
fn sqlite_originator_tags<'a>(
    tags: &[(&'a str, &'a str)],
    originator: &'static str,
) -> Vec<(&'a str, &'a str)> {
    let mut combined = Vec::with_capacity(tags.len() + 1);
    combined.extend_from_slice(tags);
    combined.push((ORIGINATOR_TAG, originator));
    combined
}

View File

@@ -4,6 +4,7 @@ use crate::list::Cursor;
use crate::list::SortDirection;
use crate::list::ThreadSortKey;
use crate::metadata;
use crate::sqlite_metrics;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
@@ -106,52 +107,80 @@ async fn try_init_with_roots_inner(
default_model_provider_id: String,
backfill_lease_seconds: Option<i64>,
) -> anyhow::Result<StateDbHandle> {
let runtime =
codex_state::StateRuntime::init(sqlite_home.clone(), default_model_provider_id.clone())
.await
.map_err(|err| {
anyhow::anyhow!(
"failed to initialize state runtime at {}: {err}",
sqlite_home.display()
)
})?;
let metrics = sqlite_metrics::global();
let runtime = codex_state::StateRuntime::init_with_metrics(
sqlite_home.clone(),
default_model_provider_id.clone(),
metrics.clone(),
)
.await
.map_err(|err| {
anyhow::anyhow!(
"failed to initialize state runtime at {}: {err}",
sqlite_home.display()
)
})?;
let backfill_gate_started = Instant::now();
let backfill_gate_result = wait_for_startup_backfill(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
backfill_lease_seconds,
)
.await;
codex_state::record_db_init_backfill_gate_metric(
metrics.as_deref(),
backfill_gate_started.elapsed(),
&backfill_gate_result,
);
backfill_gate_result?;
Ok(runtime)
}
async fn wait_for_startup_backfill(
runtime: &codex_state::StateRuntime,
codex_home: &Path,
default_model_provider_id: &str,
backfill_lease_seconds: Option<i64>,
) -> anyhow::Result<()> {
let wait_started = Instant::now();
let mut reported_wait = false;
loop {
let backfill_state = runtime.get_backfill_state().await.map_err(|err| {
anyhow::anyhow!(
"failed to read backfill state at {}: {err}",
codex_home.display()
)
})?;
let backfill_state = match runtime.get_backfill_state().await {
Ok(state) => state,
Err(err) => {
return Err(anyhow::anyhow!(
"failed to read backfill state at {}: {err}",
codex_home.display()
));
}
};
if backfill_state.status == codex_state::BackfillStatus::Complete {
return Ok(runtime);
return Ok(());
}
if let Some(backfill_lease_seconds) = backfill_lease_seconds {
metadata::backfill_sessions_with_lease(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
runtime,
codex_home,
default_model_provider_id,
backfill_lease_seconds,
)
.await;
} else {
metadata::backfill_sessions(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
)
.await;
metadata::backfill_sessions(runtime, codex_home, default_model_provider_id).await;
}
let backfill_state = runtime.get_backfill_state().await.map_err(|err| {
anyhow::anyhow!(
"failed to read backfill state at {} after startup backfill: {err}",
codex_home.display()
)
})?;
let backfill_state = match runtime.get_backfill_state().await {
Ok(state) => state,
Err(err) => {
return Err(anyhow::anyhow!(
"failed to read backfill state at {} after startup backfill: {err}",
codex_home.display()
));
}
};
if backfill_state.status == codex_state::BackfillStatus::Complete {
return Ok(runtime);
return Ok(());
}
if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT {
return Err(anyhow::anyhow!(
@@ -193,22 +222,36 @@ fn emit_startup_warning(message: &str) {
/// Unlike [`init`], this helper does not run rollout backfill. It is for
/// optional local reads from non-owning contexts such as remote app-server mode.
pub async fn get_state_db(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
let metrics = sqlite_metrics::global();
let state_path = codex_state::state_db_path(config.sqlite_home());
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
codex_state::record_db_fallback_metric(
metrics.as_deref(),
"get_state_db",
"db_unavailable",
);
return None;
}
let runtime = codex_state::StateRuntime::init(
let runtime = match codex_state::StateRuntime::init_with_metrics(
config.sqlite_home().to_path_buf(),
config.model_provider_id().to_string(),
metrics.clone(),
)
.await
.ok()?;
require_backfill_complete(runtime, config.sqlite_home()).await
{
Ok(runtime) => runtime,
Err(_) => {
codex_state::record_db_fallback_metric(metrics.as_deref(), "get_state_db", "db_error");
return None;
}
};
require_backfill_complete(runtime, config.sqlite_home(), metrics.as_deref()).await
}
async fn require_backfill_complete(
runtime: StateDbHandle,
codex_home: &Path,
metrics: Option<&dyn codex_state::DbMetricsRecorder>,
) -> Option<StateDbHandle> {
match runtime.get_backfill_state().await {
Ok(state) if state.status == codex_state::BackfillStatus::Complete => Some(runtime),
@@ -218,6 +261,7 @@ async fn require_backfill_complete(
codex_home.display(),
state.status.as_str()
);
codex_state::record_db_fallback_metric(metrics, "get_state_db", "backfill_incomplete");
None
}
Err(err) => {
@@ -225,6 +269,7 @@ async fn require_backfill_complete(
"failed to read backfill state at {}: {err}",
codex_home.display()
);
codex_state::record_db_fallback_metric(metrics, "get_state_db", "db_error");
None
}
}

View File

@@ -10,12 +10,13 @@ mod migrations;
mod model;
mod paths;
mod runtime;
mod telemetry;
pub use model::LogEntry;
pub use model::LogQuery;
pub use model::LogRow;
pub use model::Phase2JobClaimOutcome;
/// Preferred entrypoint: owns configuration and metrics.
/// Preferred entrypoint: owns SQLite configuration and optional metrics injection.
pub use runtime::StateRuntime;
/// Low-level storage engine: useful for focused tests.
@@ -56,6 +57,8 @@ pub use runtime::logs_db_filename;
pub use runtime::logs_db_path;
pub use runtime::state_db_filename;
pub use runtime::state_db_path;
pub use telemetry::DbMetricsRecorder;
pub use telemetry::DbMetricsRecorderHandle;
/// Environment variable for overriding the SQLite state database home directory.
pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
@@ -71,3 +74,25 @@ pub const DB_ERROR_METRIC: &str = "codex.db.error";
pub const DB_METRIC_BACKFILL: &str = "codex.db.backfill";
/// Metrics on backfill duration. Tags: [status]
pub const DB_METRIC_BACKFILL_DURATION_MS: &str = "codex.db.backfill.duration_ms";
/// SQLite startup initialization attempts. Tags: [status, phase, db, error]
pub const DB_INIT_METRIC: &str = "codex.sqlite.init.count";
/// SQLite startup initialization duration. Tags: [status, phase, db, error]
pub const DB_INIT_DURATION_METRIC: &str = "codex.sqlite.init.duration_ms";
/// Filesystem fallback after SQLite could not serve a request. Tags: [caller, reason]
pub const DB_FALLBACK_METRIC: &str = "codex.sqlite.fallback.count";
pub fn record_db_fallback_metric(
metrics: Option<&dyn DbMetricsRecorder>,
caller: &'static str,
reason: &'static str,
) {
telemetry::record_fallback(metrics, caller, reason);
}
pub fn record_db_init_backfill_gate_metric(
metrics: Option<&dyn DbMetricsRecorder>,
duration: std::time::Duration,
result: &anyhow::Result<()>,
) {
telemetry::record_init_backfill_gate(metrics, duration, result);
}

View File

@@ -27,6 +27,9 @@ use crate::model::datetime_to_epoch_millis;
use crate::model::datetime_to_epoch_seconds;
use crate::model::epoch_millis_to_datetime;
use crate::paths::file_modified_time_utc;
use crate::telemetry::DbKind;
use crate::telemetry::DbMetricsRecorder;
use crate::telemetry::DbMetricsRecorderHandle;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
@@ -52,6 +55,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::time::Duration;
use std::time::Instant;
use tracing::warn;
mod agent_jobs;
@@ -93,8 +97,18 @@ impl StateRuntime {
///
/// This opens (and migrates) the SQLite databases under `codex_home`,
/// keeping logs in a dedicated file to reduce lock contention with the
/// rest of the state store.
/// rest of the state store. Use [`Self::init_with_metrics`] when the caller
/// has a metrics sink to attach.
pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result<Arc<Self>> {
Self::init_with_metrics(codex_home, default_provider, /*metrics*/ None).await
}
/// Initialize the state runtime with an explicit metrics client.
pub async fn init_with_metrics(
codex_home: PathBuf,
default_provider: String,
metrics: Option<DbMetricsRecorderHandle>,
) -> anyhow::Result<Arc<Self>> {
tokio::fs::create_dir_all(&codex_home).await?;
let state_migrator = runtime_state_migrator();
let logs_migrator = runtime_logs_migrator();
@@ -116,24 +130,45 @@ impl StateRuntime {
.await;
let state_path = state_db_path(codex_home.as_path());
let logs_path = logs_db_path(codex_home.as_path());
let pool = match open_state_sqlite(&state_path, &state_migrator).await {
let pool = match open_state_sqlite(&state_path, &state_migrator, metrics.as_deref()).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open state db at {}: {err}", state_path.display());
return Err(err);
}
};
let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator).await {
let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator, metrics.as_deref()).await
{
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open logs db at {}: {err}", logs_path.display());
return Err(err);
}
};
let thread_updated_at_millis: Option<i64> =
let started = Instant::now();
let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await;
crate::telemetry::record_init_result(
metrics.as_deref(),
DbKind::State,
"ensure_backfill_state",
started.elapsed(),
&backfill_state_result,
);
backfill_state_result?;
let started = Instant::now();
let thread_updated_at_millis_result: anyhow::Result<Option<i64>> =
sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads")
.fetch_one(pool.as_ref())
.await?;
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
metrics.as_deref(),
DbKind::State,
"post_init_query",
started.elapsed(),
&thread_updated_at_millis_result,
);
let thread_updated_at_millis = thread_updated_at_millis_result?;
let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0);
let runtime = Arc::new(Self {
pool,
@@ -165,29 +200,90 @@ fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
.synchronous(SqliteSynchronous::Normal)
.busy_timeout(Duration::from_secs(5))
.log_statements(LevelFilter::Off)
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(250))
}
async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
async fn open_state_sqlite(
path: &Path,
migrator: &Migrator,
metrics: Option<&dyn DbMetricsRecorder>,
) -> anyhow::Result<SqlitePool> {
// New state DBs should use incremental auto-vacuum, but retrofitting an
// existing DB requires a full VACUUM. Do not attempt that during process
// startup: it is maintenance work that can contend with foreground writers.
open_sqlite(
path,
migrator,
metrics,
DbKind::State,
"open_state",
"migrate_state",
)
.await
}
async fn open_logs_sqlite(
path: &Path,
migrator: &Migrator,
metrics: Option<&dyn DbMetricsRecorder>,
) -> anyhow::Result<SqlitePool> {
open_sqlite(
path,
migrator,
metrics,
DbKind::Logs,
"open_logs",
"migrate_logs",
)
.await
}
async fn open_sqlite(
path: &Path,
migrator: &Migrator,
metrics: Option<&dyn DbMetricsRecorder>,
db: DbKind,
open_phase: &'static str,
migrate_phase: &'static str,
) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
let started = Instant::now();
let pool_result = SqlitePoolOptions::new()
.max_connections(5)
.acquire_slow_level(LevelFilter::Warn)
.acquire_slow_threshold(Duration::from_millis(250))
.connect_with(options)
.await?;
migrator.run(&pool).await?;
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(metrics, db, open_phase, started.elapsed(), &pool_result);
let pool = pool_result?;
let started = Instant::now();
let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
metrics,
db,
migrate_phase,
started.elapsed(),
&migrate_result,
);
migrate_result?;
Ok(pool)
}
async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
.await?;
migrator.run(&pool).await?;
Ok(pool)
async fn ensure_backfill_state_row_in_pool(pool: &sqlx::SqlitePool) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
VALUES (?, ?, NULL, NULL, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
.bind(1_i64)
.bind(crate::BackfillStatus::Pending.as_str())
.bind(Utc::now().timestamp())
.execute(pool)
.await?;
Ok(())
}
fn db_filename(base_name: &str, version: u32) -> String {
@@ -355,9 +451,13 @@ mod tests {
strict_pool.close().await;
let tolerant_migrator = runtime_state_migrator();
let tolerant_pool = open_state_sqlite(state_path.as_path(), &tolerant_migrator)
.await
.expect("runtime migrator should tolerate newer applied migrations");
let tolerant_pool = open_state_sqlite(
state_path.as_path(),
&tolerant_migrator,
/*metrics*/ None,
)
.await
.expect("runtime migrator should tolerate newer applied migrations");
tolerant_pool.close().await;
let _ = tokio::fs::remove_dir_all(codex_home).await;

View File

@@ -2,7 +2,6 @@ use super::*;
impl StateRuntime {
pub async fn get_backfill_state(&self) -> anyhow::Result<crate::BackfillState> {
self.ensure_backfill_state_row().await?;
let row = sqlx::query(
r#"
SELECT status, last_watermark, last_success_at

View File

@@ -0,0 +1,261 @@
use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;
use crate::DB_FALLBACK_METRIC;
use crate::DB_INIT_DURATION_METRIC;
use crate::DB_INIT_METRIC;
/// Low-cardinality metrics sink used by the SQLite state runtime.
///
/// Implementations should ignore recording errors locally. Database operations
/// must never fail because telemetry delivery failed.
///
/// Tag slices are `(key, value)` pairs; keep both sides low-cardinality.
pub trait DbMetricsRecorder: Send + Sync + 'static {
    /// Increment a counter metric by `inc` with low-cardinality tags.
    fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]);
    /// Record an elapsed duration metric with low-cardinality tags.
    fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]);
}
/// Shared recorder handle stored by rollout SQLite telemetry plumbing.
pub type DbMetricsRecorderHandle = Arc<dyn DbMetricsRecorder>;
/// Which SQLite database an init-phase metric refers to.
#[derive(Clone, Copy)]
pub(crate) enum DbKind {
    State,
    Logs,
}
impl DbKind {
    /// Stable, low-cardinality value for the `db` metric tag.
    fn as_str(self) -> &'static str {
        match self {
            Self::State => "state",
            Self::Logs => "logs",
        }
    }
}
pub(crate) fn record_init_result<T>(
metrics: Option<&dyn DbMetricsRecorder>,
db: DbKind,
phase: &'static str,
duration: Duration,
result: &anyhow::Result<T>,
) {
let outcome = DbOutcomeTags::from_result(result);
let tags = [
("status", outcome.status),
("phase", phase),
("db", db.as_str()),
("error", outcome.error),
];
record_counter(metrics, DB_INIT_METRIC, &tags);
record_duration(metrics, DB_INIT_DURATION_METRIC, duration, &tags);
}
/// Count a filesystem fallback taken after SQLite could not serve `caller`,
/// tagged with the (static, low-cardinality) caller and reason.
pub fn record_fallback(
    metrics: Option<&dyn DbMetricsRecorder>,
    caller: &'static str,
    reason: &'static str,
) {
    record_counter(
        metrics,
        DB_FALLBACK_METRIC,
        &[("caller", caller), ("reason", reason)],
    );
}
/// Record the startup backfill-gate outcome under the shared init metrics.
///
/// Thin wrapper over [`record_init_result`] pinned to the state DB and the
/// "backfill_gate" phase tag.
pub fn record_init_backfill_gate(
    metrics: Option<&dyn DbMetricsRecorder>,
    duration: Duration,
    result: &anyhow::Result<()>,
) {
    record_init_result(metrics, DbKind::State, "backfill_gate", duration, result);
}
pub(crate) fn classify_error(err: &anyhow::Error) -> &'static str {
for cause in err.chain() {
if let Some(sqlx_err) = cause.downcast_ref::<sqlx::Error>() {
return classify_sqlx_error(sqlx_err);
}
if cause
.downcast_ref::<sqlx::migrate::MigrateError>()
.is_some()
{
return "migration";
}
if cause.downcast_ref::<serde_json::Error>().is_some() {
return "serde";
}
if cause.downcast_ref::<std::io::Error>().is_some() {
return "io";
}
}
"unknown"
}
/// Map a SQLite result code (as its decimal string) onto a low-cardinality
/// name. Extended codes are reduced to their primary code via the low byte,
/// so e.g. "2067" (SQLITE_CONSTRAINT_UNIQUE) classifies as "constraint".
pub(crate) fn classify_sqlite_code(code: &str) -> &'static str {
    match code.parse::<i32>() {
        Ok(parsed) => match parsed & 0xff {
            5 => "busy",
            6 => "locked",
            8 => "readonly",
            10 => "io",
            11 => "corrupt",
            13 => "full",
            14 => "cantopen",
            17 => "schema",
            19 => "constraint",
            _ => "unknown",
        },
        // Non-numeric codes (including the "none" placeholder) are unknown.
        Err(_) => "unknown",
    }
}
/// Status/error tag pair derived from an init-phase result.
struct DbOutcomeTags {
    status: &'static str,
    error: &'static str,
}

impl DbOutcomeTags {
    /// Map a result onto low-cardinality status and error-class tags.
    fn from_result<T>(result: &anyhow::Result<T>) -> Self {
        if let Err(err) = result {
            Self {
                status: "failed",
                error: classify_error(err),
            }
        } else {
            Self {
                status: "success",
                error: "none",
            }
        }
    }
}
/// Map a `sqlx` error onto a low-cardinality error-class tag value.
///
/// Database errors are classified by their SQLite result code; pool, I/O and
/// serde-decode failures get their own buckets; everything else is "unknown".
fn classify_sqlx_error(err: &sqlx::Error) -> &'static str {
    match err {
        sqlx::Error::Database(database_error) => {
            // Borrow the driver's code when available; using the `Cow`
            // directly avoids the intermediate `String` allocation the
            // classification never needed.
            let code = database_error.code().unwrap_or(Cow::Borrowed("none"));
            classify_sqlite_code(code.as_ref())
        }
        sqlx::Error::PoolTimedOut => "pool_timeout",
        sqlx::Error::Io(_) => "io",
        sqlx::Error::ColumnDecode { source, .. } if source.is::<serde_json::Error>() => "serde",
        sqlx::Error::Decode(source) if source.is::<serde_json::Error>() => "serde",
        _ => "unknown",
    }
}
/// Forward a unit counter to the recorder when one is installed; otherwise a no-op.
fn record_counter(metrics: Option<&dyn DbMetricsRecorder>, name: &str, tags: &[(&str, &str)]) {
    let Some(recorder) = metrics else { return };
    recorder.counter(name, /*inc*/ 1, tags);
}
/// Forward a duration sample to the recorder when one is installed; otherwise a no-op.
fn record_duration(
    metrics: Option<&dyn DbMetricsRecorder>,
    name: &str,
    duration: Duration,
    tags: &[(&str, &str)],
) {
    let Some(recorder) = metrics else { return };
    recorder.record_duration(name, duration, tags);
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DB_FALLBACK_METRIC;
    use pretty_assertions::assert_eq;
    use std::collections::BTreeMap;
    use std::sync::Mutex;

    // In-memory recorder that captures counter events for assertions.
    #[derive(Default)]
    struct TestMetrics {
        events: Mutex<Vec<MetricEvent>>,
    }

    #[derive(Debug, Eq, PartialEq)]
    struct MetricEvent {
        name: String,
        tags: BTreeMap<String, String>,
    }

    impl TestMetrics {
        // Snapshot the captured events (cloned so the lock is released before
        // the caller asserts on the result).
        fn events(&self) -> Vec<MetricEvent> {
            self.events
                .lock()
                .expect("metrics lock")
                .iter()
                .map(|event| MetricEvent {
                    name: event.name.clone(),
                    tags: event.tags.clone(),
                })
                .collect()
        }
    }

    impl DbMetricsRecorder for TestMetrics {
        fn counter(&self, name: &str, _inc: i64, tags: &[(&str, &str)]) {
            self.events.lock().expect("metrics lock").push(MetricEvent {
                name: name.to_string(),
                tags: tags_to_map(tags),
            });
        }
        // Durations are irrelevant to these tests; drop them.
        fn record_duration(&self, _name: &str, _duration: Duration, _tags: &[(&str, &str)]) {}
    }

    // Collect tag pairs into an ordered map for stable comparisons.
    fn tags_to_map(tags: &[(&str, &str)]) -> BTreeMap<String, String> {
        tags.iter()
            .map(|(key, value)| ((*key).to_string(), (*value).to_string()))
            .collect()
    }

    #[test]
    fn classifies_sqlite_primary_codes() {
        assert_eq!(classify_sqlite_code("5"), "busy");
        assert_eq!(classify_sqlite_code("6"), "locked");
        assert_eq!(classify_sqlite_code("14"), "cantopen");
        // Extended code 2067 reduces to primary code 19 (constraint).
        assert_eq!(classify_sqlite_code("2067"), "constraint");
    }

    #[test]
    fn classifies_non_sqlite_errors() {
        let io_error =
            anyhow::Error::new(std::io::Error::new(std::io::ErrorKind::NotFound, "missing"));
        assert_eq!(classify_error(&io_error), "io");
        let serde_error =
            anyhow::Error::new(serde_json::from_str::<serde_json::Value>("not-json").unwrap_err());
        assert_eq!(classify_error(&serde_error), "serde");
        let unknown_error = anyhow::anyhow!("plain failure");
        assert_eq!(classify_error(&unknown_error), "unknown");
    }

    #[test]
    fn classifies_sqlx_pool_timeout() {
        let err = anyhow::Error::new(sqlx::Error::PoolTimedOut);
        assert_eq!(classify_error(&err), "pool_timeout");
    }

    #[test]
    fn records_fallback_metric_with_reason() {
        let metrics = TestMetrics::default();
        record_fallback(Some(&metrics), "list_threads", "db_error");
        assert_eq!(
            metrics.events(),
            vec![MetricEvent {
                name: DB_FALLBACK_METRIC.to_string(),
                tags: BTreeMap::from([
                    ("caller".to_string(), "list_threads".to_string()),
                    ("reason".to_string(), "db_error".to_string()),
                ]),
            }]
        );
    }
}