Skip to content

Commit f0bd290

Browse files
authored
Add exclusion rules to filter unwanted events (#259)
* Add exclusion rules to filter unwanted events * Add json logic error handling * Fix format/imports with goimport * Rework tests to cater for out of order events Looks like the ordering for events received from the channel is non-deterministic. I had changed the tests in this PR to expect events in an exact order, which passed reliably on my local machine but failed when run by the GitHub Actions workflow. In this commit I've reverted the existing big picture test (the one with no exclusion rules) to not check the payload, and modified the new test (including an exclusion rule) to: a) wait 1 second for all events to be received on the channel b) verify that the excluded event is not received regardless of order c) verify that the expected number of events is received * Fix format
1 parent fc7dbd3 commit f0bd290

File tree

9 files changed

+254
-5
lines changed

9 files changed

+254
-5
lines changed

README.md

+58
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,64 @@ Open your browser to http://localhost:9090.
165165

166166
An example of a useful query is [rate(kubewatch_event_count[5m])](<http://localhost:9090/graph?g0.range_input=1h&g0.expr=rate(kubewatch_event_count%5B1m%5D)&g0.tab=0>)
167167

168+
## Event filtering
169+
170+
Events can be excluded from Sloop by adding `exclusionRules` to the config file:
171+
172+
```
173+
{
174+
"defaultNamespace": "default",
175+
"defaultKind": "Pod",
176+
"defaultLookback": "1h",
177+
[...]
178+
"exclusionRules": {
179+
"_all": [
180+
{"==": [ { "var": "metadata.namespace" }, "kube-system" ]}
181+
],
182+
"Pod": [
183+
{"==": [ { "var": "metadata.name" }, "sloop-0" ]}
184+
],
185+
"Job": [
186+
{"in": [ { "var": "metadata.name" }, [ "cron1", "cron3" ] ]}
187+
]
188+
}
189+
}`
190+
191+
```
192+
193+
Adding rules can help to reduce resources consumed by Sloop and remove unwanted noise from the UI for events that are of no interest.
194+
195+
### Limiting rules to specific kinds
196+
197+
* Rules under the special key `_all` are evaluated against events for objects of any kind
198+
* Rules under any other key are evaluated only against objects whose kind matches the key, e.g. `Pod` only applies to pods, `Job` only applies to jobs etc.
199+
200+
### Rule format and supported operations
201+
202+
Rules should follow the [JsonLogic](https://jsonlogic.com) format and are evaluated against the json representation of the Kubernetes API object related to the event (see below).
203+
204+
Available operators, such as `==` and `in` shown above, are documented [here](https://jsonlogic.com/operations.html).
205+
206+
### Data available to rule logic
207+
208+
Kubernetes API conventions for [objects](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#objects) require the following keys to exist in the json data for all resources, all of which can be referenced in rules:
209+
210+
* `metadata`
211+
* `spec`
212+
* `status`
213+
214+
Some commonly useful fields under the `metadata` [object](https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#ObjectMeta) are:
215+
216+
* `name`
217+
* `namespace`
218+
* `labels`
219+
220+
#### Type specific data
221+
222+
Some resources contain additional type-specific fields, for example `PersistentVolumeClaimSpec` objects have fields named `selector` and `storageClassName`.
223+
224+
Type specific fields for each object and their corresponding keys in the object json representation are documented in the [core API](https://pkg.go.dev/k8s.io/[email protected]/core/v1), e.g. for `PersistentVolumeClaimSpec` objects the documentation is [here](https://pkg.go.dev/k8s.io/[email protected]/core/v1#PersistentVolumeClaimSpec).
225+
168226
## Contributing
169227

170228
Refer to [CONTRIBUTING.md](CONTRIBUTING.md)<br>

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.19
55
require (
66
github.com/Jeffail/gabs/v2 v2.2.0
77
github.com/dgraph-io/badger/v2 v2.0.3
8+
github.com/diegoholiveira/jsonlogic/v3 v3.2.7
89
github.com/ghodss/yaml v1.0.0
910
github.com/golang/glog v1.0.0
1011
github.com/golang/protobuf v1.5.2
@@ -46,6 +47,8 @@ require (
4647
github.com/json-iterator/go v1.1.12 // indirect
4748
github.com/mailru/easyjson v0.7.6 // indirect
4849
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
50+
github.com/mitchellh/copystructure v1.0.0 // indirect
51+
github.com/mitchellh/reflectwalk v1.0.0 // indirect
4952
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
5053
github.com/modern-go/reflect2 v1.0.2 // indirect
5154
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect

go.sum

+6
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70d
7777
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
7878
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
7979
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
80+
github.com/diegoholiveira/jsonlogic/v3 v3.2.7 h1:awX07pFPnlntZzRNBcO4a2Ivxa77NMt+narq/6xcS0E=
81+
github.com/diegoholiveira/jsonlogic/v3 v3.2.7/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg=
8082
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
8183
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
8284
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@@ -220,8 +222,12 @@ github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
220222
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
221223
github.com/matttproud/golang_protobuf_extensions v1.0.2 h1:hAHbPm5IJGijwng3PWk09JkG9WeqChjprR5s9bBZ+OM=
222224
github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
225+
github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ=
226+
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
223227
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
224228
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
229+
github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY=
230+
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
225231
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
226232
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
227233
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=

pkg/sloop/common/utilities.go

+24
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,27 @@ func Contains(stringList []string, elem string) bool {
3737
func GetFilePath(filePath string, fileName string) string {
3838
return path.Join(filePath, fileName)
3939
}
40+
41+
func Max(x int, y int) int {
42+
if x < y {
43+
return y
44+
}
45+
return x
46+
}
47+
48+
func Truncate(text string, width int, delimiter ...string) (string, error) {
49+
d := "..."
50+
if len(delimiter) > 0 {
51+
d = delimiter[0]
52+
}
53+
d_len := len(d)
54+
if width < 0 {
55+
return "", fmt.Errorf("invalid width")
56+
}
57+
if len(text) <= width {
58+
return text, nil
59+
}
60+
r := []rune(text)
61+
truncated := r[:(Max(width, d_len) - d_len)]
62+
return string(truncated) + d, nil
63+
}

pkg/sloop/common/utilities_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,31 @@ func Test_GetFilePath(t *testing.T) {
6060
actualOutput := GetFilePath(filePrefix, fileName)
6161
assert.Equal(t, expectedOutput, actualOutput)
6262
}
63+
64+
func Test_Truncate_StringLongerThanWidth(t *testing.T) {
65+
stringLong := "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec eget odio quis felis laoreet dictum."
66+
expectedOutput := "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec eget odio quis..."
67+
actualOutput, _ := Truncate(stringLong, 80)
68+
assert.Equal(t, expectedOutput, actualOutput)
69+
}
70+
71+
func Test_Truncate_StringShorterThanWidth(t *testing.T) {
72+
stringMedium := "Lorem ipsum dolor"
73+
expectedOutput := "Lorem ipsum dolor"
74+
actualOutput, _ := Truncate(stringMedium, 80)
75+
assert.Equal(t, expectedOutput, actualOutput)
76+
}
77+
78+
func Test_Truncate_WidthShorterThanDelimiter(t *testing.T) {
79+
stringShort := "Lorem"
80+
expectedOutput := "..."
81+
actualOutput, _ := Truncate(stringShort, 1)
82+
assert.Equal(t, expectedOutput, actualOutput)
83+
}
84+
85+
func Test_Truncate_StringEmpty(t *testing.T) {
86+
stringEmpty := ""
87+
expectedOutput := ""
88+
actualOutput, _ := Truncate(stringEmpty, 1)
89+
assert.Equal(t, expectedOutput, actualOutput)
90+
}

pkg/sloop/ingress/kubewatcher.go

+53-1
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88
package ingress
99

1010
import (
11+
"bytes"
1112
"context"
1213
"encoding/json"
1314
"fmt"
15+
"reflect"
16+
"strings"
1417
"sync"
1518
"sync/atomic"
1619
"time"
1720

21+
"github.com/diegoholiveira/jsonlogic/v3"
1822
"github.com/golang/glog"
1923
"github.com/golang/protobuf/ptypes"
2024
"github.com/pkg/errors"
@@ -67,6 +71,7 @@ type kubeWatcherImpl struct {
6771
stopped bool
6872
refreshCrd *time.Ticker
6973
currentContext string
74+
exclusionRules map[string][]any
7075
}
7176

7277
var (
@@ -79,11 +84,12 @@ var (
7984
)
8085

8186
// Todo: Add additional parameters for filtering
82-
func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.KubeWatchResult, resync time.Duration, includeCrds bool, crdRefreshInterval time.Duration, masterURL string, kubeContext string, enableGranularMetrics bool) (KubeWatcher, error) {
87+
func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.KubeWatchResult, resync time.Duration, includeCrds bool, crdRefreshInterval time.Duration, masterURL string, kubeContext string, enableGranularMetrics bool, exclusionRules map[string][]any) (KubeWatcher, error) {
8388
kw := &kubeWatcherImpl{resync: resync, protection: &sync.Mutex{}}
8489
kw.stopChan = make(chan struct{})
8590
kw.crdInformers = make(map[crdGroupVersionResourceKind]*crdInformerInfo)
8691
kw.outchan = outChan
92+
kw.exclusionRules = exclusionRules
8793

8894
kw.startWellKnownInformers(kubeClient, enableGranularMetrics)
8995
if includeCrds {
@@ -304,6 +310,13 @@ func (i *kubeWatcherImpl) processUpdate(kind string, obj interface{}, watchResul
304310
}
305311
glog.V(99).Infof("processUpdate: obj json: %v", resourceJson)
306312

313+
eventExcluded := i.eventExcluded(kind, resourceJson)
314+
if eventExcluded {
315+
objName := reflect.ValueOf(obj).Elem().FieldByName("ObjectMeta").FieldByName("Name")
316+
glog.V(2).Infof("Event for object excluded: %s/%s", kind, objName)
317+
return
318+
}
319+
307320
kubeMetadata, err := kubeextractor.ExtractMetadata(resourceJson)
308321
if err != nil || kubeMetadata.Namespace == "" {
309322
// We are only grabbing namespace here for a prometheus metric, so if metadata extract fails we just log and continue
@@ -360,6 +373,45 @@ func (i *kubeWatcherImpl) refreshCrdInformers(masterURL string, kubeContext stri
360373
}
361374
}
362375

376+
func (i *kubeWatcherImpl) getExclusionRules(kind string) []any {
377+
kindRules, _ := i.exclusionRules[kind]
378+
globalRules, _ := i.exclusionRules["_all"]
379+
combinedRules := append(
380+
kindRules,
381+
globalRules...,
382+
)
383+
glog.V(common.GlogVerbose).Infof("Fetched rules: %s", combinedRules)
384+
return combinedRules
385+
}
386+
387+
func (i *kubeWatcherImpl) eventExcluded(kind string, resourceJson string) bool {
388+
filters := i.getExclusionRules(kind)
389+
for _, logic := range filters {
390+
logicJson, err := json.Marshal(logic)
391+
if err != nil {
392+
glog.Errorf(`Failed to parse event filtering rule "%s": %s`, string(logicJson), err)
393+
return false
394+
}
395+
var result bytes.Buffer
396+
err = jsonlogic.Apply(
397+
strings.NewReader(string(logicJson)),
398+
strings.NewReader(resourceJson),
399+
&result,
400+
)
401+
if err != nil {
402+
glog.Errorf(`Failed to apply event filtering rule "%s": %s`, string(logicJson), err)
403+
return false
404+
}
405+
resultBool := strings.Contains(result.String(), "true")
406+
if resultBool {
407+
truncated, _ := common.Truncate(resourceJson, 40)
408+
glog.V(2).Infof(`Event matched logic: logic="%s" resource="%s"`, string(logicJson), truncated)
409+
return true
410+
}
411+
}
412+
return false
413+
}
414+
363415
func (i *kubeWatcherImpl) Stop() {
364416
glog.Infof("Stopping kubeWatcher")
365417

pkg/sloop/ingress/kubewatcher_test.go

+77-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ func Test_bigPicture(t *testing.T) {
8181
masterURL := "url"
8282
kubeContext := "" // empty string makes things work
8383
enableGranularMetrics := true
84-
kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext, enableGranularMetrics)
84+
exclusionRules := map[string][]any{}
85+
kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext, enableGranularMetrics, exclusionRules)
8586
assert.NoError(t, err)
8687

8788
// create service and await corresponding event
@@ -100,6 +101,81 @@ func Test_bigPicture(t *testing.T) {
100101
kw.Stop()
101102
}
102103

104+
// As above but specify non-default exclusion rules to exclude events for service named s2
105+
func Test_bigPictureWithExclusionRules(t *testing.T) {
106+
newCrdClient = newTestCrdClient(reactionListOfOne) // force startCustomInformers() to use a fake clientset
107+
108+
kubeClient := kubernetesFake.NewSimpleClientset()
109+
outChan := make(chan typed.KubeWatchResult, 5)
110+
resync := 30 * time.Minute
111+
includeCrds := true
112+
masterURL := "url"
113+
kubeContext := "" // empty string makes things work
114+
enableGranularMetrics := true
115+
exclusionRules := map[string][]any{
116+
"_all": []any{
117+
map[string]any{
118+
"==": []any{
119+
map[string]any{
120+
"var": "metadata.name",
121+
},
122+
"s2",
123+
},
124+
},
125+
},
126+
}
127+
128+
kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext, enableGranularMetrics, exclusionRules)
129+
assert.NoError(t, err)
130+
131+
// create namespace
132+
ns := "ns"
133+
_, err = kubeClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{})
134+
if err != nil {
135+
t.FailNow()
136+
}
137+
138+
// create first service
139+
svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s1"}}
140+
_, err = kubeClient.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{})
141+
if err != nil {
142+
t.Fatalf("Error creating service: %v\n", err)
143+
}
144+
145+
// create second service, corresponding event should be excluded by exclusion rule
146+
svc = &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s2"}}
147+
_, err = kubeClient.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{})
148+
if err != nil {
149+
t.Fatalf("Error creating service: %v\n", err)
150+
}
151+
152+
// create third service
153+
svc = &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s3"}}
154+
_, err = kubeClient.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{})
155+
if err != nil {
156+
t.Fatalf("Error creating service: %v\n", err)
157+
}
158+
159+
eventCount := 0
160+
loop:
161+
for {
162+
select {
163+
case <-time.After(1 * time.Second):
164+
break loop
165+
case result, ok := <-outChan:
166+
if ok {
167+
eventCount++
168+
assert.NotContains(t, result.Payload, `"name":"s2"`)
169+
} else {
170+
t.Fatalf("Channel closed unexpectedly: %v\n", ok)
171+
}
172+
}
173+
}
174+
assert.Equal(t, 3, eventCount) // assert no event for service named s2
175+
176+
kw.Stop()
177+
}
178+
103179
func Test_getCrdList(t *testing.T) {
104180
crdClient, _ := newTestCrdClient(reactionError)(&rest.Config{})
105181
crdList, err := getCrdList(crdClient)

pkg/sloop/server/internal/config/config.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ type SloopConfig struct {
2929
// These fields can only come from command line
3030
ConfigFile string
3131
// These fields can only come from file because they use complex types
32-
LeftBarLinks []webserver.LinkTemplate `json:"leftBarLinks"`
33-
ResourceLinks []webserver.ResourceLinkTemplate `json:"resourceLinks"`
32+
LeftBarLinks []webserver.LinkTemplate `json:"leftBarLinks"`
33+
ResourceLinks []webserver.ResourceLinkTemplate `json:"resourceLinks"`
34+
ExclusionRules map[string][]any `json:"exclusionRules"`
3435
// Normal fields that can come from file or cmd line
3536
DisableKubeWatcher bool `json:"disableKubeWatch"`
3637
KubeWatchResyncInterval time.Duration `json:"kubeWatchResyncInterval"`
@@ -177,6 +178,7 @@ func getDefaultConfig() *SloopConfig {
177178
EnableGranularMetrics: false,
178179
PrivilegedAccess: true,
179180
BadgerDetailLogEnabled: false,
181+
ExclusionRules: map[string][]any{},
180182
}
181183
return &defaultConfig
182184
}

pkg/sloop/server/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func RealMain() error {
103103
return errors.Wrap(err, "failed to create kubernetes client")
104104
}
105105

106-
kubeWatcherSource, err = ingress.NewKubeWatcherSource(kubeClient, kubeWatchChan, conf.KubeWatchResyncInterval, conf.WatchCrds, conf.CrdRefreshInterval, conf.ApiServerHost, kubeContext, conf.EnableGranularMetrics)
106+
kubeWatcherSource, err = ingress.NewKubeWatcherSource(kubeClient, kubeWatchChan, conf.KubeWatchResyncInterval, conf.WatchCrds, conf.CrdRefreshInterval, conf.ApiServerHost, kubeContext, conf.EnableGranularMetrics, conf.ExclusionRules)
107107
if err != nil {
108108
return errors.Wrap(err, "failed to initialize kubeWatcher")
109109
}

0 commit comments

Comments
 (0)