diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 959cc92520..c5b4e62775 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -5332,8 +5332,14 @@ impl CodexMessageProcessor { ), &config, ); - Self::send_app_list_updated_notification(&outgoing, merged.clone()).await; - last_notified_apps = Some(merged); + if Self::should_send_app_list_updated_notification( + merged.as_slice(), + accessible_loaded, + all_loaded, + ) { + Self::send_app_list_updated_notification(&outgoing, merged.clone()).await; + last_notified_apps = Some(merged); + } } loop { @@ -5411,7 +5417,12 @@ impl CodexMessageProcessor { ), &config, ); - if last_notified_apps.as_ref() != Some(&merged) { + if Self::should_send_app_list_updated_notification( + merged.as_slice(), + accessible_loaded, + all_loaded, + ) && last_notified_apps.as_ref() != Some(&merged) + { Self::send_app_list_updated_notification(&outgoing, merged.clone()).await; last_notified_apps = Some(merged.clone()); } @@ -5441,6 +5452,15 @@ impl CodexMessageProcessor { connectors::merge_connectors_with_accessible(all, accessible, all_connectors_loaded) } + fn should_send_app_list_updated_notification( + connectors: &[AppInfo], + accessible_loaded: bool, + all_loaded: bool, + ) -> bool { + connectors.iter().any(|connector| connector.is_accessible) + || (accessible_loaded && all_loaded) + } + fn paginate_apps( connectors: &[AppInfo], start: usize, diff --git a/codex-rs/app-server/tests/suite/v2/app_list.rs b/codex-rs/app-server/tests/suite/v2/app_list.rs index 6a3243ad56..72655b5afb 100644 --- a/codex-rs/app-server/tests/suite/v2/app_list.rs +++ b/codex-rs/app-server/tests/suite/v2/app_list.rs @@ -428,7 +428,7 @@ async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<( } #[tokio::test] -async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { +async fn list_apps_waits_for_accessible_data_before_emitting_directory_updates() -> Result<()> { let connectors = vec![ AppInfo { id: "alpha".to_string(), @@ -475,7 +475,7 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { codex_home.path(), ChatGptAuthFixture::new("chatgpt-token") .account_id("account-123") - .chatgpt_user_id("user-123") + .chatgpt_user_id("user-directory-first") .chatgpt_account_id("account-123"), AuthCredentialsStoreMode::File, )?; @@ -492,60 +492,14 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { }) .await?; - let expected_directory_first = vec![ - AppInfo { - id: "alpha".to_string(), - name: "Alpha".to_string(), - description: Some("Alpha connector".to_string()), - logo_url: Some("https://example.com/alpha.png".to_string()), - logo_url_dark: None, - distribution_channel: None, - branding: None, - app_metadata: None, - labels: None, - install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), - is_accessible: false, - is_enabled: true, - }, - AppInfo { - id: "beta".to_string(), - name: "beta".to_string(), - description: None, - logo_url: None, - logo_url_dark: None, - distribution_channel: None, - branding: None, - app_metadata: None, - labels: None, - install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()), - is_accessible: false, - is_enabled: true, - }, - ]; - let expected_accessible_first = vec![AppInfo { - id: "beta".to_string(), - name: "Beta App".to_string(), - description: None, - logo_url: None, - logo_url_dark: None, - distribution_channel: None, - branding: None, - app_metadata: None, - labels: None, - install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()), - is_accessible: true, - is_enabled: true, - }]; - - let first_update = read_app_list_updated_notification(&mut mcp).await?; - // app/list emits an update after whichever async load finishes first. Even with - // a tools delay in this test, the accessible-tools path can return first if the - // process-global Codex Apps tools cache is warm from another test. + let maybe_update = timeout( + Duration::from_millis(150), + read_app_list_updated_notification(&mut mcp), + ) + .await; assert!( - first_update.data == expected_directory_first - || first_update.data == expected_accessible_first, - "unexpected first app/list update: {:#?}", - first_update.data + maybe_update.is_err(), + "unexpected directory-only app/list update before accessible apps loaded" ); let expected = vec![ @@ -579,8 +533,96 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { }, ]; - let second_update = read_app_list_updated_notification(&mut mcp).await?; - assert_eq!(second_update.data, expected); + let update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!(update.data, expected); + + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let AppsListResponse { data, next_cursor } = to_response(response)?; + assert_eq!(data, expected); + assert!(next_cursor.is_none()); + + server_handle.abort(); + Ok(()) +} + +#[tokio::test] +async fn list_apps_does_not_emit_empty_interim_updates() -> Result<()> { + let connectors = vec![AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + branding: None, + app_metadata: None, + labels: None, + install_url: None, + is_accessible: false, + is_enabled: true, + }]; + let (server_url, server_handle) = start_apps_server_with_delays( + connectors.clone(), + Vec::new(), + Duration::from_millis(300), + Duration::ZERO, + ) + .await?; + + let codex_home = TempDir::new()?; + write_connectors_config(codex_home.path(), &server_url)?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-empty-interim") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + thread_id: None, + force_refetch: false, + }) + .await?; + + let maybe_update = timeout( + Duration::from_millis(150), + read_app_list_updated_notification(&mut mcp), + ) + .await; + assert!( + maybe_update.is_err(), + "unexpected empty interim app/list update" + ); + + let expected = vec![AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + branding: None, + app_metadata: None, + labels: None, + install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), + is_accessible: false, + is_enabled: true, + }]; + + let update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!(update.data, expected); let response: JSONRPCResponse = timeout( DEFAULT_TIMEOUT, @@ -1026,39 +1068,14 @@ async fn list_apps_force_refetch_patches_updates_from_cached_snapshots() -> Resu ] ); - let second_update = read_app_list_updated_notification(&mut mcp).await?; - assert_eq!( - second_update.data, - vec![ - AppInfo { - id: "alpha".to_string(), - name: "Alpha".to_string(), - description: Some("Alpha v1".to_string()), - logo_url: None, - logo_url_dark: None, - distribution_channel: None, - branding: None, - app_metadata: None, - labels: None, - install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), - is_accessible: false, - is_enabled: true, - }, - AppInfo { - id: "beta".to_string(), - name: "Beta App".to_string(), - description: Some("Beta v1".to_string()), - logo_url: None, - logo_url_dark: None, - distribution_channel: None, - branding: None, - app_metadata: None, - labels: None, - install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()), - is_accessible: false, - is_enabled: true, - }, - ] + let maybe_second_update = timeout( + Duration::from_millis(150), + read_app_list_updated_notification(&mut mcp), + ) + .await; + assert!( + maybe_second_update.is_err(), + "unexpected inaccessible-only app/list update during force refetch" ); let expected_final = vec![AppInfo { @@ -1075,8 +1092,8 @@ async fn list_apps_force_refetch_patches_updates_from_cached_snapshots() -> Resu is_accessible: false, is_enabled: true, }]; - let third_update = read_app_list_updated_notification(&mut mcp).await?; - assert_eq!(third_update.data, expected_final); + let second_update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!(second_update.data, expected_final); let refetch_response: JSONRPCResponse = timeout( DEFAULT_TIMEOUT,