realtime: fix output format rate and smooth playback buffer

This commit is contained in:
Ahmed Ibrahim
2026-03-04 17:09:40 -08:00
parent f13917d50e
commit 159bda93c6
3 changed files with 90 additions and 15 deletions

View File

@@ -347,6 +347,7 @@ impl RealtimeWebsocketWriter {
output: SessionAudioOutput {
format: SessionAudioOutputFormat {
kind: "audio/pcm".to_string(),
rate: 24_000,
},
voice: "marin".to_string(),
},
@@ -902,6 +903,10 @@ mod tests {
first_json["session"]["audio"]["output"]["format"]["type"],
Value::String("audio/pcm".to_string())
);
assert_eq!(
first_json["session"]["audio"]["output"]["format"]["rate"],
Value::from(24_000)
);
assert_eq!(
first_json["session"]["audio"]["output"]["voice"],
Value::String("marin".to_string())

View File

@@ -67,6 +67,7 @@ pub(super) struct SessionAudioOutput {
/// Format descriptor nested under the session's audio output settings.
pub(super) struct SessionAudioOutputFormat {
/// Format identifier; serialized under the key `type` rather than `kind`.
#[serde(rename = "type")]
pub(super) kind: String,
/// Serialized as `rate`. Presumably the output sample rate in Hz — confirm against the
/// realtime API schema.
pub(super) rate: u32,
}
#[derive(Debug, Clone, Serialize)]

View File

@@ -484,7 +484,7 @@ fn convert_u16_to_i16_and_peak(input: &[u16], out: &mut Vec<i16>) -> u16 {
/// Audio player that owns a `cpal` output stream and the sample queue feeding it.
pub(crate) struct RealtimeAudioPlayer {
// Underscore-prefixed: presumably held only so the stream is not dropped while the
// player is alive — confirm.
_stream: cpal::Stream,
// NOTE(review): two `queue` lines appear here; the `VecDeque<i16>` one looks like the
// pre-change line carried over by the diff view, the `OutputAudioQueue` one is the update.
queue: Arc<Mutex<VecDeque<i16>>>,
// Shared with the stream's data callback; holds pending samples plus the `primed` flag.
queue: Arc<Mutex<OutputAudioQueue>>,
// Sample rate reported by the selected output device configuration.
output_sample_rate: u32,
// Channel count reported by the selected output device configuration.
output_channels: u16,
}
@@ -495,8 +495,14 @@ impl RealtimeAudioPlayer {
crate::audio_device::select_configured_output_device_and_config(config)?;
let output_sample_rate = config.sample_rate().0;
let output_channels = config.channels();
let queue = Arc::new(Mutex::new(VecDeque::new()));
let stream = build_output_stream(&device, &config, Arc::clone(&queue))?;
let prebuffer_samples = output_prebuffer_samples(output_sample_rate, output_channels);
let queue = Arc::new(Mutex::new(OutputAudioQueue::default()));
let stream = build_output_stream(
&device,
&config,
Arc::clone(&queue),
prebuffer_samples,
)?;
stream
.play()
.map_err(|e| format!("failed to start output stream: {e}"))?;
@@ -537,13 +543,14 @@ impl RealtimeAudioPlayer {
.lock()
.map_err(|_| "failed to lock output audio queue".to_string())?;
// TODO(aibrahim): Cap or trim this queue if we observe producer bursts outrunning playback.
guard.extend(converted);
guard.samples.extend(converted);
Ok(())
}
/// Drops every queued sample and resets the `primed` flag.
///
/// Best-effort: if the queue lock cannot be acquired the call does nothing.
pub(crate) fn clear(&self) {
if let Ok(mut guard) = self.queue.lock() {
// NOTE(review): `guard.clear()` looks like the pre-change statement carried over by the
// diff view; `guard.samples.clear()` below is the updated form.
guard.clear();
guard.samples.clear();
// Un-prime so the output callback waits for the prebuffer threshold again before
// resuming playback.
guard.primed = false;
}
}
}
@@ -551,14 +558,17 @@ impl RealtimeAudioPlayer {
fn build_output_stream(
device: &cpal::Device,
config: &cpal::SupportedStreamConfig,
queue: Arc<Mutex<VecDeque<i16>>>,
queue: Arc<Mutex<OutputAudioQueue>>,
prebuffer_samples: usize,
) -> Result<cpal::Stream, String> {
let config_any: cpal::StreamConfig = config.clone().into();
match config.sample_format() {
cpal::SampleFormat::F32 => device
.build_output_stream(
&config_any,
move |output: &mut [f32], _| fill_output_f32(output, &queue),
move |output: &mut [f32], _| {
fill_output_f32(output, &queue, prebuffer_samples)
},
move |err| error!("audio output error: {err}"),
None,
)
@@ -566,7 +576,9 @@ fn build_output_stream(
cpal::SampleFormat::I16 => device
.build_output_stream(
&config_any,
move |output: &mut [i16], _| fill_output_i16(output, &queue),
move |output: &mut [i16], _| {
fill_output_i16(output, &queue, prebuffer_samples)
},
move |err| error!("audio output error: {err}"),
None,
)
@@ -574,7 +586,9 @@ fn build_output_stream(
cpal::SampleFormat::U16 => device
.build_output_stream(
&config_any,
move |output: &mut [u16], _| fill_output_u16(output, &queue),
move |output: &mut [u16], _| {
fill_output_u16(output, &queue, prebuffer_samples)
},
move |err| error!("audio output error: {err}"),
None,
)
@@ -583,20 +597,67 @@ fn build_output_stream(
}
}
fn fill_output_i16(output: &mut [i16], queue: &Arc<Mutex<VecDeque<i16>>>) {
/// Playback queue state shared between the audio producer and the output callbacks.
///
/// The default value is an empty, un-primed queue.
#[derive(Default)]
struct OutputAudioQueue {
/// Pending `i16` samples, consumed from the front by the `fill_output_*` callbacks.
samples: VecDeque<i16>,
/// `true` once enough samples have been buffered to start playback; reset to `false`
/// when the queue runs dry or is cleared.
primed: bool,
}
/// Computes how many interleaved samples must be queued before playback starts.
///
/// The result is the number of samples (across all channels) that make up 120 ms of audio
/// at `sample_rate` frames per second with `channels` channels, rounded down.
fn output_prebuffer_samples(sample_rate: u32, channels: u16) -> usize {
    // 120ms jitter buffer smooths websocket burstiness without adding too much latency.
    const PREBUFFER_MILLIS: u64 = 120;
    const MILLIS_PER_SECOND: u64 = 1_000;

    // Interleaved samples produced per second; saturates instead of overflowing `usize`.
    let interleaved_rate = usize::from(channels).saturating_mul(sample_rate as usize);
    let buffered = interleaved_rate as u64 * PREBUFFER_MILLIS / MILLIS_PER_SECOND;
    buffered as usize
}
/// Decides whether the output callback should emit silence instead of queued audio.
///
/// Returns `true` (and leaves the queue untouched) while an un-primed queue holds fewer
/// than `min_buffer_samples` samples. Once the threshold is met the queue becomes primed
/// and stays primed until it is observed empty, at which point it is un-primed again and
/// `true` is returned. Returns `false` whenever a primed queue has samples to play.
///
/// NOTE(review): after an underrun, samples that never reach the threshold stay queued
/// until more audio arrives.
fn should_output_silence(
    queue: &mut OutputAudioQueue,
    min_buffer_samples: usize,
) -> bool {
    let buffered = queue.samples.len();

    // Still filling the prebuffer: keep waiting without changing state.
    if !queue.primed && buffered < min_buffer_samples {
        return true;
    }

    // Either already primed or the threshold was just reached. Playback continues only
    // while there is something to play; an empty queue drops back to the un-primed state.
    queue.primed = buffered > 0;
    !queue.primed
}
/// Fills an `i16` output buffer from the shared playback queue.
///
/// Writes silence (`0`) to the whole buffer when `should_output_silence` says playback
/// should wait, or when the queue lock cannot be acquired. Otherwise each output slot
/// receives the next queued sample, with `0` used once the queue is exhausted.
fn fill_output_i16(
output: &mut [i16],
queue: &Arc<Mutex<OutputAudioQueue>>,
prebuffer_samples: usize,
) {
if let Ok(mut guard) = queue.lock() {
// Still prebuffering, or the queue ran dry: emit silence and leave the queue as is.
if should_output_silence(&mut guard, prebuffer_samples) {
output.fill(0);
return;
}
for sample in output {
// NOTE(review): the `guard.pop_front()` line looks like the pre-change statement carried
// over by the diff view; the `guard.samples.pop_front()` line is the updated form.
*sample = guard.pop_front().unwrap_or(0);
*sample = guard.samples.pop_front().unwrap_or(0);
}
return;
}
// `lock()` returned `Err` (e.g. a poisoned mutex): fall back to silence.
output.fill(0);
}
fn fill_output_f32(output: &mut [f32], queue: &Arc<Mutex<VecDeque<i16>>>) {
fn fill_output_f32(
output: &mut [f32],
queue: &Arc<Mutex<OutputAudioQueue>>,
prebuffer_samples: usize,
) {
if let Ok(mut guard) = queue.lock() {
if should_output_silence(&mut guard, prebuffer_samples) {
output.fill(0.0);
return;
}
for sample in output {
let v = guard.pop_front().unwrap_or(0);
let v = guard.samples.pop_front().unwrap_or(0);
*sample = (v as f32) / (i16::MAX as f32);
}
return;
@@ -604,10 +665,18 @@ fn fill_output_f32(output: &mut [f32], queue: &Arc<Mutex<VecDeque<i16>>>) {
output.fill(0.0);
}
fn fill_output_u16(output: &mut [u16], queue: &Arc<Mutex<VecDeque<i16>>>) {
fn fill_output_u16(
output: &mut [u16],
queue: &Arc<Mutex<OutputAudioQueue>>,
prebuffer_samples: usize,
) {
if let Ok(mut guard) = queue.lock() {
if should_output_silence(&mut guard, prebuffer_samples) {
output.fill(32768);
return;
}
for sample in output {
let v = guard.pop_front().unwrap_or(0);
let v = guard.samples.pop_front().unwrap_or(0);
*sample = (v as i32 + 32768).clamp(0, u16::MAX as i32) as u16;
}
return;