Skip to content

Commit 69b0ca0

Browse files
committed
Add alert lifecycle observer
Signed-off-by: Emmanuel Lodovice <[email protected]>
1 parent 2a16a63 commit 69b0ca0

File tree

13 files changed

+578
-84
lines changed

13 files changed

+578
-84
lines changed

alertobserver/alertobserver.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright 2023 Prometheus Team
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package alertobserver
15+
16+
import (
17+
"github.com/prometheus/alertmanager/types"
18+
)
19+
20+
const (
21+
EventAlertReceived string = "received"
22+
EventAlertRejected string = "rejected"
23+
EventAlertAddedToAggrGroup string = "addedAggrGroup"
24+
EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup"
25+
EventAlertPipelineStart string = "pipelineStart"
26+
EventAlertPipelinePassStage string = "pipelinePassStage"
27+
EventAlertMuted string = "muted"
28+
EventAlertSent string = "sent"
29+
EventAlertSendFailed string = "sendFailed"
30+
)
31+
32+
type AlertEventMeta map[string]interface{}
33+
34+
type LifeCycleObserver interface {
35+
Observe(event string, alerts []*types.Alert, meta AlertEventMeta)
36+
}

alertobserver/testing.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2023 Prometheus Team
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package alertobserver
15+
16+
import (
17+
"sync"
18+
19+
"github.com/prometheus/alertmanager/types"
20+
)
21+
22+
type FakeLifeCycleObserver struct {
23+
AlertsPerEvent map[string][]*types.Alert
24+
PipelineStageAlerts map[string][]*types.Alert
25+
MetaPerEvent map[string][]AlertEventMeta
26+
Mtx sync.RWMutex
27+
}
28+
29+
func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) {
30+
o.Mtx.Lock()
31+
defer o.Mtx.Unlock()
32+
if event == EventAlertPipelinePassStage {
33+
o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...)
34+
} else {
35+
o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...)
36+
}
37+
o.MetaPerEvent[event] = append(o.MetaPerEvent[event], meta)
38+
}
39+
40+
func NewFakeLifeCycleObserver() *FakeLifeCycleObserver {
41+
return &FakeLifeCycleObserver{
42+
PipelineStageAlerts: map[string][]*types.Alert{},
43+
AlertsPerEvent: map[string][]*types.Alert{},
44+
MetaPerEvent: map[string][]AlertEventMeta{},
45+
}
46+
}

api/api.go

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/prometheus/common/model"
2828
"github.com/prometheus/common/route"
2929

30+
"github.com/prometheus/alertmanager/alertobserver"
3031
apiv1 "github.com/prometheus/alertmanager/api/v1"
3132
apiv2 "github.com/prometheus/alertmanager/api/v2"
3233
"github.com/prometheus/alertmanager/cluster"
@@ -81,6 +82,9 @@ type Options struct {
8182
GroupInfoFunc func(func(*dispatch.Route) bool) dispatch.AlertGroupInfos
8283
// APICallback define the callback function that each api call will perform before returned.
8384
APICallback callback.Callback
85+
// AlertLCObserver is used to add hooks to the different alert life cycle events.
86+
// If nil then no observer methods will be invoked in the life cycle events.
87+
AlertLCObserver alertobserver.LifeCycleObserver
8488
}
8589

8690
func (o Options) validate() error {
@@ -124,6 +128,7 @@ func New(opts Options) (*API, error) {
124128
opts.Peer,
125129
log.With(l, "version", "v1"),
126130
opts.Registry,
131+
opts.AlertLCObserver,
127132
)
128133

129134
v2, err := apiv2.NewAPI(
@@ -136,6 +141,7 @@ func New(opts Options) (*API, error) {
136141
opts.Peer,
137142
log.With(l, "version", "v2"),
138143
opts.Registry,
144+
opts.AlertLCObserver,
139145
)
140146
if err != nil {
141147
return nil, err

api/v1/api.go

+30-15
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/prometheus/common/route"
3131
"github.com/prometheus/common/version"
3232

33+
"github.com/prometheus/alertmanager/alertobserver"
3334
"github.com/prometheus/alertmanager/api/metrics"
3435
"github.com/prometheus/alertmanager/cluster"
3536
"github.com/prometheus/alertmanager/config"
@@ -67,14 +68,15 @@ func setCORS(w http.ResponseWriter) {
6768

6869
// API provides registration of handlers for API routes.
6970
type API struct {
70-
alerts provider.Alerts
71-
silences *silence.Silences
72-
config *config.Config
73-
route *dispatch.Route
74-
uptime time.Time
75-
peer cluster.ClusterPeer
76-
logger log.Logger
77-
m *metrics.Alerts
71+
alerts provider.Alerts
72+
silences *silence.Silences
73+
config *config.Config
74+
route *dispatch.Route
75+
uptime time.Time
76+
peer cluster.ClusterPeer
77+
logger log.Logger
78+
m *metrics.Alerts
79+
alertLCObserver alertobserver.LifeCycleObserver
7880

7981
getAlertStatus getAlertStatusFn
8082

@@ -91,19 +93,21 @@ func New(
9193
peer cluster.ClusterPeer,
9294
l log.Logger,
9395
r prometheus.Registerer,
96+
o alertobserver.LifeCycleObserver,
9497
) *API {
9598
if l == nil {
9699
l = log.NewNopLogger()
97100
}
98101

99102
return &API{
100-
alerts: alerts,
101-
silences: silences,
102-
getAlertStatus: sf,
103-
uptime: time.Now(),
104-
peer: peer,
105-
logger: l,
106-
m: metrics.NewAlerts("v1", r),
103+
alerts: alerts,
104+
silences: silences,
105+
getAlertStatus: sf,
106+
uptime: time.Now(),
107+
peer: peer,
108+
logger: l,
109+
m: metrics.NewAlerts("v1", r),
110+
alertLCObserver: o,
107111
}
108112
}
109113

@@ -447,6 +451,10 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
447451
if err := a.Validate(); err != nil {
448452
validationErrs.Add(err)
449453
api.m.Invalid().Inc()
454+
if api.alertLCObserver != nil {
455+
m := alertobserver.AlertEventMeta{"msg": err.Error()}
456+
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
457+
}
450458
continue
451459
}
452460
validAlerts = append(validAlerts, a)
@@ -456,8 +464,15 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
456464
typ: errorInternal,
457465
err: err,
458466
}, nil)
467+
if api.alertLCObserver != nil {
468+
m := alertobserver.AlertEventMeta{"msg": err.Error()}
469+
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
470+
}
459471
return
460472
}
473+
if api.alertLCObserver != nil {
474+
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{})
475+
}
461476

462477
if validationErrs.Len() > 0 {
463478
api.respondError(w, apiError{

api/v1/api_test.go

+71-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/prometheus/common/model"
2929
"github.com/stretchr/testify/require"
3030

31+
"github.com/prometheus/alertmanager/alertobserver"
3132
"github.com/prometheus/alertmanager/config"
3233
"github.com/prometheus/alertmanager/dispatch"
3334
"github.com/prometheus/alertmanager/pkg/labels"
@@ -134,7 +135,7 @@ func TestAddAlerts(t *testing.T) {
134135
}
135136

136137
alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
137-
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
138+
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil)
138139
defaultGlobalConfig := config.DefaultGlobalConfig()
139140
route := config.Route{}
140141
api.Update(&config.Config{
@@ -153,6 +154,74 @@ func TestAddAlerts(t *testing.T) {
153154
body, _ := io.ReadAll(res.Body)
154155

155156
require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body)))
157+
158+
observer := alertobserver.NewFakeLifeCycleObserver()
159+
api.alertLCObserver = observer
160+
r, err = http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
161+
w = httptest.NewRecorder()
162+
if err != nil {
163+
t.Errorf("Unexpected error %v", err)
164+
}
165+
api.addAlerts(w, r)
166+
if tc.code == 200 {
167+
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint())
168+
} else {
169+
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint())
170+
}
171+
}
172+
}
173+
174+
func TestAddAlertsWithAlertLCObserver(t *testing.T) {
175+
now := func(offset int) time.Time {
176+
return time.Now().Add(time.Duration(offset) * time.Second)
177+
}
178+
179+
for i, tc := range []struct {
180+
start, end time.Time
181+
err bool
182+
code int
183+
}{
184+
{time.Time{}, time.Time{}, false, 200},
185+
{now(1), now(0), false, 400},
186+
{now(0), time.Time{}, true, 500},
187+
} {
188+
alerts := []model.Alert{{
189+
StartsAt: tc.start,
190+
EndsAt: tc.end,
191+
Labels: model.LabelSet{"label1": "test1"},
192+
Annotations: model.LabelSet{"annotation1": "some text"},
193+
}}
194+
b, err := json.Marshal(&alerts)
195+
if err != nil {
196+
t.Errorf("Unexpected error %v", err)
197+
}
198+
199+
alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
200+
observer := alertobserver.NewFakeLifeCycleObserver()
201+
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, observer)
202+
defaultGlobalConfig := config.DefaultGlobalConfig()
203+
route := config.Route{}
204+
api.Update(&config.Config{
205+
Global: &defaultGlobalConfig,
206+
Route: &route,
207+
})
208+
209+
r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
210+
w := httptest.NewRecorder()
211+
if err != nil {
212+
t.Errorf("Unexpected error %v", err)
213+
}
214+
215+
api.addAlerts(w, r)
216+
res := w.Result()
217+
body, _ := io.ReadAll(res.Body)
218+
219+
require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body)))
220+
if tc.code == 200 {
221+
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint())
222+
} else {
223+
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint())
224+
}
156225
}
157226
}
158227

@@ -267,7 +336,7 @@ func TestListAlerts(t *testing.T) {
267336
},
268337
} {
269338
alertsProvider := newFakeAlerts(alerts, tc.err)
270-
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
339+
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil)
271340
api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil)
272341

273342
r, err := http.NewRequest("GET", "/api/v1/alerts", nil)

api/v2/api.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist"
3838
"github.com/prometheus/alertmanager/util/callback"
3939

40+
"github.com/prometheus/alertmanager/alertobserver"
4041
"github.com/prometheus/alertmanager/api/metrics"
4142
open_api_models "github.com/prometheus/alertmanager/api/v2/models"
4243
"github.com/prometheus/alertmanager/api/v2/restapi"
@@ -76,8 +77,9 @@ type API struct {
7677
route *dispatch.Route
7778
setAlertStatus setAlertStatusFn
7879

79-
logger log.Logger
80-
m *metrics.Alerts
80+
logger log.Logger
81+
m *metrics.Alerts
82+
alertLCObserver alertobserver.LifeCycleObserver
8183

8284
Handler http.Handler
8385
}
@@ -100,6 +102,7 @@ func NewAPI(
100102
peer cluster.ClusterPeer,
101103
l log.Logger,
102104
r prometheus.Registerer,
105+
o alertobserver.LifeCycleObserver,
103106
) (*API, error) {
104107
if apiCallback == nil {
105108
apiCallback = callback.NoopAPICallback{}
@@ -115,6 +118,7 @@ func NewAPI(
115118
logger: l,
116119
m: metrics.NewAlerts("v2", r),
117120
uptime: time.Now(),
121+
alertLCObserver: o,
118122
}
119123

120124
// Load embedded swagger file.
@@ -402,19 +406,30 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
402406
if err := a.Validate(); err != nil {
403407
validationErrs.Add(err)
404408
api.m.Invalid().Inc()
409+
if api.alertLCObserver != nil {
410+
m := alertobserver.AlertEventMeta{"msg": err.Error()}
411+
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
412+
}
405413
continue
406414
}
407415
validAlerts = append(validAlerts, a)
408416
}
409417
if err := api.alerts.Put(validAlerts...); err != nil {
410418
level.Error(logger).Log("msg", "Failed to create alerts", "err", err)
419+
if api.alertLCObserver != nil {
420+
m := alertobserver.AlertEventMeta{"msg": err.Error()}
421+
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
422+
}
411423
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
412424
}
413425

414426
if validationErrs.Len() > 0 {
415427
level.Error(logger).Log("msg", "Failed to validate alerts", "err", validationErrs.Error())
416428
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
417429
}
430+
if api.alertLCObserver != nil {
431+
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{})
432+
}
418433

419434
return alert_ops.NewPostAlertsOK()
420435
}

0 commit comments

Comments
 (0)