Skip to content

Commit

Permalink
[Feat] KNN filtering with limit and KNN distance function (#4036)
Browse files Browse the repository at this point in the history
  • Loading branch information
emmanuel-keller committed May 24, 2024
1 parent 23653e5 commit 7495611
Show file tree
Hide file tree
Showing 47 changed files with 2,887 additions and 1,101 deletions.
6 changes: 2 additions & 4 deletions core/src/dbs/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use crate::dbs::Statement;
use crate::dbs::{Options, Transaction};
use crate::doc::Document;
use crate::err::Error;
use crate::idx::docids::DocId;
use crate::idx::planner::executor::IteratorRef;
use crate::idx::planner::iterators::{IteratorRecord, IteratorRef};
use crate::idx::planner::IterationStage;
use crate::sql::edges::Edges;
use crate::sql::range::Range;
Expand All @@ -34,9 +33,8 @@ pub(crate) enum Iterable {
}

pub(crate) struct Processed {
pub(crate) ir: Option<IteratorRef>,
pub(crate) rid: Option<Thing>,
pub(crate) doc_id: Option<DocId>,
pub(crate) ir: Option<IteratorRecord>,
pub(crate) val: Operable,
}

Expand Down
130 changes: 71 additions & 59 deletions core/src/dbs/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ use crate::dbs::distinct::AsyncDistinct;
use crate::dbs::distinct::SyncDistinct;
use crate::dbs::{Iterable, Iterator, Operable, Options, Processed, Statement, Transaction};
use crate::err::Error;
use crate::idx::planner::executor::IteratorRef;
use crate::idx::planner::iterators::{CollectorRecord, IteratorRef, ThingIterator};
use crate::idx::planner::IterationStage;
use crate::key::{graph, thing};
use crate::kvs;
use crate::kvs::ScanPage;
use crate::sql::dir::Dir;
use crate::sql::{Edges, Range, Table, Thing, Value};
#[cfg(not(target_arch = "wasm32"))]
use channel::Sender;
use reblessive::tree::Stk;
use std::ops::Bound;
use std::vec;

impl Iterable {
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -60,7 +62,7 @@ impl Iterable {
if let Some(IterationStage::BuildKnn) = ctx.get_iteration_stage() {
if let Some(qp) = ctx.get_query_planner() {
if let Some(exe) = qp.get_query_executor(tb) {
return exe.has_knn();
return exe.has_bruteforce_knn();
}
}
}
Expand All @@ -71,7 +73,7 @@ impl Iterable {
}
}

enum Processor<'a> {
pub(crate) enum Processor<'a> {
Iterator(Option<&'a mut SyncDistinct>, &'a mut Iterator),
#[cfg(not(target_arch = "wasm32"))]
Channel(Option<AsyncDistinct>, Sender<Processed>),
Expand Down Expand Up @@ -141,17 +143,17 @@ impl<'a> Processor<'a> {
}
Iterable::Range(v) => self.process_range(stk, ctx, opt, txn, stm, v).await?,
Iterable::Edges(e) => self.process_edge(stk, ctx, opt, txn, stm, e).await?,
Iterable::Index(t, ir) => {
Iterable::Index(t, irf) => {
if let Some(qp) = ctx.get_query_planner() {
if let Some(exe) = qp.get_query_executor(&t.0) {
// We set the query executor matching the current table in the Context
// Avoiding search in the hashmap of the query planner for each doc
let mut ctx = Context::new(ctx);
ctx.set_query_executor(exe.clone());
return self.process_index(stk, &ctx, opt, txn, stm, &t, ir).await;
return self.process_index(stk, &ctx, opt, txn, stm, &t, irf).await;
}
}
self.process_index(stk, ctx, opt, txn, stm, &t, ir).await?
self.process_index(stk, ctx, opt, txn, stm, &t, irf).await?
}
Iterable::Mergeable(v, o) => {
self.process_mergeable(stk, ctx, opt, txn, stm, v, o).await?
Expand All @@ -175,9 +177,8 @@ impl<'a> Processor<'a> {
) -> Result<(), Error> {
// Pass the value through
let pro = Processed {
ir: None,
rid: None,
doc_id: None,
ir: None,
val: Operable::Value(v),
};
// Process the document record
Expand Down Expand Up @@ -205,9 +206,8 @@ impl<'a> Processor<'a> {
});
// Process the document record
let pro = Processed {
ir: None,
rid: Some(v),
doc_id: None,
ir: None,
val,
};
self.process(stk, ctx, opt, txn, stm, pro).await?;
Expand All @@ -228,9 +228,8 @@ impl<'a> Processor<'a> {
txn.lock().await.check_ns_db_tb(opt.ns(), opt.db(), &v.tb, opt.strict).await?;
// Process the document record
let pro = Processed {
ir: None,
rid: Some(v),
doc_id: None,
ir: None,
val: Operable::Value(Value::None),
};
self.process(stk, ctx, opt, txn, stm, pro).await?;
Expand Down Expand Up @@ -263,9 +262,8 @@ impl<'a> Processor<'a> {
let val = Operable::Mergeable(x, o);
// Process the document record
let pro = Processed {
ir: None,
rid: Some(v),
doc_id: None,
ir: None,
val,
};
self.process(stk, ctx, opt, txn, stm, pro).await?;
Expand Down Expand Up @@ -299,9 +297,8 @@ impl<'a> Processor<'a> {
let val = Operable::Relatable(f, x, w);
// Process the document record
let pro = Processed {
ir: None,
rid: Some(v),
doc_id: None,
ir: None,
val,
};
self.process(stk, ctx, opt, txn, stm, pro).await?;
Expand Down Expand Up @@ -352,9 +349,8 @@ impl<'a> Processor<'a> {
let val = Operable::Value(val);
// Process the record
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
ir: None,
val,
};
self.process(stk, ctx, opt, txn, stm, pro).await?;
Expand Down Expand Up @@ -425,9 +421,8 @@ impl<'a> Processor<'a> {
let val = Operable::Value(val);
// Process the record
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
ir: None,
val,
};
self.process(stk, ctx, opt, txn, stm, pro).await?;
Expand Down Expand Up @@ -551,9 +546,8 @@ impl<'a> Processor<'a> {
});
// Process the record
let pro = Processed {
ir: None,
rid: Some(rid),
doc_id: None,
ir: None,
val,
};
self.process(stk, ctx, opt, txn, stm, pro).await?;
Expand All @@ -574,53 +568,27 @@ impl<'a> Processor<'a> {
txn: &Transaction,
stm: &Statement<'_>,
table: &Table,
ir: IteratorRef,
irf: IteratorRef,
) -> Result<(), Error> {
// Check that the table exists
txn.lock().await.check_ns_db_tb(opt.ns(), opt.db(), &table.0, opt.strict).await?;
if let Some(exe) = ctx.get_query_executor() {
if let Some(mut iterator) = exe.new_iterator(opt, ir).await? {
let mut things = Vec::new();
iterator.next_batch(txn, PROCESSOR_BATCH_SIZE, &mut things).await?;
while !things.is_empty() {
if let Some(mut iterator) = exe.new_iterator(opt, irf).await? {
// Get the first batch
let mut to_process = Self::next_batch(ctx, opt, txn, &mut iterator).await?;

while !to_process.is_empty() {
// Check if the context is finished
if ctx.is_done() {
break;
}

for (thing, doc_id) in things {
// Check the context
if ctx.is_done() {
break;
}

// If the record is from another table we can skip
if !thing.tb.eq(table.as_str()) {
continue;
}

// Fetch the data from the store
let key = thing::new(opt.ns(), opt.db(), &table.0, &thing.id);
let val = txn.lock().await.get(key.clone()).await?;
let rid = Thing::from((key.tb, key.id));
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
None => Value::None,
});
// Process the document record
let pro = Processed {
ir: Some(ir),
rid: Some(rid),
doc_id,
val,
};
// Process the records
// TODO: par_iter
for pro in to_process {
self.process(stk, ctx, opt, txn, stm, pro).await?;
}

// Collect the next batch of ids
things = Vec::new();
iterator.next_batch(txn, PROCESSOR_BATCH_SIZE, &mut things).await?;
// Get the next batch
to_process = Self::next_batch(ctx, opt, txn, &mut iterator).await?;
}
// Everything ok
return Ok(());
Expand All @@ -634,4 +602,48 @@ impl<'a> Processor<'a> {
message: "No QueryExecutor has been found.".to_string(),
})
}

/// Pulls the next batch of records out of `iterator` and converts each one
/// into a [`Processed`] entry ready to be handed to the processor.
///
/// The transaction lock is taken once and held while the whole batch is
/// collected, so the iterator call and any record fetches share the same guard.
///
/// Returns an empty vector when the iterator yields no more records.
async fn next_batch(
    ctx: &Context<'_>,
    opt: &Options,
    txn: &Transaction,
    iterator: &mut ThingIterator,
) -> Result<Vec<Processed>, Error> {
    let mut tx = txn.lock().await;
    // Ask the iterator for at most PROCESSOR_BATCH_SIZE records
    let records: Vec<CollectorRecord> =
        iterator.next_batch(ctx, &mut tx, PROCESSOR_BATCH_SIZE).await?;
    let mut batch = Vec::with_capacity(records.len());
    for record in records {
        let value = match record.2 {
            // The value may already have been fetched by the KNN iterator
            // in order to evaluate the condition, so it can be reused as is
            Some(fetched) => fetched,
            // Otherwise the record has to be read from the store
            None => Iterable::fetch_thing(&mut tx, opt, &record.0).await?,
        };
        batch.push(Processed {
            rid: Some(record.0),
            ir: Some(record.1),
            val: Operable::Value(value),
        });
    }
    Ok(batch)
}
}

impl Iterable {
    /// Reads the record identified by `thg` from the store.
    ///
    /// The lookup key is built from the namespace and database held by `opt`
    /// together with the table and id of `thg`. Returns `Value::None` if the
    /// store has no entry for that key.
    pub(crate) async fn fetch_thing(
        tx: &mut kvs::Transaction,
        opt: &Options,
        thg: &Thing,
    ) -> Result<Value, Error> {
        // Build the storage key for this record
        let key = thing::new(opt.ns(), opt.db(), &thg.tb, &thg.id);
        // Read the raw entry and convert it into a value when present
        match tx.get(key).await? {
            Some(raw) => Ok(Value::from(raw)),
            None => Ok(Value::None),
        }
    }
}
18 changes: 15 additions & 3 deletions core/src/doc/check.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::ctx::Context;
use crate::dbs::Statement;
use crate::dbs::{Options, Transaction};
use crate::doc::Document;
use crate::doc::{CursorDoc, Document};
use crate::err::Error;
use crate::sql::Cond;
use reblessive::tree::Stk;

impl<'a> Document<'a> {
Expand All @@ -13,11 +14,22 @@ impl<'a> Document<'a> {
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
Self::check_cond(stk, ctx, opt, txn, stm.conds(), &self.current).await
}

pub(crate) async fn check_cond(
stk: &mut Stk,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
cond: Option<&Cond>,
doc: &CursorDoc<'_>,
) -> Result<(), Error> {
// Check where condition
if let Some(cond) = stm.conds() {
if let Some(cond) = cond {
// Check if the expression is truthy
if !cond.compute(stk, ctx, opt, txn, Some(&self.current)).await?.is_truthy() {
if !cond.compute(stk, ctx, opt, txn, Some(doc)).await?.is_truthy() {
// Ignore this document
return Err(Error::Ignore);
}
Expand Down
5 changes: 2 additions & 3 deletions core/src/doc/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<'a> Document<'a> {
Operable::Relatable(f, v, w) => (v, Workable::Relate(f, w)),
};
// Setup a new document
let mut doc = Document::new(pro.ir, pro.rid.as_ref(), pro.doc_id, &ins.0, ins.1);
let mut doc = Document::new(pro.rid.as_ref(), pro.ir.as_ref(), &ins.0, ins.1);
// Process the statement
let res = match stm {
Statement::Select(_) => doc.select(stk, ctx, opt, txn, stm).await,
Expand All @@ -59,9 +59,8 @@ impl<'a> Document<'a> {
None => Value::None,
};
pro = Processed {
ir: None,
doc_id: None,
rid: Some(v),
ir: None,
val: match doc.extras {
Workable::Normal => Operable::Value(val),
Workable::Insert(o) => Operable::Mergeable(val, o),
Expand Down

0 comments on commit 7495611

Please sign in to comment.