Compare commits

...

1 Commits

Author SHA1 Message Date
Curtis 'Fjord' Hawthorne
e924654a60 add js_repl polling and reuse unified_exec managed process substrate 2026-03-12 20:55:11 -07:00
32 changed files with 6783 additions and 1684 deletions

View File

@@ -28,6 +28,7 @@ use codex_core::sandboxing::ExecRequest;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use codex_utils_pty::ProcessHandle;
use codex_utils_pty::SpawnedProcess;
use codex_utils_pty::SpawnedProcessSplit;
use codex_utils_pty::TerminalSize;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
@@ -100,7 +101,7 @@ struct RunCommandParams {
outgoing: Arc<OutgoingMessageSender>,
request_id: ConnectionRequestId,
process_id: Option<String>,
spawned: SpawnedProcess,
spawned: SpawnedCommand,
control_rx: mpsc::Receiver<CommandControlRequest>,
stream_stdin: bool,
stream_stdout_stderr: bool,
@@ -108,6 +109,11 @@ struct RunCommandParams {
output_bytes_cap: Option<usize>,
}
/// How the child process was spawned, distinguishing output-stream shape.
///
/// `Merged` wraps a `SpawnedProcess` whose output arrives on a single
/// combined channel; `Split` wraps a `SpawnedProcessSplit` exposing
/// separate `stdout_rx`/`stderr_rx` channels. `run_command` destructures
/// each variant to wire up the appropriate output-forwarding tasks.
enum SpawnedCommand {
    Merged(SpawnedProcess),
    Split(SpawnedProcessSplit),
}
struct SpawnProcessOutputParams {
connection_id: ConnectionId,
process_id: Option<String>,
@@ -270,11 +276,16 @@ impl CommandExecManager {
size.unwrap_or_default(),
)
.await
} else if stream_stdin {
codex_utils_pty::spawn_pipe_process(program, args, cwd.as_path(), &env, &arg0).await
.map(SpawnedCommand::Merged)
} else {
codex_utils_pty::spawn_pipe_process_no_stdin(program, args, cwd.as_path(), &env, &arg0)
codex_utils_pty::spawn_pipe_process_split(program, args, cwd.as_path(), &env, &arg0)
.await
.map(|spawned| {
if !stream_stdin {
spawned.session.close_stdin();
}
SpawnedCommand::Split(spawned)
})
};
let spawned = match spawned {
Ok(spawned) => spawned,
@@ -462,36 +473,58 @@ async fn run_command(params: RunCommandParams) {
}
};
tokio::pin!(expiration);
let SpawnedProcess {
session,
stdout_rx,
stderr_rx,
exit_rx,
} = spawned;
tokio::pin!(exit_rx);
let mut timed_out = false;
let (stdio_timeout_tx, stdio_timeout_rx) = watch::channel(false);
let stdout_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id: process_id.clone(),
output_rx: stdout_rx,
stdio_timeout_rx: stdio_timeout_rx.clone(),
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stdout,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
let stderr_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id,
output_rx: stderr_rx,
stdio_timeout_rx,
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stderr,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
let (session, exit_rx, stdout_handle, stderr_handle) = match spawned {
SpawnedCommand::Merged(SpawnedProcess {
session,
output_rx,
exit_rx,
}) => {
let stdout_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id: process_id.clone(),
output_rx,
stdio_timeout_rx: stdio_timeout_rx.clone(),
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stdout,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
let stderr_handle = tokio::spawn(async { String::new() });
(session, exit_rx, stdout_handle, stderr_handle)
}
SpawnedCommand::Split(SpawnedProcessSplit {
session,
stdout_rx,
stderr_rx,
exit_rx,
..
}) => {
let stdout_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id: process_id.clone(),
output_rx: stdout_rx,
stdio_timeout_rx: stdio_timeout_rx.clone(),
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stdout,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
let stderr_handle = spawn_process_output(SpawnProcessOutputParams {
connection_id: request_id.connection_id,
process_id,
output_rx: stderr_rx,
stdio_timeout_rx,
outgoing: Arc::clone(&outgoing),
stream: CommandExecOutputStream::Stderr,
stream_output: stream_stdout_stderr,
output_bytes_cap,
});
(session, exit_rx, stdout_handle, stderr_handle)
}
};
tokio::pin!(exit_rx);
let exit_code = loop {
tokio::select! {

View File

@@ -399,6 +399,9 @@
"js_repl": {
"type": "boolean"
},
"js_repl_polling": {
"type": "boolean"
},
"js_repl_tools_only": {
"type": "boolean"
},
@@ -1910,6 +1913,9 @@
"js_repl": {
"type": "boolean"
},
"js_repl_polling": {
"type": "boolean"
},
"js_repl_tools_only": {
"type": "boolean"
},

View File

@@ -87,6 +87,8 @@ pub enum Feature {
JsRepl,
/// Enable a minimal JavaScript mode backed by Node's built-in vm runtime.
CodeMode,
/// Enable js_repl polling helpers and tool.
JsReplPolling,
/// Only expose js_repl tools directly to the model.
JsReplToolsOnly,
/// Use the single unified PTY-backed exec tool.
@@ -558,6 +560,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::JsReplPolling,
key: "js_repl_polling",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::JsReplToolsOnly,
key: "js_repl_tools_only",

View File

@@ -1,6 +1,5 @@
use crate::features::Feature;
use crate::features::Features;
use codex_protocol::models::ImageDetail;
use codex_protocol::openai_models::ModelInfo;
pub(crate) fn can_request_original_image_detail(
@@ -10,19 +9,6 @@ pub(crate) fn can_request_original_image_detail(
model_info.supports_image_detail_original && features.enabled(Feature::ImageDetailOriginal)
}
/// Normalizes a requested output-image `detail` level.
///
/// Returns `Some(ImageDetail::Original)` only when the caller explicitly
/// asked for `Original` AND `can_request_original_image_detail` confirms
/// both the feature flag and the model support it. Every other input —
/// `Original` without support, any other explicit detail value, or no
/// value at all — normalizes to `None` (fall back to the default detail).
pub(crate) fn normalize_output_image_detail(
    features: &Features,
    model_info: &ModelInfo,
    detail: Option<ImageDetail>,
) -> Option<ImageDetail> {
    match detail {
        // Pass `Original` through only when feature + model both allow it.
        Some(ImageDetail::Original) if can_request_original_image_detail(features, model_info) => {
            Some(ImageDetail::Original)
        }
        // Everything else (unsupported `Original`, other variants, absent)
        // drops to `None`; the second `Some(ImageDetail::Original)` arm is
        // reached only when the guard above fails.
        Some(ImageDetail::Original) | Some(_) | None => None,
    }
}
#[cfg(test)]
#[path = "original_image_detail_tests.rs"]
mod tests;

View File

@@ -3,61 +3,21 @@ use super::*;
use crate::config::test_config;
use crate::features::Features;
use crate::models_manager::manager::ModelsManager;
use pretty_assertions::assert_eq;
#[test]
fn image_detail_original_feature_enables_explicit_original_without_force() {
fn image_detail_original_requires_feature_and_model_support() {
let config = test_config();
let mut model_info =
ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config);
model_info.supports_image_detail_original = true;
let mut features = Features::with_defaults();
features.enable(Feature::ImageDetailOriginal);
assert!(can_request_original_image_detail(&features, &model_info));
assert_eq!(
normalize_output_image_detail(&features, &model_info, Some(ImageDetail::Original)),
Some(ImageDetail::Original)
);
assert_eq!(
normalize_output_image_detail(&features, &model_info, None),
None
);
}
#[test]
fn explicit_original_is_dropped_without_feature_or_model_support() {
let config = test_config();
let mut model_info =
ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config);
model_info.supports_image_detail_original = true;
let features = Features::with_defaults();
assert_eq!(
normalize_output_image_detail(&features, &model_info, Some(ImageDetail::Original)),
None
);
let mut features = Features::with_defaults();
features.enable(Feature::ImageDetailOriginal);
model_info.supports_image_detail_original = false;
assert_eq!(
normalize_output_image_detail(&features, &model_info, Some(ImageDetail::Original)),
None
);
}
#[test]
fn unsupported_non_original_detail_is_dropped() {
let config = test_config();
let mut model_info =
ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config);
model_info.supports_image_detail_original = true;
assert!(!can_request_original_image_detail(&features, &model_info));
let mut features = Features::with_defaults();
features.enable(Feature::ImageDetailOriginal);
assert!(can_request_original_image_detail(&features, &model_info));
assert_eq!(
normalize_output_image_detail(&features, &model_info, Some(ImageDetail::Low)),
None
);
model_info.supports_image_detail_original = false;
assert!(!can_request_original_image_detail(&features, &model_info));
}

View File

@@ -59,7 +59,7 @@ fn render_js_repl_instructions(config: &Config) -> Option<String> {
);
section.push_str("- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n");
section.push_str("- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n");
section.push_str("- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells, but async callbacks that fire after a cell finishes still fail because no exec is active.\n");
section.push_str("- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells. In non-polling mode, async callbacks that fire after a cell finishes still fail because no exec is active. In polling mode, async callbacks and timers scheduled during a polled exec keep that exec active until they settle or are cleared.\n");
section.push_str("- Request full-resolution image processing with `detail: \"original\"` only when the `view_image` tool schema includes a `detail` argument. The same availability applies to `codex.emitImage(...)`: if `view_image.detail` is present, you may also pass `detail: \"original\"` there. Use this when high-fidelity image perception or precise localization is needed, especially for CUA agents.\n");
section.push_str("- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\", detail: \"original\" })`.\n");
section.push_str("- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\", detail: \"original\" }))`.\n");

View File

@@ -178,7 +178,7 @@ async fn js_repl_instructions_are_appended_when_enabled() {
let res = get_user_instructions(&cfg, None, None)
.await
.expect("js_repl instructions expected");
let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.cwd`, `codex.homeDir`, `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells, but async callbacks that fire after a cell finishes still fail because no exec is active.\n- Request full-resolution image processing with `detail: \"original\"` only when the `view_image` tool schema includes a `detail` argument. The same availability applies to `codex.emitImage(...)`: if `view_image.detail` is present, you may also pass `detail: \"original\"` there. 
Use this when high-fidelity image perception or precise localization is needed, especially for CUA agents.\n- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\", detail: \"original\" })`.\n- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\", detail: \"original\" }))`.\n- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. 
Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.";
let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.cwd`, `codex.homeDir`, `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells. In non-polling mode, async callbacks that fire after a cell finishes still fail because no exec is active. In polling mode, async callbacks and timers scheduled during a polled exec keep that exec active until they settle or are cleared.\n- Request full-resolution image processing with `detail: \"original\"` only when the `view_image` tool schema includes a `detail` argument. The same availability applies to `codex.emitImage(...)`: if `view_image.detail` is present, you may also pass `detail: \"original\"` there. 
Use this when high-fidelity image perception or precise localization is needed, especially for CUA agents.\n- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\", detail: \"original\" })`.\n- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\", detail: \"original\" }))`.\n- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. 
Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.";
assert_eq!(res, expected);
}
@@ -197,7 +197,7 @@ async fn js_repl_tools_only_instructions_are_feature_gated() {
let res = get_user_instructions(&cfg, None, None)
.await
.expect("js_repl instructions expected");
let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.cwd`, `codex.homeDir`, `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells, but async callbacks that fire after a cell finishes still fail because no exec is active.\n- Request full-resolution image processing with `detail: \"original\"` only when the `view_image` tool schema includes a `detail` argument. The same availability applies to `codex.emitImage(...)`: if `view_image.detail` is present, you may also pass `detail: \"original\"` there. 
Use this when high-fidelity image perception or precise localization is needed, especially for CUA agents.\n- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\", detail: \"original\" })`.\n- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\", detail: \"original\" }))`.\n- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. 
Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n- Do not call tools directly; use `js_repl` + `codex.tool(...)` for all tool calls, including shell commands.\n- MCP tools (if any) can also be called by name via `codex.tool(...)`.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.";
let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.cwd`, `codex.homeDir`, `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells. In non-polling mode, async callbacks that fire after a cell finishes still fail because no exec is active. In polling mode, async callbacks and timers scheduled during a polled exec keep that exec active until they settle or are cleared.\n- Request full-resolution image processing with `detail: \"original\"` only when the `view_image` tool schema includes a `detail` argument. The same availability applies to `codex.emitImage(...)`: if `view_image.detail` is present, you may also pass `detail: \"original\"` there. 
Use this when high-fidelity image perception or precise localization is needed, especially for CUA agents.\n- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\", detail: \"original\" })`.\n- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\", detail: \"original\" }))`.\n- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. 
Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n- Do not call tools directly; use `js_repl` + `codex.tool(...)` for all tool calls, including shell commands.\n- MCP tools (if any) can also be called by name via `codex.tool(...)`.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.";
assert_eq!(res, expected);
}
@@ -216,7 +216,7 @@ async fn js_repl_image_detail_original_does_not_change_instructions() {
let res = get_user_instructions(&cfg, None, None)
.await
.expect("js_repl instructions expected");
let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.cwd`, `codex.homeDir`, `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells, but async callbacks that fire after a cell finishes still fail because no exec is active.\n- Request full-resolution image processing with `detail: \"original\"` only when the `view_image` tool schema includes a `detail` argument. The same availability applies to `codex.emitImage(...)`: if `view_image.detail` is present, you may also pass `detail: \"original\"` there. 
Use this when high-fidelity image perception or precise localization is needed, especially for CUA agents.\n- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\", detail: \"original\" })`.\n- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\", detail: \"original\" }))`.\n- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. 
Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.";
let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.cwd`, `codex.homeDir`, `codex.tmpDir`, `codex.tool(name, args?)`, and `codex.emitImage(imageLike)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike. Nested tool outputs stay inside JavaScript unless you emit them explicitly.\n- `codex.emitImage(...)` adds one image to the outer `js_repl` function output each time you call it, so you can call it multiple times to emit multiple images. It accepts a data URL, a single `input_image` item, an object like `{ bytes, mimeType }`, or a raw tool response object with exactly one image and no text. It rejects mixed text-and-image content.\n- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells. In non-polling mode, async callbacks that fire after a cell finishes still fail because no exec is active. In polling mode, async callbacks and timers scheduled during a polled exec keep that exec active until they settle or are cleared.\n- Request full-resolution image processing with `detail: \"original\"` only when the `view_image` tool schema includes a `detail` argument. The same availability applies to `codex.emitImage(...)`: if `view_image.detail` is present, you may also pass `detail: \"original\"` there. 
Use this when high-fidelity image perception or precise localization is needed, especially for CUA agents.\n- Example of sharing an in-memory Playwright screenshot: `await codex.emitImage({ bytes: await page.screenshot({ type: \"jpeg\", quality: 85 }), mimeType: \"image/jpeg\", detail: \"original\" })`.\n- Example of sharing a local image tool result: `await codex.emitImage(codex.tool(\"view_image\", { path: \"/absolute/path\", detail: \"original\" }))`.\n- When encoding an image to send with `codex.emitImage(...)` or `view_image`, prefer JPEG at about 85 quality when lossy compression is acceptable; use PNG when transparency or lossless detail matters. Smaller uploads are faster and less likely to hit size limits.\n- Top-level bindings persist across cells. If a cell throws, prior bindings remain available and bindings that finished initializing before the throw often remain usable in later cells. For code you plan to reuse across cells, prefer declaring or assigning it in direct top-level statements before operations that might throw. If you hit `SyntaxError: Identifier 'x' has already been declared`, first reuse the existing binding, reassign a previously declared `let`, or pick a new descriptive name. Use `{ ... }` only for a short temporary block when you specifically need local scratch names; do not wrap an entire cell in block scope if you want those names reusable later. Reset the kernel with `js_repl_reset` only when you need a clean state.\n- Top-level static import declarations (for example `import x from \"./file.js\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")`, `await import(\"./file.js\")`, or `await import(\"/abs/path/file.mjs\")` instead. Imported local files must be ESM `.js`/`.mjs` files and run in the same REPL VM context. Bare package imports always resolve from REPL-global search roots (`CODEX_JS_REPL_NODE_MODULE_DIRS`, then cwd), not relative to the imported file location. 
Local files may statically import only other local relative/absolute/`file://` `.js`/`.mjs` files; package and builtin imports from local files must stay dynamic. `import.meta.resolve()` returns importable strings such as `file://...`, bare package names, and `node:...` specifiers. Local file modules reload between execs, while top-level bindings persist until `js_repl_reset`.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`, `codex.tool(...)`, and `codex.emitImage(...)`.";
assert_eq!(res, expected);
}

View File

@@ -1,11 +1,9 @@
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::Value as JsonValue;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use crate::exec::ExecToolCallOutput;
use crate::exec::StreamOutput;
use crate::features::Feature;
use crate::function_tool::FunctionCallError;
use crate::protocol::ExecCommandSource;
@@ -14,44 +12,35 @@ use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventFailure;
use crate::tools::events::ToolEventStage;
use crate::tools::handlers::parse_arguments;
use crate::tools::js_repl::JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE;
use crate::tools::js_repl::JS_REPL_PRAGMA_PREFIX;
use crate::tools::js_repl::JsExecPollResult;
use crate::tools::js_repl::JsReplArgs;
use crate::tools::js_repl::JsReplExecuteError;
use crate::tools::js_repl::emit_js_repl_exec_end;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use codex_protocol::models::FunctionCallOutputContentItem;
pub struct JsReplHandler;
pub struct JsReplResetHandler;
pub struct JsReplPollHandler;
fn join_outputs(stdout: &str, stderr: &str) -> String {
if stdout.is_empty() {
stderr.to_string()
} else if stderr.is_empty() {
stdout.to_string()
} else {
format!("{stdout}\n{stderr}")
}
/// Arguments accepted by the `js_repl_poll` function tool.
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct JsReplPollArgs {
    // Identifier of the in-flight polled exec being queried.
    exec_id: String,
    // Optional wait budget in milliseconds; the handler clamps it to at least
    // `crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS` before polling.
    #[serde(default)]
    yield_time_ms: Option<u64>,
}
fn build_js_repl_exec_output(
output: &str,
error: Option<&str>,
duration: Duration,
) -> ExecToolCallOutput {
let stdout = output.to_string();
let stderr = error.unwrap_or("").to_string();
let aggregated_output = join_outputs(&stdout, &stderr);
ExecToolCallOutput {
exit_code: if error.is_some() { 1 } else { 0 },
stdout: StreamOutput::new(stdout),
stderr: StreamOutput::new(stderr),
aggregated_output: StreamOutput::new(aggregated_output),
duration,
timed_out: false,
}
/// Arguments accepted by the `js_repl_reset` function tool.
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct JsReplResetArgs {
    // When present, reset only this named session; when absent, reset all
    // js_repl sessions.
    #[serde(default)]
    session_id: Option<String>,
}
async fn emit_js_repl_exec_begin(
@@ -69,29 +58,6 @@ async fn emit_js_repl_exec_begin(
emitter.emit(ctx, ToolEventStage::Begin).await;
}
async fn emit_js_repl_exec_end(
session: &crate::codex::Session,
turn: &crate::codex::TurnContext,
call_id: &str,
output: &str,
error: Option<&str>,
duration: Duration,
) {
let exec_output = build_js_repl_exec_output(output, error, duration);
let emitter = ToolEmitter::shell(
vec!["js_repl".to_string()],
turn.cwd.clone(),
ExecCommandSource::Agent,
false,
);
let ctx = ToolEventCtx::new(session, turn, call_id, None);
let stage = if error.is_some() {
ToolEventStage::Failure(ToolEventFailure::Output(exec_output))
} else {
ToolEventStage::Success(exec_output)
};
emitter.emit(ctx, stage).await;
}
#[async_trait]
impl ToolHandler for JsReplHandler {
type Output = FunctionToolOutput;
@@ -125,22 +91,90 @@ impl ToolHandler for JsReplHandler {
let args = match payload {
ToolPayload::Function { arguments } => parse_arguments(&arguments)?,
ToolPayload::Custom { input } => parse_freeform_args(&input)?,
ToolPayload::Custom { input } => {
parse_freeform_args(&input, session.features().enabled(Feature::JsReplPolling))?
}
_ => {
return Err(FunctionCallError::RespondToModel(
"js_repl expects custom or function payload".to_string(),
));
}
};
if args.poll {
if args
.session_id
.as_deref()
.is_some_and(|session_id| session_id.trim().is_empty())
{
return Err(FunctionCallError::RespondToModel(
"js_repl session_id must not be empty".to_string(),
));
}
if args.timeout_ms.is_some() {
return Err(FunctionCallError::RespondToModel(
JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE.to_string(),
));
}
if !session.features().enabled(Feature::JsReplPolling) {
return Err(FunctionCallError::RespondToModel(
"js_repl polling is disabled by feature flag".to_string(),
));
}
} else if args.session_id.is_some() {
return Err(FunctionCallError::RespondToModel(
"js_repl session_id is only supported when poll=true".to_string(),
));
}
let manager = turn.js_repl.manager().await?;
let started_at = Instant::now();
emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), &call_id).await;
if args.poll {
let submission = Arc::clone(&manager)
.submit(
Arc::clone(&session),
Arc::clone(&turn),
tracker,
call_id.clone(),
args,
)
.await;
let submission = match submission {
Ok(submission) => submission,
Err(err) => {
let message = err.to_string();
emit_js_repl_exec_end(
session.as_ref(),
turn.as_ref(),
&call_id,
"",
Some(&message),
started_at.elapsed(),
false,
)
.await;
return Err(err);
}
};
let content = serde_json::to_string(&serde_json::json!({
"exec_id": submission.exec_id,
"session_id": submission.session_id,
"status": "running",
}))
.map_err(|err| {
FunctionCallError::Fatal(format!(
"failed to serialize js_repl submission result: {err}"
))
})?;
return Ok(FunctionToolOutput::from_text(content, Some(true)));
}
let result = manager
.execute(Arc::clone(&session), Arc::clone(&turn), tracker, args)
.await;
let result = match result {
Ok(result) => result,
Err(err) => {
let timed_out = matches!(err, JsReplExecuteError::TimedOut);
let message = err.to_string();
emit_js_repl_exec_end(
session.as_ref(),
@@ -149,9 +183,10 @@ impl ToolHandler for JsReplHandler {
"",
Some(&message),
started_at.elapsed(),
timed_out,
)
.await;
return Err(err);
return Err(err.into());
}
};
@@ -171,6 +206,7 @@ impl ToolHandler for JsReplHandler {
&content,
None,
started_at.elapsed(),
false,
)
.await;
@@ -191,21 +227,149 @@ impl ToolHandler for JsReplResetHandler {
}
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
if !invocation.session.features().enabled(Feature::JsRepl) {
let ToolInvocation {
session,
turn,
payload,
..
} = invocation;
if !session.features().enabled(Feature::JsRepl) {
return Err(FunctionCallError::RespondToModel(
"js_repl is disabled by feature flag".to_string(),
));
}
let manager = invocation.turn.js_repl.manager().await?;
manager.reset().await?;
Ok(FunctionToolOutput::from_text(
"js_repl kernel reset".to_string(),
Some(true),
))
let ToolPayload::Function { arguments } = payload else {
return Err(FunctionCallError::RespondToModel(
"js_repl_reset expects function payload".to_string(),
));
};
let args: JsReplResetArgs = parse_arguments(&arguments)?;
let manager = turn.js_repl.manager().await?;
let content = if let Some(session_id) = args.session_id {
if session_id.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"js_repl session_id must not be empty".to_string(),
));
}
manager.reset_session(&session_id).await?;
serde_json::to_string(&serde_json::json!({
"status": "reset",
"session_id": session_id,
}))
.map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize js_repl reset result: {err}"))
})?
} else {
manager.reset().await?;
serde_json::to_string(&serde_json::json!({
"status": "reset_all",
}))
.map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize js_repl reset result: {err}"))
})?
};
Ok(FunctionToolOutput::from_text(content, Some(true)))
}
}
fn parse_freeform_args(input: &str) -> Result<JsReplArgs, FunctionCallError> {
#[async_trait]
impl ToolHandler for JsReplPollHandler {
    type Output = FunctionToolOutput;

    fn kind(&self) -> ToolKind {
        ToolKind::Function
    }

    /// Poll an in-flight `js_repl` exec started with `poll=true` and return its
    /// current status, accumulated logs, and (when done) final output/error.
    ///
    /// Rejects the call when the `JsRepl` or `JsReplPolling` feature flag is
    /// disabled, or when the payload is not a function payload.
    async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
        let ToolInvocation {
            session,
            turn,
            payload,
            ..
        } = invocation;
        // Feature checks are ordered: base js_repl flag first, then the
        // polling-specific flag, each with its own model-facing message.
        if !session.features().enabled(Feature::JsRepl) {
            return Err(FunctionCallError::RespondToModel(
                "js_repl is disabled by feature flag".to_string(),
            ));
        }
        if !session.features().enabled(Feature::JsReplPolling) {
            return Err(FunctionCallError::RespondToModel(
                "js_repl polling is disabled by feature flag".to_string(),
            ));
        }
        let ToolPayload::Function { arguments } = payload else {
            return Err(FunctionCallError::RespondToModel(
                "js_repl_poll expects function payload".to_string(),
            ));
        };
        let args: JsReplPollArgs = parse_arguments(&arguments)?;
        let manager = turn.js_repl.manager().await?;
        let result = manager
            .poll(
                &args.exec_id,
                // Clamp the caller's yield budget up to the shared minimum so
                // empty polls cannot busy-loop.
                args.yield_time_ms.map(|yield_time_ms| {
                    yield_time_ms.max(crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS)
                }),
            )
            .await?;
        format_poll_output(&result)
    }
}
/// Settings extracted from a `// codex-js-repl:` pragma line.
///
/// Every field defaults to `None` so the caller can distinguish "not given"
/// from an explicit value (and detect duplicate keys during parsing).
#[derive(Default)]
struct ParsedJsReplPragma {
    // Optional per-exec timeout in milliseconds.
    timeout_ms: Option<u64>,
    // Whether this exec should run in polling mode.
    poll: Option<bool>,
    // Optional named session; only meaningful together with poll=true.
    session_id: Option<String>,
}
/// Render a `JsExecPollResult` as the `FunctionToolOutput` returned by
/// `js_repl_poll`.
///
/// The JSON payload always carries `exec_id`, `session_id`, a derived
/// `status` ("running" until done, then "completed" or "error"), joined
/// `logs` (or null when there were none), `final_output`, and `error`.
/// Multimodal items (e.g. emitted images) are appended after the JSON text.
fn format_poll_output(result: &JsExecPollResult) -> Result<FunctionToolOutput, FunctionCallError> {
    // Status is a function of (done, has-error); an error only counts once done.
    let status = match (result.done, result.error.is_some()) {
        (false, _) => "running",
        (true, true) => "error",
        (true, false) => "completed",
    };
    // Empty log list serializes as JSON null rather than "".
    let logs = (!result.logs.is_empty()).then(|| result.logs.join("\n"));
    let payload = serde_json::json!({
        "exec_id": result.exec_id,
        "session_id": result.session_id,
        "status": status,
        "logs": logs,
        "final_output": result.final_output,
        "error": result.error,
    });
    let content = serde_json::to_string(&payload).map_err(|err| {
        FunctionCallError::Fatal(format!("failed to serialize js_repl poll result: {err}"))
    })?;
    if result.content_items.is_empty() {
        return Ok(FunctionToolOutput::from_text(content, Some(true)));
    }
    // The JSON text item always comes first, followed by the multimodal items.
    let mut items = vec![FunctionCallOutputContentItem::InputText { text: content }];
    items.extend(result.content_items.iter().cloned());
    Ok(FunctionToolOutput::from_content(items, Some(true)))
}
fn parse_freeform_args(
input: &str,
polling_enabled: bool,
) -> Result<JsReplArgs, FunctionCallError> {
if input.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"js_repl expects raw JavaScript tool input (non-empty). Provide JS source text, optionally with first-line `// codex-js-repl: ...`."
@@ -213,61 +377,119 @@ fn parse_freeform_args(input: &str) -> Result<JsReplArgs, FunctionCallError> {
));
}
let mut args = JsReplArgs {
code: input.to_string(),
timeout_ms: None,
};
let mut lines = input.splitn(2, '\n');
let first_line = lines.next().unwrap_or_default();
let rest = lines.next().unwrap_or_default();
let trimmed = first_line.trim_start();
let Some(pragma) = trimmed.strip_prefix(JS_REPL_PRAGMA_PREFIX) else {
reject_json_or_quoted_source(&args.code)?;
return Ok(args);
reject_json_or_quoted_source(input)?;
return Ok(JsReplArgs {
code: input.to_string(),
timeout_ms: None,
poll: false,
session_id: None,
});
};
let mut timeout_ms: Option<u64> = None;
let directive = pragma.trim();
if !directive.is_empty() {
for token in directive.split_whitespace() {
let (key, value) = token.split_once('=').ok_or_else(|| {
FunctionCallError::RespondToModel(format!(
"js_repl pragma expects space-separated key=value pairs (supported keys: timeout_ms); got `{token}`"
))
})?;
match key {
"timeout_ms" => {
if timeout_ms.is_some() {
return Err(FunctionCallError::RespondToModel(
"js_repl pragma specifies timeout_ms more than once".to_string(),
));
}
let parsed = value.parse::<u64>().map_err(|_| {
FunctionCallError::RespondToModel(format!(
"js_repl pragma timeout_ms must be an integer; got `{value}`"
))
})?;
timeout_ms = Some(parsed);
}
_ => {
return Err(FunctionCallError::RespondToModel(format!(
"js_repl pragma only supports timeout_ms; got `{key}`"
)));
}
}
}
}
if rest.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"js_repl pragma must be followed by JavaScript source on subsequent lines".to_string(),
));
}
let pragma_args = parse_js_repl_pragma(pragma.trim(), polling_enabled)?;
reject_json_or_quoted_source(rest)?;
args.code = rest.to_string();
args.timeout_ms = timeout_ms;
let args = JsReplArgs {
code: rest.to_string(),
timeout_ms: pragma_args.timeout_ms,
poll: pragma_args.poll.unwrap_or(false),
session_id: pragma_args.session_id,
};
if args.session_id.is_some() && !args.poll {
return Err(FunctionCallError::RespondToModel(
"js_repl session_id is only supported when poll=true".to_string(),
));
}
if args.poll && args.timeout_ms.is_some() {
return Err(FunctionCallError::RespondToModel(
JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE.to_string(),
));
}
Ok(args)
}
/// Parse the directive text after the `// codex-js-repl:` prefix into its
/// recognized key=value settings.
///
/// Tokens may be separated by any mix of ASCII whitespace and commas. Each
/// token must be `key=value`; duplicate keys are rejected. Error messages
/// advertise only the keys exposed for the current feature set, although the
/// `poll`/`session_id` arms are always parsed here — the caller enforces the
/// polling feature flag with a more specific message.
fn parse_js_repl_pragma(
    directive: &str,
    polling_enabled: bool,
) -> Result<ParsedJsReplPragma, FunctionCallError> {
    let supported_keys = match polling_enabled {
        true => "timeout_ms, poll, session_id",
        false => "timeout_ms",
    };
    let mut parsed = ParsedJsReplPragma::default();
    let tokens = directive
        .split(|c: char| c == ',' || c.is_ascii_whitespace())
        .filter(|token| !token.is_empty());
    for token in tokens {
        let Some((key, value)) = token.split_once('=') else {
            return Err(FunctionCallError::RespondToModel(format!(
                "js_repl pragma expects space-separated key=value pairs (supported keys: {supported_keys}); got `{token}`"
            )));
        };
        match key {
            "timeout_ms" => {
                if parsed.timeout_ms.is_some() {
                    return Err(FunctionCallError::RespondToModel(
                        "js_repl pragma specifies timeout_ms more than once".to_string(),
                    ));
                }
                let Ok(timeout) = value.parse::<u64>() else {
                    return Err(FunctionCallError::RespondToModel(format!(
                        "js_repl pragma timeout_ms must be an integer; got `{value}`"
                    )));
                };
                parsed.timeout_ms = Some(timeout);
            }
            "poll" => {
                if parsed.poll.is_some() {
                    return Err(FunctionCallError::RespondToModel(
                        "js_repl pragma specifies poll more than once".to_string(),
                    ));
                }
                // Accept any casing of true/false; anything else is rejected.
                parsed.poll = Some(match value.to_ascii_lowercase().as_str() {
                    "true" => true,
                    "false" => false,
                    _ => {
                        return Err(FunctionCallError::RespondToModel(format!(
                            "js_repl pragma poll must be true or false; got `{value}`"
                        )));
                    }
                });
            }
            "session_id" => {
                if parsed.session_id.is_some() {
                    return Err(FunctionCallError::RespondToModel(
                        "js_repl pragma specifies session_id more than once".to_string(),
                    ));
                }
                if value.trim().is_empty() {
                    return Err(FunctionCallError::RespondToModel(
                        "js_repl session_id must not be empty".to_string(),
                    ));
                }
                parsed.session_id = Some(value.to_string());
            }
            other => {
                return Err(FunctionCallError::RespondToModel(format!(
                    "js_repl pragma only supports {supported_keys}; got `{other}`"
                )));
            }
        }
    }
    Ok(parsed)
}
@@ -292,5 +514,327 @@ fn reject_json_or_quoted_source(code: &str) -> Result<(), FunctionCallError> {
}
#[cfg(test)]
#[path = "js_repl_tests.rs"]
mod tests;
/// Unit tests covering js_repl freeform-argument parsing, `js_repl_poll`
/// output formatting, and exec begin/end event emission.
mod tests {
    use std::time::Duration;

    use super::format_poll_output;
    use super::parse_freeform_args;
    use crate::codex::make_session_and_context_with_rx;
    use crate::protocol::EventMsg;
    use crate::protocol::ExecCommandSource;
    use crate::tools::js_repl::JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE;
    use crate::tools::js_repl::JS_REPL_TIMEOUT_ERROR_MESSAGE;
    use crate::tools::js_repl::JsExecPollResult;
    use codex_protocol::models::FunctionCallOutputContentItem;
    use pretty_assertions::assert_eq;
    use serde_json::json;

    // Raw JS with no pragma line parses as a plain, non-polling exec.
    #[test]
    fn parse_freeform_args_without_pragma() {
        let args = parse_freeform_args("console.log('ok');", true).expect("parse args");
        assert_eq!(args.code, "console.log('ok');");
        assert_eq!(args.timeout_ms, None);
        assert!(!args.poll);
        assert_eq!(args.session_id, None);
    }

    // timeout_ms from the pragma is surfaced; the pragma line is stripped from code.
    #[test]
    fn parse_freeform_args_with_pragma() {
        let input = "// codex-js-repl: timeout_ms=15000\nconsole.log('ok');";
        let args = parse_freeform_args(input, true).expect("parse args");
        assert_eq!(args.code, "console.log('ok');");
        assert_eq!(args.timeout_ms, Some(15_000));
        assert!(!args.poll);
        assert_eq!(args.session_id, None);
    }

    // poll=true in the pragma turns on polling mode.
    #[test]
    fn parse_freeform_args_with_poll() {
        let input = "// codex-js-repl: poll=true\nconsole.log('ok');";
        let args = parse_freeform_args(input, true).expect("parse args");
        assert_eq!(args.code, "console.log('ok');");
        assert_eq!(args.timeout_ms, None);
        assert!(args.poll);
        assert_eq!(args.session_id, None);
    }

    // timeout_ms and poll=true are mutually exclusive.
    #[test]
    fn parse_freeform_args_rejects_timeout_ms_when_poll_true() {
        let input = "// codex-js-repl: poll=true timeout_ms=15000\nconsole.log('ok');";
        let err = parse_freeform_args(input, true).expect_err("expected error");
        assert_eq!(err.to_string(), JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE);
    }

    // session_id is accepted alongside poll=true.
    #[test]
    fn parse_freeform_args_with_poll_and_session_id() {
        let input = "// codex-js-repl: poll=true session_id=my-session\nconsole.log('ok');";
        let args = parse_freeform_args(input, true).expect("parse args");
        assert_eq!(args.code, "console.log('ok');");
        assert_eq!(args.timeout_ms, None);
        assert!(args.poll);
        assert_eq!(args.session_id.as_deref(), Some("my-session"));
    }

    // Comma-plus-space separators between pragma tokens are accepted.
    #[test]
    fn parse_freeform_args_with_comma_separated_poll_and_session_id() {
        let input = "// codex-js-repl: poll=true, session_id=my-session\nconsole.log('ok');";
        let args = parse_freeform_args(input, true).expect("parse args");
        assert_eq!(args.code, "console.log('ok');");
        assert_eq!(args.timeout_ms, None);
        assert!(args.poll);
        assert_eq!(args.session_id.as_deref(), Some("my-session"));
    }

    // Bare commas (no spaces) also separate pragma tokens.
    #[test]
    fn parse_freeform_args_with_comma_separated_pragma_without_spaces() {
        let input = "// codex-js-repl: timeout_ms=15000,poll=false\nconsole.log('ok');";
        let args = parse_freeform_args(input, true).expect("parse args");
        assert_eq!(args.code, "console.log('ok');");
        assert_eq!(args.timeout_ms, Some(15_000));
        assert!(!args.poll);
        assert_eq!(args.session_id, None);
    }

    // session_id without poll=true is an error.
    #[test]
    fn parse_freeform_args_rejects_session_id_without_poll() {
        let input = "// codex-js-repl: session_id=my-session\nconsole.log('ok');";
        let err = parse_freeform_args(input, true).expect_err("expected error");
        assert_eq!(
            err.to_string(),
            "js_repl session_id is only supported when poll=true"
        );
    }

    // Unknown pragma keys list the full supported set when polling is enabled.
    #[test]
    fn parse_freeform_args_rejects_unknown_key() {
        let err = parse_freeform_args("// codex-js-repl: nope=1\nconsole.log('ok');", true)
            .expect_err("expected error");
        assert_eq!(
            err.to_string(),
            "js_repl pragma only supports timeout_ms, poll, session_id; got `nope`"
        );
    }

    // A key may appear at most once in the pragma.
    #[test]
    fn parse_freeform_args_rejects_duplicate_poll() {
        let err = parse_freeform_args(
            "// codex-js-repl: poll=true poll=false\nconsole.log('ok');",
            true,
        )
        .expect_err("expected error");
        assert_eq!(
            err.to_string(),
            "js_repl pragma specifies poll more than once"
        );
    }

    // JSON-wrapped code is rejected with guidance to resend raw JS.
    #[test]
    fn parse_freeform_args_rejects_json_wrapped_code() {
        let err =
            parse_freeform_args(r#"{"code":"await doThing()"}"#, true).expect_err("expected error");
        assert_eq!(
            err.to_string(),
            "js_repl is a freeform tool and expects raw JavaScript source. Resend plain JS only (optional first line `// codex-js-repl: ...`); do not send JSON (`{\"code\":...}`), quoted code, or markdown fences."
        );
    }

    // With polling disabled, error text advertises only timeout_ms.
    #[test]
    fn parse_freeform_args_hides_polling_keys_when_polling_disabled() {
        let err = parse_freeform_args("// codex-js-repl: nope=1\nconsole.log('ok');", false)
            .expect_err("expected error");
        assert_eq!(
            err.to_string(),
            "js_repl pragma only supports timeout_ms; got `nope`"
        );
    }

    // Logs are joined with newlines into a single JSON string field.
    #[test]
    fn format_poll_output_serializes_logs_in_json_payload() {
        let result = JsExecPollResult {
            exec_id: "exec-1".to_string(),
            session_id: "session-1".to_string(),
            logs: vec!["line 1".to_string(), "line 2".to_string()],
            final_output: None,
            content_items: Vec::new(),
            error: None,
            done: false,
        };
        let output = format_poll_output(&result).expect("format poll output");
        assert_eq!(output.success, Some(true));
        assert_eq!(output.body.len(), 1);
        let FunctionCallOutputContentItem::InputText { text: content } = &output.body[0] else {
            panic!("expected text poll output");
        };
        let payload: serde_json::Value = serde_json::from_str(content).expect("valid json payload");
        assert_eq!(
            payload,
            json!({
                "exec_id": "exec-1",
                "session_id": "session-1",
                "status": "running",
                "logs": "line 1\nline 2",
                "final_output": null,
                "error": null,
            })
        );
    }

    // An empty-but-present final_output must serialize as "" rather than null.
    #[test]
    fn format_poll_output_preserves_empty_final_output() {
        let result = JsExecPollResult {
            exec_id: "exec-1".to_string(),
            session_id: "session-1".to_string(),
            logs: Vec::new(),
            final_output: Some(String::new()),
            content_items: Vec::new(),
            error: None,
            done: true,
        };
        let output = format_poll_output(&result).expect("format poll output");
        assert_eq!(output.success, Some(true));
        assert_eq!(output.body.len(), 1);
        let FunctionCallOutputContentItem::InputText { text: content } = &output.body[0] else {
            panic!("expected text poll output");
        };
        let payload: serde_json::Value = serde_json::from_str(content).expect("valid json payload");
        assert_eq!(
            payload,
            json!({
                "exec_id": "exec-1",
                "session_id": "session-1",
                "status": "completed",
                "logs": null,
                "final_output": "",
                "error": null,
            })
        );
    }

    // Image content items follow the JSON text item in the tool output body.
    #[test]
    fn format_poll_output_serializes_multimodal_content_items() {
        let result = JsExecPollResult {
            exec_id: "exec-1".to_string(),
            session_id: "session-1".to_string(),
            logs: Vec::new(),
            final_output: Some("stdout".to_string()),
            content_items: vec![FunctionCallOutputContentItem::InputImage {
                image_url: "data:image/png;base64,abc".to_string(),
                detail: None,
            }],
            error: None,
            done: true,
        };
        let output = format_poll_output(&result).expect("format poll output");
        assert_eq!(output.success, Some(true));
        let items = output.body;
        assert_eq!(
            items,
            vec![
                FunctionCallOutputContentItem::InputText {
                    text: json!({
                        "exec_id": "exec-1",
                        "session_id": "session-1",
                        "status": "completed",
                        "logs": null,
                        "final_output": "stdout",
                        "error": null,
                    })
                    .to_string(),
                },
                FunctionCallOutputContentItem::InputImage {
                    image_url: "data:image/png;base64,abc".to_string(),
                    detail: None,
                },
            ]
        );
    }

    // deny_unknown_fields on JsReplPollArgs rejects extra JSON keys.
    #[test]
    fn js_repl_poll_args_reject_unknown_fields() {
        let err = serde_json::from_str::<super::JsReplPollArgs>(
            r#"{"exec_id":"exec-1","unknown":"value"}"#,
        )
        .expect_err("expected unknown-field deserialization error");
        assert!(
            err.to_string().contains("unknown field `unknown`"),
            "unexpected deserialization error: {err}"
        );
    }

    // A successful exec-end emits an ExecCommandEnd event with exit code 0.
    #[tokio::test]
    async fn emit_js_repl_exec_end_sends_event() {
        let (session, turn, rx) = make_session_and_context_with_rx().await;
        super::emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), "call-1").await;
        super::emit_js_repl_exec_end(
            session.as_ref(),
            turn.as_ref(),
            "call-1",
            "hello",
            None,
            Duration::from_millis(12),
            false,
        )
        .await;
        // Drain events until the ExecCommandEnd arrives (bounded by a timeout).
        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let event = rx.recv().await.expect("event");
                if let EventMsg::ExecCommandEnd(end) = event.msg {
                    break end;
                }
            }
        })
        .await
        .expect("timed out waiting for exec end");
        assert_eq!(event.call_id, "call-1");
        assert_eq!(event.turn_id, turn.sub_id);
        assert_eq!(event.command, vec!["js_repl".to_string()]);
        assert_eq!(event.cwd, turn.cwd);
        assert_eq!(event.source, ExecCommandSource::Agent);
        assert_eq!(event.interaction_input, None);
        assert_eq!(event.stdout, "hello");
        assert_eq!(event.stderr, "");
        assert!(event.aggregated_output.contains("hello"));
        assert_eq!(event.exit_code, 0);
        assert_eq!(event.duration, Duration::from_millis(12));
        assert!(event.formatted_output.contains("hello"));
        assert!(!event.formatted_output.contains("command timed out after"));
        assert!(!event.parsed_cmd.is_empty());
    }

    // A timed-out exec-end mentions the timeout in the formatted output.
    #[tokio::test]
    async fn emit_js_repl_exec_end_sends_timed_out_event() {
        let (session, turn, rx) = make_session_and_context_with_rx().await;
        super::emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), "call-timeout").await;
        super::emit_js_repl_exec_end(
            session.as_ref(),
            turn.as_ref(),
            "call-timeout",
            "",
            Some(JS_REPL_TIMEOUT_ERROR_MESSAGE),
            Duration::from_millis(50),
            true,
        )
        .await;
        let event = tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let event = rx.recv().await.expect("event");
                if let EventMsg::ExecCommandEnd(end) = event.msg {
                    break end;
                }
            }
        })
        .await
        .expect("timed out waiting for exec end");
        assert_eq!(event.call_id, "call-timeout");
        assert!(
            event
                .formatted_output
                .contains("command timed out after 50 milliseconds")
        );
        assert!(!event.parsed_cmd.is_empty());
    }
}

View File

@@ -40,6 +40,7 @@ use codex_protocol::protocol::AskForApproval;
pub use dynamic::DynamicToolHandler;
pub use grep_files::GrepFilesHandler;
pub use js_repl::JsReplHandler;
pub use js_repl::JsReplPollHandler;
pub use js_repl::JsReplResetHandler;
pub use list_dir::ListDirHandler;
pub use mcp::McpHandler;

View File

@@ -7,6 +7,7 @@ const { AsyncLocalStorage } = require("node:async_hooks");
const crypto = require("node:crypto");
const fs = require("node:fs");
const { builtinModules, createRequire } = require("node:module");
const { createInterface } = require("node:readline");
const { performance } = require("node:perf_hooks");
const path = require("node:path");
const { URL, URLSearchParams, fileURLToPath, pathToFileURL } = require(
@@ -60,14 +61,14 @@ if (typeof performance !== "undefined") {
context.performance = performance;
}
context.crypto = crypto.webcrypto ?? crypto;
context.setTimeout = setTimeout;
context.clearTimeout = clearTimeout;
context.setInterval = setInterval;
context.clearInterval = clearInterval;
context.setTimeout = setTimeoutWithTracking;
context.clearTimeout = clearTrackedTimeout;
context.setInterval = setIntervalWithTracking;
context.clearInterval = clearTrackedInterval;
context.queueMicrotask = queueMicrotask;
if (typeof setImmediate !== "undefined") {
context.setImmediate = setImmediate;
context.clearImmediate = clearImmediate;
context.setImmediate = setImmediateWithTracking;
context.clearImmediate = clearTrackedImmediate;
}
context.atob = (data) => Buffer.from(data, "base64").toString("binary");
context.btoa = (data) => Buffer.from(data, "binary").toString("base64");
@@ -128,6 +129,9 @@ const pendingEmitImage = new Map();
let toolCounter = 0;
let emitImageCounter = 0;
const execContextStorage = new AsyncLocalStorage();
const trackedTimerCancels = new Map();
const trackedImmediateCancels = new Map();
const trackedIntervalCancels = new Map();
const cwd = process.cwd();
const tmpDir = process.env.CODEX_JS_TMP_DIR || cwd;
const homeDir = process.env.HOME ?? null;
@@ -1126,12 +1130,165 @@ function sendFatalExecResultSync(kind, error) {
function getCurrentExecState() {
const execState = execContextStorage.getStore();
if (!execState || typeof execState.id !== "string" || !execState.id) {
if (
!execState ||
typeof execState.id !== "string" ||
!execState.id ||
execState.closed
) {
throw new Error("js_repl exec context not found");
}
return execState;
}
// Register a background operation spawned during an exec so it is visible in
// execState.pendingBackgroundTasks. The operation is normalized to a promise
// that always fulfills with { ok, error, observation } (never rejects), so
// consumers can await the set safely; `observation` is a shared marker object
// callers can later flip to note the outcome was observed.
function trackExecBackgroundTask(
  execState,
  operation,
  observation = { observed: false },
) {
  const trackedOperation = Promise.resolve(operation).then(
    () => ({ ok: true, error: null, observation }),
    (error) => ({ ok: false, error, observation }),
  );
  execState.pendingBackgroundTasks.add(trackedOperation);
  return trackedOperation;
}
// Cancel a timer/immediate/interval handle. Tracked handles run (and remove)
// their registered cancel hook; untracked handles are passed to the native
// clear function instead.
function clearTrackedHandle(handle, cleanupMap, fallbackClear) {
  const cancel = cleanupMap.get(handle);
  if (!cancel) {
    fallbackClear(handle);
    return;
  }
  cleanupMap.delete(handle);
  cancel();
}
// Schedule a one-shot callback (setTimeout/setImmediate style) whose pending
// run is tracked as an exec background task. The returned handle can be
// cleared via the matching clearTracked* function: the cancel hook stored in
// cleanupMap clears the native handle and resolves the tracked operation.
// The first of {callback completion, callback error, cancel} wins; `settled`
// guards against double settlement, and settlement removes the hook from
// cleanupMap.
function scheduleTrackedOneShot(
  execState,
  cleanupMap,
  scheduler,
  fallbackClear,
  callback,
  args,
) {
  let handle;
  let settled = false;
  const operation = new Promise((resolve, reject) => {
    const settle = (fn, value) => {
      if (settled) {
        return;
      }
      settled = true;
      cleanupMap.delete(handle);
      fn(value);
    };
    const wrapped = () => {
      // Run the callback on a microtask so sync throws and rejected promises
      // are funneled through the same settle path.
      Promise.resolve()
        .then(() => callback(...args))
        .then(
          () => settle(resolve),
          (error) => settle(reject, error),
        );
    };
    handle = scheduler(wrapped);
    cleanupMap.set(handle, () => {
      fallbackClear(handle);
      settle(resolve);
    });
  });
  trackExecBackgroundTask(execState, operation);
  return handle;
}
// Replacement for setTimeout installed on the REPL context. When an exec with
// trackAsyncCallbacks is active and the callback is a function, schedules via
// scheduleTrackedOneShot so the pending timer keeps the exec alive; otherwise
// defers to native setTimeout unchanged.
function setTimeoutWithTracking(callback, delay, ...args) {
  const execState = execContextStorage.getStore();
  if (typeof callback !== "function" || !execState?.trackAsyncCallbacks) {
    return setTimeout(callback, delay, ...args);
  }
  return scheduleTrackedOneShot(
    execState,
    trackedTimerCancels,
    (wrapped) => setTimeout(wrapped, delay),
    clearTimeout,
    callback,
    args,
  );
}
// clearTimeout replacement: cancels a tracked timer (resolving its background
// task) or falls back to native clearTimeout for untracked handles.
function clearTrackedTimeout(handle) {
  clearTrackedHandle(handle, trackedTimerCancels, clearTimeout);
}
// Replacement for setImmediate installed on the REPL context. Throws when the
// runtime lacks setImmediate; otherwise mirrors setTimeoutWithTracking:
// tracked scheduling during a trackAsyncCallbacks exec, native setImmediate
// otherwise.
function setImmediateWithTracking(callback, ...args) {
  if (typeof setImmediate === "undefined") {
    throw new Error("setImmediate is not available in this runtime");
  }
  const execState = execContextStorage.getStore();
  if (typeof callback !== "function" || !execState?.trackAsyncCallbacks) {
    return setImmediate(callback, ...args);
  }
  return scheduleTrackedOneShot(
    execState,
    trackedImmediateCancels,
    (wrapped) => setImmediate(wrapped),
    clearImmediate,
    callback,
    args,
  );
}
// Counterpart to setImmediateWithTracking; a no-op on runtimes that lack
// clearImmediate (matching the guard in the scheduler above).
function clearTrackedImmediate(handle) {
  if (typeof clearImmediate !== "undefined") {
    clearTrackedHandle(handle, trackedImmediateCancels, clearImmediate);
  }
}
// setInterval wrapper used while an exec tracks async callbacks. The interval
// is registered as a background task: the first callback error clears the
// interval and rejects the operation, while clearTrackedInterval clears it
// and resolves. Untracked calls fall through to the native setInterval.
function setIntervalWithTracking(callback, delay, ...args) {
  const execState = execContextStorage.getStore();
  if (typeof callback !== "function" || !execState?.trackAsyncCallbacks) {
    return setInterval(callback, delay, ...args);
  }
  let handle;
  let settled = false;
  let rejectOperation;
  let resolveOperation;
  const operation = new Promise((resolve, reject) => {
    resolveOperation = resolve;
    rejectOperation = reject;
  });
  trackExecBackgroundTask(execState, operation);
  // Settle at most once, and drop the cancel hook so it cannot fire after
  // the operation has already been resolved or rejected.
  const settle = (fn, value) => {
    if (settled) {
      return;
    }
    settled = true;
    trackedIntervalCancels.delete(handle);
    fn(value);
  };
  handle = setInterval(() => {
    // Normalize synchronous throws and rejected promises into one rejection
    // path; the first error permanently stops the interval.
    Promise.resolve()
      .then(() => callback(...args))
      .catch((error) => {
        clearInterval(handle);
        settle(rejectOperation, error);
      });
  }, delay);
  trackedIntervalCancels.set(handle, () => {
    clearInterval(handle);
    settle(resolveOperation);
  });
  return handle;
}
// Counterpart to setIntervalWithTracking: clears the interval and resolves
// its tracked background operation, falling back to plain clearInterval.
function clearTrackedInterval(handle) {
  clearTrackedHandle(handle, trackedIntervalCancels, clearInterval);
}
function scheduleFatalExit(kind, error) {
if (fatalExitScheduled) {
process.exitCode = 1;
@@ -1163,29 +1320,40 @@ function formatLog(args) {
.join(" ");
}
function withCapturedConsole(ctx, fn) {
const logs = [];
function withCapturedConsole(ctx, onLog, captureLogs, fn) {
const logs = captureLogs ? [] : null;
const original = ctx.console ?? console;
function record(line) {
if (logs) {
logs.push(line);
}
if (onLog) onLog(line);
}
const captured = {
...original,
log: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
info: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
warn: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
error: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
debug: (...args) => {
logs.push(formatLog(args));
const line = formatLog(args);
record(line);
},
};
ctx.console = captured;
return fn(logs).finally(() => {
return fn(logs ?? []).finally(() => {
ctx.console = original;
});
}
@@ -1219,15 +1387,20 @@ function encodeByteImage(bytes, mimeType, detail) {
}
function parseImageDetail(detail) {
if (detail == null) {
if (typeof detail === "undefined") {
return undefined;
}
if (typeof detail !== "string" || !detail) {
throw new Error("codex.emitImage expected detail to be a non-empty string");
}
if (detail !== "original") {
if (
detail !== "auto" &&
detail !== "low" &&
detail !== "high" &&
detail !== "original"
) {
throw new Error(
'codex.emitImage only supports detail "original"; omit detail for default behavior',
'codex.emitImage expected detail to be one of "auto", "low", "high", or "original"',
);
}
return detail;
@@ -1459,7 +1632,7 @@ const codex = {
argumentsJson = JSON.stringify(args);
}
return new Promise((resolve, reject) => {
const operation = new Promise((resolve, reject) => {
const payload = {
type: "run_tool",
id,
@@ -1476,6 +1649,23 @@ const codex = {
resolve(res.response);
});
});
const observation = { observed: false };
trackExecBackgroundTask(execState, operation, observation);
return {
then(onFulfilled, onRejected) {
observation.observed = true;
return operation.then(onFulfilled, onRejected);
},
catch(onRejected) {
observation.observed = true;
return operation.catch(onRejected);
},
finally(onFinally) {
observation.observed = true;
return operation.finally(onFinally);
},
};
},
emitImage(imageLike) {
let execState;
@@ -1517,11 +1707,7 @@ const codex = {
})();
const observation = { observed: false };
const trackedOperation = operation.then(
() => ({ ok: true, error: null, observation }),
(error) => ({ ok: false, error, observation }),
);
execState.pendingBackgroundTasks.add(trackedOperation);
trackExecBackgroundTask(execState, operation, observation);
return {
then(onFulfilled, onRejected) {
observation.observed = true;
@@ -1544,6 +1730,8 @@ async function handleExec(message) {
activeExecId = message.id;
const execState = {
id: message.id,
closed: false,
trackAsyncCallbacks: Boolean(message.stream_logs),
pendingBackgroundTasks: new Set(),
};
@@ -1568,6 +1756,7 @@ async function handleExec(message) {
try {
const code = typeof message.code === "string" ? message.code : "";
const streamLogs = Boolean(message.stream_logs);
const builtSource = await buildModuleSource(code);
const source = builtSource.source;
currentBindings = builtSource.currentBindings;
@@ -1579,7 +1768,11 @@ async function handleExec(message) {
context.tmpDir = tmpDir;
await execContextStorage.run(execState, async () => {
await withCapturedConsole(context, async (logs) => {
await withCapturedConsole(
context,
streamLogs ? (line) => send({ type: "exec_log", id: message.id, text: line }) : null,
!streamLogs,
async (logs) => {
const cellIdentifier = path.join(
cwd,
`.codex_js_repl_cell_${cellCounter++}.mjs`,
@@ -1623,10 +1816,14 @@ async function handleExec(message) {
moduleLinked = true;
await module.evaluate();
if (execState.pendingBackgroundTasks.size > 0) {
const backgroundResults = await Promise.all([
while (execState.pendingBackgroundTasks.size > 0) {
const pendingBackgroundTasks = [
...execState.pendingBackgroundTasks,
]);
];
const backgroundResults = await Promise.all(pendingBackgroundTasks);
for (const task of pendingBackgroundTasks) {
execState.pendingBackgroundTasks.delete(task);
}
const firstUnhandledBackgroundError = backgroundResults.find(
(result) => !result.ok && !result.observation.observed,
);
@@ -1635,7 +1832,8 @@ async function handleExec(message) {
}
}
output = logs.join("\n");
});
},
);
});
previousModule = module;
@@ -1680,6 +1878,7 @@ async function handleExec(message) {
error: error && error.message ? error.message : String(error),
});
} finally {
execState.closed = true;
if (activeExecId === message.id) {
activeExecId = null;
}
@@ -1703,7 +1902,6 @@ function handleEmitImageResult(message) {
}
let queue = Promise.resolve();
let pendingInputSegments = [];
process.on("uncaughtException", (error) => {
scheduleFatalExit("uncaught exception", error);
@@ -1713,7 +1911,8 @@ process.on("unhandledRejection", (reason) => {
scheduleFatalExit("unhandled rejection", reason);
});
function handleInputLine(line) {
const input = createInterface({ input: process.stdin, crlfDelay: Infinity });
input.on("line", (line) => {
if (!line.trim()) {
return;
}
@@ -1736,49 +1935,4 @@ function handleInputLine(line) {
if (message.type === "emit_image_result") {
handleEmitImageResult(message);
}
}
// Flush the queued stdin chunks into a single frame buffer, or return null
// when nothing is buffered. Raw chunks stay queued until a full JSONL frame
// is ready so the frame bytes are assembled exactly once.
function takePendingInputFrame() {
  const segments = pendingInputSegments;
  if (segments.length === 0) {
    return null;
  }
  pendingInputSegments = [];
  if (segments.length === 1) {
    return segments[0];
  }
  return Buffer.concat(segments);
}
// Decode one newline-delimited frame and hand it to the line handler.
// A trailing carriage return (CRLF input) is stripped before decoding.
function handleInputFrame(frame) {
  if (!frame) {
    return;
  }
  const lastByte = frame[frame.length - 1];
  const body = lastByte === 0x0d ? frame.subarray(0, frame.length - 1) : frame;
  handleInputLine(body.toString("utf8"));
}
// Split raw stdin into newline-delimited JSONL frames. Each complete frame is
// dispatched immediately; a trailing partial segment stays queued until the
// next chunk (or stdin EOF) completes it.
process.stdin.on("data", (chunk) => {
  const input = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
  let segmentStart = 0;
  let frameEnd = input.indexOf(0x0a); // 0x0a == '\n'
  while (frameEnd !== -1) {
    pendingInputSegments.push(input.subarray(segmentStart, frameEnd));
    handleInputFrame(takePendingInputFrame());
    segmentStart = frameEnd + 1;
    frameEnd = input.indexOf(0x0a, segmentStart);
  }
  if (segmentStart < input.length) {
    pendingInputSegments.push(input.subarray(segmentStart));
  }
});
// On stdin EOF, flush any buffered partial line as a final frame.
process.stdin.on("end", () => {
  handleInputFrame(takePendingInputFrame());
});

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,124 @@
use super::*;
/// A `major.minor.patch` Node version. Field order gives the derived
/// `Ord`/`PartialOrd` the expected lexicographic comparison.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(super) struct NodeVersion {
    pub(super) major: u64,
    pub(super) minor: u64,
    pub(super) patch: u64,
}

impl fmt::Display for NodeVersion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
    }
}

impl NodeVersion {
    /// Parses strings like `v20.11.1` or `20.11.1-pre+build`: whitespace and a
    /// leading `v` are stripped, and pre-release/build suffixes after `-`/`+`
    /// are ignored.
    ///
    /// # Errors
    /// Returns a human-readable message naming the missing or unparseable
    /// component (e.g. `"invalid minor version: …"`).
    pub(super) fn parse(input: &str) -> Result<Self, String> {
        let trimmed = input.trim().trim_start_matches('v');
        let mut parts = trimmed.split(['.', '-', '+']);
        // All three components are parsed identically; keep the error text
        // tied to the component name so failures are easy to diagnose.
        let mut component = |label: &str| -> Result<u64, String> {
            parts
                .next()
                .ok_or_else(|| format!("missing {label} version"))?
                .parse::<u64>()
                .map_err(|err| format!("invalid {label} version: {err}"))
        };
        // Struct fields are evaluated in declaration order, so the closure
        // consumes major, then minor, then patch.
        Ok(Self {
            major: component("major")?,
            minor: component("minor")?,
            patch: component("patch")?,
        })
    }
}
/// Minimum Node version js_repl supports, parsed from the
/// `JS_REPL_MIN_NODE_VERSION` constant; an error here means the constant
/// itself is malformed.
fn required_node_version() -> Result<NodeVersion, String> {
    NodeVersion::parse(JS_REPL_MIN_NODE_VERSION)
}
/// Runs `node --version` at `node_path` and parses the reported version.
///
/// # Errors
/// Returns a human-readable message when the process cannot be spawned, exits
/// unsuccessfully (including any captured stdout/stderr for context), or
/// prints output that does not parse as a version string.
async fn read_node_version(node_path: &Path) -> Result<NodeVersion, String> {
    let output = tokio::process::Command::new(node_path)
        .arg("--version")
        .output()
        .await
        .map_err(|err| format!("failed to execute Node: {err}"))?;
    if !output.status.success() {
        let stdout = String::from_utf8_lossy(&output.stdout);
        let stderr = String::from_utf8_lossy(&output.stderr);
        let stdout = stdout.trim();
        let stderr = stderr.trim();
        // Collect whichever streams had content; each section carries its own
        // leading space so the concatenation matches the expected format.
        let mut sections = Vec::new();
        if !stdout.is_empty() {
            sections.push(format!(" stdout: {stdout}"));
        }
        if !stderr.is_empty() {
            sections.push(format!(" stderr: {stderr}"));
        }
        let details = if sections.is_empty() {
            String::new()
        } else {
            format!(" ({})", sections.concat())
        };
        return Err(format!(
            "failed to read Node version (status {status}){details}",
            status = output.status
        ));
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    let stdout = stdout.trim();
    NodeVersion::parse(stdout)
        .map_err(|err| format!("failed to parse Node version output `{stdout}`: {err}"))
}
/// Verifies that the Node binary at `node_path` meets the minimum version
/// js_repl requires, producing an actionable error message otherwise.
async fn ensure_node_version(node_path: &Path) -> Result<(), String> {
    let required = required_node_version()?;
    let found = read_node_version(node_path).await?;
    if found >= required {
        return Ok(());
    }
    Err(format!(
        "Node runtime too old for js_repl (resolved {node_path}): found v{found}, requires >= v{required}. Install/update Node or set js_repl_node_path to a newer runtime.",
        node_path = node_path.display()
    ))
}
/// Resolves a Node binary (env override, then configured path, then `PATH`)
/// and verifies it meets the minimum supported version before returning it.
///
/// # Errors
/// Returns a message when no runtime can be found or the found runtime is
/// too old.
pub(crate) async fn resolve_compatible_node(config_path: Option<&Path>) -> Result<PathBuf, String> {
    let node_path = resolve_node(config_path).ok_or_else(|| {
        "Node runtime not found; install Node or set CODEX_JS_REPL_NODE_PATH".to_string()
    })?;
    ensure_node_version(&node_path).await?;
    Ok(node_path)
}
pub(super) fn resolve_node(config_path: Option<&Path>) -> Option<PathBuf> {
if let Some(path) = std::env::var_os("CODEX_JS_REPL_NODE_PATH") {
let p = PathBuf::from(path);
if p.exists() {
return Some(p);
}
}
if let Some(path) = config_path
&& path.exists()
{
return Some(path.to_path_buf());
}
if let Ok(path) = which::which("node") {
return Some(path);
}
None
}

View File

@@ -0,0 +1,440 @@
use super::*;
/// Per-exec accumulation state for a polled js_repl execution: buffers log
/// lines for incremental polling, keeps a bounded full transcript, and records
/// the terminal outcome once the exec finishes.
pub(super) struct ExecBuffer {
    /// Call id used when reporting events for this exec.
    pub(super) event_call_id: String,
    /// Polling session this exec belongs to, if any.
    pub(super) session_id: Option<String>,
    pub(super) session: Arc<Session>,
    pub(super) turn: Arc<TurnContext>,
    /// Log lines not yet drained by `poll_logs`; bounded by
    /// `JS_REPL_POLL_LOG_QUEUE_MAX_BYTES` (oldest evicted first).
    pub(super) logs: VecDeque<String>,
    /// Total bytes currently held in `logs`.
    pub(super) logs_bytes: usize,
    /// True when lines were evicted from `logs` since the last drain.
    pub(super) logs_truncated: bool,
    /// Full transcript of log lines; stops growing once
    /// `JS_REPL_POLL_ALL_LOGS_MAX_BYTES` would be exceeded.
    pub(super) all_logs: Vec<String>,
    /// Bytes accounted to `all_logs`, including the joining newlines.
    pub(super) all_logs_bytes: usize,
    /// True once `all_logs` has stopped accepting lines.
    pub(super) all_logs_truncated: bool,
    /// Final output of the exec; set when it completes.
    pub(super) final_output: Option<String>,
    /// Extra content items (e.g. emitted images) produced by the exec.
    pub(super) content_items: Vec<FunctionCallOutputContentItem>,
    /// Error message when the exec failed.
    pub(super) error: Option<String>,
    /// True once the exec reached a terminal state.
    pub(super) done: bool,
    /// True when the host is terminating while this exec is in flight.
    pub(super) host_terminating: bool,
    /// How the exec terminated, once known.
    pub(super) terminal_kind: Option<ExecTerminalKind>,
    // NOTE(review): presumably the event sequence recorded at completion —
    // confirm against the writer of this field.
    pub(super) completed_sequence: Option<u64>,
    /// When the exec started; used for duration accounting.
    pub(super) started_at: Instant,
    // NOTE(review): presumably signalled when new output or a terminal state
    // is available — confirm at the await sites.
    pub(super) notify: Arc<Notify>,
    /// Number of output delta chunks already emitted for this call (see
    /// `output_delta_chunks_for_log_line`).
    pub(super) emitted_deltas: usize,
}
impl ExecBuffer {
    /// Creates an empty buffer for a newly started exec, stamped with the
    /// current time.
    pub(super) fn new(
        event_call_id: String,
        session_id: Option<String>,
        session: Arc<Session>,
        turn: Arc<TurnContext>,
    ) -> Self {
        Self {
            event_call_id,
            session_id,
            session,
            turn,
            logs: VecDeque::new(),
            logs_bytes: 0,
            logs_truncated: false,
            all_logs: Vec::new(),
            all_logs_bytes: 0,
            all_logs_truncated: false,
            final_output: None,
            content_items: Vec::new(),
            error: None,
            done: false,
            host_terminating: false,
            terminal_kind: None,
            completed_sequence: None,
            started_at: Instant::now(),
            notify: Arc::new(Notify::new()),
            emitted_deltas: 0,
        }
    }

    /// Appends one log line to both the poll queue and the full transcript,
    /// enforcing each buffer's byte cap.
    ///
    /// Poll queue (`logs`): oldest lines are evicted until the queue fits in
    /// `JS_REPL_POLL_LOG_QUEUE_MAX_BYTES`; once anything has been evicted, a
    /// single truncation marker line is kept at the front (making room for it
    /// if necessary).
    ///
    /// Transcript (`all_logs`): when adding the line would exceed
    /// `JS_REPL_POLL_ALL_LOGS_MAX_BYTES`, a truncation marker is appended
    /// instead and the transcript stops growing permanently.
    pub(super) fn push_log(&mut self, text: String) {
        self.logs.push_back(text.clone());
        self.logs_bytes = self.logs_bytes.saturating_add(text.len());
        // Evict from the front until the poll queue fits its byte budget.
        while self.logs_bytes > JS_REPL_POLL_LOG_QUEUE_MAX_BYTES {
            let Some(removed) = self.logs.pop_front() else {
                break;
            };
            self.logs_bytes = self.logs_bytes.saturating_sub(removed.len());
            self.logs_truncated = true;
        }
        // Keep exactly one truncation marker at the head while truncated.
        if self.logs_truncated
            && self
                .logs
                .front()
                .is_none_or(|line| line != JS_REPL_POLL_LOGS_TRUNCATED_MARKER)
        {
            let marker_len = JS_REPL_POLL_LOGS_TRUNCATED_MARKER.len();
            // Make room so the marker itself does not bust the budget.
            while self.logs_bytes.saturating_add(marker_len) > JS_REPL_POLL_LOG_QUEUE_MAX_BYTES {
                let Some(removed) = self.logs.pop_front() else {
                    break;
                };
                self.logs_bytes = self.logs_bytes.saturating_sub(removed.len());
            }
            self.logs
                .push_front(JS_REPL_POLL_LOGS_TRUNCATED_MARKER.to_string());
            self.logs_bytes = self.logs_bytes.saturating_add(marker_len);
        }
        if self.all_logs_truncated {
            return;
        }
        // Account for the '\n' that display_output() inserts when joining.
        let separator_bytes = if self.all_logs.is_empty() { 0 } else { 1 };
        let next_bytes = text.len() + separator_bytes;
        if self.all_logs_bytes.saturating_add(next_bytes) > JS_REPL_POLL_ALL_LOGS_MAX_BYTES {
            self.all_logs
                .push(JS_REPL_POLL_ALL_LOGS_TRUNCATED_MARKER.to_string());
            self.all_logs_truncated = true;
            return;
        }
        self.all_logs.push(text);
        self.all_logs_bytes = self.all_logs_bytes.saturating_add(next_bytes);
    }

    /// Drains and returns all queued (not yet polled) log lines, resetting the
    /// queue's byte count and truncation flag.
    pub(super) fn poll_logs(&mut self) -> Vec<String> {
        let drained: Vec<String> = self.logs.drain(..).collect();
        self.logs_bytes = 0;
        self.logs_truncated = false;
        drained
    }

    /// Text to display for this exec: the final output when present and
    /// non-empty, otherwise the newline-joined transcript.
    pub(super) fn display_output(&self) -> String {
        if let Some(final_output) = self.final_output.as_deref()
            && !final_output.is_empty()
        {
            return final_output.to_string();
        }
        self.all_logs.join("\n")
    }

    /// Final output, exposed only once the exec has completed.
    pub(super) fn poll_final_output(&self) -> Option<String> {
        if self.done {
            self.final_output.clone()
        } else {
            None
        }
    }

    /// Extra content items (e.g. emitted images), exposed only for a
    /// completed exec that did not error.
    pub(super) fn poll_content_items(&self) -> Vec<FunctionCallOutputContentItem> {
        if self.done && self.error.is_none() {
            self.content_items.clone()
        } else {
            Vec::new()
        }
    }

    /// Splits one log line (with its trailing newline restored) into
    /// UTF-8-safe delta chunks, honoring both the per-chunk byte cap and the
    /// per-call delta budget. Returns an empty vec once the budget is spent.
    pub(super) fn output_delta_chunks_for_log_line(&mut self, line: &str) -> Vec<Vec<u8>> {
        if self.emitted_deltas >= MAX_EXEC_OUTPUT_DELTAS_PER_CALL {
            return Vec::new();
        }
        let mut text = String::with_capacity(line.len() + 1);
        text.push_str(line);
        text.push('\n');
        let remaining = MAX_EXEC_OUTPUT_DELTAS_PER_CALL - self.emitted_deltas;
        let chunks =
            split_utf8_chunks_with_limits(&text, JS_REPL_OUTPUT_DELTA_MAX_BYTES, remaining);
        self.emitted_deltas += chunks.len();
        chunks
    }
}
/// Splits `input` into at most `max_chunks` byte chunks of at most
/// `max_bytes` each, never cutting a UTF-8 code point in half. When a single
/// character is larger than `max_bytes`, that character is emitted whole as
/// its own chunk. Input beyond the chunk budget is not returned.
pub(super) fn split_utf8_chunks_with_limits(
    input: &str,
    max_bytes: usize,
    max_chunks: usize,
) -> Vec<Vec<u8>> {
    let mut chunks = Vec::new();
    if max_bytes == 0 || max_chunks == 0 {
        return chunks;
    }
    let mut rest = input;
    while !rest.is_empty() && chunks.len() < max_chunks {
        // Walk the candidate split point back to the nearest char boundary.
        let mut split_at = max_bytes.min(rest.len());
        while split_at > 0 && !rest.is_char_boundary(split_at) {
            split_at -= 1;
        }
        if split_at == 0 {
            // max_bytes is smaller than the first character; emit the whole
            // character rather than splitting its UTF-8 encoding.
            split_at = rest
                .chars()
                .next()
                .map(char::len_utf8)
                .unwrap_or(rest.len());
        }
        let (head, tail) = rest.split_at(split_at);
        chunks.push(head.as_bytes().to_vec());
        rest = tail;
    }
    chunks
}
/// How a polled exec reached its terminal state.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum ExecTerminalKind {
    /// Completed without an error.
    Success,
    /// Completed with an error.
    Error,
    /// The kernel went away before the exec finished.
    KernelExit,
    /// The exec was cancelled.
    Cancelled,
}
/// Payload describing a finished exec, carried to the code that reports the
/// final result.
pub(super) struct ExecCompletionEvent {
    pub(super) session: Arc<Session>,
    pub(super) turn: Arc<TurnContext>,
    /// Call id the completion is reported under.
    pub(super) event_call_id: String,
    /// Final textual output of the exec.
    pub(super) output: String,
    /// Error message when the exec failed.
    pub(super) error: Option<String>,
    /// Wall-clock time the exec took.
    pub(super) duration: Duration,
    /// True when the exec was terminated by a timeout.
    pub(super) timed_out: bool,
}
/// Why the kernel's output stream stopped.
pub(super) enum KernelStreamEnd {
    /// Host-initiated shutdown.
    Shutdown,
    /// The kernel's stdout reached end-of-file.
    StdoutEof,
}

impl KernelStreamEnd {
    /// Stable identifier for this end-of-stream cause, used in diagnostics.
    pub(super) fn reason(&self) -> &'static str {
        match self {
            Self::Shutdown => "shutdown",
            Self::StdoutEof => "stdout_eof",
        }
    }

    /// Stream-level error message, if any; neither variant carries one today.
    pub(super) fn error(&self) -> Option<&str> {
        None
    }
}
/// Point-in-time kernel process state captured for failure diagnostics.
pub(super) struct KernelDebugSnapshot {
    /// OS pid of the kernel process, when known.
    pub(super) pid: Option<u32>,
    /// Human-readable process status; `exited(...)` once the kernel is gone
    /// (see `is_kernel_status_exited`).
    pub(super) status: String,
    /// Recently captured stderr, already joined for display.
    pub(super) stderr_tail: String,
}
/// Joins the buffered stderr tail lines for display, or `"<empty>"` when no
/// stderr has been captured yet.
pub(super) fn format_stderr_tail(lines: &VecDeque<String>) -> String {
    if lines.is_empty() {
        return "<empty>".to_string();
    }
    // Join borrowed &str slices; cloning every String first (the previous
    // `iter().cloned().collect()`) allocated each line a second time.
    lines
        .iter()
        .map(String::as_str)
        .collect::<Vec<_>>()
        .join(JS_REPL_STDERR_TAIL_SEPARATOR)
}
/// Returns at most `max_bytes` leading bytes of `input` as an owned string,
/// backing off to the nearest UTF-8 character boundary so no code point is
/// ever split. `max_bytes == 0` yields an empty string.
pub(super) fn truncate_utf8_prefix_by_bytes(input: &str, max_bytes: usize) -> String {
    if input.len() <= max_bytes {
        return input.to_string();
    }
    // Index 0 is always a char boundary, so this loop terminates.
    let mut boundary = max_bytes;
    while !input.is_char_boundary(boundary) {
        boundary -= 1;
    }
    input[..boundary].to_string()
}
/// Byte length of the stderr tail as `format_stderr_tail` would render it:
/// the lines' bytes plus one separator between each adjacent pair (zero for
/// an empty tail).
pub(super) fn stderr_tail_formatted_bytes(lines: &VecDeque<String>) -> usize {
    let payload: usize = lines.iter().map(String::len).sum();
    let separators = JS_REPL_STDERR_TAIL_SEPARATOR.len() * lines.len().saturating_sub(1);
    payload + separators
}

/// Formatted byte length the tail would have if `line` were appended.
fn stderr_tail_bytes_with_candidate(lines: &VecDeque<String>, line: &str) -> usize {
    let mut total = line.len();
    if !lines.is_empty() {
        total += stderr_tail_formatted_bytes(lines) + JS_REPL_STDERR_TAIL_SEPARATOR.len();
    }
    total
}
/// Appends `line` to the bounded stderr tail and returns the (possibly
/// length-capped) line actually stored.
///
/// The line is first capped to the per-line byte limit; older lines are then
/// evicted from the front until both the line-count limit and the total
/// formatted-byte budget hold. A line that truncates to empty is dropped.
pub(super) fn push_stderr_tail_line(lines: &mut VecDeque<String>, line: &str) -> String {
    // The per-line cap never exceeds the total budget, so a capped line can
    // always fit once the deque has been emptied.
    let max_line_bytes = JS_REPL_STDERR_TAIL_LINE_MAX_BYTES.min(JS_REPL_STDERR_TAIL_MAX_BYTES);
    let bounded_line = truncate_utf8_prefix_by_bytes(line, max_line_bytes);
    if bounded_line.is_empty() {
        return bounded_line;
    }
    // Evict oldest-first until the candidate fits both limits.
    while !lines.is_empty()
        && (lines.len() >= JS_REPL_STDERR_TAIL_LINE_LIMIT
            || stderr_tail_bytes_with_candidate(lines, &bounded_line)
                > JS_REPL_STDERR_TAIL_MAX_BYTES)
    {
        lines.pop_front();
    }
    lines.push_back(bounded_line.clone());
    bounded_line
}
/// True when a kernel status string describes an exited process
/// (rendered as `exited(...)`).
pub(super) fn is_kernel_status_exited(status: &str) -> bool {
    status.starts_with("exited(")
}

/// Whether a kernel stdin write failure warrants attaching kernel diagnostics
/// for the model: either the kernel is already known to have exited, or the
/// write failed with a broken pipe (case-insensitive match on the error text).
pub(super) fn should_include_model_diagnostics_for_write_error(
    err_message: &str,
    snapshot: &KernelDebugSnapshot,
) -> bool {
    if is_kernel_status_exited(&snapshot.status) {
        return true;
    }
    err_message.to_ascii_lowercase().contains("broken pipe")
}
/// Builds the `js_repl diagnostics: {...}` JSON line attached to kernel
/// failure messages shown to the model. Free-form fields (stream error,
/// stderr tail) are byte-capped before serialization.
fn format_model_kernel_failure_details(
    reason: &str,
    stream_error: Option<&str>,
    snapshot: &KernelDebugSnapshot,
) -> String {
    let payload = serde_json::json!({
        "reason": reason,
        "stream_error": stream_error
            .map(|err| truncate_utf8_prefix_by_bytes(err, JS_REPL_MODEL_DIAG_ERROR_MAX_BYTES)),
        "kernel_pid": snapshot.pid,
        "kernel_status": snapshot.status,
        "kernel_stderr_tail": truncate_utf8_prefix_by_bytes(
            &snapshot.stderr_tail,
            JS_REPL_MODEL_DIAG_STDERR_MAX_BYTES,
        ),
    });
    // Serializing plain strings/ints should not fail; fall back to a minimal
    // JSON error object rather than panicking if it ever does.
    let encoded = serde_json::to_string(&payload)
        .unwrap_or_else(|err| format!(r#"{{"reason":"serialization_error","error":"{err}"}}"#));
    format!("js_repl diagnostics: {encoded}")
}
/// Appends machine-readable kernel diagnostics (blank-line separated) to a
/// base failure message intended for the model.
pub(super) fn with_model_kernel_failure_message(
    base_message: &str,
    reason: &str,
    stream_error: Option<&str>,
    snapshot: &KernelDebugSnapshot,
) -> String {
    let details = format_model_kernel_failure_details(reason, stream_error, snapshot);
    format!("{base_message}\n\n{details}")
}
/// Joins stdout and stderr with a newline, omitting the separator (and the
/// empty side) when either stream is empty.
fn join_outputs(stdout: &str, stderr: &str) -> String {
    match (stdout.is_empty(), stderr.is_empty()) {
        (true, _) => stderr.to_string(),
        (_, true) => stdout.to_string(),
        _ => format!("{stdout}\n{stderr}"),
    }
}
/// Converts a finished js_repl exec into the shared `ExecToolCallOutput`
/// shape: the output maps to stdout, the error message (if any) to stderr,
/// and the exit code is synthesized as 1 on error, 0 otherwise.
pub(super) fn build_js_repl_exec_output(
    output: &str,
    error: Option<&str>,
    duration: Duration,
    timed_out: bool,
) -> ExecToolCallOutput {
    let stdout = output.to_string();
    let stderr = error.unwrap_or("").to_string();
    let aggregated_output = join_outputs(&stdout, &stderr);
    ExecToolCallOutput {
        exit_code: if error.is_some() { 1 } else { 0 },
        stdout: StreamOutput::new(stdout),
        stderr: StreamOutput::new(stderr),
        aggregated_output: StreamOutput::new(aggregated_output),
        duration,
        timed_out,
    }
}
/// Wraps an image emitted by the kernel as an input-image content item,
/// filling in the turn's default image detail when the caller gave none.
pub(super) fn emitted_image_content_item(
    turn: &TurnContext,
    image_url: String,
    detail: Option<ImageDetail>,
) -> FunctionCallOutputContentItem {
    FunctionCallOutputContentItem::InputImage {
        image_url,
        detail: detail.or_else(|| default_output_image_detail_for_turn(turn)),
    }
}
pub(super) fn drain_broadcast_lines(buffer: &mut Vec<u8>) -> Vec<String> {
let mut lines = Vec::new();
loop {
let Some(pos) = buffer.iter().position(|byte| *byte == b'\n') else {
break;
};
let line = buffer.drain(..=pos).collect::<Vec<_>>();
lines.push(decode_broadcast_line(&line));
}
lines
}
/// Decodes whatever is left in `buffer` as a final partial line, emptying the
/// buffer; returns `None` when nothing remains.
pub(super) fn finish_broadcast_line(buffer: &mut Vec<u8>) -> Option<String> {
    if buffer.is_empty() {
        return None;
    }
    let remainder = std::mem::take(buffer);
    Some(decode_broadcast_line(&remainder))
}

/// Lossily decodes a raw line and strips any trailing newline / carriage
/// return bytes.
fn decode_broadcast_line(line: &[u8]) -> String {
    String::from_utf8_lossy(line)
        .trim_end_matches(['\n', '\r'])
        .to_string()
}
/// Accepts only `data:` URLs (scheme matched case-insensitively) for images
/// emitted from the kernel.
pub(super) fn validate_emitted_image_url(image_url: &str) -> Result<(), String> {
    // Compare the first five raw bytes; "data:" is pure ASCII, so when the
    // prefix matches, byte 5 is necessarily a char boundary as well.
    let bytes = image_url.as_bytes();
    if bytes.len() >= 5 && bytes[..5].eq_ignore_ascii_case(b"data:") {
        Ok(())
    } else {
        Err("codex.emitImage only accepts data URLs".to_string())
    }
}
/// Default detail for emitted images: `Original` when both the feature flag
/// and the model's capability allow it, otherwise `None` (provider default).
fn default_output_image_detail_for_turn(turn: &TurnContext) -> Option<ImageDetail> {
    (turn.config.features.enabled(Feature::ImageDetailOriginal)
        && turn.model_info.supports_image_detail_original)
        .then_some(ImageDetail::Original)
}
/// Builds the exec result content list: the textual `output` first (as an
/// input-text item), followed by any additional content items.
pub(super) fn build_exec_result_content_items(
    output: String,
    content_items: Vec<FunctionCallOutputContentItem>,
) -> Vec<FunctionCallOutputContentItem> {
    std::iter::once(FunctionCallOutputContentItem::InputText { text: output })
        .chain(content_items)
        .collect()
}
/// Inverse of `build_exec_result_content_items`: when the list starts with an
/// input-text item, peel it off as the output text; otherwise the output is
/// empty and the list is returned untouched.
pub(super) fn split_exec_result_content_items(
    mut content_items: Vec<FunctionCallOutputContentItem>,
) -> (String, Vec<FunctionCallOutputContentItem>) {
    let leads_with_text = matches!(
        content_items.first(),
        Some(FunctionCallOutputContentItem::InputText { .. })
    );
    if !leads_with_text {
        return (String::new(), content_items);
    }
    let FunctionCallOutputContentItem::InputText { text } = content_items.remove(0) else {
        unreachable!("first content item should be input_text");
    };
    (text, content_items)
}
/// Normalizes an optional poll interval: a missing value falls back to the
/// default, and the result is clamped to the allowed [min, max] range.
pub(super) fn clamp_poll_ms(value: Option<u64>) -> u64 {
    value
        .unwrap_or(JS_REPL_POLL_DEFAULT_MS)
        .clamp(JS_REPL_POLL_MIN_MS, JS_REPL_POLL_MAX_MS)
}

View File

@@ -0,0 +1,79 @@
use super::*;
/// Messages the Node kernel sends to the host. The wire format is internally
/// tagged on `type` with snake_case names (e.g. `{"type":"exec_log",...}`).
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub(super) enum KernelToHost {
    /// One captured log line for the exec `id`.
    ExecLog {
        id: String,
        text: String,
    },
    /// Terminal result of exec `id`, with an optional error message.
    ExecResult {
        id: String,
        ok: bool,
        output: String,
        #[serde(default)]
        error: Option<String>,
    },
    /// Kernel asks the host to run a tool on its behalf.
    RunTool(RunToolRequest),
    /// Kernel emits an image for the current exec.
    EmitImage(EmitImageRequest),
}
/// Messages the host sends to the kernel, internally tagged on `type` with
/// snake_case names.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub(super) enum HostToKernel {
    /// Start executing `code` under exec `id`.
    Exec {
        id: String,
        code: String,
        // NOTE(review): #[serde(default)] only affects Deserialize; it is
        // inert on this Serialize-only enum.
        #[serde(default)]
        timeout_ms: Option<u64>,
        #[serde(default)]
        stream_logs: bool,
    },
    /// Host's reply to a `RunTool` request.
    RunToolResult(RunToolResult),
    /// Host's reply to an `EmitImage` request.
    EmitImageResult(EmitImageResult),
}
/// Kernel -> host request to invoke tool `tool_name` with JSON-encoded
/// `arguments`, correlated by `id` and issued from exec `exec_id`.
#[derive(Clone, Debug, Deserialize)]
pub(super) struct RunToolRequest {
    pub(super) id: String,
    pub(super) exec_id: String,
    pub(super) tool_name: String,
    pub(super) arguments: String,
}

/// Host -> kernel reply to a `RunToolRequest` with the matching `id`;
/// carries the tool response on success or an error message on failure.
#[derive(Clone, Debug, Serialize)]
pub(super) struct RunToolResult {
    pub(super) id: String,
    pub(super) ok: bool,
    // NOTE(review): #[serde(default)] only affects Deserialize and is inert
    // on these Serialize-only reply types.
    #[serde(default)]
    pub(super) response: Option<JsonValue>,
    #[serde(default)]
    pub(super) error: Option<String>,
}

/// Kernel -> host request to attach an image (a data URL — see
/// `validate_emitted_image_url`) produced by exec `exec_id`.
#[derive(Clone, Debug, Deserialize)]
pub(super) struct EmitImageRequest {
    pub(super) id: String,
    pub(super) exec_id: String,
    pub(super) image_url: String,
    #[serde(default)]
    pub(super) detail: Option<ImageDetail>,
}

/// Host -> kernel reply to an `EmitImageRequest` with the matching `id`.
#[derive(Clone, Debug, Serialize)]
pub(super) struct EmitImageResult {
    pub(super) id: String,
    pub(super) ok: bool,
    #[serde(default)]
    pub(super) error: Option<String>,
}
/// Host-side summary of an exec's outcome: the final content items on
/// success, or an error message on failure.
#[derive(Debug)]
pub(super) enum ExecResultMessage {
    Ok {
        content_items: Vec<FunctionCallOutputContentItem>,
    },
    Err {
        message: String,
    },
}

View File

@@ -26,6 +26,7 @@ use crate::tools::sandboxing::ToolCtx;
use crate::tools::sandboxing::ToolError;
use crate::tools::sandboxing::ToolRuntime;
use crate::tools::sandboxing::default_exec_approval_requirement;
use crate::tools::sandboxing::has_managed_network_requirements;
use codex_otel::ToolDecisionSource;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::NetworkPolicyRuleAction;
@@ -166,38 +167,13 @@ impl ToolOrchestrator {
}
// 2) First attempt under the selected sandbox.
let has_managed_network_requirements = turn_ctx
.config
.config_layer_stack
.requirements_toml()
.network
.is_some();
let initial_sandbox = match tool.sandbox_mode_for_first_attempt(req) {
SandboxOverride::BypassSandboxFirstAttempt => crate::exec::SandboxType::None,
SandboxOverride::NoOverride => self.sandbox.select_initial(
&turn_ctx.file_system_sandbox_policy,
turn_ctx.network_sandbox_policy,
tool.sandbox_preference(),
turn_ctx.windows_sandbox_level,
has_managed_network_requirements,
),
};
// Platform-specific flag gating is handled by SandboxManager::select_initial
// via crate::safety::get_platform_sandbox(..).
let use_legacy_landlock = turn_ctx.features.use_legacy_landlock();
let initial_attempt = SandboxAttempt {
sandbox: initial_sandbox,
policy: &turn_ctx.sandbox_policy,
file_system_policy: &turn_ctx.file_system_sandbox_policy,
network_policy: turn_ctx.network_sandbox_policy,
enforce_managed_network: has_managed_network_requirements,
manager: &self.sandbox,
sandbox_cwd: &turn_ctx.cwd,
codex_linux_sandbox_exe: turn_ctx.codex_linux_sandbox_exe.as_ref(),
use_legacy_landlock,
windows_sandbox_level: turn_ctx.windows_sandbox_level,
};
let has_managed_network_requirements = has_managed_network_requirements(turn_ctx);
let initial_attempt = SandboxAttempt::initial_for_turn(
&self.sandbox,
turn_ctx,
tool.sandbox_preference(),
tool.sandbox_mode_for_first_attempt(req),
);
let (first_result, first_deferred_network_approval) = Self::run_attempt(
tool,
@@ -308,18 +284,12 @@ impl ToolOrchestrator {
}
}
let escalated_attempt = SandboxAttempt {
sandbox: crate::exec::SandboxType::None,
policy: &turn_ctx.sandbox_policy,
file_system_policy: &turn_ctx.file_system_sandbox_policy,
network_policy: turn_ctx.network_sandbox_policy,
enforce_managed_network: has_managed_network_requirements,
manager: &self.sandbox,
sandbox_cwd: &turn_ctx.cwd,
codex_linux_sandbox_exe: None,
use_legacy_landlock,
windows_sandbox_level: turn_ctx.windows_sandbox_level,
};
let escalated_attempt = SandboxAttempt::initial_for_turn(
&self.sandbox,
turn_ctx,
tool.sandbox_preference(),
SandboxOverride::BypassSandboxFirstAttempt,
);
// Second attempt.
let (retry_result, retry_deferred_network_approval) = Self::run_attempt(

View File

@@ -159,7 +159,6 @@ impl ExecApprovalRequirement {
}
}
/// - Never, OnFailure: do not ask
/// - OnRequest: ask unless filesystem access is unrestricted
/// - Granular: ask unless filesystem access is unrestricted, but auto-reject
/// when granular sandbox approval is disabled.
@@ -234,7 +233,7 @@ pub(crate) trait Approvable<Req> {
// In most cases (shell, unified_exec), a request will have a single approval key.
//
// However, apply_patch needs session "Allow, don't ask again" semantics that
// However, apply_patch needs session "approve once, don't ask again" semantics that
// apply to multiple atomic targets (e.g., apply_patch approves per file path). Returning
// a list of keys lets the runtime treat the request as approved-for-session only if
// *all* keys are already approved, while still caching approvals per-key so future
@@ -325,7 +324,7 @@ pub(crate) trait ToolRuntime<Req, Out>: Approvable<Req> + Sandboxable {
pub(crate) struct SandboxAttempt<'a> {
pub sandbox: crate::exec::SandboxType,
pub policy: &'a crate::protocol::SandboxPolicy,
pub file_system_policy: &'a FileSystemSandboxPolicy,
pub file_system_policy: FileSystemSandboxPolicy,
pub network_policy: NetworkSandboxPolicy,
pub enforce_managed_network: bool,
pub(crate) manager: &'a SandboxManager,
@@ -335,8 +334,52 @@ pub(crate) struct SandboxAttempt<'a> {
pub windows_sandbox_level: codex_protocol::config_types::WindowsSandboxLevel,
}
pub(crate) fn has_managed_network_requirements(turn_ctx: &TurnContext) -> bool {
turn_ctx
.config
.config_layer_stack
.requirements_toml()
.network
.is_some()
}
impl<'a> SandboxAttempt<'a> {
pub fn env_for(
pub(crate) fn initial_for_turn(
manager: &'a SandboxManager,
turn_ctx: &'a TurnContext,
preference: SandboxablePreference,
sandbox_override: SandboxOverride,
) -> Self {
let enforce_managed_network = has_managed_network_requirements(turn_ctx);
let policy = turn_ctx.sandbox_policy.get();
let file_system_policy = FileSystemSandboxPolicy::from(policy);
let network_policy = NetworkSandboxPolicy::from(policy);
let sandbox = match sandbox_override {
SandboxOverride::BypassSandboxFirstAttempt => crate::exec::SandboxType::None,
SandboxOverride::NoOverride => manager.select_initial(
&file_system_policy,
network_policy,
preference,
turn_ctx.windows_sandbox_level,
enforce_managed_network,
),
};
Self {
sandbox,
policy,
file_system_policy,
network_policy,
enforce_managed_network,
manager,
sandbox_cwd: &turn_ctx.cwd,
codex_linux_sandbox_exe: turn_ctx.codex_linux_sandbox_exe.as_ref(),
use_legacy_landlock: false,
windows_sandbox_level: turn_ctx.windows_sandbox_level,
}
}
pub(crate) fn env_for(
&self,
spec: CommandSpec,
network: Option<&NetworkProxy>,
@@ -345,7 +388,7 @@ impl<'a> SandboxAttempt<'a> {
.transform(crate::sandboxing::SandboxTransformRequest {
spec,
policy: self.policy,
file_system_policy: self.file_system_policy,
file_system_policy: &self.file_system_policy,
network_policy: self.network_policy,
sandbox: self.sandbox,
enforce_managed_network: self.enforce_managed_network,
@@ -361,5 +404,116 @@ impl<'a> SandboxAttempt<'a> {
}
#[cfg(test)]
#[path = "sandboxing_tests.rs"]
mod tests;
mod tests {
use super::*;
use crate::sandboxing::SandboxPermissions;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::NetworkAccess;
use pretty_assertions::assert_eq;
#[test]
fn external_sandbox_skips_exec_approval_on_request() {
assert_eq!(
default_exec_approval_requirement(
AskForApproval::OnRequest,
&FileSystemSandboxPolicy::from(&SandboxPolicy::ExternalSandbox {
network_access: NetworkAccess::Restricted,
}),
),
ExecApprovalRequirement::Skip {
bypass_sandbox: false,
proposed_execpolicy_amendment: None,
}
);
}
#[test]
fn restricted_sandbox_requires_exec_approval_on_request() {
assert_eq!(
default_exec_approval_requirement(
AskForApproval::OnRequest,
&FileSystemSandboxPolicy::from(&SandboxPolicy::new_read_only_policy())
),
ExecApprovalRequirement::NeedsApproval {
reason: None,
proposed_execpolicy_amendment: None,
}
);
}
#[test]
fn default_exec_approval_requirement_rejects_sandbox_prompt_when_configured() {
let policy = AskForApproval::Granular(GranularApprovalConfig {
sandbox_approval: false,
rules: false,
skill_approval: true,
request_permissions: false,
mcp_elicitations: false,
});
let requirement = default_exec_approval_requirement(
policy,
&FileSystemSandboxPolicy::from(&SandboxPolicy::new_read_only_policy()),
);
assert_eq!(
requirement,
ExecApprovalRequirement::Forbidden {
reason: "approval policy disallowed sandbox approval prompt".to_string(),
}
);
}
#[test]
fn default_exec_approval_requirement_keeps_prompt_when_sandbox_rejection_is_disabled() {
let policy = AskForApproval::Granular(GranularApprovalConfig {
sandbox_approval: true,
rules: true,
skill_approval: false,
request_permissions: false,
mcp_elicitations: true,
});
let requirement = default_exec_approval_requirement(
policy,
&FileSystemSandboxPolicy::from(&SandboxPolicy::new_read_only_policy()),
);
assert_eq!(
requirement,
ExecApprovalRequirement::NeedsApproval {
reason: None,
proposed_execpolicy_amendment: None,
}
);
}
#[test]
fn additional_permissions_allow_bypass_sandbox_first_attempt_when_execpolicy_skips() {
assert_eq!(
sandbox_override_for_first_attempt(
SandboxPermissions::WithAdditionalPermissions,
&ExecApprovalRequirement::Skip {
bypass_sandbox: true,
proposed_execpolicy_amendment: None,
},
),
SandboxOverride::BypassSandboxFirstAttempt
);
}
#[test]
fn guardian_bypasses_sandbox_for_explicit_escalation_on_first_attempt() {
    // RequireEscalated forces a first-attempt bypass even when the execpolicy
    // Skip itself did not ask for `bypass_sandbox`.
    let requirement = ExecApprovalRequirement::Skip {
        bypass_sandbox: false,
        proposed_execpolicy_amendment: None,
    };
    let outcome =
        sandbox_override_for_first_attempt(SandboxPermissions::RequireEscalated, &requirement);
    assert_eq!(outcome, SandboxOverride::BypassSandboxFirstAttempt);
}
}

View File

@@ -227,6 +227,7 @@ pub(crate) struct ToolsConfig {
pub request_permissions_tool_enabled: bool,
pub code_mode_enabled: bool,
pub js_repl_enabled: bool,
pub js_repl_polling_enabled: bool,
pub js_repl_tools_only: bool,
pub can_request_original_image_detail: bool,
pub collab_tools: bool,
@@ -364,6 +365,7 @@ impl ToolsConfig {
request_permissions_tool_enabled,
code_mode_enabled: include_code_mode,
js_repl_enabled: include_js_repl,
js_repl_polling_enabled: features.enabled(Feature::JsReplPolling),
js_repl_tools_only: include_js_repl_tools_only,
can_request_original_image_detail: include_original_image_detail,
collab_tools: include_collab_tools,
@@ -2000,15 +2002,23 @@ JS_SOURCE: /(?:\s*)(?:[^\s{\"`]|`[^`]|``[^`])[\s\S]*/
}
fn create_js_repl_reset_tool() -> ToolSpec {
let properties = BTreeMap::from([(
"session_id".to_string(),
JsonSchema::String {
description: Some(
"Optional polling session identifier to reset. When omitted, resets the singleton kernel and all polling sessions."
.to_string(),
),
},
)]);
ToolSpec::Function(ResponsesApiTool {
name: "js_repl_reset".to_string(),
description:
"Restarts the js_repl kernel for this run and clears persisted top-level bindings."
.to_string(),
description: "Resets js_repl state. When `session_id` is omitted, restarts the singleton kernel and clears all polling sessions; when provided, resets only that polling session.".to_string(),
strict: false,
defer_loading: None,
parameters: JsonSchema::Object {
properties: BTreeMap::new(),
properties,
required: None,
additional_properties: Some(false.into()),
},
@@ -2016,6 +2026,41 @@ fn create_js_repl_reset_tool() -> ToolSpec {
})
}
fn create_js_repl_poll_tool() -> ToolSpec {
    // Parameter schema: `exec_id` is mandatory; `yield_time_ms` optionally
    // bounds how long the poll waits before returning.
    let mut properties = BTreeMap::new();
    properties.insert(
        "exec_id".to_string(),
        JsonSchema::String {
            description: Some("Identifier returned by a polling js_repl submit call.".to_string()),
        },
    );
    properties.insert(
        "yield_time_ms".to_string(),
        JsonSchema::Number {
            description: Some(
                "How long to wait for additional logs or completion before returning."
                    .to_string(),
            ),
        },
    );
    ToolSpec::Function(ResponsesApiTool {
        name: "js_repl_poll".to_string(),
        description: "Polls a js_repl exec started with `poll=true`, returning incremental logs and terminal `final_output` / `error` once complete.".to_string(),
        strict: false,
        defer_loading: None,
        parameters: JsonSchema::Object {
            properties,
            required: Some(vec!["exec_id".to_string()]),
            additional_properties: Some(false.into()),
        },
        output_schema: None,
    })
}
fn create_code_mode_tool(enabled_tool_names: &[String]) -> ToolSpec {
const CODE_MODE_FREEFORM_GRAMMAR: &str = r#"
start: pragma_source | plain_source
@@ -2441,6 +2486,7 @@ pub(crate) fn build_specs_with_discoverable_tools(
use crate::tools::handlers::DynamicToolHandler;
use crate::tools::handlers::GrepFilesHandler;
use crate::tools::handlers::JsReplHandler;
use crate::tools::handlers::JsReplPollHandler;
use crate::tools::handlers::JsReplResetHandler;
use crate::tools::handlers::ListDirHandler;
use crate::tools::handlers::McpHandler;
@@ -2482,6 +2528,7 @@ pub(crate) fn build_specs_with_discoverable_tools(
let code_mode_handler = Arc::new(CodeModeExecuteHandler);
let code_mode_wait_handler = Arc::new(CodeModeWaitHandler);
let js_repl_handler = Arc::new(JsReplHandler);
let js_repl_poll_handler = Arc::new(JsReplPollHandler);
let js_repl_reset_handler = Arc::new(JsReplResetHandler);
let artifacts_handler = Arc::new(ArtifactsHandler);
let exec_permission_approvals_enabled = config.exec_permission_approvals_enabled;
@@ -2619,6 +2666,15 @@ pub(crate) fn build_specs_with_discoverable_tools(
false,
config.code_mode_enabled,
);
if config.js_repl_polling_enabled {
push_tool_spec(
&mut builder,
create_js_repl_poll_tool(),
false,
config.code_mode_enabled,
);
builder.register_handler("js_repl_poll", js_repl_poll_handler);
}
push_tool_spec(
&mut builder,
create_js_repl_reset_tool(),

View File

@@ -832,6 +832,28 @@ fn js_repl_enabled_adds_tools() {
assert_contains_tool_names(&tools, &["js_repl", "js_repl_reset"]);
}
#[test]
fn js_repl_polling_enabled_adds_poll_tool() {
let config = test_config();
let model_info = ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config);
let mut features = Features::with_defaults();
features.enable(Feature::JsRepl);
features.enable(Feature::JsReplPolling);
let available_models = Vec::new();
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_info: &model_info,
available_models: &available_models,
features: &features,
web_search_mode: Some(WebSearchMode::Cached),
session_source: SessionSource::Cli,
sandbox_policy: &SandboxPolicy::DangerFullAccess,
windows_sandbox_level: WindowsSandboxLevel::Disabled,
});
let (tools, _) = build_specs(&tools_config, None, None, &[]).build();
assert_contains_tool_names(&tools, &["js_repl", "js_repl_poll", "js_repl_reset"]);
}
#[test]
fn image_generation_tools_require_feature_and_supported_model() {
let config = test_config();

View File

@@ -48,6 +48,7 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
}
pub(crate) use errors::UnifiedExecError;
pub(crate) use process::ManagedSplitProcess;
pub(crate) use process::NoopSpawnLifecycle;
#[cfg(unix)]
pub(crate) use process::SpawnLifecycle;

View File

@@ -7,6 +7,7 @@ use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tokio::time::Duration;
@@ -48,7 +49,7 @@ pub(crate) struct OutputHandles {
#[derive(Debug)]
pub(crate) struct UnifiedExecProcess {
process_handle: ExecCommandSession,
output_rx: broadcast::Receiver<Vec<u8>>,
output_tx: broadcast::Sender<Vec<u8>>,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
@@ -60,10 +61,17 @@ pub(crate) struct UnifiedExecProcess {
_spawn_lifecycle: SpawnLifecycleHandle,
}
pub(crate) struct ManagedSplitProcess {
pub(crate) process: UnifiedExecProcess,
pub(crate) stdin: mpsc::Sender<Vec<u8>>,
pub(crate) stdout_rx: mpsc::Receiver<Vec<u8>>,
pub(crate) stderr_rx: mpsc::Receiver<Vec<u8>>,
}
impl UnifiedExecProcess {
pub(super) fn new(
pub(crate) fn new(
process_handle: ExecCommandSession,
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
initial_output_rx: mpsc::Receiver<Vec<u8>>,
sandbox_type: SandboxType,
spawn_lifecycle: SpawnLifecycleHandle,
) -> Self {
@@ -73,34 +81,28 @@ impl UnifiedExecProcess {
let output_closed_notify = Arc::new(Notify::new());
let cancellation_token = CancellationToken::new();
let output_drained = Arc::new(Notify::new());
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
let mut receiver = initial_output_rx;
let output_rx = receiver.resubscribe();
let buffer_clone = Arc::clone(&output_buffer);
let notify_clone = Arc::clone(&output_notify);
let output_closed_clone = Arc::clone(&output_closed);
let output_closed_notify_clone = Arc::clone(&output_closed_notify);
let output_tx_clone = output_tx.clone();
let output_task = tokio::spawn(async move {
loop {
match receiver.recv().await {
Ok(chunk) => {
let mut guard = buffer_clone.lock().await;
guard.push_chunk(chunk);
drop(guard);
notify_clone.notify_waiters();
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
output_closed_clone.store(true, Ordering::Release);
output_closed_notify_clone.notify_waiters();
break;
}
};
while let Some(chunk) = receiver.recv().await {
let _ = output_tx_clone.send(chunk.clone());
let mut guard = buffer_clone.lock().await;
guard.push_chunk(chunk);
drop(guard);
notify_clone.notify_waiters();
}
output_closed_clone.store(true, Ordering::Release);
output_closed_notify_clone.notify_waiters();
});
Self {
process_handle,
output_rx,
output_tx,
output_buffer,
output_notify,
output_closed,
@@ -113,7 +115,7 @@ impl UnifiedExecProcess {
}
}
pub(super) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
pub(crate) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
self.process_handle.writer_sender()
}
@@ -128,10 +130,10 @@ impl UnifiedExecProcess {
}
pub(super) fn output_receiver(&self) -> tokio::sync::broadcast::Receiver<Vec<u8>> {
self.output_rx.resubscribe()
self.output_tx.subscribe()
}
pub(super) fn cancellation_token(&self) -> CancellationToken {
pub(crate) fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}
@@ -139,15 +141,15 @@ impl UnifiedExecProcess {
Arc::clone(&self.output_drained)
}
pub(super) fn has_exited(&self) -> bool {
pub(crate) fn has_exited(&self) -> bool {
self.process_handle.has_exited()
}
pub(super) fn exit_code(&self) -> Option<i32> {
pub(crate) fn exit_code(&self) -> Option<i32> {
self.process_handle.exit_code()
}
pub(super) fn terminate(&self) {
pub(crate) fn terminate(&self) {
self.output_closed.store(true, Ordering::Release);
self.output_closed_notify.notify_waiters();
self.process_handle.terminate();
@@ -155,6 +157,14 @@ impl UnifiedExecProcess {
self.output_task.abort();
}
pub(crate) fn request_terminate(&self) {
self.process_handle.request_terminate();
}
pub(crate) fn pid(&self) -> Option<u32> {
self.process_handle.pid()
}
async fn snapshot_output(&self) -> Vec<Vec<u8>> {
let guard = self.output_buffer.lock().await;
guard.snapshot_chunks()
@@ -218,11 +228,26 @@ impl UnifiedExecProcess {
) -> Result<Self, UnifiedExecError> {
let SpawnedPty {
session: process_handle,
stdout_rx,
stderr_rx,
mut exit_rx,
output_rx,
exit_rx,
} = spawned;
let output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
Self::from_process_parts(
process_handle,
output_rx,
exit_rx,
sandbox_type,
spawn_lifecycle,
)
.await
}
pub(crate) async fn from_process_parts(
process_handle: ExecCommandSession,
output_rx: mpsc::Receiver<Vec<u8>>,
mut exit_rx: oneshot::Receiver<i32>,
sandbox_type: SandboxType,
spawn_lifecycle: SpawnLifecycleHandle,
) -> Result<Self, UnifiedExecError> {
let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle);
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));

View File

@@ -33,6 +33,7 @@ use crate::unified_exec::MAX_UNIFIED_EXEC_PROCESSES;
use crate::unified_exec::MAX_YIELD_TIME_MS;
use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
use crate::unified_exec::MIN_YIELD_TIME_MS;
use crate::unified_exec::ManagedSplitProcess;
use crate::unified_exec::ProcessEntry;
use crate::unified_exec::ProcessStore;
use crate::unified_exec::UnifiedExecContext;
@@ -46,6 +47,7 @@ use crate::unified_exec::async_watcher::start_streaming_output;
use crate::unified_exec::clamp_yield_time;
use crate::unified_exec::generate_chunk_id;
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
use crate::unified_exec::process::NoopSpawnLifecycle;
use crate::unified_exec::process::OutputBuffer;
use crate::unified_exec::process::OutputHandles;
use crate::unified_exec::process::SpawnLifecycleHandle;
@@ -102,6 +104,17 @@ struct PreparedProcessHandles {
tty: bool,
}
#[derive(Clone, Copy)]
enum ExecEnvSpawnMode {
Merged { tty: bool },
SplitPipe,
}
enum SpawnedExecEnvProcess {
Merged(codex_utils_pty::SpawnedPty),
Split(codex_utils_pty::SpawnedProcessSplit),
}
impl UnifiedExecProcessManager {
pub(crate) async fn allocate_process_id(&self) -> i32 {
loop {
@@ -533,13 +546,63 @@ impl UnifiedExecProcessManager {
tty: bool,
mut spawn_lifecycle: SpawnLifecycleHandle,
) -> Result<UnifiedExecProcess, UnifiedExecError> {
match Self::spawn_exec_env_process(env, ExecEnvSpawnMode::Merged { tty }).await? {
SpawnedExecEnvProcess::Merged(spawned) => {
spawn_lifecycle.after_spawn();
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
}
SpawnedExecEnvProcess::Split(_) => {
unreachable!("merged spawn mode returned split process")
}
}
}
pub(crate) async fn open_split_pipe_session_with_exec_env(
&self,
env: &ExecRequest,
) -> Result<ManagedSplitProcess, UnifiedExecError> {
match Self::spawn_exec_env_process(env, ExecEnvSpawnMode::SplitPipe).await? {
SpawnedExecEnvProcess::Merged(_) => {
unreachable!("split pipe spawn mode returned merged process")
}
SpawnedExecEnvProcess::Split(spawned) => {
let codex_utils_pty::SpawnedProcessSplit {
session: process_handle,
output_rx,
stdout_rx,
stderr_rx,
exit_rx,
} = spawned;
let stdin = process_handle.writer_sender();
let process = UnifiedExecProcess::from_process_parts(
process_handle,
output_rx,
exit_rx,
env.sandbox,
Box::new(NoopSpawnLifecycle),
)
.await?;
Ok(ManagedSplitProcess {
process,
stdin,
stdout_rx,
stderr_rx,
})
}
}
}
async fn spawn_exec_env_process(
env: &ExecRequest,
mode: ExecEnvSpawnMode,
) -> Result<SpawnedExecEnvProcess, UnifiedExecError> {
let (program, args) = env
.command
.split_first()
.ok_or(UnifiedExecError::MissingCommandLine)?;
let spawn_result = if tty {
codex_utils_pty::pty::spawn_process(
let spawn_result = match mode {
ExecEnvSpawnMode::Merged { tty: true } => codex_utils_pty::pty::spawn_process(
program,
args,
env.cwd.as_path(),
@@ -548,8 +611,19 @@ impl UnifiedExecProcessManager {
codex_utils_pty::TerminalSize::default(),
)
.await
} else {
codex_utils_pty::pipe::spawn_process_no_stdin(
.map(SpawnedExecEnvProcess::Merged),
ExecEnvSpawnMode::Merged { tty: false } => {
codex_utils_pty::pipe::spawn_process_no_stdin(
program,
args,
env.cwd.as_path(),
&env.env,
&env.arg0,
)
.await
.map(SpawnedExecEnvProcess::Merged)
}
ExecEnvSpawnMode::SplitPipe => codex_utils_pty::pipe::spawn_process_split(
program,
args,
env.cwd.as_path(),
@@ -557,11 +631,10 @@ impl UnifiedExecProcessManager {
&env.arg0,
)
.await
.map(SpawnedExecEnvProcess::Split),
};
let spawned =
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
spawn_lifecycle.after_spawn();
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))
}
pub(super) async fn open_session_with_sandbox(

View File

@@ -38,13 +38,20 @@ use image::Rgba;
use image::load_from_memory;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::time::Duration;
use wiremock::BodyPrintLimit;
use wiremock::Mock;
use wiremock::MockServer;
#[cfg(not(debug_assertions))]
use wiremock::Respond;
use wiremock::ResponseTemplate;
#[cfg(not(debug_assertions))]
use wiremock::matchers::body_string_contains;
use wiremock::matchers::method;
use wiremock::matchers::path_regex;
fn image_messages(body: &Value) -> Vec<&Value> {
body.get("input")
@@ -73,6 +80,27 @@ fn find_image_message(body: &Value) -> Option<&Value> {
image_messages(body).into_iter().next()
}
/// Parses a captured wiremock request body as JSON, transparently
/// decompressing it first when the `content-encoding` header lists `zstd`.
///
/// Panics (test helper) when decompression or JSON parsing fails.
fn request_body_json(request: &wiremock::Request) -> Value {
    // The encoding header may carry a comma-separated list; match any entry.
    let is_zstd = request
        .headers
        .get("content-encoding")
        .and_then(|value| value.to_str().ok())
        .is_some_and(|value| value.split(',').any(|entry| entry.trim() == "zstd"));
    let body = if is_zstd {
        zstd::stream::decode_all(std::io::Cursor::new(&request.body))
            .unwrap_or_else(|err| panic!("decode zstd request body: {err}"))
    } else {
        request.body.clone()
    };
    serde_json::from_slice(&body).unwrap_or_else(|err| panic!("request body json: {err}"))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_turn_with_local_image_attaches_image() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
@@ -910,6 +938,652 @@ await codex.emitImage(out);
Ok(())
}
// Verifies that one js_repl execution can attach two images via nested
// `view_image` tool calls plus explicit `codex.emitImage(...)`, and that both
// come back as `input_image` content items on the custom tool output rather
// than as injected user-input image messages.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_view_image_tool_attaches_multiple_local_images() -> anyhow::Result<()> {
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    // Enable the JsRepl feature so the js_repl tool is offered to the model.
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::JsRepl)
            .expect("test config should allow feature update");
    });
    let TestCodex {
        codex,
        cwd,
        session_configured,
        ..
    } = builder.build(&server).await?;
    let call_id = "js-repl-view-image-two";
    // JS source run in the repl: writes the same 1x1 PNG to two paths, runs
    // each through the nested view_image tool, and emits both results.
    let js_input = r#"
const fs = await import("node:fs/promises");
const path = await import("node:path");
const png = Buffer.from(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
"base64"
);
const imagePathA = path.join(codex.tmpDir, "js-repl-view-image-a.png");
const imagePathB = path.join(codex.tmpDir, "js-repl-view-image-b.png");
await fs.writeFile(imagePathA, png);
await fs.writeFile(imagePathB, png);
const outA = await codex.tool("view_image", { path: imagePathA });
const outB = await codex.tool("view_image", { path: imagePathB });
await codex.emitImage(outA);
await codex.emitImage(outB);
console.log("attached-two-images");
"#;
    // First mocked model turn: request the js_repl custom tool call.
    let first_response = sse(vec![
        ev_response_created("resp-1"),
        ev_custom_tool_call(call_id, "js_repl", js_input),
        ev_completed("resp-1"),
    ]);
    responses::mount_sse_once(&server, first_response).await;
    // Second mocked turn: plain completion; its *request* carries the js_repl
    // tool output that the assertions below inspect.
    let second_response = sse(vec![
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = responses::mount_sse_once(&server, second_response).await;
    let session_model = session_configured.model.clone();
    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "use js_repl to write two images and attach them".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: cwd.path().to_path_buf(),
            approval_policy: AskForApproval::Never,
            sandbox_policy: SandboxPolicy::DangerFullAccess,
            model: session_model,
            effort: None,
            summary: Some(ReasoningSummary::Auto),
            service_tier: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    wait_for_event_with_timeout(
        &codex,
        |event| matches!(event, EventMsg::TurnComplete(_)),
        Duration::from_secs(10),
    )
    .await;
    let req = mock.single_request();
    let custom_output = req.custom_tool_call_output(call_id);
    // `success: false` would indicate the repl execution itself failed.
    assert_ne!(
        custom_output.get("success").and_then(Value::as_bool),
        Some(false),
        "js_repl call failed unexpectedly: {custom_output}"
    );
    let output_items = custom_output
        .get("output")
        .and_then(Value::as_array)
        .expect("custom_tool_call_output should be a content item array");
    // The textual part of the output must carry the console.log marker.
    let js_repl_output = output_items
        .iter()
        .find_map(|item| {
            (item.get("type").and_then(Value::as_str) == Some("input_text"))
                .then(|| item.get("text").and_then(Value::as_str))
                .flatten()
        })
        .expect("custom tool output text present");
    assert!(
        js_repl_output.contains("attached-two-images"),
        "expected js_repl output marker, got {js_repl_output}"
    );
    // Collect all attached images; emitImage was called twice above.
    let image_urls = output_items
        .iter()
        .filter_map(|item| {
            (item.get("type").and_then(Value::as_str) == Some("input_image"))
                .then(|| item.get("image_url").and_then(Value::as_str))
                .flatten()
        })
        .collect::<Vec<_>>();
    assert_eq!(
        image_urls.len(),
        2,
        "js_repl should include one input_image content item per nested view_image call"
    );
    for image_url in image_urls {
        assert!(
            image_url.starts_with("data:image/png;base64,"),
            "expected png data URL, got {image_url}"
        );
    }
    let body = req.body_json();
    // Images must ride on the tool output, not as synthetic user image messages.
    assert_eq!(
        image_messages(&body).len(),
        0,
        "js_repl should not inject pending input image messages"
    );
    Ok(())
}
// Verifies the polling js_repl flow end to end: a `poll=true` submit returns
// an exec_id, two js_repl_poll round-trips follow it, and the terminal poll
// output carries the image emitted via `codex.emitImage(...)` as an
// `input_image` content item.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_poll_view_image_tool_attaches_local_image() -> anyhow::Result<()> {
    skip_if_no_network!(Ok(()));
    const POLL_WAIT_MS: u64 = 30_000;
    const TURN_TIMEOUT_SECS: u64 = 30;
    let server = start_mock_server().await;
    // Polling requires both the base JsRepl feature and JsReplPolling.
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::JsRepl)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::JsReplPolling)
            .expect("test config should allow feature update");
    });
    let TestCodex {
        codex,
        cwd,
        session_configured,
        ..
    } = builder.build(&server).await?;
    let submit_call_id = "js-repl-poll-submit";
    let poll_call_id = "js-repl-poll-image";
    // Pragma on the first line opts this execution into polling mode.
    let js_input = r#"// codex-js-repl: poll=true
const fs = await import("node:fs/promises");
const path = await import("node:path");
const imagePath = path.join(codex.tmpDir, "js-repl-poll-view-image.png");
const png = Buffer.from(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
"base64"
);
await fs.writeFile(imagePath, png);
const out = await codex.tool("view_image", { path: imagePath });
await codex.emitImage(out);
"#
    .to_string();
    // Stateful responder: scripts a fixed 4-request conversation and records
    // every incoming request so the test can inspect them afterwards.
    #[derive(Clone)]
    struct PollSequenceResponder {
        requests: Arc<Mutex<Vec<wiremock::Request>>>,
        call_count: Arc<AtomicUsize>,
        // exec_id parsed from the submit output; reused for follow-up polls.
        exec_id: Arc<Mutex<Option<String>>>,
        submit_call_id: &'static str,
        poll_call_id: &'static str,
        js_input: String,
    }
    impl Respond for PollSequenceResponder {
        fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
            self.requests
                .lock()
                .expect("request log lock")
                .push(request.clone());
            let request_json = request_body_json(request);
            let body = match self.call_count.fetch_add(1, Ordering::SeqCst) {
                // Request 0: kick off the polling js_repl submit.
                0 => sse(vec![
                    ev_response_created("resp-1"),
                    ev_custom_tool_call(self.submit_call_id, "js_repl", &self.js_input),
                    ev_completed("resp-1"),
                ]),
                // Request 1: extract exec_id from the submit output and issue
                // the first js_repl_poll call.
                1 => {
                    let submit_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str)
                                == Some("custom_tool_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.submit_call_id)
                        })
                        .expect("js_repl polling submit output");
                    let submit_payload = submit_output["output"]
                        .as_str()
                        .expect("js_repl polling submit output string");
                    let submit_json: Value =
                        serde_json::from_str(submit_payload).expect("submit payload json");
                    let exec_id = submit_json["exec_id"]
                        .as_str()
                        .expect("exec_id present in submit payload");
                    *self.exec_id.lock().expect("exec id lock") = Some(exec_id.to_string());
                    let poll_args = serde_json::json!({
                        "exec_id": exec_id,
                        "yield_time_ms": POLL_WAIT_MS,
                    })
                    .to_string();
                    sse(vec![
                        ev_response_created("resp-2"),
                        ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
                        ev_completed("resp-2"),
                    ])
                }
                // Request 2: first poll output arrived; poll again with the
                // recorded exec_id (exec may still be running).
                2 => {
                    let poll_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.poll_call_id)
                        })
                        .expect("js_repl_poll function_call_output present");
                    assert!(
                        poll_output.get("output").is_some(),
                        "js_repl_poll output should be present"
                    );
                    let exec_id = self
                        .exec_id
                        .lock()
                        .expect("exec id lock")
                        .clone()
                        .expect("exec id should be recorded before polling");
                    let poll_args = serde_json::json!({
                        "exec_id": exec_id,
                        "yield_time_ms": POLL_WAIT_MS,
                    })
                    .to_string();
                    sse(vec![
                        ev_response_created("resp-2b"),
                        ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
                        ev_completed("resp-2b"),
                    ])
                }
                // Request 3: terminal poll output received; finish the turn.
                3 => {
                    let poll_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.poll_call_id)
                        })
                        .expect("js_repl_poll function_call_output present");
                    assert!(
                        poll_output.get("output").is_some(),
                        "js_repl_poll output should be present"
                    );
                    sse(vec![
                        ev_assistant_message("msg-1", "done"),
                        ev_completed("resp-3"),
                    ])
                }
                call_num => panic!("unexpected extra responses request {call_num}"),
            };
            ResponseTemplate::new(200)
                .insert_header("content-type", "text/event-stream")
                .set_body_string(body)
        }
    }
    let requests = Arc::new(Mutex::new(Vec::new()));
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(PollSequenceResponder {
            requests: Arc::clone(&requests),
            call_count: Arc::new(AtomicUsize::new(0)),
            exec_id: Arc::new(Mutex::new(None)),
            submit_call_id,
            poll_call_id,
            js_input,
        })
        .up_to_n_times(4)
        .expect(4)
        .mount(&server)
        .await;
    let session_model = session_configured.model.clone();
    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "use js_repl polling to write an image and attach it".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: cwd.path().to_path_buf(),
            approval_policy: AskForApproval::Never,
            sandbox_policy: SandboxPolicy::DangerFullAccess,
            model: session_model,
            effort: None,
            service_tier: None,
            summary: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    wait_for_event_with_timeout(
        &codex,
        |event| matches!(event, EventMsg::TurnComplete(_)),
        Duration::from_secs(TURN_TIMEOUT_SECS),
    )
    .await;
    let requests = requests.lock().expect("request log lock").clone();
    assert_eq!(
        requests.len(),
        4,
        "expected submit, two polls, and completion requests"
    );
    // The final request carries the terminal js_repl_poll output.
    let poll_request_json = request_body_json(
        requests
            .get(3)
            .expect("final request with js_repl_poll output should be present"),
    );
    let poll_output = poll_request_json["input"]
        .as_array()
        .expect("poll request input array")
        .iter()
        .find(|item| {
            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                && item.get("call_id").and_then(Value::as_str) == Some(poll_call_id)
        })
        .expect("js_repl_poll function_call_output present");
    let output_items = poll_output["output"]
        .as_array()
        .expect("js_repl_poll output should be a content item array");
    // First item is the JSON status text; images follow as separate items.
    let status_item = output_items
        .first()
        .expect("first poll output item should be status text");
    assert_eq!(
        status_item.get("type").and_then(Value::as_str),
        Some("input_text")
    );
    let status_json: Value = serde_json::from_str(
        status_item["text"]
            .as_str()
            .expect("status item text should be present"),
    )
    .expect("status item should be valid json");
    assert_eq!(status_json["status"].as_str(), Some("completed"));
    assert_eq!(status_json["error"], Value::Null);
    assert_eq!(status_json["logs"], Value::Null);
    let image_items = output_items
        .iter()
        .filter_map(|item| {
            (item.get("type").and_then(Value::as_str) == Some("input_image"))
                .then(|| item.get("image_url").and_then(Value::as_str))
                .flatten()
        })
        .collect::<Vec<_>>();
    assert_eq!(
        image_items.len(),
        1,
        "expected one image item in poll output"
    );
    assert!(
        image_items[0].starts_with("data:image/png;base64,"),
        "expected png data URL, got {}",
        image_items[0]
    );
    Ok(())
}
// Counterpart to the attach test: when the polling js_repl script calls
// view_image but never `codex.emitImage(...)`, a ViewImageToolCall event is
// still emitted, yet the terminal js_repl_poll output stays a plain text
// payload with no image content items.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_poll_view_image_requires_explicit_emit() -> anyhow::Result<()> {
    skip_if_no_network!(Ok(()));
    const POLL_WAIT_MS: u64 = 30_000;
    const TURN_TIMEOUT_SECS: u64 = 30;
    let server = start_mock_server().await;
    // Polling requires both the base JsRepl feature and JsReplPolling.
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::JsRepl)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::JsReplPolling)
            .expect("test config should allow feature update");
    });
    let TestCodex {
        codex,
        cwd,
        session_configured,
        ..
    } = builder.build(&server).await?;
    let submit_call_id = "js-repl-poll-submit-no-emit";
    let poll_call_id = "js-repl-poll-no-image";
    // Note: the script logs the view_image result but never emits the image.
    let js_input = r#"// codex-js-repl: poll=true
const fs = await import("node:fs/promises");
const path = await import("node:path");
const imagePath = path.join(codex.tmpDir, "js-repl-poll-view-image-no-emit.png");
const png = Buffer.from(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
"base64"
);
await fs.writeFile(imagePath, png);
const out = await codex.tool("view_image", { path: imagePath });
console.log(out.type);
"#
    .to_string();
    // Stateful responder scripting the fixed submit/poll/poll/complete
    // conversation; records every request for post-turn inspection.
    #[derive(Clone)]
    struct PollSequenceResponder {
        requests: Arc<Mutex<Vec<wiremock::Request>>>,
        call_count: Arc<AtomicUsize>,
        // exec_id parsed from the submit output; reused for follow-up polls.
        exec_id: Arc<Mutex<Option<String>>>,
        submit_call_id: &'static str,
        poll_call_id: &'static str,
        js_input: String,
    }
    impl Respond for PollSequenceResponder {
        fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
            self.requests
                .lock()
                .expect("request log lock")
                .push(request.clone());
            let request_json = request_body_json(request);
            let body = match self.call_count.fetch_add(1, Ordering::SeqCst) {
                // Request 0: kick off the polling js_repl submit.
                0 => sse(vec![
                    ev_response_created("resp-1"),
                    ev_custom_tool_call(self.submit_call_id, "js_repl", &self.js_input),
                    ev_completed("resp-1"),
                ]),
                // Request 1: read exec_id from the submit output, first poll.
                1 => {
                    let submit_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str)
                                == Some("custom_tool_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.submit_call_id)
                        })
                        .expect("js_repl polling submit output");
                    let submit_payload = submit_output["output"]
                        .as_str()
                        .expect("js_repl polling submit output string");
                    let submit_json: Value =
                        serde_json::from_str(submit_payload).expect("submit payload json");
                    let exec_id = submit_json["exec_id"]
                        .as_str()
                        .expect("exec_id present in submit payload");
                    *self.exec_id.lock().expect("exec id lock") = Some(exec_id.to_string());
                    let poll_args = serde_json::json!({
                        "exec_id": exec_id,
                        "yield_time_ms": POLL_WAIT_MS,
                    })
                    .to_string();
                    sse(vec![
                        ev_response_created("resp-2"),
                        ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
                        ev_completed("resp-2"),
                    ])
                }
                // Request 2: first poll output arrived; poll once more.
                2 => {
                    let poll_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.poll_call_id)
                        })
                        .expect("js_repl_poll function_call_output present");
                    assert!(
                        poll_output.get("output").is_some(),
                        "js_repl_poll output should be present"
                    );
                    let exec_id = self
                        .exec_id
                        .lock()
                        .expect("exec id lock")
                        .clone()
                        .expect("exec id should be recorded before polling");
                    let poll_args = serde_json::json!({
                        "exec_id": exec_id,
                        "yield_time_ms": POLL_WAIT_MS,
                    })
                    .to_string();
                    sse(vec![
                        ev_response_created("resp-2b"),
                        ev_function_call(self.poll_call_id, "js_repl_poll", &poll_args),
                        ev_completed("resp-2b"),
                    ])
                }
                // Request 3: terminal poll output received; finish the turn.
                3 => {
                    let poll_output = request_json["input"]
                        .as_array()
                        .expect("request input array")
                        .iter()
                        .find(|item| {
                            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                                && item.get("call_id").and_then(Value::as_str)
                                    == Some(self.poll_call_id)
                        })
                        .expect("js_repl_poll function_call_output present");
                    assert!(
                        poll_output.get("output").is_some(),
                        "js_repl_poll output should be present"
                    );
                    sse(vec![
                        ev_assistant_message("msg-1", "done"),
                        ev_completed("resp-3"),
                    ])
                }
                call_num => panic!("unexpected extra responses request {call_num}"),
            };
            ResponseTemplate::new(200)
                .insert_header("content-type", "text/event-stream")
                .set_body_string(body)
        }
    }
    let requests = Arc::new(Mutex::new(Vec::new()));
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(PollSequenceResponder {
            requests: Arc::clone(&requests),
            call_count: Arc::new(AtomicUsize::new(0)),
            exec_id: Arc::new(Mutex::new(None)),
            submit_call_id,
            poll_call_id,
            js_input,
        })
        .up_to_n_times(4)
        .expect(4)
        .mount(&server)
        .await;
    let session_model = session_configured.model.clone();
    codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "use js_repl polling to write an image without emitting it".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: cwd.path().to_path_buf(),
            approval_policy: AskForApproval::Never,
            sandbox_policy: SandboxPolicy::DangerFullAccess,
            model: session_model,
            effort: None,
            service_tier: None,
            summary: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    // Capture the ViewImageToolCall event while waiting for turn completion.
    let mut tool_event = None;
    wait_for_event_with_timeout(
        &codex,
        |event| match event {
            EventMsg::ViewImageToolCall(_) => {
                tool_event = Some(event.clone());
                false
            }
            EventMsg::TurnComplete(_) => true,
            _ => false,
        },
        Duration::from_secs(TURN_TIMEOUT_SECS),
    )
    .await;
    let tool_event = match tool_event {
        Some(EventMsg::ViewImageToolCall(event)) => event,
        other => panic!("expected ViewImageToolCall event, got {other:?}"),
    };
    assert!(
        tool_event
            .path
            .ends_with("js-repl-poll-view-image-no-emit.png"),
        "unexpected image path: {}",
        tool_event.path.display()
    );
    let requests = requests.lock().expect("request log lock").clone();
    assert_eq!(
        requests.len(),
        4,
        "expected submit, two polls, and completion requests"
    );
    // The final request carries the terminal js_repl_poll output.
    let poll_request_json = request_body_json(
        requests
            .get(3)
            .expect("final request with js_repl_poll output should be present"),
    );
    let poll_output = poll_request_json["input"]
        .as_array()
        .expect("poll request input array")
        .iter()
        .find(|item| {
            item.get("type").and_then(Value::as_str) == Some("function_call_output")
                && item.get("call_id").and_then(Value::as_str) == Some(poll_call_id)
        })
        .expect("js_repl_poll function_call_output present");
    // Without an explicit emit, the output stays a plain JSON string payload.
    let status_json: Value = serde_json::from_str(
        poll_output["output"]
            .as_str()
            .expect("js_repl_poll output should stay text without explicit emit"),
    )
    .expect("status item should be valid json");
    assert_eq!(status_json["status"].as_str(), Some("completed"));
    assert_eq!(status_json["error"], Value::Null);
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_view_image_requires_explicit_emit() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -131,11 +131,10 @@ trust_level = "trusted"
let mut output = Vec::new();
let codex_utils_pty::SpawnedProcess {
session,
stdout_rx,
stderr_rx,
output_rx,
exit_rx,
} = spawned;
let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
let mut output_rx = output_rx;
let mut exit_rx = exit_rx;
let writer_tx = session.writer_sender();
let interrupt_writer = writer_tx.clone();
@@ -151,14 +150,13 @@ trust_level = "trusted"
loop {
select! {
result = output_rx.recv() => match result {
Ok(chunk) => {
Some(chunk) => {
if chunk.windows(4).any(|window| window == b"\x1b[6n") {
let _ = writer_tx.send(b"\x1b[1;1R".to_vec()).await;
}
output.extend_from_slice(&chunk);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break exit_rx.await,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
None => break exit_rx.await,
},
result = &mut exit_rx => break result,
}

View File

@@ -77,11 +77,10 @@ async fn run_codex_cli(
let mut output = Vec::new();
let codex_utils_pty::SpawnedProcess {
session,
stdout_rx,
stderr_rx,
output_rx,
exit_rx,
} = spawned;
let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
let mut output_rx = output_rx;
let mut exit_rx = exit_rx;
let writer_tx = session.writer_sender();
let exit_code_result = timeout(Duration::from_secs(10), async {
@@ -90,7 +89,7 @@ async fn run_codex_cli(
loop {
select! {
result = output_rx.recv() => match result {
Ok(chunk) => {
Some(chunk) => {
// The TUI asks for the cursor position via ESC[6n.
// Respond with a valid position to unblock startup.
if chunk.windows(4).any(|window| window == b"\x1b[6n") {
@@ -98,8 +97,7 @@ async fn run_codex_cli(
}
output.extend_from_slice(&chunk);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break exit_rx.await,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
None => break exit_rx.await,
},
result = &mut exit_rx => break result,
}

View File

@@ -7,7 +7,7 @@ Lightweight helpers for spawning interactive processes either under a PTY (pseud
- `spawn_pty_process(program, args, cwd, env, arg0, size)``SpawnedProcess`
- `spawn_pipe_process(program, args, cwd, env, arg0)``SpawnedProcess`
- `spawn_pipe_process_no_stdin(program, args, cwd, env, arg0)``SpawnedProcess`
- `combine_output_receivers(stdout_rx, stderr_rx)``broadcast::Receiver<Vec<u8>>`
- `spawn_pipe_process_split(program, args, cwd, env, arg0)``SpawnedProcessSplit`
- `conpty_supported()``bool` (Windows only; always true elsewhere)
- `TerminalSize { rows, cols }` selects PTY dimensions in character cells.
- `ProcessHandle` exposes:
@@ -15,14 +15,14 @@ Lightweight helpers for spawning interactive processes either under a PTY (pseud
- `resize(TerminalSize)`
- `close_stdin()`
- `has_exited()`, `exit_code()`, `terminate()`
- `SpawnedProcess` bundles `session`, `stdout_rx`, `stderr_rx`, and `exit_rx` (oneshot exit code).
- `SpawnedProcess` bundles `session`, `output_rx`, and `exit_rx` (oneshot exit code).
- `SpawnedProcessSplit` bundles `session`, merged `output_rx`, split `stdout_rx` / `stderr_rx`, and `exit_rx`.
## Usage examples
```rust
use std::collections::HashMap;
use std::path::Path;
use codex_utils_pty::combine_output_receivers;
use codex_utils_pty::spawn_pty_process;
use codex_utils_pty::TerminalSize;
@@ -41,7 +41,7 @@ let writer = spawned.session.writer_sender();
writer.send(b"exit\n".to_vec()).await?;
// Collect output until the process exits.
let mut output_rx = combine_output_receivers(spawned.stdout_rx, spawned.stderr_rx);
let mut output_rx = spawned.output_rx;
let mut collected = Vec::new();
while let Ok(chunk) = output_rx.try_recv() {
collected.extend_from_slice(&chunk);

View File

@@ -13,18 +13,20 @@ pub const DEFAULT_OUTPUT_BYTES_CAP: usize = 1024 * 1024;
pub use pipe::spawn_process as spawn_pipe_process;
/// Spawn a non-interactive process using regular pipes, but close stdin immediately.
pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin;
/// Combine stdout/stderr receivers into a single broadcast receiver.
pub use process::combine_output_receivers;
/// Handle for interacting with a spawned process (PTY or pipe).
pub use process::ProcessHandle;
/// Bundle of process handles plus split output and exit receivers returned by spawn helpers.
/// Bundle of process handles plus merged output and exit receivers returned by spawn helpers.
pub use process::SpawnedProcess;
/// Bundle of process handles plus split stdout/stderr receivers returned by pipe spawn helpers.
pub use process::SpawnedProcessSplit;
/// Terminal size in character cells used for PTY spawn and resize operations.
pub use process::TerminalSize;
/// Backwards-compatible alias for ProcessHandle.
pub type ExecCommandSession = ProcessHandle;
/// Backwards-compatible alias for SpawnedProcess.
pub type SpawnedPty = SpawnedProcess;
/// Spawn a non-interactive process using regular pipes and preserve split stdout/stderr streams.
pub use pipe::spawn_process_split as spawn_pipe_process_split;
/// Report whether ConPTY is available on this platform (Windows only).
pub use pty::conpty_supported;
/// Spawn a process attached to a PTY for interactive use.

View File

@@ -20,6 +20,7 @@ use tokio::task::JoinHandle;
use crate::process::ChildTerminator;
use crate::process::ProcessHandle;
use crate::process::SpawnedProcess;
use crate::process::SpawnedProcessSplit;
#[cfg(target_os = "linux")]
use libc;
@@ -72,8 +73,11 @@ fn kill_process(pid: u32) -> io::Result<()> {
}
}
async fn read_output_stream<R>(mut reader: R, output_tx: mpsc::Sender<Vec<u8>>)
where
async fn read_output_stream<R>(
mut reader: R,
merged_output_tx: mpsc::Sender<Vec<u8>>,
stream_output_tx: mpsc::Sender<Vec<u8>>,
) where
R: AsyncRead + Unpin,
{
let mut buf = vec![0u8; 8_192];
@@ -81,7 +85,9 @@ where
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
let _ = output_tx.send(buf[..n].to_vec()).await;
let chunk = buf[..n].to_vec();
let _ = merged_output_tx.send(chunk.clone()).await;
let _ = stream_output_tx.send(chunk).await;
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(_) => break,
@@ -102,7 +108,7 @@ async fn spawn_process_with_stdin_mode(
env: &HashMap<String, String>,
arg0: &Option<String>,
stdin_mode: PipeStdinMode,
) -> Result<SpawnedProcess> {
) -> Result<SpawnedProcessSplit> {
if program.is_empty() {
anyhow::bail!("missing program for pipe spawn");
}
@@ -156,6 +162,7 @@ async fn spawn_process_with_stdin_mode(
let stderr = child.stderr.take();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (output_tx, output_rx) = mpsc::channel::<Vec<u8>>(256);
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(128);
let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(128);
let writer_handle = if let Some(stdin) = stdin {
@@ -173,15 +180,17 @@ async fn spawn_process_with_stdin_mode(
};
let stdout_handle = stdout.map(|stdout| {
let output_tx = output_tx.clone();
let stdout_tx = stdout_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stdout), stdout_tx).await;
read_output_stream(BufReader::new(stdout), output_tx, stdout_tx).await;
})
});
let stderr_handle = stderr.map(|stderr| {
let output_tx = output_tx.clone();
let stderr_tx = stderr_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stderr), stderr_tx).await;
read_output_stream(BufReader::new(stderr), output_tx, stderr_tx).await;
})
});
let mut reader_abort_handles = Vec::new();
@@ -219,6 +228,7 @@ async fn spawn_process_with_stdin_mode(
let handle = ProcessHandle::new(
writer_tx,
Some(pid),
Box::new(PipeChildTerminator {
#[cfg(windows)]
pid,
@@ -234,15 +244,16 @@ async fn spawn_process_with_stdin_mode(
None,
);
Ok(SpawnedProcess {
Ok(SpawnedProcessSplit {
session: handle,
output_rx,
stdout_rx,
stderr_rx,
exit_rx,
})
}
/// Spawn a process using regular pipes (no PTY), returning handles for stdin, split output, and exit.
/// Spawn a process using regular pipes (no PTY), returning handles for stdin, merged output, and exit.
pub async fn spawn_process(
program: &str,
args: &[String],
@@ -250,7 +261,13 @@ pub async fn spawn_process(
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await
let spawned =
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await?;
Ok(SpawnedProcess {
session: spawned.session,
output_rx: spawned.output_rx,
exit_rx: spawned.exit_rx,
})
}
/// Spawn a process using regular pipes, but close stdin immediately.
@@ -261,5 +278,22 @@ pub async fn spawn_process_no_stdin(
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Null).await
let spawned =
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Null).await?;
Ok(SpawnedProcess {
session: spawned.session,
output_rx: spawned.output_rx,
exit_rx: spawned.exit_rx,
})
}
/// Spawn a process using regular pipes (no PTY), preserving split stdout/stderr streams.
pub async fn spawn_process_split(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcessSplit> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped).await
}

View File

@@ -8,7 +8,6 @@ use anyhow::anyhow;
use portable_pty::MasterPty;
use portable_pty::PtySize;
use portable_pty::SlavePty;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::AbortHandle;
@@ -55,6 +54,7 @@ impl fmt::Debug for PtyHandles {
/// Handle for driving an interactive process (PTY or pipe).
pub struct ProcessHandle {
writer_tx: StdMutex<Option<mpsc::Sender<Vec<u8>>>>,
pid: Option<u32>,
killer: StdMutex<Option<Box<dyn ChildTerminator>>>,
reader_handle: StdMutex<Option<JoinHandle<()>>>,
reader_abort_handles: StdMutex<Vec<AbortHandle>>,
@@ -77,6 +77,7 @@ impl ProcessHandle {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
writer_tx: mpsc::Sender<Vec<u8>>,
pid: Option<u32>,
killer: Box<dyn ChildTerminator>,
reader_handle: JoinHandle<()>,
reader_abort_handles: Vec<AbortHandle>,
@@ -88,6 +89,7 @@ impl ProcessHandle {
) -> Self {
Self {
writer_tx: StdMutex::new(Some(writer_tx)),
pid,
killer: StdMutex::new(Some(killer)),
reader_handle: StdMutex::new(Some(reader_handle)),
reader_abort_handles: StdMutex::new(reader_abort_handles),
@@ -112,6 +114,11 @@ impl ProcessHandle {
writer_tx
}
/// Returns the child process ID when available.
pub fn pid(&self) -> Option<u32> {
self.pid
}
/// True if the child process has exited.
pub fn has_exited(&self) -> bool {
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
@@ -171,9 +178,7 @@ impl ProcessHandle {
}
}
if let Ok(mut h) = self.wait_handle.lock() {
if let Some(handle) = h.take() {
handle.abort();
}
let _ = h.take();
}
}
}
@@ -184,45 +189,19 @@ impl Drop for ProcessHandle {
}
}
/// Combine split stdout/stderr receivers into a single broadcast receiver.
pub fn combine_output_receivers(
mut stdout_rx: mpsc::Receiver<Vec<u8>>,
mut stderr_rx: mpsc::Receiver<Vec<u8>>,
) -> broadcast::Receiver<Vec<u8>> {
let (combined_tx, combined_rx) = broadcast::channel(256);
tokio::spawn(async move {
let mut stdout_open = true;
let mut stderr_open = true;
loop {
tokio::select! {
stdout = stdout_rx.recv(), if stdout_open => match stdout {
Some(chunk) => {
let _ = combined_tx.send(chunk);
}
None => {
stdout_open = false;
}
},
stderr = stderr_rx.recv(), if stderr_open => match stderr {
Some(chunk) => {
let _ = combined_tx.send(chunk);
}
None => {
stderr_open = false;
}
},
else => break,
}
}
});
combined_rx
}
/// Return value from PTY or pipe spawn helpers.
#[derive(Debug)]
pub struct SpawnedProcess {
pub session: ProcessHandle,
pub output_rx: mpsc::Receiver<Vec<u8>>,
pub exit_rx: oneshot::Receiver<i32>,
}
/// Return value from split-output spawn helpers.
#[derive(Debug)]
pub struct SpawnedProcessSplit {
pub session: ProcessHandle,
pub output_rx: mpsc::Receiver<Vec<u8>>,
pub stdout_rx: mpsc::Receiver<Vec<u8>>,
pub stderr_rx: mpsc::Receiver<Vec<u8>>,
pub exit_rx: oneshot::Receiver<i32>,

View File

@@ -71,7 +71,7 @@ fn platform_native_pty_system() -> Box<dyn portable_pty::PtySystem + Send> {
}
}
/// Spawn a process attached to a PTY, returning handles for stdin, split output, and exit.
/// Spawn a process attached to a PTY, returning handles for stdin, merged output, and exit.
pub async fn spawn_process(
program: &str,
args: &[String],
@@ -98,6 +98,7 @@ pub async fn spawn_process(
}
let mut child = pair.slave.spawn_command(command_builder)?;
let pid = child.process_id();
#[cfg(unix)]
// portable-pty establishes the spawned PTY child as a new session leader on
// Unix, so PID == PGID and we can reuse the pipe backend's process-group
@@ -106,8 +107,7 @@ pub async fn spawn_process(
let killer = child.clone_killer();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(128);
let (_stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(1);
let (output_tx, output_rx) = mpsc::channel::<Vec<u8>>(256);
let mut reader = pair.master.try_clone_reader()?;
let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8_192];
@@ -115,7 +115,9 @@ pub async fn spawn_process(
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let _ = stdout_tx.blocking_send(buf[..n].to_vec());
if output_tx.blocking_send(buf[..n].to_vec()).is_err() {
break;
}
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
@@ -169,6 +171,7 @@ pub async fn spawn_process(
let handle = ProcessHandle::new(
writer_tx,
pid,
Box::new(PtyChildTerminator {
killer,
#[cfg(unix)]
@@ -185,8 +188,7 @@ pub async fn spawn_process(
Ok(SpawnedProcess {
session: handle,
stdout_rx,
stderr_rx,
output_rx,
exit_rx,
})
}

View File

@@ -3,9 +3,8 @@ use std::path::Path;
use pretty_assertions::assert_eq;
use crate::combine_output_receivers;
use crate::spawn_pipe_process;
use crate::spawn_pipe_process_no_stdin;
use crate::spawn_pipe_process_split;
use crate::spawn_pty_process;
use crate::SpawnedProcess;
use crate::TerminalSize;
@@ -65,6 +64,21 @@ fn split_stdout_stderr_command() -> String {
}
}
fn combine_spawned_output(
spawned: SpawnedProcess,
) -> (
crate::ProcessHandle,
tokio::sync::mpsc::Receiver<Vec<u8>>,
tokio::sync::oneshot::Receiver<i32>,
) {
let SpawnedProcess {
session,
output_rx,
exit_rx,
} = spawned;
(session, output_rx, exit_rx)
}
async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
let mut collected = Vec::new();
while let Some(chunk) = output_rx.recv().await {
@@ -73,28 +87,8 @@ async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>
collected
}
fn combine_spawned_output(
spawned: SpawnedProcess,
) -> (
crate::ProcessHandle,
tokio::sync::broadcast::Receiver<Vec<u8>>,
tokio::sync::oneshot::Receiver<i32>,
) {
let SpawnedProcess {
session,
stdout_rx,
stderr_rx,
exit_rx,
} = spawned;
(
session,
combine_output_receivers(stdout_rx, stderr_rx),
exit_rx,
)
}
async fn collect_output_until_exit(
mut output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>,
exit_rx: tokio::sync::oneshot::Receiver<i32>,
timeout_ms: u64,
) -> (Vec<u8>, i32) {
@@ -105,7 +99,7 @@ async fn collect_output_until_exit(
loop {
tokio::select! {
res = output_rx.recv() => {
if let Ok(chunk) = res {
if let Some(chunk) = res {
collected.extend_from_slice(&chunk);
}
}
@@ -120,9 +114,8 @@ async fn collect_output_until_exit(
tokio::time::Instant::now() + tokio::time::Duration::from_millis(max_ms);
while tokio::time::Instant::now() < max_deadline {
match tokio::time::timeout(quiet, output_rx.recv()).await {
Ok(Ok(chunk)) => collected.extend_from_slice(&chunk),
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => break,
Ok(Some(chunk)) => collected.extend_from_slice(&chunk),
Ok(None) => break,
Err(_) => break,
}
}
@@ -136,7 +129,7 @@ async fn collect_output_until_exit(
}
async fn wait_for_python_repl_ready(
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
output_rx: &mut tokio::sync::mpsc::Receiver<Vec<u8>>,
timeout_ms: u64,
ready_marker: &str,
) -> anyhow::Result<Vec<u8>> {
@@ -147,14 +140,13 @@ async fn wait_for_python_repl_ready(
let now = tokio::time::Instant::now();
let remaining = deadline.saturating_duration_since(now);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
Ok(Some(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(ready_marker) {
return Ok(collected);
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
Ok(None) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
@@ -187,7 +179,7 @@ fn process_exists(pid: i32) -> anyhow::Result<bool> {
#[cfg(unix)]
async fn wait_for_marker_pid(
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
output_rx: &mut tokio::sync::mpsc::Receiver<Vec<u8>>,
marker: &str,
timeout_ms: u64,
) -> anyhow::Result<i32> {
@@ -205,7 +197,8 @@ async fn wait_for_marker_pid(
let remaining = deadline.saturating_duration_since(now);
let chunk = tokio::time::timeout(remaining, output_rx.recv())
.await
.map_err(|_| anyhow::anyhow!("timeout waiting for PTY output"))??;
.map_err(|_| anyhow::anyhow!("timeout waiting for PTY output"))?
.ok_or_else(|| anyhow::anyhow!("PTY output closed while waiting for marker"))?;
collected.extend_from_slice(&chunk);
let text = String::from_utf8_lossy(&collected);
@@ -337,6 +330,112 @@ async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_preserves_initial_output_for_quick_exit() -> anyhow::Result<()> {
let env_map: HashMap<String, String> = std::env::vars().collect();
let script = if cfg!(windows) {
"echo stdout-line & echo stderr-line 1>&2"
} else {
"printf 'stdout-line\\n'; printf 'stderr-line\\n' >&2"
};
let (program, args) = shell_command(script);
for _ in 0..32 {
let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
let (output, code) =
collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 2_000).await;
let text = String::from_utf8_lossy(&output);
assert_eq!(code, 0, "expected quick pipe process to exit cleanly");
assert!(
text.contains("stdout-line"),
"expected stdout in merged output: {text:?}"
);
assert!(
text.contains("stderr-line"),
"expected stderr in merged output: {text:?}"
);
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_split_preserves_stdout_and_stderr() -> anyhow::Result<()> {
let Some(python) = find_python() else {
eprintln!("python not found; skipping pipe_process_split_preserves_stdout_and_stderr");
return Ok(());
};
let args = vec![
"-u".to_string(),
"-c".to_string(),
"import sys; print('stdout-line'); sys.stderr.write('stderr-line\\n'); sys.stderr.flush()"
.to_string(),
];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pipe_process_split(&python, &args, Path::new("."), &env_map, &None).await?;
let crate::process::SpawnedProcessSplit {
output_rx,
stdout_rx,
stderr_rx,
exit_rx,
..
} = spawned;
let stdout_task = tokio::spawn(async move { collect_split_output(stdout_rx).await });
let stderr_task = tokio::spawn(async move { collect_split_output(stderr_rx).await });
let merged_task =
tokio::spawn(async move { collect_output_until_exit(output_rx, exit_rx, 5_000).await });
let stdout = stdout_task.await?;
let stderr = stderr_task.await?;
let (merged, code) = merged_task.await?;
assert_eq!(code, 0, "expected python -c to exit cleanly");
assert_eq!(
String::from_utf8_lossy(&stdout).replace("\r\n", "\n"),
"stdout-line\n"
);
assert_eq!(
String::from_utf8_lossy(&stderr).replace("\r\n", "\n"),
"stderr-line\n"
);
let merged = String::from_utf8_lossy(&merged);
assert!(merged.contains("stdout-line"));
assert!(merged.contains("stderr-line"));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_split_preserves_initial_merged_output_for_quick_exit() -> anyhow::Result<()> {
let env_map: HashMap<String, String> = std::env::vars().collect();
let script = if cfg!(windows) {
"echo stdout-line & echo stderr-line 1>&2"
} else {
"printf 'stdout-line\\n'; printf 'stderr-line\\n' >&2"
};
let (program, args) = shell_command(script);
for _ in 0..32 {
let spawned =
spawn_pipe_process_split(&program, &args, Path::new("."), &env_map, &None).await?;
let (output, code) =
collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 2_000).await;
let text = String::from_utf8_lossy(&output);
assert_eq!(code, 0, "expected quick split pipe process to exit cleanly");
assert!(
text.contains("stdout-line"),
"expected stdout in merged output: {text:?}"
);
assert!(
text.contains("stderr-line"),
"expected stderr in merged output: {text:?}"
);
}
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> {
@@ -351,8 +450,9 @@ async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> {
let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
let (_session, mut output_rx, exit_rx) = combine_spawned_output(spawned);
let pid_bytes =
tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv()).await??;
let pid_bytes = tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv())
.await?
.ok_or_else(|| anyhow::anyhow!("expected child pid output before pipe exit"))?;
let pid_text = String::from_utf8_lossy(&pid_bytes);
let child_pid: i32 = pid_text
.split_whitespace()
@@ -447,13 +547,15 @@ async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()>
let env_map: HashMap<String, String> = std::env::vars().collect();
let (program, args) = shell_command(&split_stdout_stderr_command());
let spawned =
spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?;
let SpawnedProcess {
session: _session,
spawn_pipe_process_split(&program, &args, Path::new("."), &env_map, &None).await?;
let crate::process::SpawnedProcessSplit {
session,
stdout_rx,
stderr_rx,
exit_rx,
..
} = spawned;
session.close_stdin();
let timeout_ms = if cfg!(windows) { 10_000 } else { 2_000 };
let timeout = tokio::time::Duration::from_millis(timeout_ms);
@@ -497,28 +599,25 @@ async fn pipe_terminate_aborts_detached_readers() -> anyhow::Result<()> {
let env_map: HashMap<String, String> = std::env::vars().collect();
let script =
"setsid sh -c 'i=0; while [ $i -lt 200 ]; do echo tick; sleep 0.01; i=$((i+1)); done' &";
"setsid sh -c 'echo ready; sleep 0.3; i=0; while [ $i -lt 200 ]; do echo tick; sleep 0.01; i=$((i+1)); done' &";
let (program, args) = shell_command(script);
let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
let (session, mut output_rx, _exit_rx) = combine_spawned_output(spawned);
let _ = tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv())
.await
.map_err(|_| anyhow::anyhow!("expected detached output before terminate"))??;
let ready = tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv())
.await?
.ok_or_else(|| anyhow::anyhow!("expected detached output before terminate"))?;
assert_eq!(ready, b"ready\n".to_vec());
session.terminate();
let mut post_rx = output_rx.resubscribe();
let post_terminate =
tokio::time::timeout(tokio::time::Duration::from_millis(200), post_rx.recv()).await;
tokio::time::timeout(tokio::time::Duration::from_millis(200), output_rx.recv()).await;
match post_terminate {
Err(_) => Ok(()),
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => Ok(()),
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => {
anyhow::bail!("unexpected output after terminate (lagged)")
}
Ok(Ok(chunk)) => anyhow::bail!(
Ok(None) => Ok(()),
Ok(Some(chunk)) => anyhow::bail!(
"unexpected output after terminate: {:?}",
String::from_utf8_lossy(&chunk)
),

View File

@@ -19,7 +19,17 @@ js_repl = true
js_repl_tools_only = true
```
When enabled, direct model tool calls are restricted to `js_repl` and `js_repl_reset`; other tools remain available via `await codex.tool(...)` inside js_repl.
When enabled, direct model tool calls are restricted to `js_repl` and `js_repl_reset` (and `js_repl_poll` if polling is enabled). Other tools remain available via `await codex.tool(...)` inside `js_repl`.
`js_repl_polling` can be enabled to allow async/polled execution:
```toml
[features]
js_repl = true
js_repl_polling = true
```
When enabled, `js_repl` accepts `poll=true` in the first-line pragma and returns both `exec_id` and `session_id`. Reuse polling state by passing `session_id=<id>` in later `js_repl` pragmas. Omit `session_id` to create a new polling session; unknown `session_id` values return an error. Use `js_repl_poll` with `exec_id` until `status` becomes `completed` or `error`.
## Node runtime
@@ -59,6 +69,9 @@ imported local file. They are not resolved relative to the imported file's locat
- `js_repl` is a freeform tool: send raw JavaScript source text.
- Optional first-line pragma:
- `// codex-js-repl: timeout_ms=15000`
- `// codex-js-repl: poll=true`
- `// codex-js-repl: poll=true session_id=my-session`
- Use space-separated pragma arguments.
- Top-level bindings persist across calls.
- If a cell throws, prior bindings remain available, lexical bindings whose initialization completed before the throw stay available in later calls, and hoisted `var` / `function` bindings persist only when execution clearly reached their declaration or a supported write site.
- Supported hoisted-`var` failed-cell cases are direct top-level identifier writes and updates before the declaration (for example `x = 1`, `x += 1`, `x++`, `x &&= 1`) and non-empty top-level `for...in` / `for...of` loops.
@@ -70,6 +83,17 @@ imported local file. They are not resolved relative to the imported file's locat
- Local file modules reload between execs, so a later `await import("./file.js")` picks up edits and fixed failures. Top-level bindings you already created still persist until `js_repl_reset`.
- Use `js_repl_reset` to clear the kernel state.
### Polling flow
1. Submit with `js_repl` and `poll=true` pragma.
2. Read `exec_id` and `session_id` from the JSON response.
3. Call `js_repl_poll` with `{"exec_id":"...","yield_time_ms":5000}`.
4. Repeat until `status` is `completed` or `error`. If a poll returns `status: running`, keep polling the same `exec_id` even if the logs or `final_output` already look complete. Completed polls can also include nested multimodal tool output after the JSON status item.
5. Optional: reuse session state by submitting another polled `js_repl` call with `session_id=<id>` (must already exist). Omit `session_id` to create a new polling session.
6. Reset one session with `js_repl_reset({"session_id":"..."})`, or reset all kernels with `js_repl_reset({})`.
`timeout_ms` is only supported for non-polling `js_repl` executions. With `poll=true`, use `js_repl_poll.yield_time_ms` to control how long each poll waits before returning. If omitted, or set below `5000`, `js_repl_poll` waits up to 5 seconds before returning if nothing new arrives.
## Helper APIs inside the kernel
`js_repl` exposes these globals:
@@ -79,7 +103,7 @@ imported local file. They are not resolved relative to the imported file's locat
- `codex.tmpDir`: per-session scratch directory path.
- `codex.tool(name, args?)`: executes a normal Codex tool call from inside `js_repl` (including shell tools like `shell` / `shell_command` when available).
- `codex.emitImage(imageLike)`: explicitly adds one image to the outer `js_repl` function output each time you call it.
- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells, but async callbacks that fire after a cell finishes still fail because no exec is active.
- `codex.tool(...)` and `codex.emitImage(...)` keep stable helper identities across cells. Saved references and persisted objects can reuse them in later cells. In non-polling mode, async callbacks that fire after a cell finishes still fail because no exec is active. In polling mode, async callbacks and timers scheduled during a polled exec keep that exec active until they settle or are cleared.
- Imported local files run in the same VM context, so they can also access `codex.*`, the captured `console`, and Node-like `import.meta` helpers.
- Each `codex.tool(...)` call emits a bounded summary at `info` level from the `codex_core::tools::js_repl` logger. At `trace` level, the same path also logs the exact raw response object or error string seen by JavaScript.
- Nested `codex.tool(...)` outputs stay inside JavaScript unless you emit them explicitly.