Compare commits

...

4 Commits

Author SHA1 Message Date
viyatb-oai
6f1ae2348d fix: mention cloud-managed requirements in startup error
Co-authored-by: Codex <noreply@openai.com>
2026-04-13 15:05:50 -07:00
viyatb-oai
947338916e fix: satisfy cloud requirements argument lint
Co-authored-by: Codex <noreply@openai.com>
2026-04-13 14:26:46 -07:00
viyatb-oai
c6dd6683d3 test: remove cloud requirements CLI e2e
Co-authored-by: Codex noreply@openai.com
2026-04-13 12:37:21 -07:00
viyatb-oai
553bd29d67 fix: use stale cloud requirements cache fallback
Co-authored-by: Codex noreply@openai.com
2026-04-13 12:33:40 -07:00

View File

@@ -47,6 +47,7 @@ const CLOUD_REQUIREMENTS_MAX_ATTEMPTS: usize = 5;
const CLOUD_REQUIREMENTS_CACHE_FILENAME: &str = "cloud-requirements-cache.json";
const CLOUD_REQUIREMENTS_CACHE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
const CLOUD_REQUIREMENTS_CACHE_TTL: Duration = Duration::from_secs(30 * 60);
const CLOUD_REQUIREMENTS_STALE_CACHE_TTL: Duration = Duration::from_secs(4 * 60 * 60);
const CLOUD_REQUIREMENTS_FETCH_ATTEMPT_METRIC: &str = "codex.cloud_requirements.fetch_attempt";
const CLOUD_REQUIREMENTS_FETCH_FINAL_METRIC: &str = "codex.cloud_requirements.fetch_final";
const CLOUD_REQUIREMENTS_LOAD_METRIC: &str = "codex.cloud_requirements.load";
@@ -136,6 +137,51 @@ impl CloudRequirementsCacheSignedPayload {
.as_deref()
.and_then(|contents| parse_cloud_requirements(contents).ok().flatten())
}
fn is_fresh(&self, now: DateTime<Utc>) -> bool {
self.expires_at > now
}
fn can_use_stale(&self, now: DateTime<Utc>) -> bool {
if self.is_fresh(now) {
return false;
}
let Ok(stale_ttl) = ChronoDuration::from_std(CLOUD_REQUIREMENTS_STALE_CACHE_TTL) else {
return false;
};
self.expires_at
.checked_add_signed(stale_ttl)
.is_some_and(|stale_expires_at| stale_expires_at > now)
}
}
fn is_cloud_requirements_eligible_auth(auth: &CodexAuth) -> bool {
let Some(plan_type) = auth.account_plan_type() else {
return false;
};
auth.is_chatgpt_auth()
&& (plan_type.is_business_like() || matches!(plan_type, PlanType::Enterprise))
}
fn format_cloud_requirements_load_failed_message(
attempt_count: usize,
status_code: Option<u16>,
) -> String {
let status = status_code
.map(|status_code| format!("last backend status: {status_code}"))
.unwrap_or_else(|| "last failure did not include an HTTP status".to_string());
format!(
"{CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE} after {attempt_count} attempts ({status}). Codex could not use a valid cached copy for this account, so startup stopped to avoid running without cloud-managed requirements. Please try again."
)
}
fn format_cloud_requirements_timeout_message(timeout: Duration) -> String {
format!(
"timed out waiting for workspace-managed config after {}s. Codex could not use a valid cached copy for this account, so startup stopped to avoid running without cloud-managed requirements. Please try again.",
timeout.as_secs()
)
}
fn sign_cache_payload(payload_bytes: &[u8]) -> Option<String> {
let mut mac = HmacSha256::new_from_slice(CLOUD_REQUIREMENTS_CACHE_WRITE_HMAC_KEY).ok()?;
@@ -274,26 +320,29 @@ impl CloudRequirementsService {
let _timer =
codex_otel::start_global_timer("codex.cloud_requirements.fetch.duration_ms", &[]);
let started_at = Instant::now();
let fetch_result = timeout(self.timeout, self.fetch())
.await
.inspect_err(|_| {
let message = format!(
"Timed out waiting for cloud requirements after {}s",
self.timeout.as_secs()
);
tracing::error!("{message}");
emit_load_metric("startup", "error");
})
.map_err(|_| {
CloudRequirementsLoadError::new(
let fetch_result = match timeout(self.timeout, self.fetch()).await {
Ok(fetch_result) => fetch_result,
Err(_) => {
let err = CloudRequirementsLoadError::new(
CloudRequirementsLoadErrorCode::Timeout,
/*status_code*/ None,
format!(
"timed out waiting for cloud requirements after {}s",
self.timeout.as_secs()
),
)
})?;
format_cloud_requirements_timeout_message(self.timeout),
);
if let Some(stale_cache) = self.load_stale_cache_for_current_auth().await {
tracing::warn!(
path = %self.cache_path.display(),
expires_at = %stale_cache.expires_at,
error = %err,
"Using stale cloud requirements cache after remote load timed out"
);
Ok(stale_cache.requirements())
} else {
tracing::error!("{err}");
emit_load_metric("startup", "error");
return Err(err);
}
}
};
let result = match fetch_result {
Ok(result) => result,
@@ -328,33 +377,57 @@ impl CloudRequirementsService {
let Some(auth) = self.auth_manager.auth().await else {
return Ok(None);
};
let Some(plan_type) = auth.account_plan_type() else {
return Ok(None);
};
if !auth.is_chatgpt_auth()
|| !(plan_type.is_business_like() || matches!(plan_type, PlanType::Enterprise))
{
if !is_cloud_requirements_eligible_auth(&auth) {
return Ok(None);
}
let (chatgpt_user_id, account_id) = auth_identity(&auth);
match self
.load_cache(chatgpt_user_id.as_deref(), account_id.as_deref())
let cache_payload = match self
.load_cache_payload(chatgpt_user_id.as_deref(), account_id.as_deref())
.await
{
Ok(signed_payload) => {
tracing::info!(
path = %self.cache_path.display(),
"Using cached cloud requirements"
);
return Ok(signed_payload.requirements());
let now = Utc::now();
if signed_payload.is_fresh(now) {
tracing::info!(
path = %self.cache_path.display(),
"Using cached cloud requirements"
);
return Ok(signed_payload.requirements());
}
self.log_cache_load_status(&CacheLoadStatus::CacheExpired);
Some(signed_payload)
}
Err(cache_load_status) => {
self.log_cache_load_status(&cache_load_status);
None
}
};
let stale_cache =
cache_payload.filter(|signed_payload| signed_payload.can_use_stale(Utc::now()));
match self.fetch_with_retries(auth, "startup").await {
Ok(requirements) => Ok(requirements),
Err(err) => {
if err.code() == CloudRequirementsLoadErrorCode::RequestFailed
&& let Some(stale_cache) = stale_cache
{
tracing::warn!(
path = %self.cache_path.display(),
expires_at = %stale_cache.expires_at,
error = %err,
"Using stale cloud requirements cache after remote load failed"
);
return Ok(stale_cache.requirements());
}
tracing::error!(
path = %self.cache_path.display(),
error = %err,
"Failed to load cloud requirements"
);
Err(err)
}
}
self.fetch_with_retries(auth, "startup").await
}
async fn fetch_with_retries(
@@ -517,17 +590,41 @@ impl CloudRequirementsService {
CLOUD_REQUIREMENTS_MAX_ATTEMPTS,
last_status_code,
);
tracing::error!(
path = %self.cache_path.display(),
"{CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE}"
);
Err(CloudRequirementsLoadError::new(
CloudRequirementsLoadErrorCode::RequestFailed,
last_status_code,
CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE,
format_cloud_requirements_load_failed_message(
CLOUD_REQUIREMENTS_MAX_ATTEMPTS,
last_status_code,
),
))
}
async fn load_stale_cache_for_current_auth(
&self,
) -> Option<CloudRequirementsCacheSignedPayload> {
let auth = self.auth_manager.auth().await?;
if !is_cloud_requirements_eligible_auth(&auth) {
return None;
}
let (chatgpt_user_id, account_id) = auth_identity(&auth);
let cache_payload = match self
.load_cache_payload(chatgpt_user_id.as_deref(), account_id.as_deref())
.await
{
Ok(cache_payload) => cache_payload,
Err(cache_load_status) => {
self.log_cache_load_status(&cache_load_status);
return None;
}
};
cache_payload
.can_use_stale(Utc::now())
.then_some(cache_payload)
}
async fn refresh_cache_in_background(&self) {
loop {
sleep(CLOUD_REQUIREMENTS_CACHE_REFRESH_INTERVAL).await;
@@ -548,12 +645,7 @@ impl CloudRequirementsService {
let Some(auth) = self.auth_manager.auth().await else {
return false;
};
let Some(plan_type) = auth.account_plan_type() else {
return false;
};
if !auth.is_chatgpt_auth()
|| !(plan_type.is_business_like() || matches!(plan_type, PlanType::Enterprise))
{
if !is_cloud_requirements_eligible_auth(&auth) {
return false;
}
@@ -571,7 +663,7 @@ impl CloudRequirementsService {
true
}
async fn load_cache(
async fn load_cache_payload(
&self,
chatgpt_user_id: Option<&str>,
account_id: Option<&str>,
@@ -619,10 +711,6 @@ impl CloudRequirementsService {
return Err(CacheLoadStatus::CacheIdentityMismatch);
}
if cache_file.signed_payload.expires_at <= Utc::now() {
return Err(CacheLoadStatus::CacheExpired);
}
Ok(cache_file.signed_payload)
}
@@ -846,6 +934,20 @@ mod tests {
Ok(())
}
fn write_signed_cache(codex_home: &Path, signed_payload: CloudRequirementsCacheSignedPayload) {
let payload_bytes = cache_payload_bytes(&signed_payload).expect("payload");
let signature = sign_cache_payload(&payload_bytes).expect("sign payload");
let cache_file = CloudRequirementsCacheFile {
signed_payload,
signature,
};
std::fs::write(
codex_home.join(CLOUD_REQUIREMENTS_CACHE_FILENAME),
serde_json::to_vec_pretty(&cache_file).expect("serialize cache"),
)
.expect("write cache");
}
fn auth_manager_with_api_key() -> Arc<AuthManager> {
let tmp = tempdir().expect("tempdir");
let auth_json = json!({
@@ -1007,6 +1109,19 @@ mod tests {
FetchAttemptError::Retryable(RetryableFailureKind::Request { status_code: None })
}
fn request_error_with_status(status_code: u16) -> FetchAttemptError {
FetchAttemptError::Retryable(RetryableFailureKind::Request {
status_code: Some(status_code),
})
}
fn approval_policy_requirements(policy: AskForApproval) -> ConfigRequirementsToml {
ConfigRequirementsToml {
allowed_approval_policies: Some(vec![policy]),
..Default::default()
}
}
struct StaticFetcher {
contents: Option<String>,
}
@@ -1322,7 +1437,11 @@ enabled = false
let err = result.expect_err("cloud requirements timeout should fail closed");
assert!(
err.to_string()
.contains("timed out waiting for cloud requirements")
.contains("timed out waiting for workspace-managed config")
);
assert!(
err.to_string()
.contains("Codex could not use a valid cached copy")
);
}
@@ -1951,6 +2070,125 @@ enabled = false
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1);
}
#[tokio::test(start_paused = true)]
async fn fetch_cloud_requirements_uses_stale_cache_when_remote_fetch_fails() {
let codex_home = tempdir().expect("tempdir");
write_signed_cache(
codex_home.path(),
CloudRequirementsCacheSignedPayload {
cached_at: Utc::now() - ChronoDuration::minutes(31),
expires_at: Utc::now() - ChronoDuration::minutes(1),
chatgpt_user_id: Some("user-12345".to_string()),
account_id: Some("account-12345".to_string()),
contents: Some(r#"allowed_approval_policies = ["on-request"]"#.to_string()),
},
);
let retryable_error = Err(request_error_with_status(/*status_code*/ 503));
let fetcher = Arc::new(SequenceFetcher::new(vec![
retryable_error;
CLOUD_REQUIREMENTS_MAX_ATTEMPTS
]));
let service = CloudRequirementsService::new(
auth_manager_with_plan("business"),
fetcher.clone(),
codex_home.path().to_path_buf(),
CLOUD_REQUIREMENTS_TIMEOUT,
);
let handle = tokio::spawn(async move { service.fetch().await });
tokio::task::yield_now().await;
tokio::time::advance(Duration::from_secs(5)).await;
tokio::task::yield_now().await;
assert_eq!(
handle.await.expect("cloud requirements task"),
Ok(Some(approval_policy_requirements(
AskForApproval::OnRequest
)))
);
assert_eq!(
fetcher.request_count.load(Ordering::SeqCst),
CLOUD_REQUIREMENTS_MAX_ATTEMPTS
);
}
#[tokio::test(start_paused = true)]
async fn fetch_cloud_requirements_rejects_stale_cache_after_grace_period() {
let codex_home = tempdir().expect("tempdir");
let stale_ttl =
ChronoDuration::from_std(CLOUD_REQUIREMENTS_STALE_CACHE_TTL).expect("stale cache TTL");
write_signed_cache(
codex_home.path(),
CloudRequirementsCacheSignedPayload {
cached_at: Utc::now() - stale_ttl - ChronoDuration::minutes(31),
expires_at: Utc::now() - stale_ttl - ChronoDuration::minutes(1),
chatgpt_user_id: Some("user-12345".to_string()),
account_id: Some("account-12345".to_string()),
contents: Some(r#"allowed_approval_policies = ["on-request"]"#.to_string()),
},
);
let retryable_error = Err(request_error_with_status(/*status_code*/ 503));
let fetcher = Arc::new(SequenceFetcher::new(vec![
retryable_error;
CLOUD_REQUIREMENTS_MAX_ATTEMPTS
]));
let service = CloudRequirementsService::new(
auth_manager_with_plan("business"),
fetcher.clone(),
codex_home.path().to_path_buf(),
CLOUD_REQUIREMENTS_TIMEOUT,
);
let handle = tokio::spawn(async move { service.fetch().await });
tokio::task::yield_now().await;
tokio::time::advance(Duration::from_secs(5)).await;
tokio::task::yield_now().await;
let err = handle
.await
.expect("cloud requirements task")
.expect_err("too-stale cache should not be used");
let err_text = err.to_string();
assert!(err_text.contains("failed to load your workspace-managed config after 5 attempts"));
assert!(err_text.contains("last backend status: 503"));
assert!(err_text.contains("valid cached copy"));
assert_eq!(err.code(), CloudRequirementsLoadErrorCode::RequestFailed);
}
#[tokio::test(start_paused = true)]
async fn fetch_cloud_requirements_uses_stale_cache_when_remote_fetch_times_out() {
let codex_home = tempdir().expect("tempdir");
write_signed_cache(
codex_home.path(),
CloudRequirementsCacheSignedPayload {
cached_at: Utc::now() - ChronoDuration::minutes(31),
expires_at: Utc::now() - ChronoDuration::minutes(1),
chatgpt_user_id: Some("user-12345".to_string()),
account_id: Some("account-12345".to_string()),
contents: Some(r#"allowed_approval_policies = ["on-request"]"#.to_string()),
},
);
let service = CloudRequirementsService::new(
auth_manager_with_plan("business"),
Arc::new(PendingFetcher),
codex_home.path().to_path_buf(),
CLOUD_REQUIREMENTS_TIMEOUT,
);
let handle = tokio::spawn(async move { service.fetch_with_timeout().await });
tokio::time::advance(CLOUD_REQUIREMENTS_TIMEOUT + Duration::from_millis(1)).await;
assert_eq!(
handle.await.expect("cloud requirements task"),
Ok(Some(approval_policy_requirements(
AskForApproval::OnRequest
)))
);
}
#[tokio::test]
async fn fetch_cloud_requirements_writes_signed_cache() {
let codex_home = tempdir().expect("tempdir");
@@ -2048,10 +2286,10 @@ enabled = false
.await
.expect("cloud requirements task")
.expect_err("cloud requirements retry exhaustion should fail closed");
assert_eq!(
err.to_string(),
"failed to load your workspace-managed config"
);
let err_text = err.to_string();
assert!(err_text.contains("failed to load your workspace-managed config after 5 attempts"));
assert!(err_text.contains("last failure did not include an HTTP status"));
assert!(err_text.contains("valid cached copy"));
assert_eq!(err.code(), CloudRequirementsLoadErrorCode::RequestFailed);
assert_eq!(
fetcher.request_count.load(Ordering::SeqCst),