This commit is contained in:
jif-oai
2025-11-11 15:16:07 +00:00
parent 0c609d441b
commit b7dcc8ef5c
7 changed files with 133 additions and 62 deletions

View File

@@ -16,6 +16,7 @@ use crate::error::Error;
use crate::error::Result;
use crate::stream::ResponseEvent;
use crate::stream::ResponseStream;
use crate::stream::WireResponseStream;
use codex_provider_config::ModelProviderInfo;
#[derive(Clone)]
@@ -34,6 +35,7 @@ pub struct ResponsesApiClientConfig {
pub conversation_id: ConversationId,
pub auth_provider: Option<Arc<dyn AuthProvider>>,
pub otel_event_manager: OtelEventManager,
pub extra_headers: Vec<(String, String)>,
}
#[derive(Clone)]
@@ -77,10 +79,21 @@ impl PayloadClient for ResponsesApiClient {
.unwrap_or_else(|_| "<unable to serialize payload>".to_string())
);
let extra_headers = vec![
("conversation_id", self.config.conversation_id.to_string()),
("session_id", self.config.conversation_id.to_string()),
let mut owned_headers: Vec<(String, String)> = vec![
(
"conversation_id".to_string(),
self.config.conversation_id.to_string(),
),
(
"session_id".to_string(),
self.config.conversation_id.to_string(),
),
];
owned_headers.extend(self.config.extra_headers.iter().cloned());
let extra_headers: Vec<(&str, String)> = owned_headers
.iter()
.map(|(k, v)| (k.as_str(), v.clone()))
.collect();
let mut req_builder = crate::client::http::build_request(
&self.config.http_client,
&self.config.provider,
@@ -140,3 +153,25 @@ impl PayloadClient for ResponsesApiClient {
Ok(crate::stream::EventStream::from_receiver(rx_event))
}
}
impl ResponsesApiClient {
pub async fn stream_payload_wire(
&self,
payload_json: &Value,
session_source: Option<&codex_protocol::protocol::SessionSource>,
) -> Result<WireResponseStream> {
use futures::StreamExt;
let legacy = self.stream_payload(payload_json, session_source).await?;
let (tx, rx) = tokio::sync::mpsc::channel(1600);
tokio::spawn(async move {
futures::pin_mut!(legacy);
while let Some(item) = legacy.next().await {
let converted = item.and_then(|ev| crate::wire::map_response_event_to_wire(ev));
if tx.send(converted).await.is_err() {
break;
}
}
});
Ok(crate::stream::EventStream::from_receiver(rx))
}
}