mirror of
https://github.com/openai/codex.git
synced 2026-05-19 21:01:20 +03:00
Compare commits
6 Commits
jif/code-m
...
xli-codex/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b1ec596a53 | ||
|
|
bdf075769d | ||
|
|
d923e47cb9 | ||
|
|
34f09a0ca1 | ||
|
|
ed7a129ecc | ||
|
|
0029bf63be |
@@ -77,6 +77,7 @@ macro_rules! experimental_type_entry {
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ClientRequestSerializationScope {
|
||||
Global(&'static str),
|
||||
GlobalSharedRead(&'static str),
|
||||
Thread { thread_id: String },
|
||||
ThreadPath { path: PathBuf },
|
||||
CommandExecProcess { process_id: String },
|
||||
@@ -93,6 +94,9 @@ macro_rules! serialization_scope_expr {
|
||||
($actual_params:ident, global($key:literal)) => {
|
||||
Some(ClientRequestSerializationScope::Global($key))
|
||||
};
|
||||
($actual_params:ident, global_shared_read($key:literal)) => {
|
||||
Some(ClientRequestSerializationScope::GlobalSharedRead($key))
|
||||
};
|
||||
($actual_params:ident, thread_id($params:ident . $field:ident)) => {
|
||||
Some(ClientRequestSerializationScope::Thread {
|
||||
thread_id: $actual_params.$field.clone(),
|
||||
@@ -585,7 +589,7 @@ client_request_definitions! {
|
||||
},
|
||||
SkillsList => "skills/list" {
|
||||
params: v2::SkillsListParams,
|
||||
serialization: global("config"),
|
||||
serialization: global_shared_read("config"),
|
||||
response: v2::SkillsListResponse,
|
||||
},
|
||||
HooksList => "hooks/list" {
|
||||
@@ -610,7 +614,7 @@ client_request_definitions! {
|
||||
},
|
||||
PluginList => "plugin/list" {
|
||||
params: v2::PluginListParams,
|
||||
serialization: global("config"),
|
||||
serialization: global_shared_read("config"),
|
||||
response: v2::PluginListResponse,
|
||||
},
|
||||
PluginRead => "plugin/read" {
|
||||
@@ -947,7 +951,7 @@ client_request_definitions! {
|
||||
|
||||
ConfigRead => "config/read" {
|
||||
params: v2::ConfigReadParams,
|
||||
serialization: global("config"),
|
||||
serialization: global_shared_read("config"),
|
||||
response: v2::ConfigReadResponse,
|
||||
},
|
||||
ExternalAgentConfigDetect => "externalAgentConfig/detect" {
|
||||
@@ -1655,6 +1659,28 @@ mod tests {
|
||||
Some(ClientRequestSerializationScope::Global("config"))
|
||||
);
|
||||
|
||||
let skills_list = ClientRequest::SkillsList {
|
||||
request_id: request_id(),
|
||||
params: v2::SkillsListParams {
|
||||
cwds: Vec::new(),
|
||||
force_reload: false,
|
||||
per_cwd_extra_user_roots: None,
|
||||
},
|
||||
};
|
||||
assert_eq!(
|
||||
skills_list.serialization_scope(),
|
||||
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
|
||||
);
|
||||
|
||||
let plugin_list = ClientRequest::PluginList {
|
||||
request_id: request_id(),
|
||||
params: v2::PluginListParams { cwds: None },
|
||||
};
|
||||
assert_eq!(
|
||||
plugin_list.serialization_scope(),
|
||||
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
|
||||
);
|
||||
|
||||
let plugin_uninstall = ClientRequest::PluginUninstall {
|
||||
request_id: request_id(),
|
||||
params: v2::PluginUninstallParams {
|
||||
@@ -1705,7 +1731,7 @@ mod tests {
|
||||
};
|
||||
assert_eq!(
|
||||
config_read.serialization_scope(),
|
||||
Some(ClientRequestSerializationScope::Global("config"))
|
||||
Some(ClientRequestSerializationScope::GlobalSharedRead("config"))
|
||||
);
|
||||
|
||||
let account_read = ClientRequest::GetAccount {
|
||||
|
||||
@@ -798,9 +798,9 @@ impl MessageProcessor {
|
||||
);
|
||||
|
||||
if let Some(scope) = serialization_scope {
|
||||
let key = RequestSerializationQueueKey::from_scope(connection_id, scope);
|
||||
let (key, access) = RequestSerializationQueueKey::from_scope(connection_id, scope);
|
||||
self.request_serialization_queues
|
||||
.enqueue(key, request)
|
||||
.enqueue(key, access, request)
|
||||
.await;
|
||||
} else {
|
||||
tokio::spawn(async move {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use futures::StreamExt;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct CatalogRequestProcessor {
|
||||
@@ -9,6 +10,8 @@ pub(crate) struct CatalogRequestProcessor {
|
||||
pub(super) workspace_settings_cache: Arc<workspace_settings::WorkspaceSettingsCache>,
|
||||
}
|
||||
|
||||
const SKILLS_LIST_CWD_CONCURRENCY: usize = 8;
|
||||
|
||||
fn skills_to_info(
|
||||
skills: &[codex_core::skills::SkillMetadata],
|
||||
disabled_paths: &HashSet<AbsolutePathBuf>,
|
||||
@@ -379,6 +382,7 @@ impl CatalogRequestProcessor {
|
||||
&self,
|
||||
params: SkillsListParams,
|
||||
) -> Result<SkillsListResponse, JSONRPCErrorError> {
|
||||
let total_started_at = Instant::now();
|
||||
let SkillsListParams {
|
||||
cwds,
|
||||
force_reload,
|
||||
@@ -390,7 +394,9 @@ impl CatalogRequestProcessor {
|
||||
cwds
|
||||
};
|
||||
let cwd_set: HashSet<PathBuf> = cwds.iter().cloned().collect();
|
||||
let cwd_count = cwds.len();
|
||||
|
||||
let extra_roots_started_at = Instant::now();
|
||||
let mut extra_roots_by_cwd: HashMap<PathBuf, Vec<AbsolutePathBuf>> = HashMap::new();
|
||||
for entry in per_cwd_extra_user_roots.unwrap_or_default() {
|
||||
if !cwd_set.contains(&entry.cwd) {
|
||||
@@ -417,12 +423,20 @@ impl CatalogRequestProcessor {
|
||||
.or_default()
|
||||
.extend(valid_extra_roots);
|
||||
}
|
||||
let extra_roots_ms = extra_roots_started_at.elapsed().as_millis();
|
||||
let extra_root_count = extra_roots_by_cwd.values().map(Vec::len).sum::<usize>();
|
||||
|
||||
let load_config_started_at = Instant::now();
|
||||
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
|
||||
let load_config_ms = load_config_started_at.elapsed().as_millis();
|
||||
let auth_started_at = Instant::now();
|
||||
let auth = self.auth_manager.auth().await;
|
||||
let auth_ms = auth_started_at.elapsed().as_millis();
|
||||
let workspace_setting_started_at = Instant::now();
|
||||
let workspace_codex_plugins_enabled = self
|
||||
.workspace_codex_plugins_enabled(&config, auth.as_ref())
|
||||
.await;
|
||||
let workspace_setting_ms = workspace_setting_started_at.elapsed().as_millis();
|
||||
let skills_manager = self.thread_manager.skills_manager();
|
||||
let plugins_manager = self.thread_manager.plugins_manager();
|
||||
let fs = self
|
||||
@@ -430,56 +444,124 @@ impl CatalogRequestProcessor {
|
||||
.environment_manager()
|
||||
.default_environment()
|
||||
.map(|environment| environment.get_filesystem());
|
||||
let mut data = Vec::new();
|
||||
for cwd in cwds {
|
||||
let (cwd_abs, config_layer_stack) = match self.resolve_cwd_config(&cwd).await {
|
||||
Ok(resolved) => resolved,
|
||||
Err(message) => {
|
||||
let error_path = cwd.clone();
|
||||
data.push(codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills: Vec::new(),
|
||||
errors: vec![codex_app_server_protocol::SkillErrorInfo {
|
||||
path: error_path,
|
||||
message,
|
||||
}],
|
||||
});
|
||||
continue;
|
||||
let mut data = futures::stream::iter(cwds.into_iter().enumerate())
|
||||
.map(|(index, cwd)| {
|
||||
let config = &config;
|
||||
let extra_roots_by_cwd = &extra_roots_by_cwd;
|
||||
let fs = fs.clone();
|
||||
let plugins_manager = &plugins_manager;
|
||||
let skills_manager = &skills_manager;
|
||||
async move {
|
||||
let cwd_started_at = Instant::now();
|
||||
let resolve_cwd_config_started_at = Instant::now();
|
||||
let (cwd_abs, config_layer_stack) =
|
||||
match self.resolve_cwd_config(&cwd).await {
|
||||
Ok(resolved) => resolved,
|
||||
Err(message) => {
|
||||
warn!(
|
||||
cwd = %cwd.display(),
|
||||
total_ms = cwd_started_at.elapsed().as_millis(),
|
||||
resolve_cwd_config_ms = resolve_cwd_config_started_at.elapsed().as_millis(),
|
||||
"skills/list cwd timing failed to resolve cwd config"
|
||||
);
|
||||
let error_path = cwd.clone();
|
||||
return (
|
||||
index,
|
||||
codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills: Vec::new(),
|
||||
errors: vec![
|
||||
codex_app_server_protocol::SkillErrorInfo {
|
||||
path: error_path,
|
||||
message,
|
||||
},
|
||||
],
|
||||
},
|
||||
);
|
||||
}
|
||||
};
|
||||
let resolve_cwd_config_ms =
|
||||
resolve_cwd_config_started_at.elapsed().as_millis();
|
||||
let extra_roots = extra_roots_by_cwd
|
||||
.get(&cwd)
|
||||
.map_or(&[][..], std::vec::Vec::as_slice);
|
||||
let effective_skill_roots_started_at = Instant::now();
|
||||
let effective_skill_roots = if workspace_codex_plugins_enabled {
|
||||
let plugins_input = config.plugins_config_input();
|
||||
plugins_manager
|
||||
.effective_skill_roots_for_layer_stack(
|
||||
&config_layer_stack,
|
||||
&plugins_input,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let effective_skill_roots_ms =
|
||||
effective_skill_roots_started_at.elapsed().as_millis();
|
||||
let effective_skill_root_count = effective_skill_roots.len();
|
||||
let skills_input = codex_core::skills::SkillsLoadInput::new(
|
||||
cwd_abs.clone(),
|
||||
effective_skill_roots,
|
||||
config_layer_stack,
|
||||
config.bundled_skills_enabled(),
|
||||
);
|
||||
let load_skills_started_at = Instant::now();
|
||||
let outcome = skills_manager
|
||||
.skills_for_cwd_with_extra_user_roots(
|
||||
&skills_input,
|
||||
force_reload,
|
||||
extra_roots,
|
||||
fs,
|
||||
)
|
||||
.await;
|
||||
let load_skills_ms = load_skills_started_at.elapsed().as_millis();
|
||||
let errors = errors_to_info(&outcome.errors);
|
||||
let skills = skills_to_info(&outcome.skills, &outcome.disabled_paths);
|
||||
warn!(
|
||||
cwd = %cwd.display(),
|
||||
total_ms = cwd_started_at.elapsed().as_millis(),
|
||||
resolve_cwd_config_ms,
|
||||
effective_skill_roots_ms,
|
||||
load_skills_ms,
|
||||
extra_root_count = extra_roots.len(),
|
||||
effective_skill_root_count,
|
||||
skill_count = skills.len(),
|
||||
error_count = errors.len(),
|
||||
"skills/list cwd timing"
|
||||
);
|
||||
(
|
||||
index,
|
||||
codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills,
|
||||
errors,
|
||||
},
|
||||
)
|
||||
}
|
||||
};
|
||||
let extra_roots = extra_roots_by_cwd
|
||||
.get(&cwd)
|
||||
.map_or(&[][..], std::vec::Vec::as_slice);
|
||||
let effective_skill_roots = if workspace_codex_plugins_enabled {
|
||||
let plugins_input = config.plugins_config_input();
|
||||
plugins_manager
|
||||
.effective_skill_roots_for_layer_stack(&config_layer_stack, &plugins_input)
|
||||
.await
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let skills_input = codex_core::skills::SkillsLoadInput::new(
|
||||
cwd_abs.clone(),
|
||||
effective_skill_roots,
|
||||
config_layer_stack,
|
||||
config.bundled_skills_enabled(),
|
||||
);
|
||||
let outcome = skills_manager
|
||||
.skills_for_cwd_with_extra_user_roots(
|
||||
&skills_input,
|
||||
force_reload,
|
||||
extra_roots,
|
||||
fs.clone(),
|
||||
)
|
||||
.await;
|
||||
let errors = errors_to_info(&outcome.errors);
|
||||
let skills = skills_to_info(&outcome.skills, &outcome.disabled_paths);
|
||||
data.push(codex_app_server_protocol::SkillsListEntry {
|
||||
cwd,
|
||||
skills,
|
||||
errors,
|
||||
});
|
||||
}
|
||||
})
|
||||
.buffer_unordered(SKILLS_LIST_CWD_CONCURRENCY)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
data.sort_unstable_by_key(|(index, _)| *index);
|
||||
let data = data.into_iter().map(|(_, entry)| entry).collect::<Vec<_>>();
|
||||
let skill_count = data.iter().map(|entry| entry.skills.len()).sum::<usize>();
|
||||
let error_count = data.iter().map(|entry| entry.errors.len()).sum::<usize>();
|
||||
warn!(
|
||||
cwd_count,
|
||||
total_ms = total_started_at.elapsed().as_millis(),
|
||||
force_reload,
|
||||
extra_roots_ms,
|
||||
extra_root_count,
|
||||
load_config_ms,
|
||||
auth_ms,
|
||||
workspace_setting_ms,
|
||||
workspace_codex_plugins_enabled,
|
||||
has_remote_fs = fs.is_some(),
|
||||
skill_count,
|
||||
error_count,
|
||||
"skills/list timing"
|
||||
);
|
||||
Ok(SkillsListResponse { data })
|
||||
}
|
||||
|
||||
|
||||
@@ -319,11 +319,15 @@ impl PluginRequestProcessor {
|
||||
&self,
|
||||
params: PluginListParams,
|
||||
) -> Result<PluginListResponse, JSONRPCErrorError> {
|
||||
let total_started_at = Instant::now();
|
||||
let plugins_manager = self.thread_manager.plugins_manager();
|
||||
let PluginListParams { cwds } = params;
|
||||
let roots = cwds.unwrap_or_default();
|
||||
let roots_count = roots.len();
|
||||
|
||||
let load_config_started_at = Instant::now();
|
||||
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
|
||||
let load_config_ms = load_config_started_at.elapsed().as_millis();
|
||||
let empty_response = || PluginListResponse {
|
||||
marketplaces: Vec::new(),
|
||||
marketplace_load_errors: Vec::new(),
|
||||
@@ -332,23 +336,30 @@ impl PluginRequestProcessor {
|
||||
if !config.features.enabled(Feature::Plugins) {
|
||||
return Ok(empty_response());
|
||||
}
|
||||
let auth_started_at = Instant::now();
|
||||
let auth = self.auth_manager.auth().await;
|
||||
let auth_ms = auth_started_at.elapsed().as_millis();
|
||||
let workspace_setting_started_at = Instant::now();
|
||||
if !self
|
||||
.workspace_codex_plugins_enabled(&config, auth.as_ref())
|
||||
.await
|
||||
{
|
||||
return Ok(empty_response());
|
||||
}
|
||||
let workspace_setting_ms = workspace_setting_started_at.elapsed().as_millis();
|
||||
let plugins_input = config.plugins_config_input();
|
||||
let background_tasks_started_at = Instant::now();
|
||||
plugins_manager.maybe_start_plugin_list_background_tasks_for_config(
|
||||
&plugins_input,
|
||||
auth.clone(),
|
||||
&roots,
|
||||
Some(self.effective_plugins_changed_callback()),
|
||||
);
|
||||
let background_tasks_ms = background_tasks_started_at.elapsed().as_millis();
|
||||
|
||||
let config_for_marketplace_listing = plugins_input.clone();
|
||||
let plugins_manager_for_marketplace_listing = plugins_manager.clone();
|
||||
let local_marketplace_listing_started_at = Instant::now();
|
||||
let (mut data, marketplace_load_errors) = match tokio::task::spawn_blocking(move || {
|
||||
let outcome = plugins_manager_for_marketplace_listing
|
||||
.list_marketplaces_for_config(&config_for_marketplace_listing, &roots)?;
|
||||
@@ -406,7 +417,10 @@ impl PluginRequestProcessor {
|
||||
)));
|
||||
}
|
||||
};
|
||||
let local_marketplace_listing_ms =
|
||||
local_marketplace_listing_started_at.elapsed().as_millis();
|
||||
|
||||
let remote_marketplace_fetch_started_at = Instant::now();
|
||||
if config.features.enabled(Feature::RemotePlugin) {
|
||||
let remote_plugin_service_config = RemotePluginServiceConfig {
|
||||
chatgpt_base_url: config.chatgpt_base_url.clone(),
|
||||
@@ -444,7 +458,9 @@ impl PluginRequestProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
let remote_marketplace_fetch_ms = remote_marketplace_fetch_started_at.elapsed().as_millis();
|
||||
|
||||
let featured_plugin_ids_started_at = Instant::now();
|
||||
let featured_plugin_ids = if data
|
||||
.iter()
|
||||
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME)
|
||||
@@ -465,6 +481,29 @@ impl PluginRequestProcessor {
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let featured_plugin_ids_ms = featured_plugin_ids_started_at.elapsed().as_millis();
|
||||
let plugin_count = data
|
||||
.iter()
|
||||
.map(|marketplace| marketplace.plugins.len())
|
||||
.sum::<usize>();
|
||||
|
||||
warn!(
|
||||
roots_count,
|
||||
total_ms = total_started_at.elapsed().as_millis(),
|
||||
load_config_ms,
|
||||
auth_ms,
|
||||
workspace_setting_ms,
|
||||
background_tasks_ms,
|
||||
local_marketplace_listing_ms,
|
||||
remote_marketplace_fetch_ms,
|
||||
featured_plugin_ids_ms,
|
||||
marketplace_count = data.len(),
|
||||
plugin_count,
|
||||
marketplace_load_error_count = marketplace_load_errors.len(),
|
||||
featured_plugin_id_count = featured_plugin_ids.len(),
|
||||
remote_plugin_enabled = config.features.enabled(Feature::RemotePlugin),
|
||||
"plugin/list timing"
|
||||
);
|
||||
|
||||
Ok(PluginListResponse {
|
||||
marketplaces: data,
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_app_server_protocol::ClientRequestSerializationScope;
|
||||
use futures::future::join_all;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::Instrument;
|
||||
|
||||
@@ -43,35 +44,61 @@ pub(crate) enum RequestSerializationQueueKey {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub(crate) enum RequestSerializationAccess {
|
||||
Exclusive,
|
||||
SharedRead,
|
||||
}
|
||||
|
||||
impl RequestSerializationQueueKey {
|
||||
pub(crate) fn from_scope(
|
||||
connection_id: ConnectionId,
|
||||
scope: ClientRequestSerializationScope,
|
||||
) -> Self {
|
||||
) -> (Self, RequestSerializationAccess) {
|
||||
match scope {
|
||||
ClientRequestSerializationScope::Global(name) => Self::Global(name),
|
||||
ClientRequestSerializationScope::Thread { thread_id } => Self::Thread { thread_id },
|
||||
ClientRequestSerializationScope::ThreadPath { path } => Self::ThreadPath { path },
|
||||
ClientRequestSerializationScope::CommandExecProcess { process_id } => {
|
||||
ClientRequestSerializationScope::Global(name) => {
|
||||
(Self::Global(name), RequestSerializationAccess::Exclusive)
|
||||
}
|
||||
ClientRequestSerializationScope::GlobalSharedRead(name) => {
|
||||
(Self::Global(name), RequestSerializationAccess::SharedRead)
|
||||
}
|
||||
ClientRequestSerializationScope::Thread { thread_id } => (
|
||||
Self::Thread { thread_id },
|
||||
RequestSerializationAccess::Exclusive,
|
||||
),
|
||||
ClientRequestSerializationScope::ThreadPath { path } => (
|
||||
Self::ThreadPath { path },
|
||||
RequestSerializationAccess::Exclusive,
|
||||
),
|
||||
ClientRequestSerializationScope::CommandExecProcess { process_id } => (
|
||||
Self::CommandExecProcess {
|
||||
connection_id,
|
||||
process_id,
|
||||
}
|
||||
}
|
||||
ClientRequestSerializationScope::Process { process_handle } => Self::Process {
|
||||
connection_id,
|
||||
process_handle,
|
||||
},
|
||||
ClientRequestSerializationScope::FuzzyFileSearchSession { session_id } => {
|
||||
Self::FuzzyFileSearchSession { session_id }
|
||||
}
|
||||
ClientRequestSerializationScope::FsWatch { watch_id } => Self::FsWatch {
|
||||
connection_id,
|
||||
watch_id,
|
||||
},
|
||||
ClientRequestSerializationScope::McpOauth { server_name } => {
|
||||
Self::McpOauth { server_name }
|
||||
}
|
||||
},
|
||||
RequestSerializationAccess::Exclusive,
|
||||
),
|
||||
ClientRequestSerializationScope::Process { process_handle } => (
|
||||
Self::Process {
|
||||
connection_id,
|
||||
process_handle,
|
||||
},
|
||||
RequestSerializationAccess::Exclusive,
|
||||
),
|
||||
ClientRequestSerializationScope::FuzzyFileSearchSession { session_id } => (
|
||||
Self::FuzzyFileSearchSession { session_id },
|
||||
RequestSerializationAccess::Exclusive,
|
||||
),
|
||||
ClientRequestSerializationScope::FsWatch { watch_id } => (
|
||||
Self::FsWatch {
|
||||
connection_id,
|
||||
watch_id,
|
||||
},
|
||||
RequestSerializationAccess::Exclusive,
|
||||
),
|
||||
ClientRequestSerializationScope::McpOauth { server_name } => (
|
||||
Self::McpOauth { server_name },
|
||||
RequestSerializationAccess::Exclusive,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -98,17 +125,24 @@ impl QueuedInitializedRequest {
|
||||
}
|
||||
}
|
||||
|
||||
struct QueuedSerializedRequest {
|
||||
access: RequestSerializationAccess,
|
||||
request: QueuedInitializedRequest,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct RequestSerializationQueues {
|
||||
inner: Arc<Mutex<HashMap<RequestSerializationQueueKey, VecDeque<QueuedInitializedRequest>>>>,
|
||||
inner: Arc<Mutex<HashMap<RequestSerializationQueueKey, VecDeque<QueuedSerializedRequest>>>>,
|
||||
}
|
||||
|
||||
impl RequestSerializationQueues {
|
||||
pub(crate) async fn enqueue(
|
||||
&self,
|
||||
key: RequestSerializationQueueKey,
|
||||
access: RequestSerializationAccess,
|
||||
request: QueuedInitializedRequest,
|
||||
) {
|
||||
let request = QueuedSerializedRequest { access, request };
|
||||
let should_spawn = {
|
||||
let mut queues = self.inner.lock().await;
|
||||
match queues.get_mut(&key) {
|
||||
@@ -134,13 +168,27 @@ impl RequestSerializationQueues {
|
||||
|
||||
async fn drain(self, key: RequestSerializationQueueKey) {
|
||||
loop {
|
||||
let request = {
|
||||
let requests = {
|
||||
let mut queues = self.inner.lock().await;
|
||||
let Some(queue) = queues.get_mut(&key) else {
|
||||
return;
|
||||
};
|
||||
match queue.pop_front() {
|
||||
Some(request) => request,
|
||||
Some(request) => {
|
||||
let access = request.access;
|
||||
let mut requests = vec![request];
|
||||
if access == RequestSerializationAccess::SharedRead {
|
||||
while queue.front().is_some_and(|request| {
|
||||
request.access == RequestSerializationAccess::SharedRead
|
||||
}) {
|
||||
let Some(request) = queue.pop_front() else {
|
||||
break;
|
||||
};
|
||||
requests.push(request);
|
||||
}
|
||||
}
|
||||
requests
|
||||
}
|
||||
None => {
|
||||
queues.remove(&key);
|
||||
return;
|
||||
@@ -148,7 +196,7 @@ impl RequestSerializationQueues {
|
||||
}
|
||||
};
|
||||
|
||||
request.run().await;
|
||||
join_all(requests.into_iter().map(|request| request.request.run())).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -158,6 +206,7 @@ mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::Duration;
|
||||
@@ -195,6 +244,7 @@ mod tests {
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(Arc::clone(&gate), async move {
|
||||
tx.send(value).expect("receiver should be open");
|
||||
}),
|
||||
@@ -230,6 +280,7 @@ mod tests {
|
||||
queues
|
||||
.enqueue(
|
||||
RequestSerializationQueueKey::Global("blocked"),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
let _ = blocked_rx.await;
|
||||
}),
|
||||
@@ -238,6 +289,7 @@ mod tests {
|
||||
queues
|
||||
.enqueue(
|
||||
RequestSerializationQueueKey::Global("other"),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
ran_tx.send(()).expect("receiver should be open");
|
||||
}),
|
||||
@@ -268,6 +320,7 @@ mod tests {
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(Arc::clone(&live_gate), async move {
|
||||
tx.send(FIRST_REQUEST_VALUE)
|
||||
.expect("receiver should be open");
|
||||
@@ -281,6 +334,7 @@ mod tests {
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(closed_gate, async move {
|
||||
tx.send(SECOND_REQUEST_VALUE)
|
||||
.expect("receiver should be open");
|
||||
@@ -293,6 +347,7 @@ mod tests {
|
||||
queues
|
||||
.enqueue(
|
||||
key,
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(live_gate, async move {
|
||||
tx.send(THIRD_REQUEST_VALUE)
|
||||
.expect("receiver should be open");
|
||||
@@ -336,6 +391,7 @@ mod tests {
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(Arc::clone(&live_gate), async move {
|
||||
tx.send(FIRST_REQUEST_VALUE)
|
||||
.expect("receiver should be open");
|
||||
@@ -349,6 +405,7 @@ mod tests {
|
||||
queues
|
||||
.enqueue(
|
||||
key,
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(live_gate.clone(), async move {
|
||||
tx.send(SECOND_REQUEST_VALUE)
|
||||
.expect("receiver should be open");
|
||||
@@ -385,4 +442,241 @@ mod tests {
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn same_key_shared_reads_run_concurrently() {
|
||||
let queues = RequestSerializationQueues::default();
|
||||
let key = RequestSerializationQueueKey::Global("test");
|
||||
let (blocker_started_tx, blocker_started_rx) = oneshot::channel::<()>();
|
||||
let (blocker_release_tx, blocker_release_rx) = oneshot::channel::<()>();
|
||||
let (started_tx, mut started_rx) = mpsc::unbounded_channel();
|
||||
let (release_tx, _) = broadcast::channel::<()>(/*capacity*/ 1);
|
||||
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
blocker_started_tx
|
||||
.send(())
|
||||
.expect("receiver should be open");
|
||||
let _ = blocker_release_rx.await;
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
timeout(queue_drain_timeout(), blocker_started_rx)
|
||||
.await
|
||||
.expect("blocker should start")
|
||||
.expect("sender should be open");
|
||||
|
||||
for value in [FIRST_REQUEST_VALUE, SECOND_REQUEST_VALUE] {
|
||||
let started_tx = started_tx.clone();
|
||||
let mut release_rx = release_tx.subscribe();
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::SharedRead,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
started_tx.send(value).expect("receiver should be open");
|
||||
let _ = release_rx.recv().await;
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
drop(started_tx);
|
||||
blocker_release_tx
|
||||
.send(())
|
||||
.expect("blocker should still be waiting");
|
||||
|
||||
let mut started = Vec::new();
|
||||
for _ in 0..2 {
|
||||
started.push(
|
||||
timeout(queue_drain_timeout(), started_rx.recv())
|
||||
.await
|
||||
.expect("timed out waiting for shared read")
|
||||
.expect("sender should be open"),
|
||||
);
|
||||
}
|
||||
assert_eq!(started, vec![FIRST_REQUEST_VALUE, SECOND_REQUEST_VALUE]);
|
||||
|
||||
release_tx
|
||||
.send(())
|
||||
.expect("shared reads should still be waiting");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn exclusive_write_waits_for_running_shared_reads() {
|
||||
let queues = RequestSerializationQueues::default();
|
||||
let key = RequestSerializationQueueKey::Global("test");
|
||||
let (blocker_started_tx, blocker_started_rx) = oneshot::channel::<()>();
|
||||
let (blocker_release_tx, blocker_release_rx) = oneshot::channel::<()>();
|
||||
let (read_started_tx, mut read_started_rx) = mpsc::unbounded_channel();
|
||||
let (read_release_tx, _) = broadcast::channel::<()>(/*capacity*/ 1);
|
||||
let (write_started_tx, write_started_rx) = oneshot::channel::<()>();
|
||||
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
blocker_started_tx
|
||||
.send(())
|
||||
.expect("receiver should be open");
|
||||
let _ = blocker_release_rx.await;
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
timeout(queue_drain_timeout(), blocker_started_rx)
|
||||
.await
|
||||
.expect("blocker should start")
|
||||
.expect("sender should be open");
|
||||
|
||||
for value in [FIRST_REQUEST_VALUE, SECOND_REQUEST_VALUE] {
|
||||
let read_started_tx = read_started_tx.clone();
|
||||
let mut read_release_rx = read_release_tx.subscribe();
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::SharedRead,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
read_started_tx
|
||||
.send(value)
|
||||
.expect("receiver should be open");
|
||||
let _ = read_release_rx.recv().await;
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
write_started_tx.send(()).expect("receiver should be open");
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
drop(read_started_tx);
|
||||
blocker_release_tx
|
||||
.send(())
|
||||
.expect("blocker should still be waiting");
|
||||
|
||||
for _ in 0..2 {
|
||||
timeout(queue_drain_timeout(), read_started_rx.recv())
|
||||
.await
|
||||
.expect("timed out waiting for shared read")
|
||||
.expect("sender should be open");
|
||||
}
|
||||
let mut write_started_rx = Box::pin(write_started_rx);
|
||||
timeout(shutdown_wait_timeout(), &mut write_started_rx)
|
||||
.await
|
||||
.expect_err("write should wait for running shared reads");
|
||||
|
||||
read_release_tx
|
||||
.send(())
|
||||
.expect("shared reads should still be waiting");
|
||||
timeout(queue_drain_timeout(), &mut write_started_rx)
|
||||
.await
|
||||
.expect("write should start after shared reads finish")
|
||||
.expect("sender should be open");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn later_shared_reads_do_not_jump_ahead_of_queued_write() {
|
||||
let queues = RequestSerializationQueues::default();
|
||||
let key = RequestSerializationQueueKey::Global("test");
|
||||
let (blocker_started_tx, blocker_started_rx) = oneshot::channel::<()>();
|
||||
let (blocker_release_tx, blocker_release_rx) = oneshot::channel::<()>();
|
||||
let (first_read_started_tx, first_read_started_rx) = oneshot::channel::<()>();
|
||||
let (first_read_release_tx, first_read_release_rx) = oneshot::channel::<()>();
|
||||
let (write_started_tx, write_started_rx) = oneshot::channel::<()>();
|
||||
let (write_release_tx, write_release_rx) = oneshot::channel::<()>();
|
||||
let (later_read_started_tx, later_read_started_rx) = oneshot::channel::<()>();
|
||||
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
blocker_started_tx
|
||||
.send(())
|
||||
.expect("receiver should be open");
|
||||
let _ = blocker_release_rx.await;
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
timeout(queue_drain_timeout(), blocker_started_rx)
|
||||
.await
|
||||
.expect("blocker should start")
|
||||
.expect("sender should be open");
|
||||
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::SharedRead,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
first_read_started_tx
|
||||
.send(())
|
||||
.expect("receiver should be open");
|
||||
let _ = first_read_release_rx.await;
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::Exclusive,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
write_started_tx.send(()).expect("receiver should be open");
|
||||
let _ = write_release_rx.await;
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
queues
|
||||
.enqueue(
|
||||
key.clone(),
|
||||
RequestSerializationAccess::SharedRead,
|
||||
QueuedInitializedRequest::new(gate(), async move {
|
||||
later_read_started_tx
|
||||
.send(())
|
||||
.expect("receiver should be open");
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
blocker_release_tx
|
||||
.send(())
|
||||
.expect("blocker should still be waiting");
|
||||
|
||||
timeout(queue_drain_timeout(), first_read_started_rx)
|
||||
.await
|
||||
.expect("first read should start")
|
||||
.expect("sender should be open");
|
||||
let mut write_started_rx = Box::pin(write_started_rx);
|
||||
timeout(shutdown_wait_timeout(), &mut write_started_rx)
|
||||
.await
|
||||
.expect_err("write should wait for the first read");
|
||||
let mut later_read_started_rx = Box::pin(later_read_started_rx);
|
||||
timeout(shutdown_wait_timeout(), &mut later_read_started_rx)
|
||||
.await
|
||||
.expect_err("later read should wait behind the queued write");
|
||||
|
||||
first_read_release_tx
|
||||
.send(())
|
||||
.expect("first read should still be waiting");
|
||||
timeout(queue_drain_timeout(), &mut write_started_rx)
|
||||
.await
|
||||
.expect("write should start after the first read")
|
||||
.expect("sender should be open");
|
||||
timeout(shutdown_wait_timeout(), &mut later_read_started_rx)
|
||||
.await
|
||||
.expect_err("later read should still wait while the write is running");
|
||||
|
||||
write_release_tx
|
||||
.send(())
|
||||
.expect("write should still be waiting");
|
||||
timeout(queue_drain_timeout(), &mut later_read_started_rx)
|
||||
.await
|
||||
.expect("later read should start after the write")
|
||||
.expect("sender should be open");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -530,6 +530,42 @@ async fn skills_list_accepts_relative_cwds() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skills_list_preserves_requested_cwd_order() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let first_cwd = TempDir::new()?;
|
||||
let second_cwd = TempDir::new()?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_skills_list_request(SkillsListParams {
|
||||
cwds: vec![
|
||||
first_cwd.path().to_path_buf(),
|
||||
second_cwd.path().to_path_buf(),
|
||||
],
|
||||
force_reload: true,
|
||||
per_cwd_extra_user_roots: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let SkillsListResponse { data } = to_response(response)?;
|
||||
assert_eq!(
|
||||
data.iter().map(|entry| &entry.cwd).collect::<Vec<_>>(),
|
||||
vec![
|
||||
&first_cwd.path().to_path_buf(),
|
||||
&second_cwd.path().to_path_buf(),
|
||||
],
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skills_list_ignores_per_cwd_extra_roots_for_unknown_cwd() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
|
||||
@@ -9,6 +9,7 @@ use codex_protocol::protocol::Product;
|
||||
use codex_protocol::protocol::SkillScope;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_plugins::PluginSkillRoot;
|
||||
use std::time::Instant;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -143,14 +144,28 @@ impl SkillsManager {
|
||||
extra_user_roots: &[AbsolutePathBuf],
|
||||
fs: Option<Arc<dyn ExecutorFileSystem>>,
|
||||
) -> SkillLoadOutcome {
|
||||
let total_started_at = Instant::now();
|
||||
let use_cwd_cache = fs.is_some();
|
||||
if use_cwd_cache
|
||||
&& !force_reload
|
||||
&& let Some(outcome) = self.cached_outcome_for_cwd(&input.cwd)
|
||||
{
|
||||
warn!(
|
||||
cwd = %input.cwd.as_path().display(),
|
||||
total_ms = total_started_at.elapsed().as_millis(),
|
||||
force_reload,
|
||||
use_cwd_cache,
|
||||
cache_hit = true,
|
||||
effective_skill_root_count = input.effective_skill_roots.len(),
|
||||
extra_user_root_count = extra_user_roots.len(),
|
||||
skill_count = outcome.skills.len(),
|
||||
error_count = outcome.errors.len(),
|
||||
"skills manager cwd timing"
|
||||
);
|
||||
return outcome;
|
||||
}
|
||||
|
||||
let skill_roots_started_at = Instant::now();
|
||||
let mut roots = skill_roots(
|
||||
fs.clone(),
|
||||
&input.config_layer_stack,
|
||||
@@ -158,6 +173,7 @@ impl SkillsManager {
|
||||
input.effective_skill_roots.clone(),
|
||||
)
|
||||
.await;
|
||||
let skill_roots_ms = skill_roots_started_at.elapsed().as_millis();
|
||||
if !bundled_skills_enabled_from_stack(&input.config_layer_stack) {
|
||||
roots.retain(|root| root.scope != SkillScope::System);
|
||||
}
|
||||
@@ -173,8 +189,12 @@ impl SkillsManager {
|
||||
}),
|
||||
);
|
||||
}
|
||||
let root_count = roots.len();
|
||||
let skill_config_rules = skill_config_rules_from_stack(&input.config_layer_stack);
|
||||
let build_outcome_started_at = Instant::now();
|
||||
let outcome = self.build_skill_outcome(roots, &skill_config_rules).await;
|
||||
let build_outcome_ms = build_outcome_started_at.elapsed().as_millis();
|
||||
let cache_write_started_at = Instant::now();
|
||||
if use_cwd_cache {
|
||||
let mut cache = self
|
||||
.cache_by_cwd
|
||||
@@ -182,6 +202,23 @@ impl SkillsManager {
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
cache.insert(input.cwd.clone(), outcome.clone());
|
||||
}
|
||||
let cache_write_ms = cache_write_started_at.elapsed().as_millis();
|
||||
warn!(
|
||||
cwd = %input.cwd.as_path().display(),
|
||||
total_ms = total_started_at.elapsed().as_millis(),
|
||||
force_reload,
|
||||
use_cwd_cache,
|
||||
cache_hit = false,
|
||||
effective_skill_root_count = input.effective_skill_roots.len(),
|
||||
extra_user_root_count = extra_user_roots.len(),
|
||||
root_count,
|
||||
skill_roots_ms,
|
||||
build_outcome_ms,
|
||||
cache_write_ms,
|
||||
skill_count = outcome.skills.len(),
|
||||
error_count = outcome.errors.len(),
|
||||
"skills manager cwd timing"
|
||||
);
|
||||
outcome
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user