mirror of
https://github.com/openai/codex.git
synced 2026-05-02 04:11:39 +03:00
Remove the legacy TUI split (#15922)
This is the part 1 of 2 PRs that will delete the `tui` / `tui_app_server` split. This part simply deletes the existing `tui` directory and marks the `tui_app_server` feature flag as removed. I left the `tui_app_server` feature flag in place for now so its presence doesn't result in an error. It is simply ignored. Part 2 will rename the `tui_app_server` directory `tui`. I did this as two parts to reduce visible code churn.
This commit is contained in:
@@ -1,439 +0,0 @@
|
||||
//! Adaptive stream chunking policy for commit animation ticks.
|
||||
//!
|
||||
//! This policy preserves the baseline user experience while adapting to bursty
|
||||
//! stream input. In [`ChunkingMode::Smooth`], one queued line is drained per
|
||||
//! baseline commit tick. When queue pressure rises, it switches to
|
||||
//! [`ChunkingMode::CatchUp`] and drains queued backlog immediately so display
|
||||
//! lag converges as quickly as possible.
|
||||
//!
|
||||
//! The policy is source-agnostic: it depends only on queue depth and queue
|
||||
//! age from [`QueueSnapshot`]. It does not branch on source identity or explicit
|
||||
//! throughput targets.
|
||||
//!
|
||||
//! # Mental model
|
||||
//!
|
||||
//! Think of this as a two-gear system:
|
||||
//!
|
||||
//! - [`ChunkingMode::Smooth`]: steady baseline display pacing.
|
||||
//! - [`ChunkingMode::CatchUp`]: full queue draining while backlog exists.
|
||||
//!
|
||||
//! The transition logic intentionally uses hysteresis:
|
||||
//!
|
||||
//! - enter catch-up on higher-pressure thresholds
|
||||
//! - exit catch-up on lower-pressure thresholds, held for [`EXIT_HOLD`]
|
||||
//! - after exit, suppress immediate re-entry for [`REENTER_CATCH_UP_HOLD`]
|
||||
//! unless backlog is severe
|
||||
//!
|
||||
//! This avoids rapid gear-flapping near threshold boundaries.
|
||||
//!
|
||||
//! # Policy flow
|
||||
//!
|
||||
//! On each decision tick, [`AdaptiveChunkingPolicy::decide`] does:
|
||||
//!
|
||||
//! 1. If queue is empty, reset to [`ChunkingMode::Smooth`].
|
||||
//! 2. If currently smooth, call [`AdaptiveChunkingPolicy::maybe_enter_catch_up`].
|
||||
//! 3. If currently catch-up, call [`AdaptiveChunkingPolicy::maybe_exit_catch_up`].
|
||||
//! 4. Build [`DrainPlan`] (`Single` for smooth, `Batch(queued_lines)` for catch-up).
|
||||
//!
|
||||
//! # Concrete examples
|
||||
//!
|
||||
//! With current defaults:
|
||||
//!
|
||||
//! - `Smooth` drains one line per commit tick.
|
||||
//! - `CatchUp` drains all currently queued lines in a tick.
|
||||
//!
|
||||
//! # Tuning guide (in code terms)
|
||||
//!
|
||||
//! Prefer tuning in this order so causes remain clear:
|
||||
//!
|
||||
//! 1. enter/exit thresholds: [`ENTER_QUEUE_DEPTH_LINES`], [`ENTER_OLDEST_AGE`],
|
||||
//! [`EXIT_QUEUE_DEPTH_LINES`], [`EXIT_OLDEST_AGE`]
|
||||
//! 2. hysteresis windows: [`EXIT_HOLD`], [`REENTER_CATCH_UP_HOLD`]
|
||||
//! 3. severe gates: [`SEVERE_QUEUE_DEPTH_LINES`], [`SEVERE_OLDEST_AGE`]
|
||||
//!
|
||||
//! Symptom-oriented adjustments:
|
||||
//!
|
||||
//! - lag starts too late: lower enter thresholds
|
||||
//! - frequent smooth/catch-up chatter: increase hold windows, or tighten exit
|
||||
//! thresholds
|
||||
//! - catch-up re-enters too eagerly after exit: increase re-entry hold
|
||||
//!
|
||||
//! # Responsibilities
|
||||
//!
|
||||
//! - track mode and hysteresis state
|
||||
//! - produce deterministic [`ChunkingDecision`] values from queue snapshots
|
||||
//! - preserve queue order by draining from queue head only
|
||||
//!
|
||||
//! # Non-responsibilities
|
||||
//!
|
||||
//! - scheduling commit ticks
|
||||
//! - reordering stream lines
|
||||
//! - transport/source-specific semantics
|
||||
//!
|
||||
//! Markdown docs remain supplemental:
|
||||
//!
|
||||
//! - `docs/tui-stream-chunking-review.md`
|
||||
//! - `docs/tui-stream-chunking-tuning.md`
|
||||
//! - `docs/tui-stream-chunking-validation.md`
|
||||
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
/// Queue-depth threshold that allows entering catch-up mode.
|
||||
///
|
||||
/// Crossing this threshold alone is sufficient to leave smooth mode.
|
||||
const ENTER_QUEUE_DEPTH_LINES: usize = 8;
|
||||
|
||||
/// Oldest-line age threshold that allows entering catch-up mode.
|
||||
///
|
||||
/// Crossing this threshold alone is sufficient to leave smooth mode.
|
||||
const ENTER_OLDEST_AGE: Duration = Duration::from_millis(120);
|
||||
|
||||
/// Queue-depth threshold used when evaluating catch-up exit hysteresis.
|
||||
///
|
||||
/// Depth must be at or below this value before exit hold timing can begin.
|
||||
const EXIT_QUEUE_DEPTH_LINES: usize = 2;
|
||||
|
||||
/// Oldest-line age threshold used when evaluating catch-up exit hysteresis.
|
||||
///
|
||||
/// Age must be at or below this value before exit hold timing can begin.
|
||||
const EXIT_OLDEST_AGE: Duration = Duration::from_millis(40);
|
||||
|
||||
/// Minimum duration queue pressure must stay below exit thresholds to leave catch-up mode.
|
||||
const EXIT_HOLD: Duration = Duration::from_millis(250);
|
||||
|
||||
/// Cooldown window after a catch-up exit that suppresses immediate re-entry.
|
||||
///
|
||||
/// Severe backlog still bypasses this hold to avoid unbounded queue-age growth.
|
||||
const REENTER_CATCH_UP_HOLD: Duration = Duration::from_millis(250);
|
||||
|
||||
/// Queue-depth cutoff that marks backlog as severe for faster convergence.
|
||||
///
|
||||
/// This threshold is used to bypass re-entry hold after a recent catch-up exit.
|
||||
const SEVERE_QUEUE_DEPTH_LINES: usize = 64;
|
||||
|
||||
/// Oldest-line age cutoff that marks backlog as severe for faster convergence.
|
||||
const SEVERE_OLDEST_AGE: Duration = Duration::from_millis(300);
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
||||
pub(crate) enum ChunkingMode {
|
||||
/// Drain one line per baseline commit tick.
|
||||
#[default]
|
||||
Smooth,
|
||||
/// Drain multiple lines per tick according to queue pressure.
|
||||
CatchUp,
|
||||
}
|
||||
|
||||
/// Captures queue pressure inputs used by adaptive chunking decisions.
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
||||
pub(crate) struct QueueSnapshot {
|
||||
/// Number of queued stream lines waiting to be displayed.
|
||||
pub(crate) queued_lines: usize,
|
||||
/// Age of the oldest queued line at decision time.
|
||||
pub(crate) oldest_age: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub(crate) enum DrainPlan {
|
||||
/// Emit exactly one queued line.
|
||||
Single,
|
||||
/// Emit up to `usize` queued lines.
|
||||
Batch(usize),
|
||||
}
|
||||
|
||||
/// Represents one policy decision for a specific queue snapshot.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub(crate) struct ChunkingDecision {
|
||||
/// Mode after applying hysteresis transitions for this decision.
|
||||
pub(crate) mode: ChunkingMode,
|
||||
/// Whether this decision transitioned from `Smooth` into `CatchUp`.
|
||||
pub(crate) entered_catch_up: bool,
|
||||
/// Drain plan to execute for the current commit tick.
|
||||
pub(crate) drain_plan: DrainPlan,
|
||||
}
|
||||
|
||||
/// Maintains adaptive chunking mode and hysteresis state across ticks.
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct AdaptiveChunkingPolicy {
|
||||
mode: ChunkingMode,
|
||||
below_exit_threshold_since: Option<Instant>,
|
||||
last_catch_up_exit_at: Option<Instant>,
|
||||
}
|
||||
|
||||
impl AdaptiveChunkingPolicy {
|
||||
/// Returns the policy mode used by the most recent decision.
|
||||
pub(crate) fn mode(&self) -> ChunkingMode {
|
||||
self.mode
|
||||
}
|
||||
|
||||
/// Resets state to baseline smooth mode.
|
||||
pub(crate) fn reset(&mut self) {
|
||||
self.mode = ChunkingMode::Smooth;
|
||||
self.below_exit_threshold_since = None;
|
||||
self.last_catch_up_exit_at = None;
|
||||
}
|
||||
|
||||
/// Computes a drain decision from the current queue snapshot.
|
||||
///
|
||||
/// The decision is deterministic for a given `(mode, snapshot, now)` triple. Callers should
|
||||
/// avoid inventing synthetic snapshots; stale queue age data can cause premature catch-up exits.
|
||||
pub(crate) fn decide(&mut self, snapshot: QueueSnapshot, now: Instant) -> ChunkingDecision {
|
||||
if snapshot.queued_lines == 0 {
|
||||
self.note_catch_up_exit(now);
|
||||
self.mode = ChunkingMode::Smooth;
|
||||
self.below_exit_threshold_since = None;
|
||||
return ChunkingDecision {
|
||||
mode: self.mode,
|
||||
entered_catch_up: false,
|
||||
drain_plan: DrainPlan::Single,
|
||||
};
|
||||
}
|
||||
|
||||
let entered_catch_up = match self.mode {
|
||||
ChunkingMode::Smooth => self.maybe_enter_catch_up(snapshot, now),
|
||||
ChunkingMode::CatchUp => {
|
||||
self.maybe_exit_catch_up(snapshot, now);
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
let drain_plan = match self.mode {
|
||||
ChunkingMode::Smooth => DrainPlan::Single,
|
||||
ChunkingMode::CatchUp => DrainPlan::Batch(snapshot.queued_lines.max(1)),
|
||||
};
|
||||
|
||||
ChunkingDecision {
|
||||
mode: self.mode,
|
||||
entered_catch_up,
|
||||
drain_plan,
|
||||
}
|
||||
}
|
||||
|
||||
/// Switches from `Smooth` to `CatchUp` when enter thresholds are crossed.
|
||||
///
|
||||
/// Returns `true` only on the transition tick so callers can emit one-shot
|
||||
/// transition observability.
|
||||
fn maybe_enter_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) -> bool {
|
||||
if !should_enter_catch_up(snapshot) {
|
||||
return false;
|
||||
}
|
||||
if self.reentry_hold_active(now) && !is_severe_backlog(snapshot) {
|
||||
return false;
|
||||
}
|
||||
self.mode = ChunkingMode::CatchUp;
|
||||
self.below_exit_threshold_since = None;
|
||||
self.last_catch_up_exit_at = None;
|
||||
true
|
||||
}
|
||||
|
||||
/// Applies exit hysteresis while in `CatchUp` mode.
|
||||
///
|
||||
/// The policy requires queue pressure to stay below exit thresholds for the
|
||||
/// full `EXIT_HOLD` window before returning to `Smooth`.
|
||||
fn maybe_exit_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) {
|
||||
if !should_exit_catch_up(snapshot) {
|
||||
self.below_exit_threshold_since = None;
|
||||
return;
|
||||
}
|
||||
|
||||
match self.below_exit_threshold_since {
|
||||
Some(since) if now.saturating_duration_since(since) >= EXIT_HOLD => {
|
||||
self.mode = ChunkingMode::Smooth;
|
||||
self.below_exit_threshold_since = None;
|
||||
self.last_catch_up_exit_at = Some(now);
|
||||
}
|
||||
Some(_) => {}
|
||||
None => {
|
||||
self.below_exit_threshold_since = Some(now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn note_catch_up_exit(&mut self, now: Instant) {
|
||||
if self.mode == ChunkingMode::CatchUp {
|
||||
self.last_catch_up_exit_at = Some(now);
|
||||
}
|
||||
}
|
||||
|
||||
fn reentry_hold_active(&self, now: Instant) -> bool {
|
||||
self.last_catch_up_exit_at
|
||||
.is_some_and(|exit| now.saturating_duration_since(exit) < REENTER_CATCH_UP_HOLD)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns whether current queue pressure warrants entering catch-up mode.
|
||||
///
|
||||
/// Either depth or age pressure is sufficient to trigger catch-up.
|
||||
fn should_enter_catch_up(snapshot: QueueSnapshot) -> bool {
|
||||
snapshot.queued_lines >= ENTER_QUEUE_DEPTH_LINES
|
||||
|| snapshot
|
||||
.oldest_age
|
||||
.is_some_and(|oldest| oldest >= ENTER_OLDEST_AGE)
|
||||
}
|
||||
|
||||
/// Returns whether queue pressure is low enough to begin exit hysteresis.
|
||||
///
|
||||
/// Both depth and age must be below thresholds; this prevents oscillation when
|
||||
/// one signal is still under load.
|
||||
fn should_exit_catch_up(snapshot: QueueSnapshot) -> bool {
|
||||
snapshot.queued_lines <= EXIT_QUEUE_DEPTH_LINES
|
||||
&& snapshot
|
||||
.oldest_age
|
||||
.is_some_and(|oldest| oldest <= EXIT_OLDEST_AGE)
|
||||
}
|
||||
|
||||
/// Returns whether backlog is severe enough to use a faster catch-up target.
|
||||
///
|
||||
/// Severe pressure bypasses re-entry hold to avoid queue-age growth after a
|
||||
/// recent catch-up exit.
|
||||
fn is_severe_backlog(snapshot: QueueSnapshot) -> bool {
|
||||
snapshot.queued_lines >= SEVERE_QUEUE_DEPTH_LINES
|
||||
|| snapshot
|
||||
.oldest_age
|
||||
.is_some_and(|oldest| oldest >= SEVERE_OLDEST_AGE)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn snapshot(queued_lines: usize, oldest_age_ms: u64) -> QueueSnapshot {
|
||||
QueueSnapshot {
|
||||
queued_lines,
|
||||
oldest_age: Some(Duration::from_millis(oldest_age_ms)),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smooth_mode_is_default() {
|
||||
let mut policy = AdaptiveChunkingPolicy::default();
|
||||
let now = Instant::now();
|
||||
|
||||
let decision = policy.decide(snapshot(1, 10), now);
|
||||
assert_eq!(decision.mode, ChunkingMode::Smooth);
|
||||
assert_eq!(decision.entered_catch_up, false);
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Single);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn enters_catch_up_on_depth_threshold() {
|
||||
let mut policy = AdaptiveChunkingPolicy::default();
|
||||
let now = Instant::now();
|
||||
|
||||
let decision = policy.decide(snapshot(8, 10), now);
|
||||
assert_eq!(decision.mode, ChunkingMode::CatchUp);
|
||||
assert_eq!(decision.entered_catch_up, true);
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Batch(8));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn enters_catch_up_on_age_threshold() {
|
||||
let mut policy = AdaptiveChunkingPolicy::default();
|
||||
let now = Instant::now();
|
||||
|
||||
let decision = policy.decide(snapshot(2, 120), now);
|
||||
assert_eq!(decision.mode, ChunkingMode::CatchUp);
|
||||
assert_eq!(decision.entered_catch_up, true);
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Batch(2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn severe_backlog_uses_faster_paced_batches() {
|
||||
let mut policy = AdaptiveChunkingPolicy::default();
|
||||
let now = Instant::now();
|
||||
let _ = policy.decide(snapshot(9, 10), now);
|
||||
|
||||
let decision = policy.decide(snapshot(64, 10), now + Duration::from_millis(5));
|
||||
assert_eq!(decision.mode, ChunkingMode::CatchUp);
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Batch(64));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn catch_up_batch_drains_current_backlog() {
|
||||
let mut policy = AdaptiveChunkingPolicy::default();
|
||||
let now = Instant::now();
|
||||
let decision = policy.decide(snapshot(512, 400), now);
|
||||
assert_eq!(decision.mode, ChunkingMode::CatchUp);
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Batch(512));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exits_catch_up_after_hysteresis_hold() {
|
||||
let mut policy = AdaptiveChunkingPolicy::default();
|
||||
let t0 = Instant::now();
|
||||
|
||||
let _ = policy.decide(snapshot(9, 10), t0);
|
||||
assert_eq!(policy.mode(), ChunkingMode::CatchUp);
|
||||
|
||||
let pre_hold = policy.decide(snapshot(2, 40), t0 + Duration::from_millis(200));
|
||||
assert_eq!(pre_hold.mode, ChunkingMode::CatchUp);
|
||||
|
||||
let post_hold = policy.decide(snapshot(2, 40), t0 + Duration::from_millis(460));
|
||||
assert_eq!(post_hold.mode, ChunkingMode::Smooth);
|
||||
assert_eq!(post_hold.drain_plan, DrainPlan::Single);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drops_back_to_smooth_when_idle() {
|
||||
let mut policy = AdaptiveChunkingPolicy::default();
|
||||
let now = Instant::now();
|
||||
let _ = policy.decide(snapshot(9, 10), now);
|
||||
assert_eq!(policy.mode(), ChunkingMode::CatchUp);
|
||||
|
||||
let decision = policy.decide(
|
||||
QueueSnapshot {
|
||||
queued_lines: 0,
|
||||
oldest_age: None,
|
||||
},
|
||||
now + Duration::from_millis(20),
|
||||
);
|
||||
assert_eq!(decision.mode, ChunkingMode::Smooth);
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Single);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn holds_reentry_after_catch_up_exit() {
|
||||
let mut policy = AdaptiveChunkingPolicy::default();
|
||||
let t0 = Instant::now();
|
||||
|
||||
let entered = policy.decide(snapshot(8, 20), t0);
|
||||
assert_eq!(entered.mode, ChunkingMode::CatchUp);
|
||||
|
||||
let drained = policy.decide(
|
||||
QueueSnapshot {
|
||||
queued_lines: 0,
|
||||
oldest_age: None,
|
||||
},
|
||||
t0 + Duration::from_millis(20),
|
||||
);
|
||||
assert_eq!(drained.mode, ChunkingMode::Smooth);
|
||||
|
||||
let held = policy.decide(snapshot(8, 20), t0 + Duration::from_millis(120));
|
||||
assert_eq!(held.mode, ChunkingMode::Smooth);
|
||||
assert_eq!(held.drain_plan, DrainPlan::Single);
|
||||
|
||||
let reentered = policy.decide(snapshot(8, 20), t0 + Duration::from_millis(320));
|
||||
assert_eq!(reentered.mode, ChunkingMode::CatchUp);
|
||||
assert_eq!(reentered.drain_plan, DrainPlan::Batch(8));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn severe_backlog_can_reenter_during_hold() {
|
||||
let mut policy = AdaptiveChunkingPolicy::default();
|
||||
let t0 = Instant::now();
|
||||
|
||||
let _ = policy.decide(snapshot(8, 20), t0);
|
||||
let _ = policy.decide(
|
||||
QueueSnapshot {
|
||||
queued_lines: 0,
|
||||
oldest_age: None,
|
||||
},
|
||||
t0 + Duration::from_millis(20),
|
||||
);
|
||||
|
||||
let severe = policy.decide(snapshot(64, 20), t0 + Duration::from_millis(120));
|
||||
assert_eq!(severe.mode, ChunkingMode::CatchUp);
|
||||
assert_eq!(severe.drain_plan, DrainPlan::Batch(64));
|
||||
}
|
||||
}
|
||||
@@ -1,214 +0,0 @@
|
||||
//! Orchestrates commit-tick drains across streaming controllers.
|
||||
//!
|
||||
//! This module bridges queue-based chunking policy (`chunking`) with the concrete stream
|
||||
//! controllers (`controller`). Callers provide the current controllers and tick scope; the module
|
||||
//! computes queue pressure, selects a drain plan, applies it, and returns emitted history cells.
|
||||
//!
|
||||
//! The module preserves ordering by draining only from controller queue heads. It does not schedule
|
||||
//! ticks and it does not mutate UI state directly; callers remain responsible for animation events
|
||||
//! and history insertion side effects.
|
||||
//!
|
||||
//! The main flow is:
|
||||
//! [`run_commit_tick`] -> [`stream_queue_snapshot`] -> [`QueueSnapshot`] ->
|
||||
//! [`resolve_chunking_plan`] -> [`ChunkingDecision`]/[`DrainPlan`] ->
|
||||
//! [`apply_commit_tick_plan`] -> [`CommitTickOutput`].
|
||||
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::history_cell::HistoryCell;
|
||||
|
||||
use super::chunking::AdaptiveChunkingPolicy;
|
||||
use super::chunking::ChunkingDecision;
|
||||
use super::chunking::ChunkingMode;
|
||||
use super::chunking::DrainPlan;
|
||||
use super::chunking::QueueSnapshot;
|
||||
use super::controller::PlanStreamController;
|
||||
use super::controller::StreamController;
|
||||
|
||||
/// Describes whether a commit tick may run in all modes or only in catch-up mode.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub(crate) enum CommitTickScope {
|
||||
/// Always run the tick, regardless of current chunking mode.
|
||||
AnyMode,
|
||||
/// Run model transitions and policy updates, but commit lines only in `CatchUp`.
|
||||
CatchUpOnly,
|
||||
}
|
||||
|
||||
/// Describes what a single commit tick produced.
|
||||
pub(crate) struct CommitTickOutput {
|
||||
/// Cells produced by drained stream lines during this tick.
|
||||
pub(crate) cells: Vec<Box<dyn HistoryCell>>,
|
||||
/// Whether at least one stream controller was present for this tick.
|
||||
pub(crate) has_controller: bool,
|
||||
/// Whether all present controllers were idle after this tick.
|
||||
pub(crate) all_idle: bool,
|
||||
}
|
||||
|
||||
impl Default for CommitTickOutput {
|
||||
/// Creates an output that represents "no commit performed".
|
||||
///
|
||||
/// This is used when a tick is intentionally suppressed, for example when
|
||||
/// the scope is [`CommitTickScope::CatchUpOnly`] and policy is not in catch-up mode.
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
cells: Vec::new(),
|
||||
has_controller: false,
|
||||
all_idle: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs one commit tick against the provided stream controllers.
|
||||
///
|
||||
/// This function collects a [`QueueSnapshot`], asks [`AdaptiveChunkingPolicy`] for a
|
||||
/// [`ChunkingDecision`], and then applies the resulting [`DrainPlan`] to both controllers.
|
||||
/// If callers pass stale controller references (for example, references not tied to the
|
||||
/// current turn), queue age can be misread and the policy may stay in catch-up longer
|
||||
/// than expected.
|
||||
pub(crate) fn run_commit_tick(
|
||||
policy: &mut AdaptiveChunkingPolicy,
|
||||
stream_controller: Option<&mut StreamController>,
|
||||
plan_stream_controller: Option<&mut PlanStreamController>,
|
||||
scope: CommitTickScope,
|
||||
now: Instant,
|
||||
) -> CommitTickOutput {
|
||||
let snapshot = stream_queue_snapshot(
|
||||
stream_controller.as_deref(),
|
||||
plan_stream_controller.as_deref(),
|
||||
now,
|
||||
);
|
||||
let decision = resolve_chunking_plan(policy, snapshot, now);
|
||||
if scope == CommitTickScope::CatchUpOnly && decision.mode != ChunkingMode::CatchUp {
|
||||
return CommitTickOutput::default();
|
||||
}
|
||||
|
||||
apply_commit_tick_plan(
|
||||
decision.drain_plan,
|
||||
stream_controller,
|
||||
plan_stream_controller,
|
||||
)
|
||||
}
|
||||
|
||||
/// Builds the combined queue-pressure snapshot consumed by chunking policy.
|
||||
///
|
||||
/// The snapshot sums queue depth across controllers and keeps the maximum oldest age
|
||||
/// so policy decisions reflect the most delayed queued line currently visible.
|
||||
fn stream_queue_snapshot(
|
||||
stream_controller: Option<&StreamController>,
|
||||
plan_stream_controller: Option<&PlanStreamController>,
|
||||
now: Instant,
|
||||
) -> QueueSnapshot {
|
||||
let mut queued_lines = 0usize;
|
||||
let mut oldest_age: Option<Duration> = None;
|
||||
|
||||
if let Some(controller) = stream_controller {
|
||||
queued_lines += controller.queued_lines();
|
||||
oldest_age = max_duration(oldest_age, controller.oldest_queued_age(now));
|
||||
}
|
||||
if let Some(controller) = plan_stream_controller {
|
||||
queued_lines += controller.queued_lines();
|
||||
oldest_age = max_duration(oldest_age, controller.oldest_queued_age(now));
|
||||
}
|
||||
|
||||
QueueSnapshot {
|
||||
queued_lines,
|
||||
oldest_age,
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes one policy decision and emits a trace log on mode transitions.
|
||||
///
|
||||
/// This keeps policy transition logging in one place so callers can rely on
|
||||
/// [`run_commit_tick`] to provide consistent observability.
|
||||
fn resolve_chunking_plan(
|
||||
policy: &mut AdaptiveChunkingPolicy,
|
||||
snapshot: QueueSnapshot,
|
||||
now: Instant,
|
||||
) -> ChunkingDecision {
|
||||
let prior_mode = policy.mode();
|
||||
let decision = policy.decide(snapshot, now);
|
||||
if decision.mode != prior_mode {
|
||||
tracing::trace!(
|
||||
prior_mode = ?prior_mode,
|
||||
new_mode = ?decision.mode,
|
||||
queued_lines = snapshot.queued_lines,
|
||||
oldest_queued_age_ms = snapshot.oldest_age.map(|age| age.as_millis() as u64),
|
||||
entered_catch_up = decision.entered_catch_up,
|
||||
"stream chunking mode transition"
|
||||
);
|
||||
}
|
||||
decision
|
||||
}
|
||||
|
||||
/// Applies a [`DrainPlan`] to all available stream controllers.
|
||||
///
|
||||
/// The returned [`CommitTickOutput`] reports emitted cells and whether all
|
||||
/// present controllers are idle after draining.
|
||||
fn apply_commit_tick_plan(
|
||||
drain_plan: DrainPlan,
|
||||
stream_controller: Option<&mut StreamController>,
|
||||
plan_stream_controller: Option<&mut PlanStreamController>,
|
||||
) -> CommitTickOutput {
|
||||
let mut output = CommitTickOutput::default();
|
||||
|
||||
if let Some(controller) = stream_controller {
|
||||
output.has_controller = true;
|
||||
let (cell, is_idle) = drain_stream_controller(controller, drain_plan);
|
||||
if let Some(cell) = cell {
|
||||
output.cells.push(cell);
|
||||
}
|
||||
output.all_idle &= is_idle;
|
||||
}
|
||||
if let Some(controller) = plan_stream_controller {
|
||||
output.has_controller = true;
|
||||
let (cell, is_idle) = drain_plan_stream_controller(controller, drain_plan);
|
||||
if let Some(cell) = cell {
|
||||
output.cells.push(cell);
|
||||
}
|
||||
output.all_idle &= is_idle;
|
||||
}
|
||||
|
||||
output
|
||||
}
|
||||
|
||||
/// Applies one drain step to the main stream controller.
|
||||
///
|
||||
/// [`DrainPlan::Single`] maps to one-line drain; [`DrainPlan::Batch`] maps to
|
||||
/// multi-line drain (including instant catch-up when policy requests the full
|
||||
/// queued backlog).
|
||||
fn drain_stream_controller(
|
||||
controller: &mut StreamController,
|
||||
drain_plan: DrainPlan,
|
||||
) -> (Option<Box<dyn HistoryCell>>, bool) {
|
||||
match drain_plan {
|
||||
DrainPlan::Single => controller.on_commit_tick(),
|
||||
DrainPlan::Batch(max_lines) => controller.on_commit_tick_batch(max_lines),
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies one drain step to the plan stream controller.
|
||||
///
|
||||
/// This mirrors [`drain_stream_controller`] so both controller types follow the
|
||||
/// same chunking policy decisions.
|
||||
fn drain_plan_stream_controller(
|
||||
controller: &mut PlanStreamController,
|
||||
drain_plan: DrainPlan,
|
||||
) -> (Option<Box<dyn HistoryCell>>, bool) {
|
||||
match drain_plan {
|
||||
DrainPlan::Single => controller.on_commit_tick(),
|
||||
DrainPlan::Batch(max_lines) => controller.on_commit_tick_batch(max_lines),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the greater of two optional durations.
|
||||
///
|
||||
/// This helper preserves whichever side is present when only one duration exists.
|
||||
fn max_duration(lhs: Option<Duration>, rhs: Option<Duration>) -> Option<Duration> {
|
||||
match (lhs, rhs) {
|
||||
(Some(left), Some(right)) => Some(left.max(right)),
|
||||
(Some(left), None) => Some(left),
|
||||
(None, Some(right)) => Some(right),
|
||||
(None, None) => None,
|
||||
}
|
||||
}
|
||||
@@ -1,396 +0,0 @@
|
||||
use crate::history_cell::HistoryCell;
|
||||
use crate::history_cell::{self};
|
||||
use crate::render::line_utils::prefix_lines;
|
||||
use crate::style::proposed_plan_style;
|
||||
use ratatui::prelude::Stylize;
|
||||
use ratatui::text::Line;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use super::StreamState;
|
||||
|
||||
/// Controller that manages newline-gated streaming, header emission, and
|
||||
/// commit animation across streams.
|
||||
pub(crate) struct StreamController {
|
||||
state: StreamState,
|
||||
finishing_after_drain: bool,
|
||||
header_emitted: bool,
|
||||
}
|
||||
|
||||
impl StreamController {
|
||||
/// Create a controller whose markdown renderer shortens local file links relative to `cwd`.
|
||||
///
|
||||
/// The controller snapshots the path into stream state so later commit ticks and finalization
|
||||
/// render against the same session cwd that was active when streaming started.
|
||||
pub(crate) fn new(width: Option<usize>, cwd: &Path) -> Self {
|
||||
Self {
|
||||
state: StreamState::new(width, cwd),
|
||||
finishing_after_drain: false,
|
||||
header_emitted: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Push a delta; if it contains a newline, commit completed lines and start animation.
|
||||
pub(crate) fn push(&mut self, delta: &str) -> bool {
|
||||
let state = &mut self.state;
|
||||
if !delta.is_empty() {
|
||||
state.has_seen_delta = true;
|
||||
}
|
||||
state.collector.push_delta(delta);
|
||||
if delta.contains('\n') {
|
||||
let newly_completed = state.collector.commit_complete_lines();
|
||||
if !newly_completed.is_empty() {
|
||||
state.enqueue(newly_completed);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Finalize the active stream. Drain and emit now.
|
||||
pub(crate) fn finalize(&mut self) -> Option<Box<dyn HistoryCell>> {
|
||||
// Finalize collector first.
|
||||
let remaining = {
|
||||
let state = &mut self.state;
|
||||
state.collector.finalize_and_drain()
|
||||
};
|
||||
// Collect all output first to avoid emitting headers when there is no content.
|
||||
let mut out_lines = Vec::new();
|
||||
{
|
||||
let state = &mut self.state;
|
||||
if !remaining.is_empty() {
|
||||
state.enqueue(remaining);
|
||||
}
|
||||
let step = state.drain_all();
|
||||
out_lines.extend(step);
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
self.state.clear();
|
||||
self.finishing_after_drain = false;
|
||||
self.emit(out_lines)
|
||||
}
|
||||
|
||||
/// Step animation: commit at most one queued line and handle end-of-drain cleanup.
|
||||
pub(crate) fn on_commit_tick(&mut self) -> (Option<Box<dyn HistoryCell>>, bool) {
|
||||
let step = self.state.step();
|
||||
(self.emit(step), self.state.is_idle())
|
||||
}
|
||||
|
||||
/// Step animation: commit at most `max_lines` queued lines.
|
||||
///
|
||||
/// This is intended for adaptive catch-up drains. Callers should keep `max_lines` bounded; a
|
||||
/// very large value can collapse perceived animation into a single jump.
|
||||
pub(crate) fn on_commit_tick_batch(
|
||||
&mut self,
|
||||
max_lines: usize,
|
||||
) -> (Option<Box<dyn HistoryCell>>, bool) {
|
||||
let step = self.state.drain_n(max_lines.max(1));
|
||||
(self.emit(step), self.state.is_idle())
|
||||
}
|
||||
|
||||
/// Returns the current number of queued lines waiting to be displayed.
|
||||
pub(crate) fn queued_lines(&self) -> usize {
|
||||
self.state.queued_len()
|
||||
}
|
||||
|
||||
/// Returns the age of the oldest queued line.
|
||||
pub(crate) fn oldest_queued_age(&self, now: Instant) -> Option<Duration> {
|
||||
self.state.oldest_queued_age(now)
|
||||
}
|
||||
|
||||
fn emit(&mut self, lines: Vec<Line<'static>>) -> Option<Box<dyn HistoryCell>> {
|
||||
if lines.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(Box::new(history_cell::AgentMessageCell::new(lines, {
|
||||
let header_emitted = self.header_emitted;
|
||||
self.header_emitted = true;
|
||||
!header_emitted
|
||||
})))
|
||||
}
|
||||
}
|
||||
|
||||
/// Controller that streams proposed plan markdown into a styled plan block.
|
||||
pub(crate) struct PlanStreamController {
|
||||
state: StreamState,
|
||||
header_emitted: bool,
|
||||
top_padding_emitted: bool,
|
||||
}
|
||||
|
||||
impl PlanStreamController {
|
||||
/// Create a plan-stream controller whose markdown renderer shortens local file links relative
|
||||
/// to `cwd`.
|
||||
///
|
||||
/// The controller snapshots the path into stream state so later commit ticks and finalization
|
||||
/// render against the same session cwd that was active when streaming started.
|
||||
pub(crate) fn new(width: Option<usize>, cwd: &Path) -> Self {
|
||||
Self {
|
||||
state: StreamState::new(width, cwd),
|
||||
header_emitted: false,
|
||||
top_padding_emitted: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Push a delta; if it contains a newline, commit completed lines and start animation.
|
||||
pub(crate) fn push(&mut self, delta: &str) -> bool {
|
||||
let state = &mut self.state;
|
||||
if !delta.is_empty() {
|
||||
state.has_seen_delta = true;
|
||||
}
|
||||
state.collector.push_delta(delta);
|
||||
if delta.contains('\n') {
|
||||
let newly_completed = state.collector.commit_complete_lines();
|
||||
if !newly_completed.is_empty() {
|
||||
state.enqueue(newly_completed);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Finalize the active stream. Drain and emit now.
|
||||
pub(crate) fn finalize(&mut self) -> Option<Box<dyn HistoryCell>> {
|
||||
let remaining = {
|
||||
let state = &mut self.state;
|
||||
state.collector.finalize_and_drain()
|
||||
};
|
||||
let mut out_lines = Vec::new();
|
||||
{
|
||||
let state = &mut self.state;
|
||||
if !remaining.is_empty() {
|
||||
state.enqueue(remaining);
|
||||
}
|
||||
let step = state.drain_all();
|
||||
out_lines.extend(step);
|
||||
}
|
||||
|
||||
self.state.clear();
|
||||
self.emit(out_lines, /*include_bottom_padding*/ true)
|
||||
}
|
||||
|
||||
/// Step animation: commit at most one queued line and handle end-of-drain cleanup.
|
||||
pub(crate) fn on_commit_tick(&mut self) -> (Option<Box<dyn HistoryCell>>, bool) {
|
||||
let step = self.state.step();
|
||||
(
|
||||
self.emit(step, /*include_bottom_padding*/ false),
|
||||
self.state.is_idle(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Step animation: commit at most `max_lines` queued lines.
|
||||
///
|
||||
/// This is intended for adaptive catch-up drains. Callers should keep `max_lines` bounded; a
|
||||
/// very large value can collapse perceived animation into a single jump.
|
||||
pub(crate) fn on_commit_tick_batch(
|
||||
&mut self,
|
||||
max_lines: usize,
|
||||
) -> (Option<Box<dyn HistoryCell>>, bool) {
|
||||
let step = self.state.drain_n(max_lines.max(1));
|
||||
(
|
||||
self.emit(step, /*include_bottom_padding*/ false),
|
||||
self.state.is_idle(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the current number of queued plan lines waiting to be displayed.
|
||||
pub(crate) fn queued_lines(&self) -> usize {
|
||||
self.state.queued_len()
|
||||
}
|
||||
|
||||
/// Returns the age of the oldest queued plan line.
|
||||
pub(crate) fn oldest_queued_age(&self, now: Instant) -> Option<Duration> {
|
||||
self.state.oldest_queued_age(now)
|
||||
}
|
||||
|
||||
fn emit(
|
||||
&mut self,
|
||||
lines: Vec<Line<'static>>,
|
||||
include_bottom_padding: bool,
|
||||
) -> Option<Box<dyn HistoryCell>> {
|
||||
if lines.is_empty() && !include_bottom_padding {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut out_lines: Vec<Line<'static>> = Vec::new();
|
||||
let is_stream_continuation = self.header_emitted;
|
||||
if !self.header_emitted {
|
||||
out_lines.push(vec!["• ".dim(), "Proposed Plan".bold()].into());
|
||||
out_lines.push(Line::from(" "));
|
||||
self.header_emitted = true;
|
||||
}
|
||||
|
||||
let mut plan_lines: Vec<Line<'static>> = Vec::new();
|
||||
if !self.top_padding_emitted {
|
||||
plan_lines.push(Line::from(" "));
|
||||
self.top_padding_emitted = true;
|
||||
}
|
||||
plan_lines.extend(lines);
|
||||
if include_bottom_padding {
|
||||
plan_lines.push(Line::from(" "));
|
||||
}
|
||||
|
||||
let plan_style = proposed_plan_style();
|
||||
let plan_lines = prefix_lines(plan_lines, " ".into(), " ".into())
|
||||
.into_iter()
|
||||
.map(|line| line.style(plan_style))
|
||||
.collect::<Vec<_>>();
|
||||
out_lines.extend(plan_lines);
|
||||
|
||||
Some(Box::new(history_cell::new_proposed_plan_stream(
|
||||
out_lines,
|
||||
is_stream_continuation,
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::path::PathBuf;
|
||||
|
||||
fn test_cwd() -> PathBuf {
|
||||
// These tests only need a stable absolute cwd; using temp_dir() avoids baking Unix- or
|
||||
// Windows-specific root semantics into the fixtures.
|
||||
std::env::temp_dir()
|
||||
}
|
||||
|
||||
fn lines_to_plain_strings(lines: &[ratatui::text::Line<'_>]) -> Vec<String> {
|
||||
lines
|
||||
.iter()
|
||||
.map(|l| {
|
||||
l.spans
|
||||
.iter()
|
||||
.map(|s| s.content.clone())
|
||||
.collect::<Vec<_>>()
|
||||
.join("")
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn controller_loose_vs_tight_with_commit_ticks_matches_full() {
|
||||
let mut ctrl = StreamController::new(None, &test_cwd());
|
||||
let mut lines = Vec::new();
|
||||
|
||||
// Exact deltas from the session log (section: Loose vs. tight list items)
|
||||
let deltas = vec![
|
||||
"\n\n",
|
||||
"Loose",
|
||||
" vs",
|
||||
".",
|
||||
" tight",
|
||||
" list",
|
||||
" items",
|
||||
":\n",
|
||||
"1",
|
||||
".",
|
||||
" Tight",
|
||||
" item",
|
||||
"\n",
|
||||
"2",
|
||||
".",
|
||||
" Another",
|
||||
" tight",
|
||||
" item",
|
||||
"\n\n",
|
||||
"1",
|
||||
".",
|
||||
" Loose",
|
||||
" item",
|
||||
" with",
|
||||
" its",
|
||||
" own",
|
||||
" paragraph",
|
||||
".\n\n",
|
||||
" ",
|
||||
" This",
|
||||
" paragraph",
|
||||
" belongs",
|
||||
" to",
|
||||
" the",
|
||||
" same",
|
||||
" list",
|
||||
" item",
|
||||
".\n\n",
|
||||
"2",
|
||||
".",
|
||||
" Second",
|
||||
" loose",
|
||||
" item",
|
||||
" with",
|
||||
" a",
|
||||
" nested",
|
||||
" list",
|
||||
" after",
|
||||
" a",
|
||||
" blank",
|
||||
" line",
|
||||
".\n\n",
|
||||
" ",
|
||||
" -",
|
||||
" Nested",
|
||||
" bullet",
|
||||
" under",
|
||||
" a",
|
||||
" loose",
|
||||
" item",
|
||||
"\n",
|
||||
" ",
|
||||
" -",
|
||||
" Another",
|
||||
" nested",
|
||||
" bullet",
|
||||
"\n\n",
|
||||
];
|
||||
|
||||
// Simulate streaming with a commit tick attempt after each delta.
|
||||
for d in deltas.iter() {
|
||||
ctrl.push(d);
|
||||
while let (Some(cell), idle) = ctrl.on_commit_tick() {
|
||||
lines.extend(cell.transcript_lines(u16::MAX));
|
||||
if idle {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Finalize and flush remaining lines now.
|
||||
if let Some(cell) = ctrl.finalize() {
|
||||
lines.extend(cell.transcript_lines(u16::MAX));
|
||||
}
|
||||
|
||||
let streamed: Vec<_> = lines_to_plain_strings(&lines)
|
||||
.into_iter()
|
||||
// skip • and 2-space indentation
|
||||
.map(|s| s.chars().skip(2).collect::<String>())
|
||||
.collect();
|
||||
|
||||
// Full render of the same source
|
||||
let source: String = deltas.iter().copied().collect();
|
||||
let mut rendered: Vec<ratatui::text::Line<'static>> = Vec::new();
|
||||
let test_cwd = test_cwd();
|
||||
crate::markdown::append_markdown(&source, None, Some(test_cwd.as_path()), &mut rendered);
|
||||
let rendered_strs = lines_to_plain_strings(&rendered);
|
||||
|
||||
assert_eq!(streamed, rendered_strs);
|
||||
|
||||
// Also assert exact expected plain strings for clarity.
|
||||
let expected = vec![
|
||||
"Loose vs. tight list items:".to_string(),
|
||||
"".to_string(),
|
||||
"1. Tight item".to_string(),
|
||||
"2. Another tight item".to_string(),
|
||||
"3. Loose item with its own paragraph.".to_string(),
|
||||
"".to_string(),
|
||||
" This paragraph belongs to the same list item.".to_string(),
|
||||
"4. Second loose item with a nested list after a blank line.".to_string(),
|
||||
" - Nested bullet under a loose item".to_string(),
|
||||
" - Another nested bullet".to_string(),
|
||||
];
|
||||
assert_eq!(
|
||||
streamed, expected,
|
||||
"expected exact rendered lines for loose/tight section"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,126 +0,0 @@
|
||||
//! Streaming primitives used by the TUI transcript pipeline.
|
||||
//!
|
||||
//! `StreamState` owns newline-gated markdown collection and a FIFO queue of committed render lines.
|
||||
//! Higher-level modules build on top of this state:
|
||||
//! - `controller` adapts queued lines into `HistoryCell` emission rules for message and plan streams.
|
||||
//! - `chunking` computes adaptive drain plans from queue pressure.
|
||||
//! - `commit_tick` binds policy decisions to concrete controller drains.
|
||||
//!
|
||||
//! The key invariant is queue ordering. All drains pop from the front, and enqueue records an
|
||||
//! arrival timestamp so policy code can reason about oldest queued age without peeking into text.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use ratatui::text::Line;
|
||||
|
||||
use crate::markdown_stream::MarkdownStreamCollector;
|
||||
pub(crate) mod chunking;
|
||||
pub(crate) mod commit_tick;
|
||||
pub(crate) mod controller;
|
||||
|
||||
struct QueuedLine {
|
||||
line: Line<'static>,
|
||||
enqueued_at: Instant,
|
||||
}
|
||||
|
||||
/// Holds in-flight markdown stream state and queued committed lines.
|
||||
pub(crate) struct StreamState {
|
||||
pub(crate) collector: MarkdownStreamCollector,
|
||||
queued_lines: VecDeque<QueuedLine>,
|
||||
pub(crate) has_seen_delta: bool,
|
||||
}
|
||||
|
||||
impl StreamState {
|
||||
/// Create stream state whose markdown collector renders local file links relative to `cwd`.
|
||||
///
|
||||
/// Controllers are expected to pass the session cwd here once and keep it stable for the
|
||||
/// lifetime of the active stream.
|
||||
pub(crate) fn new(width: Option<usize>, cwd: &Path) -> Self {
|
||||
Self {
|
||||
collector: MarkdownStreamCollector::new(width, cwd),
|
||||
queued_lines: VecDeque::new(),
|
||||
has_seen_delta: false,
|
||||
}
|
||||
}
|
||||
/// Resets collector and queue state for the next stream lifecycle.
|
||||
pub(crate) fn clear(&mut self) {
|
||||
self.collector.clear();
|
||||
self.queued_lines.clear();
|
||||
self.has_seen_delta = false;
|
||||
}
|
||||
/// Drains one queued line from the front of the queue.
|
||||
pub(crate) fn step(&mut self) -> Vec<Line<'static>> {
|
||||
self.queued_lines
|
||||
.pop_front()
|
||||
.map(|queued| queued.line)
|
||||
.into_iter()
|
||||
.collect()
|
||||
}
|
||||
/// Drains up to `max_lines` queued lines from the front of the queue.
|
||||
///
|
||||
/// Callers that pass very large values still get bounded behavior because this method clamps to
|
||||
/// the currently available queue length.
|
||||
pub(crate) fn drain_n(&mut self, max_lines: usize) -> Vec<Line<'static>> {
|
||||
let end = max_lines.min(self.queued_lines.len());
|
||||
self.queued_lines
|
||||
.drain(..end)
|
||||
.map(|queued| queued.line)
|
||||
.collect()
|
||||
}
|
||||
/// Drains all queued lines from the front of the queue.
|
||||
pub(crate) fn drain_all(&mut self) -> Vec<Line<'static>> {
|
||||
self.queued_lines
|
||||
.drain(..)
|
||||
.map(|queued| queued.line)
|
||||
.collect()
|
||||
}
|
||||
/// Returns whether no lines are queued for commit.
|
||||
pub(crate) fn is_idle(&self) -> bool {
|
||||
self.queued_lines.is_empty()
|
||||
}
|
||||
/// Returns the current queue depth.
|
||||
pub(crate) fn queued_len(&self) -> usize {
|
||||
self.queued_lines.len()
|
||||
}
|
||||
/// Returns the age of the oldest queued line.
|
||||
pub(crate) fn oldest_queued_age(&self, now: Instant) -> Option<Duration> {
|
||||
self.queued_lines
|
||||
.front()
|
||||
.map(|queued| now.saturating_duration_since(queued.enqueued_at))
|
||||
}
|
||||
/// Appends committed lines to the queue with a shared enqueue timestamp.
|
||||
pub(crate) fn enqueue(&mut self, lines: Vec<Line<'static>>) {
|
||||
let now = Instant::now();
|
||||
self.queued_lines
|
||||
.extend(lines.into_iter().map(|line| QueuedLine {
|
||||
line,
|
||||
enqueued_at: now,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
|
||||
fn test_cwd() -> PathBuf {
|
||||
// These tests only need a stable absolute cwd; using temp_dir() avoids baking Unix- or
|
||||
// Windows-specific root semantics into the fixtures.
|
||||
std::env::temp_dir()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drain_n_clamps_to_available_lines() {
|
||||
let mut state = StreamState::new(None, &test_cwd());
|
||||
state.enqueue(vec![Line::from("one")]);
|
||||
|
||||
let drained = state.drain_n(8);
|
||||
assert_eq!(drained, vec![Line::from("one")]);
|
||||
assert!(state.is_idle());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user