Skip to content

Commit 3634849

Browse files
committed
Allow for configurable number of shards
closes orcaman#17
1 parent 65a174a commit 3634849

File tree

3 files changed

+20
-18
lines changed

3 files changed

+20
-18
lines changed

concurrent_map.go

+16-10
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import (
66
"sync"
77
)
88

9-
var SHARD_COUNT = 32
9+
const SHARD_COUNT = 32
1010

1111
// A "thread" safe map of type string:Anything.
12-
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
12+
// To avoid lock bottlenecks this map is dived to several (len(m)) map shards.
1313
type ConcurrentMap []*ConcurrentMapShared
1414

1515
// A "thread" safe string to anything map.
@@ -19,9 +19,15 @@ type ConcurrentMapShared struct {
1919
}
2020

2121
// Creates a new concurrent map.
22-
func New() ConcurrentMap {
23-
m := make(ConcurrentMap, SHARD_COUNT)
24-
for i := 0; i < SHARD_COUNT; i++ {
22+
func New(shardCount ...int) ConcurrentMap {
23+
var nShards int
24+
if len(shardCount) > 0 {
25+
nShards = shardCount[0]
26+
} else {
27+
nShards = SHARD_COUNT
28+
}
29+
m := make(ConcurrentMap, nShards)
30+
for i := 0; i < nShards; i++ {
2531
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
2632
}
2733
return m
@@ -31,7 +37,7 @@ func New() ConcurrentMap {
3137
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
3238
hasher := fnv.New32()
3339
hasher.Write([]byte(key))
34-
return m[uint(hasher.Sum32())%uint(SHARD_COUNT)]
40+
return m[uint(hasher.Sum32())%uint(len(m))]
3541
}
3642

3743
func (m ConcurrentMap) MSet(data map[string]interface{}) {
@@ -79,7 +85,7 @@ func (m ConcurrentMap) Get(key string) (interface{}, bool) {
7985
// Returns the number of elements within the map.
8086
func (m ConcurrentMap) Count() int {
8187
count := 0
82-
for i := 0; i < SHARD_COUNT; i++ {
88+
for i := 0; i < len(m); i++ {
8389
shard := m[i]
8490
shard.RLock()
8591
count += len(shard.items)
@@ -126,7 +132,7 @@ func (m ConcurrentMap) Iter() <-chan Tuple {
126132
ch := make(chan Tuple)
127133
go func() {
128134
wg := sync.WaitGroup{}
129-
wg.Add(SHARD_COUNT)
135+
wg.Add(len(m))
130136
// Foreach shard.
131137
for _, shard := range m {
132138
go func(shard *ConcurrentMapShared) {
@@ -150,7 +156,7 @@ func (m ConcurrentMap) IterBuffered() <-chan Tuple {
150156
ch := make(chan Tuple, m.Count())
151157
go func() {
152158
wg := sync.WaitGroup{}
153-
wg.Add(SHARD_COUNT)
159+
wg.Add(len(m))
154160
// Foreach shard.
155161
for _, shard := range m {
156162
go func(shard *ConcurrentMapShared) {
@@ -188,7 +194,7 @@ func (m ConcurrentMap) Keys() []string {
188194
go func() {
189195
// Foreach shard.
190196
wg := sync.WaitGroup{}
191-
wg.Add(SHARD_COUNT)
197+
wg.Add(len(m))
192198
for _, shard := range m {
193199
go func(shard *ConcurrentMapShared) {
194200
// Foreach key, value pair.

concurrent_map_bench_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package cmap
33
import "testing"
44
import "strconv"
55

6+
var m ConcurrentMap
7+
68
func BenchmarkItems(b *testing.B) {
79
m := New()
810

@@ -177,10 +179,8 @@ func GetSet(m ConcurrentMap, finished chan struct{}) (set func(key, value string
177179
}
178180

179181
func runWithShards(bench func(b *testing.B), b *testing.B, shardsCount int) {
180-
oldShardsCount := SHARD_COUNT
181-
SHARD_COUNT = shardsCount
182+
m = New(shardsCount)
182183
bench(b)
183-
SHARD_COUNT = oldShardsCount
184184
}
185185

186186
func BenchmarkKeys(b *testing.B) {

concurrent_map_test.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -271,12 +271,8 @@ func TestConcurrent(t *testing.T) {
271271
}
272272

273273
func TestJsonMarshal(t *testing.T) {
274-
SHARD_COUNT = 2
275-
defer func() {
276-
SHARD_COUNT = 32
277-
}()
278274
expected := "{\"a\":1,\"b\":2}"
279-
m := New()
275+
m := New(2)
280276
m.Set("a", 1)
281277
m.Set("b", 2)
282278
j, err := json.Marshal(m)

0 commit comments

Comments
 (0)