Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transaction): Use single hop in squashing when possible #2376

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ ConnectionContext::ConnectionContext(const ConnectionContext* owner, Transaction
if (tx) { // If we have a carrier transaction, this context is used for squashing
DCHECK(owner);
conn_state.db_index = owner->conn_state.db_index;
conn_state.squashing_info = {owner};
conn_state.squashing_info = {owner, owner->transaction};
}
auto* prev_reply_builder = Inject(crb);
CHECK_EQ(prev_reply_builder, nullptr);
Expand Down Expand Up @@ -236,7 +236,7 @@ size_t ConnectionState::ExecInfo::UsedMemory() const {
}

size_t ConnectionState::ScriptInfo::UsedMemory() const {
return dfly::HeapSize(keys) + async_cmds_heap_mem;
return async_cmds_heap_mem;
}

size_t ConnectionState::SubscribeInfo::UsedMemory() const {
Expand Down
9 changes: 4 additions & 5 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ struct ConnectionState {
struct ScriptInfo {
size_t UsedMemory() const;

absl::flat_hash_set<std::string_view> keys; // declared keys

size_t async_cmds_heap_mem = 0; // bytes used by async_cmds
size_t async_cmds_heap_limit = 0; // max bytes allowed for async_cmds
std::vector<StoredCmd> async_cmds; // aggregated by acall
Expand Down Expand Up @@ -135,10 +133,11 @@ struct ConnectionState {
};

struct SquashingInfo {
// Pointer to the original underlying context of the base command.
// Only const access it possible for reading from multiple threads,
// each squashing thread has its own proxy context that contains this info.
// Underlying context of the base command, should be used for state checks.
// Note: some squashing mechanisms re-use the context (single shard eval).
const ConnectionContext* owner = nullptr;
// Underlying base transaction of squashing mechanism.
const Transaction* transaction = nullptr;
};

enum MCGetMask {
Expand Down
189 changes: 75 additions & 114 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -917,22 +917,24 @@ optional<ErrorReply> Service::CheckKeysOwnership(const CommandId* cid, CmdArgLis

// Return OK if all keys are allowed to be accessed: either declared in EVAL or
// transaction is running in global or non-atomic mode.
OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const CommandId* cid,
CmdArgList args, Transaction* trans) {
Transaction::MultiMode multi_mode = trans->GetMultiMode();
OpStatus CheckKeysDeclared(CmdArgList args, const ConnectionContext* cntx, const CommandId* cid) {
// If we're squashing, rely on owner for locked keys and multi mode info
const Transaction* trans = cntx->transaction;
if (cntx->conn_state.squashing_info) {
trans = cntx->conn_state.squashing_info->transaction;
}

// We either scheduled on all shards or re-schedule for each operation,
// so we are not restricted to any keys.
auto multi_mode = trans->GetMultiMode();
if (multi_mode == Transaction::GLOBAL || multi_mode == Transaction::NON_ATOMIC)
return OpStatus::OK;

OpResult<KeyIndex> key_index_res = DetermineKeys(cid, args);
if (!key_index_res)
return key_index_res.status();

// TODO: Switch to transaction internal locked keys once single hop multi transactions are merged
// const auto& locked_keys = trans->GetMultiKeys();
const auto& locked_keys = eval_info.keys;
const auto& locked_keys = trans->GetMultiKeys();

const auto& key_index = *key_index_res;
for (unsigned i = key_index.start; i < key_index.end; ++i) {
Expand Down Expand Up @@ -1075,8 +1077,7 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
}

if (under_script && cid->IsTransactional()) {
OpStatus status =
CheckKeysDeclared(*dfly_cntx.conn_state.script_info, cid, tail_args, dfly_cntx.transaction);
OpStatus status = CheckKeysDeclared(tail_args, &dfly_cntx, cid);

if (status == OpStatus::KEY_NOTFOUND)
return ErrorReply{"script tried accessing undeclared key"};
Expand Down Expand Up @@ -1623,8 +1624,10 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)

optional<ErrorReply> findcmd_err;

if (ca.async) {
auto& info = cntx->conn_state.script_info;
auto& info = cntx->conn_state.script_info;
bool running_async = ca.async && info->async_cmds_heap_limit > 0;

if (running_async) {
ToUpper(&ca.args[0]);

// Full command verification happens during squashed execution
Expand All @@ -1637,7 +1640,7 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
}
}

if (auto err = FlushEvalAsyncCmds(cntx, !ca.async || findcmd_err.has_value()); err) {
if (auto err = FlushEvalAsyncCmds(cntx, !running_async || findcmd_err.has_value()); err) {
CapturingReplyBuilder::Apply(std::move(*err), &replier); // forward error to lua
*ca.requested_abort = true;
return;
Expand All @@ -1648,7 +1651,7 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
*ca.requested_abort |= ca.error_abort;
}

if (ca.async)
if (running_async)
return;

DispatchCommand(ca.args, cntx);
Expand Down Expand Up @@ -1728,44 +1731,40 @@ Transaction::MultiMode DetermineMultiMode(ScriptMgr::ScriptParams params) {
return Transaction::NON_ATOMIC;
}

// Start multi transaction for eval. Returns true if transaction was scheduled.
// Skips scheduling if multi mode requires declaring keys, but no keys were declared.
// Return nullopt if eval runs inside multi and conflicts with multi mode
optional<bool> StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptParams params,
ConnectionContext* cntx) {
Transaction* trans = cntx->transaction;
// Start multi transaction for EVAL in determined multi mode, if needed.
// Return error if the transaction is embedded into EXEC and conflicts with its mode.
std::optional<facade::ErrorReply> StartMultiEval(DbIndex dbid, CmdArgList keys,
ScriptMgr::ScriptParams params, Transaction* tx) {
Transaction::MultiMode script_mode = DetermineMultiMode(params);
Transaction::MultiMode multi_mode = trans->GetMultiMode();
Transaction::MultiMode multi_mode = tx->GetMultiMode();
// Check if eval is already part of a running multi transaction
if (multi_mode != Transaction::NOT_DETERMINED) {
if (multi_mode > script_mode) {
string err = StrCat(
return facade::ErrorReply{StrCat(
"Multi mode conflict when running eval in multi transaction. Multi mode is: ", multi_mode,
" eval mode is: ", script_mode);
cntx->SendError(err);
return nullopt;
" eval mode is: ", script_mode)};
}
return false;
return nullopt;
}

if (keys.empty() && script_mode == Transaction::LOCK_AHEAD)
return false;
return nullopt;

switch (script_mode) {
case Transaction::GLOBAL:
trans->StartMultiGlobal(dbid);
return true;
tx->StartMultiGlobal(dbid);
return nullopt;
case Transaction::LOCK_AHEAD:
trans->StartMultiLockedAhead(dbid, CmdArgVec{keys.begin(), keys.end()});
return true;
tx->StartMultiLockedAhead(dbid, CmdArgVec{keys.begin(), keys.end()}, true);
return nullopt;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();
return true;
tx->StartMultiNonAtomic();
return nullopt;
default:
CHECK(false) << "Invalid mode";
};

return false;
return nullopt;
}

static std::string FullAclCommandFromArgs(CmdArgList args, std::string_view name) {
Expand Down Expand Up @@ -1797,26 +1796,6 @@ std::pair<const CommandId*, CmdArgList> Service::FindCmd(CmdArgList args) const
return {res, args.subspan(1)};
}

static bool CanRunSingleShardMulti(optional<ShardId> sid, const ScriptMgr::ScriptParams& params,
const Transaction& tx) {
if (!sid.has_value()) {
return false;
}

if (DetermineMultiMode(params) != Transaction::LOCK_AHEAD) {
return false;
}

if (tx.GetMultiMode() != Transaction::NOT_DETERMINED) {
// We may be running EVAL under MULTI. Currently RunSingleShardMulti() will attempt to lock
// keys, in which case will be already locked by MULTI. We could optimize this path as well
// though.
return false;
}

return true;
}

void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter,
ConnectionContext* cntx) {
DCHECK(!eval_args.sha.empty());
Expand All @@ -1830,98 +1809,79 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
if (!params)
return cntx->SendError(facade::kScriptNotFound);

string error;

DCHECK(!cntx->conn_state.script_info); // we should not call eval from the script.

// TODO: to determine whether the script is RO by scanning all "redis.p?call" calls
// and checking whether all invocations consist of RO commands.
// we can do it once during script insertion into script mgr.
auto& sinfo = cntx->conn_state.script_info;
sinfo = make_unique<ConnectionState::ScriptInfo>();
sinfo->keys.reserve(eval_args.keys.size());

optional<ShardId> sid;

UniqueSlotChecker slot_checker;
for (size_t i = 0; i < eval_args.keys.size(); ++i) {
string_view key = ArgS(eval_args.keys, i);
slot_checker.Add(key);
sinfo->keys.insert(KeyLockArgs::GetLockKey(key));

ShardId cur_sid = Shard(key, shard_count());
if (i == 0) {
sid = cur_sid;
}
if (sid.has_value() && *sid != cur_sid) {
sid = nullopt;
}
}

sinfo->async_cmds_heap_limit = absl::GetFlag(FLAGS_multi_eval_squash_buffer);

Transaction* tx = cntx->transaction;
CHECK(tx != nullptr);

interpreter->SetGlobalArray("KEYS", eval_args.keys);
interpreter->SetGlobalArray("ARGV", eval_args.args);
interpreter->SetRedisFunc(
[cntx, this](Interpreter::CallArgs args) { CallFromScript(cntx, args); });

absl::Cleanup clean = [interpreter, &sinfo]() {
interpreter->ResetStack();
sinfo.reset();
};

string error;
Interpreter::RunResult result;

if (CanRunSingleShardMulti(sid, *params, *tx)) {
// If script runs on a single shard, we run it remotely to save hops.
interpreter->SetRedisFunc([cntx, this](Interpreter::CallArgs args) {
// Disable squashing, as we're using the squashing mechanism to run remotely.
args.async = false;
CallFromScript(cntx, args);
});
bool embedded = tx->GetMultiMode() != Transaction::NOT_DETERMINED;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename embedded to was_undetermined and reverse the condition here and below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was_undetermined doesn't explain why it should be so, whether it's embedded or not is what we care about and we check it by multi mode determinedness

if (auto err = StartMultiEval(cntx->db_index(), eval_args.keys, *params, tx); err)
return cntx->SendError(std::move(*err));

// If we are placed only on a single shard, we can run on it directly to save hops
if (!embedded && tx->GetMultiMode() == Transaction::LOCK_AHEAD && tx->GetUniqueShardCnt() == 1) {
DCHECK(!tx->IsScheduled()); // We asked not to schedule for lock ahead

++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt;
tx->PrepareMultiForScheduleSingleHop(*sid, tx->GetDbIndex(), args);

sinfo->async_cmds_heap_limit = 0; // disable async commands

tx->MultiBecomeSquasher();
tx->ReportWritesSquashedMulti([tx](auto sid) { return sid == tx->GetUniqueShard(); });
tx->ScheduleSingleHop([&](Transaction*, EngineShard*) {
boost::intrusive_ptr<Transaction> stub_tx =
new Transaction{tx, *sid, slot_checker.GetUniqueSlotId()};
cntx->transaction = stub_tx.get();
boost::intrusive_ptr<Transaction> stub_tx = new Transaction{tx, tx->GetUniqueShard()};

cntx->conn_state.squashing_info = {cntx, tx};
cntx->transaction = stub_tx.get();
result = interpreter->RunFunction(eval_args.sha, &error);
cntx->transaction->FIX_ConcludeJournalExec(); // flush journal

cntx->transaction = tx;
cntx->conn_state.squashing_info.reset();
return OpStatus::OK;
});

if (*sid != ServerState::tlocal()->thread_index()) {
if (tx->GetUniqueShard() != ServerState::tlocal()->thread_index()) {
VLOG(1) << "Migrating connection " << cntx->conn() << " from "
<< ProactorBase::me()->GetPoolIndex() << " to " << *sid;
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid));
<< ProactorBase::me()->GetPoolIndex() << " to " << tx->GetUniqueShard();
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(tx->GetUniqueShard()));
}
} else {
optional<bool> scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx);
if (!scheduled) {
return;
}

++ServerState::tlocal()->stats.eval_io_coordination_cnt;
interpreter->SetRedisFunc(
[cntx, this](Interpreter::CallArgs args) { CallFromScript(cntx, args); });

if (!tx->IsScheduled() && tx->IsAtomicMulti())
tx->Schedule();

result = interpreter->RunFunction(eval_args.sha, &error);

if (auto err = FlushEvalAsyncCmds(cntx, true); err) {
auto err_ref = CapturingReplyBuilder::GetError(*err);
result = Interpreter::RUN_ERR;
error = absl::StrCat(err_ref->first);
error = err_ref->first;
}

// Conclude the transaction.
if (*scheduled)
cntx->transaction->UnlockMulti();
}

if (!embedded)
cntx->transaction->UnlockMulti();

if (result == Interpreter::RUN_ERR) {
string resp = StrCat("Error running script (call to ", eval_args.sha, "): ", error);
return cntx->SendError(resp, facade::kScriptErrType);
Expand Down Expand Up @@ -2027,13 +1987,13 @@ CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) {

// Return true if transaction was scheduled, false if scheduling was not required.
void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info,
Transaction::MultiMode multi_mode) {
Transaction::MultiMode multi_mode, bool delay_scheduling) {
switch (multi_mode) {
case Transaction::GLOBAL:
trans->StartMultiGlobal(dbid);
break;
case Transaction::LOCK_AHEAD:
trans->StartMultiLockedAhead(dbid, CollectAllKeys(exec_info));
trans->StartMultiLockedAhead(dbid, CollectAllKeys(exec_info), delay_scheduling);
break;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();
Expand Down Expand Up @@ -2081,14 +2041,17 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
// and scripts
optional<Transaction::MultiMode> multi_mode = DeduceExecMode(state, exec_info, *script_mgr());
if (!multi_mode)
return rb->SendError(
"Dragonfly does not allow execution of a server-side Lua in Multi transaction");
return rb->SendError("Dragonfly does not allow execution of server-side Lua in Multi/Exec");

bool scheduled = false;
if (*multi_mode != Transaction::NOT_DETERMINED) {
StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, *multi_mode);
scheduled = true;
}
// If there might be only one hop (looking up watched key is one on it's own), possibly delay
// scheduling for lock ahead (only). Squashing can then make use of it, if it detects that all
// commands fit into a single hop.
bool allow_squashing = absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE;
bool delay_scheduling = allow_squashing && exec_info.watched_keys.empty();
if (*multi_mode != Transaction::NOT_DETERMINED)
StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, *multi_mode, delay_scheduling);

bool is_transactional = cntx->transaction->GetMultiMode() != Transaction::NOT_DETERMINED;

// EXEC should not run if any of the watched keys expired.
if (!exec_info.watched_keys.empty() && !CheckWatchedKeyExpiry(cntx, registry_)) {
Expand All @@ -2110,7 +2073,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
ServerState::tlocal()->exec_freq_count[descr]++;
}

if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE) {
if (allow_squashing && is_transactional) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this);
} else {
CmdArgVec arg_vec;
Expand Down Expand Up @@ -2140,10 +2103,8 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
}
}

if (scheduled) {
VLOG(1) << "Exec unlocking " << exec_info.body.size() << " commands";
if (is_transactional)
cntx->transaction->UnlockMulti();
}

cntx->cid = exec_cid_;
VLOG(1) << "Exec completed";
Expand Down