mirror of
https://github.com/openai/codex.git
synced 2026-05-06 06:12:59 +03:00
codex: preserve thread list ordering with DB previews (#20800)
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later.
|
||||
|
||||
use std::cmp::Reverse;
|
||||
use std::collections::HashSet;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
@@ -561,7 +562,58 @@ impl RolloutRecorder {
|
||||
)
|
||||
.await
|
||||
{
|
||||
return Ok(repaired_db_page.into());
|
||||
let mut page =
|
||||
page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
|
||||
page = fill_missing_thread_item_metadata_from_state_db(
|
||||
state_db_ctx.as_deref(),
|
||||
page,
|
||||
)
|
||||
.await;
|
||||
|
||||
let db_page: ThreadsPage = repaired_db_page.into();
|
||||
let db_more_matches_available = db_page.next_cursor.is_some();
|
||||
page.num_scanned_files = page
|
||||
.num_scanned_files
|
||||
.saturating_add(db_page.num_scanned_files);
|
||||
let fs_more_matches_available =
|
||||
page.next_cursor.is_some() || page.reached_scan_cap;
|
||||
let mut seen_thread_ids = page
|
||||
.items
|
||||
.iter()
|
||||
.filter_map(|item| item.thread_id)
|
||||
.collect::<HashSet<_>>();
|
||||
page.items.extend(db_page.items.into_iter().filter(|item| {
|
||||
let Some(thread_id) = item.thread_id else {
|
||||
return false;
|
||||
};
|
||||
seen_thread_ids.insert(thread_id)
|
||||
}));
|
||||
|
||||
let mut keyed_items = page
|
||||
.items
|
||||
.into_iter()
|
||||
.filter_map(|item| {
|
||||
thread_item_sort_key(&item, sort_key).map(|key| (key, item))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
match sort_direction {
|
||||
SortDirection::Asc => keyed_items.sort_by_key(|(key, _)| *key),
|
||||
SortDirection::Desc => keyed_items.sort_by_key(|(key, _)| Reverse(*key)),
|
||||
}
|
||||
|
||||
let more_matches_available = fs_more_matches_available
|
||||
|| db_more_matches_available
|
||||
|| keyed_items.len() > page_size;
|
||||
keyed_items.truncate(page_size);
|
||||
page.items = keyed_items.into_iter().map(|(_, item)| item).collect();
|
||||
page.next_cursor = if more_matches_available {
|
||||
page.items
|
||||
.last()
|
||||
.and_then(|item| cursor_from_thread_item(item, sort_key))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
return Ok(page);
|
||||
}
|
||||
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
|
||||
return Ok(fill_missing_thread_item_metadata_from_state_db(
|
||||
|
||||
Reference in New Issue
Block a user