Skip to content

Commit 5fb7788

Browse files
Alerting: Update rules version when folder title is updated (grafana#53013)
* remove support for bus from scheduler * rename event to FolderTitleUpdated and fire only if title has changed * add method to increase version of all rules that belong to a folder * update ngalert service to subscribe to folder title change event call data store and update scheduler * add tests
1 parent b3f7dee commit 5fb7788

12 files changed

+324
-82
lines changed

pkg/events/events.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ type DataSourceCreated struct {
7171
OrgID int64 `json:"org_id"`
7272
}
7373

74-
type FolderUpdated struct {
74+
type FolderTitleUpdated struct {
7575
Timestamp time.Time `json:"timestamp"`
7676
Title string `json:"name"`
7777
ID int64 `json:"id"`

pkg/services/dashboards/service/folder_service.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"strings"
7-
"time"
87

98
"github.com/grafana/grafana/pkg/bus"
109
"github.com/grafana/grafana/pkg/events"
@@ -207,6 +206,7 @@ func (f *FolderServiceImpl) UpdateFolder(ctx context.Context, user *models.Signe
207206
}
208207

209208
dashFolder := query.Result
209+
currentTitle := dashFolder.Title
210210

211211
if !dashFolder.IsFolder {
212212
return dashboards.ErrFolderNotFound
@@ -238,14 +238,16 @@ func (f *FolderServiceImpl) UpdateFolder(ctx context.Context, user *models.Signe
238238
}
239239
cmd.Result = folder
240240

241-
if err := f.bus.Publish(ctx, &events.FolderUpdated{
242-
Timestamp: time.Now(),
243-
Title: folder.Title,
244-
ID: dash.Id,
245-
UID: dash.Uid,
246-
OrgID: orgID,
247-
}); err != nil {
248-
f.log.Error("failed to publish FolderUpdated event", "folder", folder.Title, "user", user.UserId, "error", err)
241+
if currentTitle != folder.Title {
242+
if err := f.bus.Publish(ctx, &events.FolderTitleUpdated{
243+
Timestamp: folder.Updated,
244+
Title: folder.Title,
245+
ID: dash.Id,
246+
UID: dash.Uid,
247+
OrgID: orgID,
248+
}); err != nil {
249+
f.log.Error("failed to publish FolderTitleUpdated event", "folder", folder.Title, "user", user.UserId, "error", err)
250+
}
249251
}
250252

251253
return nil

pkg/services/ngalert/models/alert_rule.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,13 @@ func (alertRule *AlertRule) Diff(rule *AlertRule, ignore ...string) cmputil.Diff
188188

189189
// AlertRuleKey is the alert definition identifier
190190
type AlertRuleKey struct {
191-
OrgID int64
192-
UID string
191+
OrgID int64 `xorm:"org_id"`
192+
UID string `xorm:"uid"`
193+
}
194+
195+
type AlertRuleKeyWithVersion struct {
196+
Version int64
197+
AlertRuleKey `xorm:"extends"`
193198
}
194199

195200
// AlertRuleGroupKey is the identifier of a group of alerts

pkg/services/ngalert/models/testing.go

+13
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/grafana/grafana-plugin-sdk-go/data"
1010

11+
models2 "github.com/grafana/grafana/pkg/models"
1112
"github.com/grafana/grafana/pkg/util"
1213
)
1314

@@ -132,6 +133,18 @@ func WithSequentialGroupIndex() AlertRuleMutator {
132133
}
133134
}
134135

136+
func WithOrgID(orgId int64) AlertRuleMutator {
137+
return func(rule *AlertRule) {
138+
rule.OrgID = orgId
139+
}
140+
}
141+
142+
func WithNamespace(namespace *models2.Folder) AlertRuleMutator {
143+
return func(rule *AlertRule) {
144+
rule.NamespaceUID = namespace.Uid
145+
}
146+
}
147+
135148
func GenerateAlertLabels(count int, prefix string) data.Labels {
136149
labels := make(data.Labels, count)
137150
for i := 0; i < count; i++ {

pkg/services/ngalert/ngalert.go

+33-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/grafana/grafana/pkg/api/routing"
1212
"github.com/grafana/grafana/pkg/bus"
13+
"github.com/grafana/grafana/pkg/events"
1314
"github.com/grafana/grafana/pkg/expr"
1415
"github.com/grafana/grafana/pkg/infra/kvstore"
1516
"github.com/grafana/grafana/pkg/infra/log"
@@ -21,6 +22,7 @@ import (
2122
"github.com/grafana/grafana/pkg/services/ngalert/eval"
2223
"github.com/grafana/grafana/pkg/services/ngalert/image"
2324
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
25+
"github.com/grafana/grafana/pkg/services/ngalert/models"
2426
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
2527
"github.com/grafana/grafana/pkg/services/ngalert/provisioning"
2628
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
@@ -162,7 +164,12 @@ func (ng *AlertNG) init() error {
162164
}
163165

164166
stateManager := state.NewManager(ng.Log, ng.Metrics.GetStateMetrics(), appUrl, store, store, ng.dashboardService, ng.imageService, clk)
165-
scheduler := schedule.NewScheduler(schedCfg, appUrl, stateManager, ng.bus)
167+
scheduler := schedule.NewScheduler(schedCfg, appUrl, stateManager)
168+
169+
// if it is required to include folder title to the alerts, we need to subscribe to changes of alert title
170+
if !ng.Cfg.UnifiedAlerting.ReservedLabels.IsReservedLabelDisabled(models.FolderTitleLabel) {
171+
subscribeToFolderChanges(ng.Log, ng.bus, store, scheduler)
172+
}
166173

167174
ng.stateManager = stateManager
168175
ng.schedule = scheduler
@@ -207,6 +214,31 @@ func (ng *AlertNG) init() error {
207214
return DeclareFixedRoles(ng.accesscontrol)
208215
}
209216

217+
func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore store.RuleStore, scheduler schedule.ScheduleService) {
218+
// if folder title is changed, we update all alert rules in that folder to make sure that all peers (in HA mode) will update folder title and
219+
// clean up the current state
220+
bus.AddEventListener(func(ctx context.Context, e *events.FolderTitleUpdated) error {
221+
// do not block the upstream execution
222+
go func(evt *events.FolderTitleUpdated) {
223+
logger.Debug("Got folder title updated event. updating rules in the folder", "folder_uid", evt.UID)
224+
updated, err := dbStore.IncreaseVersionForAllRulesInNamespace(context.Background(), evt.OrgID, evt.UID)
225+
if err != nil {
226+
logger.Error("Failed to update alert rules in the folder after its title was changed", "err", err, "folder_uid", evt.UID, "folder", evt.Title)
227+
return
228+
}
229+
if len(updated) > 0 {
230+
logger.Debug("rules that belong to the folder have been updated successfully. clearing their status", "updated_rules", len(updated))
231+
for _, key := range updated {
232+
scheduler.UpdateAlertRule(key.AlertRuleKey, key.Version)
233+
}
234+
} else {
235+
logger.Debug("no alert rules found in the folder. nothing to update", "folder_uid", evt.UID, "folder", evt.Title)
236+
}
237+
}(e)
238+
return nil
239+
})
240+
}
241+
210242
// Run starts the scheduler and Alertmanager.
211243
func (ng *AlertNG) Run(ctx context.Context) error {
212244
ng.Log.Debug("ngalert starting")

pkg/services/ngalert/ngalert_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package ngalert
2+
3+
import (
4+
"context"
5+
"math/rand"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/mock"
10+
"github.com/stretchr/testify/require"
11+
12+
busmock "github.com/grafana/grafana/pkg/bus/mock"
13+
"github.com/grafana/grafana/pkg/events"
14+
"github.com/grafana/grafana/pkg/infra/log"
15+
models2 "github.com/grafana/grafana/pkg/models"
16+
"github.com/grafana/grafana/pkg/services/ngalert/models"
17+
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
18+
"github.com/grafana/grafana/pkg/services/ngalert/store"
19+
"github.com/grafana/grafana/pkg/util"
20+
)
21+
22+
func Test_subscribeToFolderChanges(t *testing.T) {
23+
orgID := rand.Int63()
24+
folder := &models2.Folder{
25+
Id: 0,
26+
Uid: util.GenerateShortUID(),
27+
Title: "Folder" + util.GenerateShortUID(),
28+
}
29+
rules := models.GenerateAlertRules(5, models.AlertRuleGen(models.WithOrgID(orgID), models.WithNamespace(folder)))
30+
31+
bus := busmock.New()
32+
db := store.NewFakeRuleStore(t)
33+
db.Folders[orgID] = append(db.Folders[orgID], folder)
34+
db.PutRule(context.Background(), rules...)
35+
36+
scheduler := &schedule.FakeScheduleService{}
37+
scheduler.EXPECT().UpdateAlertRule(mock.Anything, mock.Anything).Return()
38+
39+
subscribeToFolderChanges(log.New("test"), bus, db, scheduler)
40+
41+
err := bus.Publish(context.Background(), &events.FolderTitleUpdated{
42+
Timestamp: time.Now(),
43+
Title: "Folder" + util.GenerateShortUID(),
44+
ID: folder.Id,
45+
UID: folder.Uid,
46+
OrgID: orgID,
47+
})
48+
require.NoError(t, err)
49+
50+
require.Eventuallyf(t, func() bool {
51+
return len(db.GetRecordedCommands(func(cmd interface{}) (interface{}, bool) {
52+
c, ok := cmd.(store.GenericRecordedQuery)
53+
if !ok || c.Name != "IncreaseVersionForAllRulesInNamespace" {
54+
return nil, false
55+
}
56+
return c, true
57+
})) > 0
58+
}, time.Second, 10*time.Millisecond, "expected to call db store method but nothing was called")
59+
60+
var calledTimes int
61+
require.Eventuallyf(t, func() bool {
62+
for _, call := range scheduler.Calls {
63+
if call.Method == "UpdateAlertRule" {
64+
calledTimes++
65+
}
66+
}
67+
return calledTimes == len(rules)
68+
}, time.Second, 10*time.Millisecond, "scheduler was expected to be called %d times but called %d", len(rules), calledTimes)
69+
70+
for _, rule := range rules {
71+
scheduler.AssertCalled(t, "UpdateAlertRule", rule.GetKey(), rule.Version)
72+
}
73+
}

pkg/services/ngalert/schedule/schedule.go

+2-42
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88

99
prometheusModel "github.com/prometheus/common/model"
1010

11-
"github.com/grafana/grafana/pkg/bus"
12-
"github.com/grafana/grafana/pkg/events"
1311
"github.com/grafana/grafana/pkg/infra/log"
1412
"github.com/grafana/grafana/pkg/models"
1513
"github.com/grafana/grafana/pkg/services/alerting"
@@ -27,23 +25,19 @@ import (
2725

2826
// ScheduleService is an interface for a service that schedules the evaluation
2927
// of alert rules.
30-
//go:generate mockery --name ScheduleService --structname FakeScheduleService --inpackage --filename schedule_mock.go
28+
//go:generate mockery --name ScheduleService --structname FakeScheduleService --inpackage --filename schedule_mock.go --with-expecter
3129
type ScheduleService interface {
3230
// Run the scheduler until the context is canceled or the scheduler returns
3331
// an error. The scheduler is terminated when this function returns.
3432
Run(context.Context) error
3533
// UpdateAlertRule notifies scheduler that a rule has been changed
3634
UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64)
37-
// UpdateAlertRulesByNamespaceUID notifies scheduler that all rules in a namespace should be updated.
38-
UpdateAlertRulesByNamespaceUID(ctx context.Context, orgID int64, uid string) error
3935
// DeleteAlertRule notifies scheduler that a rule has been changed
4036
DeleteAlertRule(key ngmodels.AlertRuleKey)
4137
// the following are used by tests only used for tests
4238
evalApplied(ngmodels.AlertRuleKey, time.Time)
4339
stopApplied(ngmodels.AlertRuleKey)
4440
overrideCfg(cfg SchedulerCfg)
45-
46-
folderUpdateHandler(ctx context.Context, evt *events.FolderUpdated) error
4741
}
4842

4943
//go:generate mockery --name AlertsSender --structname AlertsSenderMock --inpackage --filename alerts_sender_mock.go --with-expecter
@@ -97,9 +91,6 @@ type schedule struct {
9791
// current tick depends on its evaluation interval and when it was
9892
// last evaluated.
9993
schedulableAlertRules alertRulesRegistry
100-
101-
// bus is used to hook into events that should cause rule updates.
102-
bus bus.Bus
10394
}
10495

10596
// SchedulerCfg is the scheduler configuration.
@@ -117,7 +108,7 @@ type SchedulerCfg struct {
117108
}
118109

119110
// NewScheduler returns a new schedule.
120-
func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager, bus bus.Bus) *schedule {
111+
func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager) *schedule {
121112
ticker := alerting.NewTicker(cfg.C, cfg.Cfg.BaseInterval, cfg.Metrics.Ticker)
122113

123114
sch := schedule{
@@ -138,12 +129,9 @@ func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager
138129
stateManager: stateManager,
139130
minRuleInterval: cfg.Cfg.MinInterval,
140131
schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)},
141-
bus: bus,
142132
alertsSender: cfg.AlertSender,
143133
}
144134

145-
bus.AddEventListener(sch.folderUpdateHandler)
146-
147135
return &sch
148136
}
149137

@@ -165,26 +153,6 @@ func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int6
165153
ruleInfo.update(ruleVersion(lastVersion))
166154
}
167155

168-
// UpdateAlertRulesByNamespaceUID looks for the active rule evaluation for every rule in the given namespace and commands it to update the rule.
169-
func (sch *schedule) UpdateAlertRulesByNamespaceUID(ctx context.Context, orgID int64, uid string) error {
170-
q := ngmodels.ListAlertRulesQuery{
171-
OrgID: orgID,
172-
NamespaceUIDs: []string{uid},
173-
}
174-
if err := sch.ruleStore.ListAlertRules(ctx, &q); err != nil {
175-
return err
176-
}
177-
178-
for _, r := range q.Result {
179-
sch.UpdateAlertRule(ngmodels.AlertRuleKey{
180-
OrgID: orgID,
181-
UID: r.UID,
182-
}, r.Version)
183-
}
184-
185-
return nil
186-
}
187-
188156
// DeleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.
189157
func (sch *schedule) DeleteAlertRule(key ngmodels.AlertRuleKey) {
190158
// It can happen that the scheduler has deleted the alert rule before the
@@ -465,14 +433,6 @@ func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State)
465433
}
466434
}
467435

468-
// folderUpdateHandler listens for folder update events and updates all rules in the given folder.
469-
func (sch *schedule) folderUpdateHandler(ctx context.Context, evt *events.FolderUpdated) error {
470-
if sch.disableGrafanaFolder {
471-
return nil
472-
}
473-
return sch.UpdateAlertRulesByNamespaceUID(ctx, evt.OrgID, evt.UID)
474-
}
475-
476436
// overrideCfg is only used on tests.
477437
func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
478438
sch.clock = cfg.C

0 commit comments

Comments
 (0)