Skip to content

Commit 9d9a110

Browse files
Support resource level rule replacement in hotspot module (alibaba#373)
1 parent c1fd2a4 commit 9d9a110

File tree

2 files changed

+419
-90
lines changed

2 files changed

+419
-90
lines changed

core/hotspot/rule_manager.go

+180-85
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ var (
3535
tcGenFuncMap = make(map[ControlBehavior]TrafficControllerGenFunc, 4)
3636
tcMap = make(trafficControllerMap)
3737
tcMux = new(sync.RWMutex)
38-
currentRules = make([]*Rule, 0)
38+
currentRules = make(map[string][]*Rule, 0)
3939
updateRuleMux = new(sync.Mutex)
4040
)
4141

@@ -82,27 +82,36 @@ func getTrafficControllersFor(res string) []TrafficShapingController {
8282
return tcMap[res]
8383
}
8484

85-
// LoadRules replaces old rules with the given hotspot parameter flow control rules. Return value:
86-
//
87-
// bool: indicates whether the internal map has been changed;
88-
// error: indicates whether occurs the error.
85+
// LoadRules replaces all old hotspot param flow rules with the given rules.
86+
// Return value:
87+
// bool: indicates whether the internal map has been changed;
88+
// error: indicates whether occurs the error.
8989
func LoadRules(rules []*Rule) (bool, error) {
90+
resRulesMap := make(map[string][]*Rule, 16)
91+
for _, rule := range rules {
92+
resRules, exists := resRulesMap[rule.Resource]
93+
if !exists {
94+
resRules = make([]*Rule, 0, 1)
95+
}
96+
resRulesMap[rule.Resource] = append(resRules, rule)
97+
}
98+
9099
updateRuleMux.Lock()
91100
defer updateRuleMux.Unlock()
92-
isEqual := reflect.DeepEqual(currentRules, rules)
101+
isEqual := reflect.DeepEqual(currentRules, resRulesMap)
93102
if isEqual {
94103
logging.Info("[HotSpot] Load rules is the same with current rules, so ignore load operation.")
95104
return false, nil
96105
}
97106

98-
err := onRuleUpdate(rules)
107+
err := onRuleUpdate(resRulesMap)
99108
return true, err
100109
}
101110

102-
// GetRules returns all the rules based on copy.
103-
// It doesn't take effect for hotspot module if user changes the rule.
111+
// GetRules returns all the hotspot param flow rules based on copy.
112+
// It doesn't take effect for hotspot module if user changes the returned rules.
104113
// GetRules need to compete hotspot module's global lock and the high performance losses of copy,
105-
// reduce or do not call GetRules if possible
114+
// reduce or do not call GetRules if possible.
106115
func GetRules() []Rule {
107116
tcMux.RLock()
108117
rules := rulesFrom(tcMap)
@@ -115,10 +124,10 @@ func GetRules() []Rule {
115124
return ret
116125
}
117126

118-
// GetRulesOfResource returns specific resource's rules based on copy.
119-
// It doesn't take effect for hotspot module if user changes the rule.
127+
// GetRulesOfResource returns specific resource's hotspot parameter flow control rules based on copy.
128+
// It doesn't take effect for hotspot module if user changes the returned rules.
120129
// GetRulesOfResource need to compete hotspot module's global lock and the high performance losses of copy,
121-
// reduce or do not call GetRulesOfResource frequently if possible
130+
// reduce or do not call GetRulesOfResource frequently if possible.
122131
func GetRulesOfResource(res string) []Rule {
123132
tcMux.RLock()
124133
resTcs := tcMap[res]
@@ -131,13 +140,19 @@ func GetRulesOfResource(res string) []Rule {
131140
return ret
132141
}
133142

134-
// ClearRules clears all parameter flow rules.
143+
// ClearRules clears all hotspot param flow rules.
135144
func ClearRules() error {
136145
_, err := LoadRules(nil)
137146
return err
138147
}
139148

140-
func onRuleUpdate(rules []*Rule) (err error) {
149+
// ClearRulesOfResource clears resource level hotspot param flow rules.
150+
func ClearRulesOfResource(res string) error {
151+
_, err := LoadRulesOfResource(res, nil)
152+
return err
153+
}
154+
155+
func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) {
141156
defer func() {
142157
if r := recover(); r != nil {
143158
var ok bool
@@ -148,24 +163,20 @@ func onRuleUpdate(rules []*Rule) (err error) {
148163
}
149164
}()
150165

151-
newRuleMap := make(map[string][]*Rule, len(rules))
152-
for _, r := range rules {
153-
if err := IsValidRule(r); err != nil {
154-
logging.Warn("[HotSpot onRuleUpdate] Ignoring invalid hotspot rule when loading new rules", "rule", r, "err", err.Error())
155-
continue
166+
// ignore invalid rules
167+
validResRulesMap := make(map[string][]*Rule, len(rawResRulesMap))
168+
for res, rules := range rawResRulesMap {
169+
validResRules := make([]*Rule, 0, len(rules))
170+
for _, rule := range rules {
171+
if err := IsValidRule(rule); err != nil {
172+
logging.Warn("[HotSpot onRuleUpdate] Ignoring invalid hotspot param flow rule when loading new rules", "rule", rule, "err", err.Error())
173+
continue
174+
}
175+
validResRules = append(validResRules, rule)
156176
}
157-
res := r.ResourceName()
158-
ruleSet, ok := newRuleMap[res]
159-
if !ok {
160-
ruleSet = make([]*Rule, 0, 1)
177+
if len(validResRules) > 0 {
178+
validResRulesMap[res] = validResRules
161179
}
162-
ruleSet = append(ruleSet, r)
163-
newRuleMap[res] = ruleSet
164-
}
165-
166-
m := make(trafficControllerMap)
167-
for res, rules := range newRuleMap {
168-
m[res] = make([]TrafficShapingController, 0, len(rules))
169180
}
170181

171182
start := util.CurrentTimeNano()
@@ -179,49 +190,11 @@ func onRuleUpdate(rules []*Rule) (err error) {
179190
}
180191
tcMux.RUnlock()
181192

182-
for res, resRules := range newRuleMap {
183-
emptyTcList := make([]TrafficShapingController, 0, 0)
184-
for _, r := range resRules {
185-
oldResTcs := tcMapClone[res]
186-
if oldResTcs == nil {
187-
oldResTcs = emptyTcList
188-
}
189-
190-
equalIdx, reuseStatIdx := calculateReuseIndexFor(r, oldResTcs)
191-
// there is equivalent rule in old traffic shaping controller slice
192-
if equalIdx >= 0 {
193-
equalOldTC := oldResTcs[equalIdx]
194-
insertTcToTcMap(equalOldTC, res, m)
195-
// remove old tc from old resTcs
196-
tcMapClone[res] = append(oldResTcs[:equalIdx], oldResTcs[equalIdx+1:]...)
197-
continue
198-
}
199-
200-
// generate new traffic shaping controller
201-
generator, supported := tcGenFuncMap[r.ControlBehavior]
202-
if !supported {
203-
logging.Warn("[HotSpot onRuleUpdate] Ignoring the frequent param flow rule due to unsupported control behavior", "rule", r)
204-
continue
205-
}
206-
var tc TrafficShapingController
207-
if reuseStatIdx >= 0 {
208-
// generate new traffic shaping controller with reusable statistic metric.
209-
tc = generator(r, oldResTcs[reuseStatIdx].BoundMetric())
210-
} else {
211-
tc = generator(r, nil)
212-
}
213-
if tc == nil {
214-
logging.Debug("[HotSpot onRuleUpdate] Ignoring the frequent param flow rule due to bad generated traffic controller", "rule", r)
215-
continue
216-
}
217-
218-
// remove the reused traffic shaping controller old res tcs
219-
if reuseStatIdx >= 0 {
220-
tcMapClone[res] = append(oldResTcs[:reuseStatIdx], oldResTcs[reuseStatIdx+1:]...)
221-
}
222-
insertTcToTcMap(tc, res, m)
223-
}
193+
m := make(trafficControllerMap, len(validResRulesMap))
194+
for res, rules := range validResRulesMap {
195+
m[res] = buildResourceTrafficShapingController(res, rules, tcMapClone[res])
224196
}
197+
225198
for res, tcs := range m {
226199
if len(tcs) > 0 {
227200
// update resource slot chain
@@ -237,14 +210,102 @@ func onRuleUpdate(rules []*Rule) (err error) {
237210

238211
tcMux.Lock()
239212
tcMap = m
240-
currentRules = rules
241213
tcMux.Unlock()
242214

243-
logging.Debug("[HotSpot onRuleUpdate] Time statistic(ns) for updating hotSpot rule", "timeCost", util.CurrentTimeNano()-start)
244-
logRuleUpdate(newRuleMap)
215+
currentRules = rawResRulesMap
216+
217+
logging.Debug("[HotSpot onRuleUpdate] Time statistic(ns) for updating hotspot param flow rules", "timeCost", util.CurrentTimeNano()-start)
218+
logRuleUpdate(validResRulesMap)
219+
return nil
220+
}
221+
222+
func onResourceRuleUpdate(res string, rawResRules []*Rule) (err error) {
223+
defer func() {
224+
if r := recover(); r != nil {
225+
var ok bool
226+
err, ok = r.(error)
227+
if !ok {
228+
err = fmt.Errorf("%v", r)
229+
}
230+
}
231+
}()
232+
233+
validResRules := make([]*Rule, 0, len(rawResRules))
234+
for _, rule := range rawResRules {
235+
if err := IsValidRule(rule); err != nil {
236+
logging.Warn("[HotSpot onResourceRuleUpdate] Ignoring invalid hotspot param flow rule", "rule", rule, "reason", err.Error())
237+
continue
238+
}
239+
validResRules = append(validResRules, rule)
240+
}
241+
242+
start := util.CurrentTimeNano()
243+
oldResTcs := make([]TrafficShapingController, 0, 8)
244+
tcMux.RLock()
245+
oldResTcs = append(oldResTcs, tcMap[res]...)
246+
tcMux.RUnlock()
247+
248+
newResTcs := buildResourceTrafficShapingController(res, validResRules, oldResTcs)
249+
if len(newResTcs) > 0 {
250+
// update resource slot chain
251+
misc.RegisterRuleCheckSlotForResource(res, DefaultSlot)
252+
for _, tc := range newResTcs {
253+
if tc.BoundRule().MetricType == Concurrency {
254+
misc.RegisterStatSlotForResource(res, DefaultConcurrencyStatSlot)
255+
break
256+
}
257+
}
258+
}
259+
260+
tcMux.Lock()
261+
if len(newResTcs) == 0 {
262+
delete(tcMap, res)
263+
} else {
264+
tcMap[res] = newResTcs
265+
}
266+
tcMux.Unlock()
267+
268+
currentRules[res] = rawResRules
269+
270+
logging.Debug("[HotSpot onResourceRuleUpdate] Time statistic(ns) for updating hotspot param flow rules", "timeCost", util.CurrentTimeNano()-start)
271+
logging.Info("[HotSpot] load resource level hotspot param flow rules", "resource", res, "validResRules", validResRules)
245272
return nil
246273
}
247274

275+
// LoadRulesOfResource loads the given resource's hotspot param flow rules to the rule manager,
276+
// while all previous resource's rules will be replaced. The first returned value indicates whether
277+
// do real load operation, if the rules is the same with previous resource's rules, return false.
278+
func LoadRulesOfResource(res string, rules []*Rule) (bool, error) {
279+
if len(res) == 0 {
280+
return false, errors.New("empty resource")
281+
}
282+
283+
updateRuleMux.Lock()
284+
defer updateRuleMux.Unlock()
285+
286+
// clear resource rules
287+
if len(rules) == 0 {
288+
// clear resource's currentRules
289+
delete(currentRules, res)
290+
// clear tcMap
291+
tcMux.Lock()
292+
delete(tcMap, res)
293+
tcMux.Unlock()
294+
logging.Info("[HotSpot] clear resource level hotspot param flow rules", "resource", res)
295+
return true, nil
296+
}
297+
298+
// load resource level rules
299+
isEqual := reflect.DeepEqual(currentRules[res], rules)
300+
if isEqual {
301+
logging.Info("[HotSpot] Load resource level hotspot param flow rules is the same with current resource level rules, so ignore load operation.")
302+
return false, nil
303+
}
304+
305+
err := onResourceRuleUpdate(res, rules)
306+
return true, err
307+
}
308+
248309
func logRuleUpdate(m map[string][]*Rule) {
249310
rules := make([]*Rule, 0, 8)
250311
for _, rs := range m {
@@ -254,9 +315,9 @@ func logRuleUpdate(m map[string][]*Rule) {
254315
rules = append(rules, rs...)
255316
}
256317
if len(rules) == 0 {
257-
logging.Info("[HotspotRuleManager] Hotspot rules were cleared")
318+
logging.Info("[HotspotRuleManager] Hotspot param flow rules were cleared")
258319
} else {
259-
logging.Info("[HotspotRuleManager] Hotspot rules were loaded", "rules", rules)
320+
logging.Info("[HotspotRuleManager] Hotspot param flow rules were loaded", "rules", rules)
260321
}
261322
}
262323

@@ -304,14 +365,48 @@ func calculateReuseIndexFor(r *Rule, oldResTcs []TrafficShapingController) (equa
304365
return equalIdx, reuseStatIdx
305366
}
306367

307-
func insertTcToTcMap(tc TrafficShapingController, res string, m trafficControllerMap) {
308-
tcsOfRes, exists := m[res]
309-
if !exists {
310-
tcsOfRes = make([]TrafficShapingController, 0, 1)
311-
m[res] = append(tcsOfRes, tc)
312-
} else {
313-
m[res] = append(tcsOfRes, tc)
368+
// buildResourceTrafficShapingController builds TrafficShapingController slice from rules. the resource of rules must be equals to res.
369+
func buildResourceTrafficShapingController(res string, resRules []*Rule, oldResTcs []TrafficShapingController) []TrafficShapingController {
370+
newTcsOfRes := make([]TrafficShapingController, 0, len(resRules))
371+
for _, rule := range resRules {
372+
if res != rule.Resource {
373+
logging.Error(errors.Errorf("unmatched resource name, expect: %s, actual: %s", res, rule.Resource), "Unmatched resource name in hotspot.buildResourceTrafficShapingController()", "rule", rule)
374+
continue
375+
}
376+
377+
equalIdx, reuseStatIdx := calculateReuseIndexFor(rule, oldResTcs)
378+
// there is equivalent rule in old traffic shaping controller slice
379+
if equalIdx >= 0 {
380+
equalOldTC := oldResTcs[equalIdx]
381+
newTcsOfRes = append(newTcsOfRes, equalOldTC)
382+
// remove old tc from old resTcs
383+
oldResTcs = append(oldResTcs[:equalIdx], oldResTcs[equalIdx+1:]...)
384+
continue
385+
}
386+
387+
// generate new traffic shaping controller
388+
generator, supported := tcGenFuncMap[rule.ControlBehavior]
389+
if !supported {
390+
logging.Warn("[HotSpot buildResourceTrafficShapingController] Ignoring the hotspot param flow rule due to unsupported control behavior", "rule", rule)
391+
continue
392+
}
393+
var tc TrafficShapingController
394+
if reuseStatIdx >= 0 {
395+
// generate new traffic shaping controller with reusable statistic metric.
396+
tc = generator(rule, oldResTcs[reuseStatIdx].BoundMetric())
397+
// remove the reused traffic shaping controller old res tcs
398+
oldResTcs = append(oldResTcs[:reuseStatIdx], oldResTcs[reuseStatIdx+1:]...)
399+
} else {
400+
tc = generator(rule, nil)
401+
}
402+
if tc == nil {
403+
logging.Debug("[HotSpot buildResourceTrafficShapingController] Ignoring the hotspot param flow rule due to bad generated traffic controller", "rule", rule)
404+
continue
405+
}
406+
407+
newTcsOfRes = append(newTcsOfRes, tc)
314408
}
409+
return newTcsOfRes
315410
}
316411

317412
func IsValidRule(rule *Rule) error {

0 commit comments

Comments
 (0)