This commit is contained in:
jif-oai
2025-10-14 09:39:59 +01:00
parent 8245a4f53b
commit f5e055ae36
3 changed files with 112 additions and 115 deletions

View File

@@ -185,6 +185,22 @@ struct AggregatedVerifierVerdict {
verdicts: Vec<VerifierReport>,
}
#[derive(Serialize)]
struct DirectionRequestPayload<'a> {
#[serde(rename = "type")]
kind: &'static str,
prompt: &'a str,
}
#[derive(Serialize)]
struct VerificationRequestPayload<'a> {
#[serde(rename = "type")]
kind: &'static str,
claim_path: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
notes: Option<&'a str>,
}
struct SessionCleanup {
conversation_id: ConversationId,
conversation: Arc<CodexConversation>,
@@ -268,25 +284,10 @@ impl InftyOrchestrator {
let mut cleanup = Vec::new();
let solver_session = match self
.spawn_role_session(&run_id, &run_path, solver.clone())
.spawn_and_register_role(&run_id, &run_path, &solver, &mut store, &mut cleanup)
.await
{
Ok(session) => {
cleanup.push(SessionCleanup::new(&session));
if let Err(err) =
store.update_rollout_path(&session.role, session.rollout_path.clone())
{
self.cleanup_failed_spawn(cleanup, &run_path).await;
return Err(err);
}
if let Some(path) = solver.config_path.clone() {
if let Err(err) = store.set_role_config_path(&session.role, path) {
self.cleanup_failed_spawn(cleanup, &run_path).await;
return Err(err);
}
}
session
}
Ok(session) => session,
Err(err) => {
self.cleanup_failed_spawn(cleanup, &run_path).await;
return Err(err);
@@ -294,25 +295,10 @@ impl InftyOrchestrator {
};
let director_session = match self
.spawn_role_session(&run_id, &run_path, director.clone())
.spawn_and_register_role(&run_id, &run_path, &director, &mut store, &mut cleanup)
.await
{
Ok(session) => {
cleanup.push(SessionCleanup::new(&session));
if let Err(err) =
store.update_rollout_path(&session.role, session.rollout_path.clone())
{
self.cleanup_failed_spawn(cleanup, &run_path).await;
return Err(err);
}
if let Some(path) = director.config_path.clone() {
if let Err(err) = store.set_role_config_path(&session.role, path) {
self.cleanup_failed_spawn(cleanup, &run_path).await;
return Err(err);
}
}
session
}
Ok(session) => session,
Err(err) => {
self.cleanup_failed_spawn(cleanup, &run_path).await;
return Err(err);
@@ -322,25 +308,10 @@ impl InftyOrchestrator {
let mut verifier_sessions = Vec::with_capacity(verifiers.len());
for verifier in verifiers {
let session = match self
.spawn_role_session(&run_id, &run_path, verifier.clone())
.spawn_and_register_role(&run_id, &run_path, &verifier, &mut store, &mut cleanup)
.await
{
Ok(session) => {
cleanup.push(SessionCleanup::new(&session));
if let Some(path) = verifier.config_path.clone() {
if let Err(err) = store.set_role_config_path(&session.role, path) {
self.cleanup_failed_spawn(cleanup, &run_path).await;
return Err(err);
}
}
if let Err(err) =
store.update_rollout_path(&session.role, session.rollout_path.clone())
{
self.cleanup_failed_spawn(cleanup, &run_path).await;
return Err(err);
}
session
}
Ok(session) => session,
Err(err) => {
self.cleanup_failed_spawn(cleanup, &run_path).await;
return Err(err);
@@ -373,19 +344,10 @@ impl InftyOrchestrator {
let run_path = store.path().to_path_buf();
let solver_session = match self
.resume_role_session(&run_id, &run_path, &solver, &store)
.resume_and_register_role(&run_id, &run_path, &solver, &mut store, &mut cleanup)
.await
{
Ok(session) => {
cleanup.push(SessionCleanup::new(&session));
if let Err(err) =
store.update_rollout_path(&session.role, session.rollout_path.clone())
{
self.cleanup_failed_resume(cleanup).await;
return Err(err);
}
session
}
Ok(session) => session,
Err(err) => {
self.cleanup_failed_resume(cleanup).await;
return Err(err);
@@ -393,19 +355,10 @@ impl InftyOrchestrator {
};
let director_session = match self
.resume_role_session(&run_id, &run_path, &director, &store)
.resume_and_register_role(&run_id, &run_path, &director, &mut store, &mut cleanup)
.await
{
Ok(session) => {
cleanup.push(SessionCleanup::new(&session));
if let Err(err) =
store.update_rollout_path(&session.role, session.rollout_path.clone())
{
self.cleanup_failed_resume(cleanup).await;
return Err(err);
}
session
}
Ok(session) => session,
Err(err) => {
self.cleanup_failed_resume(cleanup).await;
return Err(err);
@@ -415,19 +368,10 @@ impl InftyOrchestrator {
let mut verifier_sessions = Vec::with_capacity(verifiers.len());
for verifier in verifiers.iter() {
let session = match self
.resume_role_session(&run_id, &run_path, verifier, &store)
.resume_and_register_role(&run_id, &run_path, verifier, &mut store, &mut cleanup)
.await
{
Ok(session) => {
cleanup.push(SessionCleanup::new(&session));
if let Err(err) =
store.update_rollout_path(&session.role, session.rollout_path.clone())
{
self.cleanup_failed_resume(cleanup).await;
return Err(err);
}
session
}
Ok(session) => session,
Err(err) => {
self.cleanup_failed_resume(cleanup).await;
return Err(err);
@@ -465,9 +409,14 @@ impl InftyOrchestrator {
) -> Result<RunOutcome> {
let mut solver_events = self.stream_events(sessions.solver.conversation_id)?;
if let Some(objective) = options.objective.clone() {
self.post_to_role(&sessions.run_id, &sessions.solver.role, objective, None)
.await?;
if let Some(objective) = &options.objective {
self.post_to_role(
&sessions.run_id,
&sessions.solver.role,
objective.as_str(),
None,
)
.await?;
sessions.store.touch()?;
}
@@ -477,21 +426,16 @@ impl InftyOrchestrator {
if let Some(signal) = parse_solver_signal(&agent_msg.message) {
match signal {
SolverSignal::DirectionRequest { prompt } => {
let _ = prompt;
self.handle_direction_request(
sessions,
&agent_msg.message,
options,
)
.await?;
self.handle_direction_request(sessions, &prompt, options)
.await?;
sessions.store.touch()?;
}
SolverSignal::VerificationRequest { claim_path, notes } => {
let _ = (claim_path, notes);
let pass = self
.handle_verification_request(
sessions,
&agent_msg.message,
&claim_path,
notes.as_deref(),
options,
)
.await?;
@@ -547,14 +491,19 @@ impl InftyOrchestrator {
async fn handle_direction_request(
&self,
sessions: &RunSessions,
raw_message: &str,
prompt: &str,
options: &RunExecutionOptions,
) -> Result<()> {
let request = DirectionRequestPayload {
kind: "direction_request",
prompt,
};
let request_text = serde_json::to_string_pretty(&request)?;
let handle = self
.post_to_role(
&sessions.run_id,
&sessions.director.role,
raw_message.to_string(),
request_text,
Some(director_schema()),
)
.await?;
@@ -578,7 +527,8 @@ impl InftyOrchestrator {
async fn handle_verification_request(
&self,
sessions: &RunSessions,
raw_message: &str,
claim_path: &str,
notes: Option<&str>,
options: &RunExecutionOptions,
) -> Result<bool> {
if sessions.verifiers.is_empty() {
@@ -590,14 +540,21 @@ impl InftyOrchestrator {
return Ok(true);
}
let request = VerificationRequestPayload {
kind: "verification_request",
claim_path,
notes,
};
let request_text = serde_json::to_string_pretty(&request)?;
let mut collected = Vec::with_capacity(sessions.verifiers.len());
let schema = verifier_schema();
for verifier in &sessions.verifiers {
let handle = self
.post_to_role(
&sessions.run_id,
&verifier.role,
raw_message.to_string(),
Some(verifier_schema()),
request_text.as_str(),
Some(schema.clone()),
)
.await?;
let response = self
@@ -620,14 +577,14 @@ impl InftyOrchestrator {
async fn cleanup_failed_spawn(&self, sessions: Vec<SessionCleanup>, run_path: &Path) {
self.shutdown_sessions(sessions).await;
if run_path.exists() {
if let Err(err) = fs::remove_dir_all(run_path) {
warn!(
path = %run_path.display(),
?err,
"failed to remove run directory after spawn failure"
);
}
if run_path.exists()
&& let Err(err) = fs::remove_dir_all(run_path)
{
warn!(
path = %run_path.display(),
?err,
"failed to remove run directory after spawn failure"
);
}
}
@@ -721,6 +678,44 @@ impl InftyOrchestrator {
self.hub.stream_events(conversation_id)
}
async fn spawn_and_register_role(
&self,
run_id: &str,
run_path: &Path,
role_config: &RoleConfig,
store: &mut RunStore,
cleanup: &mut Vec<SessionCleanup>,
) -> Result<RoleSession> {
let session = self
.spawn_role_session(run_id, run_path, role_config.clone())
.await?;
cleanup.push(SessionCleanup::new(&session));
store.update_rollout_path(&session.role, session.rollout_path.clone())?;
if let Some(path) = role_config.config_path.clone() {
store.set_role_config_path(&session.role, path)?;
}
Ok(session)
}
async fn resume_and_register_role(
&self,
run_id: &str,
run_path: &Path,
role_config: &RoleConfig,
store: &mut RunStore,
cleanup: &mut Vec<SessionCleanup>,
) -> Result<RoleSession> {
let session = self
.resume_role_session(run_id, run_path, role_config, store)
.await?;
cleanup.push(SessionCleanup::new(&session));
store.update_rollout_path(&session.role, session.rollout_path.clone())?;
if let Some(path) = role_config.config_path.clone() {
store.set_role_config_path(&session.role, path)?;
}
Ok(session)
}
async fn spawn_role_session(
&self,
run_id: &str,