chore: unify conversation with thread name (#8830)

Done and verified by Codex + refactor feature of RustRover
This commit is contained in:
jif-oai
2026-01-07 17:04:53 +00:00
committed by GitHub
parent 0d788e6263
commit 116059c3a0
83 changed files with 1094 additions and 1203 deletions

View File

@@ -16,10 +16,10 @@ pub use cli::ReviewArgs;
use codex_common::oss::ensure_oss_provider_ready;
use codex_common::oss::get_default_model_for_oss_provider;
use codex_core::AuthManager;
use codex_core::ConversationManager;
use codex_core::LMSTUDIO_OSS_PROVIDER_ID;
use codex_core::NewConversation;
use codex_core::NewThread;
use codex_core::OLLAMA_OSS_PROVIDER_ID;
use codex_core::ThreadManager;
use codex_core::auth::enforce_login_restrictions;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
@@ -55,7 +55,7 @@ use crate::cli::Command as ExecCommand;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use codex_core::default_client::set_default_originator;
use codex_core::find_conversation_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
enum InitialOperation {
UserTurn {
@@ -286,33 +286,29 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
true,
config.cli_auth_credentials_store_mode,
);
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let default_model = conversation_manager
let thread_manager = ThreadManager::new(auth_manager.clone(), SessionSource::Exec);
let default_model = thread_manager
.get_models_manager()
.get_model(&config.model, &config)
.await;
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewConversation {
conversation_id: _,
conversation,
let NewThread {
thread_id: _,
thread,
session_configured,
} = if let Some(ExecCommand::Resume(args)) = command.as_ref() {
let resume_path = resolve_resume_path(&config, args).await?;
if let Some(path) = resume_path {
conversation_manager
.resume_conversation_from_rollout(config.clone(), path, auth_manager.clone())
thread_manager
.resume_thread_from_rollout(config.clone(), path, auth_manager.clone())
.await?
} else {
conversation_manager
.new_conversation(config.clone())
.await?
thread_manager.start_thread(config.clone()).await?
}
} else {
conversation_manager
.new_conversation(config.clone())
.await?
thread_manager.start_thread(config.clone()).await?
};
let (initial_operation, prompt_summary) = match (command, prompt, images) {
(Some(ExecCommand::Review(review_cli)), _, _) => {
@@ -378,20 +374,20 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
{
let conversation = conversation.clone();
let thread = thread.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::debug!("Keyboard interrupt");
// Immediately notify Codex to abort any inflight task.
conversation.submit(Op::Interrupt).await.ok();
thread.submit(Op::Interrupt).await.ok();
// Exit the inner loop and return to the main input prompt. The codex
// will emit a `TurnInterrupted` (Error) event which is drained later.
break;
}
res = conversation.next_event() => match res {
res = thread.next_event() => match res {
Ok(event) => {
debug!("Received event: {event:?}");
@@ -420,7 +416,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
items,
output_schema,
} => {
let task_id = conversation
let task_id = thread
.submit(Op::UserTurn {
items,
cwd: default_cwd,
@@ -436,7 +432,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
task_id
}
InitialOperation::Review { review_request } => {
let task_id = conversation.submit(Op::Review { review_request }).await?;
let task_id = thread.submit(Op::Review { review_request }).await?;
info!("Sent review request with event ID: {task_id}");
task_id
}
@@ -449,7 +445,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
while let Some(event) = rx.recv().await {
if let EventMsg::ElicitationRequest(ev) = &event.msg {
// Automatically cancel elicitation requests in exec mode.
conversation
thread
.submit(Op::ResolveElicitation {
server_name: ev.server_name.clone(),
request_id: ev.id.clone(),
@@ -464,7 +460,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
match shutdown {
CodexStatus::Running => continue,
CodexStatus::InitiateShutdown => {
conversation.submit(Op::Shutdown).await?;
thread.submit(Op::Shutdown).await?;
}
CodexStatus::Shutdown => {
break;
@@ -485,7 +481,7 @@ async fn resolve_resume_path(
) -> anyhow::Result<Option<PathBuf>> {
if args.last {
let default_provider_filter = vec![config.model_provider_id.clone()];
match codex_core::RolloutRecorder::list_conversations(
match codex_core::RolloutRecorder::list_threads(
&config.codex_home,
1,
None,
@@ -497,12 +493,12 @@ async fn resolve_resume_path(
{
Ok(page) => Ok(page.items.first().map(|it| it.path.clone())),
Err(e) => {
error!("Error listing conversations: {e}");
error!("Error listing threads: {e}");
Ok(None)
}
}
} else if let Some(id_str) = args.session_id.as_deref() {
let path = find_conversation_path_by_id_str(&config.codex_home, id_str).await?;
let path = find_thread_path_by_id_str(&config.codex_home, id_str).await?;
Ok(path)
} else {
Ok(None)