Skip to content

Commit

Permalink
Have consistent behavior of SPUBLISH within multi/exec like regular c…
Browse files Browse the repository at this point in the history
…ommand (redis#13276)

This PR is based on the commits from PR redis#12944.

Allow SPUBLISH command within multi/exec on replica

Behavior on unstable:

```
127.0.0.1:6380> CLUSTER NODES
39ce8aa20f1f0d91f1a88d976ee1926dfefcdf1a 127.0.0.1:6380@16380 myself,slave 8b0feb120b68aac489d6a5af9c77dc40d71bc792 0 0 0 connected
8b0feb120b68aac489d6a5af9c77dc40d71bc792 127.0.0.1:6379@16379 master - 0 1705091681202 0 connected 0-16383
127.0.0.1:6380> SPUBLISH hello world
(integer) 0
127.0.0.1:6380> MULTI
OK
127.0.0.1:6380(TX)> SPUBLISH hello world
QUEUED
127.0.0.1:6380(TX)> EXEC
(error) MOVED 866 127.0.0.1:6379
```

With this change:

```
127.0.0.1:6380> SPUBLISH hello world
(integer) 0
127.0.0.1:6380> MULTI
OK
127.0.0.1:6380(TX)> SPUBLISH hello world
QUEUED
127.0.0.1:6380(TX)> EXEC
1) (integer) 0
```

---------

Co-authored-by: Harkrishn Patro <[email protected]>
Co-authored-by: oranagra <[email protected]>
  • Loading branch information
3 people authored and filipecosta90 committed May 28, 2024
1 parent cfae246 commit 05da067
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 7 deletions.
20 changes: 13 additions & 7 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/

/*
Expand Down Expand Up @@ -951,6 +953,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
existing_keys = 0;
int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */

/* Allow any key to be set if a module disabled cluster redirections. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
Expand Down Expand Up @@ -982,10 +985,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
mc.cmd = cmd;
}

int is_pubsubshard = cmd->proc == ssubscribeCommand ||
cmd->proc == sunsubscribeCommand ||
cmd->proc == spublishCommand;

/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
for (i = 0; i < ms->count; i++) {
Expand All @@ -998,6 +997,13 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
if (!pubsubshard_included &&
doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE))
{
pubsubshard_included = 1;
}

getKeysResult result = GETKEYS_RESULT_INIT;
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
keyindex = result.keys;
Expand Down Expand Up @@ -1061,7 +1067,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* node until the migration completes with CLUSTER SETSLOT <slot>
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !is_pubsubshard)
if ((migrating_slot || importing_slot) && !pubsubshard_included)
{
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
else existing_keys++;
Expand All @@ -1077,7 +1083,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
/* Cluster is globally down but we got keys? We only serve the request
* if it is a read command and when allow_reads_when_down is enabled. */
if (!isClusterHealthy()) {
if (is_pubsubshard) {
if (pubsubshard_included) {
if (!server.cluster_allow_pubsubshard_when_down) {
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
Expand Down Expand Up @@ -1140,7 +1146,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* is serving, we can reply without redirection. */
int is_write_command = (cmd_flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
!is_write_command &&
clusterNodeIsSlave(myself) &&
clusterNodeGetSlaveof(myself) == n)
Expand Down
1 change: 1 addition & 0 deletions tests/test_helper.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ set ::all_tests {
unit/cluster/links
unit/cluster/cluster-response-tls
unit/cluster/failure-marking
unit/cluster/sharded-pubsub
}
# Index to the next test to run in the ::all_tests list.
set ::next_test 0
Expand Down
66 changes: 66 additions & 0 deletions tests/unit/cluster/sharded-pubsub.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#

start_cluster 1 1 {tags {external:skip cluster}} {
set primary_id 0
set replica1_id 1

set primary [Rn $primary_id]
set replica [Rn $replica1_id]

test "Sharded pubsub publish behavior within multi/exec" {
foreach {node} {primary replica} {
set node [set $node]
$node MULTI
$node SPUBLISH ch1 "hello"
$node EXEC
}
}

test "Sharded pubsub within multi/exec with cross slot operation" {
$primary MULTI
$primary SPUBLISH ch1 "hello"
$primary GET foo
catch {[$primary EXEC]} err
assert_match {CROSSSLOT*} $err
}

test "Sharded pubsub publish behavior within multi/exec with read operation on primary" {
$primary MULTI
$primary SPUBLISH foo "hello"
$primary GET foo
$primary EXEC
} {0 {}}

test "Sharded pubsub publish behavior within multi/exec with read operation on replica" {
$replica MULTI
$replica SPUBLISH foo "hello"
catch {[$replica GET foo]} err
assert_match {MOVED*} $err
catch {[$replica EXEC]} err
assert_match {EXECABORT*} $err
}

test "Sharded pubsub publish behavior within multi/exec with write operation on primary" {
$primary MULTI
$primary SPUBLISH foo "hello"
$primary SET foo bar
$primary EXEC
} {0 OK}

test "Sharded pubsub publish behavior within multi/exec with write operation on replica" {
$replica MULTI
$replica SPUBLISH foo "hello"
catch {[$replica SET foo bar]} err
assert_match {MOVED*} $err
catch {[$replica EXEC]} err
assert_match {EXECABORT*} $err
}
}

0 comments on commit 05da067

Please sign in to comment.