Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Round quorum before advancement #2897

Closed
wants to merge 15 commits into from
Closed
4 changes: 2 additions & 2 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use crate::{
events::{EventCodec, PrimaryPing},
helpers::{assign_to_worker, Cache, PrimarySender, Resolver, SyncSender, WorkerSender},
helpers::{assign_to_worker, PeerCache, PrimarySender, Resolver, SyncSender, WorkerSender},
spawn_blocking,
CONTEXT,
MAX_BATCH_DELAY_IN_MS,
Expand Down Expand Up @@ -104,7 +104,7 @@ pub struct Gateway<N: Network> {
/// The TCP stack.
tcp: Tcp,
/// The cache.
cache: Arc<Cache<N>>,
cache: Arc<PeerCache<N>>,
/// The resolver.
resolver: Arc<Resolver<N>>,
/// The set of trusted validators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
use time::OffsetDateTime;

#[derive(Debug)]
pub struct Cache<N: Network> {
pub struct PeerCache<N: Network> {
/// The ordered timestamp map of peer connections and cache hits.
seen_inbound_connections: RwLock<BTreeMap<i64, HashMap<IpAddr, u32>>>,
/// The ordered timestamp map of peer IPs and cache hits.
Expand All @@ -42,14 +42,14 @@ pub struct Cache<N: Network> {
seen_outbound_validators_requests: RwLock<HashMap<SocketAddr, u32>>,
}

impl<N: Network> Default for Cache<N> {
impl<N: Network> Default for PeerCache<N> {
/// Initializes a new instance of the cache.
fn default() -> Self {
Self::new()
}
}

impl<N: Network> Cache<N> {
impl<N: Network> PeerCache<N> {
/// Initializes a new instance of the cache.
pub fn new() -> Self {
Self {
Expand All @@ -65,7 +65,7 @@ impl<N: Network> Cache<N> {
}
}

impl<N: Network> Cache<N> {
impl<N: Network> PeerCache<N> {
/// Inserts a new timestamp for the given peer connection, returning the number of recent connection requests.
pub fn insert_inbound_connection(&self, peer_ip: IpAddr, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_inbound_connections, peer_ip, interval_in_secs)
Expand All @@ -87,7 +87,7 @@ impl<N: Network> Cache<N> {
}
}

impl<N: Network> Cache<N> {
impl<N: Network> PeerCache<N> {
/// Inserts a new timestamp for the given peer, returning the number of recent events.
pub fn insert_outbound_event(&self, peer_ip: SocketAddr, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_outbound_events, peer_ip, interval_in_secs)
Expand All @@ -104,7 +104,7 @@ impl<N: Network> Cache<N> {
}
}

impl<N: Network> Cache<N> {
impl<N: Network> PeerCache<N> {
/// Returns `true` if the cache contains a validators request from the given IP.
pub fn contains_outbound_validators_request(&self, peer_ip: SocketAddr) -> bool {
self.seen_outbound_validators_requests.read().get(&peer_ip).map(|r| *r > 0).unwrap_or(false)
Expand All @@ -121,7 +121,7 @@ impl<N: Network> Cache<N> {
}
}

impl<N: Network> Cache<N> {
impl<N: Network> PeerCache<N> {
/// Insert a new timestamp for the given key, returning the number of recent entries.
fn retain_and_insert<K: Copy + Clone + PartialEq + Eq + Hash>(
map: &RwLock<BTreeMap<i64, HashMap<K, u32>>>,
Expand Down Expand Up @@ -233,7 +233,7 @@ mod tests {
paste::paste! {
#[test]
fn [<test_seen_ $name s>]() {
let cache = Cache::<CurrentNetwork>::default();
let cache = PeerCache::<CurrentNetwork>::default();
let input = Input::input();

// Check that the cache is empty.
Expand Down
267 changes: 267 additions & 0 deletions node/bft/src/helpers/cache_round.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// Copyright (C) 2019-2023 Aleo Systems Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{ensure, Result};
use snarkvm::{
console::types::{Address, Field},
ledger::committee::Committee,
prelude::Network,
};
use std::collections::HashSet;

/// A validator address paired with the x-coordinate of its group element.
///
/// The x-coordinate is stored alongside the address so that it can serve as
/// the key for binary searches over the cache's lists.
#[derive(Clone, Copy, Debug)]
struct AddressWithCoordinate<N: Network> {
    /// The validator's address.
    address: Address<N>,
    /// The x-coordinate of the address' group element.
    x: Field<N>,
}

impl<N: Network> From<Address<N>> for AddressWithCoordinate<N> {
    /// Pairs `address` with the x-coordinate of its group element.
    fn from(address: Address<N>) -> Self {
        // Derive the coordinate first, then store it next to the address.
        let x = address.to_group().to_x_coordinate();
        Self { address, x }
    }
}

#[derive(Debug)]
pub struct RoundCache<N: Network> {
/// The current highest round which has (stake-weighted) quorum
last_highest_round_with_quorum: u64,
/// A sorted list of (round, Vec<AddressWithCoordinate<N>>), indicating the last seen highest round for each address
highest_rounds: Vec<(u64, Vec<AddressWithCoordinate<N>>)>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this Vec have to be sorted? It seems only to quickly fetch an item? If that is true, why not use HashMap? This gives you O(1) lookups and O(1) inserts. Now you have O(n) inserts and O(log n) lookups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will come back to this if we decide to keep this PR

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's faster and more lightweight, unless the collection can become arbitrarily large

/// A list of (AddressWithCoordinate<N>, round) to quickly find an Address' round by their x coordinate
address_rounds: Vec<(AddressWithCoordinate<N>, u64)>,
}

impl<N: Network> Default for RoundCache<N> {
/// Initializes a new instance of the cache.
fn default() -> Self {
Self::new()
}
}

impl<N: Network> RoundCache<N> {
/// Creates an empty cache: the quorum round starts at 0 and no validators are recorded.
pub fn new() -> Self {
    Self { last_highest_round_with_quorum: 0, highest_rounds: Vec::new(), address_rounds: Vec::new() }
}

/// Records `round` as the highest seen round for `validator`.
///
/// `highest_rounds` is kept sorted by round, and each round's validator list is kept
/// sorted by x-coordinate. The second ordering matters because
/// `prune_validator_from_highest_rounds` locates a validator with a binary search
/// keyed on `x`; appending in arrival order would let that search miss a validator
/// that is actually present and return a spurious error.
///
/// If the validator is already listed under `round`, the list is left unchanged so
/// that no validator is counted twice.
fn insert_round_for_validator(&mut self, round: u64, validator: AddressWithCoordinate<N>) {
    match self.highest_rounds.binary_search_by_key(&round, |(r, _)| *r) {
        // The round already exists: add the validator at its sorted position.
        Ok(round_index) => {
            let validators = &mut self.highest_rounds[round_index].1;
            if let Err(validator_index) = validators.binary_search_by_key(&validator.x, |a| a.x) {
                validators.insert(validator_index, validator);
            }
        }
        // The round is new: create it at the position that keeps the rounds sorted.
        Err(round_index) => self.highest_rounds.insert(round_index, (round, vec![validator])),
    }
}

/// Find and prune a validator from the list of highest rounds.
fn prune_validator_from_highest_rounds(&mut self, round: u64, validator: Field<N>) -> Result<()> {
// Find the index of the round.
let round_index = self.highest_rounds.binary_search_by_key(&round, |(r, _)| *r).map_err(anyhow::Error::msg)?;
// Find the index of the address.
let address_index =
self.highest_rounds[round_index].1.binary_search_by_key(&validator, |a| a.x).map_err(anyhow::Error::msg)?;
// Remove the address from the round.
self.highest_rounds[round_index].1.remove(address_index);
// Remove the round if it's empty.
if self.highest_rounds[round_index].1.is_empty() {
self.highest_rounds.remove(round_index);
}
Ok(())
}

/// Find and prune validators which are no longer in the committee
fn prune_stale_validators(&mut self, committee: &Committee<N>) -> Result<()> {
// Determine which addresses are no longer in the committee.
let addresses_to_prune = self
.address_rounds
.iter()
.filter_map(|(a, _)| (!committee.members().contains_key(&a.address)).then_some(a.x))
.collect::<Vec<_>>();
ljedrz marked this conversation as resolved.
Show resolved Hide resolved
// Prune the stale addresses.
for address_x in addresses_to_prune {
// Find the index of the address.
let address_index =
self.address_rounds.binary_search_by_key(&address_x, |&(a, _)| a.x).map_err(anyhow::Error::msg)?;
// Get the old round for the address.
let old_round = self.address_rounds[address_index].1;
// Remove the address.
self.address_rounds.remove(address_index);
// Prune the address from the highest rounds.
self.prune_validator_from_highest_rounds(old_round, address_x)?;
}
Ok(())
}

/// Update the cache based on a new (round, address) pair. This does two things:
/// - If the round is higher than a previous one from this address, set it in `highest_rounds`
/// - Keep incrementing `last_highest_round_with_quorum` as long as it passes a stake-weighted quorum
/// We ignore the case where tomorrow's stake-weighted quorum round is *lower* than the current one
pub fn update(&mut self, round: u64, validator_address: Address<N>, committee: &Committee<N>) -> Result<u64> {
ensure!(committee.members().contains_key(&validator_address), "Address is not a member of the committee");
let validator = AddressWithCoordinate::from(validator_address);

// Determine if validator was inserted into the cache.
let mut inserted = false;
// Only consider updating the cache if we see a high round.
if round > self.last_highest_round_with_quorum {
match self.address_rounds.binary_search_by_key(&validator.x, |&(a, _)| a.x) {
// Update the existing validator.
Ok(address_index) => {
let (_, old_round) = self.address_rounds[address_index];
// Should we update the validator's highest seen round?
if old_round < round {
inserted = true;
self.address_rounds[address_index].1 = round;
self.prune_validator_from_highest_rounds(old_round, validator.x)?;
self.insert_round_for_validator(round, validator);
}
}
// Insert the new validator.
Err(address_index) => {
inserted = true;
self.address_rounds.insert(address_index, (validator, round));
self.insert_round_for_validator(round, validator);
}
}
// Prune validators if the cache exceeds the current committee size.
if self.address_rounds.len() > committee.num_members() {
self.prune_stale_validators(committee)?;
}
// Ensure the cache does not contain more validators than the current committee size.
ensure!(self.address_rounds.len() <= committee.num_members());
ensure!(self.highest_rounds.iter().map(|(_, a)| a.len()).sum::<usize>() <= committee.num_members());
}

// Check if we reached quorum on a new round.
if inserted {
while committee.is_quorum_threshold_reached(&self.validators_in_support(committee)?) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The committee being used here may change based on the round you are checking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to use the single given committee (from get_previous_committee_for_round) to update completely, because who cares whether old outdated committees have high rounds..

Sidenote: if we were to only update by 1 round at a time based on the given committee, instead of always taking just <quorum> batch proposals to update to the correct round, we will then need <quorum> + i batch proposals to increase our round by i. If <quorum> > max_gc_rounds, then we might never catch up...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any case where you won't ever be able to see quorum because the fixed committee you are using doesn't include newly bonded validators?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question, it seems so yes... Because we use our old outdated round to determine the committee: self.ledger.get_previous_committee_for_round(self.current_round()).

The only alternative I see now is to use the peer's advertised batch_round - but one question on that: is it possible that we don't know the committee yet for rounds far in the future so that get_previous_committee_for_round(batch_round)? will keep failing? Or do we expect that to succeed at some point?

self.last_highest_round_with_quorum += 1;
raychu86 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Ok(self.last_highest_round_with_quorum)
}

/// Overwrites the tracked quorum round with `round`.
///
/// The value is stored as given; it is not compared against the previous quorum
/// round or against the per-validator rounds held in the cache.
pub fn update_quorum_round(&mut self, round: u64) {
    self.last_highest_round_with_quorum = round;
}

/// Collects the addresses of all validators whose highest seen round is at least
/// `last_highest_round_with_quorum + 1`, i.e. those backing an advance of the quorum round.
///
/// The returned set is pre-sized to the number of members in `committee`.
fn validators_in_support(&self, committee: &Committee<N>) -> Result<HashSet<Address<N>>> {
    let next_round = self.last_highest_round_with_quorum + 1;
    // Whether or not `next_round` itself is present, the search yields the index of the
    // first entry at or above it, because `highest_rounds` is sorted by round.
    let first_supporting =
        self.highest_rounds.binary_search_by_key(&next_round, |(r, _)| *r).unwrap_or_else(|insert_at| insert_at);
    let mut supporters = HashSet::with_capacity(committee.num_members());
    // Every validator recorded from that entry onwards supports the next round.
    supporters.extend(
        self.highest_rounds
            .iter()
            .skip(first_supporting)
            .flat_map(|(_, members)| members.iter().map(|member| member.address)),
    );
    Ok(supporters)
}

/// Return `self.last_highest_round_with_quorum`
pub fn last_highest_round(&self) -> u64 {
self.last_highest_round_with_quorum
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use indexmap::IndexMap;
    use snarkvm::{
        prelude::{Testnet3, Uniform},
        utilities::TestRng,
    };

    type CurrentNetwork = Testnet3;

    /// Exercises `RoundCache::update` against a committee of equally staked validators:
    /// increasing rounds, a single repeating validator, non-consecutive rounds, and
    /// committee membership changes.
    #[test]
    fn test_round_cache() {
        let mut rng = TestRng::default();

        let num_validators = 200;
        let mut addresses = Vec::new();
        for _ in 0..num_validators {
            addresses.push(Address::<CurrentNetwork>::rand(&mut rng));
        }

        let minimum_stake = 1000000000000;
        let accepts_delegators = true;
        let committee_members =
            addresses.iter().map(|&a| (a, (minimum_stake, accepts_delegators))).collect::<IndexMap<_, _>>();
        let committee = Committee::<CurrentNetwork>::new(0, committee_members).unwrap();

        // Test case 1: when we always observe increasing round numbers
        let mut cache = RoundCache::<CurrentNetwork>::default();
        // Check that the cache is empty
        assert_eq!(cache.last_highest_round(), 0);
        for round in 1..1000 {
            cache.update(round as u64, addresses[round % num_validators], &committee).unwrap();
        }
        // Check that the cache is correctly updated
        assert_eq!(cache.last_highest_round(), 866);

        // Test case 2: when a single validator reports every round, quorum is never reached
        let mut cache = RoundCache::<CurrentNetwork>::default();
        for round in 1..1000 {
            cache.update(round as u64, addresses[0], &committee).unwrap();
        }
        // Check that the cache is correctly updated
        assert_eq!(cache.last_highest_round(), 0);

        // Test case 3: when we observe non-consecutive round numbers
        let mut cache = RoundCache::<CurrentNetwork>::default();
        for round in 0..50 {
            // Round 0 does not exceed the initial quorum round, so this update is ignored by the cache.
            cache.update(0, addresses[round % num_validators], &committee).unwrap();
            // The offsets are parenthesized so the modulo applies to the whole index.
            cache.update(10, addresses[(round + 50) % num_validators], &committee).unwrap();
            cache.update(15, addresses[(round + 100) % num_validators], &committee).unwrap();
            cache.update(20, addresses[(round + 150) % num_validators], &committee).unwrap();
        }
        // Check that the cache is correctly updated
        assert_eq!(cache.last_highest_round(), 10);

        // Test case 4: remove and add validators from the committee
        let mut cache = RoundCache::<CurrentNetwork>::default();
        for round in 1..1000 {
            cache.update(round as u64, addresses[round % num_validators], &committee).unwrap();
        }

        // Remove a member from the committee
        let mut committee_members = committee.members().clone();
        committee_members.remove(&addresses[0]);
        let committee = Committee::<CurrentNetwork>::new(0, committee_members).unwrap();
        // Updating with address which is not in the committee should fail
        assert!(cache.update(1001, addresses[0], &committee).is_err());
        // Updating with a smaller committee should prune the removed addresses from the cache
        cache.update(1001, addresses[1], &committee).unwrap();

        // Add a brand-new member to the committee
        let mut committee_members = committee.members().clone();
        let new_address = Address::<CurrentNetwork>::rand(&mut rng);
        committee_members.insert(new_address, (minimum_stake, accepts_delegators));
        let committee = Committee::<CurrentNetwork>::new(0, committee_members).unwrap();
        cache.update(1000, new_address, &committee).unwrap();
    }
}
7 changes: 5 additions & 2 deletions node/bft/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod cache;
pub use cache::*;
pub mod cache_peer;
pub use cache_peer::*;

pub mod cache_round;
pub use cache_round::*;

pub mod channels;
pub use channels::*;
Expand Down