kad: make bucket size configurable #5389

Open
guillaumemichel opened this issue May 15, 2024 · 4 comments · May be fixed by #5414

@guillaumemichel
Contributor

Description

The bucket size should be customizable to fit the needs of kad's users. Currently the bucket size is bound to the K_VALUE constant (20).

Motivation

The bucket size is a system parameter that must be adjusted to a network's needs. 20 is the default value used in the IPFS DHT, but discv5 (which does not use libp2p), for instance, has a bucket size of 16. The bucket size should be configurable.

Current Implementation

The bucket size is currently defined as the K_VALUE constant, which is also used for other purposes. Changing the bucket size means manually editing that value, which is far from ideal.

This will likely be a breaking change, since KBucketsTable::new should probably be modified to accept the bucket size, either as an additional parameter or in a Config.

Are you planning to do it yourself in a pull request?

Yes

@dariusc93
Contributor

I do like the idea of having this configurable as 20 may not always be suited for every use-case (though I do understand its reasoning for being that by default).

> This will likely be a breaking change since the KBucketsTable::new should probably be modified to accept the bucket size as additional parameter or in a Config.

We could have this as a part of Config, which would pass the value down to KBucket.
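
A minimal sketch of that idea, assuming simplified stand-ins for `Config`, `KBucket` and `KBucketsTable`. The setter name `set_kbucket_size`, the `build_table` helper and the reduced field sets are hypothetical and are not taken from the crate; the sketch only shows the value travelling from the config down to each bucket.

```rust
use std::num::NonZeroUsize;
use std::time::Duration;

/// Number of buckets in the table (one per bit of a 256-bit key).
const NUM_BUCKETS: usize = 256;

/// Simplified stand-in for the behaviour's config; only the relevant fields are shown.
#[derive(Debug, Clone)]
pub struct Config {
    kbucket_pending_timeout: Duration,
    kbucket_size: NonZeroUsize,
}

impl Config {
    /// Starts from the current default of 20 entries per bucket.
    pub fn new() -> Self {
        Config {
            kbucket_pending_timeout: Duration::from_secs(60),
            kbucket_size: NonZeroUsize::new(20).expect("20 is non-zero"),
        }
    }

    /// Hypothetical setter; the name is an assumption made for this sketch.
    pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
        self.kbucket_size = size;
        self
    }
}

/// Simplified bucket that records only its capacity and pending timeout.
#[derive(Debug, Clone)]
pub struct KBucket {
    capacity: NonZeroUsize,
    pending_timeout: Duration,
}

/// Simplified table: a fixed number of buckets sharing one capacity.
pub struct KBucketsTable {
    buckets: Vec<KBucket>,
}

impl KBucketsTable {
    /// The bucket size arrives as an extra constructor parameter.
    pub fn new(pending_timeout: Duration, bucket_size: NonZeroUsize) -> Self {
        let buckets = (0..NUM_BUCKETS)
            .map(|_| KBucket { capacity: bucket_size, pending_timeout })
            .collect();
        KBucketsTable { buckets }
    }

    /// Capacity of every bucket, in table order.
    pub fn bucket_capacities(&self) -> impl Iterator<Item = usize> + '_ {
        self.buckets.iter().map(|b| b.capacity.get())
    }
}

/// Hypothetical helper standing in for the behaviour's constructor:
/// it reads the value from `Config` and hands it to the table.
pub fn build_table(config: &Config) -> KBucketsTable {
    KBucketsTable::new(config.kbucket_pending_timeout, config.kbucket_size)
}
```

With this shape, a discv5-like bucket size would be requested through `config.set_kbucket_size(NonZeroUsize::new(16).unwrap())` before building the table, and nothing outside `Config` needs to know the number.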

@drHuangMHT
Contributor

Another thing I would like to point out is that the fields in kad::Config are all private and require method calls to access them. We can keep them private despite the inconvenience but the methods are mixing two different styles: (self)->Self and (&mut self)->&mut Self, which I would consider to be an inconsistency.
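
To make the two styles concrete, here is a small sketch on a made-up `Config`. The fields and method names (`with_parallelism`, `set_query_timeout`) are hypothetical and chosen only to contrast the signatures; they are not the actual `kad::Config` methods.

```rust
use std::time::Duration;

/// Made-up config with private fields, used only to contrast setter styles.
#[derive(Debug, Clone, Default)]
pub struct Config {
    query_timeout: Duration,
    parallelism: usize,
}

impl Config {
    /// Consuming style, `(self) -> Self`: takes the config by value and
    /// returns it, so calls chain directly off a constructor.
    pub fn with_parallelism(mut self, parallelism: usize) -> Self {
        self.parallelism = parallelism;
        self
    }

    /// Borrowing style, `(&mut self) -> &mut Self`: mutates in place and
    /// returns the same mutable reference for further chaining.
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_timeout = timeout;
        self
    }

    /// Read access to the private field.
    pub fn query_timeout(&self) -> Duration {
        self.query_timeout
    }

    /// Read access to the private field.
    pub fn parallelism(&self) -> usize {
        self.parallelism
    }
}
```

When both styles live on one type, a single chain cannot go from the borrowing style to the consuming one: `set_query_timeout` yields `&mut Config`, and `with_parallelism` needs an owned `Config`. Callers end up with `let mut cfg = Config::default().with_parallelism(3);` followed by a separate `cfg.set_query_timeout(...)` statement, which is the inconsistency described above.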

@dariusc93
Contributor

dariusc93 commented May 16, 2024

There's also ArrayVec, which takes a const capacity (currently K_VALUE determines its max cap). I haven't looked at that crate myself, but my guess is we could probably switch to Vec in this case. I think the purpose of using ArrayVec was to enforce the max cap and panic if something tries to push beyond it, but if we switch to a configurable bucket size, I'm not sure how we could pass that value to ArrayVec given its design.

I can imagine we could do something like the following (an incomplete and untested patch; ArrayVec is obviously still used in other places, so this is more of an example, or me just putting my thoughts out there):

diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs
index 302433a8d..a861821c0 100644
--- a/protocols/kad/src/behaviour.rs
+++ b/protocols/kad/src/behaviour.rs
@@ -175,6 +175,7 @@ pub enum StoreInserts {
 #[derive(Debug, Clone)]
 pub struct Config {
     kbucket_pending_timeout: Duration,
+    k_value: NonZeroUsize,
     query_config: QueryConfig,
     protocol_config: ProtocolConfig,
     record_ttl: Option<Duration>,
@@ -218,6 +219,7 @@ impl Config {
     pub fn new(protocol_name: StreamProtocol) -> Self {
         Config {
             kbucket_pending_timeout: Duration::from_secs(60),
+            k_value: K_VALUE,
             query_config: QueryConfig::default(),
             protocol_config: ProtocolConfig::new(protocol_name),
             record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
@@ -483,7 +485,7 @@ where
         Behaviour {
             store,
             caching: config.caching,
-            kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
+            kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout, config.k_value),
             kbucket_inserts: config.kbucket_inserts,
             protocol_config: config.protocol_config,
             record_filtering: config.record_filtering,
diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs
index 7ed10f7f8..84a3f6690 100644
--- a/protocols/kad/src/kbucket.rs
+++ b/protocols/kad/src/kbucket.rs
@@ -78,6 +78,7 @@ pub use entry::*;
 use arrayvec::ArrayVec;
 use bucket::KBucket;
 use std::collections::VecDeque;
+use std::num::NonZeroUsize;
 use std::time::Duration;
 use web_time::Instant;
 
@@ -156,11 +157,11 @@ where
     /// The given `pending_timeout` specifies the duration after creation of
     /// a [`PendingEntry`] after which it becomes eligible for insertion into
     /// a full bucket, replacing the least-recently (dis)connected node.
-    pub(crate) fn new(local_key: TKey, pending_timeout: Duration) -> Self {
+    pub(crate) fn new(local_key: TKey, pending_timeout: Duration, k_value: NonZeroUsize) -> Self {
         KBucketsTable {
             local_key,
             buckets: (0..NUM_BUCKETS)
-                .map(|_| KBucket::new(pending_timeout))
+                .map(|_| KBucket::new(pending_timeout, k_value))
                 .collect(),
             applied_pending: VecDeque::new(),
         }
@@ -535,7 +536,7 @@ mod tests {
         fn arbitrary(g: &mut Gen) -> TestTable {
             let local_key = Key::from(PeerId::random());
             let timeout = Duration::from_secs(g.gen_range(1..360));
-            let mut table = TestTable::new(local_key.into(), timeout);
+            let mut table = TestTable::new(local_key.into(), timeout, K_VALUE);
             let mut num_total = g.gen_range(0..100);
             for (i, b) in &mut table.buckets.iter_mut().enumerate().rev() {
                 let ix = BucketIndex(i);
@@ -560,7 +561,7 @@ mod tests {
     fn buckets_are_non_overlapping_and_exhaustive() {
         let local_key = Key::from(PeerId::random());
         let timeout = Duration::from_secs(0);
-        let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), timeout);
+        let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), timeout, K_VALUE);
 
         let mut prev_max = U256::from(0);
 
@@ -577,7 +578,7 @@ mod tests {
     fn bucket_contains_range() {
         fn prop(ix: u8) {
             let index = BucketIndex(ix as usize);
-            let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(0));
+            let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(0), K_VALUE);
             let bucket_ref = KBucketRef {
                 index,
                 bucket: &mut bucket,
@@ -623,7 +624,7 @@ mod tests {
         let local_key = Key::from(PeerId::random());
         let other_id = Key::from(PeerId::random());
 
-        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
+        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5), K_VALUE);
         if let Some(Entry::Absent(entry)) = table.entry(&other_id) {
             match entry.insert((), NodeStatus::Connected) {
                 InsertResult::Inserted => (),
@@ -641,7 +642,7 @@ mod tests {
     #[test]
     fn entry_self() {
         let local_key = Key::from(PeerId::random());
-        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
+        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5), K_VALUE);
 
         assert!(table.entry(&local_key).is_none())
     }
@@ -649,7 +650,7 @@ mod tests {
     #[test]
     fn closest() {
         let local_key = Key::from(PeerId::random());
-        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
+        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5), K_VALUE);
         let mut count = 0;
         loop {
             if count == 100 {
@@ -684,7 +685,7 @@ mod tests {
     #[test]
     fn applied_pending() {
         let local_key = Key::from(PeerId::random());
-        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_millis(1));
+        let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_millis(1), K_VALUE);
         let expected_applied;
         let full_bucket_index;
         loop {
diff --git a/protocols/kad/src/kbucket/bucket.rs b/protocols/kad/src/kbucket/bucket.rs
index 1bd4389eb..d5e10e2e7 100644
--- a/protocols/kad/src/kbucket/bucket.rs
+++ b/protocols/kad/src/kbucket/bucket.rs
@@ -25,6 +25,8 @@
 //! > buckets in a `KBucketsTable` and hence is enforced by the public API
 //! > of the `KBucketsTable` and in particular the public `Entry` API.
 
+use std::num::NonZeroUsize;
+
 use super::*;
 pub(crate) use crate::K_VALUE;
 /// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`.
@@ -96,7 +98,9 @@ pub(crate) struct Position(usize);
 #[derive(Debug, Clone)]
 pub(crate) struct KBucket<TKey, TVal> {
     /// The nodes contained in the bucket.
-    nodes: ArrayVec<Node<TKey, TVal>, { K_VALUE.get() }>,
+    nodes: Vec<Node<TKey, TVal>>,
+
+    k_value: NonZeroUsize,
 
     /// The position (index) in `nodes` that marks the first connected node.
     ///
@@ -162,9 +166,10 @@ where
     TVal: Clone,
 {
     /// Creates a new `KBucket` with the given timeout for pending entries.
-    pub(crate) fn new(pending_timeout: Duration) -> Self {
+    pub(crate) fn new(pending_timeout: Duration, k_value: NonZeroUsize) -> Self {
         KBucket {
-            nodes: ArrayVec::new(),
+            nodes: Vec::with_capacity(k_value.get()),
+            k_value,
             first_connected_pos: None,
             pending: None,
             pending_timeout,
@@ -205,7 +210,7 @@ where
     pub(crate) fn apply_pending(&mut self) -> Option<AppliedPending<TKey, TVal>> {
         if let Some(pending) = self.pending.take() {
             if pending.replace <= Instant::now() {
-                if self.nodes.is_full() {
+                if self.nodes.len() == self.k_value.get() {
                     if self.status(Position(0)) == NodeStatus::Connected {
                         // The bucket is full with connected nodes. Drop the pending node.
                         return None;
@@ -316,7 +321,7 @@ where
     ) -> InsertResult<TKey> {
         match status {
             NodeStatus::Connected => {
-                if self.nodes.is_full() {
+                if self.nodes.len() == self.k_value.get() {
                     if self.first_connected_pos == Some(0) || self.pending.is_some() {
                         return InsertResult::Full;
                     } else {
@@ -336,7 +341,7 @@ where
                 InsertResult::Inserted
             }
             NodeStatus::Disconnected => {
-                if self.nodes.is_full() {
+                if self.nodes.len() == self.k_value.get() {
                     return InsertResult::Full;
                 }
                 if let Some(ref mut p) = self.first_connected_pos {
@@ -435,7 +440,7 @@ mod tests {
     impl Arbitrary for KBucket<Key<PeerId>, ()> {
         fn arbitrary(g: &mut Gen) -> KBucket<Key<PeerId>, ()> {
             let timeout = Duration::from_secs(g.gen_range(1..g.size()) as u64);
-            let mut bucket = KBucket::<Key<PeerId>, ()>::new(timeout);
+            let mut bucket = KBucket::<Key<PeerId>, ()>::new(timeout, K_VALUE);
             let num_nodes = g.gen_range(1..K_VALUE.get() + 1);
             for _ in 0..num_nodes {
                 let key = Key::from(PeerId::random());
@@ -480,7 +485,7 @@ mod tests {
     #[test]
     fn ordering() {
         fn prop(status: Vec<NodeStatus>) -> bool {
-            let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1));
+            let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1), K_VALUE);
 
             // The expected lists of connected and disconnected nodes.
             let mut connected = VecDeque::new();
@@ -522,7 +527,7 @@ mod tests {
 
     #[test]
     fn full_bucket() {
-        let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1));
+        let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1), K_VALUE);
 
         // Fill the bucket with disconnected nodes.
         fill_bucket(&mut bucket, NodeStatus::Disconnected);
@@ -590,7 +595,7 @@ mod tests {
 
     #[test]
     fn full_bucket_discard_pending() {
-        let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1));
+        let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1), K_VALUE);
         fill_bucket(&mut bucket, NodeStatus::Disconnected);
         let (first, _) = bucket.iter().next().unwrap();
         let first_disconnected = first.clone();

Thoughts, @guillaumemichel?

EDIT: Note that this patch is based on #5347.
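
The patch above repeats the `self.nodes.len() == self.k_value.get()` check at every call site. One possible alternative, shown here only as a sketch, is to keep the capacity check in a small wrapper around `Vec`. `BoundedVec` is a hypothetical type written for this example and is not part of libp2p or the arrayvec crate. It uses `>=` instead of `==` so a bucket still reports full if it ever ends up over capacity, and it returns the rejected item instead of panicking.

```rust
use std::num::NonZeroUsize;

/// Hypothetical `Vec` wrapper whose maximum length is chosen at runtime.
#[derive(Debug, Clone)]
pub struct BoundedVec<T> {
    items: Vec<T>,
    capacity: NonZeroUsize,
}

impl<T> BoundedVec<T> {
    /// Allocates room for `capacity` items up front, like the patch does
    /// with `Vec::with_capacity(k_value.get())`.
    pub fn new(capacity: NonZeroUsize) -> Self {
        BoundedVec {
            items: Vec::with_capacity(capacity.get()),
            capacity,
        }
    }

    /// True once the configured maximum has been reached.
    pub fn is_full(&self) -> bool {
        self.items.len() >= self.capacity.get()
    }

    /// Pushes `item` if there is room; otherwise hands it back to the caller.
    pub fn try_push(&mut self, item: T) -> Result<(), T> {
        if self.is_full() {
            return Err(item);
        }
        self.items.push(item);
        Ok(())
    }

    /// Number of items currently stored.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// True when nothing is stored.
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }
}
```

With a wrapper like this, the existing `self.nodes.is_full()` call sites in `bucket.rs` could stay as they are, at the cost of one more small type to maintain.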

@guillaumemichel
Contributor Author

@dariusc93 I think it makes sense to switch to Vec

pub(crate) fn new(local_key: TKey, pending_timeout: Duration, k_value: NonZeroUsize) -> Self {

We could even define a proper Config for KBucketsTable, since it is starting to have many parameters.

I was thinking of making the bucket size independent of the K_VALUE. By default bucket size is set to K_VALUE, but it should be possible to change the bucket size only.
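
A rough sketch of what such a dedicated table config could look like, assuming a hypothetical `KBucketConfig` type and setter names that do not exist in the crate as discussed here. The bucket size starts at `K_VALUE` but is stored separately, so changing it leaves `K_VALUE` (and anything else derived from it) untouched.

```rust
use std::num::NonZeroUsize;
use std::time::Duration;

/// Stand-in for the crate-wide constant, assumed to be 20 as stated in the issue.
pub const K_VALUE: NonZeroUsize = match NonZeroUsize::new(20) {
    Some(v) => v,
    None => panic!("K_VALUE must be non-zero"),
};

/// Hypothetical config dedicated to the routing table.
#[derive(Debug, Clone, Copy)]
pub struct KBucketConfig {
    bucket_size: NonZeroUsize,
    pending_timeout: Duration,
}

impl Default for KBucketConfig {
    /// The bucket size defaults to `K_VALUE` but is a separate field.
    fn default() -> Self {
        KBucketConfig {
            bucket_size: K_VALUE,
            pending_timeout: Duration::from_secs(60),
        }
    }
}

impl KBucketConfig {
    /// Changes only the bucket size; `K_VALUE` itself is not affected.
    pub fn set_bucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
        self.bucket_size = size;
        self
    }

    /// Changes how long a pending entry waits before it may replace a node.
    pub fn set_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.pending_timeout = timeout;
        self
    }

    /// Maximum number of nodes per bucket.
    pub fn bucket_size(&self) -> usize {
        self.bucket_size.get()
    }

    /// Timeout applied to pending entries.
    pub fn pending_timeout(&self) -> Duration {
        self.pending_timeout
    }
}
```

A constructor along the lines of `KBucketsTable::new(local_key, config)` would then take this struct instead of a growing list of positional parameters.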
