Skip to content

Commit

Permalink
[WIP] Compressed posting lists
Browse files Browse the repository at this point in the history
  • Loading branch information
xzfc committed Apr 30, 2024
1 parent c173a9f commit c8e4939
Show file tree
Hide file tree
Showing 18 changed files with 1,465 additions and 109 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ debug = true
inherits = "release"
lto = false
opt-level = 3
codegen-units = 16

[patch.crates-io]
# Temporary patch until <https://github.com/hyperium/tonic/pull/1401> is merged
Expand Down
2 changes: 2 additions & 0 deletions lib/segment/benches/sparse_index_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ fn sparse_vector_index_build_benchmark(c: &mut Criterion) {
sparse_vector_index.build_index(permit, &stopped).unwrap();

// intent: measure mmap conversion time
/* XXX(xzfc): mmap disabled for now
group.bench_function("convert-mmap-index", |b| {
b.iter(|| {
let mmap_index_dir = Builder::new().prefix("mmap_index_dir").tempdir().unwrap();
Expand All @@ -116,6 +117,7 @@ fn sparse_vector_index_build_benchmark(c: &mut Criterion) {
assert_eq!(mmap_inverted_index.vector_count(), NUM_VECTORS);
})
});
*/

group.finish();
}
Expand Down
13 changes: 12 additions & 1 deletion lib/segment/benches/sparse_index_search.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(unused_imports)]

#[cfg(not(target_os = "windows"))]
mod prof;

Expand Down Expand Up @@ -66,12 +68,16 @@ fn sparse_vector_index_search_benchmark(c: &mut Criterion) {
let query_vector = vector.into();

let permit_cpu_count = num_rayon_threads(0);
#[cfg(any())]
let permit = Arc::new(CpuPermit::dummy(permit_cpu_count as u32));

// mmap inverted index
#[cfg(any())]
let mmap_index_dir = Builder::new().prefix("mmap_index_dir").tempdir().unwrap();
#[cfg(any())]
let sparse_index_config =
SparseIndexConfig::new(Some(FULL_SCAN_THRESHOLD), SparseIndexType::Mmap);
#[cfg(any())]
let mut sparse_vector_index_mmap: SparseVectorIndex<InvertedIndexMmap> =
SparseVectorIndex::open(
sparse_index_config,
Expand All @@ -82,12 +88,16 @@ fn sparse_vector_index_search_benchmark(c: &mut Criterion) {
&stopped,
)
.unwrap();
#[cfg(any())]
sparse_vector_index_mmap
.build_index(permit, &stopped)
.unwrap();
#[cfg(any())]
assert_eq!(sparse_vector_index_mmap.indexed_vector_count(), NUM_VECTORS);

// intent: bench `search` without filter on mmap inverted index
// disabled
#[cfg(any())]
group.bench_function("mmap-inverted-index-search", |b| {
b.iter(|| {
let results = sparse_vector_index_mmap
Expand Down Expand Up @@ -198,7 +208,8 @@ fn sparse_vector_index_search_benchmark(c: &mut Criterion) {
#[cfg(not(target_os = "windows"))]
criterion_group! {
name = benches;
config = Criterion::default().with_profiler(prof::FlamegraphProfiler::new(100));
// config = Criterion::default().with_profiler(prof::FlamegraphProfiler::new(100));
config = Criterion::default();
targets = sparse_vector_index_search_benchmark
}

Expand Down
2 changes: 1 addition & 1 deletion lib/segment/src/index/sparse_index/sparse_vector_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl<TInvertedIndex: InvertedIndex> SparseVectorIndex<TInvertedIndex> {
for dim_id in query_vector.indices.iter() {
if let Some(dim_id) = self.indices_tracker.remap_index(*dim_id) {
if let Some(posting_list) = self.inverted_index.get(&dim_id) {
for element in posting_list.elements.iter() {
for element in posting_list {
unique_record_ids.insert(element.record_id);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use common::cpu::CpuPermit;
use common::types::{PointOffsetType, TelemetryDetail};
use itertools::Itertools;
use rand::rngs::StdRng;
use rand::SeedableRng;
use segment::common::operation_error::OperationResult;
Expand Down Expand Up @@ -173,13 +174,12 @@ fn check_index_storage_consistency<T: InvertedIndex>(sparse_vector_index: &Spars
let posting_list = sparse_vector_index.inverted_index.get(dim_id).unwrap();
// assert posting list sorted by record id
assert!(posting_list
.elements
.windows(2)
.all(|w| w[0].record_id < w[1].record_id));
.clone()
.tuple_windows()
.all(|(w0, w1)| w0.record_id < w1.record_id));
// assert posted list contains record id
assert!(posting_list
.elements
.iter()
.clone()
.any(|e| e.record_id == id && e.weight == *dim_value));
}
// check the vector can be found via search using large top
Expand Down
11 changes: 11 additions & 0 deletions lib/sparse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,15 @@ ordered-float = "4.2"
rand = "0.8.5"
validator = { workspace = true }
itertools = "0.12.1"
log = "0.4"
parking_lot = "0.12.1"
bitpacking = "0.9.2"
indicatif = "0.17.8"

[dev-dependencies]
criterion = "0.5"
pprof = { version = "0.12", features = ["flamegraph", "prost-codec"] }

[[bench]]
name = "search"
harness = false
89 changes: 89 additions & 0 deletions lib/sparse/benches/prof.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::fs::File;
use std::io::Write;
use std::os::raw::c_int;
use std::path::Path;

use criterion::profiler::Profiler;
use pprof::flamegraph::TextTruncateDirection;
use pprof::protos::Message;
use pprof::ProfilerGuard;

/// Small custom profiler that can be used with Criterion to create a flamegraph for benchmarks.
/// Also see [the Criterion documentation on this][custom-profiler].
///
/// ## Example on how to enable the custom profiler:
///
/// ```
/// mod perf;
/// use perf::FlamegraphProfiler;
///
/// fn fibonacci_profiled(criterion: &mut Criterion) {
/// // Use the criterion struct as normal here.
/// }
///
/// fn custom() -> Criterion {
/// Criterion::default().with_profiler(FlamegraphProfiler::new())
/// }
///
/// criterion_group! {
/// name = benches;
/// config = custom();
/// targets = fibonacci_profiled
/// }
/// ```
///
/// The neat thing about this is that it will sample _only_ the benchmark, and not other stuff like
/// the setup process.
///
/// Further, it will only kick in if `--profile-time <time>` is passed to the benchmark binary.
/// A flamegraph will be created for each individual benchmark in its report directory under
/// `profile/flamegraph.svg`.
///
/// [custom-profiler]: https://bheisler.github.io/criterion.rs/book/user_guide/profiling.html#implementing-in-process-profiling-hooks
pub struct FlamegraphProfiler<'a> {
frequency: c_int,
active_profiler: Option<ProfilerGuard<'a>>,
}

impl<'a> FlamegraphProfiler<'a> {
#[allow(dead_code)]
pub fn new(frequency: c_int) -> Self {
FlamegraphProfiler {
frequency,
active_profiler: None,
}
}
}

impl<'a> Profiler for FlamegraphProfiler<'a> {
fn start_profiling(&mut self, _benchmark_id: &str, _benchmark_dir: &Path) {
self.active_profiler = Some(ProfilerGuard::new(self.frequency).unwrap());
}

fn stop_profiling(&mut self, _benchmark_id: &str, benchmark_dir: &Path) {
std::fs::create_dir_all(benchmark_dir).unwrap();
let pprof_path = benchmark_dir.join("profile.pb");
let flamegraph_path = benchmark_dir.join("flamegraph.svg");
eprintln!("\nflamegraph_path = {flamegraph_path:#?}");
let flamegraph_file = File::create(&flamegraph_path)
.expect("File system error while creating flamegraph.svg");
let mut options = pprof::flamegraph::Options::default();
options.hash = true;
options.image_width = Some(2500);
options.text_truncate_direction = TextTruncateDirection::Left;
options.font_size /= 3;
if let Some(profiler) = self.active_profiler.take() {
let report = profiler.report().build().unwrap();

let mut file = File::create(pprof_path).unwrap();
let profile = report.pprof().unwrap();
let mut content = Vec::new();
profile.encode(&mut content).unwrap();
file.write_all(&content).unwrap();

report
.flamegraph_with_options(flamegraph_file, &mut options)
.expect("Error writing flamegraph");
}
}
}
113 changes: 113 additions & 0 deletions lib/sparse/benches/search.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;

use common::types::PointOffsetType;
use criterion::measurement::Measurement;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
use rand::rngs::StdRng;
use rand::SeedableRng as _;
use sparse::common::scores_memory_pool::ScoresMemoryPool;
use sparse::common::sparse_vector::RemappedSparseVector;
use sparse::common::sparse_vector_fixture::{random_positive_sparse_vector, random_sparse_vector};
use sparse::index::csr;
use sparse::index::inverted_index::inverted_index_ram_builder::InvertedIndexBuilder;
use sparse::index::search_context::SearchContext;
mod prof;

const NUM_QUERIES: usize = 2048;
const MAX_SPARSE_DIM: usize = 30_000;
const TOP: usize = 10;

pub fn bench_search(c: &mut Criterion) {
let mut group = c.benchmark_group("search");

bench_search_random(&mut group, "random_50k", 50_000);
bench_search_random(&mut group, "random_500k", 500_000);

bench_search_msmarco(&mut group, "msmarco_1M", "base_1M.csr", 1.0);
bench_search_msmarco(&mut group, "msmarco_full_0.25", "base_full.csr", 0.25);
}

fn bench_search_random<M: Measurement>(c: &mut BenchmarkGroup<M>, name: &str, num_vectors: usize) {
let mut rnd = StdRng::seed_from_u64(42);

// index
let mut builder = InvertedIndexBuilder::new();
for idx in 0..num_vectors {
let vec = random_sparse_vector(&mut rnd, MAX_SPARSE_DIM);
builder.add(
idx as PointOffsetType,
RemappedSparseVector::new(vec.indices, vec.values).unwrap(),
);
}
let index = builder.build();

let query_vectors = (0..NUM_QUERIES)
.map(|_| {
let vector = random_positive_sparse_vector(&mut rnd, MAX_SPARSE_DIM);
RemappedSparseVector::new(vector.indices, vector.values).unwrap()
})
.collect::<Vec<_>>();
let mut it = query_vectors.iter().cycle();

let pool = ScoresMemoryPool::new();
let stopped = AtomicBool::new(false);

c.bench_function(name, |b| {
b.iter(|| {
SearchContext::new(
it.next().unwrap().clone(),
TOP,
&index,
pool.get(),
&stopped,
)
.search(&|_| true)
})
});
}

pub fn bench_search_msmarco<M: Measurement>(
c: &mut BenchmarkGroup<M>,
name: &str,
dataset: &str,
ratio: f32,
) {
let base_dir = PathBuf::from(std::env::var("MSMARCO_DIR").unwrap());

let index = csr::load_index(base_dir.join(dataset), ratio).unwrap();
let query_vectors = csr::load_vec(base_dir.join("queries.dev.csr")).unwrap();
let mut it = query_vectors.iter().cycle();

let pool = ScoresMemoryPool::new();
let stopped = AtomicBool::new(false);

c.bench_function(name, |b| {
b.iter(|| {
SearchContext::new(
it.next().unwrap().clone(),
TOP,
&index,
pool.get(),
&stopped,
)
.search(&|_| true)
})
});
}

#[cfg(not(target_os = "windows"))]
criterion_group! {
name = benches;
config = Criterion::default().with_profiler(prof::FlamegraphProfiler::new(100));
targets = bench_search,
}

#[cfg(target_os = "windows")]
criterion_group! {
name = benches;
config = Criterion::default();
targets = bench_search,
}

criterion_main!(benches);

0 comments on commit c8e4939

Please sign in to comment.