mirror of
https://github.com/openai/codex.git
synced 2026-04-21 23:12:19 +03:00
Compare commits
15 Commits
codex-debu
...
nornagon/p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
75ba0d9e89 | ||
|
|
3117991c02 | ||
|
|
5a4834748f | ||
|
|
4cb61784a5 | ||
|
|
6674c6c3e4 | ||
|
|
8ec1ea16d3 | ||
|
|
83745d1be2 | ||
|
|
6cda3d13ab | ||
|
|
6a2e5e19d8 | ||
|
|
5572ffd40e | ||
|
|
3fffcb5542 | ||
|
|
2ffa009070 | ||
|
|
5ea08d7150 | ||
|
|
fad99960c3 | ||
|
|
cacf8c91bb |
@@ -199,7 +199,7 @@ impl ModelClient {
|
||||
input: &input_with_instructions,
|
||||
tools: &tools_json,
|
||||
tool_choice: "auto",
|
||||
parallel_tool_calls: false,
|
||||
parallel_tool_calls: true,
|
||||
reasoning,
|
||||
store,
|
||||
stream: true,
|
||||
|
||||
@@ -732,7 +732,7 @@ impl Session {
|
||||
|
||||
async fn on_exec_command_end(
|
||||
&self,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
_turn_diff_tracker: &mut TurnDiffTracker,
|
||||
sub_id: &str,
|
||||
call_id: &str,
|
||||
output: &ExecToolCallOutput,
|
||||
@@ -775,20 +775,6 @@ impl Session {
|
||||
msg,
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
|
||||
// If this is an apply_patch, after we emit the end patch, emit a second event
|
||||
// with the full turn diff if there is one.
|
||||
if is_apply_patch {
|
||||
let unified_diff = turn_diff_tracker.get_unified_diff();
|
||||
if let Ok(Some(unified_diff)) = unified_diff {
|
||||
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
|
||||
let event = Event {
|
||||
id: sub_id.into(),
|
||||
msg,
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Runs the exec tool call and emits events for the begin and end of the
|
||||
/// command even on error.
|
||||
@@ -1695,7 +1681,31 @@ async fn try_run_turn(
|
||||
let mut stream = turn_context.client.clone().stream(&prompt).await?;
|
||||
|
||||
let mut output = Vec::new();
|
||||
|
||||
// Stage tool calls to execute after the model signals completion.
|
||||
// We do this so multiple tool calls can run in parallel (where possible)
|
||||
// and so we don't block SSE processing.
|
||||
enum StagedCall {
|
||||
Exec {
|
||||
item: ResponseItem,
|
||||
name: String,
|
||||
args: String,
|
||||
call_id: String,
|
||||
},
|
||||
LocalShell {
|
||||
item: ResponseItem,
|
||||
id: Option<String>,
|
||||
call_id: Option<String>,
|
||||
action: LocalShellAction,
|
||||
},
|
||||
Mcp {
|
||||
item: ResponseItem,
|
||||
server: String,
|
||||
tool: String,
|
||||
args: String,
|
||||
call_id: String,
|
||||
},
|
||||
}
|
||||
let mut staged: Vec<StagedCall> = Vec::new();
|
||||
loop {
|
||||
// Poll the next item from the model stream. We must inspect *both* Ok and Err
|
||||
// cases so that transient stream failures (e.g., dropped SSE connection before
|
||||
@@ -1722,15 +1732,59 @@ async fn try_run_turn(
|
||||
match event {
|
||||
ResponseEvent::Created => {}
|
||||
ResponseEvent::OutputItemDone(item) => {
|
||||
let response = handle_response_item(
|
||||
sess,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id,
|
||||
item.clone(),
|
||||
)
|
||||
.await?;
|
||||
output.push(ProcessedResponseItem { item, response });
|
||||
match &item {
|
||||
ResponseItem::FunctionCall {
|
||||
name,
|
||||
arguments,
|
||||
call_id,
|
||||
..
|
||||
} => {
|
||||
// Classify as MCP vs Exec and stage execution for after completion.
|
||||
if let Some((server, tool)) =
|
||||
sess.mcp_connection_manager.parse_tool_name(name)
|
||||
{
|
||||
staged.push(StagedCall::Mcp {
|
||||
item: item.clone(),
|
||||
server,
|
||||
tool,
|
||||
args: arguments.clone(),
|
||||
call_id: call_id.clone(),
|
||||
});
|
||||
} else {
|
||||
staged.push(StagedCall::Exec {
|
||||
item: item.clone(),
|
||||
name: name.clone(),
|
||||
args: arguments.clone(),
|
||||
call_id: call_id.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
ResponseItem::LocalShellCall {
|
||||
id,
|
||||
call_id,
|
||||
action,
|
||||
..
|
||||
} => {
|
||||
staged.push(StagedCall::LocalShell {
|
||||
item: item.clone(),
|
||||
id: id.clone(),
|
||||
call_id: call_id.clone(),
|
||||
action: action.clone(),
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
// Non-tool items are handled immediately (messages, reasoning, deltas).
|
||||
let response = handle_response_item(
|
||||
sess,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id,
|
||||
item.clone(),
|
||||
)
|
||||
.await?;
|
||||
output.push(ProcessedResponseItem { item, response });
|
||||
}
|
||||
}
|
||||
}
|
||||
ResponseEvent::WebSearchCallBegin { call_id, query } => {
|
||||
let q = query.unwrap_or_else(|| "Searching Web...".to_string());
|
||||
@@ -1756,6 +1810,165 @@ async fn try_run_turn(
|
||||
.ok();
|
||||
}
|
||||
|
||||
// Execute any staged tool calls before returning the turn output.
|
||||
use futures::future::join_all;
|
||||
|
||||
// Run MCP, LocalShell, and non-apply exec tool calls fully in parallel.
|
||||
let mut mcp_futs = Vec::new();
|
||||
let mut local_futs = Vec::new();
|
||||
let mut exec_parallel_futs = Vec::new();
|
||||
// Keep apply_patch exec calls sequential for diff tracking.
|
||||
let mut exec_sequential_calls = Vec::new();
|
||||
|
||||
for sc in staged {
|
||||
match sc {
|
||||
StagedCall::Mcp {
|
||||
item,
|
||||
server,
|
||||
tool,
|
||||
args,
|
||||
call_id,
|
||||
} => {
|
||||
let fut = handle_mcp_tool_call(
|
||||
sess,
|
||||
sub_id,
|
||||
call_id.clone(),
|
||||
server,
|
||||
tool,
|
||||
args,
|
||||
None,
|
||||
);
|
||||
mcp_futs.push(async move {
|
||||
let response = fut.await;
|
||||
ProcessedResponseItem {
|
||||
item,
|
||||
response: Some(response),
|
||||
}
|
||||
});
|
||||
}
|
||||
StagedCall::LocalShell {
|
||||
item,
|
||||
id,
|
||||
call_id,
|
||||
action,
|
||||
} => {
|
||||
let effective_call_id = match (call_id, id) {
|
||||
(Some(call_id), _) => call_id,
|
||||
(None, Some(id)) => id,
|
||||
(None, None) => String::new(),
|
||||
};
|
||||
let LocalShellAction::Exec(action) = action;
|
||||
let params = ShellToolCallParams {
|
||||
command: action.command,
|
||||
workdir: action.working_directory,
|
||||
timeout_ms: action.timeout_ms,
|
||||
with_escalated_permissions: None,
|
||||
justification: None,
|
||||
};
|
||||
let exec_params = to_exec_params(params, turn_context);
|
||||
let fut = async move {
|
||||
// Use a fresh tracker – non-apply_patch calls won't emit diffs anyway.
|
||||
let mut tdt = TurnDiffTracker::new();
|
||||
let response = handle_container_exec_with_params(
|
||||
exec_params,
|
||||
sess,
|
||||
turn_context,
|
||||
&mut tdt,
|
||||
sub_id.to_string(),
|
||||
effective_call_id,
|
||||
)
|
||||
.await;
|
||||
ProcessedResponseItem {
|
||||
item,
|
||||
response: Some(response),
|
||||
}
|
||||
};
|
||||
local_futs.push(fut);
|
||||
}
|
||||
StagedCall::Exec {
|
||||
item,
|
||||
name,
|
||||
args,
|
||||
call_id,
|
||||
} => {
|
||||
if name == "apply_patch" {
|
||||
exec_sequential_calls.push(StagedCall::Exec {
|
||||
item,
|
||||
name,
|
||||
args,
|
||||
call_id,
|
||||
});
|
||||
} else {
|
||||
let fut = async move {
|
||||
let response = match parse_container_exec_arguments(
|
||||
args,
|
||||
turn_context,
|
||||
&call_id,
|
||||
) {
|
||||
Ok(params) => {
|
||||
let mut tdt = TurnDiffTracker::new();
|
||||
handle_container_exec_with_params(
|
||||
params,
|
||||
sess,
|
||||
turn_context,
|
||||
&mut tdt,
|
||||
sub_id.to_string(),
|
||||
call_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(output) => *output,
|
||||
};
|
||||
ProcessedResponseItem {
|
||||
item,
|
||||
response: Some(response),
|
||||
}
|
||||
};
|
||||
exec_parallel_futs.push(fut);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut processed: Vec<ProcessedResponseItem> = join_all(mcp_futs).await;
|
||||
processed.extend(join_all(local_futs).await);
|
||||
processed.extend(join_all(exec_parallel_futs).await);
|
||||
|
||||
// Now handle exec/shell calls one by one so we can use the turn_diff_tracker safely.
|
||||
for sc in exec_sequential_calls {
|
||||
match sc {
|
||||
StagedCall::Exec {
|
||||
item,
|
||||
name,
|
||||
args,
|
||||
call_id,
|
||||
} => {
|
||||
// Reuse existing logic for exec tools.
|
||||
let response = handle_function_call(
|
||||
sess,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id.to_string(),
|
||||
name,
|
||||
args,
|
||||
call_id,
|
||||
)
|
||||
.await;
|
||||
processed.push(ProcessedResponseItem {
|
||||
item,
|
||||
response: Some(response),
|
||||
});
|
||||
}
|
||||
StagedCall::LocalShell { .. } => {}
|
||||
StagedCall::Mcp { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve original ordering: non-tool items were already pushed to `output` as they arrived;
|
||||
// now append the processed tool-call items in their staged order.
|
||||
output.extend(processed);
|
||||
|
||||
// Emit a single unified diff for the entire batch if there is one.
|
||||
let unified_diff = turn_diff_tracker.get_unified_diff();
|
||||
if let Ok(Some(unified_diff)) = unified_diff {
|
||||
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
|
||||
|
||||
115
codex-rs/core/tests/suite/parallel_tool_calls.rs
Normal file
115
codex-rs/core/tests/suite/parallel_tool_calls.rs
Normal file
@@ -0,0 +1,115 @@
|
||||
#![cfg(unix)]
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
use codex_core::built_in_model_providers;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::ConversationManager;
|
||||
use codex_login::CodexAuth;
|
||||
use core_test_support::load_sse_fixture_with_id_from_str;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn build_parallel_exec_sse() -> String {
|
||||
let item1 = serde_json::json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "local_shell_call",
|
||||
"call_id": "c1",
|
||||
"status": "in_progress",
|
||||
"action": {
|
||||
"type": "exec",
|
||||
"command": ["/bin/sh", "-c", "echo A"],
|
||||
"timeout_ms": 3000,
|
||||
"working_directory": null,
|
||||
"env": null,
|
||||
"user": null
|
||||
}
|
||||
}
|
||||
});
|
||||
let item2 = serde_json::json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "local_shell_call",
|
||||
"call_id": "c2",
|
||||
"status": "in_progress",
|
||||
"action": {
|
||||
"type": "exec",
|
||||
"command": ["/bin/sh", "-c", "echo B"],
|
||||
"timeout_ms": 3000,
|
||||
"working_directory": null,
|
||||
"env": null,
|
||||
"user": null
|
||||
}
|
||||
}
|
||||
});
|
||||
let completed = serde_json::json!({
|
||||
"type": "response.completed",
|
||||
"response": { "id": "__ID__" }
|
||||
});
|
||||
|
||||
let raw = serde_json::json!([
|
||||
item1, item2, completed
|
||||
])
|
||||
.to_string();
|
||||
load_sse_fixture_with_id_from_str(&raw, "resp_parallel")
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn local_shell_calls_begin_before_any_end_and_run_concurrently() {
|
||||
let tmp_sse = tempfile::NamedTempFile::new().expect("tmp sse");
|
||||
std::fs::write(tmp_sse.path(), build_parallel_exec_sse()).expect("write sse");
|
||||
std::env::set_var("CODEX_RS_SSE_FIXTURE", tmp_sse.path());
|
||||
|
||||
let home = TempDir::new().unwrap();
|
||||
let mut config = core_test_support::load_default_config_for_test(&home);
|
||||
let providers = built_in_model_providers();
|
||||
config.model_provider = providers
|
||||
.get("openai")
|
||||
.expect("builtin provider")
|
||||
.clone();
|
||||
|
||||
let cm = ConversationManager::with_auth(CodexAuth::from_api_key("test"));
|
||||
let conv = cm
|
||||
.new_conversation(config)
|
||||
.await
|
||||
.expect("spawn conversation")
|
||||
.conversation;
|
||||
|
||||
conv
|
||||
.submit(codex_core::protocol::Op::UserInput {
|
||||
items: vec![codex_core::protocol::InputItem::Text {
|
||||
text: "go".into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.expect("submit");
|
||||
|
||||
let mut begins = 0usize;
|
||||
let mut ends = 0usize;
|
||||
let mut seen_first_end = false;
|
||||
|
||||
use tokio::time::{sleep, Duration as TokioDuration};
|
||||
let deadline = Instant::now() + TokioDuration::from_secs(5);
|
||||
|
||||
while Instant::now() < deadline {
|
||||
let ev = conv.next_event().await.expect("event");
|
||||
match ev.msg {
|
||||
EventMsg::ExecCommandBegin(_) => {
|
||||
begins += 1;
|
||||
}
|
||||
EventMsg::ExecCommandEnd(_) => {
|
||||
if !seen_first_end {
|
||||
assert_eq!(begins, 2, "expected both begins before first end");
|
||||
seen_first_end = true;
|
||||
}
|
||||
ends += 1;
|
||||
}
|
||||
EventMsg::TaskComplete(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
sleep(TokioDuration::from_millis(5)).await;
|
||||
}
|
||||
|
||||
assert_eq!(begins, 2, "expected two parallel exec begins");
|
||||
assert_eq!(ends, 2, "expected two exec completions");
|
||||
}
|
||||
@@ -42,6 +42,7 @@ use ratatui::layout::Layout;
|
||||
use ratatui::layout::Rect;
|
||||
use ratatui::widgets::Widget;
|
||||
use ratatui::widgets::WidgetRef;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -59,6 +60,7 @@ use crate::history_cell::CommandOutput;
|
||||
use crate::history_cell::ExecCell;
|
||||
use crate::history_cell::HistoryCell;
|
||||
use crate::history_cell::PatchEventType;
|
||||
use crate::history_cell::RunningMcpCell;
|
||||
use crate::slash_command::SlashCommand;
|
||||
use crate::tui::FrameRequester;
|
||||
// streaming internals are provided by crate::streaming and crate::markdown_stream
|
||||
@@ -79,6 +81,8 @@ use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_core::protocol_config_types::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_file_search::FileMatch;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use uuid::Uuid;
|
||||
|
||||
// Track information about an in-flight exec command.
|
||||
@@ -99,7 +103,8 @@ pub(crate) struct ChatWidget {
|
||||
// Stream lifecycle controller
|
||||
stream: StreamController,
|
||||
running_commands: HashMap<String, RunningCommand>,
|
||||
pending_exec_completions: Vec<(Vec<String>, Vec<ParsedCommand>, CommandOutput)>,
|
||||
running_mcp: HashMap<String, Arc<AtomicBool>>,
|
||||
pending_exec_completions: Vec<(Vec<String>, Vec<ParsedCommand>, CommandOutput, Duration)>,
|
||||
task_complete_pending: bool,
|
||||
// Queue of interruptive UI events deferred during an active write cycle
|
||||
interrupts: InterruptManager,
|
||||
@@ -417,6 +422,12 @@ impl ChatWidget {
|
||||
Some(rc) => (rc.command, rc.parsed_cmd),
|
||||
None => (vec![ev.call_id.clone()], Vec::new()),
|
||||
};
|
||||
// Flip the corresponding spinner to a final mark immediately in the active cell.
|
||||
if let Some(active) = self.active_exec_cell.as_mut() {
|
||||
let success = ev.exit_code == 0;
|
||||
active.mark_segment_complete(&ev.call_id, success);
|
||||
self.request_redraw();
|
||||
}
|
||||
self.pending_exec_completions.push((
|
||||
command,
|
||||
parsed,
|
||||
@@ -426,19 +437,20 @@ impl ChatWidget {
|
||||
stderr: ev.stderr.clone(),
|
||||
formatted_output: ev.formatted_output.clone(),
|
||||
},
|
||||
ev.duration,
|
||||
));
|
||||
|
||||
if self.running_commands.is_empty() {
|
||||
self.active_exec_cell = None;
|
||||
let pending = std::mem::take(&mut self.pending_exec_completions);
|
||||
for (command, parsed, output) in pending {
|
||||
for (command, parsed, output, duration) in pending {
|
||||
let include_header = !self.last_history_was_exec;
|
||||
let cell = history_cell::new_completed_exec_command(
|
||||
command,
|
||||
parsed,
|
||||
output,
|
||||
include_header,
|
||||
ev.duration,
|
||||
duration,
|
||||
);
|
||||
self.add_to_history(cell);
|
||||
self.last_history_was_exec = true;
|
||||
@@ -501,14 +513,18 @@ impl ChatWidget {
|
||||
// Accumulate parsed commands into a single active Exec cell so they stack
|
||||
match self.active_exec_cell.as_mut() {
|
||||
Some(exec) => {
|
||||
let start_idx = exec.parsed.len();
|
||||
let added = ev.parsed_cmd.len();
|
||||
exec.parsed.extend(ev.parsed_cmd);
|
||||
exec.push_segment(ev.call_id.clone(), start_idx, added);
|
||||
}
|
||||
_ => {
|
||||
let include_header = !self.last_history_was_exec;
|
||||
self.active_exec_cell = Some(history_cell::new_active_exec_command(
|
||||
self.active_exec_cell = Some(history_cell::ExecCell::with_initial_segment(
|
||||
ev.command,
|
||||
ev.parsed_cmd,
|
||||
include_header,
|
||||
ev.call_id.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -519,10 +535,17 @@ impl ChatWidget {
|
||||
|
||||
pub(crate) fn handle_mcp_begin_now(&mut self, ev: McpToolCallBeginEvent) {
|
||||
self.flush_answer_stream_with_separator();
|
||||
self.add_to_history(history_cell::new_active_mcp_tool_call(ev.invocation));
|
||||
// Track running state so the spinner stops when the call ends.
|
||||
let is_running = Arc::new(AtomicBool::new(true));
|
||||
self.running_mcp
|
||||
.insert(ev.call_id.clone(), Arc::clone(&is_running));
|
||||
self.add_boxed_history(Box::new(RunningMcpCell::new(ev.invocation, is_running)));
|
||||
}
|
||||
pub(crate) fn handle_mcp_end_now(&mut self, ev: McpToolCallEndEvent) {
|
||||
self.flush_answer_stream_with_separator();
|
||||
if let Some(flag) = self.running_mcp.remove(&ev.call_id) {
|
||||
flag.store(false, Ordering::Relaxed);
|
||||
}
|
||||
self.add_boxed_history(history_cell::new_completed_mcp_tool_call(
|
||||
80,
|
||||
ev.invocation,
|
||||
@@ -591,6 +614,7 @@ impl ChatWidget {
|
||||
last_token_usage: TokenUsage::default(),
|
||||
stream: StreamController::new(config),
|
||||
running_commands: HashMap::new(),
|
||||
running_mcp: HashMap::new(),
|
||||
pending_exec_completions: Vec::new(),
|
||||
task_complete_pending: false,
|
||||
interrupts: InterruptManager::new(),
|
||||
@@ -636,6 +660,7 @@ impl ChatWidget {
|
||||
last_token_usage: TokenUsage::default(),
|
||||
stream: StreamController::new(config),
|
||||
running_commands: HashMap::new(),
|
||||
running_mcp: HashMap::new(),
|
||||
pending_exec_completions: Vec::new(),
|
||||
task_complete_pending: false,
|
||||
interrupts: InterruptManager::new(),
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
---
|
||||
source: tui/src/chatwidget/tests.rs
|
||||
expression: s
|
||||
---
|
||||
>_
|
||||
✓ ⌨️ echo A
|
||||
⠋ ⌨️ echo B
|
||||
@@ -0,0 +1,7 @@
|
||||
---
|
||||
source: tui/src/chatwidget/tests.rs
|
||||
expression: s
|
||||
---
|
||||
>_
|
||||
⠋ ⌨️ echo one
|
||||
⠋ ⌨️ echo two
|
||||
@@ -175,6 +175,7 @@ fn make_chatwidget_manual() -> (
|
||||
last_token_usage: TokenUsage::default(),
|
||||
stream: StreamController::new(cfg),
|
||||
running_commands: HashMap::new(),
|
||||
running_mcp: HashMap::new(),
|
||||
pending_exec_completions: Vec::new(),
|
||||
task_complete_pending: false,
|
||||
interrupts: InterruptManager::new(),
|
||||
@@ -214,6 +215,106 @@ fn lines_to_single_string(lines: &[ratatui::text::Line<'static>]) -> String {
|
||||
s
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn active_exec_segments_render_for_parallel_begins_snapshot() {
|
||||
let (mut chat, _rx, _op_rx) = make_chatwidget_manual();
|
||||
|
||||
// Begin first command
|
||||
chat.handle_codex_event(Event {
|
||||
id: "call-1".into(),
|
||||
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
|
||||
call_id: "call-1".into(),
|
||||
command: vec!["bash".into(), "-lc".into(), "echo one".into()],
|
||||
cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
|
||||
parsed_cmd: vec![
|
||||
codex_core::parse_command::ParsedCommand::Unknown {
|
||||
cmd: "echo one".into(),
|
||||
}
|
||||
.into(),
|
||||
],
|
||||
}),
|
||||
});
|
||||
|
||||
// Begin a second command (parallel segment)
|
||||
chat.handle_codex_event(Event {
|
||||
id: "call-2".into(),
|
||||
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
|
||||
call_id: "call-2".into(),
|
||||
command: vec!["bash".into(), "-lc".into(), "echo two".into()],
|
||||
cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
|
||||
parsed_cmd: vec![
|
||||
codex_core::parse_command::ParsedCommand::Unknown {
|
||||
cmd: "echo two".into(),
|
||||
}
|
||||
.into(),
|
||||
],
|
||||
}),
|
||||
});
|
||||
|
||||
// Snapshot the active exec cell (shows spinners for both segments)
|
||||
let cell = chat.active_exec_cell.as_ref().expect("active exec cell");
|
||||
let s = lines_to_single_string(&cell.display_lines());
|
||||
assert_snapshot!(s);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn active_exec_segments_mark_completion_in_place_snapshot() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
|
||||
|
||||
// Begin two commands
|
||||
chat.handle_codex_event(Event {
|
||||
id: "call-a".into(),
|
||||
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
|
||||
call_id: "call-a".into(),
|
||||
command: vec!["bash".into(), "-lc".into(), "echo A".into()],
|
||||
cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
|
||||
parsed_cmd: vec![
|
||||
codex_core::parse_command::ParsedCommand::Unknown {
|
||||
cmd: "echo A".into(),
|
||||
}
|
||||
.into(),
|
||||
],
|
||||
}),
|
||||
});
|
||||
chat.handle_codex_event(Event {
|
||||
id: "call-b".into(),
|
||||
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
|
||||
call_id: "call-b".into(),
|
||||
command: vec!["bash".into(), "-lc".into(), "echo B".into()],
|
||||
cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
|
||||
parsed_cmd: vec![
|
||||
codex_core::parse_command::ParsedCommand::Unknown {
|
||||
cmd: "echo B".into(),
|
||||
}
|
||||
.into(),
|
||||
],
|
||||
}),
|
||||
});
|
||||
|
||||
// Complete first command successfully; second still running
|
||||
chat.handle_codex_event(Event {
|
||||
id: "call-a".into(),
|
||||
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
||||
call_id: "call-a".into(),
|
||||
stdout: "A".into(),
|
||||
stderr: String::new(),
|
||||
aggregated_output: "A".into(),
|
||||
exit_code: 0,
|
||||
duration: std::time::Duration::from_millis(5),
|
||||
formatted_output: "A".into(),
|
||||
}),
|
||||
});
|
||||
|
||||
// Drain any history insertions (none expected yet) and snapshot active cell
|
||||
let _ = drain_insert_history(&mut rx);
|
||||
let cell = chat
|
||||
.active_exec_cell
|
||||
.as_ref()
|
||||
.expect("active exec cell after first end");
|
||||
let s = lines_to_single_string(&cell.display_lines());
|
||||
assert_snapshot!(s);
|
||||
}
|
||||
|
||||
fn open_fixture(name: &str) -> std::fs::File {
|
||||
// 1) Prefer fixtures within this crate
|
||||
{
|
||||
|
||||
@@ -36,6 +36,9 @@ use ratatui::widgets::Wrap;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Cursor;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tracing::error;
|
||||
@@ -107,16 +110,26 @@ pub(crate) struct ExecCell {
|
||||
start_time: Option<Instant>,
|
||||
duration: Option<Duration>,
|
||||
include_header: bool,
|
||||
// When present, this ExecCell represents an active, possibly multi-call
|
||||
// exec group. Each segment maps a contiguous slice of `parsed` to a
|
||||
// specific call_id with its own running/completed state.
|
||||
pub(crate) segments: Option<Vec<ExecSegment>>,
|
||||
}
|
||||
impl HistoryCell for ExecCell {
|
||||
fn display_lines(&self) -> Vec<Line<'static>> {
|
||||
exec_command_lines(
|
||||
&self.command,
|
||||
&self.parsed,
|
||||
self.output.as_ref(),
|
||||
self.start_time,
|
||||
self.include_header,
|
||||
)
|
||||
// If this cell is tracking active segments (parallel calls), render
|
||||
// per-segment markers so completed calls show ✓ while others spin.
|
||||
if let Some(segments) = &self.segments {
|
||||
render_active_exec_with_segments(&self.parsed, segments, self.include_header)
|
||||
} else {
|
||||
exec_command_lines(
|
||||
&self.command,
|
||||
&self.parsed,
|
||||
self.output.as_ref(),
|
||||
self.start_time,
|
||||
self.include_header,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn transcript_lines(&self) -> Vec<Line<'static>> {
|
||||
@@ -273,6 +286,7 @@ pub(crate) fn new_user_prompt(message: String) -> PlainHistoryCell {
|
||||
PlainHistoryCell { lines }
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new_active_exec_command(
|
||||
command: Vec<String>,
|
||||
parsed: Vec<ParsedCommand>,
|
||||
@@ -285,6 +299,7 @@ pub(crate) fn new_active_exec_command(
|
||||
start_time: Some(Instant::now()),
|
||||
duration: None,
|
||||
include_header,
|
||||
segments: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -302,9 +317,142 @@ pub(crate) fn new_completed_exec_command(
|
||||
start_time: None,
|
||||
duration: Some(duration),
|
||||
include_header,
|
||||
segments: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct ExecSegment {
|
||||
pub(crate) call_id: String,
|
||||
pub(crate) start: usize,
|
||||
pub(crate) len: usize,
|
||||
pub(crate) started_at: Instant,
|
||||
pub(crate) completed_success: Option<bool>,
|
||||
}
|
||||
|
||||
impl ExecCell {
|
||||
pub(crate) fn with_initial_segment(
|
||||
command: Vec<String>,
|
||||
parsed: Vec<ParsedCommand>,
|
||||
include_header: bool,
|
||||
call_id: String,
|
||||
) -> ExecCell {
|
||||
let start = 0usize;
|
||||
let len = parsed.len();
|
||||
ExecCell {
|
||||
command,
|
||||
parsed,
|
||||
output: None,
|
||||
start_time: Some(Instant::now()),
|
||||
duration: None,
|
||||
include_header,
|
||||
segments: Some(vec![ExecSegment {
|
||||
call_id,
|
||||
start,
|
||||
len,
|
||||
started_at: Instant::now(),
|
||||
completed_success: None,
|
||||
}]),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn push_segment(&mut self, call_id: String, start: usize, len: usize) {
|
||||
if let Some(segs) = &mut self.segments {
|
||||
segs.push(ExecSegment {
|
||||
call_id,
|
||||
start,
|
||||
len,
|
||||
started_at: Instant::now(),
|
||||
completed_success: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn mark_segment_complete(&mut self, call_id: &str, success: bool) {
|
||||
if let Some(segs) = &mut self.segments
|
||||
&& let Some(seg) = segs.iter_mut().find(|s| s.call_id == call_id)
|
||||
{
|
||||
seg.completed_success = Some(success);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parsed_command_label(parsed: &ParsedCommand) -> String {
|
||||
match parsed {
|
||||
ParsedCommand::Read { name, .. } => format!("📖 {name}"),
|
||||
ParsedCommand::ListFiles { cmd, path } => match path {
|
||||
Some(p) => format!("📂 {p}"),
|
||||
None => format!("📂 {cmd}"),
|
||||
},
|
||||
ParsedCommand::Search { query, path, cmd } => match (query, path) {
|
||||
(Some(q), Some(p)) => format!("🔎 {q} in {p}"),
|
||||
(Some(q), None) => format!("🔎 {q}"),
|
||||
(None, Some(p)) => format!("🔎 {p}"),
|
||||
(None, None) => format!("🔎 {cmd}"),
|
||||
},
|
||||
ParsedCommand::Format { .. } => "✨ Formatting".to_string(),
|
||||
ParsedCommand::Test { cmd } => format!("🧪 {cmd}"),
|
||||
ParsedCommand::Lint { cmd, .. } => format!("🧹 {cmd}"),
|
||||
ParsedCommand::Unknown { cmd } => format!("⌨️ {cmd}"),
|
||||
ParsedCommand::Noop { cmd } => format!("🔄 {cmd}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn render_parsed_commands_with_marker(
|
||||
parsed_commands: &[ParsedCommand],
|
||||
include_header: bool,
|
||||
mut marker_for_idx: impl FnMut(usize) -> Span<'static>,
|
||||
) -> Vec<Line<'static>> {
|
||||
let mut lines: Vec<Line> = Vec::new();
|
||||
if include_header {
|
||||
lines.push(Line::from(""));
|
||||
lines.push(Line::from(">_".magenta()));
|
||||
}
|
||||
for (idx, parsed) in parsed_commands.iter().enumerate() {
|
||||
let text = parsed_command_label(parsed);
|
||||
for (j, line_text) in text.lines().enumerate() {
|
||||
if j == 0 {
|
||||
lines.push(Line::from(vec![
|
||||
" ".into(),
|
||||
marker_for_idx(idx),
|
||||
" ".into(),
|
||||
line_text.to_string().light_blue(),
|
||||
]));
|
||||
} else {
|
||||
lines.push(Line::from(vec![
|
||||
" ".into(),
|
||||
line_text.to_string().light_blue(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
}
|
||||
lines
|
||||
}
|
||||
|
||||
fn render_active_exec_with_segments(
|
||||
parsed_commands: &[ParsedCommand],
|
||||
segments: &[ExecSegment],
|
||||
include_header: bool,
|
||||
) -> Vec<Line<'static>> {
|
||||
let seg_for = |idx: usize| -> Option<&ExecSegment> {
|
||||
segments
|
||||
.iter()
|
||||
.find(|s| idx >= s.start && idx < s.start + s.len)
|
||||
};
|
||||
render_parsed_commands_with_marker(parsed_commands, include_header, |idx| match seg_for(idx) {
|
||||
Some(seg) => match seg.completed_success {
|
||||
Some(true) => "✓".green(),
|
||||
Some(false) => "✗".red(),
|
||||
None => {
|
||||
const FRAMES: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
|
||||
let i = ((seg.started_at.elapsed().as_millis() / 100) as usize) % FRAMES.len();
|
||||
Span::raw(format!("{}", FRAMES[i]))
|
||||
}
|
||||
},
|
||||
None => "?".into(),
|
||||
})
|
||||
}
|
||||
|
||||
fn exec_command_lines(
|
||||
command: &[String],
|
||||
parsed: &[ParsedCommand],
|
||||
@@ -324,67 +472,22 @@ fn new_parsed_command(
|
||||
start_time: Option<Instant>,
|
||||
include_header: bool,
|
||||
) -> Vec<Line<'static>> {
|
||||
let mut lines: Vec<Line> = Vec::new();
|
||||
// Leading spacer and header line above command list
|
||||
if include_header {
|
||||
lines.push(Line::from(""));
|
||||
lines.push(Line::from(">_".magenta()));
|
||||
}
|
||||
|
||||
// Determine the leading status marker: spinner while running, ✓ on success, ✗ on failure.
|
||||
let status_marker: Span<'static> = match output {
|
||||
None => {
|
||||
// Animated braille spinner – choose frame based on elapsed time.
|
||||
const FRAMES: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
|
||||
let idx = start_time
|
||||
.map(|st| ((st.elapsed().as_millis() / 100) as usize) % FRAMES.len())
|
||||
.unwrap_or(0);
|
||||
let ch = FRAMES[idx];
|
||||
Span::raw(format!("{ch}"))
|
||||
Span::raw(format!("{}", FRAMES[idx]))
|
||||
}
|
||||
Some(o) if o.exit_code == 0 => Span::styled("✓", Style::default().fg(Color::Green)),
|
||||
Some(_) => Span::styled("✗", Style::default().fg(Color::Red)),
|
||||
};
|
||||
|
||||
for parsed in parsed_commands.iter() {
|
||||
let text = match parsed {
|
||||
ParsedCommand::Read { name, .. } => format!("📖 {name}"),
|
||||
ParsedCommand::ListFiles { cmd, path } => match path {
|
||||
Some(p) => format!("📂 {p}"),
|
||||
None => format!("📂 {cmd}"),
|
||||
},
|
||||
ParsedCommand::Search { query, path, cmd } => match (query, path) {
|
||||
(Some(q), Some(p)) => format!("🔎 {q} in {p}"),
|
||||
(Some(q), None) => format!("🔎 {q}"),
|
||||
(None, Some(p)) => format!("🔎 {p}"),
|
||||
(None, None) => format!("🔎 {cmd}"),
|
||||
},
|
||||
ParsedCommand::Format { .. } => "✨ Formatting".to_string(),
|
||||
ParsedCommand::Test { cmd } => format!("🧪 {cmd}"),
|
||||
ParsedCommand::Lint { cmd, .. } => format!("🧹 {cmd}"),
|
||||
ParsedCommand::Unknown { cmd } => format!("⌨️ {cmd}"),
|
||||
ParsedCommand::Noop { cmd } => format!("🔄 {cmd}"),
|
||||
};
|
||||
// Prefix: two spaces, marker, space. Continuations align under the text block.
|
||||
for (j, line_text) in text.lines().enumerate() {
|
||||
if j == 0 {
|
||||
lines.push(Line::from(vec![
|
||||
" ".into(),
|
||||
status_marker.clone(),
|
||||
" ".into(),
|
||||
line_text.to_string().light_blue(),
|
||||
]));
|
||||
} else {
|
||||
lines.push(Line::from(vec![
|
||||
" ".into(),
|
||||
line_text.to_string().light_blue(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut lines = render_parsed_commands_with_marker(parsed_commands, include_header, |_| {
|
||||
status_marker.clone()
|
||||
});
|
||||
lines.extend(output_lines(output, true, false));
|
||||
|
||||
lines
|
||||
}
|
||||
|
||||
@@ -437,23 +540,72 @@ fn new_exec_command_generic(
|
||||
lines
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new_active_mcp_tool_call(invocation: McpInvocation) -> PlainHistoryCell {
|
||||
// Backwards-compatible helper retained for call sites; create a non-animated cell.
|
||||
// Prefer `new_running_mcp_tool_call` for spinner support.
|
||||
let title_line = Line::from(vec!["tool".magenta(), " running...".dim()]);
|
||||
let lines: Vec<Line> = vec![
|
||||
Line::from(""),
|
||||
title_line,
|
||||
format_mcp_invocation(invocation.clone()),
|
||||
format_mcp_invocation(invocation),
|
||||
];
|
||||
|
||||
PlainHistoryCell { lines }
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct RunningMcpCell {
|
||||
invocation: McpInvocation,
|
||||
start_time: Instant,
|
||||
is_running: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl RunningMcpCell {
|
||||
pub(crate) fn new(invocation: McpInvocation, is_running: Arc<AtomicBool>) -> Self {
|
||||
Self {
|
||||
invocation,
|
||||
start_time: Instant::now(),
|
||||
is_running,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// (second impl removed; defined above)
|
||||
|
||||
pub(crate) fn new_web_search_call(query: String) -> PlainHistoryCell {
|
||||
let lines: Vec<Line<'static>> =
|
||||
vec![Line::from(""), Line::from(vec!["🌐 ".into(), query.into()])];
|
||||
PlainHistoryCell { lines }
|
||||
}
|
||||
|
||||
impl HistoryCell for RunningMcpCell {
|
||||
fn display_lines(&self) -> Vec<Line<'static>> {
|
||||
let mut lines: Vec<Line<'static>> = Vec::new();
|
||||
lines.push(Line::from(""));
|
||||
|
||||
// Spinner or final status marker
|
||||
let running = self.is_running.load(Ordering::Relaxed);
|
||||
let status = if running {
|
||||
const FRAMES: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
|
||||
let idx = ((self.start_time.elapsed().as_millis() / 100) as usize) % FRAMES.len();
|
||||
Span::raw(format!("{}", FRAMES[idx]))
|
||||
} else {
|
||||
"✓".green()
|
||||
};
|
||||
|
||||
let title = Line::from(vec![
|
||||
"tool".magenta(),
|
||||
" ".into(),
|
||||
status,
|
||||
" ".into(),
|
||||
"running...".dim(),
|
||||
]);
|
||||
lines.push(title);
|
||||
lines.push(format_mcp_invocation(self.invocation.clone()));
|
||||
lines
|
||||
}
|
||||
}
|
||||
|
||||
/// If the first content is an image, return a new cell with the image.
|
||||
/// TODO(rgwood-dd): Handle images properly even if they're not the first result.
|
||||
fn try_new_completed_mcp_tool_call_with_image_output(
|
||||
|
||||
Reference in New Issue
Block a user