Skip to content

Commit

Permalink
Implement webhook for tracking tx synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
Lev Berman committed Apr 25, 2024
1 parent 80fb925 commit 703c908
Show file tree
Hide file tree
Showing 13 changed files with 827 additions and 112 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Expand Up @@ -37,6 +37,7 @@ jobs:
ar_pricing,
ar_retarget,
ar_serialize,
ar_storage_module,
ar_storage,
ar_storage_module,
ar_sync_buckets,
Expand Down
34 changes: 25 additions & 9 deletions apps/arweave/src/ar.erl
Expand Up @@ -48,15 +48,31 @@ show_help() ->
)
end,
[
{"config_file (path)", "Load the configuration from the specified JSON file."
" The keys in the root object are mapped to the command line arguments "
"described here. Additionally, you may specify a semaphores key. Its value "
"has to be a nested JSON object where keys are some of: get_chunk, "
"get_and_pack_chunk, get_tx_data, post_chunk, post_tx, get_block_index, "
"get_wallet_list, get_sync_record. For instance, your"
" config file contents may look like {\"semaphores\": {\"post_tx\": 100}}."
" In this case, the node will validate up to 100 incoming transactions in "
"parallel."},
{"config_file (path)", io_lib:format("Load the configuration from the "
"specified JSON file.~n~n"
"The configuration file is currently the only place where you may configure "
"webhooks and tune semaphores.~n~n"
"An example:~n~n"
"{~n"
" \"webhooks\": [~n"
" {~n"
" \"events\": [\"transaction\", \"block\"],~n"
" \"url\": \"https://example.com/block_or_tx\",~n"
" \"headers\": {~n"
" \"Authorization\": \"Bearer 123\"~n"
" }~n"
" },~n"
" {~n"
" \"events\": [\"transaction_data\"],~n"
" \"url\": \"http://127.0.0.1:1985/tx_data\"~n"
" },~n"
" \"semaphores\": {\"post_tx\": 100}~n"
"}~n~n"
"100 means the node will validate up to 100 incoming transactions in "
"parallel.~n"
"The supported semaphore keys are get_chunk, get_and_pack_chunk, get_tx_data, "
"post_chunk, post_tx, get_block_index, get_wallet_list, get_sync_record.~n~n",
[])},
{"peer (IP:port)", "Join a network on a peer (or set of peers)."},
{"block_gossip_peer (IP:port)", "Optionally specify peer(s) to always"
" send blocks to."},
Expand Down
1 change: 1 addition & 0 deletions apps/arweave/src/ar_config.erl
Expand Up @@ -693,6 +693,7 @@ parse_webhook([], Webhook) ->
parse_webhook_events([Event | Rest], Events) ->
case Event of
<<"transaction">> -> parse_webhook_events(Rest, [transaction | Events]);
<<"transaction_data">> -> parse_webhook_events(Rest, [transaction_data | Events]);
<<"block">> -> parse_webhook_events(Rest, [block | Events]);
_ -> error
end;
Expand Down
68 changes: 61 additions & 7 deletions apps/arweave/src/ar_data_sync.erl
Expand Up @@ -6,7 +6,7 @@
is_chunk_proof_ratio_attractive/3,
add_chunk/5, add_data_root_to_disk_pool/3, maybe_drop_data_root_from_disk_pool/3,
get_chunk/2, get_chunk_proof/2, get_tx_data/1, get_tx_data/2,
get_tx_offset/1, has_data_root/2,
get_tx_offset/1, get_tx_offset_data_in_range/2, has_data_root/2,
request_tx_data_removal/3, request_data_removal/4, record_disk_pool_chunks_count/0,
record_chunk_cache_size_metric/0, is_chunk_cache_full/0, is_disk_space_sufficient/1,
get_chunk_by_byte/2, read_chunk/4, decrement_chunk_cache_size/0,
Expand Down Expand Up @@ -371,8 +371,6 @@ get_tx_data(TXID) ->
%% the size is bigger than SizeLimit.
get_tx_data(TXID, SizeLimit) ->
case get_tx_offset(TXID) of
{error, not_joined} ->
{error, not_joined};
{error, not_found} ->
{error, not_found};
{error, failed_to_read_tx_offset} ->
Expand All @@ -391,6 +389,14 @@ get_tx_offset(TXID) ->
TXIndex = {tx_index, "default"},
get_tx_offset(TXIndex, TXID).

%% @doc Return {ok, [{TXID, AbsoluteStartOffset, AbsoluteEndOffset}, ...]}
%% where AbsoluteStartOffset, AbsoluteEndOffset are transaction borders
%% (not clipped by the given range) for all TXIDs intersecting the given range.
get_tx_offset_data_in_range(Start, End) ->
TXIndex = {tx_index, "default"},
TXOffsetIndex = {tx_offset_index, "default"},
get_tx_offset_data_in_range(TXOffsetIndex, TXIndex, Start, End).

%% @doc Return true if the given {DataRoot, DataSize} is in the mempool
%% or in the index.
has_data_root(DataRoot, DataSize) ->
Expand Down Expand Up @@ -681,7 +687,7 @@ handle_cast({join, RecentBI}, State) ->
{cut, Offset}) || Module <- Config#config.storage_modules],
ok = ar_chunk_storage:cut(Offset, StoreID),
ok = ar_sync_record:cut(Offset, ?MODULE, StoreID),
ar_events:send(data_sync, {cut, Offset}),
ar_events:send(sync_record, {global_cut, Offset}),
reset_orphaned_data_roots_disk_pool_timestamps(OrphanedDataRoots)
end,
BI = ar_block_index:get_list_by_hash(element(1, lists:last(RecentBI))),
Expand Down Expand Up @@ -742,7 +748,7 @@ handle_cast({add_tip_block, BlockTXPairs, BI}, State) ->
reset_orphaned_data_roots_disk_pool_timestamps(OrphanedDataRoots),
ok = ar_chunk_storage:cut(BlockStartOffset, StoreID),
ok = ar_sync_record:cut(BlockStartOffset, ?MODULE, StoreID),
ar_events:send(data_sync, {cut, BlockStartOffset}),
ar_events:send(sync_record, {global_cut, BlockStartOffset}),
DiskPoolThreshold = get_disk_pool_threshold(BI),
ets:insert(ar_data_sync_state, {disk_pool_threshold, DiskPoolThreshold}),
State2 = State#sync_data_state{ weave_size = WeaveSize,
Expand Down Expand Up @@ -1699,6 +1705,53 @@ get_tx_offset(TXIndex, TXID) ->
{error, failed_to_read_offset}
end.

get_tx_offset_data_in_range(TXOffsetIndex, TXIndex, Start, End) ->
case ar_kv:get_prev(TXOffsetIndex, << Start:?OFFSET_KEY_BITSIZE >>) of
none ->
get_tx_offset_data_in_range2(TXOffsetIndex, TXIndex, Start, End);
{ok, << Start2:?OFFSET_KEY_BITSIZE >>, _} ->
get_tx_offset_data_in_range2(TXOffsetIndex, TXIndex, Start2, End);
Error ->
Error
end.

get_tx_offset_data_in_range2(TXOffsetIndex, TXIndex, Start, End) ->
case ar_kv:get_range(TXOffsetIndex, << Start:?OFFSET_KEY_BITSIZE >>,
<< (End - 1):?OFFSET_KEY_BITSIZE >>) of
{ok, EmptyMap} when map_size(EmptyMap) == 0 ->
{ok, []};
{ok, Map} ->
case maps:fold(
fun
(_, _Value, {error, _} = Error) ->
Error;
(_, TXID, Acc) ->
case get_tx_offset(TXIndex, TXID) of
{ok, {EndOffset, Size}} ->
case EndOffset =< Start of
true ->
Acc;
false ->
[{TXID, EndOffset - Size, EndOffset} | Acc]
end;
not_found ->
Acc;
Error ->
Error
end
end,
[],
Map
) of
{error, _} = Error ->
Error;
List ->
{ok, lists:reverse(List)}
end;
Error ->
Error
end.

get_tx_data(Start, End, Chunks) when Start >= End ->
{ok, iolist_to_binary(Chunks)};
get_tx_data(Start, End, Chunks) ->
Expand Down Expand Up @@ -1736,7 +1789,7 @@ remove_range(Start, End, Ref, ReplyTo) ->
case sets:is_empty(StorageRefs) of
true ->
ReplyTo ! {removed_range, Ref},
ar_events:send(data_sync, {remove_range, Start, End});
ar_events:send(sync_record, {global_remove_range, Start, End});
false ->
receive
{removed_range, StorageRef} ->
Expand Down Expand Up @@ -2174,6 +2227,8 @@ update_tx_index(SizeTaggedTXs, BlockStartOffset, StoreID) ->
case ar_kv:put({tx_index, StoreID}, TXID,
term_to_binary({AbsoluteEndOffset, TXSize})) of
ok ->
ar_events:send(tx, {registered_offset, TXID, AbsoluteEndOffset,
TXSize}),
ar_tx_blacklist:notify_about_added_tx(TXID, AbsoluteEndOffset,
AbsoluteStartOffset),
TXEndOffset;
Expand Down Expand Up @@ -2699,7 +2754,6 @@ update_chunks_index2(Args, State) ->
PaddedOffset = get_chunk_padded_offset(AbsoluteOffset),
case ar_sync_record:add(PaddedOffset, StartOffset, Packing, ?MODULE, StoreID) of
ok ->
ar_events:send(data_sync, {add_range, StartOffset, PaddedOffset, StoreID}),
ok;
{error, Reason} ->
?LOG_ERROR([{event, failed_to_update_sync_record}, {reason, Reason},
Expand Down
26 changes: 14 additions & 12 deletions apps/arweave/src/ar_ets_intervals.erl
Expand Up @@ -142,41 +142,43 @@ get_interval_with_byte(Table, Offset) ->
end.

%% @doc Return the lowest interval outside the recorded set of intervals,
%% strictly above the given Offset, and with the right bound at most RightBound.
%% strictly above the given Offset, and with the end offset at most EndOffsetUpperBound.
%% Return not_found if there are no such intervals.
get_next_interval_outside(_Table, Offset, RightBound) when Offset >= RightBound ->
get_next_interval_outside(_Table, Offset, EndOffsetUpperBound)
when Offset >= EndOffsetUpperBound ->
not_found;
get_next_interval_outside(Table, Offset, RightBound) ->
get_next_interval_outside(Table, Offset, EndOffsetUpperBound) ->
case ets:next(Table, Offset) of
'$end_of_table' ->
{RightBound, Offset};
{EndOffsetUpperBound, Offset};
NextOffset ->
case ets:lookup(Table, NextOffset) of
[{NextOffset, Start}] when Start > Offset ->
{min(RightBound, Start), Offset};
{min(EndOffsetUpperBound, Start), Offset};
_ ->
get_next_interval_outside(Table, NextOffset, RightBound)
get_next_interval_outside(Table, NextOffset, EndOffsetUpperBound)
end
end.

%% @doc Return the lowest interval inside the recorded set of intervals with the
%% end offset strictly above the given offset, and with the right bound at most RightBound.
%% end offset strictly above the given offset, and with the end offset
%% at most EndOffsetUpperBound.
%% Return not_found if there are no such intervals.
get_next_interval(_Table, Offset, RightBound) when Offset >= RightBound ->
get_next_interval(_Table, Offset, EndOffsetUpperBound) when Offset >= EndOffsetUpperBound ->
not_found;
get_next_interval(Table, Offset, RightBound) ->
get_next_interval(Table, Offset, EndOffsetUpperBound) ->
case ets:next(Table, Offset) of
'$end_of_table' ->
not_found;
NextOffset ->
case ets:lookup(Table, NextOffset) of
[{_NextOffset, Start}] when Start >= RightBound ->
[{_NextOffset, Start}] when Start >= EndOffsetUpperBound ->
not_found;
[{NextOffset, Start}] ->
{min(NextOffset, RightBound), Start};
{min(NextOffset, EndOffsetUpperBound), Start};
[] ->
%% The key should have been just removed, unlucky timing.
get_next_interval(Table, Offset, RightBound)
get_next_interval(Table, Offset, EndOffsetUpperBound)
end
end.

Expand Down
7 changes: 4 additions & 3 deletions apps/arweave/src/ar_events_sup.erl
Expand Up @@ -34,7 +34,8 @@ init([]) ->
{ok, {{one_for_one, 5, 10}, [
%% Events: remaining_disk_space.
?CHILD(ar_events, disksup, worker),
%% Events: new, ready_for_mining, dropped.
%% Events: new, ready_for_mining, orphaned, emitting_scheduled,
%% preparing_unblacklisting, ready_for_unblacklisting, registered_offset.
?CHILD(ar_events, tx, worker),
%% Events: discovered, rejected, new, double_signing.
?CHILD(ar_events, block, worker),
Expand All @@ -51,8 +52,8 @@ init([]) ->
?CHILD(ar_events, miner, worker),
%% Events: removed_file.
?CHILD(ar_events, chunk_storage, worker),
%% Events: add_range, remove_range, cut.
?CHILD(ar_events, data_sync, worker),
%% Events: add_range, remove_range, global_remove_range, cut, global_cut.
?CHILD(ar_events, sync_record, worker),
%% Events: rejected, stale, partial, accepted.
?CHILD(ar_events, solution, worker),
%% Used for the testing purposes.
Expand Down
10 changes: 5 additions & 5 deletions apps/arweave/src/ar_global_sync_record.erl
Expand Up @@ -70,7 +70,7 @@ get_serialized_sync_buckets() ->

init([]) ->
process_flag(trap_exit, true),
ok = ar_events:subscribe(data_sync),
ok = ar_events:subscribe(sync_record),
{ok, Config} = application:get_env(arweave, config),
SyncRecord =
lists:foldl(
Expand Down Expand Up @@ -127,25 +127,25 @@ handle_cast(Cast, State) ->
?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
{noreply, State}.

handle_info({event, data_sync, {add_range, Start, End, _StoreID}}, State) ->
handle_info({event, sync_record, {add_range, Start, End, ar_data_sync, _StoreID}}, State) ->
#state{ sync_record = SyncRecord, sync_buckets = SyncBuckets } = State,
SyncRecord2 = ar_intervals:add(SyncRecord, End, Start),
SyncBuckets2 = ar_sync_buckets:add(End, Start, SyncBuckets),
{noreply, State#state{ sync_record = SyncRecord2, sync_buckets = SyncBuckets2 }};

handle_info({event, data_sync, {cut, Offset}}, State) ->
handle_info({event, sync_record, {global_cut, Offset}}, State) ->
#state{ sync_record = SyncRecord, sync_buckets = SyncBuckets } = State,
SyncRecord2 = ar_intervals:cut(SyncRecord, Offset),
SyncBuckets2 = ar_sync_buckets:cut(Offset, SyncBuckets),
{noreply, State#state{ sync_record = SyncRecord2, sync_buckets = SyncBuckets2 }};

handle_info({event, data_sync, {remove_range, Start, End}}, State) ->
handle_info({event, sync_record, {global_remove_range, Start, End}}, State) ->
#state{ sync_record = SyncRecord, sync_buckets = SyncBuckets } = State,
SyncRecord2 = ar_intervals:delete(SyncRecord, End, Start),
SyncBuckets2 = ar_sync_buckets:delete(End, Start, SyncBuckets),
{noreply, State#state{ sync_record = SyncRecord2, sync_buckets = SyncBuckets2 }};

handle_info({event, data_sync, _}, State) ->
handle_info({event, sync_record, _}, State) ->
{noreply, State};

handle_info(Message, State) ->
Expand Down
6 changes: 3 additions & 3 deletions apps/arweave/src/ar_mining_io.erl
Expand Up @@ -30,14 +30,14 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

set_largest_seen_upper_bound(PartitionUpperBound) ->
gen_server:call(?MODULE, {set_largest_seen_upper_bound, PartitionUpperBound}, 30000).
gen_server:call(?MODULE, {set_largest_seen_upper_bound, PartitionUpperBound}, 60000).

get_partitions() ->
gen_server:call(?MODULE, get_partitions, 30000).
gen_server:call(?MODULE, get_partitions, 60000).

read_recall_range(WhichChunk, Worker, Candidate, RecallRangeStart) ->
gen_server:call(?MODULE,
{read_recall_range, WhichChunk, Worker, Candidate, RecallRangeStart}, 30000).
{read_recall_range, WhichChunk, Worker, Candidate, RecallRangeStart}, 60000).

get_partitions(PartitionUpperBound) when PartitionUpperBound =< 0 ->
[];
Expand Down
1 change: 1 addition & 0 deletions apps/arweave/src/ar_node_worker.erl
Expand Up @@ -1733,6 +1733,7 @@ return_orphaned_txs_to_mempool(H, H) ->
return_orphaned_txs_to_mempool(H, BaseH) ->
#block{ txs = TXs, previous_block = PrevH } = ar_block_cache:get(block_cache, H),
lists:foreach(fun(TX) ->
ar_events:send(tx, {orphaned, TX}),
ar_events:send(tx, {ready_for_mining, TX}),
%% Add it to the mempool here even though have triggered an event - processes
%% do not handle their own events.
Expand Down

0 comments on commit 703c908

Please sign in to comment.