@@ -9,29 +9,29 @@ var SHARD_COUNT = 32
9
9
10
10
// A "thread" safe map of type string:Anything.
11
11
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
12
- type ConcurrentMap []* ConcurrentMapShared
12
+ type ConcurrentMap [ V any ] []* ConcurrentMapShared [ V ]
13
13
14
14
// A "thread" safe string to anything map.
15
- type ConcurrentMapShared struct {
16
- items map [string ]interface {}
15
+ type ConcurrentMapShared [ V any ] struct {
16
+ items map [string ]V
17
17
sync.RWMutex // Read Write mutex, guards access to internal map.
18
18
}
19
19
20
20
// Creates a new concurrent map.
21
- func New () ConcurrentMap {
22
- m := make (ConcurrentMap , SHARD_COUNT )
21
+ func New [ V any ] () ConcurrentMap [ V ] {
22
+ m := make (ConcurrentMap [ V ] , SHARD_COUNT )
23
23
for i := 0 ; i < SHARD_COUNT ; i ++ {
24
- m [i ] = & ConcurrentMapShared {items : make (map [string ]interface {} )}
24
+ m [i ] = & ConcurrentMapShared [ V ] {items : make (map [string ]V )}
25
25
}
26
26
return m
27
27
}
28
28
29
29
// GetShard returns shard under given key
30
- func (m ConcurrentMap ) GetShard (key string ) * ConcurrentMapShared {
30
+ func (m ConcurrentMap [ V ] ) GetShard (key string ) * ConcurrentMapShared [ V ] {
31
31
return m [uint (fnv32 (key ))% uint (SHARD_COUNT )]
32
32
}
33
33
34
- func (m ConcurrentMap ) MSet (data map [string ]interface {} ) {
34
+ func (m ConcurrentMap [ V ] ) MSet (data map [string ]V ) {
35
35
for key , value := range data {
36
36
shard := m .GetShard (key )
37
37
shard .Lock ()
@@ -41,7 +41,7 @@ func (m ConcurrentMap) MSet(data map[string]interface{}) {
41
41
}
42
42
43
43
// Sets the given value under the specified key.
44
- func (m ConcurrentMap ) Set (key string , value interface {} ) {
44
+ func (m ConcurrentMap [ V ] ) Set (key string , value V ) {
45
45
// Get map shard.
46
46
shard := m .GetShard (key )
47
47
shard .Lock ()
@@ -53,10 +53,10 @@ func (m ConcurrentMap) Set(key string, value interface{}) {
53
53
// It is called while lock is held, therefore it MUST NOT
54
54
// try to access other keys in same map, as it can lead to deadlock since
55
55
// Go sync.RWLock is not reentrant
56
- type UpsertCb func (exist bool , valueInMap interface {} , newValue interface {}) interface {}
56
+ type UpsertCb [ V any ] func (exist bool , valueInMap V , newValue V ) V
57
57
58
58
// Insert or Update - updates existing element or inserts a new one using UpsertCb
59
- func (m ConcurrentMap ) Upsert (key string , value interface {} , cb UpsertCb ) (res interface {} ) {
59
+ func (m ConcurrentMap [ V ] ) Upsert (key string , value V , cb UpsertCb [ V ] ) (res V ) {
60
60
shard := m .GetShard (key )
61
61
shard .Lock ()
62
62
v , ok := shard .items [key ]
@@ -67,7 +67,7 @@ func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res i
67
67
}
68
68
69
69
// Sets the given value under the specified key if no value was associated with it.
70
- func (m ConcurrentMap ) SetIfAbsent (key string , value interface {} ) bool {
70
+ func (m ConcurrentMap [ V ] ) SetIfAbsent (key string , value V ) bool {
71
71
// Get map shard.
72
72
shard := m .GetShard (key )
73
73
shard .Lock ()
@@ -80,7 +80,7 @@ func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {
80
80
}
81
81
82
82
// Get retrieves an element from map under given key.
83
- func (m ConcurrentMap ) Get (key string ) (interface {} , bool ) {
83
+ func (m ConcurrentMap [ V ] ) Get (key string ) (V , bool ) {
84
84
// Get shard
85
85
shard := m .GetShard (key )
86
86
shard .RLock ()
@@ -91,7 +91,7 @@ func (m ConcurrentMap) Get(key string) (interface{}, bool) {
91
91
}
92
92
93
93
// Count returns the number of elements within the map.
94
- func (m ConcurrentMap ) Count () int {
94
+ func (m ConcurrentMap [ V ] ) Count () int {
95
95
count := 0
96
96
for i := 0 ; i < SHARD_COUNT ; i ++ {
97
97
shard := m [i ]
@@ -103,7 +103,7 @@ func (m ConcurrentMap) Count() int {
103
103
}
104
104
105
105
// Looks up an item under specified key
106
- func (m ConcurrentMap ) Has (key string ) bool {
106
+ func (m ConcurrentMap [ V ] ) Has (key string ) bool {
107
107
// Get shard
108
108
shard := m .GetShard (key )
109
109
shard .RLock ()
@@ -114,7 +114,7 @@ func (m ConcurrentMap) Has(key string) bool {
114
114
}
115
115
116
116
// Remove removes an element from the map.
117
- func (m ConcurrentMap ) Remove (key string ) {
117
+ func (m ConcurrentMap [ V ] ) Remove (key string ) {
118
118
// Try to get shard.
119
119
shard := m .GetShard (key )
120
120
shard .Lock ()
@@ -124,12 +124,12 @@ func (m ConcurrentMap) Remove(key string) {
124
124
125
125
// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
126
126
// If returns true, the element will be removed from the map
127
- type RemoveCb func (key string , v interface {} , exists bool ) bool
127
+ type RemoveCb [ V any ] func (key string , v V , exists bool ) bool
128
128
129
129
// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
130
130
// If callback returns true and element exists, it will remove it from the map
131
131
// Returns the value returned by the callback (even if element was not present in the map)
132
- func (m ConcurrentMap ) RemoveCb (key string , cb RemoveCb ) bool {
132
+ func (m ConcurrentMap [ V ] ) RemoveCb (key string , cb RemoveCb [ V ] ) bool {
133
133
// Try to get shard.
134
134
shard := m .GetShard (key )
135
135
shard .Lock ()
@@ -143,7 +143,7 @@ func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool {
143
143
}
144
144
145
145
// Pop removes an element from the map and returns it
146
- func (m ConcurrentMap ) Pop (key string ) (v interface {} , exists bool ) {
146
+ func (m ConcurrentMap [ V ] ) Pop (key string ) (v V , exists bool ) {
147
147
// Try to get shard.
148
148
shard := m .GetShard (key )
149
149
shard .Lock ()
@@ -154,40 +154,40 @@ func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) {
154
154
}
155
155
156
156
// IsEmpty checks if map is empty.
157
- func (m ConcurrentMap ) IsEmpty () bool {
157
+ func (m ConcurrentMap [ V ] ) IsEmpty () bool {
158
158
return m .Count () == 0
159
159
}
160
160
161
161
// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
162
- type Tuple struct {
162
+ type Tuple [ V any ] struct {
163
163
Key string
164
- Val interface {}
164
+ Val V
165
165
}
166
166
167
167
// Iter returns an iterator which could be used in a for range loop.
168
168
//
169
169
// Deprecated: using IterBuffered() will get a better performence
170
- func (m ConcurrentMap ) Iter () <- chan Tuple {
170
+ func (m ConcurrentMap [ V ] ) Iter () <- chan Tuple [ V ] {
171
171
chans := snapshot (m )
172
- ch := make (chan Tuple )
172
+ ch := make (chan Tuple [ V ] )
173
173
go fanIn (chans , ch )
174
174
return ch
175
175
}
176
176
177
177
// IterBuffered returns a buffered iterator which could be used in a for range loop.
178
- func (m ConcurrentMap ) IterBuffered () <- chan Tuple {
178
+ func (m ConcurrentMap [ V ] ) IterBuffered () <- chan Tuple [ V ] {
179
179
chans := snapshot (m )
180
180
total := 0
181
181
for _ , c := range chans {
182
182
total += cap (c )
183
183
}
184
- ch := make (chan Tuple , total )
184
+ ch := make (chan Tuple [ V ] , total )
185
185
go fanIn (chans , ch )
186
186
return ch
187
187
}
188
188
189
189
// Clear removes all items from map.
190
- func (m ConcurrentMap ) Clear () {
190
+ func (m ConcurrentMap [ V ] ) Clear () {
191
191
for item := range m .IterBuffered () {
192
192
m .Remove (item .Key )
193
193
}
@@ -197,23 +197,23 @@ func (m ConcurrentMap) Clear() {
197
197
// which likely takes a snapshot of `m`.
198
198
// It returns once the size of each buffered channel is determined,
199
199
// before all the channels are populated using goroutines.
200
- func snapshot (m ConcurrentMap ) (chans []chan Tuple ) {
200
+ func snapshot [ V any ] (m ConcurrentMap [ V ] ) (chans []chan Tuple [ V ] ) {
201
201
//When you access map items before initializing.
202
- if len (m ) == 0 {
202
+ if len (m ) == 0 {
203
203
panic (`cmap.ConcurrentMap is not initialized. Should run New() before usage.` )
204
204
}
205
- chans = make ([]chan Tuple , SHARD_COUNT )
205
+ chans = make ([]chan Tuple [ V ] , SHARD_COUNT )
206
206
wg := sync.WaitGroup {}
207
207
wg .Add (SHARD_COUNT )
208
208
// Foreach shard.
209
209
for index , shard := range m {
210
- go func (index int , shard * ConcurrentMapShared ) {
210
+ go func (index int , shard * ConcurrentMapShared [ V ] ) {
211
211
// Foreach key, value pair.
212
212
shard .RLock ()
213
- chans [index ] = make (chan Tuple , len (shard .items ))
213
+ chans [index ] = make (chan Tuple [ V ] , len (shard .items ))
214
214
wg .Done ()
215
215
for key , val := range shard .items {
216
- chans [index ] <- Tuple {key , val }
216
+ chans [index ] <- Tuple [ V ] {key , val }
217
217
}
218
218
shard .RUnlock ()
219
219
close (chans [index ])
@@ -224,11 +224,11 @@ func snapshot(m ConcurrentMap) (chans []chan Tuple) {
224
224
}
225
225
226
226
// fanIn reads elements from channels `chans` into channel `out`
227
- func fanIn (chans []chan Tuple , out chan Tuple ) {
227
+ func fanIn [ V any ] (chans []chan Tuple [ V ] , out chan Tuple [ V ] ) {
228
228
wg := sync.WaitGroup {}
229
229
wg .Add (len (chans ))
230
230
for _ , ch := range chans {
231
- go func (ch chan Tuple ) {
231
+ go func (ch chan Tuple [ V ] ) {
232
232
for t := range ch {
233
233
out <- t
234
234
}
@@ -239,9 +239,9 @@ func fanIn(chans []chan Tuple, out chan Tuple) {
239
239
close (out )
240
240
}
241
241
242
- // Items returns all items as map[string]interface{}
243
- func (m ConcurrentMap ) Items () map [string ]interface {} {
244
- tmp := make (map [string ]interface {} )
242
+ // Items returns all items as map[string]V
243
+ func (m ConcurrentMap [ V ] ) Items () map [string ]V {
244
+ tmp := make (map [string ]V )
245
245
246
246
// Insert items to temporary map.
247
247
for item := range m .IterBuffered () {
@@ -251,15 +251,15 @@ func (m ConcurrentMap) Items() map[string]interface{} {
251
251
return tmp
252
252
}
253
253
254
- // Iterator callback,called for every key,value found in
254
+ // Iterator callbacalled for every key,value found in
255
255
// maps. RLock is held for all calls for a given shard
256
256
// therefore callback sess consistent view of a shard,
257
257
// but not across the shards
258
- type IterCb func (key string , v interface {} )
258
+ type IterCb [ V any ] func (key string , v V )
259
259
260
260
// Callback based iterator, cheapest way to read
261
261
// all elements in a map.
262
- func (m ConcurrentMap ) IterCb (fn IterCb ) {
262
+ func (m ConcurrentMap [ V ] ) IterCb (fn IterCb [ V ] ) {
263
263
for idx := range m {
264
264
shard := (m )[idx ]
265
265
shard .RLock ()
@@ -271,15 +271,15 @@ func (m ConcurrentMap) IterCb(fn IterCb) {
271
271
}
272
272
273
273
// Keys returns all keys as []string
274
- func (m ConcurrentMap ) Keys () []string {
274
+ func (m ConcurrentMap [ V ] ) Keys () []string {
275
275
count := m .Count ()
276
276
ch := make (chan string , count )
277
277
go func () {
278
278
// Foreach shard.
279
279
wg := sync.WaitGroup {}
280
280
wg .Add (SHARD_COUNT )
281
281
for _ , shard := range m {
282
- go func (shard * ConcurrentMapShared ) {
282
+ go func (shard * ConcurrentMapShared [ V ] ) {
283
283
// Foreach key, value pair.
284
284
shard .RLock ()
285
285
for key := range shard .items {
@@ -302,9 +302,9 @@ func (m ConcurrentMap) Keys() []string {
302
302
}
303
303
304
304
//Reviles ConcurrentMap "private" variables to json marshal.
305
- func (m ConcurrentMap ) MarshalJSON () ([]byte , error ) {
305
+ func (m ConcurrentMap [ V ] ) MarshalJSON () ([]byte , error ) {
306
306
// Create a temporary map, which will hold all item spread across shards.
307
- tmp := make (map [string ]interface {} )
307
+ tmp := make (map [string ]V )
308
308
309
309
// Insert items to temporary map.
310
310
for item := range m .IterBuffered () {
@@ -326,13 +326,13 @@ func fnv32(key string) uint32 {
326
326
327
327
// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal
328
328
// will probably won't know which to type to unmarshal into, in such case
329
- // we'll end up with a value of type map[string]interface{} , In most cases this isn't
329
+ // we'll end up with a value of type map[string]V , In most cases this isn't
330
330
// out value type, this is why we've decided to remove this functionality.
331
331
332
332
// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {
333
333
// // Reverse process of Marshal.
334
334
335
- // tmp := make(map[string]interface{} )
335
+ // tmp := make(map[string]V )
336
336
337
337
// // Unmarshal into a single map.
338
338
// if err := json.Unmarshal(b, &tmp); err != nil {
0 commit comments