Skip to content

Commit

Permalink
[ENH] Handle deletes and updates for count API (#2185)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - New functionality
This PR makes the count API account for updates, deletes, and duplicate
adds.

## Test plan
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None

---------

Co-authored-by: Sanket Kedia <[email protected]>
  • Loading branch information
sanketkedia and sanketkedia committed May 14, 2024
1 parent cf7ad5c commit 37a030c
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 34 deletions.
16 changes: 16 additions & 0 deletions rust/worker/src/blockstore/arrow/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,22 @@ impl<'me, K: ArrowReadableKey<'me>, V: ArrowReadableValue<'me>> ArrowBlockfileRe
}
}

/// Reports whether this blockfile holds an entry for `key` under `prefix`.
///
/// The sparse index is asked for the target block id of the composite key
/// built from `prefix` and `key`; that block is then fetched and probed.
/// Returns `false` when `get_block` yields no block or when the block has
/// no value for the key.
pub(crate) async fn contains(&'me self, prefix: &str, key: K) -> bool {
    let search_key = CompositeKey::new(prefix.to_string(), key.clone());
    let target_block_id = self.sparse_index.get_target_block_id(&search_key);
    let block = self.get_block(target_block_id).await;
    match block {
        Some(block) => {
            // The annotation pins the value type `get` should produce;
            // only the presence of a value matters here.
            let entry: Option<V> = block.get(prefix, key);
            entry.is_some()
        }
        None => false,
    }
}

// Count the total number of records.
pub(crate) async fn count(&self) -> Result<usize, Box<dyn ChromaError>> {
let mut block_ids: Vec<Uuid> = vec![];
Expand Down
18 changes: 16 additions & 2 deletions rust/worker/src/blockstore/memory/reader_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ pub(crate) struct MemoryBlockfileWriter {
id: uuid::Uuid,
}

/// Value returned by `MemoryBlockfileWriter::commit`, carrying the writer's id.
pub(crate) struct MemoryBlockfileFlusher {
// Identifier copied from the writer that produced this flusher.
id: uuid::Uuid,
}

impl MemoryBlockfileFlusher {
/// Returns the id stored in this flusher.
pub(crate) fn id(&self) -> uuid::Uuid {
self.id
}
}

impl MemoryBlockfileWriter {
pub(super) fn new(storage_manager: StorageManager) -> Self {
let builder = storage_manager.create();
Expand All @@ -22,9 +32,9 @@ impl MemoryBlockfileWriter {
}
}

pub(crate) fn commit(&self) -> Result<(), Box<dyn ChromaError>> {
pub(crate) fn commit(&self) -> Result<MemoryBlockfileFlusher, Box<dyn ChromaError>> {
self.storage_manager.commit(self.builder.id);
Ok(())
Ok(MemoryBlockfileFlusher { id: self.id })
}

pub(crate) fn set<K: Key + Into<KeyWrapper>, V: Value + Writeable>(
Expand Down Expand Up @@ -171,6 +181,10 @@ impl<
V::count(&self.storage)
}

pub(crate) fn contains(&'storage self, prefix: &str, key: K) -> bool {
V::contains(prefix, key.into(), &self.storage)
}

/// Returns the id of the storage this reader wraps.
pub(crate) fn id(&self) -> uuid::Uuid {
self.storage.id
}
Expand Down
52 changes: 52 additions & 0 deletions rust/worker/src/blockstore/memory/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub(crate) trait Readable<'referred_data>: Sized {
) -> Vec<(&'referred_data CompositeKey, Self)>;

fn count(storage: &Storage) -> Result<usize, Box<dyn ChromaError>>;

/// Reports whether `storage` has an entry for the composite key (`prefix`, `key`).
fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool;
}

impl Writeable for &str {
Expand Down Expand Up @@ -163,6 +165,16 @@ impl<'referred_data> Readable<'referred_data> for &'referred_data str {
/// Returns the number of entries in `string_value_storage`.
///
/// This implementation only ever produces `Ok`.
fn count(storage: &Storage) -> Result<usize, Box<dyn ChromaError>> {
    let entries = storage.string_value_storage.iter();
    Ok(entries.len())
}

/// Reports whether `string_value_storage` has an entry for (`prefix`, `key`).
fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool {
    // Build the lookup key up front so the probe itself reads as one step.
    let composite = CompositeKey {
        prefix: prefix.to_owned(),
        key,
    };
    storage.string_value_storage.get(&composite).is_some()
}
}

// TODO: remove this and make this all use a unified storage so we don't have two impls
Expand Down Expand Up @@ -273,6 +285,16 @@ impl<'referred_data> Readable<'referred_data> for Int32Array {
/// Returns the number of entries in `int32_array_storage`.
///
/// This implementation only ever produces `Ok`.
fn count(storage: &Storage) -> Result<usize, Box<dyn ChromaError>> {
    let entries = storage.int32_array_storage.iter();
    Ok(entries.len())
}

/// Reports whether `int32_array_storage` has an entry for (`prefix`, `key`).
fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool {
    // Build the lookup key up front so the probe itself reads as one step.
    let composite = CompositeKey {
        prefix: prefix.to_owned(),
        key,
    };
    storage.int32_array_storage.get(&composite).is_some()
}
}

impl Writeable for &RoaringBitmap {
Expand Down Expand Up @@ -382,6 +404,16 @@ impl<'referred_data> Readable<'referred_data> for RoaringBitmap {
/// Returns the number of entries in `roaring_bitmap_storage`.
///
/// This implementation only ever produces `Ok`.
fn count(storage: &Storage) -> Result<usize, Box<dyn ChromaError>> {
    let entries = storage.roaring_bitmap_storage.iter();
    Ok(entries.len())
}

/// Reports whether `roaring_bitmap_storage` has an entry for (`prefix`, `key`).
fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool {
    // Build the lookup key up front so the probe itself reads as one step.
    let composite = CompositeKey {
        prefix: prefix.to_owned(),
        key,
    };
    storage.roaring_bitmap_storage.get(&composite).is_some()
}
}

impl Writeable for u32 {
Expand Down Expand Up @@ -486,6 +518,16 @@ impl<'referred_data> Readable<'referred_data> for u32 {
/// Returns the number of entries in `u32_storage`.
///
/// This implementation only ever produces `Ok`.
fn count(storage: &Storage) -> Result<usize, Box<dyn ChromaError>> {
    let entries = storage.u32_storage.iter();
    Ok(entries.len())
}

/// Reports whether `u32_storage` has an entry for (`prefix`, `key`).
fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool {
    // Build the lookup key up front so the probe itself reads as one step.
    let composite = CompositeKey {
        prefix: prefix.to_owned(),
        key,
    };
    storage.u32_storage.get(&composite).is_some()
}
}

impl Writeable for &DataRecord<'_> {
Expand Down Expand Up @@ -688,6 +730,16 @@ impl<'referred_data> Readable<'referred_data> for DataRecord<'referred_data> {
/// Returns the number of entries in `data_record_id_storage`.
///
/// This implementation only ever produces `Ok`.
fn count(storage: &Storage) -> Result<usize, Box<dyn ChromaError>> {
    let entries = storage.data_record_id_storage.iter();
    Ok(entries.len())
}

/// Reports whether `data_record_id_storage` has an entry for (`prefix`, `key`).
fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool {
    // Build the lookup key up front so the probe itself reads as one step.
    let composite = CompositeKey {
        prefix: prefix.to_owned(),
        key,
    };
    storage.data_record_id_storage.get(&composite).is_some()
}
}

#[derive(Clone)]
Expand Down
18 changes: 13 additions & 5 deletions rust/worker/src/blockstore/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use super::arrow::types::{
ArrowReadableKey, ArrowReadableValue, ArrowWriteableKey, ArrowWriteableValue,
};
use super::key::KeyWrapper;
use super::memory::reader_writer::{MemoryBlockfileReader, MemoryBlockfileWriter};
use super::memory::reader_writer::{
MemoryBlockfileFlusher, MemoryBlockfileReader, MemoryBlockfileWriter,
};
use super::memory::storage::{Readable, Writeable};
use super::PositionalPostingList;
use crate::errors::{ChromaError, ErrorCodes};
Expand Down Expand Up @@ -197,7 +199,7 @@ impl BlockfileWriter {
) -> Result<BlockfileFlusher, Box<dyn ChromaError>> {
match self {
BlockfileWriter::MemoryBlockfileWriter(writer) => match writer.commit() {
Ok(_) => Ok(BlockfileFlusher::MemoryBlockfileFlusher(())),
Ok(flusher) => Ok(BlockfileFlusher::MemoryBlockfileFlusher(flusher)),
Err(e) => Err(e),
},
BlockfileWriter::ArrowBlockfileWriter(writer) => match writer.commit::<K, V>() {
Expand Down Expand Up @@ -247,7 +249,7 @@ impl BlockfileWriter {
}

pub(crate) enum BlockfileFlusher {
MemoryBlockfileFlusher(()),
MemoryBlockfileFlusher(MemoryBlockfileFlusher),
ArrowBlockfileFlusher(ArrowBlockfileFlusher),
}

Expand All @@ -266,8 +268,7 @@ impl BlockfileFlusher {

pub(crate) fn id(&self) -> uuid::Uuid {
match self {
// TODO: should memory blockfiles have ids?
BlockfileFlusher::MemoryBlockfileFlusher(_) => uuid::Uuid::nil(),
BlockfileFlusher::MemoryBlockfileFlusher(flusher) => flusher.id(),
BlockfileFlusher::ArrowBlockfileFlusher(flusher) => flusher.id(),
}
}
Expand Down Expand Up @@ -302,6 +303,13 @@ impl<
}
}

/// Reports whether the blockfile has an entry for `key` under `prefix`.
///
/// Dispatches to the concrete reader variant; only the Arrow-backed lookup
/// is awaited, the in-memory lookup is a plain call.
pub(crate) async fn contains(&'referred_data self, prefix: &str, key: K) -> bool {
    match self {
        BlockfileReader::MemoryBlockfileReader(memory_reader) => {
            memory_reader.contains(prefix, key)
        }
        BlockfileReader::ArrowBlockfileReader(arrow_reader) => {
            arrow_reader.contains(prefix, key).await
        }
    }
}

pub(crate) async fn count(&'referred_data self) -> Result<usize, Box<dyn ChromaError>> {
match self {
BlockfileReader::MemoryBlockfileReader(reader) => reader.count(),
Expand Down

0 comments on commit 37a030c

Please sign in to comment.