diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index f91cd52bf4..d954c98a6d 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -99,6 +99,9 @@ pub struct Primary { } impl Primary { + /// The maximum number of unconfirmed transmissions to send to the primary. + pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH * 2; + /// Initializes a new primary instance. pub fn new( account: Account, @@ -394,38 +397,61 @@ impl Primary { let mut num_transactions = 0; // Take the transmissions from the workers. for worker in self.workers.iter() { - for (id, transmission) in worker.drain(num_transmissions_per_worker) { - // Check if the ledger already contains the transmission. - if self.ledger.contains_transmission(&id).unwrap_or(true) { - trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); - continue; + // Initialize a tracker for included transmissions for the current worker. + let mut num_transmissions_included_for_worker = 0; + // Keep draining the worker until the desired number of transmissions is reached or the worker is empty. + 'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker { + // Determine the number of remaining transmissions for the worker. + let num_remaining_transmissions = + num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker); + // Drain the worker. + let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable(); + // If the worker is empty, break early. + if worker_transmissions.peek().is_none() { + break 'outer; } - // Check the transmission is still valid. - match (id, transmission.clone()) { - (TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => { - // Check if the solution is still valid. - if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await { - trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id)); - continue; - } + // Iterate through the worker transmissions. + 'inner: for (id, transmission) in worker_transmissions { + // Check if the ledger already contains the transmission. + if self.ledger.contains_transmission(&id).unwrap_or(true) { + trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); + continue 'inner; } - (TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => { - // Check if the transaction is still valid. - if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await { - trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id)); - continue; + // Check if the storage already contain the transmission. + // Note: We do not skip if this is the first transmission in the proposal, to ensure that + // the primary does not propose a batch with no transmissions. + if !transmissions.is_empty() && self.storage.contains_transmission(id) { + trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id)); + continue 'inner; + } + // Check the transmission is still valid. + match (id, transmission.clone()) { + (TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => { + // Check if the solution is still valid. + if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await { + trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id)); + continue 'inner; + } + } + (TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => { + // Check if the transaction is still valid. + if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await { + trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id)); + continue 'inner; + } + // Increment the number of transactions. + num_transactions += 1; } - // Increment the number of transactions. - num_transactions += 1; + // Note: We explicitly forbid including ratifications, + // as the protocol currently does not support ratifications. + (TransmissionID::Ratification, Transmission::Ratification) => continue, + // All other combinations are clearly invalid. + _ => continue 'inner, } - // Note: We explicitly forbid including ratifications, - // as the protocol currently does not support ratifications. - (TransmissionID::Ratification, Transmission::Ratification) => continue, - // All other combinations are clearly invalid. - _ => continue, + // Insert the transmission into the map. + transmissions.insert(id, transmission); + num_transmissions_included_for_worker += 1; } - // Insert the transmission into the map. - transmissions.insert(id, transmission); } } // If there are no unconfirmed transmissions to propose, return early. @@ -1761,6 +1787,72 @@ mod tests { assert!(primary.proposed_batch.read().is_some()); } + #[tokio::test] + async fn test_propose_batch_skip_transmissions_from_previous_certificates() { + let round = 3; + let prev_round = round - 1; + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng).await; + let peer_account = &accounts[1]; + let peer_ip = peer_account.0; + + // Fill primary storage. + store_certificate_chain(&primary, &accounts, round, &mut rng); + + // Get transmissions from previous certificates. + let previous_certificate_ids: IndexSet<_> = + primary.storage.get_certificates_for_round(prev_round).iter().map(|cert| cert.id()).collect(); + + // Track the number of transmissions in the previous round. + let mut num_transmissions_in_previous_round = 0; + + // Generate a solution and a transaction. + let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng); + let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng); + + // Store it on one of the workers. + primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap(); + primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap(); + + // Check that the worker has 2 transmissions. + assert_eq!(primary.workers[0].num_transmissions(), 2); + + // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage. + for (_, account) in accounts.iter() { + let (certificate, transmissions) = create_batch_certificate( + account.address(), + &accounts, + round, + previous_certificate_ids.clone(), + &mut rng, + ); + + // Add the transmissions to the worker. + for (transmission_id, transmission) in transmissions.iter() { + primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()); + } + + // Insert the certificate to storage. + num_transmissions_in_previous_round += transmissions.len(); + primary.storage.insert_certificate(certificate, transmissions).unwrap(); + } + + // Advance to the next round. + assert!(primary.storage.increment_to_next_round(round).is_ok()); + + // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions. + assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2); + + // Propose the batch. + assert!(primary.propose_batch().await.is_ok()); + + // Check that the proposal only contains the new transmissions that were not in previous certificates. + let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone(); + assert_eq!(proposed_transmissions.len(), 2); + assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment))); + assert!(proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id))); + } + #[tokio::test] async fn test_batch_propose_from_peer() { let mut rng = TestRng::default(); diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index 75b02e89e0..7fcab5b00e 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -28,6 +28,7 @@ use snarkos_node_bft::{ Storage as NarwhalStorage, }, spawn_blocking, + Primary, BFT, }; use snarkos_node_bft_ledger_service::LedgerService; @@ -238,14 +239,17 @@ impl Consensus { } // If the memory pool of this node is full, return early. - let num_unconfirmed = self.num_unconfirmed_transmissions(); - if num_unconfirmed > N::MAX_SOLUTIONS || num_unconfirmed > BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH { + let num_unconfirmed_solutions = self.num_unconfirmed_solutions(); + let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions(); + if num_unconfirmed_solutions >= N::MAX_SOLUTIONS + || num_unconfirmed_transmissions >= Primary::::MAX_TRANSMISSIONS_TOLERANCE + { return Ok(()); } // Retrieve the solutions. let solutions = { // Determine the available capacity. - let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed); + let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions); // Acquire the lock on the queue. let mut queue = self.solutions_queue.lock(); // Determine the number of solutions to send. @@ -304,14 +308,14 @@ impl Consensus { } // If the memory pool of this node is full, return early. - let num_unconfirmed = self.num_unconfirmed_transmissions(); - if num_unconfirmed > BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH { + let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions(); + if num_unconfirmed_transmissions >= Primary::::MAX_TRANSMISSIONS_TOLERANCE { return Ok(()); } // Retrieve the transactions. let transactions = { // Determine the available capacity. - let capacity = BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH.saturating_sub(num_unconfirmed); + let capacity = Primary::::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions); // Acquire the lock on the transactions queue. let mut tx_queue = self.transactions_queue.lock(); // Determine the number of deployments to send.