Add codex tool support for realtime v2 handoff (#14554)

- Advertise a `codex` function tool in realtime v2 session updates.
- Emit handoff replies as `function_call_output` items while keeping v1
behavior unchanged.
- Split realtime event parsing into explicit v1/v2 modules with shared
common helpers.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-03-12 23:30:02 -07:00
committed by GitHub
parent 0c60eea4a5
commit eaf81d3f6f
6 changed files with 474 additions and 272 deletions

View File

@@ -1,5 +1,7 @@
use crate::endpoint::realtime_websocket::protocol::ConversationItem;
use crate::endpoint::realtime_websocket::protocol::ConversationFunctionCallOutputItem;
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload;
use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem;
use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
@@ -11,6 +13,7 @@ use crate::endpoint::realtime_websocket::protocol::SessionAudio;
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput;
use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
use crate::endpoint::realtime_websocket::protocol::parse_realtime_event;
use crate::error::ApiError;
@@ -21,6 +24,7 @@ use futures::SinkExt;
use futures::StreamExt;
use http::HeaderMap;
use http::HeaderValue;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -41,6 +45,13 @@ use tracing::trace;
use tungstenite::protocol::WebSocketConfig;
use url::Url;
const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
const REALTIME_AUDIO_VOICE: &str = "fathom";
const REALTIME_V1_SESSION_TYPE: &str = "quicksilver";
const REALTIME_V2_SESSION_TYPE: &str = "realtime";
const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex";
const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result.";
struct WsStream {
tx_command: mpsc::Sender<WsCommand>,
pump_task: tokio::task::JoinHandle<()>,
@@ -197,6 +208,7 @@ pub struct RealtimeWebsocketConnection {
pub struct RealtimeWebsocketWriter {
stream: Arc<WsStream>,
is_closed: Arc<AtomicBool>,
event_parser: RealtimeEventParser,
}
#[derive(Clone)]
@@ -258,6 +270,7 @@ impl RealtimeWebsocketConnection {
writer: RealtimeWebsocketWriter {
stream: Arc::clone(&stream),
is_closed: Arc::clone(&is_closed),
event_parser,
},
events: RealtimeWebsocketEvents {
rx_message: Arc::new(Mutex::new(rx_message)),
@@ -277,14 +290,14 @@ impl RealtimeWebsocketWriter {
pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> {
self.send_json(RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItem {
item: ConversationItemPayload::Message(ConversationMessageItem {
kind: "message".to_string(),
role: "user".to_string(),
content: vec![ConversationItemContent {
kind: "text".to_string(),
text,
}],
},
}),
})
.await
}
@@ -294,29 +307,65 @@ impl RealtimeWebsocketWriter {
handoff_id: String,
output_text: String,
) -> Result<(), ApiError> {
self.send_json(RealtimeOutboundMessage::ConversationHandoffAppend {
handoff_id,
output_text,
})
.await
let message = match self.event_parser {
RealtimeEventParser::V1 => RealtimeOutboundMessage::ConversationHandoffAppend {
handoff_id,
output_text,
},
RealtimeEventParser::RealtimeV2 => RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::FunctionCallOutput(
ConversationFunctionCallOutputItem {
kind: "function_call_output".to_string(),
call_id: handoff_id,
output: output_text,
},
),
},
};
self.send_json(message).await
}
pub async fn send_session_update(&self, instructions: String) -> Result<(), ApiError> {
let (session_kind, tools) = match self.event_parser {
RealtimeEventParser::V1 => (REALTIME_V1_SESSION_TYPE.to_string(), None),
RealtimeEventParser::RealtimeV2 => (
REALTIME_V2_SESSION_TYPE.to_string(),
Some(vec![SessionFunctionTool {
kind: "function".to_string(),
name: REALTIME_V2_CODEX_TOOL_NAME.to_string(),
description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(),
parameters: json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Prompt text for the delegated Codex task."
}
},
"required": ["prompt"],
"additionalProperties": false
}),
}]),
),
};
self.send_json(RealtimeOutboundMessage::SessionUpdate {
session: SessionUpdateSession {
kind: "quicksilver".to_string(),
kind: session_kind,
instructions,
audio: SessionAudio {
input: SessionAudioInput {
format: SessionAudioFormat {
kind: "audio/pcm".to_string(),
rate: 24_000,
rate: REALTIME_AUDIO_SAMPLE_RATE,
},
},
output: SessionAudioOutput {
voice: "fathom".to_string(),
voice: REALTIME_AUDIO_VOICE.to_string(),
},
},
tools,
},
})
.await
@@ -1195,6 +1244,126 @@ mod tests {
server.await.expect("server task");
}
#[tokio::test]
async fn realtime_v2_session_update_includes_codex_tool_and_handoff_output_item() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local addr");
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let mut ws = accept_async(stream).await.expect("accept ws");
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
assert_eq!(
first_json["session"]["type"],
Value::String("realtime".to_string())
);
assert_eq!(
first_json["session"]["tools"][0]["type"],
Value::String("function".to_string())
);
assert_eq!(
first_json["session"]["tools"][0]["name"],
Value::String("codex".to_string())
);
assert_eq!(
first_json["session"]["tools"][0]["parameters"]["required"],
json!(["prompt"])
);
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_v2", "instructions": "backend prompt"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
let second = ws
.next()
.await
.expect("second msg")
.expect("second msg ok")
.into_text()
.expect("text");
let second_json: Value = serde_json::from_str(&second).expect("json");
assert_eq!(second_json["type"], "conversation.item.create");
assert_eq!(
second_json["item"]["type"],
Value::String("function_call_output".to_string())
);
assert_eq!(
second_json["item"]["call_id"],
Value::String("call_1".to_string())
);
assert_eq!(
second_json["item"]["output"],
Value::String("delegated result".to_string())
);
});
let provider = Provider {
name: "test".to_string(),
base_url: format!("http://{addr}"),
query_params: Some(HashMap::new()),
headers: HeaderMap::new(),
retry: crate::provider::RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: false,
retry_transport: false,
},
stream_idle_timeout: Duration::from_secs(5),
};
let client = RealtimeWebsocketClient::new(provider);
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let created = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
created,
RealtimeEvent::SessionUpdated {
session_id: "sess_v2".to_string(),
instructions: Some("backend prompt".to_string()),
}
);
connection
.send_conversation_handoff_append("call_1".to_string(), "delegated result".to_string())
.await
.expect("send handoff output");
connection.close().await.expect("close");
server.await.expect("server task");
}
#[tokio::test]
async fn send_does_not_block_while_next_event_waits_for_inbound_data() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");

View File

@@ -1,5 +1,7 @@
pub mod methods;
pub mod protocol;
mod protocol_common;
mod protocol_v1;
mod protocol_v2;
pub use codex_protocol::protocol::RealtimeAudioFrame;

View File

@@ -1,3 +1,4 @@
use crate::endpoint::realtime_websocket::protocol_v1::parse_realtime_event_v1;
use crate::endpoint::realtime_websocket::protocol_v2::parse_realtime_event_v2;
pub use codex_protocol::protocol::RealtimeAudioFrame;
pub use codex_protocol::protocol::RealtimeEvent;
@@ -6,7 +7,6 @@ pub use codex_protocol::protocol::RealtimeTranscriptDelta;
pub use codex_protocol::protocol::RealtimeTranscriptEntry;
use serde::Serialize;
use serde_json::Value;
use tracing::debug;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RealtimeEventParser {
@@ -35,7 +35,7 @@ pub(super) enum RealtimeOutboundMessage {
#[serde(rename = "session.update")]
SessionUpdate { session: SessionUpdateSession },
#[serde(rename = "conversation.item.create")]
ConversationItemCreate { item: ConversationItem },
ConversationItemCreate { item: ConversationItemPayload },
}
#[derive(Debug, Clone, Serialize)]
@@ -44,6 +44,8 @@ pub(super) struct SessionUpdateSession {
pub(super) kind: String,
pub(super) instructions: String,
pub(super) audio: SessionAudio,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) tools: Option<Vec<SessionFunctionTool>>,
}
#[derive(Debug, Clone, Serialize)]
@@ -70,13 +72,28 @@ pub(super) struct SessionAudioOutput {
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct ConversationItem {
pub(super) struct ConversationMessageItem {
#[serde(rename = "type")]
pub(super) kind: String,
pub(super) role: String,
pub(super) content: Vec<ConversationItemContent>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub(super) enum ConversationItemPayload {
Message(ConversationMessageItem),
FunctionCallOutput(ConversationFunctionCallOutputItem),
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct ConversationFunctionCallOutputItem {
#[serde(rename = "type")]
pub(super) kind: String,
pub(super) call_id: String,
pub(super) output: String,
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct ConversationItemContent {
#[serde(rename = "type")]
@@ -84,6 +101,15 @@ pub(super) struct ConversationItemContent {
pub(super) text: String,
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct SessionFunctionTool {
#[serde(rename = "type")]
pub(super) kind: String,
pub(super) name: String,
pub(super) description: String,
pub(super) parameters: Value,
}
pub(super) fn parse_realtime_event(
payload: &str,
event_parser: RealtimeEventParser,
@@ -93,125 +119,3 @@ pub(super) fn parse_realtime_event(
RealtimeEventParser::RealtimeV2 => parse_realtime_event_v2(payload),
}
}
fn parse_realtime_event_v1(payload: &str) -> Option<RealtimeEvent> {
let parsed: Value = match serde_json::from_str(payload) {
Ok(msg) => msg,
Err(err) => {
debug!("failed to parse realtime event: {err}, data: {payload}");
return None;
}
};
let message_type = match parsed.get("type").and_then(Value::as_str) {
Some(message_type) => message_type,
None => {
debug!("received realtime event without type field: {payload}");
return None;
}
};
match message_type {
"session.updated" => {
let session_id = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("id"))
.and_then(Value::as_str)
.map(str::to_string);
let instructions = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("instructions"))
.and_then(Value::as_str)
.map(str::to_string);
session_id.map(|session_id| RealtimeEvent::SessionUpdated {
session_id,
instructions,
})
}
"conversation.output_audio.delta" => {
let data = parsed
.get("delta")
.and_then(Value::as_str)
.or_else(|| parsed.get("data").and_then(Value::as_str))
.map(str::to_string)?;
let sample_rate = parsed
.get("sample_rate")
.and_then(Value::as_u64)
.and_then(|v| u32::try_from(v).ok())?;
let num_channels = parsed
.get("channels")
.or_else(|| parsed.get("num_channels"))
.and_then(Value::as_u64)
.and_then(|v| u16::try_from(v).ok())?;
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data,
sample_rate,
num_channels,
samples_per_channel: parsed
.get("samples_per_channel")
.and_then(Value::as_u64)
.and_then(|v| u32::try_from(v).ok()),
}))
}
"conversation.input_transcript.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.output_transcript.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.item.added" => parsed
.get("item")
.cloned()
.map(RealtimeEvent::ConversationItemAdded),
"conversation.item.done" => parsed
.get("item")
.and_then(Value::as_object)
.and_then(|item| item.get("id"))
.and_then(Value::as_str)
.map(str::to_string)
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id }),
"conversation.handoff.requested" => {
let handoff_id = parsed
.get("handoff_id")
.and_then(Value::as_str)
.map(str::to_string)?;
let item_id = parsed
.get("item_id")
.and_then(Value::as_str)
.map(str::to_string)?;
let input_transcript = parsed
.get("input_transcript")
.and_then(Value::as_str)
.map(str::to_string)?;
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id,
item_id,
input_transcript,
active_transcript: Vec::new(),
}))
}
"error" => parsed
.get("message")
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| {
parsed
.get("error")
.and_then(Value::as_object)
.and_then(|error| error.get("message"))
.and_then(Value::as_str)
.map(str::to_string)
})
.or_else(|| parsed.get("error").map(std::string::ToString::to_string))
.map(RealtimeEvent::Error),
_ => {
debug!("received unsupported realtime event type: {message_type}, data: {payload}");
None
}
}
}

View File

@@ -0,0 +1,71 @@
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RealtimeTranscriptDelta;
use serde_json::Value;
use tracing::debug;
pub(super) fn parse_realtime_payload(payload: &str, parser_name: &str) -> Option<(Value, String)> {
let parsed: Value = match serde_json::from_str(payload) {
Ok(message) => message,
Err(err) => {
debug!("failed to parse {parser_name} event: {err}, data: {payload}");
return None;
}
};
let message_type = match parsed.get("type").and_then(Value::as_str) {
Some(message_type) => message_type.to_string(),
None => {
debug!("received {parser_name} event without type field: {payload}");
return None;
}
};
Some((parsed, message_type))
}
pub(super) fn parse_session_updated_event(parsed: &Value) -> Option<RealtimeEvent> {
let session_id = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("id"))
.and_then(Value::as_str)
.map(str::to_string)?;
let instructions = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("instructions"))
.and_then(Value::as_str)
.map(str::to_string);
Some(RealtimeEvent::SessionUpdated {
session_id,
instructions,
})
}
pub(super) fn parse_transcript_delta_event(
parsed: &Value,
field: &str,
) -> Option<RealtimeTranscriptDelta> {
parsed
.get(field)
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeTranscriptDelta { delta })
}
pub(super) fn parse_error_event(parsed: &Value) -> Option<RealtimeEvent> {
parsed
.get("message")
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| {
parsed
.get("error")
.and_then(Value::as_object)
.and_then(|error| error.get("message"))
.and_then(Value::as_str)
.map(str::to_string)
})
.or_else(|| parsed.get("error").map(ToString::to_string))
.map(RealtimeEvent::Error)
}

View File

@@ -0,0 +1,83 @@
use crate::endpoint::realtime_websocket::protocol_common::parse_error_event;
use crate::endpoint::realtime_websocket::protocol_common::parse_realtime_payload;
use crate::endpoint::realtime_websocket::protocol_common::parse_session_updated_event;
use crate::endpoint::realtime_websocket::protocol_common::parse_transcript_delta_event;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RealtimeHandoffRequested;
use serde_json::Value;
use tracing::debug;
pub(super) fn parse_realtime_event_v1(payload: &str) -> Option<RealtimeEvent> {
let (parsed, message_type) = parse_realtime_payload(payload, "realtime v1")?;
match message_type.as_str() {
"session.updated" => parse_session_updated_event(&parsed),
"conversation.output_audio.delta" => {
let data = parsed
.get("delta")
.and_then(Value::as_str)
.or_else(|| parsed.get("data").and_then(Value::as_str))
.map(str::to_string)?;
let sample_rate = parsed
.get("sample_rate")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok())?;
let num_channels = parsed
.get("channels")
.or_else(|| parsed.get("num_channels"))
.and_then(Value::as_u64)
.and_then(|value| u16::try_from(value).ok())?;
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data,
sample_rate,
num_channels,
samples_per_channel: parsed
.get("samples_per_channel")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok()),
}))
}
"conversation.input_transcript.delta" => {
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::InputTranscriptDelta)
}
"conversation.output_transcript.delta" => {
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::OutputTranscriptDelta)
}
"conversation.item.added" => parsed
.get("item")
.cloned()
.map(RealtimeEvent::ConversationItemAdded),
"conversation.item.done" => parsed
.get("item")
.and_then(Value::as_object)
.and_then(|item| item.get("id"))
.and_then(Value::as_str)
.map(str::to_string)
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id }),
"conversation.handoff.requested" => {
let handoff_id = parsed
.get("handoff_id")
.and_then(Value::as_str)
.map(str::to_string)?;
let item_id = parsed
.get("item_id")
.and_then(Value::as_str)
.map(str::to_string)?;
let input_transcript = parsed
.get("input_transcript")
.and_then(Value::as_str)
.map(str::to_string)?;
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id,
item_id,
input_transcript,
active_transcript: Vec::new(),
}))
}
"error" => parse_error_event(&parsed),
_ => {
debug!("received unsupported realtime v1 event type: {message_type}, data: {payload}");
None
}
}
}

View File

@@ -1,157 +1,130 @@
use crate::endpoint::realtime_websocket::protocol_common::parse_error_event;
use crate::endpoint::realtime_websocket::protocol_common::parse_realtime_payload;
use crate::endpoint::realtime_websocket::protocol_common::parse_session_updated_event;
use crate::endpoint::realtime_websocket::protocol_common::parse_transcript_delta_event;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RealtimeHandoffRequested;
use codex_protocol::protocol::RealtimeTranscriptDelta;
use serde_json::Map as JsonMap;
use serde_json::Value;
use tracing::debug;
const CODEX_TOOL_NAME: &str = "codex";
const DEFAULT_AUDIO_SAMPLE_RATE: u32 = 24_000;
const DEFAULT_AUDIO_CHANNELS: u16 = 1;
const TOOL_ARGUMENT_KEYS: [&str; 5] = ["input_transcript", "input", "text", "prompt", "query"];
pub(super) fn parse_realtime_event_v2(payload: &str) -> Option<RealtimeEvent> {
let parsed: Value = match serde_json::from_str(payload) {
Ok(msg) => msg,
Err(err) => {
debug!("failed to parse realtime v2 event: {err}, data: {payload}");
return None;
}
};
let (parsed, message_type) = parse_realtime_payload(payload, "realtime v2")?;
let message_type = match parsed.get("type").and_then(Value::as_str) {
Some(message_type) => message_type,
None => {
debug!("received realtime v2 event without type field: {payload}");
return None;
match message_type.as_str() {
"session.updated" => parse_session_updated_event(&parsed),
"response.output_audio.delta" => parse_output_audio_delta_event(&parsed),
"conversation.item.input_audio_transcription.delta" => {
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::InputTranscriptDelta)
}
};
match message_type {
"session.updated" => {
let session_id = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("id"))
.and_then(Value::as_str)
.map(str::to_string);
let instructions = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("instructions"))
.and_then(Value::as_str)
.map(str::to_string);
session_id.map(|session_id| RealtimeEvent::SessionUpdated {
session_id,
instructions,
})
"conversation.item.input_audio_transcription.completed" => {
parse_transcript_delta_event(&parsed, "transcript")
.map(RealtimeEvent::InputTranscriptDelta)
}
"response.output_audio.delta" => {
let data = parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)?;
let sample_rate = parsed
.get("sample_rate")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok())
.unwrap_or(24_000);
let num_channels = parsed
.get("channels")
.or_else(|| parsed.get("num_channels"))
.and_then(Value::as_u64)
.and_then(|value| u16::try_from(value).ok())
.unwrap_or(1);
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data,
sample_rate,
num_channels,
samples_per_channel: parsed
.get("samples_per_channel")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok()),
}))
"response.output_text.delta" | "response.output_audio_transcript.delta" => {
parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::OutputTranscriptDelta)
}
"conversation.item.input_audio_transcription.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.item.input_audio_transcription.completed" => parsed
.get("transcript")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"response.output_text.delta" | "response.output_audio_transcript.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.item.added" => parsed
.get("item")
.cloned()
.map(RealtimeEvent::ConversationItemAdded),
"conversation.item.done" => {
let item = parsed.get("item")?.as_object()?;
let item_type = item.get("type").and_then(Value::as_str);
let item_name = item.get("name").and_then(Value::as_str);
if item_type == Some("function_call") && item_name == Some("codex") {
let call_id = item
.get("call_id")
.and_then(Value::as_str)
.or_else(|| item.get("id").and_then(Value::as_str))?;
let item_id = item
.get("id")
.and_then(Value::as_str)
.unwrap_or(call_id)
.to_string();
let arguments = item.get("arguments").and_then(Value::as_str).unwrap_or("");
let mut input_transcript = String::new();
if !arguments.is_empty() {
if let Ok(arguments_json) = serde_json::from_str::<Value>(arguments)
&& let Some(arguments_object) = arguments_json.as_object()
{
for key in ["input_transcript", "input", "text", "prompt", "query"] {
if let Some(value) = arguments_object.get(key).and_then(Value::as_str) {
let trimmed = value.trim();
if !trimmed.is_empty() {
input_transcript = trimmed.to_string();
break;
}
}
}
}
if input_transcript.is_empty() {
input_transcript = arguments.to_string();
}
}
return Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: call_id.to_string(),
item_id,
input_transcript,
active_transcript: Vec::new(),
}));
}
item.get("id")
.and_then(Value::as_str)
.map(str::to_string)
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id })
}
"error" => parsed
.get("message")
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| {
parsed
.get("error")
.and_then(Value::as_object)
.and_then(|error| error.get("message"))
.and_then(Value::as_str)
.map(str::to_string)
})
.or_else(|| parsed.get("error").map(ToString::to_string))
.map(RealtimeEvent::Error),
"conversation.item.done" => parse_conversation_item_done_event(&parsed),
"error" => parse_error_event(&parsed),
_ => {
debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}");
None
}
}
}
fn parse_output_audio_delta_event(parsed: &Value) -> Option<RealtimeEvent> {
let data = parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)?;
let sample_rate = parsed
.get("sample_rate")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok())
.unwrap_or(DEFAULT_AUDIO_SAMPLE_RATE);
let num_channels = parsed
.get("channels")
.or_else(|| parsed.get("num_channels"))
.and_then(Value::as_u64)
.and_then(|value| u16::try_from(value).ok())
.unwrap_or(DEFAULT_AUDIO_CHANNELS);
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data,
sample_rate,
num_channels,
samples_per_channel: parsed
.get("samples_per_channel")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok()),
}))
}
fn parse_conversation_item_done_event(parsed: &Value) -> Option<RealtimeEvent> {
let item = parsed.get("item")?.as_object()?;
if let Some(handoff) = parse_handoff_requested_event(item) {
return Some(handoff);
}
item.get("id")
.and_then(Value::as_str)
.map(str::to_string)
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id })
}
fn parse_handoff_requested_event(item: &JsonMap<String, Value>) -> Option<RealtimeEvent> {
let item_type = item.get("type").and_then(Value::as_str);
let item_name = item.get("name").and_then(Value::as_str);
if item_type != Some("function_call") || item_name != Some(CODEX_TOOL_NAME) {
return None;
}
let call_id = item
.get("call_id")
.and_then(Value::as_str)
.or_else(|| item.get("id").and_then(Value::as_str))?;
let item_id = item
.get("id")
.and_then(Value::as_str)
.unwrap_or(call_id)
.to_string();
let arguments = item.get("arguments").and_then(Value::as_str).unwrap_or("");
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: call_id.to_string(),
item_id,
input_transcript: extract_input_transcript(arguments),
active_transcript: Vec::new(),
}))
}
fn extract_input_transcript(arguments: &str) -> String {
if arguments.is_empty() {
return String::new();
}
if let Ok(arguments_json) = serde_json::from_str::<Value>(arguments)
&& let Some(arguments_object) = arguments_json.as_object()
{
for key in TOOL_ARGUMENT_KEYS {
if let Some(value) = arguments_object.get(key).and_then(Value::as_str) {
let trimmed = value.trim();
if !trimmed.is_empty() {
return trimmed.to_string();
}
}
}
}
arguments.to_string()
}