Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimize] Skip proposing duplicated transmissions #3178

Merged
merged 9 commits into from Mar 22, 2024
146 changes: 119 additions & 27 deletions node/bft/src/primary.rs
Expand Up @@ -99,6 +99,9 @@ pub struct Primary<N: Network> {
}

impl<N: Network> Primary<N> {
/// The maximum number of unconfirmed transmissions to send to the primary.
pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

/// Initializes a new primary instance.
pub fn new(
account: Account<N>,
Expand Down Expand Up @@ -394,38 +397,61 @@ impl<N: Network> Primary<N> {
let mut num_transactions = 0;
// Take the transmissions from the workers.
for worker in self.workers.iter() {
for (id, transmission) in worker.drain(num_transmissions_per_worker) {
// Check if the ledger already contains the transmission.
if self.ledger.contains_transmission(&id).unwrap_or(true) {
trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
continue;
// Initialize a tracker for included transmissions for the current worker.
let mut num_transmissions_included_for_worker = 0;
// Keep draining the worker until the desired number of transmissions is reached or the worker is empty.
'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
// Determine the number of remaining transmissions for the worker.
let num_remaining_transmissions =
num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
// Drain the worker.
let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
// If the worker is empty, break early.
if worker_transmissions.peek().is_none() {
break 'outer;
}
// Check the transmission is still valid.
match (id, transmission.clone()) {
(TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => {
// Check if the solution is still valid.
if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
continue;
}
// Iterate through the worker transmissions.
'inner: for (id, transmission) in worker_transmissions {
// Check if the ledger already contains the transmission.
if self.ledger.contains_transmission(&id).unwrap_or(true) {
trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
continue 'inner;
}
(TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => {
// Check if the transaction is still valid.
if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
continue;
// Check if the storage already contain the transmission.
// Note: We do not skip if this is the first transmission in the proposal, to ensure that
// the primary does not propose a batch with no transmissions.
if !transmissions.is_empty() && self.storage.contains_transmission(id) {
trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
continue 'inner;
}
// Check the transmission is still valid.
match (id, transmission.clone()) {
(TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => {
// Check if the solution is still valid.
if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
continue 'inner;
}
}
(TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => {
// Check if the transaction is still valid.
if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
continue 'inner;
}
// Increment the number of transactions.
num_transactions += 1;
}
// Increment the number of transactions.
num_transactions += 1;
// Note: We explicitly forbid including ratifications,
// as the protocol currently does not support ratifications.
(TransmissionID::Ratification, Transmission::Ratification) => continue,
// All other combinations are clearly invalid.
_ => continue 'inner,
}
// Note: We explicitly forbid including ratifications,
// as the protocol currently does not support ratifications.
(TransmissionID::Ratification, Transmission::Ratification) => continue,
// All other combinations are clearly invalid.
_ => continue,
// Insert the transmission into the map.
transmissions.insert(id, transmission);
num_transmissions_included_for_worker += 1;
}
// Insert the transmission into the map.
transmissions.insert(id, transmission);
}
}
// If there are no unconfirmed transmissions to propose, return early.
Expand Down Expand Up @@ -1761,6 +1787,72 @@ mod tests {
assert!(primary.proposed_batch.read().is_some());
}

#[tokio::test]
async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
let round = 3;
let prev_round = round - 1;
let mut rng = TestRng::default();
let (primary, accounts) = primary_without_handlers(&mut rng).await;
let peer_account = &accounts[1];
let peer_ip = peer_account.0;

// Fill primary storage.
store_certificate_chain(&primary, &accounts, round, &mut rng);

// Get transmissions from previous certificates.
let previous_certificate_ids: IndexSet<_> =
primary.storage.get_certificates_for_round(prev_round).iter().map(|cert| cert.id()).collect();

// Track the number of transmissions in the previous round.
let mut num_transmissions_in_previous_round = 0;

// Generate a solution and a transaction.
let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

// Store it on one of the workers.
primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

// Check that the worker has 2 transmissions.
assert_eq!(primary.workers[0].num_transmissions(), 2);

// Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
for (_, account) in accounts.iter() {
let (certificate, transmissions) = create_batch_certificate(
account.address(),
&accounts,
round,
previous_certificate_ids.clone(),
&mut rng,
);

// Add the transmissions to the worker.
for (transmission_id, transmission) in transmissions.iter() {
primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
}

// Insert the certificate to storage.
num_transmissions_in_previous_round += transmissions.len();
primary.storage.insert_certificate(certificate, transmissions).unwrap();
}

// Advance to the next round.
assert!(primary.storage.increment_to_next_round(round).is_ok());

// Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

// Propose the batch.
assert!(primary.propose_batch().await.is_ok());

// Check that the proposal only contains the new transmissions that were not in previous certificates.
let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
assert_eq!(proposed_transmissions.len(), 2);
assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment)));
assert!(proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id)));
}

#[tokio::test]
async fn test_batch_propose_from_peer() {
let mut rng = TestRng::default();
Expand Down
16 changes: 10 additions & 6 deletions node/consensus/src/lib.rs
Expand Up @@ -28,6 +28,7 @@ use snarkos_node_bft::{
Storage as NarwhalStorage,
},
spawn_blocking,
Primary,
BFT,
};
use snarkos_node_bft_ledger_service::LedgerService;
Expand Down Expand Up @@ -238,14 +239,17 @@ impl<N: Network> Consensus<N> {
}

// If the memory pool of this node is full, return early.
let num_unconfirmed = self.num_unconfirmed_transmissions();
if num_unconfirmed > N::MAX_SOLUTIONS || num_unconfirmed > BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
|| num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
{
return Ok(());
}
// Retrieve the solutions.
let solutions = {
// Determine the available capacity.
let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed);
let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
// Acquire the lock on the queue.
let mut queue = self.solutions_queue.lock();
// Determine the number of solutions to send.
Expand Down Expand Up @@ -304,14 +308,14 @@ impl<N: Network> Consensus<N> {
}

// If the memory pool of this node is full, return early.
let num_unconfirmed = self.num_unconfirmed_transmissions();
if num_unconfirmed > BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
return Ok(());
}
// Retrieve the transactions.
let transactions = {
// Determine the available capacity.
let capacity = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH.saturating_sub(num_unconfirmed);
let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
// Acquire the lock on the transactions queue.
let mut tx_queue = self.transactions_queue.lock();
// Determine the number of deployments to send.
Expand Down