mirror of
https://github.com/openai/codex.git
synced 2026-04-23 07:51:51 +03:00
Compare commits
2 Commits
dev/flaky-
...
add-elapse
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aeb9cc1a5b | ||
|
|
a977fb9f19 |
@@ -70,7 +70,11 @@ impl ThreadHistoryBuilder {
|
||||
let mut turn = self.new_turn();
|
||||
let id = self.next_item_id();
|
||||
let content = self.build_user_inputs(payload);
|
||||
turn.items.push(ThreadItem::UserMessage { id, content });
|
||||
turn.items.push(ThreadItem::UserMessage {
|
||||
id,
|
||||
content,
|
||||
elapsed_ms: None,
|
||||
});
|
||||
self.current_turn = Some(turn);
|
||||
}
|
||||
|
||||
@@ -80,9 +84,11 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
let id = self.next_item_id();
|
||||
self.ensure_turn()
|
||||
.items
|
||||
.push(ThreadItem::AgentMessage { id, text });
|
||||
self.ensure_turn().items.push(ThreadItem::AgentMessage {
|
||||
id,
|
||||
text,
|
||||
elapsed_ms: None,
|
||||
});
|
||||
}
|
||||
|
||||
fn handle_agent_reasoning(&mut self, payload: &AgentReasoningEvent) {
|
||||
@@ -102,6 +108,7 @@ impl ThreadHistoryBuilder {
|
||||
id,
|
||||
summary: vec![payload.text.clone()],
|
||||
content: Vec::new(),
|
||||
elapsed_ms: None,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -122,6 +129,7 @@ impl ThreadHistoryBuilder {
|
||||
id,
|
||||
summary: Vec::new(),
|
||||
content: vec![payload.text.clone()],
|
||||
elapsed_ms: None,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -296,6 +304,7 @@ mod tests {
|
||||
url: "https://example.com/one.png".into(),
|
||||
}
|
||||
],
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -303,6 +312,7 @@ mod tests {
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-2".into(),
|
||||
text: "Hi there".into(),
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -311,6 +321,7 @@ mod tests {
|
||||
id: "item-3".into(),
|
||||
summary: vec!["thinking".into()],
|
||||
content: vec!["full reasoning".into()],
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
|
||||
@@ -325,6 +336,7 @@ mod tests {
|
||||
text: "Second turn".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -332,6 +344,7 @@ mod tests {
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-5".into(),
|
||||
text: "Reply two".into(),
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -370,6 +383,7 @@ mod tests {
|
||||
id: "item-2".into(),
|
||||
summary: vec!["first summary".into()],
|
||||
content: vec!["first content".into()],
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -378,6 +392,7 @@ mod tests {
|
||||
id: "item-4".into(),
|
||||
summary: vec!["second summary".into()],
|
||||
content: Vec::new(),
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -422,6 +437,7 @@ mod tests {
|
||||
text: "Please do the thing".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -429,6 +445,7 @@ mod tests {
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-2".into(),
|
||||
text: "Working...".into(),
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
|
||||
@@ -443,6 +460,7 @@ mod tests {
|
||||
text: "Let's try again".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -450,6 +468,7 @@ mod tests {
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-4".into(),
|
||||
text: "Second attempt complete.".into(),
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -500,10 +519,12 @@ mod tests {
|
||||
text: "First".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
elapsed_ms: None,
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-2".into(),
|
||||
text: "A1".into(),
|
||||
elapsed_ms: None,
|
||||
},
|
||||
],
|
||||
},
|
||||
@@ -518,10 +539,12 @@ mod tests {
|
||||
text: "Third".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
elapsed_ms: None,
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-4".into(),
|
||||
text: "A3".into(),
|
||||
elapsed_ms: None,
|
||||
},
|
||||
],
|
||||
},
|
||||
|
||||
@@ -1789,10 +1789,20 @@ impl From<CoreUserInput> for UserInput {
|
||||
pub enum ThreadItem {
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
UserMessage { id: String, content: Vec<UserInput> },
|
||||
UserMessage {
|
||||
id: String,
|
||||
content: Vec<UserInput>,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
AgentMessage { id: String, text: String },
|
||||
AgentMessage {
|
||||
id: String,
|
||||
text: String,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
Reasoning {
|
||||
@@ -1801,6 +1811,8 @@ pub enum ThreadItem {
|
||||
summary: Vec<String>,
|
||||
#[serde(default)]
|
||||
content: Vec<String>,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
@@ -1824,6 +1836,8 @@ pub enum ThreadItem {
|
||||
/// The duration of the command execution in milliseconds.
|
||||
#[ts(type = "number | null")]
|
||||
duration_ms: Option<i64>,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
@@ -1831,6 +1845,8 @@ pub enum ThreadItem {
|
||||
id: String,
|
||||
changes: Vec<FileUpdateChange>,
|
||||
status: PatchApplyStatus,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
@@ -1845,6 +1861,8 @@ pub enum ThreadItem {
|
||||
/// The duration of the MCP tool call in milliseconds.
|
||||
#[ts(type = "number | null")]
|
||||
duration_ms: Option<i64>,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
@@ -1864,19 +1882,41 @@ pub enum ThreadItem {
|
||||
prompt: Option<String>,
|
||||
/// Last known status of the target agents, when available.
|
||||
agents_states: HashMap<String, CollabAgentState>,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
WebSearch { id: String, query: String },
|
||||
WebSearch {
|
||||
id: String,
|
||||
query: String,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
ImageView { id: String, path: String },
|
||||
ImageView {
|
||||
id: String,
|
||||
path: String,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
EnteredReviewMode { id: String, review: String },
|
||||
EnteredReviewMode {
|
||||
id: String,
|
||||
review: String,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
ExitedReviewMode { id: String, review: String },
|
||||
ExitedReviewMode {
|
||||
id: String,
|
||||
review: String,
|
||||
#[ts(type = "number | null")]
|
||||
elapsed_ms: Option<i64>,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<CoreTurnItem> for ThreadItem {
|
||||
@@ -1885,6 +1925,7 @@ impl From<CoreTurnItem> for ThreadItem {
|
||||
CoreTurnItem::UserMessage(user) => ThreadItem::UserMessage {
|
||||
id: user.id,
|
||||
content: user.content.into_iter().map(UserInput::from).collect(),
|
||||
elapsed_ms: None,
|
||||
},
|
||||
CoreTurnItem::AgentMessage(agent) => {
|
||||
let text = agent
|
||||
@@ -1894,16 +1935,22 @@ impl From<CoreTurnItem> for ThreadItem {
|
||||
CoreAgentMessageContent::Text { text } => text,
|
||||
})
|
||||
.collect::<String>();
|
||||
ThreadItem::AgentMessage { id: agent.id, text }
|
||||
ThreadItem::AgentMessage {
|
||||
id: agent.id,
|
||||
text,
|
||||
elapsed_ms: None,
|
||||
}
|
||||
}
|
||||
CoreTurnItem::Reasoning(reasoning) => ThreadItem::Reasoning {
|
||||
id: reasoning.id,
|
||||
summary: reasoning.summary_text,
|
||||
content: reasoning.raw_content,
|
||||
elapsed_ms: None,
|
||||
},
|
||||
CoreTurnItem::WebSearch(search) => ThreadItem::WebSearch {
|
||||
id: search.id,
|
||||
query: search.query,
|
||||
elapsed_ms: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -2548,6 +2595,7 @@ mod tests {
|
||||
path: PathBuf::from("/repo/.codex/skills/skill-creator/SKILL.md"),
|
||||
},
|
||||
],
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
|
||||
@@ -2568,6 +2616,7 @@ mod tests {
|
||||
ThreadItem::AgentMessage {
|
||||
id: "agent-1".to_string(),
|
||||
text: "Hello world".to_string(),
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
|
||||
@@ -2583,6 +2632,7 @@ mod tests {
|
||||
id: "reasoning-1".to_string(),
|
||||
summary: vec!["line one".to_string(), "line two".to_string()],
|
||||
content: vec![],
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
|
||||
@@ -2596,6 +2646,7 @@ mod tests {
|
||||
ThreadItem::WebSearch {
|
||||
id: "search-1".to_string(),
|
||||
query: "docs".to_string(),
|
||||
elapsed_ms: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -403,6 +403,8 @@ Today both notifications carry an empty `items` array even when item events were
|
||||
- `exitedReviewMode` — `{id, review}` emitted when the reviewer finishes; `review` is the full plain-text review (usually, overall notes plus bullet point findings).
|
||||
- `compacted` - `{threadId, turnId}` when codex compacts the conversation history. This can happen automatically.
|
||||
|
||||
Each item also includes `elapsedMs?`, the elapsed milliseconds since the turn started. This is populated on `item/completed` notifications (and omitted/`null` on `item/started` and history-loaded items).
|
||||
|
||||
All items emit two shared lifecycle events:
|
||||
|
||||
- `item/started` — emits the full `item` when a new unit of work begins so the UI can render it immediately; the `item.id` in this payload matches the `itemId` used by deltas.
|
||||
|
||||
@@ -93,6 +93,7 @@ use std::collections::HashMap;
|
||||
use std::convert::TryFrom;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::error;
|
||||
|
||||
@@ -115,6 +116,22 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
msg,
|
||||
} = event;
|
||||
match msg {
|
||||
EventMsg::TurnStarted(_ev) => {
|
||||
if let ApiVersion::V2 = api_version {
|
||||
let mut map = turn_summary_store.lock().await;
|
||||
let summary = map.entry(conversation_id).or_default();
|
||||
summary.file_change_started.clear();
|
||||
summary.last_error = None;
|
||||
let is_same_turn =
|
||||
summary.active_turn_id.as_deref() == Some(event_turn_id.as_str());
|
||||
if !is_same_turn {
|
||||
summary.active_turn_id = Some(event_turn_id);
|
||||
summary.turn_started_at = Some(Instant::now());
|
||||
} else if summary.turn_started_at.is_none() {
|
||||
summary.turn_started_at = Some(Instant::now());
|
||||
}
|
||||
}
|
||||
}
|
||||
EventMsg::TurnComplete(_ev) => {
|
||||
handle_turn_complete(
|
||||
conversation_id,
|
||||
@@ -162,6 +179,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
id: item_id.clone(),
|
||||
changes: patch_changes.clone(),
|
||||
status: PatchApplyStatus::InProgress,
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let notification = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -251,6 +269,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
params,
|
||||
))
|
||||
.await;
|
||||
let turn_summary_store = turn_summary_store.clone();
|
||||
tokio::spawn(async move {
|
||||
on_command_execution_request_approval_response(
|
||||
event_turn_id,
|
||||
@@ -262,6 +281,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
rx,
|
||||
conversation,
|
||||
outgoing,
|
||||
turn_summary_store,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
@@ -331,10 +351,13 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.await;
|
||||
}
|
||||
EventMsg::McpToolCallEnd(end_event) => {
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
let notification = construct_mcp_tool_call_end_notification(
|
||||
end_event,
|
||||
conversation_id.to_string(),
|
||||
event_turn_id.clone(),
|
||||
elapsed_ms,
|
||||
)
|
||||
.await;
|
||||
outgoing
|
||||
@@ -350,6 +373,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
receiver_thread_ids: Vec::new(),
|
||||
prompt: Some(begin_event.prompt),
|
||||
agents_states: HashMap::new(),
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let notification = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -361,6 +385,8 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.await;
|
||||
}
|
||||
EventMsg::CollabAgentSpawnEnd(end_event) => {
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
let has_receiver = end_event.new_thread_id.is_some();
|
||||
let status = match &end_event.status {
|
||||
codex_protocol::protocol::AgentStatus::Errored(_)
|
||||
@@ -387,6 +413,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
receiver_thread_ids,
|
||||
prompt: Some(end_event.prompt),
|
||||
agents_states,
|
||||
elapsed_ms,
|
||||
};
|
||||
let notification = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -407,6 +434,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
receiver_thread_ids,
|
||||
prompt: Some(begin_event.prompt),
|
||||
agents_states: HashMap::new(),
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let notification = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -418,6 +446,8 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.await;
|
||||
}
|
||||
EventMsg::CollabAgentInteractionEnd(end_event) => {
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
let status = match &end_event.status {
|
||||
codex_protocol::protocol::AgentStatus::Errored(_)
|
||||
| codex_protocol::protocol::AgentStatus::NotFound => V2CollabToolCallStatus::Failed,
|
||||
@@ -433,6 +463,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
receiver_thread_ids: vec![receiver_id.clone()],
|
||||
prompt: Some(end_event.prompt),
|
||||
agents_states: [(receiver_id, received_status)].into_iter().collect(),
|
||||
elapsed_ms,
|
||||
};
|
||||
let notification = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -457,6 +488,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
receiver_thread_ids,
|
||||
prompt: None,
|
||||
agents_states: HashMap::new(),
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let notification = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -468,6 +500,8 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.await;
|
||||
}
|
||||
EventMsg::CollabWaitingEnd(end_event) => {
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
let status = if end_event.statuses.values().any(|status| {
|
||||
matches!(
|
||||
status,
|
||||
@@ -493,6 +527,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
receiver_thread_ids,
|
||||
prompt: None,
|
||||
agents_states,
|
||||
elapsed_ms,
|
||||
};
|
||||
let notification = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -512,6 +547,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
receiver_thread_ids: vec![begin_event.receiver_thread_id.to_string()],
|
||||
prompt: None,
|
||||
agents_states: HashMap::new(),
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let notification = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -523,6 +559,8 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.await;
|
||||
}
|
||||
EventMsg::CollabCloseEnd(end_event) => {
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
let status = match &end_event.status {
|
||||
codex_protocol::protocol::AgentStatus::Errored(_)
|
||||
| codex_protocol::protocol::AgentStatus::NotFound => V2CollabToolCallStatus::Failed,
|
||||
@@ -543,6 +581,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
receiver_thread_ids: vec![receiver_id],
|
||||
prompt: None,
|
||||
agents_states,
|
||||
elapsed_ms,
|
||||
};
|
||||
let notification = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -682,6 +721,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
let item = ThreadItem::ImageView {
|
||||
id: view_image_event.call_id.clone(),
|
||||
path: view_image_event.path.to_string_lossy().into_owned(),
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let started = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -691,6 +731,10 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ItemStarted(started))
|
||||
.await;
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
let mut item = item;
|
||||
set_elapsed_ms(&mut item, elapsed_ms);
|
||||
let completed = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
turn_id: event_turn_id.clone(),
|
||||
@@ -707,6 +751,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
let item = ThreadItem::EnteredReviewMode {
|
||||
id: event_turn_id.clone(),
|
||||
review,
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let started = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -716,6 +761,10 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ItemStarted(started))
|
||||
.await;
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
let mut item = item;
|
||||
set_elapsed_ms(&mut item, elapsed_ms);
|
||||
let completed = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
turn_id: event_turn_id.clone(),
|
||||
@@ -737,7 +786,10 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.await;
|
||||
}
|
||||
EventMsg::ItemCompleted(item_completed_event) => {
|
||||
let item: ThreadItem = item_completed_event.item.clone().into();
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
let mut item: ThreadItem = item_completed_event.item.clone().into();
|
||||
set_elapsed_ms(&mut item, elapsed_ms);
|
||||
let notification = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
turn_id: event_turn_id.clone(),
|
||||
@@ -755,6 +807,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
let item = ThreadItem::ExitedReviewMode {
|
||||
id: event_turn_id.clone(),
|
||||
review,
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let started = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -764,6 +817,10 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ItemStarted(started))
|
||||
.await;
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
let mut item = item;
|
||||
set_elapsed_ms(&mut item, elapsed_ms);
|
||||
let completed = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
turn_id: event_turn_id.clone(),
|
||||
@@ -798,6 +855,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
id: item_id.clone(),
|
||||
changes: convert_patch_changes(&patch_begin_event.changes),
|
||||
status: PatchApplyStatus::InProgress,
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let notification = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -852,6 +910,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
elapsed_ms: None,
|
||||
};
|
||||
let notification = ItemStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -946,6 +1005,8 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
};
|
||||
|
||||
let duration_ms = i64::try_from(duration.as_millis()).unwrap_or(i64::MAX);
|
||||
let elapsed_ms =
|
||||
turn_elapsed_ms(conversation_id, &event_turn_id, &turn_summary_store).await;
|
||||
|
||||
let item = ThreadItem::CommandExecution {
|
||||
id: call_id,
|
||||
@@ -957,6 +1018,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
aggregated_output,
|
||||
exit_code: Some(exit_code),
|
||||
duration_ms: Some(duration_ms),
|
||||
elapsed_ms,
|
||||
};
|
||||
|
||||
let notification = ItemCompletedNotification {
|
||||
@@ -1140,6 +1202,65 @@ async fn emit_turn_completed_with_status(
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn turn_elapsed_ms(
|
||||
conversation_id: ThreadId,
|
||||
turn_id: &str,
|
||||
turn_summary_store: &TurnSummaryStore,
|
||||
) -> Option<i64> {
|
||||
let mut map = turn_summary_store.lock().await;
|
||||
let summary = map.entry(conversation_id).or_default();
|
||||
if summary
|
||||
.active_turn_id
|
||||
.as_deref()
|
||||
.is_some_and(|active| active != turn_id)
|
||||
{
|
||||
return None;
|
||||
}
|
||||
if summary.active_turn_id.is_none() {
|
||||
summary.active_turn_id = Some(turn_id.to_string());
|
||||
}
|
||||
let started_at = summary.turn_started_at.get_or_insert_with(Instant::now);
|
||||
i64::try_from(started_at.elapsed().as_millis()).ok()
|
||||
}
|
||||
|
||||
fn set_elapsed_ms(item: &mut ThreadItem, elapsed_ms: Option<i64>) {
|
||||
match item {
|
||||
ThreadItem::UserMessage {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::AgentMessage {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::Reasoning {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::CommandExecution {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::FileChange {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::McpToolCall {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::CollabAgentToolCall {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::WebSearch {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::ImageView {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::EnteredReviewMode {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
ThreadItem::ExitedReviewMode {
|
||||
elapsed_ms: field, ..
|
||||
} => *field = elapsed_ms,
|
||||
}
|
||||
}
|
||||
|
||||
async fn complete_file_change_item(
|
||||
conversation_id: ThreadId,
|
||||
item_id: String,
|
||||
@@ -1156,10 +1277,12 @@ async fn complete_file_change_item(
|
||||
}
|
||||
}
|
||||
|
||||
let elapsed_ms = turn_elapsed_ms(conversation_id, &turn_id, turn_summary_store).await;
|
||||
let item = ThreadItem::FileChange {
|
||||
id: item_id,
|
||||
changes,
|
||||
status,
|
||||
elapsed_ms,
|
||||
};
|
||||
let notification = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -1182,7 +1305,9 @@ async fn complete_command_execution_item(
|
||||
command_actions: Vec<V2ParsedCommand>,
|
||||
status: CommandExecutionStatus,
|
||||
outgoing: &OutgoingMessageSender,
|
||||
turn_summary_store: &TurnSummaryStore,
|
||||
) {
|
||||
let elapsed_ms = turn_elapsed_ms(conversation_id, &turn_id, turn_summary_store).await;
|
||||
let item = ThreadItem::CommandExecution {
|
||||
id: item_id,
|
||||
command,
|
||||
@@ -1193,6 +1318,7 @@ async fn complete_command_execution_item(
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
elapsed_ms,
|
||||
};
|
||||
let notification = ItemCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
@@ -1612,6 +1738,7 @@ async fn on_command_execution_request_approval_response(
|
||||
receiver: oneshot::Receiver<JsonValue>,
|
||||
conversation: Arc<CodexThread>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
turn_summary_store: TurnSummaryStore,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let (decision, completion_status) = match response {
|
||||
@@ -1667,6 +1794,7 @@ async fn on_command_execution_request_approval_response(
|
||||
command_actions.clone(),
|
||||
status,
|
||||
outgoing.as_ref(),
|
||||
&turn_summary_store,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -1697,6 +1825,7 @@ async fn construct_mcp_tool_call_notification(
|
||||
result: None,
|
||||
error: None,
|
||||
duration_ms: None,
|
||||
elapsed_ms: None,
|
||||
};
|
||||
ItemStartedNotification {
|
||||
thread_id,
|
||||
@@ -1710,6 +1839,7 @@ async fn construct_mcp_tool_call_end_notification(
|
||||
end_event: McpToolCallEndEvent,
|
||||
thread_id: String,
|
||||
turn_id: String,
|
||||
elapsed_ms: Option<i64>,
|
||||
) -> ItemCompletedNotification {
|
||||
let status = if end_event.is_success() {
|
||||
McpToolCallStatus::Completed
|
||||
@@ -1743,6 +1873,7 @@ async fn construct_mcp_tool_call_end_notification(
|
||||
result,
|
||||
error,
|
||||
duration_ms,
|
||||
elapsed_ms,
|
||||
};
|
||||
ItemCompletedNotification {
|
||||
thread_id,
|
||||
@@ -2134,6 +2265,7 @@ mod tests {
|
||||
result: None,
|
||||
error: None,
|
||||
duration_ms: None,
|
||||
elapsed_ms: None,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -2292,6 +2424,7 @@ mod tests {
|
||||
result: None,
|
||||
error: None,
|
||||
duration_ms: None,
|
||||
elapsed_ms: None,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -2324,10 +2457,12 @@ mod tests {
|
||||
|
||||
let thread_id = ThreadId::new().to_string();
|
||||
let turn_id = "turn_3".to_string();
|
||||
let elapsed_ms = Some(123);
|
||||
let notification = construct_mcp_tool_call_end_notification(
|
||||
end_event.clone(),
|
||||
thread_id.clone(),
|
||||
turn_id.clone(),
|
||||
elapsed_ms,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -2346,6 +2481,7 @@ mod tests {
|
||||
}),
|
||||
error: None,
|
||||
duration_ms: Some(0),
|
||||
elapsed_ms,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -2367,10 +2503,12 @@ mod tests {
|
||||
|
||||
let thread_id = ThreadId::new().to_string();
|
||||
let turn_id = "turn_4".to_string();
|
||||
let elapsed_ms = Some(456);
|
||||
let notification = construct_mcp_tool_call_end_notification(
|
||||
end_event.clone(),
|
||||
thread_id.clone(),
|
||||
turn_id.clone(),
|
||||
elapsed_ms,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -2388,6 +2526,7 @@ mod tests {
|
||||
message: "boom".to_string(),
|
||||
}),
|
||||
duration_ms: Some(1),
|
||||
elapsed_ms,
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -187,6 +187,7 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::select;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::broadcast;
|
||||
@@ -207,6 +208,8 @@ pub(crate) type PendingRollbacks = Arc<Mutex<HashMap<ThreadId, RequestId>>>;
|
||||
pub(crate) struct TurnSummary {
|
||||
pub(crate) file_change_started: HashSet<String>,
|
||||
pub(crate) last_error: Option<TurnError>,
|
||||
pub(crate) active_turn_id: Option<String>,
|
||||
pub(crate) turn_started_at: Option<Instant>,
|
||||
}
|
||||
|
||||
pub(crate) type TurnSummaryStore = Arc<Mutex<HashMap<ThreadId, TurnSummary>>>;
|
||||
@@ -3581,6 +3584,7 @@ impl CodexMessageProcessor {
|
||||
// Review prompt display text is synthesized; no UI element ranges to preserve.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
elapsed_ms: None,
|
||||
}]
|
||||
};
|
||||
|
||||
|
||||
@@ -17,4 +17,5 @@ mod thread_resume;
|
||||
mod thread_rollback;
|
||||
mod thread_start;
|
||||
mod turn_interrupt;
|
||||
mod turn_items_elapsed;
|
||||
mod turn_start;
|
||||
|
||||
@@ -16,6 +16,7 @@ use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
@@ -87,7 +88,7 @@ async fn review_start_runs_review_turn_and_emits_code_review_item() -> Result<()
|
||||
let started: ItemStartedNotification =
|
||||
serde_json::from_value(item_started.params.expect("params must be present"))?;
|
||||
match started.item {
|
||||
ThreadItem::EnteredReviewMode { id, review } => {
|
||||
ThreadItem::EnteredReviewMode { id, review, .. } => {
|
||||
assert_eq!(id, turn_id);
|
||||
assert_eq!(review, "commit 1234567: Tidy UI colors");
|
||||
saw_entered_review_mode = true;
|
||||
@@ -101,10 +102,10 @@ async fn review_start_runs_review_turn_and_emits_code_review_item() -> Result<()
|
||||
"did not observe enteredReviewMode item"
|
||||
);
|
||||
|
||||
// Confirm we see the ExitedReviewMode marker (with review text)
|
||||
// on the same turn. Ignore any other items the stream surfaces.
|
||||
// Confirm we see the EnteredReviewMode + ExitedReviewMode markers on the same turn.
|
||||
let mut saw_entered_completed = false;
|
||||
let mut review_body: Option<String> = None;
|
||||
for _ in 0..10 {
|
||||
for _ in 0..12 {
|
||||
let review_notif: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("item/completed"),
|
||||
@@ -113,15 +114,28 @@ async fn review_start_runs_review_turn_and_emits_code_review_item() -> Result<()
|
||||
let completed: ItemCompletedNotification =
|
||||
serde_json::from_value(review_notif.params.expect("params must be present"))?;
|
||||
match completed.item {
|
||||
ThreadItem::ExitedReviewMode { id, review } => {
|
||||
ThreadItem::EnteredReviewMode { id, elapsed_ms, .. } => {
|
||||
assert_eq!(id, turn_id);
|
||||
assert_eq!(elapsed_ms.is_some(), true);
|
||||
saw_entered_completed = true;
|
||||
}
|
||||
ThreadItem::ExitedReviewMode {
|
||||
id,
|
||||
review,
|
||||
elapsed_ms,
|
||||
} => {
|
||||
assert_eq!(id, turn_id);
|
||||
assert_eq!(elapsed_ms.is_some(), true);
|
||||
review_body = Some(review);
|
||||
break;
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
if saw_entered_completed && review_body.is_some() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(saw_entered_completed, true);
|
||||
let review = review_body.expect("did not observe a code review item");
|
||||
assert!(review.contains("Prefer Stylize helpers"));
|
||||
assert!(review.contains("/tmp/file.rs:10-20"));
|
||||
|
||||
303
codex-rs/app-server/tests/suite/v2/turn_items_elapsed.rs
Normal file
303
codex-rs/app-server/tests/suite/v2/turn_items_elapsed.rs
Normal file
@@ -0,0 +1,303 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_apply_patch_sse_response;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use core_test_support::responses;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_items_emit_elapsed_ms_on_completion() -> Result<()> {
|
||||
let tmp = TempDir::new()?;
|
||||
let codex_home = tmp.path().join("codex_home");
|
||||
std::fs::create_dir(&codex_home)?;
|
||||
let workspace = tmp.path().join("workspace");
|
||||
std::fs::create_dir(&workspace)?;
|
||||
|
||||
let image_path = workspace.join("image.png");
|
||||
std::fs::write(&image_path, b"not an image")?;
|
||||
let image_path_str = image_path.to_string_lossy().into_owned();
|
||||
|
||||
let mcp_tool_name = "list_mcp_resources";
|
||||
|
||||
let patch = r#"*** Begin Patch
|
||||
*** Add File: README.md
|
||||
+new line
|
||||
*** End Patch
|
||||
"#;
|
||||
|
||||
let responses = vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_reasoning_item("reason-1", &["summary"], &["details"]),
|
||||
responses::ev_web_search_call_done("search-1", "completed", "query"),
|
||||
responses::ev_assistant_message("msg-1", "hello"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
create_shell_command_sse_response(
|
||||
vec!["echo".to_string(), "hi".to_string()],
|
||||
None,
|
||||
None,
|
||||
"shell-1",
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("shell done")?,
|
||||
create_apply_patch_sse_response(patch, "patch-1")?,
|
||||
create_final_assistant_message_sse_response("patch done")?,
|
||||
create_function_call_sse_response(
|
||||
"view-1",
|
||||
"view_image",
|
||||
json!({ "path": image_path_str }),
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("view done")?,
|
||||
create_function_call_sse_response(
|
||||
"collab-1",
|
||||
"close_agent",
|
||||
json!({ "id": "00000000-0000-0000-0000-000000000001" }),
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("collab done")?,
|
||||
create_function_call_sse_response("mcp-1", mcp_tool_name, json!({}))?,
|
||||
create_final_assistant_message_sse_response("mcp done")?,
|
||||
];
|
||||
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(&codex_home).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_id = start_thread(&mut mcp, &workspace).await?;
|
||||
|
||||
start_turn(&mut mcp, &thread_id, &workspace, "first turn").await?;
|
||||
let mut pending = HashSet::from(["user", "agent", "reasoning", "web_search"]);
|
||||
for _ in 0..12 {
|
||||
let item = timeout(DEFAULT_READ_TIMEOUT, next_item_completed(&mut mcp)).await??;
|
||||
if let Some(kind) = item_kind(&item) {
|
||||
if pending.remove(kind) {
|
||||
assert_elapsed_ms(&item);
|
||||
}
|
||||
}
|
||||
if pending.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert_eq!(pending.is_empty(), true);
|
||||
wait_for_turn_completed(&mut mcp).await?;
|
||||
|
||||
start_turn(&mut mcp, &thread_id, &workspace, "shell turn").await?;
|
||||
let command_exec_item = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
wait_for_completed_item(&mut mcp, |item| {
|
||||
matches!(item, ThreadItem::CommandExecution { .. })
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
assert_elapsed_ms(&command_exec_item);
|
||||
wait_for_turn_completed(&mut mcp).await?;
|
||||
|
||||
start_turn(&mut mcp, &thread_id, &workspace, "patch turn").await?;
|
||||
let file_change_item = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
wait_for_completed_item(&mut mcp, |item| {
|
||||
matches!(item, ThreadItem::FileChange { .. })
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
assert_elapsed_ms(&file_change_item);
|
||||
wait_for_turn_completed(&mut mcp).await?;
|
||||
|
||||
start_turn(&mut mcp, &thread_id, &workspace, "image turn").await?;
|
||||
let image_item = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
wait_for_completed_item(&mut mcp, |item| {
|
||||
matches!(item, ThreadItem::ImageView { .. })
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
assert_elapsed_ms(&image_item);
|
||||
wait_for_turn_completed(&mut mcp).await?;
|
||||
|
||||
start_turn(&mut mcp, &thread_id, &workspace, "collab turn").await?;
|
||||
let collab_item = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
wait_for_completed_item(&mut mcp, |item| {
|
||||
matches!(item, ThreadItem::CollabAgentToolCall { .. })
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
assert_elapsed_ms(&collab_item);
|
||||
wait_for_turn_completed(&mut mcp).await?;
|
||||
|
||||
start_turn(&mut mcp, &thread_id, &workspace, "mcp turn").await?;
|
||||
let mcp_item = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
wait_for_completed_item(&mut mcp, |item| {
|
||||
matches!(item, ThreadItem::McpToolCall { .. })
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
assert_elapsed_ms(&mcp_item);
|
||||
wait_for_turn_completed(&mut mcp).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_thread(mcp: &mut McpProcess, cwd: &Path) -> Result<String> {
|
||||
let start_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
cwd: Some(cwd.to_string_lossy().into_owned()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(start_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
Ok(thread.id)
|
||||
}
|
||||
|
||||
async fn start_turn(
|
||||
mcp: &mut McpProcess,
|
||||
thread_id: &str,
|
||||
cwd: &Path,
|
||||
text: &str,
|
||||
) -> Result<String> {
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: text.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
cwd: Some(cwd.to_path_buf()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
Ok(turn.id)
|
||||
}
|
||||
|
||||
async fn next_item_completed(mcp: &mut McpProcess) -> Result<ThreadItem> {
|
||||
let notification: JSONRPCNotification = mcp
|
||||
.read_stream_until_notification_message("item/completed")
|
||||
.await?;
|
||||
let completed: ItemCompletedNotification =
|
||||
serde_json::from_value(notification.params.expect("item/completed params"))?;
|
||||
Ok(completed.item)
|
||||
}
|
||||
|
||||
async fn wait_for_completed_item<F>(mcp: &mut McpProcess, predicate: F) -> Result<ThreadItem>
|
||||
where
|
||||
F: Fn(&ThreadItem) -> bool,
|
||||
{
|
||||
loop {
|
||||
let item = next_item_completed(mcp).await?;
|
||||
if predicate(&item) {
|
||||
return Ok(item);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_turn_completed(mcp: &mut McpProcess) -> Result<()> {
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn item_kind(item: &ThreadItem) -> Option<&'static str> {
|
||||
match item {
|
||||
ThreadItem::UserMessage { .. } => Some("user"),
|
||||
ThreadItem::AgentMessage { .. } => Some("agent"),
|
||||
ThreadItem::Reasoning { .. } => Some("reasoning"),
|
||||
ThreadItem::WebSearch { .. } => Some("web_search"),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_elapsed_ms(item: &ThreadItem) {
|
||||
let elapsed_ms = match item {
|
||||
ThreadItem::UserMessage { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::AgentMessage { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::Reasoning { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::CommandExecution { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::FileChange { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::McpToolCall { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::CollabAgentToolCall { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::WebSearch { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::ImageView { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::EnteredReviewMode { elapsed_ms, .. } => *elapsed_ms,
|
||||
ThreadItem::ExitedReviewMode { elapsed_ms, .. } => *elapsed_ms,
|
||||
};
|
||||
assert_eq!(elapsed_ms.is_some(), true);
|
||||
if let Some(value) = elapsed_ms {
|
||||
assert_eq!(value >= 0, true);
|
||||
}
|
||||
}
|
||||
|
||||
fn create_function_call_sse_response(
|
||||
call_id: &str,
|
||||
name: &str,
|
||||
args: serde_json::Value,
|
||||
) -> Result<String> {
|
||||
let arguments = serde_json::to_string(&args)?;
|
||||
Ok(responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(call_id, name, &arguments),
|
||||
responses::ev_completed("resp-1"),
|
||||
]))
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "workspace-write"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
|
||||
[features]
|
||||
collab = true
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
@@ -930,6 +930,7 @@ async fn turn_start_file_change_approval_v2() -> Result<()> {
|
||||
ref id,
|
||||
status,
|
||||
ref changes,
|
||||
..
|
||||
} = started_file_change
|
||||
else {
|
||||
unreachable!("loop ensures we break on file change items");
|
||||
@@ -1283,6 +1284,7 @@ async fn turn_start_file_change_approval_decline_v2() -> Result<()> {
|
||||
ref id,
|
||||
status,
|
||||
ref changes,
|
||||
..
|
||||
} = started_file_change
|
||||
else {
|
||||
unreachable!("loop ensures we break on file change items");
|
||||
|
||||
Reference in New Issue
Block a user