Compare commits

...

1 Commits

Author SHA1 Message Date
Kazuhiro Sera
014562eef0 feat: enable Streamable HTTP MCP servers & lazy loading of tools 2025-08-30 09:54:05 +09:00
9 changed files with 573 additions and 93 deletions

3
codex-rs/Cargo.lock generated
View File

@@ -868,7 +868,10 @@ name = "codex-mcp-client"
version = "0.0.0"
dependencies = [
"anyhow",
"eventsource-stream",
"futures",
"mcp-types",
"reqwest",
"serde",
"serde_json",
"tokio",

View File

@@ -3,4 +3,4 @@
This file has moved. Please see the latest configuration documentation here:
- Full config docs: [docs/config.md](../docs/config.md)
- MCP servers section: [docs/config.md#mcp_servers](../docs/config.md#mcp_servers)
- MCP servers section: [docs/config.md#mcp_servers](../docs/config.md#mcp_servers)

View File

@@ -576,6 +576,9 @@ impl Session {
}
}
// Kick off background MCP tool initialization/loading without blocking the UI.
sess.mcp_connection_manager.refresh_tools_in_background();
Ok((sess, turn_context))
}
@@ -1286,7 +1289,12 @@ async fn submission_loop(
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
// This is a cheap lookup from the connection manager's cache.
// Refresh tools on-demand to give users an up-to-date list.
// This runs in the foreground for this operation only.
if let Err(e) = sess.mcp_connection_manager.refresh_tools().await {
warn!("failed to refresh MCP tools: {e:#}");
}
let tools = sess.mcp_connection_manager.list_all_tools();
let event = Event {
id: sub_id,
@@ -1613,6 +1621,12 @@ async fn run_turn(
sub_id: String,
input: Vec<ResponseItem>,
) -> CodexResult<Vec<ProcessedResponseItem>> {
// Give MCP tools a brief moment to arrive if background discovery is still running.
// This avoids an empty tool list in the first turn right after startup.
sess.mcp_connection_manager
.wait_for_tools_with_timeout(std::time::Duration::from_millis(800))
.await;
let tools = get_openai_tools(
&turn_context.tools_config,
Some(sess.mcp_connection_manager.list_all_tools()),

View File

@@ -12,14 +12,25 @@ use serde::Serialize;
use strum_macros::Display;
#[derive(Deserialize, Debug, Clone, PartialEq)]
pub struct McpServerConfig {
pub command: String,
#[serde(untagged)]
pub enum McpServerConfig {
/// MCP server launched as a subprocess that communicates over stdio.
Stdio {
command: String,
#[serde(default)]
pub args: Vec<String>,
#[serde(default)]
args: Vec<String>,
#[serde(default)]
pub env: Option<HashMap<String, String>>,
#[serde(default)]
env: Option<HashMap<String, String>>,
},
/// MCP server reachable via the Streamable HTTP transport.
StreamableHttp {
url: String,
#[serde(default)]
headers: Option<HashMap<String, String>>,
},
}
#[derive(Deserialize, Debug, Copy, Clone, PartialEq)]

View File

@@ -9,6 +9,7 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::OsString;
use std::sync::RwLock;
use std::time::Duration;
use anyhow::Context;
@@ -22,6 +23,8 @@ use mcp_types::Tool;
use serde_json::json;
use sha1::Digest;
use sha1::Sha1;
use tokio::sync::OnceCell;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing::info;
use tracing::warn;
@@ -38,6 +41,8 @@ const MAX_TOOL_NAME_LENGTH: usize = 64;
/// Timeout for the `tools/list` request.
const LIST_TOOLS_TIMEOUT: Duration = Duration::from_secs(10);
/// Timeout for MCP initialize handshake.
const INIT_TIMEOUT: Duration = Duration::from_secs(10);
/// Map that holds a startup error for every MCP server that could **not** be
/// spawned successfully.
@@ -82,7 +87,6 @@ struct ToolInfo {
}
/// A thin wrapper around a set of running [`McpClient`] instances.
#[derive(Default)]
pub(crate) struct McpConnectionManager {
/// Server-name -> client instance.
///
@@ -90,11 +94,48 @@ pub(crate) struct McpConnectionManager {
/// the user configuration.
clients: HashMap<String, std::sync::Arc<McpClient>>,
/// Fully qualified tool name -> tool instance.
tools: HashMap<String, ToolInfo>,
/// Fully qualified tool name -> tool instance. Populated asynchronously
/// after clients are initialized to avoid blocking shell startup.
tools: std::sync::Arc<RwLock<HashMap<String, ToolInfo>>>,
/// Lazy server initialization status (initialize is sent once per server on first use)
init_cells: HashMap<String, std::sync::Arc<OnceCell<()>>>,
/// Broadcasts current MCP tool count so callers can await initial load.
tool_count_tx: watch::Sender<usize>,
}
impl Default for McpConnectionManager {
fn default() -> Self {
let (tx, _rx) = watch::channel(0usize);
Self {
clients: HashMap::new(),
tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
init_cells: HashMap::new(),
tool_count_tx: tx,
}
}
}
impl McpConnectionManager {
fn build_initialize_params() -> mcp_types::InitializeRequestParams {
mcp_types::InitializeRequestParams {
capabilities: ClientCapabilities {
experimental: None,
roots: None,
sampling: None,
// https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities
// indicates this should be an empty object.
elicitation: Some(json!({})),
},
client_info: Implementation {
name: "codex-mcp-client".to_owned(),
version: env!("CARGO_PKG_VERSION").to_owned(),
title: Some("Codex".into()),
},
protocol_version: mcp_types::MCP_SCHEMA_VERSION.to_owned(),
}
}
/// Spawn a [`McpClient`] for each configured server.
///
/// * `mcp_servers` Map loaded from the user configuration where *keys*
@@ -127,43 +168,21 @@ impl McpConnectionManager {
}
join_set.spawn(async move {
let McpServerConfig { command, args, env } = cfg;
let client_res = McpClient::new_stdio_client(
command.into(),
args.into_iter().map(OsString::from).collect(),
env,
)
.await;
match client_res {
Ok(client) => {
// Initialize the client.
let params = mcp_types::InitializeRequestParams {
capabilities: ClientCapabilities {
experimental: None,
roots: None,
sampling: None,
// https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities
// indicates this should be an empty object.
elicitation: Some(json!({})),
},
client_info: Implementation {
name: "codex-mcp-client".to_owned(),
version: env!("CARGO_PKG_VERSION").to_owned(),
title: Some("Codex".into()),
},
protocol_version: mcp_types::MCP_SCHEMA_VERSION.to_owned(),
};
let initialize_notification_params = None;
let timeout = Some(Duration::from_secs(10));
match client
.initialize(params, initialize_notification_params, timeout)
.await
{
Ok(_response) => (server_name, Ok(client)),
Err(e) => (server_name, Err(e)),
}
let client_res = match cfg {
McpServerConfig::Stdio { command, args, env } => McpClient::new_stdio_client(
command.into(),
args.into_iter().map(OsString::from).collect(),
env,
)
.await
.map_err(anyhow::Error::from),
McpServerConfig::StreamableHttp { url, headers } => {
McpClient::new_streamable_http_client(url, headers).await
}
Err(e) => (server_name, Err(e.into())),
};
match client_res {
Ok(client) => (server_name, Ok(client)),
Err(e) => (server_name, Err(e)),
}
});
}
@@ -184,20 +203,39 @@ impl McpConnectionManager {
}
}
let all_tools = list_all_tools(&clients).await?;
// Initialize with an empty tool registry. No network calls are made here
// to keep shell startup and teardown snappy.
let tools: std::sync::Arc<RwLock<HashMap<String, ToolInfo>>> =
std::sync::Arc::new(RwLock::new(HashMap::new()));
let tools = qualify_tools(all_tools);
// Prepare lazy init cells for each server.
let mut init_cells = HashMap::new();
for server in clients.keys() {
init_cells.insert(server.clone(), std::sync::Arc::new(OnceCell::new()));
}
Ok((Self { clients, tools }, errors))
let (tool_count_tx, _rx) = watch::channel(0usize);
Ok((
Self {
clients,
tools,
init_cells,
tool_count_tx,
},
errors,
))
}
/// Returns a single map that contains **all** tools. Each key is the
/// fully-qualified name for the tool.
pub fn list_all_tools(&self) -> HashMap<String, Tool> {
self.tools
.iter()
.map(|(name, tool)| (name.clone(), tool.tool.clone()))
.collect()
match self.tools.read() {
Ok(guard) => guard
.iter()
.map(|(name, tool)| (name.clone(), tool.tool.clone()))
.collect(),
Err(_) => HashMap::new(),
}
}
/// Invoke the tool indicated by the (server, tool) pair.
@@ -213,7 +251,8 @@ impl McpConnectionManager {
.get(server)
.ok_or_else(|| anyhow!("unknown MCP server '{server}'"))?
.clone();
// Ensure the server is initialized before invoking tools
self.ensure_initialized(server, &client).await?;
client
.call_tool(tool.to_string(), arguments, timeout)
.await
@@ -221,9 +260,123 @@ impl McpConnectionManager {
}
pub fn parse_tool_name(&self, tool_name: &str) -> Option<(String, String)> {
self.tools
.get(tool_name)
.map(|tool| (tool.server_name.clone(), tool.tool_name.clone()))
match self.tools.read() {
Ok(guard) => guard
.get(tool_name)
.map(|tool| (tool.server_name.clone(), tool.tool_name.clone())),
Err(_) => None,
}
}
/// Refresh the internal tool registry synchronously on demand.
pub async fn refresh_tools(&self) -> Result<()> {
// Initialize each server on demand before listing tools
for (server, client) in &self.clients {
self.ensure_initialized(server, client).await?;
}
let clients_snapshot: HashMap<_, _> = self
.clients
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let all_tools = list_all_tools(&clients_snapshot).await?;
let qualified = qualify_tools(all_tools);
if let Ok(mut guard) = self.tools.write() {
*guard = qualified;
}
let _ = self
.tool_count_tx
.send(self.tools.read().map(|m| m.len()).unwrap_or(0));
Ok(())
}
async fn ensure_initialized(
&self,
server_name: &str,
client: &std::sync::Arc<McpClient>,
) -> Result<()> {
let cell = self
.init_cells
.get(server_name)
.ok_or_else(|| anyhow!(format!("missing init cell for server {server_name}")))?
.clone();
let initialize = async {
let params = Self::build_initialize_params();
let initialize_notification_params = None;
let timeout = Some(INIT_TIMEOUT);
client
.initialize(params, initialize_notification_params, timeout)
.await
.map(|_| ())
};
cell.get_or_try_init(|| initialize).await.map(|_| ())
}
/// Spawn a background refresh that initializes servers and loads tools.
/// This does not block the caller and is safe to call multiple times.
pub fn refresh_tools_in_background(&self) {
let tools_ref = self.tools.clone();
let tx = self.tool_count_tx.clone();
let clients_snapshot: HashMap<_, _> = self
.clients
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let init_cells_snapshot: HashMap<_, _> = self
.init_cells
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
tokio::spawn(async move {
// Initialize each server once
for (server, client) in &clients_snapshot {
if let Some(cell) = init_cells_snapshot.get(server) {
let params = Self::build_initialize_params();
let initialize_notification_params = None;
let timeout = Some(INIT_TIMEOUT);
let client_clone = client.clone();
let _ = cell
.get_or_try_init(|| async move {
client_clone
.initialize(params, initialize_notification_params, timeout)
.await
.map(|_| ())
})
.await;
}
}
// Load tools after init
match list_all_tools(&clients_snapshot).await {
Ok(all_tools) => {
let qualified = qualify_tools(all_tools);
if let Ok(mut guard) = tools_ref.write() {
*guard = qualified;
}
let _ = tx.send(tools_ref.read().map(|m| m.len()).unwrap_or(0));
}
Err(e) => warn!("failed to list MCP tools in background: {e:#}"),
}
});
}
/// Optionally await until tools are available, bounded by a timeout.
pub async fn wait_for_tools_with_timeout(&self, timeout: Duration) {
let mut rx = self.tool_count_tx.subscribe();
if *rx.borrow() > 0 {
return;
}
let _ = tokio::time::timeout(timeout, async {
while rx.changed().await.is_ok() {
if *rx.borrow() > 0 {
break;
}
}
})
.await;
}
}
@@ -366,4 +519,61 @@ mod tests {
"my_server__yet_another_e1c3987bd9c50b826cbe1687966f79f0c602d19ca"
);
}
#[test]
fn test_is_valid_server_name() {
assert!(is_valid_mcp_server_name("valid-Server_01"));
assert!(!is_valid_mcp_server_name("invalid server"));
assert!(!is_valid_mcp_server_name("invalid/server"));
assert!(!is_valid_mcp_server_name(""));
}
#[test]
fn test_wait_for_tools_returns_immediately_when_ready() {
// Arrange: manager with one tool already present and signal sent
let mgr = McpConnectionManager::default();
{
let mut guard = mgr.tools.write().expect("lock tools for write");
guard.insert("srv__tool".to_string(), create_test_tool("srv", "tool"));
}
let _ = mgr
.tool_count_tx
.send(mgr.tools.read().map(|m| m.len()).unwrap_or(0));
// Act: should return without waiting
let rt = tokio::runtime::Runtime::new().expect("create tokio runtime");
rt.block_on(async {
mgr.wait_for_tools_with_timeout(Duration::from_millis(1))
.await;
});
}
#[test]
fn test_parse_tool_name_reads_from_cache() {
let mgr = McpConnectionManager::default();
{
let mut guard = mgr.tools.write().expect("lock tools for write");
guard.insert(
"serverA__alpha".to_string(),
create_test_tool("serverA", "alpha"),
);
}
let parsed = mgr.parse_tool_name("serverA__alpha");
assert_eq!(parsed, Some(("serverA".to_string(), "alpha".to_string())));
}
#[test]
fn test_list_all_tools_snapshot() {
let mgr = McpConnectionManager::default();
{
let mut guard = mgr.tools.write().expect("lock tools for write");
guard.insert("s__a".to_string(), create_test_tool("s", "a"));
guard.insert("s__b".to_string(), create_test_tool("s", "b"));
}
let tools = mgr.list_all_tools();
assert_eq!(tools.len(), 2);
assert!(tools.contains_key("s__a"));
assert!(tools.contains_key("s__b"));
}
}

View File

@@ -11,6 +11,9 @@ anyhow = "1"
mcp-types = { path = "../mcp-types" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
eventsource-stream = "0.2.3"
futures = "0.3"
reqwest = { version = "0.12", features = ["json", "stream"] }
tracing = { version = "0.1.41", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
tokio = { version = "1", features = [

View File

@@ -21,6 +21,8 @@ use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use eventsource_stream::Eventsource;
use futures::StreamExt;
use mcp_types::CallToolRequest;
use mcp_types::CallToolRequestParams;
use mcp_types::InitializeRequest;
@@ -37,6 +39,7 @@ use mcp_types::ListToolsResult;
use mcp_types::ModelContextProtocolNotification;
use mcp_types::ModelContextProtocolRequest;
use mcp_types::RequestId;
use reqwest::header;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io::AsyncBufReadExt;
@@ -59,23 +62,36 @@ const CHANNEL_CAPACITY: usize = 128;
/// Internal representation of a pending request sender.
type PendingSender = oneshot::Sender<JSONRPCMessage>;
enum McpTransport {
Stdio {
/// Retain this child process until the client is dropped. The Tokio runtime
/// will make a "best effort" to reap the process after it exits, but it is
/// not a guarantee. See the `kill_on_drop` documentation for details.
#[allow(dead_code)]
child: tokio::process::Child,
/// Channel for sending JSON-RPC messages *to* the background writer task.
outgoing_tx: mpsc::Sender<JSONRPCMessage>,
},
StreamableHttp(HttpClient),
}
/// A running MCP client instance.
pub struct McpClient {
/// Retain this child process until the client is dropped. The Tokio runtime
/// will make a "best effort" to reap the process after it exits, but it is
/// not a guarantee. See the `kill_on_drop` documentation for details.
#[allow(dead_code)]
child: tokio::process::Child,
/// Channel for sending JSON-RPC messages *to* the background writer task.
outgoing_tx: mpsc::Sender<JSONRPCMessage>,
/// Map of `request.id -> oneshot::Sender` used to dispatch responses back
/// to the originating caller.
pending: Arc<Mutex<HashMap<i64, PendingSender>>>,
/// Monotonically increasing counter used to generate request IDs.
id_counter: AtomicI64,
transport: McpTransport,
}
struct HttpClient {
client: reqwest::Client,
url: reqwest::Url,
headers: HashMap<String, String>,
session_id: tokio::sync::Mutex<Option<String>>,
}
impl McpClient {
@@ -177,10 +193,156 @@ impl McpClient {
let _ = (writer_handle, reader_handle);
Ok(Self {
child,
outgoing_tx,
pending,
id_counter: AtomicI64::new(1),
transport: McpTransport::Stdio { child, outgoing_tx },
})
}
async fn send_request_http<R>(
&self,
params: R::Params,
timeout: Option<Duration>,
http: &HttpClient,
) -> Result<R::Result>
where
R: ModelContextProtocolRequest,
R::Params: Serialize,
R::Result: DeserializeOwned,
{
let fut = async {
let id = self.id_counter.fetch_add(1, Ordering::SeqCst);
let request_id = RequestId::Integer(id);
let params_json = serde_json::to_value(&params)?;
let params_field = if params_json.is_null() {
None
} else {
Some(params_json)
};
let jsonrpc_request = JSONRPCRequest {
id: request_id.clone(),
jsonrpc: JSONRPC_VERSION.to_string(),
method: R::METHOD.to_string(),
params: params_field,
};
let message = JSONRPCMessage::Request(jsonrpc_request);
let mut builder = http
.client
.post(http.url.clone())
.header(header::ACCEPT, "application/json, text/event-stream")
.json(&message);
for (k, v) in &http.headers {
builder = builder.header(k, v);
}
if let Some(session) = http.session_id.lock().await.as_ref() {
builder = builder.header("Mcp-Session-Id", session);
}
let response = builder.send().await?;
if let Some(session_id) = response.headers().get("Mcp-Session-Id")
&& let Ok(val) = session_id.to_str()
{
*http.session_id.lock().await = Some(val.to_string());
}
let content_type = response
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if content_type.starts_with("application/json") {
let msg: JSONRPCMessage = response.json().await?;
match msg {
JSONRPCMessage::Response(JSONRPCResponse { result, .. }) => {
let typed: R::Result = serde_json::from_value(result)?;
Ok(typed)
}
JSONRPCMessage::Error(err) => Err(anyhow!(format!(
"server returned JSON-RPC error: code = {}, message = {}",
err.error.code, err.error.message
))),
other => Err(anyhow!(format!(
"unexpected message variant received in reply path: {:?}",
other
))),
}
} else if content_type.starts_with("text/event-stream") {
let mut stream = response.bytes_stream().eventsource();
while let Some(event_res) = stream.next().await {
let event = event_res?;
let data = event.data.trim();
if data.is_empty() {
continue;
}
match serde_json::from_str::<JSONRPCMessage>(data) {
Ok(JSONRPCMessage::Response(resp)) => {
if resp.id == request_id {
let typed: R::Result = serde_json::from_value(resp.result)?;
return Ok(typed);
}
}
Ok(JSONRPCMessage::Error(err)) => {
if err.id == request_id {
return Err(anyhow!(format!(
"server returned JSON-RPC error: code = {}, message = {}",
err.error.code, err.error.message
)));
}
}
Ok(JSONRPCMessage::Notification(n)) => {
info!("<- notification: {:?}", n);
}
Ok(JSONRPCMessage::Request(r)) => {
info!("<- request not handled: {:?}", r);
}
Err(e) => {
warn!(
"failed to deserialize JSONRPCMessage: {e}; event = {}",
data
);
}
}
}
Err(anyhow!("SSE stream closed without response"))
} else {
Err(anyhow!(format!(
"unsupported content type: {}",
content_type
)))
}
};
match timeout {
Some(dur) => time::timeout(dur, fut)
.await
.map_err(|_| anyhow!("request timed out"))?,
None => fut.await,
}
}
pub async fn new_streamable_http_client(
url: String,
headers: Option<HashMap<String, String>>,
) -> anyhow::Result<Self> {
let client = reqwest::Client::new();
let url = reqwest::Url::parse(&url)?;
Ok(Self {
pending: Arc::new(Mutex::new(HashMap::new())),
id_counter: AtomicI64::new(1),
transport: McpTransport::StreamableHttp(HttpClient {
client,
url,
headers: headers.unwrap_or_default(),
session_id: tokio::sync::Mutex::new(None),
}),
})
}
@@ -199,6 +361,15 @@ impl McpClient {
R::Params: Serialize,
R::Result: DeserializeOwned,
{
if let McpTransport::StreamableHttp(http) = &self.transport {
return self.send_request_http::<R>(params, timeout, http).await;
}
let outgoing_tx = match &self.transport {
McpTransport::Stdio { outgoing_tx, .. } => outgoing_tx,
_ => return Err(anyhow!("client not configured for stdio")),
};
// Create a new unique ID.
let id = self.id_counter.fetch_add(1, Ordering::SeqCst);
let request_id = RequestId::Integer(id);
@@ -232,7 +403,7 @@ impl McpClient {
}
// Send to writer task.
if self.outgoing_tx.send(message).await.is_err() {
if outgoing_tx.send(message).await.is_err() {
return Err(anyhow!(
"failed to send message to writer task - channel closed"
));
@@ -285,6 +456,46 @@ impl McpClient {
N: ModelContextProtocolNotification,
N::Params: Serialize,
{
if let McpTransport::StreamableHttp(http) = &self.transport {
let params_json = serde_json::to_value(&params)?;
let params_field = if params_json.is_null() {
None
} else {
Some(params_json)
};
let method = N::METHOD.to_string();
let jsonrpc_notification = JSONRPCNotification {
jsonrpc: JSONRPC_VERSION.to_string(),
method: method.clone(),
params: params_field,
};
let notification = JSONRPCMessage::Notification(jsonrpc_notification);
let mut builder = http
.client
.post(http.url.clone())
.header(header::ACCEPT, "application/json, text/event-stream")
.json(&notification);
for (k, v) in &http.headers {
builder = builder.header(k, v);
}
if let Some(session) = http.session_id.lock().await.as_ref() {
builder = builder.header("Mcp-Session-Id", session);
}
let resp = builder.send().await?;
if resp.status() != reqwest::StatusCode::ACCEPTED && !resp.status().is_success() {
return Err(anyhow!(format!(
"server returned unexpected status for notification `{method}`: {}",
resp.status()
)));
}
return Ok(());
}
// Serialize params -> JSON. For many request types `Params` is
// `Option<T>` and `None` should be encoded as *absence* of the field.
let params_json = serde_json::to_value(&params)?;
@@ -302,10 +513,13 @@ impl McpClient {
};
let notification = JSONRPCMessage::Notification(jsonrpc_notification);
self.outgoing_tx
.send(notification)
.await
.with_context(|| format!("failed to send notification `{method}` to writer task"))
match &self.transport {
McpTransport::Stdio { outgoing_tx, .. } => outgoing_tx,
_ => return Err(anyhow!("client not configured for stdio")),
}
.send(notification)
.await
.with_context(|| format!("failed to send notification `{method}` to writer task"))
}
/// Negotiates the initialization with the MCP server. Sends an `initialize`
@@ -400,7 +614,9 @@ impl Drop for McpClient {
// `kill_on_drop(true)` above, this extra check has the benefit of
// forcing the process to be reaped immediately if it has already exited
// instead of waiting for the Tokio runtime to reap it later.
let _ = self.child.try_wait();
if let McpTransport::Stdio { child, .. } = &mut self.transport {
let _ = child.try_wait();
}
}
}

View File

@@ -9,6 +9,7 @@ use codex_ansi_escape::ansi_escape_line;
use codex_common::create_config_summary_entries;
use codex_common::elapsed::format_duration;
use codex_core::config::Config;
use codex_core::config_types::McpServerConfig;
use codex_core::plan_tool::PlanItemArg;
use codex_core::plan_tool::StepStatus;
use codex_core::plan_tool::UpdatePlanArgs;
@@ -901,24 +902,41 @@ pub(crate) fn new_mcp_tools_output(
server.clone().into(),
]));
if !cfg.command.is_empty() {
let cmd_display = format!("{} {}", cfg.command, cfg.args.join(" "));
lines.push(Line::from(vec![
" • Command: ".into(),
cmd_display.into(),
]));
}
if let Some(env) = cfg.env.as_ref()
&& !env.is_empty()
{
let mut env_pairs: Vec<String> = env.iter().map(|(k, v)| format!("{k}={v}")).collect();
env_pairs.sort();
lines.push(Line::from(vec![
" • Env: ".into(),
env_pairs.join(" ").into(),
]));
match cfg {
McpServerConfig::Stdio { command, args, env } => {
if !command.is_empty() {
let cmd_display = format!("{} {}", command, args.join(" "));
lines.push(Line::from(vec![
" • Command: ".into(),
cmd_display.into(),
]));
}
if let Some(env) = env.as_ref()
&& !env.is_empty()
{
let mut env_pairs: Vec<String> =
env.iter().map(|(k, v)| format!("{k}={v}")).collect();
env_pairs.sort();
lines.push(Line::from(vec![
" • Env: ".into(),
env_pairs.join(" ").into(),
]));
}
}
McpServerConfig::StreamableHttp { url, headers } => {
lines.push(Line::from(vec![" • URL: ".into(), url.clone().into()]));
if let Some(headers) = headers.as_ref()
&& !headers.is_empty()
{
let mut header_pairs: Vec<String> =
headers.iter().map(|(k, v)| format!("{k}={v}")).collect();
header_pairs.sort();
lines.push(Line::from(vec![
" • Headers: ".into(),
header_pairs.join(" ").into(),
]));
}
}
}
if names.is_empty() {

View File

@@ -360,6 +360,11 @@ Should be represented as follows in `~/.codex/config.toml`:
command = "npx"
args = ["-y", "mcp-server"]
env = { "API_KEY" = "value" }
# Streamable HTTP remote servers
[mcp_servers.my_mcp_sever]
url = "https://{the-actual-domain-here}/mcp"
# headers = { Authorization = "Bearer {token}" }
```
## disable_response_storage