Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor client tracking, fix atomicity, squashing and multi/exec #2970

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 6 additions & 13 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,9 @@ thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_poo
thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;

void Connection::QueueBackpressure::EnsureBelowLimit() {
ec.await(
[this] { return subscriber_bytes.load(memory_order_relaxed) <= subscriber_thread_limit; });
ec.await([this] {
return done || subscriber_bytes.load(memory_order_relaxed) <= subscriber_thread_limit;
kostasrim marked this conversation as resolved.
Show resolved Hide resolved
});
}

struct Connection::Shutdown {
Expand Down Expand Up @@ -885,6 +886,8 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
// After the client disconnected.
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
queue_backpressure_->done = true;
kostasrim marked this conversation as resolved.
Show resolved Hide resolved
queue_backpressure_->ec.notify();
phase_ = SHUTTING_DOWN;

VLOG(2) << "Before dispatch_fb.join()";
Expand Down Expand Up @@ -1114,7 +1117,7 @@ void Connection::HandleMigrateRequest() {
this->Migrate(dest);
}

DCHECK(dispatch_q_.empty());
// DCHECK(dispatch_q_.empty());
kostasrim marked this conversation as resolved.
Show resolved Hide resolved

// In case we Yield()ed in Migrate() above, dispatch_fb_ might have been started.
LaunchDispatchFiberIfNeeded();
Expand Down Expand Up @@ -1641,16 +1644,6 @@ void Connection::RequestAsyncMigration(util::fb2::ProactorBase* dest) {
migration_request_ = dest;
}

void Connection::SetClientTrackingSwitch(bool is_on) {
tracking_enabled_ = is_on;
if (tracking_enabled_)
cc_->subscriptions++;
}

bool Connection::IsTrackingOn() const {
return tracking_enabled_;
}

// Starts traffic logging in the calling thread by forwarding `path` to
// OpenTrafficLogger(). Per the header declaration, the caller must be on a
// proactor thread and the call is a no-op if that thread is already logging.
void Connection::StartTrafficLogging(string_view path) {
OpenTrafficLogger(path);
}
Expand Down
8 changes: 2 additions & 6 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,6 @@ class Connection : public util::Connection {
// Connections will migrate at most once, and only when the flag --migrate_connections is true.
void RequestAsyncMigration(util::fb2::ProactorBase* dest);

void SetClientTrackingSwitch(bool is_on);

bool IsTrackingOn() const;

// Starts traffic logging in the calling thread. Must be a proactor thread.
// Each thread creates its own log file combining requests from all the connections in
// that thread. A noop if the thread is already logging.
Expand Down Expand Up @@ -333,6 +329,8 @@ class Connection : public util::Connection {

size_t subscriber_thread_limit = 0; // cached flag subscriber_thread_limit
size_t pipeline_cache_limit = 0; // cached flag pipeline_cache_limit
// cancelation flag
bool done = false;
};

private:
Expand Down Expand Up @@ -444,8 +442,6 @@ class Connection : public util::Connection {
// Per-thread queue backpressure structs.
static thread_local QueueBackpressure tl_queue_backpressure_;

// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ = false;
bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Connection migration vars, see RequestAsyncMigration() above.
Expand Down
85 changes: 83 additions & 2 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, facade::Connection* own
acl_commands = std::vector<uint64_t>(acl::NumberOfFamilies(), acl::ALL_COMMANDS);
}

ConnectionContext::ConnectionContext(const ConnectionContext* owner, Transaction* tx,
ConnectionContext::ConnectionContext(ConnectionContext* owner, Transaction* tx,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need to remove const here?

facade::CapturingReplyBuilder* crb)
: facade::ConnectionContext(nullptr, nullptr), transaction{tx} {
: facade::ConnectionContext(nullptr, nullptr), transaction{tx}, parent_cntx_(owner) {
acl_commands = std::vector<uint64_t>(acl::NumberOfFamilies(), acl::ALL_COMMANDS);
if (tx) { // If we have a carrier transaction, this context is used for squashing
DCHECK(owner);
Expand Down Expand Up @@ -119,6 +119,13 @@ void ConnectionContext::ChangeMonitor(bool start) {
EnableMonitoring(start);
}

ConnectionState::ClientTracking& ConnectionContext::ClientTrackingInfo() {
if (parent_cntx_) {
return parent_cntx_->conn_state.tracking_info_;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's for squashing :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you access conn_state, don't make it a function on conn_cntx

then you can just use conn_state, you can make it mutable or add a new member like conn

if (cntx->conn_state.squashing_info)
cntx = cntx->conn_state.squashing_info->owner;

}
return conn_state.tracking_info_;
}

vector<unsigned> ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, bool to_reply,
ConnectionContext* conn) {
vector<unsigned> result(to_reply ? args.size() : 0, 0);
Expand Down Expand Up @@ -265,4 +272,78 @@ void ConnectionState::ExecInfo::ClearWatched() {
watched_existed = 0;
}

// Records whether CLIENT TRACKING is ON (`is_on == true`) or OFF for this
// connection. The stored flag is what IsTrackingOn() reports.
void ConnectionState::ClientTracking::SetClientTracking(bool is_on) {
tracking_enabled_ = is_on;
}

// Marks the currently executing command as CLIENT CACHING YES by setting
// executing_command_. UpdatePrevAndLastCommand() later moves this flag into
// prev_command_, which is what ShouldTrackKeys() consults.
void ConnectionState::ClientTracking::TrackClientCaching() {
executing_command_ = true;
}

void ConnectionState::ClientTracking::UpdatePrevAndLastCommand() {
if (prev_command_ && multi_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems that what you really want is to know if you are in the middle of EXEC execution and not multi.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We store so much fragile info that needs to be updated everywhere... seqnums would solve all this

return;
}
prev_command_ = std::exchange(executing_command_, false);
}

// Records whether the OPTIN subcommand was used with CLIENT TRACKING. When set,
// ShouldTrackKeys() additionally requires prev_command_ to be true.
void ConnectionState::ClientTracking::SetOptin(bool optin) {
optin_ = optin;
}

// Records whether the connection is inside a multi transaction. While multi_ and
// prev_command_ are both true, UpdatePrevAndLastCommand() leaves the state untouched.
void ConnectionState::ClientTracking::SetMulti(bool multi) {
multi_ = multi;
}

// Returns true if CLIENT TRACKING is currently ON, i.e. the last value passed to
// SetClientTracking(). Does not take OPTIN / CLIENT CACHING state into account.
bool ConnectionState::ClientTracking::IsTrackingOn() const {
return tracking_enabled_;
}

// Decides whether the keys of the current command should be tracked.
// Tracking must be ON; in addition, when OPTIN is active, the previous command
// must have been CLIENT CACHING YES (recorded in prev_command_).
bool ConnectionState::ClientTracking::ShouldTrackKeys() const {
  const bool caching_requested = prev_command_;
  const bool optin_satisfied = !optin_ || caching_requested;
  return IsTrackingOn() && optin_satisfied;
}

OpResult<void> OpTrackKeys(const OpArgs slice_args, const facade::Connection::WeakRef& conn_ref,
const ShardArgs& args) {
if (conn_ref.IsExpired()) {
DVLOG(2) << "Connection expired, exiting TrackKey function.";
return OpStatus::OK;
}

DVLOG(2) << "Start tracking keys for client ID: " << conn_ref.GetClientId()
<< " with thread ID: " << conn_ref.Thread();

auto& db_slice = slice_args.shard->db_slice();
// TODO: There is a bug here that we track all arguments instead of tracking only keys.
Copy link
Contributor

@dranikpg dranikpg May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, found it, yes that's also left 🙂

Copy link
Contributor Author

@kostasrim kostasrim May 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

99.9% it does not affect us and it's not a bug. I created this #3034 for @romange to confirm and I will push a quick fix

for (auto key : args) {
DVLOG(2) << "Inserting client ID " << conn_ref.GetClientId()
<< " into the tracking client set of key " << key;
db_slice.TrackKey(conn_ref, key);
}

return OpStatus::OK;
}

void ConnectionState::ClientTracking::Track(ConnectionContext* cntx, const CommandId* cid) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function now is called within the Shard. We have a convention to name it as xxxOnShard to stress it.

auto& info = cntx->ClientTrackingInfo();
auto shards = cntx->transaction->GetActiveShards();
if ((cid->opt_mask() & CO::READONLY) && cid->IsTransactional() && info.ShouldTrackKeys()) {
romange marked this conversation as resolved.
Show resolved Hide resolved
if (cntx->parent_cntx_) {
}
auto conn = cntx->parent_cntx_ ? cntx->parent_cntx_->conn()->Borrow() : cntx->conn()->Borrow();
romange marked this conversation as resolved.
Show resolved Hide resolved
auto cb = [&, conn](unsigned i, auto* pb) {
if (shards.find(i) != shards.end()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: There is IsActive() so you don't need GetActiveShards()

auto* t = cntx->transaction;
CHECK(t);
auto* shard = EngineShard::tlocal();
OpTrackKeys(t->GetOpArgs(shard), conn, t->GetShardArgs(shard->shard_id()));
}
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
}
}
} // namespace dfly
80 changes: 78 additions & 2 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,78 @@ struct ConnectionState {

size_t UsedMemory() const;

// Client tracking is a per-connection state machine that adheres to the requirements
// of the CLIENT TRACKING command. Note that the semantics described below are enforced
// by the tests in server_family_test. The rules are:
// 1. If CLIENT TRACKING is ON then each READ command must be tracked. Invalidation
// messages are sent `only once`. Subsequent changes of the same key require the
// client to re-read the key in order to receive the next invalidation message.
// 2. CLIENT TRACKING ON OPTIN turns on optional tracking. Read commands are not
// tracked unless the client issues a CLIENT CACHING YES command which conditionally
// allows the tracking of the command that follows CACHING YES. For example:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> GET foo <--------------------- From now foo is being tracked
// However:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> SET foo bar
// >> GET foo <--------------------- is *NOT* tracked since GET does not succeed CACHING
// Also, in the context of multi transactions, CLIENT CACHING YES is *STICKY*:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> MULTI
// >> GET foo
// >> SET foo bar
// >> GET brother_foo
// >> EXEC
// From this point onwards the `foo` and `brother_foo` keys are tracked. The same applies
// if CACHING YES is used within the MULTI/EXEC block.
//
// The state machine implements the above rules. We need to track two commands at each time:
// 1. The command invoked previously.
// 2. The command that is invoked now (via InvokeCmd).
// These are tracked by prev_command_ and executing_command_ respectively. When CACHING YES
// is invoked, executing_command_ is set to true, which is later moved to prev_command_
// when the next command is invoked. This is needed to keep track of the different rules
// described above. Stickiness is covered similarly by the multi/exec/discard command which
// when called sets the corresponding multi_ variable to true.
class ClientTracking {
public:
// Sets to true when CLIENT TRACKING is ON
void SetClientTracking(bool is_on);
// Enable tracking on the client
void TrackClientCaching();
kostasrim marked this conversation as resolved.
Show resolved Hide resolved

void UpdatePrevAndLastCommand();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The name UpdatePrevAndLastCommand describes the implementation of this function. What it does is advance the state, so I think it's better to call it Tick, Advance or Update.

// Set if OPTIN subcommand is used in CLIENT TRACKING
romange marked this conversation as resolved.
Show resolved Hide resolved
void SetOptin(bool optin);
// When Multi command is invoked, it calls this to broadcast that we are on a multi
// transaction.
void SetMulti(bool multi);

// Check if the keys should be tracked. Result adheres to the state machine described above.
bool ShouldTrackKeys() const;
// Check only if CLIENT TRACKING is ON
bool IsTrackingOn() const;

// Iterates over the active shards of the transaction. If a key satisfies
// the tracking requirements, it is set for tracking.
void Track(ConnectionContext* cntx, const CommandId* cid);

private:
// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ = false;
bool optin_ = false;
// true if the previously invoked command was CLIENT CACHING YES
bool prev_command_ = false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename prev_command_ to track_current_cmd_

// true if the currently executing command is CLIENT CACHING YES
bool executing_command_ = false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename: executing_command_ to track_next_cmd_

Copy link
Contributor Author

@kostasrim kostasrim May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the track_next_cmd_ seems misleading since it implies it's the next command. executing_command_ is the command we currently execute in InvokeCmd flow and prev_command_ is the command before it. So:

>> GET FOO ----> prev_command
>> GET BAR ----> current_command

// true if we are in a multi transaction
bool multi_ = false;
};

public:
DbIndex db_index = 0;

Expand All @@ -161,14 +233,14 @@ struct ConnectionState {
std::optional<SquashingInfo> squashing_info;
std::unique_ptr<ScriptInfo> script_info;
std::unique_ptr<SubscribeInfo> subscribe_info;
ClientTracking tracking_info_;
};

class ConnectionContext : public facade::ConnectionContext {
public:
ConnectionContext(::io::Sink* stream, facade::Connection* owner);

ConnectionContext(const ConnectionContext* owner, Transaction* tx,
facade::CapturingReplyBuilder* crb);
ConnectionContext(ConnectionContext* owner, Transaction* tx, facade::CapturingReplyBuilder* crb);

struct DebugInfo {
uint32_t shards_count = 0;
Expand All @@ -183,6 +255,10 @@ class ConnectionContext : public facade::ConnectionContext {
// TODO: to introduce proper accessors.
Transaction* transaction = nullptr;
const CommandId* cid = nullptr;
ConnectionContext* parent_cntx_ = nullptr;
romange marked this conversation as resolved.
Show resolved Hide resolved

ConnectionState::ClientTracking& ClientTrackingInfo();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See previous comment on whether we can keep this in conn_state


ConnectionState conn_state;

DbIndex db_index() const {
Expand Down
37 changes: 20 additions & 17 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1391,24 +1391,27 @@ void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
return;

auto it = client_tracking_map_.find(key);
if (it != client_tracking_map_.end()) {
// notify all the clients.
auto& client_set = it->second;
auto cb = [key, client_set = std::move(client_set)](unsigned idx, util::ProactorBase*) {
for (auto it = client_set.begin(); it != client_set.end(); ++it) {
if ((unsigned int)it->Thread() != idx)
continue;
facade::Connection* conn = it->Get();
if ((conn != nullptr) && conn->IsTrackingOn()) {
std::string key_str = {key.begin(), key.end()};
conn->SendInvalidationMessageAsync({key_str});
}
}
};
shard_set->pool()->DispatchBrief(std::move(cb));
// remove this key from the tracking table as the key no longer exists
client_tracking_map_.erase(key);
if (it == client_tracking_map_.end()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in most cases client_tracking_map_ is empty but find still does all the computations including the hash value to return negative results. I suggest to guard this with .empty() check to save some cycles here

return;
}
auto& client_set = it->second;
romange marked this conversation as resolved.
Show resolved Hide resolved
// notify all the clients.
auto cb = [key = std::string(key), client_set = std::move(client_set)](unsigned idx,
romange marked this conversation as resolved.
Show resolved Hide resolved
util::ProactorBase*) {
for (auto& client : client_set) {
if (client.IsExpired() || (client.Thread() != idx)) {
continue;
}
auto* conn = client.Get();
auto* cntx = static_cast<ConnectionContext*>(conn->cntx());
if (cntx && cntx->ClientTrackingInfo().IsTrackingOn()) {
conn->SendInvalidationMessageAsync({key});
}
}
};
shard_set->pool()->DispatchBrief(std::move(cb));
// remove this key from the tracking table as the key no longer exists
client_tracking_map_.erase(key);
}

void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) {
Expand Down