mirror of
https://github.com/openai/codex.git
synced 2026-03-23 16:46:32 +03:00
Compare commits
1 Commits
starr/exec
...
streamable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
014562eef0 |
3
codex-rs/Cargo.lock
generated
3
codex-rs/Cargo.lock
generated
@@ -868,7 +868,10 @@ name = "codex-mcp-client"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"eventsource-stream",
|
||||
"futures",
|
||||
"mcp-types",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
|
||||
@@ -3,4 +3,4 @@
|
||||
This file has moved. Please see the latest configuration documentation here:
|
||||
|
||||
- Full config docs: [docs/config.md](../docs/config.md)
|
||||
- MCP servers section: [docs/config.md#mcp_servers](../docs/config.md#mcp_servers)
|
||||
- MCP servers section: [docs/config.md#mcp_servers](../docs/config.md#mcp_servers)
|
||||
|
||||
@@ -576,6 +576,9 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
// Kick off background MCP tool initialization/loading without blocking the UI.
|
||||
sess.mcp_connection_manager.refresh_tools_in_background();
|
||||
|
||||
Ok((sess, turn_context))
|
||||
}
|
||||
|
||||
@@ -1286,7 +1289,12 @@ async fn submission_loop(
|
||||
let tx_event = sess.tx_event.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
// This is a cheap lookup from the connection manager's cache.
|
||||
// Refresh tools on-demand to give users an up-to-date list.
|
||||
// This runs in the foreground for this operation only.
|
||||
if let Err(e) = sess.mcp_connection_manager.refresh_tools().await {
|
||||
warn!("failed to refresh MCP tools: {e:#}");
|
||||
}
|
||||
|
||||
let tools = sess.mcp_connection_manager.list_all_tools();
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
@@ -1613,6 +1621,12 @@ async fn run_turn(
|
||||
sub_id: String,
|
||||
input: Vec<ResponseItem>,
|
||||
) -> CodexResult<Vec<ProcessedResponseItem>> {
|
||||
// Give MCP tools a brief moment to arrive if background discovery is still running.
|
||||
// This avoids an empty tool list in the first turn right after startup.
|
||||
sess.mcp_connection_manager
|
||||
.wait_for_tools_with_timeout(std::time::Duration::from_millis(800))
|
||||
.await;
|
||||
|
||||
let tools = get_openai_tools(
|
||||
&turn_context.tools_config,
|
||||
Some(sess.mcp_connection_manager.list_all_tools()),
|
||||
|
||||
@@ -12,14 +12,25 @@ use serde::Serialize;
|
||||
use strum_macros::Display;
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, PartialEq)]
|
||||
pub struct McpServerConfig {
|
||||
pub command: String,
|
||||
#[serde(untagged)]
|
||||
pub enum McpServerConfig {
|
||||
/// MCP server launched as a subprocess that communicates over stdio.
|
||||
Stdio {
|
||||
command: String,
|
||||
|
||||
#[serde(default)]
|
||||
pub args: Vec<String>,
|
||||
#[serde(default)]
|
||||
args: Vec<String>,
|
||||
|
||||
#[serde(default)]
|
||||
pub env: Option<HashMap<String, String>>,
|
||||
#[serde(default)]
|
||||
env: Option<HashMap<String, String>>,
|
||||
},
|
||||
/// MCP server reachable via the Streamable HTTP transport.
|
||||
StreamableHttp {
|
||||
url: String,
|
||||
|
||||
#[serde(default)]
|
||||
headers: Option<HashMap<String, String>>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Copy, Clone, PartialEq)]
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::ffi::OsString;
|
||||
use std::sync::RwLock;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -22,6 +23,8 @@ use mcp_types::Tool;
|
||||
use serde_json::json;
|
||||
use sha1::Digest;
|
||||
use sha1::Sha1;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
@@ -38,6 +41,8 @@ const MAX_TOOL_NAME_LENGTH: usize = 64;
|
||||
|
||||
/// Timeout for the `tools/list` request.
|
||||
const LIST_TOOLS_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
/// Timeout for MCP initialize handshake.
|
||||
const INIT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Map that holds a startup error for every MCP server that could **not** be
|
||||
/// spawned successfully.
|
||||
@@ -82,7 +87,6 @@ struct ToolInfo {
|
||||
}
|
||||
|
||||
/// A thin wrapper around a set of running [`McpClient`] instances.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct McpConnectionManager {
|
||||
/// Server-name -> client instance.
|
||||
///
|
||||
@@ -90,11 +94,48 @@ pub(crate) struct McpConnectionManager {
|
||||
/// the user configuration.
|
||||
clients: HashMap<String, std::sync::Arc<McpClient>>,
|
||||
|
||||
/// Fully qualified tool name -> tool instance.
|
||||
tools: HashMap<String, ToolInfo>,
|
||||
/// Fully qualified tool name -> tool instance. Populated asynchronously
|
||||
/// after clients are initialized to avoid blocking shell startup.
|
||||
tools: std::sync::Arc<RwLock<HashMap<String, ToolInfo>>>,
|
||||
|
||||
/// Lazy server initialization status (initialize is sent once per server on first use)
|
||||
init_cells: HashMap<String, std::sync::Arc<OnceCell<()>>>,
|
||||
|
||||
/// Broadcasts current MCP tool count so callers can await initial load.
|
||||
tool_count_tx: watch::Sender<usize>,
|
||||
}
|
||||
|
||||
impl Default for McpConnectionManager {
|
||||
fn default() -> Self {
|
||||
let (tx, _rx) = watch::channel(0usize);
|
||||
Self {
|
||||
clients: HashMap::new(),
|
||||
tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
|
||||
init_cells: HashMap::new(),
|
||||
tool_count_tx: tx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl McpConnectionManager {
|
||||
fn build_initialize_params() -> mcp_types::InitializeRequestParams {
|
||||
mcp_types::InitializeRequestParams {
|
||||
capabilities: ClientCapabilities {
|
||||
experimental: None,
|
||||
roots: None,
|
||||
sampling: None,
|
||||
// https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities
|
||||
// indicates this should be an empty object.
|
||||
elicitation: Some(json!({})),
|
||||
},
|
||||
client_info: Implementation {
|
||||
name: "codex-mcp-client".to_owned(),
|
||||
version: env!("CARGO_PKG_VERSION").to_owned(),
|
||||
title: Some("Codex".into()),
|
||||
},
|
||||
protocol_version: mcp_types::MCP_SCHEMA_VERSION.to_owned(),
|
||||
}
|
||||
}
|
||||
/// Spawn a [`McpClient`] for each configured server.
|
||||
///
|
||||
/// * `mcp_servers` – Map loaded from the user configuration where *keys*
|
||||
@@ -127,43 +168,21 @@ impl McpConnectionManager {
|
||||
}
|
||||
|
||||
join_set.spawn(async move {
|
||||
let McpServerConfig { command, args, env } = cfg;
|
||||
let client_res = McpClient::new_stdio_client(
|
||||
command.into(),
|
||||
args.into_iter().map(OsString::from).collect(),
|
||||
env,
|
||||
)
|
||||
.await;
|
||||
match client_res {
|
||||
Ok(client) => {
|
||||
// Initialize the client.
|
||||
let params = mcp_types::InitializeRequestParams {
|
||||
capabilities: ClientCapabilities {
|
||||
experimental: None,
|
||||
roots: None,
|
||||
sampling: None,
|
||||
// https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation#capabilities
|
||||
// indicates this should be an empty object.
|
||||
elicitation: Some(json!({})),
|
||||
},
|
||||
client_info: Implementation {
|
||||
name: "codex-mcp-client".to_owned(),
|
||||
version: env!("CARGO_PKG_VERSION").to_owned(),
|
||||
title: Some("Codex".into()),
|
||||
},
|
||||
protocol_version: mcp_types::MCP_SCHEMA_VERSION.to_owned(),
|
||||
};
|
||||
let initialize_notification_params = None;
|
||||
let timeout = Some(Duration::from_secs(10));
|
||||
match client
|
||||
.initialize(params, initialize_notification_params, timeout)
|
||||
.await
|
||||
{
|
||||
Ok(_response) => (server_name, Ok(client)),
|
||||
Err(e) => (server_name, Err(e)),
|
||||
}
|
||||
let client_res = match cfg {
|
||||
McpServerConfig::Stdio { command, args, env } => McpClient::new_stdio_client(
|
||||
command.into(),
|
||||
args.into_iter().map(OsString::from).collect(),
|
||||
env,
|
||||
)
|
||||
.await
|
||||
.map_err(anyhow::Error::from),
|
||||
McpServerConfig::StreamableHttp { url, headers } => {
|
||||
McpClient::new_streamable_http_client(url, headers).await
|
||||
}
|
||||
Err(e) => (server_name, Err(e.into())),
|
||||
};
|
||||
match client_res {
|
||||
Ok(client) => (server_name, Ok(client)),
|
||||
Err(e) => (server_name, Err(e)),
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -184,20 +203,39 @@ impl McpConnectionManager {
|
||||
}
|
||||
}
|
||||
|
||||
let all_tools = list_all_tools(&clients).await?;
|
||||
// Initialize with an empty tool registry. No network calls are made here
|
||||
// to keep shell startup and teardown snappy.
|
||||
let tools: std::sync::Arc<RwLock<HashMap<String, ToolInfo>>> =
|
||||
std::sync::Arc::new(RwLock::new(HashMap::new()));
|
||||
|
||||
let tools = qualify_tools(all_tools);
|
||||
// Prepare lazy init cells for each server.
|
||||
let mut init_cells = HashMap::new();
|
||||
for server in clients.keys() {
|
||||
init_cells.insert(server.clone(), std::sync::Arc::new(OnceCell::new()));
|
||||
}
|
||||
|
||||
Ok((Self { clients, tools }, errors))
|
||||
let (tool_count_tx, _rx) = watch::channel(0usize);
|
||||
Ok((
|
||||
Self {
|
||||
clients,
|
||||
tools,
|
||||
init_cells,
|
||||
tool_count_tx,
|
||||
},
|
||||
errors,
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns a single map that contains **all** tools. Each key is the
|
||||
/// fully-qualified name for the tool.
|
||||
pub fn list_all_tools(&self) -> HashMap<String, Tool> {
|
||||
self.tools
|
||||
.iter()
|
||||
.map(|(name, tool)| (name.clone(), tool.tool.clone()))
|
||||
.collect()
|
||||
match self.tools.read() {
|
||||
Ok(guard) => guard
|
||||
.iter()
|
||||
.map(|(name, tool)| (name.clone(), tool.tool.clone()))
|
||||
.collect(),
|
||||
Err(_) => HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Invoke the tool indicated by the (server, tool) pair.
|
||||
@@ -213,7 +251,8 @@ impl McpConnectionManager {
|
||||
.get(server)
|
||||
.ok_or_else(|| anyhow!("unknown MCP server '{server}'"))?
|
||||
.clone();
|
||||
|
||||
// Ensure the server is initialized before invoking tools
|
||||
self.ensure_initialized(server, &client).await?;
|
||||
client
|
||||
.call_tool(tool.to_string(), arguments, timeout)
|
||||
.await
|
||||
@@ -221,9 +260,123 @@ impl McpConnectionManager {
|
||||
}
|
||||
|
||||
pub fn parse_tool_name(&self, tool_name: &str) -> Option<(String, String)> {
|
||||
self.tools
|
||||
.get(tool_name)
|
||||
.map(|tool| (tool.server_name.clone(), tool.tool_name.clone()))
|
||||
match self.tools.read() {
|
||||
Ok(guard) => guard
|
||||
.get(tool_name)
|
||||
.map(|tool| (tool.server_name.clone(), tool.tool_name.clone())),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Refresh the internal tool registry synchronously on demand.
|
||||
pub async fn refresh_tools(&self) -> Result<()> {
|
||||
// Initialize each server on demand before listing tools
|
||||
for (server, client) in &self.clients {
|
||||
self.ensure_initialized(server, client).await?;
|
||||
}
|
||||
let clients_snapshot: HashMap<_, _> = self
|
||||
.clients
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
let all_tools = list_all_tools(&clients_snapshot).await?;
|
||||
let qualified = qualify_tools(all_tools);
|
||||
if let Ok(mut guard) = self.tools.write() {
|
||||
*guard = qualified;
|
||||
}
|
||||
let _ = self
|
||||
.tool_count_tx
|
||||
.send(self.tools.read().map(|m| m.len()).unwrap_or(0));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn ensure_initialized(
|
||||
&self,
|
||||
server_name: &str,
|
||||
client: &std::sync::Arc<McpClient>,
|
||||
) -> Result<()> {
|
||||
let cell = self
|
||||
.init_cells
|
||||
.get(server_name)
|
||||
.ok_or_else(|| anyhow!(format!("missing init cell for server {server_name}")))?
|
||||
.clone();
|
||||
|
||||
let initialize = async {
|
||||
let params = Self::build_initialize_params();
|
||||
let initialize_notification_params = None;
|
||||
let timeout = Some(INIT_TIMEOUT);
|
||||
client
|
||||
.initialize(params, initialize_notification_params, timeout)
|
||||
.await
|
||||
.map(|_| ())
|
||||
};
|
||||
|
||||
cell.get_or_try_init(|| initialize).await.map(|_| ())
|
||||
}
|
||||
|
||||
/// Spawn a background refresh that initializes servers and loads tools.
|
||||
/// This does not block the caller and is safe to call multiple times.
|
||||
pub fn refresh_tools_in_background(&self) {
|
||||
let tools_ref = self.tools.clone();
|
||||
let tx = self.tool_count_tx.clone();
|
||||
let clients_snapshot: HashMap<_, _> = self
|
||||
.clients
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
let init_cells_snapshot: HashMap<_, _> = self
|
||||
.init_cells
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Initialize each server once
|
||||
for (server, client) in &clients_snapshot {
|
||||
if let Some(cell) = init_cells_snapshot.get(server) {
|
||||
let params = Self::build_initialize_params();
|
||||
let initialize_notification_params = None;
|
||||
let timeout = Some(INIT_TIMEOUT);
|
||||
let client_clone = client.clone();
|
||||
let _ = cell
|
||||
.get_or_try_init(|| async move {
|
||||
client_clone
|
||||
.initialize(params, initialize_notification_params, timeout)
|
||||
.await
|
||||
.map(|_| ())
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Load tools after init
|
||||
match list_all_tools(&clients_snapshot).await {
|
||||
Ok(all_tools) => {
|
||||
let qualified = qualify_tools(all_tools);
|
||||
if let Ok(mut guard) = tools_ref.write() {
|
||||
*guard = qualified;
|
||||
}
|
||||
let _ = tx.send(tools_ref.read().map(|m| m.len()).unwrap_or(0));
|
||||
}
|
||||
Err(e) => warn!("failed to list MCP tools in background: {e:#}"),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Optionally await until tools are available, bounded by a timeout.
|
||||
pub async fn wait_for_tools_with_timeout(&self, timeout: Duration) {
|
||||
let mut rx = self.tool_count_tx.subscribe();
|
||||
if *rx.borrow() > 0 {
|
||||
return;
|
||||
}
|
||||
let _ = tokio::time::timeout(timeout, async {
|
||||
while rx.changed().await.is_ok() {
|
||||
if *rx.borrow() > 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -366,4 +519,61 @@ mod tests {
|
||||
"my_server__yet_another_e1c3987bd9c50b826cbe1687966f79f0c602d19ca"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_valid_server_name() {
|
||||
assert!(is_valid_mcp_server_name("valid-Server_01"));
|
||||
assert!(!is_valid_mcp_server_name("invalid server"));
|
||||
assert!(!is_valid_mcp_server_name("invalid/server"));
|
||||
assert!(!is_valid_mcp_server_name(""));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wait_for_tools_returns_immediately_when_ready() {
|
||||
// Arrange: manager with one tool already present and signal sent
|
||||
let mgr = McpConnectionManager::default();
|
||||
|
||||
{
|
||||
let mut guard = mgr.tools.write().expect("lock tools for write");
|
||||
guard.insert("srv__tool".to_string(), create_test_tool("srv", "tool"));
|
||||
}
|
||||
let _ = mgr
|
||||
.tool_count_tx
|
||||
.send(mgr.tools.read().map(|m| m.len()).unwrap_or(0));
|
||||
|
||||
// Act: should return without waiting
|
||||
let rt = tokio::runtime::Runtime::new().expect("create tokio runtime");
|
||||
rt.block_on(async {
|
||||
mgr.wait_for_tools_with_timeout(Duration::from_millis(1))
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_tool_name_reads_from_cache() {
|
||||
let mgr = McpConnectionManager::default();
|
||||
{
|
||||
let mut guard = mgr.tools.write().expect("lock tools for write");
|
||||
guard.insert(
|
||||
"serverA__alpha".to_string(),
|
||||
create_test_tool("serverA", "alpha"),
|
||||
);
|
||||
}
|
||||
let parsed = mgr.parse_tool_name("serverA__alpha");
|
||||
assert_eq!(parsed, Some(("serverA".to_string(), "alpha".to_string())));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_list_all_tools_snapshot() {
|
||||
let mgr = McpConnectionManager::default();
|
||||
{
|
||||
let mut guard = mgr.tools.write().expect("lock tools for write");
|
||||
guard.insert("s__a".to_string(), create_test_tool("s", "a"));
|
||||
guard.insert("s__b".to_string(), create_test_tool("s", "b"));
|
||||
}
|
||||
let tools = mgr.list_all_tools();
|
||||
assert_eq!(tools.len(), 2);
|
||||
assert!(tools.contains_key("s__a"));
|
||||
assert!(tools.contains_key("s__b"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,9 @@ anyhow = "1"
|
||||
mcp-types = { path = "../mcp-types" }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
eventsource-stream = "0.2.3"
|
||||
futures = "0.3"
|
||||
reqwest = { version = "0.12", features = ["json", "stream"] }
|
||||
tracing = { version = "0.1.41", features = ["log"] }
|
||||
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
|
||||
tokio = { version = "1", features = [
|
||||
|
||||
@@ -21,6 +21,8 @@ use std::time::Duration;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use eventsource_stream::Eventsource;
|
||||
use futures::StreamExt;
|
||||
use mcp_types::CallToolRequest;
|
||||
use mcp_types::CallToolRequestParams;
|
||||
use mcp_types::InitializeRequest;
|
||||
@@ -37,6 +39,7 @@ use mcp_types::ListToolsResult;
|
||||
use mcp_types::ModelContextProtocolNotification;
|
||||
use mcp_types::ModelContextProtocolRequest;
|
||||
use mcp_types::RequestId;
|
||||
use reqwest::header;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
@@ -59,23 +62,36 @@ const CHANNEL_CAPACITY: usize = 128;
|
||||
/// Internal representation of a pending request sender.
|
||||
type PendingSender = oneshot::Sender<JSONRPCMessage>;
|
||||
|
||||
enum McpTransport {
|
||||
Stdio {
|
||||
/// Retain this child process until the client is dropped. The Tokio runtime
|
||||
/// will make a "best effort" to reap the process after it exits, but it is
|
||||
/// not a guarantee. See the `kill_on_drop` documentation for details.
|
||||
#[allow(dead_code)]
|
||||
child: tokio::process::Child,
|
||||
/// Channel for sending JSON-RPC messages *to* the background writer task.
|
||||
outgoing_tx: mpsc::Sender<JSONRPCMessage>,
|
||||
},
|
||||
StreamableHttp(HttpClient),
|
||||
}
|
||||
|
||||
/// A running MCP client instance.
|
||||
pub struct McpClient {
|
||||
/// Retain this child process until the client is dropped. The Tokio runtime
|
||||
/// will make a "best effort" to reap the process after it exits, but it is
|
||||
/// not a guarantee. See the `kill_on_drop` documentation for details.
|
||||
#[allow(dead_code)]
|
||||
child: tokio::process::Child,
|
||||
|
||||
/// Channel for sending JSON-RPC messages *to* the background writer task.
|
||||
outgoing_tx: mpsc::Sender<JSONRPCMessage>,
|
||||
|
||||
/// Map of `request.id -> oneshot::Sender` used to dispatch responses back
|
||||
/// to the originating caller.
|
||||
pending: Arc<Mutex<HashMap<i64, PendingSender>>>,
|
||||
|
||||
/// Monotonically increasing counter used to generate request IDs.
|
||||
id_counter: AtomicI64,
|
||||
|
||||
transport: McpTransport,
|
||||
}
|
||||
|
||||
struct HttpClient {
|
||||
client: reqwest::Client,
|
||||
url: reqwest::Url,
|
||||
headers: HashMap<String, String>,
|
||||
session_id: tokio::sync::Mutex<Option<String>>,
|
||||
}
|
||||
|
||||
impl McpClient {
|
||||
@@ -177,10 +193,156 @@ impl McpClient {
|
||||
let _ = (writer_handle, reader_handle);
|
||||
|
||||
Ok(Self {
|
||||
child,
|
||||
outgoing_tx,
|
||||
pending,
|
||||
id_counter: AtomicI64::new(1),
|
||||
transport: McpTransport::Stdio { child, outgoing_tx },
|
||||
})
|
||||
}
|
||||
|
||||
async fn send_request_http<R>(
|
||||
&self,
|
||||
params: R::Params,
|
||||
timeout: Option<Duration>,
|
||||
http: &HttpClient,
|
||||
) -> Result<R::Result>
|
||||
where
|
||||
R: ModelContextProtocolRequest,
|
||||
R::Params: Serialize,
|
||||
R::Result: DeserializeOwned,
|
||||
{
|
||||
let fut = async {
|
||||
let id = self.id_counter.fetch_add(1, Ordering::SeqCst);
|
||||
let request_id = RequestId::Integer(id);
|
||||
|
||||
let params_json = serde_json::to_value(¶ms)?;
|
||||
let params_field = if params_json.is_null() {
|
||||
None
|
||||
} else {
|
||||
Some(params_json)
|
||||
};
|
||||
|
||||
let jsonrpc_request = JSONRPCRequest {
|
||||
id: request_id.clone(),
|
||||
jsonrpc: JSONRPC_VERSION.to_string(),
|
||||
method: R::METHOD.to_string(),
|
||||
params: params_field,
|
||||
};
|
||||
|
||||
let message = JSONRPCMessage::Request(jsonrpc_request);
|
||||
|
||||
let mut builder = http
|
||||
.client
|
||||
.post(http.url.clone())
|
||||
.header(header::ACCEPT, "application/json, text/event-stream")
|
||||
.json(&message);
|
||||
|
||||
for (k, v) in &http.headers {
|
||||
builder = builder.header(k, v);
|
||||
}
|
||||
|
||||
if let Some(session) = http.session_id.lock().await.as_ref() {
|
||||
builder = builder.header("Mcp-Session-Id", session);
|
||||
}
|
||||
|
||||
let response = builder.send().await?;
|
||||
|
||||
if let Some(session_id) = response.headers().get("Mcp-Session-Id")
|
||||
&& let Ok(val) = session_id.to_str()
|
||||
{
|
||||
*http.session_id.lock().await = Some(val.to_string());
|
||||
}
|
||||
|
||||
let content_type = response
|
||||
.headers()
|
||||
.get(reqwest::header::CONTENT_TYPE)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
|
||||
if content_type.starts_with("application/json") {
|
||||
let msg: JSONRPCMessage = response.json().await?;
|
||||
match msg {
|
||||
JSONRPCMessage::Response(JSONRPCResponse { result, .. }) => {
|
||||
let typed: R::Result = serde_json::from_value(result)?;
|
||||
Ok(typed)
|
||||
}
|
||||
JSONRPCMessage::Error(err) => Err(anyhow!(format!(
|
||||
"server returned JSON-RPC error: code = {}, message = {}",
|
||||
err.error.code, err.error.message
|
||||
))),
|
||||
other => Err(anyhow!(format!(
|
||||
"unexpected message variant received in reply path: {:?}",
|
||||
other
|
||||
))),
|
||||
}
|
||||
} else if content_type.starts_with("text/event-stream") {
|
||||
let mut stream = response.bytes_stream().eventsource();
|
||||
while let Some(event_res) = stream.next().await {
|
||||
let event = event_res?;
|
||||
let data = event.data.trim();
|
||||
if data.is_empty() {
|
||||
continue;
|
||||
}
|
||||
match serde_json::from_str::<JSONRPCMessage>(data) {
|
||||
Ok(JSONRPCMessage::Response(resp)) => {
|
||||
if resp.id == request_id {
|
||||
let typed: R::Result = serde_json::from_value(resp.result)?;
|
||||
return Ok(typed);
|
||||
}
|
||||
}
|
||||
Ok(JSONRPCMessage::Error(err)) => {
|
||||
if err.id == request_id {
|
||||
return Err(anyhow!(format!(
|
||||
"server returned JSON-RPC error: code = {}, message = {}",
|
||||
err.error.code, err.error.message
|
||||
)));
|
||||
}
|
||||
}
|
||||
Ok(JSONRPCMessage::Notification(n)) => {
|
||||
info!("<- notification: {:?}", n);
|
||||
}
|
||||
Ok(JSONRPCMessage::Request(r)) => {
|
||||
info!("<- request not handled: {:?}", r);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"failed to deserialize JSONRPCMessage: {e}; event = {}",
|
||||
data
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(anyhow!("SSE stream closed without response"))
|
||||
} else {
|
||||
Err(anyhow!(format!(
|
||||
"unsupported content type: {}",
|
||||
content_type
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
match timeout {
|
||||
Some(dur) => time::timeout(dur, fut)
|
||||
.await
|
||||
.map_err(|_| anyhow!("request timed out"))?,
|
||||
None => fut.await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn new_streamable_http_client(
|
||||
url: String,
|
||||
headers: Option<HashMap<String, String>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let client = reqwest::Client::new();
|
||||
let url = reqwest::Url::parse(&url)?;
|
||||
Ok(Self {
|
||||
pending: Arc::new(Mutex::new(HashMap::new())),
|
||||
id_counter: AtomicI64::new(1),
|
||||
transport: McpTransport::StreamableHttp(HttpClient {
|
||||
client,
|
||||
url,
|
||||
headers: headers.unwrap_or_default(),
|
||||
session_id: tokio::sync::Mutex::new(None),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -199,6 +361,15 @@ impl McpClient {
|
||||
R::Params: Serialize,
|
||||
R::Result: DeserializeOwned,
|
||||
{
|
||||
if let McpTransport::StreamableHttp(http) = &self.transport {
|
||||
return self.send_request_http::<R>(params, timeout, http).await;
|
||||
}
|
||||
|
||||
let outgoing_tx = match &self.transport {
|
||||
McpTransport::Stdio { outgoing_tx, .. } => outgoing_tx,
|
||||
_ => return Err(anyhow!("client not configured for stdio")),
|
||||
};
|
||||
|
||||
// Create a new unique ID.
|
||||
let id = self.id_counter.fetch_add(1, Ordering::SeqCst);
|
||||
let request_id = RequestId::Integer(id);
|
||||
@@ -232,7 +403,7 @@ impl McpClient {
|
||||
}
|
||||
|
||||
// Send to writer task.
|
||||
if self.outgoing_tx.send(message).await.is_err() {
|
||||
if outgoing_tx.send(message).await.is_err() {
|
||||
return Err(anyhow!(
|
||||
"failed to send message to writer task - channel closed"
|
||||
));
|
||||
@@ -285,6 +456,46 @@ impl McpClient {
|
||||
N: ModelContextProtocolNotification,
|
||||
N::Params: Serialize,
|
||||
{
|
||||
if let McpTransport::StreamableHttp(http) = &self.transport {
|
||||
let params_json = serde_json::to_value(¶ms)?;
|
||||
let params_field = if params_json.is_null() {
|
||||
None
|
||||
} else {
|
||||
Some(params_json)
|
||||
};
|
||||
|
||||
let method = N::METHOD.to_string();
|
||||
let jsonrpc_notification = JSONRPCNotification {
|
||||
jsonrpc: JSONRPC_VERSION.to_string(),
|
||||
method: method.clone(),
|
||||
params: params_field,
|
||||
};
|
||||
let notification = JSONRPCMessage::Notification(jsonrpc_notification);
|
||||
|
||||
let mut builder = http
|
||||
.client
|
||||
.post(http.url.clone())
|
||||
.header(header::ACCEPT, "application/json, text/event-stream")
|
||||
.json(¬ification);
|
||||
|
||||
for (k, v) in &http.headers {
|
||||
builder = builder.header(k, v);
|
||||
}
|
||||
|
||||
if let Some(session) = http.session_id.lock().await.as_ref() {
|
||||
builder = builder.header("Mcp-Session-Id", session);
|
||||
}
|
||||
|
||||
let resp = builder.send().await?;
|
||||
if resp.status() != reqwest::StatusCode::ACCEPTED && !resp.status().is_success() {
|
||||
return Err(anyhow!(format!(
|
||||
"server returned unexpected status for notification `{method}`: {}",
|
||||
resp.status()
|
||||
)));
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Serialize params -> JSON. For many request types `Params` is
|
||||
// `Option<T>` and `None` should be encoded as *absence* of the field.
|
||||
let params_json = serde_json::to_value(¶ms)?;
|
||||
@@ -302,10 +513,13 @@ impl McpClient {
|
||||
};
|
||||
|
||||
let notification = JSONRPCMessage::Notification(jsonrpc_notification);
|
||||
self.outgoing_tx
|
||||
.send(notification)
|
||||
.await
|
||||
.with_context(|| format!("failed to send notification `{method}` to writer task"))
|
||||
match &self.transport {
|
||||
McpTransport::Stdio { outgoing_tx, .. } => outgoing_tx,
|
||||
_ => return Err(anyhow!("client not configured for stdio")),
|
||||
}
|
||||
.send(notification)
|
||||
.await
|
||||
.with_context(|| format!("failed to send notification `{method}` to writer task"))
|
||||
}
|
||||
|
||||
/// Negotiates the initialization with the MCP server. Sends an `initialize`
|
||||
@@ -400,7 +614,9 @@ impl Drop for McpClient {
|
||||
// `kill_on_drop(true)` above, this extra check has the benefit of
|
||||
// forcing the process to be reaped immediately if it has already exited
|
||||
// instead of waiting for the Tokio runtime to reap it later.
|
||||
let _ = self.child.try_wait();
|
||||
if let McpTransport::Stdio { child, .. } = &mut self.transport {
|
||||
let _ = child.try_wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ use codex_ansi_escape::ansi_escape_line;
|
||||
use codex_common::create_config_summary_entries;
|
||||
use codex_common::elapsed::format_duration;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_types::McpServerConfig;
|
||||
use codex_core::plan_tool::PlanItemArg;
|
||||
use codex_core::plan_tool::StepStatus;
|
||||
use codex_core::plan_tool::UpdatePlanArgs;
|
||||
@@ -901,24 +902,41 @@ pub(crate) fn new_mcp_tools_output(
|
||||
server.clone().into(),
|
||||
]));
|
||||
|
||||
if !cfg.command.is_empty() {
|
||||
let cmd_display = format!("{} {}", cfg.command, cfg.args.join(" "));
|
||||
|
||||
lines.push(Line::from(vec![
|
||||
" • Command: ".into(),
|
||||
cmd_display.into(),
|
||||
]));
|
||||
}
|
||||
|
||||
if let Some(env) = cfg.env.as_ref()
|
||||
&& !env.is_empty()
|
||||
{
|
||||
let mut env_pairs: Vec<String> = env.iter().map(|(k, v)| format!("{k}={v}")).collect();
|
||||
env_pairs.sort();
|
||||
lines.push(Line::from(vec![
|
||||
" • Env: ".into(),
|
||||
env_pairs.join(" ").into(),
|
||||
]));
|
||||
match cfg {
|
||||
McpServerConfig::Stdio { command, args, env } => {
|
||||
if !command.is_empty() {
|
||||
let cmd_display = format!("{} {}", command, args.join(" "));
|
||||
lines.push(Line::from(vec![
|
||||
" • Command: ".into(),
|
||||
cmd_display.into(),
|
||||
]));
|
||||
}
|
||||
if let Some(env) = env.as_ref()
|
||||
&& !env.is_empty()
|
||||
{
|
||||
let mut env_pairs: Vec<String> =
|
||||
env.iter().map(|(k, v)| format!("{k}={v}")).collect();
|
||||
env_pairs.sort();
|
||||
lines.push(Line::from(vec![
|
||||
" • Env: ".into(),
|
||||
env_pairs.join(" ").into(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
McpServerConfig::StreamableHttp { url, headers } => {
|
||||
lines.push(Line::from(vec![" • URL: ".into(), url.clone().into()]));
|
||||
if let Some(headers) = headers.as_ref()
|
||||
&& !headers.is_empty()
|
||||
{
|
||||
let mut header_pairs: Vec<String> =
|
||||
headers.iter().map(|(k, v)| format!("{k}={v}")).collect();
|
||||
header_pairs.sort();
|
||||
lines.push(Line::from(vec![
|
||||
" • Headers: ".into(),
|
||||
header_pairs.join(" ").into(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if names.is_empty() {
|
||||
|
||||
@@ -360,6 +360,11 @@ Should be represented as follows in `~/.codex/config.toml`:
|
||||
command = "npx"
|
||||
args = ["-y", "mcp-server"]
|
||||
env = { "API_KEY" = "value" }
|
||||
|
||||
# Streamable HTTP remote servers
|
||||
[mcp_servers.my_mcp_sever]
|
||||
url = "https://{the-actual-domain-here}/mcp"
|
||||
# headers = { Authorization = "Bearer {token}" }
|
||||
```
|
||||
|
||||
## disable_response_storage
|
||||
|
||||
Reference in New Issue
Block a user