From e0eccbe87470235566bda43ba7e06911818a8811 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 2 Feb 2024 16:22:11 +0100 Subject: [PATCH 1/9] Revert "Merge pull request #10453 from rabbitmq/rabbitmq-server-10416" This reverts commit 40ff91a55b17f1af6b7870c8b9bad5b3bbd37163, reversing changes made to d57a31068d23570d9c14ab6581f8ea3016253e78. --- ...abbit_federation_exchange_link_sup_sup.erl | 118 +++++------------- 1 file changed, 28 insertions(+), 90 deletions(-) diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl index d615aad0da08..7b6dc913577b 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl @@ -66,87 +66,42 @@ start_link() -> %% Note that the next supervisor down, rabbit_federation_link_sup, is common %% between exchanges and queues. start_child(X) -> - Result = - case child_exists(X) orelse - mirrored_supervisor:start_child( - ?SUPERVISOR, - {id(X), {rabbit_federation_link_sup, start_link, [X]}, - transient, ?SUPERVISOR_WAIT, supervisor, - [rabbit_federation_link_sup]}) of - true -> - already_started; - {ok, _Pid} -> - ok; - {error, {already_started, _Pid}} -> - already_started; - %% A link returned {stop, gone}, the link_sup shut down, that's OK. - {error, {shutdown, _}} -> - ok - end, - case Result of - ok -> - ok; - already_started -> - #exchange{name = ExchangeName} = X, - rabbit_log_federation:debug("Federation link for exchange ~tp was already started", - [rabbit_misc:rs(ExchangeName)]), - ok + case mirrored_supervisor:start_child( + ?SUPERVISOR, + {id(X), {rabbit_federation_link_sup, start_link, [X]}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_link_sup]}) of + {ok, _Pid} -> ok; + {error, {already_started, _Pid}} -> + #exchange{name = ExchangeName} = X, + rabbit_log_federation:debug("Federation link for exchange ~tp was already started", + [rabbit_misc:rs(ExchangeName)]), + ok; + %% A link returned {stop, gone}, the link_sup shut down, that's OK. + {error, {shutdown, _}} -> ok end. - -child_exists(Name) -> - Id = id(Name), - %% older format, pre-3.13.0 - OldId = old_id(Name), - lists:any(fun ({ChildId, _, _, _}) -> - ChildId =:= Id orelse ChildId =:= OldId - end, - mirrored_supervisor:which_children(?SUPERVISOR)). - adjust({clear_upstream, VHost, UpstreamName}) -> - _ = [rabbit_federation_link_sup:adjust(Pid, exchange_record_from_child_id(Id), {clear_upstream, UpstreamName}) || - {Id, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), - virtual_host_name_from_child_id(Id) =:= VHost - ], + _ = [rabbit_federation_link_sup:adjust(Pid, X, {clear_upstream, UpstreamName}) || + {{_, #exchange{name = Name} = X}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), + Name#resource.virtual_host == VHost], ok; adjust(Reason) -> - _ = [rabbit_federation_link_sup:adjust(Pid, exchange_record_from_child_id(Id), Reason) || - {Id, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR) - ], + _ = [rabbit_federation_link_sup:adjust(Pid, X, Reason) || + {{_, X}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], ok. stop_child(X) -> - Result = - case stop_and_delete_child(id(X)) of - ok -> ok; - {error, not_found} = Error -> - case rabbit_khepri:is_enabled() of - true -> - %% Old id format is not supported by Khepri and cannot exist there - Error; - false -> - %% try old format, pre-3.13.0 - stop_and_delete_child(old_id(X)) - end - end, - case Result of - ok -> - ok; - {error, Err} -> - #exchange{name = ExchangeName} = X, - rabbit_log_federation:warning( - "Attempt to stop a federation link for exchange ~tp failed: ~tp", - [rabbit_misc:rs(ExchangeName), Err]), - ok - end. - -stop_and_delete_child(Id) -> - case mirrored_supervisor:terminate_child(?SUPERVISOR, Id) of - ok -> - ok = mirrored_supervisor:delete_child(?SUPERVISOR, Id); - {error, not_found} = Error -> - Error - end. + case mirrored_supervisor:terminate_child(?SUPERVISOR, id(X)) of + ok -> ok; + {error, Err} -> + #exchange{name = ExchangeName} = X, + rabbit_log_federation:warning( + "Attempt to stop a federation link for exchange ~tp failed: ~tp", + [rabbit_misc:rs(ExchangeName), Err]), + ok + end, + ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(X)). %%---------------------------------------------------------------------------- @@ -160,20 +115,3 @@ id(X = #exchange{policy = Policy}) -> simple_id(#exchange{name = #resource{virtual_host = VHost, name = Name}}) -> [exchange, VHost, Name]. - -%% Old child id format, pre 3.13.0 -old_id(X = #exchange{policy = Policy}) -> - X1 = rabbit_exchange:immutable(X), - X1#exchange{policy = Policy}. - -%% New child id format, introduced in 3.13.0 for Khepri -exchange_record_from_child_id({_, #exchange{} = XR}) -> - XR; -%% Old child id format, pre-3.13.0 -exchange_record_from_child_id(#exchange{} = XR) -> - XR. - -virtual_host_name_from_child_id({_, #exchange{name = Res}}) -> - Res#resource.virtual_host; -virtual_host_name_from_child_id(#exchange{name = Res}) -> - Res#resource.virtual_host. From 9dcf7c322e743314c9bf2b267d2d3c544522f1a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Sun, 4 Feb 2024 14:41:50 +0100 Subject: [PATCH 2/9] Revert "Merge pull request #10096 from rabbitmq/upgrade-mirroring-sup-childspec" This reverts commit 75773e6d9ba286b5ce042b0459571e63416b419a, reversing changes made to e40367f26460f6291f5f790d686c92d3f2c513e6. --- deps/rabbit/app.bzl | 5 +- .../{include => src}/mirrored_supervisor.hrl | 0 deps/rabbit/src/rabbit_db_m2k_converter.erl | 11 +--- .../src/rabbit_db_msup_m2k_converter.erl | 55 ++----------------- ...bbit_mnesia_to_khepri_record_converter.erl | 24 -------- ...abbit_federation_exchange_link_sup_sup.erl | 34 ------------ .../rabbit_federation_queue_link_sup_sup.erl | 35 ------------ .../src/rabbit_shovel_dyn_worker_sup_sup.erl | 36 ------------ moduleindex.yaml | 1 - 9 files changed, 8 insertions(+), 193 deletions(-) rename deps/rabbit/{include => src}/mirrored_supervisor.hrl (100%) delete mode 100644 deps/rabbit/src/rabbit_mnesia_to_khepri_record_converter.erl diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 698a5201e144..bf09ac25664b 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -169,7 +169,6 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_mirror_queue_slave.erl", "src/rabbit_mirror_queue_sync.erl", "src/rabbit_mnesia.erl", - "src/rabbit_mnesia_to_khepri_record_converter.erl", "src/rabbit_msg_file.erl", "src/rabbit_msg_record.erl", "src/rabbit_msg_store.erl", @@ -433,7 +432,6 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_mirror_queue_slave.erl", "src/rabbit_mirror_queue_sync.erl", "src/rabbit_mnesia.erl", - "src/rabbit_mnesia_to_khepri_record_converter.erl", "src/rabbit_msg_file.erl", "src/rabbit_msg_record.erl", "src/rabbit_msg_store.erl", @@ -543,7 +541,6 @@ def all_srcs(name = "all_srcs"): "include/gm_specs.hrl", "include/internal_user.hrl", "include/mc.hrl", - "include/mirrored_supervisor.hrl", "include/rabbit_global_counters.hrl", "include/vhost.hrl", "include/vhost_v2.hrl", @@ -557,6 +554,7 @@ def all_srcs(name = "all_srcs"): filegroup( name = "private_hdrs", srcs = [ + "src/mirrored_supervisor.hrl", "src/rabbit_feature_flags.hrl", "src/rabbit_fifo.hrl", "src/rabbit_fifo_dlx.hrl", @@ -715,7 +713,6 @@ def all_srcs(name = "all_srcs"): "src/rabbit_mirror_queue_slave.erl", "src/rabbit_mirror_queue_sync.erl", "src/rabbit_mnesia.erl", - "src/rabbit_mnesia_to_khepri_record_converter.erl", "src/rabbit_msg_file.erl", "src/rabbit_msg_record.erl", "src/rabbit_msg_store.erl", diff --git a/deps/rabbit/include/mirrored_supervisor.hrl b/deps/rabbit/src/mirrored_supervisor.hrl similarity index 100% rename from deps/rabbit/include/mirrored_supervisor.hrl rename to deps/rabbit/src/mirrored_supervisor.hrl diff --git a/deps/rabbit/src/rabbit_db_m2k_converter.erl b/deps/rabbit/src/rabbit_db_m2k_converter.erl index 87953da1a249..570a41d4d5b9 100644 --- a/deps/rabbit/src/rabbit_db_m2k_converter.erl +++ b/deps/rabbit/src/rabbit_db_m2k_converter.erl @@ -15,8 +15,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). %% Functions for `rabbit_db_*_m2k_converter' modules to call. --export([with_correlation_id/2, - get_sub_state/2]). +-export([with_correlation_id/2]). %% `mnesia_to_khepri_converter' callbacks. -export([init_copy_to_khepri/4, @@ -69,14 +68,6 @@ with_correlation_id( run_async_fun(Fun, State0) end. --spec get_sub_state(Module, State) -> Ret when - Module :: module(), - State :: state(), - Ret :: any(). - -get_sub_state(Module, #?MODULE{sub_states = SubStates}) -> - maps:get(Module, SubStates). - %% `mnesia_to_khepri_converter' callbacks -spec init_copy_to_khepri(StoreId, MigrationId, Tables, Migrations) -> diff --git a/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl b/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl index 964d7e74db5c..58cdac1dae12 100644 --- a/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl +++ b/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl @@ -2,7 +2,7 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2022-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbit_db_msup_m2k_converter). @@ -19,7 +19,7 @@ copy_to_khepri/3, delete_from_khepri/3]). --record(?MODULE, {record_converters :: [module()]}). +-record(?MODULE, {}). -spec init_copy_to_khepri(StoreId, MigrationId, Tables) -> Ret when StoreId :: khepri:store_id(), @@ -33,8 +33,7 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) -> %% Clean up any previous attempt to copy the Mnesia table to Khepri. lists:foreach(fun clear_data_in_khepri/1, Tables), - Converters = discover_converters(?MODULE), - SubState = #?MODULE{record_converters = Converters}, + SubState = #?MODULE{}, {ok, SubState}. -spec copy_to_khepri(Table, Record, State) -> Ret when @@ -47,12 +46,8 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) -> %% @private copy_to_khepri(mirrored_sup_childspec = Table, - #mirrored_sup_childspec{} = Record0, + #mirrored_sup_childspec{key = {Group, {SimpleId, _}} = Key} = Record, State) -> - #?MODULE{record_converters = Converters} = - rabbit_db_m2k_converter:get_sub_state(?MODULE, State), - Record = upgrade_record(Converters, Table, Record0), - #mirrored_sup_childspec{key = {Group, {SimpleId, _}} = Key} = Record, ?LOG_DEBUG( "Mnesia->Khepri data copy: [~0p] key: ~0p", [Table, Key], @@ -81,10 +76,8 @@ copy_to_khepri(Table, Record, State) -> Reason :: any(). %% @private -delete_from_khepri(mirrored_sup_childspec = Table, Key0, State) -> - #?MODULE{record_converters = Converters} = - rabbit_db_m2k_converter:get_sub_state(?MODULE, State), - {Group, Id} = Key = upgrade_key(Converters, Table, Key0), +delete_from_khepri( + mirrored_sup_childspec = Table, {Group, Id} = Key, State) -> ?LOG_DEBUG( "Mnesia->Khepri data delete: [~0p] key: ~0p", [Table, Key], @@ -109,39 +102,3 @@ clear_data_in_khepri(mirrored_sup_childspec) -> ok -> ok; Error -> throw(Error) end. - -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter - -discover_converters(MigrationMod) -> - Apps = rabbit_misc:rabbitmq_related_apps(), - AttrsPerApp = rabbit_misc:module_attributes_from_apps( - rabbit_mnesia_records_to_khepri_db, Apps), - discover_converters(MigrationMod, AttrsPerApp, []). - -discover_converters(MigrationMod, [{_App, _AppMod, AppConverters} | Rest], - Converters0) -> - Converters = - lists:foldl(fun({Module, Mod}, Acc) when Module =:= MigrationMod -> - [Mod | Acc]; - (_, Acc) -> - Acc - end, Converters0, AppConverters), - discover_converters(MigrationMod, Rest, Converters); -discover_converters(_MigrationMod, [], Converters) -> - Converters. - -upgrade_record(Converters, Table, Record) -> - lists:foldl(fun(Mod, Record0) -> - Mod:upgrade_record(Table, Record0) - end, Record, Converters). - -upgrade_key(Converters, Table, Key) -> - lists:foldl(fun(Mod, Key0) -> - Mod:upgrade_key(Table, Key0) - end, Key, Converters). diff --git a/deps/rabbit/src/rabbit_mnesia_to_khepri_record_converter.erl b/deps/rabbit/src/rabbit_mnesia_to_khepri_record_converter.erl deleted file mode 100644 index a3853f31706a..000000000000 --- a/deps/rabbit/src/rabbit_mnesia_to_khepri_record_converter.erl +++ /dev/null @@ -1,24 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(rabbit_mnesia_to_khepri_record_converter). - -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter - --callback upgrade_record(Table, Record) -> Record when - Table :: mnesia_to_khepri:mnesia_table(), - Record :: tuple(). - --callback upgrade_key(Table, Key) -> Key when - Table :: mnesia_to_khepri:mnesia_table(), - Key :: any(). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl index 7b6dc913577b..3bd9c4631d3a 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl @@ -8,9 +8,7 @@ -module(rabbit_federation_exchange_link_sup_sup). -behaviour(mirrored_supervisor). --behaviour(rabbit_mnesia_to_khepri_record_converter). --include_lib("rabbit/include/mirrored_supervisor.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -define(SUPERVISOR, ?MODULE). @@ -20,38 +18,6 @@ -export([start_link/0, start_child/1, adjust/1, stop_child/1]). -export([init/1]). -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter - --rabbit_mnesia_records_to_khepri_db( - [ - {rabbit_db_msup_m2k_converter, ?MODULE} - ]). - --export([upgrade_record/2, upgrade_key/2]). - --spec upgrade_record(Table, Record) -> Record when - Table :: mnesia_to_khepri:mnesia_table(), - Record :: tuple(). -upgrade_record(mirrored_sup_childspec, - #mirrored_sup_childspec{key = {?MODULE, #exchange{} = Exchange}} = Record) -> - Record#mirrored_sup_childspec{key = {?MODULE, id(Exchange)}}; -upgrade_record(_Table, Record) -> - Record. - --spec upgrade_key(Table, Key) -> Key when - Table :: mnesia_to_khepri:mnesia_table(), - Key :: any(). -upgrade_key(mirrored_sup_childspec, {?MODULE, #exchange{} = Exchange}) -> - {?MODULE, id(Exchange)}; -upgrade_key(_Table, Key) -> - Key. - %%---------------------------------------------------------------------------- start_link() -> diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl index eb22c153dc96..2461c8ed67b2 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl @@ -8,10 +8,8 @@ -module(rabbit_federation_queue_link_sup_sup). -behaviour(mirrored_supervisor). --behaviour(rabbit_mnesia_to_khepri_record_converter). -include_lib("rabbit_common/include/rabbit.hrl"). --include_lib("rabbit/include/mirrored_supervisor.hrl"). -include_lib("rabbit/include/amqqueue.hrl"). -define(SUPERVISOR, ?MODULE). @@ -21,39 +19,6 @@ -export([start_link/0, start_child/1, adjust/1, stop_child/1]). -export([init/1]). -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter - --rabbit_mnesia_records_to_khepri_db( - [ - {rabbit_db_msup_m2k_converter, ?MODULE} - ]). - --export([upgrade_record/2, upgrade_key/2]). - --spec upgrade_record(Table, Record) -> Record when - Table :: mnesia_to_khepri:mnesia_table(), - Record :: tuple(). -upgrade_record(mirrored_sup_childspec, - #mirrored_sup_childspec{key = {?MODULE, Q}} = Record) - when ?is_amqqueue(Q) -> - Record#mirrored_sup_childspec{key = {?MODULE, id(Q)}}; -upgrade_record(_Table, Record) -> - Record. - --spec upgrade_key(Table, Key) -> Key when - Table :: mnesia_to_khepri:mnesia_table(), - Key :: any(). -upgrade_key(mirrored_sup_childspec, {?MODULE, Q}) when ?is_amqqueue(Q) -> - {?MODULE, id(Q)}; -upgrade_key(_Table, Key) -> - Key. - %%---------------------------------------------------------------------------- start_link() -> diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl index 1bc1edec5fd3..2156417b937e 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl @@ -6,9 +6,7 @@ %% -module(rabbit_shovel_dyn_worker_sup_sup). - -behaviour(mirrored_supervisor). --behaviour(rabbit_mnesia_to_khepri_record_converter). -export([start_link/0, init/1, adjust/2, stop_child/1, cleanup_specs/0]). @@ -16,43 +14,9 @@ -import(rabbit_data_coercion, [to_map/1, to_list/1]). -include("rabbit_shovel.hrl"). --include_lib("rabbit/include/mirrored_supervisor.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -define(SUPERVISOR, ?MODULE). -%% Khepri paths don't support tuples or records, so the key part of the -%% #mirrored_sup_childspec{} used by some plugins must be transformed in a -%% valid Khepri path during the migration from Mnesia to Khepri. -%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which -%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks. -%% -%% This mechanism could be reused by any other rabbit_db_*_m2k_converter --export([upgrade_record/2, upgrade_key/2]). - --rabbit_mnesia_records_to_khepri_db( - [ - {rabbit_db_msup_m2k_converter, ?MODULE} - ]). - --spec upgrade_record(Table, Record) -> Record when - Table :: mnesia_to_khepri:mnesia_table(), - Record :: tuple(). -upgrade_record(mirrored_sup_childspec, - #mirrored_sup_childspec{key = {?MODULE, {A, B} = OldId}} = Record) - when is_binary(A) and is_binary(B) -> - Record#mirrored_sup_childspec{key = {?MODULE, id(OldId)}}; -upgrade_record(_Table, Record) -> - Record. - --spec upgrade_key(Table, Key) -> Key when - Table :: mnesia_to_khepri:mnesia_table(), - Key :: any(). -upgrade_key(mirrored_sup_childspec, {?MODULE, {A, B} = OldId}) - when is_binary(A) and is_binary(B) -> - {?MODULE, id(OldId)}; -upgrade_key(_Table, Key) -> - Key. - start_link() -> Pid = case mirrored_supervisor:start_link( {local, ?SUPERVISOR}, ?SUPERVISOR, diff --git a/moduleindex.yaml b/moduleindex.yaml index 7b6aa9bc8102..fbadf554f0ea 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -778,7 +778,6 @@ rabbit: - rabbit_mirror_queue_slave - rabbit_mirror_queue_sync - rabbit_mnesia -- rabbit_mnesia_to_khepri_record_converter - rabbit_msg_file - rabbit_msg_record - rabbit_msg_store From f51cc106bfad5e4008c8e5ea1ef3afdaf78b0e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 7 Feb 2024 17:27:54 +0100 Subject: [PATCH 3/9] rabbitmq_ct_helpers: Make ignored crashes configurable A testsuite just has to set: Config1 = rabbit_ct_helpers:set_config( Config, [{ignored_crashes, ["some fixed string pattern A", "some fixed string pattern B"]} ]), --- .../src/rabbit_ct_broker_helpers.erl | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index 696ee8feb45a..1e6c00cf120b 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -1089,8 +1089,14 @@ stop_rabbitmq_nodes(Config) -> case FindCrashes of true -> %% TODO: Make the ignore list configurable. - IgnoredCrashes = ["** force_vhost_failure"], - find_crashes_in_logs(NodeConfigs, IgnoredCrashes); + IgnoredCrashes0 = ["** force_vhost_failure"], + case rabbit_ct_helpers:get_config(Config, ignored_crashes) of + undefined -> + find_crashes_in_logs(NodeConfigs, IgnoredCrashes0); + IgnoredCrashes1 -> + find_crashes_in_logs( + NodeConfigs, IgnoredCrashes0 ++ IgnoredCrashes1) + end; false -> ok end, @@ -1172,7 +1178,11 @@ capture_gen_server_termination( Ret = re:run(Line, Prefix ++ "( .*|\\*.*|)$", ReOpts), case Ret of {match, [Suffix]} -> - case lists:member(Suffix, IgnoredCrashes) of + Ignore = lists:any( + fun(IgnoredCrash) -> + string:find(Suffix, IgnoredCrash) =/= nomatch + end, IgnoredCrashes), + case Ignore of false -> capture_gen_server_termination( Rest, Prefix, [Line | Acc], Count, IgnoredCrashes); From 92ca64e773ad711b2d67770d8982ada9b72ac6e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 7 Feb 2024 17:29:41 +0100 Subject: [PATCH 4/9] rabbitmq_ct_helpers: Fix how we locate rabbitmqctl from secondary umbrella We use the same code as `do_start_rabbitmq_node/3`. --- .../src/rabbit_ct_broker_helpers.erl | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index 1e6c00cf120b..27a9fc4a0190 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -1269,31 +1269,28 @@ rabbitmqctl(Config, Node, Args, Timeout) -> _ -> CanUseSecondary end, + WithPlugins0 = rabbit_ct_helpers:get_config(Config, + broker_with_plugins), + WithPlugins = case is_list(WithPlugins0) of + true -> lists:nth(I + 1, WithPlugins0); + false -> WithPlugins0 + end, Rabbitmqctl = case UseSecondaryUmbrella of true -> case BazelRunSecCmd of undefined -> - SrcDir = ?config( - secondary_rabbit_srcdir, - Config), - SecDepsDir = ?config( - secondary_erlang_mk_depsdir, - Config), - SecNewScriptsDir = filename:join( - [SecDepsDir, - SrcDir, - "sbin"]), - SecOldScriptsDir = filename:join( - [SecDepsDir, - "rabbit", - "scripts"]), - SecNewScriptsDirExists = filelib:is_dir( - SecNewScriptsDir), - SecScriptsDir = - case SecNewScriptsDirExists of - true -> SecNewScriptsDir; - false -> SecOldScriptsDir - end, + SrcDir = case WithPlugins of + false -> + ?config( + secondary_rabbit_srcdir, + Config); + _ -> + ?config( + secondary_current_srcdir, + Config) + end, + SecScriptsDir = filename:join( + [SrcDir, "sbin"]), rabbit_misc:format( "~ts/rabbitmqctl", [SecScriptsDir]); _ -> From 455a5a22f0d4370999a05955c8686ab07f0ac30f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 2 Feb 2024 16:30:42 +0100 Subject: [PATCH 5/9] rabbitmq_federation: Add testcase for #10306 [Why] An upgrade scenario going from RabbitMQ 3.12.x to the upcoming 3.13.0 was shared in issue #10306 to demonstrate that the change of child ID format broke rolling upgrades when there are existing federated exchanges. [How] The testcase uses 5 nodes: * one upstream node * two "old" downstream nodes * two "new" downstream nodes The old downstream nodes are used to prepare a 2-node cluster that is about to be upgraded. The new downstream nodes are added to the cluster then the old downstream nodes are stopped to simulate that rolling upgrade. The child ID format was restored in the previous commit, thus there is no conversion to handle and the testcase should just work with a fresh 3.13.0+ cluster or with a mixed-version cluster with 3.12.x. It failed during the preparation of the previous commit to make sure it was effective. --- deps/rabbitmq_federation/BUILD.bazel | 2 +- .../test/exchange_SUITE.erl | 123 +++++++++++++++++- 2 files changed, 123 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_federation/BUILD.bazel b/deps/rabbitmq_federation/BUILD.bazel index 1fb2aad791c2..007b672b0536 100644 --- a/deps/rabbitmq_federation/BUILD.bazel +++ b/deps/rabbitmq_federation/BUILD.bazel @@ -96,7 +96,7 @@ rabbitmq_integration_suite( "test/rabbit_federation_test_util.beam", ], flaky = True, - shard_count = 2, + shard_count = 3, ) rabbitmq_integration_suite( diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl index 31d8825671bb..0f920caca0d7 100644 --- a/deps/rabbitmq_federation/test/exchange_SUITE.erl +++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl @@ -30,13 +30,15 @@ all() -> [ {group, essential}, - {group, cluster_size_3} + {group, cluster_size_3}, + {group, rolling_upgrade} ]. groups() -> [ {essential, [], essential()}, {cluster_size_3, [], [max_hops]}, + {rolling_upgrade, [], [child_id_format]}, {cycle_protection, [], [ %% TBD: port from v3.10.x in an Erlang 25-compatible way ]}, @@ -96,6 +98,12 @@ init_per_group(cluster_size_3 = Group, Config) -> {rmq_nodes_count, 3} ]), init_per_group1(Group, Config1); +init_per_group(rolling_upgrade = Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 5}, + {rmq_nodes_clustered, false} + ]), + init_per_group1(Group, Config1); init_per_group(Group, Config) -> init_per_group1(Group, Config). @@ -539,6 +547,119 @@ lookup_exchange_status(Config) -> [key, uri, status, timestamp, id, supervisor, upstream]), clean_up_federation_related_bits(Config). + +child_id_format(Config) -> + [UpstreamNode, + OldNodeA, + NewNodeB, + OldNodeC, + NewNodeD] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + %% Create a cluster with the nodes running the old version of RabbitMQ in + %% mixed-version testing. + %% + %% Note: we build this on the assumption that `rabbit_ct_broker_helpers' + %% starts nodes this way: + %% Node 1: the primary copy of RabbitMQ the test is started from + %% Node 2: the secondary umbrella (if any) + %% Node 3: the primary copy + %% Node 4: the secondary umbrella + %% ... + %% + %% Therefore, `UpstreamNode' will use the primary copy, `OldNodeA' the + %% secondary umbrella, `NewNodeB' the primary copy, and so on. + Config1 = rabbit_ct_broker_helpers:cluster_nodes( + Config, [OldNodeA, OldNodeC]), + + %% Prepare the whole federated exchange on that old cluster. + UpstreamName = <<"fed_on_upgrade">>, + rabbit_ct_broker_helpers:set_parameter( + Config1, OldNodeA, <<"federation-upstream">>, UpstreamName, + [ + {<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config1, UpstreamNode)} + ]), + + rabbit_ct_broker_helpers:set_policy( + Config1, OldNodeA, + <<"fed_on_upgrade_policy">>, <<"^fed_">>, <<"all">>, + [ + {<<"federation-upstream-pattern">>, UpstreamName} + ]), + + XName = <<"fed_ex_on_upgrade_cluster">>, + X = exchange_declare_method(XName, <<"direct">>), + {Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel( + Config1, OldNodeA), + ?assertEqual({'exchange.declare_ok'}, declare_exchange(Ch1, X)), + rabbit_ct_client_helpers:close_channel(Ch1), + rabbit_ct_client_helpers:close_connection(Conn1), + + %% Verify the format of the child ID. In the main branch, the format was + %% temporarily a size-2 tuple with a list as the first element. This was + %% not kept later and the original ID format is used in old and new nodes. + [{Id, _, _, _}] = rabbit_ct_broker_helpers:rpc( + Config1, OldNodeA, + mirrored_supervisor, which_children, + [rabbit_federation_exchange_link_sup_sup]), + case Id of + %% This is the format we expect everywhere. + #exchange{name = #resource{name = XName}} -> + %% Verify that the supervisors exist on all nodes. + lists:foreach( + fun(Node) -> + ?assertMatch( + [{#exchange{name = #resource{name = XName}}, + _, _, _}], + rabbit_ct_broker_helpers:rpc( + Config1, Node, + mirrored_supervisor, which_children, + [rabbit_federation_exchange_link_sup_sup])) + end, [OldNodeA, OldNodeC]), + + %% Simulate a rolling upgrade by: + %% 1. adding new nodes to the old cluster + %% 2. stopping the old nodes + %% + %% After that, the supervisors run on the new code. + Config2 = rabbit_ct_broker_helpers:cluster_nodes( + Config1, [OldNodeA, NewNodeB, NewNodeD]), + ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeA), + ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNodeA), + ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeC), + ok = rabbit_ct_broker_helpers:reset_node(Config2, OldNodeC), + + %% Verify that the supervisors still use the same IDs. + lists:foreach( + fun(Node) -> + ?assertMatch( + [{#exchange{name = #resource{name = XName}}, + _, _, _}], + rabbit_ct_broker_helpers:rpc( + Config2, Node, + mirrored_supervisor, which_children, + [rabbit_federation_exchange_link_sup_sup])) + end, [NewNodeB, NewNodeD]), + + %% Delete the exchange: it should work because the ID format is the + %% one expected. + %% + %% During the transient period where the ID format was changed, + %% this would crash with a badmatch because the running + %% supervisor's ID would not match the content of the database. + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel( + Config2, NewNodeB), + ?assertEqual({'exchange.delete_ok'}, delete_exchange(Ch2, XName)), + rabbit_ct_client_helpers:close_channel(Ch2), + rabbit_ct_client_helpers:close_connection(Conn2); + + %% This is the transient format we are not interested in as it only + %% lived in a development branch. + {List, #exchange{name = #resource{name = XName}}} + when is_list(List) -> + {skip, "Testcase skipped with the transiently changed ID format"} + end. + %% %% Test helpers %% From 85286f4c38dba34e7f187ceb281a6d9890fd457f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 7 Feb 2024 17:31:20 +0100 Subject: [PATCH 6/9] rabbitmq_shovel: Add testcase for #9894 [Why] An upgrade scenario going from RabbitMQ 3.11.24 to the upcoming 3.12.8 was shared in issue #9894 to demonstrate that the change of child ID format broke rolling upgrades when there are existing dynamic shovels. [How] The testcase uses 4 nodes: * one reference node * one node to host source and target queues * one "old" node * one "new" node The reference node is using the new version to see what format it uses. The node hosting queues is using the old version but it is not relevant for this one? The testcase uses the old node to create the dynamic shovel, then the new node to simulate an upgrade by clustering it with the old node and stopping the old one. --- deps/rabbitmq_shovel/BUILD.bazel | 10 + deps/rabbitmq_shovel/app.bzl | 9 + .../test/rolling_upgrade_SUITE.erl | 268 ++++++++++++++++++ .../test/shovel_test_utils.erl | 45 ++- 4 files changed, 320 insertions(+), 12 deletions(-) create mode 100644 deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl diff --git a/deps/rabbitmq_shovel/BUILD.bazel b/deps/rabbitmq_shovel/BUILD.bazel index f156c434686f..aaad41c3c622 100644 --- a/deps/rabbitmq_shovel/BUILD.bazel +++ b/deps/rabbitmq_shovel/BUILD.bazel @@ -164,6 +164,16 @@ rabbitmq_suite( ], ) +rabbitmq_integration_suite( + name = "rolling_upgrade_SUITE", + additional_beam = [ + "test/shovel_test_utils.beam", + ], + deps = [ + "@khepri//:erlang_app", + ], +) + rabbitmq_integration_suite( name = "shovel_status_command_SUITE", additional_beam = [ diff --git a/deps/rabbitmq_shovel/app.bzl b/deps/rabbitmq_shovel/app.bzl index 7ce47fc8dfe9..00b16a791c2e 100644 --- a/deps/rabbitmq_shovel/app.bzl +++ b/deps/rabbitmq_shovel/app.bzl @@ -225,6 +225,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/rabbit_common:erlang_app"], ) + erlang_bytecode( + name = "rolling_upgrade_SUITE_beam_files", + testonly = True, + srcs = ["test/rolling_upgrade_SUITE.erl"], + outs = ["test/rolling_upgrade_SUITE.beam"], + app_name = "rabbitmq_shovel", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app", "@khepri//:erlang_app"], + ) erlang_bytecode( name = "shovel_status_command_SUITE_beam_files", testonly = True, diff --git a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl new file mode 100644 index 000000000000..c4051ae3bba6 --- /dev/null +++ b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl @@ -0,0 +1,268 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rolling_upgrade_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("khepri/include/khepri.hrl"). + +-export([suite/0, + all/0, + groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + child_id_format/1]). + +suite() -> + [{timetrap, {minutes, 5}}]. + +all() -> + [ + {group, mnesia_store}, + {group, khepri_store} + ]. + +groups() -> + [{mnesia_store, [], [child_id_format]}, + {khepri_store, [], [child_id_format]}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(mnesia_store, Config) -> + rabbit_ct_helpers:set_config(Config, [{metadata_store__manual, mnesia}]); +init_per_group(khepri_store, Config) -> + rabbit_ct_helpers:set_config(Config, [{metadata_store__manual, khepri}]). + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + ClusterSize = 4, + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + Config1 = rabbit_ct_helpers:set_config( + Config, + [{rmq_nodes_count, ClusterSize}, + {rmq_nodes_clustered, false}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}, + {ignored_crashes, + ["process is stopped by supervisor", + "broker forced connection closure with reason 'shutdown'"]} + ]), + rabbit_ct_helpers:run_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +child_id_format(Config) -> + [NewRefNode, + OldNode, + NewNode, + NodeWithQueues] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + %% We build this test on the assumption that `rabbit_ct_broker_helpers' + %% starts nodes this way: + %% Node 1: the primary copy of RabbitMQ the test is started from + %% Node 2: the secondary umbrella (if any) + %% Node 3: the primary copy + %% Node 4: the secondary umbrella + %% ... + %% + %% Therefore, `Pouet' will use the primary copy, `OldNode' the secondary + %% umbrella, `NewRefNode' the primary copy, and `NodeWithQueues' the + %% secondary umbrella. + + %% Declare source and target queues on a node that won't run the shovel. + ct:pal("Declaring queues on node ~s", [NodeWithQueues]), + SourceQName = <<"source-queue">>, + TargetQName = <<"target-queue">>, + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, NodeWithQueues), + lists:foreach( + fun(QName) -> + ?assertEqual( + {'queue.declare_ok', QName, 0, 0}, + amqp_channel:call( + Ch, #'queue.declare'{queue = QName, durable = true})) + end, [SourceQName, TargetQName]), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + + %% Declare a dynamic shovel on the old node. + ct:pal("Declaring queues on node ~s", [OldNode]), + VHost = <<"/">>, + ShovelName = <<"test-shovel">>, + shovel_test_utils:set_param( + Config, + OldNode, + NodeWithQueues, + ShovelName, + [{<<"src-queue">>, SourceQName}, + {<<"dest-queue">>, TargetQName}]), + + %% We declare the same shovel on a new node that won't be clustered with + %% the rest. It is only used as a reference node to determine which ID + %% format the new version is using. + ct:pal("Declaring queues on node ~s (as a reference)", [NewRefNode]), + shovel_test_utils:set_param( + Config, + NewRefNode, + NodeWithQueues, + ShovelName, + [{<<"src-queue">>, SourceQName}, + {<<"dest-queue">>, TargetQName}]), + + %% Verify the format of the child ID. Some versions of RabbitMQ 3.11.x and + %% 3.12.x use a temporary experimental format that was erroneously + %% backported from a work-in-progress happening in the main branch. + ct:pal("Checking mirrored_supervisor child ID formats"), + [{Id0, _, _, _}] = rabbit_ct_broker_helpers:rpc( + Config, NewRefNode, + mirrored_supervisor, which_children, + [rabbit_shovel_dyn_worker_sup_sup]), + PrimaryIdType = case Id0 of + {VHost, ShovelName} -> + ct:pal( + "The nodes from the primary umbrella are using " + "the NORMAL mirrored_supervisor child ID format " + "natively"), + normal; + {[VHost, ShovelName], {VHost, ShovelName}} -> + ct:pal( + "The nodes from the primary umbrella are using " + "the TEMPORARY EXPERIMENTAL mirrored_supervisor " + "child ID format natively"), + temp_exp + end, + + [{Id1, _, _, _}] = rabbit_ct_broker_helpers:rpc( + Config, OldNode, + mirrored_supervisor, which_children, + [rabbit_shovel_dyn_worker_sup_sup]), + SecondaryIdType = case Id1 of + {VHost, ShovelName} -> + ct:pal( + "The nodes from the secondary umbrella are " + "using the NORMAL mirrored_supervisor child " + "ID format natively"), + normal; + {[VHost, ShovelName], {VHost, ShovelName}} -> + ct:pal( + "The nodes from the secondary umbrella are " + "using the TEMPORARY EXPERIMENTAL " + "mirrored_supervisor child ID format " + "natively"), + temp_exp + end, + if + PrimaryIdType =/= SecondaryIdType -> + ct:pal( + "The mirrored_supervisor child ID format is changing between " + "the primary and the secondary umbrellas!"); + true -> + ok + end, + + %% Verify that the supervisors exist on all nodes. + ct:pal( + "Checking running mirrored_supervisor children on old node ~s", + [OldNode]), + lists:foreach( + fun(Node) -> + ?assertMatch( + [{Id, _, _, _}] + when (SecondaryIdType =:= normal andalso + Id =:= {VHost, ShovelName}) orelse + (SecondaryIdType =:= temp_exp andalso + Id =:= {[VHost, ShovelName], {VHost, ShovelName}}), + rabbit_ct_broker_helpers:rpc( + Config, Node, + mirrored_supervisor, which_children, + [rabbit_shovel_dyn_worker_sup_sup])) + end, [OldNode]), + + %% Simulate a rolling upgrade by: + %% 1. adding new nodes to the old cluster + %% 2. stopping the old nodes + %% + %% After that, the supervisors run on the new code. + ct:pal("Clustering nodes ~s and ~s", [OldNode, NewNode]), + Config1 = rabbit_ct_broker_helpers:cluster_nodes( + Config, [OldNode, NewNode]), + ok = rabbit_ct_broker_helpers:stop_broker(Config1, OldNode), + ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNode), + + shovel_test_utils:await_shovel(Config, NewNode, ShovelName), + + case ?config(metadata_store__manual, Config) of + mnesia -> + ok; + khepri -> + ok = rabbit_ct_broker_helpers:enable_feature_flag( + Config, [NewNode], khepri_db) + end, + + %% Verify that the supervisors still use the same IDs. + ct:pal( + "Checking running mirrored_supervisor children on new node ~s", + [NewNode]), + lists:foreach( + fun(Node) -> + ?assertMatch( + [{Id, _, _, _}] + when (SecondaryIdType =:= normal andalso + Id =:= {VHost, ShovelName}) orelse + (SecondaryIdType =:= temp_exp andalso + Id =:= {[VHost, ShovelName], {VHost, ShovelName}}), + rabbit_ct_broker_helpers:rpc( + Config1, Node, + mirrored_supervisor, which_children, + [rabbit_shovel_dyn_worker_sup_sup])) + end, [NewNode]), + + case ?config(metadata_store__manual, Config) of + mnesia -> + ok; + khepri -> + Path = rabbit_db_msup:khepri_mirrored_supervisor_path(), + ?assertMatch( + {ok, + #{[rabbit_db_msup, mirrored_supervisor_childspec, + rabbit_shovel_dyn_worker_sup_sup, VHost, ShovelName] := _}}, + rabbit_ct_broker_helpers:rpc( + Config, NewNode, rabbit_khepri, list, + [Path ++ [?KHEPRI_WILDCARD_STAR_STAR]])) + end. diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index d59f259cb6af..a5589290fa90 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -8,29 +8,44 @@ -module(shovel_test_utils). -include_lib("common_test/include/ct.hrl"). --export([set_param/3, set_param_nowait/3, await_shovel/2, await_shovel1/2, - shovels_from_status/0, get_shovel_status/2, - await/1, await/2, clear_param/2]). +-export([set_param/3, set_param/4, set_param/5, set_param_nowait/3, + await_shovel/2, await_shovel/3, await_shovel1/2, + shovels_from_status/0, get_shovel_status/2, get_shovel_status/3, + await/1, await/2, clear_param/2, clear_param/3]). -make_uri(Config) -> +make_uri(Config, Node) -> Hostname = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp), list_to_binary(lists:flatten(io_lib:format("amqp://~ts:~b", [Hostname, Port]))). + set_param(Config, Name, Value) -> - set_param_nowait(Config, Name, Value), - await_shovel(Config, Name). + set_param_nowait(Config, 0, 0, Name, Value), + await_shovel(Config, 0, Name). + +set_param(Config, Node, Name, Value) -> + set_param(Config, Node, Node, Name, Value). + +set_param(Config, Node, QueueNode, Name, Value) -> + set_param_nowait(Config, Node, QueueNode, Name, Value), + await_shovel(Config, Node, Name). set_param_nowait(Config, Name, Value) -> - Uri = make_uri(Config), - ok = rabbit_ct_broker_helpers:rpc(Config, 0, + set_param_nowait(Config, 0, 0, Name, Value). + +set_param_nowait(Config, Node, QueueNode, Name, Value) -> + Uri = make_uri(Config, QueueNode), + ok = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_runtime_parameters, set, [ <<"/">>, <<"shovel">>, Name, [{<<"src-uri">>, Uri}, {<<"dest-uri">>, [Uri]} | Value], none]). await_shovel(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, 0, + await_shovel(Config, 0, Name). + +await_shovel(Config, Node, Name) -> + rabbit_ct_broker_helpers:rpc(Config, Node, ?MODULE, await_shovel1, [Config, Name]). await_shovel1(_Config, Name) -> @@ -41,8 +56,11 @@ shovels_from_status() -> [N || {{<<"/">>, N}, dynamic, {running, _}, _} <- S]. get_shovel_status(Config, Name) -> + get_shovel_status(Config, 0, Name). + +get_shovel_status(Config, Node, Name) -> S = rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]), + Config, Node, rabbit_shovel_status, lookup, [{<<"/">>, Name}]), case S of not_found -> not_found; @@ -70,5 +88,8 @@ await(Pred, Timeout) -> end. clear_param(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, 0, + clear_param(Config, 0, Name). + +clear_param(Config, Node, Name) -> + rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, Name, <<"acting-user">>]). From 9d0e2aede00d3efba0c9648f634d0535251c24a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Tue, 13 Feb 2024 14:07:35 +0100 Subject: [PATCH 7/9] rabbitmq_shovel: Mark `rolling_upgrade_SUITE` as flaky in Bazel [Why] There is a bug in Khepri that prevents the mirrored supervisor from restarting its processes on the new node. This is unrelated to the shovel plugin or this testsuite. [How] Mark the testsuite as "flaky" until a solution is found in Khepri. --- deps/rabbitmq_shovel/BUILD.bazel | 3 +++ 1 file changed, 3 insertions(+) diff --git a/deps/rabbitmq_shovel/BUILD.bazel b/deps/rabbitmq_shovel/BUILD.bazel index aaad41c3c622..2fd0032bdb46 100644 --- a/deps/rabbitmq_shovel/BUILD.bazel +++ b/deps/rabbitmq_shovel/BUILD.bazel @@ -172,6 +172,9 @@ rabbitmq_integration_suite( deps = [ "@khepri//:erlang_app", ], + # FIXME: As of this writing, there is a bug in Khepri that makes this + # testsuite unstable. + flaky = True, ) rabbitmq_integration_suite( From 21975a5c25608496faf84e24de015cc537490863 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 1 Feb 2024 16:30:44 +0100 Subject: [PATCH 8/9] mirrored_supervisor: Restore child ID format [Why] The format was changed to be compatible with Khepri paths. However, this ID is used in in-memory states here and there as well. So changing its format makes upgrades complicated because the code has to handle both the old and new formats possibly used by the mirrored supervisor already running on other nodes. [How] Instead, this patch converts the ID (in its old format) to something compatible with a Khepri path only when we need to build a Khepri path. This relies on the fact that the `Group` is a module and we can call it to let it convert the opaque ID to a Khepri path. While here, improve the type specs to document that a group is always a module name and to document what a child ID can be. --- deps/rabbit/src/mirrored_supervisor.erl | 6 +++- deps/rabbit/src/rabbit_db_msup.erl | 32 +++++++++++-------- .../src/rabbit_db_msup_m2k_converter.erl | 4 +-- .../rabbit/test/mirrored_supervisor_SUITE.erl | 2 +- deps/rabbit/test/rabbit_db_msup_SUITE.erl | 2 +- ...abbit_federation_exchange_link_sup_sup.erl | 11 ++++--- .../rabbit_federation_queue_link_sup_sup.erl | 12 ++++--- 7 files changed, 41 insertions(+), 28 deletions(-) diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index 8b8f065a5812..1aa33413cbf5 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -137,7 +137,11 @@ -type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). -type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. --type group_name() :: any(). +-type group_name() :: module(). +-type child_id() :: term(). %% supervisor:child_id() is not exported. + +-export_type([group_name/0, + child_id/0]). -spec start_link(GroupName, Module, Args) -> startlink_ret() when GroupName :: group_name(), diff --git a/deps/rabbit/src/rabbit_db_msup.erl b/deps/rabbit/src/rabbit_db_msup.erl index 5ab135dbc892..3939efa6ae60 100644 --- a/deps/rabbit/src/rabbit_db_msup.erl +++ b/deps/rabbit/src/rabbit_db_msup.erl @@ -73,11 +73,11 @@ table_definitions() -> %% ------------------------------------------------------------------- -spec create_or_update(Group, Overall, Delegate, ChildSpec, Id) -> Ret when - Group :: any(), + Group :: mirrored_supervisor:group_name(), Overall :: pid(), Delegate :: pid() | undefined, ChildSpec :: supervisor2:child_spec(), - Id :: {any(), any()}, + Id :: mirrored_supervisor:child_id(), Ret :: start | undefined | pid(). create_or_update(Group, Overall, Delegate, ChildSpec, Id) -> @@ -129,8 +129,8 @@ write_in_mnesia(Group, Overall, ChildSpec, Id) -> ok = mnesia:write(?TABLE, S, write), ChildSpec. -create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} = Id) -> - Path = khepri_mirrored_supervisor_path(Group, SimpleId), +create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) -> + Path = khepri_mirrored_supervisor_path(Group, Id), S = #mirrored_sup_childspec{key = {Group, Id}, mirroring_pid = Overall, childspec = ChildSpec}, @@ -169,8 +169,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} = %% ------------------------------------------------------------------- -spec delete(Group, Id) -> ok when - Group :: any(), - Id :: any(). + Group :: mirrored_supervisor:group_name(), + Id :: mirrored_supervisor:child_id(). delete(Group, Id) -> rabbit_khepri:handle_fallback( @@ -184,16 +184,16 @@ delete_in_mnesia(Group, Id) -> ok = mnesia:delete({?TABLE, {Group, Id}}) end). -delete_in_khepri(Group, {SimpleId, _}) -> - ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, SimpleId)). +delete_in_khepri(Group, Id) -> + ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, Id)). %% ------------------------------------------------------------------- %% find_mirror(). %% ------------------------------------------------------------------- -spec find_mirror(Group, Id) -> Ret when - Group :: any(), - Id :: any(), + Group :: mirrored_supervisor:group_name(), + Id :: mirrored_supervisor:child_id(), Ret :: {ok, pid()} | {error, not_found}. find_mirror(Group, Id) -> @@ -214,8 +214,8 @@ find_mirror_in_mnesia(Group, Id) -> _ -> {error, not_found} end. -find_mirror_in_khepri(Group, {SimpleId, _}) -> - case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, SimpleId)) of +find_mirror_in_khepri(Group, Id) -> + case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, Id)) of {ok, #mirrored_sup_childspec{mirroring_pid = Pid}} -> {ok, Pid}; _ -> @@ -269,7 +269,7 @@ update_all_in_khepri(Overall, OldOverall) -> %% ------------------------------------------------------------------- -spec delete_all(Group) -> ok when - Group :: any(). + Group :: mirrored_supervisor:group_name(). delete_all(Group) -> rabbit_khepri:handle_fallback( @@ -324,5 +324,9 @@ clear_in_khepri() -> khepri_mirrored_supervisor_path() -> [?MODULE, mirrored_supervisor_childspec]. +khepri_mirrored_supervisor_path(Group, Id) + when is_atom(Id) orelse is_binary(Id) -> + [?MODULE, mirrored_supervisor_childspec, Group, Id]; khepri_mirrored_supervisor_path(Group, Id) -> - [?MODULE, mirrored_supervisor_childspec, Group] ++ Id. + IdPath = Group:id_to_khepri_path(Id), + [?MODULE, mirrored_supervisor_childspec, Group] ++ IdPath. diff --git a/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl b/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl index 58cdac1dae12..e57b030d7768 100644 --- a/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl +++ b/deps/rabbit/src/rabbit_db_msup_m2k_converter.erl @@ -46,13 +46,13 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) -> %% @private copy_to_khepri(mirrored_sup_childspec = Table, - #mirrored_sup_childspec{key = {Group, {SimpleId, _}} = Key} = Record, + #mirrored_sup_childspec{key = {Group, Id} = Key} = Record, State) -> ?LOG_DEBUG( "Mnesia->Khepri data copy: [~0p] key: ~0p", [Table, Key], #{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}), - Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, SimpleId), + Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, Id), rabbit_db_m2k_converter:with_correlation_id( fun(CorrId) -> Extra = #{async => CorrId}, diff --git a/deps/rabbit/test/mirrored_supervisor_SUITE.erl b/deps/rabbit/test/mirrored_supervisor_SUITE.erl index 07c7a27d2fae..7ce527a684fc 100644 --- a/deps/rabbit/test/mirrored_supervisor_SUITE.erl +++ b/deps/rabbit/test/mirrored_supervisor_SUITE.erl @@ -331,7 +331,7 @@ childspec(Id) -> {id(Id), {?SERVER, start_link, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}. id(Id) -> - {[Id], Id}. + Id. pid_of(Id) -> {received, Pid, ping} = call(Id, ping), diff --git a/deps/rabbit/test/rabbit_db_msup_SUITE.erl b/deps/rabbit/test/rabbit_db_msup_SUITE.erl index 382b7639888c..f4a3f9a7fe00 100644 --- a/deps/rabbit/test/rabbit_db_msup_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_msup_SUITE.erl @@ -87,7 +87,7 @@ create_or_update1(_Config) -> passed. id(Id) -> - {[Id], Id}. + Id. find_mirror(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, find_mirror1, [Config]). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl index 3bd9c4631d3a..63011ccd4359 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl @@ -17,6 +17,7 @@ -export([start_link/0, start_child/1, adjust/1, stop_child/1]). -export([init/1]). +-export([id_to_khepri_path/1]). %%---------------------------------------------------------------------------- @@ -49,12 +50,12 @@ start_child(X) -> adjust({clear_upstream, VHost, UpstreamName}) -> _ = [rabbit_federation_link_sup:adjust(Pid, X, {clear_upstream, UpstreamName}) || - {{_, #exchange{name = Name} = X}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), + {#exchange{name = Name} = X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), Name#resource.virtual_host == VHost], ok; adjust(Reason) -> _ = [rabbit_federation_link_sup:adjust(Pid, X, Reason) || - {{_, X}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], + {X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], ok. stop_child(X) -> @@ -77,7 +78,9 @@ init([]) -> %% See comment in rabbit_federation_queue_link_sup_sup:id/1 id(X = #exchange{policy = Policy}) -> X1 = rabbit_exchange:immutable(X), - {simple_id(X), X1#exchange{policy = Policy}}. + X2 = X1#exchange{policy = Policy}, + X2. -simple_id(#exchange{name = #resource{virtual_host = VHost, name = Name}}) -> +id_to_khepri_path( + #exchange{name = #resource{virtual_host = VHost, name = Name}}) -> [exchange, VHost, Name]. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl index 2461c8ed67b2..0b7f8adba2bc 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl @@ -18,6 +18,7 @@ -export([start_link/0, start_child/1, adjust/1, stop_child/1]). -export([init/1]). +-export([id_to_khepri_path/1]). %%---------------------------------------------------------------------------- @@ -51,12 +52,12 @@ start_child(Q) -> adjust({clear_upstream, VHost, UpstreamName}) -> _ = [rabbit_federation_link_sup:adjust(Pid, Q, {clear_upstream, UpstreamName}) || - {{_, Q}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), + {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), ?amqqueue_vhost_equals(Q, VHost)], ok; adjust(Reason) -> _ = [rabbit_federation_link_sup:adjust(Pid, Q, Reason) || - {{_, Q}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], + {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], ok. stop_child(Q) -> @@ -88,8 +89,9 @@ init([]) -> id(Q) when ?is_amqqueue(Q) -> Policy = amqqueue:get_policy(Q), Q1 = amqqueue:set_immutable(Q), - {simple_id(Q), amqqueue:set_policy(Q1, Policy)}. + Q2 = amqqueue:set_policy(Q1, Policy), + Q2. -simple_id(Q) when ?is_amqqueue(Q) -> - #resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q), +id_to_khepri_path(Id) when ?is_amqqueue(Id) -> + #resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Id), [queue, VHost, Name]. From 29f485854334752f526856169790aac1a14a0b3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Sun, 4 Feb 2024 14:33:34 +0100 Subject: [PATCH 9/9] rabbitmq_shovel: Restore original mirrored_supervisor child ID handling [Why] We don't need to change the mirrored_supervisor child ID format for Khepri. Unfortunately, the temporary experimental was erroneously backported to 3.11.x and 3.12.x releases... This broke the federation and shovel plugins during upgrades. [How] Here, we restore the original behavior, meaning that the ID stays as it was and we just modify it when we need a Khepri path. The code is updated to know about the temporary experimental format as well because it will be used by the latest 3.11.x and 3.12.x releases. --- .../src/rabbit_shovel_dyn_worker_sup_sup.erl | 102 +++++++++--------- .../src/rabbit_shovel_worker_sup.erl | 8 +- 2 files changed, 57 insertions(+), 53 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl index 2156417b937e..0b06bdb39090 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl @@ -9,6 +9,7 @@ -behaviour(mirrored_supervisor). -export([start_link/0, init/1, adjust/2, stop_child/1, cleanup_specs/0]). +-export([id_to_khepri_path/1]). -import(rabbit_misc, [pget/2]). -import(rabbit_data_coercion, [to_map/1, to_list/1]). @@ -61,10 +62,9 @@ obfuscated_uris_parameters(Def) when is_list(Def) -> child_exists(Name) -> Id = id(Name), - %% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894. - OldId = old_id(Name), + TmpExpId = temp_experimental_id(Name), lists:any(fun ({ChildId, _, _, _}) -> - ChildId =:= Id orelse ChildId =:= OldId + ChildId =:= Id orelse ChildId =:= TmpExpId end, mirrored_supervisor:which_children(?SUPERVISOR)). @@ -74,20 +74,14 @@ stop_child({VHost, ShovelName} = Name) -> case get({shovel_worker_autodelete, Name}) of true -> ok; %% [1] _ -> - case stop_and_delete_child(id(Name)) of + Id = id(Name), + case stop_and_delete_child(Id) of ok -> ok; {error, not_found} -> - case rabbit_khepri:is_enabled() of - true -> - %% Old id format is not supported by and cannot exist in Khepri - ok; - false -> - %% try older format, pre 3.13.0 and 3.12.8. - %% See rabbitmq/rabbitmq-server#9894. - _ = stop_and_delete_child(old_id(Name)), - ok - end + TmpExpId = temp_experimental_id(Name), + _ = stop_and_delete_child(TmpExpId), + ok end, rabbit_shovel_status:remove(Name) end, @@ -112,48 +106,54 @@ stop_and_delete_child(Id) -> cleanup_specs() -> Children = mirrored_supervisor:which_children(?SUPERVISOR), - - ChildIdSet = sets:from_list([element(1, S) || S <- Children]), - ParamsSet = params_to_child_ids(rabbit_khepri:is_enabled()), - F = fun(ChildId, ok) -> - try - %% The supervisor operation is very unlikely to fail, it's the schema - %% data stores that can make a fuss about a non-existent or non-standard value passed in. - %% For example, an old style Shovel name is an invalid Khepri query path element. MK. - _ = mirrored_supervisor:delete_child(?SUPERVISOR, ChildId) - catch _:_:_Stacktrace -> - ok - end, - ok - end, + ParamsSet = sets:from_list( + [id({proplists:get_value(vhost, S), + proplists:get_value(name, S)}) + || S <- rabbit_runtime_parameters:list_component( + <<"shovel">>)]), %% Delete any supervisor children that do not have their respective runtime parameters in the database. - SetToCleanUp = sets:subtract(ChildIdSet, ParamsSet), - ok = sets:fold(F, ok, SetToCleanUp). - -params_to_child_ids(_KhepriEnabled = true) -> - %% Old id format simply cannot exist in Khepri because having Khepri enabled - %% means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK. - sets:from_list([id({proplists:get_value(vhost, S), proplists:get_value(name, S)}) - || S <- rabbit_runtime_parameters:list_component(<<"shovel">>)]); -params_to_child_ids(_KhepriEnabled = false) -> - sets:from_list( - lists:flatmap( - fun(S) -> - Name = {proplists:get_value(vhost, S), proplists:get_value(name, S)}, - %% Supervisor Id format was different pre 3.13.0 and 3.12.8. - %% Try both formats to cover the transitionary mixed version cluster period. - [id(Name), old_id(Name)] - end, - rabbit_runtime_parameters:list_component(<<"shovel">>))). + lists:foreach( + fun + ({{VHost, ShovelName} = ChildId, _, _, _}) + when is_binary(VHost) andalso is_binary(ShovelName) -> + case sets:is_element(ChildId, ParamsSet) of + false -> + _ = mirrored_supervisor:delete_child( + ?SUPERVISOR, ChildId); + true -> + ok + end; + ({{List, {VHost, ShovelName} = Id} = ChildId, _, _, _}) + when is_list(List) andalso + is_binary(VHost) andalso is_binary(ShovelName) -> + case sets:is_element(Id, ParamsSet) of + false -> + _ = mirrored_supervisor:delete_child( + ?SUPERVISOR, ChildId); + true -> + ok + end + end, Children). %%---------------------------------------------------------------------------- init([]) -> {ok, {{one_for_one, 3, 10}, []}}. -id({V, S} = Name) -> - {[V, S], Name}. - -%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894 -old_id({_V, _S} = Name) -> +id({VHost, ShovelName} = Name) + when is_binary(VHost) andalso is_binary(ShovelName) -> Name. + +id_to_khepri_path({VHost, ShovelName}) + when is_binary(VHost) andalso is_binary(ShovelName) -> + [VHost, ShovelName]; +id_to_khepri_path({List, {VHost, ShovelName}}) + when is_list(List) andalso is_binary(VHost) andalso is_binary(ShovelName) -> + [VHost, ShovelName]. + +%% Temporary experimental format, erroneously backported to some 3.11.x and +%% 3.12.x releases in rabbitmq/rabbitmq-server#9796. +%% +%% See rabbitmq/rabbitmq-server#10306. +temp_experimental_id({V, S} = Name) -> + {[V, S], Name}. diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl index a4266e7573d9..50056d023042 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl @@ -9,6 +9,7 @@ -behaviour(mirrored_supervisor). -export([start_link/2, init/1]). +-export([id_to_khepri_path/1]). -include("rabbit_shovel.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -30,5 +31,8 @@ init([Name, Config]) -> [rabbit_shovel_worker]}], {ok, {{one_for_one, 1, ?MAX_WAIT}, ChildSpecs}}. -id(Name) -> - {[Name], Name}. +id(Name) when is_atom(Name) -> + Name. + +id_to_khepri_path(Name) when is_atom(Name) -> + [Name].