mirror of
https://github.com/openai/codex.git
synced 2026-05-06 14:21:08 +03:00
Compare commits
4 Commits
starr/exec
...
joeytrasat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
829ce9f730 | ||
|
|
eb392628c6 | ||
|
|
4cf7633db9 | ||
|
|
ffcedfd570 |
@@ -1287,6 +1287,70 @@ stream_max_retries = 0
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_by_path_prefers_persisted_git_metadata_for_local_threads() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
let thread_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-00",
|
||||
"2025-01-05T12:00:00Z",
|
||||
"Saved user message",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let rollout_file_path = rollout_path(codex_home.path(), "2025-01-05T12-00-00", &thread_id);
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let update_id = mcp
|
||||
.send_thread_metadata_update_request(ThreadMetadataUpdateParams {
|
||||
thread_id: thread_id.clone(),
|
||||
git_info: Some(ThreadMetadataGitInfoUpdateParams {
|
||||
sha: None,
|
||||
branch: Some(Some("feature/path-resume".to_string())),
|
||||
origin_url: None,
|
||||
}),
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(update_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id,
|
||||
path: Some(rollout_file_path),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
|
||||
assert_eq!(
|
||||
thread
|
||||
.git_info
|
||||
.as_ref()
|
||||
.and_then(|git| git.branch.as_deref()),
|
||||
Some("feature/path-resume")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is_idle() -> Result<()>
|
||||
{
|
||||
|
||||
@@ -62,6 +62,8 @@ use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_state::StateRuntime;
|
||||
use codex_state::ThreadGitInfoFields;
|
||||
use codex_state::ThreadMetadata;
|
||||
use codex_state::ThreadMetadataBuilder;
|
||||
use codex_utils_path as path_utils;
|
||||
|
||||
@@ -365,7 +367,9 @@ impl RolloutRecorder {
|
||||
search_term: Option<&str>,
|
||||
) -> std::io::Result<ThreadsPage> {
|
||||
let codex_home = config.codex_home();
|
||||
let state_db_ctx = state_db::get_state_db(config).await;
|
||||
let state_db = state_db::thread_list_state_db(config).await;
|
||||
let state_db_ctx = state_db.completed;
|
||||
let exact_lookup_state_db_ctx = state_db.exact_lookup;
|
||||
let archived = match archive_filter {
|
||||
ThreadListArchiveFilter::Active => false,
|
||||
ThreadListArchiveFilter::Archived => true,
|
||||
@@ -436,12 +440,12 @@ impl RolloutRecorder {
|
||||
if state_db_ctx.is_none() {
|
||||
// Keep legacy behavior when SQLite is unavailable: return filesystem results
|
||||
// at the requested page size.
|
||||
return Ok(page_from_filesystem_scan(
|
||||
fs_page,
|
||||
sort_direction,
|
||||
page_size,
|
||||
sort_key,
|
||||
));
|
||||
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
|
||||
return Ok(overlay_thread_item_metadata_from_state_db(
|
||||
exact_lookup_state_db_ctx.as_ref(),
|
||||
page,
|
||||
)
|
||||
.await);
|
||||
}
|
||||
|
||||
// Warm the DB by repairing every filesystem hit before querying SQLite.
|
||||
@@ -529,8 +533,8 @@ impl RolloutRecorder {
|
||||
.await;
|
||||
}
|
||||
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
|
||||
return Ok(fill_missing_thread_item_metadata_from_state_db(
|
||||
state_db_ctx.as_deref(),
|
||||
return Ok(overlay_thread_item_metadata_from_state_db(
|
||||
exact_lookup_state_db_ctx.as_ref(),
|
||||
page,
|
||||
)
|
||||
.await);
|
||||
@@ -539,8 +543,8 @@ impl RolloutRecorder {
|
||||
}
|
||||
if listing_has_metadata_filters {
|
||||
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
|
||||
return Ok(fill_missing_thread_item_metadata_from_state_db(
|
||||
state_db_ctx.as_deref(),
|
||||
return Ok(overlay_thread_item_metadata_from_state_db(
|
||||
exact_lookup_state_db_ctx.as_ref(),
|
||||
page,
|
||||
)
|
||||
.await);
|
||||
@@ -548,12 +552,11 @@ impl RolloutRecorder {
|
||||
// If SQLite listing still fails, return the filesystem page rather than failing the list.
|
||||
tracing::error!("Falling back on rollout system");
|
||||
tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back");
|
||||
Ok(page_from_filesystem_scan(
|
||||
fs_page,
|
||||
sort_direction,
|
||||
page_size,
|
||||
sort_key,
|
||||
))
|
||||
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
|
||||
Ok(
|
||||
overlay_thread_item_metadata_from_state_db(exact_lookup_state_db_ctx.as_ref(), page)
|
||||
.await,
|
||||
)
|
||||
}
|
||||
|
||||
/// Find the newest recorded thread path, optionally filtering to a matching cwd.
|
||||
@@ -978,11 +981,11 @@ fn page_from_filesystem_scan(
|
||||
}
|
||||
}
|
||||
|
||||
async fn fill_missing_thread_item_metadata_from_state_db(
|
||||
state_db_ctx: Option<&StateRuntime>,
|
||||
async fn overlay_thread_item_metadata_from_state_db(
|
||||
exact_lookup: Option<&state_db::ExactThreadMetadataLookup>,
|
||||
mut page: ThreadsPage,
|
||||
) -> ThreadsPage {
|
||||
let Some(state_db_ctx) = state_db_ctx else {
|
||||
let Some(exact_lookup) = exact_lookup else {
|
||||
return page;
|
||||
};
|
||||
|
||||
@@ -990,7 +993,7 @@ async fn fill_missing_thread_item_metadata_from_state_db(
|
||||
let Some(thread_id) = item.thread_id else {
|
||||
continue;
|
||||
};
|
||||
let metadata = match state_db_ctx.get_thread(thread_id).await {
|
||||
let metadata = match exact_lookup.get_thread(thread_id).await {
|
||||
Ok(Some(metadata)) => metadata,
|
||||
Ok(None) => continue,
|
||||
Err(err) => {
|
||||
@@ -1000,65 +1003,60 @@ async fn fill_missing_thread_item_metadata_from_state_db(
|
||||
continue;
|
||||
}
|
||||
};
|
||||
fill_missing_thread_item_metadata(item, thread_item_from_state_metadata(metadata));
|
||||
overlay_thread_item_metadata_from_state(item, &metadata);
|
||||
}
|
||||
|
||||
page
|
||||
}
|
||||
|
||||
fn fill_missing_thread_item_metadata(item: &mut ThreadItem, state_item: ThreadItem) {
|
||||
let ThreadItem {
|
||||
path: _state_path,
|
||||
thread_id: _state_thread_id,
|
||||
first_user_message,
|
||||
cwd,
|
||||
git_branch,
|
||||
git_sha,
|
||||
git_origin_url,
|
||||
source,
|
||||
agent_nickname,
|
||||
agent_role,
|
||||
model_provider,
|
||||
cli_version,
|
||||
created_at,
|
||||
updated_at,
|
||||
} = state_item;
|
||||
|
||||
fn overlay_thread_item_metadata_from_state(item: &mut ThreadItem, metadata: &ThreadMetadata) {
|
||||
if item.first_user_message.is_none() {
|
||||
item.first_user_message = first_user_message;
|
||||
item.first_user_message = metadata.first_user_message.clone();
|
||||
}
|
||||
if item.cwd.is_none() {
|
||||
item.cwd = cwd;
|
||||
}
|
||||
if item.git_branch.is_none() {
|
||||
item.git_branch = git_branch;
|
||||
}
|
||||
if item.git_sha.is_none() {
|
||||
item.git_sha = git_sha;
|
||||
}
|
||||
if item.git_origin_url.is_none() {
|
||||
item.git_origin_url = git_origin_url;
|
||||
item.cwd = Some(metadata.cwd.clone());
|
||||
}
|
||||
let mut git_info = ThreadGitInfoFields::new(
|
||||
item.git_sha.clone(),
|
||||
item.git_branch.clone(),
|
||||
item.git_origin_url.clone(),
|
||||
);
|
||||
git_info.overlay_non_null(&metadata.git_info_fields());
|
||||
item.git_sha = git_info.sha;
|
||||
item.git_branch = git_info.branch;
|
||||
item.git_origin_url = git_info.origin_url;
|
||||
if item.source.is_none() {
|
||||
item.source = source;
|
||||
item.source = Some(
|
||||
serde_json::from_str(metadata.source.as_str())
|
||||
.or_else(|_| serde_json::from_value(Value::String(metadata.source.clone())))
|
||||
.unwrap_or(SessionSource::Unknown),
|
||||
);
|
||||
}
|
||||
if item.agent_nickname.is_none() {
|
||||
item.agent_nickname = agent_nickname;
|
||||
item.agent_nickname = metadata.agent_nickname.clone();
|
||||
}
|
||||
if item.agent_role.is_none() {
|
||||
item.agent_role = agent_role;
|
||||
item.agent_role = metadata.agent_role.clone();
|
||||
}
|
||||
if item.model_provider.is_none() {
|
||||
item.model_provider = model_provider;
|
||||
item.model_provider = Some(metadata.model_provider.clone());
|
||||
}
|
||||
if item.cli_version.is_none() {
|
||||
item.cli_version = cli_version;
|
||||
item.cli_version = Some(metadata.cli_version.clone());
|
||||
}
|
||||
if item.created_at.is_none() {
|
||||
item.created_at = created_at;
|
||||
item.created_at = Some(
|
||||
metadata
|
||||
.created_at
|
||||
.to_rfc3339_opts(SecondsFormat::Millis, true),
|
||||
);
|
||||
}
|
||||
if item.updated_at.is_none() {
|
||||
item.updated_at = updated_at;
|
||||
item.updated_at = Some(
|
||||
metadata
|
||||
.updated_at
|
||||
.to_rfc3339_opts(SecondsFormat::Millis, true),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -778,8 +778,67 @@ async fn list_threads_metadata_filter_overlays_state_db_list_metadata() -> std::
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_threads_overlays_state_db_git_info_before_backfill_complete() -> std::io::Result<()> {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
let config = test_config(home.path());
|
||||
|
||||
let uuid = Uuid::from_u128(9016);
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
|
||||
let rollout_path = write_session_file(home.path(), "2025-01-03T17-00-00", uuid)?;
|
||||
|
||||
let runtime = codex_state::StateRuntime::init(
|
||||
home.path().to_path_buf(),
|
||||
config.model_provider_id.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let created_at = chrono::Utc
|
||||
.with_ymd_and_hms(2025, 1, 3, 17, 0, 0)
|
||||
.single()
|
||||
.expect("valid datetime");
|
||||
let mut builder = codex_state::ThreadMetadataBuilder::new(
|
||||
thread_id,
|
||||
rollout_path,
|
||||
created_at,
|
||||
SessionSource::Cli,
|
||||
);
|
||||
builder.model_provider = Some(config.model_provider_id.clone());
|
||||
builder.cwd = home.path().to_path_buf();
|
||||
builder.git_branch = Some("sqlite-branch".to_string());
|
||||
builder.git_sha = Some("sqlite-sha".to_string());
|
||||
builder.git_origin_url = Some("https://example.com/repo.git".to_string());
|
||||
runtime
|
||||
.upsert_thread(&builder.build(config.model_provider_id.as_str()))
|
||||
.await
|
||||
.expect("state db upsert should succeed");
|
||||
|
||||
let page = RolloutRecorder::list_threads(
|
||||
&config,
|
||||
/*page_size*/ 10,
|
||||
/*cursor*/ None,
|
||||
ThreadSortKey::CreatedAt,
|
||||
SortDirection::Desc,
|
||||
&[],
|
||||
/*model_providers*/ None,
|
||||
/*cwd_filters*/ None,
|
||||
config.model_provider_id.as_str(),
|
||||
/*search_term*/ None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(page.items.len(), 1);
|
||||
assert_eq!(page.items[0].git_branch.as_deref(), Some("sqlite-branch"));
|
||||
assert_eq!(page.items[0].git_sha.as_deref(), Some("sqlite-sha"));
|
||||
assert_eq!(
|
||||
page.items[0].git_origin_url.as_deref(),
|
||||
Some("https://example.com/repo.git")
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fill_missing_thread_item_metadata_preserves_filesystem_identity() {
|
||||
fn overlay_thread_item_metadata_from_state_preserves_identity_and_merges_git_fields() {
|
||||
let filesystem_thread_id = ThreadId::new();
|
||||
let state_thread_id = ThreadId::new();
|
||||
let filesystem_path = PathBuf::from("/tmp/filesystem-rollout.jsonl");
|
||||
@@ -789,9 +848,9 @@ fn fill_missing_thread_item_metadata_preserves_filesystem_identity() {
|
||||
thread_id: Some(filesystem_thread_id),
|
||||
first_user_message: Some("filesystem message".to_string()),
|
||||
cwd: None,
|
||||
git_branch: None,
|
||||
git_sha: None,
|
||||
git_origin_url: None,
|
||||
git_branch: Some("filesystem-branch".to_string()),
|
||||
git_sha: Some("filesystem-sha".to_string()),
|
||||
git_origin_url: Some("https://example.com/filesystem.git".to_string()),
|
||||
source: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
@@ -800,24 +859,30 @@ fn fill_missing_thread_item_metadata_preserves_filesystem_identity() {
|
||||
created_at: None,
|
||||
updated_at: None,
|
||||
};
|
||||
let state_item = ThreadItem {
|
||||
path: state_path,
|
||||
thread_id: Some(state_thread_id),
|
||||
first_user_message: Some("state message".to_string()),
|
||||
cwd: Some(PathBuf::from("/tmp/state-cwd")),
|
||||
git_branch: Some("state-branch".to_string()),
|
||||
git_sha: Some("state-sha".to_string()),
|
||||
git_origin_url: Some("https://example.com/state.git".to_string()),
|
||||
source: Some(SessionSource::Exec),
|
||||
agent_nickname: Some("state-agent".to_string()),
|
||||
agent_role: Some("state-role".to_string()),
|
||||
model_provider: Some("state-provider".to_string()),
|
||||
cli_version: Some("state-version".to_string()),
|
||||
created_at: Some("2025-01-03T16:00:00Z".to_string()),
|
||||
updated_at: Some("2025-01-03T16:01:02.003Z".to_string()),
|
||||
};
|
||||
let created_at = chrono::Utc
|
||||
.with_ymd_and_hms(2025, 1, 3, 16, 0, 0)
|
||||
.single()
|
||||
.expect("valid datetime");
|
||||
let updated_at = chrono::DateTime::parse_from_rfc3339("2025-01-03T16:01:02.003Z")
|
||||
.expect("valid datetime")
|
||||
.with_timezone(&chrono::Utc);
|
||||
let mut builder = codex_state::ThreadMetadataBuilder::new(
|
||||
state_thread_id,
|
||||
state_path,
|
||||
created_at,
|
||||
SessionSource::Exec,
|
||||
);
|
||||
builder.cwd = PathBuf::from("/tmp/state-cwd");
|
||||
builder.model_provider = Some("state-provider".to_string());
|
||||
builder.cli_version = Some("state-version".to_string());
|
||||
builder.agent_nickname = Some("state-agent".to_string());
|
||||
builder.agent_role = Some("state-role".to_string());
|
||||
builder.git_branch = Some("state-branch".to_string());
|
||||
let mut metadata = builder.build("fallback-provider");
|
||||
metadata.first_user_message = Some("state message".to_string());
|
||||
metadata.updated_at = updated_at;
|
||||
|
||||
fill_missing_thread_item_metadata(&mut item, state_item);
|
||||
overlay_thread_item_metadata_from_state(&mut item, &metadata);
|
||||
|
||||
assert_eq!(item.path, filesystem_path);
|
||||
assert_eq!(item.thread_id, Some(filesystem_thread_id));
|
||||
@@ -827,17 +892,17 @@ fn fill_missing_thread_item_metadata_preserves_filesystem_identity() {
|
||||
);
|
||||
assert_eq!(item.cwd.as_deref(), Some(Path::new("/tmp/state-cwd")));
|
||||
assert_eq!(item.git_branch.as_deref(), Some("state-branch"));
|
||||
assert_eq!(item.git_sha.as_deref(), Some("state-sha"));
|
||||
assert_eq!(item.git_sha.as_deref(), Some("filesystem-sha"));
|
||||
assert_eq!(
|
||||
item.git_origin_url.as_deref(),
|
||||
Some("https://example.com/state.git")
|
||||
Some("https://example.com/filesystem.git")
|
||||
);
|
||||
assert_eq!(item.source, Some(SessionSource::Exec));
|
||||
assert_eq!(item.agent_nickname.as_deref(), Some("state-agent"));
|
||||
assert_eq!(item.agent_role.as_deref(), Some("state-role"));
|
||||
assert_eq!(item.model_provider.as_deref(), Some("state-provider"));
|
||||
assert_eq!(item.cli_version.as_deref(), Some("state-version"));
|
||||
assert_eq!(item.created_at.as_deref(), Some("2025-01-03T16:00:00Z"));
|
||||
assert_eq!(item.created_at.as_deref(), Some("2025-01-03T16:00:00.000Z"));
|
||||
assert_eq!(item.updated_at.as_deref(), Some("2025-01-03T16:01:02.003Z"));
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ use codex_protocol::dynamic_tools::DynamicToolSpec;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
pub use codex_state::LogEntry;
|
||||
use codex_state::ThreadMetadata;
|
||||
use codex_state::ThreadMetadataBuilder;
|
||||
use codex_utils_path::normalize_for_path_comparison;
|
||||
use serde_json::Value;
|
||||
@@ -22,6 +23,29 @@ use tracing::warn;
|
||||
/// Core-facing handle to the SQLite-backed state runtime.
|
||||
pub type StateDbHandle = Arc<codex_state::StateRuntime>;
|
||||
|
||||
/// Exact thread metadata lookup that does not require completed backfill.
|
||||
///
|
||||
/// This intentionally exposes only thread-id lookup, not DB-backed listing.
|
||||
#[derive(Clone)]
|
||||
pub struct ExactThreadMetadataLookup {
|
||||
runtime: StateDbHandle,
|
||||
}
|
||||
|
||||
impl ExactThreadMetadataLookup {
|
||||
pub async fn get_thread(&self, thread_id: ThreadId) -> anyhow::Result<Option<ThreadMetadata>> {
|
||||
self.runtime.get_thread(thread_id).await
|
||||
}
|
||||
}
|
||||
|
||||
/// State DB handles used by thread listing.
|
||||
///
|
||||
/// `completed` is safe for DB-backed paging. `exact_lookup` may be used only
|
||||
/// to overlay metadata for thread IDs already discovered from rollouts.
|
||||
pub(crate) struct ThreadListStateDb {
|
||||
pub(crate) completed: Option<StateDbHandle>,
|
||||
pub(crate) exact_lookup: Option<ExactThreadMetadataLookup>,
|
||||
}
|
||||
|
||||
/// Initialize the state runtime for thread state persistence and backfill checks.
|
||||
pub async fn init(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
|
||||
let config = RolloutConfig::from_view(config);
|
||||
@@ -62,32 +86,57 @@ pub async fn init(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
|
||||
|
||||
/// Get the DB if the feature is enabled and the DB exists.
|
||||
pub async fn get_state_db(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
|
||||
let state_path = codex_state::state_db_path(config.sqlite_home());
|
||||
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
|
||||
return None;
|
||||
let runtime = open_state_db_without_backfill_check(
|
||||
config.sqlite_home(),
|
||||
config.model_provider_id().to_string(),
|
||||
)
|
||||
.await?;
|
||||
require_backfill_complete(runtime, config.sqlite_home()).await
|
||||
}
|
||||
|
||||
pub(crate) async fn thread_list_state_db(config: &impl RolloutConfigView) -> ThreadListStateDb {
|
||||
let exact_lookup = open_exact_thread_metadata_lookup(config).await;
|
||||
let completed = match exact_lookup.as_ref().map(|lookup| lookup.runtime.clone()) {
|
||||
Some(runtime) => require_backfill_complete(runtime, config.sqlite_home()).await,
|
||||
None => None,
|
||||
};
|
||||
ThreadListStateDb {
|
||||
completed,
|
||||
exact_lookup,
|
||||
}
|
||||
let runtime = codex_state::StateRuntime::init(
|
||||
config.sqlite_home().to_path_buf(),
|
||||
}
|
||||
|
||||
pub async fn open_exact_thread_metadata_lookup(
|
||||
config: &impl RolloutConfigView,
|
||||
) -> Option<ExactThreadMetadataLookup> {
|
||||
open_state_db_without_backfill_check(
|
||||
config.sqlite_home(),
|
||||
config.model_provider_id().to_string(),
|
||||
)
|
||||
.await
|
||||
.ok()?;
|
||||
require_backfill_complete(runtime, config.sqlite_home()).await
|
||||
.map(|runtime| ExactThreadMetadataLookup { runtime })
|
||||
}
|
||||
|
||||
/// Open the state runtime when the SQLite file exists, without feature gating.
|
||||
///
|
||||
/// This is used for parity checks during the SQLite migration phase.
|
||||
pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
|
||||
let db_path = codex_state::state_db_path(codex_home);
|
||||
if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
|
||||
let runtime =
|
||||
open_state_db_without_backfill_check(codex_home, default_provider.to_string()).await?;
|
||||
require_backfill_complete(runtime, codex_home).await
|
||||
}
|
||||
|
||||
async fn open_state_db_without_backfill_check(
|
||||
sqlite_home: &Path,
|
||||
default_provider: String,
|
||||
) -> Option<StateDbHandle> {
|
||||
let state_path = codex_state::state_db_path(sqlite_home);
|
||||
if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) {
|
||||
return None;
|
||||
}
|
||||
let runtime =
|
||||
codex_state::StateRuntime::init(codex_home.to_path_buf(), default_provider.to_string())
|
||||
.await
|
||||
.ok()?;
|
||||
require_backfill_complete(runtime, codex_home).await
|
||||
codex_state::StateRuntime::init(sqlite_home.to_path_buf(), default_provider)
|
||||
.await
|
||||
.ok()
|
||||
}
|
||||
|
||||
async fn require_backfill_complete(
|
||||
|
||||
@@ -42,6 +42,7 @@ pub use model::Stage1JobClaim;
|
||||
pub use model::Stage1JobClaimOutcome;
|
||||
pub use model::Stage1Output;
|
||||
pub use model::Stage1StartupClaimParams;
|
||||
pub use model::ThreadGitInfoFields;
|
||||
pub use model::ThreadGoal;
|
||||
pub use model::ThreadGoalStatus;
|
||||
pub use model::ThreadMetadata;
|
||||
|
||||
@@ -31,6 +31,7 @@ pub use thread_metadata::BackfillStats;
|
||||
pub use thread_metadata::ExtractionOutcome;
|
||||
pub use thread_metadata::SortDirection;
|
||||
pub use thread_metadata::SortKey;
|
||||
pub use thread_metadata::ThreadGitInfoFields;
|
||||
pub use thread_metadata::ThreadMetadata;
|
||||
pub use thread_metadata::ThreadMetadataBuilder;
|
||||
pub use thread_metadata::ThreadsPage;
|
||||
|
||||
@@ -104,6 +104,47 @@ pub struct ThreadMetadata {
|
||||
pub git_origin_url: Option<String>,
|
||||
}
|
||||
|
||||
/// Git metadata fields stored with a thread.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default)]
|
||||
pub struct ThreadGitInfoFields {
|
||||
/// The git commit SHA, if known.
|
||||
pub sha: Option<String>,
|
||||
/// The git branch name, if known.
|
||||
pub branch: Option<String>,
|
||||
/// The git origin URL, if known.
|
||||
pub origin_url: Option<String>,
|
||||
}
|
||||
|
||||
impl ThreadGitInfoFields {
|
||||
/// Create git metadata from its persisted string fields.
|
||||
pub fn new(sha: Option<String>, branch: Option<String>, origin_url: Option<String>) -> Self {
|
||||
Self {
|
||||
sha,
|
||||
branch,
|
||||
origin_url,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true when no git metadata fields are present.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.sha.is_none() && self.branch.is_none() && self.origin_url.is_none()
|
||||
}
|
||||
|
||||
/// Overlay non-null fields from `other`, preserving existing values for
|
||||
/// fields that `other` does not know about.
|
||||
pub fn overlay_non_null(&mut self, other: &Self) {
|
||||
if other.sha.is_some() {
|
||||
self.sha = other.sha.clone();
|
||||
}
|
||||
if other.branch.is_some() {
|
||||
self.branch = other.branch.clone();
|
||||
}
|
||||
if other.origin_url.is_some() {
|
||||
self.origin_url = other.origin_url.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder data required to construct [`ThreadMetadata`] without parsing filenames.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ThreadMetadataBuilder {
|
||||
@@ -216,17 +257,22 @@ impl ThreadMetadataBuilder {
|
||||
}
|
||||
|
||||
impl ThreadMetadata {
|
||||
/// Return the persisted git metadata fields for this thread.
|
||||
pub fn git_info_fields(&self) -> ThreadGitInfoFields {
|
||||
ThreadGitInfoFields::new(
|
||||
self.git_sha.clone(),
|
||||
self.git_branch.clone(),
|
||||
self.git_origin_url.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Preserve existing non-null Git fields when rollout-derived metadata is reconciled.
|
||||
pub fn prefer_existing_git_info(&mut self, existing: &Self) {
|
||||
if existing.git_sha.is_some() {
|
||||
self.git_sha = existing.git_sha.clone();
|
||||
}
|
||||
if existing.git_branch.is_some() {
|
||||
self.git_branch = existing.git_branch.clone();
|
||||
}
|
||||
if existing.git_origin_url.is_some() {
|
||||
self.git_origin_url = existing.git_origin_url.clone();
|
||||
}
|
||||
let mut git_info = self.git_info_fields();
|
||||
git_info.overlay_non_null(&existing.git_info_fields());
|
||||
self.git_sha = git_info.sha;
|
||||
self.git_branch = git_info.branch;
|
||||
self.git_origin_url = git_info.origin_url;
|
||||
}
|
||||
|
||||
/// Return the list of field names that differ between `self` and `other`.
|
||||
|
||||
@@ -14,6 +14,8 @@ use codex_protocol::protocol::GitInfo;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_rollout::ThreadItem;
|
||||
use codex_state::ThreadGitInfoFields;
|
||||
use codex_state::ThreadMetadata;
|
||||
|
||||
use crate::StoredThread;
|
||||
use crate::ThreadStoreError;
|
||||
@@ -154,6 +156,34 @@ pub(super) fn git_info_from_parts(
|
||||
})
|
||||
}
|
||||
|
||||
fn git_info_fields_from_git_info(git_info: Option<&GitInfo>) -> ThreadGitInfoFields {
|
||||
let Some(git_info) = git_info else {
|
||||
return ThreadGitInfoFields::default();
|
||||
};
|
||||
ThreadGitInfoFields::new(
|
||||
git_info.commit_hash.as_ref().map(|sha| sha.0.clone()),
|
||||
git_info.branch.clone(),
|
||||
git_info.repository_url.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
fn git_info_from_fields(fields: ThreadGitInfoFields) -> Option<GitInfo> {
|
||||
if fields.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(GitInfo {
|
||||
commit_hash: fields.sha.as_deref().map(GitSha::new),
|
||||
branch: fields.branch,
|
||||
repository_url: fields.origin_url,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn apply_metadata_git_info(thread: &mut StoredThread, metadata: &ThreadMetadata) {
|
||||
let mut git_info = git_info_fields_from_git_info(thread.git_info.as_ref());
|
||||
git_info.overlay_non_null(&metadata.git_info_fields());
|
||||
thread.git_info = git_info_from_fields(git_info);
|
||||
}
|
||||
|
||||
fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
|
||||
let file_name = path.file_name()?.to_str()?;
|
||||
let stem = file_name.strip_suffix(".jsonl")?;
|
||||
|
||||
@@ -10,10 +10,10 @@ use codex_rollout::find_thread_name_by_id;
|
||||
use codex_rollout::find_thread_path_by_id_str;
|
||||
use codex_rollout::read_session_meta_line;
|
||||
use codex_rollout::read_thread_item_from_rollout;
|
||||
use codex_state::StateRuntime;
|
||||
use codex_state::ThreadMetadata;
|
||||
|
||||
use super::LocalThreadStore;
|
||||
use super::helpers::apply_metadata_git_info;
|
||||
use super::helpers::git_info_from_parts;
|
||||
use super::helpers::stored_thread_from_rollout_item;
|
||||
use crate::ReadThreadParams;
|
||||
@@ -27,7 +27,8 @@ pub(super) async fn read_thread(
|
||||
params: ReadThreadParams,
|
||||
) -> ThreadStoreResult<StoredThread> {
|
||||
let thread_id = params.thread_id;
|
||||
if let Some(metadata) = read_sqlite_metadata(store, thread_id).await
|
||||
let sqlite_metadata = read_sqlite_metadata(store, thread_id).await;
|
||||
if let Some(metadata) = sqlite_metadata.as_ref()
|
||||
&& (params.include_archived || metadata.archived_at.is_none())
|
||||
&& (!params.include_history
|
||||
|| sqlite_rollout_path_can_load_history_for_thread(
|
||||
@@ -37,7 +38,7 @@ pub(super) async fn read_thread(
|
||||
)
|
||||
.await)
|
||||
{
|
||||
let mut thread = stored_thread_from_sqlite_metadata(store, metadata).await;
|
||||
let mut thread = stored_thread_from_sqlite_metadata(store, metadata.clone()).await;
|
||||
attach_history_if_requested(&mut thread, params.include_history).await?;
|
||||
return Ok(thread);
|
||||
}
|
||||
@@ -152,20 +153,23 @@ async fn read_thread_from_rollout_path(
|
||||
store: &LocalThreadStore,
|
||||
path: std::path::PathBuf,
|
||||
) -> ThreadStoreResult<StoredThread> {
|
||||
let Some(item) = read_thread_item_from_rollout(path.clone()).await else {
|
||||
return stored_thread_from_session_meta(store, path).await;
|
||||
};
|
||||
let archived = path.starts_with(
|
||||
store
|
||||
.config
|
||||
.codex_home
|
||||
.join(codex_rollout::ARCHIVED_SESSIONS_SUBDIR),
|
||||
);
|
||||
let mut thread =
|
||||
let mut thread = if let Some(item) = read_thread_item_from_rollout(path.clone()).await {
|
||||
let archived = path.starts_with(
|
||||
store
|
||||
.config
|
||||
.codex_home
|
||||
.join(codex_rollout::ARCHIVED_SESSIONS_SUBDIR),
|
||||
);
|
||||
stored_thread_from_rollout_item(item, archived, store.config.model_provider_id.as_str())
|
||||
.ok_or_else(|| ThreadStoreError::Internal {
|
||||
message: format!("failed to read thread id from {}", path.display()),
|
||||
})?;
|
||||
})?
|
||||
} else {
|
||||
stored_thread_from_session_meta(store, path.clone()).await?
|
||||
};
|
||||
if let Some(metadata) = read_sqlite_metadata(store, thread.thread_id).await {
|
||||
apply_metadata_git_info(&mut thread, &metadata);
|
||||
}
|
||||
thread.forked_from_id = read_session_meta_line(path.as_path())
|
||||
.await
|
||||
.ok()
|
||||
@@ -193,13 +197,12 @@ async fn read_sqlite_metadata(
|
||||
store: &LocalThreadStore,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
) -> Option<ThreadMetadata> {
|
||||
let runtime = StateRuntime::init(
|
||||
store.config.sqlite_home.clone(),
|
||||
store.config.model_provider_id.clone(),
|
||||
)
|
||||
.await
|
||||
.ok()?;
|
||||
runtime.get_thread(thread_id).await.ok().flatten()
|
||||
codex_rollout::state_db::open_exact_thread_metadata_lookup(&store.config)
|
||||
.await?
|
||||
.get_thread(thread_id)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
async fn stored_thread_from_sqlite_metadata(
|
||||
@@ -217,6 +220,11 @@ async fn stored_thread_from_sqlite_metadata(
|
||||
.await
|
||||
.ok()
|
||||
.and_then(|meta_line| meta_line.meta.forked_from_id);
|
||||
let git_info = git_info_from_parts(
|
||||
metadata.git_sha.clone(),
|
||||
metadata.git_branch.clone(),
|
||||
metadata.git_origin_url.clone(),
|
||||
);
|
||||
StoredThread {
|
||||
thread_id: metadata.id,
|
||||
rollout_path: Some(metadata.rollout_path),
|
||||
@@ -239,11 +247,7 @@ async fn stored_thread_from_sqlite_metadata(
|
||||
agent_nickname: metadata.agent_nickname,
|
||||
agent_role: metadata.agent_role,
|
||||
agent_path: metadata.agent_path,
|
||||
git_info: git_info_from_parts(
|
||||
metadata.git_sha,
|
||||
metadata.git_branch,
|
||||
metadata.git_origin_url,
|
||||
),
|
||||
git_info,
|
||||
approval_mode: parse_or_default(&metadata.approval_mode, AskForApproval::OnRequest),
|
||||
sandbox_policy: parse_or_default(
|
||||
&metadata.sandbox_policy,
|
||||
@@ -433,6 +437,56 @@ mod tests {
|
||||
assert_eq!(thread.preview, "Hello from user");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_thread_by_rollout_path_prefers_sqlite_git_info() {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
let config = test_config(home.path());
|
||||
let store = LocalThreadStore::new(config.clone());
|
||||
let uuid = Uuid::from_u128(223);
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
|
||||
let active_path =
|
||||
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");
|
||||
let runtime = codex_state::StateRuntime::init(
|
||||
config.sqlite_home.clone(),
|
||||
config.model_provider_id.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let mut builder = ThreadMetadataBuilder::new(
|
||||
thread_id,
|
||||
active_path.clone(),
|
||||
Utc::now(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
builder.model_provider = Some(config.model_provider_id.clone());
|
||||
builder.git_branch = Some("sqlite-branch".to_string());
|
||||
let metadata = builder.build(config.model_provider_id.as_str());
|
||||
runtime
|
||||
.upsert_thread(&metadata)
|
||||
.await
|
||||
.expect("state db upsert should succeed");
|
||||
|
||||
let thread = store
|
||||
.read_thread_by_rollout_path(
|
||||
active_path,
|
||||
/*include_archived*/ false,
|
||||
/*include_history*/ false,
|
||||
)
|
||||
.await
|
||||
.expect("read thread by rollout path");
|
||||
|
||||
let git_info = thread.git_info.expect("git info should be present");
|
||||
assert_eq!(git_info.branch.as_deref(), Some("sqlite-branch"));
|
||||
assert_eq!(
|
||||
git_info.commit_hash.as_ref().map(|sha| sha.0.as_str()),
|
||||
Some("abcdef")
|
||||
);
|
||||
assert_eq!(
|
||||
git_info.repository_url.as_deref(),
|
||||
Some("https://example.com/repo.git")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_thread_returns_archived_rollout_when_requested() {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
|
||||
Reference in New Issue
Block a user