Compare commits

...

34 Commits

Author SHA1 Message Date
Eric Traut
36b50e02b8 codex: address PR review feedback (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
ca4837cd0d codex: address PR review feedback (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
bf91bae434 codex: address PR review feedback (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
d054de45bd codex: fix CI failure on PR #15106 2026-03-21 15:14:45 -06:00
Eric Traut
be5dba6189 codex: restore exec json streaming compatibility (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
efa0f04e6f Simplify exec event handling 2026-03-21 15:14:45 -06:00
Eric Traut
36ba70c2bc codex: reject in-process external auth refresh (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
9855a2269f codex: fix PR CI regressions (#15106) 2026-03-21 15:14:07 -06:00
Eric Traut
851c37f066 codex: address PR review feedback (#15106) 2026-03-21 15:14:07 -06:00
Eric Traut
38027b41ff codex: address PR review feedback (#15106) 2026-03-21 15:14:07 -06:00
Eric Traut
c84b994c3d codex: drop legacy app-server events from client API (#15106) 2026-03-21 15:12:37 -06:00
Eric Traut
50b9c114be codex: remove in-process legacy notifications (#15106) 2026-03-21 15:12:14 -06:00
Eric Traut
d15933b8d9 codex: remove legacy app-server notification plumbing (#15106) 2026-03-21 15:12:14 -06:00
Eric Traut
5d438ff4f4 codex: restore typed exec json output coverage (#15106) 2026-03-21 15:11:29 -06:00
Eric Traut
e5172ff1c1 codex: restore exec json output test path (#15106) 2026-03-21 15:11:29 -06:00
Eric Traut
e4d070ff55 codex: move exec jsonl tests to crate tests dir (#15106) 2026-03-21 15:11:29 -06:00
Eric Traut
2e69132680 codex: simplify in-process startup wiring (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
86a33fbd79 codex: fix CI failure on PR #15106 2026-03-21 15:11:28 -06:00
Eric Traut
d4c72ea829 codex: address PR review feedback (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
50b61563c5 codex: address PR review feedback (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
be700f8284 codex: fix CI failure on PR #15106 2026-03-21 15:11:28 -06:00
Eric Traut
6058aba292 codex: fix CI failure on PR #15106 2026-03-21 15:11:28 -06:00
Eric Traut
99248e8300 codex: address PR review feedback (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
6e32ed312d codex: remove dead tui app server imports (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
4daf0300e1 codex: fix clippy regression in human output (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
e616e4b28b codex: address PR review feedback (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
f83b099c10 Fix exec human output fallbacks 2026-03-21 15:11:28 -06:00
Eric Traut
eeeb01a4bd Fix Bazel exec test fixtures 2026-03-21 15:11:28 -06:00
Eric Traut
c2851508b4 Fix exec CI regressions on app-server migration 2026-03-21 15:11:28 -06:00
Eric Traut
a19077ce26 codex: move exec json output tests back out of impl (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
58e2e9a2bb codex: fix follow-up PR blockers (#15106) 2026-03-21 15:11:27 -06:00
Eric Traut
17482fc8e6 codex: address PR review feedback (#15106) 2026-03-21 15:10:50 -06:00
Eric Traut
8dc496ec47 Restore typed exec JSON output coverage 2026-03-21 15:10:50 -06:00
Eric Traut
cf6cc9043d Move codex exec onto typed app-server APIs 2026-03-21 15:10:50 -06:00
17 changed files with 2673 additions and 4030 deletions

5
codex-rs/Cargo.lock generated
View File

@@ -1488,7 +1488,6 @@ dependencies = [
"codex-app-server-protocol",
"codex-arg0",
"codex-core",
"codex-features",
"codex-feedback",
"codex-protocol",
"futures",
@@ -1996,9 +1995,7 @@ dependencies = [
"codex-utils-absolute-path",
"codex-utils-cargo-bin",
"codex-utils-cli",
"codex-utils-elapsed",
"codex-utils-oss",
"codex-utils-sandbox-summary",
"core_test_support",
"libc",
"opentelemetry",
@@ -2006,10 +2003,8 @@ dependencies = [
"owo-colors",
"predicates",
"pretty_assertions",
"rmcp",
"serde",
"serde_json",
"shlex",
"supports-color 3.0.2",
"tempfile",
"tokio",

View File

@@ -16,7 +16,6 @@ codex-app-server = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-arg0 = { workspace = true }
codex-core = { workspace = true }
codex-features = { workspace = true }
codex-feedback = { workspace = true }
codex-protocol = { workspace = true }
futures = { workspace = true }

View File

@@ -35,19 +35,14 @@ use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use codex_features::Feature;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use serde::de::DeserializeOwned;
@@ -73,7 +68,6 @@ pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
pub enum AppServerEvent {
Lagged { skipped: usize },
ServerNotification(ServerNotification),
LegacyNotification(JSONRPCNotification),
ServerRequest(ServerRequest),
Disconnected { message: String },
}
@@ -85,9 +79,6 @@ impl From<InProcessServerEvent> for AppServerEvent {
InProcessServerEvent::ServerNotification(notification) => {
Self::ServerNotification(notification)
}
InProcessServerEvent::LegacyNotification(notification) => {
Self::LegacyNotification(notification)
}
InProcessServerEvent::ServerRequest(request) => Self::ServerRequest(request),
}
}
@@ -97,19 +88,12 @@ fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
// These terminal events drive surface shutdown/completion state. Dropping
// them under backpressure can leave exec/TUI waiting forever even though
// the underlying turn has already ended.
match event {
matches!(
event,
InProcessServerEvent::ServerNotification(
codex_app_server_protocol::ServerNotification::TurnCompleted(_),
) => true,
InProcessServerEvent::LegacyNotification(notification) => matches!(
notification
.method
.strip_prefix("codex/event/")
.unwrap_or(&notification.method),
"task_complete" | "turn_aborted" | "shutdown_complete"
),
_ => false,
}
)
)
}
/// Layered error for [`InProcessAppServerClient::request_typed`].
@@ -159,16 +143,6 @@ impl Error for TypedRequestError {
}
}
#[derive(Clone)]
struct SharedCoreManagers {
// Temporary bootstrap escape hatch for embedders that still need direct
// core handles during the in-process app-server migration. Once TUI/exec
// stop depending on direct manager access, remove this wrapper and keep
// manager ownership entirely inside the app-server runtime.
auth_manager: Arc<AuthManager>,
thread_manager: Arc<ThreadManager>,
}
#[derive(Clone)]
pub struct InProcessClientStartArgs {
/// Resolved argv0 dispatch paths used by command execution internals.
@@ -202,30 +176,6 @@ pub struct InProcessClientStartArgs {
}
impl InProcessClientStartArgs {
fn shared_core_managers(&self) -> SharedCoreManagers {
let auth_manager = AuthManager::shared(
self.config.codex_home.clone(),
self.enable_codex_api_key_env,
self.config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
self.config.as_ref(),
auth_manager.clone(),
self.session_source.clone(),
CollaborationModesConfig {
default_mode_request_user_input: self
.config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
SharedCoreManagers {
auth_manager,
thread_manager,
}
}
/// Builds initialize params from caller-provided metadata.
pub fn initialize_params(&self) -> InitializeParams {
let capabilities = InitializeCapabilities {
@@ -247,7 +197,7 @@ impl InProcessClientStartArgs {
}
}
fn into_runtime_start_args(self, shared_core: &SharedCoreManagers) -> InProcessStartArgs {
fn into_runtime_start_args(self) -> InProcessStartArgs {
let initialize = self.initialize_params();
InProcessStartArgs {
arg0_paths: self.arg0_paths,
@@ -255,8 +205,6 @@ impl InProcessClientStartArgs {
cli_overrides: self.cli_overrides,
loader_overrides: self.loader_overrides,
cloud_requirements: self.cloud_requirements,
auth_manager: Some(shared_core.auth_manager.clone()),
thread_manager: Some(shared_core.thread_manager.clone()),
feedback: self.feedback,
config_warnings: self.config_warnings,
session_source: self.session_source,
@@ -310,8 +258,6 @@ pub struct InProcessAppServerClient {
command_tx: mpsc::Sender<ClientCommand>,
event_rx: mpsc::Receiver<InProcessServerEvent>,
worker_handle: tokio::task::JoinHandle<()>,
auth_manager: Arc<AuthManager>,
thread_manager: Arc<ThreadManager>,
}
#[derive(Clone)]
@@ -338,9 +284,8 @@ impl InProcessAppServerClient {
/// with overload error instead of being silently dropped.
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
let channel_capacity = args.channel_capacity.max(1);
let shared_core = args.shared_core_managers();
let mut handle =
codex_app_server::in_process::start(args.into_runtime_start_args(&shared_core)).await?;
codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
let request_sender = handle.sender();
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
@@ -401,6 +346,25 @@ impl InProcessAppServerClient {
let Some(event) = event else {
break;
};
if let InProcessServerEvent::ServerRequest(
ServerRequest::ChatgptAuthTokensRefresh { request_id, .. }
) = &event
{
let send_result = request_sender.fail_server_request(
request_id.clone(),
JSONRPCErrorError {
code: -32000,
message: "chatgpt auth token refresh is not supported for in-process app-server clients".to_string(),
data: None,
},
);
if let Err(err) = send_result {
warn!(
"failed to reject unsupported chatgpt auth token refresh request: {err}"
);
}
continue;
}
if skipped_events > 0 {
if event_requires_delivery(&event) {
@@ -491,21 +455,9 @@ impl InProcessAppServerClient {
command_tx,
event_rx,
worker_handle,
auth_manager: shared_core.auth_manager,
thread_manager: shared_core.thread_manager,
})
}
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
pub fn auth_manager(&self) -> Arc<AuthManager> {
self.auth_manager.clone()
}
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
pub fn thread_manager(&self) -> Arc<ThreadManager> {
self.thread_manager.clone()
}
pub fn request_handle(&self) -> InProcessAppServerRequestHandle {
InProcessAppServerRequestHandle {
command_tx: self.command_tx.clone(),
@@ -664,8 +616,6 @@ impl InProcessAppServerClient {
command_tx,
event_rx,
worker_handle,
auth_manager: _,
thread_manager: _,
} = self;
let mut worker_handle = worker_handle;
// Drop the caller-facing receiver before asking the worker to shut
@@ -857,8 +807,6 @@ mod tests {
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_app_server_protocol::ToolRequestUserInputQuestion;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::ConfigBuilder;
use futures::SinkExt;
use futures::StreamExt;
@@ -1052,7 +1000,7 @@ mod tests {
}
#[tokio::test]
async fn shared_thread_manager_tracks_threads_started_via_app_server() {
async fn threads_started_via_app_server_are_visible_through_typed_requests() {
let client = start_test_client(SessionSource::Cli).await;
let response: ThreadStartResponse = client
@@ -1065,17 +1013,19 @@ mod tests {
})
.await
.expect("thread/start should succeed");
let created_thread_id = codex_protocol::ThreadId::from_string(&response.thread.id)
.expect("thread id should parse");
timeout(
Duration::from_secs(2),
client.thread_manager().get_thread(created_thread_id),
)
.await
.expect("timed out waiting for retained thread manager to observe started thread")
.expect("started thread should be visible through the shared thread manager");
let thread_ids = client.thread_manager().list_thread_ids().await;
assert!(thread_ids.contains(&created_thread_id));
let read = client
.request_typed::<codex_app_server_protocol::ThreadReadResponse>(
ClientRequest::ThreadRead {
request_id: RequestId::Integer(4),
params: codex_app_server_protocol::ThreadReadParams {
thread_id: response.thread.id.clone(),
include_turns: false,
},
},
)
.await
.expect("thread/read should return the newly started thread");
assert_eq!(read.thread.id, response.thread.id);
client.shutdown().await.expect("shutdown should complete");
}
@@ -1472,22 +1422,6 @@ mod tests {
let (command_tx, _command_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::channel(1);
let worker_handle = tokio::spawn(async {});
let config = build_test_config().await;
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
false,
config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
CollaborationModesConfig {
default_mode_request_user_input: config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
event_tx
.send(InProcessServerEvent::Lagged { skipped: 3 })
.await
@@ -1498,8 +1432,6 @@ mod tests {
command_tx,
event_rx,
worker_handle,
auth_manager,
thread_manager,
};
let event = timeout(Duration::from_secs(2), client.next_event())
@@ -1530,37 +1462,38 @@ mod tests {
)
)
));
assert!(event_requires_delivery(
&InProcessServerEvent::LegacyNotification(
codex_app_server_protocol::JSONRPCNotification {
method: "codex/event/turn_aborted".to_string(),
params: None,
}
)
));
assert!(!event_requires_delivery(&InProcessServerEvent::Lagged {
skipped: 1
}));
}
#[tokio::test]
async fn accessors_expose_retained_shared_managers() {
let client = start_test_client(SessionSource::Cli).await;
async fn runtime_start_args_leave_manager_bootstrap_to_app_server() {
let config = Arc::new(build_test_config().await);
assert!(
Arc::ptr_eq(&client.auth_manager(), &client.auth_manager()),
"auth_manager accessor should clone the retained shared manager"
);
assert!(
Arc::ptr_eq(&client.thread_manager(), &client.thread_manager()),
"thread_manager accessor should clone the retained shared manager"
);
let runtime_args = InProcessClientStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: config.clone(),
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
feedback: CodexFeedback::new(),
config_warnings: Vec::new(),
session_source: SessionSource::Exec,
enable_codex_api_key_env: false,
client_name: "codex-app-server-client-test".to_string(),
client_version: "0.0.0-test".to_string(),
experimental_api: true,
opt_out_notification_methods: Vec::new(),
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
}
.into_runtime_start_args();
client.shutdown().await.expect("shutdown should complete");
assert_eq!(runtime_args.config, config);
}
#[tokio::test]
async fn shutdown_completes_promptly_with_retained_shared_managers() {
async fn shutdown_completes_promptly_without_retained_managers() {
let client = start_test_client(SessionSource::Cli).await;
timeout(Duration::from_secs(1), client.shutdown())

View File

@@ -272,18 +272,19 @@ impl RemoteAppServerClient {
}
}
Ok(JSONRPCMessage::Notification(notification)) => {
let event = app_server_event_from_notification(notification);
if let Err(err) = deliver_event(
&event_tx,
&mut skipped_events,
event,
&mut stream,
)
.await
{
warn!(%err, "failed to deliver remote app-server event");
break;
}
if let Some(event) =
app_server_event_from_notification(notification)
&& let Err(err) = deliver_event(
&event_tx,
&mut skipped_events,
event,
&mut stream,
)
.await
{
warn!(%err, "failed to deliver remote app-server event");
break;
}
}
Ok(JSONRPCMessage::Request(request)) => {
let request_id = request.id.clone();
@@ -673,7 +674,9 @@ async fn initialize_remote_connection(
)));
}
JSONRPCMessage::Notification(notification) => {
pending_events.push(app_server_event_from_notification(notification));
if let Some(event) = app_server_event_from_notification(notification) {
pending_events.push(event);
}
}
JSONRPCMessage::Request(request) => {
let request_id = request.id.clone();
@@ -756,10 +759,10 @@ async fn initialize_remote_connection(
Ok(pending_events)
}
fn app_server_event_from_notification(notification: JSONRPCNotification) -> AppServerEvent {
match ServerNotification::try_from(notification.clone()) {
Ok(notification) => AppServerEvent::ServerNotification(notification),
Err(_) => AppServerEvent::LegacyNotification(notification),
fn app_server_event_from_notification(notification: JSONRPCNotification) -> Option<AppServerEvent> {
match ServerNotification::try_from(notification) {
Ok(notification) => Some(AppServerEvent::ServerNotification(notification)),
Err(_) => None,
}
}
@@ -852,13 +855,6 @@ async fn reject_if_server_request_dropped(
fn event_requires_delivery(event: &AppServerEvent) -> bool {
match event {
AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => true,
AppServerEvent::LegacyNotification(notification) => matches!(
notification
.method
.strip_prefix("codex/event/")
.unwrap_or(&notification.method),
"task_complete" | "turn_aborted" | "shutdown_complete"
),
AppServerEvent::Disconnected { .. } => true,
AppServerEvent::Lagged { .. }
| AppServerEvent::ServerNotification(_)

View File

@@ -68,14 +68,11 @@ use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
@@ -98,16 +95,6 @@ fn server_notification_requires_delivery(notification: &ServerNotification) -> b
matches!(notification, ServerNotification::TurnCompleted(_))
}
fn legacy_notification_requires_delivery(notification: &JSONRPCNotification) -> bool {
matches!(
notification
.method
.strip_prefix("codex/event/")
.unwrap_or(&notification.method),
"task_complete" | "turn_aborted" | "shutdown_complete"
)
}
/// Input needed to start an in-process app-server runtime.
///
/// These fields mirror the pieces of ambient process state that stdio and
@@ -124,10 +111,6 @@ pub struct InProcessStartArgs {
pub loader_overrides: LoaderOverrides,
/// Preloaded cloud requirements provider.
pub cloud_requirements: CloudRequirementsLoader,
/// Optional prebuilt auth manager reused by an embedding caller.
pub auth_manager: Option<Arc<AuthManager>>,
/// Optional prebuilt thread manager reused by an embedding caller.
pub thread_manager: Option<Arc<ThreadManager>>,
/// Feedback sink used by app-server/core telemetry and logs.
pub feedback: CodexFeedback,
/// Startup warnings emitted after initialize succeeds.
@@ -144,11 +127,6 @@ pub struct InProcessStartArgs {
/// Event emitted from the app-server to the in-process client.
///
/// The stream carries three event families because CLI surfaces are mid-migration
/// from the legacy `codex_protocol::Event` model to the typed app-server
/// notification model. Once all surfaces consume only [`ServerNotification`],
/// [`LegacyNotification`](Self::LegacyNotification) can be removed.
///
/// [`Lagged`](Self::Lagged) is a transport health marker, not an application
/// event — it signals that the consumer fell behind and some events were dropped.
#[derive(Debug, Clone)]
@@ -157,8 +135,6 @@ pub enum InProcessServerEvent {
ServerRequest(ServerRequest),
/// App-server notification directed to the embedded client.
ServerNotification(ServerNotification),
/// Legacy JSON-RPC notification from core event bridge.
LegacyNotification(JSONRPCNotification),
/// Indicates one or more events were dropped due to backpressure.
Lagged { skipped: usize },
}
@@ -390,7 +366,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
Arc::clone(&outbound_initialized),
Arc::clone(&outbound_experimental_api_enabled),
Arc::clone(&outbound_opted_out_notification_methods),
/*allow_legacy_notifications*/ true,
/*disconnect_sender*/ None,
),
);
@@ -410,8 +385,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
cli_overrides: args.cli_overrides,
loader_overrides: args.loader_overrides,
cloud_requirements: args.cloud_requirements,
auth_manager: args.auth_manager,
thread_manager: args.thread_manager,
feedback: args.feedback,
log_db: None,
config_warnings: args.config_warnings,
@@ -655,32 +628,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
}
}
}
OutgoingMessage::Notification(notification) => {
let notification = JSONRPCNotification {
method: notification.method,
params: notification.params,
};
if legacy_notification_requires_delivery(&notification) {
if event_tx
.send(InProcessServerEvent::LegacyNotification(notification))
.await
.is_err()
{
break;
}
} else if let Err(send_error) =
event_tx.try_send(InProcessServerEvent::LegacyNotification(notification))
{
match send_error {
mpsc::error::TrySendError::Full(_) => {
warn!("dropping in-process legacy notification (queue full)");
}
mpsc::error::TrySendError::Closed(_) => {
break;
}
}
}
}
OutgoingMessage::Notification(_notification) => {}
}
}
}
@@ -759,8 +707,6 @@ mod tests {
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
auth_manager: None,
thread_manager: None,
feedback: CodexFeedback::new(),
config_warnings: Vec::new(),
session_source,
@@ -858,7 +804,7 @@ mod tests {
}
#[test]
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
fn guaranteed_delivery_helpers_cover_terminal_server_notifications() {
assert!(server_notification_requires_delivery(
&ServerNotification::TurnCompleted(TurnCompletedNotification {
thread_id: "thread-1".to_string(),
@@ -870,30 +816,5 @@ mod tests {
},
})
));
assert!(legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/task_complete".to_string(),
params: None,
}
));
assert!(legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/turn_aborted".to_string(),
params: None,
}
));
assert!(legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/shutdown_complete".to_string(),
params: None,
}
));
assert!(!legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/item_started".to_string(),
params: None,
}
));
}
}

View File

@@ -104,8 +104,6 @@ enum OutboundControlEvent {
Opened {
connection_id: ConnectionId,
writer: mpsc::Sender<crate::outgoing_message::OutgoingMessage>,
// Allow codex/event/* notifications to be emitted.
allow_legacy_notifications: bool,
disconnect_sender: Option<CancellationToken>,
initialized: Arc<AtomicBool>,
experimental_api_enabled: Arc<AtomicBool>,
@@ -562,7 +560,6 @@ pub async fn run_main_with_transport(
OutboundControlEvent::Opened {
connection_id,
writer,
allow_legacy_notifications,
disconnect_sender,
initialized,
experimental_api_enabled,
@@ -575,7 +572,6 @@ pub async fn run_main_with_transport(
initialized,
experimental_api_enabled,
opted_out_notification_methods,
allow_legacy_notifications,
disconnect_sender,
),
);
@@ -618,8 +614,6 @@ pub async fn run_main_with_transport(
cli_overrides,
loader_overrides,
cloud_requirements: cloud_requirements.clone(),
auth_manager: None,
thread_manager: None,
feedback: feedback.clone(),
log_db,
config_warnings,
@@ -675,7 +669,6 @@ pub async fn run_main_with_transport(
TransportEvent::ConnectionOpened {
connection_id,
writer,
allow_legacy_notifications,
disconnect_sender,
} => {
let outbound_initialized = Arc::new(AtomicBool::new(false));
@@ -687,7 +680,6 @@ pub async fn run_main_with_transport(
.send(OutboundControlEvent::Opened {
connection_id,
writer,
allow_legacy_notifications,
disconnect_sender,
initialized: Arc::clone(&outbound_initialized),
experimental_api_enabled: Arc::clone(

View File

@@ -172,8 +172,6 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) cli_overrides: Vec<(String, TomlValue)>,
pub(crate) loader_overrides: LoaderOverrides,
pub(crate) cloud_requirements: CloudRequirementsLoader,
pub(crate) auth_manager: Option<Arc<AuthManager>>,
pub(crate) thread_manager: Option<Arc<ThreadManager>>,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
@@ -192,36 +190,27 @@ impl MessageProcessor {
cli_overrides,
loader_overrides,
cloud_requirements,
auth_manager,
thread_manager,
feedback,
log_db,
config_warnings,
session_source,
enable_codex_api_key_env,
} = args;
let (auth_manager, thread_manager) = match (auth_manager, thread_manager) {
(Some(auth_manager), Some(thread_manager)) => (auth_manager, thread_manager),
(None, None) => {
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
enable_codex_api_key_env,
config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
session_source,
CollaborationModesConfig {
default_mode_request_user_input: config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
(auth_manager, thread_manager)
}
_ => panic!("MessageProcessorArgs must provide both auth_manager and thread_manager"),
};
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
enable_codex_api_key_env,
config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
session_source,
CollaborationModesConfig {
default_mode_request_user_input: config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
auth_manager.set_external_auth_refresher(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),

View File

@@ -239,8 +239,6 @@ fn build_test_processor(
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
auth_manager: None,
thread_manager: None,
feedback: CodexFeedback::new(),
log_db: None,
config_warnings: Vec::new(),

View File

@@ -188,7 +188,6 @@ pub(crate) enum TransportEvent {
ConnectionOpened {
connection_id: ConnectionId,
writer: mpsc::Sender<OutgoingMessage>,
allow_legacy_notifications: bool,
disconnect_sender: Option<CancellationToken>,
},
ConnectionClosed {
@@ -226,7 +225,6 @@ pub(crate) struct OutboundConnectionState {
pub(crate) initialized: Arc<AtomicBool>,
pub(crate) experimental_api_enabled: Arc<AtomicBool>,
pub(crate) opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
pub(crate) allow_legacy_notifications: bool,
pub(crate) writer: mpsc::Sender<OutgoingMessage>,
disconnect_sender: Option<CancellationToken>,
}
@@ -237,14 +235,12 @@ impl OutboundConnectionState {
initialized: Arc<AtomicBool>,
experimental_api_enabled: Arc<AtomicBool>,
opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
allow_legacy_notifications: bool,
disconnect_sender: Option<CancellationToken>,
) -> Self {
Self {
initialized,
experimental_api_enabled,
opted_out_notification_methods,
allow_legacy_notifications,
writer,
disconnect_sender,
}
@@ -272,7 +268,6 @@ pub(crate) async fn start_stdio_connection(
.send(TransportEvent::ConnectionOpened {
connection_id,
writer: writer_tx,
allow_legacy_notifications: false,
disconnect_sender: None,
})
.await
@@ -376,7 +371,6 @@ async fn run_websocket_connection(
.send(TransportEvent::ConnectionOpened {
connection_id,
writer: writer_tx,
allow_legacy_notifications: false,
disconnect_sender: Some(disconnect_token.clone()),
})
.await
@@ -584,13 +578,10 @@ fn should_skip_notification_for_connection(
connection_state: &OutboundConnectionState,
message: &OutgoingMessage,
) -> bool {
if !connection_state.allow_legacy_notifications
&& matches!(message, OutgoingMessage::Notification(_))
{
if matches!(message, OutgoingMessage::Notification(_)) {
// Raw legacy `codex/event/*` notifications are still emitted upstream
// for in-process compatibility, but they are no longer part of the
// external app-server contract. Keep dropping them here until the
// producer path can be deleted entirely.
// for compatibility, but they are no longer part of the app-server
// client contract. Drop them until the producer path can be deleted.
return true;
}
@@ -970,7 +961,6 @@ mod tests {
initialized,
Arc::new(AtomicBool::new(true)),
opted_out_notification_methods,
false,
None,
),
);
@@ -1008,7 +998,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
None,
),
);
@@ -1034,7 +1023,7 @@ mod tests {
}
#[tokio::test]
async fn to_connection_legacy_notifications_are_preserved_for_in_process_clients() {
async fn to_connection_legacy_notifications_are_dropped_for_in_process_clients() {
let connection_id = ConnectionId(11);
let (writer_tx, mut writer_rx) = mpsc::channel(1);
@@ -1046,7 +1035,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
true,
None,
),
);
@@ -1065,17 +1053,10 @@ mod tests {
)
.await;
let message = writer_rx
.recv()
.await
.expect("legacy notification should reach in-process clients");
assert!(matches!(
message,
OutgoingMessage::Notification(crate::outgoing_message::OutgoingNotification {
method,
params: None,
}) if method == "codex/event/task_started"
));
assert!(
writer_rx.try_recv().is_err(),
"legacy notifications should not reach in-process clients"
);
}
#[tokio::test]
@@ -1091,7 +1072,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(false)),
Arc::new(RwLock::new(HashSet::new())),
false,
None,
),
);
@@ -1158,7 +1138,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
None,
),
);
@@ -1246,7 +1225,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
Some(fast_disconnect_token.clone()),
),
);
@@ -1257,7 +1235,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
Some(slow_disconnect_token.clone()),
),
);
@@ -1329,7 +1306,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
None,
),
);

View File

@@ -3,6 +3,7 @@ name = "codex-exec"
version.workspace = true
edition.workspace = true
license.workspace = true
autotests = false
[[bin]]
name = "codex-exec"
@@ -12,6 +13,10 @@ path = "src/main.rs"
name = "codex_exec"
path = "src/lib.rs"
[[test]]
name = "all"
path = "tests/all.rs"
[lints]
workspace = true
@@ -28,13 +33,10 @@ codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-cli = { workspace = true }
codex-utils-elapsed = { workspace = true }
codex-utils-oss = { workspace = true }
codex-utils-sandbox-summary = { workspace = true }
owo-colors = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
shlex = { workspace = true }
supports-color = { workspace = true }
tokio = { workspace = true, features = [
"io-std",
@@ -63,7 +65,6 @@ opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
predicates = { workspace = true }
pretty_assertions = { workspace = true }
rmcp = { workspace = true }
tempfile = { workspace = true }
tracing-opentelemetry = { workspace = true }
uuid = { workspace = true }

View File

@@ -1,13 +1,13 @@
use std::path::Path;
use codex_app_server_protocol::ServerNotification;
use codex_core::config::Config;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::SessionConfiguredEvent;
pub(crate) enum CodexStatus {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodexStatus {
Running,
InitiateShutdown,
Shutdown,
}
pub(crate) trait EventProcessor {
@@ -19,8 +19,11 @@ pub(crate) trait EventProcessor {
session_configured: &SessionConfiguredEvent,
);
/// Handle a single event emitted by the agent.
fn process_event(&mut self, event: Event) -> CodexStatus;
/// Handle a single typed app-server notification emitted by the agent.
fn process_server_notification(&mut self, notification: ServerNotification) -> CodexStatus;
/// Handle a local exec warning that is not represented as an app-server notification.
fn process_warning(&mut self, message: String) -> CodexStatus;
fn print_final_output(&mut self) {}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -17,11 +17,9 @@ use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
use codex_app_server_client::InProcessAppServerClient;
use codex_app_server_client::InProcessClientStartArgs;
use codex_app_server_client::InProcessServerEvent;
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::McpServerElicitationAction;
use codex_app_server_protocol::McpServerElicitationRequestResponse;
use codex_app_server_protocol::RequestId;
@@ -30,8 +28,13 @@ use codex_app_server_protocol::ReviewStartResponse;
use codex_app_server_protocol::ReviewTarget as ApiReviewTarget;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::Thread as AppServerThread;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadUnsubscribeParams;
@@ -41,8 +44,7 @@ use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_arg0::Arg0DispatchPaths;
use codex_cloud_requirements::cloud_requirements_loader;
use codex_core::AuthManager;
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
use codex_core::LMSTUDIO_OSS_PROVIDER_ID;
use codex_core::OLLAMA_OSS_PROVIDER_ID;
use codex_core::auth::AuthConfig;
@@ -59,16 +61,17 @@ use codex_core::config_loader::LoaderOverrides;
use codex_core::config_loader::format_config_error_with_source;
use codex_core::format_exec_policy_error_with_source;
use codex_core::git_info::get_git_repo_root;
use codex_core::path_utils;
use codex_feedback::CodexFeedback;
use codex_otel::set_parent_from_context;
use codex_otel::traceparent_context_from_env;
use codex_protocol::account::PlanType as AccountPlanType;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
@@ -79,10 +82,9 @@ use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
use serde_json::Value;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::io::IsTerminal;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use supports_color::Stream;
use tokio::sync::mpsc;
@@ -101,8 +103,6 @@ use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use codex_core::default_client::set_default_client_residency_requirement;
use codex_core::default_client::set_default_originator;
use codex_core::find_thread_path_by_id_str;
use codex_core::find_thread_path_by_name_str;
const DEFAULT_ANALYTICS_ENABLED: bool = true;
@@ -287,18 +287,17 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
}
};
let cloud_auth_manager = AuthManager::shared(
codex_home.clone(),
/*enable_codex_api_key_env*/ false,
config_toml.cli_auth_credentials_store.unwrap_or_default(),
);
let chatgpt_base_url = config_toml
.chatgpt_base_url
.clone()
.unwrap_or_else(|| "https://chatgpt.com/backend-api/".to_string());
// TODO(gt): Make cloud requirements failures blocking once we can fail-closed.
let cloud_requirements =
cloud_requirements_loader(cloud_auth_manager, chatgpt_base_url, codex_home.clone());
let cloud_requirements = cloud_requirements_loader_for_storage(
codex_home.clone(),
/*enable_codex_api_key_env*/ false,
config_toml.cli_auth_credentials_store.unwrap_or_default(),
chatgpt_base_url,
);
let run_cli_overrides = cli_kv_overrides.clone();
let run_loader_overrides = LoaderOverrides::default();
let run_cloud_requirements = cloud_requirements.clone();
@@ -500,14 +499,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
last_message_file.clone(),
)),
};
let required_mcp_servers: HashSet<String> = config
.mcp_servers
.get()
.iter()
.filter(|(_, server)| server.enabled && server.required)
.map(|(name, _)| name.clone())
.collect();
if oss {
// We're in the oss section, so provider_id should be Some
// Let's handle None case gracefully though just in case
@@ -547,17 +538,16 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
anyhow::anyhow!("failed to initialize in-process app-server client: {err}")
})?;
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
// Handle resume subcommand through existing `thread/list` + `thread/resume`
// APIs so exec no longer reaches into rollout storage directly.
let (primary_thread_id, fallback_session_configured) =
if let Some(ExecCommand::Resume(args)) = command.as_ref() {
let resume_path = resolve_resume_path(&config, args).await?;
if let Some(path) = resume_path {
if let Some(thread_id) = resolve_resume_thread_id(&client, &config, args).await? {
let response: ThreadResumeResponse = send_request_with_response(
&client,
ClientRequest::ThreadResume {
request_id: request_ids.next(),
params: thread_resume_params_from_config(&config, Some(path)),
params: thread_resume_params_from_config(&config, thread_id),
},
"thread/resume",
)
@@ -598,7 +588,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
};
let primary_thread_id_for_span = primary_thread_id.to_string();
let mut buffered_events = VecDeque::new();
// Use the start/resume response as the authoritative bootstrap payload.
// Waiting for a later streamed `SessionConfigured` event adds up to 10s of
// avoidable startup latency on the in-process path.
@@ -670,10 +659,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
// is using.
event_processor.print_config_summary(&config, &prompt_summary, &session_configured);
if !json_mode && let Some(message) = codex_core::config::missing_system_bwrap_warning() {
let _ = event_processor.process_event(Event {
id: String::new(),
msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }),
});
event_processor.process_warning(message);
}
info!("Codex initialized with event: {session_configured:?}");
@@ -748,34 +734,30 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
let mut interrupt_channel_open = true;
let primary_thread_id_for_requests = primary_thread_id.to_string();
loop {
let server_event = if let Some(event) = buffered_events.pop_front() {
Some(event)
} else {
tokio::select! {
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
if maybe_interrupt.is_none() {
interrupt_channel_open = false;
continue;
}
if let Err(err) = send_request_with_response::<TurnInterruptResponse>(
&client,
ClientRequest::TurnInterrupt {
request_id: request_ids.next(),
params: TurnInterruptParams {
thread_id: primary_thread_id_for_requests.clone(),
turn_id: task_id.clone(),
},
},
"turn/interrupt",
)
.await
{
warn!("turn/interrupt failed: {err}");
}
let server_event = tokio::select! {
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
if maybe_interrupt.is_none() {
interrupt_channel_open = false;
continue;
}
maybe_event = client.next_event() => maybe_event,
if let Err(err) = send_request_with_response::<TurnInterruptResponse>(
&client,
ClientRequest::TurnInterrupt {
request_id: request_ids.next(),
params: TurnInterruptParams {
thread_id: primary_thread_id_for_requests.clone(),
turn_id: task_id.clone(),
},
},
"turn/interrupt",
)
.await
{
warn!("turn/interrupt failed: {err}");
}
continue;
}
maybe_event = client.next_event() => maybe_event,
};
let Some(server_event) = server_event else {
@@ -784,69 +766,36 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
match server_event {
InProcessServerEvent::ServerRequest(request) => {
handle_server_request(
&client,
request,
&config,
&primary_thread_id_for_requests,
&mut error_seen,
)
.await;
handle_server_request(&client, request, &mut error_seen).await;
}
InProcessServerEvent::ServerNotification(notification) => {
if let ServerNotification::Error(payload) = &notification
if let ServerNotification::Error(payload) = &notification {
if payload.thread_id == primary_thread_id_for_requests
&& payload.turn_id == task_id
&& !payload.will_retry
{
error_seen = true;
}
} else if let ServerNotification::TurnCompleted(payload) = &notification
&& payload.thread_id == primary_thread_id_for_requests
&& payload.turn_id == task_id
&& !payload.will_retry
&& payload.turn.id == task_id
&& matches!(
payload.turn.status,
codex_app_server_protocol::TurnStatus::Failed
| codex_app_server_protocol::TurnStatus::Interrupted
)
{
error_seen = true;
}
}
InProcessServerEvent::LegacyNotification(notification) => {
let decoded = match decode_legacy_notification(notification) {
Ok(event) => event,
Err(err) => {
warn!("{err}");
continue;
}
};
if decoded.conversation_id.as_deref()
!= Some(primary_thread_id_for_requests.as_str())
&& decoded.conversation_id.is_some()
{
continue;
}
let event = decoded.event;
if matches!(event.msg, EventMsg::SessionConfigured(_)) {
continue;
}
if matches!(event.msg, EventMsg::Error(_)) {
// The legacy bridge still carries fatal turn failures for
// exec. Preserve the non-zero exit behavior until this
// path is fully replaced by typed server notifications.
error_seen = true;
}
match &event.msg {
EventMsg::TurnComplete(payload) => {
if payload.turn_id != task_id {
continue;
}
}
EventMsg::TurnAborted(payload) => {
if payload.turn_id.as_deref() != Some(task_id.as_str()) {
continue;
}
}
EventMsg::McpStartupUpdate(update) => {
if required_mcp_servers.contains(&update.server)
&& let codex_protocol::protocol::McpStartupStatus::Failed { error } =
&update.status
{
error_seen = true;
eprintln!(
"Required MCP server '{}' failed to initialize: {error}",
update.server
);
if should_process_notification(
&notification,
&primary_thread_id_for_requests,
&task_id,
) {
match event_processor.process_server_notification(notification) {
CodexStatus::Running => {}
CodexStatus::InitiateShutdown => {
if let Err(err) = request_shutdown(
&client,
&mut request_ids,
@@ -859,37 +808,12 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
break;
}
}
_ => {}
}
match event_processor.process_event(event) {
CodexStatus::Running => {}
CodexStatus::InitiateShutdown => {
if let Err(err) = request_shutdown(
&client,
&mut request_ids,
&primary_thread_id_for_requests,
)
.await
{
warn!("thread/unsubscribe failed during shutdown: {err}");
}
break;
}
CodexStatus::Shutdown => {
// `ShutdownComplete` does not identify which attached
// thread emitted it, so subagent shutdowns must not end
// the primary exec loop early.
}
}
}
InProcessServerEvent::Lagged { skipped } => {
let message = lagged_event_warning_message(skipped);
warn!("{message}");
let _ = event_processor.process_event(Event {
id: String::new(),
msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }),
});
event_processor.process_warning(message);
}
}
}
@@ -936,10 +860,9 @@ fn thread_start_params_from_config(config: &Config) -> ThreadStartParams {
}
}
fn thread_resume_params_from_config(config: &Config, path: Option<PathBuf>) -> ThreadResumeParams {
fn thread_resume_params_from_config(config: &Config, thread_id: String) -> ThreadResumeParams {
ThreadResumeParams {
thread_id: "resume".to_string(),
path,
thread_id,
model: config.model.clone(),
model_provider: Some(config.model_provider_id.clone()),
cwd: Some(config.cwd.to_string_lossy().to_string()),
@@ -1017,20 +940,19 @@ fn session_configured_from_thread_resume_response(
)
}
/// Converts a core-protocol `ReviewTarget` into its app-server API equivalent.
///
/// The two enums mirror each other field-for-field, so this is a pure
/// one-to-one structural mapping with no data transformation.
fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget {
    match target {
        ReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges,
        ReviewTarget::BaseBranch { branch } => ApiReviewTarget::BaseBranch { branch },
        ReviewTarget::Commit { sha, title } => ApiReviewTarget::Commit { sha, title },
        ReviewTarget::Custom { instructions } => ApiReviewTarget::Custom { instructions },
    }
}
#[expect(
clippy::too_many_arguments,
reason = "session mapping keeps explicit fields"
)]
/// Synthesizes startup session metadata from `thread/start` or `thread/resume`.
///
/// This is a compatibility bridge for the current in-process architecture.
/// Some session fields are not available synchronously from the start/resume
/// response, so callers must treat the result as a best-effort fallback until
/// a later `SessionConfigured` event proves otherwise.
/// TODO(architecture): stop synthesizing a partial `SessionConfiguredEvent`
/// here. Either return the authoritative session-configured payload from
/// `thread/start`/`thread/resume`, or introduce a smaller bootstrap type for
/// exec so this path cannot accidentally depend on placeholder fields.
fn session_configured_from_thread_response(
thread_id: &str,
thread_name: Option<String>,
@@ -1040,7 +962,7 @@ fn session_configured_from_thread_response(
service_tier: Option<codex_protocol::config_types::ServiceTier>,
approval_policy: AskForApproval,
approvals_reviewer: codex_protocol::config_types::ApprovalsReviewer,
sandbox_policy: codex_protocol::protocol::SandboxPolicy,
sandbox_policy: SandboxPolicy,
cwd: PathBuf,
reasoning_effort: Option<codex_protocol::openai_models::ReasoningEffort>,
) -> Result<SessionConfiguredEvent, String> {
@@ -1067,71 +989,198 @@ fn session_configured_from_thread_response(
})
}
/// Maps the core-protocol `ReviewTarget` onto the app-server API's
/// `ReviewTarget` representation; the variants correspond one-to-one and all
/// payload fields are carried across unchanged.
fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget {
    match target {
        ReviewTarget::Custom { instructions } => ApiReviewTarget::Custom { instructions },
        ReviewTarget::Commit { sha, title } => ApiReviewTarget::Commit { sha, title },
        ReviewTarget::BaseBranch { branch } => ApiReviewTarget::BaseBranch { branch },
        ReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges,
    }
}
/// Strips the `codex/event/` namespace from a legacy notification method
/// name, returning the bare event name; a method without that prefix is
/// returned unchanged.
fn normalize_legacy_notification_method(method: &str) -> &str {
    match method.strip_prefix("codex/event/") {
        Some(bare_name) => bare_name,
        None => method,
    }
}
/// Builds the user-visible warning emitted when the in-process app-server
/// event stream falls behind and `skipped` events are dropped.
fn lagged_event_warning_message(skipped: usize) -> String {
    format!(
        "in-process app-server event stream lagged; dropped {} events",
        skipped
    )
}
struct DecodedLegacyNotification {
conversation_id: Option<String>,
event: Event,
/// Decides whether a typed server notification belongs to the exec session's
/// primary thread/turn and should be forwarded to the event processor.
///
/// Filtering rules, per variant:
/// - session-wide advisories (`ConfigWarning`, `DeprecationNotice`) always
///   pass through;
/// - per-turn notifications must match both `thread_id` and `turn_id`;
/// - hook notifications may omit their turn id, in which case they are
///   accepted on a thread match alone;
/// - turn lifecycle notifications carry the turn id inside `turn.id` rather
///   than a top-level field;
/// - anything else (including any future variants caught by `_`) is dropped.
fn should_process_notification(
    notification: &ServerNotification,
    thread_id: &str,
    turn_id: &str,
) -> bool {
    match notification {
        // Not tied to a specific thread or turn: always relevant.
        ServerNotification::ConfigWarning(_) | ServerNotification::DeprecationNotice(_) => true,
        ServerNotification::Error(notification) => {
            notification.thread_id == thread_id && notification.turn_id == turn_id
        }
        // Hooks may run outside a turn; a missing turn id counts as a match.
        ServerNotification::HookCompleted(notification) => {
            notification.thread_id == thread_id
                && notification
                    .turn_id
                    .as_deref()
                    .is_none_or(|candidate| candidate == turn_id)
        }
        ServerNotification::HookStarted(notification) => {
            notification.thread_id == thread_id
                && notification
                    .turn_id
                    .as_deref()
                    .is_none_or(|candidate| candidate == turn_id)
        }
        ServerNotification::ItemCompleted(notification) => {
            notification.thread_id == thread_id && notification.turn_id == turn_id
        }
        ServerNotification::ItemStarted(notification) => {
            notification.thread_id == thread_id && notification.turn_id == turn_id
        }
        ServerNotification::ModelRerouted(notification) => {
            notification.thread_id == thread_id && notification.turn_id == turn_id
        }
        ServerNotification::ThreadTokenUsageUpdated(notification) => {
            notification.thread_id == thread_id && notification.turn_id == turn_id
        }
        // Turn lifecycle payloads embed the turn id in the `turn` object.
        ServerNotification::TurnCompleted(notification) => {
            notification.thread_id == thread_id && notification.turn.id == turn_id
        }
        ServerNotification::TurnDiffUpdated(notification) => {
            notification.thread_id == thread_id && notification.turn_id == turn_id
        }
        ServerNotification::TurnPlanUpdated(notification) => {
            notification.thread_id == thread_id && notification.turn.id == turn_id
        }
        ServerNotification::TurnStarted(notification) => {
            notification.thread_id == thread_id && notification.turn.id == turn_id
        }
        // Everything else is irrelevant to the exec loop and is dropped.
        _ => false,
    }
}
fn decode_legacy_notification(
notification: JSONRPCNotification,
) -> Result<DecodedLegacyNotification, String> {
let value = notification
.params
.unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
let method = notification.method;
let normalized_method = normalize_legacy_notification_method(&method).to_string();
let serde_json::Value::Object(mut object) = value else {
return Err(format!(
"legacy notification `{method}` params were not an object"
));
};
let conversation_id = object
.get("conversationId")
.and_then(serde_json::Value::as_str)
.map(str::to_owned);
let mut event_payload = if let Some(serde_json::Value::Object(msg_payload)) = object.get("msg")
{
serde_json::Value::Object(msg_payload.clone())
} else {
object.remove("conversationId");
serde_json::Value::Object(object)
};
let serde_json::Value::Object(ref mut object) = event_payload else {
return Err(format!(
"legacy notification `{method}` event payload was not an object"
));
};
object.insert(
"type".to_string(),
serde_json::Value::String(normalized_method),
);
/// Enumerates every `ThreadSourceKind`, so `thread/list` requests built from
/// this value match threads regardless of which surface created them.
// NOTE(review): this list must be kept in sync with the
// `ThreadSourceKind` enum — TODO confirm there is no `all()` helper upstream.
fn all_thread_source_kinds() -> Vec<ThreadSourceKind> {
    Vec::from([
        ThreadSourceKind::Cli,
        ThreadSourceKind::VsCode,
        ThreadSourceKind::Exec,
        ThreadSourceKind::AppServer,
        ThreadSourceKind::SubAgent,
        ThreadSourceKind::SubAgentReview,
        ThreadSourceKind::SubAgentCompact,
        ThreadSourceKind::SubAgentThreadSpawn,
        ThreadSourceKind::SubAgentOther,
        ThreadSourceKind::Unknown,
    ])
}
let msg: EventMsg = serde_json::from_value(event_payload)
.map_err(|err| format!("failed to decode event: {err}"))?;
Ok(DecodedLegacyNotification {
conversation_id,
event: Event {
id: String::new(),
msg,
},
})
/// Returns the best-known working directory for a thread.
///
/// When the thread has an on-disk rollout path, the cwd recorded by its most
/// recent turn context wins; otherwise (no path, or no parseable turn
/// context) the thread's own `cwd` field is used as the fallback.
async fn latest_thread_cwd(thread: &AppServerThread) -> PathBuf {
    if let Some(rollout_path) = thread.path.as_deref() {
        if let Some(latest_cwd) = parse_latest_turn_context_cwd(rollout_path).await {
            return latest_cwd;
        }
    }
    thread.cwd.clone()
}
/// Scans a rollout file from the end and returns the cwd recorded by the most
/// recent `TurnContext` entry, or `None` when the file cannot be read or
/// contains no parseable turn context.
///
/// Lines that are empty, fail to parse as `RolloutLine`, or hold a different
/// rollout item are skipped silently — this is a best-effort lookup.
async fn parse_latest_turn_context_cwd(path: &Path) -> Option<PathBuf> {
    let contents = tokio::fs::read_to_string(path).await.ok()?;
    contents
        .lines()
        .rev()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .find_map(|line| {
            let rollout_line = serde_json::from_str::<RolloutLine>(line).ok()?;
            match rollout_line.item {
                RolloutItem::TurnContext(item) => Some(item.cwd),
                _ => None,
            }
        })
}
/// Compares two working directories for equivalence.
///
/// Both paths are normalized via `path_utils::normalize_for_path_comparison`
/// before comparing; if either normalization fails, this degrades to a plain
/// byte-wise `Path` equality check.
fn cwds_match(current_cwd: &Path, session_cwd: &Path) -> bool {
    let normalized_current = path_utils::normalize_for_path_comparison(current_cwd);
    let normalized_session = path_utils::normalize_for_path_comparison(session_cwd);
    if let (Ok(current), Ok(session)) = (normalized_current, normalized_session) {
        current == session
    } else {
        current_cwd == session_cwd
    }
}
/// Resolves which thread id the `resume` subcommand should reopen, using the
/// app-server `thread/list` API rather than touching rollout storage.
///
/// Resolution order:
/// 1. `--last`: page through threads sorted by `UpdatedAt` (100 per page) and
///    return the first whose latest recorded cwd matches the current cwd
///    (`--all` disables the cwd filter).
/// 2. A session id that parses as a UUID is returned verbatim as the thread id.
/// 3. Otherwise the session id is treated as a thread *name* and resolved by
///    scanning the listed threads client-side.
///
/// Returns `Ok(None)` when no session id was given or no matching thread is
/// found; request errors are surfaced as `anyhow` errors.
async fn resolve_resume_thread_id(
    client: &InProcessAppServerClient,
    config: &Config,
    args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<String>> {
    // Only consider threads created with the currently configured provider.
    let model_providers = Some(vec![config.model_provider_id.clone()]);
    if args.last {
        let mut cursor = None;
        // Paged scan, newest-first; stops at the first cwd match.
        loop {
            let response: ThreadListResponse = send_request_with_response(
                client,
                ClientRequest::ThreadList {
                    request_id: RequestId::Integer(0),
                    params: ThreadListParams {
                        cursor,
                        limit: Some(100),
                        sort_key: Some(ThreadSortKey::UpdatedAt),
                        model_providers: model_providers.clone(),
                        source_kinds: Some(all_thread_source_kinds()),
                        archived: Some(false),
                        cwd: None,
                        search_term: None,
                    },
                },
                "thread/list",
            )
            .await
            .map_err(anyhow::Error::msg)?;
            for thread in response.data {
                if args.all || cwds_match(config.cwd.as_path(), &latest_thread_cwd(&thread).await) {
                    return Ok(Some(thread.id));
                }
            }
            // An absent cursor means the listing is exhausted.
            let Some(next_cursor) = response.next_cursor else {
                return Ok(None);
            };
            cursor = Some(next_cursor);
        }
    }
    let Some(session_id) = args.session_id.as_deref() else {
        return Ok(None);
    };
    // A well-formed UUID is already a thread id; no lookup needed.
    if Uuid::parse_str(session_id).is_ok() {
        return Ok(Some(session_id.to_string()));
    }
    // Name-based resolution: scan pages until a thread with this name (and a
    // matching cwd, unless --all) is found.
    let mut cursor = None;
    loop {
        let response: ThreadListResponse = send_request_with_response(
            client,
            ClientRequest::ThreadList {
                request_id: RequestId::Integer(0),
                params: ThreadListParams {
                    cursor,
                    limit: Some(100),
                    sort_key: Some(ThreadSortKey::UpdatedAt),
                    model_providers: Some(vec![config.model_provider_id.clone()]),
                    source_kinds: Some(all_thread_source_kinds()),
                    archived: Some(false),
                    cwd: None,
                    // Thread names are attached separately from rollout titles, so name
                    // resolution must scan the filtered list client-side instead of relying
                    // on the backend `search_term` filter.
                    search_term: None,
                },
            },
            "thread/list",
        )
        .await
        .map_err(anyhow::Error::msg)?;
        for thread in response.data {
            if thread.name.as_deref() != Some(session_id) {
                continue;
            }
            if args.all || cwds_match(config.cwd.as_path(), &latest_thread_cwd(&thread).await) {
                return Ok(Some(thread.id));
            }
        }
        let Some(next_cursor) = response.next_cursor else {
            return Ok(None);
        };
        cursor = Some(next_cursor);
    }
}
fn canceled_mcp_server_elicitation_response() -> Result<Value, String> {
@@ -1205,8 +1254,6 @@ fn server_request_method_name(request: &ServerRequest) -> String {
async fn handle_server_request(
client: &InProcessAppServerClient,
request: ServerRequest,
config: &Config,
_thread_id: &str,
error_seen: &mut bool,
) {
let method = server_request_method_name(&request);
@@ -1228,50 +1275,6 @@ async fn handle_server_request(
Err(err) => Err(err),
}
}
ServerRequest::ChatgptAuthTokensRefresh { request_id, params } => {
let refresh_result = tokio::task::spawn_blocking({
let config = config.clone();
move || local_external_chatgpt_tokens(&config)
})
.await;
match refresh_result {
Err(err) => {
reject_server_request(
client,
request_id,
&method,
format!("local chatgpt auth refresh task failed in exec: {err}"),
)
.await
}
Ok(Err(reason)) => reject_server_request(client, request_id, &method, reason).await,
Ok(Ok(response)) => {
if let Some(previous_account_id) = params.previous_account_id.as_deref()
&& previous_account_id != response.chatgpt_account_id
{
warn!(
"local auth refresh account mismatch: expected `{previous_account_id}`, got `{}`",
response.chatgpt_account_id
);
}
match serde_json::to_value(response) {
Ok(value) => {
resolve_server_request(
client,
request_id,
value,
"account/chatgptAuthTokens/refresh",
)
.await
}
Err(err) => Err(format!(
"failed to serialize chatgpt auth refresh response: {err}"
)),
}
}
}
}
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
reject_server_request(
client,
@@ -1320,6 +1323,15 @@ async fn handle_server_request(
)
.await
}
ServerRequest::ChatgptAuthTokensRefresh { request_id, .. } => {
reject_server_request(
client,
request_id,
&method,
"chatgpt auth token refresh is not supported in exec mode".to_string(),
)
.await
}
ServerRequest::ApplyPatchApproval { request_id, params } => {
reject_server_request(
client,
@@ -1364,91 +1376,6 @@ async fn handle_server_request(
}
}
/// Reads external ChatGPT tokens from the locally cached auth state.
///
/// Builds a fresh `AuthManager` for the configured codex home (with the
/// Codex API key env var disabled), applies any forced workspace id, reloads
/// the cache, and then extracts the access token, account id, and plan-type
/// label. Every failure is reported as a human-readable `Err` string.
fn local_external_chatgpt_tokens(
    config: &Config,
) -> Result<ChatgptAuthTokensRefreshResponse, String> {
    let auth_manager = AuthManager::shared(
        config.codex_home.clone(),
        /*enable_codex_api_key_env*/ false,
        config.cli_auth_credentials_store_mode,
    );
    auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
    auth_manager.reload();

    let Some(auth) = auth_manager.auth_cached() else {
        return Err("no cached auth available for local token refresh".to_string());
    };
    // This path is only valid for externally supplied ChatGPT tokens.
    if !auth.is_external_chatgpt_tokens() {
        return Err("external ChatGPT token auth is not active".to_string());
    }

    let access_token = match auth.get_token() {
        Ok(token) => token,
        Err(err) => return Err(format!("failed to read external access token: {err}")),
    };
    let Some(chatgpt_account_id) = auth.get_account_id() else {
        return Err("external token auth is missing chatgpt account id".to_string());
    };
    // Plan type is optional; when present, map it to its lowercase wire label.
    let chatgpt_plan_type = auth.account_plan_type().map(|plan_type| {
        let label = match plan_type {
            AccountPlanType::Free => "free",
            AccountPlanType::Go => "go",
            AccountPlanType::Plus => "plus",
            AccountPlanType::Pro => "pro",
            AccountPlanType::Team => "team",
            AccountPlanType::Business => "business",
            AccountPlanType::Enterprise => "enterprise",
            AccountPlanType::Edu => "edu",
            AccountPlanType::Unknown => "unknown",
        };
        label.to_string()
    });

    Ok(ChatgptAuthTokensRefreshResponse {
        access_token,
        chatgpt_account_id,
        chatgpt_plan_type,
    })
}
/// Resolves the rollout file path for the `resume` subcommand.
///
/// With `--last`, asks the rollout recorder for the most recently updated
/// thread of the configured provider (cwd-filtered unless `--all`); listing
/// errors are logged and mapped to `Ok(None)` rather than propagated.
/// Otherwise, a session id that parses as a UUID is looked up by id, any
/// other value is looked up by thread name, and no session id yields
/// `Ok(None)`.
async fn resolve_resume_path(
    config: &Config,
    args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<PathBuf>> {
    if args.last {
        let provider_filter = vec![config.model_provider_id.clone()];
        // `--all` lifts the restriction to the current working directory.
        let filter_cwd = (!args.all).then(|| config.cwd.as_path());
        return match codex_core::RolloutRecorder::find_latest_thread_path(
            config,
            /*page_size*/ 1,
            /*cursor*/ None,
            codex_core::ThreadSortKey::UpdatedAt,
            &[],
            Some(provider_filter.as_slice()),
            &config.model_provider_id,
            filter_cwd,
        )
        .await
        {
            Ok(path) => Ok(path),
            Err(e) => {
                // Best-effort: a listing failure falls back to "nothing to resume".
                error!("Error listing threads: {e}");
                Ok(None)
            }
        };
    }
    let Some(id_str) = args.session_id.as_deref() else {
        return Ok(None);
    };
    if Uuid::parse_str(id_str).is_ok() {
        Ok(find_thread_path_by_id_str(&config.codex_home, id_str).await?)
    } else {
        Ok(find_thread_path_by_name_str(&config.codex_home, id_str).await?)
    }
}
fn load_output_schema(path: Option<PathBuf>) -> Option<Value> {
let path = path?;
@@ -1806,29 +1733,6 @@ mod tests {
);
}
#[test]
fn decode_legacy_notification_preserves_conversation_id() {
let decoded = decode_legacy_notification(JSONRPCNotification {
method: "codex/event/error".to_string(),
params: Some(serde_json::json!({
"conversationId": "thread-123",
"msg": {
"message": "boom"
}
})),
})
.expect("legacy notification should decode");
assert_eq!(decoded.conversation_id.as_deref(), Some("thread-123"));
assert!(matches!(
decoded.event.msg,
EventMsg::Error(codex_protocol::protocol::ErrorEvent {
message,
codex_error_info: None,
}) if message == "boom"
));
}
#[test]
fn canceled_mcp_server_elicitation_response_uses_cancel_action() {
let value = canceled_mcp_server_elicitation_response()

File diff suppressed because it is too large Load Diff

View File

@@ -122,9 +122,6 @@ impl App {
self.handle_server_notification_event(app_server_client, notification)
.await;
}
AppServerEvent::LegacyNotification(_) => {
tracing::debug!("ignoring legacy app-server notification in tui_app_server");
}
AppServerEvent::ServerRequest(request) => {
self.handle_server_request_event(app_server_client, request)
.await;

View File

@@ -504,7 +504,6 @@ pub(crate) async fn run_onboarding_app(
return Err(color_eyre::eyre::eyre!(message));
}
AppServerEvent::Lagged { .. }
| AppServerEvent::LegacyNotification(_)
| AppServerEvent::ServerRequest(_) => {}
}
}