Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for configurable number of shards #30

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 16 additions & 10 deletions concurrent_map.go
Expand Up @@ -5,10 +5,10 @@ import (
"sync"
)

var SHARD_COUNT = 32
const SHARD_COUNT = 32

// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
// To avoid lock bottlenecks this map is dived to several (len(m)) map shards.
type ConcurrentMap []*ConcurrentMapShared

// A "thread" safe string to anything map.
Expand All @@ -18,9 +18,15 @@ type ConcurrentMapShared struct {
}

// Creates a new concurrent map.
func New() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
func New(shardCount ...int) ConcurrentMap {
var nShards int
if len(shardCount) > 0 {
nShards = shardCount[0]
} else {
nShards = SHARD_COUNT
}
m := make(ConcurrentMap, nShards)
for i := 0; i < nShards; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
Expand All @@ -29,7 +35,7 @@ func New() ConcurrentMap {
// Returns shard under given key
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
sum := fnv32([]byte(key))
return m[uint(sum)%uint(SHARD_COUNT)]
return m[uint(sum)%uint(len(m))]
}

func (m ConcurrentMap) MSet(data map[string]interface{}) {
Expand Down Expand Up @@ -94,7 +100,7 @@ func (m ConcurrentMap) Get(key string) (interface{}, bool) {
// Returns the number of elements within the map.
func (m ConcurrentMap) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
for i := 0; i < len(m); i++ {
shard := m[i]
shard.RLock()
count += len(shard.items)
Expand Down Expand Up @@ -141,7 +147,7 @@ func (m ConcurrentMap) Iter() <-chan Tuple {
ch := make(chan Tuple)
go func() {
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
wg.Add(len(m))
// Foreach shard.
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
Expand All @@ -165,7 +171,7 @@ func (m ConcurrentMap) IterBuffered() <-chan Tuple {
ch := make(chan Tuple, m.Count())
go func() {
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
wg.Add(len(m))
// Foreach shard.
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
Expand Down Expand Up @@ -203,7 +209,7 @@ func (m ConcurrentMap) Keys() []string {
go func() {
// Foreach shard.
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
wg.Add(len(m))
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
// Foreach key, value pair.
Expand Down
6 changes: 3 additions & 3 deletions concurrent_map_bench_test.go
Expand Up @@ -3,6 +3,8 @@ package cmap
import "testing"
import "strconv"

var m ConcurrentMap

func BenchmarkItems(b *testing.B) {
m := New()

Expand Down Expand Up @@ -177,10 +179,8 @@ func GetSet(m ConcurrentMap, finished chan struct{}) (set func(key, value string
}

func runWithShards(bench func(b *testing.B), b *testing.B, shardsCount int) {
oldShardsCount := SHARD_COUNT
SHARD_COUNT = shardsCount
m = New(shardsCount)
bench(b)
SHARD_COUNT = oldShardsCount
}

func BenchmarkKeys(b *testing.B) {
Expand Down
6 changes: 1 addition & 5 deletions concurrent_map_test.go
Expand Up @@ -272,12 +272,8 @@ func TestConcurrent(t *testing.T) {
}

func TestJsonMarshal(t *testing.T) {
SHARD_COUNT = 2
defer func() {
SHARD_COUNT = 32
}()
expected := "{\"a\":1,\"b\":2}"
m := New()
m := New(2)
m.Set("a", 1)
m.Set("b", 2)
j, err := json.Marshal(m)
Expand Down