diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs
index 9741f24757..bf363f0c18 100644
--- a/node/bft/src/bft.rs
+++ b/node/bft/src/bft.rs
@@ -511,11 +511,11 @@ impl<N: Network> BFT<N> {
        info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));

        // Commit the leader certificate, and all previous leader certificates since the last committed round.
-        self.commit_leader_certificate::(leader_certificate).await
+        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS>(leader_certificate).await
    }

    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
-    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
+    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool>(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<()> {
@@ -584,10 +584,6 @@ impl<N: Network> BFT<N> {
        let mut transmissions = IndexMap::new();
        // Start from the oldest leader certificate.
        for certificate in commit_subdag.values().flatten() {
-            // Update the DAG.
-            if IS_SYNCING {
-                self.dag.write().commit(certificate, self.storage().max_gc_rounds());
-            }
            // Retrieve the transmissions.
            for transmission_id in certificate.transmission_ids() {
                // If the transmission already exists in the map, skip it.
@@ -611,58 +607,54 @@ impl<N: Network> BFT<N> {
                transmissions.insert(*transmission_id, transmission);
            }
        }
-        // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
-        if !IS_SYNCING {
-            // Construct the subdag.
-            let subdag = Subdag::from(commit_subdag.clone())?;
-            // Retrieve the anchor round.
-            let anchor_round = subdag.anchor_round();
-            // Retrieve the number of transmissions.
-            let num_transmissions = transmissions.len();
-            // Retrieve metadata about the subdag.
-            let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
-
-            // Ensure the subdag anchor round matches the leader round.
-            ensure!(
-                anchor_round == leader_round,
-                "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
-            );
-
-            // Trigger consensus.
-            if let Some(consensus_sender) = self.consensus_sender.get() {
-                // Initialize a callback sender and receiver.
-                let (callback_sender, callback_receiver) = oneshot::channel();
-                // Send the subdag and transmissions to consensus.
-                consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
-                // Await the callback to continue.
-                match callback_receiver.await {
-                    Ok(Ok(())) => (), // continue
-                    Ok(Err(e)) => {
-                        error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
-                        return Ok(());
-                    }
-                    Err(e) => {
-                        error!("BFT failed to receive the callback for round {anchor_round} - {e}");
-                        return Ok(());
-                    }
+        // Trigger consensus, as this will build a new block for the ledger.
+        // Construct the subdag.
+        let subdag = Subdag::from(commit_subdag.clone())?;
+        // Retrieve the anchor round.
+        let anchor_round = subdag.anchor_round();
+        // Retrieve the number of transmissions.
+        let num_transmissions = transmissions.len();
+        // Retrieve metadata about the subdag.
+        let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
+
+        // Ensure the subdag anchor round matches the leader round.
+        ensure!(
+            anchor_round == leader_round,
+            "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
+        );
+
+        // Trigger consensus.
+        if let Some(consensus_sender) = self.consensus_sender.get() {
+            // Initialize a callback sender and receiver.
+            let (callback_sender, callback_receiver) = oneshot::channel();
+            // Send the subdag and transmissions to consensus.
+            consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
+            // Await the callback to continue.
+            match callback_receiver.await {
+                Ok(Ok(())) => (), // continue
+                Ok(Err(e)) => {
+                    error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
+                    return Ok(());
+                }
+                Err(e) => {
+                    error!("BFT failed to receive the callback for round {anchor_round} - {e}");
+                    return Ok(());
                }
            }
+        }

-            info!(
-                "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
-            );
-            // Update the DAG, as the subdag was successfully included into a block.
-            let mut dag_write = self.dag.write();
-            for certificate in commit_subdag.values().flatten() {
-                dag_write.commit(certificate, self.storage().max_gc_rounds());
-            }
+        info!(
+            "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
+        );
+        // Update the DAG, as the subdag was successfully included into a block.
+        let mut dag_write = self.dag.write();
+        for certificate in commit_subdag.values().flatten() {
+            dag_write.commit(certificate, self.storage().max_gc_rounds());
            }
        }

-        // Perform garbage collection based on the latest committed leader round if the node is not syncing.
-        if !IS_SYNCING {
-            self.storage().garbage_collect_certificates(latest_leader_round);
-        }
+        // Perform garbage collection based on the latest committed leader round.
+        self.storage().garbage_collect_certificates(latest_leader_round);

        Ok(())
    }
@@ -700,6 +692,11 @@ impl<N: Network> BFT<N> {
            if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
                continue;
            }
+            // If the previous certificate already exists in the ledger, continue.
+            if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
+                continue;
+            }
+
            // Retrieve the previous certificate.
            let previous_certificate = {
                // Start by retrieving the previous certificate from the DAG.
@@ -710,28 +707,11 @@
                    None => match self.storage().get_certificate(*previous_certificate_id) {
                        // If the previous certificate is found, return it.
                        Some(previous_certificate) => previous_certificate,
-                        // Otherwise, retrieve the previous certificate from the ledger.
-                        None => {
-                            if ALLOW_LEDGER_ACCESS {
-                                match self.ledger().get_batch_certificate(previous_certificate_id) {
-                                    // If the previous certificate is found, return it.
-                                    Ok(previous_certificate) => previous_certificate,
-                                    // Otherwise, the previous certificate is missing, and throw an error.
-                                    Err(e) => {
-                                        bail!(
-                                            "Missing previous certificate {} for round {previous_round} - {e}",
-                                            fmt_id(previous_certificate_id)
-                                        )
-                                    }
-                                }
-                            } else {
-                                // Otherwise, the previous certificate is missing, and throw an error.
-                                bail!(
-                                    "Missing previous certificate {} for round {previous_round}",
-                                    fmt_id(previous_certificate_id)
-                                )
-                            }
-                        }
+                        // Otherwise, the previous certificate is missing, and throw an error.
+                        None => bail!(
+                            "Missing previous certificate {} for round {previous_round}",
+                            fmt_id(previous_certificate_id)
+                        ),
                    },
                }
            };
@@ -806,8 +786,8 @@ impl<N: Network> BFT<N> {
        // Process the request to sync the BFT DAG at bootup.
        let self_ = self.clone();
        self.spawn(async move {
-            while let Some((leader_certificates, certificates)) = rx_sync_bft_dag_at_bootup.recv().await {
-                self_.sync_bft_dag_at_bootup(leader_certificates, certificates).await;
+            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
+                self_.sync_bft_dag_at_bootup(certificates).await;
            }
        });
@@ -824,51 +804,19 @@ impl<N: Network> BFT<N> {
        });
    }

-    /// Syncs the BFT DAG with the given leader certificates and batch certificates.
+    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
+    /// already exist in the ledger.
    ///
-    /// This method starts by inserting all certificates (except the latest leader certificate)
-    /// into the DAG. Then, it commits all leader certificates (except the latest leader certificate).
-    /// Finally, it updates the DAG with the latest leader certificate.
-    async fn sync_bft_dag_at_bootup(
-        &self,
-        leader_certificates: Vec<BatchCertificate<N>>,
-        certificates: Vec<BatchCertificate<N>>,
-    ) {
-        // Split the leader certificates into past leader certificates and the latest leader certificate.
-        let (past_leader_certificates, leader_certificate) = {
-            // Compute the penultimate index.
-            let index = leader_certificates.len().saturating_sub(1);
-            // Split the leader certificates.
-            let (past, latest) = leader_certificates.split_at(index);
-            debug_assert!(latest.len() == 1, "There should only be one latest leader certificate");
-            // Retrieve the latest leader certificate.
-            match latest.first() {
-                Some(leader_certificate) => (past, leader_certificate.clone()),
-                // If there is no latest leader certificate, return early.
-                None => return,
-            }
-        };
-        {
-            // Acquire the BFT write lock.
-            let mut dag = self.dag.write();
-            // Iterate over the certificates.
-            for certificate in certificates {
-                // If the certificate is not the latest leader certificate, insert it.
-                if leader_certificate.id() != certificate.id() {
-                    // Insert the certificate into the DAG.
-                    dag.insert(certificate);
-                }
-            }
-
-            // Iterate over the leader certificates.
-            for leader_certificate in past_leader_certificates {
-                // Commit the leader certificate.
-                dag.commit(leader_certificate, self.storage().max_gc_rounds());
-            }
-        }
-        // Commit the latest leader certificate.
-        if let Err(e) = self.commit_leader_certificate::(leader_certificate).await {
-            error!("BFT failed to update the DAG with the latest leader certificate - {e}");
+    /// This method commits all the certificates into the DAG.
+    /// Note that there is no need to insert the certificates into the DAG, because these certificates
+    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
+    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
+        // Acquire the BFT write lock.
+        let mut dag = self.dag.write();
+
+        // Commit all the certificates excluding the latest leader certificate.
+        for certificate in certificates {
+            dag.commit(&certificate, self.storage().max_gc_rounds());
        }
    }
@@ -899,6 +847,7 @@ mod tests {
    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkos_node_bft_storage_service::BFTMemoryService;
    use snarkvm::{
+        console::account::{Address, PrivateKey},
        ledger::{
            committee::Committee,
            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
@@ -907,7 +856,7 @@
        },
    };
    use anyhow::Result;
-    use indexmap::IndexSet;
+    use indexmap::{IndexMap, IndexSet};
    use std::sync::{atomic::Ordering, Arc};

    type CurrentNetwork = snarkvm::console::network::MainnetV0;
@@ -1321,11 +1270,467 @@
        }

        // Commit the leader certificate.
-        bft.commit_leader_certificate::(leader_certificate).await.unwrap();
+        bft.commit_leader_certificate::<false>(leader_certificate).await.unwrap();
        // Ensure that the `gc_round` has been updated.
        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);

        Ok(())
    }
+
+    #[tokio::test]
+    #[tracing_test::traced_test]
+    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
+        let rng = &mut TestRng::default();
+
+        // Initialize the round parameters.
+        let max_gc_rounds = 1;
+        let committee_round = 0;
+        let commit_round = 2;
+        let current_round = commit_round + 1;
+
+        // Sample the current certificate and previous certificates.
+        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
+            current_round,
+            rng,
+        );
+
+        // Initialize the committee.
+        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
+            committee_round,
+            vec![
+                certificates[0].author(),
+                certificates[1].author(),
+                certificates[2].author(),
+                certificates[3].author(),
+            ],
+            rng,
+        );
+
+        // Initialize the ledger.
+        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
+
+        // Initialize the storage.
+        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
+        // Insert the certificates into the storage.
+        for certificate in certificates.iter() {
+            storage.testing_only_insert_certificate_testing_only(certificate.clone());
+        }
+
+        // Get the leader certificate.
+        let leader = committee.get_leader(commit_round).unwrap();
+        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
+
+        // Initialize the BFT.
+        let account = Account::new(rng)?;
+        let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
+
+        // Insert a mock DAG in the BFT.
+        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
+
+        // Insert the previous certificates into the BFT.
+        for certificate in certificates.clone() {
+            assert!(bft.update_dag::<false>(certificate).await.is_ok());
+        }
+
+        // Commit the leader certificate.
+        bft.commit_leader_certificate::<false>(leader_certificate.clone()).await.unwrap();
+
+        // Simulate a bootup of the BFT.
+
+        // Initialize a new instance of storage.
+        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
+        // Initialize a new instance of BFT.
+        let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], None)?;
+
+        // Sync the BFT DAG at bootup.
+        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
+
+        // Check that the BFT starts from the same last committed round.
+        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
+
+        // Ensure that both BFTs have committed the leader certificate.
+        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
+        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
+
+        // Check the state of the bootup BFT.
+        for certificate in certificates {
+            let certificate_round = certificate.round();
+            let certificate_id = certificate.id();
+            // Check that the bootup BFT has committed the certificates.
+            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
+            // Check that the bootup BFT does not contain the certificates in its graph, because
+            // it should not need to order them again in subsequent subdags.
+            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[tracing_test::traced_test]
+    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
+        /*
+        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
+        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
+        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
+        */
+
+        let rng = &mut TestRng::default();
+
+        // Initialize the round parameters.
+        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
+        let committee_round = 0;
+        let commit_round = 2;
+        let current_round = commit_round + 1;
+        let next_round = current_round + 1;
+
+        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
+        let (round_to_certificates_map, committee) = {
+            let private_keys = vec![
+                PrivateKey::new(rng).unwrap(),
+                PrivateKey::new(rng).unwrap(),
+                PrivateKey::new(rng).unwrap(),
+                PrivateKey::new(rng).unwrap(),
+            ];
+            let addresses = vec![
+                Address::try_from(private_keys[0])?,
+                Address::try_from(private_keys[1])?,
+                Address::try_from(private_keys[2])?,
+                Address::try_from(private_keys[3])?,
+            ];
+            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
+                committee_round,
+                addresses,
+                rng,
+            );
+            // Initialize a mapping from the round number to the set of batch certificates in the round.
+            let mut round_to_certificates_map: IndexMap<
+                u64,
+                IndexSet<BatchCertificate<CurrentNetwork>>,
+            > = IndexMap::new();
+            let mut previous_certificates = IndexSet::with_capacity(4);
+            // Initialize the genesis batch certificates.
+            for _ in 0..4 {
+                previous_certificates.insert(sample_batch_certificate(rng));
+            }
+            for round in 0..commit_round + 3 {
+                let mut current_certificates = IndexSet::new();
+                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
+                    IndexSet::new()
+                } else {
+                    previous_certificates.iter().map(|c| c.id()).collect()
+                };
+                let transmission_ids =
+                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
+                        .into_iter()
+                        .collect::<IndexSet<_>>();
+                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
+                let committee_id = committee.id();
+                for (i, private_key_1) in private_keys.iter().enumerate() {
+                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
+                        private_key_1,
+                        round,
+                        timestamp,
+                        committee_id,
+                        transmission_ids.clone(),
+                        previous_certificate_ids.clone(),
+                        rng,
+                    )
+                    .unwrap();
+                    let mut signatures = IndexSet::with_capacity(4);
+                    for (j, private_key_2) in private_keys.iter().enumerate() {
+                        if i != j {
+                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
+                        }
+                    }
+                    let certificate =
+                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
+                    current_certificates.insert(certificate);
+                }
+                // Update the mapping.
+                round_to_certificates_map.insert(round, current_certificates.clone());
+                previous_certificates = current_certificates.clone();
+            }
+            (round_to_certificates_map, committee)
+        };
+
+        // Initialize the ledger.
+        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
+        // Initialize the storage.
+        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
+        // Get the leaders for the next 2 commit rounds.
+        let leader = committee.get_leader(commit_round).unwrap();
+        let next_leader = committee.get_leader(next_round).unwrap();
+        // Insert the pre shutdown certificates into the storage.
+        let mut pre_shutdown_certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
+        for i in 1..=commit_round {
+            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
+            if i == commit_round {
+                // Only insert the leader certificate for the commit round.
+                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
+                if let Some(c) = leader_certificate {
+                    pre_shutdown_certificates.push(c.clone());
+                }
+                continue;
+            }
+            pre_shutdown_certificates.extend(certificates);
+        }
+        for certificate in pre_shutdown_certificates.iter() {
+            storage.testing_only_insert_certificate_testing_only(certificate.clone());
+        }
+        // Insert the post shutdown certificates into the storage.
+        let mut post_shutdown_certificates: Vec<BatchCertificate<CurrentNetwork>> =
+            Vec::new();
+        for j in commit_round..=commit_round + 2 {
+            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
+            post_shutdown_certificates.extend(certificate);
+        }
+        for certificate in post_shutdown_certificates.iter() {
+            storage.testing_only_insert_certificate_testing_only(certificate.clone());
+        }
+        // Get the leader certificates.
+        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
+        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
+
+        // Initialize the BFT without bootup.
+        let account = Account::new(rng)?;
+        let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
+
+        // Insert a mock DAG in the BFT without bootup.
+        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
+
+        // Insert the certificates into the BFT without bootup.
+        for certificate in pre_shutdown_certificates.clone() {
+            assert!(bft.update_dag::<false>(certificate).await.is_ok());
+        }
+
+        // Insert the post shutdown certificates into the BFT without bootup.
+        for certificate in post_shutdown_certificates.clone() {
+            assert!(bft.update_dag::<false>(certificate).await.is_ok());
+        }
+        // Commit the second leader certificate.
+        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
+        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
+        bft.commit_leader_certificate::<false>(next_leader_certificate.clone()).await.unwrap();
+
+        // Simulate a bootup of the BFT.
+
+        // Initialize a new instance of storage.
+        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
+
+        // Initialize a new instance of BFT with bootup.
+        let bootup_bft = BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], None)?;
+
+        // Sync the BFT DAG at bootup.
+        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
+
+        // Insert the post shutdown certificates into the storage and the BFT with bootup.
+        for certificate in post_shutdown_certificates.iter() {
+            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
+        }
+        for certificate in post_shutdown_certificates.clone() {
+            assert!(bootup_bft.update_dag::<false>(certificate).await.is_ok());
+        }
+        // Commit the second leader certificate.
+        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
+        let commit_subdag_metadata_bootup =
+            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
+        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
+        bootup_bft.commit_leader_certificate::<false>(next_leader_certificate.clone()).await.unwrap();
+
+        // Check that the final state of both BFTs is the same.
+
+        // Check that both BFTs start from the same last committed round.
+        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
+
+        // Ensure that both BFTs have committed the leader certificates.
+        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
+        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
+        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
+        assert!(
+            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
+        );
+
+        // Check that the bootup BFT has committed the pre shutdown certificates.
+        for certificate in pre_shutdown_certificates.clone() {
+            let certificate_round = certificate.round();
+            let certificate_id = certificate.id();
+            // Check that both BFTs have committed the certificates.
+            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
+            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
+            // Check that the bootup BFT does not contain the certificates in its graph, because
+            // it should not need to order them again in subsequent subdags.
+            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
+            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
+        }
+
+        // Check that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
+        for certificate in committed_certificates_bootup.clone() {
+            let certificate_round = certificate.round();
+            let certificate_id = certificate.id();
+            // Check that both BFTs have committed the certificates.
+            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
+            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
+            // Check that the bootup BFT does not contain the certificates in its graph, because
+            // it should not need to order them again in subsequent subdags.
+            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
+            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
+        }
+
+        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
+        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[tracing_test::traced_test]
+    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
+        /*
+        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
+        2. Add post shutdown certificates to the bootup BFT.
+        3. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
+        */
+
+        let rng = &mut TestRng::default();
+
+        // Initialize the round parameters.
+        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
+        let committee_round = 0;
+        let commit_round = 2;
+        let current_round = commit_round + 1;
+        let next_round = current_round + 1;
+
+        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
+        let (round_to_certificates_map, committee) = {
+            let private_keys = vec![
+                PrivateKey::new(rng).unwrap(),
+                PrivateKey::new(rng).unwrap(),
+                PrivateKey::new(rng).unwrap(),
+                PrivateKey::new(rng).unwrap(),
+            ];
+            let addresses = vec![
+                Address::try_from(private_keys[0])?,
+                Address::try_from(private_keys[1])?,
+                Address::try_from(private_keys[2])?,
+                Address::try_from(private_keys[3])?,
+            ];
+            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
+                committee_round,
+                addresses,
+                rng,
+            );
+            // Initialize a mapping from the round number to the set of batch certificates in the round.
+            let mut round_to_certificates_map: IndexMap<
+                u64,
+                IndexSet<BatchCertificate<CurrentNetwork>>,
+            > = IndexMap::new();
+            let mut previous_certificates = IndexSet::with_capacity(4);
+            // Initialize the genesis batch certificates.
+            for _ in 0..4 {
+                previous_certificates.insert(sample_batch_certificate(rng));
+            }
+            for round in 0..=commit_round + 2 {
+                let mut current_certificates = IndexSet::new();
+                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
+                    IndexSet::new()
+                } else {
+                    previous_certificates.iter().map(|c| c.id()).collect()
+                };
+                let transmission_ids =
+                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
+                        .into_iter()
+                        .collect::<IndexSet<_>>();
+                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
+                let committee_id = committee.id();
+                for (i, private_key_1) in private_keys.iter().enumerate() {
+                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
+                        private_key_1,
+                        round,
+                        timestamp,
+                        committee_id,
+                        transmission_ids.clone(),
+                        previous_certificate_ids.clone(),
+                        rng,
+                    )
+                    .unwrap();
+                    let mut signatures = IndexSet::with_capacity(4);
+                    for (j, private_key_2) in private_keys.iter().enumerate() {
+                        if i != j {
+                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
+                        }
+                    }
+                    let certificate =
+                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
+                    current_certificates.insert(certificate);
+                }
+                // Update the mapping.
+                round_to_certificates_map.insert(round, current_certificates.clone());
+                previous_certificates = current_certificates.clone();
+            }
+            (round_to_certificates_map, committee)
+        };
+
+        // Initialize the ledger.
+        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
+        // Initialize the storage.
+        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
+        // Get the leaders for the next 2 commit rounds.
+        let leader = committee.get_leader(commit_round).unwrap();
+        let next_leader = committee.get_leader(next_round).unwrap();
+        // Insert the pre shutdown certificates into the storage.
+        let mut pre_shutdown_certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
+        for i in 1..=commit_round {
+            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
+            if i == commit_round {
+                // Only insert the leader certificate for the commit round.
+                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
+                if let Some(c) = leader_certificate {
+                    pre_shutdown_certificates.push(c.clone());
+                }
+                continue;
+            }
+            pre_shutdown_certificates.extend(certificates);
+        }
+        for certificate in pre_shutdown_certificates.iter() {
+            storage.testing_only_insert_certificate_testing_only(certificate.clone());
+        }
+        // Initialize the bootup BFT.
+        let account = Account::new(rng)?;
+        let bootup_bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
+        // Insert a mock DAG in the bootup BFT.
+        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
+        // Sync the BFT DAG at bootup.
+        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
+
+        // Insert the post shutdown certificates into the storage.
+        let mut post_shutdown_certificates: Vec<BatchCertificate<CurrentNetwork>> =
+            Vec::new();
+        for j in commit_round..=commit_round + 2 {
+            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
+            post_shutdown_certificates.extend(certificate);
+        }
+        for certificate in post_shutdown_certificates.iter() {
+            storage.testing_only_insert_certificate_testing_only(certificate.clone());
+        }
+
+        // Insert the post shutdown certificates into the DAG.
+        for certificate in post_shutdown_certificates.clone() {
+            assert!(bootup_bft.update_dag::<false>(certificate).await.is_ok());
+        }
+
+        // Get the next leader certificate to commit.
+        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
+        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
+        let committed_certificates = commit_subdag.values().flatten();
+
+        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
+        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
+            for committed_certificate in committed_certificates.clone() {
+                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
+            }
+        }
+
+        Ok(())
+    }
 }
diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs
index 8981883522..932c123530 100644
--- a/node/bft/src/helpers/channels.rs
+++ b/node/bft/src/helpers/channels.rs
@@ -63,7 +63,7 @@ pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusRe
pub struct BFTSender<N: Network> {
    pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender<bool>)>,
    pub tx_primary_certificate: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
-    pub tx_sync_bft_dag_at_bootup: mpsc::Sender<(Vec<BatchCertificate<N>>, Vec<BatchCertificate<N>>)>,
+    pub tx_sync_bft_dag_at_bootup: mpsc::Sender<Vec<BatchCertificate<N>>>,
    pub tx_sync_bft: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
}
@@ -103,7 +103,7 @@ impl<N: Network> BFTSender<N> {
pub struct BFTReceiver<N: Network> {
    pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender<bool>)>,
    pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
-    pub rx_sync_bft_dag_at_bootup: mpsc::Receiver<(Vec<BatchCertificate<N>>, Vec<BatchCertificate<N>>)>,
+    pub rx_sync_bft_dag_at_bootup: mpsc::Receiver<Vec<BatchCertificate<N>>>,
    pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
}
diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs
index 153e6556b3..753e62cbb2 100644
--- a/node/bft/src/sync/mod.rs
+++ b/node/bft/src/sync/mod.rs
@@ -236,22 +236,6 @@ impl<N: Network> Sync<N> {
        /* Sync the BFT DAG */

-        // Retrieve the leader certificates.
-        let leader_certificates = blocks
-            .iter()
-            .flat_map(|block| {
-                match block.authority() {
-                    // If the block authority is a beacon, then skip the block.
-                    Authority::Beacon(_) => None,
-                    // If the block authority is a subdag, then retrieve the certificates.
-                    Authority::Quorum(subdag) => Some(subdag.leader_certificate().clone()),
-                }
-            })
-            .collect::<Vec<_>>();
-        if leader_certificates.is_empty() {
-            return Ok(());
-        }
-
        // Construct a list of the certificates.
        let certificates = blocks
            .iter()
@@ -266,10 +250,10 @@ impl<N: Network> Sync<N> {
            .flatten()
            .collect::<Vec<_>>();

-        // If a BFT sender was provided, send the certificate to the BFT.
+        // If a BFT sender was provided, send the certificates to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
-            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send((leader_certificates, certificates)).await {
+            if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
                bail!("Failed to update the BFT DAG from sync: {e}");
            }
        }
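
A minimal, illustrative sketch of the bootup path this patch converges on, using stand-in types (Dag, Certificate) rather than the real snarkOS/snarkVM APIs: certificates recovered from ledger blocks arrive as one flat list and are committed straight into the DAG, with nothing inserted for future subdag ordering.

// Illustrative sketch only: simplified stand-ins for the real snarkOS types.
use std::collections::{BTreeMap, HashSet};

#[derive(Clone, Debug)]
struct Certificate {
    round: u64,
    id: u64,
}

#[derive(Default)]
struct Dag {
    // Certificates that have been committed per round, mirroring `is_recently_committed`.
    committed: BTreeMap<u64, HashSet<u64>>,
    last_committed_round: u64,
}

impl Dag {
    // Commit a certificate: record it as committed and advance the last committed round.
    fn commit(&mut self, certificate: &Certificate) {
        self.committed.entry(certificate.round).or_default().insert(certificate.id);
        self.last_committed_round = self.last_committed_round.max(certificate.round);
    }
}

// The post-patch bootup path: every certificate recovered from the ledger is
// committed directly; none are inserted for re-ordering into future subdags.
fn sync_bft_dag_at_bootup(dag: &mut Dag, certificates: Vec<Certificate>) {
    for certificate in certificates {
        dag.commit(&certificate);
    }
}

fn main() {
    let mut dag = Dag::default();
    let certificates = vec![Certificate { round: 1, id: 10 }, Certificate { round: 2, id: 20 }];
    sync_bft_dag_at_bootup(&mut dag, certificates);
    assert_eq!(dag.last_committed_round, 2);
    println!("last committed round: {}", dag.last_committed_round);
}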