user message metadata

This commit is contained in:
Roy Han
2026-03-24 11:12:37 -07:00
parent 7141347e82
commit 07c0d34fa4
38 changed files with 659 additions and 118 deletions

View File

@@ -14,8 +14,11 @@ use crate::agent::AgentStatus;
use crate::agent::agent_status_from_event;
use crate::analytics_client::AnalyticsEventsClient;
use crate::analytics_client::AppInvocation;
use crate::analytics_client::CodexTurnEvent;
use crate::analytics_client::InputMessageMetadata;
use crate::analytics_client::InputMessageRole;
use crate::analytics_client::InvocationType;
use crate::analytics_client::TurnMetadata;
use crate::analytics_client::UserMessageType;
use crate::analytics_client::build_track_events_context;
use crate::apps::render_apps_section;
use crate::auth_env_telemetry::collect_auth_env_telemetry;
@@ -107,6 +110,7 @@ use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::TurnContextNetworkItem;
use codex_protocol::protocol::UserMessageType as SubmittedUserMessageType;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::request_permissions::PermissionGrantScope;
use codex_protocol::request_permissions::RequestPermissionProfile;
@@ -236,6 +240,9 @@ pub(crate) struct PreviousTurnSettings {
use crate::exec_policy::ExecPolicyUpdateError;
use crate::feedback_tags;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherEvent;
use crate::git_info::get_git_repo_root;
use crate::guardian::GuardianReviewSessionManager;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::inspect_pending_input;
@@ -320,8 +327,6 @@ use crate::skills::injection::ToolMentionKind;
use crate::skills::injection::app_id_from_path;
use crate::skills::injection::tool_kind_for_path;
use crate::skills::resolve_skill_dependencies_for_turn;
use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
@@ -350,7 +355,6 @@ use crate::unified_exec::UnifiedExecProcessManager;
use crate::util::backoff;
use crate::windows_sandbox::WindowsSandboxLevelExt;
use codex_async_utils::OrCancelExt;
use codex_git_utils::get_git_repo_root;
use codex_otel::SessionTelemetry;
use codex_otel::TelemetryAuthMode;
use codex_otel::metrics::names::THREAD_STARTED_METRIC;
@@ -404,7 +408,7 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) skills_manager: Arc<SkillsManager>,
pub(crate) plugins_manager: Arc<PluginsManager>,
pub(crate) mcp_manager: Arc<McpManager>,
pub(crate) skills_watcher: Arc<SkillsWatcher>,
pub(crate) file_watcher: Arc<FileWatcher>,
pub(crate) conversation_history: InitialHistory,
pub(crate) session_source: SessionSource,
pub(crate) agent_control: AgentControl,
@@ -457,7 +461,7 @@ impl Codex {
skills_manager,
plugins_manager,
mcp_manager,
skills_watcher,
file_watcher,
conversation_history,
session_source,
agent_control,
@@ -645,7 +649,7 @@ impl Codex {
skills_manager,
plugins_manager,
mcp_manager.clone(),
skills_watcher,
file_watcher,
agent_control,
)
.await
@@ -848,6 +852,7 @@ pub(crate) struct TurnContext {
pub(crate) user_instructions: Option<String>,
pub(crate) collaboration_mode: CollaborationMode,
pub(crate) personality: Option<Personality>,
pub(crate) user_message_type: Option<SubmittedUserMessageType>,
pub(crate) approval_policy: Constrained<AskForApproval>,
pub(crate) sandbox_policy: Constrained<SandboxPolicy>,
pub(crate) file_system_sandbox_policy: FileSystemSandboxPolicy,
@@ -1185,6 +1190,7 @@ pub(crate) struct SessionSettingsUpdate {
pub(crate) final_output_json_schema: Option<Option<Value>>,
pub(crate) personality: Option<Personality>,
pub(crate) app_server_client_name: Option<String>,
pub(crate) user_message_type: Option<SubmittedUserMessageType>,
}
impl Session {
@@ -1298,13 +1304,13 @@ impl Session {
self.out_of_band_elicitation_paused.send_replace(paused);
}
fn start_skills_watcher_listener(self: &Arc<Self>) {
let mut rx = self.services.skills_watcher.subscribe();
fn start_file_watcher_listener(self: &Arc<Self>) {
let mut rx = self.services.file_watcher.subscribe();
let weak_sess = Arc::downgrade(self);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(SkillsWatcherEvent::SkillsChanged { .. }) => {
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
let Some(sess) = weak_sess.upgrade() else {
break;
};
@@ -1403,6 +1409,7 @@ impl Session {
user_instructions: session_configuration.user_instructions.clone(),
collaboration_mode: session_configuration.collaboration_mode.clone(),
personality: session_configuration.personality,
user_message_type: None,
approval_policy: session_configuration.approval_policy.clone(),
sandbox_policy: session_configuration.sandbox_policy.clone(),
file_system_sandbox_policy: session_configuration.file_system_sandbox_policy.clone(),
@@ -1440,7 +1447,7 @@ impl Session {
skills_manager: Arc<SkillsManager>,
plugins_manager: Arc<PluginsManager>,
mcp_manager: Arc<McpManager>,
skills_watcher: Arc<SkillsWatcher>,
file_watcher: Arc<FileWatcher>,
agent_control: AgentControl,
) -> anyhow::Result<Arc<Self>> {
debug!(
@@ -1859,7 +1866,7 @@ impl Session {
skills_manager,
plugins_manager: Arc::clone(&plugins_manager),
mcp_manager: Arc::clone(&mcp_manager),
skills_watcher,
file_watcher,
agent_control,
network_proxy,
network_approval: Arc::clone(&network_approval),
@@ -1938,7 +1945,7 @@ impl Session {
}
// Start the watcher after SessionConfigured so it cannot emit earlier events.
sess.start_skills_watcher_listener();
sess.start_file_watcher_listener();
// Construct sandbox_state before MCP startup so it can be sent to each
// MCP server immediately after it becomes ready (avoiding blocking).
let sandbox_state = SandboxState {
@@ -2394,6 +2401,7 @@ impl Session {
sub_id,
session_configuration,
updates.final_output_json_schema,
updates.user_message_type,
sandbox_policy_changed,
)
.await)
@@ -2404,6 +2412,7 @@ impl Session {
sub_id: String,
session_configuration: SessionConfiguration,
final_output_json_schema: Option<Option<Value>>,
user_message_type: Option<SubmittedUserMessageType>,
sandbox_policy_changed: bool,
) -> Arc<TurnContext> {
let per_turn_config = Self::build_per_turn_config(&session_configuration);
@@ -2471,6 +2480,7 @@ impl Session {
if let Some(final_schema) = final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
}
turn_context.user_message_type = user_message_type;
let turn_context = Arc::new(turn_context);
turn_context.turn_metadata_state.spawn_git_enrichment_task();
turn_context
@@ -2575,6 +2585,7 @@ impl Session {
sub_id,
session_configuration,
/*final_output_json_schema*/ None,
/*user_message_type*/ None,
/*sandbox_policy_changed*/ false,
)
.await
@@ -2739,6 +2750,16 @@ impl Session {
))
}
async fn active_turn_tracking(&self) -> Option<crate::analytics_client::TrackEventsContext> {
let active = self.active_turn.lock().await;
let (_, task) = active.as_ref()?.tasks.first()?;
Some(build_track_events_context(
task.turn_context.model_info.slug.clone(),
self.conversation_id.to_string(),
task.turn_context.sub_id.clone(),
))
}
pub(crate) async fn record_execpolicy_amendment_message(
&self,
sub_id: &str,
@@ -3886,6 +3907,7 @@ impl Session {
let Some((active_turn_id, _)) = active_turn.tasks.first() else {
return Err(SteerInputError::NoActiveTurn(input));
};
let active_turn_id = active_turn_id.clone();
if let Some(expected_turn_id) = expected_turn_id
&& expected_turn_id != active_turn_id
@@ -3913,7 +3935,20 @@ impl Session {
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
Ok(active_turn_id.clone())
drop(turn_state);
drop(active);
if let Some(tracking) = self.active_turn_tracking().await {
self.services
.analytics_events_client
.track_input_message_metadata(
tracking,
InputMessageMetadata {
message_role: InputMessageRole::User,
user_message_type: UserMessageType::PromptSteering,
},
);
}
Ok(active_turn_id)
}
/// Returns the input if there was no task running to inject into.
@@ -3957,17 +3992,6 @@ impl Session {
}
}
pub(crate) async fn pending_input_snapshot(&self) -> Vec<ResponseInputItem> {
let active = self.active_turn.lock().await;
match active.as_ref() {
Some(at) => {
let ts = at.turn_state.lock().await;
ts.pending_input_snapshot()
}
None => Vec::with_capacity(0),
}
}
/// Queue response items to be injected into the next active turn created for this session.
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
if items.is_empty() {
@@ -3982,12 +4006,6 @@ impl Session {
std::mem::take(&mut *self.idle_pending_input.lock().await)
}
pub(crate) async fn queued_response_items_for_next_turn_snapshot(
&self,
) -> Vec<ResponseInputItem> {
self.idle_pending_input.lock().await.clone()
}
pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool {
!self.idle_pending_input.lock().await.is_empty()
}
@@ -4320,7 +4338,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
.await;
false
}
Op::UserInput { .. } | Op::UserTurn { .. } => {
Op::UserInput { .. } | Op::UserInputWithMetadata { .. } | Op::UserTurn { .. } => {
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op).await;
false
}
@@ -4603,6 +4621,7 @@ mod handlers {
final_output_json_schema: Some(final_output_json_schema),
personality,
app_server_client_name: None,
user_message_type: None,
},
)
}
@@ -4613,6 +4632,19 @@ mod handlers {
items,
SessionSettingsUpdate {
final_output_json_schema: Some(final_output_json_schema),
user_message_type: None,
..Default::default()
},
),
Op::UserInputWithMetadata {
items,
final_output_json_schema,
user_message_type,
} => (
items,
SessionSettingsUpdate {
final_output_json_schema: Some(final_output_json_schema),
user_message_type,
..Default::default()
},
),
@@ -4651,27 +4683,27 @@ mod handlers {
}
}
/// Records an inter-agent assistant envelope and, when requested, wakes the recipient by
/// starting a regular turn if the session is currently idle.
pub async fn inter_agent_communication(
sess: &Arc<Session>,
sub_id: String,
communication: InterAgentCommunication,
) {
let pending_item = communication.to_response_input_item();
if let Ok(()) = sess.inject_response_items(vec![pending_item.clone()]).await {
if sess
.inject_response_items(vec![pending_item.clone()])
.await
.is_ok()
{
return;
}
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
sess.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref())
.await;
sess.queue_response_items_for_next_turn(vec![pending_item])
.await;
if communication.trigger_turn {
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
sess.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref())
.await;
sess.spawn_task(turn_context, Vec::new(), crate::tasks::RegularTask::new())
.await;
}
sess.spawn_task(turn_context, Vec::new(), crate::tasks::RegularTask::new())
.await;
}
pub async fn run_user_shell_command(sess: &Arc<Session>, sub_id: String, command: String) {
@@ -5678,6 +5710,41 @@ pub(crate) async fn run_turn(
.await;
user_prompt_submit_outcome.additional_contexts
};
if !input.is_empty() {
let user_message_type = turn_context
.user_message_type
.unwrap_or(SubmittedUserMessageType::Prompt);
sess.services
.analytics_events_client
.track_input_message_metadata(
tracking.clone(),
InputMessageMetadata {
message_role: InputMessageRole::User,
user_message_type,
},
);
sess.services.analytics_events_client.track_turn_metadata(
tracking.clone(),
TurnMetadata {
model_provider: turn_context.config.model_provider_id.clone(),
sandbox_policy: turn_context.sandbox_policy.get().clone(),
reasoning_effort: turn_context.reasoning_effort,
reasoning_summary: Some(turn_context.reasoning_summary),
service_tier: turn_context.config.service_tier,
approval_policy: turn_context.approval_policy.value(),
approvals_reviewer: turn_context.config.approvals_reviewer,
sandbox_network_access: turn_context.network_sandbox_policy.is_enabled(),
collaboration_mode: turn_context.collaboration_mode.mode,
personality: turn_context.personality,
num_input_images: input
.iter()
.filter(|item| {
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
})
.count(),
},
);
}
sess.services
.analytics_events_client
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
@@ -5991,30 +6058,6 @@ pub(crate) async fn run_turn(
}
}
if !input.is_empty() {
sess.services.analytics_events_client.track_turn_event(
tracking,
CodexTurnEvent {
model_provider: turn_context.config.model_provider_id.clone(),
sandbox_policy: turn_context.sandbox_policy.get().clone(),
reasoning_effort: turn_context.reasoning_effort,
reasoning_summary: Some(turn_context.reasoning_summary),
service_tier: turn_context.config.service_tier,
approval_policy: turn_context.approval_policy.value(),
approvals_reviewer: turn_context.config.approvals_reviewer,
sandbox_network_access: turn_context.network_sandbox_policy.is_enabled(),
collaboration_mode: turn_context.collaboration_mode.mode,
personality: turn_context.personality,
num_input_images: input
.iter()
.filter(|item| {
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
})
.count(),
},
);
}
last_agent_message
}