mirror of
https://github.com/openai/codex.git
synced 2026-04-26 09:21:02 +03:00
Compare commits
2 Commits
pr17693
...
starr/env-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
021c8b0a36 | ||
|
|
fd9082c392 |
@@ -427,6 +427,7 @@ pub(crate) struct CodexSpawnArgs {
|
||||
pub(crate) auth_manager: Arc<AuthManager>,
|
||||
pub(crate) models_manager: Arc<ModelsManager>,
|
||||
pub(crate) environment_manager: Arc<EnvironmentManager>,
|
||||
pub(crate) environment_id: Option<String>,
|
||||
pub(crate) skills_manager: Arc<SkillsManager>,
|
||||
pub(crate) plugins_manager: Arc<PluginsManager>,
|
||||
pub(crate) mcp_manager: Arc<McpManager>,
|
||||
@@ -480,6 +481,7 @@ impl Codex {
|
||||
auth_manager,
|
||||
models_manager,
|
||||
environment_manager,
|
||||
environment_id,
|
||||
skills_manager,
|
||||
plugins_manager,
|
||||
mcp_manager,
|
||||
@@ -500,7 +502,7 @@ impl Codex {
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
|
||||
let environment = environment_manager
|
||||
.current()
|
||||
.environment(environment_id.as_deref())
|
||||
.await
|
||||
.map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?;
|
||||
let fs = environment
|
||||
|
||||
@@ -82,6 +82,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
environment_manager: Arc::new(EnvironmentManager::from_environment(
|
||||
parent_ctx.environment.as_deref(),
|
||||
)),
|
||||
environment_id: None,
|
||||
skills_manager: Arc::clone(&parent_session.services.skills_manager),
|
||||
plugins_manager: Arc::clone(&parent_session.services.plugins_manager),
|
||||
mcp_manager: Arc::clone(&parent_session.services.mcp_manager),
|
||||
|
||||
@@ -435,6 +435,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
|
||||
auth_manager,
|
||||
models_manager,
|
||||
environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),
|
||||
environment_id: None,
|
||||
skills_manager,
|
||||
plugins_manager,
|
||||
mcp_manager,
|
||||
|
||||
@@ -471,6 +471,7 @@ impl ThreadManager {
|
||||
config,
|
||||
Vec::new(),
|
||||
/*persist_extended_history*/ false,
|
||||
/*environment_id*/ None,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -480,6 +481,7 @@ impl ThreadManager {
|
||||
config: Config,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
environment_id: Option<String>,
|
||||
) -> CodexResult<NewThread> {
|
||||
Box::pin(self.start_thread_with_tools_and_service_name(
|
||||
config,
|
||||
@@ -488,6 +490,7 @@ impl ThreadManager {
|
||||
persist_extended_history,
|
||||
/*metrics_service_name*/ None,
|
||||
/*parent_trace*/ None,
|
||||
environment_id,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -500,6 +503,7 @@ impl ThreadManager {
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
parent_trace: Option<W3cTraceContext>,
|
||||
environment_id: Option<String>,
|
||||
) -> CodexResult<NewThread> {
|
||||
Box::pin(self.state.spawn_thread(
|
||||
config,
|
||||
@@ -511,6 +515,7 @@ impl ThreadManager {
|
||||
metrics_service_name,
|
||||
parent_trace,
|
||||
/*user_shell_override*/ None,
|
||||
environment_id,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -551,6 +556,7 @@ impl ThreadManager {
|
||||
/*metrics_service_name*/ None,
|
||||
parent_trace,
|
||||
/*user_shell_override*/ None,
|
||||
/*environment_id*/ None,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -570,6 +576,7 @@ impl ThreadManager {
|
||||
/*metrics_service_name*/ None,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ Some(user_shell_override),
|
||||
/*environment_id*/ None,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -592,6 +599,7 @@ impl ThreadManager {
|
||||
/*metrics_service_name*/ None,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ Some(user_shell_override),
|
||||
/*environment_id*/ None,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -700,6 +708,7 @@ impl ThreadManager {
|
||||
/*metrics_service_name*/ None,
|
||||
parent_trace,
|
||||
/*user_shell_override*/ None,
|
||||
/*environment_id*/ None,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -801,6 +810,7 @@ impl ThreadManagerState {
|
||||
inherited_exec_policy,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ None,
|
||||
/*environment_id*/ None,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -828,6 +838,7 @@ impl ThreadManagerState {
|
||||
inherited_exec_policy,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ None,
|
||||
/*environment_id*/ None,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -856,6 +867,7 @@ impl ThreadManagerState {
|
||||
inherited_exec_policy,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ None,
|
||||
/*environment_id*/ None,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -873,6 +885,7 @@ impl ThreadManagerState {
|
||||
metrics_service_name: Option<String>,
|
||||
parent_trace: Option<W3cTraceContext>,
|
||||
user_shell_override: Option<crate::shell::Shell>,
|
||||
environment_id: Option<String>,
|
||||
) -> CodexResult<NewThread> {
|
||||
Box::pin(self.spawn_thread_with_source(
|
||||
config,
|
||||
@@ -887,6 +900,7 @@ impl ThreadManagerState {
|
||||
/*inherited_exec_policy*/ None,
|
||||
parent_trace,
|
||||
user_shell_override,
|
||||
environment_id,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -906,10 +920,11 @@ impl ThreadManagerState {
|
||||
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
|
||||
parent_trace: Option<W3cTraceContext>,
|
||||
user_shell_override: Option<crate::shell::Shell>,
|
||||
environment_id: Option<String>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let environment = self
|
||||
.environment_manager
|
||||
.current()
|
||||
.environment(environment_id.as_deref())
|
||||
.await
|
||||
.map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?;
|
||||
let watch_registration = match environment.as_ref() {
|
||||
@@ -932,6 +947,7 @@ impl ThreadManagerState {
|
||||
auth_manager,
|
||||
models_manager: Arc::clone(&self.models_manager),
|
||||
environment_manager: Arc::clone(&self.environment_manager),
|
||||
environment_id,
|
||||
skills_manager: Arc::clone(&self.skills_manager),
|
||||
plugins_manager: Arc::clone(&self.plugins_manager),
|
||||
mcp_manager: Arc::clone(&self.mcp_manager),
|
||||
|
||||
@@ -202,6 +202,8 @@ pub struct TestCodexBuilder {
|
||||
workspace_setups: Vec<Box<WorkspaceSetup>>,
|
||||
home: Option<Arc<TempDir>>,
|
||||
user_shell_override: Option<Shell>,
|
||||
environment_manager_override: Option<Arc<codex_exec_server::EnvironmentManager>>,
|
||||
thread_environment_id: Option<String>,
|
||||
}
|
||||
|
||||
impl TestCodexBuilder {
|
||||
@@ -253,6 +255,19 @@ impl TestCodexBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_environment_manager(
|
||||
mut self,
|
||||
environment_manager: Arc<codex_exec_server::EnvironmentManager>,
|
||||
) -> Self {
|
||||
self.environment_manager_override = Some(environment_manager);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_thread_environment_id(mut self, environment_id: impl Into<String>) -> Self {
|
||||
self.thread_environment_id = Some(environment_id.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_windows_cmd_shell(self) -> Self {
|
||||
if cfg!(windows) {
|
||||
self.with_user_shell(get_shell_by_model_provided_path(&PathBuf::from("cmd.exe")))
|
||||
@@ -348,9 +363,17 @@ impl TestCodexBuilder {
|
||||
let (config, fallback_cwd) = self
|
||||
.prepare_config(base_url, &home, test_env.cwd().clone())
|
||||
.await?;
|
||||
let environment_manager = Arc::new(codex_exec_server::EnvironmentManager::new(
|
||||
test_env.exec_server_url().map(str::to_owned),
|
||||
));
|
||||
let environment_manager = self
|
||||
.environment_manager_override
|
||||
.clone()
|
||||
.unwrap_or_else(|| {
|
||||
Arc::new(codex_exec_server::EnvironmentManager::new(
|
||||
test_env.exec_server_url().map(str::to_owned),
|
||||
))
|
||||
});
|
||||
let selected_environment = environment_manager
|
||||
.environment(self.thread_environment_id.as_deref())
|
||||
.await?;
|
||||
let file_system = test_env.environment().get_filesystem();
|
||||
let mut workspace_setups = vec![];
|
||||
swap(&mut self.workspace_setups, &mut workspace_setups);
|
||||
@@ -365,6 +388,7 @@ impl TestCodexBuilder {
|
||||
resume_from,
|
||||
test_env,
|
||||
environment_manager,
|
||||
selected_environment,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -377,6 +401,7 @@ impl TestCodexBuilder {
|
||||
resume_from: Option<PathBuf>,
|
||||
test_env: TestEnv,
|
||||
environment_manager: Arc<codex_exec_server::EnvironmentManager>,
|
||||
selected_environment: Option<Arc<codex_exec_server::Environment>>,
|
||||
) -> anyhow::Result<TestCodex> {
|
||||
let auth = self.auth.clone();
|
||||
let thread_manager = if config.model_catalog.is_some() {
|
||||
@@ -398,8 +423,15 @@ impl TestCodexBuilder {
|
||||
};
|
||||
let thread_manager = Arc::new(thread_manager);
|
||||
let user_shell_override = self.user_shell_override.clone();
|
||||
let thread_environment_id = self.thread_environment_id.clone();
|
||||
|
||||
let new_conversation = match (resume_from, user_shell_override) {
|
||||
(Some(_), _) if thread_environment_id.is_some() => {
|
||||
anyhow::bail!("test harness does not support resuming with thread_environment_id")
|
||||
}
|
||||
(_, Some(_)) if thread_environment_id.is_some() => anyhow::bail!(
|
||||
"test harness does not support user_shell_override with thread_environment_id"
|
||||
),
|
||||
(Some(path), Some(user_shell_override)) => {
|
||||
let auth_manager = codex_core::test_support::auth_manager_from_auth(auth);
|
||||
Box::pin(
|
||||
@@ -433,7 +465,15 @@ impl TestCodexBuilder {
|
||||
)
|
||||
.await?
|
||||
}
|
||||
(None, None) => Box::pin(thread_manager.start_thread(config.clone())).await?,
|
||||
(None, None) => {
|
||||
Box::pin(thread_manager.start_thread_with_tools(
|
||||
config.clone(),
|
||||
Vec::new(),
|
||||
/*persist_extended_history*/ false,
|
||||
thread_environment_id,
|
||||
))
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
Ok(TestCodex {
|
||||
@@ -443,6 +483,7 @@ impl TestCodexBuilder {
|
||||
codex: new_conversation.thread,
|
||||
session_configured: new_conversation.session_configured,
|
||||
thread_manager,
|
||||
selected_environment,
|
||||
_test_env: test_env,
|
||||
})
|
||||
}
|
||||
@@ -533,6 +574,7 @@ pub struct TestCodex {
|
||||
pub session_configured: SessionConfiguredEvent,
|
||||
pub config: Config,
|
||||
pub thread_manager: Arc<ThreadManager>,
|
||||
selected_environment: Option<Arc<codex_exec_server::Environment>>,
|
||||
_test_env: TestEnv,
|
||||
}
|
||||
|
||||
@@ -553,6 +595,10 @@ impl TestCodex {
|
||||
&self._test_env
|
||||
}
|
||||
|
||||
pub fn selected_environment(&self) -> Option<&codex_exec_server::Environment> {
|
||||
self.selected_environment.as_deref()
|
||||
}
|
||||
|
||||
pub fn fs(&self) -> Arc<dyn ExecutorFileSystem> {
|
||||
self._test_env.environment().get_filesystem()
|
||||
}
|
||||
@@ -879,6 +925,8 @@ pub fn test_codex() -> TestCodexBuilder {
|
||||
workspace_setups: vec![],
|
||||
home: None,
|
||||
user_shell_override: None,
|
||||
environment_manager_override: None,
|
||||
thread_environment_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2364,6 +2364,7 @@ async fn code_mode_can_call_hidden_dynamic_tools() -> Result<()> {
|
||||
defer_loading: true,
|
||||
}],
|
||||
/*persist_extended_history*/ false,
|
||||
/*environment_id*/ None,
|
||||
)
|
||||
.await?;
|
||||
let mut test = base_test;
|
||||
|
||||
@@ -2,6 +2,8 @@ use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_exec_server::CopyOptions;
|
||||
use codex_exec_server::CreateDirectoryOptions;
|
||||
use codex_exec_server::EnvironmentConfig;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::FileSystemSandboxContext;
|
||||
use codex_exec_server::RemoveOptions;
|
||||
use codex_protocol::protocol::ReadOnlyAccess;
|
||||
@@ -9,14 +11,137 @@ use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use core_test_support::PathBufExt;
|
||||
use core_test_support::get_remote_test_env;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::test_codex::test_env;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
const REMOTE_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_TEST_REMOTE_EXEC_SERVER_URL";
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn thread_can_start_and_complete_turn_with_disabled_default_environment() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let _mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-disabled"),
|
||||
ev_assistant_message("msg-disabled", "done"),
|
||||
ev_completed("resp-disabled"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let environment_manager = Arc::new(EnvironmentManager::new(Some("none".to_string())));
|
||||
let mut builder = test_codex().with_environment_manager(environment_manager);
|
||||
let test = builder.build(&server).await?;
|
||||
assert!(test.selected_environment().is_none());
|
||||
|
||||
test.submit_turn("hello from disabled env").await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn thread_can_start_and_complete_turn_with_named_local_environment() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let _mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-local"),
|
||||
ev_assistant_message("msg-local", "done"),
|
||||
ev_completed("resp-local"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let environment_manager = Arc::new(EnvironmentManager::new(/*exec_server_url*/ None));
|
||||
environment_manager
|
||||
.register_environment(
|
||||
"local".to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_environment_manager(environment_manager)
|
||||
.with_thread_environment_id("local");
|
||||
let test = builder.build(&server).await?;
|
||||
let selected_environment = test
|
||||
.selected_environment()
|
||||
.context("named local environment should resolve")?;
|
||||
assert!(!selected_environment.is_remote());
|
||||
|
||||
test.submit_turn("hello from named local env").await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn thread_can_start_and_complete_turn_with_named_remote_environment() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
let Some(_remote_env) = get_remote_test_env() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let remote_exec_server_url =
|
||||
std::env::var(REMOTE_EXEC_SERVER_URL_ENV_VAR).with_context(|| {
|
||||
format!(
|
||||
"{REMOTE_EXEC_SERVER_URL_ENV_VAR} must be set for named remote environment tests"
|
||||
)
|
||||
})?;
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let _mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-remote"),
|
||||
ev_assistant_message("msg-remote", "done"),
|
||||
ev_completed("resp-remote"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let environment_manager = Arc::new(EnvironmentManager::new(/*exec_server_url*/ None));
|
||||
environment_manager
|
||||
.register_environment(
|
||||
"remote".to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: Some(remote_exec_server_url),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_environment_manager(environment_manager)
|
||||
.with_thread_environment_id("remote");
|
||||
let test = builder.build_remote_aware(&server).await?;
|
||||
let selected_environment = test
|
||||
.selected_environment()
|
||||
.context("named remote environment should resolve")?;
|
||||
assert!(selected_environment.is_remote());
|
||||
|
||||
test.submit_turn("hello from named remote env").await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_test_env_can_connect_and_use_filesystem() -> Result<()> {
|
||||
let Some(_remote_env) = get_remote_test_env() else {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::ExecServerClient;
|
||||
use crate::ExecServerError;
|
||||
@@ -15,16 +17,31 @@ use crate::remote_process::RemoteProcess;
|
||||
|
||||
pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
|
||||
|
||||
pub type EnvironmentId = String;
|
||||
|
||||
#[derive(Clone, Debug, Default, PartialEq, Eq)]
|
||||
pub struct EnvironmentConfig {
|
||||
pub exec_server_url: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct RegisteredEnvironment {
|
||||
pub environment_id: EnvironmentId,
|
||||
pub config: EnvironmentConfig,
|
||||
}
|
||||
|
||||
/// Lazily creates and caches the active environment for a session.
|
||||
///
|
||||
/// The manager keeps the session's environment selection stable so subagents
|
||||
/// and follow-up turns preserve an explicit disabled state.
|
||||
#[derive(Debug)]
|
||||
pub struct EnvironmentManager {
|
||||
exec_server_url: Option<String>,
|
||||
default_environment_config: EnvironmentConfig,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
disabled: bool,
|
||||
current_environment: OnceCell<Option<Arc<Environment>>>,
|
||||
default_disabled: bool,
|
||||
default_environment: OnceCell<Option<Arc<Environment>>>,
|
||||
environment_configs: RwLock<HashMap<EnvironmentId, EnvironmentConfig>>,
|
||||
environment_cache: RwLock<HashMap<EnvironmentId, Arc<OnceCell<Option<Arc<Environment>>>>>>,
|
||||
}
|
||||
|
||||
impl Default for EnvironmentManager {
|
||||
@@ -45,12 +62,14 @@ impl EnvironmentManager {
|
||||
exec_server_url: Option<String>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
|
||||
let (exec_server_url, default_disabled) = normalize_exec_server_url(exec_server_url);
|
||||
Self {
|
||||
exec_server_url,
|
||||
default_environment_config: EnvironmentConfig { exec_server_url },
|
||||
local_runtime_paths,
|
||||
disabled,
|
||||
current_environment: OnceCell::new(),
|
||||
default_disabled,
|
||||
default_environment: OnceCell::new(),
|
||||
environment_configs: RwLock::new(HashMap::new()),
|
||||
environment_cache: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,51 +93,168 @@ impl EnvironmentManager {
|
||||
/// disabled mode when no environment is available.
|
||||
pub fn from_environment(environment: Option<&Environment>) -> Self {
|
||||
match environment {
|
||||
Some(environment) => Self {
|
||||
exec_server_url: environment.exec_server_url().map(str::to_owned),
|
||||
local_runtime_paths: environment.local_runtime_paths().cloned(),
|
||||
disabled: false,
|
||||
current_environment: OnceCell::new(),
|
||||
},
|
||||
Some(environment) => Self::new_with_runtime_paths(
|
||||
environment.exec_server_url().map(str::to_owned),
|
||||
environment.local_runtime_paths().cloned(),
|
||||
),
|
||||
None => Self {
|
||||
exec_server_url: None,
|
||||
default_environment_config: EnvironmentConfig::default(),
|
||||
local_runtime_paths: None,
|
||||
disabled: true,
|
||||
current_environment: OnceCell::new(),
|
||||
default_disabled: true,
|
||||
default_environment: OnceCell::new(),
|
||||
environment_configs: RwLock::new(HashMap::new()),
|
||||
environment_cache: RwLock::new(HashMap::new()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the remote exec-server URL when one is configured.
|
||||
/// Returns the default remote exec-server URL when one is configured.
|
||||
pub fn exec_server_url(&self) -> Option<&str> {
|
||||
self.exec_server_url.as_deref()
|
||||
self.default_environment_config.exec_server_url.as_deref()
|
||||
}
|
||||
|
||||
/// Returns true when this manager is configured to use a remote exec server.
|
||||
/// Returns true when the default environment is configured to use a remote exec server.
|
||||
pub fn is_remote(&self) -> bool {
|
||||
self.exec_server_url.is_some()
|
||||
self.default_environment_config.exec_server_url.is_some()
|
||||
}
|
||||
|
||||
pub async fn register_environment(
|
||||
&self,
|
||||
environment_id: EnvironmentId,
|
||||
config: EnvironmentConfig,
|
||||
) -> Result<(), ExecServerError> {
|
||||
let environment_id = normalize_registered_environment_id(environment_id)?;
|
||||
let (exec_server_url, disabled) = normalize_exec_server_url(config.exec_server_url);
|
||||
if disabled {
|
||||
return Err(ExecServerError::Protocol(
|
||||
"named environments cannot use the reserved disabled value 'none'".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let config = EnvironmentConfig { exec_server_url };
|
||||
self.environment_configs
|
||||
.write()
|
||||
.await
|
||||
.insert(environment_id.clone(), config);
|
||||
self.environment_cache
|
||||
.write()
|
||||
.await
|
||||
.insert(environment_id, Arc::new(OnceCell::new()));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn list_environments(
|
||||
&self,
|
||||
cursor: Option<&str>,
|
||||
limit: Option<u32>,
|
||||
) -> (Vec<RegisteredEnvironment>, Option<String>) {
|
||||
let cursor = normalize_environment_id(cursor).map(str::to_owned);
|
||||
let limit = usize::try_from(limit.unwrap_or(100)).unwrap_or(100);
|
||||
let configs = self.environment_configs.read().await;
|
||||
let mut environment_ids = configs.keys().cloned().collect::<Vec<_>>();
|
||||
environment_ids.sort();
|
||||
|
||||
let start_index = cursor
|
||||
.as_ref()
|
||||
.and_then(|cursor| environment_ids.iter().position(|id| id == cursor))
|
||||
.map_or(0, |index| index.saturating_add(1));
|
||||
|
||||
let selected_ids = environment_ids
|
||||
.into_iter()
|
||||
.skip(start_index)
|
||||
.take(limit.saturating_add(1))
|
||||
.collect::<Vec<_>>();
|
||||
let has_more = selected_ids.len() > limit;
|
||||
let next_cursor = if has_more {
|
||||
selected_ids.get(limit.saturating_sub(1)).cloned()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let data = selected_ids
|
||||
.into_iter()
|
||||
.take(limit)
|
||||
.filter_map(|environment_id| {
|
||||
configs
|
||||
.get(&environment_id)
|
||||
.cloned()
|
||||
.map(|config| RegisteredEnvironment {
|
||||
environment_id,
|
||||
config,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
(data, next_cursor)
|
||||
}
|
||||
|
||||
/// Returns the cached environment, creating it on first access.
|
||||
pub async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
|
||||
self.current_environment
|
||||
pub async fn environment(
|
||||
&self,
|
||||
environment_id: Option<&str>,
|
||||
) -> Result<Option<Arc<Environment>>, ExecServerError> {
|
||||
match normalize_environment_id(environment_id) {
|
||||
None => self.default_environment().await,
|
||||
Some(environment_id) => self.named_environment(environment_id).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn default_environment(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
|
||||
self.default_environment
|
||||
.get_or_try_init(|| async {
|
||||
if self.disabled {
|
||||
if self.default_disabled {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(Arc::new(
|
||||
Environment::create_with_runtime_paths(
|
||||
self.exec_server_url.clone(),
|
||||
self.local_runtime_paths.clone(),
|
||||
)
|
||||
.await?,
|
||||
)))
|
||||
self.build_environment(&self.default_environment_config)
|
||||
.await
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map(Option::as_ref)
|
||||
.map(std::option::Option::<&Arc<Environment>>::cloned)
|
||||
}
|
||||
|
||||
async fn named_environment(
|
||||
&self,
|
||||
environment_id: &str,
|
||||
) -> Result<Option<Arc<Environment>>, ExecServerError> {
|
||||
let config = self
|
||||
.environment_configs
|
||||
.read()
|
||||
.await
|
||||
.get(environment_id)
|
||||
.cloned()
|
||||
.ok_or_else(|| {
|
||||
ExecServerError::Protocol(format!("unknown environment id: {environment_id}"))
|
||||
})?;
|
||||
let environment_cell = {
|
||||
let mut environment_cache = self.environment_cache.write().await;
|
||||
Arc::clone(
|
||||
environment_cache
|
||||
.entry(environment_id.to_string())
|
||||
.or_insert_with(|| Arc::new(OnceCell::new())),
|
||||
)
|
||||
};
|
||||
|
||||
environment_cell
|
||||
.get_or_try_init(|| async { self.build_environment(&config).await })
|
||||
.await
|
||||
.map(Option::as_ref)
|
||||
.map(std::option::Option::<&Arc<Environment>>::cloned)
|
||||
}
|
||||
|
||||
async fn build_environment(
|
||||
&self,
|
||||
config: &EnvironmentConfig,
|
||||
) -> Result<Option<Arc<Environment>>, ExecServerError> {
|
||||
Ok(Some(Arc::new(
|
||||
Environment::create_with_runtime_paths(
|
||||
config.exec_server_url.clone(),
|
||||
self.local_runtime_paths.clone(),
|
||||
)
|
||||
.await?,
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Concrete execution/filesystem environment selected for a session.
|
||||
@@ -236,12 +372,33 @@ fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Option<String>
|
||||
Some(url) => (Some(url.to_string()), false),
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_environment_id(environment_id: Option<&str>) -> Option<&str> {
|
||||
match environment_id.map(str::trim) {
|
||||
None | Some("") => None,
|
||||
Some(environment_id) if environment_id.eq_ignore_ascii_case("default") => None,
|
||||
Some(environment_id) => Some(environment_id),
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_registered_environment_id(
|
||||
environment_id: String,
|
||||
) -> Result<EnvironmentId, ExecServerError> {
|
||||
match normalize_environment_id(Some(&environment_id)) {
|
||||
None => Err(ExecServerError::Protocol(
|
||||
"environment id is reserved for the default environment".to_string(),
|
||||
)),
|
||||
Some(environment_id) => Ok(environment_id.to_string()),
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::Environment;
|
||||
use super::EnvironmentConfig;
|
||||
use super::EnvironmentManager;
|
||||
use super::RegisteredEnvironment;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::ProcessId;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -260,7 +417,6 @@ mod tests {
|
||||
fn environment_manager_normalizes_empty_url() {
|
||||
let manager = EnvironmentManager::new(Some(String::new()));
|
||||
|
||||
assert!(!manager.disabled);
|
||||
assert_eq!(manager.exec_server_url(), None);
|
||||
assert!(!manager.is_remote());
|
||||
}
|
||||
@@ -269,7 +425,6 @@ mod tests {
|
||||
fn environment_manager_treats_none_value_as_disabled() {
|
||||
let manager = EnvironmentManager::new(Some("none".to_string()));
|
||||
|
||||
assert!(manager.disabled);
|
||||
assert_eq!(manager.exec_server_url(), None);
|
||||
assert!(!manager.is_remote());
|
||||
}
|
||||
@@ -283,11 +438,17 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_current_caches_environment() {
|
||||
async fn environment_manager_default_environment_caches_environment() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let first = manager.current().await.expect("get current environment");
|
||||
let second = manager.current().await.expect("get current environment");
|
||||
let first = manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment");
|
||||
let second = manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment");
|
||||
|
||||
let first = first.expect("local environment");
|
||||
let second = second.expect("local environment");
|
||||
@@ -308,9 +469,9 @@ mod tests {
|
||||
);
|
||||
|
||||
let environment = manager
|
||||
.current()
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get current environment")
|
||||
.expect("get default environment")
|
||||
.expect("local environment");
|
||||
|
||||
assert_eq!(environment.local_runtime_paths(), Some(&runtime_paths));
|
||||
@@ -321,18 +482,182 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn disabled_environment_manager_has_no_current_environment() {
|
||||
async fn disabled_environment_manager_has_no_default_environment() {
|
||||
let manager = EnvironmentManager::new(Some("none".to_string()));
|
||||
|
||||
assert!(
|
||||
manager
|
||||
.current()
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get current environment")
|
||||
.expect("get default environment")
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_named_environment_caches_environment() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
manager
|
||||
.register_environment(
|
||||
"dev".to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("register environment");
|
||||
|
||||
let first = manager
|
||||
.environment(Some("dev"))
|
||||
.await
|
||||
.expect("get named environment")
|
||||
.expect("local environment");
|
||||
let second = manager
|
||||
.environment(Some("dev"))
|
||||
.await
|
||||
.expect("get named environment")
|
||||
.expect("local environment");
|
||||
|
||||
assert!(Arc::ptr_eq(&first, &second));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_rejects_unknown_environment_id() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let error = manager
|
||||
.environment(Some("missing"))
|
||||
.await
|
||||
.expect_err("unknown environment id should error");
|
||||
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"exec-server protocol error: unknown environment id: missing"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_treats_default_environment_id_as_default() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let first = manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment")
|
||||
.expect("local environment");
|
||||
let second = manager
|
||||
.environment(Some("default"))
|
||||
.await
|
||||
.expect("get default environment")
|
||||
.expect("local environment");
|
||||
|
||||
assert!(Arc::ptr_eq(&first, &second));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_rejects_registering_default_environment_id() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let error = manager
|
||||
.register_environment(
|
||||
"default".to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect_err("default id should be reserved");
|
||||
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"exec-server protocol error: environment id is reserved for the default environment"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_rejects_registering_disabled_named_environment() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let error = manager
|
||||
.register_environment(
|
||||
"disabled".to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: Some("none".to_string()),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect_err("named disabled environment should be rejected");
|
||||
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"exec-server protocol error: named environments cannot use the reserved disabled value 'none'"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_lists_named_environments_sorted_with_pagination() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
manager
|
||||
.register_environment(
|
||||
"beta".to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("register beta environment");
|
||||
manager
|
||||
.register_environment(
|
||||
"alpha".to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("register alpha environment");
|
||||
manager
|
||||
.register_environment(
|
||||
"gamma".to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("register gamma environment");
|
||||
|
||||
let (first_page, next_cursor) = manager.list_environments(/*cursor*/ None, Some(2)).await;
|
||||
assert_eq!(
|
||||
first_page,
|
||||
vec![
|
||||
RegisteredEnvironment {
|
||||
environment_id: "alpha".to_string(),
|
||||
config: EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
},
|
||||
RegisteredEnvironment {
|
||||
environment_id: "beta".to_string(),
|
||||
config: EnvironmentConfig {
|
||||
exec_server_url: Some("ws://127.0.0.1:8765".to_string()),
|
||||
},
|
||||
},
|
||||
]
|
||||
);
|
||||
assert_eq!(next_cursor, Some("beta".to_string()));
|
||||
|
||||
let (second_page, next_cursor) = manager.list_environments(Some("beta"), Some(2)).await;
|
||||
assert_eq!(
|
||||
second_page,
|
||||
vec![RegisteredEnvironment {
|
||||
environment_id: "gamma".to_string(),
|
||||
config: EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
}]
|
||||
);
|
||||
assert_eq!(next_cursor, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_environment_has_ready_local_executor() {
|
||||
let environment = Environment::default();
|
||||
|
||||
@@ -24,7 +24,9 @@ pub use client_api::ExecServerClientConnectOptions;
|
||||
pub use client_api::RemoteExecServerConnectArgs;
|
||||
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
|
||||
pub use environment::Environment;
|
||||
pub use environment::EnvironmentConfig;
|
||||
pub use environment::EnvironmentManager;
|
||||
pub use environment::RegisteredEnvironment;
|
||||
pub use file_system::CopyOptions;
|
||||
pub use file_system::CreateDirectoryOptions;
|
||||
pub use file_system::ExecutorFileSystem;
|
||||
|
||||
Reference in New Issue
Block a user