Compare commits

...

3 Commits

Author SHA1 Message Date
Dylan Hurd
20b61e25f4 migrate to item_id 2026-03-09 00:14:25 -07:00
Dylan Hurd
c12e25a722 fix(core) Update active_item logic 2026-03-08 21:08:44 -07:00
Dylan Hurd
ae4158f979 reproduce issue 2026-03-08 21:08:44 -07:00
15 changed files with 205 additions and 78 deletions

View File

@@ -41,7 +41,7 @@ async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> {
let responses = vec![responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_message_item_added("msg-1", ""),
responses::ev_output_text_delta(&full_message),
responses::ev_output_text_delta("msg-1", &full_message),
responses::ev_assistant_message("msg-1", &full_message),
responses::ev_completed("resp-1"),
])];

View File

@@ -68,16 +68,22 @@ pub enum ResponseEvent {
response_id: String,
token_usage: Option<TokenUsage>,
},
OutputTextDelta(String),
OutputTextDelta {
item_id: String,
delta: String,
},
ReasoningSummaryDelta {
item_id: String,
delta: String,
summary_index: i64,
},
ReasoningContentDelta {
item_id: String,
delta: String,
content_index: i64,
},
ReasoningSummaryPartAdded {
item_id: String,
summary_index: i64,
},
RateLimits(RateLimitSnapshot),

View File

@@ -162,6 +162,7 @@ pub struct ResponsesStreamEvent {
headers: Option<Value>,
response: Option<Value>,
item: Option<Value>,
item_id: Option<String>,
delta: Option<String>,
summary_index: Option<i64>,
content_index: Option<i64>,
@@ -241,21 +242,27 @@ pub fn process_responses_event(
}
}
"response.output_text.delta" => {
if let Some(delta) = event.delta {
return Ok(Some(ResponseEvent::OutputTextDelta(delta)));
if let (Some(item_id), Some(delta)) = (event.item_id, event.delta) {
return Ok(Some(ResponseEvent::OutputTextDelta { item_id, delta }));
}
}
"response.reasoning_summary_text.delta" => {
if let (Some(delta), Some(summary_index)) = (event.delta, event.summary_index) {
if let (Some(item_id), Some(delta), Some(summary_index)) =
(event.item_id, event.delta, event.summary_index)
{
return Ok(Some(ResponseEvent::ReasoningSummaryDelta {
item_id,
delta,
summary_index,
}));
}
}
"response.reasoning_text.delta" => {
if let (Some(delta), Some(content_index)) = (event.delta, event.content_index) {
if let (Some(item_id), Some(delta), Some(content_index)) =
(event.item_id, event.delta, event.content_index)
{
return Ok(Some(ResponseEvent::ReasoningContentDelta {
item_id,
delta,
content_index,
}));
@@ -335,8 +342,9 @@ pub fn process_responses_event(
}
}
"response.reasoning_summary_part.added" => {
if let Some(summary_index) = event.summary_index {
if let (Some(item_id), Some(summary_index)) = (event.item_id, event.summary_index) {
return Ok(Some(ResponseEvent::ReasoningSummaryPartAdded {
item_id,
summary_index,
}));
}

View File

@@ -6172,6 +6172,16 @@ impl AssistantMessageStreamParsers {
}
}
fn response_item_stream_id(item: &ResponseItem) -> Option<String> {
match item {
ResponseItem::Message { id, role, .. } if role == "assistant" => id.clone(),
ResponseItem::Reasoning { id, .. } => Some(id.clone()),
ResponseItem::WebSearchCall { id, .. } => Some(id.clone().unwrap_or_default()),
ResponseItem::ImageGenerationCall { id, .. } => Some(id.clone()),
_ => None,
}
}
impl ProposedPlanItemState {
fn new(turn_id: &str) -> Self {
Self {
@@ -6673,7 +6683,7 @@ async fn try_run_sampling_request(
FuturesOrdered::new();
let mut needs_follow_up = false;
let mut last_agent_message: Option<String> = None;
let mut active_item: Option<TurnItem> = None;
let mut items_by_id: HashMap<String, TurnItem> = HashMap::new();
let mut should_emit_turn_diff = false;
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode);
@@ -6716,7 +6726,8 @@ async fn try_run_sampling_request(
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let previously_active_item = active_item.take();
let previously_active_item =
response_item_stream_id(&item).and_then(|item_id| items_by_id.remove(&item_id));
if let Some(previous) = previously_active_item.as_ref()
&& matches!(previous, TurnItem::AgentMessage(_))
{
@@ -6812,7 +6823,7 @@ async fn try_run_sampling_request(
)
.await;
}
active_item = Some(turn_item);
items_by_id.insert(turn_item.id(), turn_item);
}
}
ResponseEvent::ServerModel(server_model) => {
@@ -6858,81 +6869,74 @@ async fn try_run_sampling_request(
last_agent_message,
});
}
ResponseEvent::OutputTextDelta(delta) => {
ResponseEvent::OutputTextDelta { item_id, delta } => {
// In review child threads, suppress assistant text deltas; the
// UI will show a selection popup from the final ReviewOutput.
if let Some(active) = active_item.as_ref() {
let item_id = active.id();
if matches!(active, TurnItem::AgentMessage(_)) {
let parsed = assistant_message_stream_parsers.parse_delta(&item_id, &delta);
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
plan_mode_state.as_mut(),
&item_id,
parsed,
)
.await;
} else {
let event = AgentMessageContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id,
delta,
};
sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event))
.await;
}
if let Some(TurnItem::AgentMessage(_)) = items_by_id.get(&item_id) {
let parsed = assistant_message_stream_parsers.parse_delta(&item_id, &delta);
emit_streamed_assistant_text_delta(
&sess,
&turn_context,
plan_mode_state.as_mut(),
&item_id,
parsed,
)
.await;
} else {
error_or_panic("OutputTextDelta without active item".to_string());
error_or_panic(format!("OutputTextDelta without item {item_id}"));
}
}
ResponseEvent::ReasoningSummaryDelta {
item_id,
delta,
summary_index,
} => {
if let Some(active) = active_item.as_ref() {
if let Some(TurnItem::Reasoning(_)) = items_by_id.get(&item_id) {
let event = ReasoningContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id: active.id(),
item_id,
delta,
summary_index,
};
sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event))
.await;
} else {
error_or_panic("ReasoningSummaryDelta without active item".to_string());
error_or_panic(format!("ReasoningSummaryDelta without item {item_id}"));
}
}
ResponseEvent::ReasoningSummaryPartAdded { summary_index } => {
if let Some(active) = active_item.as_ref() {
ResponseEvent::ReasoningSummaryPartAdded {
item_id,
summary_index,
} => {
if let Some(TurnItem::Reasoning(_)) = items_by_id.get(&item_id) {
let event =
EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {
item_id: active.id(),
item_id,
summary_index,
});
sess.send_event(&turn_context, event).await;
} else {
error_or_panic("ReasoningSummaryPartAdded without active item".to_string());
error_or_panic(format!("ReasoningSummaryPartAdded without item {item_id}"));
}
}
ResponseEvent::ReasoningContentDelta {
item_id,
delta,
content_index,
} => {
if let Some(active) = active_item.as_ref() {
if let Some(TurnItem::Reasoning(_)) = items_by_id.get(&item_id) {
let event = ReasoningRawContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id: active.id(),
item_id,
delta,
content_index,
};
sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event))
.await;
} else {
error_or_panic("ReasoningRawContentDelta without active item".to_string());
error_or_panic(format!("ReasoningRawContentDelta without item {item_id}"));
}
}
}

View File

@@ -361,7 +361,7 @@ mod job {
let mut token_usage = None;
while let Some(message) = stream.next().await.transpose()? {
match message {
ResponseEvent::OutputTextDelta(delta) => result.push_str(&delta),
ResponseEvent::OutputTextDelta { delta, .. } => result.push_str(&delta),
ResponseEvent::OutputItemDone(item) => {
if result.is_empty()
&& let ResponseItem::Message { content, .. } = item

View File

@@ -105,7 +105,7 @@ fn response_event_records_turn_ttft(event: &ResponseEvent) -> bool {
ResponseEvent::OutputItemDone(item) | ResponseEvent::OutputItemAdded(item) => {
response_item_records_turn_ttft(item)
}
ResponseEvent::OutputTextDelta(_)
ResponseEvent::OutputTextDelta { .. }
| ResponseEvent::ReasoningSummaryDelta { .. }
| ResponseEvent::ReasoningContentDelta { .. } => true,
ResponseEvent::Created
@@ -171,7 +171,10 @@ mod tests {
let state = TurnTimingState::default();
assert_eq!(
state
.record_ttft_for_response_event(&ResponseEvent::OutputTextDelta("hi".to_string()))
.record_ttft_for_response_event(&ResponseEvent::OutputTextDelta {
item_id: "msg-1".to_string(),
delta: "hi".to_string(),
})
.await,
None
);
@@ -185,15 +188,19 @@ mod tests {
);
assert!(
state
.record_ttft_for_response_event(&ResponseEvent::OutputTextDelta("hi".to_string()))
.record_ttft_for_response_event(&ResponseEvent::OutputTextDelta {
item_id: "msg-1".to_string(),
delta: "hi".to_string(),
})
.await
.is_some()
);
assert_eq!(
state
.record_ttft_for_response_event(&ResponseEvent::OutputTextDelta(
"again".to_string()
))
.record_ttft_for_response_event(&ResponseEvent::OutputTextDelta {
item_id: "msg-1".to_string(),
delta: "again".to_string(),
})
.await,
None
);
@@ -206,7 +213,10 @@ mod tests {
assert!(
state
.record_ttft_for_response_event(&ResponseEvent::OutputTextDelta("hi".to_string()))
.record_ttft_for_response_event(&ResponseEvent::OutputTextDelta {
item_id: "msg-1".to_string(),
delta: "hi".to_string(),
})
.await
.is_some()
);

View File

@@ -645,9 +645,10 @@ pub fn ev_message_item_added(id: &str, text: &str) -> Value {
})
}
pub fn ev_output_text_delta(delta: &str) -> Value {
pub fn ev_output_text_delta(item_id: &str, delta: &str) -> Value {
serde_json::json!({
"type": "response.output_text.delta",
"item_id": item_id,
"delta": delta,
})
}
@@ -700,17 +701,19 @@ pub fn ev_reasoning_item_added(id: &str, summary: &[&str]) -> Value {
})
}
pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value {
pub fn ev_reasoning_summary_text_delta(item_id: &str, delta: &str) -> Value {
serde_json::json!({
"type": "response.reasoning_summary_text.delta",
"item_id": item_id,
"delta": delta,
"summary_index": 0,
})
}
pub fn ev_reasoning_text_delta(delta: &str) -> Value {
pub fn ev_reasoning_text_delta(item_id: &str, delta: &str) -> Value {
serde_json::json!({
"type": "response.reasoning_text.delta",
"item_id": item_id,
"delta": delta,
"content_index": 0,
})

View File

@@ -2230,7 +2230,7 @@ async fn incomplete_response_emits_content_filter_error_message() -> anyhow::Res
let incomplete_response = sse(vec![
ev_response_created("resp_incomplete"),
ev_message_item_added("msg_incomplete", "partial content"),
ev_output_text_delta("continued chunk"),
ev_output_text_delta("msg_incomplete", "continued chunk"),
json!({
"type": "response.incomplete",
"response": {
@@ -2474,9 +2474,9 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
"type":"message", "role":"assistant",
"content":[{"type":"output_text","text":""}]
}},
{"type":"response.output_text.delta", "delta":"Hey "},
{"type":"response.output_text.delta", "delta":"there"},
{"type":"response.output_text.delta", "delta":"!\n"},
{"type":"response.output_text.delta", "item_id":"msg-1", "delta":"Hey "},
{"type":"response.output_text.delta", "item_id":"msg-1", "delta":"there"},
{"type":"response.output_text.delta", "item_id":"msg-1", "delta":"!\n"},
{"type":"response.output_item.done", "item":{
"type":"message", "role":"assistant",
"content":[{"type":"output_text","text":"Hey there!\n"}]

View File

@@ -201,7 +201,7 @@ async fn codex_delegate_ignores_legacy_deltas() {
let sse_stream = sse(vec![
ev_response_created("resp-1"),
ev_reasoning_item_added("reason-1", &["initial"]),
ev_reasoning_summary_text_delta("think-1"),
ev_reasoning_summary_text_delta("reason-1", "think-1"),
ev_completed("resp-1"),
]);

View File

@@ -376,7 +376,7 @@ async fn agent_message_content_delta_has_item_metadata() -> anyhow::Result<()> {
let stream = sse(vec![
ev_response_created("resp-1"),
ev_message_item_added("msg-1", ""),
ev_output_text_delta("streamed response"),
ev_output_text_delta("msg-1", "streamed response"),
ev_assistant_message("msg-1", "streamed response"),
ev_completed("resp-1"),
]);
@@ -432,6 +432,102 @@ async fn agent_message_content_delta_has_item_metadata() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interleaved_reasoning_and_assistant_streams_keep_item_ids_aligned() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let TestCodex { codex, .. } = test_codex().build(&server).await?;
let stream = sse(vec![
ev_response_created("resp-1"),
ev_reasoning_item_added("reasoning-1", &[""]),
ev_message_item_added("msg-1", ""),
ev_reasoning_summary_text_delta("reasoning-1", "thinking"),
ev_reasoning_item("reasoning-1", &["thinking"], &[]),
ev_output_text_delta("msg-1", "answer"),
ev_assistant_message("msg-1", "answer"),
ev_completed("resp-1"),
]);
mount_sse_once(&server, stream).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "please reason and answer".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let reasoning_started = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::Reasoning(item),
..
}) => Some(item.clone()),
_ => None,
})
.await;
let agent_started = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::AgentMessage(item),
..
}) => Some(item.clone()),
_ => None,
})
.await;
let reasoning_delta = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ReasoningContentDelta(event) => Some(event.clone()),
_ => None,
})
.await;
let reasoning_completed = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::Reasoning(item),
..
}) => Some(item.clone()),
_ => None,
})
.await;
let agent_delta = wait_for_event_match(&codex, |ev| match ev {
EventMsg::AgentMessageContentDelta(event) => Some(event.clone()),
_ => None,
})
.await;
let agent_completed = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::AgentMessage(item),
..
}) => Some(item.clone()),
_ => None,
})
.await;
assert_eq!(reasoning_started.id, "reasoning-1");
assert_eq!(reasoning_started.summary_text, vec![String::new()]);
assert_eq!(reasoning_delta.item_id, reasoning_started.id);
assert_eq!(reasoning_delta.delta, "thinking");
assert_eq!(reasoning_completed.id, reasoning_started.id);
assert_eq!(
reasoning_completed.summary_text,
vec!["thinking".to_string()]
);
assert_eq!(reasoning_completed.raw_content, Vec::<String>::new());
assert_eq!(agent_started.id, "msg-1");
assert_eq!(agent_delta.item_id, agent_started.id);
assert_eq!(agent_delta.delta, "answer");
assert_eq!(agent_completed.id, agent_started.id);
let Some(AgentMessageContent::Text { text }) = agent_completed.content.first() else {
panic!("expected completed agent message text content");
};
assert_eq!(text, "answer");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn plan_mode_emits_plan_item_from_proposed_plan_block() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
@@ -449,7 +545,7 @@ async fn plan_mode_emits_plan_item_from_proposed_plan_block() -> anyhow::Result<
let stream = sse(vec![
ev_response_created("resp-1"),
ev_message_item_added("msg-1", ""),
ev_output_text_delta(&full_message),
ev_output_text_delta("msg-1", &full_message),
ev_assistant_message("msg-1", &full_message),
ev_completed("resp-1"),
]);
@@ -525,7 +621,7 @@ async fn plan_mode_strips_plan_from_agent_messages() -> anyhow::Result<()> {
let stream = sse(vec![
ev_response_created("resp-1"),
ev_message_item_added("msg-1", ""),
ev_output_text_delta(&full_message),
ev_output_text_delta("msg-1", &full_message),
ev_assistant_message("msg-1", &full_message),
ev_completed("resp-1"),
]);
@@ -633,7 +729,7 @@ async fn plan_mode_streaming_citations_are_stripped_across_added_deltas_and_done
ev_message_item_added("msg-1", added_text),
];
for delta in deltas {
events.push(ev_output_text_delta(delta));
events.push(ev_output_text_delta("msg-1", delta));
}
events.push(ev_assistant_message("msg-1", &full_message));
events.push(ev_completed("resp-1"));
@@ -819,7 +915,7 @@ async fn plan_mode_streaming_proposed_plan_tag_split_across_added_and_delta_is_p
ev_message_item_added("msg-1", added_text),
];
for delta in deltas {
events.push(ev_output_text_delta(delta));
events.push(ev_output_text_delta("msg-1", delta));
}
events.push(ev_assistant_message("msg-1", &full_message));
events.push(ev_completed("resp-1"));
@@ -932,7 +1028,7 @@ async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> {
let stream = sse(vec![
ev_response_created("resp-1"),
ev_message_item_added("msg-1", ""),
ev_output_text_delta(full_message),
ev_output_text_delta("msg-1", full_message),
ev_assistant_message("msg-1", full_message),
ev_completed("resp-1"),
]);
@@ -1018,7 +1114,7 @@ async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> {
let stream = sse(vec![
ev_response_created("resp-1"),
ev_reasoning_item_added("reasoning-1", &[""]),
ev_reasoning_summary_text_delta("step one"),
ev_reasoning_summary_text_delta("reasoning-1", "step one"),
ev_reasoning_item("reasoning-1", &["step one"], &[]),
ev_completed("resp-1"),
]);
@@ -1077,7 +1173,7 @@ async fn reasoning_raw_content_delta_respects_flag() -> anyhow::Result<()> {
let stream = sse(vec![
ev_response_created("resp-1"),
ev_reasoning_item_added("reasoning-raw", &[""]),
ev_reasoning_text_delta("raw detail"),
ev_reasoning_text_delta("reasoning-raw", "raw detail"),
ev_reasoning_item("reasoning-raw", &["complete"], &["raw detail"]),
ev_completed("resp-1"),
]);

View File

@@ -641,9 +641,9 @@ async fn record_responses_sets_span_fields_for_response_events() {
}),
ev_message_item_added("msg-added", "hi there"),
ev_reasoning_item_added("reasoning-1", &["summary"]),
ev_output_text_delta("delta"),
ev_reasoning_summary_text_delta("summary-delta"),
ev_reasoning_text_delta("raw-delta"),
ev_output_text_delta("msg-added", "delta"),
ev_reasoning_summary_text_delta("reasoning-1", "summary-delta"),
ev_reasoning_text_delta("reasoning-1", "raw-delta"),
ev_function_call("call-1", "fn", "{\"key\":\"value\"}"),
ev_assistant_message("msg-1", "agent"),
ev_reasoning_item("reasoning-1", &["summary"], &[]),

View File

@@ -60,11 +60,11 @@ async fn injected_user_input_triggers_follow_up_request_with_deltas() {
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_output_text_delta("first ")),
body: sse_event(ev_output_text_delta("msg-1", "first ")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_output_text_delta("turn")),
body: sse_event(ev_output_text_delta("msg-1", "turn")),
},
StreamingSseChunk {
gate: None,

View File

@@ -1681,11 +1681,11 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_output_text_delta("first ")),
body: sse_event(responses::ev_output_text_delta("msg-1", "first ")),
},
StreamingSseChunk {
gate: None,
body: sse_event(responses::ev_output_text_delta("turn")),
body: sse_event(responses::ev_output_text_delta("msg-1", "turn")),
},
StreamingSseChunk {
gate: None,

View File

@@ -244,8 +244,8 @@ async fn review_filters_agent_message_related_events() {
"type":"message", "role":"assistant", "id":"msg-1",
"content":[{"type":"output_text","text":""}]
}},
{"type":"response.output_text.delta", "delta":"Hi"},
{"type":"response.output_text.delta", "delta":" there"},
{"type":"response.output_text.delta", "item_id":"msg-1", "delta":"Hi"},
{"type":"response.output_text.delta", "item_id":"msg-1", "delta":" there"},
{"type":"response.output_item.done", "item":{
"type":"message", "role":"assistant", "id":"msg-1",
"content":[{"type":"output_text","text":"Hi there"}]

View File

@@ -905,7 +905,7 @@ impl SessionTelemetry {
SessionTelemetry::responses_item_type(item)
}
ResponseEvent::Completed { .. } => "completed".into(),
ResponseEvent::OutputTextDelta(_) => "text_delta".into(),
ResponseEvent::OutputTextDelta { .. } => "text_delta".into(),
ResponseEvent::ReasoningSummaryDelta { .. } => "reasoning_summary_delta".into(),
ResponseEvent::ReasoningContentDelta { .. } => "reasoning_content_delta".into(),
ResponseEvent::ReasoningSummaryPartAdded { .. } => {