Compare commits

...

2 Commits

Author SHA1 Message Date
Ahmed Ibrahim
5f2320342e Align realtime error close handling
Co-authored-by: Codex <noreply@openai.com>
2026-03-17 09:33:52 -07:00
Ahmed Ibrahim
5eb7e86114 Unify realtime shutdown in core
Co-authored-by: Codex <noreply@openai.com>
2026-03-17 00:11:28 -07:00
3 changed files with 145 additions and 41 deletions

View File

@@ -188,7 +188,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
read_notification::<ThreadRealtimeClosedNotification>(&mut mcp, "thread/realtime/closed")
.await?;
assert_eq!(closed.thread_id, output_audio.thread_id);
assert_eq!(closed.reason.as_deref(), Some("transport_closed"));
assert_eq!(closed.reason.as_deref(), Some("error"));
let connections = realtime_server.connections();
assert_eq!(connections.len(), 1);

View File

@@ -56,6 +56,14 @@ const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000;
const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str =
"Conversation already has an active response in progress:";
/// Why a realtime conversation is being ended. `end_realtime_conversation` maps each variant to
/// the `reason` string carried by the `RealtimeConversationClosed` event it emits.
#[derive(Debug)]
enum RealtimeConversationEnd {
/// Ended via `handle_close`; reported with reason `"requested"`.
Requested,
/// The realtime event stream ended without an error event having been seen; reported with
/// reason `"transport_closed"`.
TransportClosed,
/// A failure whose message still has to be delivered: `end_realtime_conversation` first emits
/// the message as a `RealtimeEvent::Error` payload and then reports reason `"error"`.
Error(String),
/// A `RealtimeEvent::Error` was already observed on the event stream (presumably already
/// forwarded to the client — confirm in the event loop), so only the closed event with reason
/// `"error"` is emitted.
ErrorAlreadySent,
}
/// Crate-internal owner of the realtime conversation state.
///
/// The state is an optional `ConversationState` behind a `Mutex`.
/// NOTE(review): presumably `None` means no conversation is currently running — confirm against
/// the `impl` block, which is not visible here.
pub(crate) struct RealtimeConversationManager {
state: Mutex<Option<ConversationState>>,
}
@@ -344,6 +352,23 @@ pub(crate) async fn handle_start(
sess: &Arc<Session>,
sub_id: String,
params: ConversationStartParams,
) -> CodexResult<()> {
if let Err(err) = handle_start_inner(sess, &sub_id, params).await {
error!("failed to start realtime conversation: {err}");
end_realtime_conversation(
sess,
sub_id,
RealtimeConversationEnd::Error(err.to_string()),
)
.await;
}
Ok(())
}
async fn handle_start_inner(
sess: &Arc<Session>,
sub_id: &str,
params: ConversationStartParams,
) -> CodexResult<()> {
let provider = sess.provider().await;
let auth = sess.services.auth_manager.auth().await;
@@ -392,23 +417,15 @@ pub(crate) async fn handle_start(
let extra_headers =
realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?;
info!("starting realtime conversation");
let (events_rx, realtime_active) = match sess
let (events_rx, realtime_active) = sess
.conversation
.start(api_provider, extra_headers, session_config)
.await
{
Ok(events_rx) => events_rx,
Err(err) => {
error!("failed to start realtime conversation: {err}");
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
return Ok(());
}
};
.await?;
info!("realtime conversation started");
sess.send_event_raw(Event {
id: sub_id.clone(),
id: sub_id.to_string(),
msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent {
session_id: requested_session_id,
}),
@@ -416,11 +433,13 @@ pub(crate) async fn handle_start(
.await;
let sess_clone = Arc::clone(sess);
let sub_id = sub_id.to_string();
tokio::spawn(async move {
let ev = |msg| Event {
id: sub_id.clone(),
msg,
};
let mut end = RealtimeConversationEnd::TransportClosed;
while let Ok(event) = events_rx.recv().await {
// if not audio out, log the event
if !matches!(event, RealtimeEvent::AudioOut(_)) {
@@ -429,6 +448,9 @@ pub(crate) async fn handle_start(
"received realtime conversation event"
);
}
if matches!(event, RealtimeEvent::Error(_)) {
end = RealtimeConversationEnd::ErrorAlreadySent;
}
let maybe_routed_text = match &event {
RealtimeEvent::HandoffRequested(handoff) => {
realtime_text_from_handoff_request(handoff)
@@ -449,14 +471,10 @@ pub(crate) async fn handle_start(
.await;
}
if realtime_active.swap(false, Ordering::Relaxed) {
info!("realtime conversation transport closed");
sess_clone
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(
RealtimeConversationClosedEvent {
reason: Some("transport_closed".to_string()),
},
)))
.await;
if matches!(end, RealtimeConversationEnd::TransportClosed) {
info!("realtime conversation transport closed");
}
end_realtime_conversation(&sess_clone, sub_id, end).await;
}
});
@@ -470,7 +488,17 @@ pub(crate) async fn handle_audio(
) {
if let Err(err) = sess.conversation.audio_in(params.frame).await {
error!("failed to append realtime audio: {err}");
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
if sess.conversation.running_state().await.is_some() {
end_realtime_conversation(
sess,
sub_id,
RealtimeConversationEnd::Error(err.to_string()),
)
.await;
} else {
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest)
.await;
}
}
}
@@ -545,25 +573,22 @@ pub(crate) async fn handle_text(
debug!(text = %params.text, "[realtime-text] appending realtime conversation text input");
if let Err(err) = sess.conversation.text_in(params.text).await {
error!("failed to append realtime text: {err}");
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
if sess.conversation.running_state().await.is_some() {
end_realtime_conversation(
sess,
sub_id,
RealtimeConversationEnd::Error(err.to_string()),
)
.await;
} else {
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest)
.await;
}
}
}
/// Handles a request to close the session's realtime conversation.
///
/// `sub_id` becomes the `id` of the events emitted in response; a successful close is reported
/// as a `RealtimeConversationClosed` event with reason `"requested"`.
pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
match sess.conversation.shutdown().await {
Ok(()) => {
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent {
reason: Some("requested".to_string()),
}),
})
.await;
}
Err(err) => {
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
}
}
end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Requested).await;
}
fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
@@ -771,11 +796,6 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
}
}
Ok(None) => {
let _ = events_tx
.send(RealtimeEvent::Error(
"realtime websocket connection is closed".to_string(),
))
.await;
break;
}
Err(err) => {
@@ -868,6 +888,38 @@ async fn send_conversation_error(
.await;
}
async fn end_realtime_conversation(
sess: &Arc<Session>,
sub_id: String,
end: RealtimeConversationEnd,
) {
let _ = sess.conversation.shutdown().await;
if let RealtimeConversationEnd::Error(message) = &end {
sess.send_event_raw(Event {
id: sub_id.clone(),
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message.clone()),
}),
})
.await;
}
let reason = match end {
RealtimeConversationEnd::Requested => Some("requested".to_string()),
RealtimeConversationEnd::TransportClosed => Some("transport_closed".to_string()),
RealtimeConversationEnd::Error(_) | RealtimeConversationEnd::ErrorAlreadySent => {
Some("error".to_string())
}
};
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent { reason }),
})
.await;
}
#[cfg(test)]
#[path = "realtime_conversation_tests.rs"]
mod tests;

View File

@@ -12,6 +12,7 @@ use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeConversationClosedEvent;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::SessionSource;
@@ -381,6 +382,15 @@ impl EnvGuard {
}
Self { key, original }
}
/// Creates a guard that removes `key` from the process environment, recording whatever value
/// (if any) it held beforehand in `original`.
fn unset(key: &'static str) -> Self {
    let previous = std::env::var_os(key);
    // SAFETY: this guard restores the original value before the test exits.
    unsafe { std::env::remove_var(key) };
    Self {
        key,
        original: previous,
    }
}
}
impl Drop for EnvGuard {
@@ -427,6 +437,48 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> {
Ok(())
}
/// Starting a realtime conversation with `OPENAI_API_KEY_ENV_VAR` unset and only the dummy
/// ChatGPT auth configured must yield a realtime error event and a closed event whose reason is
/// `"error"`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial(openai_api_key_env)]
async fn conversation_start_failure_emits_realtime_error_and_closed() -> Result<()> {
    skip_if_no_network!(Ok(()));

    // Bound to a named variable so the guard lives until the end of the test.
    let _env_guard = EnvGuard::unset(OPENAI_API_KEY_ENV_VAR);
    let server = start_websocket_server(vec![]).await;
    let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
    let test = builder.build_with_websocket_server(&server).await?;

    let start_op = Op::RealtimeConversationStart(ConversationStartParams {
        prompt: "backend prompt".to_string(),
        session_id: None,
    });
    test.codex.submit(start_op).await?;

    // Wait for the realtime error event and check its message.
    let error_message = wait_for_event_match(&test.codex, |msg| {
        if let EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
            payload: RealtimeEvent::Error(message),
        }) = msg
        {
            Some(message.clone())
        } else {
            None
        }
    })
    .await;
    assert_eq!(error_message, "realtime conversation requires API key auth");

    // Then wait for the closed event and check its reason.
    let closed_event = wait_for_event_match(&test.codex, |msg| {
        if let EventMsg::RealtimeConversationClosed(closed) = msg {
            Some(closed.clone())
        } else {
            None
        }
    })
    .await;
    let expected_closed = RealtimeConversationClosedEvent {
        reason: Some("error".to_string()),
    };
    assert_eq!(closed_event, expected_closed);

    server.shutdown().await;
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_text_before_start_emits_error() -> Result<()> {
skip_if_no_network!(Ok(()));