Attempt to recognize "seeding-only" chunks early #537

Merged: 1 commit, May 1, 2024
43 changes: 41 additions & 2 deletions apps/arweave/src/ar_data_sync.erl
@@ -135,7 +135,7 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
case ar_kv:get(DiskPoolChunksIndex, DiskPoolChunkKey) of
{ok, _DiskPoolChunk} ->
%% The chunk is already in disk pool.
synced;
{synced_disk_pool, EndOffset2};
not_found ->
case DataRootOffsetReply of
not_found ->
@@ -165,6 +165,13 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
case CheckSynced of
synced ->
ok;
{synced_disk_pool, EndOffset4} ->
case is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset4) of
false ->
temporary;
true ->
ok
end;
{error, _} = Error4 ->
Error4;
{ok, {DataPathHash2, DiskPoolChunkKey2, {EndOffset3, PassesBase3, PassesStrict3,
@@ -200,11 +207,43 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
ets:update_counter(ar_data_sync_state, disk_pool_size,
{2, ChunkSize}),
prometheus_gauge:inc(pending_chunks_size, ChunkSize),
ok
case is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset3) of
false ->
temporary;
true ->
ok
end
end
end
end.

%% @doc Return true if we expect the chunk with the given data root index value and
%% relative end offset to end up in one of the configured storage modules.
is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset) ->
WeaveSize = ar_node:get_current_weave_size(),
case DataRootOffsetReply of
not_found ->
%% A chunk from a pending transaction.
is_offset_vicinity_covered(WeaveSize);
{ok, {TXStartOffset, _}} ->
Size = ar_node:get_recent_max_block_size(),
AbsoluteEndOffset = TXStartOffset + EndOffset,
case AbsoluteEndOffset > WeaveSize - Size * 4 of
true ->
%% A relatively recent offset - do not expect this chunk to be
%% persisted unless we have some storage modules configured for
%% the space ahead (the data may be rearranged after a reorg).
is_offset_vicinity_covered(AbsoluteEndOffset);
false ->
ar_storage_module:has_any(AbsoluteEndOffset)
end
end.

is_offset_vicinity_covered(Offset) ->
Size = ar_node:get_recent_max_block_size(),
ar_storage_module:has_range(max(0, Offset - Size * 2), Offset + Size * 2).
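
To make the thresholds concrete, here is a worked example with hypothetical numbers (the 4x and 2x factors are the ones in the code above):

%% Suppose WeaveSize = 1000000 and the recent max block size Size = 50000.
%% AbsoluteEndOffset = 900000 satisfies 900000 > 1000000 - 50000 * 4 = 800000,
%% so the chunk counts as recent and its whole vicinity
%% [900000 - 100000, 900000 + 100000] must be covered by storage modules,
%% since a reorg may still move the offset. An older offset, say 300000,
%% only needs ar_storage_module:has_any(300000) to return true.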

%% @doc Notify the server about the new pending data root (added to mempool).
%% The server may accept pending chunks and store them in the disk pool.
add_data_root_to_disk_pool(_, 0, _) ->
2 changes: 2 additions & 0 deletions apps/arweave/src/ar_http_iface_middleware.erl
@@ -2190,6 +2190,8 @@ handle_post_chunk(validate_proof, Proof, Req) ->
receive
ok ->
{200, #{}, <<>>, Req};
temporary ->
{303, #{}, <<>>, Req};
{error, data_root_not_found} ->
{400, #{}, jiffy:encode(#{ error => data_root_not_found }), Req};
{error, exceeds_disk_pool_size_limit} ->
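
With this change, POST /chunk answers 303 when the chunk was accepted into the disk pool but is not expected to be persisted by any configured storage module. A minimal client-side sketch using stock OTP httpc — the node address and the error handling are assumptions for illustration, not part of this PR:

%% Assumes inets is started (application:ensure_all_started(inets)) and a
%% node listening on localhost:1984; ProofJSON is the serialized chunk proof.
post_chunk(ProofJSON) ->
    {ok, {{_, Status, _}, _, _}} = httpc:request(post,
        {"http://localhost:1984/chunk", [], "application/json", ProofJSON},
        [], []),
    case Status of
        200 -> persisted;     %% expected to be stored long term
        303 -> seeding_only;  %% accepted into the disk pool only
        Other -> {error, Other}
    end.
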
15 changes: 14 additions & 1 deletion apps/arweave/src/ar_node.erl
@@ -14,7 +14,8 @@
get_recent_txs_map/0, get_mempool_size/0,
get_block_shadow_from_cache/1, get_recent_partition_upper_bound_by_prev_h/1,
get_block_txs_pairs/0, get_partition_upper_bound/1, get_nth_or_last/2,
get_partition_number/1, get_max_partition_number/1]).
get_partition_number/1, get_max_partition_number/1,
get_current_weave_size/0, get_recent_max_block_size/0]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
@@ -293,6 +294,18 @@ get_max_partition_number(infinity) ->
get_max_partition_number(PartitionUpperBound) ->
max(0, PartitionUpperBound div ?PARTITION_SIZE - 1).

%% @doc Return the current weave size. Assume the node has joined the network and
%% initialized the state.
get_current_weave_size() ->
[{_, WeaveSize}] = ets:lookup(node_state, weave_size),
WeaveSize.

%% @doc Return the maximum block size among the latest ?BLOCK_INDEX_HEAD_LEN blocks.
%% Assume the node has joined the network and initialized the state.
get_recent_max_block_size() ->
[{_, MaxBlockSize}] = ets:lookup(node_state, recent_max_block_size),
MaxBlockSize.

%%%===================================================================
%%% Tests.
%%%===================================================================
16 changes: 15 additions & 1 deletion apps/arweave/src/ar_node_worker.erl
@@ -396,8 +396,10 @@ handle_info({event, nonce_limiter, initialized}, State) ->
BlockTXPairs = [block_txs_pair(Block) || Block <- Blocks],
{BlockAnchors, RecentTXMap} = get_block_anchors_and_recent_txs_map(BlockTXPairs),
{Rate, ScheduledRate} = {B#block.usd_to_ar_rate, B#block.scheduled_usd_to_ar_rate},
RecentBI2 = lists:sublist(BI, ?BLOCK_INDEX_HEAD_LEN),
ets:insert(node_state, [
{recent_block_index, lists:sublist(BI, ?BLOCK_INDEX_HEAD_LEN)},
{recent_block_index, RecentBI2},
{recent_max_block_size, get_max_block_size(RecentBI2)},
{is_joined, true},
{current, Current},
{timestamp, B#block.timestamp},
@@ -1023,6 +1025,17 @@ get_block_anchors_and_recent_txs_map(BlockTXPairs) ->
lists:sublist(BlockTXPairs, ?MAX_TX_ANCHOR_DEPTH)
).

get_max_block_size([_SingleElement]) ->
0;
get_max_block_size([{_BH, WeaveSize, _TXRoot} | BI]) ->
get_max_block_size(BI, WeaveSize, 0).

get_max_block_size([], _WeaveSize, Max) ->
Max;
get_max_block_size([{_BH, PrevWeaveSize, _TXRoot} | BI], WeaveSize, Max) ->
Max2 = max(Max, WeaveSize - PrevWeaveSize),
get_max_block_size(BI, PrevWeaveSize, Max2).
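
For illustration (hypothetical values): block index entries are {BH, WeaveSize, TXRoot}, newest first, so consecutive weave sizes differ by exactly one block's size.

%% get_max_block_size([{BH3, 300, TX3}, {BH2, 180, TX2}, {BH1, 100, TX1}])
%% compares 300 - 180 = 120 and 180 - 100 = 80, returning 120.
%% A single-entry index yields 0: there is no previous entry to diff against.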

apply_block(State) ->
case ar_block_cache:get_earliest_not_validated_from_longest_chain(block_cache) of
not_found ->
@@ -1536,6 +1549,7 @@ apply_validated_block2(State, B, PrevBlocks, Orphans, RecentBI, BlockTXPairs) ->
ar_storage:store_block_time_history_part(AddedBlocks, lists:last(PrevBlocks)),
ets:insert(node_state, [
{recent_block_index, RecentBI2},
{recent_max_block_size, get_max_block_size(RecentBI2)},
{current, B#block.indep_hash},
{timestamp, B#block.timestamp},
{wallet_list, B#block.wallet_list},
2 changes: 2 additions & 0 deletions apps/arweave/src/ar_storage.erl
@@ -680,6 +680,8 @@ write_tx_data(DataRoot, DataTree, Data, SizeTaggedChunks, TXID) ->
case ar_data_sync:add_chunk(DataRoot, DataPath, Chunk, Offset - 1, TXSize) of
ok ->
Acc;
temporary ->
Acc;
{error, Reason} ->
?LOG_WARNING([{event, failed_to_write_tx_chunk},
{tx, ar_util:encode(TXID)},
82 changes: 78 additions & 4 deletions apps/arweave/src/ar_storage_module.erl
@@ -1,14 +1,19 @@
-module(ar_storage_module).

-export([id/1, label/1, address_label/1, packing_label/1, label_by_id/1,
get_by_id/1, get_range/1, get_packing/1, get_size/1, get/2, get_all/1, get_all/2]).
get_by_id/1, get_range/1, get_packing/1, get_size/1, get/2, get_all/1, get_all/2,
has_any/1, has_range/2]).

-export([get_unique_sorted_intervals/1]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_consensus.hrl").
-include_lib("arweave/include/ar_config.hrl").
-include_lib("eunit/include/eunit.hrl").


-include_lib("eunit/include/eunit.hrl").

%% The overlap makes sure a 100 MiB recall range can always be fetched
%% from a single storage module.
-ifdef(DEBUG).
@@ -159,6 +164,23 @@ get_all(Start, End) ->
{ok, Config} = application:get_env(arweave, config),
get_all(Start, End, Config#config.storage_modules, []).

%% @doc Return true if the given Offset belongs to at least one storage module.
has_any(Offset) ->
{ok, Config} = application:get_env(arweave, config),
has_any(Offset, Config#config.storage_modules).

%% @doc Return true if the given range is covered by the configured storage modules.
has_range(Start, End) ->
{ok, Config} = application:get_env(arweave, config),
case ets:lookup(?MODULE, unique_sorted_intervals) of
[] ->
Intervals = get_unique_sorted_intervals(Config#config.storage_modules),
ets:insert(?MODULE, {unique_sorted_intervals, Intervals}),
has_range(Start, End, Intervals);
[{_, Intervals}] ->
has_range(Start, End, Intervals)
end.

%%%===================================================================
%%% Private functions.
%%%===================================================================
@@ -200,6 +222,41 @@ get_all(Start, End, [StorageModule | StorageModules], FoundModules) ->
get_all(_Start, _End, [], FoundModules) ->
FoundModules.

has_any(_Offset, []) ->
false;
has_any(Offset, [{BucketSize, Bucket, _Packing} | StorageModules]) ->
case Offset > Bucket * BucketSize andalso Offset =< (Bucket + 1) * BucketSize + ?OVERLAP of
true ->
true;
false ->
has_any(Offset, StorageModules)
end.

get_unique_sorted_intervals(StorageModules) ->
get_unique_sorted_intervals(StorageModules, ar_intervals:new()).

get_unique_sorted_intervals([], Intervals) ->
%% ar_intervals yields {End, Start} pairs; flip each into {Start, End}.
%% The intervals are disjoint, so the result is sorted by Start.
[{Start, End} || {End, Start} <- ar_intervals:to_list(Intervals)];
get_unique_sorted_intervals([{BucketSize, Bucket, _Packing} | StorageModules], Intervals) ->
End = (Bucket + 1) * BucketSize,
Start = Bucket * BucketSize,
get_unique_sorted_intervals(StorageModules, ar_intervals:add(Intervals, End, Start)).

has_range(PartitionStart, PartitionEnd, _Intervals)
when PartitionStart >= PartitionEnd ->
%% The whole requested range has been consumed - it is fully covered.
true;
has_range(_PartitionStart, _PartitionEnd, []) ->
%% Intervals exhausted while some of the range remains uncovered.
false;
has_range(PartitionStart, _PartitionEnd, [{Start, _End} | _Intervals])
when PartitionStart < Start ->
%% The given intervals are unique and sorted, so a gap before the next
%% interval cannot be covered by any later interval.
false;
has_range(PartitionStart, PartitionEnd, [{_Start, End} | Intervals])
when PartitionStart >= End ->
%% The interval lies entirely before the range - skip it.
has_range(PartitionStart, PartitionEnd, Intervals);
has_range(_PartitionStart, PartitionEnd, [{_Start, End} | Intervals]) ->
%% The interval covers a prefix of the range - continue from its end.
has_range(End, PartitionEnd, Intervals).

%%%===================================================================
%%% Tests.
%%%===================================================================
@@ -223,6 +280,23 @@ label_test() ->
?assertEqual("storage_module_524288_2_3",
label({524288, 2, {spora_2_6, <<"s÷">>}})).

has_any_test() ->
?assertEqual(false, has_any(0, [])),
?assertEqual(false, has_any(0, [{10, 1, p}])),
?assertEqual(false, has_any(10, [{10, 1, p}])),
?assertEqual(true, has_any(11, [{10, 1, p}])),
?assertEqual(true, has_any(20 + ?OVERLAP, [{10, 1, p}])),
?assertEqual(false, has_any(20 + ?OVERLAP + 1, [{10, 1, p}])).

get_unique_sorted_intervals_test() ->
?assertEqual([{0, 24}, {90, 120}],
get_unique_sorted_intervals([{10, 0, p}, {30, 3, p}, {20, 0, p}, {12, 1, p}])).

has_range_test() ->
?assertEqual(false, has_range(0, 10, [])),
?assertEqual(false, has_range(0, 10, [{0, 9}])),
?assertEqual(true, has_range(0, 10, [{0, 10}])),
?assertEqual(true, has_range(0, 10, [{0, 11}])),
?assertEqual(true, has_range(0, 10, [{0, 9}, {9, 10}])),
?assertEqual(true, has_range(5, 10, [{0, 9}, {9, 10}])),
?assertEqual(true, has_range(5, 10, [{0, 2}, {2, 9}, {9, 10}])).
33 changes: 18 additions & 15 deletions apps/arweave/test/ar_data_sync_tests.erl
@@ -221,7 +221,7 @@ test_rejects_chunks_exceeding_disk_pool_limit() ->
Data1 = crypto:strong_rand_bytes(
(?DEFAULT_MAX_DISK_POOL_DATA_ROOT_BUFFER_MB * 1024 * 1024) + 1
),
Chunks1 = split(?DATA_CHUNK_SIZE, Data1),
Chunks1 = imperfect_split(Data1),
{DataRoot1, _} = ar_merkle:generate_tree(
ar_tx:sized_chunks_to_sized_chunk_ids(
ar_tx:chunks_to_size_tagged_chunks(Chunks1)
@@ -249,7 +249,7 @@
?DEFAULT_MAX_DISK_POOL_DATA_ROOT_BUFFER_MB - 1
) * 1024 * 1024
),
Chunks2 = split(Data2),
Chunks2 = imperfect_split(Data2),
{DataRoot2, _} = ar_merkle:generate_tree(
ar_tx:sized_chunks_to_sized_chunk_ids(
ar_tx:chunks_to_size_tagged_chunks(Chunks2)
@@ -275,7 +275,7 @@
byte_size(Data2),
?assert(Left < ?DEFAULT_MAX_DISK_POOL_DATA_ROOT_BUFFER_MB * 1024 * 1024),
Data3 = crypto:strong_rand_bytes(Left + 1),
Chunks3 = split(Data3),
Chunks3 = imperfect_split(Data3),
{DataRoot3, _} = ar_merkle:generate_tree(
ar_tx:sized_chunks_to_sized_chunk_ids(
ar_tx:chunks_to_size_tagged_chunks(Chunks3)
@@ -303,12 +303,14 @@
true = ar_util:do_until(
fun() ->
%% After a block is mined, the chunks receive their absolute offsets, which
%% end up above the rebase threshold and so the node discovers the very last
%% chunks of the last two transactions are invalid under these offsets and
%% frees up 131072 + 131072 bytes in the disk pool => we can submit a 262144-byte
%% chunk.
%% end up above the strict data split threshold and so the node discovers
%% the very last chunks of the last two transactions are invalid under these
%% offsets and frees up 131072 + 131072 bytes in the disk pool => we can submit
%% a 262144-byte chunk. Also, expect 303 instead of 200 because the last block
%% was so large that the configured partitions no longer cover the required
%% vicinity (two recent maximum block sizes) ahead of the current weave size.
case ar_test_node:post_chunk(main, ar_serialize:jsonify(FirstProof3)) of
{ok, {{<<"200">>, _}, _, _, _, _}} ->
{ok, {{<<"303">>, _}, _, _, _, _}} ->
true;
_ ->
false
@@ -1012,23 +1014,24 @@ v2_standard_split_get_chunks(<< _:262144/binary, LastChunk/binary >> = Rest, Chu
v2_standard_split_get_chunks(<< Chunk:262144/binary, Rest/binary >>, Chunks, MinSize) ->
v2_standard_split_get_chunks(Rest, [Chunk | Chunks], MinSize).

split(Data) ->
split(?DATA_CHUNK_SIZE, Data).
imperfect_split(Data) ->
imperfect_split(?DATA_CHUNK_SIZE, Data).

split(_ChunkSize, Bin) when byte_size(Bin) == 0 ->
imperfect_split(_ChunkSize, Bin) when byte_size(Bin) == 0 ->
[];
split(ChunkSize, Bin) when byte_size(Bin) < ChunkSize ->
imperfect_split(ChunkSize, Bin) when byte_size(Bin) < ChunkSize ->
[Bin];
split(ChunkSize, Bin) ->
imperfect_split(ChunkSize, Bin) ->
<<ChunkBin:ChunkSize/binary, Rest/binary>> = Bin,
HalfSize = ChunkSize div 2,
case byte_size(Rest) < HalfSize of
true ->
HalfSize = ChunkSize div 2,
<<ChunkBin2:HalfSize/binary, Rest2/binary>> = Bin,
%% If Rest is <<>>, both chunks are HalfSize - the chunks are invalid
%% after the strict data split threshold.
[ChunkBin2, Rest2];
false ->
[ChunkBin | split(ChunkSize, Rest)]
[ChunkBin | imperfect_split(ChunkSize, Rest)]
end.
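
A worked example with hypothetical sizes:

%% imperfect_split(262144, Bin) for a 300000-byte Bin first takes a full
%% 262144-byte chunk, leaving a 37856-byte Rest < HalfSize = 131072; it then
%% re-splits the original binary into [131072-byte chunk, 168928-byte chunk]
%% instead of producing an undersized tail chunk.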

build_proofs(B, TX, Chunks) ->