Skip to content

Commit

Permalink
chore: tiering fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dranikpg committed May 1, 2024
1 parent c84b6fa commit 842e3b6
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 104 deletions.
15 changes: 11 additions & 4 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx

auto it = Iterator(res->it, StringOrView::FromView(key));
auto exp_it = ExpIterator(res->exp_it, StringOrView::FromView(key));
PreUpdate(cntx.db_index, it);
PreUpdate(cntx.db_index, it, key);
// PreUpdate() might have caused a deletion of `it`
if (res->it.IsOccupied()) {
return {{it, exp_it,
Expand Down Expand Up @@ -502,7 +502,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
if (res.ok()) {
Iterator it(res->it, StringOrView::FromView(key));
ExpIterator exp_it(res->exp_it, StringOrView::FromView(key));
PreUpdate(cntx.db_index, it);
PreUpdate(cntx.db_index, it, key);
// PreUpdate() might have caused a deletion of `it`
if (res->it.IsOccupied()) {
return DbSlice::AddOrFindResult{
Expand Down Expand Up @@ -971,14 +971,21 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const
return true;
}

void DbSlice::PreUpdate(DbIndex db_ind, Iterator it) {
void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
FiberAtomicGuard fg;

DVLOG(2) << "Running callbacks in dbid " << db_ind;
for (const auto& ccb : change_cb_) {
ccb.second(db_ind, ChangeReq{it.GetInnerIt()});
}

// If the value has a pending stash, cancel it before any modifications are applied.
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
// append) can be applied instead of a full overwrite. Deleting is the responsibility of the commands.
if (it->second.HasIoPending()) {
owner_->tiered_storage_v2()->CancelStash(db_ind, key, &it->second);
}

it.GetInnerIt().SetVersion(NextVersion());
}

Expand Down Expand Up @@ -1431,7 +1438,7 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl
const PrimeValue& pv = del_it->second;

if (pv.IsExternal() && shard_owner()->tiered_storage_v2()) {
shard_owner()->tiered_storage_v2()->Delete(del_it.key(), &del_it->second);
shard_owner()->tiered_storage_v2()->Delete(table->index, del_it.key(), &del_it->second);
}

size_t value_heap_size = pv.MallocUsed();
Expand Down
23 changes: 13 additions & 10 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,15 @@ class DbSlice {
}

// returns absolute time of the expiration.
time_t ExpireTime(ExpConstIterator it) const {
time_t ExpireTime(const ExpConstIterator& it) const {
return ExpireTime(it.GetInnerIt());
}
time_t ExpireTime(ExpIterator it) const {

time_t ExpireTime(const ExpIterator& it) const {
return ExpireTime(it.GetInnerIt());
}
time_t ExpireTime(ExpireConstIterator it) const {

time_t ExpireTime(const ExpireConstIterator& it) const {
return it.is_done() ? 0 : expire_base_[0] + it->second.duration_ms();
}

Expand Down Expand Up @@ -468,7 +470,7 @@ class DbSlice {
void PerformDeletion(PrimeIterator del_it, DbTable* table);

private:
void PreUpdate(DbIndex db_ind, Iterator it);
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);

OpResult<AddOrFindResult> AddOrUpdateInternal(const Context& cntx, std::string_view key,
Expand Down Expand Up @@ -503,11 +505,12 @@ class DbSlice {
};

PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const;

OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);

OpResult<PrimeItAndExp> FindInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type,
UpdateStatsMode stats_mode) const;

OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);
OpResult<ItAndUpdater> FindMutableInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type);

Expand Down Expand Up @@ -572,19 +575,19 @@ class DbSlice {
client_tracking_map_;
};

inline bool IsValid(DbSlice::Iterator it) {
inline bool IsValid(const DbSlice::Iterator& it) {
return dfly::IsValid(it.GetInnerIt());
}

inline bool IsValid(DbSlice::ConstIterator it) {
inline bool IsValid(const DbSlice::ConstIterator& it) {
return dfly::IsValid(it.GetInnerIt());
}

inline bool IsValid(DbSlice::ExpIterator it) {
inline bool IsValid(const DbSlice::ExpIterator& it) {
return dfly::IsValid(it.GetInnerIt());
}

inline bool IsValid(DbSlice::ExpConstIterator it) {
inline bool IsValid(const DbSlice::ExpConstIterator& it) {
return dfly::IsValid(it.GetInnerIt());
}

Expand Down
26 changes: 14 additions & 12 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,9 @@ OpResult<variant<size_t, util::fb2::Future<size_t>>> OpExtend(const OpArgs& op_a
*v = prepend ? absl::StrCat(value, *v) : absl::StrCat(*v, value);
return v->size();
};
return {shard->tiered_storage_v2()->Modify<size_t>(key, pv, std::move(modf))};
return {shard->tiered_storage_v2()->Modify<size_t>(op_args.db_cntx.db_index, key, pv,
std::move(modf))};
} else {
if (pv.HasIoPending())
shard->tiered_storage_v2()->Delete(key, &pv);
return {ExtendExisting(it_res->it, key, value, prepend)};
}
}
Expand Down Expand Up @@ -520,8 +519,9 @@ struct StringReplies {

} // namespace

StringValue StringValue::Read(string_view key, const PrimeValue& pv, EngineShard* es) {
return pv.IsExternal() ? StringValue{es->tiered_storage_v2()->Read(key, pv)}
StringValue StringValue::Read(DbIndex dbid, string_view key, const PrimeValue& pv,
EngineShard* es) {
return pv.IsExternal() ? StringValue{es->tiered_storage_v2()->Read(dbid, key, pv)}
: StringValue(GetString(pv));
}

Expand Down Expand Up @@ -611,8 +611,8 @@ OpStatus SetCmd::SetExisting(const SetParams& params, DbSlice::Iterator it,
db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags);

// If value is external, mark it as deleted
if (prime_value.IsExternal() || prime_value.HasIoPending()) {
shard->tiered_storage_v2()->Delete(key, &prime_value);
if (prime_value.IsExternal()) {
shard->tiered_storage_v2()->Delete(op_args_.db_cntx.db_index, key, &prime_value);
}

// overwrite existing entry.
Expand Down Expand Up @@ -653,7 +653,7 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string

// Currently we always offload
if (auto* ts = shard->tiered_storage_v2(); ts && ts->ShouldStash(*pv)) {
ts->Stash(key, pv);
ts->Stash(op_args_.db_cntx.db_index, key, pv);
}

if (manual_journal_ && op_args_.shard->journal()) {
Expand Down Expand Up @@ -692,7 +692,8 @@ OpStatus SetCmd::CachePrevIfNeeded(const SetCmd::SetParams& params, DbSlice::Ite
if (it->second.ObjType() != OBJ_STRING)
return OpStatus::WRONG_TYPE;

*params.prev_val = StringValue::Read(it.key(), it->second, EngineShard::tlocal());
*params.prev_val =
StringValue::Read(op_args_.db_cntx.db_index, it.key(), it->second, EngineShard::tlocal());
return OpStatus::OK;
}

Expand Down Expand Up @@ -849,7 +850,7 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
if (!it_res.ok())
return it_res.status();

return StringValue::Read(key, (*it_res)->second, es);
return StringValue::Read(tx->GetDbIndex(), key, (*it_res)->second, es);
};

StringReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
Expand All @@ -862,7 +863,7 @@ void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) {
if (!it_res.ok())
return it_res.status();

auto value = StringValue::Read(key, it_res->it->second, es);
auto value = StringValue::Read(tx->GetDbIndex(), key, it_res->it->second, es);
it_res->post_updater.Run(); // Run manually before delete
es->db_slice().Del(tx->GetDbIndex(), it_res->it);
return value;
Expand Down Expand Up @@ -895,6 +896,7 @@ void StringFamily::Prepend(CmdArgList args, ConnectionContext* cntx) {
ExtendGeneric(args, true, cntx);
}

// With tieringV2 support
void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view value = ArgS(args, 1);
Expand Down Expand Up @@ -979,7 +981,7 @@ void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {
if (!it_res)
return it_res.status();

StringValue value = StringValue::Read(key, it_res->it->second, shard);
StringValue value = StringValue::Read(t->GetDbIndex(), key, it_res->it->second, shard);

if (exp_params.IsDefined()) {
it_res->post_updater.Run(); // Run manually before possible delete due to negative expire
Expand Down
3 changes: 2 additions & 1 deletion src/server/string_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ struct StringValue {
bool IsEmpty() const;

// Read string from prime value - either from memory or issue tiered storage read
static StringValue Read(std::string_view key, const PrimeValue& pv, EngineShard* es);
static StringValue Read(DbIndex dbid, std::string_view key, const PrimeValue& pv,
EngineShard* es);

private:
std::variant<std::monostate, std::string, util::fb2::Future<std::string>> v_;
Expand Down
84 changes: 45 additions & 39 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
}

// Find entry by key in db_slice and store external segment in place of original value
void SetExternal(string_view key, tiering::DiskSegment segment) {
void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
if (auto pv = Find(key); pv) {
pv->SetIoPending(false);
pv->SetExternal(segment.offset, segment.length); // TODO: Handle memory stats
Expand All @@ -52,12 +52,12 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {

// Find bin by id and call SetExternal for all contained entries
void SetExternal(tiering::SmallBins::BinId id, tiering::DiskSegment segment) {
for (const auto& [sub_key, sub_segment] : ts_->bins_->ReportStashed(id, segment))
SetExternal(string_view{sub_key}, sub_segment);
for (const auto& [sub_dbid, sub_key, sub_segment] : ts_->bins_->ReportStashed(id, segment))
SetExternal({sub_dbid, sub_key}, sub_segment);
}

// Clear IO pending flag for entry
void ClearIoPending(string_view key) {
void ClearIoPending(OpManager::KeyRef key) {
if (auto pv = Find(key); pv) {
pv->SetIoPending(false);
stats_.total_cancels++;
Expand All @@ -66,13 +66,13 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {

// Clear IO pending flag for all contained entries of bin
void ClearIoPending(tiering::SmallBins::BinId id) {
for (const string& key : ts_->bins_->ReportStashAborted(id))
for (const auto& key : ts_->bins_->ReportStashAborted(id))
ClearIoPending(key);
}

// Find entry by key and store its up-to-date value in place of external segment.
// Returns false if the value is outdated, true otherwise
bool SetInMemory(string_view key, string_view value, tiering::DiskSegment segment) {
bool SetInMemory(OpManager::KeyRef key, string_view value, tiering::DiskSegment segment) {
if (auto pv = Find(key); pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
pv->Reset(); // TODO: account for memory
pv->SetString(value);
Expand All @@ -94,13 +94,13 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {

void ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment,
bool modified) override {
DCHECK(holds_alternative<string_view>(id)); // we never issue reads for bins
DCHECK(holds_alternative<OpManager::KeyRef>(id)); // we never issue reads for bins

// Modified values are always cached and deleted from disk
if (!modified && !cache_fetched_)
return;

SetInMemory(get<string_view>(id), value, segment);
SetInMemory(get<OpManager::KeyRef>(id), value, segment);

// Delete value
if (segment.length >= TieredStorageV2::kMinOccupancySize) {
Expand All @@ -114,10 +114,11 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
private:
friend class TieredStorageV2;

PrimeValue* Find(string_view key) {
PrimeValue* Find(OpManager::KeyRef key) {
// TODO: Get DbContext for transaction for correct dbid and time
auto it = db_slice_->FindMutable(DbContext{}, key);
return IsValid(it.it) ? &it.it->second : nullptr;
// Bypass all update and stat mechanisms
auto it = db_slice_->GetDBTable(key.first)->prime.Find(key.second);
return IsValid(it) ? &it->second : nullptr;
}

bool cache_fetched_ = false;
Expand All @@ -139,40 +140,44 @@ TieredStorageV2::~TieredStorageV2() {
}

error_code TieredStorageV2::Open(string_view path) {
return op_manager_->Open(path);
return op_manager_->Open(absl::StrCat(path, ProactorBase::me()->GetPoolIndex()));
}

// Closes the tiered storage by forwarding to the underlying op manager's Close().
void TieredStorageV2::Close() {
op_manager_->Close();
}

util::fb2::Future<string> TieredStorageV2::Read(string_view key, const PrimeValue& value) {
util::fb2::Future<string> TieredStorageV2::Read(DbIndex dbid, string_view key,
const PrimeValue& value) {
DCHECK(value.IsExternal());
util::fb2::Future<string> future;
op_manager_->Enqueue(key, value.GetExternalSlice(), [future](string* value) mutable {
auto cb = [future](string* value) mutable {
future.Resolve(*value);
return false;
});
};
op_manager_->Enqueue(std::make_pair(dbid, key), value.GetExternalSlice(), std::move(cb));
return future;
}

template <typename T>
util::fb2::Future<T> TieredStorageV2::Modify(std::string_view key, const PrimeValue& value,
util::fb2::Future<T> TieredStorageV2::Modify(DbIndex dbid, std::string_view key,
const PrimeValue& value,
std::function<T(std::string*)> modf) {
DCHECK(value.IsExternal());
util::fb2::Future<T> future;
auto cb = [future, modf = std::move(modf)](std::string* value) mutable {
future.Resolve(modf(value));
return true;
};
op_manager_->Enqueue(key, value.GetExternalSlice(), std::move(cb));
op_manager_->Enqueue(std::make_pair(dbid, key), value.GetExternalSlice(), std::move(cb));
return future;
}

template util::fb2::Future<size_t> TieredStorageV2::Modify(
std::string_view key, const PrimeValue& value, std::function<size_t(std::string*)> modf);
DbIndex dbid, std::string_view key, const PrimeValue& value,
std::function<size_t(std::string*)> modf);

void TieredStorageV2::Stash(string_view key, PrimeValue* value) {
void TieredStorageV2::Stash(DbIndex dbid, string_view key, PrimeValue* value) {
DCHECK(!value->IsExternal() && !value->HasIoPending());

string buf;
Expand All @@ -182,9 +187,9 @@ void TieredStorageV2::Stash(string_view key, PrimeValue* value) {
tiering::OpManager::EntryId id;
error_code ec;
if (value->Size() >= kMinOccupancySize) {
id = key;
ec = op_manager_->Stash(key, value_sv);
} else if (auto bin = bins_->Stash(key, value_sv); bin) {
id = std::make_pair(dbid, key);
ec = op_manager_->Stash(id, value_sv);
} else if (auto bin = bins_->Stash(dbid, key, value_sv); bin) {
id = bin->first;
ec = op_manager_->Stash(bin->first, bin->second);
}
Expand All @@ -194,24 +199,25 @@ void TieredStorageV2::Stash(string_view key, PrimeValue* value) {
visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
}
}
void TieredStorageV2::Delete(string_view key, PrimeValue* value) {
if (value->IsExternal()) {
tiering::DiskSegment segment = value->GetExternalSlice();
if (segment.length >= kMinOccupancySize) {
op_manager_->Delete(segment);
} else if (auto bin = bins_->Delete(segment); bin) {
op_manager_->Delete(*bin);
}
value->Reset();
} else {
DCHECK(value->HasIoPending());
if (value->Size() >= kMinOccupancySize) {
op_manager_->Delete(key);
} else if (auto bin = bins_->Delete(key); bin) {
op_manager_->Delete(*bin);
}
value->SetIoPending(false);
void TieredStorageV2::Delete(DbIndex dbid, string_view key, PrimeValue* value) {
DCHECK(value->IsExternal());
tiering::DiskSegment segment = value->GetExternalSlice();
if (segment.length >= kMinOccupancySize) {
op_manager_->Delete(segment);
} else if (auto bin = bins_->Delete(segment); bin) {
op_manager_->Delete(*bin);
}
value->Reset();
}

// Cancels a stash that is still pending for `value`, identified by (dbid, key).
// Expects the value's IO-pending flag to be set (DCHECK) and clears that flag before returning.
void TieredStorageV2::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
DCHECK(value->HasIoPending());
// Values of at least kMinOccupancySize are addressed by their own (dbid, key) id in the op manager.
if (value->Size() >= kMinOccupancySize) {
op_manager_->Delete(std::make_pair(dbid, key));
// Smaller values are removed from the small bins; if that call yields a bin id, it is passed on
// to the op manager's Delete as well.
} else if (auto bin = bins_->Delete(dbid, key); bin) {
op_manager_->Delete(*bin);
}
value->SetIoPending(false);
}

bool TieredStorageV2::ShouldStash(const PrimeValue& pv) {
Expand Down

0 comments on commit 842e3b6

Please sign in to comment.