support phrase query with block max (#1189)
### What problem does this PR solve?
* Support phrase query with block max (example below)
* Fix skiplist reader error when reading positions

Issue link: #639
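
For illustration, here is a sketch of the kind of match texts the updated benchmark below exercises. The quoting convention is taken from the benchmark itself (an escaped double-quoted sub-string is treated as a phrase query, bare terms as an ordinary term match); treat this as an informal example rather than a syntax reference:

```cpp
#include <string>
#include <vector>

// Illustrative query strings mirroring the updated fulltext_benchmark.cpp below.
// As the benchmark uses them, a sub-string wrapped in escaped double quotes is
// parsed as a phrase query, while bare terms form an ordinary multi-term match.
std::vector<std::string> query_vec = {
    "harmful chemical",           // plain two-term query
    "\"harmful chemical\"",       // phrase query over the same two terms
    "harmful \"social custom\"",  // one bare term combined with one phrase
};
```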

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature
- [x] Test cases
Ma-cat authored and JinHai-CN committed May 10, 2024
1 parent 093ae71 commit 79918f5
Showing 24 changed files with 435 additions and 144 deletions.
10 changes: 5 additions & 5 deletions benchmark/local_infinity/fulltext/fulltext_benchmark.cpp
@@ -222,8 +222,8 @@ void BenchmarkOptimize(SharedPtr<Infinity> infinity, const String &db_name, cons

void BenchmarkQuery(SharedPtr<Infinity> infinity, const String &db_name, const String &table_name) {
std::string fields = "text";
//std::vector<std::string> query_vec = {"one of", "is", "a", "\"is a\"", "\"one of\""};// {"Animalia", "Algorithms", "Animalia Algorithms", "network space", "harmful chemical anarchism"};
std::vector<std::string> query_vec = {"harmful chemical anarchism", "\"harmful chemical\"", "\"one of\"", "harmful chemical"};
std::vector<std::string> query_vec = {"harmful \"social custom\"", "social custom \"harmful chemical\"", "\"annual American awards\"", "harmful chemical", "\"one of\""};

for (auto match_text : query_vec) {
BaseProfiler profiler;
profiler.Begin();
@@ -314,7 +314,7 @@ int main(int argc, char *argv[]) {
};
Mode mode(Mode::kInsert);
SizeT insert_batch = 500;
app.add_option("--mode", mode, "Bencmark mode, one of insert, import, merge, query")
app.add_option("--mode", mode, "Benchmark mode, one of insert, import, merge, query")
->required()
->transform(CLI::CheckedTransformer(mode_map, CLI::ignore_case));
app.add_option("--insert-batch", insert_batch, "batch size of each insert, valid only at insert and merge mode, default value 500");
@@ -331,7 +331,7 @@ int main(int argc, char *argv[]) {
String srcfile = test_data_path();
srcfile += "/benchmark/dbpedia-entity/corpus.jsonl";

// #define DEL_LOCAL_DATA
//#define DEL_LOCAL_DATA
#ifdef DEL_LOCAL_DATA
system("rm -rf /var/infinity/data /var/infinity/wal /var/infinity/log /var/infinity/tmp");
#endif
@@ -358,7 +358,7 @@ int main(int argc, char *argv[]) {
case Mode::kQuery: {
BenchmarkCreateIndex(infinity, db_name, table_name, index_name);
BenchmarkInsert(infinity, db_name, table_name, srcfile, insert_batch);
// BenchmarkOptimize(infinity, db_name, table_name);
BenchmarkOptimize(infinity, db_name, table_name);
sleep(10);
BenchmarkMoreQuery(infinity, db_name, table_name, 1);
break;
2 changes: 1 addition & 1 deletion python/benchmark/configs/infinity_enwiki.json
@@ -9,7 +9,7 @@
"query_link": "to_be_set",
"mode": "fulltext",
"topK": 10,
"use_import": false,
"use_import": true,
"schema": {
"doctitle": {"type": "varchar", "default":""},
"docdate": {"type": "varchar", "default":""},
49 changes: 3 additions & 46 deletions src/executor/operator/physical_match.cpp
@@ -494,45 +494,6 @@ void ASSERT_FLOAT_EQ(float bar, u32 i, float a, float b) {
}
}

void AnalyzeFunc(const String &analyzer_name, String &&text, TermList &output_terms) {
UniquePtr<Analyzer> analyzer = AnalyzerPool::instance().Get(analyzer_name);
// (dynamic_cast<CommonLanguageAnalyzer*>(analyzer.get()))->SetExtractEngStem(false);
if (analyzer.get() == nullptr) {
RecoverableError(Status::UnexpectedError(fmt::format("Invalid analyzer: {}", analyzer_name)));
}
Term input_term;
input_term.text_ = std::move(text);
TermList temp_output_terms;
analyzer->Analyze(input_term, temp_output_terms);
if (analyzer_name == AnalyzerPool::STANDARD) {
// remove duplicates and only keep the root words for query
const u32 INVALID_TERM_OFFSET = -1;
Term last_term;
last_term.word_offset_ = INVALID_TERM_OFFSET;
for (const Term &term : temp_output_terms) {
if (last_term.word_offset_ != INVALID_TERM_OFFSET) {
assert(term.word_offset_ >= last_term.word_offset_);
}
if (last_term.word_offset_ != term.word_offset_) {
if (last_term.word_offset_ != INVALID_TERM_OFFSET) {
output_terms.emplace_back(last_term);
}
last_term.text_ = term.text_;
last_term.word_offset_ = term.word_offset_;
last_term.stats_ = term.stats_;
} else {
if (term.text_.size() < last_term.text_.size()) {
last_term.text_ = term.text_;
last_term.stats_ = term.stats_;
}
}
}
if (last_term.word_offset_ != INVALID_TERM_OFFSET) {
output_terms.emplace_back(last_term);
}
}
}

void ExecuteFTSearch(UniquePtr<EarlyTerminateIterator> &et_iter, FullTextScoreResultHeap &result_heap, u32 &blockmax_loop_cnt) {
if (et_iter) {
while (true) {
@@ -584,12 +545,7 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
if (!query_tree) {
RecoverableError(Status::ParseMatchExprFailed(match_expr_->fields_, match_expr_->matching_text_));
}
if (query_tree->type_ == QueryNodeType::PHRASE) {
// TODO: make sure there is no problem with block max phrase and delete this code
// LOG_INFO(fmt::format("Block max phrase not supported, use ordinary iterator, query: {}", match_expr_->matching_text_));
use_block_max_iter = false;
use_ordinary_iter = true;
}

auto finish_parse_query_tree_time = std::chrono::high_resolution_clock::now();
TimeDurationType parse_query_tree_duration = finish_parse_query_tree_time - finish_init_query_builder_time;
LOG_TRACE(fmt::format("PhysicalMatch Part 0.2: Parse QueryNode tree time: {} ms", parse_query_tree_duration.count()));
@@ -868,8 +824,9 @@ bool PhysicalMatch::Execute(QueryContext *query_context, OperatorState *operator
SharedPtr<Vector<String>> PhysicalMatch::GetOutputNames() const {
SharedPtr<Vector<String>> result_names = MakeShared<Vector<String>>();
result_names->reserve(base_table_ref_->column_names_->size() + 2);
for (auto &name : *base_table_ref_->column_names_)
for (auto &name : *base_table_ref_->column_names_) {
result_names->emplace_back(name);
}
result_names->emplace_back(COLUMN_NAME_SCORE);
result_names->emplace_back(COLUMN_NAME_ROW_ID);
return result_names;
8 changes: 4 additions & 4 deletions src/storage/invertedindex/format/position_list_decoder.cpp
@@ -47,8 +47,8 @@ void PositionListDecoder::InitPositionSkipList(const ByteSliceList *pos_list,
state->SetRecordOffset(pos_skiplist_end);
} else {
pos_skiplist_reader_ = session_pool_ ? new ((session_pool_)->Allocate(sizeof(SkipListReaderByteSlice)))
SkipListReaderByteSlice(option_.GetDocListFormatOption())
: new SkipListReaderByteSlice(option_.GetDocListFormatOption());
SkipListReaderByteSlice(option_.GetPosListFormatOption())
: new SkipListReaderByteSlice(option_.GetPosListFormatOption());
skiplist_reader_real_size_ = sizeof(SkipListReaderByteSlice);
static_cast<SkipListReaderByteSlice *>(pos_skiplist_reader_)->Load(pos_list, pos_skiplist_start, pos_skiplist_end);
decoded_pos_count_ = 0;
@@ -66,8 +66,8 @@ void PositionListDecoder::InitPositionSkipList(ByteSlice *pos_list,
state->SetRecordOffset(pos_skiplist_end);
} else {
pos_skiplist_reader_ = session_pool_ ? new ((session_pool_)->Allocate(sizeof(SkipListReaderByteSlice)))
SkipListReaderByteSlice(option_.GetDocListFormatOption())
: new SkipListReaderByteSlice(option_.GetDocListFormatOption());
SkipListReaderByteSlice(option_.GetPosListFormatOption())
: new SkipListReaderByteSlice(option_.GetPosListFormatOption());
skiplist_reader_real_size_ = sizeof(SkipListReaderByteSlice);
static_cast<SkipListReaderByteSlice *>(pos_skiplist_reader_)->Load(pos_list, pos_skiplist_start, pos_skiplist_end);
decoded_pos_count_ = 0;
14 changes: 10 additions & 4 deletions src/storage/invertedindex/format/skiplist_reader.cppm
@@ -9,12 +9,15 @@ import doc_list_format_option;
import memory_pool;
import posting_byte_slice;
import posting_byte_slice_reader;
import position_list_format_option;

namespace infinity {

export class SkipListReader {
public:
explicit SkipListReader(const DocListFormatOption &doc_list_format_option) : doc_list_format_option_(doc_list_format_option) {
explicit SkipListReader(const DocListFormatOption &doc_list_format_option)
: has_tf_list_(doc_list_format_option.HasTfList()),
has_block_max_(doc_list_format_option.HasBlockMax()) {
if (has_tf_list_) {
ttf_buffer_ = MakeUnique<u32[]>(SKIP_LIST_BUFFER_SIZE);
}
@@ -24,6 +27,8 @@ public:
}
}

explicit SkipListReader(const PositionListFormatOption &doc_list_format_option) {}

virtual ~SkipListReader() = default;

bool SkipTo(u32 query_doc_id, u32 &doc_id, u32 &prev_doc_id, u32 &offset, u32 &delta);
@@ -55,9 +60,8 @@ public:
protected:
virtual Pair<int, bool> LoadBuffer() = 0;

DocListFormatOption doc_list_format_option_;
const bool has_tf_list_ = doc_list_format_option_.HasTfList();
const bool has_block_max_ = doc_list_format_option_.HasBlockMax();
const bool has_tf_list_ = false;
const bool has_block_max_ = false;
i32 skipped_item_count_ = 0;
u32 current_doc_id_ = 0;
u32 current_offset_ = 0;
@@ -80,6 +84,8 @@ export class SkipListReaderByteSlice final : public SkipListReader {
public:
explicit SkipListReaderByteSlice(const DocListFormatOption &doc_list_format_option) : SkipListReader(doc_list_format_option) {}

explicit SkipListReaderByteSlice(const PositionListFormatOption &pos_list_format_option) : SkipListReader(pos_list_format_option) {}

void Load(const ByteSliceList *byte_slice_list, u32 start, u32 end);

void Load(ByteSlice *byteSlice, u32 start, u32 end);
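
A minimal sketch of the distinction the skiplist fix relies on, using the option types from the hunk above. The helper below is hypothetical and not part of the repository; it only restates why the new constructor taking PositionListFormatOption leaves both flags at their false defaults:

```cpp
// Hypothetical helper (not in the repository) summarizing the flag derivation above.
// Doc-list skip entries may carry total-term-frequency and block-max data, so the
// flags come from the doc-list format option; the position-list constructor above
// keeps both flags false, so no tf-list or block-max columns are decoded.
struct SkipFlags {
    bool has_tf_list = false;
    bool has_block_max = false;
};

SkipFlags FlagsFor(const DocListFormatOption &opt) {
    return {opt.HasTfList(), opt.HasBlockMax()};
}

SkipFlags FlagsFor(const PositionListFormatOption &) {
    return {};  // position skip lists are read without the extra columns
}
```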
1 change: 1 addition & 0 deletions src/storage/invertedindex/multi_posting_decoder.cpp
@@ -201,6 +201,7 @@ bool MultiPostingDecoder::DiskSegMoveToSegment(SegmentPosting &cur_segment_posti
u32 doc_skiplist_end = doc_skiplist_start + doc_skiplist_size;

index_decoder_->InitSkipList(doc_skiplist_start, doc_skiplist_end, posting_list, term_meta.GetDocFreq());

if (format_option_.HasPositionList()) {
u32 pos_list_begin = doc_list_reader.Tell() + doc_skiplist_size + doc_list_size;
in_doc_state_keeper_.MoveToSegment(posting_list, term_meta.GetTotalTermFreq(), pos_list_begin, format_option_);
4 changes: 4 additions & 0 deletions src/storage/invertedindex/multi_posting_decoder.cppm
@@ -71,9 +71,13 @@ private:
}

bool MoveToSegment(RowID start_row_id);

bool MemSegMoveToSegment(const SharedPtr<PostingWriter> &posting_writer);

bool DiskSegMoveToSegment(SegmentPosting &cur_segment_posting);

IndexDecoder *CreateDocIndexDecoder(u32 doc_list_begin_pos);

private:
PostingFormatOption format_option_;
bool need_decode_doc_id_ = false;
22 changes: 5 additions & 17 deletions src/storage/invertedindex/search/bm25_ranker.cpp
@@ -20,6 +20,7 @@ module bm25_ranker;

import stl;
import index_defines;
import third_party;

namespace infinity {

@@ -34,23 +35,10 @@ void BM25Ranker::AddTermParam(u64 tf, u64 df, float avg_column_len, u32 column_l
score_ += smooth_idf * smooth_tf * weight;
}

void BM25Ranker::AddPhraseParam(const Vector<tf_t>& all_tf,
const Vector<u32>& all_df,
float avg_column_len,
u32 column_len,
float weight,
SizeT term_num) {
for (SizeT i = 0; i < term_num; ++i) {
u64 tf = 0;
u64 df = 0;
if (i < all_tf.size()) {
tf = all_tf[i];
}
if (i < all_df.size()) {
df = all_df[i];
}
AddTermParam(tf, df, avg_column_len, column_len, weight);
}
void BM25Ranker::AddPhraseParam(tf_t tf, u64 df, float avg_column_len, u32 column_len, float weight) {
float smooth_idf = std::log(1.0F + (total_df_ - df + 0.5F) / (df + 0.5F));
float smooth_tf = (k1 + 1.0F) * tf / (tf + k1 * (1.0F - b + b * column_len / avg_column_len));
score_ += smooth_idf * smooth_tf * weight;
}

} // namespace infinity
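
For reference, the reworked AddPhraseParam above scores the whole phrase as if it were a single term, with the same body as AddTermParam. Reading the new code, each call adds (with $N$ = total_df_, $|d|$ = column_len, $\overline{|d|}$ = avg_column_len, $w$ = weight):

$$\Delta \text{score} \;=\; \ln\!\left(1 + \frac{N - df + 0.5}{df + 0.5}\right) \cdot \frac{(k_1 + 1)\,tf}{tf + k_1\!\left(1 - b + b\,\frac{|d|}{\overline{|d|}}\right)} \cdot w$$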
2 changes: 1 addition & 1 deletion src/storage/invertedindex/search/bm25_ranker.cppm
@@ -27,7 +27,7 @@ public:

void AddTermParam(u64 tf, u64 df, float avg_column_len, u32 column_len, float weight);

void AddPhraseParam(const Vector<tf_t>& all_tf, const Vector<u32>& all_df, float avg_column_len, u32 column_len, float weight, SizeT term_num);
void AddPhraseParam(tf_t tf, u64 df, float avg_colum_len, u32 column_len, float weight);

float GetScore() { return score_; }

@@ -21,6 +21,7 @@ import stl;
import index_defines;
import early_terminate_iterator;
import internal_types;
import third_party;

namespace infinity {

@@ -79,7 +80,7 @@ bool BlockMaxAndIterator::BlockSkipTo(RowID doc_id, float threshold) {
sum_score += sorted_iterators_[j - 1]->BlockMaxBM25Score();
common_block_max_bm25_score_parts_[j - 1] = prev_sum_score;
}
assert((sum_score <= bm25_score_upper_bound_));
// assert((sum_score <= bm25_score_upper_bound_));
if (sum_score >= threshold) {
common_block_max_bm25_score_ = sum_score;
common_block_min_possible_doc_id_ = doc_id;
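
In the hunk above, BlockSkipTo accumulates the children's per-block BM25 upper bounds and, as the surrounding lines show, only accepts the candidate block when that sum reaches the current threshold. Roughly, with $\theta$ the heap's score threshold, a block survives only if

$$\sum_{j} \text{BlockMaxBM25Score}_j \;\ge\; \theta,$$

otherwise the conjunction can skip past it without scoring individual documents.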
@@ -90,4 +90,4 @@ private:
Vector<Pair<u32, u64>> must_have_history_;
};

} // namespace infinity
} // namespace infinity
