From 0fa5b5b0556086e5aa5e8a36a7ea5468432055b7 Mon Sep 17 00:00:00 2001 From: Bruno Deferrari Date: Wed, 20 Mar 2024 09:29:53 -0300 Subject: [PATCH 1/4] fix(ledger): When copying root masks, copy the hashes matrix too If this is not done, the cloned mask will need to recompute its internal hashes, which is expensive. --- ledger/src/database/database_impl.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger/src/database/database_impl.rs b/ledger/src/database/database_impl.rs index 0207085d2..5c8c62a27 100644 --- a/ledger/src/database/database_impl.rs +++ b/ledger/src/database/database_impl.rs @@ -61,7 +61,7 @@ impl DatabaseImpl { naccounts: self.naccounts, uuid: next_uuid(), directory: new_directory, - hashes_matrix: HashesMatrix::new(self.depth as usize), + hashes_matrix: self.hashes_matrix.clone(), // root_hash: RefCell::new(*self.root_hash.borrow()), } } From 564c7eda101091ec55918edab962403d3f444388 Mon Sep 17 00:00:00 2001 From: Bruno Deferrari Date: Fri, 29 Mar 2024 12:35:28 -0300 Subject: [PATCH 2/4] feat(bootstrap): Implement optimized ledger sync mechanism fixup(bootstrap): adjust enabling conditions review: fix to where mid-sync ledgers are stored and retrieved from review: Split num accounts fetch vs merkle tree sync states in transition frontier review: rename some sync status fields to make them less ambiguous fix(transition-frontier): Check that the ledger doesn't exist in the sync snarked ledgers feat(transition-frontier): Make copying of ledgers for sync more strict fix(transition-frontier): Fix ledger copying handling for current state machine that produces the genesis block and syncs immediately refactor(transition-frontier): Remove some code duplication refactor(transition-frontier): Add success states for NumAccounts and MerkleTreeSync (WIP) feat(transition-frontier): Implement missing enabling conditions --- ledger/src/tree_version.rs | 4 +- node/src/ledger/ledger_service.rs | 171 +++++--- node/src/ledger/mod.rs | 99 +++++ node/src/p2p/p2p_effects.rs | 54 ++- .../sync/ledger/snarked/mod.rs | 5 + ...on_frontier_sync_ledger_snarked_actions.rs | 392 ++++++++++++++---- ...on_frontier_sync_ledger_snarked_effects.rs | 308 ++++++++++++-- ...on_frontier_sync_ledger_snarked_reducer.rs | 292 ++++++++++--- ...on_frontier_sync_ledger_snarked_service.rs | 29 +- ...tion_frontier_sync_ledger_snarked_state.rs | 310 +++++++++----- .../transition_frontier_sync_ledger_state.rs | 7 +- .../sync/transition_frontier_sync_effects.rs | 125 +++++- .../transition_frontier_effects.rs | 10 +- 13 files changed, 1451 insertions(+), 355 deletions(-) diff --git a/ledger/src/tree_version.rs b/ledger/src/tree_version.rs index 169fe7476..01c860114 100644 --- a/ledger/src/tree_version.rs +++ b/ledger/src/tree_version.rs @@ -23,8 +23,8 @@ impl TreeVersion for V2 { type Account = Account; type TokenId = TokenId; - fn hash_node(depth: usize, left: Fp, right: Fp) -> Fp { - let param = format!("MinaMklTree{:03}", depth); + fn hash_node(height: usize, left: Fp, right: Fp) -> Fp { + let param = format!("MinaMklTree{height:03}"); crate::hash::hash_with_kimchi(param.as_str(), &[left, right]) } diff --git a/node/src/ledger/ledger_service.rs b/node/src/ledger/ledger_service.rs index 29840cacc..7dc9669f4 100644 --- a/node/src/ledger/ledger_service.rs +++ b/node/src/ledger/ledger_service.rs @@ -22,7 +22,7 @@ use ledger::{ validate_block::block_body_hash, }, verifier::Verifier, - Account, AccountIndex, BaseLedger, Database, Mask, TreeVersion, UnregisterBehavior, + Account, BaseLedger, Database, 
Mask, UnregisterBehavior, }; use mina_hasher::Fp; use mina_p2p_messages::{ @@ -68,11 +68,6 @@ use crate::{ use super::{ledger_empty_hash_at_depth, LedgerAddress, LEDGER_DEPTH}; -fn ledger_hash(depth: usize, left: Fp, right: Fp) -> Fp { - let height = LEDGER_DEPTH - depth - 1; - ledger::V2::hash_node(height, left, right) -} - fn merkle_root(mask: &mut Mask) -> LedgerHash { MinaBaseLedgerHash0StableV1(mask.merkle_root().into()).into() } @@ -145,6 +140,69 @@ impl LedgerCtx { .or_else(|| self.sync.mask(hash)) } + /// Returns the mask for a snarked ledger being synchronized or an error if it is not present + pub fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result { + self.sync.pending_sync_snarked_ledger_mask(hash) + } + + /// Copies the contents of an existing snarked ledger into the target + /// hash under the pending sync snarked ledgers state. + fn copy_snarked_ledger_contents_for_sync( + &mut self, + origin_snarked_ledger_hash: LedgerHash, + target_snarked_ledger_hash: LedgerHash, + overwrite: bool, + ) -> Result { + if !overwrite + && self + .sync + .snarked_ledgers + .contains_key(&target_snarked_ledger_hash) + { + return Ok(false); + } + + let origin = self + .snarked_ledgers + .get(&origin_snarked_ledger_hash) + .or_else(|| { + // If it doesn't exist in completed ledgers, it may be + // an in-progress ledger from a previous attempt that we can reuse + self.sync.snarked_ledgers.get(&origin_snarked_ledger_hash) + }) + .ok_or(format!( + "Tried to copy from non-existing snarked ledger with hash: {}", + origin_snarked_ledger_hash.to_string() + ))?; + + let target = origin.copy(); + self.sync + .snarked_ledgers + .insert(target_snarked_ledger_hash, target); + + Ok(true) + } + + fn compute_snarked_ledger_hashes( + &mut self, + snarked_ledger_hash: &LedgerHash, + ) -> Result<(), String> { + let origin = self + .snarked_ledgers + .get_mut(&snarked_ledger_hash) + .or_else(|| self.sync.snarked_ledgers.get_mut(&snarked_ledger_hash)) + .ok_or(format!( + "Cannot hash non-existing snarked ledger: {}", + snarked_ledger_hash.to_string() + ))?; + + // Our ledger is lazy when it comes to hashing, but retrieving the + // merkle root hash forces all pending hashes to be computed. + let _force_hashing = origin.merkle_root(); + + Ok(()) + } + /// Returns a mutable reference to the [StagedLedger] with the specified `hash` if it exists or `None` otherwise. fn staged_ledger_mut(&mut self, hash: &LedgerHash) -> Option<&mut StagedLedger> { match self.staged_ledgers.get_mut(&hash) { @@ -335,6 +393,13 @@ impl LedgerSyncState { .or_else(|| Some((self.staged_ledgers.get(hash)?.ledger(), true))) } + fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result { + self.snarked_ledgers + .get(hash) + .cloned() + .ok_or_else(|| format!("Missing sync snarked ledger {}", hash.to_string())) + } + /// Returns a [Mask] instance for the snarked ledger with [hash]. If it doesn't /// exist a new instance is created. 
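    /// (Contrast with [Self::pending_sync_snarked_ledger_mask] above, which is
    /// fallible and never creates a new ledger: callers that must treat a
    /// missing ledger as an error use that variant instead.)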
fn snarked_ledger_mut(&mut self, hash: LedgerHash) -> &mut Mask { @@ -356,36 +421,41 @@ pub trait LedgerService: redux::Service { } impl TransitionFrontierSyncLedgerSnarkedService for T { - fn hashes_set( + fn compute_snarked_ledger_hashes( &mut self, - snarked_ledger_hash: LedgerHash, - parent: &LedgerAddress, - (left, right): (LedgerHash, LedgerHash), + snarked_ledger_hash: &LedgerHash, ) -> Result<(), String> { - let (left, right) = (left.0.to_field(), right.0.to_field()); - let hash = ledger_hash(parent.length(), left, right); + self.ctx_mut() + .compute_snarked_ledger_hashes(snarked_ledger_hash)?; - let mask = self.ctx_mut().sync.snarked_ledger_mut(snarked_ledger_hash); + Ok(()) + } - if hash != mask.get_inner_hash_at_addr(parent.clone())? { - return Err("Inner hash found at address but doesn't match the expected hash".into()); - } + fn copy_snarked_ledger_contents_for_sync( + &mut self, + origin_snarked_ledger_hash: LedgerHash, + target_snarked_ledger_hash: LedgerHash, + overwrite: bool, + ) -> Result { + self.ctx_mut().copy_snarked_ledger_contents_for_sync( + origin_snarked_ledger_hash, + target_snarked_ledger_hash, + overwrite, + ) + } - // TODO(binier): the `if` condition is temporary until we make - // sure we don't call `hashes_set` for the same key for the - // same ledger. This can happen E.g. if root snarked ledger - // is the same as staking or next epoch ledger, in which case - // we will sync same ledger twice. That causes assertion to fail - // in `set_cached_hash_unchecked`. - // - // remove once we have an optimization to not sync same ledgers/addrs - // multiple times. - if mask.get_cached_hash(&parent.child_left()).is_none() { - mask.set_cached_hash_unchecked(&parent.child_left(), left); - mask.set_cached_hash_unchecked(&parent.child_right(), right); - } + fn child_hashes_get( + &mut self, + snarked_ledger_hash: LedgerHash, + parent: &LedgerAddress, + ) -> Result<(LedgerHash, LedgerHash), String> { + let mut mask = self + .ctx_mut() + .pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?; + let left_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_left())?); + let right_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_right())?); - Ok(()) + Ok((left_hash, right_hash)) } fn accounts_set( @@ -393,27 +463,21 @@ impl TransitionFrontierSyncLedgerSnarkedService for T { snarked_ledger_hash: LedgerHash, parent: &LedgerAddress, accounts: Vec, - ) -> Result<(), ()> { - // TODO(binier): validate hashes - let mut addr = parent.clone(); - let first_addr = loop { - if addr.length() == LEDGER_DEPTH { - break addr; - } - addr = addr.child_left(); - }; - let mask = self.ctx_mut().sync.snarked_ledger_mut(snarked_ledger_hash); - - let first_index = first_addr.to_index(); - accounts + ) -> Result { + let mut mask = self + .ctx_mut() + .pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?; + let accounts: Vec<_> = accounts .into_iter() - .enumerate() - .try_for_each(|(index, account)| { - let index = AccountIndex(first_index.0 + index as u64); - mask.set_at_index(index, Box::new((&account).into())) - })?; + .map(|account| Box::new((&account).into())) + .collect(); - Ok(()) + mask.set_all_accounts_rooted_at(parent.clone(), &accounts) + .or_else(|()| Err("Failed when setting accounts".to_owned()))?; + + let computed_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.clone())?); + + Ok(computed_hash) } } @@ -431,11 +495,6 @@ impl TransitionFrontierSyncLedgerStagedService for T { .ctx_mut() .sync 
.snarked_ledger_mut(snarked_ledger_hash.clone()); - // TODO(binier): TMP. Remove for prod version. - snarked_ledger - .validate_inner_hashes() - .map_err(|_| "downloaded hash and recalculated mismatch".to_owned())?; - let mask = snarked_ledger.copy(); let staged_ledger = if let Some(parts) = parts { @@ -1064,6 +1123,8 @@ fn dump_application_to_file( mod tests { use mina_p2p_messages::v2::MinaBaseLedgerHash0StableV1; + use crate::ledger::hash_node_at_depth; + use super::*; #[test] @@ -1080,7 +1141,7 @@ mod tests { (addr, expected_hash, left, right) }) .for_each(|(address, expected_hash, left, right)| { - let hash = ledger_hash(address.length(), left.0.to_field(), right.0.to_field()); + let hash = hash_node_at_depth(address.length(), left.0.to_field(), right.0.to_field()); let hash: LedgerHash = MinaBaseLedgerHash0StableV1(hash.into()).into(); assert_eq!(hash.to_string(), expected_hash); }); diff --git a/node/src/ledger/mod.rs b/node/src/ledger/mod.rs index 5c13bfc6b..bc384aff4 100644 --- a/node/src/ledger/mod.rs +++ b/node/src/ledger/mod.rs @@ -1,4 +1,5 @@ mod ledger_config; +use ledger::TreeVersion; pub use ledger_config::*; mod ledger_service; @@ -27,3 +28,101 @@ lazy_static::lazy_static! { pub fn ledger_empty_hash_at_depth(depth: usize) -> LedgerHash { LEDGER_HASH_EMPTIES.get(depth).unwrap().clone() } + +/// Given the hash of the subtree containing all accounts of height `subtree_height` +/// compute the hash of a tree of size `LEDGER_DEPTH` if all other nodes were +/// empty. +pub fn complete_height_tree_with_empties( + content_hash: &LedgerHash, + subtree_height: usize, +) -> LedgerHash { + assert!(LEDGER_DEPTH >= subtree_height); + let content_hash = content_hash.0.to_field(); + + let computed_hash = (subtree_height..LEDGER_DEPTH).fold(content_hash, |prev_hash, height| { + let depth = LEDGER_DEPTH - height; + let empty_right = ledger_empty_hash_at_depth(depth).0.to_field(); + ledger::V2::hash_node(height, prev_hash, empty_right) + }); + + LedgerHash::from_fp(computed_hash) +} + +/// Returns the minimum tree height required for storing `num_accounts` accounts. +pub fn tree_height_for_num_accounts(num_accounts: u64) -> usize { + if num_accounts == 1 { + 1 + } else if num_accounts.is_power_of_two() { + num_accounts.ilog2() as usize + } else { + num_accounts.next_power_of_two().ilog2() as usize + } +} + +/// Given the hash of the subtree containing `num_accounts` accounts +/// compute the hash of a tree of size `LEDGER_DEPTH` if all other nodes were +/// empty. +/// +/// NOTE: For out of range sizes, en empty tree hash is returned. 
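+/// Illustrative sketch only (the hashes are borrowed from the unit tests at
+/// the bottom of this module, and the block is marked `ignore` so it is not
+/// run as a doctest):
+///
+/// ```ignore
+/// // 8517 accounts fit in a subtree of height 14; the levels above it, up to
+/// // `LEDGER_DEPTH`, are completed with empty-subtree hashes.
+/// let contents_hash: LedgerHash = "jwav4pBszibQqek634VUQEc5WZAbF3CnT7sMyhqXe3vucyXdjJs"
+///     .parse()
+///     .unwrap();
+/// let root = complete_num_accounts_tree_with_empties(&contents_hash, 8517);
+/// assert_eq!(
+///     root.to_string(),
+///     "jwxdRe86RJV99CZbxZzb4JoDwEnvNQbc6Ha8iPx7pr3FxYpjHBG",
+/// );
+/// ```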
+pub fn complete_num_accounts_tree_with_empties( + contents_hash: &LedgerHash, + num_accounts: u64, +) -> LedgerHash { + // Note, we assume there is always at least one account + if num_accounts == 0 { + return ledger_empty_hash_at_depth(0); + } + + let subtree_height = tree_height_for_num_accounts(num_accounts); + + // This would not be a valid number of accounts because it doesn't fit the tree + if subtree_height > LEDGER_DEPTH { + ledger_empty_hash_at_depth(0) + } else { + complete_height_tree_with_empties(contents_hash, subtree_height) + } +} + +pub fn hash_node_at_depth( + depth: usize, + left: mina_hasher::Fp, + right: mina_hasher::Fp, +) -> mina_hasher::Fp { + let height = LEDGER_DEPTH - depth - 1; + ledger::V2::hash_node(height, left, right) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_complete_with_empties() { + let subtree_height = 14; + let expected_hash: LedgerHash = "jwxdRe86RJV99CZbxZzb4JoDwEnvNQbc6Ha8iPx7pr3FxYpjHBG" + .parse() + .unwrap(); + let contents_hash = "jwav4pBszibQqek634VUQEc5WZAbF3CnT7sMyhqXe3vucyXdjJs" + .parse() + .unwrap(); + + let actual_hash = complete_height_tree_with_empties(&contents_hash, subtree_height); + + assert_eq!(expected_hash, actual_hash); + } + + #[test] + fn test_complete_with_empties_with_num_accounts() { + let subtree_height = 8517; + let expected_hash: LedgerHash = "jwxdRe86RJV99CZbxZzb4JoDwEnvNQbc6Ha8iPx7pr3FxYpjHBG" + .parse() + .unwrap(); + let contents_hash = "jwav4pBszibQqek634VUQEc5WZAbF3CnT7sMyhqXe3vucyXdjJs" + .parse() + .unwrap(); + + let actual_hash = complete_num_accounts_tree_with_empties(&contents_hash, subtree_height); + + assert_eq!(expected_hash, actual_hash); + } +} diff --git a/node/src/p2p/p2p_effects.rs b/node/src/p2p/p2p_effects.rs index e22cc4234..5db5303ea 100644 --- a/node/src/p2p/p2p_effects.rs +++ b/node/src/p2p/p2p_effects.rs @@ -104,16 +104,29 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM P2pDisconnectionAction::Init { .. 
} => {} P2pDisconnectionAction::Finish { peer_id } => { if let Some(s) = store.state().transition_frontier.sync.ledger() { - let rpc_ids = s + let snarked_ledger_num_accounts_rpc_id = s .snarked() - .map(|s| s.peer_query_pending_rpc_ids(&peer_id).collect()) + .and_then(|s| s.peer_num_accounts_rpc_id(&peer_id)); + let snarked_ledger_address_rpc_ids = s + .snarked() + .map(|s| s.peer_address_query_pending_rpc_ids(&peer_id).collect()) .unwrap_or(vec![]); let staged_ledger_parts_fetch_rpc_id = s.staged().and_then(|s| s.parts_fetch_rpc_id(&peer_id)); - for rpc_id in rpc_ids { + for rpc_id in snarked_ledger_address_rpc_ids { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { + peer_id, + rpc_id, + error: PeerLedgerQueryError::Disconnected, + }, + ); + } + + if let Some(rpc_id) = snarked_ledger_num_accounts_rpc_id { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError { peer_id, rpc_id, error: PeerLedgerQueryError::Disconnected, @@ -244,11 +257,13 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery); } P2pChannelsRpcAction::Timeout { peer_id, id } => { - store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { - peer_id, - rpc_id: id, - error: PeerLedgerQueryError::Timeout, - }); + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { + peer_id, + rpc_id: id, + error: PeerLedgerQueryError::Timeout, + }, + ); store.dispatch( TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError { peer_id, @@ -274,7 +289,7 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM match response.as_ref() { None => { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { peer_id, rpc_id: id, error: PeerLedgerQueryError::DataUnavailable, @@ -330,7 +345,7 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM Some(P2pRpcResponse::LedgerQuery(answer)) => match answer { MinaLedgerSyncLedgerAnswerStableV2::ChildHashesAre(left, right) => { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { peer_id, rpc_id: id, response: PeerLedgerQueryResponse::ChildHashes( @@ -342,7 +357,7 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM } MinaLedgerSyncLedgerAnswerStableV2::ContentsAre(accounts) => { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { peer_id, rpc_id: id, response: PeerLedgerQueryResponse::ChildAccounts( @@ -351,7 +366,20 @@ pub fn node_p2p_effects(store: &mut Store, action: P2pActionWithM }, ); } - _ => {} + MinaLedgerSyncLedgerAnswerStableV2::NumAccounts( + count, + contents_hash, + ) => { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess { + peer_id, + rpc_id: id, + response: PeerLedgerQueryResponse::NumAccounts( + count.as_u64(), contents_hash.clone() + ), + }, + ); + } }, Some(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock( parts, diff --git a/node/src/transition_frontier/sync/ledger/snarked/mod.rs b/node/src/transition_frontier/sync/ledger/snarked/mod.rs index 150c16a6e..d1d84a5d3 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/mod.rs +++ 
b/node/src/transition_frontier/sync/ledger/snarked/mod.rs @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize}; pub enum PeerLedgerQueryResponse { ChildHashes(LedgerHash, LedgerHash), ChildAccounts(Vec), + NumAccounts(u64, LedgerHash), } impl PeerLedgerQueryResponse { @@ -28,6 +29,10 @@ impl PeerLedgerQueryResponse { pub fn is_child_accounts(&self) -> bool { matches!(self, Self::ChildAccounts(..)) } + + pub fn is_num_accounts(&self) -> bool { + matches!(self, Self::NumAccounts(..)) + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_actions.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_actions.rs index b9911fdcf..8e3ad275d 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_actions.rs +++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_actions.rs @@ -7,10 +7,15 @@ use crate::p2p::PeerId; use crate::transition_frontier::sync::ledger::TransitionFrontierSyncLedgerState; use super::{ - PeerLedgerQueryError, PeerLedgerQueryResponse, PeerRpcState, + LedgerAddressQuery, PeerLedgerQueryError, PeerLedgerQueryResponse, PeerRpcState, TransitionFrontierSyncLedgerSnarkedState, }; +/// Once we reach subtrees of this height, we begin performing +/// queries to fetch all the accounts in the subtree at once +/// instead of fetching intermediary hashes. +pub const ACCOUNT_SUBTREE_HEIGHT: usize = 6; + pub type TransitionFrontierSyncLedgerSnarkedActionWithMeta = redux::ActionWithMeta; pub type TransitionFrontierSyncLedgerSnarkedActionWithMetaRef<'a> = @@ -20,25 +25,70 @@ pub type TransitionFrontierSyncLedgerSnarkedActionWithMetaRef<'a> = pub enum TransitionFrontierSyncLedgerSnarkedAction { Pending, PeersQuery, - PeerQueryInit { + + // For NumAccounts query + PeerQueryNumAccountsInit { + peer_id: PeerId, + }, + PeerQueryNumAccountsPending { + peer_id: PeerId, + rpc_id: P2pRpcId, + }, + PeerQueryNumAccountsRetry { + peer_id: PeerId, + }, + PeerQueryNumAccountsError { + peer_id: PeerId, + rpc_id: P2pRpcId, + error: PeerLedgerQueryError, + }, + PeerQueryNumAccountsSuccess { + peer_id: PeerId, + rpc_id: P2pRpcId, + response: PeerLedgerQueryResponse, + }, + NumAccountsReceived { + num_accounts: u64, + contents_hash: LedgerHash, + sender: PeerId, + }, + NumAccountsAccepted { + num_accounts: u64, + contents_hash: LedgerHash, + sender: PeerId, + }, + NumAccountsRejected { + num_accounts: u64, + sender: PeerId, + }, + NumAccountsSuccess { + num_accounts: u64, + contents_hash: LedgerHash, + }, + + MerkleTreeSyncPending, + + // For child hashes and content queries + PeerQueryAddressInit { address: LedgerAddress, + expected_hash: LedgerHash, peer_id: PeerId, }, - PeerQueryPending { + PeerQueryAddressPending { address: LedgerAddress, peer_id: PeerId, rpc_id: P2pRpcId, }, - PeerQueryRetry { + PeerQueryAddressRetry { address: LedgerAddress, peer_id: PeerId, }, - PeerQueryError { + PeerQueryAddressError { peer_id: PeerId, rpc_id: P2pRpcId, error: PeerLedgerQueryError, }, - PeerQuerySuccess { + PeerQueryAddressSuccess { peer_id: PeerId, rpc_id: P2pRpcId, response: PeerLedgerQueryResponse, @@ -48,11 +98,33 @@ pub enum TransitionFrontierSyncLedgerSnarkedAction { hashes: (LedgerHash, LedgerHash), sender: PeerId, }, + ChildHashesAccepted { + address: LedgerAddress, + hashes: (LedgerHash, LedgerHash), + previous_hashes: (LedgerHash, LedgerHash), + sender: PeerId, + }, + ChildHashesRejected 
{ + address: LedgerAddress, + hashes: (LedgerHash, LedgerHash), + sender: PeerId, + }, ChildAccountsReceived { address: LedgerAddress, accounts: Vec, sender: PeerId, }, + ChildAccountsAccepted { + address: LedgerAddress, + count: u64, + sender: PeerId, + }, + ChildAccountsRejected { + address: LedgerAddress, + sender: PeerId, + }, + + MerkleTreeSyncSuccess, Success, } @@ -65,21 +137,155 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar }) } TransitionFrontierSyncLedgerSnarkedAction::PeersQuery => { + // This condition passes if: + // - there are available peers to query + // - there is a snarked ledger to sync + // - there are either queued num_accounts or address queries + // or queries to retry let peers_available = state .p2p .ready_peers_iter() .any(|(_, p)| p.channels.rpc.can_send_request()); - peers_available - && state - .transition_frontier - .sync - .ledger() - .and_then(|s| s.snarked()) - .map_or(false, |s| { - s.sync_next().is_some() || s.sync_retry_iter().next().is_some() - }) + let sync_next_available = state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()) + .map_or(false, |s| { + s.is_num_accounts_query_next() + || s.sync_address_next().is_some() + || s.sync_address_retry_iter().next().is_some() + }); + peers_available && sync_next_available } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit { address, peer_id } => { + + // num accounts + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsInit { peer_id } => None + .or_else(|| { + let target_best_tip = state.transition_frontier.sync.best_tip()?; + let ledger = state.transition_frontier.sync.ledger()?.snarked()?; + let target = ledger.target(); + + let check_num_accounts = matches!( + ledger, + TransitionFrontierSyncLedgerSnarkedState::NumAccountsPending { .. } + ); + + let peer = state.p2p.get_ready_peer(peer_id)?; + let check_peer_available = check_peer_available(peer, target, target_best_tip); + + Some(check_num_accounts && check_peer_available) + }) + .unwrap_or(false), + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsPending { + peer_id, + .. + } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.num_accounts_pending()) + .map_or(false, |pending| { + pending + .attempts + .get(peer_id) + .map(|peer_rpc_state| matches!(peer_rpc_state, PeerRpcState::Init { .. })) + .unwrap_or(false) + }), + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsRetry { peer_id } => { + None.or_else(|| { + let target_best_tip = state.transition_frontier.sync.best_tip()?; + let ledger = state.transition_frontier.sync.ledger()?.snarked()?; + let target = ledger.target(); + + let check_num_accounts = matches!( + ledger, + TransitionFrontierSyncLedgerSnarkedState::NumAccountsPending { .. } + ); + + let peer = state.p2p.get_ready_peer(peer_id)?; + let check_peer_available = check_peer_available(peer, target, target_best_tip); + + Some(check_num_accounts && check_peer_available) + }) + .unwrap_or(false) + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError { + peer_id, + rpc_id, + .. + } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()) + .map_or(false, |s| { + s.peer_num_account_query_get(peer_id, *rpc_id) + .and_then(|s| s.attempts.get(peer_id)) + .map_or(false, |s| matches!(s, PeerRpcState::Pending { .. })) + }), + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess { + peer_id, + rpc_id, + .. 
+ } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()) + .map_or(false, |s| { + // TODO(tizoc): check if expected response kind is correct. + s.peer_num_account_query_get(peer_id, *rpc_id) + .and_then(|s| s.attempts.get(peer_id)) + .map_or(false, |s| matches!(s, PeerRpcState::Pending { .. })) + }), + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsReceived { sender, .. } + | TransitionFrontierSyncLedgerSnarkedAction::NumAccountsAccepted { sender, .. } + | TransitionFrontierSyncLedgerSnarkedAction::NumAccountsRejected { sender, .. } => { + state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.num_accounts_pending()) + .and_then(|s| s.attempts.get(sender)) + .map_or(false, |s| s.is_success()) + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsSuccess { .. } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.num_accounts_pending()) + .is_some(), + + TransitionFrontierSyncLedgerSnarkedAction::MerkleTreeSyncPending => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()) + .map_or(false, |s| { + matches!( + s, + TransitionFrontierSyncLedgerSnarkedState::NumAccountsSuccess { .. } + ) + }), + TransitionFrontierSyncLedgerSnarkedAction::MerkleTreeSyncSuccess => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()) + .map_or(false, |s| { + matches!( + s, + TransitionFrontierSyncLedgerSnarkedState::MerkleTreeSyncPending { .. } + ) + }), + + // hashes and contents + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit { + address, + peer_id, + expected_hash: _, + } => { None.or_else(|| { let target_best_tip = state.transition_frontier.sync.best_tip()?; let ledger = state.transition_frontier.sync.ledger()?.snarked()?; @@ -88,11 +294,15 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar // This is true if there is a next address that needs to be queried // from a peer and it matches the one requested by this action. let check_next_addr = match ledger { - TransitionFrontierSyncLedgerSnarkedState::Pending { - pending, - next_addr, + TransitionFrontierSyncLedgerSnarkedState::MerkleTreeSyncPending { + queue, + pending_addresses: pending, .. - } => next_addr.as_ref().map_or(false, |next_addr| { + } => queue.front().map_or(false, |query| { + let LedgerAddressQuery { + address: next_addr, .. + } = query; + next_addr == address && (next_addr.to_index().0 != 0 || pending.is_empty()) }), @@ -100,28 +310,16 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar }; let peer = state.p2p.get_ready_peer(peer_id)?; - let check_peer_available = { - let peer_best_tip = peer.best_tip.as_ref()?; - if !peer.channels.rpc.can_send_request() { - false - } else if target.staged.is_some() { - // if peer has same best tip, then he has same root - // so we can sync root snarked+staged ledger from that peer. 
- target_best_tip.hash() == peer_best_tip.hash() - } else { - &target.snarked_ledger_hash == peer_best_tip.snarked_ledger_hash() - || &target.snarked_ledger_hash - == peer_best_tip.staking_epoch_ledger_hash() - || &target.snarked_ledger_hash - == peer_best_tip.next_epoch_ledger_hash() - } - }; + let check_peer_available = check_peer_available(peer, target, target_best_tip); Some(check_next_addr && check_peer_available) }) .unwrap_or(false) } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryRetry { address, peer_id } => { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressRetry { + address, + peer_id, + } => { None.or_else(|| { let target_best_tip = state.transition_frontier.sync.best_tip()?; let ledger = state.transition_frontier.sync.ledger()?.snarked()?; @@ -133,32 +331,19 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar .transition_frontier .sync .ledger() - .and_then(|s| s.snarked()?.sync_retry_iter().next()) + .and_then(|s| s.snarked()?.sync_address_retry_iter().next()) .map_or(false, |addr| &addr == address); let peer = state.p2p.get_ready_peer(peer_id)?; - let check_peer_available = { - let peer_best_tip = peer.best_tip.as_ref()?; - if !peer.channels.rpc.can_send_request() { - false - } else if target.staged.is_some() { - // if peer has same best tip, then he has same root - // so we can sync root snarked+staged ledger from that peer. - target_best_tip.hash() == peer_best_tip.hash() - } else { - &target.snarked_ledger_hash == peer_best_tip.snarked_ledger_hash() - || &target.snarked_ledger_hash - == peer_best_tip.staking_epoch_ledger_hash() - || &target.snarked_ledger_hash - == peer_best_tip.next_epoch_ledger_hash() - } - }; + let check_peer_available = check_peer_available(peer, target, target_best_tip); Some(check_next_addr && check_peer_available) }) .unwrap_or(false) } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryPending { peer_id, .. } => state + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressPending { + peer_id, .. + } => state .transition_frontier .sync .ledger() @@ -169,20 +354,24 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar .filter_map(|(_, query_state)| query_state.attempts.get(peer_id)) .any(|peer_rpc_state| matches!(peer_rpc_state, PeerRpcState::Init { .. })) }), - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { - peer_id, rpc_id, .. + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { + peer_id, + rpc_id, + .. } => state .transition_frontier .sync .ledger() .and_then(|s| s.snarked()) .map_or(false, |s| { - s.peer_query_get(peer_id, *rpc_id) + s.peer_address_query_get(peer_id, *rpc_id) .and_then(|(_, s)| s.attempts.get(peer_id)) .map_or(false, |s| matches!(s, PeerRpcState::Pending { .. })) }), - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { - peer_id, rpc_id, .. + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { + peer_id, + rpc_id, + .. } => { state .transition_frontier @@ -190,9 +379,8 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar .ledger() .and_then(|s| s.snarked()) .map_or(false, |s| { - // TODO(binier): check if expected response - // kind is correct. - s.peer_query_get(peer_id, *rpc_id) + // TODO(binier): check if expected response kind is correct. + s.peer_address_query_get(peer_id, *rpc_id) .and_then(|(_, s)| s.attempts.get(peer_id)) .map_or(false, |s| matches!(s, PeerRpcState::Pending { .. 
})) }) @@ -201,8 +389,18 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar address, sender, .. + } + | TransitionFrontierSyncLedgerSnarkedAction::ChildHashesAccepted { + address, + sender, + .. + } + | TransitionFrontierSyncLedgerSnarkedAction::ChildHashesRejected { + address, + sender, + .. } => { - address.length() < LEDGER_DEPTH - 1 + address.length() < LEDGER_DEPTH - ACCOUNT_SUBTREE_HEIGHT && state .transition_frontier .sync @@ -215,16 +413,30 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar address, sender, .. + } + | TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsRejected { + address, + sender, + } => state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.fetch_pending()?.get(address)) + .and_then(|s| s.attempts.get(sender)) + .map_or(false, |s| s.is_success()), + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsAccepted { + address, + count, + sender, } => { - state - .transition_frontier - .sync - .ledger() - .and_then(|s| s.snarked()?.fetch_pending()?.get(address)) - .and_then(|s| s.attempts.get(sender)) - // TODO(binier): check if expected response - // kind is correct. - .map_or(false, |s| s.is_success()) + *count > 0 + && state + .transition_frontier + .sync + .ledger() + .and_then(|s| s.snarked()?.fetch_pending()?.get(address)) + .and_then(|s| s.attempts.get(sender)) + .map_or(false, |s| s.is_success()) } TransitionFrontierSyncLedgerSnarkedAction::Success => state .transition_frontier @@ -232,17 +444,43 @@ impl redux::EnablingCondition for TransitionFrontierSyncLedgerSnar .ledger() .and_then(|s| s.snarked()) .map_or(false, |s| match s { - TransitionFrontierSyncLedgerSnarkedState::Pending { - pending, - next_addr, + TransitionFrontierSyncLedgerSnarkedState::MerkleTreeSyncPending { + queue, + pending_addresses: pending, .. - } => next_addr.is_none() && pending.is_empty(), + } => queue.is_empty() && pending.is_empty(), _ => false, }), } } } +fn check_peer_available( + peer: &p2p::P2pPeerStatusReady, + target: &crate::transition_frontier::sync::ledger::SyncLedgerTarget, + target_best_tip: &openmina_core::block::BlockWithHash< + std::sync::Arc, + >, +) -> bool { + None.or_else(|| { + let peer_best_tip = peer.best_tip.as_ref()?; + let available = if !peer.channels.rpc.can_send_request() { + false + } else if target.staged.is_some() { + // if peer has same best tip, then he has same root + // so we can sync root snarked+staged ledger from that peer. 
+ target_best_tip.hash() == peer_best_tip.hash() + } else { + &target.snarked_ledger_hash == peer_best_tip.snarked_ledger_hash() + || &target.snarked_ledger_hash == peer_best_tip.staking_epoch_ledger_hash() + || &target.snarked_ledger_hash == peer_best_tip.next_epoch_ledger_hash() + }; + + Some(available) + }) + .unwrap_or(false) +} + use crate::transition_frontier::{ sync::{ledger::TransitionFrontierSyncLedgerAction, TransitionFrontierSyncAction}, TransitionFrontierAction, diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_effects.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_effects.rs index b756eaa21..1be479bec 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_effects.rs +++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_effects.rs @@ -3,15 +3,46 @@ use p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcRequest}; use p2p::PeerId; use redux::ActionMeta; -use crate::ledger::{LedgerAddress, LEDGER_DEPTH}; +use crate::ledger::{hash_node_at_depth, LedgerAddress, LEDGER_DEPTH}; use crate::Store; use super::{ PeerLedgerQueryResponse, TransitionFrontierSyncLedgerSnarkedAction, - TransitionFrontierSyncLedgerSnarkedService, + TransitionFrontierSyncLedgerSnarkedService, ACCOUNT_SUBTREE_HEIGHT, }; -fn query_peer_init( +fn peer_query_num_accounts_init(store: &mut Store, peer_id: PeerId) { + let Some((ledger_hash, rpc_id)) = None.or_else(|| { + let state = store.state(); + let ledger = state.transition_frontier.sync.ledger()?; + let ledger_hash = ledger.snarked()?.ledger_hash(); + + let p = state.p2p.get_ready_peer(&peer_id)?; + let rpc_id = p.channels.rpc.next_local_rpc_id(); + + Some((ledger_hash.clone(), rpc_id)) + }) else { + return; + }; + + if store.dispatch(P2pChannelsRpcAction::RequestSend { + peer_id, + id: rpc_id, + request: P2pRpcRequest::LedgerQuery( + ledger_hash, + MinaLedgerSyncLedgerQueryStableV1::NumAccounts, + ), + }) { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsPending { + peer_id, + rpc_id, + }, + ); + } +} + +fn peer_query_address_init( store: &mut Store, peer_id: PeerId, address: LedgerAddress, @@ -29,7 +60,7 @@ fn query_peer_init( return; }; - let query = if address.length() >= LEDGER_DEPTH - 1 { + let query = if address.length() >= LEDGER_DEPTH - ACCOUNT_SUBTREE_HEIGHT { MinaLedgerSyncLedgerQueryStableV1::WhatContents(address.clone().into()) } else { MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(address.clone().into()) @@ -41,7 +72,7 @@ fn query_peer_init( request: P2pRpcRequest::LedgerQuery(ledger_hash, query), }) { store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryPending { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressPending { address, peer_id, rpc_id, @@ -70,19 +101,35 @@ impl TransitionFrontierSyncLedgerSnarkedAction { .collect::>(); peer_ids.sort_by(|(_, t1), (_, t2)| t2.cmp(t1)); + // If this dispatches, we can avoid even trying the following steps because we will + // not query address unless we have completed the Num_accounts request first. 
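+                // The resulting order of preference is:
+                //   1. start (or retry) the NumAccounts query with the most
+                //      suitable peer,
+                //   2. otherwise retry address queries that previously failed
+                //      or timed out,
+                //   3. otherwise pop the next queued address and query it.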
+ if let Some((peer_id, _)) = peer_ids.first() { + if store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsInit { + peer_id: *peer_id, + }, + ) || store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsRetry { + peer_id: *peer_id, + }, + ) { + return; + } + } + let mut retry_addresses = store .state() .transition_frontier .sync .ledger() .and_then(|s| s.snarked()) - .map_or(vec![], |s| s.sync_retry_iter().collect()); + .map_or(vec![], |s| s.sync_address_retry_iter().collect()); retry_addresses.reverse(); for (peer_id, _) in peer_ids { if let Some(address) = retry_addresses.last() { if store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryRetry { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressRetry { peer_id, address: address.clone(), }, @@ -98,12 +145,14 @@ impl TransitionFrontierSyncLedgerSnarkedAction { .sync .ledger() .and_then(|s| s.snarked()) - .and_then(|s| s.sync_next()); + .and_then(|s| s.sync_address_next()); match address { - Some(address) => { + Some((address, expected_hash)) => { + // This dispatch here will pop from the queue and update sync_next store.dispatch( - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit { peer_id, + expected_hash, address, }, ); @@ -113,23 +162,133 @@ impl TransitionFrontierSyncLedgerSnarkedAction { } } } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit { peer_id, address } => { - query_peer_init(store, *peer_id, address.clone()); + + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsInit { peer_id } => { + peer_query_num_accounts_init(store, *peer_id) + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsPending { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsRetry { peer_id } => { + peer_query_num_accounts_init(store, *peer_id) + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError { .. } => { + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess { + peer_id, + response, + .. + } => { + match response { + PeerLedgerQueryResponse::NumAccounts(count, contents_hash) => { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsReceived { + num_accounts: *count, + contents_hash: contents_hash.clone(), + sender: *peer_id, + }, + ); + } + // TODO(tizoc): These shouldn't happen, log some warning or something + PeerLedgerQueryResponse::ChildHashes(_, _) => {} + PeerLedgerQueryResponse::ChildAccounts(_) => {} + } + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsReceived { + num_accounts, + contents_hash, + sender, + } => { + let Some(snarked_ledger_hash) = None.or_else(|| { + let snarked_ledger = + store.state().transition_frontier.sync.ledger()?.snarked()?; + Some(snarked_ledger.ledger_hash().clone()) + }) else { + return; + }; + + // Given the claimed number of accounts we can figure out the height of the subtree, + // and compute the root hash assuming all other nodes contain empty hashes. + // The result must match the snarked ledger hash for this response to be considered + // valid. + // NOTE: incorrect account numbers may be accepted (if they fall in the same range) + // because what is actually being validated is the content hash and tree height, + // not the actual number of accounts. 
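+                // For example, with `tree_height_for_num_accounts` any claimed
+                // count in the range 8193..=16384 maps to a subtree of height
+                // 14, so every count in that range reconstructs the same root
+                // hash and is indistinguishable at this point.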
+ let actual_hash = crate::ledger::complete_num_accounts_tree_with_empties( + contents_hash, + *num_accounts, + ); + + if snarked_ledger_hash == actual_hash { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsAccepted { + num_accounts: *num_accounts, + contents_hash: contents_hash.clone(), + sender: *sender, + }, + ); + } else { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsRejected { + num_accounts: *num_accounts, + sender: *sender, + }, + ); + } + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsAccepted { + num_accounts, + contents_hash, + .. + } => { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsSuccess { + num_accounts: *num_accounts, + contents_hash: contents_hash.clone(), + }, + ); + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsRejected { .. } => { + // TODO(tizoc): we do nothing here, but the peer must be punished somehow + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsSuccess { .. } => { + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::MerkleTreeSyncPending); + } + + TransitionFrontierSyncLedgerSnarkedAction::MerkleTreeSyncPending => { + if !store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery) { + store + .dispatch(TransitionFrontierSyncLedgerSnarkedAction::MerkleTreeSyncSuccess); + } + } + TransitionFrontierSyncLedgerSnarkedAction::MerkleTreeSyncSuccess => { + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::Success); + } + + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit { + peer_id, + address, + expected_hash: _, + } => { + peer_query_address_init(store, *peer_id, address.clone()); } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryRetry { peer_id, address } => { - query_peer_init(store, *peer_id, address.clone()); + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressRetry { + peer_id, + address, + } => { + peer_query_address_init(store, *peer_id, address.clone()); } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { .. } => { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { .. } => { store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); } - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { peer_id, rpc_id, response, } => { let ledger = store.state().transition_frontier.sync.ledger(); let Some(address) = ledger - .and_then(|s| s.snarked()?.peer_query_get(peer_id, *rpc_id)) + .and_then(|s| s.snarked()?.peer_address_query_get(peer_id, *rpc_id)) .map(|(addr, _)| addr.clone()) else { return; @@ -154,49 +313,136 @@ impl TransitionFrontierSyncLedgerSnarkedAction { }, ); } + // TODO(tizoc): This shouldn't happen, log some warning or something + PeerLedgerQueryResponse::NumAccounts(_, _) => {} } } TransitionFrontierSyncLedgerSnarkedAction::ChildHashesReceived { address, - hashes, + hashes: (left_hash, right_hash), + sender, .. } => { - let Some(snarked_ledger_hash) = None.or_else(|| { - let ledger = store.state().transition_frontier.sync.ledger()?; - Some(ledger.snarked()?.ledger_hash().clone()) + let Some((snarked_ledger_hash, parent_hash)) = None.or_else(|| { + let snarked_ledger = + store.state().transition_frontier.sync.ledger()?.snarked()?; + let parent_hash = snarked_ledger + .fetch_pending()? + .get(address)? 
+ .expected_hash + .clone(); + Some((snarked_ledger.ledger_hash().clone(), parent_hash)) }) else { return; }; - store - .service - .hashes_set(snarked_ledger_hash, address, hashes.clone()) + + let actual_hash = hash_node_at_depth( + address.length(), + left_hash.0.to_field(), + right_hash.0.to_field(), + ); + if actual_hash != parent_hash.0.to_field() { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesRejected { + address: address.clone(), + hashes: (left_hash.clone(), right_hash.clone()), + sender: *sender, + }, + ); + + return; + } + + // TODO: for async ledger this needs an intermediary action + let (previous_left_hash, previous_right_hash) = store + .service() + .child_hashes_get(snarked_ledger_hash, address) .unwrap(); + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesAccepted { + address: address.clone(), + hashes: (left_hash.clone(), right_hash.clone()), + previous_hashes: (previous_left_hash, previous_right_hash), + sender: *sender, + }, + ); + } + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesAccepted { .. } => { if !store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery) { store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::Success); } } + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesRejected { .. } => { + // TODO(tizoc): we do nothing here, but the peer must be punished somehow + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); + } TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsReceived { address, accounts, - .. + sender, } => { - let Some(snarked_ledger_hash) = None.or_else(|| { - let ledger = store.state().transition_frontier.sync.ledger()?; - Some(ledger.snarked()?.ledger_hash().clone()) + let Some((snarked_ledger_hash, parent_hash)) = None.or_else(|| { + let snarked_ledger = + store.state().transition_frontier.sync.ledger()?.snarked()?; + Some(( + snarked_ledger.ledger_hash().clone(), + snarked_ledger + .fetch_pending()? + .get(address)? + .expected_hash + .clone(), + )) }) else { return; }; - store + + // After setting the accounts, we get the new computed hash. + // It must be equal to the parent node hash, otherwise we got + // bad data from the peer. + let computed_hash = store .service - .accounts_set(snarked_ledger_hash, address, accounts.clone()) + .accounts_set(snarked_ledger_hash.clone(), address, accounts.clone()) .unwrap(); + if computed_hash != parent_hash { + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsRejected { + address: address.clone(), + sender: *sender, + }, + ); + return; + } + + // Setting accounts doesn't immediately compute the hashes in the merkle tree, + // so we force that here before continuing. + let compute_hashes_result = store + .service + .compute_snarked_ledger_hashes(&snarked_ledger_hash); + + if let Err(_) = compute_hashes_result { + // TODO(tizoc): log this error + } + + store.dispatch( + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsAccepted { + address: address.clone(), + count: accounts.len() as u64, + sender: *sender, + }, + ); + } + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsAccepted { .. } => { if !store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery) { store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::Success); } } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryPending { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsRejected { .. 
} => { + // TODO(tizoc): we do nothing here, but the peer must be punished somehow + store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressPending { .. } => {} TransitionFrontierSyncLedgerSnarkedAction::Success => {} } } diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_reducer.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_reducer.rs index 9445ef48e..5e47115d4 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_reducer.rs +++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_reducer.rs @@ -1,7 +1,10 @@ -use crate::ledger::{ledger_empty_hash_at_depth, LEDGER_DEPTH}; +use std::iter; + +use crate::ledger::{ledger_empty_hash_at_depth, tree_height_for_num_accounts, LEDGER_DEPTH}; use super::{ - LedgerQueryPending, PeerRpcState, TransitionFrontierSyncLedgerSnarkedAction, + LedgerAddressQuery, LedgerAddressQueryPending, PeerRpcState, + TransitionFrontierSyncLedgerSnarkedAction, TransitionFrontierSyncLedgerSnarkedActionWithMetaRef, TransitionFrontierSyncLedgerSnarkedState, }; @@ -13,18 +16,161 @@ impl TransitionFrontierSyncLedgerSnarkedState { // handled in parent reducer. } TransitionFrontierSyncLedgerSnarkedAction::PeersQuery => {} - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit { address, peer_id } => { - if let Self::Pending { - pending, - next_addr, - end_addr, + + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsInit { peer_id } => { + if let Self::NumAccountsPending { + pending_num_accounts, + .. + } = self + { + pending_num_accounts + .attempts + .insert(*peer_id, PeerRpcState::Init { time: meta.time() }); + } + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsPending { + peer_id, + rpc_id, + } => { + let Self::NumAccountsPending { + pending_num_accounts, + .. + } = self + else { + return; + }; + + let Some(rpc_state) = pending_num_accounts.attempts.get_mut(peer_id) else { + return; + }; + + *rpc_state = PeerRpcState::Pending { + time: meta.time(), + rpc_id: *rpc_id, + }; + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsRetry { peer_id } => { + if let Self::NumAccountsPending { + pending_num_accounts, .. } = self { + pending_num_accounts + .attempts + .insert(*peer_id, PeerRpcState::Init { time: meta.time() }); + } + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError { + peer_id, + rpc_id, + error, + } => { + let Some(rpc_state) = self.peer_num_account_query_state_get_mut(peer_id, *rpc_id) + else { + return; + }; + + *rpc_state = PeerRpcState::Error { + time: meta.time(), + rpc_id: *rpc_id, + error: error.clone(), + }; + } + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess { + peer_id, + rpc_id, + .. + } => { + let Some(rpc_state) = self.peer_num_account_query_state_get_mut(peer_id, *rpc_id) + else { + return; + }; + *rpc_state = PeerRpcState::Success { + time: meta.time(), + rpc_id: *rpc_id, + }; + } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsReceived { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsAccepted { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsRejected { .. } => { + // TODO(tizoc): should this be reflected in the state somehow? 
+ } + TransitionFrontierSyncLedgerSnarkedAction::NumAccountsSuccess { + num_accounts, + contents_hash, + } => { + let Self::NumAccountsPending { target, .. } = self else { + return; + }; + + let target = target.clone(); + + *self = Self::NumAccountsSuccess { + time: meta.time(), + target, + num_accounts: *num_accounts, + contents_hash: contents_hash.clone(), + }; + } + + TransitionFrontierSyncLedgerSnarkedAction::MerkleTreeSyncPending => { + let Self::NumAccountsSuccess { + target, + num_accounts, + contents_hash, + .. + } = self + else { + return; + }; + + // We know at which node to begin querying, so we skip all the intermediary depths + let first_query = LedgerAddressQuery { + address: ledger::Address::first( + LEDGER_DEPTH - tree_height_for_num_accounts(*num_accounts), + ), + expected_hash: contents_hash.clone(), + }; + + *self = Self::MerkleTreeSyncPending { + time: meta.time(), + target: target.clone(), + total_accounts_expected: *num_accounts, + synced_accounts_count: 0, + synced_hashes_count: 0, + queue: iter::once(first_query).collect(), + pending_addresses: Default::default(), + }; + } + TransitionFrontierSyncLedgerSnarkedAction::MerkleTreeSyncSuccess => { + let Self::MerkleTreeSyncPending { target, .. } = self else { + return; + }; + *self = Self::MerkleTreeSyncSuccess { + time: meta.time(), + target: target.clone(), + }; + } + + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit { + address, + expected_hash, + peer_id, + } => { + if let Self::MerkleTreeSyncPending { + queue, + pending_addresses: pending, + .. + } = self + { + let _next = queue.pop_front(); + //debug_assert_eq!(next.as_ref().map(|p| &p.0), Some(address)); + pending.insert( address.clone(), - LedgerQueryPending { + LedgerAddressQueryPending { time: meta.time(), + expected_hash: expected_hash.clone(), attempts: std::iter::once(( *peer_id, PeerRpcState::Init { time: meta.time() }, @@ -32,30 +178,17 @@ impl TransitionFrontierSyncLedgerSnarkedState { .collect(), }, ); - *next_addr = next_addr - .as_ref() - .map(|addr| { - addr.next() - .filter(|addr| { - let mut end_addr = end_addr.clone(); - while end_addr.length() < addr.length() { - end_addr = end_addr.child_right(); - } - while end_addr.length() > addr.length() { - let Some(addr) = end_addr.parent() else { - return true; - }; - end_addr = addr; - } - addr <= &end_addr - }) - .unwrap_or_else(|| addr.next_depth()) - }) - .filter(|addr| addr.length() < LEDGER_DEPTH); } } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryRetry { address, peer_id } => { - if let Self::Pending { pending, .. } = self { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressRetry { + address, + peer_id, + } => { + if let Self::MerkleTreeSyncPending { + pending_addresses: pending, + .. + } = self + { if let Some(pending) = pending.get_mut(address) { pending .attempts @@ -63,12 +196,16 @@ impl TransitionFrontierSyncLedgerSnarkedState { } } } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryPending { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressPending { address, peer_id, rpc_id, } => { - let Self::Pending { pending, .. } = self else { + let Self::MerkleTreeSyncPending { + pending_addresses: pending, + .. 
+ } = self + else { return; }; let Some(rpc_state) = pending @@ -83,12 +220,13 @@ impl TransitionFrontierSyncLedgerSnarkedState { rpc_id: *rpc_id, }; } - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryError { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError { peer_id, rpc_id, error, } => { - let Some(rpc_state) = self.peer_query_get_mut(peer_id, *rpc_id) else { + let Some(rpc_state) = self.peer_address_query_state_get_mut(peer_id, *rpc_id) + else { return; }; @@ -98,10 +236,13 @@ impl TransitionFrontierSyncLedgerSnarkedState { error: error.clone(), }; } - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { - peer_id, rpc_id, .. + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { + peer_id, + rpc_id, + .. } => { - let Some(rpc_state) = self.peer_query_get_mut(peer_id, *rpc_id) else { + let Some(rpc_state) = self.peer_address_query_state_get_mut(peer_id, *rpc_id) + else { return; }; *rpc_state = PeerRpcState::Success { @@ -109,50 +250,77 @@ impl TransitionFrontierSyncLedgerSnarkedState { rpc_id: *rpc_id, }; } - TransitionFrontierSyncLedgerSnarkedAction::ChildHashesReceived { + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesReceived { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesAccepted { address, hashes, + previous_hashes, .. } => { - let Self::Pending { - pending, - next_addr, - end_addr, + let Self::MerkleTreeSyncPending { + queue, + pending_addresses: pending, + synced_hashes_count: num_hashes_accepted, .. } = self else { return; }; - let addr = address; - pending.remove(&addr); + + // Once hashes are accepted, we can consider this query fulfilled + pending.remove(address); + let (left, right) = hashes; + let (previous_left, previous_right) = previous_hashes; - let empty_hash = ledger_empty_hash_at_depth(addr.length() + 1); - if right == &empty_hash { - *next_addr = - Some(addr.next_depth()).filter(|addr| addr.length() < LEDGER_DEPTH); - let addr = match left == &empty_hash { - true => addr.child_left(), - false => addr.child_right(), - }; - if addr.length() > end_addr.length() - || (addr.length() == end_addr.length() - && addr.to_index() < end_addr.to_index()) - { - *end_addr = addr.prev().unwrap_or(addr); - } + // TODO(tizoc): for non-stale hashes, we can consider the full subtree + // as accepted. Given the value of `num_accounts` and the position + // in the tree we could estimate how many accounts and hashes + // from that subtree will be skipped and add them to the count. + + // Empty node hashes are not counted in the stats. + let empty = ledger_empty_hash_at_depth(address.length() + 1); + *num_hashes_accepted += (*left != empty) as u64 + (*right != empty) as u64; + + if left != previous_left { + queue.push_back(LedgerAddressQuery { + address: address.child_left(), + expected_hash: left.clone(), + }); + } + if right != previous_right { + queue.push_back(LedgerAddressQuery { + address: address.child_right(), + expected_hash: right.clone(), + }); } } - TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsReceived { - address, .. + TransitionFrontierSyncLedgerSnarkedAction::ChildHashesRejected { .. } => { + // TODO(tizoc): should this be reflected in the state somehow? + } + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsReceived { .. } => {} + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsAccepted { + address, + count, + .. } => { - let Self::Pending { pending, .. } = self else { + let Self::MerkleTreeSyncPending { + pending_addresses: pending, + synced_accounts_count, + .. 
+ } = self + else { return; }; + + *synced_accounts_count += count; pending.remove(address); } + TransitionFrontierSyncLedgerSnarkedAction::ChildAccountsRejected { .. } => { + // TODO(tizoc): should this be reflected in the state somehow? + } TransitionFrontierSyncLedgerSnarkedAction::Success => { - let Self::Pending { target, .. } = self else { + let Self::MerkleTreeSyncPending { target, .. } = self else { return; }; *self = Self::Success { diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_service.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_service.rs index 618efb53f..8e055dd1d 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_service.rs +++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_service.rs @@ -3,17 +3,38 @@ use mina_p2p_messages::v2::{LedgerHash, MinaBaseAccountBinableArgStableV2}; use crate::ledger::LedgerAddress; pub trait TransitionFrontierSyncLedgerSnarkedService: redux::Service { - fn hashes_set( + /// For the given ledger, compute the merkle root hash, forcing + /// all pending hashes to be computed too. + fn compute_snarked_ledger_hashes( + &mut self, + snarked_ledger_hash: &LedgerHash, + ) -> Result<(), String>; + + /// Creates a new copy of the ledger stored under the `origin` hash + /// and stores it under the `target` hash. If `overwrite` is false, + /// only copy the ledger if the target doesn't exist already. + fn copy_snarked_ledger_contents_for_sync( + &mut self, + origin: LedgerHash, + target: LedgerHash, + overwrite: bool, + ) -> Result; + + /// For the given ledger, get the two children hashes at the `parent` + /// address. + fn child_hashes_get( &mut self, snarked_ledger_hash: LedgerHash, parent: &LedgerAddress, - hashes: (LedgerHash, LedgerHash), - ) -> Result<(), String>; + ) -> Result<(LedgerHash, LedgerHash), String>; + /// For the given ledger, sets all accounts in `accounts` under + /// the subtree starting at the `parent` address. The result + /// is the hash computed for that subtree. 
fn accounts_set( &mut self, snarked_ledger_hash: LedgerHash, parent: &LedgerAddress, accounts: Vec, - ) -> Result<(), ()>; + ) -> Result; } diff --git a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_state.rs b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_state.rs index c105fee0a..1bd240001 100644 --- a/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_state.rs +++ b/node/src/transition_frontier/sync/ledger/snarked/transition_frontier_sync_ledger_snarked_state.rs @@ -1,32 +1,54 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use mina_p2p_messages::v2::LedgerHash; use redux::Timestamp; use serde::{Deserialize, Serialize}; -use crate::ledger::LedgerAddress; +use crate::ledger::{tree_height_for_num_accounts, LedgerAddress}; use crate::p2p::channels::rpc::P2pRpcId; use crate::p2p::PeerId; use crate::rpc::LedgerSyncProgress; use crate::transition_frontier::sync::ledger::SyncLedgerTarget; -use super::PeerLedgerQueryError; +use super::{PeerLedgerQueryError, ACCOUNT_SUBTREE_HEIGHT}; -static SYNC_PENDING_EMPTY: BTreeMap = BTreeMap::new(); +static SYNC_PENDING_EMPTY: BTreeMap = BTreeMap::new(); #[serde_with::serde_as] #[derive(Serialize, Deserialize, Debug, Clone)] pub enum TransitionFrontierSyncLedgerSnarkedState { + NumAccountsPending { + time: Timestamp, + target: SyncLedgerTarget, + pending_num_accounts: LedgerNumAccountsQueryPending, + }, + NumAccountsSuccess { + time: Timestamp, + target: SyncLedgerTarget, + /// NumAccounts value accepted from peer + num_accounts: u64, + /// Hash of the subtree containing all accounts + contents_hash: LedgerHash, + }, /// Doing BFS to sync snarked ledger tree. - Pending { + MerkleTreeSyncPending { time: Timestamp, target: SyncLedgerTarget, - + /// Number of accounts in this ledger (as claimed by the Num_accounts query result) + total_accounts_expected: u64, + /// Number of accounts received and accepted so far + synced_accounts_count: u64, + /// Number of hashes received and accepted so far + synced_hashes_count: u64, + /// Queue of addresses to query and the expected contents hash + queue: VecDeque, + /// Pending ongoing address queries and their attempts #[serde_as(as = "Vec<(_, _)>")] - pending: BTreeMap, - /// `None` means we are done. - next_addr: Option, - end_addr: LedgerAddress, + pending_addresses: BTreeMap, + }, + MerkleTreeSyncSuccess { + time: Timestamp, + target: SyncLedgerTarget, }, Success { time: Timestamp, @@ -35,8 +57,20 @@ pub enum TransitionFrontierSyncLedgerSnarkedState { } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct LedgerQueryPending { +pub struct LedgerAddressQuery { + pub address: LedgerAddress, + pub expected_hash: LedgerHash, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LedgerAddressQueryPending { pub time: Timestamp, + pub expected_hash: LedgerHash, + pub attempts: BTreeMap, +} + +#[derive(Serialize, Deserialize, Default, Debug, Clone)] +pub struct LedgerNumAccountsQueryPending { pub attempts: BTreeMap, } @@ -68,6 +102,15 @@ impl PeerRpcState { } } + pub fn rpc_id(&self) -> Option { + match self { + Self::Init { .. } => None, + Self::Pending { rpc_id, .. } => Some(*rpc_id), + Self::Error { rpc_id, .. } => Some(*rpc_id), + Self::Success { rpc_id, .. } => Some(*rpc_id), + } + } + pub fn is_pending(&self) -> bool { matches!(self, Self::Pending { .. 
}) } @@ -83,19 +126,30 @@ impl PeerRpcState { impl TransitionFrontierSyncLedgerSnarkedState { pub fn pending(time: Timestamp, target: SyncLedgerTarget) -> Self { - Self::Pending { + Self::NumAccountsPending { time, target, - pending: Default::default(), - next_addr: Some(LedgerAddress::root()), - end_addr: LedgerAddress::root(), + pending_num_accounts: Default::default(), + } + } + + pub fn is_pending(&self) -> bool { + match self { + Self::NumAccountsPending { .. } + | Self::MerkleTreeSyncPending { .. } + | Self::NumAccountsSuccess { .. } + | Self::MerkleTreeSyncSuccess { .. } => true, + Self::Success { .. } => false, } } pub fn target(&self) -> &SyncLedgerTarget { match self { - Self::Pending { target, .. } => target, - Self::Success { target, .. } => target, + Self::NumAccountsPending { target, .. } + | Self::MerkleTreeSyncPending { target, .. } + | Self::NumAccountsSuccess { target, .. } + | Self::MerkleTreeSyncSuccess { target, .. } + | Self::Success { target, .. } => target, } } @@ -103,16 +157,37 @@ impl TransitionFrontierSyncLedgerSnarkedState { &self.target().snarked_ledger_hash } - pub fn fetch_pending(&self) -> Option<&BTreeMap> { + pub fn is_num_accounts_query_next(&self) -> bool { match self { - Self::Pending { pending, .. } => Some(pending), + Self::NumAccountsPending { .. } => true, + _ => false, + } + } + + pub fn num_accounts_pending(&self) -> Option<&LedgerNumAccountsQueryPending> { + match self { + Self::NumAccountsPending { + pending_num_accounts, + .. + } => Some(pending_num_accounts), _ => None, } } - pub fn sync_retry_iter(&self) -> impl '_ + Iterator { + pub fn fetch_pending(&self) -> Option<&BTreeMap> { + match self { + Self::MerkleTreeSyncPending { + pending_addresses, .. + } => Some(pending_addresses), + _ => None, + } + } + + pub fn sync_address_retry_iter(&self) -> impl '_ + Iterator { let pending = match self { - Self::Pending { pending, .. } => pending, + Self::MerkleTreeSyncPending { + pending_addresses, .. + } => pending_addresses, _ => &SYNC_PENDING_EMPTY, }; pending @@ -121,113 +196,130 @@ impl TransitionFrontierSyncLedgerSnarkedState { .map(|(addr, _)| addr.clone()) } - pub fn sync_next(&self) -> Option { + pub fn sync_address_next(&self) -> Option<(LedgerAddress, LedgerHash)> { match self { - Self::Pending { next_addr, .. } => next_addr.clone(), + Self::MerkleTreeSyncPending { queue, .. } => match queue.front().map(|a| a.clone()) { + Some(LedgerAddressQuery { + address, + expected_hash, + }) => Some((address, expected_hash)), + _ => None, + }, _ => None, } } pub fn estimation(&self) -> Option { - const BITS: usize = 35; - - if let Self::Success { .. } = self { - return Some(LedgerSyncProgress { - fetched: 1, - estimation: 1, - }); + match self { + Self::NumAccountsPending { .. } | Self::NumAccountsSuccess { .. } => None, + Self::MerkleTreeSyncPending { + total_accounts_expected, + synced_accounts_count, + synced_hashes_count, + .. + } => { + // TODO(tizoc): this approximation is very rough, could be improved. + // Also we count elements to be fetched and not request to be made which + // would be more accurate (accounts are fetched in groups of 64, hashes of 2). 
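                // Editor's illustration (not part of the patch): assuming ACCOUNT_SUBTREE_HEIGHT
                // is 6 (accounts are fetched in groups of 64) and that tree_height_for_num_accounts
                // returns the smallest subtree height that can hold the accounts, then for a
                // hypothetical ledger of 100_000 accounts: tree_height = 17 (2^17 = 131_072),
                // fill_ratio ~= 0.76, num_hashes_estimate = 2^(17 - 6) = 2_048 scaled down to
                // ~1_563, so the reported estimation becomes ~101_563 items to fetch in total.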
+ let tree_height = tree_height_for_num_accounts(*total_accounts_expected); + let fill_ratio = (*total_accounts_expected as f64) / 2f64.powf(tree_height as f64); + let num_hashes_estimate = 2u64.pow((tree_height - ACCOUNT_SUBTREE_HEIGHT) as u32); + let num_hashes_estimate = (num_hashes_estimate as f64 * fill_ratio).ceil() as u64; + let fetched = *synced_accounts_count + synced_hashes_count; + let estimation = fetched.max(*total_accounts_expected + num_hashes_estimate); + + Some(LedgerSyncProgress { + fetched, + estimation, + }) + } + Self::MerkleTreeSyncSuccess { .. } | Self::Success { .. } => { + return Some(LedgerSyncProgress { + fetched: 1, + estimation: 1, + }) + } } - - let Self::Pending { - next_addr, - end_addr, - .. - } = self - else { - return None; - }; - - let next_addr = next_addr.as_ref()?; - - let current_length = next_addr.length(); - - // The ledger is a binary tree, it synchronizes layer by layer, the next layer is at most - // twice as big as this layer, but can be smaller (by one). For simplicity, let's call - // a branch or a leaf of the tree a tree element (or an element) and make no distinction. - // This doesn't matter for the estimation. Total 35 layers (0, 1, 2, ..., 34). On the last - // layer there could be 2 ^ 34 items. Of course it is much less than that. So the first - // few layers contain only 1 element. - - // When the sync algorithm asks childs on the item, it gets two values, left and right. - // The algorithm asks childs only on the existing item, so the left child must exist. - // But the right child can be missing. In this case it is marked by the special constant. - // If the sync algorithm encounters a non-existent right child, it sets `end_addr` - // to the address of the left sibling (the last existing element of this layer). - - // The `end_addr` is initialized with layer is zero and position is zero (root). - - // Let it be the first non-existent (right child). - // In extereme case it will be right sibling of root, so layer is zero and position is one. - // Therefore, further `estimated_this_layer` cannot be zero. - let estimated_end_addr = end_addr.next().unwrap_or(end_addr.clone()); - - // The chance of `end_addr` being updated during fetching the layer is 50%, so its length - // (number of layers) may be less than the current layer. Let's calculate end address - // at the current layer. - let estimated_this_layer = - estimated_end_addr.to_index().0 << (current_length - estimated_end_addr.length()); - - // The number of items on the previous layer is twice less than the number of items - // on this layer, but cannot be 0. - let prev_layers = (0..current_length) - .map(|layer| (estimated_this_layer >> (current_length - layer)).max(1)) - .sum::(); - - // Number of layers pending. - let further_layers_number = BITS - 1 - current_length; - // Assume the next layer contains twice as many, but it could be twice as many minus one. - // So the estimate may become smaller. - let estimated_next_layer = estimated_this_layer * 2; - // Sum of powers of 2 is power of 2 minus 1 - let estimated_next_layers = ((1 << further_layers_number) - 1) * estimated_next_layer; - - // We have this many elements on this layer. Add one, because address indexes start at 0. 
- let this_layer = next_addr.to_index().0 + 1; - - Some(LedgerSyncProgress { - fetched: prev_layers + this_layer, - estimation: prev_layers + estimated_this_layer + estimated_next_layers, - }) } - pub fn peer_query_get( + pub fn peer_num_account_query_get( &self, peer_id: &PeerId, rpc_id: P2pRpcId, - ) -> Option<(&LedgerAddress, &LedgerQueryPending)> { + ) -> Option<&LedgerNumAccountsQueryPending> { match self { - Self::Pending { pending, .. } => { + Self::NumAccountsPending { + pending_num_accounts, + .. + } => { let expected_rpc_id = rpc_id; - pending.iter().find(|(_, s)| { - s.attempts.get(peer_id).map_or(false, |s| match s { + pending_num_accounts.attempts.get(peer_id).and_then(|s| { + if s.rpc_id()? == expected_rpc_id { + Some(pending_num_accounts) + } else { + None + } + }) + } + _ => None, + } + } + + pub fn peer_num_account_query_state_get_mut( + &mut self, + peer_id: &PeerId, + rpc_id: P2pRpcId, + ) -> Option<&mut PeerRpcState> { + match self { + Self::NumAccountsPending { + pending_num_accounts, + .. + } => { + let expected_rpc_id = rpc_id; + pending_num_accounts + .attempts + .get_mut(peer_id) + .filter(|s| match s { PeerRpcState::Pending { rpc_id, .. } => *rpc_id == expected_rpc_id, PeerRpcState::Error { rpc_id, .. } => *rpc_id == expected_rpc_id, PeerRpcState::Success { rpc_id, .. } => *rpc_id == expected_rpc_id, _ => false, }) + } + _ => None, + } + } + + pub fn peer_address_query_get( + &self, + peer_id: &PeerId, + rpc_id: P2pRpcId, + ) -> Option<(&LedgerAddress, &LedgerAddressQueryPending)> { + match self { + Self::MerkleTreeSyncPending { + pending_addresses, .. + } => { + let expected_rpc_id = rpc_id; + pending_addresses.iter().find(|(_, s)| { + s.attempts + .get(peer_id) + .map_or(false, |s| s.rpc_id() == Some(expected_rpc_id)) }) } _ => None, } } - pub fn peer_query_get_mut( + pub fn peer_address_query_state_get_mut( &mut self, peer_id: &PeerId, rpc_id: P2pRpcId, ) -> Option<&mut PeerRpcState> { match self { - Self::Pending { pending, .. } => { + Self::MerkleTreeSyncPending { + pending_addresses: pending, + .. + } => { let expected_rpc_id = rpc_id; pending.iter_mut().find_map(|(_, s)| { s.attempts.get_mut(peer_id).filter(|s| match s { @@ -242,12 +334,14 @@ impl TransitionFrontierSyncLedgerSnarkedState { } } - pub fn peer_query_pending_rpc_ids<'a>( + pub fn peer_address_query_pending_rpc_ids<'a>( &'a self, peer_id: &'a PeerId, ) -> impl 'a + Iterator { let pending = match self { - Self::Pending { pending, .. } => pending, + Self::MerkleTreeSyncPending { + pending_addresses, .. + } => pending_addresses, _ => &SYNC_PENDING_EMPTY, }; pending.values().filter_map(move |s| { @@ -257,4 +351,20 @@ impl TransitionFrontierSyncLedgerSnarkedState { .and_then(|(_, s)| s.pending_rpc_id()) }) } + + pub fn peer_num_accounts_rpc_id(&self, peer_id: &PeerId) -> Option { + let pending = match self { + Self::NumAccountsPending { + pending_num_accounts, + .. + } => Some(pending_num_accounts), + _ => None, + }; + + pending? 
+ .attempts + .iter() + .find(|(id, _)| *id == peer_id) + .and_then(|(_, s)| s.pending_rpc_id()) + } } diff --git a/node/src/transition_frontier/sync/ledger/transition_frontier_sync_ledger_state.rs b/node/src/transition_frontier/sync/ledger/transition_frontier_sync_ledger_state.rs index d5754998b..33f4b88d0 100644 --- a/node/src/transition_frontier/sync/ledger/transition_frontier_sync_ledger_state.rs +++ b/node/src/transition_frontier/sync/ledger/transition_frontier_sync_ledger_state.rs @@ -43,7 +43,7 @@ impl TransitionFrontierSyncLedgerState { pub fn is_snarked_ledger_synced(&self) -> bool { match self { Self::Init { .. } => false, - Self::Snarked(TransitionFrontierSyncLedgerSnarkedState::Pending { .. }) => false, + Self::Snarked(s) if s.is_pending() => false, _ => true, } } @@ -67,7 +67,10 @@ impl TransitionFrontierSyncLedgerState { pub fn update_target(&mut self, time: Timestamp, new_target: SyncLedgerTarget) { match self { - Self::Snarked(TransitionFrontierSyncLedgerSnarkedState::Pending { target, .. }) => { + Self::Snarked( + TransitionFrontierSyncLedgerSnarkedState::NumAccountsPending { target, .. } + | TransitionFrontierSyncLedgerSnarkedState::MerkleTreeSyncPending { target, .. }, + ) => { if target.snarked_ledger_hash == new_target.snarked_ledger_hash { *target = new_target; } else { diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs index e59be6598..ffdfe2053 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs @@ -1,19 +1,21 @@ +use openmina_core::block::ArcBlockWithHash; use p2p::channels::rpc::P2pChannelsRpcAction; use redux::ActionMeta; use crate::p2p::channels::rpc::P2pRpcRequest; +use crate::service::TransitionFrontierSyncLedgerSnarkedService; use crate::transition_frontier::TransitionFrontierService; use crate::Store; use super::ledger::snarked::TransitionFrontierSyncLedgerSnarkedAction; use super::ledger::staged::TransitionFrontierSyncLedgerStagedAction; -use super::ledger::TransitionFrontierSyncLedgerAction; -use super::TransitionFrontierSyncAction; +use super::ledger::{SyncLedgerTarget, TransitionFrontierSyncLedgerAction}; +use super::{TransitionFrontierSyncAction, TransitionFrontierSyncState}; impl TransitionFrontierSyncAction { pub fn effects(&self, _: &ActionMeta, store: &mut Store) where - S: TransitionFrontierService, + S: TransitionFrontierService + TransitionFrontierSyncLedgerSnarkedService, { match self { TransitionFrontierSyncAction::Init { best_tip, .. } => { @@ -42,7 +44,11 @@ impl TransitionFrontierSyncAction { store.dispatch(TransitionFrontierSyncAction::LedgerRootPending); } } - TransitionFrontierSyncAction::BestTipUpdate { .. } => { + TransitionFrontierSyncAction::BestTipUpdate { best_tip, .. } => { + // TODO(tizoc): this is currently required because how how complicated the BestTipUpdate reducer is, + // once that is simplified this should be handled in separate actions. + maybe_copy_ledgers_for_sync(store, best_tip).unwrap(); + // if root snarked ledger changed. store.dispatch(TransitionFrontierSyncLedgerAction::Init); // if root snarked ledger stayed same but root block changed @@ -56,7 +62,18 @@ impl TransitionFrontierSyncAction { // TODO(binier): cleanup ledgers } + // TODO(tizoc): this action is never called with the current implementation, + // either remove it or figure out how to recover it as a reaction to + // `BestTipUpdate` above. 
Currently this logic is handled by + // `maybe_copy_ledgers_for_sync` at the end of this file. + // Same kind of applies to `LedgerNextEpochPending` and `LedgerRootPending` + // in some cases, but issue is mostly about `LedgerStakingPending` because + // it is the one most likely to be affected by the first `BestTipUpdate` + // action processed by the state machine. TransitionFrontierSyncAction::LedgerStakingPending => { + prepare_staking_epoch_ledger_for_sync(store, &sync_best_tip(store.state())) + .unwrap(); + store.dispatch(TransitionFrontierSyncLedgerAction::Init); } TransitionFrontierSyncAction::LedgerStakingSuccess => { @@ -65,12 +82,20 @@ impl TransitionFrontierSyncAction { } } TransitionFrontierSyncAction::LedgerNextEpochPending => { + prepare_next_epoch_ledger_for_sync(store, &sync_best_tip(store.state())).unwrap(); + store.dispatch(TransitionFrontierSyncLedgerAction::Init); } TransitionFrontierSyncAction::LedgerNextEpochSuccess => { store.dispatch(TransitionFrontierSyncAction::LedgerRootPending); } TransitionFrontierSyncAction::LedgerRootPending => { + prepare_transition_frontier_root_ledger_for_sync( + store, + &sync_best_tip(store.state()), + ) + .unwrap(); + store.dispatch(TransitionFrontierSyncLedgerAction::Init); } TransitionFrontierSyncAction::LedgerRootSuccess => { @@ -213,3 +238,95 @@ impl TransitionFrontierSyncAction { } } } + +// Helper functions + +/// Gets from the current state the best tip sync target +fn sync_best_tip(state: &crate::State) -> ArcBlockWithHash { + state.transition_frontier.sync.best_tip().unwrap().clone() +} + +/// For snarked ledger sync targets, copy the previous snarked ledger if required +fn maybe_copy_ledgers_for_sync( + store: &mut Store, + best_tip: &ArcBlockWithHash, +) -> Result +where + S: TransitionFrontierService + TransitionFrontierSyncLedgerSnarkedService, +{ + let sync = &store.state().transition_frontier.sync; + + match sync { + TransitionFrontierSyncState::StakingLedgerPending(_) => { + prepare_staking_epoch_ledger_for_sync(store, best_tip) + } + TransitionFrontierSyncState::NextEpochLedgerPending(_) => { + prepare_next_epoch_ledger_for_sync(store, best_tip) + } + + TransitionFrontierSyncState::RootLedgerPending(_) => { + prepare_transition_frontier_root_ledger_for_sync(store, best_tip) + } + _ => Ok(true), + } +} + +/// Copies (if necessary) the genesis ledger into the sync ledger state +/// for the staking epoch ledger to use as a starting point. +fn prepare_staking_epoch_ledger_for_sync( + store: &mut Store, + best_tip: &ArcBlockWithHash, +) -> Result +where + S: TransitionFrontierService + TransitionFrontierSyncLedgerSnarkedService, +{ + let target = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash; + let origin = best_tip.genesis_ledger_hash().clone(); + + store + .service() + .copy_snarked_ledger_contents_for_sync(origin, target, false) +} + +/// Copies (if necessary) the staking ledger into the sync ledger state +/// for the next epoch ledger to use as a starting point. 
+fn prepare_next_epoch_ledger_for_sync( + store: &mut Store, + best_tip: &ArcBlockWithHash, +) -> Result +where + S: TransitionFrontierService + TransitionFrontierSyncLedgerSnarkedService, +{ + let sync = &store.state().transition_frontier.sync; + let root_block = sync.root_block().unwrap(); + let Some(next_epoch_sync) = SyncLedgerTarget::next_epoch(best_tip, root_block) else { + return Ok(false); + }; + let target = next_epoch_sync.snarked_ledger_hash; + let origin = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash; + + store + .service() + .copy_snarked_ledger_contents_for_sync(origin, target, false) +} + +/// Copies (if necessary) the next epoch ledger into the sync ledger state +/// for the transition frontier root ledger to use as a starting point. +fn prepare_transition_frontier_root_ledger_for_sync( + store: &mut Store, + best_tip: &ArcBlockWithHash, +) -> Result +where + S: TransitionFrontierService + TransitionFrontierSyncLedgerSnarkedService, +{ + let sync = &store.state().transition_frontier.sync; + let root_block = sync.root_block().unwrap(); + let next_epoch_sync = SyncLedgerTarget::next_epoch(best_tip, root_block) + .unwrap_or_else(|| SyncLedgerTarget::staking_epoch(best_tip)); + let target = root_block.snarked_ledger_hash().clone(); + let origin = next_epoch_sync.snarked_ledger_hash; + + store + .service() + .copy_snarked_ledger_contents_for_sync(origin, target, false) +} diff --git a/node/src/transition_frontier/transition_frontier_effects.rs b/node/src/transition_frontier/transition_frontier_effects.rs index 04f0b5d96..bbabd0bbb 100644 --- a/node/src/transition_frontier/transition_frontier_effects.rs +++ b/node/src/transition_frontier/transition_frontier_effects.rs @@ -190,7 +190,6 @@ pub fn transition_frontier_effects( stats.syncing_block_update(state); } } - // TODO(tizoc): push new snarked roots here? } // Bootstrap/Catchup is practically complete at this point. // This effect is where the finalization part needs to be @@ -342,8 +341,9 @@ fn handle_transition_frontier_sync_ledger_action( } TransitionFrontierSyncLedgerAction::Snarked(a) => { match a { - TransitionFrontierSyncLedgerSnarkedAction::PeerQueryInit { - ref address, .. + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit { + ref address, + .. 
} => { if let Some(stats) = store.service.stats() { let (start, end) = (meta.time(), meta.time()); @@ -368,7 +368,7 @@ fn handle_transition_frontier_sync_ledger_action( } } } - TransitionFrontierSyncLedgerSnarkedAction::PeerQuerySuccess { + TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess { peer_id, rpc_id, ref response, @@ -382,7 +382,7 @@ fn handle_transition_frontier_sync_ledger_action( .ledger() .and_then(|s| s.snarked()) .and_then(|s| { - Some((s.target().kind, s.peer_query_get(&peer_id, rpc_id)?)) + Some((s.target().kind, s.peer_address_query_get(&peer_id, rpc_id)?)) }) .map(|(kind, (_, s))| (kind, s.time, meta.time())) { From ff2137c8cc63b468b100bd5a9b1799b685eaec63 Mon Sep 17 00:00:00 2001 From: Bruno Deferrari Date: Fri, 29 Mar 2024 12:35:46 -0300 Subject: [PATCH 3/4] chore: Update action_kind.rs --- node/src/action_kind.rs | 92 ++++++++++++++++++++++++++++++++++------- 1 file changed, 76 insertions(+), 16 deletions(-) diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index 1f3a9cf30..1e672c203 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -355,13 +355,28 @@ pub enum ActionKind { TransitionFrontierSyncLedgerStakingSuccess, TransitionFrontierSyncLedgerInit, TransitionFrontierSyncLedgerSuccess, + TransitionFrontierSyncLedgerSnarkedChildAccountsAccepted, TransitionFrontierSyncLedgerSnarkedChildAccountsReceived, + TransitionFrontierSyncLedgerSnarkedChildAccountsRejected, + TransitionFrontierSyncLedgerSnarkedChildHashesAccepted, TransitionFrontierSyncLedgerSnarkedChildHashesReceived, - TransitionFrontierSyncLedgerSnarkedPeerQueryError, - TransitionFrontierSyncLedgerSnarkedPeerQueryInit, - TransitionFrontierSyncLedgerSnarkedPeerQueryPending, - TransitionFrontierSyncLedgerSnarkedPeerQueryRetry, - TransitionFrontierSyncLedgerSnarkedPeerQuerySuccess, + TransitionFrontierSyncLedgerSnarkedChildHashesRejected, + TransitionFrontierSyncLedgerSnarkedMerkleTreeSyncPending, + TransitionFrontierSyncLedgerSnarkedMerkleTreeSyncSuccess, + TransitionFrontierSyncLedgerSnarkedNumAccountsAccepted, + TransitionFrontierSyncLedgerSnarkedNumAccountsReceived, + TransitionFrontierSyncLedgerSnarkedNumAccountsRejected, + TransitionFrontierSyncLedgerSnarkedNumAccountsSuccess, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressError, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressInit, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressPending, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressRetry, + TransitionFrontierSyncLedgerSnarkedPeerQueryAddressSuccess, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsError, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsInit, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsPending, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsRetry, + TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsSuccess, TransitionFrontierSyncLedgerSnarkedPeersQuery, TransitionFrontierSyncLedgerSnarkedPending, TransitionFrontierSyncLedgerSnarkedSuccess, @@ -392,7 +407,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 325; + pub const COUNT: u16 = 340; } impl std::fmt::Display for ActionKind { @@ -1155,27 +1170,72 @@ impl ActionKindGet for TransitionFrontierSyncLedgerSnarkedAction { match self { Self::Pending => ActionKind::TransitionFrontierSyncLedgerSnarkedPending, Self::PeersQuery => ActionKind::TransitionFrontierSyncLedgerSnarkedPeersQuery, - Self::PeerQueryInit { .. 
} => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryInit + Self::PeerQueryNumAccountsInit { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsInit } - Self::PeerQueryPending { .. } => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryPending + Self::PeerQueryNumAccountsPending { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsPending } - Self::PeerQueryRetry { .. } => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryRetry + Self::PeerQueryNumAccountsRetry { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsRetry } - Self::PeerQueryError { .. } => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryError + Self::PeerQueryNumAccountsError { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsError } - Self::PeerQuerySuccess { .. } => { - ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQuerySuccess + Self::PeerQueryNumAccountsSuccess { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryNumAccountsSuccess + } + Self::NumAccountsReceived { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsReceived + } + Self::NumAccountsAccepted { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsAccepted + } + Self::NumAccountsRejected { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsRejected + } + Self::NumAccountsSuccess { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedNumAccountsSuccess + } + Self::MerkleTreeSyncPending => { + ActionKind::TransitionFrontierSyncLedgerSnarkedMerkleTreeSyncPending + } + Self::PeerQueryAddressInit { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressInit + } + Self::PeerQueryAddressPending { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressPending + } + Self::PeerQueryAddressRetry { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressRetry + } + Self::PeerQueryAddressError { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressError + } + Self::PeerQueryAddressSuccess { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedPeerQueryAddressSuccess } Self::ChildHashesReceived { .. } => { ActionKind::TransitionFrontierSyncLedgerSnarkedChildHashesReceived } + Self::ChildHashesAccepted { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedChildHashesAccepted + } + Self::ChildHashesRejected { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedChildHashesRejected + } Self::ChildAccountsReceived { .. } => { ActionKind::TransitionFrontierSyncLedgerSnarkedChildAccountsReceived } + Self::ChildAccountsAccepted { .. } => { + ActionKind::TransitionFrontierSyncLedgerSnarkedChildAccountsAccepted + } + Self::ChildAccountsRejected { .. 
} => { + ActionKind::TransitionFrontierSyncLedgerSnarkedChildAccountsRejected + } + Self::MerkleTreeSyncSuccess => { + ActionKind::TransitionFrontierSyncLedgerSnarkedMerkleTreeSyncSuccess + } Self::Success => ActionKind::TransitionFrontierSyncLedgerSnarkedSuccess, } } From 549e60022aa682b9f48f4bea03471db20e08b638 Mon Sep 17 00:00:00 2001 From: Bruno Deferrari Date: Wed, 27 Mar 2024 10:40:25 -0300 Subject: [PATCH 4/4] chore: Update status.md --- status.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/status.md b/status.md index 6ffa03eba..a60258547 100644 --- a/status.md +++ b/status.md @@ -50,10 +50,8 @@ - [x] Snarked ledgers (staking and next epoch ledgers + transition frontier root) - [x] Handling of peer disconnections, timeouts or cases when the peer doesn't have the data - [x] Detecting ledger hash mismatches for the downloaded chunk - - [ ] Handling ledger hash mismatches gracefully, without crashing the node - - [ ] Optimized snarked ledgers synchronization (reusing previous ledgers when constructing the next during (re)synchronization) - - Not done right now, each synchronization starts from scratch. - - Tested this on a server (very quick sync) and locally (slower) and both were able to sync up to berkeleynet (it took a while locally). On mainnet, with a much bigger ledger we may not be able to sync up in time without this optimization. + - [x] Handling ledger hash mismatches gracefully, without crashing the node + - [x] Optimized snarked ledgers synchronization (reusing previous ledgers when constructing the next during (re)synchronization) - [x] Staged ledgers (transition frontier root) - [x] Handling of peer disconnections, timeouts or cases when the peer doesn't have the data - [x] Detection and handling of validation errors
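
Editor's note: the sketch below is not part of the patch series. It is a small, self-contained illustration of two ideas the snarked-ledger sync changes above rely on: starting the merkle-tree BFS at the smallest subtree that can hold all accounts reported by the Num_accounts query, and the rough progress estimate used by `estimation()`. The constants and the `tree_height_for_num_accounts` helper here are assumptions made for illustration only (the real definitions live in node/src/ledger/mod.rs, which is not shown in this series), so treat the numbers as indicative rather than authoritative.

// Editor's sketch -- NOT part of the patch series above. All constants and the
// `tree_height_for_num_accounts` helper are assumptions made for illustration.

const LEDGER_DEPTH: usize = 35; // assumed; the replaced estimation code used BITS = 35
const ACCOUNT_SUBTREE_HEIGHT: usize = 6; // assumed; accounts are fetched in groups of 64 = 2^6

/// Assumed behaviour: smallest subtree height that can hold `num_accounts` leaves.
fn tree_height_for_num_accounts(num_accounts: u64) -> usize {
    let height = 64 - num_accounts.next_power_of_two().leading_zeros() as usize - 1;
    height.max(1)
}

/// Depth (distance from the root) of the first address queried by the BFS.
/// The levels above it only combine the account subtree with empty-subtree
/// hashes, so they can be skipped once Num_accounts is known.
fn first_query_depth(num_accounts: u64) -> usize {
    LEDGER_DEPTH - tree_height_for_num_accounts(num_accounts)
}

/// Mirrors the rough progress estimate: expected accounts plus an estimate of
/// the non-empty hashes that will be fetched along the way.
fn estimated_items_to_fetch(num_accounts: u64) -> u64 {
    let tree_height = tree_height_for_num_accounts(num_accounts);
    let fill_ratio = num_accounts as f64 / 2f64.powf(tree_height as f64);
    let hashes = 2u64.pow(tree_height.saturating_sub(ACCOUNT_SUBTREE_HEIGHT) as u32);
    let hashes = (hashes as f64 * fill_ratio).ceil() as u64;
    num_accounts + hashes
}

fn main() {
    let num_accounts = 100_000u64; // hypothetical ledger size
    println!("tree height          : {}", tree_height_for_num_accounts(num_accounts)); // 17
    println!("first query depth    : {}", first_query_depth(num_accounts)); // 35 - 17 = 18
    println!("estimated items total: {}", estimated_items_to_fetch(num_accounts)); // ~101_563
}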