Skip to content

Commit

Permalink
Merge pull request #3164 from niklaslong/tests/gateway-no-noise
Browse files Browse the repository at this point in the history
[Test] gateway e2e responder handshake testing (no noise)
  • Loading branch information
howardwu committed Apr 12, 2024
2 parents af0b74a + e01a329 commit f42209e
Show file tree
Hide file tree
Showing 12 changed files with 524 additions and 98 deletions.
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ version = "0.2"
version = "1"

[dev-dependencies.pea2pea]
version = "0.46"
version = "0.49"

[dev-dependencies.snarkos-node-router]
path = "./router"
Expand Down
3 changes: 3 additions & 0 deletions node/bft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ version = "5"
[dev-dependencies.paste]
version = "1"

[dev-dependencies.pea2pea]
version = "0.49"

[dev-dependencies.proptest]
version = "1.4.0"

Expand Down
1 change: 1 addition & 0 deletions node/bft/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod primary;
pub mod test_peer;
pub mod utils;

pub type CurrentNetwork = snarkvm::prelude::MainnetV0;
Expand Down
9 changes: 5 additions & 4 deletions node/bft/tests/common/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,13 @@ impl TestValidator {
impl TestNetwork {
// Creates a new test network with the given configuration.
pub fn new(config: TestNetworkConfig) -> Self {
let mut rng = TestRng::default();

if let Some(log_level) = config.log_level {
initialize_logger(log_level);
}

let (accounts, committee) = new_test_committee(config.num_nodes);
let (accounts, committee) = new_test_committee(config.num_nodes, &mut rng);
let bonded_balances: IndexMap<_, _> = committee
.members()
.iter()
Expand All @@ -148,7 +150,6 @@ impl TestNetwork {

let mut validators = HashMap::with_capacity(config.num_nodes as usize);
for (id, account) in accounts.into_iter().enumerate() {
let mut rng = TestRng::fixed(id as u64);
let gen_ledger =
genesis_ledger(gen_key, committee.clone(), balances.clone(), bonded_balances.clone(), &mut rng);
let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger, Default::default()));
Expand Down Expand Up @@ -329,12 +330,12 @@ impl TestNetwork {
}

// Initializes a new test committee.
pub fn new_test_committee(n: u16) -> (Vec<Account<CurrentNetwork>>, Committee<CurrentNetwork>) {
pub fn new_test_committee(n: u16, rng: &mut TestRng) -> (Vec<Account<CurrentNetwork>>, Committee<CurrentNetwork>) {
let mut accounts = Vec::with_capacity(n as usize);
let mut members = IndexMap::with_capacity(n as usize);
for i in 0..n {
// Sample the account.
let account = Account::new(&mut TestRng::fixed(i as u64)).unwrap();
let account = Account::new(rng).unwrap();
info!("Validator {}: {}", i, account.address());

members.insert(account.address(), (MIN_VALIDATOR_STAKE, false));
Expand Down
146 changes: 146 additions & 0 deletions node/bft/tests/common/test_peer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (C) 2019-2023 Aleo Systems Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::common::CurrentNetwork;
use snarkos_node_bft_events::{Event, EventCodec};

use std::{
io,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};

use pea2pea::{
protocols::{Handshake, OnDisconnect, Reading, Writing},
Config,
Connection,
ConnectionSide,
Node,
Pea2Pea,
};

use tokio::{
sync::mpsc::{self, Receiver, Sender},
time::timeout,
};

pub struct TestPeer {
inner_node: InnerNode,
inbound_rx: Receiver<(SocketAddr, Event<CurrentNetwork>)>,
}

#[derive(Clone)]
struct InnerNode {
// The pea2pea node instance.
node: Node,
// The inbound channel sender, used to consolidate inbound messages into a single queue so they
// can be read in order in tests.
inbound_tx: Sender<(SocketAddr, Event<CurrentNetwork>)>,
}

impl TestPeer {
pub async fn new() -> Self {
let (tx, rx) = mpsc::channel(100);
let inner_node = InnerNode {
node: Node::new(Config {
max_connections: 200,
listener_addr: Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)),
..Default::default()
}),
inbound_tx: tx,
};

inner_node.enable_handshake().await;
inner_node.enable_reading().await;
inner_node.enable_writing().await;
inner_node.enable_disconnect().await;
inner_node.node().start_listening().await.unwrap();

Self { inner_node, inbound_rx: rx }
}

pub fn listening_addr(&self) -> SocketAddr {
self.inner_node.node().listening_addr().expect("addr should be present")
}

pub async fn connect(&self, target: SocketAddr) -> io::Result<()> {
self.inner_node.node().connect(target).await?;
Ok(())
}

// Note: the codec doesn't actually support sending bytes post-handshake, perhaps this should
// be relaxed by making a test-only codec in future.
pub fn unicast(&self, target: SocketAddr, message: Event<CurrentNetwork>) -> io::Result<()> {
self.inner_node.unicast(target, message)?;
Ok(())
}

pub async fn recv(&mut self) -> (SocketAddr, Event<CurrentNetwork>) {
match self.inbound_rx.recv().await {
Some(message) => message,
None => panic!("all senders dropped!"),
}
}

pub async fn recv_timeout(&mut self, duration: Duration) -> (SocketAddr, Event<CurrentNetwork>) {
match timeout(duration, self.recv()).await {
Ok(message) => message,
_ => panic!("timed out waiting for message"),
}
}
}

impl Pea2Pea for InnerNode {
fn node(&self) -> &Node {
&self.node
}
}

impl Handshake for InnerNode {
// Set the timeout on the test peer to be longer than the gateway's timeout.
const TIMEOUT_MS: u64 = 10_000;

async fn perform_handshake(&self, connection: Connection) -> io::Result<Connection> {
// Don't perform the Aleo handshake so we can test the edge cases fully.
Ok(connection)
}
}

impl Writing for InnerNode {
type Codec = EventCodec<CurrentNetwork>;
type Message = Event<CurrentNetwork>;

fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
Default::default()
}
}

impl Reading for InnerNode {
type Codec = EventCodec<CurrentNetwork>;
type Message = Event<CurrentNetwork>;

fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
Default::default()
}

async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
self.inbound_tx.send((peer_addr, message)).await.map_err(|_| {
io::Error::new(io::ErrorKind::Other, "failed to send message to test peer, all receivers have been dropped")
})
}
}

impl OnDisconnect for InnerNode {
async fn on_disconnect(&self, _peer_addr: SocketAddr) {}
}
76 changes: 72 additions & 4 deletions node/bft/tests/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::common::CurrentNetwork;
use snarkos_node_bft::helpers::PrimarySender;
use crate::common::{primary, CurrentNetwork, TranslucentLedgerService};
use snarkos_account::Account;
use snarkos_node_bft::{
helpers::{PrimarySender, Storage},
Gateway,
Worker,
};

use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkvm::{
ledger::narwhal::Data,
console::account::Address,
ledger::{
committee::Committee,
narwhal::{BatchHeader, Data},
store::helpers::memory::ConsensusMemory,
},
prelude::{
block::Transaction,
committee::MIN_VALIDATOR_STAKE,
puzzle::{Solution, SolutionID},
Field,
Network,
Expand All @@ -26,9 +39,11 @@ use snarkvm::{
},
};

use std::time::Duration;
use std::{sync::Arc, time::Duration};

use ::bytes::Bytes;
use indexmap::IndexMap;
use parking_lot::RwLock;
use rand::Rng;
use tokio::{sync::oneshot, task::JoinHandle, time::sleep};
use tracing::*;
Expand Down Expand Up @@ -164,3 +179,56 @@ pub fn fire_unconfirmed_transactions(
}
})
}

/// Samples a new ledger with the given number of nodes.
pub fn sample_ledger(
accounts: &[Account<CurrentNetwork>],
committee: &Committee<CurrentNetwork>,
rng: &mut TestRng,
) -> Arc<TranslucentLedgerService<CurrentNetwork, ConsensusMemory<CurrentNetwork>>> {
let num_nodes = committee.num_members();
let bonded_balances: IndexMap<_, _> =
committee.members().iter().map(|(address, (amount, _))| (*address, (*address, *address, *amount))).collect();
let gen_key = *accounts[0].private_key();
let public_balance_per_validator =
(CurrentNetwork::STARTING_SUPPLY - (num_nodes as u64) * MIN_VALIDATOR_STAKE) / (num_nodes as u64);
let mut balances = IndexMap::<Address<CurrentNetwork>, u64>::new();
for account in accounts.iter() {
balances.insert(account.address(), public_balance_per_validator);
}

let gen_ledger =
primary::genesis_ledger(gen_key, committee.clone(), balances.clone(), bonded_balances.clone(), rng);
Arc::new(TranslucentLedgerService::new(gen_ledger, Default::default()))
}

/// Samples a new storage with the given ledger.
pub fn sample_storage<N: Network>(ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>) -> Storage<N> {
Storage::new(ledger, Arc::new(BFTMemoryService::new()), BatchHeader::<N>::MAX_GC_ROUNDS as u64)
}

/// Samples a new gateway with the given ledger.
pub fn sample_gateway<N: Network>(
account: Account<N>,
storage: Storage<N>,
ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>,
) -> Gateway<N> {
// Initialize the gateway.
Gateway::new(account, storage, ledger, None, &[], None).unwrap()
}

/// Samples a new worker with the given ledger.
pub fn sample_worker<N: Network>(
id: u8,
account: Account<N>,
ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>,
) -> Worker<N> {
// Sample a storage.
let storage = sample_storage(ledger.clone());
// Sample a gateway.
let gateway = sample_gateway(account, storage.clone(), ledger.clone());
// Sample a dummy proposed batch.
let proposed_batch = Arc::new(RwLock::new(None));
// Construct the worker instance.
Worker::new(id, Arc::new(gateway.clone()), storage.clone(), ledger, proposed_batch).unwrap()
}

0 comments on commit f42209e

Please sign in to comment.