Compare commits

..

1 Commits

Author SHA1 Message Date
Ahmed Ibrahim
5eb7e86114 Unify realtime shutdown in core
Co-authored-by: Codex <noreply@openai.com>
2026-03-17 00:11:28 -07:00
17 changed files with 435 additions and 451 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -2500,6 +2500,7 @@ dependencies = [
"chrono",
"clap",
"codex-ansi-escape",
"codex-app-server-client",
"codex-app-server-protocol",
"codex-arg0",
"codex-backend-client",

View File

@@ -38,7 +38,6 @@ where
let network_sandbox_policy = NetworkSandboxPolicy::from(sandbox_policy);
let args = create_linux_sandbox_command_args_for_policies(
command,
command_cwd.as_path(),
sandbox_policy,
&file_system_sandbox_policy,
network_sandbox_policy,
@@ -77,7 +76,6 @@ pub(crate) fn allow_network_for_proxy(enforce_managed_network: bool) -> bool {
#[allow(clippy::too_many_arguments)]
pub(crate) fn create_linux_sandbox_command_args_for_policies(
command: Vec<String>,
command_cwd: &Path,
sandbox_policy: &SandboxPolicy,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
network_sandbox_policy: NetworkSandboxPolicy,
@@ -95,16 +93,10 @@ pub(crate) fn create_linux_sandbox_command_args_for_policies(
.to_str()
.unwrap_or_else(|| panic!("cwd must be valid UTF-8"))
.to_string();
let command_cwd = command_cwd
.to_str()
.unwrap_or_else(|| panic!("command cwd must be valid UTF-8"))
.to_string();
let mut linux_cmd: Vec<String> = vec![
"--sandbox-policy-cwd".to_string(),
sandbox_policy_cwd,
"--command-cwd".to_string(),
command_cwd,
"--sandbox-policy".to_string(),
sandbox_policy_json,
"--file-system-sandbox-policy".to_string(),
@@ -128,26 +120,16 @@ pub(crate) fn create_linux_sandbox_command_args_for_policies(
#[cfg(test)]
pub(crate) fn create_linux_sandbox_command_args(
command: Vec<String>,
command_cwd: &Path,
sandbox_policy_cwd: &Path,
use_legacy_landlock: bool,
allow_network_for_proxy: bool,
) -> Vec<String> {
let command_cwd = command_cwd
.to_str()
.unwrap_or_else(|| panic!("command cwd must be valid UTF-8"))
.to_string();
let sandbox_policy_cwd = sandbox_policy_cwd
.to_str()
.unwrap_or_else(|| panic!("cwd must be valid UTF-8"))
.to_string();
let mut linux_cmd: Vec<String> = vec![
"--sandbox-policy-cwd".to_string(),
sandbox_policy_cwd,
"--command-cwd".to_string(),
command_cwd,
];
let mut linux_cmd: Vec<String> = vec!["--sandbox-policy-cwd".to_string(), sandbox_policy_cwd];
if use_legacy_landlock {
linux_cmd.push("--use-legacy-landlock".to_string());
}

View File

@@ -4,17 +4,15 @@ use pretty_assertions::assert_eq;
#[test]
fn legacy_landlock_flag_is_included_when_requested() {
let command = vec!["/bin/true".to_string()];
let command_cwd = Path::new("/tmp/link");
let cwd = Path::new("/tmp");
let default_bwrap =
create_linux_sandbox_command_args(command.clone(), command_cwd, cwd, false, false);
let default_bwrap = create_linux_sandbox_command_args(command.clone(), cwd, false, false);
assert_eq!(
default_bwrap.contains(&"--use-legacy-landlock".to_string()),
false
);
let legacy_landlock = create_linux_sandbox_command_args(command, command_cwd, cwd, true, false);
let legacy_landlock = create_linux_sandbox_command_args(command, cwd, true, false);
assert_eq!(
legacy_landlock.contains(&"--use-legacy-landlock".to_string()),
true
@@ -24,10 +22,9 @@ fn legacy_landlock_flag_is_included_when_requested() {
#[test]
fn proxy_flag_is_included_when_requested() {
let command = vec!["/bin/true".to_string()];
let command_cwd = Path::new("/tmp/link");
let cwd = Path::new("/tmp");
let args = create_linux_sandbox_command_args(command, command_cwd, cwd, true, true);
let args = create_linux_sandbox_command_args(command, cwd, true, true);
assert_eq!(
args.contains(&"--allow-network-for-proxy".to_string()),
true
@@ -37,7 +34,6 @@ fn proxy_flag_is_included_when_requested() {
#[test]
fn split_policy_flags_are_included() {
let command = vec!["/bin/true".to_string()];
let command_cwd = Path::new("/tmp/link");
let cwd = Path::new("/tmp");
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let file_system_sandbox_policy = FileSystemSandboxPolicy::from(&sandbox_policy);
@@ -45,7 +41,6 @@ fn split_policy_flags_are_included() {
let args = create_linux_sandbox_command_args_for_policies(
command,
command_cwd,
&sandbox_policy,
&file_system_sandbox_policy,
network_sandbox_policy,
@@ -64,11 +59,6 @@ fn split_policy_flags_are_included() {
.any(|window| window[0] == "--network-sandbox-policy" && window[1] == "\"restricted\""),
true
);
assert_eq!(
args.windows(2)
.any(|window| window[0] == "--command-cwd" && window[1] == "/tmp/link"),
true
);
}
#[test]

View File

@@ -56,6 +56,19 @@ const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000;
const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str =
"Conversation already has an active response in progress:";
#[derive(Debug)]
enum RealtimeConversationEnd {
Requested,
TransportClosed,
Error(RealtimeConversationError),
}
#[derive(Debug)]
enum RealtimeConversationError {
Emit(String),
AlreadySent,
}
pub(crate) struct RealtimeConversationManager {
state: Mutex<Option<ConversationState>>,
}
@@ -344,6 +357,23 @@ pub(crate) async fn handle_start(
sess: &Arc<Session>,
sub_id: String,
params: ConversationStartParams,
) -> CodexResult<()> {
if let Err(err) = handle_start_inner(sess, &sub_id, params).await {
error!("failed to start realtime conversation: {err}");
end_realtime_conversation(
sess,
sub_id,
RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())),
)
.await;
}
Ok(())
}
async fn handle_start_inner(
sess: &Arc<Session>,
sub_id: &str,
params: ConversationStartParams,
) -> CodexResult<()> {
let provider = sess.provider().await;
let auth = sess.services.auth_manager.auth().await;
@@ -392,23 +422,15 @@ pub(crate) async fn handle_start(
let extra_headers =
realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?;
info!("starting realtime conversation");
let (events_rx, realtime_active) = match sess
let (events_rx, realtime_active) = sess
.conversation
.start(api_provider, extra_headers, session_config)
.await
{
Ok(events_rx) => events_rx,
Err(err) => {
error!("failed to start realtime conversation: {err}");
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
return Ok(());
}
};
.await?;
info!("realtime conversation started");
sess.send_event_raw(Event {
id: sub_id.clone(),
id: sub_id.to_string(),
msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent {
session_id: requested_session_id,
}),
@@ -416,11 +438,13 @@ pub(crate) async fn handle_start(
.await;
let sess_clone = Arc::clone(sess);
let sub_id = sub_id.to_string();
tokio::spawn(async move {
let ev = |msg| Event {
id: sub_id.clone(),
msg,
};
let mut end = RealtimeConversationEnd::TransportClosed;
while let Ok(event) = events_rx.recv().await {
// if not audio out, log the event
if !matches!(event, RealtimeEvent::AudioOut(_)) {
@@ -429,6 +453,9 @@ pub(crate) async fn handle_start(
"received realtime conversation event"
);
}
if matches!(event, RealtimeEvent::Error(_)) {
end = RealtimeConversationEnd::Error(RealtimeConversationError::AlreadySent);
}
let maybe_routed_text = match &event {
RealtimeEvent::HandoffRequested(handoff) => {
realtime_text_from_handoff_request(handoff)
@@ -449,14 +476,10 @@ pub(crate) async fn handle_start(
.await;
}
if realtime_active.swap(false, Ordering::Relaxed) {
info!("realtime conversation transport closed");
sess_clone
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(
RealtimeConversationClosedEvent {
reason: Some("transport_closed".to_string()),
},
)))
.await;
if matches!(end, RealtimeConversationEnd::TransportClosed) {
info!("realtime conversation transport closed");
}
end_realtime_conversation(&sess_clone, sub_id, end).await;
}
});
@@ -470,7 +493,17 @@ pub(crate) async fn handle_audio(
) {
if let Err(err) = sess.conversation.audio_in(params.frame).await {
error!("failed to append realtime audio: {err}");
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
if sess.conversation.running_state().await.is_some() {
end_realtime_conversation(
sess,
sub_id,
RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())),
)
.await;
} else {
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest)
.await;
}
}
}
@@ -545,25 +578,22 @@ pub(crate) async fn handle_text(
debug!(text = %params.text, "[realtime-text] appending realtime conversation text input");
if let Err(err) = sess.conversation.text_in(params.text).await {
error!("failed to append realtime text: {err}");
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
if sess.conversation.running_state().await.is_some() {
end_realtime_conversation(
sess,
sub_id,
RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())),
)
.await;
} else {
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest)
.await;
}
}
}
pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
match sess.conversation.shutdown().await {
Ok(()) => {
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent {
reason: Some("requested".to_string()),
}),
})
.await;
}
Err(err) => {
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
}
}
end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Requested).await;
}
fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
@@ -771,11 +801,6 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
}
}
Ok(None) => {
let _ = events_tx
.send(RealtimeEvent::Error(
"realtime websocket connection is closed".to_string(),
))
.await;
break;
}
Err(err) => {
@@ -868,6 +893,36 @@ async fn send_conversation_error(
.await;
}
async fn end_realtime_conversation(
sess: &Arc<Session>,
sub_id: String,
end: RealtimeConversationEnd,
) {
let _ = sess.conversation.shutdown().await;
if let RealtimeConversationEnd::Error(RealtimeConversationError::Emit(message)) = &end {
sess.send_event_raw(Event {
id: sub_id.clone(),
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message.clone()),
}),
})
.await;
}
let reason = match end {
RealtimeConversationEnd::Requested => Some("requested".to_string()),
RealtimeConversationEnd::TransportClosed => Some("transport_closed".to_string()),
RealtimeConversationEnd::Error(_) => Some("error".to_string()),
};
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent { reason }),
})
.await;
}
#[cfg(test)]
#[path = "realtime_conversation_tests.rs"]
mod tests;

View File

@@ -672,7 +672,6 @@ impl SandboxManager {
let allow_proxy_network = allow_network_for_proxy(enforce_managed_network);
let mut args = create_linux_sandbox_command_args_for_policies(
command.clone(),
spec.cwd.as_path(),
&effective_policy,
&effective_file_system_policy,
effective_network_policy,

View File

@@ -35,15 +35,8 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use dunce::canonicalize;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
#[cfg(unix)]
use std::path::Path;
use tempfile::TempDir;
#[cfg(unix)]
fn symlink_dir(original: &Path, link: &Path) -> std::io::Result<()> {
std::os::unix::fs::symlink(original, link)
}
#[test]
fn danger_full_access_defaults_to_no_sandbox_without_network_requirements() {
let manager = SandboxManager::new();
@@ -224,41 +217,6 @@ fn normalize_additional_permissions_preserves_network() {
);
}
#[cfg(unix)]
#[test]
fn normalize_additional_permissions_canonicalizes_symlinked_write_paths() {
let temp_dir = TempDir::new().expect("create temp dir");
let real_root = temp_dir.path().join("real");
let link_root = temp_dir.path().join("link");
let write_dir = real_root.join("write");
std::fs::create_dir_all(&write_dir).expect("create write dir");
symlink_dir(&real_root, &link_root).expect("create symlinked root");
let link_write_dir =
AbsolutePathBuf::from_absolute_path(link_root.join("write")).expect("link write dir");
let expected_write_dir = AbsolutePathBuf::from_absolute_path(
write_dir.canonicalize().expect("canonicalize write dir"),
)
.expect("absolute canonical write dir");
let permissions = normalize_additional_permissions(PermissionProfile {
file_system: Some(FileSystemPermissions {
read: Some(vec![]),
write: Some(vec![link_write_dir]),
}),
..Default::default()
})
.expect("permissions");
assert_eq!(
permissions.file_system,
Some(FileSystemPermissions {
read: Some(vec![]),
write: Some(vec![expected_write_dir]),
})
);
}
#[test]
fn normalize_additional_permissions_drops_empty_nested_profiles() {
let permissions = normalize_additional_permissions(PermissionProfile {

View File

@@ -12,6 +12,7 @@ use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeConversationClosedEvent;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::SessionSource;
@@ -381,6 +382,15 @@ impl EnvGuard {
}
Self { key, original }
}
fn unset(key: &'static str) -> Self {
let original = std::env::var_os(key);
// SAFETY: this guard restores the original value before the test exits.
unsafe {
std::env::remove_var(key);
}
Self { key, original }
}
}
impl Drop for EnvGuard {
@@ -427,6 +437,48 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial(openai_api_key_env)]
async fn conversation_start_failure_emits_realtime_error_and_closed() -> Result<()> {
skip_if_no_network!(Ok(()));
let _env_guard = EnvGuard::unset(OPENAI_API_KEY_ENV_VAR);
let server = start_websocket_server(vec![]).await;
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let test = builder.build_with_websocket_server(&server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let err = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message),
}) => Some(message.clone()),
_ => None,
})
.await;
assert_eq!(err, "realtime conversation requires API key auth");
let closed = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
_ => None,
})
.await;
assert_eq!(
closed,
RealtimeConversationClosedEvent {
reason: Some("error".to_string()),
}
);
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_text_before_start_emits_error() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -16,8 +16,10 @@ use std::os::fd::AsRawFd;
use std::path::Path;
use std::path::PathBuf;
use codex_core::error::CodexErr;
use codex_core::error::Result;
use codex_protocol::protocol::FileSystemSandboxPolicy;
use codex_protocol::protocol::WritableRoot;
use codex_utils_absolute_path::AbsolutePathBuf;
/// Linux "platform defaults" that keep common system binaries and dynamic
@@ -39,10 +41,10 @@ const LINUX_PLATFORM_DEFAULT_READ_ROOTS: &[&str] = &[
/// Options that control how bubblewrap is invoked.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BwrapOptions {
/// Whether to mount a fresh `/proc` inside the sandbox.
/// Whether to mount a fresh `/proc` inside the PID namespace.
///
/// This is the secure default, but some restrictive container environments
/// deny `--proc /proc`.
/// deny `--proc /proc` even when PID namespaces are available.
pub mount_proc: bool,
/// How networking should be configured inside the bubblewrap sandbox.
pub network_mode: BwrapNetworkMode,
@@ -94,8 +96,7 @@ pub(crate) struct BwrapArgs {
pub(crate) fn create_bwrap_command_args(
command: Vec<String>,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
sandbox_policy_cwd: &Path,
command_cwd: &Path,
cwd: &Path,
options: BwrapOptions,
) -> Result<BwrapArgs> {
if file_system_sandbox_policy.has_full_disk_write_access() {
@@ -109,13 +110,7 @@ pub(crate) fn create_bwrap_command_args(
};
}
create_bwrap_flags(
command,
file_system_sandbox_policy,
sandbox_policy_cwd,
command_cwd,
options,
)
create_bwrap_flags(command, file_system_sandbox_policy, cwd, options)
}
fn create_bwrap_flags_full_filesystem(command: Vec<String>, options: BwrapOptions) -> BwrapArgs {
@@ -149,15 +144,13 @@ fn create_bwrap_flags_full_filesystem(command: Vec<String>, options: BwrapOption
fn create_bwrap_flags(
command: Vec<String>,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
sandbox_policy_cwd: &Path,
command_cwd: &Path,
cwd: &Path,
options: BwrapOptions,
) -> Result<BwrapArgs> {
let BwrapArgs {
args: filesystem_args,
preserved_files,
} = create_filesystem_args(file_system_sandbox_policy, sandbox_policy_cwd)?;
let normalized_command_cwd = normalize_command_cwd_for_bwrap(command_cwd);
} = create_filesystem_args(file_system_sandbox_policy, cwd)?;
let mut args = Vec::new();
args.push("--new-session".to_string());
args.push("--die-with-parent".to_string());
@@ -165,6 +158,7 @@ fn create_bwrap_flags(
// Request a user namespace explicitly rather than relying on bubblewrap's
// auto-enable behavior, which is skipped when the caller runs as uid 0.
args.push("--unshare-user".to_string());
// Isolate the PID namespace.
args.push("--unshare-pid".to_string());
if options.network_mode.should_unshare_network() {
args.push("--unshare-net".to_string());
@@ -174,14 +168,6 @@ fn create_bwrap_flags(
args.push("--proc".to_string());
args.push("/proc".to_string());
}
if normalized_command_cwd.as_path() != command_cwd {
// Bubblewrap otherwise inherits the helper's logical cwd, which can be
// a symlink alias that disappears once the sandbox only mounts
// canonical roots. Enter the canonical command cwd explicitly so
// relative paths stay aligned with the mounted filesystem view.
args.push("--chdir".to_string());
args.push(path_to_string(normalized_command_cwd.as_path()));
}
args.push("--".to_string());
args.extend(command);
Ok(BwrapArgs {
@@ -210,15 +196,9 @@ fn create_filesystem_args(
file_system_sandbox_policy: &FileSystemSandboxPolicy,
cwd: &Path,
) -> Result<BwrapArgs> {
// Bubblewrap requires bind mount targets to exist. Skip missing writable
// roots so mixed-platform configs can keep harmless paths for other
// environments without breaking Linux command startup.
let writable_roots = file_system_sandbox_policy
.get_writable_roots_with_cwd(cwd)
.into_iter()
.filter(|writable_root| writable_root.root.as_path().exists())
.collect::<Vec<_>>();
let writable_roots = file_system_sandbox_policy.get_writable_roots_with_cwd(cwd);
let unreadable_roots = file_system_sandbox_policy.get_unreadable_roots_with_cwd(cwd);
ensure_mount_targets_exist(&writable_roots)?;
let mut args = if file_system_sandbox_policy.has_full_disk_read_access() {
// Read-only root, then mount a minimal device tree.
@@ -388,6 +368,23 @@ fn create_filesystem_args(
})
}
/// Validate that writable roots exist before constructing mounts.
///
/// Bubblewrap requires bind mount targets to exist. We fail fast with a clear
/// error so callers can present an actionable message.
fn ensure_mount_targets_exist(writable_roots: &[WritableRoot]) -> Result<()> {
for writable_root in writable_roots {
let root = writable_root.root.as_path();
if !root.exists() {
return Err(CodexErr::UnsupportedOperation(format!(
"Sandbox expected writable root {root}, but it does not exist.",
root = root.display()
)));
}
}
Ok(())
}
fn path_to_string(path: &Path) -> String {
path.to_string_lossy().to_string()
}
@@ -396,12 +393,6 @@ fn path_depth(path: &Path) -> usize {
path.components().count()
}
fn normalize_command_cwd_for_bwrap(command_cwd: &Path) -> PathBuf {
command_cwd
.canonicalize()
.unwrap_or_else(|_| command_cwd.to_path_buf())
}
fn append_mount_target_parent_dir_args(args: &mut Vec<String>, mount_target: &Path, anchor: &Path) {
let mount_target_dir = if mount_target.is_dir() {
mount_target
@@ -616,7 +607,6 @@ mod tests {
command.clone(),
&FileSystemSandboxPolicy::from(&SandboxPolicy::DangerFullAccess),
Path::new("/"),
Path::new("/"),
BwrapOptions {
mount_proc: true,
network_mode: BwrapNetworkMode::FullAccess,
@@ -634,7 +624,6 @@ mod tests {
command,
&FileSystemSandboxPolicy::from(&SandboxPolicy::DangerFullAccess),
Path::new("/"),
Path::new("/"),
BwrapOptions {
mount_proc: true,
network_mode: BwrapNetworkMode::ProxyOnly,
@@ -661,97 +650,6 @@ mod tests {
);
}
#[cfg(unix)]
#[test]
fn restricted_policy_chdirs_to_canonical_command_cwd() {
let temp_dir = TempDir::new().expect("temp dir");
let real_root = temp_dir.path().join("real");
let real_subdir = real_root.join("subdir");
let link_root = temp_dir.path().join("link");
std::fs::create_dir_all(&real_subdir).expect("create real subdir");
std::os::unix::fs::symlink(&real_root, &link_root).expect("create symlinked root");
let sandbox_policy_cwd = AbsolutePathBuf::from_absolute_path(&link_root)
.expect("absolute symlinked root")
.to_path_buf();
let command_cwd = link_root.join("subdir");
let canonical_command_cwd = real_subdir
.canonicalize()
.expect("canonicalize command cwd");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::Minimal,
},
access: FileSystemAccessMode::Read,
},
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::CurrentWorkingDirectory,
},
access: FileSystemAccessMode::Write,
},
]);
let args = create_bwrap_command_args(
vec!["/bin/true".to_string()],
&policy,
sandbox_policy_cwd.as_path(),
&command_cwd,
BwrapOptions::default(),
)
.expect("create bwrap args");
let canonical_command_cwd = path_to_string(&canonical_command_cwd);
let link_command_cwd = path_to_string(&command_cwd);
assert!(
args.args
.windows(2)
.any(|window| { window == ["--chdir", canonical_command_cwd.as_str()] })
);
assert!(
!args
.args
.windows(2)
.any(|window| { window == ["--chdir", link_command_cwd.as_str()] })
);
}
#[test]
fn ignores_missing_writable_roots() {
let temp_dir = TempDir::new().expect("temp dir");
let existing_root = temp_dir.path().join("existing");
let missing_root = temp_dir.path().join("missing");
std::fs::create_dir(&existing_root).expect("create existing root");
let policy = SandboxPolicy::WorkspaceWrite {
writable_roots: vec![
AbsolutePathBuf::try_from(existing_root.as_path()).expect("absolute existing root"),
AbsolutePathBuf::try_from(missing_root.as_path()).expect("absolute missing root"),
],
read_only_access: Default::default(),
network_access: false,
exclude_tmpdir_env_var: true,
exclude_slash_tmp: true,
};
let args = create_filesystem_args(&FileSystemSandboxPolicy::from(&policy), temp_dir.path())
.expect("filesystem args");
let existing_root = path_to_string(&existing_root);
let missing_root = path_to_string(&missing_root);
assert!(
args.args.windows(3).any(|window| {
window == ["--bind", existing_root.as_str(), existing_root.as_str()]
}),
"existing writable root should be rebound writable",
);
assert!(
!args.args.iter().any(|arg| arg == &missing_root),
"missing writable root should be skipped",
);
}
#[test]
fn mounts_dev_before_writable_dev_binds() {
let sandbox_policy = SandboxPolicy::WorkspaceWrite {

View File

@@ -31,15 +31,6 @@ pub struct LandlockCommand {
#[arg(long = "sandbox-policy-cwd")]
pub sandbox_policy_cwd: PathBuf,
/// The logical working directory for the command being sandboxed.
///
/// This can intentionally differ from `sandbox_policy_cwd` when the
/// command runs from a symlinked alias of the policy workspace. Keep it
/// explicit so bubblewrap can preserve the caller's logical cwd when that
/// alias would otherwise disappear inside the sandbox namespace.
#[arg(long = "command-cwd", hide = true)]
pub command_cwd: Option<PathBuf>,
/// Legacy compatibility policy.
///
/// Newer callers pass split filesystem/network policies as well so the
@@ -100,7 +91,6 @@ pub struct LandlockCommand {
pub fn run_main() -> ! {
let LandlockCommand {
sandbox_policy_cwd,
command_cwd,
sandbox_policy,
file_system_sandbox_policy,
network_sandbox_policy,
@@ -187,7 +177,6 @@ pub fn run_main() -> ! {
};
let inner = build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: &sandbox_policy_cwd,
command_cwd: command_cwd.as_deref(),
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &file_system_sandbox_policy,
network_sandbox_policy,
@@ -197,7 +186,6 @@ pub fn run_main() -> ! {
});
run_bwrap_with_proc_fallback(
&sandbox_policy_cwd,
command_cwd.as_deref(),
&file_system_sandbox_policy,
network_sandbox_policy,
inner,
@@ -399,7 +387,6 @@ fn ensure_legacy_landlock_mode_supports_policy(
fn run_bwrap_with_proc_fallback(
sandbox_policy_cwd: &Path,
command_cwd: Option<&Path>,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
network_sandbox_policy: NetworkSandboxPolicy,
inner: Vec<String>,
@@ -408,12 +395,10 @@ fn run_bwrap_with_proc_fallback(
) -> ! {
let network_mode = bwrap_network_mode(network_sandbox_policy, allow_network_for_proxy);
let mut mount_proc = mount_proc;
let command_cwd = command_cwd.unwrap_or(sandbox_policy_cwd);
if mount_proc
&& !preflight_proc_mount_support(
sandbox_policy_cwd,
command_cwd,
file_system_sandbox_policy,
network_mode,
)
@@ -431,7 +416,6 @@ fn run_bwrap_with_proc_fallback(
inner,
file_system_sandbox_policy,
sandbox_policy_cwd,
command_cwd,
options,
);
exec_vendored_bwrap(bwrap_args.args, bwrap_args.preserved_files);
@@ -454,14 +438,12 @@ fn build_bwrap_argv(
inner: Vec<String>,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
sandbox_policy_cwd: &Path,
command_cwd: &Path,
options: BwrapOptions,
) -> crate::bwrap::BwrapArgs {
let mut bwrap_args = create_bwrap_command_args(
inner,
file_system_sandbox_policy,
sandbox_policy_cwd,
command_cwd,
options,
)
.unwrap_or_else(|err| panic!("error building bubblewrap command: {err:?}"));
@@ -486,23 +468,17 @@ fn build_bwrap_argv(
fn preflight_proc_mount_support(
sandbox_policy_cwd: &Path,
command_cwd: &Path,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
network_mode: BwrapNetworkMode,
) -> bool {
let preflight_argv = build_preflight_bwrap_argv(
sandbox_policy_cwd,
command_cwd,
file_system_sandbox_policy,
network_mode,
);
let preflight_argv =
build_preflight_bwrap_argv(sandbox_policy_cwd, file_system_sandbox_policy, network_mode);
let stderr = run_bwrap_in_child_capture_stderr(preflight_argv);
!is_proc_mount_failure(stderr.as_str())
}
fn build_preflight_bwrap_argv(
sandbox_policy_cwd: &Path,
command_cwd: &Path,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
network_mode: BwrapNetworkMode,
) -> crate::bwrap::BwrapArgs {
@@ -511,7 +487,6 @@ fn build_preflight_bwrap_argv(
preflight_command,
file_system_sandbox_policy,
sandbox_policy_cwd,
command_cwd,
BwrapOptions {
mount_proc: true,
network_mode,
@@ -616,7 +591,6 @@ fn is_proc_mount_failure(stderr: &str) -> bool {
struct InnerSeccompCommandArgs<'a> {
sandbox_policy_cwd: &'a Path,
command_cwd: Option<&'a Path>,
sandbox_policy: &'a SandboxPolicy,
file_system_sandbox_policy: &'a FileSystemSandboxPolicy,
network_sandbox_policy: NetworkSandboxPolicy,
@@ -629,7 +603,6 @@ struct InnerSeccompCommandArgs<'a> {
fn build_inner_seccomp_command(args: InnerSeccompCommandArgs<'_>) -> Vec<String> {
let InnerSeccompCommandArgs {
sandbox_policy_cwd,
command_cwd,
sandbox_policy,
file_system_sandbox_policy,
network_sandbox_policy,
@@ -658,12 +631,6 @@ fn build_inner_seccomp_command(args: InnerSeccompCommandArgs<'_>) -> Vec<String>
current_exe.to_string_lossy().to_string(),
"--sandbox-policy-cwd".to_string(),
sandbox_policy_cwd.to_string_lossy().to_string(),
];
if let Some(command_cwd) = command_cwd {
inner.push("--command-cwd".to_string());
inner.push(command_cwd.to_string_lossy().to_string());
}
inner.extend([
"--sandbox-policy".to_string(),
policy_json,
"--file-system-sandbox-policy".to_string(),
@@ -671,7 +638,7 @@ fn build_inner_seccomp_command(args: InnerSeccompCommandArgs<'_>) -> Vec<String>
"--network-sandbox-policy".to_string(),
network_policy_json,
"--apply-seccomp-then-exec".to_string(),
]);
];
if allow_network_for_proxy {
inner.push("--allow-network-for-proxy".to_string());
let proxy_route_spec = proxy_route_spec

View File

@@ -44,7 +44,6 @@ fn inserts_bwrap_argv0_before_command_separator() {
vec!["/bin/true".to_string()],
&FileSystemSandboxPolicy::from(&sandbox_policy),
Path::new("/"),
Path::new("/"),
BwrapOptions {
mount_proc: true,
network_mode: BwrapNetworkMode::FullAccess,
@@ -81,7 +80,6 @@ fn inserts_unshare_net_when_network_isolation_requested() {
vec!["/bin/true".to_string()],
&FileSystemSandboxPolicy::from(&sandbox_policy),
Path::new("/"),
Path::new("/"),
BwrapOptions {
mount_proc: true,
network_mode: BwrapNetworkMode::Isolated,
@@ -98,7 +96,6 @@ fn inserts_unshare_net_when_proxy_only_network_mode_requested() {
vec!["/bin/true".to_string()],
&FileSystemSandboxPolicy::from(&sandbox_policy),
Path::new("/"),
Path::new("/"),
BwrapOptions {
mount_proc: true,
network_mode: BwrapNetworkMode::ProxyOnly,
@@ -166,7 +163,6 @@ fn root_write_read_only_carveout_requires_direct_runtime_enforcement() {
fn managed_proxy_preflight_argv_is_wrapped_for_full_access_policy() {
let mode = bwrap_network_mode(NetworkSandboxPolicy::Enabled, true);
let argv = build_preflight_bwrap_argv(
Path::new("/"),
Path::new("/"),
&FileSystemSandboxPolicy::from(&SandboxPolicy::DangerFullAccess),
mode,
@@ -180,7 +176,6 @@ fn managed_proxy_inner_command_includes_route_spec() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let args = build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: Path::new("/tmp"),
command_cwd: Some(Path::new("/tmp/link")),
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &FileSystemSandboxPolicy::from(&sandbox_policy),
network_sandbox_policy: NetworkSandboxPolicy::Restricted,
@@ -198,7 +193,6 @@ fn inner_command_includes_split_policy_flags() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let args = build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: Path::new("/tmp"),
command_cwd: Some(Path::new("/tmp/link")),
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &FileSystemSandboxPolicy::from(&sandbox_policy),
network_sandbox_policy: NetworkSandboxPolicy::Restricted,
@@ -209,10 +203,6 @@ fn inner_command_includes_split_policy_flags() {
assert!(args.iter().any(|arg| arg == "--file-system-sandbox-policy"));
assert!(args.iter().any(|arg| arg == "--network-sandbox-policy"));
assert!(
args.windows(2)
.any(|window| { window == ["--command-cwd", "/tmp/link"] })
);
}
#[test]
@@ -220,7 +210,6 @@ fn non_managed_inner_command_omits_route_spec() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let args = build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: Path::new("/tmp"),
command_cwd: Some(Path::new("/tmp/link")),
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &FileSystemSandboxPolicy::from(&sandbox_policy),
network_sandbox_policy: NetworkSandboxPolicy::Restricted,
@@ -238,7 +227,6 @@ fn managed_proxy_inner_command_requires_route_spec() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: Path::new("/tmp"),
command_cwd: Some(Path::new("/tmp/link")),
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &FileSystemSandboxPolicy::from(&sandbox_policy),
network_sandbox_policy: NetworkSandboxPolicy::Restricted,

View File

@@ -310,32 +310,6 @@ async fn test_writable_root() {
.await;
}
#[tokio::test]
async fn sandbox_ignores_missing_writable_roots_under_bwrap() {
if should_skip_bwrap_tests().await {
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
return;
}
let tempdir = tempfile::tempdir().expect("tempdir");
let existing_root = tempdir.path().join("existing");
let missing_root = tempdir.path().join("missing");
std::fs::create_dir(&existing_root).expect("create existing root");
let output = run_cmd_result_with_writable_roots(
&["bash", "-lc", "printf sandbox-ok"],
&[existing_root, missing_root],
LONG_TIMEOUT_MS,
false,
true,
)
.await
.expect("sandboxed command should execute");
assert_eq!(output.exit_code, 0);
assert_eq!(output.stdout.text, "sandbox-ok");
}
#[tokio::test]
async fn test_no_new_privs_is_enabled() {
let output = run_cmd_output(

View File

@@ -1229,88 +1229,6 @@ mod tests {
);
}
#[cfg(unix)]
#[test]
fn current_working_directory_special_path_canonicalizes_symlinked_cwd() {
let cwd = TempDir::new().expect("tempdir");
let real_root = cwd.path().join("real");
let link_root = cwd.path().join("link");
let blocked = real_root.join("blocked");
let agents_dir = real_root.join(".agents");
let codex_dir = real_root.join(".codex");
fs::create_dir_all(&blocked).expect("create blocked");
fs::create_dir_all(&agents_dir).expect("create .agents");
fs::create_dir_all(&codex_dir).expect("create .codex");
symlink_dir(&real_root, &link_root).expect("create symlinked cwd");
let link_blocked =
AbsolutePathBuf::from_absolute_path(link_root.join("blocked")).expect("link blocked");
let expected_root = AbsolutePathBuf::from_absolute_path(
real_root.canonicalize().expect("canonicalize real root"),
)
.expect("absolute canonical root");
let expected_blocked = AbsolutePathBuf::from_absolute_path(
blocked.canonicalize().expect("canonicalize blocked"),
)
.expect("absolute canonical blocked");
let expected_agents = AbsolutePathBuf::from_absolute_path(
agents_dir.canonicalize().expect("canonicalize .agents"),
)
.expect("absolute canonical .agents");
let expected_codex = AbsolutePathBuf::from_absolute_path(
codex_dir.canonicalize().expect("canonicalize .codex"),
)
.expect("absolute canonical .codex");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::Minimal,
},
access: FileSystemAccessMode::Read,
},
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::CurrentWorkingDirectory,
},
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: link_blocked },
access: FileSystemAccessMode::None,
},
]);
assert_eq!(
policy.get_readable_roots_with_cwd(&link_root),
vec![expected_root.clone()]
);
assert_eq!(
policy.get_unreadable_roots_with_cwd(&link_root),
vec![expected_blocked.clone()]
);
let writable_roots = policy.get_writable_roots_with_cwd(&link_root);
assert_eq!(writable_roots.len(), 1);
assert_eq!(writable_roots[0].root, expected_root);
assert!(
writable_roots[0]
.read_only_subpaths
.contains(&expected_blocked)
);
assert!(
writable_roots[0]
.read_only_subpaths
.contains(&expected_agents)
);
assert!(
writable_roots[0]
.read_only_subpaths
.contains(&expected_codex)
);
}
#[cfg(unix)]
#[test]
fn writable_roots_preserve_symlinked_protected_subpaths() {

View File

@@ -29,6 +29,7 @@ base64 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["derive"] }
codex-ansi-escape = { workspace = true }
codex-app-server-client = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-arg0 = { workspace = true }
codex-backend-client = { workspace = true }

View File

@@ -39,6 +39,7 @@ use crate::tui::TuiEvent;
use crate::update_action::UpdateAction;
use crate::version::CODEX_CLI_VERSION;
use codex_ansi_escape::ansi_escape_line;
use codex_app_server_client::InProcessAppServerClient;
use codex_app_server_protocol::ConfigLayerSource;
use codex_core::AuthManager;
use codex_core::CodexAuth;
@@ -52,7 +53,6 @@ use codex_core::config::types::ApprovalsReviewer;
use codex_core::config::types::ModelAvailabilityNuxConfig;
use codex_core::config_loader::ConfigLayerStackOrdering;
use codex_core::features::Feature;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use codex_core::models_manager::manager::RefreshStrategy;
use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG;
use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG;
@@ -113,6 +113,7 @@ use tokio::task::JoinHandle;
use toml::Value as TomlValue;
mod agent_navigation;
mod app_server_adapter;
mod pending_interactive_replay;
use self::agent_navigation::AgentNavigationDirection;
@@ -1947,7 +1948,7 @@ impl App {
#[allow(clippy::too_many_arguments)]
pub async fn run(
tui: &mut tui::Tui,
auth_manager: Arc<AuthManager>,
mut app_server: InProcessAppServerClient,
mut config: Config,
cli_kv_overrides: Vec<(String, TomlValue)>,
harness_overrides: ConfigOverrides,
@@ -1967,20 +1968,8 @@ impl App {
let harness_overrides =
normalize_harness_overrides_for_cwd(harness_overrides, &config.cwd)?;
let thread_manager = Arc::new(ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Cli,
CollaborationModesConfig {
default_mode_request_user_input: config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config);
let auth_manager = app_server.auth_manager();
let thread_manager = app_server.thread_manager();
let mut model = thread_manager
.get_models_manager()
.get_default_model(&config.model, RefreshStrategy::Offline)
@@ -1998,6 +1987,13 @@ impl App {
)
.await;
if let Some(exit_info) = exit_info {
app_server
.shutdown()
.await
.inspect_err(|err| {
tracing::warn!("app-server shutdown failed: {err}");
})
.ok();
return Ok(exit_info);
}
if let Some(updated_model) = config.model.clone() {
@@ -2229,6 +2225,7 @@ impl App {
let mut thread_created_rx = thread_manager.subscribe_thread_created();
let mut listen_for_threads = true;
let mut listen_for_app_server_events = true;
let mut waiting_for_initial_session_configured = wait_for_initial_session_configured;
#[cfg(not(debug_assertions))]
@@ -2288,6 +2285,16 @@ impl App {
Err(err) => break Err(err),
}
}
app_server_event = app_server.next_event(), if listen_for_app_server_events => {
match app_server_event {
Some(event) => app.handle_app_server_event(&app_server, event).await,
None => {
listen_for_app_server_events = false;
tracing::warn!("app-server event stream closed");
}
}
AppRunControl::Continue
}
// Listen on new thread creation due to collab tools.
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
@@ -2318,6 +2325,9 @@ impl App {
}
}
};
if let Err(err) = app_server.shutdown().await {
tracing::warn!(error = %err, "failed to shut down embedded app server");
}
let clear_result = tui.terminal.clear();
let exit_reason = match exit_reason_result {
Ok(exit_reason) => {

View File

@@ -0,0 +1,72 @@
/*
This module holds the temporary adapter layer between the TUI and the app
server during the hybrid migration period.
For now, the TUI still owns its existing direct-core behavior, but startup
allocates a local in-process app server and drains its event stream. Keeping
the app-server-specific wiring here keeps that transitional logic out of the
main `app.rs` orchestration path.
As more TUI flows move onto the app-server surface directly, this adapter
should shrink and eventually disappear.
*/
use super::App;
use codex_app_server_client::InProcessAppServerClient;
use codex_app_server_client::InProcessServerEvent;
use codex_app_server_protocol::JSONRPCErrorError;
impl App {
pub(super) async fn handle_app_server_event(
&mut self,
app_server_client: &InProcessAppServerClient,
event: InProcessServerEvent,
) {
match event {
InProcessServerEvent::Lagged { skipped } => {
tracing::warn!(
skipped,
"app-server event consumer lagged; dropping ignored events"
);
}
InProcessServerEvent::ServerNotification(_) => {}
InProcessServerEvent::LegacyNotification(_) => {}
InProcessServerEvent::ServerRequest(request) => {
let request_id = request.id().clone();
tracing::warn!(
?request_id,
"rejecting app-server request while TUI still uses direct core APIs"
);
if let Err(err) = self
.reject_app_server_request(
app_server_client,
request_id,
"TUI client does not yet handle this app-server server request".to_string(),
)
.await
{
tracing::warn!("{err}");
}
}
}
}
async fn reject_app_server_request(
&self,
app_server_client: &InProcessAppServerClient,
request_id: codex_app_server_protocol::RequestId,
reason: String,
) -> std::result::Result<(), String> {
app_server_client
.reject_server_request(
request_id,
JSONRPCErrorError {
code: -32000,
message: reason,
data: None,
},
)
.await
.map_err(|err| format!("failed to reject app-server request: {err}"))
}
}

View File

@@ -7,6 +7,10 @@ use additional_dirs::add_dir_warning_message;
use app::App;
pub use app::AppExitInfo;
pub use app::ExitReason;
use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
use codex_app_server_client::InProcessAppServerClient;
use codex_app_server_client::InProcessClientStartArgs;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_cloud_requirements::cloud_requirements_loader;
use codex_core::AuthManager;
use codex_core::CodexAuth;
@@ -46,12 +50,15 @@ use codex_state::log_db;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_oss::ensure_oss_provider_ready;
use codex_utils_oss::get_default_model_for_oss_provider;
use color_eyre::eyre::WrapErr;
use cwd_prompt::CwdPromptAction;
use cwd_prompt::CwdPromptOutcome;
use cwd_prompt::CwdSelection;
use std::fs::OpenOptions;
use std::future::Future;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::error;
use tracing_appender::non_blocking;
use tracing_subscriber::EnvFilter;
@@ -239,10 +246,74 @@ pub use public_widgets::composer_input::ComposerAction;
pub use public_widgets::composer_input::ComposerInput;
// (tests access modules directly within the crate)
async fn start_embedded_app_server(
arg0_paths: Arg0DispatchPaths,
config: Config,
cli_kv_overrides: Vec<(String, toml::Value)>,
loader_overrides: LoaderOverrides,
cloud_requirements: CloudRequirementsLoader,
feedback: codex_feedback::CodexFeedback,
) -> color_eyre::Result<InProcessAppServerClient> {
start_embedded_app_server_with(
arg0_paths,
config,
cli_kv_overrides,
loader_overrides,
cloud_requirements,
feedback,
InProcessAppServerClient::start,
)
.await
}
async fn start_embedded_app_server_with<F, Fut>(
arg0_paths: Arg0DispatchPaths,
config: Config,
cli_kv_overrides: Vec<(String, toml::Value)>,
loader_overrides: LoaderOverrides,
cloud_requirements: CloudRequirementsLoader,
feedback: codex_feedback::CodexFeedback,
start_client: F,
) -> color_eyre::Result<InProcessAppServerClient>
where
F: FnOnce(InProcessClientStartArgs) -> Fut,
Fut: Future<Output = std::io::Result<InProcessAppServerClient>>,
{
let config_warnings = config
.startup_warnings
.iter()
.map(|warning| ConfigWarningNotification {
summary: warning.clone(),
details: None,
path: None,
range: None,
})
.collect();
let client = start_client(InProcessClientStartArgs {
arg0_paths,
config: Arc::new(config),
cli_overrides: cli_kv_overrides,
loader_overrides,
cloud_requirements,
feedback,
config_warnings,
session_source: codex_protocol::protocol::SessionSource::Cli,
enable_codex_api_key_env: false,
client_name: "codex-tui".to_string(),
client_version: env!("CARGO_PKG_VERSION").to_string(),
experimental_api: true,
opt_out_notification_methods: Vec::new(),
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await
.wrap_err("failed to start embedded app server")?;
Ok(client)
}
pub async fn run_main(
mut cli: Cli,
arg0_paths: Arg0DispatchPaths,
_loader_overrides: LoaderOverrides,
loader_overrides: LoaderOverrides,
) -> std::io::Result<AppExitInfo> {
let (sandbox_mode, approval_policy) = if cli.full_auto {
(
@@ -540,6 +611,8 @@ pub async fn run_main(
run_ratatui_app(
cli,
arg0_paths,
loader_overrides,
config,
overrides,
cli_kv_overrides,
@@ -553,6 +626,8 @@ pub async fn run_main(
#[allow(clippy::too_many_arguments)]
async fn run_ratatui_app(
cli: Cli,
arg0_paths: Arg0DispatchPaths,
loader_overrides: LoaderOverrides,
initial_config: Config,
overrides: ConfigOverrides,
cli_kv_overrides: Vec<(String, toml::Value)>,
@@ -950,10 +1025,27 @@ async fn run_ratatui_app(
let use_alt_screen = determine_alt_screen_mode(no_alt_screen, config.tui_alternate_screen);
tui.set_alt_screen_enabled(use_alt_screen);
let app_server = match start_embedded_app_server(
arg0_paths,
config.clone(),
cli_kv_overrides.clone(),
loader_overrides,
cloud_requirements.clone(),
feedback.clone(),
)
.await
{
Ok(app_server) => app_server,
Err(err) => {
restore();
session_log::log_session_end();
return Err(err);
}
};
let app_result = App::run(
&mut tui,
auth_manager,
app_server,
config,
cli_kv_overrides.clone(),
overrides.clone(),
@@ -1236,6 +1328,20 @@ mod tests {
.await
}
async fn start_test_embedded_app_server(
config: Config,
) -> color_eyre::Result<InProcessAppServerClient> {
start_embedded_app_server(
Arg0DispatchPaths::default(),
config,
Vec::new(),
LoaderOverrides::default(),
CloudRequirementsLoader::default(),
codex_feedback::CodexFeedback::new(),
)
.await
}
#[tokio::test]
#[serial]
async fn windows_shows_trust_prompt_without_sandbox() -> std::io::Result<()> {
@@ -1252,6 +1358,51 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn embedded_app_server_exposes_client_manager_accessors() -> color_eyre::Result<()> {
let temp_dir = TempDir::new()?;
let config = build_config(&temp_dir).await?;
let app_server = start_test_embedded_app_server(config).await?;
assert!(Arc::ptr_eq(
&app_server.auth_manager(),
&app_server.auth_manager()
));
assert!(Arc::ptr_eq(
&app_server.thread_manager(),
&app_server.thread_manager()
));
app_server.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn embedded_app_server_start_failure_is_returned() -> color_eyre::Result<()> {
let temp_dir = TempDir::new()?;
let config = build_config(&temp_dir).await?;
let result = start_embedded_app_server_with(
Arg0DispatchPaths::default(),
config,
Vec::new(),
LoaderOverrides::default(),
CloudRequirementsLoader::default(),
codex_feedback::CodexFeedback::new(),
|_args| async { Err(std::io::Error::other("boom")) },
)
.await;
let err = match result {
Ok(_) => panic!("startup failure should be returned"),
Err(err) => err,
};
assert!(
err.to_string()
.contains("failed to start embedded app server"),
"error should preserve the embedded app server startup context"
);
Ok(())
}
#[tokio::test]
#[serial]
async fn windows_shows_trust_prompt_with_sandbox() -> std::io::Result<()> {

View File

@@ -89,17 +89,6 @@ impl App {
);
}
notification => {
if !app_server_client.is_remote()
&& matches!(
notification,
ServerNotification::TurnCompleted(_)
| ServerNotification::ThreadRealtimeItemAdded(_)
| ServerNotification::ThreadRealtimeOutputAudioDelta(_)
| ServerNotification::ThreadRealtimeError(_)
)
{
return;
}
if let Some((thread_id, events)) =
server_notification_thread_events(notification)
{
@@ -127,9 +116,6 @@ impl App {
AppServerEvent::LegacyNotification(notification) => {
if let Some((thread_id, event)) = legacy_thread_event(notification.params) {
self.pending_app_server_requests.note_legacy_event(&event);
if legacy_event_is_shadowed_by_server_notification(&event.msg) {
return;
}
if self.primary_thread_id.is_none()
|| matches!(event.msg, EventMsg::SessionConfigured(_))
&& self.primary_thread_id == Some(thread_id)
@@ -212,24 +198,6 @@ fn legacy_thread_event(params: Option<Value>) -> Option<(ThreadId, Event)> {
Some((thread_id, event))
}
fn legacy_event_is_shadowed_by_server_notification(msg: &EventMsg) -> bool {
matches!(
msg,
EventMsg::TokenCount(_)
| EventMsg::Error(_)
| EventMsg::ThreadNameUpdated(_)
| EventMsg::TurnStarted(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::PlanDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::RealtimeConversationStarted(_)
| EventMsg::RealtimeConversationClosed(_)
)
}
fn server_notification_thread_events(
notification: ServerNotification,
) -> Option<(ThreadId, Vec<Event>)> {