Add realtime transcription mode for websocket sessions (#14556)

- add experimental_realtime_ws_mode (conversational/transcription) and
plumb it into realtime conversation session config
- switch realtime websocket intent and session.update payload shape
based on mode
- update config schema and realtime/config tests

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-03-12 23:50:30 -07:00
committed by GitHub
parent eaf81d3f6f
commit 2253a9d1d7
9 changed files with 482 additions and 63 deletions

View File

@@ -7,6 +7,7 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
@@ -52,6 +53,16 @@ const REALTIME_V2_SESSION_TYPE: &str = "realtime";
const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex";
const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result.";
fn normalized_session_mode(
event_parser: RealtimeEventParser,
session_mode: RealtimeSessionMode,
) -> RealtimeSessionMode {
match event_parser {
RealtimeEventParser::V1 => RealtimeSessionMode::Conversational,
RealtimeEventParser::RealtimeV2 => session_mode,
}
}
struct WsStream {
tx_command: mpsc::Sender<WsCommand>,
pump_task: tokio::task::JoinHandle<()>,
@@ -289,12 +300,16 @@ impl RealtimeWebsocketWriter {
}
pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> {
let content_kind = match self.event_parser {
RealtimeEventParser::V1 => "text",
RealtimeEventParser::RealtimeV2 => "input_text",
};
self.send_json(RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::Message(ConversationMessageItem {
kind: "message".to_string(),
role: "user".to_string(),
content: vec![ConversationItemContent {
kind: "text".to_string(),
kind: content_kind.to_string(),
text,
}],
}),
@@ -326,34 +341,51 @@ impl RealtimeWebsocketWriter {
self.send_json(message).await
}
pub async fn send_session_update(&self, instructions: String) -> Result<(), ApiError> {
let (session_kind, tools) = match self.event_parser {
RealtimeEventParser::V1 => (REALTIME_V1_SESSION_TYPE.to_string(), None),
RealtimeEventParser::RealtimeV2 => (
REALTIME_V2_SESSION_TYPE.to_string(),
Some(vec![SessionFunctionTool {
kind: "function".to_string(),
name: REALTIME_V2_CODEX_TOOL_NAME.to_string(),
description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(),
parameters: json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Prompt text for the delegated Codex task."
}
},
"required": ["prompt"],
"additionalProperties": false
pub async fn send_session_update(
&self,
instructions: String,
session_mode: RealtimeSessionMode,
) -> Result<(), ApiError> {
let session_mode = normalized_session_mode(self.event_parser, session_mode);
let (session_kind, session_instructions, output_audio) = match session_mode {
RealtimeSessionMode::Conversational => {
let kind = match self.event_parser {
RealtimeEventParser::V1 => REALTIME_V1_SESSION_TYPE.to_string(),
RealtimeEventParser::RealtimeV2 => REALTIME_V2_SESSION_TYPE.to_string(),
};
(
kind,
Some(instructions),
Some(SessionAudioOutput {
voice: REALTIME_AUDIO_VOICE.to_string(),
}),
}]),
),
)
}
RealtimeSessionMode::Transcription => ("transcription".to_string(), None, None),
};
let tools = match self.event_parser {
RealtimeEventParser::RealtimeV2 => Some(vec![SessionFunctionTool {
kind: "function".to_string(),
name: REALTIME_V2_CODEX_TOOL_NAME.to_string(),
description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(),
parameters: json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Prompt text for the delegated Codex task."
}
},
"required": ["prompt"],
"additionalProperties": false
}),
}]),
RealtimeEventParser::V1 => None,
};
self.send_json(RealtimeOutboundMessage::SessionUpdate {
session: SessionUpdateSession {
kind: session_kind,
instructions,
instructions: session_instructions,
audio: SessionAudio {
input: SessionAudioInput {
format: SessionAudioFormat {
@@ -361,9 +393,7 @@ impl RealtimeWebsocketWriter {
rate: REALTIME_AUDIO_SAMPLE_RATE,
},
},
output: SessionAudioOutput {
voice: REALTIME_AUDIO_VOICE.to_string(),
},
output: output_audio,
},
tools,
},
@@ -514,6 +544,8 @@ impl RealtimeWebsocketClient {
self.provider.base_url.as_str(),
self.provider.query_params.as_ref(),
config.model.as_deref(),
config.event_parser,
config.session_mode,
)?;
let mut request = ws_url
@@ -555,7 +587,7 @@ impl RealtimeWebsocketClient {
);
connection
.writer
.send_session_update(config.instructions)
.send_session_update(config.instructions, config.session_mode)
.await?;
Ok(connection)
}
@@ -600,6 +632,8 @@ fn websocket_url_from_api_url(
api_url: &str,
query_params: Option<&HashMap<String, String>>,
model: Option<&str>,
event_parser: RealtimeEventParser,
_session_mode: RealtimeSessionMode,
) -> Result<Url, ApiError> {
let mut url = Url::parse(api_url)
.map_err(|err| ApiError::Stream(format!("failed to parse realtime api_url: {err}")))?;
@@ -619,9 +653,20 @@ fn websocket_url_from_api_url(
}
}
{
let intent = match event_parser {
RealtimeEventParser::V1 => Some("quicksilver"),
RealtimeEventParser::RealtimeV2 => None,
};
let has_extra_query_params = query_params.is_some_and(|query_params| {
query_params
.iter()
.any(|(key, _)| key != "intent" && !(key == "model" && model.is_some()))
});
if intent.is_some() || model.is_some() || has_extra_query_params {
let mut query = url.query_pairs_mut();
query.append_pair("intent", "quicksilver");
if let Some(intent) = intent {
query.append_pair("intent", intent);
}
if let Some(model) = model {
query.append_pair("model", model);
}
@@ -902,8 +947,14 @@ mod tests {
#[test]
fn websocket_url_from_http_base_defaults_to_ws_path() {
let url =
websocket_url_from_api_url("http://127.0.0.1:8011", None, None).expect("build ws url");
let url = websocket_url_from_api_url(
"http://127.0.0.1:8011",
None,
None,
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"ws://127.0.0.1:8011/v1/realtime?intent=quicksilver"
@@ -912,9 +963,14 @@ mod tests {
#[test]
fn websocket_url_from_ws_base_defaults_to_ws_path() {
let url =
websocket_url_from_api_url("wss://example.com", None, Some("realtime-test-model"))
.expect("build ws url");
let url = websocket_url_from_api_url(
"wss://example.com",
None,
Some("realtime-test-model"),
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"wss://example.com/v1/realtime?intent=quicksilver&model=realtime-test-model"
@@ -923,8 +979,14 @@ mod tests {
#[test]
fn websocket_url_from_v1_base_appends_realtime_path() {
let url = websocket_url_from_api_url("https://api.openai.com/v1", None, Some("snapshot"))
.expect("build ws url");
let url = websocket_url_from_api_url(
"https://api.openai.com/v1",
None,
Some("snapshot"),
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"wss://api.openai.com/v1/realtime?intent=quicksilver&model=snapshot"
@@ -933,9 +995,14 @@ mod tests {
#[test]
fn websocket_url_from_nested_v1_base_appends_realtime_path() {
let url =
websocket_url_from_api_url("https://example.com/openai/v1", None, Some("snapshot"))
.expect("build ws url");
let url = websocket_url_from_api_url(
"https://example.com/openai/v1",
None,
Some("snapshot"),
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"wss://example.com/openai/v1/realtime?intent=quicksilver&model=snapshot"
@@ -951,6 +1018,8 @@ mod tests {
("intent".to_string(), "ignored".to_string()),
])),
Some("snapshot"),
RealtimeEventParser::V1,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
@@ -959,6 +1028,54 @@ mod tests {
);
}
#[test]
fn websocket_url_v1_ignores_transcription_mode() {
let url = websocket_url_from_api_url(
"https://example.com",
None,
None,
RealtimeEventParser::V1,
RealtimeSessionMode::Transcription,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"wss://example.com/v1/realtime?intent=quicksilver"
);
}
#[test]
fn websocket_url_omits_intent_for_realtime_v2_conversational_mode() {
let url = websocket_url_from_api_url(
"https://example.com/v1/realtime?foo=bar",
Some(&HashMap::from([
("trace".to_string(), "1".to_string()),
("intent".to_string(), "ignored".to_string()),
])),
Some("snapshot"),
RealtimeEventParser::RealtimeV2,
RealtimeSessionMode::Conversational,
)
.expect("build ws url");
assert_eq!(
url.as_str(),
"wss://example.com/v1/realtime?foo=bar&model=snapshot&trace=1"
);
}
#[test]
fn websocket_url_omits_intent_for_realtime_v2_transcription_mode() {
let url = websocket_url_from_api_url(
"https://example.com",
None,
None,
RealtimeEventParser::RealtimeV2,
RealtimeSessionMode::Transcription,
)
.expect("build ws url");
assert_eq!(url.as_str(), "wss://example.com/v1/realtime");
}
#[tokio::test]
async fn e2e_connect_and_exchange_events_against_mock_ws_server() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
@@ -1124,6 +1241,7 @@ mod tests {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
@@ -1301,14 +1419,36 @@ mod tests {
assert_eq!(second_json["type"], "conversation.item.create");
assert_eq!(
second_json["item"]["type"],
Value::String("message".to_string())
);
assert_eq!(
second_json["item"]["content"][0]["type"],
Value::String("input_text".to_string())
);
assert_eq!(
second_json["item"]["content"][0]["text"],
Value::String("delegate this".to_string())
);
let third = ws
.next()
.await
.expect("third msg")
.expect("third msg ok")
.into_text()
.expect("text");
let third_json: Value = serde_json::from_str(&third).expect("json");
assert_eq!(third_json["type"], "conversation.item.create");
assert_eq!(
third_json["item"]["type"],
Value::String("function_call_output".to_string())
);
assert_eq!(
second_json["item"]["call_id"],
third_json["item"]["call_id"],
Value::String("call_1".to_string())
);
assert_eq!(
second_json["item"]["output"],
third_json["item"]["output"],
Value::String("delegated result".to_string())
);
});
@@ -1335,6 +1475,7 @@ mod tests {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
@@ -1355,6 +1496,10 @@ mod tests {
}
);
connection
.send_conversation_item_create("delegate this".to_string())
.await
.expect("send text item");
connection
.send_conversation_handoff_append("call_1".to_string(), "delegated result".to_string())
.await
@@ -1364,6 +1509,205 @@ mod tests {
server.await.expect("server task");
}
#[tokio::test]
async fn transcription_mode_session_update_omits_output_audio_and_instructions() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local addr");
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let mut ws = accept_async(stream).await.expect("accept ws");
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
assert_eq!(
first_json["session"]["type"],
Value::String("transcription".to_string())
);
assert!(first_json["session"].get("instructions").is_none());
assert!(first_json["session"]["audio"].get("output").is_none());
assert_eq!(
first_json["session"]["tools"][0]["name"],
Value::String("codex".to_string())
);
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_transcription"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
let second = ws
.next()
.await
.expect("second msg")
.expect("second msg ok")
.into_text()
.expect("text");
let second_json: Value = serde_json::from_str(&second).expect("json");
assert_eq!(second_json["type"], "input_audio_buffer.append");
});
let provider = Provider {
name: "test".to_string(),
base_url: format!("http://{addr}"),
query_params: Some(HashMap::new()),
headers: HeaderMap::new(),
retry: crate::provider::RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: false,
retry_transport: false,
},
stream_idle_timeout: Duration::from_secs(5),
};
let client = RealtimeWebsocketClient::new(provider);
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
session_mode: RealtimeSessionMode::Transcription,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let created = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
created,
RealtimeEvent::SessionUpdated {
session_id: "sess_transcription".to_string(),
instructions: None,
}
);
connection
.send_audio_frame(RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: Some(480),
})
.await
.expect("send audio");
connection.close().await.expect("close");
server.await.expect("server task");
}
#[tokio::test]
async fn v1_transcription_mode_is_treated_as_conversational() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local addr");
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let mut ws = accept_async(stream).await.expect("accept ws");
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
assert_eq!(
first_json["session"]["type"],
Value::String("quicksilver".to_string())
);
assert_eq!(
first_json["session"]["instructions"],
Value::String("backend prompt".to_string())
);
assert_eq!(
first_json["session"]["audio"]["output"]["voice"],
Value::String("fathom".to_string())
);
assert!(first_json["session"].get("tools").is_none());
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_v1_mode"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
});
let provider = Provider {
name: "test".to_string(),
base_url: format!("http://{addr}"),
query_params: Some(HashMap::new()),
headers: HeaderMap::new(),
retry: crate::provider::RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: false,
retry_transport: false,
},
stream_idle_timeout: Duration::from_secs(5),
};
let client = RealtimeWebsocketClient::new(provider);
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Transcription,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let created = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
created,
RealtimeEvent::SessionUpdated {
session_id: "sess_v1_mode".to_string(),
instructions: None,
}
);
connection.close().await.expect("close");
server.await.expect("server task");
}
#[tokio::test]
async fn send_does_not_block_while_next_event_waits_for_inbound_data() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
@@ -1427,6 +1771,7 @@ mod tests {
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),

View File

@@ -12,3 +12,4 @@ pub use methods::RealtimeWebsocketEvents;
pub use methods::RealtimeWebsocketWriter;
pub use protocol::RealtimeEventParser;
pub use protocol::RealtimeSessionConfig;
pub use protocol::RealtimeSessionMode;

View File

@@ -14,12 +14,19 @@ pub enum RealtimeEventParser {
RealtimeV2,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RealtimeSessionMode {
Conversational,
Transcription,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RealtimeSessionConfig {
pub instructions: String,
pub model: Option<String>,
pub session_id: Option<String>,
pub event_parser: RealtimeEventParser,
pub session_mode: RealtimeSessionMode,
}
#[derive(Debug, Clone, Serialize)]
@@ -42,7 +49,8 @@ pub(super) enum RealtimeOutboundMessage {
pub(super) struct SessionUpdateSession {
#[serde(rename = "type")]
pub(super) kind: String,
pub(super) instructions: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) instructions: Option<String>,
pub(super) audio: SessionAudio,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) tools: Option<Vec<SessionFunctionTool>>,
@@ -51,7 +59,8 @@ pub(super) struct SessionUpdateSession {
#[derive(Debug, Clone, Serialize)]
pub(super) struct SessionAudio {
pub(super) input: SessionAudioInput,
pub(super) output: SessionAudioOutput,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) output: Option<SessionAudioOutput>,
}
#[derive(Debug, Clone, Serialize)]

View File

@@ -29,6 +29,7 @@ pub use crate::endpoint::memories::MemoriesClient;
pub use crate::endpoint::models::ModelsClient;
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketClient;
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketConnection;
pub use crate::endpoint::responses::ResponsesClient;