feat: add auto refresh on thread listeners (#9105)

This PR is in the scope of multi-agent work. 

An agent (=thread) can now spawn other agents. Those other agents are
not attached to any clients. We need a way to make sure that the clients
are aware of the new threads to look at (for approval for example). This
PR adds a channel to the `ThreadManager` that pushes the ID of those
newly created agents such that the client (here the app-server) can also
subscribe to those ones.
This commit is contained in:
jif-oai
2026-01-14 16:26:01 +00:00
committed by GitHub
parent 32b1795ff4
commit bcd7858ced
6 changed files with 114 additions and 42 deletions

View File

@@ -178,6 +178,7 @@ use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::oneshot;
use toml::Value as TomlValue;
use tracing::error;
@@ -229,6 +230,7 @@ pub(crate) struct CodexMessageProcessor {
config: Arc<Config>,
cli_overrides: Vec<(String, TomlValue)>,
conversation_listeners: HashMap<Uuid, oneshot::Sender<()>>,
listener_thread_ids_by_subscription: HashMap<Uuid, ThreadId>,
active_login: Arc<Mutex<Option<ActiveLogin>>>,
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: PendingInterrupts,
@@ -286,6 +288,7 @@ impl CodexMessageProcessor {
config,
cli_overrides,
conversation_listeners: HashMap::new(),
listener_thread_ids_by_subscription: HashMap::new(),
active_login: Arc::new(Mutex::new(None)),
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
pending_rollbacks: Arc::new(Mutex::new(HashMap::new())),
@@ -1271,7 +1274,11 @@ impl CodexMessageProcessor {
});
}
async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) {
async fn process_new_conversation(
&mut self,
request_id: RequestId,
params: NewConversationParams,
) {
let NewConversationParams {
model,
model_provider,
@@ -1669,6 +1676,31 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.thread_manager.subscribe_thread_created()
}
/// Best-effort: attach a listener for thread_id if missing.
pub(crate) async fn try_attach_thread_listener(&mut self, thread_id: ThreadId) {
if self
.listener_thread_ids_by_subscription
.values()
.any(|entry| *entry == thread_id)
{
return;
}
if let Err(err) = self
.attach_conversation_listener(thread_id, false, ApiVersion::V2)
.await
{
warn!(
"failed to attach listener for thread {thread_id}: {message}",
message = err.message
);
}
}
async fn thread_resume(&mut self, request_id: RequestId, params: ThreadResumeParams) {
let ThreadResumeParams {
thread_id,
@@ -3563,6 +3595,12 @@ impl CodexMessageProcessor {
Some(sender) => {
// Signal the spawned task to exit and acknowledge.
let _ = sender.send(());
if let Some(thread_id) = self
.listener_thread_ids_by_subscription
.remove(&subscription_id)
{
info!("removed listener for thread {thread_id}");
}
let response = RemoveConversationSubscriptionResponse {};
self.outgoing.send_response(request_id, response).await;
}
@@ -3598,6 +3636,8 @@ impl CodexMessageProcessor {
let (cancel_tx, mut cancel_rx) = oneshot::channel();
self.conversation_listeners
.insert(subscription_id, cancel_tx);
self.listener_thread_ids_by_subscription
.insert(subscription_id, conversation_id);
let outgoing_for_task = self.outgoing.clone();
let pending_interrupts = self.pending_interrupts.clone();

View File

@@ -24,6 +24,7 @@ use toml::Value as TomlValue;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::SubscriberExt;
@@ -175,13 +176,39 @@ pub async fn run_main(
feedback.clone(),
config_warnings,
);
let mut thread_created_rx = processor.thread_created_receiver();
async move {
while let Some(msg) = incoming_rx.recv().await {
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Error(e) => processor.process_error(e),
let mut listen_for_threads = true;
loop {
tokio::select! {
msg = incoming_rx.recv() => {
let Some(msg) = msg else {
break;
};
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Error(e) => processor.process_error(e),
}
}
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
processor.try_attach_thread_listener(thread_id).await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
// TODO(jif) handle lag.
// Assumes thread creation volume is low enough that lag never happens.
// If it does, we log and continue without resyncing to avoid attaching
// listeners for threads that should remain unsubscribed.
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
listen_for_threads = false;
}
}
}
}
}

View File

@@ -28,7 +28,9 @@ use codex_core::default_client::USER_AGENT_SUFFIX;
use codex_core::default_client::get_codex_user_agent;
use codex_core::default_client::set_default_originator;
use codex_feedback::CodexFeedback;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionSource;
use tokio::sync::broadcast;
use toml::Value as TomlValue;
pub(crate) struct MessageProcessor {
@@ -159,7 +161,6 @@ impl MessageProcessor {
self.outgoing.send_response(request_id, response).await;
self.initialized = true;
if !self.config_warnings.is_empty() {
for notification in self.config_warnings.drain(..) {
self.outgoing
@@ -214,6 +215,19 @@ impl MessageProcessor {
tracing::info!("<- notification: {:?}", notification);
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.codex_message_processor.thread_created_receiver()
}
pub(crate) async fn try_attach_thread_listener(&mut self, thread_id: ThreadId) {
if !self.initialized {
return;
}
self.codex_message_processor
.try_attach_thread_listener(thread_id)
.await;
}
/// Handle a standalone JSON-RPC response originating from the peer.
pub(crate) async fn process_response(&mut self, response: JSONRPCResponse) {
tracing::info!("<- response: {:?}", response);

View File

@@ -1,10 +1,8 @@
use crate::CodexThread;
use crate::agent::AgentStatus;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use std::sync::Arc;
@@ -29,21 +27,18 @@ impl AgentControl {
}
/// Spawn a new agent thread and submit the initial prompt.
///
/// If `headless` is true, a background drain task is spawned to prevent unbounded event growth
/// of the channel queue when there is no client actively reading the thread events.
pub(crate) async fn spawn_agent(
&self,
config: crate::config::Config,
prompt: String,
headless: bool,
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let new_thread = state.spawn_new_thread(config, self.clone()).await?;
if headless {
spawn_headless_drain(Arc::clone(&new_thread.thread));
}
// Notify a new thread has been created. This notification will be processed by clients
// to subscribe or drain this newly created thread.
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
self.send_prompt(new_thread.thread_id, prompt).await?;
@@ -110,38 +105,18 @@ impl AgentControl {
}
}
/// When an agent is spawned "headless" (no UI/view attached), there may be no consumer polling
/// `CodexThread::next_event()`. The underlying event channel is unbounded, so the producer can
/// accumulate events indefinitely. This drain task prevents that memory growth by polling and
/// discarding events until shutdown.
fn spawn_headless_drain(thread: Arc<CodexThread>) {
tokio::spawn(async move {
loop {
match thread.next_event().await {
Ok(event) => {
if matches!(event.msg, EventMsg::ShutdownComplete) {
break;
}
}
Err(err) => {
tracing::warn!("failed to receive event from agent: {err:?}");
break;
}
}
}
});
}
#[cfg(test)]
mod tests {
use super::*;
use crate::CodexAuth;
use crate::CodexThread;
use crate::ThreadManager;
use crate::agent::agent_status_from_event;
use crate::config::Config;
use crate::config::ConfigBuilder;
use assert_matches::assert_matches;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
@@ -262,7 +237,7 @@ mod tests {
let control = AgentControl::default();
let (_home, config) = test_config().await;
let err = control
.spawn_agent(config, "hello".to_string(), false)
.spawn_agent(config, "hello".to_string())
.await
.expect_err("spawn_agent should fail without a manager");
assert_eq!(
@@ -363,7 +338,7 @@ mod tests {
let harness = AgentControlHarness::new().await;
let thread_id = harness
.control
.spawn_agent(harness.config.clone(), "spawned".to_string(), false)
.spawn_agent(harness.config.clone(), "spawned".to_string())
.await
.expect("spawn_agent should succeed");
let _thread = harness

View File

@@ -31,8 +31,11 @@ use std::sync::Arc;
#[cfg(any(test, feature = "test-support"))]
use tempfile::TempDir;
use tokio::sync::RwLock;
use tokio::sync::broadcast;
use tracing::warn;
const THREAD_CREATED_CHANNEL_CAPACITY: usize = 1024;
/// Represents a newly created Codex thread (formerly called a conversation), including the first event
/// (which is [`EventMsg::SessionConfigured`]).
pub struct NewThread {
@@ -54,6 +57,7 @@ pub struct ThreadManager {
/// function to require an `Arc<&Self>`.
pub(crate) struct ThreadManagerState {
threads: Arc<RwLock<HashMap<ThreadId, Arc<CodexThread>>>>,
thread_created_tx: broadcast::Sender<ThreadId>,
auth_manager: Arc<AuthManager>,
models_manager: Arc<ModelsManager>,
skills_manager: Arc<SkillsManager>,
@@ -70,9 +74,11 @@ impl ThreadManager {
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
) -> Self {
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
thread_created_tx,
models_manager: Arc::new(ModelsManager::new(
codex_home.clone(),
auth_manager.clone(),
@@ -108,9 +114,11 @@ impl ThreadManager {
codex_home: PathBuf,
) -> Self {
let auth_manager = AuthManager::from_auth_for_testing(auth);
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
thread_created_tx,
models_manager: Arc::new(ModelsManager::with_provider(
codex_home.clone(),
auth_manager.clone(),
@@ -174,6 +182,10 @@ impl ThreadManager {
}
}
pub fn subscribe_thread_created(&self) -> broadcast::Receiver<ThreadId> {
self.state.thread_created_tx.subscribe()
}
pub async fn get_thread(&self, thread_id: ThreadId) -> CodexResult<Arc<CodexThread>> {
self.state.get_thread(thread_id).await
}
@@ -348,6 +360,10 @@ impl ThreadManagerState {
session_configured,
})
}
pub(crate) fn notify_thread_created(&self, thread_id: ThreadId) {
let _ = self.thread_created_tx.send(thread_id);
}
}
/// Return a prefix of `items` obtained by cutting strictly before the nth user message

View File

@@ -94,7 +94,7 @@ mod spawn {
let result = session
.services
.agent_control
.spawn_agent(config, args.message, true)
.spawn_agent(config, args.message)
.await
.map_err(collab_spawn_error)?;