//! Tracing log export into the state SQLite database. //! //! This module provides a `tracing_subscriber::Layer` that captures events and //! inserts them into the dedicated `logs` SQLite database. The writer runs in a //! background task and batches inserts to keep logging overhead low. //! //! ## Usage //! //! ```no_run //! use codex_state::log_db; //! use tracing_subscriber::prelude::*; //! //! # async fn example(state_db: std::sync::Arc) { //! let layer = log_db::start(state_db); //! let _ = tracing_subscriber::registry() //! .with(layer) //! .try_init(); //! # } //! ``` use chrono::Duration as ChronoDuration; use chrono::Utc; use std::sync::OnceLock; use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; use tokio::sync::mpsc; use tokio::sync::oneshot; use tracing::Event; use tracing::field::Field; use tracing::field::Visit; use tracing::span::Attributes; use tracing::span::Id; use tracing::span::Record; use tracing_subscriber::Layer; use tracing_subscriber::field::RecordFields; use tracing_subscriber::fmt::FormatFields; use tracing_subscriber::fmt::FormattedFields; use tracing_subscriber::fmt::format::DefaultFields; use tracing_subscriber::registry::LookupSpan; use uuid::Uuid; use crate::LogEntry; use crate::StateRuntime; const LOG_QUEUE_CAPACITY: usize = 512; const LOG_BATCH_SIZE: usize = 128; const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(2); const LOG_RETENTION_DAYS: i64 = 10; pub struct LogDbLayer { sender: mpsc::Sender, process_uuid: String, } pub fn start(state_db: std::sync::Arc) -> LogDbLayer { let process_uuid = current_process_log_uuid().to_string(); let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY); tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver)); tokio::spawn(run_retention_cleanup(state_db)); LogDbLayer { sender, process_uuid, } } impl Clone for LogDbLayer { fn clone(&self) -> Self { Self { sender: self.sender.clone(), process_uuid: self.process_uuid.clone(), } } } impl LogDbLayer 
{ pub async fn flush(&self) { let (tx, rx) = oneshot::channel(); if self.sender.send(LogDbCommand::Flush(tx)).await.is_ok() { let _ = rx.await; } } } impl Layer for LogDbLayer where S: tracing::Subscriber + for<'a> LookupSpan<'a>, { fn on_new_span( &self, attrs: &Attributes<'_>, id: &Id, ctx: tracing_subscriber::layer::Context<'_, S>, ) { let mut visitor = SpanFieldVisitor::default(); attrs.record(&mut visitor); if let Some(span) = ctx.span(id) { span.extensions_mut().insert(SpanLogContext { name: span.metadata().name().to_string(), formatted_fields: format_fields(attrs), thread_id: visitor.thread_id, }); } } fn on_record( &self, id: &Id, values: &Record<'_>, ctx: tracing_subscriber::layer::Context<'_, S>, ) { let mut visitor = SpanFieldVisitor::default(); values.record(&mut visitor); if let Some(span) = ctx.span(id) { let mut extensions = span.extensions_mut(); if let Some(log_context) = extensions.get_mut::() { if let Some(thread_id) = visitor.thread_id { log_context.thread_id = Some(thread_id); } append_fields(&mut log_context.formatted_fields, values); } else { extensions.insert(SpanLogContext { name: span.metadata().name().to_string(), formatted_fields: format_fields(values), thread_id: visitor.thread_id, }); } } } fn on_event(&self, event: &Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { let metadata = event.metadata(); let mut visitor = MessageVisitor::default(); event.record(&mut visitor); let thread_id = visitor .thread_id .clone() .or_else(|| event_thread_id(event, &ctx)); let feedback_log_body = format_feedback_log_body(event, &ctx); let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_else(|_| Duration::from_secs(0)); let entry = LogEntry { ts: now.as_secs() as i64, ts_nanos: now.subsec_nanos() as i64, level: metadata.level().as_str().to_string(), target: metadata.target().to_string(), message: visitor.message, feedback_log_body: Some(feedback_log_body), thread_id, process_uuid: Some(self.process_uuid.clone()), module_path: 
metadata.module_path().map(ToString::to_string), file: metadata.file().map(ToString::to_string), line: metadata.line().map(|line| line as i64), }; let _ = self.sender.try_send(LogDbCommand::Entry(Box::new(entry))); } } enum LogDbCommand { Entry(Box), Flush(oneshot::Sender<()>), } #[derive(Debug)] struct SpanLogContext { name: String, formatted_fields: String, thread_id: Option, } #[derive(Default)] struct SpanFieldVisitor { thread_id: Option, } impl SpanFieldVisitor { fn record_field(&mut self, field: &Field, value: String) { if field.name() == "thread_id" && self.thread_id.is_none() { self.thread_id = Some(value); } } } impl Visit for SpanFieldVisitor { fn record_i64(&mut self, field: &Field, value: i64) { self.record_field(field, value.to_string()); } fn record_u64(&mut self, field: &Field, value: u64) { self.record_field(field, value.to_string()); } fn record_bool(&mut self, field: &Field, value: bool) { self.record_field(field, value.to_string()); } fn record_f64(&mut self, field: &Field, value: f64) { self.record_field(field, value.to_string()); } fn record_str(&mut self, field: &Field, value: &str) { self.record_field(field, value.to_string()); } fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) { self.record_field(field, value.to_string()); } fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { self.record_field(field, format!("{value:?}")); } } fn event_thread_id( event: &Event<'_>, ctx: &tracing_subscriber::layer::Context<'_, S>, ) -> Option where S: tracing::Subscriber + for<'a> LookupSpan<'a>, { let mut thread_id = None; if let Some(scope) = ctx.event_scope(event) { for span in scope.from_root() { let extensions = span.extensions(); if let Some(log_context) = extensions.get::() && log_context.thread_id.is_some() { thread_id = log_context.thread_id.clone(); } } } thread_id } fn format_feedback_log_body( event: &Event<'_>, ctx: &tracing_subscriber::layer::Context<'_, S>, ) -> String where S: 
tracing::Subscriber + for<'a> LookupSpan<'a>, { let mut feedback_log_body = String::new(); if let Some(scope) = ctx.event_scope(event) { for span in scope.from_root() { let extensions = span.extensions(); if let Some(log_context) = extensions.get::() { feedback_log_body.push_str(&log_context.name); if !log_context.formatted_fields.is_empty() { feedback_log_body.push('{'); feedback_log_body.push_str(&log_context.formatted_fields); feedback_log_body.push('}'); } } else { feedback_log_body.push_str(span.metadata().name()); } feedback_log_body.push(':'); } if !feedback_log_body.is_empty() { feedback_log_body.push(' '); } } feedback_log_body.push_str(&format_fields(event)); feedback_log_body } fn format_fields(fields: R) -> String where R: RecordFields, { let formatter = DefaultFields::default(); let mut formatted = FormattedFields::::new(String::new()); let _ = formatter.format_fields(formatted.as_writer(), fields); formatted.fields } fn append_fields(fields: &mut String, values: &Record<'_>) { let formatter = DefaultFields::default(); let mut formatted = FormattedFields::::new(std::mem::take(fields)); let _ = formatter.add_fields(&mut formatted, values); *fields = formatted.fields; } fn current_process_log_uuid() -> &'static str { static PROCESS_LOG_UUID: OnceLock = OnceLock::new(); PROCESS_LOG_UUID.get_or_init(|| { let pid = std::process::id(); let process_uuid = Uuid::new_v4(); format!("pid:{pid}:{process_uuid}") }) } async fn run_inserter( state_db: std::sync::Arc, mut receiver: mpsc::Receiver, ) { let mut buffer = Vec::with_capacity(LOG_BATCH_SIZE); let mut ticker = tokio::time::interval(LOG_FLUSH_INTERVAL); loop { tokio::select! 
{ maybe_command = receiver.recv() => { match maybe_command { Some(LogDbCommand::Entry(entry)) => { buffer.push(*entry); if buffer.len() >= LOG_BATCH_SIZE { flush(&state_db, &mut buffer).await; } } Some(LogDbCommand::Flush(reply)) => { flush(&state_db, &mut buffer).await; let _ = reply.send(()); } None => { flush(&state_db, &mut buffer).await; break; } } } _ = ticker.tick() => { flush(&state_db, &mut buffer).await; } } } } async fn flush(state_db: &std::sync::Arc, buffer: &mut Vec) { if buffer.is_empty() { return; } let entries = buffer.split_off(0); let _ = state_db.insert_logs(entries.as_slice()).await; } async fn run_retention_cleanup(state_db: std::sync::Arc) { let Some(cutoff) = Utc::now().checked_sub_signed(ChronoDuration::days(LOG_RETENTION_DAYS)) else { return; }; let _ = state_db.delete_logs_before(cutoff.timestamp()).await; } #[derive(Default)] struct MessageVisitor { message: Option, thread_id: Option, } impl MessageVisitor { fn record_field(&mut self, field: &Field, value: String) { if field.name() == "message" && self.message.is_none() { self.message = Some(value.clone()); } if field.name() == "thread_id" && self.thread_id.is_none() { self.thread_id = Some(value); } } } impl Visit for MessageVisitor { fn record_i64(&mut self, field: &Field, value: i64) { self.record_field(field, value.to_string()); } fn record_u64(&mut self, field: &Field, value: u64) { self.record_field(field, value.to_string()); } fn record_bool(&mut self, field: &Field, value: bool) { self.record_field(field, value.to_string()); } fn record_f64(&mut self, field: &Field, value: f64) { self.record_field(field, value.to_string()); } fn record_str(&mut self, field: &Field, value: &str) { self.record_field(field, value.to_string()); } fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) { self.record_field(field, value.to_string()); } fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { self.record_field(field, format!("{value:?}")); } } 
#[cfg(test)]
mod tests {
    use std::io;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::time::Duration;
    use tokio::time::Instant;
    use tracing_subscriber::filter::Targets;
    use tracing_subscriber::fmt::writer::MakeWriter;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    use super::*;

    /// In-memory writer shared between the fmt layer and the assertions.
    #[derive(Clone, Default)]
    struct SharedWriter {
        bytes: Arc<Mutex<Vec<u8>>>,
    }

    impl SharedWriter {
        /// Returns everything written so far as a UTF-8 string.
        fn snapshot(&self) -> String {
            String::from_utf8(self.bytes.lock().expect("writer mutex poisoned").clone())
                .expect("valid utf-8")
        }
    }

    struct SharedWriterGuard {
        bytes: Arc<Mutex<Vec<u8>>>,
    }

    impl<'a> MakeWriter<'a> for SharedWriter {
        type Writer = SharedWriterGuard;

        fn make_writer(&'a self) -> Self::Writer {
            SharedWriterGuard {
                bytes: Arc::clone(&self.bytes),
            }
        }
    }

    impl io::Write for SharedWriterGuard {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.bytes
                .lock()
                .expect("writer mutex poisoned")
                .extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn sqlite_feedback_logs_match_feedback_formatter_shape() {
        let codex_home =
            std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4()));
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
            .await
            .expect("initialize runtime");

        let writer = SharedWriter::default();
        let subscriber = tracing_subscriber::registry()
            .with(
                tracing_subscriber::fmt::layer()
                    .with_writer(writer.clone())
                    .with_ansi(false)
                    .with_target(false)
                    .with_filter(Targets::new().with_default(tracing::Level::TRACE)),
            )
            .with(
                start(runtime.clone())
                    .with_filter(Targets::new().with_default(tracing::Level::TRACE)),
            );
        let guard = subscriber.set_default();

        tracing::trace!("threadless-before");
        tracing::info_span!("feedback-thread", thread_id = "thread-1", turn = 1).in_scope(|| {
            tracing::info!(foo = 2, "thread-scoped");
        });
        tracing::debug!("threadless-after");
        drop(guard);

        let feedback_logs = writer.snapshot();
        // The fmt layer prefixes each line with a timestamp; the SQLite rows
        // carry their own — strip both before comparing shapes.
        let without_timestamps = |logs: &str| {
            logs.lines()
                .map(|line| match line.split_once(' ') {
                    Some((_, rest)) => rest,
                    None => line,
                })
                .collect::<Vec<_>>()
                .join("\n")
        };

        // The writer task flushes asynchronously; poll until it catches up.
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            let sqlite_logs = String::from_utf8(
                runtime
                    .query_feedback_logs("thread-1")
                    .await
                    .expect("query feedback logs"),
            )
            .expect("valid utf-8");
            if without_timestamps(&sqlite_logs) == without_timestamps(&feedback_logs) {
                break;
            }
            assert!(
                Instant::now() < deadline,
                "sqlite feedback logs did not match feedback formatter output before timeout\nsqlite:\n{sqlite_logs}\nfeedback:\n{feedback_logs}"
            );
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

    #[tokio::test]
    async fn flush_persists_logs_for_query() {
        let codex_home =
            std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4()));
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
            .await
            .expect("initialize runtime");

        let layer = start(runtime.clone());
        let guard = tracing_subscriber::registry()
            .with(
                layer
                    .clone()
                    .with_filter(Targets::new().with_default(tracing::Level::TRACE)),
            )
            .set_default();

        tracing::info!("buffered-log");
        // `flush` round-trips through the writer task, so the row is
        // guaranteed to be queryable afterwards.
        layer.flush().await;
        drop(guard);

        let after_flush = runtime
            .query_logs(&crate::LogQuery::default())
            .await
            .expect("query logs after flush");
        assert_eq!(after_flush.len(), 1);
        assert_eq!(after_flush[0].message.as_deref(), Some("buffered-log"));

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }
}