core: box wrapper futures to reduce stack pressure (#13429)

Follow-up to [#13388](https://github.com/openai/codex/pull/13388). This
uses the same general fix pattern as
[#12421](https://github.com/openai/codex/pull/12421), but in the
`codex-core` compact/resume/fork path.

## Why

`compact_resume_after_second_compaction_preserves_history` started
overflowing the stack on Windows CI after
[#13388](https://github.com/openai/codex/pull/13388).

The important part is that this was not a compaction-recursion bug. The
test exercises a path with several thin `async fn` wrappers around much
larger thread-spawn, resume, and fork futures. When one `async fn`
awaits another inline, the outer future stores the callee future as part
of its own state machine. In a long wrapper chain, that means a caller
can accidentally inline a lot more state than the source code suggests.
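The effect is easy to see with `std::mem::size_of_val` on a toy wrapper
chain. This sketch is not the codex-core code; every name in it is
illustrative:

```rust
use std::mem::size_of_val;

// A "large" future: the buffer is held across an await point, so it
// becomes part of the future's state machine.
async fn big() -> u64 {
    let buf = [0u8; 4096];
    std::future::ready(()).await;
    buf.iter().map(|&b| u64::from(b)).sum()
}

// Thin wrapper that awaits inline: it embeds the entire `big()` state.
async fn inline_wrapper() -> u64 {
    big().await
}

// Thin wrapper that boxes first: it stores only a pinned heap pointer.
async fn boxed_wrapper() -> u64 {
    Box::pin(big()).await
}

fn main() {
    // Constructing a future does not run it, so no executor is needed
    // just to measure the state-machine sizes.
    let inline_size = size_of_val(&inline_wrapper());
    let boxed_size = size_of_val(&boxed_wrapper());
    println!("inline: {inline_size} B, boxed: {boxed_size} B");
    assert!(inline_size >= 4096);
    assert!(boxed_size < inline_size);
}
```

Stack a few such wrappers and the top-level future — and whatever stack
frame holds it before it is spawned — grows by the full callee size at
every inline layer.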

That is exactly what was happening here:

- `ThreadManager` convenience methods such as `start_thread`,
`resume_thread_from_rollout`, and `fork_thread` were inlining the larger
spawn/resume futures beneath them.
- `core_test_support::test_codex` added another wrapper layer on top of
those same paths.
- `compact_resume_fork` added a few more helpers on top, and this
particular test drives the resume/fork path multiple times.

On Windows, that was enough to push both the libtest thread and the
Tokio worker threads past their stack limits. The previous 8 MiB
test-thread workaround confirmed the failure was stack-related, but it
did not address the underlying future size.

## How This Was Debugged

The useful debugging pattern here was to turn the CI-only failure into a
local low-stack repro.

1. Remove the explicit large-stack harness so the test runs on the
normal `#[tokio::test]` path.
2. Build the test binary normally.
3. Re-run the already-built `tests/all` binary directly with
progressively smaller `RUST_MIN_STACK` values.

Running the built binary directly matters: it keeps the reduced stack
size focused on the test process instead of also applying it to `cargo`
and `rustc`.

That made it possible to answer two questions quickly:

- Does the failure still reproduce without the workaround? Yes.
- Does boxing the wrapper futures actually buy back stack headroom? Also
yes.

After this change, the built test binary passes with
`RUST_MIN_STACK=917504` and still overflows at `786432`, which is enough
evidence to justify removing the explicit 8 MiB override while keeping a
deterministic low-stack repro for future debugging.

If we hit a similar issue again, the first places to inspect are thin
`async fn` wrappers that mostly forward into a much larger async
implementation.

## `Box::pin()` Primer

`async fn` compiles into a state machine. If a wrapper does this:

```rust
async fn wrapper() {
    inner().await;
}
```

then `wrapper()` stores the full `inner()` future inline as part of its
own state.

If the wrapper instead does this:

```rust
async fn wrapper() {
    Box::pin(inner()).await;
}
```

then the child future lives on the heap, and the outer future only
stores a pinned pointer to it. That usually trades one allocation for a
substantially smaller outer future, which is exactly the tradeoff we
want when the problem is stack pressure rather than raw CPU time.
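There is also a design choice hiding here, sketched below with
hypothetical names: boxing at the await site (the approach in this
change) keeps the wrapper's public `async fn` signature, while the
alternative of returning an already-boxed future changes the types every
call site sees.

```rust
use std::future::Future;
use std::pin::Pin;

async fn inner() -> u32 {
    let scratch = [0u8; 2048]; // held across the await below
    std::future::ready(()).await;
    u32::from(scratch[0]) + 42
}

// Style used in this change: the signature stays `async fn`; only the
// body boxes the callee, so no call sites need to change.
async fn boxed_inside() -> u32 {
    Box::pin(inner()).await
}

// Alternative: push the box into the signature. Callers now see a boxed
// trait object instead of an opaque `impl Future`.
fn boxed_outside() -> Pin<Box<dyn Future<Output = u32>>> {
    Box::pin(inner())
}

fn main() {
    // Both keep the caller-visible future small compared to `inner()`.
    assert!(std::mem::size_of_val(&boxed_inside()) < std::mem::size_of_val(&inner()));
    assert!(std::mem::size_of_val(&boxed_outside()) < std::mem::size_of_val(&inner()));
    println!("ok");
}
```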

Useful references:

- [`Box::pin`](https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin)
- [Async book: Pinning](https://rust-lang.github.io/async-book/04_pinning/01_chapter.html)

## What Changed

- Boxed the wrapper futures in `core/src/thread_manager.rs` around
`start_thread`, `resume_thread_from_rollout`, `fork_thread`, and the
corresponding `ThreadManagerState` spawn helpers so callers no longer
inline the full spawn/resume state machine through multiple layers.
- Boxed the matching test-only wrapper futures in
`core/tests/common/test_codex.rs` and
`core/tests/suite/compact_resume_fork.rs`, which sit directly on top of
the same path.
- Restored `compact_resume_after_second_compaction_preserves_history` in
`core/tests/suite/compact_resume_fork.rs` to a normal `#[tokio::test]`
and removed the explicit `TEST_STACK_SIZE_BYTES` thread/runtime sizing.
- Simplified a tiny helper in `compact_resume_fork` by making
`fetch_conversation_path()` synchronous, which removes one more
unnecessary future layer from the test path.
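The last bullet is the cheapest trick of the list: an `async fn` that
never awaits still compiles to a future that every caller must store and
poll. A minimal sketch, with a hypothetical `Thread` type standing in
for the real one:

```rust
use std::path::PathBuf;

// Hypothetical stand-in for the real thread type.
struct Thread {
    rollout_path: Option<PathBuf>,
}

// Before (shape only): `async fn fetch_conversation_path(...)` forced an
// extra future layer even though nothing inside ever awaited.
// After: a plain function — one less state machine on the path.
fn fetch_conversation_path(thread: &Thread) -> PathBuf {
    thread.rollout_path.clone().expect("rollout path")
}

fn main() {
    let thread = Thread {
        rollout_path: Some(PathBuf::from("session.jsonl")),
    };
    assert_eq!(fetch_conversation_path(&thread), PathBuf::from("session.jsonl"));
    println!("ok");
}
```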

## Verification

- `cargo test -p codex-core --test all suite::compact_resume_fork::compact_resume_after_second_compaction_preserves_history -- --exact --nocapture`
- `cargo test -p codex-core --test all suite::compact_resume_fork -- --nocapture`
- Re-ran the built `codex-core` `tests/all` binary directly with reduced
stack sizes:
  - `RUST_MIN_STACK=917504` passes
  - `RUST_MIN_STACK=786432` still overflows
- `cargo test -p codex-core` still fails locally, but only in unrelated
pre-existing integration areas that expect the `codex` /
`test_stdio_server` binaries or hit the known `search_tool` wiremock
mismatches.
---

Commit `7134220f3c` (parent `d622bff384`) by Michael Bolin, committed
via GitHub on 2026-03-03 21:44:52 -08:00. 3 changed files with 72
additions and 96 deletions.

## Diff

### `core/src/thread_manager.rs`
```diff
@@ -312,8 +312,9 @@ impl ThreadManager {
     }
 
     pub async fn start_thread(&self, config: Config) -> CodexResult<NewThread> {
-        self.start_thread_with_tools(config, Vec::new(), false)
-            .await
+        // Box delegated thread-spawn futures so these convenience wrappers do
+        // not inline the full spawn path into every caller's async state.
+        Box::pin(self.start_thread_with_tools(config, Vec::new(), false)).await
     }
 
     pub async fn start_thread_with_tools(
@@ -322,12 +323,12 @@ impl ThreadManager {
         dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
         persist_extended_history: bool,
     ) -> CodexResult<NewThread> {
-        self.start_thread_with_tools_and_service_name(
+        Box::pin(self.start_thread_with_tools_and_service_name(
             config,
             dynamic_tools,
             persist_extended_history,
             None,
-        )
+        ))
         .await
     }
@@ -338,17 +339,16 @@ impl ThreadManager {
         persist_extended_history: bool,
         metrics_service_name: Option<String>,
     ) -> CodexResult<NewThread> {
-        self.state
-            .spawn_thread(
-                config,
-                InitialHistory::New,
-                Arc::clone(&self.state.auth_manager),
-                self.agent_control(),
-                dynamic_tools,
-                persist_extended_history,
-                metrics_service_name,
-            )
-            .await
+        Box::pin(self.state.spawn_thread(
+            config,
+            InitialHistory::New,
+            Arc::clone(&self.state.auth_manager),
+            self.agent_control(),
+            dynamic_tools,
+            persist_extended_history,
+            metrics_service_name,
+        ))
+        .await
     }
 
     pub async fn resume_thread_from_rollout(
@@ -358,7 +358,7 @@ impl ThreadManager {
         auth_manager: Arc<AuthManager>,
     ) -> CodexResult<NewThread> {
         let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
-        self.resume_thread_with_history(config, initial_history, auth_manager, false)
+        Box::pin(self.resume_thread_with_history(config, initial_history, auth_manager, false))
             .await
     }
@@ -369,17 +369,16 @@ impl ThreadManager {
         auth_manager: Arc<AuthManager>,
         persist_extended_history: bool,
     ) -> CodexResult<NewThread> {
-        self.state
-            .spawn_thread(
-                config,
-                initial_history,
-                auth_manager,
-                self.agent_control(),
-                Vec::new(),
-                persist_extended_history,
-                None,
-            )
-            .await
+        Box::pin(self.state.spawn_thread(
+            config,
+            initial_history,
+            auth_manager,
+            self.agent_control(),
+            Vec::new(),
+            persist_extended_history,
+            None,
+        ))
+        .await
     }
 
     /// Removes the thread from the manager's internal map, though the thread is stored
@@ -411,17 +410,16 @@ impl ThreadManager {
     ) -> CodexResult<NewThread> {
         let history = RolloutRecorder::get_rollout_history(&path).await?;
         let history = truncate_before_nth_user_message(history, nth_user_message);
-        self.state
-            .spawn_thread(
-                config,
-                history,
-                Arc::clone(&self.state.auth_manager),
-                self.agent_control(),
-                Vec::new(),
-                persist_extended_history,
-                None,
-            )
-            .await
+        Box::pin(self.state.spawn_thread(
+            config,
+            history,
+            Arc::clone(&self.state.auth_manager),
+            self.agent_control(),
+            Vec::new(),
+            persist_extended_history,
+            None,
+        ))
+        .await
     }
 
     pub(crate) fn agent_control(&self) -> AgentControl {
@@ -474,14 +472,14 @@ impl ThreadManagerState {
         config: Config,
         agent_control: AgentControl,
     ) -> CodexResult<NewThread> {
-        self.spawn_new_thread_with_source(
+        Box::pin(self.spawn_new_thread_with_source(
             config,
             agent_control,
             self.session_source.clone(),
             false,
             None,
             None,
-        )
+        ))
         .await
     }
@@ -494,7 +492,7 @@ impl ThreadManagerState {
         metrics_service_name: Option<String>,
         inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
     ) -> CodexResult<NewThread> {
-        self.spawn_thread_with_source(
+        Box::pin(self.spawn_thread_with_source(
             config,
             InitialHistory::New,
             Arc::clone(&self.auth_manager),
@@ -504,7 +502,7 @@ impl ThreadManagerState {
             persist_extended_history,
             metrics_service_name,
             inherited_shell_snapshot,
-        )
+        ))
         .await
     }
@@ -517,7 +515,7 @@ impl ThreadManagerState {
         inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
     ) -> CodexResult<NewThread> {
         let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
-        self.spawn_thread_with_source(
+        Box::pin(self.spawn_thread_with_source(
             config,
             initial_history,
             Arc::clone(&self.auth_manager),
@@ -527,7 +525,7 @@ impl ThreadManagerState {
             false,
             None,
             inherited_shell_snapshot,
-        )
+        ))
         .await
     }
@@ -540,7 +538,7 @@ impl ThreadManagerState {
         persist_extended_history: bool,
         inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
     ) -> CodexResult<NewThread> {
-        self.spawn_thread_with_source(
+        Box::pin(self.spawn_thread_with_source(
             config,
             initial_history,
             Arc::clone(&self.auth_manager),
@@ -550,7 +548,7 @@ impl ThreadManagerState {
             persist_extended_history,
             None,
             inherited_shell_snapshot,
-        )
+        ))
         .await
     }
@@ -566,7 +564,7 @@ impl ThreadManagerState {
         persist_extended_history: bool,
         metrics_service_name: Option<String>,
     ) -> CodexResult<NewThread> {
-        self.spawn_thread_with_source(
+        Box::pin(self.spawn_thread_with_source(
             config,
             initial_history,
             auth_manager,
@@ -576,7 +574,7 @@ impl ThreadManagerState {
             persist_extended_history,
             metrics_service_name,
             None,
-        )
+        ))
         .await
     }
```

### `core/tests/common/test_codex.rs`

```diff
@@ -105,7 +105,7 @@ impl TestCodexBuilder {
             Some(home) => home,
             None => Arc::new(TempDir::new()?),
         };
-        self.build_with_home(server, home, None).await
+        Box::pin(self.build_with_home(server, home, None)).await
     }
 
     pub async fn build_with_streaming_server(
@@ -117,8 +117,7 @@ impl TestCodexBuilder {
             Some(home) => home,
             None => Arc::new(TempDir::new()?),
         };
-        self.build_with_home_and_base_url(format!("{base_url}/v1"), home, None)
-            .await
+        Box::pin(self.build_with_home_and_base_url(format!("{base_url}/v1"), home, None)).await
     }
 
     pub async fn build_with_websocket_server(
@@ -139,8 +138,7 @@ impl TestCodexBuilder {
                 .enable(Feature::ResponsesWebsockets)
                 .expect("test config should allow feature update");
         }));
-        self.build_with_home_and_base_url(base_url, home, None)
-            .await
+        Box::pin(self.build_with_home_and_base_url(base_url, home, None)).await
     }
 
     pub async fn resume(
@@ -149,7 +147,7 @@ impl TestCodexBuilder {
         home: Arc<TempDir>,
         rollout_path: PathBuf,
     ) -> anyhow::Result<TestCodex> {
-        self.build_with_home(server, home, Some(rollout_path)).await
+        Box::pin(self.build_with_home(server, home, Some(rollout_path))).await
     }
 
     async fn build_with_home(
@@ -160,7 +158,7 @@ impl TestCodexBuilder {
     ) -> anyhow::Result<TestCodex> {
         let base_url = format!("{}/v1", server.uri());
         let (config, cwd) = self.prepare_config(base_url, &home).await?;
-        self.build_from_config(config, cwd, home, resume_from).await
+        Box::pin(self.build_from_config(config, cwd, home, resume_from)).await
     }
 
     async fn build_with_home_and_base_url(
@@ -170,7 +168,7 @@ impl TestCodexBuilder {
         resume_from: Option<PathBuf>,
     ) -> anyhow::Result<TestCodex> {
         let (config, cwd) = self.prepare_config(base_url, &home).await?;
-        self.build_from_config(config, cwd, home, resume_from).await
+        Box::pin(self.build_from_config(config, cwd, home, resume_from)).await
     }
 
     async fn build_from_config(
@@ -201,11 +199,14 @@ impl TestCodexBuilder {
         let new_conversation = match resume_from {
             Some(path) => {
                 let auth_manager = codex_core::test_support::auth_manager_from_auth(auth);
-                thread_manager
-                    .resume_thread_from_rollout(config.clone(), path, auth_manager)
-                    .await?
+                Box::pin(thread_manager.resume_thread_from_rollout(
+                    config.clone(),
+                    path,
+                    auth_manager,
+                ))
+                .await?
             }
-            None => thread_manager.start_thread(config.clone()).await?,
+            None => Box::pin(thread_manager.start_thread(config.clone())).await?,
         };
 
         Ok(TestCodex {
```

### `core/tests/suite/compact_resume_fork.rs`

```diff
@@ -157,7 +157,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
     user_turn(&base, "hello world").await;
     compact_conversation(&base).await;
     user_turn(&base, "AFTER_COMPACT").await;
-    let base_path = fetch_conversation_path(&base).await;
+    let base_path = fetch_conversation_path(&base);
     assert!(
         base_path.exists(),
         "compact+resume test expects base path {base_path:?} to exist",
@@ -165,7 +165,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
     let resumed = resume_conversation(&manager, &config, base_path).await;
     user_turn(&resumed, "AFTER_RESUME").await;
-    let resumed_path = fetch_conversation_path(&resumed).await;
+    let resumed_path = fetch_conversation_path(&resumed);
     assert!(
         resumed_path.exists(),
         "compact+resume test expects resumed path {resumed_path:?} to exist",
@@ -292,33 +292,10 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
     assert_eq!(requests.len(), 5);
 }
 
-#[test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 /// Scenario: after the forked branch is compacted, resuming again should reuse
 /// the compacted history and only append the new user message.
-fn compact_resume_after_second_compaction_preserves_history() -> Result<()> {
-    const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024;
-    let handle = std::thread::Builder::new()
-        .name("compact_resume_after_second_compaction_preserves_history".to_string())
-        .stack_size(TEST_STACK_SIZE_BYTES)
-        .spawn(|| -> Result<()> {
-            let runtime = tokio::runtime::Builder::new_multi_thread()
-                .worker_threads(2)
-                .thread_stack_size(TEST_STACK_SIZE_BYTES)
-                .enable_all()
-                .build()?;
-            runtime.block_on(compact_resume_after_second_compaction_preserves_history_impl())
-        })?;
-    match handle.join() {
-        Ok(result) => result,
-        Err(_) => Err(anyhow::anyhow!(
-            "compact_resume_after_second_compaction_preserves_history thread panicked"
-        )),
-    }
-}
-
-async fn compact_resume_after_second_compaction_preserves_history_impl() -> Result<()> {
+async fn compact_resume_after_second_compaction_preserves_history() -> Result<()> {
     if network_disabled() {
         println!("Skipping test because network is disabled in this sandbox");
         return Ok(());
@@ -335,7 +312,7 @@ async fn compact_resume_after_second_compaction_preserves_history_impl() -> Resu
     user_turn(&base, "hello world").await;
     compact_conversation(&base).await;
     user_turn(&base, "AFTER_COMPACT").await;
-    let base_path = fetch_conversation_path(&base).await;
+    let base_path = fetch_conversation_path(&base);
     assert!(
         base_path.exists(),
         "second compact test expects base path {base_path:?} to exist",
@@ -343,7 +320,7 @@ async fn compact_resume_after_second_compaction_preserves_history_impl() -> Resu
     let resumed = resume_conversation(&manager, &config, base_path).await;
     user_turn(&resumed, "AFTER_RESUME").await;
-    let resumed_path = fetch_conversation_path(&resumed).await;
+    let resumed_path = fetch_conversation_path(&resumed);
     assert!(
         resumed_path.exists(),
         "second compact test expects resumed path {resumed_path:?} to exist",
@@ -354,7 +331,7 @@ async fn compact_resume_after_second_compaction_preserves_history_impl() -> Resu
     compact_conversation(&forked).await;
     user_turn(&forked, "AFTER_COMPACT_2").await;
-    let forked_path = fetch_conversation_path(&forked).await;
+    let forked_path = fetch_conversation_path(&forked);
     assert!(
         forked_path.exists(),
         "second compact test expects forked path {forked_path:?} to exist",
@@ -558,7 +535,9 @@ async fn start_test_conversation(
             config.model = Some(model);
         }
     });
-    let test = builder.build(server).await.expect("create conversation");
+    let test = Box::pin(builder.build(server))
+        .await
+        .expect("create conversation");
     (test.home, test.config, test.thread_manager, test.codex)
 }
@@ -595,7 +574,7 @@ async fn compact_conversation(conversation: &Arc<CodexThread>) {
     wait_for_event(conversation, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
 }
 
-async fn fetch_conversation_path(conversation: &Arc<CodexThread>) -> std::path::PathBuf {
+fn fetch_conversation_path(conversation: &Arc<CodexThread>) -> std::path::PathBuf {
     conversation.rollout_path().expect("rollout path")
 }
@@ -607,8 +586,7 @@ async fn resume_conversation(
     let auth_manager = codex_core::test_support::auth_manager_from_auth(
         codex_core::CodexAuth::from_api_key("dummy"),
     );
-    manager
-        .resume_thread_from_rollout(config.clone(), path, auth_manager)
+    Box::pin(manager.resume_thread_from_rollout(config.clone(), path, auth_manager))
         .await
         .expect("resume conversation")
         .thread
@@ -621,8 +599,7 @@ async fn fork_thread(
     path: std::path::PathBuf,
     nth_user_message: usize,
 ) -> Arc<CodexThread> {
-    manager
-        .fork_thread(nth_user_message, config.clone(), path, false)
+    Box::pin(manager.fork_thread(nth_user_message, config.clone(), path, false))
        .await
        .expect("fork conversation")
        .thread
```