diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 6ede252b0c26..9244d93f77fe 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -1262,6 +1262,10 @@ rabbitmq_integration_suite( ], ) +rabbitmq_integration_suite( + name = "amqpl_direct_reply_to_SUITE", +) + assert_suites() filegroup( diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 858625604002..e03eac147855 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -2175,3 +2175,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) + erlang_bytecode( + name = "amqpl_direct_reply_to_SUITE_beam_files", + testonly = True, + srcs = ["test/amqpl_direct_reply_to_SUITE.erl"], + outs = ["test/amqpl_direct_reply_to_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app"], + ) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index eedec82451a5..6f780c9cb9da 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -45,7 +45,7 @@ -behaviour(gen_server2). -export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver_reply/2]). +-export([send_command/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, emit_info_all/4, info_local/1]). -export([refresh_config_local/0, ready_for_close/1]). @@ -167,7 +167,7 @@ %% rejected but are yet to be sent to the client rejected, %% used by "one shot RPC" (amq. - reply_consumer, + reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()}, %% flow | noflow, see rabbitmq-server#114 delivery_flow, interceptor_state, @@ -306,7 +306,14 @@ send_command(Pid, Msg) -> -spec deliver_reply(binary(), mc:state()) -> 'ok'. -deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) -> +deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message0) -> + Message = case rabbit_feature_flags:is_enabled(message_containers) of + true -> + Message0; + false -> + %% 3.12 nodes expect a #delivery{} + #delivery{message = Message0} + end, case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin, rabbit_nodes:all_running_with_hashes()) of {ok, Pid, Key} -> @@ -330,8 +337,11 @@ deliver_reply_v1(EncodedBin, Message) -> %% We want to ensure people can't use this mechanism to send a message %% to an arbitrary process and kill it! --spec deliver_reply_local(pid(), binary(), mc:state()) -> 'ok'. - +-spec deliver_reply_local(pid(), binary(), mc:state() | rabbit_types:delivery()) -> ok. +deliver_reply_local(Pid, Key, #delivery{message = BasicMsg}) -> + %% Backward compat clause when feature flag message_containers is disabled. + %% 3.12 nodes send us a #delivery{}. + deliver_reply_local(Pid, Key, BasicMsg); deliver_reply_local(Pid, Key, Message) -> case pg_local:in_group(rabbit_channels, Pid) of true -> gen_server2:cast(Pid, {deliver_reply, Key, Message}); @@ -1269,8 +1279,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {ok, Message0} -> Message = rabbit_message_interceptor:intercept(Message0), QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), - [rabbit_channel:deliver_reply(RK, Message) || - {virtual_reply_queue, RK} <- QNames], + [deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames], Queues = rabbit_amqqueue:lookup_many(QNames), rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum, Username, TraceState), diff --git a/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl b/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl new file mode 100644 index 000000000000..38cae6ca37f4 --- /dev/null +++ b/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl @@ -0,0 +1,124 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(amqpl_direct_reply_to_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile([nowarn_export_all, + export_all]). + +all() -> + [ + {group, cluster_size_3} + ]. + +groups() -> + [ + {cluster_size_3, [shuffle], + [ + rpc_new_to_old_node, + rpc_old_to_new_node + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(_Group, Config) -> + Nodes = 3, + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodes_count, Nodes}, + {rmq_nodename_suffix, Suffix}]), + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests. +rpc_new_to_old_node(Config) -> + rpc(0, 1, Config). + +rpc_old_to_new_node(Config) -> + rpc(1, 0, Config). + +rpc(RequesterNode, ResponderNode, Config) -> + RequestQueue = <<"my request queue">>, + %% This is the pseudo queue that is specially interpreted by RabbitMQ. + ReplyQueue = <<"amq.rabbitmq.reply-to">>, + RequestPayload = <<"my request">>, + ReplyPayload = <<"my reply">>, + CorrelationId = <<"my correlation ID">>, + RequesterCh = rabbit_ct_client_helpers:open_channel(Config, RequesterNode), + ResponderCh = rabbit_ct_client_helpers:open_channel(Config, ResponderNode), + + %% There is no need to declare this pseudo queue first. + amqp_channel:subscribe(RequesterCh, + #'basic.consume'{queue = ReplyQueue, + no_ack = true}, + self()), + CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0 + end, + #'queue.declare_ok'{} = amqp_channel:call( + RequesterCh, + #'queue.declare'{queue = RequestQueue}), + #'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}), + amqp_channel:register_confirm_handler(RequesterCh, self()), + %% Send the request. + amqp_channel:cast( + RequesterCh, + #'basic.publish'{routing_key = RequestQueue}, + #amqp_msg{props = #'P_basic'{reply_to = ReplyQueue, + correlation_id = CorrelationId}, + payload = RequestPayload}), + receive #'basic.ack'{} -> ok + after 5000 -> ct:fail(confirm_timeout) + end, + + %% Receive the request. + {#'basic.get_ok'{}, + #amqp_msg{props = #'P_basic'{reply_to = ReplyTo, + correlation_id = CorrelationId}, + payload = RequestPayload} + } = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}), + %% Send the reply. + amqp_channel:cast( + ResponderCh, + #'basic.publish'{routing_key = ReplyTo}, + #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId}, + payload = ReplyPayload}), + + %% Receive the reply. + receive {#'basic.deliver'{consumer_tag = CTag}, + #amqp_msg{payload = ReplyPayload, + props = #'P_basic'{correlation_id = CorrelationId}}} -> + ok + after 5000 -> ct:fail(missing_reply) + end.