Skip to content

Commit f1d9927

Browse files
Cache functionality and delivery-service-based finality listeners
Signed-off-by: Alexandros Filios <[email protected]>
1 parent 74446d7 commit f1d9927

File tree

12 files changed

+934
-6
lines changed

12 files changed

+934
-6
lines changed

platform/common/utils/cache/cache.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package cache
8+
9+
import "github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
10+
11+
var logger = logging.MustGetLogger("fabric-sdk.cache")
12+
13+
type Map[K comparable, V any] interface {
14+
Get(K) (V, bool)
15+
Put(K, V)
16+
Update(K, func(bool, V) (bool, V)) bool
17+
Delete(...K)
18+
Len() int
19+
}
20+
21+
type rwLock interface {
22+
Lock()
23+
Unlock()
24+
RLock()
25+
RUnlock()
26+
}
27+
28+
type noLock struct{}
29+
30+
func (l *noLock) Lock() {}
31+
func (l *noLock) Unlock() {}
32+
func (l *noLock) RLock() {}
33+
func (l *noLock) RUnlock() {}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package cache
8+
9+
import "fmt"
10+
11+
func evict[K comparable, V any](keys []K, m map[K]V, onEvict func(map[K]V)) {
12+
evicted := make(map[K]V, len(keys))
13+
for _, k := range keys {
14+
if v, ok := m[k]; ok {
15+
evicted[k] = v
16+
delete(m, k)
17+
} else {
18+
logger.Debugf("No need to evict [%k]. Was already deleted.")
19+
}
20+
}
21+
onEvict(evicted)
22+
}
23+
24+
type evictionCache[K comparable, V any] struct {
25+
m map[K]V
26+
l rwLock
27+
evictionPolicy EvictionPolicy[K]
28+
}
29+
30+
func (c *evictionCache[K, V]) String() string {
31+
return fmt.Sprintf("Content: [%v], Eviction policy: [%v]", c.m, c.evictionPolicy)
32+
}
33+
34+
type EvictionPolicy[K comparable] interface {
35+
// Push adds a key and must be invoked under write-lock
36+
Push(K)
37+
}
38+
39+
func (c *evictionCache[K, V]) Get(key K) (V, bool) {
40+
c.l.RLock()
41+
defer c.l.RUnlock()
42+
v, ok := c.m[key]
43+
return v, ok
44+
}
45+
46+
func (c *evictionCache[K, V]) Put(key K, value V) {
47+
c.l.Lock()
48+
defer c.l.Unlock()
49+
c.m[key] = value
50+
// We assume that a value is always new for performance reasons.
51+
// If we try to put again a value, this value will be put also in the LRU keys instead of just promoting the existing one.
52+
// If we put this value c.cap times, then this will evict all other values.
53+
c.evictionPolicy.Push(key)
54+
}
55+
56+
func (c *evictionCache[K, V]) Update(key K, f func(bool, V) (bool, V)) bool {
57+
c.l.Lock()
58+
defer c.l.Unlock()
59+
v, ok := c.m[key]
60+
keep, newValue := f(ok, v)
61+
if !keep {
62+
delete(c.m, key)
63+
} else {
64+
c.m[key] = newValue
65+
}
66+
if !ok && keep {
67+
c.evictionPolicy.Push(key)
68+
}
69+
return ok
70+
}
71+
72+
func (c *evictionCache[K, V]) Delete(keys ...K) {
73+
c.l.Lock()
74+
defer c.l.Unlock()
75+
for _, key := range keys {
76+
delete(c.m, key)
77+
}
78+
}
79+
80+
func (c *evictionCache[K, V]) Len() int {
81+
c.l.RLock()
82+
defer c.l.RUnlock()
83+
return len(c.m)
84+
}

platform/common/utils/cache/lru.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package cache
8+
9+
import (
10+
"fmt"
11+
"sync"
12+
13+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
14+
)
15+
16+
// NewLRUCache creates a cache with limited size with LRU eviction policy.
17+
// It is guaranteed that at least size elements can be kept in the cache.
18+
// The cache is cleaned up when the cache is full, i.e. it contains size + buffer elements
19+
func NewLRUCache[K comparable, V any](size, buffer int, onEvict func(map[K]V)) *evictionCache[K, V] {
20+
m := map[K]V{}
21+
return &evictionCache[K, V]{
22+
m: m,
23+
l: &sync.RWMutex{},
24+
evictionPolicy: NewLRUEviction(size, buffer, func(keys []K) { evict(keys, m, onEvict) }),
25+
}
26+
}
27+
28+
func NewLRUEviction[K comparable](size, buffer int, evict func([]K)) *lruEviction[K] {
29+
return &lruEviction[K]{
30+
size: size,
31+
cap: size + buffer,
32+
keys: make([]K, 0, size+buffer),
33+
keySet: make(map[K]struct{}, size+buffer),
34+
evict: evict,
35+
}
36+
}
37+
38+
type lruEviction[K comparable] struct {
39+
// size is the minimum amount of entries guaranteed to be kept in cache.
40+
size int
41+
// cap + size is the maximum amount of entries that can be kept in cache. After that, a cleanup is invoked.
42+
cap int
43+
// keys keeps track of which keys should be evicted.
44+
// The last element of the slice is the most recent one.
45+
// Performance improvement: keep sliding index to avoid reallocating
46+
keys []K
47+
// keySet is for faster lookup whether a key exists
48+
keySet map[K]struct{}
49+
// evict is called when we evict
50+
evict func([]K)
51+
mu sync.Mutex
52+
}
53+
54+
func (c *lruEviction[K]) Push(key K) {
55+
c.mu.Lock()
56+
defer c.mu.Unlock()
57+
if _, ok := c.keySet[key]; ok {
58+
return
59+
}
60+
c.keySet[key] = struct{}{}
61+
c.keys = append(c.keys, key)
62+
if len(c.keys) <= c.cap {
63+
return
64+
}
65+
logger.Debugf("Capacity of %d exceeded. Evicting old keys by shifting LRU keys keeping only the %d most recent ones", c.cap, c.size)
66+
evicted := c.keys[0 : c.cap-c.size+1]
67+
for _, k := range evicted {
68+
delete(c.keySet, k)
69+
}
70+
c.evict(evicted)
71+
c.keys = (c.keys)[c.cap-c.size+1:]
72+
}
73+
74+
func (c *lruEviction[K]) String() string {
75+
return fmt.Sprintf("Keys: [%v], KeySet: [%v]", c.keys, collections.Keys(c.keySet))
76+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package cache_test
8+
9+
import (
10+
"fmt"
11+
"sync"
12+
"sync/atomic"
13+
"testing"
14+
15+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/cache"
16+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
17+
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert"
18+
)
19+
20+
func TestLRUSimple(t *testing.T) {
21+
allEvicted := make(map[int]string)
22+
23+
c := cache.NewLRUCache(3, 2, func(evicted map[int]string) { collections.CopyMap(allEvicted, evicted) })
24+
25+
c.Put(1, "a")
26+
c.Put(2, "b")
27+
c.Put(3, "c")
28+
c.Put(4, "d")
29+
c.Put(5, "e")
30+
31+
assert.Equal(0, len(allEvicted))
32+
assert.Equal(5, c.Len())
33+
v, _ := c.Get(1)
34+
assert.Equal("a", v)
35+
36+
c.Put(6, "f")
37+
assert.Equal(map[int]string{1: "a", 2: "b", 3: "c"}, allEvicted)
38+
assert.Equal(3, c.Len())
39+
_, ok := c.Get(1)
40+
assert.False(ok)
41+
}
42+
43+
func TestLRUSameKey(t *testing.T) {
44+
allEvicted := make(map[int]string)
45+
46+
c := cache.NewLRUCache(3, 2, func(evicted map[int]string) { collections.CopyMap(allEvicted, evicted) })
47+
48+
c.Put(1, "a")
49+
c.Put(2, "b")
50+
c.Put(3, "c")
51+
c.Put(1, "d")
52+
c.Put(1, "e")
53+
c.Put(1, "f")
54+
assert.Equal(0, len(allEvicted))
55+
assert.Equal(3, c.Len())
56+
v, _ := c.Get(1)
57+
assert.Equal("f", v)
58+
59+
c.Put(4, "g")
60+
c.Put(5, "h")
61+
assert.Equal(0, len(allEvicted))
62+
assert.Equal(5, c.Len())
63+
v, _ = c.Get(1)
64+
assert.Equal("f", v)
65+
66+
c.Put(6, "i")
67+
assert.Equal(map[int]string{1: "f", 2: "b", 3: "c"}, allEvicted)
68+
assert.Equal(3, c.Len())
69+
_, ok := c.Get(1)
70+
assert.False(ok)
71+
72+
allEvicted = map[int]string{}
73+
74+
c.Put(1, "j")
75+
c.Put(2, "k")
76+
77+
assert.Equal(0, len(allEvicted))
78+
assert.Equal(5, c.Len())
79+
v, _ = c.Get(4)
80+
assert.Equal("g", v)
81+
82+
c.Put(3, "l")
83+
84+
assert.Equal(map[int]string{4: "g", 5: "h", 6: "i"}, allEvicted)
85+
assert.Equal(3, c.Len())
86+
v, _ = c.Get(1)
87+
assert.Equal("j", v)
88+
}
89+
90+
func TestLRUParallel(t *testing.T) {
91+
evictedCount := atomic.Int32{}
92+
c := cache.NewLRUCache(3, 2, func(evicted map[int]string) { evictedCount.Add(int32(len(evicted))) })
93+
94+
var wg sync.WaitGroup
95+
wg.Add(100)
96+
for i := 0; i < 100; i++ {
97+
go func(i int) {
98+
c.Put(i, fmt.Sprintf("item-%d", i))
99+
wg.Done()
100+
}(i)
101+
}
102+
wg.Wait()
103+
104+
assert.Equal(4, c.Len())
105+
assert.Equal(96, int(evictedCount.Load()))
106+
}

platform/common/utils/cache/map.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package cache
8+
9+
// NewMapCache creates a dummy implementation of the Cache interface.
10+
// It is backed by a map with unlimited capacity.
11+
func NewMapCache[K comparable, V any]() *mapCache[K, V] {
12+
return &mapCache[K, V]{
13+
m: map[K]V{},
14+
l: &noLock{},
15+
}
16+
}
17+
18+
type mapCache[K comparable, V any] struct {
19+
m map[K]V
20+
l rwLock
21+
}
22+
23+
func (c *mapCache[K, V]) Get(key K) (V, bool) {
24+
c.l.RLock()
25+
defer c.l.RUnlock()
26+
v, ok := c.m[key]
27+
return v, ok
28+
}
29+
30+
func (c *mapCache[K, V]) Put(key K, value V) {
31+
c.l.Lock()
32+
defer c.l.Unlock()
33+
c.m[key] = value
34+
}
35+
36+
func (c *mapCache[K, V]) Delete(keys ...K) {
37+
c.l.Lock()
38+
defer c.l.Unlock()
39+
for _, key := range keys {
40+
delete(c.m, key)
41+
}
42+
}
43+
44+
func (c *mapCache[K, V]) Update(key K, f func(bool, V) (bool, V)) bool {
45+
c.l.Lock()
46+
defer c.l.Unlock()
47+
v, ok := c.m[key]
48+
keep, newValue := f(ok, v)
49+
if !keep {
50+
delete(c.m, key)
51+
} else {
52+
c.m[key] = newValue
53+
}
54+
return ok
55+
}
56+
57+
func (c *mapCache[K, V]) Len() int {
58+
return len(c.m)
59+
}

0 commit comments

Comments
 (0)