mirror of
https://github.com/openai/codex.git
synced 2026-05-05 22:01:37 +03:00
Terminate stdio MCP servers on shutdown to avoid process leaks (#19753)
## Why Several bug reports describe thread shutdown (including subagent threads) leaving stdio MCP server processes behind. These reports all point at the same lifecycle gap: Codex launches stdio MCP servers, but the session-level shutdown path does not explicitly close MCP clients or terminate the server process tree. Fixes #12491 Fixes #12976 Fixes #18881 Fixes #19469 ## History This is best understood as a regression/coverage gap in MCP session lifecycle management, not as stdio MCP cleanup being absent all along. #10710 added process-group cleanup for stdio MCP servers, but that cleanup only runs when the `RmcpClient`/transport is dropped. The older reports (#12491 and #12976) came after that cleanup existed, which suggests the remaining problem was that some higher-level shutdown paths kept the MCP manager alive or replaced it without explicitly draining clients. The newer reports (#18881 and #19469) exposed the same family around manager replacement and shutdown. ## What changed - Added an explicit stdio MCP process handle in `codex-rmcp-client` so local MCP servers terminate their process group and executor-backed MCP servers call the executor process terminator. - Added `RmcpClient::shutdown()` and manager-level MCP shutdown draining so session shutdown, channel-close fallback, MCP refresh, and connector probing stop owned MCP clients. - Added regression coverage that starts a stdio MCP server, begins an in-flight blocking tool call, shuts down the client, and asserts the server process exits. ## Verification - `cargo test -p codex-rmcp-client` - `cargo test -p codex-mcp` - `just fix -p codex-rmcp-client` - `just fix -p codex-mcp` - `just fix -p codex-core` - Manual before/after validation with a temporary repro script: - Pre-fix binary from `HEAD^` (`fed0a8f4fa`): reproduced the leak with surviving MCP server and child PIDs, `survivors=[77583, 77592]`, `leaked=true`. - Post-fix binary from this branch (`67e318148b`): verified both MCP processes were gone after interrupting `codex exec`, `survivors=[]`, `leaked=false`.
This commit is contained in:
@@ -71,6 +71,7 @@ pub struct McpConnectionManager {
|
||||
clients: HashMap<String, AsyncManagedClient>,
|
||||
server_origins: HashMap<String, String>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
startup_cancellation_token: CancellationToken,
|
||||
}
|
||||
|
||||
impl McpConnectionManager {
|
||||
@@ -85,6 +86,7 @@ impl McpConnectionManager {
|
||||
approval_policy.value(),
|
||||
permission_profile.get().clone(),
|
||||
),
|
||||
startup_cancellation_token: CancellationToken::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +94,24 @@ impl McpConnectionManager {
|
||||
!self.clients.is_empty()
|
||||
}
|
||||
|
||||
/// Drain all MCP clients from this manager and return a future that stops
|
||||
/// them and terminates their stdio server processes.
|
||||
pub fn begin_shutdown(&mut self) -> impl std::future::Future<Output = ()> + Send + 'static {
|
||||
self.startup_cancellation_token.cancel();
|
||||
let clients = std::mem::take(&mut self.clients);
|
||||
self.server_origins.clear();
|
||||
async move {
|
||||
for client in clients.into_values() {
|
||||
client.shutdown().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stop all MCP clients owned by this manager and terminate stdio server processes.
|
||||
pub async fn shutdown(&mut self) {
|
||||
self.begin_shutdown().await;
|
||||
}
|
||||
|
||||
pub fn server_origin(&self, server_name: &str) -> Option<&str> {
|
||||
self.server_origins.get(server_name).map(String::as_str)
|
||||
}
|
||||
@@ -221,6 +241,7 @@ impl McpConnectionManager {
|
||||
clients,
|
||||
server_origins,
|
||||
elicitation_requests: elicitation_requests.clone(),
|
||||
startup_cancellation_token: cancel_token.clone(),
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
let outcomes = join_set.join_all().await;
|
||||
@@ -609,6 +630,13 @@ impl McpConnectionManager {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for McpConnectionManager {
|
||||
fn drop(&mut self) {
|
||||
self.startup_cancellation_token.cancel();
|
||||
self.clients.clear();
|
||||
}
|
||||
}
|
||||
|
||||
async fn emit_update(
|
||||
submit_id: &str,
|
||||
tx_event: &Sender<Event>,
|
||||
|
||||
@@ -662,6 +662,7 @@ async fn list_all_tools_uses_startup_snapshot_while_client_is_pending() {
|
||||
startup_snapshot: Some(startup_tools),
|
||||
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
tool_plugin_provenance: Arc::new(ToolPluginProvenance::default()),
|
||||
cancel_token: CancellationToken::new(),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -690,6 +691,7 @@ async fn resolve_tool_info_accepts_canonical_namespaced_tool_names() {
|
||||
startup_snapshot: Some(startup_tools),
|
||||
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
tool_plugin_provenance: Arc::new(ToolPluginProvenance::default()),
|
||||
cancel_token: CancellationToken::new(),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -726,6 +728,7 @@ async fn list_all_tools_blocks_while_client_is_pending_without_startup_snapshot(
|
||||
startup_snapshot: None,
|
||||
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
tool_plugin_provenance: Arc::new(ToolPluginProvenance::default()),
|
||||
cancel_token: CancellationToken::new(),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -750,6 +753,7 @@ async fn list_all_tools_does_not_block_when_startup_snapshot_cache_hit_is_empty(
|
||||
startup_snapshot: Some(Vec::new()),
|
||||
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
tool_plugin_provenance: Arc::new(ToolPluginProvenance::default()),
|
||||
cancel_token: CancellationToken::new(),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -784,6 +788,7 @@ async fn list_all_tools_uses_startup_snapshot_when_client_startup_fails() {
|
||||
startup_snapshot: Some(startup_tools),
|
||||
startup_complete,
|
||||
tool_plugin_provenance: Arc::new(ToolPluginProvenance::default()),
|
||||
cancel_token: CancellationToken::new(),
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ use rmcp::model::Implementation;
|
||||
use rmcp::model::InitializeRequestParams;
|
||||
use rmcp::model::ProtocolVersion;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
/// MCP server capability indicating that Codex should include [`SandboxState`]
|
||||
/// in tool-call request `_meta` under this key.
|
||||
@@ -115,6 +116,7 @@ pub(crate) struct AsyncManagedClient {
|
||||
pub(crate) startup_snapshot: Option<Vec<ToolInfo>>,
|
||||
pub(crate) startup_complete: Arc<AtomicBool>,
|
||||
pub(crate) tool_plugin_provenance: Arc<ToolPluginProvenance>,
|
||||
pub(crate) cancel_token: CancellationToken,
|
||||
}
|
||||
|
||||
impl AsyncManagedClient {
|
||||
@@ -142,8 +144,9 @@ impl AsyncManagedClient {
|
||||
let startup_tool_filter = tool_filter;
|
||||
let startup_complete = Arc::new(AtomicBool::new(false));
|
||||
let startup_complete_for_fut = Arc::clone(&startup_complete);
|
||||
let cancel_token_for_fut = cancel_token.clone();
|
||||
let fut = async move {
|
||||
let outcome = async {
|
||||
let outcome = match async {
|
||||
if let Err(error) = validate_mcp_server_name(&server_name) {
|
||||
return Err(error.into());
|
||||
}
|
||||
@@ -158,7 +161,7 @@ impl AsyncManagedClient {
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
match start_server_task(
|
||||
start_server_task(
|
||||
server_name,
|
||||
client,
|
||||
StartServerTaskParams {
|
||||
@@ -172,14 +175,14 @@ impl AsyncManagedClient {
|
||||
codex_apps_tools_cache_context,
|
||||
},
|
||||
)
|
||||
.or_cancel(&cancel_token)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled),
|
||||
}
|
||||
}
|
||||
.await;
|
||||
.or_cancel(&cancel_token_for_fut)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled),
|
||||
};
|
||||
|
||||
startup_complete_for_fut.store(true, Ordering::Release);
|
||||
outcome
|
||||
@@ -197,6 +200,7 @@ impl AsyncManagedClient {
|
||||
startup_snapshot,
|
||||
startup_complete,
|
||||
tool_plugin_provenance,
|
||||
cancel_token,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,6 +208,17 @@ impl AsyncManagedClient {
|
||||
self.client.clone().await
|
||||
}
|
||||
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
self.cancel_token.cancel();
|
||||
match self.client().await {
|
||||
Ok(client) => client.client.shutdown().await,
|
||||
Err(StartupOutcomeError::Cancelled) => {}
|
||||
Err(error) => {
|
||||
warn!("failed to initialize MCP client during shutdown: {error:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn startup_snapshot_while_initializing(&self) -> Option<Vec<ToolInfo>> {
|
||||
if !self.startup_complete.load(Ordering::Acquire) {
|
||||
return self.startup_snapshot.clone();
|
||||
|
||||
Reference in New Issue
Block a user