Split MCP connection modules (#19725)

## Why

The MCP connection manager module had grown to mix orchestration, RMCP
client startup, elicitation handling, Codex Apps cache and naming
behavior, tool qualification and filtering, and runtime data. The
previous stacked PRs split these responsibilities incrementally; this PR
collapses that work into one self-contained refactor on latest main.

## What changed

- Move McpConnectionManager into connection_manager.rs.
- Move RMCP client lifecycle, startup, and uncached tool listing into
rmcp_client.rs.
- Move elicitation request tracking and policy handling into
elicitation.rs.
- Move Codex Apps cache, key, filtering, and naming helpers into
codex_apps.rs.
- Rename the tool-name helper module to tools.rs and move ToolInfo, tool
filtering, schema masking, and qualification there.
- Move runtime and sandbox shared types into runtime.rs.
- Preserve latest main PermissionProfile-based MCP elicitation
auto-approval behavior.

## Verification

- just fmt
- cargo check -p codex-mcp
- cargo check -p codex-mcp --tests
- cargo check -p codex-core

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-26 16:23:34 -07:00
committed by GitHub
parent 4c58e64f08
commit 0bda8161a2
11 changed files with 2020 additions and 1878 deletions

View File

@@ -0,0 +1,258 @@
//! Codex Apps support for the built-in apps MCP server.
//!
//! This module owns the pieces that are unique to ChatGPT-hosted app
//! connectors: cache scoping by authenticated user, disk cache reads/writes,
//! connector allow-list filtering, and the normalization that turns app
//! connector/tool metadata into model-visible MCP callable names.
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Instant;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::runtime::emit_duration;
use crate::tools::MCP_TOOLS_CACHE_WRITE_DURATION_METRIC;
use crate::tools::ToolInfo;
use codex_login::CodexAuth;
use codex_utils_plugins::mcp_connector::is_connector_id_allowed;
use codex_utils_plugins::mcp_connector::sanitize_name;
use serde::Deserialize;
use serde::Serialize;
use sha1::Digest;
use sha1::Sha1;
/// Version tag written into every cache file; bump whenever the serialized
/// shape changes so older files are rejected as invalid instead of misread.
pub(crate) const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 2;
/// Identity of the authenticated user, used to scope the on-disk Codex Apps
/// tools cache. The key is serialized and hashed to derive a per-user cache
/// file name (see `CodexAppsToolsCacheContext::cache_path`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CodexAppsToolsCacheKey {
    // ChatGPT account id, when the auth state exposes one.
    pub(crate) account_id: Option<String>,
    // ChatGPT user id, when the auth state exposes one.
    pub(crate) chatgpt_user_id: Option<String>,
    // Whether the authenticated account is a workspace account.
    pub(crate) is_workspace_account: bool,
}
/// Derives the cache-scoping key from the current authentication state.
/// An unauthenticated session (`None`) yields an all-empty key.
pub fn codex_apps_tools_cache_key(auth: Option<&CodexAuth>) -> CodexAppsToolsCacheKey {
    let account_id = auth.and_then(|auth| auth.get_account_id());
    let chatgpt_user_id = auth.and_then(|auth| auth.get_chatgpt_user_id());
    let is_workspace_account = auth.map(|auth| auth.is_workspace_account()).unwrap_or(false);
    CodexAppsToolsCacheKey {
        account_id,
        chatgpt_user_id,
        is_workspace_account,
    }
}
/// Returns a copy of `mcp_tools` with every tool served by the built-in
/// Codex Apps server removed; tools from all other servers are kept as-is.
pub fn filter_non_codex_apps_mcp_tools_only(
    mcp_tools: &HashMap<String, ToolInfo>,
) -> HashMap<String, ToolInfo> {
    let mut filtered = HashMap::new();
    for (name, tool) in mcp_tools {
        if tool.server_name != CODEX_APPS_MCP_SERVER_NAME {
            filtered.insert(name.clone(), tool.clone());
        }
    }
    filtered
}
/// Everything needed to locate the per-user Codex Apps tools cache on disk.
#[derive(Clone)]
pub(crate) struct CodexAppsToolsCacheContext {
    // Root of the Codex home directory; the cache directory lives below it.
    pub(crate) codex_home: PathBuf,
    // Identity used to derive a unique cache file name for this user.
    pub(crate) user_key: CodexAppsToolsCacheKey,
}
impl CodexAppsToolsCacheContext {
    /// Path of this user's cache file:
    /// `<codex_home>/cache/codex_apps_tools/<sha1(user_key)>.json`.
    pub(crate) fn cache_path(&self) -> PathBuf {
        // Serializing the key is effectively infallible; on the off chance it
        // fails, hash the empty string rather than erroring out.
        let serialized_key = serde_json::to_string(&self.user_key).unwrap_or_default();
        let file_name = format!("{}.json", sha1_hex(&serialized_key));
        self.codex_home
            .join(CODEX_APPS_TOOLS_CACHE_DIR)
            .join(file_name)
    }
}
/// Outcome of attempting to read the on-disk Codex Apps tools cache.
pub(crate) enum CachedCodexAppsToolsLoad {
    /// Cache file existed, parsed, and matched the current schema version.
    Hit(Vec<ToolInfo>),
    /// No cache file exists for this user.
    Missing,
    /// Cache file exists but could not be read/parsed, or its schema
    /// version does not match the current one.
    Invalid,
}
pub(crate) fn normalize_codex_apps_tool_title(
server_name: &str,
connector_name: Option<&str>,
value: &str,
) -> String {
if server_name != CODEX_APPS_MCP_SERVER_NAME {
return value.to_string();
}
let Some(connector_name) = connector_name
.map(str::trim)
.filter(|name| !name.is_empty())
else {
return value.to_string();
};
let prefix = format!("{connector_name}_");
if let Some(stripped) = value.strip_prefix(&prefix)
&& !stripped.is_empty()
{
return stripped.to_string();
}
value.to_string()
}
pub(crate) fn normalize_codex_apps_callable_name(
server_name: &str,
tool_name: &str,
connector_id: Option<&str>,
connector_name: Option<&str>,
) -> String {
if server_name != CODEX_APPS_MCP_SERVER_NAME {
return tool_name.to_string();
}
let tool_name = sanitize_name(tool_name);
if let Some(connector_name) = connector_name
.map(str::trim)
.map(sanitize_name)
.filter(|name| !name.is_empty())
&& let Some(stripped) = tool_name.strip_prefix(&connector_name)
&& !stripped.is_empty()
{
return stripped.to_string();
}
if let Some(connector_id) = connector_id
.map(str::trim)
.map(sanitize_name)
.filter(|name| !name.is_empty())
&& let Some(stripped) = tool_name.strip_prefix(&connector_id)
&& !stripped.is_empty()
{
return stripped.to_string();
}
tool_name
}
pub(crate) fn normalize_codex_apps_callable_namespace(
server_name: &str,
connector_name: Option<&str>,
) -> String {
if server_name == CODEX_APPS_MCP_SERVER_NAME
&& let Some(connector_name) = connector_name
{
format!("mcp__{}__{}", server_name, sanitize_name(connector_name))
} else {
format!("mcp__{server_name}__")
}
}
/// Persists `tools` to the Codex Apps disk cache, but only when the server
/// is the built-in Codex Apps server and a cache context is available.
/// Records the cache-write duration metric whenever a write is attempted.
pub(crate) fn write_cached_codex_apps_tools_if_needed(
    server_name: &str,
    cache_context: Option<&CodexAppsToolsCacheContext>,
    tools: &[ToolInfo],
) {
    if server_name != CODEX_APPS_MCP_SERVER_NAME {
        return;
    }
    let Some(cache_context) = cache_context else {
        return;
    };
    let started = Instant::now();
    write_cached_codex_apps_tools(cache_context, tools);
    emit_duration(MCP_TOOLS_CACHE_WRITE_DURATION_METRIC, started.elapsed(), &[]);
}
/// Loads the startup snapshot of cached Codex Apps tools.
///
/// Returns `None` for non-Codex-Apps servers, when no cache context is
/// available, or when the cache is missing or invalid.
pub(crate) fn load_startup_cached_codex_apps_tools_snapshot(
    server_name: &str,
    cache_context: Option<&CodexAppsToolsCacheContext>,
) -> Option<Vec<ToolInfo>> {
    if server_name != CODEX_APPS_MCP_SERVER_NAME {
        return None;
    }
    if let CachedCodexAppsToolsLoad::Hit(tools) = load_cached_codex_apps_tools(cache_context?) {
        Some(tools)
    } else {
        None
    }
}
/// Test helper: reads the cache for `cache_context`, collapsing both
/// `Missing` and `Invalid` outcomes to `None`.
#[cfg(test)]
pub(crate) fn read_cached_codex_apps_tools(
    cache_context: &CodexAppsToolsCacheContext,
) -> Option<Vec<ToolInfo>> {
    if let CachedCodexAppsToolsLoad::Hit(tools) = load_cached_codex_apps_tools(cache_context) {
        Some(tools)
    } else {
        None
    }
}
/// Reads and validates the on-disk cache file for `cache_context`.
///
/// Distinguishes a missing file (`Missing`) from a file that exists but is
/// unusable (`Invalid`): read errors other than not-found, JSON parse
/// errors, or a schema version that differs from the current one. A hit is
/// re-filtered against the connector allow-list before being returned.
pub(crate) fn load_cached_codex_apps_tools(
    cache_context: &CodexAppsToolsCacheContext,
) -> CachedCodexAppsToolsLoad {
    let bytes = match std::fs::read(cache_context.cache_path()) {
        Ok(bytes) => bytes,
        Err(err) => {
            return if err.kind() == std::io::ErrorKind::NotFound {
                CachedCodexAppsToolsLoad::Missing
            } else {
                CachedCodexAppsToolsLoad::Invalid
            };
        }
    };
    let Ok(cache) = serde_json::from_slice::<CodexAppsToolsDiskCache>(&bytes) else {
        return CachedCodexAppsToolsLoad::Invalid;
    };
    if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION {
        return CachedCodexAppsToolsLoad::Invalid;
    }
    CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools))
}
/// Serializes the allow-listed subset of `tools` to the cache file as pretty
/// JSON tagged with the current schema version. Caching is best-effort:
/// directory-creation, serialization, and write failures are all ignored.
pub(crate) fn write_cached_codex_apps_tools(
    cache_context: &CodexAppsToolsCacheContext,
    tools: &[ToolInfo],
) {
    let cache_path = cache_context.cache_path();
    if let Some(parent) = cache_path.parent() {
        if std::fs::create_dir_all(parent).is_err() {
            return;
        }
    }
    let cache = CodexAppsToolsDiskCache {
        schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION,
        tools: filter_disallowed_codex_apps_tools(tools.to_vec()),
    };
    if let Ok(bytes) = serde_json::to_vec_pretty(&cache) {
        let _ = std::fs::write(cache_path, bytes);
    }
}
pub(crate) fn filter_disallowed_codex_apps_tools(tools: Vec<ToolInfo>) -> Vec<ToolInfo> {
tools
.into_iter()
.filter(|tool| {
tool.connector_id
.as_deref()
.is_none_or(is_connector_id_allowed)
})
.collect()
}
/// On-disk JSON representation of the Codex Apps tools cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CodexAppsToolsDiskCache {
    // Must equal CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION for the file to load.
    schema_version: u8,
    tools: Vec<ToolInfo>,
}
// Cache directory, relative to the Codex home directory.
const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools";
/// Lowercase hex SHA-1 digest of `s`. Used only to derive stable cache file
/// names from the user key, not for any security-sensitive purpose.
fn sha1_hex(s: &str) -> String {
    let digest = Sha1::digest(s.as_bytes());
    format!("{digest:x}")
}

View File

@@ -0,0 +1,700 @@
//! Aggregates MCP server connections for Codex.
//!
//! [`McpConnectionManager`] owns the set of running async RMCP clients keyed by
//! MCP server name. It coordinates startup status events, keeps server origin
//! metadata, aggregates tools/resources/templates across servers, routes tool
//! calls to the right client, and exposes the public manager API used by
//! `codex-core`.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use crate::McpAuthStatusEntry;
use crate::codex_apps::CodexAppsToolsCacheContext;
use crate::codex_apps::CodexAppsToolsCacheKey;
use crate::codex_apps::write_cached_codex_apps_tools_if_needed;
use crate::elicitation::ElicitationRequestManager;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::ToolPluginProvenance;
use crate::rmcp_client::AsyncManagedClient;
use crate::rmcp_client::DEFAULT_STARTUP_TIMEOUT;
use crate::rmcp_client::MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC;
use crate::rmcp_client::MCP_TOOLS_LIST_DURATION_METRIC;
use crate::rmcp_client::ManagedClient;
use crate::rmcp_client::StartupOutcomeError;
use crate::rmcp_client::list_tools_for_client_uncached;
use crate::runtime::McpRuntimeEnvironment;
use crate::runtime::emit_duration;
use crate::tools::ToolInfo;
use crate::tools::filter_tools;
use crate::tools::qualify_tools;
use crate::tools::tool_with_model_visible_input_schema;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use async_channel::Sender;
use codex_config::Constrained;
use codex_config::McpServerConfig;
use codex_config::McpServerTransportConfig;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_login::CodexAuth;
use codex_protocol::ToolName;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::McpStartupCompleteEvent;
use codex_protocol::protocol::McpStartupFailure;
use codex_protocol::protocol::McpStartupStatus;
use codex_protocol::protocol::McpStartupUpdateEvent;
use codex_rmcp_client::ElicitationResponse;
use rmcp::model::ListResourceTemplatesResult;
use rmcp::model::ListResourcesResult;
use rmcp::model::PaginatedRequestParams;
use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use rmcp::model::RequestId;
use rmcp::model::Resource;
use rmcp::model::ResourceTemplate;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use tracing::warn;
use url::Url;
/// A thin wrapper around a set of running [`RmcpClient`] instances.
///
/// All maps are keyed by MCP server name; the manager routes tool and
/// resource requests to the right client and tracks pending elicitations.
pub struct McpConnectionManager {
    // One lazily-started client handle per configured, enabled server.
    clients: HashMap<String, AsyncManagedClient>,
    // Display origin per server ("stdio" or the HTTP origin of its URL).
    server_origins: HashMap<String, String>,
    // Shared tracker for in-flight elicitations plus the current
    // approval policy and permission profile.
    elicitation_requests: ElicitationRequestManager,
}
impl McpConnectionManager {
    /// Creates an empty manager (no clients, no origins) that still carries
    /// the elicitation policy/profile state; used before MCP startup runs.
    pub fn new_uninitialized(
        approval_policy: &Constrained<AskForApproval>,
        permission_profile: &Constrained<PermissionProfile>,
    ) -> Self {
        Self {
            clients: HashMap::new(),
            server_origins: HashMap::new(),
            elicitation_requests: ElicitationRequestManager::new(
                approval_policy.value(),
                permission_profile.get().clone(),
            ),
        }
    }

    /// True when at least one MCP server client is registered.
    pub fn has_servers(&self) -> bool {
        !self.clients.is_empty()
    }

    /// Origin recorded for `server_name` at startup: "stdio" for stdio
    /// transports, otherwise the ASCII origin of the server URL.
    pub fn server_origin(&self, server_name: &str) -> Option<&str> {
        self.server_origins.get(server_name).map(String::as_str)
    }

    /// Replaces the approval policy consulted for elicitation decisions.
    /// A poisoned lock leaves the previous value in place.
    pub fn set_approval_policy(&self, approval_policy: &Constrained<AskForApproval>) {
        if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() {
            *policy = approval_policy.value();
        }
    }

    /// Replaces the permission profile consulted for elicitation decisions.
    /// A poisoned lock leaves the previous value in place.
    pub fn set_permission_profile(&self, permission_profile: PermissionProfile) {
        if let Ok(mut profile) = self.elicitation_requests.permission_profile.lock() {
            *profile = permission_profile;
        }
    }

    /// Starts one async client per enabled server in `mcp_servers`.
    ///
    /// Emits an `McpStartupUpdate` event per server as it starts and again
    /// when it settles (ready/failed/cancelled), then a single
    /// `McpStartupComplete` summary once every startup has finished. Returns
    /// without waiting for servers to become ready; the returned
    /// [`CancellationToken`] aborts in-flight startups via child tokens.
    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
    pub async fn new(
        mcp_servers: &HashMap<String, McpServerConfig>,
        store_mode: OAuthCredentialsStoreMode,
        auth_entries: HashMap<String, McpAuthStatusEntry>,
        approval_policy: &Constrained<AskForApproval>,
        submit_id: String,
        tx_event: Sender<Event>,
        initial_permission_profile: PermissionProfile,
        runtime_environment: McpRuntimeEnvironment,
        codex_home: PathBuf,
        codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
        tool_plugin_provenance: ToolPluginProvenance,
        auth: Option<&CodexAuth>,
    ) -> (Self, CancellationToken) {
        let cancel_token = CancellationToken::new();
        let mut clients = HashMap::new();
        let mut server_origins = HashMap::new();
        let mut join_set = JoinSet::new();
        let elicitation_requests =
            ElicitationRequestManager::new(approval_policy.value(), initial_permission_profile);
        let tool_plugin_provenance = Arc::new(tool_plugin_provenance);
        let startup_submit_id = submit_id.clone();
        // Only auth that uses the Codex backend can provide credentials for
        // the built-in Codex Apps server.
        let codex_apps_auth_provider = auth
            .filter(|auth| auth.uses_codex_backend())
            .map(codex_model_provider::auth_provider_from_auth);
        let mcp_servers = mcp_servers.clone();
        for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) {
            if let Some(origin) = transport_origin(&cfg.transport) {
                server_origins.insert(server_name.clone(), origin);
            }
            let cancel_token = cancel_token.child_token();
            let _ = emit_update(
                startup_submit_id.as_str(),
                &tx_event,
                McpStartupUpdateEvent {
                    server: server_name.clone(),
                    status: McpStartupStatus::Starting,
                },
            )
            .await;
            // The disk cache applies only to the built-in Codex Apps server.
            let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME {
                Some(CodexAppsToolsCacheContext {
                    codex_home: codex_home.clone(),
                    user_key: codex_apps_tools_cache_key.clone(),
                })
            } else {
                None
            };
            let uses_env_bearer_token = match &cfg.transport {
                McpServerTransportConfig::StreamableHttp {
                    bearer_token_env_var,
                    ..
                } => bearer_token_env_var.is_some(),
                McpServerTransportConfig::Stdio { .. } => false,
            };
            // An explicitly configured env bearer token takes precedence over
            // the Codex auth provider for the apps server.
            let runtime_auth_provider =
                if server_name == CODEX_APPS_MCP_SERVER_NAME && !uses_env_bearer_token {
                    codex_apps_auth_provider.clone()
                } else {
                    None
                };
            let async_managed_client = AsyncManagedClient::new(
                server_name.clone(),
                cfg,
                store_mode,
                cancel_token.clone(),
                tx_event.clone(),
                elicitation_requests.clone(),
                codex_apps_tools_cache_context,
                Arc::clone(&tool_plugin_provenance),
                runtime_environment.clone(),
                runtime_auth_provider,
            );
            clients.insert(server_name.clone(), async_managed_client.clone());
            let tx_event = tx_event.clone();
            let submit_id = startup_submit_id.clone();
            let auth_entry = auth_entries.get(&server_name).cloned();
            join_set.spawn(async move {
                let mut outcome = async_managed_client.client().await;
                // Cancellation wins over whatever the startup returned.
                if cancel_token.is_cancelled() {
                    outcome = Err(StartupOutcomeError::Cancelled);
                }
                let status = match &outcome {
                    Ok(_) => McpStartupStatus::Ready,
                    Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled,
                    Err(error) => {
                        let error_str = mcp_init_error_display(
                            server_name.as_str(),
                            auth_entry.as_ref(),
                            error,
                        );
                        McpStartupStatus::Failed { error: error_str }
                    }
                };
                let _ = emit_update(
                    submit_id.as_str(),
                    &tx_event,
                    McpStartupUpdateEvent {
                        server: server_name.clone(),
                        status,
                    },
                )
                .await;
                (server_name, outcome)
            });
        }
        let manager = Self {
            clients,
            server_origins,
            elicitation_requests: elicitation_requests.clone(),
        };
        // Summarize all startup outcomes in the background so callers get
        // the manager back immediately.
        tokio::spawn(async move {
            let outcomes = join_set.join_all().await;
            let mut summary = McpStartupCompleteEvent::default();
            for (server_name, outcome) in outcomes {
                match outcome {
                    Ok(_) => summary.ready.push(server_name),
                    Err(StartupOutcomeError::Cancelled) => summary.cancelled.push(server_name),
                    Err(StartupOutcomeError::Failed { error }) => {
                        summary.failed.push(McpStartupFailure {
                            server: server_name,
                            error,
                        })
                    }
                }
            }
            let _ = tx_event
                .send(Event {
                    id: startup_submit_id,
                    msg: EventMsg::McpStartupComplete(summary),
                })
                .await;
        });
        (manager, cancel_token)
    }

    /// Delivers a user's answer to the pending elicitation identified by
    /// `(server_name, id)`.
    pub async fn resolve_elicitation(
        &self,
        server_name: String,
        id: RequestId,
        response: ElicitationResponse,
    ) -> Result<()> {
        self.elicitation_requests
            .resolve(server_name, id, response)
            .await
    }

    /// Waits up to `timeout` for the named server's client to finish
    /// starting. False for unknown servers, startup failures, or timeout.
    pub async fn wait_for_server_ready(&self, server_name: &str, timeout: Duration) -> bool {
        let Some(async_managed_client) = self.clients.get(server_name) else {
            return false;
        };
        match tokio::time::timeout(timeout, async_managed_client.client()).await {
            Ok(Ok(_)) => true,
            Ok(Err(_)) | Err(_) => false,
        }
    }

    /// Collects startup failures for servers the caller marked as required,
    /// including servers that were never initialized at all. Awaits each
    /// required server's startup before deciding.
    pub async fn required_startup_failures(
        &self,
        required_servers: &[String],
    ) -> Vec<McpStartupFailure> {
        let mut failures = Vec::new();
        for server_name in required_servers {
            let Some(async_managed_client) = self.clients.get(server_name).cloned() else {
                failures.push(McpStartupFailure {
                    server: server_name.clone(),
                    error: format!("required MCP server `{server_name}` was not initialized"),
                });
                continue;
            };
            match async_managed_client.client().await {
                Ok(_) => {}
                Err(error) => failures.push(McpStartupFailure {
                    server: server_name.clone(),
                    error: startup_outcome_error_message(error),
                }),
            }
        }
        failures
    }

    /// Returns a single map that contains all tools. Each key is the
    /// fully-qualified name for the tool.
    ///
    /// Servers whose tool listing is unavailable are silently skipped.
    #[instrument(level = "trace", skip_all)]
    pub async fn list_all_tools(&self) -> HashMap<String, ToolInfo> {
        let mut tools = Vec::new();
        for managed_client in self.clients.values() {
            let Some(server_tools) = managed_client.listed_tools().await else {
                continue;
            };
            tools.extend(server_tools);
        }
        qualify_tools(tools)
    }

    /// Force-refresh codex apps tools by bypassing the in-process cache.
    ///
    /// On success, the refreshed tools replace the cache contents and the
    /// latest filtered tool map is returned directly to the caller. On
    /// failure, the existing cache remains unchanged.
    pub async fn hard_refresh_codex_apps_tools_cache(&self) -> Result<HashMap<String, ToolInfo>> {
        let managed_client = self
            .clients
            .get(CODEX_APPS_MCP_SERVER_NAME)
            .ok_or_else(|| anyhow!("unknown MCP server '{CODEX_APPS_MCP_SERVER_NAME}'"))?
            .client()
            .await
            .context("failed to get client")?;
        // Two timers: the fetch metric covers only the uncached listing; the
        // list metric also includes the cache write below.
        let list_start = Instant::now();
        let fetch_start = Instant::now();
        let tools = list_tools_for_client_uncached(
            CODEX_APPS_MCP_SERVER_NAME,
            &managed_client.client,
            managed_client.tool_timeout,
            managed_client.server_instructions.as_deref(),
        )
        .await
        .with_context(|| {
            format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'")
        })?;
        emit_duration(
            MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
            fetch_start.elapsed(),
            &[],
        );
        write_cached_codex_apps_tools_if_needed(
            CODEX_APPS_MCP_SERVER_NAME,
            managed_client.codex_apps_tools_cache_context.as_ref(),
            &tools,
        );
        emit_duration(
            MCP_TOOLS_LIST_DURATION_METRIC,
            list_start.elapsed(),
            &[("cache", "miss")],
        );
        // Apply the per-server tool filter and mask schemas before
        // qualifying names, matching the normal listing pipeline.
        let tools = filter_tools(tools, &managed_client.tool_filter)
            .into_iter()
            .map(|mut tool| {
                tool.tool = tool_with_model_visible_input_schema(&tool.tool);
                tool
            });
        Ok(qualify_tools(tools))
    }

    /// Returns a single map that contains all resources. Each key is the
    /// server name and the value is a vector of resources.
    ///
    /// Servers are queried concurrently with cursor-based pagination; a
    /// server that errors (or repeats a cursor) is logged and omitted.
    pub async fn list_all_resources(&self) -> HashMap<String, Vec<Resource>> {
        let mut join_set = JoinSet::new();
        let clients_snapshot = &self.clients;
        for (server_name, async_managed_client) in clients_snapshot {
            let server_name = server_name.clone();
            let Ok(managed_client) = async_managed_client.client().await else {
                continue;
            };
            let timeout = managed_client.tool_timeout;
            let client = managed_client.client.clone();
            join_set.spawn(async move {
                let mut collected: Vec<Resource> = Vec::new();
                let mut cursor: Option<String> = None;
                loop {
                    let params = cursor.as_ref().map(|next| PaginatedRequestParams {
                        meta: None,
                        cursor: Some(next.clone()),
                    });
                    let response = match client.list_resources(params, timeout).await {
                        Ok(result) => result,
                        Err(err) => return (server_name, Err(err)),
                    };
                    collected.extend(response.resources);
                    match response.next_cursor {
                        Some(next) => {
                            // A repeated cursor would loop forever; bail out.
                            if cursor.as_ref() == Some(&next) {
                                return (
                                    server_name,
                                    Err(anyhow!("resources/list returned duplicate cursor")),
                                );
                            }
                            cursor = Some(next);
                        }
                        None => return (server_name, Ok(collected)),
                    }
                }
            });
        }
        let mut aggregated: HashMap<String, Vec<Resource>> = HashMap::new();
        while let Some(join_res) = join_set.join_next().await {
            match join_res {
                Ok((server_name, Ok(resources))) => {
                    aggregated.insert(server_name, resources);
                }
                Ok((server_name, Err(err))) => {
                    warn!("Failed to list resources for MCP server '{server_name}': {err:#}");
                }
                Err(err) => {
                    warn!("Task panic when listing resources for MCP server: {err:#}");
                }
            }
        }
        aggregated
    }

    /// Returns a single map that contains all resource templates. Each key is the
    /// server name and the value is a vector of resource templates.
    ///
    /// Same concurrency, pagination, and error handling as
    /// [`Self::list_all_resources`].
    pub async fn list_all_resource_templates(&self) -> HashMap<String, Vec<ResourceTemplate>> {
        let mut join_set = JoinSet::new();
        let clients_snapshot = &self.clients;
        for (server_name, async_managed_client) in clients_snapshot {
            let server_name_cloned = server_name.clone();
            let Ok(managed_client) = async_managed_client.client().await else {
                continue;
            };
            let client = managed_client.client.clone();
            let timeout = managed_client.tool_timeout;
            join_set.spawn(async move {
                let mut collected: Vec<ResourceTemplate> = Vec::new();
                let mut cursor: Option<String> = None;
                loop {
                    let params = cursor.as_ref().map(|next| PaginatedRequestParams {
                        meta: None,
                        cursor: Some(next.clone()),
                    });
                    let response = match client.list_resource_templates(params, timeout).await {
                        Ok(result) => result,
                        Err(err) => return (server_name_cloned, Err(err)),
                    };
                    collected.extend(response.resource_templates);
                    match response.next_cursor {
                        Some(next) => {
                            // A repeated cursor would loop forever; bail out.
                            if cursor.as_ref() == Some(&next) {
                                return (
                                    server_name_cloned,
                                    Err(anyhow!(
                                        "resources/templates/list returned duplicate cursor"
                                    )),
                                );
                            }
                            cursor = Some(next);
                        }
                        None => return (server_name_cloned, Ok(collected)),
                    }
                }
            });
        }
        let mut aggregated: HashMap<String, Vec<ResourceTemplate>> = HashMap::new();
        while let Some(join_res) = join_set.join_next().await {
            match join_res {
                Ok((server_name, Ok(templates))) => {
                    aggregated.insert(server_name, templates);
                }
                Ok((server_name, Err(err))) => {
                    warn!(
                        "Failed to list resource templates for MCP server '{server_name}': {err:#}"
                    );
                }
                Err(err) => {
                    warn!("Task panic when listing resource templates for MCP server: {err:#}");
                }
            }
        }
        aggregated
    }

    /// Invoke the tool indicated by the (server, tool) pair.
    ///
    /// Errors if the tool is disabled by the server's tool filter or the
    /// underlying call fails; tool content is converted to JSON values for
    /// the protocol-level [`CallToolResult`].
    pub async fn call_tool(
        &self,
        server: &str,
        tool: &str,
        arguments: Option<serde_json::Value>,
        meta: Option<serde_json::Value>,
    ) -> Result<CallToolResult> {
        let client = self.client_by_name(server).await?;
        if !client.tool_filter.allows(tool) {
            return Err(anyhow!(
                "tool '{tool}' is disabled for MCP server '{server}'"
            ));
        }
        let result: rmcp::model::CallToolResult = client
            .client
            .call_tool(tool.to_string(), arguments, meta, client.tool_timeout)
            .await
            .with_context(|| format!("tool call failed for `{server}/{tool}`"))?;
        let content = result
            .content
            .into_iter()
            .map(|content| {
                // Unserializable content is replaced with a placeholder
                // rather than failing the whole call.
                serde_json::to_value(content)
                    .unwrap_or_else(|_| serde_json::Value::String("<content>".to_string()))
            })
            .collect();
        Ok(CallToolResult {
            content,
            structured_content: result.structured_content,
            is_error: result.is_error,
            meta: result.meta.and_then(|meta| serde_json::to_value(meta).ok()),
        })
    }

    /// Whether the named server advertised support for the sandbox-state
    /// meta capability at startup.
    pub async fn server_supports_sandbox_state_meta_capability(
        &self,
        server: &str,
    ) -> Result<bool> {
        Ok(self
            .client_by_name(server)
            .await?
            .server_supports_sandbox_state_meta_capability)
    }

    /// List resources from the specified server.
    pub async fn list_resources(
        &self,
        server: &str,
        params: Option<PaginatedRequestParams>,
    ) -> Result<ListResourcesResult> {
        let managed = self.client_by_name(server).await?;
        let timeout = managed.tool_timeout;
        managed
            .client
            .list_resources(params, timeout)
            .await
            .with_context(|| format!("resources/list failed for `{server}`"))
    }

    /// List resource templates from the specified server.
    pub async fn list_resource_templates(
        &self,
        server: &str,
        params: Option<PaginatedRequestParams>,
    ) -> Result<ListResourceTemplatesResult> {
        let managed = self.client_by_name(server).await?;
        let client = managed.client.clone();
        let timeout = managed.tool_timeout;
        client
            .list_resource_templates(params, timeout)
            .await
            .with_context(|| format!("resources/templates/list failed for `{server}`"))
    }

    /// Read a resource from the specified server.
    pub async fn read_resource(
        &self,
        server: &str,
        params: ReadResourceRequestParams,
    ) -> Result<ReadResourceResult> {
        let managed = self.client_by_name(server).await?;
        let client = managed.client.clone();
        let timeout = managed.tool_timeout;
        let uri = params.uri.clone();
        client
            .read_resource(params, timeout)
            .await
            .with_context(|| format!("resources/read failed for `{server}` ({uri})"))
    }

    /// Looks up a tool by its canonical name across all servers.
    // NOTE(review): performs a full aggregated listing per call; fine for
    // occasional lookups, revisit if this lands on a hot path.
    pub async fn resolve_tool_info(&self, tool_name: &ToolName) -> Option<ToolInfo> {
        let all_tools = self.list_all_tools().await;
        all_tools
            .into_values()
            .find(|tool| tool.canonical_tool_name() == *tool_name)
    }

    /// Awaits the named server's started client, erroring for unknown
    /// servers or failed startups.
    async fn client_by_name(&self, name: &str) -> Result<ManagedClient> {
        self.clients
            .get(name)
            .ok_or_else(|| anyhow!("unknown MCP server '{name}'"))?
            .client()
            .await
            .context("failed to get client")
    }
}
/// Sends a single MCP startup progress event over the event channel,
/// tagging it with the originating submission id.
async fn emit_update(
    submit_id: &str,
    tx_event: &Sender<Event>,
    update: McpStartupUpdateEvent,
) -> Result<(), async_channel::SendError<Event>> {
    let event = Event {
        id: submit_id.to_string(),
        msg: EventMsg::McpStartupUpdate(update),
    };
    tx_event.send(event).await
}
/// Human-readable origin for a transport: the literal "stdio" for stdio
/// servers, or the ASCII origin of the HTTP server's URL (`None` when the
/// URL fails to parse).
fn transport_origin(transport: &McpServerTransportConfig) -> Option<String> {
    match transport {
        McpServerTransportConfig::Stdio { .. } => Some("stdio".to_string()),
        McpServerTransportConfig::StreamableHttp { url, .. } => Url::parse(url)
            .ok()
            .map(|parsed| parsed.origin().ascii_serialization()),
    }
}
/// Renders a user-facing message for an MCP client startup failure.
///
/// Special cases, checked in order:
/// 1. The GitHub Copilot MCP endpoint configured without a bearer token or
///    custom headers (it does not support OAuth) — point at a PAT.
/// 2. "Auth required" failures — suggest `codex mcp login`.
/// 3. Startup timeouts — suggest raising `startup_timeout_sec`.
/// Anything else falls through to a generic failure message.
fn mcp_init_error_display(
    server_name: &str,
    entry: Option<&McpAuthStatusEntry>,
    err: &StartupOutcomeError,
) -> String {
    if let Some(McpServerTransportConfig::StreamableHttp {
        url,
        bearer_token_env_var,
        http_headers,
        ..
    }) = &entry.map(|entry| &entry.config.transport)
        && url == "https://api.githubcopilot.com/mcp/"
        && bearer_token_env_var.is_none()
        && http_headers.as_ref().map(HashMap::is_empty).unwrap_or(true)
    {
        // The env var name is quoted so the suggested line is valid TOML
        // (a bare identifier on the right-hand side would not parse).
        format!(
            "GitHub MCP does not support OAuth. Log in by adding a personal access token (https://github.com/settings/personal-access-tokens) to your environment and config.toml:\n[mcp_servers.{server_name}]\nbearer_token_env_var = \"CODEX_GITHUB_PERSONAL_ACCESS_TOKEN\""
        )
    } else if is_mcp_client_auth_required_error(err) {
        format!(
            "The {server_name} MCP server is not logged in. Run `codex mcp login {server_name}`."
        )
    } else if is_mcp_client_startup_timeout_error(err) {
        // Report the effective timeout: the configured value when present,
        // otherwise the built-in default.
        let startup_timeout_secs = entry
            .and_then(|entry| entry.config.startup_timeout_sec)
            .unwrap_or(DEFAULT_STARTUP_TIMEOUT)
            .as_secs();
        format!(
            "MCP client for `{server_name}` timed out after {startup_timeout_secs} seconds. Add or adjust `startup_timeout_sec` in your config.toml:\n[mcp_servers.{server_name}]\nstartup_timeout_sec = XX"
        )
    } else {
        format!("MCP client for `{server_name}` failed to start: {err:#}")
    }
}
fn startup_outcome_error_message(error: StartupOutcomeError) -> String {
match error {
StartupOutcomeError::Cancelled => "MCP startup cancelled".to_string(),
StartupOutcomeError::Failed { error } => error,
}
}
fn is_mcp_client_auth_required_error(error: &StartupOutcomeError) -> bool {
match error {
StartupOutcomeError::Failed { error } => error.contains("Auth required"),
_ => false,
}
}
fn is_mcp_client_startup_timeout_error(error: &StartupOutcomeError) -> bool {
match error {
StartupOutcomeError::Failed { error } => {
error.contains("request timed out")
|| error.contains("timed out handshaking with MCP server")
}
_ => false,
}
}
// Unit tests live in a sibling file to keep this module focused.
#[cfg(test)]
#[path = "connection_manager_tests.rs"]
mod tests;

View File

@@ -1,12 +1,36 @@
use super::*;
use crate::codex_apps::CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION;
use crate::codex_apps::CodexAppsToolsCacheContext;
use crate::codex_apps::load_startup_cached_codex_apps_tools_snapshot;
use crate::codex_apps::read_cached_codex_apps_tools;
use crate::codex_apps::write_cached_codex_apps_tools;
use crate::declared_openai_file_input_param_names;
use crate::elicitation::ElicitationRequestManager;
use crate::elicitation::elicitation_is_rejected_by_policy;
use crate::rmcp_client::AsyncManagedClient;
use crate::rmcp_client::ManagedClient;
use crate::rmcp_client::StartupOutcomeError;
use crate::rmcp_client::elicitation_capability_for_server;
use crate::tools::ToolFilter;
use crate::tools::ToolInfo;
use crate::tools::filter_tools;
use crate::tools::qualify_tools;
use crate::tools::tool_with_model_visible_input_schema;
use codex_config::Constrained;
use codex_protocol::ToolName;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::McpAuthStatus;
use futures::FutureExt;
use pretty_assertions::assert_eq;
use rmcp::model::CreateElicitationRequestParams;
use rmcp::model::ElicitationAction;
use rmcp::model::ElicitationCapability;
use rmcp::model::FormElicitationCapability;
use rmcp::model::JsonObject;
use rmcp::model::Meta;
use rmcp::model::NumberOrString;
use rmcp::model::Tool;
use std::collections::HashSet;
use std::sync::Arc;
use tempfile::tempdir;

View File

@@ -0,0 +1,190 @@
//! MCP elicitation request tracking and policy handling.
//!
//! RMCP clients call into this module when a server asks Codex to elicit data
//! from the user. It decides whether the request can be automatically accepted,
//! must be declined by policy, or should be surfaced as a Codex protocol event
//! and later resolved through the stored responder.
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use crate::mcp::mcp_permission_prompt_is_auto_approved;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use async_channel::Sender;
use codex_protocol::approvals::ElicitationRequest;
use codex_protocol::approvals::ElicitationRequestEvent;
use codex_protocol::mcp::RequestId as ProtocolRequestId;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_rmcp_client::ElicitationResponse;
use codex_rmcp_client::SendElicitation;
use futures::future::FutureExt;
use rmcp::model::CreateElicitationRequestParams;
use rmcp::model::ElicitationAction;
use rmcp::model::RequestId;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
/// Shared, cloneable state for tracking MCP elicitation requests.
///
/// Cloning is cheap: every field is an `Arc` shared between the connection
/// manager and the per-server sender closures it hands out.
#[derive(Clone)]
pub(crate) struct ElicitationRequestManager {
    // Pending requests keyed by (server name, request id); each entry holds
    // the oneshot responder used to deliver the user's answer.
    requests: Arc<Mutex<ResponderMap>>,
    // Current approval policy, consulted per elicitation.
    pub(crate) approval_policy: Arc<StdMutex<AskForApproval>>,
    // Current permission profile, consulted for auto-approval decisions.
    pub(crate) permission_profile: Arc<StdMutex<PermissionProfile>>,
}
impl ElicitationRequestManager {
    /// Creates a tracker with no pending requests and the given initial
    /// approval policy and permission profile.
    pub(crate) fn new(
        approval_policy: AskForApproval,
        permission_profile: PermissionProfile,
    ) -> Self {
        Self {
            requests: Arc::new(Mutex::new(HashMap::new())),
            approval_policy: Arc::new(StdMutex::new(approval_policy)),
            permission_profile: Arc::new(StdMutex::new(permission_profile)),
        }
    }

    /// Delivers `response` to the pending elicitation identified by
    /// `(server_name, id)`. Errors if no such request is outstanding or the
    /// waiting side has already gone away.
    pub(crate) async fn resolve(
        &self,
        server_name: String,
        id: RequestId,
        response: ElicitationResponse,
    ) -> Result<()> {
        self.requests
            .lock()
            .await
            .remove(&(server_name, id))
            .ok_or_else(|| anyhow!("elicitation request not found"))?
            .send(response)
            .map_err(|e| anyhow!("failed to send elicitation response: {e:?}"))
    }

    /// Builds the elicitation callback handed to the RMCP client for
    /// `server_name`.
    ///
    /// For each incoming elicitation the callback either auto-accepts it
    /// (when policy allows and the request shape permits), declines it by
    /// policy, or forwards it to the UI as an `ElicitationRequest` event and
    /// waits for the matching [`Self::resolve`] call.
    pub(crate) fn make_sender(
        &self,
        server_name: String,
        tx_event: Sender<Event>,
    ) -> SendElicitation {
        let elicitation_requests = self.requests.clone();
        let approval_policy = self.approval_policy.clone();
        let permission_profile = self.permission_profile.clone();
        Box::new(move |id, elicitation| {
            let elicitation_requests = elicitation_requests.clone();
            let tx_event = tx_event.clone();
            let server_name = server_name.clone();
            let approval_policy = approval_policy.clone();
            let permission_profile = permission_profile.clone();
            async move {
                // Snapshot current policy/profile; a poisoned lock falls back
                // to `Never` / the default profile.
                let approval_policy = approval_policy
                    .lock()
                    .map(|policy| *policy)
                    .unwrap_or(AskForApproval::Never);
                let permission_profile = permission_profile
                    .lock()
                    .map(|profile| profile.clone())
                    .unwrap_or_default();
                if mcp_permission_prompt_is_auto_approved(approval_policy, &permission_profile)
                    && can_auto_accept_elicitation(&elicitation)
                {
                    return Ok(ElicitationResponse {
                        action: ElicitationAction::Accept,
                        content: Some(serde_json::json!({})),
                        meta: None,
                    });
                }
                if elicitation_is_rejected_by_policy(approval_policy) {
                    return Ok(ElicitationResponse {
                        action: ElicitationAction::Decline,
                        content: None,
                        meta: None,
                    });
                }
                // Convert the RMCP request into the protocol-level shape the
                // UI consumes.
                let request = match elicitation {
                    CreateElicitationRequestParams::FormElicitationParams {
                        meta,
                        message,
                        requested_schema,
                    } => ElicitationRequest::Form {
                        meta: meta
                            .map(serde_json::to_value)
                            .transpose()
                            .context("failed to serialize MCP elicitation metadata")?,
                        message,
                        requested_schema: serde_json::to_value(requested_schema)
                            .context("failed to serialize MCP elicitation schema")?,
                    },
                    CreateElicitationRequestParams::UrlElicitationParams {
                        meta,
                        message,
                        url,
                        elicitation_id,
                    } => ElicitationRequest::Url {
                        meta: meta
                            .map(serde_json::to_value)
                            .transpose()
                            .context("failed to serialize MCP elicitation metadata")?,
                        message,
                        url,
                        elicitation_id,
                    },
                };
                let (tx, rx) = oneshot::channel();
                // Register the responder before emitting the event so an
                // immediate `resolve` cannot race the insertion.
                {
                    let mut lock = elicitation_requests.lock().await;
                    lock.insert((server_name.clone(), id.clone()), tx);
                }
                let _ = tx_event
                    .send(Event {
                        id: "mcp_elicitation_request".to_string(),
                        msg: EventMsg::ElicitationRequest(ElicitationRequestEvent {
                            turn_id: None,
                            server_name,
                            id: match id.clone() {
                                rmcp::model::NumberOrString::String(value) => {
                                    ProtocolRequestId::String(value.to_string())
                                }
                                rmcp::model::NumberOrString::Number(value) => {
                                    ProtocolRequestId::Integer(value)
                                }
                            },
                            request,
                        }),
                    })
                    .await;
                rx.await
                    .context("elicitation request channel closed unexpectedly")
            }
            .boxed()
        })
    }
}
/// Returns true when the current approval policy forbids surfacing MCP
/// elicitation prompts, in which case requests are declined immediately.
pub(crate) fn elicitation_is_rejected_by_policy(approval_policy: AskForApproval) -> bool {
    match approval_policy {
        AskForApproval::Never => true,
        AskForApproval::Granular(granular_config) => {
            !granular_config.allows_mcp_elicitations()
        }
        AskForApproval::OnFailure | AskForApproval::OnRequest | AskForApproval::UnlessTrusted => {
            false
        }
    }
}
/// Maps `(server name, MCP request id)` to the oneshot sender that completes
/// the corresponding pending elicitation.
type ResponderMap = HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>;
/// Whether an elicitation may be accepted without asking the user.
///
/// Only form elicitations that request no fields (plain confirm/approval
/// prompts) qualify; URL elicitations always require user interaction.
fn can_auto_accept_elicitation(elicitation: &CreateElicitationRequestParams) -> bool {
    if let CreateElicitationRequestParams::FormElicitationParams {
        requested_schema, ..
    } = elicitation
    {
        requested_schema.properties.is_empty()
    } else {
        false
    }
}

View File

@@ -1,15 +1,15 @@
pub use mcp_connection_manager::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use mcp_connection_manager::McpConnectionManager;
pub use mcp_connection_manager::McpRuntimeEnvironment;
pub use mcp_connection_manager::SandboxState;
pub use mcp_connection_manager::ToolInfo;
pub use connection_manager::McpConnectionManager;
pub use rmcp_client::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use runtime::McpRuntimeEnvironment;
pub use runtime::SandboxState;
pub use tools::ToolInfo;
pub use mcp::CODEX_APPS_MCP_SERVER_NAME;
pub use mcp::McpConfig;
pub use mcp::ToolPluginProvenance;
pub use mcp_connection_manager::CodexAppsToolsCacheKey;
pub use mcp_connection_manager::codex_apps_tools_cache_key;
pub use codex_apps::CodexAppsToolsCacheKey;
pub use codex_apps::codex_apps_tools_cache_key;
pub use mcp::configured_mcp_servers;
pub use mcp::effective_mcp_servers;
@@ -33,11 +33,15 @@ pub use mcp::oauth_login_support;
pub use mcp::resolve_oauth_scopes;
pub use mcp::should_retry_without_scopes;
pub use codex_apps::filter_non_codex_apps_mcp_tools_only;
pub use mcp::mcp_permission_prompt_is_auto_approved;
pub use mcp::qualified_mcp_tool_name_prefix;
pub use mcp_connection_manager::declared_openai_file_input_param_names;
pub use mcp_connection_manager::filter_non_codex_apps_mcp_tools_only;
pub use tools::declared_openai_file_input_param_names;
pub(crate) mod codex_apps;
pub(crate) mod connection_manager;
pub(crate) mod elicitation;
pub(crate) mod mcp;
pub(crate) mod mcp_connection_manager;
pub(crate) mod mcp_tool_names;
pub(crate) mod rmcp_client;
pub(crate) mod runtime;
pub(crate) mod tools;

View File

@@ -34,9 +34,9 @@ use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use serde_json::Value;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::McpRuntimeEnvironment;
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
use crate::codex_apps::codex_apps_tools_cache_key;
use crate::connection_manager::McpConnectionManager;
use crate::runtime::McpRuntimeEnvironment;
pub const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps";
const MCP_TOOL_NAME_PREFIX: &str = "mcp";

View File

@@ -64,6 +64,7 @@ fn mcp_prompt_auto_approval_honors_unrestricted_managed_profiles() {
},
));
}
#[test]
fn tool_plugin_provenance_collects_app_and_mcp_sources() {
let provenance = ToolPluginProvenance::from_capability_summaries(&[

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,591 @@
//! RMCP client lifecycle for MCP server connections.
//!
//! This module owns startup of individual RMCP clients: building the transport,
//! initializing the server, listing raw tools, applying per-server tool filters,
//! and exposing cached startup snapshots while a client is still connecting.
//! Higher-level aggregation and resource/tool APIs live in
//! [`crate::connection_manager`].
use std::borrow::Cow;
use std::collections::HashMap;
use std::env;
use std::ffi::OsString;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use crate::codex_apps::CachedCodexAppsToolsLoad;
use crate::codex_apps::CodexAppsToolsCacheContext;
use crate::codex_apps::filter_disallowed_codex_apps_tools;
use crate::codex_apps::load_cached_codex_apps_tools;
use crate::codex_apps::load_startup_cached_codex_apps_tools_snapshot;
use crate::codex_apps::normalize_codex_apps_callable_name;
use crate::codex_apps::normalize_codex_apps_callable_namespace;
use crate::codex_apps::normalize_codex_apps_tool_title;
use crate::codex_apps::write_cached_codex_apps_tools_if_needed;
use crate::elicitation::ElicitationRequestManager;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::ToolPluginProvenance;
use crate::runtime::McpRuntimeEnvironment;
use crate::runtime::emit_duration;
use crate::tools::ToolFilter;
use crate::tools::ToolInfo;
use crate::tools::filter_tools;
use crate::tools::tool_with_model_visible_input_schema;
use anyhow::Result;
use anyhow::anyhow;
use async_channel::Sender;
use codex_api::SharedAuthProvider;
use codex_async_utils::CancelErr;
use codex_async_utils::OrCancelExt;
use codex_config::McpServerConfig;
use codex_config::McpServerTransportConfig;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_exec_server::HttpClient;
use codex_exec_server::ReqwestHttpClient;
use codex_protocol::protocol::Event;
use codex_rmcp_client::ExecutorStdioServerLauncher;
use codex_rmcp_client::LocalStdioServerLauncher;
use codex_rmcp_client::RmcpClient;
use codex_rmcp_client::StdioServerLauncher;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::future::Shared;
use rmcp::model::ClientCapabilities;
use rmcp::model::ElicitationCapability;
use rmcp::model::FormElicitationCapability;
use rmcp::model::Implementation;
use rmcp::model::InitializeRequestParams;
use rmcp::model::ProtocolVersion;
use tokio_util::sync::CancellationToken;
/// MCP server capability indicating that Codex should include [`SandboxState`]
/// in tool-call request `_meta` under this key.
pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta";
/// Metric: wall time of a tools listing served via the manager (cache or startup state).
pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
/// Metric: wall time of an uncached `tools/list` round trip to the server.
pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str =
    "codex.mcp.tools.fetch_uncached.duration_ms";
/// Startup timeout applied when the server config does not set one.
pub(crate) const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(30);
/// Per-tool-call timeout applied when the server config does not set one.
pub(crate) const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(120);
/// A fully started MCP server connection plus the metadata captured at startup.
#[derive(Clone)]
pub(crate) struct ManagedClient {
    /// Initialized RMCP client used for protocol calls.
    pub(crate) client: Arc<RmcpClient>,
    /// Tools listed at startup, already passed through `tool_filter`.
    pub(crate) tools: Vec<ToolInfo>,
    /// Per-server allow/deny tool filter derived from the config.
    pub(crate) tool_filter: ToolFilter,
    /// Timeout applied to individual tool calls.
    pub(crate) tool_timeout: Option<Duration>,
    /// Instructions from the server's initialize result, if any.
    pub(crate) server_instructions: Option<String>,
    /// Whether the server advertised the sandbox-state `_meta` capability.
    pub(crate) server_supports_sandbox_state_meta_capability: bool,
    /// Disk-cache context for Codex Apps tools; `None` for other servers.
    pub(crate) codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
impl ManagedClient {
    /// Returns the tools to advertise for this server.
    ///
    /// When a Codex Apps cache context exists, a fresh disk-cache read is
    /// attempted first; a hit is re-filtered and returned, while a miss falls
    /// back to the startup tools. A list-duration metric tagged with the
    /// cache outcome is emitted whenever a cache context is present.
    fn listed_tools(&self) -> Vec<ToolInfo> {
        let total_start = Instant::now();
        if let Some(cache_context) = self.codex_apps_tools_cache_context.as_ref()
            && let CachedCodexAppsToolsLoad::Hit(tools) =
                load_cached_codex_apps_tools(cache_context)
        {
            emit_duration(
                MCP_TOOLS_LIST_DURATION_METRIC,
                total_start.elapsed(),
                &[("cache", "hit")],
            );
            return filter_tools(tools, &self.tool_filter);
        }
        if self.codex_apps_tools_cache_context.is_some() {
            emit_duration(
                MCP_TOOLS_LIST_DURATION_METRIC,
                total_start.elapsed(),
                &[("cache", "miss")],
            );
        }
        // `self.tools` were already filtered during startup.
        self.tools.clone()
    }
}
/// Handle to an MCP server whose startup may still be in flight.
#[derive(Clone)]
pub(crate) struct AsyncManagedClient {
    /// Shared startup future; every clone awaits the same outcome.
    pub(crate) client: Shared<BoxFuture<'static, Result<ManagedClient, StartupOutcomeError>>>,
    /// Pre-filtered cached tools served while startup has not completed.
    pub(crate) startup_snapshot: Option<Vec<ToolInfo>>,
    /// Set once the startup future resolves, success or failure.
    pub(crate) startup_complete: Arc<AtomicBool>,
    /// Resolves which plugins provide a given connector or server's tools.
    pub(crate) tool_plugin_provenance: Arc<ToolPluginProvenance>,
}
impl AsyncManagedClient {
    // Keep this constructor flat so the startup inputs remain readable at the
    // single call site instead of introducing a one-off params wrapper.
    /// Kicks off startup of one MCP server and returns a shareable handle.
    ///
    /// When a cached Codex Apps tools snapshot exists it is filtered and kept
    /// so tool lists can be served while startup runs; in that case startup is
    /// also driven eagerly on a background task.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        server_name: String,
        config: McpServerConfig,
        store_mode: OAuthCredentialsStoreMode,
        cancel_token: CancellationToken,
        tx_event: Sender<Event>,
        elicitation_requests: ElicitationRequestManager,
        codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
        tool_plugin_provenance: Arc<ToolPluginProvenance>,
        runtime_environment: McpRuntimeEnvironment,
        runtime_auth_provider: Option<SharedAuthProvider>,
    ) -> Self {
        let tool_filter = ToolFilter::from_config(&config);
        let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
            &server_name,
            codex_apps_tools_cache_context.as_ref(),
        )
        .map(|tools| filter_tools(tools, &tool_filter));
        let startup_tool_filter = tool_filter;
        let startup_complete = Arc::new(AtomicBool::new(false));
        let startup_complete_for_fut = Arc::clone(&startup_complete);
        let fut = async move {
            let outcome = async {
                // Reject invalid server names before building any transport.
                if let Err(error) = validate_mcp_server_name(&server_name) {
                    return Err(error.into());
                }
                let client = Arc::new(
                    make_rmcp_client(
                        &server_name,
                        config.clone(),
                        store_mode,
                        runtime_environment,
                        runtime_auth_provider,
                    )
                    .await?,
                );
                match start_server_task(
                    server_name,
                    client,
                    StartServerTaskParams {
                        startup_timeout: config
                            .startup_timeout_sec
                            .or(Some(DEFAULT_STARTUP_TIMEOUT)),
                        tool_timeout: config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT),
                        tool_filter: startup_tool_filter,
                        tx_event,
                        elicitation_requests,
                        codex_apps_tools_cache_context,
                    },
                )
                .or_cancel(&cancel_token)
                .await
                {
                    Ok(result) => result,
                    Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled),
                }
            }
            .await;
            // Flip the flag regardless of outcome so snapshot serving stops.
            startup_complete_for_fut.store(true, Ordering::Release);
            outcome
        };
        let client = fut.boxed().shared();
        if startup_snapshot.is_some() {
            // Drive startup in the background so the live tool list replaces
            // the cached snapshot as soon as possible.
            let startup_task = client.clone();
            tokio::spawn(async move {
                let _ = startup_task.await;
            });
        }
        Self {
            client,
            startup_snapshot,
            startup_complete,
            tool_plugin_provenance,
        }
    }
    /// Awaits startup (shared across clones) and returns the managed client.
    pub(crate) async fn client(&self) -> Result<ManagedClient, StartupOutcomeError> {
        self.client.clone().await
    }
    /// Returns the cached snapshot only while startup has not finished.
    fn startup_snapshot_while_initializing(&self) -> Option<Vec<ToolInfo>> {
        if !self.startup_complete.load(Ordering::Acquire) {
            return self.startup_snapshot.clone();
        }
        None
    }
    /// Returns the tools to expose for this server, or `None` when startup
    /// failed and no snapshot is available.
    ///
    /// Annotation happens at this read boundary: Codex Apps tools get their
    /// model-visible input schema, and plugin-provided tools have the plugin
    /// names recorded and appended to their descriptions.
    pub(crate) async fn listed_tools(&self) -> Option<Vec<ToolInfo>> {
        let annotate_tools = |tools: Vec<ToolInfo>| {
            let mut tools = tools;
            for tool in &mut tools {
                if tool.server_name == CODEX_APPS_MCP_SERVER_NAME {
                    tool.tool = tool_with_model_visible_input_schema(&tool.tool);
                }
                // Prefer connector-level provenance when a connector id exists.
                let plugin_names = match tool.connector_id.as_deref() {
                    Some(connector_id) => self
                        .tool_plugin_provenance
                        .plugin_display_names_for_connector_id(connector_id),
                    None => self
                        .tool_plugin_provenance
                        .plugin_display_names_for_mcp_server_name(tool.server_name.as_str()),
                };
                tool.plugin_display_names = plugin_names.to_vec();
                if plugin_names.is_empty() {
                    continue;
                }
                let plugin_source_note = if plugin_names.len() == 1 {
                    format!("This tool is part of plugin `{}`.", plugin_names[0])
                } else {
                    format!(
                        "This tool is part of plugins {}.",
                        plugin_names
                            .iter()
                            .map(|plugin_name| format!("`{plugin_name}`"))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                };
                let description = tool
                    .tool
                    .description
                    .as_deref()
                    .map(str::trim)
                    .unwrap_or("");
                // Append the note, adding a period when the description does
                // not already end with terminal punctuation.
                let annotated_description = if description.is_empty() {
                    plugin_source_note
                } else if matches!(description.chars().last(), Some('.' | '!' | '?')) {
                    format!("{description} {plugin_source_note}")
                } else {
                    format!("{description}. {plugin_source_note}")
                };
                tool.tool.description = Some(Cow::Owned(annotated_description));
            }
            tools
        };
        // Keep cache payloads raw; plugin provenance is resolved per-session at read time.
        let tools = if let Some(startup_tools) = self.startup_snapshot_while_initializing() {
            Some(startup_tools)
        } else {
            match self.client().await {
                Ok(client) => Some(client.listed_tools()),
                Err(_) => self.startup_snapshot.clone(),
            }
        };
        tools.map(annotate_tools)
    }
}
/// Terminal result of an MCP client startup attempt. `Clone` so the shared
/// startup future can hand the same outcome to every awaiter.
#[derive(Debug, Clone, thiserror::Error)]
pub(crate) enum StartupOutcomeError {
    /// Startup was aborted via the cancellation token.
    #[error("MCP startup cancelled")]
    Cancelled,
    // We can't store the original error here because anyhow::Error doesn't implement
    // `Clone`.
    /// Startup failed; carries the stringified source error.
    #[error("MCP startup failed: {error}")]
    Failed { error: String },
}
impl From<anyhow::Error> for StartupOutcomeError {
fn from(error: anyhow::Error) -> Self {
Self::Failed {
error: error.to_string(),
}
}
}
/// Builds the elicitation capability Codex advertises during `initialize`.
///
/// Currently the same for every server: form elicitation supported, URL
/// elicitation not.
pub(crate) fn elicitation_capability_for_server(
    _server_name: &str,
) -> Option<ElicitationCapability> {
    // https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities
    // indicates this should be an empty object.
    let form = FormElicitationCapability {
        schema_validation: None,
    };
    Some(ElicitationCapability {
        form: Some(form),
        url: None,
    })
}
/// Fetches the server's tool list directly from the RMCP client, bypassing
/// any cache.
///
/// Each tool's model-visible callable name, namespace, and title are
/// normalized for Codex Apps connectors, and the server's initialize
/// `instructions` are attached to every entry. For the Codex Apps server,
/// disallowed connector tools are dropped before returning.
pub(crate) async fn list_tools_for_client_uncached(
    server_name: &str,
    client: &Arc<RmcpClient>,
    timeout: Option<Duration>,
    server_instructions: Option<&str>,
) -> Result<Vec<ToolInfo>> {
    let resp = client
        .list_tools_with_connector_ids(/*params*/ None, timeout)
        .await?;
    let tools = resp
        .tools
        .into_iter()
        .map(|tool| {
            let callable_name = normalize_codex_apps_callable_name(
                server_name,
                &tool.tool.name,
                tool.connector_id.as_deref(),
                tool.connector_name.as_deref(),
            );
            let callable_namespace = normalize_codex_apps_callable_namespace(
                server_name,
                tool.connector_name.as_deref(),
            );
            let connector_name = tool.connector_name;
            let connector_description = tool.connector_description;
            let mut tool_def = tool.tool;
            // Only rewrite the title when normalization actually changed it.
            if let Some(title) = tool_def.title.as_deref() {
                let normalized_title =
                    normalize_codex_apps_tool_title(server_name, connector_name.as_deref(), title);
                if tool_def.title.as_deref() != Some(normalized_title.as_str()) {
                    tool_def.title = Some(normalized_title);
                }
            }
            ToolInfo {
                server_name: server_name.to_owned(),
                callable_name,
                callable_namespace,
                server_instructions: server_instructions.map(str::to_string),
                tool: tool_def,
                connector_id: tool.connector_id,
                connector_name,
                plugin_display_names: Vec::new(),
                connector_description,
            }
        })
        .collect();
    if server_name == CODEX_APPS_MCP_SERVER_NAME {
        return Ok(filter_disallowed_codex_apps_tools(tools));
    }
    Ok(tools)
}
/// Resolves the bearer token for a streamable-HTTP server from the configured
/// environment variable.
///
/// Returns `Ok(None)` when no variable is configured; errors when the variable
/// is configured but missing, empty, or not valid Unicode.
fn resolve_bearer_token(
    server_name: &str,
    bearer_token_env_var: Option<&str>,
) -> Result<Option<String>> {
    let Some(env_var) = bearer_token_env_var else {
        return Ok(None);
    };
    match env::var(env_var) {
        Ok(value) if value.is_empty() => Err(anyhow!(
            "Environment variable {env_var} for MCP server '{server_name}' is empty"
        )),
        Ok(value) => Ok(Some(value)),
        Err(env::VarError::NotPresent) => Err(anyhow!(
            "Environment variable {env_var} for MCP server '{server_name}' is not set"
        )),
        Err(env::VarError::NotUnicode(_)) => Err(anyhow!(
            "Environment variable {env_var} for MCP server '{server_name}' contains invalid Unicode"
        )),
    }
}
fn validate_mcp_server_name(server_name: &str) -> Result<()> {
let re = regex_lite::Regex::new(r"^[a-zA-Z0-9_-]+$")?;
if !re.is_match(server_name) {
return Err(anyhow!(
"Invalid MCP server name '{server_name}': must match pattern {pattern}",
pattern = re.as_str()
));
}
Ok(())
}
/// Initializes a connected RMCP client and produces its [`ManagedClient`].
///
/// Performs the MCP `initialize` handshake (advertising form elicitation
/// support), records whether the server opted into sandbox-state `_meta`,
/// lists tools, writes the Codex Apps tools cache when a cache context is
/// provided, and applies the per-server tool filter.
async fn start_server_task(
    server_name: String,
    client: Arc<RmcpClient>,
    params: StartServerTaskParams,
) -> Result<ManagedClient, StartupOutcomeError> {
    let StartServerTaskParams {
        startup_timeout,
        tool_timeout,
        tool_filter,
        tx_event,
        elicitation_requests,
        codex_apps_tools_cache_context,
    } = params;
    let elicitation = elicitation_capability_for_server(&server_name);
    let params = InitializeRequestParams {
        meta: None,
        capabilities: ClientCapabilities {
            experimental: None,
            extensions: None,
            roots: None,
            sampling: None,
            elicitation,
            tasks: None,
        },
        client_info: Implementation {
            name: "codex-mcp-client".to_owned(),
            version: env!("CARGO_PKG_VERSION").to_owned(),
            title: Some("Codex".into()),
            description: None,
            icons: None,
            website_url: None,
        },
        protocol_version: ProtocolVersion::V_2025_06_18,
    };
    let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event);
    let initialize_result = client
        .initialize(params, startup_timeout, send_elicitation)
        .await
        .map_err(StartupOutcomeError::from)?;
    let server_supports_sandbox_state_meta_capability = initialize_result
        .capabilities
        .experimental
        .as_ref()
        .and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY))
        .is_some();
    // Both timers start together: the fetch metric covers only the list call,
    // while the list metric also spans the cache write below.
    let list_start = Instant::now();
    let fetch_start = Instant::now();
    let tools = list_tools_for_client_uncached(
        &server_name,
        &client,
        startup_timeout,
        initialize_result.instructions.as_deref(),
    )
    .await
    .map_err(StartupOutcomeError::from)?;
    emit_duration(
        MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
        fetch_start.elapsed(),
        &[],
    );
    write_cached_codex_apps_tools_if_needed(
        &server_name,
        codex_apps_tools_cache_context.as_ref(),
        &tools,
    );
    if server_name == CODEX_APPS_MCP_SERVER_NAME {
        emit_duration(
            MCP_TOOLS_LIST_DURATION_METRIC,
            list_start.elapsed(),
            &[("cache", "miss")],
        );
    }
    let tools = filter_tools(tools, &tool_filter);
    let managed = ManagedClient {
        client: Arc::clone(&client),
        tools,
        tool_timeout: Some(tool_timeout),
        tool_filter,
        server_instructions: initialize_result.instructions,
        server_supports_sandbox_state_meta_capability,
        codex_apps_tools_cache_context,
    };
    Ok(managed)
}
/// Inputs to [`start_server_task`] beyond the server name and client handle.
struct StartServerTaskParams {
    /// Timeout applied to `initialize` and the initial tools listing.
    startup_timeout: Option<Duration>, // TODO: cancel_token should handle this.
    /// Per-call timeout recorded on the resulting [`ManagedClient`].
    tool_timeout: Duration,
    /// Allow/deny filter applied to the listed tools.
    tool_filter: ToolFilter,
    /// Channel used to surface elicitation request events.
    tx_event: Sender<Event>,
    /// Tracker owning pending elicitation responders and policy state.
    elicitation_requests: ElicitationRequestManager,
    /// When set, enables the on-disk tools cache (used for the Codex Apps server).
    codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
/// Builds (but does not initialize) the RMCP client for one server config.
///
/// `experimental_environment` gates placement: `"remote"` requires the runtime
/// environment to be remote and routes stdio launches / HTTP requests through
/// the executor backend; `"local"` or absent runs locally; any other value is
/// rejected.
async fn make_rmcp_client(
    server_name: &str,
    config: McpServerConfig,
    store_mode: OAuthCredentialsStoreMode,
    runtime_environment: McpRuntimeEnvironment,
    runtime_auth_provider: Option<SharedAuthProvider>,
) -> Result<RmcpClient, StartupOutcomeError> {
    let McpServerConfig {
        transport,
        experimental_environment,
        ..
    } = config;
    let remote_environment = match experimental_environment.as_deref() {
        None | Some("local") => false,
        Some("remote") => {
            if !runtime_environment.environment().is_remote() {
                return Err(StartupOutcomeError::from(anyhow!(
                    "remote MCP server `{server_name}` requires a remote environment"
                )));
            }
            true
        }
        Some(environment) => {
            return Err(StartupOutcomeError::from(anyhow!(
                "unsupported experimental_environment `{environment}` for MCP server `{server_name}`"
            )));
        }
    };
    match transport {
        McpServerTransportConfig::Stdio {
            command,
            args,
            env,
            env_vars,
            cwd,
        } => {
            // Convert command/args/env to OS strings for process spawning.
            let command_os: OsString = command.into();
            let args_os: Vec<OsString> = args.into_iter().map(Into::into).collect();
            let env_os = env.map(|env| {
                env.into_iter()
                    .map(|(key, value)| (key.into(), value.into()))
                    .collect::<HashMap<_, _>>()
            });
            let launcher = if remote_environment {
                Arc::new(ExecutorStdioServerLauncher::new(
                    runtime_environment.environment().get_exec_backend(),
                    runtime_environment.fallback_cwd(),
                ))
            } else {
                Arc::new(LocalStdioServerLauncher::new(
                    runtime_environment.fallback_cwd(),
                )) as Arc<dyn StdioServerLauncher>
            };
            // `RmcpClient` always sees a launched MCP stdio server. The
            // launcher hides whether that means a local child process or an
            // executor process whose stdin/stdout bytes cross the process API.
            RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher)
                .await
                .map_err(|err| StartupOutcomeError::from(anyhow!(err)))
        }
        McpServerTransportConfig::StreamableHttp {
            url,
            http_headers,
            env_http_headers,
            bearer_token_env_var,
        } => {
            let http_client: Arc<dyn HttpClient> = if remote_environment {
                runtime_environment.environment().get_http_client()
            } else {
                Arc::new(ReqwestHttpClient)
            };
            // Fail startup early when a configured bearer token is unusable.
            let resolved_bearer_token =
                match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) {
                    Ok(token) => token,
                    Err(error) => return Err(error.into()),
                };
            RmcpClient::new_streamable_http_client(
                server_name,
                &url,
                resolved_bearer_token,
                http_headers,
                env_http_headers,
                store_mode,
                http_client,
                runtime_auth_provider,
            )
            .await
            .map_err(StartupOutcomeError::from)
        }
    }
}

View File

@@ -0,0 +1,66 @@
//! Runtime support for Model Context Protocol (MCP) servers.
//!
//! This module contains data that describes the runtime environment in which MCP
//! servers execute, plus the sandbox state payload sent to capable servers and a
//! tiny shared metrics helper. Transport startup and orchestration live in
//! [`crate::rmcp_client`] and [`crate::connection_manager`].
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use codex_exec_server::Environment;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::SandboxPolicy;
use serde::Deserialize;
use serde::Serialize;
/// Sandbox configuration snapshot serialized into tool-call `_meta` for MCP
/// servers that advertise the sandbox-state capability.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SandboxState {
    /// Permission profile in effect, if any; omitted from JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub permission_profile: Option<PermissionProfile>,
    /// Active sandbox policy.
    pub sandbox_policy: SandboxPolicy,
    /// Path to the codex Linux sandbox executable, when available.
    pub codex_linux_sandbox_exe: Option<PathBuf>,
    /// Working directory the sandbox policy applies to.
    pub sandbox_cwd: PathBuf,
    /// Whether to use the legacy Landlock path; defaults to `false` when
    /// absent during deserialization.
    #[serde(default)]
    pub use_legacy_landlock: bool,
}
/// Runtime placement information used when starting MCP server transports.
///
/// `McpConfig` describes what servers exist. This value describes where those
/// servers should run for the current caller. Keep it explicit at manager
/// construction time so status/snapshot paths and real sessions make the same
/// local-vs-remote decision. `fallback_cwd` is not a per-server override; it is
/// used when a stdio server omits `cwd` and the launcher needs a concrete
/// process working directory.
#[derive(Clone)]
pub struct McpRuntimeEnvironment {
    // Execution environment (local or remote) that MCP servers run in.
    environment: Arc<Environment>,
    // Working directory used when a stdio server config omits `cwd`.
    fallback_cwd: PathBuf,
}
impl McpRuntimeEnvironment {
pub fn new(environment: Arc<Environment>, fallback_cwd: PathBuf) -> Self {
Self {
environment,
fallback_cwd,
}
}
pub(crate) fn environment(&self) -> Arc<Environment> {
Arc::clone(&self.environment)
}
pub(crate) fn fallback_cwd(&self) -> PathBuf {
self.fallback_cwd.clone()
}
}
/// Records `duration` under `metric` with the given tags when a global OTel
/// metrics sink is installed; a no-op otherwise.
pub(crate) fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) {
    let Some(metrics) = codex_otel::global() else {
        return;
    };
    let _ = metrics.record_duration(metric, duration, tags);
}

View File

@@ -1,18 +1,134 @@
//! Allocates model-visible MCP tool names while preserving raw MCP identities.
//! MCP tool metadata, filtering, schema shaping, and name qualification.
//!
//! Raw MCP tool identities must be preserved for protocol calls, while
//! model-visible tool names must be sanitized, deduplicated, and kept within API
//! limits. This module owns that translation as well as the shared [`ToolInfo`]
//! type and helpers that adjust tool schemas before exposing them to the model.
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use codex_config::McpServerConfig;
use codex_protocol::ToolName;
use rmcp::model::Tool;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Map;
use serde_json::Value as JsonValue;
use sha1::Digest;
use sha1::Sha1;
use tracing::warn;
use crate::mcp::sanitize_responses_api_tool_name;
use crate::mcp_connection_manager::ToolInfo;
const MCP_TOOL_NAME_DELIMITER: &str = "__";
const MAX_TOOL_NAME_LENGTH: usize = 64;
const CALLABLE_NAME_HASH_LEN: usize = 12;
pub(crate) const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str =
"codex.mcp.tools.cache_write.duration_ms";
/// A tool exposed by an MCP server plus the metadata needed to present it to
/// the model and route calls back to the server.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolInfo {
    /// Raw MCP server name used for routing the tool call.
    pub server_name: String,
    /// Model-visible tool name used in Responses API tool declarations.
    #[serde(rename = "tool_name", alias = "callable_name")]
    pub callable_name: String,
    /// Model-visible namespace used for deferred tool loading.
    #[serde(rename = "tool_namespace", alias = "callable_namespace")]
    pub callable_namespace: String,
    /// Instructions from the MCP server initialize result.
    #[serde(default)]
    pub server_instructions: Option<String>,
    /// Raw MCP tool definition; `tool.name` is sent back to the MCP server.
    pub tool: Tool,
    /// Connector id, when the tool comes from an app connector.
    pub connector_id: Option<String>,
    /// Human-readable connector name, when available.
    pub connector_name: Option<String>,
    /// Display names of plugins providing this tool; filled in at read time.
    #[serde(default)]
    pub plugin_display_names: Vec<String>,
    /// Connector description, when available.
    pub connector_description: Option<String>,
}
impl ToolInfo {
    /// Canonical model-facing identity: `callable_namespace` + `callable_name`.
    pub fn canonical_tool_name(&self) -> ToolName {
        ToolName::namespaced(self.callable_namespace.clone(), self.callable_name.clone())
    }
}
/// Reads the `openai/fileParams` declaration out of a tool's `_meta` object
/// and returns the listed parameter names, dropping empty entries.
pub fn declared_openai_file_input_param_names(
    meta: Option<&Map<String, JsonValue>>,
) -> Vec<String> {
    let declared = meta.and_then(|meta| meta.get(META_OPENAI_FILE_PARAMS));
    let Some(JsonValue::Array(values)) = declared else {
        return Vec::new();
    };
    values
        .iter()
        .filter_map(JsonValue::as_str)
        .filter(|value| !value.is_empty())
        .map(str::to_string)
        .collect()
}
/// A tool is allowed to be used if both are true:
/// 1. enabled is None (no allowlist is set) or the tool is explicitly enabled.
/// 2. The tool is not explicitly disabled.
#[derive(Default, Clone)]
pub(crate) struct ToolFilter {
pub(crate) enabled: Option<HashSet<String>>,
pub(crate) disabled: HashSet<String>,
}
impl ToolFilter {
pub(crate) fn from_config(cfg: &McpServerConfig) -> Self {
let enabled = cfg
.enabled_tools
.as_ref()
.map(|tools| tools.iter().cloned().collect::<HashSet<_>>());
let disabled = cfg
.disabled_tools
.as_ref()
.map(|tools| tools.iter().cloned().collect::<HashSet<_>>())
.unwrap_or_default();
Self { enabled, disabled }
}
pub(crate) fn allows(&self, tool_name: &str) -> bool {
if let Some(enabled) = &self.enabled
&& !enabled.contains(tool_name)
{
return false;
}
!self.disabled.contains(tool_name)
}
}
/// Returns the model-visible view of a tool while preserving the raw metadata
/// used by execution. Keep cache entries raw and call this at manager return
/// boundaries.
pub(crate) fn tool_with_model_visible_input_schema(tool: &Tool) -> Tool {
    let file_params = declared_openai_file_input_param_names(tool.meta.as_deref());
    // No declared file params: the original schema is already model-visible.
    if file_params.is_empty() {
        return tool.clone();
    }
    let mut tool = tool.clone();
    // Rebuild the input schema with file-path params masked as string inputs.
    let mut input_schema = JsonValue::Object(tool.input_schema.as_ref().clone());
    mask_input_schema_for_file_path_params(&mut input_schema, &file_params);
    if let JsonValue::Object(input_schema) = input_schema {
        tool.input_schema = Arc::new(input_schema);
    }
    tool
}
/// Drops tools whose raw MCP name is rejected by `filter`, preserving order.
pub(crate) fn filter_tools(tools: Vec<ToolInfo>, filter: &ToolFilter) -> Vec<ToolInfo> {
    let mut kept = tools;
    kept.retain(|tool| filter.allows(&tool.tool.name));
    kept
}
/// Returns a qualified-name lookup for MCP tools.
///
@@ -121,6 +237,57 @@ struct CallableToolCandidate {
callable_name: String,
}
const MCP_TOOL_NAME_DELIMITER: &str = "__";
const MAX_TOOL_NAME_LENGTH: usize = 64;
const CALLABLE_NAME_HASH_LEN: usize = 12;
const META_OPENAI_FILE_PARAMS: &str = "openai/fileParams";
/// Masks each declared file-path property inside an object schema's
/// `properties` map; a no-op when the schema has no such map.
fn mask_input_schema_for_file_path_params(input_schema: &mut JsonValue, file_params: &[String]) {
    let Some(properties) = input_schema
        .as_object_mut()
        .and_then(|schema| schema.get_mut("properties"))
        .and_then(JsonValue::as_object_mut)
    else {
        return;
    };
    for field_name in file_params {
        if let Some(property_schema) = properties.get_mut(field_name) {
            mask_input_property_schema(property_schema);
        }
    }
}
/// Rewrites one property schema so the model sees a plain string (or array of
/// strings) parameter with guidance to pass an absolute local file path.
fn mask_input_property_schema(schema: &mut JsonValue) {
    let Some(object) = schema.as_object_mut() else {
        return;
    };
    // Preserve the server-provided description and append the file-path
    // guidance once (skipped if the guidance text is already present).
    let mut description = object
        .get("description")
        .and_then(JsonValue::as_str)
        .map(str::to_string)
        .unwrap_or_default();
    let guidance = "This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here.";
    if description.is_empty() {
        description = guidance.to_string();
    } else if !description.contains(guidance) {
        description = format!("{description} {guidance}");
    }
    // Treat the property as an array when it declares `type: array` or
    // carries an `items` schema.
    let is_array = object.get("type").and_then(JsonValue::as_str) == Some("array")
        || object.get("items").is_some();
    // Drop all other schema keywords; only description plus string typing remain.
    object.clear();
    object.insert("description".to_string(), JsonValue::String(description));
    if is_array {
        object.insert("type".to_string(), JsonValue::String("array".to_string()));
        object.insert("items".to_string(), serde_json::json!({ "type": "string" }));
    } else {
        object.insert("type".to_string(), JsonValue::String("string".to_string()));
    }
}
fn sha1_hex(s: &str) -> String {
let mut hasher = Sha1::new();
hasher.update(s.as_bytes());