Skip to content

Commit

Permalink
Merge pull request #3193 from AleoHQ/min-proposal-time
Browse files Browse the repository at this point in the history
Lower bound the batch proposal production time
  • Loading branch information
howardwu committed Apr 3, 2024
2 parents 053512d + ef8b522 commit e26c608
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 9 deletions.
2 changes: 2 additions & 0 deletions node/bft/src/lib.rs
Expand Up @@ -49,6 +49,8 @@ pub const MEMORY_POOL_PORT: u16 = 5000; // port

/// The maximum number of milliseconds to wait before proposing a batch.
pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms
/// The minimum number of seconds to wait before proposing a batch.
pub const MIN_BATCH_DELAY_IN_SECS: u64 = 1; // seconds
/// The maximum number of milliseconds to wait before timing out on a fetch.
pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms
/// The maximum number of seconds allowed for the leader to send their certificate.
Expand Down
143 changes: 134 additions & 9 deletions node/bft/src/primary.rs
Expand Up @@ -34,6 +34,7 @@ use crate::{
Worker,
MAX_BATCH_DELAY_IN_MS,
MAX_WORKERS,
MIN_BATCH_DELAY_IN_SECS,
PRIMARY_PING_IN_MS,
WORKER_PING_IN_MS,
};
Expand Down Expand Up @@ -90,6 +91,8 @@ pub struct Primary<N: Network> {
bft_sender: Arc<OnceCell<BFTSender<N>>>,
/// The batch proposal, if the primary is currently proposing a batch.
proposed_batch: Arc<ProposedBatch<N>>,
/// The timestamp of the most recent proposed batch.
latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
/// The recently-signed batch proposals (a map from the address to the round, batch ID, and signature).
signed_proposals: Arc<RwLock<HashMap<Address<N>, (u64, Field<N>, Signature<N>)>>>,
/// The spawned handles.
Expand Down Expand Up @@ -124,6 +127,7 @@ impl<N: Network> Primary<N> {
workers: Arc::from(vec![]),
bft_sender: Default::default(),
proposed_batch: Default::default(),
latest_proposed_batch_timestamp: Default::default(),
signed_proposals: Default::default(),
handles: Default::default(),
propose_lock: Default::default(),
Expand Down Expand Up @@ -317,10 +321,18 @@ impl<N: Network> Primary<N> {

// Retrieve the current round.
let round = self.current_round();
// Compute the previous round.
let previous_round = round.saturating_sub(1);

#[cfg(feature = "metrics")]
metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

// Ensure that the primary does not create a new proposal too quickly.
if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
debug!("Primary is safely skipping a batch proposal - {}", format!("{e}").dimmed());
return Ok(());
}

// Ensure the primary has not proposed a batch for this round before.
if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
// If a BFT sender was provided, attempt to advance the current round.
Expand Down Expand Up @@ -359,8 +371,6 @@ impl<N: Network> Primary<N> {
}
}

// Compute the previous round.
let previous_round = round.saturating_sub(1);
// Retrieve the previous certificates.
let previous_certificates = self.storage.get_certificates_for_round(previous_round);

Expand Down Expand Up @@ -498,6 +508,8 @@ impl<N: Network> Primary<N> {
let proposal = Proposal::new(committee_lookback, batch_header.clone(), transmissions)?;
// Broadcast the batch to all validators for signing.
self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
// Set the timestamp of the latest proposed batch.
*self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
// Set the proposed batch.
*self.proposed_batch.write() = Some(proposal);
Ok(())
Expand Down Expand Up @@ -595,6 +607,15 @@ impl<N: Network> Primary<N> {
}
}

// Compute the previous round.
let previous_round = batch_round.saturating_sub(1);
// Ensure that the peer did not propose a batch too quickly.
if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
// Proceed to disconnect the validator.
self.gateway.disconnect(peer_ip);
bail!("Malicious peer - {e} from '{peer_ip}'");
}

// If the peer is ahead, use the batch header to sync up to the peer.
let mut transmissions = self.sync_with_batch_header_from_peer(peer_ip, &batch_header).await?;

Expand Down Expand Up @@ -1189,6 +1210,32 @@ impl<N: Network> Primary<N> {
Ok(())
}

/// Ensure the primary is not creating batch proposals too frequently.
/// This checks that the certificate timestamp for the previous round is within the expected range.
fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
// Retrieve the timestamp of the previous timestamp to check against.
let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
// Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
Some(certificate) => certificate.timestamp(),
None => match self.gateway.account().address() == author {
// If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
true => *self.latest_proposed_batch_timestamp.read(),
// If we do not see a previous certificate for the author, then proceed optimistically.
false => return Ok(()),
},
};

// Determine the elapsed time since the previous timestamp.
let elapsed = timestamp
.checked_sub(previous_timestamp)
.ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
// Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
false => Ok(()),
}
}

/// Stores the certified batch and broadcasts it to all validators, returning the certificate.
async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
// Create the batch certificate and transmissions.
Expand Down Expand Up @@ -1766,6 +1813,9 @@ mod tests {
// Fill primary storage.
store_certificate_chain(&primary, &accounts, round, &mut rng);

// Sleep for a while to ensure the primary is ready to propose the next round.
tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

// Try to propose a batch. There are no transmissions in the workers so the method should
// just return without proposing a batch.
assert!(primary.propose_batch().await.is_ok());
Expand Down Expand Up @@ -1834,6 +1884,9 @@ mod tests {
primary.storage.insert_certificate(certificate, transmissions).unwrap();
}

// Sleep for a while to ensure the primary is ready to propose the next round.
tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

// Advance to the next round.
assert!(primary.storage.increment_to_next_round(round).is_ok());

Expand All @@ -1859,7 +1912,7 @@ mod tests {
let round = 1;
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1895,7 +1948,7 @@ mod tests {
// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1926,7 +1979,7 @@ mod tests {
let round = 1;
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1968,7 +2021,7 @@ mod tests {
// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1998,6 +2051,78 @@ mod tests {
);
}

#[tokio::test]
async fn test_batch_propose_from_peer_with_invalid_timestamp() {
let round = 2;
let mut rng = TestRng::default();
let (primary, accounts) = primary_without_handlers(&mut rng).await;

// Generate certificates.
let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let invalid_timestamp = now(); // Use a timestamp that is too early.
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
round,
previous_certificates,
invalid_timestamp,
&mut rng,
);

// Make sure the primary is aware of the transmissions in the proposal.
for (transmission_id, transmission) in proposal.transmissions() {
primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

// Try to process the batch proposal from the peer, should error.
assert!(
primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
);
}

#[tokio::test]
async fn test_batch_propose_from_peer_with_past_timestamp() {
let round = 2;
let mut rng = TestRng::default();
let (primary, accounts) = primary_without_handlers(&mut rng).await;

// Generate certificates.
let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let past_timestamp = now() - 5; // Use a timestamp that is in the past.
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
round,
previous_certificates,
past_timestamp,
&mut rng,
);

// Make sure the primary is aware of the transmissions in the proposal.
for (transmission_id, transmission) in proposal.transmissions() {
primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

// Try to process the batch proposal from the peer, should error.
assert!(
primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_batch_signature_from_peer() {
let mut rng = TestRng::default();
Expand All @@ -2006,7 +2131,7 @@ mod tests {

// Create a valid proposal.
let round = 1;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
primary.gateway.account(),
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -2079,7 +2204,7 @@ mod tests {

// Create a valid proposal.
let round = 1;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
primary.gateway.account(),
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -2116,7 +2241,7 @@ mod tests {
let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

// Create a valid proposal.
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
primary.gateway.account(),
primary.ledger.current_committee().unwrap(),
Expand Down

0 comments on commit e26c608

Please sign in to comment.