Skip to content

Commit

Permalink
Use the database for the validity store
Browse files Browse the repository at this point in the history
Mantain the txns in the validity store in database tables.
  • Loading branch information
viquezclaudio committed May 6, 2024
1 parent b7c31bf commit 3f82095
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 85 deletions.
36 changes: 23 additions & 13 deletions blockchain/src/history/light_history_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl LightHistoryStore {
/// Creates a new LightHistoryStore.
pub fn new(db: DatabaseProxy, network_id: NetworkId) -> Self {
let hist_tree_table = db.open_table("LightHistoryTrees".to_string());
let validity_store = Arc::new(RwLock::new(ValidityStore::new()));
let validity_store = Arc::new(RwLock::new(ValidityStore::new(db.clone())));

LightHistoryStore {
db,
Expand All @@ -52,7 +52,7 @@ impl LightHistoryStore {

let mut validity_store = self.validity_store.write();

validity_store.delete_block_transactions(block_number);
validity_store.delete_block_transactions(txn, block_number);
}
}

Expand Down Expand Up @@ -112,7 +112,7 @@ impl HistoryInterface for LightHistoryStore {

let mut validity_store = self.validity_store.write();

validity_store.delete_block_transactions(block.block_number());
validity_store.delete_block_transactions(txn, block.block_number());

// TODO: We dont keep track of the size of the txns that we removed, is this necessary?
Some(0)
Expand Down Expand Up @@ -204,23 +204,22 @@ impl HistoryInterface for LightHistoryStore {

let mut validity_store = self.validity_store.write();

validity_store.update_validity_store(block_number);

for tx in hist_txs {
tree.push(tx).ok()?;

validity_store.add_transaction(block_number, tx.tx_hash().into())
}

log::trace!(
let root = tree.get_root().ok()?;

log::debug!(
num_txns = hist_txs.len(),
" Added txns to the history store"
);

// Prune the validity store after adding txns for some specific block
validity_store.prune_validity_store();
for tx in hist_txs {
validity_store.add_transaction(txn, block_number, tx.tx_hash().into())
}

let root = tree.get_root().ok()?;
validity_store.update_validity_store(txn, block_number);

// Return the history root.
Some((root, 0))
Expand All @@ -239,9 +238,20 @@ impl HistoryInterface for LightHistoryStore {
&self,
tx_hash: &Blake2bHash,
_validity_window_start: u32,
_txn_opt: Option<&TransactionProxy>,
txn_opt: Option<&TransactionProxy>,
) -> bool {
self.validity_store.read().has_transaction(tx_hash.clone())
let read_txn: TransactionProxy;
let txn = match txn_opt {
Some(txn) => txn,
None => {
read_txn = self.db.read_transaction();
&read_txn
}
};

self.validity_store
.read()
.has_transaction(&txn, tx_hash.clone())

Check warning on line 254 in blockchain/src/history/light_history_store.rs

View workflow job for this annotation

GitHub Actions / Clippy Report

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> blockchain/src/history/light_history_store.rs:254:30 | 254 | .has_transaction(&txn, tx_hash.clone()) | ^^^^ help: change this to: `txn` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default
}

fn get_hist_tx_by_hash(
Expand Down
177 changes: 105 additions & 72 deletions blockchain/src/history/validity_store.rs
Original file line number Diff line number Diff line change
@@ -1,118 +1,151 @@
use std::collections::{HashMap, HashSet};

Check warning on line 1 in blockchain/src/history/validity_store.rs

View workflow job for this annotation

GitHub Actions / Clippy Report

unused imports: `HashMap`, `HashSet`

warning: unused imports: `HashMap`, `HashSet` --> blockchain/src/history/validity_store.rs:1:24 | 1 | use std::collections::{HashMap, HashSet}; | ^^^^^^^ ^^^^^^^ | = note: `#[warn(unused_imports)]` on by default

use nimiq_block::Block;
use nimiq_database::{
traits::{Database, ReadCursor, ReadTransaction, WriteTransaction},
DatabaseProxy, TableFlags, TableProxy, TransactionProxy, WriteTransactionProxy,
};
use nimiq_hash::Blake2bHash;
use nimiq_primitives::policy::Policy;

/// The validity store is used by full nodes to keep track of which
/// transactions have occurred within the validity window without
/// having to store the full transactions
pub struct ValidityStore {
/// The set of all raw transactions of the current validity window.
pub(crate) txs: HashSet<Blake2bHash>,
// A database table with all the txn hashes
pub(crate) txn_hashes: TableProxy,

// A map of txns hashes associated to each block number
pub(crate) block_txs: HashMap<u32, HashSet<Blake2bHash>>,

pub(crate) first_bn: u32,

pub(crate) latest_bn: u32,
// A database table from block number to txn hashes
pub(crate) block_txns: TableProxy,
}

impl ValidityStore {
pub(crate) fn new() -> Self {
const TXN_HASHES_DB_NAME: &'static str = "ValidityTxnHashes";
const BLOCK_TXNS_DB_NAME: &'static str = "ValidityBlockTxnHashes";

/// Creates a new validity store initializing database tables
pub(crate) fn new(db: DatabaseProxy) -> Self {
let txn_hashes_table = db.open_table(Self::TXN_HASHES_DB_NAME.to_string());
let block_txns_table = db.open_table_with_flags(
Self::BLOCK_TXNS_DB_NAME.to_string(),
TableFlags::DUPLICATE_KEYS | TableFlags::DUP_FIXED_SIZE_VALUES | TableFlags::UINT_KEYS,
);

Self {
txs: HashSet::new(),
block_txs: HashMap::new(),
first_bn: 0,
latest_bn: 0,
txn_hashes: txn_hashes_table,
block_txns: block_txns_table,
}
}

pub(crate) fn has_transaction(&self, raw_tx_hash: Blake2bHash) -> bool {
self.txs.contains(&raw_tx_hash)
/// Returns true if the validity store has the given transaction hash.
pub(crate) fn has_transaction(
&self,
db_tx: &TransactionProxy,
raw_tx_hash: Blake2bHash,
) -> bool {
db_tx
.get::<Blake2bHash, u32>(&self.txn_hashes, &raw_tx_hash)
.is_some()
}

pub(crate) fn _add_block_transactions(&mut self, block: &Block) {
if let Some(txs) = block.transactions() {
for tx in txs {
let raw_tx_hash = tx.raw_tx_hash();
/// Returns the first block number stored in the validity store
pub(crate) fn first_bn(&self, db_tx: &TransactionProxy) -> u32 {
// Initialize the cursor for the database.
let mut cursor = db_tx.cursor(&self.block_txns);

if let Some(block_txs) = self.block_txs.get_mut(&block.block_number()) {
block_txs.insert(raw_tx_hash.clone().into());
} else {
let mut block_txns = HashSet::new();
block_txns.insert(raw_tx_hash.clone().into());

self.block_txs.insert(block.block_number(), block_txns);
}

self.txs.insert(raw_tx_hash.into());
}
if let Some((key, _)) = cursor.first::<u32, Blake2bHash>() {
key
} else {
0
}
}

pub(crate) fn add_transaction(&mut self, block_number: u32, transaction: Blake2bHash) {
if let Some(block_txs) = self.block_txs.get_mut(&block_number) {
block_txs.insert(transaction.clone());
} else {
let mut block_txns = HashSet::new();
block_txns.insert(transaction.clone());
/// Obtains the last block number stored in the validity store.
pub(crate) fn last_bn(&self, db_tx: &TransactionProxy) -> u32 {
// Initialize the cursor for the database.
let mut cursor = db_tx.cursor(&self.block_txns);

self.block_txs.insert(block_number, block_txns);
if let Some((key, _)) = cursor.last::<u32, Blake2bHash>() {
key
} else {
0
}

self.txs.insert(transaction);
}

// We should only delete blocks from the front or back
pub(crate) fn delete_block_transactions(&mut self, block_number: u32) {
log::trace!(bn = block_number, "Deleting block from validity store");
/// Adds a transaction hash to the validity store
pub(crate) fn add_transaction(
&mut self,
db_txn: &mut WriteTransactionProxy,
block_number: u32,
transaction: Blake2bHash,
) {
db_txn.put(&self.txn_hashes, &transaction, &transaction);

if self.first_bn == self.latest_bn {
db_txn.put(&self.block_txns, &block_number, &transaction);
}

/// Delete the transactions associated to the given block number
pub(crate) fn delete_block_transactions(
&mut self,
db_txn: &mut WriteTransactionProxy,
block_number: u32,
) {
if self.first_bn(db_txn) == self.last_bn(db_txn) {
return;
}

// We are deleting from the back
if block_number == self.first_bn {
self.first_bn += 1;
} else if block_number == self.latest_bn {
self.latest_bn -= 1;
} else {
panic!("Trying to remove a block from the middle of the validity store");
}
log::trace!(bn = block_number, "Deleting block from validity store");

if let Some(block_txns) = self.block_txs.remove(&block_number) {
for tx in block_txns {
self.txs.remove(&tx);
}
let cursor = WriteTransaction::cursor(db_txn, &self.block_txns);

let block_txns: Vec<Blake2bHash> = cursor
.into_iter_dup_of::<_, Blake2bHash>(&block_number)
.map(|(_, hash)| hash)
.collect();

for txn_hash in block_txns {
db_txn.remove(&self.txn_hashes, &txn_hash);
}

db_txn.remove(&self.block_txns, &block_number);
}

pub(crate) fn prune_validity_store(&mut self) {
/// Prunes the validity store keeping only 'validity_window_blocks'
pub(crate) fn prune_validity_store(&mut self, db_txn: &mut WriteTransactionProxy) {
// Compute the number of blocks we currently have in the store
let mut diff = self.latest_bn - self.first_bn;
let first_bn = self.first_bn(db_txn);
let last_bn = self.last_bn(db_txn);
let num_blocks = last_bn - first_bn;

log::trace!(
first = self.first_bn,
latest = self.latest_bn,
diff = diff,
"Calculated diff for pruning validity store"
first = first_bn,
latest = last_bn,
blocks = num_blocks,
"Number of blocks in the validity store"
);

// We only need to keep up to VALIDITY_WINDOW_BLOCKS in the store
while diff > Policy::transaction_validity_window_blocks() {
self.delete_block_transactions(self.first_bn);
diff = self.latest_bn - self.first_bn;
if num_blocks > Policy::transaction_validity_window_blocks() {
// We need to prune the validity store
let count = num_blocks - Policy::transaction_validity_window_blocks();

for bn in [first_bn, first_bn + count] {
self.delete_block_transactions(db_txn, bn);
}
}
}

pub(crate) fn update_validity_store(&mut self, latest_bn: u32) {
if latest_bn > self.latest_bn {
self.latest_bn = latest_bn;
}
if self.first_bn == 0 {
self.first_bn = latest_bn;
/// Updates the validity store with the latest block number in the history store
/// Note: Sometimes we have blocks that do not include transactions
/// but we still need to track them in the validity store to mantain up to
/// 'validity_window_blocks' inside the validity store
pub(crate) fn update_validity_store(
&mut self,
db_txn: &mut WriteTransactionProxy,
latest_bn: u32,
) {
if latest_bn > self.last_bn(db_txn) {
db_txn.put(&self.block_txns, &latest_bn, &Blake2bHash::default());
}

self.prune_validity_store(db_txn)
}
}

0 comments on commit 3f82095

Please sign in to comment.