[app-server] feat: v2 apply_patch approval flow (#6760)

This PR adds the API V2 version of the apply_patch approval flow, which
centers around `ThreadItem::FileChange`.

This PR wires the new RPC (`item/fileChange/requestApproval`, V2 only)
and related events (`item/started`, `item/completed` for
`ThreadItem::FileChange`, which are emitted in both V1 and V2) through
the app-server
protocol. The new approval RPC is only sent when the user initiates a
turn with the new `turn/start` API so we don't break backwards
compatibility with VSCE.

Similar to https://github.com/openai/codex/pull/6758, the approach I
took was to make as few changes to the Codex core as possible,
leveraging existing `EventMsg` core events, and translating those in
app-server. I did have to add a few additional fields to
`EventMsg::PatchApplyBegin` and `EventMsg::PatchApplyEnd`, but those
were fairly lightweight.

However, the `EventMsg`s emitted by core are the following:
```
1) Auto-approved (no request for approval)

- EventMsg::PatchApplyBegin
- EventMsg::PatchApplyEnd

2) Approved by user
- EventMsg::ApplyPatchApprovalRequest
- EventMsg::PatchApplyBegin
- EventMsg::PatchApplyEnd

3) Declined by user
- EventMsg::ApplyPatchApprovalRequest
- EventMsg::PatchApplyBegin
- EventMsg::PatchApplyEnd
```

For a request triggering an approval, this would result in:
```
item/fileChange/requestApproval
item/started
item/completed
```

which is different from the `ThreadItem::CommandExecution` flow
introduced in https://github.com/openai/codex/pull/6758, which does the
below and is preferable:
```
item/started
item/commandExecution/requestApproval
item/completed
```

To fix this, we leverage `TurnSummaryStore` on codex_message_processor
to store a little bit of state, allowing us to fire `item/started` and
`item/fileChange/requestApproval` whenever we receive the underlying
`EventMsg::ApplyPatchApprovalRequest`, and no-oping when we receive the
`EventMsg::PatchApplyBegin` later.

This is much less invasive than modifying the order of EventMsg within
core (I tried).

The resulting payloads:
```
{
  "method": "item/started",
  "params": {
    "item": {
      "changes": [
        {
          "diff": "Hello from Codex!\n",
          "kind": "add",
          "path": "/Users/owen/repos/codex/codex-rs/APPROVAL_DEMO.txt"
        }
      ],
      "id": "call_Nxnwj7B3YXigfV6Mwh03d686",
      "status": "inProgress",
      "type": "fileChange"
    }
  }
}
```

```
{
  "id": 0,
  "method": "item/fileChange/requestApproval",
  "params": {
    "grantRoot": null,
    "itemId": "call_Nxnwj7B3YXigfV6Mwh03d686",
    "reason": null,
    "threadId": "019a9e11-8295-7883-a283-779e06502c6f",
    "turnId": "1"
  }
}
```

```
{
  "id": 0,
  "result": {
    "decision": "accept"
  }
}
```

```
{
  "method": "item/completed",
  "params": {
    "item": {
      "changes": [
        {
          "diff": "Hello from Codex!\n",
          "kind": "add",
          "path": "/Users/owen/repos/codex/codex-rs/APPROVAL_DEMO.txt"
        }
      ],
      "id": "call_Nxnwj7B3YXigfV6Mwh03d686",
      "status": "completed",
      "type": "fileChange"
    }
  }
}
```
This commit is contained in:
Owen Lin
2025-11-19 20:13:31 -08:00
committed by GitHub
parent fb9849e1e3
commit d6c30ed25e
16 changed files with 693 additions and 22 deletions

View File

@@ -15,12 +15,17 @@ use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::ExecCommandApprovalParams;
use codex_app_server_protocol::ExecCommandApprovalResponse;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::FileUpdateChange;
use codex_app_server_protocol::InterruptConversationResponse;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::McpToolCallError;
use codex_app_server_protocol::McpToolCallResult;
use codex_app_server_protocol::McpToolCallStatus;
use codex_app_server_protocol::PatchApplyStatus;
use codex_app_server_protocol::PatchChangeKind as V2PatchChangeKind;
use codex_app_server_protocol::ReasoningSummaryPartAddedNotification;
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
use codex_app_server_protocol::ReasoningTextDeltaNotification;
@@ -40,6 +45,7 @@ use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::FileChange as CoreFileChange;
use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::Op;
@@ -47,7 +53,9 @@ use codex_core::protocol::ReviewDecision;
use codex_core::review_format::format_review_findings_block;
use codex_protocol::ConversationId;
use codex_protocol::protocol::ReviewOutputEvent;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::oneshot;
use tracing::error;
@@ -70,24 +78,74 @@ pub(crate) async fn apply_bespoke_event_handling(
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
turn_id,
changes,
reason,
grant_root,
}) => {
let params = ApplyPatchApprovalParams {
conversation_id,
call_id,
file_changes: changes,
reason,
grant_root,
};
let rx = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.await;
tokio::spawn(async move {
on_patch_approval_response(event_id, rx, conversation).await;
});
}
}) => match api_version {
ApiVersion::V1 => {
let params = ApplyPatchApprovalParams {
conversation_id,
call_id,
file_changes: changes.clone(),
reason,
grant_root,
};
let rx = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.await;
tokio::spawn(async move {
on_patch_approval_response(event_id, rx, conversation).await;
});
}
ApiVersion::V2 => {
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = call_id.clone();
let patch_changes = convert_patch_changes(&changes);
let first_start = {
let mut map = turn_summary_store.lock().await;
let summary = map.entry(conversation_id).or_default();
summary.file_change_started.insert(item_id.clone())
};
if first_start {
let item = ThreadItem::FileChange {
id: item_id.clone(),
changes: patch_changes.clone(),
status: PatchApplyStatus::InProgress,
};
let notification = ItemStartedNotification { item };
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
let params = FileChangeRequestApprovalParams {
thread_id: conversation_id.to_string(),
turn_id: turn_id.clone(),
item_id: item_id.clone(),
reason,
grant_root,
};
let rx = outgoing
.send_request(ServerRequestPayload::FileChangeRequestApproval(params))
.await;
tokio::spawn(async move {
on_file_change_request_approval_response(
event_id,
conversation_id,
item_id,
patch_changes,
rx,
conversation,
outgoing,
turn_summary_store,
)
.await;
});
}
},
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
call_id,
turn_id,
@@ -244,6 +302,49 @@ pub(crate) async fn apply_bespoke_event_handling(
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
}
EventMsg::PatchApplyBegin(patch_begin_event) => {
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = patch_begin_event.call_id.clone();
let first_start = {
let mut map = turn_summary_store.lock().await;
let summary = map.entry(conversation_id).or_default();
summary.file_change_started.insert(item_id.clone())
};
if first_start {
let item = ThreadItem::FileChange {
id: item_id.clone(),
changes: convert_patch_changes(&patch_begin_event.changes),
status: PatchApplyStatus::InProgress,
};
let notification = ItemStartedNotification { item };
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
}
EventMsg::PatchApplyEnd(patch_end_event) => {
// Until we migrate the core to be aware of a first class FileChangeItem
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
let item_id = patch_end_event.call_id.clone();
let status = if patch_end_event.success {
PatchApplyStatus::Completed
} else {
PatchApplyStatus::Failed
};
let changes = convert_patch_changes(&patch_end_event.changes);
complete_file_change_item(
conversation_id,
item_id,
changes,
status,
outgoing.as_ref(),
&turn_summary_store,
)
.await;
}
EventMsg::ExecCommandBegin(exec_command_begin_event) => {
let item = ThreadItem::CommandExecution {
id: exec_command_begin_event.call_id.clone(),
@@ -365,6 +466,32 @@ async fn emit_turn_completed_with_status(
.await;
}
async fn complete_file_change_item(
conversation_id: ConversationId,
item_id: String,
changes: Vec<FileUpdateChange>,
status: PatchApplyStatus,
outgoing: &OutgoingMessageSender,
turn_summary_store: &TurnSummaryStore,
) {
{
let mut map = turn_summary_store.lock().await;
if let Some(summary) = map.get_mut(&conversation_id) {
summary.file_change_started.remove(&item_id);
}
}
let item = ThreadItem::FileChange {
id: item_id,
changes,
status,
};
let notification = ItemCompletedNotification { item };
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
}
async fn find_and_remove_turn_summary(
conversation_id: ConversationId,
turn_summary_store: &TurnSummaryStore,
@@ -512,6 +639,110 @@ fn render_review_output_text(output: &ReviewOutputEvent) -> String {
}
}
fn convert_patch_changes(changes: &HashMap<PathBuf, CoreFileChange>) -> Vec<FileUpdateChange> {
let mut converted: Vec<FileUpdateChange> = changes
.iter()
.map(|(path, change)| FileUpdateChange {
path: path.to_string_lossy().into_owned(),
kind: map_patch_change_kind(change),
diff: format_file_change_diff(change),
})
.collect();
converted.sort_by(|a, b| a.path.cmp(&b.path));
converted
}
fn map_patch_change_kind(change: &CoreFileChange) -> V2PatchChangeKind {
match change {
CoreFileChange::Add { .. } => V2PatchChangeKind::Add,
CoreFileChange::Delete { .. } => V2PatchChangeKind::Delete,
CoreFileChange::Update { move_path, .. } => V2PatchChangeKind::Update {
move_path: move_path.clone(),
},
}
}
fn format_file_change_diff(change: &CoreFileChange) -> String {
match change {
CoreFileChange::Add { content } => content.clone(),
CoreFileChange::Delete { content } => content.clone(),
CoreFileChange::Update {
unified_diff,
move_path,
} => {
if let Some(path) = move_path {
format!("{unified_diff}\n\nMoved to: {}", path.display())
} else {
unified_diff.clone()
}
}
}
}
#[allow(clippy::too_many_arguments)]
async fn on_file_change_request_approval_response(
event_id: String,
conversation_id: ConversationId,
item_id: String,
changes: Vec<FileUpdateChange>,
receiver: oneshot::Receiver<JsonValue>,
codex: Arc<CodexConversation>,
outgoing: Arc<OutgoingMessageSender>,
turn_summary_store: TurnSummaryStore,
) {
let response = receiver.await;
let (decision, completion_status) = match response {
Ok(value) => {
let response = serde_json::from_value::<FileChangeRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize FileChangeRequestApprovalResponse: {err}");
FileChangeRequestApprovalResponse {
decision: ApprovalDecision::Decline,
}
});
let (decision, completion_status) = match response.decision {
ApprovalDecision::Accept => (ReviewDecision::Approved, None),
ApprovalDecision::Decline => {
(ReviewDecision::Denied, Some(PatchApplyStatus::Declined))
}
ApprovalDecision::Cancel => {
(ReviewDecision::Abort, Some(PatchApplyStatus::Declined))
}
};
// Allow EventMsg::PatchApplyEnd to emit ItemCompleted for accepted patches.
// Only short-circuit on declines/cancels/failures.
(decision, completion_status)
}
Err(err) => {
error!("request failed: {err:?}");
(ReviewDecision::Denied, Some(PatchApplyStatus::Failed))
}
};
if let Some(status) = completion_status {
complete_file_change_item(
conversation_id,
item_id,
changes,
status,
outgoing.as_ref(),
&turn_summary_store,
)
.await;
}
if let Err(err) = codex
.submit(Op::PatchApproval {
id: event_id,
decision,
})
.await
{
error!("failed to submit PatchApproval: {err}");
}
}
async fn on_command_execution_request_approval_response(
event_id: String,
receiver: oneshot::Receiver<JsonValue>,