Compare commits

...

3 Commits

Author SHA1 Message Date
jif-oai
b5f583bc3f Merge branch 'main' into jif/feedback-cascade 2026-04-01 17:09:50 +02:00
jif-oai
db34036e26 fmt 2026-04-01 16:58:13 +02:00
jif-oai
404001813f feat: /feedback cascade 2026-04-01 16:56:11 +02:00
4 changed files with 290 additions and 28 deletions

View File

@@ -7419,42 +7419,101 @@ impl CodexMessageProcessor {
}
let snapshot = self.feedback.snapshot(conversation_id);
let thread_id = snapshot.thread_id.clone();
let sqlite_feedback_logs = if include_logs {
let (feedback_thread_ids, sqlite_feedback_logs, state_db_ctx) = if include_logs {
if let Some(log_db) = self.log_db.as_ref() {
log_db.flush().await;
}
let state_db_ctx = get_state_db(&self.config).await;
match (state_db_ctx.as_ref(), conversation_id) {
(Some(state_db_ctx), Some(conversation_id)) => {
let thread_id_text = conversation_id.to_string();
match state_db_ctx.query_feedback_logs(&thread_id_text).await {
Ok(logs) if logs.is_empty() => None,
Ok(logs) => Some(logs),
Err(err) => {
warn!(
"failed to query feedback logs from sqlite for thread_id={thread_id_text}: {err}"
);
None
let feedback_thread_ids = match conversation_id {
Some(conversation_id) => match self
.thread_manager
.list_agent_subtree_thread_ids(conversation_id)
.await
{
Ok(thread_ids) => thread_ids,
Err(err) => {
warn!(
"failed to list feedback subtree for thread_id={conversation_id}: {err}"
);
let mut thread_ids = vec![conversation_id];
if let Some(state_db_ctx) = state_db_ctx.as_ref() {
for status in [
codex_state::DirectionalThreadSpawnEdgeStatus::Open,
codex_state::DirectionalThreadSpawnEdgeStatus::Closed,
] {
match state_db_ctx
.list_thread_spawn_descendants_with_status(
conversation_id,
status,
)
.await
{
Ok(descendant_ids) => thread_ids.extend(descendant_ids),
Err(err) => warn!(
"failed to list persisted feedback subtree for thread_id={conversation_id}: {err}"
),
}
}
}
thread_ids
}
},
None => Vec::new(),
};
let sqlite_feedback_logs = if let Some(state_db_ctx) = state_db_ctx.as_ref()
&& !feedback_thread_ids.is_empty()
{
let thread_id_texts = feedback_thread_ids
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let thread_id_refs = thread_id_texts
.iter()
.map(String::as_str)
.collect::<Vec<_>>();
match state_db_ctx
.query_feedback_logs_for_threads(&thread_id_refs)
.await
{
Ok(logs) if logs.is_empty() => None,
Ok(logs) => Some(logs),
Err(err) => {
let thread_ids = thread_id_texts.join(", ");
warn!(
"failed to query feedback logs from sqlite for thread_ids=[{thread_ids}]: {err}"
);
None
}
}
_ => None,
}
} else {
None
};
(feedback_thread_ids, sqlite_feedback_logs, state_db_ctx)
} else {
None
(Vec::new(), None, None)
};
let validated_rollout_path = if include_logs {
match conversation_id {
Some(conv_id) => self.resolve_rollout_path(conv_id).await,
None => None,
let mut attachment_paths = Vec::new();
let mut seen_attachment_paths = HashSet::new();
if include_logs {
for feedback_thread_id in &feedback_thread_ids {
let Some(rollout_path) = self
.resolve_rollout_path(*feedback_thread_id, state_db_ctx.as_ref())
.await
else {
continue;
};
if seen_attachment_paths.insert(rollout_path.clone()) {
attachment_paths.push(rollout_path);
}
}
} else {
None
};
let mut attachment_paths = validated_rollout_path.into_iter().collect::<Vec<_>>();
}
if let Some(extra_log_files) = extra_log_files {
attachment_paths.extend(extra_log_files);
for extra_log_file in extra_log_files {
if seen_attachment_paths.insert(extra_log_file.clone()) {
attachment_paths.push(extra_log_file);
}
}
}
let session_source = self.thread_manager.session_source();
@@ -7573,10 +7632,29 @@ impl CodexMessageProcessor {
});
}
async fn resolve_rollout_path(&self, conversation_id: ThreadId) -> Option<PathBuf> {
match self.thread_manager.get_thread(conversation_id).await {
Ok(conv) => conv.rollout_path(),
Err(_) => None,
async fn resolve_rollout_path(
&self,
conversation_id: ThreadId,
state_db_ctx: Option<&StateDbHandle>,
) -> Option<PathBuf> {
if let Ok(conversation) = self.thread_manager.get_thread(conversation_id).await
&& let Some(rollout_path) = conversation.rollout_path()
{
return Some(rollout_path);
}
let Some(state_db_ctx) = state_db_ctx else {
return None;
};
match state_db_ctx
.find_rollout_path_by_id(conversation_id, /*archived_only*/ None)
.await
{
Ok(rollout_path) => rollout_path,
Err(err) => {
warn!("failed to resolve rollout path for thread_id={conversation_id}: {err}");
None
}
}
}
}

View File

@@ -667,6 +667,15 @@ impl AgentControl {
self.state.agent_metadata_for_thread(agent_id)
}
pub(crate) async fn list_live_agent_subtree_thread_ids(
&self,
agent_id: ThreadId,
) -> CodexResult<Vec<ThreadId>> {
let mut thread_ids = vec![agent_id];
thread_ids.extend(self.live_thread_spawn_descendants(agent_id).await?);
Ok(thread_ids)
}
pub(crate) async fn get_agent_config_snapshot(
&self,
agent_id: ThreadId,

View File

@@ -1737,6 +1737,132 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
.expect("resumed child shutdown should succeed");
}
#[tokio::test]
async fn list_agent_subtree_thread_ids_includes_anonymous_and_closed_descendants() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let worker_path = AgentPath::root().join("worker").expect("worker path");
let reviewer_path = AgentPath::root().join("reviewer").expect("reviewer path");
let worker_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello worker"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: Some(worker_path.clone()),
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("worker spawn should succeed");
let worker_child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello worker child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(
worker_path
.join("child")
.expect("worker child path should be valid"),
),
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("worker child spawn should succeed");
let no_path_child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello anonymous child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("no-path child spawn should succeed");
let no_path_grandchild_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello anonymous grandchild"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: no_path_child_thread_id,
depth: 3,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("no-path grandchild spawn should succeed");
let _reviewer_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello reviewer"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: Some(reviewer_path),
agent_nickname: None,
agent_role: Some("reviewer".to_string()),
})),
)
.await
.expect("reviewer spawn should succeed");
let _ = harness
.control
.shutdown_live_agent(no_path_grandchild_thread_id)
.await
.expect("no-path grandchild shutdown should succeed");
let mut worker_subtree_thread_ids = harness
.manager
.list_agent_subtree_thread_ids(worker_thread_id)
.await
.expect("worker subtree thread ids should load");
worker_subtree_thread_ids.sort_by_key(ToString::to_string);
let mut expected_worker_subtree_thread_ids = vec![
worker_thread_id,
worker_child_thread_id,
no_path_child_thread_id,
no_path_grandchild_thread_id,
];
expected_worker_subtree_thread_ids.sort_by_key(ToString::to_string);
assert_eq!(
worker_subtree_thread_ids,
expected_worker_subtree_thread_ids
);
let mut no_path_child_subtree_thread_ids = harness
.manager
.list_agent_subtree_thread_ids(no_path_child_thread_id)
.await
.expect("no-path subtree thread ids should load");
no_path_child_subtree_thread_ids.sort_by_key(ToString::to_string);
let mut expected_no_path_child_subtree_thread_ids =
vec![no_path_child_thread_id, no_path_grandchild_thread_id];
expected_no_path_child_subtree_thread_ids.sort_by_key(ToString::to_string);
assert_eq!(
no_path_child_subtree_thread_ids,
expected_no_path_child_subtree_thread_ids
);
}
#[tokio::test]
async fn shutdown_agent_tree_closes_live_descendants() {
let harness = AgentControlHarness::new().await;

View File

@@ -42,9 +42,11 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::W3cTraceContext;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -403,6 +405,53 @@ impl ThreadManager {
self.state.get_thread(thread_id).await
}
/// List `thread_id` plus all known descendants in its spawn subtree.
pub async fn list_agent_subtree_thread_ids(
&self,
thread_id: ThreadId,
) -> CodexResult<Vec<ThreadId>> {
let thread = self.state.get_thread(thread_id).await?;
let mut subtree_thread_ids = Vec::new();
let mut seen_thread_ids = HashSet::new();
subtree_thread_ids.push(thread_id);
seen_thread_ids.insert(thread_id);
if let Some(state_db_ctx) = thread.state_db() {
for status in [
DirectionalThreadSpawnEdgeStatus::Open,
DirectionalThreadSpawnEdgeStatus::Closed,
] {
for descendant_id in state_db_ctx
.list_thread_spawn_descendants_with_status(thread_id, status)
.await
.map_err(|err| {
CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}"))
})?
{
if seen_thread_ids.insert(descendant_id) {
subtree_thread_ids.push(descendant_id);
}
}
}
}
for descendant_id in thread
.codex
.session
.services
.agent_control
.list_live_agent_subtree_thread_ids(thread_id)
.await?
{
if seen_thread_ids.insert(descendant_id) {
subtree_thread_ids.push(descendant_id);
}
}
Ok(subtree_thread_ids)
}
pub async fn start_thread(&self, config: Config) -> CodexResult<NewThread> {
// Box delegated thread-spawn futures so these convenience wrappers do
// not inline the full spawn path into every caller's async state.