mirror of
https://github.com/openai/codex.git
synced 2026-04-27 18:01:04 +03:00
Compare commits
13 Commits
pr12584
...
etraut/rol
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
036bf50597 | ||
|
|
6850000109 | ||
|
|
aa83b6d468 | ||
|
|
557e65044b | ||
|
|
317e2080dc | ||
|
|
d03e67e52a | ||
|
|
a9144cd82f | ||
|
|
9143f0c03a | ||
|
|
717b1c9839 | ||
|
|
c268d55764 | ||
|
|
71edf960e0 | ||
|
|
e6ade2d748 | ||
|
|
71de55886e |
@@ -71,6 +71,7 @@ use codex_app_server_protocol::TurnPlanUpdatedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::build_turns_from_event_msgs;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::RolloutPersistenceStatus;
|
||||
use codex_core::parse_command::shlex_join;
|
||||
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_core::protocol::CodexErrorInfo as CoreCodexErrorInfo;
|
||||
@@ -1062,14 +1063,28 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
};
|
||||
|
||||
if let Some(request_id) = pending {
|
||||
let Some(rollout_path) = conversation.rollout_path() else {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "thread has no persisted rollout".to_string(),
|
||||
data: None,
|
||||
};
|
||||
outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
// Rollback responses are rebuilt from rollout-on-disk, and `thread/start`
|
||||
// can defer rollout file creation, so force persistence before reading.
|
||||
let rollout_path = match conversation.ensure_rollout_persisted().await {
|
||||
Ok(RolloutPersistenceStatus::Persisted(path)) => path,
|
||||
Ok(RolloutPersistenceStatus::Ephemeral) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "thread has no persisted rollout".to_string(),
|
||||
data: None,
|
||||
};
|
||||
outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to persist rollout for rollback: {err}"),
|
||||
data: None,
|
||||
};
|
||||
outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let response = match read_summary_from_rollout(
|
||||
rollout_path.as_path(),
|
||||
|
||||
@@ -154,6 +154,7 @@ use codex_core::CodexThread;
|
||||
use codex_core::Cursor as RolloutCursor;
|
||||
use codex_core::InitialHistory;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::RolloutPersistenceStatus;
|
||||
use codex_core::RolloutRecorder;
|
||||
use codex_core::SessionMeta;
|
||||
use codex_core::SteerInputError;
|
||||
@@ -1740,15 +1741,13 @@ impl CodexMessageProcessor {
|
||||
} = new_thread;
|
||||
let rollout_path = match session_configured.rollout_path {
|
||||
Some(path) => path,
|
||||
None => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: "rollout path missing for v1 conversation".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
None => match self.resolve_rollout_path_for_thread_id(thread_id).await {
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.outgoing.send_error(request_id, err).await;
|
||||
return;
|
||||
}
|
||||
},
|
||||
};
|
||||
let response = NewConversationResponse {
|
||||
conversation_id: thread_id,
|
||||
@@ -1845,7 +1844,7 @@ impl CodexMessageProcessor {
|
||||
|
||||
match self
|
||||
.thread_manager
|
||||
.start_thread_with_tools(config, core_dynamic_tools)
|
||||
.start_thread_with_tools_deferred_rollout(config, core_dynamic_tools)
|
||||
.await
|
||||
{
|
||||
Ok(new_conv) => {
|
||||
@@ -1973,29 +1972,13 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let rollout_path =
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
Ok(None) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("no rollout found for thread id {thread_id}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("failed to locate thread id {thread_id}: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let rollout_path = match self.resolve_rollout_path_for_thread_id(thread_id).await {
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.outgoing.send_error(request_id, err).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match self.archive_thread_common(thread_id, &rollout_path).await {
|
||||
Ok(()) => {
|
||||
@@ -2587,27 +2570,13 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let path = match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
let path = match self
|
||||
.resolve_rollout_path_for_thread_id(existing_thread_id)
|
||||
.await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for thread id {existing_thread_id}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {existing_thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
self.outgoing.send_error(request_id, err).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -2773,30 +2742,17 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
let rollout_path = match self
|
||||
.resolve_rollout_path_for_thread_id(existing_thread_id)
|
||||
.await
|
||||
{
|
||||
Ok(Some(p)) => (p, Some(existing_thread_id)),
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for thread id {existing_thread_id}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {existing_thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
self.outgoing.send_error(request_id, err).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
(rollout_path, Some(existing_thread_id))
|
||||
};
|
||||
|
||||
let history_cwd =
|
||||
@@ -2969,21 +2925,12 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
GetConversationSummaryParams::ThreadId { conversation_id } => {
|
||||
match codex_core::find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&conversation_id.to_string(),
|
||||
)
|
||||
.await
|
||||
match self
|
||||
.resolve_rollout_path_for_thread_id(conversation_id)
|
||||
.await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
_ => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!(
|
||||
"no rollout found for conversation id {conversation_id}"
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
Ok(path) => path,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
@@ -3651,36 +3598,25 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
} else if let Some(conversation_id) = conversation_id {
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &conversation_id.to_string())
|
||||
let found_path = match self
|
||||
.resolve_rollout_path_for_thread_id(conversation_id)
|
||||
.await
|
||||
{
|
||||
Ok(Some(found_path)) => {
|
||||
match RolloutRecorder::get_rollout_history(&found_path).await {
|
||||
Ok(initial_history) => initial_history,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for conversation {conversation_id}: {err}",
|
||||
found_path.display()
|
||||
),
|
||||
).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for conversation id {conversation_id}"),
|
||||
)
|
||||
.await;
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.outgoing.send_error(request_id, err).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
match RolloutRecorder::get_rollout_history(&found_path).await {
|
||||
Ok(initial_history) => initial_history,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate conversation id {conversation_id}: {err}"),
|
||||
format!(
|
||||
"failed to load rollout `{}` for conversation {conversation_id}: {err}",
|
||||
found_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
@@ -3849,27 +3785,17 @@ impl CodexMessageProcessor {
|
||||
let (rollout_path, source_thread_id) = if let Some(path) = path {
|
||||
(path, None)
|
||||
} else if let Some(conversation_id) = conversation_id {
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &conversation_id.to_string())
|
||||
let rollout_path = match self
|
||||
.resolve_rollout_path_for_thread_id(conversation_id)
|
||||
.await
|
||||
{
|
||||
Ok(Some(found_path)) => (found_path, Some(conversation_id)),
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for conversation id {conversation_id}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate conversation id {conversation_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
self.outgoing.send_error(request_id, err).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
(rollout_path, Some(conversation_id))
|
||||
} else {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
@@ -4767,19 +4693,9 @@ impl CodexMessageProcessor {
|
||||
review_request: ReviewRequest,
|
||||
display_text: &str,
|
||||
) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(&self.config.codex_home, &parent_thread_id.to_string())
|
||||
.await
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to locate thread id {parent_thread_id}: {err}"),
|
||||
data: None,
|
||||
})?
|
||||
.ok_or_else(|| JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("no rollout found for thread id {parent_thread_id}"),
|
||||
data: None,
|
||||
})?;
|
||||
let rollout_path = self
|
||||
.resolve_rollout_path_for_thread_id(parent_thread_id)
|
||||
.await?;
|
||||
|
||||
let mut config = self.config.as_ref().clone();
|
||||
if let Some(review_model) = &config.review_model {
|
||||
@@ -4813,7 +4729,23 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
let fallback_provider = self.config.model_provider_id.as_str();
|
||||
if let Some(rollout_path) = review_thread.rollout_path() {
|
||||
let review_rollout_path = if let Some(path) = session_configured.rollout_path.clone() {
|
||||
Some(path)
|
||||
} else {
|
||||
match self.resolve_rollout_path_for_thread_id(thread_id).await {
|
||||
Ok(path) => Some(path),
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"review thread {} has no persisted rollout path: {}",
|
||||
session_configured.session_id,
|
||||
err.message
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(rollout_path) = review_rollout_path {
|
||||
match read_summary_from_rollout(rollout_path.as_path(), fallback_provider).await {
|
||||
Ok(summary) => {
|
||||
let thread = summary_to_thread(summary);
|
||||
@@ -4830,11 +4762,6 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"review thread {} has no rollout path",
|
||||
session_configured.session_id
|
||||
);
|
||||
}
|
||||
|
||||
let turn_id = review_thread
|
||||
@@ -5196,7 +5123,13 @@ impl CodexMessageProcessor {
|
||||
|
||||
let validated_rollout_path = if include_logs {
|
||||
match conversation_id {
|
||||
Some(conv_id) => self.resolve_rollout_path(conv_id).await,
|
||||
Some(conv_id) => match self.resolve_rollout_path_for_thread_id(conv_id).await {
|
||||
Ok(path) => Some(path),
|
||||
Err(err) => {
|
||||
self.outgoing.send_error(request_id, err).await;
|
||||
return;
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
}
|
||||
} else {
|
||||
@@ -5245,10 +5178,46 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn resolve_rollout_path(&self, conversation_id: ThreadId) -> Option<PathBuf> {
|
||||
match self.thread_manager.get_thread(conversation_id).await {
|
||||
Ok(conv) => conv.rollout_path(),
|
||||
Err(_) => None,
|
||||
async fn resolve_rollout_path_for_thread_id(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> Result<PathBuf, JSONRPCErrorError> {
|
||||
if let Ok(thread) = self.thread_manager.get_thread(thread_id).await {
|
||||
match thread.ensure_rollout_persisted().await {
|
||||
Ok(RolloutPersistenceStatus::Persisted(path)) => return Ok(path),
|
||||
Ok(RolloutPersistenceStatus::Ephemeral) => {
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!(
|
||||
"thread `{thread_id}` is ephemeral and has no persisted rollout"
|
||||
),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to persist rollout for thread id {thread_id}: {err}"
|
||||
),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await {
|
||||
Ok(Some(path)) => Ok(path),
|
||||
Ok(None) => Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("no rollout found for thread id {thread_id}"),
|
||||
data: None,
|
||||
}),
|
||||
Err(err) => Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("failed to locate thread id {thread_id}: {err}"),
|
||||
data: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -18,7 +19,8 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
|
||||
#[tokio::test]
|
||||
async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
@@ -38,16 +40,6 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
assert!(!thread.id.is_empty());
|
||||
|
||||
// Locate the rollout path recorded for this thread id.
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
|
||||
.await?
|
||||
.expect("expected rollout path for thread id to exist");
|
||||
assert!(
|
||||
rollout_path.exists(),
|
||||
"expected {} to exist",
|
||||
rollout_path.display()
|
||||
);
|
||||
|
||||
// Archive the thread.
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
@@ -61,16 +53,17 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
// Locate the rollout path recorded for this thread id.
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
|
||||
assert!(
|
||||
rollout_path.is_none(),
|
||||
"archived thread should no longer have an active rollout path"
|
||||
);
|
||||
|
||||
// Verify file moved.
|
||||
let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR);
|
||||
// The archived file keeps the original filename (rollout-...-<id>.jsonl).
|
||||
let archived_rollout_path =
|
||||
archived_directory.join(rollout_path.file_name().expect("rollout file name"));
|
||||
assert!(
|
||||
!rollout_path.exists(),
|
||||
"expected rollout path {} to be moved",
|
||||
rollout_path.display()
|
||||
);
|
||||
find_archived_rollout_by_thread_id(&archived_directory, &thread.id)?;
|
||||
assert!(
|
||||
archived_rollout_path.exists(),
|
||||
"expected archived rollout path {} to exist",
|
||||
@@ -80,14 +73,45 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(config_toml, config_contents())
|
||||
std::fs::write(config_toml, config_contents(server_uri))
|
||||
}
|
||||
|
||||
fn config_contents() -> &'static str {
|
||||
r#"model = "mock-model"
|
||||
fn config_contents(server_uri: &str) -> String {
|
||||
format!(
|
||||
r#"model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
)
|
||||
}
|
||||
|
||||
fn find_archived_rollout_by_thread_id(
|
||||
archived_directory: &Path,
|
||||
thread_id: &str,
|
||||
) -> std::io::Result<std::path::PathBuf> {
|
||||
let entries = std::fs::read_dir(archived_directory)?;
|
||||
for entry in entries {
|
||||
let path = entry?.path();
|
||||
if let Some(file_name) = path.file_name().and_then(|name| name.to_str())
|
||||
&& file_name.ends_with(&format!("{thread_id}.jsonl"))
|
||||
{
|
||||
return Ok(path);
|
||||
}
|
||||
}
|
||||
|
||||
Err(std::io::Error::new(
|
||||
std::io::ErrorKind::NotFound,
|
||||
format!("no archived rollout found for thread id {thread_id}"),
|
||||
))
|
||||
}
|
||||
|
||||
@@ -10,6 +10,8 @@ use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
@@ -117,6 +119,50 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_materializes_rollout_for_fresh_thread() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let start_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
let start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: thread.id.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadForkResponse {
|
||||
thread: forked_thread,
|
||||
..
|
||||
} = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
assert_ne!(forked_thread.id, thread.id);
|
||||
assert!(
|
||||
forked_thread.path.is_some(),
|
||||
"forked thread should have a persisted rollout path"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
|
||||
@@ -72,9 +72,14 @@ async fn thread_resume_returns_original_thread() -> Result<()> {
|
||||
let ThreadResumeResponse {
|
||||
thread: resumed, ..
|
||||
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
let mut expected = thread;
|
||||
expected.updated_at = resumed.updated_at;
|
||||
assert_eq!(resumed, expected);
|
||||
assert_eq!(resumed.id, thread.id);
|
||||
assert_eq!(resumed.model_provider, thread.model_provider);
|
||||
assert_eq!(resumed.source, thread.source);
|
||||
assert_eq!(resumed.cwd, thread.cwd);
|
||||
assert!(
|
||||
resumed.path.is_some(),
|
||||
"thread/resume should materialize rollout persistence on demand"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -322,7 +327,23 @@ async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let thread_path = thread.path.clone().expect("thread path");
|
||||
let first_resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: thread.id.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let first_resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(first_resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse {
|
||||
thread: resumed_by_id,
|
||||
..
|
||||
} = to_response::<ThreadResumeResponse>(first_resume_resp)?;
|
||||
let thread_path = resumed_by_id.path.clone().expect("thread path");
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: "not-a-valid-thread-id".to_string(),
|
||||
@@ -339,7 +360,7 @@ async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
|
||||
let ThreadResumeResponse {
|
||||
thread: resumed, ..
|
||||
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
let mut expected = thread;
|
||||
let mut expected = resumed_by_id;
|
||||
expected.updated_at = resumed.updated_at;
|
||||
assert_eq!(resumed, expected);
|
||||
|
||||
|
||||
@@ -9,7 +9,10 @@ use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::config::set_project_trust_level;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_protocol::config_types::TrustLevel;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use std::path::Path;
|
||||
@@ -59,6 +62,11 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
|
||||
thread.created_at > 0,
|
||||
"created_at should be a positive UNIX timestamp"
|
||||
);
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
|
||||
assert!(
|
||||
rollout_path.is_none(),
|
||||
"fresh threads should not create rollout files until first turn"
|
||||
);
|
||||
|
||||
// A corresponding thread/started notification should arrive.
|
||||
let notif: JSONRPCNotification = timeout(
|
||||
@@ -70,6 +78,33 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
|
||||
serde_json::from_value(notif.params.expect("params must be present"))?;
|
||||
assert_eq!(started.thread, thread);
|
||||
|
||||
// First turn should create the rollout file lazily.
|
||||
let turn_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let _: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
|
||||
assert!(
|
||||
rollout_path.is_some(),
|
||||
"first completed turn should create rollout file"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
@@ -9,6 +11,8 @@ use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadUnarchiveParams;
|
||||
use codex_app_server_protocol::ThreadUnarchiveResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use std::fs::FileTimes;
|
||||
@@ -24,7 +28,8 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
|
||||
#[tokio::test]
|
||||
async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
@@ -42,6 +47,27 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let turn_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let _: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
|
||||
.await?
|
||||
.expect("expected rollout path for thread id to exist");
|
||||
@@ -108,14 +134,25 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(config_toml, config_contents())
|
||||
std::fs::write(config_toml, config_contents(server_uri))
|
||||
}
|
||||
|
||||
fn config_contents() -> &'static str {
|
||||
r#"model = "mock-model"
|
||||
fn config_contents(server_uri: &str) -> String {
|
||||
format!(
|
||||
r#"model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
)
|
||||
}
|
||||
|
||||
@@ -230,6 +230,9 @@
|
||||
"powershell_utf8": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"remote_compaction": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"remote_models": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -1270,6 +1273,9 @@
|
||||
"powershell_utf8": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"remote_compaction": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"remote_models": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -104,6 +104,7 @@ use crate::client::ModelClient;
|
||||
use crate::client::ModelClientSession;
|
||||
use crate::client_common::Prompt;
|
||||
use crate::client_common::ResponseEvent;
|
||||
use crate::codex_thread::RolloutPersistenceStatus;
|
||||
use crate::codex_thread::ThreadConfigSnapshot;
|
||||
use crate::compact::collect_user_messages;
|
||||
use crate::config::Config;
|
||||
@@ -266,6 +267,7 @@ impl Codex {
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
file_watcher: Arc<FileWatcher>,
|
||||
conversation_history: InitialHistory,
|
||||
defer_new_rollout_creation: bool,
|
||||
session_source: SessionSource,
|
||||
agent_control: AgentControl,
|
||||
dynamic_tools: Vec<DynamicToolSpec>,
|
||||
@@ -314,7 +316,7 @@ impl Codex {
|
||||
// Resolve base instructions for the session. Priority order:
|
||||
// 1. config.base_instructions override
|
||||
// 2. conversation history => session_meta.base_instructions
|
||||
// 3. base_intructions for current model
|
||||
// 3. base_instructions for current model
|
||||
let model_info = models_manager.get_model_info(model.as_str(), &config).await;
|
||||
let base_instructions = config
|
||||
.base_instructions
|
||||
@@ -399,6 +401,7 @@ impl Codex {
|
||||
tx_event.clone(),
|
||||
agent_status_tx.clone(),
|
||||
conversation_history,
|
||||
defer_new_rollout_creation,
|
||||
session_source_clone,
|
||||
skills_manager,
|
||||
file_watcher,
|
||||
@@ -483,6 +486,12 @@ impl Codex {
|
||||
pub(crate) fn state_db(&self) -> Option<state_db::StateDbHandle> {
|
||||
self.session.state_db()
|
||||
}
|
||||
|
||||
/// Ensure this thread's rollout is persisted and return whether it is
|
||||
/// persisted or ephemeral.
|
||||
pub(crate) async fn ensure_rollout_persisted(&self) -> CodexResult<RolloutPersistenceStatus> {
|
||||
self.session.ensure_rollout_persisted().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Context for an initialized model agent
|
||||
@@ -499,6 +508,7 @@ pub(crate) struct Session {
|
||||
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
|
||||
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
|
||||
pub(crate) services: SessionServices,
|
||||
defer_new_rollout_creation: bool,
|
||||
next_internal_sub_id: AtomicU64,
|
||||
}
|
||||
|
||||
@@ -840,6 +850,7 @@ impl Session {
|
||||
tx_event: Sender<Event>,
|
||||
agent_status: watch::Sender<AgentStatus>,
|
||||
initial_history: InitialHistory,
|
||||
defer_new_rollout_creation: bool,
|
||||
session_source: SessionSource,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
file_watcher: Arc<FileWatcher>,
|
||||
@@ -859,26 +870,9 @@ impl Session {
|
||||
|
||||
let forked_from_id = initial_history.forked_from_id();
|
||||
|
||||
let (conversation_id, rollout_params) = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Forked(_) => {
|
||||
let conversation_id = ThreadId::default();
|
||||
(
|
||||
conversation_id,
|
||||
RolloutRecorderParams::new(
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
session_source,
|
||||
BaseInstructions {
|
||||
text: session_configuration.base_instructions.clone(),
|
||||
},
|
||||
session_configuration.dynamic_tools.clone(),
|
||||
),
|
||||
)
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => (
|
||||
resumed_history.conversation_id,
|
||||
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
|
||||
),
|
||||
let conversation_id = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Forked(_) => ThreadId::default(),
|
||||
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
|
||||
};
|
||||
let state_builder = match &initial_history {
|
||||
InitialHistory::Resumed(resumed) => metadata::builder_from_items(
|
||||
@@ -895,17 +889,63 @@ impl Session {
|
||||
// - load history metadata
|
||||
let rollout_fut = async {
|
||||
if config.ephemeral {
|
||||
Ok::<_, anyhow::Error>((None, None))
|
||||
Ok::<_, anyhow::Error>((None, None, None))
|
||||
} else {
|
||||
let state_db_ctx = state_db::init_if_enabled(&config, None).await;
|
||||
let rollout_recorder = RolloutRecorder::new(
|
||||
&config,
|
||||
rollout_params,
|
||||
state_db_ctx.clone(),
|
||||
state_builder.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx))
|
||||
match &initial_history {
|
||||
InitialHistory::New => {
|
||||
let rollout_params = RolloutRecorderParams::new(
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
session_source.clone(),
|
||||
BaseInstructions {
|
||||
text: session_configuration.base_instructions.clone(),
|
||||
},
|
||||
session_configuration.dynamic_tools.clone(),
|
||||
);
|
||||
if defer_new_rollout_creation {
|
||||
Ok((None, state_db_ctx, Some(rollout_params)))
|
||||
} else {
|
||||
let rollout_recorder = RolloutRecorder::new(
|
||||
&config,
|
||||
rollout_params,
|
||||
state_db_ctx.clone(),
|
||||
state_builder.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx, None))
|
||||
}
|
||||
}
|
||||
InitialHistory::Forked(_) => {
|
||||
let rollout_params = RolloutRecorderParams::new(
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
session_source.clone(),
|
||||
BaseInstructions {
|
||||
text: session_configuration.base_instructions.clone(),
|
||||
},
|
||||
session_configuration.dynamic_tools.clone(),
|
||||
);
|
||||
let rollout_recorder = RolloutRecorder::new(
|
||||
&config,
|
||||
rollout_params,
|
||||
state_db_ctx.clone(),
|
||||
state_builder.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx, None))
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
let rollout_recorder = RolloutRecorder::new(
|
||||
&config,
|
||||
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
|
||||
state_db_ctx.clone(),
|
||||
state_builder.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx, None))
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -930,10 +970,11 @@ impl Session {
|
||||
(auth, mcp_servers, auth_statuses),
|
||||
) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut);
|
||||
|
||||
let (rollout_recorder, state_db_ctx) = rollout_recorder_and_state_db.map_err(|e| {
|
||||
error!("failed to initialize rollout recorder: {e:#}");
|
||||
e
|
||||
})?;
|
||||
let (rollout_recorder, state_db_ctx, pending_rollout_create) =
|
||||
rollout_recorder_and_state_db.map_err(|e| {
|
||||
error!("failed to initialize rollout recorder: {e:#}");
|
||||
e
|
||||
})?;
|
||||
let rollout_path = rollout_recorder
|
||||
.as_ref()
|
||||
.map(|rec| rec.rollout_path.clone());
|
||||
@@ -1043,6 +1084,8 @@ impl Session {
|
||||
),
|
||||
hooks: Hooks::new(config.as_ref()),
|
||||
rollout: Mutex::new(rollout_recorder),
|
||||
rollout_init_lock: Mutex::new(()),
|
||||
pending_rollout_create: Mutex::new(pending_rollout_create),
|
||||
user_shell: Arc::new(default_shell),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
@@ -1076,6 +1119,7 @@ impl Session {
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
defer_new_rollout_creation,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
});
|
||||
|
||||
@@ -1192,6 +1236,89 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
/// Lazily instantiate the rollout recorder when deferred creation is enabled.
|
||||
///
|
||||
/// This is idempotent and concurrency-safe:
|
||||
/// - If a recorder already exists, it returns immediately.
|
||||
/// - If initialization fails, pending creation params are restored so a
|
||||
/// later turn can retry.
|
||||
async fn ensure_rollout_initialized_for_turn(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
) -> std::io::Result<()> {
|
||||
let _rollout_init_lock = self.services.rollout_init_lock.lock().await;
|
||||
{
|
||||
let rollout = self.services.rollout.lock().await;
|
||||
if rollout.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let params = {
|
||||
let mut pending = self.services.pending_rollout_create.lock().await;
|
||||
pending.take()
|
||||
};
|
||||
let Some(params) = params else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let recorder = match RolloutRecorder::new(
|
||||
turn_context.config.as_ref(),
|
||||
params.clone(),
|
||||
self.services.state_db.clone(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(recorder) => recorder,
|
||||
Err(err) => {
|
||||
let mut pending = self.services.pending_rollout_create.lock().await;
|
||||
if pending.is_none() {
|
||||
*pending = Some(params);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
let mut rollout = self.services.rollout.lock().await;
|
||||
if rollout.is_none() {
|
||||
*rollout = Some(recorder);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ensure this thread has a persisted rollout on disk and return its status.
|
||||
///
|
||||
/// For non-ephemeral threads this guarantees a concrete rollout path by:
|
||||
/// initializing the recorder if needed, seeding initial context if needed,
|
||||
/// and flushing buffered rollout writes.
|
||||
async fn ensure_rollout_persisted(&self) -> CodexResult<RolloutPersistenceStatus> {
|
||||
let turn_context = self.new_default_turn().await;
|
||||
if turn_context.config.ephemeral {
|
||||
return Ok(RolloutPersistenceStatus::Ephemeral);
|
||||
}
|
||||
|
||||
self.ensure_rollout_initialized_for_turn(&turn_context)
|
||||
.await?;
|
||||
self.seed_initial_context_if_needed(&turn_context).await;
|
||||
self.flush_rollout().await;
|
||||
|
||||
let rollout_path = {
|
||||
let rollout = self.services.rollout.lock().await;
|
||||
rollout
|
||||
.as_ref()
|
||||
.map(|recorder| recorder.rollout_path.clone())
|
||||
};
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
return Err(CodexErr::Fatal(format!(
|
||||
"rollout recorder missing for thread {}",
|
||||
self.conversation_id
|
||||
)));
|
||||
};
|
||||
|
||||
Ok(RolloutPersistenceStatus::Persisted(rollout_path))
|
||||
}
|
||||
|
||||
fn next_internal_sub_id(&self) -> String {
|
||||
let id = self
|
||||
.next_internal_sub_id
|
||||
@@ -1220,15 +1347,21 @@ impl Session {
|
||||
let turn_context = self.new_default_turn().await;
|
||||
match conversation_history {
|
||||
InitialHistory::New => {
|
||||
// Build and record initial items (user instructions + environment context)
|
||||
let items = self.build_initial_context(&turn_context).await;
|
||||
self.record_conversation_items(&turn_context, &items).await;
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.initial_context_seeded = true;
|
||||
if self.defer_new_rollout_creation {
|
||||
// Defer initial context persistence until the first turn starts.
|
||||
// This lets turn/start overrides be reflected in the seeded context.
|
||||
self.flush_rollout().await;
|
||||
} else {
|
||||
// Build and record initial items (user instructions + environment context)
|
||||
let items = self.build_initial_context(&turn_context).await;
|
||||
self.record_conversation_items(&turn_context, &items).await;
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.initial_context_seeded = true;
|
||||
}
|
||||
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
|
||||
self.flush_rollout().await;
|
||||
}
|
||||
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
|
||||
self.flush_rollout().await;
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
let rollout_items = resumed_history.history;
|
||||
@@ -2645,8 +2778,15 @@ impl Session {
|
||||
}
|
||||
|
||||
async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiver<Submission>) {
|
||||
// Seed with context in case there is an OverrideTurnContext first.
|
||||
let mut previous_context: Option<Arc<TurnContext>> = Some(sess.new_default_turn().await);
|
||||
// Non-deferred sessions persist an initial baseline at thread start, so we
|
||||
// keep the default turn context for first-turn diffing.
|
||||
// Deferred sessions seed initial context on first turn with the current
|
||||
// settings, so a baseline here would duplicate first-turn updates.
|
||||
let mut previous_context: Option<Arc<TurnContext>> = if sess.defer_new_rollout_creation {
|
||||
None
|
||||
} else {
|
||||
Some(sess.new_default_turn().await)
|
||||
};
|
||||
|
||||
// To break out of this loop, send Op::Shutdown.
|
||||
while let Ok(sub) = rx_sub.recv().await {
|
||||
@@ -2926,6 +3066,20 @@ mod handlers {
|
||||
|
||||
// Attempt to inject input into current task.
|
||||
if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await {
|
||||
if let Err(err) = sess
|
||||
.ensure_rollout_initialized_for_turn(¤t_context)
|
||||
.await
|
||||
{
|
||||
sess.send_event_raw(Event {
|
||||
id: current_context.sub_id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: format!("failed to initialize rollout recorder: {err}"),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
sess.seed_initial_context_if_needed(¤t_context).await;
|
||||
let resumed_model = sess.take_pending_resume_previous_model().await;
|
||||
let update_items = sess.build_settings_update_items(
|
||||
@@ -4891,6 +5045,7 @@ mod tests {
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
|
||||
use crate::codex_thread::RolloutPersistenceStatus;
|
||||
use crate::protocol::CompactedItem;
|
||||
use crate::protocol::CreditsSnapshot;
|
||||
use crate::protocol::InitialHistory;
|
||||
@@ -5091,6 +5246,26 @@ mod tests {
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_new_defers_initial_context_until_first_turn() {
|
||||
let (mut session, turn_context) = make_session_and_context().await;
|
||||
session.defer_new_rollout_creation = true;
|
||||
{
|
||||
let mut state = session.state.lock().await;
|
||||
state.initial_context_seeded = false;
|
||||
}
|
||||
|
||||
session.record_initial_history(InitialHistory::New).await;
|
||||
|
||||
let history = session.state.lock().await.clone_history();
|
||||
assert_eq!(history.raw_items(), Vec::<ResponseItem>::new());
|
||||
|
||||
session.seed_initial_context_if_needed(&turn_context).await;
|
||||
let expected = session.build_initial_context(&turn_context).await;
|
||||
let history = session.state.lock().await.clone_history();
|
||||
assert_eq!(history.raw_items(), expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn resumed_history_seeds_initial_context_on_first_turn_only() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
@@ -5248,6 +5423,170 @@ mod tests {
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lazy_rollout_creation_writes_session_meta_then_initial_context_then_turn_context() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
{
|
||||
let mut state = session.state.lock().await;
|
||||
state.initial_context_seeded = false;
|
||||
}
|
||||
let base_instructions = session.get_base_instructions().await;
|
||||
let session_source = {
|
||||
let state = session.state.lock().await;
|
||||
state.session_configuration.session_source.clone()
|
||||
};
|
||||
{
|
||||
let mut pending = session.services.pending_rollout_create.lock().await;
|
||||
*pending = Some(RolloutRecorderParams::new(
|
||||
session.conversation_id,
|
||||
None,
|
||||
session_source,
|
||||
base_instructions,
|
||||
Vec::new(),
|
||||
));
|
||||
}
|
||||
|
||||
let conversation_id = session.conversation_id.to_string();
|
||||
let rollout_path = crate::rollout::find_thread_path_by_id_str(
|
||||
&turn_context.config.codex_home,
|
||||
&conversation_id,
|
||||
)
|
||||
.await
|
||||
.expect("lookup rollout path before first turn");
|
||||
assert_eq!(rollout_path, None);
|
||||
|
||||
session
|
||||
.ensure_rollout_initialized_for_turn(&turn_context)
|
||||
.await
|
||||
.expect("initialize rollout recorder lazily");
|
||||
session.seed_initial_context_if_needed(&turn_context).await;
|
||||
session
|
||||
.persist_rollout_items(&[RolloutItem::TurnContext(TurnContextItem {
|
||||
cwd: turn_context.cwd.clone(),
|
||||
approval_policy: turn_context.approval_policy,
|
||||
sandbox_policy: turn_context.sandbox_policy.clone(),
|
||||
model: turn_context.model_info.slug.clone(),
|
||||
personality: turn_context.personality,
|
||||
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
|
||||
effort: turn_context.reasoning_effort,
|
||||
summary: turn_context.reasoning_summary,
|
||||
user_instructions: turn_context.user_instructions.clone(),
|
||||
developer_instructions: turn_context.developer_instructions.clone(),
|
||||
final_output_json_schema: turn_context.final_output_json_schema.clone(),
|
||||
truncation_policy: Some(turn_context.truncation_policy.into()),
|
||||
})])
|
||||
.await;
|
||||
session.flush_rollout().await;
|
||||
|
||||
let rollout_path = crate::rollout::find_thread_path_by_id_str(
|
||||
&turn_context.config.codex_home,
|
||||
&conversation_id,
|
||||
)
|
||||
.await
|
||||
.expect("lookup rollout path after first turn")
|
||||
.expect("rollout path should exist");
|
||||
let (items, _, _) = RolloutRecorder::load_rollout_items(&rollout_path)
|
||||
.await
|
||||
.expect("load rollout items");
|
||||
|
||||
assert!(matches!(items.first(), Some(RolloutItem::SessionMeta(_))));
|
||||
let initial_context_len = session.build_initial_context(&turn_context).await.len();
|
||||
assert_eq!(
|
||||
items
|
||||
.iter()
|
||||
.skip(1)
|
||||
.take(initial_context_len)
|
||||
.filter(|item| matches!(item, RolloutItem::ResponseItem(_)))
|
||||
.count(),
|
||||
initial_context_len
|
||||
);
|
||||
assert!(matches!(items.last(), Some(RolloutItem::TurnContext(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ensure_rollout_persisted_is_idempotent() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
{
|
||||
let mut state = session.state.lock().await;
|
||||
state.initial_context_seeded = false;
|
||||
}
|
||||
|
||||
let base_instructions = session.get_base_instructions().await;
|
||||
let session_source = {
|
||||
let state = session.state.lock().await;
|
||||
state.session_configuration.session_source.clone()
|
||||
};
|
||||
{
|
||||
let mut pending = session.services.pending_rollout_create.lock().await;
|
||||
*pending = Some(RolloutRecorderParams::new(
|
||||
session.conversation_id,
|
||||
None,
|
||||
session_source,
|
||||
base_instructions,
|
||||
Vec::new(),
|
||||
));
|
||||
}
|
||||
|
||||
let first = session
|
||||
.ensure_rollout_persisted()
|
||||
.await
|
||||
.expect("first persistence call");
|
||||
let second = session
|
||||
.ensure_rollout_persisted()
|
||||
.await
|
||||
.expect("second persistence call");
|
||||
|
||||
let first_path = match first {
|
||||
RolloutPersistenceStatus::Persisted(path) => path,
|
||||
RolloutPersistenceStatus::Ephemeral => panic!("expected persisted rollout"),
|
||||
};
|
||||
let second_path = match second {
|
||||
RolloutPersistenceStatus::Persisted(path) => path,
|
||||
RolloutPersistenceStatus::Ephemeral => panic!("expected persisted rollout"),
|
||||
};
|
||||
assert_eq!(first_path, second_path);
|
||||
|
||||
let (items, _, _) = RolloutRecorder::load_rollout_items(&first_path)
|
||||
.await
|
||||
.expect("load rollout items");
|
||||
assert!(matches!(items.first(), Some(RolloutItem::SessionMeta(_))));
|
||||
|
||||
let initial_context_len = session.build_initial_context(&turn_context).await.len();
|
||||
assert_eq!(
|
||||
items
|
||||
.iter()
|
||||
.filter(|item| matches!(item, RolloutItem::ResponseItem(_)))
|
||||
.count(),
|
||||
initial_context_len
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ensure_rollout_persisted_returns_ephemeral_when_session_is_ephemeral() {
|
||||
let (session, _) = make_session_and_context().await;
|
||||
{
|
||||
let mut state = session.state.lock().await;
|
||||
let mut config = (*state.session_configuration.original_config_do_not_use).clone();
|
||||
config.ephemeral = true;
|
||||
state.session_configuration.original_config_do_not_use = Arc::new(config);
|
||||
}
|
||||
|
||||
let outcome = session
|
||||
.ensure_rollout_persisted()
|
||||
.await
|
||||
.expect("ephemeral persistence check");
|
||||
assert_eq!(outcome, RolloutPersistenceStatus::Ephemeral);
|
||||
|
||||
let codex_home = session.codex_home().await;
|
||||
let rollout_path = crate::rollout::find_thread_path_by_id_str(
|
||||
&codex_home,
|
||||
&session.conversation_id.to_string(),
|
||||
)
|
||||
.await
|
||||
.expect("lookup rollout path");
|
||||
assert_eq!(rollout_path, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_rollback_drops_last_turn_from_history() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
@@ -5804,6 +6143,8 @@ mod tests {
|
||||
),
|
||||
hooks: Hooks::new(&config),
|
||||
rollout: Mutex::new(None),
|
||||
rollout_init_lock: Mutex::new(()),
|
||||
pending_rollout_create: Mutex::new(None),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
@@ -5847,6 +6188,7 @@ mod tests {
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
defer_new_rollout_creation: false,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
};
|
||||
|
||||
@@ -5934,6 +6276,8 @@ mod tests {
|
||||
),
|
||||
hooks: Hooks::new(&config),
|
||||
rollout: Mutex::new(None),
|
||||
rollout_init_lock: Mutex::new(()),
|
||||
pending_rollout_create: Mutex::new(None),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
@@ -5977,6 +6321,7 @@ mod tests {
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
defer_new_rollout_creation: false,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
});
|
||||
|
||||
|
||||
@@ -56,6 +56,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
Arc::clone(&parent_session.services.skills_manager),
|
||||
Arc::clone(&parent_session.services.file_watcher),
|
||||
initial_history.unwrap_or(InitialHistory::New),
|
||||
false,
|
||||
SessionSource::SubAgent(SubAgentSource::Review),
|
||||
parent_session.services.agent_control.clone(),
|
||||
Vec::new(),
|
||||
|
||||
@@ -16,6 +16,12 @@ use tokio::sync::watch;
|
||||
|
||||
use crate::state_db::StateDbHandle;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum RolloutPersistenceStatus {
|
||||
Persisted(PathBuf),
|
||||
Ephemeral,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ThreadConfigSnapshot {
|
||||
pub model: String,
|
||||
@@ -76,6 +82,11 @@ impl CodexThread {
|
||||
self.rollout_path.clone()
|
||||
}
|
||||
|
||||
/// Ensure this thread has a persisted rollout and return its status.
|
||||
pub async fn ensure_rollout_persisted(&self) -> CodexResult<RolloutPersistenceStatus> {
|
||||
self.codex.ensure_rollout_persisted().await
|
||||
}
|
||||
|
||||
pub fn state_db(&self) -> Option<StateDbHandle> {
|
||||
self.codex.state_db()
|
||||
}
|
||||
|
||||
@@ -97,6 +97,8 @@ pub enum Feature {
|
||||
WindowsSandboxElevated,
|
||||
/// Refresh remote models and emit AppReady once the list is available.
|
||||
RemoteModels,
|
||||
/// Enable remote compaction via the Responses API.
|
||||
RemoteCompaction,
|
||||
/// Experimental shell snapshotting.
|
||||
ShellSnapshot,
|
||||
/// Enable runtime metrics snapshots via a manual reader.
|
||||
@@ -495,6 +497,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: true,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::RemoteCompaction,
|
||||
key: "remote_compaction",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::PowershellUtf8,
|
||||
key: "powershell_utf8",
|
||||
|
||||
@@ -17,6 +17,7 @@ pub use codex::SteerInputError;
|
||||
mod codex_thread;
|
||||
mod compact_remote;
|
||||
pub use codex_thread::CodexThread;
|
||||
pub use codex_thread::RolloutPersistenceStatus;
|
||||
pub use codex_thread::ThreadConfigSnapshot;
|
||||
mod agent;
|
||||
mod codex_delegate;
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::file_watcher::FileWatcher;
|
||||
use crate::hooks::Hooks;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::models_manager::manager::ModelsManager;
|
||||
use crate::rollout::RolloutRecorderParams;
|
||||
use crate::skills::SkillsManager;
|
||||
use crate::state_db::StateDbHandle;
|
||||
use crate::tools::sandboxing::ApprovalStore;
|
||||
@@ -26,6 +27,8 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) analytics_events_client: AnalyticsEventsClient,
|
||||
pub(crate) hooks: Hooks,
|
||||
pub(crate) rollout: Mutex<Option<RolloutRecorder>>,
|
||||
pub(crate) rollout_init_lock: Mutex<()>,
|
||||
pub(crate) pending_rollout_create: Mutex<Option<RolloutRecorderParams>>,
|
||||
pub(crate) user_shell: Arc<crate::shell::Shell>,
|
||||
pub(crate) show_raw_agent_reasoning: bool,
|
||||
pub(crate) exec_policy: ExecPolicyManager,
|
||||
|
||||
@@ -91,6 +91,12 @@ pub struct NewThread {
|
||||
pub session_configured: SessionConfiguredEvent,
|
||||
}
|
||||
|
||||
struct SpawnThreadWithSourceOptions {
|
||||
session_source: SessionSource,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
defer_new_rollout_creation: bool,
|
||||
}
|
||||
|
||||
/// [`ThreadManager`] is responsible for creating threads and maintaining
|
||||
/// them in memory.
|
||||
pub struct ThreadManager {
|
||||
@@ -265,6 +271,24 @@ impl ThreadManager {
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
dynamic_tools,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn start_thread_with_tools_deferred_rollout(
|
||||
&self,
|
||||
config: Config,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.state
|
||||
.spawn_thread(
|
||||
config,
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
dynamic_tools,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -293,6 +317,7 @@ impl ThreadManager {
|
||||
auth_manager,
|
||||
self.agent_control(),
|
||||
Vec::new(),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -332,6 +357,7 @@ impl ThreadManager {
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
Vec::new(),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -399,8 +425,11 @@ impl ThreadManagerState {
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.auth_manager),
|
||||
agent_control,
|
||||
session_source,
|
||||
Vec::new(),
|
||||
SpawnThreadWithSourceOptions {
|
||||
session_source,
|
||||
dynamic_tools: Vec::new(),
|
||||
defer_new_rollout_creation: false,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -413,26 +442,29 @@ impl ThreadManagerState {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
agent_control: AgentControl,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
defer_new_rollout_creation: bool,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread_with_source(
|
||||
config,
|
||||
initial_history,
|
||||
auth_manager,
|
||||
agent_control,
|
||||
self.session_source.clone(),
|
||||
dynamic_tools,
|
||||
SpawnThreadWithSourceOptions {
|
||||
session_source: self.session_source.clone(),
|
||||
dynamic_tools,
|
||||
defer_new_rollout_creation,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn spawn_thread_with_source(
|
||||
async fn spawn_thread_with_source(
|
||||
&self,
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
agent_control: AgentControl,
|
||||
session_source: SessionSource,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
options: SpawnThreadWithSourceOptions,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.file_watcher.register_config(&config);
|
||||
let CodexSpawnOk {
|
||||
@@ -444,9 +476,10 @@ impl ThreadManagerState {
|
||||
Arc::clone(&self.skills_manager),
|
||||
Arc::clone(&self.file_watcher),
|
||||
initial_history,
|
||||
session_source,
|
||||
options.defer_new_rollout_creation,
|
||||
options.session_source,
|
||||
agent_control,
|
||||
dynamic_tools,
|
||||
options.dynamic_tools,
|
||||
)
|
||||
.await?;
|
||||
self.finalize_thread_spawn(codex, thread_id).await
|
||||
|
||||
@@ -4,6 +4,8 @@ use std::fs;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_core::CodexAuth;
|
||||
#[allow(unused_imports)]
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::ItemCompletedEvent;
|
||||
use codex_core::protocol::ItemStartedEvent;
|
||||
|
||||
Reference in New Issue
Block a user