Simplify subscribers

This commit is contained in:
jif-oai
2026-03-25 10:51:55 +00:00
parent 3037b80396
commit 8cae6b69a3
4 changed files with 67 additions and 47 deletions

View File

@@ -30,6 +30,7 @@ use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecSessionEvent;
use crate::process::SESSION_EVENT_CHANNEL_CAPACITY;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
@@ -101,7 +102,7 @@ struct Inner {
// process on the connection. Keep a local process_id -> sender registry so
// we can demux those connection-global notifications into the single
// process-scoped event channel returned by ExecBackend::start().
sessions: ArcSwap<HashMap<String, mpsc::UnboundedSender<ExecSessionEvent>>>,
sessions: ArcSwap<HashMap<String, mpsc::Sender<ExecSessionEvent>>>,
// ArcSwap makes reads cheap on the hot notification path, but writes still
// need serialization so concurrent register/remove operations do not
// overwrite each other's copy-on-write updates.
@@ -319,8 +320,8 @@ impl ExecServerClient {
pub(crate) async fn register_session(
&self,
process_id: &str,
) -> Result<mpsc::UnboundedReceiver<ExecSessionEvent>, ExecServerError> {
let (events_tx, events_rx) = mpsc::unbounded_channel();
) -> Result<mpsc::Receiver<ExecSessionEvent>, ExecServerError> {
let (events_tx, events_rx) = mpsc::channel(SESSION_EVENT_CHANNEL_CAPACITY);
let _sessions_write_guard = self.inner.sessions_write_lock.lock().await;
let sessions = self.inner.sessions.load();
if sessions.contains_key(process_id) {
@@ -421,11 +422,13 @@ async fn handle_server_notification(
// each event to the single local receiver that owns this process.
let events_tx = inner.sessions.load().get(&params.process_id).cloned();
if let Some(events_tx) = events_tx {
let _ = events_tx.send(ExecSessionEvent::Output {
seq: params.seq,
stream: params.stream,
chunk: params.chunk.into_inner(),
});
let _ = events_tx
.send(ExecSessionEvent::Output {
seq: params.seq,
stream: params.stream,
chunk: params.chunk.into_inner(),
})
.await;
}
}
EXEC_EXITED_METHOD => {
@@ -433,10 +436,12 @@ async fn handle_server_notification(
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let events_tx = inner.sessions.load().get(&params.process_id).cloned();
if let Some(events_tx) = events_tx {
let _ = events_tx.send(ExecSessionEvent::Exited {
seq: params.seq,
exit_code: params.exit_code,
});
let _ = events_tx
.send(ExecSessionEvent::Exited {
seq: params.seq,
exit_code: params.exit_code,
})
.await;
}
}
EXEC_CLOSED_METHOD => {
@@ -456,7 +461,9 @@ async fn handle_server_notification(
events_tx
};
if let Some(events_tx) = events_tx {
let _ = events_tx.send(ExecSessionEvent::Closed { seq: params.seq });
let _ = events_tx
.send(ExecSessionEvent::Closed { seq: params.seq })
.await;
}
}
other => {