mirror of
https://github.com/openai/codex.git
synced 2026-05-05 22:01:37 +03:00
fix(core): prevent hanging turn/start due to websocket warming issues (#14838)
## Description This PR fixes a bad first-turn failure mode in app-server when the startup websocket prewarm hangs. Before this change, `initialize -> thread/start -> turn/start` could sit behind the prewarm for up to five minutes, so the client would not see `turn/started`, and even `turn/interrupt` would block because the turn had not actually started yet. Now, we: - set a (configurable) timeout of 15s for websocket startup time, exposed as `websocket_startup_timeout_ms` in config.toml - `turn/started` is sent immediately on `turn/start` even if the websocket is still connecting - `turn/interrupt` can be used to cancel a turn that is still waiting on the websocket warmup - the turn task will wait for the full 15s websocket warming timeout before falling back ## Why The old behavior made app-server feel stuck at exactly the moment the client expects turn lifecycle events to start flowing. That was especially painful for external clients, because from their point of view the server had accepted the request but then went silent for minutes. ## Configuring the websocket startup timeout Can set it in config.toml like this: ``` [model_providers.openai] supports_websockets = true websocket_connect_timeout_ms = 15000 ```
This commit is contained in:
@@ -117,6 +117,9 @@ const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=20
|
||||
const RESPONSES_ENDPOINT: &str = "/responses";
|
||||
const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact";
|
||||
const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
|
||||
#[cfg(test)]
|
||||
pub(crate) const WEBSOCKET_CONNECT_TIMEOUT: Duration =
|
||||
Duration::from_millis(crate::model_provider_info::DEFAULT_WEBSOCKET_CONNECT_TIMEOUT_MS);
|
||||
pub fn ws_version_from_features(config: &Config) -> bool {
|
||||
config
|
||||
.features
|
||||
@@ -310,6 +313,27 @@ impl ModelClient {
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner) = websocket_session;
|
||||
}
|
||||
|
||||
pub(crate) fn force_http_fallback(
|
||||
&self,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
model_info: &ModelInfo,
|
||||
) -> bool {
|
||||
let websocket_enabled = self.responses_websocket_enabled(model_info);
|
||||
let activated =
|
||||
websocket_enabled && !self.state.disable_websockets.swap(true, Ordering::Relaxed);
|
||||
if activated {
|
||||
warn!("falling back to HTTP");
|
||||
session_telemetry.counter(
|
||||
"codex.transport.fallback_to_http",
|
||||
/*inc*/ 1,
|
||||
&[("from_wire_api", "responses_websocket")],
|
||||
);
|
||||
}
|
||||
|
||||
self.store_cached_websocket_session(WebsocketSession::default());
|
||||
activated
|
||||
}
|
||||
|
||||
/// Compacts the current conversation history using the Compact endpoint.
|
||||
///
|
||||
/// This is a unary call (no streaming) that returns a new list of
|
||||
@@ -538,15 +562,22 @@ impl ModelClient {
|
||||
auth_context,
|
||||
request_route_telemetry,
|
||||
);
|
||||
let websocket_connect_timeout = self.state.provider.websocket_connect_timeout();
|
||||
let start = Instant::now();
|
||||
let result = ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
||||
.connect(
|
||||
let result = match tokio::time::timeout(
|
||||
websocket_connect_timeout,
|
||||
ApiWebSocketResponsesClient::new(api_provider, api_auth).connect(
|
||||
headers,
|
||||
crate::default_client::default_headers(),
|
||||
turn_state,
|
||||
Some(websocket_telemetry),
|
||||
)
|
||||
.await;
|
||||
),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(_) => Err(ApiError::Transport(TransportError::Timeout)),
|
||||
};
|
||||
let error_message = result.as_ref().err().map(telemetry_api_error_message);
|
||||
let response_debug = result
|
||||
.as_ref()
|
||||
@@ -637,13 +668,12 @@ impl Drop for ModelClientSession {
|
||||
}
|
||||
|
||||
impl ModelClientSession {
|
||||
fn activate_http_fallback(&self, websocket_enabled: bool) -> bool {
|
||||
websocket_enabled
|
||||
&& !self
|
||||
.client
|
||||
.state
|
||||
.disable_websockets
|
||||
.swap(true, Ordering::Relaxed)
|
||||
fn reset_websocket_session(&mut self) {
|
||||
self.websocket_session.connection = None;
|
||||
self.websocket_session.last_request = None;
|
||||
self.websocket_session.last_response_rx = None;
|
||||
self.websocket_session
|
||||
.set_connection_reused(/*connection_reused*/ false);
|
||||
}
|
||||
|
||||
fn build_responses_request(
|
||||
@@ -896,7 +926,7 @@ impl ModelClientSession {
|
||||
.turn_state
|
||||
.clone()
|
||||
.unwrap_or_else(|| Arc::clone(&self.turn_state));
|
||||
let new_conn = self
|
||||
let new_conn = match self
|
||||
.client
|
||||
.connect_websocket(
|
||||
session_telemetry,
|
||||
@@ -907,7 +937,16 @@ impl ModelClientSession {
|
||||
auth_context,
|
||||
request_route_telemetry,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
Ok(new_conn) => new_conn,
|
||||
Err(err) => {
|
||||
if matches!(err, ApiError::Transport(TransportError::Timeout)) {
|
||||
self.reset_websocket_session();
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
self.websocket_session.connection = Some(new_conn);
|
||||
self.websocket_session
|
||||
.set_connection_reused(/*connection_reused*/ false);
|
||||
@@ -1130,15 +1169,12 @@ impl ModelClientSession {
|
||||
|
||||
let ws_request = self.prepare_websocket_request(ws_payload, &request);
|
||||
self.websocket_session.last_request = Some(request);
|
||||
let stream_result = self
|
||||
.websocket_session
|
||||
.connection
|
||||
.as_ref()
|
||||
.ok_or_else(|| {
|
||||
map_api_error(ApiError::Stream(
|
||||
"websocket connection is unavailable".to_string(),
|
||||
))
|
||||
})?
|
||||
let stream_result = self.websocket_session.connection.as_ref().ok_or_else(|| {
|
||||
map_api_error(ApiError::Stream(
|
||||
"websocket connection is unavailable".to_string(),
|
||||
))
|
||||
})?;
|
||||
let stream_result = stream_result
|
||||
.stream_request(ws_request, self.websocket_session.connection_reused())
|
||||
.await
|
||||
.map_err(map_api_error)?;
|
||||
@@ -1296,22 +1332,10 @@ impl ModelClientSession {
|
||||
session_telemetry: &SessionTelemetry,
|
||||
model_info: &ModelInfo,
|
||||
) -> bool {
|
||||
let websocket_enabled = self.client.responses_websocket_enabled(model_info);
|
||||
let activated = self.activate_http_fallback(websocket_enabled);
|
||||
if activated {
|
||||
warn!("falling back to HTTP");
|
||||
session_telemetry.counter(
|
||||
"codex.transport.fallback_to_http",
|
||||
/*inc*/ 1,
|
||||
&[("from_wire_api", "responses_websocket")],
|
||||
);
|
||||
|
||||
self.websocket_session.connection = None;
|
||||
self.websocket_session.last_request = None;
|
||||
self.websocket_session.last_response_rx = None;
|
||||
self.websocket_session
|
||||
.set_connection_reused(/*connection_reused*/ false);
|
||||
}
|
||||
let activated = self
|
||||
.client
|
||||
.force_http_fallback(session_telemetry, model_info);
|
||||
self.websocket_session = WebsocketSession::default();
|
||||
activated
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user