logic cleanup

This commit is contained in:
Roy Han
2026-03-24 17:21:25 -07:00
parent 7f10658d93
commit ade9c6fc5b
4 changed files with 72 additions and 423 deletions

View File

@@ -15,7 +15,6 @@ use crate::agent::agent_status_from_event;
use crate::analytics_client::AnalyticsEventsClient;
use crate::analytics_client::AppInvocation;
use crate::analytics_client::CodexTurnEvent;
use crate::analytics_client::CodexTurnSteerEvent;
use crate::analytics_client::InvocationType;
use crate::analytics_client::build_track_events_context;
use crate::apps::render_apps_section;
@@ -238,9 +237,6 @@ pub(crate) struct PreviousTurnSettings {
use crate::exec_policy::ExecPolicyUpdateError;
use crate::feedback_tags;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherEvent;
use crate::git_info::get_git_repo_root;
use crate::guardian::GuardianReviewSessionManager;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::inspect_pending_input;
@@ -325,6 +321,8 @@ use crate::skills::injection::ToolMentionKind;
use crate::skills::injection::app_id_from_path;
use crate::skills::injection::tool_kind_for_path;
use crate::skills::resolve_skill_dependencies_for_turn;
use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
@@ -353,6 +351,7 @@ use crate::unified_exec::UnifiedExecProcessManager;
use crate::util::backoff;
use crate::windows_sandbox::WindowsSandboxLevelExt;
use codex_async_utils::OrCancelExt;
use codex_git_utils::get_git_repo_root;
use codex_otel::SessionTelemetry;
use codex_otel::TelemetryAuthMode;
use codex_otel::metrics::names::THREAD_STARTED_METRIC;
@@ -406,7 +405,7 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) skills_manager: Arc<SkillsManager>,
pub(crate) plugins_manager: Arc<PluginsManager>,
pub(crate) mcp_manager: Arc<McpManager>,
pub(crate) file_watcher: Arc<FileWatcher>,
pub(crate) skills_watcher: Arc<SkillsWatcher>,
pub(crate) conversation_history: InitialHistory,
pub(crate) session_source: SessionSource,
pub(crate) agent_control: AgentControl,
@@ -459,7 +458,7 @@ impl Codex {
skills_manager,
plugins_manager,
mcp_manager,
file_watcher,
skills_watcher,
conversation_history,
session_source,
agent_control,
@@ -647,7 +646,7 @@ impl Codex {
skills_manager,
plugins_manager,
mcp_manager.clone(),
file_watcher,
skills_watcher,
agent_control,
)
.await
@@ -1303,13 +1302,13 @@ impl Session {
self.out_of_band_elicitation_paused.send_replace(paused);
}
fn start_file_watcher_listener(self: &Arc<Self>) {
let mut rx = self.services.file_watcher.subscribe();
fn start_skills_watcher_listener(self: &Arc<Self>) {
let mut rx = self.services.skills_watcher.subscribe();
let weak_sess = Arc::downgrade(self);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
Ok(SkillsWatcherEvent::SkillsChanged { .. }) => {
let Some(sess) = weak_sess.upgrade() else {
break;
};
@@ -1446,7 +1445,7 @@ impl Session {
skills_manager: Arc<SkillsManager>,
plugins_manager: Arc<PluginsManager>,
mcp_manager: Arc<McpManager>,
file_watcher: Arc<FileWatcher>,
skills_watcher: Arc<SkillsWatcher>,
agent_control: AgentControl,
) -> anyhow::Result<Arc<Self>> {
debug!(
@@ -1865,7 +1864,7 @@ impl Session {
skills_manager,
plugins_manager: Arc::clone(&plugins_manager),
mcp_manager: Arc::clone(&mcp_manager),
file_watcher,
skills_watcher,
agent_control,
network_proxy,
network_approval: Arc::clone(&network_approval),
@@ -1944,7 +1943,7 @@ impl Session {
}
// Start the watcher after SessionConfigured so it cannot emit earlier events.
sess.start_file_watcher_listener();
sess.start_skills_watcher_listener();
// Construct sandbox_state before MCP startup so it can be sent to each
// MCP server immediately after it becomes ready (avoiding blocking).
let sandbox_state = SandboxState {
@@ -2749,16 +2748,6 @@ impl Session {
))
}
async fn active_turn_tracking(&self) -> Option<crate::analytics_client::TrackEventsContext> {
let active = self.active_turn.lock().await;
let (_, task) = active.as_ref()?.tasks.first()?;
Some(build_track_events_context(
task.turn_context.model_info.slug.clone(),
self.conversation_id.to_string(),
task.turn_context.sub_id.clone(),
))
}
pub(crate) async fn record_execpolicy_amendment_message(
&self,
sub_id: &str,
@@ -3906,7 +3895,6 @@ impl Session {
let Some((active_turn_id, _)) = active_turn.tasks.first() else {
return Err(SteerInputError::NoActiveTurn(input));
};
let active_turn_id = active_turn_id.clone();
if let Some(expected_turn_id) = expected_turn_id
&& expected_turn_id != active_turn_id
@@ -3934,14 +3922,7 @@ impl Session {
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
drop(turn_state);
drop(active);
if let Some(tracking) = self.active_turn_tracking().await {
self.services
.analytics_events_client
.track_turn_steer(tracking, CodexTurnSteerEvent);
}
Ok(active_turn_id)
Ok(active_turn_id.clone())
}
/// Returns the input if there was no task running to inject into.
@@ -3985,6 +3966,17 @@ impl Session {
}
}
pub(crate) async fn pending_input_snapshot(&self) -> Vec<ResponseInputItem> {
let active = self.active_turn.lock().await;
match active.as_ref() {
Some(at) => {
let ts = at.turn_state.lock().await;
ts.pending_input_snapshot()
}
None => Vec::with_capacity(0),
}
}
/// Queue response items to be injected into the next active turn created for this session.
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
if items.is_empty() {
@@ -3999,6 +3991,12 @@ impl Session {
std::mem::take(&mut *self.idle_pending_input.lock().await)
}
pub(crate) async fn queued_response_items_for_next_turn_snapshot(
&self,
) -> Vec<ResponseInputItem> {
self.idle_pending_input.lock().await.clone()
}
pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool {
!self.idle_pending_input.lock().await.is_empty()
}
@@ -4677,27 +4675,27 @@ mod handlers {
}
}
/// Records an inter-agent assistant envelope and, when requested, wakes the recipient by
/// starting a regular turn if the session is currently idle.
pub async fn inter_agent_communication(
sess: &Arc<Session>,
sub_id: String,
communication: InterAgentCommunication,
) {
let pending_item = communication.to_response_input_item();
if sess
.inject_response_items(vec![pending_item.clone()])
.await
.is_ok()
{
if let Ok(()) = sess.inject_response_items(vec![pending_item.clone()]).await {
return;
}
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
sess.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref())
.await;
sess.queue_response_items_for_next_turn(vec![pending_item])
.await;
sess.spawn_task(turn_context, Vec::new(), crate::tasks::RegularTask::new())
.await;
if communication.trigger_turn {
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
sess.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref())
.await;
sess.spawn_task(turn_context, Vec::new(), crate::tasks::RegularTask::new())
.await;
}
}
pub async fn run_user_shell_command(sess: &Arc<Session>, sub_id: String, command: String) {
@@ -6025,14 +6023,11 @@ pub(crate) async fn run_turn(
sess.services.analytics_events_client.track_turn_event(
tracking,
CodexTurnEvent {
submission_type: match submission_type {
SubmissionType::Prompt => Some(SubmissionType::Prompt),
SubmissionType::PromptQueued => Some(SubmissionType::PromptQueued),
},
submission_type: Some(submission_type),
model_provider: turn_context.config.model_provider_id.clone(),
sandbox_policy: turn_context.sandbox_policy.get().clone(),
reasoning_effort: turn_context.reasoning_effort,
reasoning_summary: turn_context.reasoning_summary,
reasoning_summary: Some(turn_context.reasoning_summary),
service_tier: turn_context.config.service_tier,
approval_policy: turn_context.approval_policy.value(),
approvals_reviewer: turn_context.config.approvals_reviewer,