Compare commits

...

1 Commits

Author SHA1 Message Date
Matthew Zeng
361dff228e Avoid duplicate codex_apps tool refresh on force refetch 2026-05-11 12:23:42 -07:00
2 changed files with 50 additions and 14 deletions

View File

@@ -3,6 +3,8 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use anyhow::Result;
@@ -1160,6 +1162,7 @@ async fn list_apps_force_refetch_patches_updates_from_cached_snapshots() -> Resu
} = to_response(warm_response)?;
assert_eq!(warm_data, warm_second_update.data);
assert!(warm_next_cursor.is_none());
assert_eq!(server_control.tools_list_call_count(), 1);
server_control.set_connectors(vec![AppInfo {
id: "alpha".to_string(),
@@ -1263,6 +1266,7 @@ async fn list_apps_force_refetch_patches_updates_from_cached_snapshots() -> Resu
} = to_response(refetch_response)?;
assert_eq!(refetch_data, expected_final);
assert!(refetch_next_cursor.is_none());
assert_eq!(server_control.tools_list_call_count(), 2);
server_handle.abort();
Ok(())
@@ -1397,12 +1401,21 @@ struct AppsServerState {
#[derive(Clone)]
struct AppListMcpServer {
tools: Arc<StdMutex<Vec<Tool>>>,
tools_list_call_count: Arc<AtomicUsize>,
tools_delay: Duration,
}
impl AppListMcpServer {
fn new(tools: Arc<StdMutex<Vec<Tool>>>, tools_delay: Duration) -> Self {
Self { tools, tools_delay }
fn new(
tools: Arc<StdMutex<Vec<Tool>>>,
tools_list_call_count: Arc<AtomicUsize>,
tools_delay: Duration,
) -> Self {
Self {
tools,
tools_list_call_count,
tools_delay,
}
}
}
@@ -1410,6 +1423,7 @@ impl AppListMcpServer {
struct AppsServerControl {
response: Arc<StdMutex<serde_json::Value>>,
tools: Arc<StdMutex<Vec<Tool>>>,
tools_list_call_count: Arc<AtomicUsize>,
}
impl AppsServerControl {
@@ -1428,6 +1442,10 @@ impl AppsServerControl {
.unwrap_or_else(std::sync::PoisonError::into_inner);
*tools_guard = tools;
}
fn tools_list_call_count(&self) -> usize {
self.tools_list_call_count.load(Ordering::Relaxed)
}
}
impl ServerHandler for AppListMcpServer {
@@ -1445,8 +1463,10 @@ impl ServerHandler for AppListMcpServer {
) -> impl std::future::Future<Output = Result<ListToolsResult, rmcp::ErrorData>> + Send + '_
{
let tools = self.tools.clone();
let tools_list_call_count = Arc::clone(&self.tools_list_call_count);
let tools_delay = self.tools_delay;
async move {
tools_list_call_count.fetch_add(1, Ordering::Relaxed);
if tools_delay > Duration::ZERO {
tokio::time::sleep(tools_delay).await;
}
@@ -1519,6 +1539,7 @@ async fn start_apps_server_with_delays_and_control_inner(
json!({ "apps": connectors, "next_token": null }),
));
let tools = Arc::new(StdMutex::new(tools));
let tools_list_call_count = Arc::new(AtomicUsize::new(0));
let state = AppsServerState {
expected_bearer: "Bearer chatgpt-token".to_string(),
expected_account_id: "account-123".to_string(),
@@ -1530,6 +1551,7 @@ async fn start_apps_server_with_delays_and_control_inner(
let server_control = AppsServerControl {
response,
tools: tools.clone(),
tools_list_call_count: Arc::clone(&tools_list_call_count),
};
let listener = TcpListener::bind("127.0.0.1:0").await?;
@@ -1538,7 +1560,14 @@ async fn start_apps_server_with_delays_and_control_inner(
let mcp_service = StreamableHttpService::new(
{
let tools = tools.clone();
move || Ok(AppListMcpServer::new(tools.clone(), tools_delay))
let tools_list_call_count = Arc::clone(&tools_list_call_count);
move || {
Ok(AppListMcpServer::new(
tools.clone(),
Arc::clone(&tools_list_call_count),
tools_delay,
))
}
},
Arc::new(LocalSessionManager::default()),
StreamableHttpServerConfig::default(),

View File

@@ -284,17 +284,24 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_environment_manager(
.await;
let refreshed_tools = if force_refetch {
match mcp_connection_manager
.hard_refresh_codex_apps_tools_cache()
.await
{
Ok(tools) => Some(tools),
Err(err) => {
warn!(
"failed to force-refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}', using cached/startup tools: {err:#}"
);
None
}
let codex_apps_ready = if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) {
let timeout = cfg
.configured_config()
.and_then(|config| config.startup_timeout_sec)
.unwrap_or(CONNECTORS_READY_TIMEOUT_ON_EMPTY_TOOLS);
mcp_connection_manager
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout)
.await
} else {
false
};
if codex_apps_ready {
Some(mcp_connection_manager.list_all_tools().await)
} else {
warn!(
"failed to force-refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}', using cached/startup tools"
);
None
}
} else {
None