feat: hot reload mcp servers (#8957)

### Summary
* Added `mcpServer/refresh` command to inform app servers and active
threads to refresh mcpServer on next turn event.
* Added `pending_mcp_server_refresh_config` to codex core so that if the
value is populated, we reinitialize the mcp server manager on the thread
level.
* The config is updated on `mcpServer/refresh` command which we iterate
through threads and provide with the latest config value after last
write.
This commit is contained in:
Shijie Rao
2026-01-12 11:17:50 -08:00
committed by GitHub
parent 034d489c34
commit 3e91a95ce1
8 changed files with 247 additions and 6 deletions

View File

@@ -48,6 +48,7 @@ use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::TurnStartedEvent;
use codex_rmcp_client::ElicitationResponse;
use codex_rmcp_client::OAuthCredentialsStoreMode;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
@@ -84,6 +85,7 @@ use crate::config::Config;
use crate::config::Constrained;
use crate::config::ConstraintResult;
use crate::config::GhostSnapshotConfig;
use crate::config::types::McpServerConfig;
use crate::config::types::ShellEnvironmentPolicy;
use crate::context_manager::ContextManager;
use crate::environment_context::EnvironmentContext;
@@ -107,6 +109,7 @@ use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecApprovalRequestEvent;
use crate::protocol::McpServerRefreshConfig;
use crate::protocol::Op;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::ReasoningContentDeltaEvent;
@@ -361,6 +364,7 @@ pub(crate) struct Session {
/// The set of enabled features should be invariant for the lifetime of the
/// session.
features: Features,
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(crate) services: SessionServices,
next_internal_sub_id: AtomicU64,
@@ -685,7 +689,7 @@ impl Session {
let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
mcp_startup_cancellation_token: CancellationToken::new(),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::default(),
notifier: UserNotifier::new(config.notify.clone()),
rollout: Mutex::new(Some(rollout_recorder)),
@@ -706,6 +710,7 @@ impl Session {
agent_status,
state: Mutex::new(state),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
services,
next_internal_sub_id: AtomicU64::new(0),
@@ -742,6 +747,8 @@ impl Session {
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
sandbox_cwd: session_configuration.cwd.clone(),
};
let cancel_token = sess.mcp_startup_cancellation_token().await;
sess.services
.mcp_connection_manager
.write()
@@ -751,7 +758,7 @@ impl Session {
config.mcp_oauth_credentials_store_mode,
auth_statuses.clone(),
tx_event.clone(),
sess.services.mcp_startup_cancellation_token.clone(),
cancel_token,
sandbox_state,
)
.await;
@@ -1647,12 +1654,85 @@ impl Session {
Arc::clone(&self.services.user_shell)
}
async fn refresh_mcp_servers_if_requested(&self, turn_context: &TurnContext) {
let refresh_config = { self.pending_mcp_server_refresh_config.lock().await.take() };
let Some(refresh_config) = refresh_config else {
return;
};
let McpServerRefreshConfig {
mcp_servers,
mcp_oauth_credentials_store_mode,
} = refresh_config;
let mcp_servers =
match serde_json::from_value::<HashMap<String, McpServerConfig>>(mcp_servers) {
Ok(servers) => servers,
Err(err) => {
warn!("failed to parse MCP server refresh config: {err}");
return;
}
};
let store_mode = match serde_json::from_value::<OAuthCredentialsStoreMode>(
mcp_oauth_credentials_store_mode,
) {
Ok(mode) => mode,
Err(err) => {
warn!("failed to parse MCP OAuth refresh config: {err}");
return;
}
};
let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode).await;
let sandbox_state = SandboxState {
sandbox_policy: turn_context.sandbox_policy.clone(),
codex_linux_sandbox_exe: turn_context.codex_linux_sandbox_exe.clone(),
sandbox_cwd: turn_context.cwd.clone(),
};
let cancel_token = self.reset_mcp_startup_cancellation_token().await;
let mut refreshed_manager = McpConnectionManager::default();
refreshed_manager
.initialize(
mcp_servers,
store_mode,
auth_statuses,
self.get_tx_event(),
cancel_token,
sandbox_state,
)
.await;
let mut manager = self.services.mcp_connection_manager.write().await;
*manager = refreshed_manager;
}
async fn mcp_startup_cancellation_token(&self) -> CancellationToken {
self.services
.mcp_startup_cancellation_token
.lock()
.await
.clone()
}
async fn reset_mcp_startup_cancellation_token(&self) -> CancellationToken {
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
guard.cancel();
let cancel_token = CancellationToken::new();
*guard = cancel_token.clone();
cancel_token
}
fn show_raw_agent_reasoning(&self) -> bool {
self.services.show_raw_agent_reasoning
}
async fn cancel_mcp_startup(&self) {
self.services.mcp_startup_cancellation_token.cancel();
self.services
.mcp_startup_cancellation_token
.lock()
.await
.cancel();
}
}
@@ -1710,6 +1790,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
Op::ListMcpTools => {
handlers::list_mcp_tools(&sess, &config, sub.id.clone()).await;
}
Op::RefreshMcpServers { config } => {
handlers::refresh_mcp_servers(&sess, config).await;
}
Op::ListCustomPrompts => {
handlers::list_custom_prompts(&sess, sub.id.clone()).await;
}
@@ -1778,6 +1861,7 @@ mod handlers {
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ListCustomPromptsResponseEvent;
use codex_protocol::protocol::ListSkillsResponseEvent;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewRequest;
@@ -1876,6 +1960,8 @@ mod handlers {
.await;
}
sess.refresh_mcp_servers_if_requested(&current_context)
.await;
sess.spawn_task(Arc::clone(&current_context), items, RegularTask)
.await;
*previous_context = Some(current_context);
@@ -2007,6 +2093,11 @@ mod handlers {
});
}
pub async fn refresh_mcp_servers(sess: &Arc<Session>, refresh_config: McpServerRefreshConfig) {
let mut guard = sess.pending_mcp_server_refresh_config.lock().await;
*guard = Some(refresh_config);
}
pub async fn list_mcp_tools(sess: &Session, config: &Arc<Config>, sub_id: String) {
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
let snapshot = collect_mcp_snapshot_from_manager(
@@ -2191,6 +2282,7 @@ mod handlers {
review_request: ReviewRequest,
) {
let turn_context = sess.new_default_turn_with_sub_id(sub_id.clone()).await;
sess.refresh_mcp_servers_if_requested(&turn_context).await;
match resolve_review_request(review_request, turn_context.cwd.as_path()) {
Ok(resolved) => {
spawn_review_thread(
@@ -3526,7 +3618,7 @@ mod tests {
let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
mcp_startup_cancellation_token: CancellationToken::new(),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::default(),
notifier: UserNotifier::new(None),
rollout: Mutex::new(None),
@@ -3558,6 +3650,7 @@ mod tests {
agent_status: agent_status_tx,
state: Mutex::new(state),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
services,
next_internal_sub_id: AtomicU64::new(0),
@@ -3620,7 +3713,7 @@ mod tests {
let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
mcp_startup_cancellation_token: CancellationToken::new(),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::default(),
notifier: UserNotifier::new(None),
rollout: Mutex::new(None),
@@ -3652,6 +3745,7 @@ mod tests {
agent_status: agent_status_tx,
state: Mutex::new(state),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
services,
next_internal_sub_id: AtomicU64::new(0),
@@ -3660,6 +3754,48 @@ mod tests {
(session, turn_context, rx_event)
}
#[tokio::test]
async fn refresh_mcp_servers_is_deferred_until_next_turn() {
let (session, turn_context) = make_session_and_context().await;
let old_token = session.mcp_startup_cancellation_token().await;
assert!(!old_token.is_cancelled());
let mcp_oauth_credentials_store_mode =
serde_json::to_value(OAuthCredentialsStoreMode::Auto).expect("serialize store mode");
let refresh_config = McpServerRefreshConfig {
mcp_servers: json!({}),
mcp_oauth_credentials_store_mode,
};
{
let mut guard = session.pending_mcp_server_refresh_config.lock().await;
*guard = Some(refresh_config);
}
assert!(!old_token.is_cancelled());
assert!(
session
.pending_mcp_server_refresh_config
.lock()
.await
.is_some()
);
session
.refresh_mcp_servers_if_requested(&turn_context)
.await;
assert!(old_token.is_cancelled());
assert!(
session
.pending_mcp_server_refresh_config
.lock()
.await
.is_none()
);
let new_token = session.mcp_startup_cancellation_token().await;
assert!(!new_token.is_cancelled());
}
#[tokio::test]
async fn record_model_warning_appends_user_message() {
let (mut session, turn_context) = make_session_and_context().await;