Compare commits

...

1 Commits

Author SHA1 Message Date
Michael Bolin
a4ec8ce2e5 stabilize zsh-fork approvals and resume --last 2026-03-28 14:10:08 -07:00
4 changed files with 251 additions and 16 deletions

View File

@@ -472,10 +472,15 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
first_file.display(),
second_file.display()
);
// Login shells can emit an extra approval for system startup helpers
// (for example `/usr/libexec/path_helper -s` on macOS) before the target
// `rm` subcommands. Give the command enough budget to exercise the full
// approval sequence on slower CI shards.
let tool_timeout_ms = 15_000;
let tool_call_arguments = serde_json::to_string(&serde_json::json!({
"command": shell_command,
"workdir": serde_json::Value::Null,
"timeout_ms": 5000
"timeout_ms": tool_timeout_ms
}))?;
let response = responses::sse(vec![
responses::ev_response_created("resp-1"),

View File

@@ -1099,18 +1099,12 @@ fn turn_items_for_thread(
.map(|turn| turn.items.clone())
}
fn all_thread_source_kinds() -> Vec<ThreadSourceKind> {
fn resumable_thread_source_kinds() -> Vec<ThreadSourceKind> {
vec![
ThreadSourceKind::Cli,
ThreadSourceKind::VsCode,
ThreadSourceKind::Exec,
ThreadSourceKind::AppServer,
ThreadSourceKind::SubAgent,
ThreadSourceKind::SubAgentReview,
ThreadSourceKind::SubAgentCompact,
ThreadSourceKind::SubAgentThreadSpawn,
ThreadSourceKind::SubAgentOther,
ThreadSourceKind::Unknown,
]
}
@@ -1169,7 +1163,7 @@ async fn resolve_resume_thread_id(
limit: Some(100),
sort_key: Some(ThreadSortKey::UpdatedAt),
model_providers: model_providers.clone(),
source_kinds: Some(all_thread_source_kinds()),
source_kinds: Some(resumable_thread_source_kinds()),
archived: Some(false),
cwd: None,
search_term: None,
@@ -1209,7 +1203,7 @@ async fn resolve_resume_thread_id(
limit: Some(100),
sort_key: Some(ThreadSortKey::UpdatedAt),
model_providers: model_providers.clone(),
source_kinds: Some(all_thread_source_kinds()),
source_kinds: Some(resumable_thread_source_kinds()),
archived: Some(false),
cwd: None,
// Thread names are attached separately from rollout titles, so name
@@ -1898,6 +1892,19 @@ mod tests {
assert_eq!(resume_lookup_model_providers(&config, &named_args), None);
}
#[test]
fn resumable_thread_source_kinds_exclude_internal_threads() {
    // Pin the exact list handed to the resume lookup: only these four kinds
    // are expected, so no `SubAgent*` or `Unknown` kind may appear.
    let expected_kinds = vec![
        ThreadSourceKind::Cli,
        ThreadSourceKind::VsCode,
        ThreadSourceKind::Exec,
        ThreadSourceKind::AppServer,
    ];
    assert_eq!(resumable_thread_source_kinds(), expected_kinds);
}
#[test]
fn turn_items_for_thread_returns_matching_turn_items() {
let thread = AppServerThread {

View File

@@ -1,10 +1,17 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use anyhow::Context;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_utils_cargo_bin::find_resource;
use core_test_support::test_codex_exec::test_codex_exec;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::string::ToString;
use std::time::Duration;
use tempfile::TempDir;
use uuid::Uuid;
use walkdir::WalkDir;
@@ -220,6 +227,118 @@ fn exec_resume_last_accepts_prompt_after_flag_in_json_mode() -> anyhow::Result<(
Ok(())
}
/// `resume --last` must continue the most recent exec session even when a
/// newer rollout recorded with a sub-agent session source exists on disk.
#[test]
fn exec_resume_last_ignores_newer_internal_thread() -> anyhow::Result<()> {
    // Timestamp stamped on every line of the hand-written internal rollout.
    const INTERNAL_TIMESTAMP: &str = "2026-03-27T00:00:00.000Z";

    let test = test_codex_exec();
    let fixture = exec_fixture()?;
    let repo_root = exec_repo_root()?;

    // Step 1: create a regular exec session tagged with a unique marker.
    let visible_marker = format!("resume-last-visible-{}", Uuid::new_v4());
    let visible_prompt = format!("echo {visible_marker}");
    test.cmd()
        .env("CODEX_RS_SSE_FIXTURE", &fixture)
        .env("OPENAI_BASE_URL", "http://unused.local")
        .arg("--skip-git-repo-check")
        .arg("-C")
        .arg(&repo_root)
        .arg(&visible_prompt)
        .assert()
        .success();

    let sessions_dir = test.home_path().join("sessions");
    let visible_session_path = find_session_file_containing_marker(&sessions_dir, &visible_marker)
        .expect("no session file found after first run");

    // `updated_at` is second-granularity, so make the injected internal thread
    // deterministically newer than the visible exec session.
    std::thread::sleep(Duration::from_millis(1100));

    // Step 2: hand-write a rollout file whose session source is a sub-agent.
    let internal_thread_id = Uuid::new_v4();
    let internal_rollout_path = test
        .home_path()
        .join("sessions/2026/03/27")
        .join(format!(
            "rollout-2026-03-27T00-00-00-{internal_thread_id}.jsonl"
        ));
    let internal_rollout_dir = internal_rollout_path
        .parent()
        .expect("internal rollout parent directory");
    std::fs::create_dir_all(internal_rollout_dir)?;

    let internal_thread_id_str = internal_thread_id.to_string();
    let session_meta = SessionMeta {
        id: ThreadId::from_string(&internal_thread_id_str)?,
        forked_from_id: None,
        timestamp: INTERNAL_TIMESTAMP.to_string(),
        cwd: repo_root.clone(),
        originator: "codex".to_string(),
        cli_version: "0.0.0".to_string(),
        source: SessionSource::SubAgent(SubAgentSource::MemoryConsolidation),
        agent_path: None,
        agent_nickname: None,
        agent_role: None,
        model_provider: None,
        base_instructions: None,
        dynamic_tools: None,
        memory_mode: None,
    };
    let internal_payload = serde_json::to_value(SessionMetaLine {
        meta: session_meta,
        git: None,
    })?;

    let session_meta_line = json!({
        "timestamp": INTERNAL_TIMESTAMP,
        "type": "session_meta",
        "payload": internal_payload,
    })
    .to_string();
    let response_item_line = json!({
        "timestamp": INTERNAL_TIMESTAMP,
        "type": "response_item",
        "payload": {
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "internal memory sweep"}],
        },
    })
    .to_string();
    let user_message_line = json!({
        "timestamp": INTERNAL_TIMESTAMP,
        "type": "event_msg",
        "payload": {
            "type": "user_message",
            "message": "internal memory sweep",
            "kind": "plain",
        },
    })
    .to_string();

    // One JSON document per line, with a trailing newline after the last one.
    let mut rollout_contents =
        [session_meta_line, response_item_line, user_message_line].join("\n");
    rollout_contents.push('\n');
    std::fs::write(&internal_rollout_path, rollout_contents)?;

    // Step 3: resume the last session with a second marker and check which
    // rollout file received it.
    let resumed_marker = format!("resume-last-visible-2-{}", Uuid::new_v4());
    let resumed_prompt = format!("echo {resumed_marker}");
    test.cmd()
        .env("CODEX_RS_SSE_FIXTURE", &fixture)
        .env("OPENAI_BASE_URL", "http://unused.local")
        .arg("--skip-git-repo-check")
        .arg("-C")
        .arg(&repo_root)
        .arg(&resumed_prompt)
        .arg("resume")
        .arg("--last")
        .assert()
        .success();

    let resumed_path = find_session_file_containing_marker(&sessions_dir, &resumed_marker)
        .expect("no resumed session file containing marker2");
    assert_eq!(
        resumed_path, visible_session_path,
        "resume --last should ignore newer internal threads"
    );
    Ok(())
}
#[test]
fn exec_resume_last_respects_cwd_filter_and_all_flag() -> anyhow::Result<()> {
let test = test_codex_exec();

View File

@@ -33,17 +33,31 @@ fn duplicate_fd_for_transfer(fd: impl AsFd, name: &str) -> anyhow::Result<OwnedF
.with_context(|| format!("failed to duplicate {name} for escalation transfer"))
}
/// Creates the socket pair used for escalation requests and sends its server
/// end to the peer listening on `handshake_client`.
///
/// Returns the client end of the pair together with an `OwnedFd` for the
/// server end. Only a duplicate of that descriptor travels in the handshake
/// datagram, so the returned guard keeps a local reference to the stream open
/// until the caller drops it.
///
/// # Errors
///
/// Fails if the socket pair cannot be created, the descriptor cannot be
/// duplicated, or the handshake datagram cannot be sent.
async fn connect_escalation_stream(
    handshake_client: AsyncDatagramSocket,
) -> anyhow::Result<(AsyncSocket, OwnedFd)> {
    const HANDSHAKE_MESSAGE: [u8; 1] = [0];

    let (server_end, client_end) = AsyncSocket::pair()?;
    let local_server_fd: OwnedFd = server_end.into_inner().into();
    let fd_for_peer = duplicate_fd_for_transfer(&local_server_fd, "handshake stream")?;

    // Keep one local reference to the transferred stream alive until the server
    // answers the first request. On macOS, dropping the sender's last local copy
    // immediately after the datagram handshake can make the peer observe EOF
    // before the received fd is fully servicing the stream.
    handshake_client
        .send_with_fds(&HANDSHAKE_MESSAGE, &[fd_for_peer])
        .await
        .context("failed to send handshake datagram")?;

    Ok((client_end, local_server_fd))
}
pub async fn run_shell_escalation_execve_wrapper(
file: String,
argv: Vec<String>,
) -> anyhow::Result<i32> {
let handshake_client = get_escalate_client()?;
let (server, client) = AsyncSocket::pair()?;
const HANDSHAKE_MESSAGE: [u8; 1] = [0];
handshake_client
.send_with_fds(&HANDSHAKE_MESSAGE, &[server.into_inner().into()])
.await
.context("failed to send handshake datagram")?;
let (client, server_stream_guard) = connect_escalation_stream(handshake_client).await?;
let env = std::env::vars()
.filter(|(k, _)| !matches!(k.as_str(), ESCALATE_SOCKET_ENV_VAR | EXEC_WRAPPER_ENV_VAR))
.collect();
@@ -56,6 +70,11 @@ pub async fn run_shell_escalation_execve_wrapper(
})
.await
.context("failed to send EscalateRequest")?;
// Once the first request has been written into the stream, the local guard
// is no longer needed to bridge the datagram handoff. Dropping it here
// lets client-side reads still observe EOF if the server exits before
// replying.
drop(server_stream_guard);
let message = client
.receive::<EscalateResponse>()
.await
@@ -128,6 +147,12 @@ mod tests {
use super::*;
use std::os::fd::AsRawFd;
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::time::Duration;
use pretty_assertions::assert_eq;
use tokio::time::sleep;
use tokio::time::timeout;
#[test]
fn duplicate_fd_for_transfer_does_not_close_original() {
@@ -141,4 +166,83 @@ mod tests {
assert_ne!(unsafe { libc::fcntl(original_fd, libc::F_GETFD) }, -1);
}
/// The guard returned by `connect_escalation_stream` must still hold an open
/// descriptor when the first request is written, and the request/response
/// round trip must succeed even if the server is slow to start reading.
#[tokio::test]
async fn connect_escalation_stream_keeps_sender_alive_until_first_request_write()
-> anyhow::Result<()> {
    let (server_datagram, client_datagram) = AsyncDatagramSocket::pair()?;
    let client_task = tokio::spawn(async move {
        let (client_stream, server_stream_guard) =
            connect_escalation_stream(client_datagram).await?;
        let guard_fd = server_stream_guard.as_raw_fd();
        // SAFETY: `F_GETFD` only reads descriptor flags, and `guard_fd` is
        // still owned by `server_stream_guard`, so it refers to a live fd.
        assert_ne!(unsafe { libc::fcntl(guard_fd, libc::F_GETFD) }, -1);
        client_stream
            .send(EscalateRequest {
                file: PathBuf::from("/bin/echo"),
                argv: vec!["echo".to_string(), "hello".to_string()],
                workdir: AbsolutePathBuf::current_dir()?,
                env: Default::default(),
            })
            .await?;
        // Deliberately do not probe `guard_fd` after the drop: once closed,
        // the number can be handed to any other thread in the test process
        // (descriptors are allocated lowest-free-first), so asserting that it
        // is invalid would be racy. `OwnedFd`'s `Drop` closes the descriptor.
        drop(server_stream_guard);
        let response = client_stream.receive::<EscalateResponse>().await?;
        Ok::<EscalateResponse, anyhow::Error>(response)
    });

    let (_, mut fds) = server_datagram.receive_with_fds().await?;
    assert_eq!(fds.len(), 1);
    // Delay servicing the stream so the client gets ahead of the server.
    sleep(Duration::from_millis(20)).await;
    let server_stream = AsyncSocket::from_fd(fds.remove(0))?;
    let request = server_stream.receive::<EscalateRequest>().await?;
    assert_eq!(request.file, PathBuf::from("/bin/echo"));
    assert_eq!(request.argv, vec!["echo".to_string(), "hello".to_string()]);

    let expected = EscalateResponse {
        action: EscalateAction::Deny {
            reason: Some("not now".to_string()),
        },
    };
    server_stream.send(expected.clone()).await?;
    let response = client_task.await??;
    assert_eq!(response, expected);
    Ok(())
}
/// After the local guard is dropped, a server that closes its end without
/// replying must surface as `UnexpectedEof` on the client instead of a hang.
#[tokio::test]
async fn dropping_guard_after_request_write_preserves_server_eof() -> anyhow::Result<()> {
    let (server_datagram, client_datagram) = AsyncDatagramSocket::pair()?;

    let client_task = tokio::spawn(async move {
        let (client_stream, server_stream_guard) =
            connect_escalation_stream(client_datagram).await?;
        let request = EscalateRequest {
            file: PathBuf::from("/bin/echo"),
            argv: vec!["echo".to_string()],
            workdir: AbsolutePathBuf::current_dir()?,
            env: Default::default(),
        };
        client_stream.send(request).await?;
        drop(server_stream_guard);

        // Bound the wait so a missing EOF fails the test instead of hanging it.
        let receive_outcome = timeout(
            Duration::from_millis(250),
            client_stream.receive::<EscalateResponse>(),
        )
        .await;
        let err = receive_outcome
            .expect("server close should not hang the client")
            .expect_err("expected EOF after server closes without replying");
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
        Ok::<(), anyhow::Error>(())
    });

    let (_, mut received_fds) = server_datagram.receive_with_fds().await?;
    assert_eq!(received_fds.len(), 1);
    let server_stream = AsyncSocket::from_fd(received_fds.remove(0))?;
    let request = server_stream.receive::<EscalateRequest>().await?;
    assert_eq!(request.file, PathBuf::from("/bin/echo"));

    // Close the server end without replying.
    drop(server_stream);
    client_task.await??;
    Ok(())
}
}