Skip to content

Commit

Permalink
Add log cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishiihara committed Apr 25, 2024
1 parent ed0d578 commit 49712f9
Show file tree
Hide file tree
Showing 14 changed files with 1,047 additions and 71 deletions.
659 changes: 603 additions & 56 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ aws-smithy-types = "1.1.0"
aws-config = { version = "1.1.2", features = ["behavior-version-latest"] }
arrow = "50.0.0"
roaring = "0.10.3"
tantivy = "0.21.1"
tracing = "0.1"
tracing-subscriber = "0.3"
tantivy = "0.22.0"
foyer = "0.8"
foyer-memory = "0.3.2"
ahash = "0.8.6"
once_cell = "1.8.0"

[dev-dependencies]
proptest = "1.4.0"
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ query_service:
storage:
S3:
bucket: "chroma-storage"
log_cache:
capacity: 100000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "logservice.chroma"
Expand Down
13 changes: 13 additions & 0 deletions rust/worker/src/cache/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use serde::Deserialize;

/// Eviction policy for the log cache, deserialized from the worker config
/// file (e.g. `eviction: Lru` in chroma_config.yaml).
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EvictionConfig {
    /// Least-recently-used eviction.
    Lru,
}

/// Configuration for the in-memory log cache (`cache::log_cache`).
#[derive(Deserialize, Debug, Clone)]
pub(crate) struct LogCacheConfig {
    // Maximum cache capacity, passed to foyer's `CacheBuilder::new`.
    pub capacity: usize,
    // Number of internal cache shards, passed to `with_shards`.
    pub shard_num: usize,
    // Which eviction policy to build the cache with.
    pub eviction: EvictionConfig,
}
242 changes: 242 additions & 0 deletions rust/worker/src/cache/log_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
use crate::cache::config::EvictionConfig;
use crate::cache::config::LogCacheConfig;
use crate::types::LogRecord;
use ahash::RandomState;
use foyer::CacheContext;
use foyer::LruConfig;
use foyer_memory::Cache;
use foyer_memory::CacheBuilder;
use foyer_memory::CacheEntry;
use foyer_memory::CacheEventListener;
use foyer_memory::DefaultCacheEventListener;
use once_cell::sync::OnceCell;
use std::cmp::Ord;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

/// Process-wide log cache singleton, populated exactly once by [`init`].
static LOG_CACHE: OnceCell<LogCache> = OnceCell::new();

/// Initializes the global log cache from `config`.
///
/// # Panics
/// Panics if the cache has already been initialized.
pub fn init(config: &LogCacheConfig) {
    // `config` is already a reference; passing it directly avoids a
    // needless `&&LogCacheConfig` borrow (clippy: needless_borrow).
    if LOG_CACHE.set(LogCache::new(config)).is_err() {
        panic!("log cache already initialized");
    }
}

/// Returns the global log cache.
///
/// # Panics
/// Panics if [`init`] has not been called yet.
pub fn get() -> &'static LogCache {
    LOG_CACHE.get().expect("log cache not initialized")
}

/// Key identifying a cached batch of logs: the owning collection plus the
/// log offset of the first record in the batch.
///
/// `Ord`/`PartialOrd` are derived: derived ordering compares fields in
/// declaration order (`collection_id`, then `start_log_offset`), which is
/// exactly what the previous hand-written `Ord` impl computed, with less
/// code to keep in sync if fields change.
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
pub(crate) struct LogCacheKey {
    pub(crate) collection_id: String,
    pub(crate) start_log_offset: i64,
}

/// Entry handle type for the log cache: a foyer `CacheEntry` mapping a
/// [`LogCacheKey`] to a shared, immutable slice of `LogRecord`s.
pub(crate) type LogCacheEntry = CacheEntry<LogCacheKey, Arc<[LogRecord]>, LogCacheEventListener>;

/// Event listener wired into the log cache; currently only reports releases.
pub(crate) struct LogCacheEventListener {}

impl CacheEventListener<LogCacheKey, Arc<[LogRecord]>> for LogCacheEventListener {
    // Invoked by foyer when an entry is released.
    //
    // NOTE(review): this prints on every release via `println!`, which may be
    // noisy on a hot path; `tracing` (already a workspace dependency) at
    // debug level would likely be a better fit — confirm with the team.
    fn on_release(
        &self,
        key: Arc<LogCacheKey>,
        value: Arc<Arc<[LogRecord]>>,
        context: CacheContext,
        charges: usize,
    ) {
        println!(
            "LogCacheEventListener::on_release: key: {:?}, value: {:?}, context: {:?}, charges: {}",
            key, value, context, charges
        );
    }
}

/// In-memory cache of log batches, keyed by `(collection_id, start_log_offset)`
/// and backed by a foyer in-memory cache.
pub(crate) struct LogCache {
    cache: Cache<LogCacheKey, Arc<[LogRecord]>, LogCacheEventListener>,
}

impl Debug for LogCache {
    // The inner foyer cache is not `Debug`, so render only the type name —
    // same output ("LogCache") as `debug_struct("LogCache").finish()`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "LogCache")
    }
}

/// Thin owning wrapper around a [`LogCacheEntry`] returned by cache lookups
/// and inserts; callers read the records through `handle.value()`.
pub(crate) struct EntryDataHandle {
    pub handle: LogCacheEntry,
}

impl EntryDataHandle {
    /// Wraps a raw cache entry in a handle.
    pub fn new(handle: LogCacheEntry) -> Self {
        Self { handle }
    }
}

impl LogCache {
    /// Builds a log cache whose capacity, shard count, and eviction policy
    /// come from `config`.
    pub fn new(config: &LogCacheConfig) -> LogCache {
        let cache = CacheBuilder::<
            LogCacheKey,
            Arc<[LogRecord]>,
            DefaultCacheEventListener<LogCacheKey, Arc<[LogRecord]>>,
            RandomState,
        >::new(config.capacity)
        .with_event_listener(LogCacheEventListener {})
        .with_eviction_config(match config.eviction {
            EvictionConfig::Lru => LruConfig {
                // NOTE(review): the high-priority pool ratio is hard-coded;
                // consider surfacing it in LogCacheConfig if it needs tuning.
                high_priority_pool_ratio: 0.1,
            },
        })
        .with_shards(config.shard_num)
        .build();
        LogCache { cache }
    }

    /// Looks up the log batch cached under `(collection_id, start_log_offset)`.
    /// Returns `None` on a cache miss.
    pub fn get(&self, collection_id: String, start_log_offset: i64) -> Option<EntryDataHandle> {
        let key = LogCacheKey {
            collection_id,
            start_log_offset,
        };
        // `Option::map` replaces the manual Some/None match.
        self.cache.get(&key).map(EntryDataHandle::new)
    }

    /// Inserts `logs` under `(collection_id, start_log_offset)`, replacing any
    /// existing entry for that key, and returns a handle to the new entry.
    pub fn insert(
        &self,
        collection_id: String,
        start_log_offset: i64,
        logs: Arc<[LogRecord]>,
    ) -> EntryDataHandle {
        let key = LogCacheKey {
            collection_id,
            start_log_offset,
        };
        let entry = self.cache.insert(key, logs);
        EntryDataHandle::new(entry)
    }

    /// Current usage as reported by the underlying foyer cache.
    // NOTE(review): presumably the number of charges currently held —
    // confirm against the foyer `Cache::usage` docs.
    pub fn usage(&self) -> usize {
        self.cache.usage()
    }

    /// Configured maximum capacity of the underlying cache.
    pub fn capacity(&self) -> usize {
        self.cache.capacity()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Operation;
    use crate::types::OperationRecord;

    /// Builds a single `Add` log record with the given offset and embedding id.
    fn make_record(log_offset: i64, id: &str) -> LogRecord {
        LogRecord {
            log_offset,
            record: OperationRecord {
                id: id.to_string(),
                embedding: None,
                encoding: None,
                metadata: None,
                operation: Operation::Add,
            },
        }
    }

    #[tokio::test]
    async fn test_log_cache() {
        let config = LogCacheConfig {
            capacity: 1,
            shard_num: 1,
            eviction: EvictionConfig::Lru,
        };
        let log_cache = LogCache::new(&config);

        // Insert a single-record batch and read it back.
        let collection_id_1 = "collection_id".to_string();
        let start_log_offset = 0;
        let mut logs = vec![make_record(0, "embedding_id_1")];
        log_cache.insert(
            collection_id_1.clone(),
            start_log_offset,
            logs.clone().into(),
        );
        let holder = log_cache
            .get(collection_id_1.clone(), start_log_offset)
            .expect("entry should be present after insert");
        let value = holder.handle.value();
        assert_eq!(value.len(), 1);
        assert_eq!(value[0].log_offset, 0);
        assert_eq!(value[0].record.id, "embedding_id_1");
        drop(holder);

        // Re-inserting under the same key replaces the cached batch.
        logs.push(make_record(1, "embedding_id_2"));
        log_cache.insert(collection_id_1.clone(), start_log_offset, logs.into());
        let holder = log_cache
            .get(collection_id_1.clone(), start_log_offset)
            .expect("entry should be present after re-insert");
        let value = holder.handle.value();
        assert_eq!(value.len(), 2);
        assert_eq!(value[0].log_offset, 0);
        assert_eq!(value[0].record.id, "embedding_id_1");
        assert_eq!(value[1].log_offset, 1);
        assert_eq!(value[1].record.id, "embedding_id_2");
        drop(holder);

        // With capacity 1, inserting a second collection evicts the first.
        let collection_id_2 = "collection_id_2".to_string();
        let start_log_offset = 0;
        let logs = vec![make_record(0, "embedding_id_3")];
        log_cache.insert(collection_id_2.clone(), start_log_offset, logs.into());
        assert!(log_cache
            .get(collection_id_1.clone(), start_log_offset)
            .is_none());

        let holder = log_cache
            .get(collection_id_2.clone(), start_log_offset)
            .expect("second collection should be cached");
        let value = holder.handle.value();
        assert_eq!(value.len(), 1);
        assert_eq!(value[0].log_offset, 0);
        assert_eq!(value[0].record.id, "embedding_id_3");
        drop(holder);

        assert_eq!(log_cache.capacity(), 1);
        assert_eq!(log_cache.usage(), 1);
    }
}
2 changes: 2 additions & 0 deletions rust/worker/src/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
//! In-memory caching for the worker: the cache configuration types and the
//! log cache implementation.
pub(crate) mod config;
pub(crate) mod log_cache;
17 changes: 17 additions & 0 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub(crate) struct QueryServiceConfig {
pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig,
pub(crate) sysdb: crate::sysdb::config::SysDbConfig,
pub(crate) storage: crate::storage::config::StorageConfig,
pub(crate) log_cache: Option<crate::cache::config::LogCacheConfig>,
pub(crate) log: crate::log::config::LogConfig,
pub(crate) dispatcher: crate::execution::config::DispatcherConfig,
}
Expand Down Expand Up @@ -168,6 +169,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log_cache:
capacity: 1000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -243,6 +248,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log_cache:
capacity: 1000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -336,6 +345,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log_cache:
capacity: 1000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -409,6 +422,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log_cache:
capacity: 1000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "localhost"
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/pull_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Operator<PullLogsInput, PullLogsOutput> for PullLogsOperator {

num_records_read += logs.len();
offset += batch_size as i64;
result.append(&mut logs);
result.extend_from_slice(&logs);

// We used a timestamp and we didn't get a full batch, so we have retrieved
// the last batch of logs relevant to our query
Expand Down
1 change: 0 additions & 1 deletion rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::system::Handler;
use crate::system::Receiver;
use crate::system::System;
use crate::types::SegmentFlushInfo;
use arrow::compute::kernels::partition;
use async_trait::async_trait;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod assignment;
mod blockstore;
mod cache;
mod compactor;
mod config;
mod distance;
Expand Down

0 comments on commit 49712f9

Please sign in to comment.