Skip to content

Commit f4933a3

Browse files
committed
Implement window
A window is implemented, where values are inserted and kept in memory for a short time before updating service directory values. Signed-off-by: Elis Lulja <[email protected]>
1 parent 96f69cb commit f4933a3

File tree

3 files changed

+121
-9
lines changed

3 files changed

+121
-9
lines changed

controllers/base.go

+26
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"strings"
23+
"sync"
2324
"time"
2425

2526
"github.com/CloudNativeSDWAN/cnwan-operator/internal/types"
@@ -33,6 +34,29 @@ import (
3334
"sigs.k8s.io/controller-runtime/pkg/client"
3435
)
3536

37+
type window struct {
38+
values []*windowValue
39+
lock sync.Mutex
40+
}
41+
42+
func (w *window) getHighest() int {
43+
highest := 0
44+
for _, val := range w.values {
45+
if val.totalCount > highest {
46+
highest = val.totalCount
47+
}
48+
}
49+
50+
return highest
51+
}
52+
53+
type windowValue struct {
54+
epsliceName string
55+
epsliceCount int
56+
totalCount int
57+
timer *time.Timer
58+
}
59+
3660
// BaseReconciler is the base controller/reconciler upon which all other
3761
// controllers will be based.
3862
type BaseReconciler struct {
@@ -45,6 +69,7 @@ type BaseReconciler struct {
4569
CountPodKey string
4670

4771
epsliceCounter *counter
72+
srvWindows map[string]*window
4873
}
4974

5075
// NewBaseReconciler returns a new instance of a base reconciler to be used
@@ -64,6 +89,7 @@ func NewBaseReconciler(cli client.Client, scheme *runtime.Scheme, broker *sr.Bro
6489
CurrentNsPolicy: currNsPolicy,
6590
CountPodKey: countPodKey,
6691
epsliceCounter: newCounter(),
92+
srvWindows: map[string]*window{},
6793
}
6894
}
6995

controllers/counters.go

+21
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,27 @@ func (c *counter) getSrvCount(nsName, srvName string) int {
5050
return totalCount
5151
}
5252

53+
func (c *counter) resetCounterTo(nsName, srvName string, data map[string]int) {
54+
c.lock.Lock()
55+
defer c.lock.Unlock()
56+
57+
fullName := ktypes.NamespacedName{Namespace: nsName, Name: srvName}.String()
58+
c.counts[fullName] = data
59+
}
60+
61+
func (c *counter) getSrvData(nsName, srvName string) map[string]int {
62+
c.lock.RLock()
63+
defer c.lock.RUnlock()
64+
65+
fullName := ktypes.NamespacedName{Namespace: nsName, Name: srvName}.String()
66+
val, exists := c.counts[fullName]
67+
if !exists {
68+
return map[string]int{}
69+
}
70+
71+
return val
72+
}
73+
5374
func (c *counter) putSrvCount(nsName, srvName, epSliceName string, count int) {
5475
c.lock.Lock()
5576
defer c.lock.Unlock()

controllers/endpointslice_controller.go

+74-9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package controllers
1919
import (
2020
"fmt"
2121
"sync"
22+
"time"
2223

2324
"k8s.io/api/discovery/v1beta1"
2425
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
@@ -66,19 +67,83 @@ func (r *EndpointSliceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
6667
return ctrl.Result{}, nil
6768
}
6869

69-
r.epsliceCounter.putSrvCount(req.Namespace, data.srv, req.Name, data.count)
70+
srvname := ktypes.NamespacedName{Namespace: req.Namespace, Name: data.srv}
71+
wind, exists := r.srvWindows[srvname.String()]
72+
if !exists {
73+
r.srvWindows[srvname.String()] = &window{values: []*windowValue{}}
74+
wind = r.srvWindows[srvname.String()]
75+
}
7076

71-
if r.srvRecon == nil {
72-
l.Error(fmt.Errorf("service reconciler is nil"), "could not reconcile service from endpointslice")
73-
return ctrl.Result{}, nil
77+
wind.lock.Lock()
78+
defer wind.lock.Unlock()
79+
80+
countBeforeUpd := r.epsliceCounter.getSrvCount(srvname.Namespace, srvname.Name)
81+
r.epsliceCounter.putSrvCount(req.Namespace, srvname.Name, req.Name, data.count)
82+
newCount := r.epsliceCounter.getSrvCount(srvname.Namespace, srvname.Name)
83+
84+
if len(wind.values) > 0 {
85+
if newCount > wind.getHighest() {
86+
l.Info("new count detected and is the highest in the window, updating service registry...", "highest", wind.getHighest(), "new-count", newCount)
87+
r.srvRecon.cacheSrvWatch[srvname.String()] = true
88+
r.srvRecon.Reconcile(ctrl.Request{NamespacedName: srvname})
89+
} else {
90+
l.Info("new count detected, but not highest in window: performing cooldown...", "highest", wind.getHighest(), "new-count", newCount)
91+
}
92+
} else {
93+
if newCount > countBeforeUpd {
94+
l.Info("new count detected and window is empty, updating service registry...", "old-val", countBeforeUpd, "new-val", newCount)
95+
r.srvRecon.cacheSrvWatch[srvname.String()] = true
96+
r.srvRecon.Reconcile(ctrl.Request{NamespacedName: srvname})
97+
} else {
98+
l.Info("new count detected, but not higher than current value, performing cooldown...", "old-val", countBeforeUpd, "new-val", newCount)
99+
}
74100
}
75101

76-
srvname := ktypes.NamespacedName{Namespace: req.Namespace, Name: data.srv}
77-
r.srvRecon.lock.Lock()
78-
r.srvRecon.cacheSrvWatch[srvname.String()] = true
79-
r.srvRecon.lock.Unlock()
102+
wind.values = append(wind.values, &windowValue{
103+
epsliceName: req.Name,
104+
epsliceCount: data.count,
105+
totalCount: newCount,
106+
timer: time.AfterFunc(time.Minute, func() {
107+
r.exitWindow(srvname)
108+
}),
109+
})
110+
111+
return ctrl.Result{}, nil
112+
}
113+
114+
func (r *EndpointSliceReconciler) exitWindow(srv ktypes.NamespacedName) {
115+
l := r.Log.WithName("EndpointSliceReconciler").WithValues("Window", srv)
116+
117+
wind := r.srvWindows[srv.String()]
118+
wind.lock.Lock()
119+
defer wind.lock.Unlock()
120+
121+
if len(wind.values) == 0 {
122+
l.V(2).Info("window is empty, returning...")
123+
return
124+
}
125+
126+
oldestVal := wind.values[0]
127+
oldestVal.timer.Stop()
128+
defer func() {
129+
wind.values = wind.values[1:]
130+
}()
131+
132+
highestVal := r.epsliceCounter.getSrvCount(srv.Namespace, srv.Name)
133+
for i := 1; i < len(wind.values); i++ {
134+
if wind.values[i].totalCount > highestVal {
135+
highestVal = wind.values[i].totalCount
136+
}
137+
}
138+
139+
if oldestVal.totalCount <= highestVal && len(wind.values) > 1 {
140+
l.Info("highest value isn't changed, returning...", "exiting-value", oldestVal.totalCount, "highest", highestVal)
141+
return
142+
}
80143

81-
return r.srvRecon.Reconcile(ctrl.Request{NamespacedName: srvname})
144+
l.Info("updating service registry...", "highest-value", highestVal)
145+
r.srvRecon.cacheSrvWatch[srv.String()] = true
146+
r.srvRecon.Reconcile(ctrl.Request{NamespacedName: srv})
82147
}
83148

84149
// SetServiceReconciler sets the service reconciler, so that the endpointslice

0 commit comments

Comments
 (0)