Compare commits

...

1 Commits

Author SHA1 Message Date
Ahmed Ibrahim
dfb779bf14 Skip codex tool call transcript in realtime v2
Co-authored-by: Codex <noreply@openai.com>
2026-03-18 15:26:14 -07:00

View File

@@ -20,6 +20,7 @@ use futures::SinkExt;
use futures::StreamExt;
use http::HeaderMap;
use http::HeaderValue;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -210,6 +211,8 @@ pub struct RealtimeWebsocketEvents {
#[derive(Default)]
struct ActiveTranscriptState {
entries: Vec<RealtimeTranscriptEntry>,
active_output_item_id: Option<String>,
suppress_output_transcript: bool,
}
impl RealtimeWebsocketConnection {
@@ -405,21 +408,46 @@ impl RealtimeWebsocketEvents {
append_transcript_delta(&mut active_transcript.entries, "user", delta);
}
RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta }) => {
append_transcript_delta(&mut active_transcript.entries, "assistant", delta);
if !active_transcript.suppress_output_transcript {
append_transcript_delta(&mut active_transcript.entries, "assistant", delta);
}
}
RealtimeEvent::ConversationItemAdded(item) => {
update_active_output_item(&mut active_transcript, item);
}
RealtimeEvent::ConversationItemDone { item_id } => {
if active_transcript.active_output_item_id.as_deref() == Some(item_id.as_str()) {
active_transcript.active_output_item_id = None;
active_transcript.suppress_output_transcript = false;
}
}
RealtimeEvent::HandoffRequested(handoff) => {
active_transcript.active_output_item_id = None;
active_transcript.suppress_output_transcript = false;
handoff.active_transcript = std::mem::take(&mut active_transcript.entries);
}
RealtimeEvent::SessionUpdated { .. }
| RealtimeEvent::AudioOut(_)
| RealtimeEvent::ResponseCancelled(_)
| RealtimeEvent::ConversationItemAdded(_)
| RealtimeEvent::ConversationItemDone { .. }
| RealtimeEvent::Error(_) => {}
}
}
}
fn update_active_output_item(active_transcript: &mut ActiveTranscriptState, item: &Value) {
let item = match item.get("item").and_then(Value::as_object) {
Some(item) => item,
None => return,
};
let item_id = item.get("id").and_then(Value::as_str).map(str::to_string);
let item_type = item.get("type").and_then(Value::as_str);
let item_name = item.get("name").and_then(Value::as_str);
active_transcript.active_output_item_id = item_id;
active_transcript.suppress_output_transcript =
item_type == Some("function_call") && item_name == Some("codex");
}
fn append_transcript_delta(entries: &mut Vec<RealtimeTranscriptEntry>, role: &str, delta: &str) {
if delta.is_empty() {
return;
@@ -1564,6 +1592,312 @@ mod tests {
server.await.expect("server task");
}
#[tokio::test]
async fn realtime_v2_handoff_excludes_codex_tool_call_transcription() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local addr");
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let mut ws = accept_async(stream).await.expect("accept ws");
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_v2", "instructions": "backend prompt"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
ws.send(Message::Text(
json!({
"type": "conversation.item.input_audio_transcription.delta",
"delta": "delegate "
})
.to_string()
.into(),
))
.await
.expect("send input transcript delta");
ws.send(Message::Text(
json!({
"type": "conversation.item.input_audio_transcription.delta",
"delta": "now"
})
.to_string()
.into(),
))
.await
.expect("send input transcript delta");
ws.send(Message::Text(
json!({
"type": "conversation.item.added",
"item": {
"id": "assistant_message_1",
"type": "message",
"role": "assistant"
}
})
.to_string()
.into(),
))
.await
.expect("send assistant item added");
ws.send(Message::Text(
json!({
"type": "response.output_text.delta",
"delta": "working"
})
.to_string()
.into(),
))
.await
.expect("send assistant transcript delta");
ws.send(Message::Text(
json!({
"type": "conversation.item.done",
"item": {
"id": "assistant_message_1",
"type": "message",
"role": "assistant"
}
})
.to_string()
.into(),
))
.await
.expect("send assistant item done");
ws.send(Message::Text(
json!({
"type": "conversation.item.added",
"item": {
"id": "item_2",
"type": "function_call",
"name": "codex",
"call_id": "handoff_1",
"arguments": "{\"prompt\":\"delegate now\"}"
}
})
.to_string()
.into(),
))
.await
.expect("send codex tool item added");
ws.send(Message::Text(
json!({
"type": "response.output_text.delta",
"delta": "should not be included"
})
.to_string()
.into(),
))
.await
.expect("send codex transcript delta");
ws.send(Message::Text(
json!({
"type": "conversation.item.done",
"item": {
"id": "item_2",
"type": "function_call",
"name": "codex",
"call_id": "handoff_1",
"arguments": "{\"prompt\":\"delegate now\"}"
}
})
.to_string()
.into(),
))
.await
.expect("send codex tool item done");
});
let provider = Provider {
name: "test".to_string(),
base_url: format!("http://{addr}"),
query_params: Some(HashMap::new()),
headers: HeaderMap::new(),
retry: crate::provider::RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: false,
retry_transport: false,
},
stream_idle_timeout: Duration::from_secs(5),
};
let client = RealtimeWebsocketClient::new(provider);
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let created = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
created,
RealtimeEvent::SessionUpdated {
session_id: "sess_v2".to_string(),
instructions: Some("backend prompt".to_string()),
}
);
let input_delta_event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
input_delta_event,
RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta {
delta: "delegate ".to_string(),
})
);
let input_delta_event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
input_delta_event,
RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta {
delta: "now".to_string(),
})
);
let assistant_item_added = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
assistant_item_added,
RealtimeEvent::ConversationItemAdded(json!({
"type": "conversation.item.added",
"item": {
"id": "assistant_message_1",
"type": "message",
"role": "assistant"
}
}))
);
let output_delta_event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
output_delta_event,
RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta {
delta: "working".to_string(),
})
);
let assistant_item_done = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
assistant_item_done,
RealtimeEvent::ConversationItemDone {
item_id: "assistant_message_1".to_string(),
}
);
let codex_item_added = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
codex_item_added,
RealtimeEvent::ConversationItemAdded(json!({
"type": "conversation.item.added",
"item": {
"id": "item_2",
"type": "function_call",
"name": "codex",
"call_id": "handoff_1",
"arguments": "{\"prompt\":\"delegate now\"}"
}
}))
);
let codex_transcript_delta = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
codex_transcript_delta,
RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta {
delta: "should not be included".to_string(),
})
);
let handoff_event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
handoff_event,
RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: "handoff_1".to_string(),
item_id: "item_2".to_string(),
input_transcript: "delegate now".to_string(),
active_transcript: vec![
RealtimeTranscriptEntry {
role: "user".to_string(),
text: "delegate now".to_string(),
},
RealtimeTranscriptEntry {
role: "assistant".to_string(),
text: "working".to_string(),
},
],
})
);
connection.close().await.expect("close");
server.await.expect("server task");
}
#[tokio::test]
async fn transcription_mode_session_update_omits_output_audio_and_instructions() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");