feat: add multi-thread log query (#15776)

Required for multi-agent v2
This commit is contained in:
jif-oai
2026-03-25 16:30:04 +00:00
committed by GitHub
parent 14c35a16a8
commit 303d0190c5

View File

@@ -313,28 +313,46 @@ WHERE id IN (
Ok(rows)
}
/// Query per-thread feedback logs, capped to the per-thread SQLite retention budget.
pub async fn query_feedback_logs(&self, thread_id: &str) -> anyhow::Result<Vec<u8>> {
/// Query feedback logs for a set of threads, capped to the SQLite retention budget.
pub async fn query_feedback_logs_for_threads(
&self,
thread_ids: &[&str],
) -> anyhow::Result<Vec<u8>> {
if thread_ids.is_empty() {
return Ok(Vec::new());
}
let max_bytes = usize::try_from(LOG_PARTITION_SIZE_LIMIT_BYTES).unwrap_or(usize::MAX);
// Bound the fetched rows in SQL first so over-retained partitions do not have to load
// every row into memory, then apply the exact whole-line byte cap after formatting.
let rows = sqlx::query_as::<_, FeedbackLogRow>(
let requested_threads = vec!["(?)"; thread_ids.len()].join(", ");
let query = format!(
r#"
WITH latest_process AS (
SELECT process_uuid
FROM logs
WHERE thread_id = ? AND process_uuid IS NOT NULL
ORDER BY ts DESC, ts_nanos DESC, id DESC
LIMIT 1
WITH requested_threads(thread_id) AS (
VALUES {requested_threads}
),
latest_processes AS (
SELECT (
SELECT process_uuid
FROM logs
WHERE logs.thread_id = requested_threads.thread_id AND process_uuid IS NOT NULL
ORDER BY ts DESC, ts_nanos DESC, id DESC
LIMIT 1
) AS process_uuid
FROM requested_threads
),
feedback_logs AS (
SELECT ts, ts_nanos, level, feedback_log_body, estimated_bytes, id
FROM logs
WHERE feedback_log_body IS NOT NULL AND (
thread_id = ?
thread_id IN (SELECT thread_id FROM requested_threads)
OR (
thread_id IS NULL
AND process_uuid IN (SELECT process_uuid FROM latest_process)
AND process_uuid IN (
SELECT process_uuid
FROM latest_processes
WHERE process_uuid IS NOT NULL
)
)
)
),
@@ -354,13 +372,16 @@ SELECT ts, ts_nanos, level, feedback_log_body
FROM bounded_feedback_logs
WHERE cumulative_estimated_bytes <= ?
ORDER BY ts DESC, ts_nanos DESC, id DESC
"#,
)
.bind(thread_id)
.bind(thread_id)
.bind(LOG_PARTITION_SIZE_LIMIT_BYTES)
.fetch_all(self.logs_pool.as_ref())
.await?;
"#
);
let mut sql = sqlx::query_as::<_, FeedbackLogRow>(query.as_str());
for thread_id in thread_ids {
sql = sql.bind(thread_id);
}
let rows = sql
.bind(LOG_PARTITION_SIZE_LIMIT_BYTES)
.fetch_all(self.logs_pool.as_ref())
.await?;
let mut lines = Vec::new();
let mut total_bytes = 0usize;
@@ -382,6 +403,11 @@ ORDER BY ts DESC, ts_nanos DESC, id DESC
Ok(ordered_bytes)
}
/// Query feedback logs for one thread, capped to the per-thread SQLite retention budget.
///
/// Convenience wrapper: forwards `thread_id` as a one-element slice to
/// [`Self::query_feedback_logs_for_threads`] and returns its result unchanged.
pub async fn query_feedback_logs(&self, thread_id: &str) -> anyhow::Result<Vec<u8>> {
    let single_thread = [thread_id];
    self.query_feedback_logs_for_threads(&single_thread).await
}
/// Return the max log id matching optional filters.
pub async fn max_log_id(&self, query: &LogQuery) -> anyhow::Result<i64> {
let mut builder =
@@ -1521,4 +1547,131 @@ mod tests {
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn query_feedback_logs_for_threads_merges_requested_threads_and_threadless_rows() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
runtime
.insert_logs(&[
LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("thread-1".to_string()),
feedback_log_body: None,
thread_id: Some("thread-1".to_string()),
process_uuid: Some("proc-1".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 2,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("thread-2".to_string()),
feedback_log_body: None,
thread_id: Some("thread-2".to_string()),
process_uuid: Some("proc-2".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 3,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("threadless-proc-1".to_string()),
feedback_log_body: None,
thread_id: None,
process_uuid: Some("proc-1".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 4,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("threadless-proc-2".to_string()),
feedback_log_body: None,
thread_id: None,
process_uuid: Some("proc-2".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 5,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("thread-3".to_string()),
feedback_log_body: None,
thread_id: Some("thread-3".to_string()),
process_uuid: Some("proc-3".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 6,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("threadless-proc-3".to_string()),
feedback_log_body: None,
thread_id: None,
process_uuid: Some("proc-3".to_string()),
file: None,
line: None,
module_path: None,
},
])
.await
.expect("insert test logs");
let bytes = runtime
.query_feedback_logs_for_threads(&["thread-1", "thread-2"])
.await
.expect("query feedback logs");
assert_eq!(
String::from_utf8(bytes).expect("valid utf-8"),
[
format_feedback_log_line(1, 0, "INFO", "thread-1"),
format_feedback_log_line(2, 0, "INFO", "thread-2"),
format_feedback_log_line(3, 0, "INFO", "threadless-proc-1"),
format_feedback_log_line(4, 0, "INFO", "threadless-proc-2"),
]
.concat()
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// An empty thread list yields an empty byte buffer rather than an error.
#[tokio::test]
async fn query_feedback_logs_for_threads_returns_empty_for_empty_thread_list() {
    let codex_home = unique_temp_dir();
    let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
        .await
        .expect("initialize runtime");

    let no_threads: [&str; 0] = [];
    let bytes = runtime
        .query_feedback_logs_for_threads(&no_threads)
        .await
        .expect("query feedback logs");
    assert!(bytes.is_empty());

    let _ = tokio::fs::remove_dir_all(codex_home).await;
}
}