Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize full-text index creation time #1199

Merged
merged 2 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/storage/invertedindex/column_inverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ SizeT ColumnInverter::InvertColumn(SharedPtr<ColumnVector> column_vector, u32 ro
SizeT term_count_sum = 0;
for (SizeT i = 0; i < row_count; ++i) {
String data = column_vector->ToString(row_offset + i);
if (data.empty())
if (data.empty()) {
continue;
}
SizeT term_count = InvertColumn(begin_doc_id + i, data);
column_lengths[i] = term_count;
term_count_sum += term_count;
Expand Down Expand Up @@ -246,8 +247,9 @@ void ColumnInverter::SortForOfflineDump() {
// Data within each group
void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count) {
// spill sort results for external merge sort
if (positions_.empty())
if (positions_.empty()) {
return;
}
// size of this Run in bytes
u32 data_size = 0;
u64 data_size_pos = ftell(spill_file);
Expand Down
38 changes: 38 additions & 0 deletions src/storage/invertedindex/common/external_sort_merger.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,44 @@ struct KeyAddress<KeyType, LenType, typename std::enable_if<std::is_scalar<KeyTy
bool operator<(const KeyAddress &other) const { return Compare(other) > 0; }
};

template <typename LenType>
struct KeyAddress<TermTuple, LenType> {
char *data{nullptr};
u64 addr;
u32 idx;

KeyAddress(char *p, u64 ad, u32 i) {
data = p;
addr = ad;
idx = i;
}

KeyAddress() {
data = nullptr;
addr = -1;
idx = -1;
}

TermTuple KEY() { return TermTuple(data + sizeof(LenType), LEN()); }
TermTuple KEY() const { return TermTuple(data + sizeof(LenType), LEN()); }
LenType LEN() const { return *(LenType *)data; }
u64 &ADDR() { return addr; }
u64 ADDR() const { return addr; }
u32 IDX() const { return idx; }
u32 &IDX() { return idx; }

int Compare(const KeyAddress &p) const {
return KEY().Compare(p.KEY());
}

bool operator==(const KeyAddress &other) const { return Compare(other) == 0; }

bool operator>(const KeyAddress &other) const { return Compare(other) < 0; }

bool operator<(const KeyAddress &other) const { return Compare(other) > 0; }
};


export template <typename KeyType, typename LenType>
class SortMerger {
typedef SortMerger<KeyType, LenType> self_t;
Expand Down
24 changes: 16 additions & 8 deletions src/storage/invertedindex/memory_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ MemoryIndexer::~MemoryIndexer() {
}

void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, bool offline) {
if (is_spilled_)
if (is_spilled_) {
Load();
}

u64 seq_inserted(0);
u32 doc_count(0);
Expand Down Expand Up @@ -121,8 +122,9 @@ void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset
auto func = [this, task, inverter](int id) {
SizeT column_length_sum = inverter->InvertColumn(task->column_vector_, task->row_offset_, task->row_count_, task->start_doc_id_);
column_length_sum_ += column_length_sum;
if (column_length_sum > 0)
if (column_length_sum > 0) {
inverter->SortForOfflineDump();
}
this->ring_sorted_.Put(task->task_seq_, inverter);
};
inverting_thread_pool_.push(std::move(func));
Expand All @@ -145,8 +147,9 @@ void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset
}

void MemoryIndexer::InsertGap(u32 row_count) {
if (is_spilled_)
if (is_spilled_) {
Load();
}

std::unique_lock<std::mutex> lock(mutex_);
doc_count_ += row_count;
Expand All @@ -155,14 +158,16 @@ void MemoryIndexer::InsertGap(u32 row_count) {
// Pushes one commit task onto commiting_thread_pool_:
// CommitOffline() when `offline` is true, CommitSync() otherwise.
void MemoryIndexer::Commit(bool offline) {
if (offline) {
// The int argument passed to the task is not used by the lambda.
commiting_thread_pool_.push([this](int id) { this->CommitOffline(); });
} else
} else {
// Same task shape as above, but running the synchronous commit path.
commiting_thread_pool_.push([this](int id) { this->CommitSync(); });
}
}

SizeT MemoryIndexer::CommitOffline(SizeT wait_if_empty_ms) {
std::unique_lock<std::mutex> lock(mutex_commit_, std::defer_lock);
if (!lock.try_lock())
if (!lock.try_lock()) {
return 0;
}

if (nullptr == spill_file_handle_) {
PrepareSpillFile();
Expand Down Expand Up @@ -200,14 +205,16 @@ SizeT MemoryIndexer::CommitSync(SizeT wait_if_empty_ms) {
};

std::unique_lock<std::mutex> lock(mutex_commit_, std::defer_lock);
if (!lock.try_lock())
if (!lock.try_lock()) {
return 0;
}

while (1) {
this->ring_sorted_.GetBatch(inverters, wait_if_empty_ms);
// num_merged = inverters.size();
if (inverters.empty())
if (inverters.empty()) {
break;
}
for (auto &inverter : inverters) {
inverter->GeneratePosting();
num_generated += inverter->GetMerged();
Expand Down Expand Up @@ -353,8 +360,9 @@ void MemoryIndexer::OfflineDump() {
// 2. Generate posting
// 3. Dump disk segment data
// LOG_INFO(fmt::format("MemoryIndexer::OfflineDump begin, num_runs_ {}", num_runs_));
if (tuple_count_ == 0)
if (tuple_count_ == 0) {
return;
}
FinalSpillFile();
constexpr u32 buffer_size_of_each_run = 2 * 1024 * 1024;
SortMerger<TermTuple, u32> *merger = new SortMerger<TermTuple, u32>(spill_full_path_.c_str(), num_runs_, buffer_size_of_each_run * num_runs_, 2);
Expand Down