diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index cdd63b0f39..ce71e8974d 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -445,6 +445,7 @@ pub(crate) async fn apply_bespoke_event_handling( ) .await; } + RealtimeEvent::ResponseCreated(_) => {} RealtimeEvent::ResponseCancelled(event) => { let notification = ThreadRealtimeItemAddedNotification { thread_id: conversation_id.to_string(), @@ -459,6 +460,7 @@ pub(crate) async fn apply_bespoke_event_handling( )) .await; } + RealtimeEvent::ResponseDone(_) => {} RealtimeEvent::ConversationItemAdded(item) => { let notification = ThreadRealtimeItemAddedNotification { thread_id: conversation_id.to_string(), diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index a9b510da25..efc167c9d0 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -71,6 +71,8 @@ use wiremock::matchers::path_regex; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex."; +const V2_STEERING_ACKNOWLEDGEMENT: &str = + "This was sent to steer the previous background agent task."; #[derive(Debug, Clone, Copy)] enum StartupContextConfig<'a> { @@ -329,6 +331,8 @@ impl RealtimeE2eHarness { read_notification(&mut self.mcp, method).await } + /// Returns the nth JSON message app-server wrote to the fake Realtime API + /// sideband websocket. async fn sideband_outbound_request(&self, request_index: usize) -> Value { self.realtime_server .wait_for_request(/*connection_index*/ 0, request_index) @@ -1204,6 +1208,169 @@ async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Resu Ok(()) } +/// Regression coverage for Realtime V2's single-active-response rule. +/// +/// The Realtime API rejects a new `response.create` while a default response is +/// still active, so the input task should queue the second create and flush it +/// only after the server sends `response.done` for the active response. +#[tokio::test] +async fn webrtc_v2_queues_text_response_create_while_response_is_active() -> Result<()> { + skip_if_no_network!(Ok(())); + + // Phase 1: script a server-side response that becomes active after the first + // user text turn, then finishes only after a later audio input. + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V2, + no_main_loop_responses(), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![session_updated("sess_v2_response_queue")], + vec![], + vec![ + json!({ + "type": "response.created", + "response": { "id": "resp_active" } + }), + json!({ + "type": "response.output_text.delta", + "delta": "active response started" + }), + ], + vec![], + vec![json!({ + "type": "response.done", + "response": { "id": "resp_active" } + })], + vec![], + ])]), + ) + .await?; + + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V2); + + // From here on, `sideband_outbound_request(n)` reads outbound messages to + // the fake Realtime API sideband websocket. These are not client-facing + // notifications; they are the protocol frames app-server sends upstream. + assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?; + + // Phase 2: send the first text turn. It is safe to emit `response.create` + // immediately because no default response is active yet. + let thread_id = started.started.thread_id.clone(); + harness.append_text(thread_id.clone(), "first").await?; + assert_v2_user_text_item( + &harness.sideband_outbound_request(/*request_index*/ 1).await, + "first", + ); + assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 2).await); + let transcript = harness + .read_notification::( + "thread/realtime/transcriptUpdated", + ) + .await?; + assert_eq!(transcript.text, "active response started"); + + // Phase 3: send a second text turn while `resp_active` is still open. The + // user message must reach realtime, but `response.create` must not be sent + // yet or the Realtime API rejects it as an active-response conflict. + harness.append_text(thread_id.clone(), "second").await?; + assert_v2_user_text_item( + &harness.sideband_outbound_request(/*request_index*/ 3).await, + "second", + ); + + // Phase 4: the audio input causes the scripted sideband stream to send + // `response.done`, which clears the active response and flushes the queued + // `response.create` for the second text turn. + harness.append_audio(thread_id).await?; + + // This is the negative check: if the second text turn had emitted + // `response.create` immediately, request 4 would be that create instead of + // the audio append. + let audio = harness.sideband_outbound_request(/*request_index*/ 4).await; + assert_eq!(audio["type"], "input_audio_buffer.append"); + assert_eq!(audio["audio"], "BQYH"); + assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 5).await); + + harness.shutdown().await; + Ok(()) +} + +/// Regression coverage for the same queued `response.create` path when the +/// active Realtime V2 response is cancelled instead of completed. +/// +/// `response.cancelled` should clear the active-response guard exactly like +/// `response.done`, so a text turn queued during the active response still gets +/// one deferred `response.create`. +#[tokio::test] +async fn webrtc_v2_flushes_queued_text_response_create_when_response_is_cancelled() -> Result<()> { + skip_if_no_network!(Ok(())); + + // Phase 1: script a server-side response that becomes active after the first + // text turn, then is cancelled only after a later audio input. + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V2, + no_main_loop_responses(), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![session_updated("sess_v2_response_cancel_queue")], + vec![], + vec![json!({ + "type": "response.created", + "response": { "id": "resp_cancelled" } + })], + vec![], + vec![json!({ + "type": "response.cancelled", + "response": { "id": "resp_cancelled" } + })], + vec![], + ])]), + ) + .await?; + + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V2); + assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?; + + // Phase 2: send the first text turn. It is safe to emit `response.create` + // immediately because no default response is active yet. + let thread_id = started.started.thread_id.clone(); + harness.append_text(thread_id.clone(), "first").await?; + assert_v2_user_text_item( + &harness.sideband_outbound_request(/*request_index*/ 1).await, + "first", + ); + assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 2).await); + + // Phase 3: send a second text turn while `resp_cancelled` is still open. + // The user message must reach realtime, but `response.create` stays queued. + harness.append_text(thread_id.clone(), "second").await?; + assert_v2_user_text_item( + &harness.sideband_outbound_request(/*request_index*/ 3).await, + "second", + ); + + // Phase 4: the audio input causes the scripted sideband stream to send + // `response.cancelled`, which clears the active response and flushes the + // queued `response.create` for the second text turn. + harness.append_audio(thread_id).await?; + + // This is the negative check: if the second text turn had emitted + // `response.create` immediately, request 4 would be that create instead of + // the audio append. + let audio = harness.sideband_outbound_request(/*request_index*/ 4).await; + assert_eq!(audio["type"], "input_audio_buffer.append"); + assert_eq!(audio["audio"], "BQYH"); + assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 5).await); + + harness.shutdown().await; + Ok(()) +} + +/// Regression coverage for the Realtime V2 background-agent final-output path. +/// +/// Once the background agent finishes, app-server sends the final function-call +/// output to realtime and then requests a new `response.create` so realtime can +/// react to that final output. #[tokio::test] async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_output() -> Result<()> { @@ -1223,6 +1390,7 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out ], vec![], vec![], + vec![], ])]), ) .await?; @@ -1240,8 +1408,8 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out .await?; assert_eq!(turn_completed.thread_id, harness.thread_id); - // Phase 3: assert the delegated prompt went to Responses and the result returned as exactly one - // v2 function-call output event on the sideband. + // Phase 3: assert the delegated prompt went to Responses and the result + // returned as exactly one v2 function-call output event on the sideband. let requests = harness.main_loop_responses_requests().await?; assert_eq!(requests.len(), 1); assert!( @@ -1260,6 +1428,99 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out 1 ); + // Phase 4: after the final function-call output, realtime needs an explicit + // `response.create` to produce the next user-visible response. + assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 3).await); + + harness.shutdown().await; + Ok(()) +} + +/// Regression coverage for Realtime V2 steering while a background-agent task is +/// already active. +/// +/// The second background-agent tool call is treated as guidance for the active +/// task. App-server acknowledges that steering message to realtime and then +/// emits `response.create` so realtime can speak that acknowledgement. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn webrtc_v2_background_agent_steering_ack_requests_response_create() -> Result<()> { + skip_if_no_network!(Ok(())); + + // Phase 1: gate the delegated Responses turn from the first tool call so + // the background-agent handoff stays active while realtime sends a second + // tool call that should steer the active task. + let main_loop_responses_server = responses::start_mock_server().await; + let (gate_completed_tx, gate_completed_rx) = mpsc::channel(); + let gated_response = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "first task finished"), + responses::ev_completed("resp-1"), + ]); + Mock::given(method("POST")) + .and(path_regex(".*/responses$")) + .respond_with(GatedSseResponse { + gate_rx: Mutex::new(Some(gate_completed_rx)), + response: gated_response, + }) + .expect(2) + .mount(&main_loop_responses_server) + .await; + + let mut harness = RealtimeE2eHarness::new_with_main_loop_responses_server( + RealtimeTestVersion::V2, + main_loop_responses_server, + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![ + session_updated("sess_v2_steering_ack"), + v2_background_agent_tool_call("call_active", "start a task"), + v2_background_agent_tool_call("call_steer", "steer the active task"), + ], + vec![], + vec![], + vec![], + vec![], + ])]), + ) + .await?; + + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V2); + assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?; + let turn_started = harness + .read_notification::("turn/started") + .await?; + assert_eq!(turn_started.thread_id, harness.thread_id); + + // Phase 2: the second tool call happens while `call_active` is still + // running, so app-server sends a steering acknowledgement as a function-call + // output for the second call. + assert_v2_function_call_output( + &harness.sideband_outbound_request(/*request_index*/ 1).await, + "call_steer", + V2_STEERING_ACKNOWLEDGEMENT, + ); + + // Phase 3: realtime needs a `response.create` after the steering + // acknowledgement so it can surface that acknowledgement to the user. + assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 2).await); + + // Phase 4: release the gated delegated turn. Codex should then continue + // the same run with the steering text included in the follow-up Responses + // request, proving realtime did not merely acknowledge and drop it. + let _ = gate_completed_tx.send(()); + let turn_completed = harness + .read_notification::("turn/completed") + .await?; + assert_eq!(turn_completed.thread_id, harness.thread_id); + + let requests = harness.main_loop_responses_requests().await?; + assert_eq!(requests.len(), 2); + assert!( + response_request_contains_text(&requests[1], "steer the active task"), + "follow-up Responses request should contain steering prompt: {}", + requests[1] + ); + harness.shutdown().await; Ok(()) } @@ -1714,6 +1975,32 @@ fn assert_v2_progress_update(request: &Value, expected_text: &str) { ); } +fn assert_v2_user_text_item(request: &Value, expected_text: &str) { + assert_eq!( + request, + &json!({ + "type": "conversation.item.create", + "item": { + "type": "message", + "role": "user", + "content": [{ + "type": "input_text", + "text": expected_text + }] + } + }) + ); +} + +fn assert_v2_response_create(request: &Value) { + assert_eq!( + request, + &json!({ + "type": "response.create" + }) + ); +} + fn assert_v1_session_update(request: &Value) -> Result<()> { assert_eq!(request["type"].as_str(), Some("session.update")); assert_eq!(request["session"]["type"].as_str(), Some("quicksilver")); diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 911d362923..a2681f4969 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -419,7 +419,9 @@ impl RealtimeWebsocketEvents { } RealtimeEvent::SessionUpdated { .. } | RealtimeEvent::AudioOut(_) + | RealtimeEvent::ResponseCreated(_) | RealtimeEvent::ResponseCancelled(_) + | RealtimeEvent::ResponseDone(_) | RealtimeEvent::ConversationItemAdded(_) | RealtimeEvent::ConversationItemDone { .. } | RealtimeEvent::Error(_) => {} @@ -724,6 +726,8 @@ mod tests { use codex_protocol::protocol::RealtimeHandoffRequested; use codex_protocol::protocol::RealtimeInputAudioSpeechStarted; use codex_protocol::protocol::RealtimeResponseCancelled; + use codex_protocol::protocol::RealtimeResponseCreated; + use codex_protocol::protocol::RealtimeResponseDone; use codex_protocol::protocol::RealtimeVoice; use http::HeaderValue; use pretty_assertions::assert_eq; @@ -999,7 +1003,9 @@ mod tests { assert_eq!( parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), - None + Some(RealtimeEvent::ResponseDone(RealtimeResponseDone { + response_id: None + })) ); } @@ -1013,10 +1019,9 @@ mod tests { assert_eq!( parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), - Some(RealtimeEvent::ConversationItemAdded(json!({ - "type": "response.created", - "response": {"id": "resp_created_1"} - }))) + Some(RealtimeEvent::ResponseCreated(RealtimeResponseCreated { + response_id: Some("resp_created_1".to_string()) + })) ); } diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs index ea7b4d8117..4c2c909e80 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs @@ -7,6 +7,8 @@ use codex_protocol::protocol::RealtimeEvent; use codex_protocol::protocol::RealtimeHandoffRequested; use codex_protocol::protocol::RealtimeInputAudioSpeechStarted; use codex_protocol::protocol::RealtimeResponseCancelled; +use codex_protocol::protocol::RealtimeResponseCreated; +use codex_protocol::protocol::RealtimeResponseDone; use serde_json::Map as JsonMap; use serde_json::Value; use tracing::debug; @@ -47,23 +49,17 @@ pub(super) fn parse_realtime_event_v2(payload: &str) -> Option { .cloned() .map(RealtimeEvent::ConversationItemAdded), "conversation.item.done" => parse_conversation_item_done_event(&parsed), - "response.created" => Some(RealtimeEvent::ConversationItemAdded(parsed)), + "response.created" => Some(RealtimeEvent::ResponseCreated(RealtimeResponseCreated { + response_id: parse_response_event_response_id(&parsed), + })), "response.cancelled" => Some(RealtimeEvent::ResponseCancelled( RealtimeResponseCancelled { - response_id: parsed - .get("response") - .and_then(Value::as_object) - .and_then(|response| response.get("id")) - .and_then(Value::as_str) - .map(str::to_string) - .or_else(|| { - parsed - .get("response_id") - .and_then(Value::as_str) - .map(str::to_string) - }), + response_id: parse_response_event_response_id(&parsed), }, )), + "response.done" => Some(RealtimeEvent::ResponseDone(RealtimeResponseDone { + response_id: parse_response_event_response_id(&parsed), + })), "error" => parse_error_event(&parsed), _ => { debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}"); @@ -72,6 +68,21 @@ pub(super) fn parse_realtime_event_v2(payload: &str) -> Option { } } +fn parse_response_event_response_id(parsed: &Value) -> Option { + parsed + .get("response") + .and_then(Value::as_object) + .and_then(|response| response.get("id")) + .and_then(Value::as_str) + .map(str::to_string) + .or_else(|| { + parsed + .get("response_id") + .and_then(Value::as_str) + .map(str::to_string) + }) +} + fn parse_output_audio_delta_event(parsed: &Value) -> Option { let data = parsed .get("delta") diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index fbd9d5974c..d22551a573 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -68,6 +68,8 @@ const REALTIME_V2_PROGRESS_UPDATE_SUFFIX: &str = "\n\nUpdate from background agent (task hasn't finished yet):"; const REALTIME_V2_STEER_ACKNOWLEDGEMENT: &str = "This was sent to steer the previous background agent task."; +const REALTIME_ACTIVE_RESPONSE_ERROR_PREFIX: &str = + "Conversation already has an active response in progress:"; #[derive(Clone, Copy, Debug, PartialEq, Eq)] enum RealtimeConversationEnd { @@ -117,6 +119,68 @@ struct OutputAudioState { audio_end_ms: u32, } +#[derive(Default)] +struct RealtimeResponseCreateQueue { + active_default_response: bool, + pending_create: bool, +} + +impl RealtimeResponseCreateQueue { + async fn request_create( + &mut self, + writer: &RealtimeWebsocketWriter, + events_tx: &Sender, + reason: &str, + ) -> anyhow::Result<()> { + if self.active_default_response { + self.pending_create = true; + return Ok(()); + } + self.send_create_now(writer, events_tx, reason).await + } + + fn mark_started(&mut self) { + self.active_default_response = true; + } + + async fn mark_finished( + &mut self, + writer: &RealtimeWebsocketWriter, + events_tx: &Sender, + reason: &str, + ) -> anyhow::Result<()> { + self.active_default_response = false; + if !self.pending_create { + return Ok(()); + } + self.pending_create = false; + self.send_create_now(writer, events_tx, reason).await + } + + async fn send_create_now( + &mut self, + writer: &RealtimeWebsocketWriter, + events_tx: &Sender, + reason: &str, + ) -> anyhow::Result<()> { + if let Err(err) = writer.send_response_create().await { + let mapped_error = map_api_error(err); + let error_message = mapped_error.to_string(); + if error_message.starts_with(REALTIME_ACTIVE_RESPONSE_ERROR_PREFIX) { + warn!("realtime response.create raced an active response; deferring"); + self.active_default_response = true; + self.pending_create = true; + return Ok(()); + } + warn!("failed to send {reason} response.create: {mapped_error}"); + let _ = events_tx.send(RealtimeEvent::Error(error_message)).await; + return Err(mapped_error.into()); + } + self.active_default_response = true; + Ok(()) + } +} + struct RealtimeInputTask { writer: RealtimeWebsocketWriter, events: RealtimeWebsocketEvents, @@ -871,12 +935,19 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { tokio::spawn(async move { let mut output_audio_state: Option = None; + let mut response_create_queue = RealtimeResponseCreateQueue::default(); loop { let result = tokio::select! { // Text typed by the user that should be sent into realtime. user_text = user_text_rx.recv() => { - handle_user_text_input(user_text, &writer, &events_tx, session_kind) + handle_user_text_input( + user_text, + &writer, + &events_tx, + session_kind, + &mut response_create_queue, + ) .await } // Background agent progress or final output that should be sent back to realtime. @@ -887,6 +958,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { &events_tx, &handoff_state, event_parser, + &mut response_create_queue, ) .await } @@ -899,6 +971,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { &handoff_state, session_kind, &mut output_audio_state, + &mut response_create_queue, ) .await } @@ -920,6 +993,7 @@ async fn handle_user_text_input( writer: &RealtimeWebsocketWriter, events_tx: &Sender, session_kind: RealtimeSessionKind, + response_create_queue: &mut RealtimeResponseCreateQueue, ) -> anyhow::Result<()> { let text = text.context("user text input channel closed")?; @@ -934,14 +1008,9 @@ async fn handle_user_text_input( match session_kind { RealtimeSessionKind::V1 => {} RealtimeSessionKind::V2 => { - if let Err(err) = writer.send_response_create().await { - let mapped_error = map_api_error(err); - warn!("failed to send text response.create: {mapped_error}"); - let _ = events_tx - .send(RealtimeEvent::Error(mapped_error.to_string())) - .await; - return Err(mapped_error.into()); - } + response_create_queue + .request_create(writer, events_tx, "text") + .await?; } } Ok(()) @@ -953,6 +1022,7 @@ async fn handle_handoff_output( events_tx: &Sender, handoff_state: &RealtimeHandoffState, event_parser: RealtimeEventParser, + response_create_queue: &mut RealtimeResponseCreateQueue, ) -> anyhow::Result<()> { let handoff_output = handoff_output.context("handoff output channel closed")?; @@ -1000,7 +1070,9 @@ async fn handle_handoff_output( { Err(err) } else { - writer.send_response_create().await + return response_create_queue + .request_create(writer, events_tx, "handoff") + .await; } } }, @@ -1023,6 +1095,7 @@ async fn handle_realtime_server_event( handoff_state: &RealtimeHandoffState, session_kind: RealtimeSessionKind, output_audio_state: &mut Option, + response_create_queue: &mut RealtimeResponseCreateQueue, ) -> anyhow::Result<()> { let event = match event { Ok(Some(event)) => event, @@ -1079,8 +1152,35 @@ async fn handle_realtime_server_event( } false } + RealtimeEvent::ResponseCreated(_) => { + match session_kind { + RealtimeSessionKind::V1 => {} + RealtimeSessionKind::V2 => response_create_queue.mark_started(), + } + false + } RealtimeEvent::ResponseCancelled(_) => { *output_audio_state = None; + match session_kind { + RealtimeSessionKind::V1 => {} + RealtimeSessionKind::V2 => { + response_create_queue + .mark_finished(writer, events_tx, "deferred") + .await?; + } + } + false + } + RealtimeEvent::ResponseDone(_) => { + *output_audio_state = None; + match session_kind { + RealtimeSessionKind::V1 => {} + RealtimeSessionKind::V2 => { + response_create_queue + .mark_finished(writer, events_tx, "deferred") + .await?; + } + } false } RealtimeEvent::HandoffRequested(handoff) => { @@ -1111,16 +1211,9 @@ async fn handle_realtime_server_event( .await; return Err(mapped_error.into()); } - if let Err(err) = writer.send_response_create().await { - let mapped_error = map_api_error(err); - warn!( - "failed to send handoff steering response.create: {mapped_error}" - ); - let _ = events_tx - .send(RealtimeEvent::Error(mapped_error.to_string())) - .await; - return Err(mapped_error.into()); - } + response_create_queue + .request_create(writer, events_tx, "handoff steering") + .await?; } None => { *handoff_state.last_output_text.lock().await = None; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index dddd3236e7..44693033d8 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -313,6 +313,16 @@ pub struct RealtimeResponseCancelled { pub response_id: Option, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub struct RealtimeResponseCreated { + pub response_id: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub struct RealtimeResponseDone { + pub response_id: Option, +} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] pub enum RealtimeEvent { SessionUpdated { @@ -323,7 +333,9 @@ pub enum RealtimeEvent { InputTranscriptDelta(RealtimeTranscriptDelta), OutputTranscriptDelta(RealtimeTranscriptDelta), AudioOut(RealtimeAudioFrame), + ResponseCreated(RealtimeResponseCreated), ResponseCancelled(RealtimeResponseCancelled), + ResponseDone(RealtimeResponseDone), ConversationItemAdded(Value), ConversationItemDone { item_id: String, diff --git a/codex-rs/tui/src/chatwidget/realtime.rs b/codex-rs/tui/src/chatwidget/realtime.rs index 167c769bd0..f45ae5b9aa 100644 --- a/codex-rs/tui/src/chatwidget/realtime.rs +++ b/codex-rs/tui/src/chatwidget/realtime.rs @@ -340,7 +340,9 @@ impl ChatWidget { ev.payload, RealtimeEvent::AudioOut(_) | RealtimeEvent::InputAudioSpeechStarted(_) + | RealtimeEvent::ResponseCreated(_) | RealtimeEvent::ResponseCancelled(_) + | RealtimeEvent::ResponseDone(_) ) { return; @@ -353,7 +355,9 @@ impl ChatWidget { RealtimeEvent::InputTranscriptDelta(_) => {} RealtimeEvent::OutputTranscriptDelta(_) => {} RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame), + RealtimeEvent::ResponseCreated(_) => {} RealtimeEvent::ResponseCancelled(_) => self.interrupt_realtime_audio_playback(), + RealtimeEvent::ResponseDone(_) => {} RealtimeEvent::ConversationItemAdded(_item) => {} RealtimeEvent::ConversationItemDone { .. } => {} RealtimeEvent::HandoffRequested(_) => {}