mirror of
https://github.com/openai/codex.git
synced 2026-05-01 03:42:05 +03:00
Append realtime startup context to initial session update
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::auth::OPENAI_API_KEY_ENV_VAR;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::ConversationAudioParams;
|
||||
use codex_protocol::protocol::ConversationStartParams;
|
||||
@@ -11,13 +13,16 @@ use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_state::Stage1JobClaimOutcome;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::streaming_sse::StreamingSseChunk;
|
||||
use core_test_support::streaming_sse::start_streaming_sse_server;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
@@ -25,9 +30,82 @@ use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::ffi::OsString;
|
||||
use std::fs;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const MEMORY_PROMPT_PHRASE: &str =
|
||||
"You have access to a memory folder with guidance from prior runs.";
|
||||
|
||||
fn websocket_request_text(
|
||||
request: &core_test_support::responses::WebSocketRequest,
|
||||
) -> Option<String> {
|
||||
request.body_json()["item"]["content"][0]["text"]
|
||||
.as_str()
|
||||
.map(str::to_owned)
|
||||
}
|
||||
|
||||
fn websocket_request_instructions(
|
||||
request: &core_test_support::responses::WebSocketRequest,
|
||||
) -> Option<String> {
|
||||
request.body_json()["session"]["instructions"]
|
||||
.as_str()
|
||||
.map(str::to_owned)
|
||||
}
|
||||
|
||||
async fn seed_stage1_output(
|
||||
test: &TestCodex,
|
||||
raw_memory: &str,
|
||||
rollout_summary: &str,
|
||||
rollout_slug: &str,
|
||||
) -> Result<()> {
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let thread_id = ThreadId::new();
|
||||
let updated_at = Utc::now();
|
||||
let mut metadata_builder = codex_state::ThreadMetadataBuilder::new(
|
||||
thread_id,
|
||||
test.codex_home_path()
|
||||
.join(format!("rollout-{thread_id}.jsonl")),
|
||||
updated_at,
|
||||
SessionSource::Cli,
|
||||
);
|
||||
metadata_builder.cwd = test.workspace_path(format!("workspace-{rollout_slug}"));
|
||||
metadata_builder.model_provider = Some("test-provider".to_string());
|
||||
metadata_builder.git_branch = Some(format!("branch-{rollout_slug}"));
|
||||
let metadata = metadata_builder.build("test-provider");
|
||||
db.upsert_thread(&metadata).await?;
|
||||
|
||||
let claim = db
|
||||
.try_claim_stage1_job(
|
||||
thread_id,
|
||||
ThreadId::new(),
|
||||
updated_at.timestamp(),
|
||||
3_600,
|
||||
64,
|
||||
)
|
||||
.await?;
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage-1 claim outcome: {other:?}"),
|
||||
};
|
||||
|
||||
assert!(
|
||||
db.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
&ownership_token,
|
||||
updated_at.timestamp(),
|
||||
raw_memory,
|
||||
rollout_summary,
|
||||
Some(rollout_slug),
|
||||
)
|
||||
.await?,
|
||||
"stage-1 success should enqueue global consolidation"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -122,10 +200,9 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
connection[0].body_json()["type"].as_str(),
|
||||
Some("session.update")
|
||||
);
|
||||
assert_eq!(
|
||||
connection[0].body_json()["session"]["instructions"].as_str(),
|
||||
Some("backend prompt")
|
||||
);
|
||||
let initial_instructions = websocket_request_instructions(&connection[0])
|
||||
.expect("initial session update instructions");
|
||||
assert!(initial_instructions.starts_with("backend prompt"));
|
||||
assert_eq!(
|
||||
server.handshakes()[1]
|
||||
.header("x-session-id")
|
||||
@@ -452,19 +529,17 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
|
||||
let connections = server.connections();
|
||||
assert_eq!(connections.len(), 3);
|
||||
assert_eq!(connections[1].len(), 1);
|
||||
assert_eq!(
|
||||
connections[1][0].body_json()["session"]["instructions"].as_str(),
|
||||
Some("old")
|
||||
);
|
||||
let old_instructions =
|
||||
websocket_request_instructions(&connections[1][0]).expect("old session instructions");
|
||||
assert!(old_instructions.starts_with("old"));
|
||||
assert_eq!(
|
||||
server.handshakes()[1].header("x-session-id").as_deref(),
|
||||
Some("conv_old")
|
||||
);
|
||||
assert_eq!(connections[2].len(), 2);
|
||||
assert_eq!(
|
||||
connections[2][0].body_json()["session"]["instructions"].as_str(),
|
||||
Some("new")
|
||||
);
|
||||
let new_instructions =
|
||||
websocket_request_instructions(&connections[2][0]).expect("new session instructions");
|
||||
assert!(new_instructions.starts_with("new"));
|
||||
assert_eq!(
|
||||
server.handshakes()[2].header("x-session-id").as_deref(),
|
||||
Some("conv_new")
|
||||
@@ -570,9 +645,175 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
|
||||
|
||||
let connections = server.connections();
|
||||
assert_eq!(connections.len(), 2);
|
||||
let overridden_instructions = websocket_request_instructions(&connections[1][0])
|
||||
.expect("overridden session instructions");
|
||||
assert!(overridden_instructions.starts_with("prompt from config"));
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_injects_startup_context_from_global_memories() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_context", "instructions": "backend prompt" }
|
||||
})]],
|
||||
])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
seed_stage1_output(
|
||||
&test,
|
||||
"User profile: principal engineer focused on Codex reliability and realtime UX.",
|
||||
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
||||
"latest",
|
||||
)
|
||||
.await?;
|
||||
fs::create_dir_all(test.workspace_path("docs"))?;
|
||||
fs::write(test.workspace_path("README.md"), "workspace marker")?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(startup_context.contains("User profile: principal engineer"));
|
||||
assert!(startup_context.contains("Recent work: cleaned up startup flows"));
|
||||
assert!(startup_context.contains("git_branch: branch-latest"));
|
||||
assert!(startup_context.contains("## Machine / Workspace Map"));
|
||||
assert!(startup_context.contains("README.md"));
|
||||
assert!(!startup_context.contains(MEMORY_PROMPT_PHRASE));
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_workspace", "instructions": "backend prompt" }
|
||||
})]],
|
||||
])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
fs::create_dir_all(test.workspace_path("codex-rs/core"))?;
|
||||
fs::write(test.workspace_path("notes.txt"), "workspace marker")?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_workspace" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(startup_context.contains("## Machine / Workspace Map"));
|
||||
assert!(startup_context.contains("notes.txt"));
|
||||
assert!(startup_context.contains("codex-rs/"));
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_startup_context_is_truncated_and_sent_once_per_start() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![
|
||||
vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_truncated", "instructions": "backend prompt" }
|
||||
})],
|
||||
vec![],
|
||||
],
|
||||
])
|
||||
.await;
|
||||
|
||||
let oversized_summary = "recent work ".repeat(3_500);
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
seed_stage1_output(&test, &oversized_summary, "summary", "oversized").await?;
|
||||
fs::write(test.workspace_path("marker.txt"), "marker")?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_truncated" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(startup_context.len() <= 20_500);
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationText(ConversationTextParams {
|
||||
text: "hello".to_string(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let explicit_text_request = server.wait_for_request(1, 1).await;
|
||||
assert_eq!(
|
||||
connections[1][0].body_json()["session"]["instructions"].as_str(),
|
||||
Some("prompt from config")
|
||||
websocket_request_text(&explicit_text_request),
|
||||
Some("hello".to_string())
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
|
||||
Reference in New Issue
Block a user