Surface error on WS close, only retry retryable errors

This commit is contained in:
Rasmus Rygaard
2026-02-27 20:07:19 -08:00
parent c1851be1ed
commit 74d0570cd7
8 changed files with 201 additions and 5 deletions

View File

@@ -33,6 +33,7 @@ use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Error as WsError;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tracing::debug;
use tracing::error;
use tracing::info;
@@ -40,6 +41,7 @@ use tracing::trace;
use tungstenite::extensions::ExtensionsConfig;
use tungstenite::extensions::compression::deflate::DeflateConfig;
use tungstenite::protocol::WebSocketConfig;
use tungstenite::protocol::frame::coding::CloseCode;
use url::Url;
struct WsStream {
@@ -419,6 +421,41 @@ fn map_ws_error(err: WsError, url: &Url) -> ApiError {
}
}
fn map_websocket_close(close_frame: Option<&CloseFrame>) -> ApiError {
let message = format_websocket_close_message(close_frame);
match close_frame {
Some(frame) if should_retry_websocket_close_code(frame.code) => ApiError::Retryable {
message,
delay: None,
},
Some(_) => ApiError::NonRetryableStream(message),
None => ApiError::Stream(message),
}
}
fn format_websocket_close_message(close_frame: Option<&CloseFrame>) -> String {
let Some(frame) = close_frame else {
return "websocket closed by server before response.completed".to_string();
};
let code = u16::from(frame.code);
if frame.reason.is_empty() {
format!("websocket closed by server before response.completed (code {code})")
} else {
format!(
"websocket closed by server before response.completed (code {code}: {})",
frame.reason
)
}
}
fn should_retry_websocket_close_code(code: CloseCode) -> bool {
matches!(
code,
CloseCode::Away | CloseCode::Error | CloseCode::Restart | CloseCode::Again
)
}
#[derive(Debug, Deserialize)]
struct WrappedWebsocketError {
code: Option<String>,
@@ -607,10 +644,8 @@ async fn run_websocket_response_stream(
Message::Binary(_) => {
return Err(ApiError::Stream("unexpected binary websocket event".into()));
}
Message::Close(_) => {
return Err(ApiError::Stream(
"websocket closed by server before response.completed".into(),
));
Message::Close(close_frame) => {
return Err(map_websocket_close(close_frame.as_ref()));
}
Message::Frame(_) => {}
Message::Ping(_) | Message::Pong(_) => {}
@@ -768,6 +803,41 @@ mod tests {
assert!(api_error.is_none());
}
#[test]
fn websocket_close_bad_code_is_non_retryable_and_surfaces_reason() {
let close_frame = CloseFrame {
code: CloseCode::Bad(108),
reason: "server-side validation failed".into(),
};
let api_error = map_websocket_close(Some(&close_frame));
let ApiError::NonRetryableStream(message) = api_error else {
panic!("expected ApiError::NonRetryableStream");
};
assert_eq!(
message,
"websocket closed by server before response.completed (code 108: server-side validation failed)"
);
}
#[test]
fn websocket_close_again_is_retryable_and_surfaces_reason() {
let close_frame = CloseFrame {
code: CloseCode::Again,
reason: "retry after rebalance".into(),
};
let api_error = map_websocket_close(Some(&close_frame));
let ApiError::Retryable { message, delay } = api_error else {
panic!("expected ApiError::Retryable");
};
assert_eq!(
message,
"websocket closed by server before response.completed (code 1013: retry after rebalance)"
);
assert_eq!(delay, None);
}
#[test]
fn merge_request_headers_matches_http_precedence() {
let mut provider_headers = HeaderMap::new();