diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 5e54b772c1..68e0ec092f 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1461,6 +1461,7 @@ dependencies = [ "codex-sandboxing", "codex-shell-command", "codex-state", + "codex-thread-store", "codex-tools", "codex-utils-absolute-path", "codex-utils-cargo-bin", @@ -1938,6 +1939,7 @@ dependencies = [ "codex-state", "codex-terminal-detection", "codex-test-binary-support", + "codex-thread-store", "codex-tools", "codex-utils-absolute-path", "codex-utils-cache", @@ -2841,9 +2843,17 @@ version = "0.0.0" dependencies = [ "async-trait", "chrono", + "codex-git-utils", "codex-protocol", + "codex-rollout", + "codex-state", + "pretty_assertions", "serde", + "serde_json", + "tempfile", "thiserror 2.0.18", + "tokio", + "uuid", ] [[package]] diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index b57a464d5e..4bb8837211 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -166,6 +166,7 @@ codex-state = { path = "state" } codex-stdio-to-uds = { path = "stdio-to-uds" } codex-terminal-detection = { path = "terminal-detection" } codex-test-binary-support = { path = "test-binary-support" } +codex-thread-store = { path = "thread-store" } codex-tools = { path = "tools" } codex-tui = { path = "tui" } codex-utils-absolute-path = { path = "utils/absolute-path" } diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index 5dc3a31485..408f39fafe 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -54,6 +54,7 @@ codex-rmcp-client = { workspace = true } codex-rollout = { workspace = true } codex-sandboxing = { workspace = true } codex-state = { workspace = true } +codex-thread-store = { workspace = true } codex-tools = { workspace = true } codex-utils-absolute-path = { workspace = true } codex-utils-json-to-toml = { workspace = true } diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 513837c7ce..05ce2d7974 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -203,7 +203,6 @@ use codex_chatgpt::connectors; use codex_cloud_requirements::cloud_requirements_loader; use codex_config::types::McpServerTransportConfig; use codex_core::CodexThread; -use codex_core::Cursor as RolloutCursor; use codex_core::ForkSnapshot; use codex_core::NewThread; use codex_core::RolloutRecorder; @@ -211,7 +210,6 @@ use codex_core::SessionMeta; use codex_core::SteerInputError; use codex_core::ThreadConfigSnapshot; use codex_core::ThreadManager; -use codex_core::ThreadSortKey as CoreThreadSortKey; use codex_core::append_thread_name; use codex_core::config::Config; use codex_core::config::ConfigOverrides; @@ -232,7 +230,6 @@ use codex_core::find_archived_thread_path_by_id_str; use codex_core::find_thread_name_by_id; use codex_core::find_thread_names_by_ids; use codex_core::find_thread_path_by_id_str; -use codex_core::parse_cursor; use codex_core::path_utils; use codex_core::plugins::MarketplaceAddError; use codex_core::plugins::MarketplaceError; @@ -322,6 +319,12 @@ use codex_state::StateRuntime; use codex_state::ThreadMetadata; use codex_state::ThreadMetadataBuilder; use codex_state::log_db::LogDbLayer; +use codex_thread_store::ListThreadsParams as StoreListThreadsParams; +use codex_thread_store::LocalThreadStore; +use codex_thread_store::StoredThread; +use codex_thread_store::ThreadSortKey as StoreThreadSortKey; +use codex_thread_store::ThreadStore; +use codex_thread_store::ThreadStoreError; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_json_to_toml::json_to_toml; use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP; @@ -3819,15 +3822,15 @@ impl CodexMessageProcessor { .map(|value| value as usize) .unwrap_or(THREAD_LIST_DEFAULT_LIMIT) .clamp(1, THREAD_LIST_MAX_LIMIT); - let core_sort_key = match sort_key.unwrap_or(ThreadSortKey::CreatedAt) { - ThreadSortKey::CreatedAt => CoreThreadSortKey::CreatedAt, - ThreadSortKey::UpdatedAt => CoreThreadSortKey::UpdatedAt, + let store_sort_key = match sort_key.unwrap_or(ThreadSortKey::CreatedAt) { + ThreadSortKey::CreatedAt => StoreThreadSortKey::CreatedAt, + ThreadSortKey::UpdatedAt => StoreThreadSortKey::UpdatedAt, }; let (summaries, next_cursor) = match self .list_threads_common( requested_page_size, cursor, - core_sort_key, + store_sort_key, ThreadListFilters { model_providers, source_kinds, @@ -5071,7 +5074,7 @@ impl CodexMessageProcessor { &self, requested_page_size: usize, cursor: Option, - sort_key: CoreThreadSortKey, + sort_key: StoreThreadSortKey, filters: ThreadListFilters, ) -> Result<(Vec, Option), JSONRPCErrorError> { let ThreadListFilters { @@ -5081,16 +5084,7 @@ impl CodexMessageProcessor { cwd, search_term, } = filters; - let mut cursor_obj: Option = match cursor.as_ref() { - Some(cursor_str) => { - Some(parse_cursor(cursor_str).ok_or_else(|| JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("invalid cursor: {cursor_str}"), - data: None, - })?) - } - None => None, - }; + let mut cursor_obj = cursor; let mut last_cursor = cursor_obj.clone(); let mut remaining = requested_page_size; let mut items = Vec::with_capacity(requested_page_size); @@ -5109,54 +5103,26 @@ impl CodexMessageProcessor { let fallback_provider = self.config.model_provider_id.clone(); let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds); let allowed_sources = allowed_sources_vec.as_slice(); - let state_db_ctx = get_state_db(&self.config).await; + let store = LocalThreadStore::new(codex_rollout::RolloutConfig::from_view(&self.config)); while remaining > 0 { let page_size = remaining.min(THREAD_LIST_MAX_LIMIT); - let page = if archived { - RolloutRecorder::list_archived_threads( - &self.config, + let page = store + .list_threads(StoreListThreadsParams { page_size, - cursor_obj.as_ref(), + cursor: cursor_obj.clone(), sort_key, - allowed_sources, - model_provider_filter.as_deref(), - fallback_provider.as_str(), - search_term.as_deref(), - ) + allowed_sources: allowed_sources.to_vec(), + model_providers: model_provider_filter.clone(), + archived, + search_term: search_term.clone(), + }) .await - .map_err(|err| JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to list threads: {err}"), - data: None, - })? - } else { - RolloutRecorder::list_threads( - &self.config, - page_size, - cursor_obj.as_ref(), - sort_key, - allowed_sources, - model_provider_filter.as_deref(), - fallback_provider.as_str(), - search_term.as_deref(), - ) - .await - .map_err(|err| JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to list threads: {err}"), - data: None, - })? - }; + .map_err(thread_store_list_error)?; let mut filtered = Vec::with_capacity(page.items.len()); for it in page.items { - let Some(summary) = summary_from_thread_list_item( - it, - fallback_provider.as_str(), - state_db_ctx.as_ref(), - ) - .await + let Some(summary) = summary_from_stored_thread(it, fallback_provider.as_str()) else { continue; }; @@ -5176,12 +5142,8 @@ impl CodexMessageProcessor { items.extend(filtered); remaining = requested_page_size.saturating_sub(items.len()); - // Encode RolloutCursor into the JSON-RPC string form returned to clients. let next_cursor_value = page.next_cursor.clone(); - next_cursor = next_cursor_value - .as_ref() - .and_then(|cursor| serde_json::to_value(cursor).ok()) - .and_then(|value| value.as_str().map(str::to_owned)); + next_cursor = next_cursor_value.clone(); if remaining == 0 { break; } @@ -9421,67 +9383,52 @@ fn set_thread_name_from_title(thread: &mut Thread, title: String) { thread.name = Some(title); } -async fn summary_from_thread_list_item( - it: codex_core::ThreadItem, - fallback_provider: &str, - state_db_ctx: Option<&StateDbHandle>, -) -> Option { - if let Some(thread_id) = it.thread_id { - let timestamp = it.created_at.clone(); - let updated_at = it.updated_at.clone().or_else(|| timestamp.clone()); - let model_provider = it - .model_provider - .clone() - .unwrap_or_else(|| fallback_provider.to_string()); - let cwd = it.cwd?; - let cli_version = it.cli_version.unwrap_or_default(); - let source = with_thread_spawn_agent_metadata( - it.source - .unwrap_or(codex_protocol::protocol::SessionSource::Unknown), - it.agent_nickname.clone(), - it.agent_role.clone(), - ); - return Some(ConversationSummary { - conversation_id: thread_id, - path: it.path, - preview: it.first_user_message.unwrap_or_default(), - timestamp, - updated_at, - model_provider, - cwd, - cli_version, - source, - git_info: if it.git_sha.is_none() - && it.git_branch.is_none() - && it.git_origin_url.is_none() - { - None - } else { - Some(ConversationGitInfo { - sha: it.git_sha, - branch: it.git_branch, - origin_url: it.git_origin_url, - }) - }, - }); +fn thread_store_list_error(err: ThreadStoreError) -> JSONRPCErrorError { + match err { + ThreadStoreError::InvalidRequest { message } => JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message, + data: None, + }, + err => JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to list threads: {err}"), + data: None, + }, } - if let Some(thread_id) = thread_id_from_rollout_path(it.path.as_path()) { - return read_summary_from_state_db_context_by_thread_id(state_db_ctx, thread_id).await; - } - None } -fn thread_id_from_rollout_path(path: &Path) -> Option { - let file_name = path.file_name()?.to_str()?; - let stem = file_name.strip_suffix(".jsonl")?; - if stem.len() < 37 { - return None; - } - let uuid_start = stem.len().saturating_sub(36); - if !stem[..uuid_start].ends_with('-') { - return None; - } - ThreadId::from_string(&stem[uuid_start..]).ok() +fn summary_from_stored_thread( + thread: StoredThread, + fallback_provider: &str, +) -> Option { + let path = thread.rollout_path?; + let source = with_thread_spawn_agent_metadata( + thread.source, + thread.agent_nickname.clone(), + thread.agent_role.clone(), + ); + let git_info = thread.git_info.map(|git| ConversationGitInfo { + sha: git.commit_hash.map(|sha| sha.0), + branch: git.branch, + origin_url: git.repository_url, + }); + Some(ConversationSummary { + conversation_id: thread.thread_id, + path, + preview: thread.first_user_message.unwrap_or(thread.preview), + timestamp: Some(thread.created_at.to_rfc3339_opts(SecondsFormat::Secs, true)), + updated_at: Some(thread.updated_at.to_rfc3339_opts(SecondsFormat::Secs, true)), + model_provider: if thread.model_provider.is_empty() { + fallback_provider.to_string() + } else { + thread.model_provider + }, + cwd: thread.cwd, + cli_version: thread.cli_version, + source, + git_info, + }) } #[allow(clippy::too_many_arguments)] diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index cf1ac4ff11..8fd4e47303 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -563,15 +563,34 @@ sqlite = true /*git_info*/ None, )?; - // `thread/list` only applies `search_term` on the sqlite path. In this test we - // create rollouts manually, so we must also create the sqlite DB and mark backfill - // complete; otherwise app-server will permanently use filesystem fallback. + // `thread/list` applies `search_term` on the sqlite fast path. This test creates + // rollouts manually, so mark the DB backfill complete and then run an unsearched + // list large enough to repair every rollout the searched list should find. let state_db = codex_state::StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()) .await?; state_db .mark_backfill_complete(/*last_watermark*/ None) .await?; + let rollout_config = codex_rollout::RolloutConfig { + codex_home: codex_home.path().to_path_buf(), + sqlite_home: codex_home.path().to_path_buf(), + cwd: codex_home.path().to_path_buf(), + model_provider_id: "mock_provider".to_string(), + generate_memories: false, + }; + let repaired_page = codex_core::RolloutRecorder::list_threads( + &rollout_config, + /*page_size*/ 10, + /*cursor*/ None, + codex_core::ThreadSortKey::CreatedAt, + &[], + /*model_providers*/ None, + "mock_provider", + /*search_term*/ None, + ) + .await?; + assert_eq!(repaired_page.items.len(), 3); let mut mcp = init_mcp(codex_home.path()).await?; let request_id = mcp diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index a371966079..78dfd3c070 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -57,6 +57,7 @@ codex-rmcp-client = { workspace = true } codex-sandboxing = { workspace = true } codex-state = { workspace = true } codex-terminal-detection = { workspace = true } +codex-thread-store = { workspace = true } codex-tools = { workspace = true } codex-utils-absolute-path = { workspace = true } codex-utils-cache = { workspace = true } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index a3d7594813..bc7cd07e4c 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -136,9 +136,11 @@ use codex_protocol::request_permissions::RequestPermissionsResponse; use codex_protocol::request_user_input::RequestUserInputArgs; use codex_protocol::request_user_input::RequestUserInputResponse; use codex_rmcp_client::ElicitationResponse; +use codex_rollout::RolloutConfig; use codex_rollout::state_db; use codex_shell_command::parse_command::parse_command; use codex_terminal_detection::user_agent; +use codex_thread_store::LocalThreadStore; use codex_tools::filter_tool_suggest_discoverable_tools_for_client; use codex_utils_output_truncation::TruncationPolicy; use codex_utils_stream_parser::AssistantTextChunk; @@ -2127,6 +2129,7 @@ impl Session { network_proxy, network_approval: Arc::clone(&network_approval), state_db: state_db_ctx.clone(), + thread_store: LocalThreadStore::new(RolloutConfig::from_view(config.as_ref())), model_client: ModelClient::new( Some(Arc::clone(&auth_manager)), conversation_id, diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 46d4cda57e..fb7bbaadba 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -2836,6 +2836,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { network_proxy: None, network_approval: Arc::clone(&network_approval), state_db: None, + thread_store: codex_thread_store::LocalThreadStore::new( + codex_rollout::RolloutConfig::from_view(config.as_ref()), + ), model_client: ModelClient::new( Some(auth_manager.clone()), conversation_id, @@ -3693,6 +3696,9 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( network_proxy: None, network_approval: Arc::clone(&network_approval), state_db: None, + thread_store: codex_thread_store::LocalThreadStore::new( + codex_rollout::RolloutConfig::from_view(config.as_ref()), + ), model_client: ModelClient::new( Some(Arc::clone(&auth_manager)), conversation_id, diff --git a/codex-rs/core/src/personality_migration.rs b/codex-rs/core/src/personality_migration.rs index 52cabf55de..4b53a2629e 100644 --- a/codex-rs/core/src/personality_migration.rs +++ b/codex-rs/core/src/personality_migration.rs @@ -1,14 +1,10 @@ use crate::config::edit::ConfigEditsBuilder; -use crate::rollout::ARCHIVED_SESSIONS_SUBDIR; -use crate::rollout::SESSIONS_SUBDIR; -use crate::rollout::list::ThreadListConfig; -use crate::rollout::list::ThreadListLayout; -use crate::rollout::list::ThreadSortKey; -use crate::rollout::list::get_threads_in_root; use codex_config::config_toml::ConfigToml; use codex_protocol::config_types::Personality; -use codex_protocol::protocol::SessionSource; -use codex_rollout::state_db; +use codex_thread_store::ListThreadsParams; +use codex_thread_store::LocalThreadStore; +use codex_thread_store::ThreadSortKey; +use codex_thread_store::ThreadStore; use std::io; use std::path::Path; use tokio::fs::OpenOptions; @@ -64,57 +60,33 @@ pub async fn maybe_migrate_personality( } async fn has_recorded_sessions(codex_home: &Path, default_provider: &str) -> io::Result { - let allowed_sources: &[SessionSource] = &[]; + let store = LocalThreadStore::new(codex_rollout::RolloutConfig { + codex_home: codex_home.to_path_buf(), + sqlite_home: codex_home.to_path_buf(), + cwd: codex_home.to_path_buf(), + model_provider_id: default_provider.to_string(), + generate_memories: false, + }); + if has_threads(&store, /*archived*/ false).await? { + return Ok(true); + } + has_threads(&store, /*archived*/ true).await +} - if let Some(state_db_ctx) = state_db::open_if_present(codex_home, default_provider).await - && let Some(ids) = state_db::list_thread_ids_db( - Some(state_db_ctx.as_ref()), - codex_home, - /*page_size*/ 1, - /*cursor*/ None, - ThreadSortKey::CreatedAt, - allowed_sources, - /*model_providers*/ None, - /*archived_only*/ false, - "personality_migration", - ) +async fn has_threads(store: &LocalThreadStore, archived: bool) -> io::Result { + store + .list_threads(ListThreadsParams { + page_size: 1, + cursor: None, + sort_key: ThreadSortKey::CreatedAt, + allowed_sources: Vec::new(), + model_providers: None, + archived, + search_term: None, + }) .await - && !ids.is_empty() - { - return Ok(true); - } - - let sessions = get_threads_in_root( - codex_home.join(SESSIONS_SUBDIR), - /*page_size*/ 1, - /*cursor*/ None, - ThreadSortKey::CreatedAt, - ThreadListConfig { - allowed_sources, - model_providers: None, - default_provider, - layout: ThreadListLayout::NestedByDate, - }, - ) - .await?; - if !sessions.items.is_empty() { - return Ok(true); - } - - let archived_sessions = get_threads_in_root( - codex_home.join(ARCHIVED_SESSIONS_SUBDIR), - /*page_size*/ 1, - /*cursor*/ None, - ThreadSortKey::CreatedAt, - ThreadListConfig { - allowed_sources, - model_providers: None, - default_provider, - layout: ThreadListLayout::Flat, - }, - ) - .await?; - Ok(!archived_sessions.items.is_empty()) + .map(|page| !page.items.is_empty()) + .map_err(io::Error::other) } async fn create_marker(marker_path: &Path) -> io::Result<()> { diff --git a/codex-rs/core/src/personality_migration_tests.rs b/codex-rs/core/src/personality_migration_tests.rs index de1070ad34..4aef53a5c4 100644 --- a/codex-rs/core/src/personality_migration_tests.rs +++ b/codex-rs/core/src/personality_migration_tests.rs @@ -7,6 +7,8 @@ use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::UserMessageEvent; +use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; +use codex_rollout::SESSIONS_SUBDIR; use pretty_assertions::assert_eq; use tempfile::TempDir; use tokio::io::AsyncWriteExt; @@ -25,6 +27,16 @@ async fn write_session_with_user_event(codex_home: &Path) -> io::Result<()> { .join("2025") .join("01") .join("01"); + write_rollout_with_user_event(&dir, thread_id).await +} + +async fn write_archived_session_with_user_event(codex_home: &Path) -> io::Result<()> { + let thread_id = ThreadId::new(); + let dir = codex_home.join(ARCHIVED_SESSIONS_SUBDIR); + write_rollout_with_user_event(&dir, thread_id).await +} + +async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::Result<()> { tokio::fs::create_dir_all(&dir).await?; let file_path = dir.join(format!("rollout-{TEST_TIMESTAMP}-{thread_id}.jsonl")); let mut file = tokio::fs::File::create(&file_path).await?; @@ -85,6 +97,22 @@ async fn applies_when_sessions_exist_and_no_personality() -> io::Result<()> { Ok(()) } +#[tokio::test] +async fn applies_when_only_archived_sessions_exist_and_no_personality() -> io::Result<()> { + let temp = TempDir::new()?; + write_archived_session_with_user_event(temp.path()).await?; + + let config_toml = ConfigToml::default(); + let status = maybe_migrate_personality(temp.path(), &config_toml).await?; + + assert_eq!(status, PersonalityMigrationStatus::Applied); + assert!(temp.path().join(PERSONALITY_MIGRATION_FILENAME).exists()); + + let persisted = read_config_toml(temp.path()).await?; + assert_eq!(persisted.personality, Some(Personality::Pragmatic)); + Ok(()) +} + #[tokio::test] async fn skips_when_marker_exists() -> io::Result<()> { let temp = TempDir::new()?; diff --git a/codex-rs/core/src/realtime_context.rs b/codex-rs/core/src/realtime_context.rs index 6e9815f562..30cbd6f636 100644 --- a/codex-rs/core/src/realtime_context.rs +++ b/codex-rs/core/src/realtime_context.rs @@ -4,8 +4,10 @@ use crate::event_mapping::is_contextual_user_message_content; use chrono::Utc; use codex_git_utils::resolve_root_git_project_for_trust; use codex_protocol::models::ResponseItem; -use codex_state::SortKey; -use codex_state::ThreadMetadata; +use codex_thread_store::ListThreadsParams; +use codex_thread_store::StoredThread; +use codex_thread_store::ThreadSortKey; +use codex_thread_store::ThreadStore; use codex_utils_output_truncation::TruncationPolicy; use codex_utils_output_truncation::truncate_text; use dirs::home_dir; @@ -98,7 +100,7 @@ pub(crate) async fn build_realtime_startup_context( } if let Some(section) = format_section( "Notes", - Some("Built at realtime startup from the current thread history, persisted thread metadata in the state DB, and a bounded local workspace scan. This excludes repo memory instructions, AGENTS files, project-doc prompt blends, and memory summaries.".to_string()), + Some("Built at realtime startup from the current thread history, local thread metadata, and a bounded local workspace scan. This excludes repo memory instructions, AGENTS files, project-doc prompt blends, and memory summaries.".to_string()), NOTES_SECTION_TOKEN_BUDGET, ) { parts.push(section); @@ -117,33 +119,31 @@ pub(crate) async fn build_realtime_startup_context( Some(context) } -async fn load_recent_threads(sess: &Session) -> Vec { - let Some(state_db) = sess.services.state_db.as_ref() else { - return Vec::new(); - }; - - match state_db - .list_threads( - MAX_RECENT_THREADS, - /*anchor*/ None, - SortKey::UpdatedAt, - &[], - /*model_providers*/ None, - /*archived_only*/ false, - /*search_term*/ None, - ) +async fn load_recent_threads(sess: &Session) -> Vec { + match sess + .services + .thread_store + .list_threads(ListThreadsParams { + page_size: MAX_RECENT_THREADS, + cursor: None, + sort_key: ThreadSortKey::UpdatedAt, + allowed_sources: Vec::new(), + model_providers: None, + archived: false, + search_term: None, + }) .await { Ok(page) => page.items, Err(err) => { - warn!("failed to load realtime startup threads from state db: {err}"); + warn!("failed to load realtime startup threads from thread store: {err}"); Vec::new() } } } -fn build_recent_work_section(cwd: &Path, recent_threads: &[ThreadMetadata]) -> Option { - let mut groups: HashMap> = HashMap::new(); +fn build_recent_work_section(cwd: &Path, recent_threads: &[StoredThread]) -> Option { + let mut groups: HashMap> = HashMap::new(); for entry in recent_threads { let group = resolve_root_git_project_for_trust(&entry.cwd).unwrap_or_else(|| entry.cwd.clone()); @@ -446,7 +446,7 @@ fn format_section(title: &str, body: Option, budget_tokens: usize) -> Op fn format_thread_group( current_group: &Path, group: &Path, - entries: Vec<&ThreadMetadata>, + entries: Vec<&StoredThread>, ) -> Option { let latest = entries.first()?; let group_label = if resolve_root_git_project_for_trust(latest.cwd.as_path()).is_some() { @@ -461,8 +461,9 @@ fn format_thread_group( ]; if let Some(git_branch) = latest - .git_branch - .as_deref() + .git_info + .as_ref() + .and_then(|git| git.branch.as_deref()) .filter(|git_branch| !git_branch.is_empty()) { lines.push(format!("Latest branch: {git_branch}")); diff --git a/codex-rs/core/src/realtime_context_tests.rs b/codex-rs/core/src/realtime_context_tests.rs index 7495161d46..7e1e82476f 100644 --- a/codex-rs/core/src/realtime_context_tests.rs +++ b/codex-rs/core/src/realtime_context_tests.rs @@ -3,20 +3,31 @@ use super::build_recent_work_section; use super::build_workspace_section_with_user_root; use chrono::TimeZone; use chrono::Utc; +use codex_git_utils::GitSha; use codex_protocol::ThreadId; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; -use codex_state::ThreadMetadata; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::GitInfo; +use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::SessionSource; +use codex_thread_store::StoredThread; use pretty_assertions::assert_eq; use std::fs; use std::path::PathBuf; use std::process::Command; use tempfile::TempDir; -fn thread_metadata(cwd: &str, title: &str, first_user_message: &str) -> ThreadMetadata { - ThreadMetadata { - id: ThreadId::new(), - rollout_path: PathBuf::from("/tmp/rollout.jsonl"), +fn stored_thread(cwd: &str, title: &str, first_user_message: &str) -> StoredThread { + StoredThread { + thread_id: ThreadId::new(), + rollout_path: Some(PathBuf::from("/tmp/rollout.jsonl")), + forked_from_id: None, + preview: first_user_message.to_string(), + name: (!title.is_empty()).then(|| title.to_string()), + model_provider: "test-provider".to_string(), + model: Some("gpt-5".to_string()), + reasoning_effort: None, created_at: Utc .timestamp_opt(1_709_251_100, 0) .single() @@ -25,24 +36,23 @@ fn thread_metadata(cwd: &str, title: &str, first_user_message: &str) -> ThreadMe .timestamp_opt(1_709_251_200, 0) .single() .expect("valid timestamp"), - source: "cli".to_string(), - agent_path: None, - agent_nickname: None, - agent_role: None, - model_provider: "test-provider".to_string(), - model: Some("gpt-5".to_string()), - reasoning_effort: None, + archived_at: None, cwd: PathBuf::from(cwd), cli_version: "test".to_string(), - title: title.to_string(), - sandbox_policy: "workspace-write".to_string(), - approval_mode: "never".to_string(), - tokens_used: 0, + source: SessionSource::Cli, + agent_nickname: None, + agent_role: None, + agent_path: None, + git_info: Some(GitInfo { + commit_hash: Some(GitSha::new("abcdef")), + branch: Some("main".to_string()), + repository_url: None, + }), + approval_mode: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + token_usage: None, first_user_message: Some(first_user_message.to_string()), - archived_at: None, - git_sha: None, - git_branch: Some("main".to_string()), - git_origin_url: None, + history: None, } } @@ -224,17 +234,17 @@ fn recent_work_section_groups_threads_by_cwd() { fs::create_dir_all(&outside).expect("create outside dir"); let recent_threads = vec![ - thread_metadata( + stored_thread( workspace_a.to_string_lossy().as_ref(), "Investigate realtime startup context", "Log the startup context before sending it", ), - thread_metadata( + stored_thread( workspace_b.to_string_lossy().as_ref(), "Trim websocket startup payload", "Remove memories from the realtime startup context", ), - thread_metadata(outside.to_string_lossy().as_ref(), "", "Inspect flaky test"), + stored_thread(outside.to_string_lossy().as_ref(), "", "Inspect flaky test"), ]; let current_cwd = workspace_a; let repo = fs::canonicalize(repo).expect("canonicalize repo"); diff --git a/codex-rs/core/src/rollout.rs b/codex-rs/core/src/rollout.rs index 26e3a84d5c..4913f957ea 100644 --- a/codex-rs/core/src/rollout.rs +++ b/codex-rs/core/src/rollout.rs @@ -46,11 +46,7 @@ impl codex_rollout::RolloutConfigView for Config { } pub(crate) mod list { - pub use codex_rollout::ThreadListConfig; - pub use codex_rollout::ThreadListLayout; - pub use codex_rollout::ThreadSortKey; pub use codex_rollout::find_thread_path_by_id_str; - pub use codex_rollout::get_threads_in_root; } pub(crate) mod metadata { diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 4b6c5b30f5..5db38f7b72 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -24,6 +24,7 @@ use codex_mcp::McpConnectionManager; use codex_models_manager::manager::ModelsManager; use codex_otel::SessionTelemetry; use codex_rollout::state_db::StateDbHandle; +use codex_thread_store::LocalThreadStore; use std::path::PathBuf; use tokio::sync::Mutex; use tokio::sync::RwLock; @@ -59,6 +60,7 @@ pub(crate) struct SessionServices { pub(crate) network_proxy: Option, pub(crate) network_approval: Arc, pub(crate) state_db: Option, + pub(crate) thread_store: LocalThreadStore, /// Session-scoped model client shared across turns. pub(crate) model_client: ModelClient, pub(crate) code_mode_service: CodeModeService, diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index add4065f7b..ecaaeb8cba 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -194,10 +194,16 @@ async fn seed_recent_thread( let db = test.codex.state_db().context("state db enabled")?; let thread_id = ThreadId::new(); let updated_at = Utc::now(); + let rollout_path = test + .codex_home_path() + .join(format!("rollout-{thread_id}.jsonl")); + // This helper seeds SQLite metadata directly. Local listing drops stale metadata rows whose + // rollout path no longer exists, so create the placeholder path that the test metadata points + // at without exercising rollout writing in this realtime-context test. + std::fs::write(&rollout_path, "")?; let mut metadata_builder = codex_state::ThreadMetadataBuilder::new( thread_id, - test.codex_home_path() - .join(format!("rollout-{thread_id}.jsonl")), + rollout_path, updated_at, SessionSource::Cli, ); diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 4f88ff1a21..ab1ae0a5bf 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -279,6 +279,10 @@ impl RolloutRecorder { let codex_home = config.codex_home(); let state_db_ctx = state_db::get_state_db(config).await; + // Search is the SQLite-optimized path and assumes a DB marked backfill-complete is + // actually populated enough to answer the query. If unmigrated rollout files still exist + // on disk, the repair path below may or may not run and catch them depending on whether + // SQLite already has another matching search hit. if search_term.is_some() && let Some(db_page) = state_db::list_threads_db( state_db_ctx.as_deref(), diff --git a/codex-rs/thread-store/Cargo.toml b/codex-rs/thread-store/Cargo.toml index a8f22fbfe5..328d9a4a7f 100644 --- a/codex-rs/thread-store/Cargo.toml +++ b/codex-rs/thread-store/Cargo.toml @@ -14,6 +14,16 @@ workspace = true [dependencies] async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } +codex-git-utils = { workspace = true } codex-protocol = { workspace = true } +codex-rollout = { workspace = true } serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } thiserror = { workspace = true } + +[dev-dependencies] +codex-state = { workspace = true } +pretty_assertions = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true, features = ["macros"] } +uuid = { workspace = true } diff --git a/codex-rs/thread-store/src/lib.rs b/codex-rs/thread-store/src/lib.rs index 5525db1394..a64bf023be 100644 --- a/codex-rs/thread-store/src/lib.rs +++ b/codex-rs/thread-store/src/lib.rs @@ -5,12 +5,14 @@ //! any other backing store. mod error; +mod local; mod recorder; mod store; mod types; pub use error::ThreadStoreError; pub use error::ThreadStoreResult; +pub use local::LocalThreadStore; pub use recorder::ThreadRecorder; pub use store::ThreadStore; pub use types::AppendThreadItemsParams; diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs new file mode 100644 index 0000000000..cc15033cd9 --- /dev/null +++ b/codex-rs/thread-store/src/local/mod.rs @@ -0,0 +1,565 @@ +use async_trait::async_trait; +use chrono::DateTime; +use chrono::Utc; +use codex_git_utils::GitSha; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::GitInfo; +use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::SessionSource; +use codex_rollout::RolloutConfig; +use codex_rollout::RolloutRecorder; +use codex_rollout::ThreadItem; +use codex_rollout::parse_cursor; + +use crate::AppendThreadItemsParams; +use crate::ArchiveThreadParams; +use crate::CreateThreadParams; +use crate::ListThreadsParams; +use crate::LoadThreadHistoryParams; +use crate::ReadThreadParams; +use crate::ResumeThreadRecorderParams; +use crate::SetThreadNameParams; +use crate::StoredThread; +use crate::StoredThreadHistory; +use crate::ThreadPage; +use crate::ThreadRecorder; +use crate::ThreadSortKey; +use crate::ThreadStore; +use crate::ThreadStoreError; +use crate::ThreadStoreResult; +use crate::UpdateThreadMetadataParams; + +/// Local filesystem/SQLite-backed implementation of [`ThreadStore`]. +#[derive(Clone, Debug)] +pub struct LocalThreadStore { + config: RolloutConfig, +} + +impl LocalThreadStore { + /// Create a local store from the rollout configuration used by existing local persistence. + pub fn new(config: RolloutConfig) -> Self { + Self { config } + } +} + +#[async_trait] +impl ThreadStore for LocalThreadStore { + async fn create_thread( + &self, + _params: CreateThreadParams, + ) -> ThreadStoreResult> { + unsupported("create_thread") + } + + async fn resume_thread_recorder( + &self, + _params: ResumeThreadRecorderParams, + ) -> ThreadStoreResult> { + unsupported("resume_thread_recorder") + } + + async fn append_items(&self, _params: AppendThreadItemsParams) -> ThreadStoreResult<()> { + unsupported("append_items") + } + + async fn load_history( + &self, + _params: LoadThreadHistoryParams, + ) -> ThreadStoreResult { + unsupported("load_history") + } + + async fn read_thread(&self, _params: ReadThreadParams) -> ThreadStoreResult { + unsupported("read_thread") + } + + async fn list_threads(&self, params: ListThreadsParams) -> ThreadStoreResult { + let cursor = params + .cursor + .as_deref() + .map(|cursor| { + parse_cursor(cursor).ok_or_else(|| ThreadStoreError::InvalidRequest { + message: format!("invalid cursor: {cursor}"), + }) + }) + .transpose()?; + let sort_key = match params.sort_key { + ThreadSortKey::CreatedAt => codex_rollout::ThreadSortKey::CreatedAt, + ThreadSortKey::UpdatedAt => codex_rollout::ThreadSortKey::UpdatedAt, + }; + let page = list_rollout_threads(&self.config, ¶ms, cursor.as_ref(), sort_key).await?; + + let next_cursor = page + .next_cursor + .as_ref() + .and_then(|cursor| serde_json::to_value(cursor).ok()) + .and_then(|value| value.as_str().map(str::to_owned)); + let items = page + .items + .into_iter() + .filter_map(|item| { + stored_thread_from_rollout_item( + item, + params.archived, + self.config.model_provider_id.as_str(), + ) + }) + .collect::>(); + + Ok(ThreadPage { items, next_cursor }) + } + + async fn set_thread_name(&self, _params: SetThreadNameParams) -> ThreadStoreResult<()> { + unsupported("set_thread_name") + } + + async fn update_thread_metadata( + &self, + _params: UpdateThreadMetadataParams, + ) -> ThreadStoreResult { + unsupported("update_thread_metadata") + } + + async fn archive_thread(&self, _params: ArchiveThreadParams) -> ThreadStoreResult<()> { + unsupported("archive_thread") + } + + async fn unarchive_thread( + &self, + _params: ArchiveThreadParams, + ) -> ThreadStoreResult { + unsupported("unarchive_thread") + } +} + +fn unsupported(operation: &str) -> ThreadStoreResult { + Err(ThreadStoreError::Internal { + message: format!("local thread store does not implement {operation} in this slice"), + }) +} + +async fn list_rollout_threads( + config: &RolloutConfig, + params: &ListThreadsParams, + cursor: Option<&codex_rollout::Cursor>, + sort_key: codex_rollout::ThreadSortKey, +) -> ThreadStoreResult { + let page = if params.archived { + RolloutRecorder::list_archived_threads( + config, + params.page_size, + cursor, + sort_key, + params.allowed_sources.as_slice(), + params.model_providers.as_deref(), + config.model_provider_id.as_str(), + params.search_term.as_deref(), + ) + .await + } else { + RolloutRecorder::list_threads( + config, + params.page_size, + cursor, + sort_key, + params.allowed_sources.as_slice(), + params.model_providers.as_deref(), + config.model_provider_id.as_str(), + params.search_term.as_deref(), + ) + .await + }; + page.map_err(|err| ThreadStoreError::Internal { + message: format!("failed to list threads: {err}"), + }) +} + +fn stored_thread_from_rollout_item( + item: ThreadItem, + archived: bool, + default_provider: &str, +) -> Option { + let thread_id = item + .thread_id + .or_else(|| thread_id_from_rollout_path(item.path.as_path()))?; + let created_at = parse_rfc3339(item.created_at.as_deref()).unwrap_or_else(Utc::now); + let updated_at = parse_rfc3339(item.updated_at.as_deref()).unwrap_or(created_at); + let archived_at = archived.then_some(updated_at); + let git_info = git_info_from_parts( + item.git_sha.clone(), + item.git_branch.clone(), + item.git_origin_url.clone(), + ); + let source = item.source.unwrap_or(SessionSource::Unknown); + let preview = item.first_user_message.clone().unwrap_or_default(); + + Some(StoredThread { + thread_id, + rollout_path: Some(item.path), + forked_from_id: None, + preview, + name: None, + model_provider: item + .model_provider + .filter(|provider| !provider.is_empty()) + .unwrap_or_else(|| default_provider.to_string()), + model: None, + reasoning_effort: None, + created_at, + updated_at, + archived_at, + cwd: item.cwd.unwrap_or_default(), + cli_version: item.cli_version.unwrap_or_default(), + source, + agent_nickname: item.agent_nickname, + agent_role: item.agent_role, + agent_path: None, + git_info, + approval_mode: AskForApproval::OnRequest, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + token_usage: None, + first_user_message: item.first_user_message, + history: None, + }) +} + +fn parse_rfc3339(value: Option<&str>) -> Option> { + DateTime::parse_from_rfc3339(value?) + .ok() + .map(|dt| dt.with_timezone(&Utc)) +} + +fn git_info_from_parts( + sha: Option, + branch: Option, + origin_url: Option, +) -> Option { + if sha.is_none() && branch.is_none() && origin_url.is_none() { + return None; + } + Some(GitInfo { + commit_hash: sha.as_deref().map(GitSha::new), + branch, + repository_url: origin_url, + }) +} + +fn thread_id_from_rollout_path(path: &std::path::Path) -> Option { + let file_name = path.file_name()?.to_str()?; + let stem = file_name.strip_suffix(".jsonl")?; + if stem.len() < 37 { + return None; + } + let uuid_start = stem.len().saturating_sub(36); + if !stem[..uuid_start].ends_with('-') { + return None; + } + codex_protocol::ThreadId::from_string(&stem[uuid_start..]).ok() +} + +#[cfg(test)] +mod tests { + use std::fs; + use std::io::Write; + use std::path::Path; + use std::path::PathBuf; + + use codex_protocol::ThreadId; + use codex_protocol::protocol::SessionSource; + use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; + use pretty_assertions::assert_eq; + use tempfile::TempDir; + use uuid::Uuid; + + use super::*; + + fn test_config(codex_home: &Path) -> RolloutConfig { + RolloutConfig { + codex_home: codex_home.to_path_buf(), + sqlite_home: codex_home.to_path_buf(), + cwd: codex_home.to_path_buf(), + model_provider_id: "test-provider".to_string(), + generate_memories: true, + } + } + + fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result { + write_session_file_with( + root, + root.join("sessions/2025/01/03"), + ts, + uuid, + "Hello from user", + Some("test-provider"), + ) + } + + fn write_archived_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result { + write_session_file_with( + root, + root.join(ARCHIVED_SESSIONS_SUBDIR), + ts, + uuid, + "Archived user message", + Some("test-provider"), + ) + } + + fn write_session_file_with( + root: &Path, + day_dir: PathBuf, + ts: &str, + uuid: Uuid, + first_user_message: &str, + model_provider: Option<&str>, + ) -> std::io::Result { + fs::create_dir_all(&day_dir)?; + let path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl")); + let mut file = fs::File::create(&path)?; + let meta = serde_json::json!({ + "timestamp": ts, + "type": "session_meta", + "payload": { + "id": uuid, + "timestamp": ts, + "cwd": root, + "originator": "test_originator", + "cli_version": "test_version", + "source": "cli", + "model_provider": model_provider, + "git": { + "commit_hash": "abcdef", + "branch": "main", + "repository_url": "https://example.com/repo.git" + } + }, + }); + writeln!(file, "{meta}")?; + let user_event = serde_json::json!({ + "timestamp": ts, + "type": "event_msg", + "payload": { + "type": "user_message", + "message": first_user_message, + "kind": "plain", + }, + }); + writeln!(file, "{user_event}")?; + Ok(path) + } + + #[tokio::test] + async fn list_threads_uses_default_provider_when_rollout_omits_provider() { + let home = TempDir::new().expect("temp dir"); + let store = LocalThreadStore::new(test_config(home.path())); + write_session_file_with( + home.path(), + home.path().join("sessions/2025/01/03"), + "2025-01-03T12-00-00", + Uuid::from_u128(102), + "Hello from user", + /*model_provider*/ None, + ) + .expect("session file"); + + let page = store + .list_threads(ListThreadsParams { + page_size: 10, + cursor: None, + sort_key: ThreadSortKey::CreatedAt, + allowed_sources: Vec::new(), + model_providers: None, + archived: false, + search_term: None, + }) + .await + .expect("thread listing"); + + assert_eq!(page.items.len(), 1); + assert_eq!(page.items[0].model_provider, "test-provider"); + } + + #[tokio::test] + async fn list_threads_preserves_sqlite_title_search_results() { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + let store = LocalThreadStore::new(config.clone()); + let uuid = Uuid::from_u128(103); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let rollout_path = home.path().join("rollout-title-search.jsonl"); + fs::write(&rollout_path, "").expect("placeholder rollout file"); + + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); + let created_at = Utc::now(); + let mut builder = codex_state::ThreadMetadataBuilder::new( + thread_id, + rollout_path, + created_at, + SessionSource::Cli, + ); + builder.model_provider = Some(config.model_provider_id.clone()); + builder.cwd = home.path().to_path_buf(); + builder.cli_version = Some("test_version".to_string()); + let mut metadata = builder.build(config.model_provider_id.as_str()); + metadata.title = "needle title".to_string(); + metadata.first_user_message = Some("plain preview".to_string()); + runtime + .upsert_thread(&metadata) + .await + .expect("state db upsert should succeed"); + + let page = store + .list_threads(ListThreadsParams { + page_size: 10, + cursor: None, + sort_key: ThreadSortKey::CreatedAt, + allowed_sources: Vec::new(), + model_providers: None, + archived: false, + search_term: Some("needle".to_string()), + }) + .await + .expect("thread listing"); + + let ids = page + .items + .iter() + .map(|item| item.thread_id) + .collect::>(); + assert_eq!(ids, vec![thread_id]); + assert_eq!( + page.items[0].first_user_message.as_deref(), + Some("plain preview") + ); + } + + #[tokio::test] + async fn list_threads_selects_active_or_archived_collection() { + let home = TempDir::new().expect("temp dir"); + let store = LocalThreadStore::new(test_config(home.path())); + let active_uuid = Uuid::from_u128(105); + let archived_uuid = Uuid::from_u128(106); + write_session_file(home.path(), "2025-01-03T12-00-00", active_uuid) + .expect("active session file"); + write_archived_session_file(home.path(), "2025-01-03T13-00-00", archived_uuid) + .expect("archived session file"); + + let active = store + .list_threads(ListThreadsParams { + page_size: 10, + cursor: None, + sort_key: ThreadSortKey::CreatedAt, + allowed_sources: Vec::new(), + model_providers: None, + archived: false, + search_term: None, + }) + .await + .expect("active listing"); + let archived = store + .list_threads(ListThreadsParams { + page_size: 10, + cursor: None, + sort_key: ThreadSortKey::CreatedAt, + allowed_sources: Vec::new(), + model_providers: None, + archived: true, + search_term: None, + }) + .await + .expect("archived listing"); + + let active_id = ThreadId::from_string(&active_uuid.to_string()).expect("valid thread id"); + let archived_id = + ThreadId::from_string(&archived_uuid.to_string()).expect("valid thread id"); + assert_eq!( + active + .items + .iter() + .map(|item| item.thread_id) + .collect::>(), + vec![active_id] + ); + assert_eq!( + archived + .items + .iter() + .map(|item| item.thread_id) + .collect::>(), + vec![archived_id] + ); + assert_eq!(active.items[0].archived_at, None); + assert_eq!( + archived.items[0].archived_at, + Some(archived.items[0].updated_at) + ); + } + + #[tokio::test] + async fn list_threads_returns_local_rollout_summary() { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + let store = LocalThreadStore::new(config); + let uuid = Uuid::from_u128(101); + let path = + write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file"); + + let page = store + .list_threads(ListThreadsParams { + page_size: 10, + cursor: None, + sort_key: ThreadSortKey::CreatedAt, + allowed_sources: vec![SessionSource::Cli], + model_providers: Some(vec!["test-provider".to_string()]), + archived: false, + search_term: None, + }) + .await + .expect("thread listing"); + + let thread_id = + codex_protocol::ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + assert_eq!(page.next_cursor, None); + assert_eq!(page.items.len(), 1); + assert_eq!(page.items[0].thread_id, thread_id); + assert_eq!(page.items[0].rollout_path, Some(path)); + assert_eq!(page.items[0].preview, "Hello from user"); + assert_eq!( + page.items[0].first_user_message.as_deref(), + Some("Hello from user") + ); + assert_eq!(page.items[0].model_provider, "test-provider"); + assert_eq!(page.items[0].cli_version, "test_version"); + assert_eq!(page.items[0].source, SessionSource::Cli); + } + + #[tokio::test] + async fn list_threads_rejects_invalid_cursor() { + let home = TempDir::new().expect("temp dir"); + let store = LocalThreadStore::new(test_config(home.path())); + + let err = store + .list_threads(ListThreadsParams { + page_size: 10, + cursor: Some("not-a-cursor".to_string()), + sort_key: ThreadSortKey::CreatedAt, + allowed_sources: Vec::new(), + model_providers: None, + archived: false, + search_term: None, + }) + .await + .expect_err("invalid cursor should fail"); + + assert!(matches!(err, ThreadStoreError::InvalidRequest { .. })); + } +} diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 73f983a466..8d489ea6cb 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -135,6 +135,8 @@ pub struct ThreadPage { pub struct StoredThread { /// Thread id. pub thread_id: ThreadId, + /// Local rollout path when the backing store is filesystem-based. + pub rollout_path: Option, /// Source thread id when this thread was forked from another thread. pub forked_from_id: Option, /// Best available user-facing preview, usually the first user message. @@ -155,6 +157,8 @@ pub struct StoredThread { pub archived_at: Option>, /// Working directory captured for the thread. pub cwd: PathBuf, + /// CLI version captured for the thread. + pub cli_version: String, /// Runtime source for the thread. pub source: SessionSource, /// Optional random nickname for thread-spawn sub-agents.