Compare commits

...

4 Commits

Author SHA1 Message Date
Joey Trasatti
829ce9f730 [codex-backend] Merge git metadata during local thread reads 2026-04-27 11:51:51 -07:00
Joey Trasatti
eb392628c6 Merge remote-tracking branch 'origin/main' into joeytrasatti-openai/codex-backend-thread-git-info-metadata
# Conflicts:
#	codex-rs/state/src/lib.rs
2026-04-27 11:42:33 -07:00
Joey Trasatti
4cf7633db9 [codex-backend] Apply git metadata overlay on list fallback 2026-04-27 11:38:42 -07:00
Joey Trasatti
ffcedfd570 [codex-backend] Centralize thread git metadata overlays 2026-04-24 15:29:22 -07:00
9 changed files with 439 additions and 131 deletions

View File

@@ -1287,6 +1287,70 @@ stream_max_retries = 0
Ok(())
}
#[tokio::test]
// Verifies that resuming a local thread by rollout path surfaces the git
// metadata persisted in the state DB, even though the rollout file itself
// was written with no git info.
async fn thread_resume_by_path_prefers_persisted_git_metadata_for_local_threads() -> Result<()> {
// Mock model server so the resume round-trip can complete without a backend.
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
// Write a rollout with git_info = None; any git data seen later must come
// from the SQLite overlay, not from the rollout.
let thread_id = create_fake_rollout(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
"Saved user message",
Some("mock_provider"),
/*git_info*/ None,
)?;
let rollout_file_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &thread_id);
let state_db =
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
// Mark backfill complete so DB-backed reads are permitted during resume.
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Persist a branch name for this thread via the metadata-update RPC.
// `Some(Some(..))` distinguishes "set branch" from "leave unchanged" (None).
let update_id = mcp
.send_thread_metadata_update_request(ThreadMetadataUpdateParams {
thread_id: thread_id.clone(),
git_info: Some(ThreadMetadataGitInfoUpdateParams {
sha: None,
branch: Some(Some("feature/path-resume".to_string())),
origin_url: None,
}),
})
.await?;
// Wait for the update to be acknowledged before resuming.
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(update_id)),
)
.await??;
// Resume by explicit rollout path; `thread_id` is moved into the params here.
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id,
path: Some(rollout_file_path),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
// The branch persisted in the state DB must appear on the resumed thread.
assert_eq!(
thread
.git_info
.as_ref()
.and_then(|git| git.branch.as_deref()),
Some("feature/path-resume")
);
Ok(())
}
#[tokio::test]
async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is_idle() -> Result<()>
{

View File

@@ -62,6 +62,8 @@ use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_state::StateRuntime;
use codex_state::ThreadGitInfoFields;
use codex_state::ThreadMetadata;
use codex_state::ThreadMetadataBuilder;
use codex_utils_path as path_utils;
@@ -365,7 +367,9 @@ impl RolloutRecorder {
search_term: Option<&str>,
) -> std::io::Result<ThreadsPage> {
let codex_home = config.codex_home();
let state_db_ctx = state_db::get_state_db(config).await;
let state_db = state_db::thread_list_state_db(config).await;
let state_db_ctx = state_db.completed;
let exact_lookup_state_db_ctx = state_db.exact_lookup;
let archived = match archive_filter {
ThreadListArchiveFilter::Active => false,
ThreadListArchiveFilter::Archived => true,
@@ -436,12 +440,12 @@ impl RolloutRecorder {
if state_db_ctx.is_none() {
// Keep legacy behavior when SQLite is unavailable: return filesystem results
// at the requested page size.
return Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
page_size,
sort_key,
));
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
return Ok(overlay_thread_item_metadata_from_state_db(
exact_lookup_state_db_ctx.as_ref(),
page,
)
.await);
}
// Warm the DB by repairing every filesystem hit before querying SQLite.
@@ -529,8 +533,8 @@ impl RolloutRecorder {
.await;
}
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
return Ok(overlay_thread_item_metadata_from_state_db(
exact_lookup_state_db_ctx.as_ref(),
page,
)
.await);
@@ -539,8 +543,8 @@ impl RolloutRecorder {
}
if listing_has_metadata_filters {
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
return Ok(overlay_thread_item_metadata_from_state_db(
exact_lookup_state_db_ctx.as_ref(),
page,
)
.await);
@@ -548,12 +552,11 @@ impl RolloutRecorder {
// If SQLite listing still fails, return the filesystem page rather than failing the list.
tracing::error!("Falling back on rollout system");
tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back");
Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
page_size,
sort_key,
))
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
Ok(
overlay_thread_item_metadata_from_state_db(exact_lookup_state_db_ctx.as_ref(), page)
.await,
)
}
/// Find the newest recorded thread path, optionally filtering to a matching cwd.
@@ -978,11 +981,11 @@ fn page_from_filesystem_scan(
}
}
async fn fill_missing_thread_item_metadata_from_state_db(
state_db_ctx: Option<&StateRuntime>,
async fn overlay_thread_item_metadata_from_state_db(
exact_lookup: Option<&state_db::ExactThreadMetadataLookup>,
mut page: ThreadsPage,
) -> ThreadsPage {
let Some(state_db_ctx) = state_db_ctx else {
let Some(exact_lookup) = exact_lookup else {
return page;
};
@@ -990,7 +993,7 @@ async fn fill_missing_thread_item_metadata_from_state_db(
let Some(thread_id) = item.thread_id else {
continue;
};
let metadata = match state_db_ctx.get_thread(thread_id).await {
let metadata = match exact_lookup.get_thread(thread_id).await {
Ok(Some(metadata)) => metadata,
Ok(None) => continue,
Err(err) => {
@@ -1000,65 +1003,60 @@ async fn fill_missing_thread_item_metadata_from_state_db(
continue;
}
};
fill_missing_thread_item_metadata(item, thread_item_from_state_metadata(metadata));
overlay_thread_item_metadata_from_state(item, &metadata);
}
page
}
fn fill_missing_thread_item_metadata(item: &mut ThreadItem, state_item: ThreadItem) {
let ThreadItem {
path: _state_path,
thread_id: _state_thread_id,
first_user_message,
cwd,
git_branch,
git_sha,
git_origin_url,
source,
agent_nickname,
agent_role,
model_provider,
cli_version,
created_at,
updated_at,
} = state_item;
fn overlay_thread_item_metadata_from_state(item: &mut ThreadItem, metadata: &ThreadMetadata) {
if item.first_user_message.is_none() {
item.first_user_message = first_user_message;
item.first_user_message = metadata.first_user_message.clone();
}
if item.cwd.is_none() {
item.cwd = cwd;
}
if item.git_branch.is_none() {
item.git_branch = git_branch;
}
if item.git_sha.is_none() {
item.git_sha = git_sha;
}
if item.git_origin_url.is_none() {
item.git_origin_url = git_origin_url;
item.cwd = Some(metadata.cwd.clone());
}
let mut git_info = ThreadGitInfoFields::new(
item.git_sha.clone(),
item.git_branch.clone(),
item.git_origin_url.clone(),
);
git_info.overlay_non_null(&metadata.git_info_fields());
item.git_sha = git_info.sha;
item.git_branch = git_info.branch;
item.git_origin_url = git_info.origin_url;
if item.source.is_none() {
item.source = source;
item.source = Some(
serde_json::from_str(metadata.source.as_str())
.or_else(|_| serde_json::from_value(Value::String(metadata.source.clone())))
.unwrap_or(SessionSource::Unknown),
);
}
if item.agent_nickname.is_none() {
item.agent_nickname = agent_nickname;
item.agent_nickname = metadata.agent_nickname.clone();
}
if item.agent_role.is_none() {
item.agent_role = agent_role;
item.agent_role = metadata.agent_role.clone();
}
if item.model_provider.is_none() {
item.model_provider = model_provider;
item.model_provider = Some(metadata.model_provider.clone());
}
if item.cli_version.is_none() {
item.cli_version = cli_version;
item.cli_version = Some(metadata.cli_version.clone());
}
if item.created_at.is_none() {
item.created_at = created_at;
item.created_at = Some(
metadata
.created_at
.to_rfc3339_opts(SecondsFormat::Millis, true),
);
}
if item.updated_at.is_none() {
item.updated_at = updated_at;
item.updated_at = Some(
metadata
.updated_at
.to_rfc3339_opts(SecondsFormat::Millis, true),
);
}
}

View File

@@ -778,8 +778,67 @@ async fn list_threads_metadata_filter_overlays_state_db_list_metadata() -> std::
Ok(())
}
#[tokio::test]
async fn list_threads_overlays_state_db_git_info_before_backfill_complete() -> std::io::Result<()> {
    // One rollout session on disk; backfill is deliberately NOT marked
    // complete, so listing must fall back to the filesystem scan and overlay
    // git metadata through the exact-lookup handle.
    let codex_home = TempDir::new().expect("temp dir");
    let config = test_config(codex_home.path());
    let uuid = Uuid::from_u128(9016);
    let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
    let session_path = write_session_file(codex_home.path(), "2025-01-03T17-00-00", uuid)?;

    // Persist git metadata for the same thread directly in SQLite.
    let state_runtime = codex_state::StateRuntime::init(
        codex_home.path().to_path_buf(),
        config.model_provider_id.clone(),
    )
    .await
    .expect("state db should initialize");
    let created_at = chrono::Utc
        .with_ymd_and_hms(2025, 1, 3, 17, 0, 0)
        .single()
        .expect("valid datetime");
    let mut metadata_builder = codex_state::ThreadMetadataBuilder::new(
        thread_id,
        session_path,
        created_at,
        SessionSource::Cli,
    );
    metadata_builder.model_provider = Some(config.model_provider_id.clone());
    metadata_builder.cwd = codex_home.path().to_path_buf();
    metadata_builder.git_branch = Some("sqlite-branch".to_string());
    metadata_builder.git_sha = Some("sqlite-sha".to_string());
    metadata_builder.git_origin_url = Some("https://example.com/repo.git".to_string());
    state_runtime
        .upsert_thread(&metadata_builder.build(config.model_provider_id.as_str()))
        .await
        .expect("state db upsert should succeed");

    // List through the recorder; the single filesystem hit must carry the
    // SQLite-persisted git fields.
    let listed = RolloutRecorder::list_threads(
        &config,
        /*page_size*/ 10,
        /*cursor*/ None,
        ThreadSortKey::CreatedAt,
        SortDirection::Desc,
        &[],
        /*model_providers*/ None,
        /*cwd_filters*/ None,
        config.model_provider_id.as_str(),
        /*search_term*/ None,
    )
    .await?;
    assert_eq!(listed.items.len(), 1);
    let item = &listed.items[0];
    assert_eq!(item.git_branch.as_deref(), Some("sqlite-branch"));
    assert_eq!(item.git_sha.as_deref(), Some("sqlite-sha"));
    assert_eq!(
        item.git_origin_url.as_deref(),
        Some("https://example.com/repo.git")
    );
    Ok(())
}
#[test]
fn fill_missing_thread_item_metadata_preserves_filesystem_identity() {
fn overlay_thread_item_metadata_from_state_preserves_identity_and_merges_git_fields() {
let filesystem_thread_id = ThreadId::new();
let state_thread_id = ThreadId::new();
let filesystem_path = PathBuf::from("/tmp/filesystem-rollout.jsonl");
@@ -789,9 +848,9 @@ fn fill_missing_thread_item_metadata_preserves_filesystem_identity() {
thread_id: Some(filesystem_thread_id),
first_user_message: Some("filesystem message".to_string()),
cwd: None,
git_branch: None,
git_sha: None,
git_origin_url: None,
git_branch: Some("filesystem-branch".to_string()),
git_sha: Some("filesystem-sha".to_string()),
git_origin_url: Some("https://example.com/filesystem.git".to_string()),
source: None,
agent_nickname: None,
agent_role: None,
@@ -800,24 +859,30 @@ fn fill_missing_thread_item_metadata_preserves_filesystem_identity() {
created_at: None,
updated_at: None,
};
let state_item = ThreadItem {
path: state_path,
thread_id: Some(state_thread_id),
first_user_message: Some("state message".to_string()),
cwd: Some(PathBuf::from("/tmp/state-cwd")),
git_branch: Some("state-branch".to_string()),
git_sha: Some("state-sha".to_string()),
git_origin_url: Some("https://example.com/state.git".to_string()),
source: Some(SessionSource::Exec),
agent_nickname: Some("state-agent".to_string()),
agent_role: Some("state-role".to_string()),
model_provider: Some("state-provider".to_string()),
cli_version: Some("state-version".to_string()),
created_at: Some("2025-01-03T16:00:00Z".to_string()),
updated_at: Some("2025-01-03T16:01:02.003Z".to_string()),
};
let created_at = chrono::Utc
.with_ymd_and_hms(2025, 1, 3, 16, 0, 0)
.single()
.expect("valid datetime");
let updated_at = chrono::DateTime::parse_from_rfc3339("2025-01-03T16:01:02.003Z")
.expect("valid datetime")
.with_timezone(&chrono::Utc);
let mut builder = codex_state::ThreadMetadataBuilder::new(
state_thread_id,
state_path,
created_at,
SessionSource::Exec,
);
builder.cwd = PathBuf::from("/tmp/state-cwd");
builder.model_provider = Some("state-provider".to_string());
builder.cli_version = Some("state-version".to_string());
builder.agent_nickname = Some("state-agent".to_string());
builder.agent_role = Some("state-role".to_string());
builder.git_branch = Some("state-branch".to_string());
let mut metadata = builder.build("fallback-provider");
metadata.first_user_message = Some("state message".to_string());
metadata.updated_at = updated_at;
fill_missing_thread_item_metadata(&mut item, state_item);
overlay_thread_item_metadata_from_state(&mut item, &metadata);
assert_eq!(item.path, filesystem_path);
assert_eq!(item.thread_id, Some(filesystem_thread_id));
@@ -827,17 +892,17 @@ fn fill_missing_thread_item_metadata_preserves_filesystem_identity() {
);
assert_eq!(item.cwd.as_deref(), Some(Path::new("/tmp/state-cwd")));
assert_eq!(item.git_branch.as_deref(), Some("state-branch"));
assert_eq!(item.git_sha.as_deref(), Some("state-sha"));
assert_eq!(item.git_sha.as_deref(), Some("filesystem-sha"));
assert_eq!(
item.git_origin_url.as_deref(),
Some("https://example.com/state.git")
Some("https://example.com/filesystem.git")
);
assert_eq!(item.source, Some(SessionSource::Exec));
assert_eq!(item.agent_nickname.as_deref(), Some("state-agent"));
assert_eq!(item.agent_role.as_deref(), Some("state-role"));
assert_eq!(item.model_provider.as_deref(), Some("state-provider"));
assert_eq!(item.cli_version.as_deref(), Some("state-version"));
assert_eq!(item.created_at.as_deref(), Some("2025-01-03T16:00:00Z"));
assert_eq!(item.created_at.as_deref(), Some("2025-01-03T16:00:00.000Z"));
assert_eq!(item.updated_at.as_deref(), Some("2025-01-03T16:01:02.003Z"));
}

View File

@@ -11,6 +11,7 @@ use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
pub use codex_state::LogEntry;
use codex_state::ThreadMetadata;
use codex_state::ThreadMetadataBuilder;
use codex_utils_path::normalize_for_path_comparison;
use serde_json::Value;
@@ -22,6 +23,29 @@ use tracing::warn;
/// Core-facing handle to the SQLite-backed state runtime.
pub type StateDbHandle = Arc<codex_state::StateRuntime>;
/// Exact thread metadata lookup that does not require completed backfill.
///
/// This intentionally exposes only thread-id lookup, not DB-backed listing.
#[derive(Clone)]
pub struct ExactThreadMetadataLookup {
// Kept private so callers cannot reach listing APIs on the runtime that
// assume a completed backfill.
runtime: StateDbHandle,
}
impl ExactThreadMetadataLookup {
/// Fetch the persisted metadata for `thread_id`, if any.
///
/// `Ok(None)` means the thread is unknown to the state DB; errors propagate
/// from the underlying runtime.
pub async fn get_thread(&self, thread_id: ThreadId) -> anyhow::Result<Option<ThreadMetadata>> {
self.runtime.get_thread(thread_id).await
}
}
/// State DB handles used by thread listing.
///
/// `completed` is safe for DB-backed paging. `exact_lookup` may be used only
/// to overlay metadata for thread IDs already discovered from rollouts.
pub(crate) struct ThreadListStateDb {
pub(crate) completed: Option<StateDbHandle>,
pub(crate) exact_lookup: Option<ExactThreadMetadataLookup>,
}
/// Initialize the state runtime for thread state persistence and backfill checks.
pub async fn init(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
let config = RolloutConfig::from_view(config);
@@ -62,32 +86,57 @@ pub async fn init(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
/// Get the DB if the feature is enabled and the DB exists.
pub async fn get_state_db(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
let state_path = codex_state::state_db_path(config.sqlite_home());
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
return None;
let runtime = open_state_db_without_backfill_check(
config.sqlite_home(),
config.model_provider_id().to_string(),
)
.await?;
require_backfill_complete(runtime, config.sqlite_home()).await
}
pub(crate) async fn thread_list_state_db(config: &impl RolloutConfigView) -> ThreadListStateDb {
let exact_lookup = open_exact_thread_metadata_lookup(config).await;
let completed = match exact_lookup.as_ref().map(|lookup| lookup.runtime.clone()) {
Some(runtime) => require_backfill_complete(runtime, config.sqlite_home()).await,
None => None,
};
ThreadListStateDb {
completed,
exact_lookup,
}
let runtime = codex_state::StateRuntime::init(
config.sqlite_home().to_path_buf(),
}
pub async fn open_exact_thread_metadata_lookup(
config: &impl RolloutConfigView,
) -> Option<ExactThreadMetadataLookup> {
open_state_db_without_backfill_check(
config.sqlite_home(),
config.model_provider_id().to_string(),
)
.await
.ok()?;
require_backfill_complete(runtime, config.sqlite_home()).await
.map(|runtime| ExactThreadMetadataLookup { runtime })
}
/// Open the state runtime when the SQLite file exists, without feature gating.
///
/// This is used for parity checks during the SQLite migration phase.
pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
let db_path = codex_state::state_db_path(codex_home);
if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
let runtime =
open_state_db_without_backfill_check(codex_home, default_provider.to_string()).await?;
require_backfill_complete(runtime, codex_home).await
}
async fn open_state_db_without_backfill_check(
sqlite_home: &Path,
default_provider: String,
) -> Option<StateDbHandle> {
let state_path = codex_state::state_db_path(sqlite_home);
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
return None;
}
let runtime =
codex_state::StateRuntime::init(codex_home.to_path_buf(), default_provider.to_string())
.await
.ok()?;
require_backfill_complete(runtime, codex_home).await
codex_state::StateRuntime::init(sqlite_home.to_path_buf(), default_provider)
.await
.ok()
}
async fn require_backfill_complete(

View File

@@ -42,6 +42,7 @@ pub use model::Stage1JobClaim;
pub use model::Stage1JobClaimOutcome;
pub use model::Stage1Output;
pub use model::Stage1StartupClaimParams;
pub use model::ThreadGitInfoFields;
pub use model::ThreadGoal;
pub use model::ThreadGoalStatus;
pub use model::ThreadMetadata;

View File

@@ -31,6 +31,7 @@ pub use thread_metadata::BackfillStats;
pub use thread_metadata::ExtractionOutcome;
pub use thread_metadata::SortDirection;
pub use thread_metadata::SortKey;
pub use thread_metadata::ThreadGitInfoFields;
pub use thread_metadata::ThreadMetadata;
pub use thread_metadata::ThreadMetadataBuilder;
pub use thread_metadata::ThreadsPage;

View File

@@ -104,6 +104,47 @@ pub struct ThreadMetadata {
pub git_origin_url: Option<String>,
}
/// Git metadata fields stored with a thread.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ThreadGitInfoFields {
    /// The git commit SHA, if known.
    pub sha: Option<String>,
    /// The git branch name, if known.
    pub branch: Option<String>,
    /// The git origin URL, if known.
    pub origin_url: Option<String>,
}

impl ThreadGitInfoFields {
    /// Create git metadata from its persisted string fields.
    pub fn new(sha: Option<String>, branch: Option<String>, origin_url: Option<String>) -> Self {
        Self {
            sha,
            branch,
            origin_url,
        }
    }

    /// Returns true when no git metadata fields are present.
    pub fn is_empty(&self) -> bool {
        matches!(
            self,
            Self {
                sha: None,
                branch: None,
                origin_url: None,
            }
        )
    }

    /// Overlay non-null fields from `other`, preserving existing values for
    /// fields that `other` does not know about.
    pub fn overlay_non_null(&mut self, other: &Self) {
        // Copy a single field only when the overlay side actually has a value.
        fn take_if_set(dst: &mut Option<String>, src: &Option<String>) {
            if let Some(value) = src {
                *dst = Some(value.clone());
            }
        }
        take_if_set(&mut self.sha, &other.sha);
        take_if_set(&mut self.branch, &other.branch);
        take_if_set(&mut self.origin_url, &other.origin_url);
    }
}
/// Builder data required to construct [`ThreadMetadata`] without parsing filenames.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadMetadataBuilder {
@@ -216,17 +257,22 @@ impl ThreadMetadataBuilder {
}
impl ThreadMetadata {
/// Return the persisted git metadata fields for this thread.
pub fn git_info_fields(&self) -> ThreadGitInfoFields {
ThreadGitInfoFields::new(
self.git_sha.clone(),
self.git_branch.clone(),
self.git_origin_url.clone(),
)
}
/// Preserve existing non-null Git fields when rollout-derived metadata is reconciled.
pub fn prefer_existing_git_info(&mut self, existing: &Self) {
if existing.git_sha.is_some() {
self.git_sha = existing.git_sha.clone();
}
if existing.git_branch.is_some() {
self.git_branch = existing.git_branch.clone();
}
if existing.git_origin_url.is_some() {
self.git_origin_url = existing.git_origin_url.clone();
}
let mut git_info = self.git_info_fields();
git_info.overlay_non_null(&existing.git_info_fields());
self.git_sha = git_info.sha;
self.git_branch = git_info.branch;
self.git_origin_url = git_info.origin_url;
}
/// Return the list of field names that differ between `self` and `other`.

View File

@@ -14,6 +14,8 @@ use codex_protocol::protocol::GitInfo;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_rollout::ThreadItem;
use codex_state::ThreadGitInfoFields;
use codex_state::ThreadMetadata;
use crate::StoredThread;
use crate::ThreadStoreError;
@@ -154,6 +156,34 @@ pub(super) fn git_info_from_parts(
})
}
/// Convert an optional protocol-level [`GitInfo`] into persisted git fields.
///
/// A missing `GitInfo` maps to all-`None` fields.
fn git_info_fields_from_git_info(git_info: Option<&GitInfo>) -> ThreadGitInfoFields {
    match git_info {
        None => ThreadGitInfoFields::default(),
        Some(info) => ThreadGitInfoFields::new(
            info.commit_hash.as_ref().map(|sha| sha.0.clone()),
            info.branch.clone(),
            info.repository_url.clone(),
        ),
    }
}
/// Convert persisted git fields back into a protocol-level [`GitInfo`].
///
/// Returns `None` when every field is absent, so callers store no git info
/// instead of an all-empty struct.
fn git_info_from_fields(fields: ThreadGitInfoFields) -> Option<GitInfo> {
    if fields.is_empty() {
        None
    } else {
        Some(GitInfo {
            commit_hash: fields.sha.as_deref().map(GitSha::new),
            branch: fields.branch,
            repository_url: fields.origin_url,
        })
    }
}
/// Merge SQLite-persisted git metadata into `thread`'s git info.
///
/// Fields that `metadata` knows about win; fields it does not set keep the
/// values already present on the thread.
pub(super) fn apply_metadata_git_info(thread: &mut StoredThread, metadata: &ThreadMetadata) {
    let mut merged = git_info_fields_from_git_info(thread.git_info.as_ref());
    merged.overlay_non_null(&metadata.git_info_fields());
    thread.git_info = git_info_from_fields(merged);
}
fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
let file_name = path.file_name()?.to_str()?;
let stem = file_name.strip_suffix(".jsonl")?;

View File

@@ -10,10 +10,10 @@ use codex_rollout::find_thread_name_by_id;
use codex_rollout::find_thread_path_by_id_str;
use codex_rollout::read_session_meta_line;
use codex_rollout::read_thread_item_from_rollout;
use codex_state::StateRuntime;
use codex_state::ThreadMetadata;
use super::LocalThreadStore;
use super::helpers::apply_metadata_git_info;
use super::helpers::git_info_from_parts;
use super::helpers::stored_thread_from_rollout_item;
use crate::ReadThreadParams;
@@ -27,7 +27,8 @@ pub(super) async fn read_thread(
params: ReadThreadParams,
) -> ThreadStoreResult<StoredThread> {
let thread_id = params.thread_id;
if let Some(metadata) = read_sqlite_metadata(store, thread_id).await
let sqlite_metadata = read_sqlite_metadata(store, thread_id).await;
if let Some(metadata) = sqlite_metadata.as_ref()
&& (params.include_archived || metadata.archived_at.is_none())
&& (!params.include_history
|| sqlite_rollout_path_can_load_history_for_thread(
@@ -37,7 +38,7 @@ pub(super) async fn read_thread(
)
.await)
{
let mut thread = stored_thread_from_sqlite_metadata(store, metadata).await;
let mut thread = stored_thread_from_sqlite_metadata(store, metadata.clone()).await;
attach_history_if_requested(&mut thread, params.include_history).await?;
return Ok(thread);
}
@@ -152,20 +153,23 @@ async fn read_thread_from_rollout_path(
store: &LocalThreadStore,
path: std::path::PathBuf,
) -> ThreadStoreResult<StoredThread> {
let Some(item) = read_thread_item_from_rollout(path.clone()).await else {
return stored_thread_from_session_meta(store, path).await;
};
let archived = path.starts_with(
store
.config
.codex_home
.join(codex_rollout::ARCHIVED_SESSIONS_SUBDIR),
);
let mut thread =
let mut thread = if let Some(item) = read_thread_item_from_rollout(path.clone()).await {
let archived = path.starts_with(
store
.config
.codex_home
.join(codex_rollout::ARCHIVED_SESSIONS_SUBDIR),
);
stored_thread_from_rollout_item(item, archived, store.config.model_provider_id.as_str())
.ok_or_else(|| ThreadStoreError::Internal {
message: format!("failed to read thread id from {}", path.display()),
})?;
})?
} else {
stored_thread_from_session_meta(store, path.clone()).await?
};
if let Some(metadata) = read_sqlite_metadata(store, thread.thread_id).await {
apply_metadata_git_info(&mut thread, &metadata);
}
thread.forked_from_id = read_session_meta_line(path.as_path())
.await
.ok()
@@ -193,13 +197,12 @@ async fn read_sqlite_metadata(
store: &LocalThreadStore,
thread_id: codex_protocol::ThreadId,
) -> Option<ThreadMetadata> {
let runtime = StateRuntime::init(
store.config.sqlite_home.clone(),
store.config.model_provider_id.clone(),
)
.await
.ok()?;
runtime.get_thread(thread_id).await.ok().flatten()
codex_rollout::state_db::open_exact_thread_metadata_lookup(&store.config)
.await?
.get_thread(thread_id)
.await
.ok()
.flatten()
}
async fn stored_thread_from_sqlite_metadata(
@@ -217,6 +220,11 @@ async fn stored_thread_from_sqlite_metadata(
.await
.ok()
.and_then(|meta_line| meta_line.meta.forked_from_id);
let git_info = git_info_from_parts(
metadata.git_sha.clone(),
metadata.git_branch.clone(),
metadata.git_origin_url.clone(),
);
StoredThread {
thread_id: metadata.id,
rollout_path: Some(metadata.rollout_path),
@@ -239,11 +247,7 @@ async fn stored_thread_from_sqlite_metadata(
agent_nickname: metadata.agent_nickname,
agent_role: metadata.agent_role,
agent_path: metadata.agent_path,
git_info: git_info_from_parts(
metadata.git_sha,
metadata.git_branch,
metadata.git_origin_url,
),
git_info,
approval_mode: parse_or_default(&metadata.approval_mode, AskForApproval::OnRequest),
sandbox_policy: parse_or_default(
&metadata.sandbox_policy,
@@ -433,6 +437,56 @@ mod tests {
assert_eq!(thread.preview, "Hello from user");
}
#[tokio::test]
async fn read_thread_by_rollout_path_prefers_sqlite_git_info() {
    // A rollout file on disk plus SQLite metadata for the same thread: the
    // SQLite-persisted branch must win, while git fields the metadata does
    // not set (sha, origin URL) keep their rollout-derived values.
    let tmp = TempDir::new().expect("temp dir");
    let config = test_config(tmp.path());
    let store = LocalThreadStore::new(config.clone());
    let uuid = Uuid::from_u128(223);
    let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
    let session_path =
        write_session_file(tmp.path(), "2025-01-03T12-00-00", uuid).expect("session file");

    // Persist only a branch name in the state DB.
    let state_runtime = codex_state::StateRuntime::init(
        config.sqlite_home.clone(),
        config.model_provider_id.clone(),
    )
    .await
    .expect("state db should initialize");
    let mut meta_builder = ThreadMetadataBuilder::new(
        thread_id,
        session_path.clone(),
        Utc::now(),
        SessionSource::Cli,
    );
    meta_builder.model_provider = Some(config.model_provider_id.clone());
    meta_builder.git_branch = Some("sqlite-branch".to_string());
    state_runtime
        .upsert_thread(&meta_builder.build(config.model_provider_id.as_str()))
        .await
        .expect("state db upsert should succeed");

    let stored = store
        .read_thread_by_rollout_path(
            session_path,
            /*include_archived*/ false,
            /*include_history*/ false,
        )
        .await
        .expect("read thread by rollout path");

    let merged = stored.git_info.expect("git info should be present");
    assert_eq!(merged.branch.as_deref(), Some("sqlite-branch"));
    assert_eq!(
        merged.commit_hash.as_ref().map(|sha| sha.0.as_str()),
        Some("abcdef")
    );
    assert_eq!(
        merged.repository_url.as_deref(),
        Some("https://example.com/repo.git")
    );
}
#[tokio::test]
async fn read_thread_returns_archived_rollout_when_requested() {
let home = TempDir::new().expect("temp dir");