mirror of
https://github.com/openai/codex.git
synced 2026-04-27 09:51:03 +03:00
Queue Realtime V2 response.create while active (#17306)
Builds on #17264.

- queues Realtime V2 `response.create` while an active response is open, then flushes it after `response.done` or `response.cancelled`
- requests `response.create` after background agent final output and steering acknowledgements
- adds app-server integration coverage for all `response.create` paths

Validation:

- `just fmt`
- `cargo check -p codex-app-server --tests`
- `git diff --check`
- CI green

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -445,6 +445,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
)
|
||||
.await;
|
||||
}
|
||||
RealtimeEvent::ResponseCreated(_) => {}
|
||||
RealtimeEvent::ResponseCancelled(event) => {
|
||||
let notification = ThreadRealtimeItemAddedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -459,6 +460,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
))
|
||||
.await;
|
||||
}
|
||||
RealtimeEvent::ResponseDone(_) => {}
|
||||
RealtimeEvent::ConversationItemAdded(item) => {
|
||||
let notification = ThreadRealtimeItemAddedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
|
||||
@@ -71,6 +71,8 @@ use wiremock::matchers::path_regex;
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const V2_STEERING_ACKNOWLEDGEMENT: &str =
|
||||
"This was sent to steer the previous background agent task.";
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum StartupContextConfig<'a> {
|
||||
@@ -329,6 +331,8 @@ impl RealtimeE2eHarness {
|
||||
read_notification(&mut self.mcp, method).await
|
||||
}
|
||||
|
||||
/// Returns the nth JSON message app-server wrote to the fake Realtime API
|
||||
/// sideband websocket.
|
||||
async fn sideband_outbound_request(&self, request_index: usize) -> Value {
|
||||
self.realtime_server
|
||||
.wait_for_request(/*connection_index*/ 0, request_index)
|
||||
@@ -1204,6 +1208,169 @@ async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Resu
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Regression coverage for Realtime V2's single-active-response rule.
|
||||
///
|
||||
/// The Realtime API rejects a new `response.create` while a default response is
|
||||
/// still active, so the input task should queue the second create and flush it
|
||||
/// only after the server sends `response.done` for the active response.
|
||||
#[tokio::test]
|
||||
async fn webrtc_v2_queues_text_response_create_while_response_is_active() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
// Phase 1: script a server-side response that becomes active after the first
|
||||
// user text turn, then finishes only after a later audio input.
|
||||
let mut harness = RealtimeE2eHarness::new(
|
||||
RealtimeTestVersion::V2,
|
||||
no_main_loop_responses(),
|
||||
realtime_sideband(vec![realtime_sideband_connection(vec![
|
||||
vec![session_updated("sess_v2_response_queue")],
|
||||
vec![],
|
||||
vec![
|
||||
json!({
|
||||
"type": "response.created",
|
||||
"response": { "id": "resp_active" }
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_text.delta",
|
||||
"delta": "active response started"
|
||||
}),
|
||||
],
|
||||
vec![],
|
||||
vec![json!({
|
||||
"type": "response.done",
|
||||
"response": { "id": "resp_active" }
|
||||
})],
|
||||
vec![],
|
||||
])]),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
|
||||
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
|
||||
|
||||
// From here on, `sideband_outbound_request(n)` reads outbound messages to
|
||||
// the fake Realtime API sideband websocket. These are not client-facing
|
||||
// notifications; they are the protocol frames app-server sends upstream.
|
||||
assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?;
|
||||
|
||||
// Phase 2: send the first text turn. It is safe to emit `response.create`
|
||||
// immediately because no default response is active yet.
|
||||
let thread_id = started.started.thread_id.clone();
|
||||
harness.append_text(thread_id.clone(), "first").await?;
|
||||
assert_v2_user_text_item(
|
||||
&harness.sideband_outbound_request(/*request_index*/ 1).await,
|
||||
"first",
|
||||
);
|
||||
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 2).await);
|
||||
let transcript = harness
|
||||
.read_notification::<ThreadRealtimeTranscriptUpdatedNotification>(
|
||||
"thread/realtime/transcriptUpdated",
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(transcript.text, "active response started");
|
||||
|
||||
// Phase 3: send a second text turn while `resp_active` is still open. The
|
||||
// user message must reach realtime, but `response.create` must not be sent
|
||||
// yet or the Realtime API rejects it as an active-response conflict.
|
||||
harness.append_text(thread_id.clone(), "second").await?;
|
||||
assert_v2_user_text_item(
|
||||
&harness.sideband_outbound_request(/*request_index*/ 3).await,
|
||||
"second",
|
||||
);
|
||||
|
||||
// Phase 4: the audio input causes the scripted sideband stream to send
|
||||
// `response.done`, which clears the active response and flushes the queued
|
||||
// `response.create` for the second text turn.
|
||||
harness.append_audio(thread_id).await?;
|
||||
|
||||
// This is the negative check: if the second text turn had emitted
|
||||
// `response.create` immediately, request 4 would be that create instead of
|
||||
// the audio append.
|
||||
let audio = harness.sideband_outbound_request(/*request_index*/ 4).await;
|
||||
assert_eq!(audio["type"], "input_audio_buffer.append");
|
||||
assert_eq!(audio["audio"], "BQYH");
|
||||
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 5).await);
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Regression coverage for the same queued `response.create` path when the
|
||||
/// active Realtime V2 response is cancelled instead of completed.
|
||||
///
|
||||
/// `response.cancelled` should clear the active-response guard exactly like
|
||||
/// `response.done`, so a text turn queued during the active response still gets
|
||||
/// one deferred `response.create`.
|
||||
#[tokio::test]
|
||||
async fn webrtc_v2_flushes_queued_text_response_create_when_response_is_cancelled() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
// Phase 1: script a server-side response that becomes active after the first
|
||||
// text turn, then is cancelled only after a later audio input.
|
||||
let mut harness = RealtimeE2eHarness::new(
|
||||
RealtimeTestVersion::V2,
|
||||
no_main_loop_responses(),
|
||||
realtime_sideband(vec![realtime_sideband_connection(vec![
|
||||
vec![session_updated("sess_v2_response_cancel_queue")],
|
||||
vec![],
|
||||
vec![json!({
|
||||
"type": "response.created",
|
||||
"response": { "id": "resp_cancelled" }
|
||||
})],
|
||||
vec![],
|
||||
vec![json!({
|
||||
"type": "response.cancelled",
|
||||
"response": { "id": "resp_cancelled" }
|
||||
})],
|
||||
vec![],
|
||||
])]),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
|
||||
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
|
||||
assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?;
|
||||
|
||||
// Phase 2: send the first text turn. It is safe to emit `response.create`
|
||||
// immediately because no default response is active yet.
|
||||
let thread_id = started.started.thread_id.clone();
|
||||
harness.append_text(thread_id.clone(), "first").await?;
|
||||
assert_v2_user_text_item(
|
||||
&harness.sideband_outbound_request(/*request_index*/ 1).await,
|
||||
"first",
|
||||
);
|
||||
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 2).await);
|
||||
|
||||
// Phase 3: send a second text turn while `resp_cancelled` is still open.
|
||||
// The user message must reach realtime, but `response.create` stays queued.
|
||||
harness.append_text(thread_id.clone(), "second").await?;
|
||||
assert_v2_user_text_item(
|
||||
&harness.sideband_outbound_request(/*request_index*/ 3).await,
|
||||
"second",
|
||||
);
|
||||
|
||||
// Phase 4: the audio input causes the scripted sideband stream to send
|
||||
// `response.cancelled`, which clears the active response and flushes the
|
||||
// queued `response.create` for the second text turn.
|
||||
harness.append_audio(thread_id).await?;
|
||||
|
||||
// This is the negative check: if the second text turn had emitted
|
||||
// `response.create` immediately, request 4 would be that create instead of
|
||||
// the audio append.
|
||||
let audio = harness.sideband_outbound_request(/*request_index*/ 4).await;
|
||||
assert_eq!(audio["type"], "input_audio_buffer.append");
|
||||
assert_eq!(audio["audio"], "BQYH");
|
||||
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 5).await);
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Regression coverage for the Realtime V2 background-agent final-output path.
|
||||
///
|
||||
/// Once the background agent finishes, app-server sends the final function-call
|
||||
/// output to realtime and then requests a new `response.create` so realtime can
|
||||
/// react to that final output.
|
||||
#[tokio::test]
|
||||
async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_output() -> Result<()>
|
||||
{
|
||||
@@ -1223,6 +1390,7 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out
|
||||
],
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
])]),
|
||||
)
|
||||
.await?;
|
||||
@@ -1240,8 +1408,8 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out
|
||||
.await?;
|
||||
assert_eq!(turn_completed.thread_id, harness.thread_id);
|
||||
|
||||
// Phase 3: assert the delegated prompt went to Responses and the result returned as exactly one
|
||||
// v2 function-call output event on the sideband.
|
||||
// Phase 3: assert the delegated prompt went to Responses and the result
|
||||
// returned as exactly one v2 function-call output event on the sideband.
|
||||
let requests = harness.main_loop_responses_requests().await?;
|
||||
assert_eq!(requests.len(), 1);
|
||||
assert!(
|
||||
@@ -1260,6 +1428,99 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out
|
||||
1
|
||||
);
|
||||
|
||||
// Phase 4: after the final function-call output, realtime needs an explicit
|
||||
// `response.create` to produce the next user-visible response.
|
||||
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 3).await);
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Regression coverage for Realtime V2 steering while a background-agent task is
|
||||
/// already active.
|
||||
///
|
||||
/// The second background-agent tool call is treated as guidance for the active
|
||||
/// task. App-server acknowledges that steering message to realtime and then
|
||||
/// emits `response.create` so realtime can speak that acknowledgement.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn webrtc_v2_background_agent_steering_ack_requests_response_create() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
// Phase 1: gate the delegated Responses turn from the first tool call so
|
||||
// the background-agent handoff stays active while realtime sends a second
|
||||
// tool call that should steer the active task.
|
||||
let main_loop_responses_server = responses::start_mock_server().await;
|
||||
let (gate_completed_tx, gate_completed_rx) = mpsc::channel();
|
||||
let gated_response = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "first task finished"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(GatedSseResponse {
|
||||
gate_rx: Mutex::new(Some(gate_completed_rx)),
|
||||
response: gated_response,
|
||||
})
|
||||
.expect(2)
|
||||
.mount(&main_loop_responses_server)
|
||||
.await;
|
||||
|
||||
let mut harness = RealtimeE2eHarness::new_with_main_loop_responses_server(
|
||||
RealtimeTestVersion::V2,
|
||||
main_loop_responses_server,
|
||||
realtime_sideband(vec![realtime_sideband_connection(vec![
|
||||
vec![
|
||||
session_updated("sess_v2_steering_ack"),
|
||||
v2_background_agent_tool_call("call_active", "start a task"),
|
||||
v2_background_agent_tool_call("call_steer", "steer the active task"),
|
||||
],
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
])]),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
|
||||
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
|
||||
assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?;
|
||||
let turn_started = harness
|
||||
.read_notification::<TurnStartedNotification>("turn/started")
|
||||
.await?;
|
||||
assert_eq!(turn_started.thread_id, harness.thread_id);
|
||||
|
||||
// Phase 2: the second tool call happens while `call_active` is still
|
||||
// running, so app-server sends a steering acknowledgement as a function-call
|
||||
// output for the second call.
|
||||
assert_v2_function_call_output(
|
||||
&harness.sideband_outbound_request(/*request_index*/ 1).await,
|
||||
"call_steer",
|
||||
V2_STEERING_ACKNOWLEDGEMENT,
|
||||
);
|
||||
|
||||
// Phase 3: realtime needs a `response.create` after the steering
|
||||
// acknowledgement so it can surface that acknowledgement to the user.
|
||||
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 2).await);
|
||||
|
||||
// Phase 4: release the gated delegated turn. Codex should then continue
|
||||
// the same run with the steering text included in the follow-up Responses
|
||||
// request, proving realtime did not merely acknowledge and drop it.
|
||||
let _ = gate_completed_tx.send(());
|
||||
let turn_completed = harness
|
||||
.read_notification::<TurnCompletedNotification>("turn/completed")
|
||||
.await?;
|
||||
assert_eq!(turn_completed.thread_id, harness.thread_id);
|
||||
|
||||
let requests = harness.main_loop_responses_requests().await?;
|
||||
assert_eq!(requests.len(), 2);
|
||||
assert!(
|
||||
response_request_contains_text(&requests[1], "steer the active task"),
|
||||
"follow-up Responses request should contain steering prompt: {}",
|
||||
requests[1]
|
||||
);
|
||||
|
||||
harness.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1714,6 +1975,32 @@ fn assert_v2_progress_update(request: &Value, expected_text: &str) {
|
||||
);
|
||||
}
|
||||
|
||||
fn assert_v2_user_text_item(request: &Value, expected_text: &str) {
|
||||
assert_eq!(
|
||||
request,
|
||||
&json!({
|
||||
"type": "conversation.item.create",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "user",
|
||||
"content": [{
|
||||
"type": "input_text",
|
||||
"text": expected_text
|
||||
}]
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
fn assert_v2_response_create(request: &Value) {
|
||||
assert_eq!(
|
||||
request,
|
||||
&json!({
|
||||
"type": "response.create"
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
fn assert_v1_session_update(request: &Value) -> Result<()> {
|
||||
assert_eq!(request["type"].as_str(), Some("session.update"));
|
||||
assert_eq!(request["session"]["type"].as_str(), Some("quicksilver"));
|
||||
|
||||
@@ -419,7 +419,9 @@ impl RealtimeWebsocketEvents {
|
||||
}
|
||||
RealtimeEvent::SessionUpdated { .. }
|
||||
| RealtimeEvent::AudioOut(_)
|
||||
| RealtimeEvent::ResponseCreated(_)
|
||||
| RealtimeEvent::ResponseCancelled(_)
|
||||
| RealtimeEvent::ResponseDone(_)
|
||||
| RealtimeEvent::ConversationItemAdded(_)
|
||||
| RealtimeEvent::ConversationItemDone { .. }
|
||||
| RealtimeEvent::Error(_) => {}
|
||||
@@ -724,6 +726,8 @@ mod tests {
|
||||
use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use codex_protocol::protocol::RealtimeInputAudioSpeechStarted;
|
||||
use codex_protocol::protocol::RealtimeResponseCancelled;
|
||||
use codex_protocol::protocol::RealtimeResponseCreated;
|
||||
use codex_protocol::protocol::RealtimeResponseDone;
|
||||
use codex_protocol::protocol::RealtimeVoice;
|
||||
use http::HeaderValue;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -999,7 +1003,9 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
|
||||
None
|
||||
Some(RealtimeEvent::ResponseDone(RealtimeResponseDone {
|
||||
response_id: None
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1013,10 +1019,9 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
|
||||
Some(RealtimeEvent::ConversationItemAdded(json!({
|
||||
"type": "response.created",
|
||||
"response": {"id": "resp_created_1"}
|
||||
})))
|
||||
Some(RealtimeEvent::ResponseCreated(RealtimeResponseCreated {
|
||||
response_id: Some("resp_created_1".to_string())
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,8 @@ use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use codex_protocol::protocol::RealtimeInputAudioSpeechStarted;
|
||||
use codex_protocol::protocol::RealtimeResponseCancelled;
|
||||
use codex_protocol::protocol::RealtimeResponseCreated;
|
||||
use codex_protocol::protocol::RealtimeResponseDone;
|
||||
use serde_json::Map as JsonMap;
|
||||
use serde_json::Value;
|
||||
use tracing::debug;
|
||||
@@ -47,23 +49,17 @@ pub(super) fn parse_realtime_event_v2(payload: &str) -> Option<RealtimeEvent> {
|
||||
.cloned()
|
||||
.map(RealtimeEvent::ConversationItemAdded),
|
||||
"conversation.item.done" => parse_conversation_item_done_event(&parsed),
|
||||
"response.created" => Some(RealtimeEvent::ConversationItemAdded(parsed)),
|
||||
"response.created" => Some(RealtimeEvent::ResponseCreated(RealtimeResponseCreated {
|
||||
response_id: parse_response_event_response_id(&parsed),
|
||||
})),
|
||||
"response.cancelled" => Some(RealtimeEvent::ResponseCancelled(
|
||||
RealtimeResponseCancelled {
|
||||
response_id: parsed
|
||||
.get("response")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|response| response.get("id"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.or_else(|| {
|
||||
parsed
|
||||
.get("response_id")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
}),
|
||||
response_id: parse_response_event_response_id(&parsed),
|
||||
},
|
||||
)),
|
||||
"response.done" => Some(RealtimeEvent::ResponseDone(RealtimeResponseDone {
|
||||
response_id: parse_response_event_response_id(&parsed),
|
||||
})),
|
||||
"error" => parse_error_event(&parsed),
|
||||
_ => {
|
||||
debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}");
|
||||
@@ -72,6 +68,21 @@ pub(super) fn parse_realtime_event_v2(payload: &str) -> Option<RealtimeEvent> {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_response_event_response_id(parsed: &Value) -> Option<String> {
|
||||
parsed
|
||||
.get("response")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|response| response.get("id"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.or_else(|| {
|
||||
parsed
|
||||
.get("response_id")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_output_audio_delta_event(parsed: &Value) -> Option<RealtimeEvent> {
|
||||
let data = parsed
|
||||
.get("delta")
|
||||
|
||||
@@ -68,6 +68,8 @@ const REALTIME_V2_PROGRESS_UPDATE_SUFFIX: &str =
|
||||
"\n\nUpdate from background agent (task hasn't finished yet):";
|
||||
const REALTIME_V2_STEER_ACKNOWLEDGEMENT: &str =
|
||||
"This was sent to steer the previous background agent task.";
|
||||
const REALTIME_ACTIVE_RESPONSE_ERROR_PREFIX: &str =
|
||||
"Conversation already has an active response in progress:";
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
enum RealtimeConversationEnd {
|
||||
@@ -117,6 +119,68 @@ struct OutputAudioState {
|
||||
audio_end_ms: u32,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct RealtimeResponseCreateQueue {
|
||||
active_default_response: bool,
|
||||
pending_create: bool,
|
||||
}
|
||||
|
||||
impl RealtimeResponseCreateQueue {
|
||||
async fn request_create(
|
||||
&mut self,
|
||||
writer: &RealtimeWebsocketWriter,
|
||||
events_tx: &Sender<RealtimeEvent>,
|
||||
reason: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
if self.active_default_response {
|
||||
self.pending_create = true;
|
||||
return Ok(());
|
||||
}
|
||||
self.send_create_now(writer, events_tx, reason).await
|
||||
}
|
||||
|
||||
fn mark_started(&mut self) {
|
||||
self.active_default_response = true;
|
||||
}
|
||||
|
||||
async fn mark_finished(
|
||||
&mut self,
|
||||
writer: &RealtimeWebsocketWriter,
|
||||
events_tx: &Sender<RealtimeEvent>,
|
||||
reason: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
self.active_default_response = false;
|
||||
if !self.pending_create {
|
||||
return Ok(());
|
||||
}
|
||||
self.pending_create = false;
|
||||
self.send_create_now(writer, events_tx, reason).await
|
||||
}
|
||||
|
||||
async fn send_create_now(
|
||||
&mut self,
|
||||
writer: &RealtimeWebsocketWriter,
|
||||
events_tx: &Sender<RealtimeEvent>,
|
||||
reason: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
if let Err(err) = writer.send_response_create().await {
|
||||
let mapped_error = map_api_error(err);
|
||||
let error_message = mapped_error.to_string();
|
||||
if error_message.starts_with(REALTIME_ACTIVE_RESPONSE_ERROR_PREFIX) {
|
||||
warn!("realtime response.create raced an active response; deferring");
|
||||
self.active_default_response = true;
|
||||
self.pending_create = true;
|
||||
return Ok(());
|
||||
}
|
||||
warn!("failed to send {reason} response.create: {mapped_error}");
|
||||
let _ = events_tx.send(RealtimeEvent::Error(error_message)).await;
|
||||
return Err(mapped_error.into());
|
||||
}
|
||||
self.active_default_response = true;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct RealtimeInputTask {
|
||||
writer: RealtimeWebsocketWriter,
|
||||
events: RealtimeWebsocketEvents,
|
||||
@@ -871,12 +935,19 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut output_audio_state: Option<OutputAudioState> = None;
|
||||
let mut response_create_queue = RealtimeResponseCreateQueue::default();
|
||||
|
||||
loop {
|
||||
let result = tokio::select! {
|
||||
// Text typed by the user that should be sent into realtime.
|
||||
user_text = user_text_rx.recv() => {
|
||||
handle_user_text_input(user_text, &writer, &events_tx, session_kind)
|
||||
handle_user_text_input(
|
||||
user_text,
|
||||
&writer,
|
||||
&events_tx,
|
||||
session_kind,
|
||||
&mut response_create_queue,
|
||||
)
|
||||
.await
|
||||
}
|
||||
// Background agent progress or final output that should be sent back to realtime.
|
||||
@@ -887,6 +958,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
&events_tx,
|
||||
&handoff_state,
|
||||
event_parser,
|
||||
&mut response_create_queue,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -899,6 +971,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
&handoff_state,
|
||||
session_kind,
|
||||
&mut output_audio_state,
|
||||
&mut response_create_queue,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -920,6 +993,7 @@ async fn handle_user_text_input(
|
||||
writer: &RealtimeWebsocketWriter,
|
||||
events_tx: &Sender<RealtimeEvent>,
|
||||
session_kind: RealtimeSessionKind,
|
||||
response_create_queue: &mut RealtimeResponseCreateQueue,
|
||||
) -> anyhow::Result<()> {
|
||||
let text = text.context("user text input channel closed")?;
|
||||
|
||||
@@ -934,14 +1008,9 @@ async fn handle_user_text_input(
|
||||
match session_kind {
|
||||
RealtimeSessionKind::V1 => {}
|
||||
RealtimeSessionKind::V2 => {
|
||||
if let Err(err) = writer.send_response_create().await {
|
||||
let mapped_error = map_api_error(err);
|
||||
warn!("failed to send text response.create: {mapped_error}");
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
return Err(mapped_error.into());
|
||||
}
|
||||
response_create_queue
|
||||
.request_create(writer, events_tx, "text")
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -953,6 +1022,7 @@ async fn handle_handoff_output(
|
||||
events_tx: &Sender<RealtimeEvent>,
|
||||
handoff_state: &RealtimeHandoffState,
|
||||
event_parser: RealtimeEventParser,
|
||||
response_create_queue: &mut RealtimeResponseCreateQueue,
|
||||
) -> anyhow::Result<()> {
|
||||
let handoff_output = handoff_output.context("handoff output channel closed")?;
|
||||
|
||||
@@ -1000,7 +1070,9 @@ async fn handle_handoff_output(
|
||||
{
|
||||
Err(err)
|
||||
} else {
|
||||
writer.send_response_create().await
|
||||
return response_create_queue
|
||||
.request_create(writer, events_tx, "handoff")
|
||||
.await;
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -1023,6 +1095,7 @@ async fn handle_realtime_server_event(
|
||||
handoff_state: &RealtimeHandoffState,
|
||||
session_kind: RealtimeSessionKind,
|
||||
output_audio_state: &mut Option<OutputAudioState>,
|
||||
response_create_queue: &mut RealtimeResponseCreateQueue,
|
||||
) -> anyhow::Result<()> {
|
||||
let event = match event {
|
||||
Ok(Some(event)) => event,
|
||||
@@ -1079,8 +1152,35 @@ async fn handle_realtime_server_event(
|
||||
}
|
||||
false
|
||||
}
|
||||
RealtimeEvent::ResponseCreated(_) => {
|
||||
match session_kind {
|
||||
RealtimeSessionKind::V1 => {}
|
||||
RealtimeSessionKind::V2 => response_create_queue.mark_started(),
|
||||
}
|
||||
false
|
||||
}
|
||||
RealtimeEvent::ResponseCancelled(_) => {
|
||||
*output_audio_state = None;
|
||||
match session_kind {
|
||||
RealtimeSessionKind::V1 => {}
|
||||
RealtimeSessionKind::V2 => {
|
||||
response_create_queue
|
||||
.mark_finished(writer, events_tx, "deferred")
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
RealtimeEvent::ResponseDone(_) => {
|
||||
*output_audio_state = None;
|
||||
match session_kind {
|
||||
RealtimeSessionKind::V1 => {}
|
||||
RealtimeSessionKind::V2 => {
|
||||
response_create_queue
|
||||
.mark_finished(writer, events_tx, "deferred")
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
RealtimeEvent::HandoffRequested(handoff) => {
|
||||
@@ -1111,16 +1211,9 @@ async fn handle_realtime_server_event(
|
||||
.await;
|
||||
return Err(mapped_error.into());
|
||||
}
|
||||
if let Err(err) = writer.send_response_create().await {
|
||||
let mapped_error = map_api_error(err);
|
||||
warn!(
|
||||
"failed to send handoff steering response.create: {mapped_error}"
|
||||
);
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
return Err(mapped_error.into());
|
||||
}
|
||||
response_create_queue
|
||||
.request_create(writer, events_tx, "handoff steering")
|
||||
.await?;
|
||||
}
|
||||
None => {
|
||||
*handoff_state.last_output_text.lock().await = None;
|
||||
|
||||
@@ -313,6 +313,16 @@ pub struct RealtimeResponseCancelled {
|
||||
pub response_id: Option<String>,
|
||||
}
|
||||
|
||||
// Payload of `RealtimeEvent::ResponseCreated`, produced by the realtime v2
// parser for `response.created` events.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct RealtimeResponseCreated {
|
||||
// Id of the created response; `None` when the event carried no id.
pub response_id: Option<String>,
|
||||
}
|
||||
|
||||
// Payload of `RealtimeEvent::ResponseDone`, produced by the realtime v2
// parser for `response.done` events.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct RealtimeResponseDone {
|
||||
// Id of the finished response; `None` when the event carried no id.
pub response_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub enum RealtimeEvent {
|
||||
SessionUpdated {
|
||||
@@ -323,7 +333,9 @@ pub enum RealtimeEvent {
|
||||
InputTranscriptDelta(RealtimeTranscriptDelta),
|
||||
OutputTranscriptDelta(RealtimeTranscriptDelta),
|
||||
AudioOut(RealtimeAudioFrame),
|
||||
ResponseCreated(RealtimeResponseCreated),
|
||||
ResponseCancelled(RealtimeResponseCancelled),
|
||||
ResponseDone(RealtimeResponseDone),
|
||||
ConversationItemAdded(Value),
|
||||
ConversationItemDone {
|
||||
item_id: String,
|
||||
|
||||
@@ -340,7 +340,9 @@ impl ChatWidget {
|
||||
ev.payload,
|
||||
RealtimeEvent::AudioOut(_)
|
||||
| RealtimeEvent::InputAudioSpeechStarted(_)
|
||||
| RealtimeEvent::ResponseCreated(_)
|
||||
| RealtimeEvent::ResponseCancelled(_)
|
||||
| RealtimeEvent::ResponseDone(_)
|
||||
)
|
||||
{
|
||||
return;
|
||||
@@ -353,7 +355,9 @@ impl ChatWidget {
|
||||
RealtimeEvent::InputTranscriptDelta(_) => {}
|
||||
RealtimeEvent::OutputTranscriptDelta(_) => {}
|
||||
RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame),
|
||||
RealtimeEvent::ResponseCreated(_) => {}
|
||||
RealtimeEvent::ResponseCancelled(_) => self.interrupt_realtime_audio_playback(),
|
||||
RealtimeEvent::ResponseDone(_) => {}
|
||||
RealtimeEvent::ConversationItemAdded(_item) => {}
|
||||
RealtimeEvent::ConversationItemDone { .. } => {}
|
||||
RealtimeEvent::HandoffRequested(_) => {}
|
||||
|
||||
Reference in New Issue
Block a user