diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 211abc1ef..122ede81f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -76,6 +76,7 @@ jobs: ar_tx_db, ar_unbalanced_merkle, ar_util, + ar_verify_chunks, ar_wallet, ar_webhook, ar_pool, diff --git a/apps/arweave/include/ar_config.hrl b/apps/arweave/include/ar_config.hrl index 3fdaef380..eedc1ffc2 100644 --- a/apps/arweave/include/ar_config.hrl +++ b/apps/arweave/include/ar_config.hrl @@ -119,6 +119,7 @@ init = false, port = ?DEFAULT_HTTP_IFACE_PORT, mine = false, + verify = false, peers = [], block_gossip_peers = [], local_peers = [], diff --git a/apps/arweave/include/ar_poa.hrl b/apps/arweave/include/ar_poa.hrl new file mode 100644 index 000000000..11eccd05d --- /dev/null +++ b/apps/arweave/include/ar_poa.hrl @@ -0,0 +1,26 @@ +-ifndef(AR_POA_HRL). +-define(AR_POA_HRL, true). + +-record(chunk_proof, { + absolute_offset :: non_neg_integer(), + tx_root :: binary(), + tx_path :: binary(), + data_root :: binary(), + data_path :: binary(), + tx_start_offset :: non_neg_integer(), + tx_end_offset :: non_neg_integer(), + block_start_offset :: non_neg_integer(), + block_end_offset :: non_neg_integer(), + chunk_id :: binary(), + chunk_start_offset :: non_neg_integer(), + chunk_end_offset :: non_neg_integer(), + validate_data_path_ruleset :: + 'offset_rebase_support_ruleset' | + 'strict_data_split_ruleset' | + 'strict_borders_ruleset', + tx_path_is_valid = not_validated :: 'not_validated' | 'valid' | 'invalid', + data_path_is_valid = not_validated :: 'not_validated' | 'valid' | 'invalid', + chunk_is_valid = not_validated :: 'not_validated' | 'valid' | 'invalid' +}). + +-endif. diff --git a/apps/arweave/include/ar_verify_chunks.hrl b/apps/arweave/include/ar_verify_chunks.hrl new file mode 100644 index 000000000..9ab4fe12e --- /dev/null +++ b/apps/arweave/include/ar_verify_chunks.hrl @@ -0,0 +1,14 @@ +-ifndef(AR_VERIFY_CHUNKS_HRL). +-define(AR_VERIFY_CHUNKS_HRL, true). + +-record(verify_report, { + start_time :: non_neg_integer(), + total_error_bytes = 0 :: non_neg_integer(), + total_error_chunks = 0 :: non_neg_integer(), + error_bytes = #{} :: #{atom() => non_neg_integer()}, + error_chunks = #{} :: #{atom() => non_neg_integer()}, + bytes_processed = 0 :: non_neg_integer(), + progress = 0 :: non_neg_integer() +}). + +-endif. diff --git a/apps/arweave/src/ar.erl b/apps/arweave/src/ar.erl index 0de0e7be1..360dea491 100644 --- a/apps/arweave/src/ar.erl +++ b/apps/arweave/src/ar.erl @@ -342,7 +342,8 @@ show_help() -> "Useful if you have multiple machines (or replicas) " "and you want to monitor them separately on pool"}, {"rocksdb_flush_interval", "RocksDB flush interval in seconds"}, - {"rocksdb_wal_sync_interval", "RocksDB WAL sync interval in seconds"} + {"rocksdb_wal_sync_interval", "RocksDB WAL sync interval in seconds"}, + {"verify", "Run in verify mode. In verify mode, the node won't join the network and won't compute or request VDF steps. Also no mining or packing."} ] ), erlang:halt(). 
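Note (illustrative, not part of the patch): the new #chunk_proof record above tracks each validation step with a tri-state flag instead of a single boolean, so callers can tell which step failed or was never reached. A minimal sketch of collapsing those flags into one verdict; proof_verdict/1 is a hypothetical helper, not an API introduced by this diff.

-include_lib("arweave/include/ar_poa.hrl").

%% Hypothetical helper: reduce the tri-state flags of a #chunk_proof to one verdict.
proof_verdict(#chunk_proof{ tx_path_is_valid = invalid }) -> {invalid, tx_path};
proof_verdict(#chunk_proof{ data_path_is_valid = invalid }) -> {invalid, data_path};
proof_verdict(#chunk_proof{ chunk_is_valid = invalid }) -> {invalid, chunk};
proof_verdict(#chunk_proof{ tx_path_is_valid = valid, data_path_is_valid = valid,
		chunk_is_valid = valid }) -> valid;
proof_verdict(_) -> not_validated.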
@@ -373,6 +374,8 @@ read_config_from_file(Path) -> parse_cli_args([], C) -> C; parse_cli_args(["mine" | Rest], C) -> parse_cli_args(Rest, C#config{ mine = true }); +parse_cli_args(["verify" | Rest], C) -> + parse_cli_args(Rest, C#config{ verify = true }); parse_cli_args(["peer", Peer | Rest], C = #config{ peers = Ps }) -> case ar_util:safe_parse_peer(Peer) of {ok, ValidPeer} -> @@ -675,10 +678,11 @@ start(Config) -> timer:sleep(2000), erlang:halt() end, - ok = application:set_env(arweave, config, Config), - filelib:ensure_dir(Config#config.log_dir ++ "/"), + Config2 = ar_config:set_dependent_flags(Config), + ok = application:set_env(arweave, config, Config2), + filelib:ensure_dir(Config2#config.log_dir ++ "/"), warn_if_single_scheduler(), - case Config#config.nonce_limiter_server_trusted_peers of + case Config2#config.nonce_limiter_server_trusted_peers of [] -> VDFSpeed = ar_bench_vdf:run_benchmark(), ?LOG_INFO([{event, vdf_benchmark}, {vdf_s, VDFSpeed / 1000000}]); diff --git a/apps/arweave/src/ar_block_index.erl b/apps/arweave/src/ar_block_index.erl index ce482565f..64286d29f 100644 --- a/apps/arweave/src/ar_block_index.erl +++ b/apps/arweave/src/ar_block_index.erl @@ -1,7 +1,7 @@ -module(ar_block_index). -export([init/1, update/2, member/1, get_list/1, get_list_by_hash/1, get_element_by_height/1, - get_block_bounds/1, get_intersection/2, get_intersection/1, get_range/2]). + get_block_bounds/1, get_intersection/2, get_intersection/1, get_range/2, get_last/0]). %%%=================================================================== %%% Public interface. @@ -96,6 +96,10 @@ get_range(Start, End) -> {error, invalid_start} end. +%% @doc Return the last element in the block index. +get_last() -> + ets:last(block_index). + %%%=================================================================== %%% Private functions. %%%=================================================================== diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index c8fde4c2b..f0e34a4d9 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -1,9 +1,9 @@ -%%% The blob storage optimized for fast reads. +%% The blob storage optimized for fast reads. -module(ar_chunk_storage). -behaviour(gen_server). --export([start_link/2, put/2, put/3, +-export([start_link/2, name/1, is_storage_supported/3, put/2, put/3, open_files/1, get/1, get/2, get/5, read_chunk2/5, get_range/2, get_range/3, close_file/2, close_files/1, cut/2, delete/1, delete/2, list_files/2, run_defragmentation/0]). @@ -36,16 +36,44 @@ start_link(Name, StoreID) -> gen_server:start_link({local, Name}, ?MODULE, StoreID, []). +%% @doc Return the name of the server serving the given StoreID. +name(StoreID) -> + list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)). + +%% @doc Return true if we can accept the chunk for storage. +%% 256 KiB chunks are stored in the blob storage optimized for read speed. +%% Unpacked chunks smaller than 256 KiB cannot be stored here currently, +%% because the module does not keep track of the chunk sizes - all chunks +%% are assumed to be 256 KiB. +-spec is_storage_supported( + Offset :: non_neg_integer(), + ChunkSize :: non_neg_integer(), + Packing :: term() +) -> true | false. + +is_storage_supported(Offset, ChunkSize, Packing) -> + case Offset > ?STRICT_DATA_SPLIT_THRESHOLD of + true -> + %% All chunks above ?STRICT_DATA_SPLIT_THRESHOLD are placed in 256 KiB buckets + %% so technically can be stored in ar_chunk_storage. 
However, to avoid + %% managing padding in ar_chunk_storage for unpacked chunks smaller than 256 KiB + %% (we do not need fast random access to unpacked chunks after + %% ?STRICT_DATA_SPLIT_THRESHOLD anyways), we put them to RocksDB. + Packing /= unpacked orelse ChunkSize == (?DATA_CHUNK_SIZE); + false -> + ChunkSize == (?DATA_CHUNK_SIZE) + end. + %% @doc Store the chunk under the given end offset, -%% bytes Offset - ?DATA_CHUNK_SIIZE, Offset - ?DATA_CHUNK_SIIZE + 1, .., Offset - 1. -put(Offset, Chunk) -> - put(Offset, Chunk, "default"). +%% bytes Offset - ?DATA_CHUNK_SIZE, Offset - ?DATA_CHUNK_SIZE + 1, .., Offset - 1. +put(PaddedOffset, Chunk) -> + put(PaddedOffset, Chunk, "default"). %% @doc Store the chunk under the given end offset, -%% bytes Offset - ?DATA_CHUNK_SIIZE, Offset - ?DATA_CHUNK_SIIZE + 1, .., Offset - 1. -put(Offset, Chunk, StoreID) -> +%% bytes Offset - ?DATA_CHUNK_SIZE, Offset - ?DATA_CHUNK_SIZE + 1, .., Offset - 1. +put(PaddedOffset, Chunk, StoreID) -> GenServerID = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), - case catch gen_server:call(GenServerID, {put, Offset, Chunk}) of + case catch gen_server:call(GenServerID, {put, PaddedOffset, Chunk}) of {'EXIT', {timeout, {gen_server, call, _}}} -> {error, timeout}; Reply -> @@ -155,9 +183,9 @@ delete(Offset) -> delete(Offset, "default"). %% @doc Remove the chunk with the given end offset. -delete(Offset, StoreID) -> +delete(PaddedOffset, StoreID) -> GenServerID = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), - case catch gen_server:call(GenServerID, {delete, Offset}, 20000) of + case catch gen_server:call(GenServerID, {delete, PaddedOffset}, 20000) of {'EXIT', {timeout, {gen_server, call, _}}} -> {error, timeout}; Reply -> @@ -287,8 +315,8 @@ handle_cast({repack, Start, End, NextCursor, RightBound, Packing}, spawn(fun() -> repack(Start, End, NextCursor, RightBound, Packing, StoreID) end), {noreply, State}; -handle_cast({register_packing_ref, Ref, Offset}, #state{ packing_map = Map } = State) -> - {noreply, State#state{ packing_map = maps:put(Ref, Offset, Map) }}; +handle_cast({register_packing_ref, Ref, Args}, #state{ packing_map = Map } = State) -> + {noreply, State#state{ packing_map = maps:put(Ref, Args, Map) }}; handle_cast({expire_repack_request, Ref}, #state{ packing_map = Map } = State) -> {noreply, State#state{ packing_map = maps:remove(Ref, Map) }}; @@ -297,22 +325,22 @@ handle_cast(Cast, State) -> ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]), {noreply, State}. 
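Note (illustrative, not part of the patch): the expected outcomes of the is_storage_supported/3 rule above, assuming Above is an offset greater than ?STRICT_DATA_SPLIT_THRESHOLD, Below is one at or under it, and {spora_2_6, Addr} stands in for any packed format.

%% is_storage_supported(Above, ?DATA_CHUNK_SIZE, unpacked)  -> true   (full-size chunk, bucketed)
%% is_storage_supported(Above, 1024, unpacked)              -> false  (small unpacked chunk goes to RocksDB)
%% is_storage_supported(Above, 1024, {spora_2_6, Addr})     -> true   (packed chunk, stored in a 256 KiB bucket)
%% is_storage_supported(Below, 1024, {spora_2_6, Addr})     -> false  (no 256 KiB buckets below the threshold)
%% is_storage_supported(Below, ?DATA_CHUNK_SIZE, unpacked)  -> true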
-handle_call({put, Offset, Chunk}, _From, State) when byte_size(Chunk) == ?DATA_CHUNK_SIZE -> +handle_call({put, PaddedOffset, Chunk}, _From, State) when byte_size(Chunk) == ?DATA_CHUNK_SIZE -> #state{ file_index = FileIndex, store_id = StoreID } = State, - case handle_store_chunk(Offset, Chunk, FileIndex, StoreID) of + case handle_store_chunk(PaddedOffset, Chunk, FileIndex, StoreID) of {ok, FileIndex2} -> {reply, ok, State#state{ file_index = FileIndex2 }}; Error -> {reply, Error, State} end; -handle_call({delete, Offset}, _From, State) -> +handle_call({delete, PaddedOffset}, _From, State) -> #state{ file_index = FileIndex, store_id = StoreID } = State, - Key = get_key(Offset), + Key = get_key(PaddedOffset), Filepath = filepath(Key, FileIndex, StoreID), - case ar_sync_record:delete(Offset, Offset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of + case ar_sync_record:delete(PaddedOffset, PaddedOffset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of ok -> - case delete_chunk(Offset, Key, Filepath) of + case delete_chunk(PaddedOffset, Key, Filepath) of ok -> {reply, ok, State}; Error2 -> @@ -343,33 +371,69 @@ handle_info({chunk, {packed, Ref, ChunkArgs}}, case maps:get(Ref, Map, not_found) of not_found -> {noreply, State}; - Offset -> + Args -> State2 = State#state{ packing_map = maps:remove(Ref, Map) }, - {Packing, Chunk, _, _, _} = ChunkArgs, - case ar_sync_record:delete(Offset, Offset - ?DATA_CHUNK_SIZE, - ar_data_sync, StoreID) of + {Packing, Chunk, Offset, _, ChunkSize} = ChunkArgs, + StartOffset = Offset - ?DATA_CHUNK_SIZE, + RemoveFromSyncRecordResult = ar_sync_record:delete(Offset, + StartOffset, ar_data_sync, StoreID), + IsStorageSupported = + case RemoveFromSyncRecordResult of + ok -> + is_storage_supported(Offset, ChunkSize, Packing); + Error -> + Error + end, + RemoveFromChunkStorageSyncRecordResult = + case IsStorageSupported of + true -> + store; + false -> + %% Based on the new packing we do not want to + %% store the chunk in the chunk storage anymore so + %% we also remove the record from the + %% chunk-storage specific sync record and + %% send the chunk to the corresponding ar_data_sync + %% module to store it in RocksDB. 
+ ar_sync_record:delete(Offset, StartOffset, + ?MODULE, StoreID); + Error2 -> + Error2 + end, + case RemoveFromChunkStorageSyncRecordResult of ok -> + DataSyncServer = ar_data_sync:name(StoreID), + gen_server:cast(DataSyncServer, + {store_chunk, ChunkArgs, Args}), + {noreply, State2#state{ repack_cursor = Offset, + prev_repack_cursor = PrevCursor }}; + store -> case handle_store_chunk(Offset, Chunk, FileIndex, StoreID) of {ok, FileIndex2} -> ar_sync_record:add_async(repacked_chunk, - Offset, Offset - ?DATA_CHUNK_SIZE, + Offset, StartOffset, Packing, ar_data_sync, StoreID), {noreply, State2#state{ file_index = FileIndex2, - repack_cursor = Offset, prev_repack_cursor = PrevCursor }}; - Error2 -> + repack_cursor = Offset, + prev_repack_cursor = PrevCursor }}; + Error3 -> + PackingStr = ar_serialize:encode_packing(Packing, true), ?LOG_ERROR([{event, failed_to_store_repacked_chunk}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, Offset}, - {packing, ar_serialize:encode_packing(Packing, true)}, - {error, io_lib:format("~p", [Error2])}]), + {packing, PackingStr}, + {error, io_lib:format("~p", [Error3])}]), {noreply, State2} end; - Error3 -> - ?LOG_ERROR([{event, failed_to_remove_repacked_chunk_from_sync_record}, + Error4 -> + PackingStr = ar_serialize:encode_packing(Packing, true), + ?LOG_ERROR([{event, failed_to_store_repacked_chunk}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, Offset}, - {packing, ar_serialize:encode_packing(Packing, true)}, - {error, io_lib:format("~p", [Error3])}]), + {packing, PackingStr}, + {error, io_lib:format("~p", [Error4])}]), {noreply, State2} end end; @@ -397,7 +461,7 @@ get_chunk_group_size() -> Config#config.chunk_storage_file_size. read_repack_cursor(StoreID, TargetPacking) -> - Filepath = get_filepath("repack_in_place_cursor", StoreID), + Filepath = get_filepath("repack_in_place_cursor2", StoreID), case file:read_file(Filepath) of {ok, Bin} -> case catch binary_to_term(Bin) of @@ -411,7 +475,7 @@ read_repack_cursor(StoreID, TargetPacking) -> end. remove_repack_cursor(StoreID) -> - Filepath = get_filepath("repack_in_place_cursor", StoreID), + Filepath = get_filepath("repack_in_place_cursor2", StoreID), case file:delete(Filepath) of ok -> ok; @@ -424,7 +488,7 @@ remove_repack_cursor(StoreID) -> store_repack_cursor(0, _StoreID, _TargetPacking) -> ok; store_repack_cursor(Cursor, StoreID, TargetPacking) -> - Filepath = get_filepath("repack_in_place_cursor", StoreID), + Filepath = get_filepath("repack_in_place_cursor2", StoreID), file:write_file(Filepath, term_to_binary({Cursor, TargetPacking})). get_filepath(Name, StoreID) -> @@ -437,11 +501,12 @@ get_filepath(Name, StoreID) -> filename:join([DataDir, "storage_modules", StoreID, ?CHUNK_DIR, Name]) end. -handle_store_chunk(Offset, Chunk, FileIndex, StoreID) -> - Key = get_key(Offset), - case store_chunk(Key, Offset, Chunk, FileIndex, StoreID) of +handle_store_chunk(PaddedOffset, Chunk, FileIndex, StoreID) -> + Key = get_key(PaddedOffset), + case store_chunk(Key, PaddedOffset, Chunk, FileIndex, StoreID) of {ok, Filepath} -> - case ar_sync_record:add(Offset, Offset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of + case ar_sync_record:add( + PaddedOffset, PaddedOffset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of ok -> ets:insert(chunk_storage_file_index, {{Key, StoreID}, Filepath}), {ok, maps:put(Key, Filepath, FileIndex)}; @@ -456,9 +521,9 @@ get_key(Offset) -> StartOffset = Offset - ?DATA_CHUNK_SIZE, ar_util:floor_int(StartOffset, get_chunk_group_size()). 
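Note (illustrative, not part of the patch): a sketch of the routing decision the handle_info({chunk, {packed, ...}}) clause above applies to each repacked chunk; route_repacked_chunk/3 is a hypothetical helper and the return atoms are placeholders.

%% After the old ar_data_sync sync-record entry is removed, the repacked chunk either
%% stays in the 256 KiB blob storage or is handed to the storage module's ar_data_sync
%% server, which stores it in RocksDB together with its data path.
route_repacked_chunk(PaddedEndOffset, ChunkSize, NewPacking) ->
    case ar_chunk_storage:is_storage_supported(PaddedEndOffset, ChunkSize, NewPacking) of
        true -> chunk_storage;
        false -> rocksdb
    end.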
-store_chunk(Key, Offset, Chunk, FileIndex, StoreID) -> +store_chunk(Key, PaddedOffset, Chunk, FileIndex, StoreID) -> Filepath = filepath(Key, FileIndex, StoreID), - store_chunk(Key, Offset, Chunk, Filepath). + store_chunk(Key, PaddedOffset, Chunk, Filepath). filepath(Key, FileIndex, StoreID) -> case maps:get(Key, FileIndex, not_found) of @@ -468,28 +533,28 @@ filepath(Key, FileIndex, StoreID) -> Filepath end. -store_chunk(Key, Offset, Chunk, Filepath) -> +store_chunk(Key, PaddedOffset, Chunk, Filepath) -> case erlang:get({write_handle, Filepath}) of undefined -> case file:open(Filepath, [read, write, raw]) of {error, Reason} = Error -> ?LOG_ERROR([ {event, failed_to_open_chunk_file}, - {offset, Offset}, + {padded_offset, PaddedOffset}, {file, Filepath}, {reason, io_lib:format("~p", [Reason])} ]), Error; {ok, F} -> erlang:put({write_handle, Filepath}, F), - store_chunk2(Key, Offset, Chunk, Filepath, F) + store_chunk2(Key, PaddedOffset, Chunk, Filepath, F) end; F -> - store_chunk2(Key, Offset, Chunk, Filepath, F) + store_chunk2(Key, PaddedOffset, Chunk, Filepath, F) end. -store_chunk2(Key, Offset, Chunk, Filepath, F) -> - StartOffset = Offset - ?DATA_CHUNK_SIZE, +store_chunk2(Key, PaddedOffset, Chunk, Filepath, F) -> + StartOffset = PaddedOffset - ?DATA_CHUNK_SIZE, LeftChunkBorder = ar_util:floor_int(StartOffset, ?DATA_CHUNK_SIZE), ChunkOffset = StartOffset - LeftChunkBorder, RelativeOffset = LeftChunkBorder - Key, @@ -507,7 +572,7 @@ store_chunk2(Key, Offset, Chunk, Filepath, F) -> {error, Reason} = Error -> ?LOG_ERROR([ {event, failed_to_write_chunk}, - {offset, Offset}, + {padded_offset, PaddedOffset}, {file, Filepath}, {position, Position}, {reason, io_lib:format("~p", [Reason])} @@ -518,10 +583,10 @@ store_chunk2(Key, Offset, Chunk, Filepath, F) -> {ok, Filepath} end. -delete_chunk(Offset, Key, Filepath) -> +delete_chunk(PaddedOffset, Key, Filepath) -> case file:open(Filepath, [read, write, raw]) of {ok, F} -> - StartOffset = Offset - ?DATA_CHUNK_SIZE, + StartOffset = PaddedOffset - ?DATA_CHUNK_SIZE, LeftChunkBorder = ar_util:floor_int(StartOffset, ?DATA_CHUNK_SIZE), RelativeOffset = LeftChunkBorder - Key, Position = RelativeOffset + ?OFFSET_SIZE * (RelativeOffset div ?DATA_CHUNK_SIZE), @@ -757,23 +822,26 @@ chunk_offset_list_to_map(ChunkOffsets) -> chunk_offset_list_to_map(ChunkOffsets, infinity, 0, #{}). repack(Cursor, RightBound, Packing, StoreID) -> - case ar_sync_record:get_next_synced_interval(Cursor, RightBound, ?MODULE, StoreID) of + case ar_sync_record:get_next_synced_interval(Cursor, RightBound, + ar_data_sync, StoreID) of not_found -> ar:console("~n~nRepacking of ~s is complete! 
" "We suggest you stop the node, rename " - "the storage module folder to reflect the new packing, and start the " + "the storage module folder to reflect " + "the new packing, and start the " "node with the new storage module.~n", [StoreID]), ?LOG_INFO([{event, repacking_complete}, {storage_module, StoreID}, - {target_packing, ar_serialize:encode_packing(Packing, true)}]), + {target_packing, + ar_serialize:encode_packing(Packing, true)}]), Server = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), gen_server:cast(Server, repacking_complete), ok; {End, Start} -> Start2 = max(Cursor, Start), - case ar_sync_record:get_next_synced_interval(Start2, End, Packing, ar_data_sync, - StoreID) of + case ar_sync_record:get_next_synced_interval(Start2, End, + Packing, ar_data_sync, StoreID) of not_found -> repack(Start2, End, End, RightBound, Packing, StoreID); {End3, Start3} when Start3 > Start2 -> @@ -788,12 +856,16 @@ repack(Start, End, NextCursor, RightBound, Packing, StoreID) when Start >= End - repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> {ok, Config} = application:get_env(arweave, config), RepackIntervalSize = ?DATA_CHUNK_SIZE * Config#config.repack_batch_size, - Server = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), + Server = name(StoreID), + Start2 = Start + RepackIntervalSize, + RepackFurtherArgs = {repack, Start2, End, NextCursor, RightBound, + RequiredPacking}, CheckPackingBuffer = case ar_packing_server:is_buffer_full() of true -> ar_util:cast_after(200, Server, - {repack, Start, End, NextCursor, RightBound, RequiredPacking}), + {repack, Start, End, + NextCursor, RightBound, RequiredPacking}), continue; false -> ok @@ -803,104 +875,158 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> continue -> continue; ok -> - case catch get_range(Start, RepackIntervalSize, StoreID) of - [] -> - Start2 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound, - RequiredPacking}), - continue; - {'EXIT', _Exc} -> - ?LOG_ERROR([{event, failed_to_read_chunk_range}, - {storage_module, StoreID}, - {start, Start}, - {size, RepackIntervalSize}, - {store_id, StoreID}]), - Start2 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound, - RequiredPacking}), - continue; - Range -> - {ok, Range} - end + repack_read_chunk_range(Start, RepackIntervalSize, + StoreID, RepackFurtherArgs) end, ReadMetadataRange = case ReadRange of continue -> continue; {ok, Range2} -> - {Min, Max, Map} = chunk_offset_list_to_map(Range2), - case ar_data_sync:get_chunk_metadata_range(Min, min(Max, End), StoreID) of - {ok, MetadataMap} -> - {ok, Map, MetadataMap}; - {error, Error} -> - ?LOG_ERROR([{event, failed_to_read_chunk_metadata_range}, - {storage_module, StoreID}, - {error, io_lib:format("~p", [Error])}, - {left, Min}, - {right, Max}]), - Start3 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start3, End, NextCursor, RightBound, - RequiredPacking}), - continue - end + repack_read_chunk_metadata_range(Start, RepackIntervalSize, End, + Range2, StoreID, RepackFurtherArgs) end, case ReadMetadataRange of continue -> ok; {ok, Map2, MetadataMap2} -> - Start4 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start4, End, NextCursor, RightBound, - RequiredPacking}), - maps:fold( - fun (AbsoluteOffset, {_, _TXRoot, _, _, _, ChunkSize}, ok) - when ChunkSize /= ?DATA_CHUNK_SIZE, - AbsoluteOffset =< 
?STRICT_DATA_SPLIT_THRESHOLD -> - ok; - (AbsoluteOffset, {_, TXRoot, _, _, _, ChunkSize}, ok) -> - PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), - case ar_sync_record:is_recorded(PaddedOffset, ar_data_sync, StoreID) of - {true, RequiredPacking} -> - ?LOG_WARNING([{event, - repacking_process_chunk_already_repacked}, - {storage_module, StoreID}, - {packing, - ar_serialize:encode_packing(RequiredPacking,true)}, - {offset, AbsoluteOffset}]), - ok; - {true, Packing} -> - case maps:get(PaddedOffset, Map2, not_found) of - not_found -> - ?LOG_WARNING([{event, - chunk_not_found_in_chunk_storage}, - {storage_module, StoreID}, - {offset, PaddedOffset}]), - ok; - Chunk -> - Ref = make_ref(), - gen_server:cast(Server, - {register_packing_ref, Ref, PaddedOffset}), - ar_util:cast_after(300000, Server, - {expire_repack_request, Ref}), - ar_packing_server:request_repack(Ref, whereis(Server), - {RequiredPacking, Packing, Chunk, - AbsoluteOffset, TXRoot, ChunkSize}), - ok - end; - true -> - ?LOG_WARNING([{event, no_packing_information_for_the_chunk}, - {storage_module, StoreID}, - {offset, PaddedOffset}]), - ok; + gen_server:cast(Server, {repack, Start2, End, NextCursor, + RightBound, RequiredPacking}), + Args = {StoreID, RequiredPacking, Map2}, + repack_send_chunks_for_repacking(MetadataMap2, Args) + end. + +repack_read_chunk_range(Start, Size, StoreID, RepackFurtherArgs) -> + Server = name(StoreID), + case catch get_range(Start, Size, StoreID) of + [] -> + gen_server:cast(Server, RepackFurtherArgs), + continue; + {'EXIT', _Exc} -> + ?LOG_ERROR([{event, failed_to_read_chunk_range}, + {storage_module, StoreID}, + {start, Start}, + {size, Size}]), + gen_server:cast(Server, RepackFurtherArgs), + continue; + Range -> + {ok, Range} + end. + +repack_read_chunk_metadata_range(Start, Size, End, + Range, StoreID, RepackFurtherArgs) -> + Server = name(StoreID), + End2 = min(Start + Size, End), + {_, _, Map} = chunk_offset_list_to_map(Range), + case ar_data_sync:get_chunk_metadata_range(Start, End2, StoreID) of + {ok, MetadataMap} -> + {ok, Map, MetadataMap}; + {error, Error} -> + ?LOG_ERROR([{event, failed_to_read_chunk_metadata_range}, + {storage_module, StoreID}, + {error, io_lib:format("~p", [Error])}]), + gen_server:cast(Server, RepackFurtherArgs), + continue + end. + +repack_send_chunks_for_repacking(MetadataMap, Args) -> + maps:fold(repack_send_chunks_for_repacking(Args), ok, MetadataMap). + +repack_send_chunks_for_repacking(Args) -> + fun (AbsoluteOffset, {_, _TXRoot, _, _, _, ChunkSize}, ok) + when ChunkSize /= ?DATA_CHUNK_SIZE, + AbsoluteOffset =< ?STRICT_DATA_SPLIT_THRESHOLD -> + ok; + (AbsoluteOffset, ChunkMeta, ok) -> + repack_send_chunk_for_repacking(AbsoluteOffset, ChunkMeta, Args) + end. 
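Note (illustrative, not part of the patch): the repack/6 flow, now split into the repack_read_* and repack_send_* helpers above, boils down to the sketch below; repack_batch_sketch/6 is hypothetical and mirrors the real code only in shape.

repack_batch_sketch(Start, Size, End, StoreID, RequiredPacking, RepackFurtherArgs) ->
    case repack_read_chunk_range(Start, Size, StoreID, RepackFurtherArgs) of
        continue ->
            ok;
        {ok, Range} ->
            case repack_read_chunk_metadata_range(Start, Size, End, Range,
                    StoreID, RepackFurtherArgs) of
                continue ->
                    ok;
                {ok, ChunkMap, MetadataMap} ->
                    repack_send_chunks_for_repacking(MetadataMap,
                        {StoreID, RequiredPacking, ChunkMap})
            end
    end.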
+ +repack_send_chunk_for_repacking(AbsoluteOffset, ChunkMeta, Args) -> + {StoreID, RequiredPacking, ChunkMap} = Args, + Server = name(StoreID), + PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), + {ChunkDataKey, TXRoot, DataRoot, TXPath, + RelativeOffset, ChunkSize} = ChunkMeta, + case ar_sync_record:is_recorded(PaddedOffset, ar_data_sync, StoreID) of + {true, RequiredPacking} -> + ?LOG_WARNING([{event, repacking_process_chunk_already_repacked}, + {storage_module, StoreID}, + {packing, + ar_serialize:encode_packing(RequiredPacking, true)}, + {offset, AbsoluteOffset}]), + ok; + {true, Packing} -> + ChunkMaybeDataPath = + case maps:get(PaddedOffset, ChunkMap, not_found) of + not_found -> + repack_read_chunk_and_data_path(StoreID, + ChunkDataKey, AbsoluteOffset, no_chunk); + Chunk3 -> + case is_storage_supported(AbsoluteOffset, + ChunkSize, Packing) of false -> - ?LOG_WARNING([{event, chunk_not_found_in_sync_record}, - {storage_module, StoreID}, - {offset, PaddedOffset}]), - ok + %% We are going to move this chunk to + %% RocksDB after repacking so we read + %% its DataPath here to pass it later on + %% to store_chunk. + repack_read_chunk_and_data_path(StoreID, + ChunkDataKey, AbsoluteOffset, Chunk3); + true -> + %% We are going to repack the chunk and keep it + %% in the chunk storage - no need to make an + %% extra disk access to read the data path. + {Chunk3, none} end end, - ok, - MetadataMap2 - ) + case ChunkMaybeDataPath of + not_found -> + ok; + {Chunk, MaybeDataPath} -> + Ref = make_ref(), + RepackArgs = {Packing, MaybeDataPath, RelativeOffset, + DataRoot, TXPath, none, none}, + gen_server:cast(Server, + {register_packing_ref, Ref, RepackArgs}), + ar_util:cast_after(300000, Server, + {expire_repack_request, Ref}), + ar_packing_server:request_repack(Ref, whereis(Server), + {RequiredPacking, Packing, Chunk, + AbsoluteOffset, TXRoot, ChunkSize}) + end; + true -> + ?LOG_WARNING([{event, no_packing_information_for_the_chunk}, + {storage_module, StoreID}, + {offset, PaddedOffset}]), + ok; + false -> + ?LOG_WARNING([{event, chunk_not_found_in_sync_record}, + {storage_module, StoreID}, + {offset, PaddedOffset}]), + ok + end. + +repack_read_chunk_and_data_path(StoreID, ChunkDataKey, AbsoluteOffset, + MaybeChunk) -> + case ar_kv:get({chunk_data_db, StoreID}, ChunkDataKey) of + not_found -> + ?LOG_WARNING([{event, chunk_not_found}, + {type, repack_in_place}, + {storage_module, StoreID}, + {offset, AbsoluteOffset}]), + not_found; + {ok, V} -> + case binary_to_term(V) of + {Chunk, DataPath} -> + {Chunk, DataPath}; + DataPath when MaybeChunk /= no_chunk -> + {MaybeChunk, DataPath}; + _ -> + ?LOG_WARNING([{event, chunk_not_found2}, + {type, repack_in_place}, + {storage_module, StoreID}, + {offset, AbsoluteOffset}]), + not_found + end end. chunk_offset_list_to_map([], Min, Max, Map) -> diff --git a/apps/arweave/src/ar_config.erl b/apps/arweave/src/ar_config.erl index 37ab4fcb7..116ef35a2 100644 --- a/apps/arweave/src/ar_config.erl +++ b/apps/arweave/src/ar_config.erl @@ -1,8 +1,8 @@ -module(ar_config). --export([validate_config/1, use_remote_vdf_server/0, pull_from_remote_vdf_server/0, - compute_own_vdf/0, is_vdf_server/0, is_public_vdf_server/0, - parse/1, parse_storage_module/1, log_config/1]). +-export([validate_config/1, set_dependent_flags/1, use_remote_vdf_server/0, + pull_from_remote_vdf_server/0, compute_own_vdf/0, is_vdf_server/0, + is_public_vdf_server/0, parse/1, parse_storage_module/1, log_config/1]). -include_lib("arweave/include/ar.hrl"). 
-include_lib("arweave/include/ar_consensus.hrl"). @@ -13,11 +13,20 @@ %%% Public interface. %%%=================================================================== +-spec validate_config(Config :: #config{}) -> boolean(). validate_config(Config) -> validate_init(Config) andalso + validate_storage_modules(Config) andalso validate_repack_in_place(Config) andalso validate_cm_pool(Config) andalso - validate_storage_modules(Config). + validate_packing_difficulty(Config) andalso + validate_verify(Config). + +-spec set_dependent_flags(Config :: #config{}) -> #config{}. +%% @doc Some flags force other flags to be set. +set_dependent_flags(Config) -> + Config2 = set_verify_flags(Config), + Config2. use_remote_vdf_server() -> {ok, Config} = application:get_env(arweave, config), @@ -167,6 +176,13 @@ parse_options([{<<"mine">>, false} | Rest], Config) -> parse_options([{<<"mine">>, Opt} | _], _) -> {error, {bad_type, mine, boolean}, Opt}; +parse_options([{<<"verify">>, true} | Rest], Config) -> + parse_options(Rest, Config#config{ verify = true }); +parse_options([{<<"verify">>, false} | Rest], Config) -> + parse_options(Rest, Config); +parse_options([{<<"verify">>, Opt} | _], _) -> + {error, {bad_type, verify, boolean}, Opt}; + parse_options([{<<"port">>, Port} | Rest], Config) when is_integer(Port) -> parse_options(Rest, Config#config{ port = Port }); parse_options([{<<"port">>, Port} | _], _) -> @@ -884,6 +900,15 @@ validate_init(Config) -> false -> true end. + +validate_storage_modules(#config{ storage_modules = StorageModules }) -> + case length(StorageModules) =:= length(lists:usort(StorageModules)) of + true -> + true; + false -> + io:format("~nDuplicate value detected in the storage_modules option.~n~n"), + false + end. validate_repack_in_place(Config) -> Modules = [ar_storage_module:id(M) || M <- Config#config.storage_modules], validate_repack_in_place(Config#config.repack_in_place_storage_modules, Modules). @@ -927,9 +952,9 @@ validate_cm_pool(Config) -> end, A andalso B andalso C. -validate_storage_modules(#config{ mine = false }) -> +validate_packing_difficulty(#config{ mine = false }) -> true; -validate_storage_modules(Config) -> +validate_packing_difficulty(Config) -> MiningAddr = Config#config.mining_addr, UniquePackingDifficulties = lists:foldl( fun({_, _, {composite, Addr, Difficulty}}, Acc) when Addr =:= MiningAddr -> @@ -950,3 +975,46 @@ validate_storage_modules(Config) -> "for the same mining address.~n~n"), false end. + + +validate_verify(#config{ verify = false }) -> + true; +validate_verify(#config{ verify = true, mine = true }) -> + io:format("~nThe verify flag cannot be set together with the mine flag.~n~n"), + false; +validate_verify(#config{ verify = true, + repack_in_place_storage_modules = RepackInPlaceStorageModules }) + when RepackInPlaceStorageModules =/= [] -> + io:format("~nThe verify flag cannot be set together with the repack_in_place flag.~n~n"), + false; +validate_verify(_Config) -> + true. + +disable_vdf(Config) -> + RemovePublicVDFServer = + lists:filter(fun(Item) -> Item =/= public_vdf_server end, Config#config.enable), + Config#config{ + nonce_limiter_client_peers = [], + nonce_limiter_server_trusted_peers = [], + enable = RemovePublicVDFServer, + disable = [compute_own_vdf | Config#config.disable] + }. + +set_verify_flags(#config{ verify = false } = Config) -> + Config; +set_verify_flags(Config) -> + io:format("~n~nWARNING: The verify flag is set. 
Forcing the following options:"), + io:format("~n - auto_join = false"), + io:format("~n - start_from_latest_state = true"), + io:format("~n - sync_jobs = 0"), + io:format("~n - block_pollers = 0"), + io:format("~n - header_sync_jobs = 0"), + io:format("~n - all VDF features disabled"), + Config2 = disable_vdf(Config), + Config2#config{ + auto_join = false, + start_from_latest_state = true, + sync_jobs = 0, + block_pollers = 0, + header_sync_jobs = 0 + }. diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 252be3a90..c6d1c6983 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -3,14 +3,16 @@ -behaviour(gen_server). -export([name/1, start_link/2, join/1, add_tip_block/2, add_block/2, - is_chunk_proof_ratio_attractive/3, + invalidate_bad_data_record/4, is_chunk_proof_ratio_attractive/3, add_chunk/5, add_data_root_to_disk_pool/3, maybe_drop_data_root_from_disk_pool/3, get_chunk/2, get_chunk_proof/2, get_tx_data/1, get_tx_data/2, get_tx_offset/1, get_tx_offset_data_in_range/2, has_data_root/2, request_tx_data_removal/3, request_data_removal/4, record_disk_pool_chunks_count/0, record_chunk_cache_size_metric/0, is_chunk_cache_full/0, is_disk_space_sufficient/1, - get_chunk_by_byte/2, read_chunk/4, decrement_chunk_cache_size/0, - increment_chunk_cache_size/0, get_chunk_padded_offset/1, get_chunk_metadata_range/3]). + get_chunk_by_byte/2, get_chunk_seek_offset/1, read_chunk/4, read_data_path/2, + increment_chunk_cache_size/0, decrement_chunk_cache_size/0, + get_chunk_padded_offset/1, get_chunk_metadata_range/3, + get_merkle_rebase_threshold/0]). -export([debug_get_disk_pool_chunks/0]). @@ -20,6 +22,7 @@ -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_consensus.hrl"). -include_lib("arweave/include/ar_config.hrl"). +-include_lib("arweave/include/ar_poa.hrl"). -include_lib("arweave/include/ar_data_discovery.hrl"). -include_lib("arweave/include/ar_data_sync.hrl"). -include_lib("arweave/include/ar_sync_buckets.hrl"). @@ -48,6 +51,10 @@ join(RecentBI) -> add_tip_block(BlockTXPairs, RecentBI) -> gen_server:cast(ar_data_sync_default, {add_tip_block, BlockTXPairs, RecentBI}). +invalidate_bad_data_record(Start, End, StoreID, Case) -> + gen_server:cast(name(StoreID), {invalidate_bad_data_record, + {Start, End, StoreID, Case}}). + %% @doc The condition which is true if the chunk is too small compared to the proof. %% Small chunks make syncing slower and increase space amplification. A small chunk %% is accepted if it is the last chunk of the corresponding transaction - such chunks @@ -542,6 +549,8 @@ read_chunk(Offset, ChunkDataDB, ChunkDataKey, StoreID) -> Error end. +read_data_path(ChunkDataDB, ChunkDataKey) -> + read_data_path(undefined, ChunkDataDB, ChunkDataKey, undefined). %% The first and last arguments are introduced to match the read_chunk/4 signature. 
read_data_path(_Offset, ChunkDataDB, ChunkDataKey, _StoreID) -> case ar_kv:get(ChunkDataDB, ChunkDataKey) of @@ -721,7 +730,7 @@ handle_cast({join, RecentBI}, State) -> PreviousWeaveSize = element(2, hd(CurrentBI)), {ok, OrphanedDataRoots} = remove_orphaned_data(State, Offset, PreviousWeaveSize), {ok, Config} = application:get_env(arweave, config), - [gen_server:cast(list_to_atom("ar_data_sync_" ++ ar_storage_module:label(Module)), + [gen_server:cast(name(ar_storage_module:id(Module)), {cut, Offset}) || Module <- Config#config.storage_modules], ok = ar_chunk_storage:cut(Offset, StoreID), ok = ar_sync_record:cut(Offset, ?MODULE, StoreID), @@ -858,6 +867,16 @@ handle_cast({pack_and_store_chunk, Args} = Cast, {noreply, State} end; +handle_cast({store_chunk, ChunkArgs, Args} = Cast, + #sync_data_state{ store_id = StoreID } = State) -> + case is_disk_space_sufficient(StoreID) of + true -> + {noreply, store_chunk(ChunkArgs, Args, State)}; + _ -> + ar_util:cast_after(30000, self(), Cast), + {noreply, State} + end; + %% Schedule syncing of the unsynced intervals. Choose a peer for each of the intervals. %% There are two message payloads: %% 1. collect_peer_intervals @@ -1584,7 +1603,7 @@ get_chunk(Offset, SeekOffset, Pack, Packing, StoredPacking, StoreID, IsMinerRequ {expected_chunk_id, ar_util:encode(ChunkID)}, {chunk_id, ar_util:encode(ComputedChunkID)}]), invalidate_bad_data_record({AbsoluteOffset - ChunkSize, - AbsoluteOffset, {chunks_index, StoreID}, StoreID, 4}), + AbsoluteOffset, StoreID, 4}), {error, chunk_not_found} end end @@ -1668,8 +1687,7 @@ read_chunk_with_metadata( false -> ok end, - invalidate_bad_data_record({SeekOffset - 1, AbsoluteOffset, - {chunks_index, StoreID}, StoreID, 1}), + invalidate_bad_data_record({SeekOffset - 1, AbsoluteOffset, StoreID, 1}), {error, chunk_not_found}; {error, Error} -> ?LOG_ERROR([{event, failed_to_read_chunk}, @@ -1706,7 +1724,7 @@ read_chunk_with_metadata( end end. 
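Note (illustrative, not part of the patch): the newly exported read_data_path/2 and invalidate_bad_data_record/4 are intended to be used together by external callers, as ar_verify_chunks does further below. A hedged caller-side sketch; the function name, offsets, and the trailing case number (which only distinguishes the call site) are placeholders.

verify_data_path_or_invalidate(StoreID, ChunkDataKey, StartOffset, EndOffset) ->
    case ar_data_sync:read_data_path({chunk_data_db, StoreID}, ChunkDataKey) of
        {ok, _DataPath} ->
            ok;
        _Error ->
            ar_data_sync:invalidate_bad_data_record(StartOffset, EndOffset, StoreID, 1)
    end.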
-invalidate_bad_data_record({Start, End, ChunksIndex, StoreID, Case}) -> +invalidate_bad_data_record({Start, End, StoreID, Case}) -> [{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold), case End > T of true -> @@ -1726,7 +1744,8 @@ invalidate_bad_data_record({Start, End, ChunksIndex, StoreID, Case}) -> {range_start, PaddedStart2}, {range_end, PaddedEnd}]), case ar_sync_record:delete(PaddedEnd, PaddedStart2, ?MODULE, StoreID) of ok -> - case ar_kv:delete(ChunksIndex, << End:?OFFSET_KEY_BITSIZE >>) of + ar_sync_record:add(PaddedEnd, PaddedStart2, invalid_chunks, StoreID), + case ar_kv:delete({chunks_index, StoreID}, << End:?OFFSET_KEY_BITSIZE >>) of ok -> ok; Error2 -> @@ -1761,10 +1780,9 @@ validate_fetched_chunk(Args) -> {BlockStart, BlockEnd, TXRoot} -> ValidateDataPathRuleset = ar_poa:get_data_path_validation_ruleset( BlockStart, get_merkle_rebase_threshold()), - BlockSize = BlockEnd - BlockStart, ChunkOffset = Offset - BlockStart - 1, - case validate_proof2({TXRoot, ChunkOffset, BlockSize, DataPath, TXPath, - ChunkSize, ValidateDataPathRuleset, IsMinerRequest}) of + case validate_proof2(TXRoot, TXPath, DataPath, BlockStart, BlockEnd, + ChunkOffset, ValidateDataPathRuleset, ChunkSize, IsMinerRequest) of {true, ChunkID} -> {true, ChunkID}; false -> @@ -1778,8 +1796,7 @@ validate_fetched_chunk(Args) -> ok end, StartOffset = Offset - ChunkSize, - invalidate_bad_data_record({StartOffset, Offset, - {chunks_index, StoreID}, StoreID, 2}), + invalidate_bad_data_record({StartOffset, Offset, StoreID, 2}), false end; {_BlockStart, _BlockEnd, TXRoot2} -> @@ -1789,8 +1806,7 @@ validate_fetched_chunk(Args) -> {tx_root, ar_util:encode(TXRoot2)}, {stored_tx_root, ar_util:encode(TXRoot)}, {store_id, StoreID}]), - invalidate_bad_data_record({Offset - ChunkSize, Offset, - {chunks_index, StoreID}, StoreID, 3}), + invalidate_bad_data_record({Offset - ChunkSize, Offset, StoreID, 3}), false end end. @@ -1922,10 +1938,8 @@ remove_range(Start, End, Ref, ReplyTo) -> RefL = [make_ref() || _ <- StoreIDs], PID = spawn(fun() -> ReplyFun(ReplyFun, sets:from_list(RefL)) end), lists:foreach( - fun({StorageID, R}) -> - GenServerID = list_to_atom("ar_data_sync_" - ++ ar_storage_module:label_by_id(StorageID)), - gen_server:cast(GenServerID, {remove_range, End, Start + 1, R, PID}) + fun({StoreID, R}) -> + gen_server:cast(name(StoreID), {remove_range, End, Start + 1, R, PID}) end, lists:zip(StoreIDs, RefL) ). @@ -2458,6 +2472,12 @@ store_sync_state(#sync_data_state{ store_id = "default" } = State) -> store_sync_state(_State) -> ok. +%% @doc Look to StoreID to find data that TargetStoreID is missing. 
+%% Args: +%% TargetStoreID - The ID of the storage module to sync to (this module is missing data) +%% StoreID - The ID of the storage module to sync from (this module might have the data) +%% RangeStart - The start offset of the range to check +%% RangeEnd - The end offset of the range to check get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, RangeStart, RangeEnd) -> get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, RangeStart, @@ -2545,78 +2565,93 @@ enqueue_peer_range(Peer, RangeStart, RangeEnd, ChunkOffsets, {Q, QIntervals}) -> validate_proof(TXRoot, BlockStartOffset, Offset, BlockSize, Proof, ValidateDataPathRuleset) -> #{ data_path := DataPath, tx_path := TXPath, chunk := Chunk, packing := Packing } = Proof, - case ar_merkle:validate_path(TXRoot, Offset, BlockSize, TXPath) of - false -> + + BlockEndOffset = BlockStartOffset + BlockSize, + case ar_poa:validate_paths(TXRoot, TXPath, DataPath, BlockStartOffset, + BlockEndOffset, Offset, ValidateDataPathRuleset) of + {false, _} -> false; - {DataRoot, TXStartOffset, TXEndOffset} -> + {true, ChunkProof} -> + #chunk_proof{ + data_root = DataRoot, + absolute_offset = AbsoluteOffset, + chunk_id = ChunkID, + chunk_start_offset = ChunkStartOffset, + chunk_end_offset = ChunkEndOffset, + tx_start_offset = TXStartOffset, + tx_end_offset = TXEndOffset + } = ChunkProof, TXSize = TXEndOffset - TXStartOffset, - ChunkOffset = Offset - TXStartOffset, - case ar_merkle:validate_path(DataRoot, ChunkOffset, TXSize, DataPath, - ValidateDataPathRuleset) of - false -> - false; - {ChunkID, ChunkStartOffset, ChunkEndOffset} -> - AbsoluteEndOffset = BlockStartOffset + TXStartOffset + ChunkEndOffset, - ChunkSize = ChunkEndOffset - ChunkStartOffset, - case Packing of - unpacked -> - case ar_tx:generate_chunk_id(Chunk) == ChunkID of - false -> - false; + ChunkSize = ChunkEndOffset - ChunkStartOffset, + AbsoluteEndOffset = AbsoluteOffset + ChunkSize, + case Packing of + unpacked -> + case ar_tx:generate_chunk_id(Chunk) == ChunkID of + false -> + false; + true -> + case ChunkSize == byte_size(Chunk) of true -> - case ChunkSize == byte_size(Chunk) of - true -> - {true, DataRoot, TXStartOffset, ChunkEndOffset, - TXSize, ChunkSize, ChunkID}; - false -> - false - end - end; - _ -> - ChunkArgs = {Packing, Chunk, AbsoluteEndOffset, TXRoot, ChunkSize}, - Args = {Packing, DataRoot, TXStartOffset, ChunkEndOffset, TXSize, - ChunkID}, - {need_unpacking, AbsoluteEndOffset, ChunkArgs, Args} - end + {true, DataRoot, TXStartOffset, ChunkEndOffset, + TXSize, ChunkSize, ChunkID}; + false -> + false + end + end; + _ -> + ChunkArgs = {Packing, Chunk, AbsoluteEndOffset, TXRoot, ChunkSize}, + Args = {Packing, DataRoot, TXStartOffset, ChunkEndOffset, TXSize, + ChunkID}, + {need_unpacking, AbsoluteEndOffset, ChunkArgs, Args} end end. 
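Note (illustrative, not part of the patch): validate_proof/6 above now delegates both Merkle checks to ar_poa:validate_paths/7 (added further below in ar_poa.erl). A sketch of the result shape callers pattern-match on; check_proof/7 is a hypothetical wrapper and assumes ar_poa.hrl is included, as this module now does.

check_proof(TXRoot, TXPath, DataPath, BlockStart, BlockEnd, BlockRelativeOffset, Ruleset) ->
    case ar_poa:validate_paths(TXRoot, TXPath, DataPath, BlockStart, BlockEnd,
            BlockRelativeOffset, Ruleset) of
        {true, #chunk_proof{ chunk_id = ChunkID, chunk_start_offset = S,
                chunk_end_offset = E }} ->
            {ok, ChunkID, E - S};
        {false, #chunk_proof{ tx_path_is_valid = invalid }} ->
            {error, invalid_tx_path};
        {false, #chunk_proof{ data_path_is_valid = invalid }} ->
            {error, invalid_data_path}
    end.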
-validate_proof2(Args) -> - {TXRoot, Offset, BlockSize, DataPath, TXPath, ChunkSize, - ValidateDataPathRuleset, IsMinerRequest} = Args, - case ar_merkle:validate_path(TXRoot, Offset, BlockSize, TXPath) of - false -> - case IsMinerRequest of - true -> - ?LOG_ERROR([{event, failed_to_validate_tx_path}, - {tags, [solution_proofs]}]); +validate_proof2( + TXRoot, TXPath, DataPath, BlockStartOffset, + BlockEndOffset, BlockRelativeOffset, ValidateDataPathRuleset, + ExpectedChunkSize, IsMinerRequest) -> + {IsValid, ChunkProof} = ar_poa:validate_paths( + TXRoot, TXPath, DataPath, BlockStartOffset, + BlockEndOffset, BlockRelativeOffset, ValidateDataPathRuleset), + case IsValid of + true -> + #chunk_proof{ + chunk_id = ChunkID, + chunk_start_offset = ChunkStartOffset, + chunk_end_offset = ChunkEndOffset + } = ChunkProof, + case ChunkEndOffset - ChunkStartOffset == ExpectedChunkSize of false -> - ok - end, - false; - {DataRoot, TXStartOffset, TXEndOffset} -> - TXSize = TXEndOffset - TXStartOffset, - ChunkOffset = Offset - TXStartOffset, - case ar_merkle:validate_path(DataRoot, ChunkOffset, TXSize, DataPath, - ValidateDataPathRuleset) of - {ChunkID, ChunkStartOffset, ChunkEndOffset} -> - case ChunkEndOffset - ChunkStartOffset == ChunkSize of + case IsMinerRequest of + true -> + ?LOG_ERROR([{event, failed_to_validate_data_path_offset}, + {tags, [solution_proofs]}, + {chunk_end_offset, ChunkEndOffset}, + {chunk_start_offset, ChunkStartOffset}, + {chunk_size, ExpectedChunkSize}]); false -> - case IsMinerRequest of - true -> - ?LOG_ERROR([{event, failed_to_validate_data_path_offset}, - {tags, [solution_proofs]}, - {chunk_end_offset, ChunkEndOffset}, - {chunk_start_offset, ChunkStartOffset}, - {chunk_size, ChunkSize}]); - false -> - ok - end, - false; + ok + end, + false; + true -> + {true, ChunkID} + end; + false -> + #chunk_proof{ + tx_path_is_valid = TXPathIsValid, + data_path_is_valid = DataPathIsValid + } = ChunkProof, + case {TXPathIsValid, DataPathIsValid} of + {invalid, _} -> + case IsMinerRequest of true -> - {true, ChunkID} - end; - _ -> + ?LOG_ERROR([{event, failed_to_validate_tx_path}, + {tags, [solution_proofs]}]); + false -> + ok + end, + false; + {_, invalid} -> case IsMinerRequest of true -> ?LOG_ERROR([{event, failed_to_validate_data_path}, @@ -2711,7 +2746,7 @@ write_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) -> write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) -> #sync_data_state{ chunk_data_db = ChunkDataDB, store_id = StoreID } = State, - ShouldStoreInChunkStorage = should_store_in_chunk_storage(Offset, ChunkSize, Packing), + ShouldStoreInChunkStorage = ar_chunk_storage:is_storage_supported(Offset, ChunkSize, Packing), Result = case ShouldStoreInChunkStorage of true -> @@ -2732,21 +2767,6 @@ write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Pa Result end. -%% @doc 256 KiB chunks are stored in the blob storage optimized for read speed. -%% Return true if we want to place the chunk there. -should_store_in_chunk_storage(Offset, ChunkSize, Packing) -> - case Offset > ?STRICT_DATA_SPLIT_THRESHOLD of - true -> - %% All chunks above ?STRICT_DATA_SPLIT_THRESHOLD are placed in 256 KiB buckets - %% so technically can be stored in ar_chunk_storage. However, to avoid - %% managing padding in ar_chunk_storage for unpacked chunks smaller than 256 KiB - %% (we do not need fast random access to unpacked chunks after - %% ?STRICT_DATA_SPLIT_THRESHOLD anyways), we put them to RocksDB. 
- Packing /= unpacked orelse ChunkSize == (?DATA_CHUNK_SIZE); - false -> - ChunkSize == (?DATA_CHUNK_SIZE) - end. - update_chunks_index(Args, State) -> AbsoluteChunkOffset = element(1, Args), case ar_tx_blacklist:is_byte_blacklisted(AbsoluteChunkOffset) of @@ -3357,9 +3377,7 @@ process_disk_pool_matured_chunk_offset(Iterator, TXRoot, TXPath, AbsoluteOffset, increment_chunk_cache_size(), Args2 = {DataRoot, AbsoluteOffset, TXPath, TXRoot, DataPath, unpacked, Offset, ChunkSize, Chunk, Chunk, none, none}, - Label = ar_storage_module:label_by_id(StoreID6), - gen_server:cast(list_to_atom("ar_data_sync_" ++ Label), - {pack_and_store_chunk, Args2}), + gen_server:cast(name(StoreID6), {pack_and_store_chunk, Args2}), gen_server:cast(self(), {process_disk_pool_chunk_offsets, Iterator, false, Args}), {noreply, cache_recently_processed_offset(AbsoluteOffset, ChunkDataKey, diff --git a/apps/arweave/src/ar_data_sync_worker.erl b/apps/arweave/src/ar_data_sync_worker.erl index e28a8ad5f..1302d4b8a 100644 --- a/apps/arweave/src/ar_data_sync_worker.erl +++ b/apps/arweave/src/ar_data_sync_worker.erl @@ -144,10 +144,8 @@ read_range2(MessagesRemaining, {Start, End, OriginStoreID, TargetStoreID, SkipSm read_range2(MessagesRemaining, {Start + ChunkSize, End, OriginStoreID, TargetStoreID, SkipSmall}); not_found -> - Label = ar_storage_module:label_by_id(OriginStoreID), - gen_server:cast(list_to_atom("ar_data_sync_" ++ Label), - {invalidate_bad_data_record, {Start, AbsoluteOffset, ChunksIndex, - OriginStoreID, 1}}), + ar_data_sync:invalidate_bad_data_record( + Start, AbsoluteOffset, OriginStoreID, 1), read_range2(MessagesRemaining-1, {Start + ChunkSize, End, OriginStoreID, TargetStoreID, SkipSmall}); {error, Error} -> @@ -172,8 +170,7 @@ read_range2(MessagesRemaining, {Start, End, OriginStoreID, TargetStoreID, SkipSm Args = {DataRoot, AbsoluteOffset, TXPath, TXRoot, DataPath, Packing, RelativeOffset, ChunkSize, Chunk, UnpackedChunk, TargetStoreID, ChunkDataKey}, - gen_server:cast(list_to_atom("ar_data_sync_" - ++ ar_storage_module:label_by_id(TargetStoreID)), + gen_server:cast(ar_data_sync:name(TargetStoreID), {pack_and_store_chunk, Args}), read_range2(MessagesRemaining-1, {Start + ChunkSize, End, OriginStoreID, TargetStoreID, diff --git a/apps/arweave/src/ar_mining_stats.erl b/apps/arweave/src/ar_mining_stats.erl index 11339a32c..a698b62a5 100644 --- a/apps/arweave/src/ar_mining_stats.erl +++ b/apps/arweave/src/ar_mining_stats.erl @@ -338,9 +338,11 @@ get_packing() -> undefined; MultiplePackings -> % More than one unique packing found - ?LOG_ERROR([ + ?LOG_WARNING([ {event, get_packing_failed}, {reason, multiple_unique_packings}, - {unique_packings, [format_packing(Packing) || Packing <- MultiplePackings]} + {unique_packings, + string:join( + [format_packing(Packing) || Packing <- MultiplePackings], ", ")} ]), undefined end. diff --git a/apps/arweave/src/ar_peers.erl b/apps/arweave/src/ar_peers.erl index 6b6341478..fa643d43e 100644 --- a/apps/arweave/src/ar_peers.erl +++ b/apps/arweave/src/ar_peers.erl @@ -375,13 +375,20 @@ resolve_and_cache_peer(RawPeer, Type) -> %%%=================================================================== init([]) -> - %% Trap exit to avoid corrupting any open files on quit. 
- process_flag(trap_exit, true), - ok = ar_events:subscribe(block), - load_peers(), - gen_server:cast(?MODULE, rank_peers), - gen_server:cast(?MODULE, ping_peers), - timer:apply_interval(?GET_MORE_PEERS_FREQUENCY_MS, ?MODULE, discover_peers, []), + {ok, Config} = application:get_env(arweave, config), + case Config#config.verify of + true -> + ok; + false -> + %% Trap exit to avoid corrupting any open files on quit. + process_flag(trap_exit, true), + ok = ar_events:subscribe(block), + load_peers(), + gen_server:cast(?MODULE, rank_peers), + gen_server:cast(?MODULE, ping_peers), + timer:apply_interval(?GET_MORE_PEERS_FREQUENCY_MS, ?MODULE, discover_peers, []) + end, + {ok, #state{}}. handle_call(Request, _From, State) -> diff --git a/apps/arweave/src/ar_poa.erl b/apps/arweave/src/ar_poa.erl index 6bb306d80..fe71c192e 100644 --- a/apps/arweave/src/ar_poa.erl +++ b/apps/arweave/src/ar_poa.erl @@ -3,8 +3,10 @@ -module(ar_poa). -export([get_data_path_validation_ruleset/2, get_data_path_validation_ruleset/3, - validate_pre_fork_2_5/4, validate/1, get_padded_offset/2]). + validate_pre_fork_2_5/4, validate/1, validate_paths/4, validate_paths/7, + get_padded_offset/2]). +-include_lib("arweave/include/ar_poa.hrl"). -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_consensus.hrl"). -include_lib("arweave/include/ar_pricing.hrl"). @@ -51,34 +53,123 @@ validate(Args) -> {BlockStartOffset, RecallOffset, TXRoot, BlockSize, SPoA, Packing, SubChunkIndex, ExpectedChunkID} = Args, #poa{ chunk = Chunk, unpacked_chunk = UnpackedChunk } = SPoA, - TXPath = SPoA#poa.tx_path, - RecallBucketOffset = get_recall_bucket_offset(RecallOffset, BlockStartOffset), + + case validate_paths(SPoA, TXRoot, RecallOffset, BlockStartOffset, BlockSize) of + {false, _} -> + false; + {true, ChunkProof} -> + #chunk_proof{ + chunk_id = ChunkID, + chunk_start_offset = ChunkStartOffset, + chunk_end_offset = ChunkEndOffset, + tx_start_offset = TXStartOffset + } = ChunkProof, + case ExpectedChunkID of + not_set -> + validate2(Packing, {ChunkID, ChunkStartOffset, + ChunkEndOffset, BlockStartOffset, TXStartOffset, + TXRoot, Chunk, UnpackedChunk, SubChunkIndex}); + _ -> + case ChunkID == ExpectedChunkID of + false -> + false; + true -> + {true, ChunkID} + end + end + end. + +%% @doc Validate the TXPath and DataPath for a chunk. This will return the ChunkID but won't +%% validate that the ChunkID is correct. +%% +%% SPoA: the proof of access +%% RecallOffset: the absoluteoffset of the recall byte - + validate_paths(#poa{} = SPoA, TXRoot, RecallOffset, BlockStartOffset, BlockSize) -> + BlockRelativeOffset = get_recall_bucket_offset(RecallOffset, BlockStartOffset), ValidateDataPathRuleset = get_data_path_validation_ruleset(BlockStartOffset), - case ar_merkle:validate_path(TXRoot, RecallBucketOffset, BlockSize, TXPath) of + + Proof = #chunk_proof{ + absolute_offset = BlockStartOffset + BlockRelativeOffset, + tx_root = TXRoot, + tx_path = SPoA#poa.tx_path, + data_path = SPoA#poa.data_path, + block_start_offset = BlockStartOffset, + block_end_offset = BlockStartOffset + BlockSize, + validate_data_path_ruleset = ValidateDataPathRuleset + }, + validate_paths(Proof). + +%% @doc Validate the TXPath and DataPath for a chunk. This will return the ChunkID but won't +%% validate that the ChunkID is correct. 
+%% +%% AbsoluteOffset: the end offset of the chunk - indexed to the beginning of the weave +validate_paths(TXRoot, TXPath, DataPath, AbsoluteOffset) -> + {BlockStartOffset, BlockEndOffset, TXRoot} = + ar_block_index:get_block_bounds(AbsoluteOffset), + + Proof = #chunk_proof{ + absolute_offset = AbsoluteOffset, + tx_root = TXRoot, + tx_path = TXPath, + data_path = DataPath, + block_start_offset = BlockStartOffset, + block_end_offset = BlockEndOffset, + validate_data_path_ruleset = get_data_path_validation_ruleset(BlockStartOffset) + }, + validate_paths(Proof). + +validate_paths( + TXRoot, TXPath, DataPath, BlockStartOffset, BlockEndOffset, BlockRelativeOffset, + ValidateDataPathRuleset) -> + Proof = #chunk_proof{ + absolute_offset = BlockStartOffset + BlockRelativeOffset, + tx_root = TXRoot, + tx_path = TXPath, + data_path = DataPath, + block_start_offset = BlockStartOffset, + block_end_offset = BlockEndOffset, + validate_data_path_ruleset = ValidateDataPathRuleset + }, + validate_paths(Proof). + +validate_paths(Proof) -> + #chunk_proof{ + absolute_offset = AbsoluteOffset, + tx_root = TXRoot, + tx_path = TXPath, + data_path = DataPath, + block_start_offset = BlockStartOffset, + block_end_offset = BlockEndOffset, + validate_data_path_ruleset = ValidateDataPathRuleset + } = Proof, + + BlockRelativeOffset = AbsoluteOffset - BlockStartOffset, + BlockSize = BlockEndOffset - BlockStartOffset, + + case ar_merkle:validate_path(TXRoot, BlockRelativeOffset, BlockSize, TXPath) of false -> - false; + {false, Proof#chunk_proof{ tx_path_is_valid = invalid }}; {DataRoot, TXStartOffset, TXEndOffset} -> + Proof2 = Proof#chunk_proof{ + data_root = DataRoot, + tx_start_offset = TXStartOffset, + tx_end_offset = TXEndOffset, + tx_path_is_valid = valid + }, TXSize = TXEndOffset - TXStartOffset, - RecallChunkOffset = RecallBucketOffset - TXStartOffset, - DataPath = SPoA#poa.data_path, - case ar_merkle:validate_path(DataRoot, RecallChunkOffset, TXSize, DataPath, - ValidateDataPathRuleset) of + TXRelativeOffset = BlockRelativeOffset - TXStartOffset, + case ar_merkle:validate_path( + DataRoot, TXRelativeOffset, TXSize, DataPath, ValidateDataPathRuleset) of false -> - false; + {false, Proof2#chunk_proof{ data_path_is_valid = invalid }}; {ChunkID, ChunkStartOffset, ChunkEndOffset} -> - case ExpectedChunkID of - not_set -> - validate2(Packing, {ChunkID, ChunkStartOffset, - ChunkEndOffset, BlockStartOffset, TXStartOffset, - TXRoot, Chunk, UnpackedChunk, SubChunkIndex}); - _ -> - case ChunkID == ExpectedChunkID of - false -> - false; - true -> - {true, ChunkID} - end - end + Proof3 = Proof2#chunk_proof{ + chunk_id = ChunkID, + chunk_start_offset = ChunkStartOffset, + chunk_end_offset = ChunkEndOffset, + data_path_is_valid = valid + }, + {true, Proof3} end end. diff --git a/apps/arweave/src/ar_sup.erl b/apps/arweave/src/ar_sup.erl index cb9279b1c..a6608e82f 100644 --- a/apps/arweave/src/ar_sup.erl +++ b/apps/arweave/src/ar_sup.erl @@ -79,6 +79,7 @@ init([]) -> ?CHILD(ar_header_sync, worker), ?CHILD_SUP(ar_data_sync_sup, supervisor), ?CHILD_SUP(ar_chunk_storage_sup, supervisor), + ?CHILD_SUP(ar_verify_chunks_sup, supervisor), ?CHILD(ar_global_sync_record, worker), ?CHILD_SUP(ar_nonce_limiter_server_sup, supervisor), ?CHILD(ar_nonce_limiter, worker), diff --git a/apps/arweave/src/ar_verify_chunks.erl b/apps/arweave/src/ar_verify_chunks.erl new file mode 100644 index 000000000..66184f863 --- /dev/null +++ b/apps/arweave/src/ar_verify_chunks.erl @@ -0,0 +1,544 @@ +-module(ar_verify_chunks). + +-behaviour(gen_server). 
+ +-export([start_link/2, name/1]). +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). + +-include_lib("arweave/include/ar.hrl"). +-include_lib("arweave/include/ar_consensus.hrl"). +-include_lib("arweave/include/ar_verify_chunks.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-record(state, { + store_id :: binary(), + packing :: binary(), + start_offset :: non_neg_integer(), + end_offset :: non_neg_integer(), + cursor :: non_neg_integer(), + ready = false :: boolean(), + verify_report = #verify_report{} :: #verify_report{} +}). + +%%%=================================================================== +%%% Public interface. +%%%=================================================================== + +%% @doc Start the server. +start_link(Name, StorageModule) -> + gen_server:start_link({local, Name}, ?MODULE, StorageModule, []). + +-spec name(binary()) -> atom(). +name(StoreID) -> + list_to_atom("ar_verify_chunks_" ++ ar_storage_module:label_by_id(StoreID)). + +%%%=================================================================== +%%% Generic server callbacks. +%%%=================================================================== + +init(StoreID) -> + ?LOG_INFO([{event, verify_chunk_storage_started}, {store_id, StoreID}]), + {StartOffset, EndOffset} = ar_storage_module:get_range(StoreID), + gen_server:cast(self(), verify), + {ok, #state{ + store_id = StoreID, + packing = ar_storage_module:get_packing(StoreID), + start_offset = StartOffset, + end_offset = EndOffset, + cursor = StartOffset, + ready = is_ready(EndOffset), + verify_report = #verify_report{ + start_time = erlang:system_time(millisecond) + } + }}. + +handle_cast(verify, #state{ready = false, end_offset = EndOffset} = State) -> + ar_util:cast_after(1000, self(), verify), + {noreply, State#state{ready = is_ready(EndOffset)}}; +handle_cast(verify, + #state{cursor = Cursor, end_offset = EndOffset} = State) when Cursor >= EndOffset -> + ar:console("Done!~n"), + {noreply, State}; +handle_cast(verify, State) -> + State2 = verify(State), + State3 = report_progress(State2), + {noreply, State3}; + +handle_cast(Cast, State) -> + ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]), + {noreply, State}. + +handle_call(Call, From, State) -> + ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {call, Call}, {from, From}]), + {reply, ok, State}. + +handle_info(Info, State) -> + ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +is_ready(EndOffset) -> + case ar_block_index:get_last() of + '$end_of_table' -> + false; + {WeaveSize, _Height, _H, _TXRoot} -> + WeaveSize >= EndOffset + end. + +verify(State) -> + #state{store_id = StoreID} = State, + {UnionInterval, Intervals} = query_intervals(State), + State2 = verify_chunks(UnionInterval, Intervals, State), + case State2#state.cursor >= State2#state.end_offset of + true -> + ar:console("Done verifying ~s!~n", [StoreID]), + ?LOG_INFO([{event, verify_chunk_storage_verify_chunks_done}, {store_id, StoreID}]); + false -> + gen_server:cast(self(), verify) + end, + State2. 
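Note (illustrative, not part of the patch): init/1 above casts verify to self(), and the worker keeps re-casting it until is_ready/1 sees the block index reach the module's end offset and the cursor passes end_offset. A trivial sketch for nudging a worker by hand; kick/1 is hypothetical and assumes StoreID names a configured storage module.

kick(StoreID) ->
    gen_server:cast(ar_verify_chunks:name(StoreID), verify).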
+ +verify_chunks(not_found, _Intervals, State) -> + State#state{ cursor = State#state.end_offset }; +verify_chunks({End, _Start}, _Intervals, #state{cursor = Cursor} = State) when Cursor >= End -> + State; +verify_chunks({IntervalEnd, IntervalStart}, Intervals, State) -> + #state{cursor = Cursor, store_id = StoreID} = State, + Cursor2 = max(IntervalStart, Cursor), + ChunkData = ar_data_sync:get_chunk_by_byte({chunks_index, StoreID}, Cursor2+1), + State2 = verify_chunk(ChunkData, Intervals, State#state{ cursor = Cursor2 }), + verify_chunks({IntervalEnd, IntervalStart}, Intervals, State2). + +verify_chunk({error, Reason}, _Intervals, State) -> + #state{ cursor = Cursor } = State, + log_error(get_chunk_error, Cursor, ?DATA_CHUNK_SIZE, [{reason, Reason}], State); +verify_chunk({ok, _Key, MetaData}, Intervals, State) -> + {AbsoluteOffset, _ChunkDataKey, _TXRoot, _DataRoot, _TXPath, + _TXRelativeOffset, ChunkSize} = MetaData, + {ChunkStorageInterval, _DataSyncInterval} = Intervals, + + State2 = verify_chunk_storage(AbsoluteOffset, ChunkSize, ChunkStorageInterval, State), + + State3 = verify_proof(MetaData, State2), + + PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), + State3#state{ cursor = PaddedOffset }. + +verify_proof(MetaData, State) -> + #state{ store_id = StoreID } = State, + {AbsoluteOffset, ChunkDataKey, TXRoot, _DataRoot, TXPath, + _TXRelativeOffset, ChunkSize} = MetaData, + + case ar_data_sync:read_data_path({chunk_data_db, StoreID}, ChunkDataKey) of + {ok, DataPath} -> + case ar_poa:validate_paths(TXRoot, TXPath, DataPath, AbsoluteOffset - 1) of + {false, _Proof} -> + invalidate_chunk(validate_paths_error, AbsoluteOffset, ChunkSize, State); + {true, _Proof} -> + State + end; + Error -> + invalidate_chunk( + read_data_path_error, AbsoluteOffset, ChunkSize, [{reason, Error}], State) + end. + +verify_chunk_storage(Offset, _ChunkSize, {End, Start}, State) + when Offset - ?DATA_CHUNK_SIZE >= Start andalso Offset =< End -> + State; +verify_chunk_storage(Offset, ChunkSize, _Interval, State) -> + #state{ packing = Packing } = State, + case ar_chunk_storage:is_storage_supported(Offset, ChunkSize, Packing) of + true -> + invalidate_chunk(chunk_storage_gap, Offset, ChunkSize, State); + false -> + State + end. + +invalidate_chunk(Type, Offset, ChunkSize, State) -> + invalidate_chunk(Type, Offset, ChunkSize, [], State). + +invalidate_chunk(Type, Offset, ChunkSize, Logs, State) -> + #state{ store_id = StoreID } = State, + ar_data_sync:invalidate_bad_data_record(Offset - ChunkSize, Offset, StoreID, 5), + log_error(Type, Offset, ChunkSize, Logs, State). + +log_error(Type, Offset, ChunkSize, Logs, State) -> + #state{ verify_report = Report, store_id = StoreID, cursor = Cursor, packing = Packing } = State, + + LogMessage = [{event, verify_chunk_storage_error}, + {type, Type}, {store_id, StoreID}, + {packing, ar_serialize:encode_packing(Packing, true)}, + {offset, Offset}, {cursor, Cursor}, {chunk_size, ChunkSize}] ++ Logs, + ?LOG_INFO(LogMessage), + NewBytes = maps:get(Type, Report#verify_report.error_bytes, 0) + ChunkSize, + NewChunks = maps:get(Type, Report#verify_report.error_chunks, 0) + 1, + + Report2 = Report#verify_report{ + total_error_bytes = Report#verify_report.total_error_bytes + ChunkSize, + total_error_chunks = Report#verify_report.total_error_chunks + 1, + error_bytes = maps:put(Type, NewBytes, Report#verify_report.error_bytes), + error_chunks = maps:put(Type, NewChunks, Report#verify_report.error_chunks) + }, + State#state{ verify_report = Report2 }. 
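The cursor advance in verify_chunk/3 above relies on ar_data_sync:get_chunk_padded_offset/1: past the strict data split threshold every chunk occupies a full 256 KiB bucket, so the cursor jumps to the bucket border rather than to the chunk's own end offset, while offsets below the threshold pass through unchanged (the same behaviour test_verify_chunk exercises further down). A small illustrative check; the test name is hypothetical and the offsets are arbitrary.

%% Illustrative only: a half-size chunk ending just past the threshold advances
%% the cursor to the next 256 KiB bucket border; below the threshold the offset
%% is returned unchanged.
padded_offset_example_test() ->
    PostSplit = ?STRICT_DATA_SPLIT_THRESHOLD + (?DATA_CHUNK_SIZE div 2),
    PreSplit = ?STRICT_DATA_SPLIT_THRESHOLD - (?DATA_CHUNK_SIZE div 2),
    ?assertEqual(?STRICT_DATA_SPLIT_THRESHOLD + ?DATA_CHUNK_SIZE,
        ar_data_sync:get_chunk_padded_offset(PostSplit)),
    ?assertEqual(PreSplit, ar_data_sync:get_chunk_padded_offset(PreSplit)).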
+
+query_intervals(State) ->
+    #state{cursor = Cursor, store_id = StoreID} = State,
+    {ChunkStorageInterval, DataSyncInterval} = align_intervals(Cursor, StoreID),
+    UnionInterval = union_intervals(ChunkStorageInterval, DataSyncInterval),
+    {UnionInterval, {ChunkStorageInterval, DataSyncInterval}}.
+
+align_intervals(Cursor, StoreID) ->
+    ChunkStorageInterval = ar_sync_record:get_next_synced_interval(
+        Cursor, infinity, ar_chunk_storage, StoreID),
+    DataSyncInterval = ar_sync_record:get_next_synced_interval(
+        Cursor, infinity, ar_data_sync, StoreID),
+    align_intervals(Cursor, ChunkStorageInterval, DataSyncInterval).
+
+align_intervals(_Cursor, not_found, not_found) ->
+    {not_found, not_found};
+align_intervals(Cursor, not_found, DataSyncInterval) ->
+    {not_found, clamp_interval(Cursor, infinity, DataSyncInterval)};
+align_intervals(Cursor, ChunkStorageInterval, not_found) ->
+    {clamp_interval(Cursor, infinity, ChunkStorageInterval), not_found};
+align_intervals(Cursor, ChunkStorageInterval, DataSyncInterval) ->
+    {ChunkStorageEnd, _} = ChunkStorageInterval,
+    {DataSyncEnd, _} = DataSyncInterval,
+
+    {
+        clamp_interval(Cursor, DataSyncEnd, ChunkStorageInterval),
+        clamp_interval(Cursor, ChunkStorageEnd, DataSyncInterval)
+    }.
+
+union_intervals(not_found, not_found) ->
+    not_found;
+union_intervals(not_found, B) ->
+    B;
+union_intervals(A, not_found) ->
+    A;
+union_intervals({End1, Start1}, {End2, Start2}) ->
+    {max(End1, End2), min(Start1, Start2)}.
+
+clamp_interval(ClampMin, ClampMax, {End, Start}) ->
+    check_interval({min(End, ClampMax), max(Start, ClampMin)}).
+
+check_interval({End, Start}) when Start > End ->
+    not_found;
+check_interval(Interval) ->
+    Interval.
+
+report_progress(State) ->
+    #state{
+        store_id = StoreID, verify_report = Report, cursor = Cursor,
+        start_offset = StartOffset, end_offset = EndOffset
+    } = State,
+    BytesProcessed = Cursor - StartOffset,
+    Progress = BytesProcessed * 100 div (EndOffset - StartOffset),
+    Report2 = Report#verify_report{
+        bytes_processed = BytesProcessed,
+        progress = Progress
+    },
+    ar_verify_chunks_reporter:update(StoreID, Report2),
+    State#state{ verify_report = Report2 }.
+
+%% Note: ar_chunk_storage does not store small chunks before the strict data split
+%% threshold (offset 30607159107830, i.e. partitions 0-7 and roughly half of
+%% partition 8).
+
+%%%===================================================================
+%%% Tests.
+%%%===================================================================
+
+intervals_test_() ->
+    [
+        {timeout, 30, fun test_align_intervals/0},
+        {timeout, 30, fun test_union_intervals/0}
+    ].
+
+verify_chunk_storage_test_() ->
+    [
+        {timeout, 30, fun test_verify_chunk_storage_in_interval/0},
+        {timeout, 30, fun test_verify_chunk_storage_should_store/0},
+        {timeout, 30, fun test_verify_chunk_storage_should_not_store/0}
+    ].
+
+verify_proof_test_() ->
+    [
+        ar_test_node:test_with_mocked_functions([
+            {ar_data_sync, read_data_path, fun(_, _) -> not_found end}],
+            fun test_verify_proof_no_datapath/0
+        ),
+        ar_test_node:test_with_mocked_functions([
+            {ar_data_sync, read_data_path, fun(_, _) -> {ok, <<>>} end},
+            {ar_poa, validate_paths, fun(_, _, _, _) -> {true, <<>>} end}
+        ],
+            fun test_verify_proof_valid_paths/0
+        ),
+        ar_test_node:test_with_mocked_functions([
+            {ar_data_sync, read_data_path, fun(_, _) -> {ok, <<>>} end},
+            {ar_poa, validate_paths, fun(_, _, _, _) -> {false, <<>>} end}
+        ],
+            fun test_verify_proof_invalid_paths/0
+        )
+    ].
+ +verify_chunk_test_() -> + [ + ar_test_node:test_with_mocked_functions([ + {ar_data_sync, read_data_path, fun(_, _) -> {ok, <<>>} end}, + {ar_poa, validate_paths, fun(_, _, _, _) -> {true, <<>>} end} + ], + fun test_verify_chunk/0 + ) + ]. + +test_align_intervals() -> + ?assertEqual( + {not_found, not_found}, + align_intervals(0, not_found, not_found)), + ?assertEqual( + {{10, 5}, not_found}, + align_intervals(0, {10, 5}, not_found)), + ?assertEqual( + {{10, 7}, not_found}, + align_intervals(7, {10, 5}, not_found)), + ?assertEqual( + {not_found, not_found}, + align_intervals(12, {10, 5}, not_found)), + ?assertEqual( + {not_found, {10, 5}}, + align_intervals(0, not_found, {10, 5})), + ?assertEqual( + {not_found, {10, 7}}, + align_intervals(7, not_found, {10, 5})), + ?assertEqual( + {not_found, not_found}, + align_intervals(12, not_found, {10, 5})), + + ?assertEqual( + {{9, 4}, {9, 5}}, + align_intervals(0, {9, 4}, {10, 5})), + ?assertEqual( + {{9, 7}, {9, 7}}, + align_intervals(7, {9, 4}, {10, 5})), + ?assertEqual( + {not_found, not_found}, + align_intervals(12, {9, 4}, {10, 5})), + ?assertEqual( + {{9, 5}, {9, 4}}, + align_intervals(0, {10, 5}, {9, 4})), + ?assertEqual( + {{9, 7}, {9, 7}}, + align_intervals(7, {10, 5}, {9, 4})), + ?assertEqual( + {not_found, not_found}, + align_intervals(12, {10, 5}, {9, 4})), + ok. + +test_union_intervals() -> + ?assertEqual( + not_found, + union_intervals(not_found, not_found)), + ?assertEqual( + {10, 5}, + union_intervals(not_found, {10, 5})), + ?assertEqual( + {10, 5}, + union_intervals({10, 5}, not_found)), + ?assertEqual( + {10, 3}, + union_intervals({10, 7}, {5, 3})), + ok. + + +test_verify_chunk_storage_in_interval() -> + ?assertEqual( + #state{}, + verify_chunk_storage( + 10*?DATA_CHUNK_SIZE, + ?DATA_CHUNK_SIZE, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{})), + ?assertEqual( + #state{}, + verify_chunk_storage( + 5*?DATA_CHUNK_SIZE, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{})), + ?assertEqual( + #state{}, + verify_chunk_storage( + 20*?DATA_CHUNK_SIZE, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{})), + ok. + +test_verify_chunk_storage_should_store() -> + Addr = crypto:strong_rand_bytes(32), + ExpectedState = #state{ + packing = unpacked, + verify_report = #verify_report{ + total_error_bytes = ?DATA_CHUNK_SIZE, + total_error_chunks = 1, + error_bytes = #{chunk_storage_gap => ?DATA_CHUNK_SIZE}, + error_chunks = #{chunk_storage_gap => 1} + } + }, + ?assertEqual( + ExpectedState, + verify_chunk_storage( + 0, + ?DATA_CHUNK_SIZE, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + ExpectedState, + verify_chunk_storage( + ?STRICT_DATA_SPLIT_THRESHOLD + 1, + ?DATA_CHUNK_SIZE, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + #state{ + packing = {composite, Addr, 1}, + verify_report = #verify_report{ + total_error_bytes = ?DATA_CHUNK_SIZE div 2, + total_error_chunks = 1, + error_bytes = #{chunk_storage_gap => ?DATA_CHUNK_SIZE div 2}, + error_chunks = #{chunk_storage_gap => 1} + } + }, + verify_chunk_storage( + ?STRICT_DATA_SPLIT_THRESHOLD + 1, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = {composite, Addr, 1} })), + ok. 
+ +test_verify_chunk_storage_should_not_store() -> + ExpectedState = #state{ + packing = unpacked + }, + ?assertEqual( + ExpectedState, + verify_chunk_storage( + 0, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + ExpectedState, + verify_chunk_storage( + ?STRICT_DATA_SPLIT_THRESHOLD + 1, + ?DATA_CHUNK_SIZE div 2, + {20*?DATA_CHUNK_SIZE, 5*?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ok. + +test_verify_proof_no_datapath() -> + ExpectedState1 = #state{ + packing = unpacked, + verify_report = #verify_report{ + total_error_bytes = ?DATA_CHUNK_SIZE, + total_error_chunks = 1, + error_bytes = #{read_data_path_error => ?DATA_CHUNK_SIZE}, + error_chunks = #{read_data_path_error => 1} + } + }, + ExpectedState2 = #state{ + packing = unpacked, + verify_report = #verify_report{ + total_error_bytes = ?DATA_CHUNK_SIZE div 2, + total_error_chunks = 1, + error_bytes = #{read_data_path_error => ?DATA_CHUNK_SIZE div 2}, + error_chunks = #{read_data_path_error => 1} + } + }, + ?assertEqual( + ExpectedState1, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + ExpectedState2, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE div 2}, + #state{ packing = unpacked })), + ok. + +test_verify_proof_valid_paths() -> + ?assertEqual( + #state{}, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE}, + #state{})), + ok. + +test_verify_proof_invalid_paths() -> + ExpectedState1 = #state{ + packing = unpacked, + verify_report = #verify_report{ + total_error_bytes = ?DATA_CHUNK_SIZE, + total_error_chunks = 1, + error_bytes = #{validate_paths_error => ?DATA_CHUNK_SIZE}, + error_chunks = #{validate_paths_error => 1} + } + }, + ExpectedState2 = #state{ + packing = unpacked, + verify_report = #verify_report{ + total_error_bytes = ?DATA_CHUNK_SIZE div 2, + total_error_chunks = 1, + error_bytes = #{validate_paths_error => ?DATA_CHUNK_SIZE div 2}, + error_chunks = #{validate_paths_error => 1} + } + }, + ?assertEqual( + ExpectedState1, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE}, + #state{ packing = unpacked })), + ?assertEqual( + ExpectedState2, + verify_proof( + {10, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE div 2}, + #state{ packing = unpacked })), + ok. + +test_verify_chunk() -> + PreSplitOffset = ?STRICT_DATA_SPLIT_THRESHOLD - (?DATA_CHUNK_SIZE div 2), + PostSplitOffset = ?STRICT_DATA_SPLIT_THRESHOLD + (?DATA_CHUNK_SIZE div 2), + IntervalStart = ?STRICT_DATA_SPLIT_THRESHOLD - ?DATA_CHUNK_SIZE, + IntervalEnd = ?STRICT_DATA_SPLIT_THRESHOLD + ?DATA_CHUNK_SIZE, + Interval = {IntervalEnd, IntervalStart}, + ?assertEqual( + #state{cursor = PreSplitOffset}, + verify_chunk( + {ok, <<>>, {PreSplitOffset, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE div 2}}, + {Interval, not_found}, + #state{})), + ?assertEqual( + #state{cursor = ?STRICT_DATA_SPLIT_THRESHOLD + ?DATA_CHUNK_SIZE}, + verify_chunk( + {ok, <<>>, {PostSplitOffset, <<>>, <<>>, <<>>, <<>>, <<>>, ?DATA_CHUNK_SIZE div 2}}, + {Interval, not_found}, + #state{})), + ExpectedState = #state{ + packing = unpacked, + verify_report = #verify_report{ + total_error_bytes = ?DATA_CHUNK_SIZE, + total_error_chunks = 1, + error_bytes = #{get_chunk_error => ?DATA_CHUNK_SIZE}, + error_chunks = #{get_chunk_error => 1} + } + }, + ?assertEqual( + ExpectedState, + verify_chunk( + {error, some_error}, + {Interval, not_found}, + #state{ packing = unpacked })), + ok. 
diff --git a/apps/arweave/src/ar_verify_chunks_reporter.erl b/apps/arweave/src/ar_verify_chunks_reporter.erl
new file mode 100644
index 000000000..1c03d03f0
--- /dev/null
+++ b/apps/arweave/src/ar_verify_chunks_reporter.erl
@@ -0,0 +1,103 @@
+%%% Periodically prints the chunk verification progress reported by each storage module worker.
+-module(ar_verify_chunks_reporter).
+
+-behaviour(gen_server).
+
+-export([start_link/0, update/2]).
+-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).
+
+-include_lib("arweave/include/ar.hrl").
+-include_lib("arweave/include/ar_verify_chunks.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-record(state, {
+    reports = #{} :: #{binary() => #verify_report{}}
+}).
+
+-define(REPORT_PROGRESS_INTERVAL, 10000).
+
+%%%===================================================================
+%%% Public interface.
+%%%===================================================================
+
+%% @doc Start the server.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec update(binary(), #verify_report{}) -> ok.
+update(StoreID, Report) ->
+    gen_server:cast(?MODULE, {update, StoreID, Report}).
+
+%%%===================================================================
+%%% Generic server callbacks.
+%%%===================================================================
+
+init([]) ->
+    ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress),
+    {ok, #state{}}.
+
+
+handle_cast({update, StoreID, Report}, State) ->
+    {noreply, State#state{ reports = maps:put(StoreID, Report, State#state.reports) }};
+
+handle_cast(report_progress, State) ->
+    #state{
+        reports = Reports
+    } = State,
+
+    print_reports(Reports),
+    ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress),
+    {noreply, State};
+
+handle_cast(Cast, State) ->
+    ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
+    {noreply, State}.
+
+handle_call(Call, From, State) ->
+    ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {call, Call}, {from, From}]),
+    {reply, ok, State}.
+
+handle_info(Info, State) ->
+    ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+print_reports(Reports) when map_size(Reports) == 0 ->
+    ok;
+print_reports(Reports) ->
+    print_header(),
+    maps:foreach(
+        fun(StoreID, Report) ->
+            print_report(StoreID, Report)
+        end,
+        Reports
+    ),
+    print_footer(),
+    ok.
+
+print_header() ->
+    ar:console("Verification Report~n", []),
+    ar:console("+-------------------------------------------------------------------+-----------+------+----------+-------------+~n", []),
+    ar:console("| Storage Module                                                     | Processed | %    | Errors   | Verify Rate |~n", []),
+    ar:console("+-------------------------------------------------------------------+-----------+------+----------+-------------+~n", []).
+
+print_footer() ->
+    ar:console("+-------------------------------------------------------------------+-----------+------+----------+-------------+~n~n", []).
+
+print_report(StoreID, Report) ->
+    #verify_report{
+        total_error_bytes = TotalErrorBytes,
+        bytes_processed = BytesProcessed,
+        progress = Progress,
+        start_time = StartTime
+    } = Report,
+    Duration = erlang:system_time(millisecond) - StartTime,
+    Rate = 1000 * BytesProcessed / Duration,
+    ar:console("| ~65s | ~4B GB | ~3B% | ~5.1f GB | ~6.1f MB/s |~n",
+        [
+            StoreID, BytesProcessed div 1000000000, Progress,
+            TotalErrorBytes / 1000000000, Rate / 1000000
+        ]
+    ).
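As a worked example of the rate printed above (the figures are purely illustrative): a worker that has processed 1.8 TB over 2 hours has Duration = 7,200,000 ms, so Rate = 1000 * 1.8e12 / 7.2e6 = 2.5e8 bytes per second, and after the division by 1000000 the row renders it as 250.0 MB/s.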
\ No newline at end of file diff --git a/apps/arweave/src/ar_verify_chunks_sup.erl b/apps/arweave/src/ar_verify_chunks_sup.erl new file mode 100644 index 000000000..9d81d65c9 --- /dev/null +++ b/apps/arweave/src/ar_verify_chunks_sup.erl @@ -0,0 +1,40 @@ +-module(ar_verify_chunks_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-include_lib("arweave/include/ar_sup.hrl"). +-include_lib("arweave/include/ar_config.hrl"). + +%%%=================================================================== +%%% Public interface. +%%%=================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks. +%% =================================================================== + +init([]) -> + {ok, Config} = application:get_env(arweave, config), + case Config#config.verify of + false -> + ignore; + true -> + Workers = lists:map( + fun(StorageModule) -> + StoreID = ar_storage_module:id(StorageModule), + Name = ar_verify_chunks:name(StoreID), + ?CHILD_WITH_ARGS(ar_verify_chunks, worker, Name, [Name, StoreID]) + end, + Config#config.storage_modules + ), + Reporter = ?CHILD(ar_verify_chunks_reporter, worker), + {ok, {{one_for_one, 5, 10}, [Reporter | Workers]}} + end. +
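On a regular node this supervisor returns ignore, so none of the verification machinery runs; with the verify flag set it starts the shared reporter plus one ar_verify_chunks worker per configured storage module, each registered under the name derived by ar_verify_chunks:name/1. A minimal sketch of enumerating that worker set at runtime, assuming ar_config.hrl is included; the helper name verify_worker_names/0 is illustrative only.

%% List the registered verify workers, one per configured storage module.
-include_lib("arweave/include/ar_config.hrl").

verify_worker_names() ->
    {ok, Config} = application:get_env(arweave, config),
    [ar_verify_chunks:name(ar_storage_module:id(Module))
        || Module <- Config#config.storage_modules].

Each worker walks its own {StartOffset, EndOffset} range independently and pushes progress snapshots to the single ar_verify_chunks_reporter process.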