feat: update process ID for event handling (#7261)

This commit is contained in:
jif-oai
2025-11-25 22:21:05 +00:00
committed by GitHub
parent 981e2f742d
commit 28ff364c3a
17 changed files with 473 additions and 163 deletions

View File

@@ -44,7 +44,7 @@ fn extract_output_text(item: &Value) -> Option<&str> {
struct ParsedUnifiedExecOutput {
chunk_id: Option<String>,
wall_time_seconds: f64,
session_id: Option<i32>,
process_id: Option<String>,
exit_code: Option<i32>,
original_token_count: Option<usize>,
output: String,
@@ -59,7 +59,7 @@ fn parse_unified_exec_output(raw: &str) -> Result<ParsedUnifiedExecOutput> {
r#"(?:Chunk ID: (?P<chunk_id>[^\n]+)\n)?"#,
r#"Wall time: (?P<wall_time>-?\d+(?:\.\d+)?) seconds\n"#,
r#"(?:Process exited with code (?P<exit_code>-?\d+)\n)?"#,
r#"(?:Process running with session ID (?P<session_id>-?\d+)\n)?"#,
r#"(?:Process running with session ID (?P<process_id>-?\d+)\n)?"#,
r#"(?:Original token count: (?P<original_token_count>\d+)\n)?"#,
r#"Output:\n?(?P<output>.*)$"#,
))
@@ -92,15 +92,9 @@ fn parse_unified_exec_output(raw: &str) -> Result<ParsedUnifiedExecOutput> {
})
.transpose()?;
let session_id = captures
.name("session_id")
.map(|value| {
value
.as_str()
.parse::<i32>()
.context("failed to parse session id from unified exec output")
})
.transpose()?;
let process_id = captures
.name("process_id")
.map(|value| value.as_str().to_string());
let original_token_count = captures
.name("original_token_count")
@@ -121,7 +115,7 @@ fn parse_unified_exec_output(raw: &str) -> Result<ParsedUnifiedExecOutput> {
Ok(ParsedUnifiedExecOutput {
chunk_id,
wall_time_seconds,
session_id,
process_id,
exit_code,
original_token_count,
output,
@@ -335,7 +329,7 @@ async fn unified_exec_emits_exec_command_end_event() -> Result<()> {
let poll_call_id = "uexec-end-event-poll";
let poll_args = json!({
"chars": "",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 250,
});
@@ -493,7 +487,7 @@ async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
let stdin_call_id = "uexec-stdin-delta";
let stdin_args = json!({
"chars": "echo WSTDIN-MARK\\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 800,
});
@@ -592,7 +586,7 @@ async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> {
let stdin_call_id = "uexec-stdin-begin";
let stdin_args = json!({
"chars": "echo hello",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 400,
});
@@ -694,7 +688,7 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()>
let poll_call_id = "uexec-poll-empty";
let poll_args = json!({
"chars": "",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 150,
});
@@ -880,8 +874,8 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
);
assert!(
metadata.session_id.is_none(),
"exec_command for a completed process should not include session_id"
metadata.process_id.is_none(),
"exec_command for a completed process should not include process_id"
);
let exit_code = metadata.exit_code.expect("expected exit_code");
@@ -973,7 +967,7 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
.expect("missing early exit unified_exec output");
assert!(
output.session_id.is_none(),
output.process_id.is_none(),
"short-lived process should not keep a session alive"
);
assert_eq!(
@@ -1023,12 +1017,12 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
});
let send_args = serde_json::json!({
"chars": "hello unified exec\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
let exit_args = serde_json::json!({
"chars": "\u{0004}",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
@@ -1099,12 +1093,13 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
let start_output = outputs
.get(start_call_id)
.expect("missing start output for exec_command");
let session_id = start_output
.session_id
.expect("expected session id from exec_command");
let process_id = start_output
.process_id
.clone()
.expect("expected process id from exec_command");
assert!(
session_id >= 0,
"session_id should be non-negative, got {session_id}"
process_id.len() > 3,
"process_id should be at least 4 digits, got {process_id}"
);
assert!(
start_output.exit_code.is_none(),
@@ -1120,11 +1115,12 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
"expected echoed output from cat, got {echoed:?}"
);
let echoed_session = send_output
.session_id
.expect("write_stdin should return session id while process is running");
.process_id
.clone()
.expect("write_stdin should return process id while process is running");
assert_eq!(
echoed_session, session_id,
"write_stdin should reuse existing session id"
echoed_session, process_id,
"write_stdin should reuse existing process id"
);
assert!(
send_output.exit_code.is_none(),
@@ -1135,8 +1131,8 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
.get(exit_call_id)
.expect("missing exit metadata output");
assert!(
exit_output.session_id.is_none(),
"session_id should be omitted once the process exits"
exit_output.process_id.is_none(),
"process_id should be omitted once the process exits"
);
let exit_code = exit_output
.exit_code
@@ -1182,14 +1178,14 @@ async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()
let echo_call_id = "uexec-end-on-exit-echo";
let echo_args = serde_json::json!({
"chars": "bye-END\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 300,
});
let exit_call_id = "uexec-end-on-exit";
let exit_args = serde_json::json!({
"chars": "\u{0004}",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
@@ -1285,7 +1281,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
let second_call_id = "uexec-stdin";
let second_args = serde_json::json!({
"chars": "hello unified exec\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
@@ -1347,17 +1343,20 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
let start_output = outputs
.get(first_call_id)
.expect("missing first unified_exec output");
let session_id = start_output.session_id.unwrap_or_default();
let process_id = start_output.process_id.clone().unwrap_or_default();
assert!(
session_id >= 0,
"expected session id in first unified_exec response"
!process_id.is_empty(),
"expected process id in first unified_exec response"
);
assert!(start_output.output.is_empty());
let reuse_output = outputs
.get(second_call_id)
.expect("missing reused unified_exec output");
assert_eq!(reuse_output.session_id.unwrap_or_default(), session_id);
assert_eq!(
reuse_output.process_id.clone().unwrap_or_default(),
process_id
);
let echoed = reuse_output.output.as_str();
assert!(
echoed.contains("hello unified exec"),
@@ -1413,7 +1412,7 @@ PY
let second_call_id = "uexec-lag-poll";
let second_args = serde_json::json!({
"chars": "",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 2_000,
});
@@ -1480,9 +1479,9 @@ PY
let start_output = outputs
.get(first_call_id)
.expect("missing initial unified_exec output");
let session_id = start_output.session_id.unwrap_or_default();
let process_id = start_output.process_id.clone().unwrap_or_default();
assert!(
session_id >= 0,
!process_id.is_empty(),
"expected session id from initial unified_exec response"
);
@@ -1524,7 +1523,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
let second_call_id = "uexec-poll";
let second_args = serde_json::json!({
"chars": "",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 800,
});
@@ -1589,7 +1588,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
let outputs = collect_tool_outputs(&bodies)?;
let first_output = outputs.get(first_call_id).expect("missing timeout output");
assert_eq!(first_output.session_id, Some(0));
assert!(first_output.process_id.is_some());
assert!(first_output.output.is_empty());
let poll_output = outputs.get(second_call_id).expect("missing poll output");
@@ -1824,7 +1823,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
let keep_write_call_id = "uexec-prune-keep-write";
let keep_write_args = serde_json::json!({
"chars": "still alive\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
events.push(ev_function_call(
@@ -1836,7 +1835,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
let probe_call_id = "uexec-prune-probe";
let probe_args = serde_json::json!({
"chars": "should fail\n",
"session_id": 1,
"session_id": 1001,
"yield_time_ms": 500,
});
events.push(ev_function_call(
@@ -1885,7 +1884,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
.find_map(|req| req.function_call_output_text(keep_call_id))
.expect("missing initial keep session output");
let keep_start_output = parse_unified_exec_output(&keep_start)?;
pretty_assertions::assert_eq!(keep_start_output.session_id, Some(0));
assert!(keep_start_output.process_id.is_some());
assert!(keep_start_output.exit_code.is_none());
let prune_start = requests
@@ -1893,7 +1892,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
.find_map(|req| req.function_call_output_text(prune_call_id))
.expect("missing initial prune session output");
let prune_start_output = parse_unified_exec_output(&prune_start)?;
pretty_assertions::assert_eq!(prune_start_output.session_id, Some(1));
assert!(prune_start_output.process_id.is_some());
assert!(prune_start_output.exit_code.is_none());
let keep_write = requests
@@ -1901,7 +1900,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
.find_map(|req| req.function_call_output_text(keep_write_call_id))
.expect("missing keep write output");
let keep_write_output = parse_unified_exec_output(&keep_write)?;
pretty_assertions::assert_eq!(keep_write_output.session_id, Some(0));
assert!(keep_write_output.process_id.is_some());
assert!(
keep_write_output.output.contains("still alive"),
"expected cat session to echo input, got {:?}",
@@ -1913,7 +1912,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
.find_map(|req| req.function_call_output_text(probe_call_id))
.expect("missing probe output");
assert!(
pruned_probe.contains("UnknownSessionId") || pruned_probe.contains("Unknown session id"),
pruned_probe.contains("UnknownSessionId") || pruned_probe.contains("Unknown process id"),
"expected probe to fail after pruning, got {pruned_probe:?}"
);