mirror of
https://github.com/openai/codex.git
synced 2026-04-14 19:41:45 +03:00
Compare commits
1 Commits
pr17704
...
exec-env-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1cc9eb24ce |
@@ -241,6 +241,14 @@ client_request_definitions! {
|
||||
inspect_params: true,
|
||||
response: v2::ThreadStartResponse,
|
||||
},
|
||||
ExecServerEnvironmentRegister => "execEnvironment/register" {
|
||||
params: v2::ExecServerEnvironmentRegisterParams,
|
||||
response: v2::ExecServerEnvironmentRegisterResponse,
|
||||
},
|
||||
ExecServerEnvironmentList => "execEnvironment/list" {
|
||||
params: v2::ExecServerEnvironmentListParams,
|
||||
response: v2::ExecServerEnvironmentListResponse,
|
||||
},
|
||||
ThreadResume => "thread/resume" {
|
||||
params: v2::ThreadResumeParams,
|
||||
inspect_params: true,
|
||||
@@ -1395,6 +1403,7 @@ mod tests {
|
||||
thread: v2::Thread {
|
||||
id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(),
|
||||
forked_from_id: None,
|
||||
exec_environment_name: None,
|
||||
preview: "first prompt".to_string(),
|
||||
ephemeral: true,
|
||||
model_provider: "openai".to_string(),
|
||||
@@ -1433,6 +1442,7 @@ mod tests {
|
||||
"thread": {
|
||||
"id": "67e55044-10b1-426f-9247-bb680e5fe0c8",
|
||||
"forkedFromId": null,
|
||||
"execEnvironmentName": null,
|
||||
"preview": "first prompt",
|
||||
"ephemeral": true,
|
||||
"modelProvider": "openai",
|
||||
|
||||
@@ -2683,6 +2683,9 @@ pub struct ThreadStartParams {
|
||||
#[experimental("thread/start.mockExperimentalField")]
|
||||
#[ts(optional = nullable)]
|
||||
pub mock_experimental_field: Option<String>,
|
||||
/// Optional named exec-server environment to use for this thread.
|
||||
#[ts(optional = nullable)]
|
||||
pub exec_environment_name: Option<String>,
|
||||
/// If true, opt into emitting raw Responses API items on the event stream.
|
||||
/// This is for internal use only (e.g. Codex Cloud).
|
||||
#[experimental("thread/start.experimentalRawEvents")]
|
||||
@@ -2695,6 +2698,56 @@ pub struct ThreadStartParams {
|
||||
pub persist_extended_history: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ExecServerEnvironment {
|
||||
/// Unique name used to reference this exec-server environment.
|
||||
pub name: String,
|
||||
/// Exec-server URL used when binding a thread to this environment.
|
||||
pub exec_server_url: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ExecServerEnvironmentRegisterParams {
|
||||
/// Human-readable name for selecting this environment in thread start.
|
||||
pub name: String,
|
||||
/// Exec-server URL to register under this environment name.
|
||||
pub exec_server_url: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ExecServerEnvironmentRegisterResponse {
|
||||
/// The environment entry that was registered.
|
||||
pub environment: ExecServerEnvironment,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ExecServerEnvironmentListParams {
|
||||
/// Opaque pagination cursor returned by a previous call.
|
||||
#[ts(optional = nullable)]
|
||||
pub cursor: Option<String>,
|
||||
/// Optional page size; defaults to a reasonable server-side value.
|
||||
#[ts(optional = nullable)]
|
||||
pub limit: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ExecServerEnvironmentListResponse {
|
||||
pub data: Vec<ExecServerEnvironment>,
|
||||
/// Opaque cursor to pass to the next call to continue after the last item.
|
||||
/// If None, there are no more items to return.
|
||||
pub next_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -3718,6 +3771,8 @@ pub struct Thread {
|
||||
pub id: String,
|
||||
/// Source thread id when this thread was created by forking another thread.
|
||||
pub forked_from_id: Option<String>,
|
||||
/// Optional named exec-server environment selected for this thread.
|
||||
pub exec_environment_name: Option<String>,
|
||||
/// Usually the first user message in the thread, if available.
|
||||
pub preview: String,
|
||||
/// Whether the thread is ephemeral and should not be materialized on disk.
|
||||
@@ -8576,6 +8631,7 @@ mod tests {
|
||||
"thread": {
|
||||
"id": "thread-id",
|
||||
"forkedFromId": null,
|
||||
"execEnvironmentName": null,
|
||||
"preview": "",
|
||||
"ephemeral": false,
|
||||
"modelProvider": "openai",
|
||||
|
||||
@@ -46,6 +46,11 @@ use codex_app_server_protocol::CommandExecWriteParams;
|
||||
use codex_app_server_protocol::ConversationGitInfo;
|
||||
use codex_app_server_protocol::ConversationSummary;
|
||||
use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec;
|
||||
use codex_app_server_protocol::ExecServerEnvironment;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentListParams;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentListResponse;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentRegisterParams;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentRegisterResponse;
|
||||
use codex_app_server_protocol::ExperimentalFeature as ApiExperimentalFeature;
|
||||
use codex_app_server_protocol::ExperimentalFeatureListParams;
|
||||
use codex_app_server_protocol::ExperimentalFeatureListResponse;
|
||||
@@ -244,6 +249,7 @@ use codex_core::sandboxing::SandboxPermissions;
|
||||
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
|
||||
use codex_core::windows_sandbox::WindowsSandboxSetupMode as CoreWindowsSandboxSetupMode;
|
||||
use codex_core::windows_sandbox::WindowsSandboxSetupRequest;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::LOCAL_FS;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
@@ -439,6 +445,7 @@ pub(crate) struct CodexMessageProcessor {
|
||||
cli_overrides: Arc<RwLock<Vec<(String, TomlValue)>>>,
|
||||
runtime_feature_enablement: Arc<RwLock<BTreeMap<String, bool>>>,
|
||||
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
exec_environment_registry: ExecEnvironmentRegistry,
|
||||
active_login: Arc<Mutex<Option<ActiveLogin>>>,
|
||||
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
|
||||
thread_state_manager: ThreadStateManager,
|
||||
@@ -472,6 +479,69 @@ struct ListenerTaskContext {
|
||||
codex_home: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct ExecEnvironmentRegistry {
|
||||
environments: Arc<RwLock<BTreeMap<String, String>>>,
|
||||
}
|
||||
|
||||
impl ExecEnvironmentRegistry {
|
||||
fn register(
|
||||
&self,
|
||||
name: String,
|
||||
exec_server_url: String,
|
||||
) -> Result<ExecServerEnvironment, String> {
|
||||
let mut environments = self
|
||||
.environments
|
||||
.write()
|
||||
.map_err(|_| "exec environment registry lock poisoned".to_string())?;
|
||||
environments.insert(name.clone(), exec_server_url.clone());
|
||||
Ok(ExecServerEnvironment {
|
||||
name,
|
||||
exec_server_url,
|
||||
})
|
||||
}
|
||||
|
||||
fn list(
|
||||
&self,
|
||||
cursor: Option<String>,
|
||||
limit: Option<u32>,
|
||||
) -> Result<ExecServerEnvironmentListResponse, String> {
|
||||
let environments = self
|
||||
.environments
|
||||
.read()
|
||||
.map_err(|_| "exec environment registry lock poisoned".to_string())?;
|
||||
let effective_limit = limit.unwrap_or(u32::MAX).max(1) as usize;
|
||||
let mut data = Vec::new();
|
||||
for (name, exec_server_url) in environments.iter() {
|
||||
if cursor.as_ref().is_some_and(|cursor| name <= cursor) {
|
||||
continue;
|
||||
}
|
||||
data.push(ExecServerEnvironment {
|
||||
name: name.clone(),
|
||||
exec_server_url: exec_server_url.clone(),
|
||||
});
|
||||
if data.len() >= effective_limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let next_cursor = data.last().and_then(|last| {
|
||||
environments
|
||||
.range(last.name.clone()..)
|
||||
.nth(1)
|
||||
.map(|_| last.name.clone())
|
||||
});
|
||||
Ok(ExecServerEnvironmentListResponse { data, next_cursor })
|
||||
}
|
||||
|
||||
fn resolve(&self, name: &str) -> Result<Option<String>, String> {
|
||||
let environments = self
|
||||
.environments
|
||||
.read()
|
||||
.map_err(|_| "exec environment registry lock poisoned".to_string())?;
|
||||
Ok(environments.get(name).cloned())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
enum EnsureConversationListenerResult {
|
||||
Attached,
|
||||
@@ -599,6 +669,7 @@ pub(crate) struct CodexMessageProcessorArgs {
|
||||
pub(crate) cli_overrides: Arc<RwLock<Vec<(String, TomlValue)>>>,
|
||||
pub(crate) runtime_feature_enablement: Arc<RwLock<BTreeMap<String, bool>>>,
|
||||
pub(crate) cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
pub(crate) exec_environment_registry: ExecEnvironmentRegistry,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
}
|
||||
@@ -672,6 +743,7 @@ impl CodexMessageProcessor {
|
||||
cli_overrides,
|
||||
runtime_feature_enablement,
|
||||
cloud_requirements,
|
||||
exec_environment_registry,
|
||||
feedback,
|
||||
log_db,
|
||||
} = args;
|
||||
@@ -685,6 +757,7 @@ impl CodexMessageProcessor {
|
||||
cli_overrides,
|
||||
runtime_feature_enablement,
|
||||
cloud_requirements,
|
||||
exec_environment_registry,
|
||||
active_login: Arc::new(Mutex::new(None)),
|
||||
pending_thread_unloads: Arc::new(Mutex::new(HashSet::new())),
|
||||
thread_state_manager: ThreadStateManager::new(),
|
||||
@@ -852,6 +925,14 @@ impl CodexMessageProcessor {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExecServerEnvironmentRegister { request_id, params } => {
|
||||
self.exec_environment_register(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExecServerEnvironmentList { request_id, params } => {
|
||||
self.exec_environment_list(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ThreadUnsubscribe { request_id, params } => {
|
||||
self.thread_unsubscribe(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
@@ -2240,6 +2321,7 @@ impl CodexMessageProcessor {
|
||||
experimental_raw_events,
|
||||
personality,
|
||||
ephemeral,
|
||||
exec_environment_name,
|
||||
session_start_source,
|
||||
persist_extended_history,
|
||||
} = params;
|
||||
@@ -2271,6 +2353,24 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
let request_trace = request_context.request_trace();
|
||||
let runtime_feature_enablement = self.current_runtime_feature_enablement();
|
||||
let selected_exec_environment = match exec_environment_name {
|
||||
Some(name) => match self.exec_environment_registry.resolve(&name) {
|
||||
Ok(Some(exec_server_url)) => Some((name, exec_server_url)),
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("unknown exec environment: {name}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Err(message) => {
|
||||
self.send_internal_error(request_id, message).await;
|
||||
return;
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
let thread_start_task = async move {
|
||||
Self::thread_start_task(
|
||||
listener_task_context,
|
||||
@@ -2283,6 +2383,7 @@ impl CodexMessageProcessor {
|
||||
config,
|
||||
typesafe_overrides,
|
||||
dynamic_tools,
|
||||
selected_exec_environment,
|
||||
session_start_source,
|
||||
persist_extended_history,
|
||||
service_name,
|
||||
@@ -2359,6 +2460,7 @@ impl CodexMessageProcessor {
|
||||
config_overrides: Option<HashMap<String, serde_json::Value>>,
|
||||
typesafe_overrides: ConfigOverrides,
|
||||
dynamic_tools: Option<Vec<ApiDynamicToolSpec>>,
|
||||
selected_exec_environment: Option<(String, String)>,
|
||||
session_start_source: Option<codex_app_server_protocol::ThreadStartSource>,
|
||||
persist_extended_history: bool,
|
||||
service_name: Option<String>,
|
||||
@@ -2496,10 +2598,17 @@ impl CodexMessageProcessor {
|
||||
.collect()
|
||||
};
|
||||
let core_dynamic_tool_count = core_dynamic_tools.len();
|
||||
let selected_exec_environment_name =
|
||||
selected_exec_environment.as_ref().map(|(name, _)| name.clone());
|
||||
let environment_manager_override = selected_exec_environment
|
||||
.as_ref()
|
||||
.map(|(_, exec_server_url)| {
|
||||
Arc::new(EnvironmentManager::new(Some(exec_server_url.clone())))
|
||||
});
|
||||
|
||||
match listener_task_context
|
||||
.thread_manager
|
||||
.start_thread_with_tools_and_service_name(
|
||||
.start_thread_with_tools_service_name_and_environment_manager(
|
||||
config,
|
||||
match session_start_source
|
||||
.unwrap_or(codex_app_server_protocol::ThreadStartSource::Startup)
|
||||
@@ -2510,6 +2619,7 @@ impl CodexMessageProcessor {
|
||||
core_dynamic_tools,
|
||||
persist_extended_history,
|
||||
service_name,
|
||||
environment_manager_override,
|
||||
request_trace,
|
||||
)
|
||||
.instrument(tracing::info_span!(
|
||||
@@ -2552,6 +2662,7 @@ impl CodexMessageProcessor {
|
||||
&config_snapshot,
|
||||
session_configured.rollout_path.clone(),
|
||||
);
|
||||
thread.exec_environment_name = selected_exec_environment_name.clone();
|
||||
|
||||
// Auto-attach a thread listener when starting a thread.
|
||||
Self::log_listener_attach_result(
|
||||
@@ -2582,6 +2693,13 @@ impl CodexMessageProcessor {
|
||||
))
|
||||
.await;
|
||||
|
||||
let thread_state = listener_task_context
|
||||
.thread_state_manager
|
||||
.thread_state(thread_id)
|
||||
.await;
|
||||
thread_state.lock().await.exec_environment_name =
|
||||
selected_exec_environment_name.clone();
|
||||
|
||||
thread.status = resolve_thread_status(
|
||||
listener_task_context
|
||||
.thread_watch_manager
|
||||
@@ -3755,18 +3873,18 @@ impl CodexMessageProcessor {
|
||||
.loaded_statuses_for_threads(status_ids)
|
||||
.await;
|
||||
|
||||
let data = threads
|
||||
.into_iter()
|
||||
.map(|(conversation_id, mut thread)| {
|
||||
if let Some(title) = names.get(&conversation_id).cloned() {
|
||||
set_thread_name_from_title(&mut thread, title);
|
||||
}
|
||||
if let Some(status) = statuses.get(&thread.id) {
|
||||
thread.status = status.clone();
|
||||
}
|
||||
thread
|
||||
})
|
||||
.collect();
|
||||
let mut data = Vec::with_capacity(threads.len());
|
||||
for (conversation_id, mut thread) in threads {
|
||||
if let Some(title) = names.get(&conversation_id).cloned() {
|
||||
set_thread_name_from_title(&mut thread, title);
|
||||
}
|
||||
if let Some(status) = statuses.get(&thread.id) {
|
||||
thread.status = status.clone();
|
||||
}
|
||||
self.attach_exec_environment_name(conversation_id, &mut thread)
|
||||
.await;
|
||||
data.push(thread);
|
||||
}
|
||||
let response = ThreadListResponse { data, next_cursor };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
@@ -3934,6 +4052,7 @@ impl CodexMessageProcessor {
|
||||
thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await;
|
||||
}
|
||||
self.attach_thread_name(thread_uuid, &mut thread).await;
|
||||
self.attach_exec_environment_name(thread_uuid, &mut thread).await;
|
||||
|
||||
if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
|
||||
match read_rollout_items_from_rollout(rollout_path).await {
|
||||
@@ -3984,6 +4103,62 @@ impl CodexMessageProcessor {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn exec_environment_register(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
params: ExecServerEnvironmentRegisterParams,
|
||||
) {
|
||||
if params.name.trim().is_empty() {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"exec environment name must not be empty".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
if params.exec_server_url.trim().is_empty() {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"exec_server_url must not be empty".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
match self
|
||||
.exec_environment_registry
|
||||
.register(params.name, params.exec_server_url)
|
||||
{
|
||||
Ok(environment) => {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
ExecServerEnvironmentRegisterResponse { environment },
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(message) => self.send_internal_error(request_id, message).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn exec_environment_list(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
params: ExecServerEnvironmentListParams,
|
||||
) {
|
||||
match self.exec_environment_registry.list(params.cursor, params.limit) {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
Err(message) => self.send_internal_error(request_id, message).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn attach_exec_environment_name(&self, thread_id: ThreadId, thread: &mut Thread) {
|
||||
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
|
||||
if let Some(exec_environment_name) = thread_state.lock().await.exec_environment_name.clone()
|
||||
{
|
||||
thread.exec_environment_name = Some(exec_environment_name);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
|
||||
self.thread_manager.subscribe_thread_created()
|
||||
}
|
||||
@@ -9608,6 +9783,7 @@ fn build_thread_from_snapshot(
|
||||
Thread {
|
||||
id: thread_id.to_string(),
|
||||
forked_from_id: None,
|
||||
exec_environment_name: None,
|
||||
preview: String::new(),
|
||||
ephemeral: config_snapshot.ephemeral,
|
||||
model_provider: config_snapshot.model_provider_id.clone(),
|
||||
@@ -9651,6 +9827,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||
Thread {
|
||||
id: conversation_id.to_string(),
|
||||
forked_from_id: None,
|
||||
exec_environment_name: None,
|
||||
preview,
|
||||
ephemeral: false,
|
||||
model_provider,
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::sync::atomic::Ordering;
|
||||
|
||||
use crate::codex_message_processor::CodexMessageProcessor;
|
||||
use crate::codex_message_processor::CodexMessageProcessorArgs;
|
||||
use crate::codex_message_processor::ExecEnvironmentRegistry;
|
||||
use crate::config_api::ConfigApi;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::external_agent_config_api::ExternalAgentConfigApi;
|
||||
@@ -246,6 +247,7 @@ impl MessageProcessor {
|
||||
let cli_overrides = Arc::new(RwLock::new(cli_overrides));
|
||||
let runtime_feature_enablement = Arc::new(RwLock::new(BTreeMap::new()));
|
||||
let cloud_requirements = Arc::new(RwLock::new(cloud_requirements));
|
||||
let exec_environment_registry = ExecEnvironmentRegistry::default();
|
||||
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
|
||||
auth_manager: auth_manager.clone(),
|
||||
thread_manager: Arc::clone(&thread_manager),
|
||||
@@ -256,6 +258,7 @@ impl MessageProcessor {
|
||||
cli_overrides: cli_overrides.clone(),
|
||||
runtime_feature_enablement: runtime_feature_enablement.clone(),
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
exec_environment_registry,
|
||||
feedback,
|
||||
log_db,
|
||||
});
|
||||
|
||||
@@ -58,6 +58,7 @@ pub(crate) struct ThreadState {
|
||||
pub(crate) pending_interrupts: PendingInterruptQueue,
|
||||
pub(crate) pending_rollbacks: Option<ConnectionRequestId>,
|
||||
pub(crate) turn_summary: TurnSummary,
|
||||
pub(crate) exec_environment_name: Option<String>,
|
||||
pub(crate) cancel_tx: Option<oneshot::Sender<()>>,
|
||||
pub(crate) experimental_raw_events: bool,
|
||||
pub(crate) listener_generation: u64,
|
||||
|
||||
@@ -24,6 +24,8 @@ use codex_app_server_protocol::ConfigBatchWriteParams;
|
||||
use codex_app_server_protocol::ConfigReadParams;
|
||||
use codex_app_server_protocol::ConfigValueWriteParams;
|
||||
use codex_app_server_protocol::ExperimentalFeatureListParams;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentListParams;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentRegisterParams;
|
||||
use codex_app_server_protocol::FeedbackUploadParams;
|
||||
use codex_app_server_protocol::FsCopyParams;
|
||||
use codex_app_server_protocol::FsCreateDirectoryParams;
|
||||
@@ -336,6 +338,24 @@ impl McpProcess {
|
||||
self.send_request("thread/start", params).await
|
||||
}
|
||||
|
||||
/// Send an `execEnvironment/register` JSON-RPC request.
|
||||
pub async fn send_exec_environment_register_request(
|
||||
&mut self,
|
||||
params: ExecServerEnvironmentRegisterParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("execEnvironment/register", params).await
|
||||
}
|
||||
|
||||
/// Send an `execEnvironment/list` JSON-RPC request.
|
||||
pub async fn send_exec_environment_list_request(
|
||||
&mut self,
|
||||
params: ExecServerEnvironmentListParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("execEnvironment/list", params).await
|
||||
}
|
||||
|
||||
/// Send a `thread/resume` JSON-RPC request.
|
||||
pub async fn send_thread_resume_request(
|
||||
&mut self,
|
||||
|
||||
@@ -270,6 +270,7 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<(
|
||||
developer_instructions: None,
|
||||
personality: None,
|
||||
ephemeral: None,
|
||||
exec_environment_name: None,
|
||||
session_start_source: None,
|
||||
dynamic_tools: None,
|
||||
mock_experimental_field: None,
|
||||
|
||||
@@ -6,6 +6,10 @@ use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentListParams;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentListResponse;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentRegisterParams;
|
||||
use codex_app_server_protocol::ExecServerEnvironmentRegisterResponse;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
@@ -526,6 +530,101 @@ async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn registered_exec_environment_is_exposed_on_thread_surfaces() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let register_id = mcp
|
||||
.send_exec_environment_register_request(ExecServerEnvironmentRegisterParams {
|
||||
name: "dev-remote".to_string(),
|
||||
exec_server_url: "none".to_string(),
|
||||
})
|
||||
.await?;
|
||||
let register_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(register_id)),
|
||||
)
|
||||
.await??;
|
||||
let ExecServerEnvironmentRegisterResponse { environment } =
|
||||
to_response::<ExecServerEnvironmentRegisterResponse>(register_resp)?;
|
||||
assert_eq!(environment.name, "dev-remote");
|
||||
assert_eq!(environment.exec_server_url, "none");
|
||||
|
||||
let list_id = mcp
|
||||
.send_exec_environment_list_request(ExecServerEnvironmentListParams {
|
||||
cursor: None,
|
||||
limit: Some(10),
|
||||
})
|
||||
.await?;
|
||||
let list_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
|
||||
)
|
||||
.await??;
|
||||
let ExecServerEnvironmentListResponse { data, next_cursor } =
|
||||
to_response::<ExecServerEnvironmentListResponse>(list_resp)?;
|
||||
assert_eq!(next_cursor, None);
|
||||
assert_eq!(data.len(), 1);
|
||||
assert_eq!(data[0].name, "dev-remote");
|
||||
assert_eq!(data[0].exec_server_url, "none");
|
||||
|
||||
let start_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
exec_environment_name: Some("dev-remote".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
||||
)
|
||||
.await??;
|
||||
let start_result = start_resp.result.clone();
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
assert_eq!(thread.exec_environment_name.as_deref(), Some("dev-remote"));
|
||||
assert_eq!(
|
||||
start_result
|
||||
.get("thread")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|thread| thread.get("execEnvironmentName"))
|
||||
.and_then(Value::as_str),
|
||||
Some("dev-remote"),
|
||||
"thread/start must serialize `thread.exec_environment_name` on the wire"
|
||||
);
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: thread.id.clone(),
|
||||
include_turns: false,
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let read_result = read_resp.result.clone();
|
||||
let ThreadReadResponse { thread: read_thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
assert_eq!(read_thread.exec_environment_name.as_deref(), Some("dev-remote"));
|
||||
assert_eq!(
|
||||
read_result
|
||||
.get("thread")
|
||||
.and_then(Value::as_object)
|
||||
.and_then(|thread| thread.get("execEnvironmentName"))
|
||||
.and_then(Value::as_str),
|
||||
Some("dev-remote"),
|
||||
"thread/read must serialize `thread.exec_environment_name` on the wire"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
|
||||
@@ -238,10 +238,13 @@ impl AgentControl {
|
||||
/*metrics_service_name*/ None,
|
||||
inherited_shell_snapshot,
|
||||
inherited_exec_policy,
|
||||
None,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
(None, _) => state.spawn_new_thread(config, self.clone()).await?,
|
||||
(None, _) => state
|
||||
.spawn_new_thread(config, self.clone(), None)
|
||||
.await?,
|
||||
};
|
||||
agent_metadata.agent_id = Some(new_thread.thread_id);
|
||||
reservation.commit(agent_metadata.clone());
|
||||
|
||||
@@ -496,6 +496,28 @@ impl ThreadManager {
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
parent_trace: Option<W3cTraceContext>,
|
||||
) -> CodexResult<NewThread> {
|
||||
Box::pin(self.start_thread_with_tools_service_name_and_environment_manager(
|
||||
config,
|
||||
initial_history,
|
||||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
metrics_service_name,
|
||||
/*environment_manager_override*/ None,
|
||||
parent_trace,
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn start_thread_with_tools_service_name_and_environment_manager(
|
||||
&self,
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
environment_manager_override: Option<Arc<EnvironmentManager>>,
|
||||
parent_trace: Option<W3cTraceContext>,
|
||||
) -> CodexResult<NewThread> {
|
||||
Box::pin(self.state.spawn_thread(
|
||||
config,
|
||||
@@ -505,6 +527,7 @@ impl ThreadManager {
|
||||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
metrics_service_name,
|
||||
environment_manager_override,
|
||||
parent_trace,
|
||||
/*user_shell_override*/ None,
|
||||
))
|
||||
@@ -545,6 +568,7 @@ impl ThreadManager {
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
/*metrics_service_name*/ None,
|
||||
/*environment_manager_override*/ None,
|
||||
parent_trace,
|
||||
/*user_shell_override*/ None,
|
||||
))
|
||||
@@ -564,6 +588,7 @@ impl ThreadManager {
|
||||
Vec::new(),
|
||||
/*persist_extended_history*/ false,
|
||||
/*metrics_service_name*/ None,
|
||||
/*environment_manager_override*/ None,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ Some(user_shell_override),
|
||||
))
|
||||
@@ -586,6 +611,7 @@ impl ThreadManager {
|
||||
Vec::new(),
|
||||
/*persist_extended_history*/ false,
|
||||
/*metrics_service_name*/ None,
|
||||
/*environment_manager_override*/ None,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ Some(user_shell_override),
|
||||
))
|
||||
@@ -694,6 +720,7 @@ impl ThreadManager {
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
/*metrics_service_name*/ None,
|
||||
/*environment_manager_override*/ None,
|
||||
parent_trace,
|
||||
/*user_shell_override*/ None,
|
||||
))
|
||||
@@ -760,6 +787,7 @@ impl ThreadManagerState {
|
||||
&self,
|
||||
config: Config,
|
||||
agent_control: AgentControl,
|
||||
environment_manager_override: Option<Arc<EnvironmentManager>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
Box::pin(self.spawn_new_thread_with_source(
|
||||
config,
|
||||
@@ -769,6 +797,7 @@ impl ThreadManagerState {
|
||||
/*metrics_service_name*/ None,
|
||||
/*inherited_shell_snapshot*/ None,
|
||||
/*inherited_exec_policy*/ None,
|
||||
environment_manager_override,
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -783,6 +812,7 @@ impl ThreadManagerState {
|
||||
metrics_service_name: Option<String>,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
|
||||
environment_manager_override: Option<Arc<EnvironmentManager>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
Box::pin(self.spawn_thread_with_source(
|
||||
config,
|
||||
@@ -795,6 +825,7 @@ impl ThreadManagerState {
|
||||
metrics_service_name,
|
||||
inherited_shell_snapshot,
|
||||
inherited_exec_policy,
|
||||
environment_manager_override,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ None,
|
||||
))
|
||||
@@ -822,6 +853,7 @@ impl ThreadManagerState {
|
||||
/*metrics_service_name*/ None,
|
||||
inherited_shell_snapshot,
|
||||
inherited_exec_policy,
|
||||
/*environment_manager_override*/ None,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ None,
|
||||
))
|
||||
@@ -850,6 +882,7 @@ impl ThreadManagerState {
|
||||
/*metrics_service_name*/ None,
|
||||
inherited_shell_snapshot,
|
||||
inherited_exec_policy,
|
||||
/*environment_manager_override*/ None,
|
||||
/*parent_trace*/ None,
|
||||
/*user_shell_override*/ None,
|
||||
))
|
||||
@@ -867,6 +900,7 @@ impl ThreadManagerState {
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
environment_manager_override: Option<Arc<EnvironmentManager>>,
|
||||
parent_trace: Option<W3cTraceContext>,
|
||||
user_shell_override: Option<crate::shell::Shell>,
|
||||
) -> CodexResult<NewThread> {
|
||||
@@ -881,6 +915,7 @@ impl ThreadManagerState {
|
||||
metrics_service_name,
|
||||
/*inherited_shell_snapshot*/ None,
|
||||
/*inherited_exec_policy*/ None,
|
||||
environment_manager_override,
|
||||
parent_trace,
|
||||
user_shell_override,
|
||||
))
|
||||
@@ -900,6 +935,7 @@ impl ThreadManagerState {
|
||||
metrics_service_name: Option<String>,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
|
||||
environment_manager_override: Option<Arc<EnvironmentManager>>,
|
||||
parent_trace: Option<W3cTraceContext>,
|
||||
user_shell_override: Option<crate::shell::Shell>,
|
||||
) -> CodexResult<NewThread> {
|
||||
@@ -914,7 +950,8 @@ impl ThreadManagerState {
|
||||
config,
|
||||
auth_manager,
|
||||
models_manager: Arc::clone(&self.models_manager),
|
||||
environment_manager: Arc::clone(&self.environment_manager),
|
||||
environment_manager: environment_manager_override
|
||||
.unwrap_or_else(|| Arc::clone(&self.environment_manager)),
|
||||
skills_manager: Arc::clone(&self.skills_manager),
|
||||
plugins_manager: Arc::clone(&self.plugins_manager),
|
||||
mcp_manager: Arc::clone(&self.mcp_manager),
|
||||
|
||||
Reference in New Issue
Block a user