mirror of
https://github.com/openai/codex.git
synced 2026-05-05 22:01:37 +03:00
- Treat explicit null thread instructions as a blank-slate override while preserving omitted-field fallback behavior. - Preserve null through rollout resume/fork and keep explicit empty strings distinct. - Add app-server v2 start/fork coverage for the tri-state instruction params.
321 lines
11 KiB
Rust
321 lines
11 KiB
Rust
use std::borrow::Cow;
|
|
use std::sync::Arc;
|
|
|
|
use codex_protocol::config_types::WebSearchMode;
|
|
use codex_protocol::items::TurnItem;
|
|
use codex_protocol::models::ContentItem;
|
|
use codex_protocol::models::ResponseItem;
|
|
use codex_protocol::protocol::AgentMessageContentDeltaEvent;
|
|
use codex_protocol::protocol::AgentMessageDeltaEvent;
|
|
use codex_protocol::protocol::AskForApproval;
|
|
use codex_protocol::protocol::Event;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::ExitedReviewModeEvent;
|
|
use codex_protocol::protocol::ItemCompletedEvent;
|
|
use codex_protocol::protocol::ReviewOutputEvent;
|
|
use codex_protocol::protocol::SubAgentSource;
|
|
use codex_utils_template::Template;
|
|
use tokio_util::sync::CancellationToken;
|
|
|
|
use crate::codex::Session;
|
|
use crate::codex::TurnContext;
|
|
use crate::codex_delegate::run_codex_thread_one_shot;
|
|
use crate::config::Constrained;
|
|
use crate::review_format::format_review_findings_block;
|
|
use crate::review_format::render_review_output_text;
|
|
use crate::state::TaskKind;
|
|
use codex_features::Feature;
|
|
use codex_protocol::user_input::UserInput;
|
|
use std::sync::LazyLock;
|
|
|
|
use super::SessionTask;
|
|
use super::SessionTaskContext;
|
|
|
|
static REVIEW_EXIT_SUCCESS_TEMPLATE: LazyLock<Template> = LazyLock::new(|| {
|
|
let normalized =
|
|
normalize_review_template_line_endings(crate::client_common::REVIEW_EXIT_SUCCESS_TMPL);
|
|
Template::parse(normalized.as_ref())
|
|
.unwrap_or_else(|err| panic!("review exit success template must parse: {err}"))
|
|
});
|
|
|
|
#[derive(Clone, Copy)]
|
|
pub(crate) struct ReviewTask;
|
|
|
|
impl ReviewTask {
|
|
pub(crate) fn new() -> Self {
|
|
Self
|
|
}
|
|
}
|
|
|
|
impl SessionTask for ReviewTask {
|
|
fn kind(&self) -> TaskKind {
|
|
TaskKind::Review
|
|
}
|
|
|
|
fn span_name(&self) -> &'static str {
|
|
"session_task.review"
|
|
}
|
|
|
|
async fn run(
|
|
self: Arc<Self>,
|
|
session: Arc<SessionTaskContext>,
|
|
ctx: Arc<TurnContext>,
|
|
input: Vec<UserInput>,
|
|
cancellation_token: CancellationToken,
|
|
) -> Option<String> {
|
|
session.session.services.session_telemetry.counter(
|
|
"codex.task.review",
|
|
/*inc*/ 1,
|
|
&[],
|
|
);
|
|
|
|
// Start sub-codex conversation and get the receiver for events.
|
|
let output = match start_review_conversation(
|
|
session.clone(),
|
|
ctx.clone(),
|
|
input,
|
|
cancellation_token.clone(),
|
|
)
|
|
.await
|
|
{
|
|
Some(receiver) => process_review_events(session.clone(), ctx.clone(), receiver).await,
|
|
None => None,
|
|
};
|
|
if !cancellation_token.is_cancelled() {
|
|
exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await;
|
|
}
|
|
None
|
|
}
|
|
|
|
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
|
|
exit_review_mode(session.clone_session(), /*review_output*/ None, ctx).await;
|
|
}
|
|
}
|
|
|
|
async fn start_review_conversation(
|
|
session: Arc<SessionTaskContext>,
|
|
ctx: Arc<TurnContext>,
|
|
input: Vec<UserInput>,
|
|
cancellation_token: CancellationToken,
|
|
) -> Option<async_channel::Receiver<Event>> {
|
|
let config = ctx.config.clone();
|
|
let mut sub_agent_config = config.as_ref().clone();
|
|
// Carry over review-only feature restrictions so the delegate cannot
|
|
// re-enable blocked tools (web search, collab tools, view image).
|
|
if let Err(err) = sub_agent_config
|
|
.web_search_mode
|
|
.set(WebSearchMode::Disabled)
|
|
{
|
|
panic!("by construction Constrained<WebSearchMode> must always support Disabled: {err}");
|
|
}
|
|
let _ = sub_agent_config.features.disable(Feature::SpawnCsv);
|
|
let _ = sub_agent_config.features.disable(Feature::Collab);
|
|
|
|
// Set explicit review rubric for the sub-agent
|
|
sub_agent_config.base_instructions = Some(Some(crate::REVIEW_PROMPT.to_string()));
|
|
sub_agent_config.permissions.approval_policy = Constrained::allow_only(AskForApproval::Never);
|
|
|
|
let model = config
|
|
.review_model
|
|
.clone()
|
|
.unwrap_or_else(|| ctx.model_info.slug.clone());
|
|
sub_agent_config.model = Some(model);
|
|
(run_codex_thread_one_shot(
|
|
sub_agent_config,
|
|
session.auth_manager(),
|
|
session.models_manager(),
|
|
input,
|
|
session.clone_session(),
|
|
ctx.clone(),
|
|
cancellation_token,
|
|
SubAgentSource::Review,
|
|
/*final_output_json_schema*/ None,
|
|
/*initial_history*/ None,
|
|
)
|
|
.await)
|
|
.ok()
|
|
.map(|io| io.rx_event)
|
|
}
|
|
|
|
async fn process_review_events(
|
|
session: Arc<SessionTaskContext>,
|
|
ctx: Arc<TurnContext>,
|
|
receiver: async_channel::Receiver<Event>,
|
|
) -> Option<ReviewOutputEvent> {
|
|
let mut prev_agent_message: Option<Event> = None;
|
|
while let Ok(event) = receiver.recv().await {
|
|
match event.clone().msg {
|
|
EventMsg::AgentMessage(_) => {
|
|
if let Some(prev) = prev_agent_message.take() {
|
|
session
|
|
.clone_session()
|
|
.send_event(ctx.as_ref(), prev.msg)
|
|
.await;
|
|
}
|
|
prev_agent_message = Some(event);
|
|
}
|
|
// Suppress ItemCompleted only for assistant messages: forwarding it
|
|
// would trigger legacy AgentMessage via as_legacy_events(), which this
|
|
// review flow intentionally hides in favor of structured output.
|
|
EventMsg::ItemCompleted(ItemCompletedEvent {
|
|
item: TurnItem::AgentMessage(_),
|
|
..
|
|
})
|
|
| EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { .. })
|
|
| EventMsg::AgentMessageContentDelta(AgentMessageContentDeltaEvent { .. }) => {}
|
|
EventMsg::TurnComplete(task_complete) => {
|
|
// Parse review output from the last agent message (if present).
|
|
let out = task_complete
|
|
.last_agent_message
|
|
.as_deref()
|
|
.map(parse_review_output_event);
|
|
return out;
|
|
}
|
|
EventMsg::TurnAborted(_) => {
|
|
// Cancellation or abort: consumer will finalize with None.
|
|
return None;
|
|
}
|
|
other => {
|
|
session
|
|
.clone_session()
|
|
.send_event(ctx.as_ref(), other)
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
// Channel closed without TurnComplete: treat as interrupted.
|
|
None
|
|
}
|
|
|
|
/// Parse a ReviewOutputEvent from a text blob returned by the reviewer model.
|
|
/// If the text is valid JSON matching ReviewOutputEvent, deserialize it.
|
|
/// Otherwise, attempt to extract the first JSON object substring and parse it.
|
|
/// If parsing still fails, return a structured fallback carrying the plain text
|
|
/// in `overall_explanation`.
|
|
fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
|
|
if let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(text) {
|
|
return ev;
|
|
}
|
|
if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}'))
|
|
&& start < end
|
|
&& let Some(slice) = text.get(start..=end)
|
|
&& let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(slice)
|
|
{
|
|
return ev;
|
|
}
|
|
ReviewOutputEvent {
|
|
overall_explanation: text.to_string(),
|
|
..Default::default()
|
|
}
|
|
}
|
|
|
|
/// Emits an ExitedReviewMode Event with optional ReviewOutput,
|
|
/// and records a developer message with the review output.
|
|
pub(crate) async fn exit_review_mode(
|
|
session: Arc<Session>,
|
|
review_output: Option<ReviewOutputEvent>,
|
|
ctx: Arc<TurnContext>,
|
|
) {
|
|
const REVIEW_USER_MESSAGE_ID: &str = "review_rollout_user";
|
|
const REVIEW_ASSISTANT_MESSAGE_ID: &str = "review_rollout_assistant";
|
|
let (user_message, assistant_message) = if let Some(out) = review_output.clone() {
|
|
let mut findings_str = String::new();
|
|
let text = out.overall_explanation.trim();
|
|
if !text.is_empty() {
|
|
findings_str.push_str(text);
|
|
}
|
|
if !out.findings.is_empty() {
|
|
let block = format_review_findings_block(&out.findings, /*selection*/ None);
|
|
findings_str.push_str(&format!("\n{block}"));
|
|
}
|
|
let rendered = render_review_exit_success(&findings_str);
|
|
let assistant_message = render_review_output_text(&out);
|
|
(rendered, assistant_message)
|
|
} else {
|
|
let rendered = normalize_review_template_line_endings(
|
|
crate::client_common::REVIEW_EXIT_INTERRUPTED_TMPL,
|
|
)
|
|
.into_owned();
|
|
let assistant_message =
|
|
"Review was interrupted. Please re-run /review and wait for it to complete."
|
|
.to_string();
|
|
(rendered, assistant_message)
|
|
};
|
|
|
|
session
|
|
.record_conversation_items(
|
|
&ctx,
|
|
&[ResponseItem::Message {
|
|
id: Some(REVIEW_USER_MESSAGE_ID.to_string()),
|
|
role: "user".to_string(),
|
|
content: vec![ContentItem::InputText { text: user_message }],
|
|
end_turn: None,
|
|
phase: None,
|
|
}],
|
|
)
|
|
.await;
|
|
|
|
session
|
|
.send_event(
|
|
ctx.as_ref(),
|
|
EventMsg::ExitedReviewMode(ExitedReviewModeEvent { review_output }),
|
|
)
|
|
.await;
|
|
session
|
|
.record_response_item_and_emit_turn_item(
|
|
ctx.as_ref(),
|
|
ResponseItem::Message {
|
|
id: Some(REVIEW_ASSISTANT_MESSAGE_ID.to_string()),
|
|
role: "assistant".to_string(),
|
|
content: vec![ContentItem::OutputText {
|
|
text: assistant_message,
|
|
}],
|
|
end_turn: None,
|
|
phase: None,
|
|
},
|
|
)
|
|
.await;
|
|
|
|
// Review turns can run before any regular user turn, so explicitly
|
|
// materialize rollout persistence. Do this after emitting review output so
|
|
// file creation + git metadata collection cannot delay client-facing items.
|
|
session.ensure_rollout_materialized().await;
|
|
}
|
|
|
|
fn render_review_exit_success(results: &str) -> String {
|
|
REVIEW_EXIT_SUCCESS_TEMPLATE
|
|
.render([("results", results)])
|
|
.unwrap_or_else(|err| panic!("review exit success template must render: {err}"))
|
|
}
|
|
|
|
fn normalize_review_template_line_endings(template: &str) -> Cow<'_, str> {
|
|
if template.contains('\r') {
|
|
Cow::Owned(template.replace("\r\n", "\n").replace('\r', "\n"))
|
|
} else {
|
|
Cow::Borrowed(template)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::normalize_review_template_line_endings;
|
|
use super::render_review_exit_success;
|
|
use pretty_assertions::assert_eq;
|
|
|
|
#[test]
|
|
fn render_review_exit_success_replaces_results_placeholder() {
|
|
assert_eq!(
|
|
render_review_exit_success("Finding A\nFinding B"),
|
|
"<user_action>\n <context>User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve.</context>\n <action>review</action>\n <results>\n Finding A\nFinding B\n </results>\n </user_action>\n"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn normalize_review_template_line_endings_rewrites_crlf() {
|
|
assert_eq!(
|
|
normalize_review_template_line_endings("<user_action>\r\n <results>\r\n None.\r\n"),
|
|
"<user_action>\n <results>\n None.\n"
|
|
);
|
|
}
|
|
}
|