Skip to content

Commit

Permalink
Merge pull request #119 from niandy/master
Browse files Browse the repository at this point in the history
Map key can be any comparable type now
  • Loading branch information
orcaman committed Nov 8, 2022
2 parents d510f1f + ac73cd6 commit 85296bc
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 80 deletions.
157 changes: 93 additions & 64 deletions concurrent_map.go
Expand Up @@ -2,36 +2,62 @@ package cmap

import (
"encoding/json"
"fmt"
"sync"
)

var SHARD_COUNT = 32

type Stringer interface {
fmt.Stringer
comparable
}

// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
type ConcurrentMap[V any] []*ConcurrentMapShared[V]
type ConcurrentMap[K comparable, V any] struct {
shards []*ConcurrentMapShared[K, V]
sharding func(key K) uint32
}

// A "thread" safe string to anything map.
type ConcurrentMapShared[V any] struct {
items map[string]V
type ConcurrentMapShared[K comparable, V any] struct {
items map[K]V
sync.RWMutex // Read Write mutex, guards access to internal map.
}

// Creates a new concurrent map.
func New[V any]() ConcurrentMap[V] {
m := make(ConcurrentMap[V], SHARD_COUNT)
func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
m := ConcurrentMap[K, V]{
sharding: sharding,
shards: make([]*ConcurrentMapShared[K, V], SHARD_COUNT),
}
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared[V]{items: make(map[string]V)}
m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
}
return m
}

// Creates a new concurrent map.
func New[V any]() ConcurrentMap[string, V] {
return create[string, V](fnv32)
}

// Creates a new concurrent map.
func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] {
return create[K, V](strfnv32[K])
}

// Creates a new concurrent map.
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
return create[K, V](sharding)
}

// GetShard returns shard under given key
func (m ConcurrentMap[V]) GetShard(key string) *ConcurrentMapShared[V] {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}

func (m ConcurrentMap[V]) MSet(data map[string]V) {
func (m ConcurrentMap[K, V]) MSet(data map[K]V) {
for key, value := range data {
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -41,7 +67,7 @@ func (m ConcurrentMap[V]) MSet(data map[string]V) {
}

// Sets the given value under the specified key.
func (m ConcurrentMap[V]) Set(key string, value V) {
func (m ConcurrentMap[K, V]) Set(key K, value V) {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -56,7 +82,7 @@ func (m ConcurrentMap[V]) Set(key string, value V) {
type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V

// Insert or Update - updates existing element or inserts a new one using UpsertCb
func (m ConcurrentMap[V]) Upsert(key string, value V, cb UpsertCb[V]) (res V) {
func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) {
shard := m.GetShard(key)
shard.Lock()
v, ok := shard.items[key]
Expand All @@ -67,7 +93,7 @@ func (m ConcurrentMap[V]) Upsert(key string, value V, cb UpsertCb[V]) (res V) {
}

// Sets the given value under the specified key if no value was associated with it.
func (m ConcurrentMap[V]) SetIfAbsent(key string, value V) bool {
func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -80,7 +106,7 @@ func (m ConcurrentMap[V]) SetIfAbsent(key string, value V) bool {
}

// Get retrieves an element from map under given key.
func (m ConcurrentMap[V]) Get(key string) (V, bool) {
func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
// Get shard
shard := m.GetShard(key)
shard.RLock()
Expand All @@ -91,10 +117,10 @@ func (m ConcurrentMap[V]) Get(key string) (V, bool) {
}

// Count returns the number of elements within the map.
func (m ConcurrentMap[V]) Count() int {
func (m ConcurrentMap[K, V]) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
shard := m[i]
shard := m.shards[i]
shard.RLock()
count += len(shard.items)
shard.RUnlock()
Expand All @@ -103,7 +129,7 @@ func (m ConcurrentMap[V]) Count() int {
}

// Looks up an item under specified key
func (m ConcurrentMap[V]) Has(key string) bool {
func (m ConcurrentMap[K, V]) Has(key K) bool {
// Get shard
shard := m.GetShard(key)
shard.RLock()
Expand All @@ -114,7 +140,7 @@ func (m ConcurrentMap[V]) Has(key string) bool {
}

// Remove removes an element from the map.
func (m ConcurrentMap[V]) Remove(key string) {
func (m ConcurrentMap[K, V]) Remove(key K) {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -124,12 +150,12 @@ func (m ConcurrentMap[V]) Remove(key string) {

// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
// If returns true, the element will be removed from the map
type RemoveCb[V any] func(key string, v V, exists bool) bool
type RemoveCb[K any, V any] func(key K, v V, exists bool) bool

// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
// If callback returns true and element exists, it will remove it from the map
// Returns the value returned by the callback (even if element was not present in the map)
func (m ConcurrentMap[V]) RemoveCb(key string, cb RemoveCb[V]) bool {
func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -143,7 +169,7 @@ func (m ConcurrentMap[V]) RemoveCb(key string, cb RemoveCb[V]) bool {
}

// Pop removes an element from the map and returns it
func (m ConcurrentMap[V]) Pop(key string) (v V, exists bool) {
func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool) {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
Expand All @@ -154,40 +180,40 @@ func (m ConcurrentMap[V]) Pop(key string) (v V, exists bool) {
}

// IsEmpty checks if map is empty.
func (m ConcurrentMap[V]) IsEmpty() bool {
func (m ConcurrentMap[K, V]) IsEmpty() bool {
return m.Count() == 0
}

// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
type Tuple[V any] struct {
Key string
type Tuple[K comparable, V any] struct {
Key K
Val V
}

// Iter returns an iterator which could be used in a for range loop.
//
// Deprecated: using IterBuffered() will get a better performence
func (m ConcurrentMap[V]) Iter() <-chan Tuple[V] {
func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V] {
chans := snapshot(m)
ch := make(chan Tuple[V])
ch := make(chan Tuple[K, V])
go fanIn(chans, ch)
return ch
}

// IterBuffered returns a buffered iterator which could be used in a for range loop.
func (m ConcurrentMap[V]) IterBuffered() <-chan Tuple[V] {
func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V] {
chans := snapshot(m)
total := 0
for _, c := range chans {
total += cap(c)
}
ch := make(chan Tuple[V], total)
ch := make(chan Tuple[K, V], total)
go fanIn(chans, ch)
return ch
}

// Clear removes all items from map.
func (m ConcurrentMap[V]) Clear() {
func (m ConcurrentMap[K, V]) Clear() {
for item := range m.IterBuffered() {
m.Remove(item.Key)
}
Expand All @@ -197,23 +223,23 @@ func (m ConcurrentMap[V]) Clear() {
// which likely takes a snapshot of `m`.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated using goroutines.
func snapshot[V any](m ConcurrentMap[V]) (chans []chan Tuple[V]) {
func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) {
//When you access map items before initializing.
if len(m) == 0 {
if len(m.shards) == 0 {
panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
}
chans = make([]chan Tuple[V], SHARD_COUNT)
chans = make([]chan Tuple[K, V], SHARD_COUNT)
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
// Foreach shard.
for index, shard := range m {
go func(index int, shard *ConcurrentMapShared[V]) {
for index, shard := range m.shards {
go func(index int, shard *ConcurrentMapShared[K, V]) {
// Foreach key, value pair.
shard.RLock()
chans[index] = make(chan Tuple[V], len(shard.items))
chans[index] = make(chan Tuple[K, V], len(shard.items))
wg.Done()
for key, val := range shard.items {
chans[index] <- Tuple[V]{key, val}
chans[index] <- Tuple[K, V]{key, val}
}
shard.RUnlock()
close(chans[index])
Expand All @@ -224,11 +250,11 @@ func snapshot[V any](m ConcurrentMap[V]) (chans []chan Tuple[V]) {
}

// fanIn reads elements from channels `chans` into channel `out`
func fanIn[V any](chans []chan Tuple[V], out chan Tuple[V]) {
func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) {
wg := sync.WaitGroup{}
wg.Add(len(chans))
for _, ch := range chans {
go func(ch chan Tuple[V]) {
go func(ch chan Tuple[K, V]) {
for t := range ch {
out <- t
}
Expand All @@ -240,8 +266,8 @@ func fanIn[V any](chans []chan Tuple[V], out chan Tuple[V]) {
}

// Items returns all items as map[string]V
func (m ConcurrentMap[V]) Items() map[string]V {
tmp := make(map[string]V)
func (m ConcurrentMap[K, V]) Items() map[K]V {
tmp := make(map[K]V)

// Insert items to temporary map.
for item := range m.IterBuffered() {
Expand All @@ -255,13 +281,13 @@ func (m ConcurrentMap[V]) Items() map[string]V {
// maps. RLock is held for all calls for a given shard
// therefore callback sess consistent view of a shard,
// but not across the shards
type IterCb[V any] func(key string, v V)
type IterCb[K comparable, V any] func(key K, v V)

// Callback based iterator, cheapest way to read
// all elements in a map.
func (m ConcurrentMap[V]) IterCb(fn IterCb[V]) {
for idx := range m {
shard := (m)[idx]
func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V]) {
for idx := range m.shards {
shard := (m.shards)[idx]
shard.RLock()
for key, value := range shard.items {
fn(key, value)
Expand All @@ -271,15 +297,15 @@ func (m ConcurrentMap[V]) IterCb(fn IterCb[V]) {
}

// Keys returns all keys as []string
func (m ConcurrentMap[V]) Keys() []string {
func (m ConcurrentMap[K, V]) Keys() []K {
count := m.Count()
ch := make(chan string, count)
ch := make(chan K, count)
go func() {
// Foreach shard.
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
for _, shard := range m {
go func(shard *ConcurrentMapShared[V]) {
for _, shard := range m.shards {
go func(shard *ConcurrentMapShared[K, V]) {
// Foreach key, value pair.
shard.RLock()
for key := range shard.items {
Expand All @@ -294,24 +320,27 @@ func (m ConcurrentMap[V]) Keys() []string {
}()

// Generate keys
keys := make([]string, 0, count)
keys := make([]K, 0, count)
for k := range ch {
keys = append(keys, k)
}
return keys
}

//Reviles ConcurrentMap "private" variables to json marshal.
func (m ConcurrentMap[V]) MarshalJSON() ([]byte, error) {
// Reviles ConcurrentMap "private" variables to json marshal.
func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error) {
// Create a temporary map, which will hold all item spread across shards.
tmp := make(map[string]V)
tmp := make(map[K]V)

// Insert items to temporary map.
for item := range m.IterBuffered() {
tmp[item.Key] = item.Val
}
return json.Marshal(tmp)
}
func strfnv32[K fmt.Stringer](key K) uint32 {
return fnv32(key.String())
}

func fnv32(key string) uint32 {
hash := uint32(2166136261)
Expand All @@ -325,17 +354,17 @@ func fnv32(key string) uint32 {
}

// Reverse process of Marshal.
func (m *ConcurrentMap[V]) UnmarshalJSON(b []byte) (err error) {
tmp := make(map[string]V)

// Unmarshal into a single map.
if err := json.Unmarshal(b, &tmp); err != nil {
return err
}

// foreach key,value pair in temporary map insert into our concurrent map.
for key, val := range tmp {
m.Set(key, val)
}
func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error) {
tmp := make(map[K]V)

// Unmarshal into a single map.
if err := json.Unmarshal(b, &tmp); err != nil {
return err
}

// foreach key,value pair in temporary map insert into our concurrent map.
for key, val := range tmp {
m.Set(key, val)
}
return nil
}

0 comments on commit 85296bc

Please sign in to comment.