Skip to content

Commit

Permalink
Limit wallet peer connections
Browse files Browse the repository at this point in the history
Added functionality to limit the number of base node peer connections that a wallet
can have, based on a config setting. The furtherest nodes will be disconnected.
  • Loading branch information
hansieodendaal committed Apr 24, 2024
1 parent cf579f5 commit b60fd69
Show file tree
Hide file tree
Showing 25 changed files with 234 additions and 85 deletions.
Expand Up @@ -160,7 +160,7 @@ impl WalletGrpcServer {

fn get_consensus_constants(&self) -> Result<&ConsensusConstants, WalletStorageError> {
// If we don't have the chain metadata, we hope that VNReg consensus constants did not change - worst case, we
// spend more than we need to or the the transaction is rejected.
// spend more than we need to or the transaction is rejected.
let height = self
.wallet
.db
Expand Down
2 changes: 1 addition & 1 deletion base_layer/chat_ffi/src/byte_vector.rs
Expand Up @@ -100,7 +100,7 @@ pub unsafe extern "C" fn chat_byte_vector_destroy(bytes: *mut ChatByteVector) {
///
/// # Safety
/// None
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn chat_byte_vector_get_at(
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/chain_storage/blockchain_database.rs
Expand Up @@ -2405,7 +2405,7 @@ fn get_previous_timestamps<T: BlockchainBackend>(
Ok(timestamps)
}

/// Gets all blocks ordered from the the block that connects (via prev_hash) to the main chain, to the orphan tip.
/// Gets all blocks ordered from the block that connects (via prev_hash) to the main chain, to the orphan tip.
#[allow(clippy::ptr_arg)]
fn get_orphan_link_main_chain<T: BlockchainBackend>(
db: &T,
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Expand Up @@ -2554,9 +2554,9 @@ impl BlockchainBackend for LMDBDatabase {
}
trace!(
target: LOG_TARGET,
"Finished calculating new smt (size: {}), took: #{}s",
"Finished calculating new smt (size: {}), took: {:.2?}",
smt.size(),
start.elapsed().as_millis()
start.elapsed()
);
Ok(smt)
}
Expand Down
Expand Up @@ -35,7 +35,7 @@ use crate::transactions::{
};

/// Create a unique unspent transaction priority based on the transaction fee, maturity of the oldest input UTXO and the
/// excess_sig. The excess_sig is included to ensure the the priority key unique so it can be used with a BTreeMap.
/// excess_sig. The excess_sig is included to ensure the priority key unique so it can be used with a BTreeMap.
/// Normally, duplicate keys will be overwritten in a BTreeMap.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)]
pub struct FeePriority(Vec<u8>);
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/transactions/crypto_factories.rs
Expand Up @@ -31,7 +31,7 @@ impl CryptoFactories {
///
/// ## Parameters
///
/// * `max_proof_range`: Sets the the maximum value in range proofs, where `max = 2^max_proof_range`
/// * `max_proof_range`: Sets the maximum value in range proofs, where `max = 2^max_proof_range`
pub fn new(max_proof_range: usize) -> Self {
Self {
commitment: Arc::new(CommitmentFactory::default()),
Expand Down
Expand Up @@ -23,7 +23,7 @@
// Portions of this file were originally copyrighted (c) 2018 The Grin Developers, issued under the Apache License,
// Version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0.

//! Encrypted data using the the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce.
//! Encrypted data using the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce.

use std::mem::size_of;

Expand Down
Expand Up @@ -472,7 +472,7 @@ impl SenderTransactionProtocol {
Ok((public_nonce, public_excess))
}

/// Add partial signatures, add the the recipient info to sender state and move to the Finalizing state
/// Add partial signatures, add the recipient info to sender state and move to the Finalizing state
pub async fn add_single_recipient_info<KM: TransactionKeyManagerInterface>(
&mut self,
mut rec: RecipientSignedMessage,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/mmr/src/backend.rs
Expand Up @@ -41,7 +41,7 @@ pub trait ArrayLike {
/// Return the item at the given index
fn get(&self, index: usize) -> Result<Option<Self::Value>, Self::Error>;

/// Remove all stored items from the the backend.
/// Remove all stored items from the backend.
fn clear(&mut self) -> Result<(), Self::Error>;

/// Finds the index of the specified stored item, it will return None if the object could not be found.
Expand Down
2 changes: 1 addition & 1 deletion base_layer/mmr/src/sparse_merkle_tree/proofs.rs
Expand Up @@ -98,7 +98,7 @@ pub struct InclusionProof<H> {
/// ```
pub struct ExclusionProof<H> {
siblings: Vec<NodeHash>,
// The terminal node of the tree proof, or `None` if the the node is `Empty`.
// The terminal node of the tree proof, or `None` if the node is `Empty`.
leaf: Option<LeafNode<H>>,
phantom: std::marker::PhantomData<H>,
}
Expand Down
5 changes: 5 additions & 0 deletions base_layer/p2p/src/initialization.rs
Expand Up @@ -559,6 +559,11 @@ impl ServiceInitializer for P2pInitializer {
network_byte: self.network.as_byte(),
user_agent: config.user_agent.clone(),
})
.with_minimize_connections(if self.config.dht.minimize_connections {
Some(self.config.dht.num_neighbouring_nodes + self.config.dht.num_random_nodes)
} else {
None
})
.set_self_liveness_check(config.listener_self_liveness_check_interval);

if config.allow_test_addresses || config.dht.peer_validator_config.allow_test_addresses {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/transport.rs
Expand Up @@ -147,7 +147,7 @@ pub struct TorTransportConfig {
/// When set to true, outbound TCP connections bypass the tor proxy. Defaults to false for better privacy, setting
/// to true may improve network performance for TCP nodes.
pub proxy_bypass_for_outbound_tcp: bool,
/// If set, instructs tor to forward traffic the the provided address. Otherwise, an OS-assigned port on 127.0.0.1
/// If set, instructs tor to forward traffic the provided address. Otherwise, an OS-assigned port on 127.0.0.1
/// is used.
pub forward_address: Option<Multiaddr>,
/// If set, the listener will bind to this address instead of the forward_address.
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/output_manager_service/service.rs
Expand Up @@ -1293,7 +1293,7 @@ where
let uo_len = uo.len();
trace!(
target: LOG_TARGET,
"select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {})",
"select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {} ms)",
uo_len,
start_new.elapsed().as_millis(),
start.elapsed().as_millis(),
Expand Down Expand Up @@ -1362,7 +1362,7 @@ where
let enough_spendable = utxos_total_value > amount + fee_with_change;
trace!(
target: LOG_TARGET,
"select_utxos profile - final_selection: {} outputs from {}, {} ms (at {})",
"select_utxos profile - final_selection: {} outputs from {}, {} ms (at {} ms)",
utxos.len(),
uo_len,
start_new.elapsed().as_millis(),
Expand Down
29 changes: 21 additions & 8 deletions base_layer/wallet_ffi/src/lib.rs
Expand Up @@ -27,7 +27,7 @@
//! becoming a `CompletedTransaction` with the `Completed` status. This means that the transaction has been
//! negotiated between the parties and is now ready to be broadcast to the Base Layer. The funds are still encumbered
//! as pending because the transaction has not been mined yet.
//! 3. The finalized `CompletedTransaction` will be sent back to the the receiver so that they have a copy.
//! 3. The finalized `CompletedTransaction` will be sent back to the receiver so that they have a copy.
//! 4. The wallet will broadcast the `CompletedTransaction` to a Base Node to be added to the mempool. Its status will
//! move from `Completed` to `Broadcast`.
//! 5. Wait until the transaction is mined. The `CompleteTransaction` status will then move from `Broadcast` to `Mined`
Expand Down Expand Up @@ -131,7 +131,13 @@ use tari_comms::{
transports::MemoryTransport,
types::CommsPublicKey,
};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig, NetworkDiscoveryConfig};
use tari_comms_dht::{
store_forward::SafConfig,
DbConnectionUrl,
DhtConfig,
DhtConnectivityConfig,
NetworkDiscoveryConfig,
};
use tari_contacts::contacts_service::{handle::ContactsServiceHandle, types::Contact};
use tari_core::{
borsh::FromBytes,
Expand Down Expand Up @@ -818,7 +824,7 @@ pub unsafe extern "C" fn byte_vector_destroy(bytes: *mut ByteVector) {
///
/// # Safety
/// None
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn byte_vector_get_at(ptr: *mut ByteVector, position: c_uint, error_out: *mut c_int) -> c_uchar {
Expand Down Expand Up @@ -1778,7 +1784,7 @@ pub unsafe extern "C" fn unblinded_outputs_get_length(
///
/// # Safety
/// The ```contact_destroy``` method must be called when finished with a TariContact to prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn unblinded_outputs_get_at(
Expand Down Expand Up @@ -2884,7 +2890,7 @@ pub unsafe extern "C" fn contacts_get_length(contacts: *mut TariContacts, error_
///
/// # Safety
/// The ```contact_destroy``` method must be called when finished with a TariContact to prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn contacts_get_at(
Expand Down Expand Up @@ -3185,7 +3191,7 @@ pub unsafe extern "C" fn completed_transactions_get_length(
/// # Safety
/// The ```completed_transaction_destroy``` method must be called when finished with a TariCompletedTransaction to
/// prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn completed_transactions_get_at(
Expand Down Expand Up @@ -3278,7 +3284,7 @@ pub unsafe extern "C" fn pending_outbound_transactions_get_length(
/// # Safety
/// The ```pending_outbound_transaction_destroy``` method must be called when finished with a
/// TariPendingOutboundTransaction to prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn pending_outbound_transactions_get_at(
Expand Down Expand Up @@ -3370,7 +3376,7 @@ pub unsafe extern "C" fn pending_inbound_transactions_get_length(
/// # Safety
/// The ```pending_inbound_transaction_destroy``` method must be called when finished with a
/// TariPendingOutboundTransaction to prevent a memory leak
// converting between here is fine as its used to clamp the the array to length
// converting between here is fine as its used to clamp the array to length
#[allow(clippy::cast_possible_wrap)]
#[no_mangle]
pub unsafe extern "C" fn pending_inbound_transactions_get_at(
Expand Down Expand Up @@ -4851,6 +4857,9 @@ pub unsafe extern "C" fn comms_config_create(
max_concurrent_inbound_tasks: 25,
max_concurrent_outbound_tasks: 50,
dht: DhtConfig {
num_neighbouring_nodes: 6,
num_random_nodes: 2,
minimize_connections: true,
discovery_request_timeout: Duration::from_secs(discovery_timeout_in_secs),
database_url: DbConnectionUrl::File(dht_database_path),
auto_join: true,
Expand All @@ -4864,6 +4873,10 @@ pub unsafe extern "C" fn comms_config_create(
initial_peer_sync_delay: Some(Duration::from_secs(25)),
..Default::default()
},
connectivity: DhtConnectivityConfig {
update_interval: Duration::from_secs(180),
..Default::default()
},
..Default::default()
},
allow_test_addresses: true,
Expand Down
6 changes: 4 additions & 2 deletions common/config/presets/c_base_node_c.toml
Expand Up @@ -192,7 +192,7 @@ listener_self_liveness_check_interval = 15
# When using the tor transport and set to true, outbound TCP connections bypass the tor proxy. Defaults to false for
# better privacy
#tor.proxy_bypass_for_outbound_tcp = false
# If set, instructs tor to forward traffic the the provided address. (e.g. "/dns4/my-base-node/tcp/32123") (default = OS-assigned port)
# If set, instructs tor to forward traffic the provided address. (e.g. "/dns4/my-base-node/tcp/32123") (default = OS-assigned port)
#tor.forward_address =
# If set, the listener will bind to this address instead of the forward_address. You need to make sure that this listener is connectable from the forward_address.
#tor.listener_address_override =
Expand All @@ -216,7 +216,9 @@ database_url = "data/base_node/dht.db"
# The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour. Default: 8
#num_neighbouring_nodes = 8
# Number of random peers to include. Default: 4
#num_random_nodes= 4
#num_random_nodes = 4
# Connections above the configured number of neighbouring and random nodes will be removed (default: false)
#minimize_connections = false
# Send to this many peers when using the broadcast strategy. Default: 8
#broadcast_factor = 8
# Send to this many peers when using the propagate strategy. Default: 4
Expand Down
6 changes: 4 additions & 2 deletions common/config/presets/d_console_wallet.toml
Expand Up @@ -242,7 +242,7 @@ event_channel_size = 3500
# When using the tor transport and set to true, outbound TCP connections bypass the tor proxy. Defaults to false for
# better privacy
#tor.proxy_bypass_for_outbound_tcp = false
# If set, instructs tor to forward traffic the the provided address. (e.g. "/ip4/127.0.0.1/tcp/0") (default = )
# If set, instructs tor to forward traffic the provided address. (e.g. "/ip4/127.0.0.1/tcp/0") (default = )
#tor.forward_address =

# Use a SOCKS5 proxy transport. This transport recognises any addresses supported by the proxy.
Expand All @@ -264,7 +264,9 @@ database_url = "data/wallet/dht.db"
# The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour. Default: 8
#num_neighbouring_nodes = 8
# Number of random peers to include. Default: 4
#num_random_nodes= 4
#num_random_nodes = 4
# Connections above the configured number of neighbouring and random nodes will be removed (default: false)
minimize_connections = true
# Send to this many peers when using the broadcast strategy. Default: 8
#broadcast_factor = 8
# Send to this many peers when using the propagate strategy. Default: 4
Expand Down
1 change: 1 addition & 0 deletions comms/core/Cargo.toml
Expand Up @@ -15,6 +15,7 @@ tari_metrics = { path = "../../infrastructure/metrics", optional = true, version
tari_storage = { path = "../../infrastructure/storage", version = "1.0.0-pre.12" }
tari_shutdown = { path = "../../infrastructure/shutdown" , version = "1.0.0-pre.12"}
tari_utilities = { version = "0.7" }
tari_common = { path = "../../common", version = "1.0.0-pre.11a" }

anyhow = "1.0.53"
async-trait = "0.1.36"
Expand Down
14 changes: 14 additions & 0 deletions comms/core/src/builder/mod.rs
Expand Up @@ -70,6 +70,7 @@ use crate::{
/// # #[tokio::main]
/// # async fn main() {
/// use std::env::temp_dir;
/// use tari_comms::connectivity::ConnectivityConfig;
///
/// use tari_storage::{
/// lmdb_store::{LMDBBuilder, LMDBConfig},
Expand Down Expand Up @@ -126,6 +127,7 @@ pub struct CommsBuilder {
connection_manager_config: ConnectionManagerConfig,
connectivity_config: ConnectivityConfig,
shutdown_signal: Option<ShutdownSignal>,
maintain_n_closest_connections_only: Option<usize>,
}

impl Default for CommsBuilder {
Expand All @@ -139,6 +141,7 @@ impl Default for CommsBuilder {
connection_manager_config: ConnectionManagerConfig::default(),
connectivity_config: ConnectivityConfig::default(),
shutdown_signal: None,
maintain_n_closest_connections_only: None,
}
}
}
Expand Down Expand Up @@ -292,6 +295,17 @@ impl CommsBuilder {
self
}

/// The closest number of peer connections to maintain; connections above the threshold will be removed
pub fn with_minimize_connections(mut self, connections: Option<usize>) -> Self {
self.maintain_n_closest_connections_only = connections;
self.connectivity_config.maintain_n_closest_connections_only = connections;
if let Some(val) = connections {
self.connectivity_config.reaper_min_connection_threshold = val;
}
self.connectivity_config.connection_pool_refresh_interval = Duration::from_secs(180);
self
}

fn make_peer_manager(&mut self) -> Result<Arc<PeerManager>, CommsBuilderError> {
let file_lock = self.peer_storage_file_lock.take();

Expand Down
2 changes: 1 addition & 1 deletion comms/core/src/connection_manager/dialer.rs
Expand Up @@ -598,7 +598,7 @@ where

let noise_upgrade_time = timer.elapsed();
debug!(
"Dial - upgraded noise: {} on address: {} on tcp after: {}",
"Dial - upgraded noise: {} on address: {} on tcp after: {} ms",
node_id.short_str(),
moved_address,
timer.elapsed().as_millis()
Expand Down
4 changes: 4 additions & 0 deletions comms/core/src/connectivity/config.rs
Expand Up @@ -49,6 +49,9 @@ pub struct ConnectivityConfig {
/// next connection attempt.
/// Default: 24 hours
pub expire_peer_last_seen_duration: Duration,
/// The closest number of peer connections to maintain; connections above the threshold will be removed
/// (default: disabled)
pub maintain_n_closest_connections_only: Option<usize>,
}

impl Default for ConnectivityConfig {
Expand All @@ -62,6 +65,7 @@ impl Default for ConnectivityConfig {
max_failures_mark_offline: 1,
connection_tie_break_linger: Duration::from_secs(2),
expire_peer_last_seen_duration: Duration::from_secs(24 * 60 * 60),
maintain_n_closest_connections_only: None,
}
}
}

0 comments on commit b60fd69

Please sign in to comment.