Skip to content

Commit

Permalink
Limit wallet peer connections
Browse files Browse the repository at this point in the history
Added functionality to limit the number of base node peer connections that a wallet
can have, based on a config setting. The furtherest nodes will be disconnected, but
nodes on the allow list (e.g. connected base node) will be ignored.
  • Loading branch information
hansieodendaal committed May 8, 2024
1 parent 4033f07 commit 13c7cc6
Show file tree
Hide file tree
Showing 50 changed files with 585 additions and 187 deletions.
Expand Up @@ -160,7 +160,7 @@ impl WalletGrpcServer {

fn get_consensus_constants(&self) -> Result<&ConsensusConstants, WalletStorageError> {
// If we don't have the chain metadata, we hope that VNReg consensus constants did not change - worst case, we
// spend more than we need to or the the transaction is rejected.
// spend more than we need to or the transaction is rejected.
let height = self
.wallet
.db
Expand Down
Expand Up @@ -200,7 +200,7 @@ impl WalletEventMonitor {
);
match msg {
ConnectivityEvent::PeerConnected(_) |
ConnectivityEvent::PeerDisconnected(_) => {
ConnectivityEvent::PeerDisconnected(..) => {
self.trigger_peer_state_refresh().await;
},
// Only the above variants trigger state refresh
Expand Down
2 changes: 1 addition & 1 deletion base_layer/chat_ffi/src/byte_vector.rs
Expand Up @@ -100,7 +100,7 @@ pub unsafe extern "C" fn chat_byte_vector_destroy(bytes: *mut ChatByteVector) {
///
/// # Safety
/// None
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn chat_byte_vector_get_at(
Expand Down
2 changes: 1 addition & 1 deletion base_layer/contacts/src/contacts_service/service.rs
Expand Up @@ -564,7 +564,7 @@ where T: ContactsBackend + 'static
fn handle_connectivity_event(&mut self, event: ConnectivityEvent) {
use ConnectivityEvent::{PeerBanned, PeerDisconnected};
match event {
PeerDisconnected(node_id) | PeerBanned(node_id) => {
PeerDisconnected(node_id, _) | PeerBanned(node_id) => {
if let Some(pos) = self.liveness_data.iter().position(|p| *p.node_id() == node_id) {
debug!(
target: LOG_TARGET,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/chain_storage/blockchain_database.rs
Expand Up @@ -2407,7 +2407,7 @@ fn get_previous_timestamps<T: BlockchainBackend>(
Ok(timestamps)
}

/// Gets all blocks ordered from the the block that connects (via prev_hash) to the main chain, to the orphan tip.
/// Gets all blocks ordered from the block that connects (via prev_hash) to the main chain, to the orphan tip.
#[allow(clippy::ptr_arg)]
fn get_orphan_link_main_chain<T: BlockchainBackend>(
db: &T,
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Expand Up @@ -2530,9 +2530,9 @@ impl BlockchainBackend for LMDBDatabase {
}
trace!(
target: LOG_TARGET,
"Finished calculating new smt (size: {}), took: #{}s",
"Finished calculating new smt (size: {}), took: {:.2?}",
smt.size(),
start.elapsed().as_millis()
start.elapsed()
);
Ok(smt)
}
Expand Down
Expand Up @@ -35,7 +35,7 @@ use crate::transactions::{
};

/// Create a unique unspent transaction priority based on the transaction fee, maturity of the oldest input UTXO and the
/// excess_sig. The excess_sig is included to ensure the the priority key unique so it can be used with a BTreeMap.
/// excess_sig. The excess_sig is included to ensure the priority key unique so it can be used with a BTreeMap.
/// Normally, duplicate keys will be overwritten in a BTreeMap.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)]
pub struct FeePriority(Vec<u8>);
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/transactions/crypto_factories.rs
Expand Up @@ -31,7 +31,7 @@ impl CryptoFactories {
///
/// ## Parameters
///
/// * `max_proof_range`: Sets the the maximum value in range proofs, where `max = 2^max_proof_range`
/// * `max_proof_range`: Sets the maximum value in range proofs, where `max = 2^max_proof_range`
pub fn new(max_proof_range: usize) -> Self {
Self {
commitment: Arc::new(CommitmentFactory::default()),
Expand Down
Expand Up @@ -23,7 +23,7 @@
// Portions of this file were originally copyrighted (c) 2018 The Grin Developers, issued under the Apache License,
// Version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0.

//! Encrypted data using the the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce.
//! Encrypted data using the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce.

use std::mem::size_of;

Expand Down
Expand Up @@ -472,7 +472,7 @@ impl SenderTransactionProtocol {
Ok((public_nonce, public_excess))
}

/// Add partial signatures, add the the recipient info to sender state and move to the Finalizing state
/// Add partial signatures, add the recipient info to sender state and move to the Finalizing state
pub async fn add_single_recipient_info<KM: TransactionKeyManagerInterface>(
&mut self,
mut rec: RecipientSignedMessage,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/mmr/src/backend.rs
Expand Up @@ -41,7 +41,7 @@ pub trait ArrayLike {
/// Return the item at the given index
fn get(&self, index: usize) -> Result<Option<Self::Value>, Self::Error>;

/// Remove all stored items from the the backend.
/// Remove all stored items from the backend.
fn clear(&mut self) -> Result<(), Self::Error>;

/// Finds the index of the specified stored item, it will return None if the object could not be found.
Expand Down
2 changes: 1 addition & 1 deletion base_layer/mmr/src/sparse_merkle_tree/proofs.rs
Expand Up @@ -98,7 +98,7 @@ pub struct InclusionProof<H> {
/// ```
pub struct ExclusionProof<H> {
siblings: Vec<NodeHash>,
// The terminal node of the tree proof, or `None` if the the node is `Empty`.
// The terminal node of the tree proof, or `None` if the node is `Empty`.
leaf: Option<LeafNode<H>>,
phantom: std::marker::PhantomData<H>,
}
Expand Down
5 changes: 5 additions & 0 deletions base_layer/p2p/src/initialization.rs
Expand Up @@ -562,6 +562,11 @@ impl ServiceInitializer for P2pInitializer {
network_byte: self.network.as_byte(),
user_agent: self.user_agent.clone(),
})
.with_minimize_connections(if self.config.dht.minimize_connections {
Some(self.config.dht.num_neighbouring_nodes + self.config.dht.num_random_nodes)
} else {
None
})
.set_self_liveness_check(config.listener_self_liveness_check_interval);

if config.allow_test_addresses || config.dht.peer_validator_config.allow_test_addresses {
Expand Down
3 changes: 2 additions & 1 deletion base_layer/p2p/src/services/liveness/service.rs
Expand Up @@ -28,6 +28,7 @@ use tari_comms::{
connectivity::{ConnectivityRequester, ConnectivitySelection},
peer_manager::NodeId,
types::CommsPublicKey,
Minimized,
PeerManager,
};
use tari_comms_dht::{
Expand Down Expand Up @@ -360,7 +361,7 @@ where
target: LOG_TARGET,
"Disconnecting peer {} that failed {} rounds of pings", node_id, max_allowed_ping_failures
);
conn.disconnect().await?;
conn.disconnect(Minimized::No).await?;
}
}
self.state.clear_failed_pings();
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/transport.rs
Expand Up @@ -147,7 +147,7 @@ pub struct TorTransportConfig {
/// When set to true, outbound TCP connections bypass the tor proxy. Defaults to false for better privacy, setting
/// to true may improve network performance for TCP nodes.
pub proxy_bypass_for_outbound_tcp: bool,
/// If set, instructs tor to forward traffic the the provided address. Otherwise, an OS-assigned port on 127.0.0.1
/// If set, instructs tor to forward traffic the provided address. Otherwise, an OS-assigned port on 127.0.0.1
/// is used.
pub forward_address: Option<Multiaddr>,
/// If set, the listener will bind to this address instead of the forward_address.
Expand Down
3 changes: 2 additions & 1 deletion base_layer/wallet/src/connectivity_service/service.rs
Expand Up @@ -27,6 +27,7 @@ use tari_comms::{
connectivity::{ConnectivityError, ConnectivityRequester},
peer_manager::{NodeId, Peer},
protocol::rpc::{RpcClientLease, RpcClientPool},
Minimized,
PeerConnection,
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
Expand Down Expand Up @@ -225,7 +226,7 @@ impl WalletConnectivityService {

async fn disconnect_base_node(&mut self, node_id: NodeId) {
if let Ok(Some(mut connection)) = self.connectivity.get_connection(node_id.clone()).await {
match connection.disconnect().await {
match connection.disconnect(Minimized::No).await {
Ok(_) => debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id),
Err(e) => error!(target: LOG_TARGET, "Failed to disconnect base node: {}", e),
}
Expand Down
3 changes: 2 additions & 1 deletion base_layer/wallet/src/connectivity_service/test.rs
Expand Up @@ -34,6 +34,7 @@ use tari_comms::{
mocks::{create_connectivity_mock, ConnectivityManagerMockState},
node_identity::build_node_identity,
},
Minimized,
};
use tari_shutdown::Shutdown;
use tari_test_utils::runtime::spawn_until_shutdown;
Expand Down Expand Up @@ -177,7 +178,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() {
mock_state.add_active_connection(conn.clone()).await;
// Empty out all the calls
let _result = mock_state.take_calls().await;
conn.disconnect().await.unwrap();
conn.disconnect(Minimized::No).await.unwrap();

let barrier = Arc::new(Barrier::new(2));
let pending_request = task::spawn({
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/output_manager_service/service.rs
Expand Up @@ -1293,7 +1293,7 @@ where
let uo_len = uo.len();
trace!(
target: LOG_TARGET,
"select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {})",
"select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {} ms)",
uo_len,
start_new.elapsed().as_millis(),
start.elapsed().as_millis(),
Expand Down Expand Up @@ -1362,7 +1362,7 @@ where
let enough_spendable = utxos_total_value > amount + fee_with_change;
trace!(
target: LOG_TARGET,
"select_utxos profile - final_selection: {} outputs from {}, {} ms (at {})",
"select_utxos profile - final_selection: {} outputs from {}, {} ms (at {} ms)",
utxos.len(),
uo_len,
start_new.elapsed().as_millis(),
Expand Down
Expand Up @@ -38,6 +38,7 @@ use tari_comms::{
protocol::rpc::RpcClientLease,
traits::OrOptional,
types::CommsPublicKey,
Minimized,
PeerConnection,
};
use tari_core::{
Expand Down Expand Up @@ -193,7 +194,7 @@ where
});

if let Ok(Some(connection)) = self.resources.comms_connectivity.get_connection(peer.clone()).await {
if connection.clone().disconnect().await.is_ok() {
if connection.clone().disconnect(Minimized::No).await.is_ok() {
debug!(target: LOG_TARGET, "Disconnected base node peer {}", peer);
}
}
Expand Down
31 changes: 23 additions & 8 deletions base_layer/wallet_ffi/src/lib.rs
Expand Up @@ -27,7 +27,7 @@
//! becoming a `CompletedTransaction` with the `Completed` status. This means that the transaction has been
//! negotiated between the parties and is now ready to be broadcast to the Base Layer. The funds are still encumbered
//! as pending because the transaction has not been mined yet.
//! 3. The finalized `CompletedTransaction` will be sent back to the the receiver so that they have a copy.
//! 3. The finalized `CompletedTransaction` will be sent back to the receiver so that they have a copy.
//! 4. The wallet will broadcast the `CompletedTransaction` to a Base Node to be added to the mempool. Its status will
//! move from `Completed` to `Broadcast`.
//! 5. Wait until the transaction is mined. The `CompleteTransaction` status will then move from `Broadcast` to `Mined`
Expand Down Expand Up @@ -131,7 +131,13 @@ use tari_comms::{
transports::MemoryTransport,
types::CommsPublicKey,
};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig, NetworkDiscoveryConfig};
use tari_comms_dht::{
store_forward::SafConfig,
DbConnectionUrl,
DhtConfig,
DhtConnectivityConfig,
NetworkDiscoveryConfig,
};
use tari_contacts::contacts_service::{handle::ContactsServiceHandle, types::Contact};
use tari_core::{
borsh::FromBytes,
Expand Down Expand Up @@ -818,7 +824,7 @@ pub unsafe extern "C" fn byte_vector_destroy(bytes: *mut ByteVector) {
///
/// # Safety
/// None
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn byte_vector_get_at(ptr: *mut ByteVector, position: c_uint, error_out: *mut c_int) -> c_uchar {
Expand Down Expand Up @@ -1778,7 +1784,7 @@ pub unsafe extern "C" fn unblinded_outputs_get_length(
///
/// # Safety
/// The ```contact_destroy``` method must be called when finished with a TariContact to prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn unblinded_outputs_get_at(
Expand Down Expand Up @@ -2884,7 +2890,7 @@ pub unsafe extern "C" fn contacts_get_length(contacts: *mut TariContacts, error_
///
/// # Safety
/// The ```contact_destroy``` method must be called when finished with a TariContact to prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn contacts_get_at(
Expand Down Expand Up @@ -3185,7 +3191,7 @@ pub unsafe extern "C" fn completed_transactions_get_length(
/// # Safety
/// The ```completed_transaction_destroy``` method must be called when finished with a TariCompletedTransaction to
/// prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn completed_transactions_get_at(
Expand Down Expand Up @@ -3278,7 +3284,7 @@ pub unsafe extern "C" fn pending_outbound_transactions_get_length(
/// # Safety
/// The ```pending_outbound_transaction_destroy``` method must be called when finished with a
/// TariPendingOutboundTransaction to prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn pending_outbound_transactions_get_at(
Expand Down Expand Up @@ -3370,7 +3376,7 @@ pub unsafe extern "C" fn pending_inbound_transactions_get_length(
/// # Safety
/// The ```pending_inbound_transaction_destroy``` method must be called when finished with a
/// TariPendingOutboundTransaction to prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn pending_inbound_transactions_get_at(
Expand Down Expand Up @@ -4851,6 +4857,9 @@ pub unsafe extern "C" fn comms_config_create(
max_concurrent_inbound_tasks: 25,
max_concurrent_outbound_tasks: 50,
dht: DhtConfig {
num_neighbouring_nodes: 6,
num_random_nodes: 2,
minimize_connections: true,
discovery_request_timeout: Duration::from_secs(discovery_timeout_in_secs),
database_url: DbConnectionUrl::File(dht_database_path),
auto_join: true,
Expand All @@ -4861,9 +4870,15 @@ pub unsafe extern "C" fn comms_config_create(
..Default::default()
},
network_discovery: NetworkDiscoveryConfig {
min_desired_peers: 16,
initial_peer_sync_delay: Some(Duration::from_secs(25)),
..Default::default()
},
connectivity: DhtConnectivityConfig {
update_interval: Duration::from_secs(5 * 60),
minimum_desired_tcpv4_node_ratio: 0.0,
..Default::default()
},
..Default::default()
},
allow_test_addresses: true,
Expand Down
6 changes: 4 additions & 2 deletions common/config/presets/c_base_node_c.toml
Expand Up @@ -189,7 +189,7 @@ listener_self_liveness_check_interval = 15
# When using the tor transport and set to true, outbound TCP connections bypass the tor proxy. Defaults to false for
# better privacy
#tor.proxy_bypass_for_outbound_tcp = false
# If set, instructs tor to forward traffic the the provided address. (e.g. "/dns4/my-base-node/tcp/32123") (default = OS-assigned port)
# If set, instructs tor to forward traffic the provided address. (e.g. "/dns4/my-base-node/tcp/32123") (default = OS-assigned port)
#tor.forward_address =
# If set, the listener will bind to this address instead of the forward_address. You need to make sure that this listener is connectable from the forward_address.
#tor.listener_address_override =
Expand All @@ -213,7 +213,9 @@ database_url = "data/base_node/dht.db"
# The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour. Default: 8
#num_neighbouring_nodes = 8
# Number of random peers to include. Default: 4
#num_random_nodes= 4
#num_random_nodes = 4
# Connections above the configured number of neighbouring and random nodes will be removed (default: false)
#minimize_connections = false
# Send to this many peers when using the broadcast strategy. Default: 8
#broadcast_factor = 8
# Send to this many peers when using the propagate strategy. Default: 4
Expand Down

0 comments on commit 13c7cc6

Please sign in to comment.