From 29f485854334752f526856169790aac1a14a0b3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Sun, 4 Feb 2024 14:33:34 +0100 Subject: [PATCH] rabbitmq_shovel: Restore original mirrored_supervisor child ID handling [Why] We don't need to change the mirrored_supervisor child ID format for Khepri. Unfortunately, the temporary experimental was erroneously backported to 3.11.x and 3.12.x releases... This broke the federation and shovel plugins during upgrades. [How] Here, we restore the original behavior, meaning that the ID stays as it was and we just modify it when we need a Khepri path. The code is updated to know about the temporary experimental format as well because it will be used by the latest 3.11.x and 3.12.x releases. --- .../src/rabbit_shovel_dyn_worker_sup_sup.erl | 102 +++++++++--------- .../src/rabbit_shovel_worker_sup.erl | 8 +- 2 files changed, 57 insertions(+), 53 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl index 2156417b937e..0b06bdb39090 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl @@ -9,6 +9,7 @@ -behaviour(mirrored_supervisor). -export([start_link/0, init/1, adjust/2, stop_child/1, cleanup_specs/0]). +-export([id_to_khepri_path/1]). -import(rabbit_misc, [pget/2]). -import(rabbit_data_coercion, [to_map/1, to_list/1]). @@ -61,10 +62,9 @@ obfuscated_uris_parameters(Def) when is_list(Def) -> child_exists(Name) -> Id = id(Name), - %% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894. - OldId = old_id(Name), + TmpExpId = temp_experimental_id(Name), lists:any(fun ({ChildId, _, _, _}) -> - ChildId =:= Id orelse ChildId =:= OldId + ChildId =:= Id orelse ChildId =:= TmpExpId end, mirrored_supervisor:which_children(?SUPERVISOR)). @@ -74,20 +74,14 @@ stop_child({VHost, ShovelName} = Name) -> case get({shovel_worker_autodelete, Name}) of true -> ok; %% [1] _ -> - case stop_and_delete_child(id(Name)) of + Id = id(Name), + case stop_and_delete_child(Id) of ok -> ok; {error, not_found} -> - case rabbit_khepri:is_enabled() of - true -> - %% Old id format is not supported by and cannot exist in Khepri - ok; - false -> - %% try older format, pre 3.13.0 and 3.12.8. - %% See rabbitmq/rabbitmq-server#9894. - _ = stop_and_delete_child(old_id(Name)), - ok - end + TmpExpId = temp_experimental_id(Name), + _ = stop_and_delete_child(TmpExpId), + ok end, rabbit_shovel_status:remove(Name) end, @@ -112,48 +106,54 @@ stop_and_delete_child(Id) -> cleanup_specs() -> Children = mirrored_supervisor:which_children(?SUPERVISOR), - - ChildIdSet = sets:from_list([element(1, S) || S <- Children]), - ParamsSet = params_to_child_ids(rabbit_khepri:is_enabled()), - F = fun(ChildId, ok) -> - try - %% The supervisor operation is very unlikely to fail, it's the schema - %% data stores that can make a fuss about a non-existent or non-standard value passed in. - %% For example, an old style Shovel name is an invalid Khepri query path element. MK. - _ = mirrored_supervisor:delete_child(?SUPERVISOR, ChildId) - catch _:_:_Stacktrace -> - ok - end, - ok - end, + ParamsSet = sets:from_list( + [id({proplists:get_value(vhost, S), + proplists:get_value(name, S)}) + || S <- rabbit_runtime_parameters:list_component( + <<"shovel">>)]), %% Delete any supervisor children that do not have their respective runtime parameters in the database. - SetToCleanUp = sets:subtract(ChildIdSet, ParamsSet), - ok = sets:fold(F, ok, SetToCleanUp). - -params_to_child_ids(_KhepriEnabled = true) -> - %% Old id format simply cannot exist in Khepri because having Khepri enabled - %% means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK. - sets:from_list([id({proplists:get_value(vhost, S), proplists:get_value(name, S)}) - || S <- rabbit_runtime_parameters:list_component(<<"shovel">>)]); -params_to_child_ids(_KhepriEnabled = false) -> - sets:from_list( - lists:flatmap( - fun(S) -> - Name = {proplists:get_value(vhost, S), proplists:get_value(name, S)}, - %% Supervisor Id format was different pre 3.13.0 and 3.12.8. - %% Try both formats to cover the transitionary mixed version cluster period. - [id(Name), old_id(Name)] - end, - rabbit_runtime_parameters:list_component(<<"shovel">>))). + lists:foreach( + fun + ({{VHost, ShovelName} = ChildId, _, _, _}) + when is_binary(VHost) andalso is_binary(ShovelName) -> + case sets:is_element(ChildId, ParamsSet) of + false -> + _ = mirrored_supervisor:delete_child( + ?SUPERVISOR, ChildId); + true -> + ok + end; + ({{List, {VHost, ShovelName} = Id} = ChildId, _, _, _}) + when is_list(List) andalso + is_binary(VHost) andalso is_binary(ShovelName) -> + case sets:is_element(Id, ParamsSet) of + false -> + _ = mirrored_supervisor:delete_child( + ?SUPERVISOR, ChildId); + true -> + ok + end + end, Children). %%---------------------------------------------------------------------------- init([]) -> {ok, {{one_for_one, 3, 10}, []}}. -id({V, S} = Name) -> - {[V, S], Name}. - -%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894 -old_id({_V, _S} = Name) -> +id({VHost, ShovelName} = Name) + when is_binary(VHost) andalso is_binary(ShovelName) -> Name. + +id_to_khepri_path({VHost, ShovelName}) + when is_binary(VHost) andalso is_binary(ShovelName) -> + [VHost, ShovelName]; +id_to_khepri_path({List, {VHost, ShovelName}}) + when is_list(List) andalso is_binary(VHost) andalso is_binary(ShovelName) -> + [VHost, ShovelName]. + +%% Temporary experimental format, erroneously backported to some 3.11.x and +%% 3.12.x releases in rabbitmq/rabbitmq-server#9796. +%% +%% See rabbitmq/rabbitmq-server#10306. +temp_experimental_id({V, S} = Name) -> + {[V, S], Name}. diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl index a4266e7573d9..50056d023042 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl @@ -9,6 +9,7 @@ -behaviour(mirrored_supervisor). -export([start_link/2, init/1]). +-export([id_to_khepri_path/1]). -include("rabbit_shovel.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -30,5 +31,8 @@ init([Name, Config]) -> [rabbit_shovel_worker]}], {ok, {{one_for_one, 1, ?MAX_WAIT}, ChildSpecs}}. -id(Name) -> - {[Name], Name}. +id(Name) when is_atom(Name) -> + Name. + +id_to_khepri_path(Name) when is_atom(Name) -> + [Name].