Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bootstrap): More efficient snarked ledger synchronization #292

Merged
merged 4 commits into from Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion ledger/src/database/database_impl.rs
Expand Up @@ -61,7 +61,7 @@ impl DatabaseImpl<V2> {
naccounts: self.naccounts,
uuid: next_uuid(),
directory: new_directory,
hashes_matrix: HashesMatrix::new(self.depth as usize),
hashes_matrix: self.hashes_matrix.clone(),
// root_hash: RefCell::new(*self.root_hash.borrow()),
}
}
Expand Down
4 changes: 2 additions & 2 deletions ledger/src/tree_version.rs
Expand Up @@ -23,8 +23,8 @@ impl TreeVersion for V2 {
type Account = Account;
type TokenId = TokenId;

fn hash_node(depth: usize, left: Fp, right: Fp) -> Fp {
let param = format!("MinaMklTree{:03}", depth);
fn hash_node(height: usize, left: Fp, right: Fp) -> Fp {
let param = format!("MinaMklTree{height:03}");

crate::hash::hash_with_kimchi(param.as_str(), &[left, right])
}
Expand Down
92 changes: 76 additions & 16 deletions node/src/action_kind.rs
Expand Up @@ -355,13 +355,28 @@ pub enum ActionKind {
TransitionFrontierSyncLedgerStakingSuccess,
TransitionFrontierSyncLedgerInit,
TransitionFrontierSyncLedgerSuccess,
TransitionFrontierSyncLedgerSnarkedChildAccountsAccepted,
TransitionFrontierSyncLedgerSnarkedChildAccountsReceived,
TransitionFrontierSyncLedgerSnarkedChildAccountsRejected,
TransitionFrontierSyncLedgerSnarkedChildHashesAccepted,
TransitionFrontierSyncLedgerSnarkedChildHashesReceived,
TransitionFrontierSyncLedgerSnarkedPeerQueryError,
TransitionFrontierSyncLedgerSnarkedPeerQueryInit,
TransitionFrontierSyncLedgerSnarkedPeerQueryPending,
TransitionFrontierSyncLedgerSnarkedPeerQueryRetry,
TransitionFrontierSyncLedgerSnarkedPeerQuerySuccess,
TransitionFrontierSyncLedgerSnarkedChildHashesRejected,
TransitionFrontierSyncLedgerSnarkedMerkleTreeSyncPending,
TransitionFrontierSyncLedgerSnarkedMerkleTreeSyncSuccess,
TransitionFrontierSyncLedgerSnarkedNumAccountsAccepted,
TransitionFrontierSyncLedgerSnarkedNumAccountsReceived,
TransitionFrontierSyncLedgerSnarkedNumAccountsRejected,
TransitionFrontierSyncLedgerSnarkedNumAccountsSuccess,
TransitionFrontierSyncLedgerSnarkedPeerQueryAddressError,
TransitionFrontierSyncLedgerSnarkedPeerQueryAddressInit,
TransitionFrontierSyncLedgerSnarkedPeerQueryAddressPending,
TransitionFrontierSyncLedgerSnarkedPeerQueryAddressRetry,
TransitionFrontierSyncLedgerSnarkedPeerQueryAddressSuccess,
TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsError,
TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsInit,
TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsPending,
TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsRetry,
TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsSuccess,
TransitionFrontierSyncLedgerSnarkedPeersQuery,
TransitionFrontierSyncLedgerSnarkedPending,
TransitionFrontierSyncLedgerSnarkedSuccess,
Expand Down Expand Up @@ -392,7 +407,7 @@ pub enum ActionKind {
}

impl ActionKind {
    /// Total number of `ActionKind` variants.
    ///
    /// NOTE(review): this count must be kept in sync with the enum above
    /// whenever variants are added or removed (the diff updates it from
    /// 325 to 340 to match the new snarked-ledger sync actions).
    pub const COUNT: u16 = 340;
}

impl std::fmt::Display for ActionKind {
Expand Down Expand Up @@ -1155,27 +1170,72 @@ impl ActionKindGet for TransitionFrontierSyncLedgerSnarkedAction {
match self {
Self::Pending => ActionKind::TransitionFrontierSyncLedgerSnarkedPending,
Self::PeersQuery => ActionKind::TransitionFrontierSyncLedgerSnarkedPeersQuery,
Self::PeerQueryInit { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryInit
Self::PeerQueryNumAccountsInit { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsInit
}
Self::PeerQueryPending { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryPending
Self::PeerQueryNumAccountsPending { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsPending
}
Self::PeerQueryRetry { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryRetry
Self::PeerQueryNumAccountsRetry { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsRetry
}
Self::PeerQueryError { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryError
Self::PeerQueryNumAccountsError { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsError
}
Self::PeerQuerySuccess { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQuerySuccess
Self::PeerQueryNumAccountsSuccess { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsSuccess
}
Self::NumAccountsReceived { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsReceived
}
Self::NumAccountsAccepted { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsAccepted
}
Self::NumAccountsRejected { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsRejected
}
Self::NumAccountsSuccess { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsSuccess
}
Self::MerkleTreeSyncPending => {
ActionKind::TransitionFrontierSyncLedgerSnarkedMerkleTreeSyncPending
}
Self::PeerQueryAddressInit { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressInit
}
Self::PeerQueryAddressPending { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressPending
}
Self::PeerQueryAddressRetry { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressRetry
}
Self::PeerQueryAddressError { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressError
}
Self::PeerQueryAddressSuccess { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressSuccess
}
Self::ChildHashesReceived { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedChildHashesReceived
}
Self::ChildHashesAccepted { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedChildHashesAccepted
}
Self::ChildHashesRejected { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedChildHashesRejected
}
Self::ChildAccountsReceived { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedChildAccountsReceived
}
Self::ChildAccountsAccepted { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedChildAccountsAccepted
}
Self::ChildAccountsRejected { .. } => {
ActionKind::TransitionFrontierSyncLedgerSnarkedChildAccountsRejected
}
Self::MerkleTreeSyncSuccess => {
ActionKind::TransitionFrontierSyncLedgerSnarkedMerkleTreeSyncSuccess
}
Self::Success => ActionKind::TransitionFrontierSyncLedgerSnarkedSuccess,
}
}
Expand Down
171 changes: 116 additions & 55 deletions node/src/ledger/ledger_service.rs
Expand Up @@ -22,7 +22,7 @@ use ledger::{
validate_block::block_body_hash,
},
verifier::Verifier,
Account, AccountIndex, BaseLedger, Database, Mask, TreeVersion, UnregisterBehavior,
Account, BaseLedger, Database, Mask, UnregisterBehavior,
};
use mina_hasher::Fp;
use mina_p2p_messages::{
Expand Down Expand Up @@ -68,11 +68,6 @@ use crate::{

use super::{ledger_empty_hash_at_depth, LedgerAddress, LEDGER_DEPTH};

/// Hashes two child node hashes into their parent node's hash, where
/// `depth` is the parent's distance from the root of the ledger tree.
fn ledger_hash(depth: usize, left: Fp, right: Fp) -> Fp {
// Convert root-relative depth into leaf-relative height, which is what
// the V2 tree hasher expects.
let height = LEDGER_DEPTH - depth - 1;
ledger::V2::hash_node(height, left, right)
}

/// Returns the mask's current merkle root wrapped in the wire-format
/// ledger hash type.
fn merkle_root(mask: &mut Mask) -> LedgerHash {
    let root = mask.merkle_root();
    let wrapped = MinaBaseLedgerHash0StableV1(root.into());
    wrapped.into()
}
Expand Down Expand Up @@ -145,6 +140,69 @@ impl LedgerCtx {
.or_else(|| self.sync.mask(hash))
}

/// Returns the mask for a snarked ledger being synchronized or an error if it is not present.
///
/// Unlike the more permissive mask accessors, this only succeeds for
/// ledgers already tracked in the sync state — it never creates a new
/// empty mask (delegates to the sync state's lookup, which fails on a
/// missing hash).
pub fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
self.sync.pending_sync_snarked_ledger_mask(hash)
}

/// Copies the contents of an existing snarked ledger into the target
/// hash under the pending sync snarked ledgers state.
///
/// Returns `Ok(false)` when the target already exists and `overwrite` is
/// not set (nothing is copied); `Ok(true)` when the copy was performed.
/// Fails when no ledger with `origin_snarked_ledger_hash` exists in
/// either the synced or the in-progress collections.
fn copy_snarked_ledger_contents_for_sync(
    &mut self,
    origin_snarked_ledger_hash: LedgerHash,
    target_snarked_ledger_hash: LedgerHash,
    overwrite: bool,
) -> Result<bool, String> {
    // Don't clobber an in-progress sync ledger unless explicitly asked to.
    if !overwrite
        && self
            .sync
            .snarked_ledgers
            .contains_key(&target_snarked_ledger_hash)
    {
        return Ok(false);
    }

    // Look in the fully synced ledgers first; fall back to in-progress
    // ones so partial data from a previous sync attempt can be reused.
    let origin = self
        .snarked_ledgers
        .get(&origin_snarked_ledger_hash)
        .or_else(|| self.sync.snarked_ledgers.get(&origin_snarked_ledger_hash))
        .ok_or_else(|| {
            format!(
                "Tried to copy from non-existing snarked ledger with hash: {origin_snarked_ledger_hash}"
            )
        })?;

    let target = origin.copy();
    self.sync
        .snarked_ledgers
        .insert(target_snarked_ledger_hash, target);

    Ok(true)
}

/// Forces computation of all pending node hashes of the snarked ledger
/// with the given hash (looked up among synced ledgers first, then among
/// ledgers still being synchronized).
fn compute_snarked_ledger_hashes(
    &mut self,
    snarked_ledger_hash: &LedgerHash,
) -> Result<(), String> {
    // `snarked_ledger_hash` is already a reference; pass it directly
    // instead of taking another `&` (clippy: needless_borrow).
    let origin = self
        .snarked_ledgers
        .get_mut(snarked_ledger_hash)
        .or_else(|| self.sync.snarked_ledgers.get_mut(snarked_ledger_hash))
        .ok_or_else(|| format!("Cannot hash non-existing snarked ledger: {snarked_ledger_hash}"))?;

    // Our ledger is lazy when it comes to hashing, but retrieving the
    // merkle root hash forces all pending hashes to be computed.
    let _force_hashing = origin.merkle_root();

    Ok(())
}

/// Returns a mutable reference to the [StagedLedger] with the specified `hash` if it exists or `None` otherwise.
fn staged_ledger_mut(&mut self, hash: &LedgerHash) -> Option<&mut StagedLedger> {
match self.staged_ledgers.get_mut(&hash) {
Expand Down Expand Up @@ -335,6 +393,13 @@ impl LedgerSyncState {
.or_else(|| Some((self.staged_ledgers.get(hash)?.ledger(), true)))
}

/// Returns a clone of the mask for a snarked ledger that is currently
/// being synchronized, or an error naming the missing hash.
fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
    self.snarked_ledgers
        .get(hash)
        .cloned()
        // `format!` renders via `Display`; no intermediate `.to_string()` needed.
        .ok_or_else(|| format!("Missing sync snarked ledger {hash}"))
}

/// Returns a [Mask] instance for the snarked ledger with [hash]. If it doesn't
/// exist a new instance is created.
fn snarked_ledger_mut(&mut self, hash: LedgerHash) -> &mut Mask {
Expand All @@ -356,64 +421,63 @@ pub trait LedgerService: redux::Service {
}

impl<T: LedgerService> TransitionFrontierSyncLedgerSnarkedService for T {
fn hashes_set(
fn compute_snarked_ledger_hashes(
&mut self,
snarked_ledger_hash: LedgerHash,
parent: &LedgerAddress,
(left, right): (LedgerHash, LedgerHash),
snarked_ledger_hash: &LedgerHash,
) -> Result<(), String> {
let (left, right) = (left.0.to_field(), right.0.to_field());
let hash = ledger_hash(parent.length(), left, right);
self.ctx_mut()
.compute_snarked_ledger_hashes(snarked_ledger_hash)?;

let mask = self.ctx_mut().sync.snarked_ledger_mut(snarked_ledger_hash);
Ok(())
}

if hash != mask.get_inner_hash_at_addr(parent.clone())? {
return Err("Inner hash found at address but doesn't match the expected hash".into());
}
fn copy_snarked_ledger_contents_for_sync(
&mut self,
origin_snarked_ledger_hash: LedgerHash,
target_snarked_ledger_hash: LedgerHash,
overwrite: bool,
) -> Result<bool, String> {
self.ctx_mut().copy_snarked_ledger_contents_for_sync(
origin_snarked_ledger_hash,
target_snarked_ledger_hash,
overwrite,
)
}

// TODO(binier): the `if` condition is temporary until we make
// sure we don't call `hashes_set` for the same key for the
// same ledger. This can happen E.g. if root snarked ledger
// is the same as staking or next epoch ledger, in which case
// we will sync same ledger twice. That causes assertion to fail
// in `set_cached_hash_unchecked`.
//
// remove once we have an optimization to not sync same ledgers/addrs
// multiple times.
if mask.get_cached_hash(&parent.child_left()).is_none() {
mask.set_cached_hash_unchecked(&parent.child_left(), left);
mask.set_cached_hash_unchecked(&parent.child_right(), right);
}
fn child_hashes_get(
&mut self,
snarked_ledger_hash: LedgerHash,
parent: &LedgerAddress,
) -> Result<(LedgerHash, LedgerHash), String> {
let mut mask = self
.ctx_mut()
.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
let left_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_left())?);
let right_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_right())?);

Ok(())
Ok((left_hash, right_hash))
}

fn accounts_set(
&mut self,
snarked_ledger_hash: LedgerHash,
parent: &LedgerAddress,
accounts: Vec<MinaBaseAccountBinableArgStableV2>,
) -> Result<(), ()> {
// TODO(binier): validate hashes
let mut addr = parent.clone();
let first_addr = loop {
if addr.length() == LEDGER_DEPTH {
break addr;
}
addr = addr.child_left();
};
let mask = self.ctx_mut().sync.snarked_ledger_mut(snarked_ledger_hash);

let first_index = first_addr.to_index();
accounts
) -> Result<LedgerHash, String> {
let mut mask = self
.ctx_mut()
.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
let accounts: Vec<_> = accounts
.into_iter()
.enumerate()
.try_for_each(|(index, account)| {
let index = AccountIndex(first_index.0 + index as u64);
mask.set_at_index(index, Box::new((&account).into()))
})?;
.map(|account| Box::new((&account).into()))
.collect();

Ok(())
mask.set_all_accounts_rooted_at(parent.clone(), &accounts)
.or_else(|()| Err("Failed when setting accounts".to_owned()))?;

let computed_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.clone())?);

Ok(computed_hash)
}
}

Expand All @@ -431,11 +495,6 @@ impl<T: LedgerService> TransitionFrontierSyncLedgerStagedService for T {
.ctx_mut()
.sync
.snarked_ledger_mut(snarked_ledger_hash.clone());
// TODO(binier): TMP. Remove for prod version.
snarked_ledger
.validate_inner_hashes()
.map_err(|_| "downloaded hash and recalculated mismatch".to_owned())?;

let mask = snarked_ledger.copy();

let staged_ledger = if let Some(parts) = parts {
Expand Down Expand Up @@ -1064,6 +1123,8 @@ fn dump_application_to_file(
mod tests {
use mina_p2p_messages::v2::MinaBaseLedgerHash0StableV1;

use crate::ledger::hash_node_at_depth;

use super::*;

#[test]
Expand All @@ -1080,7 +1141,7 @@ mod tests {
(addr, expected_hash, left, right)
})
.for_each(|(address, expected_hash, left, right)| {
let hash = ledger_hash(address.length(), left.0.to_field(), right.0.to_field());
let hash = hash_node_at_depth(address.length(), left.0.to_field(), right.0.to_field());
let hash: LedgerHash = MinaBaseLedgerHash0StableV1(hash.into()).into();
assert_eq!(hash.to_string(), expected_hash);
});
Expand Down