mirror of
https://github.com/openai/codex.git
synced 2026-04-26 17:31:02 +03:00
Compare commits
3 Commits
codex-debu
...
codex/pash
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c300ec25b8 | ||
|
|
29c79ac63b | ||
|
|
d049004391 |
@@ -101,6 +101,7 @@ use crate::tools::spec::create_tools_json_for_responses_api;
|
||||
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
|
||||
pub const OPENAI_BETA_RESPONSES_WEBSOCKETS: &str = "responses_websockets=2026-02-04";
|
||||
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
||||
pub const X_CODEX_TURN_ID_HEADER: &str = "x-codex-turn-id";
|
||||
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
|
||||
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
|
||||
"x-responsesapi-include-timing-metrics";
|
||||
@@ -393,9 +394,11 @@ impl ModelClient {
|
||||
api_provider: codex_api::Provider,
|
||||
api_auth: CoreAuthProvider,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
turn_id_header: Option<&str>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> std::result::Result<ApiWebSocketConnection, ApiError> {
|
||||
let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header);
|
||||
let headers =
|
||||
self.build_websocket_headers(turn_state.as_ref(), turn_id_header, turn_metadata_header);
|
||||
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(otel_manager);
|
||||
ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
||||
.connect(
|
||||
@@ -414,12 +417,15 @@ impl ModelClient {
|
||||
fn build_websocket_headers(
|
||||
&self,
|
||||
turn_state: Option<&Arc<OnceLock<String>>>,
|
||||
turn_id_header: Option<&str>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> ApiHeaderMap {
|
||||
let turn_id_header = parse_turn_id_header(turn_id_header);
|
||||
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
|
||||
let mut headers = build_responses_headers(
|
||||
self.state.beta_features_header.as_deref(),
|
||||
turn_state,
|
||||
turn_id_header.as_ref(),
|
||||
turn_metadata_header.as_ref(),
|
||||
);
|
||||
headers.extend(build_conversation_headers(Some(
|
||||
@@ -523,9 +529,11 @@ impl ModelClientSession {
|
||||
/// regardless of transport choice.
|
||||
fn build_responses_options(
|
||||
&self,
|
||||
turn_id_header: Option<&str>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
compression: Compression,
|
||||
) -> ApiResponsesOptions {
|
||||
let turn_id_header = parse_turn_id_header(turn_id_header);
|
||||
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
|
||||
let conversation_id = self.client.state.conversation_id.to_string();
|
||||
|
||||
@@ -535,6 +543,7 @@ impl ModelClientSession {
|
||||
extra_headers: build_responses_headers(
|
||||
self.client.state.beta_features_header.as_deref(),
|
||||
Some(&self.turn_state),
|
||||
turn_id_header.as_ref(),
|
||||
turn_metadata_header.as_ref(),
|
||||
),
|
||||
compression,
|
||||
@@ -630,6 +639,7 @@ impl ModelClientSession {
|
||||
&mut self,
|
||||
otel_manager: &OtelManager,
|
||||
model_info: &ModelInfo,
|
||||
turn_id_header: Option<&str>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> std::result::Result<(), ApiError> {
|
||||
if !self.client.responses_websocket_enabled(model_info) || self.client.websockets_disabled()
|
||||
@@ -653,6 +663,7 @@ impl ModelClientSession {
|
||||
client_setup.api_provider,
|
||||
client_setup.api_auth,
|
||||
Some(Arc::clone(&self.turn_state)),
|
||||
turn_id_header,
|
||||
turn_metadata_header,
|
||||
)
|
||||
.await?;
|
||||
@@ -666,6 +677,7 @@ impl ModelClientSession {
|
||||
otel_manager: &OtelManager,
|
||||
api_provider: codex_api::Provider,
|
||||
api_auth: CoreAuthProvider,
|
||||
turn_id_header: Option<&str>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
options: &ApiResponsesOptions,
|
||||
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
|
||||
@@ -688,6 +700,7 @@ impl ModelClientSession {
|
||||
api_provider,
|
||||
api_auth,
|
||||
Some(turn_state),
|
||||
turn_id_header,
|
||||
turn_metadata_header,
|
||||
)
|
||||
.await?;
|
||||
@@ -722,6 +735,7 @@ impl ModelClientSession {
|
||||
otel_manager: &OtelManager,
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
turn_id_header: Option<&str>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> Result<ResponseStream> {
|
||||
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
|
||||
@@ -744,7 +758,8 @@ impl ModelClientSession {
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(otel_manager);
|
||||
let compression = self.responses_request_compression(client_setup.auth.as_ref());
|
||||
let options = self.build_responses_options(turn_metadata_header, compression);
|
||||
let options =
|
||||
self.build_responses_options(turn_id_header, turn_metadata_header, compression);
|
||||
|
||||
let request = self.build_responses_request(
|
||||
&client_setup.api_provider,
|
||||
@@ -786,6 +801,7 @@ impl ModelClientSession {
|
||||
otel_manager: &OtelManager,
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
turn_id_header: Option<&str>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> Result<WebsocketStreamOutcome> {
|
||||
let auth_manager = self.client.state.auth_manager.clone();
|
||||
@@ -797,7 +813,8 @@ impl ModelClientSession {
|
||||
let client_setup = self.client.current_client_setup().await?;
|
||||
let compression = self.responses_request_compression(client_setup.auth.as_ref());
|
||||
|
||||
let options = self.build_responses_options(turn_metadata_header, compression);
|
||||
let options =
|
||||
self.build_responses_options(turn_id_header, turn_metadata_header, compression);
|
||||
let request = self.build_responses_request(
|
||||
&client_setup.api_provider,
|
||||
prompt,
|
||||
@@ -812,6 +829,7 @@ impl ModelClientSession {
|
||||
otel_manager,
|
||||
client_setup.api_provider,
|
||||
client_setup.api_auth,
|
||||
turn_id_header,
|
||||
turn_metadata_header,
|
||||
&options,
|
||||
)
|
||||
@@ -885,6 +903,7 @@ impl ModelClientSession {
|
||||
otel_manager: &OtelManager,
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
turn_id_header: Option<&str>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> Result<ResponseStream> {
|
||||
let wire_api = self.client.state.provider.wire_api;
|
||||
@@ -901,6 +920,7 @@ impl ModelClientSession {
|
||||
otel_manager,
|
||||
effort,
|
||||
summary,
|
||||
turn_id_header,
|
||||
turn_metadata_header,
|
||||
)
|
||||
.await?
|
||||
@@ -918,6 +938,7 @@ impl ModelClientSession {
|
||||
otel_manager,
|
||||
effort,
|
||||
summary,
|
||||
turn_id_header,
|
||||
turn_metadata_header,
|
||||
)
|
||||
.await
|
||||
@@ -954,10 +975,10 @@ impl ModelClientSession {
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses per-turn metadata into an HTTP header value.
|
||||
///
|
||||
/// Invalid values are treated as absent so callers can compare and propagate
|
||||
/// metadata with the same sanitization path used when constructing headers.
|
||||
fn parse_turn_id_header(turn_id_header: Option<&str>) -> Option<HeaderValue> {
|
||||
turn_id_header.and_then(|value| HeaderValue::from_str(value).ok())
|
||||
}
|
||||
|
||||
fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<HeaderValue> {
|
||||
turn_metadata_header.and_then(|value| HeaderValue::from_str(value).ok())
|
||||
}
|
||||
@@ -968,10 +989,12 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<Head
|
||||
///
|
||||
/// - `x-codex-beta-features`: comma-separated beta feature keys enabled for the session.
|
||||
/// - `x-codex-turn-state`: sticky routing token captured earlier in the turn.
|
||||
/// - `x-codex-turn-id`: current turn id (`sub_id`) when available for this request.
|
||||
/// - `x-codex-turn-metadata`: optional per-turn metadata for observability.
|
||||
fn build_responses_headers(
|
||||
beta_features_header: Option<&str>,
|
||||
turn_state: Option<&Arc<OnceLock<String>>>,
|
||||
turn_id_header: Option<&HeaderValue>,
|
||||
turn_metadata_header: Option<&HeaderValue>,
|
||||
) -> ApiHeaderMap {
|
||||
let mut headers = ApiHeaderMap::new();
|
||||
@@ -987,6 +1010,9 @@ fn build_responses_headers(
|
||||
{
|
||||
headers.insert(X_CODEX_TURN_STATE_HEADER, header_value);
|
||||
}
|
||||
if let Some(turn_id_header) = turn_id_header {
|
||||
headers.insert(X_CODEX_TURN_ID_HEADER, turn_id_header.clone());
|
||||
}
|
||||
if let Some(header_value) = turn_metadata_header {
|
||||
headers.insert(X_CODEX_TURN_METADATA_HEADER, header_value.clone());
|
||||
}
|
||||
|
||||
@@ -37,6 +37,8 @@ use crate::stream_events_utils::handle_output_item_done;
|
||||
use crate::stream_events_utils::last_assistant_message_from_item;
|
||||
use crate::terminal;
|
||||
use crate::truncate::TruncationPolicy;
|
||||
use crate::turn_metadata::TurnMetadataHeaderJob;
|
||||
use crate::turn_metadata::TurnMetadataPoll;
|
||||
use crate::turn_metadata::build_turn_metadata_header;
|
||||
use crate::turn_metadata::resolve_turn_metadata_header_with_timeout;
|
||||
use crate::util::error_or_panic;
|
||||
@@ -90,7 +92,6 @@ use rmcp::model::RequestId;
|
||||
use serde_json;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::watch;
|
||||
@@ -102,7 +103,6 @@ use tracing::field;
|
||||
use tracing::info;
|
||||
use tracing::info_span;
|
||||
use tracing::instrument;
|
||||
use tracing::trace;
|
||||
use tracing::trace_span;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
@@ -559,7 +559,7 @@ pub(crate) struct TurnContext {
|
||||
pub(crate) truncation_policy: TruncationPolicy,
|
||||
pub(crate) js_repl: Arc<JsReplHandle>,
|
||||
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
|
||||
turn_metadata_header: OnceCell<Option<String>>,
|
||||
turn_metadata_job: Arc<TurnMetadataHeaderJob>,
|
||||
}
|
||||
impl TurnContext {
|
||||
pub(crate) fn model_context_window(&self) -> Option<i64> {
|
||||
@@ -639,7 +639,7 @@ impl TurnContext {
|
||||
truncation_policy,
|
||||
js_repl: Arc::clone(&self.js_repl),
|
||||
dynamic_tools: self.dynamic_tools.clone(),
|
||||
turn_metadata_header: self.turn_metadata_header.clone(),
|
||||
turn_metadata_job: self.turn_metadata_job.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -655,39 +655,23 @@ impl TurnContext {
|
||||
.unwrap_or(compact::SUMMARIZATION_PROMPT)
|
||||
}
|
||||
|
||||
async fn build_turn_metadata_header(&self) -> Option<String> {
|
||||
let sandbox = sandbox_tag(&self.sandbox_policy, self.windows_sandbox_level);
|
||||
self.turn_metadata_header
|
||||
.get_or_init(|| async {
|
||||
build_turn_metadata_header(self.cwd.as_path(), Some(sandbox)).await
|
||||
})
|
||||
.await
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Resolves the per-turn metadata header under a shared timeout policy.
|
||||
///
|
||||
/// This uses the same timeout helper as websocket startup prewarm so both turn execution and
|
||||
/// background prewarm observe identical "timeout means best-effort fallback" behavior.
|
||||
pub async fn resolve_turn_metadata_header(&self) -> Option<String> {
|
||||
resolve_turn_metadata_header_with_timeout(
|
||||
self.build_turn_metadata_header(),
|
||||
self.turn_metadata_header.get().cloned().flatten(),
|
||||
)
|
||||
.await
|
||||
/// Returns turn metadata if ready, without waiting on metadata computation.
|
||||
pub(crate) fn poll_turn_metadata_header(&self) -> TurnMetadataPoll {
|
||||
self.turn_metadata_job.poll()
|
||||
}
|
||||
|
||||
/// Starts best-effort background computation of turn metadata.
|
||||
///
|
||||
/// This warms the cached value used by [`TurnContext::resolve_turn_metadata_header`] so turns
|
||||
/// and websocket prewarm are less likely to pay metadata construction latency on demand.
|
||||
/// Requests do not await this task. Callers should poll via
|
||||
/// [`TurnContext::poll_turn_metadata_header`] and attach metadata only when ready.
|
||||
pub fn spawn_turn_metadata_header_task(self: &Arc<Self>) {
|
||||
let context = Arc::clone(self);
|
||||
tokio::spawn(async move {
|
||||
trace!("Spawning turn metadata calculation task");
|
||||
context.build_turn_metadata_header().await;
|
||||
trace!("Turn metadata calculation task completed");
|
||||
});
|
||||
let sandbox = sandbox_tag(&self.sandbox_policy, self.windows_sandbox_level).to_string();
|
||||
self.turn_metadata_job
|
||||
.spawn(self.cwd.clone(), Some(sandbox));
|
||||
}
|
||||
|
||||
pub(crate) fn cancel_turn_metadata_header_task(&self) {
|
||||
self.turn_metadata_job.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -944,7 +928,7 @@ impl Session {
|
||||
truncation_policy: model_info.truncation_policy.into(),
|
||||
js_repl,
|
||||
dynamic_tools: session_configuration.dynamic_tools.clone(),
|
||||
turn_metadata_header: OnceCell::new(),
|
||||
turn_metadata_job: Arc::new(TurnMetadataHeaderJob::default()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4028,7 +4012,7 @@ async fn spawn_review_thread(
|
||||
js_repl: Arc::clone(&sess.js_repl),
|
||||
dynamic_tools: parent_turn_context.dynamic_tools.clone(),
|
||||
truncation_policy: model_info.truncation_policy.into(),
|
||||
turn_metadata_header: parent_turn_context.turn_metadata_header.clone(),
|
||||
turn_metadata_job: Arc::new(TurnMetadataHeaderJob::default()),
|
||||
};
|
||||
|
||||
// Seed the child task with the review prompt as the initial user message.
|
||||
@@ -4038,6 +4022,7 @@ async fn spawn_review_thread(
|
||||
text_elements: Vec::new(),
|
||||
}];
|
||||
let tc = Arc::new(review_turn_context);
|
||||
tc.spawn_turn_metadata_header_task();
|
||||
sess.spawn_task(tc.clone(), input, ReviewTask::new()).await;
|
||||
|
||||
// Announce entering review mode so UIs can switch modes.
|
||||
@@ -4116,7 +4101,14 @@ fn errors_to_info(errors: &[SkillError]) -> Vec<SkillErrorInfo> {
|
||||
/// back to the model in the next sampling request.
|
||||
/// - If the model sends only an assistant message, we record it in the
|
||||
/// conversation history and consider the turn complete.
|
||||
///
|
||||
struct TurnMetadataJobCancellation<'a>(&'a TurnContext);
|
||||
|
||||
impl Drop for TurnMetadataJobCancellation<'_> {
|
||||
fn drop(&mut self) {
|
||||
self.0.cancel_turn_metadata_header_task();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run_turn(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
@@ -4124,6 +4116,8 @@ pub(crate) async fn run_turn(
|
||||
prewarmed_client_session: Option<ModelClientSession>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let _turn_metadata_job_cancellation = TurnMetadataJobCancellation(turn_context.as_ref());
|
||||
|
||||
if input.is_empty() {
|
||||
return None;
|
||||
}
|
||||
@@ -4248,7 +4242,6 @@ pub(crate) async fn run_turn(
|
||||
// many turns, from the perspective of the user, it is a single turn.
|
||||
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
|
||||
|
||||
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
|
||||
// `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse
|
||||
// one instance across retries within this turn.
|
||||
let mut client_session =
|
||||
@@ -4300,6 +4293,10 @@ pub(crate) async fn run_turn(
|
||||
})
|
||||
.map(|user_message| user_message.message())
|
||||
.collect::<Vec<String>>();
|
||||
let turn_metadata_header = match turn_context.poll_turn_metadata_header() {
|
||||
TurnMetadataPoll::Ready(header) => header,
|
||||
TurnMetadataPoll::Pending => None,
|
||||
};
|
||||
match run_sampling_request(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
@@ -5269,6 +5266,7 @@ async fn try_run_sampling_request(
|
||||
&turn_context.otel_manager,
|
||||
turn_context.reasoning_effort,
|
||||
turn_context.reasoning_summary,
|
||||
Some(turn_context.sub_id.as_str()),
|
||||
turn_metadata_header,
|
||||
)
|
||||
.instrument(trace_span!("stream_request"))
|
||||
|
||||
@@ -17,6 +17,7 @@ use crate::protocol::WarningEvent;
|
||||
use crate::truncate::TruncationPolicy;
|
||||
use crate::truncate::approx_token_count;
|
||||
use crate::truncate::truncate_text;
|
||||
use crate::turn_metadata::TurnMetadataPoll;
|
||||
use crate::util::backoff;
|
||||
use codex_protocol::items::ContextCompactionItem;
|
||||
use codex_protocol::items::TurnItem;
|
||||
@@ -85,7 +86,6 @@ async fn run_compact_task_inner(
|
||||
|
||||
let max_retries = turn_context.provider.stream_max_retries();
|
||||
let mut retries = 0;
|
||||
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
|
||||
let mut client_session = sess.services.model_client.new_session();
|
||||
// Reuse one client session so turn-scoped state (sticky routing, websocket append tracking)
|
||||
// survives retries within this compact turn.
|
||||
@@ -124,6 +124,10 @@ async fn run_compact_task_inner(
|
||||
personality: turn_context.personality,
|
||||
..Default::default()
|
||||
};
|
||||
let turn_metadata_header = match turn_context.poll_turn_metadata_header() {
|
||||
TurnMetadataPoll::Ready(header) => header,
|
||||
TurnMetadataPoll::Pending => None,
|
||||
};
|
||||
let attempt_result = drain_to_completed(
|
||||
&sess,
|
||||
turn_context.as_ref(),
|
||||
@@ -388,6 +392,7 @@ async fn drain_to_completed(
|
||||
&turn_context.otel_manager,
|
||||
turn_context.reasoning_effort,
|
||||
turn_context.reasoning_summary,
|
||||
Some(turn_context.sub_id.as_str()),
|
||||
turn_metadata_header,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -150,6 +150,18 @@ pub async fn get_head_commit_hash(cwd: &Path) -> Option<String> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return whether the repository has tracked or untracked modifications.
|
||||
///
|
||||
/// Uses `git status --porcelain` where empty output means the working tree is clean.
|
||||
pub async fn get_has_changes(cwd: &Path) -> Option<bool> {
|
||||
let output = run_git_command_with_timeout(&["status", "--porcelain"], cwd).await?;
|
||||
if !output.status.success() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(!output.stdout.is_empty())
|
||||
}
|
||||
|
||||
fn parse_git_remote_urls(stdout: &str) -> Option<BTreeMap<String, String>> {
|
||||
let mut remotes = BTreeMap::new();
|
||||
for line in stdout.lines() {
|
||||
@@ -254,7 +266,11 @@ pub async fn git_diff_to_remote(cwd: &Path) -> Option<GitDiffToRemote> {
|
||||
/// Run a git command with a timeout to prevent blocking on large repositories
|
||||
async fn run_git_command_with_timeout(args: &[&str], cwd: &Path) -> Option<std::process::Output> {
|
||||
let mut command = Command::new("git");
|
||||
command.args(args).current_dir(cwd).kill_on_drop(true);
|
||||
command
|
||||
.arg("--no-optional-locks")
|
||||
.args(args)
|
||||
.current_dir(cwd)
|
||||
.kill_on_drop(true);
|
||||
let result = timeout(GIT_COMMAND_TIMEOUT, command.output()).await;
|
||||
|
||||
match result {
|
||||
@@ -962,6 +978,43 @@ mod tests {
|
||||
assert_eq!(git_info.branch, Some("feature-branch".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_has_changes_clean_repo_returns_false() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp dir");
|
||||
let repo_path = create_test_git_repo(&temp_dir).await;
|
||||
|
||||
let has_changes = get_has_changes(&repo_path)
|
||||
.await
|
||||
.expect("git status should succeed");
|
||||
assert!(!has_changes);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_has_changes_returns_true_for_tracked_modifications() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp dir");
|
||||
let repo_path = create_test_git_repo(&temp_dir).await;
|
||||
|
||||
fs::write(repo_path.join("test.txt"), "modified tracked file").expect("write tracked file");
|
||||
|
||||
let has_changes = get_has_changes(&repo_path)
|
||||
.await
|
||||
.expect("git status should succeed");
|
||||
assert!(has_changes);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_has_changes_returns_true_for_untracked_files() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp dir");
|
||||
let repo_path = create_test_git_repo(&temp_dir).await;
|
||||
|
||||
fs::write(repo_path.join("new-file.txt"), "new untracked file").expect("write new file");
|
||||
|
||||
let has_changes = get_has_changes(&repo_path)
|
||||
.await
|
||||
.expect("git status should succeed");
|
||||
assert!(has_changes);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_git_working_tree_state_clean_repo() {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp dir");
|
||||
|
||||
@@ -141,6 +141,7 @@ pub use codex_shell_command::parse_command;
|
||||
pub use codex_shell_command::powershell;
|
||||
|
||||
pub use apply_patch::CODEX_APPLY_PATCH_ARG1;
|
||||
pub use client::X_CODEX_TURN_ID_HEADER;
|
||||
pub use client::X_CODEX_TURN_METADATA_HEADER;
|
||||
pub use exec_policy::ExecPolicyError;
|
||||
pub use exec_policy::check_execpolicy_for_warnings;
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::memories::phase_one;
|
||||
use crate::memories::prompts::build_stage_one_input_message;
|
||||
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
|
||||
use crate::rollout::policy::should_persist_response_item_for_memories;
|
||||
use crate::turn_metadata::TurnMetadataPoll;
|
||||
use codex_api::ResponseEvent;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
@@ -29,6 +30,7 @@ use tracing::warn;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(in crate::memories) struct Phase1RequestContext {
|
||||
pub(in crate::memories) turn_id: String,
|
||||
pub(in crate::memories) model_info: ModelInfo,
|
||||
pub(in crate::memories) otel_manager: OtelManager,
|
||||
pub(in crate::memories) reasoning_effort: Option<ReasoningEffortConfig>,
|
||||
@@ -124,6 +126,7 @@ impl Phase1RequestContext {
|
||||
turn_metadata_header: Option<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
model_info: turn_context.model_info.clone(),
|
||||
otel_manager: turn_context.otel_manager.clone(),
|
||||
reasoning_effort: turn_context.reasoning_effort,
|
||||
@@ -174,10 +177,11 @@ async fn claim_startup_jobs(session: &Arc<Session>) -> Option<Vec<codex_state::S
|
||||
|
||||
async fn build_request_context(session: &Arc<Session>) -> Phase1RequestContext {
|
||||
let turn_context = session.new_default_turn().await;
|
||||
Phase1RequestContext::from_turn_context(
|
||||
turn_context.as_ref(),
|
||||
turn_context.resolve_turn_metadata_header().await,
|
||||
)
|
||||
let turn_metadata_header = match turn_context.poll_turn_metadata_header() {
|
||||
TurnMetadataPoll::Ready(header) => header,
|
||||
TurnMetadataPoll::Pending => None,
|
||||
};
|
||||
Phase1RequestContext::from_turn_context(turn_context.as_ref(), turn_metadata_header)
|
||||
}
|
||||
|
||||
async fn run_jobs(
|
||||
@@ -283,6 +287,7 @@ mod job {
|
||||
&stage_one_context.otel_manager,
|
||||
stage_one_context.reasoning_effort,
|
||||
stage_one_context.reasoning_summary,
|
||||
Some(stage_one_context.turn_id.as_str()),
|
||||
stage_one_context.turn_metadata_header.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -45,7 +45,12 @@ impl RegularTask {
|
||||
let mut client_session = model_client.new_session();
|
||||
let turn_metadata_header = turn_metadata_header.await;
|
||||
match client_session
|
||||
.prewarm_websocket(&otel_manager, &model_info, turn_metadata_header.as_deref())
|
||||
.prewarm_websocket(
|
||||
&otel_manager,
|
||||
&model_info,
|
||||
None,
|
||||
turn_metadata_header.as_deref(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => Some(client_session),
|
||||
|
||||
@@ -1,19 +1,25 @@
|
||||
//! Helpers for computing and resolving optional per-turn metadata headers.
|
||||
//! Helpers for computing optional per-turn metadata headers.
|
||||
//!
|
||||
//! This module owns both metadata construction and the shared timeout policy used by
|
||||
//! turn execution and startup websocket prewarm. Keeping timeout behavior centralized
|
||||
//! ensures both call sites treat timeout as the same best-effort fallback condition.
|
||||
//! startup websocket prewarm. Turn-time request attachment is handled via a non-blocking
|
||||
//! background job (`TurnMetadataHeaderJob`) so request send paths never await metadata.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::future::Future;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::Serialize;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::git_info::get_git_remote_urls_assume_git_repo;
|
||||
use crate::git_info::get_git_repo_root;
|
||||
use crate::git_info::get_has_changes;
|
||||
use crate::git_info::get_head_commit_hash;
|
||||
|
||||
pub(crate) const TURN_METADATA_HEADER_TIMEOUT: Duration = Duration::from_millis(250);
|
||||
@@ -22,8 +28,8 @@ pub(crate) const TURN_METADATA_HEADER_TIMEOUT: Duration = Duration::from_millis(
|
||||
///
|
||||
/// On timeout, this logs a warning and returns the provided fallback header.
|
||||
///
|
||||
/// Keeping this helper centralized avoids drift between turn-time metadata resolution and startup
|
||||
/// websocket prewarm, both of which need identical timeout semantics.
|
||||
/// Keeping this helper centralized avoids drift between startup websocket prewarm and any other
|
||||
/// timeout-bounded one-shot metadata call sites.
|
||||
pub(crate) async fn resolve_turn_metadata_header_with_timeout<F>(
|
||||
build_header: F,
|
||||
fallback_on_timeout: Option<String>,
|
||||
@@ -48,6 +54,8 @@ struct TurnMetadataWorkspace {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
associated_remote_urls: Option<BTreeMap<String, String>>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
has_changes: Option<bool>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
latest_git_commit_hash: Option<String>,
|
||||
}
|
||||
|
||||
@@ -62,11 +70,16 @@ struct TurnMetadata {
|
||||
pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Option<String> {
|
||||
let repo_root = get_git_repo_root(cwd);
|
||||
|
||||
let (latest_git_commit_hash, associated_remote_urls) = tokio::join!(
|
||||
let (latest_git_commit_hash, associated_remote_urls, has_changes) = tokio::join!(
|
||||
get_head_commit_hash(cwd),
|
||||
get_git_remote_urls_assume_git_repo(cwd)
|
||||
get_git_remote_urls_assume_git_repo(cwd),
|
||||
get_has_changes(cwd)
|
||||
);
|
||||
if latest_git_commit_hash.is_none() && associated_remote_urls.is_none() && sandbox.is_none() {
|
||||
if latest_git_commit_hash.is_none()
|
||||
&& associated_remote_urls.is_none()
|
||||
&& has_changes.is_none()
|
||||
&& sandbox.is_none()
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -76,6 +89,7 @@ pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Op
|
||||
repo_root.to_string_lossy().into_owned(),
|
||||
TurnMetadataWorkspace {
|
||||
associated_remote_urls,
|
||||
has_changes,
|
||||
latest_git_commit_hash,
|
||||
},
|
||||
);
|
||||
@@ -86,3 +100,93 @@ pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Op
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum TurnMetadataPoll {
|
||||
Pending,
|
||||
Ready(Option<String>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
enum TurnMetadataHeaderJobState {
|
||||
#[default]
|
||||
NotStarted,
|
||||
Pending {
|
||||
receiver: oneshot::Receiver<Option<String>>,
|
||||
task: JoinHandle<()>,
|
||||
},
|
||||
Ready(Option<String>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct TurnMetadataHeaderJob {
|
||||
state: Mutex<TurnMetadataHeaderJobState>,
|
||||
}
|
||||
|
||||
impl TurnMetadataHeaderJob {
|
||||
pub(crate) fn spawn(&self, cwd: PathBuf, sandbox: Option<String>) {
|
||||
let mut state = self
|
||||
.state
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
if !matches!(*state, TurnMetadataHeaderJobState::NotStarted) {
|
||||
return;
|
||||
}
|
||||
|
||||
let Ok(handle) = tokio::runtime::Handle::try_current() else {
|
||||
*state = TurnMetadataHeaderJobState::Ready(None);
|
||||
return;
|
||||
};
|
||||
|
||||
let (tx, rx) = oneshot::channel::<Option<String>>();
|
||||
let task = handle.spawn(async move {
|
||||
let header = build_turn_metadata_header(cwd.as_path(), sandbox.as_deref()).await;
|
||||
let _ = tx.send(header);
|
||||
});
|
||||
*state = TurnMetadataHeaderJobState::Pending { receiver: rx, task };
|
||||
}
|
||||
|
||||
pub(crate) fn poll(&self) -> TurnMetadataPoll {
|
||||
let mut state = self
|
||||
.state
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
match &mut *state {
|
||||
TurnMetadataHeaderJobState::NotStarted => TurnMetadataPoll::Pending,
|
||||
TurnMetadataHeaderJobState::Ready(header) => TurnMetadataPoll::Ready(header.clone()),
|
||||
TurnMetadataHeaderJobState::Pending { receiver, .. } => match receiver.try_recv() {
|
||||
Ok(header) => {
|
||||
*state = TurnMetadataHeaderJobState::Ready(header.clone());
|
||||
TurnMetadataPoll::Ready(header)
|
||||
}
|
||||
Err(TryRecvError::Empty) => TurnMetadataPoll::Pending,
|
||||
Err(TryRecvError::Closed) => {
|
||||
*state = TurnMetadataHeaderJobState::Ready(None);
|
||||
TurnMetadataPoll::Ready(None)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn cancel(&self) {
|
||||
let mut state = self
|
||||
.state
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
if let TurnMetadataHeaderJobState::Pending { task, .. } = &mut *state {
|
||||
task.abort();
|
||||
}
|
||||
*state = TurnMetadataHeaderJobState::Ready(None);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TurnMetadataHeaderJob {
|
||||
fn drop(&mut self) {
|
||||
let Ok(state) = self.state.get_mut() else {
|
||||
return;
|
||||
};
|
||||
if let TurnMetadataHeaderJobState::Pending { task, .. } = state {
|
||||
task.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,7 +111,15 @@ async fn responses_stream_includes_subagent_header_on_review() {
|
||||
}];
|
||||
|
||||
let mut stream = client_session
|
||||
.stream(&prompt, &model_info, &otel_manager, effort, summary, None)
|
||||
.stream(
|
||||
&prompt,
|
||||
&model_info,
|
||||
&otel_manager,
|
||||
effort,
|
||||
summary,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("stream failed");
|
||||
while let Some(event) = stream.next().await {
|
||||
@@ -217,7 +225,15 @@ async fn responses_stream_includes_subagent_header_on_other() {
|
||||
}];
|
||||
|
||||
let mut stream = client_session
|
||||
.stream(&prompt, &model_info, &otel_manager, effort, summary, None)
|
||||
.stream(
|
||||
&prompt,
|
||||
&model_info,
|
||||
&otel_manager,
|
||||
effort,
|
||||
summary,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("stream failed");
|
||||
while let Some(event) = stream.next().await {
|
||||
@@ -322,7 +338,15 @@ async fn responses_respects_model_info_overrides_from_config() {
|
||||
}];
|
||||
|
||||
let mut stream = client_session
|
||||
.stream(&prompt, &model_info, &otel_manager, effort, summary, None)
|
||||
.stream(
|
||||
&prompt,
|
||||
&model_info,
|
||||
&otel_manager,
|
||||
effort,
|
||||
summary,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("stream failed");
|
||||
while let Some(event) = stream.next().await {
|
||||
@@ -357,31 +381,20 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
|
||||
core_test_support::skip_if_no_network!();
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let response_body = responses::sse(vec![
|
||||
let first_response = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_shell_command_call("call-1", "sleep 1"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
let second_response = responses::sse(vec![
|
||||
responses::ev_response_created("resp-2"),
|
||||
responses::ev_assistant_message("msg-1", "done"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]);
|
||||
|
||||
let test = test_codex().build(&server).await.expect("build test codex");
|
||||
let cwd = test.cwd_path();
|
||||
|
||||
let first_request = responses::mount_sse_once(&server, response_body.clone()).await;
|
||||
test.submit_turn("hello")
|
||||
.await
|
||||
.expect("submit first turn prompt");
|
||||
let initial_header = first_request
|
||||
.single_request()
|
||||
.header("x-codex-turn-metadata")
|
||||
.expect("x-codex-turn-metadata header should be present");
|
||||
let initial_parsed: serde_json::Value =
|
||||
serde_json::from_str(&initial_header).expect("x-codex-turn-metadata should be valid JSON");
|
||||
assert_eq!(
|
||||
initial_parsed
|
||||
.get("sandbox")
|
||||
.and_then(serde_json::Value::as_str),
|
||||
Some("none")
|
||||
);
|
||||
|
||||
let git_config_global = cwd.join("empty-git-config");
|
||||
std::fs::write(&git_config_global, "").expect("write empty git config");
|
||||
let run_git = |args: &[&str]| {
|
||||
@@ -423,57 +436,111 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
|
||||
.expect("git remote get-url output should be valid UTF-8")
|
||||
.trim()
|
||||
.to_string();
|
||||
let expected_repo_root = std::fs::canonicalize(cwd)
|
||||
.unwrap_or_else(|_| cwd.to_path_buf())
|
||||
.to_string_lossy()
|
||||
.into_owned();
|
||||
|
||||
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
|
||||
loop {
|
||||
let request_recorder = responses::mount_sse_once(&server, response_body.clone()).await;
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
test.submit_turn("hello")
|
||||
.await
|
||||
.expect("submit post-git turn prompt");
|
||||
let clean_turn_recorder = responses::mount_response_sequence(
|
||||
&server,
|
||||
vec![
|
||||
responses::sse_response(first_response.clone()),
|
||||
responses::sse_response(second_response.clone()),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
test.submit_turn("run a shell command")
|
||||
.await
|
||||
.expect("submit clean turn prompt");
|
||||
|
||||
let maybe_metadata = request_recorder
|
||||
.single_request()
|
||||
.header("x-codex-turn-metadata")
|
||||
.and_then(|header_value| {
|
||||
let parsed: serde_json::Value = serde_json::from_str(&header_value).ok()?;
|
||||
let workspace = parsed
|
||||
.get("workspaces")
|
||||
.and_then(serde_json::Value::as_object)
|
||||
.and_then(|workspaces| workspaces.values().next())
|
||||
.cloned()?;
|
||||
Some((parsed, workspace))
|
||||
});
|
||||
let Some((parsed, workspace)) = maybe_metadata else {
|
||||
if tokio::time::Instant::now() >= deadline {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
|
||||
continue;
|
||||
};
|
||||
let clean_requests = clean_turn_recorder.requests();
|
||||
assert_eq!(clean_requests.len(), 2);
|
||||
let clean_turn_id_initial = clean_requests[0]
|
||||
.header("x-codex-turn-id")
|
||||
.expect("x-codex-turn-id should be present on initial request");
|
||||
let clean_turn_id_follow_up = clean_requests[1]
|
||||
.header("x-codex-turn-id")
|
||||
.expect("x-codex-turn-id should be present on follow-up request");
|
||||
assert_eq!(clean_turn_id_initial, clean_turn_id_follow_up);
|
||||
|
||||
assert_eq!(
|
||||
parsed.get("sandbox").and_then(serde_json::Value::as_str),
|
||||
Some("none")
|
||||
);
|
||||
assert_eq!(
|
||||
workspace
|
||||
.get("latest_git_commit_hash")
|
||||
.and_then(serde_json::Value::as_str),
|
||||
Some(expected_head.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
workspace
|
||||
.get("associated_remote_urls")
|
||||
.and_then(serde_json::Value::as_object)
|
||||
.and_then(|remotes| remotes.get("origin"))
|
||||
.and_then(serde_json::Value::as_str),
|
||||
Some(expected_origin.as_str())
|
||||
);
|
||||
return;
|
||||
}
|
||||
let clean_metadata_header = clean_requests[1]
|
||||
.header("x-codex-turn-metadata")
|
||||
.expect("follow-up request should include x-codex-turn-metadata");
|
||||
let clean_parsed: serde_json::Value = serde_json::from_str(&clean_metadata_header)
|
||||
.expect("x-codex-turn-metadata should be valid JSON");
|
||||
let clean_workspace = clean_parsed
|
||||
.get("workspaces")
|
||||
.and_then(serde_json::Value::as_object)
|
||||
.and_then(|workspaces| {
|
||||
workspaces
|
||||
.get(&expected_repo_root)
|
||||
.or_else(|| workspaces.values().next())
|
||||
})
|
||||
.expect("metadata should include expected repository root");
|
||||
assert_eq!(
|
||||
clean_workspace
|
||||
.get("latest_git_commit_hash")
|
||||
.and_then(serde_json::Value::as_str),
|
||||
Some(expected_head.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
clean_workspace
|
||||
.get("associated_remote_urls")
|
||||
.and_then(serde_json::Value::as_object)
|
||||
.and_then(|remotes| remotes.get("origin"))
|
||||
.and_then(serde_json::Value::as_str),
|
||||
Some(expected_origin.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
clean_workspace
|
||||
.get("has_changes")
|
||||
.and_then(serde_json::Value::as_bool),
|
||||
Some(false)
|
||||
);
|
||||
|
||||
panic!(
|
||||
"x-codex-turn-metadata with git workspace info was never observed within 5s after git setup"
|
||||
std::fs::write(cwd.join("untracked.txt"), "new file").expect("write untracked file");
|
||||
|
||||
let dirty_turn_recorder = responses::mount_response_sequence(
|
||||
&server,
|
||||
vec![
|
||||
responses::sse_response(first_response),
|
||||
responses::sse_response(second_response),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
test.submit_turn("run a shell command")
|
||||
.await
|
||||
.expect("submit dirty turn prompt");
|
||||
|
||||
let dirty_requests = dirty_turn_recorder.requests();
|
||||
assert_eq!(dirty_requests.len(), 2);
|
||||
let dirty_turn_id_initial = dirty_requests[0]
|
||||
.header("x-codex-turn-id")
|
||||
.expect("x-codex-turn-id should be present on initial dirty request");
|
||||
let dirty_turn_id_follow_up = dirty_requests[1]
|
||||
.header("x-codex-turn-id")
|
||||
.expect("x-codex-turn-id should be present on follow-up dirty request");
|
||||
assert_eq!(dirty_turn_id_initial, dirty_turn_id_follow_up);
|
||||
assert_ne!(clean_turn_id_initial, dirty_turn_id_initial);
|
||||
|
||||
let dirty_metadata_header = dirty_requests[1]
|
||||
.header("x-codex-turn-metadata")
|
||||
.expect("dirty follow-up request should include x-codex-turn-metadata");
|
||||
let dirty_parsed: serde_json::Value = serde_json::from_str(&dirty_metadata_header)
|
||||
.expect("x-codex-turn-metadata should be valid JSON");
|
||||
let dirty_workspace = dirty_parsed
|
||||
.get("workspaces")
|
||||
.and_then(serde_json::Value::as_object)
|
||||
.and_then(|workspaces| {
|
||||
workspaces
|
||||
.get(&expected_repo_root)
|
||||
.or_else(|| workspaces.values().next())
|
||||
})
|
||||
.expect("metadata should include expected repository root");
|
||||
assert_eq!(
|
||||
dirty_workspace
|
||||
.get("has_changes")
|
||||
.and_then(serde_json::Value::as_bool),
|
||||
Some(true)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1416,7 +1416,15 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
|
||||
});
|
||||
|
||||
let mut stream = client_session
|
||||
.stream(&prompt, &model_info, &otel_manager, effort, summary, None)
|
||||
.stream(
|
||||
&prompt,
|
||||
&model_info,
|
||||
&otel_manager,
|
||||
effort,
|
||||
summary,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("responses stream to start");
|
||||
|
||||
|
||||
@@ -106,7 +106,7 @@ async fn responses_websocket_preconnect_reuses_connection() {
|
||||
let harness = websocket_harness(&server).await;
|
||||
let mut client_session = harness.client.new_session();
|
||||
client_session
|
||||
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None)
|
||||
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None, None)
|
||||
.await
|
||||
.expect("websocket prewarm failed");
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
@@ -131,7 +131,7 @@ async fn responses_websocket_preconnect_is_reused_even_with_header_changes() {
|
||||
let harness = websocket_harness(&server).await;
|
||||
let mut client_session = harness.client.new_session();
|
||||
client_session
|
||||
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None)
|
||||
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None, None)
|
||||
.await
|
||||
.expect("websocket prewarm failed");
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
@@ -143,6 +143,7 @@ async fn responses_websocket_preconnect_is_reused_even_with_header_changes() {
|
||||
harness.effort,
|
||||
harness.summary,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("websocket stream failed");
|
||||
@@ -172,7 +173,7 @@ async fn responses_websocket_prewarm_uses_model_preference_when_feature_disabled
|
||||
let harness = websocket_harness_with_options(&server, false, false, false, true).await;
|
||||
let mut client_session = harness.client.new_session();
|
||||
client_session
|
||||
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None)
|
||||
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None, None)
|
||||
.await
|
||||
.expect("websocket prewarm failed");
|
||||
|
||||
@@ -318,6 +319,7 @@ async fn responses_websocket_emits_reasoning_included_event() {
|
||||
harness.effort,
|
||||
harness.summary,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("websocket stream failed");
|
||||
@@ -389,6 +391,7 @@ async fn responses_websocket_emits_rate_limit_events() {
|
||||
harness.effort,
|
||||
harness.summary,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("websocket stream failed");
|
||||
@@ -840,6 +843,7 @@ async fn responses_websocket_v2_after_error_uses_full_create_without_previous_re
|
||||
harness.effort,
|
||||
harness.summary,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("websocket stream failed");
|
||||
@@ -1080,6 +1084,7 @@ async fn stream_until_complete(
|
||||
harness.effort,
|
||||
harness.summary,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("websocket stream failed");
|
||||
|
||||
@@ -18,6 +18,7 @@ use core_test_support::test_codex::test_codex;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
const TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
||||
const TURN_ID_HEADER: &str = "x-codex-turn-id";
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_turn_state_persists_within_turn_and_resets_after() -> Result<()> {
|
||||
@@ -65,6 +66,18 @@ async fn responses_turn_state_persists_within_turn_and_resets_after() -> Result<
|
||||
);
|
||||
assert_eq!(requests[2].header(TURN_STATE_HEADER), None);
|
||||
|
||||
let first_turn_request_id = requests[0]
|
||||
.header(TURN_ID_HEADER)
|
||||
.expect("first request should include x-codex-turn-id");
|
||||
let first_turn_follow_up_id = requests[1]
|
||||
.header(TURN_ID_HEADER)
|
||||
.expect("follow-up request should include x-codex-turn-id");
|
||||
let second_turn_id = requests[2]
|
||||
.header(TURN_ID_HEADER)
|
||||
.expect("second turn request should include x-codex-turn-id");
|
||||
assert_eq!(first_turn_request_id, first_turn_follow_up_id);
|
||||
assert_ne!(first_turn_request_id, second_turn_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -120,6 +133,16 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
|
||||
);
|
||||
assert_eq!(handshakes[2].header(TURN_STATE_HEADER), None);
|
||||
|
||||
// First handshake is startup preconnect; no turn id is available yet.
|
||||
assert_eq!(handshakes[0].header(TURN_ID_HEADER), None);
|
||||
let first_turn_follow_up_id = handshakes[1]
|
||||
.header(TURN_ID_HEADER)
|
||||
.expect("turn-time reconnect should include x-codex-turn-id");
|
||||
let second_turn_id = handshakes[2]
|
||||
.header(TURN_ID_HEADER)
|
||||
.expect("second turn handshake should include x-codex-turn-id");
|
||||
assert_ne!(first_turn_follow_up_id, second_turn_id);
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user