Fix bug in 0.1.x (#1195)
### What problem does this PR solve?

1. Fix: visibility check of entries created by a committing txn.
2. Fix: deadlock in `SegmentIndexEntry::MemIndexInsert`.
3. Fix: the cleanup processor now gets `visit_ts` from `txn_manager`, where `visit_ts = Min(first_uncommitted_begin_ts, last_checkpoint_ts)` (see the sketch after this list).
4. Fix: config file typo.
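
A minimal sketch of the timestamp rule in item 3 is shown below. Only the `GetCleanupScanTS` name and the `Min(...)` rule come from this PR; the member fields and locking are illustrative assumptions, not the actual `TxnManager` code.

```cpp
// Illustrative sketch of the cleanup-scan timestamp rule (not the real TxnManager).
#include <algorithm>
#include <cstdint>
#include <mutex>
#include <set>

using TxnTimeStamp = uint64_t;

class TxnManagerSketch {
public:
    TxnTimeStamp GetCleanupScanTS() {
        std::lock_guard<std::mutex> guard(mutex_);
        // Nothing at or after the oldest uncommitted txn's begin_ts may be cleaned
        // up, and neither may anything newer than the last checkpoint.
        TxnTimeStamp first_uncommitted_begin_ts =
            running_begin_ts_.empty() ? latest_commit_ts_ : *running_begin_ts_.begin();
        return std::min(first_uncommitted_begin_ts, last_checkpoint_ts_);
    }

private:
    std::mutex mutex_;
    std::set<TxnTimeStamp> running_begin_ts_; // begin_ts of running txns, ascending
    TxnTimeStamp last_checkpoint_ts_{0};
    TxnTimeStamp latest_commit_ts_{0};
};
```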

Issue link: #1172
### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
small-turtle-1 committed May 9, 2024
1 parent 96370f1 commit 4875f41
Showing 53 changed files with 392 additions and 363 deletions.
16 changes: 8 additions & 8 deletions conf/infinity_conf.toml
@@ -43,31 +43,31 @@ storage_capacity = "64GB"
# s means seconds, for example "60s", 60 seconds
# m means minutes, for example "60m", 60 minutes
# h means hours, for example "1h", 1 hour
garbage_collection_interval = "60s"
cleanup_interval = "60s"

# storage ratio activates garbage collection:
# 0 means disable,
# 0.1 means, once the storage reaches 10% storage capacity, GC is triggered.
garbage_collection_storage_ratio = 0.1

# dump memory index entry when it reaches the capacity
memindex_capacity = 1048576
mem_index_capacity = 1048576

[buffer]
buffer_pool_size = "4GB"
buffer_manager_size = "4GB"
temp_dir = "/var/infinity/tmp"

[wal]
wal_dir = "/var/infinity/wal"
full_checkpoint_interval_sec = 86400
delta_checkpoint_interval_sec = 60
delta_checkpoint_interval_wal_bytes = 1000000000
wal_file_size_threshold = "1GB"
full_checkpoint_interval = "86400s"
delta_checkpoint_interval = "60s"
# delta_checkpoint_threshold = 1000000000
wal_compact_threshold = "1GB"

# flush_at_once: write and flush log each commit
# only_write: write log, OS control when to flush the log, default
# flush_per_second: logs are written after each commit and flushed to disk per second.
flush_at_commit = "only_write"
flush_option = "only_write"

[resource]
dictionary_dir = "/var/infinity/resource"
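
The interval options above take a number plus a unit suffix (`s`, `m`, `h`, per the comments in this file). A minimal parsing sketch under that assumption follows; the function name and error handling are illustrative, not the engine's actual config loader.

```cpp
// Illustrative only: converts "60s" / "60m" / "1h" style intervals to seconds.
#include <cstdint>
#include <stdexcept>
#include <string>

int64_t ParseIntervalSeconds(const std::string &text) {
    if (text.size() < 2) {
        throw std::invalid_argument("interval needs a number and a unit suffix");
    }
    const int64_t value = std::stoll(text.substr(0, text.size() - 1));
    switch (text.back()) {
        case 's': return value;        // seconds
        case 'm': return value * 60;   // minutes
        case 'h': return value * 3600; // hours
        default: throw std::invalid_argument("unknown interval unit");
    }
}

// Example: ParseIntervalSeconds("60s") == 60, ParseIntervalSeconds("1h") == 3600.
```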
19 changes: 9 additions & 10 deletions src/executor/operator/physical_index_scan.cpp
@@ -452,14 +452,13 @@ struct FilterResult {
}

template <typename ColumnValueType>
inline void
ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, const TxnTimeStamp ts) {
inline void ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, Txn *txn) {
Vector<UniquePtr<TrunkReader<ColumnValueType>>> trunk_readers;
Tuple<Vector<SharedPtr<ChunkIndexEntry>>, SharedPtr<SecondaryIndexInMem>> chunks_snapshot = index_entry.GetSecondaryIndexSnapshot();
const u32 segment_row_count = SegmentRowCount();
auto &[chunk_index_entries, memory_secondary_index] = chunks_snapshot;
for (const auto &chunk_index_entry : chunk_index_entries) {
if (chunk_index_entry->CheckVisible(ts)) {
if (chunk_index_entry->CheckVisible(txn)) {
trunk_readers.emplace_back(MakeUnique<TrunkReaderT<ColumnValueType>>(segment_row_count, chunk_index_entry));
}
}
@@ -492,7 +491,7 @@ struct FilterResult {
inline void ExecuteSingleRange(const HashMap<ColumnID, TableIndexEntry *> &column_index_map,
const FilterExecuteSingleRange &single_range,
SegmentID segment_id,
const TxnTimeStamp ts) {
Txn *txn) {
// step 1. check if range is empty
if (single_range.IsEmpty()) {
return SetEmptyResult();
@@ -504,7 +503,7 @@
// step 3. search index
auto &interval_range_variant = single_range.GetIntervalRange();
std::visit(Overload{[&]<typename ColumnValueType>(const FilterIntervalRangeT<ColumnValueType> &interval_range) {
ExecuteSingleRangeT(interval_range, index_entry, ts);
ExecuteSingleRangeT(interval_range, index_entry, txn);
},
[](const std::monostate &empty) {
UnrecoverableError("FilterResult::ExecuteSingleRange(): class member interval_range_ not initialized!");
@@ -598,7 +597,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts) {
Txn *txn) {
Vector<FilterResult> result_stack;
// execute filter_execute_command_ (Reverse Polish notation)
for (auto const &elem : filter_execute_command) {
@@ -628,7 +627,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
},
[&](const FilterExecuteSingleRange &single_range) {
result_stack.emplace_back(segment_row_count, segment_row_actual_count);
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, ts);
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, txn);
}},
elem);
}
@@ -644,13 +643,13 @@ std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector<Filter
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts) {
Txn *txn) {
if (filter_execute_command.empty()) {
// return all true
return std::variant<Vector<u32>, Bitmask>(std::in_place_type<Bitmask>);
}
auto result =
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, ts);
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, txn);
return std::move(result.selected_rows_);
}

@@ -709,7 +708,7 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp
DeleteFilter delete_filter(segment_entry, begin_ts, segment_entry->row_count(begin_ts));
// output
const auto result =
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, begin_ts);
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, txn);
result.Output(output_data_blocks, segment_id, delete_filter);

LOG_TRACE(fmt::format("IndexScan: job number: {}, segment_ids.size(): {}, finished", next_idx, segment_ids.size()));
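
The index-scan changes above replace a bare `TxnTimeStamp` with a `Txn *` in the visibility checks, which is what item 1 of the description addresses: an entry written by the scanning txn itself must remain visible even though its commit timestamp has not been assigned yet. A minimal sketch of that rule follows; the struct and field names are assumptions for illustration, not the real `ChunkIndexEntry` code.

```cpp
// Illustrative visibility rule only; the actual entry classes differ.
#include <cstdint>

using TxnTimeStamp = uint64_t;
using TransactionID = uint64_t;
constexpr TxnTimeStamp UNCOMMIT_TS = UINT64_MAX;

struct TxnSketch {
    TransactionID txn_id_;
    TxnTimeStamp begin_ts_;
};

struct EntrySketch {
    TransactionID txn_id_;   // txn that created this entry
    TxnTimeStamp commit_ts_; // UNCOMMIT_TS until the creating txn commits

    bool CheckVisible(const TxnSketch *txn) const {
        // Committed at or before the reader's begin_ts: visible.
        if (commit_ts_ != UNCOMMIT_TS && commit_ts_ <= txn->begin_ts_) {
            return true;
        }
        // Not yet committed, but created by this very txn (e.g. while it is
        // committing): still visible to it. Comparing only against begin_ts
        // would wrongly hide such entries.
        return txn_id_ == txn->txn_id_;
    }
};
```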
4 changes: 3 additions & 1 deletion src/executor/operator/physical_index_scan.cppm
@@ -39,6 +39,8 @@ import bitmask;

namespace infinity {

class Txn;

// for int range filter, x > n is equivalent to x >= n + 1
// for float range filter, x > f is equivalent to x >= std::nextafter(f, INFINITY)
// we can use this to simplify the filter
@@ -110,6 +112,6 @@ export std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts);
Txn *txn);

} // namespace infinity
9 changes: 5 additions & 4 deletions src/executor/operator/physical_knn_scan.cpp
@@ -241,9 +241,10 @@ SizeT PhysicalKnnScan::BlockEntryCount() const { return base_table_ref_->block_i

template <typename DataType, template <typename, typename> typename C>
void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperatorState *operator_state) {
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
Txn *txn = query_context->GetTxn();
TxnTimeStamp begin_ts = txn->BeginTS();

if (!common_query_filter_->TryFinishBuild(begin_ts, query_context->GetTxn()->buffer_mgr())) {
if (!common_query_filter_->TryFinishBuild(txn)) {
// not ready, abort and wait for next time
return;
}
@@ -484,7 +485,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
if (block_entry == nullptr) {
UnrecoverableError(
fmt::format("Cannot find segment id: {}, block id: {}, index chunk is {}", segment_id, block_id, chunk_id));
}
} // this is for debug
}
merge_heap->Search(0, d_ptr.get(), row_ids.get(), result_n);
}
@@ -493,7 +494,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetHnswIndexSnapshot();
int i = 0;
for (auto &chunk_index_entry : chunk_index_entries) {
if (chunk_index_entry->CheckVisible(begin_ts)) {
if (chunk_index_entry->CheckVisible(txn)) {
BufferHandle index_handle = chunk_index_entry->GetIndex();
hnsw_search(index_handle, false, i++);
}
10 changes: 5 additions & 5 deletions src/executor/operator/physical_match.cpp
@@ -23,7 +23,7 @@ module;
module physical_match;

import stl;

import txn;
import query_context;
import operator_state;
import physical_operator;
@@ -556,9 +556,8 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
auto execute_start_time = std::chrono::high_resolution_clock::now();
// 1. build QueryNode tree
// 1.1 populate column2analyzer
TransactionID txn_id = query_context->GetTxn()->TxnID();
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
QueryBuilder query_builder(txn_id, begin_ts, base_table_ref_);
Txn *txn = query_context->GetTxn();
QueryBuilder query_builder(txn, base_table_ref_);
auto finish_init_query_builder_time = std::chrono::high_resolution_clock::now();
TimeDurationType query_builder_init_duration = finish_init_query_builder_time - execute_start_time;
LOG_TRACE(fmt::format("PhysicalMatch Part 0.1: Init QueryBuilder time: {} ms", query_builder_init_duration.count()));
@@ -849,7 +848,7 @@ bool PhysicalMatch::Execute(QueryContext *query_context, OperatorState *operator
auto start_time = std::chrono::high_resolution_clock::now();
assert(common_query_filter_);
{
bool try_result = common_query_filter_->TryFinishBuild(query_context->GetTxn()->BeginTS(), query_context->GetTxn()->buffer_mgr());
Txn *txn = query_context->GetTxn();
bool try_result = common_query_filter_->TryFinishBuild(txn);
auto finish_filter_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<float, std::milli> filter_duration = finish_filter_time - start_time;
LOG_TRACE(fmt::format("PhysicalMatch Prepare: Filter time: {} ms", filter_duration.count()));
20 changes: 9 additions & 11 deletions src/main/query_context.cpp
@@ -50,6 +50,7 @@ import session_manager;
import base_statement;
import parser_result;
import parser_assert;
import plan_fragment;

namespace infinity {

@@ -106,6 +107,10 @@ QueryResult QueryContext::Query(const String &query) {

QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
QueryResult query_result;
SharedPtr<LogicalNode> logical_plan = nullptr;
UniquePtr<PlanFragment> plan_fragment = nullptr;
UniquePtr<PhysicalOperator> physical_plan = nullptr;

// ProfilerStart("Query");
// BaseProfiler profiler;
// profiler.Begin();
@@ -127,7 +132,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
}

current_max_node_id_ = bind_context->GetNewLogicalNodeId();
SharedPtr<LogicalNode> logical_plan = logical_planner_->LogicalPlan();
logical_plan = logical_planner_->LogicalPlan();
StopProfile(QueryPhase::kLogicalPlan);
// LOG_WARN(fmt::format("Before optimizer cost: {}", profiler.ElapsedToString()));
// Apply optimized rule to the logical plan
@@ -137,13 +142,13 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {

// Build physical plan
StartProfile(QueryPhase::kPhysicalPlan);
UniquePtr<PhysicalOperator> physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan);
physical_plan = physical_planner_->BuildPhysicalOperator(logical_plan);
StopProfile(QueryPhase::kPhysicalPlan);
// LOG_WARN(fmt::format("Before pipeline cost: {}", profiler.ElapsedToString()));
StartProfile(QueryPhase::kPipelineBuild);
// Fragment Builder, only for test now.
// SharedPtr<PlanFragment> plan_fragment = fragment_builder.Build(physical_plan);
auto plan_fragment = fragment_builder_->BuildFragment(physical_plan.get());
plan_fragment = fragment_builder_->BuildFragment(physical_plan.get());
StopProfile(QueryPhase::kPipelineBuild);

auto notifier = MakeUnique<Notifier>();
@@ -159,14 +164,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *statement) {
StopProfile(QueryPhase::kExecution);
// LOG_WARN(fmt::format("Before commit cost: {}", profiler.ElapsedToString()));
StartProfile(QueryPhase::kCommit);
try {
this->CommitTxn();
} catch (RecoverableException &e) {
StopProfile();
this->RollbackTxn();
query_result.result_table_ = nullptr;
query_result.status_.Init(e.ErrorCode(), e.what());
}
this->CommitTxn();
StopProfile(QueryPhase::kCommit);

} catch (RecoverableException &e) {
6 changes: 3 additions & 3 deletions src/planner/bound/base_table_ref.cppm
@@ -21,7 +21,7 @@ export module base_table_ref;
import stl;
import table_ref;
import table_entry;

import txn;
import table_function;
import block_index;
import internal_types;
@@ -48,8 +48,8 @@ public:
explicit BaseTableRef(TableEntry *table_entry, SharedPtr<BlockIndex> block_index)
: TableRef(TableRefType::kTable, ""), table_entry_ptr_(table_entry), block_index_(std::move(block_index)) {}

static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, TxnTimeStamp ts) {
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(ts);
static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, Txn *txn) {
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);
return MakeShared<BaseTableRef>(table_entry, std::move(block_index));
}

6 changes: 3 additions & 3 deletions src/planner/query_binder.cpp
@@ -71,6 +71,7 @@ import data_type;
import logical_type;
import base_entry;
import view_entry;
import txn;

namespace infinity {

@@ -425,10 +426,9 @@ SharedPtr<BaseTableRef> QueryBinder::BuildBaseTable(QueryContext *query_context,
columns.emplace_back(idx);
}

// TransactionID txn_id = query_context->GetTxn()->TxnID();
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
Txn *txn = query_context->GetTxn();

SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(begin_ts);
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);

u64 table_index = bind_context_ptr_->GenerateTableIndex();
auto table_ref = MakeShared<BaseTableRef>(table_entry, std::move(columns), block_index, alias, table_index, names_ptr, types_ptr);
2 changes: 1 addition & 1 deletion src/storage/bg_task/compact_segments_task.cpp
@@ -148,7 +148,7 @@ void CompactSegmentsTask::CompactSegments(CompactSegmentsTaskState &state) {
}

auto new_segment = CompactSegmentsToOne(state, to_compact_segments);
block_index->Insert(new_segment.get(), UNCOMMIT_TS, false);
block_index->Insert(new_segment.get(), txn_);

segment_data.emplace_back(new_segment, std::move(to_compact_segments));
old_segments.insert(old_segments.end(), to_compact_segments.begin(), to_compact_segments.end());
5 changes: 2 additions & 3 deletions src/storage/bg_task/periodic_trigger.cpp
@@ -28,7 +28,7 @@ import third_party;
namespace infinity {

void CleanupPeriodicTrigger::Trigger() {
TxnTimeStamp visible_ts = txn_mgr_->GetMinUnflushedTS();
TxnTimeStamp visible_ts = txn_mgr_->GetCleanupScanTS();
// if (visible_ts == last_visible_ts_) {
// LOG_TRACE(fmt::format("Skip cleanup. visible timestamp: {}", visible_ts));
// return;
@@ -49,8 +49,7 @@ void CheckpointPeriodicTrigger::Trigger() {
auto checkpoint_task = MakeShared<CheckpointTask>(is_full_checkpoint_);
LOG_TRACE(fmt::format("Trigger {} periodic checkpoint.", is_full_checkpoint_ ? "FULL" : "DELTA"));
if (!wal_mgr_->TrySubmitCheckpointTask(std::move(checkpoint_task))) {
LOG_TRACE(
fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA"));
LOG_TRACE(fmt::format("Skip {} checkpoint(time) because there is already a checkpoint task running.", is_full_checkpoint_ ? "FULL" : "DELTA"));
}
}

4 changes: 3 additions & 1 deletion src/storage/buffer/buffer_obj.cppm
@@ -48,6 +48,8 @@ export String BufferStatusToString(BufferStatus status) {
return "Freed";
case BufferStatus::kNew:
return "New";
case BufferStatus::kClean:
return "Clean";
default:
return "Invalid";
}
@@ -77,7 +79,7 @@ public:

void CleanupFile() const;

void CleanupTempFile() const ;
void CleanupTempFile() const;

SizeT GetBufferSize() const { return file_worker_->GetMemoryCost(); }

24 changes: 14 additions & 10 deletions src/storage/common/block_index.cpp
@@ -14,32 +14,36 @@

module;

#include <vector>

module block_index;

import stl;
import segment_entry;
import global_block_id;
import block_iter;
import segment_iter;
import txn;

namespace infinity {

void BlockIndex::Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts) {
if (!check_ts || segment_entry->CheckVisible(timestamp)) {
void BlockIndex::Insert(SegmentEntry *segment_entry, Txn *txn) {
if (segment_entry->CheckVisible(txn)) {
u32 segment_id = segment_entry->segment_id();
segments_.emplace_back(segment_entry);
segment_index_.emplace(segment_id, segment_entry);
BlocksInfo blocks_info;

auto block_entry_iter = BlockEntryIter(segment_entry);
for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
if (timestamp >= block_entry->min_row_ts()) {
blocks_info.block_map_.emplace(block_entry->block_id(), block_entry);
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
{
auto block_guard = segment_entry->GetBlocksGuard();
for (const auto &block_entry : block_guard.block_entries_) {
if (block_entry->CheckVisible(txn)) {
blocks_info.block_map_.emplace(block_entry->block_id(), block_entry.get());
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
}
}
}
blocks_info.segment_offset_ = segment_entry->row_count(timestamp);
// blocks_info.segment_offset_ = segment_entry->row_count(); // use false row count to pass benchmark
TxnTimeStamp begin_ts = txn->BeginTS();
blocks_info.segment_offset_ = segment_entry->row_count(begin_ts);

segment_block_index_.emplace(segment_id, std::move(blocks_info));
}