Skip to content

Commit

Permalink
fix: fixes v3
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Jan 8, 2024
1 parent 639b2cb commit 86794b0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 50 deletions.
71 changes: 29 additions & 42 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1693,44 +1693,41 @@ Transaction::MultiMode DetermineMultiMode(ScriptMgr::ScriptParams params) {
return Transaction::NON_ATOMIC;
}

// Start multi transaction for eval. Returns true if transaction was scheduled.
// Start multi transaction for eval and return true if it is transactional (schedules on its own).
// Skips scheduling if multi mode requires declaring keys, but no keys were declared.
// Return nullopt if eval runs inside multi and conflicts with multi mode
optional<bool> StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptParams params,
ConnectionContext* cntx) {
Transaction* trans = cntx->transaction;
optional<facade::ErrorReply> StartMultiEval(DbIndex dbid, CmdArgList keys,
ScriptMgr::ScriptParams params, Transaction* trans) {
Transaction::MultiMode script_mode = DetermineMultiMode(params);
Transaction::MultiMode multi_mode = trans->GetMultiMode();
// Check if eval is already part of a running multi transaction
if (multi_mode != Transaction::NOT_DETERMINED) {
if (multi_mode > script_mode) {
string err = StrCat(
return facade::ErrorReply(StrCat(
"Multi mode conflict when running eval in multi transaction. Multi mode is: ", multi_mode,
" eval mode is: ", script_mode);
cntx->SendError(err);
return nullopt;
" eval mode is: ", script_mode));
}
return false;
return nullopt;
}

// No keys will be accessed, so we don't have to be transactional
if (keys.empty() && script_mode == Transaction::LOCK_AHEAD)
return false;
return nullopt;

switch (script_mode) {
case Transaction::GLOBAL:
trans->StartMultiGlobal(dbid);
return true;
break;
case Transaction::LOCK_AHEAD:
trans->StartMultiLockedAhead(dbid, keys, true);
return true;
break;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();
return true;
break;
default:
CHECK(false) << "Invalid mode";
};

return false;
return nullopt;
}

static std::string FullAclCommandFromArgs(CmdArgList args, std::string_view name) {
Expand Down Expand Up @@ -1776,7 +1773,6 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
return cntx->SendError(facade::kScriptNotFound);

string error;

DCHECK(!cntx->conn_state.script_info); // we should not call eval from the script.

// TODO: to determine whether the script is RO by scanning all "redis.p?call" calls
Expand Down Expand Up @@ -1804,16 +1800,12 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret

Interpreter::RunResult result;

optional<bool> scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx);
if (!scheduled) {
return;
}
bool embedded = tx->GetMultiMode() != Transaction::NOT_DETERMINED;
if (auto err = StartMultiEval(cntx->db_index(), eval_args.keys, *params, tx); err)
return cntx->SendError(std::move(*err));

// If script runs on a single shard, we run it remotely to save hops
if (!tx->IsScheduled() && tx->GetMultiMode() == Transaction::LOCK_AHEAD &&
tx->GetUniqueShardCnt() == 1) {
DCHECK(*scheduled); // because tx multi mode is lock ahead

if (tx->GetMultiMode() == Transaction::LOCK_AHEAD && tx->GetUniqueShardCnt() == 1) {
interpreter->SetRedisFunc([cntx, tx, this](Interpreter::CallArgs args) {
// Disable squashing, as we're using the squashing mechanism to run remotely.
args.async = false;
Expand All @@ -1840,7 +1832,8 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(sid));
}
} else {
if (*scheduled && !tx->IsScheduled() && tx->GetMultiMode() != Transaction::NON_ATOMIC)
// We possibly delay scheduling for the optimization above
if (!tx->IsScheduled() && tx->IsAtomicMulti())
tx->Schedule();

++ServerState::tlocal()->stats.eval_io_coordination_cnt;
Expand All @@ -1855,8 +1848,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
error = absl::StrCat(err_ref->first);
}

// Conclude the transaction.
if (*scheduled)
if (!embedded)
cntx->transaction->UnlockMulti();
}

Expand Down Expand Up @@ -1979,13 +1971,12 @@ void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo*
trans->StartMultiNonAtomic();
break;
case Transaction::NOT_DETERMINED:
LOG(FATAL) << "should not reach";
break;
};
}

void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());

absl::Cleanup exec_clear = [&cntx] { MultiCleanup(cntx); };

if (!cntx->conn_state.exec_info.IsCollecting()) {
Expand All @@ -2006,7 +1997,6 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
}

cntx->last_command_debug.exec_body_len = exec_info.body.size();
const CommandId* const exec_cid = cntx->cid;
CmdArgVec arg_vec;
ExecEvalState state = DetermineEvalPresense(exec_info.body);

Expand All @@ -2016,18 +2006,14 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (!multi_mode)
return rb->SendError("Dragonfly does not allow execution of server-side Lua in Multi/Exec");

bool allow_squashing = absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE;
bool scheduled = false;

// If there might be only one hop (looking up watched key is one on it's own), possibly delay
// scheduling for lock ahead (only). Squashing can then make use of it, if it detects that all
// commands fit into a single hop.
bool allow_squashing = absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE;
bool delay_scheduling = allow_squashing && exec_info.watched_keys.empty();
StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, *multi_mode, delay_scheduling);

if (*multi_mode != Transaction::NOT_DETERMINED) {
StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, *multi_mode, delay_scheduling);
scheduled = true;
}
bool is_transactional = cntx->transaction->GetMultiMode() != Transaction::NOT_DETERMINED;

// EXEC should not run if any of the watched keys expired.
if (!exec_info.watched_keys.empty() && !CheckWatchedKeyExpiry(cntx, registry_)) {
Expand All @@ -2053,10 +2039,12 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
ss->exec_freq_count[descr]++;
}

if (scheduled && allow_squashing) {
if (allow_squashing && is_transactional) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this);
} else {
DCHECK(!scheduled || !delay_scheduling);
// If we delayed scheduling, we should have chosen squashing
DCHECK(!is_transactional || cntx->transaction->IsScheduled());

for (auto& scmd : exec_info.body) {
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();

Expand Down Expand Up @@ -2088,13 +2076,12 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
exec_info.preborrowed_interpreter = nullptr;
}

if (scheduled) {
if (is_transactional) {
VLOG(1) << "Exec unlocking " << exec_info.body.size() << " commands";
cntx->transaction->UnlockMulti();
}

cntx->cid = exec_cid;

cntx->cid = exec_cid_;
VLOG(1) << "Exec completed";
}

Expand Down
16 changes: 8 additions & 8 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,14 @@ class Transaction {
return coordinator_state_ & COORD_SCHED;
}

// Whether the transaction is multi and runs in an atomic mode.
// This, instead of just IsMulti(), should be used to check for the possibility of
// different optimizations, because they can safely be applied to non-atomic multi
// transactions as well.
bool IsAtomicMulti() const {
return multi_ && (multi_->mode == LOCK_AHEAD || multi_->mode == GLOBAL);
}

// If blocking tx was woken up on this shard, get wake key.
std::optional<std::string_view> GetWakeKey(ShardId sid) const;

Expand Down Expand Up @@ -519,14 +527,6 @@ class Transaction {
return use_count_.load(std::memory_order_relaxed);
}

// Whether the transaction is multi and runs in an atomic mode.
// This, instead of just IsMulti(), should be used to check for the possibility of
// different optimizations, because they can safely be applied to non-atomic multi
// transactions as well.
bool IsAtomicMulti() const {
return multi_ && multi_->mode != NON_ATOMIC;
}

bool IsActiveMulti() const {
return multi_ && multi_->role != SQUASHED_STUB;
}
Expand Down

0 comments on commit 86794b0

Please sign in to comment.