Skip to content

Commit

Permalink
Merge pull request #10472 from rabbitmq/rework-mirrored_supervisor-ch…
Browse files Browse the repository at this point in the history
…ild-id

Rework `mirrored_supervisor` child ID format
  • Loading branch information
michaelklishin authored Feb 13, 2024
2 parents ea2119e + 29f4858 commit c14bf13
Show file tree
Hide file tree
Showing 21 changed files with 608 additions and 399 deletions.
5 changes: 1 addition & 4 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_mirror_queue_slave.erl",
"src/rabbit_mirror_queue_sync.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_mnesia_to_khepri_record_converter.erl",
"src/rabbit_msg_file.erl",
"src/rabbit_msg_record.erl",
"src/rabbit_msg_store.erl",
Expand Down Expand Up @@ -433,7 +432,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_mirror_queue_slave.erl",
"src/rabbit_mirror_queue_sync.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_mnesia_to_khepri_record_converter.erl",
"src/rabbit_msg_file.erl",
"src/rabbit_msg_record.erl",
"src/rabbit_msg_store.erl",
Expand Down Expand Up @@ -543,7 +541,6 @@ def all_srcs(name = "all_srcs"):
"include/gm_specs.hrl",
"include/internal_user.hrl",
"include/mc.hrl",
"include/mirrored_supervisor.hrl",
"include/rabbit_global_counters.hrl",
"include/vhost.hrl",
"include/vhost_v2.hrl",
Expand All @@ -557,6 +554,7 @@ def all_srcs(name = "all_srcs"):
filegroup(
name = "private_hdrs",
srcs = [
"src/mirrored_supervisor.hrl",
"src/rabbit_feature_flags.hrl",
"src/rabbit_fifo.hrl",
"src/rabbit_fifo_dlx.hrl",
Expand Down Expand Up @@ -715,7 +713,6 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_mirror_queue_slave.erl",
"src/rabbit_mirror_queue_sync.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_mnesia_to_khepri_record_converter.erl",
"src/rabbit_msg_file.erl",
"src/rabbit_msg_record.erl",
"src/rabbit_msg_store.erl",
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbit/src/mirrored_supervisor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.

-type group_name() :: any().
-type group_name() :: module().
-type child_id() :: term(). %% supervisor:child_id() is not exported.

-export_type([group_name/0,
child_id/0]).

-spec start_link(GroupName, Module, Args) -> startlink_ret() when
GroupName :: group_name(),
Expand Down
File renamed without changes.
11 changes: 1 addition & 10 deletions deps/rabbit/src/rabbit_db_m2k_converter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
-include_lib("rabbit_common/include/rabbit.hrl").

%% Functions for `rabbit_db_*_m2k_converter' modules to call.
-export([with_correlation_id/2,
get_sub_state/2]).
-export([with_correlation_id/2]).

%% `mnesia_to_khepri_converter' callbacks.
-export([init_copy_to_khepri/4,
Expand Down Expand Up @@ -69,14 +68,6 @@ with_correlation_id(
run_async_fun(Fun, State0)
end.

-spec get_sub_state(Module, State) -> Ret when
Module :: module(),
State :: state(),
Ret :: any().

get_sub_state(Module, #?MODULE{sub_states = SubStates}) ->
maps:get(Module, SubStates).

%% `mnesia_to_khepri_converter' callbacks

-spec init_copy_to_khepri(StoreId, MigrationId, Tables, Migrations) ->
Expand Down
32 changes: 18 additions & 14 deletions deps/rabbit/src/rabbit_db_msup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ table_definitions() ->
%% -------------------------------------------------------------------

-spec create_or_update(Group, Overall, Delegate, ChildSpec, Id) -> Ret when
Group :: any(),
Group :: mirrored_supervisor:group_name(),
Overall :: pid(),
Delegate :: pid() | undefined,
ChildSpec :: supervisor2:child_spec(),
Id :: {any(), any()},
Id :: mirrored_supervisor:child_id(),
Ret :: start | undefined | pid().

create_or_update(Group, Overall, Delegate, ChildSpec, Id) ->
Expand Down Expand Up @@ -129,8 +129,8 @@ write_in_mnesia(Group, Overall, ChildSpec, Id) ->
ok = mnesia:write(?TABLE, S, write),
ChildSpec.

create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} = Id) ->
Path = khepri_mirrored_supervisor_path(Group, SimpleId),
create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
Path = khepri_mirrored_supervisor_path(Group, Id),
S = #mirrored_sup_childspec{key = {Group, Id},
mirroring_pid = Overall,
childspec = ChildSpec},
Expand Down Expand Up @@ -169,8 +169,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} =
%% -------------------------------------------------------------------

-spec delete(Group, Id) -> ok when
Group :: any(),
Id :: any().
Group :: mirrored_supervisor:group_name(),
Id :: mirrored_supervisor:child_id().

delete(Group, Id) ->
rabbit_khepri:handle_fallback(
Expand All @@ -184,16 +184,16 @@ delete_in_mnesia(Group, Id) ->
ok = mnesia:delete({?TABLE, {Group, Id}})
end).

delete_in_khepri(Group, {SimpleId, _}) ->
ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, SimpleId)).
delete_in_khepri(Group, Id) ->
ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, Id)).

%% -------------------------------------------------------------------
%% find_mirror().
%% -------------------------------------------------------------------

-spec find_mirror(Group, Id) -> Ret when
Group :: any(),
Id :: any(),
Group :: mirrored_supervisor:group_name(),
Id :: mirrored_supervisor:child_id(),
Ret :: {ok, pid()} | {error, not_found}.

find_mirror(Group, Id) ->
Expand All @@ -214,8 +214,8 @@ find_mirror_in_mnesia(Group, Id) ->
_ -> {error, not_found}
end.

find_mirror_in_khepri(Group, {SimpleId, _}) ->
case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, SimpleId)) of
find_mirror_in_khepri(Group, Id) ->
case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, Id)) of
{ok, #mirrored_sup_childspec{mirroring_pid = Pid}} ->
{ok, Pid};
_ ->
Expand Down Expand Up @@ -269,7 +269,7 @@ update_all_in_khepri(Overall, OldOverall) ->
%% -------------------------------------------------------------------

-spec delete_all(Group) -> ok when
Group :: any().
Group :: mirrored_supervisor:group_name().

delete_all(Group) ->
rabbit_khepri:handle_fallback(
Expand Down Expand Up @@ -324,5 +324,9 @@ clear_in_khepri() ->
khepri_mirrored_supervisor_path() ->
[?MODULE, mirrored_supervisor_childspec].

khepri_mirrored_supervisor_path(Group, Id)
when is_atom(Id) orelse is_binary(Id) ->
[?MODULE, mirrored_supervisor_childspec, Group, Id];
khepri_mirrored_supervisor_path(Group, Id) ->
[?MODULE, mirrored_supervisor_childspec, Group] ++ Id.
IdPath = Group:id_to_khepri_path(Id),
[?MODULE, mirrored_supervisor_childspec, Group] ++ IdPath.
57 changes: 7 additions & 50 deletions deps/rabbit/src/rabbit_db_msup_m2k_converter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% Copyright (c) 2022-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_db_msup_m2k_converter).
Expand All @@ -19,7 +19,7 @@
copy_to_khepri/3,
delete_from_khepri/3]).

-record(?MODULE, {record_converters :: [module()]}).
-record(?MODULE, {}).

-spec init_copy_to_khepri(StoreId, MigrationId, Tables) -> Ret when
StoreId :: khepri:store_id(),
Expand All @@ -33,8 +33,7 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) ->
%% Clean up any previous attempt to copy the Mnesia table to Khepri.
lists:foreach(fun clear_data_in_khepri/1, Tables),

Converters = discover_converters(?MODULE),
SubState = #?MODULE{record_converters = Converters},
SubState = #?MODULE{},
{ok, SubState}.

-spec copy_to_khepri(Table, Record, State) -> Ret when
Expand All @@ -47,17 +46,13 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) ->
%% @private

copy_to_khepri(mirrored_sup_childspec = Table,
#mirrored_sup_childspec{} = Record0,
#mirrored_sup_childspec{key = {Group, Id} = Key} = Record,
State) ->
#?MODULE{record_converters = Converters} =
rabbit_db_m2k_converter:get_sub_state(?MODULE, State),
Record = upgrade_record(Converters, Table, Record0),
#mirrored_sup_childspec{key = {Group, {SimpleId, _}} = Key} = Record,
?LOG_DEBUG(
"Mnesia->Khepri data copy: [~0p] key: ~0p",
[Table, Key],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, SimpleId),
Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, Id),
rabbit_db_m2k_converter:with_correlation_id(
fun(CorrId) ->
Extra = #{async => CorrId},
Expand All @@ -81,10 +76,8 @@ copy_to_khepri(Table, Record, State) ->
Reason :: any().
%% @private

delete_from_khepri(mirrored_sup_childspec = Table, Key0, State) ->
#?MODULE{record_converters = Converters} =
rabbit_db_m2k_converter:get_sub_state(?MODULE, State),
{Group, Id} = Key = upgrade_key(Converters, Table, Key0),
delete_from_khepri(
mirrored_sup_childspec = Table, {Group, Id} = Key, State) ->
?LOG_DEBUG(
"Mnesia->Khepri data delete: [~0p] key: ~0p",
[Table, Key],
Expand All @@ -109,39 +102,3 @@ clear_data_in_khepri(mirrored_sup_childspec) ->
ok -> ok;
Error -> throw(Error)
end.

%% Khepri paths don't support tuples or records, so the key part of the
%% #mirrored_sup_childspec{} used by some plugins must be transformed in a
%% valid Khepri path during the migration from Mnesia to Khepri.
%% `rabbit_db_msup_m2k_converter` iterates over all declared converters, which
%% must implement `rabbit_mnesia_to_khepri_record_converter` behaviour callbacks.
%%
%% This mechanism could be reused by any other rabbit_db_*_m2k_converter

discover_converters(MigrationMod) ->
Apps = rabbit_misc:rabbitmq_related_apps(),
AttrsPerApp = rabbit_misc:module_attributes_from_apps(
rabbit_mnesia_records_to_khepri_db, Apps),
discover_converters(MigrationMod, AttrsPerApp, []).

discover_converters(MigrationMod, [{_App, _AppMod, AppConverters} | Rest],
Converters0) ->
Converters =
lists:foldl(fun({Module, Mod}, Acc) when Module =:= MigrationMod ->
[Mod | Acc];
(_, Acc) ->
Acc
end, Converters0, AppConverters),
discover_converters(MigrationMod, Rest, Converters);
discover_converters(_MigrationMod, [], Converters) ->
Converters.

upgrade_record(Converters, Table, Record) ->
lists:foldl(fun(Mod, Record0) ->
Mod:upgrade_record(Table, Record0)
end, Record, Converters).

upgrade_key(Converters, Table, Key) ->
lists:foldl(fun(Mod, Key0) ->
Mod:upgrade_key(Table, Key0)
end, Key, Converters).
24 changes: 0 additions & 24 deletions deps/rabbit/src/rabbit_mnesia_to_khepri_record_converter.erl

This file was deleted.

2 changes: 1 addition & 1 deletion deps/rabbit/test/mirrored_supervisor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ childspec(Id) ->
{id(Id), {?SERVER, start_link, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.

id(Id) ->
{[Id], Id}.
Id.

pid_of(Id) ->
{received, Pid, ping} = call(Id, ping),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/rabbit_db_msup_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ create_or_update1(_Config) ->
passed.

id(Id) ->
{[Id], Id}.
Id.

find_mirror(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, find_mirror1, [Config]).
Expand Down
55 changes: 31 additions & 24 deletions deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1089,8 +1089,14 @@ stop_rabbitmq_nodes(Config) ->
case FindCrashes of
true ->
%% TODO: Make the ignore list configurable.
IgnoredCrashes = ["** force_vhost_failure"],
find_crashes_in_logs(NodeConfigs, IgnoredCrashes);
IgnoredCrashes0 = ["** force_vhost_failure"],
case rabbit_ct_helpers:get_config(Config, ignored_crashes) of
undefined ->
find_crashes_in_logs(NodeConfigs, IgnoredCrashes0);
IgnoredCrashes1 ->
find_crashes_in_logs(
NodeConfigs, IgnoredCrashes0 ++ IgnoredCrashes1)
end;
false ->
ok
end,
Expand Down Expand Up @@ -1172,7 +1178,11 @@ capture_gen_server_termination(
Ret = re:run(Line, Prefix ++ "( .*|\\*.*|)$", ReOpts),
case Ret of
{match, [Suffix]} ->
case lists:member(Suffix, IgnoredCrashes) of
Ignore = lists:any(
fun(IgnoredCrash) ->
string:find(Suffix, IgnoredCrash) =/= nomatch
end, IgnoredCrashes),
case Ignore of
false ->
capture_gen_server_termination(
Rest, Prefix, [Line | Acc], Count, IgnoredCrashes);
Expand Down Expand Up @@ -1259,31 +1269,28 @@ rabbitmqctl(Config, Node, Args, Timeout) ->
_ ->
CanUseSecondary
end,
WithPlugins0 = rabbit_ct_helpers:get_config(Config,
broker_with_plugins),
WithPlugins = case is_list(WithPlugins0) of
true -> lists:nth(I + 1, WithPlugins0);
false -> WithPlugins0
end,
Rabbitmqctl = case UseSecondaryUmbrella of
true ->
case BazelRunSecCmd of
undefined ->
SrcDir = ?config(
secondary_rabbit_srcdir,
Config),
SecDepsDir = ?config(
secondary_erlang_mk_depsdir,
Config),
SecNewScriptsDir = filename:join(
[SecDepsDir,
SrcDir,
"sbin"]),
SecOldScriptsDir = filename:join(
[SecDepsDir,
"rabbit",
"scripts"]),
SecNewScriptsDirExists = filelib:is_dir(
SecNewScriptsDir),
SecScriptsDir =
case SecNewScriptsDirExists of
true -> SecNewScriptsDir;
false -> SecOldScriptsDir
end,
SrcDir = case WithPlugins of
false ->
?config(
secondary_rabbit_srcdir,
Config);
_ ->
?config(
secondary_current_srcdir,
Config)
end,
SecScriptsDir = filename:join(
[SrcDir, "sbin"]),
rabbit_misc:format(
"~ts/rabbitmqctl", [SecScriptsDir]);
_ ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_federation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ rabbitmq_integration_suite(
"test/rabbit_federation_test_util.beam",
],
flaky = True,
shard_count = 2,
shard_count = 3,
)

rabbitmq_integration_suite(
Expand Down
Loading

0 comments on commit c14bf13

Please sign in to comment.