Skip to content

Commit

Permalink
Merge pull request #2395 from AleoHQ/optimize-deserialize
Browse files Browse the repository at this point in the history
Add parallelism to `BatchHeader` and `Committee` deserialization
  • Loading branch information
howardwu committed Mar 20, 2024
2 parents eefd27d + 230d42b commit e9d5f8f
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion ledger/committee/Cargo.toml
Expand Up @@ -24,7 +24,7 @@ license = "Apache-2.0"
edition = "2021"

[features]
default = [ ]
default = [ "rayon" ]
serial = [ "console/serial" ]
wasm = [ "console/wasm" ]
metrics = [ "dep:metrics" ]
Expand Down Expand Up @@ -74,6 +74,10 @@ optional = true
version = "0.4"
optional = true

[dependencies.rayon]
version = "1"
optional = true

[dependencies.test-strategy]
version = "0.3.1"
optional = true
Expand Down
30 changes: 19 additions & 11 deletions ledger/committee/src/bytes.rs
Expand Up @@ -37,18 +37,26 @@ impl<N: Network> FromBytes for Committee<N> {
Self::MAX_COMMITTEE_SIZE,
)));
}

// Calculate the number of bytes per member. Each member is a (address, stake, is_open) tuple.
let member_byte_size = Address::<N>::size_in_bytes() + 8 + 1;
// Read the member bytes.
let mut member_bytes = vec![0u8; num_members as usize * member_byte_size];
reader.read_exact(&mut member_bytes)?;
// Read the members.
let mut members = IndexMap::with_capacity(num_members as usize);
for _ in 0..num_members {
// Read the address.
let member = Address::read_le(&mut reader)?;
// Read the stake.
let stake = u64::read_le(&mut reader)?;
// Read the is_open flag.
let is_open = bool::read_le(&mut reader)?;
// Insert the member and (stake, is_open).
members.insert(member, (stake, is_open));
}
let members = cfg_chunks!(member_bytes, member_byte_size)
.map(|mut bytes| {
// Read the address.
let member = Address::<N>::read_le(&mut bytes)?;
// Read the stake.
let stake = u64::read_le(&mut bytes)?;
// Read the is_open flag.
let is_open = bool::read_le(&mut bytes)?;
// Insert the member and (stake, is_open).
Ok((member, (stake, is_open)))
})
.collect::<Result<IndexMap<_, _>, std::io::Error>>()?;

// Read the total stake.
let total_stake = u64::read_le(&mut reader)?;
// Construct the committee.
Expand Down
8 changes: 5 additions & 3 deletions ledger/committee/src/lib.rs
Expand Up @@ -28,11 +28,14 @@ use console::{
program::{Literal, LiteralType},
types::{Address, Field},
};
use ledger_narwhal_batch_header::BatchHeader;

use indexmap::IndexMap;
use ledger_narwhal_batch_header::BatchHeader;
use std::collections::HashSet;

#[cfg(not(feature = "serial"))]
use rayon::prelude::*;

/// The minimum amount of stake required for a validator to bond.
pub const MIN_VALIDATOR_STAKE: u64 = 10_000_000_000_000u64; // microcredits
/// The minimum amount of stake required for a delegator to bond.
Expand Down Expand Up @@ -349,7 +352,6 @@ mod tests {
use console::prelude::TestRng;

use parking_lot::RwLock;
use rayon::prelude::*;
use std::sync::Arc;

type CurrentNetwork = console::network::MainnetV0;
Expand All @@ -359,7 +361,7 @@ mod tests {
// Initialize a tracker for the leaders.
let leaders = Arc::new(RwLock::new(IndexMap::<Address<CurrentNetwork>, i64>::new()));
// Iterate through the rounds.
(1..=num_rounds).into_par_iter().for_each(|round| {
cfg_into_iter!(1..=num_rounds).for_each(|round| {
// Compute the leader.
let leader = committee.get_leader(round).unwrap();
// Increment the leader count for the current leader.
Expand Down
6 changes: 5 additions & 1 deletion ledger/narwhal/batch-header/Cargo.toml
Expand Up @@ -24,7 +24,7 @@ license = "Apache-2.0"
edition = "2021"

[features]
default = [ ]
default = [ "rayon" ]
serial = [ "console/serial" ]
wasm = [ "console/wasm" ]
test-helpers = [ "narwhal-transmission-id/test-helpers", "time" ]
Expand All @@ -43,6 +43,10 @@ version = "=0.16.19"
version = "2.0"
features = [ "serde" ]

[dependencies.rayon]
version = "1"
optional = true

[dependencies.serde_json]
version = "1.0"
features = [ "preserve_order" ]
Expand Down
13 changes: 8 additions & 5 deletions ledger/narwhal/batch-header/src/bytes.rs
Expand Up @@ -60,12 +60,15 @@ impl<N: Network> FromBytes for BatchHeader<N> {
Self::MAX_CERTIFICATES
)));
}

// Read the previous certificate ID bytes.
let mut previous_certificate_id_bytes =
vec![0u8; num_previous_certificate_ids as usize * Field::<N>::size_in_bytes()];
reader.read_exact(&mut previous_certificate_id_bytes)?;
// Read the previous certificate IDs.
let mut previous_certificate_ids = IndexSet::new();
for _ in 0..num_previous_certificate_ids {
// Read the certificate ID.
previous_certificate_ids.insert(Field::read_le(&mut reader)?);
}
let previous_certificate_ids = cfg_chunks!(previous_certificate_id_bytes, Field::<N>::size_in_bytes())
.map(Field::read_le)
.collect::<Result<IndexSet<_>, _>>()?;

// Read the signature.
let signature = Signature::read_le(&mut reader)?;
Expand Down
6 changes: 5 additions & 1 deletion ledger/narwhal/batch-header/src/lib.rs
Expand Up @@ -26,9 +26,13 @@ use console::{
prelude::*,
types::Field,
};
use indexmap::IndexSet;
use narwhal_transmission_id::TransmissionID;

use indexmap::IndexSet;

#[cfg(not(feature = "serial"))]
use rayon::prelude::*;

#[derive(Clone, PartialEq, Eq)]
pub struct BatchHeader<N: Network> {
/// The batch ID, defined as the hash of the author, round number, timestamp, transmission IDs,
Expand Down

0 comments on commit e9d5f8f

Please sign in to comment.