Skip to content

Commit 6be7a7f

Browse files
committed
refactor: remove dep from chain subscriber
1 parent 55a7e48 commit 6be7a7f

File tree

7 files changed

+190
-149
lines changed

7 files changed

+190
-149
lines changed

lib/ae_mdw/db/sync/block_index.ex

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,51 @@
11
defmodule AeMdw.Db.Sync.BlockIndex do
2-
@moduledoc "fills :block table from backwards to get {height, nil} -> keyblock_hash"
2+
@moduledoc """
3+
Fills Model.Block table from backwards to save the key block hashes
4+
"""
35

4-
require AeMdw.Db.Model
5-
6-
alias AeMdw.Db.Sync
76
alias AeMdw.Db.Model
7+
alias AeMdw.Node.Chain
8+
alias AeMdw.Log
89

9-
import AeMdw.{Sigil, Util, Db.Util}
10+
require Model
1011

11-
@log_freq 10000
12+
import AeMdw.Db.Util
13+
import AeMdw.Sigil
1214

13-
################################################################################
15+
@log_freq 10_000
1416

15-
def sync(max_height \\ :safe) do
16-
max_height = Sync.height(max_height)
17+
@spec sync(Chain.height()) :: Chain.height()
18+
def sync(max_height) when is_integer(max_height) do
19+
max_height = Chain.checked_height(max_height)
1720
min_height = (max_kbi() || -1) + 1
1821

1922
with true <- max_height >= min_height do
20-
header = :aec_chain.get_key_header_by_height(max_height) |> ok!
21-
hash = :aec_headers.hash_header(header) |> ok!
22-
syncer = &sync_key_header(~t[block], &1, &2)
23-
tracker = Sync.progress_logger(syncer, @log_freq, &log_msg/2)
24-
:mnesia.transaction(fn -> max_height..min_height |> Enum.reduce(hash, tracker) end)
23+
{:ok, header} = :aec_chain.get_key_header_by_height(max_height)
24+
{:ok, hash} = :aec_headers.hash_header(header)
25+
26+
:mnesia.transaction(fn -> sync_range(max_height..min_height, hash) end)
2527
end
2628

2729
max_kbi()
2830
end
2931

30-
def min_kbi(), do: kbi(&first/1)
32+
@spec max_kbi() :: Chain.height() | nil
3133
def max_kbi(), do: kbi(&last/1)
3234

35+
@spec clear() :: :ok
3336
def clear(),
3437
do: :mnesia.clear_table(~t[block])
3538

36-
################################################################################
39+
#
40+
# Private functions
41+
#
42+
defp sync_range(max_height..min_height, block_hash) do
43+
Enum.reduce(max_height..min_height, block_hash, fn height, next_hash ->
44+
if rem(height, @log_freq) == 0, do: Log.info("syncing block index at #{height}")
45+
46+
sync_key_header(~t[block], height, next_hash)
47+
end)
48+
end
3749

3850
defp sync_key_header(table, height, hash) do
3951
{:ok, kh} = :aec_chain.get_header(hash)
@@ -50,7 +62,4 @@ defmodule AeMdw.Db.Sync.BlockIndex do
5062
{kbi, -1} -> kbi
5163
end
5264
end
53-
54-
defp log_msg(height, _hash),
55-
do: "syncing block index at #{height}"
5665
end
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
defmodule AeMdw.Db.Sync.ChainSubscriber do
2+
@moduledoc """
3+
Listens to sync chain events and spawns processes to sync until the height notified.
4+
"""
5+
use GenServer
6+
7+
alias AeMdw.Db.Model
8+
alias AeMdw.Db.Sync.BlockIndex
9+
alias AeMdw.Db.Sync.Invalidate
10+
alias AeMdw.Db.Sync.Transaction
11+
alias AeMdw.Log
12+
alias AeMdw.Sync.Watcher
13+
alias __MODULE__, as: State
14+
15+
@typep state :: %State{
16+
pid: pid() | nil,
17+
fork: integer() | nil
18+
}
19+
20+
defstruct [:pid, :fork]
21+
22+
require Model
23+
24+
import AeMdw.Node.Chain, only: [top_height: 0]
25+
26+
@verify_range_kbs 200
27+
28+
################################################################################
29+
30+
@spec start_link([]) :: GenServer.on_start()
31+
def start_link([]),
32+
do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
33+
34+
@spec init([]) :: {:ok, state(), {:continue, :start_sync}}
35+
@impl GenServer
36+
def init([]) do
37+
:ets.delete_all_objects(:stat_sync_cache)
38+
:aec_events.subscribe(:chain)
39+
Watcher.notify_sync(self())
40+
{:ok, %State{}, {:continue, :start_sync}}
41+
end
42+
43+
@impl GenServer
44+
def handle_continue(:start_sync, %State{pid: nil} = s),
45+
do: {:noreply, spawn_action({Transaction, :sync, [safe_height()]}, s)}
46+
47+
@impl GenServer
48+
def handle_info({:fork, height}, %State{pid: pid} = s) when is_integer(height) do
49+
s = %{s | fork: fork_height(height, s.fork)}
50+
{:noreply, (pid && s) || spawn_action(s)}
51+
end
52+
53+
@impl GenServer
54+
def handle_info({_, :chain, %{info: {:fork, header}}}, %State{} = s),
55+
do: handle_info({:fork, :aec_headers.height(header)}, s)
56+
57+
@impl GenServer
58+
def handle_info({_, :chain, %{info: {:generation, _}}}, %State{pid: pid} = s),
59+
do: {:noreply, (pid && s) || spawn_action(s)}
60+
61+
@impl GenServer
62+
def handle_info({pid, _act, _res}, %State{pid: pid, fork: fork} = s) when not is_nil(fork),
63+
do: {:noreply, spawn_action(%{s | pid: nil})}
64+
65+
@impl GenServer
66+
def handle_info({pid, _, _next_txi}, %State{pid: pid, fork: nil} = s) do
67+
bi_max_kbi = BlockIndex.max_kbi()
68+
is_synced? = bi_max_kbi == top_height()
69+
next_state = %{s | pid: nil}
70+
{:noreply, (is_synced? && next_state) || spawn_action(next_state)}
71+
end
72+
73+
#
74+
# Private functions
75+
#
76+
defp safe_height(),
77+
do: max(0, top_height() - @verify_range_kbs)
78+
79+
defp spawn_action(%State{pid: nil, fork: nil} = s),
80+
do: spawn_action({Transaction, :sync, [top_height() - 1]}, s)
81+
82+
defp spawn_action(%State{pid: nil, fork: height} = s) when not is_nil(height) do
83+
Log.info("invalidation #{height}")
84+
Invalidate.invalidate(height)
85+
spawn_action({Transaction, :sync, [top_height()]}, %{s | fork: nil})
86+
end
87+
88+
defp spawn_action({m, f, a}, %State{} = s) do
89+
Log.info("sync action #{inspect(hd(a))}")
90+
%{s | pid: spawn_link(fn -> run_action({m, f, a}) end)}
91+
end
92+
93+
defp run_action({m, f, a} = action) do
94+
result = apply(m, f, a)
95+
send(__MODULE__, {self(), action, result})
96+
end
97+
98+
defp fork_height(height1, height2) when is_integer(height1) do
99+
case {height1, height2} do
100+
{_, nil} -> height1
101+
{h1, h2} -> min(h1, h2)
102+
end
103+
end
104+
end

lib/ae_mdw/db/sync/supervisor.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ defmodule AeMdw.Db.Sync.Supervisor do
1212
@impl true
1313
def init(_args) do
1414
children = [
15-
AeMdw.Db.Sync,
15+
AeMdw.Db.Sync.ChainSubscriber,
1616
AeMdw.Db.Sync.ForkDetector
1717
]
1818

lib/ae_mdw/db/sync/sync.ex

Lines changed: 0 additions & 109 deletions
This file was deleted.

lib/ae_mdw/db/sync/transaction.ex

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ defmodule AeMdw.Db.Sync.Transaction do
55

66
alias AeMdw.Blocks
77
alias AeMdw.Node, as: AE
8+
alias AeMdw.Node.Chain
89
alias AeMdw.Contract
910
alias AeMdw.Db.Model
10-
alias AeMdw.Db.Sync
11+
alias AeMdw.Db.Sync.BlockIndex
1112
alias AeMdw.Db.Aex9AccountPresenceMutation
1213
alias AeMdw.Db.ContractEventsMutation
1314
alias AeMdw.Db.IntTransfer
@@ -19,6 +20,7 @@ defmodule AeMdw.Db.Sync.Transaction do
1920
alias AeMdw.Db.WriteFieldsMutation
2021
alias AeMdw.Db.WriteLinksMutation
2122
alias AeMdw.Db.WriteTxMutation
23+
alias AeMdw.Log
2224
alias AeMdw.Mnesia
2325
alias AeMdw.Node
2426
alias AeMdw.Txs
@@ -29,15 +31,15 @@ defmodule AeMdw.Db.Sync.Transaction do
2931
import AeMdw.Db.Util
3032
import AeMdw.Util
3133

32-
@log_freq 1000
34+
@log_freq 1_000
3335
@sync_cache_cleanup_freq 150_000
3436

3537
################################################################################
3638

37-
@spec sync(non_neg_integer() | :safe) :: pos_integer()
38-
def sync(max_height \\ :safe) do
39-
max_height = Sync.height((is_integer(max_height) && max_height + 1) || max_height)
40-
bi_max_kbi = Sync.BlockIndex.sync(max_height) - 1
39+
@spec sync(non_neg_integer()) :: pos_integer()
40+
def sync(max_height) when is_integer(max_height) do
41+
max_height = Chain.checked_height(max_height + 1)
42+
bi_max_kbi = BlockIndex.sync(max_height) - 1
4143

4244
case last(Model.Tx) do
4345
:"$end_of_table" ->
@@ -53,8 +55,13 @@ defmodule AeMdw.Db.Sync.Transaction do
5355

5456
@spec sync(non_neg_integer(), non_neg_integer(), non_neg_integer()) :: pos_integer()
5557
def sync(from_height, to_height, txi) when from_height <= to_height do
56-
tracker = Sync.progress_logger(&sync_generation/2, @log_freq, &log_msg/2)
57-
next_txi = from_height..to_height |> Enum.reduce(txi, tracker)
58+
next_txi =
59+
Enum.reduce(from_height..to_height, txi, fn height, next_txi ->
60+
if rem(height, @log_freq) == 0,
61+
do: Log.info("syncing transactions at generation #{height}")
62+
63+
sync_generation(height, next_txi)
64+
end)
5865

5966
:mnesia.transaction(fn ->
6067
[succ_kb] = :mnesia.read(Model.Block, {to_height + 1, -1})
@@ -194,7 +201,4 @@ defmodule AeMdw.Db.Sync.Transaction do
194201

195202
{txi + length(mb_txs), mbi + 1, mutations}
196203
end
197-
198-
defp log_msg(height, _ignore),
199-
do: "syncing transactions at generation #{height}"
200204
end

lib/ae_mdw/node/chain.ex

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
defmodule AeMdw.Node.Chain do
2+
@moduledoc """
3+
Calls to the chain Node excluding the ones related to :aec_db (for these please see `AeMdw.Node.Db`).
4+
"""
5+
@type height :: non_neg_integer()
6+
7+
@spec checked_height(height()) :: height()
8+
def checked_height(height) when is_integer(height) and height >= 0 do
9+
top = top_height()
10+
11+
if height > top and top > 0,
12+
do: raise(RuntimeError, message: "no such generation: #{height} (max = #{top})")
13+
14+
height
15+
end
16+
17+
@spec top_height() :: height()
18+
def top_height(),
19+
do: :aec_headers.height(:aec_chain.top_header())
20+
end

0 commit comments

Comments
 (0)