[App server] add mcp tool call item started/completed events (#6642)

this PR does two things:
1. refactor `apply_bespoke_event_handling` into a separate file as it's
getting kind of long;
2. add mcp tool call `item/started` and `item/completed` events. To roll
out app server events asap we didn't properly migrate mcp core events to
use TurnItem for mcp tool calls - this will be a follow-up PR.

real events generated in log:
```
{
  "method": "codex/event/mcp_tool_call_end",
  "params": {
    "conversationId": "019a8021-26af-7c20-83db-21ca81e44d68",
    "id": "0",
    "msg": {
      "call_id": "call_7EjRQkD9HnfyMWf7tGrT9FKA",
      "duration": {
        "nanos": 92708,
        "secs": 0
      },
      "invocation": {
        "arguments": {
          "server": ""
        },
        "server": "codex",
        "tool": "list_mcp_resources"
      },
      "result": {
        "Ok": {
          "content": [
            {
              "text": "{\"resources\":[]}",
              "type": "text"
            }
          ],
          "isError": false
        }
      },
      "type": "mcp_tool_call_end"
    }
  }
}

{
  "method": "item/completed",
  "params": {
    "item": {
      "arguments": {
        "server": ""
      },
      "error": null,
      "id": "call_7EjRQkD9HnfyMWf7tGrT9FKA",
      "result": {
        "content": [
          {
            "text": "{\"resources\":[]}",
            "type": "text"
          }
        ],
        "structuredContent": null
      },
      "server": "codex",
      "status": "completed",
      "tool": "list_mcp_resources",
      "type": "mcpToolCall"
    }
  }
}
```
This commit is contained in:
Celia Chen
2025-11-14 08:08:43 -08:00
committed by GitHub
parent f17b392470
commit 526777c9b4
6 changed files with 490 additions and 251 deletions

View File

@@ -1,3 +1,4 @@
use crate::bespoke_event_handling::apply_bespoke_event_handling;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::fuzzy_file_search::run_fuzzy_file_search;
@@ -8,13 +9,9 @@ use chrono::DateTime;
use chrono::Utc;
use codex_app_server_protocol::Account;
use codex_app_server_protocol::AccountLoginCompletedNotification;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::AgentMessageDeltaNotification;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::ApplyPatchApprovalResponse;
use codex_app_server_protocol::ArchiveConversationParams;
use codex_app_server_protocol::ArchiveConversationResponse;
use codex_app_server_protocol::AskForApproval;
@@ -26,8 +23,6 @@ use codex_app_server_protocol::CancelLoginChatGptResponse;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConversationGitInfo;
use codex_app_server_protocol::ConversationSummary;
use codex_app_server_protocol::ExecCommandApprovalParams;
use codex_app_server_protocol::ExecCommandApprovalResponse;
use codex_app_server_protocol::ExecOneOffCommandParams;
use codex_app_server_protocol::ExecOneOffCommandResponse;
use codex_app_server_protocol::FeedbackUploadParams;
@@ -46,9 +41,6 @@ use codex_app_server_protocol::GetUserSavedConfigResponse;
use codex_app_server_protocol::GitDiffToRemoteResponse;
use codex_app_server_protocol::InputItem as WireInputItem;
use codex_app_server_protocol::InterruptConversationParams;
use codex_app_server_protocol::InterruptConversationResponse;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::ListConversationsParams;
use codex_app_server_protocol::ListConversationsResponse;
@@ -63,13 +55,9 @@ use codex_app_server_protocol::ModelListParams;
use codex_app_server_protocol::ModelListResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::ReasoningSummaryPartAddedNotification;
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
use codex_app_server_protocol::ReasoningTextDeltaNotification;
use codex_app_server_protocol::RemoveConversationListenerParams;
use codex_app_server_protocol::RemoveConversationSubscriptionResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ResumeConversationParams;
use codex_app_server_protocol::ResumeConversationResponse;
use codex_app_server_protocol::SandboxMode;
@@ -78,7 +66,6 @@ use codex_app_server_protocol::SendUserMessageResponse;
use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::SendUserTurnResponse;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SessionConfiguredNotification;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::SetDefaultModelResponse;
@@ -95,7 +82,6 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
@@ -127,12 +113,8 @@ use codex_core::find_conversation_path_by_id_str;
use codex_core::get_platform_sandbox;
use codex_core::git_info::git_diff_to_remote;
use codex_core::parse_cursor;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_core::read_head_for_summary;
use codex_feedback::CodexFeedback;
use codex_login::ServerOptions as LoginServerOptions;
@@ -167,7 +149,7 @@ use tracing::warn;
use uuid::Uuid;
type PendingInterruptQueue = Vec<(RequestId, ApiVersion)>;
type PendingInterrupts = Arc<Mutex<HashMap<ConversationId, PendingInterruptQueue>>>;
pub(crate) type PendingInterrupts = Arc<Mutex<HashMap<ConversationId, PendingInterruptQueue>>>;
// Duration before a ChatGPT login attempt is abandoned.
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
@@ -198,7 +180,7 @@ pub(crate) struct CodexMessageProcessor {
}
#[derive(Clone, Copy, Debug)]
enum ApiVersion {
pub(crate) enum ApiVersion {
V1,
V2,
}
@@ -2655,157 +2637,6 @@ impl CodexMessageProcessor {
}
}
async fn apply_bespoke_event_handling(
event: Event,
conversation_id: ConversationId,
conversation: Arc<CodexConversation>,
outgoing: Arc<OutgoingMessageSender>,
pending_interrupts: PendingInterrupts,
) {
let Event { id: event_id, msg } = event;
match msg {
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
changes,
reason,
grant_root,
}) => {
let params = ApplyPatchApprovalParams {
conversation_id,
call_id,
file_changes: changes,
reason,
grant_root,
};
let rx = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.await;
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
tokio::spawn(async move {
on_patch_approval_response(event_id, rx, conversation).await;
});
}
EventMsg::AgentMessageContentDelta(event) => {
let notification = AgentMessageDeltaNotification {
item_id: event.item_id,
delta: event.delta,
};
outgoing
.send_server_notification(ServerNotification::AgentMessageDelta(notification))
.await;
}
EventMsg::ReasoningContentDelta(event) => {
let notification = ReasoningSummaryTextDeltaNotification {
item_id: event.item_id,
delta: event.delta,
summary_index: event.summary_index,
};
outgoing
.send_server_notification(ServerNotification::ReasoningSummaryTextDelta(
notification,
))
.await;
}
EventMsg::ReasoningRawContentDelta(event) => {
let notification = ReasoningTextDeltaNotification {
item_id: event.item_id,
delta: event.delta,
content_index: event.content_index,
};
outgoing
.send_server_notification(ServerNotification::ReasoningTextDelta(notification))
.await;
}
EventMsg::AgentReasoningSectionBreak(event) => {
let notification = ReasoningSummaryPartAddedNotification {
item_id: event.item_id,
summary_index: event.summary_index,
};
outgoing
.send_server_notification(ServerNotification::ReasoningSummaryPartAdded(
notification,
))
.await;
}
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
call_id,
command,
cwd,
reason,
risk,
parsed_cmd,
}) => {
let params = ExecCommandApprovalParams {
conversation_id,
call_id,
command,
cwd,
reason,
risk,
parsed_cmd,
};
let rx = outgoing
.send_request(ServerRequestPayload::ExecCommandApproval(params))
.await;
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
tokio::spawn(async move {
on_exec_approval_response(event_id, rx, conversation).await;
});
}
EventMsg::TokenCount(token_count_event) => {
if let Some(rate_limits) = token_count_event.rate_limits {
outgoing
.send_server_notification(ServerNotification::AccountRateLimitsUpdated(
AccountRateLimitsUpdatedNotification {
rate_limits: rate_limits.into(),
},
))
.await;
}
}
EventMsg::ItemStarted(item_started_event) => {
let item: ThreadItem = item_started_event.item.clone().into();
let notification = ItemStartedNotification { item };
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
EventMsg::ItemCompleted(item_completed_event) => {
let item: ThreadItem = item_completed_event.item.clone().into();
let notification = ItemCompletedNotification { item };
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
}
// If this is a TurnAborted, reply to any pending interrupt requests.
EventMsg::TurnAborted(turn_aborted_event) => {
let pending = {
let mut map = pending_interrupts.lock().await;
map.remove(&conversation_id).unwrap_or_default()
};
if !pending.is_empty() {
for (rid, ver) in pending {
match ver {
ApiVersion::V1 => {
let response = InterruptConversationResponse {
abort_reason: turn_aborted_event.reason.clone(),
};
outgoing.send_response(rid, response).await;
}
ApiVersion::V2 => {
let response = TurnInterruptResponse {};
outgoing.send_response(rid, response).await;
}
}
}
}
}
_ => {}
}
}
async fn derive_config_from_params(
overrides: ConfigOverrides,
cli_overrides: Option<std::collections::HashMap<String, serde_json::Value>>,
@@ -2819,84 +2650,6 @@ async fn derive_config_from_params(
Config::load_with_cli_overrides(cli_overrides, overrides).await
}
async fn on_patch_approval_response(
event_id: String,
receiver: oneshot::Receiver<JsonRpcResult>,
codex: Arc<CodexConversation>,
) {
let response = receiver.await;
let value = match response {
Ok(value) => value,
Err(err) => {
error!("request failed: {err:?}");
if let Err(submit_err) = codex
.submit(Op::PatchApproval {
id: event_id.clone(),
decision: ReviewDecision::Denied,
})
.await
{
error!("failed to submit denied PatchApproval after request failure: {submit_err}");
}
return;
}
};
let response =
serde_json::from_value::<ApplyPatchApprovalResponse>(value).unwrap_or_else(|err| {
error!("failed to deserialize ApplyPatchApprovalResponse: {err}");
ApplyPatchApprovalResponse {
decision: ReviewDecision::Denied,
}
});
if let Err(err) = codex
.submit(Op::PatchApproval {
id: event_id,
decision: response.decision,
})
.await
{
error!("failed to submit PatchApproval: {err}");
}
}
async fn on_exec_approval_response(
event_id: String,
receiver: oneshot::Receiver<JsonRpcResult>,
conversation: Arc<CodexConversation>,
) {
let response = receiver.await;
let value = match response {
Ok(value) => value,
Err(err) => {
error!("request failed: {err:?}");
return;
}
};
// Try to deserialize `value` and then make the appropriate call to `codex`.
let response =
serde_json::from_value::<ExecCommandApprovalResponse>(value).unwrap_or_else(|err| {
error!("failed to deserialize ExecCommandApprovalResponse: {err}");
// If we cannot deserialize the response, we deny the request to be
// conservative.
ExecCommandApprovalResponse {
decision: ReviewDecision::Denied,
}
});
if let Err(err) = conversation
.submit(Op::ExecApproval {
id: event_id,
decision: response.decision,
})
.await
{
error!("failed to submit ExecApproval: {err}");
}
}
async fn read_summary_from_rollout(
path: &Path,
fallback_provider: &str,