feat: Allow sync with remote plugin status. (#14176)

Add forceRemoteSync to plugin/list.
When it is set to True, we will sync the local plugin status with the
remote one (backend-api/plugins/list).
This commit is contained in:
xl-openai
2026-03-10 13:32:59 -07:00
committed by Michael Bolin
parent f2d66fadd8
commit d751e68f44
14 changed files with 1042 additions and 30 deletions

View File

@@ -5,6 +5,7 @@ use super::manifest::PluginManifestInterfaceSummary;
use super::marketplace::MarketplaceError;
use super::marketplace::MarketplacePluginSourceSummary;
use super::marketplace::list_marketplaces;
use super::marketplace::load_marketplace_summary;
use super::marketplace::resolve_marketplace_plugin;
use super::plugin_manifest_name;
use super::plugin_manifest_paths;
@@ -15,6 +16,7 @@ use super::store::PluginInstallResult;
use super::store::PluginStore;
use super::store::PluginStoreError;
use super::sync_openai_plugins_repo;
use crate::auth::CodexAuth;
use crate::config::Config;
use crate::config::ConfigService;
use crate::config::ConfigServiceError;
@@ -25,6 +27,7 @@ use crate::config::profile::ConfigProfile;
use crate::config::types::McpServerConfig;
use crate::config::types::PluginConfig;
use crate::config_loader::ConfigLayerStack;
use crate::default_client::build_reqwest_client;
use crate::features::Feature;
use crate::features::FeatureOverrides;
use crate::features::Features;
@@ -43,12 +46,17 @@ use std::path::PathBuf;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use toml_edit::value;
use tracing::info;
use tracing::warn;
const DEFAULT_SKILLS_DIR_NAME: &str = "skills";
const DEFAULT_MCP_CONFIG_FILE: &str = ".mcp.json";
const DEFAULT_APP_CONFIG_FILE: &str = ".app.json";
const DISABLE_CURATED_PLUGIN_SYNC_ENV_VAR: &str = "CODEX_DISABLE_CURATED_PLUGIN_SYNC";
const OPENAI_CURATED_MARKETPLACE_NAME: &str = "openai-curated";
const REMOTE_PLUGIN_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -206,6 +214,111 @@ impl PluginLoadOutcome {
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RemotePluginSyncResult {
/// Plugin ids newly installed into the local plugin cache.
pub installed_plugin_ids: Vec<String>,
/// Plugin ids whose local config was changed to enabled.
pub enabled_plugin_ids: Vec<String>,
/// Plugin ids whose local config was changed to disabled.
pub disabled_plugin_ids: Vec<String>,
/// Plugin ids removed from local cache or plugin config.
pub uninstalled_plugin_ids: Vec<String>,
}
#[derive(Debug, thiserror::Error)]
pub enum PluginRemoteSyncError {
#[error("chatgpt authentication required to sync remote plugins")]
AuthRequired,
#[error(
"chatgpt authentication required to sync remote plugins; api key auth is not supported"
)]
UnsupportedAuthMode,
#[error("failed to read auth token for remote plugin sync: {0}")]
AuthToken(#[source] std::io::Error),
#[error("failed to send remote plugin sync request to {url}: {source}")]
Request {
url: String,
#[source]
source: reqwest::Error,
},
#[error("remote plugin sync request to {url} failed with status {status}: {body}")]
UnexpectedStatus {
url: String,
status: reqwest::StatusCode,
body: String,
},
#[error("failed to parse remote plugin sync response from {url}: {source}")]
Decode {
url: String,
#[source]
source: serde_json::Error,
},
#[error("local curated marketplace is not available")]
LocalMarketplaceNotFound,
#[error("remote marketplace `{marketplace_name}` is not available locally")]
UnknownRemoteMarketplace { marketplace_name: String },
#[error("duplicate remote plugin `{plugin_name}` in sync response")]
DuplicateRemotePlugin { plugin_name: String },
#[error(
"remote plugin `{plugin_name}` was not found in local marketplace `{marketplace_name}`"
)]
UnknownRemotePlugin {
plugin_name: String,
marketplace_name: String,
},
#[error("{0}")]
InvalidPluginId(#[from] PluginIdError),
#[error("{0}")]
Marketplace(#[from] MarketplaceError),
#[error("{0}")]
Store(#[from] PluginStoreError),
#[error("{0}")]
Config(#[from] anyhow::Error),
#[error("failed to join remote plugin sync task: {0}")]
Join(#[from] tokio::task::JoinError),
}
impl PluginRemoteSyncError {
fn auth_token(source: std::io::Error) -> Self {
Self::AuthToken(source)
}
fn request(url: String, source: reqwest::Error) -> Self {
Self::Request { url, source }
}
fn join(source: tokio::task::JoinError) -> Self {
Self::Join(source)
}
}
#[derive(Debug, Deserialize)]
struct RemotePluginStatusSummary {
name: String,
#[serde(default = "default_remote_marketplace_name")]
marketplace_name: String,
enabled: bool,
}
fn default_remote_marketplace_name() -> String {
OPENAI_CURATED_MARKETPLACE_NAME.to_string()
}
pub struct PluginsManager {
codex_home: PathBuf,
store: PluginStore,
@@ -311,6 +424,169 @@ impl PluginsManager {
Ok(())
}
pub async fn sync_plugins_from_remote(
&self,
config: &Config,
auth: Option<&CodexAuth>,
) -> Result<RemotePluginSyncResult, PluginRemoteSyncError> {
info!("starting remote plugin sync");
let remote_plugins = fetch_remote_plugin_status(config, auth).await?;
let configured_plugins = configured_plugins_from_stack(&config.config_layer_stack);
let curated_marketplace_root = curated_plugins_repo_path(self.codex_home.as_path());
let curated_marketplace_path = AbsolutePathBuf::try_from(
curated_marketplace_root.join(".agents/plugins/marketplace.json"),
)
.map_err(|_| PluginRemoteSyncError::LocalMarketplaceNotFound)?;
let curated_marketplace = match load_marketplace_summary(&curated_marketplace_path) {
Ok(marketplace) => marketplace,
Err(MarketplaceError::MarketplaceNotFound { .. }) => {
return Err(PluginRemoteSyncError::LocalMarketplaceNotFound);
}
Err(err) => return Err(err.into()),
};
let marketplace_name = curated_marketplace.name.clone();
let mut local_plugins =
Vec::<(String, PluginId, AbsolutePathBuf, Option<bool>, bool)>::new();
let mut local_plugin_names = HashSet::new();
for plugin in curated_marketplace.plugins {
let plugin_name = plugin.name;
if !local_plugin_names.insert(plugin_name.clone()) {
warn!(
plugin = plugin_name,
marketplace = %marketplace_name,
"ignoring duplicate local plugin entry during remote sync"
);
continue;
}
let plugin_id = PluginId::new(plugin_name.clone(), marketplace_name.clone())?;
let plugin_key = plugin_id.as_key();
let source_path = match plugin.source {
MarketplacePluginSourceSummary::Local { path } => path,
};
let current_enabled = configured_plugins
.get(&plugin_key)
.map(|plugin| plugin.enabled);
let is_installed = self.store.is_installed(&plugin_id);
local_plugins.push((
plugin_name,
plugin_id,
source_path,
current_enabled,
is_installed,
));
}
let mut remote_enabled_by_name = HashMap::<String, bool>::new();
for plugin in remote_plugins {
if plugin.marketplace_name != marketplace_name {
return Err(PluginRemoteSyncError::UnknownRemoteMarketplace {
marketplace_name: plugin.marketplace_name,
});
}
if !local_plugin_names.contains(&plugin.name) {
warn!(
plugin = plugin.name,
marketplace = %marketplace_name,
"ignoring remote plugin missing from local marketplace during sync"
);
continue;
}
if remote_enabled_by_name
.insert(plugin.name.clone(), plugin.enabled)
.is_some()
{
return Err(PluginRemoteSyncError::DuplicateRemotePlugin {
plugin_name: plugin.name,
});
}
}
let mut config_edits = Vec::new();
let mut installs = Vec::new();
let mut uninstalls = Vec::new();
let mut result = RemotePluginSyncResult::default();
let remote_plugin_count = remote_enabled_by_name.len();
let local_plugin_count = local_plugins.len();
for (plugin_name, plugin_id, source_path, current_enabled, is_installed) in local_plugins {
let plugin_key = plugin_id.as_key();
if let Some(enabled) = remote_enabled_by_name.get(&plugin_name).copied() {
if !is_installed {
installs.push((source_path, plugin_id.clone()));
result.installed_plugin_ids.push(plugin_key.clone());
}
if current_enabled != Some(enabled) {
if enabled {
result.enabled_plugin_ids.push(plugin_key.clone());
} else {
result.disabled_plugin_ids.push(plugin_key.clone());
}
config_edits.push(ConfigEdit::SetPath {
segments: vec!["plugins".to_string(), plugin_key, "enabled".to_string()],
value: value(enabled),
});
}
} else {
if is_installed {
uninstalls.push(plugin_id);
}
if is_installed || current_enabled.is_some() {
result.uninstalled_plugin_ids.push(plugin_key.clone());
}
if current_enabled.is_some() {
config_edits.push(ConfigEdit::ClearPath {
segments: vec!["plugins".to_string(), plugin_key],
});
}
}
}
let store = self.store.clone();
let store_result = tokio::task::spawn_blocking(move || {
for (source_path, plugin_id) in installs {
store.install(source_path, plugin_id)?;
}
for plugin_id in uninstalls {
store.uninstall(&plugin_id)?;
}
Ok::<(), PluginStoreError>(())
})
.await
.map_err(PluginRemoteSyncError::join)?;
if let Err(err) = store_result {
self.clear_cache();
return Err(err.into());
}
let config_result = if config_edits.is_empty() {
Ok(())
} else {
ConfigEditsBuilder::new(&self.codex_home)
.with_edits(config_edits)
.apply()
.await
};
self.clear_cache();
config_result?;
info!(
marketplace = %marketplace_name,
remote_plugin_count,
local_plugin_count,
installed_plugin_ids = ?result.installed_plugin_ids,
enabled_plugin_ids = ?result.enabled_plugin_ids,
disabled_plugin_ids = ?result.disabled_plugin_ids,
uninstalled_plugin_ids = ?result.uninstalled_plugin_ids,
"completed remote plugin sync"
);
Ok(result)
}
pub fn list_marketplaces_for_config(
&self,
config: &Config,
@@ -416,6 +692,47 @@ impl PluginsManager {
}
}
async fn fetch_remote_plugin_status(
config: &Config,
auth: Option<&CodexAuth>,
) -> Result<Vec<RemotePluginStatusSummary>, PluginRemoteSyncError> {
let Some(auth) = auth else {
return Err(PluginRemoteSyncError::AuthRequired);
};
if !auth.is_chatgpt_auth() {
return Err(PluginRemoteSyncError::UnsupportedAuthMode);
}
let base_url = config.chatgpt_base_url.trim_end_matches('/');
let url = format!("{base_url}/plugins/list");
let client = build_reqwest_client();
let token = auth
.get_token()
.map_err(PluginRemoteSyncError::auth_token)?;
let mut request = client
.get(&url)
.timeout(REMOTE_PLUGIN_SYNC_TIMEOUT)
.bearer_auth(token);
if let Some(account_id) = auth.get_account_id() {
request = request.header("chatgpt-account-id", account_id);
}
let response = request
.send()
.await
.map_err(|source| PluginRemoteSyncError::request(url.clone(), source))?;
let status = response.status();
let body = response.text().await.unwrap_or_default();
if !status.is_success() {
return Err(PluginRemoteSyncError::UnexpectedStatus { url, status, body });
}
serde_json::from_str(&body).map_err(|source| PluginRemoteSyncError::Decode {
url: url.clone(),
source,
})
}
#[derive(Debug, thiserror::Error)]
pub enum PluginInstallError {
#[error("{0}")]
@@ -869,6 +1186,7 @@ struct PluginMcpDiscovery {
#[cfg(test)]
mod tests {
use super::*;
use crate::auth::CodexAuth;
use crate::config::CONFIG_TOML_FILE;
use crate::config::ConfigBuilder;
use crate::config::types::McpServerTransportConfig;
@@ -881,6 +1199,12 @@ mod tests {
use std::fs;
use tempfile::TempDir;
use toml::Value;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
fn write_file(path: &Path, contents: &str) {
fs::create_dir_all(path.parent().expect("file should have a parent")).unwrap();
@@ -900,6 +1224,41 @@ mod tests {
fs::write(plugin_root.join(".mcp.json"), r#"{"mcpServers":{}}"#).unwrap();
}
fn write_openai_curated_marketplace(root: &Path, plugin_names: &[&str]) {
fs::create_dir_all(root.join(".git")).unwrap();
fs::create_dir_all(root.join(".agents/plugins")).unwrap();
let plugins = plugin_names
.iter()
.map(|plugin_name| {
format!(
r#"{{
"name": "{plugin_name}",
"source": {{
"source": "local",
"path": "./plugins/{plugin_name}"
}}
}}"#
)
})
.collect::<Vec<_>>()
.join(",\n");
fs::write(
root.join(".agents/plugins/marketplace.json"),
format!(
r#"{{
"name": "{OPENAI_CURATED_MARKETPLACE_NAME}",
"plugins": [
{plugins}
]
}}"#
),
)
.unwrap();
for plugin_name in plugin_names {
write_plugin(root, &format!("plugins/{plugin_name}"), plugin_name);
}
}
fn plugin_config_toml(enabled: bool, plugins_feature_enabled: bool) -> String {
let mut root = toml::map::Map::new();
@@ -2005,6 +2364,318 @@ enabled = true
);
}
#[tokio::test]
async fn sync_plugins_from_remote_reconciles_cache_and_config() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["linear", "gmail", "calendar"]);
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
"linear/local",
"linear",
);
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
"calendar/local",
"calendar",
);
write_file(
&tmp.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
[plugins."linear@openai-curated"]
enabled = false
[plugins."calendar@openai-curated"]
enabled = true
"#,
);
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/list"))
.and(header("authorization", "Bearer Access Token"))
.and(header("chatgpt-account-id", "account_id"))
.respond_with(ResponseTemplate::new(200).set_body_string(
r#"[
{"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true},
{"id":"2","name":"gmail","marketplace_name":"openai-curated","version":"1.0.0","enabled":false}
]"#,
))
.mount(&server)
.await;
let mut config = load_config(tmp.path(), tmp.path()).await;
config.chatgpt_base_url = format!("{}/backend-api/", server.uri());
let manager = PluginsManager::new(tmp.path().to_path_buf());
let result = manager
.sync_plugins_from_remote(
&config,
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await
.unwrap();
assert_eq!(
result,
RemotePluginSyncResult {
installed_plugin_ids: vec!["gmail@openai-curated".to_string()],
enabled_plugin_ids: vec!["linear@openai-curated".to_string()],
disabled_plugin_ids: vec!["gmail@openai-curated".to_string()],
uninstalled_plugin_ids: vec!["calendar@openai-curated".to_string()],
}
);
assert!(
tmp.path()
.join("plugins/cache/openai-curated/linear/local")
.is_dir()
);
assert!(
tmp.path()
.join("plugins/cache/openai-curated/gmail/local")
.is_dir()
);
assert!(
!tmp.path()
.join("plugins/cache/openai-curated/calendar")
.exists()
);
let config = fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).unwrap();
assert!(config.contains(r#"[plugins."linear@openai-curated"]"#));
assert!(config.contains(r#"[plugins."gmail@openai-curated"]"#));
assert!(config.contains("enabled = true"));
assert!(config.contains("enabled = false"));
assert!(!config.contains(r#"[plugins."calendar@openai-curated"]"#));
let synced_config = load_config(tmp.path(), tmp.path()).await;
let curated_marketplace = manager
.list_marketplaces_for_config(&synced_config, &[])
.unwrap()
.into_iter()
.find(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME)
.unwrap();
assert_eq!(
curated_marketplace
.plugins
.into_iter()
.map(|plugin| (plugin.id, plugin.installed, plugin.enabled))
.collect::<Vec<_>>(),
vec![
("linear@openai-curated".to_string(), true, true),
("gmail@openai-curated".to_string(), true, false),
("calendar@openai-curated".to_string(), false, false),
]
);
}
#[tokio::test]
async fn sync_plugins_from_remote_ignores_unknown_remote_plugins() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["linear"]);
write_file(
&tmp.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
[plugins."linear@openai-curated"]
enabled = false
"#,
);
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/list"))
.respond_with(ResponseTemplate::new(200).set_body_string(
r#"[
{"id":"1","name":"plugin-one","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}
]"#,
))
.mount(&server)
.await;
let mut config = load_config(tmp.path(), tmp.path()).await;
config.chatgpt_base_url = format!("{}/backend-api/", server.uri());
let manager = PluginsManager::new(tmp.path().to_path_buf());
let result = manager
.sync_plugins_from_remote(
&config,
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await
.unwrap();
assert_eq!(
result,
RemotePluginSyncResult {
installed_plugin_ids: Vec::new(),
enabled_plugin_ids: Vec::new(),
disabled_plugin_ids: Vec::new(),
uninstalled_plugin_ids: vec!["linear@openai-curated".to_string()],
}
);
let config = fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).unwrap();
assert!(!config.contains(r#"[plugins."linear@openai-curated"]"#));
assert!(
!tmp.path()
.join("plugins/cache/openai-curated/linear")
.exists()
);
}
#[tokio::test]
async fn sync_plugins_from_remote_keeps_existing_plugins_when_install_fails() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["linear", "gmail"]);
fs::remove_dir_all(curated_root.join("plugins/gmail")).unwrap();
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
"linear/local",
"linear",
);
write_file(
&tmp.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
[plugins."linear@openai-curated"]
enabled = false
"#,
);
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/list"))
.respond_with(ResponseTemplate::new(200).set_body_string(
r#"[
{"id":"1","name":"gmail","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}
]"#,
))
.mount(&server)
.await;
let mut config = load_config(tmp.path(), tmp.path()).await;
config.chatgpt_base_url = format!("{}/backend-api/", server.uri());
let manager = PluginsManager::new(tmp.path().to_path_buf());
let err = manager
.sync_plugins_from_remote(
&config,
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await
.unwrap_err();
assert!(matches!(
err,
PluginRemoteSyncError::Store(PluginStoreError::Invalid(ref message))
if message.contains("plugin source path is not a directory")
));
assert!(
tmp.path()
.join("plugins/cache/openai-curated/linear/local")
.is_dir()
);
assert!(
!tmp.path()
.join("plugins/cache/openai-curated/gmail")
.exists()
);
let config = fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).unwrap();
assert!(config.contains(r#"[plugins."linear@openai-curated"]"#));
assert!(!config.contains(r#"[plugins."gmail@openai-curated"]"#));
assert!(config.contains("enabled = false"));
}
#[tokio::test]
async fn sync_plugins_from_remote_uses_first_duplicate_local_plugin_entry() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
fs::create_dir_all(curated_root.join(".git")).unwrap();
fs::create_dir_all(curated_root.join(".agents/plugins")).unwrap();
fs::write(
curated_root.join(".agents/plugins/marketplace.json"),
r#"{
"name": "openai-curated",
"plugins": [
{
"name": "gmail",
"source": {
"source": "local",
"path": "./plugins/gmail-first"
}
},
{
"name": "gmail",
"source": {
"source": "local",
"path": "./plugins/gmail-second"
}
}
]
}"#,
)
.unwrap();
write_plugin(&curated_root, "plugins/gmail-first", "gmail");
write_plugin(&curated_root, "plugins/gmail-second", "gmail");
fs::write(curated_root.join("plugins/gmail-first/marker.txt"), "first").unwrap();
fs::write(
curated_root.join("plugins/gmail-second/marker.txt"),
"second",
)
.unwrap();
write_file(
&tmp.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
"#,
);
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/list"))
.respond_with(ResponseTemplate::new(200).set_body_string(
r#"[
{"id":"1","name":"gmail","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}
]"#,
))
.mount(&server)
.await;
let mut config = load_config(tmp.path(), tmp.path()).await;
config.chatgpt_base_url = format!("{}/backend-api/", server.uri());
let manager = PluginsManager::new(tmp.path().to_path_buf());
let result = manager
.sync_plugins_from_remote(
&config,
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await
.unwrap();
assert_eq!(
result,
RemotePluginSyncResult {
installed_plugin_ids: vec!["gmail@openai-curated".to_string()],
enabled_plugin_ids: vec!["gmail@openai-curated".to_string()],
disabled_plugin_ids: Vec::new(),
uninstalled_plugin_ids: Vec::new(),
}
);
assert_eq!(
fs::read_to_string(
tmp.path()
.join("plugins/cache/openai-curated/gmail/local/marker.txt")
)
.unwrap(),
"first"
);
}
#[test]
fn load_plugins_ignores_project_config_files() {
let codex_home = TempDir::new().unwrap();