Skip to content

Commit 1fdc1d8

Browse files
authored
Merge pull request #201 from withchao/main
feat: optimizing the code
2 parents 55c4bb8 + cca39bf commit 1fdc1d8

35 files changed

+1083
-252
lines changed

discovery/discovery_register.go

+22-31
Original file line numberDiff line numberDiff line change
@@ -16,45 +16,36 @@ package discovery
1616

1717
import (
1818
"context"
19+
"errors"
1920

2021
"google.golang.org/grpc"
2122
)
2223

23-
//
24-
//type Conn interface {
25-
// GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) //1
26-
// GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) //2
27-
// GetSelfConnTarget() string //3
28-
// AddOption(opts ...grpc.DialOption) //4
29-
// CloseConn(conn *grpc.ClientConn) //5
30-
// // do not use this method for call rpc
31-
//
32-
// GetClientLocalConns() map[string][]*grpc.ClientConn //del
33-
//
34-
// GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) //del
35-
//}
36-
//
37-
//type SvcDiscoveryRegistry interface {
38-
// Conn
39-
// Register(serviceName, host string, port int, opts ...grpc.DialOption) error //6
40-
// UnRegister() error //7
41-
// RegisterConf2Registry(key string, conf []byte) error //del
42-
// GetConfFromRegistry(key string) ([]byte, error) //del
43-
// Close() //
44-
//}
24+
var ErrNotSupportedKeyValue = errors.New("discovery data not supported key value")
4525

4626
type Conn interface {
47-
GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) //1
48-
GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) //2
49-
GetSelfConnTarget() string //3
50-
AddOption(opts ...grpc.DialOption) //4
51-
CloseConn(conn *grpc.ClientConn) //5
52-
// do not use this method for call rpc
27+
GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error)
28+
GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error)
29+
IsSelfNode(cc grpc.ClientConnInterface) bool
30+
}
31+
32+
type WatchKey struct {
33+
Value []byte
5334
}
35+
36+
type WatchKeyHandler func(data *WatchKey) error
37+
38+
type KeyValue interface {
39+
SetKey(ctx context.Context, key string, value []byte) error
40+
GetKey(ctx context.Context, key string) ([]byte, error)
41+
WatchKey(ctx context.Context, key string, fn WatchKeyHandler) error
42+
}
43+
5444
type SvcDiscoveryRegistry interface {
5545
Conn
56-
Register(serviceName, host string, port int, opts ...grpc.DialOption) error //6
57-
UnRegister() error //7
46+
KeyValue
47+
AddOption(opts ...grpc.DialOption)
48+
Register(ctx context.Context, serviceName, host string, port int, opts ...grpc.DialOption) error
5849
Close()
59-
GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) //
50+
GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error)
6051
}

discovery/discovery_register_test.go

-43
This file was deleted.

discovery/etcd/etcd.go

+64-9
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ import (
55
"fmt"
66
"io"
77
"net"
8+
"strconv"
89
"strings"
910
"sync"
1011
"time"
1112

13+
"github.com/openimsdk/tools/discovery"
1214
"github.com/openimsdk/tools/errs"
1315
"github.com/openimsdk/tools/log"
1416
"github.com/openimsdk/tools/utils/datautil"
@@ -191,7 +193,7 @@ func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context,
191193
}
192194

193195
// GetConns returns gRPC client connections for a given service name
194-
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
196+
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) {
195197
fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName)
196198
if len(r.connMap) == 0 {
197199
if err := r.initializeConnMap(opts...); err != nil {
@@ -200,11 +202,11 @@ func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName str
200202
}
201203
r.mu.RLock()
202204
defer r.mu.RUnlock()
203-
return datautil.Batch(func(t *addrConn) *grpc.ClientConn { return t.conn }, r.connMap[fullServiceKey]), nil
205+
return datautil.Batch(func(t *addrConn) grpc.ClientConnInterface { return t.conn }, r.connMap[fullServiceKey]), nil
204206
}
205207

206208
// GetConn returns a single gRPC client connection for a given service name
207-
func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
209+
func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) {
208210
target := fmt.Sprintf("etcd:///%s/%s", r.rootDirectory, serviceName)
209211

210212
dialOpts := append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))
@@ -222,6 +224,14 @@ func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
222224
return r.rpcRegisterTarget
223225
}
224226

227+
func (r *SvcDiscoveryRegistryImpl) IsSelfNode(cc grpc.ClientConnInterface) bool {
228+
cli, ok := cc.(*grpc.ClientConn)
229+
if !ok {
230+
return false
231+
}
232+
return r.GetSelfConnTarget() == cli.Target()
233+
}
234+
225235
// AddOption appends gRPC dial options to the existing options
226236
func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
227237
r.mu.Lock()
@@ -231,20 +241,20 @@ func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
231241
}
232242

233243
// CloseConn closes a given gRPC client connection
234-
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
235-
conn.Close()
236-
}
244+
//func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
245+
// conn.Close()
246+
//}
237247

238248
// Register registers a new service endpoint with etcd
239-
func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
240-
r.serviceKey = fmt.Sprintf("%s/%s/%s:%d", r.rootDirectory, serviceName, host, port)
249+
func (r *SvcDiscoveryRegistryImpl) Register(ctx context.Context, serviceName, host string, port int, opts ...grpc.DialOption) error {
250+
r.serviceKey = fmt.Sprintf("%s/%s/%s", r.rootDirectory, serviceName, net.JoinHostPort(host, strconv.Itoa(port)))
241251
em, err := endpoints.NewManager(r.client, r.rootDirectory+"/"+serviceName)
242252
if err != nil {
243253
return err
244254
}
245255
r.endpointMgr = em
246256

247-
leaseResp, err := r.client.Grant(context.Background(), 30) //
257+
leaseResp, err := r.client.Grant(ctx, 30) //
248258
if err != nil {
249259
return err
250260
}
@@ -259,6 +269,12 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int,
259269
}
260270

261271
go r.keepAliveLease(r.leaseID)
272+
273+
//_, err := r.client.Put(ctx, BuildDiscoveryKey(serviceName), jsonutil.StructToJsonString(BuildDefaultTarget(host, port)))
274+
//if err != nil {
275+
// return err
276+
//}
277+
262278
return nil
263279
}
264280

@@ -405,3 +421,42 @@ func (r *SvcDiscoveryRegistryImpl) resetConnMap() {
405421
}
406422
r.connMap = make(map[string][]*addrConn)
407423
}
424+
425+
func (r *SvcDiscoveryRegistryImpl) SetKey(ctx context.Context, key string, data []byte) error {
426+
if _, err := r.client.Put(ctx, key, string(data)); err != nil {
427+
return errs.WrapMsg(err, "etcd put err")
428+
}
429+
return nil
430+
}
431+
432+
func (r *SvcDiscoveryRegistryImpl) GetKey(ctx context.Context, key string) ([]byte, error) {
433+
resp, err := r.client.Get(ctx, key)
434+
if err != nil {
435+
return nil, errs.WrapMsg(err, "etcd get err")
436+
}
437+
if len(resp.Kvs) == 0 {
438+
return nil, nil
439+
}
440+
return resp.Kvs[0].Value, nil
441+
}
442+
443+
func (r *SvcDiscoveryRegistryImpl) DelData(ctx context.Context, key string) error {
444+
if _, err := r.client.Delete(ctx, key); err != nil {
445+
return errs.WrapMsg(err, "etcd delete err")
446+
}
447+
return nil
448+
}
449+
450+
func (r *SvcDiscoveryRegistryImpl) WatchKey(ctx context.Context, key string, fn discovery.WatchKeyHandler) error {
451+
watchChan := r.client.Watch(ctx, key)
452+
for watchResp := range watchChan {
453+
for _, event := range watchResp.Events {
454+
if event.IsModify() && string(event.Kv.Key) == key {
455+
if err := fn(&discovery.WatchKey{Value: event.Kv.Value}); err != nil {
456+
return err
457+
}
458+
}
459+
}
460+
}
461+
return nil
462+
}

discovery/kubernetes/kubernetes.go

+39-11
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package kubernetes
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"log"
78
"os"
89
"sync"
910
"time"
1011

12+
"github.com/openimsdk/tools/discovery"
1113
"github.com/openimsdk/tools/errs"
1214
"google.golang.org/grpc"
1315
"google.golang.org/grpc/credentials/insecure"
@@ -28,7 +30,7 @@ type KubernetesConnManager struct {
2830
selfTarget string
2931

3032
mu sync.RWMutex
31-
connMap map[string][]*grpc.ClientConn
33+
connMap map[string][]grpc.ClientConnInterface
3234
}
3335

3436
// NewKubernetesConnManager creates a new connection manager that uses Kubernetes services for service discovery.
@@ -48,7 +50,7 @@ func NewKubernetesConnManager(namespace string, options ...grpc.DialOption) (*Ku
4850
clientset: clientset,
4951
namespace: namespace,
5052
dialOptions: options,
51-
connMap: make(map[string][]*grpc.ClientConn),
53+
connMap: make(map[string][]grpc.ClientConnInterface),
5254
}
5355

5456
go k.watchEndpoints()
@@ -69,7 +71,7 @@ func (k *KubernetesConnManager) initializeConns(serviceName string, opts ...grpc
6971

7072
// fmt.Println("Endpoints:", endpoints, "endpoints.Subsets:", endpoints.Subsets)
7173

72-
var conns []*grpc.ClientConn
74+
var conns []grpc.ClientConnInterface
7375
for _, subset := range endpoints.Subsets {
7476
for _, address := range subset.Addresses {
7577
target := fmt.Sprintf("%s:%d", address.IP, port)
@@ -100,7 +102,7 @@ func (k *KubernetesConnManager) initializeConns(serviceName string, opts ...grpc
100102
}
101103

102104
// GetConns returns gRPC client connections for a given Kubernetes service name.
103-
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
105+
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) {
104106
k.mu.RLock()
105107

106108
conns, exists := k.connMap[serviceName]
@@ -126,7 +128,7 @@ func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string
126128
}
127129

128130
// GetConn returns a single gRPC client connection for a given Kubernetes service name.
129-
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
131+
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) {
130132
var target string
131133

132134
if k.rpcTargets[serviceName] == "" {
@@ -189,6 +191,14 @@ func (k *KubernetesConnManager) GetSelfConnTarget() string {
189191
return k.selfTarget
190192
}
191193

194+
func (k *KubernetesConnManager) IsSelfNode(cc grpc.ClientConnInterface) bool {
195+
cli, ok := cc.(*grpc.ClientConn)
196+
if !ok {
197+
return false
198+
}
199+
return k.GetSelfConnTarget() == cli.Target()
200+
}
201+
192202
// AddOption appends gRPC dial options to the existing options.
193203
func (k *KubernetesConnManager) AddOption(opts ...grpc.DialOption) {
194204
k.mu.Lock()
@@ -197,23 +207,25 @@ func (k *KubernetesConnManager) AddOption(opts ...grpc.DialOption) {
197207
}
198208

199209
// CloseConn closes a given gRPC client connection.
200-
func (k *KubernetesConnManager) CloseConn(conn *grpc.ClientConn) {
201-
conn.Close()
202-
}
210+
//func (k *KubernetesConnManager) CloseConn(conn *grpc.ClientConn) {
211+
// conn.Close()
212+
//}
203213

204214
// Close closes all gRPC connections managed by KubernetesConnManager.
205215
func (k *KubernetesConnManager) Close() {
206216
k.mu.Lock()
207217
defer k.mu.Unlock()
208218
for _, conns := range k.connMap {
209219
for _, conn := range conns {
210-
_ = conn.Close()
220+
if closer, ok := conn.(io.Closer); ok {
221+
_ = closer.Close()
222+
}
211223
}
212224
}
213-
k.connMap = make(map[string][]*grpc.ClientConn)
225+
k.connMap = make(map[string][]grpc.ClientConnInterface)
214226
}
215227

216-
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
228+
func (k *KubernetesConnManager) Register(ctx context.Context, serviceName, host string, port int, opts ...grpc.DialOption) error {
217229
return nil
218230
}
219231

@@ -295,3 +307,19 @@ func (k *KubernetesConnManager) checkOpts(opts ...grpc.DialOption) error {
295307

296308
return nil
297309
}
310+
311+
func (k *KubernetesConnManager) SetKey(ctx context.Context, key string, data []byte) error {
312+
return discovery.ErrNotSupportedKeyValue
313+
}
314+
315+
func (k *KubernetesConnManager) GetKey(ctx context.Context, key string) ([]byte, error) {
316+
return nil, discovery.ErrNotSupportedKeyValue
317+
}
318+
319+
func (k *KubernetesConnManager) DelData(ctx context.Context, key string) error {
320+
return discovery.ErrNotSupportedKeyValue
321+
}
322+
323+
func (k *KubernetesConnManager) WatchKey(ctx context.Context, key string, fn discovery.WatchKeyHandler) error {
324+
return discovery.ErrNotSupportedKeyValue
325+
}

0 commit comments

Comments
 (0)