Attempt to recognize "seeding-only" chunks early
Send a distinct reply to POST /chunk.
Lev Berman committed Apr 3, 2024
1 parent 40c30ab commit c844a19
Showing 7 changed files with 175 additions and 20 deletions.
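
The new reply is easiest to see from the uploader's side. The following sketch is illustrative only and not part of the commit: it assumes a node listening on the default port 1984 and a JSON-encoded chunk proof in Body, and shows one way a client might treat the distinct 303 reply (chunk accepted into the disk pool, but not expected to be stored long term by this node, so it should keep being seeded elsewhere).

-module(chunk_upload_sketch).
-export([post_chunk/1]).

%% Illustrative only; the URL is an assumption and the meaning of the status
%% codes follows the handler change in ar_http_iface_middleware.erl below.
post_chunk(Body) ->
    {ok, _} = application:ensure_all_started(inets),
    {ok, {{_, Status, _}, _, _}} =
        httpc:request(post, {"http://localhost:1984/chunk", [],
                             "application/json", Body}, [], []),
    case Status of
        200 -> accepted;     %% expected to end up in a configured storage module
        303 -> seeding_only; %% held temporarily in the disk pool; re-seed elsewhere
        _   -> {error, Status}
    end.
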
43 changes: 41 additions & 2 deletions apps/arweave/src/ar_data_sync.erl
@@ -134,7 +134,7 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
case ar_kv:get(DiskPoolChunksIndex, DiskPoolChunkKey) of
{ok, _DiskPoolChunk} ->
%% The chunk is already in disk pool.
synced;
{synced_disk_pool, EndOffset2};
not_found ->
case DataRootOffsetReply of
not_found ->
@@ -164,6 +164,13 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
case CheckSynced of
synced ->
ok;
{synced_disk_pool, EndOffset4} ->
case is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset4) of
false ->
temporary;
true ->
ok
end;
{error, _} = Error4 ->
Error4;
{ok, {DataPathHash2, DiskPoolChunkKey2, {EndOffset3, PassesBase3, PassesStrict3,
@@ -199,11 +206,43 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
ets:update_counter(ar_data_sync_state, disk_pool_size,
{2, ChunkSize}),
prometheus_gauge:inc(pending_chunks_size, ChunkSize),
ok
case is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset3) of
false ->
temporary;
true ->
ok
end
end
end
end.

%% @doc Return true if we expect the chunk with the given data root index value and
%% relative end offset to end up in one of the configured storage modules.
is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset) ->
WeaveSize = ar_node:get_current_weave_size(),
case DataRootOffsetReply of
not_found ->
%% A chunk from a pending transaction.
is_offset_vicinity_covered(WeaveSize);
{ok, {TXStartOffset, _}} ->
Size = ar_node:get_recent_max_block_size(),
AbsoluteEndOffset = TXStartOffset + EndOffset,
case AbsoluteEndOffset > WeaveSize - Size * 4 of
true ->
%% A relatively recent offset - do not expect this chunk to be
%% persisted unless we have some storage modules configured for
%% the space ahead (the data may be rearranged after a reorg).
is_offset_vicinity_covered(AbsoluteEndOffset);
false ->
ar_storage_module:has_any(AbsoluteEndOffset)
end
end.

is_offset_vicinity_covered(Offset) ->
Size = ar_node:get_recent_max_block_size(),
ar_storage_module:has_range(max(0, Offset - Size * 2), Offset + Size * 2).

%% @doc Notify the server about the new pending data root (added to mempool).
%% The server may accept pending chunks and store them in the disk pool.
add_data_root_to_disk_pool(_, 0, _) ->
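
A compact restatement of the heuristic above, illustrative only and not part of the commit: the node and storage-module lookups are passed in as funs (HasAny and HasRange are stand-ins for ar_storage_module:has_any/1 and ar_storage_module:has_range/2), so the branch for a chunk with a known transaction offset can be followed with concrete numbers.

-module(long_term_sketch).
-export([demo/0]).

%% Same decision tree as is_estimated_long_term_chunk/2 for the branch where
%% the transaction offset is known.
is_long_term(AbsoluteEndOffset, WeaveSize, MaxBlockSize, HasAny, HasRange) ->
    case AbsoluteEndOffset > WeaveSize - MaxBlockSize * 4 of
        true ->
            %% A recent offset: the data may still be rearranged by a reorg, so
            %% require coverage of two max block sizes on either side.
            HasRange(max(0, AbsoluteEndOffset - MaxBlockSize * 2),
                    AbsoluteEndOffset + MaxBlockSize * 2);
        false ->
            %% An old offset: any configured module containing it is enough.
            HasAny(AbsoluteEndOffset)
    end.

demo() ->
    MB = 1000000,
    WeaveSize = 100 * MB,
    MaxBlockSize = 1 * MB,
    %% Pretend a single storage module covers [90 MB, 105 MB].
    HasAny = fun(Offset) -> Offset > 90 * MB andalso Offset =< 105 * MB end,
    HasRange = fun(Start, End) -> Start >= 90 * MB andalso End =< 105 * MB end,
    %% Ends 0.5 MB behind the tip => recent; its [97.5 MB, 101.5 MB] vicinity
    %% is covered, so the chunk is treated as long term.
    true = is_long_term(99 * MB + 500000, WeaveSize, MaxBlockSize, HasAny, HasRange),
    %% Ends at 10 MB => old and outside every module: seeding only.
    false = is_long_term(10 * MB, WeaveSize, MaxBlockSize, HasAny, HasRange),
    ok.
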
2 changes: 2 additions & 0 deletions apps/arweave/src/ar_http_iface_middleware.erl
@@ -2128,6 +2128,8 @@ handle_post_chunk(validate_proof, Proof, Req) ->
receive
ok ->
{200, #{}, <<>>, Req};
temporary ->
{303, #{}, <<>>, Req};
{error, data_root_not_found} ->
{400, #{}, jiffy:encode(#{ error => data_root_not_found }), Req};
{error, exceeds_disk_pool_size_limit} ->
15 changes: 14 additions & 1 deletion apps/arweave/src/ar_node.erl
@@ -14,7 +14,8 @@
get_recent_txs_map/0, get_mempool_size/0,
get_block_shadow_from_cache/1, get_recent_partition_upper_bound_by_prev_h/1,
get_block_txs_pairs/0, get_partition_upper_bound/1, get_nth_or_last/2,
get_partition_number/1, get_max_partition_number/1]).
get_partition_number/1, get_max_partition_number/1,
get_current_weave_size/0, get_recent_max_block_size/0]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
@@ -293,6 +294,18 @@ get_max_partition_number(infinity) ->
get_max_partition_number(PartitionUpperBound) ->
max(0, PartitionUpperBound div ?PARTITION_SIZE - 1).

%% @doc Return the current weave size. Assume the node has joined the network and
%% initialized the state.
get_current_weave_size() ->
[{_, WeaveSize}] = ets:lookup(node_state, weave_size),
WeaveSize.

%% @doc Return the maximum block size among the latest ?BLOCK_INDEX_HEAD_LEN blocks.
%% Assume the node has joined the network and initialized the state.
get_recent_max_block_size() ->
[{_, MaxBlockSize}] = ets:lookup(node_state, recent_max_block_size),
MaxBlockSize.

%%%===================================================================
%%% Tests.
%%%===================================================================
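
The two new accessors are plain ETS lookups against the node_state table that ar_node_worker fills in. Below is a self-contained sketch of the same pattern, illustrative only and using a private table name so it runs outside the node; as the doc comments warn, the lookup crashes with a badmatch if the key has not been populated yet.

-module(node_state_sketch).
-export([demo/0]).

demo() ->
    Tab = ets:new(node_state_sketch, [set, named_table]),
    %% ar_node_worker inserts these keys when the node joins and on each
    %% validated block; the values below are made up.
    ets:insert(Tab, [{weave_size, 123456789}, {recent_max_block_size, 4567}]),
    [{_, WeaveSize}] = ets:lookup(Tab, weave_size),
    [{_, MaxBlockSize}] = ets:lookup(Tab, recent_max_block_size),
    {WeaveSize, MaxBlockSize}.
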
16 changes: 15 additions & 1 deletion apps/arweave/src/ar_node_worker.erl
@@ -396,8 +396,10 @@ handle_info({event, nonce_limiter, initialized}, State) ->
BlockTXPairs = [block_txs_pair(Block) || Block <- Blocks],
{BlockAnchors, RecentTXMap} = get_block_anchors_and_recent_txs_map(BlockTXPairs),
{Rate, ScheduledRate} = {B#block.usd_to_ar_rate, B#block.scheduled_usd_to_ar_rate},
RecentBI2 = lists:sublist(BI, ?BLOCK_INDEX_HEAD_LEN),
ets:insert(node_state, [
{recent_block_index, lists:sublist(BI, ?BLOCK_INDEX_HEAD_LEN)},
{recent_block_index, RecentBI2},
{recent_max_block_size, get_max_block_size(RecentBI2)},
{is_joined, true},
{current, Current},
{timestamp, B#block.timestamp},
@@ -1023,6 +1025,17 @@ get_block_anchors_and_recent_txs_map(BlockTXPairs) ->
lists:sublist(BlockTXPairs, ?MAX_TX_ANCHOR_DEPTH)
).

get_max_block_size([_SingleElement]) ->
0;
get_max_block_size([{_BH, WeaveSize, _TXRoot} | BI]) ->
get_max_block_size(BI, WeaveSize, 0).

get_max_block_size([], _WeaveSize, Max) ->
Max;
get_max_block_size([{_BH, PrevWeaveSize, _TXRoot} | BI], WeaveSize, Max) ->
Max2 = max(Max, WeaveSize - PrevWeaveSize),
get_max_block_size(BI, PrevWeaveSize, Max2).

apply_block(State) ->
case ar_block_cache:get_earliest_not_validated_from_longest_chain(block_cache) of
not_found ->
@@ -1536,6 +1549,7 @@ apply_validated_block2(State, B, PrevBlocks, Orphans, RecentBI, BlockTXPairs) ->
ar_storage:store_block_time_history_part(AddedBlocks, lists:last(PrevBlocks)),
ets:insert(node_state, [
{recent_block_index, RecentBI2},
{recent_max_block_size, get_max_block_size(RecentBI2)},
{current, B#block.indep_hash},
{timestamp, B#block.timestamp},
{wallet_list, B#block.wallet_list},
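
For reference, a standalone copy of the new get_max_block_size/1 helper together with a small worked example; it is illustrative only and not part of the commit. The recent block index is ordered newest first with {BH, WeaveSize, TXRoot} entries, so each block's size is the difference between consecutive weave sizes, and the helper keeps the largest such difference.

-module(max_block_size_sketch).
-export([demo/0]).

get_max_block_size([_SingleElement]) ->
    0;
get_max_block_size([{_BH, WeaveSize, _TXRoot} | BI]) ->
    get_max_block_size(BI, WeaveSize, 0).

get_max_block_size([], _WeaveSize, Max) ->
    Max;
get_max_block_size([{_BH, PrevWeaveSize, _TXRoot} | BI], WeaveSize, Max) ->
    Max2 = max(Max, WeaveSize - PrevWeaveSize),
    get_max_block_size(BI, PrevWeaveSize, Max2).

demo() ->
    %% The weave grew 100 -> 180 -> 300 bytes, so the block sizes were 80 and
    %% 120; the helper returns the maximum, 120.
    BI = [{bh3, 300, txroot3}, {bh2, 180, txroot2}, {bh1, 100, txroot1}],
    120 = get_max_block_size(BI),
    ok.
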
2 changes: 2 additions & 0 deletions apps/arweave/src/ar_storage.erl
@@ -680,6 +680,8 @@ write_tx_data(DataRoot, DataTree, Data, SizeTaggedChunks, TXID) ->
case ar_data_sync:add_chunk(DataRoot, DataPath, Chunk, Offset - 1, TXSize) of
ok ->
Acc;
temporary ->
Acc;
{error, Reason} ->
?LOG_WARNING([{event, failed_to_write_tx_chunk},
{tx, ar_util:encode(TXID)},
84 changes: 83 additions & 1 deletion apps/arweave/src/ar_storage_module.erl
@@ -1,12 +1,17 @@
-module(ar_storage_module).

-export([id/1, label/1, address_label/1, label_by_id/1,
get_by_id/1, get_range/1, get_packing/1, get_size/1, get/2, get_all/1, get_all/2]).
get_by_id/1, get_range/1, get_packing/1, get_size/1, get/2, get_all/1, get_all/2,
has_any/1, has_range/2]).

-export([get_unique_sorted_intervals/1]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_consensus.hrl").
-include_lib("arweave/include/ar_config.hrl").

-include_lib("eunit/include/eunit.hrl").

%% The overlap makes sure a 100 MiB recall range can always be fetched
%% from a single storage module.
-ifdef(DEBUG).
@@ -172,6 +177,23 @@ get_all(Start, End) ->
{ok, Config} = application:get_env(arweave, config),
get_all(Start, End, Config#config.storage_modules, []).

%% @doc Return true if the given Offset belongs to at least one storage module.
has_any(Offset) ->
{ok, Config} = application:get_env(arweave, config),
has_any(Offset, Config#config.storage_modules).

%% @doc Return true if the given range is covered by the configured storage modules.
has_range(Start, End) ->
{ok, Config} = application:get_env(arweave, config),
case ets:lookup(?MODULE, unique_sorted_intervals) of
[] ->
Intervals = get_unique_sorted_intervals(Config#config.storage_modules),
ets:insert(?MODULE, {unique_sorted_intervals, Intervals}),
has_range(Start, End, Intervals);
[{_, Intervals}] ->
has_range(Start, End, Intervals)
end.

%%%===================================================================
%%% Private functions.
%%%===================================================================
@@ -204,3 +226,63 @@ get_all(Start, End, [StorageModule | StorageModules], FoundModules) ->
get_all(Start, End, StorageModules, [StorageModule | FoundModules]);
get_all(_Start, _End, [], FoundModules) ->
FoundModules.

has_any(_Offset, []) ->
false;
has_any(Offset, [{BucketSize, Bucket, _Packing} | StorageModules]) ->
case Offset > Bucket * BucketSize andalso Offset =< (Bucket + 1) * BucketSize + ?OVERLAP of
true ->
true;
false ->
has_any(Offset, StorageModules)
end.

get_unique_sorted_intervals(StorageModules) ->
get_unique_sorted_intervals(StorageModules, ar_intervals:new()).

get_unique_sorted_intervals([], Intervals) ->
[{Start, End} || {End, Start} <- ar_intervals:to_list(Intervals)];
get_unique_sorted_intervals([{BucketSize, Bucket, _Packing} | StorageModules], Intervals) ->
End = (Bucket + 1) * BucketSize,
Start = Bucket * BucketSize,
get_unique_sorted_intervals(StorageModules, ar_intervals:add(Intervals, End, Start)).

has_range(PartitionStart, PartitionEnd, _Intervals)
when PartitionStart >= PartitionEnd ->
true;
has_range(_PartitionStart, _PartitionEnd, []) ->
false;
has_range(PartitionStart, _PartitionEnd, [{Start, _End} | _Intervals])
when PartitionStart < Start ->
%% The given intervals are unique and sorted.
false;
has_range(PartitionStart, PartitionEnd, [{_Start, End} | Intervals])
when PartitionStart >= End ->
has_range(PartitionStart, PartitionEnd, Intervals);
has_range(_PartitionStart, PartitionEnd, [{_Start, End} | Intervals]) ->
has_range(End, PartitionEnd, Intervals).

%%%===================================================================
%%% Tests.
%%%===================================================================

has_any_test() ->
?assertEqual(false, has_any(0, [])),
?assertEqual(false, has_any(0, [{10, 1, p}])),
?assertEqual(false, has_any(10, [{10, 1, p}])),
?assertEqual(true, has_any(11, [{10, 1, p}])),
?assertEqual(true, has_any(20 + ?OVERLAP, [{10, 1, p}])),
?assertEqual(false, has_any(20 + ?OVERLAP + 1, [{10, 1, p}])).

get_unique_sorted_intervals_test() ->
?assertEqual([{0, 24}, {90, 120}],
get_unique_sorted_intervals([{10, 0, p}, {30, 3, p}, {20, 0, p}, {12, 1, p}])).

has_range_test() ->
?assertEqual(false, has_range(0, 10, [])),
?assertEqual(false, has_range(0, 10, [{0, 9}])),
?assertEqual(true, has_range(0, 10, [{0, 10}])),
?assertEqual(true, has_range(0, 10, [{0, 11}])),
?assertEqual(true, has_range(0, 10, [{0, 9}, {9, 10}])),
?assertEqual(true, has_range(5, 10, [{0, 9}, {9, 10}])),
?assertEqual(true, has_range(5, 10, [{0, 2}, {2, 9}, {9, 10}])).
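
One design note on has_range/2 above: the merged, sorted intervals are computed once from the configuration and cached in the module's ETS table under the unique_sorted_intervals key, so repeated range checks avoid re-folding the storage module list. Below is a minimal sketch of the same compute-once, look-up-after pattern, illustrative only, with hypothetical names and a simplified containment check.

-module(interval_cache_sketch).
-export([covered/2]).

%% Illustrative only: a simplified stand-in that demonstrates caching derived
%% intervals in ETS; the interval list is hard-coded here and a range must fit
%% inside a single interval, unlike the real implementation.
covered(Start, End) ->
    Tab = ensure_table(),
    Intervals =
        case ets:lookup(Tab, intervals) of
            [] ->
                Computed = [{0, 100}, {250, 400}],
                ets:insert(Tab, {intervals, Computed}),
                Computed;
            [{_, Cached}] ->
                Cached
        end,
    lists:any(fun({S, E}) -> Start >= S andalso End =< E end, Intervals).

ensure_table() ->
    case ets:whereis(interval_cache_sketch) of
        undefined -> ets:new(interval_cache_sketch, [set, named_table, public]);
        _ -> interval_cache_sketch
    end.
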
33 changes: 18 additions & 15 deletions apps/arweave/test/ar_data_sync_tests.erl
@@ -221,7 +221,7 @@ test_rejects_chunks_exceeding_disk_pool_limit() ->
Data1 = crypto:strong_rand_bytes(
(?DEFAULT_MAX_DISK_POOL_DATA_ROOT_BUFFER_MB * 1024 * 1024) + 1
),
Chunks1 = split(?DATA_CHUNK_SIZE, Data1),
Chunks1 = imperfect_split(Data1),
{DataRoot1, _} = ar_merkle:generate_tree(
ar_tx:sized_chunks_to_sized_chunk_ids(
ar_tx:chunks_to_size_tagged_chunks(Chunks1)
@@ -249,7 +249,7 @@
?DEFAULT_MAX_DISK_POOL_DATA_ROOT_BUFFER_MB - 1
) * 1024 * 1024
),
Chunks2 = split(Data2),
Chunks2 = imperfect_split(Data2),
{DataRoot2, _} = ar_merkle:generate_tree(
ar_tx:sized_chunks_to_sized_chunk_ids(
ar_tx:chunks_to_size_tagged_chunks(Chunks2)
@@ -275,7 +275,7 @@
byte_size(Data2),
?assert(Left < ?DEFAULT_MAX_DISK_POOL_DATA_ROOT_BUFFER_MB * 1024 * 1024),
Data3 = crypto:strong_rand_bytes(Left + 1),
Chunks3 = split(Data3),
Chunks3 = imperfect_split(Data3),
{DataRoot3, _} = ar_merkle:generate_tree(
ar_tx:sized_chunks_to_sized_chunk_ids(
ar_tx:chunks_to_size_tagged_chunks(Chunks3)
@@ -303,12 +303,14 @@
true = ar_util:do_until(
fun() ->
%% After a block is mined, the chunks receive their absolute offsets, which
%% end up above the rebase threshold and so the node discovers the very last
%% chunks of the last two transactions are invalid under these offsets and
%% frees up 131072 + 131072 bytes in the disk pool => we can submit a 262144-byte
%% chunk.
%% end up above the strict data split threshold and so the node discovers
%% the very last chunks of the last two transactions are invalid under these
%% offsets and frees up 131072 + 131072 bytes in the disk pool => we can submit
%% a 262144-byte chunk. Also, expect 303 instead of 200 because the last block
%% was so large that the configured partitions do not cover at least two
%% times as much space ahead of the current weave size.
case ar_test_node:post_chunk(main, ar_serialize:jsonify(FirstProof3)) of
{ok, {{<<"200">>, _}, _, _, _, _}} ->
{ok, {{<<"303">>, _}, _, _, _, _}} ->
true;
_ ->
false
@@ -1012,23 +1014,24 @@ v2_standard_split_get_chunks(<< _:262144/binary, LastChunk/binary >> = Rest, Chu
v2_standard_split_get_chunks(<< Chunk:262144/binary, Rest/binary >>, Chunks, MinSize) ->
v2_standard_split_get_chunks(Rest, [Chunk | Chunks], MinSize).

split(Data) ->
split(?DATA_CHUNK_SIZE, Data).
imperfect_split(Data) ->
imperfect_split(?DATA_CHUNK_SIZE, Data).

split(_ChunkSize, Bin) when byte_size(Bin) == 0 ->
imperfect_split(_ChunkSize, Bin) when byte_size(Bin) == 0 ->
[];
split(ChunkSize, Bin) when byte_size(Bin) < ChunkSize ->
imperfect_split(ChunkSize, Bin) when byte_size(Bin) < ChunkSize ->
[Bin];
split(ChunkSize, Bin) ->
imperfect_split(ChunkSize, Bin) ->
<<ChunkBin:ChunkSize/binary, Rest/binary>> = Bin,
HalfSize = ChunkSize div 2,
case byte_size(Rest) < HalfSize of
true ->
HalfSize = ChunkSize div 2,
<<ChunkBin2:HalfSize/binary, Rest2/binary>> = Bin,
%% If Rest is <<>>, both chunks are HalfSize - the chunks are invalid
%% after the strict data split threshold.
[ChunkBin2, Rest2];
false ->
[ChunkBin | split(ChunkSize, Rest)]
[ChunkBin | imperfect_split(ChunkSize, Rest)]
end.

build_proofs(B, TX, Chunks) ->
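
A quick illustration of why the split helper above was renamed to imperfect_split, not part of the commit: when the remaining tail is shorter than half a chunk, the final full-size chunk is re-cut into two roughly half-sized pieces, so a non-final chunk ends up smaller than 256 KiB, which the strict data split rules reject once the offsets pass the threshold. The sketch copies the test helper and checks the resulting chunk sizes.

-module(imperfect_split_sketch).
-export([demo/0]).

-define(DATA_CHUNK_SIZE, 262144).

demo() ->
    Data = crypto:strong_rand_bytes(2 * ?DATA_CHUNK_SIZE + 1000),
    Sizes = [byte_size(C) || C <- imperfect_split(?DATA_CHUNK_SIZE, Data)],
    %% One full chunk, then a 131072-byte chunk that is not the last one --
    %% invalid once the offsets pass the strict data split threshold.
    [262144, 131072, 132072] = Sizes,
    ok.

imperfect_split(_ChunkSize, Bin) when byte_size(Bin) == 0 ->
    [];
imperfect_split(ChunkSize, Bin) when byte_size(Bin) < ChunkSize ->
    [Bin];
imperfect_split(ChunkSize, Bin) ->
    <<ChunkBin:ChunkSize/binary, Rest/binary>> = Bin,
    HalfSize = ChunkSize div 2,
    case byte_size(Rest) < HalfSize of
        true ->
            <<ChunkBin2:HalfSize/binary, Rest2/binary>> = Bin,
            [ChunkBin2, Rest2];
        false ->
            [ChunkBin | imperfect_split(ChunkSize, Rest)]
    end.
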
