From afb9f52cfe1c4854391eccb263781e6a1ed4569a Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Fri, 26 Jun 2026 16:50:00 +0100 Subject: [PATCH 01/11] Add consume batching delay --- apps/labrinth/.env.docker-compose | 1 + apps/labrinth/.env.local | 1 + apps/labrinth/src/env.rs | 1 + apps/labrinth/src/search/incremental.rs | 41 +++---- .../src/search/incremental/consume.rs | 101 +++++++++++++----- 5 files changed, 91 insertions(+), 54 deletions(-) diff --git a/apps/labrinth/.env.docker-compose b/apps/labrinth/.env.docker-compose index 097a47c640..288e1aa0fc 100644 --- a/apps/labrinth/.env.docker-compose +++ b/apps/labrinth/.env.docker-compose @@ -25,6 +25,7 @@ ELASTICSEARCH_INDEX_PREFIX=labrinth ELASTICSEARCH_USERNAME=elastic ELASTICSEARCH_PASSWORD=elastic SEARCH_INDEX_CHUNK_SIZE=5000 +SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS=5 TYPESENSE_URL=http://localhost:8108 TYPESENSE_API_KEY=modrinth TYPESENSE_INDEX_PREFIX=labrinth diff --git a/apps/labrinth/.env.local b/apps/labrinth/.env.local index 7925e52a1e..7843c066f8 100644 --- a/apps/labrinth/.env.local +++ b/apps/labrinth/.env.local @@ -43,6 +43,7 @@ ELASTICSEARCH_USERNAME= ELASTICSEARCH_PASSWORD= SEARCH_INDEX_CHUNK_SIZE=5000 +SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS=5 TYPESENSE_URL=http://localhost:8108 TYPESENSE_API_KEY=modrinth TYPESENSE_INDEX_PREFIX=labrinth diff --git a/apps/labrinth/src/env.rs b/apps/labrinth/src/env.rs index bf99822156..3001a40d0c 100644 --- a/apps/labrinth/src/env.rs +++ b/apps/labrinth/src/env.rs @@ -160,6 +160,7 @@ vars! { // search SEARCH_BACKEND: crate::search::SearchBackendKind = crate::search::SearchBackendKind::Typesense; SEARCH_INDEX_CHUNK_SIZE: i64 = 5000i64; + SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS: u64 = 5u64; TYPESENSE_URL: String = "http://localhost:8108"; TYPESENSE_API_KEY: String = "modrinth"; TYPESENSE_INDEX_PREFIX: String = "labrinth"; diff --git a/apps/labrinth/src/search/incremental.rs b/apps/labrinth/src/search/incremental.rs index 07c5a05b55..88a9f1d6a2 100644 --- a/apps/labrinth/src/search/incremental.rs +++ b/apps/labrinth/src/search/incremental.rs @@ -1,6 +1,6 @@ pub mod consume; -use std::{mem, sync::Arc}; +use std::{collections::HashSet, mem, sync::Arc, time::Duration}; use rdkafka::{producer::FutureRecord, util::Timeout}; use serde::Serialize; @@ -13,31 +13,29 @@ use crate::{ pub const SEARCH_PROJECT_INDEX_QUEUE_TOPIC: &str = "public.labrinth.search-project-index-queue.v1"; +const QUEUE_FLUSH_INTERVAL: Duration = Duration::from_secs(10); #[derive(Clone)] pub struct IncrementalSearchQueue { - operations: Arc>>, + project_ids: Arc>>, kafka_client: actix_web::web::Data, } impl IncrementalSearchQueue { pub fn new(kafka_client: actix_web::web::Data) -> Self { Self { - operations: Arc::new(Mutex::new(Vec::new())), + project_ids: Arc::new(Mutex::new(HashSet::new())), kafka_client, } } pub async fn push(&self, project_id: ProjectId) { - self.operations - .lock() - .await - .push(SearchIndexOperation { project_id }); + self.project_ids.lock().await.insert(project_id); } pub async fn run(self) { loop { - tokio::time::sleep(KAFKA_OPERATION_INTERVAL).await; + tokio::time::sleep(QUEUE_FLUSH_INTERVAL).await; if let Err(err) = self.drain().await { tracing::error!( @@ -48,22 +46,20 @@ impl IncrementalSearchQueue { } pub async fn drain(&self) -> eyre::Result<()> { - let operations = { - let mut operations = self.operations.lock().await; - mem::take(&mut *operations) + let project_ids = { + let mut project_ids = self.project_ids.lock().await; + mem::take(&mut *project_ids) }; - if operations.is_empty() { + if project_ids.is_empty() { return Ok(()); } - let mut operations = operations.into_iter(); - while let Some(operation) = operations.next() { + let mut project_ids = project_ids.into_iter(); + while let Some(project_id) = project_ids.next() { let event = KafkaEvent::new( SEARCH_PROJECT_INDEX_QUEUE_TOPIC, - SearchProjectIndexQueueEventData { - project_id: operation.project_id, - }, + SearchProjectIndexQueueEventData { project_id }, ); let event_id = event.event_metadata.event_id; let key = event_id.to_string(); @@ -78,9 +74,9 @@ impl IncrementalSearchQueue { .send(record, Timeout::After(KAFKA_OPERATION_INTERVAL)) .await { - let mut queued_operations = self.operations.lock().await; - queued_operations.push(operation); - queued_operations.extend(operations); + let mut queued_project_ids = self.project_ids.lock().await; + queued_project_ids.insert(project_id); + queued_project_ids.extend(project_ids); return Err(err.into()); } @@ -90,11 +86,6 @@ impl IncrementalSearchQueue { } } -#[derive(Debug, Clone)] -pub struct SearchIndexOperation { - pub project_id: ProjectId, -} - #[derive(Debug, Serialize)] pub struct SearchProjectIndexQueueEventData { pub project_id: ProjectId, diff --git a/apps/labrinth/src/search/incremental/consume.rs b/apps/labrinth/src/search/incremental/consume.rs index 69e3b30f54..040285a895 100644 --- a/apps/labrinth/src/search/incremental/consume.rs +++ b/apps/labrinth/src/search/incremental/consume.rs @@ -1,16 +1,21 @@ use actix_web::web; use eyre::WrapErr; -use futures::FutureExt; +use futures::never::Never; use rdkafka::{ Message, consumer::{CommitMode, Consumer, StreamConsumer}, message::BorrowedMessage, }; use serde::Deserialize; -use std::collections::HashSet; +use std::{ + collections::HashSet, + time::{Duration, Instant}, +}; +use tracing::info; use crate::{ database::{PgPool, redis::RedisPool}, + env::ENV, models::ids::ProjectId, search::{ SearchBackend, incremental::SEARCH_PROJECT_INDEX_QUEUE_TOPIC, @@ -22,8 +27,6 @@ use crate::{ }, }; -const BATCH_SIZE: usize = 100; - pub async fn run( ro_pool: PgPool, redis_pool: RedisPool, @@ -60,25 +63,57 @@ async fn consume( search_backend: &dyn SearchBackend, consumer: &StreamConsumer, ) -> eyre::Result<()> { + // keep buffer capacity (pre-)allocated + let mut messages = Vec::with_capacity(1024); loop { - let mut messages = Vec::with_capacity(BATCH_SIZE); - messages.push( - consumer - .recv() - .await - .wrap_err("failed to receive Kafka message")?, - ); + messages.clear(); - while messages.len() < BATCH_SIZE { - let Some(message) = consumer.recv().now_or_never() else { - break; - }; + // wait for a first message to come in... + let first_message = consumer + .recv() + .await + .wrap_err("failed to receive Kafka message")?; + messages.push(first_message); - messages.push(message.wrap_err("failed to receive Kafka message")?); + let delay = Duration::from_secs( + ENV.SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS, + ); + info!( + "Received initial Kafka message; waiting {delay:.2?} for more to batch", + ); + + // ..then wait a while for more messages to batch up + // so that we can process a big batch to reindex + // + // do a little trick with an `AsyncFnMut` closure + // so that we can explicitly specify the return type + let mut collect_more_messages = async || -> eyre::Result { + loop { + let message = consumer + .recv() + .await + .wrap_err("failed to receive Kafka message")?; + messages.push(message); + } + }; + match tokio::time::timeout(delay, collect_more_messages()).await { + Err(_elapsed) => {} + Ok(Err(err)) => { + return Err( + err.wrap_err("failed to receive more Kafka messages") + ); + } } - consume_batch(ro_pool, redis_pool, search_backend, consumer, messages) - .await?; + info!("Consuming batch of {} messages", messages.len()); + consume_batch( + ro_pool, + redis_pool, + search_backend, + consumer, + messages.drain(..), + ) + .await?; } } @@ -87,8 +122,10 @@ async fn consume_batch( redis_pool: &RedisPool, search_backend: &dyn SearchBackend, consumer: &StreamConsumer, - messages: Vec>, + messages: impl IntoIterator>, ) -> eyre::Result<()> { + let start = Instant::now(); + let mut project_ids = Vec::new(); let mut seen_project_ids = HashSet::new(); let mut messages_to_commit = Vec::new(); @@ -131,19 +168,19 @@ async fn consume_batch( messages_to_commit.push(message); } - if project_ids.is_empty() { - return Ok(()); - } - - tracing::info!( + info!( kafka.message_count = messages_to_commit.len(), - project_count = project_ids.len(), - "Consumed incremental search index event batch" + "Read all Kafka messages in {:.2?}, found {} projects to reindex", + start.elapsed(), + project_ids.len(), ); + let start = Instant::now(); - reindex_projects(ro_pool, redis_pool, search_backend, &project_ids) - .await - .wrap_err("failed to reindex project batch")?; + if !project_ids.is_empty() { + reindex_projects(ro_pool, redis_pool, search_backend, &project_ids) + .await + .wrap_err("failed to reindex project batch")?; + } for message in messages_to_commit { consumer @@ -151,6 +188,12 @@ async fn consume_batch( .wrap_err("failed to commit Kafka message")?; } + info!( + "Reindexed {} projects in {:.2?}", + project_ids.len(), + start.elapsed() + ); + Ok(()) } From 7821b6c1cd6421cddbf4f8337fa546b8378c6915 Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Fri, 26 Jun 2026 18:04:11 +0100 Subject: [PATCH 02/11] maybe fix --- apps/labrinth/src/background_task.rs | 3 +- .../src/search/incremental/consume.rs | 23 +++++++-------- apps/labrinth/src/search/indexing.rs | 29 ++++++++++--------- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index 31c51e2367..0c3fb13ef0 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -19,7 +19,7 @@ use crate::util::anrok; use actix_web::web; use clap::ValueEnum; use eyre::WrapErr; -use tracing::info; +use tracing::{info, instrument}; #[derive(ValueEnum, Debug, Copy, Clone, PartialEq, Eq)] #[clap(rename_all = "kebab_case")] @@ -50,6 +50,7 @@ pub enum BackgroundTask { impl BackgroundTask { #[allow(clippy::too_many_arguments)] + #[instrument(skip_all, fields(background_task = ?self))] pub async fn run( self, pool: PgPool, diff --git a/apps/labrinth/src/search/incremental/consume.rs b/apps/labrinth/src/search/incremental/consume.rs index 040285a895..cb0d4e1a0e 100644 --- a/apps/labrinth/src/search/incremental/consume.rs +++ b/apps/labrinth/src/search/incremental/consume.rs @@ -11,7 +11,7 @@ use std::{ collections::HashSet, time::{Duration, Instant}, }; -use tracing::info; +use tracing::{Instrument, info, info_span}; use crate::{ database::{PgPool, redis::RedisPool}, @@ -214,18 +214,15 @@ pub async fn reindex_projects( ) -> eyre::Result<()> { search_backend.remove_project_documents(project_ids).await?; - let mut documents = Vec::new(); - for project_id in project_ids { - documents.extend( - index_project_documents(ro_pool, redis_pool, *project_id) - .await - .wrap_err_with(|| { - format!( - "failed to build project {project_id} search documents" - ) - })?, - ); - } + let documents = index_project_documents(ro_pool, redis_pool, project_ids) + .instrument(info_span!("index", batch_size = project_ids.len())) + .await + .wrap_err_with(|| { + format!( + "failed to build search documents for {} projects", + project_ids.len() + ) + })?; search_backend.index_documents(&documents).await?; diff --git a/apps/labrinth/src/search/indexing.rs b/apps/labrinth/src/search/indexing.rs index 3c4c8a762f..75bfa3e926 100644 --- a/apps/labrinth/src/search/indexing.rs +++ b/apps/labrinth/src/search/indexing.rs @@ -7,7 +7,7 @@ use itertools::Itertools; use regex::Regex; use std::collections::HashMap; use std::sync::LazyLock; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::database::PgPool; use crate::database::models::loader_fields::{ @@ -121,10 +121,13 @@ pub async fn index_local( pub async fn index_project_documents( pool: &PgPool, redis: &RedisPool, - project_id: ProjectId, + project_ids: &[ProjectId], ) -> eyre::Result> { let searchable_statuses = searchable_statuses(); - let project_ids = vec![DBProjectId::from(project_id).0]; + let project_ids = project_ids + .iter() + .map(|project_id| DBProjectId::from(*project_id).0) + .collect::>(); let db_projects = sqlx::query!( r#" @@ -177,7 +180,7 @@ async fn build_search_documents( .await .wrap_err("failed to fetch query context")?; - info!("Indexing local dependencies!"); + debug!("Indexing local dependencies!"); let dependencies: DashMap> = sqlx::query!( @@ -231,7 +234,7 @@ async fn build_search_documents( ordering: i64, } - info!("Indexing local gallery!"); + debug!("Indexing local gallery!"); let mods_gallery: DashMap> = sqlx::query!( " @@ -257,7 +260,7 @@ async fn build_search_documents( ) .await?; - info!("Indexing local categories!"); + debug!("Indexing local categories!"); let categories: DashMap> = sqlx::query!( " @@ -280,10 +283,10 @@ async fn build_search_documents( ) .await?; - info!("Indexing local versions!"); + debug!("Indexing local versions!"); let mut versions = index_versions(pool, project_ids.clone()).await?; - info!("Indexing local org owners!"); + debug!("Indexing local org owners!"); let mods_org_owners: DashMap = sqlx::query!( " @@ -308,7 +311,7 @@ async fn build_search_documents( }) .await?; - info!("Indexing local team owners!"); + debug!("Indexing local team owners!"); let mods_team_owners: DashMap = sqlx::query!( " @@ -332,7 +335,7 @@ async fn build_search_documents( }) .await?; - info!("Getting all loader fields!"); + debug!("Getting all loader fields!"); let loader_fields: Vec = sqlx::query!( " SELECT DISTINCT id, field, field_type, enum_type, min_val, max_val, optional @@ -353,7 +356,7 @@ async fn build_search_documents( .await?; let loader_fields: Vec<&QueryLoaderField> = loader_fields.iter().collect(); - info!("Getting all loader field enum values!"); + debug!("Getting all loader field enum values!"); let loader_field_enum_values: Vec = sqlx::query!( @@ -375,7 +378,7 @@ async fn build_search_documents( .try_collect() .await?; - info!("Indexing loaders, project types!"); + debug!("Indexing loaders, project types!"); let mut uploads = Vec::new(); let total_len = db_projects.len(); @@ -384,7 +387,7 @@ async fn build_search_documents( count += 1; if count % 1000 == 0 { - info!("projects index prog: {count}/{total_len}"); + debug!("projects index prog: {count}/{total_len}"); } let Some(( _, From 003ceb991822b3c31cd46ed469b92c334aa886db Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Fri, 26 Jun 2026 18:24:10 +0100 Subject: [PATCH 03/11] max batch size --- apps/labrinth/.env.docker-compose | 1 + apps/labrinth/.env.local | 1 + apps/labrinth/src/env.rs | 3 ++- .../src/search/incremental/consume.rs | 13 +++++++---- apps/labrinth/src/search/indexing.rs | 22 ++++++++++--------- 5 files changed, 25 insertions(+), 15 deletions(-) diff --git a/apps/labrinth/.env.docker-compose b/apps/labrinth/.env.docker-compose index 288e1aa0fc..b900b2ec10 100644 --- a/apps/labrinth/.env.docker-compose +++ b/apps/labrinth/.env.docker-compose @@ -26,6 +26,7 @@ ELASTICSEARCH_USERNAME=elastic ELASTICSEARCH_PASSWORD=elastic SEARCH_INDEX_CHUNK_SIZE=5000 SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS=5 +SEARCH_INCREMENTAL_INDEX_BATCH_MAX_SIZE=1000 TYPESENSE_URL=http://localhost:8108 TYPESENSE_API_KEY=modrinth TYPESENSE_INDEX_PREFIX=labrinth diff --git a/apps/labrinth/.env.local b/apps/labrinth/.env.local index 7843c066f8..df34775b5e 100644 --- a/apps/labrinth/.env.local +++ b/apps/labrinth/.env.local @@ -44,6 +44,7 @@ ELASTICSEARCH_PASSWORD= SEARCH_INDEX_CHUNK_SIZE=5000 SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS=5 +SEARCH_INCREMENTAL_INDEX_BATCH_MAX_SIZE=1000 TYPESENSE_URL=http://localhost:8108 TYPESENSE_API_KEY=modrinth TYPESENSE_INDEX_PREFIX=labrinth diff --git a/apps/labrinth/src/env.rs b/apps/labrinth/src/env.rs index 3001a40d0c..69c5e8e7e6 100644 --- a/apps/labrinth/src/env.rs +++ b/apps/labrinth/src/env.rs @@ -38,7 +38,7 @@ macro_rules! vars { )] let $field: Option<$ty> = { let mut default = None::<$ty>; - $( default = Some({ $default }.into()); )? + $( default = Some(<$ty>::from({ $default })); )? match parse_value::<$ty>(stringify!($field), default) { Ok(value) => Some(value), @@ -161,6 +161,7 @@ vars! { SEARCH_BACKEND: crate::search::SearchBackendKind = crate::search::SearchBackendKind::Typesense; SEARCH_INDEX_CHUNK_SIZE: i64 = 5000i64; SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS: u64 = 5u64; + SEARCH_INCREMENTAL_INDEX_BATCH_MAX_SIZE: usize = 1000usize; TYPESENSE_URL: String = "http://localhost:8108"; TYPESENSE_API_KEY: String = "modrinth"; TYPESENSE_INDEX_PREFIX: String = "labrinth"; diff --git a/apps/labrinth/src/search/incremental/consume.rs b/apps/labrinth/src/search/incremental/consume.rs index cb0d4e1a0e..f74b0dd5c1 100644 --- a/apps/labrinth/src/search/incremental/consume.rs +++ b/apps/labrinth/src/search/incremental/consume.rs @@ -83,21 +83,24 @@ async fn consume( ); // ..then wait a while for more messages to batch up - // so that we can process a big batch to reindex + // so that we can process a big batch to reindex. + // we stop until either we've reached the max batch size, + // or we've waited enough time - whichever is first. // // do a little trick with an `AsyncFnMut` closure // so that we can explicitly specify the return type - let mut collect_more_messages = async || -> eyre::Result { - loop { + let mut collect_more_messages = async || -> eyre::Result<()> { + while messages.len() < ENV.SEARCH_INCREMENTAL_INDEX_BATCH_MAX_SIZE { let message = consumer .recv() .await .wrap_err("failed to receive Kafka message")?; messages.push(message); } + eyre::Ok(()) }; match tokio::time::timeout(delay, collect_more_messages()).await { - Err(_elapsed) => {} + Ok(Ok(())) | Err(_) => {} Ok(Err(err)) => { return Err( err.wrap_err("failed to receive more Kafka messages") @@ -224,6 +227,8 @@ pub async fn reindex_projects( ) })?; + info!("Fetched all project documents, indexing into backend"); + search_backend.index_documents(&documents).await?; Ok(()) diff --git a/apps/labrinth/src/search/indexing.rs b/apps/labrinth/src/search/indexing.rs index 75bfa3e926..77e334cee0 100644 --- a/apps/labrinth/src/search/indexing.rs +++ b/apps/labrinth/src/search/indexing.rs @@ -161,6 +161,8 @@ pub async fn index_project_documents( .await .wrap_err("failed to fetch project")?; + info!("Fetched partial projects"); + build_search_documents(pool, redis, db_projects).await } @@ -180,7 +182,7 @@ async fn build_search_documents( .await .wrap_err("failed to fetch query context")?; - debug!("Indexing local dependencies!"); + info!("Indexing local dependencies!"); let dependencies: DashMap> = sqlx::query!( @@ -234,7 +236,7 @@ async fn build_search_documents( ordering: i64, } - debug!("Indexing local gallery!"); + info!("Indexing local gallery!"); let mods_gallery: DashMap> = sqlx::query!( " @@ -260,7 +262,7 @@ async fn build_search_documents( ) .await?; - debug!("Indexing local categories!"); + info!("Indexing local categories!"); let categories: DashMap> = sqlx::query!( " @@ -283,10 +285,10 @@ async fn build_search_documents( ) .await?; - debug!("Indexing local versions!"); + info!("Indexing local versions!"); let mut versions = index_versions(pool, project_ids.clone()).await?; - debug!("Indexing local org owners!"); + info!("Indexing local org owners!"); let mods_org_owners: DashMap = sqlx::query!( " @@ -311,7 +313,7 @@ async fn build_search_documents( }) .await?; - debug!("Indexing local team owners!"); + info!("Indexing local team owners!"); let mods_team_owners: DashMap = sqlx::query!( " @@ -335,7 +337,7 @@ async fn build_search_documents( }) .await?; - debug!("Getting all loader fields!"); + info!("Getting all loader fields!"); let loader_fields: Vec = sqlx::query!( " SELECT DISTINCT id, field, field_type, enum_type, min_val, max_val, optional @@ -356,7 +358,7 @@ async fn build_search_documents( .await?; let loader_fields: Vec<&QueryLoaderField> = loader_fields.iter().collect(); - debug!("Getting all loader field enum values!"); + info!("Getting all loader field enum values!"); let loader_field_enum_values: Vec = sqlx::query!( @@ -378,7 +380,7 @@ async fn build_search_documents( .try_collect() .await?; - debug!("Indexing loaders, project types!"); + info!("Indexing loaders, project types!"); let mut uploads = Vec::new(); let total_len = db_projects.len(); @@ -387,7 +389,7 @@ async fn build_search_documents( count += 1; if count % 1000 == 0 { - debug!("projects index prog: {count}/{total_len}"); + info!("projects index prog: {count}/{total_len}"); } let Some(( _, From d4a7b15b0d8b217012aae16f106da06d71f928c8 Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Fri, 26 Jun 2026 18:39:13 +0100 Subject: [PATCH 04/11] more logging --- .../src/search/backend/typesense/mod.rs | 16 ++- .../src/search/incremental/consume.rs | 3 +- .../create-dummy-projects.cpython-314.pyc | Bin 0 -> 5414 bytes scripts/create-dummy-projects.py | 123 ++++++++++++++++++ 4 files changed, 140 insertions(+), 2 deletions(-) create mode 100644 scripts/__pycache__/create-dummy-projects.cpython-314.pyc create mode 100755 scripts/create-dummy-projects.py diff --git a/apps/labrinth/src/search/backend/typesense/mod.rs b/apps/labrinth/src/search/backend/typesense/mod.rs index 6bbb141bf7..42beb818a5 100644 --- a/apps/labrinth/src/search/backend/typesense/mod.rs +++ b/apps/labrinth/src/search/backend/typesense/mod.rs @@ -7,7 +7,7 @@ use regex::Regex; use reqwest::Method; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::database::PgPool; use crate::database::redis::RedisPool; @@ -1038,21 +1038,35 @@ impl SearchBackend for Typesense { self.config.get_alias_name("projects"), self.config.get_alias_name("projects_filtered"), ] { + debug!("Performing removal on alias {alias:?}"); + let live = self.client.get_alias(&alias).await?; + debug!("Got live alias {live:?}"); + let shadow_alt = self.config.get_next_collection_name(&alias, true); + debug!("Got shadow alt {shadow_alt:?}"); + let shadow_current = self.config.get_next_collection_name(&alias, false); + debug!("Got shadow current {shadow_current:?}"); for collection in live.into_iter().chain([shadow_alt, shadow_current]) { + debug!("Working on collection {collection:?}"); if self.client.collection_exists(&collection).await? { + debug!( + filter_len = filter.len(), + "Collection exists, deleting by filter" + ); self.client .delete_documents_by_filter(&collection, &filter) .await?; } } } + + debug!("Done"); Ok(()) } diff --git a/apps/labrinth/src/search/incremental/consume.rs b/apps/labrinth/src/search/incremental/consume.rs index f74b0dd5c1..58e8a6456a 100644 --- a/apps/labrinth/src/search/incremental/consume.rs +++ b/apps/labrinth/src/search/incremental/consume.rs @@ -1,6 +1,5 @@ use actix_web::web; use eyre::WrapErr; -use futures::never::Never; use rdkafka::{ Message, consumer::{CommitMode, Consumer, StreamConsumer}, @@ -215,8 +214,10 @@ pub async fn reindex_projects( search_backend: &dyn SearchBackend, project_ids: &[ProjectId], ) -> eyre::Result<()> { + info!("Removing documents for batch"); search_backend.remove_project_documents(project_ids).await?; + info!("Creating project documents"); let documents = index_project_documents(ro_pool, redis_pool, project_ids) .instrument(info_span!("index", batch_size = project_ids.len())) .await diff --git a/scripts/__pycache__/create-dummy-projects.cpython-314.pyc b/scripts/__pycache__/create-dummy-projects.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5100c7d3103b7fd09b5ee18fc966255066873c37 GIT binary patch literal 5414 zcma)AYit|G5#De7}6>&p%KQI z)JUVHG}>qxqb0}Klbpt#NkJ1lNTlnLW)&HzZ6XV`{TYYI9r!LnaX3da zddA*{`m5UlH_>kV_wk=%+F2i|yMj}S6paTmndGsnQY@=w6x|k8Be6_MPD+{-_Ypdi z%_idgF{&yDH?{B|k01RUsyq@wY={&|JK9T%REQ91kr7#u6Zu{wT0}v#LTeN4eY{MC zVMV0`Oof;PXxmVT66-|AV0BKW#hMVZA+a_@L+v#BuDx7YNJns4|_~HR_EUEV7chB*irEL`Lx@GqGqAYFMO6 zs+LGk_UhKSq{ftltR*t(+`-^fLiL(Uu0&Puq?DGFDCn!QI5}+yx){}BQ{F^6E?os7 zhA_2PuT7*AS|XZ^JS!OwLRlhUdbNyK&P%vcb< zg`i4u6gJ8zYL4{#bKkKGM>1(mN^Ab3302OhAUTsB!n>#Z@u(IZ_F_dG+K&6%*O-IT zvNYt4$}(&b#Y_4wshPB0unRfbE`VU#@3#x^K!_fl81hj%jpua+lqIV=E2U$Zxa4DW z{$eJZj)Nu@3w#BgG486QsiPLKCLvvGO!_}=Ap`?`y@YmYL1zU6|NP9_7aSClt^pQK2RcSiq3N-(p|V% zB5kwr66q+AxMIpqi-pR&SGFQs?Tw+I4He?^xx3cA`TF}bo4=aBR7Rw;yUZfio*yeC z!oiien*5~#Tei?t!(#!_!py)=4wUUIX?p}`Krt5A)xtw-*uaxe=i%ML`$QpHB)|$N znGF*rke$f~1} zts!f`zF`pzP5T5#B%Hq-aD0_RwCJSaPEY&6O-UMno$B)v>=c1cjm2Oss$y0Yu#P32is5rt_=hjvM7E5%ad(E+c(`Qu;r_3 zMZ4$_YfL@Kc95`j6Rz3=uCRfmAYV#YU^QQC!s%SKfP1GoXRwQKJdXho04E79P2nOk&-f5 zaKz|rj>&2h{sTIvNOCe7lVF;a6a`#wVg*d{xXu_O-I`5^DXqZcLOLre z(nR8_f?o+foQC5~#f8&`@7-=pL^VxH$(l~V@$rr-Yyx+NZKbQ4iVYaty5ZTqMp5v4 z(j67&9;xbFISNGl5L6YkmPaM4bGEb4Rya1-UKlR&p1k`W?<{oA?_AA4fdI&kl`Mfcu?0}EY6*P${)*{FQ|ih!J33zPH2&CBoCZqJ{;$2)E~UT`e)t&4nX ziFM?^H+!u}HI;Z<$?7TzwPg#kcpf3fLg(Ad4uR_a(t-x1j}?WcC1D`nUt%rSr)H)Kb z7qD@SUz91}`Fw>(6nDMr$6YTyRd5zg+}t)NF4?`yWSjalDF4qbz9TO39oBPrgnMVF zfcqoXBQ@4L0uB8;H59I0o?{1?J9|diV>I*bAOVB-XdAAbM&IQ=wvTzQ!|3m0fF1+E ztEC2dIEFo%h3Jw;--h$dXdt5y+6>KMOevD4_w;-n7X?xD+qszVYNW)tbg&w0zg8;0 z#y?K}AN-be{LBXaP2%zD-3d{UNzobx?_VilBYnXAz-q5L1kO(L9m)>`y5mnC~4 znjb$1IED+c0qll1lN`e|2x7D_32(U>oeClp?lvVt zwq!A~C7d(4!hVw;VoYi9{aMJf+-vrADqGQwxCQ^MVXrl*;a*ctV7J*@Yuk{x6>_)w z&EJ670LZv;vy+IR3$bbao%i{D=jrnz=MlvF0JB+}Z{mlGYo2~Kj>N&nK zIU+NW%&Jpo6)-Zu64e+k#dWUAuCo3@)wt8g)LZOaiJ>CEYQq zOlDI+4d5aEfgm1_z^p;jDY&367@U}>Dgz)&!8xB!O-WZ3>}z%FIxQ((n8Fw;1>lpt zm`Z2{NKl6G%3)j`!Ymdk4O`%}S^?7n)B;C^Z=|!3w@T@_&d6A~I+X-igU_#EN93yk zbW=GeZ5#&!o{BEW%w^>q7Q{={-#V%QXd^GUy-gUzp*7<7Z>bBT263Ro11f4<GQ%en<#d_au=Ue-KJNWzIH-{IL-;ETzhKs_%{MP#{mv@zH zj(o!>Y{PtTdFTGco%@&Bfq$?AZ+AjE4G=8;%B7bt-K}kVw8d(tAGncodp=vPL$vMs z$jrz++J&*ynVB=Vd2;5-5^XEdLW#EE@#&eZF~O2%;Ed6+qs!@h2VVu zZTiN7-7-*NCdy0NW@p8h_I~ksO-fHFGSG1NS4W^K zQv^YLjA}kaR^wmYhiJ=3Xyl)0>qEAVus!T)CE6bzCiW9lStj5$%;xS=jUN!qPq#jx J5WUUV;C~)fnq&X~ literal 0 HcmV?d00001 diff --git a/scripts/create-dummy-projects.py b/scripts/create-dummy-projects.py new file mode 100755 index 0000000000..c55ea25d2e --- /dev/null +++ b/scripts/create-dummy-projects.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +import argparse +import json +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor, as_completed +from uuid import uuid4 + + +def make_body(boundary, slug, index): + data = { + "name": f"Dummy Load {index:04d}", + "slug": slug, + "summary": "A dummy project for local load testing.", + "description": "This project was generated locally for batch indexing tests.", + "initial_versions": [], + "is_draft": True, + "categories": [], + "license_id": "MIT", + } + + payload = json.dumps(data, separators=(",", ":")) + return ( + f"--{boundary}\r\n" + 'Content-Disposition: form-data; name="data"\r\n' + "Content-Type: application/json\r\n\r\n" + f"{payload}\r\n" + f"--{boundary}--\r\n" + ).encode() + + +def create_project(base_url, token, boundary, prefix, index, retries): + slug = f"{prefix}-{index:04d}" + body = make_body(boundary, slug, index) + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": f"multipart/form-data; boundary={boundary}", + } + + for attempt in range(retries + 1): + req = urllib.request.Request( + f"{base_url}/v3/project", + data=body, + headers=headers, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=60) as resp: + resp.read() + return True, slug, resp.status, "" + except urllib.error.HTTPError as err: + text = err.read().decode("utf-8", errors="replace") + if err.code < 500 or attempt == retries: + return False, slug, err.code, text + except Exception as err: + if attempt == retries: + return False, slug, "error", repr(err) + + time.sleep(min(2**attempt, 10)) + + raise RuntimeError("unreachable") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--base-url", default="http://localhost:8000") + parser.add_argument("--token", default="mra_admin") + parser.add_argument("--count", type=int, default=1000) + parser.add_argument("--concurrency", type=int, default=2) + parser.add_argument("--retries", type=int, default=5) + args = parser.parse_args() + + boundary = "----modrinth-dummy-project-boundary" + prefix = f"dummy-load-{int(time.time())}-{uuid4().hex[:6]}" + + ok = 0 + failures = [] + + with ThreadPoolExecutor(max_workers=args.concurrency) as executor: + futures = [ + executor.submit( + create_project, + args.base_url, + args.token, + boundary, + prefix, + index, + args.retries, + ) + for index in range(args.count) + ] + + for completed, future in enumerate(as_completed(futures), 1): + success, slug, status, text = future.result() + if success: + ok += 1 + else: + failures.append((slug, status, text[:500])) + + if completed % 50 == 0: + print( + f"completed={completed} created={ok} failed={len(failures)}", + flush=True, + ) + + print( + json.dumps( + { + "prefix": prefix, + "attempted": args.count, + "created": ok, + "failed": len(failures), + "failures": failures[:20], + }, + indent=2, + ) + ) + + +if __name__ == "__main__": + main() From 534c63e66491fdf1bf03c07fe21a8d8215349a21 Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Fri, 26 Jun 2026 19:10:00 +0100 Subject: [PATCH 05/11] parallelize remove tasks --- apps/labrinth/src/env.rs | 1 + .../src/search/backend/typesense/mod.rs | 121 +++++++++++++++--- 2 files changed, 104 insertions(+), 18 deletions(-) diff --git a/apps/labrinth/src/env.rs b/apps/labrinth/src/env.rs index 69c5e8e7e6..20c6472316 100644 --- a/apps/labrinth/src/env.rs +++ b/apps/labrinth/src/env.rs @@ -165,6 +165,7 @@ vars! { TYPESENSE_URL: String = "http://localhost:8108"; TYPESENSE_API_KEY: String = "modrinth"; TYPESENSE_INDEX_PREFIX: String = "labrinth"; + TYPESENSE_DELETE_BATCH_SIZE: usize = 10_000usize; // storage STORAGE_BACKEND: crate::file_hosting::FileHostKind = crate::file_hosting::FileHostKind::Local; diff --git a/apps/labrinth/src/search/backend/typesense/mod.rs b/apps/labrinth/src/search/backend/typesense/mod.rs index 42beb818a5..0f39656db2 100644 --- a/apps/labrinth/src/search/backend/typesense/mod.rs +++ b/apps/labrinth/src/search/backend/typesense/mod.rs @@ -32,6 +32,7 @@ pub struct TypesenseConfig { pub index_prefix: String, pub meta_namespace: String, pub index_chunk_size: i64, + pub delete_batch_size: usize, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -160,6 +161,7 @@ impl TypesenseConfig { index_prefix: ENV.TYPESENSE_INDEX_PREFIX.clone(), meta_namespace: meta_namespace.unwrap_or_default(), index_chunk_size: ENV.SEARCH_INDEX_CHUNK_SIZE, + delete_batch_size: ENV.TYPESENSE_DELETE_BATCH_SIZE, } } @@ -323,12 +325,13 @@ impl TypesenseClient { &self, collection: &str, filter_by: &str, + batch_size: usize, ) -> Result<()> { let resp = self .request( Method::DELETE, &format!( - "/collections/{collection}/documents?filter_by={}&batch_size=1000", + "/collections/{collection}/documents?filter_by={}&batch_size={batch_size}", urlencoding::encode(filter_by) ), ) @@ -721,6 +724,24 @@ impl Typesense { self.client.upsert_alias(alias, &name).await?; Ok(()) } + + async fn delete_documents_by_filter_if_exists( + &self, + collection: &str, + filter: &str, + ) -> Result<()> { + if self.client.collection_exists(collection).await? { + self.client + .delete_documents_by_filter( + collection, + filter, + self.config.delete_batch_size, + ) + .await?; + } + + Ok(()) + } } #[async_trait] @@ -1050,20 +1071,53 @@ impl SearchBackend for Typesense { self.config.get_next_collection_name(&alias, false); debug!("Got shadow current {shadow_current:?}"); - for collection in - live.into_iter().chain([shadow_alt, shadow_current]) - { - debug!("Working on collection {collection:?}"); - if self.client.collection_exists(&collection).await? { + let delete_live = async { + if let Some(collection) = live.as_deref() { + debug!("Working on collection {collection:?}"); debug!( filter_len = filter.len(), "Collection exists, deleting by filter" ); - self.client - .delete_documents_by_filter(&collection, &filter) - .await?; + self.delete_documents_by_filter_if_exists( + collection, &filter, + ) + .await?; } - } + + Ok::<(), eyre::Report>(()) + }; + let delete_shadow_alt = async { + if live.as_deref() != Some(shadow_alt.as_str()) { + debug!("Working on collection {shadow_alt:?}"); + self.delete_documents_by_filter_if_exists( + &shadow_alt, + &filter, + ) + .await?; + } + + Ok::<(), eyre::Report>(()) + }; + let delete_shadow_current = async { + if live.as_deref() != Some(shadow_current.as_str()) { + debug!("Working on collection {shadow_current:?}"); + self.delete_documents_by_filter_if_exists( + &shadow_current, + &filter, + ) + .await?; + } + + Ok::<(), eyre::Report>(()) + }; + let (live_result, shadow_alt_result, shadow_current_result) = tokio::join!( + delete_live, + delete_shadow_alt, + delete_shadow_current + ); + live_result?; + shadow_alt_result?; + shadow_current_result?; } debug!("Done"); @@ -1092,15 +1146,46 @@ impl SearchBackend for Typesense { let shadow_current = self.config.get_next_collection_name(&alias, false); - for collection in - live.into_iter().chain([shadow_alt, shadow_current]) - { - if self.client.collection_exists(&collection).await? { - self.client - .delete_documents_by_filter(&collection, &filter) - .await?; + let delete_live = async { + if let Some(collection) = live.as_deref() { + self.delete_documents_by_filter_if_exists( + collection, &filter, + ) + .await?; } - } + + Ok::<(), eyre::Report>(()) + }; + let delete_shadow_alt = async { + if live.as_deref() != Some(shadow_alt.as_str()) { + self.delete_documents_by_filter_if_exists( + &shadow_alt, + &filter, + ) + .await?; + } + + Ok::<(), eyre::Report>(()) + }; + let delete_shadow_current = async { + if live.as_deref() != Some(shadow_current.as_str()) { + self.delete_documents_by_filter_if_exists( + &shadow_current, + &filter, + ) + .await?; + } + + Ok::<(), eyre::Report>(()) + }; + let (live_result, shadow_alt_result, shadow_current_result) = tokio::join!( + delete_live, + delete_shadow_alt, + delete_shadow_current + ); + live_result?; + shadow_alt_result?; + shadow_current_result?; } Ok(()) } From 3b771d7e8348674beb0bacae30c79e94deb83f0b Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Fri, 26 Jun 2026 19:39:25 +0100 Subject: [PATCH 06/11] delete/upsert project/version messages --- apps/labrinth/src/queue/server_ping.rs | 20 ++- .../routes/internal/moderation/tech_review.rs | 1 + apps/labrinth/src/routes/v3/organizations.rs | 3 + .../src/routes/v3/project_creation.rs | 2 + .../src/routes/v3/project_creation/new.rs | 1 + apps/labrinth/src/routes/v3/projects.rs | 58 ++++---- .../src/routes/v3/version_creation.rs | 4 +- apps/labrinth/src/routes/v3/versions.rs | 2 + apps/labrinth/src/search/incremental.rs | 130 +++++++++++++--- .../src/search/incremental/consume.rs | 140 ++++++++++++++++-- apps/labrinth/src/search/indexing.rs | 2 +- 11 files changed, 298 insertions(+), 65 deletions(-) diff --git a/apps/labrinth/src/queue/server_ping.rs b/apps/labrinth/src/queue/server_ping.rs index 5b6305961b..95359701d2 100644 --- a/apps/labrinth/src/queue/server_ping.rs +++ b/apps/labrinth/src/queue/server_ping.rs @@ -1,9 +1,9 @@ use crate::database::DBProject; -use crate::database::models::DBProjectId; +use crate::database::models::{DBProjectId, DBVersionId}; use crate::database::redis::RedisPool; use crate::env::ENV; use crate::models::exp; -use crate::models::ids::ProjectId; +use crate::models::ids::{ProjectId, VersionId}; use crate::models::projects::ProjectStatus; use crate::search::incremental::IncrementalSearchQueue; use crate::{database::PgPool, util::error::Context}; @@ -175,14 +175,26 @@ impl ServerPingQueue { } if updated_project { + let version_ids = sqlx::query_scalar!( + "SELECT id FROM versions WHERE mod_id = $1", + DBProjectId::from(*project_id) as DBProjectId, + ) + .fetch_all(&self.db) + .await + .wrap_err("failed to fetch project version IDs")? + .into_iter() + .map(|version_id| VersionId::from(DBVersionId(version_id))) + .collect::>(); + let clear_cache = DBProject::clear_cache( (*project_id).into(), None, None, &self.redis, ); - let queue_search = - self.incremental_search_queue.push(*project_id); + let queue_search = self + .incremental_search_queue + .push(*project_id, version_ids); let (clear_cache_result, _) = join(clear_cache, queue_search).await; diff --git a/apps/labrinth/src/routes/internal/moderation/tech_review.rs b/apps/labrinth/src/routes/internal/moderation/tech_review.rs index cfa609a98e..1ccdbafb49 100644 --- a/apps/labrinth/src/routes/internal/moderation/tech_review.rs +++ b/apps/labrinth/src/routes/internal/moderation/tech_review.rs @@ -1142,6 +1142,7 @@ async fn submit_report( if verdict == DelphiVerdict::Unsafe { crate::routes::v3::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_id, diff --git a/apps/labrinth/src/routes/v3/organizations.rs b/apps/labrinth/src/routes/v3/organizations.rs index b59e74cefd..b27b3ce4dc 100644 --- a/apps/labrinth/src/routes/v3/organizations.rs +++ b/apps/labrinth/src/routes/v3/organizations.rs @@ -802,6 +802,7 @@ pub async fn organization_delete( for project_id in organization_project_ids { super::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_id, @@ -969,6 +970,7 @@ pub async fn organization_projects_add( ) .await?; super::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -1159,6 +1161,7 @@ pub async fn organization_projects_remove( ) .await?; super::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, diff --git a/apps/labrinth/src/routes/v3/project_creation.rs b/apps/labrinth/src/routes/v3/project_creation.rs index 1088302d2f..5e239543e7 100644 --- a/apps/labrinth/src/routes/v3/project_creation.rs +++ b/apps/labrinth/src/routes/v3/project_creation.rs @@ -349,6 +349,7 @@ pub async fn project_create_internal( } else { transaction.commit().await?; super::projects::clear_project_cache_and_queue_search( + &client, &redis, &search_state, project_id.into(), @@ -407,6 +408,7 @@ pub async fn project_create_with_id( } else { transaction.commit().await?; super::projects::clear_project_cache_and_queue_search( + &client, &redis, &search_state, project_id.into(), diff --git a/apps/labrinth/src/routes/v3/project_creation/new.rs b/apps/labrinth/src/routes/v3/project_creation/new.rs index 877ad9a70d..d3d025a1ee 100644 --- a/apps/labrinth/src/routes/v3/project_creation/new.rs +++ b/apps/labrinth/src/routes/v3/project_creation/new.rs @@ -342,6 +342,7 @@ pub async fn create( .wrap_internal_err("failed to commit transaction")?; super::super::projects::clear_project_cache_and_queue_search( + &db, &redis, &search_state, project_id.into(), diff --git a/apps/labrinth/src/routes/v3/projects.rs b/apps/labrinth/src/routes/v3/projects.rs index 902f62122f..07298a9c25 100644 --- a/apps/labrinth/src/routes/v3/projects.rs +++ b/apps/labrinth/src/routes/v3/projects.rs @@ -75,6 +75,7 @@ pub fn utoipa_config( } pub async fn clear_project_cache_and_queue_search( + pool: &PgPool, redis: &RedisPool, search_state: &SearchState, project_id: db_ids::DBProjectId, @@ -88,7 +89,21 @@ pub async fn clear_project_cache_and_queue_search( redis, ) .await?; - search_state.queue.push(project_id.into()).await; + let version_ids = sqlx::query_scalar!( + "SELECT id FROM versions WHERE mod_id = $1", + project_id as db_ids::DBProjectId, + ) + .fetch_all(pool) + .await + .wrap_internal_err("failed to fetch project version IDs")? + .into_iter() + .map(|version_id| VersionId::from(db_ids::DBVersionId(version_id))) + .collect::>(); + + search_state + .queue + .push(project_id.into(), version_ids) + .await; Ok(()) } @@ -1135,6 +1150,7 @@ pub async fn project_edit_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -1149,16 +1165,9 @@ pub async fn project_edit_internal( new_project.status.map(|status| status.is_searchable()), ) { search_state - .backend - .remove_documents( - &project_item - .versions - .into_iter() - .map(|x| x.into()) - .collect::>(), - ) - .await - .wrap_internal_err("failed to remove documents")?; + .queue + .push_project_removal(project_item.inner.id.into()) + .await; } Ok(HttpResponse::NoContent().body("")) @@ -1640,6 +1649,7 @@ pub async fn projects_edit( for (project_id, slug) in changed_projects { clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_id, @@ -1862,6 +1872,7 @@ pub async fn project_icon_edit_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -1977,6 +1988,7 @@ pub async fn delete_project_icon_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -2164,6 +2176,7 @@ pub async fn add_gallery_item_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -2370,6 +2383,7 @@ pub async fn edit_gallery_item_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -2510,6 +2524,7 @@ pub async fn delete_gallery_item_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -2652,27 +2667,18 @@ pub async fn project_delete_internal( .await .wrap_internal_err("failed to commit transaction")?; - search_state - .backend - .remove_documents( - &project - .versions - .into_iter() - .map(|x| x.into()) - .collect::>(), - ) - .await - .wrap_internal_err("failed to remove project version documents")?; - if result.is_some() { - clear_project_cache_and_queue_search( - &redis, - &search_state, + db_models::DBProject::clear_cache( project.inner.id, project.inner.slug, None, + &redis, ) .await?; + search_state + .queue + .push_project_removal(project.inner.id.into()) + .await; Ok(()) } else { Err(ApiError::NotFound) diff --git a/apps/labrinth/src/routes/v3/version_creation.rs b/apps/labrinth/src/routes/v3/version_creation.rs index 9749efee3c..f9746a9a63 100644 --- a/apps/labrinth/src/routes/v3/version_creation.rs +++ b/apps/labrinth/src/routes/v3/version_creation.rs @@ -148,6 +148,7 @@ pub async fn version_create( } else if let Ok((_, project_id)) = &result { transaction.commit().await?; super::projects::clear_project_cache_and_queue_search( + &client, &redis, &search_state, *project_id, @@ -565,7 +566,7 @@ pub async fn upload_file_to_version( let result = upload_file_to_version_inner( req, &mut payload, - client, + client.clone(), &mut transaction, redis.clone(), &**file_host, @@ -591,6 +592,7 @@ pub async fn upload_file_to_version( } else if let Ok((_, project_id)) = &result { transaction.commit().await?; super::projects::clear_project_cache_and_queue_search( + &client, &redis, &search_state, *project_id, diff --git a/apps/labrinth/src/routes/v3/versions.rs b/apps/labrinth/src/routes/v3/versions.rs index 0e322cde51..155e754d67 100644 --- a/apps/labrinth/src/routes/v3/versions.rs +++ b/apps/labrinth/src/routes/v3/versions.rs @@ -762,6 +762,7 @@ pub async fn version_edit_helper( database::models::DBVersion::clear_cache(&version_item, &redis) .await?; super::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, version_item.inner.project_id, @@ -1098,6 +1099,7 @@ pub async fn version_delete( transaction.commit().await?; super::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, version.inner.project_id, diff --git a/apps/labrinth/src/search/incremental.rs b/apps/labrinth/src/search/incremental.rs index 88a9f1d6a2..1c57a57e9f 100644 --- a/apps/labrinth/src/search/incremental.rs +++ b/apps/labrinth/src/search/incremental.rs @@ -1,13 +1,18 @@ pub mod consume; -use std::{collections::HashSet, mem, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + mem, + sync::Arc, + time::Duration, +}; use rdkafka::{producer::FutureRecord, util::Timeout}; use serde::Serialize; use tokio::sync::Mutex; use crate::{ - models::ids::ProjectId, + models::ids::{ProjectId, VersionId}, util::kafka::{KAFKA_OPERATION_INTERVAL, KafkaClientState, KafkaEvent}, }; @@ -17,20 +22,36 @@ const QUEUE_FLUSH_INTERVAL: Duration = Duration::from_secs(10); #[derive(Clone)] pub struct IncrementalSearchQueue { - project_ids: Arc>>, + operations: Arc>, kafka_client: actix_web::web::Data, } impl IncrementalSearchQueue { pub fn new(kafka_client: actix_web::web::Data) -> Self { Self { - project_ids: Arc::new(Mutex::new(HashSet::new())), + operations: Arc::new(Mutex::new( + PendingSearchIndexOperations::default(), + )), kafka_client, } } - pub async fn push(&self, project_id: ProjectId) { - self.project_ids.lock().await.insert(project_id); + pub async fn push( + &self, + project_id: ProjectId, + version_ids: impl IntoIterator, + ) { + self.operations + .lock() + .await + .push_project_change(project_id, version_ids); + } + + pub async fn push_project_removal(&self, project_id: ProjectId) { + self.operations + .lock() + .await + .push_project_removal(project_id); } pub async fn run(self) { @@ -46,20 +67,20 @@ impl IncrementalSearchQueue { } pub async fn drain(&self) -> eyre::Result<()> { - let project_ids = { - let mut project_ids = self.project_ids.lock().await; - mem::take(&mut *project_ids) + let operations = { + let mut operations = self.operations.lock().await; + mem::take(&mut *operations) }; - if project_ids.is_empty() { + if operations.is_empty() { return Ok(()); } - let mut project_ids = project_ids.into_iter(); - while let Some(project_id) = project_ids.next() { + let mut operations = operations.into_events().into_iter(); + while let Some(operation) = operations.next() { let event = KafkaEvent::new( SEARCH_PROJECT_INDEX_QUEUE_TOPIC, - SearchProjectIndexQueueEventData { project_id }, + operation.clone(), ); let event_id = event.event_metadata.event_id; let key = event_id.to_string(); @@ -74,9 +95,11 @@ impl IncrementalSearchQueue { .send(record, Timeout::After(KAFKA_OPERATION_INTERVAL)) .await { - let mut queued_project_ids = self.project_ids.lock().await; - queued_project_ids.insert(project_id); - queued_project_ids.extend(project_ids); + let mut queued_operations = self.operations.lock().await; + queued_operations.push_event(operation); + for operation in operations { + queued_operations.push_event(operation); + } return Err(err.into()); } @@ -86,7 +109,76 @@ impl IncrementalSearchQueue { } } -#[derive(Debug, Serialize)] -pub struct SearchProjectIndexQueueEventData { - pub project_id: ProjectId, +#[derive(Default)] +struct PendingSearchIndexOperations { + changed_projects: HashMap>, + removed_project_ids: HashSet, +} + +impl PendingSearchIndexOperations { + fn is_empty(&self) -> bool { + self.changed_projects.is_empty() && self.removed_project_ids.is_empty() + } + + fn push_project_change( + &mut self, + project_id: ProjectId, + version_ids: impl IntoIterator, + ) { + if !self.removed_project_ids.contains(&project_id) { + self.changed_projects + .entry(project_id) + .or_default() + .extend(version_ids); + } + } + + fn push_project_removal(&mut self, project_id: ProjectId) { + self.changed_projects.remove(&project_id); + self.removed_project_ids.insert(project_id); + } + + fn push_event(&mut self, event: SearchProjectIndexQueueEventData) { + match event { + SearchProjectIndexQueueEventData::ProjectChange { + project_id, + version_ids, + } => self.push_project_change(project_id, version_ids), + SearchProjectIndexQueueEventData::ProjectRemoval { project_id } => { + self.push_project_removal(project_id) + } + } + } + + fn into_events(self) -> Vec { + let mut events = Vec::with_capacity( + self.changed_projects.len() + self.removed_project_ids.len(), + ); + + events.extend(self.removed_project_ids.into_iter().map(|project_id| { + SearchProjectIndexQueueEventData::ProjectRemoval { project_id } + })); + events.extend(self.changed_projects.into_iter().map( + |(project_id, version_ids)| { + SearchProjectIndexQueueEventData::ProjectChange { + project_id, + version_ids: version_ids.into_iter().collect(), + } + }, + )); + + events + } +} + +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum SearchProjectIndexQueueEventData { + ProjectChange { + project_id: ProjectId, + version_ids: Vec, + }, + ProjectRemoval { + project_id: ProjectId, + }, } diff --git a/apps/labrinth/src/search/incremental/consume.rs b/apps/labrinth/src/search/incremental/consume.rs index 58e8a6456a..9713185c8a 100644 --- a/apps/labrinth/src/search/incremental/consume.rs +++ b/apps/labrinth/src/search/incremental/consume.rs @@ -15,7 +15,7 @@ use tracing::{Instrument, info, info_span}; use crate::{ database::{PgPool, redis::RedisPool}, env::ENV, - models::ids::ProjectId, + models::ids::{ProjectId, VersionId}, search::{ SearchBackend, incremental::SEARCH_PROJECT_INDEX_QUEUE_TOPIC, indexing::index_project_documents, @@ -128,8 +128,9 @@ async fn consume_batch( ) -> eyre::Result<()> { let start = Instant::now(); - let mut project_ids = Vec::new(); - let mut seen_project_ids = HashSet::new(); + let mut project_ids_to_change = HashSet::new(); + let mut project_ids_to_remove = HashSet::new(); + let mut version_ids_to_change = HashSet::new(); let mut messages_to_commit = Vec::new(); for message in messages { @@ -164,24 +165,94 @@ async fn consume_batch( } }; - if seen_project_ids.insert(event.project_id) { - project_ids.push(event.project_id); + match event.into_data() { + SearchProjectIndexQueueEventData::ProjectChange { + project_id, + version_ids, + } => { + project_ids_to_change.insert(project_id); + version_ids_to_change.extend(version_ids); + } + SearchProjectIndexQueueEventData::ProjectRemoval { project_id } => { + project_ids_to_remove.insert(project_id); + } } messages_to_commit.push(message); } + project_ids_to_change + .retain(|project_id| !project_ids_to_remove.contains(project_id)); + + let project_ids_to_change = + project_ids_to_change.into_iter().collect::>(); + let project_ids_to_remove = + project_ids_to_remove.into_iter().collect::>(); + let version_ids_to_change = + version_ids_to_change.into_iter().collect::>(); + info!( kafka.message_count = messages_to_commit.len(), - "Read all Kafka messages in {:.2?}, found {} projects to reindex", + "Read all Kafka messages in {:.2?}, found {} projects to change, {} versions to change, and {} projects to remove", start.elapsed(), - project_ids.len(), + project_ids_to_change.len(), + version_ids_to_change.len(), + project_ids_to_remove.len(), ); let start = Instant::now(); - if !project_ids.is_empty() { - reindex_projects(ro_pool, redis_pool, search_backend, &project_ids) + if !project_ids_to_remove.is_empty() { + let operation_start = Instant::now(); + info!( + project_count = project_ids_to_remove.len(), + "Removing project documents" + ); + search_backend + .remove_project_documents(&project_ids_to_remove) + .await + .wrap_err("failed to remove project documents")?; + info!( + project_count = project_ids_to_remove.len(), + "Removed project documents in {:.2?}", + operation_start.elapsed() + ); + } + + if !version_ids_to_change.is_empty() { + let operation_start = Instant::now(); + info!( + version_count = version_ids_to_change.len(), + "Removing changed version documents" + ); + search_backend + .remove_documents(&version_ids_to_change) .await - .wrap_err("failed to reindex project batch")?; + .wrap_err("failed to remove changed version documents")?; + info!( + version_count = version_ids_to_change.len(), + "Removed changed version documents in {:.2?}", + operation_start.elapsed() + ); + } + + if !project_ids_to_change.is_empty() { + let operation_start = Instant::now(); + info!( + project_count = project_ids_to_change.len(), + "Indexing changed projects" + ); + index_changed_projects( + ro_pool, + redis_pool, + search_backend, + &project_ids_to_change, + ) + .await + .wrap_err("failed to index changed project batch")?; + info!( + project_count = project_ids_to_change.len(), + "Indexed changed projects in {:.2?}", + operation_start.elapsed() + ); } for message in messages_to_commit { @@ -191,8 +262,9 @@ async fn consume_batch( } info!( - "Reindexed {} projects in {:.2?}", - project_ids.len(), + "Changed {} projects and removed {} projects in {:.2?}", + project_ids_to_change.len(), + project_ids_to_remove.len(), start.elapsed() ); @@ -218,6 +290,18 @@ pub async fn reindex_projects( search_backend.remove_project_documents(project_ids).await?; info!("Creating project documents"); + index_changed_projects(ro_pool, redis_pool, search_backend, project_ids) + .await?; + + Ok(()) +} + +async fn index_changed_projects( + ro_pool: &PgPool, + redis_pool: &RedisPool, + search_backend: &dyn SearchBackend, + project_ids: &[ProjectId], +) -> eyre::Result<()> { let documents = index_project_documents(ro_pool, redis_pool, project_ids) .instrument(info_span!("index", batch_size = project_ids.len())) .await @@ -236,6 +320,34 @@ pub async fn reindex_projects( } #[derive(Debug, Deserialize)] -struct SearchProjectIndexQueueEvent { - project_id: ProjectId, +#[serde(untagged)] +enum SearchProjectIndexQueueEvent { + Current(SearchProjectIndexQueueEventData), + Legacy { project_id: ProjectId }, +} + +impl SearchProjectIndexQueueEvent { + fn into_data(self) -> SearchProjectIndexQueueEventData { + match self { + Self::Current(data) => data, + Self::Legacy { project_id } => { + SearchProjectIndexQueueEventData::ProjectChange { + project_id, + version_ids: Vec::new(), + } + } + } + } +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum SearchProjectIndexQueueEventData { + ProjectChange { + project_id: ProjectId, + version_ids: Vec, + }, + ProjectRemoval { + project_id: ProjectId, + }, } diff --git a/apps/labrinth/src/search/indexing.rs b/apps/labrinth/src/search/indexing.rs index 77e334cee0..75e564722d 100644 --- a/apps/labrinth/src/search/indexing.rs +++ b/apps/labrinth/src/search/indexing.rs @@ -7,7 +7,7 @@ use itertools::Itertools; use regex::Regex; use std::collections::HashMap; use std::sync::LazyLock; -use tracing::{debug, info, warn}; +use tracing::{info, warn}; use crate::database::PgPool; use crate::database::models::loader_fields::{ From 876af7de3829b7ab713b28d4b1de34c7636adb86 Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Fri, 26 Jun 2026 19:46:10 +0100 Subject: [PATCH 07/11] prepare --- ...51f87424ee39aac49cae5575c0d3313fb7f24.json | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 apps/labrinth/.sqlx/query-8145d09f7c6c5d2e18a10ef814551f87424ee39aac49cae5575c0d3313fb7f24.json diff --git a/apps/labrinth/.sqlx/query-8145d09f7c6c5d2e18a10ef814551f87424ee39aac49cae5575c0d3313fb7f24.json b/apps/labrinth/.sqlx/query-8145d09f7c6c5d2e18a10ef814551f87424ee39aac49cae5575c0d3313fb7f24.json new file mode 100644 index 0000000000..59ab74bde2 --- /dev/null +++ b/apps/labrinth/.sqlx/query-8145d09f7c6c5d2e18a10ef814551f87424ee39aac49cae5575c0d3313fb7f24.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM versions WHERE mod_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8145d09f7c6c5d2e18a10ef814551f87424ee39aac49cae5575c0d3313fb7f24" +} From d78883c130f227ea2c155faaaba10c69186f83d6 Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Fri, 26 Jun 2026 20:13:21 +0100 Subject: [PATCH 08/11] more logging --- apps/labrinth/src/search/backend/typesense/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/labrinth/src/search/backend/typesense/mod.rs b/apps/labrinth/src/search/backend/typesense/mod.rs index 0f39656db2..4ba421614f 100644 --- a/apps/labrinth/src/search/backend/typesense/mod.rs +++ b/apps/labrinth/src/search/backend/typesense/mod.rs @@ -1026,10 +1026,15 @@ impl SearchBackend for Typesense { let shadow_current = self.config.get_next_collection_name(&alias, false); + debug!( + "Inserting into alias {alias:?}, live {live:?}, shadow alt {shadow_alt:?}, shadow current {shadow_current:?}" + ); + for collection in live.into_iter().chain([shadow_alt, shadow_current]) { if self.client.collection_exists(&collection).await? { + debug!("Inserting into existing collection {collection:?}"); self.client .import_documents(&collection, jsonl.clone()) .await?; @@ -1037,6 +1042,7 @@ impl SearchBackend for Typesense { } } + debug!("Done importing"); Ok(()) } From 4b7905488b3bfccee2d9cdb40cb615cd6b7f3c94 Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Fri, 26 Jun 2026 20:15:33 +0100 Subject: [PATCH 09/11] log number of docs --- apps/labrinth/src/search/backend/typesense/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/labrinth/src/search/backend/typesense/mod.rs b/apps/labrinth/src/search/backend/typesense/mod.rs index 4ba421614f..a6b2d69f6c 100644 --- a/apps/labrinth/src/search/backend/typesense/mod.rs +++ b/apps/labrinth/src/search/backend/typesense/mod.rs @@ -1016,6 +1016,7 @@ impl SearchBackend for Typesense { return Ok(()); } + let num_documents = documents.len(); let jsonl = documents_to_jsonl(documents)?; for alias in [ self.config.get_alias_name("projects"), @@ -1027,7 +1028,12 @@ impl SearchBackend for Typesense { self.config.get_next_collection_name(&alias, false); debug!( - "Inserting into alias {alias:?}, live {live:?}, shadow alt {shadow_alt:?}, shadow current {shadow_current:?}" + ?alias, + ?live, + ?shadow_alt, + ?shadow_current, + num_documents, + "Inserting into alias", ); for collection in From 674862602291047f588fc1d0a5e8b31e5cbad969 Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Sat, 27 Jun 2026 14:27:51 +0100 Subject: [PATCH 10/11] try more targeted, homogenous version change ops --- apps/labrinth/src/routes/v3/projects.rs | 34 +++++-- .../src/routes/v3/version_creation.rs | 26 ++++-- apps/labrinth/src/routes/v3/versions.rs | 8 +- apps/labrinth/src/search/incremental.rs | 49 +++++++--- .../src/search/incremental/consume.rs | 93 ++++++++++++++++--- apps/labrinth/src/search/indexing.rs | 67 ++++++++++++- 6 files changed, 229 insertions(+), 48 deletions(-) diff --git a/apps/labrinth/src/routes/v3/projects.rs b/apps/labrinth/src/routes/v3/projects.rs index 07298a9c25..ea3d9cda3b 100644 --- a/apps/labrinth/src/routes/v3/projects.rs +++ b/apps/labrinth/src/routes/v3/projects.rs @@ -82,13 +82,6 @@ pub async fn clear_project_cache_and_queue_search( slug: Option, clear_dependencies: Option, ) -> Result<(), ApiError> { - db_models::DBProject::clear_cache( - project_id, - slug, - clear_dependencies, - redis, - ) - .await?; let version_ids = sqlx::query_scalar!( "SELECT id FROM versions WHERE mod_id = $1", project_id as db_ids::DBProjectId, @@ -100,6 +93,33 @@ pub async fn clear_project_cache_and_queue_search( .map(|version_id| VersionId::from(db_ids::DBVersionId(version_id))) .collect::>(); + clear_project_cache_and_queue_search_versions( + redis, + search_state, + project_id, + slug, + clear_dependencies, + version_ids, + ) + .await +} + +pub async fn clear_project_cache_and_queue_search_versions( + redis: &RedisPool, + search_state: &SearchState, + project_id: db_ids::DBProjectId, + slug: Option, + clear_dependencies: Option, + version_ids: impl IntoIterator, +) -> Result<(), ApiError> { + db_models::DBProject::clear_cache( + project_id, + slug, + clear_dependencies, + redis, + ) + .await?; + search_state .queue .push(project_id.into(), version_ids) diff --git a/apps/labrinth/src/routes/v3/version_creation.rs b/apps/labrinth/src/routes/v3/version_creation.rs index f9746a9a63..a1a3424418 100644 --- a/apps/labrinth/src/routes/v3/version_creation.rs +++ b/apps/labrinth/src/routes/v3/version_creation.rs @@ -145,20 +145,20 @@ pub async fn version_create( if let Err(e) = rollback_result { return Err(e.into()); } - } else if let Ok((_, project_id)) = &result { + } else if let Ok((_, project_id, version_id)) = &result { transaction.commit().await?; - super::projects::clear_project_cache_and_queue_search( - &client, + super::projects::clear_project_cache_and_queue_search_versions( &redis, &search_state, *project_id, None, Some(true), + [VersionId::from(*version_id)], ) .await?; } - result.map(|(response, _)| response) + result.map(|(response, _, _)| response) } #[allow(clippy::too_many_arguments)] @@ -173,7 +173,8 @@ async fn version_create_inner( session_queue: &AuthQueue, moderation_queue: &AutomatedModerationQueue, http: &reqwest::Client, -) -> Result<(HttpResponse, models::DBProjectId), CreateError> { +) -> Result<(HttpResponse, models::DBProjectId, models::DBVersionId), CreateError> +{ let mut initial_version_data = None; let mut version_builder = None; let mut selected_loaders = None; @@ -544,7 +545,11 @@ async fn version_create_inner( moderation_queue.projects.insert(project_id.into()); } - Ok((HttpResponse::Ok().json(response), project_id)) + Ok(( + HttpResponse::Ok().json(response), + project_id, + models::DBVersionId::from(version_id), + )) } pub async fn upload_file_to_version( @@ -561,7 +566,8 @@ pub async fn upload_file_to_version( let mut transaction = client.begin().await?; let mut uploaded_files = Vec::new(); - let version_id = models::DBVersionId::from(url_data.into_inner().0); + let version_id = url_data.into_inner().0; + let db_version_id = models::DBVersionId::from(version_id); let result = upload_file_to_version_inner( req, @@ -571,7 +577,7 @@ pub async fn upload_file_to_version( redis.clone(), &**file_host, &mut uploaded_files, - version_id, + db_version_id, &session_queue, &http, ) @@ -591,13 +597,13 @@ pub async fn upload_file_to_version( } } else if let Ok((_, project_id)) = &result { transaction.commit().await?; - super::projects::clear_project_cache_and_queue_search( - &client, + super::projects::clear_project_cache_and_queue_search_versions( &redis, &search_state, *project_id, None, Some(true), + [version_id], ) .await?; } diff --git a/apps/labrinth/src/routes/v3/versions.rs b/apps/labrinth/src/routes/v3/versions.rs index 155e754d67..c9b8cda4b3 100644 --- a/apps/labrinth/src/routes/v3/versions.rs +++ b/apps/labrinth/src/routes/v3/versions.rs @@ -761,13 +761,13 @@ pub async fn version_edit_helper( transaction.commit().await?; database::models::DBVersion::clear_cache(&version_item, &redis) .await?; - super::projects::clear_project_cache_and_queue_search( - &pool, + super::projects::clear_project_cache_and_queue_search_versions( &redis, &search_state, version_item.inner.project_id, None, Some(true), + [VersionId::from(version_item.inner.id)], ) .await?; Ok(HttpResponse::NoContent().body("")) @@ -1098,13 +1098,13 @@ pub async fn version_delete( transaction.commit().await?; - super::projects::clear_project_cache_and_queue_search( - &pool, + super::projects::clear_project_cache_and_queue_search_versions( &redis, &search_state, version.inner.project_id, None, Some(true), + [VersionId::from(version.inner.id)], ) .await?; search_backend diff --git a/apps/labrinth/src/search/incremental.rs b/apps/labrinth/src/search/incremental.rs index 1c57a57e9f..b5ba4f0d0a 100644 --- a/apps/labrinth/src/search/incremental.rs +++ b/apps/labrinth/src/search/incremental.rs @@ -111,13 +111,16 @@ impl IncrementalSearchQueue { #[derive(Default)] struct PendingSearchIndexOperations { - changed_projects: HashMap>, + changed_project_ids: HashSet, + changed_project_versions: HashMap>, removed_project_ids: HashSet, } impl PendingSearchIndexOperations { fn is_empty(&self) -> bool { - self.changed_projects.is_empty() && self.removed_project_ids.is_empty() + self.changed_project_ids.is_empty() + && self.changed_project_versions.is_empty() + && self.removed_project_ids.is_empty() } fn push_project_change( @@ -126,24 +129,38 @@ impl PendingSearchIndexOperations { version_ids: impl IntoIterator, ) { if !self.removed_project_ids.contains(&project_id) { - self.changed_projects - .entry(project_id) - .or_default() - .extend(version_ids); + let version_ids = version_ids.into_iter().collect::>(); + if version_ids.is_empty() { + self.changed_project_versions.remove(&project_id); + self.changed_project_ids.insert(project_id); + } else if !self.changed_project_ids.contains(&project_id) { + self.changed_project_versions + .entry(project_id) + .or_default() + .extend(version_ids); + } } } fn push_project_removal(&mut self, project_id: ProjectId) { - self.changed_projects.remove(&project_id); + self.changed_project_ids.remove(&project_id); + self.changed_project_versions.remove(&project_id); self.removed_project_ids.insert(project_id); } fn push_event(&mut self, event: SearchProjectIndexQueueEventData) { match event { - SearchProjectIndexQueueEventData::ProjectChange { + SearchProjectIndexQueueEventData::ProjectChange { project_id } => { + self.push_project_change(project_id, []) + } + SearchProjectIndexQueueEventData::ProjectVersionChange { project_id, version_ids, - } => self.push_project_change(project_id, version_ids), + } => { + if !version_ids.is_empty() { + self.push_project_change(project_id, version_ids) + } + } SearchProjectIndexQueueEventData::ProjectRemoval { project_id } => { self.push_project_removal(project_id) } @@ -152,15 +169,20 @@ impl PendingSearchIndexOperations { fn into_events(self) -> Vec { let mut events = Vec::with_capacity( - self.changed_projects.len() + self.removed_project_ids.len(), + self.changed_project_ids.len() + + self.changed_project_versions.len() + + self.removed_project_ids.len(), ); events.extend(self.removed_project_ids.into_iter().map(|project_id| { SearchProjectIndexQueueEventData::ProjectRemoval { project_id } })); - events.extend(self.changed_projects.into_iter().map( + events.extend(self.changed_project_ids.into_iter().map(|project_id| { + SearchProjectIndexQueueEventData::ProjectChange { project_id } + })); + events.extend(self.changed_project_versions.into_iter().map( |(project_id, version_ids)| { - SearchProjectIndexQueueEventData::ProjectChange { + SearchProjectIndexQueueEventData::ProjectVersionChange { project_id, version_ids: version_ids.into_iter().collect(), } @@ -176,6 +198,9 @@ impl PendingSearchIndexOperations { pub enum SearchProjectIndexQueueEventData { ProjectChange { project_id: ProjectId, + }, + ProjectVersionChange { + project_id: ProjectId, version_ids: Vec, }, ProjectRemoval { diff --git a/apps/labrinth/src/search/incremental/consume.rs b/apps/labrinth/src/search/incremental/consume.rs index 9713185c8a..5128712bc8 100644 --- a/apps/labrinth/src/search/incremental/consume.rs +++ b/apps/labrinth/src/search/incremental/consume.rs @@ -17,8 +17,9 @@ use crate::{ env::ENV, models::ids::{ProjectId, VersionId}, search::{ - SearchBackend, incremental::SEARCH_PROJECT_INDEX_QUEUE_TOPIC, - indexing::index_project_documents, + SearchBackend, + incremental::SEARCH_PROJECT_INDEX_QUEUE_TOPIC, + indexing::{index_project_documents, index_project_version_documents}, }, util::kafka::{ INCREMENTAL_INDEX_SEARCH_TASK, KAFKA_OPERATION_INTERVAL, @@ -129,6 +130,7 @@ async fn consume_batch( let start = Instant::now(); let mut project_ids_to_change = HashSet::new(); + let mut project_ids_with_version_changes = HashSet::new(); let mut project_ids_to_remove = HashSet::new(); let mut version_ids_to_change = HashSet::new(); let mut messages_to_commit = Vec::new(); @@ -166,12 +168,17 @@ async fn consume_batch( }; match event.into_data() { - SearchProjectIndexQueueEventData::ProjectChange { + SearchProjectIndexQueueEventData::ProjectChange { project_id } => { + project_ids_to_change.insert(project_id); + } + SearchProjectIndexQueueEventData::ProjectVersionChange { project_id, version_ids, } => { - project_ids_to_change.insert(project_id); - version_ids_to_change.extend(version_ids); + if !version_ids.is_empty() { + project_ids_with_version_changes.insert(project_id); + version_ids_to_change.extend(version_ids); + } } SearchProjectIndexQueueEventData::ProjectRemoval { project_id } => { project_ids_to_remove.insert(project_id); @@ -182,9 +189,14 @@ async fn consume_batch( project_ids_to_change .retain(|project_id| !project_ids_to_remove.contains(project_id)); + project_ids_with_version_changes + .retain(|project_id| !project_ids_to_remove.contains(project_id)); let project_ids_to_change = project_ids_to_change.into_iter().collect::>(); + let project_ids_with_version_changes = project_ids_with_version_changes + .into_iter() + .collect::>(); let project_ids_to_remove = project_ids_to_remove.into_iter().collect::>(); let version_ids_to_change = @@ -192,9 +204,10 @@ async fn consume_batch( info!( kafka.message_count = messages_to_commit.len(), - "Read all Kafka messages in {:.2?}, found {} projects to change, {} versions to change, and {} projects to remove", + "Read all Kafka messages in {:.2?}, found {} projects to change, {} projects with version changes, {} versions to change, and {} projects to remove", start.elapsed(), project_ids_to_change.len(), + project_ids_with_version_changes.len(), version_ids_to_change.len(), project_ids_to_remove.len(), ); @@ -221,7 +234,7 @@ async fn consume_batch( let operation_start = Instant::now(); info!( version_count = version_ids_to_change.len(), - "Removing changed version documents" + "Removing changed version documents", ); search_backend .remove_documents(&version_ids_to_change) @@ -234,6 +247,30 @@ async fn consume_batch( ); } + if !project_ids_with_version_changes.is_empty() { + let operation_start = Instant::now(); + info!( + project_count = project_ids_with_version_changes.len(), + version_count = version_ids_to_change.len(), + "Indexing changed project versions" + ); + index_changed_project_versions( + ro_pool, + redis_pool, + search_backend, + &project_ids_with_version_changes, + &version_ids_to_change, + ) + .await + .wrap_err("failed to index changed project version batch")?; + info!( + project_count = project_ids_with_version_changes.len(), + version_count = version_ids_to_change.len(), + "Indexed changed project versions in {:.2?}", + operation_start.elapsed() + ); + } + if !project_ids_to_change.is_empty() { let operation_start = Instant::now(); info!( @@ -319,6 +356,40 @@ async fn index_changed_projects( Ok(()) } +async fn index_changed_project_versions( + ro_pool: &PgPool, + redis_pool: &RedisPool, + search_backend: &dyn SearchBackend, + project_ids: &[ProjectId], + version_ids: &[VersionId], +) -> eyre::Result<()> { + let documents = index_project_version_documents( + ro_pool, + redis_pool, + project_ids, + version_ids, + ) + .instrument(info_span!( + "index", + batch_size = project_ids.len(), + version_count = version_ids.len() + )) + .await + .wrap_err_with(|| { + format!( + "failed to build search documents for {} projects and {} versions", + project_ids.len(), + version_ids.len() + ) + })?; + + info!("Fetched all project version documents, indexing into backend"); + + search_backend.index_documents(&documents).await?; + + Ok(()) +} + #[derive(Debug, Deserialize)] #[serde(untagged)] enum SearchProjectIndexQueueEvent { @@ -331,10 +402,7 @@ impl SearchProjectIndexQueueEvent { match self { Self::Current(data) => data, Self::Legacy { project_id } => { - SearchProjectIndexQueueEventData::ProjectChange { - project_id, - version_ids: Vec::new(), - } + SearchProjectIndexQueueEventData::ProjectChange { project_id } } } } @@ -345,6 +413,9 @@ impl SearchProjectIndexQueueEvent { enum SearchProjectIndexQueueEventData { ProjectChange { project_id: ProjectId, + }, + ProjectVersionChange { + project_id: ProjectId, version_ids: Vec, }, ProjectRemoval { diff --git a/apps/labrinth/src/search/indexing.rs b/apps/labrinth/src/search/indexing.rs index 75e564722d..4ebe510abd 100644 --- a/apps/labrinth/src/search/indexing.rs +++ b/apps/labrinth/src/search/indexing.rs @@ -5,7 +5,7 @@ use futures::TryStreamExt; use heck::ToKebabCase; use itertools::Itertools; use regex::Regex; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::LazyLock; use tracing::{info, warn}; @@ -20,7 +20,7 @@ use crate::database::models::{ }; use crate::database::redis::RedisPool; use crate::models::exp; -use crate::models::ids::ProjectId; +use crate::models::ids::{ProjectId, VersionId}; use crate::models::projects::{DependencyType, from_duplicate_version_fields}; use crate::models::v2::projects::LegacyProject; use crate::routes::v2_reroute; @@ -114,7 +114,8 @@ pub async fn index_local( return Ok((vec![], i64::MAX)); }; - let uploads = build_search_documents(pool, redis, db_projects).await?; + let uploads = + build_search_documents(pool, redis, db_projects, None).await?; Ok((uploads, *largest)) } @@ -163,13 +164,65 @@ pub async fn index_project_documents( info!("Fetched partial projects"); - build_search_documents(pool, redis, db_projects).await + build_search_documents(pool, redis, db_projects, None).await +} + +pub async fn index_project_version_documents( + pool: &PgPool, + redis: &RedisPool, + project_ids: &[ProjectId], + version_ids: &[VersionId], +) -> eyre::Result> { + let searchable_statuses = searchable_statuses(); + let project_ids = project_ids + .iter() + .map(|project_id| DBProjectId::from(*project_id).0) + .collect::>(); + let version_ids = version_ids + .iter() + .map(|version_id| DBVersionId::from(*version_id)) + .collect::>(); + + let db_projects = sqlx::query!( + r#" + SELECT m.id id, m.name name, m.summary summary, m.downloads downloads, m.follows follows, + m.icon_url icon_url, m.updated updated, m.approved approved, m.published, m.license license, m.slug slug, m.color, + m.components AS "components: sqlx::types::Json" + FROM mods m + WHERE m.status = ANY($1) AND m.id = ANY($2) + GROUP BY m.id + ORDER BY m.id ASC; + "#, + &searchable_statuses, + &project_ids, + ) + .fetch(pool) + .map_ok(|m| PartialProject { + id: DBProjectId(m.id), + name: m.name, + summary: m.summary, + downloads: m.downloads, + follows: m.follows, + icon_url: m.icon_url, + updated: m.updated, + approved: m.approved.unwrap_or(m.published), + slug: m.slug, + color: m.color, + license: m.license, + components: m.components.0, + }) + .try_collect::>() + .await + .wrap_err("failed to fetch project")?; + + build_search_documents(pool, redis, db_projects, Some(&version_ids)).await } async fn build_search_documents( pool: &PgPool, redis: &RedisPool, db_projects: Vec, + version_ids_to_index: Option<&HashSet>, ) -> eyre::Result> { let searchable_statuses = searchable_statuses(); let project_ids = db_projects.iter().map(|x| x.id.0).collect::>(); @@ -501,6 +554,12 @@ async fn build_search_documents( .collect::>(); for version in versions { + if let Some(version_ids_to_index) = version_ids_to_index + && !version_ids_to_index.contains(&version.id) + { + continue; + } + let version_fields = VersionField::from_query_json( version.version_fields, &loader_fields, From e804ab0f79ce718517ffe97f82eec125ee755405 Mon Sep 17 00:00:00 2001 From: aecsocket <43144841+aecsocket@users.noreply.github.com> Date: Sat, 27 Jun 2026 14:35:02 +0100 Subject: [PATCH 11/11] ensure only necessary fields are serialized into typesense --- apps/labrinth/src/search/mod.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/apps/labrinth/src/search/mod.rs b/apps/labrinth/src/search/mod.rs index 9507aafc2e..d88a8ff656 100644 --- a/apps/labrinth/src/search/mod.rs +++ b/apps/labrinth/src/search/mod.rs @@ -232,16 +232,22 @@ impl FromStr for SearchBackendKind { } } +/// Nullable fields in Typesense-bound documents should use +/// `skip_serializing_if = "Option::is_none"` so they are omitted instead of +/// serialized as `null`. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct UploadSearchProject { pub version_id: String, pub project_id: String, // pub project_types: Vec, + #[serde(skip_serializing_if = "Option::is_none")] pub slug: Option, pub author: String, pub author_id: String, + #[serde(skip_serializing_if = "Option::is_none")] pub organization: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub organization_id: Option, pub indexed_author: String, pub name: String, @@ -252,9 +258,11 @@ pub struct UploadSearchProject { pub follows: i32, pub downloads: i32, pub log_downloads: f64, + #[serde(skip_serializing_if = "Option::is_none")] pub icon_url: Option, pub license: String, pub gallery: Vec, + #[serde(skip_serializing_if = "Option::is_none")] pub featured_gallery: Option, /// RFC 3339 formatted creation date of the project pub date_created: DateTime, @@ -267,6 +275,7 @@ pub struct UploadSearchProject { /// Unix timestamp of the publication date of the version pub version_published_timestamp: i64, pub open_source: bool, + #[serde(skip_serializing_if = "Option::is_none")] pub color: Option, #[serde(default)] pub dependency_project_ids: Vec, @@ -285,12 +294,17 @@ pub struct UploadSearchProject { pub loader_fields: HashMap>, } +/// Nullable fields in Typesense-bound documents should use +/// `skip_serializing_if = "Option::is_none"` so they are omitted instead of +/// serialized as `null`. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct SearchProjectDependency { pub project_id: String, pub dependency_type: DependencyType, pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] pub slug: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub icon_url: Option, }