mirror of
https://github.com/openai/codex.git
synced 2026-05-02 20:32:04 +03:00
This PR adds the API V2 version of the command‑execution approval flow
for the shell tool.
This PR wires the new RPC (`item/commandExecution/requestApproval`, V2
only) and related events (`item/started`, `item/completed`, and
`item/commandExecution/delta`, which are emitted in both V1 and V2)
through the app-server
protocol. The new approval RPC is only sent when the user initiates a
turn with the new `turn/start` API so we don't break backwards
compatibility with VSCE.
The approach I took was to make as few changes to the Codex core as
possible, leveraging existing `EventMsg` core events, and translating
those in app-server. I did have to add additional fields to
`EventMsg::ExecCommandEndEvent` to capture the command's input so that
app-server can statelessly transform these events to a
`ThreadItem::CommandExecution` item for the `item/completed` event.
Once we stabilize the API and it's complete enough for our partners, we
can work on migrating the core to be aware of command execution items as
a first-class concept.
**Note**: We'll need follow-up work to make sure these APIs work for the
unified exec tool, but we'll wait until that's stable and landed before
doing a pass on app-server.
Example payloads below:
```
{
"method": "item/started",
"params": {
"item": {
"aggregatedOutput": null,
"command": "/bin/zsh -lc 'touch /tmp/should-trigger-approval'",
"cwd": "/Users/owen/repos/codex/codex-rs",
"durationMs": null,
"exitCode": null,
"id": "call_lNWWsbXl1e47qNaYjFRs0dyU",
"parsedCmd": [
{
"cmd": "touch /tmp/should-trigger-approval",
"type": "unknown"
}
],
"status": "inProgress",
"type": "commandExecution"
}
}
}
```
```
{
"id": 0,
"method": "item/commandExecution/requestApproval",
"params": {
"itemId": "call_lNWWsbXl1e47qNaYjFRs0dyU",
"parsedCmd": [
{
"cmd": "touch /tmp/should-trigger-approval",
"type": "unknown"
}
],
"reason": "Need to create file in /tmp which is outside workspace sandbox",
"risk": null,
"threadId": "019a93e8-0a52-7fe3-9808-b6bc40c0989a",
"turnId": "1"
}
}
```
```
{
"id": 0,
"result": {
"acceptSettings": {
"forSession": false
},
"decision": "accept"
}
}
```
```
{
"method": "item/completed",
"params": {
"item": {
"aggregatedOutput": null,
"command": "/bin/zsh -lc 'touch /tmp/should-trigger-approval'",
"cwd": "/Users/owen/repos/codex/codex-rs",
"durationMs": 224,
"exitCode": 0,
"id": "call_lNWWsbXl1e47qNaYjFRs0dyU",
"parsedCmd": [
{
"cmd": "touch /tmp/should-trigger-approval",
"type": "unknown"
}
],
"status": "completed",
"type": "commandExecution"
}
}
}
```
332 lines
13 KiB
Rust
332 lines
13 KiB
Rust
//! Asynchronous worker that executes a **Codex** tool-call inside a spawned
|
|
//! Tokio task. Separated from `message_processor.rs` to keep that file small
|
|
//! and to make future feature-growth easier to manage.
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
|
|
use crate::exec_approval::handle_exec_approval_request;
|
|
use crate::outgoing_message::OutgoingMessageSender;
|
|
use crate::outgoing_message::OutgoingNotificationMeta;
|
|
use crate::patch_approval::handle_patch_approval_request;
|
|
use codex_core::CodexConversation;
|
|
use codex_core::ConversationManager;
|
|
use codex_core::NewConversation;
|
|
use codex_core::config::Config as CodexConfig;
|
|
use codex_core::protocol::AgentMessageEvent;
|
|
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
|
|
use codex_core::protocol::Event;
|
|
use codex_core::protocol::EventMsg;
|
|
use codex_core::protocol::ExecApprovalRequestEvent;
|
|
use codex_core::protocol::Op;
|
|
use codex_core::protocol::Submission;
|
|
use codex_core::protocol::TaskCompleteEvent;
|
|
use codex_protocol::ConversationId;
|
|
use codex_protocol::user_input::UserInput;
|
|
use mcp_types::CallToolResult;
|
|
use mcp_types::ContentBlock;
|
|
use mcp_types::RequestId;
|
|
use mcp_types::TextContent;
|
|
use serde_json::json;
|
|
use tokio::sync::Mutex;
|
|
|
|
/// JSON-RPC 2.0 "Invalid params" error code (`-32602`).
pub(crate) const INVALID_PARAMS_ERROR_CODE: i64 = -32602;
|
|
|
|
/// Run a complete Codex session and stream events back to the client.
|
|
///
|
|
/// On completion (success or error) the function sends the appropriate
|
|
/// `tools/call` response so the LLM can continue the conversation.
|
|
pub async fn run_codex_tool_session(
|
|
id: RequestId,
|
|
initial_prompt: String,
|
|
config: CodexConfig,
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
conversation_manager: Arc<ConversationManager>,
|
|
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, ConversationId>>>,
|
|
) {
|
|
let NewConversation {
|
|
conversation_id,
|
|
conversation,
|
|
session_configured,
|
|
} = match conversation_manager.new_conversation(config).await {
|
|
Ok(res) => res,
|
|
Err(e) => {
|
|
let result = CallToolResult {
|
|
content: vec![ContentBlock::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: format!("Failed to start Codex session: {e}"),
|
|
annotations: None,
|
|
})],
|
|
is_error: Some(true),
|
|
structured_content: None,
|
|
};
|
|
outgoing.send_response(id.clone(), result).await;
|
|
return;
|
|
}
|
|
};
|
|
|
|
let session_configured_event = Event {
|
|
// Use a fake id value for now.
|
|
id: "".to_string(),
|
|
msg: EventMsg::SessionConfigured(session_configured.clone()),
|
|
};
|
|
outgoing
|
|
.send_event_as_notification(
|
|
&session_configured_event,
|
|
Some(OutgoingNotificationMeta::new(Some(id.clone()))),
|
|
)
|
|
.await;
|
|
|
|
// Use the original MCP request ID as the `sub_id` for the Codex submission so that
|
|
// any events emitted for this tool-call can be correlated with the
|
|
// originating `tools/call` request.
|
|
let sub_id = match &id {
|
|
RequestId::String(s) => s.clone(),
|
|
RequestId::Integer(n) => n.to_string(),
|
|
};
|
|
running_requests_id_to_codex_uuid
|
|
.lock()
|
|
.await
|
|
.insert(id.clone(), conversation_id);
|
|
let submission = Submission {
|
|
id: sub_id.clone(),
|
|
op: Op::UserInput {
|
|
items: vec![UserInput::Text {
|
|
text: initial_prompt.clone(),
|
|
}],
|
|
},
|
|
};
|
|
|
|
if let Err(e) = conversation.submit_with_id(submission).await {
|
|
tracing::error!("Failed to submit initial prompt: {e}");
|
|
// unregister the id so we don't keep it in the map
|
|
running_requests_id_to_codex_uuid.lock().await.remove(&id);
|
|
return;
|
|
}
|
|
|
|
run_codex_tool_session_inner(
|
|
conversation,
|
|
outgoing,
|
|
id,
|
|
running_requests_id_to_codex_uuid,
|
|
)
|
|
.await;
|
|
}
|
|
|
|
pub async fn run_codex_tool_session_reply(
|
|
conversation: Arc<CodexConversation>,
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
request_id: RequestId,
|
|
prompt: String,
|
|
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, ConversationId>>>,
|
|
conversation_id: ConversationId,
|
|
) {
|
|
running_requests_id_to_codex_uuid
|
|
.lock()
|
|
.await
|
|
.insert(request_id.clone(), conversation_id);
|
|
if let Err(e) = conversation
|
|
.submit(Op::UserInput {
|
|
items: vec![UserInput::Text { text: prompt }],
|
|
})
|
|
.await
|
|
{
|
|
tracing::error!("Failed to submit user input: {e}");
|
|
// unregister the id so we don't keep it in the map
|
|
running_requests_id_to_codex_uuid
|
|
.lock()
|
|
.await
|
|
.remove(&request_id);
|
|
return;
|
|
}
|
|
|
|
run_codex_tool_session_inner(
|
|
conversation,
|
|
outgoing,
|
|
request_id,
|
|
running_requests_id_to_codex_uuid,
|
|
)
|
|
.await;
|
|
}
|
|
|
|
async fn run_codex_tool_session_inner(
|
|
codex: Arc<CodexConversation>,
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
request_id: RequestId,
|
|
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, ConversationId>>>,
|
|
) {
|
|
let request_id_str = match &request_id {
|
|
RequestId::String(s) => s.clone(),
|
|
RequestId::Integer(n) => n.to_string(),
|
|
};
|
|
|
|
// Stream events until the task needs to pause for user interaction or
|
|
// completes.
|
|
loop {
|
|
match codex.next_event().await {
|
|
Ok(event) => {
|
|
outgoing
|
|
.send_event_as_notification(
|
|
&event,
|
|
Some(OutgoingNotificationMeta::new(Some(request_id.clone()))),
|
|
)
|
|
.await;
|
|
|
|
match event.msg {
|
|
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
|
turn_id: _,
|
|
command,
|
|
cwd,
|
|
call_id,
|
|
reason: _,
|
|
risk,
|
|
parsed_cmd,
|
|
}) => {
|
|
handle_exec_approval_request(
|
|
command,
|
|
cwd,
|
|
outgoing.clone(),
|
|
codex.clone(),
|
|
request_id.clone(),
|
|
request_id_str.clone(),
|
|
event.id.clone(),
|
|
call_id,
|
|
parsed_cmd,
|
|
risk,
|
|
)
|
|
.await;
|
|
continue;
|
|
}
|
|
EventMsg::Error(err_event) => {
|
|
// Return a response to conclude the tool call when the Codex session reports an error (e.g., interruption).
|
|
let result = json!({
|
|
"error": err_event.message,
|
|
});
|
|
outgoing.send_response(request_id.clone(), result).await;
|
|
break;
|
|
}
|
|
EventMsg::Warning(_) => {
|
|
continue;
|
|
}
|
|
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
|
call_id,
|
|
reason,
|
|
grant_root,
|
|
changes,
|
|
}) => {
|
|
handle_patch_approval_request(
|
|
call_id,
|
|
reason,
|
|
grant_root,
|
|
changes,
|
|
outgoing.clone(),
|
|
codex.clone(),
|
|
request_id.clone(),
|
|
request_id_str.clone(),
|
|
event.id.clone(),
|
|
)
|
|
.await;
|
|
continue;
|
|
}
|
|
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
|
|
let text = match last_agent_message {
|
|
Some(msg) => msg,
|
|
None => "".to_string(),
|
|
};
|
|
let result = CallToolResult {
|
|
content: vec![ContentBlock::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text,
|
|
annotations: None,
|
|
})],
|
|
is_error: None,
|
|
structured_content: None,
|
|
};
|
|
outgoing.send_response(request_id.clone(), result).await;
|
|
// unregister the id so we don't keep it in the map
|
|
running_requests_id_to_codex_uuid
|
|
.lock()
|
|
.await
|
|
.remove(&request_id);
|
|
break;
|
|
}
|
|
EventMsg::SessionConfigured(_) => {
|
|
tracing::error!("unexpected SessionConfigured event");
|
|
}
|
|
EventMsg::AgentMessageDelta(_) => {
|
|
// TODO: think how we want to support this in the MCP
|
|
}
|
|
EventMsg::AgentReasoningDelta(_) => {
|
|
// TODO: think how we want to support this in the MCP
|
|
}
|
|
EventMsg::McpStartupUpdate(_) | EventMsg::McpStartupComplete(_) => {
|
|
// Ignored in MCP tool runner.
|
|
}
|
|
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
|
|
// TODO: think how we want to support this in the MCP
|
|
}
|
|
EventMsg::AgentReasoningRawContent(_)
|
|
| EventMsg::AgentReasoningRawContentDelta(_)
|
|
| EventMsg::TaskStarted(_)
|
|
| EventMsg::TokenCount(_)
|
|
| EventMsg::AgentReasoning(_)
|
|
| EventMsg::AgentReasoningSectionBreak(_)
|
|
| EventMsg::McpToolCallBegin(_)
|
|
| EventMsg::McpToolCallEnd(_)
|
|
| EventMsg::McpListToolsResponse(_)
|
|
| EventMsg::ListCustomPromptsResponse(_)
|
|
| EventMsg::ExecCommandBegin(_)
|
|
| EventMsg::ExecCommandOutputDelta(_)
|
|
| EventMsg::ExecCommandEnd(_)
|
|
| EventMsg::BackgroundEvent(_)
|
|
| EventMsg::StreamError(_)
|
|
| EventMsg::PatchApplyBegin(_)
|
|
| EventMsg::PatchApplyEnd(_)
|
|
| EventMsg::TurnDiff(_)
|
|
| EventMsg::WebSearchBegin(_)
|
|
| EventMsg::WebSearchEnd(_)
|
|
| EventMsg::GetHistoryEntryResponse(_)
|
|
| EventMsg::PlanUpdate(_)
|
|
| EventMsg::TurnAborted(_)
|
|
| EventMsg::UserMessage(_)
|
|
| EventMsg::ShutdownComplete
|
|
| EventMsg::ViewImageToolCall(_)
|
|
| EventMsg::RawResponseItem(_)
|
|
| EventMsg::EnteredReviewMode(_)
|
|
| EventMsg::ItemStarted(_)
|
|
| EventMsg::ItemCompleted(_)
|
|
| EventMsg::AgentMessageContentDelta(_)
|
|
| EventMsg::ReasoningContentDelta(_)
|
|
| EventMsg::ReasoningRawContentDelta(_)
|
|
| EventMsg::UndoStarted(_)
|
|
| EventMsg::UndoCompleted(_)
|
|
| EventMsg::ExitedReviewMode(_)
|
|
| EventMsg::DeprecationNotice(_) => {
|
|
// For now, we do not do anything extra for these
|
|
// events. Note that
|
|
// send(codex_event_to_notification(&event)) above has
|
|
// already dispatched these events as notifications,
|
|
// though we may want to do give different treatment to
|
|
// individual events in the future.
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let result = CallToolResult {
|
|
content: vec![ContentBlock::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: format!("Codex runtime error: {e}"),
|
|
annotations: None,
|
|
})],
|
|
is_error: Some(true),
|
|
// TODO(mbolin): Could present the error in a more
|
|
// structured way.
|
|
structured_content: None,
|
|
};
|
|
outgoing.send_response(request_id.clone(), result).await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|