diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index cadbca3dd8..5c0ee59ee5 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -1,5 +1,6 @@ //! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later. +use std::cmp::Reverse; use std::collections::HashSet; use std::fs; use std::fs::File; @@ -561,7 +562,58 @@ impl RolloutRecorder { ) .await { - return Ok(repaired_db_page.into()); + let mut page = + page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); + page = fill_missing_thread_item_metadata_from_state_db( + state_db_ctx.as_deref(), + page, + ) + .await; + + let db_page: ThreadsPage = repaired_db_page.into(); + let db_more_matches_available = db_page.next_cursor.is_some(); + page.num_scanned_files = page + .num_scanned_files + .saturating_add(db_page.num_scanned_files); + let fs_more_matches_available = + page.next_cursor.is_some() || page.reached_scan_cap; + let mut seen_thread_ids = page + .items + .iter() + .filter_map(|item| item.thread_id) + .collect::>(); + page.items.extend(db_page.items.into_iter().filter(|item| { + let Some(thread_id) = item.thread_id else { + return false; + }; + seen_thread_ids.insert(thread_id) + })); + + let mut keyed_items = page + .items + .into_iter() + .filter_map(|item| { + thread_item_sort_key(&item, sort_key).map(|key| (key, item)) + }) + .collect::>(); + match sort_direction { + SortDirection::Asc => keyed_items.sort_by_key(|(key, _)| *key), + SortDirection::Desc => keyed_items.sort_by_key(|(key, _)| Reverse(*key)), + } + + let more_matches_available = fs_more_matches_available + || db_more_matches_available + || keyed_items.len() > page_size; + keyed_items.truncate(page_size); + page.items = keyed_items.into_iter().map(|(_, item)| item).collect(); + page.next_cursor = if more_matches_available { + page.items + .last() + .and_then(|item| cursor_from_thread_item(item, sort_key)) + } else { + None + }; + return Ok(page); } let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); return Ok(fill_missing_thread_item_metadata_from_state_db(