Lift app-server JSON-RPC error handling to request boundary (#19484)

## Why

App-server request handling had a lot of repeated JSON-RPC error
construction and one-off `send_error`/`return` branches. This made small
handlers noisy and pushed error response details into leaf code that
otherwise only needed to validate input or call the underlying API.

## What Changed

- Added shared JSON-RPC error constructors in
`codex-rs/app-server/src/error_code.rs`.
- Lifted straightforward request result emission into
`codex-rs/app-server/src/message_processor.rs` so response/error
dispatch happens at the request boundary.
- Reused the result helpers across command exec, config, filesystem,
device-key, external-agent config, fs-watch, and outgoing-message paths.
- Removed leaf wrapper handlers where the method body was only
forwarding to a response helper.
- Returned request validation errors upward in the simple cases instead
of sending an error locally and immediately returning.

## Verification

- `cargo test -p codex-app-server --lib command_exec::tests`
- `cargo test -p codex-app-server --lib outgoing_message::tests`
- `cargo test -p codex-app-server --lib in_process::tests`
- `cargo test -p codex-app-server --test all v2::fs`
- `cargo test -p codex-app-server --test all v2::config_rpc`
- `cargo test -p codex-app-server --test all v2::external_agent_config`
- `cargo test -p codex-app-server --test all v2::initialize`
- `just fix -p codex-app-server`
- `git diff --check`

Note: full `cargo test -p codex-app-server` was attempted and stopped in
`message_processor::tracing_tests::turn_start_jsonrpc_span_parents_core_turn_spans`
with a stack overflow after unrelated tests had already passed.
This commit is contained in:
pakrym-oai
2026-04-26 15:10:35 -07:00
committed by GitHub
parent deaa307fb2
commit 2a020f1a0a
9 changed files with 270 additions and 525 deletions

View File

@@ -10,7 +10,7 @@ use crate::codex_message_processor::CodexMessageProcessorArgs;
use crate::config_api::ConfigApi;
use crate::config_manager::ConfigManager;
use crate::device_key_api::DeviceKeyApi;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::error_code::invalid_request;
use crate::external_agent_config_api::ExternalAgentConfigApi;
use crate::fs_api::FsApi;
use crate::fs_watch::FsWatchManager;
@@ -34,7 +34,6 @@ use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigReadParams;
use codex_app_server_protocol::ConfigValueWriteParams;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::DeviceKeyCreateParams;
@@ -42,20 +41,10 @@ use codex_app_server_protocol::DeviceKeyPublicParams;
use codex_app_server_protocol::DeviceKeySignParams;
use codex_app_server_protocol::ExperimentalApi;
use codex_app_server_protocol::ExperimentalFeatureEnablementSetParams;
use codex_app_server_protocol::ExternalAgentConfigDetectParams;
use codex_app_server_protocol::ExternalAgentConfigImportCompletedNotification;
use codex_app_server_protocol::ExternalAgentConfigImportParams;
use codex_app_server_protocol::ExternalAgentConfigImportResponse;
use codex_app_server_protocol::ExternalAgentConfigMigrationItemType;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCreateDirectoryParams;
use codex_app_server_protocol::FsGetMetadataParams;
use codex_app_server_protocol::FsReadDirectoryParams;
use codex_app_server_protocol::FsReadFileParams;
use codex_app_server_protocol::FsRemoveParams;
use codex_app_server_protocol::FsUnwatchParams;
use codex_app_server_protocol::FsWatchParams;
use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::InitializeResponse;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
@@ -390,43 +379,28 @@ impl MessageProcessor {
Arc::clone(&self.outgoing),
request_context.clone(),
async {
let request_json = match serde_json::to_value(&request) {
Ok(request_json) => request_json,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
};
let codex_request = match serde_json::from_value::<ClientRequest>(request_json) {
Ok(codex_request) => codex_request,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
};
// Websocket callers finalize outbound readiness in lib.rs after mirroring
// session state into outbound state and sending initialize notifications to
// this specific connection. Passing `None` avoids marking the connection
// ready too early from inside the shared request handler.
self.handle_client_request(
request_id.clone(),
codex_request,
Arc::clone(&session),
/*outbound_initialized*/ None,
request_context.clone(),
)
let result = async {
let request_json = serde_json::to_value(&request)
.map_err(|err| invalid_request(format!("Invalid request: {err}")))?;
let codex_request = serde_json::from_value::<ClientRequest>(request_json)
.map_err(|err| invalid_request(format!("Invalid request: {err}")))?;
// Websocket callers finalize outbound readiness in lib.rs after mirroring
// session state into outbound state and sending initialize notifications to
// this specific connection. Passing `None` avoids marking the connection
// ready too early from inside the shared request handler.
self.handle_client_request(
request_id.clone(),
codex_request,
Arc::clone(&session),
/*outbound_initialized*/ None,
request_context.clone(),
)
.await
}
.await;
if let Err(error) = result {
self.outgoing.send_error(request_id.clone(), error).await;
}
},
)
.await;
@@ -463,14 +437,18 @@ impl MessageProcessor {
// In-process clients do not have the websocket transport loop that performs
// post-initialize bookkeeping, so they still finalize outbound readiness in
// the shared request handler.
self.handle_client_request(
request_id.clone(),
request,
Arc::clone(&session),
Some(outbound_initialized),
request_context.clone(),
)
.await;
let result = self
.handle_client_request(
request_id.clone(),
request,
Arc::clone(&session),
Some(outbound_initialized),
request_context.clone(),
)
.await;
if let Err(error) = result {
self.outgoing.send_error(request_id.clone(), error).await;
}
},
)
.await;
@@ -598,7 +576,7 @@ impl MessageProcessor {
// lib.rs can deliver connection-scoped initialize notifications first.
outbound_initialized: Option<&AtomicBool>,
request_context: RequestContext,
) {
) -> Result<(), JSONRPCErrorError> {
let connection_id = connection_request_id.connection_id;
if let ClientRequest::Initialize { request_id, params } = codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
@@ -608,13 +586,7 @@ impl MessageProcessor {
request_id,
};
if session.initialized() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
return Err(invalid_request("Already initialized"));
}
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
@@ -642,17 +614,9 @@ impl MessageProcessor {
// Validate before committing; set_default_originator validates while
// mutating process-global metadata.
if HeaderValue::from_str(&name).is_err() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
),
data: None,
};
self.outgoing
.send_error(connection_request_id.clone(), error)
.await;
return;
return Err(invalid_request(format!(
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
)));
}
let originator = name.clone();
let user_agent_suffix = format!("{name}; {version}");
@@ -668,13 +632,7 @@ impl MessageProcessor {
})
.is_err()
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
return Err(invalid_request("Already initialized"));
}
// Only the request that wins session initialization may mutate
@@ -729,7 +687,7 @@ impl MessageProcessor {
.connection_initialized(connection_id)
.await;
}
return;
return Ok(());
}
self.dispatch_initialized_client_request(
@@ -738,7 +696,7 @@ impl MessageProcessor {
session,
request_context,
)
.await;
.await
}
async fn dispatch_initialized_client_request(
@@ -747,27 +705,15 @@ impl MessageProcessor {
codex_request: ClientRequest,
session: Arc<ConnectionSessionState>,
request_context: RequestContext,
) {
) -> Result<(), JSONRPCErrorError> {
if !session.initialized() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Not initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
return Err(invalid_request("Not initialized"));
}
if let Some(reason) = codex_request.experimental_reason()
&& !session.experimental_api_enabled()
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: experimental_required_message(reason),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
return Err(invalid_request(experimental_required_message(reason)));
}
let connection_id = connection_request_id.connection_id;
if self.config.features.enabled(Feature::GeneralAnalytics)
@@ -793,7 +739,7 @@ impl MessageProcessor {
client_version,
device_key_requests_allowed,
)
.await;
.await
}
async fn handle_initialized_client_request(
@@ -804,66 +750,48 @@ impl MessageProcessor {
app_server_client_name: Option<String>,
client_version: Option<String>,
device_key_requests_allowed: bool,
) {
) -> Result<(), JSONRPCErrorError> {
let connection_id = connection_request_id.connection_id;
let request_id_for_connection = |request_id| ConnectionRequestId {
connection_id,
request_id,
};
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.handle_config_read(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.config_api.read(params).await,
)
.await;
}
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
self.handle_external_agent_config_detect(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.external_agent_config_api.detect(params).await,
)
.await;
}
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
self.handle_external_agent_config_import(
ConnectionRequestId {
connection_id,
request_id,
},
request_id_for_connection(request_id),
params,
)
.await;
.await?;
}
ClientRequest::ConfigValueWrite { request_id, params } => {
self.handle_config_value_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.handle_config_value_write(request_id_for_connection(request_id), params)
.await;
}
ClientRequest::ConfigBatchWrite { request_id, params } => {
self.handle_config_batch_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.handle_config_batch_write(request_id_for_connection(request_id), params)
.await;
}
ClientRequest::ExperimentalFeatureEnablementSet { request_id, params } => {
self.handle_experimental_feature_enablement_set(
ConnectionRequestId {
connection_id,
request_id,
},
request_id_for_connection(request_id),
params,
)
.await;
@@ -872,133 +800,105 @@ impl MessageProcessor {
request_id,
params: _,
} => {
self.handle_config_requirements_read(ConnectionRequestId {
connection_id,
request_id,
})
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.config_api.config_requirements_read().await,
)
.await;
}
ClientRequest::DeviceKeyCreate { request_id, params } => {
self.handle_device_key_create(
ConnectionRequestId {
connection_id,
request_id,
},
request_id_for_connection(request_id),
params,
device_key_requests_allowed,
);
}
ClientRequest::DeviceKeyPublic { request_id, params } => {
self.handle_device_key_public(
ConnectionRequestId {
connection_id,
request_id,
},
request_id_for_connection(request_id),
params,
device_key_requests_allowed,
);
}
ClientRequest::DeviceKeySign { request_id, params } => {
self.handle_device_key_sign(
ConnectionRequestId {
connection_id,
request_id,
},
request_id_for_connection(request_id),
params,
device_key_requests_allowed,
);
}
ClientRequest::FsReadFile { request_id, params } => {
self.handle_fs_read_file(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.read_file(params).await,
)
.await;
}
ClientRequest::FsWriteFile { request_id, params } => {
self.handle_fs_write_file(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.write_file(params).await,
)
.await;
}
ClientRequest::FsCreateDirectory { request_id, params } => {
self.handle_fs_create_directory(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.create_directory(params).await,
)
.await;
}
ClientRequest::FsGetMetadata { request_id, params } => {
self.handle_fs_get_metadata(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.get_metadata(params).await,
)
.await;
}
ClientRequest::FsReadDirectory { request_id, params } => {
self.handle_fs_read_directory(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.read_directory(params).await,
)
.await;
}
ClientRequest::FsRemove { request_id, params } => {
self.handle_fs_remove(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.remove(params).await,
)
.await;
}
ClientRequest::FsCopy { request_id, params } => {
self.handle_fs_copy(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.copy(params).await,
)
.await;
}
ClientRequest::FsWatch { request_id, params } => {
self.handle_fs_watch(
ConnectionRequestId {
connection_id,
request_id,
},
connection_id,
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_watch_manager.watch(connection_id, params).await,
)
.await;
}
ClientRequest::FsUnwatch { request_id, params } => {
self.handle_fs_unwatch(
ConnectionRequestId {
connection_id,
request_id,
},
connection_id,
params,
)
.await;
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_watch_manager.unwatch(connection_id, params).await,
)
.await;
}
other => {
// Box the delegated future so this wrapper's async state machine does not
@@ -1016,13 +916,7 @@ impl MessageProcessor {
.await;
}
}
}
async fn handle_config_read(&self, request_id: ConnectionRequestId, params: ConfigReadParams) {
match self.config_api.read(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
Ok(())
}
async fn handle_config_value_write(
@@ -1167,13 +1061,6 @@ impl MessageProcessor {
}
}
async fn handle_config_requirements_read(&self, request_id: ConnectionRequestId) {
match self.config_api.config_requirements_read().await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
fn handle_device_key_create(
&self,
request_id: ConnectionRequestId,
@@ -1230,193 +1117,80 @@ impl MessageProcessor {
let device_key_api = self.device_key_api.clone();
let outgoing = Arc::clone(&self.outgoing);
tokio::spawn(async move {
if !device_key_requests_allowed {
outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("{method} is not available over remote transports"),
data: None,
},
)
.await;
return;
}
match run_request(device_key_api).await {
Ok(response) => outgoing.send_response(request_id, response).await,
Err(error) => outgoing.send_error(request_id, error).await,
let result = async {
if !device_key_requests_allowed {
return Err(invalid_request(format!(
"{method} is not available over remote transports"
)));
}
run_request(device_key_api).await
}
.await;
outgoing.send_result(request_id, result).await;
});
}
async fn handle_external_agent_config_detect(
&self,
request_id: ConnectionRequestId,
params: ExternalAgentConfigDetectParams,
) {
match self.external_agent_config_api.detect(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_external_agent_config_import(
&self,
request_id: ConnectionRequestId,
params: ExternalAgentConfigImportParams,
) {
) -> Result<(), JSONRPCErrorError> {
let has_plugin_imports = params.migration_items.iter().any(|item| {
matches!(
item.item_type,
ExternalAgentConfigMigrationItemType::Plugins
)
});
match self.external_agent_config_api.import(params).await {
Ok(pending_plugin_imports) => {
if has_plugin_imports {
self.handle_config_mutation().await;
}
self.outgoing
.send_response(request_id, ExternalAgentConfigImportResponse {})
.await;
if !has_plugin_imports {
return;
}
let pending_plugin_imports = self.external_agent_config_api.import(params).await?;
if has_plugin_imports {
self.handle_config_mutation().await;
}
self.outgoing
.send_response(request_id, ExternalAgentConfigImportResponse {})
.await;
if pending_plugin_imports.is_empty() {
self.outgoing
.send_server_notification(
ServerNotification::ExternalAgentConfigImportCompleted(
ExternalAgentConfigImportCompletedNotification {},
),
)
.await;
return;
}
if !has_plugin_imports {
return Ok(());
}
let external_agent_config_api = self.external_agent_config_api.clone();
let outgoing = Arc::clone(&self.outgoing);
let thread_manager = Arc::clone(&self.thread_manager);
tokio::spawn(async move {
for pending_plugin_import in pending_plugin_imports {
match external_agent_config_api
.complete_pending_plugin_import(pending_plugin_import)
.await
{
Ok(()) => {}
Err(error) => {
tracing::warn!(
error = %error.message,
"external agent config plugin import failed"
);
}
}
if pending_plugin_imports.is_empty() {
self.outgoing
.send_server_notification(ServerNotification::ExternalAgentConfigImportCompleted(
ExternalAgentConfigImportCompletedNotification {},
))
.await;
return Ok(());
}
let external_agent_config_api = self.external_agent_config_api.clone();
let outgoing = Arc::clone(&self.outgoing);
let thread_manager = Arc::clone(&self.thread_manager);
tokio::spawn(async move {
for pending_plugin_import in pending_plugin_imports {
match external_agent_config_api
.complete_pending_plugin_import(pending_plugin_import)
.await
{
Ok(()) => {}
Err(error) => {
tracing::warn!(
error = %error.message,
"external agent config plugin import failed"
);
}
thread_manager.plugins_manager().clear_cache();
thread_manager.skills_manager().clear_cache();
outgoing
.send_server_notification(
ServerNotification::ExternalAgentConfigImportCompleted(
ExternalAgentConfigImportCompletedNotification {},
),
)
.await;
});
}
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
thread_manager.plugins_manager().clear_cache();
thread_manager.skills_manager().clear_cache();
outgoing
.send_server_notification(ServerNotification::ExternalAgentConfigImportCompleted(
ExternalAgentConfigImportCompletedNotification {},
))
.await;
});
async fn handle_fs_read_file(&self, request_id: ConnectionRequestId, params: FsReadFileParams) {
match self.fs_api.read_file(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_write_file(
&self,
request_id: ConnectionRequestId,
params: FsWriteFileParams,
) {
match self.fs_api.write_file(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_create_directory(
&self,
request_id: ConnectionRequestId,
params: FsCreateDirectoryParams,
) {
match self.fs_api.create_directory(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_get_metadata(
&self,
request_id: ConnectionRequestId,
params: FsGetMetadataParams,
) {
match self.fs_api.get_metadata(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_read_directory(
&self,
request_id: ConnectionRequestId,
params: FsReadDirectoryParams,
) {
match self.fs_api.read_directory(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_remove(&self, request_id: ConnectionRequestId, params: FsRemoveParams) {
match self.fs_api.remove(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_copy(&self, request_id: ConnectionRequestId, params: FsCopyParams) {
match self.fs_api.copy(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_watch(
&self,
request_id: ConnectionRequestId,
connection_id: ConnectionId,
params: FsWatchParams,
) {
match self.fs_watch_manager.watch(connection_id, params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn handle_fs_unwatch(
&self,
request_id: ConnectionRequestId,
connection_id: ConnectionId,
params: FsUnwatchParams,
) {
match self.fs_watch_manager.unwatch(connection_id, params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
Ok(())
}
}