mirror of
https://github.com/openai/codex.git
synced 2026-05-04 21:32:21 +03:00
Merge remote-tracking branch 'origin/main' into jif/infty
# Conflicts: # codex-rs/Cargo.toml # codex-rs/cli/src/main.rs
This commit is contained in:
@@ -17,6 +17,7 @@ use codex_apply_patch::ApplyPatchAction;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::protocol::ConversationPathResponseEvent;
|
||||
use codex_protocol::protocol::ExitedReviewModeEvent;
|
||||
use codex_protocol::protocol::McpAuthStatus;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -27,6 +28,12 @@ use futures::future::BoxFuture;
|
||||
use futures::prelude::*;
|
||||
use futures::stream::FuturesOrdered;
|
||||
use mcp_types::CallToolResult;
|
||||
use mcp_types::ListResourceTemplatesRequestParams;
|
||||
use mcp_types::ListResourceTemplatesResult;
|
||||
use mcp_types::ListResourcesRequestParams;
|
||||
use mcp_types::ListResourcesResult;
|
||||
use mcp_types::ReadResourceRequestParams;
|
||||
use mcp_types::ReadResourceResult;
|
||||
use serde_json;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -99,6 +106,7 @@ use crate::rollout::RolloutRecorderParams;
|
||||
use crate::shell;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::SessionServices;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::CompactTask;
|
||||
use crate::tasks::RegularTask;
|
||||
use crate::tasks::ReviewTask;
|
||||
@@ -364,15 +372,32 @@ impl Session {
|
||||
|
||||
let mcp_fut = McpConnectionManager::new(
|
||||
config.mcp_servers.clone(),
|
||||
config.use_experimental_use_rmcp_client,
|
||||
config
|
||||
.features
|
||||
.enabled(crate::features::Feature::RmcpClient),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
);
|
||||
let default_shell_fut = shell::default_user_shell();
|
||||
let history_meta_fut = crate::message_history::history_metadata(&config);
|
||||
let auth_statuses_fut = compute_auth_statuses(
|
||||
config.mcp_servers.iter(),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
);
|
||||
|
||||
// Join all independent futures.
|
||||
let (rollout_recorder, mcp_res, default_shell, (history_log_id, history_entry_count)) =
|
||||
tokio::join!(rollout_fut, mcp_fut, default_shell_fut, history_meta_fut);
|
||||
let (
|
||||
rollout_recorder,
|
||||
mcp_res,
|
||||
default_shell,
|
||||
(history_log_id, history_entry_count),
|
||||
auth_statuses,
|
||||
) = tokio::join!(
|
||||
rollout_fut,
|
||||
mcp_fut,
|
||||
default_shell_fut,
|
||||
history_meta_fut,
|
||||
auth_statuses_fut
|
||||
);
|
||||
|
||||
let rollout_recorder = rollout_recorder.map_err(|e| {
|
||||
error!("failed to initialize rollout recorder: {e:#}");
|
||||
@@ -399,11 +424,24 @@ impl Session {
|
||||
// Surface individual client start-up failures to the user.
|
||||
if !failed_clients.is_empty() {
|
||||
for (server_name, err) in failed_clients {
|
||||
let message = format!("MCP client for `{server_name}` failed to start: {err:#}");
|
||||
error!("{message}");
|
||||
let log_message =
|
||||
format!("MCP client for `{server_name}` failed to start: {err:#}");
|
||||
error!("{log_message}");
|
||||
let display_message = if matches!(
|
||||
auth_statuses.get(&server_name),
|
||||
Some(McpAuthStatus::NotLoggedIn)
|
||||
) {
|
||||
format!(
|
||||
"The {server_name} MCP server is not logged in. Run `codex mcp login {server_name}` to log in."
|
||||
)
|
||||
} else {
|
||||
log_message
|
||||
};
|
||||
post_session_configured_error_events.push(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
msg: EventMsg::Error(ErrorEvent { message }),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: display_message,
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -413,6 +451,7 @@ impl Session {
|
||||
config.model.as_str(),
|
||||
config.model_family.slug.as_str(),
|
||||
auth_manager.auth().and_then(|a| a.get_account_id()),
|
||||
auth_manager.auth().and_then(|a| a.get_account_email()),
|
||||
auth_manager.auth().map(|a| a.mode),
|
||||
config.otel.log_user_prompt,
|
||||
terminal::user_agent(),
|
||||
@@ -446,12 +485,7 @@ impl Session {
|
||||
client,
|
||||
tools_config: ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &config.model_family,
|
||||
include_plan_tool: config.include_plan_tool,
|
||||
include_apply_patch_tool: config.include_apply_patch_tool,
|
||||
include_web_search_request: config.tools_web_search_request,
|
||||
use_streamable_shell_tool: config.use_experimental_streamable_shell_tool,
|
||||
include_view_image_tool: config.include_view_image_tool,
|
||||
experimental_unified_exec_tool: config.use_experimental_unified_exec_tool,
|
||||
features: &config.features,
|
||||
}),
|
||||
user_instructions,
|
||||
base_instructions,
|
||||
@@ -593,6 +627,7 @@ impl Session {
|
||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||
}
|
||||
|
||||
let parsed_cmd = parse_command(&command);
|
||||
let event = Event {
|
||||
id: event_id,
|
||||
msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
||||
@@ -600,6 +635,7 @@ impl Session {
|
||||
command,
|
||||
cwd,
|
||||
reason,
|
||||
parsed_cmd,
|
||||
}),
|
||||
};
|
||||
self.send_event(event).await;
|
||||
@@ -855,10 +891,7 @@ impl Session {
|
||||
call_id,
|
||||
command: command_for_display.clone(),
|
||||
cwd,
|
||||
parsed_cmd: parse_command(&command_for_display)
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
parsed_cmd: parse_command(&command_for_display),
|
||||
}),
|
||||
};
|
||||
let event = Event {
|
||||
@@ -883,6 +916,7 @@ impl Session {
|
||||
duration,
|
||||
exit_code,
|
||||
timed_out: _,
|
||||
..
|
||||
} = output;
|
||||
// Send full stdout/stderr to clients; do not truncate.
|
||||
let stdout = stdout.text.clone();
|
||||
@@ -947,15 +981,28 @@ impl Session {
|
||||
let sub_id = context.sub_id.clone();
|
||||
let call_id = context.call_id.clone();
|
||||
|
||||
self.on_exec_command_begin(turn_diff_tracker.clone(), context.clone())
|
||||
.await;
|
||||
|
||||
let begin_turn_diff = turn_diff_tracker.clone();
|
||||
let begin_context = context.clone();
|
||||
let session = self;
|
||||
let result = self
|
||||
.services
|
||||
.executor
|
||||
.run(request, self, approval_policy, &context)
|
||||
.run(request, self, approval_policy, &context, move || {
|
||||
let turn_diff = begin_turn_diff.clone();
|
||||
let ctx = begin_context.clone();
|
||||
async move {
|
||||
session.on_exec_command_begin(turn_diff, ctx).await;
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
if matches!(
|
||||
&result,
|
||||
Err(ExecError::Function(FunctionCallError::Denied(_)))
|
||||
) {
|
||||
return result;
|
||||
}
|
||||
|
||||
let normalized = normalize_exec_result(&result);
|
||||
let borrowed = normalized.event_output();
|
||||
|
||||
@@ -1035,6 +1082,39 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list_resources(
|
||||
&self,
|
||||
server: &str,
|
||||
params: Option<ListResourcesRequestParams>,
|
||||
) -> anyhow::Result<ListResourcesResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.list_resources(server, params)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn list_resource_templates(
|
||||
&self,
|
||||
server: &str,
|
||||
params: Option<ListResourceTemplatesRequestParams>,
|
||||
) -> anyhow::Result<ListResourceTemplatesResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.list_resource_templates(server, params)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn read_resource(
|
||||
&self,
|
||||
server: &str,
|
||||
params: ReadResourceRequestParams,
|
||||
) -> anyhow::Result<ReadResourceResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read_resource(server, params)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn call_tool(
|
||||
&self,
|
||||
server: &str,
|
||||
@@ -1200,12 +1280,7 @@ async fn submission_loop(
|
||||
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &effective_family,
|
||||
include_plan_tool: config.include_plan_tool,
|
||||
include_apply_patch_tool: config.include_apply_patch_tool,
|
||||
include_web_search_request: config.tools_web_search_request,
|
||||
use_streamable_shell_tool: config.use_experimental_streamable_shell_tool,
|
||||
include_view_image_tool: config.include_view_image_tool,
|
||||
experimental_unified_exec_tool: config.use_experimental_unified_exec_tool,
|
||||
features: &config.features,
|
||||
});
|
||||
|
||||
let new_turn_context = TurnContext {
|
||||
@@ -1302,14 +1377,7 @@ async fn submission_loop(
|
||||
client,
|
||||
tools_config: ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &model_family,
|
||||
include_plan_tool: config.include_plan_tool,
|
||||
include_apply_patch_tool: config.include_apply_patch_tool,
|
||||
include_web_search_request: config.tools_web_search_request,
|
||||
use_streamable_shell_tool: config
|
||||
.use_experimental_streamable_shell_tool,
|
||||
include_view_image_tool: config.include_view_image_tool,
|
||||
experimental_unified_exec_tool: config
|
||||
.use_experimental_unified_exec_tool,
|
||||
features: &config.features,
|
||||
}),
|
||||
user_instructions: turn_context.user_instructions.clone(),
|
||||
base_instructions: turn_context.base_instructions.clone(),
|
||||
@@ -1409,16 +1477,23 @@ async fn submission_loop(
|
||||
|
||||
// This is a cheap lookup from the connection manager's cache.
|
||||
let tools = sess.services.mcp_connection_manager.list_all_tools();
|
||||
let auth_statuses = compute_auth_statuses(
|
||||
config.mcp_servers.iter(),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
)
|
||||
.await;
|
||||
let (auth_statuses, resources, resource_templates) = tokio::join!(
|
||||
compute_auth_statuses(
|
||||
config.mcp_servers.iter(),
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
),
|
||||
sess.services.mcp_connection_manager.list_all_resources(),
|
||||
sess.services
|
||||
.mcp_connection_manager
|
||||
.list_all_resource_templates()
|
||||
);
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::McpListToolsResponse(
|
||||
crate::protocol::McpListToolsResponseEvent {
|
||||
tools,
|
||||
resources,
|
||||
resource_templates,
|
||||
auth_statuses,
|
||||
},
|
||||
),
|
||||
@@ -1541,14 +1616,15 @@ async fn spawn_review_thread(
|
||||
let model = config.review_model.clone();
|
||||
let review_model_family = find_family_for_model(&model)
|
||||
.unwrap_or_else(|| parent_turn_context.client.get_model_family());
|
||||
// For reviews, disable plan, web_search, view_image regardless of global settings.
|
||||
let mut review_features = config.features.clone();
|
||||
review_features.disable(crate::features::Feature::PlanTool);
|
||||
review_features.disable(crate::features::Feature::WebSearchRequest);
|
||||
review_features.disable(crate::features::Feature::ViewImageTool);
|
||||
review_features.disable(crate::features::Feature::StreamableShell);
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &review_model_family,
|
||||
include_plan_tool: false,
|
||||
include_apply_patch_tool: config.include_apply_patch_tool,
|
||||
include_web_search_request: false,
|
||||
use_streamable_shell_tool: false,
|
||||
include_view_image_tool: false,
|
||||
experimental_unified_exec_tool: config.use_experimental_unified_exec_tool,
|
||||
features: &review_features,
|
||||
});
|
||||
|
||||
let base_instructions = REVIEW_PROMPT.to_string();
|
||||
@@ -1639,6 +1715,7 @@ pub(crate) async fn run_task(
|
||||
turn_context: Arc<TurnContext>,
|
||||
sub_id: String,
|
||||
input: Vec<InputItem>,
|
||||
task_kind: TaskKind,
|
||||
) -> Option<String> {
|
||||
if input.is_empty() {
|
||||
return None;
|
||||
@@ -1722,6 +1799,7 @@ pub(crate) async fn run_task(
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
sub_id.clone(),
|
||||
turn_input,
|
||||
task_kind,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -1874,6 +1952,7 @@ pub(crate) async fn run_task(
|
||||
);
|
||||
sess.notifier()
|
||||
.notify(&UserNotification::AgentTurnComplete {
|
||||
thread_id: sess.conversation_id.to_string(),
|
||||
turn_id: sub_id.clone(),
|
||||
input_messages: turn_input_messages,
|
||||
last_assistant_message: last_agent_message.clone(),
|
||||
@@ -1947,6 +2026,7 @@ async fn run_turn(
|
||||
turn_diff_tracker: SharedTurnDiffTracker,
|
||||
sub_id: String,
|
||||
input: Vec<ResponseItem>,
|
||||
task_kind: TaskKind,
|
||||
) -> CodexResult<TurnRunResult> {
|
||||
let mcp_tools = sess.services.mcp_connection_manager.list_all_tools();
|
||||
let router = Arc::new(ToolRouter::from_config(
|
||||
@@ -1976,6 +2056,7 @@ async fn run_turn(
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
&sub_id,
|
||||
&prompt,
|
||||
task_kind,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -2049,6 +2130,7 @@ async fn try_run_turn(
|
||||
turn_diff_tracker: SharedTurnDiffTracker,
|
||||
sub_id: &str,
|
||||
prompt: &Prompt,
|
||||
task_kind: TaskKind,
|
||||
) -> CodexResult<TurnRunResult> {
|
||||
// call_ids that are part of this response.
|
||||
let completed_call_ids = prompt
|
||||
@@ -2114,7 +2196,11 @@ async fn try_run_turn(
|
||||
summary: turn_context.client.get_reasoning_summary(),
|
||||
});
|
||||
sess.persist_rollout_items(&[rollout_item]).await;
|
||||
let mut stream = turn_context.client.clone().stream(&prompt).await?;
|
||||
let mut stream = turn_context
|
||||
.client
|
||||
.clone()
|
||||
.stream_with_task_kind(prompt.as_ref(), task_kind)
|
||||
.await?;
|
||||
|
||||
let tool_runtime = ToolCallRuntime::new(
|
||||
Arc::clone(&router),
|
||||
@@ -2195,7 +2281,8 @@ async fn try_run_turn(
|
||||
response: Some(response),
|
||||
});
|
||||
}
|
||||
Err(FunctionCallError::RespondToModel(message)) => {
|
||||
Err(FunctionCallError::RespondToModel(message))
|
||||
| Err(FunctionCallError::Denied(message)) => {
|
||||
let response = ResponseInputItem::FunctionCallOutput {
|
||||
call_id: String::new(),
|
||||
output: FunctionCallOutputPayload {
|
||||
@@ -2724,6 +2811,7 @@ mod tests {
|
||||
config.model.as_str(),
|
||||
config.model_family.slug.as_str(),
|
||||
None,
|
||||
Some("test@test.com".to_string()),
|
||||
Some(AuthMode::ChatGPT),
|
||||
false,
|
||||
"test".to_string(),
|
||||
@@ -2753,12 +2841,7 @@ mod tests {
|
||||
);
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &config.model_family,
|
||||
include_plan_tool: config.include_plan_tool,
|
||||
include_apply_patch_tool: config.include_apply_patch_tool,
|
||||
include_web_search_request: config.tools_web_search_request,
|
||||
use_streamable_shell_tool: config.use_experimental_streamable_shell_tool,
|
||||
include_view_image_tool: config.include_view_image_tool,
|
||||
experimental_unified_exec_tool: config.use_experimental_unified_exec_tool,
|
||||
features: &config.features,
|
||||
});
|
||||
let turn_context = TurnContext {
|
||||
client,
|
||||
@@ -2826,12 +2909,7 @@ mod tests {
|
||||
);
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &config.model_family,
|
||||
include_plan_tool: config.include_plan_tool,
|
||||
include_apply_patch_tool: config.include_apply_patch_tool,
|
||||
include_web_search_request: config.tools_web_search_request,
|
||||
use_streamable_shell_tool: config.use_experimental_streamable_shell_tool,
|
||||
include_view_image_tool: config.include_view_image_tool,
|
||||
experimental_unified_exec_tool: config.use_experimental_unified_exec_tool,
|
||||
features: &config.features,
|
||||
});
|
||||
let turn_context = Arc::new(TurnContext {
|
||||
client,
|
||||
|
||||
Reference in New Issue
Block a user