app-server: prepare to run initialized rpcs concurrently (#17372)

## Summary

- Refactors `MessageProcessor` and per-connection session state so
initialized service RPC handling can be moved into spawned tasks in a
follow-up PR.
- Shares the processor and initialized session data with
`Arc`/`OnceLock` instead of mutable borrowed connection state.
- Keeps initialized request handling synchronous in this PR; it does
**not** call `tokio::spawn` for service RPCs yet.

## Testing

- `just fmt`
- `cargo test -p codex-app-server` *(fails on existing hardening gaps
covered by #17375, #17376, and #17377; the pipelined config regression
passed before the unrelated failures)*
- `just fix -p codex-app-server`
This commit is contained in:
Ruslan Nigmatullin
2026-04-14 11:24:34 -07:00
committed by GitHub
parent 769b1c3d7e
commit 23d4098c0f
6 changed files with 282 additions and 168 deletions

View File

@@ -2,6 +2,7 @@ use std::collections::BTreeMap;
use std::collections::HashSet;
use std::future::Future;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -20,6 +21,7 @@ use crate::outgoing_message::RequestContext;
use crate::transport::AppServerTransport;
use crate::transport::RemoteControlHandle;
use async_trait::async_trait;
use axum::http::HeaderValue;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AppServerRpcTransport;
use codex_app_server_protocol::AppListUpdatedNotification;
@@ -173,13 +175,52 @@ pub(crate) struct MessageProcessor {
remote_control_handle: Option<RemoteControlHandle>,
}
#[derive(Clone, Debug, Default)]
#[derive(Debug, Default)]
pub(crate) struct ConnectionSessionState {
pub(crate) initialized: bool,
pub(crate) experimental_api_enabled: bool,
pub(crate) opted_out_notification_methods: HashSet<String>,
pub(crate) app_server_client_name: Option<String>,
pub(crate) client_version: Option<String>,
initialized: OnceLock<InitializedConnectionSessionState>,
}
#[derive(Debug)]
struct InitializedConnectionSessionState {
experimental_api_enabled: bool,
opted_out_notification_methods: HashSet<String>,
app_server_client_name: String,
client_version: String,
}
impl ConnectionSessionState {
pub(crate) fn initialized(&self) -> bool {
self.initialized.get().is_some()
}
pub(crate) fn experimental_api_enabled(&self) -> bool {
self.initialized
.get()
.is_some_and(|session| session.experimental_api_enabled)
}
pub(crate) fn opted_out_notification_methods(&self) -> HashSet<String> {
self.initialized
.get()
.map(|session| session.opted_out_notification_methods.clone())
.unwrap_or_default()
}
pub(crate) fn app_server_client_name(&self) -> Option<&str> {
self.initialized
.get()
.map(|session| session.app_server_client_name.as_str())
}
pub(crate) fn client_version(&self) -> Option<&str> {
self.initialized
.get()
.map(|session| session.client_version.as_str())
}
fn initialize(&self, session: InitializedConnectionSessionState) -> Result<(), ()> {
self.initialized.set(session).map_err(|_| ())
}
}
pub(crate) struct MessageProcessorArgs {
@@ -299,11 +340,11 @@ impl MessageProcessor {
}
pub(crate) async fn process_request(
&self,
self: &Arc<Self>,
connection_id: ConnectionId,
request: JSONRPCRequest,
transport: AppServerTransport,
session: &mut ConnectionSessionState,
session: Arc<ConnectionSessionState>,
) {
let request_method = request.method.as_str();
tracing::trace!(
@@ -316,7 +357,7 @@ impl MessageProcessor {
request_id: request.id.clone(),
};
let request_span =
crate::app_server_tracing::request_span(&request, transport, connection_id, session);
crate::app_server_tracing::request_span(&request, transport, connection_id, &session);
let request_trace = request.trace.as_ref().map(|trace| W3cTraceContext {
traceparent: trace.traceparent.clone(),
tracestate: trace.tracestate.clone(),
@@ -358,7 +399,7 @@ impl MessageProcessor {
self.handle_client_request(
request_id.clone(),
codex_request,
session,
Arc::clone(&session),
/*outbound_initialized*/ None,
request_context.clone(),
)
@@ -373,10 +414,10 @@ impl MessageProcessor {
/// This bypasses JSON request deserialization but keeps identical request
/// semantics by delegating to `handle_client_request`.
pub(crate) async fn process_client_request(
&self,
self: &Arc<Self>,
connection_id: ConnectionId,
request: ClientRequest,
session: &mut ConnectionSessionState,
session: Arc<ConnectionSessionState>,
outbound_initialized: &AtomicBool,
) {
let request_id = ConnectionRequestId {
@@ -384,7 +425,7 @@ impl MessageProcessor {
request_id: request.id().clone(),
};
let request_span =
crate::app_server_tracing::typed_request_span(&request, connection_id, session);
crate::app_server_tracing::typed_request_span(&request, connection_id, &session);
let request_context =
RequestContext::new(request_id.clone(), request_span, /*parent_trace*/ None);
tracing::trace!(
@@ -402,7 +443,7 @@ impl MessageProcessor {
self.handle_client_request(
request_id.clone(),
request,
session,
Arc::clone(&session),
Some(outbound_initialized),
request_context.clone(),
)
@@ -525,10 +566,10 @@ impl MessageProcessor {
}
async fn handle_client_request(
&self,
self: &Arc<Self>,
connection_request_id: ConnectionRequestId,
codex_request: ClientRequest,
session: &mut ConnectionSessionState,
session: Arc<ConnectionSessionState>,
// `Some(...)` means the caller wants initialize to immediately mark the
// connection outbound-ready. Websocket JSON-RPC calls pass `None` so
// lib.rs can deliver connection-scoped initialize notifications first.
@@ -536,126 +577,166 @@ impl MessageProcessor {
request_context: RequestContext,
) {
let connection_id = connection_request_id.connection_id;
match codex_request {
if let ClientRequest::Initialize { request_id, params } = codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
let connection_request_id = ConnectionRequestId {
connection_id,
request_id,
let connection_request_id = ConnectionRequestId {
connection_id,
request_id,
};
if session.initialized() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
if session.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
}
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
// Current behavior is per-connection. Reviewer feedback notes this can
// create odd cross-client behavior (for example dynamic tool calls on a
// shared thread when another connected client did not opt into
// experimental API). Proposed direction is instance-global first-write-wins
// with initialize-time mismatch rejection.
let analytics_initialize_params = params.clone();
let (experimental_api_enabled, opt_out_notification_methods) =
match params.capabilities {
Some(capabilities) => (
capabilities.experimental_api,
capabilities
.opt_out_notification_methods
.unwrap_or_default(),
),
None => (false, Vec::new()),
};
session.experimental_api_enabled = experimental_api_enabled;
session.opted_out_notification_methods =
opt_out_notification_methods.into_iter().collect();
let ClientInfo {
name,
title: _title,
version,
} = params.client_info;
session.app_server_client_name = Some(name.clone());
session.client_version = Some(version.clone());
let originator = name.clone();
if let Err(error) = set_default_originator(originator.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
),
data: None,
};
self.outgoing
.send_error(connection_request_id.clone(), error)
.await;
return;
}
SetOriginatorError::AlreadyInitialized => {
// No-op. This is expected to happen if the originator is already set via env var.
// TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE,
// this will be an unexpected state and we can return a JSON-RPC error indicating
// internal server error.
}
}
}
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_initialize(
connection_id.0,
analytics_initialize_params,
originator,
self.rpc_transport,
);
}
set_default_client_residency_requirement(self.config.enforce_residency.value());
let user_agent_suffix = format!("{name}; {version}");
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
*suffix = Some(user_agent_suffix);
}
let user_agent = get_codex_user_agent();
let response = InitializeResponse {
user_agent,
codex_home: self.config.codex_home.clone(),
platform_family: std::env::consts::FAMILY.to_string(),
platform_os: std::env::consts::OS.to_string(),
};
self.outgoing
.send_response(connection_request_id, response)
.await;
session.initialized = true;
if let Some(outbound_initialized) = outbound_initialized {
// In-process clients can complete readiness immediately here. The
// websocket path defers this until lib.rs finishes transport-layer
// initialize handling for the specific connection.
outbound_initialized.store(true, Ordering::Release);
self.codex_message_processor
.connection_initialized(connection_id)
.await;
}
self.outgoing.send_error(connection_request_id, error).await;
return;
}
_ => {
if !session.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Not initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
// Current behavior is per-connection. Reviewer feedback notes this can
// create odd cross-client behavior (for example dynamic tool calls on a
// shared thread when another connected client did not opt into
// experimental API). Proposed direction is instance-global first-write-wins
// with initialize-time mismatch rejection.
let analytics_initialize_params = params.clone();
let (experimental_api_enabled, opt_out_notification_methods) = match params.capabilities
{
Some(capabilities) => (
capabilities.experimental_api,
capabilities
.opt_out_notification_methods
.unwrap_or_default(),
),
None => (false, Vec::new()),
};
let ClientInfo {
name,
title: _title,
version,
} = params.client_info;
// Validate before committing; set_default_originator validates while
// mutating process-global metadata.
if HeaderValue::from_str(&name).is_err() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
),
data: None,
};
self.outgoing
.send_error(connection_request_id.clone(), error)
.await;
return;
}
let originator = name.clone();
let user_agent_suffix = format!("{name}; {version}");
let codex_home = self.config.codex_home.clone();
if session
.initialize(InitializedConnectionSessionState {
experimental_api_enabled,
opted_out_notification_methods: opt_out_notification_methods
.into_iter()
.collect(),
app_server_client_name: name.clone(),
client_version: version,
})
.is_err()
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
}
// Only the request that wins session initialization may mutate
// process-global client metadata.
if let Err(error) = set_default_originator(originator.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
tracing::warn!(
client_info_name = %name,
"validated clientInfo.name was rejected while setting originator"
);
}
SetOriginatorError::AlreadyInitialized => {
// No-op. This is expected to happen if the originator is already set via env var.
// TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE,
// this will be an unexpected state and we can return a JSON-RPC error indicating
// internal server error.
}
}
}
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_initialize(
connection_id.0,
analytics_initialize_params,
originator,
self.rpc_transport,
);
}
set_default_client_residency_requirement(self.config.enforce_residency.value());
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
*suffix = Some(user_agent_suffix);
}
let user_agent = get_codex_user_agent();
let response = InitializeResponse {
user_agent,
codex_home,
platform_family: std::env::consts::FAMILY.to_string(),
platform_os: std::env::consts::OS.to_string(),
};
self.outgoing
.send_response(connection_request_id, response)
.await;
if let Some(outbound_initialized) = outbound_initialized {
// In-process clients can complete readiness immediately here. The
// websocket path defers this until lib.rs finishes transport-layer
// initialize handling for the specific connection.
outbound_initialized.store(true, Ordering::Release);
self.codex_message_processor
.connection_initialized(connection_id)
.await;
}
return;
}
self.dispatch_initialized_client_request(
connection_request_id,
codex_request,
session,
request_context,
)
.await;
}
async fn dispatch_initialized_client_request(
self: &Arc<Self>,
connection_request_id: ConnectionRequestId,
codex_request: ClientRequest,
session: Arc<ConnectionSessionState>,
request_context: RequestContext,
) {
if !session.initialized() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Not initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
}
if let Some(reason) = codex_request.experimental_reason()
&& !session.experimental_api_enabled
&& !session.experimental_api_enabled()
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
@@ -676,6 +757,29 @@ impl MessageProcessor {
);
}
let app_server_client_name = session.app_server_client_name().map(str::to_string);
let client_version = session.client_version().map(str::to_string);
Arc::clone(self)
.handle_initialized_client_request(
connection_request_id,
codex_request,
request_context,
app_server_client_name,
client_version,
)
.await;
}
async fn handle_initialized_client_request(
self: Arc<Self>,
connection_request_id: ConnectionRequestId,
codex_request: ClientRequest,
request_context: RequestContext,
app_server_client_name: Option<String>,
client_version: Option<String>,
) {
let connection_id = connection_request_id.connection_id;
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.handle_config_read(
@@ -847,8 +951,8 @@ impl MessageProcessor {
.process_request(
connection_id,
other,
session.app_server_client_name.clone(),
session.client_version.clone(),
app_server_client_name,
client_version,
request_context,
)
.boxed()