diff --git a/codex-rs/mcp-server/src/conversation_loop.rs b/codex-rs/mcp-server/src/conversation_loop.rs index c2f9fe9dc3..e8c079f60d 100644 --- a/codex-rs/mcp-server/src/conversation_loop.rs +++ b/codex-rs/mcp-server/src/conversation_loop.rs @@ -1,4 +1,6 @@ +use std::collections::HashMap; use std::collections::HashSet; +use std::path::PathBuf; use std::sync::Arc; use tokio::sync::Mutex; @@ -15,11 +17,20 @@ use codex_core::protocol::AgentMessageEvent; use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::EventMsg; use codex_core::protocol::ExecApprovalRequestEvent; +use codex_core::protocol::FileChange; use mcp_types::RequestId; use tokio::sync::watch::Receiver as WatchReceiver; use tracing::error; use uuid::Uuid; +/// Conversation event loop bridging Codex events to MCP notifications. +/// +/// Semantics: +/// - Always buffers all Codex events to include in an InitialState snapshot when +/// streaming turns on. +/// - Streams notifications live when `streaming_enabled` is true. +/// - Defers exec/patch approval elicitations until streaming turns on so +/// the client first receives InitialState, then the corresponding requests. pub async fn run_conversation_loop( codex: Arc, outgoing: Arc, @@ -33,10 +44,19 @@ pub async fn run_conversation_loop( RequestId::Integer(n) => n.to_string(), }; - // Buffer all events for InitialState + // Buffer all events to include in InitialState when streaming is enabled let mut buffered_events: Vec = Vec::new(); let mut streaming_enabled = *stream_rx.borrow(); + let mut pending_elicitations: Vec = Vec::new(); + + let ctx = LoopCtx { + outgoing: outgoing.clone(), + codex: codex.clone(), + request_id: request_id.clone(), + request_id_str: request_id_str.clone(), + }; + loop { tokio::select! { res = codex.next_event() => { @@ -44,70 +64,44 @@ pub async fn run_conversation_loop( Ok(event) => { // Always buffer the event buffered_events.push(CodexEventNotificationParams { meta: None, msg: event.msg.clone() }); - - if streaming_enabled { - let method = event.msg.to_string(); - let params = CodexEventNotificationParams { meta: None, msg: event.msg.clone() }; - if let Ok(params_val) = serde_json::to_value(¶ms) { - outgoing - .send_custom_notification(&method, params_val) - .await; - } else { - error!("Failed to serialize event params"); - } - } + // Stream immediately if enabled + stream_event_if_enabled(streaming_enabled, &ctx, &event.msg).await; match event.msg { - EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { - command, - cwd, - call_id, - reason: _, - }) => { - if streaming_enabled { - handle_exec_approval_request( - command, - cwd, - outgoing.clone(), - codex.clone(), - request_id.clone(), - request_id_str.clone(), - event.id.clone(), - call_id, - ) - .await; - } + EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { command, cwd, call_id, reason: _ }) => { + process_exec_request( + streaming_enabled, + &mut pending_elicitations, + command, + cwd, + call_id, + event.id.clone(), + &ctx, + ) + .await; continue; } EventMsg::Error(_) => { error!("Codex runtime error"); } - EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { - call_id, - reason, - grant_root, - changes, - }) => { - if streaming_enabled { - handle_patch_approval_request( - call_id, - reason, - grant_root, - changes, - outgoing.clone(), - codex.clone(), - request_id.clone(), - request_id_str.clone(), - event.id.clone(), - ) - .await; - } + EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, reason, grant_root, changes }) => { + process_patch_request( + streaming_enabled, + &mut pending_elicitations, + PatchReq { + call_id, + reason, + grant_root, + changes, + event_id: event.id.clone(), + }, + &ctx, + ) + .await; continue; } EventMsg::TaskComplete(_) => { - // remove running session id - let mut running_sessions = running_sessions.lock().await; - running_sessions.remove(&session_id); + handle_task_complete(&running_sessions, &session_id).await; } EventMsg::SessionConfigured(_) => { tracing::error!("unexpected SessionConfigured event"); @@ -135,12 +129,6 @@ pub async fn run_conversation_loop( | EventMsg::GetHistoryEntryResponse(_) | EventMsg::PlanUpdate(_) | EventMsg::ShutdownComplete => { - // For now, we do not do anything extra for these - // events. Note that - // send(codex_event_to_notification(&event)) above has - // already dispatched these events as notifications, - // though we may want to do give different treatment to - // individual events in the future. } } } @@ -152,26 +140,242 @@ pub async fn run_conversation_loop( changed = stream_rx.changed() => { if changed.is_ok() { let now = *stream_rx.borrow(); - if now && !streaming_enabled { - streaming_enabled = true; - // Emit InitialState with all buffered events - let params = InitialStateNotificationParams { - meta: Some(NotificationMeta { conversation_id: Some(ConversationId(session_id)), request_id: None }), - initial_state: InitialStatePayload { events: buffered_events.clone() }, - }; - if let Ok(params_val) = serde_json::to_value(¶ms) { - outgoing - .send_custom_notification("notifications/initial_state", params_val) - .await; - } else { - error!("Failed to serialize InitialState params"); - } - } else if !now && streaming_enabled { - // streaming disabled - streaming_enabled = false; - } + handle_stream_change( + now, + &mut streaming_enabled, + session_id, + &buffered_events, + &mut pending_elicitations, + &ctx, + ) + .await; } } } } } + +/// Deferred elicitation requests to be sent after InitialState when +/// streaming is enabled. Preserves original event order (FIFO). +enum PendingElicitation { + Exec { + command: Vec, + cwd: PathBuf, + event_id: String, + call_id: String, + }, + Patch { + call_id: String, + reason: Option, + grant_root: Option, + changes: HashMap, + event_id: String, + }, +} + +/// Immutable context shared across helper functions to avoid long +/// argument lists. +struct LoopCtx { + outgoing: Arc, + codex: Arc, + request_id: RequestId, + request_id_str: String, +} + +/// Snapshot of a patch approval request used to defer elicitation. +struct PatchReq { + call_id: String, + reason: Option, + grant_root: Option, + changes: HashMap, + event_id: String, +} + +/// Streams a single Codex event as an MCP notification if streaming is enabled. +async fn stream_event_if_enabled(streaming_enabled: bool, ctx: &LoopCtx, msg: &EventMsg) { + if !streaming_enabled { + return; + } + let method = msg.to_string(); + let params = CodexEventNotificationParams { + meta: None, + msg: msg.clone(), + }; + if let Ok(params_val) = serde_json::to_value(¶ms) { + ctx.outgoing + .send_custom_notification(&method, params_val) + .await; + } else { + error!("Failed to serialize event params"); + } +} + +/// Handles an exec approval request. If streaming is disabled, defers the +/// elicitation until after InitialState; otherwise elicits immediately. +async fn process_exec_request( + streaming_enabled: bool, + pending: &mut Vec, + command: Vec, + cwd: PathBuf, + call_id: String, + event_id: String, + ctx: &LoopCtx, +) { + if streaming_enabled { + handle_exec_approval_request( + command, + cwd, + ctx.outgoing.clone(), + ctx.codex.clone(), + ctx.request_id.clone(), + ctx.request_id_str.clone(), + event_id, + call_id, + ) + .await; + } else { + pending.push(PendingElicitation::Exec { + command, + cwd, + event_id, + call_id, + }); + } +} + +/// Handles a patch approval request. If streaming is disabled, defers the +/// elicitation until after InitialState; otherwise elicits immediately. +async fn process_patch_request( + streaming_enabled: bool, + pending: &mut Vec, + req: PatchReq, + ctx: &LoopCtx, +) { + let PatchReq { + call_id, + reason, + grant_root, + changes, + event_id, + } = req; + if streaming_enabled { + handle_patch_approval_request( + call_id, + reason, + grant_root, + changes, + ctx.outgoing.clone(), + ctx.codex.clone(), + ctx.request_id.clone(), + ctx.request_id_str.clone(), + event_id, + ) + .await; + } else { + pending.push(PendingElicitation::Patch { + call_id, + reason, + grant_root, + changes, + event_id, + }); + } +} + +/// Handles a streaming state change. +/// +/// When enabling streaming: +/// 1) emits InitialState with all buffered events +/// 2) drains and sends any deferred elicitations +async fn handle_stream_change( + now: bool, + streaming_enabled: &mut bool, + session_id: Uuid, + buffered_events: &[CodexEventNotificationParams], + pending: &mut Vec, + ctx: &LoopCtx, +) { + if now && !*streaming_enabled { + *streaming_enabled = true; + emit_initial_state(ctx, session_id, buffered_events).await; + drain_pending_elicitations(pending, ctx).await; + } else if !now && *streaming_enabled { + *streaming_enabled = false; + } +} + +/// Emits the InitialState snapshot to the client. +async fn emit_initial_state( + ctx: &LoopCtx, + session_id: Uuid, + buffered_events: &[CodexEventNotificationParams], +) { + let params = InitialStateNotificationParams { + meta: Some(NotificationMeta { + conversation_id: Some(ConversationId(session_id)), + request_id: None, + }), + initial_state: InitialStatePayload { + events: buffered_events.to_vec(), + }, + }; + if let Ok(params_val) = serde_json::to_value(¶ms) { + ctx.outgoing + .send_custom_notification("notifications/initial_state", params_val) + .await; + } else { + error!("Failed to serialize InitialState params"); + } +} + +/// Sends any deferred exec/patch elicitations in FIFO order. +async fn drain_pending_elicitations(pending: &mut Vec, ctx: &LoopCtx) { + for item in pending.drain(..) { + match item { + PendingElicitation::Exec { + command, + cwd, + event_id, + call_id, + } => { + handle_exec_approval_request( + command, + cwd, + ctx.outgoing.clone(), + ctx.codex.clone(), + ctx.request_id.clone(), + ctx.request_id_str.clone(), + event_id, + call_id, + ) + .await; + } + PendingElicitation::Patch { + call_id, + reason, + grant_root, + changes, + event_id, + } => { + handle_patch_approval_request( + call_id, + reason, + grant_root, + changes, + ctx.outgoing.clone(), + ctx.codex.clone(), + ctx.request_id.clone(), + ctx.request_id_str.clone(), + event_id, + ) + .await; + } + } + } +} + +/// Removes the session id from the shared running set when a task completes. +async fn handle_task_complete(running_sessions: &Arc>>, session_id: &Uuid) { + let mut running_sessions = running_sessions.lock().await; + running_sessions.remove(session_id); +} diff --git a/codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs b/codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs index 52f01760de..4e4df5132a 100644 --- a/codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs +++ b/codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs @@ -5,7 +5,6 @@ use crate::mcp_protocol::ConversationStreamResult; use crate::mcp_protocol::ToolCallResponseResult; use crate::message_processor::MessageProcessor; use crate::tool_handlers::send_message::get_session; -use uuid::Uuid; pub(crate) async fn handle_stream_conversation( message_processor: &MessageProcessor, @@ -64,10 +63,7 @@ pub(crate) async fn handle_cancel( message_processor: &MessageProcessor, args: &ConversationStreamArgs, ) { - disable_stream_for_session(message_processor, args.conversation_id.0).await; -} - -async fn disable_stream_for_session(message_processor: &MessageProcessor, session_id: Uuid) { + let session_id = args.conversation_id.0; let sender_opt: Option> = { let senders = message_processor.streaming_session_senders(); let guard = senders.lock().await;