Compare commits

...

1 Commits

Author SHA1 Message Date
Channing Conger
a8e4ae7612 Code Mode => New Crate + v8 2026-03-19 21:42:05 -07:00
29 changed files with 3778 additions and 2241 deletions

19
MODULE.bazel.lock generated

File diff suppressed because one or more lines are too long

247
codex-rs/Cargo.lock generated
View File

@@ -949,6 +949,8 @@ dependencies = [
"cexpr",
"clang-sys",
"itertools 0.13.0",
"log",
"prettyplease",
"proc-macro2",
"quote",
"regex",
@@ -1152,6 +1154,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade8366b8bd5ba243f0a58f036cc0ca8a2f069cff1a2351ef1cac6b083e16fc0"
[[package]]
name = "calendrical_calculations"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a0b39595c6ee54a8d0900204ba4c401d0ab4eb45adaf07178e8d017541529e7"
dependencies = [
"core_maths",
"displaydoc",
]
[[package]]
name = "cassowary"
version = "0.3.0"
@@ -1584,7 +1596,7 @@ dependencies = [
"thiserror 2.0.18",
"tokio",
"url",
"which",
"which 8.0.0",
"wiremock",
"zip",
]
@@ -1788,6 +1800,21 @@ dependencies = [
"thiserror 2.0.18",
]
[[package]]
name = "codex-code-mode"
version = "0.0.0"
dependencies = [
"async-trait",
"libc",
"pretty_assertions",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tracing",
"v8",
]
[[package]]
name = "codex-config"
version = "0.0.0"
@@ -1845,6 +1872,8 @@ dependencies = [
"codex-arg0",
"codex-artifacts",
"codex-async-utils",
"codex-client",
"codex-code-mode",
"codex-config",
"codex-connectors",
"codex-exec-server",
@@ -1930,7 +1959,7 @@ dependencies = [
"url",
"uuid",
"walkdir",
"which",
"which 8.0.0",
"wildmatch",
"windows-sys 0.52.0",
"wiremock",
@@ -2179,7 +2208,7 @@ dependencies = [
"serde_json",
"tokio",
"tracing",
"which",
"which 8.0.0",
"wiremock",
]
@@ -2432,7 +2461,7 @@ dependencies = [
"tracing",
"urlencoding",
"webbrowser",
"which",
"which 8.0.0",
]
[[package]]
@@ -2472,7 +2501,7 @@ dependencies = [
"tree-sitter",
"tree-sitter-bash",
"url",
"which",
"which 8.0.0",
]
[[package]]
@@ -2643,7 +2672,7 @@ dependencies = [
"uuid",
"vt100",
"webbrowser",
"which",
"which 8.0.0",
"windows-sys 0.52.0",
"winsplit",
]
@@ -2735,7 +2764,7 @@ dependencies = [
"uuid",
"vt100",
"webbrowser",
"which",
"which 8.0.0",
"windows-sys 0.52.0",
"winsplit",
]
@@ -3111,6 +3140,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "core_maths"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77745e017f5edba1a9c1d854f6f3a52dac8a12dd5af5d2f54aecf61e43d80d30"
dependencies = [
"libm",
]
[[package]]
name = "core_test_support"
version = "0.0.0"
@@ -3715,6 +3753,38 @@ dependencies = [
"subtle",
]
[[package]]
name = "diplomat"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9adb46b05e2f53dcf6a7dfc242e4ce9eb60c369b6b6eb10826a01e93167f59c6"
dependencies = [
"diplomat_core",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "diplomat-runtime"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0569bd3caaf13829da7ee4e83dbf9197a0e1ecd72772da6d08f0b4c9285c8d29"
[[package]]
name = "diplomat_core"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51731530ed7f2d4495019abc7df3744f53338e69e2863a6a64ae91821c763df1"
dependencies = [
"proc-macro2",
"quote",
"serde",
"smallvec",
"strck",
"syn 2.0.114",
]
[[package]]
name = "dirs"
version = "6.0.0"
@@ -4321,6 +4391,16 @@ dependencies = [
"libc",
]
[[package]]
name = "fslock"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04412b8935272e3a9bae6f48c7bfff74c2911f60525404edfdd28e49884c3bfb"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "futures"
version = "0.3.31"
@@ -4549,6 +4629,15 @@ dependencies = [
"regex-syntax 0.8.8",
]
[[package]]
name = "gzip-header"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95cc527b92e6029a62960ad99aa8a6660faa4555fe5f731aab13aa6a921795a2"
dependencies = [
"crc32fast",
]
[[package]]
name = "h2"
version = "0.4.13"
@@ -5006,6 +5095,28 @@ dependencies = [
"cc",
]
[[package]]
name = "icu_calendar"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6f0e52e009b6b16ba9c0693578796f2dd4aaa59a7f8f920423706714a89ac4e"
dependencies = [
"calendrical_calculations",
"displaydoc",
"icu_calendar_data",
"icu_locale",
"icu_locale_core",
"icu_provider",
"tinystr",
"zerovec",
]
[[package]]
name = "icu_calendar_data"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527f04223b17edfe0bd43baf14a0cb1b017830db65f3950dc00224860a9a446d"
[[package]]
name = "icu_collections"
version = "2.1.1"
@@ -5440,6 +5551,12 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
[[package]]
name = "ixdtf"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84de9d95a6d2547d9b77ee3f25fa0ee32e3c3a6484d47a55adebc0439c077992"
[[package]]
name = "jiff"
version = "0.2.18"
@@ -7165,6 +7282,16 @@ dependencies = [
"yansi",
]
[[package]]
name = "prettyplease"
version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
dependencies = [
"proc-macro2",
"syn 2.0.114",
]
[[package]]
name = "proc-macro-crate"
version = "3.4.0"
@@ -7998,6 +8125,16 @@ dependencies = [
"webpki-roots 1.0.5",
]
[[package]]
name = "resb"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a067ab3b5ca3b4dc307d0de9cf75f9f5e6ca9717b192b2f28a36c83e5de9e76"
dependencies = [
"potential_utf",
"serde_core",
]
[[package]]
name = "resolv-conf"
version = "0.7.6"
@@ -9378,6 +9515,15 @@ dependencies = [
"serde_json",
]
[[package]]
name = "strck"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42316e70da376f3d113a68d138a60d8a9883c604fe97942721ec2068dab13a9f"
dependencies = [
"unicode-ident",
]
[[package]]
name = "streaming-iterator"
version = "0.1.9"
@@ -9622,6 +9768,39 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "temporal_capi"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a151e402c2bdb6a3a2a2f3f225eddaead2e7ce7dd5d3fa2090deb11b17aa4ed8"
dependencies = [
"diplomat",
"diplomat-runtime",
"icu_calendar",
"icu_locale",
"num-traits",
"temporal_rs",
"timezone_provider",
"writeable",
"zoneinfo64",
]
[[package]]
name = "temporal_rs"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88afde3bd75d2fc68d77a914bece426aa08aa7649ffd0cdd4a11c3d4d33474d1"
dependencies = [
"core_maths",
"icu_calendar",
"icu_locale",
"ixdtf",
"num-traits",
"timezone_provider",
"tinystr",
"writeable",
]
[[package]]
name = "term"
version = "0.7.0"
@@ -9829,6 +10008,18 @@ dependencies = [
"time-core",
]
[[package]]
name = "timezone_provider"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df9ba0000e9e73862f3e7ca1ff159e2ddf915c9d8bb11e38a7874760f445d993"
dependencies = [
"tinystr",
"zerotrie",
"zerovec",
"zoneinfo64",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
@@ -10628,6 +10819,23 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "v8"
version = "146.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d97bcac5cdc5a195a4813f1855a6bc658f240452aac36caa12fd6c6f16026ab1"
dependencies = [
"bindgen",
"bitflags 2.10.0",
"fslock",
"gzip-header",
"home",
"miniz_oxide",
"paste",
"temporal_capi",
"which 6.0.3",
]
[[package]]
name = "valuable"
version = "0.1.1"
@@ -10927,6 +11135,18 @@ version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88"
[[package]]
name = "which"
version = "6.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ee928febd44d98f2f459a4a79bd4d928591333a494a10a868418ac1b39cf1f"
dependencies = [
"either",
"home",
"rustix 0.38.44",
"winsafe",
]
[[package]]
name = "which"
version = "8.0.0"
@@ -11930,6 +12150,19 @@ version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445"
[[package]]
name = "zoneinfo64"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb2e5597efbe7c421da8a7fd396b20b571704e787c21a272eecf35dfe9d386f0"
dependencies = [
"calendrical_calculations",
"icu_locale_core",
"potential_utf",
"resb",
"serde",
]
[[package]]
name = "zopfli"
version = "0.8.3"

View File

@@ -13,6 +13,7 @@ members = [
"feedback",
"features",
"codex-backend-openapi-models",
"code-mode",
"cloud-requirements",
"cloud-tasks",
"cloud-tasks-client",
@@ -90,6 +91,7 @@ app_test_support = { path = "app-server/tests/common" }
codex-ansi-escape = { path = "ansi-escape" }
codex-api = { path = "codex-api" }
codex-artifacts = { path = "artifacts" }
codex-code-mode = { path = "code-mode" }
codex-package-manager = { path = "package-manager" }
codex-app-server = { path = "app-server" }
codex-app-server-client = { path = "app-server-client" }
@@ -314,6 +316,7 @@ unicode-width = "0.2"
url = "2"
urlencoding = "2.1"
uuid = "1"
v8 = "=146.4.0"
vt100 = "0.16.2"
walkdir = "2.5.0"
webbrowser = "1.0"

View File

@@ -0,0 +1,30 @@
[package]
edition.workspace = true
license.workspace = true
name = "codex-code-mode"
version.workspace = true
[lib]
doctest = false
name = "codex_code_mode"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
async-trait = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
v8 = { workspace = true }
[dev-dependencies]
libc = { workspace = true }
pretty_assertions = { workspace = true }
[[bench]]
name = "exec_overhead"
harness = false

View File

@@ -0,0 +1,550 @@
use std::collections::HashMap;
use std::env;
use std::hint::black_box;
use std::process::Command;
use std::process::ExitCode;
use std::time::Instant;
use codex_code_mode::CodeModeService;
use codex_code_mode::CodeModeToolKind;
use codex_code_mode::ExecuteRequest;
use codex_code_mode::RuntimeResponse;
use codex_code_mode::ToolDefinition;
use serde::Deserialize;
use serde::Serialize;
/// Worker processes sampled per (scenario, tool count) pair unless `--samples` overrides it.
const DEFAULT_SAMPLES: usize = 8;
/// Timed execs per `warm_exec` worker unless `--warm-iterations` overrides it.
const DEFAULT_WARM_ITERATIONS: usize = 25;
/// Untimed execs the parent requests before measurement in the `warm_exec` scenario.
const DEFAULT_WARMUPS: usize = 1;
/// Tool counts benchmarked unless `--tool-counts` overrides them.
const DEFAULT_TOOL_COUNTS: &[usize] = &[0, 32, 128];
/// Script submitted on every benchmark exec; the result is expected to hold exactly one content item.
const BENCH_SOURCE: &str = r#"text("bench");"#;
/// Process entry point.
///
/// Delegates to `try_main`; on failure the error text is written to stderr
/// and the process exits with a failure status.
fn main() -> ExitCode {
    if let Err(message) = try_main() {
        eprintln!("{message}");
        return ExitCode::FAILURE;
    }
    ExitCode::SUCCESS
}
fn try_main() -> Result<(), String> {
let args = Args::parse(env::args().skip(1))?;
if let Some(worker) = args.worker {
let report = run_worker(worker)?;
println!(
"{}",
serde_json::to_string(&report)
.map_err(|err| format!("failed to serialize benchmark report: {err}"))?
);
return Ok(());
}
let config = args.parent.unwrap_or_default();
run_parent(config)
}
/// Configuration for the parent (driver) process that spawns worker samples.
#[derive(Clone, Debug)]
struct ParentArgs {
/// Number of worker processes to run per (scenario, tool count) pair.
samples: usize,
/// Number of timed execs requested from each `warm_exec` worker.
warm_iterations: usize,
/// Tool counts to benchmark; every scenario is run once per entry.
tool_counts: Vec<usize>,
}
impl Default for ParentArgs {
fn default() -> Self {
Self {
samples: DEFAULT_SAMPLES,
warm_iterations: DEFAULT_WARM_ITERATIONS,
tool_counts: DEFAULT_TOOL_COUNTS.to_vec(),
}
}
}
/// Arguments for one worker process, i.e. one benchmark sample.
#[derive(Clone, Debug)]
struct WorkerArgs {
/// Scenario label recorded in the worker's report.
scenario: Scenario,
/// Number of synthetic tools enabled for every exec.
tool_count: usize,
/// Number of timed execs.
iterations: usize,
/// Number of untimed execs run before measurement starts.
warmups: usize,
}
/// Parsed command line. `Args::parse` fills exactly one of the two fields:
/// `worker` when `--worker` was given, `parent` otherwise.
#[derive(Clone, Debug)]
struct Args {
/// Driver configuration (parent mode).
parent: Option<ParentArgs>,
/// Single-sample configuration (worker mode).
worker: Option<WorkerArgs>,
}
impl Args {
    /// Parses benchmark flags (program name already removed) into either
    /// parent-mode or worker-mode arguments.
    ///
    /// Parent flags: `--samples`, `--warm-iterations`, `--tool-counts`.
    /// Worker flags: `--worker`, `--scenario`, `--tool-count`, `--iterations`,
    /// `--warmups`; all four value flags are mandatory once `--worker` is set.
    /// `--bench` is accepted and ignored. `--help`/`-h` prints usage and exits
    /// the process with status 0.
    ///
    /// # Errors
    /// Returns a message for unknown flags, missing or unparsable values,
    /// missing worker flags, and zero-valued counts (`--samples`,
    /// `--warm-iterations`, `--iterations`) or an empty `--tool-counts` list.
    fn parse<I>(args: I) -> Result<Self, String>
    where
        I: IntoIterator<Item = String>,
    {
        let mut parent = ParentArgs::default();
        let mut worker_mode = false;
        let mut scenario = None;
        let mut tool_count = None;
        let mut iterations = None;
        let mut warmups = None;
        let mut args = args.into_iter();
        // `while let` instead of `for`: value-taking flags pull their operand
        // from the same iterator inside the loop body.
        while let Some(arg) = args.next() {
            match arg.as_str() {
                // Accepted and ignored (presumably forwarded by `cargo bench`).
                "--bench" => {}
                "--worker" => worker_mode = true,
                "--samples" => parent.samples = parse_usize_flag("--samples", args.next())?,
                "--warm-iterations" => {
                    parent.warm_iterations = parse_usize_flag("--warm-iterations", args.next())?;
                }
                "--tool-counts" => {
                    parent.tool_counts = parse_tool_counts(args.next())?;
                }
                "--scenario" => {
                    let value = args
                        .next()
                        .ok_or_else(|| "missing value for --scenario".to_string())?;
                    scenario = Some(Scenario::parse(&value)?);
                }
                "--tool-count" => {
                    tool_count = Some(parse_usize_flag("--tool-count", args.next())?);
                }
                "--iterations" => {
                    iterations = Some(parse_usize_flag("--iterations", args.next())?);
                }
                "--warmups" => {
                    warmups = Some(parse_usize_flag("--warmups", args.next())?);
                }
                "--help" | "-h" => {
                    print_help();
                    std::process::exit(0);
                }
                _ => return Err(format!("unknown argument: {arg}")),
            }
        }

        if worker_mode {
            let scenario =
                scenario.ok_or_else(|| "missing --scenario in worker mode".to_string())?;
            let tool_count =
                tool_count.ok_or_else(|| "missing --tool-count in worker mode".to_string())?;
            let iterations =
                iterations.ok_or_else(|| "missing --iterations in worker mode".to_string())?;
            let warmups = warmups.ok_or_else(|| "missing --warmups in worker mode".to_string())?;
            // The worker averages over `iterations`; zero would divide by zero
            // there, so reject it here with the same wording as the parent checks.
            if iterations == 0 {
                return Err("--iterations must be greater than 0".to_string());
            }
            return Ok(Self {
                parent: None,
                worker: Some(WorkerArgs {
                    scenario,
                    tool_count,
                    iterations,
                    warmups,
                }),
            });
        }

        if parent.samples == 0 {
            return Err("--samples must be greater than 0".to_string());
        }
        if parent.warm_iterations == 0 {
            return Err("--warm-iterations must be greater than 0".to_string());
        }
        if parent.tool_counts.is_empty() {
            return Err("--tool-counts must include at least one count".to_string());
        }
        Ok(Self {
            parent: Some(parent),
            worker: None,
        })
    }
}
/// Writes the usage text for the benchmark binary to stdout.
fn print_help() {
    const HELP_TEXT: &str = concat!(
        "exec_overhead benchmark\n",
        "\n",
        "Usage:\n",
        "cargo bench -p codex-code-mode --bench exec_overhead -- [--samples N] [--warm-iterations N] [--tool-counts 0,32,128]\n",
        "\n",
        "The benchmark runs two scenarios for each tool count:\n",
        "- cold_exec: one fresh exec in a fresh process\n",
        "- warm_exec: repeated execs after one warmup exec in a fresh process\n",
        "\n",
        "Memory is reported as a fresh-process max RSS delta for each scenario.\n",
    );
    println!("{HELP_TEXT}");
}
/// Parses the operand of a numeric flag.
///
/// `flag` is only used to label error messages; `value` is the operand that
/// followed the flag on the command line, if any.
///
/// # Errors
/// Returns a message naming `flag` when the operand is absent or is not a
/// valid `usize`.
fn parse_usize_flag(flag: &str, value: Option<String>) -> Result<usize, String> {
    match value {
        None => Err(format!("missing value for {flag}")),
        Some(raw) => raw
            .parse::<usize>()
            .map_err(|err| format!("invalid value for {flag}: {err}")),
    }
}
/// Parses the comma-separated operand of `--tool-counts`.
///
/// Entries are trimmed and empty entries are skipped, so `"1, ,2"` yields
/// `[1, 2]`. Order and duplicates are preserved.
///
/// # Errors
/// Returns a message when the operand is absent, when an entry is not a
/// valid `usize` (the first such entry is reported), or when no entries
/// remain after skipping empty ones.
fn parse_tool_counts(value: Option<String>) -> Result<Vec<usize>, String> {
    let Some(raw) = value else {
        return Err("missing value for --tool-counts".to_string());
    };

    let counts = raw
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .map(|entry| {
            entry
                .parse::<usize>()
                .map_err(|err| format!("invalid tool count `{entry}`: {err}"))
        })
        .collect::<Result<Vec<_>, _>>()?;

    if counts.is_empty() {
        Err("--tool-counts must include at least one count".to_string())
    } else {
        Ok(counts)
    }
}
/// Benchmark scenario a worker process is asked to run.
#[derive(Clone, Copy, Debug)]
enum Scenario {
    /// Labelled `cold_exec` on the command line and in reports.
    ColdExec,
    /// Labelled `warm_exec` on the command line and in reports.
    WarmExec,
}

impl Scenario {
    /// Every scenario, in the order the parent driver runs them.
    fn all() -> [Self; 2] {
        [Scenario::ColdExec, Scenario::WarmExec]
    }

    /// Resolves a command-line label (as produced by [`Scenario::as_str`]).
    ///
    /// # Errors
    /// Returns a message quoting `value` when it matches no scenario label.
    fn parse(value: &str) -> Result<Self, String> {
        Self::all()
            .into_iter()
            .find(|candidate| candidate.as_str() == value)
            .ok_or_else(|| format!("unknown scenario `{value}`"))
    }

    /// Stable label used on the worker command line and in reports.
    fn as_str(self) -> &'static str {
        match self {
            Scenario::ColdExec => "cold_exec",
            Scenario::WarmExec => "warm_exec",
        }
    }
}
/// Measurements from one worker process; serialized to JSON on the worker's
/// stdout and deserialized by the parent.
#[derive(Debug, Serialize, Deserialize)]
struct WorkerReport {
/// Scenario label (`Scenario::as_str`).
scenario: String,
/// Number of tools enabled for each exec.
tool_count: usize,
/// Number of timed execs.
iterations: usize,
/// Number of untimed warmup execs.
warmups: usize,
/// `total_exec_nanos` divided by `iterations` (integer division).
mean_exec_nanos: u128,
/// Fastest timed exec, in nanoseconds.
min_exec_nanos: u128,
/// Slowest timed exec, in nanoseconds.
max_exec_nanos: u128,
/// Sum of all timed exec durations, in nanoseconds.
total_exec_nanos: u128,
/// Growth of the process max RSS between the end of warmup and the end of the timed loop, in bytes.
max_rss_delta_bytes: u64,
}
/// One printed row: all samples for a (scenario, tool count) pair, aggregated.
#[derive(Debug)]
struct SummaryRow {
/// Scenario the samples belong to.
scenario: Scenario,
/// Tool count the samples were run with.
tool_count: usize,
/// Number of worker reports aggregated into this row.
samples: usize,
/// Timed execs per worker, taken from the first report.
iterations: usize,
/// Warmup execs per worker, taken from the first report.
warmups: usize,
/// Mean of the per-worker mean exec times, in nanoseconds.
mean_exec_nanos: u128,
/// 95th percentile of the per-worker mean exec times, in nanoseconds.
p95_exec_nanos: u128,
/// Median of the per-worker max-RSS deltas, in bytes.
median_rss_delta_bytes: u64,
/// Largest per-worker max-RSS delta, in bytes.
max_rss_delta_bytes: u64,
}
/// Drives the whole benchmark: for every tool count and scenario, spawns
/// `config.samples` worker processes of this same binary, aggregates their
/// reports, and prints the summary table.
///
/// `cold_exec` workers run one timed exec with no warmup; `warm_exec`
/// workers run `config.warm_iterations` timed execs after `DEFAULT_WARMUPS`
/// warmups.
///
/// # Errors
/// Fails if the current executable cannot be located, or on the first
/// worker or aggregation failure.
fn run_parent(config: ParentArgs) -> Result<(), String> {
    let exe = env::current_exe().map_err(|err| format!("failed to locate bench binary: {err}"))?;
    let mut rows = Vec::new();

    for &tool_count in &config.tool_counts {
        for scenario in Scenario::all() {
            let (iterations, warmups) = match scenario {
                Scenario::ColdExec => (1, 0),
                Scenario::WarmExec => (config.warm_iterations, DEFAULT_WARMUPS),
            };
            // Collecting into `Result` stops at the first failed sample.
            let reports = (0..config.samples)
                .map(|_| {
                    let worker_args = WorkerArgs {
                        scenario,
                        tool_count,
                        iterations,
                        warmups,
                    };
                    run_sample(&exe, worker_args)
                })
                .collect::<Result<Vec<_>, _>>()?;
            rows.push(summarize_reports(scenario, tool_count, reports)?);
        }
    }

    print_summary(&config, &rows);
    Ok(())
}
/// Runs one worker process and decodes the JSON report it prints on stdout.
///
/// `exe` is the benchmark binary to re-invoke with `--worker` and the
/// settings from `args`.
///
/// # Errors
/// Fails when the process cannot be spawned, exits unsuccessfully (the
/// trimmed stderr is included in the message), or prints output that is not
/// a valid report.
fn run_sample(exe: &std::path::Path, args: WorkerArgs) -> Result<WorkerReport, String> {
    let scenario_name = args.scenario.as_str();
    let tool_count = args.tool_count.to_string();
    let iterations = args.iterations.to_string();
    let warmups = args.warmups.to_string();

    let mut command = Command::new(exe);
    command.args([
        "--worker",
        "--scenario",
        scenario_name,
        "--tool-count",
        tool_count.as_str(),
        "--iterations",
        iterations.as_str(),
        "--warmups",
        warmups.as_str(),
    ]);
    let output = command
        .output()
        .map_err(|err| format!("failed to run benchmark worker: {err}"))?;

    if output.status.success() {
        return serde_json::from_slice(&output.stdout)
            .map_err(|err| format!("failed to parse benchmark worker output: {err}"));
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    Err(format!(
        "benchmark worker failed for scenario {} / tools {}: {}",
        scenario_name,
        args.tool_count,
        stderr.trim()
    ))
}
/// Aggregates the worker reports for one (scenario, tool count) pair.
///
/// Exec-time statistics are computed over each worker's *mean* exec time;
/// RSS statistics over each worker's max-RSS delta. `iterations` and
/// `warmups` are copied from the first report.
///
/// # Errors
/// Fails when `reports` is empty.
fn summarize_reports(
    scenario: Scenario,
    tool_count: usize,
    reports: Vec<WorkerReport>,
) -> Result<SummaryRow, String> {
    let Some(first) = reports.first() else {
        return Err("no benchmark reports collected".to_string());
    };
    let (iterations, warmups) = (first.iterations, first.warmups);
    let sample_count = reports.len();

    // Both series are sorted ascending so the percentile helpers can index them.
    let mut exec_times: Vec<u128> = reports.iter().map(|r| r.mean_exec_nanos).collect();
    exec_times.sort_unstable();
    let mut rss_deltas: Vec<u64> = reports.iter().map(|r| r.max_rss_delta_bytes).collect();
    rss_deltas.sort_unstable();

    let exec_total: u128 = exec_times.iter().sum();

    Ok(SummaryRow {
        scenario,
        tool_count,
        samples: sample_count,
        iterations,
        warmups,
        mean_exec_nanos: exec_total / u128::from(sample_count as u64),
        p95_exec_nanos: percentile_u128(&exec_times, 95),
        median_rss_delta_bytes: percentile(&rss_deltas, 50),
        // Sorted ascending, so the last element is the maximum.
        max_rss_delta_bytes: rss_deltas.last().copied().unwrap_or(0),
    })
}
/// Returns the value at the given percentile of an ascending-sorted slice.
///
/// The index is `floor((len - 1) * percentile / 100)`, so no interpolation
/// is performed. `percentile` values above 100 are clamped to 100 (the last
/// element) instead of indexing past the end of the slice.
///
/// # Panics
/// Panics if `values` is empty.
fn percentile(values: &[u64], percentile: usize) -> u64 {
    debug_assert!(!values.is_empty());
    let last_index = values.len().saturating_sub(1);
    // Clamp so an out-of-range percentile cannot produce an out-of-bounds index.
    let index = (last_index * percentile.min(100)) / 100;
    values[index]
}
/// Returns the value at the given percentile of an ascending-sorted slice
/// of `u128` values.
///
/// The index is `floor((len - 1) * percentile / 100)`, so no interpolation
/// is performed. `percentile` values above 100 are clamped to 100 (the last
/// element) instead of indexing past the end of the slice.
///
/// # Panics
/// Panics if `values` is empty.
fn percentile_u128(values: &[u128], percentile: usize) -> u128 {
    debug_assert!(!values.is_empty());
    let last_index = values.len().saturating_sub(1);
    // Clamp so an out-of-range percentile cannot produce an out-of-bounds index.
    let index = (last_index * percentile.min(100)) / 100;
    values[index]
}
fn print_summary(config: &ParentArgs, rows: &[SummaryRow]) {
println!(
"exec_overhead: samples={}, warm_iterations={}, tool_counts={:?}",
config.samples, config.warm_iterations, config.tool_counts
);
println!(
"{:<12} {:>7} {:>7} {:>10} {:>10} {:>14} {:>14} {:>14} {:>14}",
"scenario",
"tools",
"samples",
"warmups",
"iters",
"mean/exec",
"p95/exec",
"rssΔ p50",
"rssΔ max"
);
for row in rows {
println!(
"{:<12} {:>7} {:>7} {:>10} {:>10} {:>14} {:>14} {:>14} {:>14}",
row.scenario.as_str(),
row.tool_count,
row.samples,
row.warmups,
row.iterations,
format_duration_nanos(row.mean_exec_nanos),
format_duration_nanos(row.p95_exec_nanos),
format_bytes(row.median_rss_delta_bytes),
format_bytes(row.max_rss_delta_bytes),
);
}
println!("memory uses a fresh-process max RSS delta for each scenario");
}
/// Renders a nanosecond count with the largest unit (`s`, `ms`, `us`) whose
/// threshold it reaches, using two decimals; values below 1000 are printed
/// as whole nanoseconds.
fn format_duration_nanos(nanos: u128) -> String {
    match nanos {
        n if n >= 1_000_000_000 => format!("{:.2}s", n as f64 / 1_000_000_000.0),
        n if n >= 1_000_000 => format!("{:.2}ms", n as f64 / 1_000_000.0),
        n if n >= 1_000 => format!("{:.2}us", n as f64 / 1_000.0),
        n => format!("{n}ns"),
    }
}
/// Renders a byte count with the largest binary unit (`GiB`, `MiB`, `KiB`)
/// whose threshold it reaches, using two decimals; values below 1024 are
/// printed as whole bytes.
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * KIB;
    const GIB: u64 = 1024 * MIB;
    // Largest unit first so the first match wins.
    const UNITS: [(u64, &str); 3] = [(GIB, "GiB"), (MIB, "MiB"), (KIB, "KiB")];

    for (unit_size, suffix) in UNITS {
        if bytes >= unit_size {
            return format!("{:.2}{suffix}", bytes as f64 / unit_size as f64);
        }
    }
    format!("{bytes}B")
}
/// Runs one benchmark sample inside the current process and returns its
/// measurements.
///
/// A single `CodeModeService` is created, `args.warmups` untimed execs are
/// run, the process max RSS is sampled as a baseline, and then
/// `args.iterations` execs are timed individually. The RSS delta is the
/// growth of the process max RSS over the timed loop.
///
/// # Errors
/// Fails when `args.iterations` is zero (the mean would be undefined), when
/// the tokio runtime cannot be built, when any exec fails or returns an
/// unexpected response, or when resource usage cannot be read.
fn run_worker(args: WorkerArgs) -> Result<WorkerReport, String> {
    // Guard the division below; without it a zero iteration count panics.
    if args.iterations == 0 {
        return Err("--iterations must be greater than 0".to_string());
    }

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|err| format!("failed to create tokio runtime: {err}"))?;

    runtime.block_on(async move {
        let service = CodeModeService::new();
        let tools = benchmark_tools(args.tool_count);

        for warmup_index in 0..args.warmups {
            execute_benchmark_script(&service, &tools, warmup_index).await?;
        }

        // Baseline is taken after warmup so only the timed loop's growth is reported.
        let baseline_rss = current_max_rss_bytes()?;
        let mut total_exec_nanos = 0_u128;
        let mut min_exec_nanos = u128::MAX;
        let mut max_exec_nanos = 0_u128;

        for iteration in 0..args.iterations {
            let started_at = Instant::now();
            // Exec indices continue after the warmups so every call id is unique.
            let response =
                execute_benchmark_script(&service, &tools, args.warmups + iteration).await?;
            let elapsed_nanos = started_at.elapsed().as_nanos();
            total_exec_nanos += elapsed_nanos;
            min_exec_nanos = min_exec_nanos.min(elapsed_nanos);
            max_exec_nanos = max_exec_nanos.max(elapsed_nanos);
            // Keep the optimizer from discarding the exec result.
            black_box(response);
        }

        let mean_exec_nanos = total_exec_nanos / u128::from(args.iterations as u64);
        let max_rss_delta_bytes = current_max_rss_bytes()?.saturating_sub(baseline_rss);

        Ok(WorkerReport {
            scenario: args.scenario.as_str().to_string(),
            tool_count: args.tool_count,
            iterations: args.iterations,
            warmups: args.warmups,
            mean_exec_nanos,
            min_exec_nanos,
            max_exec_nanos,
            total_exec_nanos,
            max_rss_delta_bytes,
        })
    })
}
/// Executes `BENCH_SOURCE` once through `service` with the given tools
/// enabled and checks that the run completed as expected.
///
/// `exec_index` only makes the tool call id unique (`bench_call_<index>`).
///
/// # Errors
/// Fails when the service returns an error, when the response is not a
/// `RuntimeResponse::Result`, when the result carries an error text, or when
/// it does not contain exactly one content item.
async fn execute_benchmark_script(
    service: &CodeModeService,
    tools: &[ToolDefinition],
    exec_index: usize,
) -> Result<RuntimeResponse, String> {
    let request = ExecuteRequest {
        tool_call_id: format!("bench_call_{exec_index}"),
        enabled_tools: tools.to_vec(),
        source: BENCH_SOURCE.to_string(),
        stored_values: HashMap::new(),
        yield_time_ms: None,
        max_output_tokens: None,
    };
    let response = service.execute(request).await?;

    let RuntimeResponse::Result {
        error_text,
        content_items,
        ..
    } = &response
    else {
        return Err(format!(
            "benchmark exec produced unexpected response: {response:?}"
        ));
    };

    if error_text.is_some() {
        return Err(format!(
            "benchmark exec failed unexpectedly: {error_text:?}"
        ));
    }
    let item_count = content_items.len();
    if item_count != 1 {
        return Err(format!(
            "benchmark exec produced unexpected content item count: {item_count}"
        ));
    }

    Ok(response)
}
/// Builds `tool_count` synthetic function tools named `bench_tool_0000`,
/// `bench_tool_0001`, … with no input or output schema.
fn benchmark_tools(tool_count: usize) -> Vec<ToolDefinition> {
    let mut tools = Vec::with_capacity(tool_count);
    for tool_index in 0..tool_count {
        tools.push(ToolDefinition {
            name: format!("bench_tool_{tool_index:04}"),
            description: format!("Benchmark tool {tool_index}"),
            kind: CodeModeToolKind::Function,
            input_schema: None,
            output_schema: None,
        });
    }
    tools
}
/// Returns the process's maximum resident set size so far, in bytes, as
/// reported by `getrusage(RUSAGE_SELF)`.
///
/// On macOS and iOS `ru_maxrss` is returned unchanged; on every other
/// target it is multiplied by 1024 (saturating), i.e. treated as kibibytes.
///
/// # Errors
/// Returns the OS error text when `getrusage` fails.
fn current_max_rss_bytes() -> Result<u64, String> {
    let mut usage = std::mem::MaybeUninit::<libc::rusage>::uninit();
    // SAFETY: `usage.as_mut_ptr()` points to writable storage large enough
    // for one `rusage`, which is all `getrusage` writes to.
    let status = unsafe { libc::getrusage(libc::RUSAGE_SELF, usage.as_mut_ptr()) };
    if status != 0 {
        return Err(std::io::Error::last_os_error().to_string());
    }
    // SAFETY: `getrusage` returned 0, so it has filled in the struct.
    let usage = unsafe { usage.assume_init() };
    let max_rss = usage.ru_maxrss as u64;

    let bytes = if cfg!(any(target_os = "macos", target_os = "ios")) {
        max_rss
    } else {
        max_rss.saturating_mul(1024)
    };
    Ok(bytes)
}

View File

@@ -0,0 +1,630 @@
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use crate::PUBLIC_TOOL_NAME;
/// 2^53 - 1; upper bound enforced on the numeric `// @exec:` pragma fields.
const MAX_JS_SAFE_INTEGER: u64 = (1_u64 << 53) - 1;
/// Preface placed before the `exec` description when code-mode-only is requested.
const CODE_MODE_ONLY_PREFACE: &str =
"Use `exec/wait` tool to run all other tools, do not attempt to use any other tools directly";
/// Description text for the `exec` tool, returned by `build_exec_tool_description`.
/// This is runtime text; edit the wording deliberately.
const EXEC_DESCRIPTION_TEMPLATE: &str = r#"## exec
- Runs raw JavaScript in an isolated context (no Node, no file system, or network access, no console).
- Send raw JavaScript source text, not JSON, quoted strings, or markdown code fences.
- You may optionally start the tool input with a first-line pragma like `// @exec: {"yield_time_ms": 10000, "max_output_tokens": 1000}`.
- `yield_time_ms` asks `exec` to yield early after that many milliseconds if the script is still running.
- `max_output_tokens` sets the token budget for direct `exec` results. By default the result is truncated to 10000 tokens.
- All nested tools are available on the global `tools` object, for example `await tools.exec_command(...)`. Tool names are exposed as normalized JavaScript identifiers, for example `await tools.mcp__ologs__get_profile(...)`.
- Tool methods take either string or object as parameter.
- They return either a structured value or a string based on the description above.
- Global helpers:
- `exit()`: Immediately ends the current script successfully (like an early return from the top level).
- `text(value: string | number | boolean | undefined | null)`: Appends a text item and returns it. Non-string values are stringified with `JSON.stringify(...)` when possible.
- `image(imageUrlOrItem: string | { image_url: string; detail?: "auto" | "low" | "high" | "original" | null })`: Appends an image item and returns it. `image_url` can be an HTTPS URL or a base64-encoded `data:` URL.
- `store(key: string, value: any)`: stores a serializable value under a string key for later `exec` calls in the same session.
- `load(key: string)`: returns the stored value for a string key, or `undefined` if it is missing.
- `notify(value: string | number | boolean | undefined | null)`: immediately injects an extra `custom_tool_call_output` for the current `exec` call. Values are stringified like `text(...)`.
- `ALL_TOOLS`: metadata for the enabled nested tools as `{ name, description }` entries.
- `yield_control()`: yields the accumulated output to the model immediately while the script keeps running."#;
/// Description text for the `wait` tool, returned verbatim by `build_wait_tool_description`.
/// This is runtime text; edit the wording deliberately.
const WAIT_DESCRIPTION_TEMPLATE: &str = r#"- Use `wait` only after `exec` returns `Script running with cell ID ...`.
- `cell_id` identifies the running `exec` cell to resume.
- `yield_time_ms` controls how long to wait for more output before yielding again. If omitted, `wait` uses its default wait timeout.
- `max_tokens` limits how much new output this wait call returns.
- `terminate: true` stops the running cell instead of waiting for more output.
- `wait` returns only the new output since the last yield, or the final completion or termination result for that cell.
- If the cell is still running, `wait` may yield again with the same `cell_id`.
- If the cell has already finished, `wait` returns the completed result and closes the cell."#;
/// Prefix that marks the first line of an `exec` input as a pragma; the rest
/// of that line is parsed as a JSON object by `parse_exec_source`.
pub const CODE_MODE_PRAGMA_PREFIX: &str = "// @exec:";
/// Kind of a nested tool. Serialized in snake_case (`function` / `freeform`).
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CodeModeToolKind {
/// Tool whose sample declaration names its parameter `args`.
Function,
/// Tool whose sample declaration names its parameter `input`.
Freeform,
}
/// Read-only view of a tool definition, used to build code-mode metadata and
/// sample declarations.
pub trait CodeModeToolDefinition {
/// Tool name as registered (not yet normalized to a JavaScript identifier).
fn name(&self) -> &str;
/// Human-readable tool description.
fn description(&self) -> &str;
/// Whether the tool is a function or freeform tool.
fn kind(&self) -> CodeModeToolKind;
/// JSON schema of the tool input, if one is available.
fn input_schema(&self) -> Option<&JsonValue>;
/// JSON schema of the tool output, if one is available.
fn output_schema(&self) -> Option<&JsonValue>;
}
/// Location of a tool as computed by `tool_reference`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ToolReference {
/// `tools/mcp/<server>.js` for qualified tool names, `tools.js` otherwise.
pub module_path: String,
/// `["mcp", <server>]` for qualified tool names, empty otherwise.
pub namespace: Vec<String>,
/// Tool key within the namespace; the full tool name when unqualified.
pub tool_key: String,
}
/// Owned tool definition; the concrete implementor of `CodeModeToolDefinition`
/// in this module.
#[derive(Clone, Debug, PartialEq)]
pub struct ToolDefinition {
/// Tool name as registered.
pub name: String,
/// Human-readable tool description.
pub description: String,
/// Whether the tool is a function or freeform tool.
pub kind: CodeModeToolKind,
/// JSON schema of the tool input, if any.
pub input_schema: Option<JsonValue>,
/// JSON schema of the tool output, if any.
pub output_schema: Option<JsonValue>,
}
// Plain field accessors: each trait method borrows or copies the matching field.
impl CodeModeToolDefinition for ToolDefinition {
fn name(&self) -> &str {
&self.name
}
fn description(&self) -> &str {
&self.description
}
fn kind(&self) -> CodeModeToolKind {
self.kind
}
fn input_schema(&self) -> Option<&JsonValue> {
self.input_schema.as_ref()
}
fn output_schema(&self) -> Option<&JsonValue> {
self.output_schema.as_ref()
}
}
/// JSON payload of a `// @exec:` pragma line. Unknown fields are rejected
/// and both fields are optional.
#[derive(Debug, Default, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
struct CodeModeExecPragma {
/// Requested early-yield time in milliseconds.
#[serde(default)]
yield_time_ms: Option<u64>,
/// Requested token budget for the exec result.
#[serde(default)]
max_output_tokens: Option<usize>,
}
/// Result of `parse_exec_source`: the JavaScript to run plus any options
/// taken from a leading pragma line.
#[derive(Debug, PartialEq, Eq)]
pub struct ParsedExecSource {
/// Source to execute: the whole input when there is no pragma, otherwise
/// everything after the pragma line.
pub code: String,
/// `yield_time_ms` from the pragma, if present.
pub yield_time_ms: Option<u64>,
/// `max_output_tokens` from the pragma, if present.
pub max_output_tokens: Option<usize>,
}
/// Splits raw `exec` input into JavaScript source and optional pragma settings.
///
/// If the first line (ignoring leading whitespace) starts with
/// [`CODE_MODE_PRAGMA_PREFIX`], the remainder of that line must be a JSON
/// object containing only `yield_time_ms` and/or `max_output_tokens`, and the
/// returned code is everything after that line. Otherwise the whole input is
/// returned as code with no settings.
///
/// # Errors
/// Returns a message when the input is blank, when a pragma is not followed
/// by any source, when the pragma is empty, not valid JSON, not an object,
/// contains an unsupported key, or holds a value that is not a non-negative
/// integer no larger than 2^53 - 1.
pub fn parse_exec_source(input: &str) -> Result<ParsedExecSource, String> {
    const NOT_AN_OBJECT: &str = "exec pragma must be a JSON object with supported fields `yield_time_ms` and `max_output_tokens`";

    if input.trim().is_empty() {
        return Err(
            "exec expects raw JavaScript source text (non-empty). Provide JS only, optionally with first-line `// @exec: {\"yield_time_ms\": 10000, \"max_output_tokens\": 1000}`.".to_string(),
        );
    }

    // No newline means the whole input is the first line and there is no rest.
    let (first_line, rest) = input.split_once('\n').unwrap_or((input, ""));
    let Some(pragma) = first_line
        .trim_start()
        .strip_prefix(CODE_MODE_PRAGMA_PREFIX)
    else {
        return Ok(ParsedExecSource {
            code: input.to_string(),
            yield_time_ms: None,
            max_output_tokens: None,
        });
    };

    if rest.trim().is_empty() {
        return Err(
            "exec pragma must be followed by JavaScript source on subsequent lines".to_string(),
        );
    }

    let directive = pragma.trim();
    if directive.is_empty() {
        return Err(NOT_AN_OBJECT.to_string());
    }

    let value: serde_json::Value = serde_json::from_str(directive).map_err(|err| {
        format!(
            "exec pragma must be valid JSON with supported fields `yield_time_ms` and `max_output_tokens`: {err}"
        )
    })?;
    let Some(object) = value.as_object() else {
        return Err(NOT_AN_OBJECT.to_string());
    };

    // Checked by hand so the error can name the offending key.
    let unsupported_key = object
        .keys()
        .find(|key| !matches!(key.as_str(), "yield_time_ms" | "max_output_tokens"));
    if let Some(key) = unsupported_key {
        return Err(format!(
            "exec pragma only supports `yield_time_ms` and `max_output_tokens`; got `{key}`"
        ));
    }

    let pragma: CodeModeExecPragma = serde_json::from_value(value).map_err(|err| {
        format!(
            "exec pragma fields `yield_time_ms` and `max_output_tokens` must be non-negative safe integers: {err}"
        )
    })?;

    let yield_time_too_large = pragma
        .yield_time_ms
        .is_some_and(|millis| millis > MAX_JS_SAFE_INTEGER);
    if yield_time_too_large {
        return Err(
            "exec pragma field `yield_time_ms` must be a non-negative safe integer".to_string(),
        );
    }

    let max_tokens_too_large = match pragma.max_output_tokens {
        // A `usize` that does not fit in `u64` is certainly out of range.
        Some(tokens) => u64::try_from(tokens).map_or(true, |tokens| tokens > MAX_JS_SAFE_INTEGER),
        None => false,
    };
    if max_tokens_too_large {
        return Err(
            "exec pragma field `max_output_tokens` must be a non-negative safe integer".to_string(),
        );
    }

    Ok(ParsedExecSource {
        code: rest.to_string(),
        yield_time_ms: pragma.yield_time_ms,
        max_output_tokens: pragma.max_output_tokens,
    })
}
/// Returns `true` for every tool name except the two code-mode entry points
/// (`crate::PUBLIC_TOOL_NAME` and `crate::WAIT_TOOL_NAME`).
pub fn is_code_mode_nested_tool(tool_name: &str) -> bool {
    let is_entry_point =
        tool_name == crate::PUBLIC_TOOL_NAME || tool_name == crate::WAIT_TOOL_NAME;
    !is_entry_point
}
/// Builds the description text for the `exec` tool.
///
/// When `code_mode_only` is `false` the base template is returned unchanged
/// and `enabled_tools` is ignored. Otherwise the result is the code-mode-only
/// preface, the base template, and one `### \`global_name\` (\`name\`)`
/// section per `(name, description)` pair, all separated by blank lines.
pub fn build_exec_tool_description(
    enabled_tools: &[(String, String)],
    code_mode_only: bool,
) -> String {
    if !code_mode_only {
        return EXEC_DESCRIPTION_TEMPLATE.to_string();
    }

    let mut description = String::from(CODE_MODE_ONLY_PREFACE);
    description.push_str("\n\n");
    description.push_str(EXEC_DESCRIPTION_TEMPLATE);

    for (name, nested_description) in enabled_tools {
        let global_name = normalize_code_mode_identifier(name);
        let section = format!(
            "### `{global_name}` (`{name}`)\n{}",
            nested_description.trim()
        );
        description.push_str("\n\n");
        description.push_str(&section);
    }

    description
}
/// Returns the static description text for the `wait` tool.
pub fn build_wait_tool_description() -> &'static str {
WAIT_DESCRIPTION_TEMPLATE
}
/// Computes the module path, namespace, and key for a tool name.
///
/// Names that `split_qualified_tool_name` can split into a server name and a
/// tool key are placed under the `mcp/<server>` namespace
/// (`tools/mcp/<server>.js`); all other names live in `tools.js` with an
/// empty namespace and the full name as key.
pub fn tool_reference(tool_name: &str) -> ToolReference {
    match split_qualified_tool_name(tool_name) {
        Some((server_name, tool_key)) => {
            let namespace = vec!["mcp".to_string(), server_name];
            let module_path = format!("tools/{}.js", namespace.join("/"));
            ToolReference {
                module_path,
                namespace,
                tool_key,
            }
        }
        None => ToolReference {
            module_path: "tools.js".to_string(),
            namespace: Vec::new(),
            tool_key: tool_name.to_string(),
        },
    }
}
/// Maps a tool key to an ASCII JavaScript identifier of the same length.
///
/// `_`, `$`, and ASCII letters are kept everywhere; ASCII digits are kept
/// except in the first position; every other character becomes `_`. An empty
/// key yields `"_"`.
pub fn normalize_code_mode_identifier(tool_key: &str) -> String {
    if tool_key.is_empty() {
        return "_".to_string();
    }

    tool_key
        .chars()
        .enumerate()
        .map(|(position, ch)| {
            let allowed = match ch {
                '_' | '$' => true,
                // Identifiers may not start with a digit.
                _ if position == 0 => ch.is_ascii_alphabetic(),
                _ => ch.is_ascii_alphanumeric(),
            };
            if allowed { ch } else { '_' }
        })
        .collect()
}
/// Appends the code mode TypeScript declaration sample to the description of
/// a tool definition.
///
/// The definition named `PUBLIC_TOOL_NAME` is returned untouched.
pub fn augment_tool_definition(mut definition: ToolDefinition) -> ToolDefinition {
    if definition.name == PUBLIC_TOOL_NAME {
        return definition;
    }
    let augmented_description = append_code_mode_sample_for_definition(&definition);
    definition.description = augmented_description;
    definition
}
/// Derives the [`EnabledToolMetadata`] record for a tool definition.
///
/// The raw tool name is kept as `tool_name`. `global_name` is the normalized
/// full name, while `name` is the normalized key returned by
/// [`tool_reference`], which also supplies `module_path` and `namespace`.
pub fn enabled_tool_metadata(definition: &impl CodeModeToolDefinition) -> EnabledToolMetadata {
    let tool_name = definition.name().to_string();
    let reference = tool_reference(&tool_name);
    let global_name = normalize_code_mode_identifier(definition.name());
    let export_name = normalize_code_mode_identifier(&reference.tool_key);
    EnabledToolMetadata {
        tool_name,
        global_name,
        module_path: reference.module_path,
        namespace: reference.namespace,
        name: export_name,
        description: definition.description().to_string(),
        kind: definition.kind(),
    }
}
/// Per-tool metadata derived from a tool definition by `enabled_tool_metadata`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct EnabledToolMetadata {
/// The tool's name exactly as reported by the definition.
pub tool_name: String,
/// `tool_name` passed through `normalize_code_mode_identifier`.
pub global_name: String,
/// Module path reported by `tool_reference` (e.g. `tools.js`).
pub module_path: String,
/// Namespace segments reported by `tool_reference`; empty for
/// non-namespaced tools.
pub namespace: Vec<String>,
/// The `tool_key` from `tool_reference`, normalized to an identifier.
pub name: String,
/// The tool's description text.
pub description: String,
/// The tool kind reported by the definition.
pub kind: CodeModeToolKind,
}
/// Returns `description` followed by a fenced TypeScript snippet declaring
/// the tool as a member of a `tools` constant.
///
/// `tool_name` is normalized to an identifier by the declaration renderer;
/// `input_name`, `input_type` and `output_type` are inserted verbatim.
pub fn append_code_mode_sample(
    description: &str,
    tool_name: &str,
    input_name: &str,
    input_type: String,
    output_type: String,
) -> String {
    let signature =
        render_code_mode_tool_declaration(tool_name, input_name, input_type, output_type);
    let declaration = format!("declare const tools: {{ {signature} }};");
    let mut sample = String::from(description);
    sample.push_str(&format!(
        "\n\nexec tool declaration:\n```ts\n{declaration}\n```"
    ));
    sample
}
fn append_code_mode_sample_for_definition(definition: &impl CodeModeToolDefinition) -> String {
let input_name = match definition.kind() {
CodeModeToolKind::Function => "args",
CodeModeToolKind::Freeform => "input",
};
let input_type = match definition.kind() {
CodeModeToolKind::Function => definition
.input_schema()
.map(render_json_schema_to_typescript)
.unwrap_or_else(|| "unknown".to_string()),
CodeModeToolKind::Freeform => "string".to_string(),
};
let output_type = definition
.output_schema()
.map(render_json_schema_to_typescript)
.unwrap_or_else(|| "unknown".to_string());
append_code_mode_sample(
definition.description(),
definition.name(),
input_name,
input_type,
output_type,
)
}
/// Renders one method signature of the form
/// `<identifier>(<input_name>: <input_type>): Promise<<output_type>>;`,
/// where the identifier is `tool_name` normalized by
/// [`normalize_code_mode_identifier`].
fn render_code_mode_tool_declaration(
    tool_name: &str,
    input_name: &str,
    input_type: String,
    output_type: String,
) -> String {
    let identifier = normalize_code_mode_identifier(tool_name);
    format!("{identifier}({input_name}: {input_type}): Promise<{output_type}>;")
}
/// Splits a name of the form `mcp__<server>__<tool>` into `(server, tool)`.
///
/// Returns `None` when the first `__`-separated segment is not `mcp`, when
/// there is no server segment, or when nothing follows the server segment.
/// Any further `__` separators stay part of the tool name.
fn split_qualified_tool_name(qualified_name: &str) -> Option<(String, String)> {
    let (prefix, remainder) = qualified_name.split_once("__")?;
    if prefix != "mcp" {
        return None;
    }
    // Only the first separator after the prefix delimits the server name.
    let (server_name, tool_name) = remainder.split_once("__")?;
    if tool_name.is_empty() {
        None
    } else {
        Some((server_name.to_string(), tool_name.to_string()))
    }
}
/// Renders a JSON Schema value as a TypeScript type expression.
///
/// Public entry point; delegates directly to the recursive renderer.
pub fn render_json_schema_to_typescript(schema: &JsonValue) -> String {
render_json_schema_to_typescript_inner(schema)
}
/// Recursively renders a JSON Schema value as a TypeScript type expression.
///
/// Keywords are considered in this order; the first one that produces
/// output wins:
/// 1. boolean schemas: `true` -> `unknown`, `false` -> `never`
/// 2. `const` -> the JSON literal
/// 3. `enum` -> union of JSON literals
/// 4. `anyOf` / `oneOf` -> union of the rendered variants
/// 5. `allOf` -> intersection of the rendered variants
/// 6. `type` (string or array of strings) -> keyword rendering
/// 7. object-ish keywords (`properties`, `additionalProperties`, `required`)
/// 8. array-ish keywords (`items`, `prefixItems`)
///
/// Anything else renders as `unknown`.
fn render_json_schema_to_typescript_inner(schema: &JsonValue) -> String {
    let map = match schema {
        JsonValue::Bool(true) => return "unknown".to_string(),
        JsonValue::Bool(false) => return "never".to_string(),
        JsonValue::Object(map) => map,
        _ => return "unknown".to_string(),
    };
    if let Some(value) = map.get("const") {
        return render_json_schema_literal(value);
    }
    if let Some(values) = map.get("enum").and_then(JsonValue::as_array) {
        let rendered = values
            .iter()
            .map(render_json_schema_literal)
            .collect::<Vec<_>>();
        if !rendered.is_empty() {
            return rendered.join(" | ");
        }
    }
    for key in ["anyOf", "oneOf"] {
        if let Some(variants) = map.get(key).and_then(JsonValue::as_array) {
            let rendered = variants
                .iter()
                .map(render_json_schema_to_typescript_inner)
                .collect::<Vec<_>>();
            if !rendered.is_empty() {
                return rendered.join(" | ");
            }
        }
    }
    if let Some(variants) = map.get("allOf").and_then(JsonValue::as_array) {
        // In TypeScript `&` binds tighter than `|`, so a union member must be
        // parenthesized or `(A | B) & C` would be emitted as `A | B & C`,
        // which means `A | (B & C)`. Redundant parentheses are harmless.
        let needs_grouping = variants.len() > 1;
        let rendered = variants
            .iter()
            .map(|variant| {
                let rendered_variant = render_json_schema_to_typescript_inner(variant);
                if needs_grouping && rendered_variant.contains(" | ") {
                    format!("({rendered_variant})")
                } else {
                    rendered_variant
                }
            })
            .collect::<Vec<_>>();
        if !rendered.is_empty() {
            return rendered.join(" & ");
        }
    }
    if let Some(schema_type) = map.get("type") {
        if let Some(types) = schema_type.as_array() {
            // Non-string entries in a `type` array are ignored.
            let rendered = types
                .iter()
                .filter_map(JsonValue::as_str)
                .map(|schema_type| render_json_schema_type_keyword(map, schema_type))
                .collect::<Vec<_>>();
            if !rendered.is_empty() {
                return rendered.join(" | ");
            }
        }
        if let Some(schema_type) = schema_type.as_str() {
            return render_json_schema_type_keyword(map, schema_type);
        }
    }
    // No usable `type`: infer the shape from structural keywords.
    if map.contains_key("properties")
        || map.contains_key("additionalProperties")
        || map.contains_key("required")
    {
        return render_json_schema_object(map);
    }
    if map.contains_key("items") || map.contains_key("prefixItems") {
        return render_json_schema_array(map);
    }
    "unknown".to_string()
}
/// Renders a single JSON Schema `type` keyword value for the schema `map`.
///
/// `array` and `object` delegate to the structural renderers; `number` and
/// `integer` both become `number`; unrecognized keywords become `unknown`.
fn render_json_schema_type_keyword(
    map: &serde_json::Map<String, JsonValue>,
    schema_type: &str,
) -> String {
    let primitive = match schema_type {
        "array" => return render_json_schema_array(map),
        "object" => return render_json_schema_object(map),
        "string" => "string",
        "number" | "integer" => "number",
        "boolean" => "boolean",
        "null" => "null",
        _ => "unknown",
    };
    primitive.to_string()
}
/// Renders an array schema.
///
/// `items` takes precedence and yields `Array<T>`. Otherwise a non-empty
/// `prefixItems` array yields a tuple type `[A, B, ...]`. With neither, the
/// result is `unknown[]`.
fn render_json_schema_array(map: &serde_json::Map<String, JsonValue>) -> String {
    if let Some(items) = map.get("items") {
        return format!("Array<{}>", render_json_schema_to_typescript_inner(items));
    }
    let tuple_members: Vec<String> = map
        .get("prefixItems")
        .and_then(JsonValue::as_array)
        .map(|items| {
            items
                .iter()
                .map(render_json_schema_to_typescript_inner)
                .collect()
        })
        .unwrap_or_default();
    if tuple_members.is_empty() {
        "unknown[]".to_string()
    } else {
        format!("[{}]", tuple_members.join(", "))
    }
}
/// Renders an object schema as an inline TypeScript object type.
///
/// Properties are emitted sorted by name; a property is marked optional
/// (`?`) unless it is listed in `required`. An index signature is added when
/// `additionalProperties` is `true` or a schema, or when the keyword is
/// absent and no properties are declared. `additionalProperties: false` adds
/// none. An object with no members renders as `{}`.
fn render_json_schema_object(map: &serde_json::Map<String, JsonValue>) -> String {
    let required: Vec<&str> = map
        .get("required")
        .and_then(JsonValue::as_array)
        .map(|items| items.iter().filter_map(JsonValue::as_str).collect())
        .unwrap_or_default();
    // Borrow the declared properties; there is no need to deep-clone the
    // schema sub-tree just to iterate over it.
    let mut sorted_properties: Vec<(&String, &JsonValue)> = map
        .get("properties")
        .and_then(JsonValue::as_object)
        .map(|properties| properties.iter().collect())
        .unwrap_or_default();
    let has_no_properties = sorted_properties.is_empty();
    sorted_properties.sort_unstable_by(|(name_a, _), (name_b, _)| name_a.cmp(name_b));
    let mut lines: Vec<String> = sorted_properties
        .into_iter()
        .map(|(name, value)| {
            let optional = if required.contains(&name.as_str()) {
                ""
            } else {
                "?"
            };
            let property_name = render_json_schema_property_name(name);
            let property_type = render_json_schema_to_typescript_inner(value);
            format!("{property_name}{optional}: {property_type};")
        })
        .collect();
    match map.get("additionalProperties") {
        Some(JsonValue::Bool(false)) => {}
        Some(JsonValue::Bool(true)) => lines.push("[key: string]: unknown;".to_string()),
        Some(value) => {
            let property_type = render_json_schema_to_typescript_inner(value);
            lines.push(format!("[key: string]: {property_type};"));
        }
        // Without the keyword, an object that declares no properties is
        // treated as an open map.
        None if has_no_properties => lines.push("[key: string]: unknown;".to_string()),
        None => {}
    }
    if lines.is_empty() {
        return "{}".to_string();
    }
    format!("{{ {} }}", lines.join(" "))
}
/// Renders a property name for use inside an object type.
///
/// Names that [`normalize_code_mode_identifier`] leaves unchanged are
/// emitted bare; all others are emitted as a JSON string literal.
fn render_json_schema_property_name(name: &str) -> String {
    let is_plain_identifier = normalize_code_mode_identifier(name) == name;
    if is_plain_identifier {
        return name.to_string();
    }
    match serde_json::to_string(name) {
        Ok(quoted) => quoted,
        // Fallback quoting if JSON serialization reports an error.
        Err(_) => format!("\"{}\"", name.replace('"', "\\\"")),
    }
}
/// Renders a JSON value as a literal type by serializing it to JSON text;
/// falls back to `unknown` if serialization fails.
fn render_json_schema_literal(value: &JsonValue) -> String {
    match serde_json::to_string(value) {
        Ok(literal) => literal,
        Err(_) => "unknown".to_string(),
    }
}
// Unit tests for pragma parsing, identifier normalization and description
// rendering.
#[cfg(test)]
mod tests {
use super::CodeModeToolKind;
use super::ParsedExecSource;
use super::ToolDefinition;
use super::augment_tool_definition;
use super::build_exec_tool_description;
use super::normalize_code_mode_identifier;
use super::parse_exec_source;
use pretty_assertions::assert_eq;
use serde_json::json;
// Source without a pragma line is passed through with no overrides.
#[test]
fn parse_exec_source_without_pragma() {
assert_eq!(
parse_exec_source("text('hi')").unwrap(),
ParsedExecSource {
code: "text('hi')".to_string(),
yield_time_ms: None,
max_output_tokens: None,
}
);
}
// A leading `// @exec: {...}` line is stripped from the code and its
// fields populate the parsed result.
#[test]
fn parse_exec_source_with_pragma() {
assert_eq!(
parse_exec_source("// @exec: {\"yield_time_ms\": 10}\ntext('hi')").unwrap(),
ParsedExecSource {
code: "text('hi')".to_string(),
yield_time_ms: Some(10),
max_output_tokens: None,
}
);
}
// Valid identifiers are unchanged; invalid characters become `_`.
#[test]
fn normalize_identifier_rewrites_invalid_characters() {
assert_eq!(
"mcp__ologs__get_profile",
normalize_code_mode_identifier("mcp__ologs__get_profile")
);
assert_eq!(
"hidden_dynamic_tool",
normalize_code_mode_identifier("hidden-dynamic-tool")
);
}
// The augmented description contains a `declare const tools` snippet with
// the input and output schemas rendered as TypeScript object types.
#[test]
fn augment_tool_definition_appends_typed_declaration() {
let definition = ToolDefinition {
name: "hidden_dynamic_tool".to_string(),
description: "Test tool".to_string(),
kind: CodeModeToolKind::Function,
input_schema: Some(json!({
"type": "object",
"properties": { "city": { "type": "string" } },
"required": ["city"],
"additionalProperties": false
})),
output_schema: Some(json!({
"type": "object",
"properties": { "ok": { "type": "boolean" } },
"required": ["ok"]
})),
};
let description = augment_tool_definition(definition).description;
assert!(description.contains("declare const tools"));
assert!(
description.contains(
"hidden_dynamic_tool(args: { city: string; }): Promise<{ ok: boolean; }>;"
)
);
}
// In code-mode-only mode each enabled tool gets its own `###` heading.
#[test]
fn code_mode_only_description_includes_nested_tools() {
let description =
build_exec_tool_description(&[("foo".to_string(), "bar".to_string())], true);
assert!(description.contains("### `foo` (`foo`)"));
}
}

View File

@@ -0,0 +1,32 @@
mod description;
mod response;
mod runtime;
mod service;
pub use description::CODE_MODE_PRAGMA_PREFIX;
pub use description::CodeModeToolDefinition;
pub use description::CodeModeToolKind;
pub use description::ToolDefinition;
pub use description::append_code_mode_sample;
pub use description::augment_tool_definition;
pub use description::build_exec_tool_description;
pub use description::build_wait_tool_description;
pub use description::is_code_mode_nested_tool;
pub use description::normalize_code_mode_identifier;
pub use description::parse_exec_source;
pub use description::render_json_schema_to_typescript;
pub use description::tool_reference;
pub use response::FunctionCallOutputContentItem;
pub use response::ImageDetail;
pub use runtime::DEFAULT_EXEC_YIELD_TIME_MS;
pub use runtime::DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL;
pub use runtime::DEFAULT_WAIT_YIELD_TIME_MS;
pub use runtime::ExecuteRequest;
pub use runtime::RuntimeResponse;
pub use runtime::WaitRequest;
pub use service::CodeModeService;
pub use service::CodeModeTurnHost;
pub use service::CodeModeTurnWorker;
/// Name of the public exec tool. It is not treated as a nested tool and its
/// definition is not augmented with a declaration sample.
pub const PUBLIC_TOOL_NAME: &str = "exec";
/// Name of the wait tool. It is not treated as a nested tool.
pub const WAIT_TOOL_NAME: &str = "wait";

View File

@@ -0,0 +1,24 @@
use serde::Deserialize;
use serde::Serialize;
/// Detail setting attached to an image output item.
///
/// Serialized as the lowercase variant name (`auto`, `low`, `high`,
/// `original`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ImageDetail {
Auto,
Low,
High,
Original,
}
/// One content item produced by a code mode execution.
///
/// Serialized as an internally tagged object whose `type` field holds the
/// snake_case variant name (`input_text` or `input_image`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FunctionCallOutputContentItem {
/// A text item.
InputText {
text: String,
},
/// An image item referenced by URL.
InputImage {
image_url: String,
/// Optional detail setting; omitted from the serialized form when
/// `None` and defaulted to `None` when missing on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
detail: Option<ImageDetail>,
},
}

View File

@@ -0,0 +1,246 @@
use crate::response::FunctionCallOutputContentItem;
use super::EXIT_SENTINEL;
use super::RuntimeEvent;
use super::RuntimeState;
use super::value::content_item_to_js_value;
use super::value::json_to_v8;
use super::value::normalize_output_image;
use super::value::serialize_output_text;
use super::value::throw_type_error;
use super::value::v8_value_to_json;
/// V8 callback behind every tool function exposed to the script.
///
/// The tool name is read from the callback data. The first argument, if
/// present, is converted to JSON. A new promise is created and returned to
/// the script, its resolver is stored in `pending_tool_calls` under a fresh
/// `tool-<n>` id, and a `RuntimeEvent::ToolCall` is emitted.
/// A `TypeError` is thrown when the argument cannot be converted, the
/// promise cannot be created, or the runtime state slot is missing.
pub(super) fn tool_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
mut retval: v8::ReturnValue<v8::Value>,
) {
let tool_name = args.data().to_rust_string_lossy(scope);
// A call without arguments carries no input at all.
let input = if args.length() == 0 {
Ok(None)
} else {
v8_value_to_json(scope, args.get(0))
};
let input = match input {
Ok(input) => input,
Err(error_text) => {
throw_type_error(scope, &error_text);
return;
}
};
let Some(resolver) = v8::PromiseResolver::new(scope) else {
throw_type_error(scope, "failed to create tool promise");
return;
};
let promise = resolver.get_promise(scope);
// Keep the resolver in a global handle so it can be looked up by id later.
let resolver = v8::Global::new(scope, resolver);
let Some(state) = scope.get_slot_mut::<RuntimeState>() else {
throw_type_error(scope, "runtime state unavailable");
return;
};
let id = format!("tool-{}", state.next_tool_call_id);
state.next_tool_call_id = state.next_tool_call_id.saturating_add(1);
let event_tx = state.event_tx.clone();
state.pending_tool_calls.insert(id.clone(), resolver);
// Send failures are ignored (best effort).
let _ = event_tx.send(RuntimeEvent::ToolCall {
id,
name: tool_name,
input,
});
retval.set(promise.into());
}
/// V8 callback for the global `text` helper.
///
/// Serializes the first argument (or `undefined` when called without
/// arguments) to text, emits it as an `InputText` content item, and returns
/// a `{ type: "input_text", text }` object to the script. Throws a
/// `TypeError` when serialization or a string allocation fails.
pub(super) fn text_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
mut retval: v8::ReturnValue<v8::Value>,
) {
let value = if args.length() == 0 {
v8::undefined(scope).into()
} else {
args.get(0)
};
let text = match serialize_output_text(scope, value) {
Ok(text) => text,
Err(error_text) => {
throw_type_error(scope, &error_text);
return;
}
};
// The event is emitted before the return value is built; send failures
// are ignored.
if let Some(state) = scope.get_slot::<RuntimeState>() {
let _ = state.event_tx.send(RuntimeEvent::ContentItem(
FunctionCallOutputContentItem::InputText { text: text.clone() },
));
}
let item = v8::Object::new(scope);
let Some(type_key) = v8::String::new(scope, "type") else {
throw_type_error(scope, "failed to allocate text output item");
return;
};
let Some(type_value) = v8::String::new(scope, "input_text") else {
throw_type_error(scope, "failed to allocate text output item");
return;
};
let Some(text_key) = v8::String::new(scope, "text") else {
throw_type_error(scope, "failed to allocate text output item");
return;
};
let Some(text_value) = v8::String::new(scope, &text) else {
throw_type_error(scope, "failed to allocate text output item");
return;
};
item.set(scope, type_key.into(), type_value.into());
item.set(scope, text_key.into(), text_value.into());
retval.set(item.into());
}
pub(super) fn image_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
mut retval: v8::ReturnValue<v8::Value>,
) {
let value = if args.length() == 0 {
v8::undefined(scope).into()
} else {
args.get(0)
};
let image_item = match normalize_output_image(scope, value) {
Ok(image_item) => image_item,
Err(()) => return,
};
if let Some(state) = scope.get_slot::<RuntimeState>() {
let _ = state
.event_tx
.send(RuntimeEvent::ContentItem(image_item.clone()));
}
let item = match content_item_to_js_value(scope, &image_item) {
Some(item) => item,
None => return,
};
retval.set(item);
}
/// V8 callback for the global `store(key, value)` helper.
///
/// Converts the key to a string and the value to JSON, then records the
/// pair in the runtime state's `stored_values`. Throws a `TypeError` when
/// the key cannot be converted or the value is not JSON-serializable.
pub(super) fn store_callback(
    scope: &mut v8::PinScope<'_, '_>,
    args: v8::FunctionCallbackArguments,
    _retval: v8::ReturnValue<v8::Value>,
) {
    let Some(key) = args.get(0).to_string(scope) else {
        throw_type_error(scope, "store key must be a string");
        return;
    };
    let key = key.to_rust_string_lossy(scope);
    let serialized = match v8_value_to_json(scope, args.get(1)) {
        Ok(Some(json)) => json,
        Ok(None) => {
            let message =
                format!("Unable to store {key:?}. Only plain serializable objects can be stored.");
            throw_type_error(scope, &message);
            return;
        }
        Err(error_text) => {
            throw_type_error(scope, &error_text);
            return;
        }
    };
    let Some(state) = scope.get_slot_mut::<RuntimeState>() else {
        return;
    };
    state.stored_values.insert(key, serialized);
}
/// V8 callback for the global `load(key)` helper.
///
/// Looks the key up in the runtime state's `stored_values` and returns the
/// stored JSON converted back to a JavaScript value, or `undefined` when
/// nothing is stored under that key. Throws a `TypeError` when the key
/// cannot be converted to a string or the stored value cannot be converted.
pub(super) fn load_callback(
    scope: &mut v8::PinScope<'_, '_>,
    args: v8::FunctionCallbackArguments,
    mut retval: v8::ReturnValue<v8::Value>,
) {
    let Some(key) = args.get(0).to_string(scope) else {
        throw_type_error(scope, "load key must be a string");
        return;
    };
    let key = key.to_rust_string_lossy(scope);
    // Clone the stored JSON so the state borrow ends before touching V8 again.
    let stored = scope
        .get_slot::<RuntimeState>()
        .and_then(|state| state.stored_values.get(&key))
        .cloned();
    match stored {
        None => {
            retval.set(v8::undefined(scope).into());
        }
        Some(json) => match json_to_v8(scope, &json) {
            Some(value) => {
                retval.set(value);
            }
            None => {
                throw_type_error(scope, "failed to load stored value");
            }
        },
    }
}
/// V8 callback for the global `notify` helper.
///
/// Serializes the first argument (or `undefined` when absent) to text,
/// rejects text that is empty after trimming, emits a
/// `RuntimeEvent::Notify` tagged with the state's `tool_call_id`, and
/// returns the text to the script as a string.
pub(super) fn notify_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
mut retval: v8::ReturnValue<v8::Value>,
) {
let value = if args.length() == 0 {
v8::undefined(scope).into()
} else {
args.get(0)
};
let text = match serialize_output_text(scope, value) {
Ok(text) => text,
Err(error_text) => {
throw_type_error(scope, &error_text);
return;
}
};
// Whitespace-only notifications are rejected.
if text.trim().is_empty() {
throw_type_error(scope, "notify expects non-empty text");
return;
}
// Send failures are ignored (best effort).
if let Some(state) = scope.get_slot::<RuntimeState>() {
let _ = state.event_tx.send(RuntimeEvent::Notify {
call_id: state.tool_call_id.clone(),
text: text.clone(),
});
}
let Some(value) = v8::String::new(scope, &text) else {
throw_type_error(scope, "failed to allocate notify result");
return;
};
retval.set(value.into());
}
/// V8 callback for the global `yield_control` helper: emits
/// `RuntimeEvent::YieldRequested`. Does nothing when the runtime state slot
/// is missing; send failures are ignored.
pub(super) fn yield_control_callback(
    scope: &mut v8::PinScope<'_, '_>,
    _args: v8::FunctionCallbackArguments,
    _retval: v8::ReturnValue<v8::Value>,
) {
    let Some(state) = scope.get_slot::<RuntimeState>() else {
        return;
    };
    let _ = state.event_tx.send(RuntimeEvent::YieldRequested);
}
/// V8 callback for the global `exit` helper.
///
/// Marks `exit_requested` on the runtime state (when available) and throws
/// the `EXIT_SENTINEL` string as an exception. Nothing is thrown if the
/// sentinel string cannot be allocated.
pub(super) fn exit_callback(
    scope: &mut v8::PinScope<'_, '_>,
    _args: v8::FunctionCallbackArguments,
    _retval: v8::ReturnValue<v8::Value>,
) {
    if let Some(state) = scope.get_slot_mut::<RuntimeState>() {
        state.exit_requested = true;
    }
    let Some(sentinel) = v8::String::new(scope, EXIT_SENTINEL) else {
        return;
    };
    scope.throw_exception(sentinel.into());
}
/// V8 callback that ignores its arguments and does nothing; the return
/// value is left unset.
pub(super) fn noop_callback(
_scope: &mut v8::PinScope<'_, '_>,
_args: v8::FunctionCallbackArguments,
_retval: v8::ReturnValue<v8::Value>,
) {
}

View File

@@ -0,0 +1,187 @@
use std::collections::HashMap;
use serde_json::Value as JsonValue;
use crate::description::EnabledToolMetadata;
use super::MODULE_TOOLS_SYMBOL_KEY;
use super::RuntimeState;
use super::callbacks::exit_callback;
use super::callbacks::image_callback;
use super::callbacks::load_callback;
use super::callbacks::noop_callback;
use super::callbacks::notify_callback;
use super::callbacks::store_callback;
use super::callbacks::text_callback;
use super::callbacks::tool_callback;
use super::callbacks::yield_control_callback;
use super::value::json_to_v8;
/// Installs the code mode globals on the current context's global object.
///
/// Installed string-keyed globals: `tools`, `ALL_TOOLS`, `console`, `text`,
/// `image`, `store`, `load`, `notify`, `yield_control` and `exit`. The
/// per-namespace module tools object is installed under the registered
/// symbol for `MODULE_TOOLS_SYMBOL_KEY` rather than a string key.
///
/// # Errors
/// Returns a message when any value cannot be allocated or any property
/// cannot be set.
pub(super) fn install_globals(scope: &mut v8::PinScope<'_, '_>) -> Result<(), String> {
let global = scope.get_current_context().global(scope);
// Build every value first; nothing is attached until all of them exist.
let tools = build_tools_object(scope)?;
let module_tools = build_module_tools_object(scope)?;
let all_tools = build_all_tools_value(scope)?;
let console = build_console_object(scope)?;
let text = helper_function(scope, "text", text_callback)?;
let image = helper_function(scope, "image", image_callback)?;
let store = helper_function(scope, "store", store_callback)?;
let load = helper_function(scope, "load", load_callback)?;
let notify = helper_function(scope, "notify", notify_callback)?;
let yield_control = helper_function(scope, "yield_control", yield_control_callback)?;
let exit = helper_function(scope, "exit", exit_callback)?;
set_global(scope, global, "tools", tools.into())?;
set_global(scope, global, "ALL_TOOLS", all_tools)?;
// The module tools object is keyed by a symbol from `Symbol::for_key`.
let module_tools_symbol_description = v8::String::new(scope, MODULE_TOOLS_SYMBOL_KEY)
.ok_or_else(|| "failed to allocate module tools symbol".to_string())?;
let module_tools_symbol = v8::Symbol::for_key(scope, module_tools_symbol_description);
if global.set(scope, module_tools_symbol.into(), module_tools.into()) != Some(true) {
return Err("failed to set module tools symbol".to_string());
}
set_global(scope, global, "console", console.into())?;
set_global(scope, global, "text", text.into())?;
set_global(scope, global, "image", image.into())?;
set_global(scope, global, "store", store.into())?;
set_global(scope, global, "load", load.into())?;
set_global(scope, global, "notify", notify.into())?;
set_global(scope, global, "yield_control", yield_control.into())?;
set_global(scope, global, "exit", exit.into())?;
Ok(())
}
/// Builds the object exposed as the global `tools`: one function per
/// enabled tool, keyed by the tool's `global_name` and bound to its raw
/// `tool_name`. Yields an empty object when the runtime state is missing.
fn build_tools_object<'s>(
    scope: &mut v8::PinScope<'s, '_>,
) -> Result<v8::Local<'s, v8::Object>, String> {
    let tools = v8::Object::new(scope);
    // Clone the list so the state borrow ends before V8 values are created.
    let enabled_tools = match scope.get_slot::<RuntimeState>() {
        Some(state) => state.enabled_tools.clone(),
        None => Vec::new(),
    };
    for tool in &enabled_tools {
        let Some(name) = v8::String::new(scope, &tool.global_name) else {
            return Err("failed to allocate tool name".to_string());
        };
        let function = tool_function(scope, &tool.tool_name)?;
        tools.set(scope, name.into(), function.into());
    }
    Ok(tools)
}
/// Builds the per-namespace tools object.
///
/// Enabled tools with a non-empty namespace are grouped by their namespace
/// joined with `/`. Each group becomes an object mapping the tool's
/// normalized `name` to a function bound to its raw `tool_name`. Tools
/// without a namespace are skipped.
fn build_module_tools_object<'s>(
scope: &mut v8::PinScope<'s, '_>,
) -> Result<v8::Local<'s, v8::Object>, String> {
let module_tools = v8::Object::new(scope);
let enabled_tools = scope
.get_slot::<RuntimeState>()
.map(|state| state.enabled_tools.clone())
.unwrap_or_default();
let mut buckets = HashMap::<String, Vec<EnabledToolMetadata>>::new();
for tool in enabled_tools {
if tool.namespace.is_empty() {
continue;
}
buckets
.entry(tool.namespace.join("/"))
.or_default()
.push(tool);
}
// NOTE(review): `HashMap` iteration order is unspecified, so the order in
// which namespaces are added to `module_tools` may differ between runs.
for (key, tools) in buckets {
let tool_object = v8::Object::new(scope);
for tool in tools {
let name = v8::String::new(scope, &tool.name)
.ok_or_else(|| "failed to allocate module export name".to_string())?;
let function = tool_function(scope, &tool.tool_name)?;
tool_object.set(scope, name.into(), function.into());
}
let key = v8::String::new(scope, &key)
.ok_or_else(|| "failed to allocate module namespace".to_string())?;
module_tools.set(scope, key.into(), tool_object.into());
}
Ok(module_tools)
}
fn build_all_tools_value<'s>(
scope: &mut v8::PinScope<'s, '_>,
) -> Result<v8::Local<'s, v8::Value>, String> {
let all_tools = scope
.get_slot::<RuntimeState>()
.map(|state| {
state
.enabled_tools
.iter()
.map(|tool| {
serde_json::json!({
"name": tool.global_name,
"description": tool.description,
})
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
json_to_v8(scope, &JsonValue::Array(all_tools))
.ok_or_else(|| "failed to build ALL_TOOLS metadata".to_string())
}
/// Builds the object exposed as the global `console`, whose `log`, `info`,
/// `warn`, `error` and `debug` methods are all bound to the no-op callback.
fn build_console_object<'s>(
    scope: &mut v8::PinScope<'s, '_>,
) -> Result<v8::Local<'s, v8::Object>, String> {
    const CONSOLE_METHODS: [&str; 5] = ["log", "info", "warn", "error", "debug"];
    let console = v8::Object::new(scope);
    for method in CONSOLE_METHODS {
        let Some(key) = v8::String::new(scope, method) else {
            return Err("failed to allocate console key".to_string());
        };
        let stub = helper_function(scope, method, noop_callback)?;
        console.set(scope, key.into(), stub.into());
    }
    Ok(console)
}
/// Creates a V8 function backed by `callback`, with `name` attached as the
/// callback data.
///
/// # Errors
/// Returns a message when the name string cannot be allocated or the
/// function cannot be instantiated from its template.
fn helper_function<'s, F>(
    scope: &mut v8::PinScope<'s, '_>,
    name: &str,
    callback: F,
) -> Result<v8::Local<'s, v8::Function>, String>
where
    F: v8::MapFnTo<v8::FunctionCallback>,
{
    let Some(name) = v8::String::new(scope, name) else {
        return Err("failed to allocate helper name".to_string());
    };
    let template = v8::FunctionTemplate::builder(callback)
        .data(name.into())
        .build(scope);
    match template.get_function(scope) {
        Some(function) => Ok(function),
        None => Err("failed to create helper function".to_string()),
    }
}
/// Creates a V8 function bound to `tool_callback`, carrying `tool_name` as
/// the callback data so the callback knows which tool was invoked.
///
/// # Errors
/// Returns a message when the data string cannot be allocated or the
/// function cannot be instantiated from its template.
fn tool_function<'s>(
    scope: &mut v8::PinScope<'s, '_>,
    tool_name: &str,
) -> Result<v8::Local<'s, v8::Function>, String> {
    let Some(data) = v8::String::new(scope, tool_name) else {
        return Err("failed to allocate tool callback data".to_string());
    };
    let template = v8::FunctionTemplate::builder(tool_callback)
        .data(data.into())
        .build(scope);
    match template.get_function(scope) {
        Some(function) => Ok(function),
        None => Err("failed to create tool function".to_string()),
    }
}
/// Sets the property `name` on `global` to `value`.
///
/// # Errors
/// Returns a message naming the global when the key cannot be allocated or
/// the property set does not report success.
fn set_global<'s>(
    scope: &mut v8::PinScope<'s, '_>,
    global: v8::Local<'s, v8::Object>,
    name: &str,
    value: v8::Local<'s, v8::Value>,
) -> Result<(), String> {
    let Some(key) = v8::String::new(scope, name) else {
        return Err(format!("failed to allocate global `{name}`"));
    };
    match global.set(scope, key.into(), value) {
        Some(true) => Ok(()),
        _ => Err(format!("failed to set global `{name}`")),
    }
}

View File

@@ -0,0 +1,376 @@
mod callbacks;
mod globals;
mod module_loader;
mod value;
use std::collections::HashMap;
use std::panic::AssertUnwindSafe;
use std::sync::OnceLock;
use std::sync::mpsc as std_mpsc;
use std::thread;
use serde_json::Value as JsonValue;
use tokio::sync::mpsc;
use crate::description::EnabledToolMetadata;
use crate::description::ToolDefinition;
use crate::description::enabled_tool_metadata;
use crate::response::FunctionCallOutputContentItem;
/// Default yield time for exec calls (presumably milliseconds, per the name).
pub const DEFAULT_EXEC_YIELD_TIME_MS: u64 = 10_000;
/// Default yield time for wait calls (presumably milliseconds, per the name).
pub const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000;
/// Default cap on output tokens for a single exec call.
pub const DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL: usize = 10_000;
/// String thrown as a JavaScript exception by the `exit` helper.
const EXIT_SENTINEL: &str = "__codex_code_mode_exit__";
/// Key of the registered symbol under which the per-namespace tools object
/// is stored on the global object.
const MODULE_TOOLS_SYMBOL_KEY: &str = "__codex_code_mode_module_tools__";
/// Parameters for starting one code mode execution.
#[derive(Clone, Debug)]
pub struct ExecuteRequest {
/// Id attached as `call_id` to notifications emitted by the script.
pub tool_call_id: String,
/// Tool definitions exposed to the script.
pub enabled_tools: Vec<ToolDefinition>,
/// Source text evaluated as the main module.
pub source: String,
/// Initial contents of the `store` / `load` key-value map.
pub stored_values: HashMap<String, JsonValue>,
/// Optional yield time override; not read by `spawn_runtime`.
pub yield_time_ms: Option<u64>,
/// Optional output token limit override; not read by `spawn_runtime`.
pub max_output_tokens: Option<usize>,
}
/// Parameters for a wait call on an existing cell.
// NOTE(review): the consumer of this type is outside this file; the field
// notes below are inferred from the names and should be confirmed there.
#[derive(Clone, Debug)]
pub struct WaitRequest {
/// Identifier of the cell to wait on.
pub cell_id: String,
/// Yield time for this wait (presumably milliseconds, per the name).
pub yield_time_ms: u64,
/// Presumably requests termination of the cell instead of waiting.
pub terminate: bool,
}
/// Response returned for an execute or wait request.
// NOTE(review): the producer of these variants is outside this file; the
// variant notes are inferred from the names and should be confirmed there.
#[derive(Debug, PartialEq)]
pub enum RuntimeResponse {
/// Presumably: the cell is still running; carries the content items
/// produced so far.
Yielded {
cell_id: String,
content_items: Vec<FunctionCallOutputContentItem>,
},
/// Presumably: the cell was terminated.
Terminated {
cell_id: String,
content_items: Vec<FunctionCallOutputContentItem>,
},
/// The cell finished, with the final stored values and an optional error.
Result {
cell_id: String,
content_items: Vec<FunctionCallOutputContentItem>,
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
max_output_tokens_per_exec_call: Option<usize>,
},
}
/// Message variants that mirror `RuntimeEvent::ToolCall` and
/// `RuntimeEvent::Notify`, with a `cell_id` added.
// NOTE(review): sender and receiver are outside this file; presumably these
// go from the service to the turn host. Confirm there.
#[derive(Debug)]
pub(crate) enum TurnMessage {
/// A tool invocation requested by the script running in `cell_id`.
ToolCall {
cell_id: String,
id: String,
name: String,
input: Option<JsonValue>,
},
/// A notification emitted by the script running in `cell_id`.
Notify {
cell_id: String,
call_id: String,
text: String,
},
}
/// Commands consumed by the runtime thread's command loop.
#[derive(Debug)]
pub(crate) enum RuntimeCommand {
/// Successful result for the pending tool call `id`.
ToolResponse { id: String, result: JsonValue },
/// Error result for the pending tool call `id`.
ToolError { id: String, error_text: String },
/// Ends the command loop without emitting a result event.
Terminate,
}
/// Events emitted by the runtime thread over its event channel.
#[derive(Debug)]
pub(crate) enum RuntimeEvent {
/// Sent once globals are installed, before the main module is evaluated.
Started,
/// A content item produced by the `text` or `image` helper.
ContentItem(FunctionCallOutputContentItem),
/// Sent by the `yield_control` helper.
YieldRequested,
/// A tool function was called; `id` identifies the pending promise.
ToolCall {
id: String,
name: String,
input: Option<JsonValue>,
},
/// Sent by the `notify` helper; `call_id` is the request's tool call id.
Notify {
call_id: String,
text: String,
},
/// Final event: stored values plus an optional error message.
Result {
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
},
}
/// Starts a code mode runtime on a dedicated thread.
///
/// Returns the sender used to deliver [`RuntimeCommand`]s to the runtime
/// together with the isolate handle reported by the thread. If the runtime
/// panics, the panic message is forwarded as a `RuntimeEvent::Result` with
/// empty stored values.
///
/// # Errors
/// Fails when the runtime thread ends before reporting its isolate handle.
pub(crate) fn spawn_runtime(
    request: ExecuteRequest,
    event_tx: mpsc::UnboundedSender<RuntimeEvent>,
) -> Result<(std_mpsc::Sender<RuntimeCommand>, v8::IsolateHandle), String> {
    let config = RuntimeConfig {
        enabled_tools: request
            .enabled_tools
            .iter()
            .map(enabled_tool_metadata)
            .collect::<Vec<_>>(),
        tool_call_id: request.tool_call_id,
        source: request.source,
        stored_values: request.stored_values,
    };
    let (command_tx, command_rx) = std_mpsc::channel();
    let (isolate_handle_tx, isolate_handle_rx) = std_mpsc::sync_channel(1);
    thread::spawn(move || {
        let runtime_event_tx = event_tx.clone();
        let outcome = std::panic::catch_unwind(AssertUnwindSafe(|| {
            run_runtime(config, runtime_event_tx, command_rx, isolate_handle_tx)
        }));
        let Err(panic_payload) = outcome else {
            return;
        };
        // Panic payloads are usually a `String` or a `&str`; anything else
        // gets a generic message.
        let error_text = match panic_payload.downcast_ref::<String>() {
            Some(message) => message.clone(),
            None => match panic_payload.downcast_ref::<&str>() {
                Some(message) => (*message).to_string(),
                None => "code mode runtime panicked".to_string(),
            },
        };
        let _ = event_tx.send(RuntimeEvent::Result {
            stored_values: HashMap::new(),
            error_text: Some(error_text),
        });
    });
    match isolate_handle_rx.recv() {
        Ok(isolate_handle) => Ok((command_tx, isolate_handle)),
        Err(_) => Err("failed to initialize code mode runtime".to_string()),
    }
}
/// Inputs handed to the runtime thread, built from an `ExecuteRequest` with
/// the tool definitions already converted to metadata.
#[derive(Clone)]
struct RuntimeConfig {
tool_call_id: String,
enabled_tools: Vec<EnabledToolMetadata>,
source: String,
stored_values: HashMap<String, JsonValue>,
}
/// Per-runtime state stored in a V8 slot and accessed from the callbacks.
pub(super) struct RuntimeState {
/// Channel on which callbacks emit `RuntimeEvent`s.
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
/// Resolvers of in-flight tool call promises, keyed by `tool-<n>` id.
pending_tool_calls: HashMap<String, v8::Global<v8::PromiseResolver>>,
/// Key-value map behind the `store` / `load` helpers.
stored_values: HashMap<String, JsonValue>,
/// Metadata of the tools exposed to the script.
enabled_tools: Vec<EnabledToolMetadata>,
/// Module handles keyed by string (presumably the module specifier;
/// the users are outside this file).
module_cache: HashMap<String, v8::Global<v8::Module>>,
/// Counter used to mint the next tool call id.
next_tool_call_id: u64,
/// Tool call id of the exec request; attached to notifications.
tool_call_id: String,
/// Set by the `exit` helper before it throws the exit sentinel.
exit_requested: bool,
}
/// Outcome of checking whether the main module has finished.
pub(super) enum CompletionState {
/// Execution is still in progress.
Pending,
/// Execution finished; carries the stored values and an optional error.
Completed {
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
},
}
/// Initializes the V8 platform and engine exactly once per process.
///
/// The `OnceLock` makes repeated calls no-ops and keeps the shared platform
/// reference alive for the rest of the process.
fn initialize_v8() {
static PLATFORM: OnceLock<v8::SharedRef<v8::Platform>> = OnceLock::new();
let _ = PLATFORM.get_or_init(|| {
// NOTE(review): the arguments presumably select the default thread pool
// size and disable idle task support; confirm against the v8 crate docs.
let platform = v8::new_default_platform(0, false).make_shared();
v8::V8::initialize_platform(platform.clone());
v8::V8::initialize();
platform
});
}
/// Body of the runtime thread: creates an isolate, evaluates the main
/// module, then services commands until the module completes.
///
/// Steps:
/// 1. Initialize V8, create the isolate and send its thread-safe handle on
///    `isolate_handle_tx`. Returns silently if that send fails.
/// 2. Create a context, store the `RuntimeState` slot and install globals.
/// 3. Emit `RuntimeEvent::Started` and evaluate `config.source`.
/// 4. If the module is still pending, loop on `command_rx`, resolving tool
///    calls and running microtasks after each command.
///
/// A `RuntimeEvent::Result` is emitted on completion and on errors. No
/// result is emitted when the loop ends through `RuntimeCommand::Terminate`
/// or because the command channel closed.
fn run_runtime(
config: RuntimeConfig,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
command_rx: std_mpsc::Receiver<RuntimeCommand>,
isolate_handle_tx: std_mpsc::SyncSender<v8::IsolateHandle>,
) {
initialize_v8();
let isolate = &mut v8::Isolate::new(v8::CreateParams::default());
let isolate_handle = isolate.thread_safe_handle();
// The spawner blocks on this handle; if it is gone, stop here.
if isolate_handle_tx.send(isolate_handle).is_err() {
return;
}
isolate.set_host_import_module_dynamically_callback(module_loader::dynamic_import_callback);
v8::scope!(let scope, isolate);
let context = v8::Context::new(scope, Default::default());
let scope = &mut v8::ContextScope::new(scope, context);
scope.set_slot(RuntimeState {
event_tx: event_tx.clone(),
pending_tool_calls: HashMap::new(),
stored_values: config.stored_values,
enabled_tools: config.enabled_tools,
module_cache: HashMap::new(),
next_tool_call_id: 1,
tool_call_id: config.tool_call_id,
exit_requested: false,
});
if let Err(error_text) = globals::install_globals(scope) {
let _ = event_tx.send(RuntimeEvent::Result {
stored_values: HashMap::new(),
error_text: Some(error_text),
});
return;
}
let _ = event_tx.send(RuntimeEvent::Started);
let pending_promise = match module_loader::evaluate_main_module(scope, &config.source) {
Ok(pending_promise) => pending_promise,
Err(error_text) => {
// Report whatever the script stored before it failed.
let stored_values = scope
.get_slot::<RuntimeState>()
.map(|state| state.stored_values.clone())
.unwrap_or_default();
let _ = event_tx.send(RuntimeEvent::Result {
stored_values,
error_text: Some(error_text),
});
return;
}
};
// The module may already be complete after the initial evaluation.
match module_loader::completion_state(scope, pending_promise.as_ref()) {
CompletionState::Completed {
stored_values,
error_text,
} => {
let _ = event_tx.send(RuntimeEvent::Result {
stored_values,
error_text,
});
return;
}
CompletionState::Pending => {}
}
let mut pending_promise = pending_promise;
loop {
// A closed command channel ends the loop without a result event.
let Ok(command) = command_rx.recv() else {
break;
};
match command {
RuntimeCommand::Terminate => break,
RuntimeCommand::ToolResponse { id, result } => {
if let Err(error_text) =
module_loader::resolve_tool_response(scope, &id, Ok(result))
{
let stored_values = scope
.get_slot::<RuntimeState>()
.map(|state| state.stored_values.clone())
.unwrap_or_default();
let _ = event_tx.send(RuntimeEvent::Result {
stored_values,
error_text: Some(error_text),
});
return;
}
}
RuntimeCommand::ToolError { id, error_text } => {
if let Err(runtime_error) =
module_loader::resolve_tool_response(scope, &id, Err(error_text))
{
let stored_values = scope
.get_slot::<RuntimeState>()
.map(|state| state.stored_values.clone())
.unwrap_or_default();
let _ = event_tx.send(RuntimeEvent::Result {
stored_values,
error_text: Some(runtime_error),
});
return;
}
}
}
// Run promise continuations unblocked by the tool response.
scope.perform_microtask_checkpoint();
match module_loader::completion_state(scope, pending_promise.as_ref()) {
CompletionState::Completed {
stored_values,
error_text,
} => {
let _ = event_tx.send(RuntimeEvent::Result {
stored_values,
error_text,
});
return;
}
CompletionState::Pending => {}
}
// Drop the handle once the promise has settled.
if let Some(promise) = pending_promise.as_ref() {
let promise = v8::Local::new(scope, promise);
if promise.state() != v8::PromiseState::Pending {
pending_promise = None;
}
}
}
}
// Runtime tests that drive `spawn_runtime` directly.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::time::Duration;
use pretty_assertions::assert_eq;
use tokio::sync::mpsc;
use super::ExecuteRequest;
use super::RuntimeEvent;
use super::spawn_runtime;
// Builds a minimal request that runs `source` with no tools or stored values.
fn execute_request(source: &str) -> ExecuteRequest {
ExecuteRequest {
tool_call_id: "call_1".to_string(),
enabled_tools: Vec::new(),
source: source.to_string(),
stored_values: HashMap::new(),
yield_time_ms: Some(1),
max_output_tokens: None,
}
}
// Terminating the isolate must interrupt a busy loop, produce an error
// result, and then close the event channel.
#[tokio::test]
async fn terminate_execution_stops_cpu_bound_module() {
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
let (_runtime_tx, runtime_terminate_handle) =
spawn_runtime(execute_request("while (true) {}"), event_tx).unwrap();
let started_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
.await
.unwrap()
.unwrap();
assert!(matches!(started_event, RuntimeEvent::Started));
assert!(runtime_terminate_handle.terminate_execution());
let result_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
.await
.unwrap()
.unwrap();
let RuntimeEvent::Result {
stored_values,
error_text,
} = result_event
else {
panic!("expected runtime result after termination");
};
assert_eq!(stored_values, HashMap::new());
assert!(error_text.is_some());
// `recv` returning `None` means every sender has been dropped.
assert!(
tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
.await
.unwrap()
.is_none()
);
}
}

View File

@@ -0,0 +1,333 @@
use serde_json::Value as JsonValue;
use tracing::warn;
use crate::description::EnabledToolMetadata;
use super::CompletionState;
use super::EXIT_SENTINEL;
use super::MODULE_TOOLS_SYMBOL_KEY;
use super::RuntimeState;
use super::value::json_to_v8;
use super::value::value_to_error_text;
/// Compiles, instantiates and evaluates `source_text` as the main ES module
/// (`exec_main.mjs`).
///
/// Returns `Ok(Some(promise))` when evaluation produced a promise, and `Ok(None)` when it
/// produced a non-promise value or when evaluation threw the exit sentinel after an exit was
/// requested. Compile, instantiate and evaluate exceptions are returned as `Err` with the
/// text produced by `value_to_error_text`.
pub(super) fn evaluate_main_module(
    scope: &mut v8::PinScope<'_, '_>,
    source_text: &str,
) -> Result<Option<v8::Global<v8::Promise>>, String> {
    let try_catch = std::pin::pin!(v8::TryCatch::new(scope));
    let mut try_catch = try_catch.init();

    let Some(code) = v8::String::new(&try_catch, source_text) else {
        return Err("failed to allocate exec source".to_string());
    };
    let origin = script_origin(&mut try_catch, "exec_main.mjs")?;
    let mut compiler_source = v8::script_compiler::Source::new(code, Some(&origin));

    let Some(module) = v8::script_compiler::compile_module(&try_catch, &mut compiler_source)
    else {
        return Err(match try_catch.exception() {
            Some(exception) => value_to_error_text(&mut try_catch, exception),
            None => "unknown code mode exception".to_string(),
        });
    };

    if module
        .instantiate_module(&try_catch, resolve_module_callback)
        .is_none()
    {
        return Err(match try_catch.exception() {
            Some(exception) => value_to_error_text(&mut try_catch, exception),
            None => "unknown code mode exception".to_string(),
        });
    }

    let Some(outcome) = module.evaluate(&try_catch) else {
        let Some(exception) = try_catch.exception() else {
            return Err("unknown code mode exception".to_string());
        };
        // A synchronous `exit()` surfaces as a thrown sentinel; that is a clean finish.
        if is_exit_exception(&mut try_catch, exception) {
            return Ok(None);
        }
        return Err(value_to_error_text(&mut try_catch, exception));
    };

    // Run queued microtasks before the caller inspects the promise.
    try_catch.perform_microtask_checkpoint();

    if !outcome.is_promise() {
        return Ok(None);
    }
    let promise = v8::Local::<v8::Promise>::try_from(outcome)
        .map_err(|_| "failed to read exec promise".to_string())?;
    Ok(Some(v8::Global::new(&try_catch, promise)))
}
/// Returns `true` when `exception` is the exit sentinel thrown on behalf of `exit()`.
///
/// All three conditions must hold: the runtime state records that an exit was requested,
/// the thrown value is a string, and that string equals `EXIT_SENTINEL`.
fn is_exit_exception(
    scope: &mut v8::PinScope<'_, '_>,
    exception: v8::Local<'_, v8::Value>,
) -> bool {
    let exit_requested = scope
        .get_slot::<RuntimeState>()
        .is_some_and(|state| state.exit_requested);
    if !exit_requested || !exception.is_string() {
        return false;
    }
    exception.to_rust_string_lossy(scope) == EXIT_SENTINEL
}
/// Settles the promise of the pending tool call `id` with the host's `response`.
///
/// The resolver is removed from `RuntimeState::pending_tool_calls`; an `Ok` response
/// resolves the promise with the JSON converted to a JS value, an `Err` response rejects it
/// with the error text as a JS string.
///
/// Returns `Err` when the runtime state is missing, the call id is unknown, the value could
/// not be created, or an exception was caught while settling the promise.
pub(super) fn resolve_tool_response(
    scope: &mut v8::PinScope<'_, '_>,
    id: &str,
    response: Result<JsonValue, String>,
) -> Result<(), String> {
    let Some(state) = scope.get_slot_mut::<RuntimeState>() else {
        return Err("runtime state unavailable".to_string());
    };
    let Some(pending) = state.pending_tool_calls.remove(id) else {
        return Err(format!("unknown tool call `{id}`"));
    };

    let try_catch = std::pin::pin!(v8::TryCatch::new(scope));
    let mut try_catch = try_catch.init();
    let resolver = v8::Local::new(&try_catch, &pending);

    match response {
        Ok(result) => {
            let Some(js_result) = json_to_v8(&mut try_catch, &result) else {
                return Err("failed to serialize tool response".to_string());
            };
            resolver.resolve(&try_catch, js_result);
        }
        Err(error_text) => {
            let Some(js_error) = v8::String::new(&try_catch, &error_text) else {
                return Err("failed to allocate tool error".to_string());
            };
            resolver.reject(&try_catch, js_error.into());
        }
    }

    if !try_catch.has_caught() {
        return Ok(());
    }
    Err(match try_catch.exception() {
        Some(exception) => value_to_error_text(&mut try_catch, exception),
        None => "unknown code mode exception".to_string(),
    })
}
/// Reports whether the script has finished, based on its top-level promise.
///
/// With no pending promise, or a fulfilled one, the script completed without error. A
/// rejected promise completes with the rejection text, unless the rejection is the exit
/// sentinel, which counts as success. A promise that is still pending yields
/// `CompletionState::Pending`. Completed states carry a clone of the stored values from the
/// runtime state (empty when the state slot is missing).
pub(super) fn completion_state(
    scope: &mut v8::PinScope<'_, '_>,
    pending_promise: Option<&v8::Global<v8::Promise>>,
) -> CompletionState {
    let stored_values = match scope.get_slot::<RuntimeState>() {
        Some(state) => state.stored_values.clone(),
        None => Default::default(),
    };

    let error_text = match pending_promise {
        None => None,
        Some(global) => {
            let promise = v8::Local::new(scope, global);
            match promise.state() {
                v8::PromiseState::Pending => return CompletionState::Pending,
                v8::PromiseState::Fulfilled => None,
                v8::PromiseState::Rejected => {
                    let reason = promise.result(scope);
                    if is_exit_exception(scope, reason) {
                        None
                    } else {
                        Some(value_to_error_text(scope, reason))
                    }
                }
            }
        }
    };

    CompletionState::Completed {
        stored_values,
        error_text,
    }
}
/// Builds the `v8::ScriptOrigin` used when compiling a module named `resource_name_`.
///
/// The same name is passed both as the resource name and as the source map URL. Returns
/// `Err` when either string cannot be allocated.
fn script_origin<'s>(
    scope: &mut v8::PinScope<'s, '_>,
    resource_name_: &str,
) -> Result<v8::ScriptOrigin<'s>, String> {
    let Some(resource_name) = v8::String::new(scope, resource_name_) else {
        return Err("failed to allocate script origin".to_string());
    };
    let Some(source_map_url) = v8::String::new(scope, resource_name_) else {
        return Err("failed to allocate source map url".to_string());
    };

    // Positional arguments follow rusty_v8's `ScriptOrigin::new`; the final `true` marks
    // the source as a module (see the rusty_v8 reference for the remaining flags).
    let origin = v8::ScriptOrigin::new(
        scope,
        resource_name.into(),
        0,
        0,
        true,
        0,
        Some(source_map_url.into()),
        true,
        false,
        true,
        None,
    );
    Ok(origin)
}
/// Module resolution callback handed to `instantiate_module`.
///
/// Converts `specifier` to a Rust string and delegates to `resolve_module`; the import
/// attributes and the referrer module are ignored.
fn resolve_module_callback<'s>(
context: v8::Local<'s, v8::Context>,
specifier: v8::Local<'s, v8::String>,
_import_attributes: v8::Local<'s, v8::FixedArray>,
_referrer: v8::Local<'s, v8::Module>,
) -> Option<v8::Local<'s, v8::Module>> {
// The callback only receives a context, so a scope is created from it first.
v8::callback_scope!(unsafe scope, context);
let specifier = specifier.to_rust_string_lossy(scope);
resolve_module(scope, &specifier)
}
/// Callback for dynamic `import(...)` expressions.
///
/// Resolves `specifier` through `resolve_module`, instantiates the module if it is still
/// uninstantiated, evaluates it, and returns a promise resolved with the module namespace.
/// Failures are reported by rejecting the returned promise with a fixed message string.
/// Returns `None` only when the promise resolver itself cannot be created.
pub(super) fn dynamic_import_callback<'s>(
scope: &mut v8::PinScope<'s, '_>,
_host_defined_options: v8::Local<'s, v8::Data>,
_resource_name: v8::Local<'s, v8::Value>,
specifier: v8::Local<'s, v8::String>,
_import_attributes: v8::Local<'s, v8::FixedArray>,
) -> Option<v8::Local<'s, v8::Promise>> {
let specifier = specifier.to_rust_string_lossy(scope);
let resolver = v8::PromiseResolver::new(scope)?;
match resolve_module(scope, &specifier) {
Some(module) => {
// Freshly compiled modules must be instantiated before they can be evaluated.
if module.get_status() == v8::ModuleStatus::Uninstantiated
&& module
.instantiate_module(scope, resolve_module_callback)
.is_none()
{
let error = v8::String::new(scope, "failed to instantiate module")
.map(Into::into)
.unwrap_or_else(|| v8::undefined(scope).into());
resolver.reject(scope, error);
return Some(resolver.get_promise(scope));
}
// `evaluate` is also called for modules already in the `Evaluated` state; its
// return value is only checked for emptiness.
if matches!(
module.get_status(),
v8::ModuleStatus::Instantiated | v8::ModuleStatus::Evaluated
) && module.evaluate(scope).is_none()
{
let error = v8::String::new(scope, "failed to evaluate module")
.map(Into::into)
.unwrap_or_else(|| v8::undefined(scope).into());
resolver.reject(scope, error);
return Some(resolver.get_promise(scope));
}
let namespace = module.get_module_namespace();
resolver.resolve(scope, namespace);
Some(resolver.get_promise(scope))
}
None => {
// NOTE(review): `resolve_module` throws an exception on the scope before it
// returns `None` for an unsupported specifier, and this branch additionally
// rejects the promise with its own message. Confirm that the thrown exception
// is not left pending when this callback returns.
let error = v8::String::new(scope, "unsupported import in exec")
.map(Into::into)
.unwrap_or_else(|| v8::undefined(scope).into());
resolver.reject(scope, error);
Some(resolver.get_promise(scope))
}
}
}
/// Returns the helper module for `specifier`, compiling and caching it on first use.
///
/// Modules already present in `RuntimeState::module_cache` are returned directly. Otherwise
/// the source is generated by `helper_module_source`; when no helper module exists for the
/// specifier, an "Unsupported import in exec" exception is thrown and `None` is returned.
/// `None` is also returned when the source cannot be allocated or compiled.
fn resolve_module<'s>(
    scope: &mut v8::PinScope<'s, '_>,
    specifier: &str,
) -> Option<v8::Local<'s, v8::Module>> {
    // Fast path: a module compiled earlier for the same specifier.
    let cached = scope
        .get_slot::<RuntimeState>()
        .and_then(|state| state.module_cache.get(specifier).cloned());
    if let Some(cached) = cached {
        return Some(v8::Local::new(scope, cached));
    }

    let generated = scope
        .get_slot::<RuntimeState>()
        .and_then(|state| helper_module_source(&state.enabled_tools, specifier));
    let Some(generated) = generated else {
        let text = format!("Unsupported import in exec: {specifier}");
        match v8::String::new(scope, &text) {
            Some(message) => {
                scope.throw_exception(message.into());
            }
            None => {
                // The message could not be allocated; still signal the failure.
                scope.throw_exception(v8::undefined(scope).into());
            }
        }
        return None;
    };

    let code = v8::String::new(scope, &generated)?;
    let origin = script_origin(scope, specifier).ok()?;
    let mut compiler_source = v8::script_compiler::Source::new(code, Some(&origin));
    let Some(module) = v8::script_compiler::compile_module(scope, &mut compiler_source) else {
        warn!("failed to compile helper module `{specifier}`");
        return None;
    };

    // Remember the compiled module so later imports of the same specifier reuse it.
    let handle = v8::Global::new(scope, module);
    if let Some(state) = scope.get_slot_mut::<RuntimeState>() {
        state.module_cache.insert(specifier.to_string(), handle);
    }
    Some(module)
}
/// Generates the JavaScript source of a built-in helper module, or `None` when `specifier`
/// does not name one.
///
/// Supported specifiers:
/// - `tools.js`: re-exports `ALL_TOOLS` and every enabled tool from `globalThis.tools` under
///   its `global_name`.
/// - `@openai/code_mode` / `openai/code_mode`: re-exports the global helper functions.
/// - `tools/<namespace path>.js`: re-exports the enabled tools whose `namespace` equals the
///   path segments, read from the table stored under `Symbol.for(MODULE_TOOLS_SYMBOL_KEY)`.
///   Yields `None` when the path is empty or no enabled tool lives in that namespace.
fn helper_module_source(enabled_tools: &[EnabledToolMetadata], specifier: &str) -> Option<String> {
    match specifier {
        "tools.js" => {
            let header = [
                "const tools = globalThis.tools;".to_string(),
                "export const ALL_TOOLS = globalThis.ALL_TOOLS;".to_string(),
            ];
            // `ALL_TOOLS` is already exported by the header, so never export it twice.
            let reexports = enabled_tools
                .iter()
                .filter(|tool| tool.global_name != "ALL_TOOLS")
                .map(|tool| {
                    format!(
                        "export const {} = tools.{};",
                        tool.global_name, tool.global_name
                    )
                });
            let lines: Vec<String> = header.into_iter().chain(reexports).collect();
            return Some(lines.join("\n"));
        }
        "@openai/code_mode" | "openai/code_mode" => {
            let lines = [
                "export const exit = globalThis.exit;",
                "export const image = globalThis.image;",
                "export const load = globalThis.load;",
                "export const notify = globalThis.notify;",
                "export const output_image = globalThis.image;",
                "export const output_text = globalThis.text;",
                "export const store = globalThis.store;",
                "export const text = globalThis.text;",
                "export const yield_control = globalThis.yield_control;",
            ];
            return Some(lines.join("\n"));
        }
        _ => {}
    }

    // Remaining case: `tools/<segment>/.../<segment>.js`.
    let path = specifier.strip_prefix("tools/")?.strip_suffix(".js")?;
    let namespace: Vec<String> = path
        .split('/')
        .filter(|segment| !segment.is_empty())
        .map(str::to_string)
        .collect();
    if namespace.is_empty() {
        return None;
    }

    let export_lines: Vec<String> = enabled_tools
        .iter()
        .filter(|tool| tool.namespace == namespace)
        .map(|tool| format!("export const {name} = tools.{name};", name = tool.name))
        .collect();
    if export_lines.is_empty() {
        return None;
    }

    let namespace_key = namespace.join("/");
    let header = format!(
        "const tools = globalThis[Symbol.for({MODULE_TOOLS_SYMBOL_KEY:?})][{namespace_key:?}];"
    );
    let lines: Vec<String> = std::iter::once(header).chain(export_lines).collect();
    Some(lines.join("\n"))
}

View File

@@ -0,0 +1,190 @@
use serde_json::Value as JsonValue;
use crate::response::FunctionCallOutputContentItem;
use crate::response::ImageDetail;
/// Converts a JavaScript value to the text emitted by the text helpers.
///
/// Strings, `undefined`, `null`, booleans, numbers and bigints use their plain string
/// conversion. Every other value is passed to `JSON.stringify`; if that throws, the
/// exception text is returned as `Err`, and if it yields nothing without throwing, the plain
/// string conversion is used instead.
pub(super) fn serialize_output_text(
    scope: &mut v8::PinScope<'_, '_>,
    value: v8::Local<'_, v8::Value>,
) -> Result<String, String> {
    let is_primitive = value.is_string()
        || value.is_undefined()
        || value.is_null()
        || value.is_boolean()
        || value.is_number()
        || value.is_big_int();
    if is_primitive {
        return Ok(value.to_rust_string_lossy(scope));
    }

    let try_catch = std::pin::pin!(v8::TryCatch::new(scope));
    let mut try_catch = try_catch.init();
    let Some(json_text) = v8::json::stringify(&try_catch, value) else {
        if !try_catch.has_caught() {
            return Ok(value.to_rust_string_lossy(&try_catch));
        }
        return Err(match try_catch.exception() {
            Some(exception) => value_to_error_text(&mut try_catch, exception),
            None => "unknown code mode exception".to_string(),
        });
    };
    Ok(json_text.to_rust_string_lossy(&try_catch))
}
/// Validates the argument of the image helper and converts it to an `InputImage` item.
///
/// Accepted inputs are a URL string, or a non-array object with a string `image_url` and an
/// optional string `detail` (`auto`, `low`, `high` or `original`, matched
/// case-insensitively; `null` and `undefined` mean "not provided"). The URL must be
/// non-empty and start with `http://`, `https://` or `data:` (case-insensitive).
///
/// On every validation failure an exception is thrown on `scope` and `Err(())` is returned.
/// This includes an invalid `detail`: previously that case threw but still returned
/// `Ok(..)` with `detail: None`, handing the caller an item while an exception was pending.
pub(super) fn normalize_output_image(
    scope: &mut v8::PinScope<'_, '_>,
    value: v8::Local<'_, v8::Value>,
) -> Result<FunctionCallOutputContentItem, ()> {
    const INVALID_IMAGE_MESSAGE: &str = "image expects a non-empty image URL string or an object with image_url and optional detail";

    let (image_url, detail) = if value.is_string() {
        (value.to_rust_string_lossy(scope), None)
    } else if value.is_object() && !value.is_array() {
        let Ok(object) = v8::Local::<v8::Object>::try_from(value) else {
            throw_type_error(scope, INVALID_IMAGE_MESSAGE);
            return Err(());
        };
        let Some(image_url_key) = v8::String::new(scope, "image_url") else {
            throw_type_error(scope, "failed to allocate image helper keys");
            return Err(());
        };
        let Some(detail_key) = v8::String::new(scope, "detail") else {
            throw_type_error(scope, "failed to allocate image helper keys");
            return Err(());
        };
        let image_url = object
            .get(scope, image_url_key.into())
            .filter(|value| value.is_string())
            .map(|value| value.to_rust_string_lossy(scope));
        let detail = match object.get(scope, detail_key.into()) {
            Some(value) if value.is_string() => Some(value.to_rust_string_lossy(scope)),
            Some(value) if value.is_null() || value.is_undefined() => None,
            Some(_) => {
                throw_type_error(scope, "image detail must be a string when provided");
                return Err(());
            }
            // An empty result means reading the property itself threw; report failure
            // instead of continuing with a pending exception.
            None => return Err(()),
        };
        let Some(image_url) = image_url else {
            throw_type_error(scope, INVALID_IMAGE_MESSAGE);
            return Err(());
        };
        (image_url, detail)
    } else {
        throw_type_error(scope, INVALID_IMAGE_MESSAGE);
        return Err(());
    };

    if image_url.is_empty() {
        throw_type_error(scope, INVALID_IMAGE_MESSAGE);
        return Err(());
    }

    let lower = image_url.to_ascii_lowercase();
    let has_supported_scheme = lower.starts_with("http://")
        || lower.starts_with("https://")
        || lower.starts_with("data:");
    if !has_supported_scheme {
        throw_type_error(scope, "image expects an http(s) or data URL");
        return Err(());
    }

    let detail = match detail {
        None => None,
        Some(detail) => Some(match detail.to_ascii_lowercase().as_str() {
            "auto" => ImageDetail::Auto,
            "low" => ImageDetail::Low,
            "high" => ImageDetail::High,
            "original" => ImageDetail::Original,
            _ => {
                throw_type_error(
                    scope,
                    "image detail must be one of: auto, low, high, original",
                );
                return Err(());
            }
        }),
    };

    Ok(FunctionCallOutputContentItem::InputImage { image_url, detail })
}
/// Converts a content item to the JavaScript object handed back to the script.
///
/// Text items become `{ type: "input_text", text }`; image items become
/// `{ type: "input_image", image_url, detail }`. Returns `None` when the conversion through
/// `json_to_v8` fails.
pub(super) fn content_item_to_js_value<'s>(
    scope: &mut v8::PinScope<'s, '_>,
    item: &FunctionCallOutputContentItem,
) -> Option<v8::Local<'s, v8::Value>> {
    match item {
        FunctionCallOutputContentItem::InputText { text } => {
            let payload = serde_json::json!({
                "type": "input_text",
                "text": text,
            });
            json_to_v8(scope, &payload)
        }
        FunctionCallOutputContentItem::InputImage { image_url, detail } => {
            let payload = serde_json::json!({
                "type": "input_image",
                "image_url": image_url,
                "detail": detail,
            });
            json_to_v8(scope, &payload)
        }
    }
}
/// Converts a JavaScript value to `serde_json::Value` by round-tripping through
/// `JSON.stringify`.
///
/// Returns `Ok(None)` when stringification produces nothing without throwing, and `Err`
/// when it throws or when the produced text cannot be parsed as JSON.
pub(super) fn v8_value_to_json(
    scope: &mut v8::PinScope<'_, '_>,
    value: v8::Local<'_, v8::Value>,
) -> Result<Option<JsonValue>, String> {
    let try_catch = std::pin::pin!(v8::TryCatch::new(scope));
    let mut try_catch = try_catch.init();

    if let Some(json_text) = v8::json::stringify(&try_catch, value) {
        let json_text = json_text.to_rust_string_lossy(&try_catch);
        return match serde_json::from_str::<JsonValue>(&json_text) {
            Ok(parsed) => Ok(Some(parsed)),
            Err(err) => Err(format!("failed to serialize JavaScript value: {err}")),
        };
    }

    if !try_catch.has_caught() {
        return Ok(None);
    }
    Err(match try_catch.exception() {
        Some(exception) => value_to_error_text(&mut try_catch, exception),
        None => "unknown code mode exception".to_string(),
    })
}
/// Converts a `serde_json::Value` to a JavaScript value by serializing it to text and
/// parsing that text with V8's JSON parser.
///
/// Returns `None` when serialization, string allocation, or parsing fails.
pub(super) fn json_to_v8<'s>(
    scope: &mut v8::PinScope<'s, '_>,
    value: &JsonValue,
) -> Option<v8::Local<'s, v8::Value>> {
    let serialized = serde_json::to_string(value).ok()?;
    v8::String::new(scope, &serialized).and_then(|text| v8::json::parse(scope, text))
}
pub(super) fn value_to_error_text(
scope: &mut v8::PinScope<'_, '_>,
value: v8::Local<'_, v8::Value>,
) -> String {
if value.is_object()
&& let Ok(object) = v8::Local::<v8::Object>::try_from(value)
&& let Some(key) = v8::String::new(scope, "stack")
&& let Some(stack) = object.get(scope, key.into())
&& stack.is_string()
{
return stack.to_rust_string_lossy(scope);
}
value.to_rust_string_lossy(scope)
}
/// Throws `message` on `scope` as a JavaScript string exception.
///
/// Nothing is thrown when the message string cannot be allocated.
pub(super) fn throw_type_error(scope: &mut v8::PinScope<'_, '_>, message: &str) {
    let Some(js_message) = v8::String::new(scope, message) else {
        return;
    };
    scope.throw_exception(js_message.into());
}

View File

@@ -0,0 +1,604 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use async_trait::async_trait;
use serde_json::Value as JsonValue;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use crate::FunctionCallOutputContentItem;
use crate::runtime::DEFAULT_EXEC_YIELD_TIME_MS;
use crate::runtime::ExecuteRequest;
use crate::runtime::RuntimeCommand;
use crate::runtime::RuntimeEvent;
use crate::runtime::RuntimeResponse;
use crate::runtime::TurnMessage;
use crate::runtime::WaitRequest;
use crate::runtime::spawn_runtime;
/// Host-side callbacks used by the worker started with
/// `CodeModeService::start_turn_worker`.
#[async_trait]
pub trait CodeModeTurnHost: Send + Sync {
/// Invokes the nested tool `tool_name` with the optional JSON `input`.
///
/// Returns the tool's JSON result, or an error string that is forwarded to the script as
/// a tool error.
async fn invoke_tool(
&self,
tool_name: String,
input: Option<JsonValue>,
cancellation_token: CancellationToken,
) -> Result<JsonValue, String>;
/// Delivers the notification `text` emitted by cell `cell_id` for the call `call_id`.
async fn notify(&self, call_id: String, cell_id: String, text: String) -> Result<(), String>;
}
/// Channels for reaching one running exec cell; stored in `Inner::sessions` by cell id.
#[derive(Clone)]
struct SessionHandle {
/// Sends poll/terminate commands to the cell's `run_session_control` task.
control_tx: mpsc::UnboundedSender<SessionControlCommand>,
/// Sends commands (tool responses, tool errors) to the cell's runtime.
runtime_tx: std::sync::mpsc::Sender<RuntimeCommand>,
}
/// State shared between `CodeModeService`, the per-cell control tasks and the turn worker.
struct Inner {
/// Values exposed through `stored_values` / `replace_stored_values`.
stored_values: Mutex<HashMap<String, JsonValue>>,
/// Running cells keyed by cell id.
sessions: Mutex<HashMap<String, SessionHandle>>,
/// Sender used by control tasks to queue notifications and tool calls for the turn worker.
turn_message_tx: mpsc::UnboundedSender<TurnMessage>,
/// Receiving end of the turn-message queue; locked by a turn worker while it waits.
turn_message_rx: Arc<Mutex<mpsc::UnboundedReceiver<TurnMessage>>>,
/// Counter from which `execute` allocates cell ids; starts at 1.
next_cell_id: AtomicU64,
}
/// Service that starts code-mode cells, polls or terminates them, and runs the turn worker
/// that services their tool calls and notifications.
pub struct CodeModeService {
/// Shared state, also held by spawned control tasks and turn workers.
inner: Arc<Inner>,
}
impl CodeModeService {
/// Creates a service with no stored values, no running cells and cell ids starting at 1.
pub fn new() -> Self {
let (turn_message_tx, turn_message_rx) = mpsc::unbounded_channel();
Self {
inner: Arc::new(Inner {
stored_values: Mutex::new(HashMap::new()),
sessions: Mutex::new(HashMap::new()),
turn_message_tx,
turn_message_rx: Arc::new(Mutex::new(turn_message_rx)),
next_cell_id: AtomicU64::new(1),
}),
}
}
/// Returns a copy of the stored values currently held by the service.
pub async fn stored_values(&self) -> HashMap<String, JsonValue> {
self.inner.stored_values.lock().await.clone()
}
/// Replaces the stored values held by the service with `values`.
pub async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
*self.inner.stored_values.lock().await = values;
}
/// Starts a new cell for `request` and waits for its first response.
///
/// Allocates a cell id, spawns the runtime, registers the session, and spawns the
/// `run_session_control` task that answers through a oneshot channel. The yield time
/// defaults to `DEFAULT_EXEC_YIELD_TIME_MS` when the request does not set one.
///
/// Returns `Err` when the runtime cannot be spawned or when the control task ends without
/// sending a response.
pub async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
let cell_id = self
.inner
.next_cell_id
.fetch_add(1, Ordering::Relaxed)
.to_string();
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request.clone(), event_tx)?;
let (control_tx, control_rx) = mpsc::unbounded_channel();
let (response_tx, response_rx) = oneshot::channel();
// Register the session before the control task starts so `wait` and the turn worker
// can find it by cell id.
self.inner.sessions.lock().await.insert(
cell_id.clone(),
SessionHandle {
control_tx: control_tx.clone(),
runtime_tx: runtime_tx.clone(),
},
);
tokio::spawn(run_session_control(
Arc::clone(&self.inner),
SessionControlContext {
cell_id: cell_id.clone(),
max_output_tokens_per_exec_call: request.max_output_tokens,
runtime_tx,
runtime_terminate_handle,
},
event_rx,
control_rx,
response_tx,
request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS),
));
response_rx
.await
.map_err(|_| "exec runtime ended unexpectedly".to_string())
}
/// Polls or terminates the cell named by `request.cell_id` and waits for its response.
///
/// When the cell is unknown, or its control task is gone before answering, a `Result`
/// response with an "exec cell ... not found" error is returned; this method never
/// returns `Err`.
pub async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String> {
let cell_id = request.cell_id.clone();
let handle = self
.inner
.sessions
.lock()
.await
.get(&request.cell_id)
.cloned();
let Some(handle) = handle else {
return Ok(missing_cell_response(cell_id));
};
let (response_tx, response_rx) = oneshot::channel();
let control_message = if request.terminate {
SessionControlCommand::Terminate { response_tx }
} else {
SessionControlCommand::Poll {
yield_time_ms: request.yield_time_ms,
response_tx,
}
};
if handle.control_tx.send(control_message).is_err() {
return Ok(missing_cell_response(cell_id));
}
match response_rx.await {
Ok(response) => Ok(response),
Err(_) => Ok(missing_cell_response(request.cell_id)),
}
}
/// Spawns a task that drains queued turn messages and services them through `host`.
///
/// Notifications are delivered inline via `host.notify`. Each tool call runs in its own
/// spawned task, and its outcome is sent to the owning cell's runtime if that cell is
/// still registered. The task stops when the returned `CodeModeTurnWorker` is dropped or
/// when the turn-message channel closes.
pub fn start_turn_worker(&self, host: Arc<dyn CodeModeTurnHost>) -> CodeModeTurnWorker {
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let inner = Arc::clone(&self.inner);
let turn_message_rx = Arc::clone(&self.inner.turn_message_rx);
tokio::spawn(async move {
loop {
// Wait for either the shutdown signal or the next queued message; the receiver
// lock is only held inside the second branch's future.
let next_message = tokio::select! {
_ = &mut shutdown_rx => break,
message = async {
let mut turn_message_rx = turn_message_rx.lock().await;
turn_message_rx.recv().await
} => message,
};
let Some(next_message) = next_message else {
break;
};
match next_message {
TurnMessage::Notify {
cell_id,
call_id,
text,
} => {
if let Err(err) = host.notify(call_id, cell_id.clone(), text).await {
warn!(
"failed to deliver code mode notification for cell {cell_id}: {err}"
);
}
}
TurnMessage::ToolCall {
cell_id,
id,
name,
input,
} => {
let host = Arc::clone(&host);
let inner = Arc::clone(&inner);
// Run the tool in its own task so the loop keeps draining messages.
tokio::spawn(async move {
let response = host
.invoke_tool(name, input, CancellationToken::new())
.await;
// The cell may have finished while the tool was running; in that case the
// response is dropped.
let runtime_tx = inner
.sessions
.lock()
.await
.get(&cell_id)
.map(|handle| handle.runtime_tx.clone());
let Some(runtime_tx) = runtime_tx else {
return;
};
let command = match response {
Ok(result) => RuntimeCommand::ToolResponse { id, result },
Err(error_text) => RuntimeCommand::ToolError { id, error_text },
};
let _ = runtime_tx.send(command);
});
}
}
}
});
CodeModeTurnWorker {
shutdown_tx: Some(shutdown_tx),
}
}
}
impl Default for CodeModeService {
/// Equivalent to `CodeModeService::new`.
fn default() -> Self {
Self::new()
}
}
/// Guard returned by `CodeModeService::start_turn_worker`; dropping it signals the worker
/// task to stop.
pub struct CodeModeTurnWorker {
/// Shutdown signal for the worker task; taken and sent on drop.
shutdown_tx: Option<oneshot::Sender<()>>,
}
impl Drop for CodeModeTurnWorker {
    /// Sends the shutdown signal to the worker task, ignoring a receiver that is already
    /// gone.
    fn drop(&mut self) {
        let Some(signal) = self.shutdown_tx.take() else {
            return;
        };
        let _ = signal.send(());
    }
}
/// Commands sent from `CodeModeService::wait` to a cell's `run_session_control` task.
enum SessionControlCommand {
/// Wait for the cell's next response, yielding after `yield_time_ms` milliseconds.
Poll {
yield_time_ms: u64,
response_tx: oneshot::Sender<RuntimeResponse>,
},
/// Terminate the cell and respond once the runtime has stopped.
Terminate {
response_tx: oneshot::Sender<RuntimeResponse>,
},
}
/// Final outcome of a cell that finished while no caller was waiting; kept until the next
/// poll or terminate command collects it.
struct PendingResult {
/// Content items accumulated up to the point the result was recorded.
content_items: Vec<FunctionCallOutputContentItem>,
/// Stored values reported with the result.
stored_values: HashMap<String, JsonValue>,
/// Error text reported with the result, if any.
error_text: Option<String>,
}
/// Per-cell inputs for `run_session_control`.
struct SessionControlContext {
/// Id of the cell this task controls.
cell_id: String,
/// Output token limit copied into every `RuntimeResponse::Result` for this cell.
max_output_tokens_per_exec_call: Option<usize>,
/// Command channel to the cell's runtime.
runtime_tx: std::sync::mpsc::Sender<RuntimeCommand>,
/// Isolate handle used to interrupt running JavaScript on termination.
runtime_terminate_handle: v8::IsolateHandle,
}
/// Builds the response returned when `cell_id` does not name a running cell: a `Result`
/// with no content, no stored values, no token limit, and an
/// "exec cell <id> not found" error.
fn missing_cell_response(cell_id: String) -> RuntimeResponse {
    let error_text = Some(format!("exec cell {cell_id} not found"));
    RuntimeResponse::Result {
        cell_id,
        content_items: Vec::new(),
        stored_values: HashMap::new(),
        error_text,
        max_output_tokens_per_exec_call: None,
    }
}
/// Control loop for one exec cell.
///
/// Multiplexes three inputs: events from the runtime (`event_rx`), poll/terminate commands
/// from `CodeModeService::wait` (`control_rx`), and a yield timer. At most one caller is
/// waiting at a time; its oneshot sender is held in `response_tx` and consumed by the first
/// response (`Yielded`, `Result` or `Terminated`). A result that arrives while nobody is
/// waiting is parked in `pending_result` until the next command collects it.
///
/// When the loop ends, a `Terminate` command is sent to the runtime and the session is
/// removed from `inner.sessions`.
async fn run_session_control(
inner: Arc<Inner>,
context: SessionControlContext,
mut event_rx: mpsc::UnboundedReceiver<RuntimeEvent>,
mut control_rx: mpsc::UnboundedReceiver<SessionControlCommand>,
initial_response_tx: oneshot::Sender<RuntimeResponse>,
initial_yield_time_ms: u64,
) {
let SessionControlContext {
cell_id,
max_output_tokens_per_exec_call,
runtime_tx,
runtime_terminate_handle,
} = context;
// Content produced since the last response was sent.
let mut content_items = Vec::new();
let mut pending_result: Option<PendingResult> = None;
// Sender for the caller currently waiting on this cell, if any.
let mut response_tx = Some(initial_response_tx);
let mut termination_requested = false;
// Set once the runtime's event channel has closed.
let mut runtime_closed = false;
let mut yield_timer: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
loop {
tokio::select! {
// Branch 1: runtime events. Once the channel has closed this branch never completes,
// so the loop does not spin on a closed receiver.
maybe_event = async {
if runtime_closed {
std::future::pending::<Option<RuntimeEvent>>().await
} else {
event_rx.recv().await
}
} => {
let Some(event) = maybe_event else {
// The runtime dropped its sender without (or after) reporting a result.
runtime_closed = true;
if termination_requested {
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Terminated {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
break;
}
if pending_result.is_none() {
// No result was ever reported: synthesize an error result.
let result = PendingResult {
content_items: std::mem::take(&mut content_items),
stored_values: HashMap::new(),
error_text: Some("exec runtime ended unexpectedly".to_string()),
};
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Result {
cell_id: cell_id.clone(),
content_items: result.content_items,
stored_values: result.stored_values,
error_text: result.error_text,
max_output_tokens_per_exec_call,
});
break;
}
pending_result = Some(result);
}
continue;
};
match event {
RuntimeEvent::Started => {
// Start the yield countdown for the initial `execute` caller.
yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(initial_yield_time_ms))));
}
RuntimeEvent::ContentItem(item) => {
content_items.push(item);
}
RuntimeEvent::YieldRequested => {
// The script asked to yield: hand the accumulated output to the waiting caller.
yield_timer = None;
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Yielded {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
}
RuntimeEvent::Notify { call_id, text } => {
// Forward to the turn worker's queue, tagged with this cell's id.
let _ = inner.turn_message_tx.send(TurnMessage::Notify {
cell_id: cell_id.clone(),
call_id,
text,
});
}
RuntimeEvent::ToolCall { id, name, input } => {
let _ = inner.turn_message_tx.send(TurnMessage::ToolCall {
cell_id: cell_id.clone(),
id,
name,
input,
});
}
RuntimeEvent::Result {
stored_values,
error_text,
} => {
yield_timer = None;
// After a terminate request, the result is reported as `Terminated`.
if termination_requested {
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Terminated {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
break;
}
let result = PendingResult {
content_items: std::mem::take(&mut content_items),
stored_values,
error_text,
};
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Result {
cell_id: cell_id.clone(),
content_items: result.content_items,
stored_values: result.stored_values,
error_text: result.error_text,
max_output_tokens_per_exec_call,
});
break;
}
// Nobody is waiting: park the result for the next poll/terminate command.
pending_result = Some(result);
}
}
}
// Branch 2: commands from `CodeModeService::wait`.
maybe_command = control_rx.recv() => {
let Some(command) = maybe_command else {
break;
};
match command {
SessionControlCommand::Poll {
yield_time_ms,
response_tx: next_response_tx,
} => {
if let Some(result) = pending_result.take() {
let _ = next_response_tx.send(RuntimeResponse::Result {
cell_id: cell_id.clone(),
content_items: result.content_items,
stored_values: result.stored_values,
error_text: result.error_text,
max_output_tokens_per_exec_call,
});
break;
}
// Still running: this caller becomes the waiter and gets a fresh yield timer.
response_tx = Some(next_response_tx);
yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(yield_time_ms))));
}
SessionControlCommand::Terminate { response_tx: next_response_tx } => {
// A result that is already parked is returned as-is instead of `Terminated`.
if let Some(result) = pending_result.take() {
let _ = next_response_tx.send(RuntimeResponse::Result {
cell_id: cell_id.clone(),
content_items: result.content_items,
stored_values: result.stored_values,
error_text: result.error_text,
max_output_tokens_per_exec_call,
});
break;
}
response_tx = Some(next_response_tx);
termination_requested = true;
yield_timer = None;
// Ask the runtime to stop via its command channel and interrupt any running
// JavaScript through the isolate handle.
let _ = runtime_tx.send(RuntimeCommand::Terminate);
let _ = runtime_terminate_handle.terminate_execution();
if runtime_closed {
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Terminated {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
break;
} else {
// Respond only after the runtime reports a result or closes its channel.
continue;
}
}
}
}
// Branch 3: the yield timer. With no timer armed this branch never completes.
_ = async {
if let Some(yield_timer) = yield_timer.as_mut() {
yield_timer.await;
} else {
std::future::pending::<()>().await;
}
} => {
yield_timer = None;
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Yielded {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
}
}
}
// Cleanup: stop the runtime (best effort) and unregister the cell.
let _ = runtime_tx.send(RuntimeCommand::Terminate);
inner.sessions.lock().await.remove(&cell_id);
}
#[cfg(test)]
mod tests {
// Tests for `CodeModeService` and the `run_session_control` loop.
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use pretty_assertions::assert_eq;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use super::CodeModeService;
use super::Inner;
use super::RuntimeCommand;
use super::RuntimeResponse;
use super::SessionControlCommand;
use super::SessionControlContext;
use super::run_session_control;
use crate::FunctionCallOutputContentItem;
use crate::runtime::ExecuteRequest;
use crate::runtime::RuntimeEvent;
use crate::runtime::spawn_runtime;
// Builds an `ExecuteRequest` for `source` with no enabled tools, no stored values,
// a 1 ms yield time and no output token limit.
fn execute_request(source: &str) -> ExecuteRequest {
ExecuteRequest {
tool_call_id: "call_1".to_string(),
enabled_tools: Vec::new(),
source: source.to_string(),
stored_values: HashMap::new(),
yield_time_ms: Some(1),
max_output_tokens: None,
}
}
// Builds an empty `Inner`, mirroring what `CodeModeService::new` constructs.
fn test_inner() -> Arc<Inner> {
let (turn_message_tx, turn_message_rx) = mpsc::unbounded_channel();
Arc::new(Inner {
stored_values: Mutex::new(HashMap::new()),
sessions: Mutex::new(HashMap::new()),
turn_message_tx,
turn_message_rx: Arc::new(Mutex::new(turn_message_rx)),
next_cell_id: AtomicU64::new(1),
})
}
// A top-level `exit()` must end the script successfully: output emitted before the call
// is kept, output after it is never produced, and no error is reported.
#[tokio::test]
async fn synchronous_exit_returns_successfully() {
let service = CodeModeService::new();
let response = service
.execute(ExecuteRequest {
source: r#"text("before"); exit(); text("after");"#.to_string(),
yield_time_ms: None,
..execute_request("")
})
.await
.unwrap();
assert_eq!(
response,
RuntimeResponse::Result {
cell_id: "1".to_string(),
content_items: vec![FunctionCallOutputContentItem::InputText {
text: "before".to_string(),
}],
stored_values: HashMap::new(),
error_text: None,
max_output_tokens_per_exec_call: None,
}
);
}
// A terminate command must not be answered until the runtime's event channel has closed.
// The control loop is fed from a test-owned `event_tx`, so the test decides when that
// happens; the events of the real runtime go to a separate, unread channel.
#[tokio::test]
async fn terminate_waits_for_runtime_shutdown_before_responding() {
let inner = test_inner();
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (control_tx, control_rx) = mpsc::unbounded_channel();
let (initial_response_tx, initial_response_rx) = oneshot::channel();
let (runtime_event_tx, _runtime_event_rx) = mpsc::unbounded_channel();
// A real runtime is spawned only to obtain a command sender and an isolate handle.
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(
ExecuteRequest {
source: "await new Promise(() => {})".to_string(),
yield_time_ms: None,
..execute_request("")
},
runtime_event_tx,
)
.unwrap();
tokio::spawn(run_session_control(
inner,
SessionControlContext {
cell_id: "cell-1".to_string(),
max_output_tokens_per_exec_call: None,
runtime_tx: runtime_tx.clone(),
runtime_terminate_handle,
},
event_rx,
control_rx,
initial_response_tx,
60_000,
));
// Simulate a started script that yields right away, consuming the initial waiter.
event_tx.send(RuntimeEvent::Started).unwrap();
event_tx.send(RuntimeEvent::YieldRequested).unwrap();
assert_eq!(
initial_response_rx.await.unwrap(),
RuntimeResponse::Yielded {
cell_id: "cell-1".to_string(),
content_items: Vec::new(),
}
);
let (terminate_response_tx, terminate_response_rx) = oneshot::channel();
control_tx
.send(SessionControlCommand::Terminate {
response_tx: terminate_response_tx,
})
.unwrap();
let terminate_response = async { terminate_response_rx.await.unwrap() };
tokio::pin!(terminate_response);
// While the event channel stays open, no terminate response may arrive.
assert!(
tokio::time::timeout(Duration::from_millis(100), terminate_response.as_mut())
.await
.is_err()
);
// Closing the event channel stands in for the runtime shutting down.
drop(event_tx);
assert_eq!(
terminate_response.await,
RuntimeResponse::Terminated {
cell_id: "cell-1".to_string(),
content_items: Vec::new(),
}
);
let _ = runtime_tx.send(RuntimeCommand::Terminate);
}
}

View File

@@ -31,6 +31,8 @@ codex-api = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-apply-patch = { workspace = true }
codex-async-utils = { workspace = true }
codex-code-mode = { workspace = true }
codex-client = { workspace = true }
codex-connectors = { workspace = true }
codex-config = { workspace = true }
codex-exec-server = { workspace = true }

View File

@@ -1,51 +0,0 @@
// Bootstrap prelude placed in front of the user's script.
// Reuse an existing content-items array when one is present, otherwise start empty.
const __codexContentItems = Array.isArray(globalThis.__codexContentItems)
? globalThis.__codexContentItems
: [];
// Capture the runtime object, then remove it from the global object so the code below
// cannot reach it through `globalThis`.
const __codexRuntime = globalThis.__codexRuntime;
delete globalThis.__codexRuntime;
// Re-expose the content items as a non-enumerable, non-writable global.
Object.defineProperty(globalThis, '__codexContentItems', {
value: __codexContentItems,
configurable: true,
enumerable: false,
writable: false,
});
(() => {
if (!__codexRuntime || typeof __codexRuntime !== 'object') {
throw new Error('code mode runtime is unavailable');
}
// Defines an enumerable, non-writable (but configurable) global named `name`.
function defineGlobal(name, value) {
Object.defineProperty(globalThis, name, {
value,
configurable: true,
enumerable: true,
writable: false,
});
}
defineGlobal('ALL_TOOLS', __codexRuntime.ALL_TOOLS);
defineGlobal('exit', __codexRuntime.exit);
defineGlobal('image', __codexRuntime.image);
defineGlobal('load', __codexRuntime.load);
defineGlobal('notify', __codexRuntime.notify);
defineGlobal('store', __codexRuntime.store);
defineGlobal('text', __codexRuntime.text);
defineGlobal('tools', __codexRuntime.tools);
defineGlobal('yield_control', __codexRuntime.yield_control);
// `console` is provided as a frozen object whose methods do nothing.
defineGlobal(
'console',
Object.freeze({
log() {},
info() {},
warn() {},
error() {},
debug() {},
})
);
})();
// Placeholder token; presumably substituted with the user's source before execution.
__CODE_MODE_USER_CODE_PLACEHOLDER__

View File

@@ -1,19 +0,0 @@
## exec
- Runs raw JavaScript in an isolated context (no Node, no file system or network access, and no console).
- Send raw JavaScript source text, not JSON, quoted strings, or markdown code fences.
- You may optionally start the tool input with a first-line pragma like `// @exec: {"yield_time_ms": 10000, "max_output_tokens": 1000}`.
- `yield_time_ms` asks `exec` to yield early after that many milliseconds if the script is still running.
- `max_output_tokens` sets the token budget for direct `exec` results. By default the result is truncated to 10000 tokens.
- All nested tools are available on the global `tools` object, for example `await tools.exec_command(...)`. Tool names are exposed as normalized JavaScript identifiers, for example `await tools.mcp__ologs__get_profile(...)`.
- Tool methods take either a string or an object as their parameter.
- They return either a structured value or a string based on the description above.
- Global helpers:
- `exit()`: Immediately ends the current script successfully (like an early return from the top level).
- `text(value: string | number | boolean | undefined | null)`: Appends a text item and returns it. Non-string values are stringified with `JSON.stringify(...)` when possible.
- `image(imageUrlOrItem: string | { image_url: string; detail?: "auto" | "low" | "high" | "original" | null })`: Appends an image item and returns it. `image_url` can be an HTTPS URL or a base64-encoded `data:` URL.
- `store(key: string, value: any)`: stores a serializable value under a string key for later `exec` calls in the same session.
- `load(key: string)`: returns the stored value for a string key, or `undefined` if it is missing.
- `notify(value: string | number | boolean | undefined | null)`: immediately injects an extra `custom_tool_call_output` for the current `exec` call. Values are stringified like `text(...)`.
- `ALL_TOOLS`: metadata for the enabled nested tools as `{ name, description }` entries.
- `yield_control()`: yields the accumulated output to the model immediately while the script keeps running.

View File

@@ -1,8 +1,5 @@
use async_trait::async_trait;
use serde::Deserialize;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::ToolInvocation;
@@ -10,178 +7,52 @@ use crate::tools::context::ToolPayload;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use super::CODE_MODE_PRAGMA_PREFIX;
use super::CodeModeSessionProgress;
use super::ExecContext;
use super::PUBLIC_TOOL_NAME;
use super::build_enabled_tools;
use super::handle_node_message;
use super::protocol::HostToNodeMessage;
use super::protocol::build_source;
use super::handle_runtime_response;
pub struct CodeModeExecuteHandler;
const MAX_JS_SAFE_INTEGER: u64 = (1_u64 << 53) - 1;
/// Options deserialized from the JSON object of a first-line `// @exec:` pragma.
///
/// `deny_unknown_fields` makes deserialization fail on any key other than the two
/// listed here; both keys are optional.
#[derive(Debug, Default, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
struct CodeModeExecPragma {
/// Milliseconds after which `exec` is asked to yield early if the script is still running.
#[serde(default)]
yield_time_ms: Option<u64>,
/// Token budget for the direct `exec` result.
#[serde(default)]
max_output_tokens: Option<usize>,
}
/// Result of parsing the freeform `exec` input: the JavaScript to run plus any
/// options taken from the optional first-line pragma.
#[derive(Debug, PartialEq, Eq)]
struct CodeModeExecArgs {
/// JavaScript source text; when a pragma is present this is the input after the first line.
code: String,
/// Optional early-yield timeout in milliseconds taken from the pragma.
yield_time_ms: Option<u64>,
/// Optional output token budget taken from the pragma.
max_output_tokens: Option<usize>,
}
impl CodeModeExecuteHandler {
async fn execute(
&self,
session: std::sync::Arc<Session>,
turn: std::sync::Arc<TurnContext>,
session: std::sync::Arc<crate::codex::Session>,
turn: std::sync::Arc<crate::codex::TurnContext>,
call_id: String,
code: String,
) -> Result<FunctionToolOutput, FunctionCallError> {
let args = parse_freeform_args(&code)?;
let args =
codex_code_mode::parse_exec_source(&code).map_err(FunctionCallError::RespondToModel)?;
let exec = ExecContext { session, turn };
let enabled_tools = build_enabled_tools(&exec).await;
let service = &exec.session.services.code_mode_service;
let stored_values = service.stored_values().await;
let source =
build_source(&args.code, &enabled_tools).map_err(FunctionCallError::RespondToModel)?;
let cell_id = service.allocate_cell_id().await;
let request_id = service.allocate_request_id().await;
let process_slot = service
.ensure_started()
.await
.map_err(|err| FunctionCallError::RespondToModel(err.to_string()))?;
let stored_values = exec
.session
.services
.code_mode_service
.stored_values()
.await;
let started_at = std::time::Instant::now();
let message = HostToNodeMessage::Start {
request_id: request_id.clone(),
cell_id: cell_id.clone(),
tool_call_id: call_id,
default_yield_time_ms: super::DEFAULT_EXEC_YIELD_TIME_MS,
enabled_tools,
stored_values,
source,
yield_time_ms: args.yield_time_ms,
max_output_tokens: args.max_output_tokens,
};
let result = {
let mut process_slot = process_slot;
let Some(process) = process_slot.as_mut() else {
return Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} runner failed to start"
)));
};
let message = process
.send(&request_id, &message)
.await
.map_err(|err| err.to_string());
let message = match message {
Ok(message) => message,
Err(error) => return Err(FunctionCallError::RespondToModel(error)),
};
handle_node_message(
&exec, cell_id, message, /*poll_max_output_tokens*/ None, started_at,
)
let response = exec
.session
.services
.code_mode_service
.execute(codex_code_mode::ExecuteRequest {
tool_call_id: call_id,
enabled_tools,
source: args.code,
stored_values,
yield_time_ms: args.yield_time_ms,
max_output_tokens: args.max_output_tokens,
})
.await
};
match result {
Ok(CodeModeSessionProgress::Finished(output))
| Ok(CodeModeSessionProgress::Yielded { output }) => Ok(output),
Err(error) => Err(FunctionCallError::RespondToModel(error)),
}
}
}
fn parse_freeform_args(input: &str) -> Result<CodeModeExecArgs, FunctionCallError> {
if input.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"exec expects raw JavaScript source text (non-empty). Provide JS only, optionally with first-line `// @exec: {\"yield_time_ms\": 10000, \"max_output_tokens\": 1000}`.".to_string(),
));
}
let mut args = CodeModeExecArgs {
code: input.to_string(),
yield_time_ms: None,
max_output_tokens: None,
};
let mut lines = input.splitn(2, '\n');
let first_line = lines.next().unwrap_or_default();
let rest = lines.next().unwrap_or_default();
let trimmed = first_line.trim_start();
let Some(pragma) = trimmed.strip_prefix(CODE_MODE_PRAGMA_PREFIX) else {
return Ok(args);
};
if rest.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"exec pragma must be followed by JavaScript source on subsequent lines".to_string(),
));
}
let directive = pragma.trim();
if directive.is_empty() {
return Err(FunctionCallError::RespondToModel(
"exec pragma must be a JSON object with supported fields `yield_time_ms` and `max_output_tokens`"
.to_string(),
));
}
let value: serde_json::Value = serde_json::from_str(directive).map_err(|err| {
FunctionCallError::RespondToModel(format!(
"exec pragma must be valid JSON with supported fields `yield_time_ms` and `max_output_tokens`: {err}"
))
})?;
let object = value.as_object().ok_or_else(|| {
FunctionCallError::RespondToModel(
"exec pragma must be a JSON object with supported fields `yield_time_ms` and `max_output_tokens`"
.to_string(),
.map_err(FunctionCallError::RespondToModel)?;
handle_runtime_response(
&exec, response, /*poll_max_output_tokens*/ None, started_at,
)
})?;
for key in object.keys() {
match key.as_str() {
"yield_time_ms" | "max_output_tokens" => {}
_ => {
return Err(FunctionCallError::RespondToModel(format!(
"exec pragma only supports `yield_time_ms` and `max_output_tokens`; got `{key}`"
)));
}
}
.await
.map_err(FunctionCallError::RespondToModel)
}
let pragma: CodeModeExecPragma = serde_json::from_value(value).map_err(|err| {
FunctionCallError::RespondToModel(format!(
"exec pragma fields `yield_time_ms` and `max_output_tokens` must be non-negative safe integers: {err}"
))
})?;
if pragma
.yield_time_ms
.is_some_and(|yield_time_ms| yield_time_ms > MAX_JS_SAFE_INTEGER)
{
return Err(FunctionCallError::RespondToModel(
"exec pragma field `yield_time_ms` must be a non-negative safe integer".to_string(),
));
}
if pragma.max_output_tokens.is_some_and(|max_output_tokens| {
u64::try_from(max_output_tokens)
.map(|max_output_tokens| max_output_tokens > MAX_JS_SAFE_INTEGER)
.unwrap_or(true)
}) {
return Err(FunctionCallError::RespondToModel(
"exec pragma field `max_output_tokens` must be a non-negative safe integer".to_string(),
));
}
args.code = rest.to_string();
args.yield_time_ms = pragma.yield_time_ms;
args.max_output_tokens = pragma.max_output_tokens;
Ok(args)
}
#[async_trait]
@@ -216,7 +87,3 @@ impl ToolHandler for CodeModeExecuteHandler {
}
}
}
#[cfg(test)]
#[path = "execute_handler_tests.rs"]
mod execute_handler_tests;

View File

@@ -1,15 +1,19 @@
mod execute_handler;
mod process;
mod protocol;
mod service;
mod response_adapter;
mod wait_handler;
mod worker;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use codex_code_mode::CodeModeTurnHost;
use codex_code_mode::RuntimeResponse;
use codex_features::Feature;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use serde_json::Value as JsonValue;
use tokio_util::sync::CancellationToken;
use crate::client_common::tools::ToolSpec;
use crate::codex::Session;
@@ -17,9 +21,8 @@ use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;
use crate::tools::ToolRouter;
use crate::tools::code_mode_description::augment_tool_spec_for_code_mode;
use crate::tools::code_mode_description::code_mode_tool_reference;
use crate::tools::code_mode_description::normalize_code_mode_identifier;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::context::ToolPayload;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::router::ToolCall;
@@ -30,140 +33,189 @@ use crate::truncate::formatted_truncate_text_content_items_with_policy;
use crate::truncate::truncate_function_output_items_with_policy;
use crate::unified_exec::resolve_max_tokens;
const CODE_MODE_RUNNER_SOURCE: &str = include_str!("runner.cjs");
const CODE_MODE_BRIDGE_SOURCE: &str = include_str!("bridge.js");
const CODE_MODE_DESCRIPTION_TEMPLATE: &str = include_str!("description.md");
const CODE_MODE_WAIT_DESCRIPTION_TEMPLATE: &str = include_str!("wait_description.md");
const CODE_MODE_PRAGMA_PREFIX: &str = "// @exec:";
const CODE_MODE_ONLY_PREFACE: &str =
"Use `exec/wait` tool to run all other tools, do not attempt to use any other tools directly";
pub(crate) use execute_handler::CodeModeExecuteHandler;
use response_adapter::into_function_call_output_content_items;
pub(crate) use wait_handler::CodeModeWaitHandler;
pub(crate) const PUBLIC_TOOL_NAME: &str = "exec";
pub(crate) const WAIT_TOOL_NAME: &str = "wait";
/// Reports whether `tool_name` may be called from inside a script, i.e. whether it
/// is any tool other than the public `exec` tool and the `wait` tool.
pub(crate) fn is_code_mode_nested_tool(tool_name: &str) -> bool {
    let reserved_names = [PUBLIC_TOOL_NAME, WAIT_TOOL_NAME];
    !reserved_names.contains(&tool_name)
}
pub(crate) const DEFAULT_EXEC_YIELD_TIME_MS: u64 = 10_000;
pub(crate) const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000;
pub(crate) const PUBLIC_TOOL_NAME: &str = codex_code_mode::PUBLIC_TOOL_NAME;
pub(crate) const WAIT_TOOL_NAME: &str = codex_code_mode::WAIT_TOOL_NAME;
pub(crate) const DEFAULT_WAIT_YIELD_TIME_MS: u64 = codex_code_mode::DEFAULT_WAIT_YIELD_TIME_MS;
#[derive(Clone)]
pub(super) struct ExecContext {
pub(crate) struct ExecContext {
pub(super) session: Arc<Session>,
pub(super) turn: Arc<TurnContext>,
}
pub(crate) use execute_handler::CodeModeExecuteHandler;
pub(crate) use service::CodeModeService;
pub(crate) use wait_handler::CodeModeWaitHandler;
enum CodeModeSessionProgress {
Finished(FunctionToolOutput),
Yielded { output: FunctionToolOutput },
pub(crate) struct CodeModeService {
inner: codex_code_mode::CodeModeService,
}
enum CodeModeExecutionStatus {
Completed,
Failed,
Running(String),
Terminated,
/// Core-side facade over `codex_code_mode::CodeModeService`; every method forwards to `inner`.
impl CodeModeService {
/// Builds the wrapper around a fresh inner service.
///
/// `_js_repl_node_path` is accepted but not used by this constructor.
pub(crate) fn new(_js_repl_node_path: Option<PathBuf>) -> Self {
Self {
inner: codex_code_mode::CodeModeService::new(),
}
}
/// Returns the values currently held by the inner service.
pub(crate) async fn stored_values(&self) -> std::collections::HashMap<String, JsonValue> {
self.inner.stored_values().await
}
/// Hands `values` to the inner service's `replace_stored_values`.
pub(crate) async fn replace_stored_values(
&self,
values: std::collections::HashMap<String, JsonValue>,
) {
self.inner.replace_stored_values(values).await;
}
/// Forwards an execute request to the inner service and returns its response or error text.
pub(crate) async fn execute(
&self,
request: codex_code_mode::ExecuteRequest,
) -> Result<RuntimeResponse, String> {
self.inner.execute(request).await
}
/// Forwards a wait request to the inner service and returns its response or error text.
pub(crate) async fn wait(
&self,
request: codex_code_mode::WaitRequest,
) -> Result<RuntimeResponse, String> {
self.inner.wait(request).await
}
/// Starts a turn worker backed by a `CoreTurnHost` for this session and turn.
///
/// Returns `None` without starting anything when `Feature::CodeMode` is not
/// enabled on the turn.
pub(crate) async fn start_turn_worker(
&self,
session: &Arc<Session>,
turn: &Arc<TurnContext>,
router: Arc<ToolRouter>,
tracker: SharedTurnDiffTracker,
) -> Option<codex_code_mode::CodeModeTurnWorker> {
if !turn.features.enabled(Feature::CodeMode) {
return None;
}
let exec = ExecContext {
session: Arc::clone(session),
turn: Arc::clone(turn),
};
// The runtime executes nested tool calls issued by the script through `router`.
let tool_runtime =
ToolCallRuntime::new(router, Arc::clone(session), Arc::clone(turn), tracker);
let host = Arc::new(CoreTurnHost { exec, tool_runtime });
Some(self.inner.start_turn_worker(host))
}
}
/// `CodeModeTurnHost` implementation that serves one turn from the core crate.
struct CoreTurnHost {
/// Session and turn the host acts on behalf of.
exec: ExecContext,
/// Runtime used to execute nested tool calls.
tool_runtime: ToolCallRuntime,
}
#[async_trait::async_trait]
impl CodeModeTurnHost for CoreTurnHost {
    /// Executes one nested tool call through `call_nested_tool`.
    ///
    /// The host's context and runtime are cloned for the call. A failure is
    /// reported as the error's `to_string()` text.
    async fn invoke_tool(
        &self,
        tool_name: String,
        input: Option<JsonValue>,
        cancellation_token: CancellationToken,
    ) -> Result<JsonValue, String> {
        let exec = self.exec.clone();
        let tool_runtime = self.tool_runtime.clone();
        match call_nested_tool(exec, tool_runtime, tool_name, input, cancellation_token).await {
            Ok(value) => Ok(value),
            Err(error) => Err(error.to_string()),
        }
    }

    /// Injects `text` into the session as an extra custom tool call output for `call_id`.
    ///
    /// Text that is empty after trimming is ignored and reported as success.
    /// `cell_id` is only used in the error message when injection fails.
    async fn notify(&self, call_id: String, cell_id: String, text: String) -> Result<(), String> {
        let is_blank = text.trim().is_empty();
        if is_blank {
            return Ok(());
        }

        let notification = ResponseInputItem::CustomToolCallOutput {
            call_id,
            name: Some(PUBLIC_TOOL_NAME.to_string()),
            output: FunctionCallOutputPayload::from_text(text),
        };
        let injected = self
            .exec
            .session
            .inject_response_items(vec![notification])
            .await;
        injected.map_err(|_| {
            format!("failed to inject exec notify message for cell {cell_id}: no active turn")
        })
    }
}
/// Crate-local entry point that forwards to `codex_code_mode::is_code_mode_nested_tool`
/// and returns its answer for `tool_name` unchanged.
pub(crate) fn is_code_mode_nested_tool(tool_name: &str) -> bool {
codex_code_mode::is_code_mode_nested_tool(tool_name)
}
pub(crate) fn tool_description(enabled_tools: &[(String, String)], code_mode_only: bool) -> String {
let description_template = CODE_MODE_DESCRIPTION_TEMPLATE.trim_end();
if !code_mode_only {
return description_template.to_string();
}
let mut sections = vec![
CODE_MODE_ONLY_PREFACE.to_string(),
description_template.to_string(),
];
if !enabled_tools.is_empty() {
let nested_tool_reference = enabled_tools
.iter()
.map(|(name, nested_description)| {
let global_name = normalize_code_mode_identifier(name);
format!(
"### `{global_name}` (`{name}`)\n{}",
nested_description.trim()
)
})
.collect::<Vec<_>>()
.join("\n\n");
sections.push(nested_tool_reference);
}
sections.join("\n\n")
codex_code_mode::build_exec_tool_description(enabled_tools, code_mode_only)
}
pub(crate) fn wait_tool_description() -> &'static str {
CODE_MODE_WAIT_DESCRIPTION_TEMPLATE
codex_code_mode::build_wait_tool_description()
}
async fn handle_node_message(
pub(super) async fn handle_runtime_response(
exec: &ExecContext,
cell_id: String,
message: protocol::NodeToHostMessage,
response: RuntimeResponse,
poll_max_output_tokens: Option<Option<usize>>,
started_at: std::time::Instant,
) -> Result<CodeModeSessionProgress, String> {
match message {
protocol::NodeToHostMessage::ToolCall { .. } => Err(protocol::unexpected_tool_call_error()),
protocol::NodeToHostMessage::Notify { .. } => Err(format!(
"unexpected {PUBLIC_TOOL_NAME} notify message in response path"
)),
protocol::NodeToHostMessage::Yielded { content_items, .. } => {
let mut delta_items = output_content_items_from_json_values(content_items)?;
delta_items = truncate_code_mode_result(delta_items, poll_max_output_tokens.flatten());
) -> Result<FunctionToolOutput, String> {
match response {
RuntimeResponse::Yielded {
cell_id,
content_items,
} => {
let mut content_items = into_function_call_output_content_items(content_items);
content_items =
truncate_code_mode_result(content_items, poll_max_output_tokens.flatten());
prepend_script_status(
&mut delta_items,
&mut content_items,
CodeModeExecutionStatus::Running(cell_id),
started_at.elapsed(),
);
Ok(CodeModeSessionProgress::Yielded {
output: FunctionToolOutput::from_content(delta_items, Some(true)),
})
Ok(FunctionToolOutput::from_content(content_items, Some(true)))
}
protocol::NodeToHostMessage::Terminated { content_items, .. } => {
let mut delta_items = output_content_items_from_json_values(content_items)?;
delta_items = truncate_code_mode_result(delta_items, poll_max_output_tokens.flatten());
RuntimeResponse::Terminated { content_items, .. } => {
let mut content_items = into_function_call_output_content_items(content_items);
content_items =
truncate_code_mode_result(content_items, poll_max_output_tokens.flatten());
prepend_script_status(
&mut delta_items,
&mut content_items,
CodeModeExecutionStatus::Terminated,
started_at.elapsed(),
);
Ok(CodeModeSessionProgress::Finished(
FunctionToolOutput::from_content(delta_items, Some(true)),
))
Ok(FunctionToolOutput::from_content(content_items, Some(true)))
}
protocol::NodeToHostMessage::Result {
RuntimeResponse::Result {
content_items,
stored_values,
error_text,
max_output_tokens_per_exec_call,
..
} => {
let mut content_items = into_function_call_output_content_items(content_items);
exec.session
.services
.code_mode_service
.replace_stored_values(stored_values)
.await;
let mut delta_items = output_content_items_from_json_values(content_items)?;
let success = error_text.is_none();
if let Some(error_text) = error_text {
delta_items.push(FunctionCallOutputContentItem::InputText {
content_items.push(FunctionCallOutputContentItem::InputText {
text: format!("Script error:\n{error_text}"),
});
}
let mut delta_items = truncate_code_mode_result(
delta_items,
content_items = truncate_code_mode_result(
content_items,
poll_max_output_tokens.unwrap_or(max_output_tokens_per_exec_call),
);
prepend_script_status(
&mut delta_items,
&mut content_items,
if success {
CodeModeExecutionStatus::Completed
} else {
@@ -171,13 +223,21 @@ async fn handle_node_message(
},
started_at.elapsed(),
);
Ok(CodeModeSessionProgress::Finished(
FunctionToolOutput::from_content(delta_items, Some(success)),
Ok(FunctionToolOutput::from_content(
content_items,
Some(success),
))
}
}
}
/// Script state passed to `prepend_script_status` when building a tool output.
enum CodeModeExecutionStatus {
/// Used for a final result that carries no error text.
Completed,
/// NOTE(review): presumably used for a final result with error text; the selecting
/// branch is outside the visible hunk, so confirm against the full file.
Failed,
/// The script yielded and is still running; the payload is its cell id.
Running(String),
/// Used when the runtime reports the script as terminated.
Terminated,
}
fn prepend_script_status(
content_items: &mut Vec<FunctionCallOutputContentItem>,
status: CodeModeExecutionStatus,
@@ -216,21 +276,9 @@ fn truncate_code_mode_result(
truncate_function_output_items_with_policy(&items, policy)
}
/// Deserializes each JSON value into a `FunctionCallOutputContentItem`, in order.
///
/// Conversion stops at the first value that fails to deserialize; the returned
/// error message names that value's index and includes the serde error.
fn output_content_items_from_json_values(
    content_items: Vec<JsonValue>,
) -> Result<Vec<FunctionCallOutputContentItem>, String> {
    let mut parsed = Vec::with_capacity(content_items.len());
    for (index, item) in content_items.into_iter().enumerate() {
        match serde_json::from_value(item) {
            Ok(content_item) => parsed.push(content_item),
            Err(err) => {
                return Err(format!(
                    "invalid {PUBLIC_TOOL_NAME} content item at index {index}: {err}"
                ));
            }
        }
    }
    Ok(parsed)
}
async fn build_enabled_tools(exec: &ExecContext) -> Vec<protocol::EnabledTool> {
pub(super) async fn build_enabled_tools(
exec: &ExecContext,
) -> Vec<codex_code_mode::ToolDefinition> {
let router = build_nested_router(exec).await;
let mut out = router
.specs()
@@ -238,39 +286,37 @@ async fn build_enabled_tools(exec: &ExecContext) -> Vec<protocol::EnabledTool> {
.map(|spec| augment_tool_spec_for_code_mode(spec, /*code_mode_enabled*/ true))
.filter_map(enabled_tool_from_spec)
.collect::<Vec<_>>();
out.sort_by(|left, right| left.tool_name.cmp(&right.tool_name));
out.dedup_by(|left, right| left.tool_name == right.tool_name);
out.sort_by(|left, right| left.name.cmp(&right.name));
out.dedup_by(|left, right| left.name == right.name);
out
}
fn enabled_tool_from_spec(spec: ToolSpec) -> Option<protocol::EnabledTool> {
fn enabled_tool_from_spec(spec: ToolSpec) -> Option<codex_code_mode::ToolDefinition> {
let tool_name = spec.name().to_string();
if !is_code_mode_nested_tool(&tool_name) {
return None;
}
let reference = code_mode_tool_reference(&tool_name);
let global_name = normalize_code_mode_identifier(&tool_name);
let (description, kind) = match spec {
ToolSpec::Function(tool) => (tool.description, protocol::CodeModeToolKind::Function),
ToolSpec::Freeform(tool) => (tool.description, protocol::CodeModeToolKind::Freeform),
match spec {
ToolSpec::Function(tool) => Some(codex_code_mode::ToolDefinition {
name: tool_name,
description: tool.description,
kind: codex_code_mode::CodeModeToolKind::Function,
input_schema: serde_json::to_value(&tool.parameters).ok(),
output_schema: tool.output_schema,
}),
ToolSpec::Freeform(tool) => Some(codex_code_mode::ToolDefinition {
name: tool_name,
description: tool.description,
kind: codex_code_mode::CodeModeToolKind::Freeform,
input_schema: None,
output_schema: None,
}),
ToolSpec::LocalShell {}
| ToolSpec::ImageGeneration { .. }
| ToolSpec::ToolSearch { .. }
| ToolSpec::WebSearch { .. } => {
return None;
}
};
Some(protocol::EnabledTool {
tool_name,
global_name,
module_path: reference.module_path,
namespace: reference.namespace,
name: normalize_code_mode_identifier(&reference.tool_key),
description,
kind,
})
| ToolSpec::WebSearch { .. } => None,
}
}
async fn build_nested_router(exec: &ExecContext) -> ToolRouter {
@@ -303,7 +349,7 @@ async fn call_nested_tool(
tool_runtime: ToolCallRuntime,
tool_name: String,
input: Option<JsonValue>,
cancellation_token: tokio_util::sync::CancellationToken,
cancellation_token: CancellationToken,
) -> Result<JsonValue, FunctionCallError> {
if tool_name == PUBLIC_TOOL_NAME {
return Err(FunctionCallError::RespondToModel(format!(
@@ -340,18 +386,18 @@ async fn call_nested_tool(
Ok(result.code_mode_result())
}
fn tool_kind_for_spec(spec: &ToolSpec) -> protocol::CodeModeToolKind {
fn tool_kind_for_spec(spec: &ToolSpec) -> codex_code_mode::CodeModeToolKind {
if matches!(spec, ToolSpec::Freeform(_)) {
protocol::CodeModeToolKind::Freeform
codex_code_mode::CodeModeToolKind::Freeform
} else {
protocol::CodeModeToolKind::Function
codex_code_mode::CodeModeToolKind::Function
}
}
fn tool_kind_for_name(
spec: Option<ToolSpec>,
tool_name: &str,
) -> Result<protocol::CodeModeToolKind, String> {
) -> Result<codex_code_mode::CodeModeToolKind, String> {
spec.as_ref()
.map(tool_kind_for_spec)
.ok_or_else(|| format!("tool `{tool_name}` is not enabled in {PUBLIC_TOOL_NAME}"))
@@ -364,8 +410,12 @@ fn build_nested_tool_payload(
) -> Result<ToolPayload, String> {
let actual_kind = tool_kind_for_name(spec, tool_name)?;
match actual_kind {
protocol::CodeModeToolKind::Function => build_function_tool_payload(tool_name, input),
protocol::CodeModeToolKind::Freeform => build_freeform_tool_payload(tool_name, input),
codex_code_mode::CodeModeToolKind::Function => {
build_function_tool_payload(tool_name, input)
}
codex_code_mode::CodeModeToolKind::Freeform => {
build_freeform_tool_payload(tool_name, input)
}
}
}

View File

@@ -1,173 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::warn;
use super::CODE_MODE_RUNNER_SOURCE;
use super::PUBLIC_TOOL_NAME;
use super::protocol::HostToNodeMessage;
use super::protocol::NodeToHostMessage;
use super::protocol::message_request_id;
/// Handle to a spawned runner process together with the plumbing used to talk to it.
pub(super) struct CodeModeProcess {
/// The spawned child process.
pub(super) child: tokio::process::Child,
/// The child's stdin, shared behind an async mutex so each message is written whole.
pub(super) stdin: Arc<Mutex<tokio::process::ChildStdin>>,
/// Task that reads and dispatches the child's stdout lines.
pub(super) stdout_task: JoinHandle<()>,
/// One-shot senders keyed by request id, resolved when the matching reply arrives.
pub(super) response_waiters: Arc<Mutex<HashMap<String, oneshot::Sender<NodeToHostMessage>>>>,
/// Receiver for `ToolCall` and `Notify` messages, which are not replies to a request.
pub(super) message_rx: Arc<Mutex<mpsc::UnboundedReceiver<NodeToHostMessage>>>,
}
impl CodeModeProcess {
pub(super) async fn send(
&mut self,
request_id: &str,
message: &HostToNodeMessage,
) -> Result<NodeToHostMessage, std::io::Error> {
if self.stdout_task.is_finished() {
return Err(std::io::Error::other(format!(
"{PUBLIC_TOOL_NAME} runner is not available"
)));
}
let (tx, rx) = oneshot::channel();
self.response_waiters
.lock()
.await
.insert(request_id.to_string(), tx);
if let Err(err) = write_message(&self.stdin, message).await {
self.response_waiters.lock().await.remove(request_id);
return Err(err);
}
match rx.await {
Ok(message) => Ok(message),
Err(_) => Err(std::io::Error::other(format!(
"{PUBLIC_TOOL_NAME} runner is not available"
))),
}
}
pub(super) fn has_exited(&mut self) -> Result<bool, std::io::Error> {
self.child
.try_wait()
.map(|status| status.is_some())
.map_err(std::io::Error::other)
}
}
pub(super) async fn spawn_code_mode_process(
node_path: &std::path::Path,
) -> Result<CodeModeProcess, std::io::Error> {
let mut cmd = tokio::process::Command::new(node_path);
cmd.arg("--experimental-vm-modules");
cmd.arg("--eval");
cmd.arg(CODE_MODE_RUNNER_SOURCE);
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true);
let mut child = cmd.spawn().map_err(std::io::Error::other)?;
let stdout = child.stdout.take().ok_or_else(|| {
std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stdout"))
})?;
let stderr = child.stderr.take().ok_or_else(|| {
std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stderr"))
})?;
let stdin = child
.stdin
.take()
.ok_or_else(|| std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stdin")))?;
let stdin = Arc::new(Mutex::new(stdin));
let response_waiters = Arc::new(Mutex::new(HashMap::<
String,
oneshot::Sender<NodeToHostMessage>,
>::new()));
let (message_tx, message_rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut buf = Vec::new();
match reader.read_to_end(&mut buf).await {
Ok(_) => {
let stderr = String::from_utf8_lossy(&buf).trim().to_string();
if !stderr.is_empty() {
warn!("{PUBLIC_TOOL_NAME} runner stderr: {stderr}");
}
}
Err(err) => {
warn!("failed to read {PUBLIC_TOOL_NAME} stderr: {err}");
}
}
});
let stdout_task = tokio::spawn({
let response_waiters = Arc::clone(&response_waiters);
async move {
let mut stdout_lines = BufReader::new(stdout).lines();
loop {
let line = match stdout_lines.next_line().await {
Ok(line) => line,
Err(err) => {
warn!("failed to read {PUBLIC_TOOL_NAME} stdout: {err}");
break;
}
};
let Some(line) = line else {
break;
};
if line.trim().is_empty() {
continue;
}
let message: NodeToHostMessage = match serde_json::from_str(&line) {
Ok(message) => message,
Err(err) => {
warn!("failed to parse {PUBLIC_TOOL_NAME} stdout message: {err}");
break;
}
};
match message {
message @ (NodeToHostMessage::ToolCall { .. }
| NodeToHostMessage::Notify { .. }) => {
let _ = message_tx.send(message);
}
message => {
if let Some(request_id) = message_request_id(&message)
&& let Some(waiter) = response_waiters.lock().await.remove(request_id)
{
let _ = waiter.send(message);
}
}
}
}
response_waiters.lock().await.clear();
}
});
Ok(CodeModeProcess {
child,
stdin,
stdout_task,
response_waiters,
message_rx: Arc::new(Mutex::new(message_rx)),
})
}
/// Writes `message` to the runner's stdin as a single JSON line and flushes it.
///
/// The stdin lock is held for the whole write, so the JSON text and its trailing
/// newline are not interleaved with another message.
///
/// # Errors
///
/// Returns a serialization failure wrapped as an `io::Error`, or any error from
/// writing or flushing.
pub(super) async fn write_message(
    stdin: &Arc<Mutex<tokio::process::ChildStdin>>,
    message: &HostToNodeMessage,
) -> Result<(), std::io::Error> {
    let encoded = match serde_json::to_string(message) {
        Ok(encoded) => encoded,
        Err(err) => return Err(std::io::Error::other(err)),
    };

    let mut writer = stdin.lock().await;
    writer.write_all(encoded.as_bytes()).await?;
    writer.write_all(b"\n").await?;
    writer.flush().await
}

View File

@@ -1,169 +0,0 @@
use std::collections::HashMap;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use super::CODE_MODE_BRIDGE_SOURCE;
use super::PUBLIC_TOOL_NAME;
/// Calling convention of a nested tool; serialized as `"function"` or `"freeform"`.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub(super) enum CodeModeToolKind {
/// Tool built from a function tool spec.
Function,
/// Tool built from a freeform tool spec.
Freeform,
}
/// Description of one nested tool, serialized into the bridge source for the runner.
#[derive(Clone, Debug, Serialize)]
pub(super) struct EnabledTool {
/// The tool's original name as reported by its spec.
pub(super) tool_name: String,
/// Name used on the global `tools` object (a normalized JavaScript identifier).
pub(super) global_name: String,
/// Module path of the tool reference; serialized under the key `module`.
#[serde(rename = "module")]
pub(super) module_path: String,
/// Namespace segments of the tool reference.
pub(super) namespace: Vec<String>,
/// Normalized identifier derived from the tool reference's key.
pub(super) name: String,
/// Tool description text.
pub(super) description: String,
/// Whether the tool is a function or a freeform tool.
pub(super) kind: CodeModeToolKind,
}
/// Payload of a `tool_call` message sent by the runner.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(super) struct CodeModeToolCall {
/// Request id carried by the message.
pub(super) request_id: String,
/// Identifier of this individual tool call.
pub(super) id: String,
/// Name of the tool the script wants to invoke.
pub(super) name: String,
/// Optional tool input; `None` when the field is absent from the message.
#[serde(default)]
pub(super) input: Option<JsonValue>,
}
/// Payload of a `notify` message sent by the runner.
#[derive(Clone, Debug, Deserialize)]
pub(super) struct CodeModeNotify {
/// Id of the cell that issued the notification.
pub(super) cell_id: String,
/// Tool call id the notification is attached to.
pub(super) call_id: String,
/// Notification text.
pub(super) text: String,
}
/// Messages written by the host to the runner.
///
/// Each message is serialized as a JSON object whose `type` field holds the
/// snake_case variant name (`start`, `poll`, `terminate`, `response`).
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub(super) enum HostToNodeMessage {
/// Starts running `source` in the cell `cell_id` with the given tools and stored values.
Start {
request_id: String,
cell_id: String,
tool_call_id: String,
default_yield_time_ms: u64,
enabled_tools: Vec<EnabledTool>,
stored_values: HashMap<String, JsonValue>,
source: String,
yield_time_ms: Option<u64>,
max_output_tokens: Option<usize>,
},
/// Polls the cell `cell_id`, carrying a yield time in milliseconds.
Poll {
request_id: String,
cell_id: String,
yield_time_ms: u64,
},
/// Requests termination of the cell `cell_id`.
Terminate {
request_id: String,
cell_id: String,
},
/// Carries a tool result back to the runner (presumably the reply to a `ToolCall`;
/// confirm against the runner).
Response {
request_id: String,
id: String,
code_mode_result: JsonValue,
// NOTE(review): `default` affects deserialization only, and this enum derives
// only `Serialize`, so the attribute appears to have no effect here.
#[serde(default)]
error_text: Option<String>,
},
}
/// Messages read from the runner's stdout.
///
/// Each message is a JSON object whose `type` field selects the variant by its
/// snake_case name.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub(super) enum NodeToHostMessage {
/// The script wants to invoke a nested tool; fields are flattened into the message.
ToolCall {
#[serde(flatten)]
tool_call: CodeModeToolCall,
},
/// Reply carrying the content items of a yield.
Yielded {
request_id: String,
content_items: Vec<JsonValue>,
},
/// Reply carrying the content items of a terminated script.
Terminated {
request_id: String,
content_items: Vec<JsonValue>,
},
/// Notification from the script; fields are flattened into the message.
Notify {
#[serde(flatten)]
notify: CodeModeNotify,
},
/// Final reply for a script: its output, the stored values, and an optional error.
Result {
request_id: String,
content_items: Vec<JsonValue>,
stored_values: HashMap<String, JsonValue>,
// `None` when the field is absent.
#[serde(default)]
error_text: Option<String>,
// `None` when the field is absent.
#[serde(default)]
max_output_tokens_per_exec_call: Option<usize>,
},
}
/// Produces the script handed to the runner by filling in the bridge template.
///
/// The enabled-tools placeholder is replaced first with the JSON encoding of
/// `enabled_tools`, then the user-code placeholder is replaced with `user_code`.
/// Because of that order, placeholder text inside `user_code` is never expanded.
/// NOTE(review): the second replacement also scans the inserted tools JSON.
///
/// # Errors
///
/// Returns a message when `enabled_tools` cannot be serialized to JSON.
pub(super) fn build_source(
    user_code: &str,
    enabled_tools: &[EnabledTool],
) -> Result<String, String> {
    let tools_json = match serde_json::to_string(enabled_tools) {
        Ok(json) => json,
        Err(err) => return Err(format!("failed to serialize enabled tools: {err}")),
    };

    let with_tools =
        CODE_MODE_BRIDGE_SOURCE.replace("__CODE_MODE_ENABLED_TOOLS_PLACEHOLDER__", &tools_json);
    let with_user_code = with_tools.replace("__CODE_MODE_USER_CODE_PLACEHOLDER__", user_code);
    Ok(with_user_code)
}
/// Returns the request id a runner message replies to, if it is a reply.
///
/// `Yielded`, `Terminated` and `Result` carry a request id; `ToolCall` and
/// `Notify` yield `None`.
pub(super) fn message_request_id(message: &NodeToHostMessage) -> Option<&str> {
    match message {
        NodeToHostMessage::Yielded { request_id, .. }
        | NodeToHostMessage::Terminated { request_id, .. }
        | NodeToHostMessage::Result { request_id, .. } => Some(request_id.as_str()),
        NodeToHostMessage::ToolCall { .. } | NodeToHostMessage::Notify { .. } => None,
    }
}
/// Builds the error text used when a tool call message shows up where a reply
/// to a request was expected.
pub(super) fn unexpected_tool_call_error() -> String {
format!("{PUBLIC_TOOL_NAME} received an unexpected tool call response")
}
// Unit tests for `message_request_id`.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::CodeModeNotify;
use super::NodeToHostMessage;
use super::message_request_id;
// A notification is not a reply, so it has no request id.
#[test]
fn message_request_id_absent_for_notify() {
let message = NodeToHostMessage::Notify {
notify: CodeModeNotify {
cell_id: "1".to_string(),
call_id: "call-1".to_string(),
text: "hello".to_string(),
},
};
assert_eq!(None, message_request_id(&message));
}
// A result message exposes the request id it carries.
#[test]
fn message_request_id_present_for_result() {
let message = NodeToHostMessage::Result {
request_id: "req-1".to_string(),
content_items: Vec::new(),
stored_values: HashMap::new(),
error_text: None,
max_output_tokens_per_exec_call: None,
};
assert_eq!(Some("req-1"), message_request_id(&message));
}
}

View File

@@ -0,0 +1,44 @@
use codex_code_mode::ImageDetail as CodeModeImageDetail;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::ImageDetail;
/// Module-private conversion from a `codex_code_mode` type into its
/// `codex_protocol` counterpart `T`.
trait IntoProtocol<T> {
/// Consumes `self` and returns the equivalent `T`.
fn into_protocol(self) -> T;
}
/// Converts runtime content items into protocol content items, preserving order.
pub(super) fn into_function_call_output_content_items(
    items: Vec<codex_code_mode::FunctionCallOutputContentItem>,
) -> Vec<FunctionCallOutputContentItem> {
    let mut converted = Vec::with_capacity(items.len());
    for item in items {
        converted.push(item.into_protocol());
    }
    converted
}
/// Maps each runtime image detail level onto the protocol variant of the same name.
impl IntoProtocol<ImageDetail> for CodeModeImageDetail {
    fn into_protocol(self) -> ImageDetail {
        match self {
            Self::Auto => ImageDetail::Auto,
            Self::Low => ImageDetail::Low,
            Self::High => ImageDetail::High,
            Self::Original => ImageDetail::Original,
        }
    }
}
/// Converts a runtime content item into the protocol content item of the same shape.
impl IntoProtocol<FunctionCallOutputContentItem>
    for codex_code_mode::FunctionCallOutputContentItem
{
    fn into_protocol(self) -> FunctionCallOutputContentItem {
        match self {
            Self::InputText { text } => FunctionCallOutputContentItem::InputText { text },
            Self::InputImage { image_url, detail } => {
                // The optional detail level is converted only when present.
                let detail = detail.map(|level| level.into_protocol());
                FunctionCallOutputContentItem::InputImage { image_url, detail }
            }
        }
    }
}

View File

@@ -1,938 +0,0 @@
'use strict';
const readline = require('node:readline');
const { Worker } = require('node:worker_threads');
const DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL = 10000;
/**
 * Validates a per-exec-call output token budget.
 *
 * @param {*} value Candidate budget.
 * @returns {number} `value` unchanged when it is a non-negative safe integer.
 * @throws {TypeError} When `value` is not a non-negative safe integer.
 */
function normalizeMaxOutputTokensPerExecCall(value) {
  const isValid = Number.isSafeInteger(value) && value >= 0;
  if (isValid) {
    return value;
  }
  throw new TypeError('max_output_tokens_per_exec_call must be a non-negative safe integer');
}
/**
 * Validates a yield time.
 *
 * @param {*} value Candidate yield time.
 * @returns {number} `value` unchanged when it is a non-negative safe integer.
 * @throws {TypeError} When `value` is not a non-negative safe integer.
 */
function normalizeYieldTime(value) {
  const isValid = Number.isSafeInteger(value) && value >= 0;
  if (isValid) {
    return value;
  }
  throw new TypeError('yield_time must be a non-negative safe integer');
}
/**
 * Renders an error as text, preferring its stack trace when it has a truthy one.
 *
 * @param {*} error Thrown value of any type.
 * @returns {string} The stack as a string, otherwise `String(error)`.
 */
function formatErrorText(error) {
  if (error && error.stack) {
    return String(error.stack);
  }
  return String(error);
}
/**
 * Deep-copies a value by round-tripping it through JSON text.
 *
 * @param {*} value JSON-serializable value.
 * @returns {*} A structurally equal copy that shares no references with `value`.
 */
function cloneJsonValue(value) {
  const serialized = JSON.stringify(value);
  return JSON.parse(serialized);
}
/**
 * Cancels a pending timeout, if there is one.
 *
 * @param {*} timer Timeout handle, or `null` when nothing is scheduled.
 * @returns {null} Always `null`, so the caller can assign the result back to its handle.
 */
function clearTimer(timer) {
  if (timer === null) {
    return null;
  }
  clearTimeout(timer);
  return null;
}
/**
 * Removes and returns the content items accumulated on a session.
 *
 * A JSON copy of `session.content_items` is taken first, then the array is
 * emptied in place.
 *
 * @param {{content_items: Array}} session Session whose items are drained.
 * @returns {Array} The copied items, or an empty array if the copy is not an array.
 */
function takeContentItems(session) {
  const snapshot = cloneJsonValue(session.content_items);
  session.content_items.splice(0, session.content_items.length);
  if (!Array.isArray(snapshot)) {
    return [];
  }
  return snapshot;
}
function codeModeWorkerMain() {
'use strict';
const { parentPort, workerData } = require('node:worker_threads');
const vm = require('node:vm');
const { SourceTextModule, SyntheticModule } = vm;
function formatErrorText(error) {
return String(error && error.stack ? error.stack : error);
}
function cloneJsonValue(value) {
return JSON.parse(JSON.stringify(value));
}
/**
 * Sentinel error thrown by the `exit()` helper to unwind the running script.
 * The worker's main routine recognises it and reports a normal result.
 */
class CodeModeExitSignal extends Error {
  constructor() {
    const message = 'code mode exit';
    super(message);
    Object.assign(this, { name: 'CodeModeExitSignal' });
  }
}

/** Tells whether a caught value is the `exit()` sentinel. */
function isCodeModeExitSignal(error) {
  const matches = error instanceof CodeModeExitSignal;
  return matches;
}
/**
 * Builds the function scripts use to call host tools.
 *
 * Each call posts a `tool_call` message to the parent with a fresh id and
 * returns a promise that settles when the parent answers with a matching
 * `tool_response` (resolve) or `tool_response_error` (reject). Responses for
 * unknown ids and all other message types are ignored here.
 */
function createToolCaller() {
  const pending = new Map();
  let nextId = 0;

  parentPort.on('message', (message) => {
    const succeeded = message.type === 'tool_response';
    const failed = message.type === 'tool_response_error';
    if (!succeeded && !failed) {
      return;
    }
    const entry = pending.get(message.id);
    if (!entry) {
      return;
    }
    pending.delete(message.id);
    if (succeeded) {
      entry.resolve(message.result ?? '');
    } else {
      entry.reject(new Error(message.error_text ?? 'tool call failed'));
    }
  });

  return (name, input) => {
    nextId += 1;
    const id = 'msg-' + nextId;
    return new Promise((resolve, reject) => {
      // Register before posting so the response can never arrive unmatched.
      pending.set(id, { resolve, reject });
      parentPort.postMessage({
        type: 'tool_call',
        id,
        name: String(name),
        input,
      });
    });
  };
}
/**
 * Creates the array that collects the script's output items.
 *
 * `push` is overridden so every pushed item is first forwarded to the parent
 * as a `content_item` message (JSON-cloned) and then appended locally. A
 * `clear_content` message from the parent empties the local array.
 */
function createContentItems() {
  const contentItems = [];
  const appendLocally = Array.prototype.push.bind(contentItems);

  contentItems.push = (...items) => {
    // Forward every item before appending any, matching the local order.
    items.forEach((item) => {
      parentPort.postMessage({
        type: 'content_item',
        item: cloneJsonValue(item),
      });
    });
    return appendLocally(...items);
  };

  parentPort.on('message', (message) => {
    if (message.type !== 'clear_content') {
      return;
    }
    contentItems.splice(0, contentItems.length);
  });

  return contentItems;
}
/**
 * Builds the frozen, prototype-less `tools` object exposed to scripts.
 *
 * Each enabled tool becomes a read-only, enumerable async function stored
 * under its `global_name` that forwards its argument to
 * `callTool(tool_name, args)`.
 */
function createGlobalToolsNamespace(callTool, enabledTools) {
  const namespace = Object.create(null);
  for (const tool of enabledTools) {
    const { tool_name: toolName, global_name: globalName } = tool;
    const descriptor = {
      configurable: false,
      enumerable: true,
      writable: false,
      value: async (args) => callTool(toolName, args),
    };
    Object.defineProperty(namespace, globalName, descriptor);
  }
  return Object.freeze(namespace);
}
/**
 * Builds the frozen tools object whose members back the exports of the
 * `tools.js` synthetic module.
 *
 * The shape is exactly the one produced for the global `tools` namespace
 * (one read-only async function per enabled tool, keyed by `global_name`),
 * so this delegates instead of keeping a second copy of the same loop that
 * could silently drift from the first.
 */
function createModuleToolsNamespace(callTool, enabledTools) {
  return createGlobalToolsNamespace(callTool, enabledTools);
}
/**
 * Produces the frozen `ALL_TOOLS` listing: one frozen `{ name, description }`
 * record per enabled tool, where `name` is the tool's `global_name`.
 */
function createAllToolsMetadata(enabledTools) {
  const listing = [];
  for (const tool of enabledTools) {
    const { global_name: name, description } = tool;
    listing.push(Object.freeze({ name, description }));
  }
  return Object.freeze(listing);
}
/**
 * Creates the synthetic `tools.js` module.
 *
 * It exports `ALL_TOOLS` (the metadata listing) plus one export per enabled
 * tool named after its `global_name`. A tool that is itself called
 * `ALL_TOOLS` is not exported, so it cannot shadow the listing.
 */
function createToolsModule(context, callTool, enabledTools) {
  const tools = createModuleToolsNamespace(callTool, enabledTools);
  const allTools = createAllToolsMetadata(enabledTools);

  // 'ALL_TOOLS' goes first; the Set removes any repeated names, including a
  // tool that happens to be called 'ALL_TOOLS'.
  const toolNames = enabledTools.map(({ global_name }) => global_name);
  const uniqueExportNames = [...new Set(['ALL_TOOLS', ...toolNames])];
  const toolExportNames = uniqueExportNames.slice(1);

  return new SyntheticModule(
    uniqueExportNames,
    function initToolsModule() {
      this.setExport('ALL_TOOLS', allTools);
      for (const exportName of toolExportNames) {
        this.setExport(exportName, tools[exportName]);
      }
    },
    { context }
  );
}
/**
 * Returns the context's `__codexContentItems` array, installing a fresh empty
 * array first when the property is missing or is not an array.
 */
function ensureContentItems(context) {
  const current = context.__codexContentItems;
  if (Array.isArray(current)) {
    return current;
  }
  context.__codexContentItems = [];
  return context.__codexContentItems;
}
/**
 * Converts any script value into the text emitted for it.
 *
 * Strings pass through untouched; `undefined`, `null`, booleans, numbers and
 * bigints use `String(value)`; everything else is JSON-serialized, falling
 * back to `String(value)` when JSON yields no string (e.g. functions, symbols).
 */
function serializeOutputText(value) {
  switch (typeof value) {
    case 'string':
      return value;
    case 'undefined':
    case 'boolean':
    case 'number':
    case 'bigint':
      return String(value);
    default:
      break;
  }
  if (value === null) {
    return String(value);
  }
  const json = JSON.stringify(value);
  return typeof json === 'string' ? json : String(value);
}
/**
 * Validates the argument of the `image()` helper and returns its canonical
 * form.
 *
 * Accepts either a URL string or a non-array object with a string `image_url`
 * and an optional string `detail`. The URL must be non-empty and start with
 * `http://`, `https://` or `data:` (case-insensitive). `detail`, when given,
 * must be one of auto/low/high/original (case-insensitive) and is lowercased
 * in the result.
 *
 * @returns {{image_url: string, detail?: string}}
 * @throws {TypeError} On any validation failure.
 */
function normalizeOutputImage(value) {
  let imageUrl;
  let detail;

  const isObjectForm =
    value !== null && typeof value === 'object' && !Array.isArray(value);

  if (typeof value === 'string') {
    imageUrl = value;
  } else if (isObjectForm) {
    const candidateUrl = value.image_url;
    if (typeof candidateUrl === 'string') {
      imageUrl = candidateUrl;
    }
    const candidateDetail = value.detail;
    if (typeof candidateDetail === 'string') {
      detail = candidateDetail;
    } else {
      // An own `detail` that is null/undefined counts as "not provided";
      // any other non-string value is rejected outright.
      const providedNonString =
        candidateDetail != null &&
        Object.prototype.hasOwnProperty.call(value, 'detail');
      if (providedNonString) {
        throw new TypeError('image detail must be a string when provided');
      }
    }
  }

  const hasUrl = typeof imageUrl === 'string' && imageUrl.length > 0;
  if (!hasUrl) {
    throw new TypeError(
      'image expects a non-empty image URL string or an object with image_url and optional detail'
    );
  }
  if (!/^(?:https?:\/\/|data:)/i.test(imageUrl)) {
    throw new TypeError('image expects an http(s) or data URL');
  }

  const hasDetail = typeof detail !== 'undefined';
  if (hasDetail && !/^(?:auto|low|high|original)$/i.test(detail)) {
    throw new TypeError('image detail must be one of: auto, low, high, original');
  }

  const normalized = { image_url: imageUrl };
  if (typeof detail === 'string') {
    normalized.detail = detail.toLowerCase();
  }
  return normalized;
}
/**
 * Builds the frozen set of helper functions exposed to scripts.
 *
 * - `load` / `store`: read and write JSON-cloned values in
 *   `state.storedValues`, keyed by string.
 * - `text` / `image` (also exported as `output_text` / `output_image`):
 *   append an output item to the context's content items and return it.
 * - `yield_control`: posts a `yield` message to the parent.
 * - `notify`: posts a `notify` message carrying `toolCallId` and the text.
 * - `exit`: throws the exit sentinel to stop the script.
 */
function createCodeModeHelpers(context, state, toolCallId) {
  // Appends an output item to the live content array and hands it back.
  const emit = (item) => {
    ensureContentItems(context).push(item);
    return item;
  };

  const load = (key) => {
    if (typeof key !== 'string') {
      throw new TypeError('load key must be a string');
    }
    const isStored = Object.prototype.hasOwnProperty.call(state.storedValues, key);
    return isStored ? cloneJsonValue(state.storedValues[key]) : undefined;
  };

  const store = (key, value) => {
    if (typeof key !== 'string') {
      throw new TypeError('store key must be a string');
    }
    state.storedValues[key] = cloneJsonValue(value);
  };

  const text = (value) =>
    emit({
      type: 'input_text',
      text: serializeOutputText(value),
    });

  const image = (value) =>
    emit(Object.assign({ type: 'input_image' }, normalizeOutputImage(value)));

  const yieldControl = () => {
    parentPort.postMessage({ type: 'yield' });
  };

  const notify = (value) => {
    const message = serializeOutputText(value);
    if (message.trim().length === 0) {
      throw new TypeError('notify expects non-empty text');
    }
    const hasCallId = typeof toolCallId === 'string' && toolCallId.length !== 0;
    if (!hasCallId) {
      throw new TypeError('notify requires a valid tool call id');
    }
    parentPort.postMessage({
      type: 'notify',
      call_id: toolCallId,
      text: message,
    });
    return message;
  };

  const exit = () => {
    throw new CodeModeExitSignal();
  };

  return Object.freeze({
    exit,
    image,
    load,
    notify,
    output_image: image,
    output_text: text,
    store,
    text,
    yield_control: yieldControl,
  });
}
/**
 * Creates the synthetic module that exposes the script helpers as named
 * exports; every export is the same-named property of `helpers`.
 */
function createCodeModeModule(context, helpers) {
  const exportNames = [
    'exit',
    'image',
    'load',
    'notify',
    'output_text',
    'output_image',
    'store',
    'text',
    'yield_control',
  ];
  return new SyntheticModule(
    exportNames,
    function initCodeModeModule() {
      for (const exportName of exportNames) {
        this.setExport(exportName, helpers[exportName]);
      }
    },
    { context }
  );
}
/**
 * Assembles the frozen runtime object installed on the context: the tool
 * metadata listing, the `tools` namespace, and the script helpers.
 */
function createBridgeRuntime(callTool, enabledTools, helpers) {
  const allTools = createAllToolsMetadata(enabledTools);
  const { exit, image, load, notify, store, text } = helpers;
  const tools = createGlobalToolsNamespace(callTool, enabledTools);
  const yieldControl = helpers.yield_control;

  const runtime = {
    ALL_TOOLS: allTools,
    exit,
    image,
    load,
    notify,
    store,
    text,
    tools,
    yield_control: yieldControl,
  };
  return Object.freeze(runtime);
}
/**
 * Compares two namespace paths segment by segment.
 *
 * @returns {boolean} True when both arrays have the same length and strictly
 *   equal segments at every index.
 */
function namespacesMatch(left, right) {
  if (left.length !== right.length) {
    return false;
  }
  for (let index = 0; index < left.length; index += 1) {
    if (left[index] !== right[index]) {
      return false;
    }
  }
  return true;
}
/**
 * Builds the frozen, prototype-less tools object for a single namespace.
 *
 * Only tools whose `namespace` array equals `namespace` are included (a tool
 * without an array namespace counts as having an empty one). Each is stored
 * under `tool.name` as a read-only async function that forwards to
 * `callTool(tool.tool_name, args)`.
 */
function createNamespacedToolsNamespace(callTool, enabledTools, namespace) {
  const tools = Object.create(null);
  const belongsHere = (tool) => {
    const toolNamespace = Array.isArray(tool.namespace) ? tool.namespace : [];
    return namespacesMatch(toolNamespace, namespace);
  };

  for (const tool of enabledTools) {
    if (belongsHere(tool)) {
      Object.defineProperty(tools, tool.name, {
        configurable: false,
        enumerable: true,
        writable: false,
        value: async (args) => callTool(tool.tool_name, args),
      });
    }
  }
  return Object.freeze(tools);
}
/**
 * Creates the synthetic module for one tool namespace: one export per tool
 * in that namespace, except that a tool named `ALL_TOOLS` is never exported.
 */
function createNamespacedToolsModule(context, callTool, enabledTools, namespace) {
  const tools = createNamespacedToolsNamespace(callTool, enabledTools, namespace);
  // Object keys are already distinct, so filtering is all that is needed.
  const uniqueExportNames = Object.keys(tools).filter(
    (exportName) => exportName !== 'ALL_TOOLS'
  );
  return new SyntheticModule(
    uniqueExportNames,
    function initNamespacedToolsModule() {
      uniqueExportNames.forEach((exportName) => {
        this.setExport(exportName, tools[exportName]);
      });
    },
    { context }
  );
}
/**
 * Returns the import resolver used for scripts.
 *
 * Supported specifiers:
 * - `tools.js`: the module with every enabled tool;
 * - `@openai/code_mode` or `openai/code_mode`: the helper module;
 * - `tools/<segment>/.../<segment>.js`: the module for that tool namespace.
 *
 * Each module is created on first use and cached. Any other specifier throws
 * an `Unsupported import in exec` error.
 */
function createModuleResolver(context, callTool, enabledTools, helpers) {
  const namespacedModules = new Map();
  let toolsModule;
  let codeModeModule;

  return function resolveModule(specifier) {
    switch (specifier) {
      case 'tools.js':
        if (toolsModule == null) {
          toolsModule = createToolsModule(context, callTool, enabledTools);
        }
        return toolsModule;
      case '@openai/code_mode':
      case 'openai/code_mode':
        if (codeModeModule == null) {
          codeModeModule = createCodeModeModule(context, helpers);
        }
        return codeModeModule;
      default:
        break;
    }

    const match = /^tools\/(.+)\.js$/.exec(specifier);
    if (!match) {
      throw new Error('Unsupported import in exec: ' + specifier);
    }
    // Empty segments (e.g. from doubled slashes) are ignored.
    const namespace = match[1].split('/').filter((segment) => segment.length > 0);
    if (namespace.length === 0) {
      throw new Error('Unsupported import in exec: ' + specifier);
    }

    const cacheKey = namespace.join('/');
    const cached = namespacedModules.get(cacheKey);
    if (cached !== undefined) {
      return cached;
    }
    const created = createNamespacedToolsModule(context, callTool, enabledTools, namespace);
    namespacedModules.set(cacheKey, created);
    return created;
  };
}
// Resolves a dynamically imported specifier and drives the resulting module
// through whatever linking/evaluation it still needs before returning it.
// The status checks are sequential on purpose: `module.status` is re-read
// after each await, so one call can both link and then evaluate a module.
async function resolveDynamicModule(specifier, resolveModule) {
const module = resolveModule(specifier);
if (module.status === 'unlinked') {
await module.link(resolveModule);
}
if (module.status === 'linked' || module.status === 'evaluating') {
await module.evaluate();
}
// Surface a failed evaluation to the importing script.
if (module.status === 'errored') {
throw module.error;
}
return module;
}
/**
 * Compiles `start.source` as an ES module in `context`, links its imports
 * through the module resolver, and evaluates it. Dynamic `import()` calls in
 * the script go through the same resolver.
 */
async function runModule(context, start, callTool, helpers) {
  const enabledTools = start.enabled_tools ?? [];
  const resolveModule = createModuleResolver(context, callTool, enabledTools, helpers);
  const importModuleDynamically = async (specifier) =>
    resolveDynamicModule(specifier, resolveModule);

  const mainModule = new SourceTextModule(start.source, {
    context,
    identifier: 'exec_main.mjs',
    importModuleDynamically,
  });
  await mainModule.link(resolveModule);
  await mainModule.evaluate();
}
// Worker entry point: builds the script context from `workerData`, runs the
// script, and always reports a single `result` message to the parent.
async function main() {
const start = workerData ?? {};
const toolCallId = start.tool_call_id;
// Work on a private JSON copy of the values carried over from the host.
const state = {
storedValues: cloneJsonValue(start.stored_values ?? {}),
};
const callTool = createToolCaller();
const enabledTools = start.enabled_tools ?? [];
const contentItems = createContentItems();
const context = vm.createContext({
__codexContentItems: contentItems,
});
const helpers = createCodeModeHelpers(context, state, toolCallId);
// Expose the runtime as a non-enumerable, read-only global of the context.
Object.defineProperty(context, '__codexRuntime', {
value: createBridgeRuntime(callTool, enabledTools, helpers),
configurable: true,
enumerable: false,
writable: false,
});
// Tell the parent that setup is done and the script is about to run.
parentPort.postMessage({ type: 'started' });
try {
await runModule(context, start, callTool, helpers);
parentPort.postMessage({
type: 'result',
stored_values: state.storedValues,
});
} catch (error) {
// exit() unwinds via the sentinel error; report it as a normal completion.
if (isCodeModeExitSignal(error)) {
parentPort.postMessage({
type: 'result',
stored_values: state.storedValues,
});
return;
}
// Any other failure is reported with its text; stored values are still sent.
parentPort.postMessage({
type: 'result',
stored_values: state.storedValues,
error_text: formatErrorText(error),
});
}
}
// Last-resort handler for failures outside the try block above (setup errors
// or a failing postMessage): report an error result with no stored values.
void main().catch((error) => {
parentPort.postMessage({
type: 'result',
stored_values: {},
error_text: formatErrorText(error),
});
});
}
/**
 * Sets up the newline-delimited JSON protocol with the host over
 * stdin/stdout.
 *
 * Incoming message types:
 * - `start`: start a new exec session;
 * - `poll`: attach a new request to a session and either deliver its pending
 *   result or schedule the next yield;
 * - `terminate`: stop a session;
 * - `response`: settle an outstanding `request()` made by this process.
 *
 * A failure while handling one message (malformed payload, invalid numeric
 * field, a `null` line, ...) is contained: it is logged to stderr and, when
 * the message carried a `request_id` that expects an answer, answered with an
 * error `result`. Previously such a throw escaped the `line` listener as an
 * uncaught exception and took the whole runner, and every live session, down.
 *
 * @returns {{closed: Promise<void>, request: Function, send: Function}}
 *   `closed` resolves once stdin closes; `send` writes one message;
 *   `request` sends a message and resolves with the host's `response`.
 */
function createProtocol() {
  const rl = readline.createInterface({
    input: process.stdin,
    crlfDelay: Infinity,
  });
  let nextId = 0;
  const pending = new Map();
  const sessions = new Map();
  let closedResolve;
  const closed = new Promise((resolve) => {
    closedResolve = resolve;
  });

  // Answers a request that cannot be served with an error result.
  function sendErrorResult(requestId, errorText) {
    void protocol.send({
      type: 'result',
      request_id: requestId,
      content_items: [],
      stored_values: {},
      error_text: errorText,
      max_output_tokens_per_exec_call: DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL,
    });
  }

  // Routes one parsed message; may throw on invalid payloads.
  function dispatch(message) {
    if (message.type === 'start') {
      startSession(protocol, sessions, message);
      return;
    }
    if (message.type === 'poll') {
      const session = sessions.get(message.cell_id);
      if (!session) {
        sendErrorResult(message.request_id, `exec cell ${message.cell_id} not found`);
        return;
      }
      if (session.pending_result) {
        session.request_id = String(message.request_id);
        void completeSession(protocol, sessions, session, session.pending_result);
        return;
      }
      // Validate before adopting the request id, so a rejected poll cannot
      // leave the session bound to a request that was already answered.
      const yieldTime = normalizeYieldTime(message.yield_time_ms ?? 0);
      session.request_id = String(message.request_id);
      schedulePollYield(protocol, session, yieldTime);
      return;
    }
    if (message.type === 'terminate') {
      const session = sessions.get(message.cell_id);
      if (!session) {
        sendErrorResult(message.request_id, `exec cell ${message.cell_id} not found`);
        return;
      }
      session.request_id = String(message.request_id);
      void terminateSession(protocol, sessions, session);
      return;
    }
    if (message.type === 'response') {
      const pendingKey = message.request_id + ':' + message.id;
      const entry = pending.get(pendingKey);
      if (!entry) {
        return;
      }
      pending.delete(pendingKey);
      if (typeof message.error_text === 'string') {
        entry.reject(new Error(message.error_text));
        return;
      }
      entry.resolve(message.code_mode_result ?? '');
      return;
    }
    process.stderr.write('Unknown protocol message type: ' + message.type + '\n');
  }

  rl.on('line', (line) => {
    if (!line.trim()) {
      return;
    }
    let message;
    try {
      message = JSON.parse(line);
    } catch (error) {
      process.stderr.write(formatErrorText(error) + '\n');
      return;
    }
    try {
      dispatch(message);
    } catch (error) {
      const errorText = formatErrorText(error);
      process.stderr.write(errorText + '\n');
      // `response` messages answer our own requests; everything else that
      // carries a request id is waiting for a reply from us.
      const requestId = message?.request_id;
      if (requestId != null && message.type !== 'response') {
        sendErrorResult(requestId, errorText);
      }
    }
  });

  rl.on('close', () => {
    // The host is gone: fail outstanding requests and stop every worker.
    const error = new Error('stdin closed');
    for (const entry of pending.values()) {
      entry.reject(error);
    }
    pending.clear();
    for (const session of sessions.values()) {
      session.initial_yield_timer = clearTimer(session.initial_yield_timer);
      session.poll_yield_timer = clearTimer(session.poll_yield_timer);
      void session.worker.terminate().catch(() => {});
    }
    sessions.clear();
    closedResolve();
  });

  // Writes one message as a JSON line; settles once the write completes.
  function send(message) {
    return new Promise((resolve, reject) => {
      process.stdout.write(JSON.stringify(message) + '\n', (error) => {
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      });
    });
  }

  // Sends a message and resolves with the host's matching `response`.
  function request(type, payload) {
    const requestId = 'req-' + ++nextId;
    const id = 'msg-' + ++nextId;
    const pendingKey = requestId + ':' + id;
    return new Promise((resolve, reject) => {
      pending.set(pendingKey, { resolve, reject });
      void send({ type, request_id: requestId, id, ...payload }).catch((error) => {
        pending.delete(pendingKey);
        reject(error);
      });
    });
  }

  const protocol = { closed, request, send };
  return protocol;
}
/**
 * Returns the program text run by each session worker: the source of
 * `codeModeWorkerMain` wrapped in an immediately-invoked call.
 */
function sessionWorkerSource() {
  const workerBody = codeModeWorkerMain.toString();
  return ['(', workerBody, ')();'].join('');
}
/**
 * Creates and registers a session for a `start` message and launches its
 * worker.
 *
 * Worker messages are routed through `handleWorkerMessage`; a handler
 * failure, a worker `error` event, or a non-zero exit before completion all
 * finish the session with an error result.
 *
 * @throws {TypeError} When `tool_call_id`, `max_output_tokens`, or a yield
 *   time in `start` is invalid.
 */
function startSession(protocol, sessions, start) {
  const hasToolCallId =
    typeof start.tool_call_id === 'string' && start.tool_call_id.length !== 0;
  if (!hasToolCallId) {
    throw new TypeError('start requires a valid tool_call_id');
  }

  let maxOutputTokensPerExecCall = DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL;
  if (start.max_output_tokens != null) {
    maxOutputTokensPerExecCall = normalizeMaxOutputTokensPerExecCall(start.max_output_tokens);
  }

  const defaultYieldTimeMs = normalizeYieldTime(start.default_yield_time_ms);
  // The first yield uses the explicit yield time when given, else the default.
  const initialYieldTimeMs =
    start.yield_time_ms == null
      ? normalizeYieldTime(start.default_yield_time_ms)
      : normalizeYieldTime(start.yield_time_ms);
  const requestId = String(start.request_id);
  const worker = new Worker(sessionWorkerSource(), {
    eval: true,
    workerData: start,
  });

  const session = {
    completed: false,
    content_items: [],
    default_yield_time_ms: defaultYieldTimeMs,
    id: start.cell_id,
    initial_yield_time_ms: initialYieldTimeMs,
    initial_yield_timer: null,
    initial_yield_triggered: false,
    max_output_tokens_per_exec_call: maxOutputTokensPerExecCall,
    pending_result: null,
    poll_yield_timer: null,
    request_id: requestId,
    worker,
  };
  sessions.set(session.id, session);

  // Finishes the session with an error result carrying `errorText`.
  const failSession = (errorText) =>
    completeSession(protocol, sessions, session, {
      type: 'result',
      stored_values: {},
      error_text: errorText,
    });

  worker.on('message', (message) => {
    void handleWorkerMessage(protocol, sessions, session, message).catch((error) => {
      void failSession(formatErrorText(error));
    });
  });
  worker.on('error', (error) => {
    void failSession(formatErrorText(error));
  });
  worker.on('exit', (code) => {
    const exitedAbnormally = code !== 0 && !session.completed;
    if (exitedAbnormally) {
      void failSession('exec worker exited with code ' + code);
    }
  });
}
/**
 * Handles one message posted by a session's worker. Messages for a session
 * that is already completed are dropped.
 *
 * - `content_item`: buffer the item for the next yield/result;
 * - `started`: arm the initial yield timer;
 * - `yield`: flush buffered output to the host now;
 * - `notify`: validate and forward the notification to the host;
 * - `tool_call`: forward the call to the host;
 * - `result`: complete the session, or park the result when no host request
 *   is currently attached.
 *
 * @throws {TypeError} For a `notify` message with empty text or call id.
 */
async function handleWorkerMessage(protocol, sessions, session, message) {
  if (session.completed) {
    return;
  }
  switch (message.type) {
    case 'content_item':
      session.content_items.push(cloneJsonValue(message.item));
      return;
    case 'started':
      scheduleInitialYield(protocol, session, session.initial_yield_time_ms);
      return;
    case 'yield':
      void sendYielded(protocol, session);
      return;
    case 'notify': {
      const { text, call_id: callId } = message;
      if (typeof text !== 'string' || text.trim().length === 0) {
        throw new TypeError('notify requires non-empty text');
      }
      if (typeof callId !== 'string' || callId.length === 0) {
        throw new TypeError('notify requires a valid call id');
      }
      await protocol.send({
        type: 'notify',
        cell_id: session.id,
        call_id: callId,
        text,
      });
      return;
    }
    case 'tool_call':
      void forwardToolCall(protocol, session, message);
      return;
    case 'result': {
      const hasErrorText = typeof message.error_text === 'string';
      const result = {
        type: 'result',
        stored_values: cloneJsonValue(message.stored_values ?? {}),
        error_text: hasErrorText ? message.error_text : undefined,
      };
      if (session.request_id !== null) {
        await completeSession(protocol, sessions, session, result);
        return;
      }
      // No host request is waiting: keep the result for the next poll.
      session.pending_result = result;
      session.initial_yield_timer = clearTimer(session.initial_yield_timer);
      session.poll_yield_timer = clearTimer(session.poll_yield_timer);
      return;
    }
    default:
      process.stderr.write('Unknown worker message type: ' + message.type + '\n');
  }
}
/**
 * Relays a worker's tool call to the host and posts the outcome back to the
 * worker as `tool_response` or `tool_response_error`.
 *
 * Nothing is posted when the session completed while the call was in flight,
 * and failures to post to the worker are deliberately ignored.
 */
async function forwardToolCall(protocol, session, message) {
  let outcome;
  try {
    const result = await protocol.request('tool_call', {
      name: String(message.name),
      input: message.input,
    });
    outcome = { ok: true, result };
  } catch (error) {
    outcome = { ok: false, error };
  }

  if (session.completed) {
    return;
  }

  const reply = outcome.ok
    ? { type: 'tool_response', id: message.id, result: outcome.result }
    : {
        type: 'tool_response_error',
        id: message.id,
        error_text: formatErrorText(outcome.error),
      };
  try {
    session.worker.postMessage(reply);
  } catch {}
}
/**
 * Answers the session's current host request with a `yielded` message
 * carrying everything buffered since the last yield.
 *
 * Does nothing when the session is completed or no host request is attached.
 * Both yield timers are cancelled and the worker is asked to clear its own
 * copy of the content items.
 */
async function sendYielded(protocol, session) {
  if (session.completed || session.request_id === null) {
    return;
  }
  session.initial_yield_timer = clearTimer(session.initial_yield_timer);
  session.initial_yield_triggered = true;
  session.poll_yield_timer = clearTimer(session.poll_yield_timer);
  const contentItems = takeContentItems(session);
  const requestId = session.request_id;
  // Detach the request *before* awaiting the write. While the write is in
  // flight other events can run; with the id still attached, a worker
  // `result` or a second `yield` would answer the same request twice, and
  // clearing it afterwards could wipe the id of a poll that arrived meanwhile.
  session.request_id = null;
  try {
    session.worker.postMessage({ type: 'clear_content' });
  } catch {}
  await protocol.send({
    type: 'yielded',
    request_id: requestId,
    content_items: contentItems,
  });
}
/**
 * Arms the timer for the session's first yield, replacing any timer that is
 * already pending. Does nothing when the session is completed or the initial
 * yield has already happened.
 *
 * @returns {number} `yieldTime`, unchanged.
 */
function scheduleInitialYield(protocol, session, yieldTime) {
  const nothingToSchedule = session.completed || session.initial_yield_triggered;
  if (!nothingToSchedule) {
    clearTimer(session.initial_yield_timer);
    session.initial_yield_timer = setTimeout(() => {
      session.initial_yield_timer = null;
      session.initial_yield_triggered = true;
      void sendYielded(protocol, session);
    }, yieldTime);
  }
  return yieldTime;
}
/**
 * Arms the timer that answers the current poll with a yield after
 * `yieldTime` milliseconds, replacing any poll timer already pending.
 * Does nothing for a completed session.
 */
function schedulePollYield(protocol, session, yieldTime) {
  if (!session.completed) {
    clearTimer(session.poll_yield_timer);
    session.poll_yield_timer = setTimeout(() => {
      session.poll_yield_timer = null;
      void sendYielded(protocol, session);
    }, yieldTime);
  }
}
/**
 * Finishes a session and delivers `message` as the answer to the attached
 * host request, together with any buffered content items and the session's
 * output token limit.
 *
 * When no host request is attached, the message is parked as
 * `pending_result` (and the timers are cancelled) until the next poll.
 * Calling this on an already completed session is a no-op.
 */
async function completeSession(protocol, sessions, session, message) {
  if (session.completed) {
    return;
  }
  if (session.request_id === null) {
    session.pending_result = message;
    session.initial_yield_timer = clearTimer(session.initial_yield_timer);
    session.poll_yield_timer = clearTimer(session.poll_yield_timer);
    return;
  }
  const requestId = session.request_id;
  session.completed = true;
  session.initial_yield_timer = clearTimer(session.initial_yield_timer);
  session.poll_yield_timer = clearTimer(session.poll_yield_timer);
  sessions.delete(session.id);
  const contentItems = takeContentItems(session);
  session.pending_result = null;
  try {
    session.worker.postMessage({ type: 'clear_content' });
  } catch {}
  // The worker keeps `message` listeners on its parent port, which keep its
  // event loop alive after the script has finished. Nothing it sends is
  // processed once the session is completed, so stop it here; otherwise
  // every finished cell leaves a live thread behind until the process exits.
  try {
    void session.worker.terminate().catch(() => {});
  } catch {}
  await protocol.send({
    ...message,
    request_id: requestId,
    content_items: contentItems,
    max_output_tokens_per_exec_call: session.max_output_tokens_per_exec_call,
  });
}
/**
 * Stops a running session on request of the host: marks it completed,
 * unregisters it, terminates its worker, and answers the attached request
 * with a `terminated` message carrying the buffered content items.
 * A session that is already completed is left alone.
 */
async function terminateSession(protocol, sessions, session) {
  if (session.completed) {
    return;
  }
  session.completed = true;
  sessions.delete(session.id);
  session.poll_yield_timer = clearTimer(session.poll_yield_timer);
  session.initial_yield_timer = clearTimer(session.initial_yield_timer);
  const contentItems = takeContentItems(session);

  // Termination failures are ignored; the reply is sent regardless.
  await session.worker.terminate().catch(() => {});

  const reply = {
    type: 'terminated',
    request_id: session.request_id,
    content_items: contentItems,
  };
  await protocol.send(reply);
}
/**
 * Process entry point: starts the protocol and stays alive until stdin
 * closes.
 */
async function main() {
  const { closed } = createProtocol();
  await closed;
}

// A startup failure is logged and reflected in the exit code; the exit code
// is set even if writing to stderr itself fails.
void main().catch(async (error) => {
  const errorText = formatErrorText(error) + '\n';
  try {
    process.stderr.write(errorText);
  } finally {
    process.exitCode = 1;
  }
});

View File

@@ -1,108 +0,0 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use serde_json::Value as JsonValue;
use tokio::sync::Mutex;
use tracing::warn;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::tools::ToolRouter;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::js_repl::resolve_compatible_node;
use crate::tools::parallel::ToolCallRuntime;
use codex_features::Feature;
use super::ExecContext;
use super::PUBLIC_TOOL_NAME;
use super::process::CodeModeProcess;
use super::process::spawn_code_mode_process;
use super::worker::CodeModeWorker;
/// Owns the shared code-mode runner process and the state kept alongside it.
pub(crate) struct CodeModeService {
/// Optional Node path handed to `resolve_compatible_node` when the runner
/// has to be spawned.
js_repl_node_path: Option<PathBuf>,
/// Key/value store read and replaced wholesale through the accessors below.
stored_values: Mutex<HashMap<String, JsonValue>>,
/// Slot for the runner process; `None` until the first successful spawn.
process: Arc<Mutex<Option<CodeModeProcess>>>,
/// Next cell id to hand out; starts at 1.
next_cell_id: Mutex<u64>,
}
impl CodeModeService {
pub(crate) fn new(js_repl_node_path: Option<PathBuf>) -> Self {
Self {
js_repl_node_path,
stored_values: Mutex::new(HashMap::new()),
process: Arc::new(Mutex::new(None)),
next_cell_id: Mutex::new(1),
}
}
pub(crate) async fn stored_values(&self) -> HashMap<String, JsonValue> {
self.stored_values.lock().await.clone()
}
pub(crate) async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
*self.stored_values.lock().await = values;
}
pub(super) async fn ensure_started(
&self,
) -> Result<tokio::sync::OwnedMutexGuard<Option<CodeModeProcess>>, std::io::Error> {
let mut process_slot = self.process.lock().await;
let needs_spawn = match process_slot.as_mut() {
Some(process) => !matches!(process.has_exited(), Ok(false)),
None => true,
};
if needs_spawn {
let node_path = resolve_compatible_node(self.js_repl_node_path.as_deref())
.await
.map_err(std::io::Error::other)?;
*process_slot = Some(spawn_code_mode_process(&node_path).await?);
}
drop(process_slot);
Ok(self.process.clone().lock_owned().await)
}
pub(crate) async fn start_turn_worker(
&self,
session: &Arc<Session>,
turn: &Arc<TurnContext>,
router: Arc<ToolRouter>,
tracker: SharedTurnDiffTracker,
) -> Option<CodeModeWorker> {
if !turn.features.enabled(Feature::CodeMode) {
return None;
}
let exec = ExecContext {
session: Arc::clone(session),
turn: Arc::clone(turn),
};
let tool_runtime =
ToolCallRuntime::new(router, Arc::clone(session), Arc::clone(turn), tracker);
let mut process_slot = match self.ensure_started().await {
Ok(process_slot) => process_slot,
Err(err) => {
warn!("failed to start {PUBLIC_TOOL_NAME} worker for turn: {err}");
return None;
}
};
let Some(process) = process_slot.as_mut() else {
warn!(
"failed to start {PUBLIC_TOOL_NAME} worker for turn: {PUBLIC_TOOL_NAME} runner failed to start"
);
return None;
};
Some(process.worker(exec, tool_runtime))
}
pub(crate) async fn allocate_cell_id(&self) -> String {
let mut next_cell_id = self.next_cell_id.lock().await;
let cell_id = *next_cell_id;
*next_cell_id = next_cell_id.saturating_add(1);
cell_id.to_string()
}
pub(crate) async fn allocate_request_id(&self) -> String {
uuid::Uuid::new_v4().to_string()
}
}

View File

@@ -1,8 +0,0 @@
- Use `wait` only after `exec` returns `Script running with cell ID ...`.
- `cell_id` identifies the running `exec` cell to resume.
- `yield_time_ms` controls how long to wait for more output before yielding again. If omitted, `wait` uses its default wait timeout.
- `max_tokens` limits how much new output this wait call returns.
- `terminate: true` stops the running cell instead of waiting for more output.
- `wait` returns only the new output since the last yield, or the final completion or termination result for that cell.
- If the cell is still running, `wait` may yield again with the same `cell_id`.
- If the cell has already finished, `wait` returns the completed result and closes the cell.

View File

@@ -8,13 +8,10 @@ use crate::tools::context::ToolPayload;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use super::CodeModeSessionProgress;
use super::DEFAULT_WAIT_YIELD_TIME_MS;
use super::ExecContext;
use super::PUBLIC_TOOL_NAME;
use super::WAIT_TOOL_NAME;
use super::handle_node_message;
use super::protocol::HostToNodeMessage;
use super::handle_runtime_response;
pub struct CodeModeWaitHandler;
@@ -63,66 +60,21 @@ impl ToolHandler for CodeModeWaitHandler {
ToolPayload::Function { arguments } if tool_name == WAIT_TOOL_NAME => {
let args: ExecWaitArgs = parse_arguments(&arguments)?;
let exec = ExecContext { session, turn };
let request_id = exec
.session
.services
.code_mode_service
.allocate_request_id()
.await;
let started_at = std::time::Instant::now();
let message = if args.terminate {
HostToNodeMessage::Terminate {
request_id: request_id.clone(),
cell_id: args.cell_id.clone(),
}
} else {
HostToNodeMessage::Poll {
request_id: request_id.clone(),
cell_id: args.cell_id.clone(),
yield_time_ms: args.yield_time_ms,
}
};
let process_slot = exec
let response = exec
.session
.services
.code_mode_service
.ensure_started()
.wait(codex_code_mode::WaitRequest {
cell_id: args.cell_id,
yield_time_ms: args.yield_time_ms,
terminate: args.terminate,
})
.await
.map_err(|err| FunctionCallError::RespondToModel(err.to_string()))?;
let result = {
let mut process_slot = process_slot;
let Some(process) = process_slot.as_mut() else {
return Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} runner failed to start"
)));
};
if !matches!(process.has_exited(), Ok(false)) {
return Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} runner failed to start"
)));
}
let message = process
.send(&request_id, &message)
.await
.map_err(|err| err.to_string());
let message = match message {
Ok(message) => message,
Err(error) => return Err(FunctionCallError::RespondToModel(error)),
};
handle_node_message(
&exec,
args.cell_id,
message,
Some(args.max_tokens),
started_at,
)
.map_err(FunctionCallError::RespondToModel)?;
handle_runtime_response(&exec, response, Some(args.max_tokens), started_at)
.await
};
match result {
Ok(CodeModeSessionProgress::Finished(output))
| Ok(CodeModeSessionProgress::Yielded { output }) => Ok(output),
Err(error) => Err(FunctionCallError::RespondToModel(error)),
}
.map_err(FunctionCallError::RespondToModel)
}
_ => Err(FunctionCallError::RespondToModel(format!(
"{WAIT_TOOL_NAME} expects JSON arguments"

View File

@@ -1,116 +0,0 @@
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::warn;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use super::ExecContext;
use super::PUBLIC_TOOL_NAME;
use super::call_nested_tool;
use super::process::CodeModeProcess;
use super::process::write_message;
use super::protocol::HostToNodeMessage;
use super::protocol::NodeToHostMessage;
use crate::tools::parallel::ToolCallRuntime;
/// Handle for a background worker task; dropping it signals the task to stop.
pub(crate) struct CodeModeWorker {
// `Option` so `drop` can move the sender out and consume it.
shutdown_tx: Option<oneshot::Sender<()>>,
}
impl Drop for CodeModeWorker {
fn drop(&mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
// A send error means the receiver is already gone; nothing left to stop.
let _ = shutdown_tx.send(());
}
}
}
impl CodeModeProcess {
/// Spawns a background task that services messages coming from the runner
/// process and returns a handle that stops the task when dropped.
///
/// The task handles nested tool calls and notifications; any other message
/// kind is logged as an error and ends the loop.
pub(super) fn worker(
&self,
exec: ExecContext,
tool_runtime: ToolCallRuntime,
) -> CodeModeWorker {
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let stdin = self.stdin.clone();
let message_rx = self.message_rx.clone();
tokio::spawn(async move {
loop {
// Wait for the next message, or stop as soon as shutdown fires.
let next_message = tokio::select! {
_ = &mut shutdown_rx => break,
message = async {
let mut message_rx = message_rx.lock().await;
message_rx.recv().await
} => message,
};
// `None` means the message channel is closed.
let Some(next_message) = next_message else {
break;
};
match next_message {
NodeToHostMessage::ToolCall { tool_call } => {
let exec = exec.clone();
let tool_runtime = tool_runtime.clone();
let stdin = stdin.clone();
// Run each tool call in its own task so the loop keeps receiving
// messages while the call is in progress.
tokio::spawn(async move {
let result = call_nested_tool(
exec,
tool_runtime,
tool_call.name,
tool_call.input,
CancellationToken::new(),
)
.await;
// A failed call is reported as a null result plus the error text.
let (code_mode_result, error_text) = match result {
Ok(code_mode_result) => (code_mode_result, None),
Err(error) => (serde_json::Value::Null, Some(error.to_string())),
};
let response = HostToNodeMessage::Response {
request_id: tool_call.request_id,
id: tool_call.id,
code_mode_result,
error_text,
};
if let Err(err) = write_message(&stdin, &response).await {
warn!("failed to write {PUBLIC_TOOL_NAME} tool response: {err}");
}
});
}
NodeToHostMessage::Notify { notify } => {
// Blank notifications are dropped.
if notify.text.trim().is_empty() {
continue;
}
// Inject the text into the session as output of the originating call.
if exec
.session
.inject_response_items(vec![ResponseInputItem::CustomToolCallOutput {
call_id: notify.call_id.clone(),
name: Some(PUBLIC_TOOL_NAME.to_string()),
output: FunctionCallOutputPayload::from_text(notify.text),
}])
.await
.is_err()
{
warn!(
"failed to inject {PUBLIC_TOOL_NAME} notify message for cell {}: no active turn",
notify.cell_id
);
}
}
// These message kinds are not expected in this loop; log and stop.
unexpected_message @ (NodeToHostMessage::Yielded { .. }
| NodeToHostMessage::Terminated { .. }
| NodeToHostMessage::Result { .. }) => {
error!(
"received unexpected {PUBLIC_TOOL_NAME} message in worker loop: {unexpected_message:?}"
);
break;
}
}
}
});
CodeModeWorker {
shutdown_tx: Some(shutdown_tx),
}
}
}

View File

@@ -1,30 +1,11 @@
use crate::client_common::tools::ToolSpec;
use crate::mcp::split_qualified_tool_name;
use crate::tools::code_mode::PUBLIC_TOOL_NAME;
use serde_json::Value as JsonValue;
/// Describes where a tool is importable from inside code mode.
pub(crate) struct CodeModeToolReference {
/// Module specifier, e.g. `tools.js` or `tools/mcp/<server>.js`.
pub(crate) module_path: String,
/// Namespace segments; empty for tools that live in `tools.js`.
pub(crate) namespace: Vec<String>,
/// Name of the tool within its module.
pub(crate) tool_key: String,
}
/// Works out the code-mode module and key for `tool_name`.
///
/// Names that `split_qualified_tool_name` can split into a server name and a
/// tool key are placed under `tools/mcp/<server>.js`; every other name is
/// served from `tools.js` under its own name with an empty namespace.
pub(crate) fn code_mode_tool_reference(tool_name: &str) -> CodeModeToolReference {
    match split_qualified_tool_name(tool_name) {
        Some((server_name, tool_key)) => {
            let namespace = vec!["mcp".to_string(), server_name];
            let module_path = format!("tools/{}.js", namespace.join("/"));
            CodeModeToolReference {
                module_path,
                namespace,
                tool_key,
            }
        }
        None => CodeModeToolReference {
            module_path: "tools.js".to_string(),
            namespace: Vec::new(),
            tool_key: tool_name.to_string(),
        },
    }
}
#[allow(unused_imports)]
#[cfg(test)]
pub(crate) use codex_code_mode::append_code_mode_sample;
#[allow(unused_imports)]
#[cfg(test)]
pub(crate) use codex_code_mode::render_json_schema_to_typescript;
pub(crate) fn augment_tool_spec_for_code_mode(spec: ToolSpec, code_mode_enabled: bool) -> ToolSpec {
if !code_mode_enabled {
@@ -32,268 +13,39 @@ pub(crate) fn augment_tool_spec_for_code_mode(spec: ToolSpec, code_mode_enabled:
}
match spec {
ToolSpec::Function(mut tool) => {
if tool.name != PUBLIC_TOOL_NAME {
tool.description = append_code_mode_sample(
&tool.description,
&tool.name,
"args",
serde_json::to_value(&tool.parameters)
.ok()
.as_ref()
.map(render_json_schema_to_typescript)
.unwrap_or_else(|| "unknown".to_string()),
tool.output_schema
.as_ref()
.map(render_json_schema_to_typescript)
.unwrap_or_else(|| "unknown".to_string()),
);
}
ToolSpec::Function(tool)
ToolSpec::Function(tool) => {
let definition = codex_code_mode::ToolDefinition {
name: tool.name.clone(),
description: tool.description,
kind: codex_code_mode::CodeModeToolKind::Function,
input_schema: serde_json::to_value(&tool.parameters).ok(),
output_schema: tool.output_schema,
};
let definition = codex_code_mode::augment_tool_definition(definition);
ToolSpec::Function(crate::client_common::tools::ResponsesApiTool {
name: tool.name,
description: definition.description,
strict: tool.strict,
defer_loading: tool.defer_loading,
parameters: tool.parameters,
output_schema: definition.output_schema,
})
}
ToolSpec::Freeform(mut tool) => {
if tool.name != PUBLIC_TOOL_NAME {
tool.description = append_code_mode_sample(
&tool.description,
&tool.name,
"input",
"string".to_string(),
"unknown".to_string(),
);
}
ToolSpec::Freeform(tool)
ToolSpec::Freeform(tool) => {
let definition = codex_code_mode::ToolDefinition {
name: tool.name.clone(),
description: tool.description,
kind: codex_code_mode::CodeModeToolKind::Freeform,
input_schema: None,
output_schema: None,
};
let definition = codex_code_mode::augment_tool_definition(definition);
ToolSpec::Freeform(crate::client_common::tools::FreeformTool {
name: tool.name,
description: definition.description,
format: tool.format,
})
}
other => other,
}
}
/// Appends an "exec tool declaration" TypeScript snippet to a tool description.
///
/// The snippet is a fenced `ts` block containing
/// `declare const tools: { <signature> };`, where the signature comes from
/// `render_code_mode_tool_declaration`. The original description is kept
/// verbatim and separated from the snippet by a blank line.
fn append_code_mode_sample(
    description: &str,
    tool_name: &str,
    input_name: &str,
    input_type: String,
    output_type: String,
) -> String {
    let signature =
        render_code_mode_tool_declaration(tool_name, input_name, input_type, output_type);
    let declaration = format!("declare const tools: {{ {} }};", signature);
    format!("{description}\n\nexec tool declaration:\n```ts\n{declaration}\n```")
}
/// Renders a single TypeScript method signature for a tool:
/// `<identifier>(<input_name>: <input_type>): Promise<<output_type>>;`.
///
/// The tool name is first passed through `normalize_code_mode_identifier`.
fn render_code_mode_tool_declaration(
    tool_name: &str,
    input_name: &str,
    input_type: String,
    output_type: String,
) -> String {
    format!(
        "{tool_name}({input_name}: {input_type}): Promise<{output_type}>;",
        tool_name = normalize_code_mode_identifier(tool_name),
    )
}
/// Converts an arbitrary tool key into an identifier-safe string.
///
/// Each character is kept if it is `_`, `$`, or an ASCII letter; ASCII digits
/// are also kept anywhere except the first position. Every other character
/// (including non-ASCII ones) is replaced by `_`, one for one. An empty key
/// becomes `"_"`.
pub(crate) fn normalize_code_mode_identifier(tool_key: &str) -> String {
    if tool_key.is_empty() {
        return "_".to_string();
    }

    tool_key
        .chars()
        .enumerate()
        .map(|(position, ch)| {
            let allowed = match ch {
                '_' | '$' => true,
                // A leading digit is not allowed, so the first position is letters only.
                _ if position == 0 => ch.is_ascii_alphabetic(),
                _ => ch.is_ascii_alphanumeric(),
            };
            if allowed { ch } else { '_' }
        })
        .collect()
}
/// Renders a JSON Schema value as a TypeScript type expression.
///
/// Thin entry point that delegates directly to
/// `render_json_schema_to_typescript_inner`.
fn render_json_schema_to_typescript(schema: &JsonValue) -> String {
render_json_schema_to_typescript_inner(schema)
}
/// Recursively renders a JSON Schema value as a TypeScript type expression.
///
/// Boolean schemas map to `unknown` (`true`) and `never` (`false`). For object
/// schemas the first applicable keyword wins, checked in this order:
/// `const`, `enum`, `anyOf`, `oneOf`, `allOf`, `type`, object-shaped keywords
/// (`properties` / `additionalProperties` / `required`), then array-shaped
/// keywords (`items` / `prefixItems`). Anything else renders as `unknown`.
///
/// Union and intersection members are joined with ` | ` / ` & ` as-is; no
/// parentheses are added around nested unions.
fn render_json_schema_to_typescript_inner(schema: &JsonValue) -> String {
    let map = match schema {
        JsonValue::Bool(true) => return "unknown".to_string(),
        JsonValue::Bool(false) => return "never".to_string(),
        JsonValue::Object(map) => map,
        _ => return "unknown".to_string(),
    };

    if let Some(literal) = map.get("const") {
        return render_json_schema_literal(literal);
    }

    // An empty `enum` array is ignored and later keywords get a chance.
    if let Some(JsonValue::Array(values)) = map.get("enum") {
        if !values.is_empty() {
            let literals: Vec<String> = values.iter().map(render_json_schema_literal).collect();
            return literals.join(" | ");
        }
    }

    // Schema combinators, in precedence order, with the TypeScript operator
    // used to join their variants. Empty variant lists are skipped.
    for (keyword, separator) in [("anyOf", " | "), ("oneOf", " | "), ("allOf", " & ")] {
        if let Some(JsonValue::Array(variants)) = map.get(keyword) {
            if !variants.is_empty() {
                let members: Vec<String> = variants
                    .iter()
                    .map(render_json_schema_to_typescript_inner)
                    .collect();
                return members.join(separator);
            }
        }
    }

    match map.get("type") {
        Some(JsonValue::Array(type_names)) => {
            // Non-string entries are dropped; if nothing remains, fall through.
            let members: Vec<String> = type_names
                .iter()
                .filter_map(serde_json::Value::as_str)
                .map(|type_name| render_json_schema_type_keyword(map, type_name))
                .collect();
            if !members.is_empty() {
                return members.join(" | ");
            }
        }
        Some(JsonValue::String(type_name)) => {
            return render_json_schema_type_keyword(map, type_name);
        }
        _ => {}
    }

    let looks_like_object = ["properties", "additionalProperties", "required"]
        .iter()
        .any(|keyword| map.contains_key(*keyword));
    if looks_like_object {
        return render_json_schema_object(map);
    }

    let looks_like_array = ["items", "prefixItems"]
        .iter()
        .any(|keyword| map.contains_key(*keyword));
    if looks_like_array {
        return render_json_schema_array(map);
    }

    "unknown".to_string()
}
/// Renders one JSON Schema `type` keyword value as a TypeScript type.
///
/// `array` and `object` are rendered from the surrounding schema `map`;
/// `number` and `integer` both become `number`; unrecognized type names
/// become `unknown`.
fn render_json_schema_type_keyword(
    map: &serde_json::Map<String, JsonValue>,
    schema_type: &str,
) -> String {
    let primitive = match schema_type {
        // Composite types need the rest of the schema to describe their shape.
        "array" => return render_json_schema_array(map),
        "object" => return render_json_schema_object(map),
        "string" => "string",
        "number" | "integer" => "number",
        "boolean" => "boolean",
        "null" => "null",
        _ => "unknown",
    };
    primitive.to_string()
}
fn render_json_schema_array(map: &serde_json::Map<String, JsonValue>) -> String {
if let Some(items) = map.get("items") {
let item_type = render_json_schema_to_typescript_inner(items);
return format!("Array<{item_type}>");
}
if let Some(items) = map.get("prefixItems").and_then(serde_json::Value::as_array) {
let item_types = items
.iter()
.map(render_json_schema_to_typescript_inner)
.collect::<Vec<_>>();
if !item_types.is_empty() {
return format!("[{}]", item_types.join(", "));
}
}
"unknown[]".to_string()
}
/// Renders an object-shaped schema as a TypeScript object type literal.
///
/// Properties are emitted sorted by name as `name: T;`, or `name?: T;` when
/// the name is not listed in `required`. Names are quoted by
/// `render_json_schema_property_name` when needed.
///
/// An index signature `[key: string]: T;` is appended when
/// `additionalProperties` is `true` (`T` = `unknown`) or a schema, and also
/// when `additionalProperties` is absent and there are no properties at all.
/// `additionalProperties: false` adds nothing. If no members are produced the
/// result is `{}`.
///
/// The property entries are borrowed from `map` rather than cloned, so nested
/// schemas are not deep-copied at every recursion level.
fn render_json_schema_object(map: &serde_json::Map<String, JsonValue>) -> String {
    let required: Vec<&str> = map
        .get("required")
        .and_then(serde_json::Value::as_array)
        .map(|items| {
            items
                .iter()
                .filter_map(serde_json::Value::as_str)
                .collect()
        })
        .unwrap_or_default();

    // Borrow the (name, schema) pairs; a missing or non-object `properties`
    // value is treated as having no properties.
    let mut sorted_properties: Vec<(&String, &JsonValue)> = map
        .get("properties")
        .and_then(serde_json::Value::as_object)
        .map(|properties| properties.iter().collect())
        .unwrap_or_default();
    // Keys are unique, so an unstable sort is deterministic here.
    sorted_properties.sort_unstable_by(|(name_a, _), (name_b, _)| name_a.cmp(name_b));
    let has_properties = !sorted_properties.is_empty();

    let mut lines: Vec<String> = sorted_properties
        .into_iter()
        .map(|(name, value)| {
            let optional = if required.contains(&name.as_str()) {
                ""
            } else {
                "?"
            };
            let property_name = render_json_schema_property_name(name);
            let property_type = render_json_schema_to_typescript_inner(value);
            format!("{property_name}{optional}: {property_type};")
        })
        .collect();

    match map.get("additionalProperties") {
        Some(additional_properties) => {
            let additional_type = match additional_properties {
                JsonValue::Bool(true) => Some("unknown".to_string()),
                JsonValue::Bool(false) => None,
                value => Some(render_json_schema_to_typescript_inner(value)),
            };
            if let Some(additional_type) = additional_type {
                lines.push(format!("[key: string]: {additional_type};"));
            }
        }
        // With no declared shape at all, accept arbitrary keys.
        None if !has_properties => lines.push("[key: string]: unknown;".to_string()),
        None => {}
    }

    if lines.is_empty() {
        return "{}".to_string();
    }
    format!("{{ {} }}", lines.join(" "))
}
/// Renders a property name for use inside a TypeScript object type.
///
/// Names that `normalize_code_mode_identifier` leaves unchanged are emitted
/// bare; all others are emitted as a JSON string literal.
fn render_json_schema_property_name(name: &str) -> String {
    let is_plain_identifier = normalize_code_mode_identifier(name) == name;
    if is_plain_identifier {
        return name.to_string();
    }

    match serde_json::to_string(name) {
        Ok(quoted) => quoted,
        // Fallback if serialization fails: quote manually, escaping only `"`.
        Err(_) => format!("\"{}\"", name.replace('"', "\\\"")),
    }
}
/// Renders a JSON value as a TypeScript literal by serializing it to JSON
/// text; falls back to `unknown` if serialization fails.
fn render_json_schema_literal(value: &JsonValue) -> String {
    match serde_json::to_string(value) {
        Ok(literal) => literal,
        Err(_) => "unknown".to_string(),
    }
}
#[cfg(test)]
#[path = "code_mode_description_tests.rs"]
mod tests;

View File

@@ -2365,8 +2365,12 @@ async fn js_repl_imported_local_files_can_access_repl_globals() -> anyhow::Resul
return Ok(());
}
let Ok(node_path) = which::which("node") else {
return Ok(());
};
let cwd_dir = tempdir()?;
let expected_home_dir = serde_json::to_string("/tmp/codex-home")?;
let expected_home_dir = serde_json::to_string(&std::env::var("HOME").ok())?;
write_js_repl_test_module(
cwd_dir.path(),
"globals.js",
@@ -2376,20 +2380,11 @@ async fn js_repl_imported_local_files_can_access_repl_globals() -> anyhow::Resul
)?;
let (session, mut turn) = make_session_and_context().await;
session
.set_dependency_env(HashMap::from([(
"HOME".to_string(),
"/tmp/codex-home".to_string(),
)]))
.await;
turn.shell_environment_policy
.r#set
.remove("CODEX_JS_REPL_NODE_MODULE_DIRS");
turn.cwd = cwd_dir.path().to_path_buf();
turn.js_repl = Arc::new(JsReplHandle::with_node_path(
turn.config.js_repl_node_path.clone(),
Vec::new(),
));
turn.js_repl = Arc::new(JsReplHandle::with_node_path(Some(node_path), Vec::new()));
let session = Arc::new(session);
let turn = Arc::new(turn);