This commit is contained in:
jif-oai
2025-11-10 15:47:36 +00:00
parent 10c880d886
commit 9c267f0204
5 changed files with 396 additions and 28 deletions

View File

@@ -11,6 +11,7 @@ use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::ConversationId;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
use codex_protocol::protocol::TokenUsage;
use futures::Stream;
use futures::StreamExt;
@@ -260,13 +261,15 @@ async fn handle_error_response(
}
if status == StatusCode::TOO_MANY_REQUESTS {
let rate_limits = parse_rate_limit_snapshot(resp.headers());
let body = resp.json::<ErrorResponse>().await.ok();
if let Some(ErrorResponse { error }) = body {
if error.r#type.as_deref() == Some("usage_limit_reached") {
return StreamAttemptError::Fatal(Error::Stream(
"usage limit reached".to_string(),
None,
));
return StreamAttemptError::Fatal(Error::UsageLimitReached {
plan_type: error.plan_type,
resets_at: error.resets_at,
rate_limits,
});
} else if error.r#type.as_deref() == Some("usage_not_included") {
return StreamAttemptError::Fatal(Error::Stream(
"usage not included".to_string(),
@@ -300,6 +303,7 @@ async fn process_sse<S>(
let mut stream = stream;
let mut response_completed: Option<ResponseCompleted> = None;
let mut response_error: Option<Error> = None;
let mut data_buffer = String::new();
loop {
let result = timeout(max_idle_duration, stream.next()).await;
@@ -329,7 +333,16 @@ async fn process_sse<S>(
return;
}
Ok(Some(Ok(chunk))) => {
if let Err(err) = process_sse_chunk(chunk, &tx_event).await {
if let Err(err) = process_sse_chunk(
chunk,
&tx_event,
&mut data_buffer,
&mut response_completed,
&mut response_error,
&otel_event_manager,
)
.await
{
let _ = tx_event.send(Err(err)).await;
return;
}
@@ -343,6 +356,18 @@ async fn process_sse<S>(
&otel_event_manager,
)
.await;
} else {
otel_event_manager.sse_event_failed(
None,
Duration::from_millis(0),
&"stream closed before response.completed".to_string(),
);
let _ = tx_event
.send(Err(Error::Stream(
"stream closed before response.completed".to_string(),
None,
)))
.await;
}
return;
}
@@ -354,44 +379,126 @@ async fn emit_response_completed(
tx_event: mpsc::Sender<Result<ResponseEvent>>,
completed: ResponseCompleted,
response_error: Option<Error>,
_otel_event_manager: &OtelEventManager,
otel_event_manager: &OtelEventManager,
) -> Result<()> {
if let Some(err) = response_error {
tx_event.send(Err(err)).await.ok();
return Ok(());
}
let usage = completed.usage.clone();
let response_id = completed.id.clone();
let event = ResponseEvent::Completed {
response_id: completed.id,
token_usage: completed.usage,
response_id,
token_usage: usage.clone(),
};
tx_event.send(Ok(event)).await.ok();
if let Some(usage) = &usage {
otel_event_manager.sse_event_completed(
usage.input_tokens,
usage.output_tokens,
Some(usage.cached_input_tokens),
Some(usage.reasoning_output_tokens),
usage.total_tokens,
);
} else {
otel_event_manager.see_event_completed_failed(&"missing token usage".to_string());
}
Ok(())
}
fn parse_rate_limit_snapshot(_headers: &HeaderMap) -> Option<RateLimitSnapshot> {
None
fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option<RateLimitSnapshot> {
fn parse_f64(headers: &HeaderMap, name: &str) -> Option<f64> {
headers.get(name)?.to_str().ok()?.parse::<f64>().ok()
}
fn parse_i64(headers: &HeaderMap, name: &str) -> Option<i64> {
headers.get(name)?.to_str().ok()?.parse::<i64>().ok()
}
let primary_used = parse_f64(headers, "x-codex-primary-used-percent");
let primary_window = parse_i64(headers, "x-codex-primary-window-minutes");
let primary_resets = parse_i64(headers, "x-codex-primary-resets-at")
.or_else(|| parse_i64(headers, "x-codex-primary-reset-at"));
let secondary_used = parse_f64(headers, "x-codex-secondary-used-percent");
let secondary_window = parse_i64(headers, "x-codex-secondary-window-minutes");
let secondary_resets = parse_i64(headers, "x-codex-secondary-resets-at")
.or_else(|| parse_i64(headers, "x-codex-secondary-reset-at"));
let primary = primary_used.map(|used_percent| RateLimitWindow {
used_percent,
window_minutes: primary_window,
resets_at: primary_resets,
});
let secondary = secondary_used.map(|used_percent| RateLimitWindow {
used_percent,
window_minutes: secondary_window,
resets_at: secondary_resets,
});
if primary.is_some() || secondary.is_some() {
Some(RateLimitSnapshot { primary, secondary })
} else {
None
}
}
async fn process_sse_chunk(
chunk: Bytes,
tx_event: &mpsc::Sender<Result<ResponseEvent>>,
data_buffer: &mut String,
response_completed: &mut Option<ResponseCompleted>,
response_error: &mut Option<Error>,
otel_event_manager: &OtelEventManager,
) -> Result<()> {
let chunk_str = std::str::from_utf8(&chunk)
.map_err(|err| Error::Other(format!("Invalid UTF-8 in SSE chunk: {err}")))?;
trace!("responses api chunk ({chunk_str:?})");
let mut data_buffer = String::new();
for line in chunk_str.lines() {
if let Some(tail) = line.strip_prefix("data:") {
data_buffer.push_str(tail.trim_start());
} else if !line.is_empty() && !data_buffer.is_empty() {
// Continuation of a long data: line split across chunks; append raw.
data_buffer.push_str(line);
}
if line.is_empty() {
let payload: sse::Payload = serde_json::from_str(&data_buffer)
.map_err(|err| Error::Other(format!("Cannot parse SSE JSON: {err}")))?;
handle_sse_payload(payload, tx_event).await?;
// First try the "event-shaped" payload used by test harness
if let Ok(event) = serde_json::from_str::<StreamEvent>(data_buffer) {
otel_event_manager.sse_event_kind(&event.r#type);
handle_stream_event(
event,
tx_event.clone(),
response_completed,
response_error,
otel_event_manager,
)
.await;
} else {
// Log parse errors for otel tracing (event-shaped)
otel_event_manager.sse_event_failed(
None,
Duration::from_millis(0),
&format!("Cannot parse SSE JSON: {data_buffer}"),
);
// Fall back to field-shaped payload used by Responses API variants
match serde_json::from_str::<sse::Payload>(data_buffer) {
Ok(payload) => {
handle_sse_payload(payload, tx_event, otel_event_manager).await?;
}
Err(err) => {
// Also emit failure when field-shaped parse fails
otel_event_manager.sse_event_failed(
None,
Duration::from_millis(0),
&format!("Cannot parse SSE JSON: {err}"),
);
return Err(Error::Other(format!("Cannot parse SSE JSON: {err}")));
}
}
}
data_buffer.clear();
}
}
@@ -402,14 +509,29 @@ async fn process_sse_chunk(
async fn handle_sse_payload(
payload: sse::Payload,
tx_event: &mpsc::Sender<Result<ResponseEvent>>,
otel_event_manager: &OtelEventManager,
) -> Result<()> {
if let Some(responses) = payload.responses {
for ev in responses {
let event = match ev {
sse::Response::Completed(complete) => ResponseEvent::Completed {
response_id: complete.id,
token_usage: complete.usage,
},
sse::Response::Completed(complete) => {
if let Some(usage) = &complete.usage {
otel_event_manager.sse_event_completed(
usage.input_tokens,
usage.output_tokens,
Some(usage.cached_input_tokens),
Some(usage.reasoning_output_tokens),
usage.total_tokens,
);
} else {
otel_event_manager
.see_event_completed_failed(&"missing token usage".to_string());
}
ResponseEvent::Completed {
response_id: complete.id,
token_usage: complete.usage,
}
}
sse::Response::Error(err) => {
let retry_after = err
.retry_after
@@ -445,6 +567,7 @@ async fn handle_sse_payload(
match item.r#type {
sse::OutputItem::Created => {
tx_event.send(Ok(ResponseEvent::Created)).await.ok();
otel_event_manager.sse_event_kind("response.output_item.done");
}
}
}
@@ -463,6 +586,7 @@ async fn handle_sse_payload(
.send(Ok(ResponseEvent::OutputItemDone(response_item)))
.await
.ok();
otel_event_manager.sse_event_kind("response.output_item.done");
}
if let Some(reasoning_content_delta) = payload.response_output_reasoning_delta {
@@ -503,6 +627,12 @@ struct ResponseCompleted {
usage: Option<TokenUsage>,
}
#[derive(Debug, Deserialize)]
struct StreamResponseCompleted {
id: String,
usage: Option<TokenUsagePartial>,
}
#[derive(Debug, Deserialize)]
struct ErrorResponse {
error: ErrorBody,
@@ -631,8 +761,51 @@ pub async fn stream_from_fixture(
Ok(ResponseStream { rx_event })
}
fn attach_item_ids_array(_json_array: &mut Vec<Value>, _prompt_input: &[ResponseItem]) {
// no-op for current protocol version
fn attach_item_ids_array(json_array: &mut Vec<Value>, prompt_input: &[ResponseItem]) {
for (json_item, item) in json_array.iter_mut().zip(prompt_input.iter()) {
let Some(obj) = json_item.as_object_mut() else {
continue;
};
// Helper to set id only if missing/null
let mut set_id_if_absent = |id: &str| match obj.get("id") {
Some(Value::String(s)) if !s.is_empty() => {}
Some(Value::Null) | None => {
obj.insert("id".to_string(), Value::String(id.to_string()));
}
_ => {}
};
match item {
ResponseItem::Reasoning { id, .. } => set_id_if_absent(id),
ResponseItem::Message { id, .. } => {
if let Some(id) = id.as_ref() {
set_id_if_absent(id);
}
}
ResponseItem::WebSearchCall { id, .. } => {
if let Some(id) = id.as_ref() {
set_id_if_absent(id);
}
}
ResponseItem::FunctionCall { id, .. } => {
if let Some(id) = id.as_ref() {
set_id_if_absent(id);
}
}
ResponseItem::LocalShellCall { id, .. } => {
if let Some(id) = id.as_ref() {
set_id_if_absent(id);
}
}
ResponseItem::CustomToolCall { id, .. } => {
if let Some(id) = id.as_ref() {
set_id_if_absent(id);
}
}
_ => {}
}
}
}
#[derive(Debug, Deserialize)]
@@ -641,6 +814,66 @@ struct StreamEvent {
response: Option<Value>,
item: Option<Value>,
error: Option<Value>,
#[serde(default)]
delta: Option<String>,
}
#[derive(Debug, Deserialize)]
struct TokenUsagePartial {
#[serde(default)]
input_tokens: i64,
#[serde(default)]
cached_input_tokens: i64,
#[serde(default)]
input_tokens_details: Option<TokenUsageInputDetails>,
#[serde(default)]
output_tokens: i64,
#[serde(default)]
output_tokens_details: Option<TokenUsageOutputDetails>,
#[serde(default)]
reasoning_output_tokens: i64,
#[serde(default)]
total_tokens: i64,
}
impl From<TokenUsagePartial> for TokenUsage {
fn from(value: TokenUsagePartial) -> Self {
let cached_input_tokens = if value.cached_input_tokens > 0 {
Some(value.cached_input_tokens)
} else {
value
.input_tokens_details
.and_then(|d| d.cached_tokens)
.filter(|v| *v > 0)
};
let reasoning_output_tokens = if value.reasoning_output_tokens > 0 {
Some(value.reasoning_output_tokens)
} else {
value
.output_tokens_details
.and_then(|d| d.reasoning_tokens)
.filter(|v| *v > 0)
};
Self {
input_tokens: value.input_tokens,
cached_input_tokens: cached_input_tokens.unwrap_or(0),
output_tokens: value.output_tokens,
reasoning_output_tokens: reasoning_output_tokens.unwrap_or(0),
total_tokens: value.total_tokens,
}
}
}
#[derive(Debug, Deserialize)]
struct TokenUsageInputDetails {
#[serde(default)]
cached_tokens: Option<i64>,
}
#[derive(Debug, Deserialize)]
struct TokenUsageOutputDetails {
#[serde(default)]
reasoning_tokens: Option<i64>,
}
#[derive(Debug, Deserialize)]
@@ -653,15 +886,70 @@ async fn handle_stream_event(
tx_event: mpsc::Sender<Result<ResponseEvent>>,
response_completed: &mut Option<ResponseCompleted>,
response_error: &mut Option<Error>,
otel_event_manager: &OtelEventManager,
) {
trace!("response event: {}", event.r#type);
match event.r#type.as_str() {
"response.created" => {
// Emit Created as soon as we see a created event
let _ = tx_event.send(Ok(ResponseEvent::Created)).await;
}
"response.output_text.delta" => {
if let Some(item_val) = event.item {
let resp = serde_json::from_value::<TextDelta>(item_val);
if let Ok(delta) = resp {
let event = ResponseEvent::OutputTextDelta(delta.delta);
if tx_event.send(Ok(event)).await.is_err() {}
let _ = tx_event.send(Ok(event)).await;
}
} else if let Some(delta) = event.delta {
let _ = tx_event
.send(Ok(ResponseEvent::OutputTextDelta(delta)))
.await;
}
}
"response.reasoning_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningContentDelta(delta);
let _ = tx_event.send(Ok(event)).await;
}
}
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
let _ = tx_event.send(Ok(event)).await;
}
}
"response.output_item.done" => {
if let Some(item_val) = event.item
&& let Ok(item) = serde_json::from_value::<ResponseItem>(item_val)
{
let event = ResponseEvent::OutputItemDone(item);
if tx_event.send(Ok(event)).await.is_err() {}
}
}
"response.failed" => {
if let Some(resp_val) = event.response {
otel_event_manager.sse_event_failed(
Some(&"response.failed".to_string()),
Duration::from_millis(0),
&resp_val,
);
// Propagate failure downstream; map context window errors to a
// stable message that core can handle specially.
if let Some(err) = resp_val
.get("error")
.cloned()
.and_then(|v| serde_json::from_value::<ErrorBody>(v).ok())
{
let msg = if err.code.as_deref() == Some("context_length_exceeded") {
"context window exceeded".to_string()
} else if err.code.as_deref() == Some("insufficient_quota") {
"quota exceeded".to_string()
} else {
err.message.unwrap_or_else(|| "fatal error".to_string())
};
let _ = tx_event.send(Err(Error::Stream(msg, None))).await;
}
}
}
@@ -691,11 +979,39 @@ async fn handle_stream_event(
}
"response.completed" => {
if let Some(resp_val) = event.response {
match serde_json::from_value::<ResponseCompleted>(resp_val) {
match serde_json::from_value::<StreamResponseCompleted>(resp_val) {
Ok(resp) => {
*response_completed = Some(resp);
let usage = resp.usage.map(TokenUsage::from);
let completed = ResponseCompleted {
id: resp.id.clone(),
usage: usage.clone(),
};
// Emit Completed immediately to match field-shaped behavior.
let ev = ResponseEvent::Completed {
response_id: resp.id,
token_usage: usage,
};
let _ = tx_event.send(Ok(ev)).await;
if let Some(usage) = &completed.usage {
otel_event_manager.sse_event_completed(
usage.input_tokens,
usage.output_tokens,
Some(usage.cached_input_tokens),
Some(usage.reasoning_output_tokens),
usage.total_tokens,
);
} else {
otel_event_manager
.see_event_completed_failed(&"missing token usage".to_string());
}
*response_completed = Some(completed);
}
Err(err) => {
otel_event_manager.sse_event_failed(
Some(&"response.completed".to_string()),
Duration::from_millis(0),
&format!("failed to parse ResponseCompleted: {err}"),
);
let _ = tx_event
.send(Err(Error::Stream(
format!("failed to parse ResponseCompleted: {err}"),