refactor: refactor validators #1022

Draft · wants to merge 6 commits into base: development
@@ -29,14 +29,8 @@ impl ValidatorSignatureService for TariSignatureService {
}

impl VoteSignatureService for TariSignatureService {
fn verify(
&self,
signature: &ValidatorSignature,
leaf_hash: &FixedHash,
block_id: &BlockId,
decision: &QuorumDecision,
) -> bool {
let challenge = self.create_challenge(leaf_hash, block_id, decision);
fn verify(&self, signature: &ValidatorSignature, block_id: &BlockId, decision: &QuorumDecision) -> bool {
let challenge = self.create_challenge(block_id, decision);
signature.verify(challenge)
}
}
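
For context, the verify and sign_vote call sites in this diff imply that the VoteSignatureService trait no longer threads a leaf hash through challenge creation. A minimal sketch of the reduced trait follows; the challenge type and the exact method set are assumptions, not taken from this PR.

// Sketch only, not part of this PR: the trait shape implied by the call sites in this diff.
// The challenge type (FixedHash here) and the full method list are assumptions.
pub trait VoteSignatureService {
    fn create_challenge(&self, block_id: &BlockId, decision: &QuorumDecision) -> FixedHash;

    fn sign_vote(&self, block_id: &BlockId, decision: &QuorumDecision) -> ValidatorSignature;

    fn verify(&self, signature: &ValidatorSignature, block_id: &BlockId, decision: &QuorumDecision) -> bool;
}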
33 changes: 24 additions & 9 deletions dan_layer/consensus/src/hotstuff/error.rs
@@ -67,27 +67,23 @@ pub enum HotStuffError {
},
#[error("Pacemaker channel dropped: {details}")]
PacemakerChannelDropped { details: String },
#[error(
"Bad new view message: HighQC height {high_qc_height}, received new height {received_new_height}: {details}"
)]
BadNewViewMessage {
high_qc_height: NodeHeight,
received_new_height: NodeHeight,
details: String,
},

#[error("BUG Invariant error occurred: {0}")]
InvariantError(String),
#[error("Sync error: {0}")]
SyncError(anyhow::Error),
#[error("Fallen behind: local_height={local_height}, qc_height={qc_height}")]
#[error("Fallen behind: local_height={local_height}, qc_height={qc_height}, detected during: {detected_at}")]
FallenBehind {
local_height: NodeHeight,
qc_height: NodeHeight,
detected_at: String,
},
#[error("Transaction executor error: {0}")]
TransactionExecutorError(String),
#[error("Invalid sync request: {details}")]
InvalidSyncRequest { details: String },
#[error("New view validation error: {0}")]
NewViewValidationError(#[from] NewViewValidationError),
#[error("Some input versions were not resolved at execution time: {0}")]
VersionedSubstateIdError(#[from] VersionedSubstateIdError),
}
@@ -98,6 +94,25 @@ impl From<EpochManagerError> for HotStuffError {
}
}

#[derive(Debug, thiserror::Error)]
pub enum NewViewValidationError {
#[error(
"NEWVIEW for height less than the locked block, locked block: {locked_height} new height: {new_view_height}"
)]
NewViewHeightLessThanLockedBlock {
locked_height: NodeHeight,
new_view_height: NodeHeight,
},
#[error(
"Bad new view message: HighQC height {high_qc_height}, received new height {received_new_height}: {details}"
)]
BadNewViewMessage {
high_qc_height: NodeHeight,
received_new_height: NodeHeight,
details: String,
},
}
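
Because HotStuffError gains a #[from] conversion for NewViewValidationError above, validation helpers can return the narrower error and callers can propagate it with the ? operator. A small illustration; check_new_view_height is a hypothetical helper, not code from this PR.

// Illustration only; check_new_view_height is hypothetical and not part of this PR.
fn check_new_view_height(locked_height: NodeHeight, new_view_height: NodeHeight) -> Result<(), NewViewValidationError> {
    if new_view_height < locked_height {
        return Err(NewViewValidationError::NewViewHeightLessThanLockedBlock {
            locked_height,
            new_view_height,
        });
    }
    Ok(())
}

fn caller(locked_height: NodeHeight, new_view_height: NodeHeight) -> Result<(), HotStuffError> {
    // The ? operator converts NewViewValidationError into HotStuffError via the #[from] impl.
    check_new_view_height(locked_height, new_view_height)?;
    Ok(())
}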

#[derive(Debug, thiserror::Error)]
pub enum ProposalValidationError {
#[error("Storage error: {0}")]
38 changes: 18 additions & 20 deletions dan_layer/consensus/src/hotstuff/on_inbound_message.rs
@@ -20,14 +20,7 @@ use tokio::{sync::mpsc, time};

use super::config::HotstuffConfig;
use crate::{
block_validations::{
check_base_layer_block_hash,
check_hash_and_height,
check_network,
check_proposed_by_leader,
check_quorum_certificate,
check_signature,
},
validations::block_validations::check_block,
hotstuff::error::HotStuffError,
messages::{HotstuffMessage, ProposalMessage, RequestMissingTransactionsMessage},
traits::{ConsensusSpec, OutboundMessaging},
@@ -86,7 +79,8 @@ where TConsensusSpec: ConsensusSpec
self.process_local_proposal(current_height, msg).await?;
},
HotstuffMessage::ForeignProposal(ref proposal) => {
self.check_proposal(&proposal.block).await?;
self.check_proposal(&proposal.block)
.await?;
self.report_message_ready(from, msg)?;
},
msg => {
@@ -111,17 +105,21 @@ where TConsensusSpec: ConsensusSpec
self.message_buffer.clear_buffer();
}

async fn check_proposal(&self, block: &Block) -> Result<(), HotStuffError> {
check_base_layer_block_hash::<TConsensusSpec>(block, &self.epoch_manager, &self.config).await?;
check_network(block, self.network)?;
check_hash_and_height(block)?;
let committee_for_block = self
.epoch_manager
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
.await?;
check_proposed_by_leader(&self.leader_strategy, &committee_for_block, block)?;
check_signature(block)?;
check_quorum_certificate::<TConsensusSpec>(block, &self.vote_signing_service, &self.epoch_manager).await?;
async fn check_proposal(&mut self, block: &Block) -> Result<(), HotStuffError> {
check_block::<TConsensusSpec>(
block,
&self.epoch_manager,
&self.config,
self.network,
&self.leader_strategy,
&self.vote_signing_service,
)
.await?;
Ok(())
}
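
The new validations::block_validations::check_block is not shown in this diff. Assuming it simply consolidates the checks that previously lived inline in check_proposal (removed above), it would look roughly like the sketch below; the associated type names are guesses based on the call site.

// Sketch only, not part of this diff: check_block as a composition of the former inline checks.
pub async fn check_block<TConsensusSpec: ConsensusSpec>(
    block: &Block,
    epoch_manager: &TConsensusSpec::EpochManager,
    config: &HotstuffConfig,
    network: Network,
    leader_strategy: &TConsensusSpec::LeaderStrategy,
    vote_signing_service: &TConsensusSpec::SignatureService,
) -> Result<(), HotStuffError> {
    check_base_layer_block_hash::<TConsensusSpec>(block, epoch_manager, config).await?;
    check_network(block, network)?;
    check_hash_and_height(block)?;
    let committee_for_block = epoch_manager
        .get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
        .await?;
    check_proposed_by_leader(leader_strategy, &committee_for_block, block)?;
    check_signature(block)?;
    check_quorum_certificate::<TConsensusSpec>(block, vote_signing_service, epoch_manager).await?;
    Ok(())
}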

@@ -1273,9 +1273,8 @@ where TConsensusSpec: ConsensusSpec
.epoch_manager
.get_validator_node(block.epoch(), &self.local_validator_addr)
.await?;
let leaf_hash = vn.get_node_hash(self.network);

let signature = self.vote_signing_service.sign_vote(&leaf_hash, block.id(), &decision);
let signature = self.vote_signing_service.sign_vote(block.id(), &decision);

Ok(VoteMessage {
epoch: block.epoch(),
56 changes: 8 additions & 48 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
@@ -22,6 +22,7 @@ use crate::{
hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle, ProposalValidationError},
messages::ProposalMessage,
traits::ConsensusSpec,
validations,
};

const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::on_receive_foreign_proposal";
@@ -73,13 +74,7 @@ where TConsensusSpec: ConsensusSpec
.await?;

let local_shard = self.epoch_manager.get_local_committee_info(block.epoch()).await?;
if let Err(err) = self.validate_proposed_block(
&from,
&block,
committee_shard.shard(),
local_shard.shard(),
&foreign_receive_counter,
) {
if let Err(err) = self.validate_proposed_block(&from, &block, local_shard.shard(), &foreign_receive_counter) {
warn!(
target: LOG_TARGET,
"🔥 FOREIGN PROPOSAL: Invalid proposal from {}: {}. Ignoring.",
@@ -208,49 +203,14 @@ where TConsensusSpec: ConsensusSpec
&self,
from: &TConsensusSpec::Addr,
candidate_block: &Block,
foreign_shard: Shard,
local_shard: Shard,
foreign_receive_counter: &ForeignReceiveCounters,
) -> Result<(), ProposalValidationError> {
let Some(incoming_count) = candidate_block.get_foreign_counter(&local_shard) else {
debug!(target:LOG_TARGET, "Our bucket {local_shard:?} is missing reliability index in the proposed block {candidate_block:?}");
return Err(ProposalValidationError::MissingForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
};
let current_count = foreign_receive_counter.get_count(&foreign_shard);
if current_count + 1 != incoming_count {
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_count}, but the index was {incoming_count}", expected_count = current_count + 1);
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
details: format!(
"Expected foreign receive count to be {} but it was {}",
current_count + 1,
incoming_count
),
});
}
if candidate_block.height().is_zero() || candidate_block.is_genesis() {
return Err(ProposalValidationError::ProposingGenesisBlock {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
}

let calculated_hash = candidate_block.calculate_hash().into();
if calculated_hash != *candidate_block.id() {
return Err(ProposalValidationError::NodeHashMismatch {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
calculated_hash,
});
}

// TODO: validate justify signatures
// self.validate_qc(candidate_block.justify(), committee)?;

Ok(())
validations::foreign_proposal_validations::check_foreign_proposal_message::<TConsensusSpec>(
from,
candidate_block,
local_shard,
foreign_receive_counter,
)
}
}
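
validations::foreign_proposal_validations::check_foreign_proposal_message is likewise not visible in this diff. Below is a sketch under the assumption that it carries over the checks deleted above; note that the foreign_shard argument was dropped from the call, so how the expected counter value is derived inside the helper is not visible here and is elided.

// Sketch only, not part of this diff: the checks removed from validate_proposed_block above.
pub fn check_foreign_proposal_message<TConsensusSpec: ConsensusSpec>(
    from: &TConsensusSpec::Addr,
    candidate_block: &Block,
    local_shard: Shard,
    foreign_receive_counter: &ForeignReceiveCounters,
) -> Result<(), ProposalValidationError> {
    if candidate_block.get_foreign_counter(&local_shard).is_none() {
        return Err(ProposalValidationError::MissingForeignCounters {
            proposed_by: from.to_string(),
            hash: *candidate_block.id(),
        });
    }
    // Counter-continuity check elided: it needs the sender's shard, which the new
    // signature no longer receives directly.
    let _ = foreign_receive_counter;

    if candidate_block.height().is_zero() || candidate_block.is_genesis() {
        return Err(ProposalValidationError::ProposingGenesisBlock {
            proposed_by: from.to_string(),
            hash: *candidate_block.id(),
        });
    }

    let calculated_hash = candidate_block.calculate_hash().into();
    if calculated_hash != *candidate_block.id() {
        return Err(ProposalValidationError::NodeHashMismatch {
            proposed_by: from.to_string(),
            hash: *candidate_block.id(),
            calculated_hash,
        });
    }

    Ok(())
}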
80 changes: 24 additions & 56 deletions dan_layer/consensus/src/hotstuff/on_receive_new_view.rs
@@ -19,6 +19,7 @@ use super::vote_receiver::VoteReceiver;
use crate::{
hotstuff::{common::calculate_last_dummy_block, error::HotStuffError, pacemaker_handle::PaceMakerHandle},
messages::NewViewMessage,
traits::{ConsensusSpec, LeaderStrategy},
validations::new_view_validations::check_new_view_message,
};

@@ -76,6 +77,26 @@ where TConsensusSpec: ConsensusSpec

#[allow(clippy::too_many_lines)]
pub async fn handle(&mut self, from: TConsensusSpec::Addr, message: NewViewMessage) -> Result<(), HotStuffError> {
let local_committee = self.epoch_manager.get_local_committee(message.epoch).await?;
let local_committee_shard = self.epoch_manager.get_local_committee_info(message.epoch).await?;
let locked = self.store.with_read_tx(|tx| LockedBlock::get(tx))?;
if let Err(e) = check_new_view_message::<TConsensusSpec>(
&message,
&self.epoch_manager,
&locked,
&self.leader_strategy,
&local_committee,
&local_committee_shard,
)
.await
{
warn!(target: LOG_TARGET, "❌ Ignoring NEW_VIEW message because it failed validation: {}", e);
return Ok(());
}

let NewViewMessage {
high_qc,
new_height,
@@ -90,29 +111,6 @@ where TConsensusSpec: ConsensusSpec
from
);

if !self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? {
warn!(target: LOG_TARGET, "❌ Ignoring NEWVIEW for epoch {} because the epoch is invalid or we are not registered for that epoch", epoch);
return Ok(());
}

// TODO: This prevents syncing the blocks from previous epoch.
// if !self.epoch_manager.is_validator_in_local_committee(&from, epoch).await? {
// return Err(HotStuffError::ReceivedMessageFromNonCommitteeMember {
// epoch,
// sender: from.to_string(),
// context: format!("Received NEWVIEW from {}", from),
// });
// }

// We can never accept NEWVIEWS for heights that are lower than the locked block height
let locked = self.store.with_read_tx(|tx| LockedBlock::get(tx))?;
if new_height < locked.height() {
warn!(target: LOG_TARGET, "❌ Ignoring NEWVIEW for height less than the locked block, locked block: {} new height: {}", locked, new_height);
return Ok(());
}

self.validate_qc(&high_qc)?;

// Sync if we do not have the block for this valid QC
let exists = self
.store
@@ -126,23 +124,7 @@ where TConsensusSpec: ConsensusSpec
return Err(HotStuffError::FallenBehind {
local_height: leaf.height(),
qc_height: high_qc.block_height(),
});
}

let local_committee = self.epoch_manager.get_local_committee(epoch).await?;
let local_committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
let leader = self
.leader_strategy
.get_leader_for_next_block(&local_committee, new_height);
let our_node = self.epoch_manager.get_our_validator_node(epoch).await?;

if *leader != our_node.address {
warn!(target: LOG_TARGET, "❌ New View failed, leader is {} at height:{}", leader, new_height);
return Err(HotStuffError::NotTheLeader {
details: format!(
"Received NEWVIEW height {} but this not is not the leader for that height",
new_height
),
detected_at: "NEWVIEW".to_string(),
});
}

@@ -154,15 +136,6 @@ where TConsensusSpec: ConsensusSpec
self.vote_receiver.handle(from.clone(), vote, false).await?;
}

// Are nodes requesting to create more than the minimum number of dummy blocks?
if high_qc.block_height().saturating_sub(new_height).as_u64() > local_committee.len() as u64 {
return Err(HotStuffError::BadNewViewMessage {
details: format!("Validator {from} requested an invalid number of dummy blocks"),
high_qc_height: high_qc.block_height(),
received_new_height: new_height,
});
}

// Take note of unique NEWVIEWs so that we can count them
let newview_count = self.collect_new_views(from, new_height, &high_qc);

@@ -179,7 +152,7 @@ where TConsensusSpec: ConsensusSpec

let threshold = self.epoch_manager.get_local_threshold_for_epoch(epoch).await?;

info!(
debug!(
target: LOG_TARGET,
"🌟 Received NEWVIEW for height {} (QC: {}) has {} votes out of {}",
new_height,
Expand All @@ -190,7 +163,7 @@ where TConsensusSpec: ConsensusSpec
// Once we have received enough (quorum) NEWVIEWS, we can create the dummy block(s) and propose the next block.
// Any subsequent NEWVIEWs for this height/view are ignored.
if newview_count == threshold {
info!(target: LOG_TARGET, "🌟✅ NEWVIEW for block {} (high_qc: {}) has reached quorum ({}/{})", new_height, high_qc.as_high_qc(), newview_count, threshold);
debug!(target: LOG_TARGET, "🌟✅ NEWVIEW for block {} (high_qc: {}) has reached quorum ({}/{})", new_height, high_qc.as_high_qc(), newview_count, threshold);

let high_qc_block = self.store.with_read_tx(|tx| high_qc.get_block(tx))?;
// Determine how many missing blocks we must fill without actually creating them.
@@ -220,9 +193,4 @@ where TConsensusSpec: ConsensusSpec

Ok(())
}

fn validate_qc(&self, _qc: &QuorumCertificate) -> Result<(), HotStuffError> {
// TODO
Ok(())
}
}
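
Finally, check_new_view_message itself is not part of the visible diff. Based on its call site and on the NEWVIEW checks removed from handle() above, it might look roughly like the sketch below; the associated type names, the exact error mapping, and the elided epoch/leader/QC checks are assumptions.

// Sketch only, not part of this diff: the NEWVIEW validations gathered into one helper.
pub async fn check_new_view_message<TConsensusSpec: ConsensusSpec>(
    message: &NewViewMessage,
    epoch_manager: &TConsensusSpec::EpochManager,
    locked: &LockedBlock,
    leader_strategy: &TConsensusSpec::LeaderStrategy,
    local_committee: &Committee<TConsensusSpec::Addr>,
    local_committee_info: &CommitteeInfo,
) -> Result<(), HotStuffError> {
    // NEWVIEWs for heights below the locked block can never be accepted.
    if message.new_height < locked.height() {
        return Err(NewViewValidationError::NewViewHeightLessThanLockedBlock {
            locked_height: locked.height(),
            new_view_height: message.new_height,
        }
        .into());
    }

    // Reject requests to create more than the committee-size number of dummy blocks.
    if message.high_qc.block_height().saturating_sub(message.new_height).as_u64() > local_committee.len() as u64 {
        return Err(NewViewValidationError::BadNewViewMessage {
            high_qc_height: message.high_qc.block_height(),
            received_new_height: message.new_height,
            details: "Validator requested an invalid number of dummy blocks".to_string(),
        }
        .into());
    }

    // Epoch-registration, leader and QC checks are elided; they are not visible in this diff.
    let _ = (epoch_manager, leader_strategy, local_committee_info);
    Ok(())
}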