Compare commits

...

6 Commits

Author SHA1 Message Date
Friel
591d18d428 Compare forked request prefix before spawn output 2026-04-01 17:03:06 +00:00
Friel
fad58042ab Fix forked spawn output replacement 2026-04-01 16:51:15 +00:00
Friel
177a75c8b5 Assert forked prompt cache key reuse 2026-04-01 16:28:59 +00:00
Friel
9e800dc9ab Tighten forked-subagent request-prefix comparator 2026-04-01 06:01:59 +00:00
Friel
cc1b7874a7 Improve forked-subagent request-prefix regression test 2026-04-01 05:59:08 +00:00
Friel
28c2367c0a test(core): assert forked spawn request prefix stability 2026-04-01 05:39:35 +00:00
2 changed files with 139 additions and 40 deletions

View File

@@ -103,14 +103,18 @@ fn decode_body_bytes(body: &[u8], content_encoding: Option<&str>) -> Vec<u8> {
impl ResponsesRequest {
pub fn body_json(&self) -> Value {
let body = decode_body_bytes(
let body = self.decoded_body_bytes();
serde_json::from_slice(&body).unwrap()
}
pub fn decoded_body_bytes(&self) -> Vec<u8> {
decode_body_bytes(
&self.0.body,
self.0
.headers
.get("content-encoding")
.and_then(|value| value.to_str().ok()),
);
serde_json::from_slice(&body).unwrap()
)
}
pub fn body_bytes(&self) -> Vec<u8> {

View File

@@ -18,6 +18,7 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::time::Duration;
use tokio::time::Instant;
@@ -63,6 +64,108 @@ fn has_subagent_notification(req: &ResponsesRequest) -> bool {
.any(|text| text.contains("<subagent_notification>"))
}
/// Builds the portion of `request`'s JSON body that a forked parent request
/// and its child request are expected to share.
///
/// The `input` array is cut off right after the last `function_call` item
/// whose `call_id` equals `call_id`, and the `tools` field (when present) is
/// replaced by the result of `normalize_tools_for_cache_prefix`. Every other
/// top-level field is returned untouched.
///
/// # Errors
///
/// Returns an error when the body is not a JSON object, when it has no
/// `input` array, or when `input` contains no matching `function_call` item.
fn cache_prefix_request_body(request: &ResponsesRequest, call_id: &str) -> Result<Value> {
    // Own the object map outright: formatting the body inside an error closure
    // while a `&mut` borrow obtained from it is still alive is a borrow conflict.
    let mut object = match request.body_json() {
        Value::Object(object) => object,
        other => {
            return Err(anyhow::anyhow!(
                "expected JSON object request body, got {other:?}"
            ));
        }
    };

    // `let … else` lets the failure path read `object` only after the mutable
    // borrow taken by `get_mut` is no longer in use.
    let Some(input) = object.get_mut("input").and_then(Value::as_array_mut) else {
        return Err(anyhow::anyhow!(
            "expected request input array, got {object:?}"
        ));
    };

    // Search from the back so the most recent matching call wins.
    let spawn_call_index = input
        .iter()
        .rposition(|item| {
            item.get("type").and_then(Value::as_str) == Some("function_call")
                && item.get("call_id").and_then(Value::as_str) == Some(call_id)
        })
        .ok_or_else(|| {
            anyhow::anyhow!("expected request input to include function_call {call_id}: {input:?}")
        })?;

    // The cache-preservation contract only covers the shared request prefix up
    // to and including the forked `spawn_agent` call. The `FunctionCallOutput`
    // for that call is the first legal divergence point between parent and
    // child requests, so keep everything through the call and drop the rest.
    input.truncate(spawn_call_index + 1);

    if let Some(tools) = object.get_mut("tools") {
        *tools = normalize_tools_for_cache_prefix(tools);
    }

    Ok(Value::Object(object))
}
fn prompt_cache_key(request: &ResponsesRequest) -> Option<String> {
request
.body_json()
.get("prompt_cache_key")
.and_then(Value::as_str)
.map(str::to_string)
}
/// Applies `normalize_tool_for_cache_prefix` to every entry of a request's
/// `tools` array and returns the surviving entries as a new JSON array.
///
/// # Panics
///
/// Panics when `tools` is not a JSON array.
fn normalize_tools_for_cache_prefix(tools: &Value) -> Value {
    let Some(entries) = tools.as_array() else {
        panic!("expected tools array: {tools:?}");
    };

    Value::Array(
        entries
            .iter()
            .filter_map(normalize_tool_for_cache_prefix)
            .collect(),
    )
}
/// Returns a copy of one tool definition, normalized for cache-prefix comparison.
///
/// Two adjustments are made to the copy:
/// * a `"namespace"` tool that carries a `tools` field has that field replaced
///   by `normalize_namespace_tools_for_cache_prefix` of it;
/// * a `"function"` tool whose `defer_loading` is `true` loses its
///   `parameters` field.
///
/// The result is always `Some`; the `Option` return type lets the function be
/// passed directly to `filter_map`.
///
/// # Panics
///
/// Panics when `tool` is not a JSON object.
fn normalize_tool_for_cache_prefix(tool: &Value) -> Option<Value> {
    let Some(original) = tool.as_object() else {
        panic!("expected tool object: {tool:?}");
    };
    let mut entry = original.clone();

    // Neither `type` nor `defer_loading` is modified below, so both can be
    // read once from the untouched original.
    let kind = original.get("type").and_then(Value::as_str);

    if let (Some("namespace"), Some(members)) = (kind, original.get("tools")) {
        entry.insert(
            "tools".to_string(),
            normalize_namespace_tools_for_cache_prefix(members),
        );
    }

    let deferred = matches!(original.get("defer_loading"), Some(Value::Bool(true)));
    if deferred && kind == Some("function") {
        entry.remove("parameters");
    }

    Some(Value::Object(entry))
}
/// Normalizes the member tools of a namespace tool for cache-prefix comparison.
///
/// Members that are `"function"` tools with `defer_loading` set to `true` are
/// dropped entirely; every other member is passed through
/// `normalize_tool_for_cache_prefix`.
///
/// # Panics
///
/// Panics when `tools` is not a JSON array or when any member is not a JSON
/// object.
fn normalize_namespace_tools_for_cache_prefix(tools: &Value) -> Value {
    let Some(members) = tools.as_array() else {
        panic!("expected namespace tools array: {tools:?}");
    };

    let kept = members
        .iter()
        .filter_map(|tool| {
            let Some(member) = tool.as_object() else {
                panic!("expected namespace tool object: {tool:?}");
            };

            let deferred = member
                .get("defer_loading")
                .and_then(Value::as_bool)
                .unwrap_or(false);
            let is_function = member.get("type").and_then(Value::as_str) == Some("function");

            if deferred && is_function {
                None
            } else {
                // Hand over the borrowed member as-is; the callee makes the one
                // clone it needs, so no intermediate copy is built here.
                normalize_tool_for_cache_prefix(tool)
            }
        })
        .collect();

    Value::Array(kept)
}
fn tool_parameter_description(
req: &ResponsesRequest,
tool_name: &str,
@@ -328,9 +431,12 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> {
)
.await;
let _child_request_log = mount_sse_once_match(
let child_request_log = mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, CHILD_PROMPT),
|req: &wiremock::Request| {
body_contains(req, CHILD_PROMPT)
&& body_contains(req, FORKED_SPAWN_AGENT_OUTPUT_MESSAGE)
},
sse(vec![
ev_response_created("resp-child-1"),
ev_assistant_message("msg-child-1", "child done"),
@@ -339,9 +445,11 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> {
)
.await;
let _turn1_followup = mount_sse_once_match(
let turn1_followup = mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
|req: &wiremock::Request| {
body_contains(req, SPAWN_CALL_ID) && !body_contains(req, CHILD_PROMPT)
},
sse(vec![
ev_response_created("resp-turn1-2"),
ev_assistant_message("msg-turn1-2", "parent done"),
@@ -363,18 +471,17 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> {
test.submit_turn(TURN_1_PROMPT).await?;
let _ = spawn_turn.single_request();
let parent_followup_requests = wait_for_requests(&turn1_followup).await?;
let parent_followup_request = parent_followup_requests
.first()
.expect("parent follow-up request should be captured");
let deadline = Instant::now() + Duration::from_secs(2);
let child_request = loop {
if let Some(request) = server
.received_requests()
.await
.unwrap_or_default()
if let Some(request) = child_request_log
.requests()
.into_iter()
.find(|request| {
body_contains(request, CHILD_PROMPT)
&& body_contains(request, FORKED_SPAWN_AGENT_OUTPUT_MESSAGE)
})
.find(|request| request.body_contains_text(CHILD_PROMPT))
{
break request;
}
@@ -383,31 +490,19 @@ async fn spawned_child_receives_forked_parent_context() -> Result<()> {
}
sleep(Duration::from_millis(10)).await;
};
assert!(body_contains(&child_request, TURN_0_FORK_PROMPT));
assert!(body_contains(&child_request, "seeded"));
let child_body = child_request
.body_json::<serde_json::Value>()
.expect("forked child request body should be json");
let function_call_output = child_body["input"]
.as_array()
.and_then(|items| {
items.iter().find(|item| {
item["type"].as_str() == Some("function_call_output")
&& item["call_id"].as_str() == Some(SPAWN_CALL_ID)
})
})
.unwrap_or_else(|| panic!("expected forked child request to include spawn_agent output"));
let (content, success) = match &function_call_output["output"] {
serde_json::Value::String(text) => (Some(text.as_str()), None),
serde_json::Value::Object(output) => (
output.get("content").and_then(serde_json::Value::as_str),
output.get("success").and_then(serde_json::Value::as_bool),
),
_ => (None, None),
};
assert_eq!(content, Some(FORKED_SPAWN_AGENT_OUTPUT_MESSAGE));
assert_ne!(success, Some(false));
assert!(child_request.body_contains_text(TURN_0_FORK_PROMPT));
assert!(child_request.body_contains_text("seeded"));
let parent_cache_prefix = cache_prefix_request_body(parent_followup_request, SPAWN_CALL_ID)?;
let child_cache_prefix = cache_prefix_request_body(&child_request, SPAWN_CALL_ID)?;
assert_eq!(
prompt_cache_key(parent_followup_request),
prompt_cache_key(&child_request),
"forked parent and child requests must reuse the same prompt_cache_key so backend sharding can colocate them for KV cache reuse"
);
assert_eq!(
parent_cache_prefix, child_cache_prefix,
"forked child requests must preserve every cache-relevant request field and the conversation-item prefix exactly through the shared spawn_agent call; namespace shells and non-deferred tools must stay stable, while deferred namespace members may only appear after tool_search_output"
);
Ok(())
}