mirror of
https://github.com/openai/codex.git
synced 2026-03-19 20:36:30 +03:00
Compare commits
2 Commits
starr/exec
...
pakrym/cod
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1cc893f95 | ||
|
|
cf0785e99b |
@@ -15,6 +15,8 @@ use crate::models_manager::manager::ModelsManager;
|
||||
use crate::plugins::PluginsManager;
|
||||
use crate::skills::SkillsManager;
|
||||
use crate::state_db::StateDbHandle;
|
||||
use crate::tools::code_mode::CodeModeProcess;
|
||||
use crate::tools::code_mode::CodeModeYieldedSession;
|
||||
use crate::tools::network_approval::NetworkApprovalService;
|
||||
use crate::tools::runtimes::ExecveSessionApproval;
|
||||
use crate::tools::sandboxing::ApprovalStore;
|
||||
@@ -31,12 +33,18 @@ use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub(crate) struct CodeModeStoreService {
|
||||
stored_values: Mutex<HashMap<String, JsonValue>>,
|
||||
process: Mutex<Option<Arc<Mutex<CodeModeProcess>>>>,
|
||||
yielded_sessions: Mutex<HashMap<i32, CodeModeYieldedSession>>,
|
||||
next_session_id: Mutex<i32>,
|
||||
}
|
||||
|
||||
impl Default for CodeModeStoreService {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
stored_values: Mutex::new(HashMap::new()),
|
||||
process: Mutex::new(None),
|
||||
yielded_sessions: Mutex::new(HashMap::new()),
|
||||
next_session_id: Mutex::new(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -49,6 +57,35 @@ impl CodeModeStoreService {
|
||||
pub(crate) async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
|
||||
*self.stored_values.lock().await = values;
|
||||
}
|
||||
|
||||
pub(crate) async fn store_process(&self, process: Arc<Mutex<CodeModeProcess>>) {
|
||||
*self.process.lock().await = Some(process);
|
||||
}
|
||||
|
||||
pub(crate) async fn process(&self) -> Option<Arc<Mutex<CodeModeProcess>>> {
|
||||
self.process.lock().await.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn allocate_session_id(&self) -> i32 {
|
||||
let mut next_session_id = self.next_session_id.lock().await;
|
||||
let session_id = *next_session_id;
|
||||
*next_session_id = next_session_id.saturating_add(1);
|
||||
session_id
|
||||
}
|
||||
|
||||
pub(crate) async fn store_yielded_session(&self, yielded_session: CodeModeYieldedSession) {
|
||||
self.yielded_sessions
|
||||
.lock()
|
||||
.await
|
||||
.insert(yielded_session.session_id, yielded_session);
|
||||
}
|
||||
|
||||
pub(crate) async fn take_yielded_session(
|
||||
&self,
|
||||
session_id: i32,
|
||||
) -> Option<CodeModeYieldedSession> {
|
||||
self.yielded_sessions.lock().await.remove(&session_id)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SessionServices {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -30,10 +31,13 @@ use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
const CODE_MODE_RUNNER_SOURCE: &str = include_str!("code_mode_runner.cjs");
|
||||
const CODE_MODE_BRIDGE_SOURCE: &str = include_str!("code_mode_bridge.js");
|
||||
pub(crate) const PUBLIC_TOOL_NAME: &str = "exec";
|
||||
pub(crate) const WAIT_TOOL_NAME: &str = "exec_wait";
|
||||
pub(crate) const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ExecContext {
|
||||
@@ -42,6 +46,43 @@ struct ExecContext {
|
||||
tracker: SharedTurnDiffTracker,
|
||||
}
|
||||
|
||||
pub(crate) struct CodeModeProcess {
|
||||
child: tokio::process::Child,
|
||||
stdin: tokio::process::ChildStdin,
|
||||
stdout_lines: tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
|
||||
stderr_task: Option<JoinHandle<String>>,
|
||||
pending_messages: HashMap<i32, VecDeque<NodeToHostMessage>>,
|
||||
}
|
||||
|
||||
impl CodeModeProcess {
|
||||
fn has_exited(&mut self) -> Result<bool, String> {
|
||||
self.child
|
||||
.try_wait()
|
||||
.map(|status| status.is_some())
|
||||
.map_err(|err| format!("failed to inspect {PUBLIC_TOOL_NAME} runner: {err}"))
|
||||
}
|
||||
|
||||
async fn wait_for_exit(&mut self) -> Result<std::process::ExitStatus, String> {
|
||||
self.child
|
||||
.wait()
|
||||
.await
|
||||
.map_err(|err| format!("failed to wait for {PUBLIC_TOOL_NAME} runner: {err}"))
|
||||
}
|
||||
|
||||
async fn stderr(&mut self) -> Result<String, String> {
|
||||
self.stderr_task
|
||||
.take()
|
||||
.ok_or_else(|| format!("{PUBLIC_TOOL_NAME} stderr collector missing"))?
|
||||
.await
|
||||
.map_err(|err| format!("failed to collect {PUBLIC_TOOL_NAME} stderr: {err}"))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct CodeModeYieldedSession {
|
||||
pub(crate) session_id: i32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
enum CodeModeToolKind {
|
||||
@@ -63,12 +104,21 @@ struct EnabledTool {
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
enum HostToNodeMessage {
|
||||
Init {
|
||||
Start {
|
||||
session_id: i32,
|
||||
enabled_tools: Vec<EnabledTool>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
source: String,
|
||||
},
|
||||
Poll {
|
||||
session_id: i32,
|
||||
yield_time_ms: u64,
|
||||
},
|
||||
Terminate {
|
||||
session_id: i32,
|
||||
},
|
||||
Response {
|
||||
session_id: i32,
|
||||
id: String,
|
||||
code_mode_result: JsonValue,
|
||||
},
|
||||
@@ -78,12 +128,26 @@ enum HostToNodeMessage {
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
enum NodeToHostMessage {
|
||||
ToolCall {
|
||||
session_id: i32,
|
||||
id: String,
|
||||
name: String,
|
||||
#[serde(default)]
|
||||
input: Option<JsonValue>,
|
||||
},
|
||||
Yielded {
|
||||
session_id: i32,
|
||||
content_items: Vec<JsonValue>,
|
||||
#[serde(default)]
|
||||
max_output_tokens_per_exec_call: Option<usize>,
|
||||
},
|
||||
Terminated {
|
||||
session_id: i32,
|
||||
content_items: Vec<JsonValue>,
|
||||
#[serde(default)]
|
||||
max_output_tokens_per_exec_call: Option<usize>,
|
||||
},
|
||||
Result {
|
||||
session_id: i32,
|
||||
content_items: Vec<JsonValue>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
#[serde(default)]
|
||||
@@ -93,6 +157,36 @@ enum NodeToHostMessage {
|
||||
},
|
||||
}
|
||||
|
||||
enum CodeModeSessionAction {
|
||||
Start {
|
||||
enabled_tools: Vec<EnabledTool>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
source: String,
|
||||
},
|
||||
Poll {
|
||||
yield_time_ms: u64,
|
||||
max_output_tokens: Option<usize>,
|
||||
},
|
||||
Terminate {
|
||||
max_output_tokens: Option<usize>,
|
||||
},
|
||||
}
|
||||
|
||||
enum CodeModeSessionProgress {
|
||||
Finished(FunctionToolOutput),
|
||||
Yielded {
|
||||
output: FunctionToolOutput,
|
||||
yielded_session: CodeModeYieldedSession,
|
||||
},
|
||||
}
|
||||
|
||||
enum CodeModeExecutionStatus {
|
||||
Completed,
|
||||
Failed,
|
||||
Running(i32),
|
||||
Terminated,
|
||||
}
|
||||
|
||||
pub(crate) fn instructions(config: &Config) -> Option<String> {
|
||||
if !config.features.enabled(Feature::CodeMode) {
|
||||
return None;
|
||||
@@ -113,7 +207,10 @@ pub(crate) fn instructions(config: &Config) -> Option<String> {
|
||||
));
|
||||
section.push_str("- Import nested tools from `tools.js`, for example `import { exec_command } from \"tools.js\"` or `import { ALL_TOOLS } from \"tools.js\"` to inspect the available `{ module, name, description }` entries. Namespaced tools are also available from `tools/<namespace...>.js`; MCP tools use `tools/mcp/<server>.js`, for example `import { append_notebook_logs_chart } from \"tools/mcp/ologs.js\"`. Nested tool calls resolve to their code-mode result values.\n");
|
||||
section.push_str(&format!(
|
||||
"- Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, store, load }}` from `@openai/code_mode` (or `\"openai/code_mode\"`). `output_text(value)` surfaces text back to the model and stringifies non-string objects with `JSON.stringify(...)` when possible. `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs. `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, and `load(key)` returns a cloned stored value or `undefined`. `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate the final Rust-side result of the current `{PUBLIC_TOOL_NAME}` execution; the default is `10000`. This guards the overall `{PUBLIC_TOOL_NAME}` output, not individual nested tool invocations. The returned content starts with a separate `Script completed` or `Script failed` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker.\n",
|
||||
"- Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, set_yield_time, store, load }}` from `@openai/code_mode` (or `\"openai/code_mode\"`). `output_text(value)` surfaces text back to the model and stringifies non-string objects with `JSON.stringify(...)` when possible. `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs. `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, and `load(key)` returns a cloned stored value or `undefined`. `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate direct `{PUBLIC_TOOL_NAME}` returns; `{WAIT_TOOL_NAME}` uses its own `max_tokens` argument instead and defaults to `10000`. `set_yield_time(value)` asks `{PUBLIC_TOOL_NAME}` to return early if the script is still running after that many milliseconds so `{WAIT_TOOL_NAME}` can resume it later. The returned content starts with a separate `Script completed`, `Script failed`, or `Script running with session ID …` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker.\n",
|
||||
));
|
||||
section.push_str(&format!(
|
||||
"- If `{PUBLIC_TOOL_NAME}` returns `Script running with session ID …`, call `{WAIT_TOOL_NAME}` with that `session_id` to keep waiting for more output, completion, or termination.\n",
|
||||
));
|
||||
section.push_str(
|
||||
"- Function tools require JSON object arguments. Freeform tools require raw strings.\n",
|
||||
@@ -138,20 +235,110 @@ pub(crate) async fn execute(
|
||||
let enabled_tools = build_enabled_tools(&exec).await;
|
||||
let stored_values = exec.session.services.code_mode_store.stored_values().await;
|
||||
let source = build_source(&code, &enabled_tools).map_err(FunctionCallError::RespondToModel)?;
|
||||
execute_node(exec, source, enabled_tools, stored_values)
|
||||
let session_id = exec
|
||||
.session
|
||||
.services
|
||||
.code_mode_store
|
||||
.allocate_session_id()
|
||||
.await;
|
||||
let process = ensure_shared_code_mode_process(&exec)
|
||||
.await
|
||||
.map_err(FunctionCallError::RespondToModel)
|
||||
.map_err(FunctionCallError::RespondToModel)?;
|
||||
let result = {
|
||||
let mut process = process.lock().await;
|
||||
drive_code_mode_session(
|
||||
&exec,
|
||||
&mut process,
|
||||
session_id,
|
||||
CodeModeSessionAction::Start {
|
||||
enabled_tools,
|
||||
stored_values,
|
||||
source,
|
||||
},
|
||||
)
|
||||
.await
|
||||
};
|
||||
if let Ok(CodeModeSessionProgress::Yielded {
|
||||
yielded_session, ..
|
||||
}) = &result
|
||||
{
|
||||
exec.session
|
||||
.services
|
||||
.code_mode_store
|
||||
.store_yielded_session(yielded_session.clone())
|
||||
.await;
|
||||
}
|
||||
match result {
|
||||
Ok(CodeModeSessionProgress::Finished(output))
|
||||
| Ok(CodeModeSessionProgress::Yielded { output, .. }) => Ok(output),
|
||||
Err(error) => Err(FunctionCallError::RespondToModel(error)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_node(
|
||||
exec: ExecContext,
|
||||
source: String,
|
||||
enabled_tools: Vec<EnabledTool>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
) -> Result<FunctionToolOutput, String> {
|
||||
let node_path = resolve_compatible_node(exec.turn.config.js_repl_node_path.as_deref()).await?;
|
||||
let started_at = std::time::Instant::now();
|
||||
pub(crate) async fn wait(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
tracker: SharedTurnDiffTracker,
|
||||
session_id: i32,
|
||||
yield_time_ms: u64,
|
||||
max_output_tokens: Option<usize>,
|
||||
terminate: bool,
|
||||
) -> Result<FunctionToolOutput, FunctionCallError> {
|
||||
let exec = ExecContext {
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
};
|
||||
let yielded_session = exec
|
||||
.session
|
||||
.services
|
||||
.code_mode_store
|
||||
.take_yielded_session(session_id)
|
||||
.await
|
||||
.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"{WAIT_TOOL_NAME} session_id {session_id} is not waiting on {WAIT_TOOL_NAME}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let process = existing_shared_code_mode_process(&exec).await?;
|
||||
let result = {
|
||||
let mut process = process.lock().await;
|
||||
drive_code_mode_session(
|
||||
&exec,
|
||||
&mut process,
|
||||
yielded_session.session_id,
|
||||
if terminate {
|
||||
CodeModeSessionAction::Terminate { max_output_tokens }
|
||||
} else {
|
||||
CodeModeSessionAction::Poll {
|
||||
yield_time_ms,
|
||||
max_output_tokens,
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
};
|
||||
if let Ok(CodeModeSessionProgress::Yielded {
|
||||
yielded_session, ..
|
||||
}) = &result
|
||||
{
|
||||
exec.session
|
||||
.services
|
||||
.code_mode_store
|
||||
.store_yielded_session(yielded_session.clone())
|
||||
.await;
|
||||
}
|
||||
|
||||
match result {
|
||||
Ok(CodeModeSessionProgress::Finished(output))
|
||||
| Ok(CodeModeSessionProgress::Yielded { output, .. }) => Ok(output),
|
||||
Err(error) => Err(FunctionCallError::RespondToModel(error)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_code_mode_process(exec: &ExecContext) -> Result<CodeModeProcess, String> {
|
||||
let node_path = resolve_compatible_node(exec.turn.config.js_repl_node_path.as_deref()).await?;
|
||||
let env = create_env(&exec.turn.shell_environment_policy, None);
|
||||
let mut cmd = tokio::process::Command::new(&node_path);
|
||||
cmd.arg("--experimental-vm-modules");
|
||||
@@ -176,7 +363,7 @@ async fn execute_node(
|
||||
.stderr
|
||||
.take()
|
||||
.ok_or_else(|| format!("{PUBLIC_TOOL_NAME} runner missing stderr"))?;
|
||||
let mut stdin = child
|
||||
let stdin = child
|
||||
.stdin
|
||||
.take()
|
||||
.ok_or_else(|| format!("{PUBLIC_TOOL_NAME} runner missing stdin"))?;
|
||||
@@ -188,19 +375,132 @@ async fn execute_node(
|
||||
String::from_utf8_lossy(&buf).trim().to_string()
|
||||
});
|
||||
|
||||
write_message(
|
||||
&mut stdin,
|
||||
&HostToNodeMessage::Init {
|
||||
enabled_tools: enabled_tools.clone(),
|
||||
Ok(CodeModeProcess {
|
||||
child,
|
||||
stdin,
|
||||
stdout_lines: BufReader::new(stdout).lines(),
|
||||
stderr_task: Some(stderr_task),
|
||||
pending_messages: HashMap::new(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn ensure_shared_code_mode_process(
|
||||
exec: &ExecContext,
|
||||
) -> Result<Arc<tokio::sync::Mutex<CodeModeProcess>>, String> {
|
||||
if let Some(process) = exec.session.services.code_mode_store.process().await {
|
||||
let is_running = {
|
||||
let mut process_guard = process.lock().await;
|
||||
matches!(process_guard.has_exited(), Ok(false))
|
||||
};
|
||||
if is_running {
|
||||
return Ok(process);
|
||||
}
|
||||
}
|
||||
|
||||
let process = Arc::new(tokio::sync::Mutex::new(
|
||||
spawn_code_mode_process(exec).await?,
|
||||
));
|
||||
exec.session
|
||||
.services
|
||||
.code_mode_store
|
||||
.store_process(process.clone())
|
||||
.await;
|
||||
Ok(process)
|
||||
}
|
||||
|
||||
async fn existing_shared_code_mode_process(
|
||||
exec: &ExecContext,
|
||||
) -> Result<Arc<tokio::sync::Mutex<CodeModeProcess>>, FunctionCallError> {
|
||||
let process = exec
|
||||
.session
|
||||
.services
|
||||
.code_mode_store
|
||||
.process()
|
||||
.await
|
||||
.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} runner is not available for {WAIT_TOOL_NAME}"
|
||||
))
|
||||
})?;
|
||||
let is_running = {
|
||||
let mut process_guard = process.lock().await;
|
||||
matches!(process_guard.has_exited(), Ok(false))
|
||||
};
|
||||
if is_running {
|
||||
Ok(process)
|
||||
} else {
|
||||
Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} runner is not available for {WAIT_TOOL_NAME}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
async fn drive_code_mode_session(
|
||||
exec: &ExecContext,
|
||||
process: &mut CodeModeProcess,
|
||||
session_id: i32,
|
||||
action: CodeModeSessionAction,
|
||||
) -> Result<CodeModeSessionProgress, String> {
|
||||
let started_at = std::time::Instant::now();
|
||||
let is_terminate = matches!(action, CodeModeSessionAction::Terminate { .. });
|
||||
let (message, poll_max_output_tokens) = match action {
|
||||
CodeModeSessionAction::Start {
|
||||
enabled_tools,
|
||||
stored_values,
|
||||
source,
|
||||
},
|
||||
} => (
|
||||
HostToNodeMessage::Start {
|
||||
session_id,
|
||||
enabled_tools,
|
||||
stored_values,
|
||||
source,
|
||||
},
|
||||
None,
|
||||
),
|
||||
CodeModeSessionAction::Poll {
|
||||
yield_time_ms,
|
||||
max_output_tokens,
|
||||
} => (
|
||||
HostToNodeMessage::Poll {
|
||||
session_id,
|
||||
yield_time_ms,
|
||||
},
|
||||
Some(max_output_tokens),
|
||||
),
|
||||
CodeModeSessionAction::Terminate { max_output_tokens } => (
|
||||
HostToNodeMessage::Terminate { session_id },
|
||||
Some(max_output_tokens),
|
||||
),
|
||||
};
|
||||
if let Some(progress) = process_pending_messages(
|
||||
exec,
|
||||
process,
|
||||
session_id,
|
||||
poll_max_output_tokens,
|
||||
started_at,
|
||||
is_terminate,
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
{
|
||||
return Ok(progress);
|
||||
}
|
||||
write_message(&mut process.stdin, &message).await?;
|
||||
|
||||
let mut stdout_lines = BufReader::new(stdout).lines();
|
||||
let mut pending_result = None;
|
||||
while let Some(line) = stdout_lines
|
||||
if let Some(progress) = process_pending_messages(
|
||||
exec,
|
||||
process,
|
||||
session_id,
|
||||
poll_max_output_tokens,
|
||||
started_at,
|
||||
is_terminate,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
return Ok(progress);
|
||||
}
|
||||
|
||||
while let Some(line) = process
|
||||
.stdout_lines
|
||||
.next_line()
|
||||
.await
|
||||
.map_err(|err| format!("failed to read {PUBLIC_TOOL_NAME} runner stdout: {err}"))?
|
||||
@@ -211,77 +511,203 @@ async fn execute_node(
|
||||
let message: NodeToHostMessage = serde_json::from_str(&line).map_err(|err| {
|
||||
format!("invalid {PUBLIC_TOOL_NAME} runner message: {err}; line={line}")
|
||||
})?;
|
||||
match message {
|
||||
NodeToHostMessage::ToolCall { id, name, input } => {
|
||||
let message_session_id = message_session_id(&message);
|
||||
if message_session_id != session_id {
|
||||
if let NodeToHostMessage::ToolCall {
|
||||
session_id: message_session_id,
|
||||
id,
|
||||
name,
|
||||
input,
|
||||
} = message
|
||||
{
|
||||
let response = HostToNodeMessage::Response {
|
||||
session_id: message_session_id,
|
||||
id,
|
||||
code_mode_result: call_nested_tool(exec.clone(), name, input).await,
|
||||
};
|
||||
write_message(&mut stdin, &response).await?;
|
||||
}
|
||||
NodeToHostMessage::Result {
|
||||
content_items,
|
||||
stored_values,
|
||||
error_text,
|
||||
max_output_tokens_per_exec_call,
|
||||
} => {
|
||||
exec.session
|
||||
.services
|
||||
.code_mode_store
|
||||
.replace_stored_values(stored_values)
|
||||
.await;
|
||||
pending_result = Some((
|
||||
output_content_items_from_json_values(content_items)?,
|
||||
error_text,
|
||||
max_output_tokens_per_exec_call,
|
||||
));
|
||||
break;
|
||||
write_message(&mut process.stdin, &response).await?;
|
||||
} else {
|
||||
process
|
||||
.pending_messages
|
||||
.entry(message_session_id)
|
||||
.or_default()
|
||||
.push_back(message);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if let Some(progress) = handle_node_message(
|
||||
exec,
|
||||
process,
|
||||
session_id,
|
||||
message,
|
||||
poll_max_output_tokens,
|
||||
started_at,
|
||||
is_terminate,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
return Ok(progress);
|
||||
}
|
||||
}
|
||||
|
||||
drop(stdin);
|
||||
|
||||
let status = child
|
||||
.wait()
|
||||
.await
|
||||
.map_err(|err| format!("failed to wait for {PUBLIC_TOOL_NAME} runner: {err}"))?;
|
||||
let stderr = stderr_task
|
||||
.await
|
||||
.map_err(|err| format!("failed to collect {PUBLIC_TOOL_NAME} stderr: {err}"))?;
|
||||
let wall_time = started_at.elapsed();
|
||||
let success = status.success();
|
||||
|
||||
let Some((mut content_items, error_text, max_output_tokens_per_exec_call)) = pending_result
|
||||
else {
|
||||
let message = if stderr.is_empty() {
|
||||
format!("{PUBLIC_TOOL_NAME} runner exited without returning a result (status {status})")
|
||||
} else {
|
||||
stderr
|
||||
};
|
||||
return Err(message);
|
||||
let status = process.wait_for_exit().await?;
|
||||
let stderr = process.stderr().await?;
|
||||
let message = if stderr.is_empty() {
|
||||
format!("{PUBLIC_TOOL_NAME} runner exited without returning a result (status {status})")
|
||||
} else {
|
||||
stderr
|
||||
};
|
||||
Err(message)
|
||||
}
|
||||
|
||||
if !success {
|
||||
let error_text = error_text.unwrap_or_else(|| {
|
||||
if stderr.is_empty() {
|
||||
format!("Process exited with status {status}")
|
||||
} else {
|
||||
stderr
|
||||
}
|
||||
});
|
||||
content_items.push(FunctionCallOutputContentItem::InputText {
|
||||
text: format!("Script error:\n{error_text}"),
|
||||
});
|
||||
async fn process_pending_messages(
|
||||
exec: &ExecContext,
|
||||
process: &mut CodeModeProcess,
|
||||
session_id: i32,
|
||||
poll_max_output_tokens: Option<Option<usize>>,
|
||||
started_at: std::time::Instant,
|
||||
is_terminate: bool,
|
||||
) -> Result<Option<CodeModeSessionProgress>, String> {
|
||||
loop {
|
||||
let Some(message) = process
|
||||
.pending_messages
|
||||
.get_mut(&session_id)
|
||||
.and_then(VecDeque::pop_front)
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
if let Some(progress) = handle_node_message(
|
||||
exec,
|
||||
process,
|
||||
session_id,
|
||||
message,
|
||||
poll_max_output_tokens,
|
||||
started_at,
|
||||
is_terminate,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(progress));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut content_items =
|
||||
truncate_code_mode_result(content_items, max_output_tokens_per_exec_call);
|
||||
prepend_script_status(&mut content_items, success, wall_time);
|
||||
Ok(FunctionToolOutput::from_content(
|
||||
content_items,
|
||||
Some(success),
|
||||
))
|
||||
async fn handle_node_message(
|
||||
exec: &ExecContext,
|
||||
process: &mut CodeModeProcess,
|
||||
session_id: i32,
|
||||
message: NodeToHostMessage,
|
||||
poll_max_output_tokens: Option<Option<usize>>,
|
||||
started_at: std::time::Instant,
|
||||
is_terminate: bool,
|
||||
) -> Result<Option<CodeModeSessionProgress>, String> {
|
||||
match message {
|
||||
NodeToHostMessage::ToolCall {
|
||||
session_id: message_session_id,
|
||||
id,
|
||||
name,
|
||||
input,
|
||||
} => {
|
||||
if is_terminate {
|
||||
return Ok(None);
|
||||
}
|
||||
let response = HostToNodeMessage::Response {
|
||||
session_id: message_session_id,
|
||||
id,
|
||||
code_mode_result: call_nested_tool(exec.clone(), name, input).await,
|
||||
};
|
||||
write_message(&mut process.stdin, &response).await?;
|
||||
Ok(None)
|
||||
}
|
||||
NodeToHostMessage::Yielded {
|
||||
content_items,
|
||||
max_output_tokens_per_exec_call,
|
||||
..
|
||||
} => {
|
||||
if is_terminate {
|
||||
return Ok(None);
|
||||
}
|
||||
let mut delta_items = output_content_items_from_json_values(content_items)?;
|
||||
delta_items = truncate_code_mode_result(
|
||||
delta_items,
|
||||
poll_max_output_tokens.unwrap_or(max_output_tokens_per_exec_call),
|
||||
);
|
||||
prepend_script_status(
|
||||
&mut delta_items,
|
||||
CodeModeExecutionStatus::Running(session_id),
|
||||
started_at.elapsed(),
|
||||
);
|
||||
Ok(Some(CodeModeSessionProgress::Yielded {
|
||||
output: FunctionToolOutput::from_content(delta_items, Some(true)),
|
||||
yielded_session: CodeModeYieldedSession { session_id },
|
||||
}))
|
||||
}
|
||||
NodeToHostMessage::Terminated {
|
||||
content_items,
|
||||
max_output_tokens_per_exec_call,
|
||||
..
|
||||
} => {
|
||||
let mut delta_items = output_content_items_from_json_values(content_items)?;
|
||||
delta_items = truncate_code_mode_result(
|
||||
delta_items,
|
||||
poll_max_output_tokens.unwrap_or(max_output_tokens_per_exec_call),
|
||||
);
|
||||
prepend_script_status(
|
||||
&mut delta_items,
|
||||
CodeModeExecutionStatus::Terminated,
|
||||
started_at.elapsed(),
|
||||
);
|
||||
Ok(Some(CodeModeSessionProgress::Finished(
|
||||
FunctionToolOutput::from_content(delta_items, Some(true)),
|
||||
)))
|
||||
}
|
||||
NodeToHostMessage::Result {
|
||||
content_items,
|
||||
stored_values,
|
||||
error_text,
|
||||
max_output_tokens_per_exec_call,
|
||||
..
|
||||
} => {
|
||||
exec.session
|
||||
.services
|
||||
.code_mode_store
|
||||
.replace_stored_values(stored_values)
|
||||
.await;
|
||||
let mut delta_items = output_content_items_from_json_values(content_items)?;
|
||||
let success = error_text.is_none();
|
||||
if let Some(error_text) = error_text {
|
||||
delta_items.push(FunctionCallOutputContentItem::InputText {
|
||||
text: format!("Script error:\n{error_text}"),
|
||||
});
|
||||
}
|
||||
|
||||
let mut delta_items = truncate_code_mode_result(
|
||||
delta_items,
|
||||
poll_max_output_tokens.unwrap_or(max_output_tokens_per_exec_call),
|
||||
);
|
||||
prepend_script_status(
|
||||
&mut delta_items,
|
||||
if success {
|
||||
CodeModeExecutionStatus::Completed
|
||||
} else {
|
||||
CodeModeExecutionStatus::Failed
|
||||
},
|
||||
started_at.elapsed(),
|
||||
);
|
||||
Ok(Some(CodeModeSessionProgress::Finished(
|
||||
FunctionToolOutput::from_content(delta_items, Some(success)),
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn message_session_id(message: &NodeToHostMessage) -> i32 {
|
||||
match message {
|
||||
NodeToHostMessage::ToolCall { session_id, .. }
|
||||
| NodeToHostMessage::Yielded { session_id, .. }
|
||||
| NodeToHostMessage::Terminated { session_id, .. }
|
||||
| NodeToHostMessage::Result { session_id, .. } => *session_id,
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_message(
|
||||
@@ -306,16 +732,19 @@ async fn write_message(
|
||||
|
||||
fn prepend_script_status(
|
||||
content_items: &mut Vec<FunctionCallOutputContentItem>,
|
||||
success: bool,
|
||||
status: CodeModeExecutionStatus,
|
||||
wall_time: Duration,
|
||||
) {
|
||||
let wall_time_seconds = ((wall_time.as_secs_f32()) * 10.0).round() / 10.0;
|
||||
let header = format!(
|
||||
"{}\nWall time {wall_time_seconds:.1} seconds\nOutput:\n",
|
||||
if success {
|
||||
"Script completed"
|
||||
} else {
|
||||
"Script failed"
|
||||
match status {
|
||||
CodeModeExecutionStatus::Completed => "Script completed".to_string(),
|
||||
CodeModeExecutionStatus::Failed => "Script failed".to_string(),
|
||||
CodeModeExecutionStatus::Running(session_id) => {
|
||||
format!("Script running with session ID {session_id}")
|
||||
}
|
||||
CodeModeExecutionStatus::Terminated => "Script terminated".to_string(),
|
||||
}
|
||||
);
|
||||
content_items.insert(0, FunctionCallOutputContentItem::InputText { text: header });
|
||||
@@ -365,7 +794,7 @@ async fn build_enabled_tools(exec: &ExecContext) -> Vec<EnabledTool> {
|
||||
|
||||
fn enabled_tool_from_spec(spec: ToolSpec) -> Option<EnabledTool> {
|
||||
let tool_name = spec.name().to_string();
|
||||
if tool_name == PUBLIC_TOOL_NAME {
|
||||
if tool_name == PUBLIC_TOOL_NAME || tool_name == WAIT_TOOL_NAME {
|
||||
return None;
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,16 +1,35 @@
|
||||
use async_trait::async_trait;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::features::Feature;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::tools::code_mode;
|
||||
use crate::tools::code_mode::DEFAULT_WAIT_YIELD_TIME_MS;
|
||||
use crate::tools::code_mode::PUBLIC_TOOL_NAME;
|
||||
use crate::tools::code_mode::WAIT_TOOL_NAME;
|
||||
use crate::tools::context::FunctionToolOutput;
|
||||
use crate::tools::context::ToolInvocation;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::handlers::parse_arguments;
|
||||
use crate::tools::registry::ToolHandler;
|
||||
use crate::tools::registry::ToolKind;
|
||||
|
||||
pub struct CodeModeHandler;
|
||||
pub struct CodeModeWaitHandler;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ExecWaitArgs {
|
||||
session_id: i32,
|
||||
#[serde(default = "default_wait_yield_time_ms")]
|
||||
yield_time_ms: u64,
|
||||
#[serde(default)]
|
||||
max_tokens: Option<usize>,
|
||||
#[serde(default)]
|
||||
terminate: bool,
|
||||
}
|
||||
|
||||
fn default_wait_yield_time_ms() -> u64 {
|
||||
DEFAULT_WAIT_YIELD_TIME_MS
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ToolHandler for CodeModeHandler {
|
||||
@@ -29,25 +48,57 @@ impl ToolHandler for CodeModeHandler {
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
tool_name,
|
||||
payload,
|
||||
..
|
||||
} = invocation;
|
||||
|
||||
if !session.features().enabled(Feature::CodeMode) {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} is disabled by feature flag"
|
||||
)));
|
||||
}
|
||||
|
||||
let code = match payload {
|
||||
ToolPayload::Custom { input } => input,
|
||||
_ => {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} expects raw JavaScript source text"
|
||||
)));
|
||||
match payload {
|
||||
ToolPayload::Custom { input } if tool_name == PUBLIC_TOOL_NAME => {
|
||||
code_mode::execute(session, turn, tracker, input).await
|
||||
}
|
||||
};
|
||||
|
||||
code_mode::execute(session, turn, tracker, code).await
|
||||
_ => Err(FunctionCallError::RespondToModel(format!(
|
||||
"{PUBLIC_TOOL_NAME} expects raw JavaScript source text"
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ToolHandler for CodeModeWaitHandler {
|
||||
type Output = FunctionToolOutput;
|
||||
|
||||
fn kind(&self) -> ToolKind {
|
||||
ToolKind::Function
|
||||
}
|
||||
|
||||
async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
|
||||
let ToolInvocation {
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
tool_name,
|
||||
payload,
|
||||
..
|
||||
} = invocation;
|
||||
|
||||
match payload {
|
||||
ToolPayload::Function { arguments } if tool_name == WAIT_TOOL_NAME => {
|
||||
let args: ExecWaitArgs = parse_arguments(&arguments)?;
|
||||
code_mode::wait(
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
args.session_id,
|
||||
args.yield_time_ms,
|
||||
args.max_tokens,
|
||||
args.terminate,
|
||||
)
|
||||
.await
|
||||
}
|
||||
_ => Err(FunctionCallError::RespondToModel(format!(
|
||||
"{WAIT_TOOL_NAME} expects JSON arguments"
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ use crate::sandboxing::normalize_additional_permissions;
|
||||
pub use apply_patch::ApplyPatchHandler;
|
||||
pub use artifacts::ArtifactsHandler;
|
||||
pub use code_mode::CodeModeHandler;
|
||||
pub use code_mode::CodeModeWaitHandler;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
pub use dynamic::DynamicToolHandler;
|
||||
|
||||
@@ -7,7 +7,9 @@ use crate::features::Feature;
|
||||
use crate::features::Features;
|
||||
use crate::mcp_connection_manager::ToolInfo;
|
||||
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use crate::tools::code_mode::DEFAULT_WAIT_YIELD_TIME_MS;
|
||||
use crate::tools::code_mode::PUBLIC_TOOL_NAME;
|
||||
use crate::tools::code_mode::WAIT_TOOL_NAME;
|
||||
use crate::tools::code_mode_description::augment_tool_spec_for_code_mode;
|
||||
use crate::tools::handlers::PLAN_TOOL;
|
||||
use crate::tools::handlers::SEARCH_TOOL_BM25_DEFAULT_LIMIT;
|
||||
@@ -572,6 +574,54 @@ fn create_write_stdin_tool() -> ToolSpec {
|
||||
})
|
||||
}
|
||||
|
||||
fn create_exec_wait_tool() -> ToolSpec {
|
||||
let properties = BTreeMap::from([
|
||||
(
|
||||
"session_id".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some("Identifier of the running exec session.".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
"yield_time_ms".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"How long to wait (in milliseconds) for more output before yielding again."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"max_tokens".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Maximum number of output tokens to return for this wait call.".to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"terminate".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some("Whether to terminate the running exec session.".to_string()),
|
||||
},
|
||||
),
|
||||
]);
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: WAIT_TOOL_NAME.to_string(),
|
||||
description: format!(
|
||||
"Waits on a yielded `{PUBLIC_TOOL_NAME}` session and returns new output or completion."
|
||||
),
|
||||
strict: false,
|
||||
parameters: JsonSchema::Object {
|
||||
properties,
|
||||
required: Some(vec!["session_id".to_string()]),
|
||||
additional_properties: Some(false.into()),
|
||||
},
|
||||
output_schema: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn create_shell_tool(request_permission_enabled: bool) -> ToolSpec {
|
||||
let mut properties = BTreeMap::from([
|
||||
(
|
||||
@@ -1660,7 +1710,7 @@ source: /[\s\S]+/
|
||||
enabled_tool_names.join(", ")
|
||||
};
|
||||
let description = format!(
|
||||
"Runs JavaScript in a Node-backed `node:vm` context. This is a freeform tool: send raw JavaScript source text (no JSON/quotes/markdown fences). Direct tool calls remain available while `{PUBLIC_TOOL_NAME}` is enabled. Inside JavaScript, import nested tools from `tools.js`, for example `import {{ exec_command }} from \"tools.js\"` or `import {{ ALL_TOOLS }} from \"tools.js\"` to inspect the available `{{ module, name, description }}` entries. Namespaced tools are also available from `tools/<namespace...>.js`; MCP tools use `tools/mcp/<server>.js`, for example `import {{ append_notebook_logs_chart }} from \"tools/mcp/ologs.js\"`. Nested tool calls resolve to their code-mode result values. Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, store, load }}` from `\"@openai/code_mode\"` (or `\"openai/code_mode\"`); `output_text(value)` surfaces text back to the model and stringifies non-string objects when possible, `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs, `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, `load(key)` returns a cloned stored value or `undefined`, and `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate the final Rust-side result of the current `{PUBLIC_TOOL_NAME}` execution. The default is `10000`. This guards the overall `{PUBLIC_TOOL_NAME}` output, not individual nested tool invocations. The returned content starts with a separate `Script completed` or `Script failed` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker. Function tools require JSON object arguments. Freeform tools require raw strings. `add_content(value)` remains available for compatibility with a content item, content-item array, or string. Structured nested-tool results should be converted to text first, for example with `JSON.stringify(...)`. Only content passed to `output_text(...)`, `output_image(...)`, or `add_content(value)` is surfaced back to the model. Enabled nested tools: {enabled_list}."
|
||||
"Runs JavaScript in a Node-backed `node:vm` context. This is a freeform tool: send raw JavaScript source text (no JSON/quotes/markdown fences). Direct tool calls remain available while `{PUBLIC_TOOL_NAME}` is enabled. Inside JavaScript, import nested tools from `tools.js`, for example `import {{ exec_command }} from \"tools.js\"` or `import {{ ALL_TOOLS }} from \"tools.js\"` to inspect the available `{{ module, name, description }}` entries. Namespaced tools are also available from `tools/<namespace...>.js`; MCP tools use `tools/mcp/<server>.js`, for example `import {{ append_notebook_logs_chart }} from \"tools/mcp/ologs.js\"`. Nested tool calls resolve to their code-mode result values. Import `{{ output_text, output_image, set_max_output_tokens_per_exec_call, set_yield_time, store, load }}` from `\"@openai/code_mode\"` (or `\"openai/code_mode\"`); `output_text(value)` surfaces text back to the model and stringifies non-string objects when possible, `output_image(imageUrl)` appends an `input_image` content item for `http(s)` or `data:` URLs, `store(key, value)` persists JSON-serializable values across `{PUBLIC_TOOL_NAME}` calls in the current session, `load(key)` returns a cloned stored value or `undefined`, `set_max_output_tokens_per_exec_call(value)` sets the token budget used to truncate direct `{PUBLIC_TOOL_NAME}` returns, and `{WAIT_TOOL_NAME}` uses its own `max_tokens` argument with a default of `10000`. `set_yield_time(value)` asks `{PUBLIC_TOOL_NAME}` to return early if the script is still running after that many milliseconds so `{WAIT_TOOL_NAME}` can resume it later. The default wait timeout for `{WAIT_TOOL_NAME}` is {DEFAULT_WAIT_YIELD_TIME_MS}. The returned content starts with a separate `Script completed`, `Script failed`, or `Script running with session ID …` text item that includes wall time. When truncation happens, the final text may include `Total output lines:` and the usual `…N tokens truncated…` marker. Function tools require JSON object arguments. Freeform tools require raw strings. `add_content(value)` remains available for compatibility with a content item, content-item array, or string. Structured nested-tool results should be converted to text first, for example with `JSON.stringify(...)`. Only content passed to `output_text(...)`, `output_image(...)`, or `add_content(value)` is surfaced back to the model. Enabled nested tools: {enabled_list}."
|
||||
);
|
||||
|
||||
ToolSpec::Freeform(FreeformTool {
|
||||
@@ -1675,7 +1725,9 @@ source: /[\s\S]+/
|
||||
}
|
||||
|
||||
fn is_code_mode_nested_tool(spec: &ToolSpec) -> bool {
|
||||
spec.name() != PUBLIC_TOOL_NAME && matches!(spec, ToolSpec::Function(_) | ToolSpec::Freeform(_))
|
||||
spec.name() != PUBLIC_TOOL_NAME
|
||||
&& spec.name() != WAIT_TOOL_NAME
|
||||
&& matches!(spec, ToolSpec::Function(_) | ToolSpec::Freeform(_))
|
||||
}
|
||||
|
||||
fn create_list_mcp_resources_tool() -> ToolSpec {
|
||||
@@ -2030,6 +2082,7 @@ pub(crate) fn build_specs(
|
||||
use crate::tools::handlers::ApplyPatchHandler;
|
||||
use crate::tools::handlers::ArtifactsHandler;
|
||||
use crate::tools::handlers::CodeModeHandler;
|
||||
use crate::tools::handlers::CodeModeWaitHandler;
|
||||
use crate::tools::handlers::DynamicToolHandler;
|
||||
use crate::tools::handlers::GrepFilesHandler;
|
||||
use crate::tools::handlers::JsReplHandler;
|
||||
@@ -2067,6 +2120,7 @@ pub(crate) fn build_specs(
|
||||
});
|
||||
let search_tool_handler = Arc::new(SearchToolBm25Handler);
|
||||
let code_mode_handler = Arc::new(CodeModeHandler);
|
||||
let code_mode_wait_handler = Arc::new(CodeModeWaitHandler);
|
||||
let js_repl_handler = Arc::new(JsReplHandler);
|
||||
let js_repl_reset_handler = Arc::new(JsReplResetHandler);
|
||||
let artifacts_handler = Arc::new(ArtifactsHandler);
|
||||
@@ -2096,6 +2150,13 @@ pub(crate) fn build_specs(
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
builder.register_handler(PUBLIC_TOOL_NAME, code_mode_handler);
|
||||
push_tool_spec(
|
||||
&mut builder,
|
||||
create_exec_wait_tool(),
|
||||
false,
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
builder.register_handler(WAIT_TOOL_NAME, code_mode_wait_handler);
|
||||
}
|
||||
|
||||
match &config.shell_type {
|
||||
|
||||
@@ -21,6 +21,7 @@ use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use wiremock::MockServer;
|
||||
|
||||
@@ -32,6 +33,16 @@ fn custom_tool_output_items(req: &ResponsesRequest, call_id: &str) -> Vec<Value>
|
||||
.clone()
|
||||
}
|
||||
|
||||
fn function_tool_output_items(req: &ResponsesRequest, call_id: &str) -> Vec<Value> {
|
||||
match req.function_call_output(call_id).get("output") {
|
||||
Some(Value::Array(items)) => items.clone(),
|
||||
Some(Value::String(text)) => {
|
||||
vec![serde_json::json!({ "type": "input_text", "text": text })]
|
||||
}
|
||||
_ => panic!("function tool output should be serialized as text or content items"),
|
||||
}
|
||||
}
|
||||
|
||||
fn text_item(items: &[Value], index: usize) -> &str {
|
||||
items[index]
|
||||
.get("text")
|
||||
@@ -39,6 +50,20 @@ fn text_item(items: &[Value], index: usize) -> &str {
|
||||
.expect("content item should be input_text")
|
||||
}
|
||||
|
||||
fn extract_running_session_id(text: &str) -> i32 {
|
||||
text.strip_prefix("Script running with session ID ")
|
||||
.and_then(|rest| rest.split('\n').next())
|
||||
.expect("running header should contain a session ID")
|
||||
.parse()
|
||||
.expect("session ID should parse as i32")
|
||||
}
|
||||
|
||||
fn wait_for_file_source(path: &Path) -> Result<String> {
|
||||
let quoted_path = shlex::try_join([path.to_string_lossy().as_ref()])?;
|
||||
let command = format!("if [ -f {quoted_path} ]; then printf ready; fi");
|
||||
Ok(format!("await waitForFile({command:?});"))
|
||||
}
|
||||
|
||||
fn custom_tool_output_body_and_success(
|
||||
req: &ResponsesRequest,
|
||||
call_id: &str,
|
||||
@@ -289,6 +314,775 @@ Error:\ boom\n
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_can_yield_and_resume_with_exec_wait() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let phase_2_gate = test.workspace_path("code-mode-phase-2.ready");
|
||||
let phase_3_gate = test.workspace_path("code-mode-phase-3.ready");
|
||||
let phase_2_wait = wait_for_file_source(&phase_2_gate)?;
|
||||
let phase_3_wait = wait_for_file_source(&phase_3_gate)?;
|
||||
|
||||
let code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
const waitForFile = async (cmd) => {{
|
||||
while ((await exec_command({{ cmd }})).output !== "ready") {{
|
||||
}}
|
||||
}};
|
||||
|
||||
output_text("phase 1");
|
||||
set_yield_time(10);
|
||||
{phase_2_wait}
|
||||
output_text("phase 2");
|
||||
{phase_3_wait}
|
||||
output_text("phase 3");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start the long exec").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&first_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&first_items, 1), "phase 1");
|
||||
let session_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
responses::ev_function_call(
|
||||
"call-2",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "still waiting"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
fs::write(&phase_2_gate, "ready")?;
|
||||
test.submit_turn("wait again").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = function_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&second_items, 0),
|
||||
);
|
||||
assert_eq!(
|
||||
extract_running_session_id(text_item(&second_items, 0)),
|
||||
session_id
|
||||
);
|
||||
assert_eq!(text_item(&second_items, 1), "phase 2");
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-5"),
|
||||
responses::ev_function_call(
|
||||
"call-3",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-5"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let third_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-3", "done"),
|
||||
ev_completed("resp-6"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
fs::write(&phase_3_gate, "ready")?;
|
||||
test.submit_turn("wait for completion").await?;
|
||||
|
||||
let third_request = third_completion.single_request();
|
||||
let third_items = function_tool_output_items(&third_request, "call-3");
|
||||
assert_eq!(third_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&third_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&third_items, 1), "phase 3");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_can_run_multiple_yielded_sessions() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let session_a_gate = test.workspace_path("code-mode-session-a.ready");
|
||||
let session_b_gate = test.workspace_path("code-mode-session-b.ready");
|
||||
let session_a_wait = wait_for_file_source(&session_a_gate)?;
|
||||
let session_b_wait = wait_for_file_source(&session_b_gate)?;
|
||||
|
||||
let session_a_code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
const waitForFile = async (cmd) => {{
|
||||
while ((await exec_command({{ cmd }})).output !== "ready") {{
|
||||
}}
|
||||
}};
|
||||
|
||||
output_text("session a start");
|
||||
set_yield_time(10);
|
||||
{session_a_wait}
|
||||
output_text("session a done");
|
||||
"#
|
||||
);
|
||||
let session_b_code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
const waitForFile = async (cmd) => {{
|
||||
while ((await exec_command({{ cmd }})).output !== "ready") {{
|
||||
}}
|
||||
}};
|
||||
|
||||
output_text("session b start");
|
||||
set_yield_time(10);
|
||||
{session_b_wait}
|
||||
output_text("session b done");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &session_a_code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "session a waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start session a").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
let session_a_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
assert_eq!(text_item(&first_items, 1), "session a start");
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_custom_tool_call("call-2", "exec", &session_b_code),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "session b waiting"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start session b").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = custom_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 2);
|
||||
let session_b_id = extract_running_session_id(text_item(&second_items, 0));
|
||||
assert_eq!(text_item(&second_items, 1), "session b start");
|
||||
assert_ne!(session_a_id, session_b_id);
|
||||
|
||||
fs::write(&session_a_gate, "ready")?;
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-5"),
|
||||
responses::ev_function_call(
|
||||
"call-3",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_a_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-5"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let third_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-3", "session a done"),
|
||||
ev_completed("resp-6"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("wait session a").await?;
|
||||
|
||||
let third_request = third_completion.single_request();
|
||||
let third_items = function_tool_output_items(&third_request, "call-3");
|
||||
assert_eq!(third_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&third_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&third_items, 1), "session a done");
|
||||
|
||||
fs::write(&session_b_gate, "ready")?;
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-7"),
|
||||
responses::ev_function_call(
|
||||
"call-4",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_b_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-7"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let fourth_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-4", "session b done"),
|
||||
ev_completed("resp-8"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("wait session b").await?;
|
||||
|
||||
let fourth_request = fourth_completion.single_request();
|
||||
let fourth_items = function_tool_output_items(&fourth_request, "call-4");
|
||||
assert_eq!(fourth_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&fourth_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&fourth_items, 1), "session b done");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_wait_can_terminate_and_continue() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let termination_gate = test.workspace_path("code-mode-terminate.ready");
|
||||
let termination_wait = wait_for_file_source(&termination_gate)?;
|
||||
|
||||
let code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
const waitForFile = async (cmd) => {{
|
||||
while ((await exec_command({{ cmd }})).output !== "ready") {{
|
||||
}}
|
||||
}};
|
||||
|
||||
output_text("phase 1");
|
||||
set_yield_time(10);
|
||||
{termination_wait}
|
||||
output_text("phase 2");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start the long exec").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
let session_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
assert_eq!(text_item(&first_items, 1), "phase 1");
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
responses::ev_function_call(
|
||||
"call-2",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"terminate": true,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "terminated"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("terminate it").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = function_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 1);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script terminated\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&second_items, 0),
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-5"),
|
||||
ev_custom_tool_call(
|
||||
"call-3",
|
||||
"exec",
|
||||
r#"
|
||||
import { output_text } from "@openai/code_mode";
|
||||
|
||||
output_text("after terminate");
|
||||
"#,
|
||||
),
|
||||
ev_completed("resp-5"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let third_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-3", "done"),
|
||||
ev_completed("resp-6"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run another exec").await?;
|
||||
|
||||
let third_request = third_completion.single_request();
|
||||
let third_items = custom_tool_output_items(&third_request, "call-3");
|
||||
assert_eq!(third_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&third_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&third_items, 1), "after terminate");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_wait_terminate_returns_completed_session_if_it_finished_in_background()
|
||||
-> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let session_a_gate = test.workspace_path("code-mode-session-a-finished.ready");
|
||||
let session_b_gate = test.workspace_path("code-mode-session-b-blocked.ready");
|
||||
let session_a_wait = wait_for_file_source(&session_a_gate)?;
|
||||
let session_b_wait = wait_for_file_source(&session_b_gate)?;
|
||||
|
||||
let session_a_code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
const waitForFile = async (cmd) => {{
|
||||
while ((await exec_command({{ cmd }})).output !== "ready") {{
|
||||
}}
|
||||
}};
|
||||
|
||||
output_text("session a start");
|
||||
set_yield_time(10);
|
||||
{session_a_wait}
|
||||
output_text("session a done");
|
||||
"#
|
||||
);
|
||||
let session_b_code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
const waitForFile = async (cmd) => {{
|
||||
while ((await exec_command({{ cmd }})).output !== "ready") {{
|
||||
}}
|
||||
}};
|
||||
|
||||
output_text("session b start");
|
||||
set_yield_time(10);
|
||||
{session_b_wait}
|
||||
output_text("session b done");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &session_a_code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "session a waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start session a").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
let session_a_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
assert_eq!(text_item(&first_items, 1), "session a start");
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_custom_tool_call("call-2", "exec", &session_b_code),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "session b waiting"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start session b").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = custom_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 2);
|
||||
let session_b_id = extract_running_session_id(text_item(&second_items, 0));
|
||||
assert_eq!(text_item(&second_items, 1), "session b start");
|
||||
|
||||
fs::write(&session_a_gate, "ready")?;
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-5"),
|
||||
responses::ev_function_call(
|
||||
"call-3",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_b_id,
|
||||
"yield_time_ms": 1_000,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-5"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let third_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-3", "session b still waiting"),
|
||||
ev_completed("resp-6"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("wait session b").await?;
|
||||
|
||||
let third_request = third_completion.single_request();
|
||||
let third_items = function_tool_output_items(&third_request, "call-3");
|
||||
assert_eq!(third_items.len(), 1);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script running with session ID \d+\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&third_items, 0),
|
||||
);
|
||||
assert_eq!(
|
||||
extract_running_session_id(text_item(&third_items, 0)),
|
||||
session_b_id
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-7"),
|
||||
responses::ev_function_call(
|
||||
"call-4",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_a_id,
|
||||
"terminate": true,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-7"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let fourth_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-4", "session a already done"),
|
||||
ev_completed("resp-8"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("terminate session a").await?;
|
||||
|
||||
let fourth_request = fourth_completion.single_request();
|
||||
let fourth_items = function_tool_output_items(&fourth_request, "call-4");
|
||||
assert_eq!(fourth_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&fourth_items, 0),
|
||||
);
|
||||
assert_eq!(text_item(&fourth_items, 1), "session a done");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_wait_uses_its_own_max_tokens_budget() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let completion_gate = test.workspace_path("code-mode-max-tokens.ready");
|
||||
let completion_wait = wait_for_file_source(&completion_gate)?;
|
||||
|
||||
let code = format!(
|
||||
r#"
|
||||
import {{ output_text, set_max_output_tokens_per_exec_call, set_yield_time }} from "@openai/code_mode";
|
||||
import {{ exec_command }} from "tools.js";
|
||||
|
||||
const waitForFile = async (cmd) => {{
|
||||
while ((await exec_command({{ cmd }})).output !== "ready") {{
|
||||
}}
|
||||
}};
|
||||
|
||||
output_text("phase 1");
|
||||
set_max_output_tokens_per_exec_call(100);
|
||||
set_yield_time(10);
|
||||
{completion_wait}
|
||||
output_text("token one token two token three token four token five token six token seven");
|
||||
"#
|
||||
);
|
||||
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_custom_tool_call("call-1", "exec", &code),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let first_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "waiting"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("start the long exec").await?;
|
||||
|
||||
let first_request = first_completion.single_request();
|
||||
let first_items = custom_tool_output_items(&first_request, "call-1");
|
||||
assert_eq!(first_items.len(), 2);
|
||||
assert_eq!(text_item(&first_items, 1), "phase 1");
|
||||
let session_id = extract_running_session_id(text_item(&first_items, 0));
|
||||
|
||||
fs::write(&completion_gate, "ready")?;
|
||||
responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
responses::ev_function_call(
|
||||
"call-2",
|
||||
"exec_wait",
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"yield_time_ms": 1_000,
|
||||
"max_tokens": 6,
|
||||
}))?,
|
||||
),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_completion = responses::mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-2", "done"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
test.submit_turn("wait for completion").await?;
|
||||
|
||||
let second_request = second_completion.single_request();
|
||||
let second_items = function_tool_output_items(&second_request, "call-2");
|
||||
assert_eq!(second_items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
),
|
||||
text_item(&second_items, 0),
|
||||
);
|
||||
let expected_pattern = r#"(?sx)
|
||||
\A
|
||||
Total\ output\ lines:\ 1\n
|
||||
\n
|
||||
.*…\d+\ tokens\ truncated….*
|
||||
\z
|
||||
"#;
|
||||
assert_regex_match(expected_pattern, text_item(&second_items, 1));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_can_output_serialized_text_via_openai_code_mode_module() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
Reference in New Issue
Block a user