Reduce memory footprint (#227)
This PR includes 2 memory reduction changes:
1. Reduce the memory footprint of `collect_peer_intervals`
2. Reduce the peak memory footprint during node initialization

This PR also adds a new `process_memory` metric which samples the memory footprint of all active Erlang processes every 5 seconds. The relatively slow sample rate for this metric and the related `process_functions` metric keeps the performance impact of the sampling negligible.
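
For reference, per-process memory can be sampled with `erlang:process_info/2`. The sketch below illustrates that general approach only; it is not the PR's implementation, and `sample_process_memory/0` is a made-up name.

```erlang
%% Hedged sketch of per-process memory sampling (not the PR's code).
%% process_info/2 returns undefined for processes that exited between
%% the erlang:processes() call and the lookup, so those are skipped.
sample_process_memory() ->
    lists:filtermap(
        fun(Pid) ->
            case erlang:process_info(Pid, memory) of
                {memory, Bytes} -> {true, {Pid, Bytes}};
                undefined -> false
            end
        end,
        erlang:processes()).
```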

## collect_peer_intervals
Prior to this change the `collect_peer_intervals` process could reserve roughly 500MB of memory per storage_module. This change limits the footprint to about 1MB.

### Root cause
`collect_peer_intervals` queries the syncing intervals advertised by a basket of peers, intersects those intervals with the ranges sought by the node, and then queues up "work" in the form of specific chunks to be queried from each peer. Prior to this change `collect_peer_intervals` would try to queue all the chunks to be queried for a complete storage_module. In an extreme case - with 3.6TB of unsynced data - this resulted in about 15,000,000 chunks worth of tasks being queued at once. The `gb_sets` data structure holding this queue could balloon to roughly 500MB.
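
A back-of-the-envelope check of those numbers (the chunk size and per-entry overhead below are assumptions, not figures taken from the PR):

```erlang
%% Rough sizing of the old per-storage_module queue, evaluated in an Erlang shell.
ChunkSize = 262144,                          % assumed 256 KiB chunk
UnsyncedBytes = 3600 * 1000 * 1000 * 1000,   % 3.6 TB of unsynced data
QueuedTasks = UnsyncedBytes div ChunkSize,   % ~13.7 million queued tasks
BytesPerTask = 36,                           % assumed average gb_sets entry cost
QueuedTasks * BytesPerTask.                  % ~494 MB for the queue alone
```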

### Fix
Now `collect_peer_intervals` only collects one sync bucket's worth of work at a time, so only 10GB of chunks (not 3.6TB) are queued at any one time. More detail:

Considerations when managing the queue length:
1. Periodically `sync_intervals` will pull from Q and send work to `ar_data_sync_worker_master`. We need to make sure Q is long enough so that we never starve `ar_data_sync_worker_master` of work.
2. On the flip side, we don't want Q to get so long that it triggers an out-of-memory condition. In the extreme case we could collect and enqueue all chunks in a full 3.6TB storage_module. A Q of this length would have a roughly 500MB memory footprint per storage_module. For a node that is syncing multiple storage modules, this can add up fast.
3. We also want to make sure we are using the most up-to-date information we can. Every time we add a task to the Q we're locking in a specific view of peer data availability. If that peer goes offline before we get to the task it can result in wasted work or syncing stalls. A shorter queue helps ensure we're always syncing from the "best" peers at any point in time.

With all that in mind, we'll pause collection once the Q hits roughly a sync bucket's worth of chunks (see the check excerpted below). This threshold is somewhat arbitrary and we should feel free to adjust it as necessary.
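
The check itself, as it appears in the new `{collect_peer_intervals, Start, End}` handler in the diff below, works out to 10GB / 256KiB ≈ 38,000 queued chunks per storage_module:

```erlang
%% Pause collection while Q already holds about one sync bucket worth of chunks.
case gb_sets:size(Q) > (?NETWORK_DATA_BUCKET_SIZE / ?DATA_CHUNK_SIZE) of
    true ->
        ar_util:cast_after(500, self(), {collect_peer_intervals, Start, End}),
        true;
    false ->
        false
end
```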

Considerations when pausing:
1. When we pause and resume we keep track of the last range collected so we can pick up where we left off.
2. While we are working through the storage_module range we'll maintain a cache of the mapping of peers to their advertised intervals (its shape is sketched below). This ensures we don't query each peer's /data_sync_record endpoint too often. Once we've made a full pass through the range, we'll clear the cache so that we can incorporate any peer interval changes the next time through.
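
Based on the `maps:put(Peer, {Right, Left, PeerIntervals}, Acc)` call in the diff, the cache keeps one entry per peer. A type-level sketch (the type name is illustrative, not part of the PR):

```erlang
%% Shape of the all_peers_intervals cache added to the ar_data_sync state
%% (inferred from the diff below): Peer => {Right, Left, Intervals}, where
%% Intervals are the peer's advertised sync intervals and Right/Left bound
%% the portion of that peer's /data_sync_record data fetched so far.
-type all_peers_intervals() ::
    #{term() => {non_neg_integer(), non_neg_integer(), term()}}.
```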

## Peak memory during initialization
Previously `ar_node_worker` was using `ar_events:send` to cast the Blocks list. The Blocks list can get big (~175MB), and any process that subscribed to `node_state` would receive the list in its mailbox - even if it did not handle the message. Having dozens of processes all loading 175MB at the same time could sometimes exceed the resident memory capacity and trigger an OOM.

The fix is to have `ar_node_worker` cast an empty `initializing` message; any process that cares about the Blocks list can then load it from `ets`.
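
A minimal sketch of the consumer side (the message shape, ets table name, key, and `init_from_blocks/2` helper are assumptions for illustration, not the PR's exact code):

```erlang
%% Instead of receiving ~175MB of blocks in its mailbox, a subscriber reacts to
%% the lightweight event and pulls the block list from ets only if it needs it.
handle_info({event, node_state, initializing}, State) ->
    [{blocks, Blocks}] = ets:lookup(node_state, blocks),
    {noreply, init_from_blocks(Blocks, State)};
handle_info({event, node_state, _Ignored}, State) ->
    {noreply, State}.
```
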
JamesPiechota committed May 31, 2023
1 parent 3f02c3d commit be9b87c
Showing 12 changed files with 202 additions and 106 deletions.
2 changes: 2 additions & 0 deletions apps/arweave/include/ar.hrl
@@ -7,6 +7,8 @@
%% (e.g. bin/test or bin/shell)
-define(IS_TEST, erlang:get_cookie() == test).

-define(DATA_SIZE(Term), erlang:byte_size(term_to_binary(Term))).

%% The mainnet name. Does not change at the hard forks.
-ifndef(NETWORK_NAME).
-ifdef(DEBUG).
8 changes: 4 additions & 4 deletions apps/arweave/include/ar_data_discovery.hrl
@@ -3,23 +3,23 @@
%% to have some data there are asked for the intervals they have and check which of them
%% cross the desired interval.
-ifdef(DEBUG).
-define(NETWORK_DATA_BUCKET_SIZE, 10000000).
-define(NETWORK_DATA_BUCKET_SIZE, 10_000_000). % 10 MB
-else.
-define(NETWORK_DATA_BUCKET_SIZE, 10000000000). % 10 GB
-define(NETWORK_DATA_BUCKET_SIZE, 10_000_000_000). % 10 GB
-endif.

%% The maximum number of synced intervals shared with peers.
-ifdef(DEBUG).
-define(MAX_SHARED_SYNCED_INTERVALS_COUNT, 20).
-else.
-define(MAX_SHARED_SYNCED_INTERVALS_COUNT, 10000).
-define(MAX_SHARED_SYNCED_INTERVALS_COUNT, 10_000).
-endif.

%% The upper limit for the size of a sync record serialized using Erlang Term Format.
-define(MAX_ETF_SYNC_RECORD_SIZE, 80 * ?MAX_SHARED_SYNCED_INTERVALS_COUNT).

%% The upper limit for the size of the serialized (in Erlang Term Format) sync buckets.
-define(MAX_SYNC_BUCKETS_SIZE, 100000).
-define(MAX_SYNC_BUCKETS_SIZE, 100_000).

%% How many peers with the biggest synced shares in the given bucket to query per bucket
%% per sync job iteration.
8 changes: 3 additions & 5 deletions apps/arweave/include/ar_data_sync.hrl
@@ -49,10 +49,6 @@
%% The frequency of storing the server state on disk.
-define(STORE_STATE_FREQUENCY_MS, 30000).

%% Do not repeat the missing interval collection procedure while there are more than this
%% number of intervals in the queue.
-define(SYNC_INTERVALS_MAX_QUEUE_SIZE, 10).

%% The maximum number of chunks currently being downloaded or processed.
-ifdef(DEBUG).
-define(SYNC_BUFFER_SIZE, 100).
@@ -213,5 +209,7 @@
%% The threshold controlling the brief accumulation of the chunks in the queue before
%% the actual disk dump, to reduce the chance of out-of-order write causing disk
%% fragmentation.
store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD
store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD,
%% Cache mapping peers to /data_sync_record responses
all_peers_intervals = #{}
}).
13 changes: 12 additions & 1 deletion apps/arweave/src/ar_data_discovery.erl
@@ -57,14 +57,24 @@ get_bucket_peers(Bucket, Cursor, Peers) ->
case ets:next(?MODULE, Cursor) of
'$end_of_table' ->
UniquePeers = sets:to_list(sets:from_list(Peers)),
pick_peers(UniquePeers, ?QUERY_BEST_PEERS_COUNT);
PickedPeers = pick_peers(UniquePeers, ?QUERY_BEST_PEERS_COUNT),
?LOG_DEBUG([
{event, get_bucket_peers},
{pid, self()},
{bucket, Bucket},
{peers, length(Peers)},
{unique_peers, length(UniquePeers)},
{picked_peers, length(PickedPeers)}
]),
PickedPeers;
{Bucket, _Share, Peer} = Key ->
get_bucket_peers(Bucket, Key, [Peer | Peers]);
_ ->
UniquePeers = sets:to_list(sets:from_list(Peers)),
PickedPeers = pick_peers(UniquePeers, ?QUERY_BEST_PEERS_COUNT),
?LOG_DEBUG([
{event, get_bucket_peers},
{pid, self()},
{bucket, Bucket},
{peers, length(Peers)},
{unique_peers, length(UniquePeers)},
@@ -115,6 +125,7 @@ handle_cast(update_network_data_map, #state{ peers_pending = N } = State)
get_sync_buckets(Peer);
Error ->
?LOG_DEBUG([{event, failed_to_fetch_sync_buckets},
{peer, ar_util:format_peer(Peer)},
{reason, io_lib:format("~p", [Error])}])
end
end
202 changes: 121 additions & 81 deletions apps/arweave/src/ar_data_sync.erl
@@ -24,7 +24,7 @@
-ifdef(DEBUG).
-define(COLLECT_SYNC_INTERVALS_FREQUENCY_MS, 5000).
-else.
-define(COLLECT_SYNC_INTERVALS_FREQUENCY_MS, 300000).
-define(COLLECT_SYNC_INTERVALS_FREQUENCY_MS, 300_000).
-endif.

%%%===================================================================
@@ -707,13 +707,38 @@ handle_cast({pack_and_store_chunk, Args} = Cast,
end;

%% Schedule syncing of the unsynced intervals. Choose a peer for each of the intervals.
%% There are two message payloads:
%% 1. collect_peer_intervals
%% Start the collection process over the full storage_module range.
%% 2. {collect_peer_intervals, Start, End}
%% Collect intervals for the specified range. This interface is used to pick up where
%% we left off after a pause. There are 2 main conditions that can trigger a pause:
%% a. Insufficient disk space. Will pause until disk space frees up
%% b. Sync queue is busy. Will pause until previously queued intervals are scheduled to the
%% ar_data_sync_worker_master for syncing.
handle_cast(collect_peer_intervals, State) ->
#sync_data_state{ range_start = Start, range_end = End, sync_intervals_queue = Q,
store_id = StoreID, disk_pool_threshold = DiskPoolThreshold } = State,
#sync_data_state{ range_start = Start, range_end = End } = State,
gen_server:cast(self(), {collect_peer_intervals, Start, End}),
{noreply, State};

handle_cast({collect_peer_intervals, Start, End}, State) when Start >= End ->
#sync_data_state{ store_id = StoreID } = State,
?LOG_DEBUG([{event, collect_peer_intervals_end}, {pid, self()}, {store_id, StoreID},
{start, Start}]),
%% We've finished collecting intervals for the whole storage_module range. Schedule
%% the collection process to restart in ?COLLECT_SYNC_INTERVALS_FREQUENCY_MS and
%% clear the all_peers_intervals cache so we can start fresh and requery peers for
%% their advertised intervals.
ar_util:cast_after(?COLLECT_SYNC_INTERVALS_FREQUENCY_MS, self(), collect_peer_intervals),
{noreply, State#sync_data_state{ all_peers_intervals = #{} }};
handle_cast({collect_peer_intervals, Start, End}, State) ->
#sync_data_state{ sync_intervals_queue = Q,
store_id = StoreID, disk_pool_threshold = DiskPoolThreshold,
all_peers_intervals = AllPeersIntervals } = State,
IsJoined =
case ar_node:is_joined() of
false ->
ar_util:cast_after(1000, self(), collect_peer_intervals),
ar_util:cast_after(1000, self(), {collect_peer_intervals, Start, End}),
false;
true ->
true
@@ -727,7 +752,7 @@ handle_cast(collect_peer_intervals, State) ->
true ->
true;
_ ->
ar_util:cast_after(30000, self(), collect_peer_intervals),
ar_util:cast_after(30_000, self(), {collect_peer_intervals, Start, End}),
false
end
end,
@@ -736,51 +761,56 @@ handle_cast(collect_peer_intervals, State) ->
false ->
true;
true ->
case gb_sets:size(Q) > ?SYNC_INTERVALS_MAX_QUEUE_SIZE of
%% Q is the number of chunks that we've already queued for syncing. We need
%% to manage the queue length.
%% 1. Periodically sync_intervals will pull from Q and send work to
%% ar_data_sync_worker_master. We need to make sure Q is long enough so
%% that we never starve ar_data_sync_worker_master of work.
%% 2. On the flip side we don't want Q to get so long as to trigger an
%% out-of-memory condition. In the extreme case we could collect and
%% enqueue all chunks in a full 3.6TB storage_module. A Q of this length
%% would have a roughly 500MB memory footprint per storage_module. For a
%% node that is syncing multiple storage modules, this can add up fast.
%% 3. We also want to make sure we are using the most up to date information
%% we can. Every time we add a task to the Q we're locking in a specific
%% view of Peer data availability. If that peer goes offline before we
%% get to the task it can result in wasted work or syncing stalls. A
%% shorter queue helps ensure we're always syncing from the "best" peers
%% at any point in time.
%%
%% With all that in mind, we'll pause collection once the Q hits roughly
%% a bucket size worth of chunks. This number is slightly arbitrary and we
%% should feel free to adjust as necessary.
case gb_sets:size(Q) > (?NETWORK_DATA_BUCKET_SIZE / ?DATA_CHUNK_SIZE) of
true ->
ar_util:cast_after(500, self(), collect_peer_intervals),
ar_util:cast_after(500, self(), {collect_peer_intervals, Start, End}),
true;
false ->
false
end
end,
IsBelowDiskPoolThreshold =
AllPeersIntervals2 =
case IsSyncQueueBusy of
true ->
false;
AllPeersIntervals;
false ->
case Start >= DiskPoolThreshold of
true ->
ar_util:cast_after(500, self(), collect_peer_intervals),
false;
ar_util:cast_after(500, self(), {collect_peer_intervals, Start, End}),
AllPeersIntervals;
false ->
true
%% All checks have passed, find and enqueue intervals for one
%% sync bucket worth of chunks starting at offset Start
find_peer_intervals(
Start, min(End, DiskPoolThreshold), StoreID, AllPeersIntervals)
end
end,
case IsBelowDiskPoolThreshold of
false ->
ok;
true ->
{ok, Config} = application:get_env(arweave, config),
SyncWorkers = Config#config.sync_jobs,
%% We do not want to bother peers if we still have a lot of data scheduled for
%% local copying and repacking.
case ar_data_sync_worker_master:get_total_task_count() > SyncWorkers * 2 of
true ->
ar_util:cast_after(1000, self(), collect_peer_intervals);
false ->
Self = self(),
monitor(process, spawn(
fun() ->
find_peer_intervals(Start, min(End, DiskPoolThreshold), StoreID,
Self, #{}),
ar_util:cast_after(?COLLECT_SYNC_INTERVALS_FREQUENCY_MS, Self,
collect_peer_intervals)
end
))
end
end,
{noreply, State};
%% While we are working through the storage_module range we'll maintain a cache
%% of the mapping of peers to their advertised intervals. This ensures we don't query
%% each peer's /data_sync_record endpoint too often. Once we've made a full pass through
%% the range, we'll clear the cache so that we can incorporate any peer interval changes
%% the next time through.
{noreply, State#sync_data_state{ all_peers_intervals = AllPeersIntervals2 }};

handle_cast({enqueue_intervals, []}, State) ->
{noreply, State};
@@ -802,7 +832,7 @@ handle_cast({enqueue_intervals, Intervals}, State) ->

%% This is an approximation. The intent is to enqueue one sync_bucket at a time - but
%% due to the selection of each peer's intervals, the total number of bytes may be
%% less than a full sync_bucket. But for the purposes of distrubiting requests among
%% less than a full sync_bucket. But for the purposes of distributing requests among
%% many peers - the approximation is fine (and much cheaper to calculate than taking
%% the sum of all the peer intervals).
TotalChunksToEnqueue = ?DEFAULT_SYNC_BUCKET_SIZE div ?DATA_CHUNK_SIZE,
@@ -815,6 +845,10 @@ handle_cast({enqueue_intervals, Intervals}, State) ->
{Q2, QIntervals2} = enqueue_intervals(
ar_util:shuffle_list(Intervals), ChunksPerPeer, {Q, QIntervals}),

?LOG_DEBUG([{event, enqueue_intervals}, {pid, self()},
{queue_before, gb_sets:size(Q)}, {queue_after, gb_sets:size(Q2)},
{num_peers, NumPeers}, {chunks_per_peer, ChunksPerPeer}]),

{noreply, State#sync_data_state{ sync_intervals_queue = Q2,
sync_intervals_queue_intervals = QIntervals2 }};
handle_cast(sync_intervals, State) ->
Expand Down Expand Up @@ -2154,20 +2188,39 @@ get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, RangeS
end
end.

find_peer_intervals(Start, End, _StoreID, _Self, _AllPeersIntervals) when Start >= End ->
ok;
find_peer_intervals(Start, End, StoreID, Self, AllPeersIntervals) ->
find_peer_intervals(Start, End, StoreID, _AllPeersIntervals) when Start >= End ->
%% We've reached the end of the range, next time through we'll start with a clear cache.
?LOG_DEBUG([{event, find_peer_intervals_end}, {pid, self()}, {store_id, StoreID},
{start, Start}]),
#{};
find_peer_intervals(Start, End, StoreID, AllPeersIntervals) ->
Start2 = Start - Start rem ?NETWORK_DATA_BUCKET_SIZE,
End2 = min(Start2 + ?NETWORK_DATA_BUCKET_SIZE, End),
UnsyncedIntervals = get_unsynced_intervals(Start, End2, StoreID),
AllPeersIntervals2 =
case ar_intervals:is_empty(UnsyncedIntervals) of

Bucket = Start div ?NETWORK_DATA_BUCKET_SIZE,
{ok, Config} = application:get_env(arweave, config),
Peers =
case Config#config.sync_from_local_peers_only of
true ->
AllPeersIntervals;
Config#config.local_peers;
false ->
find_peer_intervals2(Start, UnsyncedIntervals, Self, AllPeersIntervals)
ar_data_discovery:get_bucket_peers(Bucket)
end,
find_peer_intervals(End2, End, StoreID, Self, AllPeersIntervals2).
?LOG_DEBUG([{event, find_peer_intervals}, {pid, self()}, {store_id, StoreID},
{start, Start}, {bucket, Bucket},
{peers, io_lib:format("~p", [[ar_util:format_peer(Peer) || Peer <- Peers]])}]),

%% Schedule the next sync bucket. The cast handler logic will pause collection if needed.
gen_server:cast(self(), {collect_peer_intervals, End2, End}),

%% The updated AllPeersIntervals cache is returned so it can be added to the State
case ar_intervals:is_empty(UnsyncedIntervals) of
true ->
AllPeersIntervals;
false ->
find_peer_intervals2(Start, Peers, UnsyncedIntervals, AllPeersIntervals)
end.

%% @doc Collect the unsynced intervals between Start and End excluding the blocklisted
%% intervals.
@@ -2193,24 +2246,7 @@ get_unsynced_intervals(Start, End, Intervals, StoreID) ->
end
end.

find_peer_intervals2(Start, UnsyncedIntervals, Self, AllPeersIntervals) ->
Bucket = Start div ?NETWORK_DATA_BUCKET_SIZE,
{ok, Config} = application:get_env(arweave, config),
Peers =
case Config#config.sync_from_local_peers_only of
true ->
Config#config.local_peers;
false ->
ar_data_discovery:get_bucket_peers(Bucket)
end,
case ar_intervals:is_empty(UnsyncedIntervals) of
true ->
AllPeersIntervals;
false ->
find_peer_intervals3(Start, UnsyncedIntervals, Self, AllPeersIntervals, Peers)
end.

find_peer_intervals3(Start, UnsyncedIntervals, Self, AllPeersIntervals, Peers) ->
find_peer_intervals2(Start, Peers, UnsyncedIntervals, AllPeersIntervals) ->
Intervals =
ar_util:pmap(
fun(Peer) ->
@@ -2226,22 +2262,6 @@ find_peer_intervals3(Start, UnsyncedIntervals, Self, AllPeersIntervals, Peers) -
end,
Peers
),
AllPeersIntervals2 =
lists:foldl(
fun ({Peer, _, PeerIntervals, Left}, Acc) ->
case ar_intervals:is_empty(PeerIntervals) of
true ->
Acc;
false ->
Right = element(1, ar_intervals:largest(PeerIntervals)),
maps:put(Peer, {Right, Left, PeerIntervals}, Acc)
end;
(_, Acc) ->
Acc
end,
AllPeersIntervals,
Intervals
),
EnqueueIntervals =
lists:foldl(
fun ({Peer, SoughtIntervals, _, _}, Acc) ->
Expand All @@ -2257,8 +2277,24 @@ find_peer_intervals3(Start, UnsyncedIntervals, Self, AllPeersIntervals, Peers) -
[],
Intervals
),
gen_server:cast(Self, {enqueue_intervals, EnqueueIntervals}),
AllPeersIntervals2.
gen_server:cast(self(), {enqueue_intervals, EnqueueIntervals}),

%% Update and return AllPeersIntervals
lists:foldl(
fun ({Peer, _, PeerIntervals, Left}, Acc) ->
case ar_intervals:is_empty(PeerIntervals) of
true ->
Acc;
false ->
Right = element(1, ar_intervals:largest(PeerIntervals)),
maps:put(Peer, {Right, Left, PeerIntervals}, Acc)
end;
(_, Acc) ->
Acc
end,
AllPeersIntervals,
Intervals
).

%% @doc
%% @return {ok, Intervals, PeerIntervals, Left} | Error
@@ -2290,6 +2326,10 @@ enqueue_intervals([{Peer, Intervals} | Rest], ChunksToEnqueue, {Q, QIntervals})
enqueue_intervals(Rest, ChunksToEnqueue, {Q2, QIntervals2}).

enqueue_peer_intervals(Peer, Intervals, ChunksToEnqueue, {Q, QIntervals}) ->
?LOG_DEBUG([{event, enqueue_peer_intervals}, {pid, self()},
{peer, ar_util:format_peer(Peer)}, {num_intervals, gb_sets:size(Intervals)},
{chunks_to_enqueue, ChunksToEnqueue}]),

%% Only keep unique intervals. We may get some duplicates for two
%% reasons:
%% 1) find_peer_intervals might choose the same interval several