send-aggregated output

This commit is contained in:
Ahmed Ibrahim
2025-08-15 16:05:54 -07:00
parent 311ad0ce26
commit 35130cf21b
6 changed files with 79 additions and 19 deletions

View File

@@ -722,6 +722,7 @@ impl Session {
let ExecToolCallOutput {
stdout,
stderr,
aggregated_output,
duration,
exit_code,
} = output;
@@ -731,6 +732,7 @@ impl Session {
let stdout = stdout.text.chars().take(MAX_STREAM_OUTPUT).collect();
let stderr = stderr.text.chars().take(MAX_STREAM_OUTPUT).collect();
let formatted_output = format_exec_output_str(output);
let aggregated_output: String = aggregated_output.text.clone();
let msg = if is_apply_patch {
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
@@ -744,9 +746,10 @@ impl Session {
call_id: call_id.to_string(),
stdout,
stderr,
formatted_output,
duration: *duration,
aggregated_output,
exit_code: *exit_code,
duration: *duration,
formatted_output,
})
};
@@ -804,6 +807,7 @@ impl Session {
exit_code: -1,
stdout: StreamOutput::new(String::new()),
stderr: StreamOutput::new(get_error_message_ui(e)),
aggregated_output: StreamOutput::new(get_error_message_ui(e)),
duration: Duration::default(),
};
&output_stderr
@@ -2592,6 +2596,7 @@ fn format_exec_output(exec_output: &ExecToolCallOutput) -> String {
// round to 1 decimal place
let duration_seconds = ((duration.as_secs_f32()) * 10.0).round() / 10.0;
let formatted_output = format_exec_output_str(exec_output);
let formatted_output = format_exec_output_str(exec_output);
let payload = ExecOutput {

View File

@@ -153,6 +153,7 @@ pub async fn process_exec_tool_call(
exit_code,
stdout,
stderr,
aggregated_output: raw_output.aggregated_output.from_utf8_lossy(),
duration,
})
}
@@ -193,6 +194,7 @@ pub struct RawExecToolCallOutput {
pub exit_status: ExitStatus,
pub stdout: StreamOutput<Vec<u8>>,
pub stderr: StreamOutput<Vec<u8>>,
pub aggregated_output: StreamOutput<Vec<u8>>,
}
impl StreamOutput<String> {
@@ -213,11 +215,31 @@ impl StreamOutput<Vec<u8>> {
}
}
#[inline]
fn copy_capped(
dst: &mut Vec<u8>,
src: &[u8],
remaining_bytes: &mut usize,
remaining_lines: &mut usize,
) {
for &b in src {
if *remaining_bytes == 0 || *remaining_lines == 0 {
break;
}
dst.push(b);
*remaining_bytes = remaining_bytes.saturating_sub(1);
if b == b'\n' {
*remaining_lines = remaining_lines.saturating_sub(1);
}
}
}
#[derive(Debug)]
pub struct ExecToolCallOutput {
pub exit_code: i32,
pub stdout: StreamOutput<String>,
pub stderr: StreamOutput<String>,
pub aggregated_output: StreamOutput<String>,
pub duration: Duration,
}
@@ -273,12 +295,15 @@ pub(crate) async fn consume_truncated_output(
))
})?;
let (agg_tx, agg_rx) = async_channel::unbounded::<Vec<u8>>();
let stdout_handle = tokio::spawn(read_capped(
BufReader::new(stdout_reader),
MAX_STREAM_OUTPUT,
MAX_STREAM_OUTPUT_LINES,
stdout_stream.clone(),
false,
Some(agg_tx.clone()),
));
let stderr_handle = tokio::spawn(read_capped(
BufReader::new(stderr_reader),
@@ -286,6 +311,7 @@ pub(crate) async fn consume_truncated_output(
MAX_STREAM_OUTPUT_LINES,
stdout_stream.clone(),
true,
Some(agg_tx.clone()),
));
let exit_status = tokio::select! {
@@ -310,10 +336,37 @@ pub(crate) async fn consume_truncated_output(
let stdout = stdout_handle.await??;
let stderr = stderr_handle.await??;
drop(agg_tx);
let mut combined_buf = Vec::with_capacity(MAX_STREAM_OUTPUT.min(8 * 1024));
let mut remaining_bytes = MAX_STREAM_OUTPUT;
let mut remaining_lines = MAX_STREAM_OUTPUT_LINES;
while let Ok(chunk) = agg_rx.recv().await {
if remaining_bytes == 0 || remaining_lines == 0 {
continue;
}
copy_capped(
&mut combined_buf,
&chunk,
&mut remaining_bytes,
&mut remaining_lines,
);
}
let truncated = remaining_lines == 0 || remaining_bytes == 0;
let aggregated_output = StreamOutput {
text: combined_buf,
truncated_after_lines: if truncated {
Some((MAX_STREAM_OUTPUT_LINES - remaining_lines) as u32)
} else {
None
},
};
Ok(RawExecToolCallOutput {
exit_status,
stdout,
stderr,
aggregated_output,
})
}
@@ -323,6 +376,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
max_lines: usize,
stream: Option<StdoutStream>,
is_stderr: bool,
aggregate_tx: Option<Sender<Vec<u8>>>,
) -> io::Result<StreamOutput<Vec<u8>>> {
let mut buf = Vec::with_capacity(max_output.min(8 * 1024));
let mut tmp = [0u8; 8192];
@@ -355,20 +409,17 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
let _ = stream.tx_event.send(event).await;
}
// Copy into the buffer only while we still have byte and line budget.
if let Some(tx) = &aggregate_tx {
let _ = tx.send(tmp[..n].to_vec()).await;
}
if remaining_bytes > 0 && remaining_lines > 0 {
let mut copy_len = 0;
for &b in &tmp[..n] {
if remaining_bytes == 0 || remaining_lines == 0 {
break;
}
copy_len += 1;
remaining_bytes -= 1;
if b == b'\n' {
remaining_lines -= 1;
}
}
buf.extend_from_slice(&tmp[..copy_len]);
copy_capped(
&mut buf,
&tmp[..n],
&mut remaining_bytes,
&mut remaining_lines,
);
}
// Continue reading to EOF to avoid back-pressure, but discard once caps are hit.
}

View File

@@ -287,8 +287,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::ExecCommandOutputDelta(_) => {}
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
stdout,
stderr,
aggregated_output,
duration,
exit_code,
..
@@ -304,8 +303,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
("".to_string(), format!("exec('{call_id}')"))
};
let output = if exit_code == 0 { stdout } else { stderr };
let truncated_output = output
let truncated_output = aggregated_output
.lines()
.take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL)
.collect::<Vec<_>>()

View File

@@ -685,6 +685,9 @@ pub struct ExecCommandEndEvent {
pub stdout: String,
/// Captured stderr
pub stderr: String,
/// Captured aggregated output
#[serde(default)]
pub aggregated_output: String,
/// The command's exit code.
pub exit_code: i32,
/// The duration of the command execution.

View File

@@ -263,6 +263,7 @@ fn exec_history_cell_shows_working_then_completed() {
call_id: "call-1".into(),
stdout: "done".into(),
stderr: String::new(),
aggregated_output: "done".into(),
exit_code: 0,
duration: std::time::Duration::from_millis(5),
formatted_output: "done".into(),
@@ -313,6 +314,7 @@ fn exec_history_cell_shows_working_then_failed() {
call_id: "call-2".into(),
stdout: String::new(),
stderr: "error".into(),
aggregated_output: "error".into(),
exit_code: 2,
duration: std::time::Duration::from_millis(7),
formatted_output: "".into(),

View File

@@ -9,6 +9,7 @@ codex
Im going to scan the workspace and Cargo manifests to see build profiles and
dependencies that impact binary size. Then Ill summarize the main causes.
>_
✓ ls -la
└ total 6696