feat(app-server): propagate traces across tasks and core ops (#14387)

## Summary

This PR keeps app-server RPC request trace context alive for the full
lifetime of the work that request kicks off (e.g. for `thread/start`,
this is `app-server rpc handler -> tokio background task -> core op
submissions`). Previously we lose trace lineage once the request handler
returns or hands work off to background tasks.

This approach is especially relevant for `thread/start` and other RPC
handlers that run in a non-blocking way. In the near future we'll most
likely want to make all app-server handlers non-blocking by default and
only queue the operations that must execute in order (e.g. per-thread
RPCs), so we want to make sure tracing in app-server works correctly
across the board.

Depends on https://github.com/openai/codex/pull/14300

**Before**
<img width="155" height="207" alt="image"
src="https://github.com/user-attachments/assets/c9487459-36f1-436c-beb7-fafeb40737af"
/>


**After**
<img width="299" height="337" alt="image"
src="https://github.com/user-attachments/assets/727392b2-d072-4427-9dc4-0502d8652dea"
/>

## What changed

- Keep request-scoped trace context around until we send the final
response or error, or the connection closes.
- Thread that trace context through detached `thread/start` work so
background startup stays attached to the originating request.
- Pass request trace context through to downstream core operations,
including:
  - thread creation
  - resume/fork flows
  - turn submission
  - review
  - interrupt
  - realtime conversation operations
- Add tracing tests that verify:
  - remote W3C trace context is preserved for `thread/start`
  - remote W3C trace context is preserved for `turn/start`
  - downstream core spans stay under the originating request span
  - request-scoped tracing state is cleaned up correctly
- Clean up shutdown behavior so detached background tasks and spawned
threads are drained before process exit.
This commit is contained in:
Owen Lin
2026-03-11 20:18:31 -07:00
committed by GitHub
parent bf5e997b31
commit 5bc82c5b93
24 changed files with 1524 additions and 308 deletions

View File

@@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::future::Future;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
@@ -12,6 +13,7 @@ use crate::external_agent_config_api::ExternalAgentConfigApi;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::RequestContext;
use crate::transport::AppServerTransport;
use async_trait::async_trait;
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
@@ -55,6 +57,7 @@ use codex_core::models_manager::collaboration_mode_presets::CollaborationModesCo
use codex_feedback::CodexFeedback;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::W3cTraceContext;
use codex_state::log_db::LogDbLayer;
use futures::FutureExt;
use tokio::sync::broadcast;
@@ -240,53 +243,66 @@ impl MessageProcessor {
transport: AppServerTransport,
session: &mut ConnectionSessionState,
) {
let request_method = request.method.as_str();
tracing::trace!(
?connection_id,
request_id = ?request.id,
"app-server request: {request_method}"
);
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id.clone(),
};
let request_span =
crate::app_server_tracing::request_span(&request, transport, connection_id, session);
async {
let request_method = request.method.as_str();
tracing::trace!(
?connection_id,
request_id = ?request.id,
"app-server request: {request_method}"
);
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id.clone(),
};
let request_json = match serde_json::to_value(&request) {
Ok(request_json) => request_json,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let request_trace = request.trace.as_ref().map(|trace| W3cTraceContext {
traceparent: trace.traceparent.clone(),
tracestate: trace.tracestate.clone(),
});
let request_context = RequestContext::new(request_id.clone(), request_span, request_trace);
Self::run_request_with_context(
Arc::clone(&self.outgoing),
request_context.clone(),
async {
let request_json = match serde_json::to_value(&request) {
Ok(request_json) => request_json,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
};
let codex_request = match serde_json::from_value::<ClientRequest>(request_json) {
Ok(codex_request) => codex_request,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
// Websocket callers finalize outbound readiness in lib.rs after mirroring
// session state into outbound state and sending initialize notifications to
// this specific connection. Passing `None` avoids marking the connection
// ready too early from inside the shared request handler.
self.handle_client_request(connection_id, request_id, codex_request, session, None)
let codex_request = match serde_json::from_value::<ClientRequest>(request_json) {
Ok(codex_request) => codex_request,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
};
// Websocket callers finalize outbound readiness in lib.rs after mirroring
// session state into outbound state and sending initialize notifications to
// this specific connection. Passing `None` avoids marking the connection
// ready too early from inside the shared request handler.
self.handle_client_request(
request_id.clone(),
codex_request,
session,
None,
request_context.clone(),
)
.await;
}
.instrument(request_span)
},
)
.await;
}
@@ -301,31 +317,35 @@ impl MessageProcessor {
session: &mut ConnectionSessionState,
outbound_initialized: &AtomicBool,
) {
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id().clone(),
};
let request_span =
crate::app_server_tracing::typed_request_span(&request, connection_id, session);
async {
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id().clone(),
};
tracing::trace!(
?connection_id,
request_id = ?request_id.request_id,
"app-server typed request"
);
// In-process clients do not have the websocket transport loop that performs
// post-initialize bookkeeping, so they still finalize outbound readiness in
// the shared request handler.
self.handle_client_request(
connection_id,
request_id,
request,
session,
Some(outbound_initialized),
)
.await;
}
.instrument(request_span)
let request_context = RequestContext::new(request_id.clone(), request_span, None);
tracing::trace!(
?connection_id,
request_id = ?request_id.request_id,
"app-server typed request"
);
Self::run_request_with_context(
Arc::clone(&self.outgoing),
request_context.clone(),
async {
// In-process clients do not have the websocket transport loop that performs
// post-initialize bookkeeping, so they still finalize outbound readiness in
// the shared request handler.
self.handle_client_request(
request_id.clone(),
request,
session,
Some(outbound_initialized),
request_context.clone(),
)
.await;
},
)
.await;
}
@@ -342,6 +362,19 @@ impl MessageProcessor {
tracing::info!("<- typed notification: {:?}", notification);
}
/// Runs a request future inside its request-scoped tracing span, after
/// registering the request's context with the outgoing message sender.
///
/// Registering the context before driving the future means responses/errors
/// sent while the future runs can be associated with this request; driving
/// the future under `request_context.span()` keeps downstream work attached
/// to the originating request's trace.
///
/// NOTE(review): cleanup of the registered context is not visible here —
/// presumably it happens when the final response/error is sent or the
/// connection closes (per the PR description); confirm in
/// `OutgoingMessageSender`.
async fn run_request_with_context<F>(
outgoing: Arc<OutgoingMessageSender>,
request_context: RequestContext,
request_fut: F,
) where
F: Future<Output = ()>,
{
// Register first so even sends that happen early in the future observe
// the request context.
outgoing
.register_request_context(request_context.clone())
.await;
request_fut.instrument(request_context.span()).await;
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.codex_message_processor.thread_created_receiver()
}
@@ -384,7 +417,16 @@ impl MessageProcessor {
.await;
}
/// Waits for detached background tasks owned by the inner
/// `CodexMessageProcessor` to finish; thin delegation used during shutdown
/// so in-flight work is drained before process exit.
pub(crate) async fn drain_background_tasks(&self) {
self.codex_message_processor.drain_background_tasks().await;
}
/// Shuts down threads managed by the inner `CodexMessageProcessor`; thin
/// delegation invoked as part of the shutdown sequence.
pub(crate) async fn shutdown_threads(&self) {
self.codex_message_processor.shutdown_threads().await;
}
pub(crate) async fn connection_closed(&mut self, connection_id: ConnectionId) {
self.outgoing.connection_closed(connection_id).await;
self.codex_message_processor
.connection_closed(connection_id)
.await;
@@ -410,20 +452,21 @@ impl MessageProcessor {
async fn handle_client_request(
&mut self,
connection_id: ConnectionId,
request_id: ConnectionRequestId,
connection_request_id: ConnectionRequestId,
codex_request: ClientRequest,
session: &mut ConnectionSessionState,
// `Some(...)` means the caller wants initialize to immediately mark the
// connection outbound-ready. Websocket JSON-RPC calls pass `None` so
// lib.rs can deliver connection-scoped initialize notifications first.
outbound_initialized: Option<&AtomicBool>,
request_context: RequestContext,
) {
let connection_id = connection_request_id.connection_id;
match codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
let request_id = ConnectionRequestId {
let connection_request_id = ConnectionRequestId {
connection_id,
request_id,
};
@@ -433,7 +476,7 @@ impl MessageProcessor {
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(connection_request_id, error).await;
return;
}
@@ -473,7 +516,9 @@ impl MessageProcessor {
),
data: None,
};
self.outgoing.send_error(request_id.clone(), error).await;
self.outgoing
.send_error(connection_request_id.clone(), error)
.await;
return;
}
SetOriginatorError::AlreadyInitialized => {
@@ -492,7 +537,9 @@ impl MessageProcessor {
let user_agent = get_codex_user_agent();
let response = InitializeResponse { user_agent };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(connection_request_id, response)
.await;
session.initialized = true;
if let Some(outbound_initialized) = outbound_initialized {
@@ -513,7 +560,7 @@ impl MessageProcessor {
message: "Not initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(connection_request_id, error).await;
return;
}
}
@@ -526,7 +573,7 @@ impl MessageProcessor {
message: experimental_required_message(reason),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(connection_request_id, error).await;
return;
}
@@ -596,7 +643,12 @@ impl MessageProcessor {
// inline the full `CodexMessageProcessor::process_request` future, which
// can otherwise push worker-thread stack usage over the edge.
self.codex_message_processor
.process_request(connection_id, other, session.app_server_client_name.clone())
.process_request(
connection_id,
other,
session.app_server_client_name.clone(),
request_context,
)
.boxed()
.await;
}
@@ -673,3 +725,6 @@ impl MessageProcessor {
}
}
}
#[cfg(test)]
mod tracing_tests;