This commit is contained in:
jif-oai
2025-11-11 15:36:22 +00:00
parent b7dcc8ef5c
commit 769e9cc92c
16 changed files with 306 additions and 856 deletions

View File

@@ -1,20 +0,0 @@
use async_trait::async_trait;
use crate::error::Result;
use crate::stream::ResponseStream;
use codex_protocol::protocol::SessionSource;
use serde_json::Value;
#[async_trait]
pub trait PayloadClient: Sized {
type Config;
fn new(config: Self::Config) -> Result<Self>;
/// Start a streaming request for a pre-built wire JSON payload.
async fn stream_payload(
&self,
payload_json: &Value,
session_source: Option<&SessionSource>,
) -> Result<ResponseStream>;
}

View File

@@ -1,16 +1,9 @@
use async_trait::async_trait;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::protocol::SessionSource;
use futures::TryStreamExt;
use tokio::sync::mpsc;
use crate::api::PayloadClient;
use crate::error::Error;
use crate::error::Result;
use crate::stream::ResponseEvent;
use crate::stream::ResponseStream;
use crate::stream::WireResponseStream;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_provider_config::ModelProviderInfo;
use futures::TryStreamExt;
#[derive(Clone)]
/// Configuration for the Chat Completions client (OpenAI-compatible `/v1/chat/completions`).
@@ -25,7 +18,6 @@ pub struct ChatCompletionsApiClientConfig {
pub provider: ModelProviderInfo,
pub model: String,
pub otel_event_manager: OtelEventManager,
pub session_source: SessionSource,
pub extra_headers: Vec<(String, String)>,
}
@@ -34,23 +26,16 @@ pub struct ChatCompletionsApiClient {
config: ChatCompletionsApiClientConfig,
}
// prompt-based API removed; use PayloadClient::stream_payload instead
// prompt-based API removed
#[async_trait]
impl PayloadClient for ChatCompletionsApiClient {
type Config = ChatCompletionsApiClientConfig;
fn new(config: Self::Config) -> Result<Self> {
impl ChatCompletionsApiClient {
pub fn new(config: ChatCompletionsApiClientConfig) -> Result<Self> {
Ok(Self { config })
}
async fn stream_payload(
pub async fn stream_payload_wire(
&self,
payload_json: &serde_json::Value,
session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<ResponseStream> {
_session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<WireResponseStream> {
if self.config.provider.wire_api != codex_provider_config::WireApi::Chat {
return Err(crate::error::Error::UnsupportedOperation(
"ChatCompletionsApiClient requires a Chat provider".to_string(),
@@ -68,7 +53,6 @@ impl PayloadClient for ChatCompletionsApiClient {
&self.config.http_client,
&self.config.provider,
&auth,
session_source,
&extra_headers,
)
.await?;
@@ -83,7 +67,8 @@ impl PayloadClient for ChatCompletionsApiClient {
.log_request(0, || req_builder.send())
.await?;
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let (tx_event, rx_event) =
tokio::sync::mpsc::channel::<Result<crate::stream::WireEvent>>(1600);
let stream = res
.bytes_stream()
.map_err(|err| Error::ResponseStreamFailed {
@@ -92,36 +77,14 @@ impl PayloadClient for ChatCompletionsApiClient {
});
let idle_timeout = self.config.provider.stream_idle_timeout();
let otel = self.config.otel_event_manager.clone();
tokio::spawn(crate::client::sse::process_sse(
tokio::spawn(crate::client::sse::process_sse_wire(
stream,
tx_event,
idle_timeout,
otel,
crate::decode::chat::ChatSseDecoder::new(),
crate::decode_wire::chat::WireChatSseDecoder::new(),
));
Ok(crate::stream::EventStream::from_receiver(rx_event))
}
}
impl ChatCompletionsApiClient {
pub async fn stream_payload_wire(
&self,
payload_json: &serde_json::Value,
session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<WireResponseStream> {
use futures::StreamExt;
let legacy = self.stream_payload(payload_json, session_source).await?;
let (tx, rx) = tokio::sync::mpsc::channel(1600);
tokio::spawn(async move {
futures::pin_mut!(legacy);
while let Some(item) = legacy.next().await {
let converted = item.and_then(|ev| crate::wire::map_response_event_to_wire(ev));
if tx.send(converted).await.is_err() {
break;
}
}
});
Ok(crate::stream::EventStream::from_receiver(rx))
}
}

View File

@@ -8,16 +8,14 @@ use tokio_util::io::ReaderStream;
use crate::error::Error;
use crate::error::Result;
use crate::stream::ResponseEvent;
use crate::stream::ResponseStream;
use codex_provider_config::ModelProviderInfo;
pub async fn stream_from_fixture(
pub async fn stream_from_fixture_wire(
path: impl AsRef<Path>,
provider: ModelProviderInfo,
otel_event_manager: OtelEventManager,
) -> Result<ResponseStream> {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
) -> Result<crate::stream::WireResponseStream> {
let (tx_event, rx_event) = mpsc::channel::<Result<crate::stream::WireEvent>>(1600);
let display_path = path.as_ref().display().to_string();
let file = std::fs::File::open(path.as_ref())
.map_err(|err| Error::Other(format!("failed to open fixture {display_path}: {err}")))?;
@@ -34,12 +32,12 @@ pub async fn stream_from_fixture(
let rdr = std::io::Cursor::new(content);
let stream = ReaderStream::new(rdr).map_err(|err| Error::Other(err.to_string()));
tokio::spawn(crate::client::sse::process_sse(
tokio::spawn(crate::client::sse::process_sse_wire(
stream,
tx_event,
provider.stream_idle_timeout(),
otel_event_manager,
crate::decode::responses::ResponsesSseDecoder,
crate::decode_wire::responses::WireResponsesSseDecoder,
));
Ok(ResponseStream { rx_event })
Ok(crate::stream::EventStream::from_receiver(rx_event))
}

View File

@@ -1,10 +1,7 @@
use std::sync::Arc;
use codex_protocol::protocol::SessionSource;
use crate::auth::AuthContext;
use crate::auth::AuthProvider;
use crate::common::apply_subagent_header;
use crate::error::Result;
use codex_provider_config::ModelProviderInfo;
@@ -13,7 +10,6 @@ pub async fn build_request(
http_client: &reqwest::Client,
provider: &ModelProviderInfo,
auth: &Option<AuthContext>,
session_source: Option<&SessionSource>,
extra_headers: &[(&str, String)],
) -> Result<reqwest::RequestBuilder> {
let mut builder = provider
@@ -36,7 +32,6 @@ pub async fn build_request(
} => instructions.clone(),
},
})?;
builder = apply_subagent_header(builder, session_source);
for (name, value) in extra_headers {
builder = builder.header(*name, value);
}

View File

@@ -3,21 +3,22 @@ use codex_otel::otel_event_manager::OtelEventManager;
use tokio::sync::mpsc;
use crate::error::Result;
use crate::stream::ResponseEvent;
use crate::stream::WireEvent;
pub mod fixtures;
pub mod http;
pub mod rate_limits;
pub mod sse;
/// Decodes framed SSE JSON into ResponseEvent(s).
/// Implementations may keep state across frames (e.g., Chat function-call state).
// Legacy ResponseEvent-based decoder removed
/// Decodes framed SSE JSON into WireEvent(s).
#[async_trait]
pub trait ResponseDecoder {
pub trait WireResponseDecoder {
async fn on_frame(
&mut self,
json: &str,
tx: &mpsc::Sender<Result<ResponseEvent>>,
tx: &mpsc::Sender<Result<WireEvent>>,
otel: &OtelEventManager,
) -> Result<()>;
}

View File

@@ -7,22 +7,24 @@ use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::time::timeout;
use crate::client::ResponseDecoder;
use crate::error::Error;
use crate::error::Result;
use crate::stream::ResponseEvent;
// Legacy ResponseEvent-based SSE framer removed
use crate::stream::WireEvent;
/// Generic SSE framer: turns a Byte stream into framed JSON and delegates to a ResponseDecoder.
// Legacy ResponseEvent-based SSE framer removed
/// Generic SSE framer for wire events: Byte stream -> framed JSON -> WireResponseDecoder.
#[allow(clippy::too_many_arguments)]
pub async fn process_sse<S, D>(
pub async fn process_sse_wire<S, D>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
tx_event: mpsc::Sender<Result<WireEvent>>,
max_idle_duration: Duration,
otel_event_manager: OtelEventManager,
mut decoder: D,
) where
S: Stream<Item = Result<Bytes>> + Send + 'static + Unpin,
D: ResponseDecoder + Send,
D: crate::client::WireResponseDecoder + Send,
{
let mut stream = stream;
let mut data_buffer = String::new();
@@ -60,20 +62,18 @@ pub async fn process_sse<S, D>(
if let Some(tail) = line.strip_prefix("data:") {
data_buffer.push_str(tail.trim_start());
} else if !line.is_empty() && !data_buffer.is_empty() {
// Continuation of a long data: line split across chunks; append raw.
data_buffer.push_str(line);
}
if line.is_empty() && !data_buffer.is_empty() {
// One full JSON frame ready delegate to decoder
if let Err(err) = decoder
.on_frame(&data_buffer, &tx_event, &otel_event_manager)
let json = std::mem::take(&mut data_buffer);
if let Err(e) = decoder
.on_frame(&json, &tx_event, &otel_event_manager)
.await
{
let _ = tx_event.send(Err(err)).await;
let _ = tx_event.send(Err(e)).await;
return;
}
data_buffer.clear();
}
}
}

View File

@@ -1,22 +0,0 @@
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
/// Apply the `x-openai-subagent` header when the session source indicates a
/// subagent. Returns the original builder unchanged when not applicable.
pub(crate) fn apply_subagent_header(
mut builder: reqwest::RequestBuilder,
session_source: Option<&SessionSource>,
) -> reqwest::RequestBuilder {
if let Some(SessionSource::SubAgent(sub)) = session_source {
let subagent = if let SubAgentSource::Other(label) = sub {
label.clone()
} else {
serde_json::to_value(sub)
.ok()
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
.unwrap_or_else(|| "other".to_string())
};
builder = builder.header("x-openai-subagent", subagent);
}
builder
}

View File

@@ -1,490 +0,0 @@
use async_trait::async_trait;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::TokenUsage;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::trace;
use crate::error::Error;
use crate::error::Result;
use crate::stream::ResponseEvent;
#[derive(Debug, Deserialize)]
pub struct StreamResponseCompleted {
pub id: String,
pub usage: Option<TokenUsagePartial>,
}
#[derive(Debug, Deserialize)]
pub struct ErrorResponse {
pub error: ErrorBody,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct ErrorBody {
pub r#type: Option<String>,
pub code: Option<String>,
pub message: Option<String>,
pub plan_type: Option<String>,
pub resets_at: Option<i64>,
}
// legacy helper removed; decoupled error handling in core
#[derive(Debug, Deserialize)]
pub struct StreamEvent {
pub r#type: String,
pub response: Option<Value>,
pub item: Option<Value>,
pub error: Option<Value>,
#[serde(default)]
pub delta: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct TokenUsagePartial {
#[serde(default)]
pub input_tokens: i64,
#[serde(default)]
pub cached_input_tokens: i64,
#[serde(default)]
pub input_tokens_details: Option<TokenUsageInputDetails>,
#[serde(default)]
pub output_tokens: i64,
#[serde(default)]
pub output_tokens_details: Option<TokenUsageOutputDetails>,
#[serde(default)]
pub reasoning_output_tokens: i64,
#[serde(default)]
pub total_tokens: i64,
}
impl From<TokenUsagePartial> for TokenUsage {
fn from(value: TokenUsagePartial) -> Self {
let cached_input_tokens = if value.cached_input_tokens > 0 {
Some(value.cached_input_tokens)
} else {
value
.input_tokens_details
.and_then(|d| d.cached_tokens)
.filter(|v| *v > 0)
};
let reasoning_output_tokens = if value.reasoning_output_tokens > 0 {
Some(value.reasoning_output_tokens)
} else {
value
.output_tokens_details
.and_then(|d| d.reasoning_tokens)
.filter(|v| *v > 0)
};
Self {
input_tokens: value.input_tokens,
cached_input_tokens: cached_input_tokens.unwrap_or(0),
output_tokens: value.output_tokens,
reasoning_output_tokens: reasoning_output_tokens.unwrap_or(0),
total_tokens: value.total_tokens,
}
}
}
#[derive(Debug, Deserialize)]
pub struct TokenUsageInputDetails {
#[serde(default)]
pub cached_tokens: Option<i64>,
}
#[derive(Debug, Deserialize)]
pub struct TokenUsageOutputDetails {
#[serde(default)]
pub reasoning_tokens: Option<i64>,
}
pub async fn handle_sse_payload(
payload: sse::Payload,
tx_event: &mpsc::Sender<Result<ResponseEvent>>,
otel_event_manager: &OtelEventManager,
) -> Result<()> {
if let Some(responses) = payload.responses {
for ev in responses {
let event = match ev {
sse::Response::Completed(complete) => {
if let Some(usage) = &complete.usage {
otel_event_manager.sse_event_completed(
usage.input_tokens,
usage.output_tokens,
Some(usage.cached_input_tokens),
Some(usage.reasoning_output_tokens),
usage.total_tokens,
);
} else {
otel_event_manager
.see_event_completed_failed(&"missing token usage".to_string());
}
ResponseEvent::Completed {
response_id: complete.id,
token_usage: complete.usage,
}
}
sse::Response::Error(err) => {
let retry_after = err
.retry_after
.map(|secs| Duration::from_secs(if secs < 0 { 0 } else { secs as u64 }));
return Err(Error::Stream(
err.message.unwrap_or_else(|| "fatal error".to_string()),
retry_after,
));
}
};
tx_event.send(Ok(event)).await.ok();
}
}
if let Some(message_delta) = payload.response_message_delta {
let ev = ResponseEvent::OutputTextDelta(message_delta.text.clone());
tx_event.send(Ok(ev)).await.ok();
}
if let Some(_response_content) = payload.response_content {
// Not used currently
}
if let Some(ev) = payload.response_event {
debug!("Unhandled response_event: {ev:?}");
}
if let Some(item) = payload.response_output_item {
match item.r#type {
sse::OutputItem::Created => {
tx_event.send(Ok(ResponseEvent::Created)).await.ok();
otel_event_manager.sse_event_kind("response.output_item.done");
}
}
}
if let Some(done) = payload.response_output_text_delta {
tx_event
.send(Ok(ResponseEvent::OutputTextDelta(done.text)))
.await
.ok();
}
if let Some(completed) = payload.response_output_item_done {
let response_item =
serde_json::from_value::<ResponseItem>(completed.item).map_err(Error::Json)?;
tx_event
.send(Ok(ResponseEvent::OutputItemDone(response_item)))
.await
.ok();
otel_event_manager.sse_event_kind("response.output_item.done");
}
if let Some(reasoning_content_delta) = payload.response_output_reasoning_delta {
tx_event
.send(Ok(ResponseEvent::ReasoningContentDelta(
reasoning_content_delta.text,
)))
.await
.ok();
}
if let Some(reasoning_summary_delta) = payload.response_output_reasoning_summary_delta {
tx_event
.send(Ok(ResponseEvent::ReasoningSummaryDelta(
reasoning_summary_delta.text,
)))
.await
.ok();
}
if let Some(ev) = payload.response_error
&& ev.code.as_deref() == Some("max_response_tokens")
{
let _ = tx_event
.send(Err(Error::Stream(
"context window exceeded".to_string(),
None,
)))
.await;
}
Ok(())
}
#[derive(Debug, Deserialize)]
pub struct TextDelta {
pub delta: String,
}
pub async fn handle_stream_event(
event: StreamEvent,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
otel_event_manager: &OtelEventManager,
) {
trace!("response event: {}", event.r#type);
match event.r#type.as_str() {
"response.created" => {
let _ = tx_event.send(Ok(ResponseEvent::Created)).await;
}
"response.output_text.delta" => {
if let Some(item_val) = event.item {
let resp = serde_json::from_value::<TextDelta>(item_val);
if let Ok(delta) = resp {
let event = ResponseEvent::OutputTextDelta(delta.delta);
let _ = tx_event.send(Ok(event)).await;
}
} else if let Some(delta) = event.delta {
let _ = tx_event
.send(Ok(ResponseEvent::OutputTextDelta(delta)))
.await;
}
}
"response.reasoning_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningContentDelta(delta);
let _ = tx_event.send(Ok(event)).await;
}
}
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
let _ = tx_event.send(Ok(event)).await;
}
}
"response.output_item.done" => {
if let Some(item_val) = event.item
&& let Ok(item) = serde_json::from_value::<ResponseItem>(item_val)
{
let event = ResponseEvent::OutputItemDone(item);
if tx_event.send(Ok(event)).await.is_err() {}
}
}
"response.failed" => {
if let Some(resp_val) = event.response {
otel_event_manager.sse_event_failed(
Some(&"response.failed".to_string()),
Duration::from_millis(0),
&resp_val,
);
if let Some(err) = resp_val
.get("error")
.cloned()
.and_then(|v| serde_json::from_value::<ErrorBody>(v).ok())
{
let msg = if err.code.as_deref() == Some("context_length_exceeded") {
"context window exceeded".to_string()
} else if err.code.as_deref() == Some("insufficient_quota") {
"quota exceeded".to_string()
} else {
err.message.unwrap_or_else(|| "fatal error".to_string())
};
let _ = tx_event.send(Err(Error::Stream(msg, None))).await;
}
}
}
"response.error" => {
if let Some(err_val) = event.error {
let err_resp = serde_json::from_value::<ErrorResponse>(err_val);
if let Ok(err) = err_resp {
let retry_after = try_parse_retry_after(&err);
let _ = tx_event
.send(Err(Error::Stream(
err.error
.message
.unwrap_or_else(|| "unknown error".to_string()),
retry_after,
)))
.await;
}
}
}
"response.completed" => {
if let Some(resp_val) = event.response
&& let Ok(resp) = serde_json::from_value::<StreamResponseCompleted>(resp_val)
{
let usage = resp.usage.map(TokenUsage::from);
let ev = ResponseEvent::Completed {
response_id: resp.id,
token_usage: usage.clone(),
};
let _ = tx_event.send(Ok(ev)).await;
if let Some(usage) = &usage {
otel_event_manager.sse_event_completed(
usage.input_tokens,
usage.output_tokens,
Some(usage.cached_input_tokens),
Some(usage.reasoning_output_tokens),
usage.total_tokens,
);
} else {
otel_event_manager
.see_event_completed_failed(&"missing token usage".to_string());
}
}
}
"response.output_item.added" => {
if let Some(item_val) = event.item
&& let Ok(item) = serde_json::from_value::<ResponseItem>(item_val)
{
let event = ResponseEvent::OutputItemAdded(item);
if tx_event.send(Ok(event)).await.is_err() {}
}
}
"response.reasoning_summary_part.added" => {
let event = ResponseEvent::ReasoningSummaryPartAdded;
let _ = tx_event.send(Ok(event)).await;
}
_ => {}
}
}
#[derive(Debug, Deserialize)]
pub struct ResponseErrorBody {
pub code: Option<String>,
}
fn try_parse_retry_after(err: &ErrorResponse) -> Option<Duration> {
if err.error.r#type.as_deref() == Some("rate_limit_exceeded") {
let retry_after = serde_json::to_value(&err.error)
.ok()
.and_then(|v| v.get("retry_after").cloned())
.and_then(|v| serde_json::from_value::<ResponseErrorBody>(v).ok())
.and_then(|v| v.code)
.and_then(parse_retry_after);
return retry_after;
}
None
}
fn parse_retry_after(s: String) -> Option<Duration> {
let minutes_pattern = regex_lite::Regex::new(r"^(\d+)m$").ok()?;
if let Some(cap) = minutes_pattern.captures(&s)
&& let Some(m) = cap.get(1).and_then(|m| m.as_str().parse::<u64>().ok())
{
return Some(Duration::from_secs(m * 60));
}
s.parse::<u64>().ok().map(Duration::from_secs)
}
pub mod sse {
use serde::Deserialize;
use serde_json::Value;
#[derive(Debug, Deserialize)]
pub struct Payload {
pub responses: Option<Vec<Response>>,
pub response_content: Option<Value>,
pub response_error: Option<ResponseError>,
pub response_event: Option<String>,
pub response_message_delta: Option<ResponseMessageDelta>,
pub response_output_item: Option<ResponseOutputItem>,
pub response_output_text_delta: Option<ResponseOutputTextDelta>,
pub response_output_item_done: Option<ResponseOutputItemDone>,
pub response_output_reasoning_delta: Option<ResponseOutputReasoningDelta>,
pub response_output_reasoning_summary_delta: Option<ResponseOutputReasoningSummaryDelta>,
}
#[derive(Debug, Deserialize)]
pub enum Response {
#[serde(rename = "response.completed")]
Completed(ResponseCompleted),
#[serde(rename = "response.error")]
Error(ResponseError),
}
#[derive(Debug, Deserialize)]
pub struct ResponseCompleted {
pub id: String,
pub usage: Option<codex_protocol::protocol::TokenUsage>,
}
#[derive(Debug, Deserialize)]
pub struct ResponseError {
pub code: Option<String>,
pub message: Option<String>,
pub retry_after: Option<i64>,
}
#[derive(Debug, Deserialize)]
pub struct ResponseMessageDelta {
pub text: String,
}
#[derive(Debug, Deserialize)]
pub enum OutputItem {
#[serde(rename = "response.output_item.created")]
Created,
}
#[derive(Debug, Deserialize)]
pub struct ResponseOutputItem {
pub r#type: OutputItem,
}
#[derive(Debug, Deserialize)]
pub struct ResponseOutputTextDelta {
pub text: String,
}
#[derive(Debug, Deserialize)]
pub struct ResponseOutputItemDone {
pub item: Value,
}
#[derive(Debug, Deserialize)]
pub struct ResponseOutputReasoningDelta {
pub text: String,
}
#[derive(Debug, Deserialize)]
pub struct ResponseOutputReasoningSummaryDelta {
pub text: String,
}
}
pub struct ResponsesSseDecoder;
impl Default for ResponsesSseDecoder {
fn default() -> Self {
Self
}
}
#[async_trait]
impl crate::client::ResponseDecoder for ResponsesSseDecoder {
async fn on_frame(
&mut self,
json: &str,
tx: &mpsc::Sender<Result<ResponseEvent>>,
otel_event_manager: &OtelEventManager,
) -> Result<()> {
if let Ok(event) = serde_json::from_str::<StreamEvent>(json) {
otel_event_manager.sse_event_kind(&event.r#type);
handle_stream_event(event, tx.clone(), otel_event_manager).await;
return Ok(());
}
otel_event_manager.sse_event_failed(
None,
Duration::from_millis(0),
&format!("Cannot parse SSE JSON: {json}"),
);
match serde_json::from_str::<sse::Payload>(json) {
Ok(payload) => handle_sse_payload(payload, tx, otel_event_manager).await,
Err(err) => {
otel_event_manager.sse_event_failed(
None,
Duration::from_millis(0),
&format!("Cannot parse SSE JSON: {err}"),
);
Err(Error::Other(format!("Cannot parse SSE JSON: {err}")))
}
}
}
}

View File

@@ -1,48 +1,43 @@
use async_trait::async_trait;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
use serde_json::Value;
use tokio::sync::mpsc;
use tracing::debug;
use crate::client::WireResponseDecoder;
use crate::error::Result;
use crate::stream::ResponseEvent;
pub struct ChatSseDecoder {
fn_call_state: FunctionCallState,
assistant_item: Option<ResponseItem>,
reasoning_item: Option<ResponseItem>,
}
use crate::stream::WireEvent;
#[derive(Default)]
struct FunctionCallState {
active: bool,
call_id: Option<String>,
name: Option<String>,
arguments: String,
call_id: Option<String>,
active: bool,
}
impl ChatSseDecoder {
pub struct WireChatSseDecoder {
fn_call_state: FunctionCallState,
}
impl WireChatSseDecoder {
pub fn new() -> Self {
Self {
fn_call_state: FunctionCallState::default(),
assistant_item: None,
reasoning_item: None,
}
}
}
#[async_trait]
impl crate::client::ResponseDecoder for ChatSseDecoder {
impl WireResponseDecoder for WireChatSseDecoder {
async fn on_frame(
&mut self,
json: &str,
tx: &mpsc::Sender<Result<ResponseEvent>>,
tx: &mpsc::Sender<crate::error::Result<WireEvent>>,
_otel: &OtelEventManager,
) -> Result<()> {
// Chat sends a terminal "[DONE]" frame; we ignore it here. Caller should handle end-of-stream.
let Ok(parsed_chunk) = serde_json::from_str::<serde_json::Value>(json) else {
// Chat sends a terminal "[DONE]" frame; ignore it.
let Ok(parsed_chunk) = serde_json::from_str::<Value>(json) else {
debug!("failed to parse Chat SSE JSON: {}", json);
return Ok(());
};
@@ -58,10 +53,8 @@ impl crate::client::ResponseDecoder for ChatSseDecoder {
if let Some(content) = delta.get("content").and_then(|c| c.as_array()) {
for piece in content {
if let Some(text) = piece.get("text").and_then(|t| t.as_str()) {
append_assistant_text(tx, &mut self.assistant_item, text.to_string())
.await;
let _ = tx
.send(Ok(ResponseEvent::OutputTextDelta(text.to_string())))
.send(Ok(WireEvent::OutputTextDelta(text.to_string())))
.await;
}
}
@@ -87,7 +80,8 @@ impl crate::client::ResponseDecoder for ChatSseDecoder {
if let Some(reasoning) = delta.get("reasoning_content").and_then(|c| c.as_array()) {
for entry in reasoning {
if let Some(text) = entry.get("text").and_then(|t| t.as_str()) {
append_reasoning_text(tx, &mut self.reasoning_item, text.to_string())
let _ = tx
.send(Ok(WireEvent::ReasoningContentDelta(text.to_string())))
.await;
}
}
@@ -103,65 +97,17 @@ impl crate::client::ResponseDecoder for ChatSseDecoder {
let arguments = self.fn_call_state.arguments.clone();
self.fn_call_state = FunctionCallState::default();
let item = ResponseItem::FunctionCall {
id: Some(call_id.clone()),
call_id,
name: function_name,
arguments,
};
let _ = tx.send(Ok(ResponseEvent::OutputItemDone(item))).await;
let item = serde_json::json!({
"type": "function_call",
"id": call_id,
"call_id": call_id,
"name": function_name,
"arguments": arguments,
});
let _ = tx.send(Ok(WireEvent::OutputItemDone(item))).await;
}
}
Ok(())
}
}
async fn append_assistant_text(
tx_event: &mpsc::Sender<Result<ResponseEvent>>,
assistant_item: &mut Option<ResponseItem>,
text: String,
) {
if assistant_item.is_none() {
let item = ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![],
};
*assistant_item = Some(item.clone());
let _ = tx_event
.send(Ok(ResponseEvent::OutputItemAdded(item)))
.await;
}
if let Some(ResponseItem::Message { content, .. }) = assistant_item {
content.push(ContentItem::OutputText { text });
}
}
async fn append_reasoning_text(
tx_event: &mpsc::Sender<Result<ResponseEvent>>,
reasoning_item: &mut Option<ResponseItem>,
text: String,
) {
if reasoning_item.is_none() {
let item = ResponseItem::Reasoning {
id: String::new(),
summary: Vec::new(),
content: Some(vec![]),
encrypted_content: None,
};
*reasoning_item = Some(item.clone());
let _ = tx_event
.send(Ok(ResponseEvent::OutputItemAdded(item)))
.await;
}
if let Some(ResponseItem::Reasoning {
content: Some(content),
..
}) = reasoning_item
{
content.push(ReasoningItemContent::ReasoningText { text });
}
}

View File

@@ -0,0 +1,2 @@
pub mod chat;
pub mod responses;

View File

@@ -0,0 +1,166 @@
use async_trait::async_trait;
use codex_otel::otel_event_manager::OtelEventManager;
use serde::Deserialize;
use serde_json::Value;
use tokio::sync::mpsc;
use tracing::debug;
use crate::client::WireResponseDecoder;
use crate::error::Error;
use crate::error::Result;
use crate::stream::WireEvent;
use crate::stream::WireTokenUsage;
#[derive(Debug, Deserialize)]
struct StreamEvent {
#[serde(rename = "type")]
event_type: String,
#[serde(default)]
response: Option<Value>,
#[serde(default)]
item: Option<Value>,
#[serde(default)]
error: Option<Value>,
#[serde(default)]
delta: Option<String>,
}
pub struct WireResponsesSseDecoder;
#[async_trait]
impl WireResponseDecoder for WireResponsesSseDecoder {
async fn on_frame(
&mut self,
json: &str,
tx: &mpsc::Sender<Result<WireEvent>>,
otel: &OtelEventManager,
) -> Result<()> {
let Ok(event) = serde_json::from_str::<StreamEvent>(json) else {
debug!("failed to parse Responses SSE JSON: {}", json);
return Ok(());
};
match event.event_type.as_str() {
"response.created" => {
let _ = tx.send(Ok(WireEvent::Created)).await;
}
"response.output_text.delta" => {
if let Some(delta) = event.delta.or_else(|| {
event.item.and_then(|v| {
v.get("delta")
.and_then(|d| d.as_str().map(|s| s.to_string()))
})
}) {
let _ = tx.send(Ok(WireEvent::OutputTextDelta(delta))).await;
}
}
"response.reasoning_text.delta" => {
if let Some(delta) = event.delta {
let _ = tx.send(Ok(WireEvent::ReasoningContentDelta(delta))).await;
}
}
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.delta {
let _ = tx.send(Ok(WireEvent::ReasoningSummaryDelta(delta))).await;
}
}
"response.output_item.done" => {
if let Some(item_val) = event.item {
let _ = tx.send(Ok(WireEvent::OutputItemDone(item_val))).await;
}
}
"response.output_item.added" => {
if let Some(item_val) = event.item {
let _ = tx.send(Ok(WireEvent::OutputItemAdded(item_val))).await;
}
}
"response.reasoning_summary_part.added" => {
let _ = tx.send(Ok(WireEvent::ReasoningSummaryPartAdded)).await;
}
"response.completed" => {
if let Some(resp) = event.response {
let response_id = resp
.get("id")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let usage = parse_wire_usage(&resp);
if let Some(u) = &usage {
otel.sse_event_completed(
u.input_tokens,
u.output_tokens,
Some(u.cached_input_tokens),
Some(u.reasoning_output_tokens),
u.total_tokens,
);
} else {
otel.see_event_completed_failed(&"missing token usage".to_string());
}
let _ = tx
.send(Ok(WireEvent::Completed {
response_id,
token_usage: usage,
}))
.await;
}
}
"response.error" | "response.failed" => {
let message = event
.error
.as_ref()
.and_then(|v| v.get("message"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| "unknown error".to_string());
let _ = tx.send(Err(Error::Stream(message, None))).await;
}
_ => {}
}
Ok(())
}
}
fn parse_wire_usage(resp: &Value) -> Option<WireTokenUsage> {
let usage = resp.get("usage").cloned()?;
let input_tokens = usage
.get("input_tokens")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let cached_input_tokens = usage
.get("cached_input_tokens")
.and_then(|v| v.as_i64())
.or_else(|| {
usage
.get("input_tokens_details")
.and_then(|d| d.get("cached_tokens"))
.and_then(|v| v.as_i64())
})
.unwrap_or(0);
let output_tokens = usage
.get("output_tokens")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let reasoning_output_tokens = usage
.get("reasoning_output_tokens")
.and_then(|v| v.as_i64())
.or_else(|| {
usage
.get("output_tokens_details")
.and_then(|d| d.get("reasoning_tokens"))
.and_then(|v| v.as_i64())
})
.unwrap_or(0);
let total_tokens = usage
.get("total_tokens")
.and_then(|v| v.as_i64())
.unwrap_or(0);
Some(WireTokenUsage {
input_tokens,
cached_input_tokens,
output_tokens,
reasoning_output_tokens,
total_tokens,
})
}

View File

@@ -1,21 +1,18 @@
pub mod api;
pub mod auth;
pub mod chat;
mod client;
mod common;
mod decode;
// Legacy payload decoding has been removed; wire decoding lives in decode_wire
mod decode_wire;
pub mod error;
// payload building lives in codex-core now
pub mod responses;
pub mod routed_client;
pub mod stream;
mod wire;
pub use crate::auth::AuthContext;
pub use crate::auth::AuthProvider;
pub use crate::chat::ChatCompletionsApiClient;
pub use crate::chat::ChatCompletionsApiClientConfig;
pub use crate::client::fixtures::stream_from_fixture;
pub use crate::error::Error;
pub use crate::error::Result;
pub use crate::responses::ResponsesApiClient;

View File

@@ -1,6 +1,5 @@
use std::sync::Arc;
use async_trait::async_trait;
use codex_app_server_protocol::AuthMode;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::ConversationId;
@@ -10,12 +9,9 @@ use tokio::sync::mpsc;
use tracing::debug;
use tracing::trace;
use crate::api::PayloadClient;
use crate::auth::AuthProvider;
use crate::error::Error;
use crate::error::Result;
use crate::stream::ResponseEvent;
use crate::stream::ResponseStream;
use crate::stream::WireResponseStream;
use codex_provider_config::ModelProviderInfo;
@@ -43,19 +39,18 @@ pub struct ResponsesApiClient {
config: ResponsesApiClientConfig,
}
#[async_trait]
impl PayloadClient for ResponsesApiClient {
type Config = ResponsesApiClientConfig;
fn new(config: Self::Config) -> Result<Self> {
impl ResponsesApiClient {
pub fn new(config: ResponsesApiClientConfig) -> Result<Self> {
Ok(Self { config })
}
}
async fn stream_payload(
impl ResponsesApiClient {
pub async fn stream_payload_wire(
&self,
payload_json: &Value,
session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<ResponseStream> {
_session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<WireResponseStream> {
if self.config.provider.wire_api != codex_provider_config::WireApi::Responses {
return Err(Error::UnsupportedOperation(
"ResponsesApiClient requires a Responses provider".to_string(),
@@ -98,7 +93,6 @@ impl PayloadClient for ResponsesApiClient {
&self.config.http_client,
&self.config.provider,
&auth,
session_source,
&extra_headers,
)
.await?;
@@ -124,10 +118,27 @@ impl PayloadClient for ResponsesApiClient {
request_id: None,
})?;
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let (tx_event, rx_event) = mpsc::channel::<Result<crate::stream::WireEvent>>(1600);
if let Some(snapshot) = crate::client::rate_limits::parse_rate_limit_snapshot(res.headers())
&& tx_event
.send(Ok(ResponseEvent::RateLimits(snapshot)))
.send(Ok(crate::stream::WireEvent::RateLimits(
crate::stream::WireRateLimitSnapshot {
primary: snapshot
.primary
.map(|w| crate::stream::WireRateLimitWindow {
used_percent: Some(w.used_percent),
window_minutes: w.window_minutes,
resets_at: w.resets_at,
}),
secondary: snapshot
.secondary
.map(|w| crate::stream::WireRateLimitWindow {
used_percent: Some(w.used_percent),
window_minutes: w.window_minutes,
resets_at: w.resets_at,
}),
},
)))
.await
.is_err()
{
@@ -142,36 +153,14 @@ impl PayloadClient for ResponsesApiClient {
});
let idle_timeout = self.config.provider.stream_idle_timeout();
let otel = self.config.otel_event_manager.clone();
tokio::spawn(crate::client::sse::process_sse(
tokio::spawn(crate::client::sse::process_sse_wire(
stream,
tx_event,
idle_timeout,
otel,
crate::decode::responses::ResponsesSseDecoder,
crate::decode_wire::responses::WireResponsesSseDecoder,
));
Ok(crate::stream::EventStream::from_receiver(rx_event))
}
}
impl ResponsesApiClient {
pub async fn stream_payload_wire(
&self,
payload_json: &Value,
session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<WireResponseStream> {
use futures::StreamExt;
let legacy = self.stream_payload(payload_json, session_source).await?;
let (tx, rx) = tokio::sync::mpsc::channel(1600);
tokio::spawn(async move {
futures::pin_mut!(legacy);
while let Some(item) = legacy.next().await {
let converted = item.and_then(|ev| crate::wire::map_response_event_to_wire(ev));
if tx.send(converted).await.is_err() {
break;
}
}
});
Ok(crate::stream::EventStream::from_receiver(rx))
}
}

View File

@@ -3,20 +3,15 @@ use std::sync::Arc;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::ConversationId;
use codex_protocol::protocol::SessionSource;
use crate::ChatCompletionsApiClient;
use crate::ChatCompletionsApiClientConfig;
use crate::ResponseStream;
use crate::ResponsesApiClient;
use crate::ResponsesApiClientConfig;
use crate::Result;
use crate::WireApi;
use crate::WireEvent;
use crate::WireResponseStream;
use crate::api::PayloadClient;
use crate::auth::AuthProvider;
use crate::client::fixtures::stream_from_fixture;
use codex_provider_config::ModelProviderInfo;
/// Dispatches to the appropriate API client implementation based on the provider wire API.
@@ -28,8 +23,8 @@ pub struct RoutedApiClientConfig {
pub conversation_id: ConversationId,
pub auth_provider: Option<Arc<dyn AuthProvider>>,
pub otel_event_manager: OtelEventManager,
pub session_source: SessionSource,
pub responses_fixture_path: Option<PathBuf>,
pub extra_headers: Vec<(String, String)>,
}
#[derive(Clone)]
@@ -42,18 +37,12 @@ impl RoutedApiClient {
Self { config }
}
pub async fn stream_payload(&self, payload_json: &serde_json::Value) -> Result<ResponseStream> {
pub async fn stream_payload_wire(
&self,
payload_json: &serde_json::Value,
) -> Result<WireResponseStream> {
match self.config.provider.wire_api {
WireApi::Responses => {
if let Some(path) = &self.config.responses_fixture_path {
return stream_from_fixture(
path,
self.config.provider.clone(),
self.config.otel_event_manager.clone(),
)
.await;
}
let cfg = ResponsesApiClientConfig {
http_client: self.config.http_client.clone(),
provider: self.config.provider.clone(),
@@ -61,12 +50,18 @@ impl RoutedApiClient {
conversation_id: self.config.conversation_id,
auth_provider: self.config.auth_provider.clone(),
otel_event_manager: self.config.otel_event_manager.clone(),
extra_headers: vec![],
extra_headers: self.config.extra_headers.clone(),
};
let client = <ResponsesApiClient as crate::api::PayloadClient>::new(cfg)?;
client
.stream_payload(payload_json, Some(&self.config.session_source))
.await
if let Some(path) = &self.config.responses_fixture_path {
return crate::client::fixtures::stream_from_fixture_wire(
path,
self.config.provider.clone(),
self.config.otel_event_manager.clone(),
)
.await;
}
let client = ResponsesApiClient::new(cfg)?;
client.stream_payload_wire(payload_json, None).await
}
WireApi::Chat => {
let cfg = ChatCompletionsApiClientConfig {
@@ -74,54 +69,11 @@ impl RoutedApiClient {
provider: self.config.provider.clone(),
model: self.config.model.clone(),
otel_event_manager: self.config.otel_event_manager.clone(),
session_source: self.config.session_source.clone(),
extra_headers: vec![],
extra_headers: self.config.extra_headers.clone(),
};
let client = <ChatCompletionsApiClient as crate::api::PayloadClient>::new(cfg)?;
client
.stream_payload(payload_json, Some(&self.config.session_source))
.await
let client = ChatCompletionsApiClient::new(cfg)?;
client.stream_payload_wire(payload_json, None).await
}
}
}
pub async fn stream_payload_wire(
&self,
payload_json: &serde_json::Value,
) -> Result<WireResponseStream> {
use futures::StreamExt;
let legacy = self.stream_payload(payload_json).await?;
let (tx, rx) = tokio::sync::mpsc::channel(1600);
tokio::spawn(async move {
futures::pin_mut!(legacy);
while let Some(item) = legacy.next().await {
let converted = item.and_then(|ev| map_response_event_to_wire(ev));
if tx.send(converted).await.is_err() {
break;
}
}
});
Ok(crate::stream::EventStream::from_receiver(rx))
}
}
#[async_trait::async_trait]
impl PayloadClient for RoutedApiClient {
type Config = RoutedApiClientConfig;
fn new(config: Self::Config) -> Result<Self> {
Ok(Self::new(config))
}
async fn stream_payload(
&self,
payload_json: &serde_json::Value,
_session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<ResponseStream> {
self.stream_payload(payload_json).await
}
}
fn map_response_event_to_wire(ev: crate::stream::ResponseEvent) -> Result<WireEvent> {
crate::wire::map_response_event_to_wire(ev)
}

View File

@@ -1,55 +0,0 @@
use crate::error::Result;
use crate::stream::WireEvent;
use crate::stream::WireRateLimitSnapshot;
use crate::stream::WireRateLimitWindow;
pub fn map_response_event_to_wire(ev: crate::stream::ResponseEvent) -> Result<WireEvent> {
Ok(match ev {
crate::stream::ResponseEvent::Created => WireEvent::Created,
crate::stream::ResponseEvent::OutputItemDone(item) => {
WireEvent::OutputItemDone(serde_json::to_value(item).unwrap_or(serde_json::Value::Null))
}
crate::stream::ResponseEvent::OutputItemAdded(item) => WireEvent::OutputItemAdded(
serde_json::to_value(item).unwrap_or(serde_json::Value::Null),
),
crate::stream::ResponseEvent::Completed {
response_id,
token_usage,
} => {
let mapped = token_usage.map(|u| crate::stream::WireTokenUsage {
input_tokens: u.input_tokens,
cached_input_tokens: u.cached_input_tokens,
output_tokens: u.output_tokens,
reasoning_output_tokens: u.reasoning_output_tokens,
total_tokens: u.total_tokens,
});
WireEvent::Completed {
response_id,
token_usage: mapped,
}
}
crate::stream::ResponseEvent::OutputTextDelta(s) => WireEvent::OutputTextDelta(s),
crate::stream::ResponseEvent::ReasoningSummaryDelta(s) => {
WireEvent::ReasoningSummaryDelta(s)
}
crate::stream::ResponseEvent::ReasoningContentDelta(s) => {
WireEvent::ReasoningContentDelta(s)
}
crate::stream::ResponseEvent::ReasoningSummaryPartAdded => {
WireEvent::ReasoningSummaryPartAdded
}
crate::stream::ResponseEvent::RateLimits(s) => {
let to_win = |w: Option<codex_protocol::protocol::RateLimitWindow>| -> Option<WireRateLimitWindow> {
w.map(|w| WireRateLimitWindow {
used_percent: Some(w.used_percent),
window_minutes: w.window_minutes,
resets_at: w.resets_at,
})
};
WireEvent::RateLimits(WireRateLimitSnapshot {
primary: to_win(s.primary),
secondary: to_win(s.secondary),
})
}
})
}