core: Make FileWatcher reusable (#15093)

### Summary
Make `FileWatcher` a reusable core component which can be built upon.
Extract skills-related logic into a separate `SkillWatcher`.
Introduce a composable `ThrottledWatchReceiver` to throttle filesystem
events, coalescing affected paths among them.

### Testing
Updated existing unit tests.
This commit is contained in:
Ruslan Nigmatullin
2026-03-24 11:04:47 -07:00
committed by GitHub
parent bb7e9a8171
commit daf5e584c2
10 changed files with 877 additions and 429 deletions

View File

@@ -12,7 +12,6 @@ use crate::config::Config;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherEvent;
use crate::mcp::McpManager;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::models_manager::manager::ModelsManager;
@@ -24,6 +23,8 @@ use crate::rollout::RolloutRecorder;
use crate::rollout::truncation;
use crate::shell_snapshot::ShellSnapshot;
use crate::skills::SkillsManager;
use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::tasks::interrupted_turn_history_marker;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::TurnStatus;
@@ -83,32 +84,33 @@ impl Drop for TempCodexHomeGuard {
}
}
fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc<SkillsManager>) -> Arc<FileWatcher> {
fn build_skills_watcher(skills_manager: Arc<SkillsManager>) -> Arc<SkillsWatcher> {
if should_use_test_thread_manager_behavior()
&& let Ok(handle) = Handle::try_current()
&& handle.runtime_flavor() == RuntimeFlavor::CurrentThread
{
// The real watcher spins background tasks that can starve the
// current-thread test runtime and cause event waits to time out.
warn!("using noop file watcher under current-thread test runtime");
return Arc::new(FileWatcher::noop());
warn!("using noop skills watcher under current-thread test runtime");
return Arc::new(SkillsWatcher::noop());
}
let file_watcher = match FileWatcher::new(codex_home) {
let file_watcher = match FileWatcher::new() {
Ok(file_watcher) => Arc::new(file_watcher),
Err(err) => {
warn!("failed to initialize file watcher: {err}");
Arc::new(FileWatcher::noop())
}
};
let skills_watcher = Arc::new(SkillsWatcher::new(&file_watcher));
let mut rx = file_watcher.subscribe();
let mut rx = skills_watcher.subscribe();
let skills_manager = Arc::clone(&skills_manager);
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
loop {
match rx.recv().await {
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
Ok(SkillsWatcherEvent::SkillsChanged { .. }) => {
skills_manager.clear_cache();
}
Err(broadcast::error::RecvError::Closed) => break,
@@ -117,10 +119,10 @@ fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc<SkillsManager>) -
}
});
} else {
warn!("file watcher listener skipped: no Tokio runtime available");
warn!("skills watcher listener skipped: no Tokio runtime available");
}
file_watcher
skills_watcher
}
/// Represents a newly created Codex thread (formerly called a conversation), including the first event
@@ -201,7 +203,7 @@ pub(crate) struct ThreadManagerState {
skills_manager: Arc<SkillsManager>,
plugins_manager: Arc<PluginsManager>,
mcp_manager: Arc<McpManager>,
file_watcher: Arc<FileWatcher>,
skills_watcher: Arc<SkillsWatcher>,
session_source: SessionSource,
// Captures submitted ops for testing purpose when test mode is enabled.
ops_log: Option<SharedCapturedOps>,
@@ -233,7 +235,7 @@ impl ThreadManager {
config.bundled_skills_enabled(),
restriction_product,
));
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
@@ -248,7 +250,7 @@ impl ThreadManager {
skills_manager,
plugins_manager,
mcp_manager,
file_watcher,
skills_watcher,
auth_manager,
session_source,
ops_log: should_use_test_thread_manager_behavior()
@@ -299,7 +301,7 @@ impl ThreadManager {
/*bundled_skills_enabled*/ true,
restriction_product,
));
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
@@ -312,7 +314,7 @@ impl ThreadManager {
skills_manager,
plugins_manager,
mcp_manager,
file_watcher,
skills_watcher,
auth_manager,
session_source: SessionSource::Exec,
ops_log: should_use_test_thread_manager_behavior()
@@ -342,10 +344,6 @@ impl ThreadManager {
self.state.mcp_manager.clone()
}
pub fn subscribe_file_watcher(&self) -> broadcast::Receiver<FileWatcherEvent> {
self.state.file_watcher.subscribe()
}
pub fn get_models_manager(&self) -> Arc<ModelsManager> {
self.state.models_manager.clone()
}
@@ -838,7 +836,7 @@ impl ThreadManagerState {
user_shell_override: Option<crate::shell::Shell>,
) -> CodexResult<NewThread> {
let watch_registration = self
.file_watcher
.skills_watcher
.register_config(&config, self.skills_manager.as_ref());
let CodexSpawnOk {
codex, thread_id, ..
@@ -849,7 +847,7 @@ impl ThreadManagerState {
skills_manager: Arc::clone(&self.skills_manager),
plugins_manager: Arc::clone(&self.plugins_manager),
mcp_manager: Arc::clone(&self.mcp_manager),
file_watcher: Arc::clone(&self.file_watcher),
skills_watcher: Arc::clone(&self.skills_watcher),
conversation_history: initial_history,
session_source,
agent_control,