diff --git a/codex-rs/codex-mcp/src/codex_apps.rs b/codex-rs/codex-mcp/src/codex_apps.rs new file mode 100644 index 0000000000..0a7981fb0d --- /dev/null +++ b/codex-rs/codex-mcp/src/codex_apps.rs @@ -0,0 +1,258 @@ +//! Codex Apps support for the built-in apps MCP server. +//! +//! This module owns the pieces that are unique to ChatGPT-hosted app +//! connectors: cache scoping by authenticated user, disk cache reads/writes, +//! connector allow-list filtering, and the normalization that turns app +//! connector/tool metadata into model-visible MCP callable names. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::time::Instant; + +use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; +use crate::runtime::emit_duration; +use crate::tools::MCP_TOOLS_CACHE_WRITE_DURATION_METRIC; +use crate::tools::ToolInfo; +use codex_login::CodexAuth; +use codex_utils_plugins::mcp_connector::is_connector_id_allowed; +use codex_utils_plugins::mcp_connector::sanitize_name; +use serde::Deserialize; +use serde::Serialize; +use sha1::Digest; +use sha1::Sha1; + +pub(crate) const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 2; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct CodexAppsToolsCacheKey { + pub(crate) account_id: Option, + pub(crate) chatgpt_user_id: Option, + pub(crate) is_workspace_account: bool, +} + +pub fn codex_apps_tools_cache_key(auth: Option<&CodexAuth>) -> CodexAppsToolsCacheKey { + CodexAppsToolsCacheKey { + account_id: auth.and_then(CodexAuth::get_account_id), + chatgpt_user_id: auth.and_then(CodexAuth::get_chatgpt_user_id), + is_workspace_account: auth.is_some_and(CodexAuth::is_workspace_account), + } +} + +pub fn filter_non_codex_apps_mcp_tools_only( + mcp_tools: &HashMap, +) -> HashMap { + mcp_tools + .iter() + .filter(|(_, tool)| tool.server_name != CODEX_APPS_MCP_SERVER_NAME) + .map(|(name, tool)| (name.clone(), tool.clone())) + .collect() +} + +#[derive(Clone)] +pub(crate) struct CodexAppsToolsCacheContext { + pub(crate) codex_home: PathBuf, + pub(crate) user_key: CodexAppsToolsCacheKey, +} + +impl CodexAppsToolsCacheContext { + pub(crate) fn cache_path(&self) -> PathBuf { + let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default(); + let user_key_hash = sha1_hex(&user_key_json); + self.codex_home + .join(CODEX_APPS_TOOLS_CACHE_DIR) + .join(format!("{user_key_hash}.json")) + } +} + +pub(crate) enum CachedCodexAppsToolsLoad { + Hit(Vec), + Missing, + Invalid, +} + +pub(crate) fn normalize_codex_apps_tool_title( + server_name: &str, + connector_name: Option<&str>, + value: &str, +) -> String { + if server_name != CODEX_APPS_MCP_SERVER_NAME { + return value.to_string(); + } + + let Some(connector_name) = connector_name + .map(str::trim) + .filter(|name| !name.is_empty()) + else { + return value.to_string(); + }; + + let prefix = format!("{connector_name}_"); + if let Some(stripped) = value.strip_prefix(&prefix) + && !stripped.is_empty() + { + return stripped.to_string(); + } + + value.to_string() +} + +pub(crate) fn normalize_codex_apps_callable_name( + server_name: &str, + tool_name: &str, + connector_id: Option<&str>, + connector_name: Option<&str>, +) -> String { + if server_name != CODEX_APPS_MCP_SERVER_NAME { + return tool_name.to_string(); + } + + let tool_name = sanitize_name(tool_name); + + if let Some(connector_name) = connector_name + .map(str::trim) + .map(sanitize_name) + .filter(|name| !name.is_empty()) + && let Some(stripped) = tool_name.strip_prefix(&connector_name) + && !stripped.is_empty() + { + return stripped.to_string(); + } + + if let Some(connector_id) = connector_id + .map(str::trim) + .map(sanitize_name) + .filter(|name| !name.is_empty()) + && let Some(stripped) = tool_name.strip_prefix(&connector_id) + && !stripped.is_empty() + { + return stripped.to_string(); + } + + tool_name +} + +pub(crate) fn normalize_codex_apps_callable_namespace( + server_name: &str, + connector_name: Option<&str>, +) -> String { + if server_name == CODEX_APPS_MCP_SERVER_NAME + && let Some(connector_name) = connector_name + { + format!("mcp__{}__{}", server_name, sanitize_name(connector_name)) + } else { + format!("mcp__{server_name}__") + } +} + +pub(crate) fn write_cached_codex_apps_tools_if_needed( + server_name: &str, + cache_context: Option<&CodexAppsToolsCacheContext>, + tools: &[ToolInfo], +) { + if server_name != CODEX_APPS_MCP_SERVER_NAME { + return; + } + + if let Some(cache_context) = cache_context { + let cache_write_start = Instant::now(); + write_cached_codex_apps_tools(cache_context, tools); + emit_duration( + MCP_TOOLS_CACHE_WRITE_DURATION_METRIC, + cache_write_start.elapsed(), + &[], + ); + } +} + +pub(crate) fn load_startup_cached_codex_apps_tools_snapshot( + server_name: &str, + cache_context: Option<&CodexAppsToolsCacheContext>, +) -> Option> { + if server_name != CODEX_APPS_MCP_SERVER_NAME { + return None; + } + + let cache_context = cache_context?; + + match load_cached_codex_apps_tools(cache_context) { + CachedCodexAppsToolsLoad::Hit(tools) => Some(tools), + CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None, + } +} + +#[cfg(test)] +pub(crate) fn read_cached_codex_apps_tools( + cache_context: &CodexAppsToolsCacheContext, +) -> Option> { + match load_cached_codex_apps_tools(cache_context) { + CachedCodexAppsToolsLoad::Hit(tools) => Some(tools), + CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None, + } +} + +pub(crate) fn load_cached_codex_apps_tools( + cache_context: &CodexAppsToolsCacheContext, +) -> CachedCodexAppsToolsLoad { + let cache_path = cache_context.cache_path(); + let bytes = match std::fs::read(cache_path) { + Ok(bytes) => bytes, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + return CachedCodexAppsToolsLoad::Missing; + } + Err(_) => return CachedCodexAppsToolsLoad::Invalid, + }; + let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) { + Ok(cache) => cache, + Err(_) => return CachedCodexAppsToolsLoad::Invalid, + }; + if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION { + return CachedCodexAppsToolsLoad::Invalid; + } + CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools)) +} + +pub(crate) fn write_cached_codex_apps_tools( + cache_context: &CodexAppsToolsCacheContext, + tools: &[ToolInfo], +) { + let cache_path = cache_context.cache_path(); + if let Some(parent) = cache_path.parent() + && std::fs::create_dir_all(parent).is_err() + { + return; + } + let tools = filter_disallowed_codex_apps_tools(tools.to_vec()); + let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache { + schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION, + tools, + }) else { + return; + }; + let _ = std::fs::write(cache_path, bytes); +} + +pub(crate) fn filter_disallowed_codex_apps_tools(tools: Vec) -> Vec { + tools + .into_iter() + .filter(|tool| { + tool.connector_id + .as_deref() + .is_none_or(is_connector_id_allowed) + }) + .collect() +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CodexAppsToolsDiskCache { + schema_version: u8, + tools: Vec, +} + +const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools"; + +fn sha1_hex(s: &str) -> String { + let mut hasher = Sha1::new(); + hasher.update(s.as_bytes()); + let sha1 = hasher.finalize(); + format!("{sha1:x}") +} diff --git a/codex-rs/codex-mcp/src/connection_manager.rs b/codex-rs/codex-mcp/src/connection_manager.rs new file mode 100644 index 0000000000..9bbcbe12e7 --- /dev/null +++ b/codex-rs/codex-mcp/src/connection_manager.rs @@ -0,0 +1,700 @@ +//! Aggregates MCP server connections for Codex. +//! +//! [`McpConnectionManager`] owns the set of running async RMCP clients keyed by +//! MCP server name. It coordinates startup status events, keeps server origin +//! metadata, aggregates tools/resources/templates across servers, routes tool +//! calls to the right client, and exposes the public manager API used by +//! `codex-core`. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +use crate::McpAuthStatusEntry; +use crate::codex_apps::CodexAppsToolsCacheContext; +use crate::codex_apps::CodexAppsToolsCacheKey; +use crate::codex_apps::write_cached_codex_apps_tools_if_needed; +use crate::elicitation::ElicitationRequestManager; +use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; +use crate::mcp::ToolPluginProvenance; +use crate::rmcp_client::AsyncManagedClient; +use crate::rmcp_client::DEFAULT_STARTUP_TIMEOUT; +use crate::rmcp_client::MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC; +use crate::rmcp_client::MCP_TOOLS_LIST_DURATION_METRIC; +use crate::rmcp_client::ManagedClient; +use crate::rmcp_client::StartupOutcomeError; +use crate::rmcp_client::list_tools_for_client_uncached; +use crate::runtime::McpRuntimeEnvironment; +use crate::runtime::emit_duration; +use crate::tools::ToolInfo; +use crate::tools::filter_tools; +use crate::tools::qualify_tools; +use crate::tools::tool_with_model_visible_input_schema; +use anyhow::Context; +use anyhow::Result; +use anyhow::anyhow; +use async_channel::Sender; +use codex_config::Constrained; +use codex_config::McpServerConfig; +use codex_config::McpServerTransportConfig; +use codex_config::types::OAuthCredentialsStoreMode; +use codex_login::CodexAuth; +use codex_protocol::ToolName; +use codex_protocol::mcp::CallToolResult; +use codex_protocol::models::PermissionProfile; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::McpStartupCompleteEvent; +use codex_protocol::protocol::McpStartupFailure; +use codex_protocol::protocol::McpStartupStatus; +use codex_protocol::protocol::McpStartupUpdateEvent; +use codex_rmcp_client::ElicitationResponse; +use rmcp::model::ListResourceTemplatesResult; +use rmcp::model::ListResourcesResult; +use rmcp::model::PaginatedRequestParams; +use rmcp::model::ReadResourceRequestParams; +use rmcp::model::ReadResourceResult; +use rmcp::model::RequestId; +use rmcp::model::Resource; +use rmcp::model::ResourceTemplate; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; +use tracing::instrument; +use tracing::warn; +use url::Url; + +/// A thin wrapper around a set of running [`RmcpClient`] instances. +pub struct McpConnectionManager { + clients: HashMap, + server_origins: HashMap, + elicitation_requests: ElicitationRequestManager, +} + +impl McpConnectionManager { + pub fn new_uninitialized( + approval_policy: &Constrained, + permission_profile: &Constrained, + ) -> Self { + Self { + clients: HashMap::new(), + server_origins: HashMap::new(), + elicitation_requests: ElicitationRequestManager::new( + approval_policy.value(), + permission_profile.get().clone(), + ), + } + } + + pub fn has_servers(&self) -> bool { + !self.clients.is_empty() + } + + pub fn server_origin(&self, server_name: &str) -> Option<&str> { + self.server_origins.get(server_name).map(String::as_str) + } + + pub fn set_approval_policy(&self, approval_policy: &Constrained) { + if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() { + *policy = approval_policy.value(); + } + } + + pub fn set_permission_profile(&self, permission_profile: PermissionProfile) { + if let Ok(mut profile) = self.elicitation_requests.permission_profile.lock() { + *profile = permission_profile; + } + } + + #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] + pub async fn new( + mcp_servers: &HashMap, + store_mode: OAuthCredentialsStoreMode, + auth_entries: HashMap, + approval_policy: &Constrained, + submit_id: String, + tx_event: Sender, + initial_permission_profile: PermissionProfile, + runtime_environment: McpRuntimeEnvironment, + codex_home: PathBuf, + codex_apps_tools_cache_key: CodexAppsToolsCacheKey, + tool_plugin_provenance: ToolPluginProvenance, + auth: Option<&CodexAuth>, + ) -> (Self, CancellationToken) { + let cancel_token = CancellationToken::new(); + let mut clients = HashMap::new(); + let mut server_origins = HashMap::new(); + let mut join_set = JoinSet::new(); + let elicitation_requests = + ElicitationRequestManager::new(approval_policy.value(), initial_permission_profile); + let tool_plugin_provenance = Arc::new(tool_plugin_provenance); + let startup_submit_id = submit_id.clone(); + let codex_apps_auth_provider = auth + .filter(|auth| auth.uses_codex_backend()) + .map(codex_model_provider::auth_provider_from_auth); + let mcp_servers = mcp_servers.clone(); + for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) { + if let Some(origin) = transport_origin(&cfg.transport) { + server_origins.insert(server_name.clone(), origin); + } + let cancel_token = cancel_token.child_token(); + let _ = emit_update( + startup_submit_id.as_str(), + &tx_event, + McpStartupUpdateEvent { + server: server_name.clone(), + status: McpStartupStatus::Starting, + }, + ) + .await; + let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME { + Some(CodexAppsToolsCacheContext { + codex_home: codex_home.clone(), + user_key: codex_apps_tools_cache_key.clone(), + }) + } else { + None + }; + let uses_env_bearer_token = match &cfg.transport { + McpServerTransportConfig::StreamableHttp { + bearer_token_env_var, + .. + } => bearer_token_env_var.is_some(), + McpServerTransportConfig::Stdio { .. } => false, + }; + let runtime_auth_provider = + if server_name == CODEX_APPS_MCP_SERVER_NAME && !uses_env_bearer_token { + codex_apps_auth_provider.clone() + } else { + None + }; + let async_managed_client = AsyncManagedClient::new( + server_name.clone(), + cfg, + store_mode, + cancel_token.clone(), + tx_event.clone(), + elicitation_requests.clone(), + codex_apps_tools_cache_context, + Arc::clone(&tool_plugin_provenance), + runtime_environment.clone(), + runtime_auth_provider, + ); + clients.insert(server_name.clone(), async_managed_client.clone()); + let tx_event = tx_event.clone(); + let submit_id = startup_submit_id.clone(); + let auth_entry = auth_entries.get(&server_name).cloned(); + join_set.spawn(async move { + let mut outcome = async_managed_client.client().await; + if cancel_token.is_cancelled() { + outcome = Err(StartupOutcomeError::Cancelled); + } + let status = match &outcome { + Ok(_) => McpStartupStatus::Ready, + Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled, + Err(error) => { + let error_str = mcp_init_error_display( + server_name.as_str(), + auth_entry.as_ref(), + error, + ); + McpStartupStatus::Failed { error: error_str } + } + }; + + let _ = emit_update( + submit_id.as_str(), + &tx_event, + McpStartupUpdateEvent { + server: server_name.clone(), + status, + }, + ) + .await; + + (server_name, outcome) + }); + } + let manager = Self { + clients, + server_origins, + elicitation_requests: elicitation_requests.clone(), + }; + tokio::spawn(async move { + let outcomes = join_set.join_all().await; + let mut summary = McpStartupCompleteEvent::default(); + for (server_name, outcome) in outcomes { + match outcome { + Ok(_) => summary.ready.push(server_name), + Err(StartupOutcomeError::Cancelled) => summary.cancelled.push(server_name), + Err(StartupOutcomeError::Failed { error }) => { + summary.failed.push(McpStartupFailure { + server: server_name, + error, + }) + } + } + } + let _ = tx_event + .send(Event { + id: startup_submit_id, + msg: EventMsg::McpStartupComplete(summary), + }) + .await; + }); + (manager, cancel_token) + } + + pub async fn resolve_elicitation( + &self, + server_name: String, + id: RequestId, + response: ElicitationResponse, + ) -> Result<()> { + self.elicitation_requests + .resolve(server_name, id, response) + .await + } + + pub async fn wait_for_server_ready(&self, server_name: &str, timeout: Duration) -> bool { + let Some(async_managed_client) = self.clients.get(server_name) else { + return false; + }; + + match tokio::time::timeout(timeout, async_managed_client.client()).await { + Ok(Ok(_)) => true, + Ok(Err(_)) | Err(_) => false, + } + } + + pub async fn required_startup_failures( + &self, + required_servers: &[String], + ) -> Vec { + let mut failures = Vec::new(); + for server_name in required_servers { + let Some(async_managed_client) = self.clients.get(server_name).cloned() else { + failures.push(McpStartupFailure { + server: server_name.clone(), + error: format!("required MCP server `{server_name}` was not initialized"), + }); + continue; + }; + + match async_managed_client.client().await { + Ok(_) => {} + Err(error) => failures.push(McpStartupFailure { + server: server_name.clone(), + error: startup_outcome_error_message(error), + }), + } + } + failures + } + + /// Returns a single map that contains all tools. Each key is the + /// fully-qualified name for the tool. + #[instrument(level = "trace", skip_all)] + pub async fn list_all_tools(&self) -> HashMap { + let mut tools = Vec::new(); + for managed_client in self.clients.values() { + let Some(server_tools) = managed_client.listed_tools().await else { + continue; + }; + tools.extend(server_tools); + } + qualify_tools(tools) + } + + /// Force-refresh codex apps tools by bypassing the in-process cache. + /// + /// On success, the refreshed tools replace the cache contents and the + /// latest filtered tool map is returned directly to the caller. On + /// failure, the existing cache remains unchanged. + pub async fn hard_refresh_codex_apps_tools_cache(&self) -> Result> { + let managed_client = self + .clients + .get(CODEX_APPS_MCP_SERVER_NAME) + .ok_or_else(|| anyhow!("unknown MCP server '{CODEX_APPS_MCP_SERVER_NAME}'"))? + .client() + .await + .context("failed to get client")?; + + let list_start = Instant::now(); + let fetch_start = Instant::now(); + let tools = list_tools_for_client_uncached( + CODEX_APPS_MCP_SERVER_NAME, + &managed_client.client, + managed_client.tool_timeout, + managed_client.server_instructions.as_deref(), + ) + .await + .with_context(|| { + format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'") + })?; + emit_duration( + MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC, + fetch_start.elapsed(), + &[], + ); + + write_cached_codex_apps_tools_if_needed( + CODEX_APPS_MCP_SERVER_NAME, + managed_client.codex_apps_tools_cache_context.as_ref(), + &tools, + ); + emit_duration( + MCP_TOOLS_LIST_DURATION_METRIC, + list_start.elapsed(), + &[("cache", "miss")], + ); + let tools = filter_tools(tools, &managed_client.tool_filter) + .into_iter() + .map(|mut tool| { + tool.tool = tool_with_model_visible_input_schema(&tool.tool); + tool + }); + Ok(qualify_tools(tools)) + } + + /// Returns a single map that contains all resources. Each key is the + /// server name and the value is a vector of resources. + pub async fn list_all_resources(&self) -> HashMap> { + let mut join_set = JoinSet::new(); + + let clients_snapshot = &self.clients; + + for (server_name, async_managed_client) in clients_snapshot { + let server_name = server_name.clone(); + let Ok(managed_client) = async_managed_client.client().await else { + continue; + }; + let timeout = managed_client.tool_timeout; + let client = managed_client.client.clone(); + + join_set.spawn(async move { + let mut collected: Vec = Vec::new(); + let mut cursor: Option = None; + + loop { + let params = cursor.as_ref().map(|next| PaginatedRequestParams { + meta: None, + cursor: Some(next.clone()), + }); + let response = match client.list_resources(params, timeout).await { + Ok(result) => result, + Err(err) => return (server_name, Err(err)), + }; + + collected.extend(response.resources); + + match response.next_cursor { + Some(next) => { + if cursor.as_ref() == Some(&next) { + return ( + server_name, + Err(anyhow!("resources/list returned duplicate cursor")), + ); + } + cursor = Some(next); + } + None => return (server_name, Ok(collected)), + } + } + }); + } + + let mut aggregated: HashMap> = HashMap::new(); + + while let Some(join_res) = join_set.join_next().await { + match join_res { + Ok((server_name, Ok(resources))) => { + aggregated.insert(server_name, resources); + } + Ok((server_name, Err(err))) => { + warn!("Failed to list resources for MCP server '{server_name}': {err:#}"); + } + Err(err) => { + warn!("Task panic when listing resources for MCP server: {err:#}"); + } + } + } + + aggregated + } + + /// Returns a single map that contains all resource templates. Each key is the + /// server name and the value is a vector of resource templates. + pub async fn list_all_resource_templates(&self) -> HashMap> { + let mut join_set = JoinSet::new(); + + let clients_snapshot = &self.clients; + + for (server_name, async_managed_client) in clients_snapshot { + let server_name_cloned = server_name.clone(); + let Ok(managed_client) = async_managed_client.client().await else { + continue; + }; + let client = managed_client.client.clone(); + let timeout = managed_client.tool_timeout; + + join_set.spawn(async move { + let mut collected: Vec = Vec::new(); + let mut cursor: Option = None; + + loop { + let params = cursor.as_ref().map(|next| PaginatedRequestParams { + meta: None, + cursor: Some(next.clone()), + }); + let response = match client.list_resource_templates(params, timeout).await { + Ok(result) => result, + Err(err) => return (server_name_cloned, Err(err)), + }; + + collected.extend(response.resource_templates); + + match response.next_cursor { + Some(next) => { + if cursor.as_ref() == Some(&next) { + return ( + server_name_cloned, + Err(anyhow!( + "resources/templates/list returned duplicate cursor" + )), + ); + } + cursor = Some(next); + } + None => return (server_name_cloned, Ok(collected)), + } + } + }); + } + + let mut aggregated: HashMap> = HashMap::new(); + + while let Some(join_res) = join_set.join_next().await { + match join_res { + Ok((server_name, Ok(templates))) => { + aggregated.insert(server_name, templates); + } + Ok((server_name, Err(err))) => { + warn!( + "Failed to list resource templates for MCP server '{server_name}': {err:#}" + ); + } + Err(err) => { + warn!("Task panic when listing resource templates for MCP server: {err:#}"); + } + } + } + + aggregated + } + + /// Invoke the tool indicated by the (server, tool) pair. + pub async fn call_tool( + &self, + server: &str, + tool: &str, + arguments: Option, + meta: Option, + ) -> Result { + let client = self.client_by_name(server).await?; + if !client.tool_filter.allows(tool) { + return Err(anyhow!( + "tool '{tool}' is disabled for MCP server '{server}'" + )); + } + + let result: rmcp::model::CallToolResult = client + .client + .call_tool(tool.to_string(), arguments, meta, client.tool_timeout) + .await + .with_context(|| format!("tool call failed for `{server}/{tool}`"))?; + + let content = result + .content + .into_iter() + .map(|content| { + serde_json::to_value(content) + .unwrap_or_else(|_| serde_json::Value::String("".to_string())) + }) + .collect(); + + Ok(CallToolResult { + content, + structured_content: result.structured_content, + is_error: result.is_error, + meta: result.meta.and_then(|meta| serde_json::to_value(meta).ok()), + }) + } + + pub async fn server_supports_sandbox_state_meta_capability( + &self, + server: &str, + ) -> Result { + Ok(self + .client_by_name(server) + .await? + .server_supports_sandbox_state_meta_capability) + } + + /// List resources from the specified server. + pub async fn list_resources( + &self, + server: &str, + params: Option, + ) -> Result { + let managed = self.client_by_name(server).await?; + let timeout = managed.tool_timeout; + + managed + .client + .list_resources(params, timeout) + .await + .with_context(|| format!("resources/list failed for `{server}`")) + } + + /// List resource templates from the specified server. + pub async fn list_resource_templates( + &self, + server: &str, + params: Option, + ) -> Result { + let managed = self.client_by_name(server).await?; + let client = managed.client.clone(); + let timeout = managed.tool_timeout; + + client + .list_resource_templates(params, timeout) + .await + .with_context(|| format!("resources/templates/list failed for `{server}`")) + } + + /// Read a resource from the specified server. + pub async fn read_resource( + &self, + server: &str, + params: ReadResourceRequestParams, + ) -> Result { + let managed = self.client_by_name(server).await?; + let client = managed.client.clone(); + let timeout = managed.tool_timeout; + let uri = params.uri.clone(); + + client + .read_resource(params, timeout) + .await + .with_context(|| format!("resources/read failed for `{server}` ({uri})")) + } + + pub async fn resolve_tool_info(&self, tool_name: &ToolName) -> Option { + let all_tools = self.list_all_tools().await; + all_tools + .into_values() + .find(|tool| tool.canonical_tool_name() == *tool_name) + } + + async fn client_by_name(&self, name: &str) -> Result { + self.clients + .get(name) + .ok_or_else(|| anyhow!("unknown MCP server '{name}'"))? + .client() + .await + .context("failed to get client") + } +} + +async fn emit_update( + submit_id: &str, + tx_event: &Sender, + update: McpStartupUpdateEvent, +) -> Result<(), async_channel::SendError> { + tx_event + .send(Event { + id: submit_id.to_string(), + msg: EventMsg::McpStartupUpdate(update), + }) + .await +} + +fn transport_origin(transport: &McpServerTransportConfig) -> Option { + match transport { + McpServerTransportConfig::StreamableHttp { url, .. } => { + let parsed = Url::parse(url).ok()?; + Some(parsed.origin().ascii_serialization()) + } + McpServerTransportConfig::Stdio { .. } => Some("stdio".to_string()), + } +} + +fn mcp_init_error_display( + server_name: &str, + entry: Option<&McpAuthStatusEntry>, + err: &StartupOutcomeError, +) -> String { + if let Some(McpServerTransportConfig::StreamableHttp { + url, + bearer_token_env_var, + http_headers, + .. + }) = &entry.map(|entry| &entry.config.transport) + && url == "https://api.githubcopilot.com/mcp/" + && bearer_token_env_var.is_none() + && http_headers.as_ref().map(HashMap::is_empty).unwrap_or(true) + { + format!( + "GitHub MCP does not support OAuth. Log in by adding a personal access token (https://github.com/settings/personal-access-tokens) to your environment and config.toml:\n[mcp_servers.{server_name}]\nbearer_token_env_var = CODEX_GITHUB_PERSONAL_ACCESS_TOKEN" + ) + } else if is_mcp_client_auth_required_error(err) { + format!( + "The {server_name} MCP server is not logged in. Run `codex mcp login {server_name}`." + ) + } else if is_mcp_client_startup_timeout_error(err) { + let startup_timeout_secs = match entry { + Some(entry) => match entry.config.startup_timeout_sec { + Some(timeout) => timeout, + None => DEFAULT_STARTUP_TIMEOUT, + }, + None => DEFAULT_STARTUP_TIMEOUT, + } + .as_secs(); + format!( + "MCP client for `{server_name}` timed out after {startup_timeout_secs} seconds. Add or adjust `startup_timeout_sec` in your config.toml:\n[mcp_servers.{server_name}]\nstartup_timeout_sec = XX" + ) + } else { + format!("MCP client for `{server_name}` failed to start: {err:#}") + } +} + +fn startup_outcome_error_message(error: StartupOutcomeError) -> String { + match error { + StartupOutcomeError::Cancelled => "MCP startup cancelled".to_string(), + StartupOutcomeError::Failed { error } => error, + } +} + +fn is_mcp_client_auth_required_error(error: &StartupOutcomeError) -> bool { + match error { + StartupOutcomeError::Failed { error } => error.contains("Auth required"), + _ => false, + } +} + +fn is_mcp_client_startup_timeout_error(error: &StartupOutcomeError) -> bool { + match error { + StartupOutcomeError::Failed { error } => { + error.contains("request timed out") + || error.contains("timed out handshaking with MCP server") + } + _ => false, + } +} + +#[cfg(test)] +#[path = "connection_manager_tests.rs"] +mod tests; diff --git a/codex-rs/codex-mcp/src/mcp_connection_manager_tests.rs b/codex-rs/codex-mcp/src/connection_manager_tests.rs similarity index 96% rename from codex-rs/codex-mcp/src/mcp_connection_manager_tests.rs rename to codex-rs/codex-mcp/src/connection_manager_tests.rs index 0b9c1f3b6d..02c4bcc733 100644 --- a/codex-rs/codex-mcp/src/mcp_connection_manager_tests.rs +++ b/codex-rs/codex-mcp/src/connection_manager_tests.rs @@ -1,12 +1,36 @@ use super::*; +use crate::codex_apps::CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION; +use crate::codex_apps::CodexAppsToolsCacheContext; +use crate::codex_apps::load_startup_cached_codex_apps_tools_snapshot; +use crate::codex_apps::read_cached_codex_apps_tools; +use crate::codex_apps::write_cached_codex_apps_tools; +use crate::declared_openai_file_input_param_names; +use crate::elicitation::ElicitationRequestManager; +use crate::elicitation::elicitation_is_rejected_by_policy; +use crate::rmcp_client::AsyncManagedClient; +use crate::rmcp_client::ManagedClient; +use crate::rmcp_client::StartupOutcomeError; +use crate::rmcp_client::elicitation_capability_for_server; +use crate::tools::ToolFilter; +use crate::tools::ToolInfo; +use crate::tools::filter_tools; +use crate::tools::qualify_tools; +use crate::tools::tool_with_model_visible_input_schema; +use codex_config::Constrained; use codex_protocol::ToolName; use codex_protocol::models::PermissionProfile; use codex_protocol::protocol::GranularApprovalConfig; use codex_protocol::protocol::McpAuthStatus; +use futures::FutureExt; use pretty_assertions::assert_eq; +use rmcp::model::CreateElicitationRequestParams; +use rmcp::model::ElicitationAction; +use rmcp::model::ElicitationCapability; +use rmcp::model::FormElicitationCapability; use rmcp::model::JsonObject; use rmcp::model::Meta; use rmcp::model::NumberOrString; +use rmcp::model::Tool; use std::collections::HashSet; use std::sync::Arc; use tempfile::tempdir; diff --git a/codex-rs/codex-mcp/src/elicitation.rs b/codex-rs/codex-mcp/src/elicitation.rs new file mode 100644 index 0000000000..101bda4125 --- /dev/null +++ b/codex-rs/codex-mcp/src/elicitation.rs @@ -0,0 +1,190 @@ +//! MCP elicitation request tracking and policy handling. +//! +//! RMCP clients call into this module when a server asks Codex to elicit data +//! from the user. It decides whether the request can be automatically accepted, +//! must be declined by policy, or should be surfaced as a Codex protocol event +//! and later resolved through the stored responder. + +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Mutex as StdMutex; + +use crate::mcp::mcp_permission_prompt_is_auto_approved; +use anyhow::Context; +use anyhow::Result; +use anyhow::anyhow; +use async_channel::Sender; +use codex_protocol::approvals::ElicitationRequest; +use codex_protocol::approvals::ElicitationRequestEvent; +use codex_protocol::mcp::RequestId as ProtocolRequestId; +use codex_protocol::models::PermissionProfile; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_rmcp_client::ElicitationResponse; +use codex_rmcp_client::SendElicitation; +use futures::future::FutureExt; +use rmcp::model::CreateElicitationRequestParams; +use rmcp::model::ElicitationAction; +use rmcp::model::RequestId; +use tokio::sync::Mutex; +use tokio::sync::oneshot; + +#[derive(Clone)] +pub(crate) struct ElicitationRequestManager { + requests: Arc>, + pub(crate) approval_policy: Arc>, + pub(crate) permission_profile: Arc>, +} + +impl ElicitationRequestManager { + pub(crate) fn new( + approval_policy: AskForApproval, + permission_profile: PermissionProfile, + ) -> Self { + Self { + requests: Arc::new(Mutex::new(HashMap::new())), + approval_policy: Arc::new(StdMutex::new(approval_policy)), + permission_profile: Arc::new(StdMutex::new(permission_profile)), + } + } + + pub(crate) async fn resolve( + &self, + server_name: String, + id: RequestId, + response: ElicitationResponse, + ) -> Result<()> { + self.requests + .lock() + .await + .remove(&(server_name, id)) + .ok_or_else(|| anyhow!("elicitation request not found"))? + .send(response) + .map_err(|e| anyhow!("failed to send elicitation response: {e:?}")) + } + + pub(crate) fn make_sender( + &self, + server_name: String, + tx_event: Sender, + ) -> SendElicitation { + let elicitation_requests = self.requests.clone(); + let approval_policy = self.approval_policy.clone(); + let permission_profile = self.permission_profile.clone(); + Box::new(move |id, elicitation| { + let elicitation_requests = elicitation_requests.clone(); + let tx_event = tx_event.clone(); + let server_name = server_name.clone(); + let approval_policy = approval_policy.clone(); + let permission_profile = permission_profile.clone(); + async move { + let approval_policy = approval_policy + .lock() + .map(|policy| *policy) + .unwrap_or(AskForApproval::Never); + let permission_profile = permission_profile + .lock() + .map(|profile| profile.clone()) + .unwrap_or_default(); + if mcp_permission_prompt_is_auto_approved(approval_policy, &permission_profile) + && can_auto_accept_elicitation(&elicitation) + { + return Ok(ElicitationResponse { + action: ElicitationAction::Accept, + content: Some(serde_json::json!({})), + meta: None, + }); + } + + if elicitation_is_rejected_by_policy(approval_policy) { + return Ok(ElicitationResponse { + action: ElicitationAction::Decline, + content: None, + meta: None, + }); + } + + let request = match elicitation { + CreateElicitationRequestParams::FormElicitationParams { + meta, + message, + requested_schema, + } => ElicitationRequest::Form { + meta: meta + .map(serde_json::to_value) + .transpose() + .context("failed to serialize MCP elicitation metadata")?, + message, + requested_schema: serde_json::to_value(requested_schema) + .context("failed to serialize MCP elicitation schema")?, + }, + CreateElicitationRequestParams::UrlElicitationParams { + meta, + message, + url, + elicitation_id, + } => ElicitationRequest::Url { + meta: meta + .map(serde_json::to_value) + .transpose() + .context("failed to serialize MCP elicitation metadata")?, + message, + url, + elicitation_id, + }, + }; + let (tx, rx) = oneshot::channel(); + { + let mut lock = elicitation_requests.lock().await; + lock.insert((server_name.clone(), id.clone()), tx); + } + let _ = tx_event + .send(Event { + id: "mcp_elicitation_request".to_string(), + msg: EventMsg::ElicitationRequest(ElicitationRequestEvent { + turn_id: None, + server_name, + id: match id.clone() { + rmcp::model::NumberOrString::String(value) => { + ProtocolRequestId::String(value.to_string()) + } + rmcp::model::NumberOrString::Number(value) => { + ProtocolRequestId::Integer(value) + } + }, + request, + }), + }) + .await; + rx.await + .context("elicitation request channel closed unexpectedly") + } + .boxed() + }) + } +} + +pub(crate) fn elicitation_is_rejected_by_policy(approval_policy: AskForApproval) -> bool { + match approval_policy { + AskForApproval::Never => true, + AskForApproval::OnFailure => false, + AskForApproval::OnRequest => false, + AskForApproval::UnlessTrusted => false, + AskForApproval::Granular(granular_config) => !granular_config.allows_mcp_elicitations(), + } +} + +type ResponderMap = HashMap<(String, RequestId), oneshot::Sender>; + +fn can_auto_accept_elicitation(elicitation: &CreateElicitationRequestParams) -> bool { + match elicitation { + CreateElicitationRequestParams::FormElicitationParams { + requested_schema, .. + } => { + // Auto-accept confirm/approval elicitations without schema requirements. + requested_schema.properties.is_empty() + } + CreateElicitationRequestParams::UrlElicitationParams { .. } => false, + } +} diff --git a/codex-rs/codex-mcp/src/lib.rs b/codex-rs/codex-mcp/src/lib.rs index ae73563c1e..1d3fd17619 100644 --- a/codex-rs/codex-mcp/src/lib.rs +++ b/codex-rs/codex-mcp/src/lib.rs @@ -1,15 +1,15 @@ -pub use mcp_connection_manager::MCP_SANDBOX_STATE_META_CAPABILITY; -pub use mcp_connection_manager::McpConnectionManager; -pub use mcp_connection_manager::McpRuntimeEnvironment; -pub use mcp_connection_manager::SandboxState; -pub use mcp_connection_manager::ToolInfo; +pub use connection_manager::McpConnectionManager; +pub use rmcp_client::MCP_SANDBOX_STATE_META_CAPABILITY; +pub use runtime::McpRuntimeEnvironment; +pub use runtime::SandboxState; +pub use tools::ToolInfo; pub use mcp::CODEX_APPS_MCP_SERVER_NAME; pub use mcp::McpConfig; pub use mcp::ToolPluginProvenance; -pub use mcp_connection_manager::CodexAppsToolsCacheKey; -pub use mcp_connection_manager::codex_apps_tools_cache_key; +pub use codex_apps::CodexAppsToolsCacheKey; +pub use codex_apps::codex_apps_tools_cache_key; pub use mcp::configured_mcp_servers; pub use mcp::effective_mcp_servers; @@ -33,11 +33,15 @@ pub use mcp::oauth_login_support; pub use mcp::resolve_oauth_scopes; pub use mcp::should_retry_without_scopes; +pub use codex_apps::filter_non_codex_apps_mcp_tools_only; pub use mcp::mcp_permission_prompt_is_auto_approved; pub use mcp::qualified_mcp_tool_name_prefix; -pub use mcp_connection_manager::declared_openai_file_input_param_names; -pub use mcp_connection_manager::filter_non_codex_apps_mcp_tools_only; +pub use tools::declared_openai_file_input_param_names; +pub(crate) mod codex_apps; +pub(crate) mod connection_manager; +pub(crate) mod elicitation; pub(crate) mod mcp; -pub(crate) mod mcp_connection_manager; -pub(crate) mod mcp_tool_names; +pub(crate) mod rmcp_client; +pub(crate) mod runtime; +pub(crate) mod tools; diff --git a/codex-rs/codex-mcp/src/mcp/mod.rs b/codex-rs/codex-mcp/src/mcp/mod.rs index e928621b59..080ac889de 100644 --- a/codex-rs/codex-mcp/src/mcp/mod.rs +++ b/codex-rs/codex-mcp/src/mcp/mod.rs @@ -34,9 +34,9 @@ use rmcp::model::ReadResourceRequestParams; use rmcp::model::ReadResourceResult; use serde_json::Value; -use crate::mcp_connection_manager::McpConnectionManager; -use crate::mcp_connection_manager::McpRuntimeEnvironment; -use crate::mcp_connection_manager::codex_apps_tools_cache_key; +use crate::codex_apps::codex_apps_tools_cache_key; +use crate::connection_manager::McpConnectionManager; +use crate::runtime::McpRuntimeEnvironment; pub const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps"; const MCP_TOOL_NAME_PREFIX: &str = "mcp"; diff --git a/codex-rs/codex-mcp/src/mcp/mod_tests.rs b/codex-rs/codex-mcp/src/mcp/mod_tests.rs index 885dcc8901..8c977c63ce 100644 --- a/codex-rs/codex-mcp/src/mcp/mod_tests.rs +++ b/codex-rs/codex-mcp/src/mcp/mod_tests.rs @@ -64,6 +64,7 @@ fn mcp_prompt_auto_approval_honors_unrestricted_managed_profiles() { }, )); } + #[test] fn tool_plugin_provenance_collects_app_and_mcp_sources() { let provenance = ToolPluginProvenance::from_capability_summaries(&[ diff --git a/codex-rs/codex-mcp/src/mcp_connection_manager.rs b/codex-rs/codex-mcp/src/mcp_connection_manager.rs deleted file mode 100644 index d7e345fb1a..0000000000 --- a/codex-rs/codex-mcp/src/mcp_connection_manager.rs +++ /dev/null @@ -1,1859 +0,0 @@ -//! Connection manager for Model Context Protocol (MCP) servers. -//! -//! The [`McpConnectionManager`] owns one [`codex_rmcp_client::RmcpClient`] per -//! configured server (keyed by the *server name*). It offers convenience -//! helpers to query the available tools across *all* servers and returns them -//! in a single aggregated map using the model-visible fully-qualified tool name -//! as the key. - -use std::borrow::Cow; -use std::collections::HashMap; -use std::collections::HashSet; -use std::env; -use std::ffi::OsString; -use std::path::PathBuf; -use std::sync::Arc; -use std::sync::Mutex as StdMutex; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use std::time::Duration; -use std::time::Instant; - -use crate::McpAuthStatusEntry; -use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; -use crate::mcp::ToolPluginProvenance; -use crate::mcp::mcp_permission_prompt_is_auto_approved; -pub(crate) use crate::mcp_tool_names::qualify_tools; -use anyhow::Context; -use anyhow::Result; -use anyhow::anyhow; -use async_channel::Sender; -use codex_api::SharedAuthProvider; -use codex_async_utils::CancelErr; -use codex_async_utils::OrCancelExt; -use codex_config::Constrained; -use codex_config::types::OAuthCredentialsStoreMode; -use codex_exec_server::Environment; -use codex_exec_server::HttpClient; -use codex_exec_server::ReqwestHttpClient; -use codex_protocol::ToolName; -use codex_protocol::approvals::ElicitationRequest; -use codex_protocol::approvals::ElicitationRequestEvent; -use codex_protocol::mcp::CallToolResult; -use codex_protocol::mcp::RequestId as ProtocolRequestId; -use codex_protocol::models::PermissionProfile; -use codex_protocol::protocol::AskForApproval; -use codex_protocol::protocol::Event; -use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::McpStartupCompleteEvent; -use codex_protocol::protocol::McpStartupFailure; -use codex_protocol::protocol::McpStartupStatus; -use codex_protocol::protocol::McpStartupUpdateEvent; -use codex_protocol::protocol::SandboxPolicy; -use codex_rmcp_client::ElicitationResponse; -use codex_rmcp_client::ExecutorStdioServerLauncher; -use codex_rmcp_client::LocalStdioServerLauncher; -use codex_rmcp_client::RmcpClient; -use codex_rmcp_client::SendElicitation; -use codex_rmcp_client::StdioServerLauncher; -use futures::future::BoxFuture; -use futures::future::FutureExt; -use futures::future::Shared; -use rmcp::model::ClientCapabilities; -use rmcp::model::CreateElicitationRequestParams; -use rmcp::model::ElicitationAction; -use rmcp::model::ElicitationCapability; -use rmcp::model::FormElicitationCapability; -use rmcp::model::Implementation; -use rmcp::model::InitializeRequestParams; -use rmcp::model::ListResourceTemplatesResult; -use rmcp::model::ListResourcesResult; -use rmcp::model::PaginatedRequestParams; -use rmcp::model::ProtocolVersion; -use rmcp::model::ReadResourceRequestParams; -use rmcp::model::ReadResourceResult; -use rmcp::model::RequestId; -use rmcp::model::Resource; -use rmcp::model::ResourceTemplate; -use rmcp::model::Tool; - -use serde::Deserialize; -use serde::Serialize; -use serde_json::Map; -use serde_json::Value as JsonValue; -use sha1::Digest; -use sha1::Sha1; -use tokio::sync::Mutex; -use tokio::sync::oneshot; -use tokio::task::JoinSet; -use tokio_util::sync::CancellationToken; -use tracing::instrument; -use tracing::warn; -use url::Url; - -use codex_config::McpServerConfig; -use codex_config::McpServerTransportConfig; -use codex_login::CodexAuth; -use codex_utils_plugins::mcp_connector::is_connector_id_allowed; -use codex_utils_plugins::mcp_connector::sanitize_name; - -/// Delimiter used to separate MCP tool-name parts. -const MCP_TOOL_NAME_DELIMITER: &str = "__"; - -/// Default timeout for initializing MCP server & initially listing tools. -const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(30); - -/// Default timeout for individual tool calls. -const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(120); - -const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 2; -const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools"; -const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms"; -const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str = "codex.mcp.tools.fetch_uncached.duration_ms"; -const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str = "codex.mcp.tools.cache_write.duration_ms"; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ToolInfo { - /// Raw MCP server name used for routing the tool call. - pub server_name: String, - /// Model-visible tool name used in Responses API tool declarations. - #[serde(rename = "tool_name", alias = "callable_name")] - pub callable_name: String, - /// Model-visible namespace used for deferred tool loading. - #[serde(rename = "tool_namespace", alias = "callable_namespace")] - pub callable_namespace: String, - /// Instructions from the MCP server initialize result. - #[serde(default)] - pub server_instructions: Option, - /// Raw MCP tool definition; `tool.name` is sent back to the MCP server. - pub tool: Tool, - pub connector_id: Option, - pub connector_name: Option, - #[serde(default)] - pub plugin_display_names: Vec, - pub connector_description: Option, -} - -impl ToolInfo { - pub fn canonical_tool_name(&self) -> ToolName { - ToolName::namespaced(self.callable_namespace.clone(), self.callable_name.clone()) - } -} - -pub fn declared_openai_file_input_param_names( - meta: Option<&Map>, -) -> Vec { - let Some(meta) = meta else { - return Vec::new(); - }; - - meta.get(META_OPENAI_FILE_PARAMS) - .and_then(JsonValue::as_array) - .into_iter() - .flatten() - .filter_map(JsonValue::as_str) - .filter(|value| !value.is_empty()) - .map(str::to_string) - .collect() -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct CodexAppsToolsCacheKey { - account_id: Option, - chatgpt_user_id: Option, - is_workspace_account: bool, -} - -pub fn codex_apps_tools_cache_key(auth: Option<&CodexAuth>) -> CodexAppsToolsCacheKey { - CodexAppsToolsCacheKey { - account_id: auth.and_then(CodexAuth::get_account_id), - chatgpt_user_id: auth.and_then(CodexAuth::get_chatgpt_user_id), - is_workspace_account: auth.is_some_and(CodexAuth::is_workspace_account), - } -} - -pub fn filter_non_codex_apps_mcp_tools_only( - mcp_tools: &HashMap, -) -> HashMap { - mcp_tools - .iter() - .filter(|(_, tool)| tool.server_name != CODEX_APPS_MCP_SERVER_NAME) - .map(|(name, tool)| (name.clone(), tool.clone())) - .collect() -} - -/// MCP server capability indicating that Codex should include [`SandboxState`] -/// in tool-call request `_meta` under this key. -pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta"; - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SandboxState { - #[serde(default, skip_serializing_if = "Option::is_none")] - pub permission_profile: Option, - pub sandbox_policy: SandboxPolicy, - pub codex_linux_sandbox_exe: Option, - pub sandbox_cwd: PathBuf, - #[serde(default)] - pub use_legacy_landlock: bool, -} - -/// A thin wrapper around a set of running [`RmcpClient`] instances. -pub struct McpConnectionManager { - clients: HashMap, - server_origins: HashMap, - elicitation_requests: ElicitationRequestManager, -} - -/// Runtime placement information used when starting MCP server transports. -/// -/// `McpConfig` describes what servers exist. This value describes where those -/// servers should run for the current caller. Keep it explicit at manager -/// construction time so status/snapshot paths and real sessions make the same -/// local-vs-remote decision. `fallback_cwd` is not a per-server override; it is -/// used when a stdio server omits `cwd` and the launcher needs a concrete -/// process working directory. -#[derive(Clone)] -pub struct McpRuntimeEnvironment { - environment: Arc, - fallback_cwd: PathBuf, -} - -impl McpRuntimeEnvironment { - pub fn new(environment: Arc, fallback_cwd: PathBuf) -> Self { - Self { - environment, - fallback_cwd, - } - } - - fn environment(&self) -> Arc { - Arc::clone(&self.environment) - } - - fn fallback_cwd(&self) -> PathBuf { - self.fallback_cwd.clone() - } -} - -/// A tool is allowed to be used if both are true: -/// 1. enabled is None (no allowlist is set) or the tool is explicitly enabled. -/// 2. The tool is not explicitly disabled. -#[derive(Default, Clone)] -pub(crate) struct ToolFilter { - enabled: Option>, - disabled: HashSet, -} - -impl ToolFilter { - fn from_config(cfg: &McpServerConfig) -> Self { - let enabled = cfg - .enabled_tools - .as_ref() - .map(|tools| tools.iter().cloned().collect::>()); - let disabled = cfg - .disabled_tools - .as_ref() - .map(|tools| tools.iter().cloned().collect::>()) - .unwrap_or_default(); - - Self { enabled, disabled } - } - - fn allows(&self, tool_name: &str) -> bool { - if let Some(enabled) = &self.enabled - && !enabled.contains(tool_name) - { - return false; - } - - !self.disabled.contains(tool_name) - } -} - -fn sha1_hex(s: &str) -> String { - let mut hasher = Sha1::new(); - hasher.update(s.as_bytes()); - let sha1 = hasher.finalize(); - format!("{sha1:x}") -} - -#[derive(Clone)] -struct CodexAppsToolsCacheContext { - codex_home: PathBuf, - user_key: CodexAppsToolsCacheKey, -} - -impl CodexAppsToolsCacheContext { - fn cache_path(&self) -> PathBuf { - let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default(); - let user_key_hash = sha1_hex(&user_key_json); - self.codex_home - .join(CODEX_APPS_TOOLS_CACHE_DIR) - .join(format!("{user_key_hash}.json")) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct CodexAppsToolsDiskCache { - schema_version: u8, - tools: Vec, -} - -enum CachedCodexAppsToolsLoad { - Hit(Vec), - Missing, - Invalid, -} - -type ResponderMap = HashMap<(String, RequestId), oneshot::Sender>; - -fn elicitation_is_rejected_by_policy(approval_policy: AskForApproval) -> bool { - match approval_policy { - AskForApproval::Never => true, - AskForApproval::OnFailure => false, - AskForApproval::OnRequest => false, - AskForApproval::UnlessTrusted => false, - AskForApproval::Granular(granular_config) => !granular_config.allows_mcp_elicitations(), - } -} - -fn can_auto_accept_elicitation(elicitation: &CreateElicitationRequestParams) -> bool { - match elicitation { - CreateElicitationRequestParams::FormElicitationParams { - requested_schema, .. - } => { - // Auto-accept confirm/approval elicitations without schema requirements. - requested_schema.properties.is_empty() - } - CreateElicitationRequestParams::UrlElicitationParams { .. } => false, - } -} - -#[derive(Clone)] -struct ElicitationRequestManager { - requests: Arc>, - approval_policy: Arc>, - permission_profile: Arc>, -} - -impl ElicitationRequestManager { - fn new(approval_policy: AskForApproval, permission_profile: PermissionProfile) -> Self { - Self { - requests: Arc::new(Mutex::new(HashMap::new())), - approval_policy: Arc::new(StdMutex::new(approval_policy)), - permission_profile: Arc::new(StdMutex::new(permission_profile)), - } - } - - async fn resolve( - &self, - server_name: String, - id: RequestId, - response: ElicitationResponse, - ) -> Result<()> { - self.requests - .lock() - .await - .remove(&(server_name, id)) - .ok_or_else(|| anyhow!("elicitation request not found"))? - .send(response) - .map_err(|e| anyhow!("failed to send elicitation response: {e:?}")) - } - - fn make_sender(&self, server_name: String, tx_event: Sender) -> SendElicitation { - let elicitation_requests = self.requests.clone(); - let approval_policy = self.approval_policy.clone(); - let permission_profile = self.permission_profile.clone(); - Box::new(move |id, elicitation| { - let elicitation_requests = elicitation_requests.clone(); - let tx_event = tx_event.clone(); - let server_name = server_name.clone(); - let approval_policy = approval_policy.clone(); - let permission_profile = permission_profile.clone(); - async move { - let approval_policy = approval_policy - .lock() - .map(|policy| *policy) - .unwrap_or(AskForApproval::Never); - let permission_profile = permission_profile - .lock() - .map(|profile| profile.clone()) - .unwrap_or_default(); - if mcp_permission_prompt_is_auto_approved(approval_policy, &permission_profile) - && can_auto_accept_elicitation(&elicitation) - { - return Ok(ElicitationResponse { - action: ElicitationAction::Accept, - content: Some(serde_json::json!({})), - meta: None, - }); - } - - if elicitation_is_rejected_by_policy(approval_policy) { - return Ok(ElicitationResponse { - action: ElicitationAction::Decline, - content: None, - meta: None, - }); - } - - let request = match elicitation { - CreateElicitationRequestParams::FormElicitationParams { - meta, - message, - requested_schema, - } => ElicitationRequest::Form { - meta: meta - .map(serde_json::to_value) - .transpose() - .context("failed to serialize MCP elicitation metadata")?, - message, - requested_schema: serde_json::to_value(requested_schema) - .context("failed to serialize MCP elicitation schema")?, - }, - CreateElicitationRequestParams::UrlElicitationParams { - meta, - message, - url, - elicitation_id, - } => ElicitationRequest::Url { - meta: meta - .map(serde_json::to_value) - .transpose() - .context("failed to serialize MCP elicitation metadata")?, - message, - url, - elicitation_id, - }, - }; - let (tx, rx) = oneshot::channel(); - { - let mut lock = elicitation_requests.lock().await; - lock.insert((server_name.clone(), id.clone()), tx); - } - let _ = tx_event - .send(Event { - id: "mcp_elicitation_request".to_string(), - msg: EventMsg::ElicitationRequest(ElicitationRequestEvent { - turn_id: None, - server_name, - id: match id.clone() { - rmcp::model::NumberOrString::String(value) => { - ProtocolRequestId::String(value.to_string()) - } - rmcp::model::NumberOrString::Number(value) => { - ProtocolRequestId::Integer(value) - } - }, - request, - }), - }) - .await; - rx.await - .context("elicitation request channel closed unexpectedly") - } - .boxed() - }) - } -} - -#[derive(Clone)] -struct ManagedClient { - client: Arc, - tools: Vec, - tool_filter: ToolFilter, - tool_timeout: Option, - server_instructions: Option, - server_supports_sandbox_state_meta_capability: bool, - codex_apps_tools_cache_context: Option, -} - -impl ManagedClient { - fn listed_tools(&self) -> Vec { - let total_start = Instant::now(); - if let Some(cache_context) = self.codex_apps_tools_cache_context.as_ref() - && let CachedCodexAppsToolsLoad::Hit(tools) = - load_cached_codex_apps_tools(cache_context) - { - emit_duration( - MCP_TOOLS_LIST_DURATION_METRIC, - total_start.elapsed(), - &[("cache", "hit")], - ); - return filter_tools(tools, &self.tool_filter); - } - - if self.codex_apps_tools_cache_context.is_some() { - emit_duration( - MCP_TOOLS_LIST_DURATION_METRIC, - total_start.elapsed(), - &[("cache", "miss")], - ); - } - - self.tools.clone() - } -} - -#[derive(Clone)] -struct AsyncManagedClient { - client: Shared>>, - startup_snapshot: Option>, - startup_complete: Arc, - tool_plugin_provenance: Arc, -} - -impl AsyncManagedClient { - // Keep this constructor flat so the startup inputs remain readable at the - // single call site instead of introducing a one-off params wrapper. - #[allow(clippy::too_many_arguments)] - fn new( - server_name: String, - config: McpServerConfig, - store_mode: OAuthCredentialsStoreMode, - cancel_token: CancellationToken, - tx_event: Sender, - elicitation_requests: ElicitationRequestManager, - codex_apps_tools_cache_context: Option, - tool_plugin_provenance: Arc, - runtime_environment: McpRuntimeEnvironment, - runtime_auth_provider: Option, - ) -> Self { - let tool_filter = ToolFilter::from_config(&config); - let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot( - &server_name, - codex_apps_tools_cache_context.as_ref(), - ) - .map(|tools| filter_tools(tools, &tool_filter)); - let startup_tool_filter = tool_filter; - let startup_complete = Arc::new(AtomicBool::new(false)); - let startup_complete_for_fut = Arc::clone(&startup_complete); - let fut = async move { - let outcome = async { - if let Err(error) = validate_mcp_server_name(&server_name) { - return Err(error.into()); - } - - let client = Arc::new( - make_rmcp_client( - &server_name, - config.clone(), - store_mode, - runtime_environment, - runtime_auth_provider, - ) - .await?, - ); - match start_server_task( - server_name, - client, - StartServerTaskParams { - startup_timeout: config - .startup_timeout_sec - .or(Some(DEFAULT_STARTUP_TIMEOUT)), - tool_timeout: config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT), - tool_filter: startup_tool_filter, - tx_event, - elicitation_requests, - codex_apps_tools_cache_context, - }, - ) - .or_cancel(&cancel_token) - .await - { - Ok(result) => result, - Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled), - } - } - .await; - - startup_complete_for_fut.store(true, Ordering::Release); - outcome - }; - let client = fut.boxed().shared(); - if startup_snapshot.is_some() { - let startup_task = client.clone(); - tokio::spawn(async move { - let _ = startup_task.await; - }); - } - - Self { - client, - startup_snapshot, - startup_complete, - tool_plugin_provenance, - } - } - - async fn client(&self) -> Result { - self.client.clone().await - } - - fn startup_snapshot_while_initializing(&self) -> Option> { - if !self.startup_complete.load(Ordering::Acquire) { - return self.startup_snapshot.clone(); - } - None - } - - async fn listed_tools(&self) -> Option> { - let annotate_tools = |tools: Vec| { - let mut tools = tools; - for tool in &mut tools { - if tool.server_name == CODEX_APPS_MCP_SERVER_NAME { - tool.tool = tool_with_model_visible_input_schema(&tool.tool); - } - - let plugin_names = match tool.connector_id.as_deref() { - Some(connector_id) => self - .tool_plugin_provenance - .plugin_display_names_for_connector_id(connector_id), - None => self - .tool_plugin_provenance - .plugin_display_names_for_mcp_server_name(tool.server_name.as_str()), - }; - tool.plugin_display_names = plugin_names.to_vec(); - - if plugin_names.is_empty() { - continue; - } - - let plugin_source_note = if plugin_names.len() == 1 { - format!("This tool is part of plugin `{}`.", plugin_names[0]) - } else { - format!( - "This tool is part of plugins {}.", - plugin_names - .iter() - .map(|plugin_name| format!("`{plugin_name}`")) - .collect::>() - .join(", ") - ) - }; - let description = tool - .tool - .description - .as_deref() - .map(str::trim) - .unwrap_or(""); - let annotated_description = if description.is_empty() { - plugin_source_note - } else if matches!(description.chars().last(), Some('.' | '!' | '?')) { - format!("{description} {plugin_source_note}") - } else { - format!("{description}. {plugin_source_note}") - }; - tool.tool.description = Some(Cow::Owned(annotated_description)); - } - tools - }; - - // Keep cache payloads raw; plugin provenance is resolved per-session at read time. - let tools = if let Some(startup_tools) = self.startup_snapshot_while_initializing() { - Some(startup_tools) - } else { - match self.client().await { - Ok(client) => Some(client.listed_tools()), - Err(_) => self.startup_snapshot.clone(), - } - }; - tools.map(annotate_tools) - } -} - -impl McpConnectionManager { - pub fn new_uninitialized( - approval_policy: &Constrained, - permission_profile: &Constrained, - ) -> Self { - Self { - clients: HashMap::new(), - server_origins: HashMap::new(), - elicitation_requests: ElicitationRequestManager::new( - approval_policy.value(), - permission_profile.get().clone(), - ), - } - } - - pub fn has_servers(&self) -> bool { - !self.clients.is_empty() - } - - pub fn server_origin(&self, server_name: &str) -> Option<&str> { - self.server_origins.get(server_name).map(String::as_str) - } - - pub fn set_approval_policy(&self, approval_policy: &Constrained) { - if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() { - *policy = approval_policy.value(); - } - } - - pub fn set_permission_profile(&self, permission_profile: PermissionProfile) { - if let Ok(mut profile) = self.elicitation_requests.permission_profile.lock() { - *profile = permission_profile; - } - } - - #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] - pub async fn new( - mcp_servers: &HashMap, - store_mode: OAuthCredentialsStoreMode, - auth_entries: HashMap, - approval_policy: &Constrained, - submit_id: String, - tx_event: Sender, - initial_permission_profile: PermissionProfile, - runtime_environment: McpRuntimeEnvironment, - codex_home: PathBuf, - codex_apps_tools_cache_key: CodexAppsToolsCacheKey, - tool_plugin_provenance: ToolPluginProvenance, - auth: Option<&CodexAuth>, - ) -> (Self, CancellationToken) { - let cancel_token = CancellationToken::new(); - let mut clients = HashMap::new(); - let mut server_origins = HashMap::new(); - let mut join_set = JoinSet::new(); - let elicitation_requests = - ElicitationRequestManager::new(approval_policy.value(), initial_permission_profile); - let tool_plugin_provenance = Arc::new(tool_plugin_provenance); - let startup_submit_id = submit_id.clone(); - let codex_apps_auth_provider = auth - .filter(|auth| auth.uses_codex_backend()) - .map(codex_model_provider::auth_provider_from_auth); - let mcp_servers = mcp_servers.clone(); - for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) { - if let Some(origin) = transport_origin(&cfg.transport) { - server_origins.insert(server_name.clone(), origin); - } - let cancel_token = cancel_token.child_token(); - let _ = emit_update( - startup_submit_id.as_str(), - &tx_event, - McpStartupUpdateEvent { - server: server_name.clone(), - status: McpStartupStatus::Starting, - }, - ) - .await; - let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME { - Some(CodexAppsToolsCacheContext { - codex_home: codex_home.clone(), - user_key: codex_apps_tools_cache_key.clone(), - }) - } else { - None - }; - let uses_env_bearer_token = match &cfg.transport { - McpServerTransportConfig::StreamableHttp { - bearer_token_env_var, - .. - } => bearer_token_env_var.is_some(), - McpServerTransportConfig::Stdio { .. } => false, - }; - let runtime_auth_provider = - if server_name == CODEX_APPS_MCP_SERVER_NAME && !uses_env_bearer_token { - codex_apps_auth_provider.clone() - } else { - None - }; - let async_managed_client = AsyncManagedClient::new( - server_name.clone(), - cfg, - store_mode, - cancel_token.clone(), - tx_event.clone(), - elicitation_requests.clone(), - codex_apps_tools_cache_context, - Arc::clone(&tool_plugin_provenance), - runtime_environment.clone(), - runtime_auth_provider, - ); - clients.insert(server_name.clone(), async_managed_client.clone()); - let tx_event = tx_event.clone(); - let submit_id = startup_submit_id.clone(); - let auth_entry = auth_entries.get(&server_name).cloned(); - join_set.spawn(async move { - let mut outcome = async_managed_client.client().await; - if cancel_token.is_cancelled() { - outcome = Err(StartupOutcomeError::Cancelled); - } - let status = match &outcome { - Ok(_) => McpStartupStatus::Ready, - Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled, - Err(error) => { - let error_str = mcp_init_error_display( - server_name.as_str(), - auth_entry.as_ref(), - error, - ); - McpStartupStatus::Failed { error: error_str } - } - }; - - let _ = emit_update( - submit_id.as_str(), - &tx_event, - McpStartupUpdateEvent { - server: server_name.clone(), - status, - }, - ) - .await; - - (server_name, outcome) - }); - } - let manager = Self { - clients, - server_origins, - elicitation_requests: elicitation_requests.clone(), - }; - tokio::spawn(async move { - let outcomes = join_set.join_all().await; - let mut summary = McpStartupCompleteEvent::default(); - for (server_name, outcome) in outcomes { - match outcome { - Ok(_) => summary.ready.push(server_name), - Err(StartupOutcomeError::Cancelled) => summary.cancelled.push(server_name), - Err(StartupOutcomeError::Failed { error }) => { - summary.failed.push(McpStartupFailure { - server: server_name, - error, - }) - } - } - } - let _ = tx_event - .send(Event { - id: startup_submit_id, - msg: EventMsg::McpStartupComplete(summary), - }) - .await; - }); - (manager, cancel_token) - } - - pub async fn resolve_elicitation( - &self, - server_name: String, - id: RequestId, - response: ElicitationResponse, - ) -> Result<()> { - self.elicitation_requests - .resolve(server_name, id, response) - .await - } - - pub async fn wait_for_server_ready(&self, server_name: &str, timeout: Duration) -> bool { - let Some(async_managed_client) = self.clients.get(server_name) else { - return false; - }; - - match tokio::time::timeout(timeout, async_managed_client.client()).await { - Ok(Ok(_)) => true, - Ok(Err(_)) | Err(_) => false, - } - } - - pub async fn required_startup_failures( - &self, - required_servers: &[String], - ) -> Vec { - let mut failures = Vec::new(); - for server_name in required_servers { - let Some(async_managed_client) = self.clients.get(server_name).cloned() else { - failures.push(McpStartupFailure { - server: server_name.clone(), - error: format!("required MCP server `{server_name}` was not initialized"), - }); - continue; - }; - - match async_managed_client.client().await { - Ok(_) => {} - Err(error) => failures.push(McpStartupFailure { - server: server_name.clone(), - error: startup_outcome_error_message(error), - }), - } - } - failures - } - - /// Returns a single map that contains all tools. Each key is the - /// fully-qualified name for the tool. - #[instrument(level = "trace", skip_all)] - pub async fn list_all_tools(&self) -> HashMap { - let mut tools = Vec::new(); - for managed_client in self.clients.values() { - let Some(server_tools) = managed_client.listed_tools().await else { - continue; - }; - tools.extend(server_tools); - } - qualify_tools(tools) - } - - /// Force-refresh codex apps tools by bypassing the in-process cache. - /// - /// On success, the refreshed tools replace the cache contents and the - /// latest filtered tool map is returned directly to the caller. On - /// failure, the existing cache remains unchanged. - pub async fn hard_refresh_codex_apps_tools_cache(&self) -> Result> { - let managed_client = self - .clients - .get(CODEX_APPS_MCP_SERVER_NAME) - .ok_or_else(|| anyhow!("unknown MCP server '{CODEX_APPS_MCP_SERVER_NAME}'"))? - .client() - .await - .context("failed to get client")?; - - let list_start = Instant::now(); - let fetch_start = Instant::now(); - let tools = list_tools_for_client_uncached( - CODEX_APPS_MCP_SERVER_NAME, - &managed_client.client, - managed_client.tool_timeout, - managed_client.server_instructions.as_deref(), - ) - .await - .with_context(|| { - format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'") - })?; - emit_duration( - MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC, - fetch_start.elapsed(), - &[], - ); - - write_cached_codex_apps_tools_if_needed( - CODEX_APPS_MCP_SERVER_NAME, - managed_client.codex_apps_tools_cache_context.as_ref(), - &tools, - ); - emit_duration( - MCP_TOOLS_LIST_DURATION_METRIC, - list_start.elapsed(), - &[("cache", "miss")], - ); - let tools = filter_tools(tools, &managed_client.tool_filter) - .into_iter() - .map(|mut tool| { - tool.tool = tool_with_model_visible_input_schema(&tool.tool); - tool - }); - Ok(qualify_tools(tools)) - } - - /// Returns a single map that contains all resources. Each key is the - /// server name and the value is a vector of resources. - pub async fn list_all_resources(&self) -> HashMap> { - let mut join_set = JoinSet::new(); - - let clients_snapshot = &self.clients; - - for (server_name, async_managed_client) in clients_snapshot { - let server_name = server_name.clone(); - let Ok(managed_client) = async_managed_client.client().await else { - continue; - }; - let timeout = managed_client.tool_timeout; - let client = managed_client.client.clone(); - - join_set.spawn(async move { - let mut collected: Vec = Vec::new(); - let mut cursor: Option = None; - - loop { - let params = cursor.as_ref().map(|next| PaginatedRequestParams { - meta: None, - cursor: Some(next.clone()), - }); - let response = match client.list_resources(params, timeout).await { - Ok(result) => result, - Err(err) => return (server_name, Err(err)), - }; - - collected.extend(response.resources); - - match response.next_cursor { - Some(next) => { - if cursor.as_ref() == Some(&next) { - return ( - server_name, - Err(anyhow!("resources/list returned duplicate cursor")), - ); - } - cursor = Some(next); - } - None => return (server_name, Ok(collected)), - } - } - }); - } - - let mut aggregated: HashMap> = HashMap::new(); - - while let Some(join_res) = join_set.join_next().await { - match join_res { - Ok((server_name, Ok(resources))) => { - aggregated.insert(server_name, resources); - } - Ok((server_name, Err(err))) => { - warn!("Failed to list resources for MCP server '{server_name}': {err:#}"); - } - Err(err) => { - warn!("Task panic when listing resources for MCP server: {err:#}"); - } - } - } - - aggregated - } - - /// Returns a single map that contains all resource templates. Each key is the - /// server name and the value is a vector of resource templates. - pub async fn list_all_resource_templates(&self) -> HashMap> { - let mut join_set = JoinSet::new(); - - let clients_snapshot = &self.clients; - - for (server_name, async_managed_client) in clients_snapshot { - let server_name_cloned = server_name.clone(); - let Ok(managed_client) = async_managed_client.client().await else { - continue; - }; - let client = managed_client.client.clone(); - let timeout = managed_client.tool_timeout; - - join_set.spawn(async move { - let mut collected: Vec = Vec::new(); - let mut cursor: Option = None; - - loop { - let params = cursor.as_ref().map(|next| PaginatedRequestParams { - meta: None, - cursor: Some(next.clone()), - }); - let response = match client.list_resource_templates(params, timeout).await { - Ok(result) => result, - Err(err) => return (server_name_cloned, Err(err)), - }; - - collected.extend(response.resource_templates); - - match response.next_cursor { - Some(next) => { - if cursor.as_ref() == Some(&next) { - return ( - server_name_cloned, - Err(anyhow!( - "resources/templates/list returned duplicate cursor" - )), - ); - } - cursor = Some(next); - } - None => return (server_name_cloned, Ok(collected)), - } - } - }); - } - - let mut aggregated: HashMap> = HashMap::new(); - - while let Some(join_res) = join_set.join_next().await { - match join_res { - Ok((server_name, Ok(templates))) => { - aggregated.insert(server_name, templates); - } - Ok((server_name, Err(err))) => { - warn!( - "Failed to list resource templates for MCP server '{server_name}': {err:#}" - ); - } - Err(err) => { - warn!("Task panic when listing resource templates for MCP server: {err:#}"); - } - } - } - - aggregated - } - - /// Invoke the tool indicated by the (server, tool) pair. - pub async fn call_tool( - &self, - server: &str, - tool: &str, - arguments: Option, - meta: Option, - ) -> Result { - let client = self.client_by_name(server).await?; - if !client.tool_filter.allows(tool) { - return Err(anyhow!( - "tool '{tool}' is disabled for MCP server '{server}'" - )); - } - - let result: rmcp::model::CallToolResult = client - .client - .call_tool(tool.to_string(), arguments, meta, client.tool_timeout) - .await - .with_context(|| format!("tool call failed for `{server}/{tool}`"))?; - - let content = result - .content - .into_iter() - .map(|content| { - serde_json::to_value(content) - .unwrap_or_else(|_| serde_json::Value::String("".to_string())) - }) - .collect(); - - Ok(CallToolResult { - content, - structured_content: result.structured_content, - is_error: result.is_error, - meta: result.meta.and_then(|meta| serde_json::to_value(meta).ok()), - }) - } - - pub async fn server_supports_sandbox_state_meta_capability( - &self, - server: &str, - ) -> Result { - Ok(self - .client_by_name(server) - .await? - .server_supports_sandbox_state_meta_capability) - } - - /// List resources from the specified server. - pub async fn list_resources( - &self, - server: &str, - params: Option, - ) -> Result { - let managed = self.client_by_name(server).await?; - let timeout = managed.tool_timeout; - - managed - .client - .list_resources(params, timeout) - .await - .with_context(|| format!("resources/list failed for `{server}`")) - } - - /// List resource templates from the specified server. - pub async fn list_resource_templates( - &self, - server: &str, - params: Option, - ) -> Result { - let managed = self.client_by_name(server).await?; - let client = managed.client.clone(); - let timeout = managed.tool_timeout; - - client - .list_resource_templates(params, timeout) - .await - .with_context(|| format!("resources/templates/list failed for `{server}`")) - } - - /// Read a resource from the specified server. - pub async fn read_resource( - &self, - server: &str, - params: ReadResourceRequestParams, - ) -> Result { - let managed = self.client_by_name(server).await?; - let client = managed.client.clone(); - let timeout = managed.tool_timeout; - let uri = params.uri.clone(); - - client - .read_resource(params, timeout) - .await - .with_context(|| format!("resources/read failed for `{server}` ({uri})")) - } - - pub async fn resolve_tool_info(&self, tool_name: &ToolName) -> Option { - let all_tools = self.list_all_tools().await; - all_tools - .into_values() - .find(|tool| tool.canonical_tool_name() == *tool_name) - } - - async fn client_by_name(&self, name: &str) -> Result { - self.clients - .get(name) - .ok_or_else(|| anyhow!("unknown MCP server '{name}'"))? - .client() - .await - .context("failed to get client") - } -} - -const META_OPENAI_FILE_PARAMS: &str = "openai/fileParams"; - -/// Returns the model-visible view of a tool while preserving the raw metadata -/// used by execution. Keep cache entries raw and call this at manager return -/// boundaries. -fn tool_with_model_visible_input_schema(tool: &Tool) -> Tool { - let file_params = declared_openai_file_input_param_names(tool.meta.as_deref()); - if file_params.is_empty() { - return tool.clone(); - } - - let mut tool = tool.clone(); - let mut input_schema = JsonValue::Object(tool.input_schema.as_ref().clone()); - mask_input_schema_for_file_path_params(&mut input_schema, &file_params); - if let JsonValue::Object(input_schema) = input_schema { - tool.input_schema = Arc::new(input_schema); - } - tool -} - -fn mask_input_schema_for_file_path_params(input_schema: &mut JsonValue, file_params: &[String]) { - let Some(properties) = input_schema - .as_object_mut() - .and_then(|schema| schema.get_mut("properties")) - .and_then(JsonValue::as_object_mut) - else { - return; - }; - - for field_name in file_params { - let Some(property_schema) = properties.get_mut(field_name) else { - continue; - }; - mask_input_property_schema(property_schema); - } -} - -fn mask_input_property_schema(schema: &mut JsonValue) { - let Some(object) = schema.as_object_mut() else { - return; - }; - - let mut description = object - .get("description") - .and_then(JsonValue::as_str) - .map(str::to_string) - .unwrap_or_default(); - let guidance = "This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here."; - if description.is_empty() { - description = guidance.to_string(); - } else if !description.contains(guidance) { - description = format!("{description} {guidance}"); - } - - let is_array = object.get("type").and_then(JsonValue::as_str) == Some("array") - || object.get("items").is_some(); - object.clear(); - object.insert("description".to_string(), JsonValue::String(description)); - if is_array { - object.insert("type".to_string(), JsonValue::String("array".to_string())); - object.insert("items".to_string(), serde_json::json!({ "type": "string" })); - } else { - object.insert("type".to_string(), JsonValue::String("string".to_string())); - } -} - -async fn emit_update( - submit_id: &str, - tx_event: &Sender, - update: McpStartupUpdateEvent, -) -> Result<(), async_channel::SendError> { - tx_event - .send(Event { - id: submit_id.to_string(), - msg: EventMsg::McpStartupUpdate(update), - }) - .await -} - -fn filter_tools(tools: Vec, filter: &ToolFilter) -> Vec { - tools - .into_iter() - .filter(|tool| filter.allows(&tool.tool.name)) - .collect() -} - -fn normalize_codex_apps_tool_title( - server_name: &str, - connector_name: Option<&str>, - value: &str, -) -> String { - if server_name != CODEX_APPS_MCP_SERVER_NAME { - return value.to_string(); - } - - let Some(connector_name) = connector_name - .map(str::trim) - .filter(|name| !name.is_empty()) - else { - return value.to_string(); - }; - - let prefix = format!("{connector_name}_"); - if let Some(stripped) = value.strip_prefix(&prefix) - && !stripped.is_empty() - { - return stripped.to_string(); - } - - value.to_string() -} - -fn normalize_codex_apps_callable_name( - server_name: &str, - tool_name: &str, - connector_id: Option<&str>, - connector_name: Option<&str>, -) -> String { - if server_name != CODEX_APPS_MCP_SERVER_NAME { - return tool_name.to_string(); - } - - let tool_name = sanitize_name(tool_name); - - if let Some(connector_name) = connector_name - .map(str::trim) - .map(sanitize_name) - .filter(|name| !name.is_empty()) - && let Some(stripped) = tool_name.strip_prefix(&connector_name) - && !stripped.is_empty() - { - return stripped.to_string(); - } - - if let Some(connector_id) = connector_id - .map(str::trim) - .map(sanitize_name) - .filter(|name| !name.is_empty()) - && let Some(stripped) = tool_name.strip_prefix(&connector_id) - && !stripped.is_empty() - { - return stripped.to_string(); - } - - tool_name -} - -fn normalize_codex_apps_callable_namespace( - server_name: &str, - connector_name: Option<&str>, -) -> String { - if server_name == CODEX_APPS_MCP_SERVER_NAME - && let Some(connector_name) = connector_name - { - format!( - "mcp{}{}{}{}", - MCP_TOOL_NAME_DELIMITER, - server_name, - MCP_TOOL_NAME_DELIMITER, - sanitize_name(connector_name) - ) - } else { - format!("mcp{MCP_TOOL_NAME_DELIMITER}{server_name}{MCP_TOOL_NAME_DELIMITER}") - } -} - -fn resolve_bearer_token( - server_name: &str, - bearer_token_env_var: Option<&str>, -) -> Result> { - let Some(env_var) = bearer_token_env_var else { - return Ok(None); - }; - - match env::var(env_var) { - Ok(value) => { - if value.is_empty() { - Err(anyhow!( - "Environment variable {env_var} for MCP server '{server_name}' is empty" - )) - } else { - Ok(Some(value)) - } - } - Err(env::VarError::NotPresent) => Err(anyhow!( - "Environment variable {env_var} for MCP server '{server_name}' is not set" - )), - Err(env::VarError::NotUnicode(_)) => Err(anyhow!( - "Environment variable {env_var} for MCP server '{server_name}' contains invalid Unicode" - )), - } -} - -#[derive(Debug, Clone, thiserror::Error)] -enum StartupOutcomeError { - #[error("MCP startup cancelled")] - Cancelled, - // We can't store the original error here because anyhow::Error doesn't implement - // `Clone`. - #[error("MCP startup failed: {error}")] - Failed { error: String }, -} - -impl From for StartupOutcomeError { - fn from(error: anyhow::Error) -> Self { - Self::Failed { - error: error.to_string(), - } - } -} - -fn elicitation_capability_for_server(_server_name: &str) -> Option { - // https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities - // indicates this should be an empty object. - Some(ElicitationCapability { - form: Some(FormElicitationCapability { - schema_validation: None, - }), - url: None, - }) -} - -async fn start_server_task( - server_name: String, - client: Arc, - params: StartServerTaskParams, -) -> Result { - let StartServerTaskParams { - startup_timeout, - tool_timeout, - tool_filter, - tx_event, - elicitation_requests, - codex_apps_tools_cache_context, - } = params; - let elicitation = elicitation_capability_for_server(&server_name); - let params = InitializeRequestParams { - meta: None, - capabilities: ClientCapabilities { - experimental: None, - extensions: None, - roots: None, - sampling: None, - elicitation, - tasks: None, - }, - client_info: Implementation { - name: "codex-mcp-client".to_owned(), - version: env!("CARGO_PKG_VERSION").to_owned(), - title: Some("Codex".into()), - description: None, - icons: None, - website_url: None, - }, - protocol_version: ProtocolVersion::V_2025_06_18, - }; - - let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event); - - let initialize_result = client - .initialize(params, startup_timeout, send_elicitation) - .await - .map_err(StartupOutcomeError::from)?; - - let server_supports_sandbox_state_meta_capability = initialize_result - .capabilities - .experimental - .as_ref() - .and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY)) - .is_some(); - let list_start = Instant::now(); - let fetch_start = Instant::now(); - let tools = list_tools_for_client_uncached( - &server_name, - &client, - startup_timeout, - initialize_result.instructions.as_deref(), - ) - .await - .map_err(StartupOutcomeError::from)?; - emit_duration( - MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC, - fetch_start.elapsed(), - &[], - ); - write_cached_codex_apps_tools_if_needed( - &server_name, - codex_apps_tools_cache_context.as_ref(), - &tools, - ); - if server_name == CODEX_APPS_MCP_SERVER_NAME { - emit_duration( - MCP_TOOLS_LIST_DURATION_METRIC, - list_start.elapsed(), - &[("cache", "miss")], - ); - } - let tools = filter_tools(tools, &tool_filter); - - let managed = ManagedClient { - client: Arc::clone(&client), - tools, - tool_timeout: Some(tool_timeout), - tool_filter, - server_instructions: initialize_result.instructions, - server_supports_sandbox_state_meta_capability, - codex_apps_tools_cache_context, - }; - - Ok(managed) -} - -struct StartServerTaskParams { - startup_timeout: Option, // TODO: cancel_token should handle this. - tool_timeout: Duration, - tool_filter: ToolFilter, - tx_event: Sender, - elicitation_requests: ElicitationRequestManager, - codex_apps_tools_cache_context: Option, -} - -async fn make_rmcp_client( - server_name: &str, - config: McpServerConfig, - store_mode: OAuthCredentialsStoreMode, - runtime_environment: McpRuntimeEnvironment, - runtime_auth_provider: Option, -) -> Result { - let McpServerConfig { - transport, - experimental_environment, - .. - } = config; - let remote_environment = match experimental_environment.as_deref() { - None | Some("local") => false, - Some("remote") => { - if !runtime_environment.environment().is_remote() { - return Err(StartupOutcomeError::from(anyhow!( - "remote MCP server `{server_name}` requires a remote environment" - ))); - } - true - } - Some(environment) => { - return Err(StartupOutcomeError::from(anyhow!( - "unsupported experimental_environment `{environment}` for MCP server `{server_name}`" - ))); - } - }; - - match transport { - McpServerTransportConfig::Stdio { - command, - args, - env, - env_vars, - cwd, - } => { - let command_os: OsString = command.into(); - let args_os: Vec = args.into_iter().map(Into::into).collect(); - let env_os = env.map(|env| { - env.into_iter() - .map(|(key, value)| (key.into(), value.into())) - .collect::>() - }); - let launcher = if remote_environment { - Arc::new(ExecutorStdioServerLauncher::new( - runtime_environment.environment().get_exec_backend(), - runtime_environment.fallback_cwd(), - )) - } else { - Arc::new(LocalStdioServerLauncher::new( - runtime_environment.fallback_cwd(), - )) as Arc - }; - - // `RmcpClient` always sees a launched MCP stdio server. The - // launcher hides whether that means a local child process or an - // executor process whose stdin/stdout bytes cross the process API. - RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher) - .await - .map_err(|err| StartupOutcomeError::from(anyhow!(err))) - } - McpServerTransportConfig::StreamableHttp { - url, - http_headers, - env_http_headers, - bearer_token_env_var, - } => { - let http_client: Arc = if remote_environment { - runtime_environment.environment().get_http_client() - } else { - Arc::new(ReqwestHttpClient) - }; - let resolved_bearer_token = - match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) { - Ok(token) => token, - Err(error) => return Err(error.into()), - }; - RmcpClient::new_streamable_http_client( - server_name, - &url, - resolved_bearer_token, - http_headers, - env_http_headers, - store_mode, - http_client, - runtime_auth_provider, - ) - .await - .map_err(StartupOutcomeError::from) - } - } -} - -fn write_cached_codex_apps_tools_if_needed( - server_name: &str, - cache_context: Option<&CodexAppsToolsCacheContext>, - tools: &[ToolInfo], -) { - if server_name != CODEX_APPS_MCP_SERVER_NAME { - return; - } - - if let Some(cache_context) = cache_context { - let cache_write_start = Instant::now(); - write_cached_codex_apps_tools(cache_context, tools); - emit_duration( - MCP_TOOLS_CACHE_WRITE_DURATION_METRIC, - cache_write_start.elapsed(), - &[], - ); - } -} - -fn load_startup_cached_codex_apps_tools_snapshot( - server_name: &str, - cache_context: Option<&CodexAppsToolsCacheContext>, -) -> Option> { - if server_name != CODEX_APPS_MCP_SERVER_NAME { - return None; - } - - let cache_context = cache_context?; - - match load_cached_codex_apps_tools(cache_context) { - CachedCodexAppsToolsLoad::Hit(tools) => Some(tools), - CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None, - } -} - -#[cfg(test)] -fn read_cached_codex_apps_tools( - cache_context: &CodexAppsToolsCacheContext, -) -> Option> { - match load_cached_codex_apps_tools(cache_context) { - CachedCodexAppsToolsLoad::Hit(tools) => Some(tools), - CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None, - } -} - -fn load_cached_codex_apps_tools( - cache_context: &CodexAppsToolsCacheContext, -) -> CachedCodexAppsToolsLoad { - let cache_path = cache_context.cache_path(); - let bytes = match std::fs::read(cache_path) { - Ok(bytes) => bytes, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - return CachedCodexAppsToolsLoad::Missing; - } - Err(_) => return CachedCodexAppsToolsLoad::Invalid, - }; - let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) { - Ok(cache) => cache, - Err(_) => return CachedCodexAppsToolsLoad::Invalid, - }; - if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION { - return CachedCodexAppsToolsLoad::Invalid; - } - CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools)) -} - -fn write_cached_codex_apps_tools(cache_context: &CodexAppsToolsCacheContext, tools: &[ToolInfo]) { - let cache_path = cache_context.cache_path(); - if let Some(parent) = cache_path.parent() - && std::fs::create_dir_all(parent).is_err() - { - return; - } - let tools = filter_disallowed_codex_apps_tools(tools.to_vec()); - let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache { - schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION, - tools, - }) else { - return; - }; - let _ = std::fs::write(cache_path, bytes); -} - -fn filter_disallowed_codex_apps_tools(tools: Vec) -> Vec { - tools - .into_iter() - .filter(|tool| { - tool.connector_id - .as_deref() - .is_none_or(is_connector_id_allowed) - }) - .collect() -} - -fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) { - if let Some(metrics) = codex_otel::global() { - let _ = metrics.record_duration(metric, duration, tags); - } -} - -fn transport_origin(transport: &McpServerTransportConfig) -> Option { - match transport { - McpServerTransportConfig::StreamableHttp { url, .. } => { - let parsed = Url::parse(url).ok()?; - Some(parsed.origin().ascii_serialization()) - } - McpServerTransportConfig::Stdio { .. } => Some("stdio".to_string()), - } -} - -async fn list_tools_for_client_uncached( - server_name: &str, - client: &Arc, - timeout: Option, - server_instructions: Option<&str>, -) -> Result> { - let resp = client - .list_tools_with_connector_ids(/*params*/ None, timeout) - .await?; - let tools = resp - .tools - .into_iter() - .map(|tool| { - let callable_name = normalize_codex_apps_callable_name( - server_name, - &tool.tool.name, - tool.connector_id.as_deref(), - tool.connector_name.as_deref(), - ); - let callable_namespace = normalize_codex_apps_callable_namespace( - server_name, - tool.connector_name.as_deref(), - ); - let connector_name = tool.connector_name; - let connector_description = tool.connector_description; - let mut tool_def = tool.tool; - if let Some(title) = tool_def.title.as_deref() { - let normalized_title = - normalize_codex_apps_tool_title(server_name, connector_name.as_deref(), title); - if tool_def.title.as_deref() != Some(normalized_title.as_str()) { - tool_def.title = Some(normalized_title); - } - } - ToolInfo { - server_name: server_name.to_owned(), - callable_name, - callable_namespace, - server_instructions: server_instructions.map(str::to_string), - tool: tool_def, - connector_id: tool.connector_id, - connector_name, - plugin_display_names: Vec::new(), - connector_description, - } - }) - .collect(); - if server_name == CODEX_APPS_MCP_SERVER_NAME { - return Ok(filter_disallowed_codex_apps_tools(tools)); - } - Ok(tools) -} - -fn validate_mcp_server_name(server_name: &str) -> Result<()> { - let re = regex_lite::Regex::new(r"^[a-zA-Z0-9_-]+$")?; - if !re.is_match(server_name) { - return Err(anyhow!( - "Invalid MCP server name '{server_name}': must match pattern {pattern}", - pattern = re.as_str() - )); - } - Ok(()) -} - -fn mcp_init_error_display( - server_name: &str, - entry: Option<&McpAuthStatusEntry>, - err: &StartupOutcomeError, -) -> String { - if let Some(McpServerTransportConfig::StreamableHttp { - url, - bearer_token_env_var, - http_headers, - .. - }) = &entry.map(|entry| &entry.config.transport) - && url == "https://api.githubcopilot.com/mcp/" - && bearer_token_env_var.is_none() - && http_headers.as_ref().map(HashMap::is_empty).unwrap_or(true) - { - format!( - "GitHub MCP does not support OAuth. Log in by adding a personal access token (https://github.com/settings/personal-access-tokens) to your environment and config.toml:\n[mcp_servers.{server_name}]\nbearer_token_env_var = CODEX_GITHUB_PERSONAL_ACCESS_TOKEN" - ) - } else if is_mcp_client_auth_required_error(err) { - format!( - "The {server_name} MCP server is not logged in. Run `codex mcp login {server_name}`." - ) - } else if is_mcp_client_startup_timeout_error(err) { - let startup_timeout_secs = match entry { - Some(entry) => match entry.config.startup_timeout_sec { - Some(timeout) => timeout, - None => DEFAULT_STARTUP_TIMEOUT, - }, - None => DEFAULT_STARTUP_TIMEOUT, - } - .as_secs(); - format!( - "MCP client for `{server_name}` timed out after {startup_timeout_secs} seconds. Add or adjust `startup_timeout_sec` in your config.toml:\n[mcp_servers.{server_name}]\nstartup_timeout_sec = XX" - ) - } else { - format!("MCP client for `{server_name}` failed to start: {err:#}") - } -} - -fn is_mcp_client_auth_required_error(error: &StartupOutcomeError) -> bool { - match error { - StartupOutcomeError::Failed { error } => error.contains("Auth required"), - _ => false, - } -} - -fn is_mcp_client_startup_timeout_error(error: &StartupOutcomeError) -> bool { - match error { - StartupOutcomeError::Failed { error } => { - error.contains("request timed out") - || error.contains("timed out handshaking with MCP server") - } - _ => false, - } -} - -fn startup_outcome_error_message(error: StartupOutcomeError) -> String { - match error { - StartupOutcomeError::Cancelled => "MCP startup cancelled".to_string(), - StartupOutcomeError::Failed { error } => error, - } -} - -#[cfg(test)] -mod mcp_init_error_display_tests {} - -#[cfg(test)] -#[path = "mcp_connection_manager_tests.rs"] -mod tests; diff --git a/codex-rs/codex-mcp/src/rmcp_client.rs b/codex-rs/codex-mcp/src/rmcp_client.rs new file mode 100644 index 0000000000..074e57c88c --- /dev/null +++ b/codex-rs/codex-mcp/src/rmcp_client.rs @@ -0,0 +1,591 @@ +//! RMCP client lifecycle for MCP server connections. +//! +//! This module owns startup of individual RMCP clients: building the transport, +//! initializing the server, listing raw tools, applying per-server tool filters, +//! and exposing cached startup snapshots while a client is still connecting. +//! Higher-level aggregation and resource/tool APIs live in +//! [`crate::connection_manager`]. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::env; +use std::ffi::OsString; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::time::Duration; +use std::time::Instant; + +use crate::codex_apps::CachedCodexAppsToolsLoad; +use crate::codex_apps::CodexAppsToolsCacheContext; +use crate::codex_apps::filter_disallowed_codex_apps_tools; +use crate::codex_apps::load_cached_codex_apps_tools; +use crate::codex_apps::load_startup_cached_codex_apps_tools_snapshot; +use crate::codex_apps::normalize_codex_apps_callable_name; +use crate::codex_apps::normalize_codex_apps_callable_namespace; +use crate::codex_apps::normalize_codex_apps_tool_title; +use crate::codex_apps::write_cached_codex_apps_tools_if_needed; +use crate::elicitation::ElicitationRequestManager; +use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; +use crate::mcp::ToolPluginProvenance; +use crate::runtime::McpRuntimeEnvironment; +use crate::runtime::emit_duration; +use crate::tools::ToolFilter; +use crate::tools::ToolInfo; +use crate::tools::filter_tools; +use crate::tools::tool_with_model_visible_input_schema; +use anyhow::Result; +use anyhow::anyhow; +use async_channel::Sender; +use codex_api::SharedAuthProvider; +use codex_async_utils::CancelErr; +use codex_async_utils::OrCancelExt; +use codex_config::McpServerConfig; +use codex_config::McpServerTransportConfig; +use codex_config::types::OAuthCredentialsStoreMode; +use codex_exec_server::HttpClient; +use codex_exec_server::ReqwestHttpClient; +use codex_protocol::protocol::Event; +use codex_rmcp_client::ExecutorStdioServerLauncher; +use codex_rmcp_client::LocalStdioServerLauncher; +use codex_rmcp_client::RmcpClient; +use codex_rmcp_client::StdioServerLauncher; +use futures::future::BoxFuture; +use futures::future::FutureExt; +use futures::future::Shared; +use rmcp::model::ClientCapabilities; +use rmcp::model::ElicitationCapability; +use rmcp::model::FormElicitationCapability; +use rmcp::model::Implementation; +use rmcp::model::InitializeRequestParams; +use rmcp::model::ProtocolVersion; +use tokio_util::sync::CancellationToken; + +/// MCP server capability indicating that Codex should include [`SandboxState`] +/// in tool-call request `_meta` under this key. +pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta"; + +pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms"; +pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str = + "codex.mcp.tools.fetch_uncached.duration_ms"; +pub(crate) const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(30); +pub(crate) const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(120); + +#[derive(Clone)] +pub(crate) struct ManagedClient { + pub(crate) client: Arc, + pub(crate) tools: Vec, + pub(crate) tool_filter: ToolFilter, + pub(crate) tool_timeout: Option, + pub(crate) server_instructions: Option, + pub(crate) server_supports_sandbox_state_meta_capability: bool, + pub(crate) codex_apps_tools_cache_context: Option, +} + +impl ManagedClient { + fn listed_tools(&self) -> Vec { + let total_start = Instant::now(); + if let Some(cache_context) = self.codex_apps_tools_cache_context.as_ref() + && let CachedCodexAppsToolsLoad::Hit(tools) = + load_cached_codex_apps_tools(cache_context) + { + emit_duration( + MCP_TOOLS_LIST_DURATION_METRIC, + total_start.elapsed(), + &[("cache", "hit")], + ); + return filter_tools(tools, &self.tool_filter); + } + + if self.codex_apps_tools_cache_context.is_some() { + emit_duration( + MCP_TOOLS_LIST_DURATION_METRIC, + total_start.elapsed(), + &[("cache", "miss")], + ); + } + + self.tools.clone() + } +} + +#[derive(Clone)] +pub(crate) struct AsyncManagedClient { + pub(crate) client: Shared>>, + pub(crate) startup_snapshot: Option>, + pub(crate) startup_complete: Arc, + pub(crate) tool_plugin_provenance: Arc, +} + +impl AsyncManagedClient { + // Keep this constructor flat so the startup inputs remain readable at the + // single call site instead of introducing a one-off params wrapper. + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + server_name: String, + config: McpServerConfig, + store_mode: OAuthCredentialsStoreMode, + cancel_token: CancellationToken, + tx_event: Sender, + elicitation_requests: ElicitationRequestManager, + codex_apps_tools_cache_context: Option, + tool_plugin_provenance: Arc, + runtime_environment: McpRuntimeEnvironment, + runtime_auth_provider: Option, + ) -> Self { + let tool_filter = ToolFilter::from_config(&config); + let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot( + &server_name, + codex_apps_tools_cache_context.as_ref(), + ) + .map(|tools| filter_tools(tools, &tool_filter)); + let startup_tool_filter = tool_filter; + let startup_complete = Arc::new(AtomicBool::new(false)); + let startup_complete_for_fut = Arc::clone(&startup_complete); + let fut = async move { + let outcome = async { + if let Err(error) = validate_mcp_server_name(&server_name) { + return Err(error.into()); + } + + let client = Arc::new( + make_rmcp_client( + &server_name, + config.clone(), + store_mode, + runtime_environment, + runtime_auth_provider, + ) + .await?, + ); + match start_server_task( + server_name, + client, + StartServerTaskParams { + startup_timeout: config + .startup_timeout_sec + .or(Some(DEFAULT_STARTUP_TIMEOUT)), + tool_timeout: config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT), + tool_filter: startup_tool_filter, + tx_event, + elicitation_requests, + codex_apps_tools_cache_context, + }, + ) + .or_cancel(&cancel_token) + .await + { + Ok(result) => result, + Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled), + } + } + .await; + + startup_complete_for_fut.store(true, Ordering::Release); + outcome + }; + let client = fut.boxed().shared(); + if startup_snapshot.is_some() { + let startup_task = client.clone(); + tokio::spawn(async move { + let _ = startup_task.await; + }); + } + + Self { + client, + startup_snapshot, + startup_complete, + tool_plugin_provenance, + } + } + + pub(crate) async fn client(&self) -> Result { + self.client.clone().await + } + + fn startup_snapshot_while_initializing(&self) -> Option> { + if !self.startup_complete.load(Ordering::Acquire) { + return self.startup_snapshot.clone(); + } + None + } + + pub(crate) async fn listed_tools(&self) -> Option> { + let annotate_tools = |tools: Vec| { + let mut tools = tools; + for tool in &mut tools { + if tool.server_name == CODEX_APPS_MCP_SERVER_NAME { + tool.tool = tool_with_model_visible_input_schema(&tool.tool); + } + + let plugin_names = match tool.connector_id.as_deref() { + Some(connector_id) => self + .tool_plugin_provenance + .plugin_display_names_for_connector_id(connector_id), + None => self + .tool_plugin_provenance + .plugin_display_names_for_mcp_server_name(tool.server_name.as_str()), + }; + tool.plugin_display_names = plugin_names.to_vec(); + + if plugin_names.is_empty() { + continue; + } + + let plugin_source_note = if plugin_names.len() == 1 { + format!("This tool is part of plugin `{}`.", plugin_names[0]) + } else { + format!( + "This tool is part of plugins {}.", + plugin_names + .iter() + .map(|plugin_name| format!("`{plugin_name}`")) + .collect::>() + .join(", ") + ) + }; + let description = tool + .tool + .description + .as_deref() + .map(str::trim) + .unwrap_or(""); + let annotated_description = if description.is_empty() { + plugin_source_note + } else if matches!(description.chars().last(), Some('.' | '!' | '?')) { + format!("{description} {plugin_source_note}") + } else { + format!("{description}. {plugin_source_note}") + }; + tool.tool.description = Some(Cow::Owned(annotated_description)); + } + tools + }; + + // Keep cache payloads raw; plugin provenance is resolved per-session at read time. + let tools = if let Some(startup_tools) = self.startup_snapshot_while_initializing() { + Some(startup_tools) + } else { + match self.client().await { + Ok(client) => Some(client.listed_tools()), + Err(_) => self.startup_snapshot.clone(), + } + }; + tools.map(annotate_tools) + } +} + +#[derive(Debug, Clone, thiserror::Error)] +pub(crate) enum StartupOutcomeError { + #[error("MCP startup cancelled")] + Cancelled, + // We can't store the original error here because anyhow::Error doesn't implement + // `Clone`. + #[error("MCP startup failed: {error}")] + Failed { error: String }, +} + +impl From for StartupOutcomeError { + fn from(error: anyhow::Error) -> Self { + Self::Failed { + error: error.to_string(), + } + } +} + +pub(crate) fn elicitation_capability_for_server( + _server_name: &str, +) -> Option { + // https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities + // indicates this should be an empty object. + Some(ElicitationCapability { + form: Some(FormElicitationCapability { + schema_validation: None, + }), + url: None, + }) +} + +pub(crate) async fn list_tools_for_client_uncached( + server_name: &str, + client: &Arc, + timeout: Option, + server_instructions: Option<&str>, +) -> Result> { + let resp = client + .list_tools_with_connector_ids(/*params*/ None, timeout) + .await?; + let tools = resp + .tools + .into_iter() + .map(|tool| { + let callable_name = normalize_codex_apps_callable_name( + server_name, + &tool.tool.name, + tool.connector_id.as_deref(), + tool.connector_name.as_deref(), + ); + let callable_namespace = normalize_codex_apps_callable_namespace( + server_name, + tool.connector_name.as_deref(), + ); + let connector_name = tool.connector_name; + let connector_description = tool.connector_description; + let mut tool_def = tool.tool; + if let Some(title) = tool_def.title.as_deref() { + let normalized_title = + normalize_codex_apps_tool_title(server_name, connector_name.as_deref(), title); + if tool_def.title.as_deref() != Some(normalized_title.as_str()) { + tool_def.title = Some(normalized_title); + } + } + ToolInfo { + server_name: server_name.to_owned(), + callable_name, + callable_namespace, + server_instructions: server_instructions.map(str::to_string), + tool: tool_def, + connector_id: tool.connector_id, + connector_name, + plugin_display_names: Vec::new(), + connector_description, + } + }) + .collect(); + if server_name == CODEX_APPS_MCP_SERVER_NAME { + return Ok(filter_disallowed_codex_apps_tools(tools)); + } + Ok(tools) +} + +fn resolve_bearer_token( + server_name: &str, + bearer_token_env_var: Option<&str>, +) -> Result> { + let Some(env_var) = bearer_token_env_var else { + return Ok(None); + }; + + match env::var(env_var) { + Ok(value) => { + if value.is_empty() { + Err(anyhow!( + "Environment variable {env_var} for MCP server '{server_name}' is empty" + )) + } else { + Ok(Some(value)) + } + } + Err(env::VarError::NotPresent) => Err(anyhow!( + "Environment variable {env_var} for MCP server '{server_name}' is not set" + )), + Err(env::VarError::NotUnicode(_)) => Err(anyhow!( + "Environment variable {env_var} for MCP server '{server_name}' contains invalid Unicode" + )), + } +} + +fn validate_mcp_server_name(server_name: &str) -> Result<()> { + let re = regex_lite::Regex::new(r"^[a-zA-Z0-9_-]+$")?; + if !re.is_match(server_name) { + return Err(anyhow!( + "Invalid MCP server name '{server_name}': must match pattern {pattern}", + pattern = re.as_str() + )); + } + Ok(()) +} + +async fn start_server_task( + server_name: String, + client: Arc, + params: StartServerTaskParams, +) -> Result { + let StartServerTaskParams { + startup_timeout, + tool_timeout, + tool_filter, + tx_event, + elicitation_requests, + codex_apps_tools_cache_context, + } = params; + let elicitation = elicitation_capability_for_server(&server_name); + let params = InitializeRequestParams { + meta: None, + capabilities: ClientCapabilities { + experimental: None, + extensions: None, + roots: None, + sampling: None, + elicitation, + tasks: None, + }, + client_info: Implementation { + name: "codex-mcp-client".to_owned(), + version: env!("CARGO_PKG_VERSION").to_owned(), + title: Some("Codex".into()), + description: None, + icons: None, + website_url: None, + }, + protocol_version: ProtocolVersion::V_2025_06_18, + }; + + let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event); + + let initialize_result = client + .initialize(params, startup_timeout, send_elicitation) + .await + .map_err(StartupOutcomeError::from)?; + + let server_supports_sandbox_state_meta_capability = initialize_result + .capabilities + .experimental + .as_ref() + .and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY)) + .is_some(); + let list_start = Instant::now(); + let fetch_start = Instant::now(); + let tools = list_tools_for_client_uncached( + &server_name, + &client, + startup_timeout, + initialize_result.instructions.as_deref(), + ) + .await + .map_err(StartupOutcomeError::from)?; + emit_duration( + MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC, + fetch_start.elapsed(), + &[], + ); + write_cached_codex_apps_tools_if_needed( + &server_name, + codex_apps_tools_cache_context.as_ref(), + &tools, + ); + if server_name == CODEX_APPS_MCP_SERVER_NAME { + emit_duration( + MCP_TOOLS_LIST_DURATION_METRIC, + list_start.elapsed(), + &[("cache", "miss")], + ); + } + let tools = filter_tools(tools, &tool_filter); + + let managed = ManagedClient { + client: Arc::clone(&client), + tools, + tool_timeout: Some(tool_timeout), + tool_filter, + server_instructions: initialize_result.instructions, + server_supports_sandbox_state_meta_capability, + codex_apps_tools_cache_context, + }; + + Ok(managed) +} + +struct StartServerTaskParams { + startup_timeout: Option, // TODO: cancel_token should handle this. + tool_timeout: Duration, + tool_filter: ToolFilter, + tx_event: Sender, + elicitation_requests: ElicitationRequestManager, + codex_apps_tools_cache_context: Option, +} + +async fn make_rmcp_client( + server_name: &str, + config: McpServerConfig, + store_mode: OAuthCredentialsStoreMode, + runtime_environment: McpRuntimeEnvironment, + runtime_auth_provider: Option, +) -> Result { + let McpServerConfig { + transport, + experimental_environment, + .. + } = config; + let remote_environment = match experimental_environment.as_deref() { + None | Some("local") => false, + Some("remote") => { + if !runtime_environment.environment().is_remote() { + return Err(StartupOutcomeError::from(anyhow!( + "remote MCP server `{server_name}` requires a remote environment" + ))); + } + true + } + Some(environment) => { + return Err(StartupOutcomeError::from(anyhow!( + "unsupported experimental_environment `{environment}` for MCP server `{server_name}`" + ))); + } + }; + + match transport { + McpServerTransportConfig::Stdio { + command, + args, + env, + env_vars, + cwd, + } => { + let command_os: OsString = command.into(); + let args_os: Vec = args.into_iter().map(Into::into).collect(); + let env_os = env.map(|env| { + env.into_iter() + .map(|(key, value)| (key.into(), value.into())) + .collect::>() + }); + let launcher = if remote_environment { + Arc::new(ExecutorStdioServerLauncher::new( + runtime_environment.environment().get_exec_backend(), + runtime_environment.fallback_cwd(), + )) + } else { + Arc::new(LocalStdioServerLauncher::new( + runtime_environment.fallback_cwd(), + )) as Arc + }; + + // `RmcpClient` always sees a launched MCP stdio server. The + // launcher hides whether that means a local child process or an + // executor process whose stdin/stdout bytes cross the process API. + RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher) + .await + .map_err(|err| StartupOutcomeError::from(anyhow!(err))) + } + McpServerTransportConfig::StreamableHttp { + url, + http_headers, + env_http_headers, + bearer_token_env_var, + } => { + let http_client: Arc = if remote_environment { + runtime_environment.environment().get_http_client() + } else { + Arc::new(ReqwestHttpClient) + }; + let resolved_bearer_token = + match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) { + Ok(token) => token, + Err(error) => return Err(error.into()), + }; + RmcpClient::new_streamable_http_client( + server_name, + &url, + resolved_bearer_token, + http_headers, + env_http_headers, + store_mode, + http_client, + runtime_auth_provider, + ) + .await + .map_err(StartupOutcomeError::from) + } + } +} diff --git a/codex-rs/codex-mcp/src/runtime.rs b/codex-rs/codex-mcp/src/runtime.rs new file mode 100644 index 0000000000..4284c96ff6 --- /dev/null +++ b/codex-rs/codex-mcp/src/runtime.rs @@ -0,0 +1,66 @@ +//! Runtime support for Model Context Protocol (MCP) servers. +//! +//! This module contains data that describes the runtime environment in which MCP +//! servers execute, plus the sandbox state payload sent to capable servers and a +//! tiny shared metrics helper. Transport startup and orchestration live in +//! [`crate::rmcp_client`] and [`crate::connection_manager`]. + +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use codex_exec_server::Environment; +use codex_protocol::models::PermissionProfile; +use codex_protocol::protocol::SandboxPolicy; + +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SandboxState { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub permission_profile: Option, + pub sandbox_policy: SandboxPolicy, + pub codex_linux_sandbox_exe: Option, + pub sandbox_cwd: PathBuf, + #[serde(default)] + pub use_legacy_landlock: bool, +} + +/// Runtime placement information used when starting MCP server transports. +/// +/// `McpConfig` describes what servers exist. This value describes where those +/// servers should run for the current caller. Keep it explicit at manager +/// construction time so status/snapshot paths and real sessions make the same +/// local-vs-remote decision. `fallback_cwd` is not a per-server override; it is +/// used when a stdio server omits `cwd` and the launcher needs a concrete +/// process working directory. +#[derive(Clone)] +pub struct McpRuntimeEnvironment { + environment: Arc, + fallback_cwd: PathBuf, +} + +impl McpRuntimeEnvironment { + pub fn new(environment: Arc, fallback_cwd: PathBuf) -> Self { + Self { + environment, + fallback_cwd, + } + } + + pub(crate) fn environment(&self) -> Arc { + Arc::clone(&self.environment) + } + + pub(crate) fn fallback_cwd(&self) -> PathBuf { + self.fallback_cwd.clone() + } +} + +pub(crate) fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) { + if let Some(metrics) = codex_otel::global() { + let _ = metrics.record_duration(metric, duration, tags); + } +} diff --git a/codex-rs/codex-mcp/src/mcp_tool_names.rs b/codex-rs/codex-mcp/src/tools.rs similarity index 53% rename from codex-rs/codex-mcp/src/mcp_tool_names.rs rename to codex-rs/codex-mcp/src/tools.rs index 2d2d100c0a..9b677e8a07 100644 --- a/codex-rs/codex-mcp/src/mcp_tool_names.rs +++ b/codex-rs/codex-mcp/src/tools.rs @@ -1,18 +1,134 @@ -//! Allocates model-visible MCP tool names while preserving raw MCP identities. +//! MCP tool metadata, filtering, schema shaping, and name qualification. +//! +//! Raw MCP tool identities must be preserved for protocol calls, while +//! model-visible tool names must be sanitized, deduplicated, and kept within API +//! limits. This module owns that translation as well as the shared [`ToolInfo`] +//! type and helpers that adjust tool schemas before exposing them to the model. use std::collections::HashMap; use std::collections::HashSet; +use std::sync::Arc; +use codex_config::McpServerConfig; +use codex_protocol::ToolName; +use rmcp::model::Tool; +use serde::Deserialize; +use serde::Serialize; +use serde_json::Map; +use serde_json::Value as JsonValue; use sha1::Digest; use sha1::Sha1; use tracing::warn; use crate::mcp::sanitize_responses_api_tool_name; -use crate::mcp_connection_manager::ToolInfo; -const MCP_TOOL_NAME_DELIMITER: &str = "__"; -const MAX_TOOL_NAME_LENGTH: usize = 64; -const CALLABLE_NAME_HASH_LEN: usize = 12; +pub(crate) const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str = + "codex.mcp.tools.cache_write.duration_ms"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ToolInfo { + /// Raw MCP server name used for routing the tool call. + pub server_name: String, + /// Model-visible tool name used in Responses API tool declarations. + #[serde(rename = "tool_name", alias = "callable_name")] + pub callable_name: String, + /// Model-visible namespace used for deferred tool loading. + #[serde(rename = "tool_namespace", alias = "callable_namespace")] + pub callable_namespace: String, + /// Instructions from the MCP server initialize result. + #[serde(default)] + pub server_instructions: Option, + /// Raw MCP tool definition; `tool.name` is sent back to the MCP server. + pub tool: Tool, + pub connector_id: Option, + pub connector_name: Option, + #[serde(default)] + pub plugin_display_names: Vec, + pub connector_description: Option, +} + +impl ToolInfo { + pub fn canonical_tool_name(&self) -> ToolName { + ToolName::namespaced(self.callable_namespace.clone(), self.callable_name.clone()) + } +} + +pub fn declared_openai_file_input_param_names( + meta: Option<&Map>, +) -> Vec { + let Some(meta) = meta else { + return Vec::new(); + }; + + meta.get(META_OPENAI_FILE_PARAMS) + .and_then(JsonValue::as_array) + .into_iter() + .flatten() + .filter_map(JsonValue::as_str) + .filter(|value| !value.is_empty()) + .map(str::to_string) + .collect() +} + +/// A tool is allowed to be used if both are true: +/// 1. enabled is None (no allowlist is set) or the tool is explicitly enabled. +/// 2. The tool is not explicitly disabled. +#[derive(Default, Clone)] +pub(crate) struct ToolFilter { + pub(crate) enabled: Option>, + pub(crate) disabled: HashSet, +} + +impl ToolFilter { + pub(crate) fn from_config(cfg: &McpServerConfig) -> Self { + let enabled = cfg + .enabled_tools + .as_ref() + .map(|tools| tools.iter().cloned().collect::>()); + let disabled = cfg + .disabled_tools + .as_ref() + .map(|tools| tools.iter().cloned().collect::>()) + .unwrap_or_default(); + + Self { enabled, disabled } + } + + pub(crate) fn allows(&self, tool_name: &str) -> bool { + if let Some(enabled) = &self.enabled + && !enabled.contains(tool_name) + { + return false; + } + + !self.disabled.contains(tool_name) + } +} + +/// Returns the model-visible view of a tool while preserving the raw metadata +/// used by execution. Keep cache entries raw and call this at manager return +/// boundaries. +pub(crate) fn tool_with_model_visible_input_schema(tool: &Tool) -> Tool { + let file_params = declared_openai_file_input_param_names(tool.meta.as_deref()); + if file_params.is_empty() { + return tool.clone(); + } + + let mut tool = tool.clone(); + let mut input_schema = JsonValue::Object(tool.input_schema.as_ref().clone()); + mask_input_schema_for_file_path_params(&mut input_schema, &file_params); + if let JsonValue::Object(input_schema) = input_schema { + tool.input_schema = Arc::new(input_schema); + } + tool +} + +pub(crate) fn filter_tools(tools: Vec, filter: &ToolFilter) -> Vec { + tools + .into_iter() + .filter(|tool| filter.allows(&tool.tool.name)) + .collect() +} /// Returns a qualified-name lookup for MCP tools. /// @@ -121,6 +237,57 @@ struct CallableToolCandidate { callable_name: String, } +const MCP_TOOL_NAME_DELIMITER: &str = "__"; +const MAX_TOOL_NAME_LENGTH: usize = 64; +const CALLABLE_NAME_HASH_LEN: usize = 12; +const META_OPENAI_FILE_PARAMS: &str = "openai/fileParams"; + +fn mask_input_schema_for_file_path_params(input_schema: &mut JsonValue, file_params: &[String]) { + let Some(properties) = input_schema + .as_object_mut() + .and_then(|schema| schema.get_mut("properties")) + .and_then(JsonValue::as_object_mut) + else { + return; + }; + + for field_name in file_params { + let Some(property_schema) = properties.get_mut(field_name) else { + continue; + }; + mask_input_property_schema(property_schema); + } +} + +fn mask_input_property_schema(schema: &mut JsonValue) { + let Some(object) = schema.as_object_mut() else { + return; + }; + + let mut description = object + .get("description") + .and_then(JsonValue::as_str) + .map(str::to_string) + .unwrap_or_default(); + let guidance = "This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here."; + if description.is_empty() { + description = guidance.to_string(); + } else if !description.contains(guidance) { + description = format!("{description} {guidance}"); + } + + let is_array = object.get("type").and_then(JsonValue::as_str) == Some("array") + || object.get("items").is_some(); + object.clear(); + object.insert("description".to_string(), JsonValue::String(description)); + if is_array { + object.insert("type".to_string(), JsonValue::String("array".to_string())); + object.insert("items".to_string(), serde_json::json!({ "type": "string" })); + } else { + object.insert("type".to_string(), JsonValue::String("string".to_string())); + } +} + fn sha1_hex(s: &str) -> String { let mut hasher = Sha1::new(); hasher.update(s.as_bytes());