Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: small tiering fixes #2966

Merged
merged 5 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx

auto it = Iterator(res->it, StringOrView::FromView(key));
auto exp_it = ExpIterator(res->exp_it, StringOrView::FromView(key));
PreUpdate(cntx.db_index, it);
PreUpdate(cntx.db_index, it, key);
// PreUpdate() might have caused a deletion of `it`
if (res->it.IsOccupied()) {
return {{it, exp_it,
Expand Down Expand Up @@ -502,7 +502,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
if (res.ok()) {
Iterator it(res->it, StringOrView::FromView(key));
ExpIterator exp_it(res->exp_it, StringOrView::FromView(key));
PreUpdate(cntx.db_index, it);
PreUpdate(cntx.db_index, it, key);
// PreUpdate() might have caused a deletion of `it`
if (res->it.IsOccupied()) {
return DbSlice::AddOrFindResult{
Expand Down Expand Up @@ -971,14 +971,21 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const
return true;
}

void DbSlice::PreUpdate(DbIndex db_ind, Iterator it) {
void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
FiberAtomicGuard fg;

DVLOG(2) << "Running callbacks in dbid " << db_ind;
for (const auto& ccb : change_cb_) {
ccb.second(db_ind, ChangeReq{it.GetInnerIt()});
}

// If the value has a pending stash, cancel it before any modification are applied.
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
// append) can be applied instead of a full overwrite. Deleting is reponsibility of the commands
if (it.IsOccupied() && it->second.HasIoPending()) {
owner_->tiered_storage_v2()->CancelStash(db_ind, key, &it->second);
}

it.GetInnerIt().SetVersion(NextVersion());
}

Expand Down Expand Up @@ -1431,7 +1438,7 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl
const PrimeValue& pv = del_it->second;

if (pv.IsExternal() && shard_owner()->tiered_storage_v2()) {
shard_owner()->tiered_storage_v2()->Delete(del_it.key(), &del_it->second);
shard_owner()->tiered_storage_v2()->Delete(&del_it->second);
}

size_t value_heap_size = pv.MallocUsed();
Expand Down
23 changes: 13 additions & 10 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,15 @@ class DbSlice {
}

// returns absolute time of the expiration.
time_t ExpireTime(ExpConstIterator it) const {
time_t ExpireTime(const ExpConstIterator& it) const {
return ExpireTime(it.GetInnerIt());
}
time_t ExpireTime(ExpIterator it) const {

time_t ExpireTime(const ExpIterator& it) const {
return ExpireTime(it.GetInnerIt());
}
time_t ExpireTime(ExpireConstIterator it) const {

time_t ExpireTime(const ExpireConstIterator& it) const {
return it.is_done() ? 0 : expire_base_[0] + it->second.duration_ms();
}

Expand Down Expand Up @@ -468,7 +470,7 @@ class DbSlice {
void PerformDeletion(PrimeIterator del_it, DbTable* table);

private:
void PreUpdate(DbIndex db_ind, Iterator it);
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);

OpResult<AddOrFindResult> AddOrUpdateInternal(const Context& cntx, std::string_view key,
Expand Down Expand Up @@ -503,11 +505,12 @@ class DbSlice {
};

PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const;

OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);

OpResult<PrimeItAndExp> FindInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type,
UpdateStatsMode stats_mode) const;

OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);
OpResult<ItAndUpdater> FindMutableInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type);

Expand Down Expand Up @@ -572,19 +575,19 @@ class DbSlice {
client_tracking_map_;
};

inline bool IsValid(DbSlice::Iterator it) {
inline bool IsValid(const DbSlice::Iterator& it) {
return dfly::IsValid(it.GetInnerIt());
}

inline bool IsValid(DbSlice::ConstIterator it) {
inline bool IsValid(const DbSlice::ConstIterator& it) {
return dfly::IsValid(it.GetInnerIt());
}

inline bool IsValid(DbSlice::ExpIterator it) {
inline bool IsValid(const DbSlice::ExpIterator& it) {
return dfly::IsValid(it.GetInnerIt());
}

inline bool IsValid(DbSlice::ExpConstIterator it) {
inline bool IsValid(const DbSlice::ExpConstIterator& it) {
return dfly::IsValid(it.GetInnerIt());
}

Expand Down
40 changes: 21 additions & 19 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,17 +482,16 @@ OpResult<variant<size_t, util::fb2::Future<size_t>>> OpExtend(const OpArgs& op_a
*v = prepend ? absl::StrCat(value, *v) : absl::StrCat(*v, value);
return v->size();
};
return {shard->tiered_storage_v2()->Modify<size_t>(key, pv, std::move(modf))};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you do not want to improve the name of StringReplies ? :)

return {shard->tiered_storage_v2()->Modify<size_t>(op_args.db_cntx.db_index, key, pv,
std::move(modf))};
} else {
if (pv.HasIoPending())
shard->tiered_storage_v2()->Delete(key, &pv);
return {ExtendExisting(it_res->it, key, value, prepend)};
}
}

// Helper for building replies for strings
struct StringReplies {
StringReplies(SinkReplyBuilder* rb) : rb{static_cast<RedisReplyBuilder*>(rb)} {
struct GetReplies {
GetReplies(SinkReplyBuilder* rb) : rb{static_cast<RedisReplyBuilder*>(rb)} {
DCHECK(dynamic_cast<RedisReplyBuilder*>(rb));
}

Expand Down Expand Up @@ -520,8 +519,9 @@ struct StringReplies {

} // namespace

StringValue StringValue::Read(string_view key, const PrimeValue& pv, EngineShard* es) {
return pv.IsExternal() ? StringValue{es->tiered_storage_v2()->Read(key, pv)}
StringValue StringValue::Read(DbIndex dbid, string_view key, const PrimeValue& pv,
EngineShard* es) {
return pv.IsExternal() ? StringValue{es->tiered_storage_v2()->Read(dbid, key, pv)}
: StringValue(GetString(pv));
}

Expand Down Expand Up @@ -611,8 +611,8 @@ OpStatus SetCmd::SetExisting(const SetParams& params, DbSlice::Iterator it,
db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags);

// If value is external, mark it as deleted
if (prime_value.IsExternal() || prime_value.HasIoPending()) {
shard->tiered_storage_v2()->Delete(key, &prime_value);
if (prime_value.IsExternal()) {
shard->tiered_storage_v2()->Delete(&prime_value);
}

// overwrite existing entry.
Expand Down Expand Up @@ -653,7 +653,7 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string

// Currently we always offload
if (auto* ts = shard->tiered_storage_v2(); ts && ts->ShouldStash(*pv)) {
ts->Stash(key, pv);
ts->Stash(op_args_.db_cntx.db_index, key, pv);
}

if (manual_journal_ && op_args_.shard->journal()) {
Expand Down Expand Up @@ -692,7 +692,8 @@ OpStatus SetCmd::CachePrevIfNeeded(const SetCmd::SetParams& params, DbSlice::Ite
if (it->second.ObjType() != OBJ_STRING)
return OpStatus::WRONG_TYPE;

*params.prev_val = StringValue::Read(it.key(), it->second, EngineShard::tlocal());
*params.prev_val =
StringValue::Read(op_args_.db_cntx.db_index, it.key(), it->second, EngineShard::tlocal());
return OpStatus::OK;
}

Expand Down Expand Up @@ -794,7 +795,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
}

if (sparams.flags & SetCmd::SET_GET) {
return StringReplies{cntx->reply_builder()}.Send(std::move(prev));
return GetReplies{cntx->reply_builder()}.Send(std::move(prev));
}

if (result == OpStatus::OK) {
Expand Down Expand Up @@ -849,10 +850,10 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
if (!it_res.ok())
return it_res.status();

return StringValue::Read(key, (*it_res)->second, es);
return StringValue::Read(tx->GetDbIndex(), key, (*it_res)->second, es);
};

StringReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
GetReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}

// With tieringV2 support
Expand All @@ -862,13 +863,13 @@ void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) {
if (!it_res.ok())
return it_res.status();

auto value = StringValue::Read(key, it_res->it->second, es);
auto value = StringValue::Read(tx->GetDbIndex(), key, it_res->it->second, es);
it_res->post_updater.Run(); // Run manually before delete
es->db_slice().Del(tx->GetDbIndex(), it_res->it);
return value;
};

StringReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
GetReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}

// With tieringV2 support
Expand All @@ -884,7 +885,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(status);
}

StringReplies{cntx->reply_builder()}.Send(std::move(prev));
GetReplies{cntx->reply_builder()}.Send(std::move(prev));
}

void StringFamily::Append(CmdArgList args, ConnectionContext* cntx) {
Expand All @@ -895,6 +896,7 @@ void StringFamily::Prepend(CmdArgList args, ConnectionContext* cntx) {
ExtendGeneric(args, true, cntx);
}

// With tieringV2 support
void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view value = ArgS(args, 1);
Expand Down Expand Up @@ -979,7 +981,7 @@ void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {
if (!it_res)
return it_res.status();

StringValue value = StringValue::Read(key, it_res->it->second, shard);
StringValue value = StringValue::Read(t->GetDbIndex(), key, it_res->it->second, shard);

if (exp_params.IsDefined()) {
it_res->post_updater.Run(); // Run manually before possible delete due to negative expire
Expand All @@ -1001,7 +1003,7 @@ void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {
return value;
};

StringReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
GetReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}

void StringFamily::Incr(CmdArgList args, ConnectionContext* cntx) {
Expand Down
3 changes: 2 additions & 1 deletion src/server/string_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ struct StringValue {
bool IsEmpty() const;

// Read string from prime value - either from memory or issue tiered storage read
static StringValue Read(std::string_view key, const PrimeValue& pv, EngineShard* es);
static StringValue Read(DbIndex dbid, std::string_view key, const PrimeValue& pv,
EngineShard* es);

private:
std::variant<std::monostate, std::string, util::fb2::Future<std::string>> v_;
Expand Down