From 7134220f3c1a5119cc0ea775b1807fe186b8a614 Mon Sep 17 00:00:00 2001
From: Michael Bolin
Date: Tue, 3 Mar 2026 21:44:52 -0800
Subject: [PATCH] core: box wrapper futures to reduce stack pressure (#13429)

Follow-up to [#13388](https://github.com/openai/codex/pull/13388). This uses the same general fix pattern as [#12421](https://github.com/openai/codex/pull/12421), but in the `codex-core` compact/resume/fork path.

## Why

`compact_resume_after_second_compaction_preserves_history` started overflowing the stack on Windows CI after [#13388](https://github.com/openai/codex/pull/13388). The important part is that this was not a compaction-recursion bug.

The test exercises a path with several thin `async fn` wrappers around much larger thread-spawn, resume, and fork futures. When one `async fn` awaits another inline, the outer future stores the callee future as part of its own state machine. In a long wrapper chain, a caller can therefore inline far more state than the source code suggests. That is exactly what was happening here:

- `ThreadManager` convenience methods such as `start_thread`, `resume_thread_from_rollout`, and `fork_thread` were inlining the larger spawn/resume futures beneath them.
- `core_test_support::test_codex` added another wrapper layer on top of those same paths.
- `compact_resume_fork` adds a few more helpers, and this particular test drives the resume/fork path multiple times.

On Windows, that was enough to push both the libtest thread and Tokio worker threads over the edge. The previous 8 MiB test-thread workaround proved the failure was stack-related, but it did not address the underlying future size.

## How This Was Debugged

The useful debugging pattern here was to turn the CI-only failure into a local low-stack repro:

1. Remove the explicit large-stack harness so the test runs on the normal `#[tokio::test]` path.
2. Build the test binary normally.
3. Re-run the already-built `tests/all` binary directly with progressively smaller `RUST_MIN_STACK` values.
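In command form, the loop looked roughly like this (a sketch only: the `all-<hash>` binary name stands in for whatever hashed filename cargo produced, and exact paths vary by setup):

```shell
# Build the test binary once, without running it.
cargo test -p codex-core --test all --no-run

# Re-run the prebuilt libtest binary with a shrinking stack budget.
# Set this way, RUST_MIN_STACK applies only to the test process's
# threads, not to cargo or rustc.
RUST_MIN_STACK=917504 ./target/debug/deps/all-<hash> \
    suite::compact_resume_fork::compact_resume_after_second_compaction_preserves_history \
    --exact
```

Bisecting `RUST_MIN_STACK` this way gives a rough per-thread stack budget for the test, and makes a pass/overflow boundary reproducible locally.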
Running the built binary directly matters: it keeps the reduced stack size focused on the test process instead of also applying it to `cargo` and `rustc`. That made it possible to answer two questions quickly:

- Does the failure still reproduce without the workaround? Yes.
- Does boxing the wrapper futures actually buy back stack headroom? Also yes.

After this change, the built test binary passes with `RUST_MIN_STACK=917504` and still overflows at `786432`, which is enough evidence to justify removing the explicit 8 MiB override while keeping a deterministic low-stack repro for future debugging. If we hit a similar issue again, the first places to inspect are thin `async fn` wrappers that mostly forward into a much larger async implementation.

## `Box::pin()` Primer

`async fn` compiles into a state machine. If a wrapper does this:

```rust
async fn wrapper() {
    inner().await;
}
```

then `wrapper()` stores the full `inner()` future inline as part of its own state. If the wrapper instead does this:

```rust
async fn wrapper() {
    Box::pin(inner()).await;
}
```

then the child future lives on the heap, and the outer future only stores a pinned pointer to it. That usually trades one allocation for a substantially smaller outer future, which is exactly the tradeoff we want when the problem is stack pressure rather than raw CPU time.

Useful references:

- [`Box::pin`](https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin)
- [Async book: Pinning](https://rust-lang.github.io/async-book/04_pinning/01_chapter.html)

## What Changed

- Boxed the wrapper futures in `core/src/thread_manager.rs` around `start_thread`, `resume_thread_from_rollout`, `fork_thread`, and the corresponding `ThreadManagerState` spawn helpers so callers no longer inline the full spawn/resume state machine through multiple layers.
- Boxed the matching test-only wrapper futures in `core/tests/common/test_codex.rs` and `core/tests/suite/compact_resume_fork.rs`, which sit directly on top of the same path.
- Restored `compact_resume_after_second_compaction_preserves_history` in `core/tests/suite/compact_resume_fork.rs` to a normal `#[tokio::test]` and removed the explicit `TEST_STACK_SIZE_BYTES` thread/runtime sizing.
- Simplified a tiny helper in `compact_resume_fork` by making `fetch_conversation_path()` synchronous, which removes one more unnecessary future layer from the test path.

## Verification

- `cargo test -p codex-core --test all suite::compact_resume_fork::compact_resume_after_second_compaction_preserves_history -- --exact --nocapture`
- `cargo test -p codex-core --test all suite::compact_resume_fork -- --nocapture`
- Re-ran the built `codex-core` `tests/all` binary directly with reduced stack sizes:
  - `RUST_MIN_STACK=917504` passes
  - `RUST_MIN_STACK=786432` still overflows
- `cargo test -p codex-core`
  - Still fails locally in unrelated existing integration areas that expect the `codex` / `test_stdio_server` binaries or hit the existing `search_tool` wiremock mismatches.

---
 codex-rs/core/src/thread_manager.rs         | 94 +++++++++----------
 codex-rs/core/tests/common/test_codex.rs    | 25 ++---
 .../core/tests/suite/compact_resume_fork.rs | 49 +++-------
 3 files changed, 72 insertions(+), 96 deletions(-)

diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs
index 2723ad955c..68ccae1776 100644
--- a/codex-rs/core/src/thread_manager.rs
+++ b/codex-rs/core/src/thread_manager.rs
@@ -312,8 +312,9 @@ impl ThreadManager {
     }
 
     pub async fn start_thread(&self, config: Config) -> CodexResult {
-        self.start_thread_with_tools(config, Vec::new(), false)
-            .await
+        // Box delegated thread-spawn futures so these convenience wrappers do
+        // not inline the full spawn path into every caller's async state.
+        Box::pin(self.start_thread_with_tools(config, Vec::new(), false)).await
     }
 
     pub async fn start_thread_with_tools(
@@ -322,12 +323,12 @@ impl ThreadManager {
         dynamic_tools: Vec,
         persist_extended_history: bool,
     ) -> CodexResult {
-        self.start_thread_with_tools_and_service_name(
+        Box::pin(self.start_thread_with_tools_and_service_name(
             config,
             dynamic_tools,
             persist_extended_history,
             None,
-        )
+        ))
         .await
     }
 
@@ -338,17 +339,16 @@ impl ThreadManager {
         persist_extended_history: bool,
         metrics_service_name: Option,
     ) -> CodexResult {
-        self.state
-            .spawn_thread(
-                config,
-                InitialHistory::New,
-                Arc::clone(&self.state.auth_manager),
-                self.agent_control(),
-                dynamic_tools,
-                persist_extended_history,
-                metrics_service_name,
-            )
-            .await
+        Box::pin(self.state.spawn_thread(
+            config,
+            InitialHistory::New,
+            Arc::clone(&self.state.auth_manager),
+            self.agent_control(),
+            dynamic_tools,
+            persist_extended_history,
+            metrics_service_name,
+        ))
+        .await
     }
 
     pub async fn resume_thread_from_rollout(
@@ -358,7 +358,7 @@ impl ThreadManager {
         auth_manager: Arc,
     ) -> CodexResult {
         let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
-        self.resume_thread_with_history(config, initial_history, auth_manager, false)
+        Box::pin(self.resume_thread_with_history(config, initial_history, auth_manager, false))
             .await
     }
 
@@ -369,17 +369,16 @@ impl ThreadManager {
         auth_manager: Arc,
         persist_extended_history: bool,
     ) -> CodexResult {
-        self.state
-            .spawn_thread(
-                config,
-                initial_history,
-                auth_manager,
-                self.agent_control(),
-                Vec::new(),
-                persist_extended_history,
-                None,
-            )
-            .await
+        Box::pin(self.state.spawn_thread(
+            config,
+            initial_history,
+            auth_manager,
+            self.agent_control(),
+            Vec::new(),
+            persist_extended_history,
+            None,
+        ))
+        .await
     }
 
     /// Removes the thread from the manager's internal map, though the thread is stored
@@ -411,17 +410,16 @@ impl ThreadManager {
     ) -> CodexResult {
         let history = RolloutRecorder::get_rollout_history(&path).await?;
         let history = truncate_before_nth_user_message(history, nth_user_message);
-        self.state
-            .spawn_thread(
-                config,
-                history,
-                Arc::clone(&self.state.auth_manager),
-                self.agent_control(),
-                Vec::new(),
-                persist_extended_history,
-                None,
-            )
-            .await
+        Box::pin(self.state.spawn_thread(
+            config,
+            history,
+            Arc::clone(&self.state.auth_manager),
+            self.agent_control(),
+            Vec::new(),
+            persist_extended_history,
+            None,
+        ))
+        .await
     }
 
     pub(crate) fn agent_control(&self) -> AgentControl {
@@ -474,14 +472,14 @@ impl ThreadManagerState {
         config: Config,
         agent_control: AgentControl,
     ) -> CodexResult {
-        self.spawn_new_thread_with_source(
+        Box::pin(self.spawn_new_thread_with_source(
             config,
             agent_control,
             self.session_source.clone(),
             false,
             None,
             None,
-        )
+        ))
         .await
     }
 
@@ -494,7 +492,7 @@ impl ThreadManagerState {
         metrics_service_name: Option,
         inherited_shell_snapshot: Option>,
     ) -> CodexResult {
-        self.spawn_thread_with_source(
+        Box::pin(self.spawn_thread_with_source(
             config,
             InitialHistory::New,
             Arc::clone(&self.auth_manager),
@@ -504,7 +502,7 @@ impl ThreadManagerState {
             persist_extended_history,
             metrics_service_name,
             inherited_shell_snapshot,
-        )
+        ))
         .await
     }
 
@@ -517,7 +515,7 @@ impl ThreadManagerState {
         inherited_shell_snapshot: Option>,
     ) -> CodexResult {
         let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
-        self.spawn_thread_with_source(
+        Box::pin(self.spawn_thread_with_source(
             config,
             initial_history,
             Arc::clone(&self.auth_manager),
@@ -527,7 +525,7 @@ impl ThreadManagerState {
             false,
             None,
             inherited_shell_snapshot,
-        )
+        ))
         .await
     }
 
@@ -540,7 +538,7 @@ impl ThreadManagerState {
         persist_extended_history: bool,
         inherited_shell_snapshot: Option>,
     ) -> CodexResult {
-        self.spawn_thread_with_source(
+        Box::pin(self.spawn_thread_with_source(
             config,
             initial_history,
             Arc::clone(&self.auth_manager),
@@ -550,7 +548,7 @@ impl ThreadManagerState {
             persist_extended_history,
             None,
             inherited_shell_snapshot,
-        )
+        ))
         .await
     }
 
@@ -566,7 +564,7 @@ impl ThreadManagerState {
         persist_extended_history: bool,
         metrics_service_name: Option,
     ) -> CodexResult {
-        self.spawn_thread_with_source(
+        Box::pin(self.spawn_thread_with_source(
             config,
             initial_history,
             auth_manager,
@@ -576,7 +574,7 @@ impl ThreadManagerState {
             persist_extended_history,
             metrics_service_name,
             None,
-        )
+        ))
         .await
     }
 
diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs
index a79838df00..7bd5763d19 100644
--- a/codex-rs/core/tests/common/test_codex.rs
+++ b/codex-rs/core/tests/common/test_codex.rs
@@ -105,7 +105,7 @@ impl TestCodexBuilder {
             Some(home) => home,
             None => Arc::new(TempDir::new()?),
         };
-        self.build_with_home(server, home, None).await
+        Box::pin(self.build_with_home(server, home, None)).await
     }
 
     pub async fn build_with_streaming_server(
@@ -117,8 +117,7 @@ impl TestCodexBuilder {
             Some(home) => home,
             None => Arc::new(TempDir::new()?),
         };
-        self.build_with_home_and_base_url(format!("{base_url}/v1"), home, None)
-            .await
+        Box::pin(self.build_with_home_and_base_url(format!("{base_url}/v1"), home, None)).await
     }
 
     pub async fn build_with_websocket_server(
@@ -139,8 +138,7 @@ impl TestCodexBuilder {
                 .enable(Feature::ResponsesWebsockets)
                 .expect("test config should allow feature update");
         }));
-        self.build_with_home_and_base_url(base_url, home, None)
-            .await
+        Box::pin(self.build_with_home_and_base_url(base_url, home, None)).await
     }
 
     pub async fn resume(
@@ -149,7 +147,7 @@ impl TestCodexBuilder {
         home: Arc,
         rollout_path: PathBuf,
     ) -> anyhow::Result {
-        self.build_with_home(server, home, Some(rollout_path)).await
+        Box::pin(self.build_with_home(server, home, Some(rollout_path))).await
     }
 
     async fn build_with_home(
@@ -160,7 +158,7 @@ impl TestCodexBuilder {
     ) -> anyhow::Result {
         let base_url = format!("{}/v1", server.uri());
         let (config, cwd) = self.prepare_config(base_url, &home).await?;
-        self.build_from_config(config, cwd, home, resume_from).await
+        Box::pin(self.build_from_config(config, cwd, home, resume_from)).await
     }
 
     async fn build_with_home_and_base_url(
@@ -170,7 +168,7 @@ impl TestCodexBuilder {
         resume_from: Option,
     ) -> anyhow::Result {
         let (config, cwd) = self.prepare_config(base_url, &home).await?;
-        self.build_from_config(config, cwd, home, resume_from).await
+        Box::pin(self.build_from_config(config, cwd, home, resume_from)).await
     }
 
     async fn build_from_config(
@@ -201,11 +199,14 @@ impl TestCodexBuilder {
         let new_conversation = match resume_from {
             Some(path) => {
                 let auth_manager = codex_core::test_support::auth_manager_from_auth(auth);
-                thread_manager
-                    .resume_thread_from_rollout(config.clone(), path, auth_manager)
-                    .await?
+                Box::pin(thread_manager.resume_thread_from_rollout(
+                    config.clone(),
+                    path,
+                    auth_manager,
+                ))
+                .await?
             }
-            None => thread_manager.start_thread(config.clone()).await?,
+            None => Box::pin(thread_manager.start_thread(config.clone())).await?,
         };
 
         Ok(TestCodex {
diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs
index fe536077a3..be79c11708 100644
--- a/codex-rs/core/tests/suite/compact_resume_fork.rs
+++ b/codex-rs/core/tests/suite/compact_resume_fork.rs
@@ -157,7 +157,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
     user_turn(&base, "hello world").await;
     compact_conversation(&base).await;
     user_turn(&base, "AFTER_COMPACT").await;
-    let base_path = fetch_conversation_path(&base).await;
+    let base_path = fetch_conversation_path(&base);
     assert!(
         base_path.exists(),
         "compact+resume test expects base path {base_path:?} to exist",
@@ -165,7 +165,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
 
     let resumed = resume_conversation(&manager, &config, base_path).await;
     user_turn(&resumed, "AFTER_RESUME").await;
-    let resumed_path = fetch_conversation_path(&resumed).await;
+    let resumed_path = fetch_conversation_path(&resumed);
     assert!(
         resumed_path.exists(),
         "compact+resume test expects resumed path {resumed_path:?} to exist",
@@ -292,33 +292,10 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
     assert_eq!(requests.len(), 5);
 }
 
-#[test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 /// Scenario: after the forked branch is compacted, resuming again should reuse
 /// the compacted history and only append the new user message.
-fn compact_resume_after_second_compaction_preserves_history() -> Result<()> {
-    const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024;
-
-    let handle = std::thread::Builder::new()
-        .name("compact_resume_after_second_compaction_preserves_history".to_string())
-        .stack_size(TEST_STACK_SIZE_BYTES)
-        .spawn(|| -> Result<()> {
-            let runtime = tokio::runtime::Builder::new_multi_thread()
-                .worker_threads(2)
-                .thread_stack_size(TEST_STACK_SIZE_BYTES)
-                .enable_all()
-                .build()?;
-            runtime.block_on(compact_resume_after_second_compaction_preserves_history_impl())
-        })?;
-
-    match handle.join() {
-        Ok(result) => result,
-        Err(_) => Err(anyhow::anyhow!(
-            "compact_resume_after_second_compaction_preserves_history thread panicked"
-        )),
-    }
-}
-
-async fn compact_resume_after_second_compaction_preserves_history_impl() -> Result<()> {
+async fn compact_resume_after_second_compaction_preserves_history() -> Result<()> {
     if network_disabled() {
         println!("Skipping test because network is disabled in this sandbox");
         return Ok(());
@@ -335,7 +312,7 @@ async fn compact_resume_after_second_compaction_preserves_history_impl() -> Resu
     user_turn(&base, "hello world").await;
     compact_conversation(&base).await;
     user_turn(&base, "AFTER_COMPACT").await;
-    let base_path = fetch_conversation_path(&base).await;
+    let base_path = fetch_conversation_path(&base);
     assert!(
         base_path.exists(),
         "second compact test expects base path {base_path:?} to exist",
@@ -343,7 +320,7 @@ async fn compact_resume_after_second_compaction_preserves_history_impl() -> Resu
 
     let resumed = resume_conversation(&manager, &config, base_path).await;
     user_turn(&resumed, "AFTER_RESUME").await;
-    let resumed_path = fetch_conversation_path(&resumed).await;
+    let resumed_path = fetch_conversation_path(&resumed);
     assert!(
         resumed_path.exists(),
         "second compact test expects resumed path {resumed_path:?} to exist",
@@ -354,7 +331,7 @@ async fn compact_resume_after_second_compaction_preserves_history_impl() -> Resu
 
     compact_conversation(&forked).await;
     user_turn(&forked, "AFTER_COMPACT_2").await;
-    let forked_path = fetch_conversation_path(&forked).await;
+    let forked_path = fetch_conversation_path(&forked);
     assert!(
         forked_path.exists(),
         "second compact test expects forked path {forked_path:?} to exist",
@@ -558,7 +535,9 @@ async fn start_test_conversation(
             config.model = Some(model);
         }
     });
-    let test = builder.build(server).await.expect("create conversation");
+    let test = Box::pin(builder.build(server))
+        .await
+        .expect("create conversation");
 
     (test.home, test.config, test.thread_manager, test.codex)
 }
@@ -595,7 +574,7 @@ async fn compact_conversation(conversation: &Arc) {
     wait_for_event(conversation, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
 }
 
-async fn fetch_conversation_path(conversation: &Arc) -> std::path::PathBuf {
+fn fetch_conversation_path(conversation: &Arc) -> std::path::PathBuf {
     conversation.rollout_path().expect("rollout path")
 }
 
@@ -607,8 +586,7 @@ async fn resume_conversation(
     let auth_manager = codex_core::test_support::auth_manager_from_auth(
         codex_core::CodexAuth::from_api_key("dummy"),
     );
-    manager
-        .resume_thread_from_rollout(config.clone(), path, auth_manager)
+    Box::pin(manager.resume_thread_from_rollout(config.clone(), path, auth_manager))
         .await
         .expect("resume conversation")
         .thread
@@ -621,8 +599,7 @@ async fn fork_thread(
     path: std::path::PathBuf,
     nth_user_message: usize,
 ) -> Arc {
-    manager
-        .fork_thread(nth_user_message, config.clone(), path, false)
+    Box::pin(manager.fork_thread(nth_user_message, config.clone(), path, false))
        .await
        .expect("fork conversation")
        .thread