feat(app-server): add tracing to all app-server APIs (#13285)

### Overview
This PR adds the first piece of tracing for app-server JSON-RPC
requests.

There are two main changes:
- JSON-RPC requests can now take an optional W3C trace context at the
top level via a `trace` field (`traceparent` / `tracestate`).
- app-server now creates a dedicated request span for every inbound
JSON-RPC request in `MessageProcessor`, and uses the request-level trace
context as the parent when present.

For compatibility with existing flows, app-server still falls back to
the TRACEPARENT env var when there is no request-level traceparent.

This PR is intentionally scoped to the app-server boundary. In a
followup, we'll actually propagate trace context through the async
handoff into core execution spans like run_turn, which will make
app-server traces much more useful.

### Spans
A few details on the app-server span shape:
- each inbound request gets its own server span
- span/resource names are based on the JSON-RPC method (`initialize`,
`thread/start`, `turn/start`, etc.)
- spans record transport (stdio vs websocket), request id, connection
id, and client name/version when available
- `initialize` stores client metadata in session state so later requests
on the same connection can reuse it
This commit is contained in:
Owen Lin
2026-03-02 16:01:41 -08:00
committed by GitHub
parent 14fcb6645c
commit d473e8d56d
17 changed files with 464 additions and 147 deletions

View File

@@ -12,6 +12,7 @@ use crate::external_agent_config_api::ExternalAgentConfigApi;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
use crate::outgoing_message::OutgoingMessageSender;
use crate::transport::AppServerTransport;
use async_trait::async_trait;
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
@@ -59,6 +60,7 @@ use tokio::sync::watch;
use tokio::time::Duration;
use tokio::time::timeout;
use toml::Value as TomlValue;
use tracing::Instrument;
const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
@@ -141,6 +143,7 @@ pub(crate) struct ConnectionSessionState {
pub(crate) experimental_api_enabled: bool,
pub(crate) opted_out_notification_methods: HashSet<String>,
pub(crate) app_server_client_name: Option<String>,
pub(crate) client_version: Option<String>,
}
pub(crate) struct MessageProcessorArgs {
@@ -224,46 +227,50 @@ impl MessageProcessor {
&mut self,
connection_id: ConnectionId,
request: JSONRPCRequest,
transport: AppServerTransport,
session: &mut ConnectionSessionState,
outbound_initialized: &AtomicBool,
) {
let request_method = request.method.as_str();
tracing::trace!(
?connection_id,
request_id = ?request.id,
"app-server request: {request_method}"
);
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id.clone(),
};
let request_json = match serde_json::to_value(&request) {
Ok(request_json) => request_json,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let request_span =
crate::app_server_tracing::request_span(&request, transport, connection_id, session);
async {
let request_method = request.method.as_str();
tracing::trace!(
?connection_id,
request_id = ?request.id,
"app-server request: {request_method}"
);
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id.clone(),
};
let request_json = match serde_json::to_value(&request) {
Ok(request_json) => request_json,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let codex_request = match serde_json::from_value::<ClientRequest>(request_json) {
Ok(codex_request) => codex_request,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let codex_request = match serde_json::from_value::<ClientRequest>(request_json) {
Ok(codex_request) => codex_request,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
match codex_request {
match codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
@@ -304,6 +311,8 @@ impl MessageProcessor {
title: _title,
version,
} = params.client_info;
session.app_server_client_name = Some(name.clone());
session.client_version = Some(version.clone());
if let Err(error) = set_default_originator(name.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
@@ -330,7 +339,6 @@ impl MessageProcessor {
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
*suffix = Some(user_agent_suffix);
}
session.app_server_client_name = Some(name.clone());
let user_agent = get_codex_user_agent();
let response = InitializeResponse { user_agent };
@@ -355,91 +363,97 @@ impl MessageProcessor {
return;
}
}
}
}
if let Some(reason) = codex_request.experimental_reason()
&& !session.experimental_api_enabled
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: experimental_required_message(reason),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
if let Some(reason) = codex_request.experimental_reason()
&& !session.experimental_api_enabled
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: experimental_required_message(reason),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.handle_config_read(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
self.handle_external_agent_config_detect(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
self.handle_external_agent_config_import(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigValueWrite { request_id, params } => {
self.handle_config_value_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigBatchWrite { request_id, params } => {
self.handle_config_batch_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigRequirementsRead {
request_id,
params: _,
} => {
self.handle_config_requirements_read(ConnectionRequestId {
connection_id,
request_id,
})
.await;
}
other => {
// Box the delegated future so this wrapper's async state machine does not
// inline the full `CodexMessageProcessor::process_request` future, which
// can otherwise push worker-thread stack usage over the edge.
self.codex_message_processor
.process_request(connection_id, other, session.app_server_client_name.clone())
.boxed()
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.handle_config_read(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
self.handle_external_agent_config_detect(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
self.handle_external_agent_config_import(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigValueWrite { request_id, params } => {
self.handle_config_value_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigBatchWrite { request_id, params } => {
self.handle_config_batch_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigRequirementsRead {
request_id,
params: _,
} => {
self.handle_config_requirements_read(ConnectionRequestId {
connection_id,
request_id,
})
.await;
}
other => {
// Box the delegated future so this wrapper's async state machine does not
// inline the full `CodexMessageProcessor::process_request` future, which
// can otherwise push worker-thread stack usage over the edge.
self.codex_message_processor
.process_request(
connection_id,
other,
session.app_server_client_name.clone(),
)
.boxed()
.await;
}
}
}
.instrument(request_span)
.await;
}
pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) {