Compare commits

...

18 Commits

Author SHA1 Message Date
Eric Traut
302f30eb80 codex: queue user input during shell commands (#18290) 2026-04-17 12:53:49 -07:00
Eric Traut
c13f09c9ee codex: address PR review feedback (#18290) 2026-04-17 12:17:31 -07:00
Eric Traut
0ca2f52f44 Merge branch 'main' into etraut/tui-queue-user-cmd-followup 2026-04-17 11:57:19 -07:00
Eric Traut
67fd4bb22f codex: address PR review feedback (#18290) 2026-04-17 09:08:59 -07:00
Eric Traut
02fee5130e codex: address PR review feedback (#18290) 2026-04-17 08:48:03 -07:00
Eric Traut
d1542245b6 codex: address PR review feedback (#18290) 2026-04-17 02:31:25 -07:00
Eric Traut
501ddd5d87 codex: address PR review feedback (#18290) 2026-04-17 02:08:55 -07:00
Eric Traut
e925c8d801 codex: address PR review feedback (#18290) 2026-04-17 01:53:08 -07:00
Eric Traut
a582474d40 tui: simplify pending steer retry handling
Fixes #17954
2026-04-17 01:35:36 -07:00
Eric Traut
36ce90620a codex: address PR review feedback (#18290) 2026-04-17 01:23:49 -07:00
Eric Traut
9db9732877 codex: address PR review feedback (#18290) 2026-04-17 00:59:49 -07:00
Eric Traut
6ff6061177 codex: address PR review feedback (#18290) 2026-04-17 00:07:55 -07:00
Eric Traut
5588ecfec5 codex: address PR review feedback (#18290) 2026-04-16 23:47:13 -07:00
Eric Traut
2e64b866a9 Merge branch 'main' into etraut/tui-queue-user-cmd-followup 2026-04-16 23:41:22 -07:00
Eric Traut
4791ac92ce codex: address PR review feedback (#18290) 2026-04-16 23:26:59 -07:00
Eric Traut
62a3a0676f codex: address PR review feedback (#18290) 2026-04-16 23:11:46 -07:00
Eric Traut
a4c8483955 codex: address PR review feedback (#18290) 2026-04-16 22:52:17 -07:00
Eric Traut
bda2d87fc7 tui: queue input during standalone shell commands
Fixes #17954
2026-04-16 22:35:34 -07:00
6 changed files with 536 additions and 98 deletions

View File

@@ -891,6 +891,9 @@ pub(crate) struct ChatWidget {
// The bottom pane shows these above queued drafts until core records the
// corresponding user message item.
pending_steers: VecDeque<PendingSteer>,
// Active turn captured with restored input state. Replayed terminal/user-message events can
// only mutate restored queues when they match this id.
restored_active_turn_id: Option<String>,
// When set, the next interrupt should resubmit all pending steers as one
// fresh user turn instead of restoring them into the composer.
submit_pending_steers_after_interrupt: bool,
@@ -1066,9 +1069,10 @@ impl ThreadComposerState {
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ThreadInputState {
composer: Option<ThreadComposerState>,
pending_steers: VecDeque<UserMessage>,
pending_steers: VecDeque<PendingSteer>,
rejected_steers_queue: VecDeque<UserMessage>,
queued_user_messages: VecDeque<UserMessage>,
active_turn_id: Option<String>,
current_collaboration_mode: CollaborationMode,
active_collaboration_mask: Option<CollaborationModeMask>,
task_running: bool,
@@ -1101,9 +1105,11 @@ impl From<&str> for UserMessage {
}
}
#[derive(Clone, Debug, PartialEq)]
struct PendingSteer {
user_message: UserMessage,
compare_key: PendingSteerCompareKey,
turn_id: Option<String>,
}
pub(crate) fn create_initial_user_message(
@@ -1980,6 +1986,7 @@ impl ChatWidget {
self.session_network_proxy = event.network_proxy.clone();
self.thread_id = Some(event.session_id);
self.last_turn_id = None;
self.restored_active_turn_id = None;
self.thread_name = event.thread_name.clone();
self.forked_from = event.forked_from_id;
self.current_rollout_path = event.rollout_path.clone();
@@ -2429,10 +2436,14 @@ impl ChatWidget {
self.unified_exec_wait_streak = None;
self.request_redraw();
let had_pending_steers = !self.pending_steers.is_empty();
let had_unacknowledged_pending_steers =
!from_replay && self.queue_unacknowledged_pending_steers();
self.refresh_pending_input_preview();
if !from_replay && !self.has_queued_follow_up_messages() && !had_pending_steers {
if !from_replay
&& !self.has_queued_follow_up_messages()
&& !had_unacknowledged_pending_steers
{
self.maybe_prompt_plan_implementation();
}
// Keep this flag for replayed completion events so a subsequent live TurnComplete can
@@ -2494,6 +2505,102 @@ impl ChatWidget {
!self.rejected_steers_queue.is_empty() || !self.queued_user_messages.is_empty()
}
fn queue_unacknowledged_pending_steers(&mut self) -> bool {
if self.pending_steers.is_empty() {
return false;
}
self.rejected_steers_queue.extend(
self.pending_steers
.drain(..)
.map(|pending| pending.user_message),
);
true
}
fn queue_unacknowledged_pending_steers_for_turn(&mut self, turn_id: &str) -> bool {
let mut queued = false;
let pending_steers = std::mem::take(&mut self.pending_steers);
for pending in pending_steers {
if pending
.turn_id
.as_deref()
.is_none_or(|pending_turn_id| pending_turn_id == turn_id)
{
self.rejected_steers_queue.push_back(pending.user_message);
queued = true;
} else {
self.pending_steers.push_back(pending);
}
}
queued
}
fn replayed_turn_matches_restored_active_turn(&self, turn_id: &str) -> bool {
self.restored_active_turn_id.as_deref() == Some(turn_id)
}
fn replayed_turn_is_stale_for_restored_input(&self, turn_id: &str) -> bool {
self.restored_active_turn_id
.as_deref()
.is_some_and(|active_turn_id| active_turn_id != turn_id)
}
fn clear_restored_active_turn_if_matches(&mut self, turn_id: &str) {
if self.replayed_turn_matches_restored_active_turn(turn_id) {
self.restored_active_turn_id = None;
}
}
fn restore_pending_messages_after_replayed_incomplete_turn(&mut self) {
self.finalize_turn();
if let Some(combined) = self.drain_pending_messages_for_restore() {
self.restore_user_message_to_composer(combined);
}
self.refresh_pending_input_preview();
self.request_redraw();
}
fn on_user_message_event_reconciling_pending_steer(
&mut self,
event: UserMessageEvent,
compare_key: PendingSteerCompareKey,
turn_id: Option<&str>,
) {
let rendered = Self::rendered_user_message_event_from_event(&event);
if self.pending_steers.front().is_some_and(|pending| {
pending.compare_key == compare_key
&& turn_id.is_none_or(|turn_id| {
pending
.turn_id
.as_deref()
.is_none_or(|pending_turn_id| pending_turn_id == turn_id)
})
}) {
if let Some(pending) = self.pending_steers.pop_front() {
self.refresh_pending_input_preview();
let pending_event = UserMessageEvent {
message: pending.user_message.text,
images: Some(pending.user_message.remote_image_urls),
local_images: pending
.user_message
.local_images
.into_iter()
.map(|image| image.path)
.collect(),
text_elements: pending.user_message.text_elements,
};
self.on_user_message_event(pending_event);
} else if self.last_rendered_user_message_event.as_ref() != Some(&rendered) {
tracing::warn!(
"pending steer matched compare key but queue was empty when rendering committed user message"
);
self.on_user_message_event(event);
}
} else if self.last_rendered_user_message_event.as_ref() != Some(&rendered) {
self.on_user_message_event(event);
}
}
fn pop_next_queued_user_message(&mut self) -> Option<UserMessage> {
if self.rejected_steers_queue.is_empty() {
self.queued_user_messages.pop_front()
@@ -2816,6 +2923,7 @@ impl ChatWidget {
fn on_server_overloaded_error(&mut self, message: String) {
self.submit_pending_steers_after_interrupt = false;
self.queue_unacknowledged_pending_steers();
self.finalize_turn();
let message = if message.trim().is_empty() {
@@ -2831,6 +2939,7 @@ impl ChatWidget {
fn on_error(&mut self, message: String) {
self.submit_pending_steers_after_interrupt = false;
self.queue_unacknowledged_pending_steers();
self.finalize_turn();
self.add_to_history(history_cell::new_error_event(message));
self.request_redraw();
@@ -3219,13 +3328,13 @@ impl ChatWidget {
};
Some(ThreadInputState {
composer: composer.has_content().then_some(composer),
pending_steers: self
.pending_steers
.iter()
.map(|pending| pending.user_message.clone())
.collect(),
pending_steers: self.pending_steers.clone(),
rejected_steers_queue: self.rejected_steers_queue.clone(),
queued_user_messages: self.queued_user_messages.clone(),
active_turn_id: self
.last_turn_id
.clone()
.filter(|_| self.agent_turn_running || self.bottom_pane.is_task_running()),
current_collaboration_mode: self.current_collaboration_mode.clone(),
active_collaboration_mask: self.active_collaboration_mask.clone(),
task_running: self.bottom_pane.is_task_running(),
@@ -3239,6 +3348,7 @@ impl ChatWidget {
self.current_collaboration_mode = input_state.current_collaboration_mode;
self.active_collaboration_mask = input_state.active_collaboration_mask;
self.agent_turn_running = input_state.agent_turn_running;
self.restored_active_turn_id = input_state.active_turn_id.clone();
self.update_collaboration_mode_indicator();
self.refresh_model_dependent_surfaces();
if let Some(composer) = input_state.composer {
@@ -3266,22 +3376,12 @@ impl ChatWidget {
);
self.bottom_pane.set_composer_pending_pastes(Vec::new());
}
self.pending_steers = input_state
.pending_steers
.into_iter()
.map(|user_message| PendingSteer {
compare_key: PendingSteerCompareKey {
message: user_message.text.clone(),
image_count: user_message.local_images.len()
+ user_message.remote_image_urls.len(),
},
user_message,
})
.collect();
self.pending_steers = input_state.pending_steers;
self.rejected_steers_queue = input_state.rejected_steers_queue;
self.queued_user_messages = input_state.queued_user_messages;
} else {
self.agent_turn_running = false;
self.restored_active_turn_id = None;
self.pending_steers.clear();
self.rejected_steers_queue.clear();
self.set_remote_image_urls(Vec::new());
@@ -4897,6 +4997,7 @@ impl ChatWidget {
queued_user_messages: VecDeque::new(),
rejected_steers_queue: VecDeque::new(),
pending_steers: VecDeque::new(),
restored_active_turn_id: None,
submit_pending_steers_after_interrupt: false,
queued_message_edit_binding,
show_welcome_banner: is_first_run,
@@ -5107,8 +5208,8 @@ impl ChatWidget {
{
return;
}
let should_submit_now =
self.is_session_configured() && !self.is_plan_streaming_in_tui();
let should_submit_now = self.is_session_configured()
&& !self.should_queue_submitted_user_message(&user_message.text);
if should_submit_now {
// Submitted is emitted when user submits.
// Reset any reasoning header only when we are actually submitting a turn.
@@ -5335,6 +5436,19 @@ impl ChatWidget {
}
}
fn should_queue_submitted_user_message(&self, text: &str) -> bool {
self.is_plan_streaming_in_tui()
|| (self.only_user_shell_commands_running() && !text.trim_start().starts_with('!'))
}
fn only_user_shell_commands_running(&self) -> bool {
!self.running_commands.is_empty()
&& self
.running_commands
.values()
.all(|command| command.source == ExecCommandSource::UserShell)
}
fn submit_user_message(&mut self, user_message: UserMessage) {
if !self.is_session_configured() {
tracing::warn!("cannot submit user message before session is configured; queueing");
@@ -5352,6 +5466,16 @@ impl ChatWidget {
if text.is_empty() && local_images.is_empty() && remote_image_urls.is_empty() {
return;
}
if self.should_queue_submitted_user_message(&text) {
self.queue_user_message(UserMessage {
text,
local_images,
remote_image_urls,
text_elements,
mention_bindings,
});
return;
}
if (!local_images.is_empty() || !remote_image_urls.is_empty())
&& !self.current_model_supports_images()
{
@@ -5532,6 +5656,7 @@ impl ChatWidget {
mention_bindings: mention_bindings.clone(),
},
compare_key: Self::pending_steer_compare_key_from_items(&items),
turn_id: self.last_turn_id.clone(),
});
let personality = self
.config
@@ -5663,6 +5788,9 @@ impl ChatWidget {
duration_ms,
} = turn;
if matches!(status, TurnStatus::InProgress) {
self.restored_active_turn_id
.get_or_insert_with(|| turn_id.clone());
self.last_turn_id = Some(turn_id.clone());
self.last_non_retry_error = None;
self.on_task_started();
}
@@ -5723,41 +5851,15 @@ impl ChatWidget {
else {
unreachable!("user message item should convert to a user message event");
};
if from_replay {
let compare_key = Self::pending_steer_compare_key_from_items(&user_message.content);
if from_replay && !self.replayed_turn_matches_restored_active_turn(&turn_id) {
self.on_user_message_event(event);
} else {
let rendered = Self::rendered_user_message_event_from_event(&event);
let compare_key =
Self::pending_steer_compare_key_from_items(&user_message.content);
if self
.pending_steers
.front()
.is_some_and(|pending| pending.compare_key == compare_key)
{
if let Some(pending) = self.pending_steers.pop_front() {
self.refresh_pending_input_preview();
let pending_event = UserMessageEvent {
message: pending.user_message.text,
images: Some(pending.user_message.remote_image_urls),
local_images: pending
.user_message
.local_images
.into_iter()
.map(|image| image.path)
.collect(),
text_elements: pending.user_message.text_elements,
};
self.on_user_message_event(pending_event);
} else if self.last_rendered_user_message_event.as_ref() != Some(&rendered)
{
tracing::warn!(
"pending steer matched compare key but queue was empty when rendering committed user message"
);
self.on_user_message_event(event);
}
} else if self.last_rendered_user_message_event.as_ref() != Some(&rendered) {
self.on_user_message_event(event);
}
self.on_user_message_event_reconciling_pending_steer(
event,
compare_key,
Some(&turn_id),
);
}
}
ThreadItem::AgentMessage {
@@ -6100,7 +6202,11 @@ impl ChatWidget {
}
}
ServerNotification::TurnStarted(notification) => {
self.last_turn_id = Some(notification.turn.id);
let turn_id = notification.turn.id;
if from_replay && self.replayed_turn_is_stale_for_restored_input(&turn_id) {
return;
}
self.last_turn_id = Some(turn_id);
self.last_non_retry_error = None;
if !matches!(replay_kind, Some(ReplayKind::ResumeInitialMessages)) {
self.on_task_started();
@@ -6179,6 +6285,28 @@ impl ChatWidget {
notification.error.additional_details,
);
}
} else if from_replay {
self.last_non_retry_error = None;
if self.replayed_turn_is_stale_for_restored_input(&notification.turn_id) {
return;
}
if matches!(
notification.error.codex_error_info,
Some(AppServerCodexErrorInfo::ActiveTurnNotSteerable { .. })
) {
if self.pending_steers.front().is_some_and(|pending| {
pending.turn_id.as_deref().is_none_or(|pending_turn_id| {
pending_turn_id == notification.turn_id
})
}) {
self.enqueue_rejected_steer();
}
} else {
self.queue_unacknowledged_pending_steers_for_turn(&notification.turn_id);
self.finalize_turn();
self.request_redraw();
self.clear_restored_active_turn_if_matches(&notification.turn_id);
}
} else {
self.last_non_retry_error = Some((
notification.turn_id.clone(),
@@ -6359,16 +6487,50 @@ impl ChatWidget {
notification: TurnCompletedNotification,
replay_kind: Option<ReplayKind>,
) {
let from_replay = replay_kind.is_some();
let turn_id = notification.turn.id.clone();
let replayed_turn_matches_restored_active_turn =
from_replay && self.replayed_turn_matches_restored_active_turn(&turn_id);
if from_replay && self.replayed_turn_is_stale_for_restored_input(&turn_id) {
return;
}
match notification.turn.status {
TurnStatus::Completed => {
self.last_non_retry_error = None;
self.on_task_complete(/*last_agent_message*/ None, replay_kind.is_some())
if replayed_turn_matches_restored_active_turn {
self.queue_unacknowledged_pending_steers_for_turn(&turn_id);
}
self.on_task_complete(/*last_agent_message*/ None, from_replay);
self.clear_restored_active_turn_if_matches(&turn_id);
}
TurnStatus::Interrupted => {
self.last_non_retry_error = None;
self.on_interrupted_turn(TurnAbortReason::Interrupted);
if from_replay {
if replayed_turn_matches_restored_active_turn {
self.restore_pending_messages_after_replayed_incomplete_turn();
self.clear_restored_active_turn_if_matches(&turn_id);
} else {
self.finalize_turn();
self.request_redraw();
}
} else {
self.on_interrupted_turn(TurnAbortReason::Interrupted);
}
}
TurnStatus::Failed => {
if from_replay {
if replayed_turn_matches_restored_active_turn {
self.last_non_retry_error = None;
self.restore_pending_messages_after_replayed_incomplete_turn();
self.clear_restored_active_turn_if_matches(&turn_id);
} else {
self.last_non_retry_error = None;
self.finalize_turn();
self.request_redraw();
}
return;
}
if let Some(error) = notification.turn.error {
if self.last_non_retry_error.as_ref()
== Some(&(notification.turn.id.clone(), error.message.clone()))
@@ -6697,7 +6859,15 @@ impl ChatWidget {
message,
codex_error_info,
}) => {
if codex_error_info
if from_replay {
if !matches!(
codex_error_info,
Some(CoreCodexErrorInfo::ActiveTurnNotSteerable { .. })
) {
self.finalize_turn();
self.request_redraw();
}
} else if codex_error_info
.as_ref()
.is_some_and(|info| self.handle_steer_rejected_error(info))
{
@@ -6870,37 +7040,12 @@ impl ChatWidget {
let EventMsg::UserMessage(event) = item.as_legacy_event() else {
unreachable!("user message item should convert to a legacy user message");
};
let rendered = Self::rendered_user_message_event_from_event(&event);
let compare_key = Self::pending_steer_compare_key_from_item(item);
if self
.pending_steers
.front()
.is_some_and(|pending| pending.compare_key == compare_key)
{
if let Some(pending) = self.pending_steers.pop_front() {
self.refresh_pending_input_preview();
let pending_event = UserMessageEvent {
message: pending.user_message.text,
images: Some(pending.user_message.remote_image_urls),
local_images: pending
.user_message
.local_images
.into_iter()
.map(|image| image.path)
.collect(),
text_elements: pending.user_message.text_elements,
};
self.on_user_message_event(pending_event);
} else if self.last_rendered_user_message_event.as_ref() != Some(&rendered)
{
tracing::warn!(
"pending steer matched compare key but queue was empty when rendering committed user message"
);
self.on_user_message_event(event);
}
} else if self.last_rendered_user_message_event.as_ref() != Some(&rendered) {
self.on_user_message_event(event);
}
let compare_key = Self::pending_steer_compare_key_from_items(&item.content);
self.on_user_message_event_reconciling_pending_steer(
event,
compare_key,
/*turn_id*/ None,
);
}
if let codex_protocol::items::TurnItem::Plan(plan_item) = &item {
self.on_plan_item_completed(plan_item.text.clone());

View File

@@ -132,13 +132,6 @@ impl ChatWidget {
}
}
#[cfg(test)]
pub(super) fn pending_steer_compare_key_from_item(
item: &codex_protocol::items::UserMessageItem,
) -> PendingSteerCompareKey {
Self::pending_steer_compare_key_from_items(&item.content)
}
#[cfg(test)]
pub(super) fn rendered_user_message_event_from_inputs(
items: &[UserInput],

View File

@@ -797,6 +797,7 @@ async fn restore_thread_input_state_syncs_sleep_inhibitor_state() {
pending_steers: VecDeque::new(),
rejected_steers_queue: VecDeque::new(),
queued_user_messages: VecDeque::new(),
active_turn_id: Some("turn-1".to_string()),
current_collaboration_mode: chat.current_collaboration_mode.clone(),
active_collaboration_mask: chat.active_collaboration_mask.clone(),
task_running: true,

View File

@@ -1027,6 +1027,74 @@ async fn bang_shell_command_submits_run_user_shell_command_in_app_server_tui() {
assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
}
#[tokio::test]
async fn user_message_during_user_shell_command_is_queued_not_steered() {
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.thread_id = Some(ThreadId::new());
chat.handle_codex_event(Event {
id: "turn-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
}),
});
let begin_sleep = begin_exec_with_source(
&mut chat,
"user-shell-sleep",
"sleep 10",
ExecCommandSource::UserShell,
);
chat.bottom_pane
.set_composer_text("please continue".to_string(), Vec::new(), Vec::new());
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
assert_eq!(chat.pending_steers.len(), 0);
assert_eq!(
chat.queued_user_messages
.iter()
.map(|message| message.text.as_str())
.collect::<Vec<_>>(),
vec!["please continue"],
);
assert_matches!(op_rx.try_recv(), Err(TryRecvError::Empty));
chat.handle_key_event(KeyEvent::new(KeyCode::Esc, KeyModifiers::NONE));
let mut saw_interrupt = false;
while let Ok(event) = rx.try_recv() {
if matches!(event, AppEvent::CodexOp(Op::Interrupt)) {
saw_interrupt = true;
break;
}
}
assert!(saw_interrupt, "expected Esc to send Op::Interrupt");
end_exec(&mut chat, begin_sleep, "", "", /*exit_code*/ 0);
chat.handle_codex_event(Event {
id: "turn-1-complete".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
});
match next_submit_op(&mut op_rx) {
Op::UserTurn { items, .. } => assert_eq!(
items,
vec![UserInput::Text {
text: "please continue".to_string(),
text_elements: Vec::new(),
}]
),
other => panic!("expected Op::UserTurn, got {other:?}"),
}
}
#[tokio::test]
async fn disabled_slash_command_while_task_running_snapshot() {
// Build a chat widget and simulate an active task

View File

@@ -255,6 +255,7 @@ pub(super) async fn make_chatwidget_manual(
queued_user_messages: VecDeque::new(),
rejected_steers_queue: VecDeque::new(),
pending_steers: VecDeque::new(),
restored_active_turn_id: None,
submit_pending_steers_after_interrupt: false,
queued_message_edit_binding: crate::key_hint::alt(KeyCode::Up),
suppress_session_configured_redraw: false,
@@ -573,6 +574,7 @@ pub(super) fn pending_steer(text: &str) -> PendingSteer {
message: text.to_string(),
image_count: 0,
},
turn_id: None,
}
}

View File

@@ -350,7 +350,9 @@ async fn review_restores_context_window_indicator() {
async fn restore_thread_input_state_restores_pending_steers_without_downgrading_them() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
let mut pending_steers = VecDeque::new();
pending_steers.push_back(UserMessage::from("pending steer"));
let mut pending_steer = pending_steer("pending steer");
pending_steer.turn_id = Some("turn-1".to_string());
pending_steers.push_back(pending_steer);
let mut rejected_steers_queue = VecDeque::new();
rejected_steers_queue.push_back(UserMessage::from("already rejected"));
let mut queued_user_messages = VecDeque::new();
@@ -361,6 +363,7 @@ async fn restore_thread_input_state_restores_pending_steers_without_downgrading_
pending_steers,
rejected_steers_queue,
queued_user_messages,
active_turn_id: Some("turn-1".to_string()),
current_collaboration_mode: chat.current_collaboration_mode.clone(),
active_collaboration_mask: chat.active_collaboration_mask.clone(),
task_running: false,
@@ -376,6 +379,10 @@ async fn restore_thread_input_state_restores_pending_steers_without_downgrading_
chat.pending_steers.front().unwrap().user_message.text,
"pending steer"
);
assert_eq!(
chat.pending_steers.front().unwrap().turn_id.as_deref(),
Some("turn-1")
);
}
#[tokio::test]
@@ -435,6 +442,228 @@ async fn steer_enter_uses_pending_steers_while_turn_is_running_without_streaming
assert!(lines_to_single_string(&inserted[0]).contains("queued while running"));
}
#[tokio::test]
async fn unacknowledged_pending_steer_is_retried_as_follow_up_when_turn_completes() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.thread_id = Some(ThreadId::new());
chat.on_task_started();
chat.bottom_pane
.set_composer_text("try again after tool".to_string(), Vec::new(), Vec::new());
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
match next_submit_op(&mut op_rx) {
Op::UserTurn { .. } => {}
other => panic!("expected running-turn steer submit, got {other:?}"),
}
assert_eq!(chat.pending_steers.len(), 1);
chat.on_task_complete(/*last_agent_message*/ None, /*from_replay*/ false);
match next_submit_op(&mut op_rx) {
Op::UserTurn { items, .. } => assert_eq!(
items,
vec![UserInput::Text {
text: "try again after tool".to_string(),
text_elements: Vec::new(),
}]
),
other => panic!("expected unacknowledged steer to be retried, got {other:?}"),
}
assert!(chat.pending_steers.is_empty());
assert!(chat.rejected_steers_queue.is_empty());
}
#[tokio::test]
async fn replayed_completion_does_not_retry_pending_steer() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.thread_id = Some(ThreadId::new());
chat.on_task_started();
chat.bottom_pane
.set_composer_text("already committed".to_string(), Vec::new(), Vec::new());
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
match next_submit_op(&mut op_rx) {
Op::UserTurn { .. } => {}
other => panic!("expected running-turn steer submit, got {other:?}"),
}
assert_eq!(chat.pending_steers.len(), 1);
chat.replay_thread_turns(
vec![AppServerTurn {
id: "older-turn".to_string(),
items: Vec::new(),
status: AppServerTurnStatus::Completed,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
}],
ReplayKind::ThreadSnapshot,
);
assert_eq!(chat.pending_steers.len(), 1);
assert_no_submit_op(&mut op_rx);
assert_eq!(chat.pending_steers.len(), 1);
assert!(chat.rejected_steers_queue.is_empty());
assert_no_submit_op(&mut op_rx);
}
#[tokio::test]
async fn replayed_completion_retries_matching_pending_steers_without_front_match() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.thread_id = Some(ThreadId::new());
chat.on_task_started();
chat.restored_active_turn_id = Some("active-turn".to_string());
let mut other_turn = pending_steer("other turn");
other_turn.turn_id = Some("other-turn".to_string());
let mut active_turn = pending_steer("active turn");
active_turn.turn_id = Some("active-turn".to_string());
chat.pending_steers.push_back(other_turn);
chat.pending_steers
.push_back(pending_steer("ownerless turn"));
chat.pending_steers.push_back(active_turn);
chat.handle_turn_completed_notification(
TurnCompletedNotification {
thread_id: chat.thread_id.map(|id| id.to_string()).unwrap_or_default(),
turn: AppServerTurn {
id: "active-turn".to_string(),
items: Vec::new(),
status: AppServerTurnStatus::Completed,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
},
},
Some(ReplayKind::ThreadSnapshot),
);
let mut expected_remaining = pending_steer("other turn");
expected_remaining.turn_id = Some("other-turn".to_string());
assert_eq!(chat.pending_steers, VecDeque::from([expected_remaining]));
match next_submit_op(&mut op_rx) {
Op::UserTurn { items, .. } => assert_eq!(
items,
vec![UserInput::Text {
text: "ownerless turn\nactive turn".to_string(),
text_elements: Vec::new(),
}]
),
other => panic!("expected queued pending steers to submit, got {other:?}"),
}
}
#[tokio::test]
async fn replayed_in_progress_turn_is_captured_as_active_turn() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.replay_thread_turns(
vec![AppServerTurn {
id: "active-turn".to_string(),
items: Vec::new(),
status: AppServerTurnStatus::InProgress,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
}],
ReplayKind::ThreadSnapshot,
);
let input_state = chat
.capture_thread_input_state()
.expect("expected thread input state");
assert_eq!(input_state.active_turn_id.as_deref(), Some("active-turn"));
assert_eq!(chat.restored_active_turn_id.as_deref(), Some("active-turn"));
chat.handle_server_notification(
ServerNotification::TurnStarted(TurnStartedNotification {
thread_id: "thread-1".to_string(),
turn: AppServerTurn {
id: "older-turn".to_string(),
items: Vec::new(),
status: AppServerTurnStatus::InProgress,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
},
}),
Some(ReplayKind::ThreadSnapshot),
);
assert_eq!(chat.last_turn_id.as_deref(), Some("active-turn"));
}
#[tokio::test]
async fn replayed_user_message_acknowledges_pending_steer_only_for_restored_turn() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
let mut pending = pending_steer("already committed");
pending.turn_id = Some("active-turn".to_string());
chat.pending_steers.push_back(pending);
chat.restored_active_turn_id = Some("active-turn".to_string());
let user_message = || AppServerThreadItem::UserMessage {
id: "user-message".to_string(),
content: vec![AppServerUserInput::Text {
text: "already committed".to_string(),
text_elements: Vec::new(),
}],
};
chat.replay_thread_item(
user_message(),
"older-turn".to_string(),
ReplayKind::ThreadSnapshot,
);
assert_eq!(chat.pending_steers.len(), 1);
chat.replay_thread_item(
user_message(),
"active-turn".to_string(),
ReplayKind::ThreadSnapshot,
);
assert!(chat.pending_steers.is_empty());
}
#[tokio::test]
async fn replayed_steer_rejection_queues_matching_pending_steer() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
let mut pending = pending_steer("retry after rejection");
pending.turn_id = Some("active-turn".to_string());
chat.pending_steers.push_back(pending);
chat.restored_active_turn_id = Some("active-turn".to_string());
chat.handle_server_notification(
ServerNotification::Error(ErrorNotification {
error: AppServerTurnError {
message: "cannot steer this turn".to_string(),
codex_error_info: Some(
codex_app_server_protocol::CodexErrorInfo::ActiveTurnNotSteerable {
turn_kind: codex_app_server_protocol::NonSteerableTurnKind::Review,
},
),
additional_details: None,
},
will_retry: false,
thread_id: "thread-1".to_string(),
turn_id: "active-turn".to_string(),
}),
Some(ReplayKind::ThreadSnapshot),
);
assert!(chat.pending_steers.is_empty());
assert_eq!(
chat.queued_user_message_texts(),
vec!["retry after rejection"]
);
}
#[tokio::test]
async fn steer_enter_uses_pending_steers_while_final_answer_stream_is_active() {
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await;