feat(app-server): experimental flag to persist extended history (#11227)

This PR adds an experimental `persist_extended_history` bool flag to
app-server thread APIs so rollout logs can retain a richer set of
EventMsgs for non-lossy Thread > Turn > ThreadItems reconstruction (e.g.
on `thread/resume`).

### Motivation
Today, our rollout recorder only persists a small subset (e.g. user
message, reasoning, assistant message) of `EventMsg` types, dropping a
good number (like command exec, file change, etc.) that are important
for reconstructing full item history for `thread/resume`, `thread/read`,
and `thread/fork`.

Some clients want to be able to resume a thread without lossiness. This
lossiness is primarily a UI thing, since what the model sees are
`ResponseItem` and not `EventMsg`.

### Approach
This change introduces an opt-in `persist_extended_history` flag to
preserve those events when you start/resume/fork a thread (defaults to
`false`).

This is done by adding an `EventPersistenceMode` to the rollout
recorder:
- `Limited` (existing behavior, default)
- `Extended` (new opt-in behavior)

In `Extended` mode, persist additional `EventMsg` variants needed for
non-lossy app-server `ThreadItem` reconstruction. We now store the
following ThreadItems that we didn't before:
- web search
- command execution
- patch/file changes
- MCP tool calls
- image view calls
- collab tool outcomes
- context compaction
- review mode enter/exit

For **command executions** in particular, we truncate the output using
the existing `truncate_text` from core to store an upper bound of 10,000
bytes, which is also the default value for truncating tool outputs shown
to the model. This keeps the size of the rollout file and command
execution items returned over the wire reasonable.

We also persist `EventMsg::Error`, which we can now map back to the
Turn's status and use to populate the Turn's error metadata.

#### Updates to EventMsgs
To truly make `thread/resume` non-lossy, we also needed to persist the
`status` on `EventMsg::ExecCommandEnd` and `EventMsg::PatchApplyEnd`.
Previously it was not obvious whether a command failed or was declined
(and similarly for apply_patch). These EventMsgs were never persisted
before, so `status` is now a required field on both.
This commit is contained in:
Owen Lin
2026-02-12 11:34:22 -08:00
committed by GitHub
parent 22fa283511
commit efc8d45750
43 changed files with 1724 additions and 138 deletions

View File

@@ -2,12 +2,20 @@ use crate::protocol::EventMsg;
use crate::protocol::RolloutItem;
use codex_protocol::models::ResponseItem;
/// Selects how much `EventMsg` history is written to rollout files.
///
/// Each event is assigned the minimum mode that includes it, so `Extended`
/// persists everything `Limited` does plus the events that require `Extended`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum EventPersistenceMode {
/// Default mode: persist only events whose minimum mode is `Limited`.
#[default]
Limited,
/// Opt-in mode: also persist events whose minimum mode is `Extended`.
Extended,
}
/// Whether a rollout `item` should be persisted in rollout files for the
/// provided persistence `mode`.
#[inline]
pub(crate) fn is_persisted_response_item(item: &RolloutItem) -> bool {
pub(crate) fn is_persisted_response_item(item: &RolloutItem, mode: EventPersistenceMode) -> bool {
match item {
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev),
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode),
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
true
@@ -51,9 +59,33 @@ pub(crate) fn should_persist_response_item_for_memories(item: &ResponseItem) ->
}
}
/// Whether an `EventMsg` should be persisted in rollout files.
/// Whether an `EventMsg` should be persisted in rollout files for the
/// provided persistence `mode`.
#[inline]
pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
pub(crate) fn should_persist_event_msg(ev: &EventMsg, mode: EventPersistenceMode) -> bool {
match mode {
EventPersistenceMode::Limited => should_persist_event_msg_limited(ev),
EventPersistenceMode::Extended => should_persist_event_msg_extended(ev),
}
}
/// Reports whether `ev` is written to the rollout in `Limited` mode.
///
/// Only events whose minimum persistence mode is `Limited` qualify; events
/// that require `Extended`, or that are never persisted, are rejected.
fn should_persist_event_msg_limited(ev: &EventMsg) -> bool {
    let minimum_mode = event_msg_persistence_mode(ev);
    minimum_mode == Some(EventPersistenceMode::Limited)
}
/// Reports whether `ev` is written to the rollout in `Extended` mode.
///
/// Any event that has a minimum persistence mode qualifies; only events
/// that are never persisted (`None`) are rejected.
fn should_persist_event_msg_extended(ev: &EventMsg) -> bool {
    match event_msg_persistence_mode(ev) {
        Some(EventPersistenceMode::Limited | EventPersistenceMode::Extended) => true,
        None => false,
    }
}
/// Returns the minimum persistence mode that includes this event.
/// `None` means the event should never be persisted.
fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
match ev {
EventMsg::UserMessage(_)
| EventMsg::AgentMessage(_)
@@ -67,15 +99,29 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::UndoCompleted(_)
| EventMsg::TurnAborted(_)
| EventMsg::TurnStarted(_)
| EventMsg::TurnComplete(_) => true,
| EventMsg::TurnComplete(_) => Some(EventPersistenceMode::Limited),
EventMsg::ItemCompleted(event) => {
// Plan items are derived from streaming tags and are not part of the
// raw ResponseItem history, so we persist their completion to replay
// them on resume without bloating rollouts with every item lifecycle.
matches!(event.item, codex_protocol::items::TurnItem::Plan(_))
if matches!(event.item, codex_protocol::items::TurnItem::Plan(_)) {
Some(EventPersistenceMode::Limited)
} else {
None
}
}
EventMsg::Error(_)
| EventMsg::Warning(_)
| EventMsg::WebSearchEnd(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::ViewImageToolCall(_)
| EventMsg::CollabAgentSpawnEnd(_)
| EventMsg::CollabAgentInteractionEnd(_)
| EventMsg::CollabWaitingEnd(_)
| EventMsg::CollabCloseEnd(_)
| EventMsg::CollabResumeEnd(_) => Some(EventPersistenceMode::Extended),
EventMsg::Warning(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
@@ -84,13 +130,10 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::SessionConfigured(_)
| EventMsg::ThreadNameUpdated(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::WebSearchBegin(_)
| EventMsg::WebSearchEnd(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::TerminalInteraction(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::ExecApprovalRequest(_)
| EventMsg::RequestUserInput(_)
| EventMsg::DynamicToolCallRequest(_)
@@ -99,7 +142,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::BackgroundEvent(_)
| EventMsg::StreamError(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::UndoStarted(_)
@@ -112,7 +154,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::RemoteSkillDownloaded(_)
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete
| EventMsg::ViewImageToolCall(_)
| EventMsg::DeprecationNotice(_)
| EventMsg::ItemStarted(_)
| EventMsg::AgentMessageContentDelta(_)
@@ -121,14 +162,9 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::SkillsUpdateAvailable
| EventMsg::CollabAgentSpawnBegin(_)
| EventMsg::CollabAgentSpawnEnd(_)
| EventMsg::CollabAgentInteractionBegin(_)
| EventMsg::CollabAgentInteractionEnd(_)
| EventMsg::CollabWaitingBegin(_)
| EventMsg::CollabWaitingEnd(_)
| EventMsg::CollabCloseBegin(_)
| EventMsg::CollabCloseEnd(_)
| EventMsg::CollabResumeBegin(_)
| EventMsg::CollabResumeEnd(_) => false,
| EventMsg::CollabResumeBegin(_) => None,
}
}

View File

@@ -36,6 +36,7 @@ use super::list::get_threads_in_root;
use super::list::parse_cursor;
use super::list::parse_timestamp_uuid_from_filename;
use super::metadata;
use super::policy::EventPersistenceMode;
use super::policy::is_persisted_response_item;
use crate::config::Config;
use crate::default_client::originator;
@@ -43,6 +44,9 @@ use crate::git_info::collect_git_info;
use crate::path_utils;
use crate::state_db;
use crate::state_db::StateDbHandle;
use crate::truncate::TruncationPolicy;
use crate::truncate::truncate_text;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::ResumedHistory;
use codex_protocol::protocol::RolloutItem;
@@ -67,6 +71,7 @@ pub struct RolloutRecorder {
tx: Sender<RolloutCmd>,
pub(crate) rollout_path: PathBuf,
state_db: Option<StateDbHandle>,
event_persistence_mode: EventPersistenceMode,
}
#[derive(Clone)]
@@ -77,9 +82,11 @@ pub enum RolloutRecorderParams {
source: SessionSource,
base_instructions: BaseInstructions,
dynamic_tools: Vec<DynamicToolSpec>,
event_persistence_mode: EventPersistenceMode,
},
Resume {
path: PathBuf,
event_persistence_mode: EventPersistenceMode,
},
}
@@ -104,6 +111,7 @@ impl RolloutRecorderParams {
source: SessionSource,
base_instructions: BaseInstructions,
dynamic_tools: Vec<DynamicToolSpec>,
event_persistence_mode: EventPersistenceMode,
) -> Self {
Self::Create {
conversation_id,
@@ -111,11 +119,42 @@ impl RolloutRecorderParams {
source,
base_instructions,
dynamic_tools,
event_persistence_mode,
}
}
pub fn resume(path: PathBuf) -> Self {
Self::Resume { path }
pub fn resume(path: PathBuf, event_persistence_mode: EventPersistenceMode) -> Self {
Self::Resume {
path,
event_persistence_mode,
}
}
}
/// Byte budget passed to `truncate_text` for the `aggregated_output` of a
/// persisted `ExecCommandEnd` event.
const PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES: usize = 10_000;

/// Slims down a rollout item before it is written to the rollout file.
///
/// Outside `Extended` mode every item is returned unchanged. In `Extended`
/// mode, an `ExecCommandEnd` event has its `aggregated_output` truncated to
/// the byte budget above and its `stdout`, `stderr` and `formatted_output`
/// fields cleared; every other item passes through unchanged.
fn sanitize_rollout_item_for_persistence(
    item: RolloutItem,
    mode: EventPersistenceMode,
) -> RolloutItem {
    match (mode, item) {
        (
            EventPersistenceMode::Extended,
            RolloutItem::EventMsg(EventMsg::ExecCommandEnd(mut exec_end)),
        ) => {
            // Keep only a bounded summary of the command output.
            let byte_budget = TruncationPolicy::Bytes(PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES);
            exec_end.aggregated_output = truncate_text(&exec_end.aggregated_output, byte_budget);

            // The aggregated output is all rollout storage needs, so the
            // remaining output fields are emptied.
            exec_end.stdout.clear();
            exec_end.stderr.clear();
            exec_end.formatted_output.clear();

            RolloutItem::EventMsg(EventMsg::ExecCommandEnd(exec_end))
        }
        (_, untouched) => untouched,
    }
}
@@ -322,58 +361,70 @@ impl RolloutRecorder {
state_db_ctx: Option<StateDbHandle>,
state_builder: Option<ThreadMetadataBuilder>,
) -> std::io::Result<Self> {
let (file, deferred_log_file_info, rollout_path, meta) = match params {
RolloutRecorderParams::Create {
conversation_id,
forked_from_id,
source,
base_instructions,
dynamic_tools,
} => {
let log_file_info = precompute_log_file_info(config, conversation_id)?;
let path = log_file_info.path.clone();
let session_id = log_file_info.conversation_id;
let started_at = log_file_info.timestamp;
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let timestamp = started_at
.to_offset(time::UtcOffset::UTC)
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let session_meta = SessionMeta {
id: session_id,
let (file, deferred_log_file_info, rollout_path, meta, event_persistence_mode) =
match params {
RolloutRecorderParams::Create {
conversation_id,
forked_from_id,
timestamp,
cwd: config.cwd.clone(),
originator: originator().value,
cli_version: env!("CARGO_PKG_VERSION").to_string(),
source,
model_provider: Some(config.model_provider_id.clone()),
base_instructions: Some(base_instructions),
dynamic_tools: if dynamic_tools.is_empty() {
None
} else {
Some(dynamic_tools)
},
};
base_instructions,
dynamic_tools,
event_persistence_mode,
} => {
let log_file_info = precompute_log_file_info(config, conversation_id)?;
let path = log_file_info.path.clone();
let session_id = log_file_info.conversation_id;
let started_at = log_file_info.timestamp;
(None, Some(log_file_info), path, Some(session_meta))
}
RolloutRecorderParams::Resume { path } => (
Some(
tokio::fs::OpenOptions::new()
.append(true)
.open(&path)
.await?,
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let timestamp = started_at
.to_offset(time::UtcOffset::UTC)
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let session_meta = SessionMeta {
id: session_id,
forked_from_id,
timestamp,
cwd: config.cwd.clone(),
originator: originator().value,
cli_version: env!("CARGO_PKG_VERSION").to_string(),
source,
model_provider: Some(config.model_provider_id.clone()),
base_instructions: Some(base_instructions),
dynamic_tools: if dynamic_tools.is_empty() {
None
} else {
Some(dynamic_tools)
},
};
(
None,
Some(log_file_info),
path,
Some(session_meta),
event_persistence_mode,
)
}
RolloutRecorderParams::Resume {
path,
event_persistence_mode,
} => (
Some(
tokio::fs::OpenOptions::new()
.append(true)
.open(&path)
.await?,
),
None,
path,
None,
event_persistence_mode,
),
None,
path,
None,
),
};
};
// Clone the cwd for the spawned task to collect git info asynchronously
let cwd = config.cwd.clone();
@@ -402,6 +453,7 @@ impl RolloutRecorder {
tx,
rollout_path,
state_db: state_db_ctx,
event_persistence_mode,
})
}
@@ -419,8 +471,11 @@ impl RolloutRecorder {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
if is_persisted_response_item(item) {
filtered.push(item.clone());
if is_persisted_response_item(item, self.event_persistence_mode) {
filtered.push(sanitize_rollout_item_for_persistence(
item.clone(),
self.event_persistence_mode,
));
}
}
if filtered.is_empty() {
@@ -673,9 +728,7 @@ async fn rollout_writer(
RolloutCmd::AddItems(items) => {
let mut persisted_items = Vec::new();
for item in items {
if is_persisted_response_item(&item) {
persisted_items.push(item);
}
persisted_items.push(item);
}
if persisted_items.is_empty() {
continue;
@@ -1003,6 +1056,7 @@ mod tests {
SessionSource::Exec,
BaseInstructions::default(),
Vec::new(),
EventPersistenceMode::Limited,
),
None,
None,