diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 6c434653f9..5ad3971a73 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1101,7 +1101,6 @@ pub struct ExecRunParams { pub base_instructions: Option, pub developer_instructions: Option, pub output_schema: Option, - pub ephemeral: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index a9f464a262..1f19e9c09c 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -396,6 +396,7 @@ Run a single turn to completion without subscribing to streamed turn/item events Notes: - `exec/run` is best for one-off utilities (for example, generating a title) where you just need the final result. +- `exec/run` always runs ephemerally, so it does not create a rollout and will not appear in `thread/list`. - Use `turn/start` when you want streaming events, intermediate items, or a long-lived thread. ### Example: One-off command execution diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 6b12796c14..3fe24df3b1 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -211,7 +211,7 @@ type PendingInterruptQueue = Vec<(RequestId, ApiVersion)>; pub(crate) type PendingInterrupts = Arc>>; pub(crate) type PendingRollbacks = Arc>>; -pub(crate) type AutoAttachExclusions = Arc>>; +pub(crate) type ThreadIdsToSkipListenerAttachment = Arc>>; /// Per-conversation accumulation of the latest states e.g. error message while a turn runs. #[derive(Default, Clone)] @@ -259,7 +259,9 @@ pub(crate) struct CodexMessageProcessor { // Queue of pending rollback requests per conversation. We reply when ThreadRollback arrives. pending_rollbacks: PendingRollbacks, turn_summary_store: TurnSummaryStore, - auto_attach_exclusions: AutoAttachExclusions, + // `exec/run` consumes events directly; background listeners would drain the + // stream and prevent it from detecting completion. + thread_ids_to_skip_listener_attachment: ThreadIdsToSkipListenerAttachment, pending_fuzzy_searches: Arc>>>, feedback: CodexFeedback, } @@ -316,7 +318,7 @@ impl CodexMessageProcessor { pending_interrupts: Arc::new(Mutex::new(HashMap::new())), pending_rollbacks: Arc::new(Mutex::new(HashMap::new())), turn_summary_store: Arc::new(Mutex::new(HashMap::new())), - auto_attach_exclusions: Arc::new(Mutex::new(HashSet::new())), + thread_ids_to_skip_listener_attachment: Arc::new(Mutex::new(HashSet::new())), pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())), feedback, } @@ -1419,7 +1421,7 @@ impl CodexMessageProcessor { params.developer_instructions, params.personality, ); - typesafe_overrides.ephemeral = Some(params.ephemeral.unwrap_or(true)); + typesafe_overrides.ephemeral = Some(true); let config = match derive_config_from_params(&self.cli_overrides, params.config, typesafe_overrides) @@ -1454,10 +1456,14 @@ impl CodexMessageProcessor { thread_id, thread, .. } = new_thread; - let auto_attach_exclusions = self.auto_attach_exclusions.clone(); + let thread_ids_to_skip_listener_attachment = + self.thread_ids_to_skip_listener_attachment.clone(); let thread_id_for_turn = thread_id.clone(); let thread_id_for_remove = thread_id.clone(); - auto_attach_exclusions.lock().await.insert(thread_id); + thread_ids_to_skip_listener_attachment + .lock() + .await + .insert(thread_id); let response_result: Result = async { let has_turn_overrides = params.effort.is_some() @@ -1501,10 +1507,11 @@ impl CodexMessageProcessor { } .await; - let auto_attach_exclusions_for_cleanup = auto_attach_exclusions.clone(); + let thread_ids_to_skip_listener_attachment_for_cleanup = + thread_ids_to_skip_listener_attachment.clone(); tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(1)).await; - auto_attach_exclusions_for_cleanup + thread_ids_to_skip_listener_attachment_for_cleanup .lock() .await .remove(&thread_id_for_remove); @@ -2089,7 +2096,7 @@ impl CodexMessageProcessor { /// Best-effort: attach a listener for thread_id if missing. pub(crate) async fn try_attach_thread_listener(&mut self, thread_id: ThreadId) { if self - .auto_attach_exclusions + .thread_ids_to_skip_listener_attachment .lock() .await .contains(&thread_id) diff --git a/codex-rs/app-server/tests/suite/v2/exec_run.rs b/codex-rs/app-server/tests/suite/v2/exec_run.rs index 2646d14a13..8b4432d668 100644 --- a/codex-rs/app-server/tests/suite/v2/exec_run.rs +++ b/codex-rs/app-server/tests/suite/v2/exec_run.rs @@ -50,7 +50,6 @@ async fn exec_run_completes_turn_and_returns_final_message() -> Result<()> { base_instructions: None, developer_instructions: None, output_schema: None, - ephemeral: None, }) .await?; @@ -99,7 +98,6 @@ async fn exec_run_rejects_empty_input() -> Result<()> { base_instructions: None, developer_instructions: None, output_schema: None, - ephemeral: None, }) .await?;