Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Lower bound the batch proposal production time #3193

Merged
merged 10 commits into from Apr 3, 2024
2 changes: 2 additions & 0 deletions node/bft/src/lib.rs
Expand Up @@ -49,6 +49,8 @@ pub const MEMORY_POOL_PORT: u16 = 5000; // port

/// The maximum number of milliseconds to wait before proposing a batch.
pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms
/// The maximum number of seconds to wait before proposing a batch (rounded down to the nearest second).
///
/// Integer division truncates, so with `MAX_BATCH_DELAY_IN_MS` at 2500 this evaluates to 2.
/// `Primary::check_proposal_timestamp` compares the time elapsed since an author's previous-round
/// certificate against this value and rejects proposals that arrive sooner.
pub const MAX_BATCH_DELAY_IN_SECS: u64 = MAX_BATCH_DELAY_IN_MS / 1000; // seconds
raychu86 marked this conversation as resolved.
Show resolved Hide resolved
/// The maximum number of milliseconds to wait before timing out on a fetch.
pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms
/// The maximum number of seconds allowed for the leader to send their certificate.
Expand Down
136 changes: 127 additions & 9 deletions node/bft/src/primary.rs
Expand Up @@ -33,6 +33,7 @@ use crate::{
Transport,
Worker,
MAX_BATCH_DELAY_IN_MS,
MAX_BATCH_DELAY_IN_SECS,
MAX_WORKERS,
PRIMARY_PING_IN_MS,
WORKER_PING_IN_MS,
Expand Down Expand Up @@ -317,10 +318,18 @@ impl<N: Network> Primary<N> {

// Retrieve the current round.
let round = self.current_round();
// Compute the previous round.
let previous_round = round.saturating_sub(1);

#[cfg(feature = "metrics")]
metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

// Ensure that the primary does not create a new proposal too quickly.
if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
debug!("Primary is safely skipping a batch proposal - {}", format!("{e}").dimmed());
return Ok(());
}

// Ensure the primary has not proposed a batch for this round before.
if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
// If a BFT sender was provided, attempt to advance the current round.
Expand Down Expand Up @@ -359,8 +368,6 @@ impl<N: Network> Primary<N> {
}
}

// Compute the previous round.
let previous_round = round.saturating_sub(1);
// Retrieve the previous certificates.
let previous_certificates = self.storage.get_certificates_for_round(previous_round);

Expand Down Expand Up @@ -595,6 +602,15 @@ impl<N: Network> Primary<N> {
}
}

// Compute the previous round.
let previous_round = batch_round.saturating_sub(1);
// Ensure that the peer did not propose a batch too quickly.
if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
// Proceed to disconnect the validator.
self.gateway.disconnect(peer_ip);
bail!("Malicious peer - {e} from '{peer_ip}'");
}

// If the peer is ahead, use the batch header to sync up to the peer.
let mut transmissions = self.sync_with_batch_header_from_peer(peer_ip, &batch_header).await?;

Expand Down Expand Up @@ -1189,6 +1205,30 @@ impl<N: Network> Primary<N> {
Ok(())
}

/// Ensures that `author` is not creating batch proposals too frequently.
///
/// Looks up the certificate that `author` produced in `previous_round` and compares its
/// timestamp with `timestamp`, the timestamp of the batch being proposed.
///
/// # Errors
///
/// Returns an error if a certificate from `author` exists for `previous_round` and:
/// - `timestamp` is earlier than that certificate's timestamp,
/// - the difference between the two timestamps overflows an `i64`, or
/// - fewer than `MAX_BATCH_DELAY_IN_SECS` seconds separate the two timestamps.
///
/// If storage holds no such certificate, the check passes.
fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
    // If we do not see a previous certificate for the author, then proceed optimistically.
    let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
        Some(certificate) => certificate.timestamp(),
        None => return Ok(()),
    };

    // Reject timestamps that precede the previous certificate. This must be an explicit
    // comparison: `checked_sub` only returns `None` on overflow, not on a negative result.
    if timestamp < previous_timestamp {
        bail!("Proposed batch has a timestamp earlier than the certificate at round {previous_round}");
    }

    // Determine the elapsed time since the previous certificate, guarding against overflow.
    let elapsed = timestamp.checked_sub(previous_timestamp).ok_or_else(|| {
        anyhow!("Proposed batch has a timestamp that overflows relative to the certificate at round {previous_round}")
    })?;

    // Ensure that the previous certificate was created at least `MAX_BATCH_DELAY_IN_SECS` seconds ago.
    if elapsed < MAX_BATCH_DELAY_IN_SECS as i64 {
        bail!("Proposed batch was created too quickly after the certificate at round {previous_round}");
    }

    Ok(())
}

/// Stores the certified batch and broadcasts it to all validators, returning the certificate.
async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
// Create the batch certificate and transmissions.
Expand Down Expand Up @@ -1766,6 +1806,9 @@ mod tests {
// Fill primary storage.
store_certificate_chain(&primary, &accounts, round, &mut rng);

// Sleep for a while to ensure the primary is ready to propose the next round.
tokio::time::sleep(Duration::from_secs(MAX_BATCH_DELAY_IN_SECS + 1)).await;

// Try to propose a batch. There are no transmissions in the workers so the method should
// just return without proposing a batch.
assert!(primary.propose_batch().await.is_ok());
Expand Down Expand Up @@ -1834,6 +1877,9 @@ mod tests {
primary.storage.insert_certificate(certificate, transmissions).unwrap();
}

// Sleep for a while to ensure the primary is ready to propose the next round.
tokio::time::sleep(Duration::from_secs(MAX_BATCH_DELAY_IN_SECS + 1)).await;

// Advance to the next round.
assert!(primary.storage.increment_to_next_round(round).is_ok());

Expand All @@ -1859,7 +1905,7 @@ mod tests {
let round = 1;
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MAX_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1895,7 +1941,7 @@ mod tests {
// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MAX_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1926,7 +1972,7 @@ mod tests {
let round = 1;
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MAX_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1968,7 +2014,7 @@ mod tests {
// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MAX_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1998,6 +2044,78 @@ mod tests {
);
}

#[tokio::test]
async fn test_batch_propose_from_peer_with_invalid_timestamp() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng).await;

    // Populate storage with a certificate chain up to the round being proposed.
    let round = 2;
    let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

    // The proposal is authored by a validator other than the primary.
    let (peer_ip, peer) = (accounts[1].0, &accounts[1].1);

    // Stamp the proposal with the current time, which the test treats as too early
    // (presumably too close to the certificates stored above - confirm against `store_certificate_chain`).
    let invalid_timestamp = now();
    let committee = primary.ledger.current_committee().unwrap();
    let proposal = create_test_proposal(peer, committee, round, previous_certificates, invalid_timestamp, &mut rng);

    // Hand every transmission in the proposal to the primary's first worker.
    for (id, data) in proposal.transmissions() {
        primary.workers[0].process_transmission_from_peer(peer_ip, *id, data.clone())
    }

    // Register the author with the resolver so that the propose checks can look it up.
    primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer.address());

    // Processing the peer's batch proposal is expected to fail.
    let outcome = primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await;
    assert!(outcome.is_err());
}

#[tokio::test]
async fn test_batch_propose_from_peer_with_past_timestamp() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng).await;

    // Populate storage with a certificate chain up to the round being proposed.
    let round = 2;
    let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

    // The proposal is authored by a validator other than the primary.
    let (peer_ip, peer) = (accounts[1].0, &accounts[1].1);

    // Stamp the proposal five units before the current time, i.e. in the past.
    let past_timestamp = now() - 5;
    let committee = primary.ledger.current_committee().unwrap();
    let proposal = create_test_proposal(peer, committee, round, previous_certificates, past_timestamp, &mut rng);

    // Hand every transmission in the proposal to the primary's first worker.
    for (id, data) in proposal.transmissions() {
        primary.workers[0].process_transmission_from_peer(peer_ip, *id, data.clone())
    }

    // Register the author with the resolver so that the propose checks can look it up.
    primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer.address());

    // Processing the peer's batch proposal is expected to fail.
    let outcome = primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await;
    assert!(outcome.is_err());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_batch_signature_from_peer() {
let mut rng = TestRng::default();
Expand All @@ -2006,7 +2124,7 @@ mod tests {

// Create a valid proposal.
let round = 1;
let timestamp = now();
let timestamp = now() + MAX_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
primary.gateway.account(),
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -2079,7 +2197,7 @@ mod tests {

// Create a valid proposal.
let round = 1;
let timestamp = now();
let timestamp = now() + MAX_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
primary.gateway.account(),
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -2116,7 +2234,7 @@ mod tests {
let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

// Create a valid proposal.
let timestamp = now();
let timestamp = now() + MAX_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
primary.gateway.account(),
primary.ledger.current_committee().unwrap(),
Expand Down