Compare commits

...

1 Commits

Author SHA1 Message Date
Eric Traut
e4aa2b2c3b Handle BrokenPipe in codex exec --json stdout streaming
Fixes #10248.

Replace println! in JSONL streaming with a locked stdout writer.
Treat ErrorKind::BrokenPipe as a graceful shutdown signal instead of panicking.
Add a regression test that closes stdout mid-stream and asserts a clean exit.
Why this looked like a 0.92.0 regression:

The panic-on-EPIPE behavior is older, but 0.92.0 appears to emit additional early --json stdout lines in some configurations.
In particular, 0.92.0 includes a new “under-development features enabled” warning event (c900de271, #9954) that prints to stdout in JSON mode.
More early stdout writes increase the chance of hitting EPIPE when downstream consumers (e.g., head, tee, detached runners) close the pipe mid-run.
This change makes --json streaming robust to stdout closure regardless of when it happens.
2026-01-30 12:38:12 -08:00
5 changed files with 56 additions and 1 deletions

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::io::Write as _;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
@@ -846,10 +847,17 @@ impl EventProcessor for EventProcessorWithJsonOutput {
#[allow(clippy::print_stdout)]
fn process_event(&mut self, event: protocol::Event) -> CodexStatus {
let aggregated = self.collect_thread_events(&event);
let mut stdout = std::io::stdout().lock();
for conv_event in aggregated {
match serde_json::to_string(&conv_event) {
Ok(line) => {
println!("{line}");
if let Err(err) = writeln!(stdout, "{line}") {
if err.kind() == std::io::ErrorKind::BrokenPipe {
return CodexStatus::InitiateShutdown;
}
error!("Failed to write event to stdout: {err:?}");
break;
}
}
Err(e) => {
error!("Failed to serialize event: {e:?}");

View File

@@ -0,0 +1,44 @@
#![allow(clippy::expect_used, clippy::unwrap_used)]
use codex_core::auth::CODEX_API_KEY_ENV_VAR;
use codex_utils_cargo_bin::cargo_bin;
use codex_utils_cargo_bin::find_resource;
use core_test_support::test_codex_exec::test_codex_exec;
use pretty_assertions::assert_eq;
use std::io::BufRead as _;
use std::io::BufReader;
use std::process::Command;
use std::process::Stdio;
#[test]
fn json_streaming_exits_cleanly_when_stdout_closes() -> anyhow::Result<()> {
let test = test_codex_exec();
let fixture = find_resource!("tests/fixtures/cli_responses_fixture.sse")?;
let repo_root = codex_utils_cargo_bin::repo_root()?;
let bin = cargo_bin("codex-exec")?;
let mut cmd = Command::new(bin);
cmd.current_dir(test.cwd_path())
.env("CODEX_HOME", test.home_path())
.env(CODEX_API_KEY_ENV_VAR, "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")
.arg("-C")
.arg(&repo_root)
.arg("--json")
.arg("echo broken pipe handling");
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
let mut child = cmd.spawn()?;
let stdout = child.stdout.take().expect("stdout missing");
let mut reader = BufReader::new(stdout);
let mut first_line = String::new();
// Read the first line, then drop the reader to close the pipe.
let _bytes = reader.read_line(&mut first_line)?;
drop(reader);
let status = child.wait()?;
assert_eq!(status.code(), Some(0));
Ok(())
}

View File

@@ -2,6 +2,7 @@
mod add_dir;
mod apply_patch;
mod auth_env;
mod broken_pipe;
mod originator;
mod output_schema;
mod resume;

1
rust-v0.91.0-worktree Submodule

Submodule rust-v0.91.0-worktree added at 3684bc646e

1
rust-v0.92.0-worktree Submodule

Submodule rust-v0.92.0-worktree added at a09055074e