mirror of
https://github.com/openai/codex.git
synced 2026-05-01 03:42:05 +03:00
feat: fork conversation/thread (#8866)
## Summary - add thread/conversation fork endpoints to the protocol (v1 + v2) - implement fork handling in app-server using thread manager and config overrides - add fork coverage in app-server tests and document `thread/fork` usage
This commit is contained in:
@@ -28,6 +28,8 @@ use codex_app_server_protocol::ConversationSummary;
|
||||
use codex_app_server_protocol::ExecOneOffCommandResponse;
|
||||
use codex_app_server_protocol::FeedbackUploadParams;
|
||||
use codex_app_server_protocol::FeedbackUploadResponse;
|
||||
use codex_app_server_protocol::ForkConversationParams;
|
||||
use codex_app_server_protocol::ForkConversationResponse;
|
||||
use codex_app_server_protocol::FuzzyFileSearchParams;
|
||||
use codex_app_server_protocol::FuzzyFileSearchResponse;
|
||||
use codex_app_server_protocol::GetAccountParams;
|
||||
@@ -86,6 +88,8 @@ use codex_app_server_protocol::SkillsListResponse;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
@@ -126,6 +130,7 @@ use codex_core::config::ConfigService;
|
||||
use codex_core::config::edit::ConfigEditsBuilder;
|
||||
use codex_core::config::types::McpServerTransportConfig;
|
||||
use codex_core::default_client::get_codex_user_agent;
|
||||
use codex_core::error::CodexErr;
|
||||
use codex_core::exec::ExecParams;
|
||||
use codex_core::exec_env::create_env;
|
||||
use codex_core::features::Feature;
|
||||
@@ -369,6 +374,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::ThreadResume { request_id, params } => {
|
||||
self.thread_resume(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadFork { request_id, params } => {
|
||||
self.thread_fork(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadArchive { request_id, params } => {
|
||||
self.thread_archive(request_id, params).await;
|
||||
}
|
||||
@@ -438,6 +446,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::ResumeConversation { request_id, params } => {
|
||||
self.handle_resume_conversation(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ForkConversation { request_id, params } => {
|
||||
self.handle_fork_conversation(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ArchiveConversation { request_id, params } => {
|
||||
self.archive_conversation(request_id, params).await;
|
||||
}
|
||||
@@ -1858,6 +1869,198 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_fork(&mut self, request_id: RequestId, params: ThreadForkParams) {
|
||||
let ThreadForkParams {
|
||||
thread_id,
|
||||
path,
|
||||
model,
|
||||
model_provider,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox,
|
||||
config: cli_overrides,
|
||||
base_instructions,
|
||||
developer_instructions,
|
||||
} = params;
|
||||
|
||||
let overrides_requested = model.is_some()
|
||||
|| model_provider.is_some()
|
||||
|| cwd.is_some()
|
||||
|| approval_policy.is_some()
|
||||
|| sandbox.is_some()
|
||||
|| cli_overrides.is_some()
|
||||
|| base_instructions.is_some()
|
||||
|| developer_instructions.is_some();
|
||||
|
||||
let config = if overrides_requested {
|
||||
let overrides = self.build_thread_config_overrides(
|
||||
model,
|
||||
model_provider,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox,
|
||||
base_instructions,
|
||||
developer_instructions,
|
||||
);
|
||||
|
||||
// Persist windows sandbox feature.
|
||||
let mut cli_overrides = cli_overrides.unwrap_or_default();
|
||||
if cfg!(windows) && self.config.features.enabled(Feature::WindowsSandbox) {
|
||||
cli_overrides.insert(
|
||||
"features.experimental_windows_sandbox".to_string(),
|
||||
serde_json::json!(true),
|
||||
);
|
||||
}
|
||||
|
||||
match derive_config_from_params(&self.cli_overrides, Some(cli_overrides), overrides)
|
||||
.await
|
||||
{
|
||||
Ok(config) => config,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("error deriving config: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.config.as_ref().clone()
|
||||
};
|
||||
|
||||
let rollout_path = if let Some(path) = path {
|
||||
path
|
||||
} else {
|
||||
let existing_thread_id = match ThreadId::from_string(&thread_id) {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid thread id: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for thread id {existing_thread_id}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {existing_thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let fallback_model_provider = config.model_provider_id.clone();
|
||||
|
||||
let NewThread {
|
||||
thread_id,
|
||||
session_configured,
|
||||
..
|
||||
} = match self
|
||||
.thread_manager
|
||||
.fork_thread(usize::MAX, config, rollout_path.clone())
|
||||
.await
|
||||
{
|
||||
Ok(thread) => thread,
|
||||
Err(err) => {
|
||||
let (code, message) = match err {
|
||||
CodexErr::Io(_) | CodexErr::Json(_) => (
|
||||
INVALID_REQUEST_ERROR_CODE,
|
||||
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
|
||||
),
|
||||
CodexErr::InvalidRequest(message) => (INVALID_REQUEST_ERROR_CODE, message),
|
||||
_ => (INTERNAL_ERROR_CODE, format!("error forking thread: {err}")),
|
||||
};
|
||||
let error = JSONRPCErrorError {
|
||||
code,
|
||||
message,
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let SessionConfiguredEvent {
|
||||
rollout_path,
|
||||
initial_messages,
|
||||
..
|
||||
} = session_configured;
|
||||
// Auto-attach a conversation listener when forking a thread.
|
||||
if let Err(err) = self
|
||||
.attach_conversation_listener(thread_id, false, ApiVersion::V2)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
"failed to attach listener for thread {}: {}",
|
||||
thread_id,
|
||||
err.message
|
||||
);
|
||||
}
|
||||
|
||||
let mut thread = match read_summary_from_rollout(
|
||||
rollout_path.as_path(),
|
||||
fallback_model_provider.as_str(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(summary) => summary_to_thread(summary),
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {thread_id}: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
thread.turns = initial_messages
|
||||
.as_deref()
|
||||
.map_or_else(Vec::new, build_turns_from_event_msgs);
|
||||
|
||||
let response = ThreadForkResponse {
|
||||
thread: thread.clone(),
|
||||
model: session_configured.model,
|
||||
model_provider: session_configured.model_provider_id,
|
||||
cwd: session_configured.cwd,
|
||||
approval_policy: session_configured.approval_policy.into(),
|
||||
sandbox: session_configured.sandbox_policy.into(),
|
||||
reasoning_effort: session_configured.reasoning_effort,
|
||||
};
|
||||
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
let notif = ThreadStartedNotification { thread };
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadStarted(notif))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn get_thread_summary(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
@@ -2481,6 +2684,166 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_fork_conversation(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
params: ForkConversationParams,
|
||||
) {
|
||||
let ForkConversationParams {
|
||||
path,
|
||||
conversation_id,
|
||||
overrides,
|
||||
} = params;
|
||||
|
||||
// Derive a Config using the same logic as new conversation, honoring overrides if provided.
|
||||
let config = match overrides {
|
||||
Some(overrides) => {
|
||||
let NewConversationParams {
|
||||
model,
|
||||
model_provider,
|
||||
profile,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox: sandbox_mode,
|
||||
config: cli_overrides,
|
||||
base_instructions,
|
||||
developer_instructions,
|
||||
compact_prompt,
|
||||
include_apply_patch_tool,
|
||||
} = overrides;
|
||||
|
||||
// Persist windows sandbox feature.
|
||||
let mut cli_overrides = cli_overrides.unwrap_or_default();
|
||||
if cfg!(windows) && self.config.features.enabled(Feature::WindowsSandbox) {
|
||||
cli_overrides.insert(
|
||||
"features.experimental_windows_sandbox".to_string(),
|
||||
serde_json::json!(true),
|
||||
);
|
||||
}
|
||||
|
||||
let overrides = ConfigOverrides {
|
||||
model,
|
||||
config_profile: profile,
|
||||
cwd: cwd.map(PathBuf::from),
|
||||
approval_policy,
|
||||
sandbox_mode,
|
||||
model_provider,
|
||||
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
|
||||
base_instructions,
|
||||
developer_instructions,
|
||||
compact_prompt,
|
||||
include_apply_patch_tool,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
derive_config_from_params(&self.cli_overrides, Some(cli_overrides), overrides).await
|
||||
}
|
||||
None => Ok(self.config.as_ref().clone()),
|
||||
};
|
||||
let config = match config {
|
||||
Ok(cfg) => cfg,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("error deriving config: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let rollout_path = if let Some(path) = path {
|
||||
path
|
||||
} else if let Some(conversation_id) = conversation_id {
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &conversation_id.to_string())
|
||||
.await
|
||||
{
|
||||
Ok(Some(found_path)) => found_path,
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for conversation id {conversation_id}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate conversation id {conversation_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"either path or conversation id must be provided".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
let NewThread {
|
||||
thread_id,
|
||||
session_configured,
|
||||
..
|
||||
} = match self
|
||||
.thread_manager
|
||||
.fork_thread(usize::MAX, config, rollout_path.clone())
|
||||
.await
|
||||
{
|
||||
Ok(thread) => thread,
|
||||
Err(err) => {
|
||||
let (code, message) = match err {
|
||||
CodexErr::Io(_) | CodexErr::Json(_) => (
|
||||
INVALID_REQUEST_ERROR_CODE,
|
||||
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
|
||||
),
|
||||
CodexErr::InvalidRequest(message) => (INVALID_REQUEST_ERROR_CODE, message),
|
||||
_ => (
|
||||
INTERNAL_ERROR_CODE,
|
||||
format!("error forking conversation: {err}"),
|
||||
),
|
||||
};
|
||||
let error = JSONRPCErrorError {
|
||||
code,
|
||||
message,
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::SessionConfigured(
|
||||
SessionConfiguredNotification {
|
||||
session_id: session_configured.session_id,
|
||||
model: session_configured.model.clone(),
|
||||
reasoning_effort: session_configured.reasoning_effort,
|
||||
history_log_id: session_configured.history_log_id,
|
||||
history_entry_count: session_configured.history_entry_count,
|
||||
initial_messages: session_configured.initial_messages.clone(),
|
||||
rollout_path: session_configured.rollout_path.clone(),
|
||||
},
|
||||
))
|
||||
.await;
|
||||
let initial_messages = session_configured
|
||||
.initial_messages
|
||||
.map(|msgs| msgs.into_iter().collect());
|
||||
|
||||
// Reply with conversation id + model and initial messages (when present)
|
||||
let response = ForkConversationResponse {
|
||||
conversation_id: thread_id,
|
||||
model: session_configured.model.clone(),
|
||||
initial_messages,
|
||||
rollout_path: session_configured.rollout_path.clone(),
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn send_invalid_request_error(&self, request_id: RequestId, message: String) {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
|
||||
Reference in New Issue
Block a user