This commit is contained in:
Ahmed Ibrahim
2025-07-19 22:52:43 -07:00
parent 018003e52f
commit 4c66c11a62
3 changed files with 80 additions and 67 deletions

View File

@@ -527,7 +527,7 @@ async fn submission_loop(
ctrl_c: Arc<Notify>,
) {
// Generate a unique ID for the lifetime of this Codex session.
let mut session_id = Uuid::new_v4();
let session_id = Uuid::new_v4();
let mut sess: Option<Arc<Session>> = None;
// shorthand - send an event when there is no active session
@@ -597,41 +597,15 @@ async fn submission_loop(
}
return;
}
// Optionally resume an existing rollout.
let mut restored_items: Option<Vec<ResponseItem>> = None;
let mut restored_prev_id: Option<String> = None;
let rollout_recorder: Option<RolloutRecorder> =
if let Some(path) = resume_path.as_ref() {
match RolloutRecorder::resume(path).await {
Ok((rec, saved)) => {
session_id = saved.session_id;
restored_prev_id = saved.state.previous_response_id;
if !saved.items.is_empty() {
restored_items = Some(saved.items);
}
Some(rec)
}
Err(e) => {
warn!("failed to resume rollout from {path:?}: {e}");
None
}
}
} else {
None
};
let rollout_recorder = match rollout_recorder {
Some(rec) => Some(rec),
None => match RolloutRecorder::new(&config, session_id, instructions.clone())
.await
{
Ok(r) => Some(r),
Err(e) => {
warn!("failed to initialise rollout recorder: {e}");
None
}
},
};
let (rollout_recorder, restored_items, restored_prev_id, session_id) =
crate::rollout::prepare_rollout_recorder(
&config,
session_id,
instructions.clone(),
resume_path.as_deref(),
)
.await;
let client = ModelClient::new(
config.clone(),

View File

@@ -15,6 +15,7 @@ use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self};
use tracing::info;
use tracing::warn;
use uuid::Uuid;
use crate::config::Config;
@@ -297,3 +298,46 @@ async fn rollout_writer(
}
}
}
pub async fn prepare_rollout_recorder(
config: &Config,
mut session_id: Uuid,
instructions: Option<String>,
resume_path: Option<&Path>,
) -> (
Option<RolloutRecorder>,
Option<Vec<ResponseItem>>, // restored_items
Option<String>, // restored_prev_id
Uuid, // possibly updated session_id
) {
// Try to resume
let (mut restored_items, mut restored_prev_id, mut recorder_opt) = (None, None, None);
if let Some(path) = resume_path {
match RolloutRecorder::resume(path).await {
Ok((rec, saved)) => {
session_id = saved.session_id;
restored_prev_id = saved.state.previous_response_id;
if !saved.items.is_empty() {
restored_items = Some(saved.items);
}
recorder_opt = Some(rec);
}
Err(e) => {
warn!("failed to resume rollout from {path:?}: {e}");
}
}
}
// If not resumed, create a new recorder
if recorder_opt.is_none() {
match RolloutRecorder::new(config, session_id, instructions.clone()).await {
Ok(r) => recorder_opt = Some(r),
Err(e) => {
warn!("failed to initialise rollout recorder: {e}");
}
}
}
(recorder_opt, restored_items, restored_prev_id, session_id)
}

View File

@@ -1,7 +1,8 @@
#![expect(clippy::unwrap_used)]
use assert_cmd::Command as AssertCommand;
use assert_cmd::prelude::*;
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use std::process::Command;
use std::time::Duration;
use std::time::Instant;
use tempfile::TempDir;
@@ -50,13 +51,8 @@ async fn chat_mode_stream_cli() {
"model_providers.mock={{ name = \"mock\", base_url = \"{}/v1\", env_key = \"PATH\", wire_api = \"chat\" }}",
server.uri()
);
let mut cmd = AssertCommand::new("cargo");
cmd.arg("run")
.arg("-p")
.arg("codex-cli")
.arg("--quiet")
.arg("--")
.arg("exec")
let mut cmd = Command::cargo_bin("codex").unwrap();
cmd.arg("exec")
.arg("--skip-git-repo-check")
.arg("-c")
.arg(&provider_override)
@@ -100,13 +96,8 @@ async fn responses_api_stream_cli() {
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/cli_responses_fixture.sse");
let home = TempDir::new().unwrap();
let mut cmd = AssertCommand::new("cargo");
cmd.arg("run")
.arg("-p")
.arg("codex-cli")
.arg("--quiet")
.arg("--")
.arg("exec")
let mut cmd = Command::cargo_bin("codex").unwrap();
cmd.arg("exec")
.arg("--skip-git-repo-check")
.arg("-C")
.arg(env!("CARGO_MANIFEST_DIR"))
@@ -146,13 +137,8 @@ async fn integration_creates_and_checks_session_file() {
// 4. Run the codex CLI through cargo (ensures the right bin is built) and invoke `exec`,
// which is what records a session.
let mut cmd = AssertCommand::new("cargo");
cmd.arg("run")
.arg("-p")
.arg("codex-cli")
.arg("--quiet")
.arg("--")
.arg("exec")
let mut cmd = Command::cargo_bin("codex").unwrap();
cmd.arg("exec")
.arg("--skip-git-repo-check")
.arg("-C")
.arg(env!("CARGO_MANIFEST_DIR"))
@@ -176,6 +162,7 @@ async fn integration_creates_and_checks_session_file() {
while !sessions_dir.exists() && Instant::now() < dir_deadline {
std::thread::sleep(Duration::from_millis(50));
}
eprintln!("sessions_dir: {sessions_dir:?}");
assert!(sessions_dir.exists(), "sessions directory never appeared");
// Find the session file that contains `marker`.
@@ -185,34 +172,47 @@ async fn integration_creates_and_checks_session_file() {
for entry in WalkDir::new(&sessions_dir) {
let entry = match entry {
Ok(e) => e,
Err(_) => continue,
Err(_) => {
eprintln!("error walking dir: {entry:?}");
continue;
}
};
if !entry.file_type().is_file() {
eprintln!("not a file: {entry:?}");
continue;
}
if !entry.file_name().to_string_lossy().ends_with(".jsonl") {
eprintln!("not a jsonl file: {entry:?}");
continue;
}
let path = entry.path();
let Ok(content) = std::fs::read_to_string(path) else {
eprintln!("error reading file: {path:?}");
continue;
};
let mut lines = content.lines();
if lines.next().is_none() {
eprintln!("no lines in file: {path:?}");
continue;
}
for line in lines {
eprintln!("line: {line:?}");
if line.trim().is_empty() {
eprintln!("empty line in file: {path:?}");
continue;
}
let item: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
Err(_) => {
eprintln!("error parsing line as json: {line:?}");
continue;
}
};
if item.get("type").and_then(|t| t.as_str()) == Some("message") {
if let Some(c) = item.get("content") {
if c.to_string().contains(&marker) {
matching_path = Some(path.to_path_buf());
eprintln!("found matching path: {path:?}");
break;
}
}
@@ -223,12 +223,12 @@ async fn integration_creates_and_checks_session_file() {
std::thread::sleep(Duration::from_millis(50));
}
}
eprintln!("matching_path: {matching_path:?}");
let path = match matching_path {
Some(p) => p,
None => panic!("No session file containing the marker was found"),
};
// Basic sanity checks on location and metadata.
let rel = match path.strip_prefix(&sessions_dir) {
Ok(r) => r,
@@ -312,13 +312,8 @@ async fn integration_creates_and_checks_session_file() {
// to sidestep the issue.
let resume_path_str = path.to_string_lossy().replace('\\', "/");
let resume_override = format!("experimental_resume=\"{resume_path_str}\"");
let mut cmd2 = AssertCommand::new("cargo");
cmd2.arg("run")
.arg("-p")
.arg("codex-cli")
.arg("--quiet")
.arg("--")
.arg("exec")
let mut cmd2 = Command::cargo_bin("codex").unwrap();
cmd2.arg("exec")
.arg("--skip-git-repo-check")
.arg("-c")
.arg(&resume_override)