diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 9244d93f77fe..ba4cf2fe20f8 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -1256,7 +1256,7 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "cluster_minority_SUITE", - size = "large", + size = "medium", additional_beam = [ ":test_clustering_utils_beam", ], diff --git a/deps/rabbit/src/rabbit_db_user.erl b/deps/rabbit/src/rabbit_db_user.erl index ba629a5c71ea..c19084f74ad4 100644 --- a/deps/rabbit/src/rabbit_db_user.erl +++ b/deps/rabbit/src/rabbit_db_user.erl @@ -374,14 +374,16 @@ match_user_permissions_in_khepri('_' = _Username, VHostName) -> VHostName, fun() -> match_user_permissions_in_khepri_tx(?KHEPRI_WILDCARD_STAR, VHostName) - end)); + end), + ro); match_user_permissions_in_khepri(Username, '_' = _VHostName) -> rabbit_khepri:transaction( with_fun_in_khepri_tx( Username, fun() -> match_user_permissions_in_khepri_tx(Username, ?KHEPRI_WILDCARD_STAR) - end)); + end), + ro); match_user_permissions_in_khepri(Username, VHostName) -> rabbit_khepri:transaction( with_fun_in_khepri_tx( @@ -390,7 +392,8 @@ match_user_permissions_in_khepri(Username, VHostName) -> VHostName, fun() -> match_user_permissions_in_khepri_tx(Username, VHostName) - end))). + end)), + ro). match_user_permissions_in_khepri_tx(Username, VHostName) -> Path = khepri_user_permission_path(Username, VHostName), @@ -739,7 +742,7 @@ match_topic_permissions_in_khepri('_' = _Username, '_' = _VHostName, ExchangeNam fun() -> match_topic_permissions_in_khepri_tx( ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR, any(ExchangeName)) - end); + end, ro); match_topic_permissions_in_khepri('_' = _Username, VHostName, ExchangeName) -> rabbit_khepri:transaction( rabbit_db_vhost:with_fun_in_khepri_tx( @@ -747,7 +750,8 @@ match_topic_permissions_in_khepri('_' = _Username, VHostName, ExchangeName) -> fun() -> match_topic_permissions_in_khepri_tx( ?KHEPRI_WILDCARD_STAR, VHostName, any(ExchangeName)) - end)); + end), + ro); match_topic_permissions_in_khepri( Username, '_' = _VHostName, ExchangeName) -> rabbit_khepri:transaction( @@ -756,7 +760,8 @@ match_topic_permissions_in_khepri( fun() -> match_topic_permissions_in_khepri_tx( Username, ?KHEPRI_WILDCARD_STAR, any(ExchangeName)) - end)); + end), + ro); match_topic_permissions_in_khepri( Username, VHostName, ExchangeName) -> rabbit_khepri:transaction( @@ -767,7 +772,8 @@ match_topic_permissions_in_khepri( fun() -> match_topic_permissions_in_khepri_tx( Username, VHostName, any(ExchangeName)) - end))). + end)), + ro). match_topic_permissions_in_khepri_tx(Username, VHostName, ExchangeName) -> Path = khepri_topic_permission_path(Username, VHostName, ExchangeName), diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 1e617f968d43..05f517f30875 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -974,7 +974,16 @@ transaction(Fun, ReadWrite) -> transaction(Fun, ReadWrite, #{}). transaction(Fun, ReadWrite, Options0) -> - Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0), + %% If the transaction is read-only, use the same default options we use + %% for most queries. + DefaultQueryOptions = case ReadWrite of + ro -> + #{favor => low_latency}; + _ -> + #{} + end, + Options1 = maps:merge(DefaultQueryOptions, Options0), + Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options1), case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options) of ok -> ok; {ok, Result} -> Result; diff --git a/deps/rabbit/test/cluster_minority_SUITE.erl b/deps/rabbit/test/cluster_minority_SUITE.erl index 352199f0afa3..558635d1b893 100644 --- a/deps/rabbit/test/cluster_minority_SUITE.erl +++ b/deps/rabbit/test/cluster_minority_SUITE.erl @@ -11,7 +11,7 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). --compile(export_all). +-compile([export_all, nowarn_export_all]). all() -> [ @@ -37,7 +37,8 @@ groups() -> update_user, delete_user, set_policy, - delete_policy + delete_policy, + export_definitions ]}, {cluster_operation_add, [], [add_node]}, {cluster_operation_remove, [], [remove_node]}, @@ -89,6 +90,14 @@ init_per_group(Group, Config0) when Group == client_operations; #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue">>, arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}), + %% Lower the default Khepri command timeout. By default this is set + %% to 30s in `rabbit_khepri:setup/1' which makes the cases in this + %% group run unnecessarily slow. + [ok = rabbit_ct_broker_helpers:rpc( + Config1, N, + application, set_env, + [khepri, default_timeout, 100]) || N <- lists:seq(0, 4)], + %% Create partition partition_5_node_cluster(Config1), Config1 @@ -105,8 +114,8 @@ init_per_group(Group, Config0) -> end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, - rabbit_ct_broker_helpers:teardown_steps() ++ - rabbit_ct_client_helpers:teardown_steps()). + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). @@ -253,6 +262,11 @@ enable_feature_flag(Config) -> [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), ?assertMatch({error, missing_clustered_nodes}, rabbit_ct_broker_helpers:rpc(Config, A, rabbit_feature_flags, enable, [khepri_db])). +export_definitions(Config) -> + Definitions = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_definitions, all_definitions, []), + ?assert(is_map(Definitions)). %% ------------------------------------------------------------------- %% Internal helpers.