Skip to content

Commit 42b09e8

Browse files
authored
refactor: commit only through mutations (#534)
1 parent ec1badd commit 42b09e8

17 files changed

+510
-322
lines changed

lib/ae_mdw/blocks.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ defmodule AeMdw.Blocks do
1717
@type height() :: non_neg_integer()
1818
@type mbi() :: non_neg_integer()
1919
@type time() :: non_neg_integer()
20-
@type block_index() :: {height(), mbi()}
20+
@type block_index() :: {height(), mbi() | -1}
2121
@type txi_pos() :: non_neg_integer() | -1
2222
@type block_index_txi_pos() :: {height(), txi_pos()}
2323
@type key_header() :: term()

lib/ae_mdw/database.ex

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ defmodule AeMdw.Database do
1313

1414
alias AeMdw.Db.Model
1515
alias AeMdw.Db.Mutation
16+
alias AeMdw.Db.TxnMutation
1617
alias AeMdw.Db.RocksDb
1718
alias AeMdw.Db.RocksDbCF
1819

@@ -22,6 +23,7 @@ defmodule AeMdw.Database do
2223
@type direction() :: :forward | :backward
2324
@type cursor() :: key() | nil
2425
@type limit() :: pos_integer()
26+
@type transaction() :: RocksDb.transaction()
2527

2628
@end_token :"$end_of_table"
2729

@@ -209,18 +211,29 @@ defmodule AeMdw.Database do
209211
:mnesia.read(table, key, lock)
210212
end
211213

212-
@spec write(table(), record()) :: :ok
213-
def write(tab, record) when use_rocksdb?(tab) do
214-
RocksDbCF.put(tab, record)
214+
@spec write(transaction(), table(), record()) :: :ok
215+
def write(txn, tab, record) when use_rocksdb?(tab) do
216+
RocksDbCF.put(txn, tab, record)
215217
end
216218

219+
@spec write(table(), record()) :: :ok
217220
def write(table, record) do
218221
:mnesia.write(table, record, :write)
219222
end
220223

221-
@spec commit() :: :ok
222-
def commit do
223-
:ok = RocksDb.commit()
224+
@doc """
225+
Creates a transaction and commits the changes of a mutation list.
226+
"""
227+
@spec commit([TxnMutation.t()]) :: :ok
228+
def commit(txn_mutations) do
229+
{:ok, txn} = RocksDb.transaction_new()
230+
231+
txn_mutations
232+
|> List.flatten()
233+
|> Enum.reject(&is_nil/1)
234+
|> Enum.each(&TxnMutation.execute(&1, txn))
235+
236+
:ok = RocksDb.transaction_commit(txn)
224237
end
225238

226239
@spec transaction([Mutation.t()]) :: :ok

lib/ae_mdw/db/mutations/key_blocks_mutation.ex

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,18 @@ defmodule AeMdw.Db.KeyBlocksMutation do
2020
%__MODULE__{key_block: m_block, next_txi: next_txi}
2121
end
2222

23-
@spec mutate(t()) :: :ok
24-
def mutate(%__MODULE__{key_block: m_block, next_txi: next_txi}) do
23+
@spec execute(t(), Database.transaction()) :: :ok
24+
def execute(%__MODULE__{key_block: m_block, next_txi: next_txi}, txn) do
2525
{height, -1} = Model.block(m_block, :index)
2626
[next_kb] = Database.read(Model.Block, {height + 1, -1})
2727

28-
Database.write(Model.Block, m_block)
29-
Database.write(Model.Block, Model.block(next_kb, tx_index: next_txi))
28+
Database.write(txn, Model.Block, m_block)
29+
Database.write(txn, Model.Block, Model.block(next_kb, tx_index: next_txi))
3030
end
3131
end
3232

33-
defimpl AeMdw.Db.Mutation, for: AeMdw.Db.KeyBlocksMutation do
34-
def mutate(mutation) do
35-
@for.mutate(mutation)
33+
defimpl AeMdw.Db.TxnMutation, for: AeMdw.Db.KeyBlocksMutation do
34+
def execute(mutation, txn) do
35+
@for.execute(mutation, txn)
3636
end
3737
end

lib/ae_mdw/db/mutations/write_tx_mutation.ex

Lines changed: 0 additions & 50 deletions
This file was deleted.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
defmodule AeMdw.Db.WriteTxnMutation do
2+
@moduledoc """
3+
This is the most basic kind of transaction, it just inserts a record in a
4+
mnesia table.
5+
"""
6+
7+
alias AeMdw.Database
8+
9+
defstruct [:table, :record]
10+
11+
@opaque t() :: %__MODULE__{
12+
table: Database.table(),
13+
record: Database.record()
14+
}
15+
16+
@spec new(Database.table(), Database.record()) :: t()
17+
def new(table, record) do
18+
%__MODULE__{table: table, record: record}
19+
end
20+
21+
@spec execute(t(), Database.transaction()) :: :ok
22+
def execute(%__MODULE__{table: table, record: record}, txn) do
23+
Database.write(txn, table, record)
24+
end
25+
end
26+
27+
defimpl AeMdw.Db.TxnMutation, for: AeMdw.Db.WriteTxnMutation do
28+
def execute(mutation, txn) do
29+
@for.execute(mutation, txn)
30+
end
31+
end

0 commit comments

Comments
 (0)