Compare commits

...

3 Commits

Author SHA1 Message Date
pash-openai
c300ec25b8 Merge branch 'main' into codex/pash/hybrid-turn-metadata-main 2026-02-12 16:31:13 -08:00
pash
29c79ac63b ci: rerun checks 2026-02-12 15:55:31 -08:00
pash
d049004391 turn metadata non-blocking + has_changes 2026-02-12 15:18:53 -08:00
12 changed files with 431 additions and 131 deletions

View File

@@ -101,6 +101,7 @@ use crate::tools::spec::create_tools_json_for_responses_api;
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
pub const OPENAI_BETA_RESPONSES_WEBSOCKETS: &str = "responses_websockets=2026-02-04";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
pub const X_CODEX_TURN_ID_HEADER: &str = "x-codex-turn-id";
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";
@@ -393,9 +394,11 @@ impl ModelClient {
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_state: Option<Arc<OnceLock<String>>>,
turn_id_header: Option<&str>,
turn_metadata_header: Option<&str>,
) -> std::result::Result<ApiWebSocketConnection, ApiError> {
let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header);
let headers =
self.build_websocket_headers(turn_state.as_ref(), turn_id_header, turn_metadata_header);
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(otel_manager);
ApiWebSocketResponsesClient::new(api_provider, api_auth)
.connect(
@@ -414,12 +417,15 @@ impl ModelClient {
fn build_websocket_headers(
&self,
turn_state: Option<&Arc<OnceLock<String>>>,
turn_id_header: Option<&str>,
turn_metadata_header: Option<&str>,
) -> ApiHeaderMap {
let turn_id_header = parse_turn_id_header(turn_id_header);
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
let mut headers = build_responses_headers(
self.state.beta_features_header.as_deref(),
turn_state,
turn_id_header.as_ref(),
turn_metadata_header.as_ref(),
);
headers.extend(build_conversation_headers(Some(
@@ -523,9 +529,11 @@ impl ModelClientSession {
/// regardless of transport choice.
fn build_responses_options(
&self,
turn_id_header: Option<&str>,
turn_metadata_header: Option<&str>,
compression: Compression,
) -> ApiResponsesOptions {
let turn_id_header = parse_turn_id_header(turn_id_header);
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
let conversation_id = self.client.state.conversation_id.to_string();
@@ -535,6 +543,7 @@ impl ModelClientSession {
extra_headers: build_responses_headers(
self.client.state.beta_features_header.as_deref(),
Some(&self.turn_state),
turn_id_header.as_ref(),
turn_metadata_header.as_ref(),
),
compression,
@@ -630,6 +639,7 @@ impl ModelClientSession {
&mut self,
otel_manager: &OtelManager,
model_info: &ModelInfo,
turn_id_header: Option<&str>,
turn_metadata_header: Option<&str>,
) -> std::result::Result<(), ApiError> {
if !self.client.responses_websocket_enabled(model_info) || self.client.websockets_disabled()
@@ -653,6 +663,7 @@ impl ModelClientSession {
client_setup.api_provider,
client_setup.api_auth,
Some(Arc::clone(&self.turn_state)),
turn_id_header,
turn_metadata_header,
)
.await?;
@@ -666,6 +677,7 @@ impl ModelClientSession {
otel_manager: &OtelManager,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_id_header: Option<&str>,
turn_metadata_header: Option<&str>,
options: &ApiResponsesOptions,
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
@@ -688,6 +700,7 @@ impl ModelClientSession {
api_provider,
api_auth,
Some(turn_state),
turn_id_header,
turn_metadata_header,
)
.await?;
@@ -722,6 +735,7 @@ impl ModelClientSession {
otel_manager: &OtelManager,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
turn_id_header: Option<&str>,
turn_metadata_header: Option<&str>,
) -> Result<ResponseStream> {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
@@ -744,7 +758,8 @@ impl ModelClientSession {
let transport = ReqwestTransport::new(build_reqwest_client());
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(otel_manager);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
let options =
self.build_responses_options(turn_id_header, turn_metadata_header, compression);
let request = self.build_responses_request(
&client_setup.api_provider,
@@ -786,6 +801,7 @@ impl ModelClientSession {
otel_manager: &OtelManager,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
turn_id_header: Option<&str>,
turn_metadata_header: Option<&str>,
) -> Result<WebsocketStreamOutcome> {
let auth_manager = self.client.state.auth_manager.clone();
@@ -797,7 +813,8 @@ impl ModelClientSession {
let client_setup = self.client.current_client_setup().await?;
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
let options =
self.build_responses_options(turn_id_header, turn_metadata_header, compression);
let request = self.build_responses_request(
&client_setup.api_provider,
prompt,
@@ -812,6 +829,7 @@ impl ModelClientSession {
otel_manager,
client_setup.api_provider,
client_setup.api_auth,
turn_id_header,
turn_metadata_header,
&options,
)
@@ -885,6 +903,7 @@ impl ModelClientSession {
otel_manager: &OtelManager,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
turn_id_header: Option<&str>,
turn_metadata_header: Option<&str>,
) -> Result<ResponseStream> {
let wire_api = self.client.state.provider.wire_api;
@@ -901,6 +920,7 @@ impl ModelClientSession {
otel_manager,
effort,
summary,
turn_id_header,
turn_metadata_header,
)
.await?
@@ -918,6 +938,7 @@ impl ModelClientSession {
otel_manager,
effort,
summary,
turn_id_header,
turn_metadata_header,
)
.await
@@ -954,10 +975,10 @@ impl ModelClientSession {
}
}
/// Parses a per-turn id into an HTTP header value.
///
/// Invalid values are treated as absent so a turn id that cannot be
/// represented as an HTTP header degrades to "no header" instead of failing
/// the request. (The previous doc here described the metadata header; it now
/// documents this turn-id parser, matching the function it sits above.)
fn parse_turn_id_header(turn_id_header: Option<&str>) -> Option<HeaderValue> {
    turn_id_header.and_then(|value| HeaderValue::from_str(value).ok())
}
/// Parses per-turn metadata into an HTTP header value.
///
/// Invalid values are treated as absent so callers can compare and propagate
/// metadata with the same sanitization path used when constructing headers.
fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<HeaderValue> {
    HeaderValue::from_str(turn_metadata_header?).ok()
}
@@ -968,10 +989,12 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<Head
///
/// - `x-codex-beta-features`: comma-separated beta feature keys enabled for the session.
/// - `x-codex-turn-state`: sticky routing token captured earlier in the turn.
/// - `x-codex-turn-id`: current turn id (`sub_id`) when available for this request.
/// - `x-codex-turn-metadata`: optional per-turn metadata for observability.
fn build_responses_headers(
beta_features_header: Option<&str>,
turn_state: Option<&Arc<OnceLock<String>>>,
turn_id_header: Option<&HeaderValue>,
turn_metadata_header: Option<&HeaderValue>,
) -> ApiHeaderMap {
let mut headers = ApiHeaderMap::new();
@@ -987,6 +1010,9 @@ fn build_responses_headers(
{
headers.insert(X_CODEX_TURN_STATE_HEADER, header_value);
}
if let Some(turn_id_header) = turn_id_header {
headers.insert(X_CODEX_TURN_ID_HEADER, turn_id_header.clone());
}
if let Some(header_value) = turn_metadata_header {
headers.insert(X_CODEX_TURN_METADATA_HEADER, header_value.clone());
}

View File

@@ -37,6 +37,8 @@ use crate::stream_events_utils::handle_output_item_done;
use crate::stream_events_utils::last_assistant_message_from_item;
use crate::terminal;
use crate::truncate::TruncationPolicy;
use crate::turn_metadata::TurnMetadataHeaderJob;
use crate::turn_metadata::TurnMetadataPoll;
use crate::turn_metadata::build_turn_metadata_header;
use crate::turn_metadata::resolve_turn_metadata_header_with_timeout;
use crate::util::error_or_panic;
@@ -90,7 +92,6 @@ use rmcp::model::RequestId;
use serde_json;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::OnceCell;
use tokio::sync::RwLock;
use tokio::sync::oneshot;
use tokio::sync::watch;
@@ -102,7 +103,6 @@ use tracing::field;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
use tracing::trace;
use tracing::trace_span;
use tracing::warn;
use uuid::Uuid;
@@ -559,7 +559,7 @@ pub(crate) struct TurnContext {
pub(crate) truncation_policy: TruncationPolicy,
pub(crate) js_repl: Arc<JsReplHandle>,
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
turn_metadata_header: OnceCell<Option<String>>,
turn_metadata_job: Arc<TurnMetadataHeaderJob>,
}
impl TurnContext {
pub(crate) fn model_context_window(&self) -> Option<i64> {
@@ -639,7 +639,7 @@ impl TurnContext {
truncation_policy,
js_repl: Arc::clone(&self.js_repl),
dynamic_tools: self.dynamic_tools.clone(),
turn_metadata_header: self.turn_metadata_header.clone(),
turn_metadata_job: self.turn_metadata_job.clone(),
}
}
@@ -655,39 +655,23 @@ impl TurnContext {
.unwrap_or(compact::SUMMARIZATION_PROMPT)
}
async fn build_turn_metadata_header(&self) -> Option<String> {
let sandbox = sandbox_tag(&self.sandbox_policy, self.windows_sandbox_level);
self.turn_metadata_header
.get_or_init(|| async {
build_turn_metadata_header(self.cwd.as_path(), Some(sandbox)).await
})
.await
.clone()
}
/// Resolves the per-turn metadata header under a shared timeout policy.
///
/// This uses the same timeout helper as websocket startup prewarm so both turn execution and
/// background prewarm observe identical "timeout means best-effort fallback" behavior.
pub async fn resolve_turn_metadata_header(&self) -> Option<String> {
resolve_turn_metadata_header_with_timeout(
self.build_turn_metadata_header(),
self.turn_metadata_header.get().cloned().flatten(),
)
.await
/// Returns turn metadata if ready, without waiting on metadata computation.
pub(crate) fn poll_turn_metadata_header(&self) -> TurnMetadataPoll {
self.turn_metadata_job.poll()
}
/// Starts best-effort background computation of turn metadata.
///
/// This warms the cached value used by [`TurnContext::resolve_turn_metadata_header`] so turns
/// and websocket prewarm are less likely to pay metadata construction latency on demand.
/// Requests do not await this task. Callers should poll via
/// [`TurnContext::poll_turn_metadata_header`] and attach metadata only when ready.
pub fn spawn_turn_metadata_header_task(self: &Arc<Self>) {
let context = Arc::clone(self);
tokio::spawn(async move {
trace!("Spawning turn metadata calculation task");
context.build_turn_metadata_header().await;
trace!("Turn metadata calculation task completed");
});
let sandbox = sandbox_tag(&self.sandbox_policy, self.windows_sandbox_level).to_string();
self.turn_metadata_job
.spawn(self.cwd.clone(), Some(sandbox));
}
pub(crate) fn cancel_turn_metadata_header_task(&self) {
self.turn_metadata_job.cancel();
}
}
@@ -944,7 +928,7 @@ impl Session {
truncation_policy: model_info.truncation_policy.into(),
js_repl,
dynamic_tools: session_configuration.dynamic_tools.clone(),
turn_metadata_header: OnceCell::new(),
turn_metadata_job: Arc::new(TurnMetadataHeaderJob::default()),
}
}
@@ -4028,7 +4012,7 @@ async fn spawn_review_thread(
js_repl: Arc::clone(&sess.js_repl),
dynamic_tools: parent_turn_context.dynamic_tools.clone(),
truncation_policy: model_info.truncation_policy.into(),
turn_metadata_header: parent_turn_context.turn_metadata_header.clone(),
turn_metadata_job: Arc::new(TurnMetadataHeaderJob::default()),
};
// Seed the child task with the review prompt as the initial user message.
@@ -4038,6 +4022,7 @@ async fn spawn_review_thread(
text_elements: Vec::new(),
}];
let tc = Arc::new(review_turn_context);
tc.spawn_turn_metadata_header_task();
sess.spawn_task(tc.clone(), input, ReviewTask::new()).await;
// Announce entering review mode so UIs can switch modes.
@@ -4116,7 +4101,14 @@ fn errors_to_info(errors: &[SkillError]) -> Vec<SkillErrorInfo> {
/// back to the model in the next sampling request.
/// - If the model sends only an assistant message, we record it in the
/// conversation history and consider the turn complete.
///
/// RAII guard that cancels the turn's background metadata job when dropped.
///
/// Held across `run_turn` so the job is cancelled on every exit path
/// (early returns, errors, normal completion) without explicit cleanup calls.
struct TurnMetadataJobCancellation<'a>(&'a TurnContext);

impl Drop for TurnMetadataJobCancellation<'_> {
    fn drop(&mut self) {
        self.0.cancel_turn_metadata_header_task();
    }
}
pub(crate) async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
@@ -4124,6 +4116,8 @@ pub(crate) async fn run_turn(
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
let _turn_metadata_job_cancellation = TurnMetadataJobCancellation(turn_context.as_ref());
if input.is_empty() {
return None;
}
@@ -4248,7 +4242,6 @@ pub(crate) async fn run_turn(
// many turns, from the perspective of the user, it is a single turn.
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
// `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse
// one instance across retries within this turn.
let mut client_session =
@@ -4300,6 +4293,10 @@ pub(crate) async fn run_turn(
})
.map(|user_message| user_message.message())
.collect::<Vec<String>>();
let turn_metadata_header = match turn_context.poll_turn_metadata_header() {
TurnMetadataPoll::Ready(header) => header,
TurnMetadataPoll::Pending => None,
};
match run_sampling_request(
Arc::clone(&sess),
Arc::clone(&turn_context),
@@ -5269,6 +5266,7 @@ async fn try_run_sampling_request(
&turn_context.otel_manager,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
Some(turn_context.sub_id.as_str()),
turn_metadata_header,
)
.instrument(trace_span!("stream_request"))

View File

@@ -17,6 +17,7 @@ use crate::protocol::WarningEvent;
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
use crate::turn_metadata::TurnMetadataPoll;
use crate::util::backoff;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
@@ -85,7 +86,6 @@ async fn run_compact_task_inner(
let max_retries = turn_context.provider.stream_max_retries();
let mut retries = 0;
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
let mut client_session = sess.services.model_client.new_session();
// Reuse one client session so turn-scoped state (sticky routing, websocket append tracking)
// survives retries within this compact turn.
@@ -124,6 +124,10 @@ async fn run_compact_task_inner(
personality: turn_context.personality,
..Default::default()
};
let turn_metadata_header = match turn_context.poll_turn_metadata_header() {
TurnMetadataPoll::Ready(header) => header,
TurnMetadataPoll::Pending => None,
};
let attempt_result = drain_to_completed(
&sess,
turn_context.as_ref(),
@@ -388,6 +392,7 @@ async fn drain_to_completed(
&turn_context.otel_manager,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
Some(turn_context.sub_id.as_str()),
turn_metadata_header,
)
.await?;

View File

@@ -150,6 +150,18 @@ pub async fn get_head_commit_hash(cwd: &Path) -> Option<String> {
}
}
/// Reports whether the repository has tracked or untracked modifications.
///
/// Runs `git status --porcelain`; an empty output means the working tree is
/// clean. Returns `None` when the git command fails or times out.
pub async fn get_has_changes(cwd: &Path) -> Option<bool> {
    let output = run_git_command_with_timeout(&["status", "--porcelain"], cwd).await?;
    // A non-success exit means we cannot tell; otherwise any output implies changes.
    output.status.success().then(|| !output.stdout.is_empty())
}
fn parse_git_remote_urls(stdout: &str) -> Option<BTreeMap<String, String>> {
let mut remotes = BTreeMap::new();
for line in stdout.lines() {
@@ -254,7 +266,11 @@ pub async fn git_diff_to_remote(cwd: &Path) -> Option<GitDiffToRemote> {
/// Run a git command with a timeout to prevent blocking on large repositories
async fn run_git_command_with_timeout(args: &[&str], cwd: &Path) -> Option<std::process::Output> {
let mut command = Command::new("git");
command.args(args).current_dir(cwd).kill_on_drop(true);
command
.arg("--no-optional-locks")
.args(args)
.current_dir(cwd)
.kill_on_drop(true);
let result = timeout(GIT_COMMAND_TIMEOUT, command.output()).await;
match result {
@@ -962,6 +978,43 @@ mod tests {
assert_eq!(git_info.branch, Some("feature-branch".to_string()));
}
#[tokio::test]
async fn test_get_has_changes_clean_repo_returns_false() {
    // A freshly created repo with everything committed reports a clean tree.
    let temp_dir = TempDir::new().expect("Failed to create temp dir");
    let repo_path = create_test_git_repo(&temp_dir).await;
    let has_changes = get_has_changes(&repo_path).await;
    assert_eq!(has_changes, Some(false), "git status should succeed");
}
#[tokio::test]
async fn test_get_has_changes_returns_true_for_tracked_modifications() {
    // Overwriting a committed file dirties the tree via a tracked modification.
    let temp_dir = TempDir::new().expect("Failed to create temp dir");
    let repo_path = create_test_git_repo(&temp_dir).await;
    fs::write(repo_path.join("test.txt"), "modified tracked file").expect("write tracked file");
    let has_changes = get_has_changes(&repo_path).await;
    assert_eq!(has_changes, Some(true), "git status should succeed");
}
#[tokio::test]
async fn test_get_has_changes_returns_true_for_untracked_files() {
    // An untracked file also counts as a change in `git status --porcelain`.
    let temp_dir = TempDir::new().expect("Failed to create temp dir");
    let repo_path = create_test_git_repo(&temp_dir).await;
    fs::write(repo_path.join("new-file.txt"), "new untracked file").expect("write new file");
    let has_changes = get_has_changes(&repo_path).await;
    assert_eq!(has_changes, Some(true), "git status should succeed");
}
#[tokio::test]
async fn test_get_git_working_tree_state_clean_repo() {
let temp_dir = TempDir::new().expect("Failed to create temp dir");

View File

@@ -141,6 +141,7 @@ pub use codex_shell_command::parse_command;
pub use codex_shell_command::powershell;
pub use apply_patch::CODEX_APPLY_PATCH_ARG1;
pub use client::X_CODEX_TURN_ID_HEADER;
pub use client::X_CODEX_TURN_METADATA_HEADER;
pub use exec_policy::ExecPolicyError;
pub use exec_policy::check_execpolicy_for_warnings;

View File

@@ -8,6 +8,7 @@ use crate::memories::phase_one;
use crate::memories::prompts::build_stage_one_input_message;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use crate::rollout::policy::should_persist_response_item_for_memories;
use crate::turn_metadata::TurnMetadataPoll;
use codex_api::ResponseEvent;
use codex_otel::OtelManager;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
@@ -29,6 +30,7 @@ use tracing::warn;
#[derive(Clone, Debug)]
pub(in crate::memories) struct Phase1RequestContext {
pub(in crate::memories) turn_id: String,
pub(in crate::memories) model_info: ModelInfo,
pub(in crate::memories) otel_manager: OtelManager,
pub(in crate::memories) reasoning_effort: Option<ReasoningEffortConfig>,
@@ -124,6 +126,7 @@ impl Phase1RequestContext {
turn_metadata_header: Option<String>,
) -> Self {
Self {
turn_id: turn_context.sub_id.clone(),
model_info: turn_context.model_info.clone(),
otel_manager: turn_context.otel_manager.clone(),
reasoning_effort: turn_context.reasoning_effort,
@@ -174,10 +177,11 @@ async fn claim_startup_jobs(session: &Arc<Session>) -> Option<Vec<codex_state::S
async fn build_request_context(session: &Arc<Session>) -> Phase1RequestContext {
let turn_context = session.new_default_turn().await;
Phase1RequestContext::from_turn_context(
turn_context.as_ref(),
turn_context.resolve_turn_metadata_header().await,
)
let turn_metadata_header = match turn_context.poll_turn_metadata_header() {
TurnMetadataPoll::Ready(header) => header,
TurnMetadataPoll::Pending => None,
};
Phase1RequestContext::from_turn_context(turn_context.as_ref(), turn_metadata_header)
}
async fn run_jobs(
@@ -283,6 +287,7 @@ mod job {
&stage_one_context.otel_manager,
stage_one_context.reasoning_effort,
stage_one_context.reasoning_summary,
Some(stage_one_context.turn_id.as_str()),
stage_one_context.turn_metadata_header.as_deref(),
)
.await?;

View File

@@ -45,7 +45,12 @@ impl RegularTask {
let mut client_session = model_client.new_session();
let turn_metadata_header = turn_metadata_header.await;
match client_session
.prewarm_websocket(&otel_manager, &model_info, turn_metadata_header.as_deref())
.prewarm_websocket(
&otel_manager,
&model_info,
None,
turn_metadata_header.as_deref(),
)
.await
{
Ok(()) => Some(client_session),

View File

@@ -1,19 +1,25 @@
//! Helpers for computing and resolving optional per-turn metadata headers.
//! Helpers for computing optional per-turn metadata headers.
//!
//! This module owns both metadata construction and the shared timeout policy used by
//! turn execution and startup websocket prewarm. Keeping timeout behavior centralized
//! ensures both call sites treat timeout as the same best-effort fallback condition.
//! startup websocket prewarm. Turn-time request attachment is handled via a non-blocking
//! background job (`TurnMetadataHeaderJob`) so request send paths never await metadata.
use std::collections::BTreeMap;
use std::future::Future;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::Duration;
use serde::Serialize;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tracing::warn;
use crate::git_info::get_git_remote_urls_assume_git_repo;
use crate::git_info::get_git_repo_root;
use crate::git_info::get_has_changes;
use crate::git_info::get_head_commit_hash;
pub(crate) const TURN_METADATA_HEADER_TIMEOUT: Duration = Duration::from_millis(250);
@@ -22,8 +28,8 @@ pub(crate) const TURN_METADATA_HEADER_TIMEOUT: Duration = Duration::from_millis(
///
/// On timeout, this logs a warning and returns the provided fallback header.
///
/// Keeping this helper centralized avoids drift between turn-time metadata resolution and startup
/// websocket prewarm, both of which need identical timeout semantics.
/// Keeping this helper centralized avoids drift between startup websocket prewarm and any other
/// timeout-bounded one-shot metadata call sites.
pub(crate) async fn resolve_turn_metadata_header_with_timeout<F>(
build_header: F,
fallback_on_timeout: Option<String>,
@@ -48,6 +54,8 @@ struct TurnMetadataWorkspace {
#[serde(default, skip_serializing_if = "Option::is_none")]
associated_remote_urls: Option<BTreeMap<String, String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
has_changes: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
latest_git_commit_hash: Option<String>,
}
@@ -62,11 +70,16 @@ struct TurnMetadata {
pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Option<String> {
let repo_root = get_git_repo_root(cwd);
let (latest_git_commit_hash, associated_remote_urls) = tokio::join!(
let (latest_git_commit_hash, associated_remote_urls, has_changes) = tokio::join!(
get_head_commit_hash(cwd),
get_git_remote_urls_assume_git_repo(cwd)
get_git_remote_urls_assume_git_repo(cwd),
get_has_changes(cwd)
);
if latest_git_commit_hash.is_none() && associated_remote_urls.is_none() && sandbox.is_none() {
if latest_git_commit_hash.is_none()
&& associated_remote_urls.is_none()
&& has_changes.is_none()
&& sandbox.is_none()
{
return None;
}
@@ -76,6 +89,7 @@ pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Op
repo_root.to_string_lossy().into_owned(),
TurnMetadataWorkspace {
associated_remote_urls,
has_changes,
latest_git_commit_hash,
},
);
@@ -86,3 +100,93 @@ pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Op
})
.ok()
}
/// Non-blocking view of the background turn-metadata computation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum TurnMetadataPoll {
    /// The result is not available yet (or the job was never started).
    Pending,
    /// The computation finished; `None` means no metadata header could be built.
    Ready(Option<String>),
}
/// Internal state machine for [`TurnMetadataHeaderJob`].
#[derive(Debug, Default)]
enum TurnMetadataHeaderJobState {
    /// `spawn` has not been called yet.
    #[default]
    NotStarted,
    /// A background task is running; `receiver` yields its result exactly once.
    Pending {
        receiver: oneshot::Receiver<Option<String>>,
        task: JoinHandle<()>,
    },
    /// Final result; `None` when metadata was unavailable or the job was cancelled.
    Ready(Option<String>),
}
/// One-shot background job that computes the per-turn metadata header.
///
/// Request send paths poll this job and attach the header only when it is
/// already available, so they never await metadata computation.
#[derive(Debug, Default)]
pub(crate) struct TurnMetadataHeaderJob {
    // std Mutex (not tokio): every access is short and never held across .await.
    state: Mutex<TurnMetadataHeaderJobState>,
}
impl TurnMetadataHeaderJob {
    /// Starts the background metadata computation at most once.
    ///
    /// Calls after the first (including after `cancel`) are no-ops. Outside a
    /// tokio runtime the job resolves immediately to `Ready(None)` instead of
    /// panicking.
    pub(crate) fn spawn(&self, cwd: PathBuf, sandbox: Option<String>) {
        // Recover from a poisoned lock: the state value remains usable.
        let mut state = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Only transition out of NotStarted; Pending/Ready are terminal for spawn.
        if !matches!(*state, TurnMetadataHeaderJobState::NotStarted) {
            return;
        }
        let Ok(handle) = tokio::runtime::Handle::try_current() else {
            *state = TurnMetadataHeaderJobState::Ready(None);
            return;
        };
        let (tx, rx) = oneshot::channel::<Option<String>>();
        let task = handle.spawn(async move {
            let header = build_turn_metadata_header(cwd.as_path(), sandbox.as_deref()).await;
            // The receiver may already be gone (job dropped/cancelled); ignore send failure.
            let _ = tx.send(header);
        });
        *state = TurnMetadataHeaderJobState::Pending { receiver: rx, task };
    }

    /// Returns the job result without blocking.
    ///
    /// The first successful receive caches the header so later polls stay
    /// `Ready`. A closed channel (task aborted or panicked) resolves to
    /// `Ready(None)`.
    pub(crate) fn poll(&self) -> TurnMetadataPoll {
        let mut state = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        match &mut *state {
            TurnMetadataHeaderJobState::NotStarted => TurnMetadataPoll::Pending,
            TurnMetadataHeaderJobState::Ready(header) => TurnMetadataPoll::Ready(header.clone()),
            TurnMetadataHeaderJobState::Pending { receiver, .. } => match receiver.try_recv() {
                Ok(header) => {
                    // Cache the result; replacing the state drops the JoinHandle.
                    *state = TurnMetadataHeaderJobState::Ready(header.clone());
                    TurnMetadataPoll::Ready(header)
                }
                Err(TryRecvError::Empty) => TurnMetadataPoll::Pending,
                Err(TryRecvError::Closed) => {
                    *state = TurnMetadataHeaderJobState::Ready(None);
                    TurnMetadataPoll::Ready(None)
                }
            },
        }
    }

    /// Aborts any in-flight computation and pins the result to `Ready(None)`.
    ///
    /// NOTE(review): this also discards an already-computed `Ready(Some(_))`
    /// value — presumably intentional since the job is turn-scoped; confirm.
    pub(crate) fn cancel(&self) {
        let mut state = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let TurnMetadataHeaderJobState::Pending { task, .. } = &mut *state {
            task.abort();
        }
        *state = TurnMetadataHeaderJobState::Ready(None);
    }
}
impl Drop for TurnMetadataHeaderJob {
    /// Aborts any still-running background task so dropping the job never leaks it.
    fn drop(&mut self) {
        // Recover from a poisoned mutex the same way spawn/poll/cancel do:
        // the state value is still usable, and bailing out here would leak a
        // spawned task that nothing else will ever abort.
        let state = self
            .state
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let TurnMetadataHeaderJobState::Pending { task, .. } = state {
            task.abort();
        }
    }
}

View File

@@ -111,7 +111,15 @@ async fn responses_stream_includes_subagent_header_on_review() {
}];
let mut stream = client_session
.stream(&prompt, &model_info, &otel_manager, effort, summary, None)
.stream(
&prompt,
&model_info,
&otel_manager,
effort,
summary,
None,
None,
)
.await
.expect("stream failed");
while let Some(event) = stream.next().await {
@@ -217,7 +225,15 @@ async fn responses_stream_includes_subagent_header_on_other() {
}];
let mut stream = client_session
.stream(&prompt, &model_info, &otel_manager, effort, summary, None)
.stream(
&prompt,
&model_info,
&otel_manager,
effort,
summary,
None,
None,
)
.await
.expect("stream failed");
while let Some(event) = stream.next().await {
@@ -322,7 +338,15 @@ async fn responses_respects_model_info_overrides_from_config() {
}];
let mut stream = client_session
.stream(&prompt, &model_info, &otel_manager, effort, summary, None)
.stream(
&prompt,
&model_info,
&otel_manager,
effort,
summary,
None,
None,
)
.await
.expect("stream failed");
while let Some(event) = stream.next().await {
@@ -357,31 +381,20 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
core_test_support::skip_if_no_network!();
let server = responses::start_mock_server().await;
let response_body = responses::sse(vec![
let first_response = responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_shell_command_call("call-1", "sleep 1"),
responses::ev_completed("resp-1"),
]);
let second_response = responses::sse(vec![
responses::ev_response_created("resp-2"),
responses::ev_assistant_message("msg-1", "done"),
responses::ev_completed("resp-2"),
]);
let test = test_codex().build(&server).await.expect("build test codex");
let cwd = test.cwd_path();
let first_request = responses::mount_sse_once(&server, response_body.clone()).await;
test.submit_turn("hello")
.await
.expect("submit first turn prompt");
let initial_header = first_request
.single_request()
.header("x-codex-turn-metadata")
.expect("x-codex-turn-metadata header should be present");
let initial_parsed: serde_json::Value =
serde_json::from_str(&initial_header).expect("x-codex-turn-metadata should be valid JSON");
assert_eq!(
initial_parsed
.get("sandbox")
.and_then(serde_json::Value::as_str),
Some("none")
);
let git_config_global = cwd.join("empty-git-config");
std::fs::write(&git_config_global, "").expect("write empty git config");
let run_git = |args: &[&str]| {
@@ -423,57 +436,111 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
.expect("git remote get-url output should be valid UTF-8")
.trim()
.to_string();
let expected_repo_root = std::fs::canonicalize(cwd)
.unwrap_or_else(|_| cwd.to_path_buf())
.to_string_lossy()
.into_owned();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
loop {
let request_recorder = responses::mount_sse_once(&server, response_body.clone()).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
test.submit_turn("hello")
.await
.expect("submit post-git turn prompt");
let clean_turn_recorder = responses::mount_response_sequence(
&server,
vec![
responses::sse_response(first_response.clone()),
responses::sse_response(second_response.clone()),
],
)
.await;
test.submit_turn("run a shell command")
.await
.expect("submit clean turn prompt");
let maybe_metadata = request_recorder
.single_request()
.header("x-codex-turn-metadata")
.and_then(|header_value| {
let parsed: serde_json::Value = serde_json::from_str(&header_value).ok()?;
let workspace = parsed
.get("workspaces")
.and_then(serde_json::Value::as_object)
.and_then(|workspaces| workspaces.values().next())
.cloned()?;
Some((parsed, workspace))
});
let Some((parsed, workspace)) = maybe_metadata else {
if tokio::time::Instant::now() >= deadline {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
continue;
};
let clean_requests = clean_turn_recorder.requests();
assert_eq!(clean_requests.len(), 2);
let clean_turn_id_initial = clean_requests[0]
.header("x-codex-turn-id")
.expect("x-codex-turn-id should be present on initial request");
let clean_turn_id_follow_up = clean_requests[1]
.header("x-codex-turn-id")
.expect("x-codex-turn-id should be present on follow-up request");
assert_eq!(clean_turn_id_initial, clean_turn_id_follow_up);
assert_eq!(
parsed.get("sandbox").and_then(serde_json::Value::as_str),
Some("none")
);
assert_eq!(
workspace
.get("latest_git_commit_hash")
.and_then(serde_json::Value::as_str),
Some(expected_head.as_str())
);
assert_eq!(
workspace
.get("associated_remote_urls")
.and_then(serde_json::Value::as_object)
.and_then(|remotes| remotes.get("origin"))
.and_then(serde_json::Value::as_str),
Some(expected_origin.as_str())
);
return;
}
let clean_metadata_header = clean_requests[1]
.header("x-codex-turn-metadata")
.expect("follow-up request should include x-codex-turn-metadata");
let clean_parsed: serde_json::Value = serde_json::from_str(&clean_metadata_header)
.expect("x-codex-turn-metadata should be valid JSON");
let clean_workspace = clean_parsed
.get("workspaces")
.and_then(serde_json::Value::as_object)
.and_then(|workspaces| {
workspaces
.get(&expected_repo_root)
.or_else(|| workspaces.values().next())
})
.expect("metadata should include expected repository root");
assert_eq!(
clean_workspace
.get("latest_git_commit_hash")
.and_then(serde_json::Value::as_str),
Some(expected_head.as_str())
);
assert_eq!(
clean_workspace
.get("associated_remote_urls")
.and_then(serde_json::Value::as_object)
.and_then(|remotes| remotes.get("origin"))
.and_then(serde_json::Value::as_str),
Some(expected_origin.as_str())
);
assert_eq!(
clean_workspace
.get("has_changes")
.and_then(serde_json::Value::as_bool),
Some(false)
);
panic!(
"x-codex-turn-metadata with git workspace info was never observed within 5s after git setup"
std::fs::write(cwd.join("untracked.txt"), "new file").expect("write untracked file");
let dirty_turn_recorder = responses::mount_response_sequence(
&server,
vec![
responses::sse_response(first_response),
responses::sse_response(second_response),
],
)
.await;
test.submit_turn("run a shell command")
.await
.expect("submit dirty turn prompt");
let dirty_requests = dirty_turn_recorder.requests();
assert_eq!(dirty_requests.len(), 2);
let dirty_turn_id_initial = dirty_requests[0]
.header("x-codex-turn-id")
.expect("x-codex-turn-id should be present on initial dirty request");
let dirty_turn_id_follow_up = dirty_requests[1]
.header("x-codex-turn-id")
.expect("x-codex-turn-id should be present on follow-up dirty request");
assert_eq!(dirty_turn_id_initial, dirty_turn_id_follow_up);
assert_ne!(clean_turn_id_initial, dirty_turn_id_initial);
let dirty_metadata_header = dirty_requests[1]
.header("x-codex-turn-metadata")
.expect("dirty follow-up request should include x-codex-turn-metadata");
let dirty_parsed: serde_json::Value = serde_json::from_str(&dirty_metadata_header)
.expect("x-codex-turn-metadata should be valid JSON");
let dirty_workspace = dirty_parsed
.get("workspaces")
.and_then(serde_json::Value::as_object)
.and_then(|workspaces| {
workspaces
.get(&expected_repo_root)
.or_else(|| workspaces.values().next())
})
.expect("metadata should include expected repository root");
assert_eq!(
dirty_workspace
.get("has_changes")
.and_then(serde_json::Value::as_bool),
Some(true)
);
}

View File

@@ -1416,7 +1416,15 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
});
let mut stream = client_session
.stream(&prompt, &model_info, &otel_manager, effort, summary, None)
.stream(
&prompt,
&model_info,
&otel_manager,
effort,
summary,
None,
None,
)
.await
.expect("responses stream to start");

View File

@@ -106,7 +106,7 @@ async fn responses_websocket_preconnect_reuses_connection() {
let harness = websocket_harness(&server).await;
let mut client_session = harness.client.new_session();
client_session
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None)
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None, None)
.await
.expect("websocket prewarm failed");
let prompt = prompt_with_input(vec![message_item("hello")]);
@@ -131,7 +131,7 @@ async fn responses_websocket_preconnect_is_reused_even_with_header_changes() {
let harness = websocket_harness(&server).await;
let mut client_session = harness.client.new_session();
client_session
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None)
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None, None)
.await
.expect("websocket prewarm failed");
let prompt = prompt_with_input(vec![message_item("hello")]);
@@ -143,6 +143,7 @@ async fn responses_websocket_preconnect_is_reused_even_with_header_changes() {
harness.effort,
harness.summary,
None,
None,
)
.await
.expect("websocket stream failed");
@@ -172,7 +173,7 @@ async fn responses_websocket_prewarm_uses_model_preference_when_feature_disabled
let harness = websocket_harness_with_options(&server, false, false, false, true).await;
let mut client_session = harness.client.new_session();
client_session
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None)
.prewarm_websocket(&harness.otel_manager, &harness.model_info, None, None)
.await
.expect("websocket prewarm failed");
@@ -318,6 +319,7 @@ async fn responses_websocket_emits_reasoning_included_event() {
harness.effort,
harness.summary,
None,
None,
)
.await
.expect("websocket stream failed");
@@ -389,6 +391,7 @@ async fn responses_websocket_emits_rate_limit_events() {
harness.effort,
harness.summary,
None,
None,
)
.await
.expect("websocket stream failed");
@@ -840,6 +843,7 @@ async fn responses_websocket_v2_after_error_uses_full_create_without_previous_re
harness.effort,
harness.summary,
None,
None,
)
.await
.expect("websocket stream failed");
@@ -1080,6 +1084,7 @@ async fn stream_until_complete(
harness.effort,
harness.summary,
None,
None,
)
.await
.expect("websocket stream failed");

View File

@@ -18,6 +18,7 @@ use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
const TURN_STATE_HEADER: &str = "x-codex-turn-state";
const TURN_ID_HEADER: &str = "x-codex-turn-id";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_turn_state_persists_within_turn_and_resets_after() -> Result<()> {
@@ -65,6 +66,18 @@ async fn responses_turn_state_persists_within_turn_and_resets_after() -> Result<
);
assert_eq!(requests[2].header(TURN_STATE_HEADER), None);
let first_turn_request_id = requests[0]
.header(TURN_ID_HEADER)
.expect("first request should include x-codex-turn-id");
let first_turn_follow_up_id = requests[1]
.header(TURN_ID_HEADER)
.expect("follow-up request should include x-codex-turn-id");
let second_turn_id = requests[2]
.header(TURN_ID_HEADER)
.expect("second turn request should include x-codex-turn-id");
assert_eq!(first_turn_request_id, first_turn_follow_up_id);
assert_ne!(first_turn_request_id, second_turn_id);
Ok(())
}
@@ -120,6 +133,16 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
);
assert_eq!(handshakes[2].header(TURN_STATE_HEADER), None);
// First handshake is startup preconnect; no turn id is available yet.
assert_eq!(handshakes[0].header(TURN_ID_HEADER), None);
let first_turn_follow_up_id = handshakes[1]
.header(TURN_ID_HEADER)
.expect("turn-time reconnect should include x-codex-turn-id");
let second_turn_id = handshakes[2]
.header(TURN_ID_HEADER)
.expect("second turn handshake should include x-codex-turn-id");
assert_ne!(first_turn_follow_up_id, second_turn_id);
server.shutdown().await;
Ok(())
}