diff --git a/node/bft/src/lib.rs b/node/bft/src/lib.rs index d6fcbae66f..ff2fe665a5 100644 --- a/node/bft/src/lib.rs +++ b/node/bft/src/lib.rs @@ -49,6 +49,8 @@ pub const MEMORY_POOL_PORT: u16 = 5000; // port /// The maximum number of milliseconds to wait before proposing a batch. pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms +/// The minimum number of seconds to wait before proposing a batch. +pub const MIN_BATCH_DELAY_IN_SECS: u64 = 1; // seconds /// The maximum number of milliseconds to wait before timing out on a fetch. pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms /// The maximum number of seconds allowed for the leader to send their certificate. diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index a2a070183e..5ecccd2132 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -34,6 +34,7 @@ use crate::{ Worker, MAX_BATCH_DELAY_IN_MS, MAX_WORKERS, + MIN_BATCH_DELAY_IN_SECS, PRIMARY_PING_IN_MS, WORKER_PING_IN_MS, }; @@ -90,6 +91,8 @@ pub struct Primary { bft_sender: Arc>>, /// The batch proposal, if the primary is currently proposing a batch. proposed_batch: Arc>, + /// The timestamp of the most recent proposed batch. + latest_proposed_batch_timestamp: Arc>, /// The recently-signed batch proposals (a map from the address to the round, batch ID, and signature). signed_proposals: Arc, (u64, Field, Signature)>>>, /// The spawned handles. @@ -124,6 +127,7 @@ impl Primary { workers: Arc::from(vec![]), bft_sender: Default::default(), proposed_batch: Default::default(), + latest_proposed_batch_timestamp: Default::default(), signed_proposals: Default::default(), handles: Default::default(), propose_lock: Default::default(), @@ -317,10 +321,18 @@ impl Primary { // Retrieve the current round. let round = self.current_round(); + // Compute the previous round. + let previous_round = round.saturating_sub(1); #[cfg(feature = "metrics")] metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64); + // Ensure that the primary does not create a new proposal too quickly. + if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) { + debug!("Primary is safely skipping a batch proposal - {}", format!("{e}").dimmed()); + return Ok(()); + } + // Ensure the primary has not proposed a batch for this round before. if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) { // If a BFT sender was provided, attempt to advance the current round. @@ -359,8 +371,6 @@ impl Primary { } } - // Compute the previous round. - let previous_round = round.saturating_sub(1); // Retrieve the previous certificates. let previous_certificates = self.storage.get_certificates_for_round(previous_round); @@ -498,6 +508,8 @@ impl Primary { let proposal = Proposal::new(committee_lookback, batch_header.clone(), transmissions)?; // Broadcast the batch to all validators for signing. self.gateway.broadcast(Event::BatchPropose(batch_header.into())); + // Set the timestamp of the latest proposed batch. + *self.latest_proposed_batch_timestamp.write() = proposal.timestamp(); // Set the proposed batch. *self.proposed_batch.write() = Some(proposal); Ok(()) @@ -595,6 +607,15 @@ impl Primary { } } + // Compute the previous round. + let previous_round = batch_round.saturating_sub(1); + // Ensure that the peer did not propose a batch too quickly. + if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) { + // Proceed to disconnect the validator. + self.gateway.disconnect(peer_ip); + bail!("Malicious peer - {e} from '{peer_ip}'"); + } + // If the peer is ahead, use the batch header to sync up to the peer. let mut transmissions = self.sync_with_batch_header_from_peer(peer_ip, &batch_header).await?; @@ -1189,6 +1210,32 @@ impl Primary { Ok(()) } + /// Ensure the primary is not creating batch proposals too frequently. + /// This checks that the certificate timestamp for the previous round is within the expected range. + fn check_proposal_timestamp(&self, previous_round: u64, author: Address, timestamp: i64) -> Result<()> { + // Retrieve the timestamp of the previous timestamp to check against. + let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) { + // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago. + Some(certificate) => certificate.timestamp(), + None => match self.gateway.account().address() == author { + // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago. + true => *self.latest_proposed_batch_timestamp.read(), + // If we do not see a previous certificate for the author, then proceed optimistically. + false => return Ok(()), + }, + }; + + // Determine the elapsed time since the previous timestamp. + let elapsed = timestamp + .checked_sub(previous_timestamp) + .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?; + // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago. + match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 { + true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"), + false => Ok(()), + } + } + /// Stores the certified batch and broadcasts it to all validators, returning the certificate. async fn store_and_broadcast_certificate(&self, proposal: &Proposal, committee: &Committee) -> Result<()> { // Create the batch certificate and transmissions. @@ -1766,6 +1813,9 @@ mod tests { // Fill primary storage. store_certificate_chain(&primary, &accounts, round, &mut rng); + // Sleep for a while to ensure the primary is ready to propose the next round. + tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await; + // Try to propose a batch. There are no transmissions in the workers so the method should // just return without proposing a batch. assert!(primary.propose_batch().await.is_ok()); @@ -1834,6 +1884,9 @@ mod tests { primary.storage.insert_certificate(certificate, transmissions).unwrap(); } + // Sleep for a while to ensure the primary is ready to propose the next round. + tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await; + // Advance to the next round. assert!(primary.storage.increment_to_next_round(round).is_ok()); @@ -1859,7 +1912,7 @@ mod tests { let round = 1; let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now(); + let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -1895,7 +1948,7 @@ mod tests { // Create a valid proposal with an author that isn't the primary. let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now(); + let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -1926,7 +1979,7 @@ mod tests { let round = 1; let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now(); + let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -1968,7 +2021,7 @@ mod tests { // Create a valid proposal with an author that isn't the primary. let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now(); + let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -1998,6 +2051,78 @@ mod tests { ); } + #[tokio::test] + async fn test_batch_propose_from_peer_with_invalid_timestamp() { + let round = 2; + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng).await; + + // Generate certificates. + let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); + + // Create a valid proposal with an author that isn't the primary. + let peer_account = &accounts[1]; + let peer_ip = peer_account.0; + let invalid_timestamp = now(); // Use a timestamp that is too early. + let proposal = create_test_proposal( + &peer_account.1, + primary.ledger.current_committee().unwrap(), + round, + previous_certificates, + invalid_timestamp, + &mut rng, + ); + + // Make sure the primary is aware of the transmissions in the proposal. + for (transmission_id, transmission) in proposal.transmissions() { + primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()) + } + + // The author must be known to resolver to pass propose checks. + primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + + // Try to process the batch proposal from the peer, should error. + assert!( + primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err() + ); + } + + #[tokio::test] + async fn test_batch_propose_from_peer_with_past_timestamp() { + let round = 2; + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng).await; + + // Generate certificates. + let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); + + // Create a valid proposal with an author that isn't the primary. + let peer_account = &accounts[1]; + let peer_ip = peer_account.0; + let past_timestamp = now() - 5; // Use a timestamp that is in the past. + let proposal = create_test_proposal( + &peer_account.1, + primary.ledger.current_committee().unwrap(), + round, + previous_certificates, + past_timestamp, + &mut rng, + ); + + // Make sure the primary is aware of the transmissions in the proposal. + for (transmission_id, transmission) in proposal.transmissions() { + primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone()) + } + + // The author must be known to resolver to pass propose checks. + primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + + // Try to process the batch proposal from the peer, should error. + assert!( + primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err() + ); + } + #[tokio::test(flavor = "multi_thread")] async fn test_batch_signature_from_peer() { let mut rng = TestRng::default(); @@ -2006,7 +2131,7 @@ mod tests { // Create a valid proposal. let round = 1; - let timestamp = now(); + let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; let proposal = create_test_proposal( primary.gateway.account(), primary.ledger.current_committee().unwrap(), @@ -2079,7 +2204,7 @@ mod tests { // Create a valid proposal. let round = 1; - let timestamp = now(); + let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; let proposal = create_test_proposal( primary.gateway.account(), primary.ledger.current_committee().unwrap(), @@ -2116,7 +2241,7 @@ mod tests { let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); // Create a valid proposal. - let timestamp = now(); + let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; let proposal = create_test_proposal( primary.gateway.account(), primary.ledger.current_committee().unwrap(),