Buffer votes for blocks we don't know about
Votes can arrive for blocks we aren't yet aware of in (at least)
two circumstances:
* If we disconnect and reconnect, we will download the blocks we
missed, but nodes might send us their votes for the next block
before we've received the missed blocks.
* Due to network latency, a vote for a block proposal could arrive
before the block proposal itself.

Before this commit, the node ignores these votes and the network
eventually recovers via a new view. However, this slows things
down, so we should recover faster where possible. Instead, we store
votes for unknown blocks in memory and replay them if the block
later becomes known to us.
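
The core of the change is just a map from block hash to pending
votes, drained and replayed once the block shows up. A rough sketch
of the shape, using placeholder `Hash` and `Vote` types rather than
the real zilliqa ones:

```rust
use std::collections::BTreeMap;

// Placeholder types for the sketch only; the real `Hash` and `Vote`
// are zilliqa's message types.
type Hash = [u8; 32];

#[derive(Clone)]
struct Vote {
    block_hash: Hash,
}

#[derive(Default)]
struct VoteBuffer {
    // Pending votes, keyed by the hash of the block they refer to.
    buffered: BTreeMap<Hash, Vec<Vote>>,
}

impl VoteBuffer {
    // A vote arrived for a block we haven't stored yet: keep it for later.
    fn buffer(&mut self, vote: Vote) {
        self.buffered.entry(vote.block_hash).or_default().push(vote);
    }

    // The block just became known: drain any waiting votes so the caller
    // can replay them through the normal vote-handling path.
    fn take_for_block(&mut self, block_hash: &Hash) -> Vec<Vote> {
        self.buffered.remove(block_hash).unwrap_or_default()
    }
}
```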

The implementation is fairly straightforward, but there are a few
caveats and TODOs:
* The return type of `Consensus::proposal` is now more
complicated, as it doesn't just return a `Message::Vote` any
more. If the proposal results in some buffered votes being
replayed and those votes form a supermajority, then the node can
immediately propose the next block (see the sketch after this list).
* There's nothing that limits the memory usage of buffered votes.
A malicious node is perfectly able to send us loads of votes with
non-existent block hashes, which we will store forever. I've
raised #719 to resolve this.
* When applying buffered votes as a result of a proposal, they
take priority over our own vote. This means we lose out on the
cosigner reward. #720
* The unreliability test could be improved to be more efficient
and to assert a stricter condition on the network - #721.
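
To make the first caveat concrete, here is a hedged sketch of how a
caller might dispatch the new return value. The `PeerId` and
`ExternalMessage` types below are simplified stand-ins for
illustration only; the real dispatch lives in node.rs (shown in the
diff below), with a destination of `None` meaning "broadcast".

```rust
#[derive(Debug)]
struct PeerId(u64);

#[derive(Debug)]
enum ExternalMessage {
    Vote,
    Proposal,
}

// Mirrors the `NetworkMessage` alias added in node.rs: (destination, message).
type NetworkMessage = (Option<PeerId>, ExternalMessage);

fn dispatch(response: Option<NetworkMessage>) {
    let Some((to, message)) = response else {
        return; // nothing to send for this proposal
    };
    match to {
        // Our vote goes directly to the next leader.
        Some(peer) => println!("send {message:?} to {peer:?}"),
        // A follow-up proposal (buffered votes formed a supermajority) is broadcast.
        None => println!("broadcast {message:?}"),
    }
}

fn main() {
    dispatch(Some((Some(PeerId(7)), ExternalMessage::Vote)));
    dispatch(Some((None, ExternalMessage::Proposal)));
    dispatch(None);
}
```
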
JamesHinshelwood committed Mar 18, 2024
1 parent 4e56834 commit e3addf4
Showing 4 changed files with 165 additions and 33 deletions.
92 changes: 67 additions & 25 deletions zilliqa/src/consensus.rs
@@ -26,7 +26,7 @@ use crate::{
AggregateQc, BitSlice, BitVec, Block, BlockHeader, BlockRef, Committee, ExternalMessage,
InternalMessage, NewView, Proposal, QuorumCertificate, Vote,
},
node::MessageSender,
node::{MessageSender, NetworkMessage},
pool::TransactionPool,
state::{Address, State},
time::SystemTime,
@@ -128,6 +128,9 @@ pub struct Consensus {
message_sender: MessageSender,
pub block_store: BlockStore,
votes: BTreeMap<Hash, (Vec<NodeSignature>, BitVec, u128, bool)>,
/// Votes for a block we don't have stored. They are retained in case we receive the block later.
// TODO(#719): Consider how to limit the size of this.
buffered_votes: BTreeMap<Hash, Vec<Vote>>,
new_views: BTreeMap<u64, NewViewVote>,
high_qc: QuorumCertificate,
view: View,
@@ -281,6 +284,7 @@ impl Consensus {
block_store,
message_sender,
votes: BTreeMap::new(),
buffered_votes: BTreeMap::new(),
new_views: BTreeMap::new(),
high_qc,
view: View::new(start_view),
@@ -322,7 +326,7 @@ impl Consensus {
&mut self,
peer_id: PeerId,
public_key: NodePublicKey,
) -> Result<Option<(Option<PeerId>, ExternalMessage)>> {
) -> Result<Option<NetworkMessage>> {
if self.state.get_stake(public_key)?.is_none() {
info!(%peer_id, "peer does not have sufficient stake");
return Ok(None);
@@ -481,7 +485,7 @@ impl Consensus {
&mut self,
proposal: Proposal,
during_sync: bool,
) -> Result<Option<(PeerId, Vote)>> {
) -> Result<Option<NetworkMessage>> {
self.cleanup_votes();
let (block, transactions) = proposal.into_parts();
let head_block = self.head_block();
@@ -569,6 +573,27 @@ impl Consensus {
);
}

if let Some(buffered_votes) = self.buffered_votes.remove(&block.hash()) {
// If we've buffered votes for this block, process them now.
let count = buffered_votes.len();
for (i, vote) in buffered_votes.into_iter().enumerate() {
trace!("applying buffered vote {} of {count}", i + 1);
if let Some((block, transactions)) = self.vote(vote)? {
// If we reached the supermajority while processing this vote, send the next block proposal.
// Further votes are ignored (including our own).
// TODO(#720): We should prioritise our own vote.
trace!("supermajority reached, sending next proposal");
return Ok(Some((
None,
ExternalMessage::Proposal(Proposal::from_parts(block, transactions)),
)));
}
}

// If we reach this point, we had some buffered votes but they were not sufficient to reach a
// supermajority.
}

if !block.committee.iter().any(|v| v.peer_id == self.peer_id()) {
trace!(
"can't vote for block proposal, we aren't in the committee of length {:?}",
@@ -581,7 +606,7 @@

if !during_sync {
trace!(proposal_view, ?next_leader, "voting for block");
return Ok(Some((next_leader, vote)));
return Ok(Some((Some(next_leader), ExternalMessage::Vote(vote))));
}
}
} else {
@@ -705,30 +730,46 @@ impl Consensus {
}

pub fn vote(&mut self, vote: Vote) -> Result<Option<(Block, Vec<VerifiedTransaction>)>> {
let Some(block) = self.get_block(&vote.block_hash)? else {
trace!(vote_view = vote.view, "ignoring vote, missing block");
return Ok(None);
};
let block_hash = block.hash();
let block_view = block.view();
let block_hash = vote.block_hash;
let block_view = vote.view;
let current_view = self.view.get_view();
trace!(block_view, current_view, %block_hash, "handling vote");

// if we are not the leader of the round in which the vote counts
// if the vote is too old and does not count anymore
if block_view + 1 < self.view.get_view() {
trace!("vote is too old");
return Ok(None);
}

// Verify the signature in the vote matches the public key in the vote. This tells us that the vote was created
// by the owner of `vote.public_key`, but we don't yet know that a vote from that node is valid. In other
// words, a malicious node which is not part of the consensus committee may send us a vote and this check will
// still pass. We later validate that the owner of `vote.public_key` is a valid voter.
vote.verify()?;

// Retrieve the actual block this vote is for.
let Some(block) = self.get_block(&block_hash)? else {
trace!("vote for unknown block, buffering");
// If we don't have the block yet, we buffer the vote in case we receive the block later. Note that we
// don't know the leader of this view without the block, so we may be storing this unnecessarily; however,
// non-malicious nodes should only have sent us this vote if they thought we were the leader.
self.buffered_votes
.entry(block_hash)
.or_default()
.push(vote);
return Ok(None);
};

// Check that we are the leader of the round in which the vote counts.
// In the happy path, that round is the view immediately after the block's view (block view + 1).
if !self.are_we_leader_for_view(block_hash, block_view + 1) {
if self.leader(&block.committee, block_view + 1).peer_id != self.peer_id() {
trace!(
vote_view = block_view + 1,
?block_hash,
"skipping vote, not the leader"
);
return Ok(None);
}
// if the vote is too old and does not count anymore
if block_view + 1 < self.view.get_view() {
trace!("vote is too old");
return Ok(None);
}

// verify the sender's signature on block_hash
let (index, _) = block
@@ -737,7 +778,6 @@
.enumerate()
.find(|(_, v)| v.public_key == vote.public_key)
.unwrap();
vote.verify()?;

let committee_size = block.committee.len();
let (mut signatures, mut cosigned, mut cosigned_weight, mut supermajority_reached) =
@@ -782,7 +822,7 @@
// if we are already in the round in which the vote counts and have reached supermajority
if block_view + 1 == self.view.get_view() {
let qc =
self.qc_from_bits(block_hash, &signatures, cosigned.clone(), vote.view);
self.qc_from_bits(block_hash, &signatures, cosigned.clone(), block_view);
let parent_hash = qc.block_hash;
let parent = self
.get_block(&parent_hash)?
@@ -1438,7 +1478,7 @@ impl Consensus {

// Checks for the validity of a block and adds it to our block store if valid.
// Returns true when the block is valid and newly seen and false otherwise.
pub fn receive_block(&mut self, proposal: Proposal) -> Result<bool> {
pub fn receive_block(&mut self, proposal: Proposal) -> Result<(bool, Option<NetworkMessage>)> {
let (block, transactions) = proposal.into_parts();
trace!(
"received block: {} number: {}, view: {}",
@@ -1452,7 +1492,7 @@
block.hash(),
self.head_block()
);
return Ok(false);
return Ok((false, None));
}

// Check whether it is loose or not - we do not store loose blocks.
@@ -1465,7 +1505,7 @@
);
self.block_store
.request_blocks(None, block.header.number.saturating_sub(1))?;
return Ok(false);
return Ok((false, None));
}

match self.check_block(&block) {
@@ -1481,7 +1521,7 @@

let current_head = self.head_block();

self.proposal(
let response = self.proposal(
Proposal::from_parts_with_hashes(
block,
transactions
@@ -1496,12 +1536,14 @@
)?;

// Whether the block was new is determined by whether the head block hash changed
Ok(self.head_block().hash() != current_head.hash())
let was_new = self.head_block().hash() != current_head.hash();

Ok((was_new, response))
}
Err(e) => {
warn!(?e, "invalid block received during sync!");

Ok(false)
Ok((false, None))
}
}
}
23 changes: 19 additions & 4 deletions zilliqa/src/node.rs
@@ -70,6 +70,10 @@ impl MessageSender {
}
}

/// Messages sent by [Consensus].
/// Tuple of (destination, message).
pub type NetworkMessage = (Option<PeerId>, ExternalMessage);

/// The central data structure for a blockchain node.
///
/// # Transaction Lifecycle
@@ -131,10 +135,13 @@ impl Node {
ExternalMessage::Proposal(m) => {
let m_view = m.header.view;

if let Some((leader, vote)) = self.consensus.proposal(m, false)? {
if let Some((to, message)) = self.consensus.proposal(m, false)? {
self.reset_timeout.send(())?;
self.message_sender
.send_external_message(leader, ExternalMessage::Vote(vote))?;
if let Some(to) = to {
self.message_sender.send_external_message(to, message)?;
} else {
self.message_sender.broadcast_external_message(message)?;
}
} else {
info!("We had nothing to respond to proposal, lets try to join committee for view {m_view:}");
self.message_sender.send_external_message(
@@ -556,7 +563,15 @@ impl Node {
let length_recvd = response.proposals.len();

for block in response.proposals {
was_new = self.consensus.receive_block(block)?;
let (new, response) = self.consensus.receive_block(block)?;
was_new = new;
if let Some((to, message)) = response {
if let Some(to) = to {
self.message_sender.send_external_message(to, message)?;
} else {
self.message_sender.broadcast_external_message(message)?;
}
}
}

if was_new && length_recvd > 1 {
48 changes: 44 additions & 4 deletions zilliqa/tests/it/main.rs
@@ -3,9 +3,10 @@ mod consensus;
mod eth;
mod persistence;
mod staking;
mod unreliable;
mod web3;
mod zil;
use std::{env, ops::DerefMut};
use std::{collections::HashSet, env, ops::DerefMut};

use ethers::{
solc::SHANGHAI_SOLC,
@@ -156,6 +157,9 @@ struct Network {
// We keep `nodes` and `receivers` separate so we can independently borrow each half of this struct, while keeping
// the borrow checker happy.
nodes: Vec<TestNode>,
// We keep track of a list of disconnected nodes. These nodes will not receive any messages until they are removed
// from this list.
disconnected: HashSet<usize>,
/// A stream of messages from each node. The stream items are a tuple of (source, destination, message).
/// If the destination is `None`, the message is a broadcast.
receivers: Vec<BoxStream<'static, StreamMessage>>,
@@ -262,6 +266,7 @@ impl Network {
genesis_committee,
genesis_deposits,
nodes,
disconnected: HashSet::new(),
is_main,
shard_id,
receivers,
@@ -703,14 +708,23 @@ impl Network {
}
AnyMessage::External(external_message) => {
let nodes: Vec<(usize, &TestNode)> = if let Some(destination) = destination {
vec![self
let (index, node) = self
.nodes
.iter()
.enumerate()
.find(|(_, n)| n.peer_id == destination)
.unwrap()]
.unwrap();
if self.disconnected.contains(&index) {
vec![]
} else {
vec![(index, node)]
}
} else {
self.nodes.iter().enumerate().collect()
self.nodes
.iter()
.enumerate()
.filter(|(index, _)| !self.disconnected.contains(index))
.collect()
};
for (index, node) in nodes.iter() {
let span = tracing::span!(tracing::Level::INFO, "handle_message", index);
Expand Down Expand Up @@ -796,10 +810,36 @@ impl Network {
.unwrap();
}

pub fn disconnect_node(&mut self, index: usize) {
self.disconnected.insert(index);
}

pub fn connect_node(&mut self, index: usize) {
self.disconnected.remove(&index);
}

pub fn random_index(&mut self) -> usize {
self.rng.lock().unwrap().gen_range(0..self.nodes.len())
}

pub async fn wallet_of_node(
&mut self,
index: usize,
) -> SignerMiddleware<Provider<LocalRpcClient>, LocalWallet> {
let key = SigningKey::random(self.rng.lock().unwrap().deref_mut());
let wallet: LocalWallet = key.into();
let node = &self.nodes[index];
let client = LocalRpcClient {
id: Arc::new(AtomicU64::new(0)),
rpc_module: node.rpc_module.clone(),
};
let provider = Provider::new(client);

SignerMiddleware::new_with_provider_chain(provider, wallet)
.await
.unwrap()
}

/// Returns (index, TestNode)
fn find_node(&self, peer_id: PeerId) -> Option<(usize, &TestNode)> {
self.nodes
35 changes: 35 additions & 0 deletions zilliqa/tests/it/unreliable.rs
@@ -0,0 +1,35 @@
use crate::Network;

#[zilliqa_macros::test]
async fn blocks_are_produced_while_a_node_restarts(mut network: Network) {
let restarted_node = network.random_index();
let wallet = network.wallet_of_node(restarted_node).await;

// Select a wallet connected to a different node, so we can query the network when the first node is disconnected.
let other_wallet = loop {
let i = network.random_index();
if i != restarted_node {
break i;
}
};
let other_wallet = network.wallet_of_node(other_wallet).await;

// Produce a few blocks to start with. Enough for everyone to join the consensus committee.
// TODO(#721): Once the committee is visible in the API, we can avoid waiting as long.
network.run_until_block(&wallet, 8.into(), 400).await;

// Disconnect the node we are 'restarting'.
network.disconnect_node(restarted_node);

// Produce 2 more blocks.
network.run_until_block(&other_wallet, 10.into(), 400).await;

// Reconnect the 'restarted' node.
network.connect_node(restarted_node);

// TODO(#721): We should assert here that a new view occurred if-and-only-if the 'restarted' node was the proposer
// of blocks 3 or 4. This would tell us that we aren't producing new views unnecessarily.

// Ensure more blocks are produced.
network.run_until_block(&wallet, 12.into(), 400).await;
}
