mirror of
https://github.com/openai/codex.git
synced 2026-03-29 19:46:30 +03:00
Compare commits
1 Commits
dev/friel/
...
dev/cc/aut
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c60230ec9e |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2226,6 +2226,7 @@ dependencies = [
|
||||
"codex-terminal-detection",
|
||||
"codex-utils-template",
|
||||
"core_test_support",
|
||||
"fd-lock",
|
||||
"keyring",
|
||||
"once_cell",
|
||||
"os_info",
|
||||
|
||||
@@ -214,6 +214,7 @@ encoding_rs = "0.8.35"
|
||||
env-flags = "0.1.1"
|
||||
env_logger = "0.11.9"
|
||||
eventsource-stream = "0.2.3"
|
||||
fd-lock = "4.0.4"
|
||||
futures = { version = "0.3", default-features = false }
|
||||
gethostname = "1.1.0"
|
||||
globset = "0.4"
|
||||
|
||||
@@ -18,6 +18,7 @@ codex-keyring-store = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-terminal-detection = { workspace = true }
|
||||
codex-utils-template = { workspace = true }
|
||||
fd-lock = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
os_info = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use crate::auth::refresh_lock::auth_refresh_lock_path;
|
||||
use crate::auth::storage::FileAuthStorage;
|
||||
use crate::auth::storage::get_auth_file;
|
||||
use crate::token_data::IdTokenInfo;
|
||||
@@ -7,12 +8,20 @@ use crate::token_data::PlanType as InternalPlanType;
|
||||
use codex_protocol::account::PlanType as AccountPlanType;
|
||||
|
||||
use base64::Engine;
|
||||
use chrono::Duration;
|
||||
use chrono::Utc;
|
||||
use codex_protocol::config_types::ForcedLoginMethod;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use tempfile::NamedTempFile;
|
||||
use tempfile::tempdir;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
#[tokio::test]
|
||||
async fn refresh_without_id_token() {
|
||||
@@ -240,12 +249,209 @@ fn refresh_failure_is_scoped_to_the_matching_auth_snapshot() {
|
||||
assert_eq!(manager.refresh_failure_for_auth(&updated_auth), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(auth_refresh)]
|
||||
async fn managed_refresh_serializes_across_managers() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/oauth/token"))
|
||||
.respond_with(
|
||||
ResponseTemplate::new(200)
|
||||
.set_delay(std::time::Duration::from_millis(200))
|
||||
.set_body_json(json!({
|
||||
"access_token": "new-access-token",
|
||||
"refresh_token": "new-refresh-token"
|
||||
})),
|
||||
)
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let context = RefreshTokenTestContext::new(&server);
|
||||
context.write_auth(&managed_auth_dot_json(
|
||||
"initial-access-token",
|
||||
"initial-refresh-token",
|
||||
"org_mine",
|
||||
));
|
||||
|
||||
let manager_a = AuthManager::shared(
|
||||
context.codex_home.path().to_path_buf(),
|
||||
false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
);
|
||||
let manager_b = AuthManager::shared(
|
||||
context.codex_home.path().to_path_buf(),
|
||||
false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
);
|
||||
|
||||
let (result_a, result_b) = tokio::join!(manager_a.refresh_token(), manager_b.refresh_token());
|
||||
result_a.expect("first refresh should succeed");
|
||||
result_b.expect("second refresh should observe persisted auth");
|
||||
|
||||
let stored = context.load_auth();
|
||||
let stored_tokens = stored.tokens.expect("stored tokens");
|
||||
assert_eq!(stored_tokens.access_token, "new-access-token");
|
||||
assert_eq!(stored_tokens.refresh_token, "new-refresh-token");
|
||||
|
||||
let cached_a = manager_a
|
||||
.auth_cached()
|
||||
.expect("first manager auth cached")
|
||||
.get_token_data()
|
||||
.expect("first manager token data");
|
||||
let cached_b = manager_b
|
||||
.auth_cached()
|
||||
.expect("second manager auth cached")
|
||||
.get_token_data()
|
||||
.expect("second manager token data");
|
||||
assert_eq!(cached_a, stored_tokens);
|
||||
assert_eq!(cached_b, stored_tokens);
|
||||
|
||||
assert!(auth_refresh_lock_path(context.codex_home.path()).exists());
|
||||
server.verify().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial(auth_refresh)]
|
||||
async fn managed_refresh_returns_transient_error_when_lock_file_cannot_be_opened() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/oauth/token"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"access_token": "new-access-token",
|
||||
"refresh_token": "new-refresh-token"
|
||||
})))
|
||||
.expect(0)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let endpoint = format!("{}/oauth/token", server.uri());
|
||||
let _guard = EnvVarGuard::set(REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR, endpoint.as_str());
|
||||
let codex_home = NamedTempFile::new().expect("temp file");
|
||||
let auth = CodexAuth::from_auth_dot_json(
|
||||
codex_home.path(),
|
||||
managed_auth_dot_json("initial-access-token", "initial-refresh-token", "org_mine"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("managed auth");
|
||||
let manager =
|
||||
AuthManager::from_auth_for_testing_with_home(auth, codex_home.path().to_path_buf());
|
||||
|
||||
let err = manager
|
||||
.refresh_token()
|
||||
.await
|
||||
.expect_err("lock open should fail");
|
||||
assert!(matches!(err, RefreshTokenError::Transient(_)));
|
||||
|
||||
server.verify().await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn managed_refresh_lock_path_is_shared_across_persistent_store_modes() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let expected = auth_refresh_lock_path(codex_home.path());
|
||||
|
||||
assert_eq!(expected, codex_home.path().join("auth-refresh.lock"));
|
||||
for mode in [
|
||||
AuthCredentialsStoreMode::File,
|
||||
AuthCredentialsStoreMode::Auto,
|
||||
AuthCredentialsStoreMode::Keyring,
|
||||
] {
|
||||
let _ = mode;
|
||||
assert_eq!(auth_refresh_lock_path(codex_home.path()), expected);
|
||||
}
|
||||
}
|
||||
|
||||
struct AuthFileParams {
|
||||
openai_api_key: Option<String>,
|
||||
chatgpt_plan_type: Option<String>,
|
||||
chatgpt_account_id: Option<String>,
|
||||
}
|
||||
|
||||
struct RefreshTokenTestContext {
|
||||
codex_home: tempfile::TempDir,
|
||||
_env_guard: EnvVarGuard,
|
||||
}
|
||||
|
||||
impl RefreshTokenTestContext {
|
||||
fn new(server: &MockServer) -> Self {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let endpoint = format!("{}/oauth/token", server.uri());
|
||||
let env_guard = EnvVarGuard::set(REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR, endpoint.as_str());
|
||||
Self {
|
||||
codex_home,
|
||||
_env_guard: env_guard,
|
||||
}
|
||||
}
|
||||
|
||||
fn write_auth(&self, auth_dot_json: &AuthDotJson) {
|
||||
save_auth(
|
||||
self.codex_home.path(),
|
||||
auth_dot_json,
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("save auth");
|
||||
}
|
||||
|
||||
fn load_auth(&self) -> AuthDotJson {
|
||||
load_auth_dot_json(self.codex_home.path(), AuthCredentialsStoreMode::File)
|
||||
.expect("load auth")
|
||||
.expect("auth should exist")
|
||||
}
|
||||
}
|
||||
|
||||
fn managed_auth_dot_json(access_token: &str, refresh_token: &str, account_id: &str) -> AuthDotJson {
|
||||
let id_token = fake_id_token(Some("pro"), Some(account_id));
|
||||
AuthDotJson {
|
||||
auth_mode: Some(ApiAuthMode::Chatgpt),
|
||||
openai_api_key: None,
|
||||
tokens: Some(TokenData {
|
||||
id_token: crate::token_data::parse_chatgpt_jwt_claims(&id_token)
|
||||
.expect("fake id token should parse"),
|
||||
access_token: access_token.to_string(),
|
||||
refresh_token: refresh_token.to_string(),
|
||||
account_id: Some(account_id.to_string()),
|
||||
}),
|
||||
last_refresh: Some(Utc::now() - Duration::days(1)),
|
||||
}
|
||||
}
|
||||
|
||||
fn fake_id_token(chatgpt_plan_type: Option<&str>, chatgpt_account_id: Option<&str>) -> String {
|
||||
#[derive(Serialize)]
|
||||
struct Header {
|
||||
alg: &'static str,
|
||||
typ: &'static str,
|
||||
}
|
||||
|
||||
let header = Header {
|
||||
alg: "none",
|
||||
typ: "JWT",
|
||||
};
|
||||
let mut auth_payload = serde_json::json!({
|
||||
"chatgpt_user_id": "user-12345",
|
||||
"user_id": "user-12345",
|
||||
});
|
||||
if let Some(chatgpt_plan_type) = chatgpt_plan_type {
|
||||
auth_payload["chatgpt_plan_type"] =
|
||||
serde_json::Value::String(chatgpt_plan_type.to_string());
|
||||
}
|
||||
if let Some(chatgpt_account_id) = chatgpt_account_id {
|
||||
auth_payload["chatgpt_account_id"] =
|
||||
serde_json::Value::String(chatgpt_account_id.to_string());
|
||||
}
|
||||
|
||||
let payload = serde_json::json!({
|
||||
"email": "user@example.com",
|
||||
"email_verified": true,
|
||||
"https://api.openai.com/auth": auth_payload,
|
||||
});
|
||||
let encode = |bytes: &[u8]| base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes);
|
||||
let header_b64 = encode(&serde_json::to_vec(&header).expect("serialize header"));
|
||||
let payload_b64 = encode(&serde_json::to_vec(&payload).expect("serialize payload"));
|
||||
let signature_b64 = encode(b"sig");
|
||||
format!("{header_b64}.{payload_b64}.{signature_b64}")
|
||||
}
|
||||
|
||||
fn write_auth_file(params: AuthFileParams, codex_home: &Path) -> std::io::Result<String> {
|
||||
let auth_file = get_auth_file(codex_home);
|
||||
// Create a minimal valid JWT for the id_token field.
|
||||
|
||||
@@ -19,6 +19,8 @@ use codex_protocol::config_types::ForcedLoginMethod;
|
||||
|
||||
use crate::auth::error::RefreshTokenFailedError;
|
||||
use crate::auth::error::RefreshTokenFailedReason;
|
||||
use crate::auth::refresh_lock::AuthRefreshLockGuard;
|
||||
use crate::auth::refresh_lock::auth_refresh_lock_path;
|
||||
pub use crate::auth::storage::AuthCredentialsStoreMode;
|
||||
pub use crate::auth::storage::AuthDotJson;
|
||||
use crate::auth::storage::AuthStorageBackend;
|
||||
@@ -846,6 +848,12 @@ enum ReloadOutcome {
|
||||
Skipped,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
enum ManagedRefreshOutcome {
|
||||
ReloadedChanged,
|
||||
Refreshed,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
enum UnauthorizedRecoveryMode {
|
||||
Managed,
|
||||
@@ -1312,29 +1320,22 @@ impl AuthManager {
|
||||
/// can assume that some other instance already refreshed it. If the persisted
|
||||
/// token is the same as the cached, then ask the token authority to refresh.
|
||||
pub async fn refresh_token(&self) -> Result<(), RefreshTokenError> {
|
||||
let _refresh_guard = self.refresh_lock.lock().await;
|
||||
let auth_before_reload = self.auth_cached();
|
||||
if auth_before_reload
|
||||
.as_ref()
|
||||
.is_some_and(CodexAuth::is_api_key_auth)
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
let expected_account_id = auth_before_reload
|
||||
.as_ref()
|
||||
.and_then(CodexAuth::get_account_id);
|
||||
let auth_before_reload = match self.auth_cached() {
|
||||
Some(auth) => auth,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
match self.reload_if_account_id_matches(expected_account_id.as_deref()) {
|
||||
ReloadOutcome::ReloadedChanged => {
|
||||
tracing::info!("Skipping token refresh because auth changed after guarded reload.");
|
||||
match auth_before_reload {
|
||||
CodexAuth::ApiKey(_) => Ok(()),
|
||||
CodexAuth::Chatgpt(_) => {
|
||||
let expected_account_id = auth_before_reload.get_account_id();
|
||||
self.refresh_managed_auth(expected_account_id).await?;
|
||||
Ok(())
|
||||
}
|
||||
ReloadOutcome::ReloadedNoChange => self.refresh_token_from_authority_impl().await,
|
||||
ReloadOutcome::Skipped => {
|
||||
Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Other,
|
||||
REFRESH_TOKEN_ACCOUNT_MISMATCH_MESSAGE.to_string(),
|
||||
)))
|
||||
CodexAuth::ChatgptAuthTokens(_) => {
|
||||
let _refresh_guard = self.refresh_lock.lock().await;
|
||||
self.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1344,8 +1345,83 @@ impl AuthManager {
|
||||
/// observe refreshed token. If the token refresh fails, returns the error to
|
||||
/// the caller.
|
||||
pub async fn refresh_token_from_authority(&self) -> Result<(), RefreshTokenError> {
|
||||
let auth = match self.auth_cached() {
|
||||
Some(auth) => auth,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
match auth {
|
||||
CodexAuth::ApiKey(_) => Ok(()),
|
||||
CodexAuth::Chatgpt(_) => {
|
||||
let expected_account_id = auth.get_account_id();
|
||||
self.refresh_managed_auth(expected_account_id).await?;
|
||||
Ok(())
|
||||
}
|
||||
CodexAuth::ChatgptAuthTokens(_) => {
|
||||
let _refresh_guard = self.refresh_lock.lock().await;
|
||||
self.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn refresh_managed_auth(
|
||||
&self,
|
||||
expected_account_id: Option<String>,
|
||||
) -> Result<ManagedRefreshOutcome, RefreshTokenError> {
|
||||
let _refresh_guard = self.refresh_lock.lock().await;
|
||||
self.refresh_token_from_authority_impl().await
|
||||
let lock_path = auth_refresh_lock_path(&self.codex_home);
|
||||
tracing::info!(
|
||||
path = %lock_path.display(),
|
||||
"Waiting for managed auth refresh inter-process lock."
|
||||
);
|
||||
let managed_refresh_guard = AuthRefreshLockGuard::acquire(&self.codex_home)
|
||||
.await
|
||||
.map_err(RefreshTokenError::Transient)?;
|
||||
tracing::info!(
|
||||
path = %lock_path.display(),
|
||||
"Acquired managed auth refresh inter-process lock."
|
||||
);
|
||||
|
||||
let result = self
|
||||
.refresh_managed_auth_while_locked(expected_account_id.as_deref())
|
||||
.await;
|
||||
let release_result = managed_refresh_guard
|
||||
.release()
|
||||
.await
|
||||
.map_err(RefreshTokenError::Transient);
|
||||
tracing::info!(
|
||||
path = %lock_path.display(),
|
||||
"Released managed auth refresh inter-process lock."
|
||||
);
|
||||
|
||||
match (result, release_result) {
|
||||
(Err(err), _) => Err(err),
|
||||
(Ok(_), Err(err)) => Err(err),
|
||||
(Ok(outcome), Ok(())) => Ok(outcome),
|
||||
}
|
||||
}
|
||||
|
||||
async fn refresh_managed_auth_while_locked(
|
||||
&self,
|
||||
expected_account_id: Option<&str>,
|
||||
) -> Result<ManagedRefreshOutcome, RefreshTokenError> {
|
||||
match self.reload_if_account_id_matches(expected_account_id) {
|
||||
ReloadOutcome::ReloadedChanged => {
|
||||
tracing::info!("Skipping token refresh because auth changed after guarded reload.");
|
||||
Ok(ManagedRefreshOutcome::ReloadedChanged)
|
||||
}
|
||||
ReloadOutcome::ReloadedNoChange => {
|
||||
self.refresh_token_from_authority_impl().await?;
|
||||
Ok(ManagedRefreshOutcome::Refreshed)
|
||||
}
|
||||
ReloadOutcome::Skipped => {
|
||||
Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Other,
|
||||
REFRESH_TOKEN_ACCOUNT_MISMATCH_MESSAGE.to_string(),
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn refresh_token_from_authority_impl(&self) -> Result<(), RefreshTokenError> {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod default_client;
|
||||
pub mod error;
|
||||
mod refresh_lock;
|
||||
mod storage;
|
||||
mod util;
|
||||
|
||||
|
||||
116
codex-rs/login/src/auth/refresh_lock.rs
Normal file
116
codex-rs/login/src/auth/refresh_lock.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
use fd_lock::RwLock;
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
const AUTH_REFRESH_LOCK_FILE: &str = "auth-refresh.lock";
|
||||
|
||||
pub(super) fn auth_refresh_lock_path(codex_home: &Path) -> PathBuf {
|
||||
codex_home.join(AUTH_REFRESH_LOCK_FILE)
|
||||
}
|
||||
|
||||
pub(super) fn open_auth_refresh_lock(codex_home: &Path) -> std::io::Result<RwLock<File>> {
|
||||
std::fs::create_dir_all(codex_home)?;
|
||||
let lock_path = auth_refresh_lock_path(codex_home);
|
||||
let mut options = OpenOptions::new();
|
||||
options.create(true).read(true).write(true).truncate(false);
|
||||
#[cfg(unix)]
|
||||
{
|
||||
options.mode(0o600);
|
||||
}
|
||||
let file = options.open(lock_path)?;
|
||||
Ok(RwLock::new(file))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct AuthRefreshLockGuard {
|
||||
release_tx: Option<oneshot::Sender<()>>,
|
||||
join_handle: JoinHandle<std::io::Result<()>>,
|
||||
}
|
||||
|
||||
impl AuthRefreshLockGuard {
|
||||
pub(super) async fn acquire(codex_home: &Path) -> std::io::Result<Self> {
|
||||
let codex_home = codex_home.to_path_buf();
|
||||
let (acquired_tx, acquired_rx) = oneshot::channel();
|
||||
let (release_tx, release_rx) = oneshot::channel();
|
||||
let join_handle = tokio::task::spawn_blocking(move || {
|
||||
let mut lock = open_auth_refresh_lock(&codex_home)?;
|
||||
let _guard = lock.write()?;
|
||||
let _ = acquired_tx.send(Ok(()));
|
||||
let _ = release_rx.blocking_recv();
|
||||
Ok(())
|
||||
});
|
||||
|
||||
match acquired_rx.await {
|
||||
Ok(Ok(())) => Ok(Self {
|
||||
release_tx: Some(release_tx),
|
||||
join_handle,
|
||||
}),
|
||||
Ok(Err(err)) => {
|
||||
let _ = join_handle.await;
|
||||
Err(err)
|
||||
}
|
||||
Err(_) => Err(std::io::Error::other(
|
||||
"auth refresh lock acquisition task ended unexpectedly",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn release(mut self) -> std::io::Result<()> {
|
||||
if let Some(release_tx) = self.release_tx.take() {
|
||||
let _ = release_tx.send(());
|
||||
}
|
||||
self.join_handle
|
||||
.await
|
||||
.map_err(|err| std::io::Error::other(format!("auth refresh lock task failed: {err}")))?
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[test]
|
||||
fn auth_refresh_lock_path_is_under_codex_home() {
|
||||
let codex_home = PathBuf::from("/tmp/example-codex-home");
|
||||
|
||||
assert_eq!(
|
||||
auth_refresh_lock_path(&codex_home),
|
||||
codex_home.join(AUTH_REFRESH_LOCK_FILE)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_auth_refresh_lock_creates_lock_file() {
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
|
||||
open_auth_refresh_lock(codex_home.path()).expect("open lock");
|
||||
|
||||
assert!(auth_refresh_lock_path(codex_home.path()).exists());
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn open_auth_refresh_lock_creates_unix_private_permissions() {
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
let codex_home = tempdir().expect("tempdir");
|
||||
let lock_path = auth_refresh_lock_path(codex_home.path());
|
||||
|
||||
open_auth_refresh_lock(codex_home.path()).expect("open lock");
|
||||
|
||||
let mode = std::fs::metadata(lock_path)
|
||||
.expect("lock metadata")
|
||||
.permissions()
|
||||
.mode()
|
||||
& 0o777;
|
||||
assert_eq!(mode, 0o600);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user