Mirror of https://github.com/openai/codex.git, synced 2026-05-13 17:51:14 +03:00

Compare commits (1 commit): latest-alp ... jif/sqlite

| Author | SHA1 | Date |
|---|---|---|
| | 2d5a5a1248 | |
@@ -109,6 +109,7 @@ pub use crate::transport::auth::AppServerWebsocketAuthSettings;
pub use crate::transport::auth::WebsocketAuthCliMode;

const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT";
const OTEL_SERVICE_NAME: &str = "codex-app-server";

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum LogFormat {
@@ -512,6 +513,20 @@ pub async fn run_main_with_transport_options(
}
};

let otel = codex_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
Some(OTEL_SERVICE_NAME),
default_analytics_enabled,
)
.map_err(|e| {
std::io::Error::new(
ErrorKind::InvalidData,
format!("error loading otel config: {e}"),
)
})?;
codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME);
codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME);
let state_db_result = rollout_state_db::try_init(&config).await;
let state_db_init_error = state_db_result.as_ref().err().map(ToString::to_string);
let state_db = state_db_result.ok();
@@ -591,19 +606,6 @@ pub async fn run_main_with_transport_options(

let feedback = CodexFeedback::new();

let otel = codex_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
Some("codex-app-server"),
default_analytics_enabled,
)
.map_err(|e| {
std::io::Error::new(
ErrorKind::InvalidData,
format!("error loading otel config: {e}"),
)
})?;

// Install a simple subscriber so `tracing` output is visible. Users can
// control the log level with `RUST_LOG` and switch to JSON logs with
// `LOG_FORMAT=json`.
@@ -65,7 +65,6 @@ use codex_arg0::Arg0DispatchPaths;
use codex_chatgpt::workspace_settings;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
@@ -298,7 +297,7 @@ impl MessageProcessor {
// The thread store is intentionally process-scoped. Config reloads can
// affect per-thread behavior, but they must not move newly started,
// resumed, or forked threads to a different persistence backend/root.
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),

@@ -277,7 +277,6 @@ use codex_core::exec::ExecCapturePolicy;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::find_thread_path_by_id_str;
use codex_core::path_utils;
#[cfg(test)]
use codex_core::read_head_for_summary;

@@ -107,7 +107,7 @@ impl ThreadGoalRequestProcessor {
"ephemeral thread does not support goals: {thread_id}"
))
})?,
None => find_thread_path_by_id_str(
None => codex_rollout::find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
self.state_db.as_deref(),
@@ -272,7 +272,7 @@ impl ThreadGoalRequestProcessor {
"ephemeral thread does not support goals: {thread_id}"
))
})?,
None => find_thread_path_by_id_str(
None => codex_rollout::find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
self.state_db.as_deref(),
@@ -336,7 +336,7 @@ impl ThreadGoalRequestProcessor {
return Ok(state_db);
}
} else {
find_thread_path_by_id_str(
codex_rollout::find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
self.state_db.as_deref(),
@@ -99,3 +99,18 @@ pub fn build_provider(
pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool {
meta.target().starts_with("codex_otel")
}

pub fn record_process_start(otel: Option<&OtelProvider>, originator: &str) {
let Some(metrics) = otel.and_then(OtelProvider::metrics) else {
return;
};
let _ = codex_otel::record_process_start_once(metrics, originator);
}

pub fn install_sqlite_telemetry(otel: Option<&OtelProvider>, originator: &str) {
let Some(metrics) = otel.and_then(OtelProvider::metrics) else {
return;
};
let telemetry = codex_rollout::sqlite_telemetry_recorder(metrics.clone(), originator);
let _ = codex_state::install_process_db_telemetry(telemetry);
}
@@ -478,6 +478,8 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
None
}
};
codex_core::otel_init::record_process_start(otel.as_ref(), "codex_exec");
codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), "codex_exec");

let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
@@ -74,19 +74,6 @@ pub async fn run_main(
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
})?;
set_default_client_residency_requirement(config.enforce_residency.value());
let state_db = codex_core::init_state_db(&config).await;
let environment_manager = Arc::new(
EnvironmentManager::from_codex_home(
config.codex_home.clone(),
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
)
.await
.map_err(std::io::Error::other)?,
);

let otel = codex_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
@@ -99,6 +86,20 @@ pub async fn run_main(
format!("error loading otel config: {e}"),
)
})?;
codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME);
codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME);
let state_db = codex_core::init_state_db(&config).await;
let environment_manager = Arc::new(
EnvironmentManager::from_codex_home(
config.codex_home.clone(),
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
arg0_paths.codex_linux_sandbox_exe.clone(),
)?,
)
.await
.map_err(std::io::Error::other)?,
);

let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
@@ -5,7 +5,6 @@ use codex_arg0::Arg0DispatchPaths;
use codex_core::StateDbHandle;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
use codex_extension_api::empty_extension_registry;
use codex_login::AuthManager;
@@ -71,7 +70,7 @@ impl MessageProcessor {
environment_manager,
empty_extension_registry(),
/*analytics_events_client*/ None,
thread_store_from_config(config.as_ref(), state_db.clone()),
codex_core::thread_store_from_config(config.as_ref(), state_db.clone()),
state_db.clone(),
installation_id,
/*attestation_provider*/ None,
@@ -2,6 +2,7 @@ mod client;
mod config;
mod error;
pub(crate) mod names;
mod process;
pub(crate) mod runtime_metrics;
pub(crate) mod tags;
pub(crate) mod timer;
@@ -13,9 +14,12 @@ pub use crate::metrics::config::MetricsConfig;
pub use crate::metrics::config::MetricsExporter;
pub use crate::metrics::error::MetricsError;
pub use crate::metrics::error::Result;
pub use crate::metrics::process::record_process_start_once;
pub use names::*;
use std::sync::OnceLock;
pub use tags::ORIGINATOR_TAG;
pub use tags::SessionMetricTagValues;
pub use tags::bounded_originator_tag_value;

static GLOBAL_METRICS: OnceLock<MetricsClient> = OnceLock::new();
static GLOBAL_STATSIG_METRICS_SETTINGS: OnceLock<StatsigMetricsSettings> = OnceLock::new();

@@ -1,6 +1,7 @@
pub const TOOL_CALL_COUNT_METRIC: &str = "codex.tool.call";
pub const TOOL_CALL_DURATION_METRIC: &str = "codex.tool.call.duration_ms";
pub const TOOL_CALL_UNIFIED_EXEC_METRIC: &str = "codex.tool.unified_exec";
pub const PROCESS_START_METRIC: &str = "codex.process.start";
pub const API_CALL_COUNT_METRIC: &str = "codex.api_request";
pub const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms";
pub const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event";
codex-rs/otel/src/metrics/process.rs (new file, 27 lines)
@@ -0,0 +1,27 @@
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use super::client::MetricsClient;
use super::error::Result;
use super::names::PROCESS_START_METRIC;
use super::tags::ORIGINATOR_TAG;
use super::tags::bounded_originator_tag_value;

static PROCESS_START_RECORDED: AtomicBool = AtomicBool::new(false);

/// Record the process start counter at most once for this process.
pub fn record_process_start_once(metrics: &MetricsClient, originator: &str) -> Result<bool> {
if PROCESS_START_RECORDED
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
return Ok(false);
}

metrics.counter(
PROCESS_START_METRIC,
/*inc*/ 1,
&[(ORIGINATOR_TAG, bounded_originator_tag_value(originator))],
)?;
Ok(true)
}
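The once-only guarantee above comes from `AtomicBool::compare_exchange`: only the caller that flips the flag from `false` to `true` goes on to emit the counter. A self-contained sketch of the same pattern, independent of `MetricsClient` and purely illustrative:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

static RECORDED: AtomicBool = AtomicBool::new(false);

/// Returns true only for the first caller in the process; later calls are no-ops.
fn record_once() -> bool {
    RECORDED
        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
        .is_ok()
}

fn main() {
    assert!(record_once());  // the first call wins and would emit the counter
    assert!(!record_once()); // every later call returns false and emits nothing
}
```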
@@ -1,6 +1,7 @@
use crate::metrics::Result;
use crate::metrics::validation::validate_tag_key;
use crate::metrics::validation::validate_tag_value;
use codex_utils_string::sanitize_metric_tag_value;

pub const APP_VERSION_TAG: &str = "app.version";
pub const AUTH_MODE_TAG: &str = "auth_mode";
@@ -9,6 +10,31 @@ pub const ORIGINATOR_TAG: &str = "originator";
pub const SERVICE_NAME_TAG: &str = "service_name";
pub const SESSION_SOURCE_TAG: &str = "session_source";

const OTHER_ORIGINATOR_TAG_VALUE: &str = "other";
const KNOWN_ORIGINATOR_TAG_VALUES: &[&str] = &[
"codex_desktop",
"codex-app-server",
"codex_mcp_server",
"codex_cli_rs",
"codex-tui",
"codex_vscode",
"none",
"codex_exec",
"codex-cli",
"codex_sdk_ts",
"codex-app-server-sdk",
];

/// Return a known low-cardinality originator tag value, or `other`.
pub fn bounded_originator_tag_value(originator: &str) -> &'static str {
let sanitized = sanitize_metric_tag_value(originator);
KNOWN_ORIGINATOR_TAG_VALUES
.iter()
.copied()
.find(|known| *known == sanitized.as_str())
.unwrap_or(OTHER_ORIGINATOR_TAG_VALUE)
}

pub struct SessionMetricTagValues<'a> {
pub auth_mode: Option<&'a str>,
pub session_source: &'a str,
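The point of `bounded_originator_tag_value` is cardinality control: any originator outside the allow-list collapses to `"other"`. A self-contained illustration of that bounding (the real function also runs the input through `sanitize_metric_tag_value` first; that step and most of the allow-list are omitted here):

```rust
const OTHER: &str = "other";
// Illustrative subset of KNOWN_ORIGINATOR_TAG_VALUES from the hunk above.
const KNOWN: &[&str] = &["codex_exec", "codex-tui", "codex-app-server", "codex_cli_rs"];

fn bounded(originator: &str) -> &'static str {
    KNOWN.iter().copied().find(|k| *k == originator).unwrap_or(OTHER)
}

fn main() {
    assert_eq!(bounded("codex_exec"), "codex_exec");   // known value passes through
    assert_eq!(bounded("my-fork-of-codex"), "other");  // anything else collapses to "other"
}
```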
@@ -10,6 +10,7 @@ pub(crate) mod metadata;
pub(crate) mod policy;
pub(crate) mod recorder;
pub(crate) mod session_index;
mod sqlite_metrics;
pub mod state_db;

pub(crate) mod default_client {
@@ -63,6 +64,7 @@ pub use session_index::find_thread_meta_by_name_str;
pub use session_index::find_thread_name_by_id;
pub use session_index::find_thread_names_by_ids;
pub use state_db::StateDbHandle;
pub use state_db::sqlite_telemetry_recorder;

#[cfg(test)]
mod tests;
@@ -1255,47 +1255,57 @@ async fn find_thread_path_by_id_str_in_subdir(
};
let thread_id = ThreadId::from_string(id_str).ok();
let mut unverified_db_path = None;
let mut fallback_reason = state_db_ctx.is_none().then_some("db_unavailable");
if let Some(state_db_ctx) = state_db_ctx
&& let Some(thread_id) = thread_id
&& let Some(db_path) = state_db::find_rollout_path_by_id(
Some(state_db_ctx),
thread_id,
archived_only,
"find_path_query",
)
.await
{
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
match read_session_meta_line(&db_path).await {
Ok(meta_line) if meta_line.meta.id == thread_id => {
return Ok(Some(db_path));
}
Ok(meta_line) => {
match state_db_ctx
.find_rollout_path_by_id(thread_id, archived_only)
.await
{
Ok(Some(db_path)) => {
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
match read_session_meta_line(&db_path).await {
Ok(meta_line) if meta_line.meta.id == thread_id => {
return Ok(Some(db_path));
}
Ok(meta_line) => {
tracing::error!(
"state db returned rollout path for thread {id_str} but file belongs to thread {}: {}",
meta_line.meta.id,
db_path.display()
);
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path"
);
codex_state::record_fallback(None, "find_thread_path", "mismatch");
}
Err(err) => {
tracing::debug!(
"state db returned rollout path for thread {id_str} that could not be verified: {}: {err}",
db_path.display()
);
unverified_db_path = Some(db_path);
}
}
} else {
tracing::error!(
"state db returned rollout path for thread {id_str} but file belongs to thread {}: {}",
meta_line.meta.id,
"state db returned stale rollout path for thread {id_str}: {}",
db_path.display()
);
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: mismatched_db_path"
"state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path"
);
}
Err(err) => {
tracing::debug!(
"state db returned rollout path for thread {id_str} that could not be verified: {}: {err}",
db_path.display()
);
unverified_db_path = Some(db_path);
codex_state::record_fallback(None, "find_thread_path", "stale_path");
}
}
} else {
tracing::error!(
"state db returned stale rollout path for thread {id_str}: {}",
db_path.display()
);
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: stale_db_path"
);
Ok(None) => fallback_reason = Some("missing_row"),
Err(err) => {
tracing::warn!(
"state db find_rollout_path_by_id failed during find_path_query: {err}"
);
fallback_reason = Some("db_error");
}
}
}

@@ -1323,6 +1333,9 @@ async fn find_thread_path_by_id_str_in_subdir(
tracing::warn!(
"state db discrepancy during find_thread_path_by_id_str_in_subdir: falling_back"
);
if let Some(reason) = fallback_reason {
codex_state::record_fallback(None, "find_thread_path", reason);
}
state_db::read_repair_rollout_path(
state_db_ctx,
thread_id,
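As a reading aid, the fallback reasons this lookup records can be summarised as a small mapping. This is only a paraphrase of the `record_fallback(None, "find_thread_path", ...)` call sites in the hunk above, not code from the repository:

```rust
/// Outcomes of the state-db lookup above, paraphrased for illustration.
enum DbLookupOutcome {
    NoStateDb,    // no state-db handle was available
    QueryFailed,  // find_rollout_path_by_id returned Err
    RowMissing,   // the query returned Ok(None)
    FileMismatch, // the rollout file on disk belongs to a different thread
    FileMissing,  // the path returned by the db no longer exists on disk
}

fn fallback_reason(outcome: &DbLookupOutcome) -> &'static str {
    match outcome {
        DbLookupOutcome::NoStateDb => "db_unavailable",
        DbLookupOutcome::QueryFailed => "db_error",
        DbLookupOutcome::RowMissing => "missing_row",
        DbLookupOutcome::FileMismatch => "mismatch",
        DbLookupOutcome::FileMissing => "stale_path",
    }
}

fn main() {
    assert_eq!(fallback_reason(&DbLookupOutcome::FileMissing), "stale_path");
}
```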
@@ -450,6 +450,7 @@ impl RolloutRecorder {
if state_db_ctx.is_none() {
// Keep legacy behavior when SQLite is unavailable: return filesystem results
// at the requested page size.
codex_state::record_fallback(None, "list_threads", "db_unavailable");
return Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
@@ -558,6 +559,7 @@ impl RolloutRecorder {
)
.await;
}
codex_state::record_fallback(None, "list_threads", "metadata_filter");
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
@@ -569,6 +571,7 @@ impl RolloutRecorder {
}
if listing_has_metadata_filters {
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
codex_state::record_fallback(None, "list_threads", "db_error");
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
page,
@@ -578,6 +581,7 @@ impl RolloutRecorder {
// If SQLite listing still fails, return the filesystem page rather than failing the list.
tracing::error!("Falling back on rollout system");
tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back");
codex_state::record_fallback(None, "list_threads", "db_error");
Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
@@ -601,6 +605,7 @@ impl RolloutRecorder {
) -> std::io::Result<Option<PathBuf>> {
let codex_home = config.codex_home();
let cwd_filter = filter_cwd.map(Path::to_path_buf);
let mut fallback_reason = state_db_ctx.is_none().then_some("db_unavailable");
if state_db_ctx.is_some() {
let mut db_cursor = cursor.cloned();
loop {
@@ -619,6 +624,7 @@ impl RolloutRecorder {
)
.await
else {
fallback_reason = Some("db_error");
break;
};
if let Some(path) =
@@ -628,10 +634,14 @@ impl RolloutRecorder {
}
db_cursor = db_page.next_anchor.map(Into::into);
if db_cursor.is_none() {
fallback_reason = Some("missing_row");
break;
}
}
}
if let Some(reason) = fallback_reason {
codex_state::record_fallback(None, "find_latest_thread_path", reason);
}

let mut cursor = cursor.cloned();
loop {
codex-rs/rollout/src/sqlite_metrics.rs (new file, 40 lines)
@@ -0,0 +1,40 @@
use std::sync::Arc;
use std::time::Duration;

use codex_otel::ORIGINATOR_TAG;
use codex_otel::bounded_originator_tag_value;
use codex_state::DbTelemetry;
use codex_state::DbTelemetryHandle;

struct OtelSqliteTelemetry {
metrics: codex_otel::MetricsClient,
originator: &'static str,
}

impl DbTelemetry for OtelSqliteTelemetry {
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) {
let tags = with_originator(tags, self.originator);
let _ = self.metrics.counter(name, inc, &tags);
}

fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) {
let tags = with_originator(tags, self.originator);
let _ = self.metrics.record_duration(name, duration, &tags);
}
}

pub(crate) fn recorder(metrics: codex_otel::MetricsClient, originator: &str) -> DbTelemetryHandle {
Arc::new(OtelSqliteTelemetry {
metrics,
originator: bounded_originator_tag_value(originator),
})
}

fn with_originator<'a>(
tags: &[(&'a str, &'a str)],
originator: &'static str,
) -> Vec<(&'a str, &'a str)> {
let mut tags = tags.to_vec();
tags.push((ORIGINATOR_TAG, originator));
tags
}
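The adapter's only transformation is appending the bounded originator tag to whatever tags the state layer passes in. A self-contained sketch of that append, using the literal `"originator"` key that `ORIGINATOR_TAG` resolves to in the tags hunk above:

```rust
fn with_originator<'a>(
    tags: &[(&'a str, &'a str)],
    originator: &'static str,
) -> Vec<(&'a str, &'a str)> {
    let mut tags = tags.to_vec();
    tags.push(("originator", originator));
    tags
}

fn main() {
    let tags = with_originator(&[("phase", "open_state"), ("db", "state")], "codex_exec");
    // The metrics client then sees phase, db, and originator on every data point.
    assert_eq!(tags.last(), Some(&("originator", "codex_exec")));
}
```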
@@ -4,6 +4,7 @@ use crate::list::Cursor;
use crate::list::SortDirection;
use crate::list::ThreadSortKey;
use crate::metadata;
use crate::sqlite_metrics;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
@@ -115,6 +116,25 @@ async fn try_init_with_roots_inner(
sqlite_home.display()
)
})?;
let backfill_gate_started = Instant::now();
let backfill_gate_result = wait_for_backfill_gate(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
backfill_lease_seconds,
)
.await;
codex_state::record_backfill_gate(None, backfill_gate_started.elapsed(), &backfill_gate_result);
backfill_gate_result?;
Ok(runtime)
}

async fn wait_for_backfill_gate(
runtime: &codex_state::StateRuntime,
codex_home: &Path,
default_model_provider_id: &str,
backfill_lease_seconds: Option<i64>,
) -> anyhow::Result<()> {
let wait_started = Instant::now();
let mut reported_wait = false;
loop {
@@ -125,24 +145,19 @@ async fn try_init_with_roots_inner(
)
})?;
if backfill_state.status == codex_state::BackfillStatus::Complete {
return Ok(runtime);
return Ok(());
}

if let Some(backfill_lease_seconds) = backfill_lease_seconds {
metadata::backfill_sessions_with_lease(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
runtime,
codex_home,
default_model_provider_id,
backfill_lease_seconds,
)
.await;
} else {
metadata::backfill_sessions(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
)
.await;
metadata::backfill_sessions(runtime, codex_home, default_model_provider_id).await;
}
let backfill_state = runtime.get_backfill_state().await.map_err(|err| {
anyhow::anyhow!(
@@ -151,7 +166,7 @@ async fn try_init_with_roots_inner(
)
})?;
if backfill_state.status == codex_state::BackfillStatus::Complete {
return Ok(runtime);
return Ok(());
}
if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT {
return Err(anyhow::anyhow!(
@@ -195,17 +210,32 @@ fn emit_startup_warning(message: &str) {
pub async fn get_state_db(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
let state_path = codex_state::state_db_path(config.sqlite_home());
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
codex_state::record_fallback(None, "get_state_db", "db_unavailable");
return None;
}
let runtime = codex_state::StateRuntime::init(
let runtime = match codex_state::StateRuntime::init(
config.sqlite_home().to_path_buf(),
config.model_provider_id().to_string(),
)
.await
.ok()?;
{
Ok(runtime) => runtime,
Err(_) => {
codex_state::record_fallback(None, "get_state_db", "db_error");
return None;
}
};
require_backfill_complete(runtime, config.sqlite_home()).await
}

/// Build a SQLite telemetry recorder backed by an OTEL metrics client.
pub fn sqlite_telemetry_recorder(
metrics: codex_otel::MetricsClient,
originator: &str,
) -> codex_state::DbTelemetryHandle {
sqlite_metrics::recorder(metrics, originator)
}

async fn require_backfill_complete(
runtime: StateDbHandle,
codex_home: &Path,
@@ -218,6 +248,7 @@ async fn require_backfill_complete(
codex_home.display(),
state.status.as_str()
);
codex_state::record_fallback(None, "get_state_db", "backfill_incomplete");
None
}
Err(err) => {
@@ -225,6 +256,7 @@ async fn require_backfill_complete(
"failed to read backfill state at {}: {err}",
codex_home.display()
);
codex_state::record_fallback(None, "get_state_db", "db_error");
None
}
}
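The extracted `wait_for_backfill_gate` loops: check the backfill status, run a backfill pass (lease-based if a lease is configured), re-check, and give up after a startup timeout. A self-contained paraphrase of that control flow, with stand-in types; `STARTUP_BACKFILL_WAIT_TIMEOUT`'s value and the closure-based shape are assumptions for illustration only:

```rust
use std::time::{Duration, Instant};

#[derive(PartialEq)]
enum BackfillStatus { Pending, Complete }

// Placeholder value; the real constant lives elsewhere in the crate.
const STARTUP_BACKFILL_WAIT_TIMEOUT: Duration = Duration::from_secs(30);

fn wait_for_backfill_gate(
    mut get_status: impl FnMut() -> BackfillStatus,
    mut run_backfill_pass: impl FnMut(),
) -> Result<(), String> {
    let started = Instant::now();
    loop {
        if get_status() == BackfillStatus::Complete {
            return Ok(());
        }
        run_backfill_pass(); // lease-based or plain backfill in the real code
        if get_status() == BackfillStatus::Complete {
            return Ok(());
        }
        if started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT {
            return Err("backfill did not complete before the startup timeout".to_string());
        }
    }
}

fn main() {
    use std::cell::Cell;
    let done = Cell::new(false);
    let gate = wait_for_backfill_gate(
        || if done.get() { BackfillStatus::Complete } else { BackfillStatus::Pending },
        || done.set(true),
    );
    assert!(gate.is_ok()); // one backfill pass completes the gate in this toy run
}
```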
@@ -10,6 +10,7 @@ mod migrations;
mod model;
mod paths;
mod runtime;
mod telemetry;

pub use model::LogEntry;
pub use model::LogQuery;
@@ -56,6 +57,11 @@ pub use runtime::logs_db_filename;
pub use runtime::logs_db_path;
pub use runtime::state_db_filename;
pub use runtime::state_db_path;
pub use telemetry::DbTelemetry;
pub use telemetry::DbTelemetryHandle;
pub use telemetry::install_process_db_telemetry;
pub use telemetry::record_backfill_gate;
pub use telemetry::record_fallback;

/// Environment variable for overriding the SQLite state database home directory.
pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
@@ -69,3 +75,9 @@ pub const DB_ERROR_METRIC: &str = "codex.db.error";
pub const DB_METRIC_BACKFILL: &str = "codex.db.backfill";
/// Metrics on backfill duration. Tags: [status]
pub const DB_METRIC_BACKFILL_DURATION_MS: &str = "codex.db.backfill.duration_ms";
/// SQLite initialization attempts. Tags: [status, phase, db, error]
pub const DB_INIT_METRIC: &str = "codex.sqlite.init.count";
/// SQLite initialization latency. Tags: [status, phase, db, error]
pub const DB_INIT_DURATION_METRIC: &str = "codex.sqlite.init.duration_ms";
/// Rollout fallback attempts. Tags: [caller, reason]
pub const DB_FALLBACK_METRIC: &str = "codex.sqlite.fallback.count";
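To make the tag documentation concrete, here is roughly what a single emission of each new counter looks like. The tag keys come from the doc comments above; the values are examples drawn from call sites elsewhere in this diff, and the extra `originator` tag is appended later by the rollout-side recorder:

```rust
fn main() {
    // codex.sqlite.init.count — Tags: [status, phase, db, error]
    let init_tags = [
        ("status", "failed"),
        ("phase", "migrate_state"),
        ("db", "state"),
        ("error", "busy"),
    ];
    // codex.sqlite.fallback.count — Tags: [caller, reason]
    let fallback_tags = [("caller", "list_threads"), ("reason", "db_unavailable")];
    println!("codex.sqlite.init.count +1 {init_tags:?}");
    println!("codex.sqlite.fallback.count +1 {fallback_tags:?}");
}
```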
@@ -25,6 +25,7 @@ use crate::model::datetime_to_epoch_millis;
use crate::model::datetime_to_epoch_seconds;
use crate::model::epoch_millis_to_datetime;
use crate::paths::file_modified_time_utc;
use crate::telemetry::DbKind;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
@@ -50,6 +51,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::time::Duration;
use std::time::Instant;
use tracing::warn;

mod agent_jobs;
@@ -112,10 +114,30 @@ impl StateRuntime {
return Err(err);
}
};
let thread_updated_at_millis: Option<i64> =
let started = Instant::now();
let backfill_state_result = ensure_backfill_state_row_in_pool(pool.as_ref()).await;
crate::telemetry::record_init_result(
None,
DbKind::State,
"ensure_backfill_state",
started.elapsed(),
&backfill_state_result,
);
backfill_state_result?;
let started = Instant::now();
let thread_updated_at_millis_result: anyhow::Result<Option<i64>> =
sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads")
.fetch_one(pool.as_ref())
.await?;
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
None,
DbKind::State,
"post_init_query",
started.elapsed(),
&thread_updated_at_millis_result,
);
let thread_updated_at_millis = thread_updated_at_millis_result?;
let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0);
let runtime = Arc::new(Self {
pool,
@@ -153,25 +175,60 @@ async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<S
// New state DBs should use incremental auto-vacuum, but retrofitting an
// existing DB requires a full VACUUM. Do not attempt that during process
// startup: it is maintenance work that can contend with foreground writers.
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
.await?;
migrator.run(&pool).await?;
Ok(pool)
open_sqlite(path, migrator, DbKind::State, "open_state", "migrate_state").await
}

async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
open_sqlite(path, migrator, DbKind::Logs, "open_logs", "migrate_logs").await
}

async fn open_sqlite(
path: &Path,
migrator: &Migrator,
db: DbKind,
open_phase: &'static str,
migrate_phase: &'static str,
) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
let started = Instant::now();
let pool_result = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
.await?;
migrator.run(&pool).await?;
.await
.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(None, db, open_phase, started.elapsed(), &pool_result);
let pool = pool_result?;
let started = Instant::now();
let migrate_result = migrator.run(&pool).await.map_err(anyhow::Error::from);
crate::telemetry::record_init_result(
None,
db,
migrate_phase,
started.elapsed(),
&migrate_result,
);
migrate_result?;
Ok(pool)
}

pub(super) async fn ensure_backfill_state_row_in_pool(
pool: &sqlx::SqlitePool,
) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
VALUES (?, ?, NULL, NULL, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
.bind(1_i64)
.bind(crate::BackfillStatus::Pending.as_str())
.bind(Utc::now().timestamp())
.execute(pool)
.await?;
Ok(())
}

pub fn state_db_filename() -> String {
STATE_DB_FILENAME.to_string()
}
@@ -103,19 +103,7 @@ WHERE id = 1
}

async fn ensure_backfill_state_row(&self) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
VALUES (?, ?, NULL, NULL, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
.bind(1_i64)
.bind(crate::BackfillStatus::Pending.as_str())
.bind(Utc::now().timestamp())
.execute(self.pool.as_ref())
.await?;
Ok(())
ensure_backfill_state_row_in_pool(self.pool.as_ref()).await
}
}
codex-rs/state/src/telemetry.rs (new file, 193 lines)
@@ -0,0 +1,193 @@
use std::borrow::Cow;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;

use crate::DB_FALLBACK_METRIC;
use crate::DB_INIT_DURATION_METRIC;
use crate::DB_INIT_METRIC;

/// Low-cardinality sink for SQLite startup and fallback telemetry.
///
/// Implementations should absorb delivery failures locally. Database behavior
/// must not depend on whether telemetry export succeeds.
pub trait DbTelemetry: Send + Sync + 'static {
fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]);
fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]);
}

pub type DbTelemetryHandle = Arc<dyn DbTelemetry>;

static PROCESS_DB_TELEMETRY: OnceLock<DbTelemetryHandle> = OnceLock::new();

/// Install the process-wide SQLite telemetry sink.
///
/// Startup owners should call this once after OTEL initialization. Low-level
/// database paths will use the registered sink unless a test passes an
/// explicit sink for that call.
pub fn install_process_db_telemetry(telemetry: DbTelemetryHandle) -> bool {
PROCESS_DB_TELEMETRY.set(telemetry).is_ok()
}

#[derive(Clone, Copy)]
pub(crate) enum DbKind {
State,
Logs,
}

impl DbKind {
fn as_str(self) -> &'static str {
match self {
Self::State => "state",
Self::Logs => "logs",
}
}
}

pub(crate) fn record_init_result<T>(
telemetry: Option<&dyn DbTelemetry>,
db: DbKind,
phase: &'static str,
duration: Duration,
result: &anyhow::Result<T>,
) {
let outcome = DbOutcomeTags::from_result(result);
let tags = [
("status", outcome.status),
("phase", phase),
("db", db.as_str()),
("error", outcome.error),
];
record_counter(telemetry, DB_INIT_METRIC, &tags);
record_duration(telemetry, DB_INIT_DURATION_METRIC, duration, &tags);
}

pub fn record_backfill_gate(
telemetry: Option<&dyn DbTelemetry>,
duration: Duration,
result: &anyhow::Result<()>,
) {
record_init_result(telemetry, DbKind::State, "backfill_gate", duration, result);
}

pub fn record_fallback(
telemetry: Option<&dyn DbTelemetry>,
caller: &'static str,
reason: &'static str,
) {
record_counter(
telemetry,
DB_FALLBACK_METRIC,
&[("caller", caller), ("reason", reason)],
);
}

fn record_counter(telemetry: Option<&dyn DbTelemetry>, name: &str, tags: &[(&str, &str)]) {
if let Some(telemetry) = resolve_telemetry(telemetry) {
telemetry.counter(name, /*inc*/ 1, tags);
}
}

fn record_duration(
telemetry: Option<&dyn DbTelemetry>,
name: &str,
duration: Duration,
tags: &[(&str, &str)],
) {
if let Some(telemetry) = resolve_telemetry(telemetry) {
telemetry.record_duration(name, duration, tags);
}
}

fn resolve_telemetry(telemetry: Option<&dyn DbTelemetry>) -> Option<&dyn DbTelemetry> {
telemetry.or_else(|| PROCESS_DB_TELEMETRY.get().map(AsRef::as_ref))
}

struct DbOutcomeTags {
status: &'static str,
error: &'static str,
}

impl DbOutcomeTags {
fn from_result<T>(result: &anyhow::Result<T>) -> Self {
match result {
Ok(_) => Self {
status: "success",
error: "none",
},
Err(err) => Self {
status: "failed",
error: classify_error(err),
},
}
}
}

fn classify_error(err: &anyhow::Error) -> &'static str {
for cause in err.chain() {
if let Some(sqlx_err) = cause.downcast_ref::<sqlx::Error>() {
return classify_sqlx_error(sqlx_err);
}
if cause
.downcast_ref::<sqlx::migrate::MigrateError>()
.is_some()
{
return "migration";
}
if cause.downcast_ref::<serde_json::Error>().is_some() {
return "serde";
}
if cause.downcast_ref::<std::io::Error>().is_some() {
return "io";
}
}
"unknown"
}

fn classify_sqlx_error(err: &sqlx::Error) -> &'static str {
match err {
sqlx::Error::Database(database_error) => {
let code = database_error
.code()
.unwrap_or(Cow::Borrowed("none"))
.to_string();
classify_sqlite_code(code.as_str())
}
sqlx::Error::PoolTimedOut => "pool_timeout",
sqlx::Error::Io(_) => "io",
sqlx::Error::ColumnDecode { source, .. } if source.is::<serde_json::Error>() => "serde",
sqlx::Error::Decode(source) if source.is::<serde_json::Error>() => "serde",
_ => "unknown",
}
}

fn classify_sqlite_code(code: &str) -> &'static str {
// SQLite result codes are documented at https://www.sqlite.org/rescode.html.
// Extended codes preserve the primary code in the low byte.
let primary_code = code.parse::<i32>().ok().map(|code| code & 0xff);
match primary_code {
Some(5) => "busy",
Some(6) => "locked",
Some(8) => "readonly",
Some(10) => "io",
Some(11) => "corrupt",
Some(13) => "full",
Some(14) => "cantopen",
Some(17) => "schema",
Some(19) => "constraint",
_ => "unknown",
}
}

#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;

#[test]
fn classifies_extended_sqlite_codes() {
assert_eq!(classify_sqlite_code("5"), "busy");
assert_eq!(classify_sqlite_code("6"), "locked");
assert_eq!(classify_sqlite_code("2067"), "constraint");
}
}
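Per the doc comment on `install_process_db_telemetry`, low-level database paths pick up the process-wide sink unless a caller passes one explicitly, and the first install wins. A minimal sketch of a custom sink, assuming the `codex_state` crate with the exports shown above is available as a dependency (this impl is illustrative, not part of the diff):

```rust
use std::sync::Arc;
use std::time::Duration;

use codex_state::{install_process_db_telemetry, DbTelemetry};

/// Illustrative sink that just logs to stderr; it can never fail to deliver,
/// which trivially satisfies the trait's "absorb failures locally" contract.
struct StderrDbTelemetry;

impl DbTelemetry for StderrDbTelemetry {
    fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) {
        eprintln!("{name} +{inc} {tags:?}");
    }
    fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) {
        eprintln!("{name} {}ms {tags:?}", duration.as_millis());
    }
}

fn main() {
    // First install wins (OnceLock); the second call reports that it was ignored.
    assert!(install_process_db_telemetry(Arc::new(StderrDbTelemetry)));
    assert!(!install_process_db_telemetry(Arc::new(StderrDbTelemetry)));
}
```

On the error classification: `classify_sqlite_code` keys off the primary SQLite result code in the low byte, so the extended constraint code 2067 in the test maps to 2067 & 0xff = 19, which is why it is tagged "constraint".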
@@ -42,6 +42,7 @@ use codex_config::format_config_error_with_source;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::ExecServerRuntimePaths;
use codex_login::AuthConfig;
use codex_login::default_client::originator;
use codex_login::default_client::set_default_client_residency_requirement;
use codex_login::enforce_login_restrictions;
use codex_protocol::ThreadId;
@@ -892,6 +893,36 @@ pub async fn run_main(
)
.await;

let otel_originator = originator().value;
let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
crate::legacy_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
/*service_name_override*/ None,
/*default_analytics_enabled*/ true,
)
})) {
Ok(Ok(otel)) => otel,
Ok(Err(e)) => {
#[allow(clippy::print_stderr)]
{
eprintln!("Could not create otel exporter: {e}");
}
None
}
Err(_) => {
#[allow(clippy::print_stderr)]
{
eprintln!("Could not create otel exporter: panicked during initialization");
}
None
}
};
crate::legacy_core::otel_init::record_process_start(otel.as_ref(), otel_originator.as_str());
crate::legacy_core::otel_init::install_sqlite_telemetry(
otel.as_ref(),
otel_originator.as_str(),
);
let state_db = match &app_server_target {
AppServerTarget::Embedded => state_db::init(&config).await,
AppServerTarget::Remote { .. } => state_db::get_state_db(&config).await,
@@ -1034,31 +1065,6 @@ pub async fn run_main(
ensure_oss_provider_ready(provider_id, &config).await?;
}

let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
crate::legacy_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
/*service_name_override*/ None,
/*default_analytics_enabled*/ true,
)
})) {
Ok(Ok(otel)) => otel,
Ok(Err(e)) => {
#[allow(clippy::print_stderr)]
{
eprintln!("Could not create otel exporter: {e}");
}
None
}
Err(_) => {
#[allow(clippy::print_stderr)]
{
eprintln!("Could not create otel exporter: panicked during initialization");
}
None
}
};

let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());

let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());