This commit is contained in:
Ruslan Nigmatullin
2026-03-27 12:35:23 -07:00
parent 5537c4c014
commit db806e3aaa
2 changed files with 29 additions and 9 deletions

View File

@@ -34,7 +34,7 @@ struct ClientState {
pub(crate) struct ClientTracker {
clients: HashMap<ClientId, ClientState>,
join_set: JoinSet<()>,
join_set: JoinSet<ClientId>,
server_event_tx: mpsc::Sender<QueuedServerEnvelope>,
transport_event_tx: mpsc::Sender<TransportEvent>,
shutdown_token: CancellationToken,
@@ -55,8 +55,13 @@ impl ClientTracker {
}
}
pub(crate) async fn bookkeep_join_set(&mut self) {
while self.join_set.join_next().await.is_some() {}
pub(crate) async fn bookkeep_join_set(&mut self) -> Option<ClientId> {
while let Some(join_result) = self.join_set.join_next().await {
let Ok(client_id) = join_result else {
continue;
};
return Some(client_id);
}
futures::future::pending().await
}
@@ -168,7 +173,7 @@ impl ClientTracker {
}
let server_event_tx = self.server_event_tx.clone();
self.join_set.spawn(async move {
tokio::spawn(async move {
let server_envelope = QueuedServerEnvelope {
event: ServerEvent::Pong {
status: PongStatus::Unknown,
@@ -190,7 +195,7 @@ impl ClientTracker {
mut writer_rx: mpsc::Receiver<QueuedOutgoingMessage>,
mut status_rx: watch::Receiver<PongStatus>,
disconnect_token: CancellationToken,
) {
) -> ClientId {
loop {
let (event, write_complete_tx) = tokio::select! {
_ = disconnect_token.cancelled() => {
@@ -227,6 +232,7 @@ impl ClientTracker {
break;
}
}
client_id
}
pub(crate) async fn close_expired_clients(&mut self) -> Result<Vec<ClientId>, Stopped> {
@@ -244,7 +250,7 @@ impl ClientTracker {
Ok(expired_client_ids)
}
async fn close_client(&mut self, client_id: &ClientId) -> Result<(), Stopped> {
pub(super) async fn close_client(&mut self, client_id: &ClientId) -> Result<(), Stopped> {
let Some(client) = self.clients.remove(client_id) else {
return Ok(());
};
@@ -348,9 +354,15 @@ mod tests {
}
disconnect_sender.cancel();
timeout(Duration::from_secs(1), client_tracker.bookkeep_join_set())
let closed_client_id = timeout(Duration::from_secs(1), client_tracker.bookkeep_join_set())
.await
.expect_err("bookkeeping should process the closed task and stay pending");
.expect("bookkeeping should process the closed task")
.expect("closed task should return client id");
assert_eq!(closed_client_id, ClientId("client-1".to_string()));
client_tracker
.close_client(&closed_client_id)
.await
.expect("closed client should emit connection closed");
match transport_event_rx
.recv()

View File

@@ -380,7 +380,15 @@ impl RemoteControlWebsocket {
loop {
let incoming_message = tokio::select! {
_ = shutdown_token.cancelled() => return Ok(()),
_ = client_tracker.bookkeep_join_set() => continue,
client_id = client_tracker.bookkeep_join_set() => {
let Some(client_id) = client_id else {
continue;
};
if client_tracker.close_client(&client_id).await.is_err() {
return Ok(());
}
continue;
}
_ = idle_sweep_interval.tick() => {
let expired_client_ids = match client_tracker.close_expired_clients().await {
Ok(expired_client_ids) => expired_client_ids,