Compare commits

...

1 Commit

Author SHA1 Message Date
jif-oai
2d5a5a1248 Add process-scoped SQLite telemetry
Co-authored-by: Codex <noreply@openai.com>
2026-05-11 14:36:31 +01:00
22 changed files with 556 additions and 128 deletions

View File

@@ -109,6 +109,7 @@ pub use crate::transport::auth::AppServerWebsocketAuthSettings;
pub use crate::transport::auth::WebsocketAuthCliMode;
const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT";
const OTEL_SERVICE_NAME: &str = "codex-app-server";
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum LogFormat {
@@ -512,6 +513,20 @@ pub async fn run_main_with_transport_options(
}
};
let otel = codex_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
Some(OTEL_SERVICE_NAME),
default_analytics_enabled,
)
.map_err(|e| {
std::io::Error::new(
ErrorKind::InvalidData,
format!("error loading otel config: {e}"),
)
})?;
codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME);
codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME);
let state_db_result = rollout_state_db::try_init(&config).await;
let state_db_init_error = state_db_result.as_ref().err().map(ToString::to_string);
let state_db = state_db_result.ok();
@@ -591,19 +606,6 @@ pub async fn run_main_with_transport_options(
let feedback = CodexFeedback::new();
let otel = codex_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
Some("codex-app-server"),
default_analytics_enabled,
)
.map_err(|e| {
std::io::Error::new(
ErrorKind::InvalidData,
format!("error loading otel config: {e}"),
)
})?;
// Install a simple subscriber so `tracing` output is visible. Users can
// control the log level with `RUST_LOG` and switch to JSON logs with
// `LOG_FORMAT=json`.

View File

@@ -65,7 +65,6 @@ use codex_arg0::Arg0DispatchPaths;
use codex_chatgpt::workspace_settings;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
@@ -298,7 +297,7 @@ impl MessageProcessor {
// The thread store is intentionally process-scoped. Config reloads can
// affect per-thread behavior, but they must not move newly started,
// resumed, or forked threads to a different persistence backend/root.
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),

View File

@@ -277,7 +277,6 @@ use codex_core::exec::ExecCapturePolicy;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::find_thread_path_by_id_str;
use codex_core::path_utils;
#[cfg(test)]
use codex_core::read_head_for_summary;

View File

@@ -107,7 +107,7 @@ impl ThreadGoalRequestProcessor {
"ephemeral thread does not support goals: {thread_id}"
))
})?,
None => find_thread_path_by_id_str(
None => codex_rollout::find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
self.state_db.as_deref(),
@@ -272,7 +272,7 @@ impl ThreadGoalRequestProcessor {
"ephemeral thread does not support goals: {thread_id}"
))
})?,
None => find_thread_path_by_id_str(
None => codex_rollout::find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
self.state_db.as_deref(),
@@ -336,7 +336,7 @@ impl ThreadGoalRequestProcessor {
return Ok(state_db);
}
} else {
find_thread_path_by_id_str(
codex_rollout::find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
self.state_db.as_deref(),

View File

@@ -99,3 +99,18 @@ pub fn build_provider(
pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool {
meta.target().starts_with("codex_otel")
}
pub fn record_process_start(otel: Option<&OtelProvider>, originator: &str) {
let Some(metrics) = otel.and_then(OtelProvider::metrics) else {
return;
};
let _ = codex_otel::record_process_start_once(metrics, originator);
}
pub fn install_sqlite_telemetry(otel: Option<&OtelProvider>, originator: &str) {
let Some(metrics) = otel.and_then(OtelProvider::metrics) else {
return;
};
let telemetry = codex_rollout::sqlite_telemetry_recorder(metrics.clone(), originator);
let _ = codex_state::install_process_db_telemetry(telemetry);
}

View File

@@ -478,6 +478,8 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
None
}
};
codex_core::otel_init::record_process_start(otel.as_ref(), "codex_exec");
codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), "codex_exec");
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());

View File

@@ -74,19 +74,6 @@ pub async fn run_main(
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
})?;
set_default_client_residency_requirement(config.enforce_residency.value());
let state_db = codex_core::init_state_db(&config).await;
let environment_manager = Arc::new(
EnvironmentManager::from_codex_home(
config.codex_home.clone(),
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
)
.await
.map_err(std::io::Error::other)?,
);
let otel = codex_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
@@ -99,6 +86,20 @@ pub async fn run_main(
format!("error loading otel config: {e}"),
)
})?;
codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME);
codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME);
let state_db = codex_core::init_state_db(&config).await;
let environment_manager = Arc::new(
EnvironmentManager::from_codex_home(
config.codex_home.clone(),
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
)
.await
.map_err(std::io::Error::other)?,
);
let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)

View File

@@ -5,7 +5,6 @@ use codex_arg0::Arg0DispatchPaths;
use codex_core::StateDbHandle;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
use codex_extension_api::empty_extension_registry;
use codex_login::AuthManager;
@@ -71,7 +70,7 @@ impl MessageProcessor {
environment_manager,
empty_extension_registry(),
/*analytics_events_client*/ None,
thread_store_from_config(config.as_ref(), state_db.clone()),
codex_core::thread_store_from_config(config.as_ref(), state_db.clone()),
state_db.clone(),
installation_id,
/*attestation_provider*/ None,

View File

@@ -2,6 +2,7 @@ mod client;
mod config;
mod error;
pub(crate) mod names;
mod process;
pub(crate) mod runtime_metrics;
pub(crate) mod tags;
pub(crate) mod timer;
@@ -13,9 +14,12 @@ pub use crate::metrics::config::MetricsConfig;
pub use crate::metrics::config::MetricsExporter;
pub use crate::metrics::error::MetricsError;
pub use crate::metrics::error::Result;
pub use crate::metrics::process::record_process_start_once;
pub use names::*;
use std::sync::OnceLock;
pub use tags::ORIGINATOR_TAG;
pub use tags::SessionMetricTagValues;
pub use tags::bounded_originator_tag_value;
static GLOBAL_METRICS: OnceLock<MetricsClient> = OnceLock::new();
static GLOBAL_STATSIG_METRICS_SETTINGS: OnceLock<StatsigMetricsSettings> = OnceLock::new();

View File

@@ -1,6 +1,7 @@
pub const TOOL_CALL_COUNT_METRIC: &str = "codex.tool.call";
pub const TOOL_CALL_DURATION_METRIC: &str = "codex.tool.call.duration_ms";
pub const TOOL_CALL_UNIFIED_EXEC_METRIC: &str = "codex.tool.unified_exec";
pub const PROCESS_START_METRIC: &str = "codex.process.start";
pub const API_CALL_COUNT_METRIC: &str = "codex.api_request";
pub const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms";
pub const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event";

View File

@@ -0,0 +1,27 @@
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use super::client::MetricsClient;
use super::error::Result;
use super::names::PROCESS_START_METRIC;
use super::tags::ORIGINATOR_TAG;
use super::tags::bounded_originator_tag_value;
static PROCESS_START_RECORDED: AtomicBool = AtomicBool::new(false);
/// Record the process start counter at most once for this process.
pub fn record_process_start_once(metrics: &MetricsClient, originator: &str) -> Result<bool> {
if PROCESS_START_RECORDED
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
return Ok(false);
}
metrics.counter(
PROCESS_START_METRIC,
/*inc*/ 1,
&[(ORIGINATOR_TAG, bounded_originator_tag_value(originator))],
)?;
Ok(true)
}

View File

@@ -1,6 +1,7 @@
use crate::metrics::Result;
use crate::metrics::validation::validate_tag_key;
use crate::metrics::validation::validate_tag_value;
use codex_utils_string::sanitize_metric_tag_value;
pub const APP_VERSION_TAG: &str = "app.version";
pub const AUTH_MODE_TAG: &str = "auth_mode";
@@ -9,6 +10,31 @@ pub const ORIGINATOR_TAG: &str = "originator";
pub const SERVICE_NAME_TAG: &str = "service_name";
pub const SESSION_SOURCE_TAG: &str = "session_source";
const OTHER_ORIGINATOR_TAG_VALUE: &str = "other";
const KNOWN_ORIGINATOR_TAG_VALUES: &[&str] = &[
"codex_desktop",
"codex-app-server",
"codex_mcp_server",
"codex_cli_rs",
"codex-tui",
"codex_vscode",
"none",
"codex_exec",
"codex-cli",
"codex_sdk_ts",
"codex-app-server-sdk",
];
/// Return a known low-cardinality originator tag value, or `other`.
pub fn bounded_originator_tag_value(originator: &str) -> &'static str {
let sanitized = sanitize_metric_tag_value(originator);
KNOWN_ORIGINATOR_TAG_VALUES
.iter()
.copied()
.find(|known| *known == sanitized.as_str())
.unwrap_or(OTHER_ORIGINATOR_TAG_VALUE)
}
pub struct SessionMetricTagValues<'a> {
pub auth_mode: Option<&'a str>,
pub session_source: &'a str,

View File

@@ -10,6 +10,7 @@ pub(crate) mod metadata;
pub(crate) mod policy;
pub(crate) mod recorder;
pub(crate) mod session_index;
mod sqlite_metrics;
pub mod state_db;
pub(crate) mod default_client {
@@ -63,6 +64,7 @@ pub use session_index::find_thread_meta_by_name_str;
pub use session_index::find_thread_name_by_id;
pub use session_index::find_thread_names_by_ids;
pub use state_db::StateDbHandle;
pub use state_db::sqlite_telemetry_recorder;
#[cfg(test)]
mod tests;

View File

@@ -1255,47 +1255,57 @@ async fn find_thread_path_by_id_str_in_subdir(
};
let thread_id = ThreadId::from_string(id_str).ok();
let mut unverified_db_path = None;
let mut fallback_reason = state_db_ctx.is_none().then_some("db_unavailable");
if let Some(state_db_ctx) = state_db_ctx
&& let Some(thread_id) = thread_id
&& let Some(db_path) = state_db::find_rollout_path_by_id(
Some(state_db_ctx),
thread_id,
archived_only,
"find_path_query",
)
.await
{
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
match read_session_meta_line(&db_path).await {
Ok(meta_line) if meta_line.meta.id == thread_id => {
return Ok(Some(db_path));
}
Ok(meta_line) => {
match state_db_ctx
.find_rollout_path_by_id(thread_id, archived_only)
.await
{
Ok(Some(db_path)) => {
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
match read_session_meta_line(&db_path).await {
Ok(meta_line) if meta_line.meta.id == thread_id => {
return Ok(Some(db_path));
}
Ok(meta_line) => {
tracing::error!(
"state db returned rollout path for thread {id_str} but file belongs to thread {}: {}",
meta_line.meta.id,
db_path.display()
);
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path"
);
codex_state::record_fallback(None, "find_thread_path", "mismatch");
}
Err(err) => {
tracing::debug!(
"state db returned rollout path for thread {id_str} that could not be verified: {}: {err}",
db_path.display()
);
unverified_db_path = Some(db_path);
}
}
} else {
tracing::error!(
"state db returned rollout path for thread {id_str} but file belongs to thread {}: {}",
meta_line.meta.id,
"state db returned stale rollout path for thread {id_str}: {}",
db_path.display()
);
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path"
"state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path"
);
}
Err(err) => {
tracing::debug!(
"state db returned rollout path for thread {id_str} that could not be verified: {}: {err}",
db_path.display()
);
unverified_db_path = Some(db_path);
codex_state::record_fallback(None, "find_thread_path", "stale_path");
}
}
} else {
tracing::error!(
"state db returned stale rollout path for thread {id_str}: {}",
db_path.display()
);
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path"
);
Ok(None) => fallback_reason = Some("missing_row"),
Err(err) => {
tracing::warn!(
"state db find_rollout_path_by_id failed during find_path_query: {err}"
);
fallback_reason = Some("db_error");
}
}
}
@@ -1323,6 +1333,9 @@ async fn find_thread_path_by_id_str_in_subdir(
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: falling_back"
);
if let Some(reason) = fallback_reason {
codex_state::record_fallback(None, "find_thread_path", reason);
}
state_db::read_repair_rollout_path(
state_db_ctx,
thread_id,

View File

@@ -450,6 +450,7 @@ impl RolloutRecorder {
if state_db_ctx.is_none() {
// Keep legacy behavior when SQLite is unavailable: return filesystem results
// at the requested page size.
codex_state::record_fallback(None, "list_threads", "db_unavailable");
return Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
@@ -558,6 +559,7 @@ impl RolloutRecorder {
)
.await;
}
codex_state::record_fallback(None, "list_threads", "metadata_filter");
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
@@ -569,6 +571,7 @@ impl RolloutRecorder {
}
if listing_has_metadata_filters {
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
codex_state::record_fallback(None, "list_threads", "db_error");
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
page,
@@ -578,6 +581,7 @@ impl RolloutRecorder {
// If SQLite listing still fails, return the filesystem page rather than failing the list.
tracing::error!("Falling back on rollout system");
tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back");
codex_state::record_fallback(None, "list_threads", "db_error");
Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
@@ -601,6 +605,7 @@ impl RolloutRecorder {
) -> std::io::Result<Option<PathBuf>> {
let codex_home = config.codex_home();
let cwd_filter = filter_cwd.map(Path::to_path_buf);
let mut fallback_reason = state_db_ctx.is_none().then_some("db_unavailable");
if state_db_ctx.is_some() {
let mut db_cursor = cursor.cloned();
loop {
@@ -619,6 +624,7 @@ impl RolloutRecorder {
)
.await
else {
fallback_reason = Some("db_error");
break;
};
if let Some(path) =
@@ -628,10 +634,14 @@ impl RolloutRecorder {
}
db_cursor = db_page.next_anchor.map(Into::into);
if db_cursor.is_none() {
fallback_reason = Some("missing_row");
break;
}
}
}
if let Some(reason) = fallback_reason {
codex_state::record_fallback(None, "find_latest_thread_path", reason);
}
let mut cursor = cursor.cloned();
loop {

View File

@@ -0,0 +1,40 @@
use std::sync::Arc;
use std::time::Duration;
use codex_otel::ORIGINATOR_TAG;
use codex_otel::bounded_originator_tag_value;
use codex_state::DbTelemetry;
use codex_state::DbTelemetryHandle;
struct OtelSqliteTelemetry {
metrics: codex_otel::MetricsClient,
originator: &'static str,
}
impl DbTelemetry for OtelSqliteTelemetry {
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) {
let tags = with_originator(tags, self.originator);
let _ = self.metrics.counter(name, inc, &tags);
}
fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) {
let tags = with_originator(tags, self.originator);
let _ = self.metrics.record_duration(name, duration, &tags);
}
}
pub(crate) fn recorder(metrics: codex_otel::MetricsClient, originator: &str) -> DbTelemetryHandle {
Arc::new(OtelSqliteTelemetry {
metrics,
originator: bounded_originator_tag_value(originator),
})
}
fn with_originator<'a>(
tags: &[(&'a str, &'a str)],
originator: &'static str,
) -> Vec<(&'a str, &'a str)> {
let mut tags = tags.to_vec();
tags.push((ORIGINATOR_TAG, originator));
tags
}

View File

@@ -4,6 +4,7 @@ use crate::list::Cursor;
use crate::list::SortDirection;
use crate::list::ThreadSortKey;
use crate::metadata;
use crate::sqlite_metrics;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
@@ -115,6 +116,25 @@ async fn try_init_with_roots_inner(
sqlite_home.display()
)
})?;
let backfill_gate_started = Instant::now();
let backfill_gate_result = wait_for_backfill_gate(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
backfill_lease_seconds,
)
.await;
codex_state::record_backfill_gate(None, backfill_gate_started.elapsed(), &backfill_gate_result);
backfill_gate_result?;
Ok(runtime)
}
async fn wait_for_backfill_gate(
runtime: &codex_state::StateRuntime,
codex_home: &Path,
default_model_provider_id: &str,
backfill_lease_seconds: Option<i64>,
) -> anyhow::Result<()> {
let wait_started = Instant::now();
let mut reported_wait = false;
loop {
@@ -125,24 +145,19 @@ async fn try_init_with_roots_inner(
)
})?;
if backfill_state.status == codex_state::BackfillStatus::Complete {
return Ok(runtime);
return Ok(());
}
if let Some(backfill_lease_seconds) = backfill_lease_seconds {
metadata::backfill_sessions_with_lease(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
runtime,
codex_home,
default_model_provider_id,
backfill_lease_seconds,
)
.await;
} else {
metadata::backfill_sessions(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
)
.await;
metadata::backfill_sessions(runtime, codex_home, default_model_provider_id).await;
}
let backfill_state = runtime.get_backfill_state().await.map_err(|err| {
anyhow::anyhow!(
@@ -151,7 +166,7 @@ async fn try_init_with_roots_inner(
)
})?;
if backfill_state.status == codex_state::BackfillStatus::Complete {
return Ok(runtime);
return Ok(());
}
if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT {
return Err(anyhow::anyhow!(
@@ -195,17 +210,32 @@ fn emit_startup_warning(message: &str) {
pub async fn get_state_db(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
let state_path = codex_state::state_db_path(config.sqlite_home());
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
codex_state::record_fallback(None, "get_state_db", "db_unavailable");
return None;
}
let runtime = codex_state::StateRuntime::init(
let runtime = match codex_state::StateRuntime::init(
config.sqlite_home().to_path_buf(),
config.model_provider_id().to_string(),
)
.await
.ok()?;
{
Ok(runtime) => runtime,
Err(_) => {
codex_state::record_fallback(None, "get_state_db", "db_error");
return None;
}
};
require_backfill_complete(runtime, config.sqlite_home()).await
}
/// Build a SQLite telemetry recorder backed by an OTEL metrics client.
pub fn sqlite_telemetry_recorder(
metrics: codex_otel::MetricsClient,
originator: &str,
) -> codex_state::DbTelemetryHandle {
sqlite_metrics::recorder(metrics, originator)
}
async fn require_backfill_complete(
runtime: StateDbHandle,
codex_home: &Path,
@@ -218,6 +248,7 @@ async fn require_backfill_complete(
codex_home.display(),
state.status.as_str()
);
codex_state::record_fallback(None, "get_state_db", "backfill_incomplete");
None
}
Err(err) => {
@@ -225,6 +256,7 @@ async fn require_backfill_complete(
"failed to read backfill state at {}: {err}",
codex_home.display()
);
codex_state::record_fallback(None, "get_state_db", "db_error");
None
}
}

View File

@@ -10,6 +10,7 @@ mod migrations;
mod model;
mod paths;
mod runtime;
mod telemetry;
pub use model::LogEntry;
pub use model::LogQuery;
@@ -56,6 +57,11 @@ pub use runtime::logs_db_filename;
pub use runtime::logs_db_path;
pub use runtime::state_db_filename;
pub use runtime::state_db_path;
pub use telemetry::DbTelemetry;
pub use telemetry::DbTelemetryHandle;
pub use telemetry::install_process_db_telemetry;
pub use telemetry::record_backfill_gate;
pub use telemetry::record_fallback;
/// Environment variable for overriding the SQLite state database home directory.
pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
@@ -69,3 +75,9 @@ pub const DB_ERROR_METRIC: &str = "codex.db.error";
pub const DB_METRIC_BACKFILL: &str = "codex.db.backfill";
/// Metrics on backfill duration. Tags: [status]
pub const DB_METRIC_BACKFILL_DURATION_MS: &str = "codex.db.backfill.duration_ms";
/// SQLite initialization attempts. Tags: [status, phase, db, error]
pub const DB_INIT_METRIC: &str = "codex.sqlite.init.count";
/// SQLite initialization latency. Tags: [status, phase, db, error]
pub const DB_INIT_DURATION_METRIC: &str = "codex.sqlite.init.duration_ms";
/// Rollout fallback attempts. Tags: [caller, reason]
pub const DB_FALLBACK_METRIC: &str = "codex.sqlite.fallback.count";

View File

@@ -25,6 +25,7 @@ use crate::model::datetime_to_epoch_millis;
use crate::model::datetime_to_epoch_seconds;
use crate::model::epoch_millis_to_datetime;
use crate::paths::file_modified_time_utc;
use crate::telemetry::DbKind;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
@@ -50,6 +51,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::time::Duration;
use std::time::Instant;
use tracing::warn;
mod agent_jobs;
@@ -112,10 +114,30 @@ impl StateRuntime {
return Err(err);
}
};
let thread_updated_at_millis: Option<i64> =
let started = Instant::now();
let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await;
crate::telemetry::record_init_result(
None,
DbKind::State,
"ensure_backfill_state",
started.elapsed(),
&backfill_state_result,
);
backfill_state_result?;
let started = Instant::now();
let thread_updated_at_millis_result: anyhow::Result<Option<i64>> =
sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads")
.fetch_one(pool.as_ref())
.await?;
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
None,
DbKind::State,
"post_init_query",
started.elapsed(),
&thread_updated_at_millis_result,
);
let thread_updated_at_millis = thread_updated_at_millis_result?;
let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0);
let runtime = Arc::new(Self {
pool,
@@ -153,25 +175,60 @@ async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<S
// New state DBs should use incremental auto-vacuum, but retrofitting an
// existing DB requires a full VACUUM. Do not attempt that during process
// startup: it is maintenance work that can contend with foreground writers.
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
.await?;
migrator.run(&pool).await?;
Ok(pool)
open_sqlite(path, migrator, DbKind::State, "open_state", "migrate_state").await
}
async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
open_sqlite(path, migrator, DbKind::Logs, "open_logs", "migrate_logs").await
}
async fn open_sqlite(
path: &Path,
migrator: &Migrator,
db: DbKind,
open_phase: &'static str,
migrate_phase: &'static str,
) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
let started = Instant::now();
let pool_result = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
.await?;
migrator.run(&pool).await?;
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(None, db, open_phase, started.elapsed(), &pool_result);
let pool = pool_result?;
let started = Instant::now();
let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
None,
db,
migrate_phase,
started.elapsed(),
&migrate_result,
);
migrate_result?;
Ok(pool)
}
pub(super) async fn ensure_backfill_state_row_in_pool(
pool: &sqlx::SqlitePool,
) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
VALUES (?, ?, NULL, NULL, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
.bind(1_i64)
.bind(crate::BackfillStatus::Pending.as_str())
.bind(Utc::now().timestamp())
.execute(pool)
.await?;
Ok(())
}
pub fn state_db_filename() -> String {
STATE_DB_FILENAME.to_string()
}

View File

@@ -103,19 +103,7 @@ WHERE id = 1
}
async fn ensure_backfill_state_row(&self) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
VALUES (?, ?, NULL, NULL, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
.bind(1_i64)
.bind(crate::BackfillStatus::Pending.as_str())
.bind(Utc::now().timestamp())
.execute(self.pool.as_ref())
.await?;
Ok(())
ensure_backfill_state_row_in_pool(self.pool.as_ref()).await
}
}

View File

@@ -0,0 +1,193 @@
use std::borrow::Cow;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
use crate::DB_FALLBACK_METRIC;
use crate::DB_INIT_DURATION_METRIC;
use crate::DB_INIT_METRIC;
/// Low-cardinality sink for SQLite startup and fallback telemetry.
///
/// Implementations should absorb delivery failures locally. Database behavior
/// must not depend on whether telemetry export succeeds.
pub trait DbTelemetry: Send + Sync + 'static {
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]);
fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]);
}
pub type DbTelemetryHandle = Arc<dyn DbTelemetry>;
static PROCESS_DB_TELEMETRY: OnceLock<DbTelemetryHandle> = OnceLock::new();
/// Install the process-wide SQLite telemetry sink.
///
/// Startup owners should call this once after OTEL initialization. Low-level
/// database paths will use the registered sink unless a test passes an
/// explicit sink for that call.
pub fn install_process_db_telemetry(telemetry: DbTelemetryHandle) -> bool {
PROCESS_DB_TELEMETRY.set(telemetry).is_ok()
}
#[derive(Clone, Copy)]
pub(crate) enum DbKind {
State,
Logs,
}
impl DbKind {
fn as_str(self) -> &'static str {
match self {
Self::State => "state",
Self::Logs => "logs",
}
}
}
pub(crate) fn record_init_result<T>(
telemetry: Option<&dyn DbTelemetry>,
db: DbKind,
phase: &'static str,
duration: Duration,
result: &anyhow::Result<T>,
) {
let outcome = DbOutcomeTags::from_result(result);
let tags = [
("status", outcome.status),
("phase", phase),
("db", db.as_str()),
("error", outcome.error),
];
record_counter(telemetry, DB_INIT_METRIC, &tags);
record_duration(telemetry, DB_INIT_DURATION_METRIC, duration, &tags);
}
pub fn record_backfill_gate(
telemetry: Option<&dyn DbTelemetry>,
duration: Duration,
result: &anyhow::Result<()>,
) {
record_init_result(telemetry, DbKind::State, "backfill_gate", duration, result);
}
pub fn record_fallback(
telemetry: Option<&dyn DbTelemetry>,
caller: &'static str,
reason: &'static str,
) {
record_counter(
telemetry,
DB_FALLBACK_METRIC,
&[("caller", caller), ("reason", reason)],
);
}
fn record_counter(telemetry: Option<&dyn DbTelemetry>, name: &str, tags: &[(&str, &str)]) {
if let Some(telemetry) = resolve_telemetry(telemetry) {
telemetry.counter(name, /*inc*/ 1, tags);
}
}
fn record_duration(
telemetry: Option<&dyn DbTelemetry>,
name: &str,
duration: Duration,
tags: &[(&str, &str)],
) {
if let Some(telemetry) = resolve_telemetry(telemetry) {
telemetry.record_duration(name, duration, tags);
}
}
fn resolve_telemetry(telemetry: Option<&dyn DbTelemetry>) -> Option<&dyn DbTelemetry> {
telemetry.or_else(|| PROCESS_DB_TELEMETRY.get().map(AsRef::as_ref))
}
struct DbOutcomeTags {
status: &'static str,
error: &'static str,
}
impl DbOutcomeTags {
fn from_result<T>(result: &anyhow::Result<T>) -> Self {
match result {
Ok(_) => Self {
status: "success",
error: "none",
},
Err(err) => Self {
status: "failed",
error: classify_error(err),
},
}
}
}
fn classify_error(err: &anyhow::Error) -> &'static str {
for cause in err.chain() {
if let Some(sqlx_err) = cause.downcast_ref::<sqlx::Error>() {
return classify_sqlx_error(sqlx_err);
}
if cause
.downcast_ref::<sqlx::migrate::MigrateError>()
.is_some()
{
return "migration";
}
if cause.downcast_ref::<serde_json::Error>().is_some() {
return "serde";
}
if cause.downcast_ref::<std::io::Error>().is_some() {
return "io";
}
}
"unknown"
}
fn classify_sqlx_error(err: &sqlx::Error) -> &'static str {
match err {
sqlx::Error::Database(database_error) => {
let code = database_error
.code()
.unwrap_or(Cow::Borrowed("none"))
.to_string();
classify_sqlite_code(code.as_str())
}
sqlx::Error::PoolTimedOut => "pool_timeout",
sqlx::Error::Io(_) => "io",
sqlx::Error::ColumnDecode { source, .. } if source.is::<serde_json::Error>() => "serde",
sqlx::Error::Decode(source) if source.is::<serde_json::Error>() => "serde",
_ => "unknown",
}
}
fn classify_sqlite_code(code: &str) -> &'static str {
// SQLite result codes are documented at https://www.sqlite.org/rescode.html.
// Extended codes preserve the primary code in the low byte.
let primary_code = code.parse::<i32>().ok().map(|code| code & 0xff);
match primary_code {
Some(5) => "busy",
Some(6) => "locked",
Some(8) => "readonly",
Some(10) => "io",
Some(11) => "corrupt",
Some(13) => "full",
Some(14) => "cantopen",
Some(17) => "schema",
Some(19) => "constraint",
_ => "unknown",
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn classifies_extended_sqlite_codes() {
assert_eq!(classify_sqlite_code("5"), "busy");
assert_eq!(classify_sqlite_code("6"), "locked");
assert_eq!(classify_sqlite_code("2067"), "constraint");
}
}

View File

@@ -42,6 +42,7 @@ use codex_config::format_config_error_with_source;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::ExecServerRuntimePaths;
use codex_login::AuthConfig;
use codex_login::default_client::originator;
use codex_login::default_client::set_default_client_residency_requirement;
use codex_login::enforce_login_restrictions;
use codex_protocol::ThreadId;
@@ -892,6 +893,36 @@ pub async fn run_main(
)
.await;
let otel_originator = originator().value;
let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
crate::legacy_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
/*service_name_override*/ None,
/*default_analytics_enabled*/ true,
)
})) {
Ok(Ok(otel)) => otel,
Ok(Err(e)) => {
#[allow(clippy::print_stderr)]
{
eprintln!("Could not create otel exporter: {e}");
}
None
}
Err(_) => {
#[allow(clippy::print_stderr)]
{
eprintln!("Could not create otel exporter: panicked during initialization");
}
None
}
};
crate::legacy_core::otel_init::record_process_start(otel.as_ref(), otel_originator.as_str());
crate::legacy_core::otel_init::install_sqlite_telemetry(
otel.as_ref(),
otel_originator.as_str(),
);
let state_db = match &app_server_target {
AppServerTarget::Embedded => state_db::init(&config).await,
AppServerTarget::Remote { .. } => state_db::get_state_db(&config).await,
@@ -1034,31 +1065,6 @@ pub async fn run_main(
ensure_oss_provider_ready(provider_id, &config).await?;
}
let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
crate::legacy_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
/*service_name_override*/ None,
/*default_analytics_enabled*/ true,
)
})) {
Ok(Ok(otel)) => otel,
Ok(Err(e)) => {
#[allow(clippy::print_stderr)]
{
eprintln!("Could not create otel exporter: {e}");
}
None
}
Err(_) => {
#[allow(clippy::print_stderr)]
{
eprintln!("Could not create otel exporter: panicked during initialization");
}
None
}
};
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());