This commit is contained in:
jif-oai
2025-11-11 14:58:19 +00:00
parent f6494aa85c
commit 0c609d441b
10 changed files with 224 additions and 43 deletions

View File

@@ -106,9 +106,7 @@ where
}
}
} else {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
item,
))));
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
}
}
Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => {
@@ -116,9 +114,7 @@ where
&item,
ResponseItem::Message { role, .. } if role == "assistant"
) {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(
item,
))));
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item))));
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta)))) => {

View File

@@ -1,4 +1,3 @@
use async_trait::async_trait;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::protocol::SessionSource;

View File

@@ -1,15 +1,6 @@
use std::time::Duration;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
/// Computes the retry delay for the given attempt number.
///
/// The delay is `100ms * 2^attempt`, where `attempt` is first clamped to
/// `[0, 6]`. Negative attempts therefore yield 100ms and anything at or above
/// 6 yields 6400ms, so the delay never grows without bound.
pub(crate) fn backoff(attempt: i64) -> Duration {
    const BASE_DELAY_MS: u64 = 100;
    const MAX_EXPONENT: i64 = 6;

    // After clamping the exponent is in 0..=6, so the shift cannot overflow.
    let exponent = attempt.clamp(0, MAX_EXPONENT);
    Duration::from_millis(BASE_DELAY_MS << exponent)
}
/// Apply the `x-openai-subagent` header when the session source indicates a
/// subagent. Returns the original builder unchanged when not applicable.
pub(crate) fn apply_subagent_header(

View File

@@ -34,9 +34,7 @@ pub struct ErrorBody {
pub resets_at: Option<i64>,
}
/// Reports whether `error` carries the `quota_exceeded` error code.
///
/// Returns `false` when the body has no code or a different code.
pub fn is_quota_exceeded_error(error: &ErrorBody) -> bool {
    matches!(error.code.as_deref(), Some("quota_exceeded"))
}
// legacy helper removed; decoupled error handling in core
#[derive(Debug, Deserialize)]
pub struct StreamEvent {

View File

@@ -30,6 +30,8 @@ pub use crate::stream::ResponseStream;
pub use crate::stream::TextControls;
pub use crate::stream::TextFormat;
pub use crate::stream::TextFormatType;
pub use crate::stream::WireEvent;
pub use crate::stream::WireResponseStream;
pub use codex_provider_config::BUILT_IN_OSS_MODEL_PROVIDER_ID;
pub use codex_provider_config::ModelProviderInfo;
pub use codex_provider_config::WireApi;

View File

@@ -1,12 +1,10 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use codex_app_server_protocol::AuthMode;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::ConversationId;
use futures::TryStreamExt;
use reqwest::StatusCode;
use serde_json::Value;
use tokio::sync::mpsc;
use tracing::debug;
@@ -14,8 +12,6 @@ use tracing::trace;
use crate::api::PayloadClient;
use crate::auth::AuthProvider;
use crate::common::backoff;
use crate::decode::responses::ErrorResponse;
use crate::error::Error;
use crate::error::Result;
use crate::stream::ResponseEvent;
@@ -45,7 +41,6 @@ pub struct ResponsesApiClient {
config: ResponsesApiClientConfig,
}
#[async_trait]
impl PayloadClient for ResponsesApiClient {
type Config = ResponsesApiClientConfig;
@@ -145,4 +140,3 @@ impl PayloadClient for ResponsesApiClient {
Ok(crate::stream::EventStream::from_receiver(rx_event))
}
}

View File

@@ -12,9 +12,12 @@ use crate::ResponsesApiClient;
use crate::ResponsesApiClientConfig;
use crate::Result;
use crate::WireApi;
use crate::WireEvent;
use crate::WireResponseStream;
use crate::api::PayloadClient;
use crate::auth::AuthProvider;
use crate::client::fixtures::stream_from_fixture;
use crate::stream::WireTokenUsage;
use codex_provider_config::ModelProviderInfo;
/// Dispatches to the appropriate API client implementation based on the provider wire API.
@@ -80,6 +83,25 @@ impl RoutedApiClient {
}
}
}
pub async fn stream_payload_wire(
&self,
payload_json: &serde_json::Value,
) -> Result<WireResponseStream> {
use futures::StreamExt;
let legacy = self.stream_payload(payload_json).await?;
let (tx, rx) = tokio::sync::mpsc::channel(1600);
tokio::spawn(async move {
futures::pin_mut!(legacy);
while let Some(item) = legacy.next().await {
let converted = item.and_then(|ev| map_response_event_to_wire(ev));
if tx.send(converted).await.is_err() {
break;
}
}
});
Ok(crate::stream::EventStream::from_receiver(rx))
}
}
#[async_trait::async_trait]
@@ -98,3 +120,54 @@ impl PayloadClient for RoutedApiClient {
self.stream_payload(payload_json).await
}
}
fn map_response_event_to_wire(ev: crate::stream::ResponseEvent) -> Result<WireEvent> {
Ok(match ev {
crate::stream::ResponseEvent::Created => WireEvent::Created,
crate::stream::ResponseEvent::OutputItemDone(item) => {
WireEvent::OutputItemDone(serde_json::to_value(item).unwrap_or(serde_json::Value::Null))
}
crate::stream::ResponseEvent::OutputItemAdded(item) => WireEvent::OutputItemAdded(
serde_json::to_value(item).unwrap_or(serde_json::Value::Null),
),
crate::stream::ResponseEvent::Completed {
response_id,
token_usage,
} => {
let mapped = token_usage.map(|u| WireTokenUsage {
input_tokens: u.input_tokens,
cached_input_tokens: u.cached_input_tokens,
output_tokens: u.output_tokens,
reasoning_output_tokens: u.reasoning_output_tokens,
total_tokens: u.total_tokens,
});
WireEvent::Completed {
response_id,
token_usage: mapped,
}
}
crate::stream::ResponseEvent::OutputTextDelta(s) => WireEvent::OutputTextDelta(s),
crate::stream::ResponseEvent::ReasoningSummaryDelta(s) => {
WireEvent::ReasoningSummaryDelta(s)
}
crate::stream::ResponseEvent::ReasoningContentDelta(s) => {
WireEvent::ReasoningContentDelta(s)
}
crate::stream::ResponseEvent::ReasoningSummaryPartAdded => {
WireEvent::ReasoningSummaryPartAdded
}
crate::stream::ResponseEvent::RateLimits(s) => {
let to_win = |w: Option<codex_protocol::protocol::RateLimitWindow>| -> Option<crate::stream::WireRateLimitWindow> {
w.map(|w| crate::stream::WireRateLimitWindow {
used_percent: Some(w.used_percent),
window_minutes: w.window_minutes,
resets_at: w.resets_at,
})
};
WireEvent::RateLimits(crate::stream::WireRateLimitSnapshot {
primary: to_win(s.primary),
secondary: to_win(s.secondary),
})
}
})
}

View File

@@ -81,3 +81,43 @@ impl<T> Stream for EventStream<T> {
}
/// An [`EventStream`] whose items are `Result<ResponseEvent>`.
pub type ResponseStream = EventStream<Result<ResponseEvent>>;
/// Token usage counters carried by [`WireEvent::Completed`].
///
/// All counters are plain `i64` values.
#[derive(Debug, Clone)]
pub struct WireTokenUsage {
pub input_tokens: i64,
pub cached_input_tokens: i64,
pub output_tokens: i64,
pub reasoning_output_tokens: i64,
pub total_tokens: i64,
}
/// One rate-limit window as carried inside a [`WireRateLimitSnapshot`].
///
/// Every field is optional, so a window may be only partially populated.
#[derive(Debug, Clone)]
pub struct WireRateLimitWindow {
pub used_percent: Option<f64>,
// NOTE(review): presumably the window length in minutes — confirm with the producer.
pub window_minutes: Option<i64>,
// NOTE(review): unit and epoch of this timestamp are not visible here — confirm.
pub resets_at: Option<i64>,
}
/// Rate-limit snapshot carried by [`WireEvent::RateLimits`].
///
/// Holds up to two windows; either may be absent.
#[derive(Debug, Clone)]
pub struct WireRateLimitSnapshot {
pub primary: Option<WireRateLimitWindow>,
pub secondary: Option<WireRateLimitWindow>,
}
/// Events delivered on a [`WireResponseStream`].
///
/// Output items are carried as untyped `serde_json::Value` payloads.
/// Unlike the other wire types in this file, this enum derives only `Debug`
/// (not `Clone`).
#[derive(Debug)]
pub enum WireEvent {
Created,
// Item payload as a raw JSON value.
OutputItemDone(serde_json::Value),
// Item payload as a raw JSON value.
OutputItemAdded(serde_json::Value),
Completed {
response_id: String,
// `None` when no usage information accompanies the completion.
token_usage: Option<WireTokenUsage>,
},
OutputTextDelta(String),
ReasoningSummaryDelta(String),
ReasoningContentDelta(String),
ReasoningSummaryPartAdded,
RateLimits(WireRateLimitSnapshot),
}
/// An [`EventStream`] whose items are `Result<WireEvent>`.
pub type WireResponseStream = EventStream<Result<WireEvent>>;