change api a bit

This commit is contained in:
Ruslan Nigmatullin
2026-03-18 15:13:10 -07:00
parent e83e9e5e41
commit a8f0bd49e3
4 changed files with 61 additions and 64 deletions

View File

@@ -26,13 +26,18 @@ use tokio::time::sleep_until;
use tracing::warn;
#[derive(Debug, Clone, PartialEq, Eq)]
/// Coalesced file change notification for a subscriber.
pub struct FileWatcherEvent {
/// Changed paths delivered in sorted order with duplicates removed.
pub paths: Vec<PathBuf>,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
/// Path subscription registered by a [`FileWatcherSubscriber`].
pub struct WatchPath {
/// Root path to watch.
pub path: PathBuf,
/// Whether events below `path` should match recursively.
pub recursive: bool,
}
@@ -50,6 +55,7 @@ struct SubscriberState {
tx: WatchSender,
}
/// Receives coalesced change notifications for a single subscriber.
pub struct Receiver {
inner: Arc<ReceiverInner>,
}
@@ -65,6 +71,8 @@ struct ReceiverInner {
}
impl Receiver {
/// Waits for the next batch of changed paths, or returns `None` once the
/// corresponding subscriber has been removed and no more events can arrive.
pub async fn recv(&mut self) -> Option<FileWatcherEvent> {
loop {
let notified = self.inner.notify.notified();
@@ -181,6 +189,7 @@ pub struct ThrottledWatchReceiver {
}
impl ThrottledWatchReceiver {
/// Creates a throttling wrapper around a raw watcher [`Receiver`].
pub fn new(rx: Receiver, interval: Duration) -> Self {
Self {
rx,
@@ -189,6 +198,8 @@ impl ThrottledWatchReceiver {
}
}
/// Receives the next event, enforcing the configured minimum delay after
/// the previous emission.
pub async fn recv(&mut self) -> Option<FileWatcherEvent> {
if let Some(next_allowed) = self.next_allowed {
sleep_until(next_allowed).await;
@@ -202,25 +213,21 @@ impl ThrottledWatchReceiver {
}
}
/// Handle used to register watched paths for one logical consumer.
pub struct FileWatcherSubscriber {
id: SubscriberId,
file_watcher: std::sync::Weak<FileWatcher>,
rx: Option<Receiver>,
file_watcher: Arc<FileWatcher>,
}
impl FileWatcherSubscriber {
pub fn take_receiver(&mut self) -> Option<Receiver> {
self.rx.take()
}
/// Registers the provided paths for this subscriber and returns an RAII
/// guard that unregisters them on drop.
pub fn register_paths(&self, watched_paths: Vec<WatchPath>) -> WatchRegistration {
let watched_paths = dedupe_watched_paths(watched_paths);
if let Some(file_watcher) = self.file_watcher.upgrade() {
file_watcher.register_paths(self.id, &watched_paths);
}
self.file_watcher.register_paths(self.id, &watched_paths);
WatchRegistration {
file_watcher: self.file_watcher.clone(),
file_watcher: Arc::downgrade(&self.file_watcher),
subscriber_id: self.id,
watched_paths,
}
@@ -234,12 +241,11 @@ impl FileWatcherSubscriber {
impl Drop for FileWatcherSubscriber {
fn drop(&mut self) {
if let Some(file_watcher) = self.file_watcher.upgrade() {
file_watcher.remove_subscriber(self.id);
}
self.file_watcher.remove_subscriber(self.id);
}
}
/// RAII guard for a set of active path registrations.
pub struct WatchRegistration {
file_watcher: std::sync::Weak<FileWatcher>,
subscriber_id: SubscriberId,
@@ -254,12 +260,15 @@ impl Drop for WatchRegistration {
}
}
/// Multi-subscriber file watcher built on top of `notify`.
pub struct FileWatcher {
inner: Option<Mutex<FileWatcherInner>>,
state: Arc<RwLock<WatchState>>,
}
impl FileWatcher {
/// Creates a live filesystem watcher and starts its background event loop
/// on the current Tokio runtime.
pub fn new() -> notify::Result<Self> {
let (raw_tx, raw_rx) = mpsc::unbounded_channel();
let raw_tx_clone = raw_tx;
@@ -279,6 +288,8 @@ impl FileWatcher {
Ok(file_watcher)
}
/// Creates an inert watcher that only supports test-driven synthetic
/// notifications.
pub fn noop() -> Self {
Self {
inner: None,
@@ -286,7 +297,9 @@ impl FileWatcher {
}
}
pub fn add_subscriber(self: &Arc<Self>) -> FileWatcherSubscriber {
/// Adds a new subscriber and returns both its registration handle and its
/// dedicated event receiver.
pub fn add_subscriber(self: &Arc<Self>) -> (FileWatcherSubscriber, Receiver) {
let (tx, rx) = watch_channel();
let mut state = self
.state
@@ -302,11 +315,11 @@ impl FileWatcher {
},
);
FileWatcherSubscriber {
let subscriber = FileWatcherSubscriber {
id: subscriber_id,
file_watcher: Arc::downgrade(self),
rx: Some(rx),
}
file_watcher: self.clone(),
};
(subscriber, rx)
}
fn register_paths(&self, subscriber_id: SubscriberId, watched_paths: &[WatchPath]) {

View File

@@ -114,7 +114,7 @@ fn is_mutating_event_filters_non_mutating_event_kinds() {
#[test]
fn register_dedupes_by_path_and_scope() {
let watcher = Arc::new(FileWatcher::noop());
let subscriber = watcher.add_subscriber();
let (subscriber, _rx) = watcher.add_subscriber();
let _first = subscriber.register_path(path("/tmp/skills"), false);
let _second = subscriber.register_path(path("/tmp/skills"), false);
let _third = subscriber.register_path(path("/tmp/skills"), true);
@@ -133,7 +133,7 @@ fn register_dedupes_by_path_and_scope() {
#[test]
fn watch_registration_drop_unregisters_paths() {
let watcher = Arc::new(FileWatcher::noop());
let subscriber = watcher.add_subscriber();
let (subscriber, _rx) = watcher.add_subscriber();
let registration = subscriber.register_path(path("/tmp/skills"), true);
drop(registration);
@@ -145,7 +145,7 @@ fn watch_registration_drop_unregisters_paths() {
fn subscriber_drop_unregisters_paths() {
let watcher = Arc::new(FileWatcher::noop());
let registration = {
let subscriber = watcher.add_subscriber();
let (subscriber, _rx) = watcher.add_subscriber();
subscriber.register_path(path("/tmp/skills"), true)
};
@@ -153,13 +153,17 @@ fn subscriber_drop_unregisters_paths() {
drop(registration);
}
#[test]
fn receiver_can_only_be_taken_once() {
#[tokio::test]
async fn receiver_closes_when_subscriber_drops() {
let watcher = Arc::new(FileWatcher::noop());
let mut subscriber = watcher.add_subscriber();
let (subscriber, mut rx) = watcher.add_subscriber();
assert_eq!(subscriber.take_receiver().is_some(), true);
assert_eq!(subscriber.take_receiver().is_none(), true);
drop(subscriber);
let closed = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("closed recv timeout");
assert_eq!(closed, None);
}
#[test]
@@ -169,7 +173,7 @@ fn recursive_registration_downgrades_to_non_recursive_after_drop() {
std::fs::create_dir(&root).expect("create root");
let watcher = Arc::new(FileWatcher::new().expect("watcher"));
let subscriber = watcher.add_subscriber();
let (subscriber, _rx) = watcher.add_subscriber();
let non_recursive = subscriber.register_path(root.clone(), false);
let recursive = subscriber.register_path(root.clone(), true);
@@ -203,8 +207,8 @@ fn unregister_holds_state_lock_until_unwatch_finishes() {
std::fs::create_dir(&root).expect("create root");
let watcher = Arc::new(FileWatcher::new().expect("watcher"));
let unregister_subscriber = watcher.add_subscriber();
let register_subscriber = watcher.add_subscriber();
let (unregister_subscriber, _unregister_rx) = watcher.add_subscriber();
let (register_subscriber, _register_rx) = watcher.add_subscriber();
let registration = unregister_subscriber.register_path(root.clone(), true);
let inner = watcher.inner.as_ref().expect("watcher inner");
@@ -251,20 +255,12 @@ fn unregister_holds_state_lock_until_unwatch_finishes() {
#[tokio::test]
async fn matching_subscribers_are_notified() {
let watcher = Arc::new(FileWatcher::noop());
let mut skills_subscriber = watcher.add_subscriber();
let mut plugins_subscriber = watcher.add_subscriber();
let (skills_subscriber, skills_rx) = watcher.add_subscriber();
let (plugins_subscriber, plugins_rx) = watcher.add_subscriber();
let _skills = skills_subscriber.register_path(path("/tmp/skills"), true);
let _plugins = plugins_subscriber.register_path(path("/tmp/plugins"), true);
let mut skills_rx = ThrottledWatchReceiver::new(
skills_subscriber.take_receiver().expect("skills receiver"),
TEST_THROTTLE_INTERVAL,
);
let mut plugins_rx = ThrottledWatchReceiver::new(
plugins_subscriber
.take_receiver()
.expect("plugins receiver"),
TEST_THROTTLE_INTERVAL,
);
let mut skills_rx = ThrottledWatchReceiver::new(skills_rx, TEST_THROTTLE_INTERVAL);
let mut plugins_rx = ThrottledWatchReceiver::new(plugins_rx, TEST_THROTTLE_INTERVAL);
watcher
.send_paths_for_test(vec![path("/tmp/skills/rust/SKILL.md")])
@@ -288,12 +284,9 @@ async fn matching_subscribers_are_notified() {
#[tokio::test]
async fn non_recursive_watch_ignores_grandchildren() {
let watcher = Arc::new(FileWatcher::noop());
let mut subscriber = watcher.add_subscriber();
let (subscriber, rx) = watcher.add_subscriber();
let _registration = subscriber.register_path(path("/tmp/skills"), false);
let mut rx = ThrottledWatchReceiver::new(
subscriber.take_receiver().expect("subscriber receiver"),
TEST_THROTTLE_INTERVAL,
);
let mut rx = ThrottledWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);
watcher
.send_paths_for_test(vec![path("/tmp/skills/nested/SKILL.md")])
@@ -306,12 +299,9 @@ async fn non_recursive_watch_ignores_grandchildren() {
#[tokio::test]
async fn ancestor_events_notify_child_watches() {
let watcher = Arc::new(FileWatcher::noop());
let mut subscriber = watcher.add_subscriber();
let (subscriber, rx) = watcher.add_subscriber();
let _registration = subscriber.register_path(path("/tmp/skills/rust/SKILL.md"), false);
let mut rx = ThrottledWatchReceiver::new(
subscriber.take_receiver().expect("subscriber receiver"),
TEST_THROTTLE_INTERVAL,
);
let mut rx = ThrottledWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);
watcher.send_paths_for_test(vec![path("/tmp/skills")]).await;
@@ -330,12 +320,9 @@ async fn ancestor_events_notify_child_watches() {
#[tokio::test]
async fn spawn_event_loop_filters_non_mutating_events() {
let watcher = Arc::new(FileWatcher::noop());
let mut subscriber = watcher.add_subscriber();
let (subscriber, rx) = watcher.add_subscriber();
let _registration = subscriber.register_path(path("/tmp/skills"), true);
let mut rx = ThrottledWatchReceiver::new(
subscriber.take_receiver().expect("subscriber receiver"),
TEST_THROTTLE_INTERVAL,
);
let mut rx = ThrottledWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);
let (raw_tx, raw_rx) = mpsc::unbounded_channel();
watcher.spawn_event_loop_for_test(raw_rx);

View File

@@ -33,12 +33,9 @@ pub(crate) struct SkillsWatcher {
}
impl SkillsWatcher {
pub(crate) fn new(file_watcher: Arc<FileWatcher>) -> Self {
pub(crate) fn new(file_watcher: &Arc<FileWatcher>) -> Self {
let (subscriber, rx) = file_watcher.add_subscriber();
let (tx, _) = broadcast::channel(128);
let mut subscriber = file_watcher.add_subscriber();
let Some(rx) = subscriber.take_receiver() else {
unreachable!("file watcher receiver is only taken once");
};
let skills_watcher = Self {
subscriber,
tx: tx.clone(),
@@ -48,7 +45,7 @@ impl SkillsWatcher {
}
pub(crate) fn noop() -> Self {
Self::new(Arc::new(FileWatcher::noop()))
Self::new(&Arc::new(FileWatcher::noop()))
}
pub(crate) fn subscribe(&self) -> broadcast::Receiver<SkillsWatcherEvent> {
@@ -95,7 +92,7 @@ mod tests {
#[tokio::test]
async fn forwards_file_watcher_events() {
let file_watcher = Arc::new(FileWatcher::noop());
let skills_watcher = SkillsWatcher::new(Arc::clone(&file_watcher));
let skills_watcher = SkillsWatcher::new(&file_watcher);
let mut rx = skills_watcher.subscribe();
let _registration = skills_watcher
.subscriber

View File

@@ -95,7 +95,7 @@ fn build_skills_watcher(skills_manager: Arc<SkillsManager>) -> Arc<SkillsWatcher
Arc::new(FileWatcher::noop())
}
};
let skills_watcher = Arc::new(SkillsWatcher::new(file_watcher));
let skills_watcher = Arc::new(SkillsWatcher::new(&file_watcher));
let mut rx = skills_watcher.subscribe();
let skills_manager = Arc::clone(&skills_manager);