Compare commits

...

2 Commits

Author SHA1 Message Date
Justin Rushing
7fc03f4a19 Send raw Codex events to subscribed MCP servers
Remove the _meta.threadId envelope from forwarded codex/event notifications. External MCP servers now receive the serialized Event directly as notification params, so the payload contract is just id plus msg, with msg.type still used for subscription filtering.

This keeps the custom notification API minimal and avoids carrying duplicate thread metadata in a one-field wrapper. Event fanout semantics are unchanged: Codex still emits only to servers that advertise matching capabilities.experimental["codex/events"].eventTypes, skips servers that are still starting, and treats delivery as best effort.
2026-04-02 23:48:11 -07:00
Justin Rushing
f2620e7c8b Forward Codex events to subscribed MCP servers
Add an experimental codex/events MCP capability so external servers can opt into runtime event notifications during initialize. Servers advertise the exact event types they want via capabilities.experimental["codex/events"].eventTypes, and Codex only forwards matching events to those servers.

Forwarded notifications use method codex/event and preserve the existing event payload shape used by the built-in MCP server: params._meta.threadId carries the conversation/thread identifier, while the Event itself is flattened into params.id and params.msg. That keeps context metadata separate from the event body while avoiding an extra wrapper object.

The core session now publishes each event to the existing tx_event channel as before, then performs best-effort fanout to the MCP connection manager. Fanout skips servers that are still starting up, ignores servers that did not subscribe to the emitted msg.type, and logs notification failures without breaking the main turn flow.

Add tests for generic experimental capability detection and for loading codex/events eventTypes from the MCP initialize response, including the no-eventTypes case which intentionally subscribes to nothing.
2026-04-02 23:39:23 -07:00
3 changed files with 212 additions and 7 deletions

View File

@@ -357,6 +357,7 @@ struct ManagedClient {
tool_filter: ToolFilter,
tool_timeout: Option<Duration>,
server_supports_sandbox_state_capability: bool,
codex_event_types: HashSet<String>,
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
@@ -401,6 +402,16 @@ impl ManagedClient {
.await?;
Ok(())
}
async fn notify_codex_event(&self, event_type: &str, params: &serde_json::Value) -> Result<()> {
if !self.codex_event_types.contains(event_type) {
return Ok(());
}
self.client
.send_custom_notification(MCP_CODEX_EVENT_METHOD, Some(params.clone()))
.await
}
}
#[derive(Clone)]
@@ -559,13 +570,24 @@ impl AsyncManagedClient {
let managed = self.client().await?;
managed.notify_sandbox_state_change(sandbox_state).await
}
async fn notify_codex_event(&self, event_type: &str, params: &serde_json::Value) -> Result<()> {
if !self.startup_complete.load(Ordering::Acquire) {
return Ok(());
}
let managed = self.client().await?;
managed.notify_codex_event(event_type, params).await
}
}
pub const MCP_SANDBOX_STATE_CAPABILITY: &str = "codex/sandbox-state";
pub const MCP_CODEX_EVENTS_CAPABILITY: &str = "codex/events";
/// Custom MCP request to push sandbox state updates.
/// When used, the `params` field of the notification is [`SandboxState`].
pub const MCP_SANDBOX_STATE_METHOD: &str = "codex/sandbox-state/update";
pub const MCP_CODEX_EVENT_METHOD: &str = "codex/event";
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -577,6 +599,13 @@ pub struct SandboxState {
pub use_legacy_landlock: bool,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CodexEventsCapability {
#[serde(default)]
event_types: HashSet<String>,
}
/// A thin wrapper around a set of running [`RmcpClient`] instances.
pub struct McpConnectionManager {
clients: HashMap<String, AsyncManagedClient>,
@@ -1126,6 +1155,44 @@ impl McpConnectionManager {
Ok(())
}
pub async fn notify_codex_event(&self, event: &Event) -> Result<()> {
let params = serde_json::to_value(event)?;
let Some(event_type) = params
.get("msg")
.and_then(|msg| msg.get("type"))
.and_then(|event_type| event_type.as_str())
else {
warn!("Skipping codex event notification without msg.type");
return Ok(());
};
let mut join_set = JoinSet::new();
for async_managed_client in self.clients.values() {
let params = params.clone();
let event_type = event_type.to_string();
let async_managed_client = async_managed_client.clone();
join_set.spawn(async move {
async_managed_client
.notify_codex_event(&event_type, &params)
.await
});
}
while let Some(join_res) = join_set.join_next().await {
match join_res {
Ok(Ok(())) => {}
Ok(Err(err)) => {
warn!("Failed to notify codex event to MCP server: {err:#}");
}
Err(err) => {
warn!("Task panic when notifying codex event to MCP server: {err:#}");
}
}
}
Ok(())
}
}
async fn emit_update(
@@ -1297,6 +1364,43 @@ fn resolve_bearer_token(
}
}
fn server_supports_experimental_capability(
initialize_result: &rmcp::model::InitializeResult,
capability: &str,
) -> bool {
initialize_result
.capabilities
.experimental
.as_ref()
.and_then(|exp| exp.get(capability))
.is_some()
}
fn server_codex_event_types(
server_name: &str,
initialize_result: &rmcp::model::InitializeResult,
) -> HashSet<String> {
let Some(value) = initialize_result
.capabilities
.experimental
.as_ref()
.and_then(|exp| exp.get(MCP_CODEX_EVENTS_CAPABILITY))
else {
return HashSet::new();
};
match serde_json::from_value::<CodexEventsCapability>(serde_json::Value::Object(value.clone()))
{
Ok(capability) => capability.event_types,
Err(err) => {
warn!(
"MCP server {server_name} declared invalid {MCP_CODEX_EVENTS_CAPABILITY} capability: {err:#}"
);
HashSet::new()
}
}
}
#[derive(Debug, Clone, thiserror::Error)]
enum StartupOutcomeError {
#[error("MCP startup cancelled")]
@@ -1396,18 +1500,16 @@ async fn start_server_task(
}
let tools = filter_tools(tools, &tool_filter);
let server_supports_sandbox_state_capability = initialize_result
.capabilities
.experimental
.as_ref()
.and_then(|exp| exp.get(MCP_SANDBOX_STATE_CAPABILITY))
.is_some();
let server_supports_sandbox_state_capability =
server_supports_experimental_capability(&initialize_result, MCP_SANDBOX_STATE_CAPABILITY);
let codex_event_types = server_codex_event_types(&server_name, &initialize_result);
let managed = ManagedClient {
client: Arc::clone(&client),
tools,
tool_timeout: Some(tool_timeout),
tool_filter,
server_supports_sandbox_state_capability,
codex_event_types,
codex_apps_tools_cache_context,
};

View File

@@ -2,6 +2,7 @@ use super::*;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::McpAuthStatus;
use rmcp::model::JsonObject;
use serde_json::json;
use std::collections::HashSet;
use std::sync::Arc;
use tempfile::tempdir;
@@ -522,6 +523,100 @@ fn elicitation_capability_enabled_only_for_codex_apps() {
assert!(elicitation_capability_for_server("custom_mcp").is_none());
}
#[test]
fn experimental_capability_detected_when_present() {
let initialize_result: rmcp::model::InitializeResult = serde_json::from_value(json!({
"capabilities": {
"experimental": {
"codex/events": {}
}
},
"instructions": null,
"protocolVersion": "2025-06-18",
"serverInfo": {
"name": "test",
"version": "1.0.0"
}
}))
.expect("initialize result");
assert!(server_supports_experimental_capability(
&initialize_result,
MCP_CODEX_EVENTS_CAPABILITY,
));
assert!(!server_supports_experimental_capability(
&initialize_result,
MCP_SANDBOX_STATE_CAPABILITY,
));
}
#[test]
fn experimental_capability_not_detected_when_missing() {
let initialize_result: rmcp::model::InitializeResult = serde_json::from_value(json!({
"capabilities": {},
"instructions": null,
"protocolVersion": "2025-06-18",
"serverInfo": {
"name": "test",
"version": "1.0.0"
}
}))
.expect("initialize result");
assert!(!server_supports_experimental_capability(
&initialize_result,
MCP_CODEX_EVENTS_CAPABILITY,
));
}
#[test]
fn codex_event_types_loaded_from_capability() {
let initialize_result: rmcp::model::InitializeResult = serde_json::from_value(json!({
"capabilities": {
"experimental": {
"codex/events": {
"eventTypes": ["item_completed", "mcp_tool_call_begin"]
}
}
},
"instructions": null,
"protocolVersion": "2025-06-18",
"serverInfo": {
"name": "test",
"version": "1.0.0"
}
}))
.expect("initialize result");
assert_eq!(
server_codex_event_types("test", &initialize_result),
HashSet::from([
"item_completed".to_string(),
"mcp_tool_call_begin".to_string(),
])
);
}
#[test]
fn codex_event_types_empty_when_capability_omits_event_types() {
let initialize_result: rmcp::model::InitializeResult = serde_json::from_value(json!({
"capabilities": {
"experimental": {
"codex/events": {}
}
},
"instructions": null,
"protocolVersion": "2025-06-18",
"serverInfo": {
"name": "test",
"version": "1.0.0"
}
}))
.expect("initialize result");
assert!(server_codex_event_types("test", &initialize_result).is_empty());
}
#[test]
fn mcp_init_error_display_prompts_for_github_pat() {
let server_name = "github";

View File

@@ -2772,9 +2772,17 @@ impl Session {
if let Some(status) = agent_status_from_event(&event.msg) {
self.agent_status.send_replace(status);
}
if let Err(e) = self.tx_event.send(event).await {
if let Err(e) = self.tx_event.send(event.clone()).await {
debug!("dropping event because channel is closed: {e}");
}
self.notify_external_mcp_event(&event).await;
}
async fn notify_external_mcp_event(&self, event: &Event) {
let manager = self.services.mcp_connection_manager.read().await;
if let Err(err) = manager.notify_codex_event(event).await {
warn!("failed to notify MCP servers about codex event: {err:#}");
}
}
pub(crate) async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) {