[app-server] introduce turn/completed v2 event (#6800)

similar to logic in
`codex/codex-rs/exec/src/event_processor_with_jsonl_output.rs`.
translation of v1 -> v2 events:
`codex/event/task_complete` -> `turn/completed`
`codex/event/turn_aborted` -> `turn/completed` with `interrupted` status
`codex/event/error` -> `turn/completed` with `error` status

this PR also makes `items` field in `Turn` optional. For now, we only
populate it when we resume a thread, and leave it as None for all other
places until we properly rewrite core to keep track of items.

tested using the codex app server client. example new event:
```
< {
<   "method": "turn/completed",
<   "params": {
<     "turn": {
<       "id": "0",
<       "items": [],
<       "status": "interrupted"
<     }
<   }
< }
```
This commit is contained in:
Celia Chen
2025-11-18 17:55:24 -08:00
committed by GitHub
parent 4288091f63
commit b395dc1be6
6 changed files with 362 additions and 22 deletions

View File

@@ -1,5 +1,7 @@
use crate::codex_message_processor::ApiVersion;
use crate::codex_message_processor::PendingInterrupts;
use crate::codex_message_processor::TurnSummary;
use crate::codex_message_processor::TurnSummaryStore;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AgentMessageDeltaNotification;
@@ -26,7 +28,11 @@ use codex_app_server_protocol::SandboxCommandAssessment as V2SandboxCommandAsses
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnError;
use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnStatus;
use codex_core::CodexConversation;
use codex_core::parse_command::shlex_join;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
@@ -54,10 +60,14 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation: Arc<CodexConversation>,
outgoing: Arc<OutgoingMessageSender>,
pending_interrupts: PendingInterrupts,
turn_summary_store: TurnSummaryStore,
api_version: ApiVersion,
) {
let Event { id: event_id, msg } = event;
match msg {
EventMsg::TaskComplete(_ev) => {
handle_turn_complete(conversation_id, event_id, &outgoing, &turn_summary_store).await;
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
changes,
@@ -191,6 +201,9 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
}
EventMsg::Error(ev) => {
handle_error(conversation_id, ev.message, &turn_summary_store).await;
}
EventMsg::EnteredReviewMode(review_request) => {
let notification = ItemStartedNotification {
item: ThreadItem::CodeReview {
@@ -326,12 +339,79 @@ pub(crate) async fn apply_bespoke_event_handling(
}
}
}
handle_turn_interrupted(conversation_id, event_id, &outgoing, &turn_summary_store)
.await;
}
_ => {}
}
}
async fn emit_turn_completed_with_status(
event_id: String,
status: TurnStatus,
outgoing: &OutgoingMessageSender,
) {
let notification = TurnCompletedNotification {
turn: Turn {
id: event_id,
items: vec![],
status,
},
};
outgoing
.send_server_notification(ServerNotification::TurnCompleted(notification))
.await;
}
async fn find_and_remove_turn_summary(
conversation_id: ConversationId,
turn_summary_store: &TurnSummaryStore,
) -> TurnSummary {
let mut map = turn_summary_store.lock().await;
map.remove(&conversation_id).unwrap_or_default()
}
async fn handle_turn_complete(
conversation_id: ConversationId,
event_id: String,
outgoing: &OutgoingMessageSender,
turn_summary_store: &TurnSummaryStore,
) {
let turn_summary = find_and_remove_turn_summary(conversation_id, turn_summary_store).await;
let status = if let Some(message) = turn_summary.last_error_message {
TurnStatus::Failed {
error: TurnError { message },
}
} else {
TurnStatus::Completed
};
emit_turn_completed_with_status(event_id, status, outgoing).await;
}
async fn handle_turn_interrupted(
conversation_id: ConversationId,
event_id: String,
outgoing: &OutgoingMessageSender,
turn_summary_store: &TurnSummaryStore,
) {
find_and_remove_turn_summary(conversation_id, turn_summary_store).await;
emit_turn_completed_with_status(event_id, TurnStatus::Interrupted, outgoing).await;
}
async fn handle_error(
conversation_id: ConversationId,
message: String,
turn_summary_store: &TurnSummaryStore,
) {
let mut map = turn_summary_store.lock().await;
map.entry(conversation_id).or_default().last_error_message = Some(message);
}
async fn on_patch_approval_response(
event_id: String,
receiver: oneshot::Receiver<JsonValue>,
@@ -536,13 +616,140 @@ async fn construct_mcp_tool_call_end_notification(
#[cfg(test)]
mod tests {
use super::*;
use crate::CHANNEL_CAPACITY;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use codex_core::protocol::McpInvocation;
use mcp_types::CallToolResult;
use mcp_types::ContentBlock;
use mcp_types::TextContent;
use pretty_assertions::assert_eq;
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
fn new_turn_summary_store() -> TurnSummaryStore {
Arc::new(Mutex::new(HashMap::new()))
}
#[tokio::test]
async fn test_handle_error_records_message() -> Result<()> {
let conversation_id = ConversationId::new();
let turn_summary_store = new_turn_summary_store();
handle_error(conversation_id, "boom".to_string(), &turn_summary_store).await;
let turn_summary = find_and_remove_turn_summary(conversation_id, &turn_summary_store).await;
assert_eq!(turn_summary.last_error_message, Some("boom".to_string()));
Ok(())
}
#[tokio::test]
async fn test_handle_turn_complete_emits_completed_without_error() -> Result<()> {
let conversation_id = ConversationId::new();
let event_id = "complete1".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
let turn_summary_store = new_turn_summary_store();
handle_turn_complete(
conversation_id,
event_id.clone(),
&outgoing,
&turn_summary_store,
)
.await;
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_id);
assert_eq!(n.turn.status, TurnStatus::Completed);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_turn_interrupted_emits_interrupted_with_error() -> Result<()> {
let conversation_id = ConversationId::new();
let event_id = "interrupt1".to_string();
let turn_summary_store = new_turn_summary_store();
handle_error(conversation_id, "oops".to_string(), &turn_summary_store).await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
handle_turn_interrupted(
conversation_id,
event_id.clone(),
&outgoing,
&turn_summary_store,
)
.await;
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_id);
assert_eq!(n.turn.status, TurnStatus::Interrupted);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_turn_complete_emits_failed_with_error() -> Result<()> {
let conversation_id = ConversationId::new();
let event_id = "complete_err1".to_string();
let turn_summary_store = new_turn_summary_store();
handle_error(conversation_id, "bad".to_string(), &turn_summary_store).await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
handle_turn_complete(
conversation_id,
event_id.clone(),
&outgoing,
&turn_summary_store,
)
.await;
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_id);
assert_eq!(
n.turn.status,
TurnStatus::Failed {
error: TurnError {
message: "bad".to_string(),
}
}
);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_construct_mcp_tool_call_begin_notification_with_args() {
@@ -572,6 +779,105 @@ mod tests {
assert_eq!(notification, expected);
}
#[tokio::test]
async fn test_handle_turn_complete_emits_error_multiple_turns() -> Result<()> {
// Conversation A will have two turns; Conversation B will have one turn.
let conversation_a = ConversationId::new();
let conversation_b = ConversationId::new();
let turn_summary_store = new_turn_summary_store();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(tx));
// Turn 1 on conversation A
let a_turn1 = "a_turn1".to_string();
handle_error(conversation_a, "a1".to_string(), &turn_summary_store).await;
handle_turn_complete(
conversation_a,
a_turn1.clone(),
&outgoing,
&turn_summary_store,
)
.await;
// Turn 1 on conversation B
let b_turn1 = "b_turn1".to_string();
handle_error(conversation_b, "b1".to_string(), &turn_summary_store).await;
handle_turn_complete(
conversation_b,
b_turn1.clone(),
&outgoing,
&turn_summary_store,
)
.await;
// Turn 2 on conversation A
let a_turn2 = "a_turn2".to_string();
handle_turn_complete(
conversation_a,
a_turn2.clone(),
&outgoing,
&turn_summary_store,
)
.await;
// Verify: A turn 1
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send first notification"))?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, a_turn1);
assert_eq!(
n.turn.status,
TurnStatus::Failed {
error: TurnError {
message: "a1".to_string(),
}
}
);
}
other => bail!("unexpected message: {other:?}"),
}
// Verify: B turn 1
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send second notification"))?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, b_turn1);
assert_eq!(
n.turn.status,
TurnStatus::Failed {
error: TurnError {
message: "b1".to_string(),
}
}
);
}
other => bail!("unexpected message: {other:?}"),
}
// Verify: A turn 2
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send third notification"))?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, a_turn2);
assert_eq!(n.turn.status, TurnStatus::Completed);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_construct_mcp_tool_call_begin_notification_without_args() {
let begin_event = McpToolCallBeginEvent {