Skip to content

Commit

Permalink
chore: move single shard lua to new interface
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Jan 7, 2024
1 parent a72a150 commit e017b69
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 61 deletions.
4 changes: 4 additions & 0 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class CommandId : public facade::CommandId {

bool IsTransactional() const;

bool IsMultiTransactional() const {
return CO::IsTransKind(name()) || CO::IsEvalKind(name());
}

bool IsReadOnly() const {
return opt_mask_ & CO::READONLY;
}
Expand Down
69 changes: 22 additions & 47 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1721,7 +1721,7 @@ optional<bool> StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptPa
trans->StartMultiGlobal(dbid);
return true;
case Transaction::LOCK_AHEAD:
trans->StartMultiLockedAhead(dbid, keys);
trans->StartMultiLockedAhead(dbid, keys, true);
return true;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();
Expand Down Expand Up @@ -1762,26 +1762,6 @@ std::pair<const CommandId*, CmdArgList> Service::FindCmd(CmdArgList args) const
return {res, args.subspan(1)};
}

static bool CanRunSingleShardMulti(optional<ShardId> sid, const ScriptMgr::ScriptParams& params,
const Transaction& tx) {
if (!sid.has_value()) {
return false;
}

if (DetermineMultiMode(params) != Transaction::LOCK_AHEAD) {
return false;
}

if (tx.GetMultiMode() != Transaction::NOT_DETERMINED) {
// We may be running EVAL under MULTI. Currently RunSingleShardMulti() will attempt to lock
// keys, in which case will be already locked by MULTI. We could optimize this path as well
// though.
return false;
}

return true;
}

void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter,
ConnectionContext* cntx) {
DCHECK(!eval_args.sha.empty());
Expand All @@ -1806,20 +1786,6 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
sinfo = make_unique<ConnectionState::ScriptInfo>();
sinfo->keys.reserve(eval_args.keys.size());

optional<ShardId> sid;
for (size_t i = 0; i < eval_args.keys.size(); ++i) {
string_view key = ArgS(eval_args.keys, i);
sinfo->keys.insert(KeyLockArgs::GetLockKey(key));

ShardId cur_sid = Shard(key, shard_count());
if (i == 0) {
sid = cur_sid;
}
if (sid.has_value() && *sid != cur_sid) {
sid = nullopt;
}
}

sinfo->async_cmds_heap_limit = absl::GetFlag(FLAGS_multi_eval_squash_buffer);
Transaction* tx = cntx->transaction;
CHECK(tx != nullptr);
Expand All @@ -1834,19 +1800,29 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret

Interpreter::RunResult result;

if (CanRunSingleShardMulti(sid, *params, *tx)) {
// If script runs on a single shard, we run it remotely to save hops.
optional<bool> scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx);
if (!scheduled) {
return;
}

// If script runs on a single shard, we run it remotely to save hops
if (!tx->IsScheduled() && tx->GetMultiMode() == Transaction::LOCK_AHEAD &&
tx->GetUniqueShardCnt() == 1) {
DCHECK(*scheduled); // because tx multi mode is lock ahead
CHECK(!tx->IsScheduled()); // skip_scheduling = true in StartMultiEval

interpreter->SetRedisFunc([cntx, tx, this](Interpreter::CallArgs args) {
// Disable squashing, as we're using the squashing mechanism to run remotely.
args.async = false;
CallFromScript(cntx, args);
});

++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt;
// TODO: remove doubule key iteration
tx->PrepareMultiForScheduleSingleHop(*sid, tx->GetDbIndex(), args);

auto sid = tx->GetUniqueShard();
tx->MultiBecomeSquasher();
tx->ScheduleSingleHop([&](Transaction*, EngineShard*) {
boost::intrusive_ptr<Transaction> stub_tx = new Transaction{tx, *sid};
boost::intrusive_ptr<Transaction> stub_tx = new Transaction{tx, sid};
stub_tx->MultiUpdateWithParent(tx);
cntx->transaction = stub_tx.get();

Expand All @@ -1856,16 +1832,14 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
return OpStatus::OK;
});

if (*sid != ServerState::tlocal()->thread_index()) {
if (sid != ServerState::tlocal()->thread_index()) {
VLOG(1) << "Migrating connection " << cntx->conn() << " from "
<< ProactorBase::me()->GetPoolIndex() << " to " << *sid;
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid));
<< ProactorBase::me()->GetPoolIndex() << " to " << sid;
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(sid));
}
} else {
optional<bool> scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx);
if (!scheduled) {
return;
}
if (*scheduled && !tx->IsScheduled())
tx->Schedule();

++ServerState::tlocal()->stats.eval_io_coordination_cnt;
interpreter->SetRedisFunc(
Expand Down Expand Up @@ -2080,6 +2054,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (scheduled && allow_squashing) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this);
} else {
DCHECK(!scheduled || !delay_scheduling);
for (auto& scmd : exec_info.body) {
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();

Expand Down
2 changes: 1 addition & 1 deletion src/server/multi_command_squasher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void MultiCommandSquasher::PerformHop() {
// Doesn't work with replication, so disallow if inline scheduling is not allowed.
bool singlehop_possible = IsAtomic() && !tx->IsScheduled() && cmds_.empty();
if (singlehop_possible) {
tx->MultiSwitchToNonAtomic();
tx->MultiBecomeSquasher();
DCHECK_GT(tx->GetUniqueShardCnt(), 0u); // it was initialized and determined active shards
tx->ScheduleSingleHop(run_cb);
return;
Expand Down
14 changes: 3 additions & 11 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,10 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
}
}

void Transaction::MultiSwitchToNonAtomic() {
void Transaction::MultiBecomeSquasher() {
DCHECK_EQ(multi_->mode, LOCK_AHEAD);
CHECK_EQ(coordinator_state_, 0); // not scheduled and certainly not executing
multi_->role = SQUASHER;
multi_->locks.clear();
}

void Transaction::StartMultiGlobal(DbIndex dbid) {
Expand Down Expand Up @@ -440,15 +439,6 @@ string Transaction::DebugId() const {
return res;
}

void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args) {
multi_.reset();
InitBase(db, args);
EnableShard(sid);
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
CHECK(key_index);
StoreKeysInArgs(*key_index, false);
}

// Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
Expand Down Expand Up @@ -588,6 +578,7 @@ void Transaction::ScheduleInternal() {
DCHECK(!shard_data_.empty());
DCHECK_EQ(0u, txid_);
DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
DCHECK(!multi_ || cid_->IsMultiTransactional());

bool span_all = IsGlobal();

Expand Down Expand Up @@ -706,6 +697,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
bool scheduled = (coordinator_state_ & COORD_SCHED) > 0;
DCHECK(!scheduled || IsAtomicMulti()); // if we don't need to schedule, we're an atomic multi
DCHECK(scheduled || !IsAtomicMulti() || multi_->role == SQUASHER); // only useful for squashing
DCHECK(scheduled || !multi_ || cid_->IsMultiTransactional()); // don't schedule on normal cmd

if (!scheduled) // Conclude only if we schedule, otherwise we're part of multi-hop multi
coordinator_state_ |= COORD_EXEC_CONCLUDING;
Expand Down
4 changes: 2 additions & 2 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ class Transaction {
// to it must not block.
void PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args);

// Switch lock-ahead transaction to non-atomic. Can only be called *before* it was scheduled.
void MultiSwitchToNonAtomic();
// Mark as squasher.
void MultiBecomeSquasher();

// Write a journal entry to a shard journal with the given payload. When logging a non-automatic
// journal command, multiple journal entries may be necessary. In this case, call with set
Expand Down

0 comments on commit e017b69

Please sign in to comment.