Compare commits

...

1 Commits

Author SHA1 Message Date
jif-oai
4c54b2004d feat: zsh fork in exec server 2026-03-26 19:09:17 +01:00
15 changed files with 307 additions and 82 deletions

View File

@@ -220,6 +220,12 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
network.apply_to_env(&mut env);
}
if let UnifiedExecShellMode::ZshFork(zsh_fork_config) = &self.shell_mode {
if !ctx.turn.environment.exec_capabilities().zsh_fork {
return Err(ToolError::Rejected(
"unified_exec zsh-fork is not supported by the configured exec backend"
.to_string(),
));
}
let command =
build_sandbox_command(&command, &req.cwd, &env, req.additional_permissions.clone())
.map_err(|_| ToolError::Rejected("missing command line for PTY".to_string()))?;
@@ -240,17 +246,13 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
.await?
{
Some(prepared) => {
if ctx.turn.environment.exec_server_url().is_some() {
return Err(ToolError::Rejected(
"unified_exec zsh-fork is not supported when exec_server_url is configured".to_string(),
));
}
return self
.manager
.open_session_with_exec_env(
req.process_id,
&prepared.exec_request,
req.tty,
codex_exec_server::ExecLaunch::ZshFork,
prepared.spawn_lifecycle,
ctx.turn.environment.as_ref(),
)
@@ -287,6 +289,7 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
req.process_id,
&exec_env,
req.tty,
codex_exec_server::ExecLaunch::Direct,
Box::new(NoopSpawnLifecycle),
ctx.turn.environment.as_ref(),
)

View File

@@ -89,6 +89,7 @@ async fn exec_command_with_tty(
process_id,
&request,
tty,
codex_exec_server::ExecLaunch::Direct,
Box::new(NoopSpawnLifecycle),
turn.environment.as_ref(),
)
@@ -478,6 +479,7 @@ async fn completed_pipe_commands_preserve_exit_code() -> anyhow::Result<()> {
1234,
&request,
false,
codex_exec_server::ExecLaunch::Direct,
Box::new(NoopSpawnLifecycle),
&environment,
)
@@ -520,6 +522,7 @@ async fn unified_exec_uses_remote_exec_server_when_configured() -> anyhow::Resul
1234,
&request,
true,
codex_exec_server::ExecLaunch::Direct,
Box::new(NoopSpawnLifecycle),
remote_test_env.environment(),
)
@@ -574,6 +577,7 @@ async fn remote_exec_server_rejects_inherited_fd_launches() -> anyhow::Result<()
1234,
&request,
true,
codex_exec_server::ExecLaunch::ZshFork,
Box::new(TestSpawnLifecycle {
inherited_fds: vec![42],
}),

View File

@@ -584,6 +584,7 @@ impl UnifiedExecProcessManager {
process_id: i32,
env: &ExecRequest,
tty: bool,
launch: codex_exec_server::ExecLaunch,
mut spawn_lifecycle: SpawnLifecycleHandle,
environment: &codex_exec_server::Environment,
) -> Result<UnifiedExecProcess, UnifiedExecError> {
@@ -602,14 +603,17 @@ impl UnifiedExecProcessManager {
let started = environment
.get_exec_backend()
.start(codex_exec_server::ExecParams {
process_id: exec_server_process_id(process_id).into(),
argv: env.command.clone(),
cwd: env.cwd.clone(),
env: env.env.clone(),
tty,
arg0: env.arg0.clone(),
})
.start(codex_exec_server::ExecStartRequest::new(
codex_exec_server::ExecParams {
process_id: exec_server_process_id(process_id).into(),
argv: env.command.clone(),
cwd: env.cwd.clone(),
env: env.env.clone(),
tty,
arg0: env.arg0.clone(),
},
launch,
))
.await
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
return UnifiedExecProcess::from_remote_started(started, env.sandbox).await;

View File

@@ -61,7 +61,12 @@ Request params:
Response:
```json
{}
{
"capabilities": {
"direct": true,
"zshFork": false
}
}
```
### `initialized`

View File

@@ -37,6 +37,7 @@ use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::ExecCapabilities;
use crate::protocol::ExecClosedNotification;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
@@ -130,6 +131,7 @@ impl Drop for Inner {
#[derive(Clone)]
pub struct ExecServerClient {
inner: Arc<Inner>,
capabilities: ExecCapabilities,
}
#[derive(Debug, thiserror::Error)]
@@ -392,9 +394,19 @@ impl ExecServerClient {
}
});
let client = Self { inner };
client.initialize(options).await?;
Ok(client)
let client = Self {
inner,
capabilities: ExecCapabilities::direct_only(),
};
let initialize_response = client.initialize(options).await?;
Ok(Self {
inner: Arc::clone(&client.inner),
capabilities: initialize_response.capabilities,
})
}
pub fn capabilities(&self) -> ExecCapabilities {
self.capabilities
}
async fn notify_initialized(&self) -> Result<(), ExecServerError> {
@@ -647,6 +659,7 @@ mod tests {
use crate::connection::JsonRpcConnection;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::ExecCapabilities;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecOutputStream;
@@ -693,8 +706,10 @@ mod tests {
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: request.id,
result: serde_json::to_value(InitializeResponse {})
.expect("initialize response should serialize"),
result: serde_json::to_value(InitializeResponse {
capabilities: ExecCapabilities::direct_only(),
})
.expect("initialize response should serialize"),
}),
)
.await;

View File

@@ -9,6 +9,7 @@ use crate::file_system::ExecutorFileSystem;
use crate::local_file_system::LocalFileSystem;
use crate::local_process::LocalProcess;
use crate::process::ExecBackend;
use crate::protocol::ExecCapabilities;
use crate::remote_file_system::RemoteFileSystem;
use crate::remote_process::RemoteProcess;
@@ -131,6 +132,17 @@ impl Environment {
Arc::clone(&self.exec_backend)
}
pub fn exec_capabilities(&self) -> ExecCapabilities {
if self.exec_server_url.is_some() {
self.exec_backend.capabilities()
} else {
// Local unified_exec still owns zsh-fork setup and passes inherited
// FDs directly, so the environment supports zsh-fork even though
// the in-process exec backend only starts direct launches.
ExecCapabilities::direct_and_zsh_fork()
}
}
pub fn get_filesystem(&self) -> Arc<dyn ExecutorFileSystem> {
if let Some(client) = self.remote_exec_server_client.clone() {
Arc::new(RemoteFileSystem::new(client))
@@ -159,6 +171,7 @@ mod tests {
use super::Environment;
use super::EnvironmentManager;
use crate::ExecCapabilities;
use crate::ProcessId;
use pretty_assertions::assert_eq;
@@ -169,6 +182,10 @@ mod tests {
.expect("create environment");
assert_eq!(environment.exec_server_url(), None);
assert_eq!(
environment.exec_capabilities(),
ExecCapabilities::direct_and_zsh_fork()
);
assert!(environment.remote_exec_server_client.is_none());
}
@@ -195,14 +212,17 @@ mod tests {
let response = environment
.get_exec_backend()
.start(crate::ExecParams {
process_id: ProcessId::from("default-env-proc"),
argv: vec!["true".to_string()],
cwd: std::env::current_dir().expect("read current dir"),
env: Default::default(),
tty: false,
arg0: None,
})
.start(
crate::ExecParams {
process_id: ProcessId::from("default-env-proc"),
argv: vec!["true".to_string()],
cwd: std::env::current_dir().expect("read current dir"),
env: Default::default(),
tty: false,
arg0: None,
}
.into(),
)
.await
.expect("start process");

View File

@@ -43,9 +43,12 @@ pub use file_system::FileSystemResult;
pub use file_system::ReadDirectoryEntry;
pub use file_system::RemoveOptions;
pub use process::ExecBackend;
pub use process::ExecLaunch;
pub use process::ExecProcess;
pub use process::ExecStartRequest;
pub use process::StartedExecProcess;
pub use process_id::ProcessId;
pub use protocol::ExecCapabilities;
pub use protocol::ExecClosedNotification;
pub use protocol::ExecExitedNotification;
pub use protocol::ExecOutputDeltaNotification;

View File

@@ -15,11 +15,14 @@ use tokio::sync::mpsc;
use tokio::sync::watch;
use crate::ExecBackend;
use crate::ExecLaunch;
use crate::ExecProcess;
use crate::ExecServerError;
use crate::ExecStartRequest;
use crate::ProcessId;
use crate::StartedExecProcess;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::ExecCapabilities;
use crate::protocol::ExecClosedNotification;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
@@ -134,7 +137,9 @@ impl LocalProcess {
"initialize may only be sent once per connection".to_string(),
));
}
Ok(InitializeResponse {})
Ok(InitializeResponse {
capabilities: ExecCapabilities::direct_only(),
})
}
pub(crate) fn initialized(&self) -> Result<(), String> {
@@ -413,7 +418,21 @@ impl LocalProcess {
#[async_trait]
impl ExecBackend for LocalProcess {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
fn capabilities(&self) -> ExecCapabilities {
ExecCapabilities::direct_only()
}
async fn start(
&self,
request: ExecStartRequest,
) -> Result<StartedExecProcess, ExecServerError> {
let ExecStartRequest { params, launch } = request;
if !matches!(launch, ExecLaunch::Direct) {
return Err(ExecServerError::Protocol(
"zsh-fork launch is not supported by the local exec backend yet".to_string(),
));
}
let (response, wake_tx) = self
.start_process(params)
.await

View File

@@ -5,6 +5,7 @@ use tokio::sync::watch;
use crate::ExecServerError;
use crate::ProcessId;
use crate::protocol::ExecCapabilities;
use crate::protocol::ExecParams;
use crate::protocol::ReadResponse;
use crate::protocol::WriteResponse;
@@ -13,6 +14,33 @@ pub struct StartedExecProcess {
pub process: Arc<dyn ExecProcess>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecStartRequest {
pub params: ExecParams,
pub launch: ExecLaunch,
}
impl ExecStartRequest {
pub fn new(params: ExecParams, launch: ExecLaunch) -> Self {
Self { params, launch }
}
}
impl From<ExecParams> for ExecStartRequest {
fn from(params: ExecParams) -> Self {
Self {
params,
launch: ExecLaunch::Direct,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecLaunch {
Direct,
ZshFork,
}
#[async_trait]
pub trait ExecProcess: Send + Sync {
fn process_id(&self) -> &ProcessId;
@@ -33,5 +61,8 @@ pub trait ExecProcess: Send + Sync {
#[async_trait]
pub trait ExecBackend: Send + Sync {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError>;
fn capabilities(&self) -> ExecCapabilities;
async fn start(&self, request: ExecStartRequest)
-> Result<StartedExecProcess, ExecServerError>;
}

View File

@@ -48,7 +48,32 @@ pub struct InitializeParams {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeResponse {}
pub struct InitializeResponse {
pub capabilities: ExecCapabilities,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecCapabilities {
pub direct: bool,
pub zsh_fork: bool,
}
impl ExecCapabilities {
pub const fn direct_only() -> Self {
Self {
direct: true,
zsh_fork: false,
}
}
pub const fn direct_and_zsh_fork() -> Self {
Self {
direct: true,
zsh_fork: true,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]

View File

@@ -5,18 +5,21 @@ use tokio::sync::watch;
use tracing::trace;
use crate::ExecBackend;
use crate::ExecLaunch;
use crate::ExecProcess;
use crate::ExecServerError;
use crate::ExecStartRequest;
use crate::StartedExecProcess;
use crate::client::ExecServerClient;
use crate::client::Session;
use crate::protocol::ExecParams;
use crate::protocol::ExecCapabilities;
use crate::protocol::ReadResponse;
use crate::protocol::WriteResponse;
#[derive(Clone)]
pub(crate) struct RemoteProcess {
client: ExecServerClient,
capabilities: ExecCapabilities,
}
struct RemoteExecProcess {
@@ -26,13 +29,31 @@ struct RemoteExecProcess {
impl RemoteProcess {
pub(crate) fn new(client: ExecServerClient) -> Self {
trace!("remote process new");
Self { client }
let capabilities = client.capabilities();
Self {
client,
capabilities,
}
}
}
#[async_trait]
impl ExecBackend for RemoteProcess {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
fn capabilities(&self) -> ExecCapabilities {
self.capabilities
}
async fn start(
&self,
request: ExecStartRequest,
) -> Result<StartedExecProcess, ExecServerError> {
let ExecStartRequest { params, launch } = request;
if !matches!(launch, ExecLaunch::Direct) {
return Err(ExecServerError::Protocol(
"zsh-fork launch is not supported by remote exec-server yet".to_string(),
));
}
let process_id = params.process_id.clone();
let session = self.client.register_session(&process_id).await?;
if let Err(err) = self.client.exec(params).await {

View File

@@ -7,6 +7,7 @@ use tokio::sync::mpsc;
use super::ExecServerHandler;
use crate::ProcessId;
use crate::protocol::ExecCapabilities;
use crate::protocol::ExecParams;
use crate::protocol::InitializeResponse;
use crate::protocol::TerminateParams;
@@ -39,7 +40,9 @@ async fn initialized_handler() -> Arc<ExecServerHandler> {
)));
assert_eq!(
handler.initialize().expect("initialize"),
InitializeResponse {}
InitializeResponse {
capabilities: ExecCapabilities::direct_only(),
}
);
handler.initialized().expect("initialized");
handler

View File

@@ -7,8 +7,11 @@ use std::sync::Arc;
use anyhow::Result;
use codex_exec_server::Environment;
use codex_exec_server::ExecBackend;
use codex_exec_server::ExecCapabilities;
use codex_exec_server::ExecLaunch;
use codex_exec_server::ExecParams;
use codex_exec_server::ExecProcess;
use codex_exec_server::ExecStartRequest;
use codex_exec_server::ProcessId;
use codex_exec_server::ReadResponse;
use codex_exec_server::StartedExecProcess;
@@ -43,18 +46,65 @@ async fn create_process_context(use_remote: bool) -> Result<ProcessContext> {
}
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_backend_reports_direct_only_capabilities(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
assert_eq!(
context.backend.capabilities(),
ExecCapabilities::direct_only()
);
Ok(())
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_backend_rejects_zsh_fork_launches_until_supported(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let result = context
.backend
.start(ExecStartRequest::new(
ExecParams {
process_id: ProcessId::from("proc-zsh-fork"),
argv: vec!["true".to_string()],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
},
ExecLaunch::ZshFork,
))
.await;
let Err(err) = result else {
panic!("zsh-fork launch should be rejected");
};
assert!(
err.to_string().contains("zsh-fork launch is not supported"),
"unexpected error: {err}"
);
Ok(())
}
async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let session = context
.backend
.start(ExecParams {
process_id: ProcessId::from("proc-1"),
argv: vec!["true".to_string()],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
})
.start(
ExecParams {
process_id: ProcessId::from("proc-1"),
argv: vec!["true".to_string()],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
}
.into(),
)
.await?;
assert_eq!(session.process.process_id().as_str(), "proc-1");
let wake_rx = session.process.subscribe_wake();
@@ -119,18 +169,21 @@ async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> {
let process_id = "proc-stream".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 0.05; printf 'session output\\n'".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
})
.start(
ExecParams {
process_id: process_id.clone().into(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 0.05; printf 'session output\\n'".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
}
.into(),
)
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
@@ -159,7 +212,8 @@ async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
env: Default::default(),
tty: true,
arg0: None,
})
}
.into())
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
@@ -184,18 +238,21 @@ async fn assert_exec_process_preserves_queued_events_before_subscribe(
let context = create_process_context(use_remote).await?;
let session = context
.backend
.start(ExecParams {
process_id: ProcessId::from("proc-queued"),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"printf 'queued output\\n'".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
})
.start(
ExecParams {
process_id: ProcessId::from("proc-queued"),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"printf 'queued output\\n'".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
}
.into(),
)
.await?;
tokio::time::sleep(Duration::from_millis(200)).await;
@@ -214,18 +271,21 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
let mut context = create_process_context(/*use_remote*/ true).await?;
let session = context
.backend
.start(ExecParams {
process_id: ProcessId::from("proc-disconnect"),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 10".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
})
.start(
ExecParams {
process_id: ProcessId::from("proc-disconnect"),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 10".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
}
.into(),
)
.await?;
let server = context

View File

@@ -4,6 +4,7 @@ mod common;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_exec_server::ExecCapabilities;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use common::exec_server::exec_server;
@@ -27,7 +28,12 @@ async fn exec_server_accepts_initialize() -> anyhow::Result<()> {
};
assert_eq!(id, initialize_id);
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
assert_eq!(initialize_response, InitializeResponse {});
assert_eq!(
initialize_response,
InitializeResponse {
capabilities: ExecCapabilities::direct_only(),
}
);
server.shutdown().await?;
Ok(())

View File

@@ -5,6 +5,7 @@ mod common;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_exec_server::ExecCapabilities;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use common::exec_server::exec_server;
@@ -53,7 +54,12 @@ async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> any
};
assert_eq!(id, initialize_id);
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
assert_eq!(initialize_response, InitializeResponse {});
assert_eq!(
initialize_response,
InitializeResponse {
capabilities: ExecCapabilities::direct_only(),
}
);
server.shutdown().await?;
Ok(())