Compare commits

...

3 Commits

Author SHA1 Message Date
Dylan Hurd
19c984961b simplify 2026-04-27 22:18:42 -07:00
Dylan Hurd
9d92bc3de4 prevent shutdown leak 2026-04-27 21:56:03 -07:00
Dylan Hurd
dd4d8bd5d5 fix(auto-review) Simplify trunk locking 2026-04-27 20:29:29 -07:00
2 changed files with 219 additions and 52 deletions

View File

@@ -85,6 +85,7 @@ pub(crate) struct GuardianReviewSessionManager {
/// Mutable state shared by the guardian review session manager, guarded by an
/// async lock (`self.state.lock().await` at the call sites visible in this diff).
struct GuardianReviewSessionState {
    /// The long-lived "trunk" review session, if one has been spawned.
    /// `shutdown()` takes it; `run_review` may replace a stale one.
    trunk: Option<Arc<GuardianReviewSession>>,
    /// Short-lived review sessions currently in flight; drained wholesale on shutdown.
    ephemeral_reviews: Vec<Arc<GuardianReviewSession>>,
    /// Set to `true` by `shutdown()`. Checked under the lock before installing a
    /// trunk or registering an ephemeral review so sessions spawned concurrently
    /// with shutdown are torn down instead of leaked.
    is_shutdown: bool,
}
struct GuardianReviewSession {
@@ -262,6 +263,7 @@ impl GuardianReviewSessionManager {
pub(crate) async fn shutdown(&self) {
let (review_session, ephemeral_reviews) = {
let mut state = self.state.lock().await;
state.is_shutdown = true;
(
state.trunk.take(),
std::mem::take(&mut state.ephemeral_reviews),
@@ -275,10 +277,6 @@ impl GuardianReviewSessionManager {
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "review session selection and trunk spawning must stay serialized"
)]
pub(super) async fn run_review(
&self,
params: GuardianReviewSessionParams,
@@ -295,6 +293,12 @@ impl GuardianReviewSessionManager {
.await
{
Ok(mut state) => {
if state.is_shutdown {
return (
GuardianReviewSessionOutcome::Aborted,
GuardianReviewAnalyticsResult::without_session(),
);
}
if let Some(trunk) = state.trunk.as_ref()
&& trunk.reuse_key != next_reuse_key
&& trunk.review_lock.try_acquire().is_ok()
@@ -302,53 +306,82 @@ impl GuardianReviewSessionManager {
stale_trunk_to_shutdown = state.trunk.take();
}
if state.trunk.is_none() {
let spawn_cancel_token = CancellationToken::new();
let review_session = match run_before_review_deadline_with_cancel(
deadline,
params.external_cancel.as_ref(),
&spawn_cancel_token,
Box::pin(spawn_guardian_review_session(
&params,
params.spawn_config.clone(),
next_reuse_key.clone(),
spawn_cancel_token.clone(),
/*fork_snapshot*/ None,
)),
)
.await
{
Ok(Ok(review_session)) => Arc::new(review_session),
Ok(Err(err)) => {
return (
GuardianReviewSessionOutcome::PromptBuildFailed(err),
GuardianReviewAnalyticsResult::without_session(),
);
}
Err(outcome) => {
return (outcome, GuardianReviewAnalyticsResult::without_session());
}
};
state.trunk = Some(Arc::clone(&review_session));
spawned_trunk = true;
}
state.trunk.as_ref().cloned()
}
Err(outcome) => return (outcome, GuardianReviewAnalyticsResult::without_session()),
};
if let Some(review_session) = stale_trunk_to_shutdown {
if let Some(review_session) = stale_trunk_to_shutdown.take() {
review_session.shutdown_in_background();
}
let Some(trunk) = trunk_candidate else {
return (
GuardianReviewSessionOutcome::Completed(Err(anyhow!(
"guardian review session was not available after spawn"
))),
GuardianReviewAnalyticsResult::without_session(),
);
let trunk = match trunk_candidate {
Some(trunk) => trunk,
None => {
let spawn_cancel_token = CancellationToken::new();
let cold_start = run_before_review_deadline_with_cancel(
deadline,
params.external_cancel.as_ref(),
&spawn_cancel_token,
Box::pin(async {
let spawned_review_session = Arc::new(
spawn_guardian_review_session(
&params,
params.spawn_config.clone(),
next_reuse_key.clone(),
spawn_cancel_token.clone(),
/*fork_snapshot*/ None,
)
.await?,
);
let mut state = self.state.lock().await;
if state.is_shutdown {
spawned_review_session.shutdown_in_background();
Ok(None)
} else if let Some(current_trunk) = state.trunk.as_ref().cloned() {
if current_trunk.reuse_key != next_reuse_key
&& current_trunk.review_lock.try_acquire().is_ok()
{
stale_trunk_to_shutdown =
state.trunk.replace(Arc::clone(&spawned_review_session));
Ok(Some((spawned_review_session, true)))
} else {
spawned_review_session.shutdown_in_background();
Ok(Some((current_trunk, false)))
}
} else {
state.trunk = Some(Arc::clone(&spawned_review_session));
Ok(Some((spawned_review_session, true)))
}
}),
)
.await;
let Some((trunk, installed_spawned_trunk)) = (match cold_start {
Ok(Ok(cold_start)) => cold_start,
Ok(Err(err)) => {
return (
GuardianReviewSessionOutcome::PromptBuildFailed(err),
GuardianReviewAnalyticsResult::without_session(),
);
}
Err(outcome) => {
return (outcome, GuardianReviewAnalyticsResult::without_session());
}
}) else {
return (
GuardianReviewSessionOutcome::Aborted,
GuardianReviewAnalyticsResult::without_session(),
);
};
spawned_trunk = installed_spawned_trunk;
if installed_spawned_trunk
&& let Some(review_session) = stale_trunk_to_shutdown.take()
{
review_session.shutdown_in_background();
}
trunk
}
};
if trunk.reuse_key != next_reuse_key {
@@ -468,12 +501,15 @@ impl GuardianReviewSessionManager {
}
}
async fn register_active_ephemeral(&self, review_session: Arc<GuardianReviewSession>) {
self.state
.lock()
.await
.ephemeral_reviews
.push(review_session);
/// Records an ephemeral review session as active so `shutdown()` can tear it down.
///
/// Returns `true` if the session was registered, or `false` if the manager has
/// already shut down — in that case the session is shut down in the background
/// here (closing the spawn/shutdown race) and the caller is expected to abort.
async fn register_active_ephemeral(&self, review_session: Arc<GuardianReviewSession>) -> bool {
    let mut state = self.state.lock().await;
    if state.is_shutdown {
        // Lost the race with shutdown(): never add to ephemeral_reviews (which
        // shutdown() has already drained) — clean up immediately instead.
        review_session.shutdown_in_background();
        false
    } else {
        state.ephemeral_reviews.push(review_session);
        true
    }
}
async fn take_active_ephemeral(
@@ -521,8 +557,15 @@ impl GuardianReviewSessionManager {
}
Err(outcome) => return (outcome, GuardianReviewAnalyticsResult::without_session()),
};
self.register_active_ephemeral(Arc::clone(&review_session))
.await;
if !self
.register_active_ephemeral(Arc::clone(&review_session))
.await
{
return (
GuardianReviewSessionOutcome::Aborted,
GuardianReviewAnalyticsResult::without_session(),
);
}
let mut cleanup =
EphemeralReviewCleanup::new(Arc::clone(&self.state), Arc::clone(&review_session));

View File

@@ -1600,6 +1600,130 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow:
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn guardian_parallel_cold_reviews_both_complete() -> anyhow::Result<()> {
    // Two guardian reviews started concurrently from a cold start (no trunk
    // session yet) must both complete rather than deadlock or abort — this
    // exercises the serialized trunk-spawn path in run_review.
    skip_if_no_network!(Ok(()));
    // Canned "allow" verdicts for the two model responses.
    let first_assessment = serde_json::json!({
        "risk_level": "low",
        "user_authorization": "high",
        "outcome": "allow",
        "rationale": "first parallel guardian rationale",
    })
    .to_string();
    let second_assessment = serde_json::json!({
        "risk_level": "low",
        "user_authorization": "high",
        "outcome": "allow",
        "rationale": "second parallel guardian rationale",
    })
    .to_string();
    // SSE server scripted with exactly two response streams, one per review
    // (each: created -> assistant message -> completed), with no gating.
    let (server, _) = start_streaming_sse_server(vec![
        vec![StreamingSseChunk {
            gate: None,
            body: sse(vec![
                ev_response_created("resp-guardian-parallel-1"),
                ev_assistant_message("msg-guardian-parallel-1", &first_assessment),
                ev_completed("resp-guardian-parallel-1"),
            ]),
        }],
        vec![StreamingSseChunk {
            gate: None,
            body: sse(vec![
                ev_response_created("resp-guardian-parallel-2"),
                ev_assistant_message("msg-guardian-parallel-2", &second_assessment),
                ev_completed("resp-guardian-parallel-2"),
            ]),
        }],
    ])
    .await;
    let (session, turn) = guardian_test_session_and_turn_with_base_url(server.uri()).await;
    seed_guardian_parent_history(&session, &turn).await;
    // Launch both reviews on separate tasks so they genuinely race.
    let first_session = Arc::clone(&session);
    let first_turn = Arc::clone(&turn);
    let first_review = tokio::spawn(async move {
        run_guardian_review_session_for_test(
            first_session,
            first_turn,
            GuardianApprovalRequest::Shell {
                id: "shell-guardian-parallel-1".to_string(),
                command: vec!["git".to_string(), "status".to_string()],
                cwd: test_path_buf("/repo/codex-rs/core").abs(),
                sandbox_permissions: crate::sandboxing::SandboxPermissions::UseDefault,
                additional_permissions: None,
                justification: Some("Inspect repo state before proceeding.".to_string()),
            },
            /*retry_reason*/ None,
            guardian_output_schema(),
            /*external_cancel*/ None,
        )
        .await
    });
    let second_session = Arc::clone(&session);
    let second_turn = Arc::clone(&turn);
    let second_review = tokio::spawn(async move {
        run_guardian_review_session_for_test(
            second_session,
            second_turn,
            GuardianApprovalRequest::Shell {
                id: "shell-guardian-parallel-2".to_string(),
                command: vec!["git".to_string(), "diff".to_string()],
                cwd: test_path_buf("/repo/codex-rs/core").abs(),
                sandbox_permissions: crate::sandboxing::SandboxPermissions::UseDefault,
                additional_permissions: None,
                justification: Some("Inspect pending changes before proceeding.".to_string()),
            },
            /*retry_reason*/ None,
            guardian_output_schema(),
            /*external_cancel*/ None,
        )
        .await
    });
    // 10s cap: a regression here would otherwise hang the suite (the old code
    // could leak/deadlock on the cold-start path).
    let (first_outcome, second_outcome) = tokio::time::timeout(Duration::from_secs(10), async {
        tokio::try_join!(first_review, second_review)
    })
    .await
    .expect("parallel guardian reviews should complete")?;
    let (GuardianReviewOutcome::Completed(first_assessment), first_metadata) = first_outcome else {
        panic!("expected first guardian assessment");
    };
    let (GuardianReviewOutcome::Completed(second_assessment), second_metadata) = second_outcome
    else {
        panic!("expected second guardian assessment");
    };
    assert_eq!(first_assessment.outcome, GuardianAssessmentOutcome::Allow);
    assert_eq!(second_assessment.outcome, GuardianAssessmentOutcome::Allow);
    for metadata in [&first_metadata, &second_metadata] {
        // Either review may have become the trunk, reused it, or forked an
        // ephemeral session — the race makes the exact kind nondeterministic,
        // so accept any of the three.
        assert!(matches!(
            metadata.guardian_session_kind,
            Some(
                codex_analytics::GuardianReviewSessionKind::TrunkNew
                    | codex_analytics::GuardianReviewSessionKind::TrunkReused
                    | codex_analytics::GuardianReviewSessionKind::EphemeralForked
            )
        ));
        ThreadId::from_string(
            metadata
                .guardian_thread_id
                .as_deref()
                .expect("guardian thread id"),
        )
        .expect("guardian thread id should be a valid UUID");
    }
    // Exactly one API request per review — no retries, no duplicate spawns.
    let requests = server.requests().await;
    assert_eq!(requests.len(), 2);
    server.shutdown().await;
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn guardian_review_surfaces_responses_api_errors_in_rejection_reason() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));