Skip to content

Commit

Permalink
Merge pull request #11398 from rabbitmq/md/khepri/read-only-permissio…
Browse files Browse the repository at this point in the history
…ns-transaction-queries

Khepri: Use read-only transactions to query for user/topic permissions
  • Loading branch information
the-mikedavis committed Jun 7, 2024
2 parents c592405 + d0425e5 commit 5e4a432
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 13 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,7 @@ rabbitmq_integration_suite(

rabbitmq_integration_suite(
name = "cluster_minority_SUITE",
size = "large",
size = "medium",
additional_beam = [
":test_clustering_utils_beam",
],
Expand Down
20 changes: 13 additions & 7 deletions deps/rabbit/src/rabbit_db_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,16 @@ match_user_permissions_in_khepri('_' = _Username, VHostName) ->
VHostName,
fun() ->
match_user_permissions_in_khepri_tx(?KHEPRI_WILDCARD_STAR, VHostName)
end));
end),
ro);
match_user_permissions_in_khepri(Username, '_' = _VHostName) ->
rabbit_khepri:transaction(
with_fun_in_khepri_tx(
Username,
fun() ->
match_user_permissions_in_khepri_tx(Username, ?KHEPRI_WILDCARD_STAR)
end));
end),
ro);
match_user_permissions_in_khepri(Username, VHostName) ->
rabbit_khepri:transaction(
with_fun_in_khepri_tx(
Expand All @@ -390,7 +392,8 @@ match_user_permissions_in_khepri(Username, VHostName) ->
VHostName,
fun() ->
match_user_permissions_in_khepri_tx(Username, VHostName)
end))).
end)),
ro).

match_user_permissions_in_khepri_tx(Username, VHostName) ->
Path = khepri_user_permission_path(Username, VHostName),
Expand Down Expand Up @@ -739,15 +742,16 @@ match_topic_permissions_in_khepri('_' = _Username, '_' = _VHostName, ExchangeNam
fun() ->
match_topic_permissions_in_khepri_tx(
?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR, any(ExchangeName))
end);
end, ro);
match_topic_permissions_in_khepri('_' = _Username, VHostName, ExchangeName) ->
rabbit_khepri:transaction(
rabbit_db_vhost:with_fun_in_khepri_tx(
VHostName,
fun() ->
match_topic_permissions_in_khepri_tx(
?KHEPRI_WILDCARD_STAR, VHostName, any(ExchangeName))
end));
end),
ro);
match_topic_permissions_in_khepri(
Username, '_' = _VHostName, ExchangeName) ->
rabbit_khepri:transaction(
Expand All @@ -756,7 +760,8 @@ match_topic_permissions_in_khepri(
fun() ->
match_topic_permissions_in_khepri_tx(
Username, ?KHEPRI_WILDCARD_STAR, any(ExchangeName))
end));
end),
ro);
match_topic_permissions_in_khepri(
Username, VHostName, ExchangeName) ->
rabbit_khepri:transaction(
Expand All @@ -767,7 +772,8 @@ match_topic_permissions_in_khepri(
fun() ->
match_topic_permissions_in_khepri_tx(
Username, VHostName, any(ExchangeName))
end))).
end)),
ro).

match_topic_permissions_in_khepri_tx(Username, VHostName, ExchangeName) ->
Path = khepri_topic_permission_path(Username, VHostName, ExchangeName),
Expand Down
11 changes: 10 additions & 1 deletion deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,16 @@ transaction(Fun, ReadWrite) ->
transaction(Fun, ReadWrite, #{}).

transaction(Fun, ReadWrite, Options0) ->
Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0),
%% If the transaction is read-only, use the same default options we use
%% for most queries.
DefaultQueryOptions = case ReadWrite of
ro ->
#{favor => low_latency};
_ ->
#{}
end,
Options1 = maps:merge(DefaultQueryOptions, Options0),
Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options1),
case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options) of
ok -> ok;
{ok, Result} -> Result;
Expand Down
22 changes: 18 additions & 4 deletions deps/rabbit/test/cluster_minority_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").

-compile(export_all).
-compile([export_all, nowarn_export_all]).

all() ->
[
Expand All @@ -36,7 +36,8 @@ groups() ->
update_user,
delete_user,
set_policy,
delete_policy
delete_policy,
export_definitions
]},
{cluster_operation_add, [], [add_node]},
{cluster_operation_remove, [], [remove_node]},
Expand Down Expand Up @@ -88,6 +89,14 @@ init_per_group(Group, Config0) when Group == client_operations;
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue">>,
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}),

%% Lower the default Khepri command timeout. By default this is set
%% to 30s in `rabbit_khepri:setup/1' which makes the cases in this
%% group run unnecessarily slow.
[ok = rabbit_ct_broker_helpers:rpc(
Config1, N,
application, set_env,
[khepri, default_timeout, 100]) || N <- lists:seq(0, 4)],

%% Create partition
partition_5_node_cluster(Config1),
Config1
Expand All @@ -104,8 +113,8 @@ init_per_group(Group, Config0) ->

end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps() ++
rabbit_ct_client_helpers:teardown_steps()).
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
Expand Down Expand Up @@ -252,6 +261,11 @@ enable_feature_flag(Config) ->
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
?assertMatch({error, missing_clustered_nodes}, rabbit_ct_broker_helpers:rpc(Config, A, rabbit_feature_flags, enable, [khepri_db])).

export_definitions(Config) ->
Definitions = rabbit_ct_broker_helpers:rpc(
Config, 0,
rabbit_definitions, all_definitions, []),
?assert(is_map(Definitions)).

%% -------------------------------------------------------------------
%% Internal helpers.
Expand Down

0 comments on commit 5e4a432

Please sign in to comment.