Compare commits

...

15 Commits

Author SHA1 Message Date
Jeremy Rose
75ba0d9e89 fix 2025-08-25 17:32:45 -07:00
Jeremy Rose
3117991c02 Merge remote-tracking branch 'origin/main' into nornagon/parallel-tool-calls 2025-08-25 17:32:18 -07:00
Jeremy Rose
5a4834748f fix 2025-08-25 17:24:30 -07:00
Jeremy Rose
4cb61784a5 test 2025-08-25 17:21:19 -07:00
Jeremy Rose
6674c6c3e4 update spinners 2025-08-25 15:59:57 -07:00
Jeremy Rose
8ec1ea16d3 fix duration 2025-08-25 15:27:12 -07:00
Jeremy Rose
83745d1be2 fix merge 2025-08-25 15:14:48 -07:00
Jeremy Rose
6cda3d13ab Merge remote-tracking branch 'refs/remotes/origin/nornagon/parallel-tool-calls' into nornagon/parallel-tool-calls 2025-08-25 15:03:24 -07:00
Jeremy Rose
6a2e5e19d8 Merge remote-tracking branch 'origin/main' into nornagon/parallel-tool-calls 2025-08-25 15:01:56 -07:00
Jeremy Rose
5572ffd40e wip 2025-08-23 19:45:53 -07:00
Jeremy Rose
3fffcb5542 Merge remote-tracking branch 'origin/main' into nornagon/more-test-perf 2025-08-23 19:36:28 -07:00
Jeremy Rose
2ffa009070 other crates too 2025-08-23 19:36:25 -07:00
Jeremy Rose
5ea08d7150 all integration tests in one binary 2025-08-23 15:23:24 -07:00
Jeremy Rose
fad99960c3 no core doctests 2025-08-23 14:53:30 -07:00
Jeremy Rose
cacf8c91bb wip 2025-08-22 16:23:16 -07:00
8 changed files with 709 additions and 89 deletions

View File

@@ -199,7 +199,7 @@ impl ModelClient {
input: &input_with_instructions,
tools: &tools_json,
tool_choice: "auto",
parallel_tool_calls: false,
parallel_tool_calls: true,
reasoning,
store,
stream: true,

View File

@@ -732,7 +732,7 @@ impl Session {
async fn on_exec_command_end(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
_turn_diff_tracker: &mut TurnDiffTracker,
sub_id: &str,
call_id: &str,
output: &ExecToolCallOutput,
@@ -775,20 +775,6 @@ impl Session {
msg,
};
let _ = self.tx_event.send(event).await;
// If this is an apply_patch, after we emit the end patch, emit a second event
// with the full turn diff if there is one.
if is_apply_patch {
let unified_diff = turn_diff_tracker.get_unified_diff();
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
let event = Event {
id: sub_id.into(),
msg,
};
let _ = self.tx_event.send(event).await;
}
}
}
/// Runs the exec tool call and emits events for the begin and end of the
/// command even on error.
@@ -1695,7 +1681,31 @@ async fn try_run_turn(
let mut stream = turn_context.client.clone().stream(&prompt).await?;
let mut output = Vec::new();
// Stage tool calls to execute after the model signals completion.
// We do this so multiple tool calls can run in parallel (where possible)
// and so we don't block SSE processing.
enum StagedCall {
Exec {
item: ResponseItem,
name: String,
args: String,
call_id: String,
},
LocalShell {
item: ResponseItem,
id: Option<String>,
call_id: Option<String>,
action: LocalShellAction,
},
Mcp {
item: ResponseItem,
server: String,
tool: String,
args: String,
call_id: String,
},
}
let mut staged: Vec<StagedCall> = Vec::new();
loop {
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
@@ -1722,15 +1732,59 @@ async fn try_run_turn(
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let response = handle_response_item(
sess,
turn_context,
turn_diff_tracker,
sub_id,
item.clone(),
)
.await?;
output.push(ProcessedResponseItem { item, response });
match &item {
ResponseItem::FunctionCall {
name,
arguments,
call_id,
..
} => {
// Classify as MCP vs Exec and stage execution for after completion.
if let Some((server, tool)) =
sess.mcp_connection_manager.parse_tool_name(name)
{
staged.push(StagedCall::Mcp {
item: item.clone(),
server,
tool,
args: arguments.clone(),
call_id: call_id.clone(),
});
} else {
staged.push(StagedCall::Exec {
item: item.clone(),
name: name.clone(),
args: arguments.clone(),
call_id: call_id.clone(),
});
}
}
ResponseItem::LocalShellCall {
id,
call_id,
action,
..
} => {
staged.push(StagedCall::LocalShell {
item: item.clone(),
id: id.clone(),
call_id: call_id.clone(),
action: action.clone(),
});
}
_ => {
// Non-tool items are handled immediately (messages, reasoning, deltas).
let response = handle_response_item(
sess,
turn_context,
turn_diff_tracker,
sub_id,
item.clone(),
)
.await?;
output.push(ProcessedResponseItem { item, response });
}
}
}
ResponseEvent::WebSearchCallBegin { call_id, query } => {
let q = query.unwrap_or_else(|| "Searching Web...".to_string());
@@ -1756,6 +1810,165 @@ async fn try_run_turn(
.ok();
}
// Execute any staged tool calls before returning the turn output.
use futures::future::join_all;
// Run MCP, LocalShell, and non-apply exec tool calls fully in parallel.
let mut mcp_futs = Vec::new();
let mut local_futs = Vec::new();
let mut exec_parallel_futs = Vec::new();
// Keep apply_patch exec calls sequential for diff tracking.
let mut exec_sequential_calls = Vec::new();
for sc in staged {
match sc {
StagedCall::Mcp {
item,
server,
tool,
args,
call_id,
} => {
let fut = handle_mcp_tool_call(
sess,
sub_id,
call_id.clone(),
server,
tool,
args,
None,
);
mcp_futs.push(async move {
let response = fut.await;
ProcessedResponseItem {
item,
response: Some(response),
}
});
}
StagedCall::LocalShell {
item,
id,
call_id,
action,
} => {
let effective_call_id = match (call_id, id) {
(Some(call_id), _) => call_id,
(None, Some(id)) => id,
(None, None) => String::new(),
};
let LocalShellAction::Exec(action) = action;
let params = ShellToolCallParams {
command: action.command,
workdir: action.working_directory,
timeout_ms: action.timeout_ms,
with_escalated_permissions: None,
justification: None,
};
let exec_params = to_exec_params(params, turn_context);
let fut = async move {
// Use a fresh tracker non-apply_patch calls won't emit diffs anyway.
let mut tdt = TurnDiffTracker::new();
let response = handle_container_exec_with_params(
exec_params,
sess,
turn_context,
&mut tdt,
sub_id.to_string(),
effective_call_id,
)
.await;
ProcessedResponseItem {
item,
response: Some(response),
}
};
local_futs.push(fut);
}
StagedCall::Exec {
item,
name,
args,
call_id,
} => {
if name == "apply_patch" {
exec_sequential_calls.push(StagedCall::Exec {
item,
name,
args,
call_id,
});
} else {
let fut = async move {
let response = match parse_container_exec_arguments(
args,
turn_context,
&call_id,
) {
Ok(params) => {
let mut tdt = TurnDiffTracker::new();
handle_container_exec_with_params(
params,
sess,
turn_context,
&mut tdt,
sub_id.to_string(),
call_id,
)
.await
}
Err(output) => *output,
};
ProcessedResponseItem {
item,
response: Some(response),
}
};
exec_parallel_futs.push(fut);
}
}
}
}
let mut processed: Vec<ProcessedResponseItem> = join_all(mcp_futs).await;
processed.extend(join_all(local_futs).await);
processed.extend(join_all(exec_parallel_futs).await);
// Now handle exec/shell calls one by one so we can use the turn_diff_tracker safely.
for sc in exec_sequential_calls {
match sc {
StagedCall::Exec {
item,
name,
args,
call_id,
} => {
// Reuse existing logic for exec tools.
let response = handle_function_call(
sess,
turn_context,
turn_diff_tracker,
sub_id.to_string(),
name,
args,
call_id,
)
.await;
processed.push(ProcessedResponseItem {
item,
response: Some(response),
});
}
StagedCall::LocalShell { .. } => {}
StagedCall::Mcp { .. } => {}
}
}
// Preserve original ordering: non-tool items were already pushed to `output` as they arrived;
// now append the processed tool-call items in their staged order.
output.extend(processed);
// Emit a single unified diff for the entire batch if there is one.
let unified_diff = turn_diff_tracker.get_unified_diff();
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });

View File

@@ -0,0 +1,115 @@
#![cfg(unix)]
use std::time::Instant;
use codex_core::built_in_model_providers;
use codex_core::protocol::EventMsg;
use codex_core::ConversationManager;
use codex_login::CodexAuth;
use core_test_support::load_sse_fixture_with_id_from_str;
use tempfile::TempDir;
fn build_parallel_exec_sse() -> String {
let item1 = serde_json::json!({
"type": "response.output_item.done",
"item": {
"type": "local_shell_call",
"call_id": "c1",
"status": "in_progress",
"action": {
"type": "exec",
"command": ["/bin/sh", "-c", "echo A"],
"timeout_ms": 3000,
"working_directory": null,
"env": null,
"user": null
}
}
});
let item2 = serde_json::json!({
"type": "response.output_item.done",
"item": {
"type": "local_shell_call",
"call_id": "c2",
"status": "in_progress",
"action": {
"type": "exec",
"command": ["/bin/sh", "-c", "echo B"],
"timeout_ms": 3000,
"working_directory": null,
"env": null,
"user": null
}
}
});
let completed = serde_json::json!({
"type": "response.completed",
"response": { "id": "__ID__" }
});
let raw = serde_json::json!([
item1, item2, completed
])
.to_string();
load_sse_fixture_with_id_from_str(&raw, "resp_parallel")
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn local_shell_calls_begin_before_any_end_and_run_concurrently() {
let tmp_sse = tempfile::NamedTempFile::new().expect("tmp sse");
std::fs::write(tmp_sse.path(), build_parallel_exec_sse()).expect("write sse");
std::env::set_var("CODEX_RS_SSE_FIXTURE", tmp_sse.path());
let home = TempDir::new().unwrap();
let mut config = core_test_support::load_default_config_for_test(&home);
let providers = built_in_model_providers();
config.model_provider = providers
.get("openai")
.expect("builtin provider")
.clone();
let cm = ConversationManager::with_auth(CodexAuth::from_api_key("test"));
let conv = cm
.new_conversation(config)
.await
.expect("spawn conversation")
.conversation;
conv
.submit(codex_core::protocol::Op::UserInput {
items: vec![codex_core::protocol::InputItem::Text {
text: "go".into(),
}],
})
.await
.expect("submit");
let mut begins = 0usize;
let mut ends = 0usize;
let mut seen_first_end = false;
use tokio::time::{sleep, Duration as TokioDuration};
let deadline = Instant::now() + TokioDuration::from_secs(5);
while Instant::now() < deadline {
let ev = conv.next_event().await.expect("event");
match ev.msg {
EventMsg::ExecCommandBegin(_) => {
begins += 1;
}
EventMsg::ExecCommandEnd(_) => {
if !seen_first_end {
assert_eq!(begins, 2, "expected both begins before first end");
seen_first_end = true;
}
ends += 1;
}
EventMsg::TaskComplete(_) => break,
_ => {}
}
sleep(TokioDuration::from_millis(5)).await;
}
assert_eq!(begins, 2, "expected two parallel exec begins");
assert_eq!(ends, 2, "expected two exec completions");
}

View File

@@ -42,6 +42,7 @@ use ratatui::layout::Layout;
use ratatui::layout::Rect;
use ratatui::widgets::Widget;
use ratatui::widgets::WidgetRef;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tracing::debug;
@@ -59,6 +60,7 @@ use crate::history_cell::CommandOutput;
use crate::history_cell::ExecCell;
use crate::history_cell::HistoryCell;
use crate::history_cell::PatchEventType;
use crate::history_cell::RunningMcpCell;
use crate::slash_command::SlashCommand;
use crate::tui::FrameRequester;
// streaming internals are provided by crate::streaming and crate::markdown_stream
@@ -79,6 +81,8 @@ use codex_core::protocol::AskForApproval;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol_config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_file_search::FileMatch;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use uuid::Uuid;
// Track information about an in-flight exec command.
@@ -99,7 +103,8 @@ pub(crate) struct ChatWidget {
// Stream lifecycle controller
stream: StreamController,
running_commands: HashMap<String, RunningCommand>,
pending_exec_completions: Vec<(Vec<String>, Vec<ParsedCommand>, CommandOutput)>,
running_mcp: HashMap<String, Arc<AtomicBool>>,
pending_exec_completions: Vec<(Vec<String>, Vec<ParsedCommand>, CommandOutput, Duration)>,
task_complete_pending: bool,
// Queue of interruptive UI events deferred during an active write cycle
interrupts: InterruptManager,
@@ -417,6 +422,12 @@ impl ChatWidget {
Some(rc) => (rc.command, rc.parsed_cmd),
None => (vec![ev.call_id.clone()], Vec::new()),
};
// Flip the corresponding spinner to a final mark immediately in the active cell.
if let Some(active) = self.active_exec_cell.as_mut() {
let success = ev.exit_code == 0;
active.mark_segment_complete(&ev.call_id, success);
self.request_redraw();
}
self.pending_exec_completions.push((
command,
parsed,
@@ -426,19 +437,20 @@ impl ChatWidget {
stderr: ev.stderr.clone(),
formatted_output: ev.formatted_output.clone(),
},
ev.duration,
));
if self.running_commands.is_empty() {
self.active_exec_cell = None;
let pending = std::mem::take(&mut self.pending_exec_completions);
for (command, parsed, output) in pending {
for (command, parsed, output, duration) in pending {
let include_header = !self.last_history_was_exec;
let cell = history_cell::new_completed_exec_command(
command,
parsed,
output,
include_header,
ev.duration,
duration,
);
self.add_to_history(cell);
self.last_history_was_exec = true;
@@ -501,14 +513,18 @@ impl ChatWidget {
// Accumulate parsed commands into a single active Exec cell so they stack
match self.active_exec_cell.as_mut() {
Some(exec) => {
let start_idx = exec.parsed.len();
let added = ev.parsed_cmd.len();
exec.parsed.extend(ev.parsed_cmd);
exec.push_segment(ev.call_id.clone(), start_idx, added);
}
_ => {
let include_header = !self.last_history_was_exec;
self.active_exec_cell = Some(history_cell::new_active_exec_command(
self.active_exec_cell = Some(history_cell::ExecCell::with_initial_segment(
ev.command,
ev.parsed_cmd,
include_header,
ev.call_id.clone(),
));
}
}
@@ -519,10 +535,17 @@ impl ChatWidget {
pub(crate) fn handle_mcp_begin_now(&mut self, ev: McpToolCallBeginEvent) {
self.flush_answer_stream_with_separator();
self.add_to_history(history_cell::new_active_mcp_tool_call(ev.invocation));
// Track running state so the spinner stops when the call ends.
let is_running = Arc::new(AtomicBool::new(true));
self.running_mcp
.insert(ev.call_id.clone(), Arc::clone(&is_running));
self.add_boxed_history(Box::new(RunningMcpCell::new(ev.invocation, is_running)));
}
pub(crate) fn handle_mcp_end_now(&mut self, ev: McpToolCallEndEvent) {
self.flush_answer_stream_with_separator();
if let Some(flag) = self.running_mcp.remove(&ev.call_id) {
flag.store(false, Ordering::Relaxed);
}
self.add_boxed_history(history_cell::new_completed_mcp_tool_call(
80,
ev.invocation,
@@ -591,6 +614,7 @@ impl ChatWidget {
last_token_usage: TokenUsage::default(),
stream: StreamController::new(config),
running_commands: HashMap::new(),
running_mcp: HashMap::new(),
pending_exec_completions: Vec::new(),
task_complete_pending: false,
interrupts: InterruptManager::new(),
@@ -636,6 +660,7 @@ impl ChatWidget {
last_token_usage: TokenUsage::default(),
stream: StreamController::new(config),
running_commands: HashMap::new(),
running_mcp: HashMap::new(),
pending_exec_completions: Vec::new(),
task_complete_pending: false,
interrupts: InterruptManager::new(),

View File

@@ -0,0 +1,7 @@
---
source: tui/src/chatwidget/tests.rs
expression: s
---
>_
✓ ⌨️ echo A
⠋ ⌨️ echo B

View File

@@ -0,0 +1,7 @@
---
source: tui/src/chatwidget/tests.rs
expression: s
---
>_
⠋ ⌨️ echo one
⠋ ⌨️ echo two

View File

@@ -175,6 +175,7 @@ fn make_chatwidget_manual() -> (
last_token_usage: TokenUsage::default(),
stream: StreamController::new(cfg),
running_commands: HashMap::new(),
running_mcp: HashMap::new(),
pending_exec_completions: Vec::new(),
task_complete_pending: false,
interrupts: InterruptManager::new(),
@@ -214,6 +215,106 @@ fn lines_to_single_string(lines: &[ratatui::text::Line<'static>]) -> String {
s
}
#[test]
fn active_exec_segments_render_for_parallel_begins_snapshot() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual();
// Begin first command
chat.handle_codex_event(Event {
id: "call-1".into(),
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "call-1".into(),
command: vec!["bash".into(), "-lc".into(), "echo one".into()],
cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
parsed_cmd: vec![
codex_core::parse_command::ParsedCommand::Unknown {
cmd: "echo one".into(),
}
.into(),
],
}),
});
// Begin a second command (parallel segment)
chat.handle_codex_event(Event {
id: "call-2".into(),
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "call-2".into(),
command: vec!["bash".into(), "-lc".into(), "echo two".into()],
cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
parsed_cmd: vec![
codex_core::parse_command::ParsedCommand::Unknown {
cmd: "echo two".into(),
}
.into(),
],
}),
});
// Snapshot the active exec cell (shows spinners for both segments)
let cell = chat.active_exec_cell.as_ref().expect("active exec cell");
let s = lines_to_single_string(&cell.display_lines());
assert_snapshot!(s);
}
#[test]
fn active_exec_segments_mark_completion_in_place_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Begin two commands
chat.handle_codex_event(Event {
id: "call-a".into(),
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "call-a".into(),
command: vec!["bash".into(), "-lc".into(), "echo A".into()],
cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
parsed_cmd: vec![
codex_core::parse_command::ParsedCommand::Unknown {
cmd: "echo A".into(),
}
.into(),
],
}),
});
chat.handle_codex_event(Event {
id: "call-b".into(),
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "call-b".into(),
command: vec!["bash".into(), "-lc".into(), "echo B".into()],
cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
parsed_cmd: vec![
codex_core::parse_command::ParsedCommand::Unknown {
cmd: "echo B".into(),
}
.into(),
],
}),
});
// Complete first command successfully; second still running
chat.handle_codex_event(Event {
id: "call-a".into(),
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "call-a".into(),
stdout: "A".into(),
stderr: String::new(),
aggregated_output: "A".into(),
exit_code: 0,
duration: std::time::Duration::from_millis(5),
formatted_output: "A".into(),
}),
});
// Drain any history insertions (none expected yet) and snapshot active cell
let _ = drain_insert_history(&mut rx);
let cell = chat
.active_exec_cell
.as_ref()
.expect("active exec cell after first end");
let s = lines_to_single_string(&cell.display_lines());
assert_snapshot!(s);
}
fn open_fixture(name: &str) -> std::fs::File {
// 1) Prefer fixtures within this crate
{

View File

@@ -36,6 +36,9 @@ use ratatui::widgets::Wrap;
use std::collections::HashMap;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use tracing::error;
@@ -107,16 +110,26 @@ pub(crate) struct ExecCell {
start_time: Option<Instant>,
duration: Option<Duration>,
include_header: bool,
// When present, this ExecCell represents an active, possibly multi-call
// exec group. Each segment maps a contiguous slice of `parsed` to a
// specific call_id with its own running/completed state.
pub(crate) segments: Option<Vec<ExecSegment>>,
}
impl HistoryCell for ExecCell {
fn display_lines(&self) -> Vec<Line<'static>> {
exec_command_lines(
&self.command,
&self.parsed,
self.output.as_ref(),
self.start_time,
self.include_header,
)
// If this cell is tracking active segments (parallel calls), render
// per-segment markers so completed calls show ✓ while others spin.
if let Some(segments) = &self.segments {
render_active_exec_with_segments(&self.parsed, segments, self.include_header)
} else {
exec_command_lines(
&self.command,
&self.parsed,
self.output.as_ref(),
self.start_time,
self.include_header,
)
}
}
fn transcript_lines(&self) -> Vec<Line<'static>> {
@@ -273,6 +286,7 @@ pub(crate) fn new_user_prompt(message: String) -> PlainHistoryCell {
PlainHistoryCell { lines }
}
#[allow(dead_code)]
pub(crate) fn new_active_exec_command(
command: Vec<String>,
parsed: Vec<ParsedCommand>,
@@ -285,6 +299,7 @@ pub(crate) fn new_active_exec_command(
start_time: Some(Instant::now()),
duration: None,
include_header,
segments: None,
}
}
@@ -302,9 +317,142 @@ pub(crate) fn new_completed_exec_command(
start_time: None,
duration: Some(duration),
include_header,
segments: None,
}
}
#[derive(Clone, Debug)]
pub(crate) struct ExecSegment {
pub(crate) call_id: String,
pub(crate) start: usize,
pub(crate) len: usize,
pub(crate) started_at: Instant,
pub(crate) completed_success: Option<bool>,
}
impl ExecCell {
pub(crate) fn with_initial_segment(
command: Vec<String>,
parsed: Vec<ParsedCommand>,
include_header: bool,
call_id: String,
) -> ExecCell {
let start = 0usize;
let len = parsed.len();
ExecCell {
command,
parsed,
output: None,
start_time: Some(Instant::now()),
duration: None,
include_header,
segments: Some(vec![ExecSegment {
call_id,
start,
len,
started_at: Instant::now(),
completed_success: None,
}]),
}
}
pub(crate) fn push_segment(&mut self, call_id: String, start: usize, len: usize) {
if let Some(segs) = &mut self.segments {
segs.push(ExecSegment {
call_id,
start,
len,
started_at: Instant::now(),
completed_success: None,
});
}
}
pub(crate) fn mark_segment_complete(&mut self, call_id: &str, success: bool) {
if let Some(segs) = &mut self.segments
&& let Some(seg) = segs.iter_mut().find(|s| s.call_id == call_id)
{
seg.completed_success = Some(success);
}
}
}
fn parsed_command_label(parsed: &ParsedCommand) -> String {
match parsed {
ParsedCommand::Read { name, .. } => format!("📖 {name}"),
ParsedCommand::ListFiles { cmd, path } => match path {
Some(p) => format!("📂 {p}"),
None => format!("📂 {cmd}"),
},
ParsedCommand::Search { query, path, cmd } => match (query, path) {
(Some(q), Some(p)) => format!("🔎 {q} in {p}"),
(Some(q), None) => format!("🔎 {q}"),
(None, Some(p)) => format!("🔎 {p}"),
(None, None) => format!("🔎 {cmd}"),
},
ParsedCommand::Format { .. } => "✨ Formatting".to_string(),
ParsedCommand::Test { cmd } => format!("🧪 {cmd}"),
ParsedCommand::Lint { cmd, .. } => format!("🧹 {cmd}"),
ParsedCommand::Unknown { cmd } => format!("⌨️ {cmd}"),
ParsedCommand::Noop { cmd } => format!("🔄 {cmd}"),
}
}
fn render_parsed_commands_with_marker(
parsed_commands: &[ParsedCommand],
include_header: bool,
mut marker_for_idx: impl FnMut(usize) -> Span<'static>,
) -> Vec<Line<'static>> {
let mut lines: Vec<Line> = Vec::new();
if include_header {
lines.push(Line::from(""));
lines.push(Line::from(">_".magenta()));
}
for (idx, parsed) in parsed_commands.iter().enumerate() {
let text = parsed_command_label(parsed);
for (j, line_text) in text.lines().enumerate() {
if j == 0 {
lines.push(Line::from(vec![
" ".into(),
marker_for_idx(idx),
" ".into(),
line_text.to_string().light_blue(),
]));
} else {
lines.push(Line::from(vec![
" ".into(),
line_text.to_string().light_blue(),
]));
}
}
}
lines
}
fn render_active_exec_with_segments(
parsed_commands: &[ParsedCommand],
segments: &[ExecSegment],
include_header: bool,
) -> Vec<Line<'static>> {
let seg_for = |idx: usize| -> Option<&ExecSegment> {
segments
.iter()
.find(|s| idx >= s.start && idx < s.start + s.len)
};
render_parsed_commands_with_marker(parsed_commands, include_header, |idx| match seg_for(idx) {
Some(seg) => match seg.completed_success {
Some(true) => "".green(),
Some(false) => "".red(),
None => {
const FRAMES: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
let i = ((seg.started_at.elapsed().as_millis() / 100) as usize) % FRAMES.len();
Span::raw(format!("{}", FRAMES[i]))
}
},
None => "?".into(),
})
}
fn exec_command_lines(
command: &[String],
parsed: &[ParsedCommand],
@@ -324,67 +472,22 @@ fn new_parsed_command(
start_time: Option<Instant>,
include_header: bool,
) -> Vec<Line<'static>> {
let mut lines: Vec<Line> = Vec::new();
// Leading spacer and header line above command list
if include_header {
lines.push(Line::from(""));
lines.push(Line::from(">_".magenta()));
}
// Determine the leading status marker: spinner while running, ✓ on success, ✗ on failure.
let status_marker: Span<'static> = match output {
None => {
// Animated braille spinner choose frame based on elapsed time.
const FRAMES: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
let idx = start_time
.map(|st| ((st.elapsed().as_millis() / 100) as usize) % FRAMES.len())
.unwrap_or(0);
let ch = FRAMES[idx];
Span::raw(format!("{ch}"))
Span::raw(format!("{}", FRAMES[idx]))
}
Some(o) if o.exit_code == 0 => Span::styled("", Style::default().fg(Color::Green)),
Some(_) => Span::styled("", Style::default().fg(Color::Red)),
};
for parsed in parsed_commands.iter() {
let text = match parsed {
ParsedCommand::Read { name, .. } => format!("📖 {name}"),
ParsedCommand::ListFiles { cmd, path } => match path {
Some(p) => format!("📂 {p}"),
None => format!("📂 {cmd}"),
},
ParsedCommand::Search { query, path, cmd } => match (query, path) {
(Some(q), Some(p)) => format!("🔎 {q} in {p}"),
(Some(q), None) => format!("🔎 {q}"),
(None, Some(p)) => format!("🔎 {p}"),
(None, None) => format!("🔎 {cmd}"),
},
ParsedCommand::Format { .. } => "✨ Formatting".to_string(),
ParsedCommand::Test { cmd } => format!("🧪 {cmd}"),
ParsedCommand::Lint { cmd, .. } => format!("🧹 {cmd}"),
ParsedCommand::Unknown { cmd } => format!("⌨️ {cmd}"),
ParsedCommand::Noop { cmd } => format!("🔄 {cmd}"),
};
// Prefix: two spaces, marker, space. Continuations align under the text block.
for (j, line_text) in text.lines().enumerate() {
if j == 0 {
lines.push(Line::from(vec![
" ".into(),
status_marker.clone(),
" ".into(),
line_text.to_string().light_blue(),
]));
} else {
lines.push(Line::from(vec![
" ".into(),
line_text.to_string().light_blue(),
]));
}
}
}
let mut lines = render_parsed_commands_with_marker(parsed_commands, include_header, |_| {
status_marker.clone()
});
lines.extend(output_lines(output, true, false));
lines
}
@@ -437,23 +540,72 @@ fn new_exec_command_generic(
lines
}
#[allow(dead_code)]
pub(crate) fn new_active_mcp_tool_call(invocation: McpInvocation) -> PlainHistoryCell {
// Backwards-compatible helper retained for call sites; create a non-animated cell.
// Prefer `new_running_mcp_tool_call` for spinner support.
let title_line = Line::from(vec!["tool".magenta(), " running...".dim()]);
let lines: Vec<Line> = vec![
Line::from(""),
title_line,
format_mcp_invocation(invocation.clone()),
format_mcp_invocation(invocation),
];
PlainHistoryCell { lines }
}
#[derive(Debug)]
pub(crate) struct RunningMcpCell {
invocation: McpInvocation,
start_time: Instant,
is_running: Arc<AtomicBool>,
}
impl RunningMcpCell {
pub(crate) fn new(invocation: McpInvocation, is_running: Arc<AtomicBool>) -> Self {
Self {
invocation,
start_time: Instant::now(),
is_running,
}
}
}
// (second impl removed; defined above)
pub(crate) fn new_web_search_call(query: String) -> PlainHistoryCell {
let lines: Vec<Line<'static>> =
vec![Line::from(""), Line::from(vec!["🌐 ".into(), query.into()])];
PlainHistoryCell { lines }
}
impl HistoryCell for RunningMcpCell {
fn display_lines(&self) -> Vec<Line<'static>> {
let mut lines: Vec<Line<'static>> = Vec::new();
lines.push(Line::from(""));
// Spinner or final status marker
let running = self.is_running.load(Ordering::Relaxed);
let status = if running {
const FRAMES: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
let idx = ((self.start_time.elapsed().as_millis() / 100) as usize) % FRAMES.len();
Span::raw(format!("{}", FRAMES[idx]))
} else {
"".green()
};
let title = Line::from(vec![
"tool".magenta(),
" ".into(),
status,
" ".into(),
"running...".dim(),
]);
lines.push(title);
lines.push(format_mcp_invocation(self.invocation.clone()));
lines
}
}
/// If the first content is an image, return a new cell with the image.
/// TODO(rgwood-dd): Handle images properly even if they're not the first result.
fn try_new_completed_mcp_tool_call_with_image_output(