From 8adaaa87ecf03a9e6629e1a6ae8312cd6f811241 Mon Sep 17 00:00:00 2001 From: Andrey Ni Date: Wed, 17 Aug 2022 21:39:24 +0600 Subject: [PATCH] Map key can be any comparable type now --- concurrent_map.go | 157 +++++++++++++++++++++-------------- concurrent_map_bench_test.go | 55 +++++++++--- concurrent_map_test.go | 9 +- 3 files changed, 141 insertions(+), 80 deletions(-) diff --git a/concurrent_map.go b/concurrent_map.go index 5a7311a..baccab0 100644 --- a/concurrent_map.go +++ b/concurrent_map.go @@ -2,36 +2,62 @@ package cmap import ( "encoding/json" + "fmt" "sync" ) var SHARD_COUNT = 32 +type Stringer interface { + fmt.Stringer + comparable +} + // A "thread" safe map of type string:Anything. // To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards. -type ConcurrentMap[V any] []*ConcurrentMapShared[V] +type ConcurrentMap[K comparable, V any] struct { + shards []*ConcurrentMapShared[K, V] + sharding func(key K) uint32 +} // A "thread" safe string to anything map. -type ConcurrentMapShared[V any] struct { - items map[string]V +type ConcurrentMapShared[K comparable, V any] struct { + items map[K]V sync.RWMutex // Read Write mutex, guards access to internal map. } -// Creates a new concurrent map. -func New[V any]() ConcurrentMap[V] { - m := make(ConcurrentMap[V], SHARD_COUNT) +func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { + m := ConcurrentMap[K, V]{ + sharding: sharding, + shards: make([]*ConcurrentMapShared[K, V], SHARD_COUNT), + } for i := 0; i < SHARD_COUNT; i++ { - m[i] = &ConcurrentMapShared[V]{items: make(map[string]V)} + m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)} } return m } +// Creates a new concurrent map. +func New[V any]() ConcurrentMap[string, V] { + return create[string, V](fnv32) +} + +// Creates a new concurrent map. +func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] { + return create[K, V](strfnv32[K]) +} + +// Creates a new concurrent map. +func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { + return create[K, V](sharding) +} + // GetShard returns shard under given key -func (m ConcurrentMap[V]) GetShard(key string) *ConcurrentMapShared[V] { - return m[uint(fnv32(key))%uint(SHARD_COUNT)] +func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] { + return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)] } -func (m ConcurrentMap[V]) MSet(data map[string]V) { +func (m ConcurrentMap[K, V]) MSet(data map[K]V) { for key, value := range data { shard := m.GetShard(key) shard.Lock() @@ -41,7 +67,7 @@ func (m ConcurrentMap[V]) MSet(data map[string]V) { } // Sets the given value under the specified key. -func (m ConcurrentMap[V]) Set(key string, value V) { +func (m ConcurrentMap[K, V]) Set(key K, value V) { // Get map shard. shard := m.GetShard(key) shard.Lock() @@ -56,7 +82,7 @@ func (m ConcurrentMap[V]) Set(key string, value V) { type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V // Insert or Update - updates existing element or inserts a new one using UpsertCb -func (m ConcurrentMap[V]) Upsert(key string, value V, cb UpsertCb[V]) (res V) { +func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) { shard := m.GetShard(key) shard.Lock() v, ok := shard.items[key] @@ -67,7 +93,7 @@ func (m ConcurrentMap[V]) Upsert(key string, value V, cb UpsertCb[V]) (res V) { } // Sets the given value under the specified key if no value was associated with it. -func (m ConcurrentMap[V]) SetIfAbsent(key string, value V) bool { +func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool { // Get map shard. shard := m.GetShard(key) shard.Lock() @@ -80,7 +106,7 @@ func (m ConcurrentMap[V]) SetIfAbsent(key string, value V) bool { } // Get retrieves an element from map under given key. -func (m ConcurrentMap[V]) Get(key string) (V, bool) { +func (m ConcurrentMap[K, V]) Get(key K) (V, bool) { // Get shard shard := m.GetShard(key) shard.RLock() @@ -91,10 +117,10 @@ func (m ConcurrentMap[V]) Get(key string) (V, bool) { } // Count returns the number of elements within the map. -func (m ConcurrentMap[V]) Count() int { +func (m ConcurrentMap[K, V]) Count() int { count := 0 for i := 0; i < SHARD_COUNT; i++ { - shard := m[i] + shard := m.shards[i] shard.RLock() count += len(shard.items) shard.RUnlock() @@ -103,7 +129,7 @@ func (m ConcurrentMap[V]) Count() int { } // Looks up an item under specified key -func (m ConcurrentMap[V]) Has(key string) bool { +func (m ConcurrentMap[K, V]) Has(key K) bool { // Get shard shard := m.GetShard(key) shard.RLock() @@ -114,7 +140,7 @@ func (m ConcurrentMap[V]) Has(key string) bool { } // Remove removes an element from the map. -func (m ConcurrentMap[V]) Remove(key string) { +func (m ConcurrentMap[K, V]) Remove(key K) { // Try to get shard. shard := m.GetShard(key) shard.Lock() @@ -124,12 +150,12 @@ func (m ConcurrentMap[V]) Remove(key string) { // RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held // If returns true, the element will be removed from the map -type RemoveCb[V any] func(key string, v V, exists bool) bool +type RemoveCb[K any, V any] func(key K, v V, exists bool) bool // RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params // If callback returns true and element exists, it will remove it from the map // Returns the value returned by the callback (even if element was not present in the map) -func (m ConcurrentMap[V]) RemoveCb(key string, cb RemoveCb[V]) bool { +func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool { // Try to get shard. shard := m.GetShard(key) shard.Lock() @@ -143,7 +169,7 @@ func (m ConcurrentMap[V]) RemoveCb(key string, cb RemoveCb[V]) bool { } // Pop removes an element from the map and returns it -func (m ConcurrentMap[V]) Pop(key string) (v V, exists bool) { +func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool) { // Try to get shard. shard := m.GetShard(key) shard.Lock() @@ -154,40 +180,40 @@ func (m ConcurrentMap[V]) Pop(key string) (v V, exists bool) { } // IsEmpty checks if map is empty. -func (m ConcurrentMap[V]) IsEmpty() bool { +func (m ConcurrentMap[K, V]) IsEmpty() bool { return m.Count() == 0 } // Used by the Iter & IterBuffered functions to wrap two variables together over a channel, -type Tuple[V any] struct { - Key string +type Tuple[K comparable, V any] struct { + Key K Val V } // Iter returns an iterator which could be used in a for range loop. // // Deprecated: using IterBuffered() will get a better performence -func (m ConcurrentMap[V]) Iter() <-chan Tuple[V] { +func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V] { chans := snapshot(m) - ch := make(chan Tuple[V]) + ch := make(chan Tuple[K, V]) go fanIn(chans, ch) return ch } // IterBuffered returns a buffered iterator which could be used in a for range loop. -func (m ConcurrentMap[V]) IterBuffered() <-chan Tuple[V] { +func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V] { chans := snapshot(m) total := 0 for _, c := range chans { total += cap(c) } - ch := make(chan Tuple[V], total) + ch := make(chan Tuple[K, V], total) go fanIn(chans, ch) return ch } // Clear removes all items from map. -func (m ConcurrentMap[V]) Clear() { +func (m ConcurrentMap[K, V]) Clear() { for item := range m.IterBuffered() { m.Remove(item.Key) } @@ -197,23 +223,23 @@ func (m ConcurrentMap[V]) Clear() { // which likely takes a snapshot of `m`. // It returns once the size of each buffered channel is determined, // before all the channels are populated using goroutines. -func snapshot[V any](m ConcurrentMap[V]) (chans []chan Tuple[V]) { +func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) { //When you access map items before initializing. - if len(m) == 0 { + if len(m.shards) == 0 { panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`) } - chans = make([]chan Tuple[V], SHARD_COUNT) + chans = make([]chan Tuple[K, V], SHARD_COUNT) wg := sync.WaitGroup{} wg.Add(SHARD_COUNT) // Foreach shard. - for index, shard := range m { - go func(index int, shard *ConcurrentMapShared[V]) { + for index, shard := range m.shards { + go func(index int, shard *ConcurrentMapShared[K, V]) { // Foreach key, value pair. shard.RLock() - chans[index] = make(chan Tuple[V], len(shard.items)) + chans[index] = make(chan Tuple[K, V], len(shard.items)) wg.Done() for key, val := range shard.items { - chans[index] <- Tuple[V]{key, val} + chans[index] <- Tuple[K, V]{key, val} } shard.RUnlock() close(chans[index]) @@ -224,11 +250,11 @@ func snapshot[V any](m ConcurrentMap[V]) (chans []chan Tuple[V]) { } // fanIn reads elements from channels `chans` into channel `out` -func fanIn[V any](chans []chan Tuple[V], out chan Tuple[V]) { +func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) { wg := sync.WaitGroup{} wg.Add(len(chans)) for _, ch := range chans { - go func(ch chan Tuple[V]) { + go func(ch chan Tuple[K, V]) { for t := range ch { out <- t } @@ -240,8 +266,8 @@ func fanIn[V any](chans []chan Tuple[V], out chan Tuple[V]) { } // Items returns all items as map[string]V -func (m ConcurrentMap[V]) Items() map[string]V { - tmp := make(map[string]V) +func (m ConcurrentMap[K, V]) Items() map[K]V { + tmp := make(map[K]V) // Insert items to temporary map. for item := range m.IterBuffered() { @@ -255,13 +281,13 @@ func (m ConcurrentMap[V]) Items() map[string]V { // maps. RLock is held for all calls for a given shard // therefore callback sess consistent view of a shard, // but not across the shards -type IterCb[V any] func(key string, v V) +type IterCb[K comparable, V any] func(key K, v V) // Callback based iterator, cheapest way to read // all elements in a map. -func (m ConcurrentMap[V]) IterCb(fn IterCb[V]) { - for idx := range m { - shard := (m)[idx] +func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V]) { + for idx := range m.shards { + shard := (m.shards)[idx] shard.RLock() for key, value := range shard.items { fn(key, value) @@ -271,15 +297,15 @@ func (m ConcurrentMap[V]) IterCb(fn IterCb[V]) { } // Keys returns all keys as []string -func (m ConcurrentMap[V]) Keys() []string { +func (m ConcurrentMap[K, V]) Keys() []K { count := m.Count() - ch := make(chan string, count) + ch := make(chan K, count) go func() { // Foreach shard. wg := sync.WaitGroup{} wg.Add(SHARD_COUNT) - for _, shard := range m { - go func(shard *ConcurrentMapShared[V]) { + for _, shard := range m.shards { + go func(shard *ConcurrentMapShared[K, V]) { // Foreach key, value pair. shard.RLock() for key := range shard.items { @@ -294,17 +320,17 @@ func (m ConcurrentMap[V]) Keys() []string { }() // Generate keys - keys := make([]string, 0, count) + keys := make([]K, 0, count) for k := range ch { keys = append(keys, k) } return keys } -//Reviles ConcurrentMap "private" variables to json marshal. -func (m ConcurrentMap[V]) MarshalJSON() ([]byte, error) { +// Reviles ConcurrentMap "private" variables to json marshal. +func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error) { // Create a temporary map, which will hold all item spread across shards. - tmp := make(map[string]V) + tmp := make(map[K]V) // Insert items to temporary map. for item := range m.IterBuffered() { @@ -312,6 +338,9 @@ func (m ConcurrentMap[V]) MarshalJSON() ([]byte, error) { } return json.Marshal(tmp) } +func strfnv32[K fmt.Stringer](key K) uint32 { + return fnv32(key.String()) +} func fnv32(key string) uint32 { hash := uint32(2166136261) @@ -325,17 +354,17 @@ func fnv32(key string) uint32 { } // Reverse process of Marshal. -func (m *ConcurrentMap[V]) UnmarshalJSON(b []byte) (err error) { - tmp := make(map[string]V) - - // Unmarshal into a single map. - if err := json.Unmarshal(b, &tmp); err != nil { - return err - } - - // foreach key,value pair in temporary map insert into our concurrent map. - for key, val := range tmp { - m.Set(key, val) - } +func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error) { + tmp := make(map[K]V) + + // Unmarshal into a single map. + if err := json.Unmarshal(b, &tmp); err != nil { + return err + } + + // foreach key,value pair in temporary map insert into our concurrent map. + for key, val := range tmp { + m.Set(key, val) + } return nil } diff --git a/concurrent_map_bench_test.go b/concurrent_map_bench_test.go index 80f8978..3c71063 100644 --- a/concurrent_map_bench_test.go +++ b/concurrent_map_bench_test.go @@ -6,6 +6,12 @@ import ( "testing" ) +type Integer int + +func (i Integer) String() string { + return strconv.Itoa(int(i)) +} + func BenchmarkItems(b *testing.B) { m := New[Animal]() @@ -18,6 +24,33 @@ func BenchmarkItems(b *testing.B) { } } +func BenchmarkItemsInteger(b *testing.B) { + m := NewStringer[Integer, Animal]() + + // Insert 100 elements. + for i := 0; i < 10000; i++ { + m.Set((Integer)(i), Animal{strconv.Itoa(i)}) + } + for i := 0; i < b.N; i++ { + m.Items() + } +} +func directSharding(key uint32) uint32 { + return key +} + +func BenchmarkItemsInt(b *testing.B) { + m := NewWithCustomShardingFunction[uint32, Animal](directSharding) + + // Insert 100 elements. + for i := 0; i < 10000; i++ { + m.Set((uint32)(i), Animal{strconv.Itoa(i)}) + } + for i := 0; i < b.N; i++ { + m.Items() + } +} + func BenchmarkMarshalJson(b *testing.B) { m := New[Animal]() @@ -89,7 +122,7 @@ func benchmarkMultiInsertDifferent(b *testing.B) { func BenchmarkMultiInsertDifferentSyncMap(b *testing.B) { var m sync.Map finished := make(chan struct{}, b.N) - _, set := GetSetSyncMap[string](&m, finished) + _, set := GetSetSyncMap[string, string](&m, finished) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -130,7 +163,7 @@ func BenchmarkMultiInsertSame(b *testing.B) { func BenchmarkMultiInsertSameSyncMap(b *testing.B) { var m sync.Map finished := make(chan struct{}, b.N) - _, set := GetSetSyncMap[string](&m, finished) + _, set := GetSetSyncMap[string, string](&m, finished) m.Store("key", "value") b.ResetTimer() for i := 0; i < b.N; i++ { @@ -158,7 +191,7 @@ func BenchmarkMultiGetSame(b *testing.B) { func BenchmarkMultiGetSameSyncMap(b *testing.B) { var m sync.Map finished := make(chan struct{}, b.N) - get, _ := GetSetSyncMap[string](&m, finished) + get, _ := GetSetSyncMap[string, string](&m, finished) m.Store("key", "value") b.ResetTimer() for i := 0; i < b.N; i++ { @@ -187,7 +220,7 @@ func benchmarkMultiGetSetDifferent(b *testing.B) { func BenchmarkMultiGetSetDifferentSyncMap(b *testing.B) { var m sync.Map finished := make(chan struct{}, 2*b.N) - get, set := GetSetSyncMap[string](&m, finished) + get, set := GetSetSyncMap[string, string](&m, finished) m.Store("-1", "value") b.ResetTimer() for i := 0; i < b.N; i++ { @@ -232,7 +265,7 @@ func benchmarkMultiGetSetBlock(b *testing.B) { func BenchmarkMultiGetSetBlockSyncMap(b *testing.B) { var m sync.Map finished := make(chan struct{}, 2*b.N) - get, set := GetSetSyncMap[string](&m, finished) + get, set := GetSetSyncMap[string, string](&m, finished) for i := 0; i < b.N; i++ { m.Store(strconv.Itoa(i%100), "value") } @@ -259,13 +292,13 @@ func BenchmarkMultiGetSetBlock_256_Shard(b *testing.B) { runWithShards(benchmarkMultiGetSetBlock, b, 256) } -func GetSet[V any](m ConcurrentMap[V], finished chan struct{}) (set func(key string, value V), get func(key string, value V)) { - return func(key string, value V) { +func GetSet[K comparable, V any](m ConcurrentMap[K, V], finished chan struct{}) (set func(key K, value V), get func(key K, value V)) { + return func(key K, value V) { for i := 0; i < 10; i++ { m.Get(key) } finished <- struct{}{} - }, func(key string, value V) { + }, func(key K, value V) { for i := 0; i < 10; i++ { m.Set(key, value) } @@ -273,14 +306,14 @@ func GetSet[V any](m ConcurrentMap[V], finished chan struct{}) (set func(key str } } -func GetSetSyncMap[V any](m *sync.Map, finished chan struct{}) (get func(key string, value V), set func(key string, value V)) { - get = func(key string, value V) { +func GetSetSyncMap[K comparable, V any](m *sync.Map, finished chan struct{}) (get func(key K, value V), set func(key K, value V)) { + get = func(key K, value V) { for i := 0; i < 10; i++ { m.Load(key) } finished <- struct{}{} } - set = func(key string, value V) { + set = func(key K, value V) { for i := 0; i < 10; i++ { m.Store(key, value) } diff --git a/concurrent_map_test.go b/concurrent_map_test.go index 78753de..4e5d52e 100644 --- a/concurrent_map_test.go +++ b/concurrent_map_test.go @@ -14,7 +14,7 @@ type Animal struct { func TestMapCreation(t *testing.T) { m := New[string]() - if m == nil { + if m.shards == nil { t.Error("map is null.") } @@ -454,8 +454,8 @@ func TestKeys(t *testing.T) { func TestMInsert(t *testing.T) { animals := map[string]Animal{ - "elephant": Animal{"elephant"}, - "monkey": Animal{"monkey"}, + "elephant": {"elephant"}, + "monkey": {"monkey"}, } m := New[Animal]() m.MSet(animals) @@ -526,7 +526,7 @@ func TestKeysWhenRemoving(t *testing.T) { // Remove 10 elements concurrently. Num := 10 for i := 0; i < Num; i++ { - go func(c *ConcurrentMap[Animal], n int) { + go func(c *ConcurrentMap[string, Animal], n int) { c.Remove(strconv.Itoa(n)) }(&m, i) } @@ -538,7 +538,6 @@ func TestKeysWhenRemoving(t *testing.T) { } } -// func TestUnDrainedIter(t *testing.T) { m := New[Animal]() // Insert 100 elements.