From 9167bb26d4624d230db81e317856e8823a60ac04 Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 23 Jun 2026 15:03:53 +0200 Subject: [PATCH 1/4] add a fast constructor if all items are available in one go --- crates/geo_filters/evaluation/performance.rs | 47 ++++++- crates/geo_filters/src/config/lookup.rs | 16 ++- crates/geo_filters/src/diff_count.rs | 124 +++++++++++++++++++ crates/geo_filters/src/diff_count/bitvec.rs | 28 +++++ 4 files changed, 209 insertions(+), 6 deletions(-) diff --git a/crates/geo_filters/evaluation/performance.rs b/crates/geo_filters/evaluation/performance.rs index 77a0ebd..35dc675 100644 --- a/crates/geo_filters/evaluation/performance.rs +++ b/crates/geo_filters/evaluation/performance.rs @@ -1,9 +1,10 @@ +use std::hash::BuildHasher; use std::hint::black_box; use criterion::{criterion_group, criterion_main, Criterion}; use geo_filters::build_hasher::UnstableDefaultBuildHasher; use geo_filters::config::VariableConfig; -use geo_filters::diff_count::{GeoDiffCount, GeoDiffCount13}; +use geo_filters::diff_count::{GeoDiffCount, GeoDiffCount13, GeoDiffCount7}; use geo_filters::distinct_count::GeoDistinctCount13; use geo_filters::evaluation::hll::Hll14; use geo_filters::Count; @@ -130,6 +131,50 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); } + + // Compare building a diff filter from a precomputed slice of hashes one by one + // (`push_hash`) versus in a single batch (`from_hashes`). The hashes are precomputed so that + // only the construction cost is measured. + for size in [1000usize, 10000, 100000, 1000000] { + let mut group = c.benchmark_group(format!("construct:{size}")); + let build_hasher = UnstableDefaultBuildHasher::default(); + let hashes: Vec = (0..size).map(|i| build_hasher.hash_one(i)).collect(); + + group.bench_function("geo_diff_count_7_push", |b| { + b.iter(|| { + let mut gc = GeoDiffCount7::default(); + for &hash in &hashes { + gc.push_hash(hash); + } + black_box(&gc); + }) + }); + group.bench_function("geo_diff_count_7_from_hashes", |b| { + b.iter(|| { + black_box(GeoDiffCount7::from_hashes( + Default::default(), + hashes.iter().copied(), + )); + }) + }); + group.bench_function("geo_diff_count_13_push", |b| { + b.iter(|| { + let mut gc = GeoDiffCount13::default(); + for &hash in &hashes { + gc.push_hash(hash); + } + black_box(&gc); + }) + }); + group.bench_function("geo_diff_count_13_from_hashes", |b| { + b.iter(|| { + black_box(GeoDiffCount13::from_hashes( + Default::default(), + hashes.iter().copied(), + )); + }) + }); + } } criterion_group!(benches, criterion_benchmark); diff --git a/crates/geo_filters/src/config/lookup.rs b/crates/geo_filters/src/config/lookup.rs index 67d4d1c..b97600b 100644 --- a/crates/geo_filters/src/config/lookup.rs +++ b/crates/geo_filters/src/config/lookup.rs @@ -2,22 +2,24 @@ use crate::config::phi_f64; pub(crate) struct HashToBucketLookup { b: usize, - buckets: Vec<(usize, usize)>, + buckets: Vec<(u32, u32)>, } impl HashToBucketLookup { pub(crate) fn new(b: usize) -> Self { - let mut buckets = vec![(0, 0); 2 << b]; + let mut buckets = vec![(0u32, 0u32); 2 << b]; let mut last_filled_bucket = buckets.len(); let phi = phi_f64(b); for bucket in 0..(1 << b) { let lower_bucket_limit = phi.powf((bucket + 1) as f64); + // `lower_hash_limit` is a 32-bit hash threshold: `lower_bucket_limit` lies in + // `[0.5, 1)`, so this value is always in `[0, 2^32)` and fits losslessly into a `u32`. let lower_hash_limit = ((lower_bucket_limit - 0.5) * 2.0f64.powf(33.0)) as usize; let lower_hash_bucket = lower_hash_limit >> (32 - b - 1); assert!(lower_hash_bucket < last_filled_bucket); while last_filled_bucket > lower_hash_bucket { last_filled_bucket -= 1; - buckets[last_filled_bucket] = (bucket, lower_hash_limit); + buckets[last_filled_bucket] = (bucket as u32, lower_hash_limit as u32); } } assert_eq!(last_filled_bucket, 0); @@ -38,8 +40,12 @@ impl HashToBucketLookup { } & 0xFFFFFFFF) as usize; // From those, the first B bits determine the bucket index in our lookup table. let idx = hash >> (32 - self.b - 1); - let offset = (hash < self.buckets[idx].1) as usize; - offset + self.buckets[idx].0 + (1 << self.b) * levels + // SAFETY: `hash` was masked to 32 bits, so `idx = hash >> (31 - b)` holds at most `b + 1` + // significant bits and is therefore always `< 2^(b+1) == 2 << b == self.buckets.len()`. + debug_assert!(idx < self.buckets.len()); + let (base, threshold) = *unsafe { self.buckets.get_unchecked(idx) }; + let offset = (hash < threshold as usize) as usize; + offset + base as usize + (1 << self.b) * levels } } diff --git a/crates/geo_filters/src/diff_count.rs b/crates/geo_filters/src/diff_count.rs index 728cd1f..c3474dc 100644 --- a/crates/geo_filters/src/diff_count.rs +++ b/crates/geo_filters/src/diff_count.rs @@ -120,6 +120,55 @@ impl<'a, C: GeoConfig> GeoDiffCount<'a, C> { result } + /// Constructs a [`GeoDiffCount`] directly from an iterator of item hashes. + /// + /// Inserting hashes one by one via [`Count::push_hash`] repeatedly toggles individual bits + /// and rebalances the sparse/dense split, which is wasteful when the hashes are known up + /// front. Instead, this function uses the (exact) number of hashes to estimate the split + /// point between the sparsely stored most-significant buckets (the "numbers") and the densely + /// stored least-significant buckets (the "bits"): + /// + /// * Buckets at or above the estimated split point are collected, sorted, and reduced + /// modulo two so that buckets occurring an even number of times cancel out. This matches + /// the symmetric-difference semantics of [`Diff`] and only sorts the comparatively few + /// most-significant buckets. + /// * Buckets below the split point are folded directly into a dense bit vector, where + /// repeated buckets cancel out as well. + /// + /// Both pieces are then combined into a single descending stream of one bits and handed to + /// [`Self::from_bit_chunks`], which re-establishes the filter invariants. The resulting filter + /// is identical to the one produced by pushing the same hashes one by one. + pub fn from_hashes(config: C, hashes: impl ExactSizeIterator) -> Self { + let split = estimate_split_bucket(&config, hashes.len()); + + // Buckets >= split are sparse and collected for sorting; buckets < split are dense and + // folded directly into the bit vector, where repeated buckets cancel via xor. The toggler + // resolves the bit vector's owned storage once, keeping the per-bit work out of the loop. + let mut numbers = Vec::new(); + let mut bits = BitVec::default(); + bits.resize(split); + { + let mut toggler = bits.toggler(); + for hash in hashes { + let bucket = config.hash_to_bucket(hash).into_usize(); + if bucket >= split { + numbers.push(bucket); + } else { + toggler.toggle(bucket); + } + } + } + + // Sort the most-significant buckets and drop those occurring an even number of times. + numbers.sort_unstable_by(|a, b| b.cmp(a)); + erase_even_occurrences(&mut numbers); + + Self::from_bit_chunks( + config, + iter_bit_chunks(numbers.into_iter(), bits.bit_chunks()), + ) + } + /// Compare two geometric filters after applying the specified mask. /// /// To reduce the number of operations, the implementation first xors the bit chunks together, @@ -429,6 +478,47 @@ pub(crate) fn xor>( ) } +/// Estimates the bucket id that separates the sparse most-significant buckets ("numbers") from +/// the dense least-significant buckets ("bits") for a filter built from `n` hashes. +/// +/// The expected number of hashes falling into buckets `>= s` is `n * phi^s`. We pick `s` such +/// that this is a small multiple of the most-significant bucket budget, so that after parity +/// reduction the most-significant buckets are almost always supplied by the collected numbers. +/// Choosing a slightly lower split only makes the collected set a bit larger; correctness does +/// not depend on the estimate, since [`GeoDiffCount::from_bit_chunks`] re-splits the combined +/// stream regardless. +fn estimate_split_bucket>(config: &C, n: usize) -> usize { + let target = 2 * config.max_msb_len(); + if n <= target { + return 0; + } + let ratio = target as f64 / n as f64; + let split = (ratio.ln() / config.phi_f64().ln()).floor() as usize; + // No bucket can ever exceed this bound, so never allocate a larger bit vector. + split.min(64 * config.bits_per_level()) +} + +/// Given a slice sorted in descending order, keeps a single copy of every value occurring an odd +/// number of times and drops every value occurring an even number of times. The retained values +/// stay sorted in descending order. +fn erase_even_occurrences(values: &mut Vec) { + let mut write = 0; + let mut read = 0; + while read < values.len() { + let value = values[read]; + let mut count = 0; + while read < values.len() && values[read] == value { + read += 1; + count += 1; + } + if count % 2 == 1 { + values[write] = value; + write += 1; + } + } + values.truncate(write); +} + impl> Count for GeoDiffCount<'_, C> { fn push_hash(&mut self, hash: u64) { self.xor_bit(self.config.hash_to_bucket(hash)); @@ -542,6 +632,40 @@ mod tests { assert_eq!(m.iter_ones().count(), 101); } + /// Building a filter from an iterator of hashes must produce exactly the same filter as + /// pushing those hashes one by one. + #[test] + fn test_from_hashes() { + fn assert_from_hashes_matches + Default>(hashes: &[u64], n: usize) { + let mut expected: GeoDiffCount<'_, C> = GeoDiffCount::new(C::default()); + for &hash in hashes { + expected.push_hash(hash); + } + let actual: GeoDiffCount<'_, C> = + GeoDiffCount::from_hashes(C::default(), hashes.iter().copied()); + assert_eq!(expected, actual, "filter mismatch for n = {n}"); + assert_eq!( + expected.iter_ones().collect_vec(), + actual.iter_ones().collect_vec(), + "ones mismatch for n = {n}", + ); + } + + prng_test_harness(4, |rnd| { + for n in [0usize, 1, 5, 50, 500, 5000, 50000] { + // Draw from a smaller pool so that buckets are hit multiple times, exercising the + // even-occurrence cancellation on both the dense bits and the sparse numbers path. + let pool: Vec = (0..n.div_ceil(2).max(1)).map(|_| rnd.next_u64()).collect(); + let hashes: Vec = (0..n) + .map(|_| *pool.iter().choose(rnd).expect("pool is non-empty")) + .collect(); + + assert_from_hashes_matches::(&hashes, n); + assert_from_hashes_matches::(&hashes, n); + } + }); + } + #[test] fn test_estimate_fast() { prng_test_harness(1, |rnd| { diff --git a/crates/geo_filters/src/diff_count/bitvec.rs b/crates/geo_filters/src/diff_count/bitvec.rs index f77323c..1317868 100644 --- a/crates/geo_filters/src/diff_count/bitvec.rs +++ b/crates/geo_filters/src/diff_count/bitvec.rs @@ -90,6 +90,17 @@ impl BitVec<'_> { self.blocks.to_mut()[block_idx] ^= bit_idx.into_block(); } + /// Returns a [`BitToggler`] that toggles many bits without re-resolving the `Cow` on every + /// access. [`Self::toggle`] resolves `self.blocks.to_mut()` on every call, which keeps a + /// branch in the caller's hot loop even when the storage is already owned; resolving it once + /// up front avoids that overhead when toggling a large number of bits. + pub fn toggler(&mut self) -> BitToggler<'_> { + BitToggler { + num_bits: self.num_bits, + blocks: self.blocks.to_mut(), + } + } + /// Returns an iterator over all blocks in reverse order. /// The blocks are represented as `BitChunk`s. pub fn bit_chunks(&self) -> impl Iterator + '_ { @@ -200,6 +211,23 @@ impl Index for BitVec<'_> { } } +/// Toggles bits in an already-owned [`BitVec`] without re-resolving the `Cow` on every call. +/// Obtained via [`BitVec::toggler`]. +pub(crate) struct BitToggler<'a> { + num_bits: usize, + blocks: &'a mut [u64], +} + +impl BitToggler<'_> { + /// Toggles the bit at the given zero-based position. The position must be `< num_bits`. + #[inline] + pub fn toggle(&mut self, index: usize) { + debug_assert!(index < self.num_bits); + let (block_idx, bit_idx) = index.into_index_and_bit(); + self.blocks[block_idx] ^= bit_idx.into_block(); + } +} + #[cfg(test)] mod tests { use super::*; From 52b25c661bbedac50b2fab25f3f6e30f3f466f60 Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Tue, 23 Jun 2026 15:25:17 +0200 Subject: [PATCH 2/4] tune split point --- crates/geo_filters/src/diff_count.rs | 38 +++++++++++++++++----------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/crates/geo_filters/src/diff_count.rs b/crates/geo_filters/src/diff_count.rs index c3474dc..59e26d6 100644 --- a/crates/geo_filters/src/diff_count.rs +++ b/crates/geo_filters/src/diff_count.rs @@ -139,12 +139,12 @@ impl<'a, C: GeoConfig> GeoDiffCount<'a, C> { /// [`Self::from_bit_chunks`], which re-establishes the filter invariants. The resulting filter /// is identical to the one produced by pushing the same hashes one by one. pub fn from_hashes(config: C, hashes: impl ExactSizeIterator) -> Self { - let split = estimate_split_bucket(&config, hashes.len()); + let (split, capacity) = estimate_split_bucket(&config, hashes.len()); // Buckets >= split are sparse and collected for sorting; buckets < split are dense and // folded directly into the bit vector, where repeated buckets cancel via xor. The toggler // resolves the bit vector's owned storage once, keeping the per-bit work out of the loop. - let mut numbers = Vec::new(); + let mut numbers = Vec::with_capacity(capacity); let mut bits = BitVec::default(); bits.resize(split); { @@ -479,23 +479,31 @@ pub(crate) fn xor>( } /// Estimates the bucket id that separates the sparse most-significant buckets ("numbers") from -/// the dense least-significant buckets ("bits") for a filter built from `n` hashes. +/// the dense least-significant buckets ("bits") for a filter built from `n` hashes, and a capacity +/// to preallocate the `numbers` buffer with (the expected count at or above the split, plus +/// headroom for variance). /// -/// The expected number of hashes falling into buckets `>= s` is `n * phi^s`. We pick `s` such -/// that this is a small multiple of the most-significant bucket budget, so that after parity -/// reduction the most-significant buckets are almost always supplied by the collected numbers. -/// Choosing a slightly lower split only makes the collected set a bit larger; correctness does -/// not depend on the estimate, since [`GeoDiffCount::from_bit_chunks`] re-splits the combined -/// stream regardless. -fn estimate_split_bucket>(config: &C, n: usize) -> usize { - let target = 2 * config.max_msb_len(); +/// The expected number of hashes falling into buckets `>= s` is `n * phi^s`. We target about +/// `max_msb_len / 2` such hashes: the most-significant buckets do *not* need to be fully supplied +/// by the collected numbers, since [`GeoDiffCount::from_bit_chunks`] re-splits the combined stream +/// and pulls the remainder from the dense bits. Because the buckets are geometric, raising the +/// split by one `bits_per_level` roughly halves the collected set, so a small target keeps the +/// sort cheap while only marginally enlarging the bit vector. Correctness does not depend on the +/// estimate. +fn estimate_split_bucket>(config: &C, n: usize) -> (usize, usize) { + let target = config.max_msb_len() / 2; + // The count at or above the split is ~`target` but varies (std ~sqrt(target)), so reserve + // twice as much to avoid reallocating `numbers` -- but never more than the total hash count. + let capacity = (2 * target).min(n); if n <= target { - return 0; + // Every hash ends up in `numbers` (split == 0). + return (0, capacity); } let ratio = target as f64 / n as f64; - let split = (ratio.ln() / config.phi_f64().ln()).floor() as usize; - // No bucket can ever exceed this bound, so never allocate a larger bit vector. - split.min(64 * config.bits_per_level()) + let split = ((ratio.ln() / config.phi_f64().ln()).floor() as usize) + // No bucket can ever exceed this bound, so never allocate a larger bit vector. + .min(64 * config.bits_per_level()); + (split, capacity) } /// Given a slice sorted in descending order, keeps a single copy of every value occurring an odd From e1654571ddf0820c9d4ab9bc761a41be2d470f6d Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Wed, 24 Jun 2026 12:16:34 +0200 Subject: [PATCH 3/4] introduce a more generic extend function --- crates/geo_filters/evaluation/performance.rs | 68 +++++- crates/geo_filters/src/config/bitchunks.rs | 79 ++++++- crates/geo_filters/src/diff_count.rs | 227 +++++++++++++------ 3 files changed, 294 insertions(+), 80 deletions(-) diff --git a/crates/geo_filters/evaluation/performance.rs b/crates/geo_filters/evaluation/performance.rs index 35dc675..ae4b6d7 100644 --- a/crates/geo_filters/evaluation/performance.rs +++ b/crates/geo_filters/evaluation/performance.rs @@ -133,8 +133,8 @@ fn criterion_benchmark(c: &mut Criterion) { } // Compare building a diff filter from a precomputed slice of hashes one by one - // (`push_hash`) versus in a single batch (`from_hashes`). The hashes are precomputed so that - // only the construction cost is measured. + // (`push_hash`) versus in a single batch (`extend_by_hashes` on an empty filter). The hashes + // are precomputed so that only the construction cost is measured. for size in [1000usize, 10000, 100000, 1000000] { let mut group = c.benchmark_group(format!("construct:{size}")); let build_hasher = UnstableDefaultBuildHasher::default(); @@ -149,12 +149,11 @@ fn criterion_benchmark(c: &mut Criterion) { black_box(&gc); }) }); - group.bench_function("geo_diff_count_7_from_hashes", |b| { + group.bench_function("geo_diff_count_7_extend", |b| { b.iter(|| { - black_box(GeoDiffCount7::from_hashes( - Default::default(), - hashes.iter().copied(), - )); + let mut gc = GeoDiffCount7::default(); + gc.extend_by_hashes(hashes.iter().copied()); + black_box(&gc); }) }); group.bench_function("geo_diff_count_13_push", |b| { @@ -166,15 +165,60 @@ fn criterion_benchmark(c: &mut Criterion) { black_box(&gc); }) }); - group.bench_function("geo_diff_count_13_from_hashes", |b| { + group.bench_function("geo_diff_count_13_extend", |b| { b.iter(|| { - black_box(GeoDiffCount13::from_hashes( - Default::default(), - hashes.iter().copied(), - )); + let mut gc = GeoDiffCount13::default(); + gc.extend_by_hashes(hashes.iter().copied()); + black_box(&gc); }) }); } + + // Insert a batch of hashes into an existing filter: one-by-one `push_hash` vs the batched + // `extend_by_hashes`. The existing filter is cloned in the (untimed) batched setup so that only + // the insertion cost is measured, not the clone. + for (existing_n, batch_n) in [ + (10000usize, 10000usize), + (100000, 10000), + (100000, 100000), + (1000, 100000), + ] { + let mut group = c.benchmark_group(format!("insert_batch:{existing_n}+{batch_n}")); + let build_hasher = UnstableDefaultBuildHasher::default(); + let batch: Vec = (existing_n..existing_n + batch_n) + .map(|i| build_hasher.hash_one(i)) + .collect(); + let base = { + let mut gc = GeoDiffCount13::default(); + for i in 0..existing_n { + gc.push_hash(build_hasher.hash_one(i)); + } + gc + }; + + group.bench_function("push", |b| { + b.iter_batched( + || base.clone(), + |mut gc| { + for &hash in &batch { + gc.push_hash(hash); + } + gc + }, + criterion::BatchSize::SmallInput, + ) + }); + group.bench_function("extend_by_hashes", |b| { + b.iter_batched( + || base.clone(), + |mut gc| { + gc.extend_by_hashes(batch.iter().copied()); + gc + }, + criterion::BatchSize::SmallInput, + ) + }); + } } criterion_group!(benches, criterion_benchmark); diff --git a/crates/geo_filters/src/config/bitchunks.rs b/crates/geo_filters/src/config/bitchunks.rs index 72660c7..eff424a 100644 --- a/crates/geo_filters/src/config/bitchunks.rs +++ b/crates/geo_filters/src/config/bitchunks.rs @@ -20,6 +20,11 @@ impl BitChunk { } } +/// Merges a descending stream of distinct one-bit positions (`leading`) with a descending stream +/// of `BitChunk`s (`trailing`) into a single descending `BitChunk` stream. All leading positions +/// must be more significant than all trailing bits, except that the least-significant leading block +/// may overlap the most-significant trailing block (the two are or-ed). Leading positions must be +/// distinct. pub(crate) fn iter_bit_chunks( leading: impl Iterator, trailing: impl Iterator, @@ -55,8 +60,7 @@ impl, J: Iterator> Iterator for BitCh _ => break, } } - // All leading bits were consumed, test whether it can be merged with - // trailing bits. + // All leading bits were consumed, test whether it can be merged with trailing bits. match self.trailing.peek() { Some(BitChunk { index: other_index, @@ -78,6 +82,39 @@ impl, J: Iterator> Iterator for BitCh } } +/// Turns a descending stream of one-bit positions into a descending `BitChunk` stream containing +/// each position's parity. Unlike [`iter_bit_chunks`], positions may repeat: a position occurring +/// an even number of times cancels out (xor), and a block that cancels out entirely is skipped. +pub(crate) fn parity_bit_positions( + positions: impl Iterator, +) -> impl Iterator { + ParityBitPositions(positions.peekable()) +} + +struct ParityBitPositions>(Peekable); + +impl> Iterator for ParityBitPositions { + type Item = BitChunk; + + fn next(&mut self) -> Option { + loop { + let (index, bit) = self.0.next()?.into_index_and_bit(); + let mut block: u64 = bit.into_block(); + while let Some(&other) = self.0.peek() { + if other.into_index() != index { + break; + } + block ^= other.into_bit().into_block(); + self.0.next(); + } + // The block is empty only if its positions cancelled out entirely; skip it. + if block != 0 { + return Some(BitChunk::new(index, block)); + } + } + } +} + /// Combine-s two `BitChunk` iterators using the given operator. Both iterators have to output /// `BitChunk`s from most to least significant and must not report duplicates! struct BinOpBitChunksIterator< @@ -314,7 +351,7 @@ impl> Iterator for BitChunksOnes(chunks.into_iter().peekable()).collect_vec() ); } + + #[test] + fn test_iter_bit_chunks() { + // Distinct leading bits merge within a block (via or) and merge with the trailing block at + // the boundary index. + let chunks = iter_bit_chunks( + vec![70, 67, 5].into_iter(), + vec![BitChunk::new(0, 1 << 2)].into_iter(), + ) + .collect_vec(); + assert_eq!( + chunks, + vec![ + BitChunk::new(1, (1 << 6) | (1 << 3)), // 70, 67 + BitChunk::new(0, (1 << 5) | (1 << 2)), // 5 (leading) and bit 2 (trailing) + ] + ); + } + + #[test] + fn test_parity_bit_positions() { + // Equal positions cancel (xor): 70 twice cancels, 67 stays; 5 three times stays, 3 once. + let chunks = parity_bit_positions(vec![70, 70, 67, 5, 5, 5, 3].into_iter()).collect_vec(); + assert_eq!( + chunks, + vec![ + BitChunk::new(1, 1 << 3), // 67 + BitChunk::new(0, (1 << 5) | (1 << 3)), // 5, 3 + ] + ); + + // A block whose positions cancel out entirely is skipped. + assert!(parity_bit_positions(vec![5, 5].into_iter()) + .collect_vec() + .is_empty()); + } } diff --git a/crates/geo_filters/src/diff_count.rs b/crates/geo_filters/src/diff_count.rs index 59e26d6..93089c9 100644 --- a/crates/geo_filters/src/diff_count.rs +++ b/crates/geo_filters/src/diff_count.rs @@ -8,7 +8,8 @@ use std::ops::Deref as _; use crate::config::{ count_ones_from_bitchunks, count_ones_from_msb_and_lsb, iter_bit_chunks, iter_ones, - mask_bit_chunks, take_ref, xor_bit_chunks, BitChunk, GeoConfig, IsBucketType, + mask_bit_chunks, parity_bit_positions, take_ref, xor_bit_chunks, BitChunk, GeoConfig, + IsBucketType, }; use crate::{Count, Diff}; @@ -120,53 +121,105 @@ impl<'a, C: GeoConfig> GeoDiffCount<'a, C> { result } - /// Constructs a [`GeoDiffCount`] directly from an iterator of item hashes. + /// Extends this filter by inserting (xor-ing in) a batch of item hashes in a single pass. /// - /// Inserting hashes one by one via [`Count::push_hash`] repeatedly toggles individual bits - /// and rebalances the sparse/dense split, which is wasteful when the hashes are known up - /// front. Instead, this function uses the (exact) number of hashes to estimate the split - /// point between the sparsely stored most-significant buckets (the "numbers") and the densely - /// stored least-significant buckets (the "bits"): + /// Instead of rebalancing the sparse/dense split after every hash (as a loop of + /// [`Count::push_hash`] calls would), it folds the dense low buckets directly into the existing + /// bit vector and merges the sparse high buckets with the current most-significant buckets, + /// re-establishing the invariants once at the end. The result is identical to pushing the + /// hashes one by one. To build a fresh filter, start from [`Self::new`] and extend it. /// - /// * Buckets at or above the estimated split point are collected, sorted, and reduced - /// modulo two so that buckets occurring an even number of times cancel out. This matches - /// the symmetric-difference semantics of [`Diff`] and only sorts the comparatively few - /// most-significant buckets. - /// * Buckets below the split point are folded directly into a dense bit vector, where - /// repeated buckets cancel out as well. - /// - /// Both pieces are then combined into a single descending stream of one bits and handed to - /// [`Self::from_bit_chunks`], which re-establishes the filter invariants. The resulting filter - /// is identical to the one produced by pushing the same hashes one by one. - pub fn from_hashes(config: C, hashes: impl ExactSizeIterator) -> Self { - let (split, capacity) = estimate_split_bucket(&config, hashes.len()); - - // Buckets >= split are sparse and collected for sorting; buckets < split are dense and - // folded directly into the bit vector, where repeated buckets cancel via xor. The toggler - // resolves the bit vector's owned storage once, keeping the per-bit work out of the loop. - let mut numbers = Vec::with_capacity(capacity); - let mut bits = BitVec::default(); - bits.resize(split); + /// The split between dense and sparse buckets is `max(boundary, estimated split for the batch + /// size)`: for a small batch this is just the current boundary, but for a batch larger than + /// the filter currently holds (including the empty-filter case) the bit vector is presized so + /// that only `~max_msb_len` buckets need sorting (rather than growing with the batch). Buckets + /// below the split — including the existing msb entries demoted by a larger split — are toggled + /// straight into the bit vector; the rest are xor-merged with the remaining msb. The top + /// `max_msb_len` of that merge become the new msb (its smallest entry is the new boundary); any + /// surplus is flipped into the bit vector. If cancellations leave the msb under-full, the + /// highest bits are pulled back out of the bit vector instead. Crucially, the existing dense + /// bits are never re-materialized. + pub fn extend_by_hashes(&mut self, hashes: impl ExactSizeIterator) { + let max_msb_len = self.config.max_msb_len(); + let boundary = self.lsb.num_bits(); + // The estimated split exceeds the current boundary exactly when more items are inserted + // than the filter holds; presizing then keeps the collected set (and the sort) bounded. + // For a smaller batch this is just the boundary, leaving the existing dense bits in place. + let (estimate, capacity) = estimate_split_bucket(&self.config, hashes.len()); + let split = estimate.max(boundary); + if split > boundary { + self.lsb.resize(split); + } + // The existing msb entries at or above the split stay sparse; those below it are demoted + // into the bit vector (the msb is stored in descending order). + let msb_split = self.msb.partition_point(|b| b.into_usize() >= split); + + // Fold every bucket below the split into the bit vector (new lows and demoted msb entries); + // collect the buckets at or above the split to merge with the remaining msb. + let mut new_high = Vec::with_capacity(capacity); { - let mut toggler = bits.toggler(); + let config = &self.config; + let mut toggler = self.lsb.toggler(); + for bucket in self.msb[msb_split..].iter() { + toggler.toggle(bucket.into_usize()); + } for hash in hashes { let bucket = config.hash_to_bucket(hash).into_usize(); if bucket >= split { - numbers.push(bucket); + new_high.push(bucket); } else { toggler.toggle(bucket); } } } + // Sort the collected buckets; duplicates cancel during the (xor-merging) chunk iteration + // below, so no separate parity pass is needed. + new_high.sort_unstable_by(|a, b| b.cmp(a)); + + // Xor the remaining (>= split) msb with the collected buckets (even occurrences cancel). + // The descending result yields the new msb (its top `max_msb_len` entries); any remainder + // of the same iterator is folded straight into the bit vector without a separate buffer. + let mut new_msb: Vec = Vec::with_capacity(max_msb_len); + { + let msb = iter_bit_chunks( + self.msb[..msb_split].iter().map(|b| b.into_usize()), + std::iter::empty(), + ); + let high = parity_bit_positions(new_high.iter().copied()); + let mut buckets = iter_ones::(xor_bit_chunks(msb, high).peekable()); + + new_msb.extend(buckets.by_ref().take(max_msb_len)); + + if new_msb.len() == max_msb_len { + // The msb stays full: its smallest entry is the new boundary and the remaining + // buckets are folded into the (grown) bit vector. + let smallest = new_msb[max_msb_len - 1].into_usize(); + self.lsb.resize(smallest); + let mut toggler = self.lsb.toggler(); + for bucket in buckets { + toggler.toggle(bucket.into_usize()); + } + } else { + // The collected buckets did not fill the msb: refill it from the highest bits of + // the bit vector, then truncate the bit vector to the new boundary (empty if it + // could not be refilled). + let need = max_msb_len - new_msb.len(); + let pulled: Vec = + iter_ones::(self.lsb.bit_chunks().peekable()) + .take(need) + .collect(); + let smallest = if pulled.len() == need { + pulled[need - 1].into_usize() + } else { + 0 + }; + new_msb.extend(pulled); + self.lsb.resize(smallest); + } + } - // Sort the most-significant buckets and drop those occurring an even number of times. - numbers.sort_unstable_by(|a, b| b.cmp(a)); - erase_even_occurrences(&mut numbers); - - Self::from_bit_chunks( - config, - iter_bit_chunks(numbers.into_iter(), bits.bit_chunks()), - ) + self.msb = Cow::from(new_msb); + self.debug_assert_invariants(); } /// Compare two geometric filters after applying the specified mask. @@ -506,27 +559,6 @@ fn estimate_split_bucket>(config: &C, n: usize) -> (usize, us (split, capacity) } -/// Given a slice sorted in descending order, keeps a single copy of every value occurring an odd -/// number of times and drops every value occurring an even number of times. The retained values -/// stay sorted in descending order. -fn erase_even_occurrences(values: &mut Vec) { - let mut write = 0; - let mut read = 0; - while read < values.len() { - let value = values[read]; - let mut count = 0; - while read < values.len() && values[read] == value { - read += 1; - count += 1; - } - if count % 2 == 1 { - values[write] = value; - write += 1; - } - } - values.truncate(write); -} - impl> Count for GeoDiffCount<'_, C> { fn push_hash(&mut self, hash: u64) { self.xor_bit(self.config.hash_to_bucket(hash)); @@ -640,17 +672,17 @@ mod tests { assert_eq!(m.iter_ones().count(), 101); } - /// Building a filter from an iterator of hashes must produce exactly the same filter as - /// pushing those hashes one by one. + /// Building a filter from an iterator of hashes (via [`GeoDiffCount::extend_by_hashes`] on an + /// empty filter) must produce exactly the same filter as pushing those hashes one by one. #[test] - fn test_from_hashes() { - fn assert_from_hashes_matches + Default>(hashes: &[u64], n: usize) { + fn test_extend_from_empty() { + fn assert_extend_matches + Default>(hashes: &[u64], n: usize) { let mut expected: GeoDiffCount<'_, C> = GeoDiffCount::new(C::default()); for &hash in hashes { expected.push_hash(hash); } - let actual: GeoDiffCount<'_, C> = - GeoDiffCount::from_hashes(C::default(), hashes.iter().copied()); + let mut actual: GeoDiffCount<'_, C> = GeoDiffCount::new(C::default()); + actual.extend_by_hashes(hashes.iter().copied()); assert_eq!(expected, actual, "filter mismatch for n = {n}"); assert_eq!( expected.iter_ones().collect_vec(), @@ -668,8 +700,73 @@ mod tests { .map(|_| *pool.iter().choose(rnd).expect("pool is non-empty")) .collect(); - assert_from_hashes_matches::(&hashes, n); - assert_from_hashes_matches::(&hashes, n); + assert_extend_matches::(&hashes, n); + assert_extend_matches::(&hashes, n); + } + }); + } + + /// Extending an existing filter with a batch of hashes must produce exactly the same filter as + /// pushing the existing hashes and the batch one by one. + #[test] + fn test_extend_by_hashes() { + fn assert_extend_matches + Default>(existing: &[u64], batch: &[u64]) { + let mut expected: GeoDiffCount<'_, C> = GeoDiffCount::new(C::default()); + for &hash in existing.iter().chain(batch) { + expected.push_hash(hash); + } + + let mut actual: GeoDiffCount<'_, C> = GeoDiffCount::new(C::default()); + for &hash in existing { + actual.push_hash(hash); + } + actual.extend_by_hashes(batch.iter().copied()); + + let label = (existing.len(), batch.len()); + assert_eq!(expected, actual, "filter mismatch for {label:?}"); + assert_eq!( + expected.iter_ones().collect_vec(), + actual.iter_ones().collect_vec(), + "ones mismatch for {label:?}", + ); + } + + prng_test_harness(4, |rnd| { + // Mix of: empty filter, empty batch, small/large existing, and inserting many more + // hashes than the filter currently holds (exercising the split-growth path). + for (e, b) in [ + (0usize, 0usize), + (0, 500), + (500, 0), + (500, 500), + (5000, 200), + (200, 50000), + (100, 200000), + (5000, 5000), + ] { + // Draw existing and batch from a shared pool so they overlap, exercising the xor + // cancellation (removal) that insertion performs on already-set buckets. + let pool: Vec = (0..(e + b).div_ceil(2).max(1)) + .map(|_| rnd.next_u64()) + .collect(); + let draw = |count: usize, rnd: &mut ChaCha12Rng| -> Vec { + (0..count) + .map(|_| *pool.iter().choose(rnd).expect("pool is non-empty")) + .collect() + }; + let existing = draw(e, rnd); + let batch = draw(b, rnd); + + assert_extend_matches::(&existing, &batch); + assert_extend_matches::(&existing, &batch); + + // Re-inserting (a prefix of) the existing hashes cancels those buckets, draining + // the msb below capacity and exercising the refill-from-bit-vector path. + let subset = &existing[..existing.len() / 2]; + assert_extend_matches::(&existing, subset); + assert_extend_matches::(&existing, subset); + assert_extend_matches::(&existing, &existing); + assert_extend_matches::(&existing, &existing); } }); } From f278b8b62882aef7bbe0f7e5585cf374d49d980f Mon Sep 17 00:00:00 2001 From: Alexander Neubeck Date: Thu, 25 Jun 2026 19:16:01 +0200 Subject: [PATCH 4/4] replace extend with a more flexible builder API --- crates/geo_filters/evaluation/performance.rs | 99 ++-- crates/geo_filters/src/config/bitchunks.rs | 53 +- crates/geo_filters/src/diff_count.rs | 494 ++++++++++++------- crates/geo_filters/src/diff_count/bitvec.rs | 12 + 4 files changed, 365 insertions(+), 293 deletions(-) diff --git a/crates/geo_filters/evaluation/performance.rs b/crates/geo_filters/evaluation/performance.rs index ae4b6d7..ca03abb 100644 --- a/crates/geo_filters/evaluation/performance.rs +++ b/crates/geo_filters/evaluation/performance.rs @@ -4,7 +4,9 @@ use std::hint::black_box; use criterion::{criterion_group, criterion_main, Criterion}; use geo_filters::build_hasher::UnstableDefaultBuildHasher; use geo_filters::config::VariableConfig; -use geo_filters::diff_count::{GeoDiffCount, GeoDiffCount13, GeoDiffCount7}; +use geo_filters::diff_count::{ + GeoDiffConfig13, GeoDiffCount, GeoDiffCount13, GeoDiffCount7, GeoDiffCountBuilder, +}; use geo_filters::distinct_count::GeoDistinctCount13; use geo_filters::evaluation::hll::Hll14; use geo_filters::Count; @@ -132,9 +134,9 @@ fn criterion_benchmark(c: &mut Criterion) { }); } - // Compare building a diff filter from a precomputed slice of hashes one by one - // (`push_hash`) versus in a single batch (`extend_by_hashes` on an empty filter). The hashes - // are precomputed so that only the construction cost is measured. + // Compare building a diff filter from a precomputed slice of hashes one by one (`push_hash`) + // versus via the incremental `GeoDiffCountBuilder` (per-hash, and the batched + // `extend_by_hashes`). The hashes are precomputed so that only construction cost is measured. for size in [1000usize, 10000, 100000, 1000000] { let mut group = c.benchmark_group(format!("construct:{size}")); let build_hasher = UnstableDefaultBuildHasher::default(); @@ -149,13 +151,6 @@ fn criterion_benchmark(c: &mut Criterion) { black_box(&gc); }) }); - group.bench_function("geo_diff_count_7_extend", |b| { - b.iter(|| { - let mut gc = GeoDiffCount7::default(); - gc.extend_by_hashes(hashes.iter().copied()); - black_box(&gc); - }) - }); group.bench_function("geo_diff_count_13_push", |b| { b.iter(|| { let mut gc = GeoDiffCount13::default(); @@ -165,58 +160,42 @@ fn criterion_benchmark(c: &mut Criterion) { black_box(&gc); }) }); - group.bench_function("geo_diff_count_13_extend", |b| { + group.bench_function("geo_diff_count_13_builder_extend", |b| { b.iter(|| { - let mut gc = GeoDiffCount13::default(); - gc.extend_by_hashes(hashes.iter().copied()); - black_box(&gc); + let mut builder = GeoDiffCountBuilder::with_capacity( + GeoDiffConfig13::::default(), + 0, + ); + builder.extend_by_hashes(hashes.iter().copied()); + black_box(builder.build()); }) }); - } - - // Insert a batch of hashes into an existing filter: one-by-one `push_hash` vs the batched - // `extend_by_hashes`. The existing filter is cloned in the (untimed) batched setup so that only - // the insertion cost is measured, not the clone. - for (existing_n, batch_n) in [ - (10000usize, 10000usize), - (100000, 10000), - (100000, 100000), - (1000, 100000), - ] { - let mut group = c.benchmark_group(format!("insert_batch:{existing_n}+{batch_n}")); - let build_hasher = UnstableDefaultBuildHasher::default(); - let batch: Vec = (existing_n..existing_n + batch_n) - .map(|i| build_hasher.hash_one(i)) - .collect(); - let base = { - let mut gc = GeoDiffCount13::default(); - for i in 0..existing_n { - gc.push_hash(build_hasher.hash_one(i)); - } - gc - }; - - group.bench_function("push", |b| { - b.iter_batched( - || base.clone(), - |mut gc| { - for &hash in &batch { - gc.push_hash(hash); - } - gc - }, - criterion::BatchSize::SmallInput, - ) - }); - group.bench_function("extend_by_hashes", |b| { - b.iter_batched( - || base.clone(), - |mut gc| { - gc.extend_by_hashes(batch.iter().copied()); - gc - }, - criterion::BatchSize::SmallInput, - ) + group.bench_function("geo_diff_count_13_builder", |b| { + b.iter(|| { + let mut builder = GeoDiffCountBuilder::with_capacity( + GeoDiffConfig13::::default(), + size, + ); + for &hash in &hashes { + builder.push_hash(hash); + } + black_box(builder.build()); + }) + }); + // Reserve nothing so the split starts at 0 and every bucket initially lands in `numbers`, + // forcing the buffer to fill and compact (lazily flush) repeatedly as the split ramps up. + // This isolates the cost of the lazy-flush path versus a well-positioned builder. + group.bench_function("geo_diff_count_13_builder_unreserved", |b| { + b.iter(|| { + let mut builder = GeoDiffCountBuilder::with_capacity( + GeoDiffConfig13::::default(), + 0, + ); + for &hash in &hashes { + builder.push_hash(hash); + } + black_box(builder.build()); + }) }); } } diff --git a/crates/geo_filters/src/config/bitchunks.rs b/crates/geo_filters/src/config/bitchunks.rs index eff424a..438a5df 100644 --- a/crates/geo_filters/src/config/bitchunks.rs +++ b/crates/geo_filters/src/config/bitchunks.rs @@ -82,39 +82,6 @@ impl, J: Iterator> Iterator for BitCh } } -/// Turns a descending stream of one-bit positions into a descending `BitChunk` stream containing -/// each position's parity. Unlike [`iter_bit_chunks`], positions may repeat: a position occurring -/// an even number of times cancels out (xor), and a block that cancels out entirely is skipped. -pub(crate) fn parity_bit_positions( - positions: impl Iterator, -) -> impl Iterator { - ParityBitPositions(positions.peekable()) -} - -struct ParityBitPositions>(Peekable); - -impl> Iterator for ParityBitPositions { - type Item = BitChunk; - - fn next(&mut self) -> Option { - loop { - let (index, bit) = self.0.next()?.into_index_and_bit(); - let mut block: u64 = bit.into_block(); - while let Some(&other) = self.0.peek() { - if other.into_index() != index { - break; - } - block ^= other.into_bit().into_block(); - self.0.next(); - } - // The block is empty only if its positions cancelled out entirely; skip it. - if block != 0 { - return Some(BitChunk::new(index, block)); - } - } - } -} - /// Combine-s two `BitChunk` iterators using the given operator. Both iterators have to output /// `BitChunk`s from most to least significant and must not report duplicates! struct BinOpBitChunksIterator< @@ -351,7 +318,7 @@ impl> Iterator for BitChunksOnes> GeoDiffCount<'a, C> { result } - /// Extends this filter by inserting (xor-ing in) a batch of item hashes in a single pass. - /// - /// Instead of rebalancing the sparse/dense split after every hash (as a loop of - /// [`Count::push_hash`] calls would), it folds the dense low buckets directly into the existing - /// bit vector and merges the sparse high buckets with the current most-significant buckets, - /// re-establishing the invariants once at the end. The result is identical to pushing the - /// hashes one by one. To build a fresh filter, start from [`Self::new`] and extend it. - /// - /// The split between dense and sparse buckets is `max(boundary, estimated split for the batch - /// size)`: for a small batch this is just the current boundary, but for a batch larger than - /// the filter currently holds (including the empty-filter case) the bit vector is presized so - /// that only `~max_msb_len` buckets need sorting (rather than growing with the batch). Buckets - /// below the split — including the existing msb entries demoted by a larger split — are toggled - /// straight into the bit vector; the rest are xor-merged with the remaining msb. The top - /// `max_msb_len` of that merge become the new msb (its smallest entry is the new boundary); any - /// surplus is flipped into the bit vector. If cancellations leave the msb under-full, the - /// highest bits are pulled back out of the bit vector instead. Crucially, the existing dense - /// bits are never re-materialized. - pub fn extend_by_hashes(&mut self, hashes: impl ExactSizeIterator) { - let max_msb_len = self.config.max_msb_len(); - let boundary = self.lsb.num_bits(); - // The estimated split exceeds the current boundary exactly when more items are inserted - // than the filter holds; presizing then keeps the collected set (and the sort) bounded. - // For a smaller batch this is just the boundary, leaving the existing dense bits in place. - let (estimate, capacity) = estimate_split_bucket(&self.config, hashes.len()); - let split = estimate.max(boundary); - if split > boundary { - self.lsb.resize(split); - } - // The existing msb entries at or above the split stay sparse; those below it are demoted - // into the bit vector (the msb is stored in descending order). - let msb_split = self.msb.partition_point(|b| b.into_usize() >= split); - - // Fold every bucket below the split into the bit vector (new lows and demoted msb entries); - // collect the buckets at or above the split to merge with the remaining msb. - let mut new_high = Vec::with_capacity(capacity); - { - let config = &self.config; - let mut toggler = self.lsb.toggler(); - for bucket in self.msb[msb_split..].iter() { - toggler.toggle(bucket.into_usize()); - } - for hash in hashes { - let bucket = config.hash_to_bucket(hash).into_usize(); - if bucket >= split { - new_high.push(bucket); - } else { - toggler.toggle(bucket); - } - } - } - // Sort the collected buckets; duplicates cancel during the (xor-merging) chunk iteration - // below, so no separate parity pass is needed. - new_high.sort_unstable_by(|a, b| b.cmp(a)); - - // Xor the remaining (>= split) msb with the collected buckets (even occurrences cancel). - // The descending result yields the new msb (its top `max_msb_len` entries); any remainder - // of the same iterator is folded straight into the bit vector without a separate buffer. - let mut new_msb: Vec = Vec::with_capacity(max_msb_len); - { - let msb = iter_bit_chunks( - self.msb[..msb_split].iter().map(|b| b.into_usize()), - std::iter::empty(), - ); - let high = parity_bit_positions(new_high.iter().copied()); - let mut buckets = iter_ones::(xor_bit_chunks(msb, high).peekable()); - - new_msb.extend(buckets.by_ref().take(max_msb_len)); - - if new_msb.len() == max_msb_len { - // The msb stays full: its smallest entry is the new boundary and the remaining - // buckets are folded into the (grown) bit vector. - let smallest = new_msb[max_msb_len - 1].into_usize(); - self.lsb.resize(smallest); - let mut toggler = self.lsb.toggler(); - for bucket in buckets { - toggler.toggle(bucket.into_usize()); - } - } else { - // The collected buckets did not fill the msb: refill it from the highest bits of - // the bit vector, then truncate the bit vector to the new boundary (empty if it - // could not be refilled). - let need = max_msb_len - new_msb.len(); - let pulled: Vec = - iter_ones::(self.lsb.bit_chunks().peekable()) - .take(need) - .collect(); - let smallest = if pulled.len() == need { - pulled[need - 1].into_usize() - } else { - 0 - }; - new_msb.extend(pulled); - self.lsb.resize(smallest); - } - } - - self.msb = Cow::from(new_msb); - self.debug_assert_invariants(); - } - /// Compare two geometric filters after applying the specified mask. /// /// To reduce the number of operations, the implementation first xors the bit chunks together, @@ -531,10 +429,8 @@ pub(crate) fn xor>( ) } -/// Estimates the bucket id that separates the sparse most-significant buckets ("numbers") from -/// the dense least-significant buckets ("bits") for a filter built from `n` hashes, and a capacity -/// to preallocate the `numbers` buffer with (the expected count at or above the split, plus -/// headroom for variance). +/// Estimates the split bucket separating the sparse most-significant buckets ("numbers") from +/// the dense least-significant buckets ("bits") for a filter built from `n` hashes. /// /// The expected number of hashes falling into buckets `>= s` is `n * phi^s`. We target about /// `max_msb_len / 2` such hashes: the most-significant buckets do *not* need to be fully supplied @@ -543,20 +439,254 @@ pub(crate) fn xor>( /// split by one `bits_per_level` roughly halves the collected set, so a small target keeps the /// sort cheap while only marginally enlarging the bit vector. Correctness does not depend on the /// estimate. -fn estimate_split_bucket>(config: &C, n: usize) -> (usize, usize) { +fn estimate_split_bucket>(config: &C, n: usize) -> usize { let target = config.max_msb_len() / 2; - // The count at or above the split is ~`target` but varies (std ~sqrt(target)), so reserve - // twice as much to avoid reallocating `numbers` -- but never more than the total hash count. - let capacity = (2 * target).min(n); if n <= target { // Every hash ends up in `numbers` (split == 0). - return (0, capacity); + return 0; } let ratio = target as f64 / n as f64; - let split = ((ratio.ln() / config.phi_f64().ln()).floor() as usize) + ((ratio.ln() / config.phi_f64().ln()).floor() as usize) // No bucket can ever exceed this bound, so never allocate a larger bit vector. - .min(64 * config.bits_per_level()); - (split, capacity) + .min(64 * config.bits_per_level()) +} + +/// Splits a descending stream of set buckets into the new msb (the top `max_msb_len`) and folds +/// the remaining buckets into `lsb`, resizing it to the new boundary. If the stream is too short +/// to fill the msb, the highest bits of `lsb` are pulled back out to refill it (and `lsb` is +/// truncated accordingly, or emptied if it could not be refilled). Returns the new msb. +fn split_into_msb( + mut buckets: impl Iterator, + lsb: &mut BitVec<'_>, + max_msb_len: usize, +) -> Vec { + let mut msb: Vec = Vec::with_capacity(max_msb_len); + msb.extend(buckets.by_ref().take(max_msb_len)); + if msb.len() == max_msb_len { + // The msb is full: its smallest entry is the new boundary, the rest folds into the bits. + let smallest = msb[max_msb_len - 1].into_usize(); + lsb.resize(smallest); + let mut toggler = lsb.toggler(); + for bucket in buckets { + toggler.toggle(bucket.into_usize()); + } + } else { + // Refill the msb from the highest bits, then truncate the bits to the new boundary. + let need = max_msb_len - msb.len(); + let pulled: Vec = iter_ones::(lsb.bit_chunks().peekable()) + .take(need) + .collect(); + let smallest = if pulled.len() == need { + pulled[need - 1].into_usize() + } else { + 0 + }; + msb.extend(pulled); + lsb.resize(smallest); + } + msb +} + +/// Incrementally builds a [`GeoDiffCount`] from a known number of pushes. +/// +/// Hashes are added one at a time via [`Self::push_hash`] / [`Self::push`], or in bulk via +/// [`Self::extend_by_hashes`]. Reserve the expected number of pushes with [`Self::with_capacity`] +/// so the dense/sparse split can be estimated and the buffers presized. The most-significant +/// buckets accumulate in a plain vector without enforcing the `max_msb_len` limit; that limit, and +/// the filter invariants, are applied only once when [`Self::build`] turns the builder into a +/// [`GeoDiffCount`]. Pushing more (or fewer) hashes than reserved stays correct — only the presizing +/// is then less accurate. If the final count is not known up front, call [`Self::reserve`] as it +/// grows. +pub struct GeoDiffCountBuilder> { + config: C, + /// Running total of pushes reserved for; drives the split estimate. + expected: usize, + /// Buckets at or above `split` accumulate in `numbers` (with duplicates, and transiently some + /// below `split` after a [`GeoDiffCountBuilder::reserve`]); buckets below `split` are folded + /// (xor) into `blocks`. [`GeoDiffCountBuilder::cleanup`] reconciles the two. + split: usize, + numbers: Vec, + blocks: Vec, +} + +impl> GeoDiffCountBuilder { + /// Creates a builder reserving space for roughly `expected` pushes. + /// + /// `expected` only positions the dense/sparse split; the `numbers` buffer is a fixed + /// `2 * max_msb_len` working set that is compacted in place once full (see [`Self::push_hash`]), + /// so it never needs to be sized to the number of pushes. + pub fn with_capacity(config: C, expected: usize) -> Self { + let split = estimate_split_bucket(&config, expected); + let capacity = 2 * config.max_msb_len(); + Self { + config, + expected, + split, + numbers: Vec::with_capacity(capacity), + blocks: vec![0; split.div_ceil(BITS_PER_BLOCK)], + } + } + + /// Reserves space for `additional` further pushes. + /// + /// This only advances the estimated split (growing the bit space to match) so that subsequent + /// pushes of low buckets fold straight into the bits. The numbers already collected below the + /// new split are *not* migrated here — they are folded in lazily the next time the buffer is + /// compacted or built (see [`Self::cleanup`]). The resulting filter is unaffected. + pub fn reserve(&mut self, additional: usize) { + self.expected = self.expected.saturating_add(additional); + let new_split = estimate_split_bucket(&self.config, self.expected); + if new_split > self.split { + self.split = new_split; + self.blocks.resize(new_split.div_ceil(BITS_PER_BLOCK), 0); + } + } + + /// Sorts `numbers` and reduces it to the distinct buckets that still belong above the split: + /// even occurrences cancel (xor), and any bucket below the current split is folded into the bit + /// space. Afterwards `numbers` is sorted in descending order with no duplicates and no entries + /// below `split`. Shared by [`Self::compact`] and [`Self::build`]. + fn cleanup(&mut self) { + self.numbers.sort_unstable_by(|a, b| b.cmp(a)); + let split = self.split; + let blocks = &mut self.blocks; + let numbers = &mut self.numbers; + let mut write = 0; + let mut read = 0; + while read < numbers.len() { + let bucket = numbers[read]; + let mut next = read + 1; + while next < numbers.len() && numbers[next] == bucket { + next += 1; + } + // An odd number of occurrences leaves the bucket set; an even number cancels. + if (next - read) % 2 == 1 { + if bucket < split { + let (index, bit) = bucket.into_index_and_bit(); + blocks[index] ^= bit.into_block(); + } else { + numbers[write] = bucket; + write += 1; + } + } + read = next; + } + numbers.truncate(write); + } + + /// Processes a full `numbers` buffer in place rather than letting it grow. [`Self::cleanup`] + /// first collapses duplicates and any sub-split entries; if that already frees half the buffer + /// the split stays put. Otherwise the split is advanced in whole levels — each level halves the + /// expected number of buckets at or above it — until at most half the buffer remains, folding + /// the now-sub-split buckets into the bit space. The buffer is therefore never reallocated. + fn compact(&mut self) { + let target = self.numbers.capacity() / 2; + self.cleanup(); + if self.numbers.len() <= target { + return; + } + // `numbers` is sorted descending, so the count at or above a split is a prefix length. + let bits_per_level = self.config.bits_per_level(); + let mut new_split = self.split; + let mut keep = self.numbers.len(); + while keep > target { + new_split += bits_per_level; + keep = self.numbers.partition_point(|&b| b >= new_split); + } + self.blocks.resize(new_split.div_ceil(BITS_PER_BLOCK), 0); + let blocks = &mut self.blocks; + for &bucket in &self.numbers[keep..] { + let (index, bit) = bucket.into_index_and_bit(); + blocks[index] ^= bit.into_block(); + } + self.numbers.truncate(keep); + self.split = new_split; + } + + /// Adds the given hash to the filter being built. + #[inline] + pub fn push_hash(&mut self, hash: u64) { + let bucket = self.config.hash_to_bucket(hash).into_usize(); + if bucket >= self.split { + // Compact the buffer in place once it is full rather than reallocating it. Compacting + // may advance the split past this bucket, in which case it lands in `numbers` below the + // split; the next `cleanup` simply folds it into the bits, so this stays correct. + if self.numbers.len() == self.numbers.capacity() { + self.compact(); + } + self.numbers.push(bucket); + } else { + // `bucket < split`, so the block index is always in range; toggling cancels repeats. + let (index, bit) = bucket.into_index_and_bit(); + self.blocks[index] ^= bit.into_block(); + } + } + + /// Adds the hash of the given item, computed with the configured hasher, to the filter. + pub fn push(&mut self, item: I) { + let build_hasher = C::BuildHasher::default(); + self.push_hash(build_hasher.hash_one(item)); + } + + /// Inserts a batch of hashes, reserving room for them up front via the size estimator. + /// + /// Unlike a loop of [`Self::push_hash`] calls — which must re-resolve `self` on every call — + /// this folds the dense low buckets into the bit space in a tight loop that hoists the bit + /// storage out of the per-hash work, only re-acquiring it after the rare in-place compaction. + /// It can be mixed freely with [`Self::push_hash`], and further pushes remain possible after. + pub fn extend_by_hashes(&mut self, mut hashes: impl ExactSizeIterator) { + self.reserve(hashes.len()); + loop { + let split = self.split; + let filled = { + let config = &self.config; + let blocks = &mut self.blocks; + let numbers = &mut self.numbers; + let mut filled = false; + for hash in hashes.by_ref() { + let bucket = config.hash_to_bucket(hash).into_usize(); + if bucket >= split { + numbers.push(bucket); + // Stop exactly at capacity so the buffer is never reallocated. + if numbers.len() == numbers.capacity() { + filled = true; + break; + } + } else { + let (index, bit) = bucket.into_index_and_bit(); + blocks[index] ^= bit.into_block(); + } + } + filled + }; + // The iterator is either exhausted or the buffer filled; compact and continue if full. + if !filled { + break; + } + self.compact(); + } + } + + /// Finalizes the builder into a [`GeoDiffCount`], applying the `max_msb_len` constraint and + /// re-establishing the filter invariants. + pub fn build(mut self) -> GeoDiffCount<'static, C> { + let max_msb_len = self.config.max_msb_len(); + // `cleanup` leaves `numbers` sorted descending, deduplicated, and free of sub-split entries. + self.cleanup(); + let mut lsb = BitVec::from_blocks(self.blocks, self.split); + let msb = split_into_msb( + self.numbers.iter().map(|&b| C::BucketType::from_usize(b)), + &mut lsb, + max_msb_len, + ); + let result = GeoDiffCount { + config: self.config, + msb: Cow::from(msb), + lsb, + }; + result.debug_assert_invariants(); + result + } } impl> Count for GeoDiffCount<'_, C> { @@ -672,101 +802,103 @@ mod tests { assert_eq!(m.iter_ones().count(), 101); } - /// Building a filter from an iterator of hashes (via [`GeoDiffCount::extend_by_hashes`] on an - /// empty filter) must produce exactly the same filter as pushing those hashes one by one. + /// Building a filter via `GeoDiffCountBuilder` must produce exactly the same filter as pushing + /// the hashes one by one, regardless of how accurately the capacity was reserved. #[test] - fn test_extend_from_empty() { - fn assert_extend_matches + Default>(hashes: &[u64], n: usize) { - let mut expected: GeoDiffCount<'_, C> = GeoDiffCount::new(C::default()); + fn test_builder() { + fn assert_builder_matches + Default>(hashes: &[u64], reserve: usize) { + let mut expected: GeoDiffCount<'static, C> = GeoDiffCount::new(C::default()); for &hash in hashes { expected.push_hash(hash); } - let mut actual: GeoDiffCount<'_, C> = GeoDiffCount::new(C::default()); - actual.extend_by_hashes(hashes.iter().copied()); - assert_eq!(expected, actual, "filter mismatch for n = {n}"); + let mut builder = GeoDiffCountBuilder::with_capacity(C::default(), reserve); + for &hash in hashes { + builder.push_hash(hash); + } + let actual = builder.build(); + let label = (hashes.len(), reserve); + assert_eq!(expected, actual, "filter mismatch for {label:?}"); assert_eq!( expected.iter_ones().collect_vec(), actual.iter_ones().collect_vec(), - "ones mismatch for n = {n}", + "ones mismatch for {label:?}", ); } + // Starts with a tiny reservation and grows it while pushing, which moves the split forward + // and exercises the number-migration path in `reserve`. + fn assert_grown_builder_matches + Default>(hashes: &[u64]) { + let mut expected: GeoDiffCount<'static, C> = GeoDiffCount::new(C::default()); + for &hash in hashes { + expected.push_hash(hash); + } + let mut builder = GeoDiffCountBuilder::with_capacity(C::default(), 1); + for (i, &hash) in hashes.iter().enumerate() { + if i % 64 == 0 { + builder.reserve(64); + } + builder.push_hash(hash); + } + assert_eq!(expected, builder.build(), "grown builder mismatch"); + } + prng_test_harness(4, |rnd| { for n in [0usize, 1, 5, 50, 500, 5000, 50000] { - // Draw from a smaller pool so that buckets are hit multiple times, exercising the - // even-occurrence cancellation on both the dense bits and the sparse numbers path. let pool: Vec = (0..n.div_ceil(2).max(1)).map(|_| rnd.next_u64()).collect(); let hashes: Vec = (0..n) .map(|_| *pool.iter().choose(rnd).expect("pool is non-empty")) .collect(); - - assert_extend_matches::(&hashes, n); - assert_extend_matches::(&hashes, n); + // Reserve exactly, far too little (split too low), and far too much (split too high). + assert_builder_matches::(&hashes, n); + assert_builder_matches::(&hashes, n); + assert_builder_matches::(&hashes, n / 4); + assert_builder_matches::(&hashes, n * 4); + // Reserve nothing so the split starts at 0 and every bucket initially lands in + // `numbers`, forcing repeated compaction once the fixed-size buffer fills. This + // hammers the lazy-flush path, including buckets that land below the split a + // compaction just advanced past. + assert_builder_matches::(&hashes, 0); + assert_builder_matches::(&hashes, 0); + assert_grown_builder_matches::(&hashes); + assert_grown_builder_matches::(&hashes); } }); } - /// Extending an existing filter with a batch of hashes must produce exactly the same filter as - /// pushing the existing hashes and the batch one by one. + /// `GeoDiffCountBuilder::extend_by_hashes` (alone, or mixed with `push_hash`) must produce + /// exactly the same filter as pushing every hash one by one. #[test] - fn test_extend_by_hashes() { - fn assert_extend_matches + Default>(existing: &[u64], batch: &[u64]) { - let mut expected: GeoDiffCount<'_, C> = GeoDiffCount::new(C::default()); - for &hash in existing.iter().chain(batch) { + fn test_builder_extend() { + fn assert_extend_matches + Default>(hashes: &[u64]) { + let mut expected: GeoDiffCount<'static, C> = GeoDiffCount::new(C::default()); + for &hash in hashes { expected.push_hash(hash); } - let mut actual: GeoDiffCount<'_, C> = GeoDiffCount::new(C::default()); - for &hash in existing { - actual.push_hash(hash); - } - actual.extend_by_hashes(batch.iter().copied()); + // Extend a fresh builder in one batch (auto-reserves for the batch size). + let mut batched = GeoDiffCountBuilder::with_capacity(C::default(), 0); + batched.extend_by_hashes(hashes.iter().copied()); + assert_eq!(expected, batched.build(), "extend-from-empty mismatch"); - let label = (existing.len(), batch.len()); - assert_eq!(expected, actual, "filter mismatch for {label:?}"); - assert_eq!( - expected.iter_ones().collect_vec(), - actual.iter_ones().collect_vec(), - "ones mismatch for {label:?}", - ); + // Push a prefix one by one, then extend with the remainder. + let mid = hashes.len() / 2; + let mut mixed = GeoDiffCountBuilder::with_capacity(C::default(), 0); + for &hash in &hashes[..mid] { + mixed.push_hash(hash); + } + mixed.extend_by_hashes(hashes[mid..].iter().copied()); + assert_eq!(expected, mixed.build(), "push+extend mismatch"); } prng_test_harness(4, |rnd| { - // Mix of: empty filter, empty batch, small/large existing, and inserting many more - // hashes than the filter currently holds (exercising the split-growth path). - for (e, b) in [ - (0usize, 0usize), - (0, 500), - (500, 0), - (500, 500), - (5000, 200), - (200, 50000), - (100, 200000), - (5000, 5000), - ] { - // Draw existing and batch from a shared pool so they overlap, exercising the xor - // cancellation (removal) that insertion performs on already-set buckets. - let pool: Vec = (0..(e + b).div_ceil(2).max(1)) - .map(|_| rnd.next_u64()) + for n in [0usize, 1, 5, 50, 500, 5000, 50000] { + // Draw from a smaller pool so buckets repeat, exercising xor cancellation. + let pool: Vec = (0..n.div_ceil(2).max(1)).map(|_| rnd.next_u64()).collect(); + let hashes: Vec = (0..n) + .map(|_| *pool.iter().choose(rnd).expect("pool is non-empty")) .collect(); - let draw = |count: usize, rnd: &mut ChaCha12Rng| -> Vec { - (0..count) - .map(|_| *pool.iter().choose(rnd).expect("pool is non-empty")) - .collect() - }; - let existing = draw(e, rnd); - let batch = draw(b, rnd); - - assert_extend_matches::(&existing, &batch); - assert_extend_matches::(&existing, &batch); - - // Re-inserting (a prefix of) the existing hashes cancels those buckets, draining - // the msb below capacity and exercising the refill-from-bit-vector path. - let subset = &existing[..existing.len() / 2]; - assert_extend_matches::(&existing, subset); - assert_extend_matches::(&existing, subset); - assert_extend_matches::(&existing, &existing); - assert_extend_matches::(&existing, &existing); + assert_extend_matches::(&hashes); + assert_extend_matches::(&hashes); } }); } diff --git a/crates/geo_filters/src/diff_count/bitvec.rs b/crates/geo_filters/src/diff_count/bitvec.rs index 1317868..42c873c 100644 --- a/crates/geo_filters/src/diff_count/bitvec.rs +++ b/crates/geo_filters/src/diff_count/bitvec.rs @@ -47,6 +47,18 @@ impl BitVec<'_> { result } + /// Wraps raw `blocks` covering `[0, num_bits)` into a `BitVec`. The number of blocks must match + /// `num_bits`, and any bits at or above `num_bits` are cleared. + pub fn from_blocks(blocks: Vec, num_bits: usize) -> BitVec<'static> { + debug_assert_eq!(blocks.len(), num_bits.div_ceil(BITS_PER_BLOCK)); + let mut result = BitVec { + num_bits, + blocks: Cow::Owned(blocks), + }; + result.clear_superfluous_bits(); + result + } + /// Resize the vector such that the top block contains the given bucket. pub fn resize(&mut self, num_bits: usize) { let num_blocks = num_bits.div_ceil(BITS_PER_BLOCK);