mirror of
https://github.com/openai/codex.git
synced 2026-04-30 03:12:20 +03:00
merge
This commit is contained in:
@@ -14,6 +14,7 @@ use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::process::Child;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result;
|
||||
@@ -28,8 +29,9 @@ use crate::sandboxing::ExecEnv;
|
||||
use crate::sandboxing::SandboxManager;
|
||||
use crate::spawn::StdioPolicy;
|
||||
use crate::spawn::spawn_child_async;
|
||||
use crate::text_encoding::bytes_to_string_smart;
|
||||
|
||||
const DEFAULT_TIMEOUT_MS: u64 = 10_000;
|
||||
pub const DEFAULT_EXEC_COMMAND_TIMEOUT_MS: u64 = 10_000;
|
||||
|
||||
// Hardcode these since it does not seem worth including the libc crate just
|
||||
// for these.
|
||||
@@ -46,11 +48,11 @@ const AGGREGATE_BUFFER_INITIAL_CAPACITY: usize = 8 * 1024; // 8 KiB
|
||||
/// Aggregation still collects full output; only the live event stream is capped.
|
||||
pub(crate) const MAX_EXEC_OUTPUT_DELTAS_PER_CALL: usize = 10_000;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Debug)]
|
||||
pub struct ExecParams {
|
||||
pub command: Vec<String>,
|
||||
pub cwd: PathBuf,
|
||||
pub timeout_ms: Option<u64>,
|
||||
pub expiration: ExecExpiration,
|
||||
pub env: HashMap<String, String>,
|
||||
pub with_escalated_permissions: Option<bool>,
|
||||
pub justification: Option<String>,
|
||||
@@ -59,9 +61,48 @@ pub struct ExecParams {
|
||||
pub max_output_chars: Option<usize>,
|
||||
}
|
||||
|
||||
impl ExecParams {
|
||||
pub fn timeout_duration(&self) -> Duration {
|
||||
Duration::from_millis(self.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS))
|
||||
/// Mechanism to terminate an exec invocation before it finishes naturally.
|
||||
#[derive(Debug)]
|
||||
pub enum ExecExpiration {
|
||||
Timeout(Duration),
|
||||
DefaultTimeout,
|
||||
Cancellation(CancellationToken),
|
||||
}
|
||||
|
||||
impl From<Option<u64>> for ExecExpiration {
|
||||
fn from(timeout_ms: Option<u64>) -> Self {
|
||||
timeout_ms.map_or(ExecExpiration::DefaultTimeout, |timeout_ms| {
|
||||
ExecExpiration::Timeout(Duration::from_millis(timeout_ms))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<u64> for ExecExpiration {
|
||||
fn from(timeout_ms: u64) -> Self {
|
||||
ExecExpiration::Timeout(Duration::from_millis(timeout_ms))
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecExpiration {
|
||||
async fn wait(self) {
|
||||
match self {
|
||||
ExecExpiration::Timeout(duration) => tokio::time::sleep(duration).await,
|
||||
ExecExpiration::DefaultTimeout => {
|
||||
tokio::time::sleep(Duration::from_millis(DEFAULT_EXEC_COMMAND_TIMEOUT_MS)).await
|
||||
}
|
||||
ExecExpiration::Cancellation(cancel) => {
|
||||
cancel.cancelled().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// If ExecExpiration is a timeout, returns the timeout in milliseconds.
|
||||
pub(crate) fn timeout_ms(&self) -> Option<u64> {
|
||||
match self {
|
||||
ExecExpiration::Timeout(duration) => Some(duration.as_millis() as u64),
|
||||
ExecExpiration::DefaultTimeout => Some(DEFAULT_EXEC_COMMAND_TIMEOUT_MS),
|
||||
ExecExpiration::Cancellation(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,7 +138,7 @@ pub async fn process_exec_tool_call(
|
||||
let ExecParams {
|
||||
command,
|
||||
cwd,
|
||||
timeout_ms,
|
||||
expiration,
|
||||
env,
|
||||
with_escalated_permissions,
|
||||
justification,
|
||||
@@ -118,7 +159,7 @@ pub async fn process_exec_tool_call(
|
||||
args: args.to_vec(),
|
||||
cwd,
|
||||
env,
|
||||
timeout_ms,
|
||||
expiration,
|
||||
with_escalated_permissions,
|
||||
justification,
|
||||
max_output_tokens,
|
||||
@@ -128,7 +169,7 @@ pub async fn process_exec_tool_call(
|
||||
let manager = SandboxManager::new();
|
||||
let exec_env = manager
|
||||
.transform(
|
||||
&spec,
|
||||
spec,
|
||||
sandbox_policy,
|
||||
sandbox_type,
|
||||
sandbox_cwd,
|
||||
@@ -137,7 +178,7 @@ pub async fn process_exec_tool_call(
|
||||
.map_err(CodexErr::from)?;
|
||||
|
||||
// Route through the sandboxing module for a single, unified execution path.
|
||||
crate::sandboxing::execute_env(&exec_env, sandbox_policy, stdout_stream).await
|
||||
crate::sandboxing::execute_env(exec_env, sandbox_policy, stdout_stream).await
|
||||
}
|
||||
|
||||
pub(crate) async fn execute_exec_env(
|
||||
@@ -149,7 +190,7 @@ pub(crate) async fn execute_exec_env(
|
||||
command,
|
||||
cwd,
|
||||
env,
|
||||
timeout_ms,
|
||||
expiration,
|
||||
sandbox,
|
||||
with_escalated_permissions,
|
||||
justification,
|
||||
@@ -161,7 +202,7 @@ pub(crate) async fn execute_exec_env(
|
||||
let params = ExecParams {
|
||||
command,
|
||||
cwd,
|
||||
timeout_ms,
|
||||
expiration,
|
||||
env,
|
||||
with_escalated_permissions,
|
||||
justification,
|
||||
@@ -188,9 +229,12 @@ async fn exec_windows_sandbox(
|
||||
command,
|
||||
cwd,
|
||||
env,
|
||||
timeout_ms,
|
||||
expiration,
|
||||
..
|
||||
} = params;
|
||||
// TODO(iceweasel-oai): run_windows_sandbox_capture should support all
|
||||
// variants of ExecExpiration, not just timeout.
|
||||
let timeout_ms = expiration.timeout_ms();
|
||||
|
||||
let policy_str = serde_json::to_string(sandbox_policy).map_err(|err| {
|
||||
CodexErr::Io(io::Error::other(format!(
|
||||
@@ -424,7 +468,7 @@ impl StreamOutput<String> {
|
||||
impl StreamOutput<Vec<u8>> {
|
||||
pub fn from_utf8_lossy(&self) -> StreamOutput<String> {
|
||||
StreamOutput {
|
||||
text: String::from_utf8_lossy(&self.text).to_string(),
|
||||
text: bytes_to_string_smart(&self.text),
|
||||
truncated_after_lines: self.truncated_after_lines,
|
||||
}
|
||||
}
|
||||
@@ -458,12 +502,12 @@ async fn exec(
|
||||
{
|
||||
return exec_windows_sandbox(params, sandbox_policy).await;
|
||||
}
|
||||
let timeout = params.timeout_duration();
|
||||
let ExecParams {
|
||||
command,
|
||||
cwd,
|
||||
env,
|
||||
arg0,
|
||||
expiration,
|
||||
..
|
||||
} = params;
|
||||
|
||||
@@ -484,14 +528,14 @@ async fn exec(
|
||||
env,
|
||||
)
|
||||
.await?;
|
||||
consume_truncated_output(child, timeout, stdout_stream).await
|
||||
consume_truncated_output(child, expiration, stdout_stream).await
|
||||
}
|
||||
|
||||
/// Consumes the output of a child process, truncating it so it is suitable for
|
||||
/// use as the output of a `shell` tool call. Also enforces specified timeout.
|
||||
async fn consume_truncated_output(
|
||||
mut child: Child,
|
||||
timeout: Duration,
|
||||
expiration: ExecExpiration,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
) -> Result<RawExecToolCallOutput> {
|
||||
// Both stdout and stderr were configured with `Stdio::piped()`
|
||||
@@ -525,20 +569,14 @@ async fn consume_truncated_output(
|
||||
));
|
||||
|
||||
let (exit_status, timed_out) = tokio::select! {
|
||||
result = tokio::time::timeout(timeout, child.wait()) => {
|
||||
match result {
|
||||
Ok(status_result) => {
|
||||
let exit_status = status_result?;
|
||||
(exit_status, false)
|
||||
}
|
||||
Err(_) => {
|
||||
// timeout
|
||||
kill_child_process_group(&mut child)?;
|
||||
child.start_kill()?;
|
||||
// Debatable whether `child.wait().await` should be called here.
|
||||
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true)
|
||||
}
|
||||
}
|
||||
status_result = child.wait() => {
|
||||
let exit_status = status_result?;
|
||||
(exit_status, false)
|
||||
}
|
||||
_ = expiration.wait() => {
|
||||
kill_child_process_group(&mut child)?;
|
||||
child.start_kill()?;
|
||||
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true)
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
kill_child_process_group(&mut child)?;
|
||||
@@ -790,6 +828,15 @@ mod tests {
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn kill_child_process_group_kills_grandchildren_on_timeout() -> Result<()> {
|
||||
// On Linux/macOS, /bin/bash is typically present; on FreeBSD/OpenBSD,
|
||||
// prefer /bin/sh to avoid NotFound errors.
|
||||
#[cfg(any(target_os = "freebsd", target_os = "openbsd"))]
|
||||
let command = vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"sleep 60 & echo $!; sleep 60".to_string(),
|
||||
];
|
||||
#[cfg(all(unix, not(any(target_os = "freebsd", target_os = "openbsd"))))]
|
||||
let command = vec![
|
||||
"/bin/bash".to_string(),
|
||||
"-c".to_string(),
|
||||
@@ -799,7 +846,7 @@ mod tests {
|
||||
let params = ExecParams {
|
||||
command,
|
||||
cwd: std::env::current_dir()?,
|
||||
timeout_ms: Some(500),
|
||||
expiration: 500.into(),
|
||||
env,
|
||||
with_escalated_permissions: None,
|
||||
justification: None,
|
||||
@@ -835,4 +882,62 @@ mod tests {
|
||||
assert!(killed, "grandchild process with pid {pid} is still alive");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn process_exec_tool_call_respects_cancellation_token() -> Result<()> {
|
||||
let command = long_running_command();
|
||||
let cwd = std::env::current_dir()?;
|
||||
let env: HashMap<String, String> = std::env::vars().collect();
|
||||
let cancel_token = CancellationToken::new();
|
||||
let cancel_tx = cancel_token.clone();
|
||||
let params = ExecParams {
|
||||
command,
|
||||
cwd: cwd.clone(),
|
||||
expiration: ExecExpiration::Cancellation(cancel_token),
|
||||
env,
|
||||
with_escalated_permissions: None,
|
||||
justification: None,
|
||||
arg0: None,
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(1_000)).await;
|
||||
cancel_tx.cancel();
|
||||
});
|
||||
let result = process_exec_tool_call(
|
||||
params,
|
||||
SandboxType::None,
|
||||
&SandboxPolicy::DangerFullAccess,
|
||||
cwd.as_path(),
|
||||
&None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let output = match result {
|
||||
Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => output,
|
||||
other => panic!("expected timeout error, got {other:?}"),
|
||||
};
|
||||
assert!(output.timed_out);
|
||||
assert_eq!(output.exit_code, EXEC_TIMEOUT_EXIT_CODE);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn long_running_command() -> Vec<String> {
|
||||
vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"sleep 30".to_string(),
|
||||
]
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn long_running_command() -> Vec<String> {
|
||||
vec![
|
||||
"powershell.exe".to_string(),
|
||||
"-NonInteractive".to_string(),
|
||||
"-NoLogo".to_string(),
|
||||
"-Command".to_string(),
|
||||
"Start-Sleep -Seconds 30".to_string(),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user