mirror of
https://github.com/openai/codex.git
synced 2026-04-05 06:51:44 +03:00
Compare commits
15 Commits
pr16496
...
codex/cach
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4eb0d3272d | ||
|
|
12839cef8a | ||
|
|
1c890935ee | ||
|
|
b7c41e9c44 | ||
|
|
19f07d04e7 | ||
|
|
5b56e14ec2 | ||
|
|
8dd09ce08d | ||
|
|
930dd42d9a | ||
|
|
25067165d5 | ||
|
|
1a879698d6 | ||
|
|
9773cca5a2 | ||
|
|
41a82c54db | ||
|
|
504afb1dcd | ||
|
|
74ec7c1be5 | ||
|
|
081b1394b4 |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1429,6 +1429,7 @@ version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"app_test_support",
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
|
||||
@@ -21,6 +21,7 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
arc-swap = "1.8.2"
|
||||
async-trait = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
axum = { workspace = true, default-features = false, features = [
|
||||
|
||||
@@ -233,8 +233,8 @@ use codex_core::read_head_for_summary;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::rollout_date_parts;
|
||||
use codex_core::sandboxing::SandboxPermissions;
|
||||
use codex_core::state_db;
|
||||
use codex_core::state_db::StateDbHandle;
|
||||
use codex_core::state_db::get_state_db;
|
||||
use codex_core::state_db::reconcile_rollout;
|
||||
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
|
||||
use codex_core::windows_sandbox::WindowsSandboxSetupMode as CoreWindowsSandboxSetupMode;
|
||||
@@ -324,6 +324,7 @@ use crate::filters::source_kind_matches;
|
||||
use crate::thread_state::ThreadListenerCommand;
|
||||
use crate::thread_state::ThreadState;
|
||||
use crate::thread_state::ThreadStateManager;
|
||||
use arc_swap::ArcSwapOption;
|
||||
|
||||
const THREAD_LIST_DEFAULT_LIMIT: usize = 25;
|
||||
const THREAD_LIST_MAX_LIMIT: usize = 100;
|
||||
@@ -384,6 +385,7 @@ pub(crate) struct CodexMessageProcessor {
|
||||
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
|
||||
fuzzy_search_sessions: Arc<Mutex<HashMap<String, FuzzyFileSearchSession>>>,
|
||||
background_tasks: TaskTracker,
|
||||
state_db: ArcSwapOption<StateRuntime>,
|
||||
feedback: CodexFeedback,
|
||||
log_db: Option<LogDbLayer>,
|
||||
}
|
||||
@@ -430,9 +432,23 @@ pub(crate) struct CodexMessageProcessorArgs {
|
||||
pub(crate) cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
pub(crate) state_db: Option<StateDbHandle>,
|
||||
}
|
||||
|
||||
impl CodexMessageProcessor {
|
||||
async fn shared_state_db(&self) -> Option<StateDbHandle> {
|
||||
if let Some(state_db) = self.state_db.load_full() {
|
||||
return Some(state_db);
|
||||
}
|
||||
|
||||
let recovered = state_db::init(self.config.as_ref()).await;
|
||||
if let Some(recovered) = recovered.as_ref() {
|
||||
self.state_db.store(Some(recovered.clone()));
|
||||
}
|
||||
|
||||
recovered
|
||||
}
|
||||
|
||||
pub(crate) fn clear_plugin_related_caches(&self) {
|
||||
self.thread_manager.plugins_manager().clear_cache();
|
||||
self.thread_manager.skills_manager().clear_cache();
|
||||
@@ -482,6 +498,7 @@ impl CodexMessageProcessor {
|
||||
|
||||
Ok((thread_id, thread))
|
||||
}
|
||||
|
||||
pub fn new(args: CodexMessageProcessorArgs) -> Self {
|
||||
let CodexMessageProcessorArgs {
|
||||
auth_manager,
|
||||
@@ -494,6 +511,7 @@ impl CodexMessageProcessor {
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
log_db,
|
||||
state_db,
|
||||
} = args;
|
||||
Self {
|
||||
auth_manager,
|
||||
@@ -512,6 +530,7 @@ impl CodexMessageProcessor {
|
||||
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
|
||||
fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())),
|
||||
background_tasks: TaskTracker::new(),
|
||||
state_db: ArcSwapOption::new(state_db),
|
||||
feedback,
|
||||
log_db,
|
||||
}
|
||||
@@ -2236,29 +2255,34 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let rollout_path =
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
Ok(None) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("no rollout found for thread id {thread_id}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("failed to locate thread id {thread_id}: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let state_db = self.shared_state_db().await;
|
||||
let rollout_path = match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_id.to_string(),
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
Ok(None) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("no rollout found for thread id {thread_id}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("failed to locate thread id {thread_id}: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let thread_id_str = thread_id.to_string();
|
||||
match self.archive_thread_common(thread_id, &rollout_path).await {
|
||||
@@ -2386,20 +2410,25 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
}
|
||||
|
||||
let thread_exists =
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await
|
||||
{
|
||||
Ok(Some(_)) => true,
|
||||
Ok(None) => false,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let state_db = self.shared_state_db().await;
|
||||
let thread_exists = match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_id.to_string(),
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(_)) => true,
|
||||
Ok(None) => false,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if !thread_exists {
|
||||
self.send_invalid_request_error(request_id, format!("thread not found: {thread_id}"))
|
||||
@@ -2472,7 +2501,7 @@ impl CodexMessageProcessor {
|
||||
let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
|
||||
let mut state_db_ctx = loaded_thread.as_ref().and_then(|thread| thread.state_db());
|
||||
if state_db_ctx.is_none() {
|
||||
state_db_ctx = get_state_db(&self.config).await;
|
||||
state_db_ctx = self.shared_state_db().await;
|
||||
}
|
||||
let Some(state_db_ctx) = state_db_ctx else {
|
||||
self.send_internal_error(
|
||||
@@ -2675,33 +2704,37 @@ impl CodexMessageProcessor {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let rollout_path =
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
|
||||
.await
|
||||
let rollout_path = match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_uuid.to_string(),
|
||||
Some(state_db_ctx.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => path,
|
||||
Ok(None) => match find_archived_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_uuid.to_string(),
|
||||
Some(state_db_ctx.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => path,
|
||||
Ok(None) => match find_archived_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_uuid.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => path,
|
||||
Ok(None) => {
|
||||
return Err(invalid_request(format!("thread not found: {thread_uuid}")));
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(internal_error(format!(
|
||||
"failed to locate archived thread id {thread_uuid}: {err}"
|
||||
)));
|
||||
}
|
||||
},
|
||||
Ok(None) => {
|
||||
return Err(invalid_request(format!("thread not found: {thread_uuid}")));
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(internal_error(format!(
|
||||
"failed to locate thread id {thread_uuid}: {err}"
|
||||
"failed to locate archived thread id {thread_uuid}: {err}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
},
|
||||
Err(err) => {
|
||||
return Err(internal_error(format!(
|
||||
"failed to locate thread id {thread_uuid}: {err}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
reconcile_rollout(
|
||||
Some(state_db_ctx),
|
||||
@@ -2744,9 +2777,11 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let state_db = self.shared_state_db().await;
|
||||
let archived_path = match find_archived_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_id.to_string(),
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -2773,7 +2808,7 @@ impl CodexMessageProcessor {
|
||||
|
||||
let rollout_path_display = archived_path.display().to_string();
|
||||
let fallback_provider = self.config.model_provider_id.clone();
|
||||
let state_db_ctx = get_state_db(&self.config).await;
|
||||
let state_db_ctx = self.shared_state_db().await;
|
||||
let archived_folder = self
|
||||
.config
|
||||
.codex_home
|
||||
@@ -3253,34 +3288,43 @@ impl CodexMessageProcessor {
|
||||
|
||||
let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
|
||||
let loaded_thread_state_db = loaded_thread.as_ref().and_then(|thread| thread.state_db());
|
||||
let state_db = if loaded_thread_state_db.is_some() {
|
||||
None
|
||||
} else {
|
||||
self.shared_state_db().await
|
||||
};
|
||||
let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() {
|
||||
read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_uuid).await
|
||||
} else {
|
||||
read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await
|
||||
read_summary_from_state_db_by_thread_id(state_db.as_ref(), thread_uuid).await
|
||||
};
|
||||
let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone());
|
||||
if rollout_path.is_none() || include_turns {
|
||||
rollout_path =
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => Some(path),
|
||||
Ok(None) => {
|
||||
if include_turns {
|
||||
None
|
||||
} else {
|
||||
rollout_path
|
||||
}
|
||||
let state_db_ctx = loaded_thread_state_db.clone().or_else(|| state_db.clone());
|
||||
rollout_path = match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_uuid.to_string(),
|
||||
state_db_ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => Some(path),
|
||||
Ok(None) => {
|
||||
if include_turns {
|
||||
None
|
||||
} else {
|
||||
rollout_path
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {thread_uuid}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {thread_uuid}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if include_turns && rollout_path.is_none() && db_summary.is_some() {
|
||||
@@ -3651,7 +3695,7 @@ impl CodexMessageProcessor {
|
||||
let InitialHistory::Resumed(resumed_history) = thread_history else {
|
||||
return None;
|
||||
};
|
||||
let state_db_ctx = get_state_db(&self.config).await?;
|
||||
let state_db_ctx = self.shared_state_db().await?;
|
||||
let persisted_metadata = state_db_ctx
|
||||
.get_thread(resumed_history.conversation_id)
|
||||
.await
|
||||
@@ -3680,6 +3724,7 @@ impl CodexMessageProcessor {
|
||||
return true;
|
||||
}
|
||||
|
||||
let shared_state_db = self.shared_state_db().await;
|
||||
let rollout_path = if let Some(path) = existing_thread.rollout_path() {
|
||||
if path.exists() {
|
||||
path
|
||||
@@ -3687,6 +3732,9 @@ impl CodexMessageProcessor {
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
existing_thread
|
||||
.state_db()
|
||||
.or_else(|| shared_state_db.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -3713,6 +3761,9 @@ impl CodexMessageProcessor {
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
existing_thread
|
||||
.state_db()
|
||||
.or_else(|| shared_state_db.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -3772,12 +3823,13 @@ impl CodexMessageProcessor {
|
||||
mismatch_details.join("; ")
|
||||
);
|
||||
}
|
||||
let state_db_ctx = existing_thread.state_db();
|
||||
let thread_summary = match load_thread_summary_for_rollout(
|
||||
&self.config,
|
||||
existing_thread_id,
|
||||
rollout_path.as_path(),
|
||||
config_snapshot.model_provider_id.as_str(),
|
||||
/*persisted_metadata*/ None,
|
||||
state_db_ctx.as_ref(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -3868,9 +3920,11 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let state_db = self.shared_state_db().await;
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -3918,12 +3972,13 @@ impl CodexMessageProcessor {
|
||||
) -> std::result::Result<Thread, String> {
|
||||
let thread = match thread_history {
|
||||
InitialHistory::Resumed(resumed) => {
|
||||
let state_db_ctx = thread.state_db();
|
||||
load_thread_summary_for_rollout(
|
||||
&self.config,
|
||||
resumed.conversation_id,
|
||||
resumed.rollout_path.as_path(),
|
||||
fallback_provider,
|
||||
persisted_resume_metadata,
|
||||
state_db_ctx.as_ref(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -3999,9 +4054,11 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let state_db = self.shared_state_db().await;
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -4025,9 +4082,13 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let history_cwd =
|
||||
read_history_cwd_from_state_db(&self.config, source_thread_id, rollout_path.as_path())
|
||||
.await;
|
||||
let state_db = self.shared_state_db().await;
|
||||
let history_cwd = read_history_cwd_from_state_db(
|
||||
state_db.as_ref(),
|
||||
source_thread_id,
|
||||
rollout_path.as_path(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Persist Windows sandbox mode.
|
||||
let mut cli_overrides = cli_overrides.unwrap_or_default();
|
||||
@@ -4249,9 +4310,10 @@ impl CodexMessageProcessor {
|
||||
request_id: ConnectionRequestId,
|
||||
params: GetConversationSummaryParams,
|
||||
) {
|
||||
let state_db = self.shared_state_db().await;
|
||||
if let GetConversationSummaryParams::ThreadId { conversation_id } = ¶ms
|
||||
&& let Some(summary) =
|
||||
read_summary_from_state_db_by_thread_id(&self.config, *conversation_id).await
|
||||
read_summary_from_state_db_by_thread_id(state_db.as_ref(), *conversation_id).await
|
||||
{
|
||||
let response = GetConversationSummaryResponse { summary };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
@@ -4267,9 +4329,11 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
GetConversationSummaryParams::ThreadId { conversation_id } => {
|
||||
match codex_core::find_thread_path_by_id_str(
|
||||
let state_db = self.shared_state_db().await;
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&conversation_id.to_string(),
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -4352,7 +4416,7 @@ impl CodexMessageProcessor {
|
||||
let fallback_provider = self.config.model_provider_id.clone();
|
||||
let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds);
|
||||
let allowed_sources = allowed_sources_vec.as_slice();
|
||||
let state_db_ctx = get_state_db(&self.config).await;
|
||||
let state_db_ctx = self.shared_state_db().await;
|
||||
|
||||
while remaining > 0 {
|
||||
let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
|
||||
@@ -5205,7 +5269,7 @@ impl CodexMessageProcessor {
|
||||
self.finalize_thread_teardown(thread_id).await;
|
||||
|
||||
if state_db_ctx.is_none() {
|
||||
state_db_ctx = get_state_db(&self.config).await;
|
||||
state_db_ctx = self.shared_state_db().await;
|
||||
}
|
||||
|
||||
// Move the rollout file to archived.
|
||||
@@ -6556,18 +6620,27 @@ impl CodexMessageProcessor {
|
||||
let rollout_path = if let Some(path) = parent_thread.rollout_path() {
|
||||
path
|
||||
} else {
|
||||
find_thread_path_by_id_str(&self.config.codex_home, &parent_thread_id.to_string())
|
||||
.await
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to locate thread id {parent_thread_id}: {err}"),
|
||||
data: None,
|
||||
})?
|
||||
.ok_or_else(|| JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("no rollout found for thread id {parent_thread_id}"),
|
||||
data: None,
|
||||
})?
|
||||
let state_db = if let Some(state_db) = parent_thread.state_db() {
|
||||
Some(state_db)
|
||||
} else {
|
||||
self.shared_state_db().await
|
||||
};
|
||||
find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&parent_thread_id.to_string(),
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to locate thread id {parent_thread_id}: {err}"),
|
||||
data: None,
|
||||
})?
|
||||
.ok_or_else(|| JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("no rollout found for thread id {parent_thread_id}"),
|
||||
data: None,
|
||||
})?
|
||||
};
|
||||
|
||||
let mut config = self.config.as_ref().clone();
|
||||
@@ -7160,11 +7233,10 @@ impl CodexMessageProcessor {
|
||||
if let Some(log_db) = self.log_db.as_ref() {
|
||||
log_db.flush().await;
|
||||
}
|
||||
let state_db_ctx = get_state_db(&self.config).await;
|
||||
match (state_db_ctx.as_ref(), conversation_id) {
|
||||
(Some(state_db_ctx), Some(conversation_id)) => {
|
||||
match (self.shared_state_db().await, conversation_id) {
|
||||
(Some(state_db), Some(conversation_id)) => {
|
||||
let thread_id_text = conversation_id.to_string();
|
||||
match state_db_ctx.query_feedback_logs(&thread_id_text).await {
|
||||
match state_db.query_feedback_logs(&thread_id_text).await {
|
||||
Ok(logs) if logs.is_empty() => None,
|
||||
Ok(logs) => Some(logs),
|
||||
Err(err) => {
|
||||
@@ -7313,7 +7385,17 @@ impl CodexMessageProcessor {
|
||||
async fn resolve_rollout_path(&self, conversation_id: ThreadId) -> Option<PathBuf> {
|
||||
match self.thread_manager.get_thread(conversation_id).await {
|
||||
Ok(conv) => conv.rollout_path(),
|
||||
Err(_) => None,
|
||||
Err(_) => {
|
||||
let state_db = self.shared_state_db().await;
|
||||
find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&conversation_id.to_string(),
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7962,12 +8044,12 @@ async fn derive_config_for_cwd(
|
||||
}
|
||||
|
||||
async fn read_history_cwd_from_state_db(
|
||||
config: &Config,
|
||||
state_db_ctx: Option<&StateDbHandle>,
|
||||
thread_id: Option<ThreadId>,
|
||||
rollout_path: &Path,
|
||||
) -> Option<PathBuf> {
|
||||
if let Some(state_db_ctx) = get_state_db(config).await
|
||||
&& let Some(thread_id) = thread_id
|
||||
if let Some(thread_id) = thread_id
|
||||
&& let Some(state_db_ctx) = state_db_ctx
|
||||
&& let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await
|
||||
{
|
||||
return Some(metadata.cwd);
|
||||
@@ -7984,11 +8066,10 @@ async fn read_history_cwd_from_state_db(
|
||||
}
|
||||
|
||||
async fn read_summary_from_state_db_by_thread_id(
|
||||
config: &Config,
|
||||
state_db_ctx: Option<&StateDbHandle>,
|
||||
thread_id: ThreadId,
|
||||
) -> Option<ConversationSummary> {
|
||||
let state_db_ctx = get_state_db(config).await;
|
||||
read_summary_from_state_db_context_by_thread_id(state_db_ctx.as_ref(), thread_id).await
|
||||
read_summary_from_state_db_context_by_thread_id(state_db_ctx, thread_id).await
|
||||
}
|
||||
|
||||
async fn read_summary_from_state_db_context_by_thread_id(
|
||||
@@ -8278,11 +8359,11 @@ fn map_git_info(git_info: &CoreGitInfo) -> ConversationGitInfo {
|
||||
}
|
||||
|
||||
async fn load_thread_summary_for_rollout(
|
||||
config: &Config,
|
||||
thread_id: ThreadId,
|
||||
rollout_path: &Path,
|
||||
fallback_provider: &str,
|
||||
persisted_metadata: Option<&ThreadMetadata>,
|
||||
state_db_ctx: Option<&StateDbHandle>,
|
||||
) -> std::result::Result<Thread, String> {
|
||||
let mut thread = read_summary_from_rollout(rollout_path, fallback_provider)
|
||||
.await
|
||||
@@ -8298,7 +8379,9 @@ async fn load_thread_summary_for_rollout(
|
||||
&mut thread,
|
||||
summary_to_thread(summary_from_thread_metadata(persisted_metadata)),
|
||||
);
|
||||
} else if let Some(summary) = read_summary_from_state_db_by_thread_id(config, thread_id).await {
|
||||
} else if let Some(summary) =
|
||||
read_summary_from_state_db_by_thread_id(state_db_ctx, thread_id).await
|
||||
{
|
||||
merge_mutable_thread_metadata(&mut thread, summary_to_thread(summary));
|
||||
}
|
||||
Ok(thread)
|
||||
@@ -8452,12 +8535,21 @@ mod tests {
|
||||
use anyhow::Result;
|
||||
use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_app_server_protocol::ToolRequestUserInputParams;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_login::AuthManager;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
@@ -8981,6 +9073,62 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shared_state_db_caches_successful_retry() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let config = Arc::new(
|
||||
ConfigBuilder::default()
|
||||
.codex_home(codex_home.path().to_path_buf())
|
||||
.build()
|
||||
.await?,
|
||||
);
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
SessionSource::Cli,
|
||||
CollaborationModesConfig {
|
||||
default_mode_request_user_input: config
|
||||
.features
|
||||
.enabled(Feature::DefaultModeRequestUserInput),
|
||||
},
|
||||
));
|
||||
let (outgoing_tx, _outgoing_rx) = tokio::sync::mpsc::channel::<OutgoingEnvelope>(1);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
|
||||
let processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
outgoing,
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config,
|
||||
cli_overrides: Arc::new(RwLock::new(Vec::new())),
|
||||
runtime_feature_enablement: Arc::new(RwLock::new(BTreeMap::new())),
|
||||
cloud_requirements: Arc::new(RwLock::new(CloudRequirementsLoader::default())),
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
state_db: None,
|
||||
});
|
||||
|
||||
let first = processor
|
||||
.shared_state_db()
|
||||
.await
|
||||
.expect("state db should initialize on retry");
|
||||
let second = processor
|
||||
.shared_state_db()
|
||||
.await
|
||||
.expect("cached state db should be reused");
|
||||
|
||||
assert!(Arc::ptr_eq(&first, &second));
|
||||
assert!(processor.state_db.load_full().is_some());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn summary_from_state_db_metadata_preserves_agent_nickname() -> Result<()> {
|
||||
let conversation_id = ThreadId::from_string("bfd12a78-5900-467b-9bc5-d3d35df08191")?;
|
||||
|
||||
@@ -77,6 +77,8 @@ use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::state_db;
|
||||
use codex_core::state_db::StateDbHandle;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -325,7 +327,14 @@ impl InProcessClientHandle {
|
||||
/// the runtime is shut down and an `InvalidData` error is returned.
|
||||
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
|
||||
let initialize = args.initialize.clone();
|
||||
let client = start_uninitialized(args);
|
||||
let state_db = state_db::init(args.config.as_ref()).await;
|
||||
if state_db.is_none() {
|
||||
warn!(
|
||||
"sqlite state db unavailable at startup for {}; continuing without sqlite-backed app-server state",
|
||||
args.config.sqlite_home.display()
|
||||
);
|
||||
}
|
||||
let client = start_uninitialized(args, state_db);
|
||||
|
||||
let initialize_response = client
|
||||
.request(ClientRequest::Initialize {
|
||||
@@ -345,7 +354,10 @@ pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle>
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
fn start_uninitialized(
|
||||
args: InProcessStartArgs,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> InProcessClientHandle {
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
|
||||
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
||||
@@ -388,6 +400,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
cloud_requirements: args.cloud_requirements,
|
||||
feedback: args.feedback,
|
||||
log_db: None,
|
||||
state_db,
|
||||
config_warnings: args.config_warnings,
|
||||
session_source: args.session_source,
|
||||
enable_codex_api_key_env: args.enable_codex_api_key_env,
|
||||
@@ -693,13 +706,17 @@ mod tests {
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
async fn build_test_config() -> Config {
|
||||
match ConfigBuilder::default().build().await {
|
||||
Ok(config) => config,
|
||||
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
|
||||
.expect("default config should load"),
|
||||
}
|
||||
let codex_home = TempDir::new().expect("create temp dir");
|
||||
let config = ConfigBuilder::default()
|
||||
.codex_home(codex_home.path().to_path_buf())
|
||||
.build()
|
||||
.await
|
||||
.expect("test config should build");
|
||||
std::mem::forget(codex_home);
|
||||
config
|
||||
}
|
||||
|
||||
async fn start_test_client_with_capacity(
|
||||
|
||||
@@ -8,6 +8,7 @@ use codex_core::config::ConfigBuilder;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::ConfigLayerStackOrdering;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::state_db::init;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
@@ -489,6 +490,13 @@ pub async fn run_main_with_transport(
|
||||
}
|
||||
|
||||
let feedback = CodexFeedback::new();
|
||||
let state_db = init(&config).await;
|
||||
if state_db.is_none() {
|
||||
warn!(
|
||||
"sqlite state db unavailable at startup for {}; continuing without sqlite-backed app-server state",
|
||||
config.sqlite_home.display()
|
||||
);
|
||||
}
|
||||
|
||||
let otel = codex_core::otel_init::build_provider(
|
||||
&config,
|
||||
@@ -522,13 +530,7 @@ pub async fn run_main_with_transport(
|
||||
|
||||
let feedback_layer = feedback.logger_layer();
|
||||
let feedback_metadata_layer = feedback.metadata_layer();
|
||||
let log_db = codex_state::StateRuntime::init(
|
||||
config.sqlite_home.clone(),
|
||||
config.model_provider_id.clone(),
|
||||
)
|
||||
.await
|
||||
.ok()
|
||||
.map(log_db::start);
|
||||
let log_db = state_db.clone().map(log_db::start);
|
||||
let log_db_layer = log_db
|
||||
.clone()
|
||||
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
|
||||
@@ -618,6 +620,7 @@ pub async fn run_main_with_transport(
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
feedback: feedback.clone(),
|
||||
log_db,
|
||||
state_db,
|
||||
config_warnings,
|
||||
session_source,
|
||||
enable_codex_api_key_env: false,
|
||||
|
||||
@@ -66,6 +66,7 @@ use codex_core::default_client::get_codex_user_agent;
|
||||
use codex_core::default_client::set_default_client_residency_requirement;
|
||||
use codex_core::default_client::set_default_originator;
|
||||
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use codex_core::state_db::StateDbHandle;
|
||||
use codex_features::Feature;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_login::auth::ExternalAuthRefreshContext;
|
||||
@@ -181,6 +182,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) cloud_requirements: CloudRequirementsLoader,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
pub(crate) state_db: Option<StateDbHandle>,
|
||||
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
|
||||
pub(crate) session_source: SessionSource,
|
||||
pub(crate) enable_codex_api_key_env: bool,
|
||||
@@ -199,6 +201,7 @@ impl MessageProcessor {
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
log_db,
|
||||
state_db,
|
||||
config_warnings,
|
||||
session_source,
|
||||
enable_codex_api_key_env,
|
||||
@@ -242,6 +245,7 @@ impl MessageProcessor {
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
feedback,
|
||||
log_db,
|
||||
state_db,
|
||||
});
|
||||
// Keep plugin startup warmups aligned at app-server startup.
|
||||
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
|
||||
|
||||
@@ -24,6 +24,7 @@ use codex_core::config::Config;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::state_db::init;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
@@ -117,7 +118,7 @@ impl TracingHarness {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
let config = Arc::new(build_test_config(codex_home.path(), &server.uri()).await?);
|
||||
let (processor, outgoing_rx) = build_test_processor(config);
|
||||
let (processor, outgoing_rx) = build_test_processor(config).await;
|
||||
let tracing = init_test_tracing();
|
||||
tracing.exporter.reset();
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
@@ -224,7 +225,7 @@ async fn build_test_config(codex_home: &Path, server_uri: &str) -> Result<Config
|
||||
.await?)
|
||||
}
|
||||
|
||||
fn build_test_processor(
|
||||
async fn build_test_processor(
|
||||
config: Arc<Config>,
|
||||
) -> (
|
||||
MessageProcessor,
|
||||
@@ -232,6 +233,9 @@ fn build_test_processor(
|
||||
) {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let state_db = init(config.as_ref())
|
||||
.await
|
||||
.expect("state db should initialize in tracing tests");
|
||||
let processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing,
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
@@ -241,6 +245,7 @@ fn build_test_processor(
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
state_db: Some(state_db),
|
||||
config_warnings: Vec::new(),
|
||||
session_source: SessionSource::VSCode,
|
||||
enable_codex_api_key_env: false,
|
||||
|
||||
@@ -37,6 +37,7 @@ pub use responses::create_shell_command_sse_response;
|
||||
pub use rollout::create_fake_rollout;
|
||||
pub use rollout::create_fake_rollout_with_source;
|
||||
pub use rollout::create_fake_rollout_with_text_elements;
|
||||
pub use rollout::fake_rollout_cwd;
|
||||
pub use rollout::rollout_path;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
|
||||
@@ -23,6 +23,10 @@ pub fn rollout_path(codex_home: &Path, filename_ts: &str, thread_id: &str) -> Pa
|
||||
.join(format!("rollout-{filename_ts}-{thread_id}.jsonl"))
|
||||
}
|
||||
|
||||
pub fn fake_rollout_cwd() -> PathBuf {
|
||||
std::fs::canonicalize(Path::new("/")).unwrap_or_else(|_| PathBuf::from("/"))
|
||||
}
|
||||
|
||||
/// Create a minimal rollout file under `CODEX_HOME/sessions/YYYY/MM/DD/`.
|
||||
///
|
||||
/// - `filename_ts` is the filename timestamp component in `YYYY-MM-DDThh-mm-ss` format.
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::fake_rollout_cwd;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::ConversationSummary;
|
||||
@@ -29,7 +30,7 @@ fn expected_summary(conversation_id: ThreadId, path: PathBuf) -> ConversationSum
|
||||
timestamp: Some(META_RFC3339.to_string()),
|
||||
updated_at: Some(META_RFC3339.to_string()),
|
||||
model_provider: MODEL_PROVIDER.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
cwd: fake_rollout_cwd(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: SessionSource::Cli,
|
||||
git_info: None,
|
||||
@@ -91,7 +92,8 @@ async fn get_conversation_summary_by_relative_rollout_path_resolves_from_codex_h
|
||||
let thread_id = ThreadId::from_string(&conversation_id)?;
|
||||
let rollout_path = rollout_path(codex_home.path(), FILENAME_TS, &conversation_id);
|
||||
let relative_path = rollout_path.strip_prefix(codex_home.path())?.to_path_buf();
|
||||
let expected = expected_summary(thread_id, std::fs::canonicalize(rollout_path)?);
|
||||
let mut expected = expected_summary(thread_id, std::fs::canonicalize(rollout_path)?);
|
||||
expected.cwd = PathBuf::from("/");
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
@@ -58,7 +58,7 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
|
||||
rollout_path.display()
|
||||
);
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread.id)
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread.id, /*state_db_ctx*/ None)
|
||||
.await?
|
||||
.is_none(),
|
||||
"thread id should not be discoverable before rollout materialization"
|
||||
@@ -113,9 +113,10 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
|
||||
rollout_path.display()
|
||||
);
|
||||
|
||||
let discovered_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
|
||||
.await?
|
||||
.expect("expected rollout path for thread id to exist after materialization");
|
||||
let discovered_path =
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread.id, /*state_db_ctx*/ None)
|
||||
.await?
|
||||
.expect("expected rollout path for thread id to exist after materialization");
|
||||
assert_paths_match_on_disk(&discovered_path, &rollout_path)?;
|
||||
|
||||
let archive_id = mcp
|
||||
|
||||
@@ -4,6 +4,7 @@ use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_fake_rollout_with_source;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::fake_rollout_cwd;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
use chrono::DateTime;
|
||||
@@ -37,7 +38,6 @@ use std::fs;
|
||||
use std::fs::FileTimes;
|
||||
use std::fs::OpenOptions;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use uuid::Uuid;
|
||||
@@ -358,12 +358,13 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(data1.len(), 2);
|
||||
let expected_cwd = fake_rollout_cwd();
|
||||
for thread in &data1 {
|
||||
assert_eq!(thread.preview, "Hello");
|
||||
assert_eq!(thread.model_provider, "mock_provider");
|
||||
assert!(thread.created_at > 0);
|
||||
assert_eq!(thread.updated_at, thread.created_at);
|
||||
assert_eq!(thread.cwd, PathBuf::from("/"));
|
||||
assert_eq!(thread.cwd, expected_cwd);
|
||||
assert_eq!(thread.cli_version, "0.0.0");
|
||||
assert_eq!(thread.source, SessionSource::Cli);
|
||||
assert_eq!(thread.git_info, None);
|
||||
@@ -390,7 +391,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
|
||||
assert_eq!(thread.model_provider, "mock_provider");
|
||||
assert!(thread.created_at > 0);
|
||||
assert_eq!(thread.updated_at, thread.created_at);
|
||||
assert_eq!(thread.cwd, PathBuf::from("/"));
|
||||
assert_eq!(thread.cwd, expected_cwd);
|
||||
assert_eq!(thread.cli_version, "0.0.0");
|
||||
assert_eq!(thread.source, SessionSource::Cli);
|
||||
assert_eq!(thread.git_info, None);
|
||||
@@ -446,7 +447,7 @@ async fn thread_list_respects_provider_filter() -> Result<()> {
|
||||
let expected_ts = chrono::DateTime::parse_from_rfc3339("2025-01-02T11:00:00Z")?.timestamp();
|
||||
assert_eq!(thread.created_at, expected_ts);
|
||||
assert_eq!(thread.updated_at, expected_ts);
|
||||
assert_eq!(thread.cwd, PathBuf::from("/"));
|
||||
assert_eq!(thread.cwd, fake_rollout_cwd());
|
||||
assert_eq!(thread.cli_version, "0.0.0");
|
||||
assert_eq!(thread.source, SessionSource::Cli);
|
||||
assert_eq!(thread.git_info, None);
|
||||
@@ -484,6 +485,20 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
|
||||
)?;
|
||||
|
||||
let mut mcp = init_mcp(codex_home.path()).await?;
|
||||
let expected_cwd = list_threads(
|
||||
&mut mcp,
|
||||
None,
|
||||
Some(10),
|
||||
Some(vec!["mock_provider".to_string()]),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?
|
||||
.data
|
||||
.into_iter()
|
||||
.find(|summary| summary.id == filtered_id)
|
||||
.map(|summary| summary.cwd)
|
||||
.ok_or_else(|| anyhow::anyhow!("filtered thread missing from unfiltered list"))?;
|
||||
let request_id = mcp
|
||||
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
|
||||
cursor: None,
|
||||
@@ -492,7 +507,7 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
|
||||
model_providers: Some(vec!["mock_provider".to_string()]),
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
cwd: Some(target_cwd.to_string_lossy().into_owned()),
|
||||
cwd: Some(expected_cwd.to_string_lossy().into_owned()),
|
||||
search_term: None,
|
||||
})
|
||||
.await?;
|
||||
@@ -509,7 +524,7 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
|
||||
assert_eq!(data.len(), 1);
|
||||
assert_eq!(data[0].id, filtered_id);
|
||||
assert_ne!(data[0].id, unfiltered_id);
|
||||
assert_eq!(data[0].cwd, target_cwd);
|
||||
assert_eq!(data[0].cwd, expected_cwd);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -591,6 +606,71 @@ sqlite = true
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_list_keeps_empty_search_results_when_db_page_is_empty() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
std::fs::write(
|
||||
codex_home.path().join("config.toml"),
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
suppress_unstable_features_warning = true
|
||||
|
||||
[features]
|
||||
sqlite = true
|
||||
"#,
|
||||
)?;
|
||||
|
||||
create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-02T10-00-00",
|
||||
"2025-01-02T10:00:00Z",
|
||||
"match: needle",
|
||||
Some("mock_provider"),
|
||||
None,
|
||||
)?;
|
||||
create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-02T11-00-00",
|
||||
"2025-01-02T11:00:00Z",
|
||||
"no hit here",
|
||||
Some("mock_provider"),
|
||||
None,
|
||||
)?;
|
||||
|
||||
let state_db =
|
||||
codex_state::StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into())
|
||||
.await?;
|
||||
state_db.mark_backfill_complete(None).await?;
|
||||
|
||||
let mut mcp = init_mcp(codex_home.path()).await?;
|
||||
let request_id = mcp
|
||||
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
|
||||
cursor: None,
|
||||
limit: Some(10),
|
||||
sort_key: None,
|
||||
model_providers: Some(vec!["mock_provider".to_string()]),
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
cwd: None,
|
||||
search_term: Some("absent".to_string()),
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadListResponse {
|
||||
data, next_cursor, ..
|
||||
} = to_response::<ThreadListResponse>(resp)?;
|
||||
|
||||
assert_eq!(next_cursor, None);
|
||||
assert!(data.is_empty());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_list_empty_source_kinds_defaults_to_interactive_only() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -996,7 +1076,7 @@ async fn thread_list_includes_git_info() -> Result<()> {
|
||||
};
|
||||
assert_eq!(thread.git_info, Some(expected_git));
|
||||
assert_eq!(thread.source, SessionSource::Cli);
|
||||
assert_eq!(thread.cwd, PathBuf::from("/"));
|
||||
assert_eq!(thread.cwd, fake_rollout_cwd());
|
||||
assert_eq!(thread.cli_version, "0.0.0");
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -2,6 +2,7 @@ use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::fake_rollout_cwd;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
@@ -30,7 +31,6 @@ use core_test_support::responses;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
@@ -81,7 +81,7 @@ async fn thread_read_returns_summary_without_turns() -> Result<()> {
|
||||
assert_eq!(thread.model_provider, "mock_provider");
|
||||
assert!(!thread.ephemeral, "stored rollouts should not be ephemeral");
|
||||
assert!(thread.path.as_ref().expect("thread path").is_absolute());
|
||||
assert_eq!(thread.cwd, PathBuf::from("/"));
|
||||
assert_eq!(thread.cwd, fake_rollout_cwd());
|
||||
assert_eq!(thread.cli_version, "0.0.0");
|
||||
assert_eq!(thread.source, SessionSource::Cli);
|
||||
assert_eq!(thread.git_info, None);
|
||||
|
||||
@@ -75,9 +75,10 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
|
||||
)
|
||||
.await??;
|
||||
|
||||
let found_rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
|
||||
.await?
|
||||
.expect("expected rollout path for thread id to exist");
|
||||
let found_rollout_path =
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread.id, /*state_db_ctx*/ None)
|
||||
.await?
|
||||
.expect("expected rollout path for thread id to exist");
|
||||
assert_paths_match_on_disk(&found_rollout_path, &rollout_path)?;
|
||||
|
||||
let archive_id = mcp
|
||||
@@ -92,9 +93,13 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let archived_path = find_archived_thread_path_by_id_str(codex_home.path(), &thread.id)
|
||||
.await?
|
||||
.expect("expected archived rollout path for thread id to exist");
|
||||
let archived_path = find_archived_thread_path_by_id_str(
|
||||
codex_home.path(),
|
||||
&thread.id,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await?
|
||||
.expect("expected archived rollout path for thread id to exist");
|
||||
let archived_path_display = archived_path.display();
|
||||
assert!(
|
||||
archived_path.exists(),
|
||||
|
||||
@@ -205,6 +205,9 @@ impl AgentControl {
|
||||
.or(find_thread_path_by_id_str(
|
||||
config.codex_home.as_path(),
|
||||
&parent_thread_id.to_string(),
|
||||
parent_thread
|
||||
.as_ref()
|
||||
.and_then(|parent_thread| parent_thread.state_db()),
|
||||
)
|
||||
.await?)
|
||||
.ok_or_else(|| {
|
||||
@@ -295,10 +298,16 @@ impl AgentControl {
|
||||
config: crate::config::Config,
|
||||
thread_id: ThreadId,
|
||||
session_source: SessionSource,
|
||||
state_db_ctx: Option<state_db::StateDbHandle>,
|
||||
) -> CodexResult<ThreadId> {
|
||||
let root_depth = thread_spawn_depth(&session_source).unwrap_or(0);
|
||||
let resumed_thread_id = self
|
||||
.resume_single_agent_from_rollout(config.clone(), thread_id, session_source)
|
||||
.resume_single_agent_from_rollout(
|
||||
config.clone(),
|
||||
thread_id,
|
||||
session_source,
|
||||
state_db_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
let state = self.upgrade()?;
|
||||
let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else {
|
||||
@@ -344,6 +353,7 @@ impl AgentControl {
|
||||
config.clone(),
|
||||
child_thread_id,
|
||||
child_session_source,
|
||||
Some(state_db_ctx.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -368,6 +378,7 @@ impl AgentControl {
|
||||
mut config: crate::config::Config,
|
||||
thread_id: ThreadId,
|
||||
session_source: SessionSource,
|
||||
state_db_ctx: Option<state_db::StateDbHandle>,
|
||||
) -> CodexResult<ThreadId> {
|
||||
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = &session_source
|
||||
&& *depth >= config.agent_max_depth
|
||||
@@ -377,6 +388,10 @@ impl AgentControl {
|
||||
}
|
||||
let state = self.upgrade()?;
|
||||
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
let state_db_ctx = match state_db_ctx {
|
||||
Some(state_db_ctx) => Some(state_db_ctx),
|
||||
None => state_db::get_state_db(&config).await,
|
||||
};
|
||||
let (session_source, agent_metadata) = match session_source {
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
@@ -386,7 +401,7 @@ impl AgentControl {
|
||||
agent_nickname: _,
|
||||
}) => {
|
||||
let (resumed_agent_nickname, resumed_agent_role) =
|
||||
if let Some(state_db_ctx) = state_db::get_state_db(&config).await {
|
||||
if let Some(state_db_ctx) = state_db_ctx.as_ref() {
|
||||
match state_db_ctx.get_thread(thread_id).await {
|
||||
Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role),
|
||||
Ok(None) | Err(_) => (None, None),
|
||||
@@ -413,18 +428,22 @@ impl AgentControl {
|
||||
let inherited_exec_policy = self
|
||||
.inherited_exec_policy_for_source(&state, Some(&session_source), &config)
|
||||
.await;
|
||||
let rollout_path =
|
||||
match find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
|
||||
.await?
|
||||
{
|
||||
Some(rollout_path) => rollout_path,
|
||||
None => find_archived_thread_path_by_id_str(
|
||||
config.codex_home.as_path(),
|
||||
&thread_id.to_string(),
|
||||
)
|
||||
.await?
|
||||
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?,
|
||||
};
|
||||
let rollout_path = match find_thread_path_by_id_str(
|
||||
config.codex_home.as_path(),
|
||||
&thread_id.to_string(),
|
||||
state_db_ctx.clone(),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Some(rollout_path) => rollout_path,
|
||||
None => find_archived_thread_path_by_id_str(
|
||||
config.codex_home.as_path(),
|
||||
&thread_id.to_string(),
|
||||
state_db_ctx,
|
||||
)
|
||||
.await?
|
||||
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?,
|
||||
};
|
||||
|
||||
let resumed_thread = state
|
||||
.resume_thread_from_rollout_with_source(
|
||||
|
||||
@@ -298,7 +298,12 @@ async fn resume_agent_errors_when_manager_dropped() {
|
||||
let control = AgentControl::default();
|
||||
let (_home, config) = test_config().await;
|
||||
let err = control
|
||||
.resume_agent_from_rollout(config, ThreadId::new(), SessionSource::Exec)
|
||||
.resume_agent_from_rollout(
|
||||
config,
|
||||
ThreadId::new(),
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("resume_agent should fail without a manager");
|
||||
assert_eq!(
|
||||
@@ -942,7 +947,12 @@ async fn resume_agent_respects_max_threads_limit() {
|
||||
.expect("spawn_agent should succeed for active slot");
|
||||
|
||||
let err = control
|
||||
.resume_agent_from_rollout(config, resumable_id, SessionSource::Exec)
|
||||
.resume_agent_from_rollout(
|
||||
config,
|
||||
resumable_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("resume should respect max threads");
|
||||
let CodexErr::AgentLimitReached {
|
||||
@@ -975,7 +985,12 @@ async fn resume_agent_releases_slot_after_resume_failure() {
|
||||
let control = manager.agent_control();
|
||||
|
||||
let _ = control
|
||||
.resume_agent_from_rollout(config.clone(), ThreadId::new(), SessionSource::Exec)
|
||||
.resume_agent_from_rollout(
|
||||
config.clone(),
|
||||
ThreadId::new(),
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("resume should fail for missing rollout path");
|
||||
|
||||
@@ -1454,6 +1469,7 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("resume should succeed");
|
||||
@@ -1540,7 +1556,12 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
|
||||
|
||||
let resumed_thread_id = harness
|
||||
.control
|
||||
.resume_agent_from_rollout(harness.config.clone(), child_thread_id, SessionSource::Exec)
|
||||
.resume_agent_from_rollout(
|
||||
harness.config.clone(),
|
||||
child_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("resume should find archived rollout");
|
||||
assert_eq!(resumed_thread_id, child_thread_id);
|
||||
@@ -1799,6 +1820,7 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() {
|
||||
harness.config.clone(),
|
||||
parent_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("single-thread resume should succeed");
|
||||
@@ -1895,6 +1917,7 @@ async fn resume_closed_child_reopens_open_descendants() {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("child resume should succeed");
|
||||
@@ -1987,6 +2010,7 @@ async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdo
|
||||
harness.config.clone(),
|
||||
parent_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("tree resume should succeed");
|
||||
@@ -2100,6 +2124,7 @@ async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_sourc
|
||||
harness.config.clone(),
|
||||
parent_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("tree resume should succeed");
|
||||
@@ -2215,6 +2240,7 @@ async fn resume_agent_from_rollout_skips_descendants_when_parent_resume_fails()
|
||||
harness.config.clone(),
|
||||
parent_thread_id,
|
||||
SessionSource::Exec,
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("root resume should succeed");
|
||||
|
||||
@@ -518,7 +518,8 @@ pub async fn cleanup_stale_snapshots(codex_home: &Path, active_session_id: Threa
|
||||
continue;
|
||||
}
|
||||
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home, session_id).await?;
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(codex_home, session_id, /*state_db_ctx*/ None).await?;
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
|
||||
@@ -163,6 +163,7 @@ async fn try_resume_closed_agent(
|
||||
/*agent_role*/ None,
|
||||
/*task_name*/ None,
|
||||
)?,
|
||||
session.state_db(),
|
||||
)
|
||||
.await
|
||||
.map(|_| ())
|
||||
|
||||
@@ -78,9 +78,10 @@ async fn find_locates_rollout_file_by_id() {
|
||||
let id = Uuid::new_v4();
|
||||
let expected = write_minimal_rollout_with_id(home.path(), id);
|
||||
|
||||
let found = find_thread_path_by_id_str(home.path(), &id.to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
let found =
|
||||
find_thread_path_by_id_str(home.path(), &id.to_string(), /*state_db_ctx*/ None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(found.unwrap(), expected);
|
||||
}
|
||||
@@ -94,9 +95,10 @@ async fn find_handles_gitignore_covering_codex_home_directory() {
|
||||
let id = Uuid::new_v4();
|
||||
let expected = write_minimal_rollout_with_id(&codex_home, id);
|
||||
|
||||
let found = find_thread_path_by_id_str(&codex_home, &id.to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
let found =
|
||||
find_thread_path_by_id_str(&codex_home, &id.to_string(), /*state_db_ctx*/ None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(found, Some(expected));
|
||||
}
|
||||
@@ -114,9 +116,10 @@ async fn find_prefers_sqlite_path_by_id() {
|
||||
write_minimal_rollout_with_id(home.path(), id);
|
||||
upsert_thread_metadata(home.path(), thread_id, db_path.clone()).await;
|
||||
|
||||
let found = find_thread_path_by_id_str(home.path(), &id.to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
let found =
|
||||
find_thread_path_by_id_str(home.path(), &id.to_string(), /*state_db_ctx*/ None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(found, Some(db_path));
|
||||
}
|
||||
@@ -133,9 +136,10 @@ async fn find_falls_back_to_filesystem_when_sqlite_has_no_match() {
|
||||
.join("sessions/2030/12/30/rollout-2030-12-30T00-00-00-unrelated.jsonl");
|
||||
upsert_thread_metadata(home.path(), unrelated_thread_id, unrelated_path).await;
|
||||
|
||||
let found = find_thread_path_by_id_str(home.path(), &id.to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
let found =
|
||||
find_thread_path_by_id_str(home.path(), &id.to_string(), /*state_db_ctx*/ None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(found, Some(expected));
|
||||
}
|
||||
@@ -147,9 +151,10 @@ async fn find_ignores_granular_gitignore_rules() {
|
||||
let expected = write_minimal_rollout_with_id(home.path(), id);
|
||||
std::fs::write(home.path().join("sessions/.gitignore"), "*.jsonl\n").unwrap();
|
||||
|
||||
let found = find_thread_path_by_id_str(home.path(), &id.to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
let found =
|
||||
find_thread_path_by_id_str(home.path(), &id.to_string(), /*state_db_ctx*/ None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(found, Some(expected));
|
||||
}
|
||||
@@ -210,9 +215,13 @@ async fn find_archived_locates_rollout_file_by_id() {
|
||||
let id = Uuid::new_v4();
|
||||
let expected = write_minimal_rollout_with_id_in_subdir(home.path(), "archived_sessions", id);
|
||||
|
||||
let found = find_archived_thread_path_by_id_str(home.path(), &id.to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
let found = find_archived_thread_path_by_id_str(
|
||||
home.path(),
|
||||
&id.to_string(),
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(found, Some(expected));
|
||||
}
|
||||
|
||||
@@ -1173,6 +1173,7 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
codex_home: &Path,
|
||||
subdir: &str,
|
||||
id_str: &str,
|
||||
state_db_ctx: Option<crate::state_db::StateDbHandle>,
|
||||
) -> io::Result<Option<PathBuf>> {
|
||||
// Validate UUID format early.
|
||||
if Uuid::parse_str(id_str).is_err() {
|
||||
@@ -1187,7 +1188,10 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
_ => None,
|
||||
};
|
||||
let thread_id = ThreadId::from_string(id_str).ok();
|
||||
let state_db_ctx = state_db::open_if_present(codex_home, "").await;
|
||||
let state_db_ctx = match state_db_ctx {
|
||||
Some(state_db_ctx) => Some(state_db_ctx),
|
||||
None => state_db::open_if_present(codex_home, "").await,
|
||||
};
|
||||
if let Some(state_db_ctx) = state_db_ctx.as_deref()
|
||||
&& let Some(thread_id) = thread_id
|
||||
&& let Some(db_path) = state_db::find_rollout_path_by_id(
|
||||
@@ -1247,21 +1251,25 @@ async fn find_thread_path_by_id_str_in_subdir(
|
||||
}
|
||||
|
||||
/// Locate a recorded thread rollout file by its UUID string using the existing
|
||||
/// paginated listing implementation. Returns `Ok(Some(path))` if found, `Ok(None)` if not present
|
||||
/// or the id is invalid.
|
||||
/// paginated listing implementation. Uses `state_db_ctx` when available to avoid reopening
|
||||
/// SQLite. Returns `Ok(Some(path))` if found, `Ok(None)` if not present or the id is invalid.
|
||||
pub async fn find_thread_path_by_id_str(
|
||||
codex_home: &Path,
|
||||
id_str: &str,
|
||||
state_db_ctx: Option<crate::state_db::StateDbHandle>,
|
||||
) -> io::Result<Option<PathBuf>> {
|
||||
find_thread_path_by_id_str_in_subdir(codex_home, SESSIONS_SUBDIR, id_str).await
|
||||
find_thread_path_by_id_str_in_subdir(codex_home, SESSIONS_SUBDIR, id_str, state_db_ctx).await
|
||||
}
|
||||
|
||||
/// Locate an archived thread rollout file by its UUID string.
|
||||
/// Locate an archived thread rollout file by its UUID string. Uses `state_db_ctx` when available
|
||||
/// to avoid reopening SQLite.
|
||||
pub async fn find_archived_thread_path_by_id_str(
|
||||
codex_home: &Path,
|
||||
id_str: &str,
|
||||
state_db_ctx: Option<crate::state_db::StateDbHandle>,
|
||||
) -> io::Result<Option<PathBuf>> {
|
||||
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str).await
|
||||
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str, state_db_ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Extract the `YYYY/MM/DD` directory components from a rollout filename.
|
||||
|
||||
@@ -139,7 +139,12 @@ pub async fn find_thread_path_by_name_str(
|
||||
let Some(thread_id) = find_thread_id_by_name(codex_home, name).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
super::list::find_thread_path_by_id_str(codex_home, &thread_id.to_string()).await
|
||||
super::list::find_thread_path_by_id_str(
|
||||
codex_home,
|
||||
&thread_id.to_string(),
|
||||
/*state_db_ctx*/ None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn session_index_path(codex_home: &Path) -> PathBuf {
|
||||
|
||||
@@ -25,6 +25,9 @@ use uuid::Uuid;
|
||||
pub type StateDbHandle = Arc<codex_state::StateRuntime>;
|
||||
|
||||
/// Initialize the state runtime for thread state persistence and backfill checks.
|
||||
///
|
||||
/// Callers that keep a shared runtime alive for repeated state reads should use
|
||||
/// this helper instead of reopening SQLite on each lookup.
|
||||
pub async fn init(config: &impl RolloutConfigView) -> Option<StateDbHandle> {
|
||||
let config = RolloutConfig::from_view(config);
|
||||
let runtime = match codex_state::StateRuntime::init(
|
||||
|
||||
@@ -231,7 +231,7 @@ async fn find_thread_path_falls_back_when_db_path_is_stale() {
|
||||
));
|
||||
insert_state_db_thread(home, thread_id, stale_db_path.as_path(), false).await;
|
||||
|
||||
let found = find_thread_path_by_id_str(home, &uuid.to_string())
|
||||
let found = find_thread_path_by_id_str(home, &uuid.to_string(), /*state_db_ctx*/ None)
|
||||
.await
|
||||
.expect("lookup should succeed");
|
||||
assert_eq!(found, Some(fs_rollout_path.clone()));
|
||||
@@ -257,7 +257,7 @@ async fn find_thread_path_repairs_missing_db_row_after_filesystem_fallback() {
|
||||
.await
|
||||
.expect("backfill should be complete");
|
||||
|
||||
let found = find_thread_path_by_id_str(home, &uuid.to_string())
|
||||
let found = find_thread_path_by_id_str(home, &uuid.to_string(), /*state_db_ctx*/ None)
|
||||
.await
|
||||
.expect("lookup should succeed");
|
||||
assert_eq!(found, Some(fs_rollout_path.clone()));
|
||||
|
||||
@@ -721,7 +721,8 @@ async fn run_ratatui_app(
|
||||
if let Some(id_str) = cli.fork_session_id.as_deref() {
|
||||
let is_uuid = Uuid::parse_str(id_str).is_ok();
|
||||
let path = if is_uuid {
|
||||
find_thread_path_by_id_str(&config.codex_home, id_str).await?
|
||||
find_thread_path_by_id_str(&config.codex_home, id_str, /*state_db_ctx*/ None)
|
||||
.await?
|
||||
} else {
|
||||
find_thread_path_by_name_str(&config.codex_home, id_str).await?
|
||||
};
|
||||
@@ -814,7 +815,7 @@ async fn run_ratatui_app(
|
||||
} else if let Some(id_str) = cli.resume_session_id.as_deref() {
|
||||
let is_uuid = Uuid::parse_str(id_str).is_ok();
|
||||
let path = if is_uuid {
|
||||
find_thread_path_by_id_str(&config.codex_home, id_str).await?
|
||||
find_thread_path_by_id_str(&config.codex_home, id_str, /*state_db_ctx*/ None).await?
|
||||
} else {
|
||||
find_thread_path_by_name_str(&config.codex_home, id_str).await?
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user