This commit is contained in:
Ahmed Ibrahim
2025-09-10 12:10:51 -07:00
parent 6564b498ad
commit 5cccaa5a0e

View File

@@ -543,7 +543,7 @@ impl Session {
InitialHistory::New => {
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(turn_context);
self.record_conversation_items(&items).await;
self.record_response_items(&items).await;
}
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
let rollout_items = conversation_history.get_rollout_items();
@@ -552,7 +552,13 @@ impl Session {
// Always add response items to conversation history
let response_items = conversation_history.get_response_items();
if !response_items.is_empty() {
self.record_into_history(&response_items);
self.record_into_history_response_items(&response_items);
}
// Always add event msgs to conversation history
let event_msgs = conversation_history.get_event_msgs();
if let Some(event_msgs) = event_msgs {
self.record_into_history_event_msgs(&event_msgs);
}
// If persisting, persist all rollout items as-is (recorder filters)
@@ -566,13 +572,8 @@ impl Session {
/// Persist the event to rollout and send it to clients.
pub(crate) async fn send_event(&self, event: Event) {
// Persist the event into event_msgs in memory
self.state
.lock_unchecked()
.event_msgs
.record_items(std::slice::from_ref(&event.msg));
// Persist the event into rollout (recorder filters as needed)
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
self.persist_rollout_items(&rollout_items).await;
self.record_conversation_event_msgs(std::slice::from_ref(&event.msg))
.await;
if let Err(e) = self.tx_event.send(event).await {
error!("failed to send tool call event: {e}");
}
@@ -662,21 +663,34 @@ impl Session {
state.approved_commands.insert(cmd);
}
async fn record_conversation_event_msgs(&self, items: &[EventMsg]) {
self.record_into_history_event_msgs(items);
self.persist_rollout_event_msgs(items).await;
}
/// Records input items: always append to conversation history and
/// persist these response items to rollout.
async fn record_conversation_items(&self, items: &[ResponseItem]) {
self.record_into_history(items);
async fn record_response_items(&self, items: &[ResponseItem]) {
self.record_into_history_response_items(items);
self.persist_rollout_response_items(items).await;
}
/// Append ResponseItems to the in-memory conversation history only.
fn record_into_history(&self, items: &[ResponseItem]) {
fn record_into_history_response_items(&self, items: &[ResponseItem]) {
self.state
.lock_unchecked()
.response_items
.record_items(items.iter());
}
/// Append EventMsgs to the in-memory conversation history only.
fn record_into_history_event_msgs(&self, items: &[EventMsg]) {
self.state
.lock_unchecked()
.event_msgs
.record_items(items.iter());
}
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
let rollout_items: Vec<RolloutItem> = items
.iter()
@@ -686,6 +700,12 @@ impl Session {
self.persist_rollout_items(&rollout_items).await;
}
async fn persist_rollout_event_msgs(&self, items: &[EventMsg]) {
let rollout_items: Vec<RolloutItem> =
items.iter().cloned().map(RolloutItem::EventMsg).collect();
self.persist_rollout_items(&rollout_items).await;
}
fn build_initial_context(&self, turn_context: &TurnContext) -> Vec<ResponseItem> {
let mut items = Vec::<ResponseItem>::with_capacity(2);
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
@@ -717,7 +737,7 @@ impl Session {
async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) {
let response_item: ResponseItem = response_input.clone().into();
// Add to conversation history and persist response item to rollout
self.record_conversation_items(std::slice::from_ref(&response_item))
self.record_response_items(std::slice::from_ref(&response_item))
.await;
// Derive user message events and persist only UserMessage to rollout
@@ -1172,7 +1192,7 @@ async fn submission_loop(
// Install the new persistent context for subsequent tasks/turns.
turn_context = Arc::new(new_turn_context);
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
sess.record_response_items(&[ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
@@ -1460,7 +1480,7 @@ async fn run_task(
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<ResponseItem>>();
sess.record_conversation_items(&pending_input).await;
sess.record_response_items(&pending_input).await;
// Construct the input that we will send to the model. When using the
// Chat completions API (or ZDR clients), the model needs the full
@@ -1587,7 +1607,7 @@ async fn run_task(
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
sess.record_conversation_items(&items_to_record_in_conversation_history)
sess.record_response_items(&items_to_record_in_conversation_history)
.await;
}