This commit is contained in:
jif-oai
2025-11-10 18:17:28 +00:00
parent 6239decccc
commit ba2873074e
9 changed files with 9 additions and 72 deletions

View File

@@ -10,12 +10,6 @@ use futures::Stream;
use crate::error::Result;
use crate::stream::ResponseEvent;
#[derive(Clone, Copy, Debug)]
pub enum ChatAggregationMode {
AggregatedOnly,
Streaming,
}
pub trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Sized {
fn aggregate(self) -> AggregatedChatStream<Self>
where

View File

@@ -6,7 +6,6 @@ use codex_protocol::protocol::SessionSource;
use futures::TryStreamExt;
use tokio::sync::mpsc;
use crate::aggregate::ChatAggregationMode;
use crate::api::ApiClient;
use crate::client::PayloadBuilder;
use crate::common::backoff;
@@ -25,14 +24,12 @@ use crate::stream::ResponseStream;
/// - `model`: Model identifier to use.
/// - `otel_event_manager`: Telemetry event manager for request/stream instrumentation.
/// - `session_source`: Session metadata, used to set subagent headers when applicable.
/// - `aggregation_mode`: How to emit streaming output (raw deltas or aggregated).
pub struct ChatCompletionsApiClientConfig {
pub http_client: reqwest::Client,
pub provider: ModelProviderInfo,
pub model: String,
pub otel_event_manager: OtelEventManager,
pub session_source: SessionSource,
pub aggregation_mode: ChatAggregationMode,
}
#[derive(Clone)]
@@ -91,14 +88,12 @@ impl ApiClient for ChatCompletionsApiClient {
});
let idle_timeout = self.config.provider.stream_idle_timeout();
let otel = self.config.otel_event_manager.clone();
let mode = self.config.aggregation_mode;
tokio::spawn(crate::client::sse::process_sse(
stream,
tx_event.clone(),
idle_timeout,
otel,
crate::decode::chat::ChatSseDecoder::new(mode),
crate::decode::chat::ChatSseDecoder::new(),
));
return Ok(ResponseStream { rx_event });

View File

@@ -1,7 +1,6 @@
use std::sync::Arc;
use codex_protocol::protocol::SessionSource;
use reqwest::header::HeaderMap;
use crate::auth::AuthContext;
use crate::auth::AuthProvider;
@@ -33,11 +32,3 @@ pub async fn resolve_auth(auth_provider: &Option<Arc<dyn AuthProvider>>) -> Opti
None
}
}
/// Extract a provider request id, when present, from headers.
pub fn request_id_from_headers(headers: &HeaderMap) -> Option<String> {
headers
.get("cf-ray")
.and_then(|v| v.to_str().ok())
.map(std::string::ToString::to_string)
}

View File

@@ -27,13 +27,3 @@ pub trait ResponseDecoder {
otel: &OtelEventManager,
) -> Result<()>;
}
/// Optional trait to expose rate limit parsing where needed.
pub trait RateLimitProvider {
fn parse(
&self,
_headers: &reqwest::header::HeaderMap,
) -> Option<codex_protocol::protocol::RateLimitSnapshot> {
None
}
}

View File

@@ -6,12 +6,10 @@ use codex_protocol::models::ResponseItem;
use tokio::sync::mpsc;
use tracing::debug;
use crate::aggregate::ChatAggregationMode;
use crate::error::Result;
use crate::stream::ResponseEvent;
pub struct ChatSseDecoder {
aggregation_mode: ChatAggregationMode,
fn_call_state: FunctionCallState,
assistant_item: Option<ResponseItem>,
reasoning_item: Option<ResponseItem>,
@@ -26,9 +24,8 @@ struct FunctionCallState {
}
impl ChatSseDecoder {
pub fn new(aggregation_mode: ChatAggregationMode) -> Self {
pub fn new() -> Self {
Self {
aggregation_mode,
fn_call_state: FunctionCallState::default(),
assistant_item: None,
reasoning_item: None,
@@ -63,11 +60,9 @@ impl crate::client::ResponseDecoder for ChatSseDecoder {
if let Some(text) = piece.get("text").and_then(|t| t.as_str()) {
append_assistant_text(tx, &mut self.assistant_item, text.to_string())
.await;
if matches!(self.aggregation_mode, ChatAggregationMode::Streaming) {
let _ = tx
.send(Ok(ResponseEvent::OutputTextDelta(text.to_string())))
.await;
}
let _ = tx
.send(Ok(ResponseEvent::OutputTextDelta(text.to_string())))
.await;
}
}
}

View File

@@ -14,12 +14,6 @@ use crate::error::Error;
use crate::error::Result;
use crate::stream::ResponseEvent;
#[derive(Debug, Deserialize)]
pub struct ResponseCompleted {
pub id: String,
pub usage: Option<TokenUsage>,
}
#[derive(Debug, Deserialize)]
pub struct StreamResponseCompleted {
pub id: String,
@@ -231,8 +225,6 @@ pub struct TextDelta {
pub async fn handle_stream_event(
event: StreamEvent,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
_response_completed: &mut Option<ResponseCompleted>,
_response_error: &mut Option<Error>,
otel_event_manager: &OtelEventManager,
) {
trace!("response event: {}", event.r#type);
@@ -475,16 +467,7 @@ impl crate::client::ResponseDecoder for ResponsesSseDecoder {
) -> Result<()> {
if let Ok(event) = serde_json::from_str::<StreamEvent>(json) {
otel_event_manager.sse_event_kind(&event.r#type);
let mut completed: Option<ResponseCompleted> = None;
let mut error: Option<Error> = None;
handle_stream_event(
event,
tx.clone(),
&mut completed,
&mut error,
otel_event_manager,
)
.await;
handle_stream_event(event, tx.clone(), otel_event_manager).await;
return Ok(());
}

View File

@@ -2,19 +2,18 @@ pub mod aggregate;
pub mod api;
pub mod auth;
pub mod chat;
pub mod client;
mod client;
mod common;
pub mod decode;
mod decode;
pub mod error;
pub mod model_provider;
pub mod payload;
mod payload;
pub mod prompt;
pub mod responses;
pub mod routed_client;
pub mod stream;
pub use crate::aggregate::AggregateStreamExt;
pub use crate::aggregate::ChatAggregationMode;
pub use crate::api::ApiClient;
pub use crate::auth::AuthContext;
pub use crate::auth::AuthProvider;

View File

@@ -6,7 +6,6 @@ use codex_protocol::ConversationId;
use codex_protocol::protocol::SessionSource;
use crate::ApiClient;
use crate::ChatAggregationMode;
use crate::ChatCompletionsApiClient;
use crate::ChatCompletionsApiClientConfig;
use crate::Prompt;
@@ -29,7 +28,6 @@ pub struct RoutedApiClientConfig {
pub auth_provider: Option<Arc<dyn AuthProvider>>,
pub otel_event_manager: OtelEventManager,
pub session_source: SessionSource,
pub chat_aggregation_mode: ChatAggregationMode,
pub responses_fixture_path: Option<PathBuf>,
}
@@ -79,7 +77,6 @@ impl RoutedApiClient {
model: self.config.model.clone(),
otel_event_manager: self.config.otel_event_manager.clone(),
session_source: self.config.session_source.clone(),
aggregation_mode: self.config.chat_aggregation_mode,
};
let client = ChatCompletionsApiClient::new(cfg)?;
client.stream(prompt).await