mirror of
https://github.com/openai/codex.git
synced 2026-05-04 13:21:54 +03:00
chore: merge name and title (#17116)
Merge title and name concept to leverage the sqlite title column and have more efficient queries --------- Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -54,10 +54,11 @@ pub use policy::EventPersistenceMode;
|
||||
pub use policy::should_persist_response_item_for_memories;
|
||||
pub use recorder::RolloutRecorder;
|
||||
pub use recorder::RolloutRecorderParams;
|
||||
pub use recorder::append_rollout_item_to_path;
|
||||
pub use session_index::append_thread_name;
|
||||
pub use session_index::find_thread_meta_by_name_str;
|
||||
pub use session_index::find_thread_name_by_id;
|
||||
pub use session_index::find_thread_names_by_ids;
|
||||
pub use session_index::find_thread_path_by_name_str;
|
||||
pub use state_db::StateDbHandle;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -97,6 +97,7 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
|
||||
| EventMsg::AgentReasoning(_)
|
||||
| EventMsg::AgentReasoningRawContent(_)
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::ThreadNameUpdated(_)
|
||||
| EventMsg::ContextCompacted(_)
|
||||
| EventMsg::EnteredReviewMode(_)
|
||||
| EventMsg::ExitedReviewMode(_)
|
||||
@@ -142,7 +143,6 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
|
||||
| EventMsg::AgentReasoningSectionBreak(_)
|
||||
| EventMsg::RawResponseItem(_)
|
||||
| EventMsg::SessionConfigured(_)
|
||||
| EventMsg::ThreadNameUpdated(_)
|
||||
| EventMsg::McpToolCallBegin(_)
|
||||
| EventMsg::WebSearchBegin(_)
|
||||
| EventMsg::ExecCommandBegin(_)
|
||||
@@ -188,8 +188,10 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
|
||||
mod tests {
|
||||
use super::EventPersistenceMode;
|
||||
use super::should_persist_event_msg;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ImageGenerationEndEvent;
|
||||
use codex_protocol::protocol::ThreadNameUpdatedEvent;
|
||||
|
||||
#[test]
|
||||
fn persists_image_generation_end_events_in_limited_mode() {
|
||||
@@ -206,4 +208,17 @@ mod tests {
|
||||
EventPersistenceMode::Limited
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persists_thread_name_updates_in_limited_mode() {
|
||||
let event = EventMsg::ThreadNameUpdated(ThreadNameUpdatedEvent {
|
||||
thread_id: ThreadId::new(),
|
||||
thread_name: Some("saved-session".to_string()),
|
||||
});
|
||||
|
||||
assert!(should_persist_event_msg(
|
||||
&event,
|
||||
EventPersistenceMode::Limited
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -225,6 +225,26 @@ impl RolloutRecorder {
|
||||
search_term: Option<&str>,
|
||||
) -> std::io::Result<ThreadsPage> {
|
||||
let codex_home = config.codex_home();
|
||||
let state_db_ctx = state_db::get_state_db(config).await;
|
||||
|
||||
if search_term.is_some()
|
||||
&& let Some(db_page) = state_db::list_threads_db(
|
||||
state_db_ctx.as_deref(),
|
||||
codex_home,
|
||||
page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
archived,
|
||||
search_term,
|
||||
)
|
||||
.await
|
||||
&& (!db_page.items.is_empty() || cursor.is_some())
|
||||
{
|
||||
return Ok(db_page.into());
|
||||
}
|
||||
|
||||
// Filesystem-first listing intentionally overfetches so we can repair stale/missing
|
||||
// SQLite rollout paths before the final DB-backed page is returned.
|
||||
let fs_page_size = page_size.saturating_mul(2).max(page_size);
|
||||
@@ -256,7 +276,6 @@ impl RolloutRecorder {
|
||||
.await?
|
||||
};
|
||||
|
||||
let state_db_ctx = state_db::get_state_db(config).await;
|
||||
if state_db_ctx.is_none() {
|
||||
// Keep legacy behavior when SQLite is unavailable: return filesystem results
|
||||
// at the requested page size.
|
||||
@@ -951,6 +970,23 @@ async fn sync_thread_state_after_write(
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Append one already-filtered rollout item to an existing rollout JSONL file.
|
||||
///
|
||||
/// This is for metadata updates to unloaded threads. Live sessions should use
|
||||
/// `RolloutRecorder::record_items` so rollout and SQLite updates remain ordered
|
||||
/// with the rest of the session stream.
|
||||
pub async fn append_rollout_item_to_path(
|
||||
rollout_path: &Path,
|
||||
item: &RolloutItem,
|
||||
) -> std::io::Result<()> {
|
||||
let file = tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(rollout_path)
|
||||
.await?;
|
||||
let mut writer = JsonlWriter { file };
|
||||
writer.write_rollout_item(item).await
|
||||
}
|
||||
|
||||
struct JsonlWriter {
|
||||
file: tokio::fs::File,
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
@@ -111,12 +112,12 @@ pub async fn find_thread_names_by_ids(
|
||||
Ok(names)
|
||||
}
|
||||
|
||||
/// Locate a recorded thread rollout file by thread name using newest-first ordering.
|
||||
/// Returns `Ok(Some(path))` if found, `Ok(None)` if not present.
|
||||
pub async fn find_thread_path_by_name_str(
|
||||
/// Locate a recorded thread rollout and read its session metadata by thread name.
|
||||
/// Returns the newest indexed name that still has a readable rollout header.
|
||||
pub async fn find_thread_meta_by_name_str(
|
||||
codex_home: &Path,
|
||||
name: &str,
|
||||
) -> std::io::Result<Option<PathBuf>> {
|
||||
) -> std::io::Result<Option<(PathBuf, SessionMetaLine)>> {
|
||||
if name.trim().is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -136,11 +137,11 @@ pub async fn find_thread_path_by_name_str(
|
||||
// rename cannot shadow an older persisted session with the same name.
|
||||
if let Some(path) =
|
||||
super::list::find_thread_path_by_id_str(codex_home, &thread_id.to_string()).await?
|
||||
&& super::list::read_session_meta_line(&path).await.is_ok()
|
||||
&& let Ok(session_meta) = super::list::read_session_meta_line(&path).await
|
||||
{
|
||||
drop(rx);
|
||||
scan.await.map_err(std::io::Error::other)??;
|
||||
return Ok(Some(path));
|
||||
return Ok(Some((path, session_meta)));
|
||||
}
|
||||
}
|
||||
scan.await.map_err(std::io::Error::other)??;
|
||||
|
||||
@@ -73,7 +73,7 @@ fn find_thread_id_by_name_prefers_latest_entry() -> std::io::Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_path_by_name_str_skips_newest_entry_without_rollout() -> std::io::Result<()> {
|
||||
async fn find_thread_meta_by_name_str_skips_newest_entry_without_rollout() -> std::io::Result<()> {
|
||||
// A newer unsaved name entry should not shadow an older persisted rollout with the same name.
|
||||
let temp = TempDir::new()?;
|
||||
let path = session_index_path(temp.path());
|
||||
@@ -99,14 +99,17 @@ async fn find_thread_path_by_name_str_skips_newest_entry_without_rollout() -> st
|
||||
];
|
||||
write_index(&path, &lines)?;
|
||||
|
||||
let found = find_thread_path_by_name_str(temp.path(), "same").await?;
|
||||
let found = find_thread_meta_by_name_str(temp.path(), "same").await?;
|
||||
|
||||
assert_eq!(found, Some(saved_rollout_path));
|
||||
assert_eq!(
|
||||
found.map(|(path, session_meta)| (path, session_meta.meta.id)),
|
||||
Some((saved_rollout_path, saved_id))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_path_by_name_str_skips_partial_rollout() -> std::io::Result<()> {
|
||||
async fn find_thread_meta_by_name_str_skips_partial_rollout() -> std::io::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
let path = session_index_path(temp.path());
|
||||
let saved_id = ThreadId::new();
|
||||
@@ -133,14 +136,14 @@ async fn find_thread_path_by_name_str_skips_partial_rollout() -> std::io::Result
|
||||
];
|
||||
write_index(&path, &lines)?;
|
||||
|
||||
let found = find_thread_path_by_name_str(temp.path(), "same").await?;
|
||||
let found = find_thread_meta_by_name_str(temp.path(), "same").await?;
|
||||
|
||||
assert_eq!(found, Some(saved_rollout_path));
|
||||
assert_eq!(found.map(|(path, _)| path), Some(saved_rollout_path));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_path_by_name_str_ignores_historical_name_after_rename() -> std::io::Result<()>
|
||||
async fn find_thread_meta_by_name_str_ignores_historical_name_after_rename() -> std::io::Result<()>
|
||||
{
|
||||
let temp = TempDir::new()?;
|
||||
let path = session_index_path(temp.path());
|
||||
@@ -171,9 +174,9 @@ async fn find_thread_path_by_name_str_ignores_historical_name_after_rename() ->
|
||||
];
|
||||
write_index(&path, &lines)?;
|
||||
|
||||
let found = find_thread_path_by_name_str(temp.path(), "same").await?;
|
||||
let found = find_thread_meta_by_name_str(temp.path(), "same").await?;
|
||||
|
||||
assert_eq!(found, Some(current_rollout_path));
|
||||
assert_eq!(found.map(|(path, _)| path), Some(current_rollout_path));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user