Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add log cache #2021

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
659 changes: 603 additions & 56 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ aws-smithy-types = "1.1.0"
aws-config = { version = "1.1.2", features = ["behavior-version-latest"] }
arrow = "50.0.0"
roaring = "0.10.3"
tantivy = "0.21.1"
tracing = "0.1"
tracing-subscriber = "0.3"
tantivy = "0.22.0"
foyer = "0.8"
foyer-memory = "0.3.2"
ahash = "0.8.6"
once_cell = "1.8.0"

[dev-dependencies]
proptest = "1.4.0"
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ query_service:
storage:
S3:
bucket: "chroma-storage"
log_cache:
capacity: 100000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "logservice.chroma"
Expand Down
13 changes: 13 additions & 0 deletions rust/worker/src/cache/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use serde::Deserialize;

#[derive(Deserialize)]
Reviewer note: Let me support serde for EvictionConfig in foyer; then you can omit this enum.

/// Eviction policy selector for the log cache, deserialized from the worker
/// config YAML (`eviction: Lru`). Only LRU is supported for now; `new` in
/// `log_cache.rs` maps this onto foyer's `LruConfig`.
pub(crate) enum EvictionConfig {
    Lru,
}

/// Sizing and eviction settings for the worker's log cache, deserialized from
/// the optional `log_cache` section of the worker config YAML.
#[derive(Deserialize)]
pub(crate) struct LogCacheConfig {
    /// Capacity passed to foyer's `CacheBuilder::new`.
    /// NOTE(review): assumed to be a number of entries, not bytes — confirm
    /// against foyer's capacity semantics.
    pub capacity: usize,
    /// Number of internal cache shards (`with_shards`); presumably higher
    /// values reduce lock contention — verify against foyer docs.
    pub shard_num: usize,
    /// Which eviction policy to use (currently only `Lru`).
    pub eviction: EvictionConfig,
}
242 changes: 242 additions & 0 deletions rust/worker/src/cache/log_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
use crate::cache::config::EvictionConfig;
use crate::cache::config::LogCacheConfig;
use crate::types::LogRecord;
use ahash::RandomState;
use foyer::CacheContext;
use foyer::LruConfig;
use foyer_memory::Cache;
use foyer_memory::CacheBuilder;
use foyer_memory::CacheEntry;
use foyer_memory::CacheEventListener;
use foyer_memory::DefaultCacheEventListener;
use once_cell::sync::OnceCell;
use std::cmp::Ord;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

// Process-wide singleton: set exactly once by `init`, read via `get`.
static LOG_CACHE: OnceCell<LogCache> = OnceCell::new();

/// Initializes the global log cache from `config`.
///
/// # Panics
/// Panics if the cache has already been initialized.
pub fn init(config: &LogCacheConfig) {
    // `config` is already a reference; the original passed `&config`
    // (a `&&LogCacheConfig`), which only compiled via deref coercion.
    if LOG_CACHE.set(LogCache::new(config)).is_err() {
        panic!("log cache already initialized");
    }
}

/// Returns the globally initialized log cache.
///
/// # Panics
/// Panics if `init` has not been called yet.
pub fn get() -> &'static LogCache {
    match LOG_CACHE.get() {
        Some(cache) => cache,
        None => panic!("log cache not initialized"),
    }
}

/// Cache key: a batch of logs is identified by the collection it belongs to
/// and the offset of the first record in the batch.
#[derive(Eq, PartialEq, Hash, Debug)]
pub(crate) struct LogCacheKey {
    pub(crate) collection_id: String,
    pub(crate) start_log_offset: i64,
}

impl Ord for LogCacheKey {
    /// Orders lexicographically: collection id first, then start offset.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Tuple comparison yields the same collection-then-offset ordering
        // as chaining `cmp(..).then(..)` field by field.
        (&self.collection_id, self.start_log_offset)
            .cmp(&(&other.collection_id, other.start_log_offset))
    }
}

impl PartialOrd for LogCacheKey {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

// Handle to a cached log batch. NOTE(review): assumed that foyer's CacheEntry
// keeps the value resident while the handle is alive — confirm against foyer.
pub(crate) type LogCacheEntry = CacheEntry<LogCacheKey, Arc<[LogRecord]>, LogCacheEventListener>;

// Stateless listener registered with the cache to observe entry releases
// (see `on_release` below).
pub(crate) struct LogCacheEventListener {}

impl CacheEventListener<LogCacheKey, Arc<[LogRecord]>> for LogCacheEventListener {
fn on_release(
&self,
key: Arc<LogCacheKey>,
value: Arc<Arc<[LogRecord]>>,
context: CacheContext,
charges: usize,
) {
println!(
"LogCacheEventListener::on_release: key: {:?}, value: {:?}, context: {:?}, charges: {}",
key, value, context, charges
);
}
}

pub(crate) struct LogCache {
cache: Cache<LogCacheKey, Arc<[LogRecord]>, LogCacheEventListener>,
Reviewer note: Please use foyer::Cache if only in-memory caching is needed; otherwise, use foyer::HybridCache instead. In short, only the foyer crate needs to be imported.
}

impl Debug for LogCache {
Reviewer note: I'll implement Debug for Cache and HybridCache; then you can use derive instead of this manual impl.

fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LogCache").finish()
}
}

pub(crate) struct EntryDataHandle {
Reviewer note: Remove this wrapper if you only read from foyer.

pub handle: LogCacheEntry,
}

impl EntryDataHandle {
pub fn new(entry: LogCacheEntry) -> EntryDataHandle {
EntryDataHandle { handle: entry }
}
}

impl LogCache {
    /// Builds an in-memory foyer cache sized and sharded according to `config`.
    pub fn new(config: &LogCacheConfig) -> LogCache {
        let cache = CacheBuilder::<
            LogCacheKey,
            Arc<[LogRecord]>,
            DefaultCacheEventListener<LogCacheKey, Arc<[LogRecord]>>,
            RandomState,
        >::new(config.capacity)
        .with_event_listener(LogCacheEventListener {})
        .with_eviction_config(match config.eviction {
            // NOTE(review): the 0.1 high-priority pool ratio is hard-coded;
            // consider surfacing it in LogCacheConfig.
            EvictionConfig::Lru => LruConfig {
                high_priority_pool_ratio: 0.1,
            },
        })
        .with_shards(config.shard_num)
        .build();
        LogCache { cache }
    }

    /// Looks up the log batch cached under `(collection_id, start_log_offset)`.
    /// Returns `None` on a miss (including after eviction).
    pub fn get(&self, collection_id: String, start_log_offset: i64) -> Option<EntryDataHandle> {
        let key = LogCacheKey {
            collection_id,
            start_log_offset,
        };
        // `map` replaces the original Some/None match; behavior is unchanged.
        self.cache.get(&key).map(EntryDataHandle::new)
    }

    /// Inserts (or replaces) the log batch for `(collection_id, start_log_offset)`
    /// and returns a handle to the freshly inserted entry.
    pub fn insert(
        &self,
        collection_id: String,
        start_log_offset: i64,
        logs: Arc<[LogRecord]>,
    ) -> EntryDataHandle {
        let key = LogCacheKey {
            collection_id,
            start_log_offset,
        };
        EntryDataHandle::new(self.cache.insert(key, logs))
    }

    /// Current usage reported by foyer. NOTE(review): assumed to count entries,
    /// matching the unit of `capacity` — confirm against foyer docs.
    pub fn usage(&self) -> usize {
        self.cache.usage()
    }

    /// Configured maximum capacity of the underlying cache.
    pub fn capacity(&self) -> usize {
        self.cache.capacity()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Operation;
    use crate::types::OperationRecord;

    /// Builds a minimal `Add` log record with the given offset and embedding id.
    fn make_record(log_offset: i64, id: &str) -> LogRecord {
        LogRecord {
            log_offset,
            record: OperationRecord {
                id: id.to_string(),
                embedding: None,
                encoding: None,
                metadata: None,
                operation: Operation::Add,
            },
        }
    }

    #[tokio::test]
    async fn test_log_cache() {
        // Capacity 1 so that inserting a second collection evicts the first.
        let config = LogCacheConfig {
            capacity: 1,
            shard_num: 1,
            eviction: EvictionConfig::Lru,
        };
        let log_cache = LogCache::new(&config);

        // Insert and read back a single-record batch.
        let collection_id_1 = "collection_id".to_string();
        let start_log_offset = 0;
        let mut logs = vec![make_record(0, "embedding_id_1")];
        log_cache.insert(
            collection_id_1.clone(),
            start_log_offset,
            logs.clone().into(),
        );
        let holder = log_cache
            .get(collection_id_1.clone(), start_log_offset)
            .expect("entry should be cached");
        let value = holder.handle.value();
        assert_eq!(value.len(), 1);
        assert_eq!(value[0].log_offset, 0);
        assert_eq!(value[0].record.id, "embedding_id_1");
        drop(holder);

        // Re-inserting under the same key replaces the cached batch.
        logs.push(make_record(1, "embedding_id_2"));
        log_cache.insert(collection_id_1.clone(), start_log_offset, logs.into());
        let holder = log_cache
            .get(collection_id_1.clone(), start_log_offset)
            .expect("entry should be cached");
        let value = holder.handle.value();
        assert_eq!(value.len(), 2);
        assert_eq!(value[0].log_offset, 0);
        assert_eq!(value[0].record.id, "embedding_id_1");
        assert_eq!(value[1].log_offset, 1);
        assert_eq!(value[1].record.id, "embedding_id_2");
        drop(holder);

        // Inserting a second collection evicts the first (capacity is 1).
        let collection_id_2 = "collection_id_2".to_string();
        let logs = vec![make_record(0, "embedding_id_3")];
        log_cache.insert(
            collection_id_2.clone(),
            start_log_offset,
            logs.clone().into(),
        );
        assert!(log_cache
            .get(collection_id_1.clone(), start_log_offset)
            .is_none());

        let holder = log_cache
            .get(collection_id_2.clone(), start_log_offset)
            .expect("entry should be cached");
        let value = holder.handle.value();
        assert_eq!(value.len(), 1);
        assert_eq!(value[0].log_offset, 0);
        assert_eq!(value[0].record.id, "embedding_id_3");
        drop(holder);

        assert_eq!(log_cache.capacity(), 1);
        assert_eq!(log_cache.usage(), 1);
    }
}
}
2 changes: 2 additions & 0 deletions rust/worker/src/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Worker-side caching: `config` holds the deserialized cache settings,
// `log_cache` the foyer-backed in-memory cache of pulled log batches.
pub(crate) mod config;
pub(crate) mod log_cache;
17 changes: 17 additions & 0 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub(crate) struct QueryServiceConfig {
pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig,
pub(crate) sysdb: crate::sysdb::config::SysDbConfig,
pub(crate) storage: crate::storage::config::StorageConfig,
pub(crate) log_cache: Option<crate::cache::config::LogCacheConfig>,
pub(crate) log: crate::log::config::LogConfig,
pub(crate) dispatcher: crate::execution::config::DispatcherConfig,
}
Expand Down Expand Up @@ -167,6 +168,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log_cache:
capacity: 1000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -242,6 +247,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log_cache:
capacity: 1000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -335,6 +344,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log_cache:
capacity: 1000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -408,6 +421,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log_cache:
capacity: 1000
shard_num: 1
eviction: Lru
log:
Grpc:
host: "localhost"
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/pull_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Operator<PullLogsInput, PullLogsOutput> for PullLogsOperator {

num_records_read += logs.len();
offset += batch_size as i64;
result.append(&mut logs);
result.extend_from_slice(&logs);

// We used a a timestamp and we didn't get a full batch, so we have retrieved
// the last batch of logs relevant to our query
Expand Down
1 change: 0 additions & 1 deletion rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::system::Handler;
use crate::system::Receiver;
use crate::system::System;
use crate::types::SegmentFlushInfo;
use arrow::compute::kernels::partition;
use async_trait::async_trait;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod assignment;
mod blockstore;
mod cache;
mod compactor;
mod config;
mod distance;
Expand Down