Skip to content

Commit

Permalink
Limit wallet peer connections
Browse files Browse the repository at this point in the history
Added functionality to limit the number of base node peer connections that a wallet
can have, based on a config setting. The furtherest nodes will be disconnected, but
nodes on the allow list (e.g. connected base node) will be ignored.
  • Loading branch information
hansieodendaal committed May 7, 2024
1 parent 8cbfeda commit 7a599d6
Show file tree
Hide file tree
Showing 71 changed files with 694 additions and 248 deletions.
Expand Up @@ -160,7 +160,7 @@ impl WalletGrpcServer {

fn get_consensus_constants(&self) -> Result<&ConsensusConstants, WalletStorageError> {
// If we don't have the chain metadata, we hope that VNReg consensus constants did not change - worst case, we
// spend more than we need to or the the transaction is rejected.
// spend more than we need to or the transaction is rejected.
let height = self
.wallet
.db
Expand Down
Expand Up @@ -200,7 +200,7 @@ impl WalletEventMonitor {
);
match msg {
ConnectivityEvent::PeerConnected(_) |
ConnectivityEvent::PeerDisconnected(_) => {
ConnectivityEvent::PeerDisconnected(..) => {
self.trigger_peer_state_refresh().await;
},
// Only the above variants trigger state refresh
Expand Down
Expand Up @@ -53,7 +53,7 @@ impl CommandContext {
let start = Instant::now();
println!("☎️ Dialing peer...");

match connectivity.dial_peer(dest_node_id).await {
match connectivity.dial_peer(dest_node_id, false).await {
Ok(connection) => {
println!("⚡️ Peer connected in {}ms!", start.elapsed().as_millis());
println!("Connection: {}", connection);
Expand Down
2 changes: 1 addition & 1 deletion base_layer/chat_ffi/src/byte_vector.rs
Expand Up @@ -100,7 +100,7 @@ pub unsafe extern "C" fn chat_byte_vector_destroy(bytes: *mut ChatByteVector) {
///
/// # Safety
/// None
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn chat_byte_vector_get_at(
Expand Down
2 changes: 1 addition & 1 deletion base_layer/contacts/src/contacts_service/service.rs
Expand Up @@ -563,7 +563,7 @@ where T: ContactsBackend + 'static
fn handle_connectivity_event(&mut self, event: ConnectivityEvent) {
use ConnectivityEvent::{PeerBanned, PeerDisconnected};
match event {
PeerDisconnected(node_id) | PeerBanned(node_id) => {
PeerDisconnected(node_id, _) | PeerBanned(node_id) => {
if let Some(pos) = self.liveness_data.iter().position(|p| *p.node_id() == node_id) {
debug!(
target: LOG_TARGET,
Expand Down
Expand Up @@ -217,7 +217,7 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
}

async fn connect_to_sync_peer(&self, peer: NodeId) -> Result<PeerConnection, BlockSyncError> {
let connection = self.connectivity.dial_peer(peer).await?;
let connection = self.connectivity.dial_peer(peer, false).await?;
Ok(connection)
}

Expand Down
Expand Up @@ -230,7 +230,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
let timer = Instant::now();
debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
let conn = self.connectivity.dial_peer(node_id.clone()).await?;
let conn = self.connectivity.dial_peer(node_id.clone(), false).await?;
info!(
target: LOG_TARGET,
"Successfully dialed sync peer {} in {:.2?}",
Expand Down
Expand Up @@ -276,7 +276,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, HorizonSyncError> {
let timer = Instant::now();
debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
let conn = self.connectivity.dial_peer(node_id.clone()).await?;
let conn = self.connectivity.dial_peer(node_id.clone(), false).await?;
info!(
target: LOG_TARGET,
"Successfully dialed sync peer {} in {:.2?}",
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/chain_storage/blockchain_database.rs
Expand Up @@ -2407,7 +2407,7 @@ fn get_previous_timestamps<T: BlockchainBackend>(
Ok(timestamps)
}

/// Gets all blocks ordered from the the block that connects (via prev_hash) to the main chain, to the orphan tip.
/// Gets all blocks ordered from the block that connects (via prev_hash) to the main chain, to the orphan tip.
#[allow(clippy::ptr_arg)]
fn get_orphan_link_main_chain<T: BlockchainBackend>(
db: &T,
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Expand Up @@ -2530,9 +2530,9 @@ impl BlockchainBackend for LMDBDatabase {
}
trace!(
target: LOG_TARGET,
"Finished calculating new smt (size: {}), took: #{}s",
"Finished calculating new smt (size: {}), took: {:.2?}",
smt.size(),
start.elapsed().as_millis()
start.elapsed()
);
Ok(smt)
}
Expand Down
Expand Up @@ -35,7 +35,7 @@ use crate::transactions::{
};

/// Create a unique unspent transaction priority based on the transaction fee, maturity of the oldest input UTXO and the
/// excess_sig. The excess_sig is included to ensure the the priority key unique so it can be used with a BTreeMap.
/// excess_sig. The excess_sig is included to ensure the priority key unique so it can be used with a BTreeMap.
/// Normally, duplicate keys will be overwritten in a BTreeMap.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)]
pub struct FeePriority(Vec<u8>);
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/transactions/crypto_factories.rs
Expand Up @@ -31,7 +31,7 @@ impl CryptoFactories {
///
/// ## Parameters
///
/// * `max_proof_range`: Sets the the maximum value in range proofs, where `max = 2^max_proof_range`
/// * `max_proof_range`: Sets the maximum value in range proofs, where `max = 2^max_proof_range`
pub fn new(max_proof_range: usize) -> Self {
Self {
commitment: Arc::new(CommitmentFactory::default()),
Expand Down
Expand Up @@ -23,7 +23,7 @@
// Portions of this file were originally copyrighted (c) 2018 The Grin Developers, issued under the Apache License,
// Version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0.

//! Encrypted data using the the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce.
//! Encrypted data using the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce.

use std::mem::size_of;

Expand Down
Expand Up @@ -472,7 +472,7 @@ impl SenderTransactionProtocol {
Ok((public_nonce, public_excess))
}

/// Add partial signatures, add the the recipient info to sender state and move to the Finalizing state
/// Add partial signatures, add the recipient info to sender state and move to the Finalizing state
pub async fn add_single_recipient_info<KM: TransactionKeyManagerInterface>(
&mut self,
mut rec: RecipientSignedMessage,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/tests/tests/node_service.rs
Expand Up @@ -420,7 +420,7 @@ async fn propagate_and_forward_invalid_block() {
alice_node
.comms
.connectivity()
.dial_peer(bob_node.node_identity.node_id().clone())
.dial_peer(bob_node.node_identity.node_id().clone(), false)
.await
.unwrap();
wait_until_online(&[&alice_node, &bob_node, &carol_node, &dan_node]).await;
Expand Down
2 changes: 1 addition & 1 deletion base_layer/mmr/src/backend.rs
Expand Up @@ -41,7 +41,7 @@ pub trait ArrayLike {
/// Return the item at the given index
fn get(&self, index: usize) -> Result<Option<Self::Value>, Self::Error>;

/// Remove all stored items from the the backend.
/// Remove all stored items from the backend.
fn clear(&mut self) -> Result<(), Self::Error>;

/// Finds the index of the specified stored item, it will return None if the object could not be found.
Expand Down
2 changes: 1 addition & 1 deletion base_layer/mmr/src/sparse_merkle_tree/proofs.rs
Expand Up @@ -98,7 +98,7 @@ pub struct InclusionProof<H> {
/// ```
pub struct ExclusionProof<H> {
siblings: Vec<NodeHash>,
// The terminal node of the tree proof, or `None` if the the node is `Empty`.
// The terminal node of the tree proof, or `None` if the node is `Empty`.
leaf: Option<LeafNode<H>>,
phantom: std::marker::PhantomData<H>,
}
Expand Down
5 changes: 5 additions & 0 deletions base_layer/p2p/src/initialization.rs
Expand Up @@ -559,6 +559,11 @@ impl ServiceInitializer for P2pInitializer {
network_byte: self.network.as_byte(),
user_agent: config.user_agent.clone(),
})
.with_minimize_connections(if self.config.dht.minimize_connections {
Some(self.config.dht.num_neighbouring_nodes + self.config.dht.num_random_nodes)
} else {
None
})
.set_self_liveness_check(config.listener_self_liveness_check_interval);

if config.allow_test_addresses || config.dht.peer_validator_config.allow_test_addresses {
Expand Down
3 changes: 2 additions & 1 deletion base_layer/p2p/src/services/liveness/service.rs
Expand Up @@ -28,6 +28,7 @@ use tari_comms::{
connectivity::{ConnectivityRequester, ConnectivitySelection},
peer_manager::NodeId,
types::CommsPublicKey,
Minimized,
PeerManager,
};
use tari_comms_dht::{
Expand Down Expand Up @@ -360,7 +361,7 @@ where
target: LOG_TARGET,
"Disconnecting peer {} that failed {} rounds of pings", node_id, max_allowed_ping_failures
);
conn.disconnect().await?;
conn.disconnect(Minimized::No).await?;
}
}
self.state.clear_failed_pings();
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/transport.rs
Expand Up @@ -147,7 +147,7 @@ pub struct TorTransportConfig {
/// When set to true, outbound TCP connections bypass the tor proxy. Defaults to false for better privacy, setting
/// to true may improve network performance for TCP nodes.
pub proxy_bypass_for_outbound_tcp: bool,
/// If set, instructs tor to forward traffic the the provided address. Otherwise, an OS-assigned port on 127.0.0.1
/// If set, instructs tor to forward traffic the provided address. Otherwise, an OS-assigned port on 127.0.0.1
/// is used.
pub forward_address: Option<Multiaddr>,
/// If set, the listener will bind to this address instead of the forward_address.
Expand Down
5 changes: 3 additions & 2 deletions base_layer/wallet/src/connectivity_service/service.rs
Expand Up @@ -27,6 +27,7 @@ use tari_comms::{
connectivity::{ConnectivityError, ConnectivityRequester},
peer_manager::{NodeId, Peer},
protocol::rpc::{RpcClientLease, RpcClientPool},
Minimized,
PeerConnection,
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
Expand Down Expand Up @@ -225,7 +226,7 @@ impl WalletConnectivityService {

async fn disconnect_base_node(&mut self, node_id: NodeId) {
if let Ok(Some(mut connection)) = self.connectivity.get_connection(node_id.clone()).await {
match connection.disconnect().await {
match connection.disconnect(Minimized::No).await {
Ok(_) => debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id),
Err(e) => error!(target: LOG_TARGET, "Failed to disconnect base node: {}", e),
}
Expand Down Expand Up @@ -321,7 +322,7 @@ impl WalletConnectivityService {
_ = self.base_node_watch.changed() => {
Ok(None)
}
result = self.connectivity.dial_peer(peer) => {
result = self.connectivity.dial_peer(peer, false) => {
Ok(Some(result?))
}
}
Expand Down
3 changes: 2 additions & 1 deletion base_layer/wallet/src/connectivity_service/test.rs
Expand Up @@ -34,6 +34,7 @@ use tari_comms::{
mocks::{create_connectivity_mock, ConnectivityManagerMockState},
node_identity::build_node_identity,
},
Minimized,
};
use tari_shutdown::Shutdown;
use tari_test_utils::runtime::spawn_until_shutdown;
Expand Down Expand Up @@ -177,7 +178,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() {
mock_state.add_active_connection(conn.clone()).await;
// Empty out all the calls
let _result = mock_state.take_calls().await;
conn.disconnect().await.unwrap();
conn.disconnect(Minimized::No).await.unwrap();

let barrier = Arc::new(Barrier::new(2));
let pending_request = task::spawn({
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/output_manager_service/service.rs
Expand Up @@ -1293,7 +1293,7 @@ where
let uo_len = uo.len();
trace!(
target: LOG_TARGET,
"select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {})",
"select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {} ms)",
uo_len,
start_new.elapsed().as_millis(),
start.elapsed().as_millis(),
Expand Down Expand Up @@ -1362,7 +1362,7 @@ where
let enough_spendable = utxos_total_value > amount + fee_with_change;
trace!(
target: LOG_TARGET,
"select_utxos profile - final_selection: {} outputs from {}, {} ms (at {})",
"select_utxos profile - final_selection: {} outputs from {}, {} ms (at {} ms)",
utxos.len(),
uo_len,
start_new.elapsed().as_millis(),
Expand Down
Expand Up @@ -38,6 +38,7 @@ use tari_comms::{
protocol::rpc::RpcClientLease,
traits::OrOptional,
types::CommsPublicKey,
Minimized,
PeerConnection,
};
use tari_core::{
Expand Down Expand Up @@ -182,7 +183,7 @@ where
target: LOG_TARGET,
"Attempting UTXO sync with seed peer {} ({})", self.peer_index, peer,
);
match self.resources.comms_connectivity.dial_peer(peer.clone()).await {
match self.resources.comms_connectivity.dial_peer(peer.clone(), true).await {
Ok(conn) => Ok(conn),
Err(e) => {
self.publish_event(UtxoScannerEvent::ConnectionFailedToBaseNode {
Expand All @@ -193,7 +194,7 @@ where
});

if let Ok(Some(connection)) = self.resources.comms_connectivity.get_connection(peer.clone()).await {
if connection.clone().disconnect().await.is_ok() {
if connection.clone().disconnect(Minimized::No).await.is_ok() {
debug!(target: LOG_TARGET, "Disconnected base node peer {}", peer);
}
}
Expand Down
8 changes: 4 additions & 4 deletions base_layer/wallet/tests/transaction_service_tests/service.rs
Expand Up @@ -578,7 +578,7 @@ async fn manage_single_transaction() {

let _peer_connection = bob_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone())
.dial_peer(alice_node_identity.node_id().clone(), false)
.await
.unwrap();

Expand Down Expand Up @@ -742,7 +742,7 @@ async fn large_interactive_transaction() {
// Verify that Alice and Bob are connected
let _peer_connection = bob_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone())
.dial_peer(alice_node_identity.node_id().clone(), false)
.await
.unwrap();

Expand Down Expand Up @@ -2092,15 +2092,15 @@ async fn manage_multiple_transactions() {

let _peer_connection = bob_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone())
.dial_peer(alice_node_identity.node_id().clone(), false)
.await
.unwrap();
sleep(Duration::from_secs(3)).await;

// Connect alice to carol
let _peer_connection = alice_comms
.connectivity()
.dial_peer(carol_node_identity.node_id().clone())
.dial_peer(carol_node_identity.node_id().clone(), false)
.await
.unwrap();

Expand Down

0 comments on commit 7a599d6

Please sign in to comment.