Record realtime close marker on replacement (#13058)

## Summary
- record a realtime close developer message when a new realtime session
replaces an active one
- assert the replacement marker through the mocked responses request
path

---------

Co-authored-by: Codex <noreply@openai.com>
Co-authored-by: Charles Cunningham <ccunningham@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-03-01 13:54:12 -08:00
committed by GitHub
parent c9cef6ba9e
commit 0aeb55bf08
27 changed files with 1292 additions and 214 deletions

View File

@@ -27,6 +27,8 @@ use codex_protocol::protocol::RealtimeConversationStartedEvent;
use http::HeaderMap;
use serde_json::Value;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;
@@ -47,6 +49,7 @@ struct ConversationState {
audio_tx: Sender<RealtimeAudioFrame>,
text_tx: Sender<String>,
task: JoinHandle<()>,
realtime_active: Arc<AtomicBool>,
}
#[allow(dead_code)]
@@ -59,7 +62,9 @@ impl RealtimeConversationManager {
pub(crate) async fn running_state(&self) -> Option<()> {
let state = self.state.lock().await;
state.as_ref().map(|_| ())
state
.as_ref()
.and_then(|state| state.realtime_active.load(Ordering::Relaxed).then_some(()))
}
pub(crate) async fn start(
@@ -68,12 +73,13 @@ impl RealtimeConversationManager {
extra_headers: Option<HeaderMap>,
prompt: String,
session_id: Option<String>,
) -> CodexResult<Receiver<RealtimeEvent>> {
) -> CodexResult<(Receiver<RealtimeEvent>, Arc<AtomicBool>)> {
let previous_state = {
let mut guard = self.state.lock().await;
guard.take()
};
if let Some(state) = previous_state {
state.realtime_active.store(false, Ordering::Relaxed);
state.task.abort();
let _ = state.task.await;
}
@@ -97,6 +103,7 @@ impl RealtimeConversationManager {
let (events_tx, events_rx) =
async_channel::bounded::<RealtimeEvent>(OUTPUT_EVENTS_QUEUE_CAPACITY);
let realtime_active = Arc::new(AtomicBool::new(true));
let task = spawn_realtime_input_task(writer, events, text_rx, audio_rx, events_tx);
let mut guard = self.state.lock().await;
@@ -104,8 +111,9 @@ impl RealtimeConversationManager {
audio_tx,
text_tx,
task,
realtime_active: Arc::clone(&realtime_active),
});
Ok(events_rx)
Ok((events_rx, realtime_active))
}
pub(crate) async fn audio_in(&self, frame: RealtimeAudioFrame) -> CodexResult<()> {
@@ -158,6 +166,7 @@ impl RealtimeConversationManager {
};
if let Some(state) = state {
state.realtime_active.store(false, Ordering::Relaxed);
state.task.abort();
let _ = state.task.await;
}
@@ -186,7 +195,7 @@ pub(crate) async fn handle_start(
.session_id
.or_else(|| Some(sess.conversation_id.to_string()));
info!("starting realtime conversation");
let events_rx = match sess
let (events_rx, realtime_active) = match sess
.conversation
.start(api_provider, None, prompt, requested_session_id.clone())
.await
@@ -236,7 +245,7 @@ pub(crate) async fn handle_start(
)))
.await;
}
if let Some(()) = sess_clone.conversation.running_state().await {
if realtime_active.swap(false, Ordering::Relaxed) {
info!("realtime conversation transport closed");
sess_clone
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(