Merge remote-tracking branch 'origin/main' into jif/fork

# Conflicts:
#	codex-rs/cli/src/main.rs
#	codex-rs/tui/src/cli.rs
This commit is contained in:
jif-oai
2025-11-19 11:41:17 +00:00
147 changed files with 7156 additions and 2010 deletions

View File

@@ -60,6 +60,8 @@ use codex_app_server_protocol::RemoveConversationSubscriptionResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ResumeConversationParams;
use codex_app_server_protocol::ResumeConversationResponse;
use codex_app_server_protocol::ReviewStartParams;
use codex_app_server_protocol::ReviewTarget;
use codex_app_server_protocol::SandboxMode;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
@@ -109,12 +111,15 @@ use codex_core::config_loader::load_config_as_toml;
use codex_core::default_client::get_codex_user_agent;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::features::Feature;
use codex_core::find_conversation_path_by_id_str;
use codex_core::get_platform_sandbox;
use codex_core::git_info::git_diff_to_remote;
use codex_core::parse_cursor;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewRequest;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::read_head_for_summary;
use codex_feedback::CodexFeedback;
use codex_login::ServerOptions as LoginServerOptions;
@@ -151,6 +156,14 @@ use uuid::Uuid;
type PendingInterruptQueue = Vec<(RequestId, ApiVersion)>;
pub(crate) type PendingInterrupts = Arc<Mutex<HashMap<ConversationId, PendingInterruptQueue>>>;
/// Per-conversation accumulation of the latest states e.g. error message while a turn runs.
#[derive(Default, Clone)]
pub(crate) struct TurnSummary {
pub(crate) last_error_message: Option<String>,
}
pub(crate) type TurnSummaryStore = Arc<Mutex<HashMap<ConversationId, TurnSummary>>>;
// Duration before a ChatGPT login attempt is abandoned.
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
struct ActiveLogin {
@@ -158,8 +171,8 @@ struct ActiveLogin {
login_id: Uuid,
}
impl ActiveLogin {
fn drop(&self) {
impl Drop for ActiveLogin {
fn drop(&mut self) {
self.shutdown_handle.shutdown();
}
}
@@ -175,6 +188,7 @@ pub(crate) struct CodexMessageProcessor {
active_login: Arc<Mutex<Option<ActiveLogin>>>,
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: PendingInterrupts,
turn_summary_store: TurnSummaryStore,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
feedback: CodexFeedback,
}
@@ -227,11 +241,97 @@ impl CodexMessageProcessor {
conversation_listeners: HashMap::new(),
active_login: Arc::new(Mutex::new(None)),
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
turn_summary_store: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
}
fn review_request_from_target(
target: ReviewTarget,
append_to_original_thread: bool,
) -> Result<(ReviewRequest, String), JSONRPCErrorError> {
fn invalid_request(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message,
data: None,
}
}
match target {
// TODO(jif) those messages will be extracted in a follow-up PR.
ReviewTarget::UncommittedChanges => Ok((
ReviewRequest {
prompt: "Review the current code changes (staged, unstaged, and untracked files) and provide prioritized findings.".to_string(),
user_facing_hint: "current changes".to_string(),
append_to_original_thread,
},
"Review uncommitted changes".to_string(),
)),
ReviewTarget::BaseBranch { branch } => {
let branch = branch.trim().to_string();
if branch.is_empty() {
return Err(invalid_request("branch must not be empty".to_string()));
}
let prompt = format!("Review the code changes against the base branch '{branch}'. Start by finding the merge diff between the current branch and {branch}'s upstream e.g. (`git merge-base HEAD \"$(git rev-parse --abbrev-ref \"{branch}@{{upstream}}\")\"`), then run `git diff` against that SHA to see what changes we would merge into the {branch} branch. Provide prioritized, actionable findings.");
let hint = format!("changes against '{branch}'");
let display = format!("Review changes against base branch '{branch}'");
Ok((
ReviewRequest {
prompt,
user_facing_hint: hint,
append_to_original_thread,
},
display,
))
}
ReviewTarget::Commit { sha, title } => {
let sha = sha.trim().to_string();
if sha.is_empty() {
return Err(invalid_request("sha must not be empty".to_string()));
}
let brief_title = title
.map(|t| t.trim().to_string())
.filter(|t| !t.is_empty());
let prompt = if let Some(title) = brief_title.clone() {
format!("Review the code changes introduced by commit {sha} (\"{title}\"). Provide prioritized, actionable findings.")
} else {
format!("Review the code changes introduced by commit {sha}. Provide prioritized, actionable findings.")
};
let short_sha = sha.chars().take(7).collect::<String>();
let hint = format!("commit {short_sha}");
let display = if let Some(title) = brief_title {
format!("Review commit {short_sha}: {title}")
} else {
format!("Review commit {short_sha}")
};
Ok((
ReviewRequest {
prompt,
user_facing_hint: hint,
append_to_original_thread,
},
display,
))
}
ReviewTarget::Custom { instructions } => {
let trimmed = instructions.trim().to_string();
if trimmed.is_empty() {
return Err(invalid_request("instructions must not be empty".to_string()));
}
Ok((
ReviewRequest {
prompt: trimmed.clone(),
user_facing_hint: trimmed.clone(),
append_to_original_thread,
},
trimmed,
))
}
}
}
pub async fn process_request(&mut self, request: ClientRequest) {
match request {
ClientRequest::Initialize { .. } => {
@@ -263,6 +363,9 @@ impl CodexMessageProcessor {
ClientRequest::TurnInterrupt { request_id, params } => {
self.turn_interrupt(request_id, params).await;
}
ClientRequest::ReviewStart { request_id, params } => {
self.review_start(request_id, params).await;
}
ClientRequest::NewConversation { request_id, params } => {
// Do not tokio::spawn() to process new_conversation()
// asynchronously because we need to ensure the conversation is
@@ -417,7 +520,7 @@ impl CodexMessageProcessor {
{
let mut guard = self.active_login.lock().await;
if let Some(active) = guard.take() {
active.drop();
drop(active);
}
}
@@ -525,7 +628,7 @@ impl CodexMessageProcessor {
{
let mut guard = self.active_login.lock().await;
if let Some(existing) = guard.take() {
existing.drop();
drop(existing);
}
*guard = Some(ActiveLogin {
shutdown_handle: shutdown_handle.clone(),
@@ -615,7 +718,7 @@ impl CodexMessageProcessor {
{
let mut guard = self.active_login.lock().await;
if let Some(existing) = guard.take() {
existing.drop();
drop(existing);
}
*guard = Some(ActiveLogin {
shutdown_handle: shutdown_handle.clone(),
@@ -704,7 +807,7 @@ impl CodexMessageProcessor {
let mut guard = self.active_login.lock().await;
if guard.as_ref().map(|l| l.login_id) == Some(login_id) {
if let Some(active) = guard.take() {
active.drop();
drop(active);
}
Ok(())
} else {
@@ -758,7 +861,7 @@ impl CodexMessageProcessor {
{
let mut guard = self.active_login.lock().await;
if let Some(active) = guard.take() {
active.drop();
drop(active);
}
}
@@ -1147,7 +1250,17 @@ impl CodexMessageProcessor {
..Default::default()
};
let config = match derive_config_from_params(overrides, cli_overrides).await {
// Persist windows sandbox feature.
// TODO: persist default config in general.
let mut cli_overrides = cli_overrides.unwrap_or_default();
if cfg!(target_os = "windows") && self.config.features.enabled(Feature::WindowsSandbox) {
cli_overrides.insert(
"features.enable_experimental_windows_sandbox".to_string(),
serde_json::json!(true),
);
}
let config = match derive_config_from_params(overrides, Some(cli_overrides)).await {
Ok(config) => config,
Err(err) => {
let error = JSONRPCErrorError {
@@ -1212,8 +1325,12 @@ impl CodexMessageProcessor {
match self.conversation_manager.new_conversation(config).await {
Ok(new_conv) => {
let conversation_id = new_conv.conversation_id;
let rollout_path = new_conv.session_configured.rollout_path.clone();
let NewConversation {
conversation_id,
session_configured,
..
} = new_conv;
let rollout_path = session_configured.rollout_path.clone();
let fallback_provider = self.config.model_provider_id.as_str();
// A bit hacky, but the summary contains a lot of useful information for the thread
@@ -1238,8 +1355,22 @@ impl CodexMessageProcessor {
}
};
let SessionConfiguredEvent {
model,
model_provider_id,
cwd,
approval_policy,
sandbox_policy,
..
} = session_configured;
let response = ThreadStartResponse {
thread: thread.clone(),
model,
model_provider: model_provider_id,
cwd,
approval_policy: approval_policy.into(),
sandbox: sandbox_policy.into(),
reasoning_effort: session_configured.reasoning_effort,
};
// Auto-attach a conversation listener when starting a thread.
@@ -1552,7 +1683,15 @@ impl CodexMessageProcessor {
return;
}
};
let response = ThreadResumeResponse { thread };
let response = ThreadResumeResponse {
thread,
model: session_configured.model,
model_provider: session_configured.model_provider_id,
cwd: session_configured.cwd,
approval_policy: session_configured.approval_policy.into(),
sandbox: session_configured.sandbox_policy.into(),
reasoning_effort: session_configured.reasoning_effort,
};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
@@ -2282,9 +2421,6 @@ impl CodexMessageProcessor {
}
};
// Keep a copy of v2 inputs for the notification payload.
let v2_inputs_for_notif = params.input.clone();
// Map v2 input items to core input items.
let mapped_items: Vec<CoreInputItem> = params
.input
@@ -2324,12 +2460,8 @@ impl CodexMessageProcessor {
Ok(turn_id) => {
let turn = Turn {
id: turn_id.clone(),
items: vec![ThreadItem::UserMessage {
id: turn_id,
content: v2_inputs_for_notif,
}],
items: vec![],
status: TurnStatus::InProgress,
error: None,
};
let response = TurnStartResponse { turn: turn.clone() };
@@ -2352,6 +2484,64 @@ impl CodexMessageProcessor {
}
}
async fn review_start(&self, request_id: RequestId, params: ReviewStartParams) {
let ReviewStartParams {
thread_id,
target,
append_to_original_thread,
} = params;
let (_, conversation) = match self.conversation_from_thread_id(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let (review_request, display_text) =
match Self::review_request_from_target(target, append_to_original_thread) {
Ok(value) => value,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
};
let turn_id = conversation.submit(Op::Review { review_request }).await;
match turn_id {
Ok(turn_id) => {
let mut items = Vec::new();
if !display_text.is_empty() {
items.push(ThreadItem::UserMessage {
id: turn_id.clone(),
content: vec![V2UserInput::Text { text: display_text }],
});
}
let turn = Turn {
id: turn_id.clone(),
items,
status: TurnStatus::InProgress,
};
let response = TurnStartResponse { turn: turn.clone() };
self.outgoing.send_response(request_id, response).await;
let notif = TurnStartedNotification { turn };
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start review: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
async fn turn_interrupt(&mut self, request_id: RequestId, params: TurnInterruptParams) {
let TurnInterruptParams { thread_id, .. } = params;
@@ -2451,6 +2641,7 @@ impl CodexMessageProcessor {
let outgoing_for_task = self.outgoing.clone();
let pending_interrupts = self.pending_interrupts.clone();
let turn_summary_store = self.turn_summary_store.clone();
let api_version_for_task = api_version;
tokio::spawn(async move {
loop {
@@ -2507,6 +2698,7 @@ impl CodexMessageProcessor {
conversation.clone(),
outgoing_for_task.clone(),
pending_interrupts.clone(),
turn_summary_store.clone(),
api_version_for_task,
)
.await;