Skip to content

Commit

Permalink
Merge pull request #11397 from rabbitmq/direct-reply-to
Browse files Browse the repository at this point in the history
4.x: add tests for AMQP 0.9.1 direct reply to feature
  • Loading branch information
michaelklishin committed Jun 7, 2024
2 parents d40110b + d806d69 commit c592405
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 4 deletions.
4 changes: 4 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,10 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "amqpl_direct_reply_to_SUITE",
)

assert_suites()

filegroup(
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2120,3 +2120,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "amqpl_direct_reply_to_SUITE_beam_files",
testonly = True,
srcs = ["test/amqpl_direct_reply_to_SUITE.erl"],
outs = ["test/amqpl_direct_reply_to_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
7 changes: 3 additions & 4 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
-behaviour(gen_server2).

-export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver_reply/2]).
-export([send_command/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
emit_info_all/4, info_local/1]).
-export([refresh_config_local/0, ready_for_close/1]).
Expand Down Expand Up @@ -158,7 +158,7 @@
%% rejected but are yet to be sent to the client
rejected,
%% used by "one shot RPC" (amq.
reply_consumer,
reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()},
delivery_flow, %% Deprecated since removal of CMQ in 4.0
interceptor_state,
queue_states,
Expand Down Expand Up @@ -1210,8 +1210,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Message = rabbit_message_interceptor:intercept(Message0),
check_user_id_header(Message, User),
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
[rabbit_channel:deliver_reply(RK, Message) ||
{virtual_reply_queue, RK} <- QNames],
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],
Queues = rabbit_amqqueue:lookup_many(QNames),
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
Username, TraceState),
Expand Down
124 changes: 124 additions & 0 deletions deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(amqpl_direct_reply_to_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-compile([nowarn_export_all,
export_all]).

all() ->
[
{group, cluster_size_3}
].

groups() ->
[
{cluster_size_3, [shuffle],
[
rpc_new_to_old_node,
rpc_old_to_new_node
]}
].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config.

end_per_suite(Config) ->
Config.

init_per_group(_Group, Config) ->
Nodes = 3,
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodes_count, Nodes},
{rmq_nodename_suffix, Suffix}]),
rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_teardown_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
rpc_new_to_old_node(Config) ->
rpc(0, 1, Config).

rpc_old_to_new_node(Config) ->
rpc(1, 0, Config).

rpc(RequesterNode, ResponderNode, Config) ->
RequestQueue = <<"my request queue">>,
%% This is the pseudo queue that is specially interpreted by RabbitMQ.
ReplyQueue = <<"amq.rabbitmq.reply-to">>,
RequestPayload = <<"my request">>,
ReplyPayload = <<"my reply">>,
CorrelationId = <<"my correlation ID">>,
RequesterCh = rabbit_ct_client_helpers:open_channel(Config, RequesterNode),
ResponderCh = rabbit_ct_client_helpers:open_channel(Config, ResponderNode),

%% There is no need to declare this pseudo queue first.
amqp_channel:subscribe(RequesterCh,
#'basic.consume'{queue = ReplyQueue,
no_ack = true},
self()),
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
end,
#'queue.declare_ok'{} = amqp_channel:call(
RequesterCh,
#'queue.declare'{queue = RequestQueue}),
#'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}),
amqp_channel:register_confirm_handler(RequesterCh, self()),
%% Send the request.
amqp_channel:cast(
RequesterCh,
#'basic.publish'{routing_key = RequestQueue},
#amqp_msg{props = #'P_basic'{reply_to = ReplyQueue,
correlation_id = CorrelationId},
payload = RequestPayload}),
receive #'basic.ack'{} -> ok
after 5000 -> ct:fail(confirm_timeout)
end,

%% Receive the request.
{#'basic.get_ok'{},
#amqp_msg{props = #'P_basic'{reply_to = ReplyTo,
correlation_id = CorrelationId},
payload = RequestPayload}
} = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}),
%% Send the reply.
amqp_channel:cast(
ResponderCh,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
payload = ReplyPayload}),

%% Receive the reply.
receive {#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{payload = ReplyPayload,
props = #'P_basic'{correlation_id = CorrelationId}}} ->
ok
after 5000 -> ct:fail(missing_reply)
end.

0 comments on commit c592405

Please sign in to comment.