support best of n

This commit is contained in:
easong-openai
2025-09-26 04:29:33 -07:00
parent e20e4edbab
commit 5fa64b7ae1
10 changed files with 992 additions and 295 deletions

View File

@@ -1,5 +1,6 @@
use crate::ApplyOutcome;
use crate::ApplyStatus;
use crate::AttemptStatus;
use crate::CloudBackend;
use crate::DiffSummary;
use crate::Error;
@@ -7,11 +8,13 @@ use crate::Result;
use crate::TaskId;
use crate::TaskStatus;
use crate::TaskSummary;
use crate::TurnAttempt;
use crate::api::TaskText;
use chrono::DateTime;
use chrono::Utc;
use serde_json::Value;
use std::cmp::Ordering;
use std::collections::HashMap;
use codex_backend_client as backend;
@@ -143,233 +146,71 @@ impl CloudBackend for HttpClient {
if messages.is_empty() {
messages.extend(extract_assistant_messages_from_body(&body));
}
Ok(TaskText { prompt, messages })
}
async fn apply_task(&self, _id: TaskId) -> Result<ApplyOutcome> {
let id = _id.0;
// Fetch diff fresh and apply locally via git (unified diffs).
let details = self
.backend
.get_task_details(&id)
.await
.map_err(|e| Error::Http(format!("get_task_details failed: {e}")))?;
let diff = details
.unified_diff()
.ok_or_else(|| Error::Msg(format!("No diff available for task {id}")))?;
// Enforce unified diff format only
if !is_unified_diff(&diff) {
let summary = summarize_patch_for_logging(&diff);
append_error_log(&format!(
"apply_error: id={id} format=non-unified; {summary}"
));
return Ok(ApplyOutcome {
applied: false,
status: ApplyStatus::Error,
message: "Expected unified git diff; backend returned an incompatible format."
.to_string(),
skipped_paths: Vec::new(),
conflict_paths: Vec::new(),
});
}
// Run the new Git apply engine
let req = codex_git_apply::ApplyGitRequest {
cwd: std::env::current_dir().unwrap_or_else(|_| std::env::temp_dir()),
diff: diff.clone(),
revert: false,
preflight: false,
};
let r = codex_git_apply::apply_git_patch(&req)
.map_err(|e| Error::Io(format!("git apply failed to run: {e}")))?;
let status = if r.exit_code == 0 {
ApplyStatus::Success
} else if !r.applied_paths.is_empty() || !r.conflicted_paths.is_empty() {
ApplyStatus::Partial
} else {
ApplyStatus::Error
};
let is_preflight = r.cmd_for_log.contains("--check");
let applied = matches!(status, ApplyStatus::Success) && !is_preflight;
let message = if is_preflight {
match status {
ApplyStatus::Success => format!("Preflight passed for task {id} (applies cleanly)"),
ApplyStatus::Partial => format!(
"Preflight: patch does not fully apply for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
),
ApplyStatus::Error => format!(
"Preflight failed for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
),
}
} else {
match status {
ApplyStatus::Success => {
format!(
"Applied task {id} locally ({} files)",
r.applied_paths.len()
)
}
ApplyStatus::Partial => {
format!(
"Apply partially succeeded for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
)
}
ApplyStatus::Error => {
format!(
"Apply failed for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
)
}
}
};
// Log details on partial and error
if matches!(status, ApplyStatus::Partial | ApplyStatus::Error)
|| (is_preflight && !matches!(status, ApplyStatus::Success))
{
let mut log = String::new();
let summary = summarize_patch_for_logging(&diff);
use std::fmt::Write as _;
let _ = writeln!(
&mut log,
"apply_result: id={} status={:?} applied={} skipped={} conflicts={} cmd={}",
id,
status,
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len(),
r.cmd_for_log
);
let _ = writeln!(
&mut log,
"stdout_tail=\n{}\nstderr_tail=\n{}",
tail(&r.stdout, 2000),
tail(&r.stderr, 2000)
);
let _ = writeln!(&mut log, "{summary}");
let _ = writeln!(
&mut log,
"----- PATCH BEGIN -----\n{diff}\n----- PATCH END -----"
);
append_error_log(&log);
}
Ok(ApplyOutcome {
applied,
status,
message,
skipped_paths: r.skipped_paths,
conflict_paths: r.conflicted_paths,
let turn_map = details.current_assistant_turn.as_ref();
let turn_id = turn_map
.and_then(|m| m.get("id"))
.and_then(Value::as_str)
.map(|s| s.to_string());
let sibling_turn_ids = turn_map
.and_then(|m| m.get("sibling_turn_ids"))
.and_then(Value::as_array)
.map(|arr| {
arr.iter()
.filter_map(Value::as_str)
.map(|s| s.to_string())
.collect()
})
.unwrap_or_default();
let attempt_placement = turn_map
.and_then(|m| m.get("attempt_placement"))
.and_then(Value::as_i64);
let attempt_status = attempt_status_from_str(
turn_map
.and_then(|m| m.get("turn_status"))
.and_then(Value::as_str),
);
Ok(TaskText {
prompt,
messages,
turn_id,
sibling_turn_ids,
attempt_placement,
attempt_status,
})
}
async fn apply_task_preflight(&self, _id: TaskId) -> Result<ApplyOutcome> {
let id = _id.0;
// Fetch diff fresh and apply locally via git (unified diffs).
let details = self
async fn list_sibling_attempts(
&self,
task: TaskId,
turn_id: String,
) -> Result<Vec<TurnAttempt>> {
let resp = self
.backend
.get_task_details(&id)
.list_sibling_turns(&task.0, &turn_id)
.await
.map_err(|e| Error::Http(format!("get_task_details failed: {e}")))?;
let diff = details
.unified_diff()
.ok_or_else(|| Error::Msg(format!("No diff available for task {id}")))?;
// Enforce unified diff format only
if !is_unified_diff(&diff) {
let summary = summarize_patch_for_logging(&diff);
append_error_log(&format!(
"apply_error: id={id} format=non-unified; {summary}"
));
return Ok(ApplyOutcome {
applied: false,
status: ApplyStatus::Error,
message: "Expected unified git diff; backend returned an incompatible format."
.to_string(),
skipped_paths: Vec::new(),
conflict_paths: Vec::new(),
});
}
.map_err(|e| Error::Http(format!("list_sibling_turns failed: {e}")))?;
// Preflight: run with --check (no changes)
let req = codex_git_apply::ApplyGitRequest {
cwd: std::env::current_dir().unwrap_or_else(|_| std::env::temp_dir()),
diff: diff.clone(),
revert: false,
preflight: true,
};
let r = codex_git_apply::apply_git_patch(&req)
.map_err(|e| Error::Io(format!("git apply failed to run: {e}")))?;
let mut attempts: Vec<TurnAttempt> = resp
.sibling_turns
.iter()
.filter_map(turn_attempt_from_map)
.collect();
attempts.sort_by(compare_attempts);
Ok(attempts)
}
let status = if r.exit_code == 0 {
ApplyStatus::Success
} else if !r.applied_paths.is_empty() || !r.conflicted_paths.is_empty() {
ApplyStatus::Partial
} else {
ApplyStatus::Error
};
let message = match status {
ApplyStatus::Success => format!("Preflight passed for task {id} (applies cleanly)"),
ApplyStatus::Partial => format!(
"Preflight: patch does not fully apply for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
),
ApplyStatus::Error => format!(
"Preflight failed for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
),
};
async fn apply_task(&self, _id: TaskId, diff_override: Option<String>) -> Result<ApplyOutcome> {
let id = _id.0;
self.apply_with_diff(id, diff_override, false).await
}
if !matches!(status, ApplyStatus::Success) {
let mut log = String::new();
let summary = summarize_patch_for_logging(&diff);
use std::fmt::Write as _;
let _ = writeln!(
&mut log,
"apply_preflight_result: id={} status={:?} applied={} skipped={} conflicts={} cmd={}",
id,
status,
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len(),
r.cmd_for_log
);
let _ = writeln!(
&mut log,
"stdout_tail=\n{}\nstderr_tail=\n{}",
tail(&r.stdout, 2000),
tail(&r.stderr, 2000)
);
let _ = writeln!(&mut log, "{summary}");
let _ = writeln!(
&mut log,
"----- PATCH BEGIN -----\n{diff}\n----- PATCH END -----"
);
append_error_log(&log);
}
Ok(ApplyOutcome {
applied: false,
status,
message,
skipped_paths: r.skipped_paths,
conflict_paths: r.conflicted_paths,
})
async fn apply_task_preflight(
&self,
_id: TaskId,
diff_override: Option<String>,
) -> Result<ApplyOutcome> {
let id = _id.0;
self.apply_with_diff(id, diff_override, true).await
}
async fn create_task(
@@ -430,6 +271,147 @@ impl CloudBackend for HttpClient {
/// Best-effort extraction of assistant text messages from a raw `get_task_details` body.
/// Falls back to worklog messages when structured turns are not present.
impl HttpClient {
async fn apply_with_diff(
&self,
id: String,
diff_override: Option<String>,
preflight: bool,
) -> Result<ApplyOutcome> {
let diff = match diff_override {
Some(diff) => diff,
None => {
let details = self
.backend
.get_task_details(&id)
.await
.map_err(|e| Error::Http(format!("get_task_details failed: {e}")))?;
details
.unified_diff()
.ok_or_else(|| Error::Msg(format!("No diff available for task {id}")))?
}
};
if !is_unified_diff(&diff) {
let summary = summarize_patch_for_logging(&diff);
let mode = if preflight { "preflight" } else { "apply" };
append_error_log(&format!(
"apply_error: id={id} mode={mode} format=non-unified; {summary}"
));
return Ok(ApplyOutcome {
applied: false,
status: ApplyStatus::Error,
message: "Expected unified git diff; backend returned an incompatible format."
.to_string(),
skipped_paths: Vec::new(),
conflict_paths: Vec::new(),
});
}
let req = codex_git_apply::ApplyGitRequest {
cwd: std::env::current_dir().unwrap_or_else(|_| std::env::temp_dir()),
diff: diff.clone(),
revert: false,
preflight,
};
let r = codex_git_apply::apply_git_patch(&req)
.map_err(|e| Error::Io(format!("git apply failed to run: {e}")))?;
let status = if r.exit_code == 0 {
ApplyStatus::Success
} else if !r.applied_paths.is_empty() || !r.conflicted_paths.is_empty() {
ApplyStatus::Partial
} else {
ApplyStatus::Error
};
let applied = matches!(status, ApplyStatus::Success) && !preflight;
let message = if preflight {
match status {
ApplyStatus::Success => format!("Preflight passed for task {id} (applies cleanly)"),
ApplyStatus::Partial => format!(
"Preflight: patch does not fully apply for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
),
ApplyStatus::Error => format!(
"Preflight failed for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
),
}
} else {
match status {
ApplyStatus::Success => format!(
"Applied task {id} locally ({} files)",
r.applied_paths.len()
),
ApplyStatus::Partial => format!(
"Apply partially succeeded for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
),
ApplyStatus::Error => format!(
"Apply failed for task {id} (applied={}, skipped={}, conflicts={})",
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len()
),
}
};
if matches!(status, ApplyStatus::Partial | ApplyStatus::Error)
|| (preflight && !matches!(status, ApplyStatus::Success))
{
let mut log = String::new();
let summary = summarize_patch_for_logging(&diff);
let mode = if preflight { "preflight" } else { "apply" };
use std::fmt::Write as _;
let _ = writeln!(
&mut log,
"apply_result: mode={} id={} status={:?} applied={} skipped={} conflicts={} cmd={}",
mode,
id,
status,
r.applied_paths.len(),
r.skipped_paths.len(),
r.conflicted_paths.len(),
r.cmd_for_log
);
let _ = writeln!(
&mut log,
"stdout_tail=
{}
stderr_tail=
{}",
tail(&r.stdout, 2000),
tail(&r.stderr, 2000)
);
let _ = writeln!(&mut log, "{summary}");
let _ = writeln!(
&mut log,
"----- PATCH BEGIN -----
{}
----- PATCH END -----",
diff
);
append_error_log(&log);
}
Ok(ApplyOutcome {
applied,
status,
message,
skipped_paths: r.skipped_paths,
conflict_paths: r.conflicted_paths,
})
}
}
fn extract_assistant_messages_from_body(body: &str) -> Vec<String> {
let mut msgs = Vec::new();
if let Ok(full) = serde_json::from_str::<serde_json::Value>(body)
@@ -473,6 +455,125 @@ fn extract_assistant_messages_from_body(body: &str) -> Vec<String> {
msgs
}
fn turn_attempt_from_map(turn: &HashMap<String, Value>) -> Option<TurnAttempt> {
let turn_id = turn.get("id").and_then(Value::as_str)?.to_string();
let attempt_placement = turn.get("attempt_placement").and_then(Value::as_i64);
let created_at = parse_timestamp_value(turn.get("created_at"));
let status = attempt_status_from_str(turn.get("turn_status").and_then(Value::as_str));
let diff = extract_diff_from_turn(turn);
let messages = extract_assistant_messages_from_turn(turn);
Some(TurnAttempt {
turn_id,
attempt_placement,
created_at,
status,
diff,
messages,
})
}
fn compare_attempts(a: &TurnAttempt, b: &TurnAttempt) -> Ordering {
match (a.attempt_placement, b.attempt_placement) {
(Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => match (a.created_at, b.created_at) {
(Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => a.turn_id.cmp(&b.turn_id),
},
}
}
fn extract_diff_from_turn(turn: &HashMap<String, Value>) -> Option<String> {
let items = turn.get("output_items").and_then(Value::as_array)?;
for item in items {
match item.get("type").and_then(Value::as_str) {
Some("output_diff") => {
if let Some(diff) = item.get("diff").and_then(Value::as_str) {
if !diff.is_empty() {
return Some(diff.to_string());
}
}
}
Some("pr") => {
if let Some(diff) = item
.get("output_diff")
.and_then(Value::as_object)
.and_then(|od| od.get("diff"))
.and_then(Value::as_str)
{
if !diff.is_empty() {
return Some(diff.to_string());
}
}
}
_ => {}
}
}
None
}
fn extract_assistant_messages_from_turn(turn: &HashMap<String, Value>) -> Vec<String> {
let mut msgs = Vec::new();
if let Some(items) = turn.get("output_items").and_then(Value::as_array) {
for item in items {
if item.get("type").and_then(Value::as_str) != Some("message") {
continue;
}
if let Some(content) = item.get("content").and_then(Value::as_array) {
for part in content {
if part.get("content_type").and_then(Value::as_str) == Some("text")
&& let Some(txt) = part.get("text").and_then(Value::as_str)
{
if !txt.is_empty() {
msgs.push(txt.to_string());
}
}
}
}
}
}
if msgs.is_empty() {
if let Some(err) = turn.get("error").and_then(Value::as_object) {
let message = err.get("message").and_then(Value::as_str).unwrap_or("");
let code = err.get("code").and_then(Value::as_str).unwrap_or("");
if !message.is_empty() || !code.is_empty() {
let text = if !code.is_empty() && !message.is_empty() {
format!("{code}: {message}")
} else if !code.is_empty() {
code.to_string()
} else {
message.to_string()
};
msgs.push(format!("Task failed: {text}"));
}
}
}
msgs
}
fn parse_timestamp_value(v: Option<&Value>) -> Option<DateTime<Utc>> {
let raw = v?.as_f64()?;
let secs = raw as i64;
let nanos = ((raw - secs as f64) * 1_000_000_000.0) as u32;
Some(DateTime::<Utc>::from(
std::time::UNIX_EPOCH + std::time::Duration::new(secs.max(0) as u64, nanos),
))
}
fn attempt_status_from_str(s: Option<&str>) -> AttemptStatus {
match s.unwrap_or("") {
"pending" => AttemptStatus::Pending,
"in_progress" => AttemptStatus::InProgress,
"completed" => AttemptStatus::Completed,
"failed" => AttemptStatus::Failed,
"cancelled" => AttemptStatus::Cancelled,
_ => AttemptStatus::Unknown,
}
}
fn map_task_list_item_to_summary(src: backend::TaskListItem) -> TaskSummary {
fn env_label_from_status_display(v: Option<&HashMap<String, Value>>) -> Option<String> {
let obj = v?;
@@ -527,6 +628,15 @@ fn map_task_list_item_to_summary(src: backend::TaskListItem) -> TaskSummary {
out
}
fn attempt_total_from_status_display(v: Option<&HashMap<String, Value>>) -> Option<usize> {
let map = v?;
let latest = map
.get("latest_turn_status_display")
.and_then(Value::as_object)?;
let siblings = latest.get("sibling_turn_ids").and_then(Value::as_array)?;
Some(siblings.len().saturating_add(1))
}
TaskSummary {
id: TaskId(src.id),
title: src.title,
@@ -539,6 +649,7 @@ fn map_task_list_item_to_summary(src: backend::TaskListItem) -> TaskSummary {
.pull_requests
.as_ref()
.map_or(false, |prs| !prs.is_empty()),
attempt_total: attempt_total_from_status_display(src.task_status_display.as_ref()),
}
}