Compare commits

...

3 Commits

Author SHA1 Message Date
Sama Setty
3581373031 codex: expose app tools for explicit plugin mentions 2026-04-13 13:01:22 -07:00
Sama Setty
efba567e6b changes 2026-04-13 12:43:39 -07:00
Sama Setty
db19109589 Wait for explicit plugin app tools 2026-04-13 12:38:34 -07:00
4 changed files with 289 additions and 7 deletions

View File

@@ -5,6 +5,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
@@ -75,6 +76,7 @@ use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_login::auth_env_telemetry::collect_auth_env_telemetry;
use codex_login::default_client::originator;
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
use codex_mcp::McpConnectionManager;
use codex_mcp::SandboxState;
use codex_mcp::ToolInfo;
@@ -444,6 +446,7 @@ pub(crate) const INITIAL_SUBMIT_ID: &str = "";
pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 512;
const CYBER_VERIFY_URL: &str = "https://chatgpt.com/cyber";
const CYBER_SAFETY_URL: &str = "https://developers.openai.com/codex/concepts/cyber-safety";
const EXPLICIT_APPS_READY_TIMEOUT: Duration = Duration::from_secs(3);
impl Codex {
/// Spawn a new [`Codex`] and initialize the session.
pub(crate) async fn spawn(args: CodexSpawnArgs) -> CodexResult<CodexSpawnOk> {
@@ -5994,6 +5997,23 @@ fn errors_to_info(errors: &[SkillError]) -> Vec<SkillErrorInfo> {
.collect()
}
// Explicit plugin mentions imply app usage even when the user did not
// mention the app directly. If those connectors are still missing from the
// current `codex_apps` snapshot, give startup a bounded chance to finish
fn explicitly_enabled_connectors_missing_from_tools(
connector_ids: &HashSet<String>,
mcp_tools: &HashMap<String, ToolInfo>,
) -> bool {
let accessible_connector_ids = connectors::accessible_connectors_from_mcp_tools(mcp_tools)
.into_iter()
.map(|connector| connector.id)
.collect::<HashSet<_>>();
connector_ids
.iter()
.any(|connector_id| !accessible_connector_ids.contains(connector_id))
}
/// Takes a user message as input and runs a loop where, at each sampling request, the model
/// replies with either:
///
@@ -6050,15 +6070,24 @@ pub(crate) async fn run_turn(
// enabled plugins, then converted into turn-scoped guidance below.
let mentioned_plugins =
collect_explicit_plugin_mentions(&input, loaded_plugins.capability_summaries());
let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input);
if turn_context.apps_enabled() {
// Treat app connectors declared by explicit plugin mentions as
// explicit for this turn too. That lets them participate in both
// startup waiting and first-turn tool exposure.
explicitly_enabled_connectors.extend(mentioned_plugins.iter().flat_map(|plugin| {
plugin
.app_connector_ids
.iter()
.map(|connector_id| connector_id.0.clone())
}));
}
let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() {
// Plugin mentions need raw MCP/app inventory even when app tools
// are normally hidden so we can describe the plugin's currently
// usable capabilities for this turn.
match sess
.services
.mcp_connection_manager
.read()
.await
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
let mut mcp_tools = match mcp_connection_manager
.list_all_tools()
.or_cancel(&cancellation_token)
.await
@@ -6066,7 +6095,37 @@ pub(crate) async fn run_turn(
Ok(mcp_tools) => mcp_tools,
Err(_) if turn_context.apps_enabled() => return None,
Err(_) => HashMap::new(),
};
if turn_context.apps_enabled()
&& !explicitly_enabled_connectors.is_empty()
&& explicitly_enabled_connectors_missing_from_tools(
&explicitly_enabled_connectors,
&mcp_tools,
)
{
// The caller explicitly asked for one of these app-backed surfaces,
// but the first snapshot still does not expose it, so wait
// briefly and then rebuild the tool view for this turn.
let codex_apps_ready = match mcp_connection_manager
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, EXPLICIT_APPS_READY_TIMEOUT)
.or_cancel(&cancellation_token)
.await
{
Ok(codex_apps_ready) => codex_apps_ready,
Err(_) => return None,
};
if codex_apps_ready {
mcp_tools = match mcp_connection_manager
.list_all_tools()
.or_cancel(&cancellation_token)
.await
{
Ok(mcp_tools) => mcp_tools,
Err(_) => return None,
};
}
}
mcp_tools
} else {
HashMap::new()
};
@@ -6140,7 +6199,6 @@ pub(crate) async fn run_turn(
.filter_map(crate::plugins::PluginCapabilitySummary::telemetry_metadata)
.collect::<Vec<_>>();
let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input);
explicitly_enabled_connectors.extend(collect_explicit_app_ids_from_skill_items(
&skill_items,
&available_connectors,

View File

@@ -452,6 +452,49 @@ fn numbered_mcp_tools(count: usize) -> HashMap<String, ToolInfo> {
.collect()
}
#[test]
fn explicit_connectors_missing_from_tools_requests_startup_wait() {
let connector_ids = HashSet::from(["slack".to_string()]);
assert!(explicitly_enabled_connectors_missing_from_tools(
&connector_ids,
&HashMap::new(),
));
}
#[test]
fn explicit_connectors_present_in_tools_skip_startup_wait() {
let connector_ids = HashSet::from(["slack".to_string()]);
let mcp_tools = HashMap::from([(
"mcp__codex_apps__slack_search".to_string(),
make_mcp_tool(
CODEX_APPS_MCP_SERVER_NAME,
"slack_search",
Some("slack"),
Some("Slack"),
),
)]);
assert!(!explicitly_enabled_connectors_missing_from_tools(
&connector_ids,
&mcp_tools,
));
}
#[test]
fn explicit_connectors_ignore_non_app_tool_matches() {
let connector_ids = HashSet::from(["slack".to_string()]);
let mcp_tools = HashMap::from([(
"mcp__rmcp__slack_search".to_string(),
make_mcp_tool("rmcp", "slack_search", Some("slack"), Some("Slack")),
)]);
assert!(explicitly_enabled_connectors_missing_from_tools(
&connector_ids,
&mcp_tools,
));
}
fn tools_config_for_mcp_tool_exposure(search_tool: bool) -> ToolsConfig {
let config = test_config();
let model_info = ModelsManager::construct_model_info_offline_for_tests(

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use serde_json::Value;
use serde_json::json;
use std::time::Duration;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
@@ -36,6 +37,13 @@ impl AppsTestServer {
}
pub async fn mount_searchable(server: &MockServer) -> Result<Self> {
Self::mount_searchable_with_tools_list_delay(server, Duration::ZERO).await
}
pub async fn mount_searchable_with_tools_list_delay(
server: &MockServer,
tools_list_delay: Duration,
) -> Result<Self> {
mount_oauth_metadata(server).await;
mount_connectors_directory(server).await;
mount_streamable_http_json_rpc(
@@ -43,6 +51,7 @@ impl AppsTestServer {
CONNECTOR_NAME.to_string(),
CONNECTOR_DESCRIPTION.to_string(),
/*searchable*/ true,
tools_list_delay,
)
.await;
Ok(Self {
@@ -53,6 +62,15 @@ impl AppsTestServer {
pub async fn mount_with_connector_name(
server: &MockServer,
connector_name: &str,
) -> Result<Self> {
Self::mount_with_connector_name_and_tools_list_delay(server, connector_name, Duration::ZERO)
.await
}
pub async fn mount_with_connector_name_and_tools_list_delay(
server: &MockServer,
connector_name: &str,
tools_list_delay: Duration,
) -> Result<Self> {
mount_oauth_metadata(server).await;
mount_connectors_directory(server).await;
@@ -61,6 +79,7 @@ impl AppsTestServer {
connector_name.to_string(),
CONNECTOR_DESCRIPTION.to_string(),
/*searchable*/ false,
tools_list_delay,
)
.await;
Ok(Self {
@@ -117,6 +136,7 @@ async fn mount_streamable_http_json_rpc(
connector_name: String,
connector_description: String,
searchable: bool,
tools_list_delay: Duration,
) {
Mock::given(method("POST"))
.and(path_regex("^/api/codex/apps/?$"))
@@ -124,6 +144,7 @@ async fn mount_streamable_http_json_rpc(
connector_name,
connector_description,
searchable,
tools_list_delay,
})
.mount(server)
.await;
@@ -133,6 +154,7 @@ struct CodexAppsJsonRpcResponder {
connector_name: String,
connector_description: String,
searchable: bool,
tools_list_delay: Duration,
}
impl Respond for CodexAppsJsonRpcResponder {
@@ -302,7 +324,9 @@ impl Respond for CodexAppsJsonRpcResponder {
}));
}
}
ResponseTemplate::new(200).set_body_json(response)
ResponseTemplate::new(200)
.set_delay(self.tools_list_delay)
.set_body_json(response)
}
"tools/call" => {
let id = body.get("id").cloned().unwrap_or(Value::Null);

View File

@@ -8,6 +8,8 @@ use std::time::Instant;
use anyhow::Result;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_mcp::codex_apps_tools_cache_key;
use codex_models_manager::bundled_models_response;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use core_test_support::apps_test_server::AppsTestServer;
@@ -21,6 +23,8 @@ use core_test_support::stdio_server_bin;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use sha1::Digest;
use sha1::Sha1;
use tempfile::TempDir;
use wiremock::MockServer;
@@ -341,6 +345,159 @@ async fn explicit_plugin_mentions_inject_plugin_guidance() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn explicit_plugin_mentions_wait_for_plugin_apps_to_finish_starting() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let apps_server = AppsTestServer::mount_with_connector_name_and_tools_list_delay(
&server,
"Google Calendar",
Duration::from_secs(1),
)
.await?;
let mock = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let codex_home = Arc::new(TempDir::new()?);
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
let user_key = codex_apps_tools_cache_key(Some(&auth));
let user_key_json = serde_json::to_string(&user_key)?;
let mut hasher = Sha1::new();
hasher.update(user_key_json.as_bytes());
let cache_path = codex_home
.path()
.join("cache/codex_apps_tools")
.join(format!("{:x}.json", hasher.finalize()));
std::fs::create_dir_all(
cache_path
.parent()
.expect("codex apps tools cache path should have a parent"),
)?;
std::fs::write(
&cache_path,
serde_json::to_vec_pretty(&serde_json::json!({
"schema_version": 2,
"tools": [],
}))?,
)?;
write_plugin_app_plugin(codex_home.as_ref());
let mut builder = test_codex()
.with_home(codex_home)
.with_auth(auth)
.with_config(move |config| {
config
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_server.chatgpt_base_url;
});
let codex = builder.build(&server).await?.codex;
codex
.submit(Op::UserInput {
items: vec![codex_protocol::user_input::UserInput::Mention {
name: "sample".into(),
path: format!("plugin://{SAMPLE_PLUGIN_CONFIG_NAME}"),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let request = mock.single_request();
let developer_messages = request.message_input_texts("developer");
assert!(
developer_messages
.iter()
.any(|text| text.contains("Apps from this plugin")),
"expected plugin app guidance after waiting for codex apps startup: {developer_messages:?}"
);
let request_tools = tool_names(&request.body_json());
assert!(
request_tools
.iter()
.any(|name| name == "mcp__codex_apps__google_calendar_create_event"),
"expected plugin app tools after waiting for codex apps startup: {request_tools:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn explicit_plugin_mentions_directly_expose_plugin_apps_with_tool_search() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let apps_server = AppsTestServer::mount_searchable(&server).await?;
let mock = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let codex_home = Arc::new(TempDir::new()?);
write_plugin_app_plugin(codex_home.as_ref());
let mut builder = test_codex()
.with_home(codex_home)
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_model("gpt-5-codex")
.with_config(move |config| {
config
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.enable(Feature::ToolSearch)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_server.chatgpt_base_url;
config.model = Some("gpt-5-codex".to_string());
let mut model_catalog = bundled_models_response()
.unwrap_or_else(|err| panic!("bundled models.json should parse: {err}"));
let model = model_catalog
.models
.iter_mut()
.find(|model| model.slug == "gpt-5-codex")
.expect("gpt-5-codex exists in bundled models.json");
model.supports_search_tool = true;
config.model_catalog = Some(model_catalog);
});
let codex = builder.build(&server).await?.codex;
codex
.submit(Op::UserInput {
items: vec![codex_protocol::user_input::UserInput::Mention {
name: "sample".into(),
path: format!("plugin://{SAMPLE_PLUGIN_CONFIG_NAME}"),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let request_tools = tool_names(&mock.single_request().body_json());
assert!(
request_tools.iter().any(|name| name == "tool_search"),
"expected tool_search when searchable apps are available: {request_tools:?}"
);
assert!(
request_tools
.iter()
.any(|name| name == "mcp__codex_apps__calendar_create_event"),
"expected explicit plugin mention to directly expose its app tools: {request_tools:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn explicit_plugin_mentions_track_plugin_used_analytics() -> Result<()> {
skip_if_no_network!(Ok(()));