diff --git a/ledger/src/tree_version.rs b/ledger/src/tree_version.rs index 169fe7476..01c860114 100644 --- a/ledger/src/tree_version.rs +++ b/ledger/src/tree_version.rs @@ -23,8 +23,8 @@ impl TreeVersion for V2 { type Account = Account; type TokenId = TokenId; - fn hash_node(depth: usize, left: Fp, right: Fp) -> Fp { - let param = format!("MinaMklTree{:03}", depth); + fn hash_node(height: usize, left: Fp, right: Fp) -> Fp { + let param = format!("MinaMklTree{height:03}"); crate::hash::hash_with_kimchi(param.as_str(), &[left, right]) } diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index 9f2177217..8e7f50d9c 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -368,13 +368,25 @@ pub enum ActionKind { TransitionFrontierSyncLedgerStakingSuccess, TransitionFrontierSyncLedgerInit, TransitionFrontierSyncLedgerSuccess, + TransitionFrontierSyncLedgerSnarkedChildAccountsAccepted, TransitionFrontierSyncLedgerSnarkedChildAccountsReceived, + TransitionFrontierSyncLedgerSnarkedChildAccountsRejected, + TransitionFrontierSyncLedgerSnarkedChildHashesAccepted, TransitionFrontierSyncLedgerSnarkedChildHashesReceived, - TransitionFrontierSyncLedgerSnarkedPeerQueryError, - TransitionFrontierSyncLedgerSnarkedPeerQueryInit, - TransitionFrontierSyncLedgerSnarkedPeerQueryPending, - TransitionFrontierSyncLedgerSnarkedPeerQueryRetry, - TransitionFrontierSyncLedgerSnarkedPeerQuerySuccess, + TransitionFrontierSyncLedgerSnarkedChildHashesRejected, + TransitionFrontierSyncLedgerSnarkedNumAccountsAccepted, + TransitionFrontierSyncLedgerSnarkedNumAccountsReceived, + TransitionFrontierSyncLedgerSnarkedNumAccountsRejected, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressError, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressInit, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressPending, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressRetry, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressSuccess, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsError, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsInit, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsPending, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsRetry, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsSuccess, TransitionFrontierSyncLedgerSnarkedPeersQuery, TransitionFrontierSyncLedgerSnarkedPending, TransitionFrontierSyncLedgerSnarkedSuccess, @@ -406,7 +418,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 313; + pub const COUNT: u16 = 325; } impl std::fmt::Display for ActionKind { @@ -1365,27 +1377,63 @@ impl ActionKindGet for TransitionFrontierSyncLedgerSnarkedAction { match self { Self::Pending => ActionKind::TransitionFrontierSyncLedgerSnarkedPending, Self::PeersQuery => ActionKind::TransitionFrontierSyncLedgerSnarkedPeersQuery, - Self::PeerQueryInit { .. } => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryInit + Self::PeerQueryNumAccountsInit { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsInit } - Self::PeerQueryPending { .. } => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryPending + Self::PeerQueryNumAccountsPending { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsPending } - Self::PeerQueryRetry { .. } => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryRetry + Self::PeerQueryNumAccountsRetry { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsRetry } - Self::PeerQueryError { .. 
} => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryError + Self::PeerQueryNumAccountsError { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsError } - Self::PeerQuerySuccess { .. } => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQuerySuccess + Self::PeerQueryNumAccountsSuccess { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsSuccess + } + Self::NumAccountsReceived { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsReceived + } + Self::NumAccountsAccepted { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsAccepted + } + Self::NumAccountsRejected { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsRejected + } + Self::PeerQueryAddressInit { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressInit + } + Self::PeerQueryAddressPending { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressPending + } + Self::PeerQueryAddressRetry { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressRetry + } + Self::PeerQueryAddressError { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressError + } + Self::PeerQueryAddressSuccess { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressSuccess } Self::ChildHashesReceived { .. } => { ActionKind::TransitionFrontierSyncLedgerSnarkedChildHashesReceived } + Self::ChildHashesAccepted { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedChildHashesAccepted + } + Self::ChildHashesRejected { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedChildHashesRejected + } Self::ChildAccountsReceived { .. } => { ActionKind::TransitionFrontierSyncLedgerSnarkedChildAccountsReceived } + Self::ChildAccountsAccepted { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedChildAccountsAccepted + } + Self::ChildAccountsRejected { .. 
} => { + ActionKind::TransitionFrontierSyncLedgerSnarkedChildAccountsRejected + } Self::Success => ActionKind::TransitionFrontierSyncLedgerSnarkedSuccess, } } diff --git a/node/src/ledger/ledger_service.rs b/node/src/ledger/ledger_service.rs index 23ba2424b..02b0bfec6 100644 --- a/node/src/ledger/ledger_service.rs +++ b/node/src/ledger/ledger_service.rs @@ -22,7 +22,7 @@ use ledger::{ validate_block::block_body_hash, }, verifier::Verifier, - Account, AccountIndex, BaseLedger, Database, Mask, TreeVersion, UnregisterBehavior, + Account, BaseLedger, Database, Mask, UnregisterBehavior, }; use mina_hasher::Fp; use mina_p2p_messages::{ @@ -68,11 +68,6 @@ use crate::{ use super::{ledger_empty_hash_at_depth, LedgerAddress, LEDGER_DEPTH}; -fn ledger_hash(depth: usize, left: Fp, right: Fp) -> Fp { - let height = LEDGER_DEPTH - depth - 1; - ledger::V2::hash_node(height, left, right) -} - fn merkle_root(mask: &mut Mask) -> LedgerHash { MinaBaseLedgerHash0StableV1(mask.merkle_root().into()).into() } @@ -166,6 +161,54 @@ impl LedgerCtx { .or_else(|| self.sync.mask(hash)) } + fn copy_snarked_ledger_contents( + &mut self, + origin_snarked_ledger_hash: LedgerHash, + target_snarked_ledger_hash: LedgerHash, + overwrite: bool, + ) -> Result { + if !overwrite + && self + .snarked_ledgers + .contains_key(&target_snarked_ledger_hash) + { + return Ok(false); + } + + let origin = self + .snarked_ledgers + .get_mut(&origin_snarked_ledger_hash) + .ok_or(format!( + "Tried to copy from non-existing snarked ledger with hash: {}", + origin_snarked_ledger_hash.to_string() + ))?; + + let target = origin.copy(); + self.snarked_ledgers + .insert(target_snarked_ledger_hash, target); + + Ok(true) + } + + fn compute_snarked_ledger_hashes( + &mut self, + snarked_ledger_hash: &LedgerHash, + ) -> Result<(), String> { + let origin = self + .snarked_ledgers + .get_mut(&snarked_ledger_hash) + .ok_or(format!( + "Cannot hash non-existing snarked ledger: {}", + snarked_ledger_hash.to_string() + ))?; + + // Our ledger is lazy when it comes to hashing, but retrieving the + // merkle root hash forces all pending hashes to be computed. + let _force_hashing = origin.merkle_root(); + + Ok(()) + } + /// Returns a mutable reference to the [StagedLedger] with the specified `hash` if it exists or `None` otherwise. fn staged_ledger_mut(&mut self, hash: &LedgerHash) -> Option<&mut StagedLedger> { match self.staged_ledgers.get_mut(&hash) { @@ -377,36 +420,39 @@ pub trait LedgerService: redux::Service { } impl TransitionFrontierSyncLedgerSnarkedService for T { - fn hashes_set( + fn compute_snarked_ledger_hashes( &mut self, - snarked_ledger_hash: LedgerHash, - parent: &LedgerAddress, - (left, right): (LedgerHash, LedgerHash), + snarked_ledger_hash: &LedgerHash, ) -> Result<(), String> { - let (left, right) = (left.0.to_field(), right.0.to_field()); - let hash = ledger_hash(parent.length(), left, right); + self.ctx_mut() + .compute_snarked_ledger_hashes(snarked_ledger_hash)?; - let mask = self.ctx_mut().sync.snarked_ledger_mut(snarked_ledger_hash); + Ok(()) + } - if hash != mask.get_inner_hash_at_addr(parent.clone())? 
{ - return Err("Inner hash found at address but doesn't match the expected hash".into()); - } + fn copy_snarked_ledger_contents( + &mut self, + origin_snarked_ledger_hash: LedgerHash, + target_snarked_ledger_hash: LedgerHash, + overwrite: bool, + ) -> Result { + self.ctx_mut().copy_snarked_ledger_contents( + origin_snarked_ledger_hash, + target_snarked_ledger_hash, + overwrite, + ) + } - // TODO(binier): the `if` condition is temporary until we make - // sure we don't call `hashes_set` for the same key for the - // same ledger. This can happen E.g. if root snarked ledger - // is the same as staking or next epoch ledger, in which case - // we will sync same ledger twice. That causes assertion to fail - // in `set_cached_hash_unchecked`. - // - // remove once we have an optimization to not sync same ledgers/addrs - // multiple times. - if mask.get_cached_hash(&parent.child_left()).is_none() { - mask.set_cached_hash_unchecked(&parent.child_left(), left); - mask.set_cached_hash_unchecked(&parent.child_right(), right); - } + fn child_hashes_get( + &mut self, + snarked_ledger_hash: LedgerHash, + parent: &LedgerAddress, + ) -> Result<(LedgerHash, LedgerHash), String> { + let mask = self.ctx_mut().sync.snarked_ledger_mut(snarked_ledger_hash); + let left_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_left())?); + let right_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_right())?); - Ok(()) + Ok((left_hash, right_hash)) } fn accounts_set( @@ -414,27 +460,19 @@ impl TransitionFrontierSyncLedgerSnarkedService for T { snarked_ledger_hash: LedgerHash, parent: &LedgerAddress, accounts: Vec, - ) -> Result<(), ()> { - // TODO(binier): validate hashes - let mut addr = parent.clone(); - let first_addr = loop { - if addr.length() == LEDGER_DEPTH { - break addr; - } - addr = addr.child_left(); - }; + ) -> Result { let mask = self.ctx_mut().sync.snarked_ledger_mut(snarked_ledger_hash); - - let first_index = first_addr.to_index(); - accounts + let accounts: Vec<_> = accounts .into_iter() - .enumerate() - .try_for_each(|(index, account)| { - let index = AccountIndex(first_index.0 + index as u64); - mask.set_at_index(index, Box::new((&account).into())) - })?; + .map(|account| Box::new((&account).into())) + .collect(); - Ok(()) + mask.set_all_accounts_rooted_at(parent.clone(), &accounts) + .or_else(|()| Err("Failed when setting accounts".to_owned()))?; + + let computed_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.clone())?); + + Ok(computed_hash) } } @@ -452,11 +490,6 @@ impl TransitionFrontierSyncLedgerStagedService for T { .ctx_mut() .sync .snarked_ledger_mut(snarked_ledger_hash.clone()); - // TODO(binier): TMP. Remove for prod version. 
- snarked_ledger - .validate_inner_hashes() - .map_err(|_| "downloaded hash and recalculated mismatch".to_owned())?; - let mask = snarked_ledger.copy(); let staged_ledger = if let Some(parts) = parts { @@ -1085,6 +1118,8 @@ fn dump_application_to_file( mod tests { use mina_p2p_messages::v2::MinaBaseLedgerHash0StableV1; + use crate::ledger::hash_node_at_depth; + use super::*; #[test] @@ -1101,7 +1136,7 @@ mod tests { (addr, expected_hash, left, right) }) .for_each(|(address, expected_hash, left, right)| { - let hash = ledger_hash(address.length(), left.0.to_field(), right.0.to_field()); + let hash = hash_node_at_depth(address.length(), left.0.to_field(), right.0.to_field()); let hash: LedgerHash = MinaBaseLedgerHash0StableV1(hash.into()).into(); assert_eq!(hash.to_string(), expected_hash); }); diff --git a/node/src/ledger/mod.rs b/node/src/ledger/mod.rs index 5c13bfc6b..be92eba6f 100644 --- a/node/src/ledger/mod.rs +++ b/node/src/ledger/mod.rs @@ -1,4 +1,5 @@ mod ledger_config; +use ledger::TreeVersion; pub use ledger_config::*; mod ledger_service; @@ -27,3 +28,100 @@ lazy_static::lazy_static! { pub fn ledger_empty_hash_at_depth(depth: usize) -> LedgerHash { LEDGER_HASH_EMPTIES.get(depth).unwrap().clone() } + +/// Given the hash of the subtree containing all accounts of height `subtree_height` +/// compute the hash of a tree of size `LEDGER_DEPTH` if all other nodes were +/// empty. +pub fn complete_height_tree_with_empties( + content_hash: &LedgerHash, + subtree_height: usize, +) -> LedgerHash { + assert!(LEDGER_DEPTH >= subtree_height); + let content_hash = content_hash.0.to_field(); + + let computed_hash = (subtree_height..LEDGER_DEPTH).fold(content_hash, |prev_hash, height| { + let empty_right = ledger::V2::empty_hash_at_height(height); + ledger::V2::hash_node(height, prev_hash, empty_right) + }); + + LedgerHash::from_fp(computed_hash) +} + +/// Returns the minimum tree height required for storing `num_accounts` accounts. +pub fn tree_height_for_num_accounts(num_accounts: u64) -> usize { + if num_accounts == 1 { + 1 + } else if num_accounts.is_power_of_two() { + num_accounts.ilog2() as usize + } else { + num_accounts.next_power_of_two().ilog2() as usize + } +} + +/// Given the hash of the subtree containing `num_accounts` accounts +/// compute the hash of a tree of size `LEDGER_DEPTH` if all other nodes were +/// empty. +/// +/// NOTE: For out of range sizes, en empty tree hash is returned. 
+pub fn complete_num_accounts_tree_with_empties( + contents_hash: &LedgerHash, + num_accounts: u64, +) -> LedgerHash { + // Note, we assume there is always at least one account + if num_accounts == 0 { + return LedgerHash::from_fp(ledger::V2::empty_hash_at_height(LEDGER_DEPTH)); + } + + let subtree_height = tree_height_for_num_accounts(num_accounts); + + // This would not be a valid number of accounts because it doesn't fit the tree + if subtree_height > LEDGER_DEPTH { + LedgerHash::from_fp(ledger::V2::empty_hash_at_height(LEDGER_DEPTH)) + } else { + complete_height_tree_with_empties(contents_hash, subtree_height) + } +} + +pub fn hash_node_at_depth( + depth: usize, + left: mina_hasher::Fp, + right: mina_hasher::Fp, +) -> mina_hasher::Fp { + let height = LEDGER_DEPTH - depth - 1; + ledger::V2::hash_node(height, left, right) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_complete_with_empties() { + let subtree_height = 14; + let expected_hash: LedgerHash = "jwxdRe86RJV99CZbxZzb4JoDwEnvNQbc6Ha8iPx7pr3FxYpjHBG" + .parse() + .unwrap(); + let contents_hash = "jwav4pBszibQqek634VUQEc5WZAbF3CnT7sMyhqXe3vucyXdjJs" + .parse() + .unwrap(); + + let actual_hash = complete_height_tree_with_empties(&contents_hash, subtree_height); + + assert_eq!(expected_hash, actual_hash); + } + + #[test] + fn test_complete_with_empties_with_num_accounts() { + let subtree_height = 8517; + let expected_hash: LedgerHash = "jwxdRe86RJV99CZbxZzb4JoDwEnvNQbc6Ha8iPx7pr3FxYpjHBG" + .parse() + .unwrap(); + let contents_hash = "jwav4pBszibQqek634VUQEc5WZAbF3CnT7sMyhqXe3vucyXdjJs" + .parse() + .unwrap(); + + let actual_hash = complete_num_accounts_tree_with_empties(&contents_hash, subtree_height); + + assert_eq!(expected_hash, actual_hash); + } +} diff --git a/node/src/p2p/p2p_effects.rs b/node/src/p2p/p2p_effects.rs index e22cc4234..5db5303ea 100644 --- a/node/src/p2p/p2p_effects.rs +++ b/node/src/p2p/p2p_effects.rs @@ -104,16 +104,29 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM P2pDisconnectionAction::Init { .. 
} => {} P2pDisconnectionAction::Finish { peer_id } => { if let Some(s) = store.state().transition_frontier.sync.ledger() { - let rpc_ids = s + let snarked_ledger_num_accounts_rpc_id = s .snarked() - .map(|s| s.peer_query_pending_rpc_ids(&peer_id).collect()) + .and_then(|s| s.peer_num_accounts_rpc_id(&peer_id)); + let snarked_ledger_address_rpc_ids = s + .snarked() + .map(|s| s.peer_address_query_pending_rpc_ids(&peer_id).collect()) .unwrap_or(vec![]); let staged_ledger_parts_fetch_rpc_id = s.staged().and_then(|s| s.parts_fetch_rpc_id(&peer_id)); - for rpc_id in rpc_ids { + for rpc_id in snarked_ledger_address_rpc_ids { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { + peer_id, + rpc_id, + error: PeerLedgerQueryError::Disconnected, + }, + ); + } + + if let Some(rpc_id) = snarked_ledger_num_accounts_rpc_id { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError { peer_id, rpc_id, error: PeerLedgerQueryError::Disconnected, @@ -244,11 +257,13 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery); } P2pChannelsRpcAction::Timeout { peer_id, id } => { - store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { - peer_id, - rpc_id: id, - error: PeerLedgerQueryError::Timeout, - }); + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { + peer_id, + rpc_id: id, + error: PeerLedgerQueryError::Timeout, + }, + ); store.dispatch( TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError { peer_id, @@ -274,7 +289,7 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM match response.as_ref() { None => { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { peer_id, rpc_id: id, error: PeerLedgerQueryError::DataUnavailable, @@ -330,7 +345,7 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM Some(P2pRpcResponse::LedgerQuery(answer)) => match answer { MinaLedgerSyncLedgerAnswerStableV2::ChildHashesAre(left, right) => { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { peer_id, rpc_id: id, response: PeerLedgerQueryResponse::ChildHashes( @@ -342,7 +357,7 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM } MinaLedgerSyncLedgerAnswerStableV2::ContentsAre(accounts) => { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { peer_id, rpc_id: id, response: PeerLedgerQueryResponse::ChildAccounts( @@ -351,7 +366,20 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM }, ); } - _ => {} + MinaLedgerSyncLedgerAnswerStableV2::NumAccounts( + count, + contents_hash, + ) => { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess { + peer_id, + rpc_id: id, + response: PeerLedgerQueryResponse::NumAccounts( + count.as_u64(), contents_hash.clone() + ), + }, + ); + } }, Some(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock( parts, diff --git a/node/src/transition_frontier/sync/ledger/snarked/mod.rs b/node/src/transition_frontier/sync/ledger/snarked/mod.rs index 150c16a6e..d1d84a5d3 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/mod.rs +++ 
b/node/src/transition_frontier/sync/ledger/snarked/mod.rs @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize}; pub enum PeerLedgerQueryResponse { ChildHashes(LedgerHash, LedgerHash), ChildAccounts(Vec), + NumAccounts(u64, LedgerHash), } impl PeerLedgerQueryResponse { @@ -28,6 +29,10 @@ impl PeerLedgerQueryResponse { pub fn is_child_accounts(&self) -> bool { matches!(self, Self::ChildAccounts(..)) } + + pub fn is_num_accounts(&self) -> bool { + matches!(self, Self::NumAccounts(..)) + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_actions.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_actions.rs index b9911fdcf..f72dd8af9 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_actions.rs +++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_actions.rs @@ -7,10 +7,15 @@ use crate::p2p::PeerId; use crate::transition_frontier::sync::ledger::TransitionFrontierSyncLedgerState; use super::{ - PeerLedgerQueryError, PeerLedgerQueryResponse, PeerRpcState, + LedgerQueryQueued, PeerLedgerQueryError, PeerLedgerQueryResponse, PeerRpcState, TransitionFrontierSyncLedgerSnarkedState, }; +/// Once we reach subtrees of this height, we begin performing +/// queries to fetch all the accounts in the subtree at once +/// instead of fetching intermediary hashes. +pub const ACCOUNT_SUBTREE_HEIGHT: usize = 6; + pub type TransitionFrontierSyncLedgerSnarkedActionWithMeta = redux::ActionWithMeta; pub type TransitionFrontierSyncLedgerSnarkedActionWithMetaRef<'a> = @@ -20,25 +25,64 @@ pub type TransitionFrontierSyncLedgerSnarkedActionWithMetaRef<'a> = pub enum TransitionFrontierSyncLedgerSnarkedAction { Pending, PeersQuery, - PeerQueryInit { + + // For NumAccounts query + PeerQueryNumAccountsInit { + peer_id: PeerId, + }, + PeerQueryNumAccountsPending { + peer_id: PeerId, + rpc_id: P2pRpcId, + }, + PeerQueryNumAccountsRetry { + peer_id: PeerId, + }, + PeerQueryNumAccountsError { + peer_id: PeerId, + rpc_id: P2pRpcId, + error: PeerLedgerQueryError, + }, + PeerQueryNumAccountsSuccess { + peer_id: PeerId, + rpc_id: P2pRpcId, + response: PeerLedgerQueryResponse, + }, + NumAccountsReceived { + num_accounts: u64, + contents_hash: LedgerHash, + sender: PeerId, + }, + NumAccountsAccepted { + num_accounts: u64, + contents_hash: LedgerHash, + sender: PeerId, + }, + NumAccountsRejected { + num_accounts: u64, + sender: PeerId, + }, + + // For child hashes and content queries + PeerQueryAddressInit { address: LedgerAddress, + expected_hash: LedgerHash, peer_id: PeerId, }, - PeerQueryPending { + PeerQueryAddressPending { address: LedgerAddress, peer_id: PeerId, rpc_id: P2pRpcId, }, - PeerQueryRetry { + PeerQueryAddressRetry { address: LedgerAddress, peer_id: PeerId, }, - PeerQueryError { + PeerQueryAddressError { peer_id: PeerId, rpc_id: P2pRpcId, error: PeerLedgerQueryError, }, - PeerQuerySuccess { + PeerQueryAddressSuccess { peer_id: PeerId, rpc_id: P2pRpcId, response: PeerLedgerQueryResponse, @@ -48,11 +92,30 @@ pub enum TransitionFrontierSyncLedgerSnarkedAction { hashes: (LedgerHash, LedgerHash), sender: PeerId, }, + ChildHashesAccepted { + address: LedgerAddress, + hashes: (LedgerHash, LedgerHash), + previous_hashes: (LedgerHash, LedgerHash), + }, + ChildHashesRejected { + address: LedgerAddress, + hashes: (LedgerHash, LedgerHash), + sender: PeerId, + }, ChildAccountsReceived { address: 
LedgerAddress, accounts: Vec, sender: PeerId, }, + ChildAccountsAccepted { + address: LedgerAddress, + count: u64, + sender: PeerId, + }, + ChildAccountsRejected { + address: LedgerAddress, + sender: PeerId, + }, Success, } @@ -65,21 +128,139 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar }) } TransitionFrontierSyncLedgerSnarkedAction::PeersQuery => { + // This condition passes if: + // - there are available peers to query + // - there is a snarked ledger to sync + // - there are either queued num_accounts or address queries + // or queries to retry let peers_available = state .p2p .ready_peers_iter() .any(|(_, p)| p.channels.rpc.can_send_request()); - peers_available - && state - .transition_frontier - .sync - .ledger() - .and_then(|s| s.snarked()) - .map_or(false, |s| { - s.sync_next().is_some() || s.sync_retry_iter().next().is_some() - }) + let sync_next_available = state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()) + .map_or(false, |s| { + s.is_num_accounts_query_next() + || s.sync_address_next().is_some() + || s.sync_address_retry_iter().next().is_some() + }); + peers_available && sync_next_available + } + + // num accounts + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsInit { peer_id } => { + None.or_else(|| { + let target_best_tip = state.transition_frontier.sync.best_tip()?; + let ledger = state.transition_frontier.sync.ledger()?.snarked()?; + let target = ledger.target(); + + // True if the next queued query is NumAccounts + let check_num_accounts = match ledger { + TransitionFrontierSyncLedgerSnarkedState::Pending { + pending_num_accounts: None, + .. + } => ledger.is_num_accounts_query_next(), + _ => false, + }; + + let peer = state.p2p.get_ready_peer(peer_id)?; + let check_peer_available = check_peer_available(peer, target, target_best_tip); + + Some(check_num_accounts && check_peer_available) + }) + .unwrap_or(false) + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsPending { + peer_id, + .. + } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.num_accounts_pending()) + .map_or(false, |pending| { + pending + .attempts + .get(peer_id) + .map(|peer_rpc_state| matches!(peer_rpc_state, PeerRpcState::Init { .. })) + .unwrap_or(false) + }), + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsRetry { peer_id } => { + None.or_else(|| { + let target_best_tip = state.transition_frontier.sync.best_tip()?; + let ledger = state.transition_frontier.sync.ledger()?.snarked()?; + let target = ledger.target(); + + // True if the next queued query is NumAccounts + let check_num_accounts = match ledger { + TransitionFrontierSyncLedgerSnarkedState::Pending { + pending_num_accounts: Some(_), + .. + } => ledger.is_num_accounts_query_next(), + _ => false, + }; + + let peer = state.p2p.get_ready_peer(peer_id)?; + let check_peer_available = check_peer_available(peer, target, target_best_tip); + + Some(check_num_accounts && check_peer_available) + }) + .unwrap_or(false) } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit { address, peer_id } => { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError { + peer_id, + rpc_id, + .. + } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()) + .map_or(false, |s| { + s.peer_num_account_query_get(peer_id, *rpc_id) + .and_then(|s| s.attempts.get(peer_id)) + .map_or(false, |s| matches!(s, PeerRpcState::Pending { .. 
})) + }), + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess { + peer_id, + rpc_id, + .. + } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()) + .map_or(false, |s| { + // TODO(tizoc): check if expected response kind is correct. + s.peer_num_account_query_get(peer_id, *rpc_id) + .and_then(|s| s.attempts.get(peer_id)) + .map_or(false, |s| matches!(s, PeerRpcState::Pending { .. })) + }), + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsReceived { sender, .. } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.num_accounts_pending()) + .and_then(|s| s.attempts.get(sender)) + .map_or(false, |s| s.is_success()), + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsAccepted { sender, .. } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.num_accounts_pending()) + .and_then(|s| s.attempts.get(sender)) + .map_or(false, |s| s.is_success()), + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsRejected { .. } => true, // TODO(sync): implement + + // hashes and contents + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit { + address, + peer_id, + expected_hash: _, + } => { None.or_else(|| { let target_best_tip = state.transition_frontier.sync.best_tip()?; let ledger = state.transition_frontier.sync.ledger()?.snarked()?; @@ -89,39 +270,34 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar // from a peer and it matches the one requested by this action. let check_next_addr = match ledger { TransitionFrontierSyncLedgerSnarkedState::Pending { - pending, - next_addr, + queue, + pending_addresses: pending, .. - } => next_addr.as_ref().map_or(false, |next_addr| { - next_addr == address - && (next_addr.to_index().0 != 0 || pending.is_empty()) + } => queue.front().map_or(false, |query| { + if let LedgerQueryQueued::Address { + address: next_addr, .. + } = query + { + next_addr == address + && (next_addr.to_index().0 != 0 || pending.is_empty()) + } else { + false + } }), _ => false, }; let peer = state.p2p.get_ready_peer(peer_id)?; - let check_peer_available = { - let peer_best_tip = peer.best_tip.as_ref()?; - if !peer.channels.rpc.can_send_request() { - false - } else if target.staged.is_some() { - // if peer has same best tip, then he has same root - // so we can sync root snarked+staged ledger from that peer. 
- target_best_tip.hash() == peer_best_tip.hash() - } else { - &target.snarked_ledger_hash == peer_best_tip.snarked_ledger_hash() - || &target.snarked_ledger_hash - == peer_best_tip.staking_epoch_ledger_hash() - || &target.snarked_ledger_hash - == peer_best_tip.next_epoch_ledger_hash() - } - }; + let check_peer_available = check_peer_available(peer, target, target_best_tip); Some(check_next_addr && check_peer_available) }) .unwrap_or(false) } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryRetry { address, peer_id } => { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressRetry { + address, + peer_id, + } => { None.or_else(|| { let target_best_tip = state.transition_frontier.sync.best_tip()?; let ledger = state.transition_frontier.sync.ledger()?.snarked()?; @@ -133,32 +309,19 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar .transition_frontier .sync .ledger() - .and_then(|s| s.snarked()?.sync_retry_iter().next()) + .and_then(|s| s.snarked()?.sync_address_retry_iter().next()) .map_or(false, |addr| &addr == address); let peer = state.p2p.get_ready_peer(peer_id)?; - let check_peer_available = { - let peer_best_tip = peer.best_tip.as_ref()?; - if !peer.channels.rpc.can_send_request() { - false - } else if target.staged.is_some() { - // if peer has same best tip, then he has same root - // so we can sync root snarked+staged ledger from that peer. - target_best_tip.hash() == peer_best_tip.hash() - } else { - &target.snarked_ledger_hash == peer_best_tip.snarked_ledger_hash() - || &target.snarked_ledger_hash - == peer_best_tip.staking_epoch_ledger_hash() - || &target.snarked_ledger_hash - == peer_best_tip.next_epoch_ledger_hash() - } - }; + let check_peer_available = check_peer_available(peer, target, target_best_tip); Some(check_next_addr && check_peer_available) }) .unwrap_or(false) } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryPending { peer_id, .. } => state + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressPending { + peer_id, .. + } => state .transition_frontier .sync .ledger() @@ -169,20 +332,24 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar .filter_map(|(_, query_state)| query_state.attempts.get(peer_id)) .any(|peer_rpc_state| matches!(peer_rpc_state, PeerRpcState::Init { .. })) }), - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { - peer_id, rpc_id, .. + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { + peer_id, + rpc_id, + .. } => state .transition_frontier .sync .ledger() .and_then(|s| s.snarked()) .map_or(false, |s| { - s.peer_query_get(peer_id, *rpc_id) + s.peer_address_query_get(peer_id, *rpc_id) .and_then(|(_, s)| s.attempts.get(peer_id)) .map_or(false, |s| matches!(s, PeerRpcState::Pending { .. })) }), - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { - peer_id, rpc_id, .. + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { + peer_id, + rpc_id, + .. } => { state .transition_frontier @@ -192,7 +359,7 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar .map_or(false, |s| { // TODO(binier): check if expected response // kind is correct. - s.peer_query_get(peer_id, *rpc_id) + s.peer_address_query_get(peer_id, *rpc_id) .and_then(|(_, s)| s.attempts.get(peer_id)) .map_or(false, |s| matches!(s, PeerRpcState::Pending { .. })) }) @@ -202,7 +369,7 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar sender, .. 
} => { - address.length() < LEDGER_DEPTH - 1 + address.length() < LEDGER_DEPTH - ACCOUNT_SUBTREE_HEIGHT && state .transition_frontier .sync @@ -211,21 +378,45 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar .and_then(|s| s.attempts.get(sender)) .map_or(false, |s| s.is_success()) } + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesAccepted { address, .. } => { + // The hashes have been received, and during the check the pending value must + // be present because the expected hash is there + address.length() < LEDGER_DEPTH - ACCOUNT_SUBTREE_HEIGHT + && state + .transition_frontier + .sync + .ledger() + .and_then(|s| Some(s.snarked()?.fetch_pending()?.contains_key(address))) + .unwrap_or(false) + } + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesRejected { .. } => true, // TODO(sync): implement TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsReceived { address, sender, .. + } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.fetch_pending()?.get(address)) + .and_then(|s| s.attempts.get(sender)) + .map_or(false, |s| s.is_success()), + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsAccepted { + address, + count, + sender, } => { - state - .transition_frontier - .sync - .ledger() - .and_then(|s| s.snarked()?.fetch_pending()?.get(address)) - .and_then(|s| s.attempts.get(sender)) - // TODO(binier): check if expected response - // kind is correct. - .map_or(false, |s| s.is_success()) + *count > 0 + && state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.fetch_pending()?.get(address)) + .and_then(|s| s.attempts.get(sender)) + // TODO(tizoc): check if expected response kind is correct. + .map_or(false, |s| s.is_success()) } + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsRejected { .. } => true, // TODO(sync): implement TransitionFrontierSyncLedgerSnarkedAction::Success => state .transition_frontier .sync @@ -233,16 +424,42 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar .and_then(|s| s.snarked()) .map_or(false, |s| match s { TransitionFrontierSyncLedgerSnarkedState::Pending { - pending, - next_addr, + queue, + pending_addresses: pending, .. - } => next_addr.is_none() && pending.is_empty(), + } => queue.is_empty() && pending.is_empty(), _ => false, }), } } } +fn check_peer_available( + peer: &p2p::P2pPeerStatusReady, + target: &crate::transition_frontier::sync::ledger::SyncLedgerTarget, + target_best_tip: &openmina_core::block::BlockWithHash< + std::sync::Arc, + >, +) -> bool { + None.or_else(|| { + let peer_best_tip = peer.best_tip.as_ref()?; + let available = if !peer.channels.rpc.can_send_request() { + false + } else if target.staged.is_some() { + // if peer has same best tip, then he has same root + // so we can sync root snarked+staged ledger from that peer. 
+ target_best_tip.hash() == peer_best_tip.hash() + } else { + &target.snarked_ledger_hash == peer_best_tip.snarked_ledger_hash() + || &target.snarked_ledger_hash == peer_best_tip.staking_epoch_ledger_hash() + || &target.snarked_ledger_hash == peer_best_tip.next_epoch_ledger_hash() + }; + + Some(available) + }) + .unwrap_or(false) +} + use crate::transition_frontier::{ sync::{ledger::TransitionFrontierSyncLedgerAction, TransitionFrontierSyncAction}, TransitionFrontierAction, diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_effects.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_effects.rs index b756eaa21..8d4d9f23a 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_effects.rs +++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_effects.rs @@ -3,15 +3,46 @@ use p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcRequest}; use p2p::PeerId; use redux::ActionMeta; -use crate::ledger::{LedgerAddress, LEDGER_DEPTH}; +use crate::ledger::{hash_node_at_depth, LedgerAddress, LEDGER_DEPTH}; use crate::Store; use super::{ PeerLedgerQueryResponse, TransitionFrontierSyncLedgerSnarkedAction, - TransitionFrontierSyncLedgerSnarkedService, + TransitionFrontierSyncLedgerSnarkedService, ACCOUNT_SUBTREE_HEIGHT, }; -fn query_peer_init( +fn peer_query_num_accounts_init(store: &mut Store, peer_id: PeerId) { + let Some((ledger_hash, rpc_id)) = None.or_else(|| { + let state = store.state(); + let ledger = state.transition_frontier.sync.ledger()?; + let ledger_hash = ledger.snarked()?.ledger_hash(); + + let p = state.p2p.get_ready_peer(&peer_id)?; + let rpc_id = p.channels.rpc.next_local_rpc_id(); + + Some((ledger_hash.clone(), rpc_id)) + }) else { + return; + }; + + if store.dispatch(P2pChannelsRpcAction::RequestSend { + peer_id, + id: rpc_id, + request: P2pRpcRequest::LedgerQuery( + ledger_hash, + MinaLedgerSyncLedgerQueryStableV1::NumAccounts, + ), + }) { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsPending { + peer_id, + rpc_id, + }, + ); + } +} + +fn peer_query_address_init( store: &mut Store, peer_id: PeerId, address: LedgerAddress, @@ -29,7 +60,7 @@ fn query_peer_init( return; }; - let query = if address.length() >= LEDGER_DEPTH - 1 { + let query = if address.length() >= LEDGER_DEPTH - ACCOUNT_SUBTREE_HEIGHT { MinaLedgerSyncLedgerQueryStableV1::WhatContents(address.clone().into()) } else { MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(address.clone().into()) @@ -41,7 +72,7 @@ fn query_peer_init( request: P2pRpcRequest::LedgerQuery(ledger_hash, query), }) { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryPending { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressPending { address, peer_id, rpc_id, @@ -70,19 +101,35 @@ impl TransitionFrontierSyncLedgerSnarkedAction { .collect::>(); peer_ids.sort_by(|(_, t1), (_, t2)| t2.cmp(t1)); + // If this dispatches, we can avoid even trying the following steps because we will + // not query address unless we have completed the Num_accounts request first. 
+ if let Some((peer_id, _)) = peer_ids.first() { + if store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsInit { + peer_id: *peer_id, + }, + ) || store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsRetry { + peer_id: *peer_id, + }, + ) { + return; + } + } + let mut retry_addresses = store .state() .transition_frontier .sync .ledger() .and_then(|s| s.snarked()) - .map_or(vec![], |s| s.sync_retry_iter().collect()); + .map_or(vec![], |s| s.sync_address_retry_iter().collect()); retry_addresses.reverse(); for (peer_id, _) in peer_ids { if let Some(address) = retry_addresses.last() { if store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryRetry { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressRetry { peer_id, address: address.clone(), }, @@ -98,12 +145,14 @@ impl TransitionFrontierSyncLedgerSnarkedAction { .sync .ledger() .and_then(|s| s.snarked()) - .and_then(|s| s.sync_next()); + .and_then(|s| s.sync_address_next()); match address { - Some(address) => { + Some((address, expected_hash)) => { + // This dispatch here will pop from the queue and update sync_next store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit { peer_id, + expected_hash, address, }, ); @@ -113,23 +162,113 @@ impl TransitionFrontierSyncLedgerSnarkedAction { } } } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit { peer_id, address } => { - query_peer_init(store, *peer_id, address.clone()); + + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsInit { peer_id } => { + peer_query_num_accounts_init(store, *peer_id) } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryRetry { peer_id, address } => { - query_peer_init(store, *peer_id, address.clone()); + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsPending { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsRetry { peer_id } => { + peer_query_num_accounts_init(store, *peer_id) } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { .. } => { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError { .. } => { store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); } - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess { + peer_id, + response, + .. + } => { + match response { + PeerLedgerQueryResponse::NumAccounts(count, contents_hash) => { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsReceived { + num_accounts: *count, + contents_hash: contents_hash.clone(), + sender: *peer_id, + }, + ); + } + // TODO(tizoc): These shouldn't happen, log some warning or something + PeerLedgerQueryResponse::ChildHashes(_, _) => {} + PeerLedgerQueryResponse::ChildAccounts(_) => {} + } + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsReceived { + num_accounts, + contents_hash, + sender, + } => { + let Some(snarked_ledger_hash) = None.or_else(|| { + let snarked_ledger = + store.state().transition_frontier.sync.ledger()?.snarked()?; + Some(snarked_ledger.ledger_hash().clone()) + }) else { + return; + }; + + // Given the claimed number of accounts we can figure out the height of the subtree, + // and compute the root hash assuming all other nodes contain empty hashes. + // The result must match the snarked ledger hash for this response to be considered + // valid. 
+ // NOTE: incorrect account numbers may be accepted (if they fall in the same range) + // because what is actually being validated is the content hash and tree height, + // not the actual number of accounts. + let actual_hash = crate::ledger::complete_num_accounts_tree_with_empties( + contents_hash, + *num_accounts, + ); + + if snarked_ledger_hash == actual_hash { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsAccepted { + num_accounts: *num_accounts, + contents_hash: contents_hash.clone(), + sender: *sender, + }, + ); + } else { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsRejected { + num_accounts: *num_accounts, + sender: *sender, + }, + ); + } + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsAccepted { .. } => { + if !store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery) { + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::Success); + } + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsRejected { .. } => { + // TODO(tizoc): we do nothing here, but the peer must be punished somehow + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); + } + + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit { + peer_id, + address, + expected_hash: _, + } => { + peer_query_address_init(store, *peer_id, address.clone()); + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressRetry { + peer_id, + address, + } => { + peer_query_address_init(store, *peer_id, address.clone()); + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { .. } => { + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { peer_id, rpc_id, response, } => { let ledger = store.state().transition_frontier.sync.ledger(); let Some(address) = ledger - .and_then(|s| s.snarked()?.peer_query_get(peer_id, *rpc_id)) + .and_then(|s| s.snarked()?.peer_address_query_get(peer_id, *rpc_id)) .map(|(addr, _)| addr.clone()) else { return; @@ -154,49 +293,135 @@ impl TransitionFrontierSyncLedgerSnarkedAction { }, ); } + // TODO(tizoc): This shouldn't happen, log some warning or something + PeerLedgerQueryResponse::NumAccounts(_, _) => {} } } TransitionFrontierSyncLedgerSnarkedAction::ChildHashesReceived { address, - hashes, + hashes: (left_hash, right_hash), + sender, .. } => { - let Some(snarked_ledger_hash) = None.or_else(|| { - let ledger = store.state().transition_frontier.sync.ledger()?; - Some(ledger.snarked()?.ledger_hash().clone()) + let Some((snarked_ledger_hash, parent_hash)) = None.or_else(|| { + let snarked_ledger = + store.state().transition_frontier.sync.ledger()?.snarked()?; + let parent_hash = snarked_ledger + .fetch_pending()? + .get(address)? 
+ .expected_hash + .clone(); + Some((snarked_ledger.ledger_hash().clone(), parent_hash)) }) else { return; }; - store - .service - .hashes_set(snarked_ledger_hash, address, hashes.clone()) + + let actual_hash = hash_node_at_depth( + address.length(), + left_hash.0.to_field(), + right_hash.0.to_field(), + ); + if actual_hash != parent_hash.0.to_field() { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesRejected { + address: address.clone(), + hashes: (left_hash.clone(), right_hash.clone()), + sender: *sender, + }, + ); + + return; + } + + // TODO: for async ledger this needs an intermediary action + let (previous_left_hash, previous_right_hash) = store + .service() + .child_hashes_get(snarked_ledger_hash, address) .unwrap(); + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesAccepted { + address: address.clone(), + hashes: (left_hash.clone(), right_hash.clone()), + previous_hashes: (previous_left_hash, previous_right_hash), + }, + ); + } + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesAccepted { .. } => { if !store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery) { store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::Success); } } + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesRejected { .. } => { + // TODO(tizoc): we do nothing here, but the peer must be punished somehow + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); + } TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsReceived { address, accounts, - .. + sender, } => { - let Some(snarked_ledger_hash) = None.or_else(|| { - let ledger = store.state().transition_frontier.sync.ledger()?; - Some(ledger.snarked()?.ledger_hash().clone()) + let Some((snarked_ledger_hash, parent_hash)) = None.or_else(|| { + let snarked_ledger = + store.state().transition_frontier.sync.ledger()?.snarked()?; + Some(( + snarked_ledger.ledger_hash().clone(), + snarked_ledger + .fetch_pending()? + .get(address)? + .expected_hash + .clone(), + )) }) else { return; }; - store + + // After setting the accounts, we get the new computed hash. + // It must be equal to the parent node hash, otherwise we got + // bad data from the peer. + let computed_hash = store .service - .accounts_set(snarked_ledger_hash, address, accounts.clone()) + .accounts_set(snarked_ledger_hash.clone(), address, accounts.clone()) .unwrap(); + if computed_hash != parent_hash { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsRejected { + address: address.clone(), + sender: *sender, + }, + ); + return; + } + + // Setting accounts doesn't immediately compute the hashes in the merkle tree, + // so we force that here before continuing. + let compute_hashes_result = store + .service + .compute_snarked_ledger_hashes(&snarked_ledger_hash); + + if let Err(_) = compute_hashes_result { + // TODO(tizoc): log this error + } + + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsAccepted { + address: address.clone(), + count: accounts.len() as u64, + sender: *sender, + }, + ); + } + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsAccepted { .. } => { if !store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery) { store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::Success); } } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryPending { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsRejected { .. 
} => { + // TODO(tizoc): we do nothing here, but the peer must be punished somehow + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressPending { .. } => {} TransitionFrontierSyncLedgerSnarkedAction::Success => {} } } diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_reducer.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_reducer.rs index 9445ef48e..b027c038a 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_reducer.rs +++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_reducer.rs @@ -1,7 +1,12 @@ -use crate::ledger::{ledger_empty_hash_at_depth, LEDGER_DEPTH}; +use crate::{ + ledger::{ledger_empty_hash_at_depth, tree_height_for_num_accounts, LEDGER_DEPTH}, + transition_frontier::sync::ledger::snarked::{ + LedgerNumAccountsQueryPending, LedgerQueryQueued, + }, +}; use super::{ - LedgerQueryPending, PeerRpcState, TransitionFrontierSyncLedgerSnarkedAction, + LedgerAddressQueryPending, PeerRpcState, TransitionFrontierSyncLedgerSnarkedAction, TransitionFrontierSyncLedgerSnarkedActionWithMetaRef, TransitionFrontierSyncLedgerSnarkedState, }; @@ -13,18 +18,143 @@ impl TransitionFrontierSyncLedgerSnarkedState { // handled in parent reducer. } TransitionFrontierSyncLedgerSnarkedAction::PeersQuery => {} - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit { address, peer_id } => { + + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsInit { peer_id } => { + if let Self::Pending { + queue, + pending_num_accounts, + .. + } = self + { + let next = queue.pop_front(); + debug_assert!(matches!(next, Some(LedgerQueryQueued::NumAccounts))); + + *pending_num_accounts = Some(LedgerNumAccountsQueryPending { + time: meta.time(), + attempts: std::iter::once(( + *peer_id, + PeerRpcState::Init { time: meta.time() }, + )) + .collect(), + }); + } + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsPending { + peer_id, + rpc_id, + } => { + let Self::Pending { + pending_num_accounts: Some(pending), + .. + } = self + else { + return; + }; + + let Some(rpc_state) = pending.attempts.get_mut(peer_id) else { + return; + }; + + *rpc_state = PeerRpcState::Pending { + time: meta.time(), + rpc_id: *rpc_id, + }; + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsRetry { peer_id } => { if let Self::Pending { - pending, - next_addr, - end_addr, + pending_num_accounts: Some(pending), .. } = self { + pending + .attempts + .insert(*peer_id, PeerRpcState::Init { time: meta.time() }); + } + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError { + peer_id, + rpc_id, + error, + } => { + let Some(rpc_state) = self.peer_num_account_query_state_get_mut(peer_id, *rpc_id) + else { + return; + }; + + *rpc_state = PeerRpcState::Error { + time: meta.time(), + rpc_id: *rpc_id, + error: error.clone(), + }; + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess { + peer_id, + rpc_id, + .. + } => { + let Some(rpc_state) = self.peer_num_account_query_state_get_mut(peer_id, *rpc_id) + else { + return; + }; + *rpc_state = PeerRpcState::Success { + time: meta.time(), + rpc_id: *rpc_id, + }; + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsReceived { .. 
} => {} + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsAccepted { + num_accounts: accepted_num_accounts, + contents_hash, + .. + } => { + let Self::Pending { + pending_num_accounts, + num_accounts, + num_accounts_accepted, + num_hashes_accepted, + queue, + .. + } = self + else { + return; + }; + + *num_accounts = *accepted_num_accounts; + *num_accounts_accepted = 0; + *num_hashes_accepted = 0; + *pending_num_accounts = None; + + // We know at which node to begin querying, so we skip all the intermediary depths + queue.push_back(LedgerQueryQueued::Address { + address: ledger::Address::first( + LEDGER_DEPTH - tree_height_for_num_accounts(*num_accounts), + ), + expected_hash: contents_hash.clone(), + }); + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsRejected { .. } => { + // TODO(tizoc): should this be reflected in the state somehow? + } + + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit { + address, + expected_hash, + peer_id, + } => { + if let Self::Pending { + queue, + pending_addresses: pending, + .. + } = self + { + let _next = queue.pop_front(); + //debug_assert_eq!(next.as_ref().map(|p| &p.0), Some(address)); + pending.insert( address.clone(), - LedgerQueryPending { + LedgerAddressQueryPending { time: meta.time(), + expected_hash: expected_hash.clone(), attempts: std::iter::once(( *peer_id, PeerRpcState::Init { time: meta.time() }, @@ -32,30 +162,17 @@ impl TransitionFrontierSyncLedgerSnarkedState { .collect(), }, ); - *next_addr = next_addr - .as_ref() - .map(|addr| { - addr.next() - .filter(|addr| { - let mut end_addr = end_addr.clone(); - while end_addr.length() < addr.length() { - end_addr = end_addr.child_right(); - } - while end_addr.length() > addr.length() { - let Some(addr) = end_addr.parent() else { - return true; - }; - end_addr = addr; - } - addr <= &end_addr - }) - .unwrap_or_else(|| addr.next_depth()) - }) - .filter(|addr| addr.length() < LEDGER_DEPTH); } } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryRetry { address, peer_id } => { - if let Self::Pending { pending, .. } = self { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressRetry { + address, + peer_id, + } => { + if let Self::Pending { + pending_addresses: pending, + .. + } = self + { if let Some(pending) = pending.get_mut(address) { pending .attempts @@ -63,12 +180,16 @@ impl TransitionFrontierSyncLedgerSnarkedState { } } } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryPending { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressPending { address, peer_id, rpc_id, } => { - let Self::Pending { pending, .. } = self else { + let Self::Pending { + pending_addresses: pending, + .. + } = self + else { return; }; let Some(rpc_state) = pending @@ -83,12 +204,13 @@ impl TransitionFrontierSyncLedgerSnarkedState { rpc_id: *rpc_id, }; } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { peer_id, rpc_id, error, } => { - let Some(rpc_state) = self.peer_query_get_mut(peer_id, *rpc_id) else { + let Some(rpc_state) = self.peer_address_query_state_get_mut(peer_id, *rpc_id) + else { return; }; @@ -98,10 +220,13 @@ impl TransitionFrontierSyncLedgerSnarkedState { error: error.clone(), }; } - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { - peer_id, rpc_id, .. + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { + peer_id, + rpc_id, + .. 
} => { - let Some(rpc_state) = self.peer_query_get_mut(peer_id, *rpc_id) else { + let Some(rpc_state) = self.peer_address_query_state_get_mut(peer_id, *rpc_id) + else { return; }; *rpc_state = PeerRpcState::Success { @@ -109,48 +234,74 @@ impl TransitionFrontierSyncLedgerSnarkedState { rpc_id: *rpc_id, }; } - TransitionFrontierSyncLedgerSnarkedAction::ChildHashesReceived { + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesReceived { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesAccepted { address, hashes, - .. + previous_hashes, } => { let Self::Pending { - pending, - next_addr, - end_addr, + queue, + pending_addresses: pending, + num_hashes_accepted, .. } = self else { return; }; - let addr = address; - pending.remove(&addr); + + // Once hashes are accepted, we can consider this query fulfilled + pending.remove(address); + let (left, right) = hashes; + let (previous_left, previous_right) = previous_hashes; - let empty_hash = ledger_empty_hash_at_depth(addr.length() + 1); - if right == &empty_hash { - *next_addr = - Some(addr.next_depth()).filter(|addr| addr.length() < LEDGER_DEPTH); - let addr = match left == &empty_hash { - true => addr.child_left(), - false => addr.child_right(), - }; - if addr.length() > end_addr.length() - || (addr.length() == end_addr.length() - && addr.to_index() < end_addr.to_index()) - { - *end_addr = addr.prev().unwrap_or(addr); - } + // TODO(tizoc): for non-stale hashes, we can consider the full subtree + // as accepted. Given the value of `num_accounts` and the position + // in the tree we could estimate how many accounts and hashes + // from that subtree will be skipped and add them to the count. + + // Empty node hashes are not counted in the stats. + let empty = ledger_empty_hash_at_depth(address.length() + 1); + *num_hashes_accepted += (*left != empty) as u64 + (*right != empty) as u64; + + if left != previous_left { + queue.push_back(LedgerQueryQueued::Address { + address: address.child_left(), + expected_hash: left.clone(), + }); + } + if right != previous_right { + queue.push_back(LedgerQueryQueued::Address { + address: address.child_right(), + expected_hash: right.clone(), + }); } } - TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsReceived { - address, .. + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesRejected { .. } => { + // TODO(tizoc): should this be reflected in the state somehow? + } + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsReceived { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsAccepted { + address, + count, + .. } => { - let Self::Pending { pending, .. } = self else { + let Self::Pending { + pending_addresses: pending, + num_accounts_accepted, + .. + } = self + else { return; }; + + *num_accounts_accepted += count; pending.remove(address); } + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsRejected { .. } => { + // TODO(tizoc): should this be reflected in the state somehow? + } TransitionFrontierSyncLedgerSnarkedAction::Success => { let Self::Pending { target, .. 
                 let Self::Pending { target, .. } = self else {
                     return;
diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_service.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_service.rs
index 618efb53f..caf206074 100644
--- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_service.rs
+++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_service.rs
@@ -3,17 +3,38 @@ use mina_p2p_messages::v2::{LedgerHash, MinaBaseAccountBinableArgStableV2};
 use crate::ledger::LedgerAddress;
 
 pub trait TransitionFrontierSyncLedgerSnarkedService: redux::Service {
-    fn hashes_set(
+    /// For the given ledger, compute the merkle root hash, forcing
+    /// all pending hashes to be computed too.
+    fn compute_snarked_ledger_hashes(
+        &mut self,
+        snarked_ledger_hash: &LedgerHash,
+    ) -> Result<(), String>;
+
+    /// Creates a new copy of the ledger stored under the `origin` hash
+    /// and stores it under the `target` hash. If `overwrite` is false,
+    /// only copy the ledger if the target doesn't exist already.
+    fn copy_snarked_ledger_contents(
+        &mut self,
+        origin: LedgerHash,
+        target: LedgerHash,
+        overwrite: bool,
+    ) -> Result<bool, String>;
+
+    /// For the given ledger, get the two children hashes at the `parent`
+    /// address.
+    fn child_hashes_get(
         &mut self,
         snarked_ledger_hash: LedgerHash,
         parent: &LedgerAddress,
-        hashes: (LedgerHash, LedgerHash),
-    ) -> Result<(), String>;
+    ) -> Result<(LedgerHash, LedgerHash), String>;
 
+    /// For the given ledger, sets all accounts in `accounts` under
+    /// the subtree starting at the `parent` address. The result
+    /// is the hash computed for that subtree.
     fn accounts_set(
         &mut self,
         snarked_ledger_hash: LedgerHash,
         parent: &LedgerAddress,
         accounts: Vec<MinaBaseAccountBinableArgStableV2>,
-    ) -> Result<(), ()>;
+    ) -> Result<LedgerHash, String>;
 }
diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_state.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_state.rs
index c105fee0a..8b08e7556 100644
--- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_state.rs
+++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_state.rs
@@ -1,18 +1,18 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, VecDeque};
 use mina_p2p_messages::v2::LedgerHash;
 use redux::Timestamp;
 use serde::{Deserialize, Serialize};
-use crate::ledger::LedgerAddress;
+use crate::ledger::{tree_height_for_num_accounts, LedgerAddress};
 use crate::p2p::channels::rpc::P2pRpcId;
 use crate::p2p::PeerId;
 use crate::rpc::LedgerSyncProgress;
 use crate::transition_frontier::sync::ledger::SyncLedgerTarget;
-use super::PeerLedgerQueryError;
+use super::{PeerLedgerQueryError, ACCOUNT_SUBTREE_HEIGHT};
-static SYNC_PENDING_EMPTY: BTreeMap<LedgerAddress, LedgerQueryPending> = BTreeMap::new();
+static SYNC_PENDING_EMPTY: BTreeMap<LedgerAddress, LedgerAddressQueryPending> = BTreeMap::new();
 
 #[serde_with::serde_as]
 #[derive(Serialize, Deserialize, Debug, Clone)]
@@ -21,12 +21,16 @@ pub enum TransitionFrontierSyncLedgerSnarkedState {
     Pending {
         time: Timestamp,
         target: SyncLedgerTarget,
-
+        num_accounts: u64,
+        num_accounts_accepted: u64,
+        num_hashes_accepted: u64,
+        /// Queue of addresses to query and the expected contents hash
+        queue: VecDeque<LedgerQueryQueued>,
+        /// Pending ongoing address queries and their attempts
         #[serde_as(as = "Vec<(_, _)>")]
-        pending: BTreeMap<LedgerAddress, LedgerQueryPending>,
-        /// `None` means we are done.
-        next_addr: Option<LedgerAddress>,
-        end_addr: LedgerAddress,
+        pending_addresses: BTreeMap<LedgerAddress, LedgerAddressQueryPending>,
+        /// Pending num account query attempts
+        pending_num_accounts: Option<LedgerNumAccountsQueryPending>,
     },
     Success {
         time: Timestamp,
@@ -35,7 +39,23 @@ pub enum TransitionFrontierSyncLedgerSnarkedState {
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct LedgerQueryPending {
+pub enum LedgerQueryQueued {
+    NumAccounts,
+    Address {
+        address: LedgerAddress,
+        expected_hash: LedgerHash,
+    },
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct LedgerAddressQueryPending {
+    pub time: Timestamp,
+    pub expected_hash: LedgerHash,
+    pub attempts: BTreeMap<PeerId, PeerRpcState>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct LedgerNumAccountsQueryPending {
     pub time: Timestamp,
     pub attempts: BTreeMap<PeerId, PeerRpcState>,
 }
@@ -68,6 +88,15 @@ impl PeerRpcState {
         }
     }
 
+    pub fn rpc_id(&self) -> Option<P2pRpcId> {
+        match self {
+            Self::Init { .. } => None,
+            Self::Pending { rpc_id, .. } => Some(*rpc_id),
+            Self::Error { rpc_id, .. } => Some(*rpc_id),
+            Self::Success { rpc_id, .. } => Some(*rpc_id),
+        }
+    }
+
     pub fn is_pending(&self) -> bool {
         matches!(self, Self::Pending { .. })
     }
@@ -86,9 +115,12 @@ impl TransitionFrontierSyncLedgerSnarkedState {
         Self::Pending {
             time,
             target,
-            pending: Default::default(),
-            next_addr: Some(LedgerAddress::root()),
-            end_addr: LedgerAddress::root(),
+            num_accounts: 0,
+            num_accounts_accepted: 0,
+            num_hashes_accepted: 0,
+            queue: vec![LedgerQueryQueued::NumAccounts].into(),
+            pending_addresses: Default::default(),
+            pending_num_accounts: Default::default(),
         }
     }
 
@@ -103,16 +135,39 @@ impl TransitionFrontierSyncLedgerSnarkedState {
         &self.target().snarked_ledger_hash
     }
 
-    pub fn fetch_pending(&self) -> Option<&BTreeMap<LedgerAddress, LedgerQueryPending>> {
+    pub fn is_num_accounts_query_next(&self) -> bool {
+        match self {
+            Self::Pending { queue, .. } => queue
+                .front()
+                .map_or(false, |q| matches!(q, LedgerQueryQueued::NumAccounts)),
+            _ => false,
+        }
+    }
+
+    pub fn num_accounts_pending(&self) -> Option<&LedgerNumAccountsQueryPending> {
         match self {
-            Self::Pending { pending, .. } => Some(pending),
+            Self::Pending {
+                pending_num_accounts,
+                ..
+            } => pending_num_accounts.as_ref(),
             _ => None,
         }
     }
 
-    pub fn sync_retry_iter(&self) -> impl '_ + Iterator<Item = LedgerAddress> {
+    pub fn fetch_pending(&self) -> Option<&BTreeMap<LedgerAddress, LedgerAddressQueryPending>> {
+        match self {
+            Self::Pending {
+                pending_addresses, ..
+            } => Some(pending_addresses),
+            _ => None,
+        }
+    }
+
+    pub fn sync_address_retry_iter(&self) -> impl '_ + Iterator<Item = LedgerAddress> {
         let pending = match self {
-            Self::Pending { pending, .. } => pending,
+            Self::Pending {
+                pending_addresses, ..
+            } => pending_addresses,
             _ => &SYNC_PENDING_EMPTY,
         };
         pending
@@ -121,113 +176,131 @@ impl TransitionFrontierSyncLedgerSnarkedState {
             .map(|(addr, _)| addr.clone())
     }
 
-    pub fn sync_next(&self) -> Option<LedgerAddress> {
+    pub fn sync_address_next(&self) -> Option<(LedgerAddress, LedgerHash)> {
         match self {
-            Self::Pending { next_addr, .. } => next_addr.clone(),
+            Self::Pending { queue, .. } => match queue.front().map(|a| a.clone()) {
+                Some(LedgerQueryQueued::Address {
+                    address,
+                    expected_hash,
+                }) => Some((address, expected_hash)),
+                _ => None,
+            },
             _ => None,
         }
     }
 
     pub fn estimation(&self) -> Option<LedgerSyncProgress> {
-        const BITS: usize = 35;
-
-        if let Self::Success { .. } = self {
-            return Some(LedgerSyncProgress {
-                fetched: 1,
-                estimation: 1,
-            });
+        match self {
+            TransitionFrontierSyncLedgerSnarkedState::Pending {
+                num_accounts,
+                num_accounts_accepted,
+                num_hashes_accepted,
+                ..
+            } if *num_accounts > 0 => {
+                // TODO(tizoc): this approximation is very rough, could be improved.
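+                // Illustrative sketch of the formula below (assuming
+                // `tree_height_for_num_accounts` is roughly ceil(log2(num_accounts))):
+                // with num_accounts = 100_000 the height would be 17, so the fill ratio is
+                // about 100_000 / 2^17 ~= 0.76, and the hash estimate scales
+                // 2^(height - ACCOUNT_SUBTREE_HEIGHT) by that ratio.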
+                // Also we count elements to be fetched and not requests to be made, which
+                // would be more accurate (accounts are fetched in groups of 64, hashes in groups of 2).
+                let tree_height = tree_height_for_num_accounts(*num_accounts);
+                let fill_ratio = (*num_accounts as f64) / 2f64.powf(tree_height as f64);
+                let num_hashes_estimate = 2u64.pow((tree_height - ACCOUNT_SUBTREE_HEIGHT) as u32);
+                let num_hashes_estimate = (num_hashes_estimate as f64 * fill_ratio).ceil() as u64;
+                let fetched = *num_accounts_accepted + num_hashes_accepted;
+                let estimation = fetched.max(*num_accounts + num_hashes_estimate);
+
+                Some(LedgerSyncProgress {
+                    fetched,
+                    estimation,
+                })
+            }
+            TransitionFrontierSyncLedgerSnarkedState::Success { .. } => {
+                return Some(LedgerSyncProgress {
+                    fetched: 1,
+                    estimation: 1,
+                })
+            }
+            _ => None,
         }
-
-        let Self::Pending {
-            next_addr,
-            end_addr,
-            ..
-        } = self
-        else {
-            return None;
-        };
-
-        let next_addr = next_addr.as_ref()?;
-
-        let current_length = next_addr.length();
-
-        // The ledger is a binary tree, it synchronizes layer by layer, the next layer is at most
-        // twice as big as this layer, but can be smaller (by one). For simplicity, let's call
-        // a branch or a leaf of the tree a tree element (or an element) and make no distinction.
-        // This doesn't matter for the estimation. Total 35 layers (0, 1, 2, ..., 34). On the last
-        // layer there could be 2 ^ 34 items. Of course it is much less than that. So the first
-        // few layers contain only 1 element.
-
-        // When the sync algorithm asks childs on the item, it gets two values, left and right.
-        // The algorithm asks childs only on the existing item, so the left child must exist.
-        // But the right child can be missing. In this case it is marked by the special constant.
-        // If the sync algorithm encounters a non-existent right child, it sets `end_addr`
-        // to the address of the left sibling (the last existing element of this layer).
-
-        // The `end_addr` is initialized with layer is zero and position is zero (root).
-
-        // Let it be the first non-existent (right child).
-        // In extereme case it will be right sibling of root, so layer is zero and position is one.
-        // Therefore, further `estimated_this_layer` cannot be zero.
-        let estimated_end_addr = end_addr.next().unwrap_or(end_addr.clone());
-
-        // The chance of `end_addr` being updated during fetching the layer is 50%, so its length
-        // (number of layers) may be less than the current layer. Let's calculate end address
-        // at the current layer.
-        let estimated_this_layer =
-            estimated_end_addr.to_index().0 << (current_length - estimated_end_addr.length());
-
-        // The number of items on the previous layer is twice less than the number of items
-        // on this layer, but cannot be 0.
-        let prev_layers = (0..current_length)
-            .map(|layer| (estimated_this_layer >> (current_length - layer)).max(1))
-            .sum::();
-
-        // Number of layers pending.
-        let further_layers_number = BITS - 1 - current_length;
-        // Assume the next layer contains twice as many, but it could be twice as many minus one.
-        // So the estimate may become smaller.
-        let estimated_next_layer = estimated_this_layer * 2;
-        // Sum of powers of 2 is power of 2 minus 1
-        let estimated_next_layers = ((1 << further_layers_number) - 1) * estimated_next_layer;
-
-        // We have this many elements on this layer. Add one, because address indexes start at 0.
-        let this_layer = next_addr.to_index().0 + 1;
-
-        Some(LedgerSyncProgress {
-            fetched: prev_layers + this_layer,
-            estimation: prev_layers + estimated_this_layer + estimated_next_layers,
-        })
     }
-    pub fn peer_query_get(
+    pub fn peer_num_account_query_get(
         &self,
         peer_id: &PeerId,
         rpc_id: P2pRpcId,
-    ) -> Option<(&LedgerAddress, &LedgerQueryPending)> {
+    ) -> Option<&LedgerNumAccountsQueryPending> {
         match self {
-            Self::Pending { pending, .. } => {
+            Self::Pending {
+                pending_num_accounts: Some(pending),
+                ..
+            } => {
                 let expected_rpc_id = rpc_id;
-                pending.iter().find(|(_, s)| {
-                    s.attempts.get(peer_id).map_or(false, |s| match s {
+                pending.attempts.get(peer_id).and_then(|s| {
+                    if s.rpc_id()? == expected_rpc_id {
+                        Some(pending)
+                    } else {
+                        None
+                    }
+                })
+            }
+            _ => None,
+        }
+    }
+
+    pub fn peer_num_account_query_state_get_mut(
+        &mut self,
+        peer_id: &PeerId,
+        rpc_id: P2pRpcId,
+    ) -> Option<&mut PeerRpcState> {
+        match self {
+            Self::Pending {
+                pending_num_accounts,
+                ..
+            } => {
+                let expected_rpc_id = rpc_id;
+                pending_num_accounts
+                    .as_mut()?
+                    .attempts
+                    .get_mut(peer_id)
+                    .filter(|s| match s {
                         PeerRpcState::Pending { rpc_id, .. } => *rpc_id == expected_rpc_id,
                         PeerRpcState::Error { rpc_id, .. } => *rpc_id == expected_rpc_id,
                         PeerRpcState::Success { rpc_id, .. } => *rpc_id == expected_rpc_id,
                         _ => false,
                     })
+            }
+            _ => None,
+        }
+    }
+
+    pub fn peer_address_query_get(
+        &self,
+        peer_id: &PeerId,
+        rpc_id: P2pRpcId,
+    ) -> Option<(&LedgerAddress, &LedgerAddressQueryPending)> {
+        match self {
+            Self::Pending {
+                pending_addresses, ..
+            } => {
+                let expected_rpc_id = rpc_id;
+                pending_addresses.iter().find(|(_, s)| {
+                    s.attempts
+                        .get(peer_id)
+                        .map_or(false, |s| s.rpc_id() == Some(expected_rpc_id))
                 })
             }
             _ => None,
         }
     }
 
-    pub fn peer_query_get_mut(
+    pub fn peer_address_query_state_get_mut(
         &mut self,
         peer_id: &PeerId,
         rpc_id: P2pRpcId,
     ) -> Option<&mut PeerRpcState> {
         match self {
-            Self::Pending { pending, .. } => {
+            Self::Pending {
+                pending_addresses: pending,
+                ..
+            } => {
                 let expected_rpc_id = rpc_id;
                 pending.iter_mut().find_map(|(_, s)| {
                     s.attempts.get_mut(peer_id).filter(|s| match s {
@@ -242,12 +315,14 @@ impl TransitionFrontierSyncLedgerSnarkedState {
         }
     }
 
-    pub fn peer_query_pending_rpc_ids<'a>(
+    pub fn peer_address_query_pending_rpc_ids<'a>(
         &'a self,
         peer_id: &'a PeerId,
     ) -> impl 'a + Iterator<Item = P2pRpcId> {
         let pending = match self {
-            Self::Pending { pending, .. } => pending,
+            Self::Pending {
+                pending_addresses, ..
+            } => pending_addresses,
             _ => &SYNC_PENDING_EMPTY,
         };
         pending.values().filter_map(move |s| {
@@ -257,4 +332,20 @@ impl TransitionFrontierSyncLedgerSnarkedState {
                 .and_then(|(_, s)| s.pending_rpc_id())
         })
     }
+
+    pub fn peer_num_accounts_rpc_id(&self, peer_id: &PeerId) -> Option<P2pRpcId> {
+        let pending = match self {
+            Self::Pending {
+                pending_num_accounts,
+                ..
+            } => pending_num_accounts.as_ref(),
+            _ => None,
+        };
+
+        pending?
+            .attempts
+            .iter()
+            .find(|(id, _)| *id == peer_id)
+            .and_then(|(_, s)| s.pending_rpc_id())
+    }
 }
diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs
index e59be6598..a57bbaa0d 100644
--- a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs
+++ b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs
@@ -2,6 +2,7 @@ use p2p::channels::rpc::P2pChannelsRpcAction;
 use redux::ActionMeta;
 use crate::p2p::channels::rpc::P2pRpcRequest;
+use crate::service::TransitionFrontierSyncLedgerSnarkedService;
 use crate::transition_frontier::TransitionFrontierService;
 use crate::Store;
@@ -13,7 +14,7 @@ use super::TransitionFrontierSyncAction;
 impl TransitionFrontierSyncAction {
     pub fn effects<S: redux::Service>(&self, _: &ActionMeta, store: &mut Store<S>)
     where
-        S: TransitionFrontierService,
+        S: TransitionFrontierService + TransitionFrontierSyncLedgerSnarkedService,
     {
         match self {
             TransitionFrontierSyncAction::Init { best_tip, .. } => {
@@ -57,6 +58,17 @@ impl TransitionFrontierSyncAction {
                 // TODO(binier): cleanup ledgers
             }
             TransitionFrontierSyncAction::LedgerStakingPending => {
+                // The staking ledger is equal to the genesis ledger with changes on top, so
+                // we use it as a base to save work during synchronization.
+                let best_tip = store.state().transition_frontier.sync.best_tip().unwrap();
+                let target = super::ledger::SyncLedgerTarget::staking_epoch(best_tip);
+                let origin = best_tip.genesis_ledger_hash().clone();
+                let target = target.snarked_ledger_hash;
+                // TODO: for the async ledger this should be handled in intermediary action
+                store
+                    .service()
+                    .copy_snarked_ledger_contents(origin, target, false)
+                    .unwrap();
                 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
             }
             TransitionFrontierSyncAction::LedgerStakingSuccess => {
@@ -65,12 +77,48 @@ impl TransitionFrontierSyncAction {
                 }
             }
             TransitionFrontierSyncAction::LedgerNextEpochPending => {
+                // The next epoch ledger is equal to the staking ledger with changes on top, so
+                // we use it as a base to save work during synchronization.
+                let sync = &store.state().transition_frontier.sync;
+                let best_tip = sync.best_tip().unwrap();
+                let root_block = sync.root_block().unwrap();
+                let Some(next_epoch_sync) =
+                    super::ledger::SyncLedgerTarget::next_epoch(best_tip, root_block)
+                else {
+                    return;
+                };
+                let origin =
+                    super::ledger::SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash;
+                let target = next_epoch_sync.snarked_ledger_hash;
+                // TODO: for the async ledger this should be handled in intermediary action
+                store
+                    .service()
+                    .copy_snarked_ledger_contents(origin, target, false)
+                    .unwrap();
+                store.dispatch(TransitionFrontierSyncLedgerAction::Init);
             }
             TransitionFrontierSyncAction::LedgerNextEpochSuccess => {
                 store.dispatch(TransitionFrontierSyncAction::LedgerRootPending);
             }
             TransitionFrontierSyncAction::LedgerRootPending => {
+                // The transition frontier root ledger is equal to the next epoch ledger with changes
+                // on top, so we use it as a base to save work during synchronization.
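+                // Note: when there is no separate next-epoch ledger target to derive from
+                // (see the `unwrap_or_else` fallback below), the staking epoch ledger is
+                // used as the copy origin instead.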
+                let sync = &store.state().transition_frontier.sync;
+                let best_tip = sync.best_tip().unwrap();
+                let root_block = sync.root_block().unwrap();
+                let next_epoch_sync =
+                    super::ledger::SyncLedgerTarget::next_epoch(best_tip, root_block)
+                        .unwrap_or_else(|| {
+                            super::ledger::SyncLedgerTarget::staking_epoch(best_tip)
+                        });
+                let origin = next_epoch_sync.snarked_ledger_hash;
+                let target = root_block.snarked_ledger_hash().clone();
+                // TODO: for the async ledger this should be handled in intermediary action
+                store
+                    .service()
+                    .copy_snarked_ledger_contents(origin, target, false)
+                    .unwrap();
                 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
             }
             TransitionFrontierSyncAction::LedgerRootSuccess => {
diff --git a/node/src/transition_frontier/transition_frontier_effects.rs b/node/src/transition_frontier/transition_frontier_effects.rs
index 85fbcc3ac..976a55397 100644
--- a/node/src/transition_frontier/transition_frontier_effects.rs
+++ b/node/src/transition_frontier/transition_frontier_effects.rs
@@ -178,7 +178,6 @@ pub fn transition_frontier_effects(
                     stats.syncing_block_update(state);
                 }
             }
-            // TODO(tizoc): push new snarked roots here?
         }
         // Bootstrap/Catchup is practically complete at this point.
         // This effect is where the finalization part needs to be
@@ -323,8 +322,9 @@ fn handle_transition_frontier_sync_ledger_action(
         }
         TransitionFrontierSyncLedgerAction::Snarked(a) => {
             match a {
-                TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit {
-                    ref address, ..
+                TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit {
+                    ref address,
+                    ..
                 } => {
                     if let Some(stats) = store.service.stats() {
                         let (start, end) = (meta.time(), meta.time());
@@ -349,7 +349,7 @@ fn handle_transition_frontier_sync_ledger_action(
                     }
                 }
             }
-            TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess {
+            TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess {
                 peer_id,
                 rpc_id,
                 ref response,
@@ -363,7 +363,7 @@ fn handle_transition_frontier_sync_ledger_action(
                     .ledger()
                    .and_then(|s| s.snarked())
                     .and_then(|s| {
-                        Some((s.target().kind, s.peer_query_get(&peer_id, rpc_id)?))
+                        Some((s.target().kind, s.peer_address_query_get(&peer_id, rpc_id)?))
                     })
                     .map(|(kind, (_, s))| (kind, s.time, meta.time()))
                 {