Compare commits

...

13 Commits

Author SHA1 Message Date
Eric Traut
036bf50597 More fixes 2026-02-06 13:24:10 -08:00
Eric Traut
6850000109 Fix lint 2026-02-06 13:16:14 -08:00
Eric Traut
aa83b6d468 Fixed lint 2026-02-06 13:08:13 -08:00
Eric Traut
557e65044b More improvements and fixes 2026-02-06 12:50:39 -08:00
Eric Traut
317e2080dc Code fix 2026-02-06 12:42:25 -08:00
Eric Traut
d03e67e52a Code review feedback 2026-02-06 12:37:53 -08:00
Eric Traut
a9144cd82f More fixes 2026-02-06 12:23:54 -08:00
Eric Traut
9143f0c03a Test fixes 2026-02-06 12:13:22 -08:00
Eric Traut
717b1c9839 Fix lint 2026-02-06 11:42:21 -08:00
Eric Traut
c268d55764 Merge branch 'main' into etraut/rollout_creation_strategy 2026-02-06 11:29:43 -08:00
Eric Traut
71edf960e0 Add comment 2026-02-06 11:23:30 -08:00
Eric Traut
e6ade2d748 Materialize rollout state on-demand for v2 thread APIs
### Change summary

Defer rollout file creation until needed.
* Add a core API to force rollout persistence for loaded non-ephemeral threads:
* seeds initial context if needed
* flushes rollout and returns persisted path

Add concurrency guard to make lazy rollout initialization idempotent under concurrent calls.

Add centralized app-server rollout-path resolver that:
* uses in-memory thread state when loaded
* forces persistence on demand for rollout-dependent calls
* falls back to on-disk lookup for unloaded threads
* maps ephemeral threads to invalid-request errors for rollout-dependent operations

Route rollout-dependent endpoints through the resolver (v2 + shared legacy surfaces), including:
* thread/archive
* thread/resume (thread-id path)
* thread/fork (thread-id path)
* resumeConversation
* forkConversation
* thread summary by thread id
* detached review parent-thread path resolution
* feedback include_logs rollout resolution

Remove stale cached rollout-path assumptions in rollback/detached-review flows by resolving via thread id when needed.

No wire-schema changes; behavior-only change.

v1 compatibility is not expanded in this PR.

### Tests updated/added

* thread_start: assert rollout is absent immediately after thread/start; created after first completed turn.
* thread_resume: resume by thread id succeeds for just-started thread via on-demand persistence; path-vs-thread-id precedence test updated.
* thread_fork: fork by thread id succeeds for just-started thread.
* thread_archive: archive succeeds for just-started thread and materializes before archive.
* thread_unarchive: adjusted for deferred creation timing.
* thread_rollback: rollback path no longer depends on stale cached rollout path.
* Detached review targeted test verified for lazy path behavior.
* Core tests for new persistence API
2026-02-06 11:19:11 -08:00
shijie-openai
71de55886e Feat: update rollout creation strategy 2026-02-06 09:13:00 -08:00
16 changed files with 799 additions and 242 deletions

View File

@@ -71,6 +71,7 @@ use codex_app_server_protocol::TurnPlanUpdatedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::build_turns_from_event_msgs;
use codex_core::CodexThread;
use codex_core::RolloutPersistenceStatus;
use codex_core::parse_command::shlex_join;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::CodexErrorInfo as CoreCodexErrorInfo;
@@ -1062,14 +1063,28 @@ pub(crate) async fn apply_bespoke_event_handling(
};
if let Some(request_id) = pending {
let Some(rollout_path) = conversation.rollout_path() else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "thread has no persisted rollout".to_string(),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
// Rollback responses are rebuilt from rollout-on-disk, and `thread/start`
// can defer rollout file creation, so force persistence before reading.
let rollout_path = match conversation.ensure_rollout_persisted().await {
Ok(RolloutPersistenceStatus::Persisted(path)) => path,
Ok(RolloutPersistenceStatus::Ephemeral) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "thread has no persisted rollout".to_string(),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to persist rollout for rollback: {err}"),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
}
};
let response = match read_summary_from_rollout(
rollout_path.as_path(),

View File

@@ -154,6 +154,7 @@ use codex_core::CodexThread;
use codex_core::Cursor as RolloutCursor;
use codex_core::InitialHistory;
use codex_core::NewThread;
use codex_core::RolloutPersistenceStatus;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
use codex_core::SteerInputError;
@@ -1740,15 +1741,13 @@ impl CodexMessageProcessor {
} = new_thread;
let rollout_path = match session_configured.rollout_path {
Some(path) => path,
None => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "rollout path missing for v1 conversation".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
None => match self.resolve_rollout_path_for_thread_id(thread_id).await {
Ok(path) => path,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
},
};
let response = NewConversationResponse {
conversation_id: thread_id,
@@ -1845,7 +1844,7 @@ impl CodexMessageProcessor {
match self
.thread_manager
.start_thread_with_tools(config, core_dynamic_tools)
.start_thread_with_tools_deferred_rollout(config, core_dynamic_tools)
.await
{
Ok(new_conv) => {
@@ -1973,29 +1972,13 @@ impl CodexMessageProcessor {
}
};
let rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await
{
Ok(Some(p)) => p,
Ok(None) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no rollout found for thread id {thread_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("failed to locate thread id {thread_id}: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let rollout_path = match self.resolve_rollout_path_for_thread_id(thread_id).await {
Ok(path) => path,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
};
match self.archive_thread_common(thread_id, &rollout_path).await {
Ok(()) => {
@@ -2587,27 +2570,13 @@ impl CodexMessageProcessor {
}
};
let path = match find_thread_path_by_id_str(
&self.config.codex_home,
&existing_thread_id.to_string(),
)
.await
let path = match self
.resolve_rollout_path_for_thread_id(existing_thread_id)
.await
{
Ok(Some(p)) => p,
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for thread id {existing_thread_id}"),
)
.await;
return;
}
Ok(path) => path,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {existing_thread_id}: {err}"),
)
.await;
self.outgoing.send_error(request_id, err).await;
return;
}
};
@@ -2773,30 +2742,17 @@ impl CodexMessageProcessor {
}
};
match find_thread_path_by_id_str(
&self.config.codex_home,
&existing_thread_id.to_string(),
)
.await
let rollout_path = match self
.resolve_rollout_path_for_thread_id(existing_thread_id)
.await
{
Ok(Some(p)) => (p, Some(existing_thread_id)),
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for thread id {existing_thread_id}"),
)
.await;
return;
}
Ok(path) => path,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {existing_thread_id}: {err}"),
)
.await;
self.outgoing.send_error(request_id, err).await;
return;
}
}
};
(rollout_path, Some(existing_thread_id))
};
let history_cwd =
@@ -2969,21 +2925,12 @@ impl CodexMessageProcessor {
}
}
GetConversationSummaryParams::ThreadId { conversation_id } => {
match codex_core::find_thread_path_by_id_str(
&self.config.codex_home,
&conversation_id.to_string(),
)
.await
match self
.resolve_rollout_path_for_thread_id(conversation_id)
.await
{
Ok(Some(p)) => p,
_ => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"no rollout found for conversation id {conversation_id}"
),
data: None,
};
Ok(path) => path,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
@@ -3651,36 +3598,25 @@ impl CodexMessageProcessor {
}
}
} else if let Some(conversation_id) = conversation_id {
match find_thread_path_by_id_str(&self.config.codex_home, &conversation_id.to_string())
let found_path = match self
.resolve_rollout_path_for_thread_id(conversation_id)
.await
{
Ok(Some(found_path)) => {
match RolloutRecorder::get_rollout_history(&found_path).await {
Ok(initial_history) => initial_history,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!(
"failed to load rollout `{}` for conversation {conversation_id}: {err}",
found_path.display()
),
).await;
return;
}
}
}
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for conversation id {conversation_id}"),
)
.await;
Ok(path) => path,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
};
match RolloutRecorder::get_rollout_history(&found_path).await {
Ok(initial_history) => initial_history,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate conversation id {conversation_id}: {err}"),
format!(
"failed to load rollout `{}` for conversation {conversation_id}: {err}",
found_path.display()
),
)
.await;
return;
@@ -3849,27 +3785,17 @@ impl CodexMessageProcessor {
let (rollout_path, source_thread_id) = if let Some(path) = path {
(path, None)
} else if let Some(conversation_id) = conversation_id {
match find_thread_path_by_id_str(&self.config.codex_home, &conversation_id.to_string())
let rollout_path = match self
.resolve_rollout_path_for_thread_id(conversation_id)
.await
{
Ok(Some(found_path)) => (found_path, Some(conversation_id)),
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for conversation id {conversation_id}"),
)
.await;
return;
}
Ok(path) => path,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate conversation id {conversation_id}: {err}"),
)
.await;
self.outgoing.send_error(request_id, err).await;
return;
}
}
};
(rollout_path, Some(conversation_id))
} else {
self.send_invalid_request_error(
request_id,
@@ -4767,19 +4693,9 @@ impl CodexMessageProcessor {
review_request: ReviewRequest,
display_text: &str,
) -> std::result::Result<(), JSONRPCErrorError> {
let rollout_path =
find_thread_path_by_id_str(&self.config.codex_home, &parent_thread_id.to_string())
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to locate thread id {parent_thread_id}: {err}"),
data: None,
})?
.ok_or_else(|| JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no rollout found for thread id {parent_thread_id}"),
data: None,
})?;
let rollout_path = self
.resolve_rollout_path_for_thread_id(parent_thread_id)
.await?;
let mut config = self.config.as_ref().clone();
if let Some(review_model) = &config.review_model {
@@ -4813,7 +4729,23 @@ impl CodexMessageProcessor {
}
let fallback_provider = self.config.model_provider_id.as_str();
if let Some(rollout_path) = review_thread.rollout_path() {
let review_rollout_path = if let Some(path) = session_configured.rollout_path.clone() {
Some(path)
} else {
match self.resolve_rollout_path_for_thread_id(thread_id).await {
Ok(path) => Some(path),
Err(err) => {
tracing::warn!(
"review thread {} has no persisted rollout path: {}",
session_configured.session_id,
err.message
);
None
}
}
};
if let Some(rollout_path) = review_rollout_path {
match read_summary_from_rollout(rollout_path.as_path(), fallback_provider).await {
Ok(summary) => {
let thread = summary_to_thread(summary);
@@ -4830,11 +4762,6 @@ impl CodexMessageProcessor {
);
}
}
} else {
tracing::warn!(
"review thread {} has no rollout path",
session_configured.session_id
);
}
let turn_id = review_thread
@@ -5196,7 +5123,13 @@ impl CodexMessageProcessor {
let validated_rollout_path = if include_logs {
match conversation_id {
Some(conv_id) => self.resolve_rollout_path(conv_id).await,
Some(conv_id) => match self.resolve_rollout_path_for_thread_id(conv_id).await {
Ok(path) => Some(path),
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
},
None => None,
}
} else {
@@ -5245,10 +5178,46 @@ impl CodexMessageProcessor {
}
}
async fn resolve_rollout_path(&self, conversation_id: ThreadId) -> Option<PathBuf> {
match self.thread_manager.get_thread(conversation_id).await {
Ok(conv) => conv.rollout_path(),
Err(_) => None,
async fn resolve_rollout_path_for_thread_id(
&self,
thread_id: ThreadId,
) -> Result<PathBuf, JSONRPCErrorError> {
if let Ok(thread) = self.thread_manager.get_thread(thread_id).await {
match thread.ensure_rollout_persisted().await {
Ok(RolloutPersistenceStatus::Persisted(path)) => return Ok(path),
Ok(RolloutPersistenceStatus::Ephemeral) => {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"thread `{thread_id}` is ephemeral and has no persisted rollout"
),
data: None,
});
}
Err(err) => {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to persist rollout for thread id {thread_id}: {err}"
),
data: None,
});
}
}
}
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await {
Ok(Some(path)) => Ok(path),
Ok(None) => Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no rollout found for thread id {thread_id}"),
data: None,
}),
Err(err) => Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("failed to locate thread id {thread_id}: {err}"),
data: None,
}),
}
}
}

View File

@@ -1,5 +1,6 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -18,7 +19,8 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
#[tokio::test]
async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let server = create_mock_responses_server_repeating_assistant("Done").await;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -38,16 +40,6 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
assert!(!thread.id.is_empty());
// Locate the rollout path recorded for this thread id.
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected rollout path for thread id to exist");
assert!(
rollout_path.exists(),
"expected {} to exist",
rollout_path.display()
);
// Archive the thread.
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
@@ -61,16 +53,17 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
// Locate the rollout path recorded for this thread id.
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
assert!(
rollout_path.is_none(),
"archived thread should no longer have an active rollout path"
);
// Verify file moved.
let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR);
// The archived file keeps the original filename (rollout-...-<id>.jsonl).
let archived_rollout_path =
archived_directory.join(rollout_path.file_name().expect("rollout file name"));
assert!(
!rollout_path.exists(),
"expected rollout path {} to be moved",
rollout_path.display()
);
find_archived_rollout_by_thread_id(&archived_directory, &thread.id)?;
assert!(
archived_rollout_path.exists(),
"expected archived rollout path {} to exist",
@@ -80,14 +73,45 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
Ok(())
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(config_toml, config_contents())
std::fs::write(config_toml, config_contents(server_uri))
}
fn config_contents() -> &'static str {
r#"model = "mock-model"
fn config_contents(server_uri: &str) -> String {
format!(
r#"model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
)
}
fn find_archived_rollout_by_thread_id(
archived_directory: &Path,
thread_id: &str,
) -> std::io::Result<std::path::PathBuf> {
let entries = std::fs::read_dir(archived_directory)?;
for entry in entries {
let path = entry?.path();
if let Some(file_name) = path.file_name().and_then(|name| name.to_str())
&& file_name.ends_with(&format!("{thread_id}.jsonl"))
{
return Ok(path);
}
}
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("no archived rollout found for thread id {thread_id}"),
))
}

View File

@@ -10,6 +10,8 @@ use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
@@ -117,6 +119,50 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_fork_materializes_rollout_for_fresh_thread() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: thread.id.clone(),
..Default::default()
})
.await?;
let fork_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
)
.await??;
let ThreadForkResponse {
thread: forked_thread,
..
} = to_response::<ThreadForkResponse>(fork_resp)?;
assert_ne!(forked_thread.id, thread.id);
assert!(
forked_thread.path.is_some(),
"forked thread should have a persisted rollout path"
);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");

View File

@@ -72,9 +72,14 @@ async fn thread_resume_returns_original_thread() -> Result<()> {
let ThreadResumeResponse {
thread: resumed, ..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
let mut expected = thread;
expected.updated_at = resumed.updated_at;
assert_eq!(resumed, expected);
assert_eq!(resumed.id, thread.id);
assert_eq!(resumed.model_provider, thread.model_provider);
assert_eq!(resumed.source, thread.source);
assert_eq!(resumed.cwd, thread.cwd);
assert!(
resumed.path.is_some(),
"thread/resume should materialize rollout persistence on demand"
);
Ok(())
}
@@ -322,7 +327,23 @@ async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let thread_path = thread.path.clone().expect("thread path");
let first_resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
..Default::default()
})
.await?;
let first_resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(first_resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed_by_id,
..
} = to_response::<ThreadResumeResponse>(first_resume_resp)?;
let thread_path = resumed_by_id.path.clone().expect("thread path");
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: "not-a-valid-thread-id".to_string(),
@@ -339,7 +360,7 @@ async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
let ThreadResumeResponse {
thread: resumed, ..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
let mut expected = thread;
let mut expected = resumed_by_id;
expected.updated_at = resumed.updated_at;
assert_eq!(resumed, expected);

View File

@@ -9,7 +9,10 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput;
use codex_core::config::set_project_trust_level;
use codex_core::find_thread_path_by_id_str;
use codex_protocol::config_types::TrustLevel;
use codex_protocol::openai_models::ReasoningEffort;
use std::path::Path;
@@ -59,6 +62,11 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
thread.created_at > 0,
"created_at should be a positive UNIX timestamp"
);
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
assert!(
rollout_path.is_none(),
"fresh threads should not create rollout files until first turn"
);
// A corresponding thread/started notification should arrive.
let notif: JSONRPCNotification = timeout(
@@ -70,6 +78,33 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
serde_json::from_value(notif.params.expect("params must be present"))?;
assert_eq!(started.thread, thread);
// First turn should create the rollout file lazily.
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let _: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
let _: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
assert!(
rollout_path.is_some(),
"first completed turn should create rollout file"
);
Ok(())
}

View File

@@ -1,6 +1,8 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadArchiveParams;
@@ -9,6 +11,8 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use std::fs::FileTimes;
@@ -24,7 +28,8 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
#[tokio::test]
async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let server = create_mock_responses_server_repeating_assistant("Done").await;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -42,6 +47,27 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let _: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
let _: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected rollout path for thread id to exist");
@@ -108,14 +134,25 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
Ok(())
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(config_toml, config_contents())
std::fs::write(config_toml, config_contents(server_uri))
}
fn config_contents() -> &'static str {
r#"model = "mock-model"
fn config_contents(server_uri: &str) -> String {
format!(
r#"model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
)
}

View File

@@ -230,6 +230,9 @@
"powershell_utf8": {
"type": "boolean"
},
"remote_compaction": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},
@@ -1270,6 +1273,9 @@
"powershell_utf8": {
"type": "boolean"
},
"remote_compaction": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},

View File

@@ -104,6 +104,7 @@ use crate::client::ModelClient;
use crate::client::ModelClientSession;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::codex_thread::RolloutPersistenceStatus;
use crate::codex_thread::ThreadConfigSnapshot;
use crate::compact::collect_user_messages;
use crate::config::Config;
@@ -266,6 +267,7 @@ impl Codex {
skills_manager: Arc<SkillsManager>,
file_watcher: Arc<FileWatcher>,
conversation_history: InitialHistory,
defer_new_rollout_creation: bool,
session_source: SessionSource,
agent_control: AgentControl,
dynamic_tools: Vec<DynamicToolSpec>,
@@ -314,7 +316,7 @@ impl Codex {
// Resolve base instructions for the session. Priority order:
// 1. config.base_instructions override
// 2. conversation history => session_meta.base_instructions
// 3. base_intructions for current model
// 3. base_instructions for current model
let model_info = models_manager.get_model_info(model.as_str(), &config).await;
let base_instructions = config
.base_instructions
@@ -399,6 +401,7 @@ impl Codex {
tx_event.clone(),
agent_status_tx.clone(),
conversation_history,
defer_new_rollout_creation,
session_source_clone,
skills_manager,
file_watcher,
@@ -483,6 +486,12 @@ impl Codex {
pub(crate) fn state_db(&self) -> Option<state_db::StateDbHandle> {
self.session.state_db()
}
/// Ensure this thread's rollout is persisted and return whether it is
/// persisted or ephemeral.
pub(crate) async fn ensure_rollout_persisted(&self) -> CodexResult<RolloutPersistenceStatus> {
self.session.ensure_rollout_persisted().await
}
}
/// Context for an initialized model agent
@@ -499,6 +508,7 @@ pub(crate) struct Session {
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(crate) services: SessionServices,
defer_new_rollout_creation: bool,
next_internal_sub_id: AtomicU64,
}
@@ -840,6 +850,7 @@ impl Session {
tx_event: Sender<Event>,
agent_status: watch::Sender<AgentStatus>,
initial_history: InitialHistory,
defer_new_rollout_creation: bool,
session_source: SessionSource,
skills_manager: Arc<SkillsManager>,
file_watcher: Arc<FileWatcher>,
@@ -859,26 +870,9 @@ impl Session {
let forked_from_id = initial_history.forked_from_id();
let (conversation_id, rollout_params) = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => {
let conversation_id = ThreadId::default();
(
conversation_id,
RolloutRecorderParams::new(
conversation_id,
forked_from_id,
session_source,
BaseInstructions {
text: session_configuration.base_instructions.clone(),
},
session_configuration.dynamic_tools.clone(),
),
)
}
InitialHistory::Resumed(resumed_history) => (
resumed_history.conversation_id,
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
),
let conversation_id = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => ThreadId::default(),
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
};
let state_builder = match &initial_history {
InitialHistory::Resumed(resumed) => metadata::builder_from_items(
@@ -895,17 +889,63 @@ impl Session {
// - load history metadata
let rollout_fut = async {
if config.ephemeral {
Ok::<_, anyhow::Error>((None, None))
Ok::<_, anyhow::Error>((None, None, None))
} else {
let state_db_ctx = state_db::init_if_enabled(&config, None).await;
let rollout_recorder = RolloutRecorder::new(
&config,
rollout_params,
state_db_ctx.clone(),
state_builder.clone(),
)
.await?;
Ok((Some(rollout_recorder), state_db_ctx))
match &initial_history {
InitialHistory::New => {
let rollout_params = RolloutRecorderParams::new(
conversation_id,
forked_from_id,
session_source.clone(),
BaseInstructions {
text: session_configuration.base_instructions.clone(),
},
session_configuration.dynamic_tools.clone(),
);
if defer_new_rollout_creation {
Ok((None, state_db_ctx, Some(rollout_params)))
} else {
let rollout_recorder = RolloutRecorder::new(
&config,
rollout_params,
state_db_ctx.clone(),
state_builder.clone(),
)
.await?;
Ok((Some(rollout_recorder), state_db_ctx, None))
}
}
InitialHistory::Forked(_) => {
let rollout_params = RolloutRecorderParams::new(
conversation_id,
forked_from_id,
session_source.clone(),
BaseInstructions {
text: session_configuration.base_instructions.clone(),
},
session_configuration.dynamic_tools.clone(),
);
let rollout_recorder = RolloutRecorder::new(
&config,
rollout_params,
state_db_ctx.clone(),
state_builder.clone(),
)
.await?;
Ok((Some(rollout_recorder), state_db_ctx, None))
}
InitialHistory::Resumed(resumed_history) => {
let rollout_recorder = RolloutRecorder::new(
&config,
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
state_db_ctx.clone(),
state_builder.clone(),
)
.await?;
Ok((Some(rollout_recorder), state_db_ctx, None))
}
}
}
};
@@ -930,10 +970,11 @@ impl Session {
(auth, mcp_servers, auth_statuses),
) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut);
let (rollout_recorder, state_db_ctx) = rollout_recorder_and_state_db.map_err(|e| {
error!("failed to initialize rollout recorder: {e:#}");
e
})?;
let (rollout_recorder, state_db_ctx, pending_rollout_create) =
rollout_recorder_and_state_db.map_err(|e| {
error!("failed to initialize rollout recorder: {e:#}");
e
})?;
let rollout_path = rollout_recorder
.as_ref()
.map(|rec| rec.rollout_path.clone());
@@ -1043,6 +1084,8 @@ impl Session {
),
hooks: Hooks::new(config.as_ref()),
rollout: Mutex::new(rollout_recorder),
rollout_init_lock: Mutex::new(()),
pending_rollout_create: Mutex::new(pending_rollout_create),
user_shell: Arc::new(default_shell),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
@@ -1076,6 +1119,7 @@ impl Session {
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
services,
defer_new_rollout_creation,
next_internal_sub_id: AtomicU64::new(0),
});
@@ -1192,6 +1236,89 @@ impl Session {
}
}
/// Lazily instantiate the rollout recorder when deferred creation is enabled.
///
/// This is idempotent and concurrency-safe:
/// - If a recorder already exists, it returns immediately.
/// - If initialization fails, pending creation params are restored so a
/// later turn can retry.
async fn ensure_rollout_initialized_for_turn(
&self,
turn_context: &TurnContext,
) -> std::io::Result<()> {
let _rollout_init_lock = self.services.rollout_init_lock.lock().await;
{
let rollout = self.services.rollout.lock().await;
if rollout.is_some() {
return Ok(());
}
}
let params = {
let mut pending = self.services.pending_rollout_create.lock().await;
pending.take()
};
let Some(params) = params else {
return Ok(());
};
let recorder = match RolloutRecorder::new(
turn_context.config.as_ref(),
params.clone(),
self.services.state_db.clone(),
None,
)
.await
{
Ok(recorder) => recorder,
Err(err) => {
let mut pending = self.services.pending_rollout_create.lock().await;
if pending.is_none() {
*pending = Some(params);
}
return Err(err);
}
};
let mut rollout = self.services.rollout.lock().await;
if rollout.is_none() {
*rollout = Some(recorder);
}
Ok(())
}
/// Ensure this thread has a persisted rollout on disk and return its status.
///
/// For non-ephemeral threads this guarantees a concrete rollout path by:
/// initializing the recorder if needed, seeding initial context if needed,
/// and flushing buffered rollout writes.
async fn ensure_rollout_persisted(&self) -> CodexResult<RolloutPersistenceStatus> {
let turn_context = self.new_default_turn().await;
if turn_context.config.ephemeral {
return Ok(RolloutPersistenceStatus::Ephemeral);
}
self.ensure_rollout_initialized_for_turn(&turn_context)
.await?;
self.seed_initial_context_if_needed(&turn_context).await;
self.flush_rollout().await;
let rollout_path = {
let rollout = self.services.rollout.lock().await;
rollout
.as_ref()
.map(|recorder| recorder.rollout_path.clone())
};
let Some(rollout_path) = rollout_path else {
return Err(CodexErr::Fatal(format!(
"rollout recorder missing for thread {}",
self.conversation_id
)));
};
Ok(RolloutPersistenceStatus::Persisted(rollout_path))
}
fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
@@ -1220,15 +1347,21 @@ impl Session {
let turn_context = self.new_default_turn().await;
match conversation_history {
InitialHistory::New => {
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(&turn_context).await;
self.record_conversation_items(&turn_context, &items).await;
{
let mut state = self.state.lock().await;
state.initial_context_seeded = true;
if self.defer_new_rollout_creation {
// Defer initial context persistence until the first turn starts.
// This lets turn/start overrides be reflected in the seeded context.
self.flush_rollout().await;
} else {
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(&turn_context).await;
self.record_conversation_items(&turn_context, &items).await;
{
let mut state = self.state.lock().await;
state.initial_context_seeded = true;
}
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
self.flush_rollout().await;
}
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
self.flush_rollout().await;
}
InitialHistory::Resumed(resumed_history) => {
let rollout_items = resumed_history.history;
@@ -2645,8 +2778,15 @@ impl Session {
}
async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiver<Submission>) {
// Seed with context in case there is an OverrideTurnContext first.
let mut previous_context: Option<Arc<TurnContext>> = Some(sess.new_default_turn().await);
// Non-deferred sessions persist an initial baseline at thread start, so we
// keep the default turn context for first-turn diffing.
// Deferred sessions seed initial context on first turn with the current
// settings, so a baseline here would duplicate first-turn updates.
let mut previous_context: Option<Arc<TurnContext>> = if sess.defer_new_rollout_creation {
None
} else {
Some(sess.new_default_turn().await)
};
// To break out of this loop, send Op::Shutdown.
while let Ok(sub) = rx_sub.recv().await {
@@ -2926,6 +3066,20 @@ mod handlers {
// Attempt to inject input into current task.
if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await {
if let Err(err) = sess
.ensure_rollout_initialized_for_turn(&current_context)
.await
{
sess.send_event_raw(Event {
id: current_context.sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: format!("failed to initialize rollout recorder: {err}"),
codex_error_info: Some(CodexErrorInfo::Other),
}),
})
.await;
return;
}
sess.seed_initial_context_if_needed(&current_context).await;
let resumed_model = sess.take_pending_resume_previous_model().await;
let update_items = sess.build_settings_update_items(
@@ -4891,6 +5045,7 @@ mod tests {
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputPayload;
use crate::codex_thread::RolloutPersistenceStatus;
use crate::protocol::CompactedItem;
use crate::protocol::CreditsSnapshot;
use crate::protocol::InitialHistory;
@@ -5091,6 +5246,26 @@ mod tests {
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
async fn record_initial_history_new_defers_initial_context_until_first_turn() {
let (mut session, turn_context) = make_session_and_context().await;
session.defer_new_rollout_creation = true;
{
let mut state = session.state.lock().await;
state.initial_context_seeded = false;
}
session.record_initial_history(InitialHistory::New).await;
let history = session.state.lock().await.clone_history();
assert_eq!(history.raw_items(), Vec::<ResponseItem>::new());
session.seed_initial_context_if_needed(&turn_context).await;
let expected = session.build_initial_context(&turn_context).await;
let history = session.state.lock().await.clone_history();
assert_eq!(history.raw_items(), expected);
}
#[tokio::test]
async fn resumed_history_seeds_initial_context_on_first_turn_only() {
let (session, turn_context) = make_session_and_context().await;
@@ -5248,6 +5423,170 @@ mod tests {
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
async fn lazy_rollout_creation_writes_session_meta_then_initial_context_then_turn_context() {
let (session, turn_context) = make_session_and_context().await;
{
let mut state = session.state.lock().await;
state.initial_context_seeded = false;
}
let base_instructions = session.get_base_instructions().await;
let session_source = {
let state = session.state.lock().await;
state.session_configuration.session_source.clone()
};
{
let mut pending = session.services.pending_rollout_create.lock().await;
*pending = Some(RolloutRecorderParams::new(
session.conversation_id,
None,
session_source,
base_instructions,
Vec::new(),
));
}
let conversation_id = session.conversation_id.to_string();
let rollout_path = crate::rollout::find_thread_path_by_id_str(
&turn_context.config.codex_home,
&conversation_id,
)
.await
.expect("lookup rollout path before first turn");
assert_eq!(rollout_path, None);
session
.ensure_rollout_initialized_for_turn(&turn_context)
.await
.expect("initialize rollout recorder lazily");
session.seed_initial_context_if_needed(&turn_context).await;
session
.persist_rollout_items(&[RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
sandbox_policy: turn_context.sandbox_policy.clone(),
model: turn_context.model_info.slug.clone(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: turn_context.user_instructions.clone(),
developer_instructions: turn_context.developer_instructions.clone(),
final_output_json_schema: turn_context.final_output_json_schema.clone(),
truncation_policy: Some(turn_context.truncation_policy.into()),
})])
.await;
session.flush_rollout().await;
let rollout_path = crate::rollout::find_thread_path_by_id_str(
&turn_context.config.codex_home,
&conversation_id,
)
.await
.expect("lookup rollout path after first turn")
.expect("rollout path should exist");
let (items, _, _) = RolloutRecorder::load_rollout_items(&rollout_path)
.await
.expect("load rollout items");
assert!(matches!(items.first(), Some(RolloutItem::SessionMeta(_))));
let initial_context_len = session.build_initial_context(&turn_context).await.len();
assert_eq!(
items
.iter()
.skip(1)
.take(initial_context_len)
.filter(|item| matches!(item, RolloutItem::ResponseItem(_)))
.count(),
initial_context_len
);
assert!(matches!(items.last(), Some(RolloutItem::TurnContext(_))));
}
#[tokio::test]
async fn ensure_rollout_persisted_is_idempotent() {
let (session, turn_context) = make_session_and_context().await;
{
let mut state = session.state.lock().await;
state.initial_context_seeded = false;
}
let base_instructions = session.get_base_instructions().await;
let session_source = {
let state = session.state.lock().await;
state.session_configuration.session_source.clone()
};
{
let mut pending = session.services.pending_rollout_create.lock().await;
*pending = Some(RolloutRecorderParams::new(
session.conversation_id,
None,
session_source,
base_instructions,
Vec::new(),
));
}
let first = session
.ensure_rollout_persisted()
.await
.expect("first persistence call");
let second = session
.ensure_rollout_persisted()
.await
.expect("second persistence call");
let first_path = match first {
RolloutPersistenceStatus::Persisted(path) => path,
RolloutPersistenceStatus::Ephemeral => panic!("expected persisted rollout"),
};
let second_path = match second {
RolloutPersistenceStatus::Persisted(path) => path,
RolloutPersistenceStatus::Ephemeral => panic!("expected persisted rollout"),
};
assert_eq!(first_path, second_path);
let (items, _, _) = RolloutRecorder::load_rollout_items(&first_path)
.await
.expect("load rollout items");
assert!(matches!(items.first(), Some(RolloutItem::SessionMeta(_))));
let initial_context_len = session.build_initial_context(&turn_context).await.len();
assert_eq!(
items
.iter()
.filter(|item| matches!(item, RolloutItem::ResponseItem(_)))
.count(),
initial_context_len
);
}
#[tokio::test]
async fn ensure_rollout_persisted_returns_ephemeral_when_session_is_ephemeral() {
let (session, _) = make_session_and_context().await;
{
let mut state = session.state.lock().await;
let mut config = (*state.session_configuration.original_config_do_not_use).clone();
config.ephemeral = true;
state.session_configuration.original_config_do_not_use = Arc::new(config);
}
let outcome = session
.ensure_rollout_persisted()
.await
.expect("ephemeral persistence check");
assert_eq!(outcome, RolloutPersistenceStatus::Ephemeral);
let codex_home = session.codex_home().await;
let rollout_path = crate::rollout::find_thread_path_by_id_str(
&codex_home,
&session.conversation_id.to_string(),
)
.await
.expect("lookup rollout path");
assert_eq!(rollout_path, None);
}
#[tokio::test]
async fn thread_rollback_drops_last_turn_from_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
@@ -5804,6 +6143,8 @@ mod tests {
),
hooks: Hooks::new(&config),
rollout: Mutex::new(None),
rollout_init_lock: Mutex::new(()),
pending_rollout_create: Mutex::new(None),
user_shell: Arc::new(default_user_shell()),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
@@ -5847,6 +6188,7 @@ mod tests {
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
services,
defer_new_rollout_creation: false,
next_internal_sub_id: AtomicU64::new(0),
};
@@ -5934,6 +6276,8 @@ mod tests {
),
hooks: Hooks::new(&config),
rollout: Mutex::new(None),
rollout_init_lock: Mutex::new(()),
pending_rollout_create: Mutex::new(None),
user_shell: Arc::new(default_user_shell()),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
@@ -5977,6 +6321,7 @@ mod tests {
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
services,
defer_new_rollout_creation: false,
next_internal_sub_id: AtomicU64::new(0),
});

View File

@@ -56,6 +56,7 @@ pub(crate) async fn run_codex_thread_interactive(
Arc::clone(&parent_session.services.skills_manager),
Arc::clone(&parent_session.services.file_watcher),
initial_history.unwrap_or(InitialHistory::New),
false,
SessionSource::SubAgent(SubAgentSource::Review),
parent_session.services.agent_control.clone(),
Vec::new(),

View File

@@ -16,6 +16,12 @@ use tokio::sync::watch;
use crate::state_db::StateDbHandle;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RolloutPersistenceStatus {
Persisted(PathBuf),
Ephemeral,
}
#[derive(Clone, Debug)]
pub struct ThreadConfigSnapshot {
pub model: String,
@@ -76,6 +82,11 @@ impl CodexThread {
self.rollout_path.clone()
}
/// Ensure this thread has a persisted rollout and return its status.
pub async fn ensure_rollout_persisted(&self) -> CodexResult<RolloutPersistenceStatus> {
self.codex.ensure_rollout_persisted().await
}
pub fn state_db(&self) -> Option<StateDbHandle> {
self.codex.state_db()
}

View File

@@ -97,6 +97,8 @@ pub enum Feature {
WindowsSandboxElevated,
/// Refresh remote models and emit AppReady once the list is available.
RemoteModels,
/// Enable remote compaction via the Responses API.
RemoteCompaction,
/// Experimental shell snapshotting.
ShellSnapshot,
/// Enable runtime metrics snapshots via a manual reader.
@@ -495,6 +497,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: true,
},
FeatureSpec {
id: Feature::RemoteCompaction,
key: "remote_compaction",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::PowershellUtf8,
key: "powershell_utf8",

View File

@@ -17,6 +17,7 @@ pub use codex::SteerInputError;
mod codex_thread;
mod compact_remote;
pub use codex_thread::CodexThread;
pub use codex_thread::RolloutPersistenceStatus;
pub use codex_thread::ThreadConfigSnapshot;
mod agent;
mod codex_delegate;

View File

@@ -10,6 +10,7 @@ use crate::file_watcher::FileWatcher;
use crate::hooks::Hooks;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::models_manager::manager::ModelsManager;
use crate::rollout::RolloutRecorderParams;
use crate::skills::SkillsManager;
use crate::state_db::StateDbHandle;
use crate::tools::sandboxing::ApprovalStore;
@@ -26,6 +27,8 @@ pub(crate) struct SessionServices {
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) hooks: Hooks,
pub(crate) rollout: Mutex<Option<RolloutRecorder>>,
pub(crate) rollout_init_lock: Mutex<()>,
pub(crate) pending_rollout_create: Mutex<Option<RolloutRecorderParams>>,
pub(crate) user_shell: Arc<crate::shell::Shell>,
pub(crate) show_raw_agent_reasoning: bool,
pub(crate) exec_policy: ExecPolicyManager,

View File

@@ -91,6 +91,12 @@ pub struct NewThread {
pub session_configured: SessionConfiguredEvent,
}
struct SpawnThreadWithSourceOptions {
session_source: SessionSource,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
defer_new_rollout_creation: bool,
}
/// [`ThreadManager`] is responsible for creating threads and maintaining
/// them in memory.
pub struct ThreadManager {
@@ -265,6 +271,24 @@ impl ThreadManager {
Arc::clone(&self.state.auth_manager),
self.agent_control(),
dynamic_tools,
false,
)
.await
}
pub async fn start_thread_with_tools_deferred_rollout(
&self,
config: Config,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
) -> CodexResult<NewThread> {
self.state
.spawn_thread(
config,
InitialHistory::New,
Arc::clone(&self.state.auth_manager),
self.agent_control(),
dynamic_tools,
true,
)
.await
}
@@ -293,6 +317,7 @@ impl ThreadManager {
auth_manager,
self.agent_control(),
Vec::new(),
false,
)
.await
}
@@ -332,6 +357,7 @@ impl ThreadManager {
Arc::clone(&self.state.auth_manager),
self.agent_control(),
Vec::new(),
false,
)
.await
}
@@ -399,8 +425,11 @@ impl ThreadManagerState {
InitialHistory::New,
Arc::clone(&self.auth_manager),
agent_control,
session_source,
Vec::new(),
SpawnThreadWithSourceOptions {
session_source,
dynamic_tools: Vec::new(),
defer_new_rollout_creation: false,
},
)
.await
}
@@ -413,26 +442,29 @@ impl ThreadManagerState {
auth_manager: Arc<AuthManager>,
agent_control: AgentControl,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
defer_new_rollout_creation: bool,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
initial_history,
auth_manager,
agent_control,
self.session_source.clone(),
dynamic_tools,
SpawnThreadWithSourceOptions {
session_source: self.session_source.clone(),
dynamic_tools,
defer_new_rollout_creation,
},
)
.await
}
pub(crate) async fn spawn_thread_with_source(
async fn spawn_thread_with_source(
&self,
config: Config,
initial_history: InitialHistory,
auth_manager: Arc<AuthManager>,
agent_control: AgentControl,
session_source: SessionSource,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
options: SpawnThreadWithSourceOptions,
) -> CodexResult<NewThread> {
self.file_watcher.register_config(&config);
let CodexSpawnOk {
@@ -444,9 +476,10 @@ impl ThreadManagerState {
Arc::clone(&self.skills_manager),
Arc::clone(&self.file_watcher),
initial_history,
session_source,
options.defer_new_rollout_creation,
options.session_source,
agent_control,
dynamic_tools,
options.dynamic_tools,
)
.await?;
self.finalize_thread_spawn(codex, thread_id).await

View File

@@ -4,6 +4,8 @@ use std::fs;
use anyhow::Result;
use codex_core::CodexAuth;
#[allow(unused_imports)]
use codex_core::features::Feature;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ItemCompletedEvent;
use codex_core::protocol::ItemStartedEvent;