Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor client tracking, fix atomicity, squashing and multi/exec #2970

Merged
merged 20 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 1 addition & 12 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,6 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
phase_ = SHUTTING_DOWN;

VLOG(2) << "Before dispatch_fb.join()";
dispatch_fb_.JoinIfNeeded();
VLOG(2) << "After dispatch_fb.join()";
Expand Down Expand Up @@ -1114,7 +1113,7 @@ void Connection::HandleMigrateRequest() {
this->Migrate(dest);
}

DCHECK(dispatch_q_.empty());
// DCHECK(dispatch_q_.empty());
kostasrim marked this conversation as resolved.
Show resolved Hide resolved

// In case we Yield()ed in Migrate() above, dispatch_fb_ might have been started.
LaunchDispatchFiberIfNeeded();
Expand Down Expand Up @@ -1641,16 +1640,6 @@ void Connection::RequestAsyncMigration(util::fb2::ProactorBase* dest) {
migration_request_ = dest;
}

void Connection::SetClientTrackingSwitch(bool is_on) {
tracking_enabled_ = is_on;
if (tracking_enabled_)
cc_->subscriptions++;
}

bool Connection::IsTrackingOn() const {
return tracking_enabled_;
}

void Connection::StartTrafficLogging(string_view path) {
OpenTrafficLogger(path);
}
Expand Down
6 changes: 0 additions & 6 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,6 @@ class Connection : public util::Connection {
// Connections will migrate at most once, and only when the flag --migrate_connections is true.
void RequestAsyncMigration(util::fb2::ProactorBase* dest);

void SetClientTrackingSwitch(bool is_on);

bool IsTrackingOn() const;

// Starts traffic logging in the calling thread. Must be a proactor thread.
// Each thread creates its own log file combining requests from all the connections in
// that thread. A noop if the thread is already logging.
Expand Down Expand Up @@ -444,8 +440,6 @@ class Connection : public util::Connection {
// Per-thread queue backpressure structs.
static thread_local QueueBackpressure tl_queue_backpressure_;

// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ = false;
bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Connection migration vars, see RequestAsyncMigration() above.
Expand Down
6 changes: 3 additions & 3 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc
${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc conn_context.cc channel_store.cc
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not need this move now

)

SET(SEARCH_FILES search/search_family.cc search/doc_index.cc search/doc_accessors.cc
search/aggregator.cc)

add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc channel_store.cc
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc
add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc
config_registry.cc debugcmd.cc dflycmd.cc
generic_family.cc hset_family.cc http_api.cc json_family.cc
${SEARCH_FILES}
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
Expand Down
49 changes: 47 additions & 2 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, facade::Connection* own
acl_commands = std::vector<uint64_t>(acl::NumberOfFamilies(), acl::ALL_COMMANDS);
}

ConnectionContext::ConnectionContext(const ConnectionContext* owner, Transaction* tx,
ConnectionContext::ConnectionContext(ConnectionContext* owner, Transaction* tx,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need to remove const here?

facade::CapturingReplyBuilder* crb)
: facade::ConnectionContext(nullptr, nullptr), transaction{tx} {
: facade::ConnectionContext(nullptr, nullptr), transaction{tx}, parent_cntx_(owner) {
acl_commands = std::vector<uint64_t>(acl::NumberOfFamilies(), acl::ALL_COMMANDS);
if (tx) { // If we have a carrier transaction, this context is used for squashing
DCHECK(owner);
Expand Down Expand Up @@ -265,4 +265,49 @@ void ConnectionState::ExecInfo::ClearWatched() {
watched_existed = 0;
}

bool ConnectionState::ClientTracking::ShouldTrackKeys() const {
if (!IsTrackingOn()) {
return false;
}

return !optin_ || (seq_num_ == (1 + caching_seq_num_));
}
romange marked this conversation as resolved.
Show resolved Hide resolved

OpResult<void> OpTrackKeys(const OpArgs slice_args, const facade::Connection::WeakRef& conn_ref,
const ShardArgs& args) {
if (conn_ref.IsExpired()) {
DVLOG(2) << "Connection expired, exiting TrackKey function.";
return OpStatus::OK;
}

DVLOG(2) << "Start tracking keys for client ID: " << conn_ref.GetClientId()
<< " with thread ID: " << conn_ref.Thread();

auto& db_slice = slice_args.shard->db_slice();
// TODO: There is a bug here that we track all arguments instead of tracking only keys.
Copy link
Contributor

@dranikpg dranikpg May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, found it, yes that's also left 🙂

Copy link
Contributor Author

@kostasrim kostasrim May 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

99.9% it does not affect us and it's not a bug. I created this #3034 for @romange to confirm and I will push a quick fix

for (auto key : args) {
DVLOG(2) << "Inserting client ID " << conn_ref.GetClientId()
<< " into the tracking client set of key " << key;
db_slice.TrackKey(conn_ref, key);
}

return OpStatus::OK;
}

void ConnectionState::ClientTracking::Track(ConnectionContext* cntx, const CommandId* cid) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function now is called within the Shard. We have a convention to name it as xxxOnShard to stress it.

auto& info = cntx->conn_state.tracking_info_;
auto shards = cntx->transaction->GetActiveShards();
if ((cid->opt_mask() & CO::READONLY) && cid->IsTransactional() && info.ShouldTrackKeys()) {
romange marked this conversation as resolved.
Show resolved Hide resolved
auto conn = cntx->parent_cntx_ ? cntx->parent_cntx_->conn()->Borrow() : cntx->conn()->Borrow();
romange marked this conversation as resolved.
Show resolved Hide resolved
auto cb = [&, conn](unsigned i, auto* pb) {
if (shards.find(i) != shards.end()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: There is IsActive() so you don't need GetActiveShards()

auto* t = cntx->transaction;
CHECK(t);
auto* shard = EngineShard::tlocal();
OpTrackKeys(t->GetOpArgs(shard), conn, t->GetShardArgs(shard->shard_id()));
}
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
}
}
} // namespace dfly
91 changes: 89 additions & 2 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,91 @@ struct ConnectionState {

size_t UsedMemory() const;

// Client tracking is a per-connection state machine that adheres to the requirements
// of the CLIENT TRACKING command. Note that the semantics described below are enforced
// by the tests in server_family_test. The rules are:
// 1. If CLIENT TRACKING is ON then each READ command must be tracked. Invalidation
// messages are sent `only once`. Subsequent changes of the same key require the
// client to re-read the key in order to receive the next invalidation message.
// 2. CLIENT TRACKING ON OPTIN turns on optional tracking. Read commands are not
// tracked unless the client issues a CLIENT CACHING YES command which conditionally
// allows the tracking of the command that follows CACHING YES). For example:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> GET foo <--------------------- From now foo is being tracked
// However:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> SET foo bar
// >> GET foo <--------------------- is *NOT* tracked since GET does not succeed CACHING
// Also, in the context of multi transactions, CLIENT CACHING YES is *STICKY*:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> MULTI
// >> GET foo
// >> SET foo bar
// >> GET brother_foo
// >> EXEC
// From this point onwards `foo` and `get` keys are tracked. Same aplies if CACHING YES
// is used within the MULTI/EXEC block.
//
// The state machine implements the above rules. We need to track:
// 1. If TRACKING is ON and OPTIN
// 2. Stickiness of CACHING as described above
//
// We introduce a monotonic counter called sequence number which we increment only:
// * On InvokeCmd when we are not Collecting (multi)
// We introduce another counter called caching_seq_num which is set to seq_num
// when the users sends a CLIENT CACHING YES command
// If seq_num == caching_seq_num + 1 then we know that we should Track().
class ClientTracking {
public:
// Sets to true when CLIENT TRACKING is ON
void SetClientTracking(bool is_on) {
tracking_enabled_ = is_on;
}

// Increment current sequence number
void IncrementSequenceNumber() {
++seq_num_;
}

// Set if OPTIN subcommand is used in CLIENT TRACKING
romange marked this conversation as resolved.
Show resolved Hide resolved
void SetOptin(bool optin) {
optin_ = optin;
}

// Check if the keys should be tracked. Result adheres to the state machine described above.
bool ShouldTrackKeys() const;

// Check only if CLIENT TRACKING is ON
bool IsTrackingOn() const {
return tracking_enabled_;
}

// Iterates over the active shards of the transaction. If a key satisfies
// the tracking requirements, is is set for tracking.
void Track(ConnectionContext* cntx, const CommandId* cid);

// Called by CLIENT CACHING YES and caches the current seq_num_
void SetCachingSequenceNumber(bool is_multi) {
// We need -1 when we are in multi
caching_seq_num_ = is_multi && seq_num_ != 0 ? seq_num_ - 1 : seq_num_;
}

void ResetCachingSequenceNumber() {
caching_seq_num_ = 0;
}

private:
// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ = false;
bool optin_ = false;
// sequence number
size_t seq_num_ = 0;
size_t caching_seq_num_ = 0;
};

public:
DbIndex db_index = 0;

Expand All @@ -161,14 +246,14 @@ struct ConnectionState {
std::optional<SquashingInfo> squashing_info;
std::unique_ptr<ScriptInfo> script_info;
std::unique_ptr<SubscribeInfo> subscribe_info;
ClientTracking tracking_info_;
};

class ConnectionContext : public facade::ConnectionContext {
public:
ConnectionContext(::io::Sink* stream, facade::Connection* owner);

ConnectionContext(const ConnectionContext* owner, Transaction* tx,
facade::CapturingReplyBuilder* crb);
ConnectionContext(ConnectionContext* owner, Transaction* tx, facade::CapturingReplyBuilder* crb);

struct DebugInfo {
uint32_t shards_count = 0;
Expand All @@ -183,6 +268,8 @@ class ConnectionContext : public facade::ConnectionContext {
// TODO: to introduce proper accessors.
Transaction* transaction = nullptr;
const CommandId* cid = nullptr;
ConnectionContext* parent_cntx_ = nullptr;
romange marked this conversation as resolved.
Show resolved Hide resolved

ConnectionState conn_state;

DbIndex db_index() const {
Expand Down
37 changes: 20 additions & 17 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1391,24 +1391,27 @@ void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
return;

auto it = client_tracking_map_.find(key);
if (it != client_tracking_map_.end()) {
// notify all the clients.
auto& client_set = it->second;
auto cb = [key, client_set = std::move(client_set)](unsigned idx, util::ProactorBase*) {
for (auto it = client_set.begin(); it != client_set.end(); ++it) {
if ((unsigned int)it->Thread() != idx)
continue;
facade::Connection* conn = it->Get();
if ((conn != nullptr) && conn->IsTrackingOn()) {
std::string key_str = {key.begin(), key.end()};
conn->SendInvalidationMessageAsync({key_str});
}
}
};
shard_set->pool()->DispatchBrief(std::move(cb));
// remove this key from the tracking table as the key no longer exists
client_tracking_map_.erase(key);
if (it == client_tracking_map_.end()) {
return;
}
auto& client_set = it->second;
romange marked this conversation as resolved.
Show resolved Hide resolved
// notify all the clients.
auto cb = [key = std::string(key), client_set = std::move(client_set)](unsigned idx,
romange marked this conversation as resolved.
Show resolved Hide resolved
util::ProactorBase*) {
for (auto& client : client_set) {
if (client.IsExpired() || (client.Thread() != idx)) {
continue;
}
auto* conn = client.Get();
auto* cntx = static_cast<ConnectionContext*>(conn->cntx());
if (cntx && cntx->conn_state.tracking_info_.IsTrackingOn()) {
conn->SendInvalidationMessageAsync({key});
}
}
};
shard_set->pool()->DispatchBrief(std::move(cb));
// remove this key from the tracking table as the key no longer exists
client_tracking_map_.erase(key);
}

void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) {
Expand Down