adding exit signal

This commit is contained in:
Ahmed Ibrahim
2025-07-20 17:32:54 -07:00
parent e382e19c65
commit 4262b77cfa
2 changed files with 37 additions and 2 deletions

View File

@@ -807,6 +807,20 @@ async fn submission_loop(
}
}
}
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
if let Some(sess_arc) = sess {
let recorder_opt = {
let mut guard = sess_arc.rollout.lock().unwrap();
guard.take()
};
if let Some(rec) = recorder_opt {
if let Err(e) = rec.shutdown().await {
warn!("failed to shutdown rollout recorder: {e}");
}
}
}
debug!("Agent loop exited");
}

View File

@@ -14,6 +14,7 @@ use time::macros::format_description;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self};
use tokio::sync::oneshot;
use tracing::info;
use tracing::warn;
use uuid::Uuid;
@@ -59,10 +60,10 @@ pub(crate) struct RolloutRecorder {
tx: Sender<RolloutCmd>,
}
#[derive(Clone)]
enum RolloutCmd {
AddItems(Vec<ResponseItem>),
UpdateState(SessionStateSnapshot),
Shutdown(oneshot::Sender<()>),
}
impl RolloutRecorder {
@@ -95,7 +96,7 @@ impl RolloutRecorder {
// A reasonably-sized bounded channel. If the buffer fills up the send
// future will yield, which is fine we only need to ensure we do not
// perform *blocking* I/O on the callers thread.
// perform *blocking* I/O on the caller's thread.
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
// Spawn a Tokio task that owns the file handle and performs async
@@ -201,6 +202,19 @@ impl RolloutRecorder {
info!("Resumed rollout successfully from {path:?}");
Ok((Self { tx }, saved))
}
pub async fn shutdown(&self) -> std::io::Result<()> {
// Send a shutdown command and wait for the writer task to flush.
let (tx_done, rx_done) = oneshot::channel();
if let Err(e) = self.tx.send(RolloutCmd::Shutdown(tx_done)).await {
// If the channel is closed, the writer is already gone treat as success.
warn!("failed to send rollout shutdown command: {e}");
return Ok(());
}
rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}")))
}
}
struct LogFileInfo {
@@ -312,6 +326,13 @@ async fn rollout_writer(
warn!("Failed to flush state: {e}");
}
}
RolloutCmd::Shutdown(done_tx) => {
if let Err(e) = file.flush().await {
warn!("Failed to flush on shutdown: {e}");
}
let _ = done_tx.send(());
break;
}
}
}
}