Compare commits

...

1 Commits

Author SHA1 Message Date
Francis Chalissery
92e658df03 Add SQLite-backed thread content search 2026-05-18 17:53:26 -07:00
12 changed files with 532 additions and 43 deletions

View File

@@ -35,6 +35,7 @@ use codex_protocol::protocol::SessionSource as CoreSessionSource;
use codex_protocol::protocol::SubAgentSource;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::cmp::Reverse;
use std::fs;
use std::fs::FileTimes;
@@ -174,6 +175,27 @@ fn set_rollout_cwd(path: &Path, cwd: &Path) -> Result<()> {
Ok(())
}
fn append_user_message_to_rollout(path: &Path, timestamp: &str, message: &str) -> Result<()> {
let mut content = fs::read_to_string(path)?;
content.push_str(
format!(
"{}\n",
json!({
"timestamp": timestamp,
"type": "event_msg",
"payload": {
"type": "user_message",
"message": message,
"kind": "plain"
}
})
)
.as_str(),
);
fs::write(path, content)?;
Ok(())
}
#[tokio::test]
async fn thread_list_basic_empty() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -592,10 +614,20 @@ sqlite = true
codex_home.path(),
"2025-01-02T12-00-00",
"2025-01-02T12:00:00Z",
"needle suffix",
"no body hit in preview",
Some("mock_provider"),
/*git_info*/ None,
)?;
append_user_message_to_rollout(
rollout_path(
codex_home.path(),
"2025-01-02T12-00-00",
newer_match.as_str(),
)
.as_path(),
"2025-01-02T12:05:00Z",
"later message with needle suffix",
)?;
// `thread/list` applies `search_term` on the sqlite fast path. This test creates
// rollouts manually, so mark the DB backfill complete and then run an unsearched

View File

@@ -23,6 +23,7 @@ use codex_state::DB_METRIC_BACKFILL_DURATION_MS;
use codex_state::ExtractionOutcome;
use codex_state::ThreadMetadataBuilder;
use codex_state::apply_rollout_item;
use codex_state::thread_search_text_from_rollout_items;
use std::path::Path;
use std::path::PathBuf;
use tracing::info;
@@ -121,6 +122,7 @@ pub async fn extract_metadata_from_rollout(
}
Ok(ExtractionOutcome {
metadata,
search_text: thread_search_text_from_rollout_items(items.as_slice()),
memory_mode: items.iter().rev().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
@@ -210,33 +212,8 @@ pub(crate) async fn backfill_sessions_with_lease(
}
}
let sessions_root = codex_home.join(SESSIONS_SUBDIR);
let archived_root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
let mut rollout_paths: Vec<BackfillRolloutPath> = Vec::new();
for (root, archived) in [(sessions_root, false), (archived_root, true)] {
if !tokio::fs::try_exists(&root).await.unwrap_or(false) {
continue;
}
match collect_rollout_paths(&root).await {
Ok(paths) => {
rollout_paths.extend(paths.into_iter().map(|path| BackfillRolloutPath {
watermark: backfill_watermark_for_path(codex_home, &path),
path,
archived,
}));
}
Err(err) => {
warn!(
"failed to collect rollout paths under {}: {err}",
root.display()
);
}
}
}
rollout_paths.sort_by(|a, b| a.watermark.cmp(&b.watermark));
if let Some(last_watermark) = backfill_state.last_watermark.as_deref() {
rollout_paths.retain(|entry| entry.watermark.as_str() > last_watermark);
}
let rollout_paths =
rollout_paths_after_watermark(codex_home, backfill_state.last_watermark.as_deref()).await;
let mut stats = BackfillStats {
scanned: 0,
@@ -259,6 +236,7 @@ pub(crate) async fn backfill_sessions_with_lease(
);
}
let mut metadata = outcome.metadata;
let search_text = outcome.search_text;
metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
if let Ok(Some(existing_metadata)) = runtime.get_thread(metadata.id).await {
@@ -274,6 +252,17 @@ pub(crate) async fn backfill_sessions_with_lease(
stats.failed = stats.failed.saturating_add(1);
warn!("failed to upsert rollout {}: {err}", rollout.path.display());
} else {
if let Err(err) = runtime
.replace_thread_search_text(metadata.id, search_text.as_slice())
.await
{
stats.failed = stats.failed.saturating_add(1);
warn!(
"failed to index rollout search text {}: {err}",
rollout.path.display()
);
continue;
}
if let Err(err) = runtime
.set_thread_memory_mode(metadata.id, memory_mode.as_str())
.await
@@ -369,6 +358,108 @@ pub(crate) async fn backfill_sessions_with_lease(
}
}
pub(crate) async fn backfill_thread_search(
runtime: &codex_state::StateRuntime,
codex_home: &Path,
default_provider: &str,
) {
let backfill_state = match runtime.get_thread_search_backfill_state().await {
Ok(state) => state,
Err(err) => {
warn!(
"failed to read thread search backfill state at {}: {err}",
codex_home.display()
);
return;
}
};
if backfill_state.status == BackfillStatus::Complete {
return;
}
let claimed = match runtime
.try_claim_thread_search_backfill(BACKFILL_LEASE_SECONDS)
.await
{
Ok(claimed) => claimed,
Err(err) => {
warn!(
"failed to claim thread search backfill at {}: {err}",
codex_home.display()
);
return;
}
};
if !claimed {
return;
}
let rollout_paths =
rollout_paths_after_watermark(codex_home, backfill_state.last_watermark.as_deref()).await;
let mut stats = BackfillStats {
scanned: 0,
upserted: 0,
failed: 0,
};
let mut last_watermark = backfill_state.last_watermark.clone();
for batch in rollout_paths.chunks(BACKFILL_BATCH_SIZE) {
for rollout in batch {
stats.scanned = stats.scanned.saturating_add(1);
match extract_metadata_from_rollout(&rollout.path, default_provider).await {
Ok(outcome) => {
if let Err(err) = runtime
.replace_thread_search_text(
outcome.metadata.id,
outcome.search_text.as_slice(),
)
.await
{
stats.failed = stats.failed.saturating_add(1);
warn!(
"failed to backfill thread search text {}: {err}",
rollout.path.display()
);
} else {
stats.upserted = stats.upserted.saturating_add(1);
}
}
Err(err) => {
stats.failed = stats.failed.saturating_add(1);
warn!(
"failed to extract thread search rollout {}: {err}",
rollout.path.display()
);
}
}
}
if let Some(last_entry) = batch.last() {
if let Err(err) = runtime
.checkpoint_thread_search_backfill(last_entry.watermark.as_str())
.await
{
warn!(
"failed to checkpoint thread search backfill at {}: {err}",
codex_home.display()
);
} else {
last_watermark = Some(last_entry.watermark.clone());
}
}
}
if let Err(err) = runtime
.mark_thread_search_backfill_complete(last_watermark.as_deref())
.await
{
warn!(
"failed to mark thread search backfill complete at {}: {err}",
codex_home.display()
);
}
info!(
"thread search backfill scanned={}, indexed={}, failed={}",
stats.scanned, stats.upserted, stats.failed
);
}
#[derive(Debug, Clone)]
struct BackfillRolloutPath {
watermark: String,
@@ -376,6 +467,40 @@ struct BackfillRolloutPath {
archived: bool,
}
async fn rollout_paths_after_watermark(
codex_home: &Path,
last_watermark: Option<&str>,
) -> Vec<BackfillRolloutPath> {
let sessions_root = codex_home.join(SESSIONS_SUBDIR);
let archived_root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
let mut rollout_paths: Vec<BackfillRolloutPath> = Vec::new();
for (root, archived) in [(sessions_root, false), (archived_root, true)] {
if !tokio::fs::try_exists(&root).await.unwrap_or(false) {
continue;
}
match collect_rollout_paths(&root).await {
Ok(paths) => {
rollout_paths.extend(paths.into_iter().map(|path| BackfillRolloutPath {
watermark: backfill_watermark_for_path(codex_home, &path),
path,
archived,
}));
}
Err(err) => {
warn!(
"failed to collect rollout paths under {}: {err}",
root.display()
);
}
}
}
rollout_paths.sort_by(|a, b| a.watermark.cmp(&b.watermark));
if let Some(last_watermark) = last_watermark {
rollout_paths.retain(|entry| entry.watermark.as_str() > last_watermark);
}
rollout_paths
}
fn backfill_watermark_for_path(codex_home: &Path, path: &Path) -> String {
path.strip_prefix(codex_home)
.unwrap_or(path)

View File

@@ -1771,6 +1771,9 @@ async fn sync_thread_state_after_write(
|| items
.iter()
.any(codex_state::rollout_item_affects_thread_metadata)
|| items
.iter()
.any(codex_state::rollout_item_affects_thread_search)
{
state_db::apply_rollout_items(
state_db_ctx,

View File

@@ -7,6 +7,7 @@ use codex_protocol::ThreadId;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
@@ -582,11 +583,9 @@ async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::R
let initial_first_user_message = initial_thread.first_user_message.clone();
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
message: "assistant text".to_string(),
phase: None,
memory_citation: None,
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentReasoning(
AgentReasoningEvent {
text: "metadata and search irrelevant reasoning".to_string(),
},
))])
.await?;
@@ -595,7 +594,7 @@ async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::R
let updated_thread = state_db
.get_thread(thread_id)
.await
.expect("thread should load after agent message")
.expect("thread should load after reasoning")
.expect("thread should still exist");
assert_eq!(updated_thread.updated_at, initial_updated_at);
@@ -608,11 +607,9 @@ async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::R
tokio::time::sleep(THREAD_UPDATED_AT_TOUCH_INTERVAL + Duration::from_millis(10)).await;
recorder
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage(
AgentMessageEvent {
message: "more assistant text".to_string(),
phase: None,
memory_citation: None,
.record_items(&[RolloutItem::EventMsg(EventMsg::AgentReasoning(
AgentReasoningEvent {
text: "later metadata and search irrelevant reasoning".to_string(),
},
))])
.await?;

View File

@@ -125,6 +125,11 @@ async fn try_init_with_roots_inner(
)
})?;
if backfill_state.status == codex_state::BackfillStatus::Complete {
start_thread_search_backfill(
runtime.clone(),
codex_home.clone(),
default_model_provider_id.clone(),
);
return Ok(runtime);
}
@@ -151,6 +156,11 @@ async fn try_init_with_roots_inner(
)
})?;
if backfill_state.status == codex_state::BackfillStatus::Complete {
start_thread_search_backfill(
runtime.clone(),
codex_home.clone(),
default_model_provider_id.clone(),
);
return Ok(runtime);
}
if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT {
@@ -178,6 +188,21 @@ async fn try_init_with_roots_inner(
}
}
fn start_thread_search_backfill(
runtime: StateDbHandle,
codex_home: PathBuf,
default_model_provider_id: String,
) {
tokio::spawn(async move {
metadata::backfill_thread_search(
runtime.as_ref(),
codex_home.as_path(),
default_model_provider_id.as_str(),
)
.await;
});
}
fn emit_startup_warning(message: &str) {
warn!("{message}");
if !tracing::dispatcher::has_been_set() {
@@ -485,6 +510,7 @@ pub async fn reconcile_rollout(
}
};
let mut metadata = outcome.metadata;
let search_text = outcome.search_text;
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
if let Ok(Some(existing_metadata)) = ctx.get_thread(metadata.id).await {
@@ -506,6 +532,16 @@ pub async fn reconcile_rollout(
);
return;
}
if let Err(err) = ctx
.replace_thread_search_text(metadata.id, search_text.as_slice())
.await
{
warn!(
"state db reconcile_rollout search index failed {}: {err}",
rollout_path.display()
);
return;
}
if let Err(err) = ctx
.set_thread_memory_mode(metadata.id, memory_mode.as_str())
.await

View File

@@ -0,0 +1,12 @@
CREATE VIRTUAL TABLE thread_search USING fts5(
thread_id UNINDEXED,
body
);
UPDATE backfill_state
SET
status = 'pending',
last_watermark = NULL,
last_success_at = NULL,
updated_at = CAST(strftime('%s', 'now') AS INTEGER)
WHERE id = 1;

View File

@@ -0,0 +1,23 @@
CREATE TABLE thread_search_backfill_state (
id INTEGER PRIMARY KEY CHECK (id = 1),
status TEXT NOT NULL,
last_watermark TEXT,
last_success_at INTEGER,
updated_at INTEGER NOT NULL
);
INSERT INTO thread_search_backfill_state (
id,
status,
last_watermark,
last_success_at,
updated_at
)
VALUES (
1,
'pending',
NULL,
NULL,
CAST(strftime('%s', 'now') AS INTEGER)
)
ON CONFLICT(id) DO NOTHING;

View File

@@ -1,4 +1,5 @@
use crate::model::ThreadMetadata;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
@@ -42,6 +43,58 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
}
}
/// Return whether this rollout item contributes user-visible conversation text to search.
pub fn rollout_item_affects_thread_search(item: &RolloutItem) -> bool {
match item {
RolloutItem::EventMsg(EventMsg::AgentMessage(agent)) => !agent.message.trim().is_empty(),
RolloutItem::EventMsg(EventMsg::UserMessage(user)) => {
!strip_user_message_prefix(user.message.as_str()).is_empty()
}
RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) => {
matches!(role.as_str(), "assistant" | "user")
&& content.iter().any(content_item_has_search_text)
}
RolloutItem::SessionMeta(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_) => false,
}
}
/// Extract searchable user and assistant text from rollout items.
pub fn thread_search_text_from_rollout_items(items: &[RolloutItem]) -> Vec<String> {
let mut chunks = Vec::new();
for item in items {
match item {
RolloutItem::EventMsg(EventMsg::AgentMessage(agent)) => {
push_search_text(&mut chunks, agent.message.as_str());
}
RolloutItem::EventMsg(EventMsg::UserMessage(user)) => {
push_search_text(
&mut chunks,
strip_user_message_prefix(user.message.as_str()),
);
}
RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. })
if matches!(role.as_str(), "assistant" | "user") =>
{
for content_item in content {
if let Some(text) = content_item_search_text(content_item) {
push_search_text(&mut chunks, text);
}
}
}
RolloutItem::SessionMeta(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_) => {}
}
}
chunks
}
fn apply_session_meta_from_item(metadata: &mut ThreadMetadata, meta_line: &SessionMetaLine) {
if metadata.id != meta_line.meta.id {
// Ignore session_meta lines that don't match the canonical thread ID,
@@ -125,6 +178,24 @@ fn strip_user_message_prefix(text: &str) -> &str {
}
}
fn content_item_has_search_text(item: &ContentItem) -> bool {
content_item_search_text(item).is_some_and(|text| !text.trim().is_empty())
}
fn content_item_search_text(item: &ContentItem) -> Option<&str> {
match item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => Some(text.as_str()),
ContentItem::InputImage { .. } => None,
}
}
fn push_search_text(chunks: &mut Vec<String>, text: &str) {
let text = text.trim();
if !text.is_empty() {
chunks.push(text.to_string());
}
}
fn user_message_preview(user: &UserMessageEvent) -> Option<String> {
let message = strip_user_message_prefix(user.message.as_str());
if !message.is_empty() {

View File

@@ -23,6 +23,8 @@ pub use runtime::StateRuntime;
/// Most consumers should prefer [`StateRuntime`].
pub use extract::apply_rollout_item;
pub use extract::rollout_item_affects_thread_metadata;
pub use extract::rollout_item_affects_thread_search;
pub use extract::thread_search_text_from_rollout_items;
pub use model::AgentJob;
pub use model::AgentJobCreateParams;
pub use model::AgentJobItem;

View File

@@ -50,6 +50,8 @@ pub struct ThreadsPage {
pub struct ExtractionOutcome {
/// The extracted thread metadata.
pub metadata: ThreadMetadata,
/// Searchable conversation text extracted from rollout history.
pub search_text: Vec<String>,
/// The explicit thread memory mode from rollout metadata, if present.
pub memory_mode: Option<String>,
/// The number of rollout lines that failed to parse.

View File

@@ -107,6 +107,115 @@ WHERE id = 1
r#"
INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at)
VALUES (?, ?, NULL, NULL, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
.bind(1_i64)
.bind(crate::BackfillStatus::Pending.as_str())
.bind(Utc::now().timestamp())
.execute(self.pool.as_ref())
.await?;
Ok(())
}
/// Read the persisted background thread-search backfill state.
pub async fn get_thread_search_backfill_state(&self) -> anyhow::Result<crate::BackfillState> {
self.ensure_thread_search_backfill_state_row().await?;
let row = sqlx::query(
r#"
SELECT status, last_watermark, last_success_at
FROM thread_search_backfill_state
WHERE id = 1
"#,
)
.fetch_one(self.pool.as_ref())
.await?;
crate::BackfillState::try_from_row(&row)
}
/// Attempt to claim ownership of the background thread-search backfill.
pub async fn try_claim_thread_search_backfill(
&self,
lease_seconds: i64,
) -> anyhow::Result<bool> {
self.ensure_thread_search_backfill_state_row().await?;
let now = Utc::now().timestamp();
let lease_cutoff = now.saturating_sub(lease_seconds.max(0));
let result = sqlx::query(
r#"
UPDATE thread_search_backfill_state
SET status = ?, updated_at = ?
WHERE id = 1
AND status != ?
AND (status != ? OR updated_at <= ?)
"#,
)
.bind(crate::BackfillStatus::Running.as_str())
.bind(now)
.bind(crate::BackfillStatus::Complete.as_str())
.bind(crate::BackfillStatus::Running.as_str())
.bind(lease_cutoff)
.execute(self.pool.as_ref())
.await?;
Ok(result.rows_affected() == 1)
}
/// Persist background thread-search backfill progress.
pub async fn checkpoint_thread_search_backfill(&self, watermark: &str) -> anyhow::Result<()> {
self.ensure_thread_search_backfill_state_row().await?;
sqlx::query(
r#"
UPDATE thread_search_backfill_state
SET status = ?, last_watermark = ?, updated_at = ?
WHERE id = 1
"#,
)
.bind(crate::BackfillStatus::Running.as_str())
.bind(watermark)
.bind(Utc::now().timestamp())
.execute(self.pool.as_ref())
.await?;
Ok(())
}
/// Mark the background thread-search backfill as complete.
pub async fn mark_thread_search_backfill_complete(
&self,
last_watermark: Option<&str>,
) -> anyhow::Result<()> {
self.ensure_thread_search_backfill_state_row().await?;
let now = Utc::now().timestamp();
sqlx::query(
r#"
UPDATE thread_search_backfill_state
SET
status = ?,
last_watermark = COALESCE(?, last_watermark),
last_success_at = ?,
updated_at = ?
WHERE id = 1
"#,
)
.bind(crate::BackfillStatus::Complete.as_str())
.bind(last_watermark)
.bind(now)
.bind(now)
.execute(self.pool.as_ref())
.await?;
Ok(())
}
async fn ensure_thread_search_backfill_state_row(&self) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO thread_search_backfill_state (
id,
status,
last_watermark,
last_success_at,
updated_at
)
VALUES (?, ?, NULL, NULL, ?)
ON CONFLICT(id) DO NOTHING
"#,
)

View File

@@ -584,6 +584,51 @@ ON CONFLICT(id) DO NOTHING
Ok(result.rows_affected() > 0)
}
/// Replace all indexed conversation text for a thread.
pub async fn replace_thread_search_text(
&self,
thread_id: ThreadId,
chunks: &[String],
) -> anyhow::Result<()> {
let thread_id = thread_id.to_string();
let mut tx = self.pool.begin().await?;
sqlx::query("DELETE FROM thread_search WHERE thread_id = ?")
.bind(thread_id.as_str())
.execute(&mut *tx)
.await?;
for chunk in chunks {
sqlx::query("INSERT INTO thread_search (thread_id, body) VALUES (?, ?)")
.bind(thread_id.as_str())
.bind(chunk.as_str())
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
/// Append newly persisted conversation text to the thread search index.
pub async fn append_thread_search_text(
&self,
thread_id: ThreadId,
chunks: &[String],
) -> anyhow::Result<()> {
if chunks.is_empty() {
return Ok(());
}
let thread_id = thread_id.to_string();
let mut tx = self.pool.begin().await?;
for chunk in chunks {
sqlx::query("INSERT INTO thread_search (thread_id, body) VALUES (?, ?)")
.bind(thread_id.as_str())
.bind(chunk.as_str())
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
pub async fn touch_thread_updated_at(
&self,
thread_id: ThreadId,
@@ -876,6 +921,9 @@ ON CONFLICT(thread_id, position) DO NOTHING
self.upsert_thread(&metadata).await
};
upsert_result?;
let search_text = crate::thread_search_text_from_rollout_items(items);
self.append_thread_search_text(builder.id, search_text.as_slice())
.await?;
if let Some(memory_mode) = extract_memory_mode(items)
&& let Err(err) = self
.set_thread_memory_mode(builder.id, memory_mode.as_str())
@@ -943,10 +991,17 @@ ON CONFLICT(thread_id, position) DO NOTHING
/// Delete a thread metadata row by id.
pub async fn delete_thread(&self, thread_id: ThreadId) -> anyhow::Result<u64> {
let result = sqlx::query("DELETE FROM threads WHERE id = ?")
.bind(thread_id.to_string())
.execute(self.pool.as_ref())
let thread_id = thread_id.to_string();
let mut tx = self.pool.begin().await?;
sqlx::query("DELETE FROM thread_search WHERE thread_id = ?")
.bind(thread_id.as_str())
.execute(&mut *tx)
.await?;
let result = sqlx::query("DELETE FROM threads WHERE id = ?")
.bind(thread_id.as_str())
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(result.rows_affected())
}
}
@@ -1105,7 +1160,15 @@ pub(super) fn push_thread_filters<'a>(
builder.push_bind(search_term);
builder.push(") > 0 OR instr(threads.preview, ");
builder.push_bind(search_term);
builder.push(") > 0)");
builder.push(") > 0");
if let Some(search_query) = thread_search_query(search_term) {
builder.push(
" OR threads.id IN (SELECT thread_id FROM thread_search WHERE thread_search MATCH ",
);
builder.push_bind(search_query);
builder.push(")");
}
builder.push(")");
}
if let Some(anchor) = anchor {
let anchor_ts = datetime_to_epoch_millis(anchor.ts);
@@ -1157,6 +1220,20 @@ fn metadata_preview(metadata: &crate::ThreadMetadata) -> &str {
.unwrap_or_default()
}
fn thread_search_query(search_term: &str) -> Option<String> {
let terms = search_term
.split_whitespace()
.filter_map(|term| {
let term = term.trim();
if term.is_empty() {
return None;
}
Some(format!("\"{}\"*", term.replace('"', "\"\"")))
})
.collect::<Vec<_>>();
(!terms.is_empty()).then(|| terms.join(" AND "))
}
#[cfg(test)]
mod tests {
use super::*;