diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 71150d7126..d125784483 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -36,6 +36,7 @@ use tempfile::TempDir; use tokio::time::timeout; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex."; #[tokio::test] async fn realtime_conversation_streams_v2_notifications() -> Result<()> { @@ -114,6 +115,18 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { assert_eq!(started.thread_id, thread_start.thread.id); assert!(started.session_id.is_some()); + let startup_context_request = realtime_server.wait_for_request(0, 0).await; + assert_eq!( + startup_context_request.body_json()["type"].as_str(), + Some("session.update") + ); + assert!( + startup_context_request.body_json()["session"]["instructions"] + .as_str() + .context("expected startup context instructions")? + .contains(STARTUP_CONTEXT_HEADER) + ); + let audio_append_request_id = mcp .send_thread_realtime_append_audio_request(ThreadRealtimeAppendAudioParams { thread_id: started.thread_id.clone(), @@ -183,6 +196,12 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { connection[0].body_json()["type"].as_str(), Some("session.update") ); + assert!( + connection[0].body_json()["session"]["instructions"] + .as_str() + .context("expected startup context instructions")? + .contains(STARTUP_CONTEXT_HEADER) + ); let mut request_types = [ connection[1].body_json()["type"] .as_str() diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 828bbe214a..9b822a85a1 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -13,6 +13,7 @@ pub mod auth; mod client; mod client_common; pub mod codex; +mod realtime_context; mod realtime_conversation; pub use codex::SteerInputError; mod codex_thread; diff --git a/codex-rs/core/src/realtime_context.rs b/codex-rs/core/src/realtime_context.rs new file mode 100644 index 0000000000..0c2277091e --- /dev/null +++ b/codex-rs/core/src/realtime_context.rs @@ -0,0 +1,402 @@ +use crate::codex::Session; +use crate::git_info::resolve_root_git_project_for_trust; +use crate::truncate::TruncationPolicy; +use crate::truncate::truncate_text; +use codex_state::Stage1Output; +use std::ffi::OsStr; +use std::fs::DirEntry; +use std::io; +use std::path::Path; +use tracing::debug; +use tracing::warn; + +const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.\nThis is background context about the user, recent work, and machine/workspace layout. It may be incomplete or stale. Use it to inform responses, and do not repeat it back unless relevant."; +const USER_SECTION_TOKEN_BUDGET: usize = 800; +const RECENT_WORK_SECTION_TOKEN_BUDGET: usize = 2_200; +const WORKSPACE_SECTION_TOKEN_BUDGET: usize = 1_600; +const NOTES_SECTION_TOKEN_BUDGET: usize = 300; +const MAX_STAGE1_OUTPUTS: usize = 3; +const TREE_MAX_DEPTH: usize = 2; +const DIR_ENTRY_LIMIT: usize = 20; +const APPROX_BYTES_PER_TOKEN: usize = 4; +const NOISY_DIR_NAMES: &[&str] = &[ + ".git", + ".next", + ".pytest_cache", + ".ruff_cache", + "__pycache__", + "build", + "dist", + "node_modules", + "out", + "target", +]; + +pub(crate) async fn build_realtime_startup_context( + sess: &Session, + budget_tokens: usize, +) -> Option { + let config = sess.get_config().await; + let cwd = config.cwd.clone(); + let memories = load_global_memories(sess).await; + let user_section = build_user_section(&memories); + let recent_work_section = build_recent_work_section(&memories); + let workspace_section = build_workspace_section(&cwd); + + if user_section.is_none() && recent_work_section.is_none() && workspace_section.is_none() { + debug!("realtime startup context unavailable; skipping injection"); + return None; + } + + let notes_section = build_notes_section(); + let mut parts = vec![STARTUP_CONTEXT_HEADER.to_string()]; + + let has_user_section = user_section.is_some(); + let has_recent_work_section = recent_work_section.is_some(); + let has_workspace_section = workspace_section.is_some(); + + if let Some(section) = format_section("User", user_section, USER_SECTION_TOKEN_BUDGET) { + parts.push(section); + } + if let Some(section) = format_section( + "Recent Work", + recent_work_section, + RECENT_WORK_SECTION_TOKEN_BUDGET, + ) { + parts.push(section); + } + if let Some(section) = format_section( + "Machine / Workspace Map", + workspace_section, + WORKSPACE_SECTION_TOKEN_BUDGET, + ) { + parts.push(section); + } + if let Some(section) = format_section("Notes", Some(notes_section), NOTES_SECTION_TOKEN_BUDGET) + { + parts.push(section); + } + + let context = truncate_text(&parts.join("\n\n"), TruncationPolicy::Tokens(budget_tokens)); + debug!( + approx_tokens = approx_token_count(&context), + bytes = context.len(), + has_user_section, + has_recent_work_section, + has_workspace_section, + "built realtime startup context" + ); + Some(context) +} + +#[derive(Default)] +struct GlobalMemories { + entries: Vec, +} + +async fn load_global_memories(sess: &Session) -> GlobalMemories { + let Some(state_db) = sess.services.state_db.as_ref() else { + return GlobalMemories::default(); + }; + + match state_db + .list_stage1_outputs_for_global(MAX_STAGE1_OUTPUTS) + .await + { + Ok(entries) => GlobalMemories { entries }, + Err(err) => { + warn!("failed to load realtime startup memories from state db: {err}"); + GlobalMemories::default() + } + } +} + +fn build_user_section(memories: &GlobalMemories) -> Option { + let sections = memories + .entries + .iter() + .filter_map(|entry| format_memory_entry(entry, &entry.raw_memory)) + .collect::>(); + (!sections.is_empty()).then(|| sections.join("\n\n")) +} + +fn build_recent_work_section(memories: &GlobalMemories) -> Option { + let sections = memories + .entries + .iter() + .filter_map(|entry| format_memory_entry(entry, &entry.rollout_summary)) + .collect::>(); + (!sections.is_empty()).then(|| sections.join("\n\n")) +} + +fn build_workspace_section(cwd: &Path) -> Option { + let git_root = resolve_root_git_project_for_trust(cwd); + let cwd_tree = render_tree(cwd); + let git_root_tree = git_root + .as_ref() + .filter(|git_root| git_root.as_path() != cwd) + .and_then(|git_root| render_tree(git_root)); + let parent_layout = git_root + .as_ref() + .and_then(|_| cwd.parent()) + .and_then(|parent| render_parent_layout(parent, cwd.file_name())); + + if cwd_tree.is_none() && git_root.is_none() && parent_layout.is_none() { + return None; + } + + let mut lines = vec![ + format!("Current working directory: {}", cwd.display()), + format!("Working directory name: {}", display_name(cwd)), + ]; + + if let Some(git_root) = &git_root { + lines.push(format!("Git root: {}", git_root.display())); + lines.push(format!("Git project: {}", display_name(git_root))); + } + + if let Some(tree) = cwd_tree { + lines.push(String::new()); + lines.push("Working directory tree:".to_string()); + lines.extend(tree); + } + + if let Some(tree) = git_root_tree { + lines.push(String::new()); + lines.push("Git root tree:".to_string()); + lines.extend(tree); + } + + if let Some(layout) = parent_layout { + lines.push(String::new()); + lines.push("Parent workspace layout:".to_string()); + lines.extend(layout); + } + + Some(lines.join("\n")) +} + +fn render_tree(root: &Path) -> Option> { + if !root.is_dir() { + return None; + } + + let mut lines = Vec::new(); + collect_tree_lines(root, 0, &mut lines); + (!lines.is_empty()).then_some(lines) +} + +fn collect_tree_lines(dir: &Path, depth: usize, lines: &mut Vec) { + if depth >= TREE_MAX_DEPTH { + return; + } + + let entries = match read_sorted_entries(dir) { + Ok(entries) => entries, + Err(_) => return, + }; + let total_entries = entries.len(); + + for entry in entries.into_iter().take(DIR_ENTRY_LIMIT) { + let Ok(file_type) = entry.file_type() else { + continue; + }; + let name = file_name_string(&entry.path()); + let indent = " ".repeat(depth); + let suffix = if file_type.is_dir() { "/" } else { "" }; + lines.push(format!("{indent}- {name}{suffix}")); + if file_type.is_dir() { + collect_tree_lines(&entry.path(), depth + 1, lines); + } + } + + if total_entries > DIR_ENTRY_LIMIT { + lines.push(format!( + "{}- ... {} more entries", + " ".repeat(depth), + total_entries - DIR_ENTRY_LIMIT + )); + } +} + +fn render_parent_layout(parent: &Path, current: Option<&OsStr>) -> Option> { + let entries = read_sorted_entries(parent).ok()?; + if entries.len() <= 1 { + return None; + } + + let mut lines = Vec::new(); + for entry in entries.into_iter().take(DIR_ENTRY_LIMIT) { + let Ok(file_type) = entry.file_type() else { + continue; + }; + let name = entry.file_name(); + let mut label = name.to_string_lossy().into_owned(); + if file_type.is_dir() { + label.push('/'); + } + if current.is_some_and(|current| current == name) { + label.push_str(" (current)"); + } + lines.push(format!("- {label}")); + } + (!lines.is_empty()).then_some(lines) +} + +fn read_sorted_entries(dir: &Path) -> io::Result> { + let mut entries = std::fs::read_dir(dir)? + .filter_map(Result::ok) + .filter(|entry| !is_noisy_name(&entry.file_name())) + .collect::>(); + entries.sort_by(|left, right| { + let left_is_dir = left + .file_type() + .map(|file_type| file_type.is_dir()) + .unwrap_or(false); + let right_is_dir = right + .file_type() + .map(|file_type| file_type.is_dir()) + .unwrap_or(false); + (!left_is_dir, file_name_string(&left.path())) + .cmp(&(!right_is_dir, file_name_string(&right.path()))) + }); + Ok(entries) +} + +fn is_noisy_name(name: &OsStr) -> bool { + let name = name.to_string_lossy(); + NOISY_DIR_NAMES.iter().any(|noisy| *noisy == name) +} + +fn build_notes_section() -> String { + "Built at realtime startup from persisted global Codex memories in the state DB and a bounded local workspace scan. This excludes repo memory instructions, AGENTS files, and project-doc prompt blends.".to_string() +} + +fn format_section(title: &str, body: Option, budget_tokens: usize) -> Option { + let body = body?; + let body = body.trim(); + if body.is_empty() { + return None; + } + + Some(format!( + "## {title}\n{}", + truncate_text(body, TruncationPolicy::Tokens(budget_tokens)) + )) +} + +fn normalize_text(text: &str) -> String { + text.replace("\r\n", "\n").trim().to_string() +} + +fn format_memory_entry(entry: &Stage1Output, body: &str) -> Option { + let body = normalize_text(body); + if body.is_empty() { + return None; + } + + let mut lines = vec![ + format!("### {}", entry.source_updated_at.to_rfc3339()), + format!("cwd: {}", entry.cwd.display()), + ]; + if let Some(git_branch) = entry.git_branch.as_deref() { + lines.push(format!("git_branch: {git_branch}")); + } + lines.push(String::new()); + lines.push(body); + Some(lines.join("\n")) +} + +fn display_name(path: &Path) -> String { + path.file_name() + .and_then(OsStr::to_str) + .map(str::to_owned) + .unwrap_or_else(|| path.display().to_string()) +} + +fn file_name_string(path: &Path) -> String { + path.file_name() + .and_then(OsStr::to_str) + .map(str::to_owned) + .unwrap_or_else(|| path.display().to_string()) +} + +fn approx_token_count(text: &str) -> usize { + text.len().div_ceil(APPROX_BYTES_PER_TOKEN) +} + +#[cfg(test)] +mod tests { + use super::GlobalMemories; + use super::build_recent_work_section; + use super::build_user_section; + use super::build_workspace_section; + use chrono::TimeZone; + use chrono::Utc; + use codex_protocol::ThreadId; + use codex_state::Stage1Output; + use pretty_assertions::assert_eq; + use std::fs; + use std::path::PathBuf; + use tempfile::TempDir; + + fn stage1_output(raw_memory: &str, rollout_summary: &str) -> Stage1Output { + Stage1Output { + thread_id: ThreadId::new(), + rollout_path: PathBuf::from("/tmp/rollout.jsonl"), + source_updated_at: Utc + .timestamp_opt(1_709_251_200, 0) + .single() + .expect("valid timestamp"), + raw_memory: raw_memory.to_string(), + rollout_summary: rollout_summary.to_string(), + rollout_slug: Some("slug".to_string()), + cwd: PathBuf::from("/tmp/workspace"), + git_branch: Some("main".to_string()), + generated_at: Utc + .timestamp_opt(1_709_251_260, 0) + .single() + .expect("valid timestamp"), + } + } + + #[test] + fn workspace_section_requires_meaningful_structure() { + let cwd = TempDir::new().expect("tempdir"); + assert_eq!(build_workspace_section(cwd.path()), None); + } + + #[test] + fn workspace_section_includes_tree_when_entries_exist() { + let cwd = TempDir::new().expect("tempdir"); + fs::create_dir(cwd.path().join("docs")).expect("create docs dir"); + fs::write(cwd.path().join("README.md"), "hello").expect("write readme"); + + let section = build_workspace_section(cwd.path()).expect("workspace section"); + assert!(section.contains("Working directory tree:")); + assert!(section.contains("- docs/")); + assert!(section.contains("- README.md")); + } + + #[test] + fn recent_work_section_uses_rollout_summaries() { + let memories = GlobalMemories { + entries: vec![stage1_output("user memory", "recent")], + }; + + let section = build_recent_work_section(&memories).expect("recent work section"); + assert!(section.contains("cwd: /tmp/workspace")); + assert!(section.contains("recent")); + } + + #[test] + fn user_section_uses_raw_memories() { + let memories = GlobalMemories { + entries: vec![stage1_output("prefers concise updates", "recent")], + }; + + let section = build_user_section(&memories).expect("user section"); + assert!(section.contains("prefers concise updates")); + assert!(section.contains("git_branch: main")); + } +} diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 656b590e57..4d8d6127d5 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -5,6 +5,7 @@ use crate::codex::Session; use crate::default_client::default_headers; use crate::error::CodexErr; use crate::error::Result as CodexResult; +use crate::realtime_context::build_realtime_startup_context; use async_channel::Receiver; use async_channel::Sender; use async_channel::TrySendError; @@ -43,6 +44,7 @@ const AUDIO_IN_QUEUE_CAPACITY: usize = 256; const USER_TEXT_IN_QUEUE_CAPACITY: usize = 64; const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64; const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256; +const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000; pub(crate) struct RealtimeConversationManager { state: Mutex>, @@ -282,6 +284,13 @@ pub(crate) async fn handle_start( .experimental_realtime_ws_backend_prompt .clone() .unwrap_or(params.prompt); + let prompt = + match build_realtime_startup_context(sess.as_ref(), REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET) + .await + { + Some(context) => format!("{prompt}\n\n{context}"), + None => prompt, + }; let model = config.experimental_realtime_ws_model.clone(); let requested_session_id = params diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 1a3d87b55a..645a65d809 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -1,6 +1,8 @@ use anyhow::Result; +use chrono::Utc; use codex_core::CodexAuth; use codex_core::auth::OPENAI_API_KEY_ENV_VAR; +use codex_protocol::ThreadId; use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::ConversationAudioParams; use codex_protocol::protocol::ConversationStartParams; @@ -11,13 +13,16 @@ use codex_protocol::protocol::Op; use codex_protocol::protocol::RealtimeAudioFrame; use codex_protocol::protocol::RealtimeConversationRealtimeEvent; use codex_protocol::protocol::RealtimeEvent; +use codex_protocol::protocol::SessionSource; use codex_protocol::user_input::UserInput; +use codex_state::Stage1JobClaimOutcome; use core_test_support::responses; use core_test_support::responses::start_mock_server; use core_test_support::responses::start_websocket_server; use core_test_support::skip_if_no_network; use core_test_support::streaming_sse::StreamingSseChunk; use core_test_support::streaming_sse::start_streaming_sse_server; +use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use core_test_support::wait_for_event_match; @@ -25,9 +30,82 @@ use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; use std::ffi::OsString; +use std::fs; use std::time::Duration; use tokio::sync::oneshot; +const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex."; +const MEMORY_PROMPT_PHRASE: &str = + "You have access to a memory folder with guidance from prior runs."; + +fn websocket_request_text( + request: &core_test_support::responses::WebSocketRequest, +) -> Option { + request.body_json()["item"]["content"][0]["text"] + .as_str() + .map(str::to_owned) +} + +fn websocket_request_instructions( + request: &core_test_support::responses::WebSocketRequest, +) -> Option { + request.body_json()["session"]["instructions"] + .as_str() + .map(str::to_owned) +} + +async fn seed_stage1_output( + test: &TestCodex, + raw_memory: &str, + rollout_summary: &str, + rollout_slug: &str, +) -> Result<()> { + let db = test.codex.state_db().expect("state db enabled"); + let thread_id = ThreadId::new(); + let updated_at = Utc::now(); + let mut metadata_builder = codex_state::ThreadMetadataBuilder::new( + thread_id, + test.codex_home_path() + .join(format!("rollout-{thread_id}.jsonl")), + updated_at, + SessionSource::Cli, + ); + metadata_builder.cwd = test.workspace_path(format!("workspace-{rollout_slug}")); + metadata_builder.model_provider = Some("test-provider".to_string()); + metadata_builder.git_branch = Some(format!("branch-{rollout_slug}")); + let metadata = metadata_builder.build("test-provider"); + db.upsert_thread(&metadata).await?; + + let claim = db + .try_claim_stage1_job( + thread_id, + ThreadId::new(), + updated_at.timestamp(), + 3_600, + 64, + ) + .await?; + let ownership_token = match claim { + Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected stage-1 claim outcome: {other:?}"), + }; + + assert!( + db.mark_stage1_job_succeeded( + thread_id, + &ownership_token, + updated_at.timestamp(), + raw_memory, + rollout_summary, + Some(rollout_slug), + ) + .await?, + "stage-1 success should enqueue global consolidation" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn conversation_start_audio_text_close_round_trip() -> Result<()> { skip_if_no_network!(Ok(())); @@ -122,10 +200,9 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { connection[0].body_json()["type"].as_str(), Some("session.update") ); - assert_eq!( - connection[0].body_json()["session"]["instructions"].as_str(), - Some("backend prompt") - ); + let initial_instructions = websocket_request_instructions(&connection[0]) + .expect("initial session update instructions"); + assert!(initial_instructions.starts_with("backend prompt")); assert_eq!( server.handshakes()[1] .header("x-session-id") @@ -452,19 +529,17 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> { let connections = server.connections(); assert_eq!(connections.len(), 3); assert_eq!(connections[1].len(), 1); - assert_eq!( - connections[1][0].body_json()["session"]["instructions"].as_str(), - Some("old") - ); + let old_instructions = + websocket_request_instructions(&connections[1][0]).expect("old session instructions"); + assert!(old_instructions.starts_with("old")); assert_eq!( server.handshakes()[1].header("x-session-id").as_deref(), Some("conv_old") ); assert_eq!(connections[2].len(), 2); - assert_eq!( - connections[2][0].body_json()["session"]["instructions"].as_str(), - Some("new") - ); + let new_instructions = + websocket_request_instructions(&connections[2][0]).expect("new session instructions"); + assert!(new_instructions.starts_with("new")); assert_eq!( server.handshakes()[2].header("x-session-id").as_deref(), Some("conv_new") @@ -570,9 +645,175 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() -> let connections = server.connections(); assert_eq!(connections.len(), 2); + let overridden_instructions = websocket_request_instructions(&connections[1][0]) + .expect("overridden session instructions"); + assert!(overridden_instructions.starts_with("prompt from config")); + + server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_start_injects_startup_context_from_global_memories() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_websocket_server(vec![ + vec![], + vec![vec![json!({ + "type": "session.updated", + "session": { "id": "sess_context", "instructions": "backend prompt" } + })]], + ]) + .await; + + let mut builder = test_codex(); + let test = builder.build_with_websocket_server(&server).await?; + seed_stage1_output( + &test, + "User profile: principal engineer focused on Codex reliability and realtime UX.", + "Recent work: cleaned up startup flows and reviewed websocket routing.", + "latest", + ) + .await?; + fs::create_dir_all(test.workspace_path("docs"))?; + fs::write(test.workspace_path("README.md"), "workspace marker")?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionUpdated { session_id, .. }, + }) if session_id == "sess_context" => Some(Ok(())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); + + let startup_context_request = server.wait_for_request(1, 0).await; + let startup_context = websocket_request_instructions(&startup_context_request) + .expect("startup context request should contain instructions"); + + assert!(startup_context.contains(STARTUP_CONTEXT_HEADER)); + assert!(startup_context.contains("User profile: principal engineer")); + assert!(startup_context.contains("Recent work: cleaned up startup flows")); + assert!(startup_context.contains("git_branch: branch-latest")); + assert!(startup_context.contains("## Machine / Workspace Map")); + assert!(startup_context.contains("README.md")); + assert!(!startup_context.contains(MEMORY_PROMPT_PHRASE)); + + server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_websocket_server(vec![ + vec![], + vec![vec![json!({ + "type": "session.updated", + "session": { "id": "sess_workspace", "instructions": "backend prompt" } + })]], + ]) + .await; + + let mut builder = test_codex(); + let test = builder.build_with_websocket_server(&server).await?; + fs::create_dir_all(test.workspace_path("codex-rs/core"))?; + fs::write(test.workspace_path("notes.txt"), "workspace marker")?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionUpdated { session_id, .. }, + }) if session_id == "sess_workspace" => Some(Ok(())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); + + let startup_context_request = server.wait_for_request(1, 0).await; + let startup_context = websocket_request_instructions(&startup_context_request) + .expect("startup context request should contain instructions"); + + assert!(startup_context.contains(STARTUP_CONTEXT_HEADER)); + assert!(startup_context.contains("## Machine / Workspace Map")); + assert!(startup_context.contains("notes.txt")); + assert!(startup_context.contains("codex-rs/")); + + server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_startup_context_is_truncated_and_sent_once_per_start() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_websocket_server(vec![ + vec![], + vec![ + vec![json!({ + "type": "session.updated", + "session": { "id": "sess_truncated", "instructions": "backend prompt" } + })], + vec![], + ], + ]) + .await; + + let oversized_summary = "recent work ".repeat(3_500); + let mut builder = test_codex(); + let test = builder.build_with_websocket_server(&server).await?; + seed_stage1_output(&test, &oversized_summary, "summary", "oversized").await?; + fs::write(test.workspace_path("marker.txt"), "marker")?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionUpdated { session_id, .. }, + }) if session_id == "sess_truncated" => Some(Ok(())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); + + let startup_context_request = server.wait_for_request(1, 0).await; + let startup_context = websocket_request_instructions(&startup_context_request) + .expect("startup context request should contain instructions"); + assert!(startup_context.contains(STARTUP_CONTEXT_HEADER)); + assert!(startup_context.len() <= 20_500); + + test.codex + .submit(Op::RealtimeConversationText(ConversationTextParams { + text: "hello".to_string(), + })) + .await?; + + let explicit_text_request = server.wait_for_request(1, 1).await; assert_eq!( - connections[1][0].body_json()["session"]["instructions"].as_str(), - Some("prompt from config") + websocket_request_text(&explicit_text_request), + Some("hello".to_string()) ); server.shutdown().await;