diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 698a5201e144..bf09ac25664b 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -169,7 +169,6 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_mirror_queue_slave.erl", "src/rabbit_mirror_queue_sync.erl", "src/rabbit_mnesia.erl", - "src/rabbit_mnesia_to_khepri_record_converter.erl", "src/rabbit_msg_file.erl", "src/rabbit_msg_record.erl", "src/rabbit_msg_store.erl", @@ -433,7 +432,6 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_mirror_queue_slave.erl", "src/rabbit_mirror_queue_sync.erl", "src/rabbit_mnesia.erl", - "src/rabbit_mnesia_to_khepri_record_converter.erl", "src/rabbit_msg_file.erl", "src/rabbit_msg_record.erl", "src/rabbit_msg_store.erl", @@ -543,7 +541,6 @@ def all_srcs(name = "all_srcs"): "include/gm_specs.hrl", "include/internal_user.hrl", "include/mc.hrl", - "include/mirrored_supervisor.hrl", "include/rabbit_global_counters.hrl", "include/vhost.hrl", "include/vhost_v2.hrl", @@ -557,6 +554,7 @@ def all_srcs(name = "all_srcs"): filegroup( name = "private_hdrs", srcs = [ + "src/mirrored_supervisor.hrl", "src/rabbit_feature_flags.hrl", "src/rabbit_fifo.hrl", "src/rabbit_fifo_dlx.hrl", @@ -715,7 +713,6 @@ def all_srcs(name = "all_srcs"): "src/rabbit_mirror_queue_slave.erl", "src/rabbit_mirror_queue_sync.erl", "src/rabbit_mnesia.erl", - "src/rabbit_mnesia_to_khepri_record_converter.erl", "src/rabbit_msg_file.erl", "src/rabbit_msg_record.erl", "src/rabbit_msg_store.erl", diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index 8b8f065a5812..1aa33413cbf5 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -137,7 +137,11 @@ -type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). -type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. --type group_name() :: any(). +-type group_name() :: module(). +-type child_id() :: term(). %% supervisor:child_id() is not exported. + +-export_type([group_name/0, + child_id/0]). -spec start_link(GroupName, Module, Args) -> startlink_ret() when GroupName :: group_name(), diff --git a/deps/rabbit/include/mirrored_supervisor.hrl b/deps/rabbit/src/mirrored_supervisor.hrl similarity index 100% rename from deps/rabbit/include/mirrored_supervisor.hrl rename to deps/rabbit/src/mirrored_supervisor.hrl diff --git a/deps/rabbit/src/rabbit_db_m2k_converter.erl b/deps/rabbit/src/rabbit_db_m2k_converter.erl index 87953da1a249..570a41d4d5b9 100644 --- a/deps/rabbit/src/rabbit_db_m2k_converter.erl +++ b/deps/rabbit/src/rabbit_db_m2k_converter.erl @@ -15,8 +15,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). %% Functions for `rabbit_db_*_m2k_converter' modules to call. --export([with_correlation_id/2, - get_sub_state/2]). +-export([with_correlation_id/2]). %% `mnesia_to_khepri_converter' callbacks. -export([init_copy_to_khepri/4, @@ -69,14 +68,6 @@ with_correlation_id( run_async_fun(Fun, State0) end. --spec get_sub_state(Module, State) -> Ret when - Module :: module(), - State :: state(), - Ret :: any(). - -get_sub_state(Module, #?MODULE{sub_states = SubStates}) -> - maps:get(Module, SubStates). - %% `mnesia_to_khepri_converter' callbacks -spec init_copy_to_khepri(StoreId, MigrationId, Tables, Migrations) -> diff --git a/deps/rabbit/src/rabbit_db_msup.erl b/deps/rabbit/src/rabbit_db_msup.erl index 5ab135dbc892..3939efa6ae60 100644 --- a/deps/rabbit/src/rabbit_db_msup.erl +++ b/deps/rabbit/src/rabbit_db_msup.erl @@ -73,11 +73,11 @@ table_definitions() -> %% ------------------------------------------------------------------- -spec create_or_update(Group, Overall, Delegate, ChildSpec, Id) -> Ret when - Group :: any(), + Group :: mirrored_supervisor:group_name(), Overall :: pid(), Delegate :: pid() | undefined, ChildSpec :: supervisor2:child_spec(), - Id :: {any(), any()}, + Id :: mirrored_supervisor:child_id(), Ret :: start | undefined | pid(). create_or_update(Group, Overall, Delegate, ChildSpec, Id) -> @@ -129,8 +129,8 @@ write_in_mnesia(Group, Overall, ChildSpec, Id) -> ok = mnesia:write(?TABLE, S, write), ChildSpec. -create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} = Id) -> - Path = khepri_mirrored_supervisor_path(Group, SimpleId), +create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) -> + Path = khepri_mirrored_supervisor_path(Group, Id), S = #mirrored_sup_childspec{key = {Group, Id}, mirroring_pid = Overall, childspec = ChildSpec}, @@ -169,8 +169,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} = %% ------------------------------------------------------------------- -spec delete(Group, Id) -> ok when - Group :: any(), - Id :: any(). + Group :: mirrored_supervisor:group_name(), + Id :: mirrored_supervisor:child_id(). delete(Group, Id) -> rabbit_khepri:handle_fallback( @@ -184,16 +184,16 @@ delete_in_mnesia(Group, Id) -> ok = mnesia:delete({?TABLE, {Group, Id}}) end). -delete_in_khepri(Group, {SimpleId, _}) -> - ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, SimpleId)). +delete_in_khepri(Group, Id) -> + ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, Id)). %% ------------------------------------------------------------------- %% find_mirror(). %% ------------------------------------------------------------------- -spec find_mirror(Group, Id) -> Ret when - Group :: any(), - Id :: any(), + Group :: mirrored_supervisor:group_name(), + Id :: mirrored_supervisor:child_id(), Ret :: {ok, pid()} | {error, not_found}. find_mirror(Group, Id) -> @@ -214,8 +214,8 @@ find_mirror_in_mnesia(Group, Id) -> _ -> {error, not_found} end. -find_mirror_in_khepri(Group, {SimpleId, _}) -> - case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, SimpleId)) of +find_mirror_in_khepri(Group, Id) -> + case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, Id)) of {ok, #mirrored_sup_childspec{mirroring_pid = Pid}} -> {ok, Pid}; _ -> @@ -269,7 +269,7 @@ update_all_in_khepri(Overall, OldOverall) -> %% ------------------------------------------------------------------- -spec delete_all(Group) -> ok when - Group :: any(). + Group :: mirrored_supervisor:group_name(). delete_all(Group) -> rabbit_khepri:handle_fallback( @@ -324,5 +324,9 @@ clear_in_khepri() -> khepri_mirrored_supervisor_path() -> [?MODULE, mirrored_supervisor_childspec]. +khepri_mirrored_supervisor_path(Group, Id) + when is_atom(Id) orelse is_binary(Id) -> + [?MODULE, mirrored_supervisor_childspec, Group, Id]; khepri_mirrored_supervisor_path(Group, Id) -> - [?MODULE, mirrored_supervisor_childspec, Group] ++ Id. + IdPath = Group:id_to_khepri_path(Id), + [?MODULE, mirrored_supervisor_childspec, Group] ++ IdPath. diff --git a/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl b/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl index 964d7e74db5c..e57b030d7768 100644 --- a/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl +++ b/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl @@ -2,7 +2,7 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2022-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbit_db_msup_m2k_converter). @@ -19,7 +19,7 @@ copy_to_khepri/3, delete_from_khepri/3]). --record(?MODULE, {record_converters :: [module()]}). +-record(?MODULE, {}). -spec init_copy_to_khepri(StoreId, MigrationId, Tables) -> Ret when StoreId :: khepri:store_id(), @@ -33,8 +33,7 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) -> %% Clean up any previous attempt to copy the Mnesia table to Khepri. lists:foreach(fun clear_data_in_khepri/1, Tables), - Converters = discover_converters(?MODULE), - SubState = #?MODULE{record_converters = Converters}, + SubState = #?MODULE{}, {ok, SubState}. -spec copy_to_khepri(Table, Record, State) -> Ret when @@ -47,17 +46,13 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) -> %% @private copy_to_khepri(mirrored_sup_childspec = Table, - #mirrored_sup_childspec{} = Record0, + #mirrored_sup_childspec{key = {Group, Id} = Key} = Record, State) -> - #?MODULE{record_converters = Converters} = - rabbit_db_m2k_converter:get_sub_state(?MODULE, State), - Record = upgrade_record(Converters, Table, Record0), - #mirrored_sup_childspec{key = {Group, {SimpleId, _}} = Key} = Record, ?LOG_DEBUG( "Mnesia->Khepri data copy: [~0p] key: ~0p", [Table, Key], #{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}), - Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, SimpleId), + Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, Id), rabbit_db_m2k_converter:with_correlation_id( fun(CorrId) -> Extra = #{async => CorrId}, @@ -81,10 +76,8 @@ copy_to_khepri(Table, Record, State) -> Reason :: any(). %% @private -delete_from_khepri(mirrored_sup_childspec = Table, Key0, State) -> - #?MODULE{record_converters = Converters} = - rabbit_db_m2k_converter:get_sub_state(?MODULE, State), - {Group, Id} = Key = upgrade_key(Converters, Table, Key0), +delete_from_khepri( + mirrored_sup_childspec = Table, {Group, Id} = Key, State) -> ?LOG_DEBUG( "Mnesia->Khepri data delete: [~0p] key: ~0p", [Table, Key], @@ -109,39 +102,3 @@ clear_data_in_khepri(mirrored_sup_childspec) -> ok -> ok; Error -> throw(Error) end. - -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter - -discover_converters(MigrationMod) -> - Apps = rabbit_misc:rabbitmq_related_apps(), - AttrsPerApp = rabbit_misc:module_attributes_from_apps( - rabbit_mnesia_records_to_khepri_db, Apps), - discover_converters(MigrationMod, AttrsPerApp, []). - -discover_converters(MigrationMod, [{_App, _AppMod, AppConverters} | Rest], - Converters0) -> - Converters = - lists:foldl(fun({Module, Mod}, Acc) when Module =:= MigrationMod -> - [Mod | Acc]; - (_, Acc) -> - Acc - end, Converters0, AppConverters), - discover_converters(MigrationMod, Rest, Converters); -discover_converters(_MigrationMod, [], Converters) -> - Converters. - -upgrade_record(Converters, Table, Record) -> - lists:foldl(fun(Mod, Record0) -> - Mod:upgrade_record(Table, Record0) - end, Record, Converters). - -upgrade_key(Converters, Table, Key) -> - lists:foldl(fun(Mod, Key0) -> - Mod:upgrade_key(Table, Key0) - end, Key, Converters). diff --git a/deps/rabbit/src/rabbit_mnesia_to_khepri_record_converter.erl b/deps/rabbit/src/rabbit_mnesia_to_khepri_record_converter.erl deleted file mode 100644 index a3853f31706a..000000000000 --- a/deps/rabbit/src/rabbit_mnesia_to_khepri_record_converter.erl +++ /dev/null @@ -1,24 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(rabbit_mnesia_to_khepri_record_converter). - -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter - --callback upgrade_record(Table, Record) -> Record when - Table :: mnesia_to_khepri:mnesia_table(), - Record :: tuple(). - --callback upgrade_key(Table, Key) -> Key when - Table :: mnesia_to_khepri:mnesia_table(), - Key :: any(). diff --git a/deps/rabbit/test/mirrored_supervisor_SUITE.erl b/deps/rabbit/test/mirrored_supervisor_SUITE.erl index 07c7a27d2fae..7ce527a684fc 100644 --- a/deps/rabbit/test/mirrored_supervisor_SUITE.erl +++ b/deps/rabbit/test/mirrored_supervisor_SUITE.erl @@ -331,7 +331,7 @@ childspec(Id) -> {id(Id), {?SERVER, start_link, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}. id(Id) -> - {[Id], Id}. + Id. pid_of(Id) -> {received, Pid, ping} = call(Id, ping), diff --git a/deps/rabbit/test/rabbit_db_msup_SUITE.erl b/deps/rabbit/test/rabbit_db_msup_SUITE.erl index 382b7639888c..f4a3f9a7fe00 100644 --- a/deps/rabbit/test/rabbit_db_msup_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_msup_SUITE.erl @@ -87,7 +87,7 @@ create_or_update1(_Config) -> passed. id(Id) -> - {[Id], Id}. + Id. find_mirror(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, find_mirror1, [Config]). diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index 696ee8feb45a..27a9fc4a0190 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -1089,8 +1089,14 @@ stop_rabbitmq_nodes(Config) -> case FindCrashes of true -> %% TODO: Make the ignore list configurable. - IgnoredCrashes = ["** force_vhost_failure"], - find_crashes_in_logs(NodeConfigs, IgnoredCrashes); + IgnoredCrashes0 = ["** force_vhost_failure"], + case rabbit_ct_helpers:get_config(Config, ignored_crashes) of + undefined -> + find_crashes_in_logs(NodeConfigs, IgnoredCrashes0); + IgnoredCrashes1 -> + find_crashes_in_logs( + NodeConfigs, IgnoredCrashes0 ++ IgnoredCrashes1) + end; false -> ok end, @@ -1172,7 +1178,11 @@ capture_gen_server_termination( Ret = re:run(Line, Prefix ++ "( .*|\\*.*|)$", ReOpts), case Ret of {match, [Suffix]} -> - case lists:member(Suffix, IgnoredCrashes) of + Ignore = lists:any( + fun(IgnoredCrash) -> + string:find(Suffix, IgnoredCrash) =/= nomatch + end, IgnoredCrashes), + case Ignore of false -> capture_gen_server_termination( Rest, Prefix, [Line | Acc], Count, IgnoredCrashes); @@ -1259,31 +1269,28 @@ rabbitmqctl(Config, Node, Args, Timeout) -> _ -> CanUseSecondary end, + WithPlugins0 = rabbit_ct_helpers:get_config(Config, + broker_with_plugins), + WithPlugins = case is_list(WithPlugins0) of + true -> lists:nth(I + 1, WithPlugins0); + false -> WithPlugins0 + end, Rabbitmqctl = case UseSecondaryUmbrella of true -> case BazelRunSecCmd of undefined -> - SrcDir = ?config( - secondary_rabbit_srcdir, - Config), - SecDepsDir = ?config( - secondary_erlang_mk_depsdir, - Config), - SecNewScriptsDir = filename:join( - [SecDepsDir, - SrcDir, - "sbin"]), - SecOldScriptsDir = filename:join( - [SecDepsDir, - "rabbit", - "scripts"]), - SecNewScriptsDirExists = filelib:is_dir( - SecNewScriptsDir), - SecScriptsDir = - case SecNewScriptsDirExists of - true -> SecNewScriptsDir; - false -> SecOldScriptsDir - end, + SrcDir = case WithPlugins of + false -> + ?config( + secondary_rabbit_srcdir, + Config); + _ -> + ?config( + secondary_current_srcdir, + Config) + end, + SecScriptsDir = filename:join( + [SrcDir, "sbin"]), rabbit_misc:format( "~ts/rabbitmqctl", [SecScriptsDir]); _ -> diff --git a/deps/rabbitmq_federation/BUILD.bazel b/deps/rabbitmq_federation/BUILD.bazel index 1fb2aad791c2..007b672b0536 100644 --- a/deps/rabbitmq_federation/BUILD.bazel +++ b/deps/rabbitmq_federation/BUILD.bazel @@ -96,7 +96,7 @@ rabbitmq_integration_suite( "test/rabbit_federation_test_util.beam", ], flaky = True, - shard_count = 2, + shard_count = 3, ) rabbitmq_integration_suite( diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl index d615aad0da08..63011ccd4359 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl @@ -8,9 +8,7 @@ -module(rabbit_federation_exchange_link_sup_sup). -behaviour(mirrored_supervisor). --behaviour(rabbit_mnesia_to_khepri_record_converter). --include_lib("rabbit/include/mirrored_supervisor.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -define(SUPERVISOR, ?MODULE). @@ -19,38 +17,7 @@ -export([start_link/0, start_child/1, adjust/1, stop_child/1]). -export([init/1]). - -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter - --rabbit_mnesia_records_to_khepri_db( - [ - {rabbit_db_msup_m2k_converter, ?MODULE} - ]). - --export([upgrade_record/2, upgrade_key/2]). - --spec upgrade_record(Table, Record) -> Record when - Table :: mnesia_to_khepri:mnesia_table(), - Record :: tuple(). -upgrade_record(mirrored_sup_childspec, - #mirrored_sup_childspec{key = {?MODULE, #exchange{} = Exchange}} = Record) -> - Record#mirrored_sup_childspec{key = {?MODULE, id(Exchange)}}; -upgrade_record(_Table, Record) -> - Record. - --spec upgrade_key(Table, Key) -> Key when - Table :: mnesia_to_khepri:mnesia_table(), - Key :: any(). -upgrade_key(mirrored_sup_childspec, {?MODULE, #exchange{} = Exchange}) -> - {?MODULE, id(Exchange)}; -upgrade_key(_Table, Key) -> - Key. +-export([id_to_khepri_path/1]). %%---------------------------------------------------------------------------- @@ -66,87 +33,42 @@ start_link() -> %% Note that the next supervisor down, rabbit_federation_link_sup, is common %% between exchanges and queues. start_child(X) -> - Result = - case child_exists(X) orelse - mirrored_supervisor:start_child( - ?SUPERVISOR, - {id(X), {rabbit_federation_link_sup, start_link, [X]}, - transient, ?SUPERVISOR_WAIT, supervisor, - [rabbit_federation_link_sup]}) of - true -> - already_started; - {ok, _Pid} -> - ok; - {error, {already_started, _Pid}} -> - already_started; - %% A link returned {stop, gone}, the link_sup shut down, that's OK. - {error, {shutdown, _}} -> - ok - end, - case Result of - ok -> - ok; - already_started -> - #exchange{name = ExchangeName} = X, - rabbit_log_federation:debug("Federation link for exchange ~tp was already started", - [rabbit_misc:rs(ExchangeName)]), - ok + case mirrored_supervisor:start_child( + ?SUPERVISOR, + {id(X), {rabbit_federation_link_sup, start_link, [X]}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_link_sup]}) of + {ok, _Pid} -> ok; + {error, {already_started, _Pid}} -> + #exchange{name = ExchangeName} = X, + rabbit_log_federation:debug("Federation link for exchange ~tp was already started", + [rabbit_misc:rs(ExchangeName)]), + ok; + %% A link returned {stop, gone}, the link_sup shut down, that's OK. + {error, {shutdown, _}} -> ok end. - -child_exists(Name) -> - Id = id(Name), - %% older format, pre-3.13.0 - OldId = old_id(Name), - lists:any(fun ({ChildId, _, _, _}) -> - ChildId =:= Id orelse ChildId =:= OldId - end, - mirrored_supervisor:which_children(?SUPERVISOR)). - adjust({clear_upstream, VHost, UpstreamName}) -> - _ = [rabbit_federation_link_sup:adjust(Pid, exchange_record_from_child_id(Id), {clear_upstream, UpstreamName}) || - {Id, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), - virtual_host_name_from_child_id(Id) =:= VHost - ], + _ = [rabbit_federation_link_sup:adjust(Pid, X, {clear_upstream, UpstreamName}) || + {#exchange{name = Name} = X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), + Name#resource.virtual_host == VHost], ok; adjust(Reason) -> - _ = [rabbit_federation_link_sup:adjust(Pid, exchange_record_from_child_id(Id), Reason) || - {Id, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR) - ], + _ = [rabbit_federation_link_sup:adjust(Pid, X, Reason) || + {X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], ok. stop_child(X) -> - Result = - case stop_and_delete_child(id(X)) of - ok -> ok; - {error, not_found} = Error -> - case rabbit_khepri:is_enabled() of - true -> - %% Old id format is not supported by Khepri and cannot exist there - Error; - false -> - %% try old format, pre-3.13.0 - stop_and_delete_child(old_id(X)) - end - end, - case Result of - ok -> - ok; - {error, Err} -> - #exchange{name = ExchangeName} = X, - rabbit_log_federation:warning( - "Attempt to stop a federation link for exchange ~tp failed: ~tp", - [rabbit_misc:rs(ExchangeName), Err]), - ok - end. - -stop_and_delete_child(Id) -> - case mirrored_supervisor:terminate_child(?SUPERVISOR, Id) of - ok -> - ok = mirrored_supervisor:delete_child(?SUPERVISOR, Id); - {error, not_found} = Error -> - Error - end. + case mirrored_supervisor:terminate_child(?SUPERVISOR, id(X)) of + ok -> ok; + {error, Err} -> + #exchange{name = ExchangeName} = X, + rabbit_log_federation:warning( + "Attempt to stop a federation link for exchange ~tp failed: ~tp", + [rabbit_misc:rs(ExchangeName), Err]), + ok + end, + ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(X)). %%---------------------------------------------------------------------------- @@ -156,24 +78,9 @@ init([]) -> %% See comment in rabbit_federation_queue_link_sup_sup:id/1 id(X = #exchange{policy = Policy}) -> X1 = rabbit_exchange:immutable(X), - {simple_id(X), X1#exchange{policy = Policy}}. + X2 = X1#exchange{policy = Policy}, + X2. -simple_id(#exchange{name = #resource{virtual_host = VHost, name = Name}}) -> +id_to_khepri_path( + #exchange{name = #resource{virtual_host = VHost, name = Name}}) -> [exchange, VHost, Name]. - -%% Old child id format, pre 3.13.0 -old_id(X = #exchange{policy = Policy}) -> - X1 = rabbit_exchange:immutable(X), - X1#exchange{policy = Policy}. - -%% New child id format, introduced in 3.13.0 for Khepri -exchange_record_from_child_id({_, #exchange{} = XR}) -> - XR; -%% Old child id format, pre-3.13.0 -exchange_record_from_child_id(#exchange{} = XR) -> - XR. - -virtual_host_name_from_child_id({_, #exchange{name = Res}}) -> - Res#resource.virtual_host; -virtual_host_name_from_child_id(#exchange{name = Res}) -> - Res#resource.virtual_host. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl index eb22c153dc96..0b7f8adba2bc 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl @@ -8,10 +8,8 @@ -module(rabbit_federation_queue_link_sup_sup). -behaviour(mirrored_supervisor). --behaviour(rabbit_mnesia_to_khepri_record_converter). -include_lib("rabbit_common/include/rabbit.hrl"). --include_lib("rabbit/include/mirrored_supervisor.hrl"). -include_lib("rabbit/include/amqqueue.hrl"). -define(SUPERVISOR, ?MODULE). @@ -20,39 +18,7 @@ -export([start_link/0, start_child/1, adjust/1, stop_child/1]). -export([init/1]). - -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter - --rabbit_mnesia_records_to_khepri_db( - [ - {rabbit_db_msup_m2k_converter, ?MODULE} - ]). - --export([upgrade_record/2, upgrade_key/2]). - --spec upgrade_record(Table, Record) -> Record when - Table :: mnesia_to_khepri:mnesia_table(), - Record :: tuple(). -upgrade_record(mirrored_sup_childspec, - #mirrored_sup_childspec{key = {?MODULE, Q}} = Record) - when ?is_amqqueue(Q) -> - Record#mirrored_sup_childspec{key = {?MODULE, id(Q)}}; -upgrade_record(_Table, Record) -> - Record. - --spec upgrade_key(Table, Key) -> Key when - Table :: mnesia_to_khepri:mnesia_table(), - Key :: any(). -upgrade_key(mirrored_sup_childspec, {?MODULE, Q}) when ?is_amqqueue(Q) -> - {?MODULE, id(Q)}; -upgrade_key(_Table, Key) -> - Key. +-export([id_to_khepri_path/1]). %%---------------------------------------------------------------------------- @@ -86,12 +52,12 @@ start_child(Q) -> adjust({clear_upstream, VHost, UpstreamName}) -> _ = [rabbit_federation_link_sup:adjust(Pid, Q, {clear_upstream, UpstreamName}) || - {{_, Q}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), + {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), ?amqqueue_vhost_equals(Q, VHost)], ok; adjust(Reason) -> _ = [rabbit_federation_link_sup:adjust(Pid, Q, Reason) || - {{_, Q}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], + {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], ok. stop_child(Q) -> @@ -123,8 +89,9 @@ init([]) -> id(Q) when ?is_amqqueue(Q) -> Policy = amqqueue:get_policy(Q), Q1 = amqqueue:set_immutable(Q), - {simple_id(Q), amqqueue:set_policy(Q1, Policy)}. + Q2 = amqqueue:set_policy(Q1, Policy), + Q2. -simple_id(Q) when ?is_amqqueue(Q) -> - #resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q), +id_to_khepri_path(Id) when ?is_amqqueue(Id) -> + #resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Id), [queue, VHost, Name]. diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl index 31d8825671bb..0f920caca0d7 100644 --- a/deps/rabbitmq_federation/test/exchange_SUITE.erl +++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl @@ -30,13 +30,15 @@ all() -> [ {group, essential}, - {group, cluster_size_3} + {group, cluster_size_3}, + {group, rolling_upgrade} ]. groups() -> [ {essential, [], essential()}, {cluster_size_3, [], [max_hops]}, + {rolling_upgrade, [], [child_id_format]}, {cycle_protection, [], [ %% TBD: port from v3.10.x in an Erlang 25-compatible way ]}, @@ -96,6 +98,12 @@ init_per_group(cluster_size_3 = Group, Config) -> {rmq_nodes_count, 3} ]), init_per_group1(Group, Config1); +init_per_group(rolling_upgrade = Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 5}, + {rmq_nodes_clustered, false} + ]), + init_per_group1(Group, Config1); init_per_group(Group, Config) -> init_per_group1(Group, Config). @@ -539,6 +547,119 @@ lookup_exchange_status(Config) -> [key, uri, status, timestamp, id, supervisor, upstream]), clean_up_federation_related_bits(Config). + +child_id_format(Config) -> + [UpstreamNode, + OldNodeA, + NewNodeB, + OldNodeC, + NewNodeD] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + %% Create a cluster with the nodes running the old version of RabbitMQ in + %% mixed-version testing. + %% + %% Note: we build this on the assumption that `rabbit_ct_broker_helpers' + %% starts nodes this way: + %% Node 1: the primary copy of RabbitMQ the test is started from + %% Node 2: the secondary umbrella (if any) + %% Node 3: the primary copy + %% Node 4: the secondary umbrella + %% ... + %% + %% Therefore, `UpstreamNode' will use the primary copy, `OldNodeA' the + %% secondary umbrella, `NewNodeB' the primary copy, and so on. + Config1 = rabbit_ct_broker_helpers:cluster_nodes( + Config, [OldNodeA, OldNodeC]), + + %% Prepare the whole federated exchange on that old cluster. + UpstreamName = <<"fed_on_upgrade">>, + rabbit_ct_broker_helpers:set_parameter( + Config1, OldNodeA, <<"federation-upstream">>, UpstreamName, + [ + {<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config1, UpstreamNode)} + ]), + + rabbit_ct_broker_helpers:set_policy( + Config1, OldNodeA, + <<"fed_on_upgrade_policy">>, <<"^fed_">>, <<"all">>, + [ + {<<"federation-upstream-pattern">>, UpstreamName} + ]), + + XName = <<"fed_ex_on_upgrade_cluster">>, + X = exchange_declare_method(XName, <<"direct">>), + {Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel( + Config1, OldNodeA), + ?assertEqual({'exchange.declare_ok'}, declare_exchange(Ch1, X)), + rabbit_ct_client_helpers:close_channel(Ch1), + rabbit_ct_client_helpers:close_connection(Conn1), + + %% Verify the format of the child ID. In the main branch, the format was + %% temporarily a size-2 tuple with a list as the first element. This was + %% not kept later and the original ID format is used in old and new nodes. + [{Id, _, _, _}] = rabbit_ct_broker_helpers:rpc( + Config1, OldNodeA, + mirrored_supervisor, which_children, + [rabbit_federation_exchange_link_sup_sup]), + case Id of + %% This is the format we expect everywhere. + #exchange{name = #resource{name = XName}} -> + %% Verify that the supervisors exist on all nodes. + lists:foreach( + fun(Node) -> + ?assertMatch( + [{#exchange{name = #resource{name = XName}}, + _, _, _}], + rabbit_ct_broker_helpers:rpc( + Config1, Node, + mirrored_supervisor, which_children, + [rabbit_federation_exchange_link_sup_sup])) + end, [OldNodeA, OldNodeC]), + + %% Simulate a rolling upgrade by: + %% 1. adding new nodes to the old cluster + %% 2. stopping the old nodes + %% + %% After that, the supervisors run on the new code. + Config2 = rabbit_ct_broker_helpers:cluster_nodes( + Config1, [OldNodeA, NewNodeB, NewNodeD]), + ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeA), + ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNodeA), + ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeC), + ok = rabbit_ct_broker_helpers:reset_node(Config2, OldNodeC), + + %% Verify that the supervisors still use the same IDs. + lists:foreach( + fun(Node) -> + ?assertMatch( + [{#exchange{name = #resource{name = XName}}, + _, _, _}], + rabbit_ct_broker_helpers:rpc( + Config2, Node, + mirrored_supervisor, which_children, + [rabbit_federation_exchange_link_sup_sup])) + end, [NewNodeB, NewNodeD]), + + %% Delete the exchange: it should work because the ID format is the + %% one expected. + %% + %% During the transient period where the ID format was changed, + %% this would crash with a badmatch because the running + %% supervisor's ID would not match the content of the database. + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel( + Config2, NewNodeB), + ?assertEqual({'exchange.delete_ok'}, delete_exchange(Ch2, XName)), + rabbit_ct_client_helpers:close_channel(Ch2), + rabbit_ct_client_helpers:close_connection(Conn2); + + %% This is the transient format we are not interested in as it only + %% lived in a development branch. + {List, #exchange{name = #resource{name = XName}}} + when is_list(List) -> + {skip, "Testcase skipped with the transiently changed ID format"} + end. + %% %% Test helpers %% diff --git a/deps/rabbitmq_shovel/BUILD.bazel b/deps/rabbitmq_shovel/BUILD.bazel index f156c434686f..2fd0032bdb46 100644 --- a/deps/rabbitmq_shovel/BUILD.bazel +++ b/deps/rabbitmq_shovel/BUILD.bazel @@ -164,6 +164,19 @@ rabbitmq_suite( ], ) +rabbitmq_integration_suite( + name = "rolling_upgrade_SUITE", + additional_beam = [ + "test/shovel_test_utils.beam", + ], + deps = [ + "@khepri//:erlang_app", + ], + # FIXME: As of this writing, there is a bug in Khepri that makes this + # testsuite unstable. + flaky = True, +) + rabbitmq_integration_suite( name = "shovel_status_command_SUITE", additional_beam = [ diff --git a/deps/rabbitmq_shovel/app.bzl b/deps/rabbitmq_shovel/app.bzl index 7ce47fc8dfe9..00b16a791c2e 100644 --- a/deps/rabbitmq_shovel/app.bzl +++ b/deps/rabbitmq_shovel/app.bzl @@ -225,6 +225,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/rabbit_common:erlang_app"], ) + erlang_bytecode( + name = "rolling_upgrade_SUITE_beam_files", + testonly = True, + srcs = ["test/rolling_upgrade_SUITE.erl"], + outs = ["test/rolling_upgrade_SUITE.beam"], + app_name = "rabbitmq_shovel", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app", "@khepri//:erlang_app"], + ) erlang_bytecode( name = "shovel_status_command_SUITE_beam_files", testonly = True, diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl index 1bc1edec5fd3..0b06bdb39090 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl @@ -6,53 +6,18 @@ %% -module(rabbit_shovel_dyn_worker_sup_sup). - -behaviour(mirrored_supervisor). --behaviour(rabbit_mnesia_to_khepri_record_converter). -export([start_link/0, init/1, adjust/2, stop_child/1, cleanup_specs/0]). +-export([id_to_khepri_path/1]). -import(rabbit_misc, [pget/2]). -import(rabbit_data_coercion, [to_map/1, to_list/1]). -include("rabbit_shovel.hrl"). --include_lib("rabbit/include/mirrored_supervisor.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -define(SUPERVISOR, ?MODULE). -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter --export([upgrade_record/2, upgrade_key/2]). - --rabbit_mnesia_records_to_khepri_db( - [ - {rabbit_db_msup_m2k_converter, ?MODULE} - ]). - --spec upgrade_record(Table, Record) -> Record when - Table :: mnesia_to_khepri:mnesia_table(), - Record :: tuple(). -upgrade_record(mirrored_sup_childspec, - #mirrored_sup_childspec{key = {?MODULE, {A, B} = OldId}} = Record) - when is_binary(A) and is_binary(B) -> - Record#mirrored_sup_childspec{key = {?MODULE, id(OldId)}}; -upgrade_record(_Table, Record) -> - Record. - --spec upgrade_key(Table, Key) -> Key when - Table :: mnesia_to_khepri:mnesia_table(), - Key :: any(). -upgrade_key(mirrored_sup_childspec, {?MODULE, {A, B} = OldId}) - when is_binary(A) and is_binary(B) -> - {?MODULE, id(OldId)}; -upgrade_key(_Table, Key) -> - Key. - start_link() -> Pid = case mirrored_supervisor:start_link( {local, ?SUPERVISOR}, ?SUPERVISOR, @@ -97,10 +62,9 @@ obfuscated_uris_parameters(Def) when is_list(Def) -> child_exists(Name) -> Id = id(Name), - %% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894. - OldId = old_id(Name), + TmpExpId = temp_experimental_id(Name), lists:any(fun ({ChildId, _, _, _}) -> - ChildId =:= Id orelse ChildId =:= OldId + ChildId =:= Id orelse ChildId =:= TmpExpId end, mirrored_supervisor:which_children(?SUPERVISOR)). @@ -110,20 +74,14 @@ stop_child({VHost, ShovelName} = Name) -> case get({shovel_worker_autodelete, Name}) of true -> ok; %% [1] _ -> - case stop_and_delete_child(id(Name)) of + Id = id(Name), + case stop_and_delete_child(Id) of ok -> ok; {error, not_found} -> - case rabbit_khepri:is_enabled() of - true -> - %% Old id format is not supported by and cannot exist in Khepri - ok; - false -> - %% try older format, pre 3.13.0 and 3.12.8. - %% See rabbitmq/rabbitmq-server#9894. - _ = stop_and_delete_child(old_id(Name)), - ok - end + TmpExpId = temp_experimental_id(Name), + _ = stop_and_delete_child(TmpExpId), + ok end, rabbit_shovel_status:remove(Name) end, @@ -148,48 +106,54 @@ stop_and_delete_child(Id) -> cleanup_specs() -> Children = mirrored_supervisor:which_children(?SUPERVISOR), - - ChildIdSet = sets:from_list([element(1, S) || S <- Children]), - ParamsSet = params_to_child_ids(rabbit_khepri:is_enabled()), - F = fun(ChildId, ok) -> - try - %% The supervisor operation is very unlikely to fail, it's the schema - %% data stores that can make a fuss about a non-existent or non-standard value passed in. - %% For example, an old style Shovel name is an invalid Khepri query path element. MK. - _ = mirrored_supervisor:delete_child(?SUPERVISOR, ChildId) - catch _:_:_Stacktrace -> - ok - end, - ok - end, + ParamsSet = sets:from_list( + [id({proplists:get_value(vhost, S), + proplists:get_value(name, S)}) + || S <- rabbit_runtime_parameters:list_component( + <<"shovel">>)]), %% Delete any supervisor children that do not have their respective runtime parameters in the database. - SetToCleanUp = sets:subtract(ChildIdSet, ParamsSet), - ok = sets:fold(F, ok, SetToCleanUp). - -params_to_child_ids(_KhepriEnabled = true) -> - %% Old id format simply cannot exist in Khepri because having Khepri enabled - %% means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK. - sets:from_list([id({proplists:get_value(vhost, S), proplists:get_value(name, S)}) - || S <- rabbit_runtime_parameters:list_component(<<"shovel">>)]); -params_to_child_ids(_KhepriEnabled = false) -> - sets:from_list( - lists:flatmap( - fun(S) -> - Name = {proplists:get_value(vhost, S), proplists:get_value(name, S)}, - %% Supervisor Id format was different pre 3.13.0 and 3.12.8. - %% Try both formats to cover the transitionary mixed version cluster period. - [id(Name), old_id(Name)] - end, - rabbit_runtime_parameters:list_component(<<"shovel">>))). + lists:foreach( + fun + ({{VHost, ShovelName} = ChildId, _, _, _}) + when is_binary(VHost) andalso is_binary(ShovelName) -> + case sets:is_element(ChildId, ParamsSet) of + false -> + _ = mirrored_supervisor:delete_child( + ?SUPERVISOR, ChildId); + true -> + ok + end; + ({{List, {VHost, ShovelName} = Id} = ChildId, _, _, _}) + when is_list(List) andalso + is_binary(VHost) andalso is_binary(ShovelName) -> + case sets:is_element(Id, ParamsSet) of + false -> + _ = mirrored_supervisor:delete_child( + ?SUPERVISOR, ChildId); + true -> + ok + end + end, Children). %%---------------------------------------------------------------------------- init([]) -> {ok, {{one_for_one, 3, 10}, []}}. -id({V, S} = Name) -> - {[V, S], Name}. - -%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894 -old_id({_V, _S} = Name) -> +id({VHost, ShovelName} = Name) + when is_binary(VHost) andalso is_binary(ShovelName) -> Name. + +id_to_khepri_path({VHost, ShovelName}) + when is_binary(VHost) andalso is_binary(ShovelName) -> + [VHost, ShovelName]; +id_to_khepri_path({List, {VHost, ShovelName}}) + when is_list(List) andalso is_binary(VHost) andalso is_binary(ShovelName) -> + [VHost, ShovelName]. + +%% Temporary experimental format, erroneously backported to some 3.11.x and +%% 3.12.x releases in rabbitmq/rabbitmq-server#9796. +%% +%% See rabbitmq/rabbitmq-server#10306. +temp_experimental_id({V, S} = Name) -> + {[V, S], Name}. diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl index a4266e7573d9..50056d023042 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl @@ -9,6 +9,7 @@ -behaviour(mirrored_supervisor). -export([start_link/2, init/1]). +-export([id_to_khepri_path/1]). -include("rabbit_shovel.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -30,5 +31,8 @@ init([Name, Config]) -> [rabbit_shovel_worker]}], {ok, {{one_for_one, 1, ?MAX_WAIT}, ChildSpecs}}. -id(Name) -> - {[Name], Name}. +id(Name) when is_atom(Name) -> + Name. + +id_to_khepri_path(Name) when is_atom(Name) -> + [Name]. diff --git a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl new file mode 100644 index 000000000000..c4051ae3bba6 --- /dev/null +++ b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl @@ -0,0 +1,268 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rolling_upgrade_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("khepri/include/khepri.hrl"). + +-export([suite/0, + all/0, + groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + child_id_format/1]). + +suite() -> + [{timetrap, {minutes, 5}}]. + +all() -> + [ + {group, mnesia_store}, + {group, khepri_store} + ]. + +groups() -> + [{mnesia_store, [], [child_id_format]}, + {khepri_store, [], [child_id_format]}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(mnesia_store, Config) -> + rabbit_ct_helpers:set_config(Config, [{metadata_store__manual, mnesia}]); +init_per_group(khepri_store, Config) -> + rabbit_ct_helpers:set_config(Config, [{metadata_store__manual, khepri}]). + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + ClusterSize = 4, + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + Config1 = rabbit_ct_helpers:set_config( + Config, + [{rmq_nodes_count, ClusterSize}, + {rmq_nodes_clustered, false}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}, + {ignored_crashes, + ["process is stopped by supervisor", + "broker forced connection closure with reason 'shutdown'"]} + ]), + rabbit_ct_helpers:run_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +child_id_format(Config) -> + [NewRefNode, + OldNode, + NewNode, + NodeWithQueues] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + %% We build this test on the assumption that `rabbit_ct_broker_helpers' + %% starts nodes this way: + %% Node 1: the primary copy of RabbitMQ the test is started from + %% Node 2: the secondary umbrella (if any) + %% Node 3: the primary copy + %% Node 4: the secondary umbrella + %% ... + %% + %% Therefore, `Pouet' will use the primary copy, `OldNode' the secondary + %% umbrella, `NewRefNode' the primary copy, and `NodeWithQueues' the + %% secondary umbrella. + + %% Declare source and target queues on a node that won't run the shovel. + ct:pal("Declaring queues on node ~s", [NodeWithQueues]), + SourceQName = <<"source-queue">>, + TargetQName = <<"target-queue">>, + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, NodeWithQueues), + lists:foreach( + fun(QName) -> + ?assertEqual( + {'queue.declare_ok', QName, 0, 0}, + amqp_channel:call( + Ch, #'queue.declare'{queue = QName, durable = true})) + end, [SourceQName, TargetQName]), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + + %% Declare a dynamic shovel on the old node. + ct:pal("Declaring queues on node ~s", [OldNode]), + VHost = <<"/">>, + ShovelName = <<"test-shovel">>, + shovel_test_utils:set_param( + Config, + OldNode, + NodeWithQueues, + ShovelName, + [{<<"src-queue">>, SourceQName}, + {<<"dest-queue">>, TargetQName}]), + + %% We declare the same shovel on a new node that won't be clustered with + %% the rest. It is only used as a reference node to determine which ID + %% format the new version is using. + ct:pal("Declaring queues on node ~s (as a reference)", [NewRefNode]), + shovel_test_utils:set_param( + Config, + NewRefNode, + NodeWithQueues, + ShovelName, + [{<<"src-queue">>, SourceQName}, + {<<"dest-queue">>, TargetQName}]), + + %% Verify the format of the child ID. Some versions of RabbitMQ 3.11.x and + %% 3.12.x use a temporary experimental format that was erroneously + %% backported from a work-in-progress happening in the main branch. + ct:pal("Checking mirrored_supervisor child ID formats"), + [{Id0, _, _, _}] = rabbit_ct_broker_helpers:rpc( + Config, NewRefNode, + mirrored_supervisor, which_children, + [rabbit_shovel_dyn_worker_sup_sup]), + PrimaryIdType = case Id0 of + {VHost, ShovelName} -> + ct:pal( + "The nodes from the primary umbrella are using " + "the NORMAL mirrored_supervisor child ID format " + "natively"), + normal; + {[VHost, ShovelName], {VHost, ShovelName}} -> + ct:pal( + "The nodes from the primary umbrella are using " + "the TEMPORARY EXPERIMENTAL mirrored_supervisor " + "child ID format natively"), + temp_exp + end, + + [{Id1, _, _, _}] = rabbit_ct_broker_helpers:rpc( + Config, OldNode, + mirrored_supervisor, which_children, + [rabbit_shovel_dyn_worker_sup_sup]), + SecondaryIdType = case Id1 of + {VHost, ShovelName} -> + ct:pal( + "The nodes from the secondary umbrella are " + "using the NORMAL mirrored_supervisor child " + "ID format natively"), + normal; + {[VHost, ShovelName], {VHost, ShovelName}} -> + ct:pal( + "The nodes from the secondary umbrella are " + "using the TEMPORARY EXPERIMENTAL " + "mirrored_supervisor child ID format " + "natively"), + temp_exp + end, + if + PrimaryIdType =/= SecondaryIdType -> + ct:pal( + "The mirrored_supervisor child ID format is changing between " + "the primary and the secondary umbrellas!"); + true -> + ok + end, + + %% Verify that the supervisors exist on all nodes. + ct:pal( + "Checking running mirrored_supervisor children on old node ~s", + [OldNode]), + lists:foreach( + fun(Node) -> + ?assertMatch( + [{Id, _, _, _}] + when (SecondaryIdType =:= normal andalso + Id =:= {VHost, ShovelName}) orelse + (SecondaryIdType =:= temp_exp andalso + Id =:= {[VHost, ShovelName], {VHost, ShovelName}}), + rabbit_ct_broker_helpers:rpc( + Config, Node, + mirrored_supervisor, which_children, + [rabbit_shovel_dyn_worker_sup_sup])) + end, [OldNode]), + + %% Simulate a rolling upgrade by: + %% 1. adding new nodes to the old cluster + %% 2. stopping the old nodes + %% + %% After that, the supervisors run on the new code. + ct:pal("Clustering nodes ~s and ~s", [OldNode, NewNode]), + Config1 = rabbit_ct_broker_helpers:cluster_nodes( + Config, [OldNode, NewNode]), + ok = rabbit_ct_broker_helpers:stop_broker(Config1, OldNode), + ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNode), + + shovel_test_utils:await_shovel(Config, NewNode, ShovelName), + + case ?config(metadata_store__manual, Config) of + mnesia -> + ok; + khepri -> + ok = rabbit_ct_broker_helpers:enable_feature_flag( + Config, [NewNode], khepri_db) + end, + + %% Verify that the supervisors still use the same IDs. + ct:pal( + "Checking running mirrored_supervisor children on new node ~s", + [NewNode]), + lists:foreach( + fun(Node) -> + ?assertMatch( + [{Id, _, _, _}] + when (SecondaryIdType =:= normal andalso + Id =:= {VHost, ShovelName}) orelse + (SecondaryIdType =:= temp_exp andalso + Id =:= {[VHost, ShovelName], {VHost, ShovelName}}), + rabbit_ct_broker_helpers:rpc( + Config1, Node, + mirrored_supervisor, which_children, + [rabbit_shovel_dyn_worker_sup_sup])) + end, [NewNode]), + + case ?config(metadata_store__manual, Config) of + mnesia -> + ok; + khepri -> + Path = rabbit_db_msup:khepri_mirrored_supervisor_path(), + ?assertMatch( + {ok, + #{[rabbit_db_msup, mirrored_supervisor_childspec, + rabbit_shovel_dyn_worker_sup_sup, VHost, ShovelName] := _}}, + rabbit_ct_broker_helpers:rpc( + Config, NewNode, rabbit_khepri, list, + [Path ++ [?KHEPRI_WILDCARD_STAR_STAR]])) + end. diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index d59f259cb6af..a5589290fa90 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -8,29 +8,44 @@ -module(shovel_test_utils). -include_lib("common_test/include/ct.hrl"). --export([set_param/3, set_param_nowait/3, await_shovel/2, await_shovel1/2, - shovels_from_status/0, get_shovel_status/2, - await/1, await/2, clear_param/2]). +-export([set_param/3, set_param/4, set_param/5, set_param_nowait/3, + await_shovel/2, await_shovel/3, await_shovel1/2, + shovels_from_status/0, get_shovel_status/2, get_shovel_status/3, + await/1, await/2, clear_param/2, clear_param/3]). -make_uri(Config) -> +make_uri(Config, Node) -> Hostname = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp), list_to_binary(lists:flatten(io_lib:format("amqp://~ts:~b", [Hostname, Port]))). + set_param(Config, Name, Value) -> - set_param_nowait(Config, Name, Value), - await_shovel(Config, Name). + set_param_nowait(Config, 0, 0, Name, Value), + await_shovel(Config, 0, Name). + +set_param(Config, Node, Name, Value) -> + set_param(Config, Node, Node, Name, Value). + +set_param(Config, Node, QueueNode, Name, Value) -> + set_param_nowait(Config, Node, QueueNode, Name, Value), + await_shovel(Config, Node, Name). set_param_nowait(Config, Name, Value) -> - Uri = make_uri(Config), - ok = rabbit_ct_broker_helpers:rpc(Config, 0, + set_param_nowait(Config, 0, 0, Name, Value). + +set_param_nowait(Config, Node, QueueNode, Name, Value) -> + Uri = make_uri(Config, QueueNode), + ok = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_runtime_parameters, set, [ <<"/">>, <<"shovel">>, Name, [{<<"src-uri">>, Uri}, {<<"dest-uri">>, [Uri]} | Value], none]). await_shovel(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, 0, + await_shovel(Config, 0, Name). + +await_shovel(Config, Node, Name) -> + rabbit_ct_broker_helpers:rpc(Config, Node, ?MODULE, await_shovel1, [Config, Name]). await_shovel1(_Config, Name) -> @@ -41,8 +56,11 @@ shovels_from_status() -> [N || {{<<"/">>, N}, dynamic, {running, _}, _} <- S]. get_shovel_status(Config, Name) -> + get_shovel_status(Config, 0, Name). + +get_shovel_status(Config, Node, Name) -> S = rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]), + Config, Node, rabbit_shovel_status, lookup, [{<<"/">>, Name}]), case S of not_found -> not_found; @@ -70,5 +88,8 @@ await(Pred, Timeout) -> end. clear_param(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, 0, + clear_param(Config, 0, Name). + +clear_param(Config, Node, Name) -> + rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, Name, <<"acting-user">>]). diff --git a/moduleindex.yaml b/moduleindex.yaml index 7b6aa9bc8102..fbadf554f0ea 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -778,7 +778,6 @@ rabbit: - rabbit_mirror_queue_slave - rabbit_mirror_queue_sync - rabbit_mnesia -- rabbit_mnesia_to_khepri_record_converter - rabbit_msg_file - rabbit_msg_record - rabbit_msg_store