mirror of
https://github.com/openai/codex.git
synced 2026-04-28 02:11:08 +03:00
Add MCP tool call spans (#15659)
## Summary

- add an explicit `mcp.tools.call` span around MCP tool execution in core
- keep MCP span validation local to `mcp_tool_call_tests` instead of broadening the integration test suite
- inline the turn/session correlation fields directly in the span initializer

## Included Changes

- `codex-rs/core/src/mcp_tool_call.rs`: wrap the existing MCP tool call in `mcp.tools.call` and inline `conversation.id`, `session.id`, and `turn.id` in the span initializer
- `codex-rs/core/src/mcp_tool_call_tests.rs`: assert the MCP span records the expected correlation and server fields

## Testing

- `cargo test -p codex-core`
- `just fmt`

## Notes

- `cargo test -p codex-core` still hits existing unrelated failures in guardian-config tests and the sandboxed JS REPL `mktemp` test
- metric work moved to stacked PR #15792
- transport-level RMCP spans and trace propagation remain in stacked PR #15792
- full workspace `cargo test` was not run

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
committed by
GitHub
parent
2c67a27a71
commit
b6524514c1
@@ -50,6 +50,10 @@ use serde::Serialize;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use toml_edit::value;
|
||||
use tracing::Instrument;
|
||||
use tracing::Span;
|
||||
use tracing::field::Empty;
|
||||
use url::Url;
|
||||
|
||||
/// Handles the specified tool call and dispatches the appropriate
|
||||
/// `McpToolCallBegin` and `McpToolCallEnd` events to the `Session`.
|
||||
@@ -121,6 +125,19 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
}
|
||||
let request_meta =
|
||||
build_mcp_tool_call_request_meta(turn_context.as_ref(), &server, metadata.as_ref());
|
||||
let connector_id = metadata
|
||||
.as_ref()
|
||||
.and_then(|metadata| metadata.connector_id.clone());
|
||||
let connector_name = metadata
|
||||
.as_ref()
|
||||
.and_then(|metadata| metadata.connector_name.clone());
|
||||
let server_origin = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.server_origin(&server)
|
||||
.map(str::to_string);
|
||||
|
||||
let tool_call_begin_event = EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
@@ -145,15 +162,29 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
maybe_mark_thread_memory_mode_polluted(sess.as_ref(), turn_context.as_ref()).await;
|
||||
|
||||
let start = Instant::now();
|
||||
let result = sess
|
||||
.call_tool(
|
||||
let result = async {
|
||||
sess.call_tool(
|
||||
&server,
|
||||
&tool_name,
|
||||
arguments_value.clone(),
|
||||
request_meta.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("tool call error: {e:?}"));
|
||||
.map_err(|e| format!("tool call error: {e:?}"))
|
||||
}
|
||||
.instrument(mcp_tool_call_span(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
McpToolCallSpanFields {
|
||||
server_name: &server,
|
||||
tool_name: &tool_name,
|
||||
call_id: &call_id,
|
||||
server_origin: server_origin.as_deref(),
|
||||
connector_id: connector_id.as_deref(),
|
||||
connector_name: connector_name.as_deref(),
|
||||
},
|
||||
))
|
||||
.await;
|
||||
let result = sanitize_mcp_tool_result_for_model(
|
||||
turn_context
|
||||
.model_info
|
||||
@@ -161,8 +192,8 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
.contains(&InputModality::Image),
|
||||
result,
|
||||
);
|
||||
if let Err(e) = &result {
|
||||
tracing::warn!("MCP tool call error: {e:?}");
|
||||
if let Err(error) = &result {
|
||||
tracing::warn!("MCP tool call error: {error:?}");
|
||||
}
|
||||
let tool_call_end_event = EventMsg::McpToolCallEnd(McpToolCallEndEvent {
|
||||
call_id: call_id.clone(),
|
||||
@@ -236,10 +267,24 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
|
||||
let start = Instant::now();
|
||||
// Perform the tool call.
|
||||
let result = sess
|
||||
.call_tool(&server, &tool_name, arguments_value.clone(), request_meta)
|
||||
.await
|
||||
.map_err(|e| format!("tool call error: {e:?}"));
|
||||
let result = async {
|
||||
sess.call_tool(&server, &tool_name, arguments_value.clone(), request_meta)
|
||||
.await
|
||||
.map_err(|e| format!("tool call error: {e:?}"))
|
||||
}
|
||||
.instrument(mcp_tool_call_span(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
McpToolCallSpanFields {
|
||||
server_name: &server,
|
||||
tool_name: &tool_name,
|
||||
call_id: &call_id,
|
||||
server_origin: server_origin.as_deref(),
|
||||
connector_id: connector_id.as_deref(),
|
||||
connector_name: connector_name.as_deref(),
|
||||
},
|
||||
))
|
||||
.await;
|
||||
let result = sanitize_mcp_tool_result_for_model(
|
||||
turn_context
|
||||
.model_info
|
||||
@@ -247,8 +292,8 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
.contains(&InputModality::Image),
|
||||
result,
|
||||
);
|
||||
if let Err(e) = &result {
|
||||
tracing::warn!("MCP tool call error: {e:?}");
|
||||
if let Err(error) = &result {
|
||||
tracing::warn!("MCP tool call error: {error:?}");
|
||||
}
|
||||
let tool_call_end_event = EventMsg::McpToolCallEnd(McpToolCallEndEvent {
|
||||
call_id: call_id.clone(),
|
||||
@@ -273,6 +318,62 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
CallToolResult::from_result(result)
|
||||
}
|
||||
|
||||
fn mcp_tool_call_span(
|
||||
session: &Session,
|
||||
turn_context: &TurnContext,
|
||||
fields: McpToolCallSpanFields<'_>,
|
||||
) -> Span {
|
||||
let transport = match fields.server_origin {
|
||||
Some("stdio") => "stdio",
|
||||
Some(_) => "streamable_http",
|
||||
None => "",
|
||||
};
|
||||
let span = tracing::info_span!(
|
||||
"mcp.tools.call",
|
||||
otel.kind = "client",
|
||||
rpc.system = "jsonrpc",
|
||||
rpc.method = "tools/call",
|
||||
mcp.server.name = fields.server_name,
|
||||
mcp.server.origin = fields.server_origin.unwrap_or(""),
|
||||
mcp.transport = transport,
|
||||
mcp.connector.id = fields.connector_id.unwrap_or(""),
|
||||
mcp.connector.name = fields.connector_name.unwrap_or(""),
|
||||
tool.name = fields.tool_name,
|
||||
tool.call_id = fields.call_id,
|
||||
conversation.id = %session.conversation_id,
|
||||
session.id = %session.conversation_id,
|
||||
turn.id = turn_context.sub_id.as_str(),
|
||||
server.address = Empty,
|
||||
server.port = Empty,
|
||||
);
|
||||
record_server_fields(&span, fields.server_origin);
|
||||
span
|
||||
}
|
||||
|
||||
/// Borrowed per-call values that `mcp_tool_call_span` records on the `mcp.tools.call` span.
struct McpToolCallSpanFields<'a> {
    /// Recorded as `mcp.server.name`.
    server_name: &'a str,
    /// Recorded as `tool.name`.
    tool_name: &'a str,
    /// Recorded as `tool.call_id`.
    call_id: &'a str,
    /// Recorded as `mcp.server.origin`; also used to derive `mcp.transport` and, when it
    /// parses as a URL, `server.address` / `server.port`.
    server_origin: Option<&'a str>,
    /// Recorded as `mcp.connector.id` (empty string when absent).
    connector_id: Option<&'a str>,
    /// Recorded as `mcp.connector.name` (empty string when absent).
    connector_name: Option<&'a str>,
}
|
||||
|
||||
fn record_server_fields(span: &Span, url: Option<&str>) {
|
||||
let Some(url) = url else {
|
||||
return;
|
||||
};
|
||||
let Ok(parsed) = Url::parse(url) else {
|
||||
return;
|
||||
};
|
||||
if let Some(host) = parsed.host_str() {
|
||||
span.record("server.address", host);
|
||||
}
|
||||
if let Some(port) = parsed.port_or_known_default() {
|
||||
span.record("server.port", port as i64);
|
||||
}
|
||||
}
|
||||
|
||||
async fn maybe_mark_thread_memory_mode_polluted(sess: &Session, turn_context: &TurnContext) {
|
||||
if !turn_context
|
||||
.config
|
||||
|
||||
@@ -18,6 +18,10 @@ use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempdir;
|
||||
use tracing::Instrument;
|
||||
use tracing::Level;
|
||||
use tracing_subscriber::fmt::format::FmtSpan;
|
||||
use tracing_test::internal::MockWriter;
|
||||
|
||||
fn annotations(
|
||||
read_only: Option<bool>,
|
||||
@@ -119,6 +123,57 @@ fn approval_question_text_prepends_safety_reason() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mcp_tool_call_span_records_expected_fields() {
|
||||
let buffer: &'static std::sync::Mutex<Vec<u8>> =
|
||||
Box::leak(Box::new(std::sync::Mutex::new(Vec::new())));
|
||||
let subscriber = tracing_subscriber::fmt()
|
||||
.with_level(true)
|
||||
.with_ansi(false)
|
||||
.with_max_level(Level::TRACE)
|
||||
.with_span_events(FmtSpan::FULL)
|
||||
.with_writer(MockWriter::new(buffer))
|
||||
.finish();
|
||||
let _guard = tracing::subscriber::set_default(subscriber);
|
||||
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
|
||||
async {}
|
||||
.instrument(mcp_tool_call_span(
|
||||
&session,
|
||||
&turn_context,
|
||||
McpToolCallSpanFields {
|
||||
server_name: "rmcp",
|
||||
tool_name: "echo",
|
||||
call_id: "call-123",
|
||||
server_origin: Some("https://example.com:8443/mcp"),
|
||||
connector_id: Some("calendar"),
|
||||
connector_name: Some("Calendar"),
|
||||
},
|
||||
))
|
||||
.await;
|
||||
|
||||
let logs = String::from_utf8(buffer.lock().expect("buffer lock").clone()).expect("utf8 logs");
|
||||
assert!(
|
||||
logs.contains("mcp.tools.call{otel.kind=\"client\"")
|
||||
&& logs.contains("rpc.system=\"jsonrpc\"")
|
||||
&& logs.contains("rpc.method=\"tools/call\"")
|
||||
&& logs.contains("mcp.server.name=\"rmcp\"")
|
||||
&& logs.contains("mcp.server.origin=\"https://example.com:8443/mcp\"")
|
||||
&& logs.contains("mcp.transport=\"streamable_http\"")
|
||||
&& logs.contains("mcp.connector.id=\"calendar\"")
|
||||
&& logs.contains("mcp.connector.name=\"Calendar\"")
|
||||
&& logs.contains("tool.name=\"echo\"")
|
||||
&& logs.contains("tool.call_id=\"call-123\"")
|
||||
&& logs.contains("server.address=\"example.com\"")
|
||||
&& logs.contains("server.port=8443")
|
||||
&& logs.contains("conversation.id=")
|
||||
&& logs.contains("session.id=")
|
||||
&& logs.contains("turn.id="),
|
||||
"missing MCP tool span fields\nlogs:\n{logs}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn approval_elicitation_request_uses_message_override_and_preserves_tool_params_keys() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
|
||||
Reference in New Issue
Block a user