Skip to content

Commit bf8b878

Browse files
authored
fix(go-client): parllel schema loads if descriptor accessed from different go routines (#162)
goburrow cache calling loaderFunc multiple times if key is accessed from multiple go routines. Removed goburrowcache.Implemented simple reloader with Ticker.
1 parent 831f2a4 commit bf8b878

File tree

6 files changed

+113
-48
lines changed

6 files changed

+113
-48
lines changed

clients/go/client.go

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ package stencil
55

66
import (
77
"encoding/json"
8+
"sync"
89
"time"
910

10-
"github.com/goburrow/cache"
1111
"github.com/pkg/errors"
1212
"google.golang.org/protobuf/encoding/protojson"
1313
"google.golang.org/protobuf/proto"
@@ -77,22 +77,22 @@ func (o *Options) setDefaults() {
7777
// NewClient creates stencil client. Downloads proto descriptor file from given url and stores the definitions.
7878
// It will throw error if download fails or downloaded file is not fully contained descriptor file
7979
func NewClient(urls []string, options Options) (Client, error) {
80-
cacheOptions := []cache.Option{cache.WithMaximumSize(len(urls))}
8180
options.setDefaults()
82-
if options.AutoRefresh {
83-
cacheOptions = append(cacheOptions, cache.WithRefreshAfterWrite(options.RefreshInterval), cache.WithExpireAfterWrite(options.RefreshInterval))
84-
}
85-
newCache := cache.NewLoadingCache(options.RefreshStrategy.getLoader(options), cacheOptions...)
86-
s, err := newStore(urls, newCache)
87-
if err != nil {
88-
return nil, err
81+
stores := []*store{}
82+
for _, url := range urls {
83+
s, err := newStore(url, options)
84+
if err != nil {
85+
return nil, err
86+
}
87+
stores = append(stores, s)
8988
}
90-
return &stencilClient{urls: urls, store: s, options: options}, nil
89+
90+
return &stencilClient{urls: urls, stores: stores, options: options}, nil
9191
}
9292

9393
type stencilClient struct {
9494
urls []string
95-
store *store
95+
stores []*store
9696
options Options
9797
}
9898

@@ -133,8 +133,8 @@ func (s *stencilClient) Serialize(className string, data interface{}) (bytes []b
133133
}
134134

135135
func (s *stencilClient) getMatchingResolver(className string) (*Resolver, bool) {
136-
for _, url := range s.urls {
137-
resolver, ok := s.store.getResolver(url)
136+
for _, store := range s.stores {
137+
resolver, ok := store.getResolver()
138138
if !ok {
139139
return nil, false
140140
}
@@ -156,13 +156,21 @@ func (s *stencilClient) GetDescriptor(className string) (protoreflect.MessageDes
156156
}
157157

158158
func (s *stencilClient) Close() {
159-
if s.store != nil {
160-
s.store.Close()
159+
for _, store := range s.stores {
160+
if store != nil {
161+
store.Close()
162+
}
161163
}
162164
}
163165

164166
func (s *stencilClient) Refresh() {
165-
for _, url := range s.urls {
166-
s.store.Refresh(url)
167+
var wg sync.WaitGroup
168+
for _, st := range s.stores {
169+
wg.Add(1)
170+
go func(s *store) {
171+
defer wg.Done()
172+
s.refresh()
173+
}(st)
167174
}
175+
wg.Wait()
168176
}

clients/go/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func TestNewClient(t *testing.T) {
181181
}))
182182
client, _ := stencil.NewClient([]string{ts.URL}, stencil.Options{AutoRefresh: true, RefreshInterval: 2 * time.Millisecond})
183183
// wait for interval to end
184-
time.Sleep(2 * time.Millisecond)
184+
time.Sleep(3 * time.Millisecond)
185185
client.GetDescriptor("test.One")
186186
time.Sleep(1 * time.Millisecond)
187187
client.Close()
@@ -381,7 +381,7 @@ func TestRefreshStrategies(t *testing.T) {
381381
assert.NoError(t, err)
382382
assert.NotNil(t, client)
383383
// wait for refresh interval
384-
time.Sleep(2 * time.Millisecond)
384+
time.Sleep(3 * time.Millisecond)
385385
desc, err := client.GetDescriptor("test.stencil.One")
386386
assert.Nil(t, err)
387387
assert.NotNil(t, desc)
@@ -392,7 +392,7 @@ func TestRefreshStrategies(t *testing.T) {
392392
// simulates version update
393393
versions = `{"versions": [1,2]}`
394394
// wait for refresh interval
395-
time.Sleep(2 * time.Millisecond)
395+
time.Sleep(3 * time.Millisecond)
396396
desc, err = client.GetDescriptor("test.stencil.One")
397397
assert.Nil(t, err)
398398
assert.NotNil(t, desc)

clients/go/go.mod

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ module github.com/odpf/stencil/clients/go
33
go 1.16
44

55
require (
6-
github.com/davecgh/go-spew v1.1.1 // indirect
7-
github.com/goburrow/cache v0.1.4
86
github.com/pkg/errors v0.9.1
97
github.com/stretchr/testify v1.7.0
108
google.golang.org/protobuf v1.26.0

clients/go/go.sum

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
12
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2-
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3-
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4-
github.com/goburrow/cache v0.1.4 h1:As4KzO3hgmzPlnaMniZU9+VmoNYseUhuELbxy9mRBfw=
5-
github.com/goburrow/cache v0.1.4/go.mod h1:cDFesZDnIlrHoNlMYqqMpCRawuXulgx+y7mXU8HZ+/c=
63
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
74
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
85
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=

clients/go/refresh_strategy.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"encoding/json"
55
"fmt"
66
"strings"
7-
8-
"github.com/goburrow/cache"
97
)
108

119
// RefreshStrategy clients can configure which refresh strategy to use to download latest schema.
@@ -21,7 +19,7 @@ const (
2119
VersionBasedRefresh
2220
)
2321

24-
func (r RefreshStrategy) getLoader(opts Options) cache.LoaderFunc {
22+
func (r RefreshStrategy) getLoader(opts Options) loaderFunc {
2523
switch r {
2624
case VersionBasedRefresh:
2725
return versionBasedRefresh(opts)
@@ -42,9 +40,8 @@ func loadFromURL(url string, opts Options) (*Resolver, error) {
4240
return NewResolver(data)
4341
}
4442

45-
func longPollingRefresh(opts Options) cache.LoaderFunc {
46-
return func(k cache.Key) (cache.Value, error) {
47-
url := k.(string)
43+
func longPollingRefresh(opts Options) loaderFunc {
44+
return func(url string) (*Resolver, error) {
4845
return loadFromURL(url, opts)
4946
}
5047
}
@@ -53,11 +50,10 @@ type versionsModel struct {
5350
Versions []int `json:"versions"`
5451
}
5552

56-
func versionBasedRefresh(opts Options) cache.LoaderFunc {
53+
func versionBasedRefresh(opts Options) loaderFunc {
5754
lastVersion := 0
5855
logger := wrapLogger(opts.Logger)
59-
return func(k cache.Key) (cache.Value, error) {
60-
url := k.(string)
56+
return func(url string) (*Resolver, error) {
6157
versionsURL := fmt.Sprintf("%s/versions", strings.TrimRight(url, "/"))
6258
data, err := downloader(versionsURL, opts.HTTPOptions)
6359
if err != nil {

clients/go/store.go

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,93 @@
11
package stencil
22

33
import (
4-
"github.com/goburrow/cache"
4+
"io"
5+
"sync"
6+
"time"
57
)
68

9+
type loaderFunc func(string) (*Resolver, error)
10+
type timer struct {
11+
ticker *time.Ticker
12+
done chan bool
13+
}
14+
15+
func (t *timer) Close() error {
16+
t.ticker.Stop()
17+
t.done <- true
18+
return nil
19+
}
20+
21+
func setInterval(d time.Duration, f func(), waitForReader <-chan bool) io.Closer {
22+
ticker := time.NewTicker(d)
23+
done := make(chan bool)
24+
go (func() {
25+
for {
26+
select {
27+
case <-done:
28+
return
29+
case <-ticker.C:
30+
// wait for access
31+
refresh := <-waitForReader
32+
if refresh {
33+
f()
34+
}
35+
}
36+
}
37+
})()
38+
return &timer{ticker: ticker, done: done}
39+
}
40+
741
type store struct {
8-
cache.LoadingCache
42+
autoRefresh bool
43+
timer io.Closer
44+
access chan bool
45+
loader loaderFunc
46+
url string
47+
data *Resolver
48+
lock sync.RWMutex
949
}
1050

11-
func (s *store) getResolver(key string) (*Resolver, bool) {
12-
val, err := s.Get(key)
13-
if err != nil {
14-
return nil, false
51+
func (s *store) refresh() {
52+
val, err := s.loader(s.url)
53+
if err == nil {
54+
s.lock.Lock()
55+
defer s.lock.Unlock()
56+
s.data = val
1557
}
16-
return val.(*Resolver), true
1758
}
1859

19-
func newStore(urls []string, loadingCache cache.LoadingCache) (*store, error) {
20-
s := &store{loadingCache}
21-
for _, url := range urls {
22-
if _, err := loadingCache.Get(url); err != nil {
23-
return s, err
24-
}
60+
func (s *store) notify() {
61+
select {
62+
case s.access <- true:
63+
default:
64+
}
65+
}
66+
67+
func (s *store) getResolver() (*Resolver, bool) {
68+
s.notify()
69+
s.lock.RLock()
70+
defer s.lock.RUnlock()
71+
return s.data, s.data != nil
72+
}
73+
74+
func (s *store) Close() {
75+
close(s.access)
76+
if s.timer != nil {
77+
s.timer.Close()
78+
}
79+
}
80+
81+
func newStore(url string, options Options) (*store, error) {
82+
loader := options.RefreshStrategy.getLoader(options)
83+
s := &store{loader: loader, access: make(chan bool), url: url, autoRefresh: options.AutoRefresh}
84+
val, err := loader(url)
85+
if err != nil {
86+
return s, err
87+
}
88+
s.data = val
89+
if options.AutoRefresh {
90+
s.timer = setInterval(options.RefreshInterval, s.refresh, s.access)
2591
}
2692
return s, nil
2793
}

0 commit comments

Comments
 (0)