try something 2

This commit is contained in:
jif-oai
2025-11-12 12:49:50 +00:00
parent fe95c24442
commit 3d58659451
3 changed files with 269 additions and 28 deletions

View File

@@ -27,7 +27,7 @@ pub async fn process_sse_wire<S, D>(
D: crate::client::WireResponseDecoder + Send,
{
let mut stream = stream;
let mut data_buffer = String::new();
let mut buffer = String::new();
loop {
let result = timeout(max_idle_duration, stream.next()).await;
@@ -56,41 +56,280 @@ pub async fn process_sse_wire<S, D>(
.await;
return;
}
};
}
.replace("\r\n", "\n")
.replace('\r', "\n");
for line in chunk_str.lines() {
if let Some(tail) = line.strip_prefix("data:") {
data_buffer.push_str(tail.trim_start());
} else if !line.is_empty() && !data_buffer.is_empty() {
data_buffer.push_str(line);
}
if line.is_empty() && !data_buffer.is_empty() {
let json = std::mem::take(&mut data_buffer);
if let Err(e) = decoder
.on_frame(&json, &tx_event, &otel_event_manager)
.await
{
let _ = tx_event.send(Err(e)).await;
return;
}
buffer.push_str(&chunk_str);
while let Some(frame) = next_frame(&mut buffer) {
if !handle_frame(frame, &mut decoder, &tx_event, &otel_event_manager).await {
return;
}
}
}
Ok(None) => {
// If the stream ended without a trailing blank line, flush any
// buffered JSON frame to the decoder before returning.
if !data_buffer.is_empty() {
let json = std::mem::take(&mut data_buffer);
if let Err(e) = decoder
.on_frame(&json, &tx_event, &otel_event_manager)
.await
{
let _ = tx_event.send(Err(e)).await;
}
if !drain_buffer(&mut buffer, &mut decoder, &tx_event, &otel_event_manager).await {
return;
}
return;
}
}
}
}
fn next_frame(buffer: &mut String) -> Option<String> {
loop {
let Some(idx) = buffer.find("\n\n") else {
return None;
};
let frame = buffer[..idx].to_string();
buffer.drain(..idx + 2);
if frame.is_empty() {
continue;
}
return Some(frame);
}
}
async fn drain_buffer<D>(
buffer: &mut String,
decoder: &mut D,
tx_event: &mpsc::Sender<Result<WireEvent>>,
otel_event_manager: &OtelEventManager,
) -> bool
where
D: crate::client::WireResponseDecoder + Send,
{
while let Some(frame) = next_frame(buffer) {
if !handle_frame(frame, decoder, tx_event, otel_event_manager).await {
return false;
}
}
if buffer.is_empty() {
return true;
}
let remainder = std::mem::take(buffer);
handle_frame(remainder, decoder, tx_event, otel_event_manager).await
}
async fn handle_frame<D>(
frame: String,
decoder: &mut D,
tx_event: &mpsc::Sender<Result<WireEvent>>,
otel_event_manager: &OtelEventManager,
) -> bool
where
D: crate::client::WireResponseDecoder + Send,
{
if let Some(data) = parse_sse_frame(&frame) {
if let Err(e) = decoder.on_frame(&data, tx_event, otel_event_manager).await {
let _ = tx_event.send(Err(e)).await;
return false;
}
}
true
}
fn parse_sse_frame(frame: &str) -> Option<String> {
let mut data = String::new();
let mut saw_event = false;
let mut saw_data_line = false;
for raw_line in frame.split('\n') {
let line = raw_line.strip_suffix('\r').unwrap_or(raw_line);
if line.is_empty() {
continue;
}
if let Some(rest) = line.strip_prefix("event:") {
if rest.trim_start().is_empty() && !saw_data_line {
continue;
}
saw_event = true;
continue;
}
if let Some(rest) = line.strip_prefix("data:") {
let content = rest.strip_prefix(' ').unwrap_or(rest);
if saw_data_line {
data.push('\n');
}
data.push_str(content);
saw_data_line = true;
continue;
}
if saw_data_line {
data.push('\n');
data.push_str(line.trim_start());
}
}
if data.is_empty() && !saw_event && !saw_data_line {
return None;
}
Some(data)
}
#[cfg(test)]
mod tests {
use super::*;
use codex_protocol::ConversationId;
use futures::stream;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::fmt::Write as _;
use tokio::sync::mpsc;
#[tokio::test]
async fn apply_patch_body_handles_coalesced_and_split_chunks() {
let events = apply_patch_events();
let chunk_variants = vec![
vec![sse(events.clone())],
vec![sse(events[..2].to_vec()), sse(events[2..].to_vec())],
];
for chunks in chunk_variants {
let events = collect_events(chunks).await;
assert_eq!(
events,
vec![
"created",
"response.output_item.done",
"response.output_item.added",
"response.completed"
]
);
}
}
#[tokio::test]
async fn multiple_events_in_single_chunk_emit_done() {
let chunk = sse(vec![
event_output_item_done("call-inline"),
event_completed("resp-inline"),
]);
let events = collect_events(vec![chunk]).await;
assert_eq!(
events,
vec!["response.output_item.done", "response.completed",]
);
}
async fn collect_events(chunks: Vec<String>) -> Vec<String> {
let (tx_event, mut rx_event) = mpsc::channel::<Result<WireEvent>>(16);
let stream = stream::iter(chunks.into_iter().map(|chunk| Ok(Bytes::from(chunk))));
let otel_event_manager = OtelEventManager::new(
ConversationId::new(),
"test-model",
"test-slug",
None,
None,
None,
false,
"terminal".to_string(),
);
let handle = tokio::spawn(process_sse_wire(
stream,
tx_event,
Duration::from_secs(5),
otel_event_manager,
crate::decode_wire::responses::WireResponsesSseDecoder,
));
let mut out = Vec::new();
while let Some(event) = rx_event.recv().await {
let event = event.expect("event decoding should succeed");
out.push(event_name(&event));
}
handle
.await
.expect("SSE framing task should complete without panicking");
out
}
fn event_name(event: &WireEvent) -> String {
match event {
WireEvent::Created => "created",
WireEvent::OutputItemDone(_) => "response.output_item.done",
WireEvent::OutputItemAdded(_) => "response.output_item.added",
WireEvent::Completed { .. } => "response.completed",
WireEvent::OutputTextDelta(_) => "response.output_text.delta",
WireEvent::ReasoningSummaryDelta(_) => "response.reasoning_summary_text.delta",
WireEvent::ReasoningContentDelta(_) => "response.reasoning_text.delta",
WireEvent::ReasoningSummaryPartAdded => "response.reasoning_summary_part.added",
WireEvent::RateLimits(_) => "response.rate_limits",
}
.to_string()
}
fn apply_patch_events() -> Vec<serde_json::Value> {
vec![
json!({
"type": "response.created",
"response": { "id": "resp-apply-patch" }
}),
event_output_item_done("apply-patch-call"),
json!({
"type": "response.output_item.added",
"item": {
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": "ok"}]
}
}),
event_completed("resp-apply-patch"),
]
}
fn event_output_item_done(call_id: &str) -> serde_json::Value {
json!({
"type": "response.output_item.done",
"item": {
"type": "function_call",
"name": "apply_patch",
"arguments": "{\"input\":\"*** Begin Patch\\n*** End Patch\"}",
"call_id": call_id
}
})
}
fn event_completed(id: &str) -> serde_json::Value {
json!({
"type": "response.completed",
"response": {
"id": id,
"usage": {
"input_tokens": 0,
"input_tokens_details": null,
"output_tokens": 0,
"output_tokens_details": null,
"reasoning_output_tokens": 0,
"total_tokens": 0
}
}
})
}
fn sse(events: Vec<serde_json::Value>) -> String {
let mut out = String::new();
for ev in events {
let kind = ev.get("type").and_then(|v| v.as_str()).unwrap_or_default();
writeln!(&mut out, "event: {kind}").unwrap();
if !ev.as_object().map(|o| o.len() == 1).unwrap_or(false) {
write!(&mut out, "data: {ev}\n\n").unwrap();
} else {
out.push('\n');
}
}
out
}
}