Skip to content

Commit 43ea5e5

Browse files
committed
support unsubscribe
Signed-off-by: Jiangnan Jia <[email protected]>
1 parent c8e75a4 commit 43ea5e5

File tree

5 files changed

+246
-2
lines changed

5 files changed

+246
-2
lines changed

control_plane.go

+165-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ package opensergo
1717
import (
1818
"log"
1919
"os"
20+
"reflect"
2021
"sync"
22+
"time"
2123

2224
"github.com/opensergo/opensergo-control-plane/pkg/controller"
2325
"github.com/opensergo/opensergo-control-plane/pkg/model"
@@ -43,7 +45,11 @@ func NewControlPlane() (*ControlPlane, error) {
4345
return nil, err
4446
}
4547

46-
cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
48+
handlers := []model.SubscribeRequestHandler{
49+
cp.handleSubscribeRequest,
50+
cp.handleUnSubscribeRequest,
51+
}
52+
cp.server = transport.NewServer(uint32(10246), handlers)
4753
cp.operator = operator
4854

4955
hostname, herr := os.Hostname()
@@ -62,6 +68,10 @@ func (c *ControlPlane) Start() error {
6268
if err != nil {
6369
return err
6470
}
71+
72+
// start the delete-connection goroutine
73+
c.delConn()
74+
6575
// Run the transport server
6676
err = c.server.Run()
6777
if err != nil {
@@ -106,7 +116,24 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
106116
})
107117
}
108118

119+
func (c *ControlPlane) sendAckToStream(stream model.OpenSergoTransportStream, ack string, status *trpb.Status, respId string) error {
120+
if stream == nil {
121+
return nil
122+
}
123+
return stream.SendMsg(&trpb.SubscribeResponse{
124+
Status: status,
125+
Ack: ack,
126+
ControlPlane: c.protoDesc,
127+
ResponseId: respId,
128+
})
129+
}
130+
109131
func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
132+
133+
if trpb.SubscribeOpType_SUBSCRIBE != request.OpType {
134+
return nil
135+
}
136+
110137
//var labels []model.LabelKV
111138
//if request.Target.Labels != nil {
112139
// for _, label := range request.Target.Labels {
@@ -160,3 +187,140 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
160187
}
161188
return nil
162189
}
190+
191+
// handleUnSubscribeRequest handle the UnSubscribeRequest request from OpenSergo SDK.
192+
//
193+
// 1.remove cache of SubscribeTarget in Connection
194+
// 2.remove watcher if there is no SubscribeTarget for the same kind in Connection cache.
195+
func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
196+
197+
if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType {
198+
return nil
199+
}
200+
201+
for _, kind := range request.Target.Kinds {
202+
namespacedApp := model.NamespacedApp{
203+
Namespace: request.Target.Namespace,
204+
App: request.Target.App,
205+
}
206+
// remove the relation of Connection and SubscribeTarget from local cache
207+
err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier)
208+
209+
if err != nil {
210+
status := &trpb.Status{
211+
// TODO: defined a new errorCode
212+
Code: transport.RegisterWatcherError,
213+
Message: "Remove from watcher error",
214+
Details: nil,
215+
}
216+
err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId)
217+
if err != nil {
218+
// TODO: log here
219+
}
220+
continue
221+
}
222+
223+
// handle the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher. only push into a chan named delSubscribeConnChan waiting for delete.
224+
// 1. if there is no kind cached in current Connection, push subscribeConnInfo with an empty NamespaceApp into delSubscribeConnChan,
225+
// then, will delete the watcher for CRD.
226+
// 2. if the number of relation between Connection and SubscribeTarget < 1, then push subscribeConnInfo with current NamespaceApp into delSubscribeConnChan,
227+
// then 1st, will delete the SubscribeTarget which is cached in current Connection
228+
// 2nd, will delte the watcher for CRD if there's no kind cached in current Connection
229+
existConnection := c.server.ConnectionManager().ExistConnection(namespacedApp.Namespace, namespacedApp.App, kind)
230+
if !existConnection {
231+
delSubscribeConnChan <- delSubscribeConnInfo{
232+
stream: stream,
233+
request: request,
234+
namespaceApp: model.NamespacedApp{},
235+
kind: kind,
236+
}
237+
} else {
238+
targetConnections, _ := c.server.ConnectionManager().Get(request.Target.Namespace, request.Target.App, kind)
239+
if len(targetConnections) < 1 {
240+
delSubscribeConnChan <- delSubscribeConnInfo{
241+
stream: stream,
242+
request: request,
243+
namespaceApp: namespacedApp,
244+
kind: kind,
245+
}
246+
}
247+
}
248+
}
249+
250+
return nil
251+
}
252+
253+
// delSubscribeConnChan a chan for delete the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher to stop watching.
254+
var delSubscribeConnChan chan delSubscribeConnInfo
255+
256+
type delSubscribeConnInfo struct {
257+
stream model.OpenSergoTransportStream
258+
request *trpb.SubscribeRequest
259+
namespaceApp model.NamespacedApp
260+
kind string
261+
}
262+
263+
// delConn a goroutine contains the logic of delete the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher.
264+
//
265+
// 1. at the beginning of current goroutine, should wait for the status of server is started.
266+
//
267+
// 2. when receive a delSubscribeConnInfo from delSubscribeConnChan, waiting a silence time to prevent inaccurate data statistics caused by network jitter.
268+
//
269+
// after the silence time, is the actually logic of deleting local cache and removing watcher.
270+
func (c *ControlPlane) delConn() {
271+
go func() {
272+
// waiting for server is started.
273+
for !c.server.IsStarted() {
274+
time.Sleep(time.Duration(1) * time.Second)
275+
}
276+
277+
// receive from delSubscribeConnChan
278+
currDelConnInfo := <-delSubscribeConnChan
279+
namespaceApp := currDelConnInfo.namespaceApp
280+
kind := currDelConnInfo.kind
281+
request := currDelConnInfo.request
282+
stream := currDelConnInfo.stream
283+
284+
// wait a silence for network jitter
285+
// TODO make time of sleep is configurable
286+
time.Sleep(time.Duration(5) * time.Second)
287+
var err error
288+
289+
// RemoveSubscribeTarget from CRDWatcher
290+
// if namespaceApp is not an empty struct, means that need to delete SubscribeTarget cache in CRDWatcher
291+
if !reflect.DeepEqual(namespaceApp, model.NamespacedApp{}) {
292+
// re-count the number of SubscribeTarget
293+
targetConnections, _ := c.server.ConnectionManager().Get(namespaceApp.Namespace, namespaceApp.App, kind)
294+
if len(targetConnections) < 1 {
295+
if crdWatcher, existed := c.operator.GetWatcher(kind); existed {
296+
err = crdWatcher.RemoveSubscribeTarget(model.SubscribeTarget{
297+
Namespace: namespaceApp.Namespace,
298+
AppName: namespaceApp.App,
299+
Kind: kind,
300+
})
301+
}
302+
}
303+
}
304+
305+
// remove the CRDWatch from KubernetesOperator, to stop watching the kind.
306+
existConnection := c.server.ConnectionManager().ExistConnection(namespaceApp.Namespace, namespaceApp.App, kind)
307+
if !existConnection {
308+
c.operator.RemoveWatcher(model.SubscribeTarget{
309+
Namespace: namespaceApp.Namespace,
310+
AppName: namespaceApp.App,
311+
Kind: kind,
312+
})
313+
}
314+
315+
// send ackMessage for UnSubscribeConfig request.
316+
status := &trpb.Status{
317+
Code: transport.Success,
318+
Message: "unSubscribe success",
319+
Details: nil,
320+
}
321+
err = c.sendAckToStream(stream, transport.ACKFlag, status, request.RequestId)
322+
if err != nil {
323+
// TODO: log here
324+
}
325+
}()
326+
}

pkg/controller/crd_watcher.go

+37-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package controller
1616

1717
import (
1818
"context"
19+
"go.uber.org/atomic"
1920
"log"
2021
"net/http"
2122
"strconv"
@@ -56,6 +57,7 @@ type CRDWatcher struct {
5657
crdGenerator func() client.Object
5758
sendDataHandler model.DataEntirePushHandler
5859

60+
deleted *atomic.Bool
5961
updateMux sync.RWMutex
6062
}
6163

@@ -97,7 +99,24 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error {
9799
}
98100

99101
func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error {
100-
// TODO: implement me
102+
// TODO: validate the target
103+
if target.Kind != r.kind {
104+
return errors.New("target kind mismatch, expected: " + target.Kind + ", r.kind: " + r.kind)
105+
}
106+
r.updateMux.Lock()
107+
defer r.updateMux.Unlock()
108+
109+
// remove the subscribe-cache in this CRDWatcher
110+
delete(r.subscribedList, target)
111+
delete(r.subscribedNamespaces, target.Namespace)
112+
delete(r.subscribedApps, target.NamespacedApp())
113+
114+
// delete the matched crdCache which comes from k8s
115+
// TODO the 2nd param need fix to correct.
116+
r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{
117+
Namespace: target.Namespace,
118+
App: target.AppName,
119+
}, "")
101120

102121
return nil
103122
}
@@ -119,6 +138,14 @@ func (r *CRDWatcher) HasAnySubscribedOfApp(app model.NamespacedApp) bool {
119138
}
120139

121140
func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
141+
// TODO optimize the logic of destroy the current controller
142+
// r.deleted.Load() is a flag marked the deleted status of controller
143+
// can not destroy the current controller
144+
//
145+
if r.deleted.Load() {
146+
return ctrl.Result{}, nil
147+
}
148+
122149
if !r.HasAnySubscribedOfNamespace(req.Namespace) {
123150
// Ignore unmatched namespace
124151
return ctrl.Result{Requeue: false, RequeueAfter: 0}, nil
@@ -216,9 +243,17 @@ func (r *CRDWatcher) GetRules(n model.NamespacedApp) ([]*anypb.Any, int64) {
216243
}
217244

218245
func (r *CRDWatcher) SetupWithManager(mgr ctrl.Manager) error {
246+
// TODO optimized delete logic here
247+
r.deleted.Store(false)
219248
return ctrl.NewControllerManagedBy(mgr).For(r.crdGenerator()).Complete(r)
220249
}
221250

251+
func (r *CRDWatcher) ShutdownWithManager(mgr ctrl.Manager) error {
252+
// TODO optimized delete logic here
253+
r.deleted.Store(true)
254+
return nil
255+
}
256+
222257
func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, error) {
223258
var packRule *anypb.Any
224259
var err error
@@ -338,6 +373,7 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat
338373
subscribedNamespaces: make(map[string]bool),
339374
subscribedApps: make(map[model.NamespacedApp]bool),
340375
crdGenerator: crdGenerator,
376+
deleted: atomic.NewBool(false),
341377
crdCache: NewCRDCache(kind),
342378
sendDataHandler: sendDataHandler,
343379
}

pkg/controller/k8s_operator.go

+20
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD
129129

130130
existingWatcher, exists := k.controllers[target.Kind]
131131
if exists {
132+
// TODO optimized delete logic here
133+
existingWatcher.deleted.Store(false)
132134
if existingWatcher.HasSubscribed(target) {
133135
// Target has been subscribed
134136
return existingWatcher, nil
@@ -168,6 +170,8 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
168170

169171
existingWatcher, exists := k.controllers[target.Kind]
170172
if exists && !existingWatcher.HasSubscribed(target) {
173+
// TODO optimized delete logic here
174+
existingWatcher.deleted.Store(false)
171175
// TODO: think more about here
172176
err = existingWatcher.AddSubscribeTarget(target)
173177
if err != nil {
@@ -200,6 +204,22 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
200204
return nil
201205
}
202206

207+
func (k *KubernetesOperator) RemoveWatcher(target model.SubscribeTarget) error {
208+
k.controllerMux.Lock()
209+
defer k.controllerMux.Unlock()
210+
211+
crdWatch, exists := k.controllers[target.Kind]
212+
if exists {
213+
err := crdWatch.ShutdownWithManager(k.crdManager)
214+
if err != nil {
215+
// TODO add log
216+
}
217+
delete(k.controllers, target.Kind)
218+
}
219+
220+
return nil
221+
}
222+
203223
// Close exit the K8S KubernetesOperator
204224
func (k *KubernetesOperator) Close() error {
205225
k.ctxCancel()

pkg/transport/grpc/connection.go

+20
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,26 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier
154154
return nil
155155
}
156156

157+
func (c *ConnectionManager) RemoveWithIdentifier(namespacedApp model.NamespacedApp, kind string, identifier model.ClientIdentifier) error {
158+
c.updateMux.Lock()
159+
defer c.updateMux.Unlock()
160+
161+
err := c.removeInternal(namespacedApp, kind, identifier)
162+
if err != nil {
163+
return err
164+
}
165+
166+
return nil
167+
}
168+
169+
func (c *ConnectionManager) ExistConnection(namespace, app, kind string) bool {
170+
connections, success := c.Get(namespace, app, kind)
171+
if success && len(connections) > 0 {
172+
return true
173+
}
174+
return false
175+
}
176+
157177
func NewConnectionManager() *ConnectionManager {
158178
return &ConnectionManager{
159179
connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap),

pkg/transport/grpc/server.go

+4
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ func (s *Server) ComponentName() string {
6161
return "OpenSergoUniversalTransportServer"
6262
}
6363

64+
func (s *Server) IsStarted() bool {
65+
return s.started.Load()
66+
}
67+
6468
func (s *Server) Run() error {
6569
if s.started.CAS(false, true) {
6670
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))

0 commit comments

Comments
 (0)