Skip to content

Commit

Permalink
fix: fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Jan 7, 2024
1 parent e017b69 commit 2d5a5ce
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 17 deletions.
10 changes: 6 additions & 4 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,10 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
auto& sinfo = cntx->conn_state.script_info;
sinfo = make_unique<ConnectionState::ScriptInfo>();
sinfo->keys.reserve(eval_args.keys.size());
for (size_t i = 0; i < eval_args.keys.size(); ++i) {
string_view key = ArgS(eval_args.keys, i);
sinfo->keys.insert(KeyLockArgs::GetLockKey(key));
}

sinfo->async_cmds_heap_limit = absl::GetFlag(FLAGS_multi_eval_squash_buffer);
Transaction* tx = cntx->transaction;
Expand All @@ -1808,8 +1812,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
// If script runs on a single shard, we run it remotely to save hops
if (!tx->IsScheduled() && tx->GetMultiMode() == Transaction::LOCK_AHEAD &&
tx->GetUniqueShardCnt() == 1) {
DCHECK(*scheduled); // because tx multi mode is lock ahead
CHECK(!tx->IsScheduled()); // skip_scheduling = true in StartMultiEval
DCHECK(*scheduled); // because tx multi mode is lock ahead

interpreter->SetRedisFunc([cntx, tx, this](Interpreter::CallArgs args) {
// Disable squashing, as we're using the squashing mechanism to run remotely.
Expand All @@ -1823,7 +1826,6 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
tx->MultiBecomeSquasher();
tx->ScheduleSingleHop([&](Transaction*, EngineShard*) {
boost::intrusive_ptr<Transaction> stub_tx = new Transaction{tx, sid};
stub_tx->MultiUpdateWithParent(tx);
cntx->transaction = stub_tx.get();

result = interpreter->RunFunction(eval_args.sha, &error);
Expand All @@ -1838,7 +1840,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(sid));
}
} else {
if (*scheduled && !tx->IsScheduled())
if (*scheduled && !tx->IsScheduled() && tx->GetMultiMode() != Transaction::NON_ATOMIC)
tx->Schedule();

++ServerState::tlocal()->stats.eval_io_coordination_cnt;
Expand Down
6 changes: 4 additions & 2 deletions src/server/multi_command_squasher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ void MultiCommandSquasher::PerformHop() {
auto run_cb = absl::bind_front(&MultiCommandSquasher::SquashedHopCb, this);

// If all commands fit into a single batch, run them as a real single hop without multi overhead.
// Doesn't work with replication, so disallow if inline scheduling is not allowed.
bool singlehop_possible = IsAtomic() && !tx->IsScheduled() && cmds_.empty();
if (singlehop_possible) {
// Single hop concludes immediately, so give it shard write info
tx->ReportWritesSquashedMulti([this](ShardId sid) { return sharded_[sid].had_writes; });
tx->MultiBecomeSquasher();
DCHECK_GT(tx->GetUniqueShardCnt(), 0u); // it was initialized and determined active shards
tx->ScheduleSingleHop(run_cb);
return;
}
Expand Down Expand Up @@ -136,7 +136,9 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm

auto& sinfo = PrepareShardInfo(last_sid);

DCHECK_EQ(cmd->Cid()->opt_mask() & CO::NO_KEY_TRANSACTIONAL, 0u); // TODO: handle their writes
sinfo.had_writes |= cmd->Cid()->IsWriteOnly();

sinfo.cmds.push_back(cmd);
order_.push_back(last_sid);

Expand Down
2 changes: 2 additions & 0 deletions src/server/multi_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ TEST_F(MultiTest, MultiGlobalCommands) {

RespExpr resp = Run({"exec"});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_FALSE(service_->IsLocked(0, "key"));
ASSERT_FALSE(service_->IsLocked(2, "key"));

ASSERT_THAT(Run({"get", "key"}), ArgType(RespExpr::NIL));

Expand Down
28 changes: 17 additions & 11 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id)
multi_->mode = LOCK_AHEAD;
}
multi_->role = SQUASHED_STUB;
MultiUpdateWithParent(parent);
}

Transaction::~Transaction() {
Expand Down Expand Up @@ -326,8 +327,6 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
CHECK(multi_->mode == GLOBAL || multi_->mode == LOCK_AHEAD);

MultiSwitchCmd(cid);

multi_->role = SQUASHER;
InitBase(db_index_, {});

// Because squashing already determines active shards by partitioning commands,
Expand All @@ -345,11 +344,14 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
shard_data_[i].arg_start = 0;
shard_data_[i].arg_count = 0;
}

MultiBecomeSquasher();
}

void Transaction::MultiBecomeSquasher() {
DCHECK_EQ(multi_->mode, LOCK_AHEAD);
CHECK_EQ(coordinator_state_, 0); // not scheduled and certainly not executing
DCHECK(multi_->mode == GLOBAL || multi_->mode == LOCK_AHEAD);
DCHECK_GT(GetUniqueShardCnt(), 0u); // initialized and determined active shards
DCHECK(cid_->IsMultiTransactional()); // proper base command set
multi_->role = SQUASHER;
}

Expand Down Expand Up @@ -423,7 +425,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {

void Transaction::MultiUpdateWithParent(const Transaction* parent) {
DCHECK(multi_);
DCHECK_EQ(parent->multi_->role, SQUASHER);
DCHECK(parent->multi_); // it might not be a squasher yet, but certainly is multi
DCHECK_EQ(multi_->role, SQUASHED_STUB);
txid_ = parent->txid_;
time_now_ms_ = parent->time_now_ms_;
Expand Down Expand Up @@ -548,7 +550,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
// This is the last hop, so clear cont_trans if its held by the current tx
shard->RemoveContTx(this);

if (IsAtomicMulti()) // We conclude as a single-hop multi command, so we need to report
if (IsAtomicMulti()) // Can only be true if run through ScheduleSingleHop
MultiReportJournalOnShard(shard);

// It has 2 responsibilities.
Expand Down Expand Up @@ -578,7 +580,7 @@ void Transaction::ScheduleInternal() {
DCHECK(!shard_data_.empty());
DCHECK_EQ(0u, txid_);
DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
DCHECK(!multi_ || cid_->IsMultiTransactional());
DCHECK(!IsAtomicMulti() || cid_->IsMultiTransactional());

bool span_all = IsGlobal();

Expand Down Expand Up @@ -696,8 +698,10 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {

bool scheduled = (coordinator_state_ & COORD_SCHED) > 0;
DCHECK(!scheduled || IsAtomicMulti()); // if we don't need to schedule, we're an atomic multi
DCHECK(scheduled || !IsAtomicMulti() || multi_->role == SQUASHER); // only useful for squashing
DCHECK(scheduled || !multi_ || cid_->IsMultiTransactional()); // don't schedule on normal cmd

bool suspicious_schedule = !scheduled && IsAtomicMulti();
DCHECK(!suspicious_schedule || multi_->role == SQUASHER); // only useful for squashing
DCHECK(!suspicious_schedule || cid_->IsMultiTransactional()); // don't schedule on normal cmd

if (!scheduled) // Conclude only if we schedule, otherwise we're part of multi-hop multi
coordinator_state_ |= COORD_EXEC_CONCLUDING;
Expand Down Expand Up @@ -797,7 +801,7 @@ void Transaction::UnlockMulti() {
return;

// We already concluded after running as a multi-shard single-hop
if ((coordinator_state_ & COORD_EXEC_CONCLUDING) == 0)
if ((coordinator_state_ & COORD_EXEC_CONCLUDING) > 0)
return;

auto sharded_keys = make_shared<vector<KeyList>>(shard_set->size());
Expand Down Expand Up @@ -861,6 +865,8 @@ void Transaction::Execute(RunnableType cb, bool conclude) {

if (conclude && !IsAtomicMulti()) {
coordinator_state_ |= COORD_EXEC_CONCLUDING;
} else if (!IsAtomicMulti()) {
coordinator_state_ &= ~COORD_EXEC_CONCLUDING;
}

ExecuteAsync();
Expand Down Expand Up @@ -1491,7 +1497,7 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&
bool allow_await) const {
auto journal = shard->journal();
CHECK(journal);
if (multi_ && multi_->role != SQUASHED_STUB)
if (multi_ && multi_->role != SQUASHED_STUB) // those are recorded separately and accumulated
multi_->shard_journal_write[shard->shard_id()] = true;

bool is_multi = multi_commands || IsAtomicMulti();
Expand Down

0 comments on commit 2d5a5ce

Please sign in to comment.