Compare commits

..

5 Commits

Author SHA1 Message Date
jimmyfraiture
ee6677d398 NIT 2 2025-09-08 11:57:06 -07:00
jimmyfraiture
927ccb3299 V4 2025-09-08 10:52:08 -07:00
jimmyfraiture
10537867ad V3 2025-09-08 10:42:56 -07:00
jimmyfraiture
fdf52e87c2 V2 2025-09-08 10:33:21 -07:00
jimmyfraiture
731a354f6c V1 2025-09-08 10:07:57 -07:00
45 changed files with 940 additions and 1387 deletions

View File

@@ -14,18 +14,33 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v5
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
run_install: false
- name: Setup Node.js
uses: actions/setup-node@v5
uses: actions/setup-node@v4
with:
node-version: 22
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 10.8.1
run_install: false
- name: Get pnpm store directory
id: pnpm-cache
shell: bash
run: |
echo "store_path=$(pnpm store path --silent)" >> $GITHUB_OUTPUT
- name: Setup pnpm cache
uses: actions/cache@v4
with:
path: ${{ steps.pnpm-cache.outputs.store_path }}
key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-pnpm-store-
- name: Install dependencies
run: pnpm install --frozen-lockfile
run: pnpm install
# Run all tasks using workspace filters

69
codex-rs/Cargo.lock generated
View File

@@ -917,8 +917,6 @@ name = "codex-protocol"
version = "0.0.0"
dependencies = [
"base64 0.22.1",
"icu_decimal",
"icu_locale_core",
"mcp-types",
"mime_guess",
"pretty_assertions",
@@ -928,7 +926,6 @@ dependencies = [
"serde_with",
"strum 0.27.2",
"strum_macros 0.27.2",
"sys-locale",
"tracing",
"ts-rs",
"uuid",
@@ -1761,17 +1758,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "fixed_decimal"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35943d22b2f19c0cb198ecf915910a8158e94541c89dcc63300d7799d46c2c5e"
dependencies = [
"displaydoc",
"smallvec",
"writeable",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -2271,45 +2257,6 @@ dependencies = [
"zerovec",
]
[[package]]
name = "icu_decimal"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fec61c43fdc4e368a9f450272833123a8ef0d7083a44597660ce94d791b8a2e2"
dependencies = [
"displaydoc",
"fixed_decimal",
"icu_decimal_data",
"icu_locale",
"icu_locale_core",
"icu_provider",
"tinystr",
"writeable",
"zerovec",
]
[[package]]
name = "icu_decimal_data"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b70963bc35f9bdf1bc66a5c1f458f4991c1dc71760e00fa06016b2c76b2738d5"
[[package]]
name = "icu_locale"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ae5921528335e91da1b6c695dbf1ec37df5ac13faa3f91e5640be93aa2fbefd"
dependencies = [
"displaydoc",
"icu_collections",
"icu_locale_core",
"icu_locale_data",
"icu_provider",
"potential_utf",
"tinystr",
"zerovec",
]
[[package]]
name = "icu_locale_core"
version = "2.0.0"
@@ -2323,12 +2270,6 @@ dependencies = [
"zerovec",
]
[[package]]
name = "icu_locale_data"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fdef0c124749d06a743c69e938350816554eb63ac979166590e2b4ee4252765"
[[package]]
name = "icu_normalizer"
version = "2.0.0"
@@ -3573,7 +3514,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585"
dependencies = [
"serde",
"zerovec",
]
@@ -4910,15 +4850,6 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "sys-locale"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eab9a99a024a169fe8a903cf9d4a3b3601109bcc13bd9e3c6fff259138626c4"
dependencies = [
"libc",
]
[[package]]
name = "system-configuration"
version = "0.6.1"

View File

@@ -10,15 +10,13 @@ use std::time::Duration;
use crate::AuthManager;
use crate::event_mapping::map_response_item_to_event_messages;
use crate::rollout::RolloutItem;
use crate::rollout::recorder::RolloutItemSliceExt;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_apply_patch::MaybeApplyPatchVerified;
use codex_apply_patch::maybe_parse_apply_patch_verified;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ConversationHistoryResponseEvent;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
@@ -33,6 +31,7 @@ use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;
use uuid::Uuid;
use crate::ModelProviderInfo;
use crate::apply_patch;
@@ -204,6 +203,9 @@ impl Codex {
error!("Failed to create session: {e:#}");
CodexErr::InternalAgentDied
})?;
session
.record_initial_history(&turn_context, conversation_history)
.await;
let conversation_id = session.conversation_id;
// This task will run until Op::Shutdown is received.
@@ -265,6 +267,12 @@ struct State {
pending_input: Vec<ResponseInputItem>,
history: ConversationHistory,
token_info: Option<TokenUsageInfo>,
last_undo_patch: Option<StoredUndoPatch>,
}
#[derive(Clone)]
struct StoredUndoPatch {
patch: String,
}
/// Context for an initialized model agent
@@ -360,6 +368,7 @@ impl Session {
tx_event: Sender<Event>,
initial_history: InitialHistory,
) -> anyhow::Result<(Arc<Self>, TurnContext)> {
let conversation_id = ConversationId::from(Uuid::new_v4());
let ConfigureSession {
provider,
model,
@@ -377,11 +386,6 @@ impl Session {
return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}"));
}
let conversation_id = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => ConversationId::default(),
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
};
// Error messages to dispatch after SessionConfigured is sent.
let mut post_session_configured_error_events = Vec::<Event>::new();
@@ -481,10 +485,10 @@ impl Session {
// Dispatch the SessionConfiguredEvent first and then report any errors.
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
let initial_messages = Some(
sess.apply_initial_history(&turn_context, initial_history.clone())
.await,
);
let initial_messages = match &initial_history {
InitialHistory::New => None,
InitialHistory::Resumed(items) => Some(sess.build_initial_messages(items)),
};
let events = std::iter::once(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
@@ -498,7 +502,9 @@ impl Session {
})
.chain(post_session_configured_error_events.into_iter());
for event in events {
sess.send_event(event).await;
if let Err(e) = tx_event.send(event).await {
error!("failed to send event: {e:?}");
}
}
Ok((sess, turn_context))
@@ -521,31 +527,22 @@ impl Session {
}
}
async fn apply_initial_history(
async fn record_initial_history(
&self,
turn_context: &TurnContext,
conversation_history: InitialHistory,
) -> Vec<EventMsg> {
) {
match conversation_history {
InitialHistory::New => self.record_initial_history_new(turn_context).await,
InitialHistory::Forked(items) => {
self.record_conversation_items_internal(&items, true).await;
items
.into_iter()
.flat_map(|ri| {
map_response_item_to_event_messages(&ri, self.show_raw_agent_reasoning)
})
.filter(|m| matches!(m, EventMsg::UserMessage(_)))
.collect()
InitialHistory::New => {
self.record_initial_history_new(turn_context).await;
}
InitialHistory::Resumed(resumed_history) => {
self.record_initial_history_resumed(resumed_history.history)
.await
InitialHistory::Resumed(items) => {
self.record_initial_history_resumed(items).await;
}
}
}
async fn record_initial_history_new(&self, turn_context: &TurnContext) -> Vec<EventMsg> {
async fn record_initial_history_new(&self, turn_context: &TurnContext) {
// record the initial user instructions and environment context,
// regardless of whether we restored items.
// TODO: Those items shouldn't be "user messages" IMO. Maybe developer messages.
@@ -559,44 +556,29 @@ impl Session {
Some(turn_context.sandbox_policy.clone()),
Some(self.user_shell.clone()),
)));
for item in conversation_items {
self.record_conversation_item(item).await;
}
vec![]
self.record_conversation_items(&conversation_items).await;
}
async fn record_initial_history_from_items(&self, items: Vec<ResponseItem>) {
self.record_conversation_items_internal(&items, false).await;
async fn record_initial_history_resumed(&self, items: Vec<ResponseItem>) {
self.record_conversation_items(&items).await;
}
async fn record_initial_history_resumed(&self, items: Vec<RolloutItem>) -> Vec<EventMsg> {
// Record transcript (without persisting again)
let responses: Vec<ResponseItem> = items.as_slice().get_response_items();
if !responses.is_empty() {
self.record_conversation_items_internal(&responses, true)
.await;
}
items.as_slice().get_events()
/// build the initial messages vector for SessionConfigured by converting
/// ResponseItems into EventMsg.
fn build_initial_messages(&self, items: &[ResponseItem]) -> Vec<EventMsg> {
items
.iter()
.flat_map(|item| {
map_response_item_to_event_messages(item, self.show_raw_agent_reasoning)
})
.collect()
}
/// Sends the given event to the client and records it to the rollout (if enabled).
/// Any send/record errors are logged and swallowed.
/// Sends the given event to the client and swallows the send event, if
/// any, logging it as an error.
pub(crate) async fn send_event(&self, event: Event) {
let event_to_record = event.clone();
if let Err(e) = self.tx_event.send(event).await {
error!("failed to send event: {e}");
}
let recorder = {
let guard = self.rollout.lock_unchecked();
guard.as_ref().cloned()
};
if let Some(rec) = recorder
&& let Err(e) = rec
.record_items(crate::rollout::RolloutItem::Event(event_to_record))
.await
{
error!("failed to record rollout event: {e:#}");
error!("failed to send tool call event: {e}");
}
}
@@ -628,7 +610,7 @@ impl Session {
reason,
}),
};
self.send_event(event).await;
let _ = self.tx_event.send(event).await;
rx_approve
}
@@ -660,7 +642,7 @@ impl Session {
grant_root,
}),
};
self.send_event(event).await;
let _ = self.tx_event.send(event).await;
rx_approve
}
@@ -684,61 +666,43 @@ impl Session {
state.approved_commands.insert(cmd);
}
fn store_last_undo_patch(&self, patch: String) {
let mut state = self.state.lock_unchecked();
state.last_undo_patch = Some(StoredUndoPatch { patch });
}
fn last_undo_patch(&self) -> Option<StoredUndoPatch> {
self.state.lock_unchecked().last_undo_patch.clone()
}
fn clear_last_undo_patch(&self) {
self.state.lock_unchecked().last_undo_patch = None;
}
/// Records items to both the rollout and the chat completions/ZDR
/// transcript, if enabled.
async fn record_conversation_items(&self, items: &[ResponseItem]) {
self.record_conversation_items_internal(items, true).await;
}
async fn record_conversation_item(&self, item: ResponseItem) {
let items = [item];
self.record_conversation_items_internal(&items, true).await;
}
async fn record_conversation_items_internal(&self, items: &[ResponseItem], persist: bool) {
debug!("Recording items for conversation: {items:?}");
if persist {
// Record snapshot of these items into rollout
for item in items {
self.record_state_snapshot(RolloutItem::ResponseItem(item.clone()))
.await;
}
}
self.record_state_snapshot(items).await;
self.state.lock_unchecked().history.record_items(items);
}
async fn record_state_snapshot(&self, item: RolloutItem) {
async fn record_state_snapshot(&self, items: &[ResponseItem]) {
let snapshot = { crate::rollout::SessionStateSnapshot {} };
let recorder = {
let guard = self.rollout.lock_unchecked();
guard.as_ref().cloned()
};
if let Some(rec) = recorder
&& let Err(e) = rec.record_items(item).await
{
error!("failed to record rollout items: {e:#}");
}
}
/// Records a user input into conversation history AND a corresponding UserMessage event in rollout.
/// Does not send events to the UI.
async fn record_user_input(&self, sub_id: &str, response_item: ResponseItem) {
// Record the message/tool input in conversation history/rollout state
self.record_conversation_item(response_item.clone()).await;
// Derive and record a UserMessage event alongside it in the rollout
let user_events =
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning)
.into_iter()
.filter(|m| matches!(m, EventMsg::UserMessage(_)));
for msg in user_events {
let event = Event {
id: sub_id.to_string(),
msg,
};
self.record_state_snapshot(RolloutItem::Event(event)).await;
if let Some(rec) = recorder {
if let Err(e) = rec.record_state(snapshot).await {
error!("failed to record rollout state: {e:#}");
}
if let Err(e) = rec.record_items(items).await {
error!("failed to record rollout items: {e:#}");
}
}
}
@@ -759,6 +723,7 @@ impl Session {
user_explicitly_approved_this_action,
changes,
}) => {
self.clear_last_undo_patch();
turn_diff_tracker.on_patch_begin(&changes);
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
@@ -781,14 +746,13 @@ impl Session {
id: sub_id.to_string(),
msg,
};
self.send_event(event).await;
let _ = self.tx_event.send(event).await;
}
async fn on_exec_command_end(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: &str,
call_id: &str,
context: &ExecCommandContext,
output: &ExecToolCallOutput,
is_apply_patch: bool,
) {
@@ -807,14 +771,14 @@ impl Session {
let msg = if is_apply_patch {
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: call_id.to_string(),
call_id: context.call_id.clone(),
stdout,
stderr,
success: *exit_code == 0,
})
} else {
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: call_id.to_string(),
call_id: context.call_id.clone(),
stdout,
stderr,
aggregated_output,
@@ -825,22 +789,63 @@ impl Session {
};
let event = Event {
id: sub_id.to_string(),
id: context.sub_id.clone(),
msg,
};
self.send_event(event).await;
let _ = self.tx_event.send(event).await;
// If this is an apply_patch, after we emit the end patch, emit a second event
// with the full turn diff if there is one.
if is_apply_patch {
let unified_diff = turn_diff_tracker.get_unified_diff();
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
let event = Event {
id: sub_id.into(),
msg,
};
self.send_event(event).await;
match turn_diff_tracker.get_unified_diff() {
Ok(Some(unified_diff)) => {
let msg = EventMsg::TurnDiff(TurnDiffEvent {
unified_diff: unified_diff.clone(),
});
let event = Event {
id: context.sub_id.clone(),
msg,
};
let _ = self.tx_event.send(event).await;
if *exit_code == 0 {
match turn_diff_tracker.build_undo_patch() {
Ok(Some(patch)) => {
self.store_last_undo_patch(patch);
}
Ok(None) => {
self.clear_last_undo_patch();
}
Err(error) => {
warn!("failed to prepare undo patch: {error:#}");
self.clear_last_undo_patch();
self.notify_background_event(
&context.sub_id,
format!("Undo is unavailable for this turn: {error:#}"),
)
.await;
}
}
}
}
Ok(None) => {
if *exit_code == 0 {
self.clear_last_undo_patch();
}
}
Err(error) => {
warn!("failed to compute unified diff: {error:#}");
if *exit_code == 0 {
self.clear_last_undo_patch();
self
.notify_background_event(
&context.sub_id,
format!(
"Undo is unavailable for this turn: failed to compute diff: {error:#}"
),
)
.await;
}
}
}
}
}
@@ -855,8 +860,6 @@ impl Session {
exec_args: ExecInvokeArgs<'a>,
) -> crate::error::Result<ExecToolCallOutput> {
let is_apply_patch = begin_ctx.apply_patch.is_some();
let sub_id = begin_ctx.sub_id.clone();
let call_id = begin_ctx.call_id.clone();
self.on_exec_command_begin(turn_diff_tracker, begin_ctx.clone())
.await;
@@ -884,14 +887,8 @@ impl Session {
&output_stderr
}
};
self.on_exec_command_end(
turn_diff_tracker,
&sub_id,
&call_id,
borrowed,
is_apply_patch,
)
.await;
self.on_exec_command_end(turn_diff_tracker, &begin_ctx, borrowed, is_apply_patch)
.await;
result
}
@@ -906,7 +903,7 @@ impl Session {
message: message.into(),
}),
};
self.send_event(event).await;
let _ = self.tx_event.send(event).await;
}
async fn notify_stream_error(&self, sub_id: &str, message: impl Into<String>) {
@@ -916,7 +913,38 @@ impl Session {
message: message.into(),
}),
};
self.send_event(event).await;
let _ = self.tx_event.send(event).await;
}
async fn undo_last_turn_diff(&self, sub_id: &str) {
let Some(stored_patch) = self.last_undo_patch() else {
self.notify_background_event(sub_id, "No turn diff available to undo.")
.await;
return;
};
let mut stdout = Vec::new();
let mut stderr = Vec::new();
match codex_apply_patch::apply_patch(&stored_patch.patch, &mut stdout, &mut stderr) {
Ok(()) => {
self.clear_last_undo_patch();
if stdout.is_empty() {
self.notify_background_event(sub_id, "Reverted last turn diff.")
.await;
} else if let Ok(output) = String::from_utf8(stdout) {
self.notify_background_event(sub_id, output).await;
}
}
Err(error) => {
let mut message = format!("failed to undo turn diff: {error:#}");
if let Ok(stderr_text) = String::from_utf8(stderr)
&& !stderr_text.is_empty()
{
message = format!("{message}\n{stderr_text}");
}
self.notify_stream_error(sub_id, message).await;
}
}
}
/// Build the full turn input by concatenating the current conversation
@@ -1079,9 +1107,9 @@ impl AgentTask {
id: self.sub_id,
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
};
let sess = self.sess.clone();
let tx_event = self.sess.tx_event.clone();
tokio::spawn(async move {
sess.send_event(event).await;
tx_event.send(event).await.ok();
});
}
}
@@ -1177,13 +1205,13 @@ async fn submission_loop(
// Install the new persistent context for subsequent tasks/turns.
turn_context = Arc::new(new_turn_context);
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
sess.record_conversation_item(ResponseItem::from(EnvironmentContext::new(
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
// Shell is not configurable from turn to turn
None,
)))
))])
.await;
}
}
@@ -1196,6 +1224,9 @@ async fn submission_loop(
sess.set_task(task);
}
}
Op::UndoLastTurnDiff => {
sess.undo_last_turn_diff(&sub.id).await;
}
Op::UserTurn {
items,
cwd,
@@ -1286,7 +1317,7 @@ async fn submission_loop(
Op::GetHistoryEntryRequest { offset, log_id } => {
let config = config.clone();
let sess_for_spawn = sess.clone();
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
tokio::spawn(async move {
@@ -1314,10 +1345,13 @@ async fn submission_loop(
),
};
sess_for_spawn.send_event(event).await;
if let Err(e) = tx_event.send(event).await {
warn!("failed to send GetHistoryEntryResponse event: {e}");
}
});
}
Op::ListMcpTools => {
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
// This is a cheap lookup from the connection manager's cache.
@@ -1328,9 +1362,12 @@ async fn submission_loop(
crate::protocol::McpListToolsResponseEvent { tools },
),
};
sess.send_event(event).await;
if let Err(e) = tx_event.send(event).await {
warn!("failed to send McpListToolsResponse event: {e}");
}
}
Op::ListCustomPrompts => {
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
let custom_prompts: Vec<CustomPrompt> =
@@ -1346,7 +1383,9 @@ async fn submission_loop(
custom_prompts,
}),
};
sess.send_event(event).await;
if let Err(e) = tx_event.send(event).await {
warn!("failed to send ListCustomPromptsResponse event: {e}");
}
}
Op::Compact => {
// Create a summarization request as user input
@@ -1382,36 +1421,34 @@ async fn submission_loop(
message: "Failed to shutdown rollout recorder".to_string(),
}),
};
sess.send_event(event).await;
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send error message: {e:?}");
}
}
let event = Event {
id: sub.id.clone(),
msg: EventMsg::ShutdownComplete,
};
sess.send_event(event).await;
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send Shutdown event: {e}");
}
break;
}
Op::GetConversationPath => {
Op::GetHistory => {
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
// Ensure rollout file is flushed so consumers can read it immediately.
let rec_opt = { sess.rollout.lock_unchecked().as_ref().cloned() };
if let Some(rec) = rec_opt {
let _ = rec.flush().await;
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationHistory(ConversationPathResponseEvent {
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
conversation_id: sess.conversation_id,
path: sess
.rollout
.lock_unchecked()
.as_ref()
.map(|r| r.path().to_path_buf())
.unwrap_or_default(),
entries: sess.state.lock_unchecked().history.contents(),
}),
};
sess.send_event(event).await;
if let Err(e) = tx_event.send(event).await {
warn!("failed to send ConversationHistory event: {e}");
}
}
_ => {
// Ignore unknown ops; enum is non_exhaustive to allow extensions.
@@ -1443,17 +1480,19 @@ async fn run_task(
if input.is_empty() {
return;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
// Record the user's input and corresponding event into the rollout
let user_input_response: ResponseItem = ResponseItem::from(initial_input_for_turn.clone());
sess.record_user_input(&sub_id, user_input_response).await;
let event = Event {
id: sub_id.clone(),
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
}),
};
sess.send_event(event).await;
if sess.tx_event.send(event).await.is_err() {
return;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
.await;
let mut last_agent_message: Option<String> = None;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
@@ -1469,9 +1508,7 @@ async fn run_task(
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<ResponseItem>>();
for item in pending_input.iter() {
sess.record_user_input(&sub_id, item.clone()).await;
}
sess.record_conversation_items(&pending_input).await;
// Construct the input that we will send to the model. When using the
// Chat completions API (or ZDR clients), the model needs the full
@@ -1598,9 +1635,8 @@ async fn run_task(
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
for item in items_to_record_in_conversation_history.iter().cloned() {
sess.record_conversation_item(item).await;
}
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
}
if responses.is_empty() {
@@ -1624,7 +1660,7 @@ async fn run_task(
message: e.to_string(),
}),
};
sess.send_event(event).await;
sess.tx_event.send(event).await.ok();
// let the user continue the conversation
break;
}
@@ -1635,7 +1671,7 @@ async fn run_task(
id: sub_id,
msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
};
sess.send_event(event).await;
sess.tx_event.send(event).await.ok();
}
async fn run_turn(
@@ -1811,11 +1847,13 @@ async fn try_run_turn(
output.push(ProcessedResponseItem { item, response });
}
ResponseEvent::WebSearchCallBegin { call_id } => {
sess.send_event(Event {
id: sub_id.to_string(),
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
})
.await;
let _ = sess
.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
})
.await;
}
ResponseEvent::Completed {
response_id: _,
@@ -1831,11 +1869,13 @@ async fn try_run_turn(
st.token_info = info.clone();
info
};
sess.send_event(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await;
sess.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await
.ok();
let unified_diff = turn_diff_tracker.get_unified_diff();
if let Ok(Some(unified_diff)) = unified_diff {
@@ -1844,7 +1884,7 @@ async fn try_run_turn(
id: sub_id.to_string(),
msg,
};
sess.send_event(event).await;
let _ = sess.tx_event.send(event).await;
}
return Ok(output);
@@ -1854,21 +1894,21 @@ async fn try_run_turn(
id: sub_id.to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
};
sess.send_event(event).await;
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
};
sess.send_event(event).await;
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningSummaryPartAdded => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}),
};
sess.send_event(event).await;
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningContentDelta(delta) => {
if sess.show_raw_agent_reasoning {
@@ -1878,7 +1918,7 @@ async fn try_run_turn(
AgentReasoningRawContentDeltaEvent { delta },
),
};
sess.send_event(event).await;
sess.tx_event.send(event).await.ok();
}
}
}
@@ -1899,7 +1939,9 @@ async fn run_compact_task(
model_context_window,
}),
};
sess.send_event(start_event).await;
if sess.tx_event.send(start_event).await.is_err() {
return;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let turn_input: Vec<ResponseItem> =
@@ -2077,7 +2119,7 @@ async fn handle_response_item(
id: sub_id.to_string(),
msg,
};
sess.send_event(event).await;
sess.tx_event.send(event).await.ok();
}
None
}
@@ -2910,11 +2952,13 @@ async fn drain_to_completed(
info
};
sess.send_event(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await;
sess.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await
.ok();
return Ok(());
}

View File

@@ -1,5 +1,12 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use crate::AuthManager;
use crate::CodexAuth;
use codex_protocol::mcp_protocol::ConversationId;
use tokio::sync::RwLock;
use crate::codex::Codex;
use crate::codex::CodexSpawnOk;
use crate::codex::INITIAL_SUBMIT_ID;
@@ -10,63 +17,13 @@ use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutItem;
use crate::rollout::RolloutRecorder;
use crate::rollout::recorder::RolloutItemSliceExt;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ResponseItem;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
pub struct ResumedHistory {
pub conversation_id: ConversationId,
pub history: Vec<RolloutItem>,
pub rollout_path: PathBuf,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum InitialHistory {
New,
Resumed(ResumedHistory),
Forked(Vec<ResponseItem>),
}
impl PartialEq for InitialHistory {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(InitialHistory::New, InitialHistory::New) => true,
(InitialHistory::Forked(a), InitialHistory::Forked(b)) => a == b,
(InitialHistory::Resumed(_), InitialHistory::Resumed(_)) => true,
_ => false,
}
}
}
impl InitialHistory {
/// Return all response items contained in this initial history.
pub fn get_response_items(&self) -> Vec<ResponseItem> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Forked(_) => Vec::new(),
InitialHistory::Resumed(items) => {
<[_] as RolloutItemSliceExt>::get_response_items(items.history.as_slice())
}
}
}
/// Return all events contained in this initial history.
pub fn get_events(&self) -> Vec<crate::protocol::EventMsg> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Forked(_) => Vec::new(),
InitialHistory::Resumed(items) => {
<[_] as RolloutItemSliceExt>::get_events(items.history.as_slice())
}
}
}
Resumed(Vec<ResponseItem>),
}
/// Represents a newly created Codex conversation, including the first event
@@ -120,7 +77,7 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, InitialHistory::New).await?;
} = { Codex::spawn(config, auth_manager, InitialHistory::New).await? };
self.finalize_spawn(codex, conversation_id).await
}
}
@@ -192,43 +149,21 @@ impl ConversationManager {
/// caller's `config`). The new conversation will have a fresh id.
pub async fn fork_conversation(
&self,
base_rollout_path: PathBuf,
_base_conversation_id: ConversationId,
conversation_history: Vec<ResponseItem>,
num_messages_to_drop: usize,
config: Config,
) -> CodexResult<NewConversation> {
// Read prior responses from the rollout file (tolerate both tagged and legacy formats).
let text = tokio::fs::read_to_string(&base_rollout_path)
.await
.map_err(|e| CodexErr::Io(std::io::Error::other(format!("read rollout: {e}"))))?;
let mut responses: Vec<ResponseItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
// Only consider response items (legacy lines have no record_type)
match v.get("record_type").and_then(|s| s.as_str()) {
Some("response") | None => {
if let Ok(item) = serde_json::from_value::<ResponseItem>(v) {
responses.push(item);
}
}
_ => {}
}
}
let kept = truncate_after_dropping_last_messages(responses, num_messages_to_drop);
// Compute the prefix up to the cut point.
let history =
truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop);
// Spawn a new conversation with the computed initial history.
let auth_manager = self.auth_manager.clone();
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, kept).await?;
} = Codex::spawn(config, auth_manager, history).await?;
self.finalize_spawn(codex, conversation_id).await
}
}
@@ -237,7 +172,7 @@ impl ConversationManager {
/// and all items that follow them.
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
if n == 0 {
return InitialHistory::Forked(items);
return InitialHistory::Resumed(items);
}
// Walk backwards counting only `user` Message items, find cut index.
@@ -259,7 +194,7 @@ fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) ->
// No prefix remains after dropping; start a new conversation.
InitialHistory::New
} else {
InitialHistory::Forked(items.into_iter().take(cut_index).collect())
InitialHistory::Resumed(items.into_iter().take(cut_index).collect())
}
}
@@ -317,10 +252,10 @@ mod tests {
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
assert_eq!(
truncated,
InitialHistory::Forked(vec![items[0].clone(), items[1].clone(), items[2].clone(),])
InitialHistory::Resumed(vec![items[0].clone(), items[1].clone(), items[2].clone(),])
);
let truncated2 = truncate_after_dropping_last_messages(items, 2);
assert!(matches!(truncated2, InitialHistory::New));
assert_eq!(truncated2, InitialHistory::New);
}
}

View File

@@ -62,7 +62,6 @@ pub mod terminal;
mod tool_apply_patch;
pub mod turn_diff_tracker;
pub use rollout::RolloutRecorder;
pub use rollout::SessionMeta;
pub use rollout::list::ConversationItem;
pub use rollout::list::ConversationsPage;
pub use rollout::list::Cursor;

View File

@@ -187,13 +187,7 @@ impl McpConnectionManager {
let mut clients: HashMap<String, ManagedClient> = HashMap::with_capacity(join_set.len());
while let Some(res) = join_set.join_next().await {
let (server_name, client_res) = match res {
Ok((server_name, client_res)) => (server_name, client_res),
Err(e) => {
warn!("Task panic when starting MCP server: {e:#}");
continue;
}
};
let (server_name, client_res) = res?; // JoinError propagation
match client_res {
Ok((client, startup_timeout)) => {
@@ -211,13 +205,7 @@ impl McpConnectionManager {
}
}
let all_tools = match list_all_tools(&clients).await {
Ok(tools) => tools,
Err(e) => {
warn!("Failed to list tools from some MCP servers: {e:#}");
Vec::new()
}
};
let all_tools = list_all_tools(&clients).await?;
let tools = qualify_tools(all_tools);
@@ -282,19 +270,8 @@ async fn list_all_tools(clients: &HashMap<String, ManagedClient>) -> Result<Vec<
let mut aggregated: Vec<ToolInfo> = Vec::with_capacity(join_set.len());
while let Some(join_res) = join_set.join_next().await {
let (server_name, list_result) = if let Ok(result) = join_res {
result
} else {
warn!("Task panic when listing tools for MCP server: {join_res:#?}");
continue;
};
let list_result = if let Ok(result) = list_result {
result
} else {
warn!("Failed to list tools for MCP server '{server_name}': {list_result:#?}");
continue;
};
let (server_name, list_result) = join_res?;
let list_result = list_result?;
for tool in list_result.tools {
let tool_info = ToolInfo {

View File

@@ -23,6 +23,7 @@ use std::path::PathBuf;
use serde::Deserialize;
use serde::Serialize;
use codex_protocol::mcp_protocol::ConversationId;
use std::time::Duration;
use tokio::fs;
use tokio::io::AsyncReadExt;
@@ -30,7 +31,6 @@ use tokio::io::AsyncReadExt;
use crate::config::Config;
use crate::config_types::HistoryPersistence;
use codex_protocol::mcp_protocol::ConversationId;
#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
#[cfg(unix)]

View File

@@ -10,7 +10,6 @@ use time::macros::format_description;
use uuid::Uuid;
use super::SESSIONS_SUBDIR;
use super::recorder::SessionMetaWithGit;
/// Returned page of conversation summaries.
#[derive(Debug, Default, PartialEq)]
@@ -171,9 +170,7 @@ async fn traverse_directories_for_paths(
let head = read_first_jsonl_records(&path, HEAD_RECORD_LIMIT)
.await
.unwrap_or_default();
if should_include_session(&head) {
items.push(ConversationItem { path, head });
}
items.push(ConversationItem { path, head });
}
}
}
@@ -299,37 +296,3 @@ async fn read_first_jsonl_records(
}
Ok(head)
}
/// Return true if this conversation should be included in the listing.
///
/// Current rule: include only when the first JSON object is a session meta record
/// (i.e., has `{"record_type": "session_meta", ...}`), which is how rollout
/// files are written. Empty or malformed heads are excluded.
fn should_include_session(head: &[serde_json::Value]) -> bool {
let Some(first) = head.first() else {
return false;
};
passes_session_meta_filter(first)
}
/// Validate that the first record is a fullyformed session meta line.
///
/// Requirements:
/// - `record_type == "session_meta"`
/// - Remaining fields (after removing `record_type`) deserialize into
/// `SessionMetaWithGit`.
fn passes_session_meta_filter(first: &serde_json::Value) -> bool {
let Some(obj) = first.as_object() else {
return false;
};
let record_type = obj.get("record_type").and_then(|v| v.as_str());
if record_type != Some("session_meta") {
return false;
}
// Remove the marker field and validate the remainder matches SessionMetaWithGit
let mut cleaned = obj.clone();
cleaned.remove("record_type");
let val = serde_json::Value::Object(cleaned);
serde_json::from_value::<SessionMetaWithGit>(val).is_ok()
}

View File

@@ -6,9 +6,7 @@ pub mod list;
pub(crate) mod policy;
pub mod recorder;
pub use recorder::RolloutItem;
pub use recorder::RolloutRecorder;
pub use recorder::SessionMeta;
pub use recorder::SessionStateSnapshot;
#[cfg(test)]

View File

@@ -1,6 +1,4 @@
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
/// Whether a `ResponseItem` should be persisted in rollout files.
#[inline]
@@ -16,42 +14,3 @@ pub(crate) fn is_persisted_response_item(item: &ResponseItem) -> bool {
ResponseItem::WebSearchCall { .. } | ResponseItem::Other => false,
}
}
pub(crate) fn is_persisted_event(event: &Event) -> bool {
match event.msg {
EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::TaskStarted(_)
| EventMsg::TaskComplete(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::WebSearchBegin(_)
| EventMsg::WebSearchEnd(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListCustomPromptsResponse(_)
| EventMsg::ShutdownComplete
| EventMsg::ConversationHistory(_)
| EventMsg::PlanUpdate(_)
| EventMsg::TurnAborted(_)
| EventMsg::StreamError(_)
| EventMsg::Error(_)
| EventMsg::AgentReasoningSectionBreak(_)
| EventMsg::SessionConfigured(_) => false,
EventMsg::UserMessage(_)
| EventMsg::AgentMessage(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::TokenCount(_) => true,
}
}

View File

@@ -4,13 +4,11 @@ use std::fs::File;
use std::fs::{self};
use std::io::Error as IoError;
use std::path::Path;
use std::path::PathBuf;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
@@ -28,32 +26,26 @@ use super::list::get_conversations;
use super::policy::is_persisted_response_item;
use crate::config::Config;
use crate::conversation_manager::InitialHistory;
use crate::conversation_manager::ResumedHistory;
use crate::git_info::GitInfo;
use crate::git_info::collect_git_info;
use crate::rollout::policy::is_persisted_event;
use codex_protocol::models::ResponseItem;
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct SessionMeta {
pub id: ConversationId,
pub timestamp: String,
pub cwd: String,
pub originator: String,
pub cli_version: String,
pub instructions: Option<String>,
}
// SessionMetaWithGit is used in writes and reads; ensure it implements Debug.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SessionMetaWithGit {
#[derive(Serialize)]
struct SessionMetaWithGit {
#[serde(flatten)]
meta: SessionMeta,
#[serde(skip_serializing_if = "Option::is_none")]
git: Option<GitInfo>,
}
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct SessionStateSnapshot {}
#[derive(Serialize, Deserialize, Default, Clone)]
@@ -78,98 +70,15 @@ pub struct SavedSession {
#[derive(Clone)]
pub struct RolloutRecorder {
tx: Sender<RolloutCmd>,
path: PathBuf,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "record_type", rename_all = "snake_case")]
enum TaggedLine {
Response {
#[serde(flatten)]
item: ResponseItem,
},
Event {
#[serde(flatten)]
event: Event,
},
SessionMeta {
#[serde(flatten)]
meta: SessionMetaWithGit,
},
PrevSessionMeta {
#[serde(flatten)]
meta: SessionMetaWithGit,
},
State {
#[serde(flatten)]
state: SessionStateSnapshot,
},
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct TimestampedLine {
timestamp: String,
#[serde(flatten)]
record: TaggedLine,
}
#[derive(Debug, Clone)]
pub enum RolloutItem {
ResponseItem(ResponseItem),
Event(Event),
SessionMeta(SessionMetaWithGit),
}
impl From<ResponseItem> for RolloutItem {
fn from(item: ResponseItem) -> Self {
RolloutItem::ResponseItem(item)
}
}
impl From<Event> for RolloutItem {
fn from(event: Event) -> Self {
RolloutItem::Event(event)
}
}
/// Convenience helpers to extract typed items from a list of rollout items.
pub trait RolloutItemSliceExt {
fn get_response_items(&self) -> Vec<ResponseItem>;
fn get_events(&self) -> Vec<EventMsg>;
}
impl RolloutItemSliceExt for [RolloutItem] {
fn get_response_items(&self) -> Vec<ResponseItem> {
self.iter()
.filter_map(|it| match it {
RolloutItem::ResponseItem(ri) => Some(ri.clone()),
_ => None,
})
.collect()
}
fn get_events(&self) -> Vec<EventMsg> {
self.iter()
.filter_map(|it| match it {
RolloutItem::Event(ev) => Some(ev.msg.clone()),
_ => None,
})
.collect()
}
}
enum RolloutCmd {
AddResponseItems(Vec<ResponseItem>),
AddEvents(Vec<Event>),
AddSessionMeta(SessionMetaWithGit),
Flush { ack: oneshot::Sender<()> },
AddItems(Vec<ResponseItem>),
UpdateState(SessionStateSnapshot),
Shutdown { ack: oneshot::Sender<()> },
}
impl RolloutRecorder {
pub fn path(&self) -> &Path {
&self.path
}
#[allow(dead_code)]
/// List conversations (rollout files) under the provided Codex home directory.
pub async fn list_conversations(
@@ -192,169 +101,109 @@ impl RolloutRecorder {
file,
conversation_id: session_id,
timestamp,
path,
} = create_log_file(config, conversation_id)?;
let timestamp_format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let timestamp = timestamp
.to_offset(time::UtcOffset::UTC)
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let cwd = config.cwd.to_path_buf();
// Clone the cwd for the spawned task to collect git info asynchronously
let cwd = config.cwd.clone();
let (tx, rx) = mpsc::channel(100);
// A reasonably-sized bounded channel. If the buffer fills up the send
// future will yield, which is fine we only need to ensure we do not
// perform *blocking* I/O on the caller's thread.
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
// Spawn a Tokio task that owns the file handle and performs async
// writes. Using `tokio::fs::File` keeps everything on the async I/O
// driver instead of blocking the runtime.
tokio::task::spawn(rollout_writer(
tokio::fs::File::from_std(file),
rx,
Some(SessionMeta {
timestamp,
id: session_id,
cwd: config.cwd.to_string_lossy().to_string(),
originator: config.responses_originator_header.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
instructions,
}),
cwd,
));
Ok(Self { tx, path })
Ok(Self { tx })
}
pub(crate) async fn record_items(&self, item: RolloutItem) -> std::io::Result<()> {
match item {
RolloutItem::ResponseItem(item) => self.record_response_item(&item).await,
RolloutItem::Event(event) => self.record_event(&event).await,
RolloutItem::SessionMeta(meta) => self.record_session_meta(&meta).await,
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
let mut filtered = Vec::new();
for item in items {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
if is_persisted_response_item(item) {
filtered.push(item.clone());
}
}
}
/// Ensure all writes up to this point have been processed by the writer task.
///
/// This is a sequencing barrier for readers that plan to open and read the
/// rollout file immediately after calling this method. The background writer
/// processes the channel serially; when it dequeues `Flush`, all prior
/// `AddResponseItems`/`AddEvents`/`AddSessionMeta` have already been written
/// via `write_line`, which calls `file.flush()` (OSbuffer flush).
pub async fn flush(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
self.tx
.send(RolloutCmd::Flush { ack: tx_done })
.await
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
}
async fn record_response_item(&self, item: &ResponseItem) -> std::io::Result<()> {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
if !is_persisted_response_item(item) {
if filtered.is_empty() {
return Ok(());
}
self.tx
.send(RolloutCmd::AddResponseItems(vec![item.clone()]))
.send(RolloutCmd::AddItems(filtered))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
async fn record_event(&self, event: &Event) -> std::io::Result<()> {
if !is_persisted_event(event) {
return Ok(());
}
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
self.tx
.send(RolloutCmd::AddEvents(vec![event.clone()]))
.send(RolloutCmd::UpdateState(state))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout event: {e}")))
}
async fn record_session_meta(&self, meta: &SessionMetaWithGit) -> std::io::Result<()> {
self.tx
.send(RolloutCmd::AddSessionMeta(meta.clone()))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout session meta: {e}")))
.map_err(|e| IoError::other(format!("failed to queue rollout state: {e}")))
}
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
info!("Resuming rollout from {path:?}");
tracing::error!("Resuming rollout from {path:?}");
let text = tokio::fs::read_to_string(path).await?;
let mut lines = text.lines();
let first_line = lines
let _ = lines
.next()
.ok_or_else(|| IoError::other("empty session file"))?;
let conversation_id = if let Ok(TimestampedLine {
record: TaggedLine::SessionMeta { meta },
..
}) = serde_json::from_str::<TimestampedLine>(first_line)
{
Some(meta.meta.id)
} else if let Ok(meta) = serde_json::from_str::<SessionMetaWithGit>(first_line) {
Some(meta.meta.id)
} else if let Ok(meta) = serde_json::from_str::<SessionMeta>(first_line) {
Some(meta.id)
} else {
return Err(IoError::other(
"failed to parse first line of rollout file as SessionMeta",
));
};
let mut items = Vec::new();
let mut items: Vec<RolloutItem> = Vec::new();
for line in lines {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<TimestampedLine>(line) {
Ok(TimestampedLine {
record: TaggedLine::State { .. },
..
}) => {}
Ok(TimestampedLine {
record: TaggedLine::Event { event },
..
}) => items.push(RolloutItem::Event(event)),
Ok(TimestampedLine {
record: TaggedLine::SessionMeta { meta },
..
})
| Ok(TimestampedLine {
record: TaggedLine::PrevSessionMeta { meta },
..
}) => items.push(RolloutItem::SessionMeta(meta)),
Ok(TimestampedLine {
record: TaggedLine::Response { item },
..
}) => {
let v: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
if v.get("record_type")
.and_then(|rt| rt.as_str())
.map(|s| s == "state")
.unwrap_or(false)
{
continue;
}
match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => {
if is_persisted_response_item(&item) {
items.push(RolloutItem::ResponseItem(item));
items.push(item);
}
}
Err(_) => warn!("failed to parse rollout line: {line}"),
Err(e) => {
warn!("failed to parse item: {v:?}, error: {e}");
}
}
}
tracing::error!(
"Resumed rollout with {} items, conversation ID: {:?}",
items.len(),
conversation_id
);
let conversation_id = conversation_id
.ok_or_else(|| IoError::other("failed to parse conversation ID from rollout file"))?;
if items.is_empty() {
return Ok(InitialHistory::New);
}
info!("Resumed rollout successfully from {path:?}");
Ok(InitialHistory::Resumed(ResumedHistory {
conversation_id,
history: items,
rollout_path: path.to_path_buf(),
}))
if items.is_empty() {
Ok(InitialHistory::New)
} else {
Ok(InitialHistory::Resumed(items))
}
}
pub async fn shutdown(&self) -> std::io::Result<()> {
@@ -382,9 +231,6 @@ struct LogFileInfo {
/// Timestamp for the start of the session.
timestamp: OffsetDateTime,
/// Full filesystem path to the rollout file.
path: PathBuf,
}
fn create_log_file(
@@ -392,7 +238,8 @@ fn create_log_file(
conversation_id: ConversationId,
) -> std::io::Result<LogFileInfo> {
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
let timestamp = OffsetDateTime::now_utc();
let timestamp = OffsetDateTime::now_local()
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
let mut dir = config.codex_home.clone();
dir.push(SESSIONS_SUBDIR);
dir.push(timestamp.year().to_string());
@@ -420,7 +267,6 @@ fn create_log_file(
file,
conversation_id,
timestamp,
path,
})
}
@@ -439,37 +285,33 @@ async fn rollout_writer(
meta: session_meta,
git: git_info,
};
// Write the SessionMeta as the first item in the file
writer
.write_tagged(TaggedLine::SessionMeta {
meta: session_meta_with_git,
})
.await?;
writer.write_line(&session_meta_with_git).await?;
}
// Process rollout commands
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddResponseItems(items) => {
RolloutCmd::AddItems(items) => {
for item in items {
if is_persisted_response_item(&item) {
writer.write_tagged(TaggedLine::Response { item }).await?;
writer.write_line(&item).await?;
}
}
}
RolloutCmd::AddEvents(events) => {
for event in events {
writer.write_tagged(TaggedLine::Event { event }).await?;
RolloutCmd::UpdateState(state) => {
#[derive(Serialize)]
struct StateLine<'a> {
record_type: &'static str,
#[serde(flatten)]
state: &'a SessionStateSnapshot,
}
}
// Sequencing barrier: by the time we handle `Flush`, all previously
// queued writes have been applied and flushed to OS buffers.
RolloutCmd::Flush { ack } => {
let _ = ack.send(());
}
RolloutCmd::AddSessionMeta(meta) => {
writer
.write_tagged(TaggedLine::PrevSessionMeta { meta })
.write_line(&StateLine {
record_type: "state",
state: &state,
})
.await?;
}
RolloutCmd::Shutdown { ack } => {
@@ -489,16 +331,8 @@ impl JsonlWriter {
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
let mut json = serde_json::to_string(item)?;
json.push('\n');
self.file.write_all(json.as_bytes()).await?;
let _ = self.file.write_all(json.as_bytes()).await;
self.file.flush().await?;
Ok(())
}
async fn write_tagged(&mut self, record: TaggedLine) -> std::io::Result<()> {
let timestamp = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let line = TimestampedLine { timestamp, record };
self.write_line(&line).await
}
}

View File

@@ -41,13 +41,8 @@ fn write_session_file(
let mut file = File::create(file_path)?;
let meta = serde_json::json!({
"record_type": "session_meta",
"timestamp": ts_str,
"id": uuid.to_string(),
"cwd": "/",
"originator": "test",
"cli_version": "0.0.0",
"instructions": null
"id": uuid.to_string()
});
writeln!(file, "{meta}")?;
@@ -61,18 +56,6 @@ fn write_session_file(
Ok((dt, uuid))
}
fn expected_session_meta(ts: &str, uuid: Uuid) -> serde_json::Value {
serde_json::json!({
"record_type": "session_meta",
"timestamp": ts,
"id": uuid.to_string(),
"cwd": "/",
"originator": "test",
"cli_version": "0.0.0",
"instructions": null
})
}
#[tokio::test]
async fn test_list_conversations_latest_first() {
let temp = TempDir::new().unwrap();
@@ -111,19 +94,19 @@ async fn test_list_conversations_latest_first() {
.join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl"));
let head_3 = vec![
expected_session_meta("2025-01-03T12-00-00", u3),
serde_json::json!({"timestamp": "2025-01-03T12-00-00", "id": u3.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_2 = vec![
expected_session_meta("2025-01-02T12-00-00", u2),
serde_json::json!({"timestamp": "2025-01-02T12-00-00", "id": u2.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_1 = vec![
expected_session_meta("2025-01-01T12-00-00", u1),
serde_json::json!({"timestamp": "2025-01-01T12-00-00", "id": u1.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
@@ -188,11 +171,11 @@ async fn test_pagination_cursor() {
.join("04")
.join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl"));
let head_5 = vec![
expected_session_meta("2025-03-05T09-00-00", u5),
serde_json::json!({"timestamp": "2025-03-05T09-00-00", "id": u5.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_4 = vec![
expected_session_meta("2025-03-04T09-00-00", u4),
serde_json::json!({"timestamp": "2025-03-04T09-00-00", "id": u4.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor1: Cursor =
@@ -230,11 +213,11 @@ async fn test_pagination_cursor() {
.join("02")
.join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl"));
let head_3 = vec![
expected_session_meta("2025-03-03T09-00-00", u3),
serde_json::json!({"timestamp": "2025-03-03T09-00-00", "id": u3.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_2 = vec![
expected_session_meta("2025-03-02T09-00-00", u2),
serde_json::json!({"timestamp": "2025-03-02T09-00-00", "id": u2.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor2: Cursor =
@@ -266,7 +249,7 @@ async fn test_pagination_cursor() {
.join("01")
.join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl"));
let head_1 = vec![
expected_session_meta("2025-03-01T09-00-00", u1),
serde_json::json!({"timestamp": "2025-03-01T09-00-00", "id": u1.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor3: Cursor =
@@ -305,7 +288,7 @@ async fn test_get_conversation_contents() {
.join("01")
.join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl"));
let expected_head = vec![
expected_session_meta(ts, uuid),
serde_json::json!({"timestamp": ts, "id": uuid.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
];
@@ -322,7 +305,7 @@ async fn test_get_conversation_contents() {
assert_eq!(page, expected_page);
// Entire file contents equality
let meta = expected_session_meta(ts, uuid);
let meta = serde_json::json!({"timestamp": ts, "id": uuid.to_string()});
let rec0 = serde_json::json!({"record_type": "response", "index": 0});
let rec1 = serde_json::json!({"record_type": "response", "index": 1});
let expected_content = format!("{meta}\n{rec0}\n{rec1}\n");
@@ -357,7 +340,9 @@ async fn test_stable_ordering_same_second_pagination() {
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl"));
let head = |u: Uuid| -> Vec<serde_json::Value> { vec![expected_session_meta(ts, u)] };
let head = |u: Uuid| -> Vec<serde_json::Value> {
vec![serde_json::json!({"timestamp": ts, "id": u.to_string()})]
};
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
let expected_page1 = ConversationsPage {
items: vec![

View File

@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
@@ -249,6 +250,64 @@ impl TurnDiffTracker {
}
}
pub fn build_undo_patch(&mut self) -> Result<Option<String>> {
let mut delete_paths: BTreeSet<PathBuf> = BTreeSet::new();
let mut add_entries: Vec<(PathBuf, String)> = Vec::new();
let mut baseline_file_names: Vec<String> =
self.baseline_file_info.keys().cloned().collect();
baseline_file_names.sort();
for internal in baseline_file_names {
let Some(info) = self.baseline_file_info.get(&internal) else {
continue;
};
let current_path = self
.get_path_for_internal(&internal)
.unwrap_or(info.path.clone());
if current_path.exists() {
delete_paths.insert(current_path);
}
if info.oid.as_str() != ZERO_OID {
let content = String::from_utf8(info.content.clone()).map_err(|_| {
anyhow!(
"undo is not supported for non-UTF8 baseline file {}",
info.path.display()
)
})?;
add_entries.push((info.path.clone(), content));
}
}
if delete_paths.is_empty() && add_entries.is_empty() {
return Ok(None);
}
add_entries.sort_by(|(left_path, _), (right_path, _)| left_path.cmp(right_path));
let mut patch = String::from("*** Begin Patch\n");
for path in delete_paths {
patch.push_str(&format!("*** Delete File: {}\n", path.display()));
}
for (path, content) in add_entries {
patch.push_str(&format!("*** Add File: {}\n", path.display()));
if !content.is_empty() {
for line in content.split_terminator('\n') {
patch.push('+');
patch.push_str(line);
patch.push('\n');
}
if !content.ends_with('\n') {
patch.push_str("+\n");
}
}
}
patch.push_str("*** End Patch\n");
Ok(Some(patch))
}
fn get_file_diff(&mut self, internal_file_name: &str) -> String {
let mut aggregated = String::new();
@@ -503,6 +562,146 @@ mod tests {
out
}
fn normalize_patch_for_test(input: &str, root: &Path) -> String {
let root_str = root.display().to_string().replace('\\', "/");
let mut replaced = input.replace('\\', "/");
replaced = replaced.replace(&root_str, "<TMP>");
if let Some(root_name) = root.file_name().and_then(|name| name.to_str()) {
let marker = format!("/{root_name}");
let mut normalized = String::with_capacity(replaced.len());
let mut search_start = 0;
while let Some(relative_pos) = replaced[search_start..].find(&marker) {
let absolute_pos = search_start + relative_pos;
let path_start = replaced[..absolute_pos]
.rfind(['\n', ' '])
.map(|idx| idx + 1)
.unwrap_or(0);
let prefix_end = replaced[path_start..absolute_pos]
.find('/')
.map(|idx| path_start + idx + 1)
.unwrap_or(path_start);
normalized.push_str(&replaced[search_start..prefix_end]);
normalized.push_str("<TMP>");
let after_marker = absolute_pos + marker.len();
let mut rest_start = after_marker;
if after_marker < replaced.len() && replaced.as_bytes()[after_marker] == b'/' {
normalized.push('/');
rest_start += 1;
}
search_start = rest_start;
}
normalized.push_str(&replaced[search_start..]);
replaced = normalized;
}
if !replaced.ends_with('\n') {
replaced.push('\n');
}
replaced
}
#[test]
fn build_undo_patch_returns_none_without_baseline() {
let mut tracker = TurnDiffTracker::new();
assert_eq!(tracker.build_undo_patch().unwrap(), None);
}
#[test]
fn build_undo_patch_restores_updated_file() {
let dir = tempdir().unwrap();
let path = dir.path().join("undo.txt");
fs::write(&path, "before\n").unwrap();
let mut tracker = TurnDiffTracker::new();
let update_changes = HashMap::from([(
path.clone(),
FileChange::Update {
unified_diff: String::new(),
move_path: None,
},
)]);
tracker.on_patch_begin(&update_changes);
fs::write(&path, "after\n").unwrap();
let patch = tracker
.build_undo_patch()
.expect("undo patch")
.expect("some undo patch");
let normalized = normalize_patch_for_test(&patch, dir.path());
let expected = concat!(
"*** Begin Patch\n",
"*** Delete File: <TMP>/undo.txt\n",
"*** Add File: <TMP>/undo.txt\n",
"+before\n",
"*** End Patch\n",
);
assert_eq!(normalized, expected);
}
#[test]
fn build_undo_patch_restores_deleted_file() {
let dir = tempdir().unwrap();
let path = dir.path().join("gone.txt");
fs::write(&path, "gone\n").unwrap();
let mut tracker = TurnDiffTracker::new();
let delete_changes = HashMap::from([(
path.clone(),
FileChange::Delete {
content: "gone\n".to_string(),
},
)]);
tracker.on_patch_begin(&delete_changes);
fs::remove_file(&path).unwrap();
let patch = tracker
.build_undo_patch()
.expect("undo patch")
.expect("some undo patch");
let normalized = normalize_patch_for_test(&patch, dir.path());
let expected = concat!(
"*** Begin Patch\n",
"*** Add File: <TMP>/gone.txt\n",
"+gone\n",
"*** End Patch\n",
);
assert_eq!(normalized, expected);
}
#[test]
fn build_undo_patch_rejects_non_utf8_content() {
let dir = tempdir().unwrap();
let path = dir.path().join("binary.bin");
fs::write(&path, [0xff, 0xfe, 0x00]).unwrap();
let mut tracker = TurnDiffTracker::new();
let update_changes = HashMap::from([(
path.clone(),
FileChange::Update {
unified_diff: String::new(),
move_path: None,
},
)]);
tracker.on_patch_begin(&update_changes);
let err = tracker.build_undo_patch().unwrap_err();
let message = format!("{err:#}");
assert!(
message.contains("undo is not supported for non-UTF8 baseline file"),
"unexpected error message: {message}"
);
}
#[test]
fn accumulates_add_and_update() {
let mut acc = TurnDiffTracker::new();

View File

@@ -388,7 +388,8 @@ async fn integration_creates_and_checks_session_file() {
"No message found in session file containing the marker"
);
// Second run: resume should update the existing file.
// Second run: resume should create a NEW session file that contains both old and new history.
let orig_len = content.lines().count();
let marker2 = format!("integration-resume-{}", Uuid::new_v4());
let prompt2 = format!("echo {marker2}");
// Crossplatform safe resume override. On Windows, backslashes in a TOML string must be escaped
@@ -448,8 +449,8 @@ async fn integration_creates_and_checks_session_file() {
}
let resumed_path = resumed_path.expect("No resumed session file found containing the marker2");
// Resume should write to the existing log file.
assert_eq!(
// Resume should have written to a new file, not the original one.
assert_ne!(
resumed_path, path,
"resume should create a new session file"
);
@@ -463,6 +464,14 @@ async fn integration_creates_and_checks_session_file() {
resumed_content.contains(&marker2),
"resumed file missing resumed marker"
);
// Original file should remain unchanged.
let content_after = std::fs::read_to_string(&path).unwrap();
assert_eq!(
content_after.lines().count(),
orig_len,
"original rollout file should not change on resume"
);
}
/// Integration test to verify git info is collected and recorded in session files.

View File

@@ -4,12 +4,9 @@ use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::WireApi;
use codex_core::built_in_model_providers;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::InputMessageKind;
use codex_core::protocol::Op;
use codex_core::protocol::UserMessageEvent;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_protocol::mcp_protocol::AuthMode;
use core_test_support::load_default_config_for_test;
@@ -18,7 +15,6 @@ use core_test_support::wait_for_event;
use serde_json::json;
use std::io::Write;
use tempfile::TempDir;
use uuid::Uuid;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
@@ -126,17 +122,11 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
let tmpdir = TempDir::new().unwrap();
let session_path = tmpdir.path().join("resume-session.jsonl");
let mut f = std::fs::File::create(&session_path).unwrap();
// First line: meta (content not used by reader other than non-empty)
writeln!(
f,
"{}",
json!({
"record_type": "session_meta",
"id": Uuid::new_v4(),
"timestamp": "2024-01-01T00:00:00Z",
"cwd": tmpdir.path().to_string_lossy(),
"originator": "test",
"cli_version": "0.0.0-test"
})
serde_json::json!({"meta":"test","instructions":"be nice"})
)
.unwrap();
@@ -148,30 +138,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed user message".to_string(),
}],
};
let mut prior_user_obj = serde_json::to_value(&prior_user)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_user_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_user_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00Z"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_user_obj)).unwrap();
// Also include a matching user message event to preserve ordering at resume
let prior_user_event = EventMsg::UserMessage(UserMessageEvent {
message: "resumed user message".to_string(),
kind: Some(InputMessageKind::Plain),
});
let prior_user_event_line = serde_json::json!({
"timestamp": "2025-01-01T00:00:00Z",
"record_type": "event",
"id": "resume-0",
"msg": prior_user_event,
});
writeln!(f, "{prior_user_event_line}").unwrap();
writeln!(f, "{}", serde_json::to_string(&prior_user).unwrap()).unwrap();
// Prior item: system message (excluded from API history)
let prior_system = codex_protocol::models::ResponseItem::Message {
@@ -181,17 +148,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed system instruction".to_string(),
}],
};
let mut prior_system_obj = serde_json::to_value(&prior_system)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_system_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_system_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00Z"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_system_obj)).unwrap();
writeln!(f, "{}", serde_json::to_string(&prior_system).unwrap()).unwrap();
// Prior item: assistant message
let prior_item = codex_protocol::models::ResponseItem::Message {
@@ -201,27 +158,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed assistant message".to_string(),
}],
};
let mut prior_item_obj = serde_json::to_value(&prior_item)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_item_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_item_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00Z"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_item_obj)).unwrap();
let prior_item_event = EventMsg::AgentMessage(AgentMessageEvent {
message: "resumed assistant message".to_string(),
});
let prior_event_line = serde_json::json!({
"timestamp": "2025-01-01T00:00:00Z",
"record_type": "event",
"id": "resume-1",
"msg": prior_item_event,
});
writeln!(f, "{prior_event_line}").unwrap();
writeln!(f, "{}", serde_json::to_string(&prior_item).unwrap()).unwrap();
drop(f);
// Mock server that will receive the resumed request
@@ -265,7 +202,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
.clone()
.expect("expected initial messages for resumed session");
let initial_json = serde_json::to_value(&initial_msgs).unwrap();
let expected_initial_json = json!([
let expected_initial_json = serde_json::json!([
{ "type": "user_message", "message": "resumed user message", "kind": "plain" },
{ "type": "agent_message", "message": "resumed assistant message" }
]);
@@ -284,7 +221,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
let request = &server.received_requests().await.unwrap()[0];
let request_body = request.body_json::<serde_json::Value>().unwrap();
let expected_input = json!([
let expected_input = serde_json::json!([
{
"type": "message",
"role": "user",
@@ -1030,7 +967,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
assert_eq!(requests.len(), 3, "expected 3 requests (one per turn)");
// Replace full-array compare with tail-only raw JSON compare using a single hard-coded value.
let r3_tail_expected = json!([
let r3_tail_expected = serde_json::json!([
{
"type": "message",
"role": "user",

View File

@@ -3,11 +3,10 @@ use codex_core::ConversationManager;
use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::built_in_model_providers;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_protocol::models::ResponseItem;
use core_test_support::load_default_config_for_test;
use core_test_support::wait_for_event;
use tempfile::TempDir;
@@ -73,34 +72,17 @@ async fn fork_conversation_twice_drops_to_first_message() {
}
// Request history from the base conversation.
codex.submit(Op::GetConversationPath).await.unwrap();
codex.submit(Op::GetHistory).await.unwrap();
let base_history =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
// Capture path/id from the base history and compute expected prefixes after each fork.
let (base_conv_id, base_path) = match &base_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
// Capture entries from the base history and compute expected prefixes after each fork.
let entries_after_three = match &base_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
entries.clone()
}
_ => panic!("expected ConversationHistory event"),
};
// Read entries from rollout file.
async fn read_response_entries(path: &std::path::Path) -> Vec<ResponseItem> {
let text = tokio::fs::read_to_string(path).await.unwrap_or_default();
let mut out = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(item) = serde_json::from_str::<ResponseItem>(line) {
out.push(item);
}
}
out
}
let entries_after_three: Vec<ResponseItem> = read_response_entries(&base_path).await;
// History layout for this test:
// [0] user instructions,
// [1] environment context,
@@ -131,46 +113,42 @@ async fn fork_conversation_twice_drops_to_first_message() {
conversation: codex_fork1,
..
} = conversation_manager
.fork_conversation(base_path.clone(), base_conv_id, 1, config_for_fork.clone())
.fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone())
.await
.expect("fork 1");
codex_fork1.submit(Op::GetConversationPath).await.unwrap();
codex_fork1.submit(Op::GetHistory).await.unwrap();
let fork1_history = wait_for_event(&codex_fork1, |ev| {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
let (fork1_id, fork1_path) = match &fork1_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
let entries_after_first_fork = match &fork1_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
assert!(matches!(
fork1_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_first
));
entries.clone()
}
_ => panic!("expected ConversationHistory event after first fork"),
};
let entries_after_first_fork: Vec<ResponseItem> = read_response_entries(&fork1_path).await;
assert_eq!(entries_after_first_fork, expected_after_first);
// Fork again with n=1 → drops the (new) last user message, leaving only the first.
let NewConversation {
conversation: codex_fork2,
..
} = conversation_manager
.fork_conversation(fork1_path.clone(), fork1_id, 1, config_for_fork.clone())
.fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone())
.await
.expect("fork 2");
codex_fork2.submit(Op::GetConversationPath).await.unwrap();
codex_fork2.submit(Op::GetHistory).await.unwrap();
let fork2_history = wait_for_event(&codex_fork2, |ev| {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
let (_fork2_id, fork2_path) = match &fork2_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
_ => panic!("expected ConversationHistory event after second fork"),
};
let entries_after_second_fork: Vec<ResponseItem> = read_response_entries(&fork2_path).await;
assert_eq!(entries_after_second_fork, expected_after_second);
assert!(matches!(
fork2_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_second
));
}

View File

@@ -26,7 +26,6 @@ use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnDiffEvent;
use codex_core::protocol::WebSearchBeginEvent;
use codex_core::protocol::WebSearchEndEvent;
use codex_protocol::num_format::format_with_separators;
use owo_colors::OwoColorize;
use owo_colors::Style;
use shlex::try_join;
@@ -195,7 +194,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
ts_println!(
self,
"tokens used: {}",
format_with_separators(usage_info.total_token_usage.blended_total())
usage_info.total_token_usage.blended_total()
);
}
}

View File

@@ -1,3 +1,8 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::json_to_toml::json_to_toml;
@@ -9,13 +14,11 @@ use codex_core::ConversationManager;
use codex_core::Cursor as RolloutCursor;
use codex_core::NewConversation;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
use codex_core::auth::CLIENT_ID;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::config::ConfigToml;
use codex_core::config::load_config_as_toml;
use codex_core::default_client::get_codex_user_agent;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::get_platform_sandbox;
@@ -45,7 +48,6 @@ use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse;
use codex_protocol::mcp_protocol::ExecCommandApprovalParams;
use codex_protocol::mcp_protocol::ExecCommandApprovalResponse;
use codex_protocol::mcp_protocol::ExecOneOffCommandParams;
use codex_protocol::mcp_protocol::GetUserAgentResponse;
use codex_protocol::mcp_protocol::GetUserSavedConfigResponse;
use codex_protocol::mcp_protocol::GitDiffToRemoteResponse;
use codex_protocol::mcp_protocol::InputItem as WireInputItem;
@@ -66,16 +68,8 @@ use codex_protocol::mcp_protocol::SendUserTurnParams;
use codex_protocol::mcp_protocol::SendUserTurnResponse;
use codex_protocol::mcp_protocol::ServerNotification;
use codex_protocol::mcp_protocol::UserSavedConfig;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InputMessageKind;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use mcp_types::JSONRPCErrorError;
use mcp_types::RequestId;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tracing::error;
@@ -175,9 +169,6 @@ impl CodexMessageProcessor {
ClientRequest::GetUserSavedConfig { request_id } => {
self.get_user_saved_config(request_id).await;
}
ClientRequest::GetUserAgent { request_id } => {
self.get_user_agent(request_id).await;
}
ClientRequest::ExecOneOffCommand { request_id, params } => {
self.exec_one_off_command(request_id, params).await;
}
@@ -393,12 +384,6 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn get_user_agent(&self, request_id: RequestId) {
let user_agent = get_codex_user_agent(Some(&self.config.responses_originator_header));
let response = GetUserAgentResponse { user_agent };
self.outgoing.send_response(request_id, response).await;
}
async fn get_user_saved_config(&self, request_id: RequestId) {
let toml_value = match load_config_as_toml(&self.config.codex_home) {
Ok(val) => val,
@@ -574,11 +559,16 @@ impl CodexMessageProcessor {
}
};
let items = page
.items
.into_iter()
.filter_map(|it| extract_conversation_summary(it.path, &it.head))
.collect();
// Build summaries
let mut items: Vec<ConversationSummary> = Vec::new();
for it in page.items.into_iter() {
let (timestamp, preview) = extract_ts_and_preview(&it.head);
items.push(ConversationSummary {
path: it.path,
preview,
timestamp,
});
}
// Encode next_cursor as a plain string
let next_cursor = match page.next_cursor {
@@ -632,29 +622,19 @@ impl CodexMessageProcessor {
session_configured,
..
}) => {
let event = Event {
let event = codex_core::protocol::Event {
id: "".to_string(),
msg: EventMsg::SessionConfigured(session_configured.clone()),
msg: codex_core::protocol::EventMsg::SessionConfigured(
session_configured.clone(),
),
};
self.outgoing.send_event_as_notification(&event, None).await;
let initial_messages = session_configured.initial_messages.map(|msgs| {
msgs.into_iter()
.filter(|event| {
// Don't send non-plain user messages (like user instructions
// or environment context) back so they don't get rendered.
if let EventMsg::UserMessage(user_message) = event {
return matches!(user_message.kind, Some(InputMessageKind::Plain));
}
true
})
.collect()
});
// Reply with conversation id + model and initial messages (when present)
let response = codex_protocol::mcp_protocol::ResumeConversationResponse {
conversation_id,
model: session_configured.model.clone(),
initial_messages,
initial_messages: session_configured.initial_messages.clone(),
};
self.outgoing.send_response(request_id, response).await;
}
@@ -842,11 +822,11 @@ impl CodexMessageProcessor {
let mut params = match serde_json::to_value(event.clone()) {
Ok(serde_json::Value::Object(map)) => map,
Ok(_) => {
error!("event did not serialize to an object");
tracing::error!("event did not serialize to an object");
continue;
}
Err(err) => {
error!("failed to serialize event: {err}");
tracing::error!("failed to serialize event: {err}");
continue;
}
};
@@ -1029,7 +1009,7 @@ fn derive_config_from_params(
async fn on_patch_approval_response(
event_id: String,
receiver: oneshot::Receiver<mcp_types::Result>,
receiver: tokio::sync::oneshot::Receiver<mcp_types::Result>,
codex: Arc<CodexConversation>,
) {
let response = receiver.await;
@@ -1071,14 +1051,14 @@ async fn on_patch_approval_response(
async fn on_exec_approval_response(
event_id: String,
receiver: oneshot::Receiver<mcp_types::Result>,
receiver: tokio::sync::oneshot::Receiver<mcp_types::Result>,
conversation: Arc<CodexConversation>,
) {
let response = receiver.await;
let value = match response {
Ok(value) => value,
Err(err) => {
error!("request failed: {err:?}");
tracing::error!("request failed: {err:?}");
return;
}
};
@@ -1105,99 +1085,37 @@ async fn on_exec_approval_response(
}
}
fn extract_conversation_summary(
path: PathBuf,
head: &[serde_json::Value],
) -> Option<ConversationSummary> {
let session_meta = match head.first() {
Some(first_line) => match serde_json::from_value::<SessionMeta>(first_line.clone()) {
Ok(session_meta) => session_meta,
Err(..) => return None,
},
None => return None,
};
fn extract_ts_and_preview(head: &[serde_json::Value]) -> (Option<String>, String) {
let ts = head
.first()
.and_then(|v| v.get("timestamp"))
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let preview = find_first_user_text(head).unwrap_or_default();
(ts, preview)
}
let preview = head
.iter()
.filter_map(|value| serde_json::from_value::<ResponseItem>(value.clone()).ok())
.find_map(|item| match item {
ResponseItem::Message { content, .. } => {
content.into_iter().find_map(|content| match content {
ContentItem::InputText { text } => {
match InputMessageKind::from(("user", &text)) {
InputMessageKind::Plain => Some(text),
_ => None,
}
}
_ => None,
})
fn find_first_user_text(head: &[serde_json::Value]) -> Option<String> {
use codex_core::protocol::InputMessageKind;
for v in head.iter() {
let t = v.get("type").and_then(|x| x.as_str()).unwrap_or("");
if t != "message" {
continue;
}
if v.get("role").and_then(|x| x.as_str()) != Some("user") {
continue;
}
if let Some(arr) = v.get("content").and_then(|c| c.as_array()) {
for c in arr.iter() {
if let (Some("input_text"), Some(txt)) =
(c.get("type").and_then(|t| t.as_str()), c.get("text"))
&& let Some(s) = txt.as_str()
&& matches!(InputMessageKind::from(("user", s)), InputMessageKind::Plain)
{
return Some(s.to_string());
}
}
_ => None,
})?;
let preview = match preview.find(USER_MESSAGE_BEGIN) {
Some(idx) => preview[idx + USER_MESSAGE_BEGIN.len()..].trim(),
None => preview.as_str(),
};
let timestamp = if session_meta.timestamp.is_empty() {
None
} else {
Some(session_meta.timestamp.clone())
};
Some(ConversationSummary {
conversation_id: session_meta.id,
timestamp,
path,
preview: preview.to_string(),
})
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use serde_json::json;
#[test]
fn extract_conversation_summary_prefers_plain_user_messages() {
let conversation_id =
ConversationId(Uuid::parse_str("3f941c35-29b3-493b-b0a4-e25800d9aeb0").unwrap());
let timestamp = Some("2025-09-05T16:53:11.850Z".to_string());
let path = PathBuf::from("rollout.jsonl");
let head = vec![
json!({
"id": conversation_id.0,
"timestamp": timestamp,
}),
json!({
"type": "message",
"role": "user",
"content": [{
"type": "input_text",
"text": "<user_instructions>\n<AGENTS.md contents>\n</user_instructions>".to_string(),
}],
}),
json!({
"type": "message",
"role": "user",
"content": [{
"type": "input_text",
"text": format!("<prior context> {USER_MESSAGE_BEGIN}Count to 5"),
}],
}),
];
let summary = extract_conversation_summary(path.clone(), &head).expect("summary");
assert_eq!(summary.conversation_id, conversation_id);
assert_eq!(
summary.timestamp,
Some("2025-09-05T16:53:11.850Z".to_string())
);
assert_eq!(summary.path, path);
assert_eq!(summary.preview, "Count to 5");
}
}
None
}

View File

@@ -5,10 +5,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use crate::exec_approval::handle_exec_approval_request;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotificationMeta;
use crate::patch_approval::handle_patch_approval_request;
use codex_core::CodexConversation;
use codex_core::ConversationManager;
use codex_core::NewConversation;
@@ -30,6 +26,11 @@ use mcp_types::TextContent;
use serde_json::json;
use tokio::sync::Mutex;
use crate::exec_approval::handle_exec_approval_request;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotificationMeta;
use crate::patch_approval::handle_patch_approval_request;
pub(crate) const INVALID_PARAMS_ERROR_CODE: i64 = -32602;
/// Run a complete Codex session and stream events back to the client.

View File

@@ -247,11 +247,6 @@ impl McpProcess {
self.send_request("getUserSavedConfig", None).await
}
/// Send a `getUserAgent` JSON-RPC request.
pub async fn send_get_user_agent_request(&mut self) -> anyhow::Result<i64> {
self.send_request("getUserAgent", None).await
}
/// Send a `listConversations` JSON-RPC request.
pub async fn send_list_conversations_request(
&mut self,

View File

@@ -156,17 +156,8 @@ fn create_fake_rollout(codex_home: &Path, filename_ts: &str, meta_rfc3339: &str,
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
let mut lines = Vec::new();
lines.push(
json!({
"record_type": "session_meta",
"id": uuid,
"timestamp": meta_rfc3339,
"cwd": codex_home.to_string_lossy(),
"originator": "test",
"cli_version": "0.0.0-test"
})
.to_string(),
);
// Meta line with timestamp
lines.push(json!({"timestamp": meta_rfc3339}).to_string());
// Minimal user message entry as a persisted response item
lines.push(
json!({

View File

@@ -8,4 +8,3 @@ mod interrupt;
mod list_resume;
mod login;
mod send_message;
mod user_agent;

View File

@@ -1,45 +0,0 @@
use codex_core::default_client::DEFAULT_ORIGINATOR;
use codex_core::default_client::get_codex_user_agent;
use codex_protocol::mcp_protocol::GetUserAgentResponse;
use mcp_test_support::McpProcess;
use mcp_test_support::to_response;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_user_agent_returns_current_codex_user_agent() {
let codex_home = TempDir::new().unwrap_or_else(|err| panic!("create tempdir: {err}"));
let mut mcp = McpProcess::new(codex_home.path())
.await
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("initialize timeout")
.expect("initialize request");
let request_id = mcp
.send_get_user_agent_request()
.await
.expect("send getUserAgent");
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await
.expect("getUserAgent timeout")
.expect("getUserAgent response");
let received: GetUserAgentResponse =
to_response(response).expect("deserialize getUserAgent response");
let expected = GetUserAgentResponse {
user_agent: get_codex_user_agent(Some(DEFAULT_ORIGINATOR)),
};
assert_eq!(received, expected);
}

View File

@@ -9,4 +9,4 @@ workspace = true
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
ts-rs = { version = "11", features = ["serde-json-impl", "no-serde-warnings"] }
ts-rs = { version = "11", features = ["serde-json-impl"] }

View File

@@ -20,25 +20,34 @@ pub fn generate_ts(out_dir: &Path, prettier: Option<&Path>) -> Result<()> {
codex_protocol::mcp_protocol::InputItem::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ClientRequest::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ServerRequest::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::NewConversationParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::NewConversationResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::AddConversationListenerParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::AddConversationSubscriptionResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::RemoveConversationListenerParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::RemoveConversationSubscriptionResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::SendUserMessageParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::SendUserMessageResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::SendUserTurnParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::SendUserTurnResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::InterruptConversationParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::InterruptConversationResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GitDiffToRemoteParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GitDiffToRemoteResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::LoginChatGptResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::LoginChatGptCompleteNotification::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::CancelLoginChatGptParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::CancelLoginChatGptResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::LogoutChatGptParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::LogoutChatGptResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GetAuthStatusParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GetAuthStatusResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ApplyPatchApprovalParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ApplyPatchApprovalResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ExecCommandApprovalParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ExecCommandApprovalResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GetUserSavedConfigResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GetUserAgentResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ServerNotification::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ListConversationsResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ResumeConversationResponse::export_all_to(out_dir)?;
generate_index_ts(out_dir)?;

View File

@@ -12,8 +12,6 @@ workspace = true
[dependencies]
base64 = "0.22.1"
icu_decimal = "2.0.0"
icu_locale_core = "2.0.0"
mcp-types = { path = "../mcp-types" }
mime_guess = "2.0.5"
serde = { version = "1", features = ["derive"] }
@@ -22,9 +20,8 @@ serde_json = "1"
serde_with = { version = "3.14.0", features = ["macros", "base64"] }
strum = "0.27.2"
strum_macros = "0.27.2"
sys-locale = "0.3.2"
tracing = "0.1.41"
ts-rs = { version = "11", features = ["uuid-impl", "serde-json-impl", "no-serde-warnings"] }
ts-rs = { version = "11", features = ["uuid-impl", "serde-json-impl"] }
uuid = { version = "1", features = ["serde", "v4"] }
[dev-dependencies]

View File

@@ -1,9 +1,8 @@
use serde::Deserialize;
use serde::Serialize;
use std::path::PathBuf;
use ts_rs::TS;
#[derive(Serialize, Deserialize, Debug, Clone, TS)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CustomPrompt {
pub name: String,
pub path: PathBuf,

View File

@@ -3,7 +3,6 @@ pub mod custom_prompts;
pub mod mcp_protocol;
pub mod message_history;
pub mod models;
pub mod num_format;
pub mod parse_command;
pub mod plan_tool;
pub mod protocol;

View File

@@ -19,7 +19,7 @@ use strum_macros::Display;
use ts_rs::TS;
use uuid::Uuid;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, TS, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, TS, Hash, Default)]
#[ts(type = "string")]
pub struct ConversationId(pub Uuid);
@@ -29,12 +29,6 @@ impl ConversationId {
}
}
impl Default for ConversationId {
fn default() -> Self {
Self::new()
}
}
impl Display for ConversationId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
@@ -143,10 +137,6 @@ pub enum ClientRequest {
#[serde(rename = "id")]
request_id: RequestId,
},
GetUserAgent {
#[serde(rename = "id")]
request_id: RequestId,
},
/// Execute a command (argv vector) under the server's sandbox.
ExecOneOffCommand {
#[serde(rename = "id")]
@@ -205,7 +195,7 @@ pub struct NewConversationResponse {
pub model: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, TS)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ResumeConversationResponse {
pub conversation_id: ConversationId,
@@ -228,7 +218,6 @@ pub struct ListConversationsParams {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct ConversationSummary {
pub conversation_id: ConversationId,
pub path: PathBuf,
pub preview: String,
/// RFC3339 timestamp string for the session start, if available.
@@ -350,12 +339,6 @@ pub struct GetAuthStatusResponse {
pub auth_token: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct GetUserAgentResponse {
pub user_agent: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct GetUserSavedConfigResponse {
@@ -638,10 +621,4 @@ mod tests {
serde_json::to_value(&request).unwrap(),
);
}
#[test]
fn test_conversation_id_default_is_not_zeroes() {
let id = ConversationId::default();
assert_ne!(id.0, Uuid::nil());
}
}

View File

@@ -1,8 +1,7 @@
use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
#[derive(Serialize, Deserialize, Debug, Clone, TS)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct HistoryEntry {
pub conversation_id: String,
pub ts: u64,

View File

@@ -6,11 +6,10 @@ use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::ser::Serializer;
use ts_rs::TS;
use crate::protocol::InputItem;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResponseInputItem {
Message {
@@ -31,7 +30,7 @@ pub enum ResponseInputItem {
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentItem {
InputText { text: String },
@@ -39,7 +38,7 @@ pub enum ContentItem {
OutputText { text: String },
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResponseItem {
Message {
@@ -160,7 +159,7 @@ impl From<ResponseInputItem> for ResponseItem {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum LocalShellStatus {
Completed,
@@ -168,13 +167,13 @@ pub enum LocalShellStatus {
Incomplete,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LocalShellAction {
Exec(LocalShellExecAction),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct LocalShellExecAction {
pub command: Vec<String>,
pub timeout_ms: Option<u64>,
@@ -183,7 +182,7 @@ pub struct LocalShellExecAction {
pub user: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WebSearchAction {
Search {
@@ -193,13 +192,13 @@ pub enum WebSearchAction {
Other,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ReasoningItemReasoningSummary {
SummaryText { text: String },
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ReasoningItemContent {
ReasoningText { text: String },
@@ -243,7 +242,7 @@ impl From<Vec<InputItem>> for ResponseInputItem {
/// If the `name` of a `ResponseItem::FunctionCall` is either `container.exec`
/// or shell`, the `arguments` field should deserialize to this struct.
#[derive(Deserialize, Debug, Clone, PartialEq, TS)]
#[derive(Deserialize, Debug, Clone, PartialEq)]
pub struct ShellToolCallParams {
pub command: Vec<String>,
pub workdir: Option<String>,
@@ -257,7 +256,7 @@ pub struct ShellToolCallParams {
pub justification: Option<String>,
}
#[derive(Debug, Clone, PartialEq, TS)]
#[derive(Debug, Clone, PartialEq)]
pub struct FunctionCallOutputPayload {
pub content: String,
pub success: Option<bool>,

View File

@@ -1,98 +0,0 @@
use std::sync::OnceLock;
use icu_decimal::DecimalFormatter;
use icu_decimal::input::Decimal;
use icu_decimal::options::DecimalFormatterOptions;
use icu_locale_core::Locale;
fn make_local_formatter() -> Option<DecimalFormatter> {
let loc: Locale = sys_locale::get_locale()?.parse().ok()?;
DecimalFormatter::try_new(loc.into(), DecimalFormatterOptions::default()).ok()
}
fn make_en_us_formatter() -> DecimalFormatter {
#![allow(clippy::expect_used)]
let loc: Locale = "en-US".parse().expect("en-US wasn't a valid locale");
DecimalFormatter::try_new(loc.into(), DecimalFormatterOptions::default())
.expect("en-US wasn't a valid locale")
}
fn formatter() -> &'static DecimalFormatter {
static FORMATTER: OnceLock<DecimalFormatter> = OnceLock::new();
FORMATTER.get_or_init(|| make_local_formatter().unwrap_or_else(make_en_us_formatter))
}
/// Format a u64 with locale-aware digit separators (e.g. "12345" -> "12,345"
/// for en-US).
pub fn format_with_separators(n: u64) -> String {
formatter().format(&Decimal::from(n)).to_string()
}
fn format_si_suffix_with_formatter(n: u64, formatter: &DecimalFormatter) -> String {
if n < 1000 {
return formatter.format(&Decimal::from(n)).to_string();
}
// Format `n / scale` with the requested number of fractional digits.
let format_scaled = |n: u64, scale: u64, frac_digits: u32| -> String {
let value = n as f64 / scale as f64;
let scaled: u64 = (value * 10f64.powi(frac_digits as i32)).round() as u64;
let mut dec = Decimal::from(scaled);
dec.multiply_pow10(-(frac_digits as i16));
formatter.format(&dec).to_string()
};
const UNITS: [(u64, &str); 3] = [(1_000, "K"), (1_000_000, "M"), (1_000_000_000, "G")];
let f = n as f64;
for &(scale, suffix) in &UNITS {
if (100.0 * f / scale as f64).round() < 1000.0 {
return format!("{}{}", format_scaled(n, scale, 2), suffix);
} else if (10.0 * f / scale as f64).round() < 1000.0 {
return format!("{}{}", format_scaled(n, scale, 1), suffix);
} else if (f / scale as f64).round() < 1000.0 {
return format!("{}{}", format_scaled(n, scale, 0), suffix);
}
}
// Above 1000G, keep wholeG precision.
format!(
"{}G",
format_with_separators(((n as f64) / 1e9).round() as u64)
)
}
/// Format token counts to 3 significant figures, using base-10 SI suffixes.
///
/// Examples (en-US):
/// - 999 -> "999"
/// - 1200 -> "1.20K"
/// - 123456789 -> "123M"
pub fn format_si_suffix(n: u64) -> String {
format_si_suffix_with_formatter(n, formatter())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn kmg() {
let formatter = make_en_us_formatter();
let fmt = |n: u64| format_si_suffix_with_formatter(n, &formatter);
assert_eq!(fmt(0), "0");
assert_eq!(fmt(999), "999");
assert_eq!(fmt(1_000), "1.00K");
assert_eq!(fmt(1_200), "1.20K");
assert_eq!(fmt(10_000), "10.0K");
assert_eq!(fmt(100_000), "100K");
assert_eq!(fmt(999_500), "1.00M");
assert_eq!(fmt(1_000_000), "1.00M");
assert_eq!(fmt(1_234_000), "1.23M");
assert_eq!(fmt(12_345_678), "12.3M");
assert_eq!(fmt(999_950_000), "1.00G");
assert_eq!(fmt(1_000_000_000), "1.00G");
assert_eq!(fmt(1_234_000_000), "1.23G");
// Above 1000G we keep wholeG precision (no higher unit supported here).
assert_eq!(fmt(1_234_000_000_000), "1,234G");
}
}

View File

@@ -1,8 +1,7 @@
use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ParsedCommand {
Read {

View File

@@ -1,9 +1,8 @@
use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
// Types for the TODO tool arguments matching codex-vscode/todo-mcp/src/main.rs
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepStatus {
Pending,
@@ -11,14 +10,14 @@ pub enum StepStatus {
Completed,
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PlanItemArg {
pub step: String,
pub status: StepStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct UpdatePlanArgs {
#[serde(default)]

View File

@@ -15,7 +15,7 @@ use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
use crate::custom_prompts::CustomPrompt;
use crate::mcp_protocol::ConversationId;
use crate::message_history::HistoryEntry;
use crate::num_format::format_with_separators;
use crate::models::ResponseItem;
use crate::parse_command::ParsedCommand;
use crate::plan_tool::UpdatePlanArgs;
use mcp_types::CallToolResult;
@@ -32,7 +32,6 @@ pub const USER_INSTRUCTIONS_OPEN_TAG: &str = "<user_instructions>";
pub const USER_INSTRUCTIONS_CLOSE_TAG: &str = "</user_instructions>";
pub const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
pub const ENVIRONMENT_CONTEXT_CLOSE_TAG: &str = "</environment_context>";
pub const USER_MESSAGE_BEGIN: &str = "## My request for Codex:";
/// Submission Queue Entry - requests from user
#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -59,6 +58,9 @@ pub enum Op {
items: Vec<InputItem>,
},
/// Undo the most recently applied turn diff using the local git repo.
UndoLastTurnDiff,
/// Similar to [`Op::UserInput`], but contains additional context required
/// for a turn of a [`crate::codex_conversation::CodexConversation`].
UserTurn {
@@ -148,7 +150,7 @@ pub enum Op {
/// Request the full in-memory conversation transcript for the current session.
/// Reply is delivered via `EventMsg::ConversationHistory`.
GetConversationPath,
GetHistory,
/// Request the list of MCP tools available across all configured servers.
/// Reply is delivered via `EventMsg::McpListToolsResponse`.
@@ -404,7 +406,7 @@ pub struct Event {
}
/// Response event from the agent
#[derive(Debug, Clone, Deserialize, Serialize, Display, TS)]
#[derive(Debug, Clone, Deserialize, Serialize, Display)]
#[serde(tag = "type", rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum EventMsg {
@@ -424,7 +426,7 @@ pub enum EventMsg {
/// Agent text output message
AgentMessage(AgentMessageEvent),
/// User/system input message (what was sent to the model).
/// User/system input message (what was sent to the model)
UserMessage(UserMessageEvent),
/// Agent text output delta message
@@ -498,27 +500,27 @@ pub enum EventMsg {
/// Notification that the agent is shutting down.
ShutdownComplete,
ConversationHistory(ConversationPathResponseEvent),
ConversationHistory(ConversationHistoryResponseEvent),
}
// Individual event payload types matching each `EventMsg` variant.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ErrorEvent {
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TaskCompleteEvent {
pub last_agent_message: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TaskStartedEvent {
pub model_context_window: Option<u64>,
}
#[derive(Debug, Clone, Deserialize, Serialize, Default, TS)]
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct TokenUsage {
pub input_tokens: u64,
pub cached_input_tokens: u64,
@@ -527,7 +529,7 @@ pub struct TokenUsage {
pub total_tokens: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TokenUsageInfo {
pub total_token_usage: TokenUsage,
pub last_token_usage: TokenUsage,
@@ -564,7 +566,7 @@ impl TokenUsageInfo {
}
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TokenCountEvent {
pub info: Option<TokenUsageInfo>,
}
@@ -646,26 +648,19 @@ impl From<TokenUsage> for FinalOutput {
impl fmt::Display for FinalOutput {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let token_usage = &self.token_usage;
write!(
f,
"Token usage: total={} input={}{} output={}{}",
format_with_separators(token_usage.blended_total()),
format_with_separators(token_usage.non_cached_input()),
token_usage.blended_total(),
token_usage.non_cached_input(),
if token_usage.cached_input() > 0 {
format!(
" (+ {} cached)",
format_with_separators(token_usage.cached_input())
)
format!(" (+ {} cached)", token_usage.cached_input())
} else {
String::new()
},
format_with_separators(token_usage.output_tokens),
token_usage.output_tokens,
if token_usage.reasoning_output_tokens > 0 {
format!(
" (reasoning {})",
format_with_separators(token_usage.reasoning_output_tokens)
)
format!(" (reasoning {})", token_usage.reasoning_output_tokens)
} else {
String::new()
}
@@ -673,12 +668,12 @@ impl fmt::Display for FinalOutput {
}
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentMessageEvent {
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum InputMessageKind {
/// Plain user text (default)
@@ -689,7 +684,7 @@ pub enum InputMessageKind {
EnvironmentContext,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct UserMessageEvent {
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -719,35 +714,35 @@ where
}
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentMessageDeltaEvent {
pub delta: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningEvent {
pub text: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningRawContentEvent {
pub text: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningRawContentDeltaEvent {
pub delta: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningSectionBreakEvent {}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningDeltaEvent {
pub delta: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpInvocation {
/// Name of the MCP server as defined in the config.
pub server: String,
@@ -757,19 +752,18 @@ pub struct McpInvocation {
pub arguments: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpToolCallBeginEvent {
/// Identifier so this can be paired with the McpToolCallEnd event.
pub call_id: String,
pub invocation: McpInvocation,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpToolCallEndEvent {
/// Identifier for the corresponding McpToolCallBegin that finished.
pub call_id: String,
pub invocation: McpInvocation,
#[ts(type = "string")]
pub duration: Duration,
/// Result of the tool call. Note this could be an error.
pub result: Result<CallToolResult, String>,
@@ -784,12 +778,12 @@ impl McpToolCallEndEvent {
}
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WebSearchBeginEvent {
pub call_id: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WebSearchEndEvent {
pub call_id: String,
pub query: String,
@@ -797,13 +791,13 @@ pub struct WebSearchEndEvent {
/// Response payload for `Op::GetHistory` containing the current session's
/// in-memory transcript.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
pub struct ConversationPathResponseEvent {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ConversationHistoryResponseEvent {
pub conversation_id: ConversationId,
pub path: PathBuf,
pub entries: Vec<ResponseItem>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExecCommandBeginEvent {
/// Identifier so this can be paired with the ExecCommandEnd event.
pub call_id: String,
@@ -814,7 +808,7 @@ pub struct ExecCommandBeginEvent {
pub parsed_cmd: Vec<ParsedCommand>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExecCommandEndEvent {
/// Identifier for the ExecCommandBegin that finished.
pub call_id: String,
@@ -828,13 +822,12 @@ pub struct ExecCommandEndEvent {
/// The command's exit code.
pub exit_code: i32,
/// The duration of the command execution.
#[ts(type = "string")]
pub duration: Duration,
/// Formatted output from the command, as seen by the model.
pub formatted_output: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, TS)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ExecOutputStream {
Stdout,
@@ -842,7 +835,7 @@ pub enum ExecOutputStream {
}
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, TS)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct ExecCommandOutputDeltaEvent {
/// Identifier for the ExecCommandBegin that produced this chunk.
pub call_id: String,
@@ -850,11 +843,10 @@ pub struct ExecCommandOutputDeltaEvent {
pub stream: ExecOutputStream,
/// Raw bytes from the stream (may not be valid UTF-8).
#[serde_as(as = "serde_with::base64::Base64")]
#[ts(type = "string")]
pub chunk: Vec<u8>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExecApprovalRequestEvent {
/// Identifier for the associated exec call, if available.
pub call_id: String,
@@ -867,7 +859,7 @@ pub struct ExecApprovalRequestEvent {
pub reason: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ApplyPatchApprovalRequestEvent {
/// Responses API call id for the associated patch apply call, if available.
pub call_id: String,
@@ -880,17 +872,17 @@ pub struct ApplyPatchApprovalRequestEvent {
pub grant_root: Option<PathBuf>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BackgroundEventEvent {
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamErrorEvent {
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PatchApplyBeginEvent {
/// Identifier so this can be paired with the PatchApplyEnd event.
pub call_id: String,
@@ -900,7 +892,7 @@ pub struct PatchApplyBeginEvent {
pub changes: HashMap<PathBuf, FileChange>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PatchApplyEndEvent {
/// Identifier for the PatchApplyBegin that finished.
pub call_id: String,
@@ -912,12 +904,12 @@ pub struct PatchApplyEndEvent {
pub success: bool,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TurnDiffEvent {
pub unified_diff: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GetHistoryEntryResponseEvent {
pub offset: usize,
pub log_id: u64,
@@ -927,19 +919,19 @@ pub struct GetHistoryEntryResponseEvent {
}
/// Response payload for `Op::ListMcpTools`.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpListToolsResponseEvent {
/// Fully qualified tool name -> tool definition.
pub tools: std::collections::HashMap<String, McpTool>,
}
/// Response payload for `Op::ListCustomPrompts`.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ListCustomPromptsResponseEvent {
pub custom_prompts: Vec<CustomPrompt>,
}
#[derive(Debug, Default, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct SessionConfiguredEvent {
/// Name left as session_id instead of conversation_id for backwards compatibility.
pub session_id: ConversationId,
@@ -996,7 +988,7 @@ pub enum FileChange {
},
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Chunk {
/// 1-based line index of the first line in the original file
pub orig_index: u32,
@@ -1004,7 +996,7 @@ pub struct Chunk {
pub inserted_lines: Vec<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TurnAbortedEvent {
pub reason: TurnAbortReason,
}
@@ -1019,6 +1011,7 @@ pub enum TurnAbortReason {
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
/// Serialize Event to verify that its JSON representation has the expected
/// amount of nesting.

View File

@@ -3,13 +3,12 @@ use crate::backtrack_helpers;
use crate::pager_overlay::Overlay;
use crate::tui;
use crate::tui::TuiEvent;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_protocol::mcp_protocol::ConversationId;
use color_eyre::eyre::Result;
use crossterm::event::KeyCode;
use crossterm::event::KeyEvent;
use crossterm::event::KeyEventKind;
/// Aggregates all backtrack-related state used by the App.
#[derive(Default)]
pub(crate) struct BacktrackState {
@@ -98,7 +97,7 @@ impl App {
) {
self.backtrack.pending = Some((base_id, drop_last_messages, prefill));
self.app_event_tx.send(crate::app_event::AppEvent::CodexOp(
codex_core::protocol::Op::GetConversationPath,
codex_core::protocol::Op::GetHistory,
));
}
@@ -265,7 +264,7 @@ impl App {
pub(crate) async fn on_conversation_history_for_backtrack(
&mut self,
tui: &mut tui::Tui,
ev: ConversationPathResponseEvent,
ev: ConversationHistoryResponseEvent,
) -> Result<()> {
if let Some((base_id, _, _)) = self.backtrack.pending.as_ref()
&& ev.conversation_id == *base_id
@@ -281,16 +280,15 @@ impl App {
async fn fork_and_switch_to_new_conversation(
&mut self,
tui: &mut tui::Tui,
ev: ConversationPathResponseEvent,
ev: ConversationHistoryResponseEvent,
drop_count: usize,
prefill: String,
) {
let cfg = self.chat_widget.config_ref().clone();
// Perform the fork via a thin wrapper for clarity/testability.
let result = self
.perform_fork(ev.path.clone(), ev.conversation_id, drop_count, cfg.clone())
.perform_fork(ev.entries.clone(), drop_count, cfg.clone())
.await;
// We aren't using the initial history UI replay in session configured because we have more accurate version of the history.
match result {
Ok(new_conv) => {
self.install_forked_conversation(tui, cfg, new_conv, drop_count, &prefill)
@@ -302,13 +300,12 @@ impl App {
/// Thin wrapper around ConversationManager::fork_conversation.
async fn perform_fork(
&self,
conversation_path: std::path::PathBuf,
conversation_id: codex_protocol::mcp_protocol::ConversationId,
entries: Vec<codex_protocol::models::ResponseItem>,
drop_count: usize,
cfg: codex_core::config::Config,
) -> codex_core::error::Result<codex_core::NewConversation> {
self.server
.fork_conversation(conversation_path, conversation_id, drop_count, cfg)
.fork_conversation(entries, drop_count, cfg)
.await
}

View File

@@ -1,4 +1,4 @@
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::Event;
use codex_file_search::FileMatch;
@@ -58,5 +58,5 @@ pub(crate) enum AppEvent {
UpdateSandboxPolicy(SandboxPolicy),
/// Forwarded conversation history snapshot from the current conversation.
ConversationHistory(ConversationPathResponseEvent),
ConversationHistory(ConversationHistoryResponseEvent),
}

View File

@@ -1,5 +1,4 @@
use codex_core::protocol::TokenUsageInfo;
use codex_protocol::num_format::format_si_suffix;
use crossterm::event::KeyCode;
use crossterm::event::KeyEvent;
use crossterm::event::KeyEventKind;
@@ -1277,11 +1276,8 @@ impl WidgetRef for ChatComposer {
let token_usage = &token_usage_info.total_token_usage;
hint.push(" ".into());
hint.push(
Span::from(format!(
"{} tokens used",
format_si_suffix(token_usage.blended_total())
))
.style(Style::default().add_modifier(Modifier::DIM)),
Span::from(format!("{} tokens used", token_usage.blended_total()))
.style(Style::default().add_modifier(Modifier::DIM)),
);
let last_token_usage = &token_usage_info.last_token_usage;
if let Some(context_window) = token_usage_info.model_context_window {

View File

@@ -411,6 +411,8 @@ impl ChatWidget {
fn on_background_event(&mut self, message: String) {
debug!("BackgroundEvent: {message}");
self.add_to_history(history_cell::new_background_event(message));
self.request_redraw();
}
fn on_stream_error(&mut self, message: String) {
@@ -862,6 +864,9 @@ impl ChatWidget {
tx.send(AppEvent::DiffResult(text));
});
}
SlashCommand::Undo => {
self.open_undo_confirmation_popup();
}
SlashCommand::Mention => {
self.insert_str("@");
}
@@ -1253,6 +1258,43 @@ impl ChatWidget {
);
}
fn open_undo_confirmation_popup(&mut self) {
let confirm_message = "Undoing the last Codex turn diff.".to_string();
let undo_actions: Vec<SelectionAction> = vec![Box::new(move |tx| {
tx.send(AppEvent::InsertHistoryCell(Box::new(
history_cell::new_background_event(confirm_message.clone()),
)));
tx.send(AppEvent::CodexOp(Op::UndoLastTurnDiff));
})];
let items = vec![
SelectionItem {
name: "Undo last turn diff".to_string(),
description: Some(
"Revert files that Codex changed during the most recent turn.".to_string(),
),
is_current: false,
actions: undo_actions,
},
SelectionItem {
name: "Cancel".to_string(),
description: Some("Close without undoing any files.".to_string()),
is_current: false,
actions: Vec::new(),
},
];
self.bottom_pane.show_selection_view(
"Undo last Codex turn?".to_string(),
Some(
"Codex will apply a patch to restore files from before the previous turn."
.to_string(),
),
Some("Press Enter to confirm or Esc to cancel".to_string()),
items,
);
}
/// Set the approval policy in the widget's config copy.
pub(crate) fn set_approval_policy(&mut self, policy: AskForApproval) {
self.config.approval_policy = policy;

View File

@@ -13,6 +13,7 @@ use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::AgentReasoningDeltaEvent;
use codex_core::protocol::AgentReasoningEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
@@ -177,10 +178,6 @@ fn resumed_initial_messages_render_history() {
);
}
#[cfg_attr(
target_os = "macos",
ignore = "system configuration APIs are blocked under macOS seatbelt"
)]
#[tokio::test(flavor = "current_thread")]
async fn helpers_are_available_and_do_not_panic() {
let (tx_raw, _rx) = unbounded_channel::<AppEvent>();
@@ -618,6 +615,58 @@ fn disabled_slash_command_while_task_running_snapshot() {
assert_snapshot!(blob);
}
#[test]
fn undo_command_requires_confirmation() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
chat.dispatch_command(SlashCommand::Undo);
assert!(rx.try_recv().is_err(), "undo should require confirmation");
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
let mut undo_requested = false;
let mut history_lines = Vec::new();
while let Ok(event) = rx.try_recv() {
match event {
AppEvent::InsertHistoryCell(cell) => {
history_lines.push(cell.display_lines(80));
}
AppEvent::CodexOp(Op::UndoLastTurnDiff) => {
undo_requested = true;
}
_ => {}
}
}
assert!(undo_requested, "expected undo op after confirmation");
let combined = history_lines
.iter()
.map(|lines| lines_to_single_string(lines))
.collect::<String>();
assert!(combined.contains("Undoing the last Codex turn diff."));
}
#[test]
fn background_events_are_rendered_in_history() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
chat.handle_codex_event(Event {
id: "undo".to_string(),
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: "Reverted last turn diff.".to_string(),
}),
});
let history = drain_insert_history(&mut rx);
let combined = history
.iter()
.map(|lines| lines_to_single_string(lines))
.collect::<String>();
assert!(combined.contains("Reverted last turn diff."));
}
#[tokio::test(flavor = "current_thread")]
async fn binary_size_transcript_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();

View File

@@ -49,60 +49,34 @@ pub struct PastedImageInfo {
/// Capture image from system clipboard, encode to PNG, and return bytes + info.
#[cfg(not(target_os = "android"))]
pub fn paste_image_as_png() -> Result<(Vec<u8>, PastedImageInfo), PasteImageError> {
let _span = tracing::debug_span!("paste_image_as_png").entered();
tracing::debug!("attempting clipboard image read");
let mut cb = arboard::Clipboard::new()
.map_err(|e| PasteImageError::ClipboardUnavailable(e.to_string()))?;
// Sometimes images on the clipboard come as files (e.g. when copy/pasting from
// Finder), sometimes they come as image data (e.g. when pasting from Chrome).
// Accept both, and prefer files if both are present.
let files = cb
.get()
.file_list()
.map_err(|e| PasteImageError::ClipboardUnavailable(e.to_string()));
let dyn_img = if let Some(img) = files
.unwrap_or_default()
.into_iter()
.find_map(|f| image::open(f).ok())
{
tracing::debug!(
"clipboard image opened from file: {}x{}",
img.width(),
img.height()
);
img
} else {
let _span = tracing::debug_span!("get_image").entered();
let img = cb
.get_image()
.map_err(|e| PasteImageError::NoImage(e.to_string()))?;
let w = img.width as u32;
let h = img.height as u32;
tracing::debug!("clipboard image opened from image: {}x{}", w, h);
let Some(rgba_img) = image::RgbaImage::from_raw(w, h, img.bytes.into_owned()) else {
return Err(PasteImageError::EncodeFailed("invalid RGBA buffer".into()));
};
image::DynamicImage::ImageRgba8(rgba_img)
};
let img = cb
.get_image()
.map_err(|e| PasteImageError::NoImage(e.to_string()))?;
let w = img.width as u32;
let h = img.height as u32;
let mut png: Vec<u8> = Vec::new();
let Some(rgba_img) = image::RgbaImage::from_raw(w, h, img.bytes.into_owned()) else {
return Err(PasteImageError::EncodeFailed("invalid RGBA buffer".into()));
};
let dyn_img = image::DynamicImage::ImageRgba8(rgba_img);
tracing::debug!("clipboard image decoded RGBA {w}x{h}");
{
let span =
tracing::debug_span!("encode_image", byte_length = tracing::field::Empty).entered();
let mut cursor = std::io::Cursor::new(&mut png);
dyn_img
.write_to(&mut cursor, image::ImageFormat::Png)
.map_err(|e| PasteImageError::EncodeFailed(e.to_string()))?;
span.record("byte_length", png.len());
}
tracing::debug!("clipboard image encoded to PNG ({}) bytes", png.len());
Ok((
png,
PastedImageInfo {
width: dyn_img.width(),
height: dyn_img.height(),
width: w,
height: h,
encoded_format: EncodedImageFormat::Png,
},
))

View File

@@ -28,7 +28,6 @@ use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TokenUsage;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::num_format::format_with_separators;
use codex_protocol::parse_command::ParsedCommand;
use image::DynamicImage;
use image::ImageReader;
@@ -965,7 +964,7 @@ pub(crate) fn new_status_output(
// Input: <input> [+ <cached> cached]
let mut input_line_spans: Vec<Span<'static>> = vec![
" • Input: ".into(),
format_with_separators(usage.non_cached_input()).into(),
usage.non_cached_input().to_string().into(),
];
if usage.cached_input_tokens > 0 {
let cached = usage.cached_input_tokens;
@@ -975,12 +974,12 @@ pub(crate) fn new_status_output(
// Output: <output>
lines.push(Line::from(vec![
" • Output: ".into(),
format_with_separators(usage.output_tokens).into(),
usage.output_tokens.to_string().into(),
]));
// Total: <total>
lines.push(Line::from(vec![
" • Total: ".into(),
format_with_separators(usage.blended_total()).into(),
usage.blended_total().to_string().into(),
]));
PlainHistoryCell { lines }
@@ -1065,6 +1064,11 @@ pub(crate) fn new_stream_error_event(message: String) -> PlainHistoryCell {
PlainHistoryCell { lines }
}
pub(crate) fn new_background_event(message: String) -> PlainHistoryCell {
let lines: Vec<Line<'static>> = vec![vec![padded_emoji("").into(), message.into()].into()];
PlainHistoryCell { lines }
}
/// Render a userfriendly plan update styled like a checkbox todo list.
pub(crate) fn new_plan_update(update: UpdatePlanArgs) -> PlanUpdateCell {
let UpdatePlanArgs { explanation, plan } = update;
@@ -1178,14 +1182,13 @@ pub(crate) fn new_proposed_command(command: &[String]) -> PlainHistoryCell {
let mut lines: Vec<Line<'static>> = Vec::new();
lines.push(Line::from(vec!["".into(), "Proposed Command".bold()]));
let highlighted_lines = crate::render::highlight::highlight_bash_to_lines(&cmd);
let cmd_lines: Vec<Line<'static>> = cmd
.lines()
.map(|part| Line::from(part.to_string()))
.collect();
let initial_prefix: Span<'static> = "".dim();
let subsequent_prefix: Span<'static> = " ".into();
lines.extend(prefix_lines(
highlighted_lines,
initial_prefix,
subsequent_prefix,
));
lines.extend(prefix_lines(cmd_lines, initial_prefix, subsequent_prefix));
PlainHistoryCell { lines }
}

View File

@@ -217,7 +217,6 @@ pub async fn run_main(
let file_layer = tracing_subscriber::fmt::layer()
.with_writer(non_blocking)
.with_target(false)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE)
.with_filter(env_filter());
if cli.oss {

View File

@@ -8,6 +8,7 @@ use codex_core::ConversationItem;
use codex_core::ConversationsPage;
use codex_core::Cursor;
use codex_core::RolloutRecorder;
use codex_core::protocol::InputMessageKind;
use color_eyre::eyre::Result;
use crossterm::event::KeyCode;
use crossterm::event::KeyEvent;
@@ -23,10 +24,6 @@ use crate::text_formatting::truncate_text;
use crate::tui::FrameRequester;
use crate::tui::Tui;
use crate::tui::TuiEvent;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InputMessageKind;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
const PAGE_SIZE: usize = 25;
@@ -276,7 +273,7 @@ fn head_to_row(item: &ConversationItem) -> Option<Row> {
ts = Some(parsed.with_timezone(&Utc));
}
let preview = preview_from_head(&item.head)?;
let preview = find_first_user_text(&item.head)?;
let preview = preview.trim().to_string();
if preview.is_empty() {
return None;
@@ -288,42 +285,37 @@ fn head_to_row(item: &ConversationItem) -> Option<Row> {
})
}
fn preview_from_head(head: &[serde_json::Value]) -> Option<String> {
head.iter()
.filter_map(|value| serde_json::from_value::<ResponseItem>(value.clone()).ok())
.find_map(|item| match item {
ResponseItem::Message { content, .. } => {
// Find the actual user message (as opposed to user instructions or ide context)
let preview = content
.into_iter()
.filter_map(|content| match content {
ContentItem::InputText { text }
if matches!(
InputMessageKind::from(("user", text.as_str())),
InputMessageKind::Plain
) =>
{
// Strip ide context.
let text = match text.find(USER_MESSAGE_BEGIN) {
Some(idx) => {
text[idx + USER_MESSAGE_BEGIN.len()..].trim().to_string()
}
None => text,
};
Some(text)
}
_ => None,
})
.collect::<String>();
if preview.is_empty() {
None
} else {
Some(preview)
/// Return the first plain user text from the JSONL `head` of a rollout.
///
/// Strategy: scan for the first `{ type: "message", role: "user" }` entry and
/// then return the first `content` item where `{ type: "input_text" }` that is
/// classified as `InputMessageKind::Plain` (i.e., not wrapped in
/// `<user_instructions>` or `<environment_context>` tags).
fn find_first_user_text(head: &[serde_json::Value]) -> Option<String> {
for v in head.iter() {
let t = v.get("type").and_then(|x| x.as_str()).unwrap_or("");
if t != "message" {
continue;
}
if v.get("role").and_then(|x| x.as_str()) != Some("user") {
continue;
}
if let Some(arr) = v.get("content").and_then(|c| c.as_array()) {
for c in arr.iter() {
if let (Some("input_text"), Some(txt)) =
(c.get("type").and_then(|t| t.as_str()), c.get("text"))
&& let Some(s) = txt.as_str()
{
// Skip XML-wrapped user_instructions/environment_context blocks and
// return the first plain user text we find.
if matches!(InputMessageKind::from(("user", s)), InputMessageKind::Plain) {
return Some(s.to_string());
}
}
}
_ => None,
})
}
}
None
}
fn draw_picker(tui: &mut Tui, state: &PickerState) -> std::io::Result<()> {
@@ -460,26 +452,31 @@ mod tests {
}
#[test]
fn preview_uses_first_message_input_text() {
fn skips_user_instructions_and_env_context() {
let head = vec![
json!({ "timestamp": "2025-01-01T00:00:00Z" }),
json!({
"type": "message",
"role": "user",
"content": [
{ "type": "input_text", "text": "<user_instructions>hi</user_instructions>" },
{ "type": "input_text", "text": "real question" },
{ "type": "input_image", "image_url": "ignored" }
{ "type": "input_text", "text": "<user_instructions>hi</user_instructions>" }
]
}),
json!({
"type": "message",
"role": "user",
"content": [ { "type": "input_text", "text": "later text" } ]
"content": [
{ "type": "input_text", "text": "<environment_context>cwd</environment_context>" }
]
}),
json!({
"type": "message",
"role": "user",
"content": [ { "type": "input_text", "text": "real question" } ]
}),
];
let preview = preview_from_head(&head);
assert_eq!(preview.as_deref(), Some("real question"));
let first = find_first_user_text(&head);
assert_eq!(first.as_deref(), Some("real question"));
}
#[test]

View File

@@ -18,6 +18,7 @@ pub enum SlashCommand {
Init,
Compact,
Diff,
Undo,
Mention,
Status,
Mcp,
@@ -36,6 +37,7 @@ impl SlashCommand {
SlashCommand::Compact => "summarize conversation to prevent hitting the context limit",
SlashCommand::Quit => "exit Codex",
SlashCommand::Diff => "show git diff (including untracked files)",
SlashCommand::Undo => "undo the last turn diff applied by Codex",
SlashCommand::Mention => "mention a file",
SlashCommand::Status => "show current session configuration and token usage",
SlashCommand::Model => "choose what model and reasoning effort to use",
@@ -63,6 +65,7 @@ impl SlashCommand {
| SlashCommand::Approvals
| SlashCommand::Logout => false,
SlashCommand::Diff
| SlashCommand::Undo
| SlashCommand::Mention
| SlashCommand::Status
| SlashCommand::Mcp