mirror of
https://github.com/openai/codex.git
synced 2026-05-04 13:21:54 +03:00
Process-group cleanup for stdio MCP servers to prevent orphan process storms (#10710)
This PR changes stdio MCP child processes to run in their own process group * Add guarded teardown in codex-rmcp-client: send SIGTERM to the group first, then SIGKILL after a short grace period. * Add terminate_process_group helper in process_group.rs. * Add Unix regression test in process_group_cleanup.rs to verify wrapper + grandchild are reaped on client drop. Addresses reported MCP process/thread storm: #10581
This commit is contained in:
@@ -60,7 +60,10 @@ use crate::utils::create_env_for_mcp_server;
|
||||
use crate::utils::run_with_timeout;
|
||||
|
||||
enum PendingTransport {
|
||||
ChildProcess(TokioChildProcess),
|
||||
ChildProcess {
|
||||
transport: TokioChildProcess,
|
||||
process_group_guard: Option<ProcessGroupGuard>,
|
||||
},
|
||||
StreamableHttp {
|
||||
transport: StreamableHttpClientTransport<reqwest::Client>,
|
||||
},
|
||||
@@ -75,11 +78,71 @@ enum ClientState {
|
||||
transport: Option<PendingTransport>,
|
||||
},
|
||||
Ready {
|
||||
_process_group_guard: Option<ProcessGroupGuard>,
|
||||
service: Arc<RunningService<RoleClient, LoggingClientHandler>>,
|
||||
oauth: Option<OAuthPersistor>,
|
||||
},
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
const PROCESS_GROUP_TERM_GRACE_PERIOD: Duration = Duration::from_secs(2);
|
||||
|
||||
#[cfg(unix)]
|
||||
struct ProcessGroupGuard {
|
||||
process_group_id: u32,
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
struct ProcessGroupGuard;
|
||||
|
||||
impl ProcessGroupGuard {
|
||||
fn new(process_group_id: u32) -> Self {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
Self { process_group_id }
|
||||
}
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
let _ = process_group_id;
|
||||
Self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn maybe_terminate_process_group(&self) {
|
||||
let process_group_id = self.process_group_id;
|
||||
let should_escalate =
|
||||
match codex_utils_pty::process_group::terminate_process_group(process_group_id) {
|
||||
Ok(exists) => exists,
|
||||
Err(error) => {
|
||||
warn!("Failed to terminate MCP process group {process_group_id}: {error}");
|
||||
false
|
||||
}
|
||||
};
|
||||
if should_escalate {
|
||||
std::thread::spawn(move || {
|
||||
std::thread::sleep(PROCESS_GROUP_TERM_GRACE_PERIOD);
|
||||
if let Err(error) =
|
||||
codex_utils_pty::process_group::kill_process_group(process_group_id)
|
||||
{
|
||||
warn!("Failed to kill MCP process group {process_group_id}: {error}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
fn maybe_terminate_process_group(&self) {}
|
||||
}
|
||||
|
||||
impl Drop for ProcessGroupGuard {
|
||||
fn drop(&mut self) {
|
||||
if cfg!(unix) {
|
||||
self.maybe_terminate_process_group();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type Elicitation = CreateElicitationRequestParam;
|
||||
pub type ElicitationResponse = CreateElicitationResult;
|
||||
|
||||
@@ -129,6 +192,8 @@ impl RmcpClient {
|
||||
.env_clear()
|
||||
.envs(envs)
|
||||
.args(&args);
|
||||
#[cfg(unix)]
|
||||
command.process_group(0);
|
||||
if let Some(cwd) = cwd {
|
||||
command.current_dir(cwd);
|
||||
}
|
||||
@@ -136,6 +201,7 @@ impl RmcpClient {
|
||||
let (transport, stderr) = TokioChildProcess::builder(command)
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()?;
|
||||
let process_group_guard = transport.id().map(ProcessGroupGuard::new);
|
||||
|
||||
if let Some(stderr) = stderr {
|
||||
tokio::spawn(async move {
|
||||
@@ -157,7 +223,10 @@ impl RmcpClient {
|
||||
|
||||
Ok(Self {
|
||||
state: Mutex::new(ClientState::Connecting {
|
||||
transport: Some(PendingTransport::ChildProcess(transport)),
|
||||
transport: Some(PendingTransport::ChildProcess {
|
||||
transport,
|
||||
process_group_guard,
|
||||
}),
|
||||
}),
|
||||
})
|
||||
}
|
||||
@@ -226,17 +295,22 @@ impl RmcpClient {
|
||||
) -> Result<InitializeResult> {
|
||||
let client_handler = LoggingClientHandler::new(params.clone(), send_elicitation);
|
||||
|
||||
let (transport, oauth_persistor) = {
|
||||
let (transport, oauth_persistor, process_group_guard) = {
|
||||
let mut guard = self.state.lock().await;
|
||||
match &mut *guard {
|
||||
ClientState::Connecting { transport } => match transport.take() {
|
||||
Some(PendingTransport::ChildProcess(transport)) => (
|
||||
Some(PendingTransport::ChildProcess {
|
||||
transport,
|
||||
process_group_guard,
|
||||
}) => (
|
||||
service::serve_client(client_handler.clone(), transport).boxed(),
|
||||
None,
|
||||
process_group_guard,
|
||||
),
|
||||
Some(PendingTransport::StreamableHttp { transport }) => (
|
||||
service::serve_client(client_handler.clone(), transport).boxed(),
|
||||
None,
|
||||
None,
|
||||
),
|
||||
Some(PendingTransport::StreamableHttpWithOAuth {
|
||||
transport,
|
||||
@@ -244,6 +318,7 @@ impl RmcpClient {
|
||||
}) => (
|
||||
service::serve_client(client_handler.clone(), transport).boxed(),
|
||||
Some(oauth_persistor),
|
||||
None,
|
||||
),
|
||||
None => return Err(anyhow!("client already initializing")),
|
||||
},
|
||||
@@ -270,6 +345,7 @@ impl RmcpClient {
|
||||
{
|
||||
let mut guard = self.state.lock().await;
|
||||
*guard = ClientState::Ready {
|
||||
_process_group_guard: process_group_guard,
|
||||
service: Arc::new(service),
|
||||
oauth: oauth_persistor.clone(),
|
||||
};
|
||||
@@ -448,7 +524,7 @@ impl RmcpClient {
|
||||
match &*guard {
|
||||
ClientState::Ready {
|
||||
oauth: Some(runtime),
|
||||
service: _,
|
||||
..
|
||||
} => Some(runtime.clone()),
|
||||
_ => None,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user