Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove acl-check and cancel instead when REPLCONF ACK fails to validate #2920

Merged
merged 8 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ class DflyCmd {
// Sets metadata.
void SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version);

// Transition into cancelled state, run cleanup.
void CancelReplication(uint32_t sync_id, std::shared_ptr<ReplicaInfo> replica_info_ptr);

private:
// JOURNAL [START/STOP]
// Start or stop journaling.
Expand Down Expand Up @@ -200,9 +203,6 @@ class DflyCmd {
// Main entrypoint for stopping replication.
void StopReplication(uint32_t sync_id);

// Transition into cancelled state, run cleanup.
void CancelReplication(uint32_t sync_id, std::shared_ptr<ReplicaInfo> replica_info_ptr);

// Get ReplicaInfo by sync_id.
std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id);

Expand Down
7 changes: 6 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,12 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
// Bonus points because this allows to continue replication with ACL users who got
// their access revoked and reinstated
if (cid->name() == "REPLCONF" && absl::EqualsIgnoreCase(ArgS(args_no_cmd, 0), "ACK")) {
LOG(ERROR) << "Tried to reply to REPLCONF";
auto info_ptr = server_family_.GetReplicaInfo(dfly_cntx);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adiholden please help me reviewing these changes

if (info_ptr) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont you need to call DflyCmd::CancelReplication?

unsigned session_id = dfly_cntx->conn_state.replication_info.repl_session_id;
DCHECK(session_id);
server_family_.GetDflyCmd()->CancelReplication(session_id, std::move(info_ptr));
}
return;
}
dfly_cntx->SendError(std::move(*err));
Expand Down
56 changes: 0 additions & 56 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ Replica::Replica(string host, uint16_t port, Service* se, std::string_view id,
Replica::~Replica() {
sync_fb_.JoinIfNeeded();
acks_fb_.JoinIfNeeded();
acl_check_fb_.JoinIfNeeded();
}

static const char kConnErr[] = "could not connect to master: ";
Expand Down Expand Up @@ -150,7 +149,6 @@ void Replica::Stop() {
// so we can freely release resources (connections).
sync_fb_.JoinIfNeeded();
acks_fb_.JoinIfNeeded();
acl_check_fb_.JoinIfNeeded();
}

void Replica::Pause(bool pause) {
Expand Down Expand Up @@ -625,8 +623,6 @@ error_code Replica::ConsumeRedisStream() {
error_code Replica::ConsumeDflyStream() {
// Set new error handler that closes flow sockets.
auto err_handler = [this](const auto& ge) {
// Trigger acl-checker
replica_waker_.notifyAll();
// Make sure the flows are not in a state transition
lock_guard lk{flows_op_mu_};
DefaultErrorHandler(ge);
Expand Down Expand Up @@ -655,12 +651,7 @@ error_code Replica::ConsumeDflyStream() {
shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));
}

if (master_context_.version >= DflyVersion::VER3) {
acl_check_fb_ = fb2::Fiber("acl-check", &Replica::AclCheckFb, this);
}

JoinDflyFlows();
acl_check_fb_.JoinIfNeeded();

last_journal_LSNs_.emplace();
for (auto& flow : shard_flows_) {
Expand Down Expand Up @@ -893,53 +884,6 @@ void Replica::RedisStreamAcksFb() {
}
}

class AclCheckerClient : public ProtocolClient {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduces a small bug if master is an older version that we cover in our tests. The problem is that if ACL of the masteruser changes then REPLCONF ACK will start failing silently without noticing and replication will break :(

I guess this setup is more probably since upgrading from one version to the other would involve a replica copying the data and then being promoted to master. An intermediate solution would be to keep this check for older version masters and deprecate/completely remove at some later point.

I am open for suggestions

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fine, the use-case is not interesting.

public:
AclCheckerClient(ServerContext server, Context* cntx)
: ProtocolClient(std::move(server)), cntx_(cntx) {
Connect();
}

void CheckAclRoundTrip() {
if (auto ec = SendCommandAndReadResponse(StrCat("REPLCONF acl-check ", "0")); ec) {
cntx_->Cancel();
LOG(INFO) << "Error in REPLCONF acl-check: " << ec.message();
} else if (!CheckRespIsSimpleReply("OK")) {
cntx_->Cancel();
LOG(INFO) << "Error: " << ToSV(LastResponseArgs().front().GetBuf());
}
}

private:
void Connect() {
VLOG(1) << "Connecting with acl client";
auto ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, cntx_);
if (ec) {
LOG(INFO) << "Failed to connect with acl client " << ec.message();
cntx_->Cancel();
}
}

Context* cntx_;
};

void Replica::AclCheckFb() {
// We need a new client with a different socket for acl-checks
// instead of using the ACK's fiber. This is because acks should
// not be replied (which makes them unusable for periodic ACL checks).
// Also there are N ACK fibers per replica instance while we only need
// one fiber to periodically check for ACL changes. Therefore,
// we decouple the logic via AclCheckFb.
AclCheckerClient acl_client(server(), &cntx_);

while (!cntx_.IsCancelled()) {
acl_client.CheckAclRoundTrip();
// We poll for ACL changes every second
replica_waker_.await_until([&]() { return cntx_.IsCancelled(); },
std::chrono::steady_clock::now() + std::chrono::seconds(1));
}
}

void DflyShardReplica::StableSyncDflyAcksFb(Context* cntx) {
constexpr size_t kAckRecordMaxInterval = 1024;
std::chrono::duration ack_time_max_interval =
Expand Down
1 change: 0 additions & 1 deletion src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ class Replica : ProtocolClient {
// In redis replication mode.
util::fb2::Fiber sync_fb_;
util::fb2::Fiber acks_fb_;
util::fb2::Fiber acl_check_fb_;
util::fb2::EventCount replica_waker_;

std::vector<std::unique_ptr<DflyShardReplica>> shard_flows_;
Expand Down
1 change: 1 addition & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2599,6 +2599,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
cntx->replication_flow->last_acked_lsn = ack;
return;
} else if (cmd == "ACL-CHECK") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks replication when replica is of lower version. My two cents is that we don't care for this case since this setup doesn't really make much sense. Let me know if you disagree

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please keep the code in master and add a TODO with your anniversary date to remove this code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add an alarm on my calendar as well 😉

// TODO(kostasrim): Remove this branch 20/6/2024
cntx->SendOk();
return;
} else {
Expand Down
6 changes: 5 additions & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "facade/reply_builder.h"
#include "server/channel_store.h"
#include "server/detail/save_stages_controller.h"
#include "server/dflycmd.h"
#include "server/engine_shard_set.h"
#include "server/replica.h"
#include "server/server_state.h"
Expand Down Expand Up @@ -47,7 +48,6 @@ class Journal;
class ClusterFamily;
class ConnectionContext;
class CommandRegistry;
class DflyCmd;
class Service;
class ScriptMgr;

Expand Down Expand Up @@ -215,6 +215,10 @@ class ServerFamily {
bool HasReplica() const;
std::optional<Replica::Info> GetReplicaInfo() const;

std::shared_ptr<DflyCmd::ReplicaInfo> GetReplicaInfo(ConnectionContext* cntx) const {
return dfly_cmd_->GetReplicaInfo(cntx);
}

void OnClose(ConnectionContext* cntx);

void BreakOnShutdown();
Expand Down