Compare commits

...

1 Commits

Author SHA1 Message Date
starr-openai
1cc9eb24ce Register named exec environments for app-server threads
Add execEnvironment register/list requests, thread start selection, and thread metadata plumbing so app-server threads can bind to a named exec-server environment.

Co-authored-by: Codex <noreply@openai.com>
2026-04-13 19:33:56 -07:00
10 changed files with 422 additions and 15 deletions

View File

@@ -241,6 +241,14 @@ client_request_definitions! {
inspect_params: true,
response: v2::ThreadStartResponse,
},
ExecServerEnvironmentRegister => "execEnvironment/register" {
params: v2::ExecServerEnvironmentRegisterParams,
response: v2::ExecServerEnvironmentRegisterResponse,
},
ExecServerEnvironmentList => "execEnvironment/list" {
params: v2::ExecServerEnvironmentListParams,
response: v2::ExecServerEnvironmentListResponse,
},
ThreadResume => "thread/resume" {
params: v2::ThreadResumeParams,
inspect_params: true,
@@ -1395,6 +1403,7 @@ mod tests {
thread: v2::Thread {
id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(),
forked_from_id: None,
exec_environment_name: None,
preview: "first prompt".to_string(),
ephemeral: true,
model_provider: "openai".to_string(),
@@ -1433,6 +1442,7 @@ mod tests {
"thread": {
"id": "67e55044-10b1-426f-9247-bb680e5fe0c8",
"forkedFromId": null,
"execEnvironmentName": null,
"preview": "first prompt",
"ephemeral": true,
"modelProvider": "openai",

View File

@@ -2683,6 +2683,9 @@ pub struct ThreadStartParams {
#[experimental("thread/start.mockExperimentalField")]
#[ts(optional = nullable)]
pub mock_experimental_field: Option<String>,
/// Optional named exec-server environment to use for this thread.
#[ts(optional = nullable)]
pub exec_environment_name: Option<String>,
/// If true, opt into emitting raw Responses API items on the event stream.
/// This is for internal use only (e.g. Codex Cloud).
#[experimental("thread/start.experimentalRawEvents")]
@@ -2695,6 +2698,56 @@ pub struct ThreadStartParams {
pub persist_extended_history: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ExecServerEnvironment {
/// Unique name used to reference this exec-server environment.
pub name: String,
/// Exec-server URL used when binding a thread to this environment.
pub exec_server_url: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ExecServerEnvironmentRegisterParams {
/// Human-readable name for selecting this environment in thread start.
pub name: String,
/// Exec-server URL to register under this environment name.
pub exec_server_url: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ExecServerEnvironmentRegisterResponse {
/// The environment entry that was registered.
pub environment: ExecServerEnvironment,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ExecServerEnvironmentListParams {
/// Opaque pagination cursor returned by a previous call.
#[ts(optional = nullable)]
pub cursor: Option<String>,
/// Optional page size; defaults to a reasonable server-side value.
#[ts(optional = nullable)]
pub limit: Option<u32>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ExecServerEnvironmentListResponse {
pub data: Vec<ExecServerEnvironment>,
/// Opaque cursor to pass to the next call to continue after the last item.
/// If None, there are no more items to return.
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -3718,6 +3771,8 @@ pub struct Thread {
pub id: String,
/// Source thread id when this thread was created by forking another thread.
pub forked_from_id: Option<String>,
/// Optional named exec-server environment selected for this thread.
pub exec_environment_name: Option<String>,
/// Usually the first user message in the thread, if available.
pub preview: String,
/// Whether the thread is ephemeral and should not be materialized on disk.
@@ -8576,6 +8631,7 @@ mod tests {
"thread": {
"id": "thread-id",
"forkedFromId": null,
"execEnvironmentName": null,
"preview": "",
"ephemeral": false,
"modelProvider": "openai",

View File

@@ -46,6 +46,11 @@ use codex_app_server_protocol::CommandExecWriteParams;
use codex_app_server_protocol::ConversationGitInfo;
use codex_app_server_protocol::ConversationSummary;
use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec;
use codex_app_server_protocol::ExecServerEnvironment;
use codex_app_server_protocol::ExecServerEnvironmentListParams;
use codex_app_server_protocol::ExecServerEnvironmentListResponse;
use codex_app_server_protocol::ExecServerEnvironmentRegisterParams;
use codex_app_server_protocol::ExecServerEnvironmentRegisterResponse;
use codex_app_server_protocol::ExperimentalFeature as ApiExperimentalFeature;
use codex_app_server_protocol::ExperimentalFeatureListParams;
use codex_app_server_protocol::ExperimentalFeatureListResponse;
@@ -244,6 +249,7 @@ use codex_core::sandboxing::SandboxPermissions;
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_core::windows_sandbox::WindowsSandboxSetupMode as CoreWindowsSandboxSetupMode;
use codex_core::windows_sandbox::WindowsSandboxSetupRequest;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::LOCAL_FS;
use codex_features::FEATURES;
use codex_features::Feature;
@@ -439,6 +445,7 @@ pub(crate) struct CodexMessageProcessor {
cli_overrides: Arc<RwLock<Vec<(String, TomlValue)>>>,
runtime_feature_enablement: Arc<RwLock<BTreeMap<String, bool>>>,
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
exec_environment_registry: ExecEnvironmentRegistry,
active_login: Arc<Mutex<Option<ActiveLogin>>>,
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
thread_state_manager: ThreadStateManager,
@@ -472,6 +479,69 @@ struct ListenerTaskContext {
codex_home: PathBuf,
}
#[derive(Clone, Default)]
pub(crate) struct ExecEnvironmentRegistry {
environments: Arc<RwLock<BTreeMap<String, String>>>,
}
impl ExecEnvironmentRegistry {
fn register(
&self,
name: String,
exec_server_url: String,
) -> Result<ExecServerEnvironment, String> {
let mut environments = self
.environments
.write()
.map_err(|_| "exec environment registry lock poisoned".to_string())?;
environments.insert(name.clone(), exec_server_url.clone());
Ok(ExecServerEnvironment {
name,
exec_server_url,
})
}
fn list(
&self,
cursor: Option<String>,
limit: Option<u32>,
) -> Result<ExecServerEnvironmentListResponse, String> {
let environments = self
.environments
.read()
.map_err(|_| "exec environment registry lock poisoned".to_string())?;
let effective_limit = limit.unwrap_or(u32::MAX).max(1) as usize;
let mut data = Vec::new();
for (name, exec_server_url) in environments.iter() {
if cursor.as_ref().is_some_and(|cursor| name <= cursor) {
continue;
}
data.push(ExecServerEnvironment {
name: name.clone(),
exec_server_url: exec_server_url.clone(),
});
if data.len() >= effective_limit {
break;
}
}
let next_cursor = data.last().and_then(|last| {
environments
.range(last.name.clone()..)
.nth(1)
.map(|_| last.name.clone())
});
Ok(ExecServerEnvironmentListResponse { data, next_cursor })
}
fn resolve(&self, name: &str) -> Result<Option<String>, String> {
let environments = self
.environments
.read()
.map_err(|_| "exec environment registry lock poisoned".to_string())?;
Ok(environments.get(name).cloned())
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum EnsureConversationListenerResult {
Attached,
@@ -599,6 +669,7 @@ pub(crate) struct CodexMessageProcessorArgs {
pub(crate) cli_overrides: Arc<RwLock<Vec<(String, TomlValue)>>>,
pub(crate) runtime_feature_enablement: Arc<RwLock<BTreeMap<String, bool>>>,
pub(crate) cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
pub(crate) exec_environment_registry: ExecEnvironmentRegistry,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
}
@@ -672,6 +743,7 @@ impl CodexMessageProcessor {
cli_overrides,
runtime_feature_enablement,
cloud_requirements,
exec_environment_registry,
feedback,
log_db,
} = args;
@@ -685,6 +757,7 @@ impl CodexMessageProcessor {
cli_overrides,
runtime_feature_enablement,
cloud_requirements,
exec_environment_registry,
active_login: Arc::new(Mutex::new(None)),
pending_thread_unloads: Arc::new(Mutex::new(HashSet::new())),
thread_state_manager: ThreadStateManager::new(),
@@ -852,6 +925,14 @@ impl CodexMessageProcessor {
)
.await;
}
ClientRequest::ExecServerEnvironmentRegister { request_id, params } => {
self.exec_environment_register(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ExecServerEnvironmentList { request_id, params } => {
self.exec_environment_list(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadUnsubscribe { request_id, params } => {
self.thread_unsubscribe(to_connection_request_id(request_id), params)
.await;
@@ -2240,6 +2321,7 @@ impl CodexMessageProcessor {
experimental_raw_events,
personality,
ephemeral,
exec_environment_name,
session_start_source,
persist_extended_history,
} = params;
@@ -2271,6 +2353,24 @@ impl CodexMessageProcessor {
};
let request_trace = request_context.request_trace();
let runtime_feature_enablement = self.current_runtime_feature_enablement();
let selected_exec_environment = match exec_environment_name {
Some(name) => match self.exec_environment_registry.resolve(&name) {
Ok(Some(exec_server_url)) => Some((name, exec_server_url)),
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("unknown exec environment: {name}"),
)
.await;
return;
}
Err(message) => {
self.send_internal_error(request_id, message).await;
return;
}
},
None => None,
};
let thread_start_task = async move {
Self::thread_start_task(
listener_task_context,
@@ -2283,6 +2383,7 @@ impl CodexMessageProcessor {
config,
typesafe_overrides,
dynamic_tools,
selected_exec_environment,
session_start_source,
persist_extended_history,
service_name,
@@ -2359,6 +2460,7 @@ impl CodexMessageProcessor {
config_overrides: Option<HashMap<String, serde_json::Value>>,
typesafe_overrides: ConfigOverrides,
dynamic_tools: Option<Vec<ApiDynamicToolSpec>>,
selected_exec_environment: Option<(String, String)>,
session_start_source: Option<codex_app_server_protocol::ThreadStartSource>,
persist_extended_history: bool,
service_name: Option<String>,
@@ -2496,10 +2598,17 @@ impl CodexMessageProcessor {
.collect()
};
let core_dynamic_tool_count = core_dynamic_tools.len();
let selected_exec_environment_name =
selected_exec_environment.as_ref().map(|(name, _)| name.clone());
let environment_manager_override = selected_exec_environment
.as_ref()
.map(|(_, exec_server_url)| {
Arc::new(EnvironmentManager::new(Some(exec_server_url.clone())))
});
match listener_task_context
.thread_manager
.start_thread_with_tools_and_service_name(
.start_thread_with_tools_service_name_and_environment_manager(
config,
match session_start_source
.unwrap_or(codex_app_server_protocol::ThreadStartSource::Startup)
@@ -2510,6 +2619,7 @@ impl CodexMessageProcessor {
core_dynamic_tools,
persist_extended_history,
service_name,
environment_manager_override,
request_trace,
)
.instrument(tracing::info_span!(
@@ -2552,6 +2662,7 @@ impl CodexMessageProcessor {
&config_snapshot,
session_configured.rollout_path.clone(),
);
thread.exec_environment_name = selected_exec_environment_name.clone();
// Auto-attach a thread listener when starting a thread.
Self::log_listener_attach_result(
@@ -2582,6 +2693,13 @@ impl CodexMessageProcessor {
))
.await;
let thread_state = listener_task_context
.thread_state_manager
.thread_state(thread_id)
.await;
thread_state.lock().await.exec_environment_name =
selected_exec_environment_name.clone();
thread.status = resolve_thread_status(
listener_task_context
.thread_watch_manager
@@ -3755,18 +3873,18 @@ impl CodexMessageProcessor {
.loaded_statuses_for_threads(status_ids)
.await;
let data = threads
.into_iter()
.map(|(conversation_id, mut thread)| {
if let Some(title) = names.get(&conversation_id).cloned() {
set_thread_name_from_title(&mut thread, title);
}
if let Some(status) = statuses.get(&thread.id) {
thread.status = status.clone();
}
thread
})
.collect();
let mut data = Vec::with_capacity(threads.len());
for (conversation_id, mut thread) in threads {
if let Some(title) = names.get(&conversation_id).cloned() {
set_thread_name_from_title(&mut thread, title);
}
if let Some(status) = statuses.get(&thread.id) {
thread.status = status.clone();
}
self.attach_exec_environment_name(conversation_id, &mut thread)
.await;
data.push(thread);
}
let response = ThreadListResponse { data, next_cursor };
self.outgoing.send_response(request_id, response).await;
}
@@ -3934,6 +4052,7 @@ impl CodexMessageProcessor {
thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await;
}
self.attach_thread_name(thread_uuid, &mut thread).await;
self.attach_exec_environment_name(thread_uuid, &mut thread).await;
if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
match read_rollout_items_from_rollout(rollout_path).await {
@@ -3984,6 +4103,62 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn exec_environment_register(
&self,
request_id: ConnectionRequestId,
params: ExecServerEnvironmentRegisterParams,
) {
if params.name.trim().is_empty() {
self.send_invalid_request_error(
request_id,
"exec environment name must not be empty".to_string(),
)
.await;
return;
}
if params.exec_server_url.trim().is_empty() {
self.send_invalid_request_error(
request_id,
"exec_server_url must not be empty".to_string(),
)
.await;
return;
}
match self
.exec_environment_registry
.register(params.name, params.exec_server_url)
{
Ok(environment) => {
self.outgoing
.send_response(
request_id,
ExecServerEnvironmentRegisterResponse { environment },
)
.await;
}
Err(message) => self.send_internal_error(request_id, message).await,
}
}
async fn exec_environment_list(
&self,
request_id: ConnectionRequestId,
params: ExecServerEnvironmentListParams,
) {
match self.exec_environment_registry.list(params.cursor, params.limit) {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(message) => self.send_internal_error(request_id, message).await,
}
}
async fn attach_exec_environment_name(&self, thread_id: ThreadId, thread: &mut Thread) {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
if let Some(exec_environment_name) = thread_state.lock().await.exec_environment_name.clone()
{
thread.exec_environment_name = Some(exec_environment_name);
}
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.thread_manager.subscribe_thread_created()
}
@@ -9608,6 +9783,7 @@ fn build_thread_from_snapshot(
Thread {
id: thread_id.to_string(),
forked_from_id: None,
exec_environment_name: None,
preview: String::new(),
ephemeral: config_snapshot.ephemeral,
model_provider: config_snapshot.model_provider_id.clone(),
@@ -9651,6 +9827,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
Thread {
id: conversation_id.to_string(),
forked_from_id: None,
exec_environment_name: None,
preview,
ephemeral: false,
model_provider,

View File

@@ -8,6 +8,7 @@ use std::sync::atomic::Ordering;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::codex_message_processor::CodexMessageProcessorArgs;
use crate::codex_message_processor::ExecEnvironmentRegistry;
use crate::config_api::ConfigApi;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::external_agent_config_api::ExternalAgentConfigApi;
@@ -246,6 +247,7 @@ impl MessageProcessor {
let cli_overrides = Arc::new(RwLock::new(cli_overrides));
let runtime_feature_enablement = Arc::new(RwLock::new(BTreeMap::new()));
let cloud_requirements = Arc::new(RwLock::new(cloud_requirements));
let exec_environment_registry = ExecEnvironmentRegistry::default();
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager: auth_manager.clone(),
thread_manager: Arc::clone(&thread_manager),
@@ -256,6 +258,7 @@ impl MessageProcessor {
cli_overrides: cli_overrides.clone(),
runtime_feature_enablement: runtime_feature_enablement.clone(),
cloud_requirements: cloud_requirements.clone(),
exec_environment_registry,
feedback,
log_db,
});

View File

@@ -58,6 +58,7 @@ pub(crate) struct ThreadState {
pub(crate) pending_interrupts: PendingInterruptQueue,
pub(crate) pending_rollbacks: Option<ConnectionRequestId>,
pub(crate) turn_summary: TurnSummary,
pub(crate) exec_environment_name: Option<String>,
pub(crate) cancel_tx: Option<oneshot::Sender<()>>,
pub(crate) experimental_raw_events: bool,
pub(crate) listener_generation: u64,

View File

@@ -24,6 +24,8 @@ use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigReadParams;
use codex_app_server_protocol::ConfigValueWriteParams;
use codex_app_server_protocol::ExperimentalFeatureListParams;
use codex_app_server_protocol::ExecServerEnvironmentListParams;
use codex_app_server_protocol::ExecServerEnvironmentRegisterParams;
use codex_app_server_protocol::FeedbackUploadParams;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCreateDirectoryParams;
@@ -336,6 +338,24 @@ impl McpProcess {
self.send_request("thread/start", params).await
}
/// Send an `execEnvironment/register` JSON-RPC request.
pub async fn send_exec_environment_register_request(
&mut self,
params: ExecServerEnvironmentRegisterParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("execEnvironment/register", params).await
}
/// Send an `execEnvironment/list` JSON-RPC request.
pub async fn send_exec_environment_list_request(
&mut self,
params: ExecServerEnvironmentListParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("execEnvironment/list", params).await
}
/// Send a `thread/resume` JSON-RPC request.
pub async fn send_thread_resume_request(
&mut self,

View File

@@ -270,6 +270,7 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<(
developer_instructions: None,
personality: None,
ephemeral: None,
exec_environment_name: None,
session_start_source: None,
dynamic_tools: None,
mock_experimental_field: None,

View File

@@ -6,6 +6,10 @@ use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ExecServerEnvironmentListParams;
use codex_app_server_protocol::ExecServerEnvironmentListResponse;
use codex_app_server_protocol::ExecServerEnvironmentRegisterParams;
use codex_app_server_protocol::ExecServerEnvironmentRegisterResponse;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
@@ -526,6 +530,101 @@ async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Resul
Ok(())
}
#[tokio::test]
async fn registered_exec_environment_is_exposed_on_thread_surfaces() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let register_id = mcp
.send_exec_environment_register_request(ExecServerEnvironmentRegisterParams {
name: "dev-remote".to_string(),
exec_server_url: "none".to_string(),
})
.await?;
let register_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(register_id)),
)
.await??;
let ExecServerEnvironmentRegisterResponse { environment } =
to_response::<ExecServerEnvironmentRegisterResponse>(register_resp)?;
assert_eq!(environment.name, "dev-remote");
assert_eq!(environment.exec_server_url, "none");
let list_id = mcp
.send_exec_environment_list_request(ExecServerEnvironmentListParams {
cursor: None,
limit: Some(10),
})
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ExecServerEnvironmentListResponse { data, next_cursor } =
to_response::<ExecServerEnvironmentListResponse>(list_resp)?;
assert_eq!(next_cursor, None);
assert_eq!(data.len(), 1);
assert_eq!(data[0].name, "dev-remote");
assert_eq!(data[0].exec_server_url, "none");
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
exec_environment_name: Some("dev-remote".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let start_result = start_resp.result.clone();
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
assert_eq!(thread.exec_environment_name.as_deref(), Some("dev-remote"));
assert_eq!(
start_result
.get("thread")
.and_then(Value::as_object)
.and_then(|thread| thread.get("execEnvironmentName"))
.and_then(Value::as_str),
Some("dev-remote"),
"thread/start must serialize `thread.exec_environment_name` on the wire"
);
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: thread.id.clone(),
include_turns: false,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let read_result = read_resp.result.clone();
let ThreadReadResponse { thread: read_thread } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(read_thread.exec_environment_name.as_deref(), Some("dev-remote"));
assert_eq!(
read_result
.get("thread")
.and_then(Value::as_object)
.and_then(|thread| thread.get("execEnvironmentName"))
.and_then(Value::as_str),
Some("dev-remote"),
"thread/read must serialize `thread.exec_environment_name` on the wire"
);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");

View File

@@ -238,10 +238,13 @@ impl AgentControl {
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
None,
)
.await?
}
(None, _) => state.spawn_new_thread(config, self.clone()).await?,
(None, _) => state
.spawn_new_thread(config, self.clone(), None)
.await?,
};
agent_metadata.agent_id = Some(new_thread.thread_id);
reservation.commit(agent_metadata.clone());

View File

@@ -496,6 +496,28 @@ impl ThreadManager {
persist_extended_history: bool,
metrics_service_name: Option<String>,
parent_trace: Option<W3cTraceContext>,
) -> CodexResult<NewThread> {
Box::pin(self.start_thread_with_tools_service_name_and_environment_manager(
config,
initial_history,
dynamic_tools,
persist_extended_history,
metrics_service_name,
/*environment_manager_override*/ None,
parent_trace,
))
.await
}
pub async fn start_thread_with_tools_service_name_and_environment_manager(
&self,
config: Config,
initial_history: InitialHistory,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
environment_manager_override: Option<Arc<EnvironmentManager>>,
parent_trace: Option<W3cTraceContext>,
) -> CodexResult<NewThread> {
Box::pin(self.state.spawn_thread(
config,
@@ -505,6 +527,7 @@ impl ThreadManager {
dynamic_tools,
persist_extended_history,
metrics_service_name,
environment_manager_override,
parent_trace,
/*user_shell_override*/ None,
))
@@ -545,6 +568,7 @@ impl ThreadManager {
Vec::new(),
persist_extended_history,
/*metrics_service_name*/ None,
/*environment_manager_override*/ None,
parent_trace,
/*user_shell_override*/ None,
))
@@ -564,6 +588,7 @@ impl ThreadManager {
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*environment_manager_override*/ None,
/*parent_trace*/ None,
/*user_shell_override*/ Some(user_shell_override),
))
@@ -586,6 +611,7 @@ impl ThreadManager {
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*environment_manager_override*/ None,
/*parent_trace*/ None,
/*user_shell_override*/ Some(user_shell_override),
))
@@ -694,6 +720,7 @@ impl ThreadManager {
Vec::new(),
persist_extended_history,
/*metrics_service_name*/ None,
/*environment_manager_override*/ None,
parent_trace,
/*user_shell_override*/ None,
))
@@ -760,6 +787,7 @@ impl ThreadManagerState {
&self,
config: Config,
agent_control: AgentControl,
environment_manager_override: Option<Arc<EnvironmentManager>>,
) -> CodexResult<NewThread> {
Box::pin(self.spawn_new_thread_with_source(
config,
@@ -769,6 +797,7 @@ impl ThreadManagerState {
/*metrics_service_name*/ None,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
environment_manager_override,
))
.await
}
@@ -783,6 +812,7 @@ impl ThreadManagerState {
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
environment_manager_override: Option<Arc<EnvironmentManager>>,
) -> CodexResult<NewThread> {
Box::pin(self.spawn_thread_with_source(
config,
@@ -795,6 +825,7 @@ impl ThreadManagerState {
metrics_service_name,
inherited_shell_snapshot,
inherited_exec_policy,
environment_manager_override,
/*parent_trace*/ None,
/*user_shell_override*/ None,
))
@@ -822,6 +853,7 @@ impl ThreadManagerState {
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
/*environment_manager_override*/ None,
/*parent_trace*/ None,
/*user_shell_override*/ None,
))
@@ -850,6 +882,7 @@ impl ThreadManagerState {
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
/*environment_manager_override*/ None,
/*parent_trace*/ None,
/*user_shell_override*/ None,
))
@@ -867,6 +900,7 @@ impl ThreadManagerState {
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
environment_manager_override: Option<Arc<EnvironmentManager>>,
parent_trace: Option<W3cTraceContext>,
user_shell_override: Option<crate::shell::Shell>,
) -> CodexResult<NewThread> {
@@ -881,6 +915,7 @@ impl ThreadManagerState {
metrics_service_name,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
environment_manager_override,
parent_trace,
user_shell_override,
))
@@ -900,6 +935,7 @@ impl ThreadManagerState {
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
environment_manager_override: Option<Arc<EnvironmentManager>>,
parent_trace: Option<W3cTraceContext>,
user_shell_override: Option<crate::shell::Shell>,
) -> CodexResult<NewThread> {
@@ -914,7 +950,8 @@ impl ThreadManagerState {
config,
auth_manager,
models_manager: Arc::clone(&self.models_manager),
environment_manager: Arc::clone(&self.environment_manager),
environment_manager: environment_manager_override
.unwrap_or_else(|| Arc::clone(&self.environment_manager)),
skills_manager: Arc::clone(&self.skills_manager),
plugins_manager: Arc::clone(&self.plugins_manager),
mcp_manager: Arc::clone(&self.mcp_manager),