mirror of
https://github.com/openai/codex.git
synced 2026-04-07 16:11:41 +03:00
Compare commits
34 Commits
dev/window
...
etraut/cod
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
36b50e02b8 | ||
|
|
ca4837cd0d | ||
|
|
bf91bae434 | ||
|
|
d054de45bd | ||
|
|
be5dba6189 | ||
|
|
efa0f04e6f | ||
|
|
36ba70c2bc | ||
|
|
9855a2269f | ||
|
|
851c37f066 | ||
|
|
38027b41ff | ||
|
|
c84b994c3d | ||
|
|
50b9c114be | ||
|
|
d15933b8d9 | ||
|
|
5d438ff4f4 | ||
|
|
e5172ff1c1 | ||
|
|
e4d070ff55 | ||
|
|
2e69132680 | ||
|
|
86a33fbd79 | ||
|
|
d4c72ea829 | ||
|
|
50b61563c5 | ||
|
|
be700f8284 | ||
|
|
6058aba292 | ||
|
|
99248e8300 | ||
|
|
6e32ed312d | ||
|
|
4daf0300e1 | ||
|
|
e616e4b28b | ||
|
|
f83b099c10 | ||
|
|
eeeb01a4bd | ||
|
|
c2851508b4 | ||
|
|
a19077ce26 | ||
|
|
58e2e9a2bb | ||
|
|
17482fc8e6 | ||
|
|
8dc496ec47 | ||
|
|
cf6cc9043d |
5
codex-rs/Cargo.lock
generated
5
codex-rs/Cargo.lock
generated
@@ -1488,7 +1488,6 @@ dependencies = [
|
||||
"codex-app-server-protocol",
|
||||
"codex-arg0",
|
||||
"codex-core",
|
||||
"codex-features",
|
||||
"codex-feedback",
|
||||
"codex-protocol",
|
||||
"futures",
|
||||
@@ -1996,9 +1995,7 @@ dependencies = [
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-cargo-bin",
|
||||
"codex-utils-cli",
|
||||
"codex-utils-elapsed",
|
||||
"codex-utils-oss",
|
||||
"codex-utils-sandbox-summary",
|
||||
"core_test_support",
|
||||
"libc",
|
||||
"opentelemetry",
|
||||
@@ -2006,10 +2003,8 @@ dependencies = [
|
||||
"owo-colors",
|
||||
"predicates",
|
||||
"pretty_assertions",
|
||||
"rmcp",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"shlex",
|
||||
"supports-color 3.0.2",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
|
||||
@@ -16,7 +16,6 @@ codex-app-server = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-arg0 = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-features = { workspace = true }
|
||||
codex-feedback = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
|
||||
@@ -35,19 +35,14 @@ use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result as JsonRpcResult;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use codex_features::Feature;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use serde::de::DeserializeOwned;
|
||||
@@ -73,7 +68,6 @@ pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
|
||||
pub enum AppServerEvent {
|
||||
Lagged { skipped: usize },
|
||||
ServerNotification(ServerNotification),
|
||||
LegacyNotification(JSONRPCNotification),
|
||||
ServerRequest(ServerRequest),
|
||||
Disconnected { message: String },
|
||||
}
|
||||
@@ -85,9 +79,6 @@ impl From<InProcessServerEvent> for AppServerEvent {
|
||||
InProcessServerEvent::ServerNotification(notification) => {
|
||||
Self::ServerNotification(notification)
|
||||
}
|
||||
InProcessServerEvent::LegacyNotification(notification) => {
|
||||
Self::LegacyNotification(notification)
|
||||
}
|
||||
InProcessServerEvent::ServerRequest(request) => Self::ServerRequest(request),
|
||||
}
|
||||
}
|
||||
@@ -97,19 +88,12 @@ fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
|
||||
// These terminal events drive surface shutdown/completion state. Dropping
|
||||
// them under backpressure can leave exec/TUI waiting forever even though
|
||||
// the underlying turn has already ended.
|
||||
match event {
|
||||
matches!(
|
||||
event,
|
||||
InProcessServerEvent::ServerNotification(
|
||||
codex_app_server_protocol::ServerNotification::TurnCompleted(_),
|
||||
) => true,
|
||||
InProcessServerEvent::LegacyNotification(notification) => matches!(
|
||||
notification
|
||||
.method
|
||||
.strip_prefix("codex/event/")
|
||||
.unwrap_or(¬ification.method),
|
||||
"task_complete" | "turn_aborted" | "shutdown_complete"
|
||||
),
|
||||
_ => false,
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
/// Layered error for [`InProcessAppServerClient::request_typed`].
|
||||
@@ -159,16 +143,6 @@ impl Error for TypedRequestError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SharedCoreManagers {
|
||||
// Temporary bootstrap escape hatch for embedders that still need direct
|
||||
// core handles during the in-process app-server migration. Once TUI/exec
|
||||
// stop depending on direct manager access, remove this wrapper and keep
|
||||
// manager ownership entirely inside the app-server runtime.
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InProcessClientStartArgs {
|
||||
/// Resolved argv0 dispatch paths used by command execution internals.
|
||||
@@ -202,30 +176,6 @@ pub struct InProcessClientStartArgs {
|
||||
}
|
||||
|
||||
impl InProcessClientStartArgs {
|
||||
fn shared_core_managers(&self) -> SharedCoreManagers {
|
||||
let auth_manager = AuthManager::shared(
|
||||
self.config.codex_home.clone(),
|
||||
self.enable_codex_api_key_env,
|
||||
self.config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
self.config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
self.session_source.clone(),
|
||||
CollaborationModesConfig {
|
||||
default_mode_request_user_input: self
|
||||
.config
|
||||
.features
|
||||
.enabled(Feature::DefaultModeRequestUserInput),
|
||||
},
|
||||
));
|
||||
|
||||
SharedCoreManagers {
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds initialize params from caller-provided metadata.
|
||||
pub fn initialize_params(&self) -> InitializeParams {
|
||||
let capabilities = InitializeCapabilities {
|
||||
@@ -247,7 +197,7 @@ impl InProcessClientStartArgs {
|
||||
}
|
||||
}
|
||||
|
||||
fn into_runtime_start_args(self, shared_core: &SharedCoreManagers) -> InProcessStartArgs {
|
||||
fn into_runtime_start_args(self) -> InProcessStartArgs {
|
||||
let initialize = self.initialize_params();
|
||||
InProcessStartArgs {
|
||||
arg0_paths: self.arg0_paths,
|
||||
@@ -255,8 +205,6 @@ impl InProcessClientStartArgs {
|
||||
cli_overrides: self.cli_overrides,
|
||||
loader_overrides: self.loader_overrides,
|
||||
cloud_requirements: self.cloud_requirements,
|
||||
auth_manager: Some(shared_core.auth_manager.clone()),
|
||||
thread_manager: Some(shared_core.thread_manager.clone()),
|
||||
feedback: self.feedback,
|
||||
config_warnings: self.config_warnings,
|
||||
session_source: self.session_source,
|
||||
@@ -310,8 +258,6 @@ pub struct InProcessAppServerClient {
|
||||
command_tx: mpsc::Sender<ClientCommand>,
|
||||
event_rx: mpsc::Receiver<InProcessServerEvent>,
|
||||
worker_handle: tokio::task::JoinHandle<()>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -338,9 +284,8 @@ impl InProcessAppServerClient {
|
||||
/// with overload error instead of being silently dropped.
|
||||
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let shared_core = args.shared_core_managers();
|
||||
let mut handle =
|
||||
codex_app_server::in_process::start(args.into_runtime_start_args(&shared_core)).await?;
|
||||
codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
|
||||
let request_sender = handle.sender();
|
||||
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
|
||||
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
||||
@@ -401,6 +346,25 @@ impl InProcessAppServerClient {
|
||||
let Some(event) = event else {
|
||||
break;
|
||||
};
|
||||
if let InProcessServerEvent::ServerRequest(
|
||||
ServerRequest::ChatgptAuthTokensRefresh { request_id, .. }
|
||||
) = &event
|
||||
{
|
||||
let send_result = request_sender.fail_server_request(
|
||||
request_id.clone(),
|
||||
JSONRPCErrorError {
|
||||
code: -32000,
|
||||
message: "chatgpt auth token refresh is not supported for in-process app-server clients".to_string(),
|
||||
data: None,
|
||||
},
|
||||
);
|
||||
if let Err(err) = send_result {
|
||||
warn!(
|
||||
"failed to reject unsupported chatgpt auth token refresh request: {err}"
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if skipped_events > 0 {
|
||||
if event_requires_delivery(&event) {
|
||||
@@ -491,21 +455,9 @@ impl InProcessAppServerClient {
|
||||
command_tx,
|
||||
event_rx,
|
||||
worker_handle,
|
||||
auth_manager: shared_core.auth_manager,
|
||||
thread_manager: shared_core.thread_manager,
|
||||
})
|
||||
}
|
||||
|
||||
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
|
||||
pub fn auth_manager(&self) -> Arc<AuthManager> {
|
||||
self.auth_manager.clone()
|
||||
}
|
||||
|
||||
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
|
||||
pub fn thread_manager(&self) -> Arc<ThreadManager> {
|
||||
self.thread_manager.clone()
|
||||
}
|
||||
|
||||
pub fn request_handle(&self) -> InProcessAppServerRequestHandle {
|
||||
InProcessAppServerRequestHandle {
|
||||
command_tx: self.command_tx.clone(),
|
||||
@@ -664,8 +616,6 @@ impl InProcessAppServerClient {
|
||||
command_tx,
|
||||
event_rx,
|
||||
worker_handle,
|
||||
auth_manager: _,
|
||||
thread_manager: _,
|
||||
} = self;
|
||||
let mut worker_handle = worker_handle;
|
||||
// Drop the caller-facing receiver before asking the worker to shut
|
||||
@@ -857,8 +807,6 @@ mod tests {
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ToolRequestUserInputParams;
|
||||
use codex_app_server_protocol::ToolRequestUserInputQuestion;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
@@ -1052,7 +1000,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shared_thread_manager_tracks_threads_started_via_app_server() {
|
||||
async fn threads_started_via_app_server_are_visible_through_typed_requests() {
|
||||
let client = start_test_client(SessionSource::Cli).await;
|
||||
|
||||
let response: ThreadStartResponse = client
|
||||
@@ -1065,17 +1013,19 @@ mod tests {
|
||||
})
|
||||
.await
|
||||
.expect("thread/start should succeed");
|
||||
let created_thread_id = codex_protocol::ThreadId::from_string(&response.thread.id)
|
||||
.expect("thread id should parse");
|
||||
timeout(
|
||||
Duration::from_secs(2),
|
||||
client.thread_manager().get_thread(created_thread_id),
|
||||
)
|
||||
.await
|
||||
.expect("timed out waiting for retained thread manager to observe started thread")
|
||||
.expect("started thread should be visible through the shared thread manager");
|
||||
let thread_ids = client.thread_manager().list_thread_ids().await;
|
||||
assert!(thread_ids.contains(&created_thread_id));
|
||||
let read = client
|
||||
.request_typed::<codex_app_server_protocol::ThreadReadResponse>(
|
||||
ClientRequest::ThreadRead {
|
||||
request_id: RequestId::Integer(4),
|
||||
params: codex_app_server_protocol::ThreadReadParams {
|
||||
thread_id: response.thread.id.clone(),
|
||||
include_turns: false,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("thread/read should return the newly started thread");
|
||||
assert_eq!(read.thread.id, response.thread.id);
|
||||
|
||||
client.shutdown().await.expect("shutdown should complete");
|
||||
}
|
||||
@@ -1472,22 +1422,6 @@ mod tests {
|
||||
let (command_tx, _command_rx) = mpsc::channel(1);
|
||||
let (event_tx, event_rx) = mpsc::channel(1);
|
||||
let worker_handle = tokio::spawn(async {});
|
||||
let config = build_test_config().await;
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
CollaborationModesConfig {
|
||||
default_mode_request_user_input: config
|
||||
.features
|
||||
.enabled(Feature::DefaultModeRequestUserInput),
|
||||
},
|
||||
));
|
||||
event_tx
|
||||
.send(InProcessServerEvent::Lagged { skipped: 3 })
|
||||
.await
|
||||
@@ -1498,8 +1432,6 @@ mod tests {
|
||||
command_tx,
|
||||
event_rx,
|
||||
worker_handle,
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
};
|
||||
|
||||
let event = timeout(Duration::from_secs(2), client.next_event())
|
||||
@@ -1530,37 +1462,38 @@ mod tests {
|
||||
)
|
||||
)
|
||||
));
|
||||
assert!(event_requires_delivery(
|
||||
&InProcessServerEvent::LegacyNotification(
|
||||
codex_app_server_protocol::JSONRPCNotification {
|
||||
method: "codex/event/turn_aborted".to_string(),
|
||||
params: None,
|
||||
}
|
||||
)
|
||||
));
|
||||
assert!(!event_requires_delivery(&InProcessServerEvent::Lagged {
|
||||
skipped: 1
|
||||
}));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn accessors_expose_retained_shared_managers() {
|
||||
let client = start_test_client(SessionSource::Cli).await;
|
||||
async fn runtime_start_args_leave_manager_bootstrap_to_app_server() {
|
||||
let config = Arc::new(build_test_config().await);
|
||||
|
||||
assert!(
|
||||
Arc::ptr_eq(&client.auth_manager(), &client.auth_manager()),
|
||||
"auth_manager accessor should clone the retained shared manager"
|
||||
);
|
||||
assert!(
|
||||
Arc::ptr_eq(&client.thread_manager(), &client.thread_manager()),
|
||||
"thread_manager accessor should clone the retained shared manager"
|
||||
);
|
||||
let runtime_args = InProcessClientStartArgs {
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config: config.clone(),
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
feedback: CodexFeedback::new(),
|
||||
config_warnings: Vec::new(),
|
||||
session_source: SessionSource::Exec,
|
||||
enable_codex_api_key_env: false,
|
||||
client_name: "codex-app-server-client-test".to_string(),
|
||||
client_version: "0.0.0-test".to_string(),
|
||||
experimental_api: true,
|
||||
opt_out_notification_methods: Vec::new(),
|
||||
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
|
||||
}
|
||||
.into_runtime_start_args();
|
||||
|
||||
client.shutdown().await.expect("shutdown should complete");
|
||||
assert_eq!(runtime_args.config, config);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_completes_promptly_with_retained_shared_managers() {
|
||||
async fn shutdown_completes_promptly_without_retained_managers() {
|
||||
let client = start_test_client(SessionSource::Cli).await;
|
||||
|
||||
timeout(Duration::from_secs(1), client.shutdown())
|
||||
|
||||
@@ -272,18 +272,19 @@ impl RemoteAppServerClient {
|
||||
}
|
||||
}
|
||||
Ok(JSONRPCMessage::Notification(notification)) => {
|
||||
let event = app_server_event_from_notification(notification);
|
||||
if let Err(err) = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
event,
|
||||
&mut stream,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(%err, "failed to deliver remote app-server event");
|
||||
break;
|
||||
}
|
||||
if let Some(event) =
|
||||
app_server_event_from_notification(notification)
|
||||
&& let Err(err) = deliver_event(
|
||||
&event_tx,
|
||||
&mut skipped_events,
|
||||
event,
|
||||
&mut stream,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(%err, "failed to deliver remote app-server event");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(JSONRPCMessage::Request(request)) => {
|
||||
let request_id = request.id.clone();
|
||||
@@ -673,7 +674,9 @@ async fn initialize_remote_connection(
|
||||
)));
|
||||
}
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
pending_events.push(app_server_event_from_notification(notification));
|
||||
if let Some(event) = app_server_event_from_notification(notification) {
|
||||
pending_events.push(event);
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Request(request) => {
|
||||
let request_id = request.id.clone();
|
||||
@@ -756,10 +759,10 @@ async fn initialize_remote_connection(
|
||||
Ok(pending_events)
|
||||
}
|
||||
|
||||
fn app_server_event_from_notification(notification: JSONRPCNotification) -> AppServerEvent {
|
||||
match ServerNotification::try_from(notification.clone()) {
|
||||
Ok(notification) => AppServerEvent::ServerNotification(notification),
|
||||
Err(_) => AppServerEvent::LegacyNotification(notification),
|
||||
fn app_server_event_from_notification(notification: JSONRPCNotification) -> Option<AppServerEvent> {
|
||||
match ServerNotification::try_from(notification) {
|
||||
Ok(notification) => Some(AppServerEvent::ServerNotification(notification)),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -852,13 +855,6 @@ async fn reject_if_server_request_dropped(
|
||||
fn event_requires_delivery(event: &AppServerEvent) -> bool {
|
||||
match event {
|
||||
AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => true,
|
||||
AppServerEvent::LegacyNotification(notification) => matches!(
|
||||
notification
|
||||
.method
|
||||
.strip_prefix("codex/event/")
|
||||
.unwrap_or(¬ification.method),
|
||||
"task_complete" | "turn_aborted" | "shutdown_complete"
|
||||
),
|
||||
AppServerEvent::Disconnected { .. } => true,
|
||||
AppServerEvent::Lagged { .. }
|
||||
| AppServerEvent::ServerNotification(_)
|
||||
|
||||
@@ -68,14 +68,11 @@ use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
@@ -98,16 +95,6 @@ fn server_notification_requires_delivery(notification: &ServerNotification) -> b
|
||||
matches!(notification, ServerNotification::TurnCompleted(_))
|
||||
}
|
||||
|
||||
fn legacy_notification_requires_delivery(notification: &JSONRPCNotification) -> bool {
|
||||
matches!(
|
||||
notification
|
||||
.method
|
||||
.strip_prefix("codex/event/")
|
||||
.unwrap_or(¬ification.method),
|
||||
"task_complete" | "turn_aborted" | "shutdown_complete"
|
||||
)
|
||||
}
|
||||
|
||||
/// Input needed to start an in-process app-server runtime.
|
||||
///
|
||||
/// These fields mirror the pieces of ambient process state that stdio and
|
||||
@@ -124,10 +111,6 @@ pub struct InProcessStartArgs {
|
||||
pub loader_overrides: LoaderOverrides,
|
||||
/// Preloaded cloud requirements provider.
|
||||
pub cloud_requirements: CloudRequirementsLoader,
|
||||
/// Optional prebuilt auth manager reused by an embedding caller.
|
||||
pub auth_manager: Option<Arc<AuthManager>>,
|
||||
/// Optional prebuilt thread manager reused by an embedding caller.
|
||||
pub thread_manager: Option<Arc<ThreadManager>>,
|
||||
/// Feedback sink used by app-server/core telemetry and logs.
|
||||
pub feedback: CodexFeedback,
|
||||
/// Startup warnings emitted after initialize succeeds.
|
||||
@@ -144,11 +127,6 @@ pub struct InProcessStartArgs {
|
||||
|
||||
/// Event emitted from the app-server to the in-process client.
|
||||
///
|
||||
/// The stream carries three event families because CLI surfaces are mid-migration
|
||||
/// from the legacy `codex_protocol::Event` model to the typed app-server
|
||||
/// notification model. Once all surfaces consume only [`ServerNotification`],
|
||||
/// [`LegacyNotification`](Self::LegacyNotification) can be removed.
|
||||
///
|
||||
/// [`Lagged`](Self::Lagged) is a transport health marker, not an application
|
||||
/// event — it signals that the consumer fell behind and some events were dropped.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -157,8 +135,6 @@ pub enum InProcessServerEvent {
|
||||
ServerRequest(ServerRequest),
|
||||
/// App-server notification directed to the embedded client.
|
||||
ServerNotification(ServerNotification),
|
||||
/// Legacy JSON-RPC notification from core event bridge.
|
||||
LegacyNotification(JSONRPCNotification),
|
||||
/// Indicates one or more events were dropped due to backpressure.
|
||||
Lagged { skipped: usize },
|
||||
}
|
||||
@@ -390,7 +366,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
Arc::clone(&outbound_initialized),
|
||||
Arc::clone(&outbound_experimental_api_enabled),
|
||||
Arc::clone(&outbound_opted_out_notification_methods),
|
||||
/*allow_legacy_notifications*/ true,
|
||||
/*disconnect_sender*/ None,
|
||||
),
|
||||
);
|
||||
@@ -410,8 +385,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
cli_overrides: args.cli_overrides,
|
||||
loader_overrides: args.loader_overrides,
|
||||
cloud_requirements: args.cloud_requirements,
|
||||
auth_manager: args.auth_manager,
|
||||
thread_manager: args.thread_manager,
|
||||
feedback: args.feedback,
|
||||
log_db: None,
|
||||
config_warnings: args.config_warnings,
|
||||
@@ -655,32 +628,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
}
|
||||
}
|
||||
}
|
||||
OutgoingMessage::Notification(notification) => {
|
||||
let notification = JSONRPCNotification {
|
||||
method: notification.method,
|
||||
params: notification.params,
|
||||
};
|
||||
if legacy_notification_requires_delivery(¬ification) {
|
||||
if event_tx
|
||||
.send(InProcessServerEvent::LegacyNotification(notification))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
} else if let Err(send_error) =
|
||||
event_tx.try_send(InProcessServerEvent::LegacyNotification(notification))
|
||||
{
|
||||
match send_error {
|
||||
mpsc::error::TrySendError::Full(_) => {
|
||||
warn!("dropping in-process legacy notification (queue full)");
|
||||
}
|
||||
mpsc::error::TrySendError::Closed(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
OutgoingMessage::Notification(_notification) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -759,8 +707,6 @@ mod tests {
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
auth_manager: None,
|
||||
thread_manager: None,
|
||||
feedback: CodexFeedback::new(),
|
||||
config_warnings: Vec::new(),
|
||||
session_source,
|
||||
@@ -858,7 +804,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
|
||||
fn guaranteed_delivery_helpers_cover_terminal_server_notifications() {
|
||||
assert!(server_notification_requires_delivery(
|
||||
&ServerNotification::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
@@ -870,30 +816,5 @@ mod tests {
|
||||
},
|
||||
})
|
||||
));
|
||||
|
||||
assert!(legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/task_complete".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
assert!(legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/turn_aborted".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
assert!(legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/shutdown_complete".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
assert!(!legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/item_started".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,8 +104,6 @@ enum OutboundControlEvent {
|
||||
Opened {
|
||||
connection_id: ConnectionId,
|
||||
writer: mpsc::Sender<crate::outgoing_message::OutgoingMessage>,
|
||||
// Allow codex/event/* notifications to be emitted.
|
||||
allow_legacy_notifications: bool,
|
||||
disconnect_sender: Option<CancellationToken>,
|
||||
initialized: Arc<AtomicBool>,
|
||||
experimental_api_enabled: Arc<AtomicBool>,
|
||||
@@ -562,7 +560,6 @@ pub async fn run_main_with_transport(
|
||||
OutboundControlEvent::Opened {
|
||||
connection_id,
|
||||
writer,
|
||||
allow_legacy_notifications,
|
||||
disconnect_sender,
|
||||
initialized,
|
||||
experimental_api_enabled,
|
||||
@@ -575,7 +572,6 @@ pub async fn run_main_with_transport(
|
||||
initialized,
|
||||
experimental_api_enabled,
|
||||
opted_out_notification_methods,
|
||||
allow_legacy_notifications,
|
||||
disconnect_sender,
|
||||
),
|
||||
);
|
||||
@@ -618,8 +614,6 @@ pub async fn run_main_with_transport(
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
auth_manager: None,
|
||||
thread_manager: None,
|
||||
feedback: feedback.clone(),
|
||||
log_db,
|
||||
config_warnings,
|
||||
@@ -675,7 +669,6 @@ pub async fn run_main_with_transport(
|
||||
TransportEvent::ConnectionOpened {
|
||||
connection_id,
|
||||
writer,
|
||||
allow_legacy_notifications,
|
||||
disconnect_sender,
|
||||
} => {
|
||||
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
||||
@@ -687,7 +680,6 @@ pub async fn run_main_with_transport(
|
||||
.send(OutboundControlEvent::Opened {
|
||||
connection_id,
|
||||
writer,
|
||||
allow_legacy_notifications,
|
||||
disconnect_sender,
|
||||
initialized: Arc::clone(&outbound_initialized),
|
||||
experimental_api_enabled: Arc::clone(
|
||||
|
||||
@@ -172,8 +172,6 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) cli_overrides: Vec<(String, TomlValue)>,
|
||||
pub(crate) loader_overrides: LoaderOverrides,
|
||||
pub(crate) cloud_requirements: CloudRequirementsLoader,
|
||||
pub(crate) auth_manager: Option<Arc<AuthManager>>,
|
||||
pub(crate) thread_manager: Option<Arc<ThreadManager>>,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
|
||||
@@ -192,36 +190,27 @@ impl MessageProcessor {
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
feedback,
|
||||
log_db,
|
||||
config_warnings,
|
||||
session_source,
|
||||
enable_codex_api_key_env,
|
||||
} = args;
|
||||
let (auth_manager, thread_manager) = match (auth_manager, thread_manager) {
|
||||
(Some(auth_manager), Some(thread_manager)) => (auth_manager, thread_manager),
|
||||
(None, None) => {
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
enable_codex_api_key_env,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
CollaborationModesConfig {
|
||||
default_mode_request_user_input: config
|
||||
.features
|
||||
.enabled(Feature::DefaultModeRequestUserInput),
|
||||
},
|
||||
));
|
||||
(auth_manager, thread_manager)
|
||||
}
|
||||
_ => panic!("MessageProcessorArgs must provide both auth_manager and thread_manager"),
|
||||
};
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
enable_codex_api_key_env,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
CollaborationModesConfig {
|
||||
default_mode_request_user_input: config
|
||||
.features
|
||||
.enabled(Feature::DefaultModeRequestUserInput),
|
||||
},
|
||||
));
|
||||
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
|
||||
auth_manager.set_external_auth_refresher(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
|
||||
@@ -239,8 +239,6 @@ fn build_test_processor(
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
auth_manager: None,
|
||||
thread_manager: None,
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
config_warnings: Vec::new(),
|
||||
|
||||
@@ -188,7 +188,6 @@ pub(crate) enum TransportEvent {
|
||||
ConnectionOpened {
|
||||
connection_id: ConnectionId,
|
||||
writer: mpsc::Sender<OutgoingMessage>,
|
||||
allow_legacy_notifications: bool,
|
||||
disconnect_sender: Option<CancellationToken>,
|
||||
},
|
||||
ConnectionClosed {
|
||||
@@ -226,7 +225,6 @@ pub(crate) struct OutboundConnectionState {
|
||||
pub(crate) initialized: Arc<AtomicBool>,
|
||||
pub(crate) experimental_api_enabled: Arc<AtomicBool>,
|
||||
pub(crate) opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
|
||||
pub(crate) allow_legacy_notifications: bool,
|
||||
pub(crate) writer: mpsc::Sender<OutgoingMessage>,
|
||||
disconnect_sender: Option<CancellationToken>,
|
||||
}
|
||||
@@ -237,14 +235,12 @@ impl OutboundConnectionState {
|
||||
initialized: Arc<AtomicBool>,
|
||||
experimental_api_enabled: Arc<AtomicBool>,
|
||||
opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
|
||||
allow_legacy_notifications: bool,
|
||||
disconnect_sender: Option<CancellationToken>,
|
||||
) -> Self {
|
||||
Self {
|
||||
initialized,
|
||||
experimental_api_enabled,
|
||||
opted_out_notification_methods,
|
||||
allow_legacy_notifications,
|
||||
writer,
|
||||
disconnect_sender,
|
||||
}
|
||||
@@ -272,7 +268,6 @@ pub(crate) async fn start_stdio_connection(
|
||||
.send(TransportEvent::ConnectionOpened {
|
||||
connection_id,
|
||||
writer: writer_tx,
|
||||
allow_legacy_notifications: false,
|
||||
disconnect_sender: None,
|
||||
})
|
||||
.await
|
||||
@@ -376,7 +371,6 @@ async fn run_websocket_connection(
|
||||
.send(TransportEvent::ConnectionOpened {
|
||||
connection_id,
|
||||
writer: writer_tx,
|
||||
allow_legacy_notifications: false,
|
||||
disconnect_sender: Some(disconnect_token.clone()),
|
||||
})
|
||||
.await
|
||||
@@ -584,13 +578,10 @@ fn should_skip_notification_for_connection(
|
||||
connection_state: &OutboundConnectionState,
|
||||
message: &OutgoingMessage,
|
||||
) -> bool {
|
||||
if !connection_state.allow_legacy_notifications
|
||||
&& matches!(message, OutgoingMessage::Notification(_))
|
||||
{
|
||||
if matches!(message, OutgoingMessage::Notification(_)) {
|
||||
// Raw legacy `codex/event/*` notifications are still emitted upstream
|
||||
// for in-process compatibility, but they are no longer part of the
|
||||
// external app-server contract. Keep dropping them here until the
|
||||
// producer path can be deleted entirely.
|
||||
// for compatibility, but they are no longer part of the app-server
|
||||
// client contract. Drop them until the producer path can be deleted.
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -970,7 +961,6 @@ mod tests {
|
||||
initialized,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
opted_out_notification_methods,
|
||||
false,
|
||||
None,
|
||||
),
|
||||
);
|
||||
@@ -1008,7 +998,6 @@ mod tests {
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(RwLock::new(HashSet::new())),
|
||||
false,
|
||||
None,
|
||||
),
|
||||
);
|
||||
@@ -1034,7 +1023,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn to_connection_legacy_notifications_are_preserved_for_in_process_clients() {
|
||||
async fn to_connection_legacy_notifications_are_dropped_for_in_process_clients() {
|
||||
let connection_id = ConnectionId(11);
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel(1);
|
||||
|
||||
@@ -1046,7 +1035,6 @@ mod tests {
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(RwLock::new(HashSet::new())),
|
||||
true,
|
||||
None,
|
||||
),
|
||||
);
|
||||
@@ -1065,17 +1053,10 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
|
||||
let message = writer_rx
|
||||
.recv()
|
||||
.await
|
||||
.expect("legacy notification should reach in-process clients");
|
||||
assert!(matches!(
|
||||
message,
|
||||
OutgoingMessage::Notification(crate::outgoing_message::OutgoingNotification {
|
||||
method,
|
||||
params: None,
|
||||
}) if method == "codex/event/task_started"
|
||||
));
|
||||
assert!(
|
||||
writer_rx.try_recv().is_err(),
|
||||
"legacy notifications should not reach in-process clients"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1091,7 +1072,6 @@ mod tests {
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
Arc::new(RwLock::new(HashSet::new())),
|
||||
false,
|
||||
None,
|
||||
),
|
||||
);
|
||||
@@ -1158,7 +1138,6 @@ mod tests {
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(RwLock::new(HashSet::new())),
|
||||
false,
|
||||
None,
|
||||
),
|
||||
);
|
||||
@@ -1246,7 +1225,6 @@ mod tests {
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(RwLock::new(HashSet::new())),
|
||||
false,
|
||||
Some(fast_disconnect_token.clone()),
|
||||
),
|
||||
);
|
||||
@@ -1257,7 +1235,6 @@ mod tests {
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(RwLock::new(HashSet::new())),
|
||||
false,
|
||||
Some(slow_disconnect_token.clone()),
|
||||
),
|
||||
);
|
||||
@@ -1329,7 +1306,6 @@ mod tests {
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(RwLock::new(HashSet::new())),
|
||||
false,
|
||||
None,
|
||||
),
|
||||
);
|
||||
|
||||
@@ -3,6 +3,7 @@ name = "codex-exec"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
autotests = false
|
||||
|
||||
[[bin]]
|
||||
name = "codex-exec"
|
||||
@@ -12,6 +13,10 @@ path = "src/main.rs"
|
||||
name = "codex_exec"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[[test]]
|
||||
name = "all"
|
||||
path = "tests/all.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -28,13 +33,10 @@ codex-otel = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-cli = { workspace = true }
|
||||
codex-utils-elapsed = { workspace = true }
|
||||
codex-utils-oss = { workspace = true }
|
||||
codex-utils-sandbox-summary = { workspace = true }
|
||||
owo-colors = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
shlex = { workspace = true }
|
||||
supports-color = { workspace = true }
|
||||
tokio = { workspace = true, features = [
|
||||
"io-std",
|
||||
@@ -63,7 +65,6 @@ opentelemetry = { workspace = true }
|
||||
opentelemetry_sdk = { workspace = true }
|
||||
predicates = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
rmcp = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tracing-opentelemetry = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use std::path::Path;
|
||||
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_core::config::Config;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
|
||||
pub(crate) enum CodexStatus {
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum CodexStatus {
|
||||
Running,
|
||||
InitiateShutdown,
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
pub(crate) trait EventProcessor {
|
||||
@@ -19,8 +19,11 @@ pub(crate) trait EventProcessor {
|
||||
session_configured: &SessionConfiguredEvent,
|
||||
);
|
||||
|
||||
/// Handle a single event emitted by the agent.
|
||||
fn process_event(&mut self, event: Event) -> CodexStatus;
|
||||
/// Handle a single typed app-server notification emitted by the agent.
|
||||
fn process_server_notification(&mut self, notification: ServerNotification) -> CodexStatus;
|
||||
|
||||
/// Handle a local exec warning that is not represented as an app-server notification.
|
||||
fn process_warning(&mut self, message: String) -> CodexStatus;
|
||||
|
||||
fn print_final_output(&mut self) {}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -17,11 +17,9 @@ use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
||||
use codex_app_server_client::InProcessAppServerClient;
|
||||
use codex_app_server_client::InProcessClientStartArgs;
|
||||
use codex_app_server_client::InProcessServerEvent;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::McpServerElicitationAction;
|
||||
use codex_app_server_protocol::McpServerElicitationRequestResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -30,8 +28,13 @@ use codex_app_server_protocol::ReviewStartResponse;
|
||||
use codex_app_server_protocol::ReviewTarget as ApiReviewTarget;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::Thread as AppServerThread;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadSortKey;
|
||||
use codex_app_server_protocol::ThreadSourceKind;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadUnsubscribeParams;
|
||||
@@ -41,8 +44,7 @@ use codex_app_server_protocol::TurnInterruptResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_cloud_requirements::cloud_requirements_loader;
|
||||
use codex_core::AuthManager;
|
||||
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
|
||||
use codex_core::LMSTUDIO_OSS_PROVIDER_ID;
|
||||
use codex_core::OLLAMA_OSS_PROVIDER_ID;
|
||||
use codex_core::auth::AuthConfig;
|
||||
@@ -59,16 +61,17 @@ use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::config_loader::format_config_error_with_source;
|
||||
use codex_core::format_exec_policy_error_with_source;
|
||||
use codex_core::git_info::get_git_repo_root;
|
||||
use codex_core::path_utils;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_otel::set_parent_from_context;
|
||||
use codex_otel::traceparent_context_from_env;
|
||||
use codex_protocol::account::PlanType as AccountPlanType;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::ReviewTarget;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
@@ -79,10 +82,9 @@ use event_processor_with_human_output::EventProcessorWithHumanOutput;
|
||||
use event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::IsTerminal;
|
||||
use std::io::Read;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use supports_color::Stream;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -101,8 +103,6 @@ use crate::event_processor::CodexStatus;
|
||||
use crate::event_processor::EventProcessor;
|
||||
use codex_core::default_client::set_default_client_residency_requirement;
|
||||
use codex_core::default_client::set_default_originator;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_core::find_thread_path_by_name_str;
|
||||
|
||||
const DEFAULT_ANALYTICS_ENABLED: bool = true;
|
||||
|
||||
@@ -287,18 +287,17 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
}
|
||||
};
|
||||
|
||||
let cloud_auth_manager = AuthManager::shared(
|
||||
codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config_toml.cli_auth_credentials_store.unwrap_or_default(),
|
||||
);
|
||||
let chatgpt_base_url = config_toml
|
||||
.chatgpt_base_url
|
||||
.clone()
|
||||
.unwrap_or_else(|| "https://chatgpt.com/backend-api/".to_string());
|
||||
// TODO(gt): Make cloud requirements failures blocking once we can fail-closed.
|
||||
let cloud_requirements =
|
||||
cloud_requirements_loader(cloud_auth_manager, chatgpt_base_url, codex_home.clone());
|
||||
let cloud_requirements = cloud_requirements_loader_for_storage(
|
||||
codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config_toml.cli_auth_credentials_store.unwrap_or_default(),
|
||||
chatgpt_base_url,
|
||||
);
|
||||
let run_cli_overrides = cli_kv_overrides.clone();
|
||||
let run_loader_overrides = LoaderOverrides::default();
|
||||
let run_cloud_requirements = cloud_requirements.clone();
|
||||
@@ -500,14 +499,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
last_message_file.clone(),
|
||||
)),
|
||||
};
|
||||
let required_mcp_servers: HashSet<String> = config
|
||||
.mcp_servers
|
||||
.get()
|
||||
.iter()
|
||||
.filter(|(_, server)| server.enabled && server.required)
|
||||
.map(|(name, _)| name.clone())
|
||||
.collect();
|
||||
|
||||
if oss {
|
||||
// We're in the oss section, so provider_id should be Some
|
||||
// Let's handle None case gracefully though just in case
|
||||
@@ -547,17 +538,16 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
anyhow::anyhow!("failed to initialize in-process app-server client: {err}")
|
||||
})?;
|
||||
|
||||
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
|
||||
// Handle resume subcommand through existing `thread/list` + `thread/resume`
|
||||
// APIs so exec no longer reaches into rollout storage directly.
|
||||
let (primary_thread_id, fallback_session_configured) =
|
||||
if let Some(ExecCommand::Resume(args)) = command.as_ref() {
|
||||
let resume_path = resolve_resume_path(&config, args).await?;
|
||||
|
||||
if let Some(path) = resume_path {
|
||||
if let Some(thread_id) = resolve_resume_thread_id(&client, &config, args).await? {
|
||||
let response: ThreadResumeResponse = send_request_with_response(
|
||||
&client,
|
||||
ClientRequest::ThreadResume {
|
||||
request_id: request_ids.next(),
|
||||
params: thread_resume_params_from_config(&config, Some(path)),
|
||||
params: thread_resume_params_from_config(&config, thread_id),
|
||||
},
|
||||
"thread/resume",
|
||||
)
|
||||
@@ -598,7 +588,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let primary_thread_id_for_span = primary_thread_id.to_string();
|
||||
let mut buffered_events = VecDeque::new();
|
||||
// Use the start/resume response as the authoritative bootstrap payload.
|
||||
// Waiting for a later streamed `SessionConfigured` event adds up to 10s of
|
||||
// avoidable startup latency on the in-process path.
|
||||
@@ -670,10 +659,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
// is using.
|
||||
event_processor.print_config_summary(&config, &prompt_summary, &session_configured);
|
||||
if !json_mode && let Some(message) = codex_core::config::missing_system_bwrap_warning() {
|
||||
let _ = event_processor.process_event(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }),
|
||||
});
|
||||
event_processor.process_warning(message);
|
||||
}
|
||||
|
||||
info!("Codex initialized with event: {session_configured:?}");
|
||||
@@ -748,34 +734,30 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
let mut interrupt_channel_open = true;
|
||||
let primary_thread_id_for_requests = primary_thread_id.to_string();
|
||||
loop {
|
||||
let server_event = if let Some(event) = buffered_events.pop_front() {
|
||||
Some(event)
|
||||
} else {
|
||||
tokio::select! {
|
||||
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
|
||||
if maybe_interrupt.is_none() {
|
||||
interrupt_channel_open = false;
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = send_request_with_response::<TurnInterruptResponse>(
|
||||
&client,
|
||||
ClientRequest::TurnInterrupt {
|
||||
request_id: request_ids.next(),
|
||||
params: TurnInterruptParams {
|
||||
thread_id: primary_thread_id_for_requests.clone(),
|
||||
turn_id: task_id.clone(),
|
||||
},
|
||||
},
|
||||
"turn/interrupt",
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("turn/interrupt failed: {err}");
|
||||
}
|
||||
let server_event = tokio::select! {
|
||||
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
|
||||
if maybe_interrupt.is_none() {
|
||||
interrupt_channel_open = false;
|
||||
continue;
|
||||
}
|
||||
maybe_event = client.next_event() => maybe_event,
|
||||
if let Err(err) = send_request_with_response::<TurnInterruptResponse>(
|
||||
&client,
|
||||
ClientRequest::TurnInterrupt {
|
||||
request_id: request_ids.next(),
|
||||
params: TurnInterruptParams {
|
||||
thread_id: primary_thread_id_for_requests.clone(),
|
||||
turn_id: task_id.clone(),
|
||||
},
|
||||
},
|
||||
"turn/interrupt",
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("turn/interrupt failed: {err}");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
maybe_event = client.next_event() => maybe_event,
|
||||
};
|
||||
|
||||
let Some(server_event) = server_event else {
|
||||
@@ -784,69 +766,36 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
|
||||
match server_event {
|
||||
InProcessServerEvent::ServerRequest(request) => {
|
||||
handle_server_request(
|
||||
&client,
|
||||
request,
|
||||
&config,
|
||||
&primary_thread_id_for_requests,
|
||||
&mut error_seen,
|
||||
)
|
||||
.await;
|
||||
handle_server_request(&client, request, &mut error_seen).await;
|
||||
}
|
||||
InProcessServerEvent::ServerNotification(notification) => {
|
||||
if let ServerNotification::Error(payload) = ¬ification
|
||||
if let ServerNotification::Error(payload) = ¬ification {
|
||||
if payload.thread_id == primary_thread_id_for_requests
|
||||
&& payload.turn_id == task_id
|
||||
&& !payload.will_retry
|
||||
{
|
||||
error_seen = true;
|
||||
}
|
||||
} else if let ServerNotification::TurnCompleted(payload) = ¬ification
|
||||
&& payload.thread_id == primary_thread_id_for_requests
|
||||
&& payload.turn_id == task_id
|
||||
&& !payload.will_retry
|
||||
&& payload.turn.id == task_id
|
||||
&& matches!(
|
||||
payload.turn.status,
|
||||
codex_app_server_protocol::TurnStatus::Failed
|
||||
| codex_app_server_protocol::TurnStatus::Interrupted
|
||||
)
|
||||
{
|
||||
error_seen = true;
|
||||
}
|
||||
}
|
||||
InProcessServerEvent::LegacyNotification(notification) => {
|
||||
let decoded = match decode_legacy_notification(notification) {
|
||||
Ok(event) => event,
|
||||
Err(err) => {
|
||||
warn!("{err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if decoded.conversation_id.as_deref()
|
||||
!= Some(primary_thread_id_for_requests.as_str())
|
||||
&& decoded.conversation_id.is_some()
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let event = decoded.event;
|
||||
if matches!(event.msg, EventMsg::SessionConfigured(_)) {
|
||||
continue;
|
||||
}
|
||||
if matches!(event.msg, EventMsg::Error(_)) {
|
||||
// The legacy bridge still carries fatal turn failures for
|
||||
// exec. Preserve the non-zero exit behavior until this
|
||||
// path is fully replaced by typed server notifications.
|
||||
error_seen = true;
|
||||
}
|
||||
match &event.msg {
|
||||
EventMsg::TurnComplete(payload) => {
|
||||
if payload.turn_id != task_id {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
EventMsg::TurnAborted(payload) => {
|
||||
if payload.turn_id.as_deref() != Some(task_id.as_str()) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
EventMsg::McpStartupUpdate(update) => {
|
||||
if required_mcp_servers.contains(&update.server)
|
||||
&& let codex_protocol::protocol::McpStartupStatus::Failed { error } =
|
||||
&update.status
|
||||
{
|
||||
error_seen = true;
|
||||
eprintln!(
|
||||
"Required MCP server '{}' failed to initialize: {error}",
|
||||
update.server
|
||||
);
|
||||
|
||||
if should_process_notification(
|
||||
¬ification,
|
||||
&primary_thread_id_for_requests,
|
||||
&task_id,
|
||||
) {
|
||||
match event_processor.process_server_notification(notification) {
|
||||
CodexStatus::Running => {}
|
||||
CodexStatus::InitiateShutdown => {
|
||||
if let Err(err) = request_shutdown(
|
||||
&client,
|
||||
&mut request_ids,
|
||||
@@ -859,37 +808,12 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
match event_processor.process_event(event) {
|
||||
CodexStatus::Running => {}
|
||||
CodexStatus::InitiateShutdown => {
|
||||
if let Err(err) = request_shutdown(
|
||||
&client,
|
||||
&mut request_ids,
|
||||
&primary_thread_id_for_requests,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("thread/unsubscribe failed during shutdown: {err}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
CodexStatus::Shutdown => {
|
||||
// `ShutdownComplete` does not identify which attached
|
||||
// thread emitted it, so subagent shutdowns must not end
|
||||
// the primary exec loop early.
|
||||
}
|
||||
}
|
||||
}
|
||||
InProcessServerEvent::Lagged { skipped } => {
|
||||
let message = lagged_event_warning_message(skipped);
|
||||
warn!("{message}");
|
||||
let _ = event_processor.process_event(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }),
|
||||
});
|
||||
event_processor.process_warning(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -936,10 +860,9 @@ fn thread_start_params_from_config(config: &Config) -> ThreadStartParams {
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_resume_params_from_config(config: &Config, path: Option<PathBuf>) -> ThreadResumeParams {
|
||||
fn thread_resume_params_from_config(config: &Config, thread_id: String) -> ThreadResumeParams {
|
||||
ThreadResumeParams {
|
||||
thread_id: "resume".to_string(),
|
||||
path,
|
||||
thread_id,
|
||||
model: config.model.clone(),
|
||||
model_provider: Some(config.model_provider_id.clone()),
|
||||
cwd: Some(config.cwd.to_string_lossy().to_string()),
|
||||
@@ -1017,20 +940,19 @@ fn session_configured_from_thread_resume_response(
|
||||
)
|
||||
}
|
||||
|
||||
fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget {
|
||||
match target {
|
||||
ReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges,
|
||||
ReviewTarget::BaseBranch { branch } => ApiReviewTarget::BaseBranch { branch },
|
||||
ReviewTarget::Commit { sha, title } => ApiReviewTarget::Commit { sha, title },
|
||||
ReviewTarget::Custom { instructions } => ApiReviewTarget::Custom { instructions },
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::too_many_arguments,
|
||||
reason = "session mapping keeps explicit fields"
|
||||
)]
|
||||
/// Synthesizes startup session metadata from `thread/start` or `thread/resume`.
|
||||
///
|
||||
/// This is a compatibility bridge for the current in-process architecture.
|
||||
/// Some session fields are not available synchronously from the start/resume
|
||||
/// response, so callers must treat the result as a best-effort fallback until
|
||||
/// a later `SessionConfigured` event proves otherwise.
|
||||
/// TODO(architecture): stop synthesizing a partial `SessionConfiguredEvent`
|
||||
/// here. Either return the authoritative session-configured payload from
|
||||
/// `thread/start`/`thread/resume`, or introduce a smaller bootstrap type for
|
||||
/// exec so this path cannot accidentally depend on placeholder fields.
|
||||
fn session_configured_from_thread_response(
|
||||
thread_id: &str,
|
||||
thread_name: Option<String>,
|
||||
@@ -1040,7 +962,7 @@ fn session_configured_from_thread_response(
|
||||
service_tier: Option<codex_protocol::config_types::ServiceTier>,
|
||||
approval_policy: AskForApproval,
|
||||
approvals_reviewer: codex_protocol::config_types::ApprovalsReviewer,
|
||||
sandbox_policy: codex_protocol::protocol::SandboxPolicy,
|
||||
sandbox_policy: SandboxPolicy,
|
||||
cwd: PathBuf,
|
||||
reasoning_effort: Option<codex_protocol::openai_models::ReasoningEffort>,
|
||||
) -> Result<SessionConfiguredEvent, String> {
|
||||
@@ -1067,71 +989,198 @@ fn session_configured_from_thread_response(
|
||||
})
|
||||
}
|
||||
|
||||
fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget {
|
||||
match target {
|
||||
ReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges,
|
||||
ReviewTarget::BaseBranch { branch } => ApiReviewTarget::BaseBranch { branch },
|
||||
ReviewTarget::Commit { sha, title } => ApiReviewTarget::Commit { sha, title },
|
||||
ReviewTarget::Custom { instructions } => ApiReviewTarget::Custom { instructions },
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_legacy_notification_method(method: &str) -> &str {
|
||||
method.strip_prefix("codex/event/").unwrap_or(method)
|
||||
}
|
||||
|
||||
fn lagged_event_warning_message(skipped: usize) -> String {
|
||||
format!("in-process app-server event stream lagged; dropped {skipped} events")
|
||||
}
|
||||
|
||||
struct DecodedLegacyNotification {
|
||||
conversation_id: Option<String>,
|
||||
event: Event,
|
||||
fn should_process_notification(
|
||||
notification: &ServerNotification,
|
||||
thread_id: &str,
|
||||
turn_id: &str,
|
||||
) -> bool {
|
||||
match notification {
|
||||
ServerNotification::ConfigWarning(_) | ServerNotification::DeprecationNotice(_) => true,
|
||||
ServerNotification::Error(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::HookCompleted(notification) => {
|
||||
notification.thread_id == thread_id
|
||||
&& notification
|
||||
.turn_id
|
||||
.as_deref()
|
||||
.is_none_or(|candidate| candidate == turn_id)
|
||||
}
|
||||
ServerNotification::HookStarted(notification) => {
|
||||
notification.thread_id == thread_id
|
||||
&& notification
|
||||
.turn_id
|
||||
.as_deref()
|
||||
.is_none_or(|candidate| candidate == turn_id)
|
||||
}
|
||||
ServerNotification::ItemCompleted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::ItemStarted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::ModelRerouted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::ThreadTokenUsageUpdated(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::TurnCompleted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn.id == turn_id
|
||||
}
|
||||
ServerNotification::TurnDiffUpdated(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::TurnPlanUpdated(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::TurnStarted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn.id == turn_id
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_legacy_notification(
|
||||
notification: JSONRPCNotification,
|
||||
) -> Result<DecodedLegacyNotification, String> {
|
||||
let value = notification
|
||||
.params
|
||||
.unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
|
||||
let method = notification.method;
|
||||
let normalized_method = normalize_legacy_notification_method(&method).to_string();
|
||||
let serde_json::Value::Object(mut object) = value else {
|
||||
return Err(format!(
|
||||
"legacy notification `{method}` params were not an object"
|
||||
));
|
||||
};
|
||||
let conversation_id = object
|
||||
.get("conversationId")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::to_owned);
|
||||
let mut event_payload = if let Some(serde_json::Value::Object(msg_payload)) = object.get("msg")
|
||||
{
|
||||
serde_json::Value::Object(msg_payload.clone())
|
||||
} else {
|
||||
object.remove("conversationId");
|
||||
serde_json::Value::Object(object)
|
||||
};
|
||||
let serde_json::Value::Object(ref mut object) = event_payload else {
|
||||
return Err(format!(
|
||||
"legacy notification `{method}` event payload was not an object"
|
||||
));
|
||||
};
|
||||
object.insert(
|
||||
"type".to_string(),
|
||||
serde_json::Value::String(normalized_method),
|
||||
);
|
||||
fn all_thread_source_kinds() -> Vec<ThreadSourceKind> {
|
||||
vec![
|
||||
ThreadSourceKind::Cli,
|
||||
ThreadSourceKind::VsCode,
|
||||
ThreadSourceKind::Exec,
|
||||
ThreadSourceKind::AppServer,
|
||||
ThreadSourceKind::SubAgent,
|
||||
ThreadSourceKind::SubAgentReview,
|
||||
ThreadSourceKind::SubAgentCompact,
|
||||
ThreadSourceKind::SubAgentThreadSpawn,
|
||||
ThreadSourceKind::SubAgentOther,
|
||||
ThreadSourceKind::Unknown,
|
||||
]
|
||||
}
|
||||
|
||||
let msg: EventMsg = serde_json::from_value(event_payload)
|
||||
.map_err(|err| format!("failed to decode event: {err}"))?;
|
||||
Ok(DecodedLegacyNotification {
|
||||
conversation_id,
|
||||
event: Event {
|
||||
id: String::new(),
|
||||
msg,
|
||||
},
|
||||
})
|
||||
async fn latest_thread_cwd(thread: &AppServerThread) -> PathBuf {
|
||||
if let Some(path) = thread.path.as_deref()
|
||||
&& let Some(cwd) = parse_latest_turn_context_cwd(path).await
|
||||
{
|
||||
return cwd;
|
||||
}
|
||||
thread.cwd.clone()
|
||||
}
|
||||
|
||||
async fn parse_latest_turn_context_cwd(path: &Path) -> Option<PathBuf> {
|
||||
let text = tokio::fs::read_to_string(path).await.ok()?;
|
||||
for line in text.lines().rev() {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
|
||||
continue;
|
||||
};
|
||||
if let RolloutItem::TurnContext(item) = rollout_line.item {
|
||||
return Some(item.cwd);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn cwds_match(current_cwd: &Path, session_cwd: &Path) -> bool {
|
||||
match (
|
||||
path_utils::normalize_for_path_comparison(current_cwd),
|
||||
path_utils::normalize_for_path_comparison(session_cwd),
|
||||
) {
|
||||
(Ok(current), Ok(session)) => current == session,
|
||||
_ => current_cwd == session_cwd,
|
||||
}
|
||||
}
|
||||
|
||||
async fn resolve_resume_thread_id(
|
||||
client: &InProcessAppServerClient,
|
||||
config: &Config,
|
||||
args: &crate::cli::ResumeArgs,
|
||||
) -> anyhow::Result<Option<String>> {
|
||||
let model_providers = Some(vec![config.model_provider_id.clone()]);
|
||||
|
||||
if args.last {
|
||||
let mut cursor = None;
|
||||
loop {
|
||||
let response: ThreadListResponse = send_request_with_response(
|
||||
client,
|
||||
ClientRequest::ThreadList {
|
||||
request_id: RequestId::Integer(0),
|
||||
params: ThreadListParams {
|
||||
cursor,
|
||||
limit: Some(100),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
model_providers: model_providers.clone(),
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
cwd: None,
|
||||
search_term: None,
|
||||
},
|
||||
},
|
||||
"thread/list",
|
||||
)
|
||||
.await
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
for thread in response.data {
|
||||
if args.all || cwds_match(config.cwd.as_path(), &latest_thread_cwd(&thread).await) {
|
||||
return Ok(Some(thread.id));
|
||||
}
|
||||
}
|
||||
let Some(next_cursor) = response.next_cursor else {
|
||||
return Ok(None);
|
||||
};
|
||||
cursor = Some(next_cursor);
|
||||
}
|
||||
}
|
||||
|
||||
let Some(session_id) = args.session_id.as_deref() else {
|
||||
return Ok(None);
|
||||
};
|
||||
if Uuid::parse_str(session_id).is_ok() {
|
||||
return Ok(Some(session_id.to_string()));
|
||||
}
|
||||
|
||||
let mut cursor = None;
|
||||
loop {
|
||||
let response: ThreadListResponse = send_request_with_response(
|
||||
client,
|
||||
ClientRequest::ThreadList {
|
||||
request_id: RequestId::Integer(0),
|
||||
params: ThreadListParams {
|
||||
cursor,
|
||||
limit: Some(100),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
model_providers: Some(vec![config.model_provider_id.clone()]),
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
cwd: None,
|
||||
// Thread names are attached separately from rollout titles, so name
|
||||
// resolution must scan the filtered list client-side instead of relying
|
||||
// on the backend `search_term` filter.
|
||||
search_term: None,
|
||||
},
|
||||
},
|
||||
"thread/list",
|
||||
)
|
||||
.await
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
for thread in response.data {
|
||||
if thread.name.as_deref() != Some(session_id) {
|
||||
continue;
|
||||
}
|
||||
if args.all || cwds_match(config.cwd.as_path(), &latest_thread_cwd(&thread).await) {
|
||||
return Ok(Some(thread.id));
|
||||
}
|
||||
}
|
||||
let Some(next_cursor) = response.next_cursor else {
|
||||
return Ok(None);
|
||||
};
|
||||
cursor = Some(next_cursor);
|
||||
}
|
||||
}
|
||||
|
||||
fn canceled_mcp_server_elicitation_response() -> Result<Value, String> {
|
||||
@@ -1205,8 +1254,6 @@ fn server_request_method_name(request: &ServerRequest) -> String {
|
||||
async fn handle_server_request(
|
||||
client: &InProcessAppServerClient,
|
||||
request: ServerRequest,
|
||||
config: &Config,
|
||||
_thread_id: &str,
|
||||
error_seen: &mut bool,
|
||||
) {
|
||||
let method = server_request_method_name(&request);
|
||||
@@ -1228,50 +1275,6 @@ async fn handle_server_request(
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
ServerRequest::ChatgptAuthTokensRefresh { request_id, params } => {
|
||||
let refresh_result = tokio::task::spawn_blocking({
|
||||
let config = config.clone();
|
||||
move || local_external_chatgpt_tokens(&config)
|
||||
})
|
||||
.await;
|
||||
|
||||
match refresh_result {
|
||||
Err(err) => {
|
||||
reject_server_request(
|
||||
client,
|
||||
request_id,
|
||||
&method,
|
||||
format!("local chatgpt auth refresh task failed in exec: {err}"),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Ok(Err(reason)) => reject_server_request(client, request_id, &method, reason).await,
|
||||
Ok(Ok(response)) => {
|
||||
if let Some(previous_account_id) = params.previous_account_id.as_deref()
|
||||
&& previous_account_id != response.chatgpt_account_id
|
||||
{
|
||||
warn!(
|
||||
"local auth refresh account mismatch: expected `{previous_account_id}`, got `{}`",
|
||||
response.chatgpt_account_id
|
||||
);
|
||||
}
|
||||
match serde_json::to_value(response) {
|
||||
Ok(value) => {
|
||||
resolve_server_request(
|
||||
client,
|
||||
request_id,
|
||||
value,
|
||||
"account/chatgptAuthTokens/refresh",
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(err) => Err(format!(
|
||||
"failed to serialize chatgpt auth refresh response: {err}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
|
||||
reject_server_request(
|
||||
client,
|
||||
@@ -1320,6 +1323,15 @@ async fn handle_server_request(
|
||||
)
|
||||
.await
|
||||
}
|
||||
ServerRequest::ChatgptAuthTokensRefresh { request_id, .. } => {
|
||||
reject_server_request(
|
||||
client,
|
||||
request_id,
|
||||
&method,
|
||||
"chatgpt auth token refresh is not supported in exec mode".to_string(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
ServerRequest::ApplyPatchApproval { request_id, params } => {
|
||||
reject_server_request(
|
||||
client,
|
||||
@@ -1364,91 +1376,6 @@ async fn handle_server_request(
|
||||
}
|
||||
}
|
||||
|
||||
fn local_external_chatgpt_tokens(
|
||||
config: &Config,
|
||||
) -> Result<ChatgptAuthTokensRefreshResponse, String> {
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
|
||||
auth_manager.reload();
|
||||
|
||||
let auth = auth_manager
|
||||
.auth_cached()
|
||||
.ok_or_else(|| "no cached auth available for local token refresh".to_string())?;
|
||||
if !auth.is_external_chatgpt_tokens() {
|
||||
return Err("external ChatGPT token auth is not active".to_string());
|
||||
}
|
||||
|
||||
let access_token = auth
|
||||
.get_token()
|
||||
.map_err(|err| format!("failed to read external access token: {err}"))?;
|
||||
let chatgpt_account_id = auth
|
||||
.get_account_id()
|
||||
.ok_or_else(|| "external token auth is missing chatgpt account id".to_string())?;
|
||||
let chatgpt_plan_type = auth.account_plan_type().map(|plan_type| match plan_type {
|
||||
AccountPlanType::Free => "free".to_string(),
|
||||
AccountPlanType::Go => "go".to_string(),
|
||||
AccountPlanType::Plus => "plus".to_string(),
|
||||
AccountPlanType::Pro => "pro".to_string(),
|
||||
AccountPlanType::Team => "team".to_string(),
|
||||
AccountPlanType::Business => "business".to_string(),
|
||||
AccountPlanType::Enterprise => "enterprise".to_string(),
|
||||
AccountPlanType::Edu => "edu".to_string(),
|
||||
AccountPlanType::Unknown => "unknown".to_string(),
|
||||
});
|
||||
|
||||
Ok(ChatgptAuthTokensRefreshResponse {
|
||||
access_token,
|
||||
chatgpt_account_id,
|
||||
chatgpt_plan_type,
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve_resume_path(
|
||||
config: &Config,
|
||||
args: &crate::cli::ResumeArgs,
|
||||
) -> anyhow::Result<Option<PathBuf>> {
|
||||
if args.last {
|
||||
let default_provider_filter = vec![config.model_provider_id.clone()];
|
||||
let filter_cwd = if args.all {
|
||||
None
|
||||
} else {
|
||||
Some(config.cwd.as_path())
|
||||
};
|
||||
match codex_core::RolloutRecorder::find_latest_thread_path(
|
||||
config,
|
||||
/*page_size*/ 1,
|
||||
/*cursor*/ None,
|
||||
codex_core::ThreadSortKey::UpdatedAt,
|
||||
&[],
|
||||
Some(default_provider_filter.as_slice()),
|
||||
&config.model_provider_id,
|
||||
filter_cwd,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(path) => Ok(path),
|
||||
Err(e) => {
|
||||
error!("Error listing threads: {e}");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
} else if let Some(id_str) = args.session_id.as_deref() {
|
||||
if Uuid::parse_str(id_str).is_ok() {
|
||||
let path = find_thread_path_by_id_str(&config.codex_home, id_str).await?;
|
||||
Ok(path)
|
||||
} else {
|
||||
let path = find_thread_path_by_name_str(&config.codex_home, id_str).await?;
|
||||
Ok(path)
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn load_output_schema(path: Option<PathBuf>) -> Option<Value> {
|
||||
let path = path?;
|
||||
|
||||
@@ -1806,29 +1733,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_legacy_notification_preserves_conversation_id() {
|
||||
let decoded = decode_legacy_notification(JSONRPCNotification {
|
||||
method: "codex/event/error".to_string(),
|
||||
params: Some(serde_json::json!({
|
||||
"conversationId": "thread-123",
|
||||
"msg": {
|
||||
"message": "boom"
|
||||
}
|
||||
})),
|
||||
})
|
||||
.expect("legacy notification should decode");
|
||||
|
||||
assert_eq!(decoded.conversation_id.as_deref(), Some("thread-123"));
|
||||
assert!(matches!(
|
||||
decoded.event.msg,
|
||||
EventMsg::Error(codex_protocol::protocol::ErrorEvent {
|
||||
message,
|
||||
codex_error_info: None,
|
||||
}) if message == "boom"
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn canceled_mcp_server_elicitation_response_uses_cancel_action() {
|
||||
let value = canceled_mcp_server_elicitation_response()
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -122,9 +122,6 @@ impl App {
|
||||
self.handle_server_notification_event(app_server_client, notification)
|
||||
.await;
|
||||
}
|
||||
AppServerEvent::LegacyNotification(_) => {
|
||||
tracing::debug!("ignoring legacy app-server notification in tui_app_server");
|
||||
}
|
||||
AppServerEvent::ServerRequest(request) => {
|
||||
self.handle_server_request_event(app_server_client, request)
|
||||
.await;
|
||||
|
||||
@@ -504,7 +504,6 @@ pub(crate) async fn run_onboarding_app(
|
||||
return Err(color_eyre::eyre::eyre!(message));
|
||||
}
|
||||
AppServerEvent::Lagged { .. }
|
||||
| AppServerEvent::LegacyNotification(_)
|
||||
| AppServerEvent::ServerRequest(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user