Compare commits

...

7 Commits

Author SHA1 Message Date
Ruslan Nigmatullin
48d6d6a14c app-server: stabilize zsh fork response mocks
Motivation:
Zsh-fork tests assert command approval and execution behavior, but exact /responses request ordering and compressed mock body matching make them sensitive to background follow-up requests and request-compression details.

Summary:
Use content-matched response mocks for the relevant model requests, allow harmless follow-up responses where the test does not care about exact request count, and disable request compression in the zsh test config so body matchers inspect plain JSON.

Testing:
- cargo test -p codex-app-server
2026-04-09 23:11:57 -07:00
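The content-matched mock idea above can be sketched as follows. This is a minimal illustration, not the crate's actual mock API: the `Mock` struct, `respond` helper, and the request bodies are all hypothetical stand-ins for a mock server that selects a response by inspecting the plain-JSON body instead of popping mocks in arrival order.

```rust
// Hypothetical content-matched mock: pick the response whose matcher accepts
// the request body, independent of request ordering.
struct Mock {
    matches: fn(&str) -> bool,
    response: &'static str,
}

fn respond(mocks: &[Mock], body: &str) -> Option<&'static str> {
    mocks.iter().find(|m| (m.matches)(body)).map(|m| m.response)
}

fn main() {
    let mocks = [
        Mock { matches: |b| b.contains("approve command"), response: "approved" },
        Mock { matches: |b| b.contains("run command"), response: "executed" },
    ];
    // Order-independent: whichever request arrives first still matches.
    assert_eq!(respond(&mocks, r#"{"input":"run command"}"#), Some("executed"));
    assert_eq!(respond(&mocks, r#"{"input":"approve command"}"#), Some("approved"));
    // A harmless background follow-up simply matches nothing and is ignored.
    assert_eq!(respond(&mocks, r#"{"input":"background ping"}"#), None);
    println!("ok");
}
```

Disabling request compression matters for exactly this reason: the matcher closures inspect the raw body, so it must be plain JSON rather than a compressed byte stream.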
Ruslan Nigmatullin
3cd3d78454 app-server: assert disabled analytics event precisely
Motivation:
The thread initialization analytics negative test should only prove that codex_thread_initialized is gated off. Failing on any analytics request makes the test brittle when unrelated analytics traffic is emitted.

Summary:
Disable analytics through both relevant config paths for the negative case and assert no received analytics payload contains the thread-initialized event, rather than asserting no analytics payload exists at all.

Testing:
- cargo test -p codex-app-server --test all thread_start_does_not_track_thread_initialized_analytics_without_feature
- cargo test -p codex-app-server
2026-04-09 23:11:57 -07:00
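The negative-assertion shape described above can be sketched like this. The helper and payload strings are illustrative, not the test suite's real fixtures; the point is asserting the absence of one event name across all captured analytics bodies rather than asserting that no analytics traffic exists at all.

```rust
/// Hypothetical check over analytics payloads captured by a mock endpoint.
/// Unrelated analytics traffic is tolerated; the test fails only if the
/// gated event name appears in any received body.
fn assert_event_not_tracked(payloads: &[String], event: &str) {
    assert!(
        payloads.iter().all(|body| !body.contains(event)),
        "found disabled analytics event {event:?} in received payloads"
    );
}

fn main() {
    // Unrelated events may still be emitted; only the gated one must be absent.
    let payloads = vec![
        r#"{"event":"codex_app_started"}"#.to_string(),
        r#"{"event":"codex_turn_completed"}"#.to_string(),
    ];
    assert_event_not_tracked(&payloads, "codex_thread_initialized");
    println!("ok");
}
```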
Ruslan Nigmatullin
8a754a107e app-server: harden realtime tests against nondeterminism
Motivation:
Realtime tests depended on deterministic JSON object key order in multipart request bodies and on exact shell output in delegated command execution. Neither behavior is part of the contract those tests are intended to validate.

Summary:
Compare the multipart session payload as parsed JSON while preserving the SDP framing checks, and assert delegated shell output contains the expected command result instead of requiring an exact string match.

Testing:
- cargo test -p codex-app-server --test all realtime_webrtc_start_emits_sdp_notification
- cargo test -p codex-app-server --test all webrtc_v1_start_posts_offer_returns_sdp_and_joins_sideband
- cargo test -p codex-app-server --test all webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool
- cargo test -p codex-app-server
2026-04-09 23:11:57 -07:00
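Comparing the payload as parsed JSON rather than as a string can be sketched as below. Real tests would parse with `serde_json::Value`, whose object equality is already key-order-insensitive; the tiny hand-rolled parser here (flat string-valued objects only, no escapes) is a hypothetical stand-in so the example stays dependency-free.

```rust
use std::collections::BTreeMap;

/// Sketch parser for flat {"k":"v"} objects only; a real test would use
/// serde_json::Value instead of this simplification.
fn flat_object_entries(s: &str) -> BTreeMap<String, String> {
    s.trim_matches(|c| c == '{' || c == '}')
        .split(',')
        .filter(|pair| !pair.is_empty())
        .map(|pair| {
            let (k, v) = pair.split_once(':').expect("key:value pair");
            (k.trim_matches('"').to_string(), v.trim_matches('"').to_string())
        })
        .collect()
}

fn main() {
    let expected = r#"{"model":"gpt","voice":"alloy"}"#;
    let actual = r#"{"voice":"alloy","model":"gpt"}"#;
    // A byte-for-byte comparison is brittle under key reordering...
    assert_ne!(expected, actual);
    // ...while the parsed comparison accepts either serialization order.
    assert_eq!(flat_object_entries(expected), flat_object_entries(actual));
    println!("ok");
}
```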
Ruslan Nigmatullin
7259925587 app-server: tolerate shell startup output in shell command tests
Motivation:
User-shell command tests can observe shell startup output from the local environment interleaved with the command output being asserted. Requiring exact output equality makes these tests depend on machine-specific shell profile behavior.

Summary:
Update thread shell command tests to wait for output deltas containing the expected command output and to assert aggregated output contains the expected text instead of requiring exact equality.

Testing:
- cargo test -p codex-app-server --test all thread_shell_command_runs_as_standalone_turn_and_persists_history
- cargo test -p codex-app-server --test all thread_shell_command_uses_existing_active_turn
- cargo test -p codex-app-server
2026-04-09 23:11:28 -07:00
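The containment-based assertion amounts to the following sketch; the helper name and sample output are illustrative, not the suite's actual fixtures.

```rust
/// Aggregated output may include machine-specific shell profile noise
/// (e.g. a greeting printed by .zshrc) around the command's own output,
/// so assert containment rather than exact equality.
fn assert_output_contains(aggregated: &str, expected: &str) {
    assert!(
        aggregated.contains(expected),
        "expected {expected:?} within aggregated output {aggregated:?}"
    );
}

fn main() {
    let aggregated = "Welcome to zsh!\nhello-from-command\n";
    assert_output_contains(aggregated, "hello-from-command");
    println!("ok");
}
```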
Ruslan Nigmatullin
3281a1cc5b app-server: serialize stateful rpcs by object key
Motivation:
Concurrent initialized request handling lets later RPCs for the same high-level object run before earlier RPCs publish their side effects. A bare per-key mutex would not guarantee arrival order once tasks are spawned, so same-key requests need an explicit FIFO queue.

Summary:
Add Rust-only serialization metadata to client_request_definitions!, expose ClientRequest::serialization_scope(), and implement a private app-server per-key FIFO dispatcher. The server maps logical protocol scopes to runtime queue keys, including connection-scoped command/exec process IDs and fs watch IDs, while leaving unkeyed requests concurrent.

Testing:
- cargo test -p codex-app-server-protocol client_request_serialization_scope
- cargo test -p codex-app-server request_serialization --lib
- cargo test -p codex-app-server-protocol
- cargo test -p codex-app-server
2026-04-09 23:11:28 -07:00
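The per-key FIFO dispatcher can be sketched as below. This is a simplified model, not the actual implementation: the real dispatcher queues async tasks on a Tokio runtime, whereas this sketch uses plain threads and `mpsc` channels, and `FifoDispatcher`/`Job` are hypothetical names. The essential property is the same: jobs enqueued under one key run one at a time in arrival order, while unkeyed jobs are not serialized at all.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

type Job = Box<dyn FnOnce() + Send>;

// Sketch of per-key FIFO dispatch (threads stand in for spawned async tasks).
struct FifoDispatcher {
    queues: HashMap<String, mpsc::Sender<Job>>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl FifoDispatcher {
    fn new() -> Self {
        Self { queues: HashMap::new(), workers: Vec::new() }
    }

    /// Same-key jobs run in arrival order on a dedicated worker; `None` keys
    /// run inline here, standing in for "unkeyed requests stay concurrent".
    fn dispatch(&mut self, key: Option<&str>, job: Job) {
        let Some(key) = key else {
            return job();
        };
        if !self.queues.contains_key(key) {
            let (tx, rx) = mpsc::channel::<Job>();
            self.workers.push(thread::spawn(move || {
                for job in rx {
                    job(); // one job at a time, FIFO per key
                }
            }));
            self.queues.insert(key.to_string(), tx);
        }
        self.queues[key].send(job).expect("worker alive");
    }
}

fn main() {
    use std::sync::{Arc, Mutex};
    let seen = Arc::new(Mutex::new(Vec::new()));
    let mut dispatcher = FifoDispatcher::new();
    for i in 0..3 {
        let seen = Arc::clone(&seen);
        dispatcher.dispatch(Some("config"), Box::new(move || seen.lock().unwrap().push(i)));
    }
    drop(dispatcher.queues); // close senders so workers drain and exit
    for worker in dispatcher.workers {
        worker.join().unwrap();
    }
    // Arrival order is preserved for the shared key.
    assert_eq!(*seen.lock().unwrap(), vec![0, 1, 2]);
    println!("ok");
}
```

Note the distinction the commit message draws: a bare per-key mutex acquired inside each spawned task would let tasks race to the lock, whereas enqueueing before spawning fixes the order at arrival time.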
Ruslan Nigmatullin
1385a44433 app-server: run initialized rpcs concurrently
Motivation:
The app-server currently processes initialized JSON-RPC requests serially per processor loop, so slow requests can block unrelated work. To allow independent requests to make progress concurrently, request handling needs to be able to move into spawned tasks safely.

Summary:
Wrap the message processor and connection session state in Arcs, make initialized session data immutable after initialize with OnceLock-backed accessors, and spawn non-initialize request handling after the initialize and experimental-gating checks. Update tracing, in-process, websocket, and transport call sites to use the shared session accessors.

Testing:
- cargo test -p codex-app-server
2026-04-09 23:11:28 -07:00
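The OnceLock pattern described above can be sketched like this (the field set is illustrative, trimmed from the real `ConnectionSessionState` shown in the diff below). Because the initialized data is write-once, handlers in spawned tasks can read it through a shared `Arc` without `&mut` access or extra locking.

```rust
use std::sync::{Arc, OnceLock};

#[derive(Debug, Default)]
struct ConnectionSessionState {
    // Empty until initialize succeeds; immutable afterwards.
    initialized: OnceLock<InitializedState>,
}

#[derive(Debug)]
struct InitializedState {
    client_name: String,
    experimental_api_enabled: bool,
}

impl ConnectionSessionState {
    fn initialize(&self, client_name: String) -> Result<(), &'static str> {
        self.initialized
            .set(InitializedState { client_name, experimental_api_enabled: false })
            .map_err(|_| "already initialized")
    }

    fn initialized(&self) -> bool {
        self.initialized.get().is_some()
    }

    fn client_name(&self) -> Option<&str> {
        self.initialized.get().map(|s| s.client_name.as_str())
    }

    fn experimental_api_enabled(&self) -> bool {
        self.initialized.get().is_some_and(|s| s.experimental_api_enabled)
    }
}

fn main() {
    // Shared immutably across spawned request handlers via Arc.
    let session = Arc::new(ConnectionSessionState::default());
    assert!(!session.initialized());
    session.initialize("test-client".to_string()).unwrap();
    assert_eq!(session.client_name(), Some("test-client"));
    assert!(!session.experimental_api_enabled());
    // A second initialize fails instead of mutating shared state.
    assert!(session.initialize("other".to_string()).is_err());
    println!("ok");
}
```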
Ruslan Nigmatullin
5894e019b6 app-server: add pipelined config rpc regression test
Motivation:
Concurrent request dispatch can reorder state-dependent RPCs. Config reads and writes are lightweight enough to serialize on a global config key, so the test suite should cover pipelined write/read behavior before adding the queue implementation.

Summary:
Add a config/value/write followed immediately by config/read integration test that asserts the read observes the written model value.

Testing:
- cargo test -p codex-app-server --test all config_read_after_pipelined_write_sees_written_value
- cargo test -p codex-app-server
2026-04-09 23:11:28 -07:00
13 changed files with 1044 additions and 252 deletions

View File

@@ -1,4 +1,5 @@
use std::path::Path;
use std::path::PathBuf;
use crate::JSONRPCNotification;
use crate::JSONRPCRequest;
@@ -8,6 +9,7 @@ use crate::export::write_json_schema;
use crate::protocol::v1;
use crate::protocol::v2;
use codex_experimental_api_macros::ExperimentalApi;
use codex_utils_absolute_path::AbsolutePathBuf;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
@@ -68,6 +70,96 @@ macro_rules! experimental_type_entry {
};
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientRequestSerializationScope {
Global(&'static str),
Thread {
thread_id: String,
},
ThreadPath {
path: PathBuf,
},
CommandExecProcess {
process_id: String,
},
FuzzyFileSearchSession {
session_id: String,
},
FsWatch {
watch_id: String,
},
Plugin {
marketplace_path: AbsolutePathBuf,
plugin_name: String,
},
PluginId {
plugin_id: String,
},
McpOauth {
server_name: String,
},
}
macro_rules! serialization_scope_expr {
($actual_params:ident) => {
None
};
($actual_params:ident, global($key:literal)) => {
Some(ClientRequestSerializationScope::Global($key))
};
($actual_params:ident, thread_id($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::Thread {
thread_id: $actual_params.$field.clone(),
})
};
($actual_params:ident, thread_or_path($params:ident . $thread_field:ident, $params2:ident . $path_field:ident)) => {
if let Some(path) = $actual_params.$path_field.clone() {
Some(ClientRequestSerializationScope::ThreadPath { path })
} else {
Some(ClientRequestSerializationScope::Thread {
thread_id: $actual_params.$thread_field.clone(),
})
}
};
($actual_params:ident, optional_command_process_id($params:ident . $field:ident)) => {
$actual_params
.$field
.clone()
.map(|process_id| ClientRequestSerializationScope::CommandExecProcess { process_id })
};
($actual_params:ident, command_process_id($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::CommandExecProcess {
process_id: $actual_params.$field.clone(),
})
};
($actual_params:ident, fuzzy_session_id($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::FuzzyFileSearchSession {
session_id: $actual_params.$field.clone(),
})
};
($actual_params:ident, fs_watch_id($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::FsWatch {
watch_id: $actual_params.$field.clone(),
})
};
($actual_params:ident, plugin($params:ident . $marketplace_field:ident, $params2:ident . $plugin_field:ident)) => {
Some(ClientRequestSerializationScope::Plugin {
marketplace_path: $actual_params.$marketplace_field.clone(),
plugin_name: $actual_params.$plugin_field.clone(),
})
};
($actual_params:ident, plugin_id($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::PluginId {
plugin_id: $actual_params.$field.clone(),
})
};
($actual_params:ident, mcp_oauth_server($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::McpOauth {
server_name: $actual_params.$field.clone(),
})
};
}
/// Generates an `enum ClientRequest` where each variant is a request that the
/// client can send to the server. Each variant has associated `params` and
/// `response` types. Also generates a `export_client_responses()` function to
@@ -80,6 +172,7 @@ macro_rules! client_request_definitions {
$variant:ident $(=> $wire:literal)? {
params: $(#[$params_meta:meta])* $params:ty,
$(inspect_params: $inspect_params:tt,)?
$(serialization: $serialization:ident ( $($serialization_args:tt)* ),)?
response: $response:ty,
}
),* $(,)?
@@ -118,6 +211,19 @@ macro_rules! client_request_definitions {
})
.unwrap_or_else(|| "<unknown>".to_string())
}
pub fn serialization_scope(&self) -> Option<ClientRequestSerializationScope> {
match self {
$(
Self::$variant { params, .. } => {
let _ = params;
serialization_scope_expr!(
params $(, $serialization($($serialization_args)*))?
)
}
)*
}
}
}
/// Typed response from the server to the client.
@@ -244,19 +350,23 @@ client_request_definitions! {
ThreadResume => "thread/resume" {
params: v2::ThreadResumeParams,
inspect_params: true,
serialization: thread_id(params.thread_id),
response: v2::ThreadResumeResponse,
},
ThreadFork => "thread/fork" {
params: v2::ThreadForkParams,
inspect_params: true,
serialization: thread_or_path(params.thread_id, params.path),
response: v2::ThreadForkResponse,
},
ThreadArchive => "thread/archive" {
params: v2::ThreadArchiveParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadArchiveResponse,
},
ThreadUnsubscribe => "thread/unsubscribe" {
params: v2::ThreadUnsubscribeParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadUnsubscribeResponse,
},
#[experimental("thread/increment_elicitation")]
@@ -266,6 +376,7 @@ client_request_definitions! {
/// approval or other elicitation is pending outside the app-server request flow.
ThreadIncrementElicitation => "thread/increment_elicitation" {
params: v2::ThreadIncrementElicitationParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadIncrementElicitationResponse,
},
#[experimental("thread/decrement_elicitation")]
@@ -274,26 +385,32 @@ client_request_definitions! {
/// When the count reaches zero, timeout accounting resumes for the thread.
ThreadDecrementElicitation => "thread/decrement_elicitation" {
params: v2::ThreadDecrementElicitationParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadDecrementElicitationResponse,
},
ThreadSetName => "thread/name/set" {
params: v2::ThreadSetNameParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadSetNameResponse,
},
ThreadMetadataUpdate => "thread/metadata/update" {
params: v2::ThreadMetadataUpdateParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadMetadataUpdateResponse,
},
ThreadUnarchive => "thread/unarchive" {
params: v2::ThreadUnarchiveParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadUnarchiveResponse,
},
ThreadCompactStart => "thread/compact/start" {
params: v2::ThreadCompactStartParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadCompactStartResponse,
},
ThreadShellCommand => "thread/shellCommand" {
params: v2::ThreadShellCommandParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadShellCommandResponse,
},
ThreadAddCreditsNudgeEmail => "thread/addCreditsNudgeEmail" {
@@ -303,10 +420,12 @@ client_request_definitions! {
#[experimental("thread/backgroundTerminals/clean")]
ThreadBackgroundTerminalsClean => "thread/backgroundTerminals/clean" {
params: v2::ThreadBackgroundTerminalsCleanParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadBackgroundTerminalsCleanResponse,
},
ThreadRollback => "thread/rollback" {
params: v2::ThreadRollbackParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRollbackResponse,
},
ThreadList => "thread/list" {
@@ -319,6 +438,7 @@ client_request_definitions! {
},
ThreadRead => "thread/read" {
params: v2::ThreadReadParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadReadResponse,
},
SkillsList => "skills/list" {
@@ -367,56 +487,68 @@ client_request_definitions! {
},
FsWatch => "fs/watch" {
params: v2::FsWatchParams,
serialization: fs_watch_id(params.watch_id),
response: v2::FsWatchResponse,
},
FsUnwatch => "fs/unwatch" {
params: v2::FsUnwatchParams,
serialization: fs_watch_id(params.watch_id),
response: v2::FsUnwatchResponse,
},
SkillsConfigWrite => "skills/config/write" {
params: v2::SkillsConfigWriteParams,
serialization: global("skills-config"),
response: v2::SkillsConfigWriteResponse,
},
PluginInstall => "plugin/install" {
params: v2::PluginInstallParams,
serialization: plugin(params.marketplace_path, params.plugin_name),
response: v2::PluginInstallResponse,
},
PluginUninstall => "plugin/uninstall" {
params: v2::PluginUninstallParams,
serialization: plugin_id(params.plugin_id),
response: v2::PluginUninstallResponse,
},
TurnStart => "turn/start" {
params: v2::TurnStartParams,
inspect_params: true,
serialization: thread_id(params.thread_id),
response: v2::TurnStartResponse,
},
TurnSteer => "turn/steer" {
params: v2::TurnSteerParams,
inspect_params: true,
serialization: thread_id(params.thread_id),
response: v2::TurnSteerResponse,
},
TurnInterrupt => "turn/interrupt" {
params: v2::TurnInterruptParams,
serialization: thread_id(params.thread_id),
response: v2::TurnInterruptResponse,
},
#[experimental("thread/realtime/start")]
ThreadRealtimeStart => "thread/realtime/start" {
params: v2::ThreadRealtimeStartParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeStartResponse,
},
#[experimental("thread/realtime/appendAudio")]
ThreadRealtimeAppendAudio => "thread/realtime/appendAudio" {
params: v2::ThreadRealtimeAppendAudioParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeAppendAudioResponse,
},
#[experimental("thread/realtime/appendText")]
ThreadRealtimeAppendText => "thread/realtime/appendText" {
params: v2::ThreadRealtimeAppendTextParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeAppendTextResponse,
},
#[experimental("thread/realtime/stop")]
ThreadRealtimeStop => "thread/realtime/stop" {
params: v2::ThreadRealtimeStopParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeStopResponse,
},
#[experimental("thread/realtime/listVoices")]
@@ -426,6 +558,7 @@ client_request_definitions! {
},
ReviewStart => "review/start" {
params: v2::ReviewStartParams,
serialization: thread_id(params.thread_id),
response: v2::ReviewStartResponse,
},
@@ -439,6 +572,7 @@ client_request_definitions! {
},
ExperimentalFeatureEnablementSet => "experimentalFeature/enablement/set" {
params: v2::ExperimentalFeatureEnablementSetParams,
serialization: global("config"),
response: v2::ExperimentalFeatureEnablementSetResponse,
},
#[experimental("collaborationMode/list")]
@@ -456,42 +590,50 @@ client_request_definitions! {
McpServerOauthLogin => "mcpServer/oauth/login" {
params: v2::McpServerOauthLoginParams,
serialization: mcp_oauth_server(params.name),
response: v2::McpServerOauthLoginResponse,
},
McpServerRefresh => "config/mcpServer/reload" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: global("mcp-registry"),
response: v2::McpServerRefreshResponse,
},
McpServerStatusList => "mcpServerStatus/list" {
params: v2::ListMcpServerStatusParams,
serialization: global("mcp-registry"),
response: v2::ListMcpServerStatusResponse,
},
McpResourceRead => "mcpServer/resource/read" {
params: v2::McpResourceReadParams,
serialization: thread_id(params.thread_id),
response: v2::McpResourceReadResponse,
},
WindowsSandboxSetupStart => "windowsSandbox/setupStart" {
params: v2::WindowsSandboxSetupStartParams,
serialization: global("windows-sandbox-setup"),
response: v2::WindowsSandboxSetupStartResponse,
},
LoginAccount => "account/login/start" {
params: v2::LoginAccountParams,
inspect_params: true,
serialization: global("account-auth"),
response: v2::LoginAccountResponse,
},
CancelLoginAccount => "account/login/cancel" {
params: v2::CancelLoginAccountParams,
serialization: global("account-auth"),
response: v2::CancelLoginAccountResponse,
},
LogoutAccount => "account/logout" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: global("account-auth"),
response: v2::LogoutAccountResponse,
},
@@ -508,26 +650,31 @@ client_request_definitions! {
/// Execute a standalone command (argv vector) under the server's sandbox.
OneOffCommandExec => "command/exec" {
params: v2::CommandExecParams,
serialization: optional_command_process_id(params.process_id),
response: v2::CommandExecResponse,
},
/// Write stdin bytes to a running `command/exec` session or close stdin.
CommandExecWrite => "command/exec/write" {
params: v2::CommandExecWriteParams,
serialization: command_process_id(params.process_id),
response: v2::CommandExecWriteResponse,
},
/// Terminate a running `command/exec` session by client-supplied `processId`.
CommandExecTerminate => "command/exec/terminate" {
params: v2::CommandExecTerminateParams,
serialization: command_process_id(params.process_id),
response: v2::CommandExecTerminateResponse,
},
/// Resize a running PTY-backed `command/exec` session by client-supplied `processId`.
CommandExecResize => "command/exec/resize" {
params: v2::CommandExecResizeParams,
serialization: command_process_id(params.process_id),
response: v2::CommandExecResizeResponse,
},
ConfigRead => "config/read" {
params: v2::ConfigReadParams,
serialization: global("config"),
response: v2::ConfigReadResponse,
},
ExternalAgentConfigDetect => "externalAgentConfig/detect" {
@@ -536,24 +683,29 @@ client_request_definitions! {
},
ExternalAgentConfigImport => "externalAgentConfig/import" {
params: v2::ExternalAgentConfigImportParams,
serialization: global("external-agent-config-import"),
response: v2::ExternalAgentConfigImportResponse,
},
ConfigValueWrite => "config/value/write" {
params: v2::ConfigValueWriteParams,
serialization: global("config"),
response: v2::ConfigWriteResponse,
},
ConfigBatchWrite => "config/batchWrite" {
params: v2::ConfigBatchWriteParams,
serialization: global("config"),
response: v2::ConfigWriteResponse,
},
ConfigRequirementsRead => "configRequirements/read" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: global("config"),
response: v2::ConfigRequirementsReadResponse,
},
GetAccount => "account/read" {
params: v2::GetAccountParams,
serialization: global("account-auth"),
response: v2::GetAccountResponse,
},
@@ -569,6 +721,7 @@ client_request_definitions! {
/// DEPRECATED in favor of GetAccount
GetAuthStatus {
params: v1::GetAuthStatusParams,
serialization: global("account-auth"),
response: v1::GetAuthStatusResponse,
},
FuzzyFileSearch {
@@ -578,16 +731,19 @@ client_request_definitions! {
#[experimental("fuzzyFileSearch/sessionStart")]
FuzzyFileSearchSessionStart => "fuzzyFileSearch/sessionStart" {
params: FuzzyFileSearchSessionStartParams,
serialization: fuzzy_session_id(params.session_id),
response: FuzzyFileSearchSessionStartResponse,
},
#[experimental("fuzzyFileSearch/sessionUpdate")]
FuzzyFileSearchSessionUpdate => "fuzzyFileSearch/sessionUpdate" {
params: FuzzyFileSearchSessionUpdateParams,
serialization: fuzzy_session_id(params.session_id),
response: FuzzyFileSearchSessionUpdateResponse,
},
#[experimental("fuzzyFileSearch/sessionStop")]
FuzzyFileSearchSessionStop => "fuzzyFileSearch/sessionStop" {
params: FuzzyFileSearchSessionStopParams,
serialization: fuzzy_session_id(params.session_id),
response: FuzzyFileSearchSessionStopResponse,
},
}
@@ -1064,6 +1220,213 @@ mod tests {
AbsolutePathBuf::from_absolute_path(absolute_path_string(path)).expect("absolute path")
}
fn request_id() -> RequestId {
RequestId::Integer(1)
}
#[test]
fn client_request_serialization_scope_covers_keyed_families() {
let thread_id = "thread-1".to_string();
let thread_resume = ClientRequest::ThreadResume {
request_id: request_id(),
params: v2::ThreadResumeParams {
thread_id: thread_id.clone(),
..Default::default()
},
};
assert_eq!(
thread_resume.serialization_scope(),
Some(ClientRequestSerializationScope::Thread {
thread_id: thread_id.clone()
})
);
let fork_path = PathBuf::from("/tmp/source-thread.jsonl");
let thread_fork = ClientRequest::ThreadFork {
request_id: request_id(),
params: v2::ThreadForkParams {
thread_id: thread_id,
path: Some(fork_path.clone()),
..Default::default()
},
};
assert_eq!(
thread_fork.serialization_scope(),
Some(ClientRequestSerializationScope::ThreadPath { path: fork_path })
);
let command_exec = ClientRequest::OneOffCommandExec {
request_id: request_id(),
params: v2::CommandExecParams {
command: vec!["sleep".to_string(), "10".to_string()],
process_id: Some("proc-1".to_string()),
tty: false,
stream_stdin: false,
stream_stdout_stderr: false,
output_bytes_cap: None,
disable_output_cap: false,
disable_timeout: false,
timeout_ms: None,
cwd: None,
env: None,
size: None,
sandbox_policy: None,
},
};
assert_eq!(
command_exec.serialization_scope(),
Some(ClientRequestSerializationScope::CommandExecProcess {
process_id: "proc-1".to_string()
})
);
let fuzzy_update = ClientRequest::FuzzyFileSearchSessionUpdate {
request_id: request_id(),
params: FuzzyFileSearchSessionUpdateParams {
session_id: "search-1".to_string(),
query: "lib".to_string(),
},
};
assert_eq!(
fuzzy_update.serialization_scope(),
Some(ClientRequestSerializationScope::FuzzyFileSearchSession {
session_id: "search-1".to_string()
})
);
let fs_watch = ClientRequest::FsWatch {
request_id: request_id(),
params: v2::FsWatchParams {
watch_id: "watch-1".to_string(),
path: absolute_path("/tmp/repo"),
},
};
assert_eq!(
fs_watch.serialization_scope(),
Some(ClientRequestSerializationScope::FsWatch {
watch_id: "watch-1".to_string()
})
);
let plugin_install = ClientRequest::PluginInstall {
request_id: request_id(),
params: v2::PluginInstallParams {
marketplace_path: absolute_path("/tmp/marketplace"),
plugin_name: "plugin-a".to_string(),
force_remote_sync: false,
},
};
assert_eq!(
plugin_install.serialization_scope(),
Some(ClientRequestSerializationScope::Plugin {
marketplace_path: absolute_path("/tmp/marketplace"),
plugin_name: "plugin-a".to_string(),
})
);
let plugin_uninstall = ClientRequest::PluginUninstall {
request_id: request_id(),
params: v2::PluginUninstallParams {
plugin_id: "plugin-a".to_string(),
force_remote_sync: false,
},
};
assert_eq!(
plugin_uninstall.serialization_scope(),
Some(ClientRequestSerializationScope::PluginId {
plugin_id: "plugin-a".to_string()
})
);
let mcp_oauth = ClientRequest::McpServerOauthLogin {
request_id: request_id(),
params: v2::McpServerOauthLoginParams {
name: "server-a".to_string(),
scopes: None,
timeout_secs: None,
},
};
assert_eq!(
mcp_oauth.serialization_scope(),
Some(ClientRequestSerializationScope::McpOauth {
server_name: "server-a".to_string()
})
);
let config_read = ClientRequest::ConfigRead {
request_id: request_id(),
params: v2::ConfigReadParams {
include_layers: false,
cwd: None,
},
};
assert_eq!(
config_read.serialization_scope(),
Some(ClientRequestSerializationScope::Global("config"))
);
let account_read = ClientRequest::GetAccount {
request_id: request_id(),
params: v2::GetAccountParams {
refresh_token: false,
},
};
assert_eq!(
account_read.serialization_scope(),
Some(ClientRequestSerializationScope::Global("account-auth"))
);
}
#[test]
fn client_request_serialization_scope_covers_unkeyed_representatives() {
let initialize = ClientRequest::Initialize {
request_id: request_id(),
params: v1::InitializeParams {
client_info: v1::ClientInfo {
name: "test".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: None,
},
};
assert_eq!(initialize.serialization_scope(), None);
let thread_start = ClientRequest::ThreadStart {
request_id: request_id(),
params: v2::ThreadStartParams::default(),
};
assert_eq!(thread_start.serialization_scope(), None);
let command_exec = ClientRequest::OneOffCommandExec {
request_id: request_id(),
params: v2::CommandExecParams {
command: vec!["true".to_string()],
process_id: None,
tty: false,
stream_stdin: false,
stream_stdout_stderr: false,
output_bytes_cap: None,
disable_output_cap: false,
disable_timeout: false,
timeout_ms: None,
cwd: None,
env: None,
size: None,
sandbox_policy: None,
},
};
assert_eq!(command_exec.serialization_scope(), None);
let fs_read = ClientRequest::FsReadFile {
request_id: request_id(),
params: v2::FsReadFileParams {
path: absolute_path("/tmp/file.txt"),
},
};
assert_eq!(fs_read.serialization_scope(), None);
}
#[test]
fn serialize_get_conversation_summary() -> Result<()> {
let request = ClientRequest::GetConversationSummary {

View File

@@ -72,10 +72,10 @@ pub(crate) fn typed_request_span(
&span,
client_info
.map(|(client_name, _)| client_name)
.or(session.app_server_client_name.as_deref()),
.or(session.app_server_client_name()),
client_info
.map(|(_, client_version)| client_version)
.or(session.client_version.as_deref()),
.or(session.client_version()),
);
attach_parent_context(&span, &method, request.id(), /*parent_trace*/ None);
@@ -147,7 +147,7 @@ fn client_name<'a>(
if let Some(params) = initialize_client_info {
return Some(params.client_info.name.as_str());
}
session.app_server_client_name.as_deref()
session.app_server_client_name()
}
fn client_version<'a>(
@@ -157,7 +157,7 @@ fn client_version<'a>(
if let Some(params) = initialize_client_info {
return Some(params.client_info.version.as_str());
}
session.client_version.as_deref()
session.client_version()
}
fn initialize_client_info(request: &JSONRPCRequest) -> Option<InitializeParams> {

View File

@@ -386,7 +386,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
let mut processor_handle = tokio::spawn(async move {
let processor = MessageProcessor::new(MessageProcessorArgs {
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
arg0_paths: args.arg0_paths,
config: args.config,
@@ -401,9 +401,9 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
auth_manager,
rpc_transport: AppServerRpcTransport::InProcess,
remote_control_handle: None,
});
}));
let mut thread_created_rx = processor.thread_created_receiver();
let mut session = ConnectionSessionState::default();
let session = Arc::new(ConnectionSessionState::default());
let mut listen_for_threads = true;
loop {
@@ -411,28 +411,33 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
command = processor_rx.recv() => {
match command {
Some(ProcessorCommand::Request(request)) => {
let was_initialized = session.initialized;
let was_initialized = session.initialized();
processor
.process_client_request(
IN_PROCESS_CONNECTION_ID,
*request,
&mut session,
Arc::clone(&session),
&outbound_initialized,
)
.await;
let opted_out_notification_methods_snapshot =
session.opted_out_notification_methods();
let experimental_api_enabled =
session.experimental_api_enabled();
let is_initialized = session.initialized();
if let Ok(mut opted_out_notification_methods) =
outbound_opted_out_notification_methods.write()
{
*opted_out_notification_methods =
session.opted_out_notification_methods.clone();
opted_out_notification_methods_snapshot;
} else {
warn!("failed to update outbound opted-out notifications");
}
outbound_experimental_api_enabled.store(
session.experimental_api_enabled,
experimental_api_enabled,
Ordering::Release,
);
if !was_initialized && session.initialized {
if !was_initialized && is_initialized {
processor.send_initialize_notifications().await;
}
}
@@ -447,7 +452,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
let connection_ids = if session.initialized {
let connection_ids = if session.initialized() {
vec![IN_PROCESS_CONNECTION_ID]
} else {
Vec::<ConnectionId>::new()

View File

@@ -79,6 +79,7 @@ pub mod in_process;
mod message_processor;
mod models;
mod outgoing_message;
mod request_serialization;
mod server_request_error;
mod thread_state;
mod thread_status;
@@ -646,7 +647,7 @@ pub async fn run_main_with_transport(
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
let loader_overrides = loader_overrides_for_config_api;
let processor = MessageProcessor::new(MessageProcessorArgs {
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
arg0_paths,
config: Arc::new(config),
@@ -661,7 +662,7 @@ pub async fn run_main_with_transport(
auth_manager,
rpc_transport: analytics_rpc_transport(transport),
remote_control_handle: Some(remote_control_handle),
});
}));
let mut thread_created_rx = processor.thread_created_receiver();
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();
let mut connections = HashMap::<ConnectionId, ConnectionState>::new();
@@ -763,23 +764,28 @@ pub async fn run_main_with_transport(
warn!("dropping request from unknown connection: {connection_id:?}");
continue;
};
let was_initialized = connection_state.session.initialized;
let was_initialized =
connection_state.session.initialized();
processor
.process_request(
connection_id,
request,
transport,
&mut connection_state.session,
Arc::clone(&connection_state.session),
)
.await;
let opted_out_notification_methods_snapshot = connection_state
.session
.opted_out_notification_methods();
let experimental_api_enabled =
connection_state.session.experimental_api_enabled();
let is_initialized = connection_state.session.initialized();
if let Ok(mut opted_out_notification_methods) = connection_state
.outbound_opted_out_notification_methods
.write()
{
*opted_out_notification_methods = connection_state
.session
.opted_out_notification_methods
.clone();
*opted_out_notification_methods =
opted_out_notification_methods_snapshot;
} else {
warn!(
"failed to update outbound opted-out notifications"
@@ -788,10 +794,10 @@ pub async fn run_main_with_transport(
connection_state
.outbound_experimental_api_enabled
.store(
connection_state.session.experimental_api_enabled,
experimental_api_enabled,
std::sync::atomic::Ordering::Release,
);
if !was_initialized && connection_state.session.initialized {
if !was_initialized && is_initialized {
processor
.send_initialize_notifications_to_connection(
connection_id,
@@ -831,12 +837,12 @@ pub async fn run_main_with_transport(
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
let initialized_connection_ids: Vec<ConnectionId> = connections
.iter()
.filter_map(|(connection_id, connection_state)| {
connection_state.session.initialized.then_some(*connection_id)
})
.collect();
let mut initialized_connection_ids = Vec::new();
for (connection_id, connection_state) in &connections {
if connection_state.session.initialized() {
initialized_connection_ids.push(*connection_id);
}
}
processor
.try_attach_thread_listener(
thread_id,

View File

@@ -2,6 +2,7 @@ use std::collections::BTreeMap;
use std::collections::HashSet;
use std::future::Future;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -18,6 +19,9 @@ use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::RequestContext;
use crate::request_serialization::QueuedInitializedRequest;
use crate::request_serialization::RequestSerializationQueueKey;
use crate::request_serialization::RequestSerializationQueues;
use crate::transport::AppServerTransport;
use crate::transport::RemoteControlHandle;
use async_trait::async_trait;
@@ -172,15 +176,55 @@ pub(crate) struct MessageProcessor {
config_warnings: Arc<Vec<ConfigWarningNotification>>,
rpc_transport: AppServerRpcTransport,
remote_control_handle: Option<RemoteControlHandle>,
request_serialization_queues: RequestSerializationQueues,
}
#[derive(Clone, Debug, Default)]
#[derive(Debug, Default)]
pub(crate) struct ConnectionSessionState {
pub(crate) initialized: bool,
pub(crate) experimental_api_enabled: bool,
pub(crate) opted_out_notification_methods: HashSet<String>,
pub(crate) app_server_client_name: Option<String>,
pub(crate) client_version: Option<String>,
initialized: OnceLock<InitializedConnectionSessionState>,
}
#[derive(Debug)]
struct InitializedConnectionSessionState {
experimental_api_enabled: bool,
opted_out_notification_methods: HashSet<String>,
app_server_client_name: String,
client_version: String,
}
impl ConnectionSessionState {
pub(crate) fn initialized(&self) -> bool {
self.initialized.get().is_some()
}
pub(crate) fn experimental_api_enabled(&self) -> bool {
self.initialized
.get()
.is_some_and(|session| session.experimental_api_enabled)
}
pub(crate) fn opted_out_notification_methods(&self) -> HashSet<String> {
self.initialized
.get()
.map(|session| session.opted_out_notification_methods.clone())
.unwrap_or_default()
}
pub(crate) fn app_server_client_name(&self) -> Option<&str> {
self.initialized
.get()
.map(|session| session.app_server_client_name.as_str())
}
pub(crate) fn client_version(&self) -> Option<&str> {
self.initialized
.get()
.map(|session| session.client_version.as_str())
}
fn initialize(&self, session: InitializedConnectionSessionState) -> Result<(), ()> {
self.initialized.set(session).map_err(|_| ())
}
}
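The refactor above moves all initialize-time connection state behind a single write-once `OnceLock`, so the session can be shared immutably (e.g. via `Arc`) and "already initialized" falls out of `set` failing. A reduced standalone sketch of that pattern (names and the single `String` field are illustrative, not the real API):

```rust
use std::sync::Arc;
use std::sync::OnceLock;

// Illustrative stand-in for ConnectionSessionState: every field set at
// initialize time lives behind one write-once cell.
#[derive(Debug, Default)]
struct SessionState {
    initialized: OnceLock<String>,
}

impl SessionState {
    fn initialized(&self) -> bool {
        self.initialized.get().is_some()
    }

    // First caller wins; any later initialize observes Err.
    fn initialize(&self, client_name: String) -> Result<(), ()> {
        self.initialized.set(client_name).map_err(|_| ())
    }

    fn client_name(&self) -> Option<&str> {
        self.initialized.get().map(String::as_str)
    }
}

fn main() {
    let session = Arc::new(SessionState::default());
    let reader = Arc::clone(&session);
    assert!(!reader.initialized());
    session
        .initialize("codex-ide".to_string())
        .expect("first initialize succeeds");
    // A second initialize fails without mutating the stored value.
    assert!(session.initialize("other".to_string()).is_err());
    assert_eq!(reader.client_name(), Some("codex-ide"));
}
```

Because readers only ever call `get`, no lock or `&mut` access is needed after the `OnceLock` is populated, which is what lets `process_request` take `Arc<ConnectionSessionState>` instead of `&mut`.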
pub(crate) struct MessageProcessorArgs {
@@ -290,6 +334,7 @@ impl MessageProcessor {
config_warnings: Arc::new(config_warnings),
rpc_transport,
remote_control_handle,
request_serialization_queues: RequestSerializationQueues::default(),
}
}
@@ -298,11 +343,11 @@ impl MessageProcessor {
}
pub(crate) async fn process_request(
&self,
self: &Arc<Self>,
connection_id: ConnectionId,
request: JSONRPCRequest,
transport: AppServerTransport,
session: &mut ConnectionSessionState,
session: Arc<ConnectionSessionState>,
) {
let request_method = request.method.as_str();
tracing::trace!(
@@ -315,7 +360,7 @@ impl MessageProcessor {
request_id: request.id.clone(),
};
let request_span =
crate::app_server_tracing::request_span(&request, transport, connection_id, session);
crate::app_server_tracing::request_span(&request, transport, connection_id, &session);
let request_trace = request.trace.as_ref().map(|trace| W3cTraceContext {
traceparent: trace.traceparent.clone(),
tracestate: trace.tracestate.clone(),
@@ -357,7 +402,7 @@ impl MessageProcessor {
self.handle_client_request(
request_id.clone(),
codex_request,
session,
Arc::clone(&session),
/*outbound_initialized*/ None,
request_context.clone(),
)
@@ -372,10 +417,10 @@ impl MessageProcessor {
/// This bypasses JSON request deserialization but keeps identical request
/// semantics by delegating to `handle_client_request`.
pub(crate) async fn process_client_request(
&self,
self: &Arc<Self>,
connection_id: ConnectionId,
request: ClientRequest,
session: &mut ConnectionSessionState,
session: Arc<ConnectionSessionState>,
outbound_initialized: &AtomicBool,
) {
let request_id = ConnectionRequestId {
@@ -383,7 +428,7 @@ impl MessageProcessor {
request_id: request.id().clone(),
};
let request_span =
crate::app_server_tracing::typed_request_span(&request, connection_id, session);
crate::app_server_tracing::typed_request_span(&request, connection_id, &session);
let request_context =
RequestContext::new(request_id.clone(), request_span, /*parent_trace*/ None);
tracing::trace!(
@@ -401,7 +446,7 @@ impl MessageProcessor {
self.handle_client_request(
request_id.clone(),
request,
session,
Arc::clone(&session),
Some(outbound_initialized),
request_context.clone(),
)
@@ -524,10 +569,10 @@ impl MessageProcessor {
}
async fn handle_client_request(
&self,
self: &Arc<Self>,
connection_request_id: ConnectionRequestId,
codex_request: ClientRequest,
session: &mut ConnectionSessionState,
session: Arc<ConnectionSessionState>,
// `Some(...)` means the caller wants initialize to immediately mark the
// connection outbound-ready. Websocket JSON-RPC calls pass `None` so
// lib.rs can deliver connection-scoped initialize notifications first.
@@ -535,138 +580,166 @@ impl MessageProcessor {
request_context: RequestContext,
) {
let connection_id = connection_request_id.connection_id;
match codex_request {
if let ClientRequest::Initialize { request_id, params } = codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
let connection_request_id = ConnectionRequestId {
connection_id,
request_id,
let connection_request_id = ConnectionRequestId {
connection_id,
request_id,
};
if session.initialized() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
if session.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
}
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
// Current behavior is per-connection. Reviewer feedback notes this can
// create odd cross-client behavior (for example dynamic tool calls on a
// shared thread when another connected client did not opt into
// experimental API). Proposed direction is instance-global first-write-wins
// with initialize-time mismatch rejection.
let analytics_initialize_params = params.clone();
let (experimental_api_enabled, opt_out_notification_methods) =
match params.capabilities {
Some(capabilities) => (
capabilities.experimental_api,
capabilities
.opt_out_notification_methods
.unwrap_or_default(),
),
None => (false, Vec::new()),
};
session.experimental_api_enabled = experimental_api_enabled;
session.opted_out_notification_methods =
opt_out_notification_methods.into_iter().collect();
let ClientInfo {
name,
title: _title,
version,
} = params.client_info;
session.app_server_client_name = Some(name.clone());
session.client_version = Some(version.clone());
let originator = name.clone();
if let Err(error) = set_default_originator(originator.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
),
data: None,
};
self.outgoing
.send_error(connection_request_id.clone(), error)
.await;
return;
}
SetOriginatorError::AlreadyInitialized => {
// No-op. This is expected to happen if the originator is already set via env var.
// TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE,
// this will be an unexpected state and we can return a JSON-RPC error indicating
// internal server error.
}
}
}
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_initialize(
connection_id.0,
analytics_initialize_params,
originator,
self.rpc_transport,
);
}
set_default_client_residency_requirement(self.config.enforce_residency.value());
let user_agent_suffix = format!("{name}; {version}");
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
*suffix = Some(user_agent_suffix);
}
let user_agent = get_codex_user_agent();
let codex_home = match self.config.codex_home.clone().try_into() {
Ok(codex_home) => codex_home,
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("Invalid CODEX_HOME: {err}"),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
}
};
let response = InitializeResponse {
user_agent,
codex_home,
platform_family: std::env::consts::FAMILY.to_string(),
platform_os: std::env::consts::OS.to_string(),
};
self.outgoing
.send_response(connection_request_id, response)
.await;
session.initialized = true;
if let Some(outbound_initialized) = outbound_initialized {
// In-process clients can complete readiness immediately here. The
// websocket path defers this until lib.rs finishes transport-layer
// initialize handling for the specific connection.
outbound_initialized.store(true, Ordering::Release);
self.codex_message_processor
.connection_initialized(connection_id)
.await;
}
self.outgoing.send_error(connection_request_id, error).await;
return;
}
_ => {
if !session.initialized {
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
// Current behavior is per-connection. Reviewer feedback notes this can
// create odd cross-client behavior (for example dynamic tool calls on a
// shared thread when another connected client did not opt into
// experimental API). Proposed direction is instance-global first-write-wins
// with initialize-time mismatch rejection.
let analytics_initialize_params = params.clone();
let (experimental_api_enabled, opt_out_notification_methods) = match params.capabilities
{
Some(capabilities) => (
capabilities.experimental_api,
capabilities
.opt_out_notification_methods
.unwrap_or_default(),
),
None => (false, Vec::new()),
};
let ClientInfo {
name,
title: _title,
version,
} = params.client_info;
let originator = name.clone();
if let Err(error) = set_default_originator(originator.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
),
data: None,
};
self.outgoing
.send_error(connection_request_id.clone(), error)
.await;
return;
}
SetOriginatorError::AlreadyInitialized => {
// No-op. This is expected to happen if the originator is already set via env var.
// TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE,
// this will be an unexpected state and we can return a JSON-RPC error indicating
// internal server error.
}
}
}
if self.config.features.enabled(Feature::GeneralAnalytics) {
self.analytics_events_client.track_initialize(
connection_id.0,
analytics_initialize_params,
originator,
self.rpc_transport,
);
}
set_default_client_residency_requirement(self.config.enforce_residency.value());
let user_agent_suffix = format!("{name}; {version}");
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
*suffix = Some(user_agent_suffix);
}
let user_agent = get_codex_user_agent();
let codex_home = match self.config.codex_home.clone().try_into() {
Ok(codex_home) => codex_home,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Not initialized".to_string(),
code: INTERNAL_ERROR_CODE,
message: format!("Invalid CODEX_HOME: {err}"),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
}
};
let response = InitializeResponse {
user_agent,
codex_home,
platform_family: std::env::consts::FAMILY.to_string(),
platform_os: std::env::consts::OS.to_string(),
};
if session
.initialize(InitializedConnectionSessionState {
experimental_api_enabled,
opted_out_notification_methods: opt_out_notification_methods
.into_iter()
.collect(),
app_server_client_name: name.clone(),
client_version: version,
})
.is_err()
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
}
self.outgoing
.send_response(connection_request_id, response)
.await;
if let Some(outbound_initialized) = outbound_initialized {
// In-process clients can complete readiness immediately here. The
// websocket path defers this until lib.rs finishes transport-layer
// initialize handling for the specific connection.
outbound_initialized.store(true, Ordering::Release);
self.codex_message_processor
.connection_initialized(connection_id)
.await;
}
return;
}
self.dispatch_initialized_client_request(
connection_request_id,
codex_request,
session,
request_context,
)
.await;
}
async fn dispatch_initialized_client_request(
self: &Arc<Self>,
connection_request_id: ConnectionRequestId,
codex_request: ClientRequest,
session: Arc<ConnectionSessionState>,
request_context: RequestContext,
) {
if !session.initialized() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Not initialized".to_string(),
data: None,
};
self.outgoing.send_error(connection_request_id, error).await;
return;
}
if let Some(reason) = codex_request.experimental_reason()
&& !session.experimental_api_enabled
&& !session.experimental_api_enabled()
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
@@ -677,6 +750,49 @@ impl MessageProcessor {
return;
}
let serialization_scope = codex_request.serialization_scope();
let app_server_client_name = session.app_server_client_name().map(str::to_string);
let client_version = session.client_version().map(str::to_string);
let connection_id = connection_request_id.connection_id;
let processor = Arc::clone(self);
let span = request_context.span();
let request = QueuedInitializedRequest::new(
async move {
processor
.handle_initialized_client_request(
connection_request_id,
codex_request,
request_context,
app_server_client_name,
client_version,
)
.await;
}
.instrument(span),
);
if let Some(scope) = serialization_scope {
let key = RequestSerializationQueueKey::from_scope(connection_id, scope);
self.request_serialization_queues
.enqueue(key, request)
.await;
} else {
tokio::spawn(async move {
request.run().await;
});
}
}
async fn handle_initialized_client_request(
self: Arc<Self>,
connection_request_id: ConnectionRequestId,
codex_request: ClientRequest,
request_context: RequestContext,
app_server_client_name: Option<String>,
client_version: Option<String>,
) {
let connection_id = connection_request_id.connection_id;
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.handle_config_read(
@@ -848,8 +964,8 @@ impl MessageProcessor {
.process_request(
connection_id,
other,
session.app_server_client_name.clone(),
session.client_version.clone(),
app_server_client_name,
client_version,
request_context,
)
.boxed()


@@ -109,9 +109,9 @@ fn tracing_test_guard() -> &'static tokio::sync::Mutex<()> {
struct TracingHarness {
_server: MockServer,
_codex_home: TempDir,
processor: MessageProcessor,
processor: Arc<MessageProcessor>,
outgoing_rx: mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
session: ConnectionSessionState,
session: Arc<ConnectionSessionState>,
tracing: &'static TestTracing,
}
@@ -129,7 +129,7 @@ impl TracingHarness {
_codex_home: codex_home,
processor,
outgoing_rx,
session: ConnectionSessionState::default(),
session: Arc::new(ConnectionSessionState::default()),
tracing,
};
@@ -152,7 +152,7 @@ impl TracingHarness {
/*trace*/ None,
)
.await;
assert!(harness.session.initialized);
assert!(harness.session.initialized());
Ok(harness)
}
@@ -182,7 +182,7 @@ impl TracingHarness {
TEST_CONNECTION_ID,
request,
AppServerTransport::Stdio,
&mut self.session,
Arc::clone(&self.session),
)
.await;
read_response(&mut self.outgoing_rx, request_id).await
@@ -230,14 +230,14 @@ async fn build_test_config(codex_home: &Path, server_uri: &str) -> Result<Config
fn build_test_processor(
config: Arc<Config>,
) -> (
MessageProcessor,
Arc<MessageProcessor>,
mpsc::Receiver<crate::outgoing_message::OutgoingEnvelope>,
) {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let auth_manager =
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false);
let processor = MessageProcessor::new(MessageProcessorArgs {
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing,
arg0_paths: Arg0DispatchPaths::default(),
config,
@@ -252,7 +252,7 @@ fn build_test_processor(
auth_manager,
rpc_transport: AppServerRpcTransport::Stdio,
remote_control_handle: None,
});
}));
(processor, outgoing_rx)
}


@@ -0,0 +1,227 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use codex_app_server_protocol::ClientRequestSerializationScope;
use codex_utils_absolute_path::AbsolutePathBuf;
use tokio::sync::Mutex;
use tracing::Instrument;
use crate::outgoing_message::ConnectionId;
type BoxFutureUnit = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(crate) enum RequestSerializationQueueKey {
Global(&'static str),
Thread {
thread_id: String,
},
ThreadPath {
path: PathBuf,
},
CommandExecProcess {
connection_id: ConnectionId,
process_id: String,
},
FuzzyFileSearchSession {
session_id: String,
},
FsWatch {
connection_id: ConnectionId,
watch_id: String,
},
Plugin {
marketplace_path: AbsolutePathBuf,
plugin_name: String,
},
PluginId {
plugin_id: String,
},
McpOauth {
server_name: String,
},
}
impl RequestSerializationQueueKey {
pub(crate) fn from_scope(
connection_id: ConnectionId,
scope: ClientRequestSerializationScope,
) -> Self {
match scope {
ClientRequestSerializationScope::Global(name) => Self::Global(name),
ClientRequestSerializationScope::Thread { thread_id } => Self::Thread { thread_id },
ClientRequestSerializationScope::ThreadPath { path } => Self::ThreadPath { path },
ClientRequestSerializationScope::CommandExecProcess { process_id } => {
Self::CommandExecProcess {
connection_id,
process_id,
}
}
ClientRequestSerializationScope::FuzzyFileSearchSession { session_id } => {
Self::FuzzyFileSearchSession { session_id }
}
ClientRequestSerializationScope::FsWatch { watch_id } => Self::FsWatch {
connection_id,
watch_id,
},
ClientRequestSerializationScope::Plugin {
marketplace_path,
plugin_name,
} => Self::Plugin {
marketplace_path,
plugin_name,
},
ClientRequestSerializationScope::PluginId { plugin_id } => Self::PluginId { plugin_id },
ClientRequestSerializationScope::McpOauth { server_name } => {
Self::McpOauth { server_name }
}
}
}
}
pub(crate) struct QueuedInitializedRequest {
future: BoxFutureUnit,
}
impl QueuedInitializedRequest {
pub(crate) fn new(future: impl Future<Output = ()> + Send + 'static) -> Self {
Self {
future: Box::pin(future),
}
}
pub(crate) async fn run(self) {
self.future.await;
}
}
#[derive(Clone, Default)]
pub(crate) struct RequestSerializationQueues {
inner: Arc<Mutex<HashMap<RequestSerializationQueueKey, VecDeque<QueuedInitializedRequest>>>>,
}
impl RequestSerializationQueues {
pub(crate) async fn enqueue(
&self,
key: RequestSerializationQueueKey,
request: QueuedInitializedRequest,
) {
let should_spawn = {
let mut queues = self.inner.lock().await;
match queues.get_mut(&key) {
Some(queue) => {
queue.push_back(request);
false
}
None => {
let mut queue = VecDeque::new();
queue.push_back(request);
queues.insert(key.clone(), queue);
true
}
}
};
if should_spawn {
let queues = self.clone();
let span = tracing::debug_span!("app_server.serialized_request_queue", ?key);
tokio::spawn(async move { queues.drain(key).await }.instrument(span));
}
}
async fn drain(self, key: RequestSerializationQueueKey) {
loop {
let request = {
let mut queues = self.inner.lock().await;
let Some(queue) = queues.get_mut(&key) else {
return;
};
match queue.pop_front() {
Some(request) => request,
None => {
queues.remove(&key);
return;
}
}
};
request.run().await;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Duration;
use tokio::time::timeout;
#[tokio::test]
async fn same_key_requests_run_fifo() {
let queues = RequestSerializationQueues::default();
let key = RequestSerializationQueueKey::Global("test");
let (tx, mut rx) = mpsc::unbounded_channel();
for value in [1, 2, 3] {
let tx = tx.clone();
queues
.enqueue(
key.clone(),
QueuedInitializedRequest::new(async move {
tx.send(value).expect("receiver should be open");
}),
)
.await;
}
drop(tx);
let mut values = Vec::new();
while let Some(value) = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timed out waiting for queued request")
{
values.push(value);
}
assert_eq!(values, vec![1, 2, 3]);
}
#[tokio::test]
async fn different_keys_run_concurrently() {
let queues = RequestSerializationQueues::default();
let (blocked_tx, blocked_rx) = oneshot::channel::<()>();
let (ran_tx, ran_rx) = oneshot::channel::<()>();
queues
.enqueue(
RequestSerializationQueueKey::Global("blocked"),
QueuedInitializedRequest::new(async move {
let _ = blocked_rx.await;
}),
)
.await;
queues
.enqueue(
RequestSerializationQueueKey::Global("other"),
QueuedInitializedRequest::new(async move {
ran_tx.send(()).expect("receiver should be open");
}),
)
.await;
timeout(Duration::from_secs(1), ran_rx)
.await
.expect("other key should not be blocked")
.expect("sender should be open");
blocked_tx
.send(())
.expect("blocked request should be waiting");
}
}
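The queue above serializes same-key requests FIFO while distinct keys drain concurrently: the first enqueue for an absent key spawns a drainer, later enqueues only push, and the drainer removes the key (under the lock) once its queue is empty. A thread-based analogue of that shape, as an assumed simplification of the tokio version:

```rust
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::mpsc;
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

#[derive(Clone, Default)]
struct Queues {
    inner: Arc<Mutex<HashMap<&'static str, VecDeque<Job>>>>,
}

impl Queues {
    fn enqueue(&self, key: &'static str, job: Job) {
        let should_spawn = {
            let mut queues = self.inner.lock().unwrap();
            match queues.get_mut(key) {
                // A drainer is already active for this key: just push.
                Some(queue) => {
                    queue.push_back(job);
                    false
                }
                // First job for this key: install the queue and spawn a drainer.
                None => {
                    let mut queue = VecDeque::new();
                    queue.push_back(job);
                    queues.insert(key, queue);
                    true
                }
            }
        };
        if should_spawn {
            let queues = self.clone();
            thread::spawn(move || queues.drain(key));
        }
    }

    fn drain(self, key: &'static str) {
        loop {
            let job = {
                let mut queues = self.inner.lock().unwrap();
                let Some(queue) = queues.get_mut(key) else { return };
                match queue.pop_front() {
                    Some(job) => job,
                    // Queue drained: remove the key so the next enqueue respawns.
                    None => {
                        queues.remove(key);
                        return;
                    }
                }
            };
            // Run outside the lock so other keys are never blocked.
            job();
        }
    }
}

fn main() {
    let queues = Queues::default();
    let (tx, rx) = mpsc::channel();
    for value in [1, 2, 3] {
        let tx = tx.clone();
        queues.enqueue("thread-a", Box::new(move || tx.send(value).unwrap()));
    }
    drop(tx);
    // Same-key jobs complete in submission order.
    assert_eq!(rx.iter().collect::<Vec<_>>(), vec![1, 2, 3]);
}
```

Removing the key only when the queue is observed empty under the lock is what prevents two drainers for one key: while the key is present, exactly one drainer owns it.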


@@ -121,7 +121,7 @@ pub(crate) struct ConnectionState {
pub(crate) outbound_initialized: Arc<AtomicBool>,
pub(crate) outbound_experimental_api_enabled: Arc<AtomicBool>,
pub(crate) outbound_opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
pub(crate) session: ConnectionSessionState,
pub(crate) session: Arc<ConnectionSessionState>,
}
impl ConnectionState {
@@ -134,7 +134,7 @@ impl ConnectionState {
outbound_initialized,
outbound_experimental_api_enabled,
outbound_opted_out_notification_methods,
session: ConnectionSessionState::default(),
session: Arc::new(ConnectionSessionState::default()),
}
}
}


@@ -569,6 +569,55 @@ model = "gpt-old"
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn config_read_after_pipelined_write_sees_written_value() -> Result<()> {
let temp_dir = TempDir::new()?;
let codex_home = temp_dir.path().canonicalize()?;
write_config(
&temp_dir,
r#"
model = "gpt-old"
"#,
)?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let write_id = mcp
.send_config_value_write_request(ConfigValueWriteParams {
file_path: None,
key_path: "model".to_string(),
value: json!("gpt-new"),
merge_strategy: MergeStrategy::Replace,
expected_version: None,
})
.await?;
let read_id = mcp
.send_config_read_request(ConfigReadParams {
include_layers: false,
cwd: None,
})
.await?;
let write_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(write_id)),
)
.await??;
let write: ConfigWriteResponse = to_response(write_resp)?;
assert_eq!(write.status, WriteStatus::Ok);
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let read: ConfigReadResponse = to_response(read_resp)?;
assert_eq!(read.config.model.as_deref(), Some("gpt-new"));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn config_value_write_rejects_version_conflict() -> Result<()> {
let codex_home = TempDir::new()?;


@@ -963,35 +963,8 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
"unexpected close reason: {closed_notification:?}"
);
let request = call_capture.single_request();
assert_eq!(request.url.path(), "/v1/realtime/calls");
assert_eq!(request.url.query(), None);
assert_eq!(
request
.headers
.get("content-type")
.and_then(|value| value.to_str().ok()),
Some("multipart/form-data; boundary=codex-realtime-call-boundary")
);
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
let session = r#"{"tool_choice":"auto","type":"realtime","model":"gpt-realtime-1.5","instructions":"backend prompt\n\nstartup context","output_modalities":["audio"],"audio":{"input":{"format":{"type":"audio/pcm","rate":24000},"noise_reduction":{"type":"near_field"},"turn_detection":{"type":"server_vad","interrupt_response":true,"create_response":true}},"output":{"format":{"type":"audio/pcm","rate":24000},"voice":"marin"}},"tools":[{"type":"function","name":"background_agent","description":"Send a user request to the background agent. Use this as the default action. If the background agent is idle, this starts a new task and returns the final result to the user. If the background agent is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.","parameters":{"type":"object","properties":{"prompt":{"type":"string","description":"The user request to delegate to the background agent."}},"required":["prompt"],"additionalProperties":false}}]}"#;
assert_eq!(
body,
format!(
"--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"sdp\"\r\n\
Content-Type: application/sdp\r\n\
\r\n\
v=offer\r\n\
\r\n\
--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"session\"\r\n\
Content-Type: application/json\r\n\
\r\n\
{session}\r\n\
--codex-realtime-call-boundary--\r\n"
)
);
assert_call_create_multipart(call_capture.single_request(), "v=offer\r\n", session)?;
realtime_server.shutdown().await;
Ok(())
@@ -1312,7 +1285,7 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
};
assert_eq!(id.as_str(), "shell_call");
assert_eq!(status, CommandExecutionStatus::Completed);
assert_eq!(aggregated_output.as_deref(), Some("realtime-tool-ok"));
assert_output_contains(aggregated_output.as_deref(), "realtime-tool-ok");
// Phase 3: verify the shell output reached Responses and the final delegated answer returned
// to realtime as a single function-call-output item.
@@ -1643,6 +1616,16 @@ fn assert_v2_function_call_output(request: &Value, call_id: &str, expected_outpu
);
}
fn assert_output_contains(aggregated_output: Option<&str>, expected_output: &str) {
let Some(aggregated_output) = aggregated_output else {
panic!("expected aggregated command output");
};
assert!(
aggregated_output.contains(expected_output),
"expected aggregated output to contain {expected_output:?}, got {aggregated_output:?}"
);
}
fn assert_v1_session_update(request: &Value) -> Result<()> {
assert_eq!(request["type"].as_str(), Some("session.update"));
assert_eq!(request["session"]["type"].as_str(), Some("quicksilver"));
@@ -1691,22 +1674,27 @@ fn assert_call_create_multipart(
Some("multipart/form-data; boundary=codex-realtime-call-boundary")
);
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
assert_eq!(
body,
format!(
"--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"sdp\"\r\n\
Content-Type: application/sdp\r\n\
\r\n\
{offer_sdp}\r\n\
--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"session\"\r\n\
Content-Type: application/json\r\n\
\r\n\
{session}\r\n\
--codex-realtime-call-boundary--\r\n"
)
let prefix = format!(
"--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"sdp\"\r\n\
Content-Type: application/sdp\r\n\
\r\n\
{offer_sdp}\r\n\
--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"session\"\r\n\
Content-Type: application/json\r\n\
\r\n"
);
let session_part = body
.strip_prefix(&prefix)
.context("multipart body should include expected SDP part")?
.strip_suffix("\r\n--codex-realtime-call-boundary--\r\n")
.context("multipart body should end with closing boundary")?;
let actual_session: Value =
serde_json::from_str(session_part).context("actual session part should be JSON")?;
let expected_session: Value =
serde_json::from_str(session).context("expected session part should be JSON")?;
assert_eq!(actual_session, expected_session);
Ok(())
}


@@ -94,11 +94,8 @@ async fn thread_shell_command_runs_as_standalone_turn_and_persists_history() ->
assert_eq!(source, &CommandExecutionSource::UserShell);
assert_eq!(status, &CommandExecutionStatus::InProgress);
let delta = wait_for_command_execution_output_delta(&mut mcp, &command_id).await?;
assert_eq!(
delta.delta.trim_end_matches(['\r', '\n']),
expected_output.trim_end_matches(['\r', '\n'])
);
wait_for_command_execution_output_delta_containing(&mut mcp, &command_id, &expected_output)
.await?;
let completed = wait_for_command_execution_completed(&mut mcp, Some(&command_id)).await?;
let ThreadItem::CommandExecution {
@@ -115,7 +112,7 @@ async fn thread_shell_command_runs_as_standalone_turn_and_persists_history() ->
assert_eq!(id, &command_id);
assert_eq!(source, &CommandExecutionSource::UserShell);
assert_eq!(status, &CommandExecutionStatus::Completed);
assert_eq!(aggregated_output.as_deref(), Some(expected_output.as_str()));
assert_output_contains(aggregated_output.as_deref(), &expected_output);
assert_eq!(*exit_code, Some(0));
timeout(
@@ -152,7 +149,7 @@ async fn thread_shell_command_runs_as_standalone_turn_and_persists_history() ->
};
assert_eq!(source, &CommandExecutionSource::UserShell);
assert_eq!(status, &CommandExecutionStatus::Completed);
assert_eq!(aggregated_output.as_deref(), Some(expected_output.as_str()));
assert_output_contains(aggregated_output.as_deref(), &expected_output);
Ok(())
}
@@ -275,7 +272,7 @@ async fn thread_shell_command_uses_existing_active_turn() -> Result<()> {
unreachable!("helper returns command execution item");
};
assert_eq!(source, &CommandExecutionSource::UserShell);
assert_eq!(aggregated_output.as_deref(), Some(expected_output.as_str()));
assert_output_contains(aggregated_output.as_deref(), &expected_output);
mcp.send_response(
request_id,
@@ -315,7 +312,9 @@ async fn thread_shell_command_uses_existing_active_turn() -> Result<()> {
source: CommandExecutionSource::UserShell,
aggregated_output,
..
} if aggregated_output.as_deref() == Some(expected_output.as_str())
} if aggregated_output
.as_deref()
.is_some_and(|output| output.contains(expected_output.as_str()))
)
}),
"expected active-turn shell command to be persisted on the existing turn"
@@ -401,9 +400,10 @@ async fn wait_for_command_execution_completed(
}
}
async fn wait_for_command_execution_output_delta(
async fn wait_for_command_execution_output_delta_containing(
mcp: &mut McpProcess,
item_id: &str,
expected_output: &str,
) -> Result<CommandExecutionOutputDeltaNotification> {
loop {
let notif = mcp
@@ -414,12 +414,22 @@ async fn wait_for_command_execution_output_delta(
.params
.ok_or_else(|| anyhow::anyhow!("missing output delta params"))?,
)?;
if delta.item_id == item_id {
if delta.item_id == item_id && delta.delta.contains(expected_output) {
return Ok(delta);
}
}
}
fn assert_output_contains(aggregated_output: Option<&str>, expected_output: &str) {
let Some(aggregated_output) = aggregated_output else {
panic!("expected aggregated command output");
};
assert!(
aggregated_output.contains(expected_output),
"expected aggregated output to contain {expected_output:?}, got {aggregated_output:?}"
);
}
fn create_config_toml(
codex_home: &Path,
server_uri: &str,


@@ -221,11 +221,22 @@ async fn thread_start_does_not_track_thread_initialized_analytics_without_featur
.await??;
let _ = to_response::<ThreadStartResponse>(resp)?;
let payload = wait_for_analytics_payload(&server, Duration::from_millis(250)).await;
assert!(
payload.is_err(),
"thread analytics should be gated off when general_analytics is disabled"
);
tokio::time::sleep(Duration::from_millis(250)).await;
let requests = server.received_requests().await.unwrap_or_default();
for request in requests.iter().filter(|request| {
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
}) {
let payload: Value = serde_json::from_slice(&request.body)?;
let events = payload["events"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("analytics payload missing events array"))?;
assert!(
!events
.iter()
.any(|event| event["event_type"] == "codex_thread_initialized"),
"thread analytics should be gated off when general_analytics is disabled"
);
}
Ok(())
}
@@ -826,10 +837,11 @@ fn create_config_toml_with_chatgpt_base_url(
chatgpt_base_url: &str,
general_analytics_enabled: bool,
) -> std::io::Result<()> {
let general_analytics_toml = if general_analytics_enabled {
"\ngeneral_analytics = true".to_string()
} else {
let general_analytics_toml = format!("general_analytics = {general_analytics_enabled}");
let analytics_toml = if general_analytics_enabled {
String::new()
} else {
"\n[analytics]\nenabled = false\n".to_string()
};
let config_toml = codex_home.join("config.toml");
std::fs::write(
@@ -840,6 +852,7 @@ model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
chatgpt_base_url = "{chatgpt_base_url}"
{analytics_toml}
model_provider = "mock_provider"
@@ -10,7 +10,6 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::CommandAction;
@@ -75,16 +74,12 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
Some(5000),
"call-zsh-fork",
)?;
let no_op_response = responses::sse(vec![
responses::ev_response_created("resp-2"),
responses::ev_completed("resp-2"),
]);
// Interrupting after the shell item starts can race with the follow-up
// model request that reports the aborted tool call. This test only cares
// that zsh-fork launches the expected command, so allow one extra no-op
// `/responses` POST instead of asserting an exact request count.
let server =
create_mock_responses_server_sequence_unchecked(vec![response, no_op_response]).await;
let server = responses::start_mock_server().await;
wiremock::Mock::given(wiremock::matchers::method("POST"))
.and(wiremock::matchers::path_regex(".*/responses$"))
.respond_with(responses::sse_response(response))
.mount(&server)
.await;
create_config_toml(
&codex_home,
&server.uri(),
@@ -494,8 +489,19 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
// subcommand-decline flow. This test is about approval/decline behavior in
// the zsh fork, not exact model request count, so allow an extra request
// and return a harmless no-op response if it arrives.
let server =
create_mock_responses_server_sequence_unchecked(vec![response, no_op_response]).await;
let server = responses::start_mock_server().await;
let _initial_turn = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, "remove both files"),
response,
)
.await;
let _follow_up = responses::mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, "call-zsh-fork-subcommand-decline"),
no_op_response,
)
.await;
create_config_toml(
&codex_home,
&server.uri(),
@@ -744,6 +750,12 @@ async fn create_zsh_test_mcp_process(codex_home: &Path, zdotdir: &Path) -> Resul
McpProcess::new_with_env(codex_home, &[("ZDOTDIR", Some(zdotdir.as_str()))]).await
}
fn body_contains(req: &wiremock::Request, text: &str) -> bool {
String::from_utf8(req.body.clone())
.ok()
.is_some_and(|body| body.contains(text))
}
fn create_config_toml(
codex_home: &Path,
server_uri: &str,
@@ -751,7 +763,10 @@ fn create_config_toml(
feature_flags: &BTreeMap<Feature, bool>,
zsh_path: &Path,
) -> std::io::Result<()> {
let mut features = BTreeMap::from([(Feature::RemoteModels, false)]);
let mut features = BTreeMap::from([
(Feature::EnableRequestCompression, false),
(Feature::RemoteModels, false),
]);
for (feature, enabled) in feature_flags {
features.insert(*feature, *enabled);
}