diff --git a/README.md b/README.md index f74bd5f4..b8df189d 100644 --- a/README.md +++ b/README.md @@ -522,3 +522,27 @@ receivers: foo: bar url: http://127.0.0.1:3100/loki/api/v1/push ``` + +# Prometheus + +The Prometheus receiver emits event count metrics that Prometheus can scrape. Prometheus must +be configured to scrape the kubernetes-event-exporter metrics. + +Resource kinds and event reasons must be specified. Metrics will be emitted for only those +resources and their associated events. + +```yaml +receivers: + - name: "prometheus" + prometheus: + # Specify a prefix for all event count metrics + eventsMetricsNamePrefix: "metric_prefix_" + # Specify resource kinds and which event reasons to capture for each kind + # Only events with the given reasons for each kind will be emitted + reasonFilter: + Pod: + - "FailedMount" + - "Unhealthy" + Job: + - "DeadlineExceeded" +``` diff --git a/config.example.yaml b/config.example.yaml index 3b6e694b..5d2d07f8 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -122,3 +122,9 @@ receivers: deliveryStreamName: "kubernetes-events" region: "us-east-1" deDot: true + - name: "prometheus" + prometheus: + eventsMetricsNamePrefix: "metric_prefix_" + reasonFilter: + Pod: + - "FailedMount" diff --git a/go.mod b/go.mod index 28b541b9..83590a92 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/prometheus/exporter-toolkit v0.10.0 github.com/rs/zerolog v1.28.0 github.com/slack-go/slack v0.12.0 - github.com/stretchr/testify v1.8.1 + github.com/stretchr/testify v1.9.0 google.golang.org/api v0.107.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 k8s.io/api v0.26.7 @@ -103,6 +103,7 @@ require ( github.com/spf13/cast v1.3.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/xdg-go/scram v1.1.2 + github.com/stretchr/objx v0.5.2 // indirect go.opencensus.io v0.24.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.17.0 // indirect diff --git a/go.sum b/go.sum index 79562f97..f5c68bf9 100644 --- a/go.sum +++ b/go.sum @@ -288,6 +288,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -304,6 +306,8 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= diff --git a/pkg/sinks/prometheus.go b/pkg/sinks/prometheus.go new file mode 100644 index 00000000..d16fbe64 --- /dev/null +++ b/pkg/sinks/prometheus.go @@ -0,0 +1,104 @@ +package sinks + +import ( + "context" + "strings" + + "k8s.io/utils/strings/slices" + + "github.com/prometheus/client_golang/prometheus" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" + "github.com/rs/zerolog/log" +) + +func newGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *prometheus.GaugeVec { + v := prometheus.NewGaugeVec(opts, labelNames) + prometheus.MustRegister(v) + return v +} + +type PrometheusConfig struct { + EventsMetricsNamePrefix string `yaml:"eventsMetricsNamePrefix"` + ReasonFilter map[string][]string `yaml:"reasonFilter"` +} + +type PrometheusGaugeVec interface { + With(labels prometheus.Labels) prometheus.Gauge + Delete(labels prometheus.Labels) bool +} + +type PrometheusSink struct { + cfg *PrometheusConfig + kinds []string + metricsByKind map[string]PrometheusGaugeVec +} + +func NewPrometheusSink(config *PrometheusConfig) (Sink, error) { + if config.EventsMetricsNamePrefix == "" { + config.EventsMetricsNamePrefix = "event_exporter_" + } + + metricsByKind := map[string]PrometheusGaugeVec{} + + log.Info().Msgf("Initializing new Prometheus sink...") + kinds := []string{} + for kind := range config.ReasonFilter { + kinds = append(kinds, kind) + metricName := config.EventsMetricsNamePrefix + strings.ToLower(kind) + "_event_count" + metricLabels := []string{strings.ToLower(kind), "namespace", "reason"} + metricsByKind[kind] = newGaugeVec( + prometheus.GaugeOpts{ + Name: metricName, + Help: "Event counts for " + kind + " resources.", + }, metricLabels) + + log.Info().Msgf("Created metric: %s, will emit events: %v with additional labels: %v", kind, config.ReasonFilter[kind], metricLabels) + } + + return &PrometheusSink{ + cfg: config, + kinds: kinds, + metricsByKind: metricsByKind, + }, nil +} + +func (o *PrometheusSink) Send(_ context.Context, ev *kube.EnhancedEvent) error { + kind := ev.InvolvedObject.Kind + if slices.Contains(o.kinds, kind) { + for _, reason := range o.cfg.ReasonFilter[kind] { + if ev.Reason == reason { + SetEventCount(o.metricsByKind[kind], ev.InvolvedObject, reason, ev.Count) + } else { + DeleteEventCount(o.metricsByKind[kind], ev.InvolvedObject, reason) + } + } + } + + return nil +} + +func (o *PrometheusSink) Close() { + // No-op +} + +func getMetricLabels(obj kube.EnhancedObjectReference, reason string) prometheus.Labels { + prometheusLabels := prometheus.Labels{ + strings.ToLower(obj.Kind): obj.Name, + "namespace": obj.Namespace, + "reason": reason, + } + + return prometheusLabels +} + +func SetEventCount(metric PrometheusGaugeVec, obj kube.EnhancedObjectReference, reason string, count int32) { + labels := getMetricLabels(obj, reason) + log.Info().Msgf("Setting event count metric with labels: %v", labels) + metric.With(labels).Set(float64(count)) +} + +func DeleteEventCount(metric PrometheusGaugeVec, obj kube.EnhancedObjectReference, reason string) { + labels := getMetricLabels(obj, reason) + log.Info().Msgf("Deleting event count metric with labels: %v", labels) + metric.Delete(labels) +} diff --git a/pkg/sinks/prometheus_test.go b/pkg/sinks/prometheus_test.go new file mode 100644 index 00000000..1dfc3eaf --- /dev/null +++ b/pkg/sinks/prometheus_test.go @@ -0,0 +1,137 @@ +package sinks + +import ( + "context" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" + "github.com/stretchr/testify/mock" +) + +type mockGauge struct { + mock.Mock + prometheus.Gauge +} + +func (m *mockGauge) Set(count float64) { + m.Called(count) +} + +type mockGuageVec struct { + mock.Mock + *prometheus.GaugeVec +} + +func (v *mockGuageVec) With(labels prometheus.Labels) prometheus.Gauge { + withArgs := v.Called(labels) + return withArgs.Get(0).(prometheus.Gauge) +} + +func (v *mockGuageVec) Delete(labels prometheus.Labels) bool { + deleteArgs := v.Called(labels) + return deleteArgs.Get(0).(bool) +} + +func mockEvent(kind string, name string, namespace string, reason string, count int32) *kube.EnhancedEvent { + ev := &kube.EnhancedEvent{} + ev.Reason = reason + ev.Count = count + ev.InvolvedObject.Kind = kind + ev.InvolvedObject.Name = name + ev.InvolvedObject.Namespace = namespace + + return ev +} + +func TestPrometheusSink_Send(t *testing.T) { + configKind := "Pod" + configReason := "Starting" + testEvent := mockEvent("Pod", "testpod", "testnamespace", "Starting", 1) + + tests := []struct { + name string + configKind string + configReason string + ev *kube.EnhancedEvent + wantPrometheusLabels prometheus.Labels + wantErr bool + wantSetCalled bool + wantDeleteCalled bool + }{ + { + name: "emits desired resource event", + configKind: configKind, + configReason: configReason, + ev: testEvent, + wantPrometheusLabels: prometheus.Labels{ + strings.ToLower(configKind): testEvent.InvolvedObject.Name, + "namespace": testEvent.InvolvedObject.Namespace, + "reason": configReason, + }, + wantErr: false, + wantSetCalled: true, + wantDeleteCalled: false, + }, + { + name: "deletes desired resource event", + configKind: configKind, + configReason: "Creating", + ev: testEvent, + wantPrometheusLabels: prometheus.Labels{ + strings.ToLower(configKind): testEvent.InvolvedObject.Name, + "namespace": testEvent.InvolvedObject.Namespace, + "reason": "Creating", + }, + wantErr: false, + wantSetCalled: false, + wantDeleteCalled: true, + }, + { + name: "does nothing if kind is not expected", + configKind: "ReplicaSet", + configReason: "SuccessfulCreate", + ev: testEvent, + wantPrometheusLabels: prometheus.Labels{}, + wantErr: false, + wantSetCalled: false, + wantDeleteCalled: false, + }, + } + for _, tt := range tests { + mockGauge := &mockGauge{} + mockGauge.On("Set", mock.Anything).Return() + mockPodMetric := &mockGuageVec{} + mockPodMetric.On("With", mock.Anything).Return(mockGauge) + mockPodMetric.On("Delete", mock.Anything).Return(true) + + t.Run(tt.name, func(t *testing.T) { + o := &PrometheusSink{ + cfg: &PrometheusConfig{ + EventsMetricsNamePrefix: "test_prefix_", + ReasonFilter: map[string][]string{tt.configKind: {tt.configReason}}, + }, + kinds: []string{tt.configKind}, + metricsByKind: map[string]PrometheusGaugeVec{tt.configKind: mockPodMetric}, + } + if err := o.Send(context.TODO(), tt.ev); (err != nil) != tt.wantErr { + t.Errorf("PrometheusSink.Send() error = %v, wantErr %v", err, tt.wantErr) + } + + if tt.wantSetCalled { + mockPodMetric.AssertCalled(t, "With", tt.wantPrometheusLabels) + mockGauge.AssertCalled(t, "Set", float64(1)) + } else { + mockPodMetric.AssertNotCalled(t, "With") + mockGauge.AssertNotCalled(t, "Set") + } + + if tt.wantDeleteCalled { + mockPodMetric.AssertCalled(t, "Delete", tt.wantPrometheusLabels) + } else { + mockPodMetric.AssertNotCalled(t, "Delete") + } + }) + } +} diff --git a/pkg/sinks/receiver.go b/pkg/sinks/receiver.go index 21fc35a5..1f68d8e1 100644 --- a/pkg/sinks/receiver.go +++ b/pkg/sinks/receiver.go @@ -26,6 +26,7 @@ type ReceiverConfig struct { BigQuery *BigQueryConfig `yaml:"bigquery"` EventBridge *EventBridgeConfig `yaml:"eventbridge"` Pipe *PipeConfig `yaml:"pipe"` + Prometheus *PrometheusConfig `yaml:"prometheus"` } func (r *ReceiverConfig) Validate() error { @@ -122,5 +123,9 @@ func (r *ReceiverConfig) GetSink() (Sink, error) { return NewLoki(r.Loki) } + if r.Prometheus != nil { + return NewPrometheusSink(r.Prometheus) + } + return nil, errors.New("unknown sink") }