Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Lower bound the batch proposal production time #3193

Merged
merged 10 commits into from
Apr 3, 2024
2 changes: 2 additions & 0 deletions node/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub const MEMORY_POOL_PORT: u16 = 5000; // port

/// The maximum number of milliseconds to wait before proposing a batch.
pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms
/// The minimum number of seconds to wait before proposing a batch.
pub const MIN_BATCH_DELAY_IN_SECS: u64 = 1; // seconds
/// The maximum number of milliseconds to wait before timing out on a fetch.
pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms
/// The maximum number of seconds allowed for the leader to send their certificate.
Expand Down
143 changes: 134 additions & 9 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
Worker,
MAX_BATCH_DELAY_IN_MS,
MAX_WORKERS,
MIN_BATCH_DELAY_IN_SECS,
PRIMARY_PING_IN_MS,
WORKER_PING_IN_MS,
};
Expand Down Expand Up @@ -90,6 +91,8 @@ pub struct Primary<N: Network> {
bft_sender: Arc<OnceCell<BFTSender<N>>>,
/// The batch proposal, if the primary is currently proposing a batch.
proposed_batch: Arc<ProposedBatch<N>>,
/// The timestamp of the most recent proposed batch.
latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
/// The recently-signed batch proposals (a map from the address to the round, batch ID, and signature).
signed_proposals: Arc<RwLock<HashMap<Address<N>, (u64, Field<N>, Signature<N>)>>>,
/// The spawned handles.
Expand Down Expand Up @@ -124,6 +127,7 @@ impl<N: Network> Primary<N> {
workers: Arc::from(vec![]),
bft_sender: Default::default(),
proposed_batch: Default::default(),
latest_proposed_batch_timestamp: Default::default(),
signed_proposals: Default::default(),
handles: Default::default(),
propose_lock: Default::default(),
Expand Down Expand Up @@ -317,10 +321,18 @@ impl<N: Network> Primary<N> {

// Retrieve the current round.
let round = self.current_round();
// Compute the previous round.
let previous_round = round.saturating_sub(1);

#[cfg(feature = "metrics")]
metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

// Ensure that the primary does not create a new proposal too quickly.
if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
debug!("Primary is safely skipping a batch proposal - {}", format!("{e}").dimmed());
return Ok(());
}

// Ensure the primary has not proposed a batch for this round before.
if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
// If a BFT sender was provided, attempt to advance the current round.
Expand Down Expand Up @@ -359,8 +371,6 @@ impl<N: Network> Primary<N> {
}
}

// Compute the previous round.
let previous_round = round.saturating_sub(1);
// Retrieve the previous certificates.
let previous_certificates = self.storage.get_certificates_for_round(previous_round);

Expand Down Expand Up @@ -498,6 +508,8 @@ impl<N: Network> Primary<N> {
let proposal = Proposal::new(committee_lookback, batch_header.clone(), transmissions)?;
// Broadcast the batch to all validators for signing.
self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
// Set the timestamp of the latest proposed batch.
*self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
// Set the proposed batch.
*self.proposed_batch.write() = Some(proposal);
Ok(())
Expand Down Expand Up @@ -595,6 +607,15 @@ impl<N: Network> Primary<N> {
}
}

// Compute the previous round.
let previous_round = batch_round.saturating_sub(1);
// Ensure that the peer did not propose a batch too quickly.
if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
// Proceed to disconnect the validator.
self.gateway.disconnect(peer_ip);
bail!("Malicious peer - {e} from '{peer_ip}'");
}

// If the peer is ahead, use the batch header to sync up to the peer.
let mut transmissions = self.sync_with_batch_header_from_peer(peer_ip, &batch_header).await?;

Expand Down Expand Up @@ -1189,6 +1210,32 @@ impl<N: Network> Primary<N> {
Ok(())
}

/// Ensure the primary is not creating batch proposals too frequently.
/// This checks that the certificate timestamp for the previous round is within the expected range.
fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
// Retrieve the timestamp of the previous timestamp to check against.
let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
// Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
Some(certificate) => certificate.timestamp(),
None => match self.gateway.account().address() == author {
// If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
true => *self.latest_proposed_batch_timestamp.read(),
// If we do not see a previous certificate for the author, then proceed optimistically.
false => return Ok(()),
},
};

// Determine the elapsed time since the previous timestamp.
let elapsed = timestamp
.checked_sub(previous_timestamp)
.ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
// Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
false => Ok(()),
}
}

/// Stores the certified batch and broadcasts it to all validators, returning the certificate.
async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
// Create the batch certificate and transmissions.
Expand Down Expand Up @@ -1766,6 +1813,9 @@ mod tests {
// Fill primary storage.
store_certificate_chain(&primary, &accounts, round, &mut rng);

// Sleep for a while to ensure the primary is ready to propose the next round.
tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

// Try to propose a batch. There are no transmissions in the workers so the method should
// just return without proposing a batch.
assert!(primary.propose_batch().await.is_ok());
Expand Down Expand Up @@ -1834,6 +1884,9 @@ mod tests {
primary.storage.insert_certificate(certificate, transmissions).unwrap();
}

// Sleep for a while to ensure the primary is ready to propose the next round.
tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

// Advance to the next round.
assert!(primary.storage.increment_to_next_round(round).is_ok());

Expand All @@ -1859,7 +1912,7 @@ mod tests {
let round = 1;
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1895,7 +1948,7 @@ mod tests {
// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1926,7 +1979,7 @@ mod tests {
let round = 1;
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1968,7 +2021,7 @@ mod tests {
// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -1998,6 +2051,78 @@ mod tests {
);
}

#[tokio::test]
async fn test_batch_propose_from_peer_with_invalid_timestamp() {
let round = 2;
let mut rng = TestRng::default();
let (primary, accounts) = primary_without_handlers(&mut rng).await;

// Generate certificates.
let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let invalid_timestamp = now(); // Use a timestamp that is too early.
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
round,
previous_certificates,
invalid_timestamp,
&mut rng,
);

// Make sure the primary is aware of the transmissions in the proposal.
for (transmission_id, transmission) in proposal.transmissions() {
primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

// Try to process the batch proposal from the peer, should error.
assert!(
primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
);
}

#[tokio::test]
async fn test_batch_propose_from_peer_with_past_timestamp() {
let round = 2;
let mut rng = TestRng::default();
let (primary, accounts) = primary_without_handlers(&mut rng).await;

// Generate certificates.
let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

// Create a valid proposal with an author that isn't the primary.
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let past_timestamp = now() - 5; // Use a timestamp that is in the past.
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
round,
previous_certificates,
past_timestamp,
&mut rng,
);

// Make sure the primary is aware of the transmissions in the proposal.
for (transmission_id, transmission) in proposal.transmissions() {
primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

// Try to process the batch proposal from the peer, should error.
assert!(
primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_batch_signature_from_peer() {
let mut rng = TestRng::default();
Expand All @@ -2006,7 +2131,7 @@ mod tests {

// Create a valid proposal.
let round = 1;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
primary.gateway.account(),
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -2079,7 +2204,7 @@ mod tests {

// Create a valid proposal.
let round = 1;
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
primary.gateway.account(),
primary.ledger.current_committee().unwrap(),
Expand Down Expand Up @@ -2116,7 +2241,7 @@ mod tests {
let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

// Create a valid proposal.
let timestamp = now();
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
primary.gateway.account(),
primary.ledger.current_committee().unwrap(),
Expand Down