Compare commits

...

1 Commits

Author SHA1 Message Date
Ahmed Ibrahim
6b38ce87b4 Delay v1 realtime delegation for transcript deltas
Co-authored-by: Codex <noreply@openai.com>
2026-04-09 01:55:06 -07:00
4 changed files with 60 additions and 45 deletions

View File

@@ -402,6 +402,10 @@ impl RealtimeWebsocketEvents {
}
}
pub async fn take_active_transcript(&self) -> Vec<RealtimeTranscriptEntry> {
std::mem::take(&mut self.active_transcript.lock().await.entries)
}
async fn update_active_transcript(&self, event: &mut RealtimeEvent) {
let mut active_transcript = self.active_transcript.lock().await;
match event {

View File

@@ -49,6 +49,7 @@ use serde_json::json;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;
@@ -62,6 +63,7 @@ const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64;
const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256;
const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000;
const DEFAULT_REALTIME_MODEL: &str = "gpt-realtime-1.5";
const V1_HANDOFF_DELEGATION_GRACE: Duration = Duration::from_secs(1);
const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str =
"Conversation already has an active response in progress:";
@@ -81,18 +83,12 @@ pub(crate) struct RealtimeConversationManager {
state: Mutex<Option<ConversationState>>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RealtimeSessionKind {
V1,
V2,
}
#[derive(Clone, Debug)]
struct RealtimeHandoffState {
output_tx: Sender<HandoffOutput>,
active_handoff: Arc<Mutex<Option<String>>>,
last_output_text: Arc<Mutex<Option<String>>>,
session_kind: RealtimeSessionKind,
event_parser: RealtimeEventParser,
}
#[derive(Debug, PartialEq, Eq)]
@@ -121,16 +117,16 @@ struct RealtimeInputTask {
audio_rx: Receiver<RealtimeAudioFrame>,
events_tx: Sender<RealtimeEvent>,
handoff_state: RealtimeHandoffState,
session_kind: RealtimeSessionKind,
event_parser: RealtimeEventParser,
}
impl RealtimeHandoffState {
fn new(output_tx: Sender<HandoffOutput>, session_kind: RealtimeSessionKind) -> Self {
fn new(output_tx: Sender<HandoffOutput>, event_parser: RealtimeEventParser) -> Self {
Self {
output_tx,
active_handoff: Arc::new(Mutex::new(None)),
last_output_text: Arc::new(Mutex::new(None)),
session_kind,
event_parser,
}
}
}
@@ -195,10 +191,7 @@ impl RealtimeConversationManager {
model_client,
sdp,
} = start;
let session_kind = match session_config.event_parser {
RealtimeEventParser::V1 => RealtimeSessionKind::V1,
RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2,
};
let event_parser = session_config.event_parser;
let client = RealtimeWebsocketClient::new(api_provider);
let (connection, sdp) = if let Some(sdp) = sdp {
@@ -243,7 +236,7 @@ impl RealtimeConversationManager {
async_channel::bounded::<RealtimeEvent>(OUTPUT_EVENTS_QUEUE_CAPACITY);
let realtime_active = Arc::new(AtomicBool::new(true));
let handoff = RealtimeHandoffState::new(handoff_output_tx, session_kind);
let handoff = RealtimeHandoffState::new(handoff_output_tx, event_parser);
let task = spawn_realtime_input_task(RealtimeInputTask {
writer: writer.clone(),
events,
@@ -252,7 +245,7 @@ impl RealtimeConversationManager {
audio_rx,
events_tx,
handoff_state: handoff.clone(),
session_kind,
event_parser,
});
let mut guard = self.state.lock().await;
@@ -366,7 +359,7 @@ impl RealtimeConversationManager {
};
*handoff.last_output_text.lock().await = Some(output_text.clone());
if matches!(handoff.session_kind, RealtimeSessionKind::V1) {
if matches!(handoff.event_parser, RealtimeEventParser::V1) {
handoff
.output_tx
.send(HandoffOutput::ImmediateAppend {
@@ -387,7 +380,7 @@ impl RealtimeConversationManager {
let Some(handoff) = handoff else {
return Ok(());
};
if matches!(handoff.session_kind, RealtimeSessionKind::V1) {
if matches!(handoff.event_parser, RealtimeEventParser::V1) {
return Ok(());
}
@@ -708,16 +701,8 @@ async fn handle_start_inner(
if matches!(event, RealtimeEvent::Error(_)) {
end = RealtimeConversationEnd::Error;
}
let maybe_routed_text = match &event {
RealtimeEvent::HandoffRequested(handoff) => {
realtime_text_from_handoff_request(handoff)
}
_ => None,
};
if let Some(text) = maybe_routed_text {
debug!(text = %text, "[realtime-text] realtime conversation text output");
let sess_for_routed_text = Arc::clone(&sess_clone);
sess_for_routed_text.route_realtime_text_input(text).await;
if let RealtimeEvent::HandoffRequested(handoff) = &event {
route_realtime_handoff(&sess_clone, handoff).await;
}
if !fanout_realtime_active.load(Ordering::Relaxed) {
break;
@@ -764,6 +749,13 @@ pub(crate) async fn handle_audio(
}
}
async fn route_realtime_handoff(sess: &Arc<Session>, handoff: &RealtimeHandoffRequested) {
if let Some(text) = realtime_text_from_handoff_request(handoff) {
debug!(text = %text, "[realtime-text] realtime conversation text output");
Arc::clone(sess).route_realtime_text_input(text).await;
}
}
fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Option<String> {
let active_transcript = handoff
.active_transcript
@@ -854,7 +846,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
audio_rx,
events_tx,
handoff_state,
session_kind,
event_parser,
} = input;
tokio::spawn(async move {
@@ -875,7 +867,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
.await;
break;
}
if matches!(session_kind, RealtimeSessionKind::V2) {
if matches!(event_parser, RealtimeEventParser::RealtimeV2) {
if response_in_progress {
pending_response_create = true;
} else if let Err(err) = writer.send_response_create().await {
@@ -929,7 +921,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
.await;
break;
}
if matches!(session_kind, RealtimeSessionKind::V2) {
if matches!(event_parser, RealtimeEventParser::RealtimeV2) {
if response_in_progress {
pending_response_create = true;
} else if let Err(err) = writer.send_response_create().await {
@@ -962,12 +954,12 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
RealtimeEvent::ConversationItemAdded(item) => {
match item.get("type").and_then(Value::as_str) {
Some("response.created")
if matches!(session_kind, RealtimeSessionKind::V2) =>
if matches!(event_parser, RealtimeEventParser::RealtimeV2) =>
{
response_in_progress = true;
}
Some("response.done")
if matches!(session_kind, RealtimeSessionKind::V2) =>
if matches!(event_parser, RealtimeEventParser::RealtimeV2) =>
{
response_in_progress = false;
output_audio_state = None;
@@ -992,12 +984,12 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
}
}
RealtimeEvent::AudioOut(frame) => {
if matches!(session_kind, RealtimeSessionKind::V2) {
if matches!(event_parser, RealtimeEventParser::RealtimeV2) {
update_output_audio_state(&mut output_audio_state, frame);
}
}
RealtimeEvent::InputAudioSpeechStarted(event) => {
if matches!(session_kind, RealtimeSessionKind::V2)
if matches!(event_parser, RealtimeEventParser::RealtimeV2)
&& let Some(output_audio_state) =
output_audio_state.take()
&& event
@@ -1021,7 +1013,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
RealtimeEvent::ResponseCancelled(_) => {
response_in_progress = false;
output_audio_state = None;
if matches!(session_kind, RealtimeSessionKind::V2)
if matches!(event_parser, RealtimeEventParser::RealtimeV2)
&& pending_response_create
{
if let Err(err) = writer.send_response_create().await {
@@ -1044,9 +1036,24 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
*handoff_state.last_output_text.lock().await = None;
response_in_progress = false;
output_audio_state = None;
if event_parser == RealtimeEventParser::V1 {
let mut handoff = handoff.clone();
let events = events.clone();
let events_tx = events_tx.clone();
forward_event = false;
tokio::spawn(async move {
tokio::time::sleep(V1_HANDOFF_DELEGATION_GRACE).await;
handoff
.active_transcript
.extend(events.take_active_transcript().await);
let _ = events_tx
.send(RealtimeEvent::HandoffRequested(handoff))
.await;
});
}
}
RealtimeEvent::Error(message)
if matches!(session_kind, RealtimeSessionKind::V2)
if matches!(event_parser, RealtimeEventParser::RealtimeV2)
&& message.starts_with(ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX) =>
{
warn!(

View File

@@ -1,7 +1,7 @@
use super::RealtimeHandoffState;
use super::RealtimeSessionKind;
use super::realtime_text_from_handoff_request;
use async_channel::bounded;
use codex_api::RealtimeEventParser;
use codex_protocol::protocol::RealtimeHandoffRequested;
use codex_protocol::protocol::RealtimeTranscriptEntry;
use pretty_assertions::assert_eq;
@@ -57,7 +57,7 @@ fn ignores_empty_handoff_request_input_transcript() {
#[tokio::test]
async fn clears_active_handoff_explicitly() {
let (tx, _rx) = bounded(1);
let state = RealtimeHandoffState::new(tx, RealtimeSessionKind::V1);
let state = RealtimeHandoffState::new(tx, RealtimeEventParser::V1);
*state.active_handoff.lock().await = Some("handoff_1".to_string());
assert_eq!(

View File

@@ -1992,7 +1992,7 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
async fn inbound_handoff_request_collects_late_transcript_before_routing() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
@@ -2017,11 +2017,7 @@ async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "delegated query"
}),
json!({
"type": "conversation.output_transcript.delta",
"delta": "assist confirm"
"delta": "delegated "
}),
json!({
"type": "conversation.handoff.requested",
@@ -2029,6 +2025,14 @@ async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
"item_id": "item_inbound_multi",
"input_transcript": "ignored"
}),
json!({
"type": "conversation.input_transcript.delta",
"delta": "query"
}),
json!({
"type": "conversation.output_transcript.delta",
"delta": "assist confirm"
}),
]]])
.await;