Compare commits

...

7 Commits

Author SHA1 Message Date
Eric Traut
0d1e827de9 Fix goal processor after app-server split 2026-05-04 10:02:05 -07:00
Eric Traut
7074e940dc Merge remote-tracking branch 'origin/main' into etraut/goal-resume-thread-bug 2026-05-04 09:56:11 -07:00
Eric Traut
cc09aeb387 codex: preserve thread list ordering with DB previews (#20800) 2026-05-04 08:44:45 -07:00
Eric Traut
a50ee6db39 codex: address PR review feedback (#20800) 2026-05-04 08:32:47 -07:00
Eric Traut
4574639574 Adapt goal materialization to dispatch refactor 2026-05-03 19:45:45 -07:00
Eric Traut
7c9333e9db Document goal-started thread materialization 2026-05-03 19:39:30 -07:00
Eric Traut
e9f19e9196 Materialize goal-started threads for resume 2026-05-03 19:39:28 -07:00
5 changed files with 418 additions and 2 deletions

View File

@@ -370,6 +370,7 @@ impl MessageProcessor {
Arc::clone(&thread_manager),
outgoing.clone(),
Arc::clone(&config),
Arc::clone(&thread_store),
thread_state_manager.clone(),
);
let thread_processor = ThreadRequestProcessor::new(

View File

@@ -6,6 +6,7 @@ pub(crate) struct ThreadGoalRequestProcessor {
thread_manager: Arc<ThreadManager>,
outgoing: Arc<OutgoingMessageSender>,
config: Arc<Config>,
thread_store: Arc<dyn ThreadStore>,
thread_state_manager: ThreadStateManager,
}
@@ -14,12 +15,14 @@ impl ThreadGoalRequestProcessor {
thread_manager: Arc<ThreadManager>,
outgoing: Arc<OutgoingMessageSender>,
config: Arc<Config>,
thread_store: Arc<dyn ThreadStore>,
thread_state_manager: ThreadStateManager,
) -> Self {
Self {
thread_manager,
outgoing,
config,
thread_store,
thread_state_manager,
}
}
@@ -141,6 +144,60 @@ impl ThreadGoalRequestProcessor {
thread.prepare_external_goal_mutation().await;
}
let goal_preview_thread = if objective.is_some() && running_thread.is_some() {
// `/goal` can be the first interaction on a lazily-created thread. Materialize the
// rollout now so thread list/resume can discover it on disk.
self.thread_store
.persist_thread(thread_id)
.await
.map_err(|err| {
internal_error(format!(
"failed to materialize thread before setting goal: {err}"
))
})?;
self.thread_store
.flush_thread(thread_id)
.await
.map_err(|err| {
internal_error(format!(
"failed to flush materialized thread before setting goal: {err}"
))
})?;
reconcile_rollout(
Some(&state_db),
rollout_path.as_path(),
self.config.model_provider_id.as_str(),
/*builder*/ None,
&[],
/*archived_only*/ None,
/*new_thread_memory_mode*/ None,
)
.await;
let stored_thread = self
.thread_store
.read_thread(StoreReadThreadParams {
thread_id,
include_archived: true,
include_history: true,
})
.await
.map_err(|err| {
internal_error(format!(
"failed to read materialized thread before setting goal: {err}"
))
})?;
let has_user_prompt = stored_thread.history.as_ref().is_some_and(|history| {
build_turns_from_rollout_items(&history.items)
.iter()
.flat_map(|turn| turn.items.iter())
.any(|item| matches!(item, ThreadItem::UserMessage { .. }))
});
(!has_user_prompt).then_some(stored_thread)
} else {
None
};
let goal = (if let Some(objective) = objective {
let existing_goal = state_db
.get_thread_goal(thread_id)
@@ -195,6 +252,34 @@ impl ThreadGoalRequestProcessor {
})
})
.map_err(|err| invalid_request(err.to_string()))?;
if let Some(objective) = objective
&& let Some(stored_thread) = goal_preview_thread
{
let first_user_message = format!("/goal {objective}");
match state_db
.update_thread_first_user_message(thread_id, &first_user_message)
.await
{
Ok(true) => {}
Ok(false) => {
if let Err(err) = upsert_goal_preview_thread_metadata(
state_db.as_ref(),
&stored_thread,
&first_user_message,
self.config.model_provider_id.as_str(),
)
.await
{
warn!("failed to seed goal-started thread metadata: {err}");
}
}
Err(err) => {
warn!("failed to seed goal-started thread preview: {err}");
}
}
}
let goal_status = goal.status;
let goal = api_thread_goal_from_state(goal);
self.outgoing
@@ -402,6 +487,47 @@ impl ThreadGoalRequestProcessor {
}
}
async fn upsert_goal_preview_thread_metadata(
state_db: &StateRuntime,
stored_thread: &StoredThread,
first_user_message: &str,
default_provider: &str,
) -> anyhow::Result<()> {
let rollout_path = stored_thread.rollout_path.clone().ok_or_else(|| {
anyhow::anyhow!(
"cannot seed preview for thread {} without rollout path",
stored_thread.thread_id
)
})?;
let mut builder = ThreadMetadataBuilder::new(
stored_thread.thread_id,
rollout_path,
stored_thread.created_at,
stored_thread.source.clone(),
);
builder.updated_at = Some(Utc::now());
builder.agent_nickname = stored_thread.agent_nickname.clone();
builder.agent_role = stored_thread.agent_role.clone();
builder.agent_path = stored_thread.agent_path.clone();
builder.model_provider = Some(stored_thread.model_provider.clone());
builder.cwd = stored_thread.cwd.clone();
builder.cli_version = Some(stored_thread.cli_version.clone());
builder.sandbox_policy = stored_thread.sandbox_policy.clone();
builder.approval_mode = stored_thread.approval_mode;
builder.archived_at = stored_thread.archived_at;
if let Some(git_info) = stored_thread.git_info.as_ref() {
builder.git_sha = git_info.commit_hash.as_ref().map(|sha| sha.0.clone());
builder.git_branch = git_info.branch.clone();
builder.git_origin_url = git_info.repository_url.clone();
}
let mut metadata = builder.build(default_provider);
metadata.model = stored_thread.model.clone();
metadata.reasoning_effort = stored_thread.reasoning_effort;
metadata.first_user_message = Some(first_user_message.to_string());
state_db.upsert_thread(&metadata).await
}
fn validate_goal_budget(value: Option<i64>) -> Result<(), String> {
if let Some(value) = value
&& value <= 0

View File

@@ -3,6 +3,7 @@ use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_fake_rollout_with_source;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::rollout_path;
use app_test_support::test_absolute_path;
@@ -15,8 +16,12 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::SortDirection;
use codex_app_server_protocol::ThreadGoalSetResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListCwdFilter;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
use codex_app_server_protocol::ThreadStartParams;
@@ -35,6 +40,7 @@ use codex_protocol::protocol::SessionSource as CoreSessionSource;
use codex_protocol::protocol::SubAgentSource;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::cmp::Reverse;
use std::fs;
use std::fs::FileTimes;
@@ -198,6 +204,144 @@ async fn thread_list_basic_empty() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_list_includes_thread_started_with_goal() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_runtime_config(codex_home.path(), &server.uri())?;
let config_path = codex_home.path().join("config.toml");
let config = std::fs::read_to_string(&config_path)?;
std::fs::write(
&config_path,
config.replace(
"[model_providers.mock_provider]",
"[features]\ngoals = true\n\n[model_providers.mock_provider]",
),
)?;
let mut mcp = init_mcp(codex_home.path()).await?;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
assert!(
!thread.path.as_ref().expect("thread path").exists(),
"fresh thread should still be lazily materialized"
);
let objective = "improve benchmark coverage";
let goal_id = mcp
.send_raw_request(
"thread/goal/set",
Some(json!({
"threadId": thread.id.clone(),
"objective": objective,
"status": "active",
})),
)
.await?;
let goal_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(goal_id)),
)
.await??;
let _: ThreadGoalSetResponse = to_response(goal_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/goal/updated"),
)
.await??;
let ThreadListResponse { data, .. } = list_threads(
&mut mcp,
/*cursor*/ None,
Some(10),
Some(vec!["mock_provider".to_string()]),
/*source_kinds*/ None,
/*archived*/ None,
)
.await?;
let listed = data
.iter()
.find(|candidate| candidate.id == thread.id)
.expect("goal-started thread should be listed for resume");
assert_eq!(listed.preview, format!("/goal {objective}"));
assert!(
listed.path.as_ref().expect("listed thread path").exists(),
"setting the first goal should materialize the rollout"
);
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: thread.id.clone(),
include_turns: true,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread: read, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert!(
read.turns
.iter()
.flat_map(|turn| turn.items.iter())
.all(|item| !matches!(item, ThreadItem::UserMessage { .. })),
"goal-started preview should not create a synthetic user item"
);
let updated_objective = "tighten benchmark tracking";
let updated_goal_id = mcp
.send_raw_request(
"thread/goal/set",
Some(json!({
"threadId": thread.id.clone(),
"objective": updated_objective,
"status": "active",
})),
)
.await?;
let updated_goal_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(updated_goal_id)),
)
.await??;
let _: ThreadGoalSetResponse = to_response(updated_goal_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/goal/updated"),
)
.await??;
let ThreadListResponse { data, .. } = list_threads(
&mut mcp,
/*cursor*/ None,
Some(10),
Some(vec!["mock_provider".to_string()]),
/*source_kinds*/ None,
/*archived*/ None,
)
.await?;
let listed = data
.iter()
.find(|candidate| candidate.id == thread.id)
.expect("goal-started thread should remain listed for resume");
assert_eq!(listed.preview, format!("/goal {updated_objective}"));
Ok(())
}
#[tokio::test]
async fn thread_list_reports_system_error_idle_flag_after_failed_turn() -> Result<()> {
let responses = vec![

View File

@@ -1,5 +1,6 @@
//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later.
use std::cmp::Reverse;
use std::collections::HashSet;
use std::fs;
use std::fs::File;
@@ -529,7 +530,9 @@ impl RolloutRecorder {
for item in &db_page.items {
// Rows that also appeared in the filesystem page were just validated from the
// rollout head. Rows only found by SQLite may be stale filter matches, so fully
// reconcile those before returning the filesystem-backed page.
// reconcile those before trusting the DB-backed page. This also keeps
// state-managed previews, such as goal-started threads, visible even when the
// rollout has session metadata but no user turn.
if fs_page_thread_ids.contains(&item.id) {
continue;
}
@@ -544,6 +547,74 @@ impl RolloutRecorder {
)
.await;
}
if let Some(repaired_db_page) = state_db::list_threads_db(
state_db_ctx.as_deref(),
codex_home,
page_size,
cursor,
sort_key,
sort_direction,
allowed_sources,
model_providers,
cwd_filters,
archived,
search_term,
)
.await
{
let mut page =
page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
page = fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
page,
)
.await;
let db_page: ThreadsPage = repaired_db_page.into();
let db_more_matches_available = db_page.next_cursor.is_some();
page.num_scanned_files = page
.num_scanned_files
.saturating_add(db_page.num_scanned_files);
let fs_more_matches_available =
page.next_cursor.is_some() || page.reached_scan_cap;
let mut seen_thread_ids = page
.items
.iter()
.filter_map(|item| item.thread_id)
.collect::<HashSet<_>>();
page.items.extend(db_page.items.into_iter().filter(|item| {
let Some(thread_id) = item.thread_id else {
return false;
};
seen_thread_ids.insert(thread_id)
}));
let mut keyed_items = page
.items
.into_iter()
.filter_map(|item| {
thread_item_sort_key(&item, sort_key).map(|key| (key, item))
})
.collect::<Vec<_>>();
match sort_direction {
SortDirection::Asc => keyed_items.sort_by_key(|(key, _)| *key),
SortDirection::Desc => keyed_items.sort_by_key(|(key, _)| Reverse(*key)),
}
let more_matches_available = fs_more_matches_available
|| db_more_matches_available
|| keyed_items.len() > page_size;
keyed_items.truncate(page_size);
page.items = keyed_items.into_iter().map(|(_, item)| item).collect();
page.next_cursor = if more_matches_available {
page.items
.last()
.and_then(|item| cursor_from_thread_item(item, sort_key))
} else {
None
};
return Ok(page);
}
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),

View File

@@ -573,6 +573,24 @@ ON CONFLICT(id) DO NOTHING
Ok(result.rows_affected() > 0)
}
pub async fn update_thread_first_user_message(
&self,
thread_id: ThreadId,
first_user_message: &str,
) -> anyhow::Result<bool> {
let updated_at = self.allocate_thread_updated_at(Utc::now())?;
let result = sqlx::query(
"UPDATE threads SET updated_at = ?, updated_at_ms = ?, first_user_message = ? WHERE id = ?",
)
.bind(datetime_to_epoch_seconds(updated_at))
.bind(datetime_to_epoch_millis(updated_at))
.bind(first_user_message)
.bind(thread_id.to_string())
.execute(self.pool.as_ref())
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn touch_thread_updated_at(
&self,
thread_id: ThreadId,
@@ -719,7 +737,10 @@ ON CONFLICT(id) DO UPDATE SET
sandbox_policy = excluded.sandbox_policy,
approval_mode = excluded.approval_mode,
tokens_used = excluded.tokens_used,
first_user_message = excluded.first_user_message,
first_user_message = CASE
WHEN excluded.first_user_message <> '' THEN excluded.first_user_message
ELSE threads.first_user_message
END,
archived = excluded.archived,
archived_at = excluded.archived_at,
git_sha = excluded.git_sha,
@@ -1557,6 +1578,59 @@ mod tests {
);
}
#[tokio::test]
async fn upsert_thread_preserves_existing_first_user_message_when_rollout_has_none() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("state db should initialize");
let thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000792").expect("valid thread id");
let mut existing = test_thread_metadata(&codex_home, thread_id, codex_home.clone());
existing.first_user_message = Some("/goal improve benchmark coverage".to_string());
runtime
.upsert_thread(&existing)
.await
.expect("initial upsert should succeed");
let mut empty_rollout_metadata =
test_thread_metadata(&codex_home, thread_id, codex_home.clone());
empty_rollout_metadata.first_user_message = None;
runtime
.upsert_thread(&empty_rollout_metadata)
.await
.expect("empty rollout upsert should succeed");
let persisted = runtime
.get_thread(thread_id)
.await
.expect("thread should load")
.expect("thread should exist");
assert_eq!(
persisted.first_user_message.as_deref(),
Some("/goal improve benchmark coverage")
);
let mut real_user_metadata =
test_thread_metadata(&codex_home, thread_id, codex_home.clone());
real_user_metadata.first_user_message = Some("actual user prompt".to_string());
runtime
.upsert_thread(&real_user_metadata)
.await
.expect("real user message upsert should succeed");
let persisted = runtime
.get_thread(thread_id)
.await
.expect("thread should load")
.expect("thread should exist");
assert_eq!(
persisted.first_user_message.as_deref(),
Some("actual user prompt")
);
}
#[tokio::test]
async fn update_thread_git_info_can_clear_fields() {
let codex_home = unique_temp_dir();