Compare commits

...

15 Commits

Author SHA1 Message Date
Charles Cunningham
4eb0d3272d codex: use arc-swap for shared state db
Co-authored-by: Codex <noreply@openai.com>
2026-03-25 11:26:52 -07:00
Charles Cunningham
12839cef8a codex: drop empty db-page fallback
Co-authored-by: Codex <noreply@openai.com>
2026-03-25 10:48:19 -07:00
Charles Cunningham
1c890935ee codex: memoize recovered shared state db
Co-authored-by: Codex <noreply@openai.com>
2026-03-25 09:59:44 -07:00
Charles Cunningham
b7c41e9c44 codex: qualify in-process state_db init
Co-authored-by: Codex <noreply@openai.com>
2026-03-25 09:54:38 -07:00
Charles Cunningham
19f07d04e7 codex: qualify state_db init call
Co-authored-by: Codex <noreply@openai.com>
2026-03-25 09:52:29 -07:00
Charles Cunningham
5b56e14ec2 codex: narrow conversation summary cwd expectation
Co-authored-by: Codex <noreply@openai.com>
2026-03-24 23:24:28 -07:00
Charles Cunningham
8dd09ce08d codex: normalize fake rollout cwd expectations
Co-authored-by: Codex <noreply@openai.com>
2026-03-24 22:47:19 -07:00
Charles Cunningham
930dd42d9a codex: preserve search filtering on db fallback
Co-authored-by: Codex <noreply@openai.com>
2026-03-24 22:37:02 -07:00
Charles Cunningham
25067165d5 codex: stabilize thread list cwd filter test
Co-authored-by: Codex <noreply@openai.com>
2026-03-24 21:23:37 -07:00
Charles Cunningham
1a879698d6 codex: narrow thread/list cwd fix
Co-authored-by: Codex <noreply@openai.com>
2026-03-24 20:54:26 -07:00
Charles Cunningham
9773cca5a2 Reuse shared state DB for agent resume
Co-authored-by: Codex <noreply@openai.com>
2026-03-24 20:54:26 -07:00
Charles Cunningham
41a82c54db Fail softly when app-server state DB init fails
Keep app-server startup aligned with origin/main by warning and continuing when SQLite initialization fails instead of aborting startup. When the shared startup handle is absent, retry state DB initialization on demand for later metadata and rollout lookups.

Co-authored-by: Codex <noreply@openai.com>
2026-03-24 20:54:26 -07:00
Charles Cunningham
504afb1dcd Thread state DB through rollout lookups
Remove the one-line rollout path wrappers and make callers pass the shared optional state DB handle directly. Reuse the initialized app-server handle on cold thread paths and carry existing thread DB handles through core and app-server callsites.

Co-authored-by: Codex <noreply@openai.com>
2026-03-24 20:54:21 -07:00
Charles Cunningham
74ec7c1be5 Make app-server state DB init atomic
Co-authored-by: Codex <noreply@openai.com>
2026-03-24 20:53:23 -07:00
Charles Cunningham
081b1394b4 Reuse app-server state DB for lookup paths
Co-authored-by: Codex <noreply@openai.com>
2026-03-24 20:53:23 -07:00
24 changed files with 547 additions and 202 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1429,6 +1429,7 @@ version = "0.0.0"
dependencies = [
"anyhow",
"app_test_support",
"arc-swap",
"async-trait",
"axum",
"base64 0.22.1",

View File

@@ -21,6 +21,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
arc-swap = "1.8.2"
async-trait = { workspace = true }
base64 = { workspace = true }
axum = { workspace = true, default-features = false, features = [

View File

@@ -233,8 +233,8 @@ use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::rollout_date_parts;
use codex_core::sandboxing::SandboxPermissions;
use codex_core::state_db;
use codex_core::state_db::StateDbHandle;
use codex_core::state_db::get_state_db;
use codex_core::state_db::reconcile_rollout;
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_core::windows_sandbox::WindowsSandboxSetupMode as CoreWindowsSandboxSetupMode;
@@ -324,6 +324,7 @@ use crate::filters::source_kind_matches;
use crate::thread_state::ThreadListenerCommand;
use crate::thread_state::ThreadState;
use crate::thread_state::ThreadStateManager;
use arc_swap::ArcSwapOption;
const THREAD_LIST_DEFAULT_LIMIT: usize = 25;
const THREAD_LIST_MAX_LIMIT: usize = 100;
@@ -384,6 +385,7 @@ pub(crate) struct CodexMessageProcessor {
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
fuzzy_search_sessions: Arc<Mutex<HashMap<String, FuzzyFileSearchSession>>>,
background_tasks: TaskTracker,
state_db: ArcSwapOption<StateRuntime>,
feedback: CodexFeedback,
log_db: Option<LogDbLayer>,
}
@@ -430,9 +432,23 @@ pub(crate) struct CodexMessageProcessorArgs {
pub(crate) cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
pub(crate) state_db: Option<StateDbHandle>,
}
impl CodexMessageProcessor {
async fn shared_state_db(&self) -> Option<StateDbHandle> {
if let Some(state_db) = self.state_db.load_full() {
return Some(state_db);
}
let recovered = state_db::init(self.config.as_ref()).await;
if let Some(recovered) = recovered.as_ref() {
self.state_db.store(Some(recovered.clone()));
}
recovered
}
pub(crate) fn clear_plugin_related_caches(&self) {
self.thread_manager.plugins_manager().clear_cache();
self.thread_manager.skills_manager().clear_cache();
@@ -482,6 +498,7 @@ impl CodexMessageProcessor {
Ok((thread_id, thread))
}
pub fn new(args: CodexMessageProcessorArgs) -> Self {
let CodexMessageProcessorArgs {
auth_manager,
@@ -494,6 +511,7 @@ impl CodexMessageProcessor {
cloud_requirements,
feedback,
log_db,
state_db,
} = args;
Self {
auth_manager,
@@ -512,6 +530,7 @@ impl CodexMessageProcessor {
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())),
background_tasks: TaskTracker::new(),
state_db: ArcSwapOption::new(state_db),
feedback,
log_db,
}
@@ -2236,29 +2255,34 @@ impl CodexMessageProcessor {
}
};
let rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await
{
Ok(Some(p)) => p,
Ok(None) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no rollout found for thread id {thread_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("failed to locate thread id {thread_id}: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let state_db = self.shared_state_db().await;
let rollout_path = match find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
state_db,
)
.await
{
Ok(Some(p)) => p,
Ok(None) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no rollout found for thread id {thread_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("failed to locate thread id {thread_id}: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let thread_id_str = thread_id.to_string();
match self.archive_thread_common(thread_id, &rollout_path).await {
@@ -2386,20 +2410,25 @@ impl CodexMessageProcessor {
return;
}
let thread_exists =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await
{
Ok(Some(_)) => true,
Ok(None) => false,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_id}: {err}"),
)
.await;
return;
}
};
let state_db = self.shared_state_db().await;
let thread_exists = match find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
state_db,
)
.await
{
Ok(Some(_)) => true,
Ok(None) => false,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_id}: {err}"),
)
.await;
return;
}
};
if !thread_exists {
self.send_invalid_request_error(request_id, format!("thread not found: {thread_id}"))
@@ -2472,7 +2501,7 @@ impl CodexMessageProcessor {
let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
let mut state_db_ctx = loaded_thread.as_ref().and_then(|thread| thread.state_db());
if state_db_ctx.is_none() {
state_db_ctx = get_state_db(&self.config).await;
state_db_ctx = self.shared_state_db().await;
}
let Some(state_db_ctx) = state_db_ctx else {
self.send_internal_error(
@@ -2675,33 +2704,37 @@ impl CodexMessageProcessor {
return Ok(());
}
let rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
.await
let rollout_path = match find_thread_path_by_id_str(
&self.config.codex_home,
&thread_uuid.to_string(),
Some(state_db_ctx.clone()),
)
.await
{
Ok(Some(path)) => path,
Ok(None) => match find_archived_thread_path_by_id_str(
&self.config.codex_home,
&thread_uuid.to_string(),
Some(state_db_ctx.clone()),
)
.await
{
Ok(Some(path)) => path,
Ok(None) => match find_archived_thread_path_by_id_str(
&self.config.codex_home,
&thread_uuid.to_string(),
)
.await
{
Ok(Some(path)) => path,
Ok(None) => {
return Err(invalid_request(format!("thread not found: {thread_uuid}")));
}
Err(err) => {
return Err(internal_error(format!(
"failed to locate archived thread id {thread_uuid}: {err}"
)));
}
},
Ok(None) => {
return Err(invalid_request(format!("thread not found: {thread_uuid}")));
}
Err(err) => {
return Err(internal_error(format!(
"failed to locate thread id {thread_uuid}: {err}"
"failed to locate archived thread id {thread_uuid}: {err}"
)));
}
};
},
Err(err) => {
return Err(internal_error(format!(
"failed to locate thread id {thread_uuid}: {err}"
)));
}
};
reconcile_rollout(
Some(state_db_ctx),
@@ -2744,9 +2777,11 @@ impl CodexMessageProcessor {
}
};
let state_db = self.shared_state_db().await;
let archived_path = match find_archived_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
state_db,
)
.await
{
@@ -2773,7 +2808,7 @@ impl CodexMessageProcessor {
let rollout_path_display = archived_path.display().to_string();
let fallback_provider = self.config.model_provider_id.clone();
let state_db_ctx = get_state_db(&self.config).await;
let state_db_ctx = self.shared_state_db().await;
let archived_folder = self
.config
.codex_home
@@ -3253,34 +3288,43 @@ impl CodexMessageProcessor {
let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
let loaded_thread_state_db = loaded_thread.as_ref().and_then(|thread| thread.state_db());
let state_db = if loaded_thread_state_db.is_some() {
None
} else {
self.shared_state_db().await
};
let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() {
read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_uuid).await
} else {
read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await
read_summary_from_state_db_by_thread_id(state_db.as_ref(), thread_uuid).await
};
let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone());
if rollout_path.is_none() || include_turns {
rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
.await
{
Ok(Some(path)) => Some(path),
Ok(None) => {
if include_turns {
None
} else {
rollout_path
}
let state_db_ctx = loaded_thread_state_db.clone().or_else(|| state_db.clone());
rollout_path = match find_thread_path_by_id_str(
&self.config.codex_home,
&thread_uuid.to_string(),
state_db_ctx,
)
.await
{
Ok(Some(path)) => Some(path),
Ok(None) => {
if include_turns {
None
} else {
rollout_path
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
}
};
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
}
};
}
if include_turns && rollout_path.is_none() && db_summary.is_some() {
@@ -3651,7 +3695,7 @@ impl CodexMessageProcessor {
let InitialHistory::Resumed(resumed_history) = thread_history else {
return None;
};
let state_db_ctx = get_state_db(&self.config).await?;
let state_db_ctx = self.shared_state_db().await?;
let persisted_metadata = state_db_ctx
.get_thread(resumed_history.conversation_id)
.await
@@ -3680,6 +3724,7 @@ impl CodexMessageProcessor {
return true;
}
let shared_state_db = self.shared_state_db().await;
let rollout_path = if let Some(path) = existing_thread.rollout_path() {
if path.exists() {
path
@@ -3687,6 +3732,9 @@ impl CodexMessageProcessor {
match find_thread_path_by_id_str(
&self.config.codex_home,
&existing_thread_id.to_string(),
existing_thread
.state_db()
.or_else(|| shared_state_db.clone()),
)
.await
{
@@ -3713,6 +3761,9 @@ impl CodexMessageProcessor {
match find_thread_path_by_id_str(
&self.config.codex_home,
&existing_thread_id.to_string(),
existing_thread
.state_db()
.or_else(|| shared_state_db.clone()),
)
.await
{
@@ -3772,12 +3823,13 @@ impl CodexMessageProcessor {
mismatch_details.join("; ")
);
}
let state_db_ctx = existing_thread.state_db();
let thread_summary = match load_thread_summary_for_rollout(
&self.config,
existing_thread_id,
rollout_path.as_path(),
config_snapshot.model_provider_id.as_str(),
/*persisted_metadata*/ None,
state_db_ctx.as_ref(),
)
.await
{
@@ -3868,9 +3920,11 @@ impl CodexMessageProcessor {
}
};
let state_db = self.shared_state_db().await;
match find_thread_path_by_id_str(
&self.config.codex_home,
&existing_thread_id.to_string(),
state_db,
)
.await
{
@@ -3918,12 +3972,13 @@ impl CodexMessageProcessor {
) -> std::result::Result<Thread, String> {
let thread = match thread_history {
InitialHistory::Resumed(resumed) => {
let state_db_ctx = thread.state_db();
load_thread_summary_for_rollout(
&self.config,
resumed.conversation_id,
resumed.rollout_path.as_path(),
fallback_provider,
persisted_resume_metadata,
state_db_ctx.as_ref(),
)
.await
}
@@ -3999,9 +4054,11 @@ impl CodexMessageProcessor {
}
};
let state_db = self.shared_state_db().await;
match find_thread_path_by_id_str(
&self.config.codex_home,
&existing_thread_id.to_string(),
state_db,
)
.await
{
@@ -4025,9 +4082,13 @@ impl CodexMessageProcessor {
}
};
let history_cwd =
read_history_cwd_from_state_db(&self.config, source_thread_id, rollout_path.as_path())
.await;
let state_db = self.shared_state_db().await;
let history_cwd = read_history_cwd_from_state_db(
state_db.as_ref(),
source_thread_id,
rollout_path.as_path(),
)
.await;
// Persist Windows sandbox mode.
let mut cli_overrides = cli_overrides.unwrap_or_default();
@@ -4249,9 +4310,10 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: GetConversationSummaryParams,
) {
let state_db = self.shared_state_db().await;
if let GetConversationSummaryParams::ThreadId { conversation_id } = &params
&& let Some(summary) =
read_summary_from_state_db_by_thread_id(&self.config, *conversation_id).await
read_summary_from_state_db_by_thread_id(state_db.as_ref(), *conversation_id).await
{
let response = GetConversationSummaryResponse { summary };
self.outgoing.send_response(request_id, response).await;
@@ -4267,9 +4329,11 @@ impl CodexMessageProcessor {
}
}
GetConversationSummaryParams::ThreadId { conversation_id } => {
match codex_core::find_thread_path_by_id_str(
let state_db = self.shared_state_db().await;
match find_thread_path_by_id_str(
&self.config.codex_home,
&conversation_id.to_string(),
state_db,
)
.await
{
@@ -4352,7 +4416,7 @@ impl CodexMessageProcessor {
let fallback_provider = self.config.model_provider_id.clone();
let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds);
let allowed_sources = allowed_sources_vec.as_slice();
let state_db_ctx = get_state_db(&self.config).await;
let state_db_ctx = self.shared_state_db().await;
while remaining > 0 {
let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
@@ -5205,7 +5269,7 @@ impl CodexMessageProcessor {
self.finalize_thread_teardown(thread_id).await;
if state_db_ctx.is_none() {
state_db_ctx = get_state_db(&self.config).await;
state_db_ctx = self.shared_state_db().await;
}
// Move the rollout file to archived.
@@ -6556,18 +6620,27 @@ impl CodexMessageProcessor {
let rollout_path = if let Some(path) = parent_thread.rollout_path() {
path
} else {
find_thread_path_by_id_str(&self.config.codex_home, &parent_thread_id.to_string())
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to locate thread id {parent_thread_id}: {err}"),
data: None,
})?
.ok_or_else(|| JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no rollout found for thread id {parent_thread_id}"),
data: None,
})?
let state_db = if let Some(state_db) = parent_thread.state_db() {
Some(state_db)
} else {
self.shared_state_db().await
};
find_thread_path_by_id_str(
&self.config.codex_home,
&parent_thread_id.to_string(),
state_db,
)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to locate thread id {parent_thread_id}: {err}"),
data: None,
})?
.ok_or_else(|| JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no rollout found for thread id {parent_thread_id}"),
data: None,
})?
};
let mut config = self.config.as_ref().clone();
@@ -7160,11 +7233,10 @@ impl CodexMessageProcessor {
if let Some(log_db) = self.log_db.as_ref() {
log_db.flush().await;
}
let state_db_ctx = get_state_db(&self.config).await;
match (state_db_ctx.as_ref(), conversation_id) {
(Some(state_db_ctx), Some(conversation_id)) => {
match (self.shared_state_db().await, conversation_id) {
(Some(state_db), Some(conversation_id)) => {
let thread_id_text = conversation_id.to_string();
match state_db_ctx.query_feedback_logs(&thread_id_text).await {
match state_db.query_feedback_logs(&thread_id_text).await {
Ok(logs) if logs.is_empty() => None,
Ok(logs) => Some(logs),
Err(err) => {
@@ -7313,7 +7385,17 @@ impl CodexMessageProcessor {
async fn resolve_rollout_path(&self, conversation_id: ThreadId) -> Option<PathBuf> {
match self.thread_manager.get_thread(conversation_id).await {
Ok(conv) => conv.rollout_path(),
Err(_) => None,
Err(_) => {
let state_db = self.shared_state_db().await;
find_thread_path_by_id_str(
&self.config.codex_home,
&conversation_id.to_string(),
state_db,
)
.await
.ok()
.flatten()
}
}
}
}
@@ -7962,12 +8044,12 @@ async fn derive_config_for_cwd(
}
async fn read_history_cwd_from_state_db(
config: &Config,
state_db_ctx: Option<&StateDbHandle>,
thread_id: Option<ThreadId>,
rollout_path: &Path,
) -> Option<PathBuf> {
if let Some(state_db_ctx) = get_state_db(config).await
&& let Some(thread_id) = thread_id
if let Some(thread_id) = thread_id
&& let Some(state_db_ctx) = state_db_ctx
&& let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await
{
return Some(metadata.cwd);
@@ -7984,11 +8066,10 @@ async fn read_history_cwd_from_state_db(
}
async fn read_summary_from_state_db_by_thread_id(
config: &Config,
state_db_ctx: Option<&StateDbHandle>,
thread_id: ThreadId,
) -> Option<ConversationSummary> {
let state_db_ctx = get_state_db(config).await;
read_summary_from_state_db_context_by_thread_id(state_db_ctx.as_ref(), thread_id).await
read_summary_from_state_db_context_by_thread_id(state_db_ctx, thread_id).await
}
async fn read_summary_from_state_db_context_by_thread_id(
@@ -8278,11 +8359,11 @@ fn map_git_info(git_info: &CoreGitInfo) -> ConversationGitInfo {
}
async fn load_thread_summary_for_rollout(
config: &Config,
thread_id: ThreadId,
rollout_path: &Path,
fallback_provider: &str,
persisted_metadata: Option<&ThreadMetadata>,
state_db_ctx: Option<&StateDbHandle>,
) -> std::result::Result<Thread, String> {
let mut thread = read_summary_from_rollout(rollout_path, fallback_provider)
.await
@@ -8298,7 +8379,9 @@ async fn load_thread_summary_for_rollout(
&mut thread,
summary_to_thread(summary_from_thread_metadata(persisted_metadata)),
);
} else if let Some(summary) = read_summary_from_state_db_by_thread_id(config, thread_id).await {
} else if let Some(summary) =
read_summary_from_state_db_by_thread_id(state_db_ctx, thread_id).await
{
merge_mutable_thread_metadata(&mut thread, summary_to_thread(summary));
}
Ok(thread)
@@ -8452,12 +8535,21 @@ mod tests {
use anyhow::Result;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_arg0::Arg0DispatchPaths;
use codex_core::ThreadManager;
use codex_core::config::ConfigBuilder;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use tempfile::TempDir;
#[test]
@@ -8981,6 +9073,62 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn shared_state_db_caches_successful_retry() -> Result<()> {
let codex_home = TempDir::new()?;
let config = Arc::new(
ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.build()
.await?,
);
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
/*enable_codex_api_key_env*/ false,
config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
SessionSource::Cli,
CollaborationModesConfig {
default_mode_request_user_input: config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
let (outgoing_tx, _outgoing_rx) = tokio::sync::mpsc::channel::<OutgoingEnvelope>(1);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager,
thread_manager,
outgoing,
arg0_paths: Arg0DispatchPaths::default(),
config,
cli_overrides: Arc::new(RwLock::new(Vec::new())),
runtime_feature_enablement: Arc::new(RwLock::new(BTreeMap::new())),
cloud_requirements: Arc::new(RwLock::new(CloudRequirementsLoader::default())),
feedback: CodexFeedback::new(),
log_db: None,
state_db: None,
});
let first = processor
.shared_state_db()
.await
.expect("state db should initialize on retry");
let second = processor
.shared_state_db()
.await
.expect("cached state db should be reused");
assert!(Arc::ptr_eq(&first, &second));
assert!(processor.state_db.load_full().is_some());
Ok(())
}
#[test]
fn summary_from_state_db_metadata_preserves_agent_nickname() -> Result<()> {
let conversation_id = ThreadId::from_string("bfd12a78-5900-467b-9bc5-d3d35df08191")?;

View File

@@ -77,6 +77,8 @@ use codex_arg0::Arg0DispatchPaths;
use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_core::state_db;
use codex_core::state_db::StateDbHandle;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use tokio::sync::mpsc;
@@ -325,7 +327,14 @@ impl InProcessClientHandle {
/// the runtime is shut down and an `InvalidData` error is returned.
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
let initialize = args.initialize.clone();
let client = start_uninitialized(args);
let state_db = state_db::init(args.config.as_ref()).await;
if state_db.is_none() {
warn!(
"sqlite state db unavailable at startup for {}; continuing without sqlite-backed app-server state",
args.config.sqlite_home.display()
);
}
let client = start_uninitialized(args, state_db);
let initialize_response = client
.request(ClientRequest::Initialize {
@@ -345,7 +354,10 @@ pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle>
Ok(client)
}
fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
fn start_uninitialized(
args: InProcessStartArgs,
state_db: Option<StateDbHandle>,
) -> InProcessClientHandle {
let channel_capacity = args.channel_capacity.max(1);
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
@@ -388,6 +400,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
cloud_requirements: args.cloud_requirements,
feedback: args.feedback,
log_db: None,
state_db,
config_warnings: args.config_warnings,
session_source: args.session_source,
enable_codex_api_key_env: args.enable_codex_api_key_env,
@@ -693,13 +706,17 @@ mod tests {
use codex_app_server_protocol::TurnStatus;
use codex_core::config::ConfigBuilder;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
async fn build_test_config() -> Config {
match ConfigBuilder::default().build().await {
Ok(config) => config,
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
.expect("default config should load"),
}
let codex_home = TempDir::new().expect("create temp dir");
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.build()
.await
.expect("test config should build");
std::mem::forget(codex_home);
config
}
async fn start_test_client_with_capacity(

View File

@@ -8,6 +8,7 @@ use codex_core::config::ConfigBuilder;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::ConfigLayerStackOrdering;
use codex_core::config_loader::LoaderOverrides;
use codex_core::state_db::init;
use codex_utils_cli::CliConfigOverrides;
use std::collections::HashMap;
use std::collections::HashSet;
@@ -489,6 +490,13 @@ pub async fn run_main_with_transport(
}
let feedback = CodexFeedback::new();
let state_db = init(&config).await;
if state_db.is_none() {
warn!(
"sqlite state db unavailable at startup for {}; continuing without sqlite-backed app-server state",
config.sqlite_home.display()
);
}
let otel = codex_core::otel_init::build_provider(
&config,
@@ -522,13 +530,7 @@ pub async fn run_main_with_transport(
let feedback_layer = feedback.logger_layer();
let feedback_metadata_layer = feedback.metadata_layer();
let log_db = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.ok()
.map(log_db::start);
let log_db = state_db.clone().map(log_db::start);
let log_db_layer = log_db
.clone()
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
@@ -618,6 +620,7 @@ pub async fn run_main_with_transport(
cloud_requirements: cloud_requirements.clone(),
feedback: feedback.clone(),
log_db,
state_db,
config_warnings,
session_source,
enable_codex_api_key_env: false,

View File

@@ -66,6 +66,7 @@ use codex_core::default_client::get_codex_user_agent;
use codex_core::default_client::set_default_client_residency_requirement;
use codex_core::default_client::set_default_originator;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use codex_core::state_db::StateDbHandle;
use codex_features::Feature;
use codex_feedback::CodexFeedback;
use codex_login::auth::ExternalAuthRefreshContext;
@@ -181,6 +182,7 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) cloud_requirements: CloudRequirementsLoader,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
pub(crate) state_db: Option<StateDbHandle>,
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
pub(crate) session_source: SessionSource,
pub(crate) enable_codex_api_key_env: bool,
@@ -199,6 +201,7 @@ impl MessageProcessor {
cloud_requirements,
feedback,
log_db,
state_db,
config_warnings,
session_source,
enable_codex_api_key_env,
@@ -242,6 +245,7 @@ impl MessageProcessor {
cloud_requirements: cloud_requirements.clone(),
feedback,
log_db,
state_db,
});
// Keep plugin startup warmups aligned at app-server startup.
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.

View File

@@ -24,6 +24,7 @@ use codex_core::config::Config;
use codex_core::config::ConfigBuilder;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_core::state_db::init;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::W3cTraceContext;
@@ -117,7 +118,7 @@ impl TracingHarness {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
let config = Arc::new(build_test_config(codex_home.path(), &server.uri()).await?);
let (processor, outgoing_rx) = build_test_processor(config);
let (processor, outgoing_rx) = build_test_processor(config).await;
let tracing = init_test_tracing();
tracing.exporter.reset();
tracing::callsite::rebuild_interest_cache();
@@ -224,7 +225,7 @@ async fn build_test_config(codex_home: &Path, server_uri: &str) -> Result<Config
.await?)
}
fn build_test_processor(
async fn build_test_processor(
config: Arc<Config>,
) -> (
MessageProcessor,
@@ -232,6 +233,9 @@ fn build_test_processor(
) {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let state_db = init(config.as_ref())
.await
.expect("state db should initialize in tracing tests");
let processor = MessageProcessor::new(MessageProcessorArgs {
outgoing,
arg0_paths: Arg0DispatchPaths::default(),
@@ -241,6 +245,7 @@ fn build_test_processor(
cloud_requirements: CloudRequirementsLoader::default(),
feedback: CodexFeedback::new(),
log_db: None,
state_db: Some(state_db),
config_warnings: Vec::new(),
session_source: SessionSource::VSCode,
enable_codex_api_key_env: false,

View File

@@ -37,6 +37,7 @@ pub use responses::create_shell_command_sse_response;
pub use rollout::create_fake_rollout;
pub use rollout::create_fake_rollout_with_source;
pub use rollout::create_fake_rollout_with_text_elements;
pub use rollout::fake_rollout_cwd;
pub use rollout::rollout_path;
use serde::de::DeserializeOwned;

View File

@@ -23,6 +23,10 @@ pub fn rollout_path(codex_home: &Path, filename_ts: &str, thread_id: &str) -> Pa
.join(format!("rollout-{filename_ts}-{thread_id}.jsonl"))
}
pub fn fake_rollout_cwd() -> PathBuf {
std::fs::canonicalize(Path::new("/")).unwrap_or_else(|_| PathBuf::from("/"))
}
/// Create a minimal rollout file under `CODEX_HOME/sessions/YYYY/MM/DD/`.
///
/// - `filename_ts` is the filename timestamp component in `YYYY-MM-DDThh-mm-ss` format.

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::fake_rollout_cwd;
use app_test_support::rollout_path;
use app_test_support::to_response;
use codex_app_server_protocol::ConversationSummary;
@@ -29,7 +30,7 @@ fn expected_summary(conversation_id: ThreadId, path: PathBuf) -> ConversationSum
timestamp: Some(META_RFC3339.to_string()),
updated_at: Some(META_RFC3339.to_string()),
model_provider: MODEL_PROVIDER.to_string(),
cwd: PathBuf::from("/"),
cwd: fake_rollout_cwd(),
cli_version: "0.0.0".to_string(),
source: SessionSource::Cli,
git_info: None,
@@ -91,7 +92,8 @@ async fn get_conversation_summary_by_relative_rollout_path_resolves_from_codex_h
let thread_id = ThreadId::from_string(&conversation_id)?;
let rollout_path = rollout_path(codex_home.path(), FILENAME_TS, &conversation_id);
let relative_path = rollout_path.strip_prefix(codex_home.path())?.to_path_buf();
let expected = expected_summary(thread_id, std::fs::canonicalize(rollout_path)?);
let mut expected = expected_summary(thread_id, std::fs::canonicalize(rollout_path)?);
expected.cwd = PathBuf::from("/");
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

View File

@@ -58,7 +58,7 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
rollout_path.display()
);
assert!(
find_thread_path_by_id_str(codex_home.path(), &thread.id)
find_thread_path_by_id_str(codex_home.path(), &thread.id, /*state_db_ctx*/ None)
.await?
.is_none(),
"thread id should not be discoverable before rollout materialization"
@@ -113,9 +113,10 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
rollout_path.display()
);
let discovered_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected rollout path for thread id to exist after materialization");
let discovered_path =
find_thread_path_by_id_str(codex_home.path(), &thread.id, /*state_db_ctx*/ None)
.await?
.expect("expected rollout path for thread id to exist after materialization");
assert_paths_match_on_disk(&discovered_path, &rollout_path)?;
let archive_id = mcp

View File

@@ -4,6 +4,7 @@ use app_test_support::create_fake_rollout;
use app_test_support::create_fake_rollout_with_source;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::fake_rollout_cwd;
use app_test_support::rollout_path;
use app_test_support::to_response;
use chrono::DateTime;
@@ -37,7 +38,6 @@ use std::fs;
use std::fs::FileTimes;
use std::fs::OpenOptions;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
@@ -358,12 +358,13 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
)
.await?;
assert_eq!(data1.len(), 2);
let expected_cwd = fake_rollout_cwd();
for thread in &data1 {
assert_eq!(thread.preview, "Hello");
assert_eq!(thread.model_provider, "mock_provider");
assert!(thread.created_at > 0);
assert_eq!(thread.updated_at, thread.created_at);
assert_eq!(thread.cwd, PathBuf::from("/"));
assert_eq!(thread.cwd, expected_cwd);
assert_eq!(thread.cli_version, "0.0.0");
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.git_info, None);
@@ -390,7 +391,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
assert_eq!(thread.model_provider, "mock_provider");
assert!(thread.created_at > 0);
assert_eq!(thread.updated_at, thread.created_at);
assert_eq!(thread.cwd, PathBuf::from("/"));
assert_eq!(thread.cwd, expected_cwd);
assert_eq!(thread.cli_version, "0.0.0");
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.git_info, None);
@@ -446,7 +447,7 @@ async fn thread_list_respects_provider_filter() -> Result<()> {
let expected_ts = chrono::DateTime::parse_from_rfc3339("2025-01-02T11:00:00Z")?.timestamp();
assert_eq!(thread.created_at, expected_ts);
assert_eq!(thread.updated_at, expected_ts);
assert_eq!(thread.cwd, PathBuf::from("/"));
assert_eq!(thread.cwd, fake_rollout_cwd());
assert_eq!(thread.cli_version, "0.0.0");
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.git_info, None);
@@ -484,6 +485,20 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
)?;
let mut mcp = init_mcp(codex_home.path()).await?;
let expected_cwd = list_threads(
&mut mcp,
None,
Some(10),
Some(vec!["mock_provider".to_string()]),
None,
None,
)
.await?
.data
.into_iter()
.find(|summary| summary.id == filtered_id)
.map(|summary| summary.cwd)
.ok_or_else(|| anyhow::anyhow!("filtered thread missing from unfiltered list"))?;
let request_id = mcp
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
cursor: None,
@@ -492,7 +507,7 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,
cwd: Some(target_cwd.to_string_lossy().into_owned()),
cwd: Some(expected_cwd.to_string_lossy().into_owned()),
search_term: None,
})
.await?;
@@ -509,7 +524,7 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
assert_eq!(data.len(), 1);
assert_eq!(data[0].id, filtered_id);
assert_ne!(data[0].id, unfiltered_id);
assert_eq!(data[0].cwd, target_cwd);
assert_eq!(data[0].cwd, expected_cwd);
Ok(())
}
@@ -591,6 +606,71 @@ sqlite = true
Ok(())
}
/// `thread/list` must honor an active `search_term` even when the
/// SQLite-backed page comes back empty: the response stays empty with no
/// continuation cursor instead of falling back to an unfiltered listing.
#[tokio::test]
async fn thread_list_keeps_empty_search_results_when_db_page_is_empty() -> Result<()> {
    let codex_home = TempDir::new()?;

    // Opt in to the sqlite feature so listing consults the state DB.
    std::fs::write(
        codex_home.path().join("config.toml"),
        r#"
model = "mock-model"
approval_policy = "never"
suppress_unstable_features_warning = true
[features]
sqlite = true
"#,
    )?;

    // Two rollouts exist on disk; neither preview contains the search term
    // ("absent") used below.
    for (filename_ts, meta_rfc3339, preview) in [
        ("2025-01-02T10-00-00", "2025-01-02T10:00:00Z", "match: needle"),
        ("2025-01-02T11-00-00", "2025-01-02T11:00:00Z", "no hit here"),
    ] {
        create_fake_rollout(
            codex_home.path(),
            filename_ts,
            meta_rfc3339,
            preview,
            Some("mock_provider"),
            None,
        )?;
    }

    // Mark backfill complete so the DB page is treated as authoritative for
    // this scenario (an empty page, per the test name).
    let runtime =
        codex_state::StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into())
            .await?;
    runtime.mark_backfill_complete(None).await?;

    let mut mcp = init_mcp(codex_home.path()).await?;
    let params = codex_app_server_protocol::ThreadListParams {
        cursor: None,
        limit: Some(10),
        sort_key: None,
        model_providers: Some(vec!["mock_provider".to_string()]),
        source_kinds: None,
        archived: None,
        cwd: None,
        search_term: Some("absent".to_string()),
    };
    let request_id = mcp.send_thread_list_request(params).await?;
    let response: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
    )
    .await??;
    let listed = to_response::<ThreadListResponse>(response)?;

    // No hits for the search term: empty page, no cursor.
    assert_eq!(listed.next_cursor, None);
    assert!(listed.data.is_empty());
    Ok(())
}
#[tokio::test]
async fn thread_list_empty_source_kinds_defaults_to_interactive_only() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -996,7 +1076,7 @@ async fn thread_list_includes_git_info() -> Result<()> {
};
assert_eq!(thread.git_info, Some(expected_git));
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.cwd, PathBuf::from("/"));
assert_eq!(thread.cwd, fake_rollout_cwd());
assert_eq!(thread.cli_version, "0.0.0");
Ok(())

View File

@@ -2,6 +2,7 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::fake_rollout_cwd;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
@@ -30,7 +31,6 @@ use core_test_support::responses;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -81,7 +81,7 @@ async fn thread_read_returns_summary_without_turns() -> Result<()> {
assert_eq!(thread.model_provider, "mock_provider");
assert!(!thread.ephemeral, "stored rollouts should not be ephemeral");
assert!(thread.path.as_ref().expect("thread path").is_absolute());
assert_eq!(thread.cwd, PathBuf::from("/"));
assert_eq!(thread.cwd, fake_rollout_cwd());
assert_eq!(thread.cli_version, "0.0.0");
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.git_info, None);

View File

@@ -75,9 +75,10 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
)
.await??;
let found_rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected rollout path for thread id to exist");
let found_rollout_path =
find_thread_path_by_id_str(codex_home.path(), &thread.id, /*state_db_ctx*/ None)
.await?
.expect("expected rollout path for thread id to exist");
assert_paths_match_on_disk(&found_rollout_path, &rollout_path)?;
let archive_id = mcp
@@ -92,9 +93,13 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
let archived_path = find_archived_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected archived rollout path for thread id to exist");
let archived_path = find_archived_thread_path_by_id_str(
codex_home.path(),
&thread.id,
/*state_db_ctx*/ None,
)
.await?
.expect("expected archived rollout path for thread id to exist");
let archived_path_display = archived_path.display();
assert!(
archived_path.exists(),

View File

@@ -205,6 +205,9 @@ impl AgentControl {
.or(find_thread_path_by_id_str(
config.codex_home.as_path(),
&parent_thread_id.to_string(),
parent_thread
.as_ref()
.and_then(|parent_thread| parent_thread.state_db()),
)
.await?)
.ok_or_else(|| {
@@ -295,10 +298,16 @@ impl AgentControl {
config: crate::config::Config,
thread_id: ThreadId,
session_source: SessionSource,
state_db_ctx: Option<state_db::StateDbHandle>,
) -> CodexResult<ThreadId> {
let root_depth = thread_spawn_depth(&session_source).unwrap_or(0);
let resumed_thread_id = self
.resume_single_agent_from_rollout(config.clone(), thread_id, session_source)
.resume_single_agent_from_rollout(
config.clone(),
thread_id,
session_source,
state_db_ctx.clone(),
)
.await?;
let state = self.upgrade()?;
let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else {
@@ -344,6 +353,7 @@ impl AgentControl {
config.clone(),
child_thread_id,
child_session_source,
Some(state_db_ctx.clone()),
)
.await
{
@@ -368,6 +378,7 @@ impl AgentControl {
mut config: crate::config::Config,
thread_id: ThreadId,
session_source: SessionSource,
state_db_ctx: Option<state_db::StateDbHandle>,
) -> CodexResult<ThreadId> {
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = &session_source
&& *depth >= config.agent_max_depth
@@ -377,6 +388,10 @@ impl AgentControl {
}
let state = self.upgrade()?;
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
let state_db_ctx = match state_db_ctx {
Some(state_db_ctx) => Some(state_db_ctx),
None => state_db::get_state_db(&config).await,
};
let (session_source, agent_metadata) = match session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
@@ -386,7 +401,7 @@ impl AgentControl {
agent_nickname: _,
}) => {
let (resumed_agent_nickname, resumed_agent_role) =
if let Some(state_db_ctx) = state_db::get_state_db(&config).await {
if let Some(state_db_ctx) = state_db_ctx.as_ref() {
match state_db_ctx.get_thread(thread_id).await {
Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role),
Ok(None) | Err(_) => (None, None),
@@ -413,18 +428,22 @@ impl AgentControl {
let inherited_exec_policy = self
.inherited_exec_policy_for_source(&state, Some(&session_source), &config)
.await;
let rollout_path =
match find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
.await?
{
Some(rollout_path) => rollout_path,
None => find_archived_thread_path_by_id_str(
config.codex_home.as_path(),
&thread_id.to_string(),
)
.await?
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?,
};
let rollout_path = match find_thread_path_by_id_str(
config.codex_home.as_path(),
&thread_id.to_string(),
state_db_ctx.clone(),
)
.await?
{
Some(rollout_path) => rollout_path,
None => find_archived_thread_path_by_id_str(
config.codex_home.as_path(),
&thread_id.to_string(),
state_db_ctx,
)
.await?
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?,
};
let resumed_thread = state
.resume_thread_from_rollout_with_source(

View File

@@ -298,7 +298,12 @@ async fn resume_agent_errors_when_manager_dropped() {
let control = AgentControl::default();
let (_home, config) = test_config().await;
let err = control
.resume_agent_from_rollout(config, ThreadId::new(), SessionSource::Exec)
.resume_agent_from_rollout(
config,
ThreadId::new(),
SessionSource::Exec,
/*state_db_ctx*/ None,
)
.await
.expect_err("resume_agent should fail without a manager");
assert_eq!(
@@ -942,7 +947,12 @@ async fn resume_agent_respects_max_threads_limit() {
.expect("spawn_agent should succeed for active slot");
let err = control
.resume_agent_from_rollout(config, resumable_id, SessionSource::Exec)
.resume_agent_from_rollout(
config,
resumable_id,
SessionSource::Exec,
/*state_db_ctx*/ None,
)
.await
.expect_err("resume should respect max threads");
let CodexErr::AgentLimitReached {
@@ -975,7 +985,12 @@ async fn resume_agent_releases_slot_after_resume_failure() {
let control = manager.agent_control();
let _ = control
.resume_agent_from_rollout(config.clone(), ThreadId::new(), SessionSource::Exec)
.resume_agent_from_rollout(
config.clone(),
ThreadId::new(),
SessionSource::Exec,
/*state_db_ctx*/ None,
)
.await
.expect_err("resume should fail for missing rollout path");
@@ -1454,6 +1469,7 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() {
agent_nickname: None,
agent_role: None,
}),
/*state_db_ctx*/ None,
)
.await
.expect("resume should succeed");
@@ -1540,7 +1556,12 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
let resumed_thread_id = harness
.control
.resume_agent_from_rollout(harness.config.clone(), child_thread_id, SessionSource::Exec)
.resume_agent_from_rollout(
harness.config.clone(),
child_thread_id,
SessionSource::Exec,
/*state_db_ctx*/ None,
)
.await
.expect("resume should find archived rollout");
assert_eq!(resumed_thread_id, child_thread_id);
@@ -1799,6 +1820,7 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() {
harness.config.clone(),
parent_thread_id,
SessionSource::Exec,
/*state_db_ctx*/ None,
)
.await
.expect("single-thread resume should succeed");
@@ -1895,6 +1917,7 @@ async fn resume_closed_child_reopens_open_descendants() {
agent_nickname: None,
agent_role: None,
}),
/*state_db_ctx*/ None,
)
.await
.expect("child resume should succeed");
@@ -1987,6 +2010,7 @@ async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdo
harness.config.clone(),
parent_thread_id,
SessionSource::Exec,
/*state_db_ctx*/ None,
)
.await
.expect("tree resume should succeed");
@@ -2100,6 +2124,7 @@ async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_sourc
harness.config.clone(),
parent_thread_id,
SessionSource::Exec,
/*state_db_ctx*/ None,
)
.await
.expect("tree resume should succeed");
@@ -2215,6 +2240,7 @@ async fn resume_agent_from_rollout_skips_descendants_when_parent_resume_fails()
harness.config.clone(),
parent_thread_id,
SessionSource::Exec,
/*state_db_ctx*/ None,
)
.await
.expect("root resume should succeed");

View File

@@ -518,7 +518,8 @@ pub async fn cleanup_stale_snapshots(codex_home: &Path, active_session_id: Threa
continue;
}
let rollout_path = find_thread_path_by_id_str(codex_home, session_id).await?;
let rollout_path =
find_thread_path_by_id_str(codex_home, session_id, /*state_db_ctx*/ None).await?;
let Some(rollout_path) = rollout_path else {
remove_snapshot_file(&path).await;
continue;

View File

@@ -163,6 +163,7 @@ async fn try_resume_closed_agent(
/*agent_role*/ None,
/*task_name*/ None,
)?,
session.state_db(),
)
.await
.map(|_| ())

View File

@@ -78,9 +78,10 @@ async fn find_locates_rollout_file_by_id() {
let id = Uuid::new_v4();
let expected = write_minimal_rollout_with_id(home.path(), id);
let found = find_thread_path_by_id_str(home.path(), &id.to_string())
.await
.unwrap();
let found =
find_thread_path_by_id_str(home.path(), &id.to_string(), /*state_db_ctx*/ None)
.await
.unwrap();
assert_eq!(found.unwrap(), expected);
}
@@ -94,9 +95,10 @@ async fn find_handles_gitignore_covering_codex_home_directory() {
let id = Uuid::new_v4();
let expected = write_minimal_rollout_with_id(&codex_home, id);
let found = find_thread_path_by_id_str(&codex_home, &id.to_string())
.await
.unwrap();
let found =
find_thread_path_by_id_str(&codex_home, &id.to_string(), /*state_db_ctx*/ None)
.await
.unwrap();
assert_eq!(found, Some(expected));
}
@@ -114,9 +116,10 @@ async fn find_prefers_sqlite_path_by_id() {
write_minimal_rollout_with_id(home.path(), id);
upsert_thread_metadata(home.path(), thread_id, db_path.clone()).await;
let found = find_thread_path_by_id_str(home.path(), &id.to_string())
.await
.unwrap();
let found =
find_thread_path_by_id_str(home.path(), &id.to_string(), /*state_db_ctx*/ None)
.await
.unwrap();
assert_eq!(found, Some(db_path));
}
@@ -133,9 +136,10 @@ async fn find_falls_back_to_filesystem_when_sqlite_has_no_match() {
.join("sessions/2030/12/30/rollout-2030-12-30T00-00-00-unrelated.jsonl");
upsert_thread_metadata(home.path(), unrelated_thread_id, unrelated_path).await;
let found = find_thread_path_by_id_str(home.path(), &id.to_string())
.await
.unwrap();
let found =
find_thread_path_by_id_str(home.path(), &id.to_string(), /*state_db_ctx*/ None)
.await
.unwrap();
assert_eq!(found, Some(expected));
}
@@ -147,9 +151,10 @@ async fn find_ignores_granular_gitignore_rules() {
let expected = write_minimal_rollout_with_id(home.path(), id);
std::fs::write(home.path().join("sessions/.gitignore"), "*.jsonl\n").unwrap();
let found = find_thread_path_by_id_str(home.path(), &id.to_string())
.await
.unwrap();
let found =
find_thread_path_by_id_str(home.path(), &id.to_string(), /*state_db_ctx*/ None)
.await
.unwrap();
assert_eq!(found, Some(expected));
}
@@ -210,9 +215,13 @@ async fn find_archived_locates_rollout_file_by_id() {
let id = Uuid::new_v4();
let expected = write_minimal_rollout_with_id_in_subdir(home.path(), "archived_sessions", id);
let found = find_archived_thread_path_by_id_str(home.path(), &id.to_string())
.await
.unwrap();
let found = find_archived_thread_path_by_id_str(
home.path(),
&id.to_string(),
/*state_db_ctx*/ None,
)
.await
.unwrap();
assert_eq!(found, Some(expected));
}

View File

@@ -1173,6 +1173,7 @@ async fn find_thread_path_by_id_str_in_subdir(
codex_home: &Path,
subdir: &str,
id_str: &str,
state_db_ctx: Option<crate::state_db::StateDbHandle>,
) -> io::Result<Option<PathBuf>> {
// Validate UUID format early.
if Uuid::parse_str(id_str).is_err() {
@@ -1187,7 +1188,10 @@ async fn find_thread_path_by_id_str_in_subdir(
_ => None,
};
let thread_id = ThreadId::from_string(id_str).ok();
let state_db_ctx = state_db::open_if_present(codex_home, "").await;
let state_db_ctx = match state_db_ctx {
Some(state_db_ctx) => Some(state_db_ctx),
None => state_db::open_if_present(codex_home, "").await,
};
if let Some(state_db_ctx) = state_db_ctx.as_deref()
&& let Some(thread_id) = thread_id
&& let Some(db_path) = state_db::find_rollout_path_by_id(
@@ -1247,21 +1251,25 @@ async fn find_thread_path_by_id_str_in_subdir(
}
/// Locate a recorded thread rollout file by its UUID string using the existing
/// paginated listing implementation. Returns `Ok(Some(path))` if found, `Ok(None)` if not present
/// or the id is invalid.
/// paginated listing implementation. Uses `state_db_ctx` when available to avoid reopening
/// SQLite. Returns `Ok(Some(path))` if found, `Ok(None)` if not present or the id is invalid.
pub async fn find_thread_path_by_id_str(
codex_home: &Path,
id_str: &str,
state_db_ctx: Option<crate::state_db::StateDbHandle>,
) -> io::Result<Option<PathBuf>> {
find_thread_path_by_id_str_in_subdir(codex_home, SESSIONS_SUBDIR, id_str).await
find_thread_path_by_id_str_in_subdir(codex_home, SESSIONS_SUBDIR, id_str, state_db_ctx).await
}
/// Locate an archived thread rollout file by its UUID string.
/// Locate an archived thread rollout file by its UUID string. Uses `state_db_ctx` when available
/// to avoid reopening SQLite.
pub async fn find_archived_thread_path_by_id_str(
codex_home: &Path,
id_str: &str,
state_db_ctx: Option<crate::state_db::StateDbHandle>,
) -> io::Result<Option<PathBuf>> {
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str).await
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str, state_db_ctx)
.await
}
/// Extract the `YYYY/MM/DD` directory components from a rollout filename.

View File

@@ -139,7 +139,12 @@ pub async fn find_thread_path_by_name_str(
let Some(thread_id) = find_thread_id_by_name(codex_home, name).await? else {
return Ok(None);
};
super::list::find_thread_path_by_id_str(codex_home, &thread_id.to_string()).await
super::list::find_thread_path_by_id_str(
codex_home,
&thread_id.to_string(),
/*state_db_ctx*/ None,
)
.await
}
fn session_index_path(codex_home: &Path) -> PathBuf {

View File

@@ -25,6 +25,9 @@ use uuid::Uuid;
pub type StateDbHandle = Arc<codex_state::StateRuntime>;
/// Initialize the state runtime for thread state persistence and backfill checks.
///
/// Callers that keep a shared runtime alive for repeated state reads should use
/// this helper instead of reopening SQLite on each lookup.
pub async fn init(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
let config = RolloutConfig::from_view(config);
let runtime = match codex_state::StateRuntime::init(

View File

@@ -231,7 +231,7 @@ async fn find_thread_path_falls_back_when_db_path_is_stale() {
));
insert_state_db_thread(home, thread_id, stale_db_path.as_path(), false).await;
let found = find_thread_path_by_id_str(home, &uuid.to_string())
let found = find_thread_path_by_id_str(home, &uuid.to_string(), /*state_db_ctx*/ None)
.await
.expect("lookup should succeed");
assert_eq!(found, Some(fs_rollout_path.clone()));
@@ -257,7 +257,7 @@ async fn find_thread_path_repairs_missing_db_row_after_filesystem_fallback() {
.await
.expect("backfill should be complete");
let found = find_thread_path_by_id_str(home, &uuid.to_string())
let found = find_thread_path_by_id_str(home, &uuid.to_string(), /*state_db_ctx*/ None)
.await
.expect("lookup should succeed");
assert_eq!(found, Some(fs_rollout_path.clone()));

View File

@@ -721,7 +721,8 @@ async fn run_ratatui_app(
if let Some(id_str) = cli.fork_session_id.as_deref() {
let is_uuid = Uuid::parse_str(id_str).is_ok();
let path = if is_uuid {
find_thread_path_by_id_str(&config.codex_home, id_str).await?
find_thread_path_by_id_str(&config.codex_home, id_str, /*state_db_ctx*/ None)
.await?
} else {
find_thread_path_by_name_str(&config.codex_home, id_str).await?
};
@@ -814,7 +815,7 @@ async fn run_ratatui_app(
} else if let Some(id_str) = cli.resume_session_id.as_deref() {
let is_uuid = Uuid::parse_str(id_str).is_ok();
let path = if is_uuid {
find_thread_path_by_id_str(&config.codex_home, id_str).await?
find_thread_path_by_id_str(&config.codex_home, id_str, /*state_db_ctx*/ None).await?
} else {
find_thread_path_by_name_str(&config.codex_home, id_str).await?
};