Compare commits

...

2 Commits

Author SHA1 Message Date
Colin Young
d692b74007 Add auth 401 observability to client bug reports (#14611)
CXC-392

  [With
  401](https://openai.sentry.io/issues/7333870443/?project=4510195390611458&query=019ce8f8-560c-7f10-a00a-c59553740674&referrer=issue-stream)
  <img width="1909" height="555" alt="401 auth tags in Sentry"
  src="https://github.com/user-attachments/assets/412ea950-61c4-4780-9697-15c270971ee3"
  />


  - auth_401_*: preserved facts from the latest unauthorized response snapshot
  - auth_*: latest auth-related facts from the latest request attempt
  - auth_recovery_*: unauthorized recovery state and follow-up result


  Without 401
  <img width="1917" height="522" alt="happy-path auth tags in Sentry"
  src="https://github.com/user-attachments/assets/3381ed28-8022-43b0-b6c0-623a630e679f"
  />

  ###### Summary
  - Add client-visible 401 diagnostics for auth attachment, upstream auth classification, and 401 request id / cf-ray correlation.
  - Record unauthorized recovery mode, phase, outcome, and retry/follow-up status without changing auth behavior.
  - Surface the highest-signal auth and recovery fields on uploaded client bug reports so they are usable in Sentry.
  - Preserve original unauthorized evidence under `auth_401_*` while keeping follow-up result tags separate.

  ###### Rationale (from spec findings)
  - The dominant bucket needed proof of whether the client attached auth before send or upstream still classified the request as missing auth.
  - Client uploads needed to show whether unauthorized recovery ran and what the client tried next.
  - Request id and cf-ray needed to be preserved on the unauthorized response so server-side correlation is immediate.
  - The bug-report path needed the same auth evidence as the request telemetry path, otherwise the observability would not be operationally useful.

  ###### Scope
  - Add auth 401 and unauthorized-recovery observability in `codex-rs/core`, `codex-rs/codex-api`, and `codex-rs/otel`, including feedback-tag surfacing.
  - Keep auth semantics, refresh behavior, retry behavior, endpoint classification, and geo-denial follow-up work out of this PR.

  ###### Trade-offs
  - This exports only safe auth evidence: header presence/name, upstream auth classification, request ids, and recovery state. It does not export token values or raw upstream bodies.
  - This keeps websocket connection reuse as a transport clue because it can help distinguish stale reused sessions from fresh reconnects.
  - Misroute/base-url classification and geo-denial are intentionally deferred to a separate follow-up PR so this review stays focused on the dominant auth 401 bucket.

  ###### Client follow-up
  - PR 2 will add misroute/provider and geo-denial observability plus the matching feedback-tag surfacing.
  - A separate host/app-server PR should log auth-decision inputs so pre-send host auth state can be correlated with client request evidence.
  - `device_id` remains intentionally separate until there is a safe existing source on the feedback upload path.

  ###### Testing
  - `cargo test -p codex-core refresh_available_models_sorts_by_priority`
  - `cargo test -p codex-core emit_feedback_request_tags_`
  - `cargo test -p codex-core emit_feedback_auth_recovery_tags_`
  - `cargo test -p codex-core auth_request_telemetry_context_tracks_attached_auth_and_retry_phase`
  - `cargo test -p codex-core extract_response_debug_context_decodes_identity_headers`
  - `cargo test -p codex-core identity_auth_details`
  - `cargo test -p codex-core telemetry_error_messages_preserve_non_http_details`
  - `cargo test -p codex-core --all-features --no-run`
  - `cargo test -p codex-otel otel_export_routing_policy_routes_api_request_auth_observability`
  - `cargo test -p codex-otel otel_export_routing_policy_routes_websocket_connect_auth_observability`
  - `cargo test -p codex-otel otel_export_routing_policy_routes_websocket_request_transport_observability`
2026-03-14 15:38:51 -07:00
viyatb-oai
9060dc7557 fix: fix symlinked writable roots in sandbox policies (#14674)
## Summary
- normalize effective readable, writable, and unreadable sandbox roots
after resolving special paths so symlinked roots use canonical runtime
paths
- add a protocol regression test for a symlinked writable root with a
denied child and update protocol expectations to canonicalized effective
paths
- update macOS seatbelt tests to assert against effective normalized
roots produced by the shared policy helpers

## Testing
- just fmt
- cargo test -p codex-protocol
- cargo test -p codex-core explicit_unreadable_paths_are_excluded_
- cargo clippy -p codex-protocol -p codex-core --tests -- -D warnings

## Notes
- This is intended to fix the symlinked TMPDIR bind failure in
bubblewrap described in #14672.
Fixes #14672
2026-03-14 13:24:43 -07:00
22 changed files with 2512 additions and 110 deletions

View File

@@ -399,7 +399,7 @@ impl CloudRequirementsService {
"Cloud requirements request was unauthorized; attempting auth recovery"
);
match auth_recovery.next().await {
Ok(()) => {
Ok(_) => {
let Some(refreshed_auth) = self.auth_manager.auth().await else {
tracing::error!(
"Auth recovery succeeded but no auth is available for cloud requirements"

View File

@@ -214,6 +214,7 @@ impl ResponsesWebsocketConnection {
pub async fn stream_request(
&self,
request: ResponsesWsRequest,
connection_reused: bool,
) -> Result<ResponseStream, ApiError> {
let (tx_event, rx_event) =
mpsc::channel::<std::result::Result<ResponseEvent, ApiError>>(1600);
@@ -258,6 +259,7 @@ impl ResponsesWebsocketConnection {
request_body,
idle_timeout,
telemetry,
connection_reused,
)
.await
};
@@ -534,6 +536,7 @@ async fn run_websocket_response_stream(
request_body: Value,
idle_timeout: Duration,
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
connection_reused: bool,
) -> Result<(), ApiError> {
let mut last_server_model: Option<String> = None;
let request_text = match serde_json::to_string(&request_body) {
@@ -553,7 +556,11 @@ async fn run_websocket_response_stream(
.map_err(|err| ApiError::Stream(format!("failed to send websocket request: {err}")));
if let Some(t) = telemetry.as_ref() {
t.on_ws_request(request_start.elapsed(), result.as_ref().err());
t.on_ws_request(
request_start.elapsed(),
result.as_ref().err(),
connection_reused,
);
}
result?;

View File

@@ -33,7 +33,7 @@ pub trait SseTelemetry: Send + Sync {
/// Telemetry for Responses WebSocket transport.
pub trait WebsocketTelemetry: Send + Sync {
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>);
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool);
fn on_ws_event(
&self,

View File

@@ -1,3 +1,4 @@
use base64::Engine;
use chrono::DateTime;
use chrono::Utc;
use codex_api::AuthProvider as ApiAuthProvider;
@@ -7,6 +8,7 @@ use codex_api::rate_limits::parse_promo_message;
use codex_api::rate_limits::parse_rate_limit_for_limit;
use http::HeaderMap;
use serde::Deserialize;
use serde_json::Value;
use crate::auth::CodexAuth;
use crate::error::CodexErr;
@@ -30,6 +32,8 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
url: None,
cf_ray: None,
request_id: None,
identity_authorization_error: None,
identity_error_code: None,
}),
ApiError::InvalidRequest { message } => CodexErr::InvalidRequest(message),
ApiError::Transport(transport) => match transport {
@@ -98,6 +102,11 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
url,
cf_ray: extract_header(headers.as_ref(), CF_RAY_HEADER),
request_id: extract_request_id(headers.as_ref()),
identity_authorization_error: extract_header(
headers.as_ref(),
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
),
identity_error_code: extract_x_error_json_code(headers.as_ref()),
})
}
}
@@ -118,6 +127,8 @@ const ACTIVE_LIMIT_HEADER: &str = "x-codex-active-limit";
const REQUEST_ID_HEADER: &str = "x-request-id";
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
const CF_RAY_HEADER: &str = "cf-ray";
const X_OPENAI_AUTHORIZATION_ERROR_HEADER: &str = "x-openai-authorization-error";
const X_ERROR_JSON_HEADER: &str = "x-error-json";
#[cfg(test)]
#[path = "api_bridge_tests.rs"]
@@ -140,6 +151,19 @@ fn extract_header(headers: Option<&HeaderMap>, name: &str) -> Option<String> {
})
}
fn extract_x_error_json_code(headers: Option<&HeaderMap>) -> Option<String> {
let encoded = extract_header(headers, X_ERROR_JSON_HEADER)?;
let decoded = base64::engine::general_purpose::STANDARD
.decode(encoded)
.ok()?;
let parsed = serde_json::from_slice::<Value>(&decoded).ok()?;
parsed
.get("error")
.and_then(|error| error.get("code"))
.and_then(Value::as_str)
.map(str::to_string)
}
pub(crate) fn auth_provider_from_auth(
auth: Option<CodexAuth>,
provider: &ModelProviderInfo,
@@ -191,6 +215,26 @@ pub(crate) struct CoreAuthProvider {
account_id: Option<String>,
}
impl CoreAuthProvider {
pub(crate) fn auth_header_attached(&self) -> bool {
self.token
.as_ref()
.is_some_and(|token| http::HeaderValue::from_str(&format!("Bearer {token}")).is_ok())
}
pub(crate) fn auth_header_name(&self) -> Option<&'static str> {
self.auth_header_attached().then_some("authorization")
}
#[cfg(test)]
pub(crate) fn for_test(token: Option<&str>, account_id: Option<&str>) -> Self {
Self {
token: token.map(str::to_string),
account_id: account_id.map(str::to_string),
}
}
}
impl ApiAuthProvider for CoreAuthProvider {
fn bearer_token(&self) -> Option<String> {
self.token.clone()

View File

@@ -1,4 +1,5 @@
use super::*;
use base64::Engine;
use pretty_assertions::assert_eq;
#[test]
@@ -94,3 +95,49 @@ fn map_api_error_does_not_fallback_limit_name_to_limit_id() {
None
);
}
#[test]
fn map_api_error_extracts_identity_auth_details_from_headers() {
let mut headers = HeaderMap::new();
headers.insert(REQUEST_ID_HEADER, http::HeaderValue::from_static("req-401"));
headers.insert(CF_RAY_HEADER, http::HeaderValue::from_static("ray-401"));
headers.insert(
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
http::HeaderValue::from_static("missing_authorization_header"),
);
let x_error_json =
base64::engine::general_purpose::STANDARD.encode(r#"{"error":{"code":"token_expired"}}"#);
headers.insert(
X_ERROR_JSON_HEADER,
http::HeaderValue::from_str(&x_error_json).expect("valid x-error-json header"),
);
let err = map_api_error(ApiError::Transport(TransportError::Http {
status: http::StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
headers: Some(headers),
body: Some(r#"{"detail":"Unauthorized"}"#.to_string()),
}));
let CodexErr::UnexpectedStatus(err) = err else {
panic!("expected CodexErr::UnexpectedStatus, got {err:?}");
};
assert_eq!(err.request_id.as_deref(), Some("req-401"));
assert_eq!(err.cf_ray.as_deref(), Some("ray-401"));
assert_eq!(
err.identity_authorization_error.as_deref(),
Some("missing_authorization_header")
);
assert_eq!(err.identity_error_code.as_deref(), Some("token_expired"));
}
#[test]
fn core_auth_provider_reports_when_auth_header_will_attach() {
let auth = CoreAuthProvider {
token: Some("access-token".to_string()),
account_id: None,
};
assert!(auth.auth_header_attached());
assert_eq!(auth.auth_header_name(), Some("authorization"));
}

View File

@@ -874,6 +874,17 @@ pub struct UnauthorizedRecovery {
mode: UnauthorizedRecoveryMode,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct UnauthorizedRecoveryStepResult {
auth_state_changed: Option<bool>,
}
impl UnauthorizedRecoveryStepResult {
pub fn auth_state_changed(&self) -> Option<bool> {
self.auth_state_changed
}
}
impl UnauthorizedRecovery {
fn new(manager: Arc<AuthManager>) -> Self {
let cached_auth = manager.auth_cached();
@@ -917,7 +928,46 @@ impl UnauthorizedRecovery {
!matches!(self.step, UnauthorizedRecoveryStep::Done)
}
pub async fn next(&mut self) -> Result<(), RefreshTokenError> {
pub fn unavailable_reason(&self) -> &'static str {
if !self
.manager
.auth_cached()
.as_ref()
.is_some_and(CodexAuth::is_chatgpt_auth)
{
return "not_chatgpt_auth";
}
if self.mode == UnauthorizedRecoveryMode::External
&& !self.manager.has_external_auth_refresher()
{
return "no_external_refresher";
}
if matches!(self.step, UnauthorizedRecoveryStep::Done) {
return "recovery_exhausted";
}
"ready"
}
pub fn mode_name(&self) -> &'static str {
match self.mode {
UnauthorizedRecoveryMode::Managed => "managed",
UnauthorizedRecoveryMode::External => "external",
}
}
pub fn step_name(&self) -> &'static str {
match self.step {
UnauthorizedRecoveryStep::Reload => "reload",
UnauthorizedRecoveryStep::RefreshToken => "refresh_token",
UnauthorizedRecoveryStep::ExternalRefresh => "external_refresh",
UnauthorizedRecoveryStep::Done => "done",
}
}
pub async fn next(&mut self) -> Result<UnauthorizedRecoveryStepResult, RefreshTokenError> {
if !self.has_next() {
return Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
RefreshTokenFailedReason::Other,
@@ -931,8 +981,17 @@ impl UnauthorizedRecovery {
.manager
.reload_if_account_id_matches(self.expected_account_id.as_deref())
{
ReloadOutcome::ReloadedChanged | ReloadOutcome::ReloadedNoChange => {
ReloadOutcome::ReloadedChanged => {
self.step = UnauthorizedRecoveryStep::RefreshToken;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
});
}
ReloadOutcome::ReloadedNoChange => {
self.step = UnauthorizedRecoveryStep::RefreshToken;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(false),
});
}
ReloadOutcome::Skipped => {
self.step = UnauthorizedRecoveryStep::Done;
@@ -946,16 +1005,24 @@ impl UnauthorizedRecovery {
UnauthorizedRecoveryStep::RefreshToken => {
self.manager.refresh_token_from_authority().await?;
self.step = UnauthorizedRecoveryStep::Done;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
});
}
UnauthorizedRecoveryStep::ExternalRefresh => {
self.manager
.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
.await?;
self.step = UnauthorizedRecoveryStep::Done;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
});
}
UnauthorizedRecoveryStep::Done => {}
}
Ok(())
Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: None,
})
}
}

View File

@@ -13,6 +13,7 @@ use codex_protocol::config_types::ForcedLoginMethod;
use pretty_assertions::assert_eq;
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;
use tempfile::tempdir;
#[tokio::test]
@@ -171,6 +172,33 @@ fn logout_removes_auth_file() -> Result<(), std::io::Error> {
Ok(())
}
#[test]
fn unauthorized_recovery_reports_mode_and_step_names() {
let dir = tempdir().unwrap();
let manager = AuthManager::shared(
dir.path().to_path_buf(),
false,
AuthCredentialsStoreMode::File,
);
let managed = UnauthorizedRecovery {
manager: Arc::clone(&manager),
step: UnauthorizedRecoveryStep::Reload,
expected_account_id: None,
mode: UnauthorizedRecoveryMode::Managed,
};
assert_eq!(managed.mode_name(), "managed");
assert_eq!(managed.step_name(), "reload");
let external = UnauthorizedRecovery {
manager,
step: UnauthorizedRecoveryStep::ExternalRefresh,
expected_account_id: None,
mode: UnauthorizedRecoveryMode::External,
};
assert_eq!(external.mode_name(), "external");
assert_eq!(external.step_name(), "external_refresh");
}
struct AuthFileParams {
openai_api_key: Option<String>,
chatgpt_plan_type: Option<String>,

View File

@@ -75,6 +75,7 @@ use http::HeaderValue;
use http::StatusCode as HttpStatusCode;
use reqwest::StatusCode;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
@@ -85,6 +86,7 @@ use tracing::trace;
use tracing::warn;
use crate::AuthManager;
use crate::auth::AuthMode;
use crate::auth::CodexAuth;
use crate::auth::RefreshTokenError;
use crate::client_common::Prompt;
@@ -97,7 +99,14 @@ use crate::error::Result;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::response_debug_context::extract_response_debug_context;
use crate::response_debug_context::extract_response_debug_context_from_api_error;
use crate::response_debug_context::telemetry_api_error_message;
use crate::response_debug_context::telemetry_transport_error_message;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::util::FeedbackRequestTags;
use crate::util::emit_feedback_auth_recovery_tags;
use crate::util::emit_feedback_request_tags;
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
@@ -105,7 +114,9 @@ pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
const RESPONSES_ENDPOINT: &str = "/responses";
const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact";
const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
pub fn ws_version_from_features(config: &Config) -> bool {
config
.features
@@ -144,6 +155,17 @@ struct CurrentClientSetup {
api_auth: CoreAuthProvider,
}
#[derive(Clone, Copy)]
struct RequestRouteTelemetry {
endpoint: &'static str,
}
impl RequestRouteTelemetry {
fn for_endpoint(endpoint: &'static str) -> Self {
Self { endpoint }
}
}
/// A session-scoped client for model-provider API calls.
///
/// This holds configuration and state that should be shared across turns within a Codex session
@@ -201,6 +223,23 @@ struct WebsocketSession {
connection: Option<ApiWebSocketConnection>,
last_request: Option<ResponsesApiRequest>,
last_response_rx: Option<oneshot::Receiver<LastResponse>>,
connection_reused: StdMutex<bool>,
}
impl WebsocketSession {
fn set_connection_reused(&self, connection_reused: bool) {
*self
.connection_reused
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = connection_reused;
}
fn connection_reused(&self) -> bool {
*self
.connection_reused
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
}
enum WebsocketStreamOutcome {
@@ -291,7 +330,15 @@ impl ModelClient {
}
let client_setup = self.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = Self::build_request_telemetry(session_telemetry);
let request_telemetry = Self::build_request_telemetry(
session_telemetry,
AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
PendingUnauthorizedRetry::default(),
),
RequestRouteTelemetry::for_endpoint(RESPONSES_COMPACT_ENDPOINT),
);
let client =
ApiCompactClient::new(transport, client_setup.api_provider, client_setup.api_auth)
.with_telemetry(Some(request_telemetry));
@@ -351,7 +398,15 @@ impl ModelClient {
let client_setup = self.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = Self::build_request_telemetry(session_telemetry);
let request_telemetry = Self::build_request_telemetry(
session_telemetry,
AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
PendingUnauthorizedRetry::default(),
),
RequestRouteTelemetry::for_endpoint(MEMORIES_SUMMARIZE_ENDPOINT),
);
let client =
ApiMemoriesClient::new(transport, client_setup.api_provider, client_setup.api_auth)
.with_telemetry(Some(request_telemetry));
@@ -391,8 +446,16 @@ impl ModelClient {
}
/// Builds request telemetry for unary API calls (e.g., Compact endpoint).
fn build_request_telemetry(session_telemetry: &SessionTelemetry) -> Arc<dyn RequestTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
fn build_request_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> Arc<dyn RequestTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
request_telemetry
}
@@ -458,6 +521,7 @@ impl ModelClient {
///
/// Both startup prewarm and in-turn `needs_new` reconnects call this path so handshake
/// behavior remains consistent across both flows.
#[allow(clippy::too_many_arguments)]
async fn connect_websocket(
&self,
session_telemetry: &SessionTelemetry,
@@ -465,17 +529,69 @@ impl ModelClient {
api_auth: CoreAuthProvider,
turn_state: Option<Arc<OnceLock<String>>>,
turn_metadata_header: Option<&str>,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> std::result::Result<ApiWebSocketConnection, ApiError> {
let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header);
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(session_telemetry);
ApiWebSocketResponsesClient::new(api_provider, api_auth)
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(
session_telemetry,
auth_context,
request_route_telemetry,
);
let start = Instant::now();
let result = ApiWebSocketResponsesClient::new(api_provider, api_auth)
.connect(
headers,
crate::default_client::default_headers(),
turn_state,
Some(websocket_telemetry),
)
.await
.await;
let error_message = result.as_ref().err().map(telemetry_api_error_message);
let response_debug = result
.as_ref()
.err()
.map(extract_response_debug_context_from_api_error)
.unwrap_or_default();
let status = result.as_ref().err().and_then(api_error_http_status);
session_telemetry.record_websocket_connect(
start.elapsed(),
status,
error_message.as_deref(),
auth_context.auth_header_attached,
auth_context.auth_header_name,
auth_context.retry_after_unauthorized,
auth_context.recovery_mode,
auth_context.recovery_phase,
request_route_telemetry.endpoint,
false,
response_debug.request_id.as_deref(),
response_debug.cf_ray.as_deref(),
response_debug.auth_error.as_deref(),
response_debug.auth_error_code.as_deref(),
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: request_route_telemetry.endpoint,
auth_header_attached: auth_context.auth_header_attached,
auth_header_name: auth_context.auth_header_name,
auth_mode: auth_context.auth_mode,
auth_retry_after_unauthorized: Some(auth_context.retry_after_unauthorized),
auth_recovery_mode: auth_context.recovery_mode,
auth_recovery_phase: auth_context.recovery_phase,
auth_connection_reused: Some(false),
auth_request_id: response_debug.request_id.as_deref(),
auth_cf_ray: response_debug.cf_ray.as_deref(),
auth_error: response_debug.auth_error.as_deref(),
auth_error_code: response_debug.auth_error_code.as_deref(),
auth_recovery_followup_success: auth_context
.retry_after_unauthorized
.then_some(result.is_ok()),
auth_recovery_followup_status: auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
});
result
}
/// Builds websocket handshake headers for both prewarm and turn-time reconnect.
@@ -718,7 +834,11 @@ impl ModelClientSession {
"failed to build websocket prewarm client setup: {err}"
))
})?;
let auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
PendingUnauthorizedRetry::default(),
);
let connection = self
.client
.connect_websocket(
@@ -727,9 +847,12 @@ impl ModelClientSession {
client_setup.api_auth,
Some(Arc::clone(&self.turn_state)),
None,
auth_context,
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
)
.await?;
self.websocket_session.connection = Some(connection);
self.websocket_session.set_connection_reused(false);
Ok(())
}
/// Returns a websocket connection for this turn.
@@ -742,17 +865,22 @@ impl ModelClientSession {
wire_api = %self.client.state.provider.wire_api,
transport = "responses_websocket",
api.path = "responses",
turn.has_metadata_header = turn_metadata_header.is_some()
turn.has_metadata_header = params.turn_metadata_header.is_some()
)
)]
async fn websocket_connection(
&mut self,
session_telemetry: &SessionTelemetry,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_metadata_header: Option<&str>,
options: &ApiResponsesOptions,
params: WebsocketConnectParams<'_>,
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
let WebsocketConnectParams {
session_telemetry,
api_provider,
api_auth,
turn_metadata_header,
options,
auth_context,
request_route_telemetry,
} = params;
let needs_new = match self.websocket_session.connection.as_ref() {
Some(conn) => conn.is_closed().await,
None => true,
@@ -773,9 +901,14 @@ impl ModelClientSession {
api_auth,
Some(turn_state),
turn_metadata_header,
auth_context,
request_route_telemetry,
)
.await?;
self.websocket_session.connection = Some(new_conn);
self.websocket_session.set_connection_reused(false);
} else {
self.websocket_session.set_connection_reused(true);
}
self.websocket_session
@@ -840,11 +973,20 @@ impl ModelClientSession {
let mut auth_recovery = auth_manager
.as_ref()
.map(super::auth::AuthManager::unauthorized_recovery);
let mut pending_retry = PendingUnauthorizedRetry::default();
loop {
let client_setup = self.client.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let (request_telemetry, sse_telemetry) =
Self::build_streaming_telemetry(session_telemetry);
let request_auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
pending_retry,
);
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(
session_telemetry,
request_auth_context,
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
@@ -872,7 +1014,14 @@ impl ModelClientSession {
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await?,
);
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -911,8 +1060,14 @@ impl ModelClientSession {
let mut auth_recovery = auth_manager
.as_ref()
.map(super::auth::AuthManager::unauthorized_recovery);
let mut pending_retry = PendingUnauthorizedRetry::default();
loop {
let client_setup = self.client.current_client_setup().await?;
let request_auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
pending_retry,
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
@@ -933,13 +1088,17 @@ impl ModelClientSession {
}
match self
.websocket_connection(
.websocket_connection(WebsocketConnectParams {
session_telemetry,
client_setup.api_provider,
client_setup.api_auth,
api_provider: client_setup.api_provider,
api_auth: client_setup.api_auth,
turn_metadata_header,
&options,
)
options: &options,
auth_context: request_auth_context,
request_route_telemetry: RequestRouteTelemetry::for_endpoint(
RESPONSES_ENDPOINT,
),
})
.await
{
Ok(_) => {}
@@ -951,7 +1110,14 @@ impl ModelClientSession {
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await?,
);
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -968,7 +1134,7 @@ impl ModelClientSession {
"websocket connection is unavailable".to_string(),
))
})?
.stream_request(ws_request)
.stream_request(ws_request, self.websocket_session.connection_reused())
.await
.map_err(map_api_error)?;
let (stream, last_request_rx) =
@@ -981,8 +1147,14 @@ impl ModelClientSession {
/// Builds request and SSE telemetry for streaming API calls.
fn build_streaming_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> (Arc<dyn RequestTelemetry>, Arc<dyn SseTelemetry>) {
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry.clone();
let sse_telemetry: Arc<dyn SseTelemetry> = telemetry;
(request_telemetry, sse_telemetry)
@@ -991,8 +1163,14 @@ impl ModelClientSession {
/// Builds telemetry for the Responses API WebSocket transport.
fn build_websocket_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> Arc<dyn WebsocketTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
));
let websocket_telemetry: Arc<dyn WebsocketTelemetry> = telemetry;
websocket_telemetry
}
@@ -1126,6 +1304,7 @@ impl ModelClientSession {
self.websocket_session.connection = None;
self.websocket_session.last_request = None;
self.websocket_session.last_response_rx = None;
self.websocket_session.set_connection_reused(false);
}
activated
}
@@ -1264,30 +1443,209 @@ where
///
/// When refresh succeeds, the caller should retry the API call; otherwise
/// the mapped `CodexErr` is returned to the caller.
#[derive(Clone, Copy, Debug)]
struct UnauthorizedRecoveryExecution {
mode: &'static str,
phase: &'static str,
}
#[derive(Clone, Copy, Debug, Default)]
struct PendingUnauthorizedRetry {
retry_after_unauthorized: bool,
recovery_mode: Option<&'static str>,
recovery_phase: Option<&'static str>,
}
impl PendingUnauthorizedRetry {
fn from_recovery(recovery: UnauthorizedRecoveryExecution) -> Self {
Self {
retry_after_unauthorized: true,
recovery_mode: Some(recovery.mode),
recovery_phase: Some(recovery.phase),
}
}
}
#[derive(Clone, Copy, Debug, Default)]
struct AuthRequestTelemetryContext {
auth_mode: Option<&'static str>,
auth_header_attached: bool,
auth_header_name: Option<&'static str>,
retry_after_unauthorized: bool,
recovery_mode: Option<&'static str>,
recovery_phase: Option<&'static str>,
}
impl AuthRequestTelemetryContext {
fn new(
auth_mode: Option<AuthMode>,
api_auth: &CoreAuthProvider,
retry: PendingUnauthorizedRetry,
) -> Self {
Self {
auth_mode: auth_mode.map(|mode| match mode {
AuthMode::ApiKey => "ApiKey",
AuthMode::Chatgpt => "Chatgpt",
}),
auth_header_attached: api_auth.auth_header_attached(),
auth_header_name: api_auth.auth_header_name(),
retry_after_unauthorized: retry.retry_after_unauthorized,
recovery_mode: retry.recovery_mode,
recovery_phase: retry.recovery_phase,
}
}
}
struct WebsocketConnectParams<'a> {
session_telemetry: &'a SessionTelemetry,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_metadata_header: Option<&'a str>,
options: &'a ApiResponsesOptions,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
}
async fn handle_unauthorized(
transport: TransportError,
auth_recovery: &mut Option<UnauthorizedRecovery>,
) -> Result<()> {
session_telemetry: &SessionTelemetry,
) -> Result<UnauthorizedRecoveryExecution> {
let debug = extract_response_debug_context(&transport);
if let Some(recovery) = auth_recovery
&& recovery.has_next()
{
let mode = recovery.mode_name();
let phase = recovery.step_name();
return match recovery.next().await {
Ok(_) => Ok(()),
Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)),
Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)),
Ok(step_result) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_succeeded",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
None,
step_result.auth_state_changed(),
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_succeeded",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Ok(UnauthorizedRecoveryExecution { mode, phase })
}
Err(RefreshTokenError::Permanent(failed)) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_failed_permanent",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
None,
None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_failed_permanent",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(CodexErr::RefreshTokenFailed(failed))
}
Err(RefreshTokenError::Transient(other)) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_failed_transient",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
None,
None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_failed_transient",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(CodexErr::Io(other))
}
};
}
let (mode, phase, recovery_reason) = match auth_recovery.as_ref() {
Some(recovery) => (
recovery.mode_name(),
recovery.step_name(),
Some(recovery.unavailable_reason()),
),
None => ("none", "none", Some("auth_manager_missing")),
};
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_not_run",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
recovery_reason,
None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_not_run",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(map_api_error(ApiError::Transport(transport)))
}
fn api_error_http_status(error: &ApiError) -> Option<u16> {
match error {
ApiError::Transport(TransportError::Http { status, .. }) => Some(status.as_u16()),
_ => None,
}
}
struct ApiTelemetry {
session_telemetry: SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
}
impl ApiTelemetry {
fn new(session_telemetry: SessionTelemetry) -> Self {
Self { session_telemetry }
fn new(
session_telemetry: SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> Self {
Self {
session_telemetry,
auth_context,
request_route_telemetry,
}
}
}
@@ -1299,13 +1657,50 @@ impl RequestTelemetry for ApiTelemetry {
error: Option<&TransportError>,
duration: Duration,
) {
let error_message = error.map(std::string::ToString::to_string);
let error_message = error.map(telemetry_transport_error_message);
let status = status.map(|s| s.as_u16());
let debug = error
.map(extract_response_debug_context)
.unwrap_or_default();
self.session_telemetry.record_api_request(
attempt,
status.map(|s| s.as_u16()),
status,
error_message.as_deref(),
duration,
self.auth_context.auth_header_attached,
self.auth_context.auth_header_name,
self.auth_context.retry_after_unauthorized,
self.auth_context.recovery_mode,
self.auth_context.recovery_phase,
self.request_route_telemetry.endpoint,
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: None,
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
});
}
}
@@ -1323,10 +1718,40 @@ impl SseTelemetry for ApiTelemetry {
}
impl WebsocketTelemetry for ApiTelemetry {
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>) {
let error_message = error.map(std::string::ToString::to_string);
self.session_telemetry
.record_websocket_request(duration, error_message.as_deref());
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool) {
let error_message = error.map(telemetry_api_error_message);
let status = error.and_then(api_error_http_status);
let debug = error
.map(extract_response_debug_context_from_api_error)
.unwrap_or_default();
self.session_telemetry.record_websocket_request(
duration,
error_message.as_deref(),
connection_reused,
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: Some(connection_reused),
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
});
}
fn on_ws_event(

View File

@@ -1,4 +1,7 @@
use super::AuthRequestTelemetryContext;
use super::ModelClient;
use super::PendingUnauthorizedRetry;
use super::UnauthorizedRecoveryExecution;
use codex_otel::SessionTelemetry;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ModelInfo;
@@ -94,3 +97,22 @@ async fn summarize_memories_returns_empty_for_empty_input() {
.expect("empty summarize request should succeed");
assert_eq!(output.len(), 0);
}
#[test]
fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() {
let auth_context = AuthRequestTelemetryContext::new(
Some(crate::auth::AuthMode::Chatgpt),
&crate::api_bridge::CoreAuthProvider::for_test(Some("access-token"), Some("workspace-123")),
PendingUnauthorizedRetry::from_recovery(UnauthorizedRecoveryExecution {
mode: "managed",
phase: "refresh_token",
}),
);
assert_eq!(auth_context.auth_mode, Some("Chatgpt"));
assert!(auth_context.auth_header_attached);
assert_eq!(auth_context.auth_header_name, Some("authorization"));
assert!(auth_context.retry_after_unauthorized);
assert_eq!(auth_context.recovery_mode, Some("managed"));
assert_eq!(auth_context.recovery_phase, Some("refresh_token"));
}

View File

@@ -292,6 +292,8 @@ pub struct UnexpectedResponseError {
pub url: Option<String>,
pub cf_ray: Option<String>,
pub request_id: Option<String>,
pub identity_authorization_error: Option<String>,
pub identity_error_code: Option<String>,
}
const CLOUDFLARE_BLOCKED_MESSAGE: &str =
@@ -346,6 +348,12 @@ impl UnexpectedResponseError {
if let Some(id) = &self.request_id {
message.push_str(&format!(", request id: {id}"));
}
if let Some(auth_error) = &self.identity_authorization_error {
message.push_str(&format!(", auth error: {auth_error}"));
}
if let Some(error_code) = &self.identity_error_code {
message.push_str(&format!(", auth error code: {error_code}"));
}
Some(message)
}
@@ -368,6 +376,12 @@ impl std::fmt::Display for UnexpectedResponseError {
if let Some(id) = &self.request_id {
message.push_str(&format!(", request id: {id}"));
}
if let Some(auth_error) = &self.identity_authorization_error {
message.push_str(&format!(", auth error: {auth_error}"));
}
if let Some(error_code) = &self.identity_error_code {
message.push_str(&format!(", auth error code: {error_code}"));
}
write!(f, "{message}")
}
}

View File

@@ -328,6 +328,8 @@ fn unexpected_status_cloudflare_html_is_simplified() {
url: Some("http://example.com/blocked".to_string()),
cf_ray: Some("ray-id".to_string()),
request_id: None,
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::FORBIDDEN.to_string();
let url = "http://example.com/blocked";
@@ -345,6 +347,8 @@ fn unexpected_status_non_html_is_unchanged() {
url: Some("http://example.com/plain".to_string()),
cf_ray: None,
request_id: None,
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::FORBIDDEN.to_string();
let url = "http://example.com/plain";
@@ -363,6 +367,8 @@ fn unexpected_status_prefers_error_message_when_present() {
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
cf_ray: None,
request_id: Some("req-123".to_string()),
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::UNAUTHORIZED.to_string();
assert_eq!(
@@ -382,6 +388,8 @@ fn unexpected_status_truncates_long_body_with_ellipsis() {
url: Some("http://example.com/long".to_string()),
cf_ray: None,
request_id: Some("req-long".to_string()),
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::BAD_GATEWAY.to_string();
let expected_body = format!("{}...", "x".repeat(UNEXPECTED_RESPONSE_BODY_MAX_BYTES));
@@ -401,6 +409,8 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
cf_ray: Some("9c81f9f18f2fa49d-LHR".to_string()),
request_id: Some("req-xyz".to_string()),
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::UNAUTHORIZED.to_string();
assert_eq!(
@@ -411,6 +421,26 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
);
}
#[test]
fn unexpected_status_includes_identity_auth_details() {
let err = UnexpectedResponseError {
status: StatusCode::UNAUTHORIZED,
body: "plain text error".to_string(),
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
cf_ray: Some("cf-ray-auth-401-test".to_string()),
request_id: Some("req-auth".to_string()),
identity_authorization_error: Some("missing_authorization_header".to_string()),
identity_error_code: Some("token_expired".to_string()),
};
let status = StatusCode::UNAUTHORIZED.to_string();
assert_eq!(
err.to_string(),
format!(
"unexpected status {status}: plain text error, url: https://chatgpt.com/backend-api/codex/models, cf-ray: cf-ray-auth-401-test, request id: req-auth, auth error: missing_authorization_header, auth error code: token_expired"
)
);
}
#[test]
fn usage_limit_reached_includes_hours_and_minutes() {
let base = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();

View File

@@ -87,6 +87,7 @@ pub use model_provider_info::WireApi;
pub use model_provider_info::built_in_model_providers;
pub use model_provider_info::create_oss_provider_with_base_url;
mod event_mapping;
mod response_debug_context;
pub mod review_format;
pub mod review_prompts;
mod seatbelt_permissions;

View File

@@ -3,6 +3,7 @@ use crate::api_bridge::auth_provider_from_auth;
use crate::api_bridge::map_api_error;
use crate::auth::AuthManager;
use crate::auth::AuthMode;
use crate::auth::CodexAuth;
use crate::config::Config;
use crate::default_client::build_reqwest_client;
use crate::error::CodexErr;
@@ -11,8 +12,15 @@ use crate::model_provider_info::ModelProviderInfo;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::models_manager::collaboration_mode_presets::builtin_collaboration_mode_presets;
use crate::models_manager::model_info;
use crate::response_debug_context::extract_response_debug_context;
use crate::response_debug_context::telemetry_transport_error_message;
use crate::util::FeedbackRequestTags;
use crate::util::emit_feedback_request_tags;
use codex_api::ModelsClient;
use codex_api::RequestTelemetry;
use codex_api::ReqwestTransport;
use codex_api::TransportError;
use codex_otel::TelemetryAuthMode;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelPreset;
@@ -32,6 +40,82 @@ use tracing::instrument;
const MODEL_CACHE_FILE: &str = "models_cache.json";
const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300);
const MODELS_REFRESH_TIMEOUT: Duration = Duration::from_secs(5);
const MODELS_ENDPOINT: &str = "/models";
#[derive(Clone)]
struct ModelsRequestTelemetry {
auth_mode: Option<String>,
auth_header_attached: bool,
auth_header_name: Option<&'static str>,
}
impl RequestTelemetry for ModelsRequestTelemetry {
fn on_request(
&self,
attempt: u64,
status: Option<http::StatusCode>,
error: Option<&TransportError>,
duration: Duration,
) {
let success = status.is_some_and(|code| code.is_success()) && error.is_none();
let error_message = error.map(telemetry_transport_error_message);
let response_debug = error
.map(extract_response_debug_context)
.unwrap_or_default();
let status = status.map(|status| status.as_u16());
tracing::event!(
target: "codex_otel.log_only",
tracing::Level::INFO,
event.name = "codex.api_request",
duration_ms = %duration.as_millis(),
http.response.status_code = status,
success = success,
error.message = error_message.as_deref(),
attempt = attempt,
endpoint = MODELS_ENDPOINT,
auth.header_attached = self.auth_header_attached,
auth.header_name = self.auth_header_name,
auth.request_id = response_debug.request_id.as_deref(),
auth.cf_ray = response_debug.cf_ray.as_deref(),
auth.error = response_debug.auth_error.as_deref(),
auth.error_code = response_debug.auth_error_code.as_deref(),
auth.mode = self.auth_mode.as_deref(),
);
tracing::event!(
target: "codex_otel.trace_safe",
tracing::Level::INFO,
event.name = "codex.api_request",
duration_ms = %duration.as_millis(),
http.response.status_code = status,
success = success,
error.message = error_message.as_deref(),
attempt = attempt,
endpoint = MODELS_ENDPOINT,
auth.header_attached = self.auth_header_attached,
auth.header_name = self.auth_header_name,
auth.request_id = response_debug.request_id.as_deref(),
auth.cf_ray = response_debug.cf_ray.as_deref(),
auth.error = response_debug.auth_error.as_deref(),
auth.error_code = response_debug.auth_error_code.as_deref(),
auth.mode = self.auth_mode.as_deref(),
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: MODELS_ENDPOINT,
auth_header_attached: self.auth_header_attached,
auth_header_name: self.auth_header_name,
auth_mode: self.auth_mode.as_deref(),
auth_retry_after_unauthorized: None,
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: response_debug.request_id.as_deref(),
auth_cf_ray: response_debug.cf_ray.as_deref(),
auth_error: response_debug.auth_error.as_deref(),
auth_error_code: response_debug.auth_error_code.as_deref(),
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
});
}
}
/// Strategy for refreshing available models.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -330,11 +414,17 @@ impl ModelsManager {
let _timer =
codex_otel::start_global_timer("codex.remote_models.fetch_update.duration_ms", &[]);
let auth = self.auth_manager.auth().await;
let auth_mode = self.auth_manager.auth_mode();
let auth_mode = auth.as_ref().map(CodexAuth::auth_mode);
let api_provider = self.provider.to_api_provider(auth_mode)?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
let transport = ReqwestTransport::new(build_reqwest_client());
let client = ModelsClient::new(transport, api_provider, api_auth);
let request_telemetry: Arc<dyn RequestTelemetry> = Arc::new(ModelsRequestTelemetry {
auth_mode: auth_mode.map(|mode| TelemetryAuthMode::from(mode).to_string()),
auth_header_attached: api_auth.auth_header_attached(),
auth_header_name: api_auth.auth_header_name(),
});
let client = ModelsClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry));
let client_version = crate::models_manager::client_version_to_whole();
let (models, etag) = timeout(

View File

@@ -0,0 +1,167 @@
use base64::Engine;
use codex_api::TransportError;
use codex_api::error::ApiError;
const REQUEST_ID_HEADER: &str = "x-request-id";
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
const CF_RAY_HEADER: &str = "cf-ray";
const AUTH_ERROR_HEADER: &str = "x-openai-authorization-error";
const X_ERROR_JSON_HEADER: &str = "x-error-json";
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub(crate) struct ResponseDebugContext {
pub(crate) request_id: Option<String>,
pub(crate) cf_ray: Option<String>,
pub(crate) auth_error: Option<String>,
pub(crate) auth_error_code: Option<String>,
}
pub(crate) fn extract_response_debug_context(transport: &TransportError) -> ResponseDebugContext {
let mut context = ResponseDebugContext::default();
let TransportError::Http {
headers, body: _, ..
} = transport
else {
return context;
};
let extract_header = |name: &str| {
headers
.as_ref()
.and_then(|headers| headers.get(name))
.and_then(|value| value.to_str().ok())
.map(str::to_string)
};
context.request_id =
extract_header(REQUEST_ID_HEADER).or_else(|| extract_header(OAI_REQUEST_ID_HEADER));
context.cf_ray = extract_header(CF_RAY_HEADER);
context.auth_error = extract_header(AUTH_ERROR_HEADER);
context.auth_error_code = extract_header(X_ERROR_JSON_HEADER).and_then(|encoded| {
let decoded = base64::engine::general_purpose::STANDARD
.decode(encoded)
.ok()?;
let parsed = serde_json::from_slice::<serde_json::Value>(&decoded).ok()?;
parsed
.get("error")
.and_then(|error| error.get("code"))
.and_then(serde_json::Value::as_str)
.map(str::to_string)
});
context
}
pub(crate) fn extract_response_debug_context_from_api_error(
error: &ApiError,
) -> ResponseDebugContext {
match error {
ApiError::Transport(transport) => extract_response_debug_context(transport),
_ => ResponseDebugContext::default(),
}
}
pub(crate) fn telemetry_transport_error_message(error: &TransportError) -> String {
match error {
TransportError::Http { status, .. } => format!("http {}", status.as_u16()),
TransportError::RetryLimit => "retry limit reached".to_string(),
TransportError::Timeout => "timeout".to_string(),
TransportError::Network(err) => err.to_string(),
TransportError::Build(err) => err.to_string(),
}
}
pub(crate) fn telemetry_api_error_message(error: &ApiError) -> String {
match error {
ApiError::Transport(transport) => telemetry_transport_error_message(transport),
ApiError::Api { status, .. } => format!("api error {}", status.as_u16()),
ApiError::Stream(err) => err.to_string(),
ApiError::ContextWindowExceeded => "context window exceeded".to_string(),
ApiError::QuotaExceeded => "quota exceeded".to_string(),
ApiError::UsageNotIncluded => "usage not included".to_string(),
ApiError::Retryable { .. } => "retryable error".to_string(),
ApiError::RateLimit(_) => "rate limit".to_string(),
ApiError::InvalidRequest { .. } => "invalid request".to_string(),
ApiError::ServerOverloaded => "server overloaded".to_string(),
}
}
#[cfg(test)]
mod tests {
use super::ResponseDebugContext;
use super::extract_response_debug_context;
use super::telemetry_api_error_message;
use super::telemetry_transport_error_message;
use codex_api::TransportError;
use codex_api::error::ApiError;
use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
use pretty_assertions::assert_eq;
#[test]
fn extract_response_debug_context_decodes_identity_headers() {
let mut headers = HeaderMap::new();
headers.insert("x-oai-request-id", HeaderValue::from_static("req-auth"));
headers.insert("cf-ray", HeaderValue::from_static("ray-auth"));
headers.insert(
"x-openai-authorization-error",
HeaderValue::from_static("missing_authorization_header"),
);
headers.insert(
"x-error-json",
HeaderValue::from_static("eyJlcnJvciI6eyJjb2RlIjoidG9rZW5fZXhwaXJlZCJ9fQ=="),
);
let context = extract_response_debug_context(&TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
headers: Some(headers),
body: Some(r#"{"error":{"message":"plain text error"},"status":401}"#.to_string()),
});
assert_eq!(
context,
ResponseDebugContext {
request_id: Some("req-auth".to_string()),
cf_ray: Some("ray-auth".to_string()),
auth_error: Some("missing_authorization_header".to_string()),
auth_error_code: Some("token_expired".to_string()),
}
);
}
#[test]
fn telemetry_error_messages_omit_http_bodies() {
let transport = TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
headers: None,
body: Some(r#"{"error":{"message":"secret token leaked"}}"#.to_string()),
};
assert_eq!(telemetry_transport_error_message(&transport), "http 401");
assert_eq!(
telemetry_api_error_message(&ApiError::Transport(transport)),
"http 401"
);
}
#[test]
fn telemetry_error_messages_preserve_non_http_details() {
let network = TransportError::Network("dns lookup failed".to_string());
let build = TransportError::Build("invalid header value".to_string());
let stream = ApiError::Stream("socket closed".to_string());
assert_eq!(
telemetry_transport_error_message(&network),
"dns lookup failed"
);
assert_eq!(
telemetry_transport_error_message(&build),
"invalid header value"
);
assert_eq!(telemetry_api_error_message(&stream), "socket closed");
}
}

View File

@@ -126,6 +126,8 @@ fn explicit_unreadable_paths_are_excluded_from_full_disk_read_and_write_access()
);
let policy = seatbelt_policy_arg(&args);
let unreadable_roots = file_system_policy.get_unreadable_roots_with_cwd(Path::new("/"));
let unreadable_root = unreadable_roots.first().expect("expected unreadable root");
assert!(
policy.contains("(require-not (subpath (param \"READABLE_ROOT_0_RO_0\")))"),
"expected read carveout in policy:\n{policy}"
@@ -136,12 +138,12 @@ fn explicit_unreadable_paths_are_excluded_from_full_disk_read_and_write_access()
);
assert!(
args.iter()
.any(|arg| arg == "-DREADABLE_ROOT_0_RO_0=/tmp/codex-unreadable"),
.any(|arg| arg == &format!("-DREADABLE_ROOT_0_RO_0={}", unreadable_root.display())),
"expected read carveout parameter in args: {args:#?}"
);
assert!(
args.iter()
.any(|arg| arg == "-DWRITABLE_ROOT_0_RO_0=/tmp/codex-unreadable"),
.any(|arg| arg == &format!("-DWRITABLE_ROOT_0_RO_0={}", unreadable_root.display())),
"expected write carveout parameter in args: {args:#?}"
);
}
@@ -172,18 +174,22 @@ fn explicit_unreadable_paths_are_excluded_from_readable_roots() {
);
let policy = seatbelt_policy_arg(&args);
let readable_roots = file_system_policy.get_readable_roots_with_cwd(Path::new("/"));
let readable_root = readable_roots.first().expect("expected readable root");
let unreadable_roots = file_system_policy.get_unreadable_roots_with_cwd(Path::new("/"));
let unreadable_root = unreadable_roots.first().expect("expected unreadable root");
assert!(
policy.contains("(require-not (subpath (param \"READABLE_ROOT_0_RO_0\")))"),
"expected read carveout in policy:\n{policy}"
);
assert!(
args.iter()
.any(|arg| arg == "-DREADABLE_ROOT_0=/tmp/codex-readable"),
.any(|arg| arg == &format!("-DREADABLE_ROOT_0={}", readable_root.display())),
"expected readable root parameter in args: {args:#?}"
);
assert!(
args.iter()
.any(|arg| arg == "-DREADABLE_ROOT_0_RO_0=/tmp/codex-readable/private"),
.any(|arg| arg == &format!("-DREADABLE_ROOT_0_RO_0={}", unreadable_root.display())),
"expected read carveout parameter in args: {args:#?}"
);
}

View File

@@ -37,6 +37,111 @@ macro_rules! feedback_tags {
};
}
pub(crate) struct FeedbackRequestTags<'a> {
pub endpoint: &'a str,
pub auth_header_attached: bool,
pub auth_header_name: Option<&'a str>,
pub auth_mode: Option<&'a str>,
pub auth_retry_after_unauthorized: Option<bool>,
pub auth_recovery_mode: Option<&'a str>,
pub auth_recovery_phase: Option<&'a str>,
pub auth_connection_reused: Option<bool>,
pub auth_request_id: Option<&'a str>,
pub auth_cf_ray: Option<&'a str>,
pub auth_error: Option<&'a str>,
pub auth_error_code: Option<&'a str>,
pub auth_recovery_followup_success: Option<bool>,
pub auth_recovery_followup_status: Option<u16>,
}
struct Auth401FeedbackSnapshot<'a> {
request_id: &'a str,
cf_ray: &'a str,
error: &'a str,
error_code: &'a str,
}
impl<'a> Auth401FeedbackSnapshot<'a> {
fn from_optional_fields(
request_id: Option<&'a str>,
cf_ray: Option<&'a str>,
error: Option<&'a str>,
error_code: Option<&'a str>,
) -> Self {
Self {
request_id: request_id.unwrap_or(""),
cf_ray: cf_ray.unwrap_or(""),
error: error.unwrap_or(""),
error_code: error_code.unwrap_or(""),
}
}
}
pub(crate) fn emit_feedback_request_tags(tags: &FeedbackRequestTags<'_>) {
let auth_header_name = tags.auth_header_name.unwrap_or("");
let auth_mode = tags.auth_mode.unwrap_or("");
let auth_retry_after_unauthorized = tags
.auth_retry_after_unauthorized
.map_or_else(String::new, |value| value.to_string());
let auth_recovery_mode = tags.auth_recovery_mode.unwrap_or("");
let auth_recovery_phase = tags.auth_recovery_phase.unwrap_or("");
let auth_connection_reused = tags
.auth_connection_reused
.map_or_else(String::new, |value| value.to_string());
let auth_request_id = tags.auth_request_id.unwrap_or("");
let auth_cf_ray = tags.auth_cf_ray.unwrap_or("");
let auth_error = tags.auth_error.unwrap_or("");
let auth_error_code = tags.auth_error_code.unwrap_or("");
let auth_recovery_followup_success = tags
.auth_recovery_followup_success
.map_or_else(String::new, |value| value.to_string());
let auth_recovery_followup_status = tags
.auth_recovery_followup_status
.map_or_else(String::new, |value| value.to_string());
feedback_tags!(
endpoint = tags.endpoint,
auth_header_attached = tags.auth_header_attached,
auth_header_name = auth_header_name,
auth_mode = auth_mode,
auth_retry_after_unauthorized = auth_retry_after_unauthorized,
auth_recovery_mode = auth_recovery_mode,
auth_recovery_phase = auth_recovery_phase,
auth_connection_reused = auth_connection_reused,
auth_request_id = auth_request_id,
auth_cf_ray = auth_cf_ray,
auth_error = auth_error,
auth_error_code = auth_error_code,
auth_recovery_followup_success = auth_recovery_followup_success,
auth_recovery_followup_status = auth_recovery_followup_status
);
}
pub(crate) fn emit_feedback_auth_recovery_tags(
auth_recovery_mode: &str,
auth_recovery_phase: &str,
auth_recovery_outcome: &str,
auth_request_id: Option<&str>,
auth_cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
) {
let auth_401 = Auth401FeedbackSnapshot::from_optional_fields(
auth_request_id,
auth_cf_ray,
auth_error,
auth_error_code,
);
feedback_tags!(
auth_recovery_mode = auth_recovery_mode,
auth_recovery_phase = auth_recovery_phase,
auth_recovery_outcome = auth_recovery_outcome,
auth_401_request_id = auth_401.request_id,
auth_401_cf_ray = auth_401.cf_ray,
auth_401_error = auth_401.error,
auth_401_error_code = auth_401.error_code
);
}
pub fn backoff(attempt: u64) -> Duration {
let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32);
let base = (INITIAL_DELAY_MS as f64 * exp) as u64;

View File

@@ -1,4 +1,15 @@
use super::*;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex;
use tracing::Event;
use tracing::Subscriber;
use tracing::field::Visit;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::util::SubscriberInitExt;
#[test]
fn test_try_parse_error_message() {
@@ -32,6 +43,298 @@ fn feedback_tags_macro_compiles() {
feedback_tags!(model = "gpt-5", cached = true, debug_only = OnlyDebug);
}
#[derive(Default)]
struct TagCollectorVisitor {
tags: BTreeMap<String, String>,
}
impl Visit for TagCollectorVisitor {
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.tags
.insert(field.name().to_string(), value.to_string());
}
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.tags
.insert(field.name().to_string(), value.to_string());
}
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.tags
.insert(field.name().to_string(), format!("{value:?}"));
}
}
#[derive(Clone)]
struct TagCollectorLayer {
tags: Arc<Mutex<BTreeMap<String, String>>>,
}
impl<S> Layer<S> for TagCollectorLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
if event.metadata().target() != "feedback_tags" {
return;
}
let mut visitor = TagCollectorVisitor::default();
event.record(&mut visitor);
self.tags.lock().unwrap().extend(visitor.tags);
}
}
#[test]
fn emit_feedback_request_tags_records_sentry_feedback_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: Some(true),
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(true),
auth_recovery_followup_status: Some(200),
});
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("endpoint").map(String::as_str),
Some("\"/responses\"")
);
assert_eq!(
tags.get("auth_header_attached").map(String::as_str),
Some("true")
);
assert_eq!(
tags.get("auth_header_name").map(String::as_str),
Some("\"authorization\"")
);
assert_eq!(
tags.get("auth_request_id").map(String::as_str),
Some("\"req-123\"")
);
assert_eq!(
tags.get("auth_error_code").map(String::as_str),
Some("\"token_expired\"")
);
assert_eq!(
tags.get("auth_recovery_followup_success")
.map(String::as_str),
Some("\"true\"")
);
assert_eq!(
tags.get("auth_recovery_followup_status")
.map(String::as_str),
Some("\"200\"")
);
}
#[test]
fn emit_feedback_auth_recovery_tags_preserves_401_specific_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_auth_recovery_tags(
"managed",
"refresh_token",
"recovery_succeeded",
Some("req-401"),
Some("ray-401"),
Some("missing_authorization_header"),
Some("token_expired"),
);
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_401_request_id").map(String::as_str),
Some("\"req-401\"")
);
assert_eq!(
tags.get("auth_401_cf_ray").map(String::as_str),
Some("\"ray-401\"")
);
assert_eq!(
tags.get("auth_401_error").map(String::as_str),
Some("\"missing_authorization_header\"")
);
assert_eq!(
tags.get("auth_401_error_code").map(String::as_str),
Some("\"token_expired\"")
);
}
#[test]
fn emit_feedback_auth_recovery_tags_clears_stale_401_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_auth_recovery_tags(
"managed",
"refresh_token",
"recovery_failed_transient",
Some("req-401-a"),
Some("ray-401-a"),
Some("missing_authorization_header"),
Some("token_expired"),
);
emit_feedback_auth_recovery_tags(
"managed",
"done",
"recovery_not_run",
Some("req-401-b"),
None,
None,
None,
);
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_401_request_id").map(String::as_str),
Some("\"req-401-b\"")
);
assert_eq!(
tags.get("auth_401_cf_ray").map(String::as_str),
Some("\"\"")
);
assert_eq!(tags.get("auth_401_error").map(String::as_str), Some("\"\""));
assert_eq!(
tags.get("auth_401_error_code").map(String::as_str),
Some("\"\"")
);
}
#[test]
fn emit_feedback_request_tags_preserves_latest_auth_fields_after_unauthorized() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(true),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: None,
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(false),
auth_recovery_followup_status: Some(401),
});
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_request_id").map(String::as_str),
Some("\"req-123\"")
);
assert_eq!(
tags.get("auth_cf_ray").map(String::as_str),
Some("\"ray-123\"")
);
assert_eq!(
tags.get("auth_error").map(String::as_str),
Some("\"missing_authorization_header\"")
);
assert_eq!(
tags.get("auth_error_code").map(String::as_str),
Some("\"token_expired\"")
);
assert_eq!(
tags.get("auth_recovery_followup_success")
.map(String::as_str),
Some("\"false\"")
);
}
#[test]
fn emit_feedback_request_tags_clears_stale_latest_auth_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: Some(true),
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(true),
auth_recovery_followup_status: Some(200),
});
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: None,
auth_mode: None,
auth_retry_after_unauthorized: None,
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: None,
auth_cf_ray: None,
auth_error: None,
auth_error_code: None,
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
});
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_header_name").map(String::as_str),
Some("\"\"")
);
assert_eq!(tags.get("auth_mode").map(String::as_str), Some("\"\""));
assert_eq!(
tags.get("auth_request_id").map(String::as_str),
Some("\"\"")
);
assert_eq!(tags.get("auth_cf_ray").map(String::as_str), Some("\"\""));
assert_eq!(tags.get("auth_error").map(String::as_str), Some("\"\""));
assert_eq!(
tags.get("auth_error_code").map(String::as_str),
Some("\"\"")
);
assert_eq!(
tags.get("auth_recovery_followup_success")
.map(String::as_str),
Some("\"\"")
);
assert_eq!(
tags.get("auth_recovery_followup_status")
.map(String::as_str),
Some("\"\"")
);
}
#[test]
fn normalize_thread_name_trims_and_rejects_empty() {
assert_eq!(normalize_thread_name(" "), None);

View File

@@ -340,17 +340,43 @@ impl SessionTelemetry {
Ok(response) => (Some(response.status().as_u16()), None),
Err(error) => (error.status().map(|s| s.as_u16()), Some(error.to_string())),
};
self.record_api_request(attempt, status, error.as_deref(), duration);
self.record_api_request(
attempt,
status,
error.as_deref(),
duration,
false,
None,
false,
None,
None,
"unknown",
None,
None,
None,
None,
);
response
}
#[allow(clippy::too_many_arguments)]
pub fn record_api_request(
&self,
attempt: u64,
status: Option<u16>,
error: Option<&str>,
duration: Duration,
auth_header_attached: bool,
auth_header_name: Option<&str>,
retry_after_unauthorized: bool,
recovery_mode: Option<&str>,
recovery_phase: Option<&str>,
endpoint: &str,
request_id: Option<&str>,
cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
) {
let success = status.is_some_and(|code| (200..=299).contains(&code)) && error.is_none();
let success_str = if success { "true" } else { "false" };
@@ -375,13 +401,76 @@ impl SessionTelemetry {
http.response.status_code = status,
error.message = error,
attempt = attempt,
auth.header_attached = auth_header_attached,
auth.header_name = auth_header_name,
auth.retry_after_unauthorized = retry_after_unauthorized,
auth.recovery_mode = recovery_mode,
auth.recovery_phase = recovery_phase,
endpoint = endpoint,
auth.request_id = request_id,
auth.cf_ray = cf_ray,
auth.error = auth_error,
auth.error_code = auth_error_code,
},
log: {},
trace: {},
);
}
pub fn record_websocket_request(&self, duration: Duration, error: Option<&str>) {
#[allow(clippy::too_many_arguments)]
pub fn record_websocket_connect(
&self,
duration: Duration,
status: Option<u16>,
error: Option<&str>,
auth_header_attached: bool,
auth_header_name: Option<&str>,
retry_after_unauthorized: bool,
recovery_mode: Option<&str>,
recovery_phase: Option<&str>,
endpoint: &str,
connection_reused: bool,
request_id: Option<&str>,
cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
) {
let success = error.is_none()
&& status
.map(|code| (200..=299).contains(&code))
.unwrap_or(true);
let success_str = if success { "true" } else { "false" };
log_and_trace_event!(
self,
common: {
event.name = "codex.websocket_connect",
duration_ms = %duration.as_millis(),
http.response.status_code = status,
success = success_str,
error.message = error,
auth.header_attached = auth_header_attached,
auth.header_name = auth_header_name,
auth.retry_after_unauthorized = retry_after_unauthorized,
auth.recovery_mode = recovery_mode,
auth.recovery_phase = recovery_phase,
endpoint = endpoint,
auth.connection_reused = connection_reused,
auth.request_id = request_id,
auth.cf_ray = cf_ray,
auth.error = auth_error,
auth.error_code = auth_error_code,
},
log: {},
trace: {},
);
}
pub fn record_websocket_request(
&self,
duration: Duration,
error: Option<&str>,
connection_reused: bool,
) {
let success_str = if error.is_none() { "true" } else { "false" };
self.counter(
WEBSOCKET_REQUEST_COUNT_METRIC,
@@ -400,6 +489,39 @@ impl SessionTelemetry {
duration_ms = %duration.as_millis(),
success = success_str,
error.message = error,
auth.connection_reused = connection_reused,
},
log: {},
trace: {},
);
}
#[allow(clippy::too_many_arguments)]
pub fn record_auth_recovery(
&self,
mode: &str,
step: &str,
outcome: &str,
request_id: Option<&str>,
cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
recovery_reason: Option<&str>,
auth_state_changed: Option<bool>,
) {
log_and_trace_event!(
self,
common: {
event.name = "codex.auth_recovery",
auth.mode = mode,
auth.step = step,
auth.outcome = outcome,
auth.request_id = request_id,
auth.cf_ray = cf_ray,
auth.error = auth_error,
auth.error_code = auth_error_code,
auth.recovery_reason = recovery_reason,
auth.state_changed = auth_state_changed,
},
log: {},
trace: {},

View File

@@ -297,3 +297,462 @@ fn otel_export_routing_policy_routes_tool_result_log_and_trace_events() {
assert!(!tool_trace_attrs.contains_key("mcp_server"));
assert!(!tool_trace_attrs.contains_key("mcp_server_origin"));
}
#[test]
fn otel_export_routing_policy_routes_auth_recovery_log_and_trace_events() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_auth_recovery(
"managed",
"reload",
"recovery_succeeded",
Some("req-401"),
Some("ray-401"),
Some("missing_authorization_header"),
Some("token_expired"),
None,
Some(true),
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let recovery_log = find_log_by_event_name(&logs, "codex.auth_recovery");
let recovery_log_attrs = log_attributes(&recovery_log.record);
assert_eq!(
recovery_log_attrs.get("auth.mode").map(String::as_str),
Some("managed")
);
assert_eq!(
recovery_log_attrs.get("auth.step").map(String::as_str),
Some("reload")
);
assert_eq!(
recovery_log_attrs.get("auth.outcome").map(String::as_str),
Some("recovery_succeeded")
);
assert_eq!(
recovery_log_attrs
.get("auth.request_id")
.map(String::as_str),
Some("req-401")
);
assert_eq!(
recovery_log_attrs.get("auth.cf_ray").map(String::as_str),
Some("ray-401")
);
assert_eq!(
recovery_log_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
assert_eq!(
recovery_log_attrs
.get("auth.error_code")
.map(String::as_str),
Some("token_expired")
);
assert_eq!(
recovery_log_attrs
.get("auth.state_changed")
.map(String::as_str),
Some("true")
);
let spans = span_exporter.get_finished_spans().expect("span export");
assert_eq!(spans.len(), 1);
let span_events = &spans[0].events.events;
assert_eq!(span_events.len(), 1);
let recovery_trace_event = find_span_event_by_name_attr(span_events, "codex.auth_recovery");
let recovery_trace_attrs = span_event_attributes(recovery_trace_event);
assert_eq!(
recovery_trace_attrs.get("auth.mode").map(String::as_str),
Some("managed")
);
assert_eq!(
recovery_trace_attrs.get("auth.step").map(String::as_str),
Some("reload")
);
assert_eq!(
recovery_trace_attrs.get("auth.outcome").map(String::as_str),
Some("recovery_succeeded")
);
assert_eq!(
recovery_trace_attrs
.get("auth.request_id")
.map(String::as_str),
Some("req-401")
);
assert_eq!(
recovery_trace_attrs.get("auth.cf_ray").map(String::as_str),
Some("ray-401")
);
assert_eq!(
recovery_trace_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
assert_eq!(
recovery_trace_attrs
.get("auth.error_code")
.map(String::as_str),
Some("token_expired")
);
assert_eq!(
recovery_trace_attrs
.get("auth.state_changed")
.map(String::as_str),
Some("true")
);
}
#[test]
fn otel_export_routing_policy_routes_api_request_auth_observability() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_api_request(
1,
Some(401),
Some("http 401"),
std::time::Duration::from_millis(42),
true,
Some("authorization"),
true,
Some("managed"),
Some("refresh_token"),
"/responses",
Some("req-401"),
Some("ray-401"),
Some("missing_authorization_header"),
Some("token_expired"),
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let request_log = find_log_by_event_name(&logs, "codex.api_request");
let request_log_attrs = log_attributes(&request_log.record);
assert_eq!(
request_log_attrs
.get("auth.header_attached")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_log_attrs
.get("auth.header_name")
.map(String::as_str),
Some("authorization")
);
assert_eq!(
request_log_attrs
.get("auth.retry_after_unauthorized")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_log_attrs
.get("auth.recovery_mode")
.map(String::as_str),
Some("managed")
);
assert_eq!(
request_log_attrs
.get("auth.recovery_phase")
.map(String::as_str),
Some("refresh_token")
);
assert_eq!(
request_log_attrs.get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
request_log_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
let spans = span_exporter.get_finished_spans().expect("span export");
let request_trace_event =
find_span_event_by_name_attr(&spans[0].events.events, "codex.api_request");
let request_trace_attrs = span_event_attributes(request_trace_event);
assert_eq!(
request_trace_attrs
.get("auth.header_attached")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_trace_attrs
.get("auth.header_name")
.map(String::as_str),
Some("authorization")
);
assert_eq!(
request_trace_attrs
.get("auth.retry_after_unauthorized")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_trace_attrs.get("endpoint").map(String::as_str),
Some("/responses")
);
}
#[test]
fn otel_export_routing_policy_routes_websocket_connect_auth_observability() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_websocket_connect(
std::time::Duration::from_millis(17),
Some(401),
Some("http 401"),
true,
Some("authorization"),
true,
Some("managed"),
Some("reload"),
"/responses",
false,
Some("req-ws-401"),
Some("ray-ws-401"),
Some("missing_authorization_header"),
Some("token_expired"),
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let connect_log = find_log_by_event_name(&logs, "codex.websocket_connect");
let connect_log_attrs = log_attributes(&connect_log.record);
assert_eq!(
connect_log_attrs
.get("auth.header_attached")
.map(String::as_str),
Some("true")
);
assert_eq!(
connect_log_attrs
.get("auth.header_name")
.map(String::as_str),
Some("authorization")
);
assert_eq!(
connect_log_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
assert_eq!(
connect_log_attrs.get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
connect_log_attrs
.get("auth.connection_reused")
.map(String::as_str),
Some("false")
);
let spans = span_exporter.get_finished_spans().expect("span export");
let connect_trace_event =
find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_connect");
let connect_trace_attrs = span_event_attributes(connect_trace_event);
assert_eq!(
connect_trace_attrs
.get("auth.recovery_phase")
.map(String::as_str),
Some("reload")
);
}
#[test]
fn otel_export_routing_policy_routes_websocket_request_transport_observability() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_websocket_request(
std::time::Duration::from_millis(23),
Some("stream error"),
true,
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let request_log = find_log_by_event_name(&logs, "codex.websocket_request");
let request_log_attrs = log_attributes(&request_log.record);
assert_eq!(
request_log_attrs
.get("auth.connection_reused")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_log_attrs.get("error.message").map(String::as_str),
Some("stream error")
);
let spans = span_exporter.get_finished_spans().expect("span export");
let request_trace_event =
find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_request");
let request_trace_attrs = span_event_attributes(request_trace_event);
assert_eq!(
request_trace_attrs
.get("auth.connection_reused")
.map(String::as_str),
Some("true")
);
}

View File

@@ -47,8 +47,23 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
None,
None,
);
manager.record_api_request(1, Some(200), None, Duration::from_millis(300));
manager.record_websocket_request(Duration::from_millis(400), None);
manager.record_api_request(
1,
Some(200),
None,
Duration::from_millis(300),
false,
None,
false,
None,
None,
"/responses",
None,
None,
None,
None,
);
manager.record_websocket_request(Duration::from_millis(400), None, false);
let sse_response: std::result::Result<
Option<std::result::Result<StreamEvent, eventsource_stream::EventStreamError<&str>>>,
tokio::time::error::Elapsed,

View File

@@ -378,6 +378,7 @@ impl FileSystemSandboxPolicy {
.filter(|entry| self.can_read_path_with_cwd(entry.path.as_path(), cwd))
.map(|entry| entry.path)
.collect(),
/*normalize_effective_paths*/ true,
)
}
@@ -389,39 +390,96 @@ impl FileSystemSandboxPolicy {
}
let resolved_entries = self.resolved_entries_with_cwd(cwd);
let read_only_roots = dedup_absolute_paths(
resolved_entries
.iter()
.filter(|entry| !entry.access.can_write())
.filter(|entry| !self.can_write_path_with_cwd(entry.path.as_path(), cwd))
.map(|entry| entry.path.clone())
.collect(),
);
let writable_entries: Vec<AbsolutePathBuf> = resolved_entries
.iter()
.filter(|entry| entry.access.can_write())
.filter(|entry| self.can_write_path_with_cwd(entry.path.as_path(), cwd))
.map(|entry| entry.path.clone())
.collect();
dedup_absolute_paths(
resolved_entries
.into_iter()
.filter(|entry| entry.access.can_write())
.filter(|entry| self.can_write_path_with_cwd(entry.path.as_path(), cwd))
.map(|entry| entry.path)
.collect(),
writable_entries.clone(),
/*normalize_effective_paths*/ true,
)
.into_iter()
.map(|root| {
// Filesystem-root policies stay in their effective canonical form
// so root-wide aliases do not create duplicate top-level masks.
// Example: keep `/var/...` normalized under `/` instead of
// materializing both `/var/...` and `/private/var/...`.
let preserve_raw_carveout_paths = root.as_path().parent().is_some();
let raw_writable_roots: Vec<&AbsolutePathBuf> = writable_entries
.iter()
.filter(|path| normalize_effective_absolute_path((*path).clone()) == root)
.collect();
let mut read_only_subpaths = default_read_only_subpaths_for_writable_root(&root);
// Narrower explicit non-write entries carve out broader writable roots.
// More specific write entries still remain writable because they appear
// as separate WritableRoot values and are checked independently.
// Preserve symlink path components that live under the writable root
// so downstream sandboxes can still mask the symlink inode itself.
// Example: if `<root>/.codex -> <root>/decoy`, bwrap must still see
// `<root>/.codex`, not only the resolved `<root>/decoy`.
read_only_subpaths.extend(
read_only_roots
resolved_entries
.iter()
.filter(|path| path.as_path() != root.as_path())
.filter(|path| path.as_path().starts_with(root.as_path()))
.cloned(),
.filter(|entry| !entry.access.can_write())
.filter(|entry| !self.can_write_path_with_cwd(entry.path.as_path(), cwd))
.filter_map(|entry| {
let effective_path = normalize_effective_absolute_path(entry.path.clone());
// Preserve the literal in-root path whenever the
// carveout itself lives under this writable root, even
// if following symlinks would resolve back to the root
// or escape outside it. Downstream sandboxes need that
// raw path so they can mask the symlink inode itself.
// Examples:
// - `<root>/linked-private -> <root>/decoy-private`
// - `<root>/linked-private -> /tmp/outside-private`
// - `<root>/alias-root -> <root>`
let raw_carveout_path = if preserve_raw_carveout_paths {
if entry.path == root {
None
} else if entry.path.as_path().starts_with(root.as_path()) {
Some(entry.path.clone())
} else {
raw_writable_roots.iter().find_map(|raw_root| {
let suffix = entry
.path
.as_path()
.strip_prefix(raw_root.as_path())
.ok()?;
if suffix.as_os_str().is_empty() {
return None;
}
root.join(suffix).ok()
})
}
} else {
None
};
if let Some(raw_carveout_path) = raw_carveout_path {
return Some(raw_carveout_path);
}
if effective_path == root
|| !effective_path.as_path().starts_with(root.as_path())
{
return None;
}
Some(effective_path)
}),
);
WritableRoot {
root,
read_only_subpaths: dedup_absolute_paths(read_only_subpaths),
// Preserve literal in-root protected paths like `.git` and
// `.codex` so downstream sandboxes can still detect and mask
// the symlink itself instead of only its resolved target.
read_only_subpaths: dedup_absolute_paths(
read_only_subpaths,
/*normalize_effective_paths*/ false,
),
}
})
.collect()
@@ -448,6 +506,7 @@ impl FileSystemSandboxPolicy {
.filter(|entry| root.as_ref() != Some(&entry.path))
.map(|entry| entry.path.clone())
.collect(),
/*normalize_effective_paths*/ true,
)
}
@@ -580,13 +639,19 @@ impl FileSystemSandboxPolicy {
} else {
ReadOnlyAccess::Restricted {
include_platform_defaults,
readable_roots: dedup_absolute_paths(readable_roots),
readable_roots: dedup_absolute_paths(
readable_roots,
/*normalize_effective_paths*/ false,
),
}
};
if workspace_root_writable {
SandboxPolicy::WorkspaceWrite {
writable_roots: dedup_absolute_paths(writable_roots),
writable_roots: dedup_absolute_paths(
writable_roots,
/*normalize_effective_paths*/ false,
),
read_only_access,
network_access: network_policy.is_enabled(),
exclude_tmpdir_env_var: !tmpdir_writable,
@@ -922,17 +987,43 @@ fn resolve_file_system_special_path(
}
}
fn dedup_absolute_paths(paths: Vec<AbsolutePathBuf>) -> Vec<AbsolutePathBuf> {
fn dedup_absolute_paths(
paths: Vec<AbsolutePathBuf>,
normalize_effective_paths: bool,
) -> Vec<AbsolutePathBuf> {
let mut deduped = Vec::with_capacity(paths.len());
let mut seen = HashSet::new();
for path in paths {
if seen.insert(path.to_path_buf()) {
deduped.push(path);
let dedup_path = if normalize_effective_paths {
normalize_effective_absolute_path(path)
} else {
path
};
if seen.insert(dedup_path.to_path_buf()) {
deduped.push(dedup_path);
}
}
deduped
}
fn normalize_effective_absolute_path(path: AbsolutePathBuf) -> AbsolutePathBuf {
let raw_path = path.to_path_buf();
for ancestor in raw_path.ancestors() {
let Ok(canonical_ancestor) = ancestor.canonicalize() else {
continue;
};
let Ok(suffix) = raw_path.strip_prefix(ancestor) else {
continue;
};
if let Ok(normalized_path) =
AbsolutePathBuf::from_absolute_path(canonical_ancestor.join(suffix))
{
return normalized_path;
}
}
path
}
fn default_read_only_subpaths_for_writable_root(
writable_root: &AbsolutePathBuf,
) -> Vec<AbsolutePathBuf> {
@@ -966,7 +1057,7 @@ fn default_read_only_subpaths_for_writable_root(
}
}
dedup_absolute_paths(subpaths)
dedup_absolute_paths(subpaths, /*normalize_effective_paths*/ false)
}
fn is_git_pointer_file(path: &AbsolutePathBuf) -> bool {
@@ -1038,8 +1129,19 @@ fn resolve_gitdir_from_file(dot_git: &AbsolutePathBuf) -> Option<AbsolutePathBuf
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[cfg(unix)]
use std::fs;
use std::path::Path;
use tempfile::TempDir;
#[cfg(unix)]
const SYMLINKED_TMPDIR_TEST_ENV: &str = "CODEX_PROTOCOL_TEST_SYMLINKED_TMPDIR";
#[cfg(unix)]
fn symlink_dir(original: &Path, link: &Path) -> std::io::Result<()> {
std::os::unix::fs::symlink(original, link)
}
#[test]
fn unknown_special_paths_are_ignored_by_legacy_bridge() -> std::io::Result<()> {
let policy = FileSystemSandboxPolicy::restricted(vec![FileSystemSandboxEntry {
@@ -1067,6 +1169,333 @@ mod tests {
Ok(())
}
#[cfg(unix)]
#[test]
fn effective_runtime_roots_canonicalize_symlinked_paths() {
let cwd = TempDir::new().expect("tempdir");
let real_root = cwd.path().join("real");
let link_root = cwd.path().join("link");
let blocked = real_root.join("blocked");
let codex_dir = real_root.join(".codex");
fs::create_dir_all(&blocked).expect("create blocked");
fs::create_dir_all(&codex_dir).expect("create .codex");
symlink_dir(&real_root, &link_root).expect("create symlinked root");
let link_root =
AbsolutePathBuf::from_absolute_path(&link_root).expect("absolute symlinked root");
let link_blocked = link_root.join("blocked").expect("symlinked blocked path");
let expected_root = AbsolutePathBuf::from_absolute_path(
real_root.canonicalize().expect("canonicalize real root"),
)
.expect("absolute canonical root");
let expected_blocked = AbsolutePathBuf::from_absolute_path(
blocked.canonicalize().expect("canonicalize blocked"),
)
.expect("absolute canonical blocked");
let expected_codex = AbsolutePathBuf::from_absolute_path(
codex_dir.canonicalize().expect("canonicalize .codex"),
)
.expect("absolute canonical .codex");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: link_root },
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: link_blocked },
access: FileSystemAccessMode::None,
},
]);
assert_eq!(
policy.get_unreadable_roots_with_cwd(cwd.path()),
vec![expected_blocked.clone()]
);
let writable_roots = policy.get_writable_roots_with_cwd(cwd.path());
assert_eq!(writable_roots.len(), 1);
assert_eq!(writable_roots[0].root, expected_root);
assert!(
writable_roots[0]
.read_only_subpaths
.contains(&expected_blocked)
);
assert!(
writable_roots[0]
.read_only_subpaths
.contains(&expected_codex)
);
}
#[cfg(unix)]
#[test]
fn writable_roots_preserve_symlinked_protected_subpaths() {
let cwd = TempDir::new().expect("tempdir");
let root = cwd.path().join("root");
let decoy = root.join("decoy-codex");
let dot_codex = root.join(".codex");
fs::create_dir_all(&decoy).expect("create decoy");
symlink_dir(&decoy, &dot_codex).expect("create .codex symlink");
let root = AbsolutePathBuf::from_absolute_path(&root).expect("absolute root");
let expected_dot_codex = AbsolutePathBuf::from_absolute_path(
root.as_path()
.canonicalize()
.expect("canonicalize root")
.join(".codex"),
)
.expect("absolute .codex symlink");
let unexpected_decoy =
AbsolutePathBuf::from_absolute_path(decoy.canonicalize().expect("canonicalize decoy"))
.expect("absolute canonical decoy");
let policy = FileSystemSandboxPolicy::restricted(vec![FileSystemSandboxEntry {
path: FileSystemPath::Path { path: root },
access: FileSystemAccessMode::Write,
}]);
let writable_roots = policy.get_writable_roots_with_cwd(cwd.path());
assert_eq!(writable_roots.len(), 1);
assert_eq!(
writable_roots[0].read_only_subpaths,
vec![expected_dot_codex]
);
assert!(
!writable_roots[0]
.read_only_subpaths
.contains(&unexpected_decoy)
);
}
#[cfg(unix)]
#[test]
fn writable_roots_preserve_explicit_symlinked_carveouts_under_symlinked_roots() {
let cwd = TempDir::new().expect("tempdir");
let real_root = cwd.path().join("real");
let link_root = cwd.path().join("link");
let decoy = real_root.join("decoy-private");
let linked_private = real_root.join("linked-private");
fs::create_dir_all(&decoy).expect("create decoy");
symlink_dir(&real_root, &link_root).expect("create symlinked root");
symlink_dir(&decoy, &linked_private).expect("create linked-private symlink");
let link_root =
AbsolutePathBuf::from_absolute_path(&link_root).expect("absolute symlinked root");
let link_private = link_root
.join("linked-private")
.expect("symlinked linked-private path");
let expected_root = AbsolutePathBuf::from_absolute_path(
real_root.canonicalize().expect("canonicalize real root"),
)
.expect("absolute canonical root");
let expected_linked_private = expected_root
.join("linked-private")
.expect("expected linked-private path");
let unexpected_decoy =
AbsolutePathBuf::from_absolute_path(decoy.canonicalize().expect("canonicalize decoy"))
.expect("absolute canonical decoy");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: link_root },
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: link_private },
access: FileSystemAccessMode::None,
},
]);
let writable_roots = policy.get_writable_roots_with_cwd(cwd.path());
assert_eq!(writable_roots.len(), 1);
assert_eq!(writable_roots[0].root, expected_root);
assert_eq!(
writable_roots[0].read_only_subpaths,
vec![expected_linked_private]
);
assert!(
!writable_roots[0]
.read_only_subpaths
.contains(&unexpected_decoy)
);
}
#[cfg(unix)]
#[test]
fn writable_roots_preserve_explicit_symlinked_carveouts_that_escape_root() {
let cwd = TempDir::new().expect("tempdir");
let real_root = cwd.path().join("real");
let link_root = cwd.path().join("link");
let decoy = cwd.path().join("outside-private");
let linked_private = real_root.join("linked-private");
fs::create_dir_all(&decoy).expect("create decoy");
fs::create_dir_all(&real_root).expect("create real root");
symlink_dir(&real_root, &link_root).expect("create symlinked root");
symlink_dir(&decoy, &linked_private).expect("create linked-private symlink");
let link_root =
AbsolutePathBuf::from_absolute_path(&link_root).expect("absolute symlinked root");
let link_private = link_root
.join("linked-private")
.expect("symlinked linked-private path");
let expected_root = AbsolutePathBuf::from_absolute_path(
real_root.canonicalize().expect("canonicalize real root"),
)
.expect("absolute canonical root");
let expected_linked_private = expected_root
.join("linked-private")
.expect("expected linked-private path");
let unexpected_decoy =
AbsolutePathBuf::from_absolute_path(decoy.canonicalize().expect("canonicalize decoy"))
.expect("absolute canonical decoy");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: link_root },
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: link_private },
access: FileSystemAccessMode::None,
},
]);
let writable_roots = policy.get_writable_roots_with_cwd(cwd.path());
assert_eq!(writable_roots.len(), 1);
assert_eq!(writable_roots[0].root, expected_root);
assert_eq!(
writable_roots[0].read_only_subpaths,
vec![expected_linked_private]
);
assert!(
!writable_roots[0]
.read_only_subpaths
.contains(&unexpected_decoy)
);
}
#[cfg(unix)]
#[test]
fn writable_roots_preserve_explicit_symlinked_carveouts_that_alias_root() {
let cwd = TempDir::new().expect("tempdir");
let root = cwd.path().join("root");
let alias = root.join("alias-root");
fs::create_dir_all(&root).expect("create root");
symlink_dir(&root, &alias).expect("create alias symlink");
let root = AbsolutePathBuf::from_absolute_path(&root).expect("absolute root");
let alias = root.join("alias-root").expect("alias root path");
let expected_root = AbsolutePathBuf::from_absolute_path(
root.as_path().canonicalize().expect("canonicalize root"),
)
.expect("absolute canonical root");
let expected_alias = expected_root
.join("alias-root")
.expect("expected alias path");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: root },
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: alias },
access: FileSystemAccessMode::None,
},
]);
let writable_roots = policy.get_writable_roots_with_cwd(cwd.path());
assert_eq!(writable_roots.len(), 1);
assert_eq!(writable_roots[0].root, expected_root);
assert_eq!(writable_roots[0].read_only_subpaths, vec![expected_alias]);
}
#[cfg(unix)]
#[test]
fn tmpdir_special_path_canonicalizes_symlinked_tmpdir() {
if std::env::var_os(SYMLINKED_TMPDIR_TEST_ENV).is_none() {
let output = std::process::Command::new(std::env::current_exe().expect("test binary"))
.env(SYMLINKED_TMPDIR_TEST_ENV, "1")
.arg("--exact")
.arg("permissions::tests::tmpdir_special_path_canonicalizes_symlinked_tmpdir")
.output()
.expect("run tmpdir subprocess test");
assert!(
output.status.success(),
"tmpdir subprocess test failed\nstdout:\n{}\nstderr:\n{}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
);
return;
}
let cwd = TempDir::new().expect("tempdir");
let real_tmpdir = cwd.path().join("real-tmpdir");
let link_tmpdir = cwd.path().join("link-tmpdir");
let blocked = real_tmpdir.join("blocked");
let codex_dir = real_tmpdir.join(".codex");
fs::create_dir_all(&blocked).expect("create blocked");
fs::create_dir_all(&codex_dir).expect("create .codex");
symlink_dir(&real_tmpdir, &link_tmpdir).expect("create symlinked tmpdir");
let link_blocked =
AbsolutePathBuf::from_absolute_path(link_tmpdir.join("blocked")).expect("link blocked");
let expected_root = AbsolutePathBuf::from_absolute_path(
real_tmpdir
.canonicalize()
.expect("canonicalize real tmpdir"),
)
.expect("absolute canonical tmpdir");
let expected_blocked = AbsolutePathBuf::from_absolute_path(
blocked.canonicalize().expect("canonicalize blocked"),
)
.expect("absolute canonical blocked");
let expected_codex = AbsolutePathBuf::from_absolute_path(
codex_dir.canonicalize().expect("canonicalize .codex"),
)
.expect("absolute canonical .codex");
unsafe {
std::env::set_var("TMPDIR", &link_tmpdir);
}
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::Tmpdir,
},
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: link_blocked },
access: FileSystemAccessMode::None,
},
]);
assert_eq!(
policy.get_unreadable_roots_with_cwd(cwd.path()),
vec![expected_blocked.clone()]
);
let writable_roots = policy.get_writable_roots_with_cwd(cwd.path());
assert_eq!(writable_roots.len(), 1);
assert_eq!(writable_roots[0].root, expected_root);
assert!(
writable_roots[0]
.read_only_subpaths
.contains(&expected_blocked)
);
assert!(
writable_roots[0]
.read_only_subpaths
.contains(&expected_codex)
);
}
#[test]
fn resolve_access_with_cwd_uses_most_specific_entry() {
let cwd = TempDir::new().expect("tempdir");
@@ -1183,6 +1612,13 @@ mod tests {
let cwd = TempDir::new().expect("tempdir");
let docs =
AbsolutePathBuf::resolve_path_against_base("docs", cwd.path()).expect("resolve docs");
let expected_docs = AbsolutePathBuf::from_absolute_path(
cwd.path()
.canonicalize()
.expect("canonicalize cwd")
.join("docs"),
)
.expect("canonical docs");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
@@ -1200,7 +1636,10 @@ mod tests {
policy.resolve_access_with_cwd(docs.as_path(), cwd.path()),
FileSystemAccessMode::Read
);
assert_eq!(policy.get_readable_roots_with_cwd(cwd.path()), vec![docs]);
assert_eq!(
policy.get_readable_roots_with_cwd(cwd.path()),
vec![expected_docs]
);
assert!(policy.get_unreadable_roots_with_cwd(cwd.path()).is_empty());
}

View File

@@ -3681,9 +3681,9 @@ mod tests {
#[test]
fn restricted_file_system_policy_treats_root_with_carveouts_as_scoped_access() {
let cwd = TempDir::new().expect("tempdir");
let cwd_absolute =
AbsolutePathBuf::from_absolute_path(cwd.path()).expect("absolute tempdir");
let root = cwd_absolute
let canonical_cwd = cwd.path().canonicalize().expect("canonicalize cwd");
let root = AbsolutePathBuf::from_absolute_path(&canonical_cwd)
.expect("absolute canonical tempdir")
.as_path()
.ancestors()
.last()
@@ -3691,6 +3691,13 @@ mod tests {
.expect("filesystem root");
let blocked = AbsolutePathBuf::resolve_path_against_base("blocked", cwd.path())
.expect("resolve blocked");
let expected_blocked = AbsolutePathBuf::from_absolute_path(
cwd.path()
.canonicalize()
.expect("canonicalize cwd")
.join("blocked"),
)
.expect("canonical blocked");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
@@ -3699,9 +3706,7 @@ mod tests {
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: blocked.clone(),
},
path: FileSystemPath::Path { path: blocked },
access: FileSystemAccessMode::None,
},
]);
@@ -3714,7 +3719,7 @@ mod tests {
);
assert_eq!(
policy.get_unreadable_roots_with_cwd(cwd.path()),
vec![blocked.clone()]
vec![expected_blocked.clone()]
);
let writable_roots = policy.get_writable_roots_with_cwd(cwd.path());
@@ -3724,7 +3729,7 @@ mod tests {
writable_roots[0]
.read_only_subpaths
.iter()
.any(|path| path.as_path() == blocked.as_path())
.any(|path| path.as_path() == expected_blocked.as_path())
);
}
@@ -3733,14 +3738,17 @@ mod tests {
let cwd = TempDir::new().expect("tempdir");
std::fs::create_dir_all(cwd.path().join(".agents")).expect("create .agents");
std::fs::create_dir_all(cwd.path().join(".codex")).expect("create .codex");
let canonical_cwd = cwd.path().canonicalize().expect("canonicalize cwd");
let cwd_absolute =
AbsolutePathBuf::from_absolute_path(cwd.path()).expect("absolute tempdir");
AbsolutePathBuf::from_absolute_path(&canonical_cwd).expect("absolute tempdir");
let secret = AbsolutePathBuf::resolve_path_against_base("secret", cwd.path())
.expect("resolve unreadable path");
let agents = AbsolutePathBuf::resolve_path_against_base(".agents", cwd.path())
.expect("resolve .agents");
let codex = AbsolutePathBuf::resolve_path_against_base(".codex", cwd.path())
.expect("resolve .codex");
let expected_secret = AbsolutePathBuf::from_absolute_path(canonical_cwd.join("secret"))
.expect("canonical secret");
let expected_agents = AbsolutePathBuf::from_absolute_path(canonical_cwd.join(".agents"))
.expect("canonical .agents");
let expected_codex = AbsolutePathBuf::from_absolute_path(canonical_cwd.join(".codex"))
.expect("canonical .codex");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
@@ -3755,9 +3763,7 @@ mod tests {
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: secret.clone(),
},
path: FileSystemPath::Path { path: secret },
access: FileSystemAccessMode::None,
},
]);
@@ -3767,43 +3773,49 @@ mod tests {
assert!(policy.include_platform_defaults());
assert_eq!(
policy.get_readable_roots_with_cwd(cwd.path()),
vec![cwd_absolute]
vec![cwd_absolute.clone()]
);
assert_eq!(
policy.get_unreadable_roots_with_cwd(cwd.path()),
vec![secret.clone()]
vec![expected_secret.clone()]
);
let writable_roots = policy.get_writable_roots_with_cwd(cwd.path());
assert_eq!(writable_roots.len(), 1);
assert_eq!(writable_roots[0].root.as_path(), cwd.path());
assert_eq!(writable_roots[0].root, cwd_absolute);
assert!(
writable_roots[0]
.read_only_subpaths
.iter()
.any(|path| path.as_path() == secret.as_path())
.any(|path| path.as_path() == expected_secret.as_path())
);
assert!(
writable_roots[0]
.read_only_subpaths
.iter()
.any(|path| path.as_path() == agents.as_path())
.any(|path| path.as_path() == expected_agents.as_path())
);
assert!(
writable_roots[0]
.read_only_subpaths
.iter()
.any(|path| path.as_path() == codex.as_path())
.any(|path| path.as_path() == expected_codex.as_path())
);
}
#[test]
fn restricted_file_system_policy_treats_read_entries_as_read_only_subpaths() {
let cwd = TempDir::new().expect("tempdir");
let canonical_cwd = cwd.path().canonicalize().expect("canonicalize cwd");
let docs =
AbsolutePathBuf::resolve_path_against_base("docs", cwd.path()).expect("resolve docs");
let docs_public = AbsolutePathBuf::resolve_path_against_base("docs/public", cwd.path())
.expect("resolve docs/public");
let expected_docs = AbsolutePathBuf::from_absolute_path(canonical_cwd.join("docs"))
.expect("canonical docs");
let expected_docs_public =
AbsolutePathBuf::from_absolute_path(canonical_cwd.join("docs/public"))
.expect("canonical docs/public");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
@@ -3812,13 +3824,11 @@ mod tests {
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: docs.clone() },
path: FileSystemPath::Path { path: docs },
access: FileSystemAccessMode::Read,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: docs_public.clone(),
},
path: FileSystemPath::Path { path: docs_public },
access: FileSystemAccessMode::Write,
},
]);
@@ -3827,8 +3837,8 @@ mod tests {
assert_eq!(
sorted_writable_roots(policy.get_writable_roots_with_cwd(cwd.path())),
vec![
(cwd.path().to_path_buf(), vec![docs.to_path_buf()]),
(docs_public.to_path_buf(), Vec::new()),
(canonical_cwd, vec![expected_docs.to_path_buf()]),
(expected_docs_public.to_path_buf(), Vec::new()),
]
);
}
@@ -3838,6 +3848,7 @@ mod tests {
let cwd = TempDir::new().expect("tempdir");
let docs =
AbsolutePathBuf::resolve_path_against_base("docs", cwd.path()).expect("resolve docs");
let canonical_cwd = cwd.path().canonicalize().expect("canonicalize cwd");
let policy = SandboxPolicy::WorkspaceWrite {
writable_roots: vec![],
read_only_access: ReadOnlyAccess::Restricted {
@@ -3854,7 +3865,7 @@ mod tests {
FileSystemSandboxPolicy::from_legacy_sandbox_policy(&policy, cwd.path())
.get_writable_roots_with_cwd(cwd.path())
),
vec![(cwd.path().to_path_buf(), Vec::new())]
vec![(canonical_cwd, Vec::new())]
);
}