diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 760b27a53b..6c0956acf5 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -186,7 +186,8 @@ use codex_core::rollout_date_parts; use codex_core::sandboxing::SandboxPermissions; use codex_core::skills::remote::download_remote_skill; use codex_core::skills::remote::list_remote_skills; -use codex_core::state_db::get_state_db; +use codex_core::state_db::StateDbHandle; +use codex_core::state_db::open_if_present; use codex_core::token_data::parse_id_token; use codex_core::windows_sandbox::WindowsSandboxLevelExt; use codex_feedback::CodexFeedback; @@ -1976,7 +1977,11 @@ impl CodexMessageProcessor { let rollout_path_display = archived_path.display().to_string(); let fallback_provider = self.config.model_provider_id.clone(); - let state_db_ctx = get_state_db(&self.config, None).await; + let state_db_ctx = open_if_present( + &self.config.codex_home, + self.config.model_provider_id.as_str(), + ) + .await; let archived_folder = self .config .codex_home @@ -2285,23 +2290,44 @@ impl CodexMessageProcessor { } }; - let rollout_path = - match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string()) - .await - { - Ok(Some(path)) => Some(path), - Ok(None) => None, - Err(err) => { - self.send_invalid_request_error( - request_id, - format!("failed to locate thread id {thread_uuid}: {err}"), - ) - .await; - return; - } - }; + let db_summary = read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await; + let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone()); + if rollout_path.is_none() || include_turns { + rollout_path = + match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string()) + .await + { + Ok(Some(path)) => Some(path), + Ok(None) => { + if include_turns { + None + } else { + rollout_path + } + } + Err(err) => { + self.send_invalid_request_error( + request_id, + format!("failed to locate thread id {thread_uuid}: {err}"), + ) + .await; + return; + } + }; + } - let mut thread = if let Some(rollout_path) = rollout_path.as_ref() { + if include_turns && rollout_path.is_none() && db_summary.is_some() { + self.send_internal_error( + request_id, + format!("failed to locate rollout for thread {thread_uuid}"), + ) + .await; + return; + } + + let mut thread = if let Some(summary) = db_summary { + summary_to_thread(summary) + } else if let Some(rollout_path) = rollout_path.as_ref() { let fallback_provider = self.config.model_provider_id.as_str(); match read_summary_from_rollout(rollout_path, fallback_provider).await { Ok(summary) => summary_to_thread(summary), @@ -2608,8 +2634,8 @@ impl CodexMessageProcessor { developer_instructions, } = params; - let rollout_path = if let Some(path) = path { - path + let (rollout_path, source_thread_id) = if let Some(path) = path { + (path, None) } else { let existing_thread_id = match ThreadId::from_string(&thread_id) { Ok(id) => id, @@ -2630,7 +2656,7 @@ impl CodexMessageProcessor { ) .await { - Ok(Some(p)) => p, + Ok(Some(p)) => (p, Some(existing_thread_id)), Ok(None) => { self.send_invalid_request_error( request_id, @@ -2650,14 +2676,9 @@ impl CodexMessageProcessor { } }; - let history_cwd = match read_session_meta_line(&rollout_path).await { - Ok(meta_line) => Some(meta_line.meta.cwd), - Err(err) => { - let rollout_path = rollout_path.display(); - warn!("failed to read session metadata from rollout {rollout_path}: {err}"); - None - } - }; + let history_cwd = + read_history_cwd_from_state_db(&self.config, source_thread_id, rollout_path.as_path()) + .await; // Persist windows sandbox feature. let mut cli_overrides = cli_overrides.unwrap_or_default(); @@ -2807,6 +2828,15 @@ impl CodexMessageProcessor { request_id: RequestId, params: GetConversationSummaryParams, ) { + if let GetConversationSummaryParams::ThreadId { conversation_id } = ¶ms + && let Some(summary) = + read_summary_from_state_db_by_thread_id(&self.config, *conversation_id).await + { + let response = GetConversationSummaryResponse { summary }; + self.outgoing.send_response(request_id, response).await; + return; + } + let path = match params { GetConversationSummaryParams::RolloutPath { rollout_path } => { if rollout_path.is_relative() { @@ -2931,6 +2961,11 @@ impl CodexMessageProcessor { let fallback_provider = self.config.model_provider_id.clone(); let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds); let allowed_sources = allowed_sources_vec.as_slice(); + let state_db_ctx = open_if_present( + &self.config.codex_home, + self.config.model_provider_id.as_str(), + ) + .await; while remaining > 0 { let page_size = remaining.min(THREAD_LIST_MAX_LIMIT); @@ -2968,31 +3003,26 @@ impl CodexMessageProcessor { })? }; - let mut filtered = page - .items - .into_iter() - .filter_map(|it| { - let updated_at = it.updated_at.clone(); - let session_meta_line = it.head.first().and_then(|first| { - serde_json::from_value::(first.clone()).ok() - })?; - extract_conversation_summary( - it.path, - &it.head, - &session_meta_line.meta, - session_meta_line.git.as_ref(), - fallback_provider.as_str(), - updated_at, - ) - }) - .filter(|summary| { - source_kind_filter - .as_ref() - .is_none_or(|filter| source_kind_matches(&summary.source, filter)) - }) - .collect::>(); - if filtered.len() > remaining { - filtered.truncate(remaining); + let mut filtered = Vec::with_capacity(page.items.len()); + for it in page.items { + let Some(summary) = summary_from_thread_list_item( + it, + fallback_provider.as_str(), + state_db_ctx.as_ref(), + ) + .await + else { + continue; + }; + if source_kind_filter + .as_ref() + .is_none_or(|filter| source_kind_matches(&summary.source, filter)) + { + filtered.push(summary); + if filtered.len() >= remaining { + break; + } + } } items.extend(filtered); remaining = requested_page_size.saturating_sub(items.len()); @@ -3581,13 +3611,13 @@ impl CodexMessageProcessor { } = params; // Derive a Config using the same logic as new conversation, honoring overrides if provided. - let rollout_path = if let Some(path) = path { - path + let (rollout_path, source_thread_id) = if let Some(path) = path { + (path, None) } else if let Some(conversation_id) = conversation_id { match find_thread_path_by_id_str(&self.config.codex_home, &conversation_id.to_string()) .await { - Ok(Some(found_path)) => found_path, + Ok(Some(found_path)) => (found_path, Some(conversation_id)), Ok(None) => { self.send_invalid_request_error( request_id, @@ -3614,14 +3644,9 @@ impl CodexMessageProcessor { return; }; - let history_cwd = match read_session_meta_line(&rollout_path).await { - Ok(meta_line) => Some(meta_line.meta.cwd), - Err(err) => { - let rollout_path = rollout_path.display(); - warn!("failed to read session metadata from rollout {rollout_path}: {err}"); - None - } - }; + let history_cwd = + read_history_cwd_from_state_db(&self.config, source_thread_id, rollout_path.as_path()) + .await; let (typesafe_overrides, request_overrides) = match overrides { Some(overrides) => { @@ -3909,7 +3934,11 @@ impl CodexMessageProcessor { } if state_db_ctx.is_none() { - state_db_ctx = get_state_db(&self.config, None).await; + state_db_ctx = open_if_present( + &self.config.codex_home, + self.config.model_provider_id.as_str(), + ) + .await; } // Move the rollout file to archived. @@ -5094,6 +5123,168 @@ async fn derive_config_for_cwd( .await } +async fn read_history_cwd_from_state_db( + config: &Config, + thread_id: Option, + rollout_path: &Path, +) -> Option { + if let Some(state_db_ctx) = + open_if_present(&config.codex_home, config.model_provider_id.as_str()).await + && let Some(thread_id) = thread_id + && let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await + { + return Some(metadata.cwd); + } + + match read_session_meta_line(rollout_path).await { + Ok(meta_line) => Some(meta_line.meta.cwd), + Err(err) => { + let rollout_path = rollout_path.display(); + warn!("failed to read session metadata from rollout {rollout_path}: {err}"); + None + } + } +} + +async fn read_summary_from_state_db_by_thread_id( + config: &Config, + thread_id: ThreadId, +) -> Option { + let state_db_ctx = open_if_present(&config.codex_home, config.model_provider_id.as_str()).await; + read_summary_from_state_db_context_by_thread_id(state_db_ctx.as_ref(), thread_id).await +} + +async fn read_summary_from_state_db_context_by_thread_id( + state_db_ctx: Option<&StateDbHandle>, + thread_id: ThreadId, +) -> Option { + let state_db_ctx = state_db_ctx?; + + let metadata = match state_db_ctx.get_thread(thread_id).await { + Ok(Some(metadata)) => metadata, + Ok(None) | Err(_) => return None, + }; + Some(summary_from_state_db_metadata( + metadata.id, + metadata.rollout_path, + metadata.first_user_message, + metadata + .created_at + .to_rfc3339_opts(SecondsFormat::Secs, true), + metadata + .updated_at + .to_rfc3339_opts(SecondsFormat::Secs, true), + metadata.model_provider, + metadata.cwd, + metadata.cli_version, + metadata.source, + metadata.git_sha, + metadata.git_branch, + metadata.git_origin_url, + )) +} + +async fn summary_from_thread_list_item( + it: codex_core::ThreadItem, + fallback_provider: &str, + state_db_ctx: Option<&StateDbHandle>, +) -> Option { + if let Some(thread_id) = it.thread_id { + let timestamp = it.created_at.clone(); + let updated_at = it.updated_at.clone().or_else(|| timestamp.clone()); + let model_provider = it + .model_provider + .clone() + .unwrap_or_else(|| fallback_provider.to_string()); + let cwd = it.cwd?; + let cli_version = it.cli_version.unwrap_or_default(); + let source = it + .source + .unwrap_or(codex_protocol::protocol::SessionSource::Unknown); + return Some(ConversationSummary { + conversation_id: thread_id, + path: it.path, + preview: it.first_user_message.unwrap_or_default(), + timestamp, + updated_at, + model_provider, + cwd, + cli_version, + source, + git_info: if it.git_sha.is_none() + && it.git_branch.is_none() + && it.git_origin_url.is_none() + { + None + } else { + Some(ConversationGitInfo { + sha: it.git_sha, + branch: it.git_branch, + origin_url: it.git_origin_url, + }) + }, + }); + } + if let Some(thread_id) = thread_id_from_rollout_path(it.path.as_path()) { + return read_summary_from_state_db_context_by_thread_id(state_db_ctx, thread_id).await; + } + None +} + +fn thread_id_from_rollout_path(path: &Path) -> Option { + let file_name = path.file_name()?.to_str()?; + let stem = file_name.strip_suffix(".jsonl")?; + if stem.len() < 37 { + return None; + } + let uuid_start = stem.len().saturating_sub(36); + if !stem[..uuid_start].ends_with('-') { + return None; + } + ThreadId::from_string(&stem[uuid_start..]).ok() +} + +#[allow(clippy::too_many_arguments)] +fn summary_from_state_db_metadata( + conversation_id: ThreadId, + path: PathBuf, + first_user_message: Option, + timestamp: String, + updated_at: String, + model_provider: String, + cwd: PathBuf, + cli_version: String, + source: String, + git_sha: Option, + git_branch: Option, + git_origin_url: Option, +) -> ConversationSummary { + let preview = first_user_message.unwrap_or_default(); + let source = serde_json::from_value(serde_json::Value::String(source)) + .unwrap_or(codex_protocol::protocol::SessionSource::Unknown); + let git_info = if git_sha.is_none() && git_branch.is_none() && git_origin_url.is_none() { + None + } else { + Some(ConversationGitInfo { + sha: git_sha, + branch: git_branch, + origin_url: git_origin_url, + }) + }; + ConversationSummary { + conversation_id, + path, + preview, + timestamp: Some(timestamp), + updated_at: Some(updated_at), + model_provider, + cwd, + cli_version, + source, + git_info, + } +} + pub(crate) async fn read_summary_from_rollout( path: &Path, fallback_provider: &str, diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index 092bb22c65..940402902e 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -15,9 +15,7 @@ use uuid::Uuid; use super::ARCHIVED_SESSIONS_SUBDIR; use super::SESSIONS_SUBDIR; -use crate::instructions::UserInstructions; use crate::protocol::EventMsg; -use crate::session_prefix::is_session_prefix_content; use crate::state_db; use codex_file_search as file_search; use codex_protocol::ThreadId; @@ -25,6 +23,7 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::USER_MESSAGE_BEGIN; /// Returned page of thread (thread) summaries. #[derive(Debug, Default, PartialEq)] @@ -44,8 +43,24 @@ pub struct ThreadsPage { pub struct ThreadItem { /// Absolute path to the rollout file. pub path: PathBuf, - /// First up to `HEAD_RECORD_LIMIT` JSONL records parsed as JSON (includes meta line). - pub head: Vec, + /// Thread ID from session metadata. + pub thread_id: Option, + /// First user message captured for this thread, if any. + pub first_user_message: Option, + /// Working directory from session metadata. + pub cwd: Option, + /// Git branch from session metadata. + pub git_branch: Option, + /// Git commit SHA from session metadata. + pub git_sha: Option, + /// Git origin URL from session metadata. + pub git_origin_url: Option, + /// Session source from session metadata. + pub source: Option, + /// Model provider from session metadata. + pub model_provider: Option, + /// CLI version from session metadata. + pub cli_version: Option, /// RFC3339 timestamp string for when the session was created, if available. /// created_at comes from the filename timestamp with second precision. pub created_at: Option, @@ -63,11 +78,17 @@ pub type ConversationsPage = ThreadsPage; #[derive(Default)] struct HeadTailSummary { - head: Vec, saw_session_meta: bool, saw_user_event: bool, + thread_id: Option, + first_user_message: Option, + cwd: Option, + git_branch: Option, + git_sha: Option, + git_origin_url: Option, source: Option, model_provider: Option, + cli_version: Option, created_at: Option, updated_at: Option, } @@ -674,7 +695,8 @@ async fn build_thread_item( if !allowed_sources.is_empty() && !summary .source - .is_some_and(|source| allowed_sources.contains(&source)) + .as_ref() + .is_some_and(|source| allowed_sources.contains(source)) { return None; } @@ -686,7 +708,15 @@ async fn build_thread_item( // Apply filters: must have session meta and at least one user message event if summary.saw_session_meta && summary.saw_user_event { let HeadTailSummary { - head, + thread_id, + first_user_message, + cwd, + git_branch, + git_sha, + git_origin_url, + source, + model_provider, + cli_version, created_at, updated_at: mut summary_updated_at, .. @@ -696,7 +726,15 @@ async fn build_thread_item( } return Some(ThreadItem { path, - head, + thread_id, + first_user_message, + cwd, + git_branch, + git_sha, + git_origin_url, + source, + model_provider, + cli_version, created_at, updated_at: summary_updated_at, }); @@ -979,31 +1017,29 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result { summary.source = Some(session_meta_line.meta.source.clone()); summary.model_provider = session_meta_line.meta.model_provider.clone(); + summary.thread_id = Some(session_meta_line.meta.id); + summary.cwd = Some(session_meta_line.meta.cwd.clone()); + summary.git_branch = session_meta_line + .git + .as_ref() + .and_then(|git| git.branch.clone()); + summary.git_sha = session_meta_line + .git + .as_ref() + .and_then(|git| git.commit_hash.clone()); + summary.git_origin_url = session_meta_line + .git + .as_ref() + .and_then(|git| git.repository_url.clone()); + summary.cli_version = Some(session_meta_line.meta.cli_version); summary.created_at = Some(session_meta_line.meta.timestamp.clone()); summary.saw_session_meta = true; - if summary.head.len() < head_limit - && let Ok(val) = serde_json::to_value(session_meta_line) - { - summary.head.push(val); - } } - RolloutItem::ResponseItem(item) => { + RolloutItem::ResponseItem(_) => { summary.created_at = summary .created_at .clone() .or_else(|| Some(rollout_line.timestamp.clone())); - if let codex_protocol::models::ResponseItem::Message { role, content, .. } = &item - && role == "user" - && !UserInstructions::is_user_instructions(content.as_slice()) - && !is_session_prefix_content(content.as_slice()) - { - summary.saw_user_event = true; - } - if summary.head.len() < head_limit - && let Ok(val) = serde_json::to_value(item) - { - summary.head.push(val); - } } RolloutItem::TurnContext(_) => { // Not included in `head`; skip. @@ -1012,8 +1048,14 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result { - if matches!(ev, EventMsg::UserMessage(_)) { + if let EventMsg::UserMessage(user) = ev { summary.saw_user_event = true; + if summary.first_user_message.is_none() { + let message = strip_user_message_prefix(user.message.as_str()).to_string(); + if !message.is_empty() { + summary.first_user_message = Some(message); + } + } } } } @@ -1029,8 +1071,48 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result io::Result> { - let summary = read_head_summary(path, HEAD_RECORD_LIMIT).await?; - Ok(summary.head) + use tokio::io::AsyncBufReadExt; + + let file = tokio::fs::File::open(path).await?; + let reader = tokio::io::BufReader::new(file); + let mut lines = reader.lines(); + let mut head = Vec::new(); + + while head.len() < HEAD_RECORD_LIMIT { + let Some(line) = lines.next_line().await? else { + break; + }; + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + if let Ok(rollout_line) = serde_json::from_str::(trimmed) { + match rollout_line.item { + RolloutItem::SessionMeta(session_meta_line) => { + if let Ok(value) = serde_json::to_value(session_meta_line) { + head.push(value); + } + } + RolloutItem::ResponseItem(item) => { + if let Ok(value) = serde_json::to_value(item) { + head.push(value); + } + } + RolloutItem::Compacted(_) + | RolloutItem::TurnContext(_) + | RolloutItem::EventMsg(_) => {} + } + } + } + + Ok(head) +} + +fn strip_user_message_prefix(text: &str) -> &str { + match text.find(USER_MESSAGE_BEGIN) { + Some(idx) => text[idx + USER_MESSAGE_BEGIN.len()..].trim(), + None => text.trim(), + } } /// Read the SessionMetaLine from the head of a rollout file for reuse by diff --git a/codex-rs/core/src/rollout/metadata.rs b/codex-rs/core/src/rollout/metadata.rs index b87879f4c7..dc87c74da4 100644 --- a/codex-rs/core/src/rollout/metadata.rs +++ b/codex-rs/core/src/rollout/metadata.rs @@ -44,6 +44,7 @@ pub(crate) fn builder_from_session_meta( ); builder.model_provider = session_meta.meta.model_provider.clone(); builder.cwd = session_meta.meta.cwd.clone(); + builder.cli_version = Some(session_meta.meta.cli_version.clone()); builder.sandbox_policy = SandboxPolicy::ReadOnly; builder.approval_mode = AskForApproval::OnRequest; if let Some(git) = session_meta.git.as_ref() { diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 0817577b28..c42bbb7974 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -32,7 +32,6 @@ use super::list::ThreadSortKey; use super::list::ThreadsPage; use super::list::get_threads; use super::list::get_threads_in_root; -use super::list::read_head_for_summary; use super::metadata; use super::policy::is_persisted_response_item; use crate::config::Config; @@ -184,9 +183,7 @@ impl RolloutRecorder { ) .await { - let mut page: ThreadsPage = db_page.into(); - populate_thread_heads(page.items.as_mut_slice()).await; - return Ok(page); + return Ok(db_page.into()); } tracing::error!("Falling back on rollout system"); state_db::record_discrepancy("list_threads_with_db_fallback", "falling_back"); @@ -232,9 +229,37 @@ impl RolloutRecorder { default_provider: &str, filter_cwd: Option<&Path>, ) -> std::io::Result> { + let state_db_ctx = state_db::open_if_present(codex_home, default_provider).await; + if state_db_ctx.is_some() { + let mut db_cursor = cursor.cloned(); + loop { + let Some(db_page) = state_db::list_threads_db( + state_db_ctx.as_deref(), + codex_home, + page_size, + db_cursor.as_ref(), + sort_key, + allowed_sources, + model_providers, + false, + ) + .await + else { + break; + }; + if let Some(path) = select_resume_path_from_db_page(&db_page, filter_cwd) { + return Ok(Some(path)); + } + db_cursor = db_page.next_anchor.map(Into::into); + if db_cursor.is_none() { + break; + } + } + } + let mut cursor = cursor.cloned(); loop { - let page = Self::list_threads( + let page = get_threads( codex_home, page_size, cursor.as_ref(), @@ -659,7 +684,18 @@ impl From for ThreadsPage { .into_iter() .map(|item| ThreadItem { path: item.rollout_path, - head: Vec::new(), + thread_id: Some(item.id), + first_user_message: item.first_user_message, + cwd: Some(item.cwd), + git_branch: item.git_branch, + git_sha: item.git_sha, + git_origin_url: item.git_origin_url, + source: Some( + serde_json::from_value(Value::String(item.source)) + .unwrap_or(SessionSource::Unknown), + ), + model_provider: Some(item.model_provider), + cli_version: Some(item.cli_version), created_at: Some(item.created_at.to_rfc3339_opts(SecondsFormat::Secs, true)), updated_at: Some(item.updated_at.to_rfc3339_opts(SecondsFormat::Secs, true)), }) @@ -673,24 +709,14 @@ impl From for ThreadsPage { } } -async fn populate_thread_heads(items: &mut [ThreadItem]) { - for item in items { - item.head = read_head_for_summary(item.path.as_path()) - .await - .unwrap_or_else(|err| { - warn!( - "failed to read rollout head from state db path: {} ({err})", - item.path.display() - ); - Vec::new() - }); - } -} - fn select_resume_path(page: &ThreadsPage, filter_cwd: Option<&Path>) -> Option { match filter_cwd { Some(cwd) => page.items.iter().find_map(|item| { - if session_cwd_matches(&item.head, cwd) { + if item + .cwd + .as_ref() + .is_some_and(|session_cwd| cwd_matches(session_cwd, cwd)) + { Some(item.path.clone()) } else { None @@ -700,22 +726,28 @@ fn select_resume_path(page: &ThreadsPage, filter_cwd: Option<&Path>) -> Option

bool { - let Some(session_cwd) = extract_session_cwd(head) else { - return false; - }; +fn select_resume_path_from_db_page( + page: &codex_state::ThreadsPage, + filter_cwd: Option<&Path>, +) -> Option { + match filter_cwd { + Some(cwd) => page.items.iter().find_map(|item| { + if cwd_matches(item.cwd.as_path(), cwd) { + Some(item.rollout_path.clone()) + } else { + None + } + }), + None => page.items.first().map(|item| item.rollout_path.clone()), + } +} + +fn cwd_matches(session_cwd: &Path, cwd: &Path) -> bool { if let (Ok(ca), Ok(cb)) = ( - path_utils::normalize_for_path_comparison(&session_cwd), + path_utils::normalize_for_path_comparison(session_cwd), path_utils::normalize_for_path_comparison(cwd), ) { return ca == cb; } session_cwd == cwd } - -fn extract_session_cwd(head: &[serde_json::Value]) -> Option { - head.iter().find_map(|value| { - let meta_line = serde_json::from_value::(value.clone()).ok()?; - Some(meta_line.meta.cwd) - }) -} diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index c4d93415ba..ccd476b8cf 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -23,6 +23,7 @@ use crate::rollout::list::ThreadItem; use crate::rollout::list::ThreadSortKey; use crate::rollout::list::ThreadsPage; use crate::rollout::list::get_threads; +use crate::rollout::list::read_head_for_summary; use crate::rollout::recorder::RolloutRecorder; use crate::rollout::rollout_date_parts; use anyhow::Result; @@ -47,6 +48,10 @@ fn provider_vec(providers: &[&str]) -> Vec { .collect() } +fn thread_id_from_uuid(uuid: Uuid) -> ThreadId { + ThreadId::from_string(&uuid.to_string()).expect("valid thread id") +} + async fn insert_state_db_thread( home: &Path, thread_id: ThreadId, @@ -73,7 +78,7 @@ async fn insert_state_db_thread( builder.archived_at = Some(created_at); } let mut metadata = builder.build(TEST_PROVIDER); - metadata.has_user_event = true; + metadata.first_user_message = Some("Hello from user".to_string()); runtime .upsert_thread(&metadata) .await @@ -115,6 +120,12 @@ async fn list_threads_prefers_state_db_when_available() { assert_eq!(page.items.len(), 1); assert_eq!(page.items[0].path, db_rollout_path); + assert_eq!(page.items[0].thread_id, Some(db_thread_id)); + assert_eq!(page.items[0].cwd, Some(home.to_path_buf())); + assert_eq!( + page.items[0].first_user_message.as_deref(), + Some("Hello from user") + ); } #[tokio::test] @@ -559,37 +570,6 @@ async fn test_list_conversations_latest_first() { .join("01") .join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl")); - let head_3 = vec![serde_json::json!({ - "id": u3, - "timestamp": "2025-01-03T12-00-00", - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })]; - let head_2 = vec![serde_json::json!({ - "id": u2, - "timestamp": "2025-01-02T12-00-00", - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })]; - let head_1 = vec![serde_json::json!({ - "id": u1, - "timestamp": "2025-01-01T12-00-00", - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })]; - let updated_times: Vec> = page.items.iter().map(|i| i.updated_at.clone()).collect(); @@ -597,19 +577,43 @@ async fn test_list_conversations_latest_first() { items: vec![ ThreadItem { path: p1, - head: head_3, + thread_id: Some(thread_id_from_uuid(u3)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some("2025-01-03T12-00-00".into()), updated_at: updated_times.first().cloned().flatten(), }, ThreadItem { path: p2, - head: head_2, + thread_id: Some(thread_id_from_uuid(u2)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some("2025-01-02T12-00-00".into()), updated_at: updated_times.get(1).cloned().flatten(), }, ThreadItem { path: p3, - head: head_1, + thread_id: Some(thread_id_from_uuid(u1)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some("2025-01-01T12-00-00".into()), updated_at: updated_times.get(2).cloned().flatten(), }, @@ -700,26 +704,6 @@ async fn test_pagination_cursor() { .join("03") .join("04") .join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl")); - let head_5 = vec![serde_json::json!({ - "id": u5, - "timestamp": "2025-03-05T09-00-00", - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })]; - let head_4 = vec![serde_json::json!({ - "id": u4, - "timestamp": "2025-03-04T09-00-00", - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })]; let updated_page1: Vec> = page1.items.iter().map(|i| i.updated_at.clone()).collect(); let expected_cursor1: Cursor = @@ -728,13 +712,29 @@ async fn test_pagination_cursor() { items: vec![ ThreadItem { path: p5, - head: head_5, + thread_id: Some(thread_id_from_uuid(u5)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some("2025-03-05T09-00-00".into()), updated_at: updated_page1.first().cloned().flatten(), }, ThreadItem { path: p4, - head: head_4, + thread_id: Some(thread_id_from_uuid(u4)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some("2025-03-04T09-00-00".into()), updated_at: updated_page1.get(1).cloned().flatten(), }, @@ -768,26 +768,6 @@ async fn test_pagination_cursor() { .join("03") .join("02") .join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl")); - let head_3 = vec![serde_json::json!({ - "id": u3, - "timestamp": "2025-03-03T09-00-00", - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })]; - let head_2 = vec![serde_json::json!({ - "id": u2, - "timestamp": "2025-03-02T09-00-00", - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })]; let updated_page2: Vec> = page2.items.iter().map(|i| i.updated_at.clone()).collect(); let expected_cursor2: Cursor = @@ -796,13 +776,29 @@ async fn test_pagination_cursor() { items: vec![ ThreadItem { path: p3, - head: head_3, + thread_id: Some(thread_id_from_uuid(u3)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some("2025-03-03T09-00-00".into()), updated_at: updated_page2.first().cloned().flatten(), }, ThreadItem { path: p2, - head: head_2, + thread_id: Some(thread_id_from_uuid(u2)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some("2025-03-02T09-00-00".into()), updated_at: updated_page2.get(1).cloned().flatten(), }, @@ -830,22 +826,20 @@ async fn test_pagination_cursor() { .join("03") .join("01") .join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl")); - let head_1 = vec![serde_json::json!({ - "id": u1, - "timestamp": "2025-03-01T09-00-00", - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })]; let updated_page3: Vec> = page3.items.iter().map(|i| i.updated_at.clone()).collect(); let expected_page3 = ThreadsPage { items: vec![ThreadItem { path: p1, - head: head_1, + thread_id: Some(thread_id_from_uuid(u1)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some("2025-03-01T09-00-00".into()), updated_at: updated_page3.first().cloned().flatten(), }], @@ -913,20 +907,18 @@ async fn test_get_thread_contents() { .join("04") .join("01") .join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl")); - let expected_head = vec![serde_json::json!({ - "id": uuid, - "timestamp": ts, - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })]; let expected_page = ThreadsPage { items: vec![ThreadItem { path: expected_path, - head: expected_head, + thread_id: Some(thread_id_from_uuid(uuid)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some(ts.into()), updated_at: page.items[0].updated_at.clone(), }], @@ -993,13 +985,12 @@ async fn test_base_instructions_missing_in_meta_defaults_to_null() { .await .unwrap(); - let head = page - .items - .first() - .and_then(|item| item.head.first()) + let head = read_head_for_summary(&page.items[0].path) + .await .expect("session meta head"); + let first = head.first().expect("first head entry"); assert_eq!( - head.get("base_instructions"), + first.get("base_instructions"), Some(&serde_json::Value::Null) ); } @@ -1037,12 +1028,11 @@ async fn test_base_instructions_present_in_meta_is_preserved() { .await .unwrap(); - let head = page - .items - .first() - .and_then(|item| item.head.first()) + let head = read_head_for_summary(&page.items[0].path) + .await .expect("session meta head"); - let base = head + let first = head.first().expect("first head entry"); + let base = first .get("base_instructions") .and_then(|value| value.get("text")) .and_then(serde_json::Value::as_str); @@ -1222,18 +1212,6 @@ async fn test_stable_ordering_same_second_pagination() { .join("07") .join("01") .join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl")); - let head = |u: Uuid| -> Vec { - vec![serde_json::json!({ - "id": u, - "timestamp": ts, - "cwd": ".", - "originator": "test_originator", - "cli_version": "test_version", - "source": "vscode", - "model_provider": "test-provider", - "base_instructions": null, - })] - }; let updated_page1: Vec> = page1.items.iter().map(|i| i.updated_at.clone()).collect(); let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap(); @@ -1241,13 +1219,29 @@ async fn test_stable_ordering_same_second_pagination() { items: vec![ ThreadItem { path: p3, - head: head(u3), + thread_id: Some(thread_id_from_uuid(u3)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some(ts.to_string()), updated_at: updated_page1.first().cloned().flatten(), }, ThreadItem { path: p2, - head: head(u2), + thread_id: Some(thread_id_from_uuid(u2)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some(ts.to_string()), updated_at: updated_page1.get(1).cloned().flatten(), }, @@ -1280,7 +1274,15 @@ async fn test_stable_ordering_same_second_pagination() { let expected_page2 = ThreadsPage { items: vec![ThreadItem { path: p1, - head: head(u1), + thread_id: Some(thread_id_from_uuid(u1)), + first_user_message: Some("Hello from user".to_string()), + cwd: Some(Path::new(".").to_path_buf()), + git_branch: None, + git_sha: None, + git_origin_url: None, + source: Some(SessionSource::VSCode), + model_provider: Some(TEST_PROVIDER.to_string()), + cli_version: Some("test_version".to_string()), created_at: Some(ts.to_string()), updated_at: updated_page2.first().cloned().flatten(), }], @@ -1415,13 +1417,7 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<( let openai_ids: Vec<_> = openai_sessions .items .iter() - .filter_map(|item| { - item.head - .first() - .and_then(|value| value.get("id")) - .and_then(serde_json::Value::as_str) - .map(str::to_string) - }) + .filter_map(|item| item.thread_id.as_ref().map(ToString::to_string)) .collect(); assert!(openai_ids.contains(&openai_id_str)); assert!(openai_ids.contains(&none_id_str)); @@ -1442,10 +1438,8 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<( let beta_head = beta_sessions .items .first() - .and_then(|item| item.head.first()) - .and_then(|value| value.get("id")) - .and_then(serde_json::Value::as_str); - assert_eq!(beta_head, Some(beta_id_str.as_str())); + .and_then(|item| item.thread_id.as_ref().map(ToString::to_string)); + assert_eq!(beta_head.as_deref(), Some(beta_id_str.as_str())); let unknown_filter = provider_vec(&["unknown"]); let unknown_sessions = get_threads( diff --git a/codex-rs/core/src/session_prefix.rs b/codex-rs/core/src/session_prefix.rs index 198f5dff90..99283082b6 100644 --- a/codex-rs/core/src/session_prefix.rs +++ b/codex-rs/core/src/session_prefix.rs @@ -1,5 +1,3 @@ -use codex_protocol::models::ContentItem; - /// Helpers for identifying model-visible "session prefix" messages. /// /// A session prefix is a user-role message that carries configuration or state needed by @@ -15,12 +13,3 @@ pub(crate) fn is_session_prefix(text: &str) -> bool { let lowered = trimmed.to_ascii_lowercase(); lowered.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG) || lowered.starts_with(TURN_ABORTED_OPEN_TAG) } - -/// Returns true if `text` starts with a session prefix marker (case-insensitive). -pub(crate) fn is_session_prefix_content(content: &[ContentItem]) -> bool { - if let [ContentItem::InputText { text }] = content { - is_session_prefix(text) - } else { - false - } -} diff --git a/codex-rs/core/tests/suite/cli_stream.rs b/codex-rs/core/tests/suite/cli_stream.rs index 291f596e27..106e2ff148 100644 --- a/codex-rs/core/tests/suite/cli_stream.rs +++ b/codex-rs/core/tests/suite/cli_stream.rs @@ -85,13 +85,8 @@ async fn responses_mode_stream_cli() { !page.items.is_empty(), "expected at least one session to be listed" ); - // First line of head must be the SessionMeta payload (id/timestamp) - let head0 = page.items[0].head.first().expect("missing head record"); - assert!(head0.get("id").is_some(), "head[0] missing id"); - assert!( - head0.get("timestamp").is_some(), - "head[0] missing timestamp" - ); + assert!(page.items[0].thread_id.is_some(), "missing thread_id"); + assert!(page.items[0].created_at.is_some(), "missing created_at"); } /// Verify that passing `-c model_instructions_file=...` to the CLI diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 07268f4fdb..688b034ac7 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -169,7 +169,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { assert_eq!(metadata.id, thread_id); assert_eq!(metadata.rollout_path, rollout_path); assert_eq!(metadata.model_provider, default_provider); - assert!(metadata.has_user_event); + assert!(metadata.first_user_message.is_some()); let mut stored_tools = None; for _ in 0..40 { @@ -221,7 +221,7 @@ async fn user_messages_persist_in_state_db() -> Result<()> { metadata = db.get_thread(thread_id).await?; if metadata .as_ref() - .map(|entry| entry.has_user_event) + .map(|entry| entry.first_user_message.is_some()) .unwrap_or(false) { break; @@ -230,7 +230,7 @@ async fn user_messages_persist_in_state_db() -> Result<()> { } let metadata = metadata.expect("thread should exist in state db"); - assert!(metadata.has_user_event); + assert!(metadata.first_user_message.is_some()); Ok(()) } diff --git a/codex-rs/state/migrations/0005_threads_cli_version.sql b/codex-rs/state/migrations/0005_threads_cli_version.sql new file mode 100644 index 0000000000..8891562d90 --- /dev/null +++ b/codex-rs/state/migrations/0005_threads_cli_version.sql @@ -0,0 +1 @@ +ALTER TABLE threads ADD COLUMN cli_version TEXT NOT NULL DEFAULT ''; diff --git a/codex-rs/state/migrations/0007_threads_first_user_message.sql b/codex-rs/state/migrations/0007_threads_first_user_message.sql new file mode 100644 index 0000000000..5e9a7649bb --- /dev/null +++ b/codex-rs/state/migrations/0007_threads_first_user_message.sql @@ -0,0 +1,5 @@ +ALTER TABLE threads ADD COLUMN first_user_message TEXT NOT NULL DEFAULT ''; + +UPDATE threads +SET first_user_message = title +WHERE first_user_message = '' AND has_user_event = 1 AND title <> ''; diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index 753d8e426d..f8f9cb5251 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -5,9 +5,12 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::TurnContextItem; use codex_protocol::protocol::USER_MESSAGE_BEGIN; +use codex_protocol::protocol::UserMessageEvent; use serde::Serialize; use serde_json::Value; +const IMAGE_ONLY_USER_MESSAGE_PLACEHOLDER: &str = "[Image]"; + /// Apply a rollout item to the metadata structure. pub fn apply_rollout_item( metadata: &mut ThreadMetadata, @@ -37,6 +40,9 @@ fn apply_session_meta_from_item(metadata: &mut ThreadMetadata, meta_line: &Sessi if let Some(provider) = meta_line.meta.model_provider.as_deref() { metadata.model_provider = provider.to_string(); } + if !meta_line.meta.cli_version.is_empty() { + metadata.cli_version = meta_line.meta.cli_version.clone(); + } if !meta_line.meta.cwd.as_os_str().is_empty() { metadata.cwd = meta_line.meta.cwd.clone(); } @@ -61,9 +67,14 @@ fn apply_event_msg(metadata: &mut ThreadMetadata, event: &EventMsg) { } } EventMsg::UserMessage(user) => { - metadata.has_user_event = true; + if metadata.first_user_message.is_none() { + metadata.first_user_message = user_message_preview(user); + } if metadata.title.is_empty() { - metadata.title = strip_user_message_prefix(user.message.as_str()).to_string(); + let title = strip_user_message_prefix(user.message.as_str()); + if !title.is_empty() { + metadata.title = title.to_string(); + } } } _ => {} @@ -71,7 +82,7 @@ fn apply_event_msg(metadata: &mut ThreadMetadata, event: &EventMsg) { } fn apply_response_item(_metadata: &mut ThreadMetadata, _item: &ResponseItem) { - // Title and has_user_event are derived from EventMsg::UserMessage only. + // Title and first_user_message are derived from EventMsg::UserMessage only. } fn strip_user_message_prefix(text: &str) -> &str { @@ -81,6 +92,22 @@ fn strip_user_message_prefix(text: &str) -> &str { } } +fn user_message_preview(user: &UserMessageEvent) -> Option { + let message = strip_user_message_prefix(user.message.as_str()); + if !message.is_empty() { + return Some(message.to_string()); + } + if user + .images + .as_ref() + .is_some_and(|images| !images.is_empty()) + || !user.local_images.is_empty() + { + return Some(IMAGE_ONLY_USER_MESSAGE_PLACEHOLDER.to_string()); + } + None +} + pub(crate) fn enum_to_string(value: &T) -> String { match serde_json::to_value(value) { Ok(Value::String(s)) => s, @@ -108,7 +135,7 @@ mod tests { use uuid::Uuid; #[test] - fn response_item_user_messages_do_not_set_title_or_has_user_event() { + fn response_item_user_messages_do_not_set_title_or_first_user_message() { let mut metadata = metadata_for_test(); let item = RolloutItem::ResponseItem(ResponseItem::Message { id: None, @@ -122,12 +149,12 @@ mod tests { apply_rollout_item(&mut metadata, &item, "test-provider"); - assert_eq!(metadata.has_user_event, false); + assert_eq!(metadata.first_user_message, None); assert_eq!(metadata.title, ""); } #[test] - fn event_msg_user_messages_set_title_and_has_user_event() { + fn event_msg_user_messages_set_title_and_first_user_message() { let mut metadata = metadata_for_test(); let item = RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { message: format!("{USER_MESSAGE_BEGIN} actual user request"), @@ -138,10 +165,48 @@ mod tests { apply_rollout_item(&mut metadata, &item, "test-provider"); - assert_eq!(metadata.has_user_event, true); + assert_eq!( + metadata.first_user_message.as_deref(), + Some("actual user request") + ); assert_eq!(metadata.title, "actual user request"); } + #[test] + fn event_msg_image_only_user_message_sets_image_placeholder_preview() { + let mut metadata = metadata_for_test(); + let item = RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { + message: String::new(), + images: Some(vec!["https://example.com/image.png".to_string()]), + local_images: vec![], + text_elements: vec![], + })); + + apply_rollout_item(&mut metadata, &item, "test-provider"); + + assert_eq!( + metadata.first_user_message.as_deref(), + Some(super::IMAGE_ONLY_USER_MESSAGE_PLACEHOLDER) + ); + assert_eq!(metadata.title, ""); + } + + #[test] + fn event_msg_blank_user_message_without_images_keeps_first_user_message_empty() { + let mut metadata = metadata_for_test(); + let item = RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { + message: " ".to_string(), + images: Some(vec![]), + local_images: vec![], + text_elements: vec![], + })); + + apply_rollout_item(&mut metadata, &item, "test-provider"); + + assert_eq!(metadata.first_user_message, None); + assert_eq!(metadata.title, ""); + } + fn metadata_for_test() -> ThreadMetadata { let id = ThreadId::from_string(&Uuid::from_u128(42).to_string()).expect("thread id"); let created_at = DateTime::::from_timestamp(1_735_689_600, 0).expect("timestamp"); @@ -153,11 +218,12 @@ mod tests { source: "cli".to_string(), model_provider: "openai".to_string(), cwd: PathBuf::from("/tmp"), + cli_version: "0.0.0".to_string(), title: String::new(), sandbox_policy: "read-only".to_string(), approval_mode: "on-request".to_string(), tokens_used: 1, - has_user_event: false, + first_user_message: None, archived_at: None, git_sha: None, git_branch: None, diff --git a/codex-rs/state/src/model/thread_metadata.rs b/codex-rs/state/src/model/thread_metadata.rs index 7d475efffc..2577ead502 100644 --- a/codex-rs/state/src/model/thread_metadata.rs +++ b/codex-rs/state/src/model/thread_metadata.rs @@ -66,6 +66,8 @@ pub struct ThreadMetadata { pub model_provider: String, /// The working directory for the thread. pub cwd: PathBuf, + /// Version of the CLI that created the thread. + pub cli_version: String, /// A best-effort thread title. pub title: String, /// The sandbox policy (stringified enum). @@ -74,8 +76,8 @@ pub struct ThreadMetadata { pub approval_mode: String, /// The last observed token usage. pub tokens_used: i64, - /// Whether the thread has observed a user message. - pub has_user_event: bool, + /// First user message observed for this thread, if any. + pub first_user_message: Option, /// The archive timestamp, if the thread is archived. pub archived_at: Option>, /// The git commit SHA, if known. @@ -103,6 +105,8 @@ pub struct ThreadMetadataBuilder { pub model_provider: Option, /// The working directory for the thread. pub cwd: PathBuf, + /// Version of the CLI that created the thread. + pub cli_version: Option, /// The sandbox policy. pub sandbox_policy: SandboxPolicy, /// The approval mode. @@ -133,6 +137,7 @@ impl ThreadMetadataBuilder { source, model_provider: None, cwd: PathBuf::new(), + cli_version: None, sandbox_policy: SandboxPolicy::ReadOnly, approval_mode: AskForApproval::OnRequest, archived_at: None, @@ -163,11 +168,12 @@ impl ThreadMetadataBuilder { .clone() .unwrap_or_else(|| default_provider.to_string()), cwd: self.cwd.clone(), + cli_version: self.cli_version.clone().unwrap_or_default(), title: String::new(), sandbox_policy, approval_mode, tokens_used: 0, - has_user_event: false, + first_user_message: None, archived_at: self.archived_at.map(canonicalize_datetime), git_sha: self.git_sha.clone(), git_branch: self.git_branch.clone(), @@ -201,6 +207,9 @@ impl ThreadMetadata { if self.cwd != other.cwd { diffs.push("cwd"); } + if self.cli_version != other.cli_version { + diffs.push("cli_version"); + } if self.title != other.title { diffs.push("title"); } @@ -213,8 +222,8 @@ impl ThreadMetadata { if self.tokens_used != other.tokens_used { diffs.push("tokens_used"); } - if self.has_user_event != other.has_user_event { - diffs.push("has_user_event"); + if self.first_user_message != other.first_user_message { + diffs.push("first_user_message"); } if self.archived_at != other.archived_at { diffs.push("archived_at"); @@ -245,11 +254,12 @@ pub(crate) struct ThreadRow { source: String, model_provider: String, cwd: String, + cli_version: String, title: String, sandbox_policy: String, approval_mode: String, tokens_used: i64, - has_user_event: bool, + first_user_message: String, archived_at: Option, git_sha: Option, git_branch: Option, @@ -266,11 +276,12 @@ impl ThreadRow { source: row.try_get("source")?, model_provider: row.try_get("model_provider")?, cwd: row.try_get("cwd")?, + cli_version: row.try_get("cli_version")?, title: row.try_get("title")?, sandbox_policy: row.try_get("sandbox_policy")?, approval_mode: row.try_get("approval_mode")?, tokens_used: row.try_get("tokens_used")?, - has_user_event: row.try_get("has_user_event")?, + first_user_message: row.try_get("first_user_message")?, archived_at: row.try_get("archived_at")?, git_sha: row.try_get("git_sha")?, git_branch: row.try_get("git_branch")?, @@ -291,11 +302,12 @@ impl TryFrom for ThreadMetadata { source, model_provider, cwd, + cli_version, title, sandbox_policy, approval_mode, tokens_used, - has_user_event, + first_user_message, archived_at, git_sha, git_branch, @@ -309,11 +321,12 @@ impl TryFrom for ThreadMetadata { source, model_provider, cwd: PathBuf::from(cwd), + cli_version, title, sandbox_policy, approval_mode, tokens_used, - has_user_event, + first_user_message: (!first_user_message.is_empty()).then_some(first_user_message), archived_at: archived_at.map(epoch_seconds_to_datetime).transpose()?, git_sha, git_branch, diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 384fc57cad..33ee7c844b 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -177,11 +177,12 @@ SELECT source, model_provider, cwd, + cli_version, title, sandbox_policy, approval_mode, tokens_used, - has_user_event, + first_user_message, archived_at, git_sha, git_branch, @@ -295,11 +296,12 @@ SELECT source, model_provider, cwd, + cli_version, title, sandbox_policy, approval_mode, tokens_used, - has_user_event, + first_user_message, archived_at, git_sha, git_branch, @@ -449,17 +451,18 @@ INSERT INTO threads ( source, model_provider, cwd, + cli_version, title, sandbox_policy, approval_mode, tokens_used, - has_user_event, + first_user_message, archived, archived_at, git_sha, git_branch, git_origin_url -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET rollout_path = excluded.rollout_path, created_at = excluded.created_at, @@ -467,11 +470,12 @@ ON CONFLICT(id) DO UPDATE SET source = excluded.source, model_provider = excluded.model_provider, cwd = excluded.cwd, + cli_version = excluded.cli_version, title = excluded.title, sandbox_policy = excluded.sandbox_policy, approval_mode = excluded.approval_mode, tokens_used = excluded.tokens_used, - has_user_event = excluded.has_user_event, + first_user_message = excluded.first_user_message, archived = excluded.archived, archived_at = excluded.archived_at, git_sha = excluded.git_sha, @@ -486,11 +490,12 @@ ON CONFLICT(id) DO UPDATE SET .bind(metadata.source.as_str()) .bind(metadata.model_provider.as_str()) .bind(metadata.cwd.display().to_string()) + .bind(metadata.cli_version.as_str()) .bind(metadata.title.as_str()) .bind(metadata.sandbox_policy.as_str()) .bind(metadata.approval_mode.as_str()) .bind(metadata.tokens_used) - .bind(metadata.has_user_event) + .bind(metadata.first_user_message.as_deref().unwrap_or_default()) .bind(metadata.archived_at.is_some()) .bind(metadata.archived_at.map(datetime_to_epoch_seconds)) .bind(metadata.git_sha.as_deref()) @@ -900,7 +905,7 @@ fn push_thread_filters<'a>( } else { builder.push(" AND archived = 0"); } - builder.push(" AND has_user_event = 1"); + builder.push(" AND first_user_message <> ''"); if !allowed_sources.is_empty() { builder.push(" AND source IN ("); let mut separated = builder.separated(", "); @@ -1391,11 +1396,12 @@ mod tests { source: "cli".to_string(), model_provider: "test-provider".to_string(), cwd, + cli_version: "0.0.0".to_string(), title: String::new(), sandbox_policy: crate::extract::enum_to_string(&SandboxPolicy::ReadOnly), approval_mode: crate::extract::enum_to_string(&AskForApproval::OnRequest), tokens_used: 0, - has_user_event: true, + first_user_message: Some("hello".to_string()), archived_at: None, git_sha: None, git_branch: None, diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index e80c4ad793..f35acc07ff 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -14,7 +14,6 @@ use codex_core::ThreadSortKey; use codex_core::ThreadsPage; use codex_core::find_thread_names_by_ids; use codex_core::path_utils; -use codex_protocol::items::TurnItem; use color_eyre::eyre::Result; use crossterm::event::KeyCode; use crossterm::event::KeyEvent; @@ -37,8 +36,6 @@ use crate::tui::FrameRequester; use crate::tui::Tui; use crate::tui::TuiEvent; use codex_protocol::ThreadId; -use codex_protocol::models::ResponseItem; -use codex_protocol::protocol::SessionMetaLine; const PAGE_SIZE: usize = 25; const LOAD_NEAR_THRESHOLD: usize = 5; @@ -766,49 +763,33 @@ fn rows_from_items(items: Vec) -> Vec { } fn head_to_row(item: &ThreadItem) -> Row { - let created_at = item - .created_at - .as_deref() - .and_then(parse_timestamp_str) - .or_else(|| item.head.first().and_then(extract_timestamp)); + let created_at = item.created_at.as_deref().and_then(parse_timestamp_str); let updated_at = item .updated_at .as_deref() .and_then(parse_timestamp_str) .or(created_at); - let (cwd, git_branch, thread_id) = extract_session_meta_from_head(&item.head); - let preview = preview_from_head(&item.head) - .map(|s| s.trim().to_string()) + let preview = item + .first_user_message + .as_deref() + .map(str::trim) .filter(|s| !s.is_empty()) + .map(str::to_string) .unwrap_or_else(|| String::from("(no message yet)")); Row { path: item.path.clone(), preview, - thread_id, + thread_id: item.thread_id, thread_name: None, created_at, updated_at, - cwd, - git_branch, + cwd: item.cwd.clone(), + git_branch: item.git_branch.clone(), } } -fn extract_session_meta_from_head( - head: &[serde_json::Value], -) -> (Option, Option, Option) { - for value in head { - if let Ok(meta_line) = serde_json::from_value::(value.clone()) { - let cwd = Some(meta_line.meta.cwd); - let git_branch = meta_line.git.and_then(|git| git.branch); - let thread_id = Some(meta_line.meta.id); - return (cwd, git_branch, thread_id); - } - } - (None, None, None) -} - fn paths_match(a: &Path, b: &Path) -> bool { if let (Ok(ca), Ok(cb)) = ( path_utils::normalize_for_path_comparison(a), @@ -825,23 +806,6 @@ fn parse_timestamp_str(ts: &str) -> Option> { .ok() } -fn extract_timestamp(value: &serde_json::Value) -> Option> { - value - .get("timestamp") - .and_then(|v| v.as_str()) - .and_then(|t| chrono::DateTime::parse_from_rfc3339(t).ok()) - .map(|dt| dt.with_timezone(&Utc)) -} - -fn preview_from_head(head: &[serde_json::Value]) -> Option { - head.iter() - .filter_map(|value| serde_json::from_value::(value.clone()).ok()) - .find_map(|item| match codex_core::parse_turn_item(&item) { - Some(TurnItem::UserMessage(user)) => Some(user.message()), - _ => None, - }) -} - fn draw_picker(tui: &mut Tui, state: &PickerState) -> std::io::Result<()> { // Render full-screen overlay let height = tui.terminal.size()?.height; @@ -1200,24 +1164,18 @@ mod tests { use std::sync::Arc; use std::sync::Mutex; - fn head_with_ts_and_user_text(ts: &str, texts: &[&str]) -> Vec { - vec![ - json!({ "timestamp": ts }), - json!({ - "type": "message", - "role": "user", - "content": texts - .iter() - .map(|t| json!({ "type": "input_text", "text": *t })) - .collect::>() - }), - ] - } - fn make_item(path: &str, ts: &str, preview: &str) -> ThreadItem { ThreadItem { path: PathBuf::from(path), - head: head_with_ts_and_user_text(ts, &[preview]), + thread_id: None, + first_user_message: Some(preview.to_string()), + cwd: None, + git_branch: None, + git_sha: None, + git_origin_url: None, + source: None, + model_provider: None, + cli_version: None, created_at: Some(ts.to_string()), updated_at: Some(ts.to_string()), } @@ -1243,39 +1201,23 @@ mod tests { } #[test] - fn preview_uses_first_message_input_text() { - let head = vec![ - json!({ "timestamp": "2025-01-01T00:00:00Z" }), - json!({ - "type": "message", - "role": "user", - "content": [ - { "type": "input_text", "text": "# AGENTS.md instructions for project\n\n\nhi\n" }, - ] - }), - json!({ - "type": "message", - "role": "user", - "content": [ - { "type": "input_text", "text": "..." }, - ] - }), - json!({ - "type": "message", - "role": "user", - "content": [ - { "type": "input_text", "text": "real question" }, - { "type": "input_image", "image_url": "ignored" } - ] - }), - json!({ - "type": "message", - "role": "user", - "content": [ { "type": "input_text", "text": "later text" } ] - }), - ]; - let preview = preview_from_head(&head); - assert_eq!(preview.as_deref(), Some("real question")); + fn head_to_row_uses_first_user_message() { + let item = ThreadItem { + path: PathBuf::from("/tmp/a.jsonl"), + thread_id: None, + first_user_message: Some("real question".to_string()), + cwd: None, + git_branch: None, + git_sha: None, + git_origin_url: None, + source: None, + model_provider: None, + cli_version: None, + created_at: Some("2025-01-01T00:00:00Z".into()), + updated_at: Some("2025-01-01T00:00:00Z".into()), + }; + let row = head_to_row(&item); + assert_eq!(row.preview, "real question"); } #[test] @@ -1283,13 +1225,29 @@ mod tests { // Construct two items with different timestamps and real user text. let a = ThreadItem { path: PathBuf::from("/tmp/a.jsonl"), - head: head_with_ts_and_user_text("2025-01-01T00:00:00Z", &["A"]), + thread_id: None, + first_user_message: Some("A".to_string()), + cwd: None, + git_branch: None, + git_sha: None, + git_origin_url: None, + source: None, + model_provider: None, + cli_version: None, created_at: Some("2025-01-01T00:00:00Z".into()), updated_at: Some("2025-01-01T00:00:00Z".into()), }; let b = ThreadItem { path: PathBuf::from("/tmp/b.jsonl"), - head: head_with_ts_and_user_text("2025-01-02T00:00:00Z", &["B"]), + thread_id: None, + first_user_message: Some("B".to_string()), + cwd: None, + git_branch: None, + git_sha: None, + git_origin_url: None, + source: None, + model_provider: None, + cli_version: None, created_at: Some("2025-01-02T00:00:00Z".into()), updated_at: Some("2025-01-02T00:00:00Z".into()), }; @@ -1302,10 +1260,17 @@ mod tests { #[test] fn row_uses_tail_timestamp_for_updated_at() { - let head = head_with_ts_and_user_text("2025-01-01T00:00:00Z", &["Hello"]); let item = ThreadItem { path: PathBuf::from("/tmp/a.jsonl"), - head, + thread_id: None, + first_user_message: Some("Hello".to_string()), + cwd: None, + git_branch: None, + git_sha: None, + git_origin_url: None, + source: None, + model_provider: None, + cli_version: None, created_at: Some("2025-01-01T00:00:00Z".into()), updated_at: Some("2025-01-01T01:00:00Z".into()), };