Skip to content

Commit

Permalink
minimize dial attempts to offline addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed May 3, 2024
1 parent 5416151 commit fcf0fd8
Show file tree
Hide file tree
Showing 15 changed files with 119 additions and 31 deletions.
4 changes: 3 additions & 1 deletion base_layer/wallet_ffi/src/lib.rs
Expand Up @@ -4870,11 +4870,13 @@ pub unsafe extern "C" fn comms_config_create(
..Default::default()
},
network_discovery: NetworkDiscoveryConfig {
min_desired_peers: 16,
initial_peer_sync_delay: Some(Duration::from_secs(25)),
..Default::default()
},
connectivity: DhtConnectivityConfig {
update_interval: Duration::from_secs(180),
update_interval: Duration::from_secs(5 * 60),
minimum_desired_tcpv4_node_ratio: 0.0,
..Default::default()
},
..Default::default()
Expand Down
12 changes: 6 additions & 6 deletions common/config/presets/d_console_wallet.toml
Expand Up @@ -262,9 +262,9 @@ database_url = "data/wallet/dht.db"
# The size of the buffer (channel) which holds pending outbound message requests. Default: 20
#outbound_buffer_size = 20
# The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour. Default: 8
#num_neighbouring_nodes = 8
num_neighbouring_nodes = 6
# Number of random peers to include. Default: 4
#num_random_nodes = 4
num_random_nodes = 2
# Connections above the configured number of neighbouring and random nodes will be removed (default: false)
minimize_connections = true
# Send to this many peers when using the broadcast strategy. Default: 8
Expand Down Expand Up @@ -313,21 +313,21 @@ minimize_connections = true
#join_cooldown_interval = 120 # 10 * 60

# The interval to update the neighbouring and random pools, if necessary. Default: 2 minutes
#connectivity.update_interval = 120 # 2 * 60
connectivity.update_interval = 300 # 2 * 60
# The interval to change the random pool peers. Default = 2 hours
#connectivity.random_pool_refresh_interval = 7_200 # 2 * 60 * 60
# Length of cooldown when high connection failure rates are encountered. Default: 45s
#connectivity.high_failure_rate_cooldown = 45
# The minimum desired ratio of TCPv4 to Tor connections. TCPv4 addresses have some significant cost to create,
# making sybil attacks costly. This setting does not guarantee this ratio is maintained.
# Currently, it only emits a warning if the ratio is below this setting. Default: 0.1 (10%)
#connectivity.minimum_desired_tcpv4_node_ratio = 0.1
connectivity.minimum_desired_tcpv4_node_ratio = 0.0

# True to enable network discovery, false to disable it. Default: true
#network_discovery.enabled = true
# A threshold for the minimum number of peers this node should ideally be aware of. If below this threshold a
# more "aggressive" strategy is employed. Default: 50
network_discovery.min_desired_peers = 8
network_discovery.min_desired_peers = 16
# The period to wait once the number of rounds given by `idle_after_num_rounds` has completed. Default: 30 mins
#network_discovery.idle_period = 1_800 # 30 * 60
# The minimum number of network discovery rounds to perform before idling (going to sleep). If there are less
Expand All @@ -341,7 +341,7 @@ network_discovery.min_desired_peers = 8
# The maximum number of peers we allow per round of sync. (Default: 500)
#network_discovery.max_peers_to_sync_per_round = 500
# Initial refresh sync peers delay period, when a configured connection needs preference. (Default: Disabled)
network_discovery.initial_peer_sync_delay = 15
network_discovery.initial_peer_sync_delay = 25

# Length of time to ban a peer if the peer misbehaves at the DHT-level. Default: 6 hrs
#ban_duration = 21_600 # 6 * 60 * 60
Expand Down
9 changes: 9 additions & 0 deletions comms/core/src/connection_manager/dial_state.rs
Expand Up @@ -38,6 +38,8 @@ pub struct DialState {
cancel_signal: ShutdownSignal,
/// Reply channel for a connection result
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
/// Whether to minimize connections
minimize_connections: bool,
}

impl DialState {
Expand All @@ -46,12 +48,14 @@ impl DialState {
peer: Box<Peer>,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
cancel_signal: ShutdownSignal,
minimize_connections: bool,
) -> Self {
Self {
peer,
attempts: 0,
reply_tx,
cancel_signal,
minimize_connections,
}
}

Expand All @@ -66,6 +70,11 @@ impl DialState {
self
}

/// Returns true if the connections should be minimized
pub fn get_minimize_connections(&self) -> bool {
self.minimize_connections
}

/// The number of attempts
pub fn num_attempts(&self) -> usize {
self.attempts
Expand Down
41 changes: 35 additions & 6 deletions comms/core/src/connection_manager/dialer.rs
Expand Up @@ -81,6 +81,7 @@ pub(crate) enum DialerRequest {
Dial(
Box<Peer>,
Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
bool, // minimize_connections
),
CancelPendingDial(NodeId),
NotifyNewInboundConnection(Box<PeerConnection>),
Expand Down Expand Up @@ -175,8 +176,8 @@ where
use DialerRequest::{CancelPendingDial, Dial, NotifyNewInboundConnection};
debug!(target: LOG_TARGET, "Connection dialer got request: {:?}", request);
match request {
Dial(peer, reply_tx) => {
self.handle_dial_peer_request(pending_dials, peer, reply_tx);
Dial(peer, reply_tx, minimize_connections) => {
self.handle_dial_peer_request(pending_dials, peer, reply_tx, minimize_connections);
},
CancelPendingDial(peer_id) => {
self.cancel_dial(&peer_id);
Expand Down Expand Up @@ -317,6 +318,7 @@ where
pending_dials: &mut DialFuturesUnordered,
peer: Box<Peer>,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
minimize_connections: bool,
) {
if self.is_pending_dial(&peer.node_id) {
debug!(
Expand All @@ -337,7 +339,7 @@ where

let backoff = Arc::clone(&self.backoff);

let dial_state = DialState::new(peer, reply_tx, cancel_signal);
let dial_state = DialState::new(peer, reply_tx, cancel_signal, minimize_connections);
let node_identity = Arc::clone(&self.node_identity);
let conn_man_notifier = self.conn_man_notifier.clone();
let supported_protocols = self.our_supported_protocols.clone();
Expand Down Expand Up @@ -491,7 +493,11 @@ where
config: &ConnectionManagerConfig,
) -> (DialState, DialResult<TTransport::Output>) {
// Container for dial
let mut dial_state = Some(dial_state);
let mut dial_state = {
let mut temp_state = dial_state;
temp_state.peer_mut().addresses.reset_connection_attempts();
Some(temp_state)
};
let mut transport = Some(transport);

loop {
Expand Down Expand Up @@ -539,7 +545,7 @@ where
}
}

/// Attempts to dial a peer sequentially on all addresses.
/// Attempts to dial a peer sequentially on all addresses; if connections are to be minimized only.
/// Returns ownership of the given `DialState` and a success or failure result for the dial,
/// or None if the dial was cancelled inflight
async fn dial_peer(
Expand All @@ -551,7 +557,30 @@ where
DialState,
Result<(NoiseSocket<TTransport::Output>, Multiaddr), ConnectionManagerError>,
) {
let addresses = dial_state.peer().addresses.clone().into_vec();
let addresses = if dial_state.get_minimize_connections() {
dial_state
.peer()
.addresses
.clone()
.iter()
.filter(|addr| addr.last_failed_reason().is_none())
.map(|addr| addr.address().clone())
.collect::<Vec<_>>()
} else {
dial_state.peer().addresses.clone().into_vec()
};
if addresses.is_empty() {
let node_id_hex = dial_state.peer().node_id.clone().to_hex();
debug!(
target: LOG_TARGET,
"No more contactable addresses for peer '{}'",
node_id_hex
);
return (
dial_state,
Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)),
);
}
let cancel_signal = dial_state.get_cancel_signal();
for address in addresses {
debug!(
Expand Down
4 changes: 3 additions & 1 deletion comms/core/src/connection_manager/error.rs
Expand Up @@ -74,14 +74,16 @@ pub enum ConnectionManagerError {
IdentityProtocolError(#[from] IdentityProtocolError),
#[error("The dial was cancelled")]
DialCancelled,
#[error("Invalid multiaddr: {0}")]
#[error("Invalid multiaddr")]
InvalidMultiaddr(String),
#[error("Failed to send wire format byte")]
WireFormatSendFailed,
#[error("Listener oneshot cancelled")]
ListenerOneshotCancelled,
#[error("Peer validation error: {0}")]
PeerValidationError(#[from] PeerValidatorError),
#[error("No contactable addresses for peer {0} left")]
NoContactableAddressesForPeer(String),
}

impl From<yamux::ConnectionError> for ConnectionManagerError {
Expand Down
13 changes: 10 additions & 3 deletions comms/core/src/connection_manager/manager.rs
Expand Up @@ -378,11 +378,17 @@ where
use ConnectionManagerRequest::{CancelDial, DialPeer, NotifyListening};
trace!(target: LOG_TARGET, "Connection manager got request: {:?}", request);
match request {
DialPeer { node_id, reply_tx } => {
DialPeer {
node_id,
reply_tx,
minimize_connections,
} => {
let tracing_id = tracing::Span::current().id();
let span = span!(Level::TRACE, "connection_manager::handle_request");
span.follows_from(tracing_id);
self.dial_peer(node_id, reply_tx).instrument(span).await
self.dial_peer(node_id, reply_tx, minimize_connections)
.instrument(span)
.await
},
CancelDial(node_id) => {
if let Err(err) = self.dialer_tx.send(DialerRequest::CancelPendingDial(node_id)).await {
Expand Down Expand Up @@ -493,10 +499,11 @@ where
&mut self,
node_id: NodeId,
reply: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
minimize_connections: bool,
) {
match self.peer_manager.find_by_node_id(&node_id).await {
Ok(Some(peer)) => {
self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply))
self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply, minimize_connections))
.await;
},
Ok(None) => {
Expand Down
17 changes: 14 additions & 3 deletions comms/core/src/connection_manager/requester.rs
Expand Up @@ -37,6 +37,7 @@ pub enum ConnectionManagerRequest {
DialPeer {
node_id: NodeId,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
minimize_connections: bool,
},
/// Cancels a pending dial if one exists
CancelDial(NodeId),
Expand Down Expand Up @@ -75,9 +76,14 @@ impl ConnectionManagerRequester {
}

/// Attempt to connect to a remote peer
pub async fn dial_peer(&mut self, node_id: NodeId) -> Result<PeerConnection, ConnectionManagerError> {
pub async fn dial_peer(
&mut self,
node_id: NodeId,
minimize_connections: bool,
) -> Result<PeerConnection, ConnectionManagerError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.send_dial_peer(node_id, Some(reply_tx)).await?;
self.send_dial_peer(node_id, Some(reply_tx), minimize_connections)
.await?;
reply_rx
.await
.map_err(|_| ConnectionManagerError::ActorRequestCanceled)?
Expand All @@ -97,9 +103,14 @@ impl ConnectionManagerRequester {
&mut self,
node_id: NodeId,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
minimize_connections: bool,
) -> Result<(), ConnectionManagerError> {
self.sender
.send(ConnectionManagerRequest::DialPeer { node_id, reply_tx })
.send(ConnectionManagerRequest::DialPeer {
node_id,
reply_tx,
minimize_connections,
})
.await
.map_err(|_| ConnectionManagerError::SendToActorFailed)?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions comms/core/src/connection_manager/tests/listener_dialer.rs
Expand Up @@ -129,7 +129,7 @@ async fn smoke() {

let (reply_tx, reply_rx) = oneshot::channel();
request_tx
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx)))
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx), false))
.await
.unwrap();

Expand Down Expand Up @@ -237,7 +237,7 @@ async fn banned() {

let (reply_tx, reply_rx) = oneshot::channel();
request_tx
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx)))
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx), false))
.await
.unwrap();

Expand Down
18 changes: 12 additions & 6 deletions comms/core/src/connection_manager/tests/manager.rs
Expand Up @@ -76,7 +76,7 @@ async fn connect_to_nonexistent_peer() {

rt_handle.spawn(connection_manager.run());

let err = requester.dial_peer(NodeId::default()).await.unwrap_err();
let err = requester.dial_peer(NodeId::default(), false).await.unwrap_err();
unpack_enum!(ConnectionManagerError::PeerManagerError(PeerManagerError::PeerNotFoundError) = err);

shutdown.trigger();
Expand Down Expand Up @@ -150,7 +150,10 @@ async fn dial_success() {
.await
.unwrap();

let mut conn_out = conn_man1.dial_peer(node_identity2.node_id().clone()).await.unwrap();
let mut conn_out = conn_man1
.dial_peer(node_identity2.node_id().clone(), false)
.await
.unwrap();
assert_eq!(conn_out.peer_node_id(), node_identity2.node_id());
let peer2 = peer_manager1
.find_by_node_id(conn_out.peer_node_id())
Expand Down Expand Up @@ -272,7 +275,10 @@ async fn dial_success_aux_tcp_listener() {
);
conn_man2.wait_until_listening().await.unwrap();

let mut connection = conn_man2.dial_peer(node_identity1.node_id().clone()).await.unwrap();
let mut connection = conn_man2
.dial_peer(node_identity1.node_id().clone(), false)
.await
.unwrap();
assert_eq!(connection.peer_node_id(), node_identity1.node_id());

let mut substream_out = connection.open_substream(&TEST_PROTO).await.unwrap();
Expand Down Expand Up @@ -356,8 +362,8 @@ async fn simultaneous_dial_events() {

// Dial at the same time
let (result1, result2) = future::join(
conn_man1.dial_peer(node_identities[1].node_id().clone()),
conn_man2.dial_peer(node_identities[0].node_id().clone()),
conn_man1.dial_peer(node_identities[1].node_id().clone(), false),
conn_man2.dial_peer(node_identities[0].node_id().clone(), false),
)
.await;

Expand Down Expand Up @@ -420,7 +426,7 @@ async fn dial_cancelled() {
let node_id = node_identity2.node_id().clone();
async move {
ready_tx.send(()).unwrap();
cm.dial_peer(node_id).await
cm.dial_peer(node_id, false).await
}
});

Expand Down
10 changes: 9 additions & 1 deletion comms/core/src/connectivity/manager.rs
Expand Up @@ -164,6 +164,10 @@ impl ConnectivityManagerActor {
})
}

fn minimize_connections(&self) -> bool {
self.config.maintain_n_closest_connections_only.is_some()
}

pub async fn run(mut self) {
debug!(target: LOG_TARGET, "ConnectivityManager started");

Expand Down Expand Up @@ -335,7 +339,11 @@ impl ConnectivityManagerActor {
},
}

if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply_tx).await {
if let Err(err) = self
.connection_manager
.send_dial_peer(node_id, reply_tx, self.minimize_connections())
.await
{
error!(
target: LOG_TARGET,
"Failed to send dial request to connection manager: {:?}", err
Expand Down
1 change: 1 addition & 0 deletions comms/core/src/net_address/multiaddr_with_stats.rs
Expand Up @@ -183,6 +183,7 @@ impl MultiaddrWithStats {
/// Reset the connection attempts on this net address for a later session of retries
pub fn reset_connection_attempts(&mut self) {
self.connection_attempts = 0;
self.last_failed_reason = None;
}

/// Mark that a connection could not be established with this net address
Expand Down
6 changes: 6 additions & 0 deletions comms/core/src/net_address/mutliaddresses_with_stats.rs
Expand Up @@ -467,9 +467,15 @@ mod test {
assert_eq!(net_addresses.addresses[0].connection_attempts(), 2);
assert_eq!(net_addresses.addresses[1].connection_attempts(), 1);
assert_eq!(net_addresses.addresses[2].connection_attempts(), 1);
assert!(net_addresses.addresses[0].last_failed_reason().is_some());
assert!(net_addresses.addresses[1].last_failed_reason().is_some());
assert!(net_addresses.addresses[2].last_failed_reason().is_some());
net_addresses.reset_connection_attempts();
assert_eq!(net_addresses.addresses[0].connection_attempts(), 0);
assert_eq!(net_addresses.addresses[1].connection_attempts(), 0);
assert_eq!(net_addresses.addresses[2].connection_attempts(), 0);
assert!(net_addresses.addresses[0].last_failed_reason().is_none());
assert!(net_addresses.addresses[1].last_failed_reason().is_none());
assert!(net_addresses.addresses[2].last_failed_reason().is_none());
}
}

0 comments on commit fcf0fd8

Please sign in to comment.