Compare commits

...

4 Commits

Author SHA1 Message Date
Ee Durbin
2a99bb5a55 Merge updated core queue pause branch into TUI 2026-05-14 07:41:22 -07:00
Ee Durbin
236fa42600 TUI: refine paused queued-send resume flow 2026-05-14 06:32:23 -07:00
Ee Durbin
8ac22f06eb TUI: pause queued sends after usage limits
Co-authored-by: Codex <noreply@openai.com>
2026-05-12 07:36:28 -07:00
Ee Durbin
5880fd7913 Core: pause pending steers after usage limits
Co-authored-by: Codex <noreply@openai.com>
2026-05-11 16:51:33 -07:00
20 changed files with 623 additions and 17 deletions

View File

@@ -1523,6 +1523,9 @@ impl App {
AppEvent::UpdateRateLimitSwitchPromptHidden(hidden) => {
self.chat_widget.set_rate_limit_switch_prompt_hidden(hidden);
}
AppEvent::ResumeQueuedSends => {
self.chat_widget.resume_queued_sends();
}
AppEvent::UpdatePlanModeReasoningEffort(effort) => {
self.config.plan_mode_reasoning_effort = effort;
self.chat_widget.set_plan_mode_reasoning_effort(effort);

View File

@@ -772,6 +772,9 @@ pub(crate) enum AppEvent {
/// Update whether the rate limit switch prompt has been acknowledged for the session.
UpdateRateLimitSwitchPromptHidden(bool),
/// Resume queued follow-up sends after the user explicitly confirms intent.
ResumeQueuedSends,
/// Update the Plan-mode-specific reasoning effort in memory.
UpdatePlanModeReasoningEffort(Option<ReasoningEffort>),

View File

@@ -1154,10 +1154,13 @@ impl BottomPane {
queued: Vec<String>,
pending_steers: Vec<String>,
rejected_steers: Vec<String>,
queued_sends_paused_after_usage_limit: bool,
) {
self.pending_input_preview.pending_steers = pending_steers;
self.pending_input_preview.rejected_steers = rejected_steers;
self.pending_input_preview.queued_messages = queued;
self.pending_input_preview
.queued_sends_paused_after_usage_limit = queued_sends_paused_after_usage_limit;
self.request_redraw();
}
@@ -2402,6 +2405,7 @@ mod tests {
vec!["Queued follow-up question".to_string()],
Vec::new(),
Vec::new(),
/*queued_sends_paused_after_usage_limit*/ false,
);
let width = 48;
@@ -2433,6 +2437,7 @@ mod tests {
vec!["Queued follow-up question".to_string()],
Vec::new(),
Vec::new(),
/*queued_sends_paused_after_usage_limit*/ false,
);
pane.hide_status_indicator();
@@ -2465,6 +2470,7 @@ mod tests {
vec!["Queued follow-up question".to_string()],
Vec::new(),
Vec::new(),
/*queued_sends_paused_after_usage_limit*/ false,
);
let width = 48;

View File

@@ -24,6 +24,7 @@ pub(crate) struct PendingInputPreview {
pub pending_steers: Vec<String>,
pub rejected_steers: Vec<String>,
pub queued_messages: Vec<String>,
pub queued_sends_paused_after_usage_limit: bool,
/// Key combination rendered in the hint line. Defaults to Alt+Up but may
/// be overridden for terminals where that chord is unavailable.
edit_binding: Option<key_hint::KeyBinding>,
@@ -37,6 +38,7 @@ impl PendingInputPreview {
pending_steers: Vec::new(),
rejected_steers: Vec::new(),
queued_messages: Vec::new(),
queued_sends_paused_after_usage_limit: false,
edit_binding: Some(key_hint::alt(KeyCode::Up)),
}
}
@@ -80,7 +82,26 @@ impl PendingInputPreview {
let mut lines = vec![];
if self.queued_sends_paused_after_usage_limit {
Self::push_section_header(
&mut lines,
width,
Line::from("Queued sends paused after usage limit".cyan().bold()),
);
lines.push(Line::from(vec![
" Empty composer: press ".into(),
key_hint::plain(KeyCode::Enter)
.display_label()
.cyan()
.bold(),
" to review queue".into(),
]));
}
if !self.pending_steers.is_empty() {
if !lines.is_empty() {
lines.push(Line::from(""));
}
Self::push_section_header(
&mut lines,
width,
@@ -350,6 +371,35 @@ mod tests {
);
}
#[test]
fn render_paused_after_usage_limit() {
let mut queue = PendingInputPreview::new();
queue.queued_messages.push("Try again later".to_string());
queue.queued_sends_paused_after_usage_limit = true;
let width = 48;
let height = queue.desired_height(width);
let mut buf = Buffer::empty(Rect::new(0, 0, width, height));
queue.render(Rect::new(0, 0, width, height), &mut buf);
assert_snapshot!("render_paused_after_usage_limit", format!("{buf:?}"));
}
#[test]
fn render_paused_steering_after_usage_limit() {
let mut queue = PendingInputPreview::new();
queue
.rejected_steers
.push("Check the final command output.".to_string());
queue.queued_sends_paused_after_usage_limit = true;
let width = 48;
let height = queue.desired_height(width);
let mut buf = Buffer::empty(Rect::new(0, 0, width, height));
queue.render(Rect::new(0, 0, width, height), &mut buf);
assert_snapshot!(
"render_paused_steering_after_usage_limit",
format!("{buf:?}")
);
}
#[test]
fn render_multiline_pending_steer_uses_single_prefix_and_truncates() {
let mut queue = PendingInputPreview::new();

View File

@@ -0,0 +1,29 @@
---
source: tui/src/bottom_pane/pending_input_preview.rs
expression: "format!(\"{buf:?}\")"
---
Buffer {
area: Rect { x: 0, y: 0, width: 48, height: 6 },
content: [
"• Queued sends paused after usage limit ",
" Empty composer: press enter to review queue ",
" ",
"• Queued follow-up inputs ",
" ↳ Try again later ",
" ⌥ + ↑ edit last queued message ",
],
styles: [
x: 0, y: 0, fg: Reset, bg: Reset, underline: Reset, modifier: DIM,
x: 2, y: 0, fg: Cyan, bg: Reset, underline: Reset, modifier: BOLD,
x: 39, y: 0, fg: Reset, bg: Reset, underline: Reset, modifier: NONE,
x: 24, y: 1, fg: Cyan, bg: Reset, underline: Reset, modifier: BOLD,
x: 29, y: 1, fg: Reset, bg: Reset, underline: Reset, modifier: NONE,
x: 0, y: 3, fg: Reset, bg: Reset, underline: Reset, modifier: DIM,
x: 2, y: 3, fg: Reset, bg: Reset, underline: Reset, modifier: NONE,
x: 0, y: 4, fg: Reset, bg: Reset, underline: Reset, modifier: DIM,
x: 4, y: 4, fg: Reset, bg: Reset, underline: Reset, modifier: DIM | ITALIC,
x: 19, y: 4, fg: Reset, bg: Reset, underline: Reset, modifier: NONE,
x: 0, y: 5, fg: Reset, bg: Reset, underline: Reset, modifier: DIM,
x: 34, y: 5, fg: Reset, bg: Reset, underline: Reset, modifier: NONE,
]
}

View File

@@ -0,0 +1,25 @@
---
source: tui/src/bottom_pane/pending_input_preview.rs
expression: "format!(\"{buf:?}\")"
---
Buffer {
area: Rect { x: 0, y: 0, width: 48, height: 5 },
content: [
"• Queued sends paused after usage limit ",
" Empty composer: press enter to review queue ",
" ",
"• Messages to be submitted at end of turn ",
" ↳ Check the final command output. ",
],
styles: [
x: 0, y: 0, fg: Reset, bg: Reset, underline: Reset, modifier: DIM,
x: 2, y: 0, fg: Cyan, bg: Reset, underline: Reset, modifier: BOLD,
x: 39, y: 0, fg: Reset, bg: Reset, underline: Reset, modifier: NONE,
x: 24, y: 1, fg: Cyan, bg: Reset, underline: Reset, modifier: BOLD,
x: 29, y: 1, fg: Reset, bg: Reset, underline: Reset, modifier: NONE,
x: 0, y: 3, fg: Reset, bg: Reset, underline: Reset, modifier: DIM,
x: 2, y: 3, fg: Reset, bg: Reset, underline: Reset, modifier: NONE,
x: 0, y: 4, fg: Reset, bg: Reset, underline: Reset, modifier: DIM,
x: 35, y: 4, fg: Reset, bg: Reset, underline: Reset, modifier: NONE,
]
}

View File

@@ -369,6 +369,7 @@ use self::notifications::Notification;
mod permission_popups;
mod protocol;
mod protocol_requests;
mod queued_sends;
mod rate_limits;
use self::rate_limits::RateLimitErrorKind;
use self::rate_limits::RateLimitSwitchPromptState;

View File

@@ -77,7 +77,10 @@ impl ChatWidget {
user_message: UserMessage,
action: QueuedInputAction,
) {
if !self.is_session_configured() || self.is_user_turn_pending_or_running() {
if self.input_queue.queued_sends_paused_after_usage_limit
|| !self.is_session_configured()
|| self.is_user_turn_pending_or_running()
{
self.input_queue
.queued_user_messages
.push_back(QueuedUserMessage::new(user_message, action));
@@ -92,7 +95,9 @@ impl ChatWidget {
/// If idle and there are queued inputs, submit exactly one to start the next turn.
pub(crate) fn maybe_send_next_queued_input(&mut self) -> bool {
if self.input_queue.suppress_queue_autosend {
if self.input_queue.suppress_queue_autosend
|| self.input_queue.queued_sends_paused_after_usage_limit
{
return false;
}
if self.is_user_turn_pending_or_running() {
@@ -147,12 +152,20 @@ impl ChatWidget {
/// Rebuild and update the bottom-pane pending-input preview.
pub(super) fn refresh_pending_input_preview(&mut self) {
let queued_sends_were_paused = self.input_queue.queued_sends_paused_after_usage_limit;
if !self.has_queued_follow_up_messages() {
self.input_queue.queued_sends_paused_after_usage_limit = false;
}
let preview = self.input_queue.preview();
self.bottom_pane.set_pending_input_preview(
preview.queued_messages,
preview.pending_steers,
preview.rejected_steers,
self.input_queue.queued_sends_paused_after_usage_limit,
);
if queued_sends_were_paused && !self.input_queue.queued_sends_paused_after_usage_limit {
self.maybe_show_pending_rate_limit_prompt();
}
}
pub(crate) fn submit_user_message_with_mode(

View File

@@ -38,6 +38,9 @@ pub(super) struct InputQueueState {
pub(super) rejected_steer_history_records: VecDeque<UserMessageHistoryRecord>,
/// Steers already submitted to core but not yet committed into history.
pub(super) pending_steers: VecDeque<PendingSteer>,
/// After a limit error, queued follow-up inputs must not resume without an
/// explicit user confirmation.
pub(super) queued_sends_paused_after_usage_limit: bool,
/// When set, the next interrupt should resubmit all pending steers as one
/// fresh user turn instead of restoring them into the composer.
pub(super) submit_pending_steers_after_interrupt: bool,
@@ -56,6 +59,7 @@ impl InputQueueState {
self.rejected_steers_queue.clear();
self.rejected_steer_history_records.clear();
self.pending_steers.clear();
self.queued_sends_paused_after_usage_limit = false;
self.submit_pending_steers_after_interrupt = false;
}
@@ -139,6 +143,7 @@ mod tests {
.rejected_steers_queue
.push_back(UserMessage::from("rejected"));
state.user_turn_pending_start = true;
state.queued_sends_paused_after_usage_limit = true;
state.submit_pending_steers_after_interrupt = true;
state.clear();
@@ -149,6 +154,7 @@ mod tests {
assert!(state.rejected_steers_queue.is_empty());
assert!(state.rejected_steer_history_records.is_empty());
assert!(state.pending_steers.is_empty());
assert!(!state.queued_sends_paused_after_usage_limit);
assert!(!state.submit_pending_steers_after_interrupt);
}
}

View File

@@ -271,6 +271,9 @@ impl ChatWidget {
.input_queue
.queued_user_message_history_records
.clone(),
queued_sends_paused_after_usage_limit: self
.input_queue
.queued_sends_paused_after_usage_limit,
user_turn_pending_start: self.input_queue.user_turn_pending_start,
current_collaboration_mode: self.current_collaboration_mode.clone(),
active_collaboration_mask: self.active_collaboration_mask.clone(),
@@ -346,6 +349,8 @@ impl ChatWidget {
self.input_queue.queued_user_messages = input_state.queued_user_messages;
self.input_queue.queued_user_message_history_records =
input_state.queued_user_message_history_records;
self.input_queue.queued_sends_paused_after_usage_limit =
input_state.queued_sends_paused_after_usage_limit;
self.input_queue.queued_user_message_history_records.resize(
self.input_queue.queued_user_messages.len(),
UserMessageHistoryRecord::UserMessageText,

View File

@@ -99,6 +99,14 @@ impl ChatWidget {
_ => {}
}
if key_event.kind == KeyEventKind::Press
&& matches!(key_event.code, KeyCode::Enter)
&& self.should_prompt_to_resume_queued_sends()
{
self.show_resume_queued_sends_prompt();
return;
}
if key_event.kind == KeyEventKind::Press
&& self.chat_keymap.edit_queued_message.is_pressed(key_event)
&& self.has_queued_follow_up_messages()

View File

@@ -0,0 +1,59 @@
//! Confirmation flow for queued follow-up sends paused by usage limits.
use super::*;
impl ChatWidget {
pub(super) fn pause_queued_sends_after_limit_error(&mut self) {
if self.has_queued_follow_up_messages() {
self.input_queue.queued_sends_paused_after_usage_limit = true;
self.refresh_pending_input_preview();
}
}
pub(super) fn should_prompt_to_resume_queued_sends(&self) -> bool {
self.input_queue.queued_sends_paused_after_usage_limit
&& self.has_queued_follow_up_messages()
&& !self.is_user_turn_pending_or_running()
&& self.bottom_pane.composer_is_empty()
&& self.bottom_pane.no_modal_or_popup_active()
}
pub(super) fn show_resume_queued_sends_prompt(&mut self) {
self.show_selection_view(SelectionViewParams {
title: Some("Resume queued sends?".to_string()),
subtitle: Some(
"Queued inputs were paused after a usage limit was reached.".to_string(),
),
footer_hint: Some(standard_popup_hint_line()),
initial_selected_idx: Some(0),
items: vec![
SelectionItem {
name: "Keep paused".to_string(),
description: Some(
"Leave queued sends paused until you review them later.".to_string(),
),
dismiss_on_select: true,
..Default::default()
},
SelectionItem {
name: "Resume queued sends".to_string(),
description: Some("Continue sending queued inputs.".to_string()),
actions: vec![Box::new(|tx| {
tx.send(AppEvent::ResumeQueuedSends);
})],
dismiss_on_select: true,
..Default::default()
},
],
..Default::default()
});
}
pub(crate) fn resume_queued_sends(&mut self) {
self.input_queue.queued_sends_paused_after_usage_limit = false;
let resumed_queue = self.maybe_send_next_queued_input();
if !resumed_queue && !self.has_queued_follow_up_messages() {
self.maybe_show_pending_rate_limit_prompt();
}
}
}

View File

@@ -257,6 +257,9 @@ impl ChatWidget {
}
pub(super) fn maybe_show_pending_rate_limit_prompt(&mut self) {
if self.input_queue.queued_sends_paused_after_usage_limit {
return;
}
if self.rate_limit_switch_prompt_hidden() {
self.rate_limit_switch_prompt = RateLimitSwitchPromptState::Idle;
return;

View File

@@ -0,0 +1,11 @@
---
source: tui/src/chatwidget/tests/status_and_layout.rs
expression: popup
---
Resume queued sends?
Queued inputs were paused after a usage limit was reached.
1. Keep paused Leave queued sends paused until you review them later.
2. Resume queued sends Continue sending queued inputs.
Press enter to confirm or esc to go back

View File

@@ -932,6 +932,7 @@ async fn restore_thread_input_state_syncs_sleep_inhibitor_state() {
rejected_steer_history_records: VecDeque::new(),
queued_user_messages: VecDeque::new(),
queued_user_message_history_records: VecDeque::new(),
queued_sends_paused_after_usage_limit: false,
user_turn_pending_start: false,
current_collaboration_mode: chat.current_collaboration_mode.clone(),
active_collaboration_mask: chat.active_collaboration_mask.clone(),

View File

@@ -358,6 +358,7 @@ async fn restore_thread_input_state_restores_pending_steers_without_downgrading_
rejected_steer_history_records: VecDeque::new(),
queued_user_messages,
queued_user_message_history_records: VecDeque::new(),
queued_sends_paused_after_usage_limit: true,
user_turn_pending_start: false,
current_collaboration_mode: chat.current_collaboration_mode.clone(),
active_collaboration_mask: chat.active_collaboration_mask.clone(),
@@ -383,6 +384,7 @@ async fn restore_thread_input_state_restores_pending_steers_without_downgrading_
chat.input_queue.pending_steers.front().unwrap().compare_key,
expected_compare_key
);
assert!(chat.input_queue.queued_sends_paused_after_usage_limit);
}
#[tokio::test]

View File

@@ -786,6 +786,76 @@ async fn rate_limit_switch_prompt_defers_until_task_complete() {
));
}
#[tokio::test]
async fn rate_limit_switch_prompt_stays_hidden_while_queued_sends_are_paused_after_usage_limit() {
let (mut chat, _, _) = make_chatwidget_manual(Some("gpt-5")).await;
chat.has_chatgpt_account = true;
chat.rate_limit_switch_prompt = RateLimitSwitchPromptState::Pending;
chat.input_queue
.queued_user_messages
.push_back(UserMessage::from("queued follow-up").into());
chat.input_queue
.queued_user_message_history_records
.push_back(UserMessageHistoryRecord::UserMessageText);
chat.input_queue.queued_sends_paused_after_usage_limit = true;
chat.maybe_show_pending_rate_limit_prompt();
assert!(matches!(
chat.rate_limit_switch_prompt,
RateLimitSwitchPromptState::Pending
));
assert!(chat.bottom_pane.no_modal_or_popup_active());
}
#[tokio::test]
async fn rate_limit_switch_prompt_shows_when_paused_queue_is_cleared() {
let (mut chat, _, _) = make_chatwidget_manual(Some("gpt-5")).await;
chat.has_chatgpt_account = true;
chat.rate_limit_switch_prompt = RateLimitSwitchPromptState::Pending;
chat.input_queue
.queued_user_messages
.push_back(UserMessage::from("queued follow-up").into());
chat.input_queue
.queued_user_message_history_records
.push_back(UserMessageHistoryRecord::UserMessageText);
chat.input_queue.queued_sends_paused_after_usage_limit = true;
chat.input_queue.queued_user_messages.clear();
chat.input_queue.queued_user_message_history_records.clear();
chat.refresh_pending_input_preview();
assert!(matches!(
chat.rate_limit_switch_prompt,
RateLimitSwitchPromptState::Shown
));
assert!(!chat.bottom_pane.no_modal_or_popup_active());
}
#[tokio::test]
async fn resuming_paused_queue_keeps_rate_limit_switch_prompt_deferred() {
let (mut chat, _, mut op_rx) = make_chatwidget_manual(Some("gpt-5")).await;
chat.thread_id = Some(ThreadId::new());
chat.has_chatgpt_account = true;
chat.rate_limit_switch_prompt = RateLimitSwitchPromptState::Pending;
chat.input_queue
.queued_user_messages
.push_back(UserMessage::from("queued follow-up").into());
chat.input_queue
.queued_user_message_history_records
.push_back(UserMessageHistoryRecord::UserMessageText);
chat.input_queue.queued_sends_paused_after_usage_limit = true;
chat.resume_queued_sends();
assert!(matches!(
chat.rate_limit_switch_prompt,
RateLimitSwitchPromptState::Pending
));
assert!(chat.bottom_pane.no_modal_or_popup_active());
assert!(matches!(next_submit_op(&mut op_rx), Op::UserTurn { .. }));
}
#[tokio::test]
async fn rate_limit_switch_prompt_popup_snapshot() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5")).await;
@@ -887,6 +957,67 @@ async fn usage_limit_error_remaps_stale_member_credits_state_to_usage_limit_prom
assert_eq!(event, AddCreditsNudgeCreditType::UsageLimit);
}
#[tokio::test]
async fn usage_limit_error_pauses_queued_follow_ups_until_confirmed() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.thread_id = Some(ThreadId::new());
chat.on_task_started();
chat.queue_user_message("first queued follow-up".into());
chat.queue_user_message("second queued follow-up".into());
chat.on_rate_limit_error(
RateLimitErrorKind::UsageLimit,
"Usage limit reached.".to_string(),
);
assert!(chat.input_queue.queued_sends_paused_after_usage_limit);
assert_eq!(
chat.queued_user_message_texts(),
vec![
"first queued follow-up".to_string(),
"second queued follow-up".to_string(),
]
);
assert_matches!(op_rx.try_recv(), Err(TryRecvError::Empty));
chat.resume_queued_sends();
let items = match next_submit_op(&mut op_rx) {
Op::UserTurn { items, .. } => items,
other => panic!("expected Op::UserTurn, got {other:?}"),
};
assert_eq!(
items,
vec![UserInput::Text {
text: "first queued follow-up".to_string(),
text_elements: Vec::new(),
}]
);
assert_eq!(
chat.queued_user_message_texts(),
vec!["second queued follow-up".to_string()]
);
}
#[tokio::test]
async fn paused_queued_sends_require_confirmation_before_resuming() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.thread_id = Some(ThreadId::new());
chat.input_queue
.queued_user_messages
.push_back(UserMessage::from("queued follow-up").into());
chat.input_queue
.queued_user_message_history_records
.push_back(UserMessageHistoryRecord::UserMessageText);
chat.input_queue.queued_sends_paused_after_usage_limit = true;
chat.refresh_pending_input_preview();
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
let popup = render_bottom_popup(&chat, /*width*/ 100);
assert_chatwidget_snapshot!("resume_queued_sends_prompt", popup);
}
#[tokio::test]
async fn workspace_owner_limit_states_do_not_prompt_for_owner_nudge() {
for (limit_type, error_kind) in [
@@ -966,6 +1097,69 @@ async fn missing_rate_limit_reached_type_does_not_prompt_or_refresh() {
assert_no_owner_nudge_or_rate_limit_refresh(&mut rx);
}
#[tokio::test]
async fn usage_limit_error_pauses_pending_steers_until_confirmed() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.thread_id = Some(ThreadId::new());
chat.on_task_started();
chat.input_queue
.pending_steers
.push_back(pending_steer("keep this from auto-sending"));
chat.refresh_pending_input_preview();
chat.on_rate_limit_error(
RateLimitErrorKind::UsageLimit,
"Usage limit reached.".to_string(),
);
assert!(chat.input_queue.pending_steers.is_empty());
assert_eq!(chat.input_queue.rejected_steers_queue.len(), 1);
assert_eq!(
chat.input_queue.rejected_steers_queue.front().unwrap().text,
"keep this from auto-sending"
);
assert!(chat.input_queue.queued_sends_paused_after_usage_limit);
chat.finalize_turn();
assert!(!chat.maybe_send_next_queued_input());
assert_no_submit_op(&mut op_rx);
chat.resume_queued_sends();
match next_submit_op(&mut op_rx) {
Op::UserTurn { items, .. } => {
assert_eq!(
items,
vec![UserInput::Text {
text: "keep this from auto-sending".to_string(),
text_elements: Vec::new(),
}]
);
}
other => panic!("expected resumed steer as Op::UserTurn, got {other:?}"),
}
}
#[tokio::test]
async fn generic_rate_limit_error_does_not_pause_pending_steers() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.thread_id = Some(ThreadId::new());
chat.on_task_started();
chat.input_queue
.pending_steers
.push_back(pending_steer("keep this in flight"));
chat.refresh_pending_input_preview();
chat.on_rate_limit_error(
RateLimitErrorKind::Generic,
"Rate limit reached.".to_string(),
);
assert_eq!(chat.input_queue.pending_steers.len(), 1);
assert!(chat.input_queue.rejected_steers_queue.is_empty());
assert!(!chat.input_queue.queued_sends_paused_after_usage_limit);
}
#[tokio::test]
async fn workspace_owner_nudge_default_no_dismisses_without_sending() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;

View File

@@ -287,6 +287,17 @@ impl ChatWidget {
) && self.enqueue_rejected_steer()
}
fn defer_pending_steers_after_usage_limit(&mut self) {
while let Some(pending_steer) = self.input_queue.pending_steers.pop_front() {
self.input_queue
.rejected_steers_queue
.push_back(pending_steer.user_message);
self.input_queue
.rejected_steer_history_records
.push_back(pending_steer.history_record);
}
}
/// Finalize any active exec as failed and stop/clear agent-turn UI state.
///
/// This does not clear MCP startup tracking, because MCP startup can overlap with turn cleanup
@@ -360,6 +371,11 @@ impl ChatWidget {
}
pub(super) fn on_rate_limit_error(&mut self, error_kind: RateLimitErrorKind, message: String) {
if matches!(error_kind, RateLimitErrorKind::UsageLimit) {
self.defer_pending_steers_after_usage_limit();
self.pause_queued_sends_after_limit_error();
}
let rate_limit_reached_type = self.codex_rate_limit_reached_type.map(|kind| {
if matches!(error_kind, RateLimitErrorKind::UsageLimit) {
match kind {

View File

@@ -125,6 +125,7 @@ pub(crate) struct ThreadInputState {
pub(super) rejected_steer_history_records: VecDeque<UserMessageHistoryRecord>,
pub(super) queued_user_messages: VecDeque<QueuedUserMessage>,
pub(super) queued_user_message_history_records: VecDeque<UserMessageHistoryRecord>,
pub(super) queued_sends_paused_after_usage_limit: bool,
pub(super) user_turn_pending_start: bool,
pub(super) current_collaboration_mode: CollaborationMode,
pub(super) active_collaboration_mask: Option<CollaborationModeMask>,

View File

@@ -5,6 +5,7 @@ import asyncio
import datetime as dt
import json
import sys
from dataclasses import dataclass
from typing import Any
import websockets
@@ -19,6 +20,8 @@ FUNCTION_NAME = "shell_command"
FUNCTION_ARGS_JSON = json.dumps({"command": "echo websocket"}, separators=(",", ":"))
ASSISTANT_TEXT = "done"
SCENARIO_NORMAL = "normal"
SCENARIO_USAGE_LIMIT = "usage-limit"
def _utc_iso() -> str:
@@ -66,6 +69,51 @@ def _event_assistant_message(message_id: str, text: str) -> dict[str, Any]:
}
def _event_usage_limit_error() -> dict[str, Any]:
return {
"type": "error",
"status": 429,
"error": {
"type": "usage_limit_reached",
"message": "The usage limit has been reached",
"plan_type": "pro",
"resets_at": 1704067242,
"resets_in_seconds": 1234,
},
"headers": {
"x-codex-primary-used-percent": "100.0",
"x-codex-secondary-used-percent": "87.5",
"x-codex-primary-over-secondary-limit-percent": "95.0",
"x-codex-primary-window-minutes": "15",
"x-codex-secondary-window-minutes": "60",
},
}
def _event_approaching_rate_limits() -> dict[str, Any]:
return {
"type": "codex.rate_limits",
"plan_type": "plus",
"rate_limits": {
"allowed": True,
"limit_reached": False,
"primary": {
"used_percent": 95,
"window_minutes": 15,
"reset_at": 1704067242,
},
"secondary": {
"used_percent": 87.5,
"window_minutes": 60,
"reset_at": 1704067242,
},
},
"code_review_rate_limits": None,
"credits": None,
"promo": None,
}
def _dump_json(payload: Any) -> str:
return json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
@@ -75,9 +123,21 @@ def _print_request(prefix: str, payload: Any) -> None:
sys.stdout.write(f"{prefix} {_utc_iso()}\n{pretty}\n")
sys.stdout.flush()
@dataclass
class ScenarioState:
scenario: str
usage_limit_delay_seconds: float
usage_limit_approach_first: bool
approaching_rate_limits_emitted: bool = False
usage_limit_emitted: bool = False
success_count: int = 0
async def _handle_connection(
websocket: Any,
*,
scenario_state: ScenarioState,
expected_path: str = PATH,
) -> None:
# websockets v15 exposes the request path here.
@@ -109,28 +169,77 @@ async def _handle_connection(
sys.stdout.write(f"[conn] {_utc_iso()} send {_dump_json(ev)}\n")
await websocket.send(_dump_json(ev))
# Request 1: provoke a function call (mirrors `codex-rs/core/tests/suite/agent_websocket.rs`).
await recv_json("req1")
await send_event(_event_response_created("resp-1"))
await send_event(_event_function_call(CALL_ID, FUNCTION_NAME, FUNCTION_ARGS_JSON))
await send_event(_event_response_done())
if scenario_state.scenario == SCENARIO_NORMAL:
# Request 1: provoke a function call (mirrors `codex-rs/core/tests/suite/agent_websocket.rs`).
await recv_json("req1")
await send_event(_event_response_created("resp-1"))
await send_event(_event_function_call(CALL_ID, FUNCTION_NAME, FUNCTION_ARGS_JSON))
await send_event(_event_response_done())
# Request 2: expect appended tool output; send final assistant message.
await recv_json("req2")
await send_event(_event_response_created("resp-2"))
await send_event(_event_assistant_message("msg-1", ASSISTANT_TEXT))
await send_event(_event_response_completed("resp-2"))
# Request 2: expect appended tool output; send final assistant message.
await recv_json("req2")
await send_event(_event_response_created("resp-2"))
await send_event(_event_assistant_message("msg-1", ASSISTANT_TEXT))
await send_event(_event_response_completed("resp-2"))
else:
request_count = 0
while True:
request_count += 1
payload = await recv_json(f"req{request_count}")
if payload.get("generate") is False:
await send_event(_event_response_created(f"resp-warmup-{request_count}"))
await send_event(_event_response_completed(f"resp-warmup-{request_count}"))
continue
if (
scenario_state.usage_limit_approach_first
and not scenario_state.approaching_rate_limits_emitted
):
scenario_state.approaching_rate_limits_emitted = True
await send_event(_event_approaching_rate_limits())
await send_event(_event_response_created("resp-approaching-limits"))
await send_event(_event_assistant_message("msg-approaching-limits", ASSISTANT_TEXT))
await send_event(_event_response_completed("resp-approaching-limits"))
continue
if not scenario_state.usage_limit_emitted:
scenario_state.usage_limit_emitted = True
sys.stdout.write(
f"[conn] {_utc_iso()} waiting {scenario_state.usage_limit_delay_seconds:.1f}s before usage limit\n"
)
sys.stdout.flush()
await asyncio.sleep(scenario_state.usage_limit_delay_seconds)
await send_event(_event_usage_limit_error())
continue
scenario_state.success_count += 1
response_id = f"resp-after-limit-{scenario_state.success_count}"
message_id = f"msg-after-limit-{scenario_state.success_count}"
await send_event(_event_response_created(response_id))
await send_event(_event_assistant_message(message_id, ASSISTANT_TEXT))
await send_event(_event_response_completed(response_id))
sys.stdout.write(f"[conn] {_utc_iso()} closing\n")
sys.stdout.flush()
await websocket.close()
async def _serve(port: int) -> int:
async def _serve(
port: int,
scenario: str,
usage_limit_delay_seconds: float,
usage_limit_approach_first: bool,
) -> int:
scenario_state = ScenarioState(
scenario=scenario,
usage_limit_delay_seconds=usage_limit_delay_seconds,
usage_limit_approach_first=usage_limit_approach_first,
)
async def handler(ws: Any) -> None:
try:
await _handle_connection(ws, expected_path=PATH)
except websockets.exceptions.ConnectionClosedOK:
await _handle_connection(ws, scenario_state=scenario_state, expected_path=PATH)
except websockets.exceptions.ConnectionClosed:
return
try:
@@ -142,14 +251,24 @@ async def _serve(port: int) -> int:
ws_uri = f"ws://{HOST}:{bound_port}"
sys.stdout.write("[server] mock Responses WebSocket server running\n")
if scenario == SCENARIO_USAGE_LIMIT:
if usage_limit_approach_first:
sys.stdout.write(
f"[server] scenario={SCENARIO_USAGE_LIMIT} first real turn approaches limits; next real turn errors after {usage_limit_delay_seconds:.1f}s\n"
)
else:
sys.stdout.write(
f"[server] scenario={SCENARIO_USAGE_LIMIT} first real turn errors after {usage_limit_delay_seconds:.1f}s\n"
)
sys.stdout.write(f"""Add this to your config.toml:
[model_providers.localapi_ws]
base_url = "{ws_uri}/v1"
name = "localapi_ws"
wire_api = "responses_websocket"
wire_api = "responses"
env_key = "OPENAI_API_KEY_STAGING"
supports_websockets = true
[profiles.localapi_ws]
model = "gpt-5.2"
@@ -159,6 +278,24 @@ model_reasoning_effort = "medium"
start codex with `codex --profile localapi_ws`
""")
if scenario == SCENARIO_USAGE_LIMIT:
if usage_limit_approach_first:
sys.stdout.write(
"""To exercise the approaching-limit then paused-queue flow:
1. Submit one prompt and dismiss the approaching-rate-limits prompt.
2. Submit a second prompt.
3. Before the delayed usage-limit error arrives, press Enter for a steer and Tab for a queued follow-up.
4. After the error, confirm both queued sections stay paused until you resume them.
"""
)
else:
sys.stdout.write(
"""To exercise the paused queue state:
1. Submit one prompt.
2. Before the delayed usage-limit error arrives, type follow-ups and press Tab to queue them.
3. After the error, confirm queued sends stay paused until you press Enter and choose `Resume queued sends`.
"""
)
sys.stdout.flush()
try:
@@ -183,10 +320,43 @@ def main() -> int:
default=DEFAULT_PORT,
help=f"Bind port (default: {DEFAULT_PORT}; use 0 for random free port).",
)
parser.add_argument(
"--scenario",
choices=[SCENARIO_NORMAL, SCENARIO_USAGE_LIMIT],
default=SCENARIO_NORMAL,
help=(
"Behavior to emulate (default: normal).\n"
"Use `usage-limit` to make the first real turn hit a usage limit."
),
)
parser.add_argument(
"--usage-limit-delay-seconds",
type=float,
default=15.0,
help=(
"Delay before sending the usage-limit error in the `usage-limit` scenario "
"(default: 15.0)."
),
)
parser.add_argument(
"--usage-limit-approach-first",
action="store_true",
help=(
"In the `usage-limit` scenario, complete one successful near-limit turn before "
"the delayed hard-limit turn."
),
)
args = parser.parse_args()
try:
return asyncio.run(_serve(args.port))
return asyncio.run(
_serve(
args.port,
args.scenario,
args.usage_limit_delay_seconds,
args.usage_limit_approach_first,
)
)
except KeyboardInterrupt:
return 0