-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathetcd.go
328 lines (287 loc) · 9.33 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
package etcd
import (
"context"
"fmt"
"github.com/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
gresolver "google.golang.org/grpc/resolver"
"io"
"strings"
"sync"
"time"
)
// ZkOption is a functional option that mutates a clientv3.Config in place.
// NewSvcDiscoveryRegistry and Check apply every supplied option to their
// config before creating the etcd client. Despite the "Zk" prefix, the value
// it configures is the etcd client configuration.
type ZkOption func(*clientv3.Config)
// SvcDiscoveryRegistryImpl is a service registry and discovery helper backed
// by etcd. It registers the local service endpoint under a leased key and
// keeps a cache of gRPC client connections for the endpoints found under
// rootDirectory.
type SvcDiscoveryRegistryImpl struct {
// client is the etcd client used for reads, watches and lease operations.
client *clientv3.Client
// resolver is the etcd-based gRPC resolver builder passed to every dial.
resolver gresolver.Builder
// dialOptions are extra gRPC dial options accumulated through AddOption.
dialOptions []grpc.DialOption
// serviceKey is the etcd key "<root>/<service>/<host>:<port>" set by Register
// and removed by UnRegister.
serviceKey string
// endpointMgr is the endpoint manager created by Register; nil until then.
endpointMgr endpoints.Manager
// leaseID is the lease granted in Register and attached to serviceKey.
leaseID clientv3.LeaseID
// rpcRegisterTarget is the "<host>:<port>" string recorded by Register.
rpcRegisterTarget string
// rootDirectory is the key prefix under which all services are stored.
rootDirectory string
// mu is held by AddOption, GetConns, refreshConnMap and the watch loop
// around their accesses to connMap (and dialOptions in AddOption).
mu sync.RWMutex
// connMap caches connections keyed by "<root>/<service>", one per endpoint key.
connMap map[string][]*grpc.ClientConn
}
// createNoOpLogger builds a zap logger whose output goes to io.Discard, so
// nothing it logs is ever emitted. It is used as the etcd client logger.
func createNoOpLogger() *zap.Logger {
	// JSON encoder with zap's production settings; the bytes are thrown away anyway.
	encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())
	// Write syncer that wraps io.Discard.
	sink := zapcore.AddSync(io.Discard)
	// InfoLevel is the minimum enabled level of the core.
	return zap.New(zapcore.NewCore(encoder, sink, zapcore.InfoLevel))
}
// NewSvcDiscoveryRegistry creates an etcd-backed service discovery registry.
//
// rootDirectory is the key prefix under which services are stored, endpoints
// are the etcd server addresses, and options may override the default client
// configuration (5s dial timeout, keep-alive without active streams, silent
// logger, 10 MB max send message size). Nil options are ignored.
//
// On success a background goroutine is started that watches rootDirectory and
// rebuilds the connection cache on every watch event. It returns an error if
// the etcd client or the gRPC resolver builder cannot be created; in the
// latter case the already-created client is closed before returning.
func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) {
	cfg := clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
		// Allow keep-alive pings even when there are no active streams.
		PermitWithoutStream: true,
		Logger:              createNoOpLogger(),
		MaxCallSendMsgSize:  10 * 1024 * 1024, // 10 MB
	}

	// Apply caller-provided overrides on top of the defaults.
	for _, opt := range options {
		if opt != nil {
			opt(&cfg)
		}
	}

	client, err := clientv3.New(cfg)
	if err != nil {
		return nil, err
	}

	r, err := resolver.NewBuilder(client)
	if err != nil {
		// Do not leak the etcd client when the registry cannot be built.
		_ = client.Close()
		return nil, err
	}

	s := &SvcDiscoveryRegistryImpl{
		client:        client,
		resolver:      r,
		rootDirectory: rootDirectory,
		connMap:       make(map[string][]*grpc.ClientConn),
	}
	go s.watchServiceChanges()
	return s, nil
}
// initializeConnMap lists every key under "<rootDirectory>/" in etcd, dials a
// gRPC connection for each one and replaces r.connMap with the result.
//
// The part of each key before its last "/" becomes the map key and the part
// after it is used as the dial target. Keys whose dial fails are skipped.
// If the etcd read fails the error is returned and r.connMap is left as is.
//
// This method does not lock; because it replaces r.connMap, callers are
// expected to hold r.mu for writing.
// NOTE(review): connections held by the previous map are not closed here.
func (r *SvcDiscoveryRegistryImpl) initializeConnMap() error {
	fullPrefix := fmt.Sprintf("%s/", r.rootDirectory)
	resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}

	// Build the option list once, in a fresh slice, so that appending the
	// resolver never writes into the backing array of r.dialOptions.
	dialOpts := make([]grpc.DialOption, 0, len(r.dialOptions)+1)
	dialOpts = append(dialOpts, r.dialOptions...)
	dialOpts = append(dialOpts, grpc.WithResolvers(r.resolver))

	conns := make(map[string][]*grpc.ClientConn)
	for _, kv := range resp.Kvs {
		prefix, addr := r.splitEndpoint(string(kv.Key))
		conn, err := grpc.DialContext(context.Background(), addr, dialOpts...)
		if err != nil {
			// Best effort: an endpoint that cannot be dialed is left out.
			continue
		}
		conns[prefix] = append(conns[prefix], conn)
	}

	// Publish the fully built map in a single assignment.
	r.connMap = conns
	return nil
}
// WithDialTimeout returns an option that overrides the DialTimeout field of
// the etcd client configuration with timeout.
func WithDialTimeout(timeout time.Duration) ZkOption {
	apply := func(c *clientv3.Config) { c.DialTimeout = timeout }
	return apply
}
// WithMaxCallSendMsgSize returns an option that overrides the
// MaxCallSendMsgSize field of the etcd client configuration with size.
func WithMaxCallSendMsgSize(size int) ZkOption {
	return ZkOption(func(c *clientv3.Config) {
		c.MaxCallSendMsgSize = size
	})
}
// WithUsernameAndPassword returns an option that stores the given credentials
// in the Username and Password fields of the etcd client configuration.
func WithUsernameAndPassword(username, password string) ZkOption {
	setCredentials := func(c *clientv3.Config) {
		c.Username, c.Password = username, password
	}
	return setCredentials
}
// GetUserIdHashGatewayHost is a stub in this implementation: it ignores both
// ctx and userId and always returns an empty host with a nil error.
func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
	var host string // no lookup is performed; the zero value is returned
	return host, nil
}
// GetConns returns the cached gRPC client connections for serviceName, looked
// up under the key "<rootDirectory>/<serviceName>".
//
// When the cache is empty it is populated from etcd first; an error from that
// read is returned to the caller. ctx and opts are accepted for interface
// compatibility and are not used. The result is nil when no connection is
// cached for the service.
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
	fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName)

	// Fast path: the cache is populated, a read lock is sufficient.
	r.mu.RLock()
	if len(r.connMap) != 0 {
		conns := r.connMap[fullServiceKey]
		r.mu.RUnlock()
		return conns, nil
	}
	r.mu.RUnlock()

	// Slow path: initializeConnMap replaces r.connMap, so the write lock is
	// required. Re-check under it in case another goroutine filled the cache
	// between the two lock acquisitions.
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.connMap) == 0 {
		if err := r.initializeConnMap(); err != nil {
			return nil, err
		}
	}
	return r.connMap[fullServiceKey], nil
}
// GetConn dials and returns a new gRPC client connection for serviceName
// using the target "etcd:///<rootDirectory>/<serviceName>" and the etcd
// resolver.
//
// The dial options are, in order: the options registered through AddOption,
// the per-call opts, and the resolver option. They are copied into a fresh
// slice while holding the read lock, so the shared option slice is neither
// read concurrently with AddOption nor written to by append.
func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
	target := fmt.Sprintf("etcd:///%s/%s", r.rootDirectory, serviceName)

	r.mu.RLock()
	dialOpts := make([]grpc.DialOption, 0, len(r.dialOptions)+len(opts)+1)
	dialOpts = append(dialOpts, r.dialOptions...)
	r.mu.RUnlock()

	dialOpts = append(dialOpts, opts...)
	dialOpts = append(dialOpts, grpc.WithResolvers(r.resolver))
	return grpc.DialContext(ctx, target, dialOpts...)
}
// GetSelfConnTarget returns the "<host>:<port>" target recorded by Register.
// It is the empty string until Register has stored a value.
func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
	target := r.rpcRegisterTarget
	return target
}
// AddOption appends opts to the registry's stored gRPC dial options and
// replaces the connection cache with an empty map. Both changes are made
// while holding the write lock.
func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.dialOptions = append(r.dialOptions, opts...)
	// Drop the cached entries; the map starts over empty.
	r.connMap = map[string][]*grpc.ClientConn{}
}
// CloseConn closes the given gRPC client connection. A nil conn is ignored.
// The method has no return value, so the error from Close is deliberately
// discarded.
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
	if conn == nil {
		return
	}
	_ = conn.Close()
}
// Register publishes the endpoint "<host>:<port>" for serviceName in etcd
// under the key "<rootDirectory>/<serviceName>/<host>:<port>".
//
// The key is attached to a lease with a 30-second TTL, which a background
// goroutine keeps alive. The receiver's registration state (service key,
// endpoint manager, lease ID, self target) is only updated after the endpoint
// has been stored, so a failed call leaves any previous registration state
// intact. If storing the endpoint fails, the freshly granted lease is revoked
// on a best-effort basis. opts is accepted for interface compatibility and is
// not used.
func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
	target := fmt.Sprintf("%s:%d", host, port)
	serviceKey := fmt.Sprintf("%s/%s/%s", r.rootDirectory, serviceName, target)

	em, err := endpoints.NewManager(r.client, r.rootDirectory+"/"+serviceName)
	if err != nil {
		return err
	}

	// Lease TTL in seconds; the key disappears if the lease is not renewed.
	leaseResp, err := r.client.Grant(context.Background(), 30)
	if err != nil {
		return err
	}

	endpoint := endpoints.Endpoint{Addr: target}
	if err := em.AddEndpoint(context.TODO(), serviceKey, endpoint, clientv3.WithLease(leaseResp.ID)); err != nil {
		// Nothing references the lease; release it instead of waiting for
		// the TTL to run out. The original error is the one worth reporting.
		revokeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		_, _ = r.client.Revoke(revokeCtx, leaseResp.ID)
		cancel()
		return err
	}

	r.serviceKey = serviceKey
	r.endpointMgr = em
	r.leaseID = leaseResp.ID
	r.rpcRegisterTarget = target

	go r.keepAliveLease(leaseResp.ID)
	return nil
}
// keepAliveLease starts keep-alive for leaseID and drains the response
// channel. It returns when keep-alive cannot be started, when the channel is
// closed, or when a nil response is received.
func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
	responses, err := r.client.KeepAlive(context.Background(), leaseID)
	if err != nil {
		return
	}
	for {
		resp, open := <-responses
		if !open || resp == nil {
			return
		}
		// Non-nil responses need no handling; keep draining.
	}
}
// watchServiceChanges watches every etcd key that starts with rootDirectory
// and rebuilds the connection cache on each watch event. It runs until the
// watch channel is closed.
func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() {
	watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix())
	for range watchChan {
		// initializeConnMap replaces r.connMap, so the write lock is needed;
		// a read lock would let concurrent readers race with the rebuild.
		r.mu.Lock()
		// On failure the method returns before touching the cache, so the
		// previous contents stay in place; the next event retries.
		_ = r.initializeConnMap()
		r.mu.Unlock()
	}
}
// refreshConnMap re-reads the endpoints stored under "<prefix>/" in etcd and
// replaces the cached connections for prefix with freshly dialed ones.
//
// prefix is used both as the etcd key prefix (with "/" appended) and as the
// connMap key. The part of each etcd key after its last "/" is the dial
// target. Endpoints that fail to dial are skipped. If the etcd read fails,
// the cache is left unchanged. The write lock is held for the whole call.
func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	fullPrefix := fmt.Sprintf("%s/", prefix)
	resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix())
	if err != nil {
		return
	}

	// Build the option list once, in a fresh slice, so that appending the
	// resolver never writes into the backing array of r.dialOptions.
	dialOpts := make([]grpc.DialOption, 0, len(r.dialOptions)+1)
	dialOpts = append(dialOpts, r.dialOptions...)
	dialOpts = append(dialOpts, grpc.WithResolvers(r.resolver))

	// Non-nil even when empty, matching the entry the cache held before.
	conns := make([]*grpc.ClientConn, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		_, addr := r.splitEndpoint(string(kv.Key))
		conn, err := grpc.DialContext(context.Background(), addr, dialOpts...)
		if err != nil {
			continue
		}
		conns = append(conns, conn)
	}
	r.connMap[prefix] = conns
}
// splitEndpoint cuts input at its last "/" and returns the text before it
// and the text after it. When input contains no "/", it returns input
// unchanged together with an empty second value.
func (r *SvcDiscoveryRegistryImpl) splitEndpoint(input string) (string, string) {
	cut := strings.LastIndex(input, "/")
	if cut < 0 {
		return input, ""
	}
	return input[:cut], input[cut+1:]
}
// UnRegister deletes the endpoint stored under the registry's service key.
// It fails when no endpoint manager is set (Register is what assigns one);
// otherwise it returns whatever the delete call reports.
func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
	mgr := r.endpointMgr
	if mgr == nil {
		return fmt.Errorf("endpoint manager is not initialized")
	}
	return mgr.DeleteEndpoint(context.TODO(), r.serviceKey)
}
// Close shuts the registry down: it closes the etcd client (if any) and then
// closes and forgets every cached gRPC connection.
//
// The etcd client is closed first so that the background watch loop cannot
// refill the cache with successful reads afterwards. Errors from the
// individual Close calls are discarded because the method has no return value.
func (r *SvcDiscoveryRegistryImpl) Close() {
	if r.client != nil {
		_ = r.client.Close()
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	for key, conns := range r.connMap {
		for _, conn := range conns {
			if conn != nil {
				_ = conn.Close()
			}
		}
		// Deleting while ranging over a map is safe in Go.
		delete(r.connMap, key)
	}
}
// Check verifies that etcd is reachable and that the key etcdRoot exists.
//
// It connects to etcdServers (after applying options to the client config)
// and reads etcdRoot with a timeout equal to the configured DialTimeout, or
// 10 seconds when none is set. If the key is missing and createIfNotExist is
// true, the key is written with an empty value attached to a 10-second lease;
// if createIfNotExist is false an error is returned instead. The client is
// closed before returning.
// NOTE(review): nothing here renews that lease, so the created key presumably
// expires shortly afterwards — confirm this is intended.
func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfNotExist bool, options ...ZkOption) error {
	cfg := clientv3.Config{Endpoints: etcdServers}
	for _, apply := range options {
		apply(&cfg)
	}

	client, err := clientv3.New(cfg)
	if err != nil {
		return errors.Wrap(err, "failed to connect to etcd")
	}
	defer client.Close()

	// Bound every etcd operation below by a single deadline.
	timeout := 10 * time.Second
	if cfg.DialTimeout != 0 {
		timeout = cfg.DialTimeout
	}
	opCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	resp, err := client.Get(opCtx, etcdRoot)
	if err != nil {
		return errors.Wrap(err, "failed to get the root node from etcd")
	}
	if len(resp.Kvs) != 0 {
		// The root key already exists; nothing to do.
		return nil
	}
	if !createIfNotExist {
		return fmt.Errorf("root node %s does not exist in etcd", etcdRoot)
	}

	// Lease TTL, in seconds, for the newly created root key.
	const leaseTTL int64 = 10
	leaseResp, err := client.Grant(opCtx, leaseTTL)
	if err != nil {
		return errors.Wrap(err, "failed to create lease in etcd")
	}
	if _, err := client.Put(opCtx, etcdRoot, "", clientv3.WithLease(leaseResp.ID)); err != nil {
		return errors.Wrap(err, "failed to create the root node in etcd")
	}
	return nil
}