Skip to content

Commit

Permalink
Keep track of peers' txs to speed up distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
ldmberman committed Aug 20, 2021
1 parent 3c70e88 commit 474484b
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 45 deletions.
54 changes: 15 additions & 39 deletions apps/arweave/src/ar_http_iface_client.erl
Expand Up @@ -35,45 +35,21 @@

%% @doc Send a new transaction to an Arweave HTTP node.
send_new_tx(Peer, TX) ->
case byte_size(TX#tx.data) of
_Size when _Size < ?TX_SEND_WITHOUT_ASKING_SIZE_LIMIT ->
do_send_new_tx(Peer, TX);
_ ->
case has_tx(Peer, TX#tx.id) of
doesnt_have_tx -> do_send_new_tx(Peer, TX);
has_tx -> not_sent;
error -> not_sent
end
end.

do_send_new_tx(Peer, TX) ->
TXSize = byte_size(TX#tx.data),
ar_http:req(#{
method => post,
peer => Peer,
path => "/tx",
headers => p2p_headers() ++ [{<<"arweave-tx-id">>, ar_util:encode(TX#tx.id)}],
body => ar_serialize:jsonify(ar_serialize:tx_to_json_struct(TX)),
connect_timeout => 500,
timeout => max(3, min(60, TXSize * 8 div ?TX_PROPAGATION_BITS_PER_SECOND)) * 1000
}).

%% @doc Check whether a peer has a given transaction
has_tx(Peer, ID) ->
case
ar_http:req(#{
method => get,
peer => Peer,
path => "/tx/" ++ binary_to_list(ar_util:encode(ID)) ++ "/id",
headers => p2p_headers(),
connect_timeout => 500,
timeout => 3 * 1000
})
of
{ok, {{<<"200">>, _}, _, _, _, _}} -> has_tx;
{ok, {{<<"202">>, _}, _, _, _, _}} -> has_tx; % In the mempool
{ok, {{<<"404">>, _}, _, _, _, _}} -> doesnt_have_tx;
_ -> error
TXID = TX#tx.id,
case ets:member(peer_txid, {Peer, TXID}) of
true ->
not_sent;
false ->
TXSize = byte_size(TX#tx.data),
ar_http:req(#{
method => post,
peer => Peer,
path => "/tx",
headers => p2p_headers() ++ [{<<"arweave-tx-id">>, ar_util:encode(TXID)}],
body => ar_serialize:jsonify(ar_serialize:tx_to_json_struct(TX)),
connect_timeout => 500,
timeout => max(3, min(60, TXSize * 8 div ?TX_PROPAGATION_BITS_PER_SECOND)) * 1000
})
end.

%% @doc Distribute a newly found block to remote nodes.
Expand Down
19 changes: 16 additions & 3 deletions apps/arweave/src/ar_http_iface_middleware.erl
Expand Up @@ -454,7 +454,8 @@ handle(<<"POST">>, [<<"tx">>], Req, Pid) ->
case post_tx_parse_id({Req, Pid}) of
{error, invalid_hash, Req2} ->
{400, #{}, <<"Invalid hash.">>, Req2};
{error, tx_already_processed, Req2} ->
{error, tx_already_processed, TXID, Req2} ->
register_peer_mempool_tx(TXID, Req),
{208, #{}, <<"Transaction already processed.">>, Req2};
{error, invalid_json, Req2} ->
{400, #{}, <<"Invalid JSON.">>, Req2};
Expand All @@ -464,6 +465,7 @@ handle(<<"POST">>, [<<"tx">>], Req, Pid) ->
Peer = ar_http_util:arweave_peer(Req),
case handle_post_tx(Req, Peer, TX) of
ok ->
register_peer_mempool_tx(TX#tx.id, Req),
{200, #{}, <<"OK">>, Req};
{error_response, {Status, Headers, Body}} ->
ar_ignore_registry:remove_temporary(TX#tx.id),
Expand Down Expand Up @@ -871,6 +873,7 @@ handle(<<"GET">>, [<<"tx">>, Hash, Field], Req, _Pid) ->
{error, ID, unavailable} ->
case is_a_pending_tx(ID) of
true ->
register_peer_mempool_tx(ID, Req),
{202, #{}, <<"Pending">>, Req};
false ->
{404, #{}, <<"Not Found.">>, Req}
Expand Down Expand Up @@ -1364,6 +1367,16 @@ handle_post_chunk(validate_proof, Proof, Req) ->
{503, #{}, jiffy:encode(#{ error => timeout }), Req}
end.

register_peer_mempool_tx(TXID, Req) ->
case cowboy_req:header(<<"x-p2p-port">>, Req, not_set) of
not_set ->
ok;
_ ->
Peer = ar_http_util:arweave_peer(Req),
ets:insert(timestamp_peer_txid, {os:system_time(microsecond), {Peer, TXID}}),
ets:insert(peer_txid, {{Peer, TXID}, ok})
end.

check_internal_api_secret(Req) ->
Reject = fun(Msg) ->
log_internal_api_reject(Msg, Req),
Expand Down Expand Up @@ -1958,7 +1971,7 @@ post_tx_parse_id(check_body, {Req, Pid}) ->
post_tx_parse_id(check_ignore_list, {TXID, Req, Pid, FirstChunk}) ->
case ar_ignore_registry:member(TXID) of
true ->
{error, tx_already_processed, Req};
{error, tx_already_processed, TXID, Req};
false ->
ar_ignore_registry:add_temporary(TXID, 500),
post_tx_parse_id(read_body, {TXID, Req, Pid, FirstChunk})
Expand Down Expand Up @@ -2010,7 +2023,7 @@ post_tx_parse_id(verify_id_match, {MaybeTXID, Req, TX}) ->
false ->
case ar_ignore_registry:member(TXID) of
true ->
{error, tx_already_processed, Req};
{error, tx_already_processed, TXID, Req};
false ->
ar_ignore_registry:add_temporary(TXID, 500),
{ok, TX}
Expand Down
56 changes: 55 additions & 1 deletion apps/arweave/src/ar_node_worker.erl
Expand Up @@ -9,7 +9,7 @@
%%% @end
-module(ar_node_worker).

-export([start_link/0]).
-export([start_link/0, cleanup_peer_mempools/0]).

-export([init/1, handle_cast/2, handle_info/2, terminate/2, tx_mempool_size/1]).

Expand All @@ -27,13 +27,61 @@

-define(FILTER_MEMPOOL_CHUNK_SIZE, 100).

-ifdef(DEBUG).
-define(CLEANUP_PEER_MEMPOOLS_FREQUENCY_MS, 1000).
-else.
-define(CLEANUP_PEER_MEMPOOLS_FREQUENCY_MS, 30 * 1000).
-endif.

-ifdef(DEBUG).
-define(PEER_MEMPOOLS_MAX_SIZE, 4).
-else.
-define(PEER_MEMPOOLS_MAX_SIZE, 1000000).
-endif.

-ifdef(DEBUG).
-define(PEER_MEMPOOL_TXID_EXPIRATION_TIME_US, 2 * 1000000).
-else.
-define(PEER_MEMPOOL_TXID_EXPIRATION_TIME_US, 60 * 1000000).
-endif.

%%%===================================================================
%%% Public interface.
%%%===================================================================

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

cleanup_peer_mempools() ->
Size = ets:info(timestamp_peer_txid, size),
cleanup_peer_mempools(ets:first(timestamp_peer_txid), Size - ?PEER_MEMPOOLS_MAX_SIZE).

cleanup_peer_mempools('$end_of_table', _N) ->
ok;
cleanup_peer_mempools(Timestamp, N) when N > 0 ->
case ets:lookup(timestamp_peer_txid, Timestamp) of
[{_, PeerTXID}] ->
ets:delete(peer_txid, PeerTXID),
ets:delete(timestamp_peer_txid, Timestamp),
cleanup_peer_mempools(ets:next(timestamp_peer_txid, Timestamp), N - 1);
[] ->
ok
end;
cleanup_peer_mempools(Timestamp, N) ->
case Timestamp + ?PEER_MEMPOOL_TXID_EXPIRATION_TIME_US > os:system_time(microsecond) of
true ->
ok;
false ->
case ets:lookup(timestamp_peer_txid, Timestamp) of
[{_, PeerTXID}] ->
ets:delete(peer_txid, PeerTXID),
ets:delete(timestamp_peer_txid, Timestamp),
cleanup_peer_mempools(ets:next(timestamp_peer_txid, Timestamp), N);
[] ->
ok
end
end.

%%%===================================================================
%%% Generic server callbacks.
%%%===================================================================
Expand All @@ -46,6 +94,12 @@ init([]) ->
ar_randomx_state:start_block_polling(),
%% Read persisted mempool.
load_mempool(),
timer:apply_interval(
?CLEANUP_PEER_MEMPOOLS_FREQUENCY_MS,
ar_node_worker,
cleanup_peer_mempools,
[]
),
%% Join the network.
{ok, Config} = application:get_env(arweave, config),
BI =
Expand Down
2 changes: 2 additions & 0 deletions apps/arweave/src/ar_sup.erl
Expand Up @@ -31,6 +31,8 @@ init([]) ->
ets:new(ar_storage, [set, public, named_table, {read_concurrency, true}]),
ets:new(blacklist, [set, public, named_table]),
ets:new(ignored_ids, [bag, public, named_table]),
ets:new(peer_txid, [set, public, named_table, {write_concurrency, true}]),
ets:new(timestamp_peer_txid, [ordered_set, public, named_table, {write_concurrency, true}]),
ets:new(ar_tx_db, [set, public, named_table]),
ets:new(ar_header_sync, [set, public, named_table, {read_concurrency, true}]),
ets:new(ar_data_sync_state, [set, public, named_table, {read_concurrency, true}]),
Expand Down
2 changes: 1 addition & 1 deletion apps/arweave/test/ar_header_sync_tests.erl
Expand Up @@ -58,7 +58,7 @@ test_syncs_headers() ->
timer:sleep(Config#config.disk_space_check_frequency),
NoSpaceHeight = ?MAX_TX_ANCHOR_DEPTH + 6,
NoSpaceTX = sign_v1_tx(master, Wallet,
#{ data => crypto:strong_rand_bytes(10 * 1024), last_tx => get_tx_anchor() }),
#{ data => random_v1_data(10 * 1024), last_tx => get_tx_anchor() }),
assert_post_tx_to_master(NoSpaceTX),
ar_node:mine(),
[{NoSpaceH, _, _} | _] = wait_until_height(NoSpaceHeight),
Expand Down
8 changes: 7 additions & 1 deletion apps/arweave/test/ar_http_iface_tests.erl
Expand Up @@ -23,14 +23,20 @@ get_info_test() ->
%% @doc Ensure that transactions are only accepted once.
single_regossip_test() ->
ar_test_node:start(no_block),
ar_test_node:slave_start(no_block),
TX = ar_tx:new(),
?assertMatch(
{ok, {{<<"200">>, _}, _, _, _, _}},
ar_http_iface_client:send_new_tx({127, 0, 0, 1, 1984}, TX)
),
?assertMatch(
{ok, {{<<"200">>, _}, _, _, _, _}},
ar_http_iface_client:send_new_tx({127, 0, 0, 1, 1983}, TX)
),
?assertEqual(not_sent, ar_http_iface_client:send_new_tx({127, 0, 0, 1, 1984}, TX)),
?assertMatch(
{ok, {{<<"208">>, _}, _, _, _, _}},
ar_http_iface_client:send_new_tx({127, 0, 0, 1, 1984}, TX)
ar_http_iface_client:send_new_tx({127, 0, 0, 1, 1983}, TX)
).

%% @doc Unjoined nodes should not accept blocks
Expand Down

0 comments on commit 474484b

Please sign in to comment.