diff --git a/codex-rs/state/migrations/0004_memory_summaries.sql b/codex-rs/state/migrations/0004_memory_summaries.sql new file mode 100644 index 0000000000..b49695b123 --- /dev/null +++ b/codex-rs/state/migrations/0004_memory_summaries.sql @@ -0,0 +1,20 @@ +CREATE TABLE memory_summaries ( + thread_id TEXT PRIMARY KEY, + cwd TEXT NOT NULL, + summary TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL +); + +CREATE INDEX idx_memory_summaries_cwd_updated_at + ON memory_summaries(cwd, updated_at DESC, thread_id); + +CREATE TABLE memory_summary_locks ( + cwd TEXT PRIMARY KEY, + owner_id TEXT NOT NULL, + acquired_at INTEGER NOT NULL, + expires_at INTEGER NOT NULL +); + +CREATE INDEX idx_memory_summary_locks_expires_at + ON memory_summary_locks(expires_at); diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index c08c76a1cf..0e7f535405 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -14,6 +14,8 @@ mod runtime; pub use model::LogEntry; pub use model::LogQuery; pub use model::LogRow; +pub use model::MemorySummary; +pub use model::MemorySummaryLock; /// Preferred entrypoint: owns configuration and metrics. pub use runtime::StateRuntime; diff --git a/codex-rs/state/src/model/memory_summary.rs b/codex-rs/state/src/model/memory_summary.rs new file mode 100644 index 0000000000..45a953d3b5 --- /dev/null +++ b/codex-rs/state/src/model/memory_summary.rs @@ -0,0 +1,19 @@ +use codex_protocol::ThreadId; +use std::path::PathBuf; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MemorySummary { + pub thread_id: ThreadId, + pub cwd: PathBuf, + pub summary: String, + pub created_at: i64, + pub updated_at: i64, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MemorySummaryLock { + pub cwd: PathBuf, + pub owner_id: String, + pub acquired_at: i64, + pub expires_at: i64, +} diff --git a/codex-rs/state/src/model/mod.rs b/codex-rs/state/src/model/mod.rs index bd615d7561..18280d8de1 100644 --- a/codex-rs/state/src/model/mod.rs +++ b/codex-rs/state/src/model/mod.rs @@ -1,9 +1,12 @@ mod log; +mod memory_summary; mod thread_metadata; pub use log::LogEntry; pub use log::LogQuery; pub use log::LogRow; +pub use memory_summary::MemorySummary; +pub use memory_summary::MemorySummaryLock; pub use thread_metadata::Anchor; pub use thread_metadata::BackfillStats; pub use thread_metadata::ExtractionOutcome; diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 6f2e128967..3050fdfcac 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -8,6 +8,8 @@ use crate::ThreadMetadataBuilder; use crate::ThreadsPage; use crate::apply_rollout_item; use crate::migrations::MIGRATOR; +use crate::model::MemorySummary; +use crate::model::MemorySummaryLock; use crate::model::ThreadRow; use crate::model::anchor_from_item; use crate::model::datetime_to_epoch_seconds; @@ -275,6 +277,199 @@ FROM threads Ok(max_id.unwrap_or(0)) } + /// Upsert a memory summary for a thread. + pub async fn upsert_memory_summary(&self, summary: &MemorySummary) -> anyhow::Result<()> { + sqlx::query( + r#" +INSERT INTO memory_summaries ( + thread_id, + cwd, + summary, + created_at, + updated_at +) VALUES (?, ?, ?, ?, ?) +ON CONFLICT(thread_id) DO UPDATE SET + cwd = excluded.cwd, + summary = excluded.summary, + updated_at = excluded.updated_at + "#, + ) + .bind(summary.thread_id.to_string()) + .bind(summary.cwd.to_string_lossy().as_ref()) + .bind(&summary.summary) + .bind(summary.created_at) + .bind(summary.updated_at) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } + + /// Fetch the most recent memory summaries for a given cwd. + pub async fn list_memory_summaries_for_cwd( + &self, + cwd: &Path, + limit: usize, + ) -> anyhow::Result> { + let rows = sqlx::query( + r#" +SELECT thread_id, cwd, summary, created_at, updated_at +FROM memory_summaries +WHERE cwd = ? +ORDER BY updated_at DESC, thread_id DESC +LIMIT ? + "#, + ) + .bind(cwd.to_string_lossy().as_ref()) + .bind(limit as i64) + .fetch_all(self.pool.as_ref()) + .await?; + + rows.into_iter() + .map(|row| { + let thread_id: String = row.try_get("thread_id")?; + Ok(MemorySummary { + thread_id: ThreadId::try_from(thread_id)?, + cwd: PathBuf::from(row.try_get::("cwd")?), + summary: row.try_get("summary")?, + created_at: row.try_get("created_at")?, + updated_at: row.try_get("updated_at")?, + }) + }) + .collect() + } + + /// Fetch memory summaries for a set of thread ids. + pub async fn list_memory_summaries_for_threads( + &self, + thread_ids: &[ThreadId], + ) -> anyhow::Result> { + if thread_ids.is_empty() { + return Ok(Vec::new()); + } + + let mut builder = QueryBuilder::::new( + "SELECT thread_id, cwd, summary, created_at, updated_at FROM memory_summaries WHERE thread_id IN (", + ); + let mut separated = builder.separated(", "); + for thread_id in thread_ids { + separated.push_bind(thread_id.to_string()); + } + separated.push_unseparated(")"); + + let rows = builder.build().fetch_all(self.pool.as_ref()).await?; + rows.into_iter() + .map(|row| { + let thread_id: String = row.try_get("thread_id")?; + Ok(MemorySummary { + thread_id: ThreadId::try_from(thread_id)?, + cwd: PathBuf::from(row.try_get::("cwd")?), + summary: row.try_get("summary")?, + created_at: row.try_get("created_at")?, + updated_at: row.try_get("updated_at")?, + }) + }) + .collect() + } + + /// Acquire a per-cwd summary lock if it is free or expired. + pub async fn try_acquire_memory_summary_lock( + &self, + cwd: &Path, + owner_id: &str, + now_ts: i64, + ttl_secs: i64, + ) -> anyhow::Result { + let expires_at = now_ts.saturating_add(ttl_secs); + let mut txn = self.pool.begin().await?; + + let updated = sqlx::query( + r#" +UPDATE memory_summary_locks +SET owner_id = ?, acquired_at = ?, expires_at = ? +WHERE cwd = ? AND expires_at <= ? + "#, + ) + .bind(owner_id) + .bind(now_ts) + .bind(expires_at) + .bind(cwd.to_string_lossy().as_ref()) + .bind(now_ts) + .execute(&mut *txn) + .await? + .rows_affected(); + + if updated == 0 { + let inserted = sqlx::query( + r#" +INSERT OR IGNORE INTO memory_summary_locks (cwd, owner_id, acquired_at, expires_at) +VALUES (?, ?, ?, ?) + "#, + ) + .bind(cwd.to_string_lossy().as_ref()) + .bind(owner_id) + .bind(now_ts) + .bind(expires_at) + .execute(&mut *txn) + .await? + .rows_affected(); + + if inserted == 0 { + txn.rollback().await?; + return Ok(false); + } + } + + txn.commit().await?; + Ok(true) + } + + /// Release a per-cwd summary lock if owned by the caller. + pub async fn release_memory_summary_lock( + &self, + cwd: &Path, + owner_id: &str, + ) -> anyhow::Result<()> { + sqlx::query( + r#" +DELETE FROM memory_summary_locks +WHERE cwd = ? AND owner_id = ? + "#, + ) + .bind(cwd.to_string_lossy().as_ref()) + .bind(owner_id) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } + + /// Fetch the current memory summary lock, if any. + pub async fn get_memory_summary_lock( + &self, + cwd: &Path, + ) -> anyhow::Result> { + let row = sqlx::query( + r#" +SELECT cwd, owner_id, acquired_at, expires_at +FROM memory_summary_locks +WHERE cwd = ? + "#, + ) + .bind(cwd.to_string_lossy().as_ref()) + .fetch_optional(self.pool.as_ref()) + .await?; + + let Some(row) = row else { + return Ok(None); + }; + + Ok(Some(MemorySummaryLock { + cwd: PathBuf::from(row.try_get::("cwd")?), + owner_id: row.try_get("owner_id")?, + acquired_at: row.try_get("acquired_at")?, + expires_at: row.try_get("expires_at")?, + })) + } + /// List thread ids using the underlying database (no rollout scanning). pub async fn list_thread_ids( &self, @@ -305,6 +500,57 @@ FROM threads .collect() } + /// List recent threads for a cwd using the underlying database (no rollout scanning). + pub async fn list_recent_threads_for_cwd( + &self, + cwd: &Path, + limit: usize, + sort_key: SortKey, + allowed_sources: &[String], + model_providers: Option<&[String]>, + archived_only: bool, + ) -> anyhow::Result> { + let mut builder = QueryBuilder::::new( + r#" +SELECT + id, + rollout_path, + created_at, + updated_at, + source, + model_provider, + cwd, + title, + sandbox_policy, + approval_mode, + tokens_used, + has_user_event, + archived_at, + git_sha, + git_branch, + git_origin_url +FROM threads + "#, + ); + push_thread_filters( + &mut builder, + archived_only, + allowed_sources, + model_providers, + None, + sort_key, + ); + let cwd_value = cwd.to_string_lossy(); + builder.push(" AND cwd = "); + builder.push_bind(cwd_value.as_ref()); + push_thread_order_and_limit(&mut builder, sort_key, limit); + + let rows = builder.build().fetch_all(self.pool.as_ref()).await?; + rows.into_iter() + .map(|row| ThreadRow::try_from_row(&row).and_then(ThreadMetadata::try_from)) + .collect() + } + /// Insert or replace thread metadata directly. pub async fn upsert_thread(&self, metadata: &crate::ThreadMetadata) -> anyhow::Result<()> { sqlx::query(