mirror of
https://github.com/openai/codex.git
synced 2026-03-25 17:46:50 +03:00
Compare commits
2 Commits
jif/exec-s
...
stack/util
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c76abed20d | ||
|
|
2a63e0e698 |
21
codex-rs/Cargo.lock
generated
21
codex-rs/Cargo.lock
generated
@@ -1883,6 +1883,7 @@ dependencies = [
|
||||
"codex-features",
|
||||
"codex-git-utils",
|
||||
"codex-hooks",
|
||||
"codex-instructions",
|
||||
"codex-login",
|
||||
"codex-network-proxy",
|
||||
"codex-otel",
|
||||
@@ -1904,6 +1905,7 @@ dependencies = [
|
||||
"codex-utils-image",
|
||||
"codex-utils-output-truncation",
|
||||
"codex-utils-path",
|
||||
"codex-utils-plugins",
|
||||
"codex-utils-pty",
|
||||
"codex-utils-readiness",
|
||||
"codex-utils-stream-parser",
|
||||
@@ -2030,7 +2032,6 @@ name = "codex-exec-server"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
"clap",
|
||||
@@ -2175,6 +2176,15 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-instructions"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"codex-protocol",
|
||||
"pretty_assertions",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-keyring-store"
|
||||
version = "0.0.0"
|
||||
@@ -2938,6 +2948,15 @@ dependencies = [
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-utils-plugins"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-utils-pty"
|
||||
version = "0.0.0"
|
||||
|
||||
@@ -25,6 +25,7 @@ members = [
|
||||
"skills",
|
||||
"core",
|
||||
"hooks",
|
||||
"instructions",
|
||||
"secrets",
|
||||
"exec",
|
||||
"exec-server",
|
||||
@@ -68,6 +69,7 @@ members = [
|
||||
"utils/oss",
|
||||
"utils/output-truncation",
|
||||
"utils/path-utils",
|
||||
"utils/plugins",
|
||||
"utils/fuzzy-match",
|
||||
"utils/stream-parser",
|
||||
"codex-client",
|
||||
@@ -122,6 +124,7 @@ codex-features = { path = "features" }
|
||||
codex-file-search = { path = "file-search" }
|
||||
codex-git-utils = { path = "git-utils" }
|
||||
codex-hooks = { path = "hooks" }
|
||||
codex-instructions = { path = "instructions" }
|
||||
codex-keyring-store = { path = "keyring-store" }
|
||||
codex-linux-sandbox = { path = "linux-sandbox" }
|
||||
codex-lmstudio = { path = "lmstudio" }
|
||||
@@ -160,6 +163,7 @@ codex-utils-json-to-toml = { path = "utils/json-to-toml" }
|
||||
codex-utils-oss = { path = "utils/oss" }
|
||||
codex-utils-output-truncation = { path = "utils/output-truncation" }
|
||||
codex-utils-path = { path = "utils/path-utils" }
|
||||
codex-utils-plugins = { path = "utils/plugins" }
|
||||
codex-utils-pty = { path = "utils/pty" }
|
||||
codex-utils-readiness = { path = "utils/readiness" }
|
||||
codex-utils-rustls-provider = { path = "utils/rustls-provider" }
|
||||
|
||||
@@ -42,6 +42,7 @@ codex-skills = { workspace = true }
|
||||
codex-execpolicy = { workspace = true }
|
||||
codex-git-utils = { workspace = true }
|
||||
codex-hooks = { workspace = true }
|
||||
codex-instructions = { workspace = true }
|
||||
codex-network-proxy = { workspace = true }
|
||||
codex-otel = { workspace = true }
|
||||
codex-artifacts = { workspace = true }
|
||||
@@ -57,6 +58,7 @@ codex-utils-image = { workspace = true }
|
||||
codex-utils-home-dir = { workspace = true }
|
||||
codex-utils-output-truncation = { workspace = true }
|
||||
codex-utils-path = { workspace = true }
|
||||
codex-utils-plugins = { workspace = true }
|
||||
codex-utils-pty = { workspace = true }
|
||||
codex-utils-readiness = { workspace = true }
|
||||
codex-secrets = { workspace = true }
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
use codex_instructions::AGENTS_MD_FRAGMENT;
|
||||
use codex_instructions::ContextualUserFragmentDefinition;
|
||||
use codex_instructions::SKILL_FRAGMENT;
|
||||
use codex_protocol::items::HookPromptItem;
|
||||
use codex_protocol::items::parse_hook_prompt_fragment;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::ENVIRONMENT_CONTEXT_CLOSE_TAG;
|
||||
use codex_protocol::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG;
|
||||
|
||||
pub(crate) const AGENTS_MD_START_MARKER: &str = "# AGENTS.md instructions for ";
|
||||
pub(crate) const AGENTS_MD_END_MARKER: &str = "</INSTRUCTIONS>";
|
||||
pub(crate) const SKILL_OPEN_TAG: &str = "<skill>";
|
||||
pub(crate) const SKILL_CLOSE_TAG: &str = "</skill>";
|
||||
pub(crate) const USER_SHELL_COMMAND_OPEN_TAG: &str = "<user_shell_command>";
|
||||
pub(crate) const USER_SHELL_COMMAND_CLOSE_TAG: &str = "</user_shell_command>";
|
||||
pub(crate) const TURN_ABORTED_OPEN_TAG: &str = "<turn_aborted>";
|
||||
@@ -16,64 +14,11 @@ pub(crate) const TURN_ABORTED_CLOSE_TAG: &str = "</turn_aborted>";
|
||||
pub(crate) const SUBAGENT_NOTIFICATION_OPEN_TAG: &str = "<subagent_notification>";
|
||||
pub(crate) const SUBAGENT_NOTIFICATION_CLOSE_TAG: &str = "</subagent_notification>";
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub(crate) struct ContextualUserFragmentDefinition {
|
||||
start_marker: &'static str,
|
||||
end_marker: &'static str,
|
||||
}
|
||||
|
||||
impl ContextualUserFragmentDefinition {
|
||||
pub(crate) const fn new(start_marker: &'static str, end_marker: &'static str) -> Self {
|
||||
Self {
|
||||
start_marker,
|
||||
end_marker,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn matches_text(&self, text: &str) -> bool {
|
||||
let trimmed = text.trim_start();
|
||||
let starts_with_marker = trimmed
|
||||
.get(..self.start_marker.len())
|
||||
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(self.start_marker));
|
||||
let trimmed = trimmed.trim_end();
|
||||
let ends_with_marker = trimmed
|
||||
.get(trimmed.len().saturating_sub(self.end_marker.len())..)
|
||||
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(self.end_marker));
|
||||
starts_with_marker && ends_with_marker
|
||||
}
|
||||
|
||||
pub(crate) const fn start_marker(&self) -> &'static str {
|
||||
self.start_marker
|
||||
}
|
||||
|
||||
pub(crate) const fn end_marker(&self) -> &'static str {
|
||||
self.end_marker
|
||||
}
|
||||
|
||||
pub(crate) fn wrap(&self, body: String) -> String {
|
||||
format!("{}\n{}\n{}", self.start_marker, body, self.end_marker)
|
||||
}
|
||||
|
||||
pub(crate) fn into_message(self, text: String) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText { text }],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) const AGENTS_MD_FRAGMENT: ContextualUserFragmentDefinition =
|
||||
ContextualUserFragmentDefinition::new(AGENTS_MD_START_MARKER, AGENTS_MD_END_MARKER);
|
||||
pub(crate) const ENVIRONMENT_CONTEXT_FRAGMENT: ContextualUserFragmentDefinition =
|
||||
ContextualUserFragmentDefinition::new(
|
||||
ENVIRONMENT_CONTEXT_OPEN_TAG,
|
||||
ENVIRONMENT_CONTEXT_CLOSE_TAG,
|
||||
);
|
||||
pub(crate) const SKILL_FRAGMENT: ContextualUserFragmentDefinition =
|
||||
ContextualUserFragmentDefinition::new(SKILL_OPEN_TAG, SKILL_CLOSE_TAG);
|
||||
pub(crate) const USER_SHELL_COMMAND_FRAGMENT: ContextualUserFragmentDefinition =
|
||||
ContextualUserFragmentDefinition::new(
|
||||
USER_SHELL_COMMAND_OPEN_TAG,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::*;
|
||||
use codex_protocol::items::HookPromptFragment;
|
||||
use codex_protocol::items::build_hook_prompt_message;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
#[test]
|
||||
fn detects_environment_context_fragment() {
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
mod user_instructions;
|
||||
|
||||
pub(crate) use user_instructions::SkillInstructions;
|
||||
pub use user_instructions::USER_INSTRUCTIONS_PREFIX;
|
||||
pub(crate) use user_instructions::UserInstructions;
|
||||
pub(crate) use codex_instructions::SkillInstructions;
|
||||
pub use codex_instructions::USER_INSTRUCTIONS_PREFIX;
|
||||
pub(crate) use codex_instructions::UserInstructions;
|
||||
|
||||
@@ -1,4 +1,2 @@
|
||||
// Default plaintext sigil for tools.
|
||||
pub const TOOL_MENTION_SIGIL: char = '$';
|
||||
// Plugins use `@` in linked plaintext outside TUI.
|
||||
pub const PLUGIN_TEXT_MENTION_SIGIL: char = '@';
|
||||
pub use codex_utils_plugins::mention_syntax::PLUGIN_TEXT_MENTION_SIGIL;
|
||||
pub use codex_utils_plugins::mention_syntax::TOOL_MENTION_SIGIL;
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
pub(crate) use codex_utils_plugins::PLUGIN_MANIFEST_PATH;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::fs;
|
||||
use std::path::Component;
|
||||
use std::path::Path;
|
||||
|
||||
pub(crate) const PLUGIN_MANIFEST_PATH: &str = ".codex-plugin/plugin.json";
|
||||
const MAX_DEFAULT_PROMPT_COUNT: usize = 3;
|
||||
const MAX_DEFAULT_PROMPT_LEN: usize = 128;
|
||||
|
||||
|
||||
@@ -25,8 +25,6 @@ use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use crate::unified_exec::WriteStdinRequest;
|
||||
use async_trait::async_trait;
|
||||
use codex_features::Feature;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_otel::metrics::names::TOOL_CALL_UNIFIED_EXEC_METRIC;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use serde::Deserialize;
|
||||
use std::path::PathBuf;
|
||||
@@ -262,7 +260,6 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
});
|
||||
}
|
||||
|
||||
emit_unified_exec_tty_metric(&turn.session_telemetry, tty);
|
||||
manager
|
||||
.exec_command(
|
||||
ExecCommandRequest {
|
||||
@@ -326,14 +323,6 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_unified_exec_tty_metric(session_telemetry: &SessionTelemetry, tty: bool) {
|
||||
session_telemetry.counter(
|
||||
TOOL_CALL_UNIFIED_EXEC_METRIC,
|
||||
/*inc*/ 1,
|
||||
&[("tty", if tty { "true" } else { "false" })],
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn get_command(
|
||||
args: &ExecCommandArgs,
|
||||
session_shell: Arc<Shell>,
|
||||
|
||||
@@ -45,12 +45,9 @@ use futures::future::BoxFuture;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Request payload used by the unified-exec runtime after approvals and
|
||||
/// sandbox preferences have been resolved for the current turn.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct UnifiedExecRequest {
|
||||
pub command: Vec<String>,
|
||||
pub process_id: i32,
|
||||
pub cwd: PathBuf,
|
||||
pub env: HashMap<String, String>,
|
||||
pub explicit_env_overrides: HashMap<String, String>,
|
||||
@@ -64,8 +61,6 @@ pub struct UnifiedExecRequest {
|
||||
pub exec_approval_requirement: ExecApprovalRequirement,
|
||||
}
|
||||
|
||||
/// Cache key for approval decisions that can be reused across equivalent
|
||||
/// unified-exec launches.
|
||||
#[derive(serde::Serialize, Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct UnifiedExecApprovalKey {
|
||||
pub command: Vec<String>,
|
||||
@@ -75,15 +70,12 @@ pub struct UnifiedExecApprovalKey {
|
||||
pub additional_permissions: Option<PermissionProfile>,
|
||||
}
|
||||
|
||||
/// Runtime adapter that keeps policy and sandbox orchestration on the
|
||||
/// unified-exec side while delegating process startup to the manager.
|
||||
pub struct UnifiedExecRuntime<'a> {
|
||||
manager: &'a UnifiedExecProcessManager,
|
||||
shell_mode: UnifiedExecShellMode,
|
||||
}
|
||||
|
||||
impl<'a> UnifiedExecRuntime<'a> {
|
||||
/// Creates a runtime bound to the shared unified-exec process manager.
|
||||
pub fn new(manager: &'a UnifiedExecProcessManager, shell_mode: UnifiedExecShellMode) -> Self {
|
||||
Self {
|
||||
manager,
|
||||
@@ -240,24 +232,12 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
.await?
|
||||
{
|
||||
Some(prepared) => {
|
||||
if ctx
|
||||
.turn
|
||||
.environment
|
||||
.experimental_exec_server_url()
|
||||
.is_some()
|
||||
{
|
||||
return Err(ToolError::Rejected(
|
||||
"unified_exec zsh-fork is not supported when experimental_exec_server_url is configured".to_string(),
|
||||
));
|
||||
}
|
||||
return self
|
||||
.manager
|
||||
.open_session_with_exec_env(
|
||||
req.process_id,
|
||||
&prepared.exec_request,
|
||||
req.tty,
|
||||
prepared.spawn_lifecycle,
|
||||
ctx.turn.environment.as_ref(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
@@ -288,13 +268,7 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
.env_for(command, options, req.network.as_ref())
|
||||
.map_err(|err| ToolError::Codex(err.into()))?;
|
||||
self.manager
|
||||
.open_session_with_exec_env(
|
||||
req.process_id,
|
||||
&exec_env,
|
||||
req.tty,
|
||||
Box::new(NoopSpawnLifecycle),
|
||||
ctx.turn.environment.as_ref(),
|
||||
)
|
||||
.open_session_with_exec_env(&exec_env, req.tty, Box::new(NoopSpawnLifecycle))
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
UnifiedExecError::SandboxDenied { output, .. } => {
|
||||
|
||||
@@ -20,7 +20,6 @@ use crate::protocol::ExecCommandSource;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::tools::events::ToolEmitter;
|
||||
use crate::tools::events::ToolEventCtx;
|
||||
use crate::tools::events::ToolEventFailure;
|
||||
use crate::tools::events::ToolEventStage;
|
||||
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
||||
|
||||
@@ -122,36 +121,21 @@ pub(crate) fn spawn_exit_watcher(
|
||||
exit_token.cancelled().await;
|
||||
output_drained.notified().await;
|
||||
|
||||
let exit_code = process.exit_code().unwrap_or(-1);
|
||||
let duration = Instant::now().saturating_duration_since(started_at);
|
||||
if let Some(message) = process.failure_message() {
|
||||
emit_failed_exec_end_for_unified_exec(
|
||||
session_ref,
|
||||
turn_ref,
|
||||
call_id,
|
||||
command,
|
||||
cwd,
|
||||
Some(process_id.to_string()),
|
||||
transcript,
|
||||
message,
|
||||
duration,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
let exit_code = process.exit_code().unwrap_or(-1);
|
||||
emit_exec_end_for_unified_exec(
|
||||
session_ref,
|
||||
turn_ref,
|
||||
call_id,
|
||||
command,
|
||||
cwd,
|
||||
Some(process_id.to_string()),
|
||||
transcript,
|
||||
String::new(),
|
||||
exit_code,
|
||||
duration,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
emit_exec_end_for_unified_exec(
|
||||
session_ref,
|
||||
turn_ref,
|
||||
call_id,
|
||||
command,
|
||||
cwd,
|
||||
Some(process_id.to_string()),
|
||||
transcript,
|
||||
String::new(),
|
||||
exit_code,
|
||||
duration,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -229,52 +213,6 @@ pub(crate) async fn emit_exec_end_for_unified_exec(
|
||||
.await;
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn emit_failed_exec_end_for_unified_exec(
|
||||
session_ref: Arc<Session>,
|
||||
turn_ref: Arc<TurnContext>,
|
||||
call_id: String,
|
||||
command: Vec<String>,
|
||||
cwd: PathBuf,
|
||||
process_id: Option<String>,
|
||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||
message: String,
|
||||
duration: Duration,
|
||||
) {
|
||||
let stdout = resolve_aggregated_output(&transcript, String::new()).await;
|
||||
let aggregated_output = if stdout.is_empty() {
|
||||
message.clone()
|
||||
} else {
|
||||
format!("{stdout}\n{message}")
|
||||
};
|
||||
let output = ExecToolCallOutput {
|
||||
exit_code: -1,
|
||||
stdout: StreamOutput::new(stdout),
|
||||
stderr: StreamOutput::new(message),
|
||||
aggregated_output: StreamOutput::new(aggregated_output),
|
||||
duration,
|
||||
timed_out: false,
|
||||
};
|
||||
let event_ctx = ToolEventCtx::new(
|
||||
session_ref.as_ref(),
|
||||
turn_ref.as_ref(),
|
||||
&call_id,
|
||||
/*turn_diff_tracker*/ None,
|
||||
);
|
||||
let emitter = ToolEmitter::unified_exec(
|
||||
&command,
|
||||
cwd,
|
||||
ExecCommandSource::UnifiedExecStartup,
|
||||
process_id,
|
||||
);
|
||||
emitter
|
||||
.emit(
|
||||
event_ctx,
|
||||
ToolEventStage::Failure(ToolEventFailure::Output(output)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn split_valid_utf8_prefix(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
|
||||
split_valid_utf8_prefix_with_max(buffer, UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES)
|
||||
}
|
||||
|
||||
@@ -5,8 +5,6 @@ use thiserror::Error;
|
||||
pub(crate) enum UnifiedExecError {
|
||||
#[error("Failed to create unified exec process: {message}")]
|
||||
CreateProcess { message: String },
|
||||
#[error("Unified exec process failed: {message}")]
|
||||
ProcessFailed { message: String },
|
||||
// The model is trained on `session_id`, but internally we track a `process_id`.
|
||||
#[error("Unknown process id {process_id}")]
|
||||
UnknownProcessId { process_id: i32 },
|
||||
@@ -30,10 +28,6 @@ impl UnifiedExecError {
|
||||
Self::CreateProcess { message }
|
||||
}
|
||||
|
||||
pub(crate) fn process_failed(message: String) -> Self {
|
||||
Self::ProcessFailed { message }
|
||||
}
|
||||
|
||||
pub(crate) fn sandbox_denied(message: String, output: ExecToolCallOutput) -> Self {
|
||||
Self::SandboxDenied { message, output }
|
||||
}
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
//! This keeps policy logic and user interaction centralized while the PTY/process
|
||||
//! concerns remain isolated here. The implementation is split between:
|
||||
//! - `process.rs`: PTY process lifecycle + output buffering.
|
||||
//! - `process_state.rs`: shared exit/failure state for local and remote processes.
|
||||
//! - `process_manager.rs`: orchestration (approvals, sandboxing, reuse) and request handling.
|
||||
|
||||
use std::collections::HashMap;
|
||||
@@ -43,7 +42,6 @@ mod errors;
|
||||
mod head_tail_buffer;
|
||||
mod process;
|
||||
mod process_manager;
|
||||
mod process_state;
|
||||
|
||||
pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
|
||||
process_manager::set_deterministic_process_ids_for_tests(enabled);
|
||||
@@ -169,10 +167,6 @@ pub(crate) fn generate_chunk_id() -> String {
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(unix)]
|
||||
#[path = "process_tests.rs"]
|
||||
mod process_tests;
|
||||
#[cfg(test)]
|
||||
#[cfg(unix)]
|
||||
#[path = "mod_tests.rs"]
|
||||
|
||||
@@ -3,26 +3,27 @@ use super::*;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex::make_session_and_context;
|
||||
use crate::exec::ExecCapturePolicy;
|
||||
use crate::exec::ExecExpiration;
|
||||
use crate::sandboxing::ExecRequest;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::tools::context::ExecCommandToolOutput;
|
||||
use crate::truncate::approx_token_count;
|
||||
use crate::unified_exec::ExecCommandRequest;
|
||||
use crate::unified_exec::WriteStdinRequest;
|
||||
use crate::unified_exec::process::OutputHandles;
|
||||
use codex_sandboxing::SandboxType;
|
||||
use core_test_support::get_remote_test_env;
|
||||
use core_test_support::skip_if_sandbox;
|
||||
use core_test_support::test_codex::test_env as remote_test_env;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
|
||||
async fn test_session_and_turn() -> (Arc<Session>, Arc<TurnContext>) {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let (session, mut turn) = make_session_and_context().await;
|
||||
turn.approval_policy
|
||||
.set(AskForApproval::Never)
|
||||
.expect("test setup should allow updating approval policy");
|
||||
turn.sandbox_policy
|
||||
.set(SandboxPolicy::DangerFullAccess)
|
||||
.expect("test setup should allow updating sandbox policy");
|
||||
turn.file_system_sandbox_policy =
|
||||
codex_protocol::permissions::FileSystemSandboxPolicy::from(turn.sandbox_policy.get());
|
||||
turn.network_sandbox_policy =
|
||||
codex_protocol::permissions::NetworkSandboxPolicy::from(turn.sandbox_policy.get());
|
||||
(Arc::new(session), Arc::new(turn))
|
||||
}
|
||||
|
||||
@@ -31,139 +32,36 @@ async fn exec_command(
|
||||
turn: &Arc<TurnContext>,
|
||||
cmd: &str,
|
||||
yield_time_ms: u64,
|
||||
workdir: Option<PathBuf>,
|
||||
) -> Result<ExecCommandToolOutput, UnifiedExecError> {
|
||||
exec_command_with_tty(session, turn, cmd, yield_time_ms, workdir, true).await
|
||||
}
|
||||
|
||||
fn shell_env() -> HashMap<String, String> {
|
||||
std::env::vars().collect()
|
||||
}
|
||||
|
||||
fn test_exec_request(
|
||||
turn: &TurnContext,
|
||||
command: Vec<String>,
|
||||
cwd: PathBuf,
|
||||
env: HashMap<String, String>,
|
||||
) -> ExecRequest {
|
||||
ExecRequest {
|
||||
command,
|
||||
cwd,
|
||||
env,
|
||||
network: None,
|
||||
expiration: ExecExpiration::DefaultTimeout,
|
||||
capture_policy: ExecCapturePolicy::ShellTool,
|
||||
sandbox: SandboxType::None,
|
||||
windows_sandbox_level: turn.windows_sandbox_level,
|
||||
windows_sandbox_private_desktop: false,
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
sandbox_policy: turn.sandbox_policy.get().clone(),
|
||||
file_system_sandbox_policy: turn.file_system_sandbox_policy.clone(),
|
||||
network_sandbox_policy: turn.network_sandbox_policy,
|
||||
justification: None,
|
||||
arg0: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn exec_command_with_tty(
|
||||
session: &Arc<Session>,
|
||||
turn: &Arc<TurnContext>,
|
||||
cmd: &str,
|
||||
yield_time_ms: u64,
|
||||
workdir: Option<PathBuf>,
|
||||
tty: bool,
|
||||
) -> Result<ExecCommandToolOutput, UnifiedExecError> {
|
||||
let manager = &session.services.unified_exec_manager;
|
||||
let process_id = manager.allocate_process_id().await;
|
||||
let cwd = workdir.unwrap_or_else(|| turn.cwd.clone());
|
||||
let command = vec!["bash".to_string(), "-lc".to_string(), cmd.to_string()];
|
||||
let request = test_exec_request(turn, command.clone(), cwd.clone(), shell_env());
|
||||
|
||||
let process = Arc::new(
|
||||
manager
|
||||
.open_session_with_exec_env(
|
||||
process_id,
|
||||
&request,
|
||||
tty,
|
||||
Box::new(NoopSpawnLifecycle),
|
||||
turn.environment.as_ref(),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
let context =
|
||||
UnifiedExecContext::new(Arc::clone(session), Arc::clone(turn), "call".to_string());
|
||||
let started_at = Instant::now();
|
||||
let process_started_alive = !process.has_exited() && process.exit_code().is_none();
|
||||
if process_started_alive {
|
||||
let entry = ProcessEntry {
|
||||
process: Arc::clone(&process),
|
||||
call_id: context.call_id.clone(),
|
||||
process_id,
|
||||
command: command.clone(),
|
||||
tty,
|
||||
network_approval_id: None,
|
||||
session: Arc::downgrade(session),
|
||||
last_used: started_at,
|
||||
};
|
||||
manager
|
||||
.process_store
|
||||
.lock()
|
||||
.await
|
||||
.processes
|
||||
.insert(process_id, entry);
|
||||
}
|
||||
let process_id = session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.allocate_process_id()
|
||||
.await;
|
||||
|
||||
let OutputHandles {
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
cancellation_token,
|
||||
} = process.output_handles();
|
||||
let deadline = started_at + Duration::from_millis(yield_time_ms);
|
||||
let collected = UnifiedExecProcessManager::collect_output_until_deadline(
|
||||
&output_buffer,
|
||||
&output_notify,
|
||||
&output_closed,
|
||||
&output_closed_notify,
|
||||
&cancellation_token,
|
||||
Some(session.subscribe_out_of_band_elicitation_pause_state()),
|
||||
deadline,
|
||||
)
|
||||
.await;
|
||||
let wall_time = Instant::now().saturating_duration_since(started_at);
|
||||
let text = String::from_utf8_lossy(&collected).to_string();
|
||||
let has_exited = process.has_exited();
|
||||
let exit_code = process.exit_code();
|
||||
let response_process_id = if process_started_alive && !has_exited {
|
||||
Some(process_id)
|
||||
} else {
|
||||
manager.release_process_id(process_id).await;
|
||||
None
|
||||
};
|
||||
|
||||
Ok(ExecCommandToolOutput {
|
||||
event_call_id: context.call_id,
|
||||
chunk_id: generate_chunk_id(),
|
||||
wall_time,
|
||||
raw_output: collected,
|
||||
max_output_tokens: None,
|
||||
process_id: response_process_id,
|
||||
exit_code,
|
||||
original_token_count: Some(approx_token_count(&text)),
|
||||
session_command: Some(command),
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestSpawnLifecycle {
|
||||
inherited_fds: Vec<i32>,
|
||||
}
|
||||
|
||||
impl SpawnLifecycle for TestSpawnLifecycle {
|
||||
fn inherited_fds(&self) -> Vec<i32> {
|
||||
self.inherited_fds.clone()
|
||||
}
|
||||
session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.exec_command(
|
||||
ExecCommandRequest {
|
||||
command: vec!["bash".to_string(), "-lc".to_string(), cmd.to_string()],
|
||||
process_id,
|
||||
yield_time_ms,
|
||||
max_output_tokens: None,
|
||||
workdir: None,
|
||||
network: None,
|
||||
tty: true,
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
additional_permissions: None,
|
||||
additional_permissions_preapproved: false,
|
||||
justification: None,
|
||||
prefix_rule: None,
|
||||
},
|
||||
&context,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn write_stdin(
|
||||
@@ -223,7 +121,7 @@ async fn unified_exec_persists_across_requests() -> anyhow::Result<()> {
|
||||
|
||||
let (session, turn) = test_session_and_turn().await;
|
||||
|
||||
let open_shell = exec_command(&session, &turn, "bash -i", 2_500, None).await?;
|
||||
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
|
||||
let process_id = open_shell.process_id.expect("expected process_id");
|
||||
|
||||
write_stdin(
|
||||
@@ -255,7 +153,7 @@ async fn multi_unified_exec_sessions() -> anyhow::Result<()> {
|
||||
|
||||
let (session, turn) = test_session_and_turn().await;
|
||||
|
||||
let shell_a = exec_command(&session, &turn, "bash -i", 2_500, None).await?;
|
||||
let shell_a = exec_command(&session, &turn, "bash -i", 2_500).await?;
|
||||
let session_a = shell_a.process_id.expect("expected process id");
|
||||
|
||||
write_stdin(
|
||||
@@ -266,14 +164,7 @@ async fn multi_unified_exec_sessions() -> anyhow::Result<()> {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let out_2 = exec_command(
|
||||
&session,
|
||||
&turn,
|
||||
"echo $CODEX_INTERACTIVE_SHELL_VAR",
|
||||
2_500,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let out_2 = exec_command(&session, &turn, "echo $CODEX_INTERACTIVE_SHELL_VAR", 2_500).await?;
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
assert!(
|
||||
out_2.process_id.is_none(),
|
||||
@@ -307,7 +198,7 @@ async fn unified_exec_timeouts() -> anyhow::Result<()> {
|
||||
|
||||
let (session, turn) = test_session_and_turn().await;
|
||||
|
||||
let open_shell = exec_command(&session, &turn, "bash -i", 2_500, None).await?;
|
||||
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
|
||||
let process_id = open_shell.process_id.expect("expected process id");
|
||||
|
||||
write_stdin(
|
||||
@@ -356,14 +247,7 @@ async fn unified_exec_pause_blocks_yield_timeout() -> anyhow::Result<()> {
|
||||
});
|
||||
|
||||
let started = tokio::time::Instant::now();
|
||||
let response = exec_command(
|
||||
&session,
|
||||
&turn,
|
||||
"sleep 1 && echo unified-exec-done",
|
||||
250,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let response = exec_command(&session, &turn, "sleep 1 && echo unified-exec-done", 250).await?;
|
||||
|
||||
assert!(
|
||||
started.elapsed() >= Duration::from_secs(2),
|
||||
@@ -386,7 +270,7 @@ async fn unified_exec_pause_blocks_yield_timeout() -> anyhow::Result<()> {
|
||||
async fn requests_with_large_timeout_are_capped() -> anyhow::Result<()> {
|
||||
let (session, turn) = test_session_and_turn().await;
|
||||
|
||||
let result = exec_command(&session, &turn, "echo codex", 120_000, None).await?;
|
||||
let result = exec_command(&session, &turn, "echo codex", 120_000).await?;
|
||||
|
||||
assert!(result.process_id.is_some());
|
||||
assert!(result.truncated_output().contains("codex"));
|
||||
@@ -398,7 +282,7 @@ async fn requests_with_large_timeout_are_capped() -> anyhow::Result<()> {
|
||||
#[ignore] // Ignored while we have a better way to test this.
|
||||
async fn completed_commands_do_not_persist_sessions() -> anyhow::Result<()> {
|
||||
let (session, turn) = test_session_and_turn().await;
|
||||
let result = exec_command(&session, &turn, "echo codex", 2_500, None).await?;
|
||||
let result = exec_command(&session, &turn, "echo codex", 2_500).await?;
|
||||
|
||||
assert!(
|
||||
result.process_id.is_some(),
|
||||
@@ -426,7 +310,7 @@ async fn reusing_completed_process_returns_unknown_process() -> anyhow::Result<(
|
||||
|
||||
let (session, turn) = test_session_and_turn().await;
|
||||
|
||||
let open_shell = exec_command(&session, &turn, "bash -i", 2_500, None).await?;
|
||||
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
|
||||
let process_id = open_shell.process_id.expect("expected process id");
|
||||
|
||||
write_stdin(&session, process_id, "exit\n", 2_500).await?;
|
||||
@@ -457,120 +341,3 @@ async fn reusing_completed_process_returns_unknown_process() -> anyhow::Result<(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn completed_pipe_commands_preserve_exit_code() -> anyhow::Result<()> {
|
||||
let (_, turn) = make_session_and_context().await;
|
||||
let request = test_exec_request(
|
||||
&turn,
|
||||
vec!["bash".to_string(), "-lc".to_string(), "exit 17".to_string()],
|
||||
PathBuf::from("/tmp"),
|
||||
shell_env(),
|
||||
);
|
||||
|
||||
let environment = codex_exec_server::Environment::default();
|
||||
let process = UnifiedExecProcessManager::default()
|
||||
.open_session_with_exec_env(
|
||||
1234,
|
||||
&request,
|
||||
false,
|
||||
Box::new(NoopSpawnLifecycle),
|
||||
&environment,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert!(process.has_exited());
|
||||
assert_eq!(process.exit_code(), Some(17));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_uses_remote_exec_server_when_configured() -> anyhow::Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
let Some(_remote_env) = get_remote_test_env() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let remote_test_env = remote_test_env().await?;
|
||||
let (_, turn) = make_session_and_context().await;
|
||||
let request = test_exec_request(
|
||||
&turn,
|
||||
vec!["bash".to_string(), "-i".to_string()],
|
||||
PathBuf::from("/tmp"),
|
||||
shell_env(),
|
||||
);
|
||||
|
||||
let manager = UnifiedExecProcessManager::default();
|
||||
let process = manager
|
||||
.open_session_with_exec_env(
|
||||
1234,
|
||||
&request,
|
||||
true,
|
||||
Box::new(NoopSpawnLifecycle),
|
||||
remote_test_env.environment(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
process.write(b"printf 'remote-unified-exec\\n'\n").await?;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
let crate::unified_exec::process::OutputHandles {
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
cancellation_token,
|
||||
} = process.output_handles();
|
||||
let collected = UnifiedExecProcessManager::collect_output_until_deadline(
|
||||
&output_buffer,
|
||||
&output_notify,
|
||||
&output_closed,
|
||||
&output_closed_notify,
|
||||
&cancellation_token,
|
||||
None,
|
||||
Instant::now() + Duration::from_millis(2_500),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(String::from_utf8_lossy(&collected).contains("remote-unified-exec"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_exec_server_rejects_inherited_fd_launches() -> anyhow::Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
let Some(_remote_env) = get_remote_test_env() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let remote_test_env = remote_test_env().await?;
|
||||
let (_, mut turn) = make_session_and_context().await;
|
||||
turn.environment = Arc::new(remote_test_env.environment().clone());
|
||||
|
||||
let request = test_exec_request(
|
||||
&turn,
|
||||
vec!["bash".to_string(), "-lc".to_string(), "echo ok".to_string()],
|
||||
PathBuf::from("/tmp"),
|
||||
shell_env(),
|
||||
);
|
||||
|
||||
let manager = UnifiedExecProcessManager::default();
|
||||
let err = manager
|
||||
.open_session_with_exec_env(
|
||||
1234,
|
||||
&request,
|
||||
true,
|
||||
Box::new(TestSpawnLifecycle {
|
||||
inherited_fds: vec![42],
|
||||
}),
|
||||
turn.environment.as_ref(),
|
||||
)
|
||||
.await
|
||||
.expect_err("expected inherited fd rejection");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"Failed to create unified exec process: remote exec-server does not support inherited file descriptors"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -6,8 +6,8 @@ use std::sync::atomic::Ordering;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -15,12 +15,8 @@ use tokio_util::sync::CancellationToken;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::exec::is_likely_sandbox_denied;
|
||||
use codex_exec_server::ExecProcess;
|
||||
use codex_exec_server::ExecSessionEvent;
|
||||
use codex_exec_server::StartedExecProcess;
|
||||
use codex_exec_server::WriteStatus;
|
||||
use codex_protocol::protocol::TruncationPolicy;
|
||||
use codex_sandboxing::SandboxType;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::formatted_truncate_text;
|
||||
use codex_utils_pty::ExecCommandSession;
|
||||
use codex_utils_pty::SpawnedPty;
|
||||
@@ -28,7 +24,6 @@ use codex_utils_pty::SpawnedPty;
|
||||
use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS;
|
||||
use super::UnifiedExecError;
|
||||
use super::head_tail_buffer::HeadTailBuffer;
|
||||
use super::process_state::ProcessState;
|
||||
|
||||
pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync {
|
||||
/// Returns file descriptors that must stay open across the child `exec()`.
|
||||
@@ -46,13 +41,11 @@ pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync {
|
||||
pub(crate) type SpawnLifecycleHandle = Box<dyn SpawnLifecycle>;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
/// Spawn lifecycle that performs no extra setup around process launch.
|
||||
pub(crate) struct NoopSpawnLifecycle;
|
||||
|
||||
impl SpawnLifecycle for NoopSpawnLifecycle {}
|
||||
|
||||
pub(crate) type OutputBuffer = Arc<Mutex<HeadTailBuffer>>;
|
||||
/// Shared output state exposed to polling and streaming consumers.
|
||||
pub(crate) struct OutputHandles {
|
||||
pub(crate) output_buffer: OutputBuffer,
|
||||
pub(crate) output_notify: Arc<Notify>,
|
||||
@@ -61,44 +54,27 @@ pub(crate) struct OutputHandles {
|
||||
pub(crate) cancellation_token: CancellationToken,
|
||||
}
|
||||
|
||||
/// Transport-specific process handle used by unified exec.
|
||||
enum ProcessHandle {
|
||||
Local(Box<ExecCommandSession>),
|
||||
Remote(Arc<dyn ExecProcess>),
|
||||
}
|
||||
|
||||
/// Unified wrapper over local PTY sessions and exec-server-backed processes.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct UnifiedExecProcess {
|
||||
process_handle: ProcessHandle,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
process_handle: ExecCommandSession,
|
||||
output_rx: broadcast::Receiver<Vec<u8>>,
|
||||
output_buffer: OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
output_closed: Arc<AtomicBool>,
|
||||
output_closed_notify: Arc<Notify>,
|
||||
cancellation_token: CancellationToken,
|
||||
output_drained: Arc<Notify>,
|
||||
state_tx: watch::Sender<ProcessState>,
|
||||
state_rx: watch::Receiver<ProcessState>,
|
||||
output_task: Option<JoinHandle<()>>,
|
||||
output_task: JoinHandle<()>,
|
||||
sandbox_type: SandboxType,
|
||||
_spawn_lifecycle: Option<SpawnLifecycleHandle>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for UnifiedExecProcess {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("UnifiedExecProcess")
|
||||
.field("has_exited", &self.has_exited())
|
||||
.field("exit_code", &self.exit_code())
|
||||
.field("sandbox_type", &self.sandbox_type)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
_spawn_lifecycle: SpawnLifecycleHandle,
|
||||
}
|
||||
|
||||
impl UnifiedExecProcess {
|
||||
fn new(
|
||||
process_handle: ProcessHandle,
|
||||
pub(super) fn new(
|
||||
process_handle: ExecCommandSession,
|
||||
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
sandbox_type: SandboxType,
|
||||
spawn_lifecycle: Option<SpawnLifecycleHandle>,
|
||||
spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Self {
|
||||
let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default()));
|
||||
let output_notify = Arc::new(Notify::new());
|
||||
@@ -106,50 +82,48 @@ impl UnifiedExecProcess {
|
||||
let output_closed_notify = Arc::new(Notify::new());
|
||||
let cancellation_token = CancellationToken::new();
|
||||
let output_drained = Arc::new(Notify::new());
|
||||
let (output_tx, _) = broadcast::channel(64);
|
||||
let (state_tx, state_rx) = watch::channel(ProcessState::default());
|
||||
let mut receiver = initial_output_rx;
|
||||
let output_rx = receiver.resubscribe();
|
||||
let buffer_clone = Arc::clone(&output_buffer);
|
||||
let notify_clone = Arc::clone(&output_notify);
|
||||
let output_closed_clone = Arc::clone(&output_closed);
|
||||
let output_closed_notify_clone = Arc::clone(&output_closed_notify);
|
||||
let output_task = tokio::spawn(async move {
|
||||
loop {
|
||||
match receiver.recv().await {
|
||||
Ok(chunk) => {
|
||||
let mut guard = buffer_clone.lock().await;
|
||||
guard.push_chunk(chunk);
|
||||
drop(guard);
|
||||
notify_clone.notify_waiters();
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
output_closed_clone.store(true, Ordering::Release);
|
||||
output_closed_notify_clone.notify_waiters();
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
process_handle,
|
||||
output_tx,
|
||||
output_rx,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
cancellation_token,
|
||||
output_drained,
|
||||
state_tx,
|
||||
state_rx,
|
||||
output_task: None,
|
||||
output_task,
|
||||
sandbox_type,
|
||||
_spawn_lifecycle: spawn_lifecycle,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn write(&self, data: &[u8]) -> Result<(), UnifiedExecError> {
|
||||
match &self.process_handle {
|
||||
ProcessHandle::Local(process_handle) => process_handle
|
||||
.writer_sender()
|
||||
.send(data.to_vec())
|
||||
.await
|
||||
.map(|()| ())
|
||||
.map_err(|_| UnifiedExecError::WriteToStdin),
|
||||
ProcessHandle::Remote(process_handle) => {
|
||||
match process_handle.write(data.to_vec()).await {
|
||||
Ok(response) => match response.status {
|
||||
WriteStatus::Accepted => Ok(()),
|
||||
WriteStatus::UnknownProcess | WriteStatus::StdinClosed => {
|
||||
let state = self.state_rx.borrow().clone();
|
||||
let _ = self.state_tx.send_replace(state.exited(state.exit_code));
|
||||
self.cancellation_token.cancel();
|
||||
Err(UnifiedExecError::WriteToStdin)
|
||||
}
|
||||
WriteStatus::Starting => Err(UnifiedExecError::WriteToStdin),
|
||||
},
|
||||
Err(err) => Err(UnifiedExecError::process_failed(err.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
pub(super) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
|
||||
self.process_handle.writer_sender()
|
||||
}
|
||||
|
||||
pub(super) fn output_handles(&self) -> OutputHandles {
|
||||
@@ -163,7 +137,7 @@ impl UnifiedExecProcess {
|
||||
}
|
||||
|
||||
pub(super) fn output_receiver(&self) -> tokio::sync::broadcast::Receiver<Vec<u8>> {
|
||||
self.output_tx.subscribe()
|
||||
self.output_rx.resubscribe()
|
||||
}
|
||||
|
||||
pub(super) fn cancellation_token(&self) -> CancellationToken {
|
||||
@@ -175,39 +149,19 @@ impl UnifiedExecProcess {
|
||||
}
|
||||
|
||||
pub(super) fn has_exited(&self) -> bool {
|
||||
let state = self.state_rx.borrow().clone();
|
||||
match &self.process_handle {
|
||||
ProcessHandle::Local(process_handle) => state.has_exited || process_handle.has_exited(),
|
||||
ProcessHandle::Remote(_) => state.has_exited,
|
||||
}
|
||||
self.process_handle.has_exited()
|
||||
}
|
||||
|
||||
pub(super) fn exit_code(&self) -> Option<i32> {
|
||||
let state = self.state_rx.borrow().clone();
|
||||
match &self.process_handle {
|
||||
ProcessHandle::Local(process_handle) => {
|
||||
state.exit_code.or_else(|| process_handle.exit_code())
|
||||
}
|
||||
ProcessHandle::Remote(_) => state.exit_code,
|
||||
}
|
||||
self.process_handle.exit_code()
|
||||
}
|
||||
|
||||
pub(super) fn terminate(&self) {
|
||||
self.output_closed.store(true, Ordering::Release);
|
||||
self.output_closed_notify.notify_waiters();
|
||||
match &self.process_handle {
|
||||
ProcessHandle::Local(process_handle) => process_handle.terminate(),
|
||||
ProcessHandle::Remote(process_handle) => {
|
||||
let process_handle = Arc::clone(process_handle);
|
||||
tokio::spawn(async move {
|
||||
let _ = process_handle.terminate().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
self.process_handle.terminate();
|
||||
self.cancellation_token.cancel();
|
||||
if let Some(output_task) = &self.output_task {
|
||||
output_task.abort();
|
||||
}
|
||||
self.output_task.abort();
|
||||
}
|
||||
|
||||
async fn snapshot_output(&self) -> Vec<Vec<u8>> {
|
||||
@@ -219,10 +173,6 @@ impl UnifiedExecProcess {
|
||||
self.sandbox_type
|
||||
}
|
||||
|
||||
pub(super) fn failure_message(&self) -> Option<String> {
|
||||
self.state_rx.borrow().failure_message.clone()
|
||||
}
|
||||
|
||||
pub(super) async fn check_for_sandbox_denial(&self) -> Result<(), UnifiedExecError> {
|
||||
let _ =
|
||||
tokio::time::timeout(Duration::from_millis(20), self.output_notify.notified()).await;
|
||||
@@ -282,49 +232,29 @@ impl UnifiedExecProcess {
|
||||
mut exit_rx,
|
||||
} = spawned;
|
||||
let output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx);
|
||||
let mut managed = Self::new(
|
||||
ProcessHandle::Local(Box::new(process_handle)),
|
||||
sandbox_type,
|
||||
Some(spawn_lifecycle),
|
||||
);
|
||||
managed.output_task = Some(Self::spawn_local_output_task(
|
||||
output_rx,
|
||||
Arc::clone(&managed.output_buffer),
|
||||
Arc::clone(&managed.output_notify),
|
||||
Arc::clone(&managed.output_closed),
|
||||
Arc::clone(&managed.output_closed_notify),
|
||||
managed.output_tx.clone(),
|
||||
));
|
||||
let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle);
|
||||
|
||||
match exit_rx.try_recv() {
|
||||
Ok(exit_code) => {
|
||||
managed.signal_exit(Some(exit_code));
|
||||
managed.check_for_sandbox_denial().await?;
|
||||
return Ok(managed);
|
||||
}
|
||||
Err(TryRecvError::Closed) => {
|
||||
managed.signal_exit(/*exit_code*/ None);
|
||||
managed.check_for_sandbox_denial().await?;
|
||||
return Ok(managed);
|
||||
}
|
||||
Err(TryRecvError::Empty) => {}
|
||||
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));
|
||||
|
||||
if exit_ready {
|
||||
managed.signal_exit();
|
||||
managed.check_for_sandbox_denial().await?;
|
||||
return Ok(managed);
|
||||
}
|
||||
|
||||
if let Ok(exit_result) =
|
||||
tokio::time::timeout(Duration::from_millis(150), &mut exit_rx).await
|
||||
if tokio::time::timeout(Duration::from_millis(150), &mut exit_rx)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
managed.signal_exit(exit_result.ok());
|
||||
managed.signal_exit();
|
||||
managed.check_for_sandbox_denial().await?;
|
||||
return Ok(managed);
|
||||
}
|
||||
|
||||
tokio::spawn({
|
||||
let state_tx = managed.state_tx.clone();
|
||||
let cancellation_token = managed.cancellation_token.clone();
|
||||
async move {
|
||||
let exit_code = exit_rx.await.ok();
|
||||
let state = state_tx.borrow().clone();
|
||||
let _ = state_tx.send_replace(state.exited(exit_code));
|
||||
let _ = exit_rx.await;
|
||||
cancellation_token.cancel();
|
||||
}
|
||||
});
|
||||
@@ -332,115 +262,11 @@ impl UnifiedExecProcess {
|
||||
Ok(managed)
|
||||
}
|
||||
|
||||
fn spawn_remote_output_task(
|
||||
started: StartedExecProcess,
|
||||
output_buffer: OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
output_closed: Arc<AtomicBool>,
|
||||
output_closed_notify: Arc<Notify>,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
state_tx: watch::Sender<ProcessState>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> JoinHandle<()> {
|
||||
let StartedExecProcess { mut events, .. } = started;
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match events.recv().await {
|
||||
Some(ExecSessionEvent::Output { chunk, .. }) => {
|
||||
let mut guard = output_buffer.lock().await;
|
||||
guard.push_chunk(chunk.clone());
|
||||
drop(guard);
|
||||
let _ = output_tx.send(chunk);
|
||||
output_notify.notify_waiters();
|
||||
}
|
||||
Some(ExecSessionEvent::Exited { exit_code, .. }) => {
|
||||
let state = state_tx.borrow().clone();
|
||||
let _ = state_tx.send_replace(state.exited(Some(exit_code)));
|
||||
cancellation_token.cancel();
|
||||
}
|
||||
Some(ExecSessionEvent::Closed { .. }) => {
|
||||
let state = state_tx.borrow().clone();
|
||||
let _ = state_tx.send_replace(state.exited(state.exit_code));
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
cancellation_token.cancel();
|
||||
break;
|
||||
}
|
||||
Some(ExecSessionEvent::Failed { message }) => {
|
||||
let state = state_tx.borrow().clone();
|
||||
let _ = state_tx.send_replace(state.failed(message));
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
cancellation_token.cancel();
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
let state = state_tx.borrow().clone();
|
||||
let _ = state_tx.send_replace(state.exited(state.exit_code));
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
cancellation_token.cancel();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_local_output_task(
|
||||
mut receiver: tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
buffer: OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
output_closed: Arc<AtomicBool>,
|
||||
output_closed_notify: Arc<Notify>,
|
||||
output_tx: broadcast::Sender<Vec<u8>>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match receiver.recv().await {
|
||||
Ok(chunk) => {
|
||||
let mut guard = buffer.lock().await;
|
||||
guard.push_chunk(chunk.clone());
|
||||
drop(guard);
|
||||
let _ = output_tx.send(chunk);
|
||||
output_notify.notify_waiters();
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn signal_exit(&self, exit_code: Option<i32>) {
|
||||
let state = self.state_rx.borrow().clone();
|
||||
let _ = self.state_tx.send_replace(state.exited(exit_code));
|
||||
fn signal_exit(&self) {
|
||||
self.cancellation_token.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
impl From<(StartedExecProcess, SandboxType)> for UnifiedExecProcess {
|
||||
fn from((started, sandbox_type): (StartedExecProcess, SandboxType)) -> Self {
|
||||
let process_handle = ProcessHandle::Remote(Arc::clone(&started.process));
|
||||
let mut managed = Self::new(process_handle, sandbox_type, /*spawn_lifecycle*/ None);
|
||||
managed.output_task = Some(Self::spawn_remote_output_task(
|
||||
started,
|
||||
Arc::clone(&managed.output_buffer),
|
||||
Arc::clone(&managed.output_notify),
|
||||
Arc::clone(&managed.output_closed),
|
||||
Arc::clone(&managed.output_closed_notify),
|
||||
managed.output_tx.clone(),
|
||||
managed.state_tx.clone(),
|
||||
managed.cancellation_token.clone(),
|
||||
));
|
||||
managed
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for UnifiedExecProcess {
|
||||
fn drop(&mut self) {
|
||||
self.terminate();
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::watch;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
@@ -39,7 +40,6 @@ use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use crate::unified_exec::WARNING_UNIFIED_EXEC_PROCESSES;
|
||||
use crate::unified_exec::WriteStdinRequest;
|
||||
use crate::unified_exec::async_watcher::emit_exec_end_for_unified_exec;
|
||||
use crate::unified_exec::async_watcher::emit_failed_exec_end_for_unified_exec;
|
||||
use crate::unified_exec::async_watcher::spawn_exit_watcher;
|
||||
use crate::unified_exec::async_watcher::start_streaming_output;
|
||||
use crate::unified_exec::clamp_yield_time;
|
||||
@@ -89,9 +89,8 @@ fn apply_unified_exec_env(mut env: HashMap<String, String>) -> HashMap<String, S
|
||||
env
|
||||
}
|
||||
|
||||
/// Borrowed process state prepared for a `write_stdin` or poll operation.
|
||||
struct PreparedProcessHandles {
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
output_buffer: OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
output_closed: Arc<AtomicBool>,
|
||||
@@ -103,10 +102,6 @@ struct PreparedProcessHandles {
|
||||
tty: bool,
|
||||
}
|
||||
|
||||
fn exec_server_process_id(process_id: i32) -> String {
|
||||
process_id.to_string()
|
||||
}
|
||||
|
||||
impl UnifiedExecProcessManager {
|
||||
pub(crate) async fn allocate_process_id(&self) -> i32 {
|
||||
loop {
|
||||
@@ -248,29 +243,6 @@ impl UnifiedExecProcessManager {
|
||||
|
||||
let text = String::from_utf8_lossy(&collected).to_string();
|
||||
let chunk_id = generate_chunk_id();
|
||||
if let Some(message) = process.failure_message() {
|
||||
if !process_started_alive {
|
||||
emit_failed_exec_end_for_unified_exec(
|
||||
Arc::clone(&context.session),
|
||||
Arc::clone(&context.turn),
|
||||
context.call_id.clone(),
|
||||
request.command.clone(),
|
||||
cwd.clone(),
|
||||
Some(request.process_id.to_string()),
|
||||
Arc::clone(&transcript),
|
||||
message.clone(),
|
||||
wall_time,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
self.release_process_id(request.process_id).await;
|
||||
finish_deferred_network_approval(
|
||||
context.session.as_ref(),
|
||||
deferred_network_approval.take(),
|
||||
)
|
||||
.await;
|
||||
return Err(UnifiedExecError::process_failed(message));
|
||||
}
|
||||
let process_id = request.process_id;
|
||||
let (response_process_id, exit_code) = if process_started_alive {
|
||||
match self.refresh_process_state(process_id).await {
|
||||
@@ -340,7 +312,7 @@ impl UnifiedExecProcessManager {
|
||||
let process_id = request.process_id;
|
||||
|
||||
let PreparedProcessHandles {
|
||||
process,
|
||||
writer_tx,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
@@ -352,31 +324,15 @@ impl UnifiedExecProcessManager {
|
||||
tty,
|
||||
..
|
||||
} = self.prepare_process_handles(process_id).await?;
|
||||
let mut status_after_write = None;
|
||||
|
||||
if !request.input.is_empty() {
|
||||
if !tty {
|
||||
return Err(UnifiedExecError::StdinClosed);
|
||||
}
|
||||
match process.write(request.input.as_bytes()).await {
|
||||
Ok(()) => {
|
||||
// Give the remote process a brief window to react so that we are
|
||||
// more likely to capture its output in the poll below.
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
Err(err) => {
|
||||
let status = self.refresh_process_state(process_id).await;
|
||||
if matches!(status, ProcessStatus::Exited { .. }) {
|
||||
status_after_write = Some(status);
|
||||
} else if matches!(err, UnifiedExecError::ProcessFailed { .. }) {
|
||||
process.terminate();
|
||||
self.release_process_id(process_id).await;
|
||||
return Err(err);
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
|
||||
// Give the remote process a brief window to react so that we are
|
||||
// more likely to capture its output in the poll below.
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
let yield_time_ms = {
|
||||
@@ -406,20 +362,12 @@ impl UnifiedExecProcessManager {
|
||||
let text = String::from_utf8_lossy(&collected).to_string();
|
||||
let original_token_count = approx_token_count(&text);
|
||||
let chunk_id = generate_chunk_id();
|
||||
if let Some(message) = process.failure_message() {
|
||||
self.release_process_id(process_id).await;
|
||||
return Err(UnifiedExecError::process_failed(message));
|
||||
}
|
||||
|
||||
// After polling, refresh_process_state tells us whether the PTY is
|
||||
// still alive or has exited and been removed from the store; we thread
|
||||
// that through so the handler can tag TerminalInteraction with an
|
||||
// appropriate process_id and exit_code.
|
||||
let status = if let Some(status) = status_after_write {
|
||||
status
|
||||
} else {
|
||||
self.refresh_process_state(process_id).await
|
||||
};
|
||||
let status = self.refresh_process_state(process_id).await;
|
||||
let (process_id, exit_code, event_call_id) = match status {
|
||||
ProcessStatus::Alive {
|
||||
exit_code,
|
||||
@@ -507,7 +455,7 @@ impl UnifiedExecProcessManager {
|
||||
.map(|session| session.subscribe_out_of_band_elicitation_pause_state());
|
||||
|
||||
Ok(PreparedProcessHandles {
|
||||
process: Arc::clone(&entry.process),
|
||||
writer_tx: entry.process.writer_sender(),
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
@@ -520,6 +468,16 @@ impl UnifiedExecProcessManager {
|
||||
})
|
||||
}
|
||||
|
||||
async fn send_input(
|
||||
writer_tx: &mpsc::Sender<Vec<u8>>,
|
||||
data: &[u8],
|
||||
) -> Result<(), UnifiedExecError> {
|
||||
writer_tx
|
||||
.send(data.to_vec())
|
||||
.await
|
||||
.map_err(|_| UnifiedExecError::WriteToStdin)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn store_process(
|
||||
&self,
|
||||
@@ -581,11 +539,9 @@ impl UnifiedExecProcessManager {
|
||||
|
||||
pub(crate) async fn open_session_with_exec_env(
|
||||
&self,
|
||||
process_id: i32,
|
||||
env: &ExecRequest,
|
||||
tty: bool,
|
||||
mut spawn_lifecycle: SpawnLifecycleHandle,
|
||||
environment: &codex_exec_server::Environment,
|
||||
) -> Result<UnifiedExecProcess, UnifiedExecError> {
|
||||
let (program, args) = env
|
||||
.command
|
||||
@@ -593,28 +549,6 @@ impl UnifiedExecProcessManager {
|
||||
.ok_or(UnifiedExecError::MissingCommandLine)?;
|
||||
let inherited_fds = spawn_lifecycle.inherited_fds();
|
||||
|
||||
if environment.experimental_exec_server_url().is_some() {
|
||||
if !inherited_fds.is_empty() {
|
||||
return Err(UnifiedExecError::create_process(
|
||||
"remote exec-server does not support inherited file descriptors".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let started = environment
|
||||
.get_exec_backend()
|
||||
.start(codex_exec_server::ExecParams {
|
||||
process_id: exec_server_process_id(process_id),
|
||||
argv: env.command.clone(),
|
||||
cwd: env.cwd.clone(),
|
||||
env: env.env.clone(),
|
||||
tty,
|
||||
arg0: env.arg0.clone(),
|
||||
})
|
||||
.await
|
||||
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
return Ok(UnifiedExecProcess::from((started, env.sandbox)));
|
||||
}
|
||||
|
||||
let spawn_result = if tty {
|
||||
codex_utils_pty::pty::spawn_process_with_inherited_fds(
|
||||
program,
|
||||
@@ -677,7 +611,6 @@ impl UnifiedExecProcessManager {
|
||||
.await;
|
||||
let req = UnifiedExecToolRequest {
|
||||
command: request.command.clone(),
|
||||
process_id: request.process_id,
|
||||
cwd,
|
||||
env,
|
||||
explicit_env_overrides: context.turn.shell_environment_policy.r#set.clone(),
|
||||
|
||||
@@ -34,11 +34,6 @@ fn unified_exec_env_overrides_existing_values() {
|
||||
assert_eq!(env.get("PATH"), Some(&"/usr/bin".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exec_server_process_id_matches_unified_exec_process_id() {
|
||||
assert_eq!(exec_server_process_id(4321), "4321");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pruning_prefers_exited_processes_outside_recently_used() {
|
||||
let now = Instant::now();
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq)]
|
||||
pub(crate) struct ProcessState {
|
||||
pub(crate) has_exited: bool,
|
||||
pub(crate) exit_code: Option<i32>,
|
||||
pub(crate) failure_message: Option<String>,
|
||||
}
|
||||
|
||||
impl ProcessState {
|
||||
pub(crate) fn exited(&self, exit_code: Option<i32>) -> Self {
|
||||
Self {
|
||||
has_exited: true,
|
||||
exit_code,
|
||||
failure_message: self.failure_message.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn failed(&self, message: String) -> Self {
|
||||
Self {
|
||||
has_exited: true,
|
||||
exit_code: self.exit_code,
|
||||
failure_message: Some(message),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,73 +0,0 @@
|
||||
use super::process::UnifiedExecProcess;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
use async_trait::async_trait;
|
||||
use codex_exec_server::ExecProcess;
|
||||
use codex_exec_server::ExecServerError;
|
||||
use codex_exec_server::ProcessId;
|
||||
use codex_exec_server::StartedExecProcess;
|
||||
use codex_exec_server::WriteResponse;
|
||||
use codex_exec_server::WriteStatus;
|
||||
use codex_sandboxing::SandboxType;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
struct MockExecProcess {
|
||||
process_id: ProcessId,
|
||||
write_response: WriteResponse,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ExecProcess for MockExecProcess {
|
||||
fn process_id(&self) -> &ProcessId {
|
||||
&self.process_id
|
||||
}
|
||||
|
||||
async fn write(&self, _chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError> {
|
||||
Ok(self.write_response.clone())
|
||||
}
|
||||
|
||||
async fn terminate(&self) -> Result<(), ExecServerError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn remote_process(write_status: WriteStatus) -> UnifiedExecProcess {
|
||||
let (_events_tx, events_rx) = mpsc::channel(1);
|
||||
let started = StartedExecProcess {
|
||||
process: Arc::new(MockExecProcess {
|
||||
process_id: "test-process".to_string().into(),
|
||||
write_response: WriteResponse {
|
||||
status: write_status,
|
||||
},
|
||||
}),
|
||||
events: events_rx,
|
||||
};
|
||||
|
||||
UnifiedExecProcess::from((started, SandboxType::None))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_write_unknown_process_marks_process_exited() {
|
||||
let process = remote_process(WriteStatus::UnknownProcess);
|
||||
|
||||
let err = process
|
||||
.write(b"hello")
|
||||
.await
|
||||
.expect_err("expected write failure");
|
||||
|
||||
assert!(matches!(err, UnifiedExecError::WriteToStdin));
|
||||
assert!(process.has_exited());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_write_closed_stdin_marks_process_exited() {
|
||||
let process = remote_process(WriteStatus::StdinClosed);
|
||||
|
||||
let err = process
|
||||
.write(b"hello")
|
||||
.await
|
||||
.expect_err("expected write failure");
|
||||
|
||||
assert!(matches!(err, UnifiedExecError::WriteToStdin));
|
||||
assert!(process.has_exited());
|
||||
}
|
||||
@@ -15,7 +15,6 @@ path = "src/bin/codex-exec-server.rs"
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
arc-swap = "1.8.2"
|
||||
async-trait = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use codex_app_server_protocol::FsCopyParams;
|
||||
use codex_app_server_protocol::FsCopyResponse;
|
||||
use codex_app_server_protocol::FsCreateDirectoryParams;
|
||||
@@ -19,25 +17,22 @@ use codex_app_server_protocol::FsWriteFileParams;
|
||||
use codex_app_server_protocol::FsWriteFileResponse;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tracing::debug;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::client_api::ExecServerClientConnectOptions;
|
||||
use crate::client_api::RemoteExecServerConnectArgs;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::process::ExecSessionEvent;
|
||||
use crate::process::SESSION_EVENT_CHANNEL_CAPACITY;
|
||||
use crate::protocol::EXEC_CLOSED_METHOD;
|
||||
use crate::process::ExecServerEvent;
|
||||
use crate::protocol::EXEC_EXITED_METHOD;
|
||||
use crate::protocol::EXEC_METHOD;
|
||||
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
|
||||
use crate::protocol::EXEC_READ_METHOD;
|
||||
use crate::protocol::EXEC_TERMINATE_METHOD;
|
||||
use crate::protocol::EXEC_WRITE_METHOD;
|
||||
use crate::protocol::ExecClosedNotification;
|
||||
use crate::protocol::ExecExitedNotification;
|
||||
use crate::protocol::ExecOutputDeltaNotification;
|
||||
use crate::protocol::ExecParams;
|
||||
@@ -97,15 +92,7 @@ impl RemoteExecServerConnectArgs {
|
||||
|
||||
struct Inner {
|
||||
client: RpcClient,
|
||||
// The remote transport delivers one shared notification stream for every
|
||||
// process on the connection. Keep a local process_id -> sender registry so
|
||||
// we can demux those connection-global notifications into the single
|
||||
// process-scoped event channel returned by ExecBackend::start().
|
||||
sessions: ArcSwap<HashMap<String, mpsc::Sender<ExecSessionEvent>>>,
|
||||
// ArcSwap makes reads cheap on the hot notification path, but writes still
|
||||
// need serialization so concurrent register/remove operations do not
|
||||
// overwrite each other's copy-on-write updates.
|
||||
sessions_write_lock: Mutex<()>,
|
||||
events_tx: broadcast::Sender<ExecServerEvent>,
|
||||
reader_task: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
@@ -171,6 +158,10 @@ impl ExecServerClient {
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn event_receiver(&self) -> broadcast::Receiver<ExecServerEvent> {
|
||||
self.inner.events_tx.subscribe()
|
||||
}
|
||||
|
||||
pub async fn initialize(
|
||||
&self,
|
||||
options: ExecServerClientConnectOptions,
|
||||
@@ -316,35 +307,6 @@ impl ExecServerClient {
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
pub(crate) async fn register_session(
|
||||
&self,
|
||||
process_id: &str,
|
||||
) -> Result<mpsc::Receiver<ExecSessionEvent>, ExecServerError> {
|
||||
let (events_tx, events_rx) = mpsc::channel(SESSION_EVENT_CHANNEL_CAPACITY);
|
||||
let _sessions_write_guard = self.inner.sessions_write_lock.lock().await;
|
||||
let sessions = self.inner.sessions.load();
|
||||
if sessions.contains_key(process_id) {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"session already registered for process {process_id}"
|
||||
)));
|
||||
}
|
||||
let mut next_sessions = sessions.as_ref().clone();
|
||||
next_sessions.insert(process_id.to_string(), events_tx);
|
||||
self.inner.sessions.store(Arc::new(next_sessions));
|
||||
Ok(events_rx)
|
||||
}
|
||||
|
||||
pub(crate) async fn unregister_session(&self, process_id: &str) {
|
||||
let _sessions_write_guard = self.inner.sessions_write_lock.lock().await;
|
||||
let sessions = self.inner.sessions.load();
|
||||
if !sessions.contains_key(process_id) {
|
||||
return;
|
||||
}
|
||||
let mut next_sessions = sessions.as_ref().clone();
|
||||
next_sessions.remove(process_id);
|
||||
self.inner.sessions.store(Arc::new(next_sessions));
|
||||
}
|
||||
|
||||
async fn connect(
|
||||
connection: JsonRpcConnection,
|
||||
options: ExecServerClientConnectOptions,
|
||||
@@ -360,18 +322,13 @@ impl ExecServerClient {
|
||||
&& let Err(err) =
|
||||
handle_server_notification(&inner, notification).await
|
||||
{
|
||||
fail_all_sessions(
|
||||
&inner,
|
||||
format!("exec-server notification handling failed: {err}"),
|
||||
)
|
||||
.await;
|
||||
warn!("exec-server client closing after protocol error: {err}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
RpcClientEvent::Disconnected { reason } => {
|
||||
if let Some(inner) = weak.upgrade() {
|
||||
fail_all_sessions(&inner, disconnected_message(reason.as_deref()))
|
||||
.await;
|
||||
if let Some(reason) = reason {
|
||||
warn!("exec-server client transport disconnected: {reason}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -381,8 +338,7 @@ impl ExecServerClient {
|
||||
|
||||
Inner {
|
||||
client: rpc_client,
|
||||
sessions: ArcSwap::from_pointee(HashMap::new()),
|
||||
sessions_write_lock: Mutex::new(()),
|
||||
events_tx: broadcast::channel(256).0,
|
||||
reader_task,
|
||||
}
|
||||
});
|
||||
@@ -414,32 +370,6 @@ impl From<RpcCallError> for ExecServerError {
|
||||
}
|
||||
}
|
||||
|
||||
fn disconnected_message(reason: Option<&str>) -> String {
|
||||
match reason {
|
||||
Some(reason) => format!("exec-server transport disconnected: {reason}"),
|
||||
None => "exec-server transport disconnected".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn fail_all_sessions(inner: &Arc<Inner>, message: String) {
|
||||
let sessions = {
|
||||
let _sessions_write_guard = inner.sessions_write_lock.lock().await;
|
||||
let sessions = inner.sessions.load();
|
||||
let drained_sessions = sessions.as_ref().clone();
|
||||
inner.sessions.store(Arc::new(HashMap::new()));
|
||||
drained_sessions
|
||||
};
|
||||
|
||||
for (_, events_tx) in sessions {
|
||||
// Do not block disconnect handling behind a full bounded queue. Best
|
||||
// effort deliver a terminal failure event, then drop the sender so
|
||||
// receivers still observe EOF if the queue was already saturated.
|
||||
let _ = events_tx.try_send(ExecSessionEvent::Failed {
|
||||
message: message.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_server_notification(
|
||||
inner: &Arc<Inner>,
|
||||
notification: JSONRPCNotification,
|
||||
@@ -448,53 +378,12 @@ async fn handle_server_notification(
|
||||
EXEC_OUTPUT_DELTA_METHOD => {
|
||||
let params: ExecOutputDeltaNotification =
|
||||
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
|
||||
// Remote exec-server notifications are connection-global, so route
|
||||
// each event to the single local receiver that owns this process.
|
||||
let events_tx = inner.sessions.load().get(¶ms.process_id).cloned();
|
||||
if let Some(events_tx) = events_tx {
|
||||
let _ = events_tx
|
||||
.send(ExecSessionEvent::Output {
|
||||
seq: params.seq,
|
||||
stream: params.stream,
|
||||
chunk: params.chunk.into_inner(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
let _ = inner.events_tx.send(ExecServerEvent::OutputDelta(params));
|
||||
}
|
||||
EXEC_EXITED_METHOD => {
|
||||
let params: ExecExitedNotification =
|
||||
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
|
||||
let events_tx = inner.sessions.load().get(¶ms.process_id).cloned();
|
||||
if let Some(events_tx) = events_tx {
|
||||
let _ = events_tx
|
||||
.send(ExecSessionEvent::Exited {
|
||||
seq: params.seq,
|
||||
exit_code: params.exit_code,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
EXEC_CLOSED_METHOD => {
|
||||
let params: ExecClosedNotification =
|
||||
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
|
||||
let events_tx = {
|
||||
let _sessions_write_guard = inner.sessions_write_lock.lock().await;
|
||||
let sessions = inner.sessions.load();
|
||||
let events_tx = sessions.get(¶ms.process_id).cloned();
|
||||
if events_tx.is_some() {
|
||||
// Closed is the terminal lifecycle event for this process,
|
||||
// so drop the routing entry before forwarding it.
|
||||
let mut next_sessions = sessions.as_ref().clone();
|
||||
next_sessions.remove(¶ms.process_id);
|
||||
inner.sessions.store(Arc::new(next_sessions));
|
||||
}
|
||||
events_tx
|
||||
};
|
||||
if let Some(events_tx) = events_tx {
|
||||
let _ = events_tx
|
||||
.send(ExecSessionEvent::Closed { seq: params.seq })
|
||||
.await;
|
||||
}
|
||||
let _ = inner.events_tx.send(ExecServerEvent::Exited(params));
|
||||
}
|
||||
other => {
|
||||
debug!("ignoring unknown exec-server notification: {other}");
|
||||
|
||||
@@ -6,19 +6,19 @@ use crate::RemoteExecServerConnectArgs;
|
||||
use crate::file_system::ExecutorFileSystem;
|
||||
use crate::local_file_system::LocalFileSystem;
|
||||
use crate::local_process::LocalProcess;
|
||||
use crate::process::ExecBackend;
|
||||
use crate::process::ExecProcess;
|
||||
use crate::remote_file_system::RemoteFileSystem;
|
||||
use crate::remote_process::RemoteProcess;
|
||||
|
||||
pub trait ExecutorEnvironment: Send + Sync {
|
||||
fn get_exec_backend(&self) -> Arc<dyn ExecBackend>;
|
||||
fn get_executor(&self) -> Arc<dyn ExecProcess>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Environment {
|
||||
experimental_exec_server_url: Option<String>,
|
||||
remote_exec_server_client: Option<ExecServerClient>,
|
||||
exec_backend: Arc<dyn ExecBackend>,
|
||||
executor: Arc<dyn ExecProcess>,
|
||||
}
|
||||
|
||||
impl Default for Environment {
|
||||
@@ -34,7 +34,7 @@ impl Default for Environment {
|
||||
Self {
|
||||
experimental_exec_server_url: None,
|
||||
remote_exec_server_client: None,
|
||||
exec_backend: Arc::new(local_process),
|
||||
executor: Arc::new(local_process),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -68,24 +68,24 @@ impl Environment {
|
||||
None
|
||||
};
|
||||
|
||||
let exec_backend: Arc<dyn ExecBackend> =
|
||||
if let Some(client) = remote_exec_server_client.clone() {
|
||||
Arc::new(RemoteProcess::new(client))
|
||||
} else {
|
||||
let local_process = LocalProcess::default();
|
||||
local_process
|
||||
.initialize()
|
||||
.map_err(|err| ExecServerError::Protocol(err.message))?;
|
||||
local_process
|
||||
.initialized()
|
||||
.map_err(ExecServerError::Protocol)?;
|
||||
Arc::new(local_process)
|
||||
};
|
||||
let executor: Arc<dyn ExecProcess> = if let Some(client) = remote_exec_server_client.clone()
|
||||
{
|
||||
Arc::new(RemoteProcess::new(client))
|
||||
} else {
|
||||
let local_process = LocalProcess::default();
|
||||
local_process
|
||||
.initialize()
|
||||
.map_err(|err| ExecServerError::Protocol(err.message))?;
|
||||
local_process
|
||||
.initialized()
|
||||
.map_err(ExecServerError::Protocol)?;
|
||||
Arc::new(local_process)
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
experimental_exec_server_url,
|
||||
remote_exec_server_client,
|
||||
exec_backend,
|
||||
executor,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -93,8 +93,8 @@ impl Environment {
|
||||
self.experimental_exec_server_url.as_deref()
|
||||
}
|
||||
|
||||
pub fn get_exec_backend(&self) -> Arc<dyn ExecBackend> {
|
||||
Arc::clone(&self.exec_backend)
|
||||
pub fn get_executor(&self) -> Arc<dyn ExecProcess> {
|
||||
Arc::clone(&self.executor)
|
||||
}
|
||||
|
||||
pub fn get_filesystem(&self) -> Arc<dyn ExecutorFileSystem> {
|
||||
@@ -107,8 +107,8 @@ impl Environment {
|
||||
}
|
||||
|
||||
impl ExecutorEnvironment for Environment {
|
||||
fn get_exec_backend(&self) -> Arc<dyn ExecBackend> {
|
||||
Arc::clone(&self.exec_backend)
|
||||
fn get_executor(&self) -> Arc<dyn ExecProcess> {
|
||||
Arc::clone(&self.executor)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,7 +130,7 @@ mod tests {
|
||||
let environment = Environment::default();
|
||||
|
||||
let response = environment
|
||||
.get_exec_backend()
|
||||
.get_executor()
|
||||
.start(crate::ExecParams {
|
||||
process_id: "default-env-proc".to_string(),
|
||||
argv: vec!["true".to_string()],
|
||||
@@ -142,6 +142,11 @@ mod tests {
|
||||
.await
|
||||
.expect("start process");
|
||||
|
||||
assert_eq!(response.process.process_id().as_str(), "default-env-proc");
|
||||
assert_eq!(
|
||||
response,
|
||||
crate::ExecResponse {
|
||||
process_id: "default-env-proc".to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,12 +39,8 @@ pub use file_system::FileMetadata;
|
||||
pub use file_system::FileSystemResult;
|
||||
pub use file_system::ReadDirectoryEntry;
|
||||
pub use file_system::RemoveOptions;
|
||||
pub use process::ExecBackend;
|
||||
pub use process::ExecProcess;
|
||||
pub use process::ExecSessionEvent;
|
||||
pub use process::ProcessId;
|
||||
pub use process::StartedExecProcess;
|
||||
pub use protocol::ExecClosedNotification;
|
||||
pub use process::ExecServerEvent;
|
||||
pub use protocol::ExecExitedNotification;
|
||||
pub use protocol::ExecOutputDeltaNotification;
|
||||
pub use protocol::ExecOutputStream;
|
||||
@@ -58,7 +54,6 @@ pub use protocol::TerminateParams;
|
||||
pub use protocol::TerminateResponse;
|
||||
pub use protocol::WriteParams;
|
||||
pub use protocol::WriteResponse;
|
||||
pub use protocol::WriteStatus;
|
||||
pub use server::DEFAULT_LISTEN_URL;
|
||||
pub use server::ExecServerListenUrlParseError;
|
||||
pub use server::run_main;
|
||||
|
||||
@@ -11,17 +11,13 @@ use codex_utils_pty::ExecCommandSession;
|
||||
use codex_utils_pty::TerminalSize;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ExecBackend;
|
||||
use crate::ExecProcess;
|
||||
use crate::ExecServerError;
|
||||
use crate::ExecSessionEvent;
|
||||
use crate::ProcessId;
|
||||
use crate::StartedExecProcess;
|
||||
use crate::process::SESSION_EVENT_CHANNEL_CAPACITY;
|
||||
use crate::protocol::EXEC_CLOSED_METHOD;
|
||||
use crate::protocol::ExecClosedNotification;
|
||||
use crate::ExecServerEvent;
|
||||
use crate::protocol::ExecExitedNotification;
|
||||
use crate::protocol::ExecOutputDeltaNotification;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
@@ -35,7 +31,6 @@ use crate::protocol::TerminateParams;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteParams;
|
||||
use crate::protocol::WriteResponse;
|
||||
use crate::protocol::WriteStatus;
|
||||
use crate::rpc::RpcNotificationSender;
|
||||
use crate::rpc::RpcServerOutboundMessage;
|
||||
use crate::rpc::internal_error;
|
||||
@@ -43,6 +38,7 @@ use crate::rpc::invalid_params;
|
||||
use crate::rpc::invalid_request;
|
||||
|
||||
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
|
||||
const EVENT_CHANNEL_CAPACITY: usize = 256;
|
||||
const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
|
||||
#[cfg(test)]
|
||||
const EXITED_PROCESS_RETENTION: Duration = Duration::from_millis(25);
|
||||
@@ -64,9 +60,6 @@ struct RunningProcess {
|
||||
next_seq: u64,
|
||||
exit_code: Option<i32>,
|
||||
output_notify: Arc<Notify>,
|
||||
session_events_tx: mpsc::Sender<ExecSessionEvent>,
|
||||
open_streams: usize,
|
||||
closed: bool,
|
||||
}
|
||||
|
||||
enum ProcessEntry {
|
||||
@@ -76,6 +69,7 @@ enum ProcessEntry {
|
||||
|
||||
struct Inner {
|
||||
notifications: RpcNotificationSender,
|
||||
events_tx: broadcast::Sender<ExecServerEvent>,
|
||||
processes: Mutex<HashMap<String, ProcessEntry>>,
|
||||
initialize_requested: AtomicBool,
|
||||
initialized: AtomicBool,
|
||||
@@ -86,11 +80,6 @@ pub(crate) struct LocalProcess {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
struct LocalExecProcess {
|
||||
process_id: ProcessId,
|
||||
backend: LocalProcess,
|
||||
}
|
||||
|
||||
impl Default for LocalProcess {
|
||||
fn default() -> Self {
|
||||
let (outgoing_tx, mut outgoing_rx) =
|
||||
@@ -105,6 +94,7 @@ impl LocalProcess {
|
||||
Self {
|
||||
inner: Arc::new(Inner {
|
||||
notifications,
|
||||
events_tx: broadcast::channel(EVENT_CHANNEL_CAPACITY).0,
|
||||
processes: Mutex::new(HashMap::new()),
|
||||
initialize_requested: AtomicBool::new(false),
|
||||
initialized: AtomicBool::new(false),
|
||||
@@ -162,12 +152,10 @@ impl LocalProcess {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_process(
|
||||
&self,
|
||||
params: ExecParams,
|
||||
) -> Result<(ExecResponse, mpsc::Receiver<ExecSessionEvent>), JSONRPCErrorError> {
|
||||
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
|
||||
self.require_initialized_for("exec")?;
|
||||
let process_id = params.process_id.clone();
|
||||
|
||||
let (program, args) = params
|
||||
.argv
|
||||
.split_first()
|
||||
@@ -215,7 +203,6 @@ impl LocalProcess {
|
||||
};
|
||||
|
||||
let output_notify = Arc::new(Notify::new());
|
||||
let (session_events_tx, session_events_rx) = mpsc::channel(SESSION_EVENT_CHANNEL_CAPACITY);
|
||||
{
|
||||
let mut process_map = self.inner.processes.lock().await;
|
||||
process_map.insert(
|
||||
@@ -228,9 +215,6 @@ impl LocalProcess {
|
||||
next_seq: 1,
|
||||
exit_code: None,
|
||||
output_notify: Arc::clone(&output_notify),
|
||||
session_events_tx: session_events_tx.clone(),
|
||||
open_streams: 2,
|
||||
closed: false,
|
||||
})),
|
||||
);
|
||||
}
|
||||
@@ -264,13 +248,7 @@ impl LocalProcess {
|
||||
output_notify,
|
||||
));
|
||||
|
||||
Ok((ExecResponse { process_id }, session_events_rx))
|
||||
}
|
||||
|
||||
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
|
||||
self.start_process(params)
|
||||
.await
|
||||
.map(|(response, _)| response)
|
||||
Ok(ExecResponse { process_id })
|
||||
}
|
||||
|
||||
pub(crate) async fn exec_read(
|
||||
@@ -278,7 +256,6 @@ impl LocalProcess {
|
||||
params: ReadParams,
|
||||
) -> Result<ReadResponse, JSONRPCErrorError> {
|
||||
self.require_initialized_for("exec")?;
|
||||
let _process_id = params.process_id.clone();
|
||||
let after_seq = params.after_seq.unwrap_or(0);
|
||||
let max_bytes = params.max_bytes.unwrap_or(usize::MAX);
|
||||
let wait = Duration::from_millis(params.wait_ms.unwrap_or(0));
|
||||
@@ -332,11 +309,6 @@ impl LocalProcess {
|
||||
|| response.exited
|
||||
|| tokio::time::Instant::now() >= deadline
|
||||
{
|
||||
let _total_bytes: usize = response
|
||||
.chunks
|
||||
.iter()
|
||||
.map(|chunk| chunk.chunk.0.len())
|
||||
.sum();
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
@@ -353,24 +325,22 @@ impl LocalProcess {
|
||||
params: WriteParams,
|
||||
) -> Result<WriteResponse, JSONRPCErrorError> {
|
||||
self.require_initialized_for("exec")?;
|
||||
let _process_id = params.process_id.clone();
|
||||
let _input_bytes = params.chunk.0.len();
|
||||
let writer_tx = {
|
||||
let process_map = self.inner.processes.lock().await;
|
||||
let Some(process) = process_map.get(¶ms.process_id) else {
|
||||
return Ok(WriteResponse {
|
||||
status: WriteStatus::UnknownProcess,
|
||||
});
|
||||
};
|
||||
let process = process_map.get(¶ms.process_id).ok_or_else(|| {
|
||||
invalid_request(format!("unknown process id {}", params.process_id))
|
||||
})?;
|
||||
let ProcessEntry::Running(process) = process else {
|
||||
return Ok(WriteResponse {
|
||||
status: WriteStatus::Starting,
|
||||
});
|
||||
return Err(invalid_request(format!(
|
||||
"process id {} is starting",
|
||||
params.process_id
|
||||
)));
|
||||
};
|
||||
if !process.tty {
|
||||
return Ok(WriteResponse {
|
||||
status: WriteStatus::StdinClosed,
|
||||
});
|
||||
return Err(invalid_request(format!(
|
||||
"stdin is closed for process {}",
|
||||
params.process_id
|
||||
)));
|
||||
}
|
||||
process.session.writer_sender()
|
||||
};
|
||||
@@ -380,9 +350,7 @@ impl LocalProcess {
|
||||
.await
|
||||
.map_err(|_| internal_error("failed to write to process stdin".to_string()))?;
|
||||
|
||||
Ok(WriteResponse {
|
||||
status: WriteStatus::Accepted,
|
||||
})
|
||||
Ok(WriteResponse { accepted: true })
|
||||
}
|
||||
|
||||
pub(crate) async fn terminate_process(
|
||||
@@ -390,7 +358,6 @@ impl LocalProcess {
|
||||
params: TerminateParams,
|
||||
) -> Result<TerminateResponse, JSONRPCErrorError> {
|
||||
self.require_initialized_for("exec")?;
|
||||
let _process_id = params.process_id.clone();
|
||||
let running = {
|
||||
let process_map = self.inner.processes.lock().await;
|
||||
match process_map.get(¶ms.process_id) {
|
||||
@@ -410,38 +377,15 @@ impl LocalProcess {
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ExecBackend for LocalProcess {
|
||||
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
|
||||
let (response, events) = self
|
||||
.start_process(params)
|
||||
.await
|
||||
.map_err(map_handler_error)?;
|
||||
Ok(StartedExecProcess {
|
||||
process: Arc::new(LocalExecProcess {
|
||||
process_id: response.process_id.into(),
|
||||
backend: self.clone(),
|
||||
}),
|
||||
events,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ExecProcess for LocalExecProcess {
|
||||
fn process_id(&self) -> &ProcessId {
|
||||
&self.process_id
|
||||
impl ExecProcess for LocalProcess {
|
||||
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
|
||||
self.exec(params).await.map_err(map_handler_error)
|
||||
}
|
||||
|
||||
async fn write(&self, chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError> {
|
||||
self.backend.write(&self.process_id, chunk).await
|
||||
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
|
||||
self.exec_read(params).await.map_err(map_handler_error)
|
||||
}
|
||||
|
||||
async fn terminate(&self) -> Result<(), ExecServerError> {
|
||||
self.backend.terminate(&self.process_id).await
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalProcess {
|
||||
async fn write(
|
||||
&self,
|
||||
process_id: &str,
|
||||
@@ -455,13 +399,16 @@ impl LocalProcess {
|
||||
.map_err(map_handler_error)
|
||||
}
|
||||
|
||||
async fn terminate(&self, process_id: &str) -> Result<(), ExecServerError> {
|
||||
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
|
||||
self.terminate_process(TerminateParams {
|
||||
process_id: process_id.to_string(),
|
||||
})
|
||||
.await
|
||||
.map_err(map_handler_error)?;
|
||||
Ok(())
|
||||
.map_err(map_handler_error)
|
||||
}
|
||||
|
||||
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
|
||||
self.inner.events_tx.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -480,8 +427,7 @@ async fn stream_output(
|
||||
output_notify: Arc<Notify>,
|
||||
) {
|
||||
while let Some(chunk) = receiver.recv().await {
|
||||
let _chunk_len = chunk.len();
|
||||
let (events_tx, event, notification) = {
|
||||
let notification = {
|
||||
let mut processes = inner.processes.lock().await;
|
||||
let Some(entry) = processes.get_mut(&process_id) else {
|
||||
break;
|
||||
@@ -502,25 +448,21 @@ async fn stream_output(
|
||||
break;
|
||||
};
|
||||
process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len());
|
||||
warn!(
|
||||
"retained output cap exceeded for process {process_id}; dropping oldest output"
|
||||
);
|
||||
}
|
||||
let event = ExecSessionEvent::Output {
|
||||
seq,
|
||||
ExecOutputDeltaNotification {
|
||||
process_id: process_id.clone(),
|
||||
stream,
|
||||
chunk: chunk.clone(),
|
||||
};
|
||||
(
|
||||
process.session_events_tx.clone(),
|
||||
event,
|
||||
ExecOutputDeltaNotification {
|
||||
process_id: process_id.clone(),
|
||||
seq,
|
||||
stream,
|
||||
chunk: chunk.into(),
|
||||
},
|
||||
)
|
||||
chunk: chunk.into(),
|
||||
}
|
||||
};
|
||||
output_notify.notify_waiters();
|
||||
let _ = events_tx.send(event).await;
|
||||
let _ = inner
|
||||
.events_tx
|
||||
.send(ExecServerEvent::OutputDelta(notification.clone()));
|
||||
|
||||
if inner
|
||||
.notifications
|
||||
.notify(crate::protocol::EXEC_OUTPUT_DELTA_METHOD, ¬ification)
|
||||
@@ -530,8 +472,6 @@ async fn stream_output(
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
finish_output_stream(process_id, inner).await;
|
||||
}
|
||||
|
||||
async fn watch_exit(
|
||||
@@ -541,39 +481,28 @@ async fn watch_exit(
|
||||
output_notify: Arc<Notify>,
|
||||
) {
|
||||
let exit_code = exit_rx.await.unwrap_or(-1);
|
||||
let notification = {
|
||||
{
|
||||
let mut processes = inner.processes.lock().await;
|
||||
if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) {
|
||||
let seq = process.next_seq;
|
||||
process.next_seq += 1;
|
||||
process.exit_code = Some(exit_code);
|
||||
Some((
|
||||
process.session_events_tx.clone(),
|
||||
ExecSessionEvent::Exited { seq, exit_code },
|
||||
ExecExitedNotification {
|
||||
process_id: process_id.clone(),
|
||||
seq,
|
||||
exit_code,
|
||||
},
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
output_notify.notify_waiters();
|
||||
if let Some((events_tx, event, notification)) = notification {
|
||||
let _ = events_tx.send(event).await;
|
||||
if inner
|
||||
.notifications
|
||||
.notify(crate::protocol::EXEC_EXITED_METHOD, ¬ification)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
maybe_emit_closed(process_id.clone(), Arc::clone(&inner)).await;
|
||||
output_notify.notify_waiters();
|
||||
let notification = ExecExitedNotification {
|
||||
process_id: process_id.clone(),
|
||||
exit_code,
|
||||
};
|
||||
let _ = inner
|
||||
.events_tx
|
||||
.send(ExecServerEvent::Exited(notification.clone()));
|
||||
if inner
|
||||
.notifications
|
||||
.notify(crate::protocol::EXEC_EXITED_METHOD, ¬ification)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
|
||||
let mut processes = inner.processes.lock().await;
|
||||
@@ -584,56 +513,3 @@ async fn watch_exit(
|
||||
processes.remove(&process_id);
|
||||
}
|
||||
}
|
||||
|
||||
async fn finish_output_stream(process_id: String, inner: Arc<Inner>) {
|
||||
{
|
||||
let mut processes = inner.processes.lock().await;
|
||||
let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else {
|
||||
return;
|
||||
};
|
||||
|
||||
if process.open_streams > 0 {
|
||||
process.open_streams -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
maybe_emit_closed(process_id, inner).await;
|
||||
}
|
||||
|
||||
async fn maybe_emit_closed(process_id: String, inner: Arc<Inner>) {
|
||||
let notification = {
|
||||
let mut processes = inner.processes.lock().await;
|
||||
let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else {
|
||||
return;
|
||||
};
|
||||
|
||||
if process.closed || process.open_streams != 0 || process.exit_code.is_none() {
|
||||
return;
|
||||
}
|
||||
|
||||
process.closed = true;
|
||||
let seq = process.next_seq;
|
||||
process.next_seq += 1;
|
||||
Some((
|
||||
process.session_events_tx.clone(),
|
||||
ExecSessionEvent::Closed { seq },
|
||||
ExecClosedNotification {
|
||||
process_id: process_id.clone(),
|
||||
seq,
|
||||
},
|
||||
))
|
||||
};
|
||||
|
||||
let Some((events_tx, event, notification)) = notification else {
|
||||
return;
|
||||
};
|
||||
|
||||
let _ = events_tx.send(event).await;
|
||||
|
||||
if inner
|
||||
.notifications
|
||||
.notify(EXEC_CLOSED_METHOD, ¬ification)
|
||||
.await
|
||||
.is_err()
|
||||
{}
|
||||
}
|
||||
|
||||
@@ -1,90 +1,35 @@
|
||||
use std::fmt;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::ExecServerError;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::protocol::ExecExitedNotification;
|
||||
use crate::protocol::ExecOutputDeltaNotification;
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::ExecResponse;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::ReadResponse;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteResponse;
|
||||
|
||||
pub(crate) const SESSION_EVENT_CHANNEL_CAPACITY: usize = 2048;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ExecSessionEvent {
|
||||
Output {
|
||||
seq: u64,
|
||||
stream: ExecOutputStream,
|
||||
chunk: Vec<u8>,
|
||||
},
|
||||
Exited {
|
||||
seq: u64,
|
||||
exit_code: i32,
|
||||
},
|
||||
Closed {
|
||||
seq: u64,
|
||||
},
|
||||
Failed {
|
||||
message: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct ProcessId(String);
|
||||
|
||||
pub struct StartedExecProcess {
|
||||
pub process: Arc<dyn ExecProcess>,
|
||||
pub events: mpsc::Receiver<ExecSessionEvent>,
|
||||
}
|
||||
|
||||
impl ProcessId {
|
||||
pub fn as_str(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> String {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for ProcessId {
|
||||
type Target = str;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<str> for ProcessId {
|
||||
fn as_ref(&self) -> &str {
|
||||
self.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ProcessId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for ProcessId {
|
||||
fn from(value: String) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
pub enum ExecServerEvent {
|
||||
OutputDelta(ExecOutputDeltaNotification),
|
||||
Exited(ExecExitedNotification),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ExecProcess: Send + Sync {
|
||||
fn process_id(&self) -> &ProcessId;
|
||||
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError>;
|
||||
|
||||
async fn write(&self, chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError>;
|
||||
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError>;
|
||||
|
||||
async fn terminate(&self) -> Result<(), ExecServerError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ExecBackend: Send + Sync {
|
||||
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError>;
|
||||
async fn write(
|
||||
&self,
|
||||
process_id: &str,
|
||||
chunk: Vec<u8>,
|
||||
) -> Result<WriteResponse, ExecServerError>;
|
||||
|
||||
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError>;
|
||||
|
||||
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent>;
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ pub const EXEC_WRITE_METHOD: &str = "process/write";
|
||||
pub const EXEC_TERMINATE_METHOD: &str = "process/terminate";
|
||||
pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output";
|
||||
pub const EXEC_EXITED_METHOD: &str = "process/exited";
|
||||
pub const EXEC_CLOSED_METHOD: &str = "process/closed";
|
||||
pub const FS_READ_FILE_METHOD: &str = "fs/readFile";
|
||||
pub const FS_WRITE_FILE_METHOD: &str = "fs/writeFile";
|
||||
pub const FS_CREATE_DIRECTORY_METHOD: &str = "fs/createDirectory";
|
||||
@@ -100,19 +99,10 @@ pub struct WriteParams {
|
||||
pub chunk: ByteChunk,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum WriteStatus {
|
||||
Accepted,
|
||||
UnknownProcess,
|
||||
StdinClosed,
|
||||
Starting,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WriteResponse {
|
||||
pub status: WriteStatus,
|
||||
pub accepted: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -139,7 +129,6 @@ pub enum ExecOutputStream {
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ExecOutputDeltaNotification {
|
||||
pub process_id: String,
|
||||
pub seq: u64,
|
||||
pub stream: ExecOutputStream,
|
||||
pub chunk: ByteChunk,
|
||||
}
|
||||
@@ -148,17 +137,9 @@ pub struct ExecOutputDeltaNotification {
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ExecExitedNotification {
|
||||
pub process_id: String,
|
||||
pub seq: u64,
|
||||
pub exit_code: i32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ExecClosedNotification {
|
||||
pub process_id: String,
|
||||
pub seq: u64,
|
||||
}
|
||||
|
||||
mod base64_bytes {
|
||||
use super::BASE64_STANDARD;
|
||||
use base64::Engine as _;
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::ExecBackend;
|
||||
use crate::ExecProcess;
|
||||
use crate::ExecServerClient;
|
||||
use crate::ExecServerError;
|
||||
use crate::ProcessId;
|
||||
use crate::StartedExecProcess;
|
||||
use crate::ExecServerEvent;
|
||||
use crate::protocol::ExecParams;
|
||||
use crate::protocol::ExecResponse;
|
||||
use crate::protocol::ReadParams;
|
||||
use crate::protocol::ReadResponse;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteResponse;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -16,15 +17,21 @@ pub(crate) struct RemoteProcess {
|
||||
client: ExecServerClient,
|
||||
}
|
||||
|
||||
struct RemoteExecProcess {
|
||||
process_id: ProcessId,
|
||||
backend: RemoteProcess,
|
||||
}
|
||||
|
||||
impl RemoteProcess {
|
||||
pub(crate) fn new(client: ExecServerClient) -> Self {
|
||||
Self { client }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ExecProcess for RemoteProcess {
|
||||
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
|
||||
self.client.exec(params).await
|
||||
}
|
||||
|
||||
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
|
||||
self.client.read(params).await
|
||||
}
|
||||
|
||||
async fn write(
|
||||
&self,
|
||||
@@ -34,43 +41,11 @@ impl RemoteProcess {
|
||||
self.client.write(process_id, chunk).await
|
||||
}
|
||||
|
||||
async fn terminate_process(&self, process_id: &str) -> Result<(), ExecServerError> {
|
||||
self.client.terminate(process_id).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ExecBackend for RemoteProcess {
|
||||
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
|
||||
let process_id = params.process_id.clone();
|
||||
let events = self.client.register_session(&process_id).await?;
|
||||
if let Err(err) = self.client.exec(params).await {
|
||||
self.client.unregister_session(&process_id).await;
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
Ok(StartedExecProcess {
|
||||
process: Arc::new(RemoteExecProcess {
|
||||
process_id: process_id.into(),
|
||||
backend: self.clone(),
|
||||
}),
|
||||
events,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ExecProcess for RemoteExecProcess {
|
||||
fn process_id(&self) -> &ProcessId {
|
||||
&self.process_id
|
||||
}
|
||||
|
||||
async fn write(&self, chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError> {
|
||||
self.backend.write(&self.process_id, chunk).await
|
||||
}
|
||||
|
||||
async fn terminate(&self) -> Result<(), ExecServerError> {
|
||||
self.backend.terminate_process(&self.process_id).await
|
||||
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
|
||||
self.client.terminate(process_id).await
|
||||
}
|
||||
|
||||
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
|
||||
self.client.event_receiver()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::connection::JsonRpcConnectionEvent;
|
||||
@@ -191,12 +192,12 @@ impl RpcClient {
|
||||
if let Err(err) =
|
||||
handle_server_message(&pending_for_reader, &event_tx, message).await
|
||||
{
|
||||
let _ = err;
|
||||
warn!("JSON-RPC client closing after protocol error: {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
JsonRpcConnectionEvent::MalformedMessage { reason } => {
|
||||
let _ = reason;
|
||||
warn!("JSON-RPC client closing after malformed message: {reason}");
|
||||
break;
|
||||
}
|
||||
JsonRpcConnectionEvent::Disconnected { reason } => {
|
||||
|
||||
@@ -6,23 +6,19 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_exec_server::Environment;
|
||||
use codex_exec_server::ExecBackend;
|
||||
use codex_exec_server::ExecParams;
|
||||
use codex_exec_server::ExecProcess;
|
||||
use codex_exec_server::ExecSessionEvent;
|
||||
use codex_exec_server::StartedExecProcess;
|
||||
use codex_exec_server::ExecResponse;
|
||||
use codex_exec_server::ReadParams;
|
||||
use pretty_assertions::assert_eq;
|
||||
use test_case::test_case;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use common::exec_server::ExecServerHarness;
|
||||
use common::exec_server::exec_server;
|
||||
|
||||
struct ProcessContext {
|
||||
backend: Arc<dyn ExecBackend>,
|
||||
server: Option<ExecServerHarness>,
|
||||
process: Arc<dyn ExecProcess>,
|
||||
_server: Option<ExecServerHarness>,
|
||||
}
|
||||
|
||||
async fn create_process_context(use_remote: bool) -> Result<ProcessContext> {
|
||||
@@ -30,22 +26,22 @@ async fn create_process_context(use_remote: bool) -> Result<ProcessContext> {
|
||||
let server = exec_server().await?;
|
||||
let environment = Environment::create(Some(server.websocket_url().to_string())).await?;
|
||||
Ok(ProcessContext {
|
||||
backend: environment.get_exec_backend(),
|
||||
server: Some(server),
|
||||
process: environment.get_executor(),
|
||||
_server: Some(server),
|
||||
})
|
||||
} else {
|
||||
let environment = Environment::create(None).await?;
|
||||
Ok(ProcessContext {
|
||||
backend: environment.get_exec_backend(),
|
||||
server: None,
|
||||
process: environment.get_executor(),
|
||||
_server: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let session = context
|
||||
.backend
|
||||
let response = context
|
||||
.process
|
||||
.start(ExecParams {
|
||||
process_id: "proc-1".to_string(),
|
||||
argv: vec!["true".to_string()],
|
||||
@@ -55,193 +51,28 @@ async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(session.process.process_id().as_str(), "proc-1");
|
||||
let mut events = session.events;
|
||||
|
||||
let mut exit_code = None;
|
||||
loop {
|
||||
match timeout(Duration::from_secs(2), events.recv()).await? {
|
||||
Some(event) => match event {
|
||||
ExecSessionEvent::Exited {
|
||||
exit_code: code, ..
|
||||
} => exit_code = Some(code),
|
||||
ExecSessionEvent::Closed { .. } => break,
|
||||
ExecSessionEvent::Output { .. } => {}
|
||||
ExecSessionEvent::Failed { message } => {
|
||||
anyhow::bail!("process failed before Closed event: {message}")
|
||||
}
|
||||
},
|
||||
None => anyhow::bail!("event stream closed before Closed event"),
|
||||
assert_eq!(
|
||||
response,
|
||||
ExecResponse {
|
||||
process_id: "proc-1".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(exit_code, Some(0));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn collect_process_output_from_events(
|
||||
session: Arc<dyn ExecProcess>,
|
||||
mut events: mpsc::Receiver<ExecSessionEvent>,
|
||||
) -> Result<(String, i32, bool)> {
|
||||
let mut output = String::new();
|
||||
let mut exit_code = None;
|
||||
loop {
|
||||
match timeout(Duration::from_secs(2), events.recv()).await? {
|
||||
Some(event) => match event {
|
||||
ExecSessionEvent::Output { chunk, .. } => {
|
||||
output.push_str(&String::from_utf8_lossy(&chunk));
|
||||
}
|
||||
ExecSessionEvent::Exited {
|
||||
exit_code: code, ..
|
||||
} => exit_code = Some(code),
|
||||
ExecSessionEvent::Closed { .. } => {
|
||||
break;
|
||||
}
|
||||
ExecSessionEvent::Failed { message } => {
|
||||
anyhow::bail!("process failed before Closed event: {message}");
|
||||
}
|
||||
},
|
||||
None => {
|
||||
anyhow::bail!("event stream closed before Closed event");
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(session);
|
||||
Ok((output, exit_code.unwrap_or(-1), true))
|
||||
}
|
||||
|
||||
async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let process_id = "proc-stream".to_string();
|
||||
let session = context
|
||||
.backend
|
||||
.start(ExecParams {
|
||||
process_id: process_id.clone(),
|
||||
argv: vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"sleep 0.05; printf 'session output\\n'".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir()?,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(session.process.process_id().as_str(), process_id);
|
||||
|
||||
let StartedExecProcess { process, events } = session;
|
||||
let (output, exit_code, closed) = collect_process_output_from_events(process, events).await?;
|
||||
assert_eq!(output, "session output\n");
|
||||
assert_eq!(exit_code, 0);
|
||||
assert!(closed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let process_id = "proc-stdin".to_string();
|
||||
let session = context
|
||||
.backend
|
||||
.start(ExecParams {
|
||||
process_id: process_id.clone(),
|
||||
argv: vec![
|
||||
"/usr/bin/python3".to_string(),
|
||||
"-c".to_string(),
|
||||
"import sys; line = sys.stdin.readline(); sys.stdout.write(f'from-stdin:{line}'); sys.stdout.flush()".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir()?,
|
||||
env: Default::default(),
|
||||
tty: true,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(session.process.process_id().as_str(), process_id);
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
session.process.write(b"hello\n".to_vec()).await?;
|
||||
let StartedExecProcess { process, events } = session;
|
||||
let (output, exit_code, closed) = collect_process_output_from_events(process, events).await?;
|
||||
|
||||
assert!(
|
||||
output.contains("from-stdin:hello"),
|
||||
"unexpected output: {output:?}"
|
||||
);
|
||||
assert_eq!(exit_code, 0);
|
||||
assert!(closed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_exec_process_preserves_queued_events_before_subscribe(
|
||||
use_remote: bool,
|
||||
) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let session = context
|
||||
.backend
|
||||
.start(ExecParams {
|
||||
process_id: "proc-queued".to_string(),
|
||||
argv: vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"printf 'queued output\\n'".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir()?,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
let StartedExecProcess { process, events } = session;
|
||||
let (output, exit_code, closed) = collect_process_output_from_events(process, events).await?;
|
||||
assert_eq!(output, "queued output\n");
|
||||
assert_eq!(exit_code, 0);
|
||||
assert!(closed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
|
||||
let mut context = create_process_context(/*use_remote*/ true).await?;
|
||||
let session = context
|
||||
.backend
|
||||
.start(ExecParams {
|
||||
process_id: "proc-disconnect".to_string(),
|
||||
argv: vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"sleep 10".to_string(),
|
||||
],
|
||||
cwd: std::env::current_dir()?,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let server = context
|
||||
.server
|
||||
.as_mut()
|
||||
.expect("remote context should include exec-server harness");
|
||||
server.shutdown().await?;
|
||||
|
||||
let mut events = session.events;
|
||||
let mut next_seq = 0;
|
||||
loop {
|
||||
match timeout(Duration::from_secs(2), events.recv()).await? {
|
||||
Some(ExecSessionEvent::Failed { message }) => {
|
||||
assert!(
|
||||
message.starts_with("exec-server transport disconnected"),
|
||||
"unexpected failure message: {message}"
|
||||
);
|
||||
break;
|
||||
}
|
||||
Some(ExecSessionEvent::Output { .. } | ExecSessionEvent::Exited { .. }) => {}
|
||||
Some(ExecSessionEvent::Closed { .. }) => {
|
||||
anyhow::bail!("received Closed instead of transport failure")
|
||||
}
|
||||
None => anyhow::bail!("event stream closed before Failed event"),
|
||||
let read = context
|
||||
.process
|
||||
.read(ReadParams {
|
||||
process_id: "proc-1".to_string(),
|
||||
after_seq: Some(next_seq),
|
||||
max_bytes: None,
|
||||
wait_ms: Some(100),
|
||||
})
|
||||
.await?;
|
||||
next_seq = read.next_seq;
|
||||
if read.exited {
|
||||
assert_eq!(read.exit_code, Some(0));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -254,24 +85,3 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
|
||||
async fn exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
|
||||
assert_exec_process_starts_and_exits(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn exec_process_streams_output(use_remote: bool) -> Result<()> {
|
||||
assert_exec_process_streams_output(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn exec_process_write_then_read(use_remote: bool) -> Result<()> {
|
||||
assert_exec_process_write_then_read(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn exec_process_preserves_queued_events_before_subscribe(use_remote: bool) -> Result<()> {
|
||||
assert_exec_process_preserves_queued_events_before_subscribe(use_remote).await
|
||||
}
|
||||
|
||||
16
codex-rs/instructions/BUILD.bazel
Normal file
16
codex-rs/instructions/BUILD.bazel
Normal file
@@ -0,0 +1,16 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "instructions",
|
||||
crate_name = "codex_instructions",
|
||||
compile_data = glob(
|
||||
include = ["**"],
|
||||
exclude = [
|
||||
"BUILD.bazel",
|
||||
"Cargo.toml",
|
||||
],
|
||||
allow_empty = True,
|
||||
) + [
|
||||
"//codex-rs:node-version.txt",
|
||||
],
|
||||
)
|
||||
20
codex-rs/instructions/Cargo.toml
Normal file
20
codex-rs/instructions/Cargo.toml
Normal file
@@ -0,0 +1,20 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "codex-instructions"
|
||||
version.workspace = true
|
||||
|
||||
[lib]
|
||||
doctest = false
|
||||
name = "codex_instructions"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
codex-protocol = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
61
codex-rs/instructions/src/fragment.rs
Normal file
61
codex-rs/instructions/src/fragment.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
pub const AGENTS_MD_START_MARKER: &str = "# AGENTS.md instructions for ";
|
||||
pub const AGENTS_MD_END_MARKER: &str = "</INSTRUCTIONS>";
|
||||
pub const SKILL_OPEN_TAG: &str = "<skill>";
|
||||
pub const SKILL_CLOSE_TAG: &str = "</skill>";
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct ContextualUserFragmentDefinition {
|
||||
start_marker: &'static str,
|
||||
end_marker: &'static str,
|
||||
}
|
||||
|
||||
impl ContextualUserFragmentDefinition {
|
||||
pub const fn new(start_marker: &'static str, end_marker: &'static str) -> Self {
|
||||
Self {
|
||||
start_marker,
|
||||
end_marker,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn matches_text(&self, text: &str) -> bool {
|
||||
let trimmed = text.trim_start();
|
||||
let starts_with_marker = trimmed
|
||||
.get(..self.start_marker.len())
|
||||
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(self.start_marker));
|
||||
let trimmed = trimmed.trim_end();
|
||||
let ends_with_marker = trimmed
|
||||
.get(trimmed.len().saturating_sub(self.end_marker.len())..)
|
||||
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(self.end_marker));
|
||||
starts_with_marker && ends_with_marker
|
||||
}
|
||||
|
||||
pub const fn start_marker(&self) -> &'static str {
|
||||
self.start_marker
|
||||
}
|
||||
|
||||
pub const fn end_marker(&self) -> &'static str {
|
||||
self.end_marker
|
||||
}
|
||||
|
||||
pub fn wrap(&self, body: String) -> String {
|
||||
format!("{}\n{}\n{}", self.start_marker, body, self.end_marker)
|
||||
}
|
||||
|
||||
pub fn into_message(self, text: String) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText { text }],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub const AGENTS_MD_FRAGMENT: ContextualUserFragmentDefinition =
|
||||
ContextualUserFragmentDefinition::new(AGENTS_MD_START_MARKER, AGENTS_MD_END_MARKER);
|
||||
pub const SKILL_FRAGMENT: ContextualUserFragmentDefinition =
|
||||
ContextualUserFragmentDefinition::new(SKILL_OPEN_TAG, SKILL_CLOSE_TAG);
|
||||
15
codex-rs/instructions/src/lib.rs
Normal file
15
codex-rs/instructions/src/lib.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
//! User and skill instruction payloads and contextual user fragment markers for Codex prompts.
|
||||
|
||||
mod fragment;
|
||||
mod user_instructions;
|
||||
|
||||
pub use fragment::AGENTS_MD_END_MARKER;
|
||||
pub use fragment::AGENTS_MD_FRAGMENT;
|
||||
pub use fragment::AGENTS_MD_START_MARKER;
|
||||
pub use fragment::ContextualUserFragmentDefinition;
|
||||
pub use fragment::SKILL_CLOSE_TAG;
|
||||
pub use fragment::SKILL_FRAGMENT;
|
||||
pub use fragment::SKILL_OPEN_TAG;
|
||||
pub use user_instructions::SkillInstructions;
|
||||
pub use user_instructions::USER_INSTRUCTIONS_PREFIX;
|
||||
pub use user_instructions::UserInstructions;
|
||||
@@ -3,20 +3,21 @@ use serde::Serialize;
|
||||
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
use crate::contextual_user_message::AGENTS_MD_FRAGMENT;
|
||||
use crate::contextual_user_message::SKILL_FRAGMENT;
|
||||
use crate::fragment::AGENTS_MD_FRAGMENT;
|
||||
use crate::fragment::AGENTS_MD_START_MARKER;
|
||||
use crate::fragment::SKILL_FRAGMENT;
|
||||
|
||||
pub const USER_INSTRUCTIONS_PREFIX: &str = "# AGENTS.md instructions for ";
|
||||
pub const USER_INSTRUCTIONS_PREFIX: &str = AGENTS_MD_START_MARKER;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(rename = "user_instructions", rename_all = "snake_case")]
|
||||
pub(crate) struct UserInstructions {
|
||||
pub struct UserInstructions {
|
||||
pub directory: String,
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
impl UserInstructions {
|
||||
pub(crate) fn serialize_to_text(&self) -> String {
|
||||
pub fn serialize_to_text(&self) -> String {
|
||||
format!(
|
||||
"{prefix}{directory}\n\n<INSTRUCTIONS>\n{contents}\n{suffix}",
|
||||
prefix = AGENTS_MD_FRAGMENT.start_marker(),
|
||||
@@ -35,14 +36,12 @@ impl From<UserInstructions> for ResponseItem {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(rename = "skill_instructions", rename_all = "snake_case")]
|
||||
pub(crate) struct SkillInstructions {
|
||||
pub struct SkillInstructions {
|
||||
pub name: String,
|
||||
pub path: String,
|
||||
pub contents: String,
|
||||
}
|
||||
|
||||
impl SkillInstructions {}
|
||||
|
||||
impl From<SkillInstructions> for ResponseItem {
|
||||
fn from(si: SkillInstructions) -> Self {
|
||||
SKILL_FRAGMENT.into_message(SKILL_FRAGMENT.wrap(format!(
|
||||
@@ -1,7 +1,11 @@
|
||||
use super::*;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use crate::fragment::AGENTS_MD_FRAGMENT;
|
||||
use crate::fragment::SKILL_FRAGMENT;
|
||||
|
||||
#[test]
|
||||
fn test_user_instructions() {
|
||||
let user_instructions = UserInstructions {
|
||||
@@ -1,6 +1,5 @@
|
||||
pub const TOOL_CALL_COUNT_METRIC: &str = "codex.tool.call";
|
||||
pub const TOOL_CALL_DURATION_METRIC: &str = "codex.tool.call.duration_ms";
|
||||
pub const TOOL_CALL_UNIFIED_EXEC_METRIC: &str = "codex.tool.unified_exec";
|
||||
pub const API_CALL_COUNT_METRIC: &str = "codex.api_request";
|
||||
pub const API_CALL_DURATION_METRIC: &str = "codex.api_request.duration_ms";
|
||||
pub const SSE_EVENT_COUNT_METRIC: &str = "codex.sse_event";
|
||||
|
||||
@@ -26,15 +26,6 @@ python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py <plugin-nam
|
||||
python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py my-plugin --with-marketplace
|
||||
```
|
||||
|
||||
For a home-local plugin, treat `<home>` as the root and use:
|
||||
|
||||
```bash
|
||||
python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py my-plugin \
|
||||
--path ~/plugins \
|
||||
--marketplace-path ~/.agents/plugins/marketplace.json \
|
||||
--with-marketplace
|
||||
```
|
||||
|
||||
4. Generate/adjust optional companion folders as needed:
|
||||
|
||||
```bash
|
||||
@@ -46,7 +37,6 @@ python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py my-plugin -
|
||||
|
||||
## What this skill creates
|
||||
|
||||
- If the user has not made the plugin location explicit, ask whether they want a repo-local plugin or a home-local plugin before generating marketplace entries.
|
||||
- Creates plugin root at `/<parent-plugin-directory>/<plugin-name>/`.
|
||||
- Always creates `/<parent-plugin-directory>/<plugin-name>/.codex-plugin/plugin.json`.
|
||||
- Fills the manifest with the full schema shape, placeholder values, and the complete `interface` section.
|
||||
@@ -68,8 +58,6 @@ python3 .agents/skills/plugin-creator/scripts/create_basic_plugin.py my-plugin -
|
||||
## Marketplace workflow
|
||||
|
||||
- `marketplace.json` always lives at `<repo-root>/.agents/plugins/marketplace.json`.
|
||||
- For a home-local plugin, use the same convention with `<home>` as the root:
|
||||
`~/.agents/plugins/marketplace.json` plus `./plugins/<plugin-name>`.
|
||||
- Marketplace root metadata supports top-level `name` plus optional `interface.displayName`.
|
||||
- Treat plugin order in `plugins[]` as render order in Codex. Append new entries unless a user explicitly asks to reorder the list.
|
||||
- `displayName` belongs inside the marketplace `interface` object, not individual `plugins[]` entries.
|
||||
|
||||
@@ -115,10 +115,8 @@
|
||||
"source": "local",
|
||||
"path": "./plugins/linear"
|
||||
},
|
||||
"policy": {
|
||||
"installation": "AVAILABLE",
|
||||
"authentication": "ON_INSTALL"
|
||||
},
|
||||
"installPolicy": "AVAILABLE",
|
||||
"authPolicy": "ON_INSTALL",
|
||||
"category": "Productivity"
|
||||
}
|
||||
]
|
||||
@@ -144,9 +142,7 @@
|
||||
- `source` (`string`): Use `local` for this repo workflow.
|
||||
- `path` (`string`): Relative plugin path based on the marketplace root.
|
||||
- Repo plugin: `./plugins/<plugin-name>`
|
||||
- Local plugin in `~/.agents/plugins/marketplace.json`: `./plugins/<plugin-name>`
|
||||
- The same relative path convention is used for both repo-rooted and home-rooted marketplaces.
|
||||
- Example: with `~/.agents/plugins/marketplace.json`, `./plugins/<plugin-name>` resolves to `~/plugins/<plugin-name>`.
|
||||
- Local plugin in `~/.agents/plugins/marketplace.json`: `./.codex/plugins/<plugin-name>`
|
||||
- `policy` (`object`): Marketplace policy block. Always include it.
|
||||
- `installation` (`string`): Availability policy.
|
||||
- Allowed values: `NOT_AVAILABLE`, `AVAILABLE`, `INSTALLED_BY_DEFAULT`
|
||||
|
||||
@@ -191,10 +191,7 @@ def parse_args() -> argparse.Namespace:
|
||||
parser.add_argument(
|
||||
"--path",
|
||||
default=str(DEFAULT_PLUGIN_PARENT),
|
||||
help=(
|
||||
"Parent directory for plugin creation (defaults to <cwd>/plugins). "
|
||||
"When using a home-rooted marketplace, use <home>/plugins."
|
||||
),
|
||||
help="Parent directory for plugin creation (defaults to <cwd>/plugins)",
|
||||
)
|
||||
parser.add_argument("--with-skills", action="store_true", help="Create skills/ directory")
|
||||
parser.add_argument("--with-hooks", action="store_true", help="Create hooks/ directory")
|
||||
@@ -205,19 +202,12 @@ def parse_args() -> argparse.Namespace:
|
||||
parser.add_argument(
|
||||
"--with-marketplace",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Create or update <cwd>/.agents/plugins/marketplace.json. "
|
||||
"Marketplace entries always point to ./plugins/<plugin-name> relative to the "
|
||||
"marketplace root."
|
||||
),
|
||||
help="Create or update <cwd>/.agents/plugins/marketplace.json",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--marketplace-path",
|
||||
default=str(DEFAULT_MARKETPLACE_PATH),
|
||||
help=(
|
||||
"Path to marketplace.json (defaults to <cwd>/.agents/plugins/marketplace.json). "
|
||||
"For a home-rooted marketplace, use <home>/.agents/plugins/marketplace.json."
|
||||
),
|
||||
help="Path to marketplace.json (defaults to <cwd>/.agents/plugins/marketplace.json)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--install-policy",
|
||||
|
||||
6
codex-rs/utils/plugins/BUILD.bazel
Normal file
6
codex-rs/utils/plugins/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "plugins",
|
||||
crate_name = "codex_utils_plugins",
|
||||
)
|
||||
21
codex-rs/utils/plugins/Cargo.toml
Normal file
21
codex-rs/utils/plugins/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "codex-utils-plugins"
|
||||
version.workspace = true
|
||||
|
||||
[lib]
|
||||
doctest = false
|
||||
name = "codex_utils_plugins"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = { workspace = true }
|
||||
7
codex-rs/utils/plugins/src/lib.rs
Normal file
7
codex-rs/utils/plugins/src/lib.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
//! Plugin path resolution and plaintext mention sigils shared across Codex crates.
|
||||
|
||||
pub mod mention_syntax;
|
||||
pub mod plugin_namespace;
|
||||
|
||||
pub use plugin_namespace::PLUGIN_MANIFEST_PATH;
|
||||
pub use plugin_namespace::plugin_namespace_for_skill_path;
|
||||
7
codex-rs/utils/plugins/src/mention_syntax.rs
Normal file
7
codex-rs/utils/plugins/src/mention_syntax.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
//! Sigils for tool/plugin mentions in plaintext (shared across Codex crates).
|
||||
|
||||
/// Default plaintext sigil for tools.
|
||||
pub const TOOL_MENTION_SIGIL: char = '$';
|
||||
|
||||
/// Plugins use `@` in linked plaintext outside TUI.
|
||||
pub const PLUGIN_TEXT_MENTION_SIGIL: char = '@';
|
||||
70
codex-rs/utils/plugins/src/plugin_namespace.rs
Normal file
70
codex-rs/utils/plugins/src/plugin_namespace.rs
Normal file
@@ -0,0 +1,70 @@
|
||||
//! Resolve plugin namespace from skill file paths by walking ancestors for `plugin.json`.
|
||||
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
|
||||
/// Relative path from a plugin root to its manifest file.
|
||||
pub const PLUGIN_MANIFEST_PATH: &str = ".codex-plugin/plugin.json";
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct RawPluginManifestName {
|
||||
#[serde(default)]
|
||||
name: String,
|
||||
}
|
||||
|
||||
fn plugin_manifest_name(plugin_root: &Path) -> Option<String> {
|
||||
let manifest_path = plugin_root.join(PLUGIN_MANIFEST_PATH);
|
||||
if !manifest_path.is_file() {
|
||||
return None;
|
||||
}
|
||||
let contents = fs::read_to_string(&manifest_path).ok()?;
|
||||
let RawPluginManifestName { name: raw_name } = serde_json::from_str(&contents).ok()?;
|
||||
Some(
|
||||
plugin_root
|
||||
.file_name()
|
||||
.and_then(|entry| entry.to_str())
|
||||
.filter(|_| raw_name.trim().is_empty())
|
||||
.unwrap_or(raw_name.as_str())
|
||||
.to_string(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the plugin manifest `name` for the nearest ancestor of `path` that contains a valid
|
||||
/// plugin manifest (same `name` rules as full manifest loading in codex-core).
|
||||
pub fn plugin_namespace_for_skill_path(path: &Path) -> Option<String> {
|
||||
for ancestor in path.ancestors() {
|
||||
if let Some(name) = plugin_manifest_name(ancestor) {
|
||||
return Some(name);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::plugin_namespace_for_skill_path;
|
||||
use std::fs;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[test]
|
||||
fn uses_manifest_name() {
|
||||
let tmp = tempdir().expect("tempdir");
|
||||
let plugin_root = tmp.path().join("plugins/sample");
|
||||
let skill_path = plugin_root.join("skills/search/SKILL.md");
|
||||
|
||||
fs::create_dir_all(skill_path.parent().expect("parent")).expect("mkdir");
|
||||
fs::create_dir_all(plugin_root.join(".codex-plugin")).expect("mkdir manifest");
|
||||
fs::write(
|
||||
plugin_root.join(".codex-plugin/plugin.json"),
|
||||
r#"{"name":"sample"}"#,
|
||||
)
|
||||
.expect("write manifest");
|
||||
fs::write(&skill_path, "---\ndescription: search\n---\n").expect("write skill");
|
||||
|
||||
assert_eq!(
|
||||
plugin_namespace_for_skill_path(&skill_path),
|
||||
Some("sample".to_string())
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user