diff --git a/codex-rs/api-client/src/aggregate.rs b/codex-rs/api-client/src/aggregate.rs index cee283dd2e..e22fd47d92 100644 --- a/codex-rs/api-client/src/aggregate.rs +++ b/codex-rs/api-client/src/aggregate.rs @@ -10,12 +10,6 @@ use futures::Stream; use crate::error::Result; use crate::stream::ResponseEvent; -#[derive(Clone, Copy, Debug)] -pub enum ChatAggregationMode { - AggregatedOnly, - Streaming, -} - pub trait AggregateStreamExt: Stream> + Sized { fn aggregate(self) -> AggregatedChatStream where diff --git a/codex-rs/api-client/src/chat.rs b/codex-rs/api-client/src/chat.rs index b70bda04f8..36bf7adfd1 100644 --- a/codex-rs/api-client/src/chat.rs +++ b/codex-rs/api-client/src/chat.rs @@ -6,7 +6,6 @@ use codex_protocol::protocol::SessionSource; use futures::TryStreamExt; use tokio::sync::mpsc; -use crate::aggregate::ChatAggregationMode; use crate::api::ApiClient; use crate::client::PayloadBuilder; use crate::common::backoff; @@ -25,14 +24,12 @@ use crate::stream::ResponseStream; /// - `model`: Model identifier to use. /// - `otel_event_manager`: Telemetry event manager for request/stream instrumentation. /// - `session_source`: Session metadata, used to set subagent headers when applicable. -/// - `aggregation_mode`: How to emit streaming output (raw deltas or aggregated). pub struct ChatCompletionsApiClientConfig { pub http_client: reqwest::Client, pub provider: ModelProviderInfo, pub model: String, pub otel_event_manager: OtelEventManager, pub session_source: SessionSource, - pub aggregation_mode: ChatAggregationMode, } #[derive(Clone)] @@ -91,14 +88,12 @@ impl ApiClient for ChatCompletionsApiClient { }); let idle_timeout = self.config.provider.stream_idle_timeout(); let otel = self.config.otel_event_manager.clone(); - let mode = self.config.aggregation_mode; - tokio::spawn(crate::client::sse::process_sse( stream, tx_event.clone(), idle_timeout, otel, - crate::decode::chat::ChatSseDecoder::new(mode), + crate::decode::chat::ChatSseDecoder::new(), )); return Ok(ResponseStream { rx_event }); diff --git a/codex-rs/api-client/src/client/http.rs b/codex-rs/api-client/src/client/http.rs index ffab9c56c1..e79fc4df1b 100644 --- a/codex-rs/api-client/src/client/http.rs +++ b/codex-rs/api-client/src/client/http.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use codex_protocol::protocol::SessionSource; -use reqwest::header::HeaderMap; use crate::auth::AuthContext; use crate::auth::AuthProvider; @@ -33,11 +32,3 @@ pub async fn resolve_auth(auth_provider: &Option>) -> Opti None } } - -/// Extract a provider request id, when present, from headers. -pub fn request_id_from_headers(headers: &HeaderMap) -> Option { - headers - .get("cf-ray") - .and_then(|v| v.to_str().ok()) - .map(std::string::ToString::to_string) -} diff --git a/codex-rs/api-client/src/client/mod.rs b/codex-rs/api-client/src/client/mod.rs index 2037550b33..80ac1273dd 100644 --- a/codex-rs/api-client/src/client/mod.rs +++ b/codex-rs/api-client/src/client/mod.rs @@ -27,13 +27,3 @@ pub trait ResponseDecoder { otel: &OtelEventManager, ) -> Result<()>; } - -/// Optional trait to expose rate limit parsing where needed. -pub trait RateLimitProvider { - fn parse( - &self, - _headers: &reqwest::header::HeaderMap, - ) -> Option { - None - } -} diff --git a/codex-rs/api-client/src/decode/chat.rs b/codex-rs/api-client/src/decode/chat.rs index 1790d0bb80..43e83c1da9 100644 --- a/codex-rs/api-client/src/decode/chat.rs +++ b/codex-rs/api-client/src/decode/chat.rs @@ -6,12 +6,10 @@ use codex_protocol::models::ResponseItem; use tokio::sync::mpsc; use tracing::debug; -use crate::aggregate::ChatAggregationMode; use crate::error::Result; use crate::stream::ResponseEvent; pub struct ChatSseDecoder { - aggregation_mode: ChatAggregationMode, fn_call_state: FunctionCallState, assistant_item: Option, reasoning_item: Option, @@ -26,9 +24,8 @@ struct FunctionCallState { } impl ChatSseDecoder { - pub fn new(aggregation_mode: ChatAggregationMode) -> Self { + pub fn new() -> Self { Self { - aggregation_mode, fn_call_state: FunctionCallState::default(), assistant_item: None, reasoning_item: None, @@ -63,11 +60,9 @@ impl crate::client::ResponseDecoder for ChatSseDecoder { if let Some(text) = piece.get("text").and_then(|t| t.as_str()) { append_assistant_text(tx, &mut self.assistant_item, text.to_string()) .await; - if matches!(self.aggregation_mode, ChatAggregationMode::Streaming) { - let _ = tx - .send(Ok(ResponseEvent::OutputTextDelta(text.to_string()))) - .await; - } + let _ = tx + .send(Ok(ResponseEvent::OutputTextDelta(text.to_string()))) + .await; } } } diff --git a/codex-rs/api-client/src/decode/responses.rs b/codex-rs/api-client/src/decode/responses.rs index 3d5fed807e..0e8c07e53b 100644 --- a/codex-rs/api-client/src/decode/responses.rs +++ b/codex-rs/api-client/src/decode/responses.rs @@ -14,12 +14,6 @@ use crate::error::Error; use crate::error::Result; use crate::stream::ResponseEvent; -#[derive(Debug, Deserialize)] -pub struct ResponseCompleted { - pub id: String, - pub usage: Option, -} - #[derive(Debug, Deserialize)] pub struct StreamResponseCompleted { pub id: String, @@ -231,8 +225,6 @@ pub struct TextDelta { pub async fn handle_stream_event( event: StreamEvent, tx_event: mpsc::Sender>, - _response_completed: &mut Option, - _response_error: &mut Option, otel_event_manager: &OtelEventManager, ) { trace!("response event: {}", event.r#type); @@ -475,16 +467,7 @@ impl crate::client::ResponseDecoder for ResponsesSseDecoder { ) -> Result<()> { if let Ok(event) = serde_json::from_str::(json) { otel_event_manager.sse_event_kind(&event.r#type); - let mut completed: Option = None; - let mut error: Option = None; - handle_stream_event( - event, - tx.clone(), - &mut completed, - &mut error, - otel_event_manager, - ) - .await; + handle_stream_event(event, tx.clone(), otel_event_manager).await; return Ok(()); } diff --git a/codex-rs/api-client/src/lib.rs b/codex-rs/api-client/src/lib.rs index 8ed5eae41a..99374b15f2 100644 --- a/codex-rs/api-client/src/lib.rs +++ b/codex-rs/api-client/src/lib.rs @@ -2,19 +2,18 @@ pub mod aggregate; pub mod api; pub mod auth; pub mod chat; -pub mod client; +mod client; mod common; -pub mod decode; +mod decode; pub mod error; pub mod model_provider; -pub mod payload; +mod payload; pub mod prompt; pub mod responses; pub mod routed_client; pub mod stream; pub use crate::aggregate::AggregateStreamExt; -pub use crate::aggregate::ChatAggregationMode; pub use crate::api::ApiClient; pub use crate::auth::AuthContext; pub use crate::auth::AuthProvider; diff --git a/codex-rs/api-client/src/routed_client.rs b/codex-rs/api-client/src/routed_client.rs index 8563bf22c5..5c90094a2f 100644 --- a/codex-rs/api-client/src/routed_client.rs +++ b/codex-rs/api-client/src/routed_client.rs @@ -6,7 +6,6 @@ use codex_protocol::ConversationId; use codex_protocol::protocol::SessionSource; use crate::ApiClient; -use crate::ChatAggregationMode; use crate::ChatCompletionsApiClient; use crate::ChatCompletionsApiClientConfig; use crate::Prompt; @@ -29,7 +28,6 @@ pub struct RoutedApiClientConfig { pub auth_provider: Option>, pub otel_event_manager: OtelEventManager, pub session_source: SessionSource, - pub chat_aggregation_mode: ChatAggregationMode, pub responses_fixture_path: Option, } @@ -79,7 +77,6 @@ impl RoutedApiClient { model: self.config.model.clone(), otel_event_manager: self.config.otel_event_manager.clone(), session_source: self.config.session_source.clone(), - aggregation_mode: self.config.chat_aggregation_mode, }; let client = ChatCompletionsApiClient::new(cfg)?; client.stream(prompt).await diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 3c30eaa464..9472e2f856 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use async_trait::async_trait; use codex_api_client::AuthContext; use codex_api_client::AuthProvider; -use codex_api_client::ChatAggregationMode; use codex_api_client::ModelProviderInfo; use codex_api_client::Result as ApiClientResult; use codex_api_client::RoutedApiClient; @@ -178,7 +177,6 @@ impl ModelClient { } async fn build_api_client(&self) -> ApiClientResult { - let show_reasoning = self.config.show_raw_agent_reasoning; let auth_provider = self.auth_manager.as_ref().map(|manager| { Arc::new(AuthManagerProvider::new(Arc::clone(manager))) as Arc }); @@ -193,11 +191,6 @@ impl ModelClient { auth_provider, otel_event_manager: self.otel_event_manager.clone(), session_source: self.session_source.clone(), - chat_aggregation_mode: if show_reasoning { - ChatAggregationMode::Streaming - } else { - ChatAggregationMode::AggregatedOnly - }, responses_fixture_path, };