Refactor log DB into LogWriter interface (#19234)

## Why

This prepares feedback log capture for a future remote app-server hook
sink without changing the current local SQLite upload path. The
important boundary is now intentionally small: a log sink is a tracing
`Layer` that can also flush entries it has accepted.

That keeps the existing SQLite implementation simple while giving the
upcoming gRPC sink a place to fit beside it. SQLite and gRPC have
different worker/write semantics, so this PR avoids introducing a shared
buffered-sink abstraction and instead lets each `LogWriter` own the
buffering mechanics it needs.

## What Changed

- Added `LogSinkQueueConfig` with the existing local defaults: queue
capacity `512`, batch size `128`, and flush interval `2s`.
- Added `LogDbLayer::start_with_config(...)` while preserving
`LogDbLayer::start(...)` and `log_db::start(...)` defaults.
- Introduced the `LogWriter` trait as the minimal shared interface:
`tracing_subscriber::Layer` plus `flush()`.
- Made `LogDbLayer` implement `LogWriter`.
- Kept tracing event formatting inside `LogDbLayer`; it still creates
one `LogEntry` per tracing event before queueing it for SQLite.
- Kept normal event capture best-effort and non-blocking via bounded
`try_send`.

## Behavior Notes

This does not change the SQLite schema, retention behavior,
`/feedback/upload`, or Sentry upload behavior. Normal log events still
drop when the queue is full; explicit `flush()` still waits for queue
capacity and receiver processing before returning.

## Verification

- `cargo test -p codex-state log_db`
- `cargo test -p codex-state`
- `just fix -p codex-state`

The added tests cover configured batch-size flushing, configured
interval flushing, queue-full drops, and the flush barrier semantics.
This commit is contained in:
Rasmus Rygaard
2026-04-24 16:27:39 -07:00
committed by GitHub
parent 32aad7bd13
commit 5378cccd8a

View File

@@ -1,8 +1,9 @@
//! Tracing log export into the state SQLite database.
//! Tracing log export into the local SQLite log database.
//!
//! This module provides a `tracing_subscriber::Layer` that captures events and
//! inserts them into the dedicated `logs` SQLite database. The writer runs in a
//! background task and batches inserts to keep logging overhead low.
//! This module provides a `tracing_subscriber::Layer` that captures events,
//! formats each one into a `LogEntry`, and sends entries to a bounded background
//! queue. The background task inserts into the dedicated `logs` SQLite database
//! in batches to keep logging overhead low.
//!
//! ## Usage
//!
@@ -18,6 +19,7 @@
//! # }
//! ```
use std::future::Future;
use std::sync::OnceLock;
use std::time::Duration;
use std::time::SystemTime;
@@ -45,20 +47,57 @@ use crate::StateRuntime;
const LOG_QUEUE_CAPACITY: usize = 512;
const LOG_BATCH_SIZE: usize = 128;
const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(2);
/// Tuning knobs for a log sink's bounded background queue.
///
/// The `Default` impl mirrors the pre-existing local constants (queue
/// capacity 512, batch size 128, flush interval 2s).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct LogSinkQueueConfig {
/// Capacity of the bounded command channel; when full, ordinary events are
/// dropped by `try_send` rather than blocking.
pub queue_capacity: usize,
/// Number of buffered entries that triggers an immediate batch insert.
pub batch_size: usize,
/// How often buffered entries are flushed even when the batch is not full.
pub flush_interval: Duration,
}
impl Default for LogSinkQueueConfig {
/// Defaults match the long-standing local sink constants: capacity 512,
/// batch size 128, two-second flush interval.
fn default() -> Self {
let (queue_capacity, batch_size, flush_interval) =
(LOG_QUEUE_CAPACITY, LOG_BATCH_SIZE, LOG_FLUSH_INTERVAL);
Self {
queue_capacity,
batch_size,
flush_interval,
}
}
}
impl LogSinkQueueConfig {
/// Clamps nonsensical settings: zero queue capacity or batch size become 1,
/// and a zero flush interval falls back to the default interval.
fn normalized(self) -> Self {
let Self {
queue_capacity,
batch_size,
flush_interval,
} = self;
let flush_interval = if flush_interval.is_zero() {
LOG_FLUSH_INTERVAL
} else {
flush_interval
};
Self {
queue_capacity: queue_capacity.max(1),
batch_size: batch_size.max(1),
flush_interval,
}
}
}
/// A tracing log writer that can flush entries accepted by its queue.
///
/// Implementations should keep `Layer::on_event` non-blocking for ordinary log
/// events. `flush` should wait for entries accepted before the flush command to
/// be processed by the writer.
pub trait LogWriter<S>: Layer<S>
where
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
/// Resolves once every entry accepted before this call has been processed
/// by the underlying writer.
fn flush(&self) -> impl Future<Output = ()> + Send + '_;
}
/// `tracing_subscriber` layer that formats events into `LogEntry` values and
/// forwards them to a background SQLite inserter over a bounded channel.
pub struct LogDbLayer {
// Sending half of the bounded command channel; a full queue means new
// entries are dropped (best-effort capture).
sender: mpsc::Sender<LogDbCommand>,
// UUID recorded on entries so rows can be attributed to this process.
process_uuid: String,
}
/// Starts a log DB layer with the default queue configuration.
///
/// Convenience wrapper preserved for existing callers; it delegates to
/// [`LogDbLayer::start`]. (The span previously contained both the old inline
/// channel/spawn body and the new delegation — diff residue that is not valid
/// Rust — so only the post-refactor delegating form is kept.)
pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
LogDbLayer::start(state_db)
}
impl Clone for LogDbLayer {
@@ -71,12 +110,33 @@ impl Clone for LogDbLayer {
}
impl LogDbLayer {
/// Starts the layer with the default queue configuration
/// (`LogSinkQueueConfig::default()`).
pub fn start(state_db: std::sync::Arc<StateRuntime>) -> Self {
Self::start_with_config(state_db, LogSinkQueueConfig::default())
}
/// Starts the layer with an explicit queue configuration: spawns the
/// background inserter task and returns the layer holding the sending half
/// of the bounded command channel.
pub fn start_with_config(
state_db: std::sync::Arc<StateRuntime>,
config: LogSinkQueueConfig,
) -> Self {
// Guard against zero-sized queue/batch and a zero flush interval.
let config = config.normalized();
let (sender, receiver) = mpsc::channel(config.queue_capacity);
tokio::spawn(run_inserter(state_db, receiver, config));
Self {
sender,
process_uuid: current_process_log_uuid().to_string(),
}
}
/// Waits for every entry queued before this call to be processed.
///
/// Unlike normal event capture, this `send` waits for queue capacity; the
/// background task replies on the oneshot once it handles the flush command.
pub async fn flush(&self) {
let (tx, rx) = oneshot::channel();
if self.sender.send(LogDbCommand::Flush(tx)).await.is_ok() {
// Errors (receiver gone) are deliberately ignored: flush is best-effort
// when the background task has shut down.
let _ = rx.await;
}
}
/// Best-effort enqueue: silently drops the entry when the queue is full so
/// ordinary logging never blocks.
fn try_send(&self, entry: LogEntry) {
let _ = self.sender.try_send(LogDbCommand::Entry(Box::new(entry)));
}
}
impl<S> Layer<S> for LogDbLayer
@@ -154,7 +214,16 @@ where
line: metadata.line().map(|line| line as i64),
};
let _ = self.sender.try_send(LogDbCommand::Entry(Box::new(entry)));
self.try_send(entry);
}
}
impl<S> LogWriter<S> for LogDbLayer
where
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
/// Delegates (by explicit path, avoiding inherent/trait ambiguity) to
/// [`LogDbLayer::flush`], which waits for the background task to process
/// everything queued before the call.
fn flush(&self) -> impl Future<Output = ()> + Send + '_ {
LogDbLayer::flush(self)
}
}
@@ -294,16 +363,17 @@ fn current_process_log_uuid() -> &'static str {
async fn run_inserter(
state_db: std::sync::Arc<StateRuntime>,
mut receiver: mpsc::Receiver<LogDbCommand>,
config: LogSinkQueueConfig,
) {
let mut buffer = Vec::with_capacity(LOG_BATCH_SIZE);
let mut ticker = tokio::time::interval(LOG_FLUSH_INTERVAL);
let mut buffer = Vec::with_capacity(config.batch_size);
let mut ticker = tokio::time::interval(config.flush_interval);
loop {
tokio::select! {
maybe_command = receiver.recv() => {
match maybe_command {
Some(LogDbCommand::Entry(entry)) => {
buffer.push(*entry);
if buffer.len() >= LOG_BATCH_SIZE {
if buffer.len() >= config.batch_size {
flush(&state_db, &mut buffer).await;
}
}
@@ -324,7 +394,7 @@ async fn run_inserter(
}
}
async fn flush(state_db: &std::sync::Arc<StateRuntime>, buffer: &mut Vec<LogEntry>) {
async fn flush(state_db: &StateRuntime, buffer: &mut Vec<LogEntry>) {
if buffer.is_empty() {
return;
}
@@ -393,6 +463,45 @@ mod tests {
use super::*;
/// Builds a unique scratch directory path for a test's codex home.
fn temp_codex_home() -> std::path::PathBuf {
let unique = format!("codex-state-log-db-{}", Uuid::new_v4());
std::env::temp_dir().join(unique)
}
/// Polls the runtime until exactly `expected` log rows are visible,
/// panicking if that does not happen within two seconds.
async fn wait_for_log_count(runtime: &StateRuntime, expected: usize) -> Vec<crate::LogRow> {
let started = tokio::time::Instant::now();
let timeout = std::time::Duration::from_secs(2);
loop {
let rows = runtime
.query_logs(&crate::LogQuery::default())
.await
.expect("query logs");
if rows.len() == expected {
return rows;
}
if started.elapsed() >= timeout {
panic!("timed out waiting for {expected} logs; saw {}", rows.len());
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
/// Builds a deterministic `LogEntry` whose message and feedback body both
/// carry `message`; all other fields are fixed placeholder values.
fn test_entry(message: &str) -> LogEntry {
let text = message.to_string();
LogEntry {
ts: 1,
ts_nanos: 2,
level: "INFO".to_string(),
target: "test".to_string(),
message: Some(text.clone()),
feedback_log_body: Some(text),
thread_id: Some("thread-1".to_string()),
process_uuid: Some("process-1".to_string()),
module_path: Some("module".to_string()),
file: Some("file.rs".to_string()),
line: Some(7),
}
}
#[derive(Clone, Default)]
struct SharedWriter {
bytes: Arc<Mutex<Vec<u8>>>,
@@ -435,8 +544,7 @@ mod tests {
#[tokio::test]
async fn sqlite_feedback_logs_match_feedback_formatter_shape() {
let codex_home =
std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4()));
let codex_home = temp_codex_home();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
@@ -494,8 +602,7 @@ mod tests {
#[tokio::test]
async fn flush_persists_logs_for_query() {
let codex_home =
std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4()));
let codex_home = temp_codex_home();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
@@ -523,4 +630,147 @@ mod tests {
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// Verifies that reaching the configured batch size triggers an insert on its
// own — no explicit flush(), and well before the (deliberately long) interval.
#[tokio::test]
async fn configured_batch_size_flushes_without_explicit_flush() {
let codex_home = temp_codex_home();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
// batch_size = 2 so the second event fills the batch; the 60s interval
// ensures interval-based flushing cannot interfere with the assertion.
let layer = LogDbLayer::start_with_config(
runtime.clone(),
LogSinkQueueConfig {
queue_capacity: 8,
batch_size: 2,
flush_interval: std::time::Duration::from_secs(60),
},
);
// Give the spawned inserter task a moment to start.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let guard = tracing_subscriber::registry()
.with(
layer
.clone()
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
)
.set_default();
tracing::info!("first-batch-log");
// A single buffered entry is below the batch size, so nothing should have
// been inserted yet.
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
assert_eq!(
runtime
.query_logs(&crate::LogQuery::default())
.await
.expect("query logs before batch fills")
.len(),
0
);
// Second event fills the batch and should force the insert.
tracing::info!("second-batch-log");
let after_batch = wait_for_log_count(&runtime, /*expected*/ 2).await;
drop(guard);
assert_eq!(
after_batch
.iter()
.map(|row| row.message.as_deref())
.collect::<Vec<_>>(),
vec![Some("first-batch-log"), Some("second-batch-log")]
);
// Best-effort cleanup of the temp codex home.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// Verifies that the configured flush interval alone persists buffered entries:
// the batch size (128) is never reached, so only the 10ms ticker can flush.
#[tokio::test]
async fn configured_flush_interval_persists_buffered_logs() {
let codex_home = temp_codex_home();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
let layer = LogDbLayer::start_with_config(
runtime.clone(),
LogSinkQueueConfig {
queue_capacity: 8,
batch_size: 128,
flush_interval: std::time::Duration::from_millis(10),
},
);
// Give the spawned inserter task a moment to start.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let guard = tracing_subscriber::registry()
.with(
layer
.clone()
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
)
.set_default();
tracing::info!("interval-log");
// Only the interval tick can have flushed this single entry.
let after_interval = wait_for_log_count(&runtime, /*expected*/ 1).await;
drop(guard);
assert_eq!(after_interval[0].message.as_deref(), Some("interval-log"));
// Best-effort cleanup of the temp codex home.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// With a capacity-1 channel, the first try_send is accepted and the second is
// silently dropped — the best-effort, non-blocking capture behavior.
#[tokio::test]
async fn event_queue_drops_new_entries_when_full() {
// Construct the layer directly (no background task) so the queue state is
// fully controlled by the test.
let (sender, mut receiver) = mpsc::channel(1);
let layer = LogDbLayer {
sender,
process_uuid: "process-1".to_string(),
};
layer.try_send(test_entry("first-queued-log"));
layer.try_send(test_entry("dropped-log"));
match receiver.try_recv().expect("first entry queued") {
LogDbCommand::Entry(entry) => {
assert_eq!(entry.message.as_deref(), Some("first-queued-log"));
}
LogDbCommand::Flush(_) => panic!("expected queued entry"),
}
// The second entry was dropped, so the queue is now empty.
assert!(receiver.try_recv().is_err());
}
// Verifies the flush barrier: flush() must wait for queue capacity (the queue
// is full when it is called) and must not resolve until the receiver replies.
#[tokio::test]
async fn flush_waits_for_queue_capacity_and_receiver_processing() {
// Capacity-1 channel with no background task, so the test drives the
// receiver side manually.
let (sender, mut receiver) = mpsc::channel(1);
let layer = LogDbLayer {
sender,
process_uuid: "process-1".to_string(),
};
// Fill the queue so the subsequent flush send must wait for capacity.
layer.try_send(test_entry("queued-before-flush"));
let mut flush_task = tokio::spawn({
let layer = layer.clone();
async move {
layer.flush().await;
}
});
// The queue is full, so flush() cannot even enqueue its command yet.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
assert!(!flush_task.is_finished());
match receiver.recv().await.expect("queued entry") {
LogDbCommand::Entry(entry) => {
assert_eq!(entry.message.as_deref(), Some("queued-before-flush"));
}
LogDbCommand::Flush(_) => panic!("expected queued entry"),
}
match receiver.recv().await.expect("flush command") {
LogDbCommand::Flush(reply) => {
// Receiving the command is not enough; flush waits for the reply.
assert!(!flush_task.is_finished());
let _ = reply.send(());
}
LogDbCommand::Entry(_) => panic!("expected flush command"),
}
// After the oneshot reply, the flush future must complete promptly.
tokio::time::timeout(std::time::Duration::from_secs(1), &mut flush_task)
.await
.expect("flush task completes")
.expect("flush task succeeds");
}
}