Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have consistent behavior of SPUBLISH within multi/exec like regular command #13276

Merged
merged 8 commits into from
May 21, 2024
17 changes: 10 additions & 7 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/

/*
Expand Down Expand Up @@ -1009,9 +1011,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
mc.cmd = cmd;
}

int is_pubsubshard = cmd->proc == ssubscribeCommand ||
cmd->proc == sunsubscribeCommand ||
cmd->proc == spublishCommand;
uint64_t cmd_flags = getCommandFlags(c);

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
int pubsubshard_included = (cmd_flags & CMD_PUBSUB) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this also affects the PUBSUB command, including PUBSUB HELP.

right below this line, we have a loop iterating on each command (handling their keys), why can't we move the previous check we had (or better yet use doesCommandHaveChannelsWithFlags) into the loop, and then set a flag named pubsubshard_included to be used in the code below the loop.?

Copy link
Collaborator Author

@sundb sundb May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PUBSUB command doesn't enter getNodeByQuery().
pubsub_cmd->flags&CMD_MOVABLE_KEYS == 0 and pubsub_cmd->key_specs_num == 0

    !(!(c->cmd->flags&CMD_MOVABLE_KEYS) && c->cmd->key_specs_num == 0 &&
          c->cmd->proc != execCommand))
    {
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &c->slot,&error_code);
       ....
    }


/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
Expand Down Expand Up @@ -1088,7 +1092,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* node until the migration completes with CLUSTER SETSLOT <slot>
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !is_pubsubshard)
if ((migrating_slot || importing_slot) && !pubsubshard_included)
{
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
else existing_keys++;
Expand All @@ -1101,11 +1105,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* without redirections or errors in all the cases. */
if (n == NULL) return myself;

uint64_t cmd_flags = getCommandFlags(c);
/* Cluster is globally down but we got keys? We only serve the request
* if it is a read command and when allow_reads_when_down is enabled. */
if (!isClusterHealthy()) {
if (is_pubsubshard) {
if (pubsubshard_included) {
if (!server.cluster_allow_pubsubshard_when_down) {
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
Expand Down Expand Up @@ -1168,7 +1171,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* is serving, we can reply without redirection. */
int is_write_command = (cmd_flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
!is_write_command &&
clusterNodeIsSlave(myself) &&
clusterNodeGetSlaveof(myself) == n)
Expand Down
1 change: 1 addition & 0 deletions tests/test_helper.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ set ::all_tests {
unit/cluster/links
unit/cluster/cluster-response-tls
unit/cluster/failure-marking
unit/cluster/sharded-pubsub
}
# Index to the next test to run in the ::all_tests list.
set ::next_test 0
Expand Down
66 changes: 66 additions & 0 deletions tests/unit/cluster/sharded-pubsub.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#

start_cluster 1 1 {tags {external:skip cluster}} {
set primary_id 0
set replica1_id 1

set primary [Rn $primary_id]
set replica [Rn $replica1_id]

test "Sharded pubsub publish behavior within multi/exec" {
foreach {node} {primary replica} {
set node [set $node]
$node MULTI
$node SPUBLISH ch1 "hello"
$node EXEC
}
}

test "Sharded pubsub within multi/exec with cross slot operation" {
$primary MULTI
$primary SPUBLISH ch1 "hello"
$primary GET foo
catch {[$primary EXEC]} err
assert_match {CROSSSLOT*} $err
}

test "Sharded pubsub publish behavior within multi/exec with read operation on primary" {
$primary MULTI
$primary SPUBLISH foo "hello"
$primary GET foo
$primary EXEC
} {0 {}}

test "Sharded pubsub publish behavior within multi/exec with read operation on replica" {
$replica MULTI
$replica SPUBLISH foo "hello"
catch {[$replica GET foo]} err
assert_match {MOVED*} $err
catch {[$replica EXEC]} err
assert_match {EXECABORT*} $err
}

test "Sharded pubsub publish behavior within multi/exec with write operation on primary" {
$primary MULTI
$primary SPUBLISH foo "hello"
$primary SET foo bar
$primary EXEC
} {0 OK}

test "Sharded pubsub publish behavior within multi/exec with write operation on replica" {
$replica MULTI
$replica SPUBLISH foo "hello"
catch {[$replica SET foo bar]} err
assert_match {MOVED*} $err
catch {[$replica EXEC]} err
assert_match {EXECABORT*} $err
}
}