@@ -21,6 +21,8 @@ import (
2121 "sync"
2222 "time"
2323
24+ "go.uber.org/atomic"
25+
2426 "github.com/prometheus/common/model"
2527
2628 "github.com/prometheus/alertmanager/notify"
@@ -41,8 +43,7 @@ type Dispatcher struct {
4143 timeout func (time.Duration ) time.Duration
4244
4345 mtx sync.RWMutex
44- aggrGroupsPerRoute map [* Route ]map [model.Fingerprint ]* aggrGroup
45- aggrGroupsNum int
46+ aggrGroupsPerRoute routeGroups
4647
4748 done chan struct {}
4849 ctx context.Context
@@ -84,8 +85,10 @@ func (d *Dispatcher) Run() {
8485 d .done = make (chan struct {})
8586
8687 d .mtx .Lock ()
87- d .aggrGroupsPerRoute = map [* Route ]map [model.Fingerprint ]* aggrGroup {}
88- d .aggrGroupsNum = 0
88+ d .aggrGroupsPerRoute = routeGroups {
89+ groupsNum : & atomic.Int64 {},
90+ limits : d .limits ,
91+ }
8992 d .metrics .aggrGroups .Set (0 )
9093 d .ctx , d .cancel = context .WithCancel (context .Background ())
9194 d .mtx .Unlock ()
@@ -134,40 +137,58 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
134137}
135138
136139func (d * Dispatcher ) doMaintenance () {
137- d .mtx .Lock ()
138- defer d .mtx .Unlock ()
139- for _ , groups := range d .aggrGroupsPerRoute {
140- for _ , ag := range groups {
140+ type groupToRemove struct {
141+ route * Route
142+ fp model.Fingerprint
143+ ag * aggrGroup
144+ }
145+
146+ var toRemove []groupToRemove
147+
148+ // First pass: collect groups to remove
149+ d .aggrGroupsPerRoute .Range (func (route * Route , groups * fingerprintGroups ) bool {
150+ groups .Range (func (fp model.Fingerprint , ag * aggrGroup ) bool {
141151 if ag .empty () {
142- ag .stop ()
143- d .marker .DeleteByGroupKey (ag .routeID , ag .GroupKey ())
144- delete (groups , ag .fingerprint ())
145- d .aggrGroupsNum --
146- d .metrics .aggrGroups .Dec ()
152+ toRemove = append (toRemove , groupToRemove {route , fp , ag })
147153 }
154+ return true
155+ })
156+ return true
157+ })
158+
159+ // Second pass: remove collected groups
160+ for _ , item := range toRemove {
161+ item .ag .stop ()
162+ d .marker .DeleteByGroupKey (item .ag .routeID , item .ag .GroupKey ())
163+ groupsMap := d .aggrGroupsPerRoute .getRoute (item .route )
164+ if groupsMap != nil {
165+ groupsMap .removeGroup (item .fp )
166+ d .metrics .aggrGroups .Dec ()
148167 }
149168 }
150169}
151170
152171// Groups returns a slice of AlertGroups from the dispatcher's internal state.
153172func (d * Dispatcher ) Groups (routeFilter func (* Route ) bool , alertFilter func (* types.Alert , time.Time ) bool ) (AlertGroups , map [model .Fingerprint ][]string ) {
154- groups := AlertGroups {}
155-
156- d .mtx .RLock ()
157- defer d .mtx .RUnlock ()
173+ // Snapshot the outer map in routeGroups to
174+ // avoid holding the read lock when dispatcher has 1000s of aggregation groups.
175+ routeGroups := routeGroups {}
176+ d .aggrGroupsPerRoute .Range (func (route * Route , groups * fingerprintGroups ) bool {
177+ routeGroups .addRoute (route )
178+ return true
179+ })
158180
159- // Keep a list of receivers for an alert to prevent checking each alert
160- // again against all routes. The alert has already matched against this
161- // route on ingestion.
181+ // TODO: move this processing out of Dispatcher, it does not belong here.
182+ alertGroups := AlertGroups {}
162183 receivers := map [model.Fingerprint ][]string {}
163-
164184 now := time .Now ()
165- for route , ags := range d . aggrGroupsPerRoute {
185+ routeGroups . Range ( func ( route * Route , _ * fingerprintGroups ) bool {
166186 if ! routeFilter (route ) {
167- continue
187+ return true
168188 }
169189
170- for _ , ag := range ags {
190+ // Read inner fingerprintGroups if necessary.
191+ d .aggrGroupsPerRoute .getRoute (route ).Range (func (fp model.Fingerprint , ag * aggrGroup ) bool {
171192 receiver := route .RouteOpts .Receiver
172193 alertGroup := & AlertGroup {
173194 Labels : ag .labels ,
@@ -197,22 +218,24 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
197218 filteredAlerts = append (filteredAlerts , a )
198219 }
199220 if len (filteredAlerts ) == 0 {
200- continue
221+ return true
201222 }
202223 alertGroup .Alerts = filteredAlerts
203224
204- groups = append (groups , alertGroup )
205- }
206- }
207- sort .Sort (groups )
208- for i := range groups {
209- sort .Sort (groups [i ].Alerts )
225+ alertGroups = append (alertGroups , alertGroup )
226+ return true
227+ })
228+ return true
229+ })
230+ sort .Sort (alertGroups )
231+ for i := range alertGroups {
232+ sort .Sort (alertGroups [i ].Alerts )
210233 }
211234 for i := range receivers {
212235 sort .Strings (receivers [i ])
213236 }
214237
215- return groups , receivers
238+ return alertGroups , receivers
216239}
217240
218241// Stop the dispatcher.
@@ -236,42 +259,31 @@ func (d *Dispatcher) Stop() {
236259// and inserts it.
237260func (d * Dispatcher ) processAlert (alert * types.Alert , route * Route ) {
238261 groupLabels := getGroupLabels (alert , route )
239-
240262 fp := groupLabels .Fingerprint ()
241263
242- d .mtx .Lock ()
243- defer d .mtx .Unlock ()
244-
245- routeGroups , ok := d .aggrGroupsPerRoute [route ]
246- if ! ok {
247- routeGroups = map [model.Fingerprint ]* aggrGroup {}
248- d .aggrGroupsPerRoute [route ] = routeGroups
249- }
250-
251- ag , ok := routeGroups [fp ]
252- if ok {
253- ag .insert (alert )
264+ routeGroups := d .aggrGroupsPerRoute .addRoute (route )
265+ group := routeGroups .getGroup (fp )
266+ if group != nil {
267+ group .insert (alert )
254268 return
255269 }
256270
257- // If the group does not exist, create it. But check the limit first.
258- if limit := d .limits .MaxNumberOfAggregationGroups (); limit > 0 && d .aggrGroupsNum >= limit {
271+ // If the group does not exist, create it.
272+ group , count , limit := routeGroups .addGroup (fp , newAggrGroup (d .ctx , groupLabels , route , d .timeout , d .logger ))
273+ if group == nil {
274+ // Rate limited.
259275 d .metrics .aggrGroupLimitReached .Inc ()
260- d .logger .Error ("Too many aggregation groups, cannot create new group for alert" , "groups" , d . aggrGroupsNum , "limit" , limit , "alert" , alert . Name () )
276+ d .logger .Error ("Too many aggregation groups, cannot create new group for alert" , "groups" , count , "limit" , limit )
261277 return
262278 }
263-
264- ag = newAggrGroup (d .ctx , groupLabels , route , d .timeout , d .logger )
265- routeGroups [fp ] = ag
266- d .aggrGroupsNum ++
267279 d .metrics .aggrGroups .Inc ()
268280
269281 // Insert the 1st alert in the group before starting the group's run()
270282 // function, to make sure that when the run() will be executed the 1st
271283 // alert is already there.
272- ag .insert (alert )
284+ group .insert (alert )
273285
274- go ag .run (func (ctx context.Context , alerts ... * types.Alert ) bool {
286+ go group .run (func (ctx context.Context , alerts ... * types.Alert ) bool {
275287 _ , _ , err := d .stage .Exec (ctx , d .logger , alerts ... )
276288 if err != nil {
277289 logger := d .logger .With ("num_alerts" , len (alerts ), "err" , err )
0 commit comments