mirror of
https://github.com/openai/codex.git
synced 2026-05-01 03:42:05 +03:00
Turn-state sticky routing per turn (#9332)
- capture the header from SSE/WS handshakes, store it per ModelClientSession using `Oncelock`, echo it on turn-scoped requests, and add SSE+WS integration tests for within-turn persistence + cross-turn reset. - keep `x-codex-turn-state` sticky within a user turn to maintain routing continuity for retries/tool follow-ups.
This commit is contained in:
@@ -87,6 +87,7 @@ impl<T: HttpTransport, A: AuthProvider> ChatClient<T, A> {
|
||||
extra_headers,
|
||||
RequestCompression::None,
|
||||
spawn_chat_stream,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use http::HeaderMap;
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use tracing::instrument;
|
||||
|
||||
pub struct ResponsesClient<T: HttpTransport, A: AuthProvider> {
|
||||
@@ -36,6 +37,7 @@ pub struct ResponsesOptions {
|
||||
pub session_source: Option<SessionSource>,
|
||||
pub extra_headers: HeaderMap,
|
||||
pub compression: Compression,
|
||||
pub turn_state: Option<Arc<OnceLock<String>>>,
|
||||
}
|
||||
|
||||
impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
@@ -58,9 +60,15 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
pub async fn stream_request(
|
||||
&self,
|
||||
request: ResponsesRequest,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
self.stream(request.body, request.headers, request.compression)
|
||||
.await
|
||||
self.stream(
|
||||
request.body,
|
||||
request.headers,
|
||||
request.compression,
|
||||
turn_state,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all, err)]
|
||||
@@ -80,6 +88,7 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
session_source,
|
||||
extra_headers,
|
||||
compression,
|
||||
turn_state,
|
||||
} = options;
|
||||
|
||||
let request = ResponsesRequestBuilder::new(model, &prompt.instructions, &prompt.input)
|
||||
@@ -96,7 +105,7 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
.compression(compression)
|
||||
.build(self.streaming.provider())?;
|
||||
|
||||
self.stream_request(request).await
|
||||
self.stream_request(request, turn_state).await
|
||||
}
|
||||
|
||||
fn path(&self) -> &'static str {
|
||||
@@ -111,6 +120,7 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
body: Value,
|
||||
extra_headers: HeaderMap,
|
||||
compression: Compression,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
let compression = match compression {
|
||||
Compression::None => RequestCompression::None,
|
||||
@@ -124,6 +134,7 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
extra_headers,
|
||||
compression,
|
||||
spawn_response_stream,
|
||||
turn_state,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -27,6 +28,7 @@ use tracing::trace;
|
||||
use url::Url;
|
||||
|
||||
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
||||
const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
||||
|
||||
pub struct ResponsesWebsocketConnection {
|
||||
stream: Arc<Mutex<Option<WsStream>>>,
|
||||
@@ -100,6 +102,7 @@ impl<A: AuthProvider> ResponsesWebsocketClient<A> {
|
||||
pub async fn connect(
|
||||
&self,
|
||||
extra_headers: HeaderMap,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
) -> Result<ResponsesWebsocketConnection, ApiError> {
|
||||
let ws_url = Url::parse(&self.provider.url_for_path("responses"))
|
||||
.map_err(|err| ApiError::Stream(format!("failed to build websocket URL: {err}")))?;
|
||||
@@ -108,7 +111,7 @@ impl<A: AuthProvider> ResponsesWebsocketClient<A> {
|
||||
headers.extend(extra_headers);
|
||||
apply_auth_headers(&mut headers, &self.auth);
|
||||
|
||||
let stream = connect_websocket(ws_url, headers).await?;
|
||||
let stream = connect_websocket(ws_url, headers, turn_state).await?;
|
||||
Ok(ResponsesWebsocketConnection::new(
|
||||
stream,
|
||||
self.provider.stream_idle_timeout,
|
||||
@@ -130,16 +133,28 @@ fn apply_auth_headers(headers: &mut HeaderMap, auth: &impl AuthProvider) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_websocket(url: Url, headers: HeaderMap) -> Result<WsStream, ApiError> {
|
||||
async fn connect_websocket(
|
||||
url: Url,
|
||||
headers: HeaderMap,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
) -> Result<WsStream, ApiError> {
|
||||
let mut request = url
|
||||
.clone()
|
||||
.into_client_request()
|
||||
.map_err(|err| ApiError::Stream(format!("failed to build websocket request: {err}")))?;
|
||||
request.headers_mut().extend(headers);
|
||||
|
||||
let (stream, _) = tokio_tungstenite::connect_async(request)
|
||||
let (stream, response) = tokio_tungstenite::connect_async(request)
|
||||
.await
|
||||
.map_err(|err| map_ws_error(err, &url))?;
|
||||
if let Some(turn_state) = turn_state
|
||||
&& let Some(header_value) = response
|
||||
.headers()
|
||||
.get(X_CODEX_TURN_STATE_HEADER)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
{
|
||||
let _ = turn_state.set(header_value.to_string());
|
||||
}
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ use http::HeaderMap;
|
||||
use http::Method;
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
|
||||
pub(crate) struct StreamingClient<T: HttpTransport, A: AuthProvider> {
|
||||
@@ -23,6 +24,13 @@ pub(crate) struct StreamingClient<T: HttpTransport, A: AuthProvider> {
|
||||
sse_telemetry: Option<Arc<dyn SseTelemetry>>,
|
||||
}
|
||||
|
||||
type StreamSpawner = fn(
|
||||
StreamResponse,
|
||||
Duration,
|
||||
Option<Arc<dyn SseTelemetry>>,
|
||||
Option<Arc<OnceLock<String>>>,
|
||||
) -> ResponseStream;
|
||||
|
||||
impl<T: HttpTransport, A: AuthProvider> StreamingClient<T, A> {
|
||||
pub(crate) fn new(transport: T, provider: Provider, auth: A) -> Self {
|
||||
Self {
|
||||
@@ -54,7 +62,8 @@ impl<T: HttpTransport, A: AuthProvider> StreamingClient<T, A> {
|
||||
body: Value,
|
||||
extra_headers: HeaderMap,
|
||||
compression: RequestCompression,
|
||||
spawner: fn(StreamResponse, Duration, Option<Arc<dyn SseTelemetry>>) -> ResponseStream,
|
||||
spawner: StreamSpawner,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
let builder = || {
|
||||
let mut req = self.provider.build_request(Method::POST, path);
|
||||
@@ -80,6 +89,7 @@ impl<T: HttpTransport, A: AuthProvider> StreamingClient<T, A> {
|
||||
stream_response,
|
||||
self.provider.stream_idle_timeout,
|
||||
self.sse_telemetry.clone(),
|
||||
turn_state,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Instant;
|
||||
@@ -21,7 +23,8 @@ use tracing::trace;
|
||||
pub(crate) fn spawn_chat_stream(
|
||||
stream_response: StreamResponse,
|
||||
idle_timeout: Duration,
|
||||
telemetry: Option<std::sync::Arc<dyn SseTelemetry>>,
|
||||
telemetry: Option<Arc<dyn SseTelemetry>>,
|
||||
_turn_state: Option<Arc<OnceLock<String>>>,
|
||||
) -> ResponseStream {
|
||||
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent, ApiError>>(1600);
|
||||
tokio::spawn(async move {
|
||||
|
||||
@@ -16,6 +16,7 @@ use serde_json::Value;
|
||||
use std::io::BufRead;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Instant;
|
||||
@@ -49,6 +50,7 @@ pub fn spawn_response_stream(
|
||||
stream_response: StreamResponse,
|
||||
idle_timeout: Duration,
|
||||
telemetry: Option<Arc<dyn SseTelemetry>>,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
) -> ResponseStream {
|
||||
let rate_limits = parse_rate_limit(&stream_response.headers);
|
||||
let models_etag = stream_response
|
||||
@@ -56,6 +58,14 @@ pub fn spawn_response_stream(
|
||||
.get("X-Models-Etag")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(ToString::to_string);
|
||||
if let Some(turn_state) = turn_state.as_ref()
|
||||
&& let Some(header_value) = stream_response
|
||||
.headers
|
||||
.get("x-codex-turn-state")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
{
|
||||
let _ = turn_state.set(header_value.to_string());
|
||||
}
|
||||
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent, ApiError>>(1600);
|
||||
tokio::spawn(async move {
|
||||
if let Some(snapshot) = rate_limits {
|
||||
|
||||
Reference in New Issue
Block a user