try something 3

This commit is contained in:
jif-oai
2025-11-12 14:06:04 +00:00
parent 3d58659451
commit c3480d94a1
4 changed files with 191 additions and 44 deletions

View File

@@ -1,4 +1,5 @@
use std::time::Duration;
use std::time::Instant;
use bytes::Bytes;
use codex_otel::otel_event_manager::OtelEventManager;
@@ -30,9 +31,16 @@ pub async fn process_sse_wire<S, D>(
let mut buffer = String::new();
loop {
let start = Instant::now();
let result = timeout(max_idle_duration, stream.next()).await;
let duration = start.elapsed();
match result {
Err(_) => {
otel_event_manager.sse_event_failed(
None,
duration,
&"idle timeout waiting for SSE",
);
let _ = tx_event
.send(Err(Error::Stream(
"stream idle timeout fired before Completed event".to_string(),
@@ -42,6 +50,7 @@ pub async fn process_sse_wire<S, D>(
return;
}
Ok(Some(Err(err))) => {
otel_event_manager.sse_event_failed(None, duration, &err);
let _ = tx_event.send(Err(err)).await;
return;
}
@@ -49,6 +58,11 @@ pub async fn process_sse_wire<S, D>(
let chunk_str = match std::str::from_utf8(&chunk) {
Ok(s) => s,
Err(err) => {
otel_event_manager.sse_event_failed(
None,
duration,
&format!("UTF8 error: {err}"),
);
let _ = tx_event
.send(Err(Error::Other(format!(
"Invalid UTF-8 in SSE chunk: {err}"
@@ -62,13 +76,29 @@ pub async fn process_sse_wire<S, D>(
buffer.push_str(&chunk_str);
while let Some(frame) = next_frame(&mut buffer) {
if !handle_frame(frame, &mut decoder, &tx_event, &otel_event_manager).await {
if !handle_frame(
frame,
&mut decoder,
&tx_event,
&otel_event_manager,
duration,
)
.await
{
return;
}
}
}
Ok(None) => {
if !drain_buffer(&mut buffer, &mut decoder, &tx_event, &otel_event_manager).await {
if !drain_buffer(
&mut buffer,
&mut decoder,
&tx_event,
&otel_event_manager,
duration,
)
.await
{
return;
}
return;
@@ -99,12 +129,13 @@ async fn drain_buffer<D>(
decoder: &mut D,
tx_event: &mpsc::Sender<Result<WireEvent>>,
otel_event_manager: &OtelEventManager,
duration: Duration,
) -> bool
where
D: crate::client::WireResponseDecoder + Send,
{
while let Some(frame) = next_frame(buffer) {
if !handle_frame(frame, decoder, tx_event, otel_event_manager).await {
if !handle_frame(frame, decoder, tx_event, otel_event_manager, duration).await {
return false;
}
}
@@ -114,7 +145,7 @@ where
}
let remainder = std::mem::take(buffer);
handle_frame(remainder, decoder, tx_event, otel_event_manager).await
handle_frame(remainder, decoder, tx_event, otel_event_manager, duration).await
}
async fn handle_frame<D>(
@@ -122,23 +153,38 @@ async fn handle_frame<D>(
decoder: &mut D,
tx_event: &mpsc::Sender<Result<WireEvent>>,
otel_event_manager: &OtelEventManager,
duration: Duration,
) -> bool
where
D: crate::client::WireResponseDecoder + Send,
{
if let Some(data) = parse_sse_frame(&frame) {
if let Err(e) = decoder.on_frame(&data, tx_event, otel_event_manager).await {
let _ = tx_event.send(Err(e)).await;
return false;
if let Some(frame) = parse_sse_frame(&frame) {
if frame.data.trim() == "[DONE]" {
otel_event_manager.sse_event_kind(&frame.event);
return true;
}
match decoder
.on_frame(&frame.data, tx_event, otel_event_manager)
.await
{
Ok(_) => {
otel_event_manager.sse_event_kind(&frame.event);
}
Err(e) => {
otel_event_manager.sse_event_failed(Some(&frame.event), duration, &e);
let _ = tx_event.send(Err(e)).await;
return false;
}
};
}
true
}
fn parse_sse_frame(frame: &str) -> Option<String> {
fn parse_sse_frame(frame: &str) -> Option<SseFrame> {
let mut data = String::new();
let mut saw_event = false;
let mut event: Option<String> = None;
let mut saw_data_line = false;
for raw_line in frame.split('\n') {
@@ -148,10 +194,10 @@ fn parse_sse_frame(frame: &str) -> Option<String> {
}
if let Some(rest) = line.strip_prefix("event:") {
if rest.trim_start().is_empty() && !saw_data_line {
continue;
let trimmed = rest.trim_start();
if !trimmed.is_empty() {
event = Some(trimmed.to_string());
}
saw_event = true;
continue;
}
@@ -171,11 +217,19 @@ fn parse_sse_frame(frame: &str) -> Option<String> {
}
}
if data.is_empty() && !saw_event && !saw_data_line {
if data.is_empty() && event.is_none() && !saw_data_line {
return None;
}
Some(data)
Some(SseFrame {
event: event.unwrap_or_else(|| "message".to_string()),
data,
})
}
struct SseFrame {
event: String,
data: String,
}
#[cfg(test)]