use std::collections::BTreeMap; use std::sync::Arc; use std::time::Duration; use codex_login::AuthManager; use codex_login::CodexAuth; use reqwest::StatusCode; use serde::Deserialize; use serde::Serialize; use serde_json::Value; use sha2::Digest as _; use tokio::time::sleep; use tokio_tungstenite::connect_async; use tracing::info; use tracing::warn; use uuid::Uuid; use crate::ExecServerError; use crate::ExecServerRuntimePaths; use crate::connection::JsonRpcConnection; use crate::server::ConnectionProcessor; const PROTOCOL_VERSION: &str = "codex-exec-server-v1"; const ERROR_BODY_PREVIEW_BYTES: usize = 4096; #[derive(Clone)] struct CloudEnvironmentClient { base_url: String, http: reqwest::Client, auth_manager: Arc, } impl std::fmt::Debug for CloudEnvironmentClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("CloudEnvironmentClient") .field("base_url", &self.base_url) .finish_non_exhaustive() } } impl CloudEnvironmentClient { fn new(base_url: String, auth_manager: Arc) -> Result { let base_url = normalize_base_url(base_url)?; Ok(Self { base_url, http: reqwest::Client::new(), auth_manager, }) } async fn register_executor( &self, request: &CloudEnvironmentRegisterExecutorRequest, ) -> Result { self.post_json("/api/cloud/executor", request).await } async fn post_json(&self, path: &str, request: &T) -> Result where T: Serialize + Sync, R: for<'de> Deserialize<'de>, { for attempt in 0..=1 { let auth = cloud_environment_chatgpt_auth(&self.auth_manager).await?; let response = self .http .post(endpoint_url(&self.base_url, path)) .bearer_auth(chatgpt_bearer_token(&auth)?) .header("chatgpt-account-id", chatgpt_account_id(&auth)?) .json(request) .send() .await?; if response.status().is_success() { return response.json::().await.map_err(ExecServerError::from); } let status = response.status(); let body = response.text().await.unwrap_or_default(); if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN) && attempt == 0 && recover_unauthorized(&self.auth_manager).await { continue; } return Err(cloud_http_error(status, &body)); } unreachable!("cloud environments request loop is bounded to two attempts") } } #[derive(Debug, Clone, Eq, PartialEq, Serialize)] struct CloudEnvironmentRegisterExecutorRequest { idempotency_id: String, #[serde(skip_serializing_if = "Option::is_none")] environment_id: Option, #[serde(skip_serializing_if = "Option::is_none")] name: Option, labels: BTreeMap, metadata: Value, } #[derive(Debug, Clone, Eq, PartialEq, Deserialize)] struct CloudEnvironmentExecutorRegistrationResponse { id: String, environment_id: String, url: String, } /// Configuration for registering an exec-server for remote use. #[derive(Debug, Clone, Eq, PartialEq)] pub struct RemoteExecutorConfig { pub base_url: String, pub environment_id: Option, pub name: String, } impl RemoteExecutorConfig { pub fn new(base_url: String) -> Self { Self { base_url, environment_id: None, name: "codex-exec-server".to_string(), } } fn registration_request( &self, auth: &CodexAuth, registration_id: Uuid, ) -> Result { Ok(CloudEnvironmentRegisterExecutorRequest { idempotency_id: self.default_idempotency_id(auth, registration_id)?, environment_id: self.environment_id.clone(), name: Some(self.name.clone()), labels: BTreeMap::new(), metadata: Value::Object(Default::default()), }) } fn default_idempotency_id( &self, auth: &CodexAuth, registration_id: Uuid, ) -> Result { let mut hasher = sha2::Sha256::new(); hasher.update(chatgpt_account_id(auth)?.as_bytes()); hasher.update(b"\0"); hasher.update(self.environment_id.as_deref().unwrap_or("auto")); hasher.update(b"\0"); hasher.update(self.name.as_bytes()); hasher.update(b"\0"); hasher.update(PROTOCOL_VERSION); hasher.update(b"\0"); hasher.update(registration_id.as_bytes()); let digest = hasher.finalize(); Ok(format!("codex-exec-server-{digest:x}")) } } /// Register an exec-server for remote use and serve requests over the returned /// rendezvous websocket. pub async fn run_remote_executor( config: RemoteExecutorConfig, auth_manager: Arc, runtime_paths: ExecServerRuntimePaths, ) -> Result<(), ExecServerError> { let client = CloudEnvironmentClient::new(config.base_url.clone(), auth_manager.clone())?; let processor = ConnectionProcessor::new(runtime_paths); let registration_id = Uuid::new_v4(); let mut backoff = Duration::from_secs(1); loop { let auth = cloud_environment_chatgpt_auth(&auth_manager).await?; let request = config.registration_request(&auth, registration_id)?; let response = client.register_executor(&request).await?; eprintln!( "codex exec-server remote executor {} registered in environment {}", response.id, response.environment_id ); match connect_async(response.url.as_str()).await { Ok((websocket, _)) => { backoff = Duration::from_secs(1); processor .run_connection(JsonRpcConnection::from_websocket( websocket, "cloud exec-server websocket".to_string(), )) .await; } Err(err) => { warn!("failed to connect cloud exec-server websocket: {err}"); } } sleep(backoff).await; backoff = (backoff * 2).min(Duration::from_secs(30)); } } async fn cloud_environment_chatgpt_auth( auth_manager: &AuthManager, ) -> Result { let mut reloaded = false; let auth = loop { let Some(auth) = auth_manager.auth().await else { if reloaded { return Err(ExecServerError::CloudEnvironmentAuth( "cloud environments require ChatGPT authentication".to_string(), )); } auth_manager.reload().await; reloaded = true; continue; }; if !auth.is_chatgpt_auth() { return Err(ExecServerError::CloudEnvironmentAuth( "cloud environments require ChatGPT authentication; API key auth is not supported" .to_string(), )); } if auth.get_account_id().is_none() && !reloaded { auth_manager.reload().await; reloaded = true; continue; } break auth; }; let _ = chatgpt_bearer_token(&auth)?; let _ = chatgpt_account_id(&auth)?; Ok(auth) } fn chatgpt_bearer_token(auth: &CodexAuth) -> Result { auth.get_token() .map_err(|err| ExecServerError::CloudEnvironmentAuth(err.to_string())) .and_then(|token| { if token.is_empty() { Err(ExecServerError::CloudEnvironmentAuth( "cloud environments require a non-empty ChatGPT bearer token".to_string(), )) } else { Ok(token) } }) } fn chatgpt_account_id(auth: &CodexAuth) -> Result { auth.get_account_id().ok_or_else(|| { ExecServerError::CloudEnvironmentAuth( "cloud environments are waiting for a ChatGPT account id".to_string(), ) }) } async fn recover_unauthorized(auth_manager: &Arc) -> bool { let mut recovery = auth_manager.unauthorized_recovery(); if !recovery.has_next() { return false; } let mode = recovery.mode_name(); let step = recovery.step_name(); match recovery.next().await { Ok(step_result) => { info!( "cloud environment auth recovery succeeded: mode={mode}, step={step}, auth_state_changed={:?}", step_result.auth_state_changed() ); true } Err(err) => { warn!("cloud environment auth recovery failed: mode={mode}, step={step}: {err}"); false } } } #[derive(Deserialize)] struct CloudErrorBody { error: Option, } #[derive(Deserialize)] struct CloudError { code: Option, message: Option, } fn normalize_base_url(base_url: String) -> Result { let trimmed = base_url.trim().trim_end_matches('/').to_string(); if trimmed.is_empty() { return Err(ExecServerError::CloudEnvironmentConfig( "cloud environments base URL is required".to_string(), )); } Ok(trimmed) } fn endpoint_url(base_url: &str, path: &str) -> String { format!("{base_url}/{}", path.trim_start_matches('/')) } fn cloud_http_error(status: StatusCode, body: &str) -> ExecServerError { let parsed = serde_json::from_str::(body).ok(); let (code, message) = parsed .and_then(|body| body.error) .map(|error| { ( error.code, error.message.unwrap_or_else(|| { preview_error_body(body).unwrap_or_else(|| "empty error body".to_string()) }), ) }) .unwrap_or_else(|| { ( None, preview_error_body(body) .unwrap_or_else(|| "empty or malformed error body".to_string()), ) }); ExecServerError::CloudEnvironmentHttp { status, code, message, } } fn preview_error_body(body: &str) -> Option { let trimmed = body.trim(); if trimmed.is_empty() { return None; } Some(trimmed.chars().take(ERROR_BODY_PREVIEW_BYTES).collect()) } #[cfg(test)] mod tests { use codex_login::CodexAuth; use pretty_assertions::assert_eq; use serde_json::json; use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; use wiremock::matchers::body_json; use wiremock::matchers::header; use wiremock::matchers::method; use wiremock::matchers::path; use super::*; fn auth_manager() -> Arc { AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()) } #[tokio::test] async fn register_executor_posts_with_chatgpt_auth_headers() { let server = MockServer::start().await; let registration_id = Uuid::from_u128(1); let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); let request = RemoteExecutorConfig::new(server.uri()) .registration_request(&auth, registration_id) .expect("registration request"); let expected_request = serde_json::to_value(&request).expect("serialize request"); Mock::given(method("POST")) .and(path("/api/cloud/executor")) .and(header("authorization", "Bearer Access Token")) .and(header("chatgpt-account-id", "account_id")) .and(body_json(expected_request)) .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "id": "exec-1", "environment_id": "env-1", "url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc" }))) .mount(&server) .await; let client = CloudEnvironmentClient::new(server.uri(), auth_manager()).expect("client"); let response = client .register_executor(&request) .await .expect("register executor"); assert_eq!( response, CloudEnvironmentExecutorRegistrationResponse { id: "exec-1".to_string(), environment_id: "env-1".to_string(), url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(), } ); } }