Skip to content

Commit

Permalink
feat: peergossip
Browse files Browse the repository at this point in the history
  • Loading branch information
joske committed Mar 7, 2024
1 parent 6aba25d commit 4747fd4
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 4 deletions.
6 changes: 5 additions & 1 deletion cli/src/commands/start.rs
Expand Up @@ -144,6 +144,10 @@ pub struct Start {
#[clap(long)]
/// If development mode is enabled, specify the custom bonded balances as a json object. (default: None)
dev_bonded_balances: Option<BondedBalances>,

#[clap(long = "allow-outside-peers")]
/// If the flag is set, the node will engage in P2P gossip to request more peers. (default: false)
peergossip: bool,
}

impl Start {
Expand Down Expand Up @@ -527,7 +531,7 @@ impl Start {
// Initialize the node.
let bft_ip = if self.dev.is_some() { self.bft } else { None };
match node_type {
NodeType::Validator => Node::new_validator(self.node, bft_ip, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode).await,
NodeType::Validator => Node::new_validator(self.node, bft_ip, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, self.peergossip).await,
NodeType::Prover => Node::new_prover(self.node, account, &trusted_peers, genesis, storage_mode).await,
NodeType::Client => Node::new_client(self.node, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode).await,
}
Expand Down
15 changes: 12 additions & 3 deletions node/router/src/heartbeat.rs
Expand Up @@ -107,6 +107,12 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
return;
}

let is_validator = self.router().node_type().is_validator();
// Skip if the node is not requesting peers.
if is_validator && !self.router().peergossip() {
return;
}

// Retrieve the trusted peers.
let trusted = self.router().trusted_peers();
// Retrieve the bootstrap peers.
Expand Down Expand Up @@ -216,9 +222,12 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
self.router().connect(peer_ip);
}
// Request more peers from the connected peers.
for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
self.send(peer_ip, Message::PeerRequest(PeerRequest));
let is_validator = self.router().node_type().is_validator();
if !is_validator || self.router().peergossip() {
// Request more peers from the connected peers.
for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
self.send(peer_ip, Message::PeerRequest(PeerRequest));
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions node/router/src/inbound.rs
Expand Up @@ -125,6 +125,9 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
if !self.router().cache.contains_outbound_peer_request(peer_ip) {
bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
}
if self.router().node_type().is_validator() && !self.router().peergossip() {
bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)");
}

match self.peer_response(peer_ip, &message.peers) {
true => Ok(()),
Expand Down
9 changes: 9 additions & 0 deletions node/router/src/lib.rs
Expand Up @@ -95,6 +95,8 @@ pub struct InnerRouter<N: Network> {
handles: Mutex<Vec<JoinHandle<()>>>,
/// The boolean flag for the development mode.
is_dev: bool,
/// If the flag is set, the node will not engage in P2P gossip to request more peers.
peergossip: bool,
}

impl<N: Network> Router<N> {
Expand All @@ -116,6 +118,7 @@ impl<N: Network> Router<N> {
trusted_peers: &[SocketAddr],
max_peers: u16,
is_dev: bool,
peergossip: bool,
) -> Result<Self> {
// Initialize the TCP stack.
let tcp = Tcp::new(Config::new(node_ip, max_peers));
Expand All @@ -133,6 +136,7 @@ impl<N: Network> Router<N> {
restricted_peers: Default::default(),
handles: Default::default(),
is_dev,
peergossip,
})))
}
}
Expand Down Expand Up @@ -251,6 +255,11 @@ impl<N: Network> Router<N> {
self.is_dev
}

/// Returns `true` if the node is not engaging in P2P gossip to request more peers.
pub fn peergossip(&self) -> bool {
self.peergossip
}

/// Returns the listener IP address from the (ambiguous) peer address.
pub fn resolve_to_listener(&self, peer_addr: &SocketAddr) -> Option<SocketAddr> {
self.resolver.get_listener(peer_addr)
Expand Down
3 changes: 3 additions & 0 deletions node/router/tests/common/mod.rs
Expand Up @@ -78,6 +78,7 @@ pub async fn client(listening_port: u16, max_peers: u16) -> TestRouter<CurrentNe
&[],
max_peers,
true,
false,
)
.await
.expect("couldn't create client router")
Expand All @@ -94,6 +95,7 @@ pub async fn prover(listening_port: u16, max_peers: u16) -> TestRouter<CurrentNe
&[],
max_peers,
true,
false,
)
.await
.expect("couldn't create prover router")
Expand All @@ -110,6 +112,7 @@ pub async fn validator(listening_port: u16, max_peers: u16) -> TestRouter<Curren
&[],
max_peers,
true,
false,
)
.await
.expect("couldn't create validator router")
Expand Down
1 change: 1 addition & 0 deletions node/src/client/mod.rs
Expand Up @@ -117,6 +117,7 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
matches!(storage_mode, StorageMode::Development(_)),
false,
)
.await?;
// Load the coinbase puzzle.
Expand Down
2 changes: 2 additions & 0 deletions node/src/node.rs
Expand Up @@ -50,6 +50,7 @@ impl<N: Network> Node<N> {
genesis: Block<N>,
cdn: Option<String>,
storage_mode: StorageMode,
peergossip: bool,
) -> Result<Self> {
Ok(Self::Validator(Arc::new(
Validator::new(
Expand All @@ -63,6 +64,7 @@ impl<N: Network> Node<N> {
genesis,
cdn,
storage_mode,
peergossip,
)
.await?,
)))
Expand Down
1 change: 1 addition & 0 deletions node/src/prover/mod.rs
Expand Up @@ -110,6 +110,7 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
matches!(storage_mode, StorageMode::Development(_)),
false,
)
.await?;
// Load the coinbase puzzle.
Expand Down
3 changes: 3 additions & 0 deletions node/src/validator/mod.rs
Expand Up @@ -83,6 +83,7 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
genesis: Block<N>,
cdn: Option<String>,
storage_mode: StorageMode,
peergossip: bool,
) -> Result<Self> {
// Prepare the shutdown flag.
let shutdown: Arc<AtomicBool> = Default::default();
Expand Down Expand Up @@ -125,6 +126,7 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
matches!(storage_mode, StorageMode::Development(_)),
peergossip,
)
.await?;

Expand Down Expand Up @@ -496,6 +498,7 @@ mod tests {
genesis,
None,
storage_mode,
false,
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions node/tests/common/node.rs
Expand Up @@ -59,6 +59,7 @@ pub async fn validator() -> Validator<CurrentNetwork, ConsensusMemory<CurrentNet
sample_genesis_block(), // Should load the current network's genesis block.
None, // No CDN.
StorageMode::Production,
false,
)
.await
.expect("couldn't create validator instance")
Expand Down

0 comments on commit 4747fd4

Please sign in to comment.