diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 7c5b8c7ced..ad902935a2 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2495,6 +2495,7 @@ dependencies = [ "chrono", "clap", "codex-ansi-escape", + "codex-app-server-client", "codex-app-server-protocol", "codex-arg0", "codex-backend-client", diff --git a/codex-rs/app-server-client/src/lib.rs b/codex-rs/app-server-client/src/lib.rs index 80a328384f..ff5a6087c6 100644 --- a/codex-rs/app-server-client/src/lib.rs +++ b/codex-rs/app-server-client/src/lib.rs @@ -36,9 +36,12 @@ use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::Result as JsonRpcResult; use codex_arg0::Arg0DispatchPaths; +use codex_core::AuthManager; +use codex_core::ThreadManager; use codex_core::config::Config; use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::LoaderOverrides; +use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig; use codex_feedback::CodexFeedback; use codex_protocol::protocol::SessionSource; use serde::de::DeserializeOwned; @@ -123,6 +126,16 @@ impl Error for TypedRequestError { } } +#[derive(Clone)] +struct SharedCoreManagers { + // Temporary bootstrap escape hatch for embedders that still need direct + // core handles during the in-process app-server migration. Once TUI/exec + // stop depending on direct manager access, remove this wrapper and keep + // manager ownership entirely inside the app-server runtime. + auth_manager: Arc, + thread_manager: Arc, +} + #[derive(Clone)] pub struct InProcessClientStartArgs { /// Resolved argv0 dispatch paths used by command execution internals. @@ -156,6 +169,30 @@ pub struct InProcessClientStartArgs { } impl InProcessClientStartArgs { + fn shared_core_managers(&self) -> SharedCoreManagers { + let auth_manager = AuthManager::shared( + self.config.codex_home.clone(), + self.enable_codex_api_key_env, + self.config.cli_auth_credentials_store_mode, + ); + let thread_manager = Arc::new(ThreadManager::new( + self.config.as_ref(), + auth_manager.clone(), + self.session_source.clone(), + CollaborationModesConfig { + default_mode_request_user_input: self + .config + .features + .enabled(codex_core::features::Feature::DefaultModeRequestUserInput), + }, + )); + + SharedCoreManagers { + auth_manager, + thread_manager, + } + } + /// Builds initialize params from caller-provided metadata. pub fn initialize_params(&self) -> InitializeParams { let capabilities = InitializeCapabilities { @@ -177,7 +214,7 @@ impl InProcessClientStartArgs { } } - fn into_runtime_start_args(self) -> InProcessStartArgs { + fn into_runtime_start_args(self, shared_core: &SharedCoreManagers) -> InProcessStartArgs { let initialize = self.initialize_params(); InProcessStartArgs { arg0_paths: self.arg0_paths, @@ -185,6 +222,8 @@ impl InProcessClientStartArgs { cli_overrides: self.cli_overrides, loader_overrides: self.loader_overrides, cloud_requirements: self.cloud_requirements, + auth_manager: Some(shared_core.auth_manager.clone()), + thread_manager: Some(shared_core.thread_manager.clone()), feedback: self.feedback, config_warnings: self.config_warnings, session_source: self.session_source, @@ -238,6 +277,8 @@ pub struct InProcessAppServerClient { command_tx: mpsc::Sender, event_rx: mpsc::Receiver, worker_handle: tokio::task::JoinHandle<()>, + auth_manager: Arc, + thread_manager: Arc, } impl InProcessAppServerClient { @@ -248,8 +289,9 @@ impl InProcessAppServerClient { /// with overload error instead of being silently dropped. pub async fn start(args: InProcessClientStartArgs) -> IoResult { let channel_capacity = args.channel_capacity.max(1); + let shared_core = args.shared_core_managers(); let mut handle = - codex_app_server::in_process::start(args.into_runtime_start_args()).await?; + codex_app_server::in_process::start(args.into_runtime_start_args(&shared_core)).await?; let request_sender = handle.sender(); let (command_tx, mut command_rx) = mpsc::channel::(channel_capacity); let (event_tx, event_rx) = mpsc::channel::(channel_capacity); @@ -400,9 +442,21 @@ impl InProcessAppServerClient { command_tx, event_rx, worker_handle, + auth_manager: shared_core.auth_manager, + thread_manager: shared_core.thread_manager, }) } + /// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage. + pub fn auth_manager(&self) -> Arc { + self.auth_manager.clone() + } + + /// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage. + pub fn thread_manager(&self) -> Arc { + self.thread_manager.clone() + } + /// Sends a typed client request and returns raw JSON-RPC result. /// /// Callers that expect a concrete response type should usually prefer @@ -555,6 +609,8 @@ impl InProcessAppServerClient { command_tx, event_rx, worker_handle, + auth_manager: _, + thread_manager: _, } = self; let mut worker_handle = worker_handle; // Drop the caller-facing receiver before asking the worker to shut @@ -606,6 +662,8 @@ mod tests { use codex_app_server_protocol::SessionSource as ApiSessionSource; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; + use codex_core::AuthManager; + use codex_core::ThreadManager; use codex_core::config::ConfigBuilder; use pretty_assertions::assert_eq; use tokio::time::Duration; @@ -702,6 +760,35 @@ mod tests { } } + #[tokio::test] + async fn shared_thread_manager_tracks_threads_started_via_app_server() { + let client = start_test_client(SessionSource::Cli).await; + + let response: ThreadStartResponse = client + .request_typed(ClientRequest::ThreadStart { + request_id: RequestId::Integer(3), + params: ThreadStartParams { + ephemeral: Some(true), + ..ThreadStartParams::default() + }, + }) + .await + .expect("thread/start should succeed"); + let created_thread_id = codex_protocol::ThreadId::from_string(&response.thread.id) + .expect("thread id should parse"); + timeout( + Duration::from_secs(2), + client.thread_manager().get_thread(created_thread_id), + ) + .await + .expect("timed out waiting for retained thread manager to observe started thread") + .expect("started thread should be visible through the shared thread manager"); + let thread_ids = client.thread_manager().list_thread_ids().await; + assert!(thread_ids.contains(&created_thread_id)); + + client.shutdown().await.expect("shutdown should complete"); + } + #[tokio::test] async fn tiny_channel_capacity_still_supports_request_roundtrip() { let client = start_test_client_with_capacity(SessionSource::Exec, 1).await; @@ -746,6 +833,22 @@ mod tests { let (command_tx, _command_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::channel(1); let worker_handle = tokio::spawn(async {}); + let config = build_test_config().await; + let auth_manager = AuthManager::shared( + config.codex_home.clone(), + false, + config.cli_auth_credentials_store_mode, + ); + let thread_manager = Arc::new(ThreadManager::new( + &config, + auth_manager.clone(), + SessionSource::Exec, + CollaborationModesConfig { + default_mode_request_user_input: config + .features + .enabled(codex_core::features::Feature::DefaultModeRequestUserInput), + }, + )); event_tx .send(InProcessServerEvent::Lagged { skipped: 3 }) .await @@ -756,6 +859,8 @@ mod tests { command_tx, event_rx, worker_handle, + auth_manager, + thread_manager, }; let event = timeout(Duration::from_secs(2), client.next_event()) @@ -798,4 +903,30 @@ mod tests { skipped: 1 })); } + + #[tokio::test] + async fn accessors_expose_retained_shared_managers() { + let client = start_test_client(SessionSource::Cli).await; + + assert!( + Arc::ptr_eq(&client.auth_manager(), &client.auth_manager()), + "auth_manager accessor should clone the retained shared manager" + ); + assert!( + Arc::ptr_eq(&client.thread_manager(), &client.thread_manager()), + "thread_manager accessor should clone the retained shared manager" + ); + + client.shutdown().await.expect("shutdown should complete"); + } + + #[tokio::test] + async fn shutdown_completes_promptly_with_retained_shared_managers() { + let client = start_test_client(SessionSource::Cli).await; + + timeout(Duration::from_secs(1), client.shutdown()) + .await + .expect("shutdown should not wait for the 5s fallback timeout") + .expect("shutdown should complete"); + } } diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 6110fb52a5..3a9286a5f4 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -74,6 +74,8 @@ use codex_app_server_protocol::Result; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_arg0::Arg0DispatchPaths; +use codex_core::AuthManager; +use codex_core::ThreadManager; use codex_core::config::Config; use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::LoaderOverrides; @@ -122,6 +124,10 @@ pub struct InProcessStartArgs { pub loader_overrides: LoaderOverrides, /// Preloaded cloud requirements provider. pub cloud_requirements: CloudRequirementsLoader, + /// Optional prebuilt auth manager reused by an embedding caller. + pub auth_manager: Option>, + /// Optional prebuilt thread manager reused by an embedding caller. + pub thread_manager: Option>, /// Feedback sink used by app-server/core telemetry and logs. pub feedback: CodexFeedback, /// Startup warnings emitted after initialize succeeds. @@ -404,6 +410,8 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { cli_overrides: args.cli_overrides, loader_overrides: args.loader_overrides, cloud_requirements: args.cloud_requirements, + auth_manager: args.auth_manager, + thread_manager: args.thread_manager, feedback: args.feedback, log_db: None, config_warnings: args.config_warnings, @@ -475,6 +483,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { } } + processor.clear_runtime_references(); processor.drain_background_tasks().await; processor.shutdown_threads().await; processor.connection_closed(IN_PROCESS_CONNECTION_ID).await; @@ -749,6 +758,8 @@ mod tests { cli_overrides: Vec::new(), loader_overrides: LoaderOverrides::default(), cloud_requirements: CloudRequirementsLoader::default(), + auth_manager: None, + thread_manager: None, feedback: CodexFeedback::new(), config_warnings: Vec::new(), session_source, diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 3fc6116bab..745f771a2c 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -607,6 +607,8 @@ pub async fn run_main_with_transport( cli_overrides, loader_overrides, cloud_requirements: cloud_requirements.clone(), + auth_manager: None, + thread_manager: None, feedback: feedback.clone(), log_db, config_warnings, diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 911adef35a..7571449da9 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -139,6 +139,7 @@ pub(crate) struct MessageProcessor { codex_message_processor: CodexMessageProcessor, config_api: ConfigApi, external_agent_config_api: ExternalAgentConfigApi, + auth_manager: Arc, config: Arc, config_warnings: Arc>, } @@ -159,6 +160,8 @@ pub(crate) struct MessageProcessorArgs { pub(crate) cli_overrides: Vec<(String, TomlValue)>, pub(crate) loader_overrides: LoaderOverrides, pub(crate) cloud_requirements: CloudRequirementsLoader, + pub(crate) auth_manager: Option>, + pub(crate) thread_manager: Option>, pub(crate) feedback: CodexFeedback, pub(crate) log_db: Option, pub(crate) config_warnings: Vec, @@ -177,33 +180,42 @@ impl MessageProcessor { cli_overrides, loader_overrides, cloud_requirements, + auth_manager, + thread_manager, feedback, log_db, config_warnings, session_source, enable_codex_api_key_env, } = args; - let auth_manager = AuthManager::shared( - config.codex_home.clone(), - enable_codex_api_key_env, - config.cli_auth_credentials_store_mode, - ); + let (auth_manager, thread_manager) = match (auth_manager, thread_manager) { + (Some(auth_manager), Some(thread_manager)) => (auth_manager, thread_manager), + (None, None) => { + let auth_manager = AuthManager::shared( + config.codex_home.clone(), + enable_codex_api_key_env, + config.cli_auth_credentials_store_mode, + ); + let thread_manager = Arc::new(ThreadManager::new( + config.as_ref(), + auth_manager.clone(), + session_source, + CollaborationModesConfig { + default_mode_request_user_input: config + .features + .enabled(codex_core::features::Feature::DefaultModeRequestUserInput), + }, + )); + (auth_manager, thread_manager) + } + _ => panic!("MessageProcessorArgs must provide both auth_manager and thread_manager"), + }; auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone()); auth_manager.set_external_auth_refresher(Arc::new(ExternalAuthRefreshBridge { outgoing: outgoing.clone(), })); let analytics_events_client = AnalyticsEventsClient::new(Arc::clone(&config), Arc::clone(&auth_manager)); - let thread_manager = Arc::new(ThreadManager::new( - config.as_ref(), - auth_manager.clone(), - session_source, - CollaborationModesConfig { - default_mode_request_user_input: config - .features - .enabled(codex_core::features::Feature::DefaultModeRequestUserInput), - }, - )); thread_manager .plugins_manager() .set_analytics_events_client(analytics_events_client.clone()); @@ -213,7 +225,7 @@ impl MessageProcessor { .maybe_start_curated_repo_sync_for_config(&config); let cloud_requirements = Arc::new(RwLock::new(cloud_requirements)); let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs { - auth_manager, + auth_manager: auth_manager.clone(), thread_manager: Arc::clone(&thread_manager), outgoing: outgoing.clone(), arg0_paths, @@ -238,11 +250,16 @@ impl MessageProcessor { codex_message_processor, config_api, external_agent_config_api, + auth_manager, config, config_warnings: Arc::new(config_warnings), } } + pub(crate) fn clear_runtime_references(&self) { + self.auth_manager.clear_external_auth_refresher(); + } + pub(crate) async fn process_request( &mut self, connection_id: ConnectionId, diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index af6fc5a486..40d7fb234f 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -241,6 +241,8 @@ fn build_test_processor( cli_overrides: Vec::new(), loader_overrides: LoaderOverrides::default(), cloud_requirements: CloudRequirementsLoader::default(), + auth_manager: None, + thread_manager: None, feedback: CodexFeedback::new(), log_db: None, config_warnings: Vec::new(), diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index a6ea0946b9..d6f2681e53 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -976,7 +976,12 @@ async fn run_interactive_tui( } } - codex_tui::run_main(interactive, arg0_paths).await + codex_tui::run_main( + interactive, + arg0_paths, + codex_core::config_loader::LoaderOverrides::default(), + ) + .await } fn confirm(prompt: &str) -> std::io::Result { diff --git a/codex-rs/core/src/auth.rs b/codex-rs/core/src/auth.rs index 8bb2b23d87..8818aa5c6c 100644 --- a/codex-rs/core/src/auth.rs +++ b/codex-rs/core/src/auth.rs @@ -1146,6 +1146,12 @@ impl AuthManager { } } + pub fn clear_external_auth_refresher(&self) { + if let Ok(mut guard) = self.inner.write() { + guard.external_refresher = None; + } + } + pub fn set_forced_chatgpt_workspace_id(&self, workspace_id: Option) { if let Ok(mut guard) = self.forced_chatgpt_workspace_id.write() { *guard = workspace_id; diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index 230dd65486..ce46d2cd56 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -29,6 +29,7 @@ base64 = { workspace = true } chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } codex-ansi-escape = { workspace = true } +codex-app-server-client = { workspace = true } codex-app-server-protocol = { workspace = true } codex-arg0 = { workspace = true } codex-backend-client = { workspace = true } diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index a244965d5b..88eb68072a 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -39,6 +39,7 @@ use crate::tui::TuiEvent; use crate::update_action::UpdateAction; use crate::version::CODEX_CLI_VERSION; use codex_ansi_escape::ansi_escape_line; +use codex_app_server_client::InProcessAppServerClient; use codex_app_server_protocol::ConfigLayerSource; use codex_core::AuthManager; use codex_core::CodexAuth; @@ -51,7 +52,6 @@ use codex_core::config::edit::ConfigEditsBuilder; use codex_core::config::types::ModelAvailabilityNuxConfig; use codex_core::config_loader::ConfigLayerStackOrdering; use codex_core::features::Feature; -use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig; use codex_core::models_manager::manager::RefreshStrategy; use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG; use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG; @@ -112,6 +112,7 @@ use tokio::task::JoinHandle; use toml::Value as TomlValue; mod agent_navigation; +mod app_server_adapter; mod pending_interactive_replay; use self::agent_navigation::AgentNavigationDirection; @@ -1711,7 +1712,7 @@ impl App { #[allow(clippy::too_many_arguments)] pub async fn run( tui: &mut tui::Tui, - auth_manager: Arc, + mut app_server: InProcessAppServerClient, mut config: Config, cli_kv_overrides: Vec<(String, TomlValue)>, harness_overrides: ConfigOverrides, @@ -1731,20 +1732,8 @@ impl App { let harness_overrides = normalize_harness_overrides_for_cwd(harness_overrides, &config.cwd)?; - let thread_manager = Arc::new(ThreadManager::new( - &config, - auth_manager.clone(), - SessionSource::Cli, - CollaborationModesConfig { - default_mode_request_user_input: config - .features - .enabled(codex_core::features::Feature::DefaultModeRequestUserInput), - }, - )); - // TODO(xl): Move into PluginManager once this no longer depends on config feature gating. - thread_manager - .plugins_manager() - .maybe_start_curated_repo_sync_for_config(&config); + let auth_manager = app_server.auth_manager(); + let thread_manager = app_server.thread_manager(); let mut model = thread_manager .get_models_manager() .get_default_model(&config.model, RefreshStrategy::Offline) @@ -1762,6 +1751,13 @@ impl App { ) .await; if let Some(exit_info) = exit_info { + app_server + .shutdown() + .await + .inspect_err(|err| { + tracing::warn!("app-server shutdown failed: {err}"); + }) + .ok(); return Ok(exit_info); } if let Some(updated_model) = config.model.clone() { @@ -1982,8 +1978,18 @@ impl App { } } + let tui_events = tui.event_stream(); + tokio::pin!(tui_events); + + tui.frame_requester().schedule_frame(); + + let mut thread_created_rx = thread_manager.subscribe_thread_created(); + let mut listen_for_threads = true; + let mut listen_for_app_server_events = true; + let mut waiting_for_initial_session_configured = wait_for_initial_session_configured; + #[cfg(not(debug_assertions))] - if let Some(latest_version) = upgrade_version { + let pre_loop_exit_reason = if let Some(latest_version) = upgrade_version { let control = app .handle_event( tui, @@ -1993,79 +1999,108 @@ impl App { ))), ) .await?; - if let AppRunControl::Exit(exit_reason) = control { - return Ok(AppExitInfo { - token_usage: app.token_usage(), - thread_id: app.chat_widget.thread_id(), - thread_name: app.chat_widget.thread_name(), - update_action: app.pending_update_action, - exit_reason, - }); - } - } - - let tui_events = tui.event_stream(); - tokio::pin!(tui_events); - - tui.frame_requester().schedule_frame(); - - let mut thread_created_rx = thread_manager.subscribe_thread_created(); - let mut listen_for_threads = true; - let mut waiting_for_initial_session_configured = wait_for_initial_session_configured; - - let exit_reason = loop { - let control = select! { - Some(event) = app_event_rx.recv() => { - app.handle_event(tui, event).await? - } - active = async { - if let Some(rx) = app.active_thread_rx.as_mut() { - rx.recv().await - } else { - None - } - }, if App::should_handle_active_thread_events( - waiting_for_initial_session_configured, - app.active_thread_rx.is_some() - ) => { - if let Some(event) = active { - app.handle_active_thread_event(tui, event).await?; - } else { - app.clear_active_thread().await; - } - AppRunControl::Continue - } - Some(event) = tui_events.next() => { - app.handle_tui_event(tui, event).await? - } - // Listen on new thread creation due to collab tools. - created = thread_created_rx.recv(), if listen_for_threads => { - match created { - Ok(thread_id) => { - app.handle_thread_created(thread_id).await?; - } - Err(broadcast::error::RecvError::Lagged(_)) => { - tracing::warn!("thread_created receiver lagged; skipping resync"); - } - Err(broadcast::error::RecvError::Closed) => { - listen_for_threads = false; - } - } - AppRunControl::Continue - } - }; - if App::should_stop_waiting_for_initial_session( - waiting_for_initial_session_configured, - app.primary_thread_id, - ) { - waiting_for_initial_session_configured = false; - } match control { - AppRunControl::Continue => {} - AppRunControl::Exit(reason) => break reason, + AppRunControl::Continue => None, + AppRunControl::Exit(exit_reason) => Some(exit_reason), + } + } else { + None + }; + #[cfg(debug_assertions)] + let pre_loop_exit_reason: Option = None; + + let exit_reason_result = if let Some(exit_reason) = pre_loop_exit_reason { + Ok(exit_reason) + } else { + loop { + let control = select! { + Some(event) = app_event_rx.recv() => { + match app.handle_event(tui, event).await { + Ok(control) => control, + Err(err) => break Err(err), + } + } + active = async { + if let Some(rx) = app.active_thread_rx.as_mut() { + rx.recv().await + } else { + None + } + }, if App::should_handle_active_thread_events( + waiting_for_initial_session_configured, + app.active_thread_rx.is_some() + ) => { + if let Some(event) = active { + if let Err(err) = app.handle_active_thread_event(tui, event).await { + break Err(err); + } + } else { + app.clear_active_thread().await; + } + AppRunControl::Continue + } + Some(event) = tui_events.next() => { + match app.handle_tui_event(tui, event).await { + Ok(control) => control, + Err(err) => break Err(err), + } + } + app_server_event = app_server.next_event(), if listen_for_app_server_events => { + match app_server_event { + Some(event) => app.handle_app_server_event(&app_server, event).await, + None => { + listen_for_app_server_events = false; + tracing::warn!("app-server event stream closed"); + } + } + AppRunControl::Continue + } + // Listen on new thread creation due to collab tools. + created = thread_created_rx.recv(), if listen_for_threads => { + match created { + Ok(thread_id) => { + if let Err(err) = app.handle_thread_created(thread_id).await { + break Err(err); + } + } + Err(broadcast::error::RecvError::Lagged(_)) => { + tracing::warn!("thread_created receiver lagged; skipping resync"); + } + Err(broadcast::error::RecvError::Closed) => { + listen_for_threads = false; + } + } + AppRunControl::Continue + } + }; + if App::should_stop_waiting_for_initial_session( + waiting_for_initial_session_configured, + app.primary_thread_id, + ) { + waiting_for_initial_session_configured = false; + } + match control { + AppRunControl::Continue => {} + AppRunControl::Exit(reason) => break Ok(reason), + } + } + }; + if let Err(err) = app_server.shutdown().await { + tracing::warn!(error = %err, "failed to shut down embedded app server"); + } + let clear_result = tui.terminal.clear(); + let exit_reason = match exit_reason_result { + Ok(exit_reason) => { + clear_result?; + exit_reason + } + Err(err) => { + if let Err(clear_err) = clear_result { + tracing::warn!(error = %clear_err, "failed to clear terminal UI"); + } + return Err(err); } }; - tui.terminal.clear()?; Ok(AppExitInfo { token_usage: app.token_usage(), thread_id: app.chat_widget.thread_id(), diff --git a/codex-rs/tui/src/app/app_server_adapter.rs b/codex-rs/tui/src/app/app_server_adapter.rs new file mode 100644 index 0000000000..48caeb1380 --- /dev/null +++ b/codex-rs/tui/src/app/app_server_adapter.rs @@ -0,0 +1,72 @@ +/* +This module holds the temporary adapter layer between the TUI and the app +server during the hybrid migration period. + +For now, the TUI still owns its existing direct-core behavior, but startup +allocates a local in-process app server and drains its event stream. Keeping +the app-server-specific wiring here keeps that transitional logic out of the +main `app.rs` orchestration path. + +As more TUI flows move onto the app-server surface directly, this adapter +should shrink and eventually disappear. +*/ + +use super::App; +use codex_app_server_client::InProcessAppServerClient; +use codex_app_server_client::InProcessServerEvent; +use codex_app_server_protocol::JSONRPCErrorError; + +impl App { + pub(super) async fn handle_app_server_event( + &mut self, + app_server_client: &InProcessAppServerClient, + event: InProcessServerEvent, + ) { + match event { + InProcessServerEvent::Lagged { skipped } => { + tracing::warn!( + skipped, + "app-server event consumer lagged; dropping ignored events" + ); + } + InProcessServerEvent::ServerNotification(_) => {} + InProcessServerEvent::LegacyNotification(_) => {} + InProcessServerEvent::ServerRequest(request) => { + let request_id = request.id().clone(); + tracing::warn!( + ?request_id, + "rejecting app-server request while TUI still uses direct core APIs" + ); + if let Err(err) = self + .reject_app_server_request( + app_server_client, + request_id, + "TUI client does not yet handle this app-server server request".to_string(), + ) + .await + { + tracing::warn!("{err}"); + } + } + } + } + + async fn reject_app_server_request( + &self, + app_server_client: &InProcessAppServerClient, + request_id: codex_app_server_protocol::RequestId, + reason: String, + ) -> std::result::Result<(), String> { + app_server_client + .reject_server_request( + request_id, + JSONRPCErrorError { + code: -32000, + message: reason, + data: None, + }, + ) + .await + .map_err(|err| format!("failed to reject app-server request: {err}")) + } +} diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 43e0e2afe9..43230eb2d4 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -7,6 +7,10 @@ use additional_dirs::add_dir_warning_message; use app::App; pub use app::AppExitInfo; pub use app::ExitReason; +use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY; +use codex_app_server_client::InProcessAppServerClient; +use codex_app_server_client::InProcessClientStartArgs; +use codex_app_server_protocol::ConfigWarningNotification; use codex_cloud_requirements::cloud_requirements_loader; use codex_core::AuthManager; use codex_core::CodexAuth; @@ -24,6 +28,7 @@ use codex_core::config::load_config_as_toml_with_cli_overrides; use codex_core::config::resolve_oss_provider; use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::ConfigLoadError; +use codex_core::config_loader::LoaderOverrides; use codex_core::config_loader::format_config_error_with_source; use codex_core::default_client::set_default_client_residency_requirement; use codex_core::find_thread_path_by_id_str; @@ -45,12 +50,15 @@ use codex_state::log_db; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_oss::ensure_oss_provider_ready; use codex_utils_oss::get_default_model_for_oss_provider; +use color_eyre::eyre::WrapErr; use cwd_prompt::CwdPromptAction; use cwd_prompt::CwdPromptOutcome; use cwd_prompt::CwdSelection; use std::fs::OpenOptions; +use std::future::Future; use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use tracing::error; use tracing_appender::non_blocking; use tracing_subscriber::EnvFilter; @@ -212,6 +220,7 @@ mod voice { }); } } + mod wrapping; #[cfg(test)] @@ -227,7 +236,75 @@ pub use public_widgets::composer_input::ComposerAction; pub use public_widgets::composer_input::ComposerInput; // (tests access modules directly within the crate) -pub async fn run_main(mut cli: Cli, arg0_paths: Arg0DispatchPaths) -> std::io::Result { +async fn start_embedded_app_server( + arg0_paths: Arg0DispatchPaths, + config: Config, + cli_kv_overrides: Vec<(String, toml::Value)>, + loader_overrides: LoaderOverrides, + cloud_requirements: CloudRequirementsLoader, + feedback: codex_feedback::CodexFeedback, +) -> color_eyre::Result { + start_embedded_app_server_with( + arg0_paths, + config, + cli_kv_overrides, + loader_overrides, + cloud_requirements, + feedback, + InProcessAppServerClient::start, + ) + .await +} + +async fn start_embedded_app_server_with( + arg0_paths: Arg0DispatchPaths, + config: Config, + cli_kv_overrides: Vec<(String, toml::Value)>, + loader_overrides: LoaderOverrides, + cloud_requirements: CloudRequirementsLoader, + feedback: codex_feedback::CodexFeedback, + start_client: F, +) -> color_eyre::Result +where + F: FnOnce(InProcessClientStartArgs) -> Fut, + Fut: Future>, +{ + let config_warnings = config + .startup_warnings + .iter() + .map(|warning| ConfigWarningNotification { + summary: warning.clone(), + details: None, + path: None, + range: None, + }) + .collect(); + let client = start_client(InProcessClientStartArgs { + arg0_paths, + config: Arc::new(config), + cli_overrides: cli_kv_overrides, + loader_overrides, + cloud_requirements, + feedback, + config_warnings, + session_source: codex_protocol::protocol::SessionSource::Cli, + enable_codex_api_key_env: false, + client_name: "codex-tui".to_string(), + client_version: env!("CARGO_PKG_VERSION").to_string(), + experimental_api: true, + opt_out_notification_methods: Vec::new(), + channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }) + .await + .wrap_err("failed to start embedded app server")?; + Ok(client) +} + +pub async fn run_main( + mut cli: Cli, + arg0_paths: Arg0DispatchPaths, + loader_overrides: LoaderOverrides, +) -> std::io::Result { let (sandbox_mode, approval_policy) = if cli.full_auto { ( Some(SandboxMode::WorkspaceWrite), @@ -519,6 +596,8 @@ pub async fn run_main(mut cli: Cli, arg0_paths: Arg0DispatchPaths) -> std::io::R run_ratatui_app( cli, + arg0_paths, + loader_overrides, config, overrides, cli_kv_overrides, @@ -529,8 +608,11 @@ pub async fn run_main(mut cli: Cli, arg0_paths: Arg0DispatchPaths) -> std::io::R .map_err(|err| std::io::Error::other(err.to_string())) } +#[allow(clippy::too_many_arguments)] async fn run_ratatui_app( cli: Cli, + arg0_paths: Arg0DispatchPaths, + loader_overrides: LoaderOverrides, initial_config: Config, overrides: ConfigOverrides, cli_kv_overrides: Vec<(String, toml::Value)>, @@ -919,10 +1001,27 @@ async fn run_ratatui_app( let use_alt_screen = determine_alt_screen_mode(no_alt_screen, config.tui_alternate_screen); tui.set_alt_screen_enabled(use_alt_screen); + let app_server = match start_embedded_app_server( + arg0_paths, + config.clone(), + cli_kv_overrides.clone(), + loader_overrides, + cloud_requirements.clone(), + feedback.clone(), + ) + .await + { + Ok(app_server) => app_server, + Err(err) => { + restore(); + session_log::log_session_end(); + return Err(err); + } + }; let app_result = App::run( &mut tui, - auth_manager, + app_server, config, cli_kv_overrides.clone(), overrides.clone(), @@ -1182,7 +1281,6 @@ mod tests { use codex_core::config::ConfigOverrides; use codex_core::config::ProjectConfig; use codex_core::features::Feature; - use codex_protocol::ThreadId; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; @@ -1200,6 +1298,20 @@ mod tests { .await } + async fn start_test_embedded_app_server( + config: Config, + ) -> color_eyre::Result { + start_embedded_app_server( + Arg0DispatchPaths::default(), + config, + Vec::new(), + LoaderOverrides::default(), + CloudRequirementsLoader::default(), + codex_feedback::CodexFeedback::new(), + ) + .await + } + #[tokio::test] #[serial] async fn windows_shows_trust_prompt_without_sandbox() -> std::io::Result<()> { @@ -1215,6 +1327,52 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn embedded_app_server_exposes_client_manager_accessors() -> color_eyre::Result<()> { + let temp_dir = TempDir::new()?; + let config = build_config(&temp_dir).await?; + let app_server = start_test_embedded_app_server(config).await?; + + assert!(Arc::ptr_eq( + &app_server.auth_manager(), + &app_server.auth_manager() + )); + assert!(Arc::ptr_eq( + &app_server.thread_manager(), + &app_server.thread_manager() + )); + + app_server.shutdown().await?; + Ok(()) + } + + #[tokio::test] + async fn embedded_app_server_start_failure_is_returned() -> color_eyre::Result<()> { + let temp_dir = TempDir::new()?; + let config = build_config(&temp_dir).await?; + let result = start_embedded_app_server_with( + Arg0DispatchPaths::default(), + config, + Vec::new(), + LoaderOverrides::default(), + CloudRequirementsLoader::default(), + codex_feedback::CodexFeedback::new(), + |_args| async { Err(std::io::Error::other("boom")) }, + ) + .await; + let err = match result { + Ok(_) => panic!("startup failure should be returned"), + Err(err) => err, + }; + + assert!( + err.to_string() + .contains("failed to start embedded app server"), + "error should preserve the embedded app server startup context" + ); + Ok(()) + } #[tokio::test] #[serial] async fn windows_shows_trust_prompt_with_sandbox() -> std::io::Result<()> { diff --git a/codex-rs/tui/src/main.rs b/codex-rs/tui/src/main.rs index 5ee9ce5a47..3fe279df3f 100644 --- a/codex-rs/tui/src/main.rs +++ b/codex-rs/tui/src/main.rs @@ -22,7 +22,12 @@ fn main() -> anyhow::Result<()> { .config_overrides .raw_overrides .splice(0..0, top_cli.config_overrides.raw_overrides); - let exit_info = run_main(inner, arg0_paths).await?; + let exit_info = run_main( + inner, + arg0_paths, + codex_core::config_loader::LoaderOverrides::default(), + ) + .await?; let token_usage = exit_info.token_usage; if !token_usage.is_zero() { println!(