refactor(core): isolate unified exec streaming source

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-21 02:20:06 +00:00
parent 0ee08bc702
commit 87701ece86
3 changed files with 210 additions and 129 deletions

View File

@@ -2,13 +2,14 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::broadcast::Receiver;
use tokio::sync::{Mutex, Notify};
use tokio::time::Duration;
use tokio::time::Instant;
use tokio::time::Sleep;
use tokio_util::sync::CancellationToken;
use super::UnifiedExecContext;
use super::process::UnifiedExecProcess;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::exec::ExecToolCallOutput;
@@ -18,7 +19,8 @@ use crate::protocol::EventMsg;
use crate::protocol::ExecCommandOutputDeltaEvent;
use crate::protocol::ExecCommandSource;
use crate::protocol::ExecOutputStream;
use crate::unified_exec::process::OutputHandles;
use crate::unified_exec::process::OutputBuffer;
use crate::unified_exec::process::StreamingSource;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventStage;
@@ -46,128 +48,178 @@ pub(crate) fn start_streaming_output(
let session_ref = Arc::clone(&context.session);
let turn_ref = Arc::clone(&context.turn);
let call_id = context.call_id.clone();
let OutputHandles {
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
} = process.output_handles();
match process.streaming_source() {
StreamingSource::Receiver {
receiver,
output_drained,
cancellation_token,
} => {
tokio::spawn(async move {
stream_output_from_receiver(
session_ref,
turn_ref,
call_id,
transcript,
output_drained,
receiver,
cancellation_token,
)
.await;
});
}
StreamingSource::Buffered {
output_buffer,
output_notify,
output_closed,
output_closed_notify,
output_drained,
cancellation_token,
} => {
tokio::spawn(async move {
stream_output_from_buffer(
session_ref,
turn_ref,
call_id,
transcript,
output_buffer,
output_notify,
output_closed,
output_closed_notify,
output_drained,
cancellation_token,
)
.await;
});
}
}
}
if let Some(local_process) = process.as_local_process() {
let mut receiver = local_process.output_receiver();
let output_drained = local_process.output_drained_notify();
tokio::spawn(async move {
use tokio::sync::broadcast::error::RecvError;
async fn stream_output_from_receiver(
session_ref: Arc<Session>,
turn_ref: Arc<TurnContext>,
call_id: String,
transcript: Arc<Mutex<HeadTailBuffer>>,
output_drained: Arc<Notify>,
mut receiver: Receiver<Vec<u8>>,
cancellation_token: CancellationToken,
) {
use tokio::sync::broadcast::error::RecvError;
let mut pending = Vec::<u8>::new();
let mut emitted_deltas: usize = 0;
let mut pending = Vec::<u8>::new();
let mut emitted_deltas: usize = 0;
let mut grace_sleep: Option<Pin<Box<Sleep>>> = None;
let mut grace_sleep: Option<Pin<Box<Sleep>>> = None;
loop {
tokio::select! {
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
}
loop {
tokio::select! {
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
}
_ = async {
if let Some(sleep) = grace_sleep.as_mut() {
sleep.as_mut().await;
}
}, if grace_sleep.is_some() => {
_ = async {
if let Some(sleep) = grace_sleep.as_mut() {
sleep.as_mut().await;
}
}, if grace_sleep.is_some() => {
output_drained.notify_one();
break;
}
received = receiver.recv() => {
let chunk = match received {
Ok(chunk) => chunk,
Err(RecvError::Lagged(_)) => {
continue;
},
Err(RecvError::Closed) => {
output_drained.notify_one();
break;
}
};
received = receiver.recv() => {
let chunk = match received {
Ok(chunk) => chunk,
Err(RecvError::Lagged(_)) => {
continue;
},
Err(RecvError::Closed) => {
output_drained.notify_one();
break;
}
};
process_chunk(
&mut pending,
&transcript,
&call_id,
&session_ref,
&turn_ref,
&mut emitted_deltas,
chunk,
).await;
}
}
process_chunk(
&mut pending,
&transcript,
&call_id,
&session_ref,
&turn_ref,
&mut emitted_deltas,
chunk,
).await;
}
});
return;
}
}
}
let output_drained = process
.output_drained()
.unwrap_or_else(|| Arc::new(Notify::new()));
async fn stream_output_from_buffer(
session_ref: Arc<Session>,
turn_ref: Arc<TurnContext>,
call_id: String,
transcript: Arc<Mutex<HeadTailBuffer>>,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<std::sync::atomic::AtomicBool>,
output_closed_notify: Arc<Notify>,
output_drained: Arc<Notify>,
cancellation_token: CancellationToken,
) {
let mut pending = Vec::<u8>::new();
let mut emitted_deltas: usize = 0;
tokio::spawn(async move {
let mut pending = Vec::<u8>::new();
let mut emitted_deltas: usize = 0;
let mut grace_sleep: Option<Pin<Box<Sleep>>> = None;
let mut grace_sleep: Option<Pin<Box<Sleep>>> = None;
loop {
tokio::select! {
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
}
loop {
tokio::select! {
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
_ = async {
if let Some(sleep) = grace_sleep.as_mut() {
sleep.as_mut().await;
}
}, if grace_sleep.is_some() => {
output_drained.notify_one();
break;
}
_ = async {
if let Some(sleep) = grace_sleep.as_mut() {
sleep.as_mut().await;
}
}, if grace_sleep.is_some() => {
output_drained.notify_one();
break;
}
_ = output_notify.notified() => {
let drained_chunks = {
let mut guard = output_buffer.lock().await;
guard.drain_chunks()
};
if drained_chunks.is_empty() {
if cancellation_token.is_cancelled() && output_closed.load(std::sync::atomic::Ordering::Acquire) {
output_drained.notify_one();
break;
}
continue;
}
for chunk in drained_chunks {
process_chunk(
&mut pending,
&transcript,
&call_id,
&session_ref,
&turn_ref,
&mut emitted_deltas,
chunk,
).await;
_ = output_notify.notified() => {
let drained_chunks = {
let mut guard = output_buffer.lock().await;
guard.drain_chunks()
};
if drained_chunks.is_empty() {
if cancellation_token.is_cancelled()
&& output_closed.load(std::sync::atomic::Ordering::Acquire)
{
output_drained.notify_one();
break;
}
continue;
}
_ = output_closed_notify.notified(), if grace_sleep.is_none() => {
grace_sleep.replace(Box::pin(tokio::time::sleep_until(Instant::now() + TRAILING_OUTPUT_GRACE)));
for chunk in drained_chunks {
process_chunk(
&mut pending,
&transcript,
&call_id,
&session_ref,
&turn_ref,
&mut emitted_deltas,
chunk,
).await;
}
}
_ = output_closed_notify.notified(), if grace_sleep.is_none() => {
grace_sleep.replace(Box::pin(tokio::time::sleep_until(
Instant::now() + TRAILING_OUTPUT_GRACE,
)));
}
}
});
}
}
/// Spawn a background watcher that waits for the PTY to exit and then emits a

View File

@@ -187,36 +187,41 @@ pub(crate) enum ProcessBackend {
}
impl ProcessBackend {
pub(crate) fn is_local(&self) -> bool {
matches!(self, Self::Local { .. })
}
pub(crate) fn as_local_process(&self) -> Option<&Arc<UnifiedExecProcess>> {
pub(crate) fn streaming_source(&self) -> crate::unified_exec::process::StreamingSource {
match self {
Self::Local { process } => Some(process),
Self::ExecServer { .. } => None,
Self::Local { process } => process.streaming_source(),
Self::ExecServer {
output_buffer,
output_notify,
output_closed,
output_closed_notify,
output_drained,
cancellation_token,
..
} => crate::unified_exec::process::StreamingSource::Buffered {
output_buffer: Arc::clone(output_buffer),
output_notify: Arc::clone(output_notify),
output_closed: Arc::clone(output_closed),
output_closed_notify: Arc::clone(output_closed_notify),
output_drained: Arc::clone(output_drained),
cancellation_token: cancellation_token.clone(),
},
}
}
pub(crate) fn output_handles(
&self,
) -> (
crate::unified_exec::process::OutputBuffer,
Arc<Notify>,
Arc<AtomicBool>,
Arc<Notify>,
CancellationToken,
) {
) -> crate::unified_exec::process::OutputHandles {
match self {
Self::Local { process } => {
let handles = process.output_handles();
(
handles.output_buffer,
handles.output_notify,
handles.output_closed,
handles.output_closed_notify,
handles.cancellation_token,
)
crate::unified_exec::process::OutputHandles {
output_buffer: handles.output_buffer,
output_notify: handles.output_notify,
output_closed: handles.output_closed,
output_closed_notify: handles.output_closed_notify,
cancellation_token: handles.cancellation_token,
}
}
Self::ExecServer {
output_buffer,
@@ -225,13 +230,13 @@ impl ProcessBackend {
output_closed_notify,
cancellation_token,
..
} => (
Arc::clone(output_buffer),
Arc::clone(output_notify),
Arc::clone(output_closed),
Arc::clone(output_closed_notify),
cancellation_token.clone(),
),
} => crate::unified_exec::process::OutputHandles {
output_buffer: Arc::clone(output_buffer),
output_notify: Arc::clone(output_notify),
output_closed: Arc::clone(output_closed),
output_closed_notify: Arc::clone(output_closed_notify),
cancellation_token: cancellation_token.clone(),
},
}
}

View File

@@ -54,6 +54,22 @@ pub(crate) struct OutputHandles {
pub(crate) cancellation_token: CancellationToken,
}
pub(crate) enum StreamingSource {
Receiver {
receiver: broadcast::Receiver<Vec<u8>>,
output_drained: Arc<Notify>,
cancellation_token: CancellationToken,
},
Buffered {
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
output_closed_notify: Arc<Notify>,
output_drained: Arc<Notify>,
cancellation_token: CancellationToken,
},
}
#[derive(Debug)]
pub(crate) struct UnifiedExecProcess {
process_handle: ExecCommandSession,
@@ -140,6 +156,14 @@ impl UnifiedExecProcess {
self.output_rx.resubscribe()
}
pub(super) fn streaming_source(&self) -> StreamingSource {
StreamingSource::Receiver {
receiver: self.output_receiver(),
output_drained: Arc::clone(&self.output_drained),
cancellation_token: self.cancellation_token(),
}
}
pub(super) fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}