Skip to content

Commit

Permalink
Migrate PoW to PoS history proactively
Browse files Browse the repository at this point in the history
  • Loading branch information
Eligioo committed May 2, 2024
1 parent 0c1839f commit d818b04
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 140 deletions.
51 changes: 3 additions & 48 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pow-migration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ clap = { version = "4.5", features = ["derive"] }
convert_case = "0.6"
hex = "0.4"
humantime = "2.1"
indicatif = "0.17"
log = { package = "tracing", version = "0.1", features = ["log"] }
nimiq-blockchain = { workspace = true }
nimiq-bls = { workspace = true }
Expand All @@ -47,11 +46,11 @@ nimiq-lib = { workspace = true, features = [
"zkp-prover",
"parallel",
] }
nimiq-primitives = { workspace = true, features = ["policy"]}
nimiq-primitives = { workspace = true, features = ["policy"] }
nimiq-serde = { workspace = true }
nimiq-transaction = { workspace = true }
nimiq-vrf = { workspace = true }
nimiq_rpc = "0.3"
nimiq_rpc = "0.3.1"
percentage = "0.1"
rand = "0.8"
serde = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion pow-migration/src/genesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn get_pos_genesis(
"Building history tree. This may take some time"
);
let start = Instant::now();
let history_root = match get_history_root(client, final_block.number, env).await {
let history_root = match get_history_root(env).await {
Ok(history_root) => {
let duration = start.elapsed();
log::info!(
Expand Down
189 changes: 112 additions & 77 deletions pow-migration/src/history/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::fmt::Write;
use std::time::Duration;

use indicatif::{HumanDuration, ProgressBar, ProgressState, ProgressStyle};
use nimiq_blockchain::{interface::HistoryInterface, HistoryStore};
use nimiq_database::{
traits::{Database, WriteTransaction},
Expand All @@ -23,6 +22,10 @@ use nimiq_transaction::{
historic_transaction::HistoricTransaction, ExecutedTransaction, Transaction, TransactionFlags,
};
use thiserror::Error;
use tokio::{
sync::{mpsc, watch},
time::sleep,
};

/// Error types that can be returned
#[derive(Error, Debug)]
Expand Down Expand Up @@ -102,100 +105,132 @@ fn from_pow_transaction(pow_transaction: &PoWTransaction) -> Result<Transaction,
Ok(tx)
}

/// Gets the PoS genesis history root by getting all of the transactions from the
/// PoW chain and building a single history tree.
pub async fn get_history_root(
client: &Client,
cutting_pow_block_number: u32,
/// Task that is responsible for migrating the PoW history into a PoS history up until an instructed block height.
pub async fn migrate_history(
mut rx_candidate_block: mpsc::Receiver<u32>,
tx_migration_completed: watch::Sender<u32>,
env: DatabaseProxy,
) -> Result<Blake2bHash, Error> {
pow_client: Client,
block_confirmations: u32,
) {
let mut history_store_height = get_history_store_height(env.clone()).await;
let history_store = HistoryStore::new(env.clone());
let mut pow_head_height = pow_client.block_number().await.unwrap();

// Setup progress bar
let pb = ProgressBar::new(cutting_pow_block_number as u64);
pb.set_style(
ProgressStyle::with_template(
"{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] Block: {pos}, {percent}% (~{eta} remaining)",
)
.unwrap()
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
write!(w, "{}", HumanDuration(state.eta())).unwrap()
})
.progress_chars("#>-"),
);

// We might have already some work done. Check if the database already content
// a history tree and if so, get its last leaf block number.
let start = match history_store.get_last_leaf_block_number(None) {
Some(block_height) => {
if block_height > cutting_pow_block_number {
// If the last leaf in the HS has a block height greater than the
// `cutting_pow_block_number` we are potentially dealing with an
// incompatible DB.
log::error!("Found incompatible history store in the database");
return Err(Error::HistoryRootError);
}
// If there was already a history tree that we can use, continue with
// the next block
block_height + 1
while let Some(candidate_block) = rx_candidate_block.recv().await {
// Only migrate the part of the PoW history which we haven't processed yet
if candidate_block <= history_store_height {
tx_migration_completed.send(candidate_block).unwrap();
continue;
}
// If there is no history tree, start from the genesis
None => 1,
};

// Get transactions of each block and add them to the PoS history store
for block_height in start..cutting_pow_block_number {
// Refresh the progress bar position
pb.set_position(block_height as u64);
log::info!(
candidate_block,
"Start migrating PoW history up to the next candidate block"
);

// Get all transactions for this block height
let mut transactions = vec![];
let block = client.get_block_by_number(block_height, false).await?;
let mut network_id = NetworkId::Main;
match block.transactions {
PoWTransactionSequence::BlockHashes(hashes) => {
if hashes.is_empty() {
continue;
}
for hash in hashes {
log::trace!(hash, "Processing transaction");
let pow_transaction = client.get_transaction_by_hash_2(&hash).await?;
let pos_transaction = from_pow_transaction(&pow_transaction)?;
network_id = pos_transaction.network_id;
// Get transactions of each block and add them to the PoS history store
for block_height in (history_store_height + 1)..=candidate_block {
// Check if we are closing in on the PoW head block.
// If this is true, we may need to take some extra time in order to make sure
// that blocks leading up to the head block are confirmed before we
// can safely add them to the PoS history store.
if pow_head_height - block_height <= block_confirmations {
loop {
// Check if the block has been confirmed.
if pow_head_height > block_height + block_confirmations {
break;
}

assert_eq!(
pow_transaction.hash,
pos_transaction.hash::<Blake2bHash>().to_hex()
pow_head_height = pow_client.block_number().await.unwrap();
log::info!(
block = block_height,
"Waiting for block to be confirmed before we can migrate it"
);
transactions.push(ExecutedTransaction::Ok(pos_transaction));
sleep(Duration::from_secs(60)).await;
}
}

let block = pow_client
.get_block_by_number(block_height, false)
.await
.unwrap();

// Get all transactions for this block height
let mut transactions = vec![];
let mut network_id = NetworkId::Main;
match block.transactions {
PoWTransactionSequence::BlockHashes(hashes) => {
if hashes.is_empty() {
// Mark that we've migrated up until this block.
history_store_height = block_height;
tx_migration_completed.send(block_height).unwrap();
continue;
}

for hash in hashes {
log::trace!(hash, "Processing transaction");
let pow_transaction =
pow_client.get_transaction_by_hash_2(&hash).await.unwrap();
let pos_transaction = from_pow_transaction(&pow_transaction).unwrap();
network_id = pos_transaction.network_id;

assert_eq!(
pow_transaction.hash,
pos_transaction.hash::<Blake2bHash>().to_hex()
);
transactions.push(ExecutedTransaction::Ok(pos_transaction));
}

// Mark that we've migrated up until this block.
history_store_height = block_height;
tx_migration_completed.send(block_height).unwrap();
}
PoWTransactionSequence::Transactions(_) => panic!("Unexpected transaction type"),
}
PoWTransactionSequence::Transactions(_) => panic!("Unexpected transaction type"),

// Add transactions to the history store
let mut txn = env.write_transaction();
history_store.add_to_history(
&mut txn,
0,
&HistoricTransaction::from(
network_id,
block_height,
block.timestamp.into(),
transactions,
vec![],
vec![],
),
);
txn.commit();
}

// Add transactions to the history store
let mut txn = env.write_transaction();
history_store.add_to_history(
&mut txn,
0,
&HistoricTransaction::from(
network_id,
block_height,
block.timestamp.into(),
transactions,
vec![],
vec![],
),
log::info!(
candidate_block,
"Finished migrating PoW history up to the candidate block"
);
txn.commit();
tx_migration_completed.send(candidate_block).unwrap();
}
}

// Get history tree root
history_store
/// Get the PoS genesis history root by getting all of the transactions from the
/// PoW chain and building a single history tree.
pub async fn get_history_root(env: DatabaseProxy) -> Result<Blake2bHash, Error> {
HistoryStore::new(env.clone())
.get_history_tree_root(0, None)
.ok_or(Error::HistoryRootError)
}

/// Get the current block height of the PoS history store
pub async fn get_history_store_height(env: DatabaseProxy) -> u32 {
if let Some(block_height) = HistoryStore::new(env.clone()).get_last_leaf_block_number(None) {
block_height
} else {
1
}
}

#[cfg(test)]
mod test {

Expand Down

0 comments on commit d818b04

Please sign in to comment.