mirror of
https://github.com/openai/codex.git
synced 2026-05-19 21:01:20 +03:00
Compare commits
1 Commits
pr22402
...
fc/thread-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92e658df03 |
@@ -35,6 +35,7 @@ use codex_protocol::protocol::SessionSource as CoreSessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use core_test_support::responses;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::cmp::Reverse;
|
||||
use std::fs;
|
||||
use std::fs::FileTimes;
|
||||
@@ -174,6 +175,27 @@ fn set_rollout_cwd(path: &Path, cwd: &Path) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn append_user_message_to_rollout(path: &Path, timestamp: &str, message: &str) -> Result<()> {
|
||||
let mut content = fs::read_to_string(path)?;
|
||||
content.push_str(
|
||||
format!(
|
||||
"{}\n",
|
||||
json!({
|
||||
"timestamp": timestamp,
|
||||
"type": "event_msg",
|
||||
"payload": {
|
||||
"type": "user_message",
|
||||
"message": message,
|
||||
"kind": "plain"
|
||||
}
|
||||
})
|
||||
)
|
||||
.as_str(),
|
||||
);
|
||||
fs::write(path, content)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_list_basic_empty() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -592,10 +614,20 @@ sqlite = true
|
||||
codex_home.path(),
|
||||
"2025-01-02T12-00-00",
|
||||
"2025-01-02T12:00:00Z",
|
||||
"needle suffix",
|
||||
"no body hit in preview",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
append_user_message_to_rollout(
|
||||
rollout_path(
|
||||
codex_home.path(),
|
||||
"2025-01-02T12-00-00",
|
||||
newer_match.as_str(),
|
||||
)
|
||||
.as_path(),
|
||||
"2025-01-02T12:05:00Z",
|
||||
"later message with needle suffix",
|
||||
)?;
|
||||
|
||||
// `thread/list` applies `search_term` on the sqlite fast path. This test creates
|
||||
// rollouts manually, so mark the DB backfill complete and then run an unsearched
|
||||
|
||||
@@ -23,6 +23,7 @@ use codex_state::DB_METRIC_BACKFILL_DURATION_MS;
|
||||
use codex_state::ExtractionOutcome;
|
||||
use codex_state::ThreadMetadataBuilder;
|
||||
use codex_state::apply_rollout_item;
|
||||
use codex_state::thread_search_text_from_rollout_items;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tracing::info;
|
||||
@@ -121,6 +122,7 @@ pub async fn extract_metadata_from_rollout(
|
||||
}
|
||||
Ok(ExtractionOutcome {
|
||||
metadata,
|
||||
search_text: thread_search_text_from_rollout_items(items.as_slice()),
|
||||
memory_mode: items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
|
||||
RolloutItem::ResponseItem(_)
|
||||
@@ -210,33 +212,8 @@ pub(crate) async fn backfill_sessions_with_lease(
|
||||
}
|
||||
}
|
||||
|
||||
let sessions_root = codex_home.join(SESSIONS_SUBDIR);
|
||||
let archived_root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
|
||||
let mut rollout_paths: Vec<BackfillRolloutPath> = Vec::new();
|
||||
for (root, archived) in [(sessions_root, false), (archived_root, true)] {
|
||||
if !tokio::fs::try_exists(&root).await.unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
match collect_rollout_paths(&root).await {
|
||||
Ok(paths) => {
|
||||
rollout_paths.extend(paths.into_iter().map(|path| BackfillRolloutPath {
|
||||
watermark: backfill_watermark_for_path(codex_home, &path),
|
||||
path,
|
||||
archived,
|
||||
}));
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to collect rollout paths under {}: {err}",
|
||||
root.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
rollout_paths.sort_by(|a, b| a.watermark.cmp(&b.watermark));
|
||||
if let Some(last_watermark) = backfill_state.last_watermark.as_deref() {
|
||||
rollout_paths.retain(|entry| entry.watermark.as_str() > last_watermark);
|
||||
}
|
||||
let rollout_paths =
|
||||
rollout_paths_after_watermark(codex_home, backfill_state.last_watermark.as_deref()).await;
|
||||
|
||||
let mut stats = BackfillStats {
|
||||
scanned: 0,
|
||||
@@ -259,6 +236,7 @@ pub(crate) async fn backfill_sessions_with_lease(
|
||||
);
|
||||
}
|
||||
let mut metadata = outcome.metadata;
|
||||
let search_text = outcome.search_text;
|
||||
metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
|
||||
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
|
||||
if let Ok(Some(existing_metadata)) = runtime.get_thread(metadata.id).await {
|
||||
@@ -274,6 +252,17 @@ pub(crate) async fn backfill_sessions_with_lease(
|
||||
stats.failed = stats.failed.saturating_add(1);
|
||||
warn!("failed to upsert rollout {}: {err}", rollout.path.display());
|
||||
} else {
|
||||
if let Err(err) = runtime
|
||||
.replace_thread_search_text(metadata.id, search_text.as_slice())
|
||||
.await
|
||||
{
|
||||
stats.failed = stats.failed.saturating_add(1);
|
||||
warn!(
|
||||
"failed to index rollout search text {}: {err}",
|
||||
rollout.path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = runtime
|
||||
.set_thread_memory_mode(metadata.id, memory_mode.as_str())
|
||||
.await
|
||||
@@ -369,6 +358,108 @@ pub(crate) async fn backfill_sessions_with_lease(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn backfill_thread_search(
|
||||
runtime: &codex_state::StateRuntime,
|
||||
codex_home: &Path,
|
||||
default_provider: &str,
|
||||
) {
|
||||
let backfill_state = match runtime.get_thread_search_backfill_state().await {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read thread search backfill state at {}: {err}",
|
||||
codex_home.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if backfill_state.status == BackfillStatus::Complete {
|
||||
return;
|
||||
}
|
||||
let claimed = match runtime
|
||||
.try_claim_thread_search_backfill(BACKFILL_LEASE_SECONDS)
|
||||
.await
|
||||
{
|
||||
Ok(claimed) => claimed,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to claim thread search backfill at {}: {err}",
|
||||
codex_home.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if !claimed {
|
||||
return;
|
||||
}
|
||||
|
||||
let rollout_paths =
|
||||
rollout_paths_after_watermark(codex_home, backfill_state.last_watermark.as_deref()).await;
|
||||
let mut stats = BackfillStats {
|
||||
scanned: 0,
|
||||
upserted: 0,
|
||||
failed: 0,
|
||||
};
|
||||
let mut last_watermark = backfill_state.last_watermark.clone();
|
||||
for batch in rollout_paths.chunks(BACKFILL_BATCH_SIZE) {
|
||||
for rollout in batch {
|
||||
stats.scanned = stats.scanned.saturating_add(1);
|
||||
match extract_metadata_from_rollout(&rollout.path, default_provider).await {
|
||||
Ok(outcome) => {
|
||||
if let Err(err) = runtime
|
||||
.replace_thread_search_text(
|
||||
outcome.metadata.id,
|
||||
outcome.search_text.as_slice(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
stats.failed = stats.failed.saturating_add(1);
|
||||
warn!(
|
||||
"failed to backfill thread search text {}: {err}",
|
||||
rollout.path.display()
|
||||
);
|
||||
} else {
|
||||
stats.upserted = stats.upserted.saturating_add(1);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
stats.failed = stats.failed.saturating_add(1);
|
||||
warn!(
|
||||
"failed to extract thread search rollout {}: {err}",
|
||||
rollout.path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(last_entry) = batch.last() {
|
||||
if let Err(err) = runtime
|
||||
.checkpoint_thread_search_backfill(last_entry.watermark.as_str())
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"failed to checkpoint thread search backfill at {}: {err}",
|
||||
codex_home.display()
|
||||
);
|
||||
} else {
|
||||
last_watermark = Some(last_entry.watermark.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Err(err) = runtime
|
||||
.mark_thread_search_backfill_complete(last_watermark.as_deref())
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"failed to mark thread search backfill complete at {}: {err}",
|
||||
codex_home.display()
|
||||
);
|
||||
}
|
||||
info!(
|
||||
"thread search backfill scanned={}, indexed={}, failed={}",
|
||||
stats.scanned, stats.upserted, stats.failed
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct BackfillRolloutPath {
|
||||
watermark: String,
|
||||
@@ -376,6 +467,40 @@ struct BackfillRolloutPath {
|
||||
archived: bool,
|
||||
}
|
||||
|
||||
async fn rollout_paths_after_watermark(
|
||||
codex_home: &Path,
|
||||
last_watermark: Option<&str>,
|
||||
) -> Vec<BackfillRolloutPath> {
|
||||
let sessions_root = codex_home.join(SESSIONS_SUBDIR);
|
||||
let archived_root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
|
||||
let mut rollout_paths: Vec<BackfillRolloutPath> = Vec::new();
|
||||
for (root, archived) in [(sessions_root, false), (archived_root, true)] {
|
||||
if !tokio::fs::try_exists(&root).await.unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
match collect_rollout_paths(&root).await {
|
||||
Ok(paths) => {
|
||||
rollout_paths.extend(paths.into_iter().map(|path| BackfillRolloutPath {
|
||||
watermark: backfill_watermark_for_path(codex_home, &path),
|
||||
path,
|
||||
archived,
|
||||
}));
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to collect rollout paths under {}: {err}",
|
||||
root.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
rollout_paths.sort_by(|a, b| a.watermark.cmp(&b.watermark));
|
||||
if let Some(last_watermark) = last_watermark {
|
||||
rollout_paths.retain(|entry| entry.watermark.as_str() > last_watermark);
|
||||
}
|
||||
rollout_paths
|
||||
}
|
||||
|
||||
fn backfill_watermark_for_path(codex_home: &Path, path: &Path) -> String {
|
||||
path.strip_prefix(codex_home)
|
||||
.unwrap_or(path)
|
||||
|
||||
@@ -1771,6 +1771,9 @@ async fn sync_thread_state_after_write(
|
||||
|| items
|
||||
.iter()
|
||||
.any(codex_state::rollout_item_affects_thread_metadata)
|
||||
|| items
|
||||
.iter()
|
||||
.any(codex_state::rollout_item_affects_thread_search)
|
||||
{
|
||||
state_db::apply_rollout_items(
|
||||
state_db_ctx,
|
||||
|
||||
@@ -7,6 +7,7 @@ use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::AgentReasoningEvent;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
@@ -582,11 +583,9 @@ async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::R
|
||||
let initial_first_user_message = initial_thread.first_user_message.clone();
|
||||
|
||||
recorder
|
||||
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
|
||||
AgentMessageEvent {
|
||||
message: "assistant text".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentReasoning(
|
||||
AgentReasoningEvent {
|
||||
text: "metadata and search irrelevant reasoning".to_string(),
|
||||
},
|
||||
))])
|
||||
.await?;
|
||||
@@ -595,7 +594,7 @@ async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::R
|
||||
let updated_thread = state_db
|
||||
.get_thread(thread_id)
|
||||
.await
|
||||
.expect("thread should load after agent message")
|
||||
.expect("thread should load after reasoning")
|
||||
.expect("thread should still exist");
|
||||
|
||||
assert_eq!(updated_thread.updated_at, initial_updated_at);
|
||||
@@ -608,11 +607,9 @@ async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::R
|
||||
tokio::time::sleep(THREAD_UPDATED_AT_TOUCH_INTERVAL + Duration::from_millis(10)).await;
|
||||
|
||||
recorder
|
||||
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
|
||||
AgentMessageEvent {
|
||||
message: "more assistant text".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentReasoning(
|
||||
AgentReasoningEvent {
|
||||
text: "later metadata and search irrelevant reasoning".to_string(),
|
||||
},
|
||||
))])
|
||||
.await?;
|
||||
|
||||
@@ -125,6 +125,11 @@ async fn try_init_with_roots_inner(
|
||||
)
|
||||
})?;
|
||||
if backfill_state.status == codex_state::BackfillStatus::Complete {
|
||||
start_thread_search_backfill(
|
||||
runtime.clone(),
|
||||
codex_home.clone(),
|
||||
default_model_provider_id.clone(),
|
||||
);
|
||||
return Ok(runtime);
|
||||
}
|
||||
|
||||
@@ -151,6 +156,11 @@ async fn try_init_with_roots_inner(
|
||||
)
|
||||
})?;
|
||||
if backfill_state.status == codex_state::BackfillStatus::Complete {
|
||||
start_thread_search_backfill(
|
||||
runtime.clone(),
|
||||
codex_home.clone(),
|
||||
default_model_provider_id.clone(),
|
||||
);
|
||||
return Ok(runtime);
|
||||
}
|
||||
if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT {
|
||||
@@ -178,6 +188,21 @@ async fn try_init_with_roots_inner(
|
||||
}
|
||||
}
|
||||
|
||||
fn start_thread_search_backfill(
|
||||
runtime: StateDbHandle,
|
||||
codex_home: PathBuf,
|
||||
default_model_provider_id: String,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
metadata::backfill_thread_search(
|
||||
runtime.as_ref(),
|
||||
codex_home.as_path(),
|
||||
default_model_provider_id.as_str(),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
fn emit_startup_warning(message: &str) {
|
||||
warn!("{message}");
|
||||
if !tracing::dispatcher::has_been_set() {
|
||||
@@ -485,6 +510,7 @@ pub async fn reconcile_rollout(
|
||||
}
|
||||
};
|
||||
let mut metadata = outcome.metadata;
|
||||
let search_text = outcome.search_text;
|
||||
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
|
||||
metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
|
||||
if let Ok(Some(existing_metadata)) = ctx.get_thread(metadata.id).await {
|
||||
@@ -506,6 +532,16 @@ pub async fn reconcile_rollout(
|
||||
);
|
||||
return;
|
||||
}
|
||||
if let Err(err) = ctx
|
||||
.replace_thread_search_text(metadata.id, search_text.as_slice())
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"state db reconcile_rollout search index failed {}: {err}",
|
||||
rollout_path.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
if let Err(err) = ctx
|
||||
.set_thread_memory_mode(metadata.id, memory_mode.as_str())
|
||||
.await
|
||||
|
||||
12
codex-rs/state/migrations/0033_thread_search.sql
Normal file
12
codex-rs/state/migrations/0033_thread_search.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
-- Full-text (FTS5) index over conversation text.
-- `thread_id` is stored alongside each row but excluded from the full-text
-- index (UNINDEXED); only `body` is searchable with MATCH.
CREATE VIRTUAL TABLE thread_search USING fts5(
    thread_id UNINDEXED,
    body
);

-- Reset the singleton backfill row (id = 1) to its initial pending state.
-- NOTE(review): presumably this forces the rollout backfill to run again so
-- that existing threads are indexed into `thread_search` - confirm.
UPDATE backfill_state
SET
    status = 'pending',
    last_watermark = NULL,
    last_success_at = NULL,
    updated_at = CAST(strftime('%s', 'now') AS INTEGER)
WHERE id = 1;
|
||||
@@ -0,0 +1,23 @@
|
||||
-- Progress tracking for the thread-search backfill.
-- The CHECK constraint restricts the table to a single row with id = 1.
CREATE TABLE thread_search_backfill_state (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    status TEXT NOT NULL,
    last_watermark TEXT,
    last_success_at INTEGER,
    -- Unix timestamp in seconds.
    updated_at INTEGER NOT NULL
);

-- Seed the singleton row in the pending state; a no-op if it already exists.
INSERT INTO thread_search_backfill_state (
    id,
    status,
    last_watermark,
    last_success_at,
    updated_at
)
VALUES (
    1,
    'pending',
    NULL,
    NULL,
    CAST(strftime('%s', 'now') AS INTEGER)
)
ON CONFLICT(id) DO NOTHING;
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::model::ThreadMetadata;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
@@ -42,6 +43,58 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return whether this rollout item contributes user-visible conversation text to search.
|
||||
pub fn rollout_item_affects_thread_search(item: &RolloutItem) -> bool {
|
||||
match item {
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(agent)) => !agent.message.trim().is_empty(),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(user)) => {
|
||||
!strip_user_message_prefix(user.message.as_str()).is_empty()
|
||||
}
|
||||
RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) => {
|
||||
matches!(role.as_str(), "assistant" | "user")
|
||||
&& content.iter().any(content_item_has_search_text)
|
||||
}
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract searchable user and assistant text from rollout items.
|
||||
pub fn thread_search_text_from_rollout_items(items: &[RolloutItem]) -> Vec<String> {
|
||||
let mut chunks = Vec::new();
|
||||
for item in items {
|
||||
match item {
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(agent)) => {
|
||||
push_search_text(&mut chunks, agent.message.as_str());
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(user)) => {
|
||||
push_search_text(
|
||||
&mut chunks,
|
||||
strip_user_message_prefix(user.message.as_str()),
|
||||
);
|
||||
}
|
||||
RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. })
|
||||
if matches!(role.as_str(), "assistant" | "user") =>
|
||||
{
|
||||
for content_item in content {
|
||||
if let Some(text) = content_item_search_text(content_item) {
|
||||
push_search_text(&mut chunks, text);
|
||||
}
|
||||
}
|
||||
}
|
||||
RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_)
|
||||
| RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_) => {}
|
||||
}
|
||||
}
|
||||
chunks
|
||||
}
|
||||
|
||||
fn apply_session_meta_from_item(metadata: &mut ThreadMetadata, meta_line: &SessionMetaLine) {
|
||||
if metadata.id != meta_line.meta.id {
|
||||
// Ignore session_meta lines that don't match the canonical thread ID,
|
||||
@@ -125,6 +178,24 @@ fn strip_user_message_prefix(text: &str) -> &str {
|
||||
}
|
||||
}
|
||||
|
||||
fn content_item_has_search_text(item: &ContentItem) -> bool {
|
||||
content_item_search_text(item).is_some_and(|text| !text.trim().is_empty())
|
||||
}
|
||||
|
||||
fn content_item_search_text(item: &ContentItem) -> Option<&str> {
|
||||
match item {
|
||||
ContentItem::InputText { text } | ContentItem::OutputText { text } => Some(text.as_str()),
|
||||
ContentItem::InputImage { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Push `text` onto `chunks` with surrounding whitespace removed.
///
/// Text that is empty after trimming is skipped, so `chunks` never receives a
/// blank entry from this function.
fn push_search_text(chunks: &mut Vec<String>, text: &str) {
    let text = text.trim();
    if !text.is_empty() {
        chunks.push(text.to_string());
    }
}
|
||||
|
||||
fn user_message_preview(user: &UserMessageEvent) -> Option<String> {
|
||||
let message = strip_user_message_prefix(user.message.as_str());
|
||||
if !message.is_empty() {
|
||||
|
||||
@@ -23,6 +23,8 @@ pub use runtime::StateRuntime;
|
||||
/// Most consumers should prefer [`StateRuntime`].
|
||||
pub use extract::apply_rollout_item;
|
||||
pub use extract::rollout_item_affects_thread_metadata;
|
||||
pub use extract::rollout_item_affects_thread_search;
|
||||
pub use extract::thread_search_text_from_rollout_items;
|
||||
pub use model::AgentJob;
|
||||
pub use model::AgentJobCreateParams;
|
||||
pub use model::AgentJobItem;
|
||||
|
||||
@@ -50,6 +50,8 @@ pub struct ThreadsPage {
|
||||
pub struct ExtractionOutcome {
|
||||
/// The extracted thread metadata.
|
||||
pub metadata: ThreadMetadata,
|
||||
/// Searchable conversation text extracted from rollout history.
|
||||
pub search_text: Vec<String>,
|
||||
/// The explicit thread memory mode from rollout metadata, if present.
|
||||
pub memory_mode: Option<String>,
|
||||
/// The number of rollout lines that failed to parse.
|
||||
|
||||
@@ -107,6 +107,115 @@ WHERE id = 1
|
||||
r#"
|
||||
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
|
||||
VALUES (?, ?, NULL, NULL, ?)
|
||||
ON CONFLICT(id) DO NOTHING
|
||||
"#,
|
||||
)
|
||||
.bind(1_i64)
|
||||
.bind(crate::BackfillStatus::Pending.as_str())
|
||||
.bind(Utc::now().timestamp())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read the persisted background thread-search backfill state.
|
||||
pub async fn get_thread_search_backfill_state(&self) -> anyhow::Result<crate::BackfillState> {
|
||||
self.ensure_thread_search_backfill_state_row().await?;
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT status, last_watermark, last_success_at
|
||||
FROM thread_search_backfill_state
|
||||
WHERE id = 1
|
||||
"#,
|
||||
)
|
||||
.fetch_one(self.pool.as_ref())
|
||||
.await?;
|
||||
crate::BackfillState::try_from_row(&row)
|
||||
}
|
||||
|
||||
/// Attempt to claim ownership of the background thread-search backfill.
|
||||
pub async fn try_claim_thread_search_backfill(
|
||||
&self,
|
||||
lease_seconds: i64,
|
||||
) -> anyhow::Result<bool> {
|
||||
self.ensure_thread_search_backfill_state_row().await?;
|
||||
let now = Utc::now().timestamp();
|
||||
let lease_cutoff = now.saturating_sub(lease_seconds.max(0));
|
||||
let result = sqlx::query(
|
||||
r#"
|
||||
UPDATE thread_search_backfill_state
|
||||
SET status = ?, updated_at = ?
|
||||
WHERE id = 1
|
||||
AND status != ?
|
||||
AND (status != ? OR updated_at <= ?)
|
||||
"#,
|
||||
)
|
||||
.bind(crate::BackfillStatus::Running.as_str())
|
||||
.bind(now)
|
||||
.bind(crate::BackfillStatus::Complete.as_str())
|
||||
.bind(crate::BackfillStatus::Running.as_str())
|
||||
.bind(lease_cutoff)
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(result.rows_affected() == 1)
|
||||
}
|
||||
|
||||
/// Persist background thread-search backfill progress.
|
||||
pub async fn checkpoint_thread_search_backfill(&self, watermark: &str) -> anyhow::Result<()> {
|
||||
self.ensure_thread_search_backfill_state_row().await?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE thread_search_backfill_state
|
||||
SET status = ?, last_watermark = ?, updated_at = ?
|
||||
WHERE id = 1
|
||||
"#,
|
||||
)
|
||||
.bind(crate::BackfillStatus::Running.as_str())
|
||||
.bind(watermark)
|
||||
.bind(Utc::now().timestamp())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark the background thread-search backfill as complete.
|
||||
pub async fn mark_thread_search_backfill_complete(
|
||||
&self,
|
||||
last_watermark: Option<&str>,
|
||||
) -> anyhow::Result<()> {
|
||||
self.ensure_thread_search_backfill_state_row().await?;
|
||||
let now = Utc::now().timestamp();
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE thread_search_backfill_state
|
||||
SET
|
||||
status = ?,
|
||||
last_watermark = COALESCE(?, last_watermark),
|
||||
last_success_at = ?,
|
||||
updated_at = ?
|
||||
WHERE id = 1
|
||||
"#,
|
||||
)
|
||||
.bind(crate::BackfillStatus::Complete.as_str())
|
||||
.bind(last_watermark)
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn ensure_thread_search_backfill_state_row(&self) -> anyhow::Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO thread_search_backfill_state (
|
||||
id,
|
||||
status,
|
||||
last_watermark,
|
||||
last_success_at,
|
||||
updated_at
|
||||
)
|
||||
VALUES (?, ?, NULL, NULL, ?)
|
||||
ON CONFLICT(id) DO NOTHING
|
||||
"#,
|
||||
)
|
||||
|
||||
@@ -584,6 +584,51 @@ ON CONFLICT(id) DO NOTHING
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
/// Replace all indexed conversation text for a thread.
|
||||
pub async fn replace_thread_search_text(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
chunks: &[String],
|
||||
) -> anyhow::Result<()> {
|
||||
let thread_id = thread_id.to_string();
|
||||
let mut tx = self.pool.begin().await?;
|
||||
sqlx::query("DELETE FROM thread_search WHERE thread_id = ?")
|
||||
.bind(thread_id.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
for chunk in chunks {
|
||||
sqlx::query("INSERT INTO thread_search (thread_id, body) VALUES (?, ?)")
|
||||
.bind(thread_id.as_str())
|
||||
.bind(chunk.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Append newly persisted conversation text to the thread search index.
|
||||
pub async fn append_thread_search_text(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
chunks: &[String],
|
||||
) -> anyhow::Result<()> {
|
||||
if chunks.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let thread_id = thread_id.to_string();
|
||||
let mut tx = self.pool.begin().await?;
|
||||
for chunk in chunks {
|
||||
sqlx::query("INSERT INTO thread_search (thread_id, body) VALUES (?, ?)")
|
||||
.bind(thread_id.as_str())
|
||||
.bind(chunk.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn touch_thread_updated_at(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
@@ -876,6 +921,9 @@ ON CONFLICT(thread_id, position) DO NOTHING
|
||||
self.upsert_thread(&metadata).await
|
||||
};
|
||||
upsert_result?;
|
||||
let search_text = crate::thread_search_text_from_rollout_items(items);
|
||||
self.append_thread_search_text(builder.id, search_text.as_slice())
|
||||
.await?;
|
||||
if let Some(memory_mode) = extract_memory_mode(items)
|
||||
&& let Err(err) = self
|
||||
.set_thread_memory_mode(builder.id, memory_mode.as_str())
|
||||
@@ -943,10 +991,17 @@ ON CONFLICT(thread_id, position) DO NOTHING
|
||||
|
||||
/// Delete a thread metadata row by id.
|
||||
pub async fn delete_thread(&self, thread_id: ThreadId) -> anyhow::Result<u64> {
|
||||
let result = sqlx::query("DELETE FROM threads WHERE id = ?")
|
||||
.bind(thread_id.to_string())
|
||||
.execute(self.pool.as_ref())
|
||||
let thread_id = thread_id.to_string();
|
||||
let mut tx = self.pool.begin().await?;
|
||||
sqlx::query("DELETE FROM thread_search WHERE thread_id = ?")
|
||||
.bind(thread_id.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
let result = sqlx::query("DELETE FROM threads WHERE id = ?")
|
||||
.bind(thread_id.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
tx.commit().await?;
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
}
|
||||
@@ -1105,7 +1160,15 @@ pub(super) fn push_thread_filters<'a>(
|
||||
builder.push_bind(search_term);
|
||||
builder.push(") > 0 OR instr(threads.preview, ");
|
||||
builder.push_bind(search_term);
|
||||
builder.push(") > 0)");
|
||||
builder.push(") > 0");
|
||||
if let Some(search_query) = thread_search_query(search_term) {
|
||||
builder.push(
|
||||
" OR threads.id IN (SELECT thread_id FROM thread_search WHERE thread_search MATCH ",
|
||||
);
|
||||
builder.push_bind(search_query);
|
||||
builder.push(")");
|
||||
}
|
||||
builder.push(")");
|
||||
}
|
||||
if let Some(anchor) = anchor {
|
||||
let anchor_ts = datetime_to_epoch_millis(anchor.ts);
|
||||
@@ -1157,6 +1220,20 @@ fn metadata_preview(metadata: &crate::ThreadMetadata) -> &str {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Build an FTS5 `MATCH` expression from a free-form search term.
///
/// The input is split on whitespace. Each word becomes a double-quoted string
/// followed by `*` (a prefix query), with embedded `"` doubled so the word
/// cannot terminate the quoted string. Words are combined with `AND`.
///
/// Returns `None` when the input contains no words (empty or all whitespace).
fn thread_search_query(search_term: &str) -> Option<String> {
    // `split_whitespace` never yields empty or whitespace-padded items, so no
    // further trimming or emptiness check is needed per word.
    let terms = search_term
        .split_whitespace()
        .map(|term| format!("\"{}\"*", term.replace('"', "\"\"")))
        .collect::<Vec<_>>();
    (!terms.is_empty()).then(|| terms.join(" AND "))
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
Reference in New Issue
Block a user