Compare commits

..

15 Commits

Author SHA1 Message Date
jif-oai
ce3c6e28eb fix merge 2026-03-25 14:00:49 +00:00
jif-oai
5ee7b2a147 nit 2026-03-25 13:47:39 +00:00
jif-oai
d6253a2969 Merge remote-tracking branch 'origin/main' into jif/exec-server-for-ue
# Conflicts:
#	codex-rs/core/src/unified_exec/process.rs
2026-03-25 13:44:11 +00:00
jif-oai
5a44a31154 Make unified exec part 2026-03-25 13:43:31 +00:00
jif-oai
047ea642d2 chore: tty metric (#15766) 2026-03-25 13:34:43 +00:00
jif-oai
a2cda61546 v1 closing 2026-03-25 11:17:36 +00:00
jif-oai
a5b99aa0b8 small revert 2026-03-25 11:12:55 +00:00
jif-oai
732ebd63f6 clean up 2026-03-25 10:56:07 +00:00
jif-oai
8cae6b69a3 Simplify subscribers 2026-03-25 10:51:55 +00:00
jif-oai
3037b80396 Simplify subscribers 2026-03-25 09:51:07 +00:00
xl-openai
f5dccab5cf Update plugin creator skill. (#15734)
Add support for home-local plugin + fix policy.
2026-03-25 01:55:10 -07:00
jif-oai
9e91c882fd Move to an arcswap 2026-03-24 23:12:20 +00:00
jif-oai
deec6cc824 nits in exec server 2026-03-24 22:43:47 +00:00
jif-oai
7462f690ac nits in exec server 2026-03-24 22:23:59 +00:00
jif-oai
3ce6a55209 feat: exec-server prep for unified exec 2026-03-24 22:14:43 +00:00
37 changed files with 1639 additions and 472 deletions

11
codex-rs/Cargo.lock generated
View File

@@ -1883,7 +1883,6 @@ dependencies = [
"codex-features",
"codex-git-utils",
"codex-hooks",
"codex-instructions",
"codex-login",
"codex-network-proxy",
"codex-otel",
@@ -2031,6 +2030,7 @@ name = "codex-exec-server"
version = "0.0.0"
dependencies = [
"anyhow",
"arc-swap",
"async-trait",
"base64 0.22.1",
"clap",
@@ -2175,15 +2175,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "codex-instructions"
version = "0.0.0"
dependencies = [
"codex-protocol",
"pretty_assertions",
"serde",
]
[[package]]
name = "codex-keyring-store"
version = "0.0.0"

View File

@@ -25,7 +25,6 @@ members = [
"skills",
"core",
"hooks",
"instructions",
"secrets",
"exec",
"exec-server",
@@ -123,7 +122,6 @@ codex-features = { path = "features" }
codex-file-search = { path = "file-search" }
codex-git-utils = { path = "git-utils" }
codex-hooks = { path = "hooks" }
codex-instructions = { path = "instructions" }
codex-keyring-store = { path = "keyring-store" }
codex-linux-sandbox = { path = "linux-sandbox" }
codex-lmstudio = { path = "lmstudio" }

View File

@@ -42,7 +42,6 @@ codex-skills = { workspace = true }
codex-execpolicy = { workspace = true }
codex-git-utils = { workspace = true }
codex-hooks = { workspace = true }
codex-instructions = { workspace = true }
codex-network-proxy = { workspace = true }
codex-otel = { workspace = true }
codex-artifacts = { workspace = true }

View File

@@ -1,12 +1,14 @@
use codex_instructions::AGENTS_MD_FRAGMENT;
use codex_instructions::ContextualUserFragmentDefinition;
use codex_instructions::SKILL_FRAGMENT;
use codex_protocol::items::HookPromptItem;
use codex_protocol::items::parse_hook_prompt_fragment;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ENVIRONMENT_CONTEXT_CLOSE_TAG;
use codex_protocol::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG;
pub(crate) const AGENTS_MD_START_MARKER: &str = "# AGENTS.md instructions for ";
pub(crate) const AGENTS_MD_END_MARKER: &str = "</INSTRUCTIONS>";
pub(crate) const SKILL_OPEN_TAG: &str = "<skill>";
pub(crate) const SKILL_CLOSE_TAG: &str = "</skill>";
pub(crate) const USER_SHELL_COMMAND_OPEN_TAG: &str = "<user_shell_command>";
pub(crate) const USER_SHELL_COMMAND_CLOSE_TAG: &str = "</user_shell_command>";
pub(crate) const TURN_ABORTED_OPEN_TAG: &str = "<turn_aborted>";
@@ -14,11 +16,64 @@ pub(crate) const TURN_ABORTED_CLOSE_TAG: &str = "</turn_aborted>";
pub(crate) const SUBAGENT_NOTIFICATION_OPEN_TAG: &str = "<subagent_notification>";
pub(crate) const SUBAGENT_NOTIFICATION_CLOSE_TAG: &str = "</subagent_notification>";
#[derive(Clone, Copy)]
pub(crate) struct ContextualUserFragmentDefinition {
start_marker: &'static str,
end_marker: &'static str,
}
impl ContextualUserFragmentDefinition {
pub(crate) const fn new(start_marker: &'static str, end_marker: &'static str) -> Self {
Self {
start_marker,
end_marker,
}
}
pub(crate) fn matches_text(&self, text: &str) -> bool {
let trimmed = text.trim_start();
let starts_with_marker = trimmed
.get(..self.start_marker.len())
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(self.start_marker));
let trimmed = trimmed.trim_end();
let ends_with_marker = trimmed
.get(trimmed.len().saturating_sub(self.end_marker.len())..)
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(self.end_marker));
starts_with_marker && ends_with_marker
}
pub(crate) const fn start_marker(&self) -> &'static str {
self.start_marker
}
pub(crate) const fn end_marker(&self) -> &'static str {
self.end_marker
}
pub(crate) fn wrap(&self, body: String) -> String {
format!("{}\n{}\n{}", self.start_marker, body, self.end_marker)
}
pub(crate) fn into_message(self, text: String) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text }],
end_turn: None,
phase: None,
}
}
}
pub(crate) const AGENTS_MD_FRAGMENT: ContextualUserFragmentDefinition =
ContextualUserFragmentDefinition::new(AGENTS_MD_START_MARKER, AGENTS_MD_END_MARKER);
pub(crate) const ENVIRONMENT_CONTEXT_FRAGMENT: ContextualUserFragmentDefinition =
ContextualUserFragmentDefinition::new(
ENVIRONMENT_CONTEXT_OPEN_TAG,
ENVIRONMENT_CONTEXT_CLOSE_TAG,
);
pub(crate) const SKILL_FRAGMENT: ContextualUserFragmentDefinition =
ContextualUserFragmentDefinition::new(SKILL_OPEN_TAG, SKILL_CLOSE_TAG);
pub(crate) const USER_SHELL_COMMAND_FRAGMENT: ContextualUserFragmentDefinition =
ContextualUserFragmentDefinition::new(
USER_SHELL_COMMAND_OPEN_TAG,

View File

@@ -1,7 +1,6 @@
use super::*;
use codex_protocol::items::HookPromptFragment;
use codex_protocol::items::build_hook_prompt_message;
use codex_protocol::models::ResponseItem;
#[test]
fn detects_environment_context_fragment() {

View File

@@ -1,3 +1,5 @@
pub(crate) use codex_instructions::SkillInstructions;
pub use codex_instructions::USER_INSTRUCTIONS_PREFIX;
pub(crate) use codex_instructions::UserInstructions;
mod user_instructions;
pub(crate) use user_instructions::SkillInstructions;
pub use user_instructions::USER_INSTRUCTIONS_PREFIX;
pub(crate) use user_instructions::UserInstructions;

View File

@@ -3,21 +3,20 @@ use serde::Serialize;
use codex_protocol::models::ResponseItem;
use crate::fragment::AGENTS_MD_FRAGMENT;
use crate::fragment::AGENTS_MD_START_MARKER;
use crate::fragment::SKILL_FRAGMENT;
use crate::contextual_user_message::AGENTS_MD_FRAGMENT;
use crate::contextual_user_message::SKILL_FRAGMENT;
pub const USER_INSTRUCTIONS_PREFIX: &str = AGENTS_MD_START_MARKER;
pub const USER_INSTRUCTIONS_PREFIX: &str = "# AGENTS.md instructions for ";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename = "user_instructions", rename_all = "snake_case")]
pub struct UserInstructions {
pub(crate) struct UserInstructions {
pub directory: String,
pub text: String,
}
impl UserInstructions {
pub fn serialize_to_text(&self) -> String {
pub(crate) fn serialize_to_text(&self) -> String {
format!(
"{prefix}{directory}\n\n<INSTRUCTIONS>\n{contents}\n{suffix}",
prefix = AGENTS_MD_FRAGMENT.start_marker(),
@@ -36,12 +35,14 @@ impl From<UserInstructions> for ResponseItem {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename = "skill_instructions", rename_all = "snake_case")]
pub struct SkillInstructions {
pub(crate) struct SkillInstructions {
pub name: String,
pub path: String,
pub contents: String,
}
impl SkillInstructions {}
impl From<SkillInstructions> for ResponseItem {
fn from(si: SkillInstructions) -> Self {
SKILL_FRAGMENT.into_message(SKILL_FRAGMENT.wrap(format!(

View File

@@ -1,11 +1,7 @@
use super::*;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
use crate::fragment::AGENTS_MD_FRAGMENT;
use crate::fragment::SKILL_FRAGMENT;
#[test]
fn test_user_instructions() {
let user_instructions = UserInstructions {

View File

@@ -25,6 +25,8 @@ use crate::unified_exec::UnifiedExecProcessManager;
use crate::unified_exec::WriteStdinRequest;
use async_trait::async_trait;
use codex_features::Feature;
use codex_otel::SessionTelemetry;
use codex_otel::metrics::names::TOOL_CALL_UNIFIED_EXEC_METRIC;
use codex_protocol::models::PermissionProfile;
use serde::Deserialize;
use std::path::PathBuf;
@@ -260,6 +262,7 @@ impl ToolHandler for UnifiedExecHandler {
});
}
emit_unified_exec_tty_metric(&turn.session_telemetry, tty);
manager
.exec_command(
ExecCommandRequest {
@@ -323,6 +326,14 @@ impl ToolHandler for UnifiedExecHandler {
}
}
fn emit_unified_exec_tty_metric(session_telemetry: &SessionTelemetry, tty: bool) {
session_telemetry.counter(
TOOL_CALL_UNIFIED_EXEC_METRIC,
/*inc*/ 1,
&[("tty", if tty { "true" } else { "false" })],
);
}
pub(crate) fn get_command(
args: &ExecCommandArgs,
session_shell: Arc<Shell>,

View File

@@ -45,9 +45,12 @@ use futures::future::BoxFuture;
use std::collections::HashMap;
use std::path::PathBuf;
/// Request payload used by the unified-exec runtime after approvals and
/// sandbox preferences have been resolved for the current turn.
#[derive(Clone, Debug)]
pub struct UnifiedExecRequest {
pub command: Vec<String>,
pub process_id: i32,
pub cwd: PathBuf,
pub env: HashMap<String, String>,
pub explicit_env_overrides: HashMap<String, String>,
@@ -61,6 +64,8 @@ pub struct UnifiedExecRequest {
pub exec_approval_requirement: ExecApprovalRequirement,
}
/// Cache key for approval decisions that can be reused across equivalent
/// unified-exec launches.
#[derive(serde::Serialize, Clone, Debug, Eq, PartialEq, Hash)]
pub struct UnifiedExecApprovalKey {
pub command: Vec<String>,
@@ -70,12 +75,15 @@ pub struct UnifiedExecApprovalKey {
pub additional_permissions: Option<PermissionProfile>,
}
/// Runtime adapter that keeps policy and sandbox orchestration on the
/// unified-exec side while delegating process startup to the manager.
pub struct UnifiedExecRuntime<'a> {
manager: &'a UnifiedExecProcessManager,
shell_mode: UnifiedExecShellMode,
}
impl<'a> UnifiedExecRuntime<'a> {
/// Creates a runtime bound to the shared unified-exec process manager.
pub fn new(manager: &'a UnifiedExecProcessManager, shell_mode: UnifiedExecShellMode) -> Self {
Self {
manager,
@@ -232,12 +240,24 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
.await?
{
Some(prepared) => {
if ctx
.turn
.environment
.experimental_exec_server_url()
.is_some()
{
return Err(ToolError::Rejected(
"unified_exec zsh-fork is not supported when experimental_exec_server_url is configured".to_string(),
));
}
return self
.manager
.open_session_with_exec_env(
req.process_id,
&prepared.exec_request,
req.tty,
prepared.spawn_lifecycle,
ctx.turn.environment.as_ref(),
)
.await
.map_err(|err| match err {
@@ -268,7 +288,13 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
.env_for(command, options, req.network.as_ref())
.map_err(|err| ToolError::Codex(err.into()))?;
self.manager
.open_session_with_exec_env(&exec_env, req.tty, Box::new(NoopSpawnLifecycle))
.open_session_with_exec_env(
req.process_id,
&exec_env,
req.tty,
Box::new(NoopSpawnLifecycle),
ctx.turn.environment.as_ref(),
)
.await
.map_err(|err| match err {
UnifiedExecError::SandboxDenied { output, .. } => {

View File

@@ -20,6 +20,7 @@ use crate::protocol::ExecCommandSource;
use crate::protocol::ExecOutputStream;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventFailure;
use crate::tools::events::ToolEventStage;
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
@@ -121,21 +122,36 @@ pub(crate) fn spawn_exit_watcher(
exit_token.cancelled().await;
output_drained.notified().await;
let exit_code = process.exit_code().unwrap_or(-1);
let duration = Instant::now().saturating_duration_since(started_at);
emit_exec_end_for_unified_exec(
session_ref,
turn_ref,
call_id,
command,
cwd,
Some(process_id.to_string()),
transcript,
String::new(),
exit_code,
duration,
)
.await;
if let Some(message) = process.failure_message() {
emit_failed_exec_end_for_unified_exec(
session_ref,
turn_ref,
call_id,
command,
cwd,
Some(process_id.to_string()),
transcript,
message,
duration,
)
.await;
} else {
let exit_code = process.exit_code().unwrap_or(-1);
emit_exec_end_for_unified_exec(
session_ref,
turn_ref,
call_id,
command,
cwd,
Some(process_id.to_string()),
transcript,
String::new(),
exit_code,
duration,
)
.await;
}
});
}
@@ -213,6 +229,52 @@ pub(crate) async fn emit_exec_end_for_unified_exec(
.await;
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn emit_failed_exec_end_for_unified_exec(
session_ref: Arc<Session>,
turn_ref: Arc<TurnContext>,
call_id: String,
command: Vec<String>,
cwd: PathBuf,
process_id: Option<String>,
transcript: Arc<Mutex<HeadTailBuffer>>,
message: String,
duration: Duration,
) {
let stdout = resolve_aggregated_output(&transcript, String::new()).await;
let aggregated_output = if stdout.is_empty() {
message.clone()
} else {
format!("{stdout}\n{message}")
};
let output = ExecToolCallOutput {
exit_code: -1,
stdout: StreamOutput::new(stdout),
stderr: StreamOutput::new(message),
aggregated_output: StreamOutput::new(aggregated_output),
duration,
timed_out: false,
};
let event_ctx = ToolEventCtx::new(
session_ref.as_ref(),
turn_ref.as_ref(),
&call_id,
/*turn_diff_tracker*/ None,
);
let emitter = ToolEmitter::unified_exec(
&command,
cwd,
ExecCommandSource::UnifiedExecStartup,
process_id,
);
emitter
.emit(
event_ctx,
ToolEventStage::Failure(ToolEventFailure::Output(output)),
)
.await;
}
fn split_valid_utf8_prefix(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
split_valid_utf8_prefix_with_max(buffer, UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES)
}

View File

@@ -5,6 +5,8 @@ use thiserror::Error;
pub(crate) enum UnifiedExecError {
#[error("Failed to create unified exec process: {message}")]
CreateProcess { message: String },
#[error("Unified exec process failed: {message}")]
ProcessFailed { message: String },
// The model is trained on `session_id`, but internally we track a `process_id`.
#[error("Unknown process id {process_id}")]
UnknownProcessId { process_id: i32 },
@@ -28,6 +30,10 @@ impl UnifiedExecError {
Self::CreateProcess { message }
}
pub(crate) fn process_failed(message: String) -> Self {
Self::ProcessFailed { message }
}
pub(crate) fn sandbox_denied(message: String, output: ExecToolCallOutput) -> Self {
Self::SandboxDenied { message, output }
}

View File

@@ -19,6 +19,7 @@
//! This keeps policy logic and user interaction centralized while the PTY/process
//! concerns remain isolated here. The implementation is split between:
//! - `process.rs`: PTY process lifecycle + output buffering.
//! - `process_state.rs`: shared exit/failure state for local and remote processes.
//! - `process_manager.rs`: orchestration (approvals, sandboxing, reuse) and request handling.
use std::collections::HashMap;
@@ -42,6 +43,7 @@ mod errors;
mod head_tail_buffer;
mod process;
mod process_manager;
mod process_state;
pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
process_manager::set_deterministic_process_ids_for_tests(enabled);
@@ -167,6 +169,10 @@ pub(crate) fn generate_chunk_id() -> String {
.collect()
}
#[cfg(test)]
#[cfg(unix)]
#[path = "process_tests.rs"]
mod process_tests;
#[cfg(test)]
#[cfg(unix)]
#[path = "mod_tests.rs"]

View File

@@ -3,27 +3,26 @@ use super::*;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::make_session_and_context;
use crate::protocol::AskForApproval;
use crate::protocol::SandboxPolicy;
use crate::exec::ExecCapturePolicy;
use crate::exec::ExecExpiration;
use crate::sandboxing::ExecRequest;
use crate::tools::context::ExecCommandToolOutput;
use crate::unified_exec::ExecCommandRequest;
use crate::truncate::approx_token_count;
use crate::unified_exec::WriteStdinRequest;
use crate::unified_exec::process::OutputHandles;
use codex_sandboxing::SandboxType;
use core_test_support::get_remote_test_env;
use core_test_support::skip_if_sandbox;
use core_test_support::test_codex::test_env as remote_test_env;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::time::Duration;
use tokio::time::Instant;
async fn test_session_and_turn() -> (Arc<Session>, Arc<TurnContext>) {
let (session, mut turn) = make_session_and_context().await;
turn.approval_policy
.set(AskForApproval::Never)
.expect("test setup should allow updating approval policy");
turn.sandbox_policy
.set(SandboxPolicy::DangerFullAccess)
.expect("test setup should allow updating sandbox policy");
turn.file_system_sandbox_policy =
codex_protocol::permissions::FileSystemSandboxPolicy::from(turn.sandbox_policy.get());
turn.network_sandbox_policy =
codex_protocol::permissions::NetworkSandboxPolicy::from(turn.sandbox_policy.get());
let (session, turn) = make_session_and_context().await;
(Arc::new(session), Arc::new(turn))
}
@@ -32,36 +31,139 @@ async fn exec_command(
turn: &Arc<TurnContext>,
cmd: &str,
yield_time_ms: u64,
workdir: Option<PathBuf>,
) -> Result<ExecCommandToolOutput, UnifiedExecError> {
exec_command_with_tty(session, turn, cmd, yield_time_ms, workdir, true).await
}
fn shell_env() -> HashMap<String, String> {
std::env::vars().collect()
}
fn test_exec_request(
turn: &TurnContext,
command: Vec<String>,
cwd: PathBuf,
env: HashMap<String, String>,
) -> ExecRequest {
ExecRequest {
command,
cwd,
env,
network: None,
expiration: ExecExpiration::DefaultTimeout,
capture_policy: ExecCapturePolicy::ShellTool,
sandbox: SandboxType::None,
windows_sandbox_level: turn.windows_sandbox_level,
windows_sandbox_private_desktop: false,
sandbox_permissions: SandboxPermissions::UseDefault,
sandbox_policy: turn.sandbox_policy.get().clone(),
file_system_sandbox_policy: turn.file_system_sandbox_policy.clone(),
network_sandbox_policy: turn.network_sandbox_policy,
justification: None,
arg0: None,
}
}
async fn exec_command_with_tty(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
cmd: &str,
yield_time_ms: u64,
workdir: Option<PathBuf>,
tty: bool,
) -> Result<ExecCommandToolOutput, UnifiedExecError> {
let manager = &session.services.unified_exec_manager;
let process_id = manager.allocate_process_id().await;
let cwd = workdir.unwrap_or_else(|| turn.cwd.clone());
let command = vec!["bash".to_string(), "-lc".to_string(), cmd.to_string()];
let request = test_exec_request(turn, command.clone(), cwd.clone(), shell_env());
let process = Arc::new(
manager
.open_session_with_exec_env(
process_id,
&request,
tty,
Box::new(NoopSpawnLifecycle),
turn.environment.as_ref(),
)
.await?,
);
let context =
UnifiedExecContext::new(Arc::clone(session), Arc::clone(turn), "call".to_string());
let process_id = session
.services
.unified_exec_manager
.allocate_process_id()
.await;
let started_at = Instant::now();
let process_started_alive = !process.has_exited() && process.exit_code().is_none();
if process_started_alive {
let entry = ProcessEntry {
process: Arc::clone(&process),
call_id: context.call_id.clone(),
process_id,
command: command.clone(),
tty,
network_approval_id: None,
session: Arc::downgrade(session),
last_used: started_at,
};
manager
.process_store
.lock()
.await
.processes
.insert(process_id, entry);
}
session
.services
.unified_exec_manager
.exec_command(
ExecCommandRequest {
command: vec!["bash".to_string(), "-lc".to_string(), cmd.to_string()],
process_id,
yield_time_ms,
max_output_tokens: None,
workdir: None,
network: None,
tty: true,
sandbox_permissions: SandboxPermissions::UseDefault,
additional_permissions: None,
additional_permissions_preapproved: false,
justification: None,
prefix_rule: None,
},
&context,
)
.await
let OutputHandles {
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
} = process.output_handles();
let deadline = started_at + Duration::from_millis(yield_time_ms);
let collected = UnifiedExecProcessManager::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
&output_closed_notify,
&cancellation_token,
Some(session.subscribe_out_of_band_elicitation_pause_state()),
deadline,
)
.await;
let wall_time = Instant::now().saturating_duration_since(started_at);
let text = String::from_utf8_lossy(&collected).to_string();
let has_exited = process.has_exited();
let exit_code = process.exit_code();
let response_process_id = if process_started_alive && !has_exited {
Some(process_id)
} else {
manager.release_process_id(process_id).await;
None
};
Ok(ExecCommandToolOutput {
event_call_id: context.call_id,
chunk_id: generate_chunk_id(),
wall_time,
raw_output: collected,
max_output_tokens: None,
process_id: response_process_id,
exit_code,
original_token_count: Some(approx_token_count(&text)),
session_command: Some(command),
})
}
#[derive(Debug)]
struct TestSpawnLifecycle {
inherited_fds: Vec<i32>,
}
impl SpawnLifecycle for TestSpawnLifecycle {
fn inherited_fds(&self) -> Vec<i32> {
self.inherited_fds.clone()
}
}
async fn write_stdin(
@@ -121,7 +223,7 @@ async fn unified_exec_persists_across_requests() -> anyhow::Result<()> {
let (session, turn) = test_session_and_turn().await;
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
let open_shell = exec_command(&session, &turn, "bash -i", 2_500, None).await?;
let process_id = open_shell.process_id.expect("expected process_id");
write_stdin(
@@ -153,7 +255,7 @@ async fn multi_unified_exec_sessions() -> anyhow::Result<()> {
let (session, turn) = test_session_and_turn().await;
let shell_a = exec_command(&session, &turn, "bash -i", 2_500).await?;
let shell_a = exec_command(&session, &turn, "bash -i", 2_500, None).await?;
let session_a = shell_a.process_id.expect("expected process id");
write_stdin(
@@ -164,7 +266,14 @@ async fn multi_unified_exec_sessions() -> anyhow::Result<()> {
)
.await?;
let out_2 = exec_command(&session, &turn, "echo $CODEX_INTERACTIVE_SHELL_VAR", 2_500).await?;
let out_2 = exec_command(
&session,
&turn,
"echo $CODEX_INTERACTIVE_SHELL_VAR",
2_500,
None,
)
.await?;
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(
out_2.process_id.is_none(),
@@ -198,7 +307,7 @@ async fn unified_exec_timeouts() -> anyhow::Result<()> {
let (session, turn) = test_session_and_turn().await;
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
let open_shell = exec_command(&session, &turn, "bash -i", 2_500, None).await?;
let process_id = open_shell.process_id.expect("expected process id");
write_stdin(
@@ -247,7 +356,14 @@ async fn unified_exec_pause_blocks_yield_timeout() -> anyhow::Result<()> {
});
let started = tokio::time::Instant::now();
let response = exec_command(&session, &turn, "sleep 1 && echo unified-exec-done", 250).await?;
let response = exec_command(
&session,
&turn,
"sleep 1 && echo unified-exec-done",
250,
None,
)
.await?;
assert!(
started.elapsed() >= Duration::from_secs(2),
@@ -270,7 +386,7 @@ async fn unified_exec_pause_blocks_yield_timeout() -> anyhow::Result<()> {
async fn requests_with_large_timeout_are_capped() -> anyhow::Result<()> {
let (session, turn) = test_session_and_turn().await;
let result = exec_command(&session, &turn, "echo codex", 120_000).await?;
let result = exec_command(&session, &turn, "echo codex", 120_000, None).await?;
assert!(result.process_id.is_some());
assert!(result.truncated_output().contains("codex"));
@@ -282,7 +398,7 @@ async fn requests_with_large_timeout_are_capped() -> anyhow::Result<()> {
#[ignore] // Ignored while we have a better way to test this.
async fn completed_commands_do_not_persist_sessions() -> anyhow::Result<()> {
let (session, turn) = test_session_and_turn().await;
let result = exec_command(&session, &turn, "echo codex", 2_500).await?;
let result = exec_command(&session, &turn, "echo codex", 2_500, None).await?;
assert!(
result.process_id.is_some(),
@@ -310,7 +426,7 @@ async fn reusing_completed_process_returns_unknown_process() -> anyhow::Result<(
let (session, turn) = test_session_and_turn().await;
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
let open_shell = exec_command(&session, &turn, "bash -i", 2_500, None).await?;
let process_id = open_shell.process_id.expect("expected process id");
write_stdin(&session, process_id, "exit\n", 2_500).await?;
@@ -341,3 +457,120 @@ async fn reusing_completed_process_returns_unknown_process() -> anyhow::Result<(
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn completed_pipe_commands_preserve_exit_code() -> anyhow::Result<()> {
let (_, turn) = make_session_and_context().await;
let request = test_exec_request(
&turn,
vec!["bash".to_string(), "-lc".to_string(), "exit 17".to_string()],
PathBuf::from("/tmp"),
shell_env(),
);
let environment = codex_exec_server::Environment::default();
let process = UnifiedExecProcessManager::default()
.open_session_with_exec_env(
1234,
&request,
false,
Box::new(NoopSpawnLifecycle),
&environment,
)
.await?;
assert!(process.has_exited());
assert_eq!(process.exit_code(), Some(17));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_uses_remote_exec_server_when_configured() -> anyhow::Result<()> {
skip_if_sandbox!(Ok(()));
let Some(_remote_env) = get_remote_test_env() else {
return Ok(());
};
let remote_test_env = remote_test_env().await?;
let (_, turn) = make_session_and_context().await;
let request = test_exec_request(
&turn,
vec!["bash".to_string(), "-i".to_string()],
PathBuf::from("/tmp"),
shell_env(),
);
let manager = UnifiedExecProcessManager::default();
let process = manager
.open_session_with_exec_env(
1234,
&request,
true,
Box::new(NoopSpawnLifecycle),
remote_test_env.environment(),
)
.await?;
process.write(b"printf 'remote-unified-exec\\n'\n").await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let crate::unified_exec::process::OutputHandles {
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
} = process.output_handles();
let collected = UnifiedExecProcessManager::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
&output_closed_notify,
&cancellation_token,
None,
Instant::now() + Duration::from_millis(2_500),
)
.await;
assert!(String::from_utf8_lossy(&collected).contains("remote-unified-exec"));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_exec_server_rejects_inherited_fd_launches() -> anyhow::Result<()> {
skip_if_sandbox!(Ok(()));
let Some(_remote_env) = get_remote_test_env() else {
return Ok(());
};
let remote_test_env = remote_test_env().await?;
let (_, mut turn) = make_session_and_context().await;
turn.environment = Arc::new(remote_test_env.environment().clone());
let request = test_exec_request(
&turn,
vec!["bash".to_string(), "-lc".to_string(), "echo ok".to_string()],
PathBuf::from("/tmp"),
shell_env(),
);
let manager = UnifiedExecProcessManager::default();
let err = manager
.open_session_with_exec_env(
1234,
&request,
true,
Box::new(TestSpawnLifecycle {
inherited_fds: vec![42],
}),
turn.environment.as_ref(),
)
.await
.expect_err("expected inherited fd rejection");
assert_eq!(
err.to_string(),
"Failed to create unified exec process: remote exec-server does not support inherited file descriptors"
);
Ok(())
}

View File

@@ -6,8 +6,8 @@ use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
@@ -15,8 +15,12 @@ use tokio_util::sync::CancellationToken;
use crate::exec::ExecToolCallOutput;
use crate::exec::StreamOutput;
use crate::exec::is_likely_sandbox_denied;
use codex_exec_server::ExecProcess;
use codex_exec_server::ExecSessionEvent;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteStatus;
use codex_protocol::protocol::TruncationPolicy;
use codex_sandboxing::SandboxType;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::formatted_truncate_text;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::SpawnedPty;
@@ -24,6 +28,7 @@ use codex_utils_pty::SpawnedPty;
use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS;
use super::UnifiedExecError;
use super::head_tail_buffer::HeadTailBuffer;
use super::process_state::ProcessState;
pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync {
/// Returns file descriptors that must stay open across the child `exec()`.
@@ -41,11 +46,13 @@ pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync {
pub(crate) type SpawnLifecycleHandle = Box<dyn SpawnLifecycle>;
#[derive(Debug, Default)]
/// Spawn lifecycle that performs no extra setup around process launch.
pub(crate) struct NoopSpawnLifecycle;
impl SpawnLifecycle for NoopSpawnLifecycle {}
pub(crate) type OutputBuffer = Arc<Mutex<HeadTailBuffer>>;
/// Shared output state exposed to polling and streaming consumers.
pub(crate) struct OutputHandles {
pub(crate) output_buffer: OutputBuffer,
pub(crate) output_notify: Arc<Notify>,
@@ -54,27 +61,44 @@ pub(crate) struct OutputHandles {
pub(crate) cancellation_token: CancellationToken,
}
#[derive(Debug)]
/// Transport-specific process handle used by unified exec.
enum ProcessHandle {
Local(Box<ExecCommandSession>),
Remote(Arc<dyn ExecProcess>),
}
/// Unified wrapper over local PTY sessions and exec-server-backed processes.
pub(crate) struct UnifiedExecProcess {
process_handle: ExecCommandSession,
output_rx: broadcast::Receiver<Vec<u8>>,
process_handle: ProcessHandle,
output_tx: broadcast::Sender<Vec<u8>>,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
output_closed_notify: Arc<Notify>,
cancellation_token: CancellationToken,
output_drained: Arc<Notify>,
output_task: JoinHandle<()>,
state_tx: watch::Sender<ProcessState>,
state_rx: watch::Receiver<ProcessState>,
output_task: Option<JoinHandle<()>>,
sandbox_type: SandboxType,
_spawn_lifecycle: SpawnLifecycleHandle,
_spawn_lifecycle: Option<SpawnLifecycleHandle>,
}
impl std::fmt::Debug for UnifiedExecProcess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UnifiedExecProcess")
.field("has_exited", &self.has_exited())
.field("exit_code", &self.exit_code())
.field("sandbox_type", &self.sandbox_type)
.finish_non_exhaustive()
}
}
impl UnifiedExecProcess {
pub(super) fn new(
process_handle: ExecCommandSession,
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
fn new(
process_handle: ProcessHandle,
sandbox_type: SandboxType,
spawn_lifecycle: SpawnLifecycleHandle,
spawn_lifecycle: Option<SpawnLifecycleHandle>,
) -> Self {
let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default()));
let output_notify = Arc::new(Notify::new());
@@ -82,48 +106,50 @@ impl UnifiedExecProcess {
let output_closed_notify = Arc::new(Notify::new());
let cancellation_token = CancellationToken::new();
let output_drained = Arc::new(Notify::new());
let mut receiver = initial_output_rx;
let output_rx = receiver.resubscribe();
let buffer_clone = Arc::clone(&output_buffer);
let notify_clone = Arc::clone(&output_notify);
let output_closed_clone = Arc::clone(&output_closed);
let output_closed_notify_clone = Arc::clone(&output_closed_notify);
let output_task = tokio::spawn(async move {
loop {
match receiver.recv().await {
Ok(chunk) => {
let mut guard = buffer_clone.lock().await;
guard.push_chunk(chunk);
drop(guard);
notify_clone.notify_waiters();
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
output_closed_clone.store(true, Ordering::Release);
output_closed_notify_clone.notify_waiters();
break;
}
};
}
});
let (output_tx, _) = broadcast::channel(64);
let (state_tx, state_rx) = watch::channel(ProcessState::default());
Self {
process_handle,
output_rx,
output_tx,
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
output_drained,
output_task,
state_tx,
state_rx,
output_task: None,
sandbox_type,
_spawn_lifecycle: spawn_lifecycle,
}
}
pub(super) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
self.process_handle.writer_sender()
pub(super) async fn write(&self, data: &[u8]) -> Result<(), UnifiedExecError> {
match &self.process_handle {
ProcessHandle::Local(process_handle) => process_handle
.writer_sender()
.send(data.to_vec())
.await
.map(|()| ())
.map_err(|_| UnifiedExecError::WriteToStdin),
ProcessHandle::Remote(process_handle) => {
match process_handle.write(data.to_vec()).await {
Ok(response) => match response.status {
WriteStatus::Accepted => Ok(()),
WriteStatus::UnknownProcess | WriteStatus::StdinClosed => {
let state = self.state_rx.borrow().clone();
let _ = self.state_tx.send_replace(state.exited(state.exit_code));
self.cancellation_token.cancel();
Err(UnifiedExecError::WriteToStdin)
}
WriteStatus::Starting => Err(UnifiedExecError::WriteToStdin),
},
Err(err) => Err(UnifiedExecError::process_failed(err.to_string())),
}
}
}
}
pub(super) fn output_handles(&self) -> OutputHandles {
@@ -137,7 +163,7 @@ impl UnifiedExecProcess {
}
pub(super) fn output_receiver(&self) -> tokio::sync::broadcast::Receiver<Vec<u8>> {
self.output_rx.resubscribe()
self.output_tx.subscribe()
}
pub(super) fn cancellation_token(&self) -> CancellationToken {
@@ -149,19 +175,39 @@ impl UnifiedExecProcess {
}
pub(super) fn has_exited(&self) -> bool {
self.process_handle.has_exited()
let state = self.state_rx.borrow().clone();
match &self.process_handle {
ProcessHandle::Local(process_handle) => state.has_exited || process_handle.has_exited(),
ProcessHandle::Remote(_) => state.has_exited,
}
}
pub(super) fn exit_code(&self) -> Option<i32> {
self.process_handle.exit_code()
let state = self.state_rx.borrow().clone();
match &self.process_handle {
ProcessHandle::Local(process_handle) => {
state.exit_code.or_else(|| process_handle.exit_code())
}
ProcessHandle::Remote(_) => state.exit_code,
}
}
pub(super) fn terminate(&self) {
self.output_closed.store(true, Ordering::Release);
self.output_closed_notify.notify_waiters();
self.process_handle.terminate();
match &self.process_handle {
ProcessHandle::Local(process_handle) => process_handle.terminate(),
ProcessHandle::Remote(process_handle) => {
let process_handle = Arc::clone(process_handle);
tokio::spawn(async move {
let _ = process_handle.terminate().await;
});
}
}
self.cancellation_token.cancel();
self.output_task.abort();
if let Some(output_task) = &self.output_task {
output_task.abort();
}
}
async fn snapshot_output(&self) -> Vec<Vec<u8>> {
@@ -173,6 +219,10 @@ impl UnifiedExecProcess {
self.sandbox_type
}
pub(super) fn failure_message(&self) -> Option<String> {
self.state_rx.borrow().failure_message.clone()
}
pub(super) async fn check_for_sandbox_denial(&self) -> Result<(), UnifiedExecError> {
let _ =
tokio::time::timeout(Duration::from_millis(20), self.output_notify.notified()).await;
@@ -232,29 +282,49 @@ impl UnifiedExecProcess {
mut exit_rx,
} = spawned;
let output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle);
let mut managed = Self::new(
ProcessHandle::Local(Box::new(process_handle)),
sandbox_type,
Some(spawn_lifecycle),
);
managed.output_task = Some(Self::spawn_local_output_task(
output_rx,
Arc::clone(&managed.output_buffer),
Arc::clone(&managed.output_notify),
Arc::clone(&managed.output_closed),
Arc::clone(&managed.output_closed_notify),
managed.output_tx.clone(),
));
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));
if exit_ready {
managed.signal_exit();
managed.check_for_sandbox_denial().await?;
return Ok(managed);
match exit_rx.try_recv() {
Ok(exit_code) => {
managed.signal_exit(Some(exit_code));
managed.check_for_sandbox_denial().await?;
return Ok(managed);
}
Err(TryRecvError::Closed) => {
managed.signal_exit(/*exit_code*/ None);
managed.check_for_sandbox_denial().await?;
return Ok(managed);
}
Err(TryRecvError::Empty) => {}
}
if tokio::time::timeout(Duration::from_millis(150), &mut exit_rx)
.await
.is_ok()
if let Ok(exit_result) =
tokio::time::timeout(Duration::from_millis(150), &mut exit_rx).await
{
managed.signal_exit();
managed.signal_exit(exit_result.ok());
managed.check_for_sandbox_denial().await?;
return Ok(managed);
}
tokio::spawn({
let state_tx = managed.state_tx.clone();
let cancellation_token = managed.cancellation_token.clone();
async move {
let _ = exit_rx.await;
let exit_code = exit_rx.await.ok();
let state = state_tx.borrow().clone();
let _ = state_tx.send_replace(state.exited(exit_code));
cancellation_token.cancel();
}
});
@@ -262,11 +332,115 @@ impl UnifiedExecProcess {
Ok(managed)
}
fn signal_exit(&self) {
fn spawn_remote_output_task(
started: StartedExecProcess,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
output_closed_notify: Arc<Notify>,
output_tx: broadcast::Sender<Vec<u8>>,
state_tx: watch::Sender<ProcessState>,
cancellation_token: CancellationToken,
) -> JoinHandle<()> {
let StartedExecProcess { mut events, .. } = started;
tokio::spawn(async move {
loop {
match events.recv().await {
Some(ExecSessionEvent::Output { chunk, .. }) => {
let mut guard = output_buffer.lock().await;
guard.push_chunk(chunk.clone());
drop(guard);
let _ = output_tx.send(chunk);
output_notify.notify_waiters();
}
Some(ExecSessionEvent::Exited { exit_code, .. }) => {
let state = state_tx.borrow().clone();
let _ = state_tx.send_replace(state.exited(Some(exit_code)));
cancellation_token.cancel();
}
Some(ExecSessionEvent::Closed { .. }) => {
let state = state_tx.borrow().clone();
let _ = state_tx.send_replace(state.exited(state.exit_code));
output_closed.store(true, Ordering::Release);
output_closed_notify.notify_waiters();
cancellation_token.cancel();
break;
}
Some(ExecSessionEvent::Failed { message }) => {
let state = state_tx.borrow().clone();
let _ = state_tx.send_replace(state.failed(message));
output_closed.store(true, Ordering::Release);
output_closed_notify.notify_waiters();
cancellation_token.cancel();
break;
}
None => {
let state = state_tx.borrow().clone();
let _ = state_tx.send_replace(state.exited(state.exit_code));
output_closed.store(true, Ordering::Release);
output_closed_notify.notify_waiters();
cancellation_token.cancel();
break;
}
}
}
})
}
fn spawn_local_output_task(
mut receiver: tokio::sync::broadcast::Receiver<Vec<u8>>,
buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
output_closed_notify: Arc<Notify>,
output_tx: broadcast::Sender<Vec<u8>>,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
match receiver.recv().await {
Ok(chunk) => {
let mut guard = buffer.lock().await;
guard.push_chunk(chunk.clone());
drop(guard);
let _ = output_tx.send(chunk);
output_notify.notify_waiters();
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
output_closed.store(true, Ordering::Release);
output_closed_notify.notify_waiters();
break;
}
};
}
})
}
fn signal_exit(&self, exit_code: Option<i32>) {
let state = self.state_rx.borrow().clone();
let _ = self.state_tx.send_replace(state.exited(exit_code));
self.cancellation_token.cancel();
}
}
impl From<(StartedExecProcess, SandboxType)> for UnifiedExecProcess {
fn from((started, sandbox_type): (StartedExecProcess, SandboxType)) -> Self {
let process_handle = ProcessHandle::Remote(Arc::clone(&started.process));
let mut managed = Self::new(process_handle, sandbox_type, /*spawn_lifecycle*/ None);
managed.output_task = Some(Self::spawn_remote_output_task(
started,
Arc::clone(&managed.output_buffer),
Arc::clone(&managed.output_notify),
Arc::clone(&managed.output_closed),
Arc::clone(&managed.output_closed_notify),
managed.output_tx.clone(),
managed.state_tx.clone(),
managed.cancellation_token.clone(),
));
managed
}
}
impl Drop for UnifiedExecProcess {
fn drop(&mut self) {
self.terminate();

View File

@@ -7,7 +7,6 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::time::Duration;
use tokio::time::Instant;
@@ -40,6 +39,7 @@ use crate::unified_exec::UnifiedExecProcessManager;
use crate::unified_exec::WARNING_UNIFIED_EXEC_PROCESSES;
use crate::unified_exec::WriteStdinRequest;
use crate::unified_exec::async_watcher::emit_exec_end_for_unified_exec;
use crate::unified_exec::async_watcher::emit_failed_exec_end_for_unified_exec;
use crate::unified_exec::async_watcher::spawn_exit_watcher;
use crate::unified_exec::async_watcher::start_streaming_output;
use crate::unified_exec::clamp_yield_time;
@@ -89,8 +89,9 @@ fn apply_unified_exec_env(mut env: HashMap<String, String>) -> HashMap<String, S
env
}
/// Borrowed process state prepared for a `write_stdin` or poll operation.
struct PreparedProcessHandles {
writer_tx: mpsc::Sender<Vec<u8>>,
process: Arc<UnifiedExecProcess>,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
@@ -102,6 +103,10 @@ struct PreparedProcessHandles {
tty: bool,
}
fn exec_server_process_id(process_id: i32) -> String {
process_id.to_string()
}
impl UnifiedExecProcessManager {
pub(crate) async fn allocate_process_id(&self) -> i32 {
loop {
@@ -243,6 +248,29 @@ impl UnifiedExecProcessManager {
let text = String::from_utf8_lossy(&collected).to_string();
let chunk_id = generate_chunk_id();
if let Some(message) = process.failure_message() {
if !process_started_alive {
emit_failed_exec_end_for_unified_exec(
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
request.command.clone(),
cwd.clone(),
Some(request.process_id.to_string()),
Arc::clone(&transcript),
message.clone(),
wall_time,
)
.await;
}
self.release_process_id(request.process_id).await;
finish_deferred_network_approval(
context.session.as_ref(),
deferred_network_approval.take(),
)
.await;
return Err(UnifiedExecError::process_failed(message));
}
let process_id = request.process_id;
let (response_process_id, exit_code) = if process_started_alive {
match self.refresh_process_state(process_id).await {
@@ -312,7 +340,7 @@ impl UnifiedExecProcessManager {
let process_id = request.process_id;
let PreparedProcessHandles {
writer_tx,
process,
output_buffer,
output_notify,
output_closed,
@@ -324,15 +352,31 @@ impl UnifiedExecProcessManager {
tty,
..
} = self.prepare_process_handles(process_id).await?;
let mut status_after_write = None;
if !request.input.is_empty() {
if !tty {
return Err(UnifiedExecError::StdinClosed);
}
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
// Give the remote process a brief window to react so that we are
// more likely to capture its output in the poll below.
tokio::time::sleep(Duration::from_millis(100)).await;
match process.write(request.input.as_bytes()).await {
Ok(()) => {
// Give the remote process a brief window to react so that we are
// more likely to capture its output in the poll below.
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(err) => {
let status = self.refresh_process_state(process_id).await;
if matches!(status, ProcessStatus::Exited { .. }) {
status_after_write = Some(status);
} else if matches!(err, UnifiedExecError::ProcessFailed { .. }) {
process.terminate();
self.release_process_id(process_id).await;
return Err(err);
} else {
return Err(err);
}
}
}
}
let yield_time_ms = {
@@ -362,12 +406,20 @@ impl UnifiedExecProcessManager {
let text = String::from_utf8_lossy(&collected).to_string();
let original_token_count = approx_token_count(&text);
let chunk_id = generate_chunk_id();
if let Some(message) = process.failure_message() {
self.release_process_id(process_id).await;
return Err(UnifiedExecError::process_failed(message));
}
// After polling, refresh_process_state tells us whether the PTY is
// still alive or has exited and been removed from the store; we thread
// that through so the handler can tag TerminalInteraction with an
// appropriate process_id and exit_code.
let status = self.refresh_process_state(process_id).await;
let status = if let Some(status) = status_after_write {
status
} else {
self.refresh_process_state(process_id).await
};
let (process_id, exit_code, event_call_id) = match status {
ProcessStatus::Alive {
exit_code,
@@ -455,7 +507,7 @@ impl UnifiedExecProcessManager {
.map(|session| session.subscribe_out_of_band_elicitation_pause_state());
Ok(PreparedProcessHandles {
writer_tx: entry.process.writer_sender(),
process: Arc::clone(&entry.process),
output_buffer,
output_notify,
output_closed,
@@ -468,16 +520,6 @@ impl UnifiedExecProcessManager {
})
}
async fn send_input(
writer_tx: &mpsc::Sender<Vec<u8>>,
data: &[u8],
) -> Result<(), UnifiedExecError> {
writer_tx
.send(data.to_vec())
.await
.map_err(|_| UnifiedExecError::WriteToStdin)
}
#[allow(clippy::too_many_arguments)]
async fn store_process(
&self,
@@ -539,9 +581,11 @@ impl UnifiedExecProcessManager {
pub(crate) async fn open_session_with_exec_env(
&self,
process_id: i32,
env: &ExecRequest,
tty: bool,
mut spawn_lifecycle: SpawnLifecycleHandle,
environment: &codex_exec_server::Environment,
) -> Result<UnifiedExecProcess, UnifiedExecError> {
let (program, args) = env
.command
@@ -549,6 +593,28 @@ impl UnifiedExecProcessManager {
.ok_or(UnifiedExecError::MissingCommandLine)?;
let inherited_fds = spawn_lifecycle.inherited_fds();
if environment.experimental_exec_server_url().is_some() {
if !inherited_fds.is_empty() {
return Err(UnifiedExecError::create_process(
"remote exec-server does not support inherited file descriptors".to_string(),
));
}
let started = environment
.get_exec_backend()
.start(codex_exec_server::ExecParams {
process_id: exec_server_process_id(process_id),
argv: env.command.clone(),
cwd: env.cwd.clone(),
env: env.env.clone(),
tty,
arg0: env.arg0.clone(),
})
.await
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
return Ok(UnifiedExecProcess::from((started, env.sandbox)));
}
let spawn_result = if tty {
codex_utils_pty::pty::spawn_process_with_inherited_fds(
program,
@@ -611,6 +677,7 @@ impl UnifiedExecProcessManager {
.await;
let req = UnifiedExecToolRequest {
command: request.command.clone(),
process_id: request.process_id,
cwd,
env,
explicit_env_overrides: context.turn.shell_environment_policy.r#set.clone(),

View File

@@ -34,6 +34,11 @@ fn unified_exec_env_overrides_existing_values() {
assert_eq!(env.get("PATH"), Some(&"/usr/bin".to_string()));
}
#[test]
fn exec_server_process_id_matches_unified_exec_process_id() {
assert_eq!(exec_server_process_id(4321), "4321");
}
#[test]
fn pruning_prefers_exited_processes_outside_recently_used() {
let now = Instant::now();

View File

@@ -0,0 +1,24 @@
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub(crate) struct ProcessState {
pub(crate) has_exited: bool,
pub(crate) exit_code: Option<i32>,
pub(crate) failure_message: Option<String>,
}
impl ProcessState {
pub(crate) fn exited(&self, exit_code: Option<i32>) -> Self {
Self {
has_exited: true,
exit_code,
failure_message: self.failure_message.clone(),
}
}
pub(crate) fn failed(&self, message: String) -> Self {
Self {
has_exited: true,
exit_code: self.exit_code,
failure_message: Some(message),
}
}
}

View File

@@ -0,0 +1,73 @@
use super::process::UnifiedExecProcess;
use crate::unified_exec::UnifiedExecError;
use async_trait::async_trait;
use codex_exec_server::ExecProcess;
use codex_exec_server::ExecServerError;
use codex_exec_server::ProcessId;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteResponse;
use codex_exec_server::WriteStatus;
use codex_sandboxing::SandboxType;
use std::sync::Arc;
use tokio::sync::mpsc;
struct MockExecProcess {
process_id: ProcessId,
write_response: WriteResponse,
}
#[async_trait]
impl ExecProcess for MockExecProcess {
fn process_id(&self) -> &ProcessId {
&self.process_id
}
async fn write(&self, _chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError> {
Ok(self.write_response.clone())
}
async fn terminate(&self) -> Result<(), ExecServerError> {
Ok(())
}
}
fn remote_process(write_status: WriteStatus) -> UnifiedExecProcess {
let (_events_tx, events_rx) = mpsc::channel(1);
let started = StartedExecProcess {
process: Arc::new(MockExecProcess {
process_id: "test-process".to_string().into(),
write_response: WriteResponse {
status: write_status,
},
}),
events: events_rx,
};
UnifiedExecProcess::from((started, SandboxType::None))
}
#[tokio::test]
async fn remote_write_unknown_process_marks_process_exited() {
let process = remote_process(WriteStatus::UnknownProcess);
let err = process
.write(b"hello")
.await
.expect_err("expected write failure");
assert!(matches!(err, UnifiedExecError::WriteToStdin));
assert!(process.has_exited());
}
#[tokio::test]
async fn remote_write_closed_stdin_marks_process_exited() {
let process = remote_process(WriteStatus::StdinClosed);
let err = process
.write(b"hello")
.await
.expect_err("expected write failure");
assert!(matches!(err, UnifiedExecError::WriteToStdin));
assert!(process.has_exited());
}

View File

@@ -15,6 +15,7 @@ path = "src/bin/codex-exec-server.rs"
workspace = true
[dependencies]
arc-swap = "1.8.2"
async-trait = { workspace = true }
base64 = { workspace = true }
clap = { workspace = true, features = ["derive"] }

View File

@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwap;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
@@ -17,22 +19,25 @@ use codex_app_server_protocol::FsWriteFileParams;
use codex_app_server_protocol::FsWriteFileResponse;
use codex_app_server_protocol::JSONRPCNotification;
use serde_json::Value;
use tokio::sync::broadcast;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::debug;
use tracing::warn;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecServerEvent;
use crate::process::ExecSessionEvent;
use crate::process::SESSION_EVENT_CHANNEL_CAPACITY;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::ExecClosedNotification;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecParams;
@@ -92,7 +97,15 @@ impl RemoteExecServerConnectArgs {
struct Inner {
client: RpcClient,
events_tx: broadcast::Sender<ExecServerEvent>,
// The remote transport delivers one shared notification stream for every
// process on the connection. Keep a local process_id -> sender registry so
// we can demux those connection-global notifications into the single
// process-scoped event channel returned by ExecBackend::start().
sessions: ArcSwap<HashMap<String, mpsc::Sender<ExecSessionEvent>>>,
// ArcSwap makes reads cheap on the hot notification path, but writes still
// need serialization so concurrent register/remove operations do not
// overwrite each other's copy-on-write updates.
sessions_write_lock: Mutex<()>,
reader_task: tokio::task::JoinHandle<()>,
}
@@ -158,10 +171,6 @@ impl ExecServerClient {
.await
}
pub fn event_receiver(&self) -> broadcast::Receiver<ExecServerEvent> {
self.inner.events_tx.subscribe()
}
pub async fn initialize(
&self,
options: ExecServerClientConnectOptions,
@@ -307,6 +316,35 @@ impl ExecServerClient {
.map_err(Into::into)
}
pub(crate) async fn register_session(
&self,
process_id: &str,
) -> Result<mpsc::Receiver<ExecSessionEvent>, ExecServerError> {
let (events_tx, events_rx) = mpsc::channel(SESSION_EVENT_CHANNEL_CAPACITY);
let _sessions_write_guard = self.inner.sessions_write_lock.lock().await;
let sessions = self.inner.sessions.load();
if sessions.contains_key(process_id) {
return Err(ExecServerError::Protocol(format!(
"session already registered for process {process_id}"
)));
}
let mut next_sessions = sessions.as_ref().clone();
next_sessions.insert(process_id.to_string(), events_tx);
self.inner.sessions.store(Arc::new(next_sessions));
Ok(events_rx)
}
pub(crate) async fn unregister_session(&self, process_id: &str) {
let _sessions_write_guard = self.inner.sessions_write_lock.lock().await;
let sessions = self.inner.sessions.load();
if !sessions.contains_key(process_id) {
return;
}
let mut next_sessions = sessions.as_ref().clone();
next_sessions.remove(process_id);
self.inner.sessions.store(Arc::new(next_sessions));
}
async fn connect(
connection: JsonRpcConnection,
options: ExecServerClientConnectOptions,
@@ -322,13 +360,18 @@ impl ExecServerClient {
&& let Err(err) =
handle_server_notification(&inner, notification).await
{
warn!("exec-server client closing after protocol error: {err}");
fail_all_sessions(
&inner,
format!("exec-server notification handling failed: {err}"),
)
.await;
return;
}
}
RpcClientEvent::Disconnected { reason } => {
if let Some(reason) = reason {
warn!("exec-server client transport disconnected: {reason}");
if let Some(inner) = weak.upgrade() {
fail_all_sessions(&inner, disconnected_message(reason.as_deref()))
.await;
}
return;
}
@@ -338,7 +381,8 @@ impl ExecServerClient {
Inner {
client: rpc_client,
events_tx: broadcast::channel(256).0,
sessions: ArcSwap::from_pointee(HashMap::new()),
sessions_write_lock: Mutex::new(()),
reader_task,
}
});
@@ -370,6 +414,32 @@ impl From<RpcCallError> for ExecServerError {
}
}
fn disconnected_message(reason: Option<&str>) -> String {
match reason {
Some(reason) => format!("exec-server transport disconnected: {reason}"),
None => "exec-server transport disconnected".to_string(),
}
}
async fn fail_all_sessions(inner: &Arc<Inner>, message: String) {
let sessions = {
let _sessions_write_guard = inner.sessions_write_lock.lock().await;
let sessions = inner.sessions.load();
let drained_sessions = sessions.as_ref().clone();
inner.sessions.store(Arc::new(HashMap::new()));
drained_sessions
};
for (_, events_tx) in sessions {
// Do not block disconnect handling behind a full bounded queue. Best
// effort deliver a terminal failure event, then drop the sender so
// receivers still observe EOF if the queue was already saturated.
let _ = events_tx.try_send(ExecSessionEvent::Failed {
message: message.clone(),
});
}
}
async fn handle_server_notification(
inner: &Arc<Inner>,
notification: JSONRPCNotification,
@@ -378,12 +448,53 @@ async fn handle_server_notification(
EXEC_OUTPUT_DELTA_METHOD => {
let params: ExecOutputDeltaNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
// Remote exec-server notifications are connection-global, so route
// each event to the single local receiver that owns this process.
let events_tx = inner.sessions.load().get(&params.process_id).cloned();
if let Some(events_tx) = events_tx {
let _ = events_tx
.send(ExecSessionEvent::Output {
seq: params.seq,
stream: params.stream,
chunk: params.chunk.into_inner(),
})
.await;
}
}
EXEC_EXITED_METHOD => {
let params: ExecExitedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
let events_tx = inner.sessions.load().get(&params.process_id).cloned();
if let Some(events_tx) = events_tx {
let _ = events_tx
.send(ExecSessionEvent::Exited {
seq: params.seq,
exit_code: params.exit_code,
})
.await;
}
}
EXEC_CLOSED_METHOD => {
let params: ExecClosedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let events_tx = {
let _sessions_write_guard = inner.sessions_write_lock.lock().await;
let sessions = inner.sessions.load();
let events_tx = sessions.get(&params.process_id).cloned();
if events_tx.is_some() {
// Closed is the terminal lifecycle event for this process,
// so drop the routing entry before forwarding it.
let mut next_sessions = sessions.as_ref().clone();
next_sessions.remove(&params.process_id);
inner.sessions.store(Arc::new(next_sessions));
}
events_tx
};
if let Some(events_tx) = events_tx {
let _ = events_tx
.send(ExecSessionEvent::Closed { seq: params.seq })
.await;
}
}
other => {
debug!("ignoring unknown exec-server notification: {other}");

View File

@@ -6,19 +6,19 @@ use crate::RemoteExecServerConnectArgs;
use crate::file_system::ExecutorFileSystem;
use crate::local_file_system::LocalFileSystem;
use crate::local_process::LocalProcess;
use crate::process::ExecProcess;
use crate::process::ExecBackend;
use crate::remote_file_system::RemoteFileSystem;
use crate::remote_process::RemoteProcess;
pub trait ExecutorEnvironment: Send + Sync {
fn get_executor(&self) -> Arc<dyn ExecProcess>;
fn get_exec_backend(&self) -> Arc<dyn ExecBackend>;
}
#[derive(Clone)]
pub struct Environment {
experimental_exec_server_url: Option<String>,
remote_exec_server_client: Option<ExecServerClient>,
executor: Arc<dyn ExecProcess>,
exec_backend: Arc<dyn ExecBackend>,
}
impl Default for Environment {
@@ -34,7 +34,7 @@ impl Default for Environment {
Self {
experimental_exec_server_url: None,
remote_exec_server_client: None,
executor: Arc::new(local_process),
exec_backend: Arc::new(local_process),
}
}
}
@@ -68,24 +68,24 @@ impl Environment {
None
};
let executor: Arc<dyn ExecProcess> = if let Some(client) = remote_exec_server_client.clone()
{
Arc::new(RemoteProcess::new(client))
} else {
let local_process = LocalProcess::default();
local_process
.initialize()
.map_err(|err| ExecServerError::Protocol(err.message))?;
local_process
.initialized()
.map_err(ExecServerError::Protocol)?;
Arc::new(local_process)
};
let exec_backend: Arc<dyn ExecBackend> =
if let Some(client) = remote_exec_server_client.clone() {
Arc::new(RemoteProcess::new(client))
} else {
let local_process = LocalProcess::default();
local_process
.initialize()
.map_err(|err| ExecServerError::Protocol(err.message))?;
local_process
.initialized()
.map_err(ExecServerError::Protocol)?;
Arc::new(local_process)
};
Ok(Self {
experimental_exec_server_url,
remote_exec_server_client,
executor,
exec_backend,
})
}
@@ -93,8 +93,8 @@ impl Environment {
self.experimental_exec_server_url.as_deref()
}
pub fn get_executor(&self) -> Arc<dyn ExecProcess> {
Arc::clone(&self.executor)
pub fn get_exec_backend(&self) -> Arc<dyn ExecBackend> {
Arc::clone(&self.exec_backend)
}
pub fn get_filesystem(&self) -> Arc<dyn ExecutorFileSystem> {
@@ -107,8 +107,8 @@ impl Environment {
}
impl ExecutorEnvironment for Environment {
fn get_executor(&self) -> Arc<dyn ExecProcess> {
Arc::clone(&self.executor)
fn get_exec_backend(&self) -> Arc<dyn ExecBackend> {
Arc::clone(&self.exec_backend)
}
}
@@ -130,7 +130,7 @@ mod tests {
let environment = Environment::default();
let response = environment
.get_executor()
.get_exec_backend()
.start(crate::ExecParams {
process_id: "default-env-proc".to_string(),
argv: vec!["true".to_string()],
@@ -142,11 +142,6 @@ mod tests {
.await
.expect("start process");
assert_eq!(
response,
crate::ExecResponse {
process_id: "default-env-proc".to_string(),
}
);
assert_eq!(response.process.process_id().as_str(), "default-env-proc");
}
}

View File

@@ -39,8 +39,12 @@ pub use file_system::FileMetadata;
pub use file_system::FileSystemResult;
pub use file_system::ReadDirectoryEntry;
pub use file_system::RemoveOptions;
pub use process::ExecBackend;
pub use process::ExecProcess;
pub use process::ExecServerEvent;
pub use process::ExecSessionEvent;
pub use process::ProcessId;
pub use process::StartedExecProcess;
pub use protocol::ExecClosedNotification;
pub use protocol::ExecExitedNotification;
pub use protocol::ExecOutputDeltaNotification;
pub use protocol::ExecOutputStream;
@@ -54,6 +58,7 @@ pub use protocol::TerminateParams;
pub use protocol::TerminateResponse;
pub use protocol::WriteParams;
pub use protocol::WriteResponse;
pub use protocol::WriteStatus;
pub use server::DEFAULT_LISTEN_URL;
pub use server::ExecServerListenUrlParseError;
pub use server::run_main;

View File

@@ -11,13 +11,17 @@ use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::TerminalSize;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tracing::warn;
use crate::ExecBackend;
use crate::ExecProcess;
use crate::ExecServerError;
use crate::ExecServerEvent;
use crate::ExecSessionEvent;
use crate::ProcessId;
use crate::StartedExecProcess;
use crate::process::SESSION_EVENT_CHANNEL_CAPACITY;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::ExecClosedNotification;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecOutputStream;
@@ -31,6 +35,7 @@ use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use crate::protocol::WriteStatus;
use crate::rpc::RpcNotificationSender;
use crate::rpc::RpcServerOutboundMessage;
use crate::rpc::internal_error;
@@ -38,7 +43,6 @@ use crate::rpc::invalid_params;
use crate::rpc::invalid_request;
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
const EVENT_CHANNEL_CAPACITY: usize = 256;
const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
#[cfg(test)]
const EXITED_PROCESS_RETENTION: Duration = Duration::from_millis(25);
@@ -60,6 +64,9 @@ struct RunningProcess {
next_seq: u64,
exit_code: Option<i32>,
output_notify: Arc<Notify>,
session_events_tx: mpsc::Sender<ExecSessionEvent>,
open_streams: usize,
closed: bool,
}
enum ProcessEntry {
@@ -69,7 +76,6 @@ enum ProcessEntry {
struct Inner {
notifications: RpcNotificationSender,
events_tx: broadcast::Sender<ExecServerEvent>,
processes: Mutex<HashMap<String, ProcessEntry>>,
initialize_requested: AtomicBool,
initialized: AtomicBool,
@@ -80,6 +86,11 @@ pub(crate) struct LocalProcess {
inner: Arc<Inner>,
}
struct LocalExecProcess {
process_id: ProcessId,
backend: LocalProcess,
}
impl Default for LocalProcess {
fn default() -> Self {
let (outgoing_tx, mut outgoing_rx) =
@@ -94,7 +105,6 @@ impl LocalProcess {
Self {
inner: Arc::new(Inner {
notifications,
events_tx: broadcast::channel(EVENT_CHANNEL_CAPACITY).0,
processes: Mutex::new(HashMap::new()),
initialize_requested: AtomicBool::new(false),
initialized: AtomicBool::new(false),
@@ -152,10 +162,12 @@ impl LocalProcess {
Ok(())
}
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
async fn start_process(
&self,
params: ExecParams,
) -> Result<(ExecResponse, mpsc::Receiver<ExecSessionEvent>), JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let process_id = params.process_id.clone();
let (program, args) = params
.argv
.split_first()
@@ -203,6 +215,7 @@ impl LocalProcess {
};
let output_notify = Arc::new(Notify::new());
let (session_events_tx, session_events_rx) = mpsc::channel(SESSION_EVENT_CHANNEL_CAPACITY);
{
let mut process_map = self.inner.processes.lock().await;
process_map.insert(
@@ -215,6 +228,9 @@ impl LocalProcess {
next_seq: 1,
exit_code: None,
output_notify: Arc::clone(&output_notify),
session_events_tx: session_events_tx.clone(),
open_streams: 2,
closed: false,
})),
);
}
@@ -248,7 +264,13 @@ impl LocalProcess {
output_notify,
));
Ok(ExecResponse { process_id })
Ok((ExecResponse { process_id }, session_events_rx))
}
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
self.start_process(params)
.await
.map(|(response, _)| response)
}
pub(crate) async fn exec_read(
@@ -256,6 +278,7 @@ impl LocalProcess {
params: ReadParams,
) -> Result<ReadResponse, JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let _process_id = params.process_id.clone();
let after_seq = params.after_seq.unwrap_or(0);
let max_bytes = params.max_bytes.unwrap_or(usize::MAX);
let wait = Duration::from_millis(params.wait_ms.unwrap_or(0));
@@ -309,6 +332,11 @@ impl LocalProcess {
|| response.exited
|| tokio::time::Instant::now() >= deadline
{
let _total_bytes: usize = response
.chunks
.iter()
.map(|chunk| chunk.chunk.0.len())
.sum();
return Ok(response);
}
@@ -325,22 +353,24 @@ impl LocalProcess {
params: WriteParams,
) -> Result<WriteResponse, JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let _process_id = params.process_id.clone();
let _input_bytes = params.chunk.0.len();
let writer_tx = {
let process_map = self.inner.processes.lock().await;
let process = process_map.get(&params.process_id).ok_or_else(|| {
invalid_request(format!("unknown process id {}", params.process_id))
})?;
let Some(process) = process_map.get(&params.process_id) else {
return Ok(WriteResponse {
status: WriteStatus::UnknownProcess,
});
};
let ProcessEntry::Running(process) = process else {
return Err(invalid_request(format!(
"process id {} is starting",
params.process_id
)));
return Ok(WriteResponse {
status: WriteStatus::Starting,
});
};
if !process.tty {
return Err(invalid_request(format!(
"stdin is closed for process {}",
params.process_id
)));
return Ok(WriteResponse {
status: WriteStatus::StdinClosed,
});
}
process.session.writer_sender()
};
@@ -350,7 +380,9 @@ impl LocalProcess {
.await
.map_err(|_| internal_error("failed to write to process stdin".to_string()))?;
Ok(WriteResponse { accepted: true })
Ok(WriteResponse {
status: WriteStatus::Accepted,
})
}
pub(crate) async fn terminate_process(
@@ -358,6 +390,7 @@ impl LocalProcess {
params: TerminateParams,
) -> Result<TerminateResponse, JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let _process_id = params.process_id.clone();
let running = {
let process_map = self.inner.processes.lock().await;
match process_map.get(&params.process_id) {
@@ -377,15 +410,38 @@ impl LocalProcess {
}
#[async_trait]
impl ExecProcess for LocalProcess {
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
self.exec(params).await.map_err(map_handler_error)
impl ExecBackend for LocalProcess {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
let (response, events) = self
.start_process(params)
.await
.map_err(map_handler_error)?;
Ok(StartedExecProcess {
process: Arc::new(LocalExecProcess {
process_id: response.process_id.into(),
backend: self.clone(),
}),
events,
})
}
}
#[async_trait]
impl ExecProcess for LocalExecProcess {
fn process_id(&self) -> &ProcessId {
&self.process_id
}
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
self.exec_read(params).await.map_err(map_handler_error)
async fn write(&self, chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError> {
self.backend.write(&self.process_id, chunk).await
}
async fn terminate(&self) -> Result<(), ExecServerError> {
self.backend.terminate(&self.process_id).await
}
}
impl LocalProcess {
async fn write(
&self,
process_id: &str,
@@ -399,16 +455,13 @@ impl ExecProcess for LocalProcess {
.map_err(map_handler_error)
}
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
async fn terminate(&self, process_id: &str) -> Result<(), ExecServerError> {
self.terminate_process(TerminateParams {
process_id: process_id.to_string(),
})
.await
.map_err(map_handler_error)
}
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
self.inner.events_tx.subscribe()
.map_err(map_handler_error)?;
Ok(())
}
}
@@ -427,7 +480,8 @@ async fn stream_output(
output_notify: Arc<Notify>,
) {
while let Some(chunk) = receiver.recv().await {
let notification = {
let _chunk_len = chunk.len();
let (events_tx, event, notification) = {
let mut processes = inner.processes.lock().await;
let Some(entry) = processes.get_mut(&process_id) else {
break;
@@ -448,21 +502,25 @@ async fn stream_output(
break;
};
process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len());
warn!(
"retained output cap exceeded for process {process_id}; dropping oldest output"
);
}
ExecOutputDeltaNotification {
process_id: process_id.clone(),
let event = ExecSessionEvent::Output {
seq,
stream,
chunk: chunk.into(),
}
chunk: chunk.clone(),
};
(
process.session_events_tx.clone(),
event,
ExecOutputDeltaNotification {
process_id: process_id.clone(),
seq,
stream,
chunk: chunk.into(),
},
)
};
output_notify.notify_waiters();
let _ = inner
.events_tx
.send(ExecServerEvent::OutputDelta(notification.clone()));
let _ = events_tx.send(event).await;
if inner
.notifications
.notify(crate::protocol::EXEC_OUTPUT_DELTA_METHOD, &notification)
@@ -472,6 +530,8 @@ async fn stream_output(
break;
}
}
finish_output_stream(process_id, inner).await;
}
async fn watch_exit(
@@ -481,28 +541,39 @@ async fn watch_exit(
output_notify: Arc<Notify>,
) {
let exit_code = exit_rx.await.unwrap_or(-1);
{
let notification = {
let mut processes = inner.processes.lock().await;
if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) {
let seq = process.next_seq;
process.next_seq += 1;
process.exit_code = Some(exit_code);
Some((
process.session_events_tx.clone(),
ExecSessionEvent::Exited { seq, exit_code },
ExecExitedNotification {
process_id: process_id.clone(),
seq,
exit_code,
},
))
} else {
None
}
};
output_notify.notify_waiters();
if let Some((events_tx, event, notification)) = notification {
let _ = events_tx.send(event).await;
if inner
.notifications
.notify(crate::protocol::EXEC_EXITED_METHOD, &notification)
.await
.is_err()
{
return;
}
}
output_notify.notify_waiters();
let notification = ExecExitedNotification {
process_id: process_id.clone(),
exit_code,
};
let _ = inner
.events_tx
.send(ExecServerEvent::Exited(notification.clone()));
if inner
.notifications
.notify(crate::protocol::EXEC_EXITED_METHOD, &notification)
.await
.is_err()
{
return;
}
maybe_emit_closed(process_id.clone(), Arc::clone(&inner)).await;
tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
let mut processes = inner.processes.lock().await;
@@ -513,3 +584,56 @@ async fn watch_exit(
processes.remove(&process_id);
}
}
async fn finish_output_stream(process_id: String, inner: Arc<Inner>) {
{
let mut processes = inner.processes.lock().await;
let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else {
return;
};
if process.open_streams > 0 {
process.open_streams -= 1;
}
}
maybe_emit_closed(process_id, inner).await;
}
async fn maybe_emit_closed(process_id: String, inner: Arc<Inner>) {
let notification = {
let mut processes = inner.processes.lock().await;
let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else {
return;
};
if process.closed || process.open_streams != 0 || process.exit_code.is_none() {
return;
}
process.closed = true;
let seq = process.next_seq;
process.next_seq += 1;
Some((
process.session_events_tx.clone(),
ExecSessionEvent::Closed { seq },
ExecClosedNotification {
process_id: process_id.clone(),
seq,
},
))
};
let Some((events_tx, event, notification)) = notification else {
return;
};
let _ = events_tx.send(event).await;
if inner
.notifications
.notify(EXEC_CLOSED_METHOD, &notification)
.await
.is_err()
{}
}

View File

@@ -1,35 +1,90 @@
use std::fmt;
use std::ops::Deref;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use crate::ExecServerError;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecOutputStream;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteResponse;
pub(crate) const SESSION_EVENT_CHANNEL_CAPACITY: usize = 2048;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecServerEvent {
OutputDelta(ExecOutputDeltaNotification),
Exited(ExecExitedNotification),
pub enum ExecSessionEvent {
Output {
seq: u64,
stream: ExecOutputStream,
chunk: Vec<u8>,
},
Exited {
seq: u64,
exit_code: i32,
},
Closed {
seq: u64,
},
Failed {
message: String,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProcessId(String);
pub struct StartedExecProcess {
pub process: Arc<dyn ExecProcess>,
pub events: mpsc::Receiver<ExecSessionEvent>,
}
impl ProcessId {
pub fn as_str(&self) -> &str {
&self.0
}
pub fn into_inner(self) -> String {
self.0
}
}
impl Deref for ProcessId {
type Target = str;
fn deref(&self) -> &Self::Target {
self.as_str()
}
}
impl AsRef<str> for ProcessId {
fn as_ref(&self) -> &str {
self.as_str()
}
}
impl fmt::Display for ProcessId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl From<String> for ProcessId {
fn from(value: String) -> Self {
Self(value)
}
}
#[async_trait]
pub trait ExecProcess: Send + Sync {
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError>;
fn process_id(&self) -> &ProcessId;
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError>;
async fn write(&self, chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError>;
async fn write(
&self,
process_id: &str,
chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError>;
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError>;
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent>;
async fn terminate(&self) -> Result<(), ExecServerError>;
}
#[async_trait]
pub trait ExecBackend: Send + Sync {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError>;
}

View File

@@ -13,6 +13,7 @@ pub const EXEC_WRITE_METHOD: &str = "process/write";
pub const EXEC_TERMINATE_METHOD: &str = "process/terminate";
pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output";
pub const EXEC_EXITED_METHOD: &str = "process/exited";
pub const EXEC_CLOSED_METHOD: &str = "process/closed";
pub const FS_READ_FILE_METHOD: &str = "fs/readFile";
pub const FS_WRITE_FILE_METHOD: &str = "fs/writeFile";
pub const FS_CREATE_DIRECTORY_METHOD: &str = "fs/createDirectory";
@@ -99,10 +100,19 @@ pub struct WriteParams {
pub chunk: ByteChunk,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum WriteStatus {
Accepted,
UnknownProcess,
StdinClosed,
Starting,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WriteResponse {
pub accepted: bool,
pub status: WriteStatus,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -129,6 +139,7 @@ pub enum ExecOutputStream {
#[serde(rename_all = "camelCase")]
pub struct ExecOutputDeltaNotification {
pub process_id: String,
pub seq: u64,
pub stream: ExecOutputStream,
pub chunk: ByteChunk,
}
@@ -137,9 +148,17 @@ pub struct ExecOutputDeltaNotification {
#[serde(rename_all = "camelCase")]
pub struct ExecExitedNotification {
pub process_id: String,
pub seq: u64,
pub exit_code: i32,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecClosedNotification {
pub process_id: String,
pub seq: u64,
}
mod base64_bytes {
use super::BASE64_STANDARD;
use base64::Engine as _;

View File

@@ -1,15 +1,14 @@
use async_trait::async_trait;
use tokio::sync::broadcast;
use std::sync::Arc;
use async_trait::async_trait;
use crate::ExecBackend;
use crate::ExecProcess;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::ExecServerEvent;
use crate::ProcessId;
use crate::StartedExecProcess;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteResponse;
#[derive(Clone)]
@@ -17,21 +16,15 @@ pub(crate) struct RemoteProcess {
client: ExecServerClient,
}
struct RemoteExecProcess {
process_id: ProcessId,
backend: RemoteProcess,
}
impl RemoteProcess {
pub(crate) fn new(client: ExecServerClient) -> Self {
Self { client }
}
}
#[async_trait]
impl ExecProcess for RemoteProcess {
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
self.client.exec(params).await
}
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
self.client.read(params).await
}
async fn write(
&self,
@@ -41,11 +34,43 @@ impl ExecProcess for RemoteProcess {
self.client.write(process_id, chunk).await
}
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
self.client.terminate(process_id).await
}
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
self.client.event_receiver()
async fn terminate_process(&self, process_id: &str) -> Result<(), ExecServerError> {
self.client.terminate(process_id).await?;
Ok(())
}
}
#[async_trait]
impl ExecBackend for RemoteProcess {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
let process_id = params.process_id.clone();
let events = self.client.register_session(&process_id).await?;
if let Err(err) = self.client.exec(params).await {
self.client.unregister_session(&process_id).await;
return Err(err);
}
Ok(StartedExecProcess {
process: Arc::new(RemoteExecProcess {
process_id: process_id.into(),
backend: self.clone(),
}),
events,
})
}
}
#[async_trait]
impl ExecProcess for RemoteExecProcess {
fn process_id(&self) -> &ProcessId {
&self.process_id
}
async fn write(&self, chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError> {
self.backend.write(&self.process_id, chunk).await
}
async fn terminate(&self) -> Result<(), ExecServerError> {
self.backend.terminate_process(&self.process_id).await
}
}

View File

@@ -19,7 +19,6 @@ use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::warn;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
@@ -192,12 +191,12 @@ impl RpcClient {
if let Err(err) =
handle_server_message(&pending_for_reader, &event_tx, message).await
{
warn!("JSON-RPC client closing after protocol error: {err}");
let _ = err;
break;
}
}
JsonRpcConnectionEvent::MalformedMessage { reason } => {
warn!("JSON-RPC client closing after malformed message: {reason}");
let _ = reason;
break;
}
JsonRpcConnectionEvent::Disconnected { reason } => {

View File

@@ -6,19 +6,23 @@ use std::sync::Arc;
use anyhow::Result;
use codex_exec_server::Environment;
use codex_exec_server::ExecBackend;
use codex_exec_server::ExecParams;
use codex_exec_server::ExecProcess;
use codex_exec_server::ExecResponse;
use codex_exec_server::ReadParams;
use codex_exec_server::ExecSessionEvent;
use codex_exec_server::StartedExecProcess;
use pretty_assertions::assert_eq;
use test_case::test_case;
use tokio::sync::mpsc;
use tokio::time::Duration;
use tokio::time::timeout;
use common::exec_server::ExecServerHarness;
use common::exec_server::exec_server;
struct ProcessContext {
process: Arc<dyn ExecProcess>,
_server: Option<ExecServerHarness>,
backend: Arc<dyn ExecBackend>,
server: Option<ExecServerHarness>,
}
async fn create_process_context(use_remote: bool) -> Result<ProcessContext> {
@@ -26,22 +30,22 @@ async fn create_process_context(use_remote: bool) -> Result<ProcessContext> {
let server = exec_server().await?;
let environment = Environment::create(Some(server.websocket_url().to_string())).await?;
Ok(ProcessContext {
process: environment.get_executor(),
_server: Some(server),
backend: environment.get_exec_backend(),
server: Some(server),
})
} else {
let environment = Environment::create(None).await?;
Ok(ProcessContext {
process: environment.get_executor(),
_server: None,
backend: environment.get_exec_backend(),
server: None,
})
}
}
async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let response = context
.process
let session = context
.backend
.start(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["true".to_string()],
@@ -51,28 +55,193 @@ async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
arg0: None,
})
.await?;
assert_eq!(
response,
ExecResponse {
process_id: "proc-1".to_string(),
}
);
assert_eq!(session.process.process_id().as_str(), "proc-1");
let mut events = session.events;
let mut next_seq = 0;
let mut exit_code = None;
loop {
let read = context
.process
.read(ReadParams {
process_id: "proc-1".to_string(),
after_seq: Some(next_seq),
max_bytes: None,
wait_ms: Some(100),
})
.await?;
next_seq = read.next_seq;
if read.exited {
assert_eq!(read.exit_code, Some(0));
break;
match timeout(Duration::from_secs(2), events.recv()).await? {
Some(event) => match event {
ExecSessionEvent::Exited {
exit_code: code, ..
} => exit_code = Some(code),
ExecSessionEvent::Closed { .. } => break,
ExecSessionEvent::Output { .. } => {}
ExecSessionEvent::Failed { message } => {
anyhow::bail!("process failed before Closed event: {message}")
}
},
None => anyhow::bail!("event stream closed before Closed event"),
}
}
assert_eq!(exit_code, Some(0));
Ok(())
}
async fn collect_process_output_from_events(
session: Arc<dyn ExecProcess>,
mut events: mpsc::Receiver<ExecSessionEvent>,
) -> Result<(String, i32, bool)> {
let mut output = String::new();
let mut exit_code = None;
loop {
match timeout(Duration::from_secs(2), events.recv()).await? {
Some(event) => match event {
ExecSessionEvent::Output { chunk, .. } => {
output.push_str(&String::from_utf8_lossy(&chunk));
}
ExecSessionEvent::Exited {
exit_code: code, ..
} => exit_code = Some(code),
ExecSessionEvent::Closed { .. } => {
break;
}
ExecSessionEvent::Failed { message } => {
anyhow::bail!("process failed before Closed event: {message}");
}
},
None => {
anyhow::bail!("event stream closed before Closed event");
}
}
}
drop(session);
Ok((output, exit_code.unwrap_or(-1), true))
}
async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-stream".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 0.05; printf 'session output\\n'".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
let StartedExecProcess { process, events } = session;
let (output, exit_code, closed) = collect_process_output_from_events(process, events).await?;
assert_eq!(output, "session output\n");
assert_eq!(exit_code, 0);
assert!(closed);
Ok(())
}
async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-stdin".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone(),
argv: vec![
"/usr/bin/python3".to_string(),
"-c".to_string(),
"import sys; line = sys.stdin.readline(); sys.stdout.write(f'from-stdin:{line}'); sys.stdout.flush()".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: true,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
tokio::time::sleep(Duration::from_millis(200)).await;
session.process.write(b"hello\n".to_vec()).await?;
let StartedExecProcess { process, events } = session;
let (output, exit_code, closed) = collect_process_output_from_events(process, events).await?;
assert!(
output.contains("from-stdin:hello"),
"unexpected output: {output:?}"
);
assert_eq!(exit_code, 0);
assert!(closed);
Ok(())
}
async fn assert_exec_process_preserves_queued_events_before_subscribe(
use_remote: bool,
) -> Result<()> {
let context = create_process_context(use_remote).await?;
let session = context
.backend
.start(ExecParams {
process_id: "proc-queued".to_string(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"printf 'queued output\\n'".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
})
.await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let StartedExecProcess { process, events } = session;
let (output, exit_code, closed) = collect_process_output_from_events(process, events).await?;
assert_eq!(output, "queued output\n");
assert_eq!(exit_code, 0);
assert!(closed);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
let mut context = create_process_context(/*use_remote*/ true).await?;
let session = context
.backend
.start(ExecParams {
process_id: "proc-disconnect".to_string(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 10".to_string(),
],
cwd: std::env::current_dir()?,
env: Default::default(),
tty: false,
arg0: None,
})
.await?;
let server = context
.server
.as_mut()
.expect("remote context should include exec-server harness");
server.shutdown().await?;
let mut events = session.events;
loop {
match timeout(Duration::from_secs(2), events.recv()).await? {
Some(ExecSessionEvent::Failed { message }) => {
assert!(
message.starts_with("exec-server transport disconnected"),
"unexpected failure message: {message}"
);
break;
}
Some(ExecSessionEvent::Output { .. } | ExecSessionEvent::Exited { .. }) => {}
Some(ExecSessionEvent::Closed { .. }) => {
anyhow::bail!("received Closed instead of transport failure")
}
None => anyhow::bail!("event stream closed before Failed event"),
}
}
@@ -85,3 +254,24 @@ async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
async fn exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
assert_exec_process_starts_and_exits(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_process_streams_output(use_remote: bool) -> Result<()> {
assert_exec_process_streams_output(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_process_write_then_read(use_remote: bool) -> Result<()> {
assert_exec_process_write_then_read(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_process_preserves_queued_events_before_subscribe(use_remote: bool) -> Result<()> {
assert_exec_process_preserves_queued_events_before_subscribe(use_remote).await
}

View File

@@ -1,16 +0,0 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "instructions",
crate_name = "codex_instructions",
compile_data = glob(
include = ["**"],
exclude = [
"BUILD.bazel",
"Cargo.toml",
],
allow_empty = True,
) + [
"//codex-rs:node-version.txt",
],
)

View File

@@ -1,20 +0,0 @@
[package]
edition.workspace = true
license.workspace = true
name = "codex-instructions"
version.workspace = true
[lib]
doctest = false
name = "codex_instructions"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
codex-protocol = { workspace = true }
serde = { workspace = true, features = ["derive"] }
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -1,61 +0,0 @@
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
pub const AGENTS_MD_START_MARKER: &str = "# AGENTS.md instructions for ";
pub const AGENTS_MD_END_MARKER: &str = "</INSTRUCTIONS>";
pub const SKILL_OPEN_TAG: &str = "<skill>";
pub const SKILL_CLOSE_TAG: &str = "</skill>";
#[derive(Clone, Copy)]
pub struct ContextualUserFragmentDefinition {
start_marker: &'static str,
end_marker: &'static str,
}
impl ContextualUserFragmentDefinition {
pub const fn new(start_marker: &'static str, end_marker: &'static str) -> Self {
Self {
start_marker,
end_marker,
}
}
pub fn matches_text(&self, text: &str) -> bool {
let trimmed = text.trim_start();
let starts_with_marker = trimmed
.get(..self.start_marker.len())
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(self.start_marker));
let trimmed = trimmed.trim_end();
let ends_with_marker = trimmed
.get(trimmed.len().saturating_sub(self.end_marker.len())..)
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(self.end_marker));
starts_with_marker && ends_with_marker
}
pub const fn start_marker(&self) -> &'static str {
self.start_marker
}
pub const fn end_marker(&self) -> &'static str {
self.end_marker
}
pub fn wrap(&self, body: String) -> String {
format!("{}\n{}\n{}", self.start_marker, body, self.end_marker)
}
pub fn into_message(self, text: String) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text }],
end_turn: None,
phase: None,
}
}
}
pub const AGENTS_MD_FRAGMENT: ContextualUserFragmentDefinition =
ContextualUserFragmentDefinition::new(AGENTS_MD_START_MARKER, AGENTS_MD_END_MARKER);
pub const SKILL_FRAGMENT: ContextualUserFragmentDefinition =
ContextualUserFragmentDefinition::new(SKILL_OPEN_TAG, SKILL_CLOSE_TAG);

View File

@@ -1,15 +0,0 @@
//! User and skill instruction payloads and contextual user fragment markers for Codex prompts.
mod fragment;
mod user_instructions;
pub use fragment::AGENTS_MD_END_MARKER;
pub use fragment::AGENTS_MD_FRAGMENT;
pub use fragment::AGENTS_MD_START_MARKER;
pub use fragment::ContextualUserFragmentDefinition;
pub use fragment::SKILL_CLOSE_TAG;
pub use fragment::SKILL_FRAGMENT;
pub use fragment::SKILL_OPEN_TAG;
pub use user_instructions::SkillInstructions;
pub use user_instructions::USER_INSTRUCTIONS_PREFIX;
pub use user_instructions::UserInstructions;

View File

@@ -1,5 +1,6 @@
pub const TOOL_CALL_COUNT_METRIC: &str = "codex.tool.call";
pub const TOOL_CALL_DURATION_METRIC: &str = "codex.tool.call.duration_ms";
pub const TOOL_CALL_UNIFIED_EXEC_METRIC: &str = "codex.tool.unified_exec";
pub const API_CALL_COUNT_METRIC: &str = "codex.api_request";
pub const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms";
pub const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event";

View File

@@ -26,6 +26,15 @@ python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py <plugin-nam
python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py my-plugin --with-marketplace
```
For a home-local plugin, treat `<home>` as the root and use:
```bash
python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py my-plugin \
--path ~/plugins \
--marketplace-path ~/.agents/plugins/marketplace.json \
--with-marketplace
```
4. Generate/adjust optional companion folders as needed:
```bash
@@ -37,6 +46,7 @@ python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py my-plugin -
## What this skill creates
- If the user has not made the plugin location explicit, ask whether they want a repo-local plugin or a home-local plugin before generating marketplace entries.
- Creates plugin root at `/<parent-plugin-directory>/<plugin-name>/`.
- Always creates `/<parent-plugin-directory>/<plugin-name>/.codex-plugin/plugin.json`.
- Fills the manifest with the full schema shape, placeholder values, and the complete `interface` section.
@@ -58,6 +68,8 @@ python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py my-plugin -
## Marketplace workflow
- `marketplace.json` always lives at `<repo-root>/.agents/plugins/marketplace.json`.
- For a home-local plugin, use the same convention with `<home>` as the root:
`~/.agents/plugins/marketplace.json` plus `./plugins/<plugin-name>`.
- Marketplace root metadata supports top-level `name` plus optional `interface.displayName`.
- Treat plugin order in `plugins[]` as render order in Codex. Append new entries unless a user explicitly asks to reorder the list.
- `displayName` belongs inside the marketplace `interface` object, not individual `plugins[]` entries.

View File

@@ -115,8 +115,10 @@
"source": "local",
"path": "./plugins/linear"
},
"installPolicy": "AVAILABLE",
"authPolicy": "ON_INSTALL",
"policy": {
"installation": "AVAILABLE",
"authentication": "ON_INSTALL"
},
"category": "Productivity"
}
]
@@ -142,7 +144,9 @@
- `source` (`string`): Use `local` for this repo workflow.
- `path` (`string`): Relative plugin path based on the marketplace root.
- Repo plugin: `./plugins/<plugin-name>`
- Local plugin in `~/.agents/plugins/marketplace.json`: `./.codex/plugins/<plugin-name>`
- Local plugin in `~/.agents/plugins/marketplace.json`: `./plugins/<plugin-name>`
- The same relative path convention is used for both repo-rooted and home-rooted marketplaces.
- Example: with `~/.agents/plugins/marketplace.json`, `./plugins/<plugin-name>` resolves to `~/plugins/<plugin-name>`.
- `policy` (`object`): Marketplace policy block. Always include it.
- `installation` (`string`): Availability policy.
- Allowed values: `NOT_AVAILABLE`, `AVAILABLE`, `INSTALLED_BY_DEFAULT`

View File

@@ -191,7 +191,10 @@ def parse_args() -> argparse.Namespace:
parser.add_argument(
"--path",
default=str(DEFAULT_PLUGIN_PARENT),
help="Parent directory for plugin creation (defaults to <cwd>/plugins)",
help=(
"Parent directory for plugin creation (defaults to <cwd>/plugins). "
"When using a home-rooted marketplace, use <home>/plugins."
),
)
parser.add_argument("--with-skills", action="store_true", help="Create skills/ directory")
parser.add_argument("--with-hooks", action="store_true", help="Create hooks/ directory")
@@ -202,12 +205,19 @@ def parse_args() -> argparse.Namespace:
parser.add_argument(
"--with-marketplace",
action="store_true",
help="Create or update <cwd>/.agents/plugins/marketplace.json",
help=(
"Create or update <cwd>/.agents/plugins/marketplace.json. "
"Marketplace entries always point to ./plugins/<plugin-name> relative to the "
"marketplace root."
),
)
parser.add_argument(
"--marketplace-path",
default=str(DEFAULT_MARKETPLACE_PATH),
help="Path to marketplace.json (defaults to <cwd>/.agents/plugins/marketplace.json)",
help=(
"Path to marketplace.json (defaults to <cwd>/.agents/plugins/marketplace.json). "
"For a home-rooted marketplace, use <home>/.agents/plugins/marketplace.json."
),
)
parser.add_argument(
"--install-policy",