mirror of
https://github.com/openai/codex.git
synced 2026-04-28 02:11:08 +03:00
Terminate process group on interruption to avoid dangling processes
This commit is contained in:
@@ -556,23 +556,52 @@ async fn exec(
|
||||
consume_truncated_output(child, expiration, stdout_stream).await
|
||||
}
|
||||
|
||||
struct GroupTerminator {
|
||||
child: Child,
|
||||
kill_on_drop: bool,
|
||||
}
|
||||
|
||||
impl GroupTerminator {
|
||||
fn new(child: Child) -> Self {
|
||||
Self {
|
||||
child,
|
||||
kill_on_drop: true,
|
||||
}
|
||||
}
|
||||
|
||||
fn disarm(&mut self) {
|
||||
self.kill_on_drop = false;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for GroupTerminator {
|
||||
fn drop(&mut self) {
|
||||
if self.kill_on_drop {
|
||||
let _ = kill_child_process_group(&mut self.child);
|
||||
let _ = self.child.start_kill();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Consumes the output of a child process, truncating it so it is suitable for
|
||||
/// use as the output of a `shell` tool call. Also enforces specified timeout.
|
||||
/// If the future is dropped early (e.g., on interrupt), the process group is killed.
|
||||
async fn consume_truncated_output(
|
||||
mut child: Child,
|
||||
child: Child,
|
||||
expiration: ExecExpiration,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
) -> Result<RawExecToolCallOutput> {
|
||||
let mut terminator = GroupTerminator::new(child);
|
||||
// Both stdout and stderr were configured with `Stdio::piped()`
|
||||
// above, therefore `take()` should normally return `Some`. If it doesn't
|
||||
// we treat it as an exceptional I/O error
|
||||
|
||||
let stdout_reader = child.stdout.take().ok_or_else(|| {
|
||||
let stdout_reader = terminator.child.stdout.take().ok_or_else(|| {
|
||||
CodexErr::Io(io::Error::other(
|
||||
"stdout pipe was unexpectedly not available",
|
||||
))
|
||||
})?;
|
||||
let stderr_reader = child.stderr.take().ok_or_else(|| {
|
||||
let stderr_reader = terminator.child.stderr.take().ok_or_else(|| {
|
||||
CodexErr::Io(io::Error::other(
|
||||
"stderr pipe was unexpectedly not available",
|
||||
))
|
||||
@@ -594,18 +623,21 @@ async fn consume_truncated_output(
|
||||
));
|
||||
|
||||
let (exit_status, timed_out) = tokio::select! {
|
||||
status_result = child.wait() => {
|
||||
status_result = terminator.child.wait() => {
|
||||
let exit_status = status_result?;
|
||||
terminator.disarm();
|
||||
(exit_status, false)
|
||||
}
|
||||
_ = expiration.wait() => {
|
||||
kill_child_process_group(&mut child)?;
|
||||
child.start_kill()?;
|
||||
kill_child_process_group(&mut terminator.child)?;
|
||||
terminator.child.start_kill()?;
|
||||
terminator.disarm();
|
||||
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true)
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
kill_child_process_group(&mut child)?;
|
||||
child.start_kill()?;
|
||||
kill_child_process_group(&mut terminator.child)?;
|
||||
terminator.child.start_kill()?;
|
||||
terminator.disarm();
|
||||
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE), false)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -199,11 +199,19 @@ impl Session {
|
||||
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
|
||||
let sub_id = task.turn_context.sub_id.clone();
|
||||
if task.cancellation_token.is_cancelled() {
|
||||
self.services
|
||||
.unified_exec_manager
|
||||
.terminate_sessions_for_turn(&sub_id)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
|
||||
task.cancellation_token.cancel();
|
||||
self.services
|
||||
.unified_exec_manager
|
||||
.terminate_sessions_for_turn(&sub_id)
|
||||
.await;
|
||||
let session_task = task.task;
|
||||
|
||||
select! {
|
||||
|
||||
@@ -643,6 +643,25 @@ impl UnifiedExecSessionManager {
|
||||
entry.session.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn terminate_sessions_for_turn(&self, sub_id: &str) {
|
||||
let entries: Vec<SessionEntry> = {
|
||||
let mut sessions = self.session_store.lock().await;
|
||||
let ids: Vec<String> = sessions
|
||||
.sessions
|
||||
.iter()
|
||||
.filter(|(_, entry)| entry.turn_ref.sub_id == sub_id)
|
||||
.map(|(id, _)| id.clone())
|
||||
.collect();
|
||||
ids.into_iter()
|
||||
.filter_map(|id| sessions.remove(&id))
|
||||
.collect()
|
||||
};
|
||||
|
||||
for entry in entries {
|
||||
entry.session.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum SessionStatus {
|
||||
|
||||
Reference in New Issue
Block a user