mirror of
https://github.com/openai/codex.git
synced 2026-04-28 02:11:08 +03:00
V3
This commit is contained in:
@@ -121,7 +121,6 @@ use crate::safety::assess_safety_for_untrusted_command;
|
||||
use crate::shell;
|
||||
use crate::state::SessionState;
|
||||
use crate::state::TurnState;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use crate::unified_exec::UnifiedExecSessionManager;
|
||||
use crate::user_instructions::UserInstructions;
|
||||
use crate::user_notification::UserNotification;
|
||||
@@ -791,7 +790,7 @@ impl Session {
|
||||
|
||||
async fn on_exec_command_begin(
|
||||
&self,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
turn_state: &TurnState,
|
||||
exec_command_context: ExecCommandContext,
|
||||
) {
|
||||
let ExecCommandContext {
|
||||
@@ -806,7 +805,7 @@ impl Session {
|
||||
user_explicitly_approved_this_action,
|
||||
changes,
|
||||
}) => {
|
||||
turn_diff_tracker.on_patch_begin(&changes);
|
||||
turn_state.on_patch_begin(&changes).await;
|
||||
|
||||
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
|
||||
call_id,
|
||||
@@ -833,7 +832,7 @@ impl Session {
|
||||
|
||||
async fn on_exec_command_end(
|
||||
&self,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
turn_state: &TurnState,
|
||||
sub_id: &str,
|
||||
call_id: &str,
|
||||
output: &ExecToolCallOutput,
|
||||
@@ -881,7 +880,7 @@ impl Session {
|
||||
// If this is an apply_patch, after we emit the end patch, emit a second event
|
||||
// with the full turn diff if there is one.
|
||||
if is_apply_patch {
|
||||
let unified_diff = turn_diff_tracker.get_unified_diff();
|
||||
let unified_diff = turn_state.take_unified_diff().await;
|
||||
if let Ok(Some(unified_diff)) = unified_diff {
|
||||
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
|
||||
let event = Event {
|
||||
@@ -898,7 +897,7 @@ impl Session {
|
||||
/// Returns the output of the exec tool call.
|
||||
async fn run_exec_with_events<'a>(
|
||||
&self,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
turn_state: &TurnState,
|
||||
begin_ctx: ExecCommandContext,
|
||||
exec_args: ExecInvokeArgs<'a>,
|
||||
) -> crate::error::Result<ExecToolCallOutput> {
|
||||
@@ -906,7 +905,7 @@ impl Session {
|
||||
let sub_id = begin_ctx.sub_id.clone();
|
||||
let call_id = begin_ctx.call_id.clone();
|
||||
|
||||
self.on_exec_command_begin(turn_diff_tracker, begin_ctx.clone())
|
||||
self.on_exec_command_begin(turn_state, begin_ctx.clone())
|
||||
.await;
|
||||
|
||||
let result = process_exec_tool_call(
|
||||
@@ -935,14 +934,8 @@ impl Session {
|
||||
&output_stderr
|
||||
}
|
||||
};
|
||||
self.on_exec_command_end(
|
||||
turn_diff_tracker,
|
||||
&sub_id,
|
||||
&call_id,
|
||||
borrowed,
|
||||
is_apply_patch,
|
||||
)
|
||||
.await;
|
||||
self.on_exec_command_end(turn_state, &sub_id, &call_id, borrowed, is_apply_patch)
|
||||
.await;
|
||||
|
||||
result
|
||||
}
|
||||
@@ -1668,7 +1661,7 @@ async fn spawn_review_thread(
|
||||
async fn run_task(sess: Arc<Session>, turn_state: Arc<TurnState>) {
|
||||
let turn_context = turn_state.turn_context();
|
||||
let sub_id = turn_state.sub_id().to_string();
|
||||
let Some(initial_input_for_turn) = turn_state.initial_input() else {
|
||||
let Some(initial_input_for_turn) = turn_state.take_initial_input().await else {
|
||||
return;
|
||||
};
|
||||
let event = Event {
|
||||
@@ -1683,48 +1676,23 @@ async fn run_task(sess: Arc<Session>, turn_state: Arc<TurnState>) {
|
||||
// model sees a fresh conversation without the parent session's history.
|
||||
// For normal turns, continue recording to the session history as before.
|
||||
let is_review_mode = turn_context.is_review_mode;
|
||||
let mut review_thread_history: Vec<ResponseItem> = Vec::new();
|
||||
let mut current_turn_readiness = turn_state.initial_readiness();
|
||||
if is_review_mode {
|
||||
// Seed review threads with environment context so the model knows the working directory.
|
||||
review_thread_history.extend(sess.build_initial_context(turn_context.as_ref()));
|
||||
review_thread_history.push(initial_input_for_turn.clone().into());
|
||||
let mut history = sess.build_initial_context(turn_context.as_ref());
|
||||
history.push(initial_input_for_turn.clone().into());
|
||||
turn_state.set_review_history(history).await;
|
||||
} else {
|
||||
sess.record_input_and_rollout_usermsg(&initial_input_for_turn)
|
||||
.await;
|
||||
}
|
||||
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
|
||||
// many turns, from the perspective of the user, it is a single turn.
|
||||
let mut turn_diff_tracker = TurnDiffTracker::new();
|
||||
let mut auto_compact_recently_attempted = false;
|
||||
|
||||
loop {
|
||||
// Note that pending_input represents follow-up messages submitted while the
|
||||
// model is running. These are queued on the TurnState mailbox.
|
||||
let (pending_input, maybe_new_readiness) = turn_state
|
||||
.drain_mailbox(current_turn_readiness.clone())
|
||||
.await;
|
||||
if let Some(readiness) = maybe_new_readiness {
|
||||
current_turn_readiness = Some(readiness);
|
||||
}
|
||||
let (pending_input, turn_readiness) = turn_state.drain_mailbox().await;
|
||||
|
||||
// Construct the input that we will send to the model.
|
||||
//
|
||||
// - For review threads, use the isolated in-memory history so the
|
||||
// model sees a fresh conversation (no parent history/user_instructions).
|
||||
//
|
||||
// - For normal turns, use the session's full history. When using the
|
||||
// chat completions API (or ZDR clients), the model needs the full
|
||||
// conversation history on each turn. The rollout file, however, should
|
||||
// only record the new items that originated in this turn so that it
|
||||
// represents an append-only log without duplicates.
|
||||
let turn_input: Vec<ResponseItem> = if is_review_mode {
|
||||
if !pending_input.is_empty() {
|
||||
review_thread_history.extend(pending_input.clone());
|
||||
turn_state.extend_review_history(&pending_input).await;
|
||||
}
|
||||
review_thread_history.clone()
|
||||
turn_state.review_history().await
|
||||
} else {
|
||||
sess.record_conversation_items(&pending_input).await;
|
||||
sess.turn_input_with_history(pending_input).await
|
||||
@@ -1743,13 +1711,13 @@ async fn run_task(sess: Arc<Session>, turn_state: Arc<TurnState>) {
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
match run_turn(
|
||||
&sess,
|
||||
turn_context.as_ref(),
|
||||
&mut turn_diff_tracker,
|
||||
turn_state.as_ref(),
|
||||
sub_id.clone(),
|
||||
turn_input,
|
||||
current_turn_readiness.clone(),
|
||||
turn_readiness.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -1864,8 +1832,9 @@ async fn run_task(sess: Arc<Session>, turn_state: Arc<TurnState>) {
|
||||
// Only attempt to take the lock if there is something to record.
|
||||
if !items_to_record_in_conversation_history.is_empty() {
|
||||
if is_review_mode {
|
||||
review_thread_history
|
||||
.extend(items_to_record_in_conversation_history.clone());
|
||||
turn_state
|
||||
.extend_review_history(&items_to_record_in_conversation_history)
|
||||
.await;
|
||||
} else {
|
||||
sess.record_conversation_items(&items_to_record_in_conversation_history)
|
||||
.await;
|
||||
@@ -1873,7 +1842,7 @@ async fn run_task(sess: Arc<Session>, turn_state: Arc<TurnState>) {
|
||||
}
|
||||
|
||||
if token_limit_reached {
|
||||
if auto_compact_recently_attempted {
|
||||
if turn_state.mark_auto_compact_attempted().await {
|
||||
let limit_str = limit.to_string();
|
||||
let current_tokens = total_usage_tokens
|
||||
.map(|tokens| tokens.to_string())
|
||||
@@ -1889,17 +1858,19 @@ async fn run_task(sess: Arc<Session>, turn_state: Arc<TurnState>) {
|
||||
sess.send_event(event).await;
|
||||
break;
|
||||
}
|
||||
auto_compact_recently_attempted = true;
|
||||
compact::run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
auto_compact_recently_attempted = false;
|
||||
turn_state.reset_auto_compact_attempted().await;
|
||||
|
||||
if responses.is_empty() {
|
||||
last_agent_message = get_last_assistant_message_from_turn(
|
||||
let last_agent_message = get_last_assistant_message_from_turn(
|
||||
&items_to_record_in_conversation_history,
|
||||
);
|
||||
turn_state
|
||||
.set_last_agent_message(last_agent_message.clone())
|
||||
.await;
|
||||
sess.notifier()
|
||||
.notify(&UserNotification::AgentTurnComplete {
|
||||
turn_id: sub_id.clone(),
|
||||
@@ -1925,6 +1896,8 @@ async fn run_task(sess: Arc<Session>, turn_state: Arc<TurnState>) {
|
||||
}
|
||||
}
|
||||
|
||||
let last_agent_message = turn_state.last_agent_message().await;
|
||||
|
||||
// If this was a review thread and we have a final assistant message,
|
||||
// try to parse it as a ReviewOutput.
|
||||
//
|
||||
@@ -1976,12 +1949,12 @@ fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
|
||||
|
||||
async fn run_turn(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
turn_state: &TurnState,
|
||||
sub_id: String,
|
||||
input: Vec<ResponseItem>,
|
||||
turn_readiness: Option<Arc<ReadinessFlag>>,
|
||||
) -> CodexResult<TurnRunResult> {
|
||||
let turn_context = turn_state.turn_context();
|
||||
let tools = get_openai_tools(
|
||||
&turn_context.tools_config,
|
||||
Some(sess.mcp_connection_manager.list_all_tools()),
|
||||
@@ -1998,8 +1971,8 @@ async fn run_turn(
|
||||
loop {
|
||||
match try_run_turn(
|
||||
sess,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
turn_state,
|
||||
turn_context.as_ref(),
|
||||
&sub_id,
|
||||
&prompt,
|
||||
turn_readiness.clone(),
|
||||
@@ -2068,8 +2041,8 @@ struct TurnRunResult {
|
||||
|
||||
async fn try_run_turn(
|
||||
sess: &Session,
|
||||
turn_state: &TurnState,
|
||||
turn_context: &TurnContext,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
sub_id: &str,
|
||||
prompt: &Prompt,
|
||||
turn_readiness: Option<Arc<ReadinessFlag>>,
|
||||
@@ -2170,8 +2143,8 @@ async fn try_run_turn(
|
||||
ResponseEvent::OutputItemDone(item) => {
|
||||
let response = handle_response_item(
|
||||
sess,
|
||||
turn_state,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id,
|
||||
item.clone(),
|
||||
turn_readiness.clone(),
|
||||
@@ -2200,7 +2173,7 @@ async fn try_run_turn(
|
||||
sess.update_token_usage_info(sub_id, turn_context, token_usage.as_ref())
|
||||
.await;
|
||||
|
||||
let unified_diff = turn_diff_tracker.get_unified_diff();
|
||||
let unified_diff = turn_state.take_unified_diff().await;
|
||||
if let Ok(Some(unified_diff)) = unified_diff {
|
||||
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
|
||||
let event = Event {
|
||||
@@ -2261,8 +2234,8 @@ async fn try_run_turn(
|
||||
|
||||
async fn handle_response_item(
|
||||
sess: &Session,
|
||||
turn_state: &TurnState,
|
||||
turn_context: &TurnContext,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
sub_id: &str,
|
||||
item: ResponseItem,
|
||||
turn_readiness: Option<Arc<ReadinessFlag>>,
|
||||
@@ -2282,8 +2255,8 @@ async fn handle_response_item(
|
||||
Some(
|
||||
handle_function_call(
|
||||
sess,
|
||||
turn_state,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id.to_string(),
|
||||
name,
|
||||
arguments,
|
||||
@@ -2327,8 +2300,8 @@ async fn handle_response_item(
|
||||
handle_container_exec_with_params(
|
||||
exec_params,
|
||||
sess,
|
||||
turn_state,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id.to_string(),
|
||||
effective_call_id,
|
||||
)
|
||||
@@ -2344,8 +2317,8 @@ async fn handle_response_item(
|
||||
} => Some(
|
||||
handle_custom_tool_call(
|
||||
sess,
|
||||
turn_state,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id.to_string(),
|
||||
name,
|
||||
input,
|
||||
@@ -2455,8 +2428,8 @@ async fn handle_unified_exec_tool_call(
|
||||
|
||||
async fn handle_function_call(
|
||||
sess: &Session,
|
||||
turn_state: &TurnState,
|
||||
turn_context: &TurnContext,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
sub_id: String,
|
||||
name: String,
|
||||
arguments: String,
|
||||
@@ -2473,8 +2446,8 @@ async fn handle_function_call(
|
||||
handle_container_exec_with_params(
|
||||
params,
|
||||
sess,
|
||||
turn_state,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id,
|
||||
call_id,
|
||||
)
|
||||
@@ -2569,8 +2542,8 @@ async fn handle_function_call(
|
||||
handle_container_exec_with_params(
|
||||
exec_params,
|
||||
sess,
|
||||
turn_state,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id,
|
||||
call_id,
|
||||
)
|
||||
@@ -2647,8 +2620,8 @@ async fn handle_function_call(
|
||||
|
||||
async fn handle_custom_tool_call(
|
||||
sess: &Session,
|
||||
turn_state: &TurnState,
|
||||
turn_context: &TurnContext,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
sub_id: String,
|
||||
name: String,
|
||||
input: String,
|
||||
@@ -2668,8 +2641,8 @@ async fn handle_custom_tool_call(
|
||||
let resp = handle_container_exec_with_params(
|
||||
exec_params,
|
||||
sess,
|
||||
turn_state,
|
||||
turn_context,
|
||||
turn_diff_tracker,
|
||||
sub_id,
|
||||
call_id,
|
||||
)
|
||||
@@ -2760,8 +2733,8 @@ fn maybe_translate_shell_command(
|
||||
async fn handle_container_exec_with_params(
|
||||
params: ExecParams,
|
||||
sess: &Session,
|
||||
turn_state: &TurnState,
|
||||
turn_context: &TurnContext,
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
sub_id: String,
|
||||
call_id: String,
|
||||
) -> ResponseInputItem {
|
||||
@@ -2935,7 +2908,7 @@ async fn handle_container_exec_with_params(
|
||||
let params = maybe_translate_shell_command(params, sess, turn_context);
|
||||
let output_result = sess
|
||||
.run_exec_with_events(
|
||||
turn_diff_tracker,
|
||||
turn_state,
|
||||
exec_command_context.clone(),
|
||||
ExecInvokeArgs {
|
||||
params: params.clone(),
|
||||
@@ -2972,7 +2945,7 @@ async fn handle_container_exec_with_params(
|
||||
}
|
||||
Err(CodexErr::Sandbox(error)) => {
|
||||
handle_sandbox_error(
|
||||
turn_diff_tracker,
|
||||
turn_state,
|
||||
params,
|
||||
exec_command_context,
|
||||
error,
|
||||
@@ -2993,7 +2966,7 @@ async fn handle_container_exec_with_params(
|
||||
}
|
||||
|
||||
async fn handle_sandbox_error(
|
||||
turn_diff_tracker: &mut TurnDiffTracker,
|
||||
turn_state: &TurnState,
|
||||
params: ExecParams,
|
||||
exec_command_context: ExecCommandContext,
|
||||
error: SandboxErr,
|
||||
@@ -3070,7 +3043,7 @@ async fn handle_sandbox_error(
|
||||
// examined and the sandbox has been set to `None`.
|
||||
let retry_output_result = sess
|
||||
.run_exec_with_events(
|
||||
turn_diff_tracker,
|
||||
turn_state,
|
||||
exec_command_context.clone(),
|
||||
ExecInvokeArgs {
|
||||
params,
|
||||
@@ -3795,13 +3768,16 @@ mod tests {
|
||||
async fn rejects_escalated_permissions_when_policy_not_on_request() {
|
||||
use crate::exec::ExecParams;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::InputItem;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use crate::state::TurnState;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
let (session, mut turn_context) = make_session_and_context();
|
||||
let (session, turn_context) = make_session_and_context();
|
||||
let mut turn_context = Arc::new(turn_context);
|
||||
// Ensure policy is NOT OnRequest so the early rejection path triggers
|
||||
turn_context.approval_policy = AskForApproval::OnFailure;
|
||||
Arc::get_mut(&mut turn_context).unwrap().approval_policy = AskForApproval::OnFailure;
|
||||
|
||||
let params = ExecParams {
|
||||
command: if cfg!(windows) {
|
||||
@@ -3829,20 +3805,27 @@ mod tests {
|
||||
..params.clone()
|
||||
};
|
||||
|
||||
let mut turn_diff_tracker = TurnDiffTracker::new();
|
||||
|
||||
let sub_id = "test-sub".to_string();
|
||||
let call_id = "test-call".to_string();
|
||||
|
||||
let resp = handle_container_exec_with_params(
|
||||
params,
|
||||
&session,
|
||||
&turn_context,
|
||||
&mut turn_diff_tracker,
|
||||
sub_id,
|
||||
call_id,
|
||||
)
|
||||
.await;
|
||||
let resp = {
|
||||
let turn_state = TurnState::new(
|
||||
sub_id.clone(),
|
||||
Arc::clone(&turn_context),
|
||||
Vec::<InputItem>::new(),
|
||||
None,
|
||||
);
|
||||
|
||||
handle_container_exec_with_params(
|
||||
params,
|
||||
&session,
|
||||
&turn_state,
|
||||
turn_context.as_ref(),
|
||||
sub_id,
|
||||
call_id,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
let ResponseInputItem::FunctionCallOutput { output, .. } = resp else {
|
||||
panic!("expected FunctionCallOutput");
|
||||
@@ -3857,13 +3840,20 @@ mod tests {
|
||||
|
||||
// Now retry the same command WITHOUT escalated permissions; should succeed.
|
||||
// Force DangerFullAccess to avoid platform sandbox dependencies in tests.
|
||||
turn_context.sandbox_policy = SandboxPolicy::DangerFullAccess;
|
||||
Arc::get_mut(&mut turn_context).unwrap().sandbox_policy = SandboxPolicy::DangerFullAccess;
|
||||
|
||||
let turn_state = TurnState::new(
|
||||
"test-sub".to_string(),
|
||||
Arc::clone(&turn_context),
|
||||
Vec::<InputItem>::new(),
|
||||
None,
|
||||
);
|
||||
|
||||
let resp2 = handle_container_exec_with_params(
|
||||
params2,
|
||||
&session,
|
||||
&turn_context,
|
||||
&mut turn_diff_tracker,
|
||||
&turn_state,
|
||||
turn_context.as_ref(),
|
||||
"test-sub".to_string(),
|
||||
"test-call-2".to_string(),
|
||||
)
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_utils_readiness::ReadinessFlag;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -10,8 +12,10 @@ use crate::client::ModelClient;
|
||||
use crate::config_types::ShellEnvironmentPolicy;
|
||||
use crate::openai_tools::ToolsConfig;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::FileChange;
|
||||
use crate::protocol::InputItem;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
@@ -40,18 +44,37 @@ impl TurnContext {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TurnMailbox {
|
||||
latest_readiness: Option<Arc<ReadinessFlag>>,
|
||||
struct TurnRuntime {
|
||||
initial_input: Option<ResponseInputItem>,
|
||||
current_readiness: Option<Arc<ReadinessFlag>>,
|
||||
pending: VecDeque<ResponseInputItem>,
|
||||
review_history: Vec<ResponseItem>,
|
||||
last_agent_message: Option<String>,
|
||||
auto_compact_recently_attempted: bool,
|
||||
diff_tracker: TurnDiffTracker,
|
||||
}
|
||||
|
||||
impl TurnRuntime {
|
||||
fn new(
|
||||
initial_input: Option<ResponseInputItem>,
|
||||
readiness: Option<Arc<ReadinessFlag>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
initial_input,
|
||||
current_readiness: readiness,
|
||||
pending: VecDeque::new(),
|
||||
review_history: Vec::new(),
|
||||
last_agent_message: None,
|
||||
auto_compact_recently_attempted: false,
|
||||
diff_tracker: TurnDiffTracker::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct TurnState {
|
||||
sub_id: String,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_input: Option<ResponseInputItem>,
|
||||
initial_readiness: Option<Arc<ReadinessFlag>>,
|
||||
mailbox: Mutex<TurnMailbox>,
|
||||
runtime: Mutex<TurnRuntime>,
|
||||
}
|
||||
|
||||
impl TurnState {
|
||||
@@ -66,13 +89,11 @@ impl TurnState {
|
||||
} else {
|
||||
Some(initial_input.into())
|
||||
};
|
||||
|
||||
let runtime = TurnRuntime::new(initial_input, readiness);
|
||||
Self {
|
||||
sub_id,
|
||||
turn_context,
|
||||
initial_input,
|
||||
initial_readiness: readiness,
|
||||
mailbox: Mutex::new(TurnMailbox::default()),
|
||||
runtime: Mutex::new(runtime),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,12 +105,20 @@ impl TurnState {
|
||||
Arc::clone(&self.turn_context)
|
||||
}
|
||||
|
||||
pub(crate) fn initial_input(&self) -> Option<ResponseInputItem> {
|
||||
self.initial_input.clone()
|
||||
pub(crate) async fn take_initial_input(&self) -> Option<ResponseInputItem> {
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
runtime.initial_input.take()
|
||||
}
|
||||
|
||||
pub(crate) fn initial_readiness(&self) -> Option<Arc<ReadinessFlag>> {
|
||||
self.initial_readiness.clone()
|
||||
pub(crate) async fn drain_mailbox(&self) -> (Vec<ResponseItem>, Option<Arc<ReadinessFlag>>) {
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
let items = runtime
|
||||
.pending
|
||||
.drain(..)
|
||||
.map(ResponseItem::from)
|
||||
.collect::<Vec<_>>();
|
||||
let readiness = runtime.current_readiness.clone();
|
||||
(items, readiness)
|
||||
}
|
||||
|
||||
pub(crate) async fn enqueue_user_input(
|
||||
@@ -97,24 +126,73 @@ impl TurnState {
|
||||
items: Vec<InputItem>,
|
||||
readiness: Option<Arc<ReadinessFlag>>,
|
||||
) {
|
||||
let mut mailbox = self.mailbox.lock().await;
|
||||
if let Some(flag) = readiness {
|
||||
mailbox.latest_readiness = Some(flag);
|
||||
if readiness.is_some() {
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
runtime.current_readiness = readiness;
|
||||
if items.is_empty() {
|
||||
return;
|
||||
}
|
||||
let response: ResponseInputItem = items.into();
|
||||
runtime.pending.push_back(response);
|
||||
return;
|
||||
}
|
||||
|
||||
if items.is_empty() {
|
||||
return;
|
||||
}
|
||||
let input: ResponseInputItem = items.into();
|
||||
mailbox.pending.push_back(input);
|
||||
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
let response: ResponseInputItem = items.into();
|
||||
runtime.pending.push_back(response);
|
||||
}
|
||||
|
||||
pub(crate) async fn drain_mailbox(
|
||||
&self,
|
||||
current: Option<Arc<ReadinessFlag>>,
|
||||
) -> (Vec<ResponseItem>, Option<Arc<ReadinessFlag>>) {
|
||||
let mut mailbox = self.mailbox.lock().await;
|
||||
let readiness = mailbox.latest_readiness.take().or(current);
|
||||
let items = mailbox.pending.drain(..).map(ResponseItem::from).collect();
|
||||
(items, readiness)
|
||||
pub(crate) async fn set_review_history(&self, history: Vec<ResponseItem>) {
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
runtime.review_history = history;
|
||||
}
|
||||
|
||||
pub(crate) async fn extend_review_history(&self, items: &[ResponseItem]) {
|
||||
if items.is_empty() {
|
||||
return;
|
||||
}
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
runtime.review_history.extend(items.iter().cloned());
|
||||
}
|
||||
|
||||
pub(crate) async fn review_history(&self) -> Vec<ResponseItem> {
|
||||
let runtime = self.runtime.lock().await;
|
||||
runtime.review_history.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_auto_compact_attempted(&self) -> bool {
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
let already_attempted = runtime.auto_compact_recently_attempted;
|
||||
runtime.auto_compact_recently_attempted = true;
|
||||
already_attempted
|
||||
}
|
||||
|
||||
pub(crate) async fn reset_auto_compact_attempted(&self) {
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
runtime.auto_compact_recently_attempted = false;
|
||||
}
|
||||
|
||||
pub(crate) async fn set_last_agent_message(&self, message: Option<String>) {
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
runtime.last_agent_message = message;
|
||||
}
|
||||
|
||||
pub(crate) async fn last_agent_message(&self) -> Option<String> {
|
||||
let runtime = self.runtime.lock().await;
|
||||
runtime.last_agent_message.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn on_patch_begin(&self, changes: &HashMap<PathBuf, FileChange>) {
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
runtime.diff_tracker.on_patch_begin(changes);
|
||||
}
|
||||
|
||||
pub(crate) async fn take_unified_diff(&self) -> Result<Option<String>> {
|
||||
let mut runtime = self.runtime.lock().await;
|
||||
runtime.diff_tracker.get_unified_diff()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -278,6 +278,9 @@ impl App {
|
||||
AppEvent::ConversationHistory(ev) => {
|
||||
self.on_conversation_history_for_backtrack(tui, ev).await?;
|
||||
}
|
||||
AppEvent::GhostSnapshotResult(event) => {
|
||||
self.chat_widget.handle_ghost_snapshot_event(event);
|
||||
}
|
||||
AppEvent::ExitRequest => {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::path::PathBuf;
|
||||
use codex_core::protocol::ConversationPathResponseEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_file_search::FileMatch;
|
||||
use codex_git_tooling::GhostCommit;
|
||||
|
||||
use crate::history_cell::HistoryCell;
|
||||
|
||||
@@ -68,6 +69,9 @@ pub(crate) enum AppEvent {
|
||||
/// Forwarded conversation history snapshot from the current conversation.
|
||||
ConversationHistory(ConversationPathResponseEvent),
|
||||
|
||||
/// Result of a ghost snapshot capture attempt.
|
||||
GhostSnapshotResult(GhostSnapshotEvent),
|
||||
|
||||
/// Open the branch picker option from the review popup.
|
||||
OpenReviewBranchPicker(PathBuf),
|
||||
|
||||
@@ -77,3 +81,12 @@ pub(crate) enum AppEvent {
|
||||
/// Open the custom prompt option from the review popup.
|
||||
OpenReviewCustomPrompt,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum GhostSnapshotEvent {
|
||||
Success(GhostCommit),
|
||||
Disabled {
|
||||
message: String,
|
||||
hint: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -58,6 +58,7 @@ use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::GhostSnapshotEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use crate::bottom_pane::BottomPane;
|
||||
use crate::bottom_pane::BottomPaneParams;
|
||||
@@ -109,7 +110,10 @@ use codex_git_tooling::GhostCommit;
|
||||
use codex_git_tooling::GitToolingError;
|
||||
use codex_git_tooling::create_ghost_commit;
|
||||
use codex_git_tooling::restore_ghost_commit;
|
||||
use codex_utils_readiness::Readiness;
|
||||
use codex_utils_readiness::ReadinessFlag;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tracing::warn;
|
||||
|
||||
const MAX_TRACKED_GHOST_COMMITS: usize = 20;
|
||||
|
||||
@@ -814,7 +818,7 @@ impl ChatWidget {
|
||||
pending_notification: None,
|
||||
is_review_mode: false,
|
||||
ghost_snapshots: Vec::new(),
|
||||
ghost_snapshots_disabled: true,
|
||||
ghost_snapshots_disabled: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -876,7 +880,7 @@ impl ChatWidget {
|
||||
pending_notification: None,
|
||||
is_review_mode: false,
|
||||
ghost_snapshots: Vec::new(),
|
||||
ghost_snapshots_disabled: true,
|
||||
ghost_snapshots_disabled: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1128,8 +1132,59 @@ impl ChatWidget {
|
||||
|
||||
let readiness_flag = Arc::new(ReadinessFlag::new());
|
||||
agent::send_turn_readiness(&self.turn_readiness, Arc::clone(&readiness_flag));
|
||||
|
||||
self.capture_ghost_snapshot();
|
||||
let readiness_to_mark = Arc::clone(&readiness_flag);
|
||||
let capture_snapshot = !self.ghost_snapshots_disabled;
|
||||
let repo_path = self.config.cwd.clone();
|
||||
let app_event_tx = self.app_event_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Ok(token) = readiness_to_mark.subscribe().await {
|
||||
let _ = readiness_to_mark.mark_ready(token).await;
|
||||
}
|
||||
if capture_snapshot {
|
||||
let event = match spawn_blocking(move || {
|
||||
let options = CreateGhostCommitOptions::new(repo_path.as_path());
|
||||
create_ghost_commit(&options)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(commit)) => {
|
||||
AppEvent::GhostSnapshotResult(GhostSnapshotEvent::Success(commit))
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
warn!("failed to create ghost snapshot: {err}");
|
||||
let (message, hint) = match &err {
|
||||
GitToolingError::NotAGitRepository { .. } => (
|
||||
"Snapshots disabled: current directory is not a Git repository."
|
||||
.to_string(),
|
||||
None,
|
||||
),
|
||||
_ => (
|
||||
format!("Snapshots disabled after error: {err}"),
|
||||
Some(
|
||||
"Restart Codex after resolving the issue to re-enable snapshots."
|
||||
.to_string(),
|
||||
),
|
||||
),
|
||||
};
|
||||
AppEvent::GhostSnapshotResult(GhostSnapshotEvent::Disabled {
|
||||
message,
|
||||
hint,
|
||||
})
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("failed to join ghost snapshot task: {err}");
|
||||
AppEvent::GhostSnapshotResult(GhostSnapshotEvent::Disabled {
|
||||
message: format!("Snapshots disabled after internal error: {err}"),
|
||||
hint: Some(
|
||||
"Restart Codex after resolving the issue to re-enable snapshots."
|
||||
.to_string(),
|
||||
),
|
||||
})
|
||||
}
|
||||
};
|
||||
app_event_tx.send(event);
|
||||
}
|
||||
});
|
||||
|
||||
let mut items: Vec<InputItem> = Vec::new();
|
||||
|
||||
@@ -1162,37 +1217,17 @@ impl ChatWidget {
|
||||
}
|
||||
}
|
||||
|
||||
fn capture_ghost_snapshot(&mut self) {
|
||||
if self.ghost_snapshots_disabled {
|
||||
return;
|
||||
}
|
||||
|
||||
let options = CreateGhostCommitOptions::new(&self.config.cwd);
|
||||
match create_ghost_commit(&options) {
|
||||
Ok(commit) => {
|
||||
pub(crate) fn handle_ghost_snapshot_event(&mut self, event: GhostSnapshotEvent) {
|
||||
match event {
|
||||
GhostSnapshotEvent::Success(commit) => {
|
||||
self.ghost_snapshots.push(commit);
|
||||
if self.ghost_snapshots.len() > MAX_TRACKED_GHOST_COMMITS {
|
||||
self.ghost_snapshots.remove(0);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
GhostSnapshotEvent::Disabled { message, hint } => {
|
||||
self.ghost_snapshots_disabled = true;
|
||||
let (message, hint) = match &err {
|
||||
GitToolingError::NotAGitRepository { .. } => (
|
||||
"Snapshots disabled: current directory is not a Git repository."
|
||||
.to_string(),
|
||||
None,
|
||||
),
|
||||
_ => (
|
||||
format!("Snapshots disabled after error: {err}"),
|
||||
Some(
|
||||
"Restart Codex after resolving the issue to re-enable snapshots."
|
||||
.to_string(),
|
||||
),
|
||||
),
|
||||
};
|
||||
self.add_info_message(message, hint);
|
||||
tracing::warn!("failed to create ghost snapshot: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,5 +97,6 @@ pub fn built_in_slash_commands() -> Vec<(&'static str, SlashCommand)> {
|
||||
}
|
||||
|
||||
fn beta_features_enabled() -> bool {
|
||||
std::env::var_os("BETA_FEATURE").is_some()
|
||||
true
|
||||
// std::env::var_os("BETA_FEATURE").is_some()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user