Skip to content

Commit

Permalink
fix: remove acl-check and cancel instead when REPLCONF ACK fails to v…
Browse files Browse the repository at this point in the history
…alidate (#2920)

* remove acl-checker which is also the source of this bug
* add a fallback that cancels replication when REPLCONF ACK fail
  • Loading branch information
kostasrim committed May 1, 2024
1 parent c84b6fa commit 1fd16ab
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 62 deletions.
6 changes: 3 additions & 3 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ class DflyCmd {
// Sets metadata.
void SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version);

// Transition into cancelled state, run cleanup.
void CancelReplication(uint32_t sync_id, std::shared_ptr<ReplicaInfo> replica_info_ptr);

private:
// JOURNAL [START/STOP]
// Start or stop journaling.
Expand Down Expand Up @@ -200,9 +203,6 @@ class DflyCmd {
// Main entrypoint for stopping replication.
void StopReplication(uint32_t sync_id);

// Transition into cancelled state, run cleanup.
void CancelReplication(uint32_t sync_id, std::shared_ptr<ReplicaInfo> replica_info_ptr);

// Get ReplicaInfo by sync_id.
std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id);

Expand Down
7 changes: 6 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,12 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
// Bonus points because this allows to continue replication with ACL users who got
// their access revoked and reinstated
if (cid->name() == "REPLCONF" && absl::EqualsIgnoreCase(ArgS(args_no_cmd, 0), "ACK")) {
LOG(ERROR) << "Tried to reply to REPLCONF";
auto info_ptr = server_family_.GetReplicaInfo(dfly_cntx);
if (info_ptr) {
unsigned session_id = dfly_cntx->conn_state.replication_info.repl_session_id;
DCHECK(session_id);
server_family_.GetDflyCmd()->CancelReplication(session_id, std::move(info_ptr));
}
return;
}
dfly_cntx->SendError(std::move(*err));
Expand Down
56 changes: 0 additions & 56 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ Replica::Replica(string host, uint16_t port, Service* se, std::string_view id,
Replica::~Replica() {
sync_fb_.JoinIfNeeded();
acks_fb_.JoinIfNeeded();
acl_check_fb_.JoinIfNeeded();
}

static const char kConnErr[] = "could not connect to master: ";
Expand Down Expand Up @@ -150,7 +149,6 @@ void Replica::Stop() {
// so we can freely release resources (connections).
sync_fb_.JoinIfNeeded();
acks_fb_.JoinIfNeeded();
acl_check_fb_.JoinIfNeeded();
}

void Replica::Pause(bool pause) {
Expand Down Expand Up @@ -625,8 +623,6 @@ error_code Replica::ConsumeRedisStream() {
error_code Replica::ConsumeDflyStream() {
// Set new error handler that closes flow sockets.
auto err_handler = [this](const auto& ge) {
// Trigger acl-checker
replica_waker_.notifyAll();
// Make sure the flows are not in a state transition
lock_guard lk{flows_op_mu_};
DefaultErrorHandler(ge);
Expand Down Expand Up @@ -655,12 +651,7 @@ error_code Replica::ConsumeDflyStream() {
shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));
}

if (master_context_.version >= DflyVersion::VER3) {
acl_check_fb_ = fb2::Fiber("acl-check", &Replica::AclCheckFb, this);
}

JoinDflyFlows();
acl_check_fb_.JoinIfNeeded();

last_journal_LSNs_.emplace();
for (auto& flow : shard_flows_) {
Expand Down Expand Up @@ -893,53 +884,6 @@ void Replica::RedisStreamAcksFb() {
}
}

class AclCheckerClient : public ProtocolClient {
public:
AclCheckerClient(ServerContext server, Context* cntx)
: ProtocolClient(std::move(server)), cntx_(cntx) {
Connect();
}

void CheckAclRoundTrip() {
if (auto ec = SendCommandAndReadResponse(StrCat("REPLCONF acl-check ", "0")); ec) {
cntx_->Cancel();
LOG(INFO) << "Error in REPLCONF acl-check: " << ec.message();
} else if (!CheckRespIsSimpleReply("OK")) {
cntx_->Cancel();
LOG(INFO) << "Error: " << ToSV(LastResponseArgs().front().GetBuf());
}
}

private:
void Connect() {
VLOG(1) << "Connecting with acl client";
auto ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, cntx_);
if (ec) {
LOG(INFO) << "Failed to connect with acl client " << ec.message();
cntx_->Cancel();
}
}

Context* cntx_;
};

void Replica::AclCheckFb() {
// We need a new client with a different socket for acl-checks
// instead of using the ACK's fiber. This is because acks should
// not be replied (which makes them unusable for periodic ACL checks).
// Also there are N ACK fibers per replica instance while we only need
// one fiber to periodically check for ACL changes. Therefore,
// we decouple the logic via AclCheckFb.
AclCheckerClient acl_client(server(), &cntx_);

while (!cntx_.IsCancelled()) {
acl_client.CheckAclRoundTrip();
// We poll for ACL changes every second
replica_waker_.await_until([&]() { return cntx_.IsCancelled(); },
std::chrono::steady_clock::now() + std::chrono::seconds(1));
}
}

void DflyShardReplica::StableSyncDflyAcksFb(Context* cntx) {
constexpr size_t kAckRecordMaxInterval = 1024;
std::chrono::duration ack_time_max_interval =
Expand Down
1 change: 0 additions & 1 deletion src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ class Replica : ProtocolClient {
// In redis replication mode.
util::fb2::Fiber sync_fb_;
util::fb2::Fiber acks_fb_;
util::fb2::Fiber acl_check_fb_;
util::fb2::EventCount replica_waker_;

std::vector<std::unique_ptr<DflyShardReplica>> shard_flows_;
Expand Down
1 change: 1 addition & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2589,6 +2589,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
cntx->replication_flow->last_acked_lsn = ack;
return;
} else if (cmd == "ACL-CHECK") {
// TODO(kostasrim): Remove this branch 20/6/2024
cntx->SendOk();
return;
} else {
Expand Down
6 changes: 5 additions & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "facade/reply_builder.h"
#include "server/channel_store.h"
#include "server/detail/save_stages_controller.h"
#include "server/dflycmd.h"
#include "server/engine_shard_set.h"
#include "server/replica.h"
#include "server/server_state.h"
Expand Down Expand Up @@ -49,7 +50,6 @@ class ClusterFamily;

class ConnectionContext;
class CommandRegistry;
class DflyCmd;
class Service;
class ScriptMgr;

Expand Down Expand Up @@ -215,6 +215,10 @@ class ServerFamily {
bool HasReplica() const;
std::optional<Replica::Info> GetReplicaInfo() const;

std::shared_ptr<DflyCmd::ReplicaInfo> GetReplicaInfo(ConnectionContext* cntx) const {
return dfly_cmd_->GetReplicaInfo(cntx);
}

void OnClose(ConnectionContext* cntx);

void BreakOnShutdown();
Expand Down

0 comments on commit 1fd16ab

Please sign in to comment.