mirror of
https://github.com/openai/codex.git
synced 2026-05-06 06:12:59 +03:00
codex: address PR review feedback (#18265)
This commit is contained in:
@@ -314,27 +314,35 @@ async fn send_message_to_connection(
|
||||
if connection_state.can_disconnect() {
|
||||
match writer.try_send(queued_message) {
|
||||
Ok(()) => false,
|
||||
Err(mpsc::error::TrySendError::Full(queued_message)) => match writer
|
||||
Err(mpsc::error::TrySendError::Full(queued_message)) => {
|
||||
let Some(disconnect_sender) = connection_state.disconnect_sender.clone() else {
|
||||
unreachable!("disconnectable connection must have a disconnect sender");
|
||||
};
|
||||
// WebSocket clients are marked disconnectable so a stuck writer
|
||||
// cannot block the outbound router forever. Still, normal turns
|
||||
// can briefly burst past the per-connection queue capacity while
|
||||
// the writer task is healthy, so give it a bounded chance to
|
||||
// drain before treating the client as slow.
|
||||
.send_timeout(queued_message, OUTBOUND_QUEUE_FULL_GRACE)
|
||||
.await
|
||||
{
|
||||
Ok(()) => false,
|
||||
Err(mpsc::error::SendTimeoutError::Timeout(_)) => {
|
||||
warn!(
|
||||
"disconnecting slow connection after outbound queue remained full for {:?}: {connection_id:?}",
|
||||
OUTBOUND_QUEUE_FULL_GRACE
|
||||
);
|
||||
disconnect_connection(connections, connection_id)
|
||||
}
|
||||
Err(mpsc::error::SendTimeoutError::Closed(_)) => {
|
||||
disconnect_connection(connections, connection_id)
|
||||
}
|
||||
},
|
||||
// the writer task is healthy, so wait for capacity off the
|
||||
// router path before treating the client as slow.
|
||||
tokio::spawn(async move {
|
||||
match writer
|
||||
.send_timeout(queued_message, OUTBOUND_QUEUE_FULL_GRACE)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(mpsc::error::SendTimeoutError::Timeout(_)) => {
|
||||
warn!(
|
||||
"disconnecting slow connection after outbound queue remained full for {:?}: {connection_id:?}",
|
||||
OUTBOUND_QUEUE_FULL_GRACE
|
||||
);
|
||||
disconnect_sender.cancel();
|
||||
}
|
||||
Err(mpsc::error::SendTimeoutError::Closed(_)) => {
|
||||
disconnect_sender.cancel();
|
||||
}
|
||||
}
|
||||
});
|
||||
false
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => {
|
||||
disconnect_connection(connections, connection_id)
|
||||
}
|
||||
@@ -927,41 +935,31 @@ mod tests {
|
||||
),
|
||||
);
|
||||
|
||||
let mut route_task = tokio::spawn(async move {
|
||||
route_outgoing_envelope(
|
||||
&mut connections,
|
||||
OutgoingEnvelope::ToConnection {
|
||||
connection_id,
|
||||
message: OutgoingMessage::AppServerNotification(
|
||||
ServerNotification::ConfigWarning(ConfigWarningNotification {
|
||||
summary: "second".to_string(),
|
||||
details: None,
|
||||
path: None,
|
||||
range: None,
|
||||
}),
|
||||
),
|
||||
write_complete_tx: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
connections
|
||||
});
|
||||
|
||||
assert!(
|
||||
timeout(Duration::from_millis(25), &mut route_task)
|
||||
.await
|
||||
.is_err(),
|
||||
"routing should wait briefly for queue capacity"
|
||||
);
|
||||
route_outgoing_envelope(
|
||||
&mut connections,
|
||||
OutgoingEnvelope::ToConnection {
|
||||
connection_id,
|
||||
message: OutgoingMessage::AppServerNotification(ServerNotification::ConfigWarning(
|
||||
ConfigWarningNotification {
|
||||
summary: "second".to_string(),
|
||||
details: None,
|
||||
path: None,
|
||||
range: None,
|
||||
},
|
||||
)),
|
||||
write_complete_tx: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let first = writer_rx
|
||||
.recv()
|
||||
.await
|
||||
.expect("first queued message should be readable");
|
||||
let connections = timeout(Duration::from_millis(100), route_task)
|
||||
let second = timeout(Duration::from_millis(100), writer_rx.recv())
|
||||
.await
|
||||
.expect("routing should finish after the first queued message is drained")
|
||||
.expect("routing task should succeed");
|
||||
.expect("second notification should be delivered after queue capacity returns")
|
||||
.expect("second notification should exist");
|
||||
|
||||
assert!(connections.contains_key(&connection_id));
|
||||
assert!(!disconnect_token.is_cancelled());
|
||||
@@ -971,9 +969,6 @@ mod tests {
|
||||
ConfigWarningNotification { summary, .. }
|
||||
)) if summary == "queued"
|
||||
));
|
||||
let second = writer_rx
|
||||
.try_recv()
|
||||
.expect("second notification should be delivered once the queue has room");
|
||||
assert!(matches!(
|
||||
second.message,
|
||||
OutgoingMessage::AppServerNotification(ServerNotification::ConfigWarning(
|
||||
@@ -983,7 +978,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn disconnectable_connection_disconnects_after_queue_grace_expires() {
|
||||
async fn disconnectable_connection_requests_disconnect_after_queue_grace_expires() {
|
||||
let connection_id = ConnectionId(2);
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel(1);
|
||||
let disconnect_token = CancellationToken::new();
|
||||
@@ -1014,35 +1009,34 @@ mod tests {
|
||||
),
|
||||
);
|
||||
|
||||
let route_task = tokio::spawn(async move {
|
||||
route_outgoing_envelope(
|
||||
&mut connections,
|
||||
OutgoingEnvelope::ToConnection {
|
||||
connection_id,
|
||||
message: OutgoingMessage::AppServerNotification(
|
||||
ServerNotification::ConfigWarning(ConfigWarningNotification {
|
||||
summary: "second".to_string(),
|
||||
details: None,
|
||||
path: None,
|
||||
range: None,
|
||||
}),
|
||||
),
|
||||
write_complete_tx: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
connections
|
||||
});
|
||||
route_outgoing_envelope(
|
||||
&mut connections,
|
||||
OutgoingEnvelope::ToConnection {
|
||||
connection_id,
|
||||
message: OutgoingMessage::AppServerNotification(ServerNotification::ConfigWarning(
|
||||
ConfigWarningNotification {
|
||||
summary: "second".to_string(),
|
||||
details: None,
|
||||
path: None,
|
||||
range: None,
|
||||
},
|
||||
)),
|
||||
write_complete_tx: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let connections = timeout(
|
||||
assert!(connections.contains_key(&connection_id));
|
||||
timeout(
|
||||
OUTBOUND_QUEUE_FULL_GRACE + Duration::from_millis(100),
|
||||
route_task,
|
||||
async {
|
||||
while !disconnect_token.is_cancelled() {
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("routing should finish after the queue grace expires")
|
||||
.expect("routing task should succeed");
|
||||
|
||||
assert!(!connections.contains_key(&connection_id));
|
||||
.expect("full queue should request disconnect after the grace expires");
|
||||
assert!(disconnect_token.is_cancelled());
|
||||
let original_message = writer_rx
|
||||
.try_recv()
|
||||
|
||||
Reference in New Issue
Block a user