Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ZKS-02] Update the set of authorized validators #3118

Merged
merged 11 commits into from Mar 12, 2024
5 changes: 5 additions & 0 deletions node/bft/ledger-service/src/ledger.rs
Expand Up @@ -108,6 +108,11 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
self.ledger.get_hash(height)
}

/// Returns the block round for the given block height, if it exists.
fn get_block_round(&self, height: u32) -> Result<u64> {
self.ledger.get_block(height).map(|block| block.round())
}

/// Returns the block for the given block height.
fn get_block(&self, height: u32) -> Result<Block<N>> {
self.ledger.get_block(height)
Expand Down
40 changes: 29 additions & 11 deletions node/bft/ledger-service/src/mock.rs
Expand Up @@ -32,35 +32,35 @@ use tracing::*;
#[derive(Debug)]
pub struct MockLedgerService<N: Network> {
committee: Committee<N>,
height_to_hash: Mutex<BTreeMap<u32, N::BlockHash>>,
height_to_round_and_hash: Mutex<BTreeMap<u32, (u64, N::BlockHash)>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would probably be better if this was an RwLock, as I can see several read-only operations available

}

impl<N: Network> MockLedgerService<N> {
/// Initializes a new mock ledger service.
pub fn new(committee: Committee<N>) -> Self {
Self { committee, height_to_hash: Default::default() }
Self { committee, height_to_round_and_hash: Default::default() }
}

/// Initializes a new mock ledger service at the specified height.
pub fn new_at_height(committee: Committee<N>, height: u32) -> Self {
let mut height_to_hash = BTreeMap::new();
for i in 0..=height {
height_to_hash.insert(i, (Field::<N>::from_u32(i)).into());
height_to_hash.insert(i, (i as u64 * 2, Field::<N>::from_u32(i).into()));
}
Self { committee, height_to_hash: Mutex::new(height_to_hash) }
Self { committee, height_to_round_and_hash: Mutex::new(height_to_hash) }
}
}

#[async_trait]
impl<N: Network> LedgerService<N> for MockLedgerService<N> {
/// Returns the latest round in the ledger.
fn latest_round(&self) -> u64 {
*self.height_to_hash.lock().keys().last().unwrap_or(&0) as u64
*self.height_to_round_and_hash.lock().keys().last().unwrap_or(&0) as u64
}

/// Returns the latest block height in the canonical ledger.
fn latest_block_height(&self) -> u32 {
self.height_to_hash.lock().last_key_value().map(|(height, _)| *height).unwrap_or(0)
self.height_to_round_and_hash.lock().last_key_value().map(|(height, _)| *height).unwrap_or(0)
}

/// Returns the latest block in the ledger.
Expand All @@ -78,21 +78,39 @@ impl<N: Network> LedgerService<N> for MockLedgerService<N> {

/// Returns `true` if the given block height exists in the canonical ledger.
fn contains_block_height(&self, height: u32) -> bool {
self.height_to_hash.lock().contains_key(&height)
self.height_to_round_and_hash.lock().contains_key(&height)
}

/// Returns the canonical block height for the given block hash, if it exists.
fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32> {
match self.height_to_hash.lock().iter().find_map(|(height, h)| if h == hash { Some(*height) } else { None }) {
match self
.height_to_round_and_hash
.lock()
.iter()
.find_map(|(height, (_, h))| if h == hash { Some(*height) } else { None })
{
Some(height) => Ok(height),
None => bail!("Missing block {hash}"),
}
}

/// Returns the canonical block hash for the given block height, if it exists.
fn get_block_hash(&self, height: u32) -> Result<N::BlockHash> {
match self.height_to_hash.lock().get(&height).cloned() {
Some(hash) => Ok(hash),
match self.height_to_round_and_hash.lock().get(&height).cloned() {
Some((_, hash)) => Ok(hash),
None => bail!("Missing block {height}"),
}
}

/// Returns the block round for the given block height, if it exists.
fn get_block_round(&self, height: u32) -> Result<u64> {
match self
.height_to_round_and_hash
.lock()
.iter()
.find_map(|(h, (round, _))| if *h == height { Some(*round) } else { None })
{
Some(round) => Ok(round),
None => bail!("Missing block {height}"),
}
}
Expand Down Expand Up @@ -205,7 +223,7 @@ impl<N: Network> LedgerService<N> for MockLedgerService<N> {
block.height(),
self.latest_block_height()
);
self.height_to_hash.lock().insert(block.height(), block.hash());
self.height_to_round_and_hash.lock().insert(block.height(), (block.round(), block.hash()));
Ok(())
}
}
5 changes: 5 additions & 0 deletions node/bft/ledger-service/src/prover.rs
Expand Up @@ -81,6 +81,11 @@ impl<N: Network> LedgerService<N> for ProverLedgerService<N> {
bail!("Block {height} does not exist in prover")
}

/// Returns the block round for the given block height, if it exists.
fn get_block_round(&self, height: u32) -> Result<u64> {
bail!("Block {height} does not exist in prover")
}

/// Returns the block for the given block height.
fn get_block(&self, height: u32) -> Result<Block<N>> {
bail!("Block {height} does not exist in prover")
Expand Down
3 changes: 3 additions & 0 deletions node/bft/ledger-service/src/traits.rs
Expand Up @@ -51,6 +51,9 @@ pub trait LedgerService<N: Network>: Debug + Send + Sync {
/// Returns the block hash for the given block height, if it exists.
fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;

/// Returns the block round for the given block height, if it exists.
fn get_block_round(&self, height: u32) -> Result<u64>;

/// Returns the block for the given block height.
fn get_block(&self, height: u32) -> Result<Block<N>>;

Expand Down
5 changes: 5 additions & 0 deletions node/bft/ledger-service/src/translucent.rs
Expand Up @@ -92,6 +92,11 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for TranslucentLedgerS
self.inner.get_block_hash(height)
}

/// Returns the block round for the given block height, if it exists.
fn get_block_round(&self, height: u32) -> Result<u64> {
self.inner.get_block_round(height)
}

/// Returns the block for the given block height.
fn get_block(&self, height: u32) -> Result<Block<N>> {
self.inner.get_block(height)
Expand Down
113 changes: 97 additions & 16 deletions node/bft/src/gateway.rs
Expand Up @@ -14,7 +14,7 @@

use crate::{
events::{EventCodec, PrimaryPing},
helpers::{assign_to_worker, Cache, PrimarySender, Resolver, SyncSender, WorkerSender},
helpers::{assign_to_worker, Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender},
spawn_blocking,
Worker,
CONTEXT,
Expand All @@ -39,7 +39,7 @@ use snarkos_node_bft_events::{
ValidatorsResponse,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::communication_service::CommunicationService;
use snarkos_node_sync::{communication_service::CommunicationService, MAX_BLOCKS_BEHIND};
use snarkos_node_tcp::{
is_bogon_ip,
is_unspecified_or_broadcast_ip,
Expand Down Expand Up @@ -100,6 +100,8 @@ pub trait Transport<N: Network>: Send + Sync {
pub struct Gateway<N: Network> {
/// The account of the node.
account: Account<N>,
/// The storage.
storage: Storage<N>,
/// The ledger service.
ledger: Arc<dyn LedgerService<N>>,
/// The TCP stack.
Expand Down Expand Up @@ -133,6 +135,7 @@ impl<N: Network> Gateway<N> {
/// Initializes a new gateway.
pub fn new(
account: Account<N>,
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
ip: Option<SocketAddr>,
trusted_validators: &[SocketAddr],
Expand All @@ -149,6 +152,7 @@ impl<N: Network> Gateway<N> {
// Return the gateway.
Ok(Self {
account,
storage,
ledger,
tcp,
cache: Default::default(),
Expand Down Expand Up @@ -330,18 +334,38 @@ impl<N: Network> Gateway<N> {

/// Returns `true` if the given address is an authorized validator.
pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
// Determine if the validator address is a member of the committee lookback or the current committee.
// Determine if the validator address is a member of the committee lookback,
// the current committee, or the previous committee lookbacks.
// We allow leniency in this validation check in order to accommodate these two scenarios:
// 1. New validators should be able to connect immediately once bonded as a committee member.
// 2. Existing validators must remain connected until they are no longer bonded as a committee member.
// (i.e. meaning they must stay online until the next block has been produced)
self.ledger
.get_committee_lookback_for_round(self.ledger.latest_round())

// Determine if the validator is in the current committee with lookback.
if self
.ledger
.get_committee_lookback_for_round(self.storage.current_round())
.map_or(false, |committee| committee.is_committee_member(validator_address))
|| self
.ledger
.current_committee()
.map_or(false, |committee| committee.is_committee_member(validator_address))
{
return true;
}

// Determine if the validator is in the latest committee on the ledger.
if self.ledger.current_committee().map_or(false, |committee| committee.is_committee_member(validator_address)) {
return true;
}

// Retrieve the previous block height to consider from the sync tolerance.
let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
// Determine if the validator is in any of the previous committee lookbacks.
match self.ledger.get_block_round(previous_block_height) {
Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
self.ledger
.get_committee_lookback_for_round(round)
.map_or(false, |committee| committee.is_committee_member(validator_address))
}),
Err(_) => false,
}
}

/// Returns the maximum number of connected peers.
Expand Down Expand Up @@ -1341,16 +1365,22 @@ mod prop_tests {
};
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_tcp::P2P;
use snarkvm::{
ledger::committee::{
prop_tests::{CommitteeContext, ValidatorSet},
Committee,
ledger::{
committee::{
prop_tests::{CommitteeContext, ValidatorSet},
test_helpers::sample_committee_for_round_and_members,
Committee,
},
narwhal::{batch_certificate::test_helpers::sample_batch_certificate_for_round, BatchHeader},
},
prelude::{MainnetV0, PrivateKey},
utilities::TestRng,
};

use indexmap::IndexMap;
use indexmap::{IndexMap, IndexSet};
use proptest::{
prelude::{any, any_with, Arbitrary, BoxedStrategy, Just, Strategy},
sample::Selector,
Expand Down Expand Up @@ -1402,6 +1432,7 @@ mod prop_tests {
.prop_map(|(storage, _, private_key, address)| {
Gateway::new(
Account::try_from(private_key).unwrap(),
storage.clone(),
storage.ledger().clone(),
address.ip(),
&[],
Expand Down Expand Up @@ -1450,7 +1481,9 @@ mod prop_tests {
let (storage, _, private_key, dev) = input;
let account = Account::try_from(private_key).unwrap();

let gateway = Gateway::new(account.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap();
let gateway =
Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
.unwrap();
let tcp_config = gateway.tcp().config();
assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
Expand All @@ -1465,7 +1498,9 @@ mod prop_tests {
let (storage, _, private_key, dev) = input;
let account = Account::try_from(private_key).unwrap();

let gateway = Gateway::new(account.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap();
let gateway =
Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
.unwrap();
let tcp_config = gateway.tcp().config();
if let Some(socket_addr) = dev.ip() {
assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
Expand All @@ -1490,7 +1525,8 @@ mod prop_tests {
let worker_storage = storage.clone();
let account = Account::try_from(private_key).unwrap();

let gateway = Gateway::new(account, storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap();
let gateway =
Gateway::new(account, storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap();

let (primary_sender, _) = init_primary_channels();

Expand Down Expand Up @@ -1525,4 +1561,49 @@ mod prop_tests {
);
assert_eq!(gateway.num_workers(), workers.len() as u8);
}

#[proptest]
fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
let rng = &mut TestRng::default();

// Initialize the round parameters.
let current_round = 2;
let committee_size = 4;
let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
let (_, _, private_key, dev) = input;
let account = Account::try_from(private_key).unwrap();

// Sample the certificates.
let mut certificates = IndexSet::new();
for _ in 0..committee_size {
certificates.insert(sample_batch_certificate_for_round(current_round, rng));
}
let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
// Initialize the committee.
let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
// Sample extra certificates from non-committee members.
for _ in 0..committee_size {
certificates.insert(sample_batch_certificate_for_round(current_round, rng));
}
// Initialize the ledger.
let ledger = Arc::new(MockLedgerService::new(committee.clone()));
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
// Initialize the gateway.
let gateway =
Gateway::new(account.clone(), storage.clone(), ledger.clone(), dev.ip(), &[], dev.port()).unwrap();
// Insert certificate to the storage.
for certificate in certificates.iter() {
storage.testing_only_insert_certificate_testing_only(certificate.clone());
}
// Check that the current committee members are authorized validators.
for i in 0..certificates.clone().len() {
let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
if i < committee_size {
assert!(is_authorized);
} else {
assert!(!is_authorized);
}
}
}
}
2 changes: 1 addition & 1 deletion node/bft/src/primary.rs
Expand Up @@ -109,7 +109,7 @@ impl<N: Network> Primary<N> {
dev: Option<u16>,
) -> Result<Self> {
// Initialize the gateway.
let gateway = Gateway::new(account, ledger.clone(), ip, trusted_validators, dev)?;
let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, dev)?;
// Initialize the sync module.
let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());
// Initialize the primary instance.
Expand Down
1 change: 1 addition & 0 deletions node/bft/src/worker.rs
Expand Up @@ -511,6 +511,7 @@ mod tests {
fn contains_block_height(&self, height: u32) -> bool;
fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
fn get_block_round(&self, height: u32) -> Result<u64>;
fn get_block(&self, height: u32) -> Result<Block<N>>;
fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
fn get_solution(&self, solution_id: &PuzzleCommitment<N>) -> Result<ProverSolution<N>>;
Expand Down
9 changes: 6 additions & 3 deletions node/bft/tests/components/mod.rs
Expand Up @@ -59,20 +59,23 @@ pub fn sample_storage<N: Network>(ledger: Arc<TranslucentLedgerService<N, Consen
}

/// Samples a new gateway with the given ledger.
pub fn sample_gateway<N: Network>(ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>) -> Gateway<N> {
pub fn sample_gateway<N: Network>(
storage: Storage<N>,
ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>,
) -> Gateway<N> {
let num_nodes: u16 = ledger.current_committee().unwrap().num_members().try_into().unwrap();
let (accounts, _committee) = primary::new_test_committee(num_nodes);
let account = Account::from_str(&accounts[0].private_key().to_string()).unwrap();
// Initialize the gateway.
Gateway::new(account, ledger, None, &[], None).unwrap()
Gateway::new(account, storage, ledger, None, &[], None).unwrap()
}

/// Samples a new worker with the given ledger.
pub fn sample_worker<N: Network>(id: u8, ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>) -> Worker<N> {
// Sample a storage.
let storage = sample_storage(ledger.clone());
// Sample a gateway.
let gateway = sample_gateway(ledger.clone());
let gateway = sample_gateway(storage.clone(), ledger.clone());
// Sample a dummy proposed batch.
let proposed_batch = Arc::new(RwLock::new(None));
// Construct the worker instance.
Expand Down