Compare commits

...

2 Commits

Author SHA1 Message Date
starr-openai
021c8b0a36 Thread environment selection through core spawn
Co-authored-by: Codex <noreply@openai.com>
2026-04-15 15:06:47 -07:00
starr-openai
fd9082c392 Add environment registry to exec-server
Co-authored-by: Codex <noreply@openai.com>
2026-04-15 15:06:35 -07:00
9 changed files with 567 additions and 46 deletions

View File

@@ -427,6 +427,7 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) models_manager: Arc<ModelsManager>,
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) environment_id: Option<String>,
pub(crate) skills_manager: Arc<SkillsManager>,
pub(crate) plugins_manager: Arc<PluginsManager>,
pub(crate) mcp_manager: Arc<McpManager>,
@@ -480,6 +481,7 @@ impl Codex {
auth_manager,
models_manager,
environment_manager,
environment_id,
skills_manager,
plugins_manager,
mcp_manager,
@@ -500,7 +502,7 @@ impl Codex {
let (tx_event, rx_event) = async_channel::unbounded();
let environment = environment_manager
.current()
.environment(environment_id.as_deref())
.await
.map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?;
let fs = environment

View File

@@ -82,6 +82,7 @@ pub(crate) async fn run_codex_thread_interactive(
environment_manager: Arc::new(EnvironmentManager::from_environment(
parent_ctx.environment.as_deref(),
)),
environment_id: None,
skills_manager: Arc::clone(&parent_session.services.skills_manager),
plugins_manager: Arc::clone(&parent_session.services.plugins_manager),
mcp_manager: Arc::clone(&parent_session.services.mcp_manager),

View File

@@ -435,6 +435,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
auth_manager,
models_manager,
environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),
environment_id: None,
skills_manager,
plugins_manager,
mcp_manager,

View File

@@ -471,6 +471,7 @@ impl ThreadManager {
config,
Vec::new(),
/*persist_extended_history*/ false,
/*environment_id*/ None,
))
.await
}
@@ -480,6 +481,7 @@ impl ThreadManager {
config: Config,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
environment_id: Option<String>,
) -> CodexResult<NewThread> {
Box::pin(self.start_thread_with_tools_and_service_name(
config,
@@ -488,6 +490,7 @@ impl ThreadManager {
persist_extended_history,
/*metrics_service_name*/ None,
/*parent_trace*/ None,
environment_id,
))
.await
}
@@ -500,6 +503,7 @@ impl ThreadManager {
persist_extended_history: bool,
metrics_service_name: Option<String>,
parent_trace: Option<W3cTraceContext>,
environment_id: Option<String>,
) -> CodexResult<NewThread> {
Box::pin(self.state.spawn_thread(
config,
@@ -511,6 +515,7 @@ impl ThreadManager {
metrics_service_name,
parent_trace,
/*user_shell_override*/ None,
environment_id,
))
.await
}
@@ -551,6 +556,7 @@ impl ThreadManager {
/*metrics_service_name*/ None,
parent_trace,
/*user_shell_override*/ None,
/*environment_id*/ None,
))
.await
}
@@ -570,6 +576,7 @@ impl ThreadManager {
/*metrics_service_name*/ None,
/*parent_trace*/ None,
/*user_shell_override*/ Some(user_shell_override),
/*environment_id*/ None,
))
.await
}
@@ -592,6 +599,7 @@ impl ThreadManager {
/*metrics_service_name*/ None,
/*parent_trace*/ None,
/*user_shell_override*/ Some(user_shell_override),
/*environment_id*/ None,
))
.await
}
@@ -700,6 +708,7 @@ impl ThreadManager {
/*metrics_service_name*/ None,
parent_trace,
/*user_shell_override*/ None,
/*environment_id*/ None,
))
.await
}
@@ -801,6 +810,7 @@ impl ThreadManagerState {
inherited_exec_policy,
/*parent_trace*/ None,
/*user_shell_override*/ None,
/*environment_id*/ None,
))
.await
}
@@ -828,6 +838,7 @@ impl ThreadManagerState {
inherited_exec_policy,
/*parent_trace*/ None,
/*user_shell_override*/ None,
/*environment_id*/ None,
))
.await
}
@@ -856,6 +867,7 @@ impl ThreadManagerState {
inherited_exec_policy,
/*parent_trace*/ None,
/*user_shell_override*/ None,
/*environment_id*/ None,
))
.await
}
@@ -873,6 +885,7 @@ impl ThreadManagerState {
metrics_service_name: Option<String>,
parent_trace: Option<W3cTraceContext>,
user_shell_override: Option<crate::shell::Shell>,
environment_id: Option<String>,
) -> CodexResult<NewThread> {
Box::pin(self.spawn_thread_with_source(
config,
@@ -887,6 +900,7 @@ impl ThreadManagerState {
/*inherited_exec_policy*/ None,
parent_trace,
user_shell_override,
environment_id,
))
.await
}
@@ -906,10 +920,11 @@ impl ThreadManagerState {
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
parent_trace: Option<W3cTraceContext>,
user_shell_override: Option<crate::shell::Shell>,
environment_id: Option<String>,
) -> CodexResult<NewThread> {
let environment = self
.environment_manager
.current()
.environment(environment_id.as_deref())
.await
.map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?;
let watch_registration = match environment.as_ref() {
@@ -932,6 +947,7 @@ impl ThreadManagerState {
auth_manager,
models_manager: Arc::clone(&self.models_manager),
environment_manager: Arc::clone(&self.environment_manager),
environment_id,
skills_manager: Arc::clone(&self.skills_manager),
plugins_manager: Arc::clone(&self.plugins_manager),
mcp_manager: Arc::clone(&self.mcp_manager),

View File

@@ -202,6 +202,8 @@ pub struct TestCodexBuilder {
workspace_setups: Vec<Box<WorkspaceSetup>>,
home: Option<Arc<TempDir>>,
user_shell_override: Option<Shell>,
environment_manager_override: Option<Arc<codex_exec_server::EnvironmentManager>>,
thread_environment_id: Option<String>,
}
impl TestCodexBuilder {
@@ -253,6 +255,19 @@ impl TestCodexBuilder {
self
}
pub fn with_environment_manager(
mut self,
environment_manager: Arc<codex_exec_server::EnvironmentManager>,
) -> Self {
self.environment_manager_override = Some(environment_manager);
self
}
pub fn with_thread_environment_id(mut self, environment_id: impl Into<String>) -> Self {
self.thread_environment_id = Some(environment_id.into());
self
}
pub fn with_windows_cmd_shell(self) -> Self {
if cfg!(windows) {
self.with_user_shell(get_shell_by_model_provided_path(&PathBuf::from("cmd.exe")))
@@ -348,9 +363,17 @@ impl TestCodexBuilder {
let (config, fallback_cwd) = self
.prepare_config(base_url, &home, test_env.cwd().clone())
.await?;
let environment_manager = Arc::new(codex_exec_server::EnvironmentManager::new(
test_env.exec_server_url().map(str::to_owned),
));
let environment_manager = self
.environment_manager_override
.clone()
.unwrap_or_else(|| {
Arc::new(codex_exec_server::EnvironmentManager::new(
test_env.exec_server_url().map(str::to_owned),
))
});
let selected_environment = environment_manager
.environment(self.thread_environment_id.as_deref())
.await?;
let file_system = test_env.environment().get_filesystem();
let mut workspace_setups = vec![];
swap(&mut self.workspace_setups, &mut workspace_setups);
@@ -365,6 +388,7 @@ impl TestCodexBuilder {
resume_from,
test_env,
environment_manager,
selected_environment,
))
.await
}
@@ -377,6 +401,7 @@ impl TestCodexBuilder {
resume_from: Option<PathBuf>,
test_env: TestEnv,
environment_manager: Arc<codex_exec_server::EnvironmentManager>,
selected_environment: Option<Arc<codex_exec_server::Environment>>,
) -> anyhow::Result<TestCodex> {
let auth = self.auth.clone();
let thread_manager = if config.model_catalog.is_some() {
@@ -398,8 +423,15 @@ impl TestCodexBuilder {
};
let thread_manager = Arc::new(thread_manager);
let user_shell_override = self.user_shell_override.clone();
let thread_environment_id = self.thread_environment_id.clone();
let new_conversation = match (resume_from, user_shell_override) {
(Some(_), _) if thread_environment_id.is_some() => {
anyhow::bail!("test harness does not support resuming with thread_environment_id")
}
(_, Some(_)) if thread_environment_id.is_some() => anyhow::bail!(
"test harness does not support user_shell_override with thread_environment_id"
),
(Some(path), Some(user_shell_override)) => {
let auth_manager = codex_core::test_support::auth_manager_from_auth(auth);
Box::pin(
@@ -433,7 +465,15 @@ impl TestCodexBuilder {
)
.await?
}
(None, None) => Box::pin(thread_manager.start_thread(config.clone())).await?,
(None, None) => {
Box::pin(thread_manager.start_thread_with_tools(
config.clone(),
Vec::new(),
/*persist_extended_history*/ false,
thread_environment_id,
))
.await?
}
};
Ok(TestCodex {
@@ -443,6 +483,7 @@ impl TestCodexBuilder {
codex: new_conversation.thread,
session_configured: new_conversation.session_configured,
thread_manager,
selected_environment,
_test_env: test_env,
})
}
@@ -533,6 +574,7 @@ pub struct TestCodex {
pub session_configured: SessionConfiguredEvent,
pub config: Config,
pub thread_manager: Arc<ThreadManager>,
selected_environment: Option<Arc<codex_exec_server::Environment>>,
_test_env: TestEnv,
}
@@ -553,6 +595,10 @@ impl TestCodex {
&self._test_env
}
pub fn selected_environment(&self) -> Option<&codex_exec_server::Environment> {
self.selected_environment.as_deref()
}
pub fn fs(&self) -> Arc<dyn ExecutorFileSystem> {
self._test_env.environment().get_filesystem()
}
@@ -879,6 +925,8 @@ pub fn test_codex() -> TestCodexBuilder {
workspace_setups: vec![],
home: None,
user_shell_override: None,
environment_manager_override: None,
thread_environment_id: None,
}
}

View File

@@ -2364,6 +2364,7 @@ async fn code_mode_can_call_hidden_dynamic_tools() -> Result<()> {
defer_loading: true,
}],
/*persist_extended_history*/ false,
/*environment_id*/ None,
)
.await?;
let mut test = base_test;

View File

@@ -2,6 +2,8 @@ use anyhow::Context;
use anyhow::Result;
use codex_exec_server::CopyOptions;
use codex_exec_server::CreateDirectoryOptions;
use codex_exec_server::EnvironmentConfig;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::FileSystemSandboxContext;
use codex_exec_server::RemoveOptions;
use codex_protocol::protocol::ReadOnlyAccess;
@@ -9,14 +11,137 @@ use codex_protocol::protocol::SandboxPolicy;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::PathBufExt;
use core_test_support::get_remote_test_env;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::test_codex::test_env;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use std::process::Command;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
const REMOTE_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_TEST_REMOTE_EXEC_SERVER_URL";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn thread_can_start_and_complete_turn_with_disabled_default_environment() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let _mock = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-disabled"),
ev_assistant_message("msg-disabled", "done"),
ev_completed("resp-disabled"),
]),
)
.await;
let environment_manager = Arc::new(EnvironmentManager::new(Some("none".to_string())));
let mut builder = test_codex().with_environment_manager(environment_manager);
let test = builder.build(&server).await?;
assert!(test.selected_environment().is_none());
test.submit_turn("hello from disabled env").await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn thread_can_start_and_complete_turn_with_named_local_environment() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let _mock = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-local"),
ev_assistant_message("msg-local", "done"),
ev_completed("resp-local"),
]),
)
.await;
let environment_manager = Arc::new(EnvironmentManager::new(/*exec_server_url*/ None));
environment_manager
.register_environment(
"local".to_string(),
EnvironmentConfig {
exec_server_url: None,
},
)
.await?;
let mut builder = test_codex()
.with_environment_manager(environment_manager)
.with_thread_environment_id("local");
let test = builder.build(&server).await?;
let selected_environment = test
.selected_environment()
.context("named local environment should resolve")?;
assert!(!selected_environment.is_remote());
test.submit_turn("hello from named local env").await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn thread_can_start_and_complete_turn_with_named_remote_environment() -> Result<()> {
skip_if_no_network!(Ok(()));
let Some(_remote_env) = get_remote_test_env() else {
return Ok(());
};
let remote_exec_server_url =
std::env::var(REMOTE_EXEC_SERVER_URL_ENV_VAR).with_context(|| {
format!(
"{REMOTE_EXEC_SERVER_URL_ENV_VAR} must be set for named remote environment tests"
)
})?;
let server = start_mock_server().await;
let _mock = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-remote"),
ev_assistant_message("msg-remote", "done"),
ev_completed("resp-remote"),
]),
)
.await;
let environment_manager = Arc::new(EnvironmentManager::new(/*exec_server_url*/ None));
environment_manager
.register_environment(
"remote".to_string(),
EnvironmentConfig {
exec_server_url: Some(remote_exec_server_url),
},
)
.await?;
let mut builder = test_codex()
.with_environment_manager(environment_manager)
.with_thread_environment_id("remote");
let test = builder.build_remote_aware(&server).await?;
let selected_environment = test
.selected_environment()
.context("named remote environment should resolve")?;
assert!(selected_environment.is_remote());
test.submit_turn("hello from named remote env").await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_test_env_can_connect_and_use_filesystem() -> Result<()> {
let Some(_remote_env) = get_remote_test_env() else {

View File

@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio::sync::RwLock;
use crate::ExecServerClient;
use crate::ExecServerError;
@@ -15,16 +17,31 @@ use crate::remote_process::RemoteProcess;
pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
pub type EnvironmentId = String;
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct EnvironmentConfig {
pub exec_server_url: Option<String>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RegisteredEnvironment {
pub environment_id: EnvironmentId,
pub config: EnvironmentConfig,
}
/// Lazily creates and caches the active environment for a session.
///
/// The manager keeps the session's environment selection stable so subagents
/// and follow-up turns preserve an explicit disabled state.
#[derive(Debug)]
pub struct EnvironmentManager {
exec_server_url: Option<String>,
default_environment_config: EnvironmentConfig,
local_runtime_paths: Option<ExecServerRuntimePaths>,
disabled: bool,
current_environment: OnceCell<Option<Arc<Environment>>>,
default_disabled: bool,
default_environment: OnceCell<Option<Arc<Environment>>>,
environment_configs: RwLock<HashMap<EnvironmentId, EnvironmentConfig>>,
environment_cache: RwLock<HashMap<EnvironmentId, Arc<OnceCell<Option<Arc<Environment>>>>>>,
}
impl Default for EnvironmentManager {
@@ -45,12 +62,14 @@ impl EnvironmentManager {
exec_server_url: Option<String>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Self {
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
let (exec_server_url, default_disabled) = normalize_exec_server_url(exec_server_url);
Self {
exec_server_url,
default_environment_config: EnvironmentConfig { exec_server_url },
local_runtime_paths,
disabled,
current_environment: OnceCell::new(),
default_disabled,
default_environment: OnceCell::new(),
environment_configs: RwLock::new(HashMap::new()),
environment_cache: RwLock::new(HashMap::new()),
}
}
@@ -74,51 +93,168 @@ impl EnvironmentManager {
/// disabled mode when no environment is available.
pub fn from_environment(environment: Option<&Environment>) -> Self {
match environment {
Some(environment) => Self {
exec_server_url: environment.exec_server_url().map(str::to_owned),
local_runtime_paths: environment.local_runtime_paths().cloned(),
disabled: false,
current_environment: OnceCell::new(),
},
Some(environment) => Self::new_with_runtime_paths(
environment.exec_server_url().map(str::to_owned),
environment.local_runtime_paths().cloned(),
),
None => Self {
exec_server_url: None,
default_environment_config: EnvironmentConfig::default(),
local_runtime_paths: None,
disabled: true,
current_environment: OnceCell::new(),
default_disabled: true,
default_environment: OnceCell::new(),
environment_configs: RwLock::new(HashMap::new()),
environment_cache: RwLock::new(HashMap::new()),
},
}
}
/// Returns the remote exec-server URL when one is configured.
/// Returns the default remote exec-server URL when one is configured.
pub fn exec_server_url(&self) -> Option<&str> {
self.exec_server_url.as_deref()
self.default_environment_config.exec_server_url.as_deref()
}
/// Returns true when this manager is configured to use a remote exec server.
/// Returns true when the default environment is configured to use a remote exec server.
pub fn is_remote(&self) -> bool {
self.exec_server_url.is_some()
self.default_environment_config.exec_server_url.is_some()
}
pub async fn register_environment(
&self,
environment_id: EnvironmentId,
config: EnvironmentConfig,
) -> Result<(), ExecServerError> {
let environment_id = normalize_registered_environment_id(environment_id)?;
let (exec_server_url, disabled) = normalize_exec_server_url(config.exec_server_url);
if disabled {
return Err(ExecServerError::Protocol(
"named environments cannot use the reserved disabled value 'none'".to_string(),
));
}
let config = EnvironmentConfig { exec_server_url };
self.environment_configs
.write()
.await
.insert(environment_id.clone(), config);
self.environment_cache
.write()
.await
.insert(environment_id, Arc::new(OnceCell::new()));
Ok(())
}
pub async fn list_environments(
&self,
cursor: Option<&str>,
limit: Option<u32>,
) -> (Vec<RegisteredEnvironment>, Option<String>) {
let cursor = normalize_environment_id(cursor).map(str::to_owned);
let limit = usize::try_from(limit.unwrap_or(100)).unwrap_or(100);
let configs = self.environment_configs.read().await;
let mut environment_ids = configs.keys().cloned().collect::<Vec<_>>();
environment_ids.sort();
let start_index = cursor
.as_ref()
.and_then(|cursor| environment_ids.iter().position(|id| id == cursor))
.map_or(0, |index| index.saturating_add(1));
let selected_ids = environment_ids
.into_iter()
.skip(start_index)
.take(limit.saturating_add(1))
.collect::<Vec<_>>();
let has_more = selected_ids.len() > limit;
let next_cursor = if has_more {
selected_ids.get(limit.saturating_sub(1)).cloned()
} else {
None
};
let data = selected_ids
.into_iter()
.take(limit)
.filter_map(|environment_id| {
configs
.get(&environment_id)
.cloned()
.map(|config| RegisteredEnvironment {
environment_id,
config,
})
})
.collect();
(data, next_cursor)
}
/// Returns the cached environment, creating it on first access.
pub async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
self.current_environment
pub async fn environment(
&self,
environment_id: Option<&str>,
) -> Result<Option<Arc<Environment>>, ExecServerError> {
match normalize_environment_id(environment_id) {
None => self.default_environment().await,
Some(environment_id) => self.named_environment(environment_id).await,
}
}
async fn default_environment(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
self.default_environment
.get_or_try_init(|| async {
if self.disabled {
if self.default_disabled {
Ok(None)
} else {
Ok(Some(Arc::new(
Environment::create_with_runtime_paths(
self.exec_server_url.clone(),
self.local_runtime_paths.clone(),
)
.await?,
)))
self.build_environment(&self.default_environment_config)
.await
}
})
.await
.map(Option::as_ref)
.map(std::option::Option::<&Arc<Environment>>::cloned)
}
async fn named_environment(
&self,
environment_id: &str,
) -> Result<Option<Arc<Environment>>, ExecServerError> {
let config = self
.environment_configs
.read()
.await
.get(environment_id)
.cloned()
.ok_or_else(|| {
ExecServerError::Protocol(format!("unknown environment id: {environment_id}"))
})?;
let environment_cell = {
let mut environment_cache = self.environment_cache.write().await;
Arc::clone(
environment_cache
.entry(environment_id.to_string())
.or_insert_with(|| Arc::new(OnceCell::new())),
)
};
environment_cell
.get_or_try_init(|| async { self.build_environment(&config).await })
.await
.map(Option::as_ref)
.map(std::option::Option::<&Arc<Environment>>::cloned)
}
async fn build_environment(
&self,
config: &EnvironmentConfig,
) -> Result<Option<Arc<Environment>>, ExecServerError> {
Ok(Some(Arc::new(
Environment::create_with_runtime_paths(
config.exec_server_url.clone(),
self.local_runtime_paths.clone(),
)
.await?,
)))
}
}
/// Concrete execution/filesystem environment selected for a session.
@@ -236,12 +372,33 @@ fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Option<String>
Some(url) => (Some(url.to_string()), false),
}
}
fn normalize_environment_id(environment_id: Option<&str>) -> Option<&str> {
match environment_id.map(str::trim) {
None | Some("") => None,
Some(environment_id) if environment_id.eq_ignore_ascii_case("default") => None,
Some(environment_id) => Some(environment_id),
}
}
fn normalize_registered_environment_id(
environment_id: String,
) -> Result<EnvironmentId, ExecServerError> {
match normalize_environment_id(Some(&environment_id)) {
None => Err(ExecServerError::Protocol(
"environment id is reserved for the default environment".to_string(),
)),
Some(environment_id) => Ok(environment_id.to_string()),
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::Environment;
use super::EnvironmentConfig;
use super::EnvironmentManager;
use super::RegisteredEnvironment;
use crate::ExecServerRuntimePaths;
use crate::ProcessId;
use pretty_assertions::assert_eq;
@@ -260,7 +417,6 @@ mod tests {
fn environment_manager_normalizes_empty_url() {
let manager = EnvironmentManager::new(Some(String::new()));
assert!(!manager.disabled);
assert_eq!(manager.exec_server_url(), None);
assert!(!manager.is_remote());
}
@@ -269,7 +425,6 @@ mod tests {
fn environment_manager_treats_none_value_as_disabled() {
let manager = EnvironmentManager::new(Some("none".to_string()));
assert!(manager.disabled);
assert_eq!(manager.exec_server_url(), None);
assert!(!manager.is_remote());
}
@@ -283,11 +438,17 @@ mod tests {
}
#[tokio::test]
async fn environment_manager_current_caches_environment() {
async fn environment_manager_default_environment_caches_environment() {
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
let first = manager.current().await.expect("get current environment");
let second = manager.current().await.expect("get current environment");
let first = manager
.environment(None)
.await
.expect("get default environment");
let second = manager
.environment(None)
.await
.expect("get default environment");
let first = first.expect("local environment");
let second = second.expect("local environment");
@@ -308,9 +469,9 @@ mod tests {
);
let environment = manager
.current()
.environment(None)
.await
.expect("get current environment")
.expect("get default environment")
.expect("local environment");
assert_eq!(environment.local_runtime_paths(), Some(&runtime_paths));
@@ -321,18 +482,182 @@ mod tests {
}
#[tokio::test]
async fn disabled_environment_manager_has_no_current_environment() {
async fn disabled_environment_manager_has_no_default_environment() {
let manager = EnvironmentManager::new(Some("none".to_string()));
assert!(
manager
.current()
.environment(None)
.await
.expect("get current environment")
.expect("get default environment")
.is_none()
);
}
#[tokio::test]
async fn environment_manager_named_environment_caches_environment() {
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
manager
.register_environment(
"dev".to_string(),
EnvironmentConfig {
exec_server_url: None,
},
)
.await
.expect("register environment");
let first = manager
.environment(Some("dev"))
.await
.expect("get named environment")
.expect("local environment");
let second = manager
.environment(Some("dev"))
.await
.expect("get named environment")
.expect("local environment");
assert!(Arc::ptr_eq(&first, &second));
}
#[tokio::test]
async fn environment_manager_rejects_unknown_environment_id() {
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
let error = manager
.environment(Some("missing"))
.await
.expect_err("unknown environment id should error");
assert_eq!(
error.to_string(),
"exec-server protocol error: unknown environment id: missing"
);
}
#[tokio::test]
async fn environment_manager_treats_default_environment_id_as_default() {
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
let first = manager
.environment(None)
.await
.expect("get default environment")
.expect("local environment");
let second = manager
.environment(Some("default"))
.await
.expect("get default environment")
.expect("local environment");
assert!(Arc::ptr_eq(&first, &second));
}
#[tokio::test]
async fn environment_manager_rejects_registering_default_environment_id() {
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
let error = manager
.register_environment(
"default".to_string(),
EnvironmentConfig {
exec_server_url: None,
},
)
.await
.expect_err("default id should be reserved");
assert_eq!(
error.to_string(),
"exec-server protocol error: environment id is reserved for the default environment"
);
}
#[tokio::test]
async fn environment_manager_rejects_registering_disabled_named_environment() {
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
let error = manager
.register_environment(
"disabled".to_string(),
EnvironmentConfig {
exec_server_url: Some("none".to_string()),
},
)
.await
.expect_err("named disabled environment should be rejected");
assert_eq!(
error.to_string(),
"exec-server protocol error: named environments cannot use the reserved disabled value 'none'"
);
}
#[tokio::test]
async fn environment_manager_lists_named_environments_sorted_with_pagination() {
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
manager
.register_environment(
"beta".to_string(),
EnvironmentConfig {
exec_server_url: Some("ws://127.0.0.1:8765".to_string()),
},
)
.await
.expect("register beta environment");
manager
.register_environment(
"alpha".to_string(),
EnvironmentConfig {
exec_server_url: None,
},
)
.await
.expect("register alpha environment");
manager
.register_environment(
"gamma".to_string(),
EnvironmentConfig {
exec_server_url: None,
},
)
.await
.expect("register gamma environment");
let (first_page, next_cursor) = manager.list_environments(/*cursor*/ None, Some(2)).await;
assert_eq!(
first_page,
vec![
RegisteredEnvironment {
environment_id: "alpha".to_string(),
config: EnvironmentConfig {
exec_server_url: None,
},
},
RegisteredEnvironment {
environment_id: "beta".to_string(),
config: EnvironmentConfig {
exec_server_url: Some("ws://127.0.0.1:8765".to_string()),
},
},
]
);
assert_eq!(next_cursor, Some("beta".to_string()));
let (second_page, next_cursor) = manager.list_environments(Some("beta"), Some(2)).await;
assert_eq!(
second_page,
vec![RegisteredEnvironment {
environment_id: "gamma".to_string(),
config: EnvironmentConfig {
exec_server_url: None,
},
}]
);
assert_eq!(next_cursor, None);
}
#[tokio::test]
async fn default_environment_has_ready_local_executor() {
let environment = Environment::default();

View File

@@ -24,7 +24,9 @@ pub use client_api::ExecServerClientConnectOptions;
pub use client_api::RemoteExecServerConnectArgs;
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
pub use environment::Environment;
pub use environment::EnvironmentConfig;
pub use environment::EnvironmentManager;
pub use environment::RegisteredEnvironment;
pub use file_system::CopyOptions;
pub use file_system::CreateDirectoryOptions;
pub use file_system::ExecutorFileSystem;