Mirror of https://github.com/openai/codex.git, synced 2026-04-28 02:11:08 +03:00
feat: mem v2 - PR6 (consolidation) (#11374)
@@ -67,11 +67,22 @@ struct SummarizeResponse {
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::RawMemory;
    use crate::common::RawMemoryMetadata;
    use crate::provider::RetryConfig;
    use async_trait::async_trait;
    use codex_client::Request;
    use codex_client::Response;
    use codex_client::StreamResponse;
    use codex_client::TransportError;
    use http::HeaderMap;
    use http::Method;
    use http::StatusCode;
    use pretty_assertions::assert_eq;
    use serde_json::json;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::time::Duration;

    #[derive(Clone, Default)]
    struct DummyTransport;

@@ -96,6 +107,54 @@ mod tests {
        }
    }

    #[derive(Clone)]
    struct CapturingTransport {
        last_request: Arc<Mutex<Option<Request>>>,
        response_body: Arc<Vec<u8>>,
    }

    impl CapturingTransport {
        fn new(response_body: Vec<u8>) -> Self {
            Self {
                last_request: Arc::new(Mutex::new(None)),
                response_body: Arc::new(response_body),
            }
        }
    }

    #[async_trait]
    impl HttpTransport for CapturingTransport {
        async fn execute(&self, req: Request) -> Result<Response, TransportError> {
            *self.last_request.lock().expect("lock request store") = Some(req);
            Ok(Response {
                status: StatusCode::OK,
                headers: HeaderMap::new(),
                body: self.response_body.as_ref().clone().into(),
            })
        }

        async fn stream(&self, _req: Request) -> Result<StreamResponse, TransportError> {
            Err(TransportError::Build("stream should not run".to_string()))
        }
    }

    fn provider(base_url: &str) -> Provider {
        Provider {
            name: "test".to_string(),
            base_url: base_url.to_string(),
            query_params: None,
            headers: HeaderMap::new(),
            retry: RetryConfig {
                max_attempts: 1,
                base_delay: Duration::from_millis(1),
                retry_429: false,
                retry_5xx: true,
                retry_transport: true,
            },
            stream_idle_timeout: Duration::from_secs(1),
        }
    }

    #[test]
    fn path_is_memories_trace_summarize_for_wire_compatibility() {
        assert_eq!(
@@ -103,4 +162,63 @@ mod tests {
            "memories/trace_summarize"
        );
    }

    #[tokio::test]
    async fn summarize_input_posts_expected_payload_and_parses_output() {
        let transport = CapturingTransport::new(
            serde_json::to_vec(&json!({
                "output": [
                    {
                        "trace_summary": "raw summary",
                        "memory_summary": "memory summary"
                    }
                ]
            }))
            .expect("serialize response"),
        );
        let client = MemoriesClient::new(
            transport.clone(),
            provider("https://example.com/api/codex"),
            DummyAuth,
        );

        let input = MemorySummarizeInput {
            model: "gpt-test".to_string(),
            raw_memories: vec![RawMemory {
                id: "trace-1".to_string(),
                metadata: RawMemoryMetadata {
                    source_path: "/tmp/trace.json".to_string(),
                },
                items: vec![json!({"type": "message", "role": "user", "content": []})],
            }],
            reasoning: None,
        };

        let output = client
            .summarize_input(&input, HeaderMap::new())
            .await
            .expect("summarize input request should succeed");
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].raw_memory, "raw summary");
        assert_eq!(output[0].memory_summary, "memory summary");

        let request = transport
            .last_request
            .lock()
            .expect("lock request store")
            .clone()
            .expect("request should be captured");
        assert_eq!(request.method, Method::POST);
        assert_eq!(
            request.url,
            "https://example.com/api/codex/memories/trace_summarize"
        );
        let body = request.body.expect("request body should be present");
        assert_eq!(body["model"], "gpt-test");
        assert_eq!(body["traces"][0]["id"], "trace-1");
        assert_eq!(
            body["traces"][0]["metadata"]["source_path"],
            "/tmp/trace.json"
        );
    }
}
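A note on the wire shape this test pins down: the client-side input field is `raw_memories`, while the HTTP body keys the same data as `traces`. A minimal sketch of how such a rename can be expressed with serde; `SummarizeRequest` and its exact fields are hypothetical stand-ins, since the real request type is not part of this diff.

use serde::Serialize;
use serde_json::json;

// Hypothetical request type: `raw_memories` is renamed to `traces` on the
// wire so the server-side contract stays stable regardless of the Rust
// field name.
#[derive(Serialize)]
struct SummarizeRequest {
    model: String,
    #[serde(rename = "traces")]
    raw_memories: Vec<serde_json::Value>,
}

fn main() {
    let request = SummarizeRequest {
        model: "gpt-test".to_string(),
        raw_memories: vec![json!({"id": "trace-1"})],
    };
    let body = serde_json::to_value(&request).expect("serialize request");
    // The Rust field `raw_memories` shows up as `traces` in the JSON body.
    assert_eq!(body["traces"][0]["id"], "trace-1");
    assert!(body.get("raw_memories").is_none());
}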
@@ -1116,3 +1116,103 @@ impl WebsocketTelemetry for ApiTelemetry {
        self.otel_manager.record_websocket_event(result, duration);
    }
}

#[cfg(test)]
mod tests {
    use super::ModelClient;
    use codex_otel::OtelManager;
    use codex_protocol::ThreadId;
    use codex_protocol::openai_models::ModelInfo;
    use codex_protocol::protocol::SessionSource;
    use codex_protocol::protocol::SubAgentSource;
    use pretty_assertions::assert_eq;
    use serde_json::json;

    fn test_model_client(session_source: SessionSource) -> ModelClient {
        let provider = crate::model_provider_info::create_oss_provider_with_base_url(
            "https://example.com/v1",
            crate::model_provider_info::WireApi::Responses,
        );
        ModelClient::new(
            None,
            ThreadId::new(),
            provider,
            session_source,
            None,
            false,
            false,
            false,
            false,
            None,
        )
    }

    fn test_model_info() -> ModelInfo {
        serde_json::from_value(json!({
            "slug": "gpt-test",
            "display_name": "gpt-test",
            "description": "desc",
            "default_reasoning_level": "medium",
            "supported_reasoning_levels": [
                {"effort": "medium", "description": "medium"}
            ],
            "shell_type": "shell_command",
            "visibility": "list",
            "supported_in_api": true,
            "priority": 1,
            "upgrade": null,
            "base_instructions": "base instructions",
            "model_messages": null,
            "supports_reasoning_summaries": false,
            "support_verbosity": false,
            "default_verbosity": null,
            "apply_patch_tool_type": null,
            "truncation_policy": {"mode": "bytes", "limit": 10000},
            "supports_parallel_tool_calls": false,
            "context_window": 272000,
            "auto_compact_token_limit": null,
            "experimental_supported_tools": []
        }))
        .expect("deserialize test model info")
    }

    fn test_otel_manager() -> OtelManager {
        OtelManager::new(
            ThreadId::new(),
            "gpt-test",
            "gpt-test",
            None,
            None,
            None,
            "test-originator".to_string(),
            false,
            "test-terminal".to_string(),
            SessionSource::Cli,
        )
    }

    #[test]
    fn build_subagent_headers_sets_other_subagent_label() {
        let client = test_model_client(SessionSource::SubAgent(SubAgentSource::Other(
            "memory_consolidation".to_string(),
        )));
        let headers = client.build_subagent_headers();
        let value = headers
            .get("x-openai-subagent")
            .and_then(|value| value.to_str().ok());
        assert_eq!(value, Some("memory_consolidation"));
    }

    #[tokio::test]
    async fn summarize_memories_returns_empty_for_empty_input() {
        let client = test_model_client(SessionSource::Cli);
        let model_info = test_model_info();
        let otel_manager = test_otel_manager();

        let output = client
            .summarize_memories(Vec::new(), &model_info, None, &otel_manager)
            .await
            .expect("empty summarize request should succeed");
        assert_eq!(output.len(), 0);
    }
}
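The first test above checks that a `SubAgentSource::Other` label is propagated verbatim into the `x-openai-subagent` request header. A minimal sketch of that kind of label-to-header mapping, using simplified stand-in enums; the real `build_subagent_headers` implementation is not shown in this diff.

use http::HeaderMap;
use http::HeaderValue;

// Simplified stand-ins for the protocol types exercised by the test.
enum SubAgentSource {
    Other(String),
}

enum SessionSource {
    Cli,
    SubAgent(SubAgentSource),
}

fn build_subagent_headers(session_source: &SessionSource) -> HeaderMap {
    let mut headers = HeaderMap::new();
    if let SessionSource::SubAgent(SubAgentSource::Other(label)) = session_source {
        // Skip the header rather than panic when the label is not a valid
        // header value.
        if let Ok(value) = HeaderValue::from_str(label) {
            headers.insert("x-openai-subagent", value);
        }
    }
    headers
}

fn main() {
    // Non-subagent sessions add nothing.
    assert!(build_subagent_headers(&SessionSource::Cli).is_empty());

    let source = SessionSource::SubAgent(SubAgentSource::Other(
        "memory_consolidation".to_string(),
    ));
    let headers = build_subagent_headers(&source);
    assert_eq!(
        headers
            .get("x-openai-subagent")
            .and_then(|value| value.to_str().ok()),
        Some("memory_consolidation")
    );
}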
@@ -169,6 +169,7 @@ mod tests {
    use super::run_global_memory_consolidation;
    use crate::CodexAuth;
    use crate::ThreadManager;
    use crate::agent::control::AgentControl;
    use crate::codex::Session;
    use crate::codex::make_session_and_context;
    use crate::config::Config;
@@ -280,6 +281,14 @@ mod tests {
                .await
                .expect("shutdown spawned threads");
        }

        fn user_input_ops_count(&self) -> usize {
            self.manager
                .captured_ops()
                .into_iter()
                .filter(|(_, op)| matches!(op, Op::UserInput { .. }))
                .count()
        }
    }

    #[tokio::test]
@@ -311,12 +320,7 @@ mod tests {
            .expect("claim while running");
        assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning);

-       let user_input_ops = harness
-           .manager
-           .captured_ops()
-           .into_iter()
-           .filter(|(_, op)| matches!(op, Op::UserInput { .. }))
-           .count();
+       let user_input_ops = harness.user_input_ops_count();
        assert_eq!(user_input_ops, 1);

        harness.shutdown_threads().await;
@@ -338,14 +342,115 @@ mod tests {
            "second dispatch should skip while the global lock is running"
        );

-       let user_input_ops = harness
-           .manager
-           .captured_ops()
-           .into_iter()
-           .filter(|(_, op)| matches!(op, Op::UserInput { .. }))
-           .count();
+       let user_input_ops = harness.user_input_ops_count();
        assert_eq!(user_input_ops, 1);

        harness.shutdown_threads().await;
    }

    #[tokio::test]
    async fn dispatch_with_dirty_job_and_no_stage1_outputs_skips_spawn_and_clears_dirty_flag() {
        let harness = DispatchHarness::new().await;
        harness
            .state_db
            .enqueue_global_consolidation(999)
            .await
            .expect("enqueue global consolidation");

        let scheduled =
            run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
        assert!(
            !scheduled,
            "dispatch should not spawn when no stage-1 outputs are available"
        );
        assert_eq!(harness.user_input_ops_count(), 0);

        let claim = harness
            .state_db
            .try_claim_global_phase2_job(ThreadId::new(), 3_600)
            .await
            .expect("claim global job after empty dispatch");
        assert_eq!(
            claim,
            Phase2JobClaimOutcome::SkippedNotDirty,
            "empty dispatch should finalize global job as up-to-date"
        );

        harness.shutdown_threads().await;
    }

    #[tokio::test]
    async fn dispatch_marks_job_for_retry_when_spawn_agent_fails() {
        let codex_home = tempfile::tempdir().expect("create temp codex home");
        let mut config = test_config();
        config.codex_home = codex_home.path().to_path_buf();
        config.cwd = config.codex_home.clone();
        let config = Arc::new(config);

        let state_db = codex_state::StateRuntime::init(
            config.codex_home.clone(),
            config.model_provider_id.clone(),
            None,
        )
        .await
        .expect("initialize state db");

        let (mut session, _turn_context) = make_session_and_context().await;
        session.services.state_db = Some(Arc::clone(&state_db));
        session.services.agent_control = AgentControl::default();
        let session = Arc::new(session);

        let thread_id = ThreadId::new();
        let mut metadata_builder = ThreadMetadataBuilder::new(
            thread_id,
            config.codex_home.join(format!("rollout-{thread_id}.jsonl")),
            Utc::now(),
            SessionSource::Cli,
        );
        metadata_builder.cwd = config.cwd.clone();
        metadata_builder.model_provider = Some(config.model_provider_id.clone());
        let metadata = metadata_builder.build(&config.model_provider_id);
        state_db
            .upsert_thread(&metadata)
            .await
            .expect("upsert thread metadata");

        let claim = state_db
            .try_claim_stage1_job(thread_id, session.conversation_id, 100, 3_600, 64)
            .await
            .expect("claim stage-1 job");
        let ownership_token = match claim {
            codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
            other => panic!("unexpected stage-1 claim outcome: {other:?}"),
        };
        assert!(
            state_db
                .mark_stage1_job_succeeded(
                    thread_id,
                    &ownership_token,
                    100,
                    "raw memory",
                    "rollout summary",
                )
                .await
                .expect("mark stage-1 success"),
            "stage-1 success should enqueue global consolidation"
        );

        let scheduled = run_global_memory_consolidation(&session, Arc::clone(&config)).await;
        assert!(
            !scheduled,
            "dispatch should return false when consolidation subagent cannot be spawned"
        );

        let retry_claim = state_db
            .try_claim_global_phase2_job(ThreadId::new(), 3_600)
            .await
            .expect("claim global job after spawn failure");
        assert_eq!(
            retry_claim,
            Phase2JobClaimOutcome::SkippedNotDirty,
            "spawn failures should leave the job in retry backoff instead of running"
        );
    }
}
@@ -185,3 +185,21 @@ pub(super) async fn run_memories_startup_pipeline(

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::run_memories_startup_pipeline;
    use crate::codex::make_session_and_context;
    use crate::config::test_config;
    use std::sync::Arc;

    #[tokio::test]
    async fn startup_pipeline_is_noop_when_state_db_is_unavailable() {
        let (session, _turn_context) = make_session_and_context().await;
        let session = Arc::new(session);
        let config = Arc::new(test_config());
        run_memories_startup_pipeline(&session, config)
            .await
            .expect("startup pipeline should skip cleanly without state db");
    }
}
@@ -2,7 +2,9 @@ use crate::agent::AgentStatus;
use crate::agent::status::is_final as is_final_agent_status;
use crate::codex::Session;
+use codex_protocol::ThreadId;
use std::sync::Arc;
use std::time::Duration;
+use tokio::sync::watch;
use tracing::debug;
use tracing::info;
use tracing::warn;
@@ -21,32 +23,50 @@ pub(super) fn spawn_phase2_completion_task(
    let agent_control = session.services.agent_control.clone();

    tokio::spawn(async move {
-       let Some(state_db) = state_db.as_deref() else {
+       let Some(state_db) = state_db else {
            return;
        };

-       let mut status_rx = match agent_control.subscribe_status(consolidation_agent_id).await {
+       let status_rx = match agent_control.subscribe_status(consolidation_agent_id).await {
            Ok(status_rx) => status_rx,
            Err(err) => {
                warn!(
                    "failed to subscribe to global memory consolidation agent {consolidation_agent_id}: {err}"
                );
-               let _ = state_db
-                   .mark_global_phase2_job_failed(
-                       &ownership_token,
-                       "failed to subscribe to consolidation agent status",
-                       PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
-                   )
-                   .await;
+               mark_phase2_failed_with_recovery(
+                   state_db.as_ref(),
+                   &ownership_token,
+                   "failed to subscribe to consolidation agent status",
+               )
+               .await;
                return;
            }
        };

+       run_phase2_completion_task(
+           Arc::clone(&state_db),
+           ownership_token,
+           completion_watermark,
+           consolidation_agent_id,
+           status_rx,
+       )
+       .await;
    });
}

+async fn run_phase2_completion_task(
+    state_db: Arc<codex_state::StateRuntime>,
+    ownership_token: String,
+    completion_watermark: i64,
+    consolidation_agent_id: ThreadId,
+    mut status_rx: watch::Receiver<AgentStatus>,
+) {
+   let final_status = {
        let mut heartbeat_interval =
            tokio::time::interval(Duration::from_secs(PHASE_TWO_JOB_HEARTBEAT_SECONDS));
        heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

-       let final_status = loop {
+       loop {
            let status = status_rx.borrow().clone();
            if is_final_agent_status(&status) {
                break status;
@@ -68,55 +88,84 @@ pub(super) fn spawn_phase2_completion_task(
            {
                Ok(true) => {}
                Ok(false) => {
-                   debug!(
-                       "memory phase-2 heartbeat lost global ownership; skipping finalization"
-                   );
-                   return;
+                   warn!(
+                       "memory phase-2 heartbeat lost global ownership; finalizing as failure"
+                   );
+                   break AgentStatus::Errored(
+                       "lost global phase-2 ownership during heartbeat".to_string(),
+                   );
                }
                Err(err) => {
                    warn!(
                        "state db heartbeat_global_phase2_job failed during memories startup: {err}"
                    );
-                   return;
+                   break AgentStatus::Errored(format!(
+                       "phase-2 heartbeat update failed: {err}"
+                   ));
                }
            }
        }
    };

    if is_phase2_success(&final_status) {
        match state_db
            .mark_global_phase2_job_succeeded(&ownership_token, completion_watermark)
            .await
        {
            Ok(true) => {}
            Ok(false) => {
                debug!(
                    "memory phase-2 success finalization skipped after global ownership changed"
                );
            }
            Err(err) => {
                warn!(
                    "state db mark_global_phase2_job_succeeded failed during memories startup: {err}"
                );
            }
        }
        info!(
            "memory phase-2 global consolidation agent finished: agent_id={consolidation_agent_id} final_status={final_status:?}"
        );
        return;
    }

    let failure_reason = phase2_failure_reason(&final_status);
-   match state_db
-       .mark_global_phase2_job_failed(
-           &ownership_token,
-           &failure_reason,
-           PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
-       )
-       .await
-   {
-       Ok(true) => {}
-       Ok(false) => {
-           debug!(
-               "memory phase-2 failure finalization skipped after global ownership changed"
-           );
-       }
-       Err(err) => {
-           warn!("state db mark_global_phase2_job_failed failed during memories startup: {err}");
-       }
-   }
+   mark_phase2_failed_with_recovery(state_db.as_ref(), &ownership_token, &failure_reason).await;
    warn!(
        "memory phase-2 global consolidation agent finished with non-success status: agent_id={consolidation_agent_id} final_status={final_status:?}"
    );
-   });
-}
+}

@@ -124,14 +173,14 @@ pub(super) fn spawn_phase2_completion_task(
+async fn mark_phase2_failed_with_recovery(
+    state_db: &codex_state::StateRuntime,
+    ownership_token: &str,
+    failure_reason: &str,
+) {
+    match state_db
+        .mark_global_phase2_job_failed(
+            ownership_token,
+            failure_reason,
+            PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
+        )
+        .await
+    {
+        Ok(true) => {}
+        Ok(false) => match state_db
+            .mark_global_phase2_job_failed_if_unowned(
+                ownership_token,
+                failure_reason,
+                PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
+            )
+            .await
+        {
+            Ok(true) => {
+                debug!(
+                    "memory phase-2 failure finalization applied fallback update for unowned running job"
+                );
+            }
+            Ok(false) => {
+                debug!(
+                    "memory phase-2 failure finalization skipped after global ownership changed"
+                );
+            }
+            Err(err) => {
+                warn!(
+                    "state db mark_global_phase2_job_failed_if_unowned failed during memories startup: {err}"
+                );
+            }
+        },
+        Err(err) => {
+            warn!("state db mark_global_phase2_job_failed failed during memories startup: {err}");
+        }
+    }
+}
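The diff elides the middle of the completion loop above, which waits for either a watch-channel status change or the next heartbeat tick. A minimal self-contained sketch of that loop shape, with a simplified `Status` enum standing in for `AgentStatus` and the state-db lease renewal reduced to a comment.

use std::time::Duration;
use tokio::sync::watch;

#[derive(Clone, Debug)]
enum Status {
    Running,
    Completed,
}

fn is_final(status: &Status) -> bool {
    matches!(status, Status::Completed)
}

#[tokio::main]
async fn main() {
    let (status_tx, mut status_rx) = watch::channel(Status::Running);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(20)).await;
        let _ = status_tx.send(Status::Completed);
    });

    let mut heartbeat_interval = tokio::time::interval(Duration::from_millis(10));
    heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    let final_status = loop {
        // Check the current value first so an already-final status is not missed.
        let status = status_rx.borrow().clone();
        if is_final(&status) {
            break status;
        }
        tokio::select! {
            changed = status_rx.changed() => {
                // A dropped sender means no more updates will arrive; the real
                // code finalizes the job as failed in that case.
                if changed.is_err() {
                    break status_rx.borrow().clone();
                }
            }
            _ = heartbeat_interval.tick() => {
                // The real loop renews the job lease here and breaks with an
                // error status when global ownership is lost.
            }
        }
    };
    println!("final status: {final_status:?}");
}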
fn is_phase2_success(final_status: &AgentStatus) -> bool {
@@ -146,7 +195,12 @@ fn phase2_failure_reason(final_status: &AgentStatus) {
mod tests {
    use super::is_phase2_success;
    use super::phase2_failure_reason;
    use super::run_phase2_completion_task;
    use crate::agent::AgentStatus;
    use codex_protocol::ThreadId;
    use codex_state::Phase2JobClaimOutcome;
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    #[test]
    fn phase2_success_only_for_completed_status() {
@@ -164,4 +218,167 @@ mod tests {
        assert!(reason.contains("consolidation agent finished with status"));
        assert!(reason.contains("boom"));
    }

    #[tokio::test]
    async fn phase2_completion_marks_succeeded_for_completed_status() {
        let codex_home = tempfile::tempdir().expect("create temp codex home");
        let state_db = Arc::new(
            codex_state::StateRuntime::init(
                codex_home.path().to_path_buf(),
                "test-provider".to_string(),
                None,
            )
            .await
            .expect("initialize state runtime"),
        );
        let owner = ThreadId::new();
        state_db
            .enqueue_global_consolidation(123)
            .await
            .expect("enqueue global consolidation");
        let claim = state_db
            .try_claim_global_phase2_job(owner, 3_600)
            .await
            .expect("claim global phase-2 job");
        let ownership_token = match claim {
            Phase2JobClaimOutcome::Claimed {
                ownership_token, ..
            } => ownership_token,
            other => panic!("unexpected phase-2 claim outcome: {other:?}"),
        };

        let (_status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Completed(None));
        run_phase2_completion_task(
            Arc::clone(&state_db),
            ownership_token.clone(),
            123,
            ThreadId::new(),
            status_rx,
        )
        .await;

        let up_to_date_claim = state_db
            .try_claim_global_phase2_job(ThreadId::new(), 3_600)
            .await
            .expect("claim up-to-date global job");
        assert_eq!(up_to_date_claim, Phase2JobClaimOutcome::SkippedNotDirty);

        state_db
            .enqueue_global_consolidation(124)
            .await
            .expect("enqueue advanced consolidation watermark");
        let rerun_claim = state_db
            .try_claim_global_phase2_job(ThreadId::new(), 3_600)
            .await
            .expect("claim rerun global job");
        assert!(
            matches!(rerun_claim, Phase2JobClaimOutcome::Claimed { .. }),
            "advanced watermark should be claimable after success finalization"
        );
    }

    #[tokio::test]
    async fn phase2_completion_marks_failed_when_status_updates_are_lost() {
        let codex_home = tempfile::tempdir().expect("create temp codex home");
        let state_db = Arc::new(
            codex_state::StateRuntime::init(
                codex_home.path().to_path_buf(),
                "test-provider".to_string(),
                None,
            )
            .await
            .expect("initialize state runtime"),
        );
        state_db
            .enqueue_global_consolidation(456)
            .await
            .expect("enqueue global consolidation");
        let claim = state_db
            .try_claim_global_phase2_job(ThreadId::new(), 3_600)
            .await
            .expect("claim global phase-2 job");
        let ownership_token = match claim {
            Phase2JobClaimOutcome::Claimed {
                ownership_token, ..
            } => ownership_token,
            other => panic!("unexpected phase-2 claim outcome: {other:?}"),
        };

        let (status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Running);
        drop(status_tx);
        run_phase2_completion_task(
            Arc::clone(&state_db),
            ownership_token,
            456,
            ThreadId::new(),
            status_rx,
        )
        .await;

        let claim = state_db
            .try_claim_global_phase2_job(ThreadId::new(), 3_600)
            .await
            .expect("claim after failure finalization");
        assert_eq!(
            claim,
            Phase2JobClaimOutcome::SkippedNotDirty,
            "failure finalization should leave global job in retry-backoff, not running ownership"
        );
    }

    #[tokio::test]
    async fn phase2_completion_heartbeat_loss_does_not_steal_active_other_owner() {
        let codex_home = tempfile::tempdir().expect("create temp codex home");
        let state_db = Arc::new(
            codex_state::StateRuntime::init(
                codex_home.path().to_path_buf(),
                "test-provider".to_string(),
                None,
            )
            .await
            .expect("initialize state runtime"),
        );
        state_db
            .enqueue_global_consolidation(789)
            .await
            .expect("enqueue global consolidation");
        let claim = state_db
            .try_claim_global_phase2_job(ThreadId::new(), 3_600)
            .await
            .expect("claim global phase-2 job");
        let claimed_token = match claim {
            Phase2JobClaimOutcome::Claimed {
                ownership_token, ..
            } => ownership_token,
            other => panic!("unexpected phase-2 claim outcome: {other:?}"),
        };

        let (_status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Running);
        run_phase2_completion_task(
            Arc::clone(&state_db),
            "non-owner-token".to_string(),
            789,
            ThreadId::new(),
            status_rx,
        )
        .await;

        let claim = state_db
            .try_claim_global_phase2_job(ThreadId::new(), 3_600)
            .await
            .expect("claim after heartbeat ownership loss");
        assert_eq!(
            claim,
            Phase2JobClaimOutcome::SkippedRunning,
            "heartbeat ownership-loss handling should not steal a live owner lease"
        );
        assert_eq!(
            state_db
                .mark_global_phase2_job_succeeded(claimed_token.as_str(), 789)
                .await
                .expect("mark original owner success"),
            true,
            "the original owner should still be able to finalize"
        );
    }
}
@@ -885,6 +885,7 @@ mod tests {
    use sqlx::Row;
    use std::path::Path;
    use std::path::PathBuf;
    use std::sync::Arc;
    use std::time::SystemTime;
    use std::time::UNIX_EPOCH;
    use uuid::Uuid;
@@ -1125,6 +1126,133 @@ mod tests {
        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

    #[tokio::test]
    async fn stage1_concurrent_claim_for_same_thread_is_conflict_safe() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
            .await
            .expect("initialize runtime");

        let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
        runtime
            .upsert_thread(&test_thread_metadata(
                &codex_home,
                thread_id,
                codex_home.join("workspace"),
            ))
            .await
            .expect("upsert thread");

        let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
        let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
        let thread_id_a = thread_id;
        let thread_id_b = thread_id;
        let runtime_a = Arc::clone(&runtime);
        let runtime_b = Arc::clone(&runtime);
        let claim_with_retry = |runtime: Arc<StateRuntime>,
                                thread_id: ThreadId,
                                owner: ThreadId| async move {
            for attempt in 0..5 {
                match runtime
                    .try_claim_stage1_job(thread_id, owner, 100, 3_600, 64)
                    .await
                {
                    Ok(outcome) => return outcome,
                    Err(err) if err.to_string().contains("database is locked") && attempt < 4 => {
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    }
                    Err(err) => panic!("claim stage1 should not fail: {err}"),
                }
            }
            panic!("claim stage1 should have returned within retry budget")
        };

        let (claim_a, claim_b) = tokio::join!(
            claim_with_retry(runtime_a, thread_id_a, owner_a),
            claim_with_retry(runtime_b, thread_id_b, owner_b),
        );

        let claim_outcomes = vec![claim_a, claim_b];
        let claimed_count = claim_outcomes
            .iter()
            .filter(|outcome| matches!(outcome, Stage1JobClaimOutcome::Claimed { .. }))
            .count();
        assert_eq!(claimed_count, 1);
        assert!(
            claim_outcomes.iter().all(|outcome| {
                matches!(
                    outcome,
                    Stage1JobClaimOutcome::Claimed { .. } | Stage1JobClaimOutcome::SkippedRunning
                )
            }),
            "unexpected claim outcomes: {claim_outcomes:?}"
        );

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

    #[tokio::test]
    async fn stage1_concurrent_claims_respect_running_cap() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
            .await
            .expect("initialize runtime");

        let thread_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
        let thread_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
        runtime
            .upsert_thread(&test_thread_metadata(
                &codex_home,
                thread_a,
                codex_home.join("workspace-a"),
            ))
            .await
            .expect("upsert thread a");
        runtime
            .upsert_thread(&test_thread_metadata(
                &codex_home,
                thread_b,
                codex_home.join("workspace-b"),
            ))
            .await
            .expect("upsert thread b");

        let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
        let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
        let runtime_a = Arc::clone(&runtime);
        let runtime_b = Arc::clone(&runtime);

        let (claim_a, claim_b) = tokio::join!(
            async move {
                runtime_a
                    .try_claim_stage1_job(thread_a, owner_a, 100, 3_600, 1)
                    .await
                    .expect("claim stage1 thread a")
            },
            async move {
                runtime_b
                    .try_claim_stage1_job(thread_b, owner_b, 101, 3_600, 1)
                    .await
                    .expect("claim stage1 thread b")
            },
        );

        let claim_outcomes = vec![claim_a, claim_b];
        let claimed_count = claim_outcomes
            .iter()
            .filter(|outcome| matches!(outcome, Stage1JobClaimOutcome::Claimed { .. }))
            .count();
        assert_eq!(claimed_count, 1);
        assert!(
            claim_outcomes
                .iter()
                .any(|outcome| { matches!(outcome, Stage1JobClaimOutcome::SkippedRunning) }),
            "one concurrent claim should be throttled by running cap: {claim_outcomes:?}"
        );

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

    #[tokio::test]
    async fn claim_stage1_jobs_filters_by_age_idle_and_current_thread() {
        let codex_home = unique_temp_dir();
@@ -1717,6 +1845,62 @@ WHERE kind = 'memory_stage1'
        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

    #[tokio::test]
    async fn phase2_failure_fallback_updates_unowned_running_job() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
            .await
            .expect("initialize runtime");

        runtime
            .enqueue_global_consolidation(400)
            .await
            .expect("enqueue global consolidation");

        let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner");
        let claim = runtime
            .try_claim_global_phase2_job(owner, 3_600)
            .await
            .expect("claim global consolidation");
        let ownership_token = match claim {
            Phase2JobClaimOutcome::Claimed {
                ownership_token, ..
            } => ownership_token,
            other => panic!("unexpected claim outcome: {other:?}"),
        };

        sqlx::query("UPDATE jobs SET ownership_token = NULL WHERE kind = ? AND job_key = ?")
            .bind("memory_consolidate_global")
            .bind("global")
            .execute(runtime.pool.as_ref())
            .await
            .expect("clear ownership token");

        assert_eq!(
            runtime
                .mark_global_phase2_job_failed(ownership_token.as_str(), "lost", 3_600)
                .await
                .expect("mark phase2 failed with strict ownership"),
            false,
            "strict failure update should not match unowned running job"
        );
        assert!(
            runtime
                .mark_global_phase2_job_failed_if_unowned(ownership_token.as_str(), "lost", 3_600)
                .await
                .expect("fallback failure update should match unowned running job"),
            "fallback failure update should transition the unowned running job"
        );

        let claim = runtime
            .try_claim_global_phase2_job(ThreadId::new(), 3_600)
            .await
            .expect("claim after fallback failure");
        assert_eq!(claim, Phase2JobClaimOutcome::SkippedNotDirty);

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

    fn test_thread_metadata(
        codex_home: &Path,
        thread_id: ThreadId,
@@ -194,67 +194,8 @@ WHERE thread_id = ?
        }
    }

-   let existing_job = sqlx::query(
-       r#"
-SELECT status, lease_until, retry_at, retry_remaining
-FROM jobs
-WHERE kind = ? AND job_key = ?
-"#,
-   )
-   .bind(JOB_KIND_MEMORY_STAGE1)
-   .bind(thread_id.as_str())
-   .fetch_optional(&mut *tx)
-   .await?;
-
-   let should_insert = if let Some(existing_job) = existing_job {
-       let status: String = existing_job.try_get("status")?;
-       let existing_lease_until: Option<i64> = existing_job.try_get("lease_until")?;
-       let retry_at: Option<i64> = existing_job.try_get("retry_at")?;
-       let retry_remaining: i64 = existing_job.try_get("retry_remaining")?;
-
-       if retry_remaining <= 0 {
-           tx.commit().await?;
-           return Ok(Stage1JobClaimOutcome::SkippedRetryExhausted);
-       }
-       if retry_at.is_some_and(|retry_at| retry_at > now) {
-           tx.commit().await?;
-           return Ok(Stage1JobClaimOutcome::SkippedRetryBackoff);
-       }
-       if status == "running"
-           && existing_lease_until.is_some_and(|lease_until| lease_until > now)
-       {
-           tx.commit().await?;
-           return Ok(Stage1JobClaimOutcome::SkippedRunning);
-       }
-
-       false
-   } else {
-       true
-   };
-
-   let fresh_running_jobs = sqlx::query(
-       r#"
-SELECT COUNT(*) AS count
-FROM jobs
-WHERE kind = ?
-   AND status = 'running'
-   AND lease_until IS NOT NULL
-   AND lease_until > ?
-"#,
-   )
-   .bind(JOB_KIND_MEMORY_STAGE1)
-   .bind(now)
-   .fetch_one(&mut *tx)
-   .await?
-   .try_get::<i64, _>("count")?;
-   if fresh_running_jobs >= max_running_jobs {
-       tx.commit().await?;
-       return Ok(Stage1JobClaimOutcome::SkippedRunning);
-   }
-
-   if should_insert {
-       sqlx::query(
-           r#"
+   let rows_affected = sqlx::query(
+       r#"
INSERT INTO jobs (
    kind,
    job_key,
@@ -269,61 +210,96 @@ INSERT INTO jobs (
    last_error,
    input_watermark,
    last_success_watermark
-) VALUES (?, ?, 'running', ?, ?, ?, NULL, ?, NULL, ?, NULL, ?, NULL)
-"#,
-       )
-       .bind(JOB_KIND_MEMORY_STAGE1)
-       .bind(thread_id.as_str())
-       .bind(worker_id.as_str())
-       .bind(ownership_token.as_str())
-       .bind(now)
-       .bind(lease_until)
-       .bind(DEFAULT_RETRY_REMAINING)
-       .bind(source_updated_at)
-       .execute(&mut *tx)
-       .await?;
-       tx.commit().await?;
-       return Ok(Stage1JobClaimOutcome::Claimed { ownership_token });
-   }
-
-   let rows_affected = sqlx::query(
-       r#"
-UPDATE jobs
-SET
-   status = 'running',
-   worker_id = ?,
-   ownership_token = ?,
-   started_at = ?,
-   finished_at = NULL,
-   lease_until = ?,
-   retry_at = NULL,
-   last_error = NULL,
-   input_watermark = ?
-WHERE kind = ? AND job_key = ?
-   AND (status != 'running' OR lease_until IS NULL OR lease_until <= ?)
-   AND (retry_at IS NULL OR retry_at <= ?)
-   AND retry_remaining > 0
+)
+SELECT ?, ?, 'running', ?, ?, ?, NULL, ?, NULL, ?, NULL, ?, NULL
+WHERE (
+   SELECT COUNT(*)
+   FROM jobs
+   WHERE kind = ?
+       AND status = 'running'
+       AND lease_until IS NOT NULL
+       AND lease_until > ?
+) < ?
+ON CONFLICT(kind, job_key) DO UPDATE SET
+   status = 'running',
+   worker_id = excluded.worker_id,
+   ownership_token = excluded.ownership_token,
+   started_at = excluded.started_at,
+   finished_at = NULL,
+   lease_until = excluded.lease_until,
+   retry_at = NULL,
+   last_error = NULL,
+   input_watermark = excluded.input_watermark
+WHERE
+   (jobs.status != 'running' OR jobs.lease_until IS NULL OR jobs.lease_until <= excluded.started_at)
+   AND (jobs.retry_at IS NULL OR jobs.retry_at <= excluded.started_at)
+   AND jobs.retry_remaining > 0
+   AND (
+       SELECT COUNT(*)
+       FROM jobs AS running_jobs
+       WHERE running_jobs.kind = excluded.kind
+           AND running_jobs.status = 'running'
+           AND running_jobs.lease_until IS NOT NULL
+           AND running_jobs.lease_until > excluded.started_at
+           AND running_jobs.job_key != excluded.job_key
+   ) < ?
"#,
    )
    .bind(JOB_KIND_MEMORY_STAGE1)
    .bind(thread_id.as_str())
    .bind(worker_id.as_str())
    .bind(ownership_token.as_str())
    .bind(now)
    .bind(lease_until)
    .bind(DEFAULT_RETRY_REMAINING)
    .bind(source_updated_at)
    .bind(JOB_KIND_MEMORY_STAGE1)
-   .bind(thread_id.as_str())
    .bind(now)
-   .bind(now)
+   .bind(max_running_jobs)
+   .bind(max_running_jobs)
    .execute(&mut *tx)
    .await?
    .rows_affected();

-   tx.commit().await?;
-   if rows_affected == 0 {
-       Ok(Stage1JobClaimOutcome::SkippedRunning)
-   } else {
-       Ok(Stage1JobClaimOutcome::Claimed { ownership_token })
+   if rows_affected > 0 {
+       tx.commit().await?;
+       return Ok(Stage1JobClaimOutcome::Claimed { ownership_token });
    }

+   let existing_job = sqlx::query(
+       r#"
+SELECT status, lease_until, retry_at, retry_remaining
+FROM jobs
+WHERE kind = ? AND job_key = ?
+"#,
+   )
+   .bind(JOB_KIND_MEMORY_STAGE1)
+   .bind(thread_id.as_str())
+   .fetch_optional(&mut *tx)
+   .await?;
+
+   tx.commit().await?;
+
+   if let Some(existing_job) = existing_job {
+       let status: String = existing_job.try_get("status")?;
+       let existing_lease_until: Option<i64> = existing_job.try_get("lease_until")?;
+       let retry_at: Option<i64> = existing_job.try_get("retry_at")?;
+       let retry_remaining: i64 = existing_job.try_get("retry_remaining")?;
+
+       if retry_remaining <= 0 {
+           return Ok(Stage1JobClaimOutcome::SkippedRetryExhausted);
+       }
+       if retry_at.is_some_and(|retry_at| retry_at > now) {
+           return Ok(Stage1JobClaimOutcome::SkippedRetryBackoff);
+       }
+       if status == "running"
+           && existing_lease_until.is_some_and(|lease_until| lease_until > now)
+       {
+           return Ok(Stage1JobClaimOutcome::SkippedRunning);
+       }
+   }
+
+   Ok(Stage1JobClaimOutcome::SkippedRunning)
}
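The claim rewrite above collapses the old read-then-insert/update sequence into one guarded upsert: the `INSERT ... ON CONFLICT(kind, job_key) DO UPDATE ... WHERE` statement only takes effect when the existing row is claimable, so `rows_affected` alone decides whether the claim won. A standalone sketch of the same SQLite pattern through sqlx; the schema below is a deliberately simplified stand-in for the real jobs table.

use sqlx::sqlite::SqlitePoolOptions;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // One connection so both claims hit the same in-memory database.
    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .connect("sqlite::memory:")
        .await?;
    sqlx::query(
        "CREATE TABLE jobs (
            kind TEXT NOT NULL,
            job_key TEXT NOT NULL,
            status TEXT NOT NULL,
            lease_until INTEGER,
            PRIMARY KEY (kind, job_key)
        )",
    )
    .execute(&pool)
    .await?;

    let now = 100_i64;
    let lease_until = now + 3_600;
    let mut outcomes = Vec::new();
    for _ in 0..2 {
        // Insert-or-steal in one statement: the DO UPDATE only fires when the
        // existing row is not running or its lease has expired, so exactly one
        // of two racing claimers sees rows_affected == 1.
        let rows_affected = sqlx::query(
            "INSERT INTO jobs (kind, job_key, status, lease_until)
             VALUES ('stage1', 'thread-1', 'running', ?)
             ON CONFLICT(kind, job_key) DO UPDATE SET
                 status = 'running',
                 lease_until = excluded.lease_until
             WHERE jobs.status != 'running'
                OR jobs.lease_until IS NULL
                OR jobs.lease_until <= ?",
        )
        .bind(lease_until)
        .bind(now)
        .execute(&pool)
        .await?
        .rows_affected();
        outcomes.push(rows_affected);
    }
    // The first claim wins; the second observes a live lease and is skipped.
    assert_eq!(outcomes, vec![1, 0]);
    Ok(())
}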
pub async fn mark_stage1_job_succeeded(
@@ -627,6 +603,42 @@ WHERE kind = ? AND job_key = ?

        Ok(rows_affected > 0)
    }

    pub async fn mark_global_phase2_job_failed_if_unowned(
        &self,
        ownership_token: &str,
        failure_reason: &str,
        retry_delay_seconds: i64,
    ) -> anyhow::Result<bool> {
        let now = Utc::now().timestamp();
        let retry_at = now.saturating_add(retry_delay_seconds.max(0));
        let rows_affected = sqlx::query(
            r#"
UPDATE jobs
SET
    status = 'error',
    finished_at = ?,
    lease_until = NULL,
    retry_at = ?,
    retry_remaining = retry_remaining - 1,
    last_error = ?
WHERE kind = ? AND job_key = ?
    AND status = 'running'
    AND (ownership_token = ? OR ownership_token IS NULL)
"#,
        )
        .bind(now)
        .bind(retry_at)
        .bind(failure_reason)
        .bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
        .bind(MEMORY_CONSOLIDATION_JOB_KEY)
        .bind(ownership_token)
        .execute(self.pool.as_ref())
        .await?
        .rows_affected();

        Ok(rows_affected > 0)
    }
}
async fn enqueue_global_consolidation_with_executor<'e, E>(