diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index 8a4ec05fd3..3efc7140af 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -32,6 +32,7 @@ codex-protocol = { workspace = true } codex-app-server-protocol = { workspace = true } codex-feedback = { workspace = true } codex-rmcp-client = { workspace = true } +codex-state = { workspace = true } codex-utils-absolute-path = { workspace = true } codex-utils-json-to-toml = { workspace = true } chrono = { workspace = true } diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 3c1c29ecab..ca9eb374c6 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -274,6 +274,7 @@ use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS; use codex_protocol::user_input::UserInput as CoreInputItem; use codex_rmcp_client::perform_oauth_login_return_url; +use codex_state::log_db::LogDbLayer; use codex_utils_json_to_toml::json_to_toml; use std::collections::HashMap; use std::collections::HashSet; @@ -383,6 +384,7 @@ pub(crate) struct CodexMessageProcessor { pending_fuzzy_searches: Arc>>>, fuzzy_search_sessions: Arc>>, feedback: CodexFeedback, + log_db: Option, } #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] @@ -417,6 +419,7 @@ pub(crate) struct CodexMessageProcessorArgs { pub(crate) cli_overrides: Vec<(String, TomlValue)>, pub(crate) cloud_requirements: Arc>, pub(crate) feedback: CodexFeedback, + pub(crate) log_db: Option, } impl CodexMessageProcessor { @@ -461,6 +464,7 @@ impl CodexMessageProcessor { cli_overrides, cloud_requirements, feedback, + log_db, } = args; Self { auth_manager, @@ -477,6 +481,7 @@ impl CodexMessageProcessor { pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())), fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())), feedback, + log_db, } } @@ -6944,6 +6949,30 @@ impl CodexMessageProcessor { let snapshot = self.feedback.snapshot(conversation_id); let thread_id = snapshot.thread_id.clone(); + let sqlite_feedback_logs = if include_logs { + if let Some(log_db) = self.log_db.as_ref() { + log_db.flush().await; + } + let state_db_ctx = get_state_db(&self.config, None).await; + match (state_db_ctx.as_ref(), conversation_id) { + (Some(state_db_ctx), Some(conversation_id)) => { + let thread_id_text = conversation_id.to_string(); + match state_db_ctx.query_feedback_logs(&thread_id_text).await { + Ok(logs) if logs.is_empty() => None, + Ok(logs) => Some(logs), + Err(err) => { + warn!( + "failed to query feedback logs from sqlite for thread_id={thread_id_text}: {err}" + ); + None + } + } + } + _ => None, + } + } else { + None + }; let validated_rollout_path = if include_logs { match conversation_id { @@ -6967,6 +6996,7 @@ impl CodexMessageProcessor { include_logs, &attachment_paths, Some(session_source), + sqlite_feedback_logs, ) }) .await; diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index a84712ea05..580282d5d3 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -38,16 +38,20 @@ use codex_core::ExecPolicyError; use codex_core::check_execpolicy_for_warnings; use codex_core::config_loader::ConfigLoadError; use codex_core::config_loader::TextRange as CoreTextRange; +use codex_core::features::Feature; use codex_feedback::CodexFeedback; +use codex_state::log_db; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use toml::Value as TomlValue; +use tracing::Level; use tracing::error; use tracing::info; use tracing::warn; use tracing_subscriber::EnvFilter; use tracing_subscriber::Layer; +use tracing_subscriber::filter::Targets; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::Registry; use tracing_subscriber::util::SubscriberInitExt; @@ -477,6 +481,21 @@ pub async fn run_main_with_transport( let feedback_layer = feedback.logger_layer(); let feedback_metadata_layer = feedback.metadata_layer(); + let log_db = if config.features.enabled(Feature::Sqlite) { + codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.model_provider_id.clone(), + None, + ) + .await + .ok() + .map(log_db::start) + } else { + None + }; + let log_db_layer = log_db + .clone() + .map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE))); let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); @@ -484,6 +503,7 @@ pub async fn run_main_with_transport( .with(stderr_fmt) .with(feedback_layer) .with(feedback_metadata_layer) + .with(log_db_layer) .with(otel_logger_layer) .with(otel_tracing_layer) .try_init(); @@ -562,6 +582,7 @@ pub async fn run_main_with_transport( loader_overrides, cloud_requirements: cloud_requirements.clone(), feedback: feedback.clone(), + log_db, config_warnings, }); let mut thread_created_rx = processor.thread_created_receiver(); diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 48f2f69e87..f56e12467f 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -54,6 +54,7 @@ use codex_core::models_manager::collaboration_mode_presets::CollaborationModesCo use codex_feedback::CodexFeedback; use codex_protocol::ThreadId; use codex_protocol::protocol::SessionSource; +use codex_state::log_db::LogDbLayer; use futures::FutureExt; use tokio::sync::broadcast; use tokio::sync::watch; @@ -154,6 +155,7 @@ pub(crate) struct MessageProcessorArgs { pub(crate) loader_overrides: LoaderOverrides, pub(crate) cloud_requirements: CloudRequirementsLoader, pub(crate) feedback: CodexFeedback, + pub(crate) log_db: Option, pub(crate) config_warnings: Vec, } @@ -169,6 +171,7 @@ impl MessageProcessor { loader_overrides, cloud_requirements, feedback, + log_db, config_warnings, } = args; let auth_manager = AuthManager::shared( @@ -201,6 +204,7 @@ impl MessageProcessor { cli_overrides: cli_overrides.clone(), cloud_requirements: cloud_requirements.clone(), feedback, + log_db, }); let config_api = ConfigApi::new( config.codex_home.clone(), diff --git a/codex-rs/feedback/src/lib.rs b/codex-rs/feedback/src/lib.rs index a4fe6cc185..ee8e98c39e 100644 --- a/codex-rs/feedback/src/lib.rs +++ b/codex-rs/feedback/src/lib.rs @@ -226,6 +226,7 @@ impl CodexLogSnapshot { include_logs: bool, extra_log_files: &[PathBuf], session_source: Option, + logs_override: Option>, ) -> Result<()> { use std::collections::BTreeMap; use std::fs; @@ -310,7 +311,7 @@ impl CodexLogSnapshot { if include_logs { envelope.add_item(EnvelopeItem::Attachment(Attachment { - buffer: self.bytes.clone(), + buffer: logs_override.unwrap_or_else(|| self.bytes.clone()), filename: String::from("codex-logs.log"), content_type: Some("text/plain".to_string()), ty: None, diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs index 64deeef2ad..e646828bda 100644 --- a/codex-rs/state/src/log_db.rs +++ b/codex-rs/state/src/log_db.rs @@ -26,6 +26,7 @@ use std::time::SystemTime; use std::time::UNIX_EPOCH; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tracing::Event; use tracing::field::Field; use tracing::field::Visit; @@ -45,7 +46,7 @@ const LOG_FLUSH_INTERVAL: Duration = Duration::from_millis(250); const LOG_RETENTION_DAYS: i64 = 90; pub struct LogDbLayer { - sender: mpsc::Sender, + sender: mpsc::Sender, process_uuid: String, } @@ -61,6 +62,24 @@ pub fn start(state_db: std::sync::Arc) -> LogDbLayer { } } +impl Clone for LogDbLayer { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + process_uuid: self.process_uuid.clone(), + } + } +} + +impl LogDbLayer { + pub async fn flush(&self) { + let (tx, rx) = oneshot::channel(); + if self.sender.send(LogDbCommand::Flush(tx)).await.is_ok() { + let _ = rx.await; + } + } +} + impl Layer for LogDbLayer where S: tracing::Subscriber + for<'a> LookupSpan<'a>, @@ -131,10 +150,15 @@ where line: metadata.line().map(|line| line as i64), }; - let _ = self.sender.try_send(entry); + let _ = self.sender.try_send(LogDbCommand::Entry(entry)); } } +enum LogDbCommand { + Entry(LogEntry), + Flush(oneshot::Sender<()>), +} + #[derive(Clone, Debug, Default)] struct SpanLogContext { thread_id: Option, @@ -215,20 +239,24 @@ fn current_process_log_uuid() -> &'static str { async fn run_inserter( state_db: std::sync::Arc, - mut receiver: mpsc::Receiver, + mut receiver: mpsc::Receiver, ) { let mut buffer = Vec::with_capacity(LOG_BATCH_SIZE); let mut ticker = tokio::time::interval(LOG_FLUSH_INTERVAL); loop { tokio::select! { - maybe_entry = receiver.recv() => { - match maybe_entry { - Some(entry) => { + maybe_command = receiver.recv() => { + match maybe_command { + Some(LogDbCommand::Entry(entry)) => { buffer.push(entry); if buffer.len() >= LOG_BATCH_SIZE { flush(&state_db, &mut buffer).await; } } + Some(LogDbCommand::Flush(reply)) => { + flush(&state_db, &mut buffer).await; + let _ = reply.send(()); + } None => { flush(&state_db, &mut buffer).await; break; @@ -304,3 +332,150 @@ impl Visit for MessageVisitor { self.record_field(field, format!("{value:?}")); } } + +#[cfg(test)] +mod tests { + use std::io; + use std::sync::Arc; + use std::sync::Mutex; + use std::time::Duration; + + use tokio::time::Instant; + use tracing_subscriber::filter::Targets; + use tracing_subscriber::fmt::writer::MakeWriter; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + + use super::*; + + #[derive(Clone, Default)] + struct SharedWriter { + bytes: Arc>>, + } + + impl SharedWriter { + fn snapshot(&self) -> String { + String::from_utf8(self.bytes.lock().expect("writer mutex poisoned").clone()) + .expect("valid utf-8") + } + } + + struct SharedWriterGuard { + bytes: Arc>>, + } + + impl<'a> MakeWriter<'a> for SharedWriter { + type Writer = SharedWriterGuard; + + fn make_writer(&'a self) -> Self::Writer { + SharedWriterGuard { + bytes: Arc::clone(&self.bytes), + } + } + } + + impl io::Write for SharedWriterGuard { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.bytes + .lock() + .expect("writer mutex poisoned") + .extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + #[tokio::test] + async fn sqlite_feedback_logs_match_feedback_formatter_shape() { + let codex_home = + std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4())); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + let writer = SharedWriter::default(); + + let subscriber = tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::layer() + .with_writer(writer.clone()) + .without_time() + .with_ansi(false) + .with_target(false) + .with_filter(Targets::new().with_default(tracing::Level::TRACE)), + ) + .with( + start(runtime.clone()) + .with_filter(Targets::new().with_default(tracing::Level::TRACE)), + ); + let guard = subscriber.set_default(); + + tracing::trace!("threadless-before"); + tracing::info_span!("feedback-thread", thread_id = "thread-1").in_scope(|| { + tracing::info!("thread-scoped"); + }); + tracing::debug!("threadless-after"); + + drop(guard); + + // TODO(ccunningham): Store enough span metadata in SQLite to reproduce span + // prefixes like `feedback-thread{thread_id="thread-1"}:` in feedback exports. + let feedback_logs = writer + .snapshot() + .replace("feedback-thread{thread_id=\"thread-1\"}: ", ""); + let deadline = Instant::now() + Duration::from_secs(2); + loop { + let sqlite_logs = String::from_utf8( + runtime + .query_feedback_logs("thread-1") + .await + .expect("query feedback logs"), + ) + .expect("valid utf-8"); + if sqlite_logs == feedback_logs { + break; + } + assert!( + Instant::now() < deadline, + "sqlite feedback logs did not match feedback formatter output before timeout\nsqlite:\n{sqlite_logs}\nfeedback:\n{feedback_logs}" + ); + tokio::time::sleep(Duration::from_millis(10)).await; + } + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn flush_persists_logs_for_query() { + let codex_home = + std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4())); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + let layer = start(runtime.clone()); + + let guard = tracing_subscriber::registry() + .with( + layer + .clone() + .with_filter(Targets::new().with_default(tracing::Level::TRACE)), + ) + .set_default(); + + tracing::info!("buffered-log"); + + layer.flush().await; + drop(guard); + + let after_flush = runtime + .query_logs(&crate::LogQuery::default()) + .await + .expect("query logs after flush"); + assert_eq!(after_flush.len(), 1); + assert_eq!(after_flush[0].message.as_deref(), Some("buffered-log")); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } +} diff --git a/codex-rs/state/src/runtime/logs.rs b/codex-rs/state/src/runtime/logs.rs index 762f68f1a9..2ee8940afd 100644 --- a/codex-rs/state/src/runtime/logs.rs +++ b/codex-rs/state/src/runtime/logs.rs @@ -285,6 +285,67 @@ WHERE id IN ( Ok(rows) } + /// Query per-thread feedback logs, capped to the per-thread SQLite retention budget. + pub async fn query_feedback_logs(&self, thread_id: &str) -> anyhow::Result> { + let max_bytes = LOG_PARTITION_SIZE_LIMIT_BYTES; + let lines = sqlx::query_scalar::<_, String>( + r#" +WITH latest_process AS ( + SELECT process_uuid + FROM logs + WHERE thread_id = ? AND process_uuid IS NOT NULL + ORDER BY ts DESC, ts_nanos DESC, id DESC + LIMIT 1 +), +feedback_logs AS ( + SELECT + printf('%5s %s', level, message) || CASE + WHEN substr(message, -1, 1) = char(10) THEN '' + ELSE char(10) + END AS line, + length(CAST( + printf('%5s %s', level, message) || CASE + WHEN substr(message, -1, 1) = char(10) THEN '' + ELSE char(10) + END AS BLOB + )) AS line_bytes, + ts, + ts_nanos, + id + FROM logs + WHERE message IS NOT NULL AND ( + thread_id = ? + OR ( + thread_id IS NULL + AND process_uuid IN (SELECT process_uuid FROM latest_process) + ) + ) +) +SELECT line +FROM ( + SELECT + line, + ts, + ts_nanos, + id, + SUM(line_bytes) OVER ( + ORDER BY ts DESC, ts_nanos DESC, id DESC + ) AS cumulative_bytes + FROM feedback_logs +) +WHERE cumulative_bytes <= ? +ORDER BY ts ASC, ts_nanos ASC, id ASC +"#, + ) + .bind(thread_id) + .bind(thread_id) + .bind(max_bytes) + .fetch_all(self.pool.as_ref()) + .await?; + + Ok(lines.concat().into_bytes()) + } + /// Return the max log id matching optional filters. pub async fn max_log_id(&self, query: &LogQuery) -> anyhow::Result { let mut builder = @@ -712,4 +773,334 @@ mod tests { let _ = tokio::fs::remove_dir_all(codex_home).await; } + + #[tokio::test] + async fn query_feedback_logs_returns_newest_lines_within_limit_in_order() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + runtime + .insert_logs(&[ + LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("alpha".to_string()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 2, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("bravo".to_string()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 3, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("charlie".to_string()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + ]) + .await + .expect("insert test logs"); + + let bytes = runtime + .query_feedback_logs("thread-1") + .await + .expect("query feedback logs"); + + assert_eq!( + String::from_utf8(bytes).expect("valid utf-8"), + " INFO alpha\n INFO bravo\n INFO charlie\n" + ); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn query_feedback_logs_excludes_oversized_newest_row() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + let eleven_mebibytes = "z".repeat(11 * 1024 * 1024); + + runtime + .insert_logs(&[ + LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("small".to_string()), + thread_id: Some("thread-oversized".to_string()), + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 2, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(eleven_mebibytes), + thread_id: Some("thread-oversized".to_string()), + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + ]) + .await + .expect("insert test logs"); + + let bytes = runtime + .query_feedback_logs("thread-oversized") + .await + .expect("query feedback logs"); + + assert_eq!(bytes, Vec::::new()); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn query_feedback_logs_includes_threadless_rows_from_same_process() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + runtime + .insert_logs(&[ + LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("threadless-before".to_string()), + thread_id: None, + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 2, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("thread-scoped".to_string()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 3, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("threadless-after".to_string()), + thread_id: None, + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 4, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("other-process-threadless".to_string()), + thread_id: None, + process_uuid: Some("proc-2".to_string()), + file: None, + line: None, + module_path: None, + }, + ]) + .await + .expect("insert test logs"); + + let bytes = runtime + .query_feedback_logs("thread-1") + .await + .expect("query feedback logs"); + + assert_eq!( + String::from_utf8(bytes).expect("valid utf-8"), + " INFO threadless-before\n INFO thread-scoped\n INFO threadless-after\n" + ); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn query_feedback_logs_excludes_threadless_rows_from_prior_processes() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + runtime + .insert_logs(&[ + LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("old-process-threadless".to_string()), + thread_id: None, + process_uuid: Some("proc-old".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 2, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("old-process-thread".to_string()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-old".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 3, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("new-process-thread".to_string()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-new".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 4, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("new-process-threadless".to_string()), + thread_id: None, + process_uuid: Some("proc-new".to_string()), + file: None, + line: None, + module_path: None, + }, + ]) + .await + .expect("insert test logs"); + + let bytes = runtime + .query_feedback_logs("thread-1") + .await + .expect("query feedback logs"); + + assert_eq!( + String::from_utf8(bytes).expect("valid utf-8"), + " INFO old-process-thread\n INFO new-process-thread\n INFO new-process-threadless\n" + ); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn query_feedback_logs_keeps_newest_suffix_across_thread_and_threadless_logs() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + let thread_marker = "thread-scoped-oldest"; + let threadless_older_marker = "threadless-older"; + let threadless_newer_marker = "threadless-newer"; + let five_mebibytes = format!("{threadless_older_marker} {}", "a".repeat(5 * 1024 * 1024)); + let four_and_half_mebibytes = format!( + "{threadless_newer_marker} {}", + "b".repeat((9 * 1024 * 1024) / 2) + ); + let one_mebibyte = format!("{thread_marker} {}", "c".repeat(1024 * 1024)); + + runtime + .insert_logs(&[ + LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(one_mebibyte.clone()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 2, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(five_mebibytes), + thread_id: None, + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + LogEntry { + ts: 3, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(four_and_half_mebibytes), + thread_id: None, + process_uuid: Some("proc-1".to_string()), + file: None, + line: None, + module_path: None, + }, + ]) + .await + .expect("insert test logs"); + + let bytes = runtime + .query_feedback_logs("thread-1") + .await + .expect("query feedback logs"); + let logs = String::from_utf8(bytes).expect("valid utf-8"); + + assert!(!logs.contains(thread_marker)); + assert!(logs.contains(threadless_older_marker)); + assert!(logs.contains(threadless_newer_marker)); + assert_eq!(logs.matches('\n').count(), 2); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } } diff --git a/codex-rs/tui/src/bottom_pane/feedback_view.rs b/codex-rs/tui/src/bottom_pane/feedback_view.rs index 4144c87f97..33368faa45 100644 --- a/codex-rs/tui/src/bottom_pane/feedback_view.rs +++ b/codex-rs/tui/src/bottom_pane/feedback_view.rs @@ -102,6 +102,7 @@ impl FeedbackNoteView { self.include_logs, &log_file_paths, Some(SessionSource::Cli), + None, ); match result {