Compare commits

..

4 Commits

Author SHA1 Message Date
Ruslan Nigmatullin
6007175c77 Merge branch 'main' of github.com:openai/codex into ruslan/core-fs-watcher 2026-03-20 13:42:24 -07:00
Ruslan Nigmatullin
b22d65b244 lint 2026-03-18 15:32:59 -07:00
Ruslan Nigmatullin
a8f0bd49e3 change api a bit 2026-03-18 15:13:23 -07:00
Ruslan Nigmatullin
e83e9e5e41 core: Make FileWatcher reusable 2026-03-18 14:03:24 -07:00
13 changed files with 984 additions and 462 deletions

View File

@@ -203,8 +203,6 @@ pub(crate) struct PreviousTurnSettings {
use crate::exec_policy::ExecPolicyUpdateError;
use crate::feedback_tags;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherEvent;
use crate::git_info::get_git_repo_root;
use crate::guardian::GuardianReviewSessionManager;
use crate::hook_runtime::PendingInputHookDisposition;
@@ -290,6 +288,8 @@ use crate::skills::injection::ToolMentionKind;
use crate::skills::injection::app_id_from_path;
use crate::skills::injection::tool_kind_for_path;
use crate::skills::resolve_skill_dependencies_for_turn;
use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
@@ -370,7 +370,7 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) skills_manager: Arc<SkillsManager>,
pub(crate) plugins_manager: Arc<PluginsManager>,
pub(crate) mcp_manager: Arc<McpManager>,
pub(crate) file_watcher: Arc<FileWatcher>,
pub(crate) skills_watcher: Arc<SkillsWatcher>,
pub(crate) conversation_history: InitialHistory,
pub(crate) session_source: SessionSource,
pub(crate) agent_control: AgentControl,
@@ -423,7 +423,7 @@ impl Codex {
skills_manager,
plugins_manager,
mcp_manager,
file_watcher,
skills_watcher,
conversation_history,
session_source,
agent_control,
@@ -611,7 +611,7 @@ impl Codex {
skills_manager,
plugins_manager,
mcp_manager.clone(),
file_watcher,
skills_watcher,
agent_control,
)
.await
@@ -1262,13 +1262,13 @@ impl Session {
self.out_of_band_elicitation_paused.send_replace(paused);
}
fn start_file_watcher_listener(self: &Arc<Self>) {
let mut rx = self.services.file_watcher.subscribe();
fn start_skills_watcher_listener(self: &Arc<Self>) {
let mut rx = self.services.skills_watcher.subscribe();
let weak_sess = Arc::downgrade(self);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
Ok(SkillsWatcherEvent::SkillsChanged { .. }) => {
let Some(sess) = weak_sess.upgrade() else {
break;
};
@@ -1404,7 +1404,7 @@ impl Session {
skills_manager: Arc<SkillsManager>,
plugins_manager: Arc<PluginsManager>,
mcp_manager: Arc<McpManager>,
file_watcher: Arc<FileWatcher>,
skills_watcher: Arc<SkillsWatcher>,
agent_control: AgentControl,
) -> anyhow::Result<Arc<Self>> {
debug!(
@@ -1823,7 +1823,7 @@ impl Session {
skills_manager,
plugins_manager: Arc::clone(&plugins_manager),
mcp_manager: Arc::clone(&mcp_manager),
file_watcher,
skills_watcher,
agent_control,
network_proxy,
network_approval: Arc::clone(&network_approval),
@@ -1901,7 +1901,7 @@ impl Session {
}
// Start the watcher after SessionConfigured so it cannot emit earlier events.
sess.start_file_watcher_listener();
sess.start_skills_watcher_listener();
// Construct sandbox_state before MCP startup so it can be sent to each
// MCP server immediately after it becomes ready (avoiding blocking).
let sandbox_state = SandboxState {

View File

@@ -80,7 +80,7 @@ pub(crate) async fn run_codex_thread_interactive(
skills_manager: Arc::clone(&parent_session.services.skills_manager),
plugins_manager: Arc::clone(&parent_session.services.plugins_manager),
mcp_manager: Arc::clone(&parent_session.services.mcp_manager),
file_watcher: Arc::clone(&parent_session.services.file_watcher),
skills_watcher: Arc::clone(&parent_session.services.skills_watcher),
conversation_history: initial_history.unwrap_or(InitialHistory::New),
session_source: SessionSource::SubAgent(subagent_source),
agent_control: parent_session.services.agent_control.clone(),

View File

@@ -2359,7 +2359,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
skills_manager,
plugins_manager,
mcp_manager,
Arc::new(FileWatcher::noop()),
Arc::new(SkillsWatcher::noop()),
AgentControl::default(),
)
.await;
@@ -2458,7 +2458,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
.expect("create environment"),
);
let file_watcher = Arc::new(FileWatcher::noop());
let skills_watcher = Arc::new(SkillsWatcher::noop());
let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(
McpConnectionManager::new_mcp_connection_manager_for_tests(
@@ -2492,7 +2492,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
skills_manager,
plugins_manager,
mcp_manager,
file_watcher,
skills_watcher,
agent_control,
network_proxy: None,
network_approval: Arc::clone(&network_approval),
@@ -3257,7 +3257,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
.expect("create environment"),
);
let file_watcher = Arc::new(FileWatcher::noop());
let skills_watcher = Arc::new(SkillsWatcher::noop());
let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(
McpConnectionManager::new_mcp_connection_manager_for_tests(
@@ -3291,7 +3291,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
skills_manager,
plugins_manager,
mcp_manager,
file_watcher,
skills_watcher,
agent_control,
network_proxy: None,
network_approval: Arc::clone(&network_approval),

View File

@@ -435,7 +435,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
true,
));
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
let file_watcher = Arc::new(FileWatcher::noop());
let skills_watcher = Arc::new(SkillsWatcher::noop());
let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs {
config,
@@ -444,7 +444,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
skills_manager,
plugins_manager,
mcp_manager,
file_watcher,
skills_watcher,
conversation_history: InitialHistory::New,
session_source: SessionSource::SubAgent(SubAgentSource::Other(
GUARDIAN_REVIEWER_NAME.to_string(),

View File

@@ -1,13 +1,15 @@
//! Watches skill roots for changes and broadcasts coarse-grained
//! `FileWatcherEvent`s that higher-level components react to on the next turn.
//! Watches subscribed files or directories and routes coarse-grained change
//! notifications to the subscribers that own matching watched paths.
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use notify::Event;
@@ -16,22 +18,162 @@ use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio::time::sleep_until;
use tracing::warn;
use crate::config::Config;
use crate::skills::SkillsManager;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileWatcherEvent {
SkillsChanged { paths: Vec<PathBuf> },
/// Coalesced file change notification for a subscriber.
///
/// Produced by [`Receiver::recv`] after draining the subscriber's pending
/// change set in one batch.
pub struct FileWatcherEvent {
    /// Changed paths delivered in sorted order with duplicates removed.
    pub paths: Vec<PathBuf>,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
/// Path subscription registered by a [`FileWatcherSubscriber`].
pub struct WatchPath {
    /// Root path to watch.
    pub path: PathBuf,
    /// Whether events below `path` should match recursively (any depth).
    /// When `false`, only the path itself and its direct children match
    /// (see `watch_path_matches_event`).
    pub recursive: bool,
}
type SubscriberId = u64;
#[derive(Default)]
struct WatchState {
skills_root_ref_counts: HashMap<PathBuf, usize>,
next_subscriber_id: SubscriberId,
path_ref_counts: HashMap<PathBuf, PathWatchCounts>,
subscribers: HashMap<SubscriberId, SubscriberState>,
}
/// Per-subscriber bookkeeping held inside `WatchState`.
struct SubscriberState {
    /// Registered watch paths, each with a registration count — the same
    /// path/scope pair may be registered more than once.
    watched_paths: HashMap<WatchPath, usize>,
    /// Sending half used to deliver coalesced change notifications.
    tx: WatchSender,
}
/// Receives coalesced change notifications for a single subscriber.
pub struct Receiver {
    inner: Arc<ReceiverInner>,
}
/// Sending half of the single-subscriber watch channel; cloned per delivery.
struct WatchSender {
    inner: Arc<ReceiverInner>,
}
struct ReceiverInner {
changed_paths: AsyncMutex<BTreeSet<PathBuf>>,
notify: Notify,
sender_count: AtomicUsize,
}
impl Receiver {
    /// Waits for the next batch of changed paths, or returns `None` once the
    /// corresponding subscriber has been removed and no more events can arrive.
    pub async fn recv(&mut self) -> Option<FileWatcherEvent> {
        loop {
            // Create the `Notified` future *before* inspecting the pending
            // set, so a notification fired between the check below and the
            // `.await` is not lost.
            let notified = self.inner.notify.notified();
            {
                let mut changed_paths = self.inner.changed_paths.lock().await;
                if !changed_paths.is_empty() {
                    // Drain the whole set at once; `BTreeSet` iteration
                    // yields the paths sorted and deduplicated.
                    return Some(FileWatcherEvent {
                        paths: std::mem::take(&mut *changed_paths).into_iter().collect(),
                    });
                }
                // Nothing pending and no live senders: the channel is closed.
                if self.inner.sender_count.load(Ordering::Acquire) == 0 {
                    return None;
                }
            }
            notified.await;
        }
    }
}
impl WatchSender {
    /// Merges `paths` into the pending change set and wakes the receiver
    /// only when at least one genuinely new path was added.
    async fn add_changed_paths(&self, paths: &[PathBuf]) {
        if paths.is_empty() {
            return;
        }
        let mut pending = self.inner.changed_paths.lock().await;
        let before = pending.len();
        for path in paths {
            pending.insert(path.clone());
        }
        // A size change proves something new landed; duplicates of already
        // pending paths need no wakeup.
        if pending.len() != before {
            self.inner.notify.notify_one();
        }
    }
}
impl Clone for WatchSender {
    fn clone(&self) -> Self {
        // Mirrors the `Arc` refcount pattern: a Relaxed increment is enough
        // because the new handle only reaches other threads through existing
        // synchronization; the paired decrement in `Drop` uses `AcqRel`.
        self.inner.sender_count.fetch_add(1, Ordering::Relaxed);
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
impl Drop for WatchSender {
    fn drop(&mut self) {
        // `fetch_sub` returning 1 means this was the last live sender: wake
        // every waiting receiver so `Receiver::recv` can observe closure
        // (count 0) and return `None`.
        if self.inner.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            self.inner.notify.notify_waiters();
        }
    }
}
/// Builds a connected sender/receiver pair sharing one `ReceiverInner`.
/// The initial sender count is 1, accounting for the returned `WatchSender`.
fn watch_channel() -> (WatchSender, Receiver) {
    let inner = Arc::new(ReceiverInner {
        changed_paths: AsyncMutex::new(BTreeSet::new()),
        notify: Notify::new(),
        sender_count: AtomicUsize::new(1),
    });
    let sender = WatchSender {
        inner: Arc::clone(&inner),
    };
    let receiver = Receiver { inner };
    (sender, receiver)
}
/// Reference counts for one watched path, split by recursion scope.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct PathWatchCounts {
    non_recursive: usize,
    recursive: usize,
}
impl PathWatchCounts {
    /// Selects the counter matching the requested recursion scope.
    fn slot(&mut self, recursive: bool) -> &mut usize {
        if recursive {
            &mut self.recursive
        } else {
            &mut self.non_recursive
        }
    }
    fn increment(&mut self, recursive: bool, amount: usize) {
        *self.slot(recursive) += amount;
    }
    /// Decrements saturate at zero so unbalanced unregisters cannot wrap.
    fn decrement(&mut self, recursive: bool, amount: usize) {
        let slot = self.slot(recursive);
        *slot = slot.saturating_sub(amount);
    }
    /// Recursive registrations dominate non-recursive ones; `None` means the
    /// path no longer needs an OS-level watch at all.
    fn effective_mode(self) -> Option<RecursiveMode> {
        match (self.recursive, self.non_recursive) {
            (r, _) if r > 0 => Some(RecursiveMode::Recursive),
            (_, n) if n > 0 => Some(RecursiveMode::NonRecursive),
            _ => None,
        }
    }
    fn is_empty(self) -> bool {
        self.effective_mode().is_none()
    }
}
struct FileWatcherInner {
@@ -39,73 +181,95 @@ struct FileWatcherInner {
watched_paths: HashMap<PathBuf, RecursiveMode>,
}
const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(10);
/// Coalesces bursts of paths and emits at most once per interval.
struct ThrottledPaths {
pending: HashSet<PathBuf>,
next_allowed_at: Instant,
/// Coalesces bursts of watch notifications and emits at most once per interval.
pub struct ThrottledWatchReceiver {
    /// Underlying per-subscriber receiver being throttled.
    rx: Receiver,
    /// Minimum delay enforced between consecutive emissions.
    interval: Duration,
    /// Earliest instant the next event may be returned; `None` until the
    /// first event has been emitted.
    next_allowed: Option<Instant>,
}
impl ThrottledPaths {
fn new(now: Instant) -> Self {
impl ThrottledWatchReceiver {
/// Creates a throttling wrapper around a raw watcher [`Receiver`].
pub fn new(rx: Receiver, interval: Duration) -> Self {
Self {
pending: HashSet::new(),
next_allowed_at: now,
rx,
interval,
next_allowed: None,
}
}
fn add(&mut self, paths: Vec<PathBuf>) {
self.pending.extend(paths);
}
fn next_deadline(&self, now: Instant) -> Option<Instant> {
(!self.pending.is_empty() && now < self.next_allowed_at).then_some(self.next_allowed_at)
}
fn take_ready(&mut self, now: Instant) -> Option<Vec<PathBuf>> {
if self.pending.is_empty() || now < self.next_allowed_at {
return None;
/// Receives the next event, enforcing the configured minimum delay after
/// the previous emission.
pub async fn recv(&mut self) -> Option<FileWatcherEvent> {
if let Some(next_allowed) = self.next_allowed {
sleep_until(next_allowed).await;
}
Some(self.take_with_next_allowed(now))
}
fn take_pending(&mut self, now: Instant) -> Option<Vec<PathBuf>> {
if self.pending.is_empty() {
return None;
let event = self.rx.recv().await;
if event.is_some() {
self.next_allowed = Some(Instant::now() + self.interval);
}
Some(self.take_with_next_allowed(now))
}
fn take_with_next_allowed(&mut self, now: Instant) -> Vec<PathBuf> {
let mut paths: Vec<PathBuf> = self.pending.drain().collect();
paths.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
self.next_allowed_at = now + WATCHER_THROTTLE_INTERVAL;
paths
event
}
}
pub(crate) struct FileWatcher {
inner: Option<Mutex<FileWatcherInner>>,
state: Arc<RwLock<WatchState>>,
tx: broadcast::Sender<FileWatcherEvent>,
/// Handle used to register watched paths for one logical consumer.
pub struct FileWatcherSubscriber {
    /// Identifier of this subscriber inside the watcher's shared state.
    id: SubscriberId,
    /// Owning watcher; dropping the handle unregisters the subscriber and
    /// releases all of its path registrations (see the `Drop` impl).
    file_watcher: Arc<FileWatcher>,
}
pub(crate) struct WatchRegistration {
impl FileWatcherSubscriber {
    /// Registers the provided paths for this subscriber and returns an RAII
    /// guard that unregisters them on drop.
    pub fn register_paths(&self, watched_paths: Vec<WatchPath>) -> WatchRegistration {
        let deduped = dedupe_watched_paths(watched_paths);
        self.file_watcher.register_paths(self.id, &deduped);
        // Hold the watcher weakly so a lingering guard cannot keep it alive.
        WatchRegistration {
            subscriber_id: self.id,
            file_watcher: Arc::downgrade(&self.file_watcher),
            watched_paths: deduped,
        }
    }
    /// Test convenience: registers a single path with the given scope.
    #[cfg(test)]
    pub(crate) fn register_path(&self, path: PathBuf, recursive: bool) -> WatchRegistration {
        self.register_paths(vec![WatchPath { path, recursive }])
    }
}
impl Drop for FileWatcherSubscriber {
    fn drop(&mut self) {
        // Removes this subscriber's entry and releases all of its remaining
        // path registrations in one pass (`FileWatcher::remove_subscriber`).
        self.file_watcher.remove_subscriber(self.id);
    }
}
/// RAII guard for a set of active path registrations.
pub struct WatchRegistration {
file_watcher: std::sync::Weak<FileWatcher>,
roots: Vec<PathBuf>,
subscriber_id: SubscriberId,
watched_paths: Vec<WatchPath>,
}
impl Drop for WatchRegistration {
fn drop(&mut self) {
if let Some(file_watcher) = self.file_watcher.upgrade() {
file_watcher.unregister_roots(&self.roots);
file_watcher.unregister_paths(self.subscriber_id, &self.watched_paths);
}
}
}
/// Multi-subscriber file watcher built on top of `notify`.
pub struct FileWatcher {
inner: Option<Mutex<FileWatcherInner>>,
state: Arc<RwLock<WatchState>>,
}
impl FileWatcher {
pub(crate) fn new(_codex_home: PathBuf) -> notify::Result<Self> {
/// Creates a live filesystem watcher and starts its background event loop
/// on the current Tokio runtime.
pub fn new() -> notify::Result<Self> {
let (raw_tx, raw_rx) = mpsc::unbounded_channel();
let raw_tx_clone = raw_tx;
let watcher = notify::recommended_watcher(move |res| {
@@ -115,109 +279,101 @@ impl FileWatcher {
watcher,
watched_paths: HashMap::new(),
};
let (tx, _) = broadcast::channel(128);
let state = Arc::new(RwLock::new(WatchState {
skills_root_ref_counts: HashMap::new(),
}));
let state = Arc::new(RwLock::new(WatchState::default()));
let file_watcher = Self {
inner: Some(Mutex::new(inner)),
state: Arc::clone(&state),
tx: tx.clone(),
state,
};
file_watcher.spawn_event_loop(raw_rx, state, tx);
file_watcher.spawn_event_loop(raw_rx);
Ok(file_watcher)
}
pub(crate) fn noop() -> Self {
let (tx, _) = broadcast::channel(1);
/// Creates an inert watcher that only supports test-driven synthetic
/// notifications.
pub fn noop() -> Self {
Self {
inner: None,
state: Arc::new(RwLock::new(WatchState {
skills_root_ref_counts: HashMap::new(),
})),
tx,
state: Arc::new(RwLock::new(WatchState::default())),
}
}
pub(crate) fn subscribe(&self) -> broadcast::Receiver<FileWatcherEvent> {
self.tx.subscribe()
/// Adds a new subscriber and returns both its registration handle and its
/// dedicated event receiver.
pub fn add_subscriber(self: &Arc<Self>) -> (FileWatcherSubscriber, Receiver) {
    let (tx, rx) = watch_channel();
    let mut state = self
        .state
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    // Ids are handed out monotonically and never reused.
    let subscriber_id = state.next_subscriber_id;
    state.next_subscriber_id += 1;
    state.subscribers.insert(
        subscriber_id,
        SubscriberState {
            watched_paths: HashMap::new(),
            tx,
        },
    );
    // The handle keeps the watcher alive via a strong `Arc`; the reverse
    // direction (watcher -> subscriber) stores only plain state.
    let subscriber = FileWatcherSubscriber {
        id: subscriber_id,
        file_watcher: self.clone(),
    };
    (subscriber, rx)
}
pub(crate) fn register_config(
self: &Arc<Self>,
config: &Config,
skills_manager: &SkillsManager,
) -> WatchRegistration {
let deduped_roots: HashSet<PathBuf> = skills_manager
.skill_roots_for_config(config)
.into_iter()
.map(|root| root.path)
.collect();
let mut registered_roots: Vec<PathBuf> = deduped_roots.into_iter().collect();
registered_roots.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
for root in &registered_roots {
self.register_skills_root(root.clone());
}
fn register_paths(&self, subscriber_id: SubscriberId, watched_paths: &[WatchPath]) {
let mut state = self
.state
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut inner_guard: Option<std::sync::MutexGuard<'_, FileWatcherInner>> = None;
WatchRegistration {
file_watcher: Arc::downgrade(self),
roots: registered_roots,
for watched_path in watched_paths {
{
let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) else {
return;
};
*subscriber
.watched_paths
.entry(watched_path.clone())
.or_default() += 1;
}
let counts = state
.path_ref_counts
.entry(watched_path.path.clone())
.or_default();
let previous_mode = counts.effective_mode();
counts.increment(watched_path.recursive, /*amount*/ 1);
let next_mode = counts.effective_mode();
if previous_mode != next_mode {
self.reconfigure_watch(&watched_path.path, next_mode, &mut inner_guard);
}
}
}
// Bridge `notify`'s callback-based events into the Tokio runtime and
// broadcast coarse-grained change signals to subscribers.
fn spawn_event_loop(
&self,
mut raw_rx: mpsc::UnboundedReceiver<notify::Result<Event>>,
state: Arc<RwLock<WatchState>>,
tx: broadcast::Sender<FileWatcherEvent>,
) {
// notify the matching subscribers.
fn spawn_event_loop(&self, mut raw_rx: mpsc::UnboundedReceiver<notify::Result<Event>>) {
if let Ok(handle) = Handle::try_current() {
let state = Arc::clone(&self.state);
handle.spawn(async move {
let now = Instant::now();
let mut skills = ThrottledPaths::new(now);
loop {
let now = Instant::now();
let next_deadline = skills.next_deadline(now);
let timer_deadline = next_deadline
.unwrap_or_else(|| now + Duration::from_secs(60 * 60 * 24 * 365));
let timer = sleep_until(timer_deadline);
tokio::pin!(timer);
tokio::select! {
res = raw_rx.recv() => {
match res {
Some(Ok(event)) => {
let skills_paths = classify_event(&event, &state);
let now = Instant::now();
skills.add(skills_paths);
if let Some(paths) = skills.take_ready(now) {
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
}
}
Some(Err(err)) => {
warn!("file watcher error: {err}");
}
None => {
// Flush any pending changes before shutdown so subscribers
// see the latest state.
let now = Instant::now();
if let Some(paths) = skills.take_pending(now) {
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
}
break;
}
match raw_rx.recv().await {
Some(Ok(event)) => {
if !is_mutating_event(&event) {
continue;
}
}
_ = &mut timer => {
let now = Instant::now();
if let Some(paths) = skills.take_ready(now) {
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
if event.paths.is_empty() {
continue;
}
Self::notify_subscribers(&state, &event.paths).await;
}
Some(Err(err)) => {
warn!("file watcher error: {err}");
}
None => break,
}
}
});
@@ -226,127 +382,195 @@ impl FileWatcher {
}
}
fn register_skills_root(&self, root: PathBuf) {
let mut state = self
.state
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let count = state
.skills_root_ref_counts
.entry(root.clone())
.or_insert(0);
*count += 1;
if *count == 1 {
self.watch_path(root, RecursiveMode::Recursive);
}
}
fn unregister_roots(&self, roots: &[PathBuf]) {
fn unregister_paths(&self, subscriber_id: SubscriberId, watched_paths: &[WatchPath]) {
let mut state = self
.state
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut inner_guard: Option<std::sync::MutexGuard<'_, FileWatcherInner>> = None;
for root in roots {
let mut should_unwatch = false;
if let Some(count) = state.skills_root_ref_counts.get_mut(root) {
if *count > 1 {
*count -= 1;
} else {
state.skills_root_ref_counts.remove(root);
should_unwatch = true;
for watched_path in watched_paths {
{
let Some(subscriber) = state.subscribers.get_mut(&subscriber_id) else {
return;
};
let Some(subscriber_count) = subscriber.watched_paths.get_mut(watched_path) else {
continue;
};
*subscriber_count = subscriber_count.saturating_sub(1);
if *subscriber_count == 0 {
subscriber.watched_paths.remove(watched_path);
}
}
if !should_unwatch {
continue;
}
let Some(inner) = &self.inner else {
let Some(counts) = state.path_ref_counts.get_mut(&watched_path.path) else {
continue;
};
if inner_guard.is_none() {
let guard = inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
inner_guard = Some(guard);
let previous_mode = counts.effective_mode();
counts.decrement(watched_path.recursive, /*amount*/ 1);
let next_mode = counts.effective_mode();
if counts.is_empty() {
state.path_ref_counts.remove(&watched_path.path);
}
let Some(guard) = inner_guard.as_mut() else {
continue;
};
if guard.watched_paths.remove(root).is_none() {
continue;
}
if let Err(err) = guard.watcher.unwatch(root) {
warn!("failed to unwatch {}: {err}", root.display());
if previous_mode != next_mode {
self.reconfigure_watch(&watched_path.path, next_mode, &mut inner_guard);
}
}
}
fn watch_path(&self, path: PathBuf, mode: RecursiveMode) {
// Removes a subscriber entirely and releases every path registration it
// still held, reconfiguring OS-level watches whose effective mode changed.
fn remove_subscriber(&self, subscriber_id: SubscriberId) {
    let mut state = self
        .state
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let Some(subscriber) = state.subscribers.remove(&subscriber_id) else {
        return;
    };
    // Lazily acquired guard over the `notify` backend, shared across loop
    // iterations so the inner mutex is locked at most once.
    let mut inner_guard: Option<std::sync::MutexGuard<'_, FileWatcherInner>> = None;
    // `count` reflects duplicate registrations of the same path/scope pair.
    for (watched_path, count) in subscriber.watched_paths {
        let Some(path_counts) = state.path_ref_counts.get_mut(&watched_path.path) else {
            continue;
        };
        let previous_mode = path_counts.effective_mode();
        path_counts.decrement(watched_path.recursive, count);
        let next_mode = path_counts.effective_mode();
        if path_counts.is_empty() {
            state.path_ref_counts.remove(&watched_path.path);
        }
        // Only touch the OS watcher when the effective mode actually changed.
        if previous_mode != next_mode {
            self.reconfigure_watch(&watched_path.path, next_mode, &mut inner_guard);
        }
    }
}
fn reconfigure_watch<'a>(
&'a self,
path: &Path,
next_mode: Option<RecursiveMode>,
inner_guard: &mut Option<std::sync::MutexGuard<'a, FileWatcherInner>>,
) {
let Some(inner) = &self.inner else {
return;
};
if inner_guard.is_none() {
let guard = inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*inner_guard = Some(guard);
}
let Some(guard) = inner_guard.as_mut() else {
return;
};
let existing_mode = guard.watched_paths.get(path).copied();
if existing_mode == next_mode {
return;
}
if existing_mode.is_some() {
if let Err(err) = guard.watcher.unwatch(path) {
warn!("failed to unwatch {}: {err}", path.display());
}
guard.watched_paths.remove(path);
}
let Some(next_mode) = next_mode else {
return;
};
if !path.exists() {
return;
}
let watch_path = path;
let mut guard = inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(existing) = guard.watched_paths.get(&watch_path) {
if *existing == RecursiveMode::Recursive || *existing == mode {
return;
}
if let Err(err) = guard.watcher.unwatch(&watch_path) {
warn!("failed to unwatch {}: {err}", watch_path.display());
}
}
if let Err(err) = guard.watcher.watch(&watch_path, mode) {
warn!("failed to watch {}: {err}", watch_path.display());
if let Err(err) = guard.watcher.watch(path, next_mode) {
warn!("failed to watch {}: {err}", path.display());
return;
}
guard.watched_paths.insert(watch_path, mode);
guard.watched_paths.insert(path.to_path_buf(), next_mode);
}
// Delivers `event_paths` to every subscriber whose registered watch paths
// match. Sender handles are cloned while holding the read lock, and the
// async delivery happens afterwards so the lock is never held across an
// `.await`.
async fn notify_subscribers(state: &RwLock<WatchState>, event_paths: &[PathBuf]) {
    let mut deliveries: Vec<(WatchSender, Vec<PathBuf>)> = Vec::new();
    {
        let state = state
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        for subscriber in state.subscribers.values() {
            let matched: Vec<PathBuf> = event_paths
                .iter()
                .filter(|event_path| {
                    subscriber
                        .watched_paths
                        .keys()
                        .any(|watched_path| watch_path_matches_event(watched_path, event_path))
                })
                .cloned()
                .collect();
            if !matched.is_empty() {
                deliveries.push((subscriber.tx.clone(), matched));
            }
        }
    }
    for (tx, changed_paths) in deliveries {
        tx.add_changed_paths(&changed_paths).await;
    }
}
#[cfg(test)]
// Injects synthetic change paths, bypassing the `notify` backend entirely.
pub(crate) async fn send_paths_for_test(&self, paths: Vec<PathBuf>) {
    Self::notify_subscribers(&self.state, &paths).await;
}
#[cfg(test)]
// Exposes the private event loop so tests can drive it with a raw channel.
pub(crate) fn spawn_event_loop_for_test(
    &self,
    raw_rx: mpsc::UnboundedReceiver<notify::Result<Event>>,
) {
    self.spawn_event_loop(raw_rx);
}
#[cfg(test)]
// Returns the (non_recursive, recursive) registration counts for `path`,
// or `None` when the path is not currently tracked.
pub(crate) fn watch_counts_for_test(&self, path: &Path) -> Option<(usize, usize)> {
    let state = self
        .state
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    state
        .path_ref_counts
        .get(path)
        .map(|counts| (counts.non_recursive, counts.recursive))
}
}
fn classify_event(event: &Event, state: &RwLock<WatchState>) -> Vec<PathBuf> {
if !matches!(
fn is_mutating_event(event: &Event) -> bool {
matches!(
event.kind,
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
) {
return Vec::new();
}
let mut skills_paths = Vec::new();
let skills_roots = match state.read() {
Ok(state) => state
.skills_root_ref_counts
.keys()
.cloned()
.collect::<HashSet<_>>(),
Err(err) => {
let state = err.into_inner();
state
.skills_root_ref_counts
.keys()
.cloned()
.collect::<HashSet<_>>()
}
};
for path in &event.paths {
if is_skills_path(path, &skills_roots) {
skills_paths.push(path.clone());
}
}
skills_paths
)
}
fn is_skills_path(path: &Path, roots: &HashSet<PathBuf>) -> bool {
roots.iter().any(|root| path.starts_with(root))
/// Sorts registrations by `(path, recursive)` and drops exact duplicates so
/// each distinct path/scope pair is counted once per `register_paths` call.
fn dedupe_watched_paths(mut watched_paths: Vec<WatchPath>) -> Vec<WatchPath> {
    // `dedup` only removes adjacent duplicates, so sort first; comparing the
    // `(OsStr, bool)` tuples gives the path-then-scope order directly.
    watched_paths.sort_unstable_by(|a, b| {
        (a.path.as_os_str(), a.recursive).cmp(&(b.path.as_os_str(), b.recursive))
    });
    watched_paths.dedup();
    watched_paths
}
/// Decides whether an event at `event_path` concerns `watched_path`.
///
/// Matches when the event hits the watched root itself, an ancestor of the
/// root (e.g. the root's parent directory being renamed or removed), or a
/// descendant — any depth for recursive subscriptions, direct children only
/// for non-recursive ones.
fn watch_path_matches_event(watched_path: &WatchPath, event_path: &Path) -> bool {
    let root = watched_path.path.as_path();
    // Exact hit, or the event is on an ancestor of the watched root.
    if event_path == root || root.starts_with(event_path) {
        return true;
    }
    match event_path.strip_prefix(root) {
        // Not under the watched root at all.
        Err(_) => false,
        // Under the root: recursive scope matches everything, non-recursive
        // scope only immediate children.
        Ok(_) => watched_path.recursive || event_path.parent() == Some(root),
    }
}
#[cfg(test)]

View File

@@ -1,13 +1,13 @@
use super::*;
use notify::EventKind;
use notify::event::AccessKind;
use notify::event::AccessMode;
use notify::event::CreateKind;
use notify::event::ModifyKind;
use notify::event::RemoveKind;
use pretty_assertions::assert_eq;
use tokio::time::timeout;
const TEST_THROTTLE_INTERVAL: Duration = Duration::from_millis(50);
fn path(name: &str) -> PathBuf {
PathBuf::from(name)
}
@@ -20,147 +20,202 @@ fn notify_event(kind: EventKind, paths: Vec<PathBuf>) -> Event {
event
}
#[test]
fn throttles_and_coalesces_within_interval() {
let start = Instant::now();
let mut throttled = ThrottledPaths::new(start);
#[tokio::test]
async fn throttled_receiver_coalesces_within_interval() {
let (tx, rx) = watch_channel();
let mut throttled = ThrottledWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);
throttled.add(vec![path("a")]);
let first = throttled.take_ready(start).expect("first emit");
assert_eq!(first, vec![path("a")]);
throttled.add(vec![path("b"), path("c")]);
assert_eq!(throttled.take_ready(start), None);
let second = throttled
.take_ready(start + WATCHER_THROTTLE_INTERVAL)
.expect("coalesced emit");
assert_eq!(second, vec![path("b"), path("c")]);
}
#[test]
fn flushes_pending_on_shutdown() {
let start = Instant::now();
let mut throttled = ThrottledPaths::new(start);
throttled.add(vec![path("a")]);
let _ = throttled.take_ready(start).expect("first emit");
throttled.add(vec![path("b")]);
assert_eq!(throttled.take_ready(start), None);
let flushed = throttled
.take_pending(start)
.expect("shutdown flush emits pending paths");
assert_eq!(flushed, vec![path("b")]);
}
#[test]
fn classify_event_filters_to_skills_roots() {
let root = path("/tmp/skills");
let state = RwLock::new(WatchState {
skills_root_ref_counts: HashMap::from([(root.clone(), 1)]),
});
let event = notify_event(
EventKind::Create(CreateKind::Any),
vec![
root.join("demo/SKILL.md"),
path("/tmp/other/not-a-skill.txt"),
],
);
let classified = classify_event(&event, &state);
assert_eq!(classified, vec![root.join("demo/SKILL.md")]);
}
#[test]
fn classify_event_supports_multiple_roots_without_prefix_false_positives() {
let root_a = path("/tmp/skills");
let root_b = path("/tmp/workspace/.codex/skills");
let state = RwLock::new(WatchState {
skills_root_ref_counts: HashMap::from([(root_a.clone(), 1), (root_b.clone(), 1)]),
});
let event = notify_event(
EventKind::Modify(ModifyKind::Any),
vec![
root_a.join("alpha/SKILL.md"),
path("/tmp/skills-extra/not-under-skills.txt"),
root_b.join("beta/SKILL.md"),
],
);
let classified = classify_event(&event, &state);
tx.add_changed_paths(&[path("a")]).await;
let first = timeout(Duration::from_secs(1), throttled.recv())
.await
.expect("first emit timeout");
assert_eq!(
classified,
vec![root_a.join("alpha/SKILL.md"), root_b.join("beta/SKILL.md")]
first,
Some(FileWatcherEvent {
paths: vec![path("a")],
})
);
tx.add_changed_paths(&[path("b"), path("c")]).await;
let blocked = timeout(TEST_THROTTLE_INTERVAL / 2, throttled.recv()).await;
assert_eq!(blocked.is_err(), true);
let second = timeout(TEST_THROTTLE_INTERVAL * 2, throttled.recv())
.await
.expect("second emit timeout");
assert_eq!(
second,
Some(FileWatcherEvent {
paths: vec![path("b"), path("c")],
})
);
}
#[tokio::test]
async fn throttled_receiver_flushes_pending_on_shutdown() {
let (tx, rx) = watch_channel();
let mut throttled = ThrottledWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);
tx.add_changed_paths(&[path("a")]).await;
let first = timeout(Duration::from_secs(1), throttled.recv())
.await
.expect("first emit timeout");
assert_eq!(
first,
Some(FileWatcherEvent {
paths: vec![path("a")],
})
);
tx.add_changed_paths(&[path("b")]).await;
drop(tx);
let second = timeout(Duration::from_secs(1), throttled.recv())
.await
.expect("shutdown flush timeout");
assert_eq!(
second,
Some(FileWatcherEvent {
paths: vec![path("b")],
})
);
let closed = timeout(Duration::from_secs(1), throttled.recv())
.await
.expect("closed recv timeout");
assert_eq!(closed, None);
}
#[test]
fn is_mutating_event_filters_non_mutating_event_kinds() {
assert_eq!(
is_mutating_event(&notify_event(
EventKind::Create(CreateKind::Any),
vec![path("/tmp/created")]
)),
true
);
assert_eq!(
is_mutating_event(&notify_event(
EventKind::Modify(ModifyKind::Any),
vec![path("/tmp/modified")]
)),
true
);
assert_eq!(
is_mutating_event(&notify_event(
EventKind::Access(AccessKind::Open(AccessMode::Any)),
vec![path("/tmp/accessed")]
)),
false
);
}
#[test]
fn classify_event_ignores_non_mutating_event_kinds() {
let root = path("/tmp/skills");
let state = RwLock::new(WatchState {
skills_root_ref_counts: HashMap::from([(root.clone(), 1)]),
});
let path = root.join("demo/SKILL.md");
let access_event = notify_event(
EventKind::Access(AccessKind::Open(AccessMode::Any)),
vec![path.clone()],
);
assert_eq!(classify_event(&access_event, &state), Vec::<PathBuf>::new());
let any_event = notify_event(EventKind::Any, vec![path.clone()]);
assert_eq!(classify_event(&any_event, &state), Vec::<PathBuf>::new());
let other_event = notify_event(EventKind::Other, vec![path]);
assert_eq!(classify_event(&other_event, &state), Vec::<PathBuf>::new());
}
#[test]
fn register_skills_root_dedupes_state_entries() {
let watcher = FileWatcher::noop();
let root = path("/tmp/skills");
watcher.register_skills_root(root.clone());
watcher.register_skills_root(root);
watcher.register_skills_root(path("/tmp/other-skills"));
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.len(), 2);
}
#[test]
fn watch_registration_drop_unregisters_roots() {
fn register_dedupes_by_path_and_scope() {
let watcher = Arc::new(FileWatcher::noop());
let root = path("/tmp/skills");
watcher.register_skills_root(root.clone());
let registration = WatchRegistration {
file_watcher: Arc::downgrade(&watcher),
roots: vec![root],
};
let (subscriber, _rx) = watcher.add_subscriber();
let _first = subscriber.register_path(path("/tmp/skills"), false);
let _second = subscriber.register_path(path("/tmp/skills"), false);
let _third = subscriber.register_path(path("/tmp/skills"), true);
let _fourth = subscriber.register_path(path("/tmp/other-skills"), true);
assert_eq!(
watcher.watch_counts_for_test(&path("/tmp/skills")),
Some((2, 1))
);
assert_eq!(
watcher.watch_counts_for_test(&path("/tmp/other-skills")),
Some((0, 1))
);
}
#[test]
fn watch_registration_drop_unregisters_paths() {
    let watcher = Arc::new(FileWatcher::noop());
    let (subscriber, _rx) = watcher.add_subscriber();
    let registration = subscriber.register_path(path("/tmp/skills"), true);
    drop(registration);
    // After the registration handle is dropped, both the ref-counted root
    // state and the per-path watch bookkeeping must be empty.
    // NOTE(review): this asserts against both `skills_root_ref_counts` and
    // `watch_counts_for_test` — confirm both stores still exist on FileWatcher.
    let state = watcher.state.read().expect("state lock");
    assert_eq!(state.skills_root_ref_counts.len(), 0);
    assert_eq!(watcher.watch_counts_for_test(&path("/tmp/skills")), None);
}
#[test]
fn subscriber_drop_unregisters_paths() {
    let watcher = Arc::new(FileWatcher::noop());
    let target = path("/tmp/skills");
    // The subscriber (and its receiver) only live inside the inner scope,
    // while the registration it produced outlives it.
    let kept_registration;
    {
        let (subscriber, _rx) = watcher.add_subscriber();
        kept_registration = subscriber.register_path(target.clone(), true);
    }
    // Once the subscriber is gone its paths are unregistered, even though a
    // registration handle is still alive.
    assert_eq!(watcher.watch_counts_for_test(&target), None);
    drop(kept_registration);
}
#[tokio::test]
async fn receiver_closes_when_subscriber_drops() {
    // Dropping the only subscriber must close its receiver's channel: recv()
    // should resolve promptly (not hang) and yield None.
    let watcher = Arc::new(FileWatcher::noop());
    let (subscriber, mut rx) = watcher.add_subscriber();
    drop(subscriber);
    match timeout(Duration::from_secs(1), rx.recv()).await {
        Ok(event) => assert_eq!(event, None),
        Err(_) => panic!("closed recv timeout"),
    }
}
#[test]
fn recursive_registration_downgrades_to_non_recursive_after_drop() {
    let temp_dir = tempfile::tempdir().expect("temp dir");
    let root = temp_dir.path().join("watched-dir");
    std::fs::create_dir(&root).expect("create root");
    let watcher = Arc::new(FileWatcher::new().expect("watcher"));
    let (subscriber, _rx) = watcher.add_subscriber();
    // Register the same directory both non-recursively and recursively; the
    // effective OS watch mode should be the strongest registration alive.
    let non_recursive = subscriber.register_path(root.clone(), false);
    let recursive = subscriber.register_path(root.clone(), true);
    {
        // While the recursive registration lives, the watch is recursive.
        let inner = watcher.inner.as_ref().expect("watcher inner");
        let inner = inner.lock().expect("inner lock");
        assert_eq!(
            inner.watched_paths.get(&root),
            Some(&RecursiveMode::Recursive)
        );
    }
    drop(recursive);
    {
        // Dropping the recursive handle downgrades the watch rather than
        // removing it, because the non-recursive registration remains.
        let inner = watcher.inner.as_ref().expect("watcher inner");
        let inner = inner.lock().expect("inner lock");
        assert_eq!(
            inner.watched_paths.get(&root),
            Some(&RecursiveMode::NonRecursive)
        );
    }
    drop(non_recursive);
}
#[test]
fn unregister_holds_state_lock_until_unwatch_finishes() {
let temp_dir = tempfile::tempdir().expect("temp dir");
let root = temp_dir.path().join("skills");
let root = temp_dir.path().join("watched-dir");
std::fs::create_dir(&root).expect("create root");
let watcher = Arc::new(FileWatcher::new(temp_dir.path().to_path_buf()).expect("watcher"));
watcher.register_skills_root(root.clone());
let watcher = Arc::new(FileWatcher::new().expect("watcher"));
let (unregister_subscriber, _unregister_rx) = watcher.add_subscriber();
let (register_subscriber, _register_rx) = watcher.add_subscriber();
let registration = unregister_subscriber.register_path(root.clone(), true);
let inner = watcher.inner.as_ref().expect("watcher inner");
let inner_guard = inner.lock().expect("inner lock");
let unregister_watcher = Arc::clone(&watcher);
let unregister_root = root.clone();
let unregister_thread = std::thread::spawn(move || {
unregister_watcher.unregister_roots(&[unregister_root]);
drop(registration);
});
let state_lock_observed = (0..100).any(|_| {
@@ -172,75 +227,128 @@ fn unregister_holds_state_lock_until_unwatch_finishes() {
});
assert_eq!(state_lock_observed, true);
let register_watcher = Arc::clone(&watcher);
let register_root = root.clone();
let register_thread = std::thread::spawn(move || {
register_watcher.register_skills_root(register_root);
let registration = register_subscriber.register_path(register_root, false);
(register_subscriber, registration)
});
drop(inner_guard);
unregister_thread.join().expect("unregister join");
register_thread.join().expect("register join");
let (register_subscriber, non_recursive) = register_thread.join().expect("register join");
let state = watcher.state.read().expect("state lock");
assert_eq!(state.skills_root_ref_counts.get(&root), Some(&1));
drop(state);
assert_eq!(watcher.watch_counts_for_test(&root), Some((1, 0)));
let inner = watcher.inner.as_ref().expect("watcher inner");
let inner = inner.lock().expect("inner lock");
assert_eq!(
inner.watched_paths.get(&root),
Some(&RecursiveMode::Recursive)
Some(&RecursiveMode::NonRecursive)
);
drop(inner);
drop(non_recursive);
drop(register_subscriber);
}
#[tokio::test]
async fn matching_subscribers_are_notified() {
    // Two independent subscribers watch disjoint roots; an event under one
    // root must reach only the matching subscriber.
    let watcher = Arc::new(FileWatcher::noop());
    let changed = path("/tmp/skills/rust/SKILL.md");
    let (skills_subscriber, skills_rx) = watcher.add_subscriber();
    let (plugins_subscriber, plugins_rx) = watcher.add_subscriber();
    let _skills = skills_subscriber.register_path(path("/tmp/skills"), true);
    let _plugins = plugins_subscriber.register_path(path("/tmp/plugins"), true);
    let mut skills_rx = ThrottledWatchReceiver::new(skills_rx, TEST_THROTTLE_INTERVAL);
    let mut plugins_rx = ThrottledWatchReceiver::new(plugins_rx, TEST_THROTTLE_INTERVAL);
    watcher.send_paths_for_test(vec![changed.clone()]).await;
    // The skills subscriber observes the change...
    let skills_event = timeout(Duration::from_secs(1), skills_rx.recv())
        .await
        .expect("skills change timeout")
        .expect("skills change");
    assert_eq!(skills_event, FileWatcherEvent { paths: vec![changed] });
    // ...while the plugins subscriber stays silent for a full throttle window.
    assert_eq!(
        timeout(TEST_THROTTLE_INTERVAL, plugins_rx.recv()).await.is_err(),
        true
    );
}
#[tokio::test]
async fn non_recursive_watch_ignores_grandchildren() {
    // A non-recursive watch on /tmp/skills must not fire for paths nested
    // more than one level below it.
    let watcher = Arc::new(FileWatcher::noop());
    let (subscriber, raw_rx) = watcher.add_subscriber();
    let _registration = subscriber.register_path(path("/tmp/skills"), false);
    let mut throttled = ThrottledWatchReceiver::new(raw_rx, TEST_THROTTLE_INTERVAL);
    watcher
        .send_paths_for_test(vec![path("/tmp/skills/nested/SKILL.md")])
        .await;
    // Waiting a full throttle window should yield nothing.
    assert_eq!(
        timeout(TEST_THROTTLE_INTERVAL, throttled.recv()).await.is_err(),
        true
    );
}
#[tokio::test]
async fn ancestor_events_notify_child_watches() {
    // An event on an ancestor directory must be delivered to a watch that
    // was registered on a file underneath it.
    let watcher = Arc::new(FileWatcher::noop());
    let (subscriber, raw_rx) = watcher.add_subscriber();
    let _registration = subscriber.register_path(path("/tmp/skills/rust/SKILL.md"), false);
    let mut throttled = ThrottledWatchReceiver::new(raw_rx, TEST_THROTTLE_INTERVAL);
    watcher.send_paths_for_test(vec![path("/tmp/skills")]).await;
    let event = timeout(Duration::from_secs(1), throttled.recv())
        .await
        .expect("ancestor event timeout")
        .expect("ancestor event");
    let expected = FileWatcherEvent {
        paths: vec![path("/tmp/skills")],
    };
    assert_eq!(event, expected);
}
#[tokio::test]
async fn spawn_event_loop_flushes_pending_changes_on_shutdown() {
let watcher = FileWatcher::noop();
let root = path("/tmp/skills");
{
let mut state = watcher.state.write().expect("state lock");
state.skills_root_ref_counts.insert(root.clone(), 1);
}
async fn spawn_event_loop_filters_non_mutating_events() {
let watcher = Arc::new(FileWatcher::noop());
let (subscriber, rx) = watcher.add_subscriber();
let _registration = subscriber.register_path(path("/tmp/skills"), true);
let mut rx = ThrottledWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);
let (raw_tx, raw_rx) = mpsc::unbounded_channel();
let (tx, mut rx) = broadcast::channel(8);
watcher.spawn_event_loop(raw_rx, Arc::clone(&watcher.state), tx);
watcher.spawn_event_loop_for_test(raw_rx);
raw_tx
.send(Ok(notify_event(
EventKind::Access(AccessKind::Open(AccessMode::Any)),
vec![path("/tmp/skills/SKILL.md")],
)))
.expect("send access event");
let blocked = timeout(TEST_THROTTLE_INTERVAL, rx.recv()).await;
assert_eq!(blocked.is_err(), true);
raw_tx
.send(Ok(notify_event(
EventKind::Create(CreateKind::File),
vec![root.join("a/SKILL.md")],
vec![path("/tmp/skills/SKILL.md")],
)))
.expect("send first event");
let first = timeout(Duration::from_secs(2), rx.recv())
.expect("send create event");
let event = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("first watcher event")
.expect("broadcast recv first");
.expect("create event timeout")
.expect("create event");
assert_eq!(
first,
FileWatcherEvent::SkillsChanged {
paths: vec![root.join("a/SKILL.md")]
}
);
raw_tx
.send(Ok(notify_event(
EventKind::Remove(RemoveKind::File),
vec![root.join("b/SKILL.md")],
)))
.expect("send second event");
drop(raw_tx);
let second = timeout(Duration::from_secs(2), rx.recv())
.await
.expect("second watcher event")
.expect("broadcast recv second");
assert_eq!(
second,
FileWatcherEvent::SkillsChanged {
paths: vec![root.join("b/SKILL.md")]
event,
FileWatcherEvent {
paths: vec![path("/tmp/skills/SKILL.md")],
}
);
}

View File

@@ -39,7 +39,7 @@ pub mod exec;
pub mod exec_env;
mod exec_policy;
pub mod external_agent_config;
mod file_watcher;
pub mod file_watcher;
mod flags;
pub mod git_info;
mod guardian;
@@ -72,6 +72,7 @@ pub mod sandboxing;
mod session_prefix;
mod session_startup_prewarm;
mod shell_detect;
mod skills_watcher;
mod stream_events_utils;
pub mod test_support;
mod text_encoding;
@@ -89,6 +90,9 @@ pub use model_provider_info::OPENAI_PROVIDER_ID;
pub use model_provider_info::WireApi;
pub use model_provider_info::built_in_model_providers;
pub use model_provider_info::create_oss_provider_with_base_url;
pub use skills_watcher::SkillsWatcherEvent;
#[deprecated(note = "use SkillsWatcherEvent")]
pub type FileWatcherEvent = SkillsWatcherEvent;
mod event_mapping;
mod response_debug_context;
pub mod review_format;
@@ -179,7 +183,6 @@ pub use exec_policy::ExecPolicyError;
pub use exec_policy::check_execpolicy_for_warnings;
pub use exec_policy::format_exec_policy_error_with_source;
pub use exec_policy::load_exec_policy;
pub use file_watcher::FileWatcherEvent;
pub use safety::get_platform_sandbox;
pub use tools::spec::parse_tool_input_schema;
pub use turn_metadata::build_turn_metadata_header;

View File

@@ -0,0 +1,116 @@
//! Skills-specific watcher built on top of the generic [`FileWatcher`].
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tracing::warn;
use crate::config::Config;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherSubscriber;
use crate::file_watcher::Receiver;
use crate::file_watcher::ThrottledWatchReceiver;
use crate::file_watcher::WatchPath;
use crate::file_watcher::WatchRegistration;
use crate::skills::SkillsManager;
// Minimum spacing between change notifications; production uses a long
// window so bursty filesystem activity is coalesced into one event.
#[cfg(not(test))]
const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(10);
// Tests shrink the window so throttling is observable without slow waits.
#[cfg(test)]
const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50);
/// Event broadcast to [`SkillsWatcher`] subscribers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SkillsWatcherEvent {
    /// Skill files changed on disk; `paths` lists the affected paths.
    SkillsChanged { paths: Vec<PathBuf> },
}
/// Skills-specific facade over the generic [`FileWatcher`]: registers skill
/// roots for watching and re-broadcasts file changes as
/// [`SkillsWatcherEvent`]s.
pub(crate) struct SkillsWatcher {
    // Underlying file-watcher subscription; keeps registered paths alive for
    // the lifetime of this watcher.
    subscriber: FileWatcherSubscriber,
    // Fan-out channel; `subscribe()` hands out receivers.
    tx: broadcast::Sender<SkillsWatcherEvent>,
}
impl SkillsWatcher {
    /// Builds a skills watcher on top of `file_watcher` and starts the
    /// background task that forwards its file changes to broadcast
    /// subscribers.
    pub(crate) fn new(file_watcher: &Arc<FileWatcher>) -> Self {
        let (subscriber, file_events) = file_watcher.add_subscriber();
        let (tx, _initial_rx) = broadcast::channel(128);
        Self::spawn_event_loop(file_events, tx.clone());
        Self { subscriber, tx }
    }

    /// Watcher that never reports changes (backed by a no-op file watcher).
    pub(crate) fn noop() -> Self {
        Self::new(&Arc::new(FileWatcher::noop()))
    }

    /// Returns a fresh receiver for skills-change notifications.
    pub(crate) fn subscribe(&self) -> broadcast::Receiver<SkillsWatcherEvent> {
        self.tx.subscribe()
    }

    /// Registers every skill root derived from `config` for recursive
    /// watching. Dropping the returned registration unregisters them.
    pub(crate) fn register_config(
        &self,
        config: &Config,
        skills_manager: &SkillsManager,
    ) -> WatchRegistration {
        let watch_paths = skills_manager
            .skill_roots_for_config(config)
            .into_iter()
            .map(|root| WatchPath {
                path: root.path,
                recursive: true,
            })
            .collect();
        self.subscriber.register_paths(watch_paths)
    }

    /// Forwards throttled file-watcher events to `tx` until the source
    /// channel closes. Requires an ambient Tokio runtime; without one the
    /// watcher stays silent and a warning is logged.
    fn spawn_event_loop(rx: Receiver, tx: broadcast::Sender<SkillsWatcherEvent>) {
        let Ok(handle) = Handle::try_current() else {
            warn!("skills watcher listener skipped: no Tokio runtime available");
            return;
        };
        let mut throttled = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL);
        handle.spawn(async move {
            while let Some(event) = throttled.recv().await {
                // A send error only means there are currently no subscribers.
                let _ = tx.send(SkillsWatcherEvent::SkillsChanged { paths: event.paths });
            }
        });
    }
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use tokio::time::Duration;
use tokio::time::timeout;
#[tokio::test]
async fn forwards_file_watcher_events() {
    // A path change reported by the underlying file watcher must surface as
    // a SkillsChanged broadcast event.
    let file_watcher = Arc::new(FileWatcher::noop());
    let skills_watcher = SkillsWatcher::new(&file_watcher);
    let mut events = skills_watcher.subscribe();
    let skill_root = PathBuf::from("/tmp/skill");
    let changed_file = skill_root.join("SKILL.md");
    let _registration = skills_watcher.subscriber.register_path(skill_root, true);
    file_watcher
        .send_paths_for_test(vec![changed_file.clone()])
        .await;
    let event = timeout(Duration::from_secs(2), events.recv())
        .await
        .expect("skills watcher event")
        .expect("broadcast recv");
    assert_eq!(
        event,
        SkillsWatcherEvent::SkillsChanged {
            paths: vec![changed_file],
        }
    );
}
}

View File

@@ -8,12 +8,12 @@ use crate::analytics_client::AnalyticsEventsClient;
use crate::client::ModelClient;
use crate::config::StartedNetworkProxy;
use crate::exec_policy::ExecPolicyManager;
use crate::file_watcher::FileWatcher;
use crate::mcp::McpManager;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::models_manager::manager::ModelsManager;
use crate::plugins::PluginsManager;
use crate::skills::SkillsManager;
use crate::skills_watcher::SkillsWatcher;
use crate::state_db::StateDbHandle;
use crate::tools::code_mode::CodeModeService;
use crate::tools::network_approval::NetworkApprovalService;
@@ -54,7 +54,7 @@ pub(crate) struct SessionServices {
pub(crate) skills_manager: Arc<SkillsManager>,
pub(crate) plugins_manager: Arc<PluginsManager>,
pub(crate) mcp_manager: Arc<McpManager>,
pub(crate) file_watcher: Arc<FileWatcher>,
pub(crate) skills_watcher: Arc<SkillsWatcher>,
pub(crate) agent_control: AgentControl,
pub(crate) network_proxy: Option<StartedNetworkProxy>,
pub(crate) network_approval: Arc<NetworkApprovalService>,

View File

@@ -12,7 +12,6 @@ use crate::config::Config;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherEvent;
use crate::mcp::McpManager;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::models_manager::manager::ModelsManager;
@@ -24,6 +23,8 @@ use crate::rollout::RolloutRecorder;
use crate::rollout::truncation;
use crate::shell_snapshot::ShellSnapshot;
use crate::skills::SkillsManager;
use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::openai_models::ModelPreset;
@@ -76,32 +77,33 @@ impl Drop for TempCodexHomeGuard {
}
}
fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc<SkillsManager>) -> Arc<FileWatcher> {
fn build_skills_watcher(skills_manager: Arc<SkillsManager>) -> Arc<SkillsWatcher> {
if should_use_test_thread_manager_behavior()
&& let Ok(handle) = Handle::try_current()
&& handle.runtime_flavor() == RuntimeFlavor::CurrentThread
{
// The real watcher spins background tasks that can starve the
// current-thread test runtime and cause event waits to time out.
warn!("using noop file watcher under current-thread test runtime");
return Arc::new(FileWatcher::noop());
warn!("using noop skills watcher under current-thread test runtime");
return Arc::new(SkillsWatcher::noop());
}
let file_watcher = match FileWatcher::new(codex_home) {
let file_watcher = match FileWatcher::new() {
Ok(file_watcher) => Arc::new(file_watcher),
Err(err) => {
warn!("failed to initialize file watcher: {err}");
Arc::new(FileWatcher::noop())
}
};
let skills_watcher = Arc::new(SkillsWatcher::new(&file_watcher));
let mut rx = file_watcher.subscribe();
let mut rx = skills_watcher.subscribe();
let skills_manager = Arc::clone(&skills_manager);
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
loop {
match rx.recv().await {
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
Ok(SkillsWatcherEvent::SkillsChanged { .. }) => {
skills_manager.clear_cache();
}
Err(broadcast::error::RecvError::Closed) => break,
@@ -110,10 +112,10 @@ fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc<SkillsManager>) -
}
});
} else {
warn!("file watcher listener skipped: no Tokio runtime available");
warn!("skills watcher listener skipped: no Tokio runtime available");
}
file_watcher
skills_watcher
}
/// Represents a newly created Codex thread (formerly called a conversation), including the first event
@@ -155,7 +157,7 @@ pub(crate) struct ThreadManagerState {
skills_manager: Arc<SkillsManager>,
plugins_manager: Arc<PluginsManager>,
mcp_manager: Arc<McpManager>,
file_watcher: Arc<FileWatcher>,
skills_watcher: Arc<SkillsWatcher>,
session_source: SessionSource,
// Captures submitted ops for testing purpose when test mode is enabled.
ops_log: Option<SharedCapturedOps>,
@@ -187,7 +189,7 @@ impl ThreadManager {
config.bundled_skills_enabled(),
restriction_product,
));
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
@@ -202,7 +204,7 @@ impl ThreadManager {
skills_manager,
plugins_manager,
mcp_manager,
file_watcher,
skills_watcher,
auth_manager,
session_source,
ops_log: should_use_test_thread_manager_behavior()
@@ -253,7 +255,7 @@ impl ThreadManager {
/*bundled_skills_enabled*/ true,
restriction_product,
));
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
@@ -266,7 +268,7 @@ impl ThreadManager {
skills_manager,
plugins_manager,
mcp_manager,
file_watcher,
skills_watcher,
auth_manager,
session_source: SessionSource::Exec,
ops_log: should_use_test_thread_manager_behavior()
@@ -296,10 +298,6 @@ impl ThreadManager {
self.state.mcp_manager.clone()
}
pub fn subscribe_file_watcher(&self) -> broadcast::Receiver<FileWatcherEvent> {
self.state.file_watcher.subscribe()
}
pub fn get_models_manager(&self) -> Arc<ModelsManager> {
self.state.models_manager.clone()
}
@@ -760,7 +758,7 @@ impl ThreadManagerState {
user_shell_override: Option<crate::shell::Shell>,
) -> CodexResult<NewThread> {
let watch_registration = self
.file_watcher
.skills_watcher
.register_config(&config, self.skills_manager.as_ref());
let CodexSpawnOk {
codex, thread_id, ..
@@ -771,7 +769,7 @@ impl ThreadManagerState {
skills_manager: Arc::clone(&self.skills_manager),
plugins_manager: Arc::clone(&self.plugins_manager),
mcp_manager: Arc::clone(&self.mcp_manager),
file_watcher: Arc::clone(&self.file_watcher),
skills_watcher: Arc::clone(&self.skills_watcher),
conversation_history: initial_history,
session_source,
agent_control,

View File

@@ -110,14 +110,12 @@ host-qualified nightly filename to the plain `nightly-2025-09-18` channel when
needed, and then invokes `cargo-dylint dylint --lib-path <that-library>` with
the repo's default `DYLINT_RUSTFLAGS` and `CARGO_INCREMENTAL=0` settings.
The checked-in `run-prebuilt-linter.sh` wrapper now invokes the package
entrypoint directly and only layers on Codex-specific defaults like
`--manifest-path`, `--workspace`, and `--no-deps` when the caller does not
choose something narrower. The packaged runner now owns the bundled
`cargo-dylint`, library discovery and filename normalization, default
`DYLINT_RUSTFLAGS`, `CARGO_INCREMENTAL=0`, and `RUSTUP_HOME` inference. The
shell wrapper still makes sure the `rustup` shims stay ahead of any direct
toolchain `cargo` binary on `PATH`, because `cargo-dylint` still expects that.
The checked-in `run-prebuilt-linter.sh` wrapper uses the fetched package
contents directly so the current checked-in alpha artifact works the same way.
It also makes sure the `rustup` shims stay ahead of any direct toolchain
`cargo` binary on `PATH`, and sets `RUSTUP_HOME` from `rustup show home` when
the environment does not already provide it. That extra `RUSTUP_HOME` export is
required for the current Windows Dylint driver path.
If you are changing the lint crate itself, use the source-build wrapper:
@@ -142,11 +140,11 @@ default:
./tools/argument-comment-lint/run-prebuilt-linter.sh -p codex-core
```
The packaged runner does that by setting `DYLINT_RUSTFLAGS`, and it leaves an
explicit existing setting alone. It also defaults `CARGO_INCREMENTAL=0` unless
you have already set it, because the current nightly Dylint flow can otherwise
hit a rustc incremental compilation ICE locally. To override that behavior for
an ad hoc run:
The wrapper does that by setting `DYLINT_RUSTFLAGS`, and it leaves an explicit
existing setting alone. It also defaults `CARGO_INCREMENTAL=0` unless you have
already set it, because the current nightly Dylint flow can otherwise hit a
rustc incremental compilation ICE locally. To override that behavior for an ad
hoc run:
```bash
DYLINT_RUSTFLAGS="-A uncommented-anonymous-literal-argument" \

View File

@@ -4,73 +4,73 @@
"name": "argument-comment-lint",
"platforms": {
"macos-aarch64": {
"size": 3414909,
"size": 3402747,
"hash": "blake3",
"digest": "2a455fa67fa862a36ac4b8b8d8cb57bb3397eafe0579a1fd1a98a96fe7238fc5",
"digest": "a11669d2f184a2c6f226cedce1bf10d1ec478d53413c42fe80d17dd873fdb2d7",
"format": "tar.gz",
"path": "argument-comment-lint/bin/argument-comment-lint",
"providers": [
{
"url": "https://github.com/openai/codex/releases/download/rust-v0.117.0-alpha.6/argument-comment-lint-aarch64-apple-darwin.tar.gz"
"url": "https://github.com/openai/codex/releases/download/rust-v0.117.0-alpha.2/argument-comment-lint-aarch64-apple-darwin.tar.gz"
},
{
"type": "github-release",
"repo": "https://github.com/openai/codex",
"tag": "rust-v0.117.0-alpha.6",
"tag": "rust-v0.117.0-alpha.2",
"name": "argument-comment-lint-aarch64-apple-darwin.tar.gz"
}
]
},
"linux-x86_64": {
"size": 3882120,
"size": 3869711,
"hash": "blake3",
"digest": "8c50468cd97d70e029638a478bf977ff2fa150ba188e2597b615233376064c8a",
"digest": "1015f4ba07d57edc5ec79c8f6709ddc1516f64c903e909820437a4b89d8d853a",
"format": "tar.gz",
"path": "argument-comment-lint/bin/argument-comment-lint",
"providers": [
{
"url": "https://github.com/openai/codex/releases/download/rust-v0.117.0-alpha.6/argument-comment-lint-x86_64-unknown-linux-gnu.tar.gz"
"url": "https://github.com/openai/codex/releases/download/rust-v0.117.0-alpha.2/argument-comment-lint-x86_64-unknown-linux-gnu.tar.gz"
},
{
"type": "github-release",
"repo": "https://github.com/openai/codex",
"tag": "rust-v0.117.0-alpha.6",
"tag": "rust-v0.117.0-alpha.2",
"name": "argument-comment-lint-x86_64-unknown-linux-gnu.tar.gz"
}
]
},
"linux-aarch64": {
"size": 3772016,
"size": 3759446,
"hash": "blake3",
"digest": "390169281f19ffa0583bc52412040daea6791300f1d6278ad2c753e58b902554",
"digest": "91f2a31e6390ca728ad09ae1aa6b6f379c67d996efcc22956001df89f068af5b",
"format": "tar.gz",
"path": "argument-comment-lint/bin/argument-comment-lint",
"providers": [
{
"url": "https://github.com/openai/codex/releases/download/rust-v0.117.0-alpha.6/argument-comment-lint-aarch64-unknown-linux-gnu.tar.gz"
"url": "https://github.com/openai/codex/releases/download/rust-v0.117.0-alpha.2/argument-comment-lint-aarch64-unknown-linux-gnu.tar.gz"
},
{
"type": "github-release",
"repo": "https://github.com/openai/codex",
"tag": "rust-v0.117.0-alpha.6",
"tag": "rust-v0.117.0-alpha.2",
"name": "argument-comment-lint-aarch64-unknown-linux-gnu.tar.gz"
}
]
},
"windows-x86_64": {
"size": 3255810,
"size": 3244599,
"hash": "blake3",
"digest": "dd2c9478d16ea8ed550438100e84b3c8caf496834004a90a291249f01b09c343",
"digest": "dc711c6d85b1cabbe52447dda3872deb20c2e64b155da8be0ecb207c7c391683",
"format": "zip",
"path": "argument-comment-lint/bin/argument-comment-lint.exe",
"providers": [
{
"url": "https://github.com/openai/codex/releases/download/rust-v0.117.0-alpha.6/argument-comment-lint-x86_64-pc-windows-msvc.zip"
"url": "https://github.com/openai/codex/releases/download/rust-v0.117.0-alpha.2/argument-comment-lint-x86_64-pc-windows-msvc.zip"
},
{
"type": "github-release",
"repo": "https://github.com/openai/codex",
"tag": "rust-v0.117.0-alpha.6",
"tag": "rust-v0.117.0-alpha.2",
"name": "argument-comment-lint-x86_64-pc-windows-msvc.zip"
}
]

View File

@@ -8,6 +8,7 @@ dotslash_manifest="$repo_root/tools/argument-comment-lint/argument-comment-lint"
has_manifest_path=false
has_package_selection=false
has_library_selection=false
has_no_deps=false
expect_value=""
@@ -20,6 +21,9 @@ for arg in "$@"; do
package_selection)
has_package_selection=true
;;
library_selection)
has_library_selection=true
;;
esac
expect_value=""
continue
@@ -41,6 +45,12 @@ for arg in "$@"; do
--package=*)
has_package_selection=true
;;
--lib|--lib-path)
expect_value="library_selection"
;;
--lib=*|--lib-path=*)
has_library_selection=true
;;
--workspace)
has_package_selection=true
;;
@@ -72,8 +82,6 @@ EOF
fi
if command -v rustup >/dev/null 2>&1; then
# cargo-dylint still expects the rustup shims to win over direct toolchain
# cargo binaries on PATH.
rustup_bin_dir="$(dirname "$(command -v rustup)")"
path_entries=()
while IFS= read -r entry; do
@@ -84,6 +92,73 @@ if command -v rustup >/dev/null 2>&1; then
PATH+=":$(IFS=:; echo "${path_entries[*]}")"
fi
export PATH
if [[ -z "${RUSTUP_HOME:-}" ]]; then
rustup_home="$(rustup show home 2>/dev/null || true)"
if [[ -n "$rustup_home" ]]; then
export RUSTUP_HOME="$rustup_home"
fi
fi
fi
exec "$dotslash_manifest" "${lint_args[@]}"
package_entrypoint="$(dotslash -- fetch "$dotslash_manifest")"
bin_dir="$(cd "$(dirname "$package_entrypoint")" && pwd)"
package_root="$(cd "$bin_dir/.." && pwd)"
library_dir="$package_root/lib"
cargo_dylint="$bin_dir/cargo-dylint"
if [[ ! -x "$cargo_dylint" ]]; then
cargo_dylint="$bin_dir/cargo-dylint.exe"
fi
if [[ ! -x "$cargo_dylint" ]]; then
echo "bundled cargo-dylint executable not found under $bin_dir" >&2
exit 1
fi
shopt -s nullglob
libraries=("$library_dir"/*@*)
shopt -u nullglob
if [[ ${#libraries[@]} -eq 0 ]]; then
echo "no packaged Dylint library found in $library_dir" >&2
exit 1
fi
if [[ ${#libraries[@]} -ne 1 ]]; then
echo "expected exactly one packaged Dylint library in $library_dir" >&2
exit 1
fi
library_path="${libraries[0]}"
library_filename="$(basename "$library_path")"
normalized_library_path="$library_path"
library_ext=".${library_filename##*.}"
library_stem="${library_filename%.*}"
if [[ "$library_stem" =~ ^(.+@nightly-[0-9]{4}-[0-9]{2}-[0-9]{2})-.+$ ]]; then
normalized_library_filename="${BASH_REMATCH[1]}$library_ext"
temp_dir="$(mktemp -d "${TMPDIR:-/tmp}/argument-comment-lint.XXXXXX")"
normalized_library_path="$temp_dir/$normalized_library_filename"
cp "$library_path" "$normalized_library_path"
fi
if [[ -n "${DYLINT_RUSTFLAGS:-}" ]]; then
if [[ "$DYLINT_RUSTFLAGS" != *"-D uncommented-anonymous-literal-argument"* ]]; then
DYLINT_RUSTFLAGS+=" -D uncommented-anonymous-literal-argument"
fi
if [[ "$DYLINT_RUSTFLAGS" != *"-A unknown_lints"* ]]; then
DYLINT_RUSTFLAGS+=" -A unknown_lints"
fi
else
DYLINT_RUSTFLAGS="-D uncommented-anonymous-literal-argument -A unknown_lints"
fi
export DYLINT_RUSTFLAGS
if [[ -z "${CARGO_INCREMENTAL:-}" ]]; then
export CARGO_INCREMENTAL=0
fi
command=("$cargo_dylint" dylint --lib-path "$normalized_library_path")
if [[ "$has_library_selection" == false ]]; then
command+=(--all)
fi
command+=("${lint_args[@]}")
exec "${command[@]}"