mirror of
https://github.com/openai/codex.git
synced 2026-05-04 13:21:54 +03:00
This is a very large PR with some non-backwards-compatible changes. Historically, `codex mcp` (or `codex mcp serve`) started a JSON-RPC-ish server that had two overlapping responsibilities: - Running an MCP server, providing some basic tool calls. - Running the app server used to power experiences such as the VS Code extension. This PR aims to separate these into distinct concepts: - `codex mcp-server` for the MCP server - `codex app-server` for the "application server" Note `codex mcp` still exists because it already has its own subcommands for MCP management (`list`, `add`, etc.) The MCP logic continues to live in `codex-rs/mcp-server` whereas the refactored app server logic is in the new `codex-rs/app-server` folder. Note that most of the existing integration tests in `codex-rs/mcp-server/tests/suite` were actually for the app server, so all the tests have been moved with the exception of `codex-rs/mcp-server/tests/suite/mod.rs`. Because this is already a large diff, I tried not to change more than I had to, so `codex-rs/app-server/tests/common/mcp_process.rs` still uses the name `McpProcess` for now, but I will do some mechanical renamings to things like `AppServer` in subsequent PRs. While `mcp-server` and `app-server` share some overlapping functionality (like reading streams of JSONL and dispatching based on message types) and some differences (completely different message types), I ended up doing a bit of copypasta between the two crates, as both have somewhat similar `message_processor.rs` and `outgoing_message.rs` files for now, though I expect them to diverge more in the near future. One material change is that of the initialize handshake for `codex app-server`, as we no longer use the MCP types for that handshake. Instead, we update `codex-rs/protocol/src/mcp_protocol.rs` to add an `Initialize` variant to `ClientRequest`, which takes the `ClientInfo` object we need to update the `USER_AGENT_SUFFIX` in `codex-rs/app-server/src/message_processor.rs`. 
One other material change is in `codex-rs/app-server/src/codex_message_processor.rs` where I eliminated a use of the `send_event_as_notification()` method I am generally trying to deprecate (because it blindly maps an `EventMsg` into a `JSONNotification`) in favor of `send_server_notification()`, which takes a `ServerNotification`, as that is intended to be a custom enum of all notification types supported by the app server. So to make this update, I had to introduce a new variant of `ServerNotification`, `SessionConfigured`, which is a non-backwards-compatible change with the old `codex mcp`, and clients will have to be updated after the next release that contains this PR. Note that `codex-rs/app-server/tests/suite/list_resume.rs` also had to be updated to reflect this change. I introduced `codex-rs/utils/json-to-toml/src/lib.rs` as a small utility crate to avoid some of the copying between `mcp-server` and `app-server`.
134 lines
5.1 KiB
Rust
134 lines
5.1 KiB
Rust
use std::path::PathBuf;
|
|
|
|
use crate::codex_message_processor::CodexMessageProcessor;
|
|
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
|
use crate::outgoing_message::OutgoingMessageSender;
|
|
use codex_protocol::mcp_protocol::ClientInfo;
|
|
use codex_protocol::mcp_protocol::ClientRequest;
|
|
use codex_protocol::mcp_protocol::InitializeResponse;
|
|
|
|
use codex_core::AuthManager;
|
|
use codex_core::ConversationManager;
|
|
use codex_core::config::Config;
|
|
use codex_core::default_client::USER_AGENT_SUFFIX;
|
|
use codex_core::default_client::get_codex_user_agent;
|
|
use mcp_types::JSONRPCError;
|
|
use mcp_types::JSONRPCErrorError;
|
|
use mcp_types::JSONRPCNotification;
|
|
use mcp_types::JSONRPCRequest;
|
|
use mcp_types::JSONRPCResponse;
|
|
use std::sync::Arc;
|
|
|
|
pub(crate) struct MessageProcessor {
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
codex_message_processor: CodexMessageProcessor,
|
|
initialized: bool,
|
|
}
|
|
|
|
impl MessageProcessor {
|
|
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
|
|
/// `Sender` so handlers can enqueue messages to be written to stdout.
|
|
pub(crate) fn new(
|
|
outgoing: OutgoingMessageSender,
|
|
codex_linux_sandbox_exe: Option<PathBuf>,
|
|
config: Arc<Config>,
|
|
) -> Self {
|
|
let outgoing = Arc::new(outgoing);
|
|
let auth_manager = AuthManager::shared(config.codex_home.clone());
|
|
let conversation_manager = Arc::new(ConversationManager::new(auth_manager.clone()));
|
|
let codex_message_processor = CodexMessageProcessor::new(
|
|
auth_manager,
|
|
conversation_manager,
|
|
outgoing.clone(),
|
|
codex_linux_sandbox_exe,
|
|
config,
|
|
);
|
|
|
|
Self {
|
|
outgoing,
|
|
codex_message_processor,
|
|
initialized: false,
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) {
|
|
let request_id = request.id.clone();
|
|
if let Ok(request_json) = serde_json::to_value(request)
|
|
&& let Ok(codex_request) = serde_json::from_value::<ClientRequest>(request_json)
|
|
{
|
|
match codex_request {
|
|
// Handle Initialize internally so CodexMessageProcessor does not have to concern
|
|
// itself with the `initialized` bool.
|
|
ClientRequest::Initialize { request_id, params } => {
|
|
if self.initialized {
|
|
let error = JSONRPCErrorError {
|
|
code: INVALID_REQUEST_ERROR_CODE,
|
|
message: "Already initialized".to_string(),
|
|
data: None,
|
|
};
|
|
self.outgoing.send_error(request_id, error).await;
|
|
return;
|
|
} else {
|
|
let ClientInfo {
|
|
name,
|
|
title: _title,
|
|
version,
|
|
} = params.client_info;
|
|
let user_agent_suffix = format!("{name}; {version}");
|
|
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
|
|
*suffix = Some(user_agent_suffix);
|
|
}
|
|
|
|
let user_agent = get_codex_user_agent();
|
|
let response = InitializeResponse { user_agent };
|
|
self.outgoing.send_response(request_id, response).await;
|
|
|
|
self.initialized = true;
|
|
return;
|
|
}
|
|
}
|
|
_ => {
|
|
if !self.initialized {
|
|
let error = JSONRPCErrorError {
|
|
code: INVALID_REQUEST_ERROR_CODE,
|
|
message: "Not initialized".to_string(),
|
|
data: None,
|
|
};
|
|
self.outgoing.send_error(request_id, error).await;
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
self.codex_message_processor
|
|
.process_request(codex_request)
|
|
.await;
|
|
} else {
|
|
let error = JSONRPCErrorError {
|
|
code: INVALID_REQUEST_ERROR_CODE,
|
|
message: "Invalid request".to_string(),
|
|
data: None,
|
|
};
|
|
self.outgoing.send_error(request_id, error).await;
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) {
|
|
// Currently, we do not expect to receive any notifications from the
|
|
// client, so we just log them.
|
|
tracing::info!("<- notification: {:?}", notification);
|
|
}
|
|
|
|
/// Handle a standalone JSON-RPC response originating from the peer.
|
|
pub(crate) async fn process_response(&mut self, response: JSONRPCResponse) {
|
|
tracing::info!("<- response: {:?}", response);
|
|
let JSONRPCResponse { id, result, .. } = response;
|
|
self.outgoing.notify_client_response(id, result).await
|
|
}
|
|
|
|
/// Handle an error object received from the peer.
|
|
pub(crate) fn process_error(&mut self, err: JSONRPCError) {
|
|
tracing::error!("<- error: {:?}", err);
|
|
}
|
|
}
|