Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

As Non light client: broadcast BlockHeaders #2343

Merged
merged 1 commit into from
May 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 68 additions & 2 deletions consensus/src/sync/live/block_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
hash_map::{Entry as HashMapEntry, HashMap},
BTreeSet, HashSet,
},
error::Error,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand Down Expand Up @@ -441,26 +442,91 @@ impl<N: Network> BlockQueue<N> {
Some(QueuedBlock::Missing(blocks))
}

/// Publishes a given Block on the BlockHeaderTopic. It will strip the micro bodies from the blocks if they are present.
///
/// ## Panic
/// For macro blocks the body must be present and if it is not this function will panic!
fn publish_block_header(&self, mut block: Block) {
let network = Arc::clone(&self.network);

tokio::spawn(async move {
let block_id = format!("{}", block);
log::debug!(block = block_id, "Broadcasting on BlockHeaderTopic",);

match block {
// Remove the body from micro blocks before publishing to the block header topic.
Block::Micro(ref mut micro_block) => micro_block.body = None,
// Macro blocks must be always sent with body.
Block::Macro(ref macro_block) => {
if macro_block.body.is_none() {
panic!("Macro block bodies must exists in order to publish on BlockHeaderTopic")
}
}
}

if let Err(e) = network.publish::<BlockHeaderTopic>(block).await {
debug!(
block = block_id,
error = &e as &dyn Error,
"Failed to publish block header"
);
}
});
}

/// Fetches the relevant blocks for any given `BlockchainEvent`
fn get_new_blocks_from_blockchain_event(&self, event: BlockchainEvent) -> Vec<Block> {
// Collect block numbers and hashes of newly added blocks first.
let mut block_infos = vec![];

match event {
BlockchainEvent::Extended(block_hash)
| BlockchainEvent::HistoryAdopted(block_hash)
BlockchainEvent::Extended(block_hash) => {
// A bit hacky: include_micro_bodies is used here to determine whether or not this node is a light node.
// Non light clients should republish all blocks to the BlockHeaderTopic.
if self.config.include_micro_bodies {
if let Ok(block) = self.blockchain.read().get_block(&block_hash, true) {
self.publish_block_header(block.clone());
block_infos.push(block);
}
} else if let Ok(block) = self.blockchain.read().get_block(&block_hash, false) {
block_infos.push(block);
}
}
BlockchainEvent::HistoryAdopted(block_hash)
| BlockchainEvent::Finalized(block_hash)
| BlockchainEvent::EpochFinalized(block_hash) => {
if let Ok(block) = self.blockchain.read().get_block(&block_hash, false) {
block_infos.push(block);
}
}
BlockchainEvent::Rebranched(_, new_blocks) => {
// A bit hacky: include_micro_bodies is used here to determine whether or not this node is a light node.
// Non light clients should republish all blocks to the BlockHeaderTopic.
if self.config.include_micro_bodies {
// `new_blocks` does not include the bodies. The last block adopted (the new one) needs to be fetched
// with its body included.
let (block_hash, _block) =
new_blocks.last().expect("Rebranched with no new blocks");
if let Ok(block) = self.blockchain.read().get_block(block_hash, true) {
self.publish_block_header(block);
}
}

for (_block_hash, block) in new_blocks {
block_infos.push(block);
}
}
BlockchainEvent::Stored(block) => {
// A bit hacky: include_micro_bodies is used here to determine whether or not this node is a light node.
// Non light clients should republish all blocks to the BlockHeaderTopic.
if self.config.include_micro_bodies {
// `block` does not include the body. It needs to be fetched with its body included.
if let Ok(block_with_body) =
self.blockchain.read().get_block(&block.hash(), true)
{
self.publish_block_header(block_with_body);
}
}
block_infos.push(block);
}
}
Expand Down