Compare commits

...

2 Commits

Author SHA1 Message Date
Ahmed Ibrahim
aeb9cc1a5b Include elapsedMs in app-server protocol 2026-01-22 18:06:59 -08:00
Ahmed Ibrahim
a977fb9f19 Add elapsedMs to completed turn items 2026-01-22 18:04:11 -08:00
9 changed files with 557 additions and 18 deletions

View File

@@ -70,7 +70,11 @@ impl ThreadHistoryBuilder {
let mut turn = self.new_turn();
let id = self.next_item_id();
let content = self.build_user_inputs(payload);
turn.items.push(ThreadItem::UserMessage { id, content });
turn.items.push(ThreadItem::UserMessage {
id,
content,
elapsed_ms: None,
});
self.current_turn = Some(turn);
}
@@ -80,9 +84,11 @@ impl ThreadHistoryBuilder {
}
let id = self.next_item_id();
self.ensure_turn()
.items
.push(ThreadItem::AgentMessage { id, text });
self.ensure_turn().items.push(ThreadItem::AgentMessage {
id,
text,
elapsed_ms: None,
});
}
fn handle_agent_reasoning(&mut self, payload: &AgentReasoningEvent) {
@@ -102,6 +108,7 @@ impl ThreadHistoryBuilder {
id,
summary: vec![payload.text.clone()],
content: Vec::new(),
elapsed_ms: None,
});
}
@@ -122,6 +129,7 @@ impl ThreadHistoryBuilder {
id,
summary: Vec::new(),
content: vec![payload.text.clone()],
elapsed_ms: None,
});
}
@@ -296,6 +304,7 @@ mod tests {
url: "https://example.com/one.png".into(),
}
],
elapsed_ms: None,
}
);
assert_eq!(
@@ -303,6 +312,7 @@ mod tests {
ThreadItem::AgentMessage {
id: "item-2".into(),
text: "Hi there".into(),
elapsed_ms: None,
}
);
assert_eq!(
@@ -311,6 +321,7 @@ mod tests {
id: "item-3".into(),
summary: vec!["thinking".into()],
content: vec!["full reasoning".into()],
elapsed_ms: None,
}
);
@@ -325,6 +336,7 @@ mod tests {
text: "Second turn".into(),
text_elements: Vec::new(),
}],
elapsed_ms: None,
}
);
assert_eq!(
@@ -332,6 +344,7 @@ mod tests {
ThreadItem::AgentMessage {
id: "item-5".into(),
text: "Reply two".into(),
elapsed_ms: None,
}
);
}
@@ -370,6 +383,7 @@ mod tests {
id: "item-2".into(),
summary: vec!["first summary".into()],
content: vec!["first content".into()],
elapsed_ms: None,
}
);
assert_eq!(
@@ -378,6 +392,7 @@ mod tests {
id: "item-4".into(),
summary: vec!["second summary".into()],
content: Vec::new(),
elapsed_ms: None,
}
);
}
@@ -422,6 +437,7 @@ mod tests {
text: "Please do the thing".into(),
text_elements: Vec::new(),
}],
elapsed_ms: None,
}
);
assert_eq!(
@@ -429,6 +445,7 @@ mod tests {
ThreadItem::AgentMessage {
id: "item-2".into(),
text: "Working...".into(),
elapsed_ms: None,
}
);
@@ -443,6 +460,7 @@ mod tests {
text: "Let's try again".into(),
text_elements: Vec::new(),
}],
elapsed_ms: None,
}
);
assert_eq!(
@@ -450,6 +468,7 @@ mod tests {
ThreadItem::AgentMessage {
id: "item-4".into(),
text: "Second attempt complete.".into(),
elapsed_ms: None,
}
);
}
@@ -500,10 +519,12 @@ mod tests {
text: "First".into(),
text_elements: Vec::new(),
}],
elapsed_ms: None,
},
ThreadItem::AgentMessage {
id: "item-2".into(),
text: "A1".into(),
elapsed_ms: None,
},
],
},
@@ -518,10 +539,12 @@ mod tests {
text: "Third".into(),
text_elements: Vec::new(),
}],
elapsed_ms: None,
},
ThreadItem::AgentMessage {
id: "item-4".into(),
text: "A3".into(),
elapsed_ms: None,
},
],
},

View File

@@ -1789,10 +1789,20 @@ impl From<CoreUserInput> for UserInput {
pub enum ThreadItem {
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
UserMessage { id: String, content: Vec<UserInput> },
UserMessage {
id: String,
content: Vec<UserInput>,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
AgentMessage { id: String, text: String },
AgentMessage {
id: String,
text: String,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
Reasoning {
@@ -1801,6 +1811,8 @@ pub enum ThreadItem {
summary: Vec<String>,
#[serde(default)]
content: Vec<String>,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
@@ -1824,6 +1836,8 @@ pub enum ThreadItem {
/// The duration of the command execution in milliseconds.
#[ts(type = "number | null")]
duration_ms: Option<i64>,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
@@ -1831,6 +1845,8 @@ pub enum ThreadItem {
id: String,
changes: Vec<FileUpdateChange>,
status: PatchApplyStatus,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
@@ -1845,6 +1861,8 @@ pub enum ThreadItem {
/// The duration of the MCP tool call in milliseconds.
#[ts(type = "number | null")]
duration_ms: Option<i64>,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
@@ -1864,19 +1882,41 @@ pub enum ThreadItem {
prompt: Option<String>,
/// Last known status of the target agents, when available.
agents_states: HashMap<String, CollabAgentState>,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
WebSearch { id: String, query: String },
WebSearch {
id: String,
query: String,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
ImageView { id: String, path: String },
ImageView {
id: String,
path: String,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
EnteredReviewMode { id: String, review: String },
EnteredReviewMode {
id: String,
review: String,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
ExitedReviewMode { id: String, review: String },
ExitedReviewMode {
id: String,
review: String,
#[ts(type = "number | null")]
elapsed_ms: Option<i64>,
},
}
impl From<CoreTurnItem> for ThreadItem {
@@ -1885,6 +1925,7 @@ impl From<CoreTurnItem> for ThreadItem {
CoreTurnItem::UserMessage(user) => ThreadItem::UserMessage {
id: user.id,
content: user.content.into_iter().map(UserInput::from).collect(),
elapsed_ms: None,
},
CoreTurnItem::AgentMessage(agent) => {
let text = agent
@@ -1894,16 +1935,22 @@ impl From<CoreTurnItem> for ThreadItem {
CoreAgentMessageContent::Text { text } => text,
})
.collect::<String>();
ThreadItem::AgentMessage { id: agent.id, text }
ThreadItem::AgentMessage {
id: agent.id,
text,
elapsed_ms: None,
}
}
CoreTurnItem::Reasoning(reasoning) => ThreadItem::Reasoning {
id: reasoning.id,
summary: reasoning.summary_text,
content: reasoning.raw_content,
elapsed_ms: None,
},
CoreTurnItem::WebSearch(search) => ThreadItem::WebSearch {
id: search.id,
query: search.query,
elapsed_ms: None,
},
}
}
@@ -2548,6 +2595,7 @@ mod tests {
path: PathBuf::from("/repo/.codex/skills/skill-creator/SKILL.md"),
},
],
elapsed_ms: None,
}
);
@@ -2568,6 +2616,7 @@ mod tests {
ThreadItem::AgentMessage {
id: "agent-1".to_string(),
text: "Hello world".to_string(),
elapsed_ms: None,
}
);
@@ -2583,6 +2632,7 @@ mod tests {
id: "reasoning-1".to_string(),
summary: vec!["line one".to_string(), "line two".to_string()],
content: vec![],
elapsed_ms: None,
}
);
@@ -2596,6 +2646,7 @@ mod tests {
ThreadItem::WebSearch {
id: "search-1".to_string(),
query: "docs".to_string(),
elapsed_ms: None,
}
);
}

View File

@@ -403,6 +403,8 @@ Today both notifications carry an empty `items` array even when item events were
- `exitedReviewMode``{id, review}` emitted when the reviewer finishes; `review` is the full plain-text review (usually, overall notes plus bullet point findings).
- `compacted` - `{threadId, turnId}` when codex compacts the conversation history. This can happen automatically.
Each item also includes `elapsedMs?`, the elapsed milliseconds since the turn started. This is populated on `item/completed` notifications (and omitted/`null` on `item/started` and history-loaded items).
All items emit two shared lifecycle events:
- `item/started` — emits the full `item` when a new unit of work begins so the UI can render it immediately; the `item.id` in this payload matches the `itemId` used by deltas.

View File

@@ -93,6 +93,7 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::oneshot;
use tracing::error;
@@ -115,6 +116,22 @@ pub(crate) async fn apply_bespoke_event_handling(
msg,
} = event;
match msg {
EventMsg::TurnStarted(_ev) => {
if let ApiVersion::V2 = api_version {
let mut map = turn_summary_store.lock().await;
let summary = map.entry(conversation_id).or_default();
summary.file_change_started.clear();
summary.last_error = None;
let is_same_turn =
summary.active_turn_id.as_deref() == Some(event_turn_id.as_str());
if !is_same_turn {
summary.active_turn_id = Some(event_turn_id);
summary.turn_started_at = Some(Instant::now());
} else if summary.turn_started_at.is_none() {
summary.turn_started_at = Some(Instant::now());
}
}
}
EventMsg::TurnComplete(_ev) => {
handle_turn_complete(
conversation_id,
@@ -162,6 +179,7 @@ pub(crate) async fn apply_bespoke_event_handling(
id: item_id.clone(),
changes: patch_changes.clone(),
status: PatchApplyStatus::InProgress,
elapsed_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -251,6 +269,7 @@ pub(crate) async fn apply_bespoke_event_handling(
params,
))
.await;
let turn_summary_store = turn_summary_store.clone();
tokio::spawn(async move {
on_command_execution_request_approval_response(
event_turn_id,
@@ -262,6 +281,7 @@ pub(crate) async fn apply_bespoke_event_handling(
rx,
conversation,
outgoing,
turn_summary_store,
)
.await;
});
@@ -331,10 +351,13 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::McpToolCallEnd(end_event) => {
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let notification = construct_mcp_tool_call_end_notification(
end_event,
conversation_id.to_string(),
event_turn_id.clone(),
elapsed_ms,
)
.await;
outgoing
@@ -350,6 +373,7 @@ pub(crate) async fn apply_bespoke_event_handling(
receiver_thread_ids: Vec::new(),
prompt: Some(begin_event.prompt),
agents_states: HashMap::new(),
elapsed_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -361,6 +385,8 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::CollabAgentSpawnEnd(end_event) => {
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let has_receiver = end_event.new_thread_id.is_some();
let status = match &end_event.status {
codex_protocol::protocol::AgentStatus::Errored(_)
@@ -387,6 +413,7 @@ pub(crate) async fn apply_bespoke_event_handling(
receiver_thread_ids,
prompt: Some(end_event.prompt),
agents_states,
elapsed_ms,
};
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
@@ -407,6 +434,7 @@ pub(crate) async fn apply_bespoke_event_handling(
receiver_thread_ids,
prompt: Some(begin_event.prompt),
agents_states: HashMap::new(),
elapsed_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -418,6 +446,8 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::CollabAgentInteractionEnd(end_event) => {
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let status = match &end_event.status {
codex_protocol::protocol::AgentStatus::Errored(_)
| codex_protocol::protocol::AgentStatus::NotFound => V2CollabToolCallStatus::Failed,
@@ -433,6 +463,7 @@ pub(crate) async fn apply_bespoke_event_handling(
receiver_thread_ids: vec![receiver_id.clone()],
prompt: Some(end_event.prompt),
agents_states: [(receiver_id, received_status)].into_iter().collect(),
elapsed_ms,
};
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
@@ -457,6 +488,7 @@ pub(crate) async fn apply_bespoke_event_handling(
receiver_thread_ids,
prompt: None,
agents_states: HashMap::new(),
elapsed_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -468,6 +500,8 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::CollabWaitingEnd(end_event) => {
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let status = if end_event.statuses.values().any(|status| {
matches!(
status,
@@ -493,6 +527,7 @@ pub(crate) async fn apply_bespoke_event_handling(
receiver_thread_ids,
prompt: None,
agents_states,
elapsed_ms,
};
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
@@ -512,6 +547,7 @@ pub(crate) async fn apply_bespoke_event_handling(
receiver_thread_ids: vec![begin_event.receiver_thread_id.to_string()],
prompt: None,
agents_states: HashMap::new(),
elapsed_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -523,6 +559,8 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::CollabCloseEnd(end_event) => {
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let status = match &end_event.status {
codex_protocol::protocol::AgentStatus::Errored(_)
| codex_protocol::protocol::AgentStatus::NotFound => V2CollabToolCallStatus::Failed,
@@ -543,6 +581,7 @@ pub(crate) async fn apply_bespoke_event_handling(
receiver_thread_ids: vec![receiver_id],
prompt: None,
agents_states,
elapsed_ms,
};
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
@@ -682,6 +721,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let item = ThreadItem::ImageView {
id: view_image_event.call_id.clone(),
path: view_image_event.path.to_string_lossy().into_owned(),
elapsed_ms: None,
};
let started = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -691,6 +731,10 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing
.send_server_notification(ServerNotification::ItemStarted(started))
.await;
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let mut item = item;
set_elapsed_ms(&mut item, elapsed_ms);
let completed = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
@@ -707,6 +751,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let item = ThreadItem::EnteredReviewMode {
id: event_turn_id.clone(),
review,
elapsed_ms: None,
};
let started = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -716,6 +761,10 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing
.send_server_notification(ServerNotification::ItemStarted(started))
.await;
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let mut item = item;
set_elapsed_ms(&mut item, elapsed_ms);
let completed = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
@@ -737,7 +786,10 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::ItemCompleted(item_completed_event) => {
let item: ThreadItem = item_completed_event.item.clone().into();
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let mut item: ThreadItem = item_completed_event.item.clone().into();
set_elapsed_ms(&mut item, elapsed_ms);
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
@@ -755,6 +807,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let item = ThreadItem::ExitedReviewMode {
id: event_turn_id.clone(),
review,
elapsed_ms: None,
};
let started = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -764,6 +817,10 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing
.send_server_notification(ServerNotification::ItemStarted(started))
.await;
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let mut item = item;
set_elapsed_ms(&mut item, elapsed_ms);
let completed = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
@@ -798,6 +855,7 @@ pub(crate) async fn apply_bespoke_event_handling(
id: item_id.clone(),
changes: convert_patch_changes(&patch_begin_event.changes),
status: PatchApplyStatus::InProgress,
elapsed_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -852,6 +910,7 @@ pub(crate) async fn apply_bespoke_event_handling(
aggregated_output: None,
exit_code: None,
duration_ms: None,
elapsed_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
@@ -946,6 +1005,8 @@ pub(crate) async fn apply_bespoke_event_handling(
};
let duration_ms = i64::try_from(duration.as_millis()).unwrap_or(i64::MAX);
let elapsed_ms =
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
let item = ThreadItem::CommandExecution {
id: call_id,
@@ -957,6 +1018,7 @@ pub(crate) async fn apply_bespoke_event_handling(
aggregated_output,
exit_code: Some(exit_code),
duration_ms: Some(duration_ms),
elapsed_ms,
};
let notification = ItemCompletedNotification {
@@ -1140,6 +1202,65 @@ async fn emit_turn_completed_with_status(
.await;
}
async fn turn_elapsed_ms(
conversation_id: ThreadId,
turn_id: &str,
turn_summary_store: &TurnSummaryStore,
) -> Option<i64> {
let mut map = turn_summary_store.lock().await;
let summary = map.entry(conversation_id).or_default();
if summary
.active_turn_id
.as_deref()
.is_some_and(|active| active != turn_id)
{
return None;
}
if summary.active_turn_id.is_none() {
summary.active_turn_id = Some(turn_id.to_string());
}
let started_at = summary.turn_started_at.get_or_insert_with(Instant::now);
i64::try_from(started_at.elapsed().as_millis()).ok()
}
fn set_elapsed_ms(item: &mut ThreadItem, elapsed_ms: Option<i64>) {
match item {
ThreadItem::UserMessage {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::AgentMessage {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::Reasoning {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::CommandExecution {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::FileChange {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::McpToolCall {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::CollabAgentToolCall {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::WebSearch {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::ImageView {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::EnteredReviewMode {
elapsed_ms: field, ..
} => *field = elapsed_ms,
ThreadItem::ExitedReviewMode {
elapsed_ms: field, ..
} => *field = elapsed_ms,
}
}
async fn complete_file_change_item(
conversation_id: ThreadId,
item_id: String,
@@ -1156,10 +1277,12 @@ async fn complete_file_change_item(
}
}
let elapsed_ms = turn_elapsed_ms(conversation_id, &turn_id, turn_summary_store).await;
let item = ThreadItem::FileChange {
id: item_id,
changes,
status,
elapsed_ms,
};
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
@@ -1182,7 +1305,9 @@ async fn complete_command_execution_item(
command_actions: Vec<V2ParsedCommand>,
status: CommandExecutionStatus,
outgoing: &OutgoingMessageSender,
turn_summary_store: &TurnSummaryStore,
) {
let elapsed_ms = turn_elapsed_ms(conversation_id, &turn_id, turn_summary_store).await;
let item = ThreadItem::CommandExecution {
id: item_id,
command,
@@ -1193,6 +1318,7 @@ async fn complete_command_execution_item(
aggregated_output: None,
exit_code: None,
duration_ms: None,
elapsed_ms,
};
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
@@ -1612,6 +1738,7 @@ async fn on_command_execution_request_approval_response(
receiver: oneshot::Receiver<JsonValue>,
conversation: Arc<CodexThread>,
outgoing: Arc<OutgoingMessageSender>,
turn_summary_store: TurnSummaryStore,
) {
let response = receiver.await;
let (decision, completion_status) = match response {
@@ -1667,6 +1794,7 @@ async fn on_command_execution_request_approval_response(
command_actions.clone(),
status,
outgoing.as_ref(),
&turn_summary_store,
)
.await;
}
@@ -1697,6 +1825,7 @@ async fn construct_mcp_tool_call_notification(
result: None,
error: None,
duration_ms: None,
elapsed_ms: None,
};
ItemStartedNotification {
thread_id,
@@ -1710,6 +1839,7 @@ async fn construct_mcp_tool_call_end_notification(
end_event: McpToolCallEndEvent,
thread_id: String,
turn_id: String,
elapsed_ms: Option<i64>,
) -> ItemCompletedNotification {
let status = if end_event.is_success() {
McpToolCallStatus::Completed
@@ -1743,6 +1873,7 @@ async fn construct_mcp_tool_call_end_notification(
result,
error,
duration_ms,
elapsed_ms,
};
ItemCompletedNotification {
thread_id,
@@ -2134,6 +2265,7 @@ mod tests {
result: None,
error: None,
duration_ms: None,
elapsed_ms: None,
},
};
@@ -2292,6 +2424,7 @@ mod tests {
result: None,
error: None,
duration_ms: None,
elapsed_ms: None,
},
};
@@ -2324,10 +2457,12 @@ mod tests {
let thread_id = ThreadId::new().to_string();
let turn_id = "turn_3".to_string();
let elapsed_ms = Some(123);
let notification = construct_mcp_tool_call_end_notification(
end_event.clone(),
thread_id.clone(),
turn_id.clone(),
elapsed_ms,
)
.await;
@@ -2346,6 +2481,7 @@ mod tests {
}),
error: None,
duration_ms: Some(0),
elapsed_ms,
},
};
@@ -2367,10 +2503,12 @@ mod tests {
let thread_id = ThreadId::new().to_string();
let turn_id = "turn_4".to_string();
let elapsed_ms = Some(456);
let notification = construct_mcp_tool_call_end_notification(
end_event.clone(),
thread_id.clone(),
turn_id.clone(),
elapsed_ms,
)
.await;
@@ -2388,6 +2526,7 @@ mod tests {
message: "boom".to_string(),
}),
duration_ms: Some(1),
elapsed_ms,
},
};

View File

@@ -187,6 +187,7 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
@@ -207,6 +208,8 @@ pub(crate) type PendingRollbacks = Arc<Mutex<HashMap<ThreadId, RequestId>>>;
pub(crate) struct TurnSummary {
pub(crate) file_change_started: HashSet<String>,
pub(crate) last_error: Option<TurnError>,
pub(crate) active_turn_id: Option<String>,
pub(crate) turn_started_at: Option<Instant>,
}
pub(crate) type TurnSummaryStore = Arc<Mutex<HashMap<ThreadId, TurnSummary>>>;
@@ -3581,6 +3584,7 @@ impl CodexMessageProcessor {
// Review prompt display text is synthesized; no UI element ranges to preserve.
text_elements: Vec::new(),
}],
elapsed_ms: None,
}]
};

View File

@@ -17,4 +17,5 @@ mod thread_resume;
mod thread_rollback;
mod thread_start;
mod turn_interrupt;
mod turn_items_elapsed;
mod turn_start;

View File

@@ -16,6 +16,7 @@ use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStatus;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -87,7 +88,7 @@ async fn review_start_runs_review_turn_and_emits_code_review_item() -> Result<()
let started: ItemStartedNotification =
serde_json::from_value(item_started.params.expect("params must be present"))?;
match started.item {
ThreadItem::EnteredReviewMode { id, review } => {
ThreadItem::EnteredReviewMode { id, review, .. } => {
assert_eq!(id, turn_id);
assert_eq!(review, "commit 1234567: Tidy UI colors");
saw_entered_review_mode = true;
@@ -101,10 +102,10 @@ async fn review_start_runs_review_turn_and_emits_code_review_item() -> Result<()
"did not observe enteredReviewMode item"
);
// Confirm we see the ExitedReviewMode marker (with review text)
// on the same turn. Ignore any other items the stream surfaces.
// Confirm we see the EnteredReviewMode + ExitedReviewMode markers on the same turn.
let mut saw_entered_completed = false;
let mut review_body: Option<String> = None;
for _ in 0..10 {
for _ in 0..12 {
let review_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("item/completed"),
@@ -113,15 +114,28 @@ async fn review_start_runs_review_turn_and_emits_code_review_item() -> Result<()
let completed: ItemCompletedNotification =
serde_json::from_value(review_notif.params.expect("params must be present"))?;
match completed.item {
ThreadItem::ExitedReviewMode { id, review } => {
ThreadItem::EnteredReviewMode { id, elapsed_ms, .. } => {
assert_eq!(id, turn_id);
assert_eq!(elapsed_ms.is_some(), true);
saw_entered_completed = true;
}
ThreadItem::ExitedReviewMode {
id,
review,
elapsed_ms,
} => {
assert_eq!(id, turn_id);
assert_eq!(elapsed_ms.is_some(), true);
review_body = Some(review);
break;
}
_ => continue,
}
if saw_entered_completed && review_body.is_some() {
break;
}
}
assert_eq!(saw_entered_completed, true);
let review = review_body.expect("did not observe a code review item");
assert!(review.contains("Prefer Stylize helpers"));
assert!(review.contains("/tmp/file.rs:10-20"));

View File

@@ -0,0 +1,303 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::collections::HashSet;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn turn_items_emit_elapsed_ms_on_completion() -> Result<()> {
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let workspace = tmp.path().join("workspace");
std::fs::create_dir(&workspace)?;
let image_path = workspace.join("image.png");
std::fs::write(&image_path, b"not an image")?;
let image_path_str = image_path.to_string_lossy().into_owned();
let mcp_tool_name = "list_mcp_resources";
let patch = r#"*** Begin Patch
*** Add File: README.md
+new line
*** End Patch
"#;
let responses = vec![
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_reasoning_item("reason-1", &["summary"], &["details"]),
responses::ev_web_search_call_done("search-1", "completed", "query"),
responses::ev_assistant_message("msg-1", "hello"),
responses::ev_completed("resp-1"),
]),
create_shell_command_sse_response(
vec!["echo".to_string(), "hi".to_string()],
None,
None,
"shell-1",
)?,
create_final_assistant_message_sse_response("shell done")?,
create_apply_patch_sse_response(patch, "patch-1")?,
create_final_assistant_message_sse_response("patch done")?,
create_function_call_sse_response(
"view-1",
"view_image",
json!({ "path": image_path_str }),
)?,
create_final_assistant_message_sse_response("view done")?,
create_function_call_sse_response(
"collab-1",
"close_agent",
json!({ "id": "00000000-0000-0000-0000-000000000001" }),
)?,
create_final_assistant_message_sse_response("collab done")?,
create_function_call_sse_response("mcp-1", mcp_tool_name, json!({}))?,
create_final_assistant_message_sse_response("mcp done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
create_config_toml(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp, &workspace).await?;
start_turn(&mut mcp, &thread_id, &workspace, "first turn").await?;
let mut pending = HashSet::from(["user", "agent", "reasoning", "web_search"]);
for _ in 0..12 {
let item = timeout(DEFAULT_READ_TIMEOUT, next_item_completed(&mut mcp)).await??;
if let Some(kind) = item_kind(&item) {
if pending.remove(kind) {
assert_elapsed_ms(&item);
}
}
if pending.is_empty() {
break;
}
}
assert_eq!(pending.is_empty(), true);
wait_for_turn_completed(&mut mcp).await?;
start_turn(&mut mcp, &thread_id, &workspace, "shell turn").await?;
let command_exec_item = timeout(
DEFAULT_READ_TIMEOUT,
wait_for_completed_item(&mut mcp, |item| {
matches!(item, ThreadItem::CommandExecution { .. })
}),
)
.await??;
assert_elapsed_ms(&command_exec_item);
wait_for_turn_completed(&mut mcp).await?;
start_turn(&mut mcp, &thread_id, &workspace, "patch turn").await?;
let file_change_item = timeout(
DEFAULT_READ_TIMEOUT,
wait_for_completed_item(&mut mcp, |item| {
matches!(item, ThreadItem::FileChange { .. })
}),
)
.await??;
assert_elapsed_ms(&file_change_item);
wait_for_turn_completed(&mut mcp).await?;
start_turn(&mut mcp, &thread_id, &workspace, "image turn").await?;
let image_item = timeout(
DEFAULT_READ_TIMEOUT,
wait_for_completed_item(&mut mcp, |item| {
matches!(item, ThreadItem::ImageView { .. })
}),
)
.await??;
assert_elapsed_ms(&image_item);
wait_for_turn_completed(&mut mcp).await?;
start_turn(&mut mcp, &thread_id, &workspace, "collab turn").await?;
let collab_item = timeout(
DEFAULT_READ_TIMEOUT,
wait_for_completed_item(&mut mcp, |item| {
matches!(item, ThreadItem::CollabAgentToolCall { .. })
}),
)
.await??;
assert_elapsed_ms(&collab_item);
wait_for_turn_completed(&mut mcp).await?;
start_turn(&mut mcp, &thread_id, &workspace, "mcp turn").await?;
let mcp_item = timeout(
DEFAULT_READ_TIMEOUT,
wait_for_completed_item(&mut mcp, |item| {
matches!(item, ThreadItem::McpToolCall { .. })
}),
)
.await??;
assert_elapsed_ms(&mcp_item);
wait_for_turn_completed(&mut mcp).await?;
Ok(())
}
async fn start_thread(mcp: &mut McpProcess, cwd: &Path) -> Result<String> {
let start_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
cwd: Some(cwd.to_string_lossy().into_owned()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
Ok(thread.id)
}
async fn start_turn(
mcp: &mut McpProcess,
thread_id: &str,
cwd: &Path,
text: &str,
) -> Result<String> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.to_string(),
input: vec![V2UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
cwd: Some(cwd.to_path_buf()),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
Ok(turn.id)
}
async fn next_item_completed(mcp: &mut McpProcess) -> Result<ThreadItem> {
let notification: JSONRPCNotification = mcp
.read_stream_until_notification_message("item/completed")
.await?;
let completed: ItemCompletedNotification =
serde_json::from_value(notification.params.expect("item/completed params"))?;
Ok(completed.item)
}
async fn wait_for_completed_item<F>(mcp: &mut McpProcess, predicate: F) -> Result<ThreadItem>
where
F: Fn(&ThreadItem) -> bool,
{
loop {
let item = next_item_completed(mcp).await?;
if predicate(&item) {
return Ok(item);
}
}
}
async fn wait_for_turn_completed(mcp: &mut McpProcess) -> Result<()> {
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
fn item_kind(item: &ThreadItem) -> Option<&'static str> {
match item {
ThreadItem::UserMessage { .. } => Some("user"),
ThreadItem::AgentMessage { .. } => Some("agent"),
ThreadItem::Reasoning { .. } => Some("reasoning"),
ThreadItem::WebSearch { .. } => Some("web_search"),
_ => None,
}
}
fn assert_elapsed_ms(item: &ThreadItem) {
let elapsed_ms = match item {
ThreadItem::UserMessage { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::AgentMessage { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::Reasoning { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::CommandExecution { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::FileChange { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::McpToolCall { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::CollabAgentToolCall { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::WebSearch { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::ImageView { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::EnteredReviewMode { elapsed_ms, .. } => *elapsed_ms,
ThreadItem::ExitedReviewMode { elapsed_ms, .. } => *elapsed_ms,
};
assert_eq!(elapsed_ms.is_some(), true);
if let Some(value) = elapsed_ms {
assert_eq!(value >= 0, true);
}
}
fn create_function_call_sse_response(
call_id: &str,
name: &str,
args: serde_json::Value,
) -> Result<String> {
let arguments = serde_json::to_string(&args)?;
Ok(responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_function_call(call_id, name, &arguments),
responses::ev_completed("resp-1"),
]))
}
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "workspace-write"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
[features]
collab = true
"#
),
)
}

View File

@@ -930,6 +930,7 @@ async fn turn_start_file_change_approval_v2() -> Result<()> {
ref id,
status,
ref changes,
..
} = started_file_change
else {
unreachable!("loop ensures we break on file change items");
@@ -1283,6 +1284,7 @@ async fn turn_start_file_change_approval_decline_v2() -> Result<()> {
ref id,
status,
ref changes,
..
} = started_file_change
else {
unreachable!("loop ensures we break on file change items");