Speed up default storage repacking
Lev Berman committed Apr 12, 2023
1 parent c6d1e33 commit 67a8f5a
Showing 3 changed files with 313 additions and 45 deletions.
312 changes: 285 additions & 27 deletions apps/arweave/src/ar_chunk_storage.erl
@@ -10,16 +10,24 @@

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
-include_lib("arweave/include/ar_consensus.hrl").
-include_lib("arweave/include/ar_chunk_storage.hrl").

-include_lib("eunit/include/eunit.hrl").
-include_lib("kernel/include/file.hrl").

-record(state, {
file_index,
-store_id
store_id,
packing_map = #{},
repack_cursor = 0,
prev_repack_cursor = 0,
repacking_complete = false
}).

%% The number of bytes fetched from disk at a time during repacking.
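%% 262144 bytes is one 256 KiB chunk (?DATA_CHUNK_SIZE), so a single pass
%% covers 100 chunks, i.e. 25 MiB.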
-define(REPACK_INTERVAL_SIZE, (262144 * 100)).

%%%===================================================================
%%% Public interface.
%%%===================================================================
@@ -181,6 +189,7 @@ run_defragmentation() ->

init(StoreID) ->
process_flag(trap_exit, true),
ok = ar_events:subscribe(chunk),
{ok, Config} = application:get_env(arweave, config),
DataDir = Config#config.data_dir,
Dir =
@@ -207,29 +216,54 @@ init(StoreID) ->
end,
FileIndex
),
-{ok, #state{ file_index = FileIndex2, store_id = StoreID }}.
{ok, #state{ file_index = FileIndex2, store_id = StoreID,
repack_cursor = read_repack_cursor(StoreID) }}.

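%% The repack cursor is persisted asynchronously: the server re-casts
%% store_repack_cursor to itself every 30 seconds, skips the disk write when the
%% cursor has not advanced since the previous write, and stops rescheduling once
%% repacking is complete.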
handle_cast(store_repack_cursor, #state{ repacking_complete = true } = State) ->
{noreply, State};
handle_cast(store_repack_cursor,
#state{ repack_cursor = Cursor, prev_repack_cursor = Cursor } = State) ->
ar_util:cast_after(30000, self(), store_repack_cursor),
{noreply, State};
handle_cast(store_repack_cursor,
#state{ repack_cursor = Cursor, store_id = StoreID } = State) ->
ar_util:cast_after(30000, self(), store_repack_cursor),
ar:console("Repacked up to ~B, scanning further..~n", [Cursor]),
?LOG_INFO([{event, repacked_partially}, {cursor, Cursor}]),
store_repack_cursor(Cursor, StoreID),
{noreply, State#state{ prev_repack_cursor = Cursor }};

handle_cast(repacking_complete, State) ->
{noreply, State#state{ repacking_complete = true }};

handle_cast({repack, RightBound, Packing},
#state{ store_id = StoreID, repack_cursor = Cursor } = State) ->
gen_server:cast(self(), store_repack_cursor),
spawn(fun() -> repack(Cursor, RightBound, Packing, StoreID) end),
{noreply, State};
handle_cast({repack, Start, End, NextCursor, RightBound, Packing},
#state{ store_id = StoreID } = State) ->
spawn(fun() -> repack(Start, End, NextCursor, RightBound, Packing, StoreID) end),
{noreply, State};

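%% Every outstanding repack request is tracked by a reference in packing_map;
%% the entry is dropped either when the corresponding packed chunk event arrives
%% or when the expire_repack_request timeout fires.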
handle_cast({register_packing_ref, Ref, Offset}, #state{ packing_map = Map } = State) ->
{noreply, State#state{ packing_map = maps:put(Ref, Offset, Map) }};

handle_cast({expire_repack_request, Ref}, #state{ packing_map = Map } = State) ->
{noreply, State#state{ packing_map = maps:remove(Ref, Map) }};

handle_cast(Cast, State) ->
?LOG_WARNING("event: unhandled_cast, cast: ~p", [Cast]),
{noreply, State}.

handle_call({put, Offset, Chunk}, _From, State) when byte_size(Chunk) == ?DATA_CHUNK_SIZE ->
#state{ file_index = FileIndex, store_id = StoreID } = State,
-Key = get_key(Offset),
-{Reply, FileIndex2} =
-case store_chunk(Key, Offset, Chunk, FileIndex, StoreID) of
-{ok, Filepath} ->
-case ar_sync_record:add(Offset, Offset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of
-ok ->
-ets:insert(chunk_storage_file_index, {{Key, StoreID}, Filepath}),
-{ok, maps:put(Key, Filepath, FileIndex)};
-Error ->
-{Error, FileIndex}
-end;
-Error2 ->
-{Error2, FileIndex}
-end,
-{reply, Reply, State#state{ file_index = FileIndex2 }};
case handle_store_chunk(Offset, Chunk, FileIndex, StoreID) of
{ok, FileIndex2} ->
{reply, ok, State#state{ file_index = FileIndex2 }};
Error ->
{reply, Error, State}
end;

handle_call({delete, Offset}, _From, State) ->
#state{ file_index = FileIndex, store_id = StoreID } = State,
@@ -262,6 +296,52 @@ handle_call(Request, _From, State) ->
?LOG_WARNING("event: unhandled_call, request: ~p", [Request]),
{reply, ok, State}.

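%% A repacked chunk comes back as a {packed, Ref, ChunkArgs} chunk event. If the
%% reference is still registered, the interval is removed from the ar_data_sync
%% sync record, the repacked chunk is written to chunk storage, and the interval
%% is recorded again under the new packing before the repack cursor is advanced.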
handle_info({event, chunk, {packed, Ref, ChunkArgs}},
#state{ packing_map = Map, store_id = StoreID, file_index = FileIndex,
repack_cursor = PrevCursor } = State) ->
case maps:get(Ref, Map, not_found) of
not_found ->
{noreply, State};
Offset ->
State2 = State#state{ packing_map = maps:remove(Ref, Map) },
{Packing, Chunk, _, _, _} = ChunkArgs,
case ar_sync_record:delete(Offset, Offset - ?DATA_CHUNK_SIZE,
ar_data_sync, StoreID) of
ok ->
case handle_store_chunk(Offset - 1, Chunk, FileIndex, StoreID) of
{ok, FileIndex2} ->
case ar_sync_record:add(Offset, Offset - ?DATA_CHUNK_SIZE,
Packing, ar_data_sync, StoreID) of
ok ->
?LOG_DEBUG([{event, repacked_chunk},
{offset, Offset},
{packing, format_packing(Packing)}]);
Error ->
?LOG_ERROR([{event, failed_to_record_repacked_chunk},
{offset, Offset},
{packing, format_packing(Packing)},
{error, io_lib:format("~p", [Error])}])
end,
{noreply, State2#state{ file_index = FileIndex2,
repack_cursor = Offset, prev_repack_cursor = PrevCursor }};
Error2 ->
?LOG_ERROR([{event, failed_to_store_repacked_chunk},
{offset, Offset},
{packing, format_packing(Packing)},
{error, io_lib:format("~p", [Error2])}]),
{noreply, State2}
end;
Error3 ->
?LOG_ERROR([{event, failed_to_remove_repacked_chunk_from_sync_record},
{offset, Offset},
{packing, format_packing(Packing)},
{error, io_lib:format("~p", [Error3])}]),
{noreply, State2}
end
end;
handle_info({event, chunk, _}, State) ->
{noreply, State};

handle_info({Ref, _Reply}, State) when is_reference(Ref) ->
%% A stale gen_server:call reply.
{noreply, State};
@@ -270,14 +350,60 @@ handle_info(Info, State) ->
?LOG_ERROR([{event, unhandled_info}, {info, io_lib:format("~p", [Info])}]),
{noreply, State}.

-terminate(_Reason, _State) ->
terminate(_Reason, #state{ repack_cursor = Cursor, store_id = StoreID }) ->
sync_and_close_files(),
store_repack_cursor(Cursor, StoreID),
ok.

%%%===================================================================
%%% Private functions.
%%%===================================================================

read_repack_cursor(StoreID) ->
Filepath = get_filepath("repack_cursor", StoreID),
case file:read_file(Filepath) of
{ok, Bin} ->
case catch binary_to_term(Bin) of
Cursor when is_integer(Cursor) ->
Cursor;
_ ->
0
end;
_ ->
0
end.

store_repack_cursor(0, _StoreID) ->
ok;
store_repack_cursor(Cursor, StoreID) ->
Filepath = get_filepath("repack_cursor", StoreID),
file:write_file(Filepath, term_to_binary(Cursor)).
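
%% Illustrative sketch only, not part of this commit: the cursor file stores a
%% single integer serialized with term_to_binary/1. The path below is
%% hypothetical; read_repack_cursor/1 falls back to 0 on unreadable content.
repack_cursor_round_trip_sketch() ->
    Filepath = "/tmp/repack_cursor_sketch",
    ok = file:write_file(Filepath, term_to_binary(123456789)),
    {ok, Bin} = file:read_file(Filepath),
    %% The round trip restores the original integer cursor.
    123456789 = binary_to_term(Bin),
    ok.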

get_filepath(Name, StoreID) ->
{ok, Config} = application:get_env(arweave, config),
DataDir = Config#config.data_dir,
case StoreID of
"default" ->
filename:join([DataDir, ?CHUNK_DIR, Name]);
_ ->
filename:join([DataDir, "storage_modules", StoreID, ?CHUNK_DIR, Name])
end.

handle_store_chunk(Offset, Chunk, FileIndex, StoreID) ->
Key = get_key(Offset),
case store_chunk(Key, Offset, Chunk, FileIndex, StoreID) of
{ok, Filepath} ->
case ar_sync_record:add(Offset, Offset - ?DATA_CHUNK_SIZE, ?MODULE, StoreID) of
ok ->
ets:insert(chunk_storage_file_index, {{Key, StoreID}, Filepath}),
{ok, maps:put(Key, Filepath, FileIndex)};
Error ->
Error
end;
Error2 ->
Error2
end.

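%% The key of a chunk is the start offset of its ?CHUNK_GROUP_SIZE-aligned group;
%% chunks that fall into the same group are stored in the same file.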
get_key(Offset) ->
StartOffset = Offset - ?DATA_CHUNK_SIZE,
StartOffset - StartOffset rem ?CHUNK_GROUP_SIZE.
@@ -289,15 +415,7 @@ store_chunk(Key, Offset, Chunk, FileIndex, StoreID) ->
filepath(Key, FileIndex, StoreID) ->
case maps:get(Key, FileIndex, not_found) of
not_found ->
-{ok, Config} = application:get_env(arweave, config),
-DataDir = Config#config.data_dir,
-Name = integer_to_binary(Key),
-case StoreID of
-"default" ->
-filename:join([DataDir, ?CHUNK_DIR, Name]);
-_ ->
-filename:join([DataDir, "storage_modules", StoreID, ?CHUNK_DIR, Name])
-end;
get_filepath(integer_to_binary(Key), StoreID);
Filepath ->
Filepath
end.
@@ -585,6 +703,146 @@ read_chunks_sizes(DataDir) ->
modules_to_defrag(#config{defragmentation_modules = [_ | _] = Modules}) -> Modules;
modules_to_defrag(#config{storage_modules = Modules}) -> Modules.

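%% Fold the {Offset, Chunk} pairs returned by get_range/3 into
%% {Min, Max, #{Offset => Chunk}} so the metadata range lookup in repack/6 can be
%% bounded by Min and Max. For example, with two neighbouring 256 KiB chunks
%% (illustrative values), chunk_offset_list_to_map([{262144, A}, {524288, B}])
%% returns {262144, 524288, #{262144 => A, 524288 => B}}. The atom infinity
%% compares greater than any integer in Erlang's term order, so it is a safe
%% initial minimum.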
chunk_offset_list_to_map(ChunkOffsets) ->
chunk_offset_list_to_map(ChunkOffsets, infinity, 0, #{}).

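%% Walk the intervals recorded for ar_chunk_storage between Cursor and RightBound.
%% Within each interval, skip the sub-ranges that ar_data_sync already has in the
%% target packing and schedule the rest for repacking; once no further interval is
%% found, report completion to the server.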
repack(Cursor, RightBound, Packing, StoreID) ->
case ar_sync_record:get_next_synced_interval(Cursor, RightBound, ?MODULE, StoreID) of
not_found ->
ar:console("Repacking complete.~n", []),
?LOG_INFO([{event, repacking_complete},
{target_packing, format_packing(Packing)}]),
Server = list_to_atom("ar_chunk_storage_" ++ StoreID),
gen_server:cast(Server, repacking_complete),
ok;
{End, Start} ->
Start2 = max(Cursor, Start),
case ar_sync_record:get_next_synced_interval(Start2, End, Packing, ar_data_sync,
StoreID) of
not_found ->
repack(Start, End, End, RightBound, Packing, StoreID);
{End3, Start3} when Start3 > Start2 ->
repack(Start2, Start3, End3, RightBound, Packing, StoreID);
{End3, _Start3} ->
repack(End3, RightBound, Packing, StoreID)
end
end.

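%% Repack the range between Start and End in batches of ?REPACK_INTERVAL_SIZE
%% bytes: back off for 200 ms while the packing buffer is full, read the chunks of
%% the batch and their metadata, then emit a repack_request per chunk that is not
%% yet in the target packing, registering a reference that expires after five
%% minutes.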
repack(Start, End, NextCursor, RightBound, Packing, StoreID) when Start >= End ->
repack(NextCursor, RightBound, Packing, StoreID);
repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) ->
Server = list_to_atom("ar_chunk_storage_" ++ StoreID),
CheckPackingBuffer =
case ar_packing_server:is_buffer_full() of
true ->
ar_util:cast_after(200, Server,
{repack, Start, End, NextCursor, RightBound, RequiredPacking}),
continue;
false ->
ok
end,
ReadRange =
case CheckPackingBuffer of
continue ->
continue;
ok ->
case catch get_range(Start, ?REPACK_INTERVAL_SIZE, StoreID) of
[] ->
Start2 = Start + ?REPACK_INTERVAL_SIZE,
gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound,
RequiredPacking}),
continue;
{'EXIT', _Exc} ->
?LOG_ERROR([{event, failed_to_read_chunk_range},
{start, Start},
{size, ?REPACK_INTERVAL_SIZE},
{store_id, StoreID}]),
Start2 = Start + ?REPACK_INTERVAL_SIZE,
gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound,
RequiredPacking}),
continue;
Range ->
{ok, Range}
end
end,
ReadMetadataRange =
case ReadRange of
continue ->
continue;
{ok, Range2} ->
{Min, Max, Map} = chunk_offset_list_to_map(Range2),
case ar_data_sync:get_chunk_metadata_range(Min, min(Max, End), StoreID) of
{ok, MetadataMap} ->
{ok, Map, MetadataMap};
{error, Error} ->
?LOG_ERROR([{event, failed_to_read_chunk_metadata_range},
{error, io_lib:format("~p", [Error])},
{left, Min},
{right, Max}]),
Start3 = Start + ?REPACK_INTERVAL_SIZE,
gen_server:cast(Server, {repack, Start3, End, NextCursor, RightBound,
RequiredPacking}),
continue
end
end,
case ReadMetadataRange of
continue ->
ok;
{ok, Map2, MetadataMap2} ->
Start4 = Start + ?REPACK_INTERVAL_SIZE,
gen_server:cast(Server, {repack, Start4, End, NextCursor, RightBound,
RequiredPacking}),
maps:fold(
fun (AbsoluteOffset, {_, _TXRoot, _, _, _, ChunkSize}, ok)
when ChunkSize /= ?DATA_CHUNK_SIZE,
AbsoluteOffset =< ?STRICT_DATA_SPLIT_THRESHOLD ->
ok;
(AbsoluteOffset, {_, TXRoot, _, _, _, ChunkSize}, ok) ->
PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset),
case ar_sync_record:is_recorded(PaddedOffset, ar_data_sync) of
{{true, RequiredPacking}, StoreID} ->
?LOG_WARNING([{event,
repacking_process_chunk_already_repacked},
{packing, format_packing(RequiredPacking)},
{offset, AbsoluteOffset}]),
ok;
{{true, Packing}, StoreID} ->
case maps:get(PaddedOffset, Map2, not_found) of
not_found ->
ok;
Chunk ->
Ref = make_ref(),
gen_server:cast(Server,
{register_packing_ref, Ref, PaddedOffset}),
ar_util:cast_after(300000, Server,
{expire_repack_request, Ref}),
ar_events:send(chunk, {repack_request, Ref,
{RequiredPacking, Packing, Chunk,
AbsoluteOffset, TXRoot, ChunkSize}}),
ok
end;
_ ->
ok
end
end,
ok,
MetadataMap2
)
end.

chunk_offset_list_to_map([], Min, Max, Map) ->
{Min, Max, Map};
chunk_offset_list_to_map([{Offset, Chunk} | ChunkOffsets], Min, Max, Map) ->
chunk_offset_list_to_map(ChunkOffsets, min(Min, Offset), max(Max, Offset),
maps:put(Offset, Chunk, Map)).

format_packing({spora_2_6, Addr}) ->
"spora_2_6_" ++ binary_to_list(ar_util:encode(Addr));
format_packing(spora_2_5) ->
"spora_2_5";
format_packing(unpacked) ->
"unpacked".

%%%===================================================================
%%% Tests.
%%%===================================================================