diff --git a/codex-rs/mcp-server/src/conversation_loop.rs b/codex-rs/mcp-server/src/conversation_loop.rs index e8c079f60d..36f202175c 100644 --- a/codex-rs/mcp-server/src/conversation_loop.rs +++ b/codex-rs/mcp-server/src/conversation_loop.rs @@ -2,7 +2,18 @@ use std::collections::HashMap; use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; + +use codex_core::Codex; +use codex_core::protocol::AgentMessageEvent; +use codex_core::protocol::ApplyPatchApprovalRequestEvent; +use codex_core::protocol::EventMsg; +use codex_core::protocol::ExecApprovalRequestEvent; +use codex_core::protocol::FileChange; +use mcp_types::RequestId; use tokio::sync::Mutex; +use tokio::sync::watch::Receiver as WatchReceiver; +use tracing::error; +use uuid::Uuid; use crate::exec_approval::handle_exec_approval_request; use crate::mcp_protocol::CodexEventNotificationParams; @@ -12,16 +23,6 @@ use crate::mcp_protocol::InitialStatePayload; use crate::mcp_protocol::NotificationMeta; use crate::outgoing_message::OutgoingMessageSender; use crate::patch_approval::handle_patch_approval_request; -use codex_core::Codex; -use codex_core::protocol::AgentMessageEvent; -use codex_core::protocol::ApplyPatchApprovalRequestEvent; -use codex_core::protocol::EventMsg; -use codex_core::protocol::ExecApprovalRequestEvent; -use codex_core::protocol::FileChange; -use mcp_types::RequestId; -use tokio::sync::watch::Receiver as WatchReceiver; -use tracing::error; -use uuid::Uuid; /// Conversation event loop bridging Codex events to MCP notifications. /// @@ -149,6 +150,8 @@ pub async fn run_conversation_loop( &ctx, ) .await; + } else { + error!("stream_rx change error; streaming control channel closed"); } } } diff --git a/codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs b/codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs index 4e4df5132a..d890ad9f07 100644 --- a/codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs +++ b/codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs @@ -6,6 +6,8 @@ use crate::mcp_protocol::ToolCallResponseResult; use crate::message_processor::MessageProcessor; use crate::tool_handlers::send_message::get_session; +/// Handles the ConversationStream tool call: verifies the session and +/// enables streaming for the session, replying with an OK result. pub(crate) async fn handle_stream_conversation( message_processor: &MessageProcessor, id: RequestId, @@ -34,17 +36,14 @@ pub(crate) async fn handle_stream_conversation( let guard = senders_map.lock().await; guard.get(&session_id).cloned() }; - match tx { - Some(tx) => { - let _ = tx.send(true); - } - None => { - // No channel found for the session; treat as error - message_processor - .send_response_with_optional_error(id, None, Some(true)) - .await; - return; - } + if let Some(tx) = tx { + let _ = tx.send(true); + } else { + // No channel found for the session; treat as error + message_processor + .send_response_with_optional_error(id, None, Some(true)) + .await; + return; } // Acknowledge the stream request @@ -59,6 +58,7 @@ pub(crate) async fn handle_stream_conversation( .await; } +/// Handles cancellation for ConversationStream by disabling streaming for the session. pub(crate) async fn handle_cancel( message_processor: &MessageProcessor, args: &ConversationStreamArgs,