diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 31bc8d65a7..34a105b269 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2245,6 +2245,7 @@ dependencies = [ "chrono", "clap", "codex-ansi-escape", + "codex-app-server", "codex-app-server-protocol", "codex-arg0", "codex-backend-client", diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index a9ce27ed0d..e29122bbf9 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -235,6 +235,11 @@ client_request_definitions! { params: v2::ThreadReadParams, response: v2::ThreadReadResponse, }, + #[experimental("thread/submitOp")] + ThreadSubmitOp => "thread/submitOp" { + params: v2::ThreadSubmitOpParams, + response: v2::ThreadSubmitOpResponse, + }, SkillsList => "skills/list" { params: v2::SkillsListParams, response: v2::SkillsListResponse, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 43038d351c..f8cc7601fc 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1962,6 +1962,21 @@ pub struct ThreadReadResponse { pub thread: Thread, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadSubmitOpParams { + pub thread_id: String, + /// Serialized `codex_protocol::protocol::Op` payload used as a temporary + /// compatibility bridge while clients migrate to typed app-server methods. + pub op: JsonValue, +} + +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadSubmitOpResponse {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index d28b79db7e..c5651d2220 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -149,6 +149,8 @@ use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadSubmitOpParams; +use codex_app_server_protocol::ThreadSubmitOpResponse; use codex_app_server_protocol::ThreadUnarchiveParams; use codex_app_server_protocol::ThreadUnarchiveResponse; use codex_app_server_protocol::ThreadUnarchivedNotification; @@ -586,6 +588,10 @@ impl CodexMessageProcessor { self.thread_read(to_connection_request_id(request_id), params) .await; } + ClientRequest::ThreadSubmitOp { request_id, params } => { + self.thread_submit_op(to_connection_request_id(request_id), params) + .await; + } ClientRequest::SkillsList { request_id, params } => { self.skills_list(to_connection_request_id(request_id), params) .await; @@ -2784,6 +2790,46 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } + async fn thread_submit_op( + &self, + request_id: ConnectionRequestId, + params: ThreadSubmitOpParams, + ) { + let ThreadSubmitOpParams { thread_id, op } = params; + + let (_, thread) = match self.load_thread(&thread_id).await { + Ok(v) => v, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let op = match serde_json::from_value::(op) { + Ok(op) => op, + Err(err) => { + self.send_invalid_request_error( + request_id, + format!("invalid core op payload: {err}"), + ) + .await; + return; + } + }; + + match thread.submit(op).await { + Ok(_) => { + self.outgoing + .send_response(request_id, ThreadSubmitOpResponse {}) + .await; + } + Err(err) => { + self.send_internal_error(request_id, format!("failed to submit op: {err}")) + .await; + } + } + } + pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver { self.thread_manager.subscribe_thread_created() } diff --git a/codex-rs/app-server/src/embedded_session.rs b/codex-rs/app-server/src/embedded_session.rs new file mode 100644 index 0000000000..10313c36eb --- /dev/null +++ b/codex-rs/app-server/src/embedded_session.rs @@ -0,0 +1,158 @@ +use std::path::PathBuf; +use std::sync::Arc; + +use crate::codex_message_processor::CodexMessageProcessor; +use crate::codex_message_processor::CodexMessageProcessorArgs; +use crate::outgoing_message::ConnectionId; +use crate::outgoing_message::OutgoingEnvelope; +use crate::outgoing_message::OutgoingMessage; +use crate::outgoing_message::OutgoingMessageSender; +use crate::transport::CHANNEL_CAPACITY; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_cloud_requirements::cloud_requirements_loader; +use codex_core::AuthManager; +use codex_core::ThreadManager; +use codex_core::config::Config; +use codex_feedback::CodexFeedback; +use tokio::sync::mpsc; +use toml::Value as TomlValue; + +const EMBEDDED_CONNECTION_ID: ConnectionId = ConnectionId(0); + +#[derive(Debug)] +enum EmbeddedSessionInput { + Request(ClientRequest), + Response(JSONRPCResponse), + Error(JSONRPCError), +} + +pub struct EmbeddedSessionClientArgs { + pub auth_manager: Arc, + pub thread_manager: Arc, + pub config: Config, + pub cli_overrides: Vec<(String, TomlValue)>, + pub feedback: CodexFeedback, + pub codex_linux_sandbox_exe: Option, +} + +pub struct EmbeddedSessionClient { + input_tx: mpsc::Sender, + output_rx: mpsc::Receiver, +} + +impl EmbeddedSessionClient { + pub fn spawn(args: EmbeddedSessionClientArgs) -> Self { + let (input_tx, mut input_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (output_tx, output_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + + let EmbeddedSessionClientArgs { + auth_manager, + thread_manager, + config, + cli_overrides, + feedback, + codex_linux_sandbox_exe, + } = args; + let config = Arc::new(config); + let cloud_requirements = Arc::new(std::sync::RwLock::new(cloud_requirements_loader( + auth_manager.clone(), + config.chatgpt_base_url.clone(), + config.codex_home.clone(), + ))); + + let mut processor = CodexMessageProcessor::new(CodexMessageProcessorArgs { + auth_manager, + thread_manager, + outgoing: outgoing.clone(), + codex_linux_sandbox_exe, + config, + cli_overrides, + cloud_requirements, + single_client_mode: true, + feedback, + }); + + tokio::spawn(async move { + while let Some(envelope) = outgoing_rx.recv().await { + let message = match envelope { + OutgoingEnvelope::ToConnection { + connection_id, + message, + } => { + if connection_id != EMBEDDED_CONNECTION_ID { + continue; + } + outgoing_message_to_jsonrpc(message) + } + OutgoingEnvelope::Broadcast { message } => outgoing_message_to_jsonrpc(message), + }; + + if output_tx.send(message).await.is_err() { + break; + } + } + }); + + tokio::spawn(async move { + while let Some(input) = input_rx.recv().await { + match input { + EmbeddedSessionInput::Request(request) => { + processor + .process_request(EMBEDDED_CONNECTION_ID, request) + .await; + } + EmbeddedSessionInput::Response(response) => { + outgoing + .notify_client_response(response.id, response.result) + .await; + } + EmbeddedSessionInput::Error(error) => { + outgoing.notify_client_error(error.id, error.error).await; + } + } + } + processor.connection_closed(EMBEDDED_CONNECTION_ID).await; + }); + + Self { + input_tx, + output_rx, + } + } + + pub async fn send_request(&self, request: ClientRequest) -> std::io::Result<()> { + self.input_tx + .send(EmbeddedSessionInput::Request(request)) + .await + .map_err(|_| std::io::Error::other("embedded app-server session is closed")) + } + + pub async fn send_response(&self, response: JSONRPCResponse) -> std::io::Result<()> { + self.input_tx + .send(EmbeddedSessionInput::Response(response)) + .await + .map_err(|_| std::io::Error::other("embedded app-server session is closed")) + } + + pub async fn send_error(&self, error: JSONRPCError) -> std::io::Result<()> { + self.input_tx + .send(EmbeddedSessionInput::Error(error)) + .await + .map_err(|_| std::io::Error::other("embedded app-server session is closed")) + } + + pub async fn recv(&mut self) -> Option { + self.output_rx.recv().await + } +} + +fn outgoing_message_to_jsonrpc(message: OutgoingMessage) -> JSONRPCMessage { + let value = + serde_json::to_value(message).expect("outgoing app-server message should serialize"); + serde_json::from_value(value).expect("outgoing app-server message should decode as JSON-RPC") +} diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 1bdbf97a13..a4bdb029e3 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -56,6 +56,7 @@ mod bespoke_event_handling; mod codex_message_processor; mod config_api; mod dynamic_tools; +mod embedded_session; mod error_code; mod filters; mod fuzzy_file_search; @@ -67,6 +68,8 @@ mod thread_status; mod transport; pub use crate::transport::AppServerTransport; +pub use embedded_session::EmbeddedSessionClient; +pub use embedded_session::EmbeddedSessionClientArgs; const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT"; diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index 683b0828dc..66b6c3d975 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -27,6 +27,7 @@ base64 = { workspace = true } chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } codex-ansi-escape = { workspace = true } +codex-app-server = { workspace = true } codex-app-server-protocol = { workspace = true } codex-arg0 = { workspace = true } codex-backend-client = { workspace = true } diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 238a60730c..54d4732e2e 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -2616,7 +2616,13 @@ impl ChatWidget { let prevent_idle_sleep = config.features.enabled(Feature::PreventIdleSleep); let mut rng = rand::rng(); let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string(); - let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), thread_manager); + let codex_op_tx = spawn_agent( + config.clone(), + app_event_tx.clone(), + thread_manager, + auth_manager.clone(), + feedback.clone(), + ); let model_override = model.as_deref(); let model_for_header = model diff --git a/codex-rs/tui/src/chatwidget/agent.rs b/codex-rs/tui/src/chatwidget/agent.rs index 63d519a7a5..a5d478e7e3 100644 --- a/codex-rs/tui/src/chatwidget/agent.rs +++ b/codex-rs/tui/src/chatwidget/agent.rs @@ -1,73 +1,152 @@ +use std::collections::HashMap; use std::sync::Arc; +use codex_app_server::EmbeddedSessionClient; +use codex_app_server::EmbeddedSessionClientArgs; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::CommandExecutionApprovalDecision; +use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::DynamicToolCallOutputContentItem as V2DynamicToolCallOutputContentItem; +use codex_app_server_protocol::DynamicToolCallResponse as V2DynamicToolCallResponse; +use codex_app_server_protocol::FileChangeApprovalDecision; +use codex_app_server_protocol::FileChangeRequestApprovalResponse; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId as AppServerRequestId; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ThreadSubmitOpParams; +use codex_app_server_protocol::ToolRequestUserInputAnswer; +use codex_app_server_protocol::ToolRequestUserInputResponse; +use codex_core::AuthManager; use codex_core::CodexThread; -use codex_core::NewThread; use codex_core::ThreadManager; use codex_core::config::Config; +use codex_feedback::CodexFeedback; +use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem; +use codex_protocol::dynamic_tools::DynamicToolResponse as CoreDynamicToolResponse; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; +use codex_protocol::protocol::ReviewDecision; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::unbounded_channel; use crate::app_event::AppEvent; use crate::app_event_sender::AppEventSender; +#[derive(Default)] +struct PendingServerRequests { + exec_approval_by_id: HashMap, + patch_approval_by_item_id: HashMap, + user_input_by_turn_id: HashMap, + dynamic_tool_by_call_id: HashMap, +} + /// Spawn the agent bootstrapper and op forwarding loop, returning the /// `UnboundedSender` used by the UI to submit operations. pub(crate) fn spawn_agent( config: Config, app_event_tx: AppEventSender, server: Arc, + auth_manager: Arc, + feedback: CodexFeedback, ) -> UnboundedSender { let (codex_op_tx, mut codex_op_rx) = unbounded_channel::(); - let app_event_tx_clone = app_event_tx; tokio::spawn(async move { - let NewThread { - thread, - session_configured, - .. - } = match server.start_thread(config).await { - Ok(v) => v, - Err(err) => { - let message = format!("Failed to initialize codex: {err}"); - tracing::error!("{message}"); - app_event_tx_clone.send(AppEvent::CodexEvent(Event { - id: "".to_string(), - msg: EventMsg::Error(err.to_error_event(None)), - })); - app_event_tx_clone.send(AppEvent::FatalExitRequest(message)); - tracing::error!("failed to initialize codex: {err}"); - return; - } - }; - - // Forward the captured `SessionConfigured` event so it can be rendered in the UI. - let ev = codex_protocol::protocol::Event { - // The `id` does not matter for rendering, so we can use a fake value. - id: "".to_string(), - msg: codex_protocol::protocol::EventMsg::SessionConfigured(session_configured), - }; - app_event_tx_clone.send(AppEvent::CodexEvent(ev)); - - let thread_clone = thread.clone(); - tokio::spawn(async move { - while let Some(op) = codex_op_rx.recv().await { - let id = thread_clone.submit(op).await; - if let Err(e) = id { - tracing::error!("failed to submit op: {e}"); - } - } + let mut client = EmbeddedSessionClient::spawn(EmbeddedSessionClientArgs { + auth_manager, + thread_manager: server, + codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), + config: config.clone(), + cli_overrides: Vec::new(), + feedback, }); - while let Ok(event) = thread.next_event().await { - let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete); - app_event_tx_clone.send(AppEvent::CodexEvent(event)); - if is_shutdown_complete { - // ShutdownComplete is terminal for a thread; drop this receiver task so - // the Arc can be released and thread resources can clean up. - break; + let mut next_request_id = 1_i64; + let start_request_id = next_request_id_value(&mut next_request_id); + let start_request = ClientRequest::ThreadStart { + request_id: start_request_id.clone(), + params: thread_start_params_from_config(&config), + }; + if let Err(err) = client.send_request(start_request).await { + let message = format!("Failed to initialize codex app-server session: {err}"); + tracing::error!("{message}"); + app_event_tx.send(AppEvent::FatalExitRequest(message)); + return; + } + + let mut thread_id: Option = None; + let mut pending = PendingServerRequests::default(); + + while thread_id.is_none() { + let Some(message) = client.recv().await else { + let message = + "Embedded app-server session closed before thread/start completed".to_string(); + tracing::error!("{message}"); + app_event_tx.send(AppEvent::FatalExitRequest(message)); + return; + }; + if handle_app_server_message( + &app_event_tx, + &start_request_id, + Some(&mut thread_id), + &mut pending, + message, + ) { + return; + } + } + + let Some(thread_id) = thread_id else { + let message = "thread/start did not return a thread id".to_string(); + tracing::error!("{message}"); + app_event_tx.send(AppEvent::FatalExitRequest(message)); + return; + }; + + loop { + tokio::select! { + maybe_op = codex_op_rx.recv() => { + let Some(op) = maybe_op else { + break; + }; + + let handled = match try_handle_server_request_reply_op(&mut client, &mut pending, &op).await { + Ok(handled) => handled, + Err(err) => { + tracing::error!("failed to answer app-server server request: {err}"); + false + } + }; + if handled { + continue; + } + if is_server_request_reply_op(&op) { + tracing::warn!("dropping reply op without pending app-server request: {op:?}"); + continue; + } + + if let Err(err) = submit_thread_op(&client, &thread_id, &op, &mut next_request_id).await { + tracing::error!("failed to submit op via app-server: {err}"); + } + } + maybe_message = client.recv() => { + let Some(message) = maybe_message else { + break; + }; + if handle_app_server_message( + &app_event_tx, + &start_request_id, + None, + &mut pending, + message, + ) { + break; + } + } } } }); @@ -75,6 +154,254 @@ pub(crate) fn spawn_agent( codex_op_tx } +fn handle_app_server_message( + app_event_tx: &AppEventSender, + start_request_id: &AppServerRequestId, + thread_id: Option<&mut Option>, + pending: &mut PendingServerRequests, + message: JSONRPCMessage, +) -> bool { + match message { + JSONRPCMessage::Notification(notification) => { + if let Some(event) = decode_raw_codex_event_notification(notification) { + let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete); + app_event_tx.send(AppEvent::CodexEvent(event)); + if is_shutdown_complete { + return true; + } + } + } + JSONRPCMessage::Request(request) => { + if let Ok(server_request) = ServerRequest::try_from(request) { + match server_request { + ServerRequest::CommandExecutionRequestApproval { request_id, params } => { + let key = params.approval_id.unwrap_or(params.item_id); + pending.exec_approval_by_id.insert(key, request_id); + } + ServerRequest::FileChangeRequestApproval { request_id, params } => { + pending + .patch_approval_by_item_id + .insert(params.item_id, request_id); + } + ServerRequest::ToolRequestUserInput { request_id, params } => { + pending + .user_input_by_turn_id + .insert(params.turn_id, request_id); + } + ServerRequest::DynamicToolCall { request_id, params } => { + pending + .dynamic_tool_by_call_id + .insert(params.call_id, request_id); + } + _ => {} + } + } + } + JSONRPCMessage::Response(response) => { + if &response.id == start_request_id { + match serde_json::from_value::(response.result) { + Ok(parsed) => { + if let Some(thread_id) = thread_id { + *thread_id = Some(parsed.thread.id); + } + } + Err(err) => { + tracing::error!("failed to decode thread/start response: {err}"); + } + } + } + } + JSONRPCMessage::Error(error) => { + if &error.id == start_request_id { + let message = format!( + "thread/start failed: code={} message={}", + error.error.code, error.error.message + ); + tracing::error!("{message}"); + app_event_tx.send(AppEvent::FatalExitRequest(message)); + return true; + } + } + } + + false +} + +fn decode_raw_codex_event_notification( + notification: codex_app_server_protocol::JSONRPCNotification, +) -> Option { + if !notification.method.starts_with("codex/event/") { + return None; + } + let params = notification.params?; + match serde_json::from_value::(params) { + Ok(event) => Some(event), + Err(err) => { + tracing::warn!("failed to decode raw codex event notification: {err}"); + None + } + } +} + +fn thread_start_params_from_config(config: &Config) -> ThreadStartParams { + ThreadStartParams { + model: config.model.clone(), + model_provider: Some(config.model_provider_id.clone()), + cwd: Some(config.cwd.display().to_string()), + approval_policy: Some((*config.permissions.approval_policy.get()).into()), + personality: config.personality, + experimental_raw_events: true, + ..Default::default() + } +} + +async fn submit_thread_op( + client: &EmbeddedSessionClient, + thread_id: &str, + op: &Op, + next_request_id: &mut i64, +) -> std::io::Result<()> { + let request = ClientRequest::ThreadSubmitOp { + request_id: next_request_id_value(next_request_id), + params: ThreadSubmitOpParams { + thread_id: thread_id.to_string(), + op: serde_json::to_value(op).map_err(std::io::Error::other)?, + }, + }; + client.send_request(request).await +} + +fn next_request_id_value(next_request_id: &mut i64) -> AppServerRequestId { + let value = *next_request_id; + *next_request_id += 1; + AppServerRequestId::Integer(value) +} + +fn is_server_request_reply_op(op: &Op) -> bool { + matches!( + op, + Op::ExecApproval { .. } + | Op::PatchApproval { .. } + | Op::UserInputAnswer { .. } + | Op::DynamicToolResponse { .. } + ) +} + +async fn try_handle_server_request_reply_op( + client: &mut EmbeddedSessionClient, + pending: &mut PendingServerRequests, + op: &Op, +) -> std::io::Result { + match op { + Op::ExecApproval { id, decision, .. } => { + if let Some(request_id) = pending.exec_approval_by_id.remove(id) { + let response = CommandExecutionRequestApprovalResponse { + decision: map_exec_approval_decision(decision.clone()), + }; + send_jsonrpc_response(client, request_id, response).await?; + return Ok(true); + } + } + Op::PatchApproval { id, decision } => { + if let Some(request_id) = pending.patch_approval_by_item_id.remove(id) { + let response = FileChangeRequestApprovalResponse { + decision: map_file_change_approval_decision(decision.clone()), + }; + send_jsonrpc_response(client, request_id, response).await?; + return Ok(true); + } + } + Op::UserInputAnswer { id, response } => { + if let Some(request_id) = pending.user_input_by_turn_id.remove(id) { + let response = ToolRequestUserInputResponse { + answers: response + .answers + .iter() + .map(|(question_id, answer)| { + ( + question_id.clone(), + ToolRequestUserInputAnswer { + answers: answer.answers.clone(), + }, + ) + }) + .collect(), + }; + send_jsonrpc_response(client, request_id, response).await?; + return Ok(true); + } + } + Op::DynamicToolResponse { id, response } => { + if let Some(request_id) = pending.dynamic_tool_by_call_id.remove(id) { + let response = v2_dynamic_tool_response_from_core(response.clone()); + send_jsonrpc_response(client, request_id, response).await?; + return Ok(true); + } + } + _ => {} + } + + Ok(false) +} + +async fn send_jsonrpc_response( + client: &mut EmbeddedSessionClient, + request_id: AppServerRequestId, + response: T, +) -> std::io::Result<()> { + let result = serde_json::to_value(response).map_err(std::io::Error::other)?; + client + .send_response(JSONRPCResponse { + id: request_id, + result, + }) + .await +} + +fn map_exec_approval_decision(decision: ReviewDecision) -> CommandExecutionApprovalDecision { + match decision { + ReviewDecision::Approved => CommandExecutionApprovalDecision::Accept, + ReviewDecision::ApprovedForSession => CommandExecutionApprovalDecision::AcceptForSession, + ReviewDecision::ApprovedExecpolicyAmendment { + proposed_execpolicy_amendment, + } => CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment { + execpolicy_amendment: proposed_execpolicy_amendment.into(), + }, + ReviewDecision::Denied => CommandExecutionApprovalDecision::Decline, + ReviewDecision::Abort => CommandExecutionApprovalDecision::Cancel, + } +} + +fn map_file_change_approval_decision(decision: ReviewDecision) -> FileChangeApprovalDecision { + match decision { + ReviewDecision::Approved => FileChangeApprovalDecision::Accept, + ReviewDecision::ApprovedForSession => FileChangeApprovalDecision::AcceptForSession, + ReviewDecision::Denied => FileChangeApprovalDecision::Decline, + ReviewDecision::Abort => FileChangeApprovalDecision::Cancel, + ReviewDecision::ApprovedExecpolicyAmendment { .. } => FileChangeApprovalDecision::Accept, + } +} + +fn v2_dynamic_tool_response_from_core( + response: CoreDynamicToolResponse, +) -> V2DynamicToolCallResponse { + V2DynamicToolCallResponse { + content_items: response + .content_items + .into_iter() + .map(|item| match item { + CoreDynamicToolCallOutputContentItem::InputText { text } => { + V2DynamicToolCallOutputContentItem::InputText { text } + } + CoreDynamicToolCallOutputContentItem::InputImage { image_url } => { + V2DynamicToolCallOutputContentItem::InputImage { image_url } + } + }) + .collect(), + success: response.success, + } +} + /// Spawn agent loops for an existing thread (e.g., a forked thread). /// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent /// events and accepts Ops for submission.