use super::*; use codex_protocol::protocol::ConversationStartParams; use codex_protocol::protocol::RealtimeAudioFrame; use codex_protocol::protocol::RealtimeConversationClosedEvent; use codex_protocol::protocol::RealtimeConversationRealtimeEvent; use codex_protocol::protocol::RealtimeConversationStartedEvent; #[cfg(not(target_os = "linux"))] use codex_protocol::protocol::RealtimeConversationVersion; use codex_protocol::protocol::RealtimeEvent; #[cfg(not(target_os = "linux"))] use std::sync::atomic::AtomicUsize; #[cfg(not(target_os = "linux"))] use std::time::Duration; const REALTIME_CONVERSATION_PROMPT: &str = "You are in a realtime voice conversation in the Codex TUI. Respond conversationally and concisely."; #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub(super) enum RealtimeConversationPhase { #[default] Inactive, Starting, Active, Stopping, } #[derive(Default)] pub(super) struct RealtimeConversationUiState { pub(super) phase: RealtimeConversationPhase, #[cfg(not(target_os = "linux"))] audio_behavior: RealtimeAudioBehavior, requested_close: bool, session_id: Option, warned_audio_only_submission: bool, #[cfg(not(target_os = "linux"))] pub(super) meter_placeholder_id: Option, #[cfg(not(target_os = "linux"))] capture_stop_flag: Option>, #[cfg(not(target_os = "linux"))] capture: Option, #[cfg(not(target_os = "linux"))] audio_player: Option, #[cfg(not(target_os = "linux"))] // Shared queue depth lets capture suppress echoed speaker audio without // taking the playback queue lock from the input callback. playback_queued_samples: Arc, } #[cfg(not(target_os = "linux"))] #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] enum RealtimeAudioBehavior { #[default] Legacy, PlaybackAware, } #[cfg(not(target_os = "linux"))] impl RealtimeAudioBehavior { fn from_version(version: RealtimeConversationVersion) -> Self { match version { RealtimeConversationVersion::V1 => Self::Legacy, RealtimeConversationVersion::V2 => Self::PlaybackAware, } } fn input_behavior( self, playback_queued_samples: Arc, ) -> crate::voice::RealtimeInputBehavior { match self { Self::Legacy => crate::voice::RealtimeInputBehavior::Ungated, Self::PlaybackAware => crate::voice::RealtimeInputBehavior::PlaybackAware { playback_queued_samples, }, } } } impl RealtimeConversationUiState { pub(super) fn is_live(&self) -> bool { matches!( self.phase, RealtimeConversationPhase::Starting | RealtimeConversationPhase::Active | RealtimeConversationPhase::Stopping ) } #[cfg(not(target_os = "linux"))] pub(super) fn is_active(&self) -> bool { matches!(self.phase, RealtimeConversationPhase::Active) } } #[derive(Clone, Debug, PartialEq)] pub(super) struct RenderedUserMessageEvent { pub(super) message: String, pub(super) remote_image_urls: Vec, pub(super) local_images: Vec, pub(super) text_elements: Vec, } #[derive(Clone, Debug, PartialEq, Eq)] pub(super) struct PendingSteerCompareKey { pub(super) message: String, pub(super) image_count: usize, } impl ChatWidget { pub(super) fn stop_realtime_conversation_from_ui(&mut self) { self.request_realtime_conversation_close(/*info_message*/ None); } #[cfg(not(target_os = "linux"))] pub(crate) fn stop_realtime_conversation_for_deleted_meter(&mut self, id: &str) -> bool { if self.realtime_conversation.is_live() && self.realtime_conversation.meter_placeholder_id.as_deref() == Some(id) { self.realtime_conversation.meter_placeholder_id = None; self.stop_realtime_conversation_from_ui(); return true; } false } pub(super) fn rendered_user_message_event_from_parts( message: String, text_elements: Vec, local_images: Vec, remote_image_urls: Vec, ) -> RenderedUserMessageEvent { RenderedUserMessageEvent { message, remote_image_urls, local_images, text_elements, } } pub(super) fn rendered_user_message_event_from_event( event: &UserMessageEvent, ) -> RenderedUserMessageEvent { Self::rendered_user_message_event_from_parts( event.message.clone(), event.text_elements.clone(), event.local_images.clone(), event.images.clone().unwrap_or_default(), ) } /// Build the compare key for a submitted pending steer without invoking the /// expensive request-serialization path. Pending steers only need to match the /// committed `ItemCompleted(UserMessage)` emitted after core drains input, which /// preserves flattened text and total image count but not UI-only text ranges or /// local image paths. pub(super) fn pending_steer_compare_key_from_items( items: &[UserInput], ) -> PendingSteerCompareKey { let mut message = String::new(); let mut image_count = 0; for item in items { match item { UserInput::Text { text, .. } => message.push_str(text), UserInput::Image { .. } | UserInput::LocalImage { .. } => image_count += 1, UserInput::Skill { .. } | UserInput::Mention { .. } => {} _ => {} } } PendingSteerCompareKey { message, image_count, } } pub(super) fn pending_steer_compare_key_from_item( item: &codex_protocol::items::UserMessageItem, ) -> PendingSteerCompareKey { Self::pending_steer_compare_key_from_items(&item.content) } #[cfg(test)] pub(super) fn rendered_user_message_event_from_inputs( items: &[UserInput], ) -> RenderedUserMessageEvent { let mut message = String::new(); let mut remote_image_urls = Vec::new(); let mut local_images = Vec::new(); let mut text_elements = Vec::new(); for item in items { match item { UserInput::Text { text, text_elements: current_text_elements, } => append_text_with_rebased_elements( &mut message, &mut text_elements, text, current_text_elements.iter().map(|element| { TextElement::new( element.byte_range, element.placeholder(text).map(str::to_string), ) }), ), UserInput::Image { image_url } => remote_image_urls.push(image_url.clone()), UserInput::LocalImage { path } => local_images.push(path.clone()), UserInput::Skill { .. } | UserInput::Mention { .. } => {} _ => {} } } Self::rendered_user_message_event_from_parts( message, text_elements, local_images, remote_image_urls, ) } pub(super) fn should_render_realtime_user_message_event( &self, event: &UserMessageEvent, ) -> bool { if !self.realtime_conversation.is_live() { return false; } let key = Self::rendered_user_message_event_from_event(event); self.last_rendered_user_message_event.as_ref() != Some(&key) } pub(super) fn maybe_defer_user_message_for_realtime( &mut self, user_message: UserMessage, ) -> Option { if !self.realtime_conversation.is_live() { return Some(user_message); } self.restore_user_message_to_composer(user_message); if !self.realtime_conversation.warned_audio_only_submission { self.realtime_conversation.warned_audio_only_submission = true; self.add_info_message( "Realtime voice mode is audio-only. Use /realtime to stop.".to_string(), /*hint*/ None, ); } else { self.request_redraw(); } None } pub(super) fn start_realtime_conversation(&mut self) { self.realtime_conversation.phase = RealtimeConversationPhase::Starting; self.realtime_conversation.requested_close = false; self.realtime_conversation.session_id = None; #[cfg(not(target_os = "linux"))] { self.realtime_conversation.audio_behavior = RealtimeAudioBehavior::Legacy; } self.realtime_conversation.warned_audio_only_submission = false; self.set_footer_hint_override(Some(vec![( "/realtime".to_string(), "stop live voice".to_string(), )])); self.submit_op(Op::RealtimeConversationStart(ConversationStartParams { prompt: REALTIME_CONVERSATION_PROMPT.to_string(), session_id: None, })); self.request_redraw(); } pub(super) fn request_realtime_conversation_close(&mut self, info_message: Option) { if !self.realtime_conversation.is_live() { if let Some(message) = info_message { self.add_info_message(message, /*hint*/ None); } return; } self.realtime_conversation.requested_close = true; self.realtime_conversation.phase = RealtimeConversationPhase::Stopping; self.submit_op(Op::RealtimeConversationClose); self.stop_realtime_local_audio(); self.set_footer_hint_override(/*items*/ None); if let Some(message) = info_message { self.add_info_message(message, /*hint*/ None); } else { self.request_redraw(); } } pub(super) fn reset_realtime_conversation_state(&mut self) { self.stop_realtime_local_audio(); self.set_footer_hint_override(/*items*/ None); self.realtime_conversation.phase = RealtimeConversationPhase::Inactive; self.realtime_conversation.requested_close = false; self.realtime_conversation.session_id = None; #[cfg(not(target_os = "linux"))] { self.realtime_conversation.audio_behavior = RealtimeAudioBehavior::Legacy; } self.realtime_conversation.warned_audio_only_submission = false; } fn fail_realtime_conversation(&mut self, message: String) { self.add_error_message(message); if self.realtime_conversation.is_live() { self.request_realtime_conversation_close(/*info_message*/ None); } else { self.reset_realtime_conversation_state(); self.request_redraw(); } } pub(super) fn on_realtime_conversation_started( &mut self, ev: RealtimeConversationStartedEvent, ) { if !self.realtime_conversation_enabled() { self.request_realtime_conversation_close(/*info_message*/ None); return; } self.realtime_conversation.phase = RealtimeConversationPhase::Active; self.realtime_conversation.session_id = ev.session_id; #[cfg(not(target_os = "linux"))] { self.realtime_conversation.audio_behavior = RealtimeAudioBehavior::from_version(ev.version); } self.realtime_conversation.warned_audio_only_submission = false; self.set_footer_hint_override(Some(vec![( "/realtime".to_string(), "stop live voice".to_string(), )])); self.start_realtime_local_audio(); self.request_redraw(); } pub(super) fn on_realtime_conversation_realtime( &mut self, ev: RealtimeConversationRealtimeEvent, ) { match ev.payload { RealtimeEvent::SessionUpdated { session_id, .. } => { self.realtime_conversation.session_id = Some(session_id); } RealtimeEvent::InputAudioSpeechStarted(_) | RealtimeEvent::ResponseCancelled(_) => { #[cfg(not(target_os = "linux"))] { if matches!( self.realtime_conversation.audio_behavior, RealtimeAudioBehavior::PlaybackAware ) && let Some(player) = &self.realtime_conversation.audio_player { // Once the server detects user speech or the current response is cancelled, // any buffered assistant audio is stale and should stop gating mic input. player.clear(); } } } RealtimeEvent::InputTranscriptDelta(_) => {} RealtimeEvent::OutputTranscriptDelta(_) => {} RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame), RealtimeEvent::ConversationItemAdded(_item) => {} RealtimeEvent::ConversationItemDone { .. } => {} RealtimeEvent::HandoffRequested(_) => {} RealtimeEvent::Error(message) => { self.fail_realtime_conversation(format!("Realtime voice error: {message}")); } } } pub(super) fn on_realtime_conversation_closed(&mut self, ev: RealtimeConversationClosedEvent) { let requested = self.realtime_conversation.requested_close; let reason = ev.reason; self.reset_realtime_conversation_state(); if !requested && let Some(reason) = reason && reason != "error" { self.add_info_message( format!("Realtime voice mode closed: {reason}"), /*hint*/ None, ); } self.request_redraw(); } fn enqueue_realtime_audio_out(&mut self, frame: &RealtimeAudioFrame) { #[cfg(not(target_os = "linux"))] { if self.realtime_conversation.audio_player.is_none() { self.realtime_conversation.audio_player = crate::voice::RealtimeAudioPlayer::start( &self.config, Arc::clone(&self.realtime_conversation.playback_queued_samples), ) .ok(); } if let Some(player) = &self.realtime_conversation.audio_player && let Err(err) = player.enqueue_frame(frame) { warn!("failed to play realtime audio: {err}"); } } #[cfg(target_os = "linux")] { let _ = frame; } } #[cfg(not(target_os = "linux"))] fn start_realtime_local_audio(&mut self) { if self.realtime_conversation.capture_stop_flag.is_some() { return; } let placeholder_id = self.bottom_pane.insert_transcription_placeholder("тадтадтадтад"); self.realtime_conversation.meter_placeholder_id = Some(placeholder_id.clone()); self.request_redraw(); let capture = match crate::voice::VoiceCapture::start_realtime( &self.config, self.app_event_tx.clone(), self.realtime_conversation .audio_behavior .input_behavior(Arc::clone( &self.realtime_conversation.playback_queued_samples, )), ) { Ok(capture) => capture, Err(err) => { self.realtime_conversation.meter_placeholder_id = None; self.remove_transcription_placeholder(&placeholder_id); self.fail_realtime_conversation(format!( "Failed to start microphone capture: {err}" )); return; } }; let stop_flag = capture.stopped_flag(); let peak = capture.last_peak_arc(); let meter_placeholder_id = placeholder_id; let app_event_tx = self.app_event_tx.clone(); self.realtime_conversation.capture_stop_flag = Some(stop_flag.clone()); self.realtime_conversation.capture = Some(capture); if self.realtime_conversation.audio_player.is_none() { self.realtime_conversation.audio_player = crate::voice::RealtimeAudioPlayer::start( &self.config, Arc::clone(&self.realtime_conversation.playback_queued_samples), ) .ok(); } std::thread::spawn(move || { let mut meter = crate::voice::RecordingMeterState::new(); loop { if stop_flag.load(Ordering::Relaxed) { break; } let meter_text = meter.next_text(peak.load(Ordering::Relaxed)); app_event_tx.send(AppEvent::UpdateRecordingMeter { id: meter_placeholder_id.clone(), text: meter_text, }); std::thread::sleep(Duration::from_millis(60)); } }); } #[cfg(target_os = "linux")] fn start_realtime_local_audio(&mut self) {} #[cfg(not(target_os = "linux"))] pub(crate) fn restart_realtime_audio_device(&mut self, kind: RealtimeAudioDeviceKind) { if !self.realtime_conversation.is_active() { return; } match kind { RealtimeAudioDeviceKind::Microphone => { self.stop_realtime_microphone(); self.start_realtime_local_audio(); } RealtimeAudioDeviceKind::Speaker => { self.stop_realtime_speaker(); match crate::voice::RealtimeAudioPlayer::start( &self.config, Arc::clone(&self.realtime_conversation.playback_queued_samples), ) { Ok(player) => { self.realtime_conversation.audio_player = Some(player); } Err(err) => { self.fail_realtime_conversation(format!( "Failed to start speaker output: {err}" )); } } } } self.request_redraw(); } #[cfg(target_os = "linux")] pub(crate) fn restart_realtime_audio_device(&mut self, kind: RealtimeAudioDeviceKind) { let _ = kind; } #[cfg(not(target_os = "linux"))] fn stop_realtime_local_audio(&mut self) { self.stop_realtime_microphone(); self.stop_realtime_speaker(); } #[cfg(target_os = "linux")] fn stop_realtime_local_audio(&mut self) {} #[cfg(not(target_os = "linux"))] fn stop_realtime_microphone(&mut self) { if let Some(flag) = self.realtime_conversation.capture_stop_flag.take() { flag.store(true, Ordering::Relaxed); } if let Some(capture) = self.realtime_conversation.capture.take() { let _ = capture.stop(); } if let Some(id) = self.realtime_conversation.meter_placeholder_id.take() { self.remove_transcription_placeholder(&id); } } #[cfg(not(target_os = "linux"))] fn stop_realtime_speaker(&mut self) { if let Some(player) = self.realtime_conversation.audio_player.take() { player.clear(); } } }