Skip to content

Commit d3f4832

Browse files
authored
Implement consul datasource (alibaba#116)
Add consul datasource implementation
1 parent fd908a5 commit d3f4832

File tree

7 files changed

+449
-2
lines changed

7 files changed

+449
-2
lines changed

.travis.yml

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ dist: xenial
44
sudo: required
55

66
go:
7-
- 1.12.x
87
- 1.13.x
98
- 1.14.x
109
env:

ext/datasource/consul/consul.go

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package consul
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"github.com/alibaba/sentinel-golang/ext/datasource"
9+
"github.com/alibaba/sentinel-golang/logging"
10+
"github.com/alibaba/sentinel-golang/util"
11+
"github.com/hashicorp/consul/api"
12+
)
13+
14+
type consulDataSource struct {
15+
datasource.Base
16+
*options
17+
18+
propertyKey string
19+
kvQuerier KVQuerier
20+
isInitialized util.AtomicBool
21+
cancel context.CancelFunc
22+
queryOptions api.QueryOptions
23+
}
24+
25+
var (
26+
ErrNilConsulClient = errors.New("nil consul client")
27+
ErrInvalidConsulConfig = errors.New("invalid consul config")
28+
29+
logger = logging.GetDefaultLogger()
30+
)
31+
32+
// NewDatasource returns new consul datasource instance
33+
func NewDatasource(propertyKey string, opts ...Option) (datasource.DataSource, error) {
34+
var options = evaluateOptions(opts)
35+
// if not consul client is specified, initialize from the configuration
36+
if options.consulClient == nil {
37+
if options.consulConfig == nil {
38+
return nil, ErrInvalidConsulConfig
39+
}
40+
client, err := api.NewClient(options.consulConfig)
41+
if err != nil {
42+
return nil, err
43+
}
44+
options.consulClient = client
45+
}
46+
47+
// still no consul client, throw error
48+
if options.consulClient == nil {
49+
return nil, ErrNilConsulClient
50+
}
51+
return newConsulDataSource(propertyKey, options), nil
52+
}
53+
54+
func newConsulDataSource(propertyKey string, options *options) *consulDataSource {
55+
ctx, cancel := context.WithCancel(options.queryOptions.Context())
56+
ds := &consulDataSource{
57+
propertyKey: propertyKey,
58+
options: options,
59+
kvQuerier: options.consulClient.KV(),
60+
cancel: cancel,
61+
queryOptions: *options.queryOptions.WithContext(ctx),
62+
}
63+
64+
for _, h := range options.propertyHandlers {
65+
ds.AddPropertyHandler(h)
66+
}
67+
return ds
68+
}
69+
70+
// ReadSource implement datasource.DataSource interface
71+
func (c *consulDataSource) ReadSource() ([]byte, error) {
72+
pair, meta, err := c.kvQuerier.Get(c.propertyKey, &c.queryOptions)
73+
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
c.queryOptions.WaitIndex = meta.LastIndex
79+
if pair == nil {
80+
return []byte{}, nil
81+
}
82+
return pair.Value, nil
83+
}
84+
85+
// Initialize implement datasource.DataSource interface
86+
func (c *consulDataSource) Initialize() error {
87+
if !c.isInitialized.CompareAndSet(false, true) {
88+
return errors.New("duplicate initialize consul datasource")
89+
}
90+
if err := c.doReadAndUpdate(); err != nil {
91+
logger.Errorf("[consul] doReadAndUpdate failed: %s", err.Error())
92+
return err
93+
}
94+
95+
if !c.disableWatch {
96+
go util.RunWithRecover(c.watch, logger)
97+
}
98+
return nil
99+
}
100+
101+
func (c *consulDataSource) watch() {
102+
for {
103+
if err := c.doReadAndUpdate(); err != nil {
104+
if errors.Is(err, context.Canceled) {
105+
return
106+
}
107+
108+
if api.IsRetryableError(err) {
109+
logger.Warnf("[consul] doUpdate failed with retryable error: %s", err.Error())
110+
time.Sleep(time.Second)
111+
continue
112+
}
113+
114+
logger.Errorf("[consul] doUpdate failed: %s", err.Error())
115+
}
116+
}
117+
}
118+
119+
func (c *consulDataSource) doUpdate(src []byte) (err error) {
120+
if len(src) == 0 {
121+
return c.Handle(nil)
122+
}
123+
return c.Handle(src)
124+
}
125+
126+
func (c *consulDataSource) doReadAndUpdate() (err error) {
127+
src, err := c.ReadSource()
128+
if err != nil {
129+
return err
130+
}
131+
return c.doUpdate(src)
132+
}
133+
134+
func (c *consulDataSource) Close() error {
135+
if c.cancel != nil {
136+
c.cancel()
137+
}
138+
logger.Info("[consul] close consul datasource")
139+
return nil
140+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package consul
2+
3+
import (
4+
"time"
5+
6+
"github.com/hashicorp/consul/api"
7+
)
8+
9+
func Example_consulDatasource_CustomizeClient() {
10+
client, err := api.NewClient(&api.Config{
11+
Address: "127.0.0.1:8500",
12+
})
13+
if err != nil {
14+
// todo something
15+
}
16+
ds, err := NewDatasource("property_key",
17+
// customize consul client
18+
WithConsulClient(client),
19+
// disable dynamic datasource watch
20+
WithDisableWatch(true),
21+
// preset property handlers
22+
WithPropertyHandlers(),
23+
// reset queryOptions, defaultQueryOptions as default
24+
WithQueryOptions(&api.QueryOptions{}),
25+
)
26+
27+
if err != nil {
28+
// todo something
29+
}
30+
31+
if err := ds.Initialize(); err != nil {
32+
// todo something
33+
}
34+
}
35+
36+
func Example_consulDatasource_CustomizeConfig() {
37+
ds, err := NewDatasource("property_key",
38+
// customize consul config
39+
WithConsulConfig(&api.Config{
40+
Address: "127.0.0.1:8500",
41+
}),
42+
// disable dynamic datasource watch
43+
WithDisableWatch(true),
44+
// preset property handlers
45+
WithPropertyHandlers(),
46+
// reset queryOptions, defaultQueryOptions as default
47+
WithQueryOptions(&api.QueryOptions{
48+
WaitIndex: 0,
49+
// override default WaitTime(5min)
50+
WaitTime: time.Second * 90,
51+
}),
52+
)
53+
54+
if err != nil {
55+
// todo something
56+
}
57+
58+
if err := ds.Initialize(); err != nil {
59+
// todo something
60+
}
61+
}

ext/datasource/consul/consul_test.go

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package consul
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/alibaba/sentinel-golang/core/system"
9+
"github.com/alibaba/sentinel-golang/ext/datasource"
10+
"github.com/hashicorp/consul/api"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
13+
)
14+
15+
const (
16+
TestSystemRules = `[
17+
{
18+
"id": 0,
19+
"metricType": 0,
20+
"adaptiveStrategy": 0
21+
},
22+
{
23+
"id": 1,
24+
"metricType": 0,
25+
"adaptiveStrategy": 0
26+
},
27+
{
28+
"id": 2,
29+
"metricType": 0,
30+
"adaptiveStrategy": 0
31+
}
32+
]`
33+
)
34+
35+
var (
36+
SystemRules = []*system.SystemRule{
37+
{MetricType: 0, Strategy: 0},
38+
{MetricType: 0, Strategy: 0},
39+
{MetricType: 0, Strategy: 0},
40+
}
41+
)
42+
43+
type consulClientMock struct {
44+
mock.Mock
45+
pair *api.KVPair
46+
lock sync.Mutex
47+
}
48+
49+
func (c *consulClientMock) Get(key string, q *api.QueryOptions) (*api.KVPair, *api.QueryMeta, error) {
50+
c.lock.Lock()
51+
defer c.lock.Unlock()
52+
return c.pair, &api.QueryMeta{
53+
LastIndex: c.pair.ModifyIndex,
54+
LastContentHash: "",
55+
LastContact: 0,
56+
KnownLeader: false,
57+
RequestTime: 0,
58+
AddressTranslationEnabled: false,
59+
CacheHit: false,
60+
CacheAge: 0,
61+
}, nil
62+
}
63+
64+
func (c consulClientMock) List(prefix string, q *api.QueryOptions) (api.KVPairs, *api.QueryMeta, error) {
65+
panic("implement me")
66+
}
67+
68+
func newQuerierMock() *consulClientMock {
69+
return &consulClientMock{
70+
pair: &api.KVPair{
71+
Key: "property_key",
72+
CreateIndex: 0,
73+
ModifyIndex: 0,
74+
LockIndex: 0,
75+
Flags: 0,
76+
Value: []byte(TestSystemRules),
77+
Session: "",
78+
},
79+
}
80+
}
81+
82+
func (c *consulClientMock) resetPair(pair *api.KVPair) {
83+
c.lock.Lock()
84+
defer c.lock.Unlock()
85+
c.pair = pair
86+
}
87+
88+
func TestConsulDatasource(t *testing.T) {
89+
mock := newQuerierMock()
90+
ds := newConsulDataSource("property_key", evaluateOptions([]Option{}))
91+
ds.kvQuerier = mock
92+
93+
ds.AddPropertyHandler(datasource.NewDefaultPropertyHandler(
94+
datasource.SystemRulesJsonConverter,
95+
func(rule interface{}) error {
96+
assert.NotNil(t, rule)
97+
assert.EqualValues(t, SystemRules, rule)
98+
return nil
99+
},
100+
))
101+
102+
assert.Nil(t, ds.Initialize())
103+
assert.EqualError(t, ds.Initialize(), "duplicate initialize consul datasource")
104+
105+
t.Run("WatchSourceChange", func(t *testing.T) {
106+
mock.resetPair(&api.KVPair{
107+
Key: "property_key",
108+
CreateIndex: 0,
109+
ModifyIndex: 1,
110+
LockIndex: 0,
111+
Flags: 0,
112+
Value: []byte(TestSystemRules),
113+
Session: "",
114+
})
115+
})
116+
time.Sleep(time.Second)
117+
}

0 commit comments

Comments
 (0)