Simplification 3

This commit is contained in:
jif-oai
2025-11-12 16:18:24 +00:00
parent 166ca2fce7
commit 001ed59f5c
6 changed files with 270 additions and 397 deletions

View File

@@ -16,16 +16,139 @@ use crate::stream::WireEvent;
// Legacy ResponseEvent-based SSE framer removed
async fn send_stream_error(
otel_event_manager: &OtelEventManager,
tx_event: &mpsc::Sender<Result<WireEvent>>,
event: Option<String>,
duration: Duration,
log_reason: impl std::fmt::Display,
error: Error,
) {
otel_event_manager.sse_event_failed(event.as_ref(), duration, &log_reason);
let _ = tx_event.send(Err(error)).await;
/// State for one SSE processing run: reads chunks from `stream`, splits them
/// into frames via `buffer`, and hands each frame's data to `decoder`.
struct SseProcessor<S, D> {
/// Source of raw chunks; polled with `next()` under an idle timeout in `run`.
stream: S,
/// Receives each parsed frame's data through `on_frame`.
decoder: D,
/// Channel on which errors are sent; also passed to the decoder.
tx_event: mpsc::Sender<Result<WireEvent>>,
/// Telemetry sink notified of frame kinds and failures.
otel_event_manager: OtelEventManager,
/// Newline-normalized text received so far that `next_frame` has not yet consumed.
buffer: String,
/// Maximum time to wait for the next chunk before reporting an idle timeout.
max_idle_duration: Duration,
}
impl<S, D> SseProcessor<S, D>
where
S: Stream<Item = Result<Bytes>> + Send + 'static + Unpin,
D: crate::client::WireResponseDecoder + Send,
{
async fn run(mut self) {
loop {
let start = Instant::now();
let result = timeout(self.max_idle_duration, self.stream.next()).await;
let duration = start.elapsed();
match result {
Err(_) => {
self.send_error(
None,
duration,
"idle timeout waiting for SSE",
Error::Stream(
"stream idle timeout fired before Completed event".to_string(),
None,
),
)
.await;
return;
}
Ok(Some(Err(err))) => {
let message = format!("{err}");
self.send_error(None, duration, &message, err).await;
return;
}
Ok(Some(Ok(chunk))) => {
if !self.process_chunk(chunk, duration).await {
return;
}
}
Ok(None) => {
if !self.drain_buffer(duration).await {
return;
}
return;
}
}
}
}
async fn process_chunk(&mut self, chunk: Bytes, duration: Duration) -> bool {
let chunk_str = match std::str::from_utf8(&chunk) {
Ok(s) => s,
Err(err) => {
self.send_error(
None,
duration,
&format!("UTF8 error: {err}"),
Error::Other(format!("Invalid UTF-8 in SSE chunk: {err}")),
)
.await;
return false;
}
}
.replace("\r\n", "\n")
.replace('\r', "\n");
self.buffer.push_str(&chunk_str);
while let Some(frame) = next_frame(&mut self.buffer) {
if !self.handle_frame(frame, duration).await {
return false;
}
}
true
}
async fn drain_buffer(&mut self, duration: Duration) -> bool {
while let Some(frame) = next_frame(&mut self.buffer) {
if !self.handle_frame(frame, duration).await {
return false;
}
}
if self.buffer.is_empty() {
return true;
}
let remainder = std::mem::take(&mut self.buffer);
self.handle_frame(remainder, duration).await
}
async fn handle_frame(&mut self, frame: String, duration: Duration) -> bool {
if let Some(frame) = parse_sse_frame(&frame) {
if frame.data.trim() == "[DONE]" {
self.otel_event_manager.sse_event_kind(&frame.event);
return true;
}
match self
.decoder
.on_frame(&frame.data, &self.tx_event, &self.otel_event_manager)
.await
{
Ok(_) => {
self.otel_event_manager.sse_event_kind(&frame.event);
}
Err(e) => {
let reason = format!("{e}");
self.send_error(Some(frame.event.clone()), duration, &reason, e)
.await;
return false;
}
};
}
true
}
async fn send_error(
&mut self,
event: Option<String>,
duration: Duration,
log_reason: impl std::fmt::Display,
error: Error,
) {
self.otel_event_manager
.sse_event_failed(event.as_ref(), duration, &log_reason);
let _ = self.tx_event.send(Err(error)).await;
}
}
/// Spawn an SSE processing task and return a sender/stream pair for wire events.
@@ -68,116 +191,21 @@ pub async fn process_sse_wire<S, D>(
tx_event: mpsc::Sender<Result<WireEvent>>,
max_idle_duration: Duration,
otel_event_manager: OtelEventManager,
mut decoder: D,
decoder: D,
) where
S: Stream<Item = Result<Bytes>> + Send + 'static + Unpin,
D: crate::client::WireResponseDecoder + Send,
{
let mut stream = stream;
let mut buffer = String::new();
loop {
let start = Instant::now();
let result = timeout(max_idle_duration, stream.next()).await;
let duration = start.elapsed();
match result {
Err(_) => {
send_stream_error(
&otel_event_manager,
&tx_event,
None,
duration,
"idle timeout waiting for SSE",
Error::Stream(
"stream idle timeout fired before Completed event".to_string(),
None,
),
)
.await;
return;
}
Ok(Some(Err(err))) => {
let message = format!("{err}");
send_stream_error(
&otel_event_manager,
&tx_event,
None,
duration,
&message,
err,
)
.await;
return;
}
Ok(Some(Ok(chunk))) => {
if !process_chunk(
chunk,
duration,
&mut buffer,
&mut decoder,
&tx_event,
&otel_event_manager,
)
.await
{
return;
}
}
Ok(None) => {
if !drain_buffer(
&mut buffer,
&mut decoder,
&tx_event,
&otel_event_manager,
duration,
)
.await
{
return;
}
return;
}
}
SseProcessor {
stream,
decoder,
tx_event,
otel_event_manager,
buffer: String::new(),
max_idle_duration,
}
}
async fn process_chunk<D>(
chunk: Bytes,
duration: Duration,
buffer: &mut String,
decoder: &mut D,
tx_event: &mpsc::Sender<Result<WireEvent>>,
otel_event_manager: &OtelEventManager,
) -> bool
where
D: crate::client::WireResponseDecoder + Send,
{
let chunk_str = match std::str::from_utf8(&chunk) {
Ok(s) => s,
Err(err) => {
send_stream_error(
otel_event_manager,
tx_event,
None,
duration,
&format!("UTF8 error: {err}"),
Error::Other(format!("Invalid UTF-8 in SSE chunk: {err}")),
)
.await;
return false;
}
}
.replace("\r\n", "\n")
.replace('\r', "\n");
buffer.push_str(&chunk_str);
while let Some(frame) = next_frame(buffer) {
if !handle_frame(frame, decoder, tx_event, otel_event_manager, duration).await {
return false;
}
}
true
.run()
.await;
}
fn next_frame(buffer: &mut String) -> Option<String> {
@@ -195,72 +223,6 @@ fn next_frame(buffer: &mut String) -> Option<String> {
}
}
/// Processes every complete frame left in `buffer`, then treats any non-empty
/// remainder as one final frame.
///
/// Returns `false` as soon as `handle_frame` reports a failure, `true`
/// otherwise. `buffer` is left empty on success.
async fn drain_buffer<D>(
    buffer: &mut String,
    decoder: &mut D,
    tx_event: &mpsc::Sender<Result<WireEvent>>,
    otel_event_manager: &OtelEventManager,
    duration: Duration,
) -> bool
where
    D: crate::client::WireResponseDecoder + Send,
{
    loop {
        let Some(frame) = next_frame(buffer) else {
            break;
        };
        let keep_going =
            handle_frame(frame, decoder, tx_event, otel_event_manager, duration).await;
        if !keep_going {
            return false;
        }
    }

    let leftover = std::mem::take(buffer);
    if leftover.is_empty() {
        true
    } else {
        handle_frame(leftover, decoder, tx_event, otel_event_manager, duration).await
    }
}
/// Parses one raw frame and forwards its data to `decoder`.
///
/// Frames that `parse_sse_frame` rejects are skipped. A frame whose trimmed
/// data is `[DONE]` is only recorded in telemetry. A decoder error is reported
/// through `send_stream_error` and yields `false`; every other path yields
/// `true`.
async fn handle_frame<D>(
    frame: String,
    decoder: &mut D,
    tx_event: &mpsc::Sender<Result<WireEvent>>,
    otel_event_manager: &OtelEventManager,
    duration: Duration,
) -> bool
where
    D: crate::client::WireResponseDecoder + Send,
{
    let Some(parsed) = parse_sse_frame(&frame) else {
        return true;
    };

    let is_done_marker = parsed.data.trim() == "[DONE]";
    if !is_done_marker {
        let decoded = decoder
            .on_frame(&parsed.data, tx_event, otel_event_manager)
            .await;
        if let Err(err) = decoded {
            let reason = err.to_string();
            send_stream_error(
                otel_event_manager,
                tx_event,
                Some(parsed.event.clone()),
                duration,
                &reason,
                err,
            )
            .await;
            return false;
        }
    }

    otel_event_manager.sse_event_kind(&parsed.event);
    true
}
fn parse_sse_frame(frame: &str) -> Option<SseFrame> {
let mut data = String::new();
let mut event: Option<String> = None;

View File

@@ -3,7 +3,7 @@ use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
use serde_json::Value;
use serde::Deserialize;
use tokio::sync::mpsc;
use tracing::debug;
@@ -16,10 +16,6 @@ async fn send_wire_event(tx: &mpsc::Sender<crate::error::Result<WireEvent>>, eve
let _ = tx.send(Ok(event)).await;
}
/// Converts `item` to JSON, falling back to an empty JSON string when
/// serialization fails.
fn serialize_response_item(item: ResponseItem) -> Value {
    match serde_json::to_value(item) {
        Ok(json) => json,
        Err(_) => Value::String(String::new()),
    }
}
#[derive(Default)]
struct FunctionCallState {
active: bool,
@@ -28,6 +24,52 @@ struct FunctionCallState {
arguments: String,
}
/// Typed form of one Chat SSE JSON payload, as parsed by `on_frame`.
#[derive(Debug, Default, Deserialize)]
struct ChatChunk {
/// Choices carried by this chunk; a missing key decodes as an empty list.
#[serde(default)]
choices: Vec<ChatChoice>,
}
/// One entry of `ChatChunk::choices`.
#[derive(Debug, Default, Deserialize)]
struct ChatChoice {
/// Incremental payload for this choice, if any.
#[serde(default)]
delta: Option<ChatDelta>,
/// The decoder acts on "tool_calls", "stop" and "length"; other values are ignored.
#[serde(default)]
finish_reason: Option<String>,
}
#[derive(Debug, Default, Deserialize)]
struct ChatDelta {
#[serde(default)]
content: Vec<DeltaText>,
#[serde(default)]
reasoning_content: Vec<DeltaText>,
#[serde(default)]
tool_calls: Vec<ChatToolCall>,
}
/// One text piece inside `ChatDelta::content` or `ChatDelta::reasoning_content`.
#[derive(Debug, Default, Deserialize)]
struct DeltaText {
/// Text fragment; a missing key decodes as an empty string.
#[serde(default)]
text: String,
}
/// One entry of `ChatDelta::tool_calls`.
#[derive(Debug, Default, Deserialize)]
struct ChatToolCall {
/// Call identifier, when this fragment carries one.
#[serde(default)]
id: Option<String>,
/// Function name and/or argument fragment, when present.
#[serde(default)]
function: Option<ChatFunction>,
}
#[derive(Debug, Default, Deserialize)]
struct ChatFunction {
#[serde(default)]
name: String,
#[serde(default)]
arguments: String,
}
#[derive(Default)]
pub struct WireChatSseDecoder {
fn_call_state: FunctionCallState,
@@ -53,24 +95,22 @@ impl WireChatSseDecoder {
async fn handle_content_delta(
&mut self,
delta: &Value,
delta: &ChatDelta,
tx: &mpsc::Sender<crate::error::Result<WireEvent>>,
) {
if let Some(content) = delta.get("content").and_then(|c| c.as_array()) {
for piece in content {
if let Some(text) = piece.get("text").and_then(|t| t.as_str()) {
self.push_assistant_text(text, tx).await;
}
for piece in &delta.content {
if !piece.text.is_empty() {
self.push_assistant_text(&piece.text, tx).await;
}
}
if let Some(reasoning) = delta.get("reasoning_content").and_then(|c| c.as_array()) {
for entry in reasoning {
if let Some(text) = entry.get("text").and_then(|t| t.as_str()) {
self.push_reasoning_text(text, tx).await;
}
for entry in &delta.reasoning_content {
if !entry.text.is_empty() {
self.push_reasoning_text(&entry.text, tx).await;
}
}
self.record_tool_calls(&delta.tool_calls);
}
async fn push_assistant_text(
@@ -105,11 +145,7 @@ impl WireChatSseDecoder {
text: String::new(),
}],
};
send_wire_event(
tx,
WireEvent::OutputItemAdded(serialize_response_item(message)),
)
.await;
send_wire_event(tx, WireEvent::OutputItemAdded(message)).await;
}
async fn start_reasoning(&mut self, tx: &mpsc::Sender<crate::error::Result<WireEvent>>) {
@@ -123,33 +159,27 @@ impl WireChatSseDecoder {
content: None,
encrypted_content: None,
};
send_wire_event(
tx,
WireEvent::OutputItemAdded(serialize_response_item(reasoning_item)),
)
.await;
send_wire_event(tx, WireEvent::OutputItemAdded(reasoning_item)).await;
}
fn record_tool_calls(&mut self, delta: &Value) {
if let Some(tool_calls) = delta.get("tool_calls").and_then(|c| c.as_array()) {
for call in tool_calls {
if let Some(id_val) = call.get("id").and_then(|id| id.as_str()) {
self.fn_call_state.call_id = Some(id_val.to_string());
fn record_tool_calls(&mut self, tool_calls: &[ChatToolCall]) {
for call in tool_calls {
if let Some(id_val) = &call.id {
self.fn_call_state.call_id = Some(id_val.clone());
}
if let Some(function) = &call.function {
if !function.name.is_empty() {
self.fn_call_state.name = Some(function.name.clone());
self.fn_call_state.active = true;
}
if let Some(function) = call.get("function") {
if let Some(name) = function.get("name").and_then(|n| n.as_str()) {
self.fn_call_state.name = Some(name.to_string());
self.fn_call_state.active = true;
}
if let Some(args) = function.get("arguments").and_then(|a| a.as_str()) {
self.fn_call_state.arguments.push_str(args);
}
if !function.arguments.is_empty() {
self.fn_call_state.arguments.push_str(&function.arguments);
}
}
}
}
fn finish_function_call(&mut self) -> Option<Value> {
fn finish_function_call(&mut self) -> Option<ResponseItem> {
if !self.fn_call_state.active {
return None;
}
@@ -158,16 +188,15 @@ impl WireChatSseDecoder {
let arguments = std::mem::take(&mut self.fn_call_state.arguments);
self.fn_call_state = FunctionCallState::default();
Some(serde_json::json!({
"type": "function_call",
"id": call_id,
"call_id": call_id,
"name": function_name,
"arguments": arguments,
}))
Some(ResponseItem::FunctionCall {
id: Some(call_id.clone()),
name: function_name,
arguments,
call_id,
})
}
fn finish_reasoning(&mut self) -> Option<Value> {
fn finish_reasoning(&mut self) -> Option<ResponseItem> {
if !self.reasoning_started {
return None;
}
@@ -179,26 +208,26 @@ impl WireChatSseDecoder {
}
self.reasoning_started = false;
Some(serialize_response_item(ResponseItem::Reasoning {
Some(ResponseItem::Reasoning {
id: String::new(),
summary: vec![],
content: Some(content),
encrypted_content: None,
}))
})
}
fn finish_assistant(&mut self) -> Option<Value> {
fn finish_assistant(&mut self) -> Option<ResponseItem> {
if !self.assistant_started {
return None;
}
let text = std::mem::take(&mut self.assistant_text);
self.assistant_started = false;
Some(serialize_response_item(ResponseItem::Message {
Some(ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText { text }],
}))
})
}
fn reset_reasoning_and_assistant(&mut self) {
@@ -217,55 +246,45 @@ impl WireResponseDecoder for WireChatSseDecoder {
tx: &mpsc::Sender<crate::error::Result<WireEvent>>,
_otel: &OtelEventManager,
) -> Result<()> {
// Chat sends a terminal "[DONE]" frame; ignore it. Treat other parse errors as failures.
let parsed_chunk = serde_json::from_str::<Value>(json).map_err(|err| {
let chunk = serde_json::from_str::<ChatChunk>(json).map_err(|err| {
debug!("failed to parse Chat SSE JSON: {}", json);
Error::Other(format!("failed to parse Chat SSE JSON: {err}"))
})?;
let choices = parsed_chunk
.get("choices")
.and_then(|choices| choices.as_array())
.cloned()
.unwrap_or_default();
for choice in choices {
for choice in chunk.choices {
self.emit_created_once(tx).await;
if let Some(delta) = choice.get("delta") {
if let Some(delta) = &choice.delta {
self.handle_content_delta(delta, tx).await;
self.record_tool_calls(delta);
}
if let Some(finish_reason) = choice.get("finish_reason").and_then(|f| f.as_str()) {
match finish_reason {
"tool_calls" => {
if let Some(item) = self.finish_function_call() {
send_wire_event(tx, WireEvent::OutputItemDone(item)).await;
}
match choice.finish_reason.as_deref() {
Some("tool_calls") => {
if let Some(item) = self.finish_function_call() {
send_wire_event(tx, WireEvent::OutputItemDone(item)).await;
}
"stop" | "length" => {
if let Some(reasoning_item) = self.finish_reasoning() {
send_wire_event(tx, WireEvent::OutputItemDone(reasoning_item)).await;
}
if let Some(message) = self.finish_assistant() {
send_wire_event(tx, WireEvent::OutputItemDone(message)).await;
}
send_wire_event(
tx,
WireEvent::Completed {
response_id: String::new(),
token_usage: None,
},
)
.await;
self.reset_reasoning_and_assistant();
}
_ => {}
}
Some("stop") | Some("length") => {
if let Some(reasoning_item) = self.finish_reasoning() {
send_wire_event(tx, WireEvent::OutputItemDone(reasoning_item)).await;
}
if let Some(message) = self.finish_assistant() {
send_wire_event(tx, WireEvent::OutputItemDone(message)).await;
}
send_wire_event(
tx,
WireEvent::Completed {
response_id: String::new(),
token_usage: None,
},
)
.await;
self.reset_reasoning_and_assistant();
}
_ => {}
}
}

View File

@@ -1,5 +1,7 @@
use async_trait::async_trait;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::TokenUsage;
use serde::Deserialize;
use serde_json::Value;
use tokio::sync::mpsc;
@@ -9,7 +11,6 @@ use crate::client::WireResponseDecoder;
use crate::error::Error;
use crate::error::Result;
use crate::stream::WireEvent;
use crate::stream::WireTokenUsage;
#[derive(Debug, Deserialize)]
struct StreamEvent {
@@ -96,12 +97,14 @@ impl WireResponseDecoder for WireResponsesSseDecoder {
}
"response.output_item.done" => {
if let Some(item_val) = event.item {
let _ = tx.send(Ok(WireEvent::OutputItemDone(item_val))).await;
let item = parse_response_item(item_val);
let _ = tx.send(Ok(WireEvent::OutputItemDone(item))).await;
}
}
"response.output_item.added" => {
if let Some(item_val) = event.item {
let _ = tx.send(Ok(WireEvent::OutputItemAdded(item_val))).await;
let item = parse_response_item(item_val);
let _ = tx.send(Ok(WireEvent::OutputItemAdded(item))).await;
}
}
"response.reasoning_summary_part.added" => {
@@ -114,7 +117,7 @@ impl WireResponseDecoder for WireResponsesSseDecoder {
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let usage = parse_wire_usage(&resp);
let usage = parse_usage(&resp);
if let Some(u) = &usage {
otel.sse_event_completed(
u.input_tokens,
@@ -151,7 +154,7 @@ impl WireResponseDecoder for WireResponsesSseDecoder {
}
}
fn parse_wire_usage(resp: &Value) -> Option<WireTokenUsage> {
fn parse_usage(resp: &Value) -> Option<TokenUsage> {
let usage: WireUsage = serde_json::from_value(resp.get("usage")?.clone()).ok()?;
let cached_input_tokens = usage
.cached_input_tokens
@@ -170,7 +173,7 @@ fn parse_wire_usage(resp: &Value) -> Option<WireTokenUsage> {
})
.unwrap_or(0);
Some(WireTokenUsage {
Some(TokenUsage {
input_tokens: usage.input_tokens,
cached_input_tokens,
output_tokens: usage.output_tokens,
@@ -178,3 +181,7 @@ fn parse_wire_usage(resp: &Value) -> Option<WireTokenUsage> {
total_tokens: usage.total_tokens,
})
}
/// Decodes a raw JSON item into a `ResponseItem`, mapping anything that does
/// not deserialize to `ResponseItem::Other`.
fn parse_response_item(value: Value) -> ResponseItem {
    match serde_json::from_value::<ResponseItem>(value) {
        Ok(item) => item,
        Err(_) => ResponseItem::Other,
    }
}

View File

@@ -125,7 +125,7 @@ impl ResponsesApiClient {
);
if let Some(snapshot) = snapshot
&& tx_event
.send(Ok(crate::stream::WireEvent::RateLimits(snapshot.into())))
.send(Ok(crate::stream::WireEvent::RateLimits(snapshot)))
.await
.is_err()
{

View File

@@ -6,7 +6,6 @@ use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
use codex_protocol::protocol::TokenUsage;
use futures::Stream;
use serde::Serialize;
@@ -83,61 +82,5 @@ impl<T> Stream for EventStream<T> {
pub type ResponseStream = EventStream<Result<ResponseEvent>>;
/// Token counts carried by `WireEvent::Completed`.
#[derive(Debug, Clone)]
pub struct WireTokenUsage {
pub input_tokens: i64,
pub cached_input_tokens: i64,
pub output_tokens: i64,
pub reasoning_output_tokens: i64,
pub total_tokens: i64,
}
/// Wire-level form of a `RateLimitWindow`; every field is optional here.
#[derive(Debug, Clone)]
pub struct WireRateLimitWindow {
pub used_percent: Option<f64>,
pub window_minutes: Option<i64>,
pub resets_at: Option<i64>,
}
/// Wire-level form of a `RateLimitSnapshot`, carried by `WireEvent::RateLimits`.
#[derive(Debug, Clone)]
pub struct WireRateLimitSnapshot {
pub primary: Option<WireRateLimitWindow>,
pub secondary: Option<WireRateLimitWindow>,
}
impl From<RateLimitWindow> for WireRateLimitWindow {
    /// Copies the window over, wrapping the always-present `used_percent` in
    /// `Some`.
    fn from(window: RateLimitWindow) -> Self {
        let used_percent = Some(window.used_percent);
        Self {
            used_percent,
            window_minutes: window.window_minutes,
            resets_at: window.resets_at,
        }
    }
}
impl From<RateLimitSnapshot> for WireRateLimitSnapshot {
fn from(snapshot: RateLimitSnapshot) -> Self {
Self {
primary: snapshot.primary.map(Into::into),
secondary: snapshot.secondary.map(Into::into),
}
}
}
/// Events produced by the wire decoders and delivered over the event channel.
#[derive(Debug)]
pub enum WireEvent {
Created,
/// Carries the item as raw JSON.
OutputItemDone(serde_json::Value),
/// Carries the item as raw JSON.
OutputItemAdded(serde_json::Value),
Completed {
response_id: String,
/// `None` when no usage information is available.
token_usage: Option<WireTokenUsage>,
},
OutputTextDelta(String),
ReasoningSummaryDelta(String),
ReasoningContentDelta(String),
ReasoningSummaryPartAdded,
RateLimits(WireRateLimitSnapshot),
}
pub type WireResponseStream = EventStream<Result<WireEvent>>;
pub type WireEvent = ResponseEvent;
pub type WireResponseStream = ResponseStream;