use crate::error::TransportError; use crate::request::Request; use crate::request::Response; use async_trait::async_trait; use bytes::Bytes; use futures::StreamExt; use futures::stream::BoxStream; use http::HeaderMap; use http::Method; use http::StatusCode; pub type ByteStream = BoxStream<'static, Result>; pub struct StreamResponse { pub status: StatusCode, pub headers: HeaderMap, pub bytes: ByteStream, } #[async_trait] pub trait HttpTransport: Send + Sync { async fn execute(&self, req: Request) -> Result; async fn stream(&self, req: Request) -> Result; } #[derive(Clone, Debug)] pub struct ReqwestTransport { client: reqwest::Client, } impl ReqwestTransport { pub fn new(client: reqwest::Client) -> Self { Self { client } } fn build(&self, req: Request) -> Result { let mut builder = self .client .request( Method::from_bytes(req.method.as_str().as_bytes()).unwrap_or(Method::GET), &req.url, ) .headers(req.headers); if let Some(timeout) = req.timeout { builder = builder.timeout(timeout); } if let Some(body) = req.body { builder = builder.json(&body); } Ok(builder) } fn map_error(err: reqwest::Error) -> TransportError { if err.is_timeout() { TransportError::Timeout } else { TransportError::Network(err.to_string()) } } } #[async_trait] impl HttpTransport for ReqwestTransport { async fn execute(&self, req: Request) -> Result { let builder = self.build(req)?; let resp = builder.send().await.map_err(Self::map_error)?; let status = resp.status(); let headers = resp.headers().clone(); let bytes = resp.bytes().await.map_err(Self::map_error)?; if !status.is_success() { let body = String::from_utf8(bytes.to_vec()).ok(); return Err(TransportError::Http { status, headers: Some(headers), body, }); } Ok(Response { status, headers, body: bytes, }) } async fn stream(&self, req: Request) -> Result { let builder = self.build(req)?; let resp = builder.send().await.map_err(Self::map_error)?; let status = resp.status(); let headers = resp.headers().clone(); if !status.is_success() { let body = resp.text().await.ok(); return Err(TransportError::Http { status, headers: Some(headers), body, }); } let stream = resp .bytes_stream() .map(|result| result.map_err(Self::map_error)); Ok(StreamResponse { status, headers, bytes: Box::pin(stream), }) } }