add js_repl polling and reuse unified_exec managed process substrate

This commit is contained in:
Curtis 'Fjord' Hawthorne
2026-03-12 15:57:58 -07:00
parent 650beb177e
commit e924654a60
32 changed files with 6783 additions and 1684 deletions

View File

@@ -38,13 +38,20 @@ use image::Rgba;
use image::load_from_memory;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::time::Duration;
use wiremock::BodyPrintLimit;
use wiremock::Mock;
use wiremock::MockServer;
#[cfg(not(debug_assertions))]
use wiremock::Respond;
use wiremock::ResponseTemplate;
#[cfg(not(debug_assertions))]
use wiremock::matchers::body_string_contains;
use wiremock::matchers::method;
use wiremock::matchers::path_regex;
fn image_messages(body: &Value) -> Vec<&Value> {
body.get("input")
@@ -73,6 +80,27 @@ fn find_image_message(body: &Value) -> Option<&Value> {
image_messages(body).into_iter().next()
}
fn request_body_json(request: &wiremock::Request) -> Value {
let content_encoding = request
.headers
.get("content-encoding")
.and_then(|value| value.to_str().ok());
let body = if content_encoding
.is_some_and(|value| value.split(',').any(|entry| entry.trim() == "zstd"))
{
match zstd::stream::decode_all(std::io::Cursor::new(&request.body)) {
Ok(body) => body,
Err(err) => panic!("decode zstd request body: {err}"),
}
} else {
request.body.clone()
};
match serde_json::from_slice(&body) {
Ok(body) => body,
Err(err) => panic!("request body json: {err}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_turn_with_local_image_attaches_image() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
@@ -910,6 +938,652 @@ await codex.emitImage(out);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_view_image_tool_attaches_multiple_local_images() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::JsRepl)
.expect("test config should allow feature update");
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let call_id = "js-repl-view-image-two";
let js_input = r#"
const fs = await import("node:fs/promises");
const path = await import("node:path");
const png = Buffer.from(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
"base64"
);
const imagePathA = path.join(codex.tmpDir, "js-repl-view-image-a.png");
const imagePathB = path.join(codex.tmpDir, "js-repl-view-image-b.png");
await fs.writeFile(imagePathA, png);
await fs.writeFile(imagePathB, png);
const outA = await codex.tool("view_image", { path: imagePathA });
const outB = await codex.tool("view_image", { path: imagePathB });
await codex.emitImage(outA);
await codex.emitImage(outB);
console.log("attached-two-images");
"#;
let first_response = sse(vec![
ev_response_created("resp-1"),
ev_custom_tool_call(call_id, "js_repl", js_input),
ev_completed("resp-1"),
]);
responses::mount_sse_once(&server, first_response).await;
let second_response = sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-2"),
]);
let mock = responses::mount_sse_once(&server, second_response).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "use js_repl to write two images and attach them".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: Some(ReasoningSummary::Auto),
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event_with_timeout(
&codex,
|event| matches!(event, EventMsg::TurnComplete(_)),
Duration::from_secs(10),
)
.await;
let req = mock.single_request();
let custom_output = req.custom_tool_call_output(call_id);
assert_ne!(
custom_output.get("success").and_then(Value::as_bool),
Some(false),
"js_repl call failed unexpectedly: {custom_output}"
);
let output_items = custom_output
.get("output")
.and_then(Value::as_array)
.expect("custom_tool_call_output should be a content item array");
let js_repl_output = output_items
.iter()
.find_map(|item| {
(item.get("type").and_then(Value::as_str) == Some("input_text"))
.then(|| item.get("text").and_then(Value::as_str))
.flatten()
})
.expect("custom tool output text present");
assert!(
js_repl_output.contains("attached-two-images"),
"expected js_repl output marker, got {js_repl_output}"
);
let image_urls = output_items
.iter()
.filter_map(|item| {
(item.get("type").and_then(Value::as_str) == Some("input_image"))
.then(|| item.get("image_url").and_then(Value::as_str))
.flatten()
})
.collect::<Vec<_>>();
assert_eq!(
image_urls.len(),
2,
"js_repl should include one input_image content item per nested view_image call"
);
for image_url in image_urls {
assert!(
image_url.starts_with("data:image/png;base64,"),
"expected png data URL, got {image_url}"
);
}
let body = req.body_json();
assert_eq!(
image_messages(&body).len(),
0,
"js_repl should not inject pending input image messages"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_poll_view_image_tool_attaches_local_image() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
const POLL_WAIT_MS: u64 = 30_000;
const TURN_TIMEOUT_SECS: u64 = 30;
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::JsRepl)
.expect("test config should allow feature update");
config
.features
.enable(Feature::JsReplPolling)
.expect("test config should allow feature update");
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let submit_call_id = "js-repl-poll-submit";
let poll_call_id = "js-repl-poll-image";
let js_input = r#"// codex-js-repl: poll=true
const fs = await import("node:fs/promises");
const path = await import("node:path");
const imagePath = path.join(codex.tmpDir, "js-repl-poll-view-image.png");
const png = Buffer.from(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
"base64"
);
await fs.writeFile(imagePath, png);
const out = await codex.tool("view_image", { path: imagePath });
await codex.emitImage(out);
"#
.to_string();
#[derive(Clone)]
struct PollSequenceResponder {
requests: Arc<Mutex<Vec<wiremock::Request>>>,
call_count: Arc<AtomicUsize>,
exec_id: Arc<Mutex<Option<String>>>,
submit_call_id: &'static str,
poll_call_id: &'static str,
js_input: String,
}
impl Respond for PollSequenceResponder {
fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
self.requests
.lock()
.expect("request log lock")
.push(request.clone());
let request_json = request_body_json(request);
let body = match self.call_count.fetch_add(1, Ordering::SeqCst) {
0 => sse(vec![
ev_response_created("resp-1"),
ev_custom_tool_call(self.submit_call_id, "js_repl", &self.js_input),
ev_completed("resp-1"),
]),
1 => {
let submit_output = request_json["input"]
.as_array()
.expect("request input array")
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str)
== Some("custom_tool_call_output")
&& item.get("call_id").and_then(Value::as_str)
== Some(self.submit_call_id)
})
.expect("js_repl polling submit output");
let submit_payload = submit_output["output"]
.as_str()
.expect("js_repl polling submit output string");
let submit_json: Value =
serde_json::from_str(submit_payload).expect("submit payload json");
let exec_id = submit_json["exec_id"]
.as_str()
.expect("exec_id present in submit payload");
*self.exec_id.lock().expect("exec id lock") = Some(exec_id.to_string());
let poll_args = serde_json::json!({
"exec_id": exec_id,
"yield_time_ms": POLL_WAIT_MS,
})
.to_string();
sse(vec![
ev_response_created("resp-2"),
ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
ev_completed("resp-2"),
])
}
2 => {
let poll_output = request_json["input"]
.as_array()
.expect("request input array")
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str)
== Some(self.poll_call_id)
})
.expect("js_repl_poll function_call_output present");
assert!(
poll_output.get("output").is_some(),
"js_repl_poll output should be present"
);
let exec_id = self
.exec_id
.lock()
.expect("exec id lock")
.clone()
.expect("exec id should be recorded before polling");
let poll_args = serde_json::json!({
"exec_id": exec_id,
"yield_time_ms": POLL_WAIT_MS,
})
.to_string();
sse(vec![
ev_response_created("resp-2b"),
ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
ev_completed("resp-2b"),
])
}
3 => {
let poll_output = request_json["input"]
.as_array()
.expect("request input array")
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str)
== Some(self.poll_call_id)
})
.expect("js_repl_poll function_call_output present");
assert!(
poll_output.get("output").is_some(),
"js_repl_poll output should be present"
);
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
])
}
call_num => panic!("unexpected extra responses request {call_num}"),
};
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_string(body)
}
}
let requests = Arc::new(Mutex::new(Vec::new()));
Mock::given(method("POST"))
.and(path_regex(".*/responses$"))
.respond_with(PollSequenceResponder {
requests: Arc::clone(&requests),
call_count: Arc::new(AtomicUsize::new(0)),
exec_id: Arc::new(Mutex::new(None)),
submit_call_id,
poll_call_id,
js_input,
})
.up_to_n_times(4)
.expect(4)
.mount(&server)
.await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "use js_repl polling to write an image and attach it".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
service_tier: None,
summary: None,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event_with_timeout(
&codex,
|event| matches!(event, EventMsg::TurnComplete(_)),
Duration::from_secs(TURN_TIMEOUT_SECS),
)
.await;
let requests = requests.lock().expect("request log lock").clone();
assert_eq!(
requests.len(),
4,
"expected submit, two polls, and completion requests"
);
let poll_request_json = request_body_json(
requests
.get(3)
.expect("final request with js_repl_poll output should be present"),
);
let poll_output = poll_request_json["input"]
.as_array()
.expect("poll request input array")
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str) == Some(poll_call_id)
})
.expect("js_repl_poll function_call_output present");
let output_items = poll_output["output"]
.as_array()
.expect("js_repl_poll output should be a content item array");
let status_item = output_items
.first()
.expect("first poll output item should be status text");
assert_eq!(
status_item.get("type").and_then(Value::as_str),
Some("input_text")
);
let status_json: Value = serde_json::from_str(
status_item["text"]
.as_str()
.expect("status item text should be present"),
)
.expect("status item should be valid json");
assert_eq!(status_json["status"].as_str(), Some("completed"));
assert_eq!(status_json["error"], Value::Null);
assert_eq!(status_json["logs"], Value::Null);
let image_items = output_items
.iter()
.filter_map(|item| {
(item.get("type").and_then(Value::as_str) == Some("input_image"))
.then(|| item.get("image_url").and_then(Value::as_str))
.flatten()
})
.collect::<Vec<_>>();
assert_eq!(
image_items.len(),
1,
"expected one image item in poll output"
);
assert!(
image_items[0].starts_with("data:image/png;base64,"),
"expected png data URL, got {}",
image_items[0]
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_poll_view_image_requires_explicit_emit() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
const POLL_WAIT_MS: u64 = 30_000;
const TURN_TIMEOUT_SECS: u64 = 30;
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::JsRepl)
.expect("test config should allow feature update");
config
.features
.enable(Feature::JsReplPolling)
.expect("test config should allow feature update");
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let submit_call_id = "js-repl-poll-submit-no-emit";
let poll_call_id = "js-repl-poll-no-image";
let js_input = r#"// codex-js-repl: poll=true
const fs = await import("node:fs/promises");
const path = await import("node:path");
const imagePath = path.join(codex.tmpDir, "js-repl-poll-view-image-no-emit.png");
const png = Buffer.from(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
"base64"
);
await fs.writeFile(imagePath, png);
const out = await codex.tool("view_image", { path: imagePath });
console.log(out.type);
"#
.to_string();
#[derive(Clone)]
struct PollSequenceResponder {
requests: Arc<Mutex<Vec<wiremock::Request>>>,
call_count: Arc<AtomicUsize>,
exec_id: Arc<Mutex<Option<String>>>,
submit_call_id: &'static str,
poll_call_id: &'static str,
js_input: String,
}
impl Respond for PollSequenceResponder {
fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
self.requests
.lock()
.expect("request log lock")
.push(request.clone());
let request_json = request_body_json(request);
let body = match self.call_count.fetch_add(1, Ordering::SeqCst) {
0 => sse(vec![
ev_response_created("resp-1"),
ev_custom_tool_call(self.submit_call_id, "js_repl", &self.js_input),
ev_completed("resp-1"),
]),
1 => {
let submit_output = request_json["input"]
.as_array()
.expect("request input array")
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str)
== Some("custom_tool_call_output")
&& item.get("call_id").and_then(Value::as_str)
== Some(self.submit_call_id)
})
.expect("js_repl polling submit output");
let submit_payload = submit_output["output"]
.as_str()
.expect("js_repl polling submit output string");
let submit_json: Value =
serde_json::from_str(submit_payload).expect("submit payload json");
let exec_id = submit_json["exec_id"]
.as_str()
.expect("exec_id present in submit payload");
*self.exec_id.lock().expect("exec id lock") = Some(exec_id.to_string());
let poll_args = serde_json::json!({
"exec_id": exec_id,
"yield_time_ms": POLL_WAIT_MS,
})
.to_string();
sse(vec![
ev_response_created("resp-2"),
ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
ev_completed("resp-2"),
])
}
2 => {
let poll_output = request_json["input"]
.as_array()
.expect("request input array")
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str)
== Some(self.poll_call_id)
})
.expect("js_repl_poll function_call_output present");
assert!(
poll_output.get("output").is_some(),
"js_repl_poll output should be present"
);
let exec_id = self
.exec_id
.lock()
.expect("exec id lock")
.clone()
.expect("exec id should be recorded before polling");
let poll_args = serde_json::json!({
"exec_id": exec_id,
"yield_time_ms": POLL_WAIT_MS,
})
.to_string();
sse(vec![
ev_response_created("resp-2b"),
ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
ev_completed("resp-2b"),
])
}
3 => {
let poll_output = request_json["input"]
.as_array()
.expect("request input array")
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str)
== Some(self.poll_call_id)
})
.expect("js_repl_poll function_call_output present");
assert!(
poll_output.get("output").is_some(),
"js_repl_poll output should be present"
);
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
])
}
call_num => panic!("unexpected extra responses request {call_num}"),
};
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_string(body)
}
}
let requests = Arc::new(Mutex::new(Vec::new()));
Mock::given(method("POST"))
.and(path_regex(".*/responses$"))
.respond_with(PollSequenceResponder {
requests: Arc::clone(&requests),
call_count: Arc::new(AtomicUsize::new(0)),
exec_id: Arc::new(Mutex::new(None)),
submit_call_id,
poll_call_id,
js_input,
})
.up_to_n_times(4)
.expect(4)
.mount(&server)
.await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "use js_repl polling to write an image without emitting it".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
service_tier: None,
summary: None,
collaboration_mode: None,
personality: None,
})
.await?;
let mut tool_event = None;
wait_for_event_with_timeout(
&codex,
|event| match event {
EventMsg::ViewImageToolCall(_) => {
tool_event = Some(event.clone());
false
}
EventMsg::TurnComplete(_) => true,
_ => false,
},
Duration::from_secs(TURN_TIMEOUT_SECS),
)
.await;
let tool_event = match tool_event {
Some(EventMsg::ViewImageToolCall(event)) => event,
other => panic!("expected ViewImageToolCall event, got {other:?}"),
};
assert!(
tool_event
.path
.ends_with("js-repl-poll-view-image-no-emit.png"),
"unexpected image path: {}",
tool_event.path.display()
);
let requests = requests.lock().expect("request log lock").clone();
assert_eq!(
requests.len(),
4,
"expected submit, two polls, and completion requests"
);
let poll_request_json = request_body_json(
requests
.get(3)
.expect("final request with js_repl_poll output should be present"),
);
let poll_output = poll_request_json["input"]
.as_array()
.expect("poll request input array")
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str) == Some(poll_call_id)
})
.expect("js_repl_poll function_call_output present");
let status_json: Value = serde_json::from_str(
poll_output["output"]
.as_str()
.expect("js_repl_poll output should stay text without explicit emit"),
)
.expect("status item should be valid json");
assert_eq!(status_json["status"].as_str(), Some("completed"));
assert_eq!(status_json["error"], Value::Null);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_view_image_requires_explicit_emit() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));