Skip to content

Commit 263a15d

Browse files
committed
support unsubscribe. (#7)
Signed-off-by: Jiangnan Jia <[email protected]>
1 parent e2612c2 commit 263a15d

File tree

5 files changed

+218
-2
lines changed

5 files changed

+218
-2
lines changed

control_plane.go

+138-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ package opensergo
1616

1717
import (
1818
"os"
19+
"reflect"
1920
"sync"
21+
"time"
2022

2123
"github.com/opensergo/opensergo-control-plane/pkg/controller"
2224
"github.com/opensergo/opensergo-control-plane/pkg/model"
@@ -42,7 +44,8 @@ func NewControlPlane() (*ControlPlane, error) {
4244
return nil, err
4345
}
4446

45-
cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
47+
handlers := []model.SubscribeRequestHandler{cp.handleSubscribeRequest, cp.handleUnSubscribeRequest}
48+
cp.server = transport.NewServer(uint32(10246), handlers)
4649
cp.operator = operator
4750

4851
hostname, herr := os.Hostname()
@@ -61,6 +64,10 @@ func (c *ControlPlane) Start() error {
6164
if err != nil {
6265
return err
6366
}
67+
68+
// start the delete-connection goroutine
69+
c.delConn()
70+
6471
// Run the transport server
6572
err = c.server.Run()
6673
if err != nil {
@@ -105,7 +112,24 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
105112
})
106113
}
107114

115+
func (c *ControlPlane) sendAckToStream(stream model.OpenSergoTransportStream, ack string, status *trpb.Status, respId string) error {
116+
if stream == nil {
117+
return nil
118+
}
119+
return stream.SendMsg(&trpb.SubscribeResponse{
120+
Status: status,
121+
Ack: ack,
122+
ControlPlane: c.protoDesc,
123+
ResponseId: respId,
124+
})
125+
}
126+
108127
func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
128+
129+
if trpb.SubscribeOpType_SUBSCRIBE != request.OpType {
130+
return nil
131+
}
132+
109133
//var labels []model.LabelKV
110134
//if request.Target.Labels != nil {
111135
// for _, label := range request.Target.Labels {
@@ -157,3 +181,116 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
157181
}
158182
return nil
159183
}
184+
185+
func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
186+
187+
if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType {
188+
return nil
189+
}
190+
191+
for _, kind := range request.Target.Kinds {
192+
namespacedApp := model.NamespacedApp{
193+
Namespace: request.Target.Namespace,
194+
App: request.Target.App,
195+
}
196+
err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier)
197+
198+
if err != nil {
199+
status := &trpb.Status{
200+
// TODO: defined a new errorCode
201+
Code: transport.RegisterWatcherError,
202+
Message: "Remove from watcher error",
203+
Details: nil,
204+
}
205+
err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId)
206+
if err != nil {
207+
// TODO: log here
208+
}
209+
continue
210+
}
211+
212+
targetConnections, _ := c.server.ConnectionManager().Get(request.Target.Namespace, request.Target.App, kind)
213+
if len(targetConnections) < 1 {
214+
delSubscribeConnChan <- delSubscribeConnInfo{
215+
stream: stream,
216+
request: request,
217+
namespaceApp: namespacedApp,
218+
kind: kind,
219+
}
220+
} else {
221+
existConnection := c.server.ConnectionManager().ExistConnection(kind)
222+
if !existConnection {
223+
delSubscribeConnChan <- delSubscribeConnInfo{
224+
stream: stream,
225+
request: request,
226+
namespaceApp: model.NamespacedApp{},
227+
kind: kind,
228+
}
229+
}
230+
}
231+
}
232+
233+
return nil
234+
}
235+
236+
var delSubscribeConnChan chan delSubscribeConnInfo
237+
238+
type delSubscribeConnInfo struct {
239+
stream model.OpenSergoTransportStream
240+
request *trpb.SubscribeRequest
241+
namespaceApp model.NamespacedApp
242+
kind string
243+
}
244+
245+
func (c *ControlPlane) delConn() {
246+
go func() {
247+
for !c.server.IsStarted() {
248+
time.Sleep(time.Duration(1) * time.Second)
249+
}
250+
251+
currDelConnInfo := <-delSubscribeConnChan
252+
namespaceApp := currDelConnInfo.namespaceApp
253+
kind := currDelConnInfo.kind
254+
request := currDelConnInfo.request
255+
stream := currDelConnInfo.stream
256+
257+
// TODO make time of sleep is configurable
258+
time.Sleep(time.Duration(5) * time.Second)
259+
var err error
260+
261+
// RemoveSubscribeTarget from CRDWatcher
262+
if !reflect.DeepEqual(namespaceApp, model.NamespacedApp{}) {
263+
targetConnections, _ := c.server.ConnectionManager().Get(namespaceApp.Namespace, namespaceApp.App, kind)
264+
if len(targetConnections) < 1 {
265+
if crdWatcher, existed := c.operator.GetWatcher(kind); existed {
266+
err = crdWatcher.RemoveSubscribeTarget(model.SubscribeTarget{
267+
Namespace: namespaceApp.Namespace,
268+
AppName: namespaceApp.App,
269+
Kind: kind,
270+
})
271+
}
272+
}
273+
}
274+
275+
// delete Connection and CRDWatch
276+
existConnection := c.server.ConnectionManager().ExistConnection(kind)
277+
if !existConnection {
278+
c.operator.RemoveWatcher(model.SubscribeTarget{
279+
Namespace: namespaceApp.Namespace,
280+
AppName: namespaceApp.App,
281+
Kind: kind,
282+
})
283+
}
284+
285+
// send ackMessage
286+
status := &trpb.Status{
287+
Code: transport.Success,
288+
Message: "unSubscribe success",
289+
Details: nil,
290+
}
291+
err = c.sendAckToStream(stream, transport.ACKFlag, status, request.RequestId)
292+
if err != nil {
293+
// TODO: log here
294+
}
295+
}()
296+
}

pkg/controller/crd_watcher.go

+35-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package controller
1616

1717
import (
1818
"context"
19+
"go.uber.org/atomic"
1920
"log"
2021
"net/http"
2122
"strconv"
@@ -55,6 +56,7 @@ type CRDWatcher struct {
5556
crdGenerator func() client.Object
5657
sendDataHandler model.DataEntirePushHandler
5758

59+
deleted *atomic.Bool
5860
updateMux sync.RWMutex
5961
}
6062

@@ -96,7 +98,22 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error {
9698
}
9799

98100
func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error {
99-
// TODO: implement me
101+
// TODO: validate the target
102+
if target.Kind != r.kind {
103+
return errors.New("target kind mismatch, expected: " + target.Kind + ", r.kind: " + r.kind)
104+
}
105+
r.updateMux.Lock()
106+
defer r.updateMux.Unlock()
107+
108+
delete(r.subscribedList, target)
109+
delete(r.subscribedNamespaces, target.Namespace)
110+
delete(r.subscribedApps, target.AppName)
111+
112+
// TODO the 2nd param need fix to correct.
113+
r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{
114+
Namespace: target.Namespace,
115+
App: target.AppName,
116+
}, "")
100117

101118
return nil
102119
}
@@ -118,6 +135,14 @@ func (r *CRDWatcher) HasAnySubscribedOfApp(app string) bool {
118135
}
119136

120137
func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
138+
// TODO optimize the logic of destroy the current controller
139+
// r.deleted.Load() is a flag marked the deleted status of controller
140+
// can not destroy the current controller
141+
//
142+
if r.deleted.Load() {
143+
return ctrl.Result{}, nil
144+
}
145+
121146
if r.HasAnySubscribedOfApp(req.Namespace) {
122147
// Ignore unmatched namespace
123148
return ctrl.Result{Requeue: false, RequeueAfter: 0}, nil
@@ -217,9 +242,17 @@ func (r *CRDWatcher) GetRules(n model.NamespacedApp) ([]*anypb.Any, int64) {
217242
}
218243

219244
func (r *CRDWatcher) SetupWithManager(mgr ctrl.Manager) error {
245+
// TODO optimized delete logic here
246+
r.deleted.Store(false)
220247
return ctrl.NewControllerManagedBy(mgr).For(r.crdGenerator()).Complete(r)
221248
}
222249

250+
func (r *CRDWatcher) ShutdownWithManager(mgr ctrl.Manager) error {
251+
// TODO optimized delete logic here
252+
r.deleted.Store(true)
253+
return nil
254+
}
255+
223256
func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, error) {
224257
var packRule *anypb.Any
225258
var err error
@@ -336,6 +369,7 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat
336369
subscribedNamespaces: make(map[string]bool),
337370
subscribedApps: make(map[string]bool),
338371
crdGenerator: crdGenerator,
372+
deleted: atomic.NewBool(false),
339373
crdCache: NewCRDCache(kind),
340374
sendDataHandler: sendDataHandler,
341375
}

pkg/controller/k8s_operator.go

+20
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD
125125

126126
existingWatcher, exists := k.controllers[target.Kind]
127127
if exists {
128+
// TODO optimized delete logic here
129+
existingWatcher.deleted.Store(false)
128130
if existingWatcher.HasSubscribed(target) {
129131
// Target has been subscribed
130132
return existingWatcher, nil
@@ -164,6 +166,8 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
164166

165167
existingWatcher, exists := k.controllers[target.Kind]
166168
if exists && !existingWatcher.HasSubscribed(target) {
169+
// TODO optimized delete logic here
170+
existingWatcher.deleted.Store(false)
167171
// TODO: think more about here
168172
err = existingWatcher.AddSubscribeTarget(target)
169173
if err != nil {
@@ -196,6 +200,22 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
196200
return nil
197201
}
198202

203+
func (k *KubernetesOperator) RemoveWatcher(target model.SubscribeTarget) error {
204+
k.controllerMux.Lock()
205+
defer k.controllerMux.Unlock()
206+
207+
crdWatch, exists := k.controllers[target.Kind]
208+
if exists {
209+
err := crdWatch.ShutdownWithManager(k.crdManager)
210+
if err != nil {
211+
// TODO add log
212+
}
213+
delete(k.controllers, target.Kind)
214+
}
215+
216+
return nil
217+
}
218+
199219
// Close exit the K8S KubernetesOperator
200220
func (k *KubernetesOperator) Close() error {
201221
k.ctxCancel()

pkg/transport/grpc/connection.go

+21
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,27 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier
154154
return nil
155155
}
156156

157+
func (c *ConnectionManager) RemoveWithIdentifier(namespacedApp model.NamespacedApp, kind string, identifier model.ClientIdentifier) error {
158+
c.updateMux.Lock()
159+
defer c.updateMux.Unlock()
160+
161+
err := c.removeInternal(namespacedApp, kind, identifier)
162+
if err != nil {
163+
return err
164+
}
165+
166+
return nil
167+
}
168+
169+
func (c *ConnectionManager) ExistConnection(kind string) bool {
170+
for _, kindConnections := range c.connectionMap {
171+
connectionMap := kindConnections[kind]
172+
return len(connectionMap) > 0
173+
}
174+
175+
return false
176+
}
177+
157178
func NewConnectionManager() *ConnectionManager {
158179
return &ConnectionManager{
159180
connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap),

pkg/transport/grpc/server.go

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ func (s *Server) ComponentName() string {
6060
return "OpenSergoUniversalTransportServer"
6161
}
6262

63+
func (s *Server) IsStarted() bool {
64+
return s.started.Load()
65+
}
66+
6367
func (s *Server) Run() error {
6468
if s.started.CAS(false, true) {
6569
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))

0 commit comments

Comments
 (0)