Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mas o32 upstream.d31 #9

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/leveled.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
%%% Non-configurable startup defaults
%%%============================================================================
-define(MAX_SSTSLOTS, 256).
-define(MAX_MERGEBELOW, 24).
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(CACHE_SIZE_JITTER, 25).
Expand Down Expand Up @@ -107,7 +108,8 @@
press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
log_options = leveled_log:get_opts()
:: leveled_log:log_options(),
max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
max_sstslots = ?MAX_SSTSLOTS :: pos_integer()|infinity,
max_mergebelow = ?MAX_MERGEBELOW :: pos_integer()|infinity,
pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP
:: pos_integer(),
monitor = {no_monitor, 0}
Expand Down
11 changes: 10 additions & 1 deletion src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
{max_journalsize, 1000000000},
{max_journalobjectcount, 200000},
{max_sstslots, 256},
{max_mergebelow, 24},
{sync_strategy, ?DEFAULT_SYNC_STRATEGY},
{head_only, false},
{waste_retention_period, undefined},
Expand Down Expand Up @@ -201,6 +202,12 @@
% The maximum number of slots in a SST file. All testing is done
% at a size of 256 (except for Quickcheck tests), altering this
% value is not recommended
{max_mergebelow, pos_integer()|infinity} |
% The maximum number of files in the level below that a single file
% may be merged into within the ledger. If fewer files than this
% overlap, the merge will continue without a maximum. If this many
% or more files overlap in the level below, only up to
% max_mergebelow div 2 additions should be created (the merge
% should be partial)
{sync_strategy, sync_mode()} |
% Should be sync if it is necessary to flush to disk after every
% write, or none if not (allow the OS to schedule). This has a
Expand Down Expand Up @@ -293,7 +300,7 @@
% To which level of the ledger should the ledger contents be
% pre-loaded into the pagecache (using fadvise on creation and
% startup)
{compression_method, native|lz4|zstd|none} |
{compression_method, native|lz4|zstd|none} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4 or zstd).
Expand Down Expand Up @@ -1836,6 +1843,7 @@ set_options(Opts, Monitor) ->
CompressionLevel = proplists:get_value(compression_level, Opts),

MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
MaxMergeBelow = proplists:get_value(max_mergebelow, Opts),

ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),

Expand Down Expand Up @@ -1869,6 +1877,7 @@ set_options(Opts, Monitor) ->
press_level = CompressionLevel,
log_options = leveled_log:get_opts(),
max_sstslots = MaxSSTSlots,
max_mergebelow = MaxMergeBelow,
monitor = Monitor},
monitor = Monitor}
}.
Expand Down
4 changes: 3 additions & 1 deletion src/leveled_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
pc010 =>
{info, <<"Merge to be commenced for FileToMerge=~s with MSN=~w">>},
pc011 =>
{info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w">>},
{info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w merge_type=~w">>},
pc012 =>
{debug, <<"File to be created as part of MSN=~w Filename=~s IsBasement=~w">>},
pc013 =>
Expand All @@ -172,6 +172,8 @@
{info, <<"Grooming compaction picked file with tomb_count=~w">>},
pc025 =>
{info, <<"At level=~w file_count=~w average words for heap_block_size=~w heap_size=~w recent_size=~w bin_vheap_size=~w">>},
pc026 =>
{info, <<"Performing potential partial to level=~w merge as FileCounter=~w restricting to MaxAdditions=~w">>},
pm002 =>
{info, <<"Completed dump of L0 cache to list of l0cache_size=~w">>},
sst03 =>
Expand Down
220 changes: 168 additions & 52 deletions src/leveled_pclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,61 +262,177 @@ perform_merge(Manifest,
MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
SinkLevel = SrcLevel + 1,
SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
Additions =
do_merge(SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
OptsSST,
[]),
RevertPointerFun =
fun({next, ME, _SK}) ->
ME
MaxMergeBelow = OptsSST#sst_options.max_mergebelow,
MergeLimit = merge_limit(SrcLevel, length(SinkList), MaxMergeBelow),
{L2Additions, L1Additions, L2FileRemainder} =
do_merge(
SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
OptsSST,
[],
MergeLimit
),
RevertPointerFun = fun({next, ME, _SK}) -> ME end,
SinkManifestRemovals =
lists:subtract(
lists:map(RevertPointerFun, SinkList),
lists:map(RevertPointerFun, L2FileRemainder)
),
Man0 =
leveled_pmanifest:replace_manifest_entry(
Manifest,
NewSQN,
SinkLevel,
SinkManifestRemovals,
L2Additions
),
Man1 =
case L1Additions of
[] ->
leveled_pmanifest:remove_manifest_entry(
Man0,
NewSQN,
SrcLevel,
Src
);
PartialFiles ->
leveled_pmanifest:replace_manifest_entry(
Man0,
NewSQN,
SrcLevel,
[Src],
PartialFiles
)
end,
SinkManifestList = lists:map(RevertPointerFun, SinkList),
Man0 = leveled_pmanifest:replace_manifest_entry(Manifest,
NewSQN,
SinkLevel,
SinkManifestList,
Additions),
Man2 = leveled_pmanifest:remove_manifest_entry(Man0,
NewSQN,
SrcLevel,
Src),
{Man2, [Src|SinkManifestList]}.

do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) ->
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions)]),
Additions;
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
FileName = leveled_penciller:sst_filename(NewSQN,
SinkLevel,
length(Additions)),
{Man1, [Src|SinkManifestRemovals]}.

%% @doc
%% Decide how many sink-level files a merge may create before the remainder
%% has to be handled as a partial merge.
%% Arguments are the level of the source file, the number of sink-level files
%% selected for the merge, and the configured max_mergebelow value.
%% Returns infinity (no cap) when the source level is 0 or 1, or when the
%% sink file count is below max_mergebelow. Otherwise returns half of
%% max_mergebelow (at least 1), and logs pc026 with the sink level.
-spec merge_limit(
    non_neg_integer(), non_neg_integer(), pos_integer()|infinity)
        -> pos_integer()|infinity.
merge_limit(SrcLevel, _OverlapCount, _MaxBelow) when SrcLevel =< 1 ->
    infinity;
merge_limit(_SrcLevel, OverlapCount, MaxBelow) when OverlapCount < MaxBelow ->
    % In Erlang term order every number sorts before every atom, so this
    % clause also matches whenever MaxBelow is the atom infinity
    infinity;
merge_limit(SrcLevel, OverlapCount, MaxBelow) when is_integer(MaxBelow) ->
    SinkLevel = SrcLevel + 1,
    Cap = max(1, MaxBelow div 2),
    leveled_log:log(pc026, [SinkLevel, OverlapCount, Cap]),
    Cap.

-type merge_maybe_expanded_pointer() ::
leveled_codec:ledger_kv()|
leveled_sst:slot_pointer()|
leveled_sst:sst_pointer().
% Different to leveled_sst:maybe_expanded_pointer/0
% No sst_closed_pointer()

do_merge(
[], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions, _Max) ->
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]),
{lists:reverse(Additions), [], []};
do_merge(
KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max)
when length(Additions) >= Max ->
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]),
FNSrc =
leveled_penciller:sst_filename(
NewSQN, SinkLevel - 1, 1
),
FNSnk =
leveled_penciller:sst_filename(
NewSQN, SinkLevel, length(Additions) + 1
),
{ExpandedKL1, []} = split_unexpanded_files(KL1),
{ExpandedKL2, L2FilePointersRem} = split_unexpanded_files(KL2),
TS1 = os:timestamp(),
InfOpts = OptsSST#sst_options{max_sstslots = infinity},
% Need to be careful to make sure all the remainder goes in one file,
% could be situations whereby the max_sstslots has been changed between
% restarts - and so there is too much data for one file in the
% remainder ... but don't want to loop round and consider more complex
% scenarios here.
NewMergeKL1 =
leveled_sst:sst_newmerge(
RP, FNSrc,ExpandedKL1, [], false, SinkLevel - 1, MaxSQN, InfOpts
),
TS2 = os:timestamp(),
NewMergeKL2 =
leveled_sst:sst_newmerge(
RP, FNSnk, [], ExpandedKL2, SinkB, SinkLevel, MaxSQN, InfOpts
),
{KL1Additions, [], []} = add_entry(NewMergeKL1, FNSrc, TS1, []),
{KL2Additions, [], []} = add_entry(NewMergeKL2, FNSnk, TS2, Additions),
{lists:reverse(KL2Additions), KL1Additions, L2FilePointersRem};
do_merge(
KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max) ->
FileName =
leveled_penciller:sst_filename(
NewSQN, SinkLevel, length(Additions)
),
leveled_log:log(pc012, [NewSQN, FileName, SinkB]),
TS1 = os:timestamp(),
case leveled_sst:sst_newmerge(RP, FileName,
KL1, KL2, SinkB, SinkLevel, MaxSQN,
OptsSST) of
empty ->
leveled_log:log(pc013, [FileName]),
do_merge([], [],
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
OptsSST,
Additions);
{ok, Pid, Reply, Bloom} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
Entry = #manifest_entry{start_key=SmallestKey,
end_key=HighestKey,
owner=Pid,
filename=FileName,
bloom=Bloom},
leveled_log:log_timer(pc015, [], TS1),
do_merge(KL1Rem, KL2Rem,
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
OptsSST,
Additions ++ [Entry])
end.
NewMerge =
leveled_sst:sst_newmerge(
RP, FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN, OptsSST),
{UpdAdditions, KL1Rem, KL2Rem} =
add_entry(NewMerge, FileName, TS1, Additions),
do_merge(
KL1Rem,
KL2Rem,
SinkLevel,
SinkB,
RP,
NewSQN,
MaxSQN,
OptsSST,
UpdAdditions,
Max
).

%% @doc
%% Fold the result of a leveled_sst:sst_newmerge/8 call into the accumulated
%% list of manifest additions.
%%
%% Returns {UpdatedAdditions, KL1Remainder, KL2Remainder}:
%% - when the merge produced no file (empty), pc013 is logged, the additions
%%   are returned unchanged and both remainders are empty;
%% - when a file was written, a manifest entry for it is prepended to the
%%   additions and the unmerged remainders reported by the SST are returned.
%%
%% The additions are accumulated most-recent-first; callers reverse the list
%% once the merge is complete.
add_entry(empty, FileName, _TS1, Additions) ->
    leveled_log:log(pc013, [FileName]),
    % The additions must stay in the first position of the tuple, as in the
    % clause below - callers match on {Additions, KL1Rem, KL2Rem}. Returning
    % them in the third position would discard every file created so far and
    % feed manifest entries back in as if they were unmerged sink keys.
    {Additions, [], []};
add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
    {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
    Entry =
        #manifest_entry{
            start_key=SmallestKey,
            end_key=HighestKey,
            owner=Pid,
            filename=FileName,
            bloom=Bloom
        },
    leveled_log:log_timer(pc015, [], TS1),
    {[Entry|Additions], KL1Rem, KL2Rem}.


%% @doc
%% Partition a merge input list into its leading part ({Key, Value} pairs and
%% {pointer, ...} slot pointers) and its trailing part ({next, ...} file
%% pointers). Relative order is preserved within each returned list.
-spec split_unexpanded_files(
    list(merge_maybe_expanded_pointer())) ->
        {
            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
            list(leveled_sst:sst_pointer())
        }.
split_unexpanded_files(Pointers) ->
    split_unexpanded_files(Pointers, [], []).

%% Accumulator form of split_unexpanded_files/1. Both accumulators are built
%% in reverse and flipped when the input is exhausted.
-spec split_unexpanded_files(
    list(merge_maybe_expanded_pointer()),
    list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
    list(leveled_sst:sst_pointer())) ->
        {
            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
            list(leveled_sst:sst_pointer())
        }.
split_unexpanded_files([{next, _, _} = FilePtr|Tail], ExpandedAcc, FileAcc) ->
    split_unexpanded_files(Tail, ExpandedAcc, [FilePtr|FileAcc]);
split_unexpanded_files([{_, _} = KV|Tail], ExpandedAcc, []) ->
    % Key/values are only accepted while no file pointer has been collected;
    % one appearing after a file pointer matches no clause and so crashes
    split_unexpanded_files(Tail, [KV|ExpandedAcc], []);
split_unexpanded_files(
        [{pointer, _, _, _, _} = SlotPtr|Tail], ExpandedAcc, []) ->
    % Same restriction as for key/values - slot pointers must precede all
    % file pointers
    split_unexpanded_files(Tail, [SlotPtr|ExpandedAcc], []);
split_unexpanded_files([], ExpandedAcc, FileAcc) ->
    {lists:reverse(ExpandedAcc), lists:reverse(FileAcc)}.

-spec grooming_scorer(
list(leveled_pmanifest:manifest_entry()))
Expand Down
52 changes: 38 additions & 14 deletions src/leveled_sst.erl
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,17 @@

-type build_timings() :: no_timing|#build_timings{}.

-export_type([expandable_pointer/0, press_method/0, segment_check_fun/0]).
-export_type(
[
expandable_pointer/0,
sst_closed_pointer/0,
sst_pointer/0,
slot_pointer/0,
press_method/0,
segment_check_fun/0,
sst_options/0
]
).

%%%============================================================================
%%% API
Expand Down Expand Up @@ -292,17 +302,29 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
{ok, Pid, {SK, EK}, Bloom}
end.

-spec sst_newmerge(string(), string(),
list(leveled_codec:ledger_kv()|sst_pointer()),
list(leveled_codec:ledger_kv()|sst_pointer()),
boolean(), leveled_pmanifest:lsm_level(),
integer(), sst_options())
-> empty|{ok, pid(),
{{list(leveled_codec:ledger_kv()),
list(leveled_codec:ledger_kv())},
leveled_codec:ledger_key(),
leveled_codec:ledger_key()},
binary()}.
-spec sst_newmerge(
string(), string(),
list(leveled_codec:ledger_kv()|sst_pointer()),
list(leveled_codec:ledger_kv()|sst_pointer()),
boolean(),
leveled_pmanifest:lsm_level(),
integer(),
sst_options())
->
empty|
{
ok,
pid(),
{
{
list(leveled_codec:ledger_kv()),
list(leveled_codec:ledger_kv())
},
leveled_codec:ledger_key(),
leveled_codec:ledger_key()
},
binary()
}.
%% @doc
%% Start a new SST file at the assigned level passing in a two lists of
%% {Key, Value} pairs to be merged. The merge_lists function will use the
Expand Down Expand Up @@ -1433,7 +1455,9 @@ compress_level(_Level, _LevelToCompress, PressMethod) ->
PressMethod.

-spec maxslots_level(
leveled_pmanifest:lsm_level(), pos_integer()) -> pos_integer().
leveled_pmanifest:lsm_level(), pos_integer()|infinity) -> pos_integer()|infinity.
maxslots_level(_Level, infinity) ->
infinity;
maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL ->
MaxSlotCount;
maxslots_level(_Level, MaxSlotCount) ->
Expand Down Expand Up @@ -2787,7 +2811,7 @@ merge_lists(
list(binary_slot()),
leveled_codec:ledger_key()|null,
non_neg_integer(),
non_neg_integer(),
pos_integer()|infinity,
press_method(),
boolean(),
non_neg_integer()|not_counted,
Expand Down
Loading
Loading