mirror of
https://github.com/openai/codex.git
synced 2026-04-30 19:32:04 +03:00
Add cached environment manager for exec server URL (#15785)
Add environment manager that is a singleton and is created early in app-server (before skill manager, before config loading). Use an environment variable to point to a running exec server.
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
use crate::ExecServerClient;
|
||||
use crate::ExecServerError;
|
||||
use crate::RemoteExecServerConnectArgs;
|
||||
@@ -10,13 +12,49 @@ use crate::process::ExecProcess;
|
||||
use crate::remote_file_system::RemoteFileSystem;
|
||||
use crate::remote_process::RemoteProcess;
|
||||
|
||||
pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
|
||||
|
||||
pub trait ExecutorEnvironment: Send + Sync {
|
||||
fn get_executor(&self) -> Arc<dyn ExecProcess>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EnvironmentManager {
|
||||
exec_server_url: Option<String>,
|
||||
current_environment: OnceCell<Arc<Environment>>,
|
||||
}
|
||||
|
||||
impl EnvironmentManager {
|
||||
pub fn new(exec_server_url: Option<String>) -> Self {
|
||||
Self {
|
||||
exec_server_url: normalize_exec_server_url(exec_server_url),
|
||||
current_environment: OnceCell::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_env() -> Self {
|
||||
Self::new(std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok())
|
||||
}
|
||||
|
||||
pub fn exec_server_url(&self) -> Option<&str> {
|
||||
self.exec_server_url.as_deref()
|
||||
}
|
||||
|
||||
pub async fn current(&self) -> Result<Arc<Environment>, ExecServerError> {
|
||||
self.current_environment
|
||||
.get_or_try_init(|| async {
|
||||
Ok(Arc::new(
|
||||
Environment::create(self.exec_server_url.clone()).await?,
|
||||
))
|
||||
})
|
||||
.await
|
||||
.map(Arc::clone)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Environment {
|
||||
experimental_exec_server_url: Option<String>,
|
||||
exec_server_url: Option<String>,
|
||||
remote_exec_server_client: Option<ExecServerClient>,
|
||||
executor: Arc<dyn ExecProcess>,
|
||||
}
|
||||
@@ -32,7 +70,7 @@ impl Default for Environment {
|
||||
}
|
||||
|
||||
Self {
|
||||
experimental_exec_server_url: None,
|
||||
exec_server_url: None,
|
||||
remote_exec_server_client: None,
|
||||
executor: Arc::new(local_process),
|
||||
}
|
||||
@@ -42,19 +80,15 @@ impl Default for Environment {
|
||||
impl std::fmt::Debug for Environment {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Environment")
|
||||
.field(
|
||||
"experimental_exec_server_url",
|
||||
&self.experimental_exec_server_url,
|
||||
)
|
||||
.field("exec_server_url", &self.exec_server_url)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl Environment {
|
||||
pub async fn create(
|
||||
experimental_exec_server_url: Option<String>,
|
||||
) -> Result<Self, ExecServerError> {
|
||||
let remote_exec_server_client = if let Some(url) = &experimental_exec_server_url {
|
||||
pub async fn create(exec_server_url: Option<String>) -> Result<Self, ExecServerError> {
|
||||
let exec_server_url = normalize_exec_server_url(exec_server_url);
|
||||
let remote_exec_server_client = if let Some(url) = &exec_server_url {
|
||||
Some(
|
||||
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
|
||||
websocket_url: url.clone(),
|
||||
@@ -83,14 +117,14 @@ impl Environment {
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
experimental_exec_server_url,
|
||||
exec_server_url,
|
||||
remote_exec_server_client,
|
||||
executor,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn experimental_exec_server_url(&self) -> Option<&str> {
|
||||
self.experimental_exec_server_url.as_deref()
|
||||
pub fn exec_server_url(&self) -> Option<&str> {
|
||||
self.exec_server_url.as_deref()
|
||||
}
|
||||
|
||||
pub fn get_executor(&self) -> Arc<dyn ExecProcess> {
|
||||
@@ -106,6 +140,13 @@ impl Environment {
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_exec_server_url(exec_server_url: Option<String>) -> Option<String> {
|
||||
exec_server_url.and_then(|url| {
|
||||
let url = url.trim();
|
||||
(!url.is_empty()).then(|| url.to_string())
|
||||
})
|
||||
}
|
||||
|
||||
impl ExecutorEnvironment for Environment {
|
||||
fn get_executor(&self) -> Arc<dyn ExecProcess> {
|
||||
Arc::clone(&self.executor)
|
||||
@@ -114,17 +155,39 @@ impl ExecutorEnvironment for Environment {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::Environment;
|
||||
use super::EnvironmentManager;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_without_remote_exec_server_url_does_not_connect() {
|
||||
let environment = Environment::create(None).await.expect("create environment");
|
||||
let environment = Environment::create(/*exec_server_url*/ None)
|
||||
.await
|
||||
.expect("create environment");
|
||||
|
||||
assert_eq!(environment.experimental_exec_server_url(), None);
|
||||
assert_eq!(environment.exec_server_url(), None);
|
||||
assert!(environment.remote_exec_server_client.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn environment_manager_normalizes_empty_url() {
|
||||
let manager = EnvironmentManager::new(Some(String::new()));
|
||||
|
||||
assert_eq!(manager.exec_server_url(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_current_caches_environment() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let first = manager.current().await.expect("get current environment");
|
||||
let second = manager.current().await.expect("get current environment");
|
||||
|
||||
assert!(Arc::ptr_eq(&first, &second));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_environment_has_ready_local_executor() {
|
||||
let environment = Environment::default();
|
||||
|
||||
@@ -30,7 +30,9 @@ pub use codex_app_server_protocol::FsRemoveParams;
|
||||
pub use codex_app_server_protocol::FsRemoveResponse;
|
||||
pub use codex_app_server_protocol::FsWriteFileParams;
|
||||
pub use codex_app_server_protocol::FsWriteFileResponse;
|
||||
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
|
||||
pub use environment::Environment;
|
||||
pub use environment::EnvironmentManager;
|
||||
pub use environment::ExecutorEnvironment;
|
||||
pub use file_system::CopyOptions;
|
||||
pub use file_system::CreateDirectoryOptions;
|
||||
|
||||
@@ -10,6 +10,7 @@ use codex_app_server_protocol::FsRemoveParams;
|
||||
use codex_app_server_protocol::FsWriteFileParams;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use tokio::io;
|
||||
use tracing::trace;
|
||||
|
||||
use crate::CopyOptions;
|
||||
use crate::CreateDirectoryOptions;
|
||||
@@ -30,6 +31,7 @@ pub(crate) struct RemoteFileSystem {
|
||||
|
||||
impl RemoteFileSystem {
|
||||
pub(crate) fn new(client: ExecServerClient) -> Self {
|
||||
trace!("remote fs new");
|
||||
Self { client }
|
||||
}
|
||||
}
|
||||
@@ -37,6 +39,7 @@ impl RemoteFileSystem {
|
||||
#[async_trait]
|
||||
impl ExecutorFileSystem for RemoteFileSystem {
|
||||
async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult<Vec<u8>> {
|
||||
trace!("remote fs read_file");
|
||||
let response = self
|
||||
.client
|
||||
.fs_read_file(FsReadFileParams { path: path.clone() })
|
||||
@@ -51,6 +54,7 @@ impl ExecutorFileSystem for RemoteFileSystem {
|
||||
}
|
||||
|
||||
async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec<u8>) -> FileSystemResult<()> {
|
||||
trace!("remote fs write_file");
|
||||
self.client
|
||||
.fs_write_file(FsWriteFileParams {
|
||||
path: path.clone(),
|
||||
@@ -66,6 +70,7 @@ impl ExecutorFileSystem for RemoteFileSystem {
|
||||
path: &AbsolutePathBuf,
|
||||
options: CreateDirectoryOptions,
|
||||
) -> FileSystemResult<()> {
|
||||
trace!("remote fs create_directory");
|
||||
self.client
|
||||
.fs_create_directory(FsCreateDirectoryParams {
|
||||
path: path.clone(),
|
||||
@@ -77,6 +82,7 @@ impl ExecutorFileSystem for RemoteFileSystem {
|
||||
}
|
||||
|
||||
async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult<FileMetadata> {
|
||||
trace!("remote fs get_metadata");
|
||||
let response = self
|
||||
.client
|
||||
.fs_get_metadata(FsGetMetadataParams { path: path.clone() })
|
||||
@@ -94,6 +100,7 @@ impl ExecutorFileSystem for RemoteFileSystem {
|
||||
&self,
|
||||
path: &AbsolutePathBuf,
|
||||
) -> FileSystemResult<Vec<ReadDirectoryEntry>> {
|
||||
trace!("remote fs read_directory");
|
||||
let response = self
|
||||
.client
|
||||
.fs_read_directory(FsReadDirectoryParams { path: path.clone() })
|
||||
@@ -111,6 +118,7 @@ impl ExecutorFileSystem for RemoteFileSystem {
|
||||
}
|
||||
|
||||
async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()> {
|
||||
trace!("remote fs remove");
|
||||
self.client
|
||||
.fs_remove(FsRemoveParams {
|
||||
path: path.clone(),
|
||||
@@ -128,6 +136,7 @@ impl ExecutorFileSystem for RemoteFileSystem {
|
||||
destination_path: &AbsolutePathBuf,
|
||||
options: CopyOptions,
|
||||
) -> FileSystemResult<()> {
|
||||
trace!("remote fs copy");
|
||||
self.client
|
||||
.fs_copy(FsCopyParams {
|
||||
source_path: source_path.clone(),
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use async_trait::async_trait;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::trace;
|
||||
|
||||
use crate::ExecProcess;
|
||||
use crate::ExecServerClient;
|
||||
@@ -19,6 +20,7 @@ pub(crate) struct RemoteProcess {
|
||||
|
||||
impl RemoteProcess {
|
||||
pub(crate) fn new(client: ExecServerClient) -> Self {
|
||||
trace!("remote process new");
|
||||
Self { client }
|
||||
}
|
||||
}
|
||||
@@ -26,10 +28,12 @@ impl RemoteProcess {
|
||||
#[async_trait]
|
||||
impl ExecProcess for RemoteProcess {
|
||||
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
|
||||
trace!("remote process start");
|
||||
self.client.exec(params).await
|
||||
}
|
||||
|
||||
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
|
||||
trace!("remote process read");
|
||||
self.client.read(params).await
|
||||
}
|
||||
|
||||
@@ -38,14 +42,17 @@ impl ExecProcess for RemoteProcess {
|
||||
process_id: &str,
|
||||
chunk: Vec<u8>,
|
||||
) -> Result<WriteResponse, ExecServerError> {
|
||||
trace!("remote process write");
|
||||
self.client.write(process_id, chunk).await
|
||||
}
|
||||
|
||||
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
|
||||
trace!("remote process terminate");
|
||||
self.client.terminate(process_id).await
|
||||
}
|
||||
|
||||
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
|
||||
trace!("remote process subscribe_events");
|
||||
self.client.event_receiver()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user