app-server: source /feedback logs from sqlite at trace level (#12969)

## Summary
- write app-server SQLite logs at TRACE level when SQLite is enabled
- source app-server `/feedback` log attachments from SQLite for the
requested thread when available
- flush buffered SQLite log writes before `/feedback` queries them so
newly emitted events are not lost behind the async inserter
- include same-process threadless SQLite rows in those `/feedback` logs
so the attachment matches the process-wide feedback buffer more closely
- keep the existing in-memory ring buffer fallback unchanged, including
when the SQLite query returns no rows

## Details
- add a byte-bounded `query_feedback_logs` helper in `codex-state` so
`/feedback` does not fetch all rows before truncating
- scope SQLite feedback logs to the requested thread plus threadless
rows from the same `process_uuid`
- format exported SQLite feedback lines with the log level prefix to
better match the in-memory feedback formatter
- add an explicit `LogDbLayer::flush()` control path and await it in
app-server before querying SQLite for feedback logs
- pass optional SQLite log bytes through `codex-feedback` as the
`codex-logs.log` attachment override
- leave TUI behavior unchanged apart from the updated `upload_feedback`
call signature
- add regression coverage for:
  - newest-within-budget ordering
  - excluding oversized newest rows
  - including same-process threadless rows
  - keeping the newest suffix across mixed thread and threadless rows
  - matching the feedback formatter shape aside from span prefixes
  - falling back to the in-memory snapshot when SQLite returns no logs
  - flushing buffered SQLite rows before querying

## Follow-up
- SQLite feedback exports still do not reproduce span prefixes like
`feedback-thread{thread_id=...}:`; there is a `TODO(ccunningham)` in
`codex-rs/state/src/log_db.rs` for that follow-up.

## Testing
- `cd codex-rs && cargo test -p codex-state`
- `cd codex-rs && cargo test -p codex-app-server`
- `cd codex-rs && just fmt`
This commit is contained in:
Charley Cunningham
2026-03-03 11:17:06 -08:00
committed by GitHub
parent 69df12efb3
commit c4bd0aa3b9
8 changed files with 631 additions and 7 deletions

View File

@@ -26,6 +26,7 @@ use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::Event;
use tracing::field::Field;
use tracing::field::Visit;
@@ -45,7 +46,7 @@ const LOG_FLUSH_INTERVAL: Duration = Duration::from_millis(250);
const LOG_RETENTION_DAYS: i64 = 90;
/// `tracing` layer handle that forwards captured log events over a channel
/// to the background inserter task (see `run_inserter`), which batches them
/// before writing them out.
pub struct LogDbLayer {
sender: mpsc::Sender<LogEntry>,
sender: mpsc::Sender<LogDbCommand>,
process_uuid: String,
}
@@ -61,6 +62,24 @@ pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
}
}
impl Clone for LogDbLayer {
    /// Creates another handle by cloning the channel sender and the process
    /// UUID, so a caller can keep one handle (for example to call `flush`)
    /// while another is installed in a subscriber.
    fn clone(&self) -> Self {
        let Self {
            sender,
            process_uuid,
        } = self;
        Self {
            sender: sender.clone(),
            process_uuid: process_uuid.clone(),
        }
    }
}
impl LogDbLayer {
    /// Asks the background inserter to write out its buffered log entries
    /// and waits for its acknowledgement.
    ///
    /// This is best-effort: if the command cannot be delivered, or the
    /// acknowledgement sender is dropped before replying, the method simply
    /// returns without reporting an error.
    pub async fn flush(&self) {
        let (ack_tx, ack_rx) = oneshot::channel();
        if self
            .sender
            .send(LogDbCommand::Flush(ack_tx))
            .await
            .is_err()
        {
            // The command was not delivered, so no acknowledgement will come.
            return;
        }
        // A dropped acknowledgement is deliberately ignored.
        let _ = ack_rx.await;
    }
}
impl<S> Layer<S> for LogDbLayer
where
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
@@ -131,10 +150,15 @@ where
line: metadata.line().map(|line| line as i64),
};
let _ = self.sender.try_send(entry);
let _ = self.sender.try_send(LogDbCommand::Entry(entry));
}
}
/// Messages sent from `LogDbLayer` to the background inserter task.
enum LogDbCommand {
/// A captured log event; the inserter buffers it for a later batched write.
Entry(LogEntry),
/// Request to write out the current buffer now. The inserter signals the
/// enclosed sender after the write has been attempted.
Flush(oneshot::Sender<()>),
}
#[derive(Clone, Debug, Default)]
struct SpanLogContext {
thread_id: Option<String>,
@@ -215,20 +239,24 @@ fn current_process_log_uuid() -> &'static str {
async fn run_inserter(
state_db: std::sync::Arc<StateRuntime>,
mut receiver: mpsc::Receiver<LogEntry>,
mut receiver: mpsc::Receiver<LogDbCommand>,
) {
let mut buffer = Vec::with_capacity(LOG_BATCH_SIZE);
let mut ticker = tokio::time::interval(LOG_FLUSH_INTERVAL);
loop {
tokio::select! {
maybe_entry = receiver.recv() => {
match maybe_entry {
Some(entry) => {
maybe_command = receiver.recv() => {
match maybe_command {
Some(LogDbCommand::Entry(entry)) => {
buffer.push(entry);
if buffer.len() >= LOG_BATCH_SIZE {
flush(&state_db, &mut buffer).await;
}
}
Some(LogDbCommand::Flush(reply)) => {
flush(&state_db, &mut buffer).await;
let _ = reply.send(());
}
None => {
flush(&state_db, &mut buffer).await;
break;
@@ -304,3 +332,150 @@ impl Visit for MessageVisitor {
self.record_field(field, format!("{value:?}"));
}
}
#[cfg(test)]
mod tests {
use std::io;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::time::Instant;
use tracing_subscriber::filter::Targets;
use tracing_subscriber::fmt::writer::MakeWriter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use super::*;
/// In-memory sink for formatter output; every clone shares the same buffer.
#[derive(Clone, Default)]
struct SharedWriter {
    bytes: Arc<Mutex<Vec<u8>>>,
}

impl SharedWriter {
    /// Returns a copy of everything written so far, decoded as UTF-8.
    ///
    /// Panics if the mutex is poisoned or the buffer is not valid UTF-8.
    fn snapshot(&self) -> String {
        let buffered = self.bytes.lock().expect("writer mutex poisoned");
        String::from_utf8(Vec::clone(&buffered)).expect("valid utf-8")
    }
}
/// Writer handed out by `SharedWriter::make_writer`; it holds another
/// reference to the same shared byte buffer.
struct SharedWriterGuard {
bytes: Arc<Mutex<Vec<u8>>>,
}
impl<'a> MakeWriter<'a> for SharedWriter {
    type Writer = SharedWriterGuard;

    /// Hands out a writer backed by this `SharedWriter`'s buffer, so bytes
    /// written through it become visible via `snapshot`.
    fn make_writer(&'a self) -> Self::Writer {
        let bytes = Arc::clone(&self.bytes);
        SharedWriterGuard { bytes }
    }
}
impl io::Write for SharedWriterGuard {
    /// Appends all of `buf` to the shared buffer and reports it as fully written.
    ///
    /// Panics if the buffer's mutex is poisoned.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut sink = self.bytes.lock().expect("writer mutex poisoned");
        sink.extend_from_slice(buf);
        Ok(buf.len())
    }

    /// No-op: `write` stores bytes directly in memory, so nothing is pending.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Checks that the text returned by `query_feedback_logs` for a thread equals
/// what a `tracing_subscriber::fmt` layer prints for the same events, once
/// the span prefix has been stripped from the fmt output.
#[tokio::test]
async fn sqlite_feedback_logs_match_feedback_formatter_shape() {
// Use a uniquely named directory under the system temp dir for this run.
let codex_home =
std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4()));
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let writer = SharedWriter::default();
// Two layers see the same events at TRACE level: a plain-text fmt layer
// (no timestamps, ANSI colours or targets) captured in `writer`, and the
// SQLite-backed layer returned by `start`.
let subscriber = tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.with_writer(writer.clone())
.without_time()
.with_ansi(false)
.with_target(false)
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
)
.with(
start(runtime.clone())
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
);
let guard = subscriber.set_default();
// Emit one event inside a span carrying `thread_id`, surrounded by two
// events outside any span, each at a different level.
tracing::trace!("threadless-before");
tracing::info_span!("feedback-thread", thread_id = "thread-1").in_scope(|| {
tracing::info!("thread-scoped");
});
tracing::debug!("threadless-after");
// Stop capturing before building the expected output.
drop(guard);
// TODO(ccunningham): Store enough span metadata in SQLite to reproduce span
// prefixes like `feedback-thread{thread_id="thread-1"}:` in feedback exports.
let feedback_logs = writer
.snapshot()
.replace("feedback-thread{thread_id=\"thread-1\"}: ", "");
// The inserter writes rows asynchronously, so poll every 10 ms until the
// SQLite export matches, failing the test after a two-second deadline.
let deadline = Instant::now() + Duration::from_secs(2);
loop {
let sqlite_logs = String::from_utf8(
runtime
.query_feedback_logs("thread-1")
.await
.expect("query feedback logs"),
)
.expect("valid utf-8");
if sqlite_logs == feedback_logs {
break;
}
assert!(
Instant::now() < deadline,
"sqlite feedback logs did not match feedback formatter output before timeout\nsqlite:\n{sqlite_logs}\nfeedback:\n{feedback_logs}"
);
tokio::time::sleep(Duration::from_millis(10)).await;
}
// Best-effort cleanup; only reached when the assertions above pass.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Checks that an event emitted just before `LogDbLayer::flush` is returned
/// by `query_logs` as soon as the flush completes, with no polling.
#[tokio::test]
async fn flush_persists_logs_for_query() {
// Use a uniquely named directory under the system temp dir for this run.
let codex_home =
std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4()));
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
// Keep `layer` so `flush` can be called on it; a clone goes into the subscriber.
let layer = start(runtime.clone());
let guard = tracing_subscriber::registry()
.with(
layer
.clone()
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
)
.set_default();
tracing::info!("buffered-log");
// Wait for the inserter to acknowledge the flush before querying.
layer.flush().await;
drop(guard);
let after_flush = runtime
.query_logs(&crate::LogQuery::default())
.await
.expect("query logs after flush");
// Exactly the one event emitted above should be stored.
assert_eq!(after_flush.len(), 1);
assert_eq!(after_flush[0].message.as_deref(), Some("buffered-log"));
// Best-effort cleanup; only reached when the assertions above pass.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
}