Inline response recording

This commit is contained in:
Ahmed Ibrahim
2025-11-25 17:12:05 -08:00
parent 981e2f742d
commit 387700a887
4 changed files with 292 additions and 291 deletions

View File

@@ -15,7 +15,6 @@ use crate::features::Feature;
use crate::function_tool::FunctionCallError; use crate::function_tool::FunctionCallError;
use crate::parse_command::parse_command; use crate::parse_command::parse_command;
use crate::parse_turn_item; use crate::parse_turn_item;
use crate::response_processing::process_items;
use crate::terminal; use crate::terminal;
use crate::truncate::TruncationPolicy; use crate::truncate::TruncationPolicy;
use crate::user_notification::UserNotifier; use crate::user_notification::UserNotifier;
@@ -38,7 +37,7 @@ use codex_protocol::protocol::TurnContextItem;
use codex_rmcp_client::ElicitationResponse; use codex_rmcp_client::ElicitationResponse;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::prelude::*; use futures::prelude::*;
use futures::stream::FuturesOrdered; use futures::stream::FuturesUnordered;
use mcp_types::CallToolResult; use mcp_types::CallToolResult;
use mcp_types::ListResourceTemplatesRequestParams; use mcp_types::ListResourceTemplatesRequestParams;
use mcp_types::ListResourceTemplatesResult; use mcp_types::ListResourceTemplatesResult;
@@ -1967,15 +1966,16 @@ pub(crate) async fn run_task(
.await .await
{ {
Ok(turn_output) => { Ok(turn_output) => {
let processed_items = turn_output; let TurnRunResult {
responses,
last_agent_message: turn_last_agent_message,
} = turn_output;
let limit = turn_context let limit = turn_context
.client .client
.get_auto_compact_token_limit() .get_auto_compact_token_limit()
.unwrap_or(i64::MAX); .unwrap_or(i64::MAX);
let total_usage_tokens = sess.get_total_token_usage().await; let total_usage_tokens = sess.get_total_token_usage().await;
let token_limit_reached = total_usage_tokens >= limit; let token_limit_reached = total_usage_tokens >= limit;
let (responses, items_to_record_in_conversation_history) =
process_items(processed_items, &sess, &turn_context).await;
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached { if token_limit_reached {
@@ -1989,9 +1989,7 @@ pub(crate) async fn run_task(
} }
if responses.is_empty() { if responses.is_empty() {
last_agent_message = get_last_assistant_message_from_turn( last_agent_message = turn_last_agent_message;
&items_to_record_in_conversation_history,
);
sess.notifier() sess.notifier()
.notify(&UserNotification::AgentTurnComplete { .notify(&UserNotification::AgentTurnComplete {
thread_id: sess.conversation_id.to_string(), thread_id: sess.conversation_id.to_string(),
@@ -2004,10 +2002,7 @@ pub(crate) async fn run_task(
} }
continue; continue;
} }
Err(CodexErr::TurnAborted { Err(CodexErr::TurnAborted) => {
dangling_artifacts: processed_items,
}) => {
let _ = process_items(processed_items, &sess, &turn_context).await;
// Aborted turn is reported via a different event. // Aborted turn is reported via a different event.
break; break;
} }
@@ -2030,7 +2025,7 @@ async fn run_turn(
turn_diff_tracker: SharedTurnDiffTracker, turn_diff_tracker: SharedTurnDiffTracker,
input: Vec<ResponseItem>, input: Vec<ResponseItem>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> CodexResult<Vec<ProcessedResponseItem>> { ) -> CodexResult<TurnRunResult> {
let mcp_tools = sess let mcp_tools = sess
.services .services
.mcp_connection_manager .mcp_connection_manager
@@ -2095,12 +2090,8 @@ async fn run_turn(
.await .await
{ {
Ok(output) => return Ok(output), Ok(output) => return Ok(output),
Err(CodexErr::TurnAborted { Err(CodexErr::TurnAborted) => {
dangling_artifacts: processed_items, return Err(CodexErr::TurnAborted);
}) => {
return Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
});
} }
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted), Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)), Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
@@ -2151,14 +2142,10 @@ async fn run_turn(
} }
} }
/// When the model is prompted, it returns a stream of events. Some of these
/// events map to a `ResponseItem`. A `ResponseItem` may need to be
/// "handled" such that it produces a `ResponseInputItem` that needs to be
/// sent back to the model on the next turn.
#[derive(Debug)] #[derive(Debug)]
pub struct ProcessedResponseItem { struct TurnRunResult {
pub item: ResponseItem, responses: Vec<ResponseInputItem>,
pub response: Option<ResponseInputItem>, last_agent_message: Option<String>,
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@@ -2169,7 +2156,7 @@ async fn try_run_turn(
turn_diff_tracker: SharedTurnDiffTracker, turn_diff_tracker: SharedTurnDiffTracker,
prompt: &Prompt, prompt: &Prompt,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> CodexResult<Vec<ProcessedResponseItem>> { ) -> CodexResult<TurnRunResult> {
let rollout_item = RolloutItem::TurnContext(TurnContextItem { let rollout_item = RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context.cwd.clone(), cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy, approval_policy: turn_context.approval_policy,
@@ -2193,214 +2180,304 @@ async fn try_run_turn(
Arc::clone(&turn_context), Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker), Arc::clone(&turn_diff_tracker),
); );
let mut output: FuturesOrdered<BoxFuture<CodexResult<ProcessedResponseItem>>> = let mut in_flight: FuturesUnordered<BoxFuture<CodexResult<ResponseInputItem>>> =
FuturesOrdered::new(); FuturesUnordered::new();
let mut responses: Vec<ResponseInputItem> = Vec::new();
let mut last_agent_message: Option<String> = None;
let mut active_item: Option<TurnItem> = None; let mut active_item: Option<TurnItem> = None;
loop { loop {
// Poll the next item from the model stream. We must inspect *both* Ok and Err tokio::select! {
// cases so that transient stream failures (e.g., dropped SSE connection before Some(res) = in_flight.next(), if !in_flight.is_empty() => {
// `response.completed`) bubble up and trigger the caller's retry logic. let response_input = res?;
let event = match stream.next().or_cancel(&cancellation_token).await { responses.push(response_input);
Ok(event) => event,
Err(codex_async_utils::CancelErr::Cancelled) => {
let processed_items = output.try_collect().await?;
return Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
});
} }
}; event = stream.next().or_cancel(&cancellation_token) => {
let event = match event {
let event = match event { Ok(event) => event,
Some(res) => res?, Err(codex_async_utils::CancelErr::Cancelled) => {
None => { while let Some(res) = in_flight.next().await {
return Err(CodexErr::Stream( let _ = res?;
"stream closed before response.completed".into(), }
None, return Err(CodexErr::TurnAborted);
));
}
};
let add_completed = &mut |response_item: ProcessedResponseItem| {
output.push_back(future::ready(Ok(response_item)).boxed());
};
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let previously_active_item = active_item.take();
match ToolRouter::build_tool_call(sess.as_ref(), item.clone()).await {
Ok(Some(call)) => {
let payload_preview = call.payload.log_payload().into_owned();
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
let response =
tool_runtime.handle_tool_call(call, cancellation_token.child_token());
output.push_back(
async move {
Ok(ProcessedResponseItem {
item,
response: Some(response.await?),
})
}
.boxed(),
);
} }
Ok(None) => { };
if let Some(turn_item) = handle_non_tool_response_item(&item).await {
if previously_active_item.is_none() {
sess.emit_turn_item_started(&turn_context, &turn_item).await;
}
sess.emit_turn_item_completed(&turn_context, turn_item) let event = match event {
Some(res) => res?,
None => {
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
}
};
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let previously_active_item = active_item.take();
match ToolRouter::build_tool_call(sess.as_ref(), item.clone()).await {
Ok(Some(call)) => {
let payload_preview = call.payload.log_payload().into_owned();
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
sess.record_conversation_items(
&turn_context,
std::slice::from_ref(&item),
)
.await; .await;
let sess_for_output = Arc::clone(&sess);
let turn_for_output = Arc::clone(&turn_context);
let response =
tool_runtime.handle_tool_call(call, cancellation_token.child_token());
in_flight.push(async move {
let response_input = response.await?;
if let Some(response_item) =
response_input_to_response_item(&response_input)
{
sess_for_output
.record_conversation_items(
turn_for_output.as_ref(),
std::slice::from_ref(&response_item),
)
.await;
}
Ok(response_input)
}
.boxed());
}
Ok(None) => {
if let Some(turn_item) = handle_non_tool_response_item(&item).await {
if previously_active_item.is_none() {
sess.emit_turn_item_started(&turn_context, &turn_item).await;
}
sess.emit_turn_item_completed(&turn_context, turn_item)
.await;
}
sess.record_conversation_items(
&turn_context,
std::slice::from_ref(&item),
)
.await;
if let Some(agent_message) = last_assistant_message_from_item(&item) {
last_agent_message = Some(agent_message);
}
}
Err(FunctionCallError::MissingLocalShellCallId) => {
let msg = "LocalShellCall without call_id or id";
turn_context
.client
.get_otel_event_manager()
.log_tool_failed("local_shell", msg);
error!(msg);
let response = ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
content: msg.to_string(),
..Default::default()
},
};
sess.record_conversation_items(
&turn_context,
std::slice::from_ref(&item),
)
.await;
if let Some(response_item) =
response_input_to_response_item(&response)
{
sess.record_conversation_items(
&turn_context,
std::slice::from_ref(&response_item),
)
.await;
}
responses.push(response);
}
Err(FunctionCallError::RespondToModel(message))
| Err(FunctionCallError::Denied(message)) => {
let response = ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
content: message,
..Default::default()
},
};
sess.record_conversation_items(
&turn_context,
std::slice::from_ref(&item),
)
.await;
if let Some(response_item) =
response_input_to_response_item(&response)
{
sess.record_conversation_items(
&turn_context,
std::slice::from_ref(&response_item),
)
.await;
}
responses.push(response);
}
Err(FunctionCallError::Fatal(message)) => {
return Err(CodexErr::Fatal(message));
}
}
}
ResponseEvent::OutputItemAdded(item) => {
if let Some(turn_item) = handle_non_tool_response_item(&item).await {
let tracked_item = turn_item.clone();
sess.emit_turn_item_started(&turn_context, &turn_item).await;
active_item = Some(tracked_item);
}
}
ResponseEvent::RateLimits(snapshot) => {
// Update internal state with latest rate limits, but defer sending until
// token usage is available to avoid duplicate TokenCount events.
sess.update_rate_limits(&turn_context, snapshot).await;
}
ResponseEvent::Completed {
response_id: _,
token_usage,
} => {
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
.await;
while let Some(res) = in_flight.next().await {
responses.push(res?);
}
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;
tracker.get_unified_diff()
};
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
sess.send_event(&turn_context, msg).await;
} }
add_completed(ProcessedResponseItem { return Ok(TurnRunResult {
item, responses,
response: None, last_agent_message,
}); });
} }
Err(FunctionCallError::MissingLocalShellCallId) => { ResponseEvent::OutputTextDelta(delta) => {
let msg = "LocalShellCall without call_id or id"; // In review child threads, suppress assistant text deltas; the
turn_context // UI will show a selection popup from the final ReviewOutput.
.client if let Some(active) = active_item.as_ref() {
.get_otel_event_manager() let event = AgentMessageContentDeltaEvent {
.log_tool_failed("local_shell", msg); thread_id: sess.conversation_id.to_string(),
error!(msg); turn_id: turn_context.sub_id.clone(),
item_id: active.id(),
let response = ResponseInputItem::FunctionCallOutput { delta: delta.clone(),
call_id: String::new(), };
output: FunctionCallOutputPayload { sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event))
content: msg.to_string(), .await;
..Default::default() } else {
}, error_or_panic("OutputTextDelta without active item".to_string());
}; }
add_completed(ProcessedResponseItem {
item,
response: Some(response),
});
} }
Err(FunctionCallError::RespondToModel(message)) ResponseEvent::ReasoningSummaryDelta {
| Err(FunctionCallError::Denied(message)) => {
let response = ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
content: message,
..Default::default()
},
};
add_completed(ProcessedResponseItem {
item,
response: Some(response),
});
}
Err(FunctionCallError::Fatal(message)) => {
return Err(CodexErr::Fatal(message));
}
}
}
ResponseEvent::OutputItemAdded(item) => {
if let Some(turn_item) = handle_non_tool_response_item(&item).await {
let tracked_item = turn_item.clone();
sess.emit_turn_item_started(&turn_context, &turn_item).await;
active_item = Some(tracked_item);
}
}
ResponseEvent::RateLimits(snapshot) => {
// Update internal state with latest rate limits, but defer sending until
// token usage is available to avoid duplicate TokenCount events.
sess.update_rate_limits(&turn_context, snapshot).await;
}
ResponseEvent::Completed {
response_id: _,
token_usage,
} => {
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
.await;
let processed_items = output.try_collect().await?;
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;
tracker.get_unified_diff()
};
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
sess.send_event(&turn_context, msg).await;
}
return Ok(processed_items);
}
ResponseEvent::OutputTextDelta(delta) => {
// In review child threads, suppress assistant text deltas; the
// UI will show a selection popup from the final ReviewOutput.
if let Some(active) = active_item.as_ref() {
let event = AgentMessageContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id: active.id(),
delta: delta.clone(),
};
sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event))
.await;
} else {
error_or_panic("OutputTextDelta without active item".to_string());
}
}
ResponseEvent::ReasoningSummaryDelta {
delta,
summary_index,
} => {
if let Some(active) = active_item.as_ref() {
let event = ReasoningContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id: active.id(),
delta, delta,
summary_index, summary_index,
}; } => {
sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event)) if let Some(active) = active_item.as_ref() {
.await; let event = ReasoningContentDeltaEvent {
} else { thread_id: sess.conversation_id.to_string(),
error_or_panic("ReasoningSummaryDelta without active item".to_string()); turn_id: turn_context.sub_id.clone(),
} item_id: active.id(),
} delta,
ResponseEvent::ReasoningSummaryPartAdded { summary_index } => { summary_index,
if let Some(active) = active_item.as_ref() { };
let event = sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event))
EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent { .await;
item_id: active.id(), } else {
summary_index, error_or_panic("ReasoningSummaryDelta without active item".to_string());
}); }
sess.send_event(&turn_context, event).await; }
} else { ResponseEvent::ReasoningSummaryPartAdded { summary_index } => {
error_or_panic("ReasoningSummaryPartAdded without active item".to_string()); if let Some(active) = active_item.as_ref() {
} let event =
} EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {
ResponseEvent::ReasoningContentDelta { item_id: active.id(),
delta, summary_index,
content_index, });
} => { sess.send_event(&turn_context, event).await;
if let Some(active) = active_item.as_ref() { } else {
let event = ReasoningRawContentDeltaEvent { error_or_panic("ReasoningSummaryPartAdded without active item".to_string());
thread_id: sess.conversation_id.to_string(), }
turn_id: turn_context.sub_id.clone(), }
item_id: active.id(), ResponseEvent::ReasoningContentDelta {
delta, delta,
content_index, content_index,
}; } => {
sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event)) if let Some(active) = active_item.as_ref() {
.await; let event = ReasoningRawContentDeltaEvent {
} else { thread_id: sess.conversation_id.to_string(),
error_or_panic("ReasoningRawContentDelta without active item".to_string()); turn_id: turn_context.sub_id.clone(),
item_id: active.id(),
delta,
content_index,
};
sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event))
.await;
} else {
error_or_panic("ReasoningRawContentDelta without active item".to_string());
}
}
} }
} }
} }
} }
} }
fn last_assistant_message_from_item(item: &ResponseItem) -> Option<String> {
if let ResponseItem::Message { role, content, .. } = item
&& role == "assistant" {
return content.iter().rev().find_map(|ci| match ci {
ContentItem::OutputText { text } => Some(text.clone()),
_ => None,
});
}
None
}
fn response_input_to_response_item(input: &ResponseInputItem) -> Option<ResponseItem> {
match input {
ResponseInputItem::FunctionCallOutput { call_id, output } => {
Some(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
})
}
ResponseInputItem::CustomToolCallOutput { call_id, output } => {
Some(ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
})
}
ResponseInputItem::McpToolCallOutput { call_id, result } => {
let output = match result {
Ok(call_tool_result) => FunctionCallOutputPayload::from(call_tool_result),
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
..Default::default()
},
};
Some(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
})
}
_ => None,
}
}
async fn handle_non_tool_response_item(item: &ResponseItem) -> Option<TurnItem> { async fn handle_non_tool_response_item(item: &ResponseItem) -> Option<TurnItem> {
debug!(?item, "Output item"); debug!(?item, "Output item");

View File

@@ -1,4 +1,3 @@
use crate::codex::ProcessedResponseItem;
use crate::exec::ExecToolCallOutput; use crate::exec::ExecToolCallOutput;
use crate::token_data::KnownPlan; use crate::token_data::KnownPlan;
use crate::token_data::PlanType; use crate::token_data::PlanType;
@@ -61,9 +60,7 @@ pub enum SandboxErr {
pub enum CodexErr { pub enum CodexErr {
// todo(aibrahim): git rid of this error carrying the dangling artifacts // todo(aibrahim): git rid of this error carrying the dangling artifacts
#[error("turn aborted. Something went wrong? Hit `/feedback` to report the issue.")] #[error("turn aborted. Something went wrong? Hit `/feedback` to report the issue.")]
TurnAborted { TurnAborted,
dangling_artifacts: Vec<ProcessedResponseItem>,
},
/// Returned by ResponsesClient when the SSE stream disconnects or errors out **after** the HTTP /// Returned by ResponsesClient when the SSE stream disconnects or errors out **after** the HTTP
/// handshake has succeeded but **before** it finished emitting `response.completed`. /// handshake has succeeded but **before** it finished emitting `response.completed`.
@@ -173,9 +170,7 @@ pub enum CodexErr {
impl From<CancelErr> for CodexErr { impl From<CancelErr> for CodexErr {
fn from(_: CancelErr) -> Self { fn from(_: CancelErr) -> Self {
CodexErr::TurnAborted { CodexErr::TurnAborted
dangling_artifacts: Vec::new(),
}
} }
} }

View File

@@ -40,7 +40,6 @@ mod message_history;
mod model_provider_info; mod model_provider_info;
pub mod parse_command; pub mod parse_command;
pub mod powershell; pub mod powershell;
mod response_processing;
pub mod sandboxing; pub mod sandboxing;
mod text_encoding; mod text_encoding;
pub mod token_data; pub mod token_data;

View File

@@ -1,70 +0,0 @@
use crate::codex::Session;
use crate::codex::TurnContext;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use tracing::warn;
/// Process streamed `ResponseItem`s from the model into the pair of:
/// - items we should record in conversation history; and
/// - `ResponseInputItem`s to send back to the model on the next turn.
pub(crate) async fn process_items(
processed_items: Vec<crate::codex::ProcessedResponseItem>,
sess: &Session,
turn_context: &TurnContext,
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
let mut outputs_to_record = Vec::<ResponseItem>::new();
let mut new_inputs_to_record = Vec::<ResponseItem>::new();
let mut responses = Vec::<ResponseInputItem>::new();
for processed_response_item in processed_items {
let crate::codex::ProcessedResponseItem { item, response } = processed_response_item;
if let Some(response) = &response {
responses.push(response.clone());
}
match response {
Some(ResponseInputItem::FunctionCallOutput { call_id, output }) => {
new_inputs_to_record.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }) => {
new_inputs_to_record.push(ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
Some(ResponseInputItem::McpToolCallOutput { call_id, result }) => {
let output = match result {
Ok(call_tool_result) => FunctionCallOutputPayload::from(&call_tool_result),
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
..Default::default()
},
};
new_inputs_to_record.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
});
}
None => {}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
outputs_to_record.push(item);
}
let all_items_to_record = [outputs_to_record, new_inputs_to_record].concat();
// Only attempt to take the lock if there is something to record.
if !all_items_to_record.is_empty() {
sess.record_conversation_items(turn_context, &all_items_to_record)
.await;
}
(responses, all_items_to_record)
}