diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 6883786dca..f0edf8a333 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -1,4 +1,4 @@ -// Copyright 2018 Prometheus Team +// Copyright Prometheus Team // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,61 +16,20 @@ package dispatch import ( "context" "errors" - "fmt" "log/slog" "sort" "sync" "time" - "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" + "github.com/prometheus/common/model" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider" - "github.com/prometheus/alertmanager/store" "github.com/prometheus/alertmanager/types" ) -// DispatcherMetrics represents metrics associated to a dispatcher. -type DispatcherMetrics struct { - aggrGroups prometheus.Gauge - processingDuration prometheus.Summary - aggrGroupLimitReached prometheus.Counter -} - -// NewDispatcherMetrics returns a new registered DispatchMetrics. -func NewDispatcherMetrics(registerLimitMetrics bool, r prometheus.Registerer) *DispatcherMetrics { - m := DispatcherMetrics{ - aggrGroups: prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "alertmanager_dispatcher_aggregation_groups", - Help: "Number of active aggregation groups", - }, - ), - processingDuration: prometheus.NewSummary( - prometheus.SummaryOpts{ - Name: "alertmanager_dispatcher_alert_processing_duration_seconds", - Help: "Summary of latencies for the processing of alerts.", - }, - ), - aggrGroupLimitReached: prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "alertmanager_dispatcher_aggregation_group_limit_reached_total", - Help: "Number of times when dispatcher failed to create new aggregation group due to limit.", - }, - ), - } - - if r != nil { - r.MustRegister(m.aggrGroups, m.processingDuration) - if registerLimitMetrics { - r.MustRegister(m.aggrGroupLimitReached) - } - } - - return &m -} - // Dispatcher sorts incoming alerts into aggregation groups and // assigns the correct notifiers to each. type Dispatcher struct { @@ -84,8 +43,7 @@ type Dispatcher struct { timeout func(time.Duration) time.Duration mtx sync.RWMutex - aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup - aggrGroupsNum int + aggrGroupsPerRoute routeGroups done chan struct{} ctx context.Context @@ -94,14 +52,6 @@ type Dispatcher struct { logger *slog.Logger } -// Limits describes limits used by Dispatcher. -type Limits interface { - // MaxNumberOfAggregationGroups returns max number of aggregation groups that dispatcher can have. - // 0 or negative value = unlimited. - // If dispatcher hits this limit, it will not create additional groups, but will log an error instead. - MaxNumberOfAggregationGroups() int -} - // NewDispatcher returns a new Dispatcher. 
func NewDispatcher( ap provider.Alerts, @@ -135,8 +85,10 @@ func (d *Dispatcher) Run() { d.done = make(chan struct{}) d.mtx.Lock() - d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{} - d.aggrGroupsNum = 0 + d.aggrGroupsPerRoute = routeGroups{ + groupsNum: &atomic.Int64{}, + limits: d.limits, + } d.metrics.aggrGroups.Set(0) d.ctx, d.cancel = context.WithCancel(context.Background()) d.mtx.Unlock() @@ -185,60 +137,58 @@ func (d *Dispatcher) run(it provider.AlertIterator) { } func (d *Dispatcher) doMaintenance() { - d.mtx.Lock() - defer d.mtx.Unlock() - for _, groups := range d.aggrGroupsPerRoute { - for _, ag := range groups { - if ag.empty() { - ag.stop() - d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey()) - delete(groups, ag.fingerprint()) - d.aggrGroupsNum-- - d.metrics.aggrGroups.Dec() - } - } + type groupToRemove struct { + route *Route + fp model.Fingerprint + ag *aggrGroup } -} -// AlertGroup represents how alerts exist within an aggrGroup. -type AlertGroup struct { - Alerts types.AlertSlice - Labels model.LabelSet - Receiver string - GroupKey string - RouteID string -} + var toRemove []groupToRemove -type AlertGroups []*AlertGroup + // First pass: collect groups to remove + d.aggrGroupsPerRoute.Range(func(route *Route, groups *fingerprintGroups) bool { + groups.Range(func(fp model.Fingerprint, ag *aggrGroup) bool { + if ag.empty() { + toRemove = append(toRemove, groupToRemove{route, fp, ag}) + } + return true + }) + return true + }) -func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] } -func (ag AlertGroups) Less(i, j int) bool { - if ag[i].Labels.Equal(ag[j].Labels) { - return ag[i].Receiver < ag[j].Receiver + // Second pass: remove collected groups + for _, item := range toRemove { + item.ag.stop() + d.marker.DeleteByGroupKey(item.ag.routeID, item.ag.GroupKey()) + groupsMap := d.aggrGroupsPerRoute.GetRoute(item.route) + if groupsMap != nil { + groupsMap.RemoveGroup(item.fp) + d.metrics.aggrGroups.Dec() + } } - return ag[i].Labels.Before(ag[j].Labels) } -func (ag AlertGroups) Len() int { return len(ag) } // Groups returns a slice of AlertGroups from the dispatcher's internal state. func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) { - groups := AlertGroups{} - - d.mtx.RLock() - defer d.mtx.RUnlock() + // Snapshot the outer map in routeGroups to + // avoid holding the read lock when dispatcher has 1000s of aggregation groups. + routeGroups := routeGroups{} + d.aggrGroupsPerRoute.Range(func(route *Route, groups *fingerprintGroups) bool { + routeGroups.AddRoute(route) + return true + }) - // Keep a list of receivers for an alert to prevent checking each alert - // again against all routes. The alert has already matched against this - // route on ingestion. + // TODO: move this processing out of Dispatcher, it does not belong here. + alertGroups := AlertGroups{} receivers := map[model.Fingerprint][]string{} - now := time.Now() - for route, ags := range d.aggrGroupsPerRoute { + routeGroups.Range(func(route *Route, _ *fingerprintGroups) bool { if !routeFilter(route) { - continue + return true } - for _, ag := range ags { + // Read inner fingerprintGroups if necessary. 
+ d.aggrGroupsPerRoute.GetRoute(route).Range(func(fp model.Fingerprint, ag *aggrGroup) bool { receiver := route.RouteOpts.Receiver alertGroup := &AlertGroup{ Labels: ag.labels, @@ -268,22 +218,24 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ filteredAlerts = append(filteredAlerts, a) } if len(filteredAlerts) == 0 { - continue + return true } alertGroup.Alerts = filteredAlerts - groups = append(groups, alertGroup) - } - } - sort.Sort(groups) - for i := range groups { - sort.Sort(groups[i].Alerts) + alertGroups = append(alertGroups, alertGroup) + return true + }) + return true + }) + sort.Sort(alertGroups) + for i := range alertGroups { + sort.Sort(alertGroups[i].Alerts) } for i := range receivers { sort.Strings(receivers[i]) } - return groups, receivers + return alertGroups, receivers } // Stop the dispatcher. @@ -303,51 +255,35 @@ func (d *Dispatcher) Stop() { <-d.done } -// notifyFunc is a function that performs notification for the alert -// with the given fingerprint. It aborts on context cancelation. -// Returns false iff notifying failed. -type notifyFunc func(context.Context, ...*types.Alert) bool - // processAlert determines in which aggregation group the alert falls // and inserts it. func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { groupLabels := getGroupLabels(alert, route) - fp := groupLabels.Fingerprint() - d.mtx.Lock() - defer d.mtx.Unlock() - - routeGroups, ok := d.aggrGroupsPerRoute[route] - if !ok { - routeGroups = map[model.Fingerprint]*aggrGroup{} - d.aggrGroupsPerRoute[route] = routeGroups - } - - ag, ok := routeGroups[fp] - if ok { - ag.insert(alert) + routeGroups := d.aggrGroupsPerRoute.AddRoute(route) + group := routeGroups.GetGroup(fp) + if group != nil { + group.insert(alert) return } - // If the group does not exist, create it. But check the limit first. - if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit { + // If the group does not exist, create it. + group, count, limit := routeGroups.AddGroup(fp, newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)) + if group == nil { + // Rate limited. d.metrics.aggrGroupLimitReached.Inc() - d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", count, "limit", limit) return } - - ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger) - routeGroups[fp] = ag - d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() // Insert the 1st alert in the group before starting the group's run() // function, to make sure that when the run() will be executed the 1st // alert is already there. - ag.insert(alert) + group.insert(alert) - go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { + go group.run(func(ctx context.Context, alerts ...*types.Alert) bool { _, _, err := d.stage.Exec(ctx, d.logger, alerts...) 
if err != nil { logger := d.logger.With("num_alerts", len(alerts), "err", err) @@ -363,183 +299,3 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { return err == nil }) } - -func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet { - groupLabels := model.LabelSet{} - for ln, lv := range alert.Labels { - if _, ok := route.RouteOpts.GroupBy[ln]; ok || route.RouteOpts.GroupByAll { - groupLabels[ln] = lv - } - } - - return groupLabels -} - -// aggrGroup aggregates alert fingerprints into groups to which a -// common set of routing options applies. -// It emits notifications in the specified intervals. -type aggrGroup struct { - labels model.LabelSet - opts *RouteOpts - logger *slog.Logger - routeID string - routeKey string - - alerts *store.Alerts - ctx context.Context - cancel func() - done chan struct{} - next *time.Timer - timeout func(time.Duration) time.Duration - - mtx sync.RWMutex - hasFlushed bool -} - -// newAggrGroup returns a new aggregation group. -func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger *slog.Logger) *aggrGroup { - if to == nil { - to = func(d time.Duration) time.Duration { return d } - } - ag := &aggrGroup{ - labels: labels, - routeID: r.ID(), - routeKey: r.Key(), - opts: &r.RouteOpts, - timeout: to, - alerts: store.NewAlerts(), - done: make(chan struct{}), - } - ag.ctx, ag.cancel = context.WithCancel(ctx) - - ag.logger = logger.With("aggrGroup", ag) - - // Set an initial one-time wait before flushing - // the first batch of notifications. - ag.next = time.NewTimer(ag.opts.GroupWait) - - return ag -} - -func (ag *aggrGroup) fingerprint() model.Fingerprint { - return ag.labels.Fingerprint() -} - -func (ag *aggrGroup) GroupKey() string { - return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels) -} - -func (ag *aggrGroup) String() string { - return ag.GroupKey() -} - -func (ag *aggrGroup) run(nf notifyFunc) { - defer close(ag.done) - defer ag.next.Stop() - - for { - select { - case now := <-ag.next.C: - // Give the notifications time until the next flush to - // finish before terminating them. - ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval)) - - // The now time we retrieve from the ticker is the only reliable - // point of time reference for the subsequent notification pipeline. - // Calculating the current time directly is prone to flaky behavior, - // which usually only becomes apparent in tests. - ctx = notify.WithNow(ctx, now) - - // Populate context with information needed along the pipeline. - ctx = notify.WithGroupKey(ctx, ag.GroupKey()) - ctx = notify.WithGroupLabels(ctx, ag.labels) - ctx = notify.WithReceiverName(ctx, ag.opts.Receiver) - ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval) - ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals) - ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals) - ctx = notify.WithRouteID(ctx, ag.routeID) - - // Wait the configured interval before calling flush again. - ag.mtx.Lock() - ag.next.Reset(ag.opts.GroupInterval) - ag.hasFlushed = true - ag.mtx.Unlock() - - ag.flush(func(alerts ...*types.Alert) bool { - return nf(ctx, alerts...) - }) - - cancel() - - case <-ag.ctx.Done(): - return - } - } -} - -func (ag *aggrGroup) stop() { - // Calling cancel will terminate all in-process notifications - // and the run() loop. - ag.cancel() - <-ag.done -} - -// insert inserts the alert into the aggregation group. 
-func (ag *aggrGroup) insert(alert *types.Alert) { - if err := ag.alerts.Set(alert); err != nil { - ag.logger.Error("error on set alert", "err", err) - } - - // Immediately trigger a flush if the wait duration for this - // alert is already over. - ag.mtx.Lock() - defer ag.mtx.Unlock() - if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) { - ag.next.Reset(0) - } -} - -func (ag *aggrGroup) empty() bool { - return ag.alerts.Empty() -} - -// flush sends notifications for all new alerts. -func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { - if ag.empty() { - return - } - - var ( - alerts = ag.alerts.List() - alertsSlice = make(types.AlertSlice, 0, len(alerts)) - resolvedSlice = make(types.AlertSlice, 0, len(alerts)) - now = time.Now() - ) - for _, alert := range alerts { - a := *alert - // Ensure that alerts don't resolve as time move forwards. - if a.ResolvedAt(now) { - resolvedSlice = append(resolvedSlice, &a) - } else { - a.EndsAt = time.Time{} - } - alertsSlice = append(alertsSlice, &a) - } - sort.Stable(alertsSlice) - - ag.logger.Debug("flushing", "alerts", fmt.Sprintf("%v", alertsSlice)) - - if notify(alertsSlice...) { - // Delete all resolved alerts as we just sent a notification for them, - // and we don't want to send another one. However, we need to make sure - // that each resolved alert has not fired again during the flush as then - // we would delete an active alert thinking it was resolved. - if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil { - ag.logger.Error("error on delete alerts", "err", err) - } - } -} - -type nilLimits struct{} - -func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 } diff --git a/dispatch/dispatch_bench_test.go b/dispatch/dispatch_bench_test.go new file mode 100644 index 0000000000..317883f991 --- /dev/null +++ b/dispatch/dispatch_bench_test.go @@ -0,0 +1,414 @@ +// Copyright Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatch + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/common/promslog" + + "github.com/prometheus/alertmanager/provider/mem" + "github.com/prometheus/alertmanager/types" +) + +// setupBenchmarkDispatcher creates a dispatcher with the specified number of aggregation groups. 
+func setupBenchmarkDispatcher(totalGroups, emptyGroups int) (*Dispatcher, func()) { + r := prometheus.NewRegistry() + marker := types.NewMarker(r) + logger := promslog.NewNopLogger() + + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil) + if err != nil { + panic(err) + } + + // Create route with fine-grained grouping to maximize group count + route := &Route{ + RouteOpts: RouteOpts{ + Receiver: "default", + GroupBy: map[model.LabelName]struct{}{"alertname": {}, "instance": {}, "job": {}}, + GroupWait: 0, + GroupInterval: 1 * time.Hour, // Long interval to avoid interference + RepeatInterval: 1 * time.Hour, + }, + } + + timeout := func(d time.Duration) time.Duration { return d } + recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, r)) + + // Start the dispatcher to initialize context + go dispatcher.Run() + + // Wait a bit for dispatcher to initialize + time.Sleep(10 * time.Millisecond) + + // Create aggregation groups by processing alerts + nonEmptyCount := totalGroups - emptyGroups + + // Create alerts that will generate the desired number of groups + for i := 0; i < totalGroups; i++ { + alert := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": model.LabelValue(fmt.Sprintf("Alert_%d", i)), + "instance": model.LabelValue(fmt.Sprintf("inst_%d", i)), + "job": model.LabelValue(fmt.Sprintf("job_%d", i)), + }, + StartsAt: time.Now().Add(-time.Minute), + EndsAt: time.Now().Add(time.Hour), + }, + UpdatedAt: time.Now(), + } + + // Put alerts only for non-empty groups + if i < nonEmptyCount { + alerts.Put(alert) + } else { + // For empty groups, put and then immediately expire the alert + alerts.Put(alert) + // Create a resolved version to make group empty + resolvedAlert := *alert + resolvedAlert.EndsAt = time.Now().Add(-time.Second) + alerts.Put(&resolvedAlert) + } + } + + // Wait for alerts to be processed and groups to be created + time.Sleep(100 * time.Millisecond) + + cleanup := func() { + dispatcher.Stop() + alerts.Close() + } + + return dispatcher, cleanup +} + +// Benchmark maintenance impact on alert processing. +func BenchmarkDispatch_100k_AggregationGroups_10k_Empty(b *testing.B) { + benchmarkProcessAlertDuringMaintenance(b, 100_000, 10_000) +} + +func BenchmarkDispatch_100k_AggregationGroups_20k_Empty(b *testing.B) { + benchmarkProcessAlertDuringMaintenance(b, 100_000, 20_000) +} + +func BenchmarkDispatch_100k_AggregationGroups_30k_Empty(b *testing.B) { + benchmarkProcessAlertDuringMaintenance(b, 100_000, 30_000) +} + +func BenchmarkDispatch_100k_AggregationGroups_40k_Empty(b *testing.B) { + benchmarkProcessAlertDuringMaintenance(b, 100_000, 40_000) +} + +func BenchmarkDispatch_100k_AggregationGroups_50k_Empty(b *testing.B) { + benchmarkProcessAlertDuringMaintenance(b, 100_000, 50_000) +} + +// Benchmark Groups() impact on alert processing. 
+func BenchmarkDispatch_20k_AggregationGroups_Groups_Impact(b *testing.B) { + benchmarkProcessAlertDuringGroups(b, 20_000, 2_000) +} + +func BenchmarkDispatch_50k_AggregationGroups_Groups_Impact(b *testing.B) { + benchmarkProcessAlertDuringGroups(b, 50_000, 5_000) +} + +func BenchmarkDispatch_100k_AggregationGroups_Groups_Impact(b *testing.B) { + benchmarkProcessAlertDuringGroups(b, 100_000, 10_000) +} + +func benchmarkProcessAlertDuringMaintenance(b *testing.B, totalGroups, emptyGroups int) { + dispatcher, cleanup := setupBenchmarkDispatcher(totalGroups-emptyGroups, 0) // Start with non-empty groups only + defer cleanup() + + // Create test route + route := &Route{ + RouteOpts: RouteOpts{ + Receiver: "test", + GroupBy: map[model.LabelName]struct{}{"alertname": {}, "instance": {}}, // Use instance for more groups + GroupWait: 0, + GroupInterval: 1 * time.Hour, + RepeatInterval: 1 * time.Hour, + }, + } + + // Pre-create test alerts for main processing + alerts := make([]*types.Alert, b.N) + for i := 0; i < b.N; i++ { + alerts[i] = &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "BenchmarkAlert", + "instance": model.LabelValue(fmt.Sprintf("bench_inst_%d", i%100)), + }, + StartsAt: time.Now().Add(-time.Minute), + EndsAt: time.Now().Add(time.Hour), + }, + UpdatedAt: time.Now(), + } + } + + b.ResetTimer() + + // Measure baseline alert processing rate (no maintenance) + start := time.Now() + for i := 0; i < b.N; i++ { + dispatcher.processAlert(alerts[i], route) + } + baselineDuration := time.Since(start) + + // Now measure processing rate during continuous maintenance with empty group creation + var maintenanceTime time.Duration + var duration time.Duration + + // Run maintenance and empty group generation in background + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + // Goroutine 1: Maintenance + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + maintenanceStart := time.Now() + for { + select { + case <-ticker.C: + dispatcher.doMaintenance() + case <-ctx.Done(): + maintenanceTime = time.Since(maintenanceStart) + return + } + } + }() + + // Goroutine 2: Continuously create empty groups for maintenance to clean up + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(100 * time.Millisecond) // Create empty groups more frequently + defer ticker.Stop() + + emptyGroupCounter := 0 + for { + select { + case <-ticker.C: + // Create a batch of empty groups for maintenance to clean up + batchSize := emptyGroups / 5 // Create 1/5th of target empty groups each cycle + if batchSize < 100 { + batchSize = 100 // Minimum batch size + } + + for i := 0; i < batchSize; i++ { + // Create alert that will form a new group + emptyAlert := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "EmptyGroupAlert", + "instance": model.LabelValue(fmt.Sprintf("empty_%d_%d", emptyGroupCounter, i)), + }, + StartsAt: time.Now().Add(-time.Minute), + EndsAt: time.Now().Add(time.Hour), + }, + UpdatedAt: time.Now(), + } + + // Process the alert to create the group + dispatcher.processAlert(emptyAlert, route) + + // Immediately resolve it to make the group empty + resolvedAlert := *emptyAlert + resolvedAlert.EndsAt = time.Now().Add(-time.Second) + dispatcher.processAlert(&resolvedAlert, route) + } + emptyGroupCounter++ + case <-ctx.Done(): + return + } + } + }() + + // Small delay to let empty group generation start + time.Sleep(50 * time.Millisecond) + + // 
Measure processing under contention + start = time.Now() + for i := 0; i < b.N; i++ { + dispatcher.processAlert(alerts[i], route) + } + duration = time.Since(start) + cancel() + wg.Wait() + + baselineRate := float64(b.N) / baselineDuration.Seconds() + rate := float64(b.N) / duration.Seconds() + impact := (duration.Seconds() - baselineDuration.Seconds()) / baselineDuration.Seconds() * 100 + + // Report metrics + b.ReportMetric(float64(maintenanceTime.Milliseconds()), "ms/maintenance") + b.ReportMetric(baselineRate, "baseline_alerts/sec") + b.ReportMetric(rate, "alerts/sec") + b.ReportMetric(impact, "maintenance_overhead_%") +} + +func benchmarkProcessAlertDuringGroups(b *testing.B, totalGroups, emptyGroups int) { + dispatcher, cleanup := setupBenchmarkDispatcher(totalGroups-emptyGroups, 0) // Start with non-empty groups only + defer cleanup() + + // Create test route + route := &Route{ + RouteOpts: RouteOpts{ + Receiver: "test", + GroupBy: map[model.LabelName]struct{}{"alertname": {}, "instance": {}}, // Use instance for more groups + GroupWait: 0, + GroupInterval: 1 * time.Hour, + RepeatInterval: 1 * time.Hour, + }, + } + + // Pre-create test alerts for main processing + alerts := make([]*types.Alert, b.N) + for i := 0; i < b.N; i++ { + alerts[i] = &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "BenchmarkAlert", + "instance": model.LabelValue(fmt.Sprintf("bench_inst_%d", i%100)), + }, + StartsAt: time.Now().Add(-time.Minute), + EndsAt: time.Now().Add(time.Hour), + }, + UpdatedAt: time.Now(), + } + } + + b.ResetTimer() + + // Measure baseline alert processing rate (no Groups() calls) + start := time.Now() + for i := 0; i < b.N; i++ { + dispatcher.processAlert(alerts[i], route) + } + baselineDuration := time.Since(start) + + // Now measure processing rate during continuous Groups() calls with empty group creation + var groupsTime time.Duration + var groupsCallCount int64 + var duration time.Duration + + // Run Groups() calls and empty group generation in background + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + // Goroutine 1: Continuous Groups() calls + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(50 * time.Millisecond) // Call Groups() frequently to measure impact + defer ticker.Stop() + + groupsStart := time.Now() + for { + select { + case <-ticker.C: + // Call Groups() with no filters to get all groups (worst case) + _, _ = dispatcher.Groups(func(*Route) bool { return true }, func(*types.Alert, time.Time) bool { return true }) + groupsCallCount++ + case <-ctx.Done(): + groupsTime = time.Since(groupsStart) + return + } + } + }() + + // Goroutine 2: Continuously create empty groups for more realistic Groups() load + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + emptyGroupCounter := 0 + for { + select { + case <-ticker.C: + // Create a batch of empty groups for Groups() to process + batchSize := emptyGroups / 10 // Create 1/10th of target empty groups each cycle + if batchSize < 50 { + batchSize = 50 // Minimum batch size + } + + for i := 0; i < batchSize; i++ { + // Create alert that will form a new group + emptyAlert := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "EmptyGroupAlert", + "instance": model.LabelValue(fmt.Sprintf("empty_%d_%d", emptyGroupCounter, i)), + }, + StartsAt: time.Now().Add(-time.Minute), + EndsAt: time.Now().Add(time.Hour), + }, + UpdatedAt: time.Now(), + } + + // Process 
the alert to create the group + dispatcher.processAlert(emptyAlert, route) + + // Immediately resolve it to make the group empty + resolvedAlert := *emptyAlert + resolvedAlert.EndsAt = time.Now().Add(-time.Second) + dispatcher.processAlert(&resolvedAlert, route) + } + emptyGroupCounter++ + case <-ctx.Done(): + return + } + } + }() + + // Small delay to let Groups() calls and empty group generation start + time.Sleep(50 * time.Millisecond) + + // Measure processing under Groups() call contention + start = time.Now() + for i := 0; i < b.N; i++ { + dispatcher.processAlert(alerts[i], route) + } + duration = time.Since(start) + cancel() + wg.Wait() + + rate := float64(b.N) / duration.Seconds() + baselineRate := float64(b.N) / baselineDuration.Seconds() + impact := (duration.Seconds() - baselineDuration.Seconds()) / baselineDuration.Seconds() * 100 + groupsRate := float64(groupsCallCount) / groupsTime.Seconds() + + // Report metrics + b.ReportMetric(float64(groupsCallCount), "groups_calls_total") + b.ReportMetric(groupsRate, "groups_calls/sec") + b.ReportMetric(baselineRate, "baseline_alerts/sec") + b.ReportMetric(rate, "alerts/sec") + b.ReportMetric(impact, "groups_overhead_%") +} diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 913032c6ce..553aaca829 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -1,4 +1,4 @@ -// Copyright 2018 Prometheus Team +// Copyright Prometheus Team // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,12 +17,12 @@ import ( "context" "fmt" "log/slog" - "reflect" - "sort" "sync" "testing" "time" + "go.uber.org/atomic" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" @@ -35,329 +35,6 @@ import ( "github.com/prometheus/alertmanager/types" ) -func TestAggrGroup(t *testing.T) { - lset := model.LabelSet{ - "a": "v1", - "b": "v2", - } - opts := &RouteOpts{ - Receiver: "n1", - GroupBy: map[model.LabelName]struct{}{ - "a": {}, - "b": {}, - }, - GroupWait: 1 * time.Second, - GroupInterval: 300 * time.Millisecond, - RepeatInterval: 1 * time.Hour, - } - route := &Route{ - RouteOpts: *opts, - } - - var ( - a1 = &types.Alert{ - Alert: model.Alert{ - Labels: model.LabelSet{ - "a": "v1", - "b": "v2", - "c": "v3", - }, - StartsAt: time.Now().Add(time.Minute), - EndsAt: time.Now().Add(time.Hour), - }, - UpdatedAt: time.Now(), - } - a2 = &types.Alert{ - Alert: model.Alert{ - Labels: model.LabelSet{ - "a": "v1", - "b": "v2", - "c": "v4", - }, - StartsAt: time.Now().Add(-time.Hour), - EndsAt: time.Now().Add(2 * time.Hour), - }, - UpdatedAt: time.Now(), - } - a3 = &types.Alert{ - Alert: model.Alert{ - Labels: model.LabelSet{ - "a": "v1", - "b": "v2", - "c": "v5", - }, - StartsAt: time.Now().Add(time.Minute), - EndsAt: time.Now().Add(5 * time.Minute), - }, - UpdatedAt: time.Now(), - } - ) - - var ( - last = time.Now() - current = time.Now() - lastCurMtx = &sync.Mutex{} - alertsCh = make(chan types.AlertSlice) - ) - - ntfy := func(ctx context.Context, alerts ...*types.Alert) bool { - // Validate that the context is properly populated. 
- if _, ok := notify.Now(ctx); !ok { - t.Errorf("now missing") - } - if _, ok := notify.GroupKey(ctx); !ok { - t.Errorf("group key missing") - } - if lbls, ok := notify.GroupLabels(ctx); !ok || !reflect.DeepEqual(lbls, lset) { - t.Errorf("wrong group labels: %q", lbls) - } - if rcv, ok := notify.ReceiverName(ctx); !ok || rcv != opts.Receiver { - t.Errorf("wrong receiver: %q", rcv) - } - if ri, ok := notify.RepeatInterval(ctx); !ok || ri != opts.RepeatInterval { - t.Errorf("wrong repeat interval: %q", ri) - } - - lastCurMtx.Lock() - last = current - // Subtract a millisecond to allow for races. - current = time.Now().Add(-time.Millisecond) - lastCurMtx.Unlock() - - alertsCh <- types.AlertSlice(alerts) - - return true - } - - removeEndsAt := func(as types.AlertSlice) types.AlertSlice { - for i, a := range as { - ac := *a - ac.EndsAt = time.Time{} - as[i] = &ac - } - return as - } - - // Test regular situation where we wait for group_wait to send out alerts. - ag := newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger()) - go ag.run(ntfy) - - ag.insert(a1) - - select { - case <-time.After(2 * opts.GroupWait): - t.Fatalf("expected initial batch after group_wait") - - case batch := <-alertsCh: - lastCurMtx.Lock() - s := time.Since(last) - lastCurMtx.Unlock() - if s < opts.GroupWait { - t.Fatalf("received batch too early after %v", s) - } - exp := removeEndsAt(types.AlertSlice{a1}) - sort.Sort(batch) - - if !reflect.DeepEqual(batch, exp) { - t.Fatalf("expected alerts %v but got %v", exp, batch) - } - } - - for i := 0; i < 3; i++ { - // New alert should come in after group interval. - ag.insert(a3) - - select { - case <-time.After(2 * opts.GroupInterval): - t.Fatalf("expected new batch after group interval but received none") - - case batch := <-alertsCh: - lastCurMtx.Lock() - s := time.Since(last) - lastCurMtx.Unlock() - if s < opts.GroupInterval { - t.Fatalf("received batch too early after %v", s) - } - exp := removeEndsAt(types.AlertSlice{a1, a3}) - sort.Sort(batch) - - if !reflect.DeepEqual(batch, exp) { - t.Fatalf("expected alerts %v but got %v", exp, batch) - } - } - } - - ag.stop() - - // Add an alert that started more than group_interval in the past. We expect - // immediate flushing. - // Finally, set all alerts to be resolved. After successful notify the aggregation group - // should empty itself. - ag = newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger()) - go ag.run(ntfy) - - ag.insert(a1) - ag.insert(a2) - - // a2 lies way in the past so the initial group_wait should be skipped. - select { - case <-time.After(opts.GroupWait / 2): - t.Fatalf("expected immediate alert but received none") - - case batch := <-alertsCh: - exp := removeEndsAt(types.AlertSlice{a1, a2}) - sort.Sort(batch) - - if !reflect.DeepEqual(batch, exp) { - t.Fatalf("expected alerts %v but got %v", exp, batch) - } - } - - for i := 0; i < 3; i++ { - // New alert should come in after group interval. - ag.insert(a3) - - select { - case <-time.After(2 * opts.GroupInterval): - t.Fatalf("expected new batch after group interval but received none") - - case batch := <-alertsCh: - lastCurMtx.Lock() - s := time.Since(last) - lastCurMtx.Unlock() - if s < opts.GroupInterval { - t.Fatalf("received batch too early after %v", s) - } - exp := removeEndsAt(types.AlertSlice{a1, a2, a3}) - sort.Sort(batch) - - if !reflect.DeepEqual(batch, exp) { - t.Fatalf("expected alerts %v but got %v", exp, batch) - } - } - } - - // Resolve an alert, and it should be removed after the next batch was sent. 
- a1r := *a1 - a1r.EndsAt = time.Now() - ag.insert(&a1r) - exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...) - - select { - case <-time.After(2 * opts.GroupInterval): - t.Fatalf("expected new batch after group interval but received none") - case batch := <-alertsCh: - lastCurMtx.Lock() - s := time.Since(last) - lastCurMtx.Unlock() - if s < opts.GroupInterval { - t.Fatalf("received batch too early after %v", s) - } - sort.Sort(batch) - - if !reflect.DeepEqual(batch, exp) { - t.Fatalf("expected alerts %v but got %v", exp, batch) - } - } - - // Resolve all remaining alerts, they should be removed after the next batch was sent. - // Do not add a1r as it should have been deleted following the previous batch. - a2r, a3r := *a2, *a3 - resolved := types.AlertSlice{&a2r, &a3r} - for _, a := range resolved { - a.EndsAt = time.Now() - ag.insert(a) - } - - select { - case <-time.After(2 * opts.GroupInterval): - t.Fatalf("expected new batch after group interval but received none") - - case batch := <-alertsCh: - lastCurMtx.Lock() - s := time.Since(last) - lastCurMtx.Unlock() - if s < opts.GroupInterval { - t.Fatalf("received batch too early after %v", s) - } - sort.Sort(batch) - - if !reflect.DeepEqual(batch, resolved) { - t.Fatalf("expected alerts %v but got %v", resolved, batch) - } - - if !ag.empty() { - t.Fatalf("Expected aggregation group to be empty after resolving alerts: %v", ag) - } - } - - ag.stop() -} - -func TestGroupLabels(t *testing.T) { - a := &types.Alert{ - Alert: model.Alert{ - Labels: model.LabelSet{ - "a": "v1", - "b": "v2", - "c": "v3", - }, - }, - } - - route := &Route{ - RouteOpts: RouteOpts{ - GroupBy: map[model.LabelName]struct{}{ - "a": {}, - "b": {}, - }, - GroupByAll: false, - }, - } - - expLs := model.LabelSet{ - "a": "v1", - "b": "v2", - } - - ls := getGroupLabels(a, route) - - if !reflect.DeepEqual(ls, expLs) { - t.Fatalf("expected labels are %v, but got %v", expLs, ls) - } -} - -func TestGroupByAllLabels(t *testing.T) { - a := &types.Alert{ - Alert: model.Alert{ - Labels: model.LabelSet{ - "a": "v1", - "b": "v2", - "c": "v3", - }, - }, - } - - route := &Route{ - RouteOpts: RouteOpts{ - GroupBy: map[model.LabelName]struct{}{}, - GroupByAll: true, - }, - } - - expLs := model.LabelSet{ - "a": "v1", - "b": "v2", - "c": "v3", - } - - ls := getGroupLabels(a, route) - - if !reflect.DeepEqual(ls, expLs) { - t.Fatalf("expected labels are %v, but got %v", expLs, ls) - } -} - func TestGroups(t *testing.T) { confData := `receivers: - name: 'kafka' @@ -750,14 +427,18 @@ func TestDispatcher_DoMaintenance(t *testing.T) { ctx := context.Background() dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, promslog.NewNopLogger(), NewDispatcherMetrics(false, r)) - aggrGroups := make(map[*Route]map[model.Fingerprint]*aggrGroup) - aggrGroups[route] = make(map[model.Fingerprint]*aggrGroup) + + // Initialize the dispatcher's aggrGroupsPerRoute directly (avoid copying the struct) + dispatcher.aggrGroupsPerRoute = routeGroups{ + groupsNum: &atomic.Int64{}, + limits: nilLimits{}, + } + groupsMap := dispatcher.aggrGroupsPerRoute.AddRoute(route) // Insert an aggregation group with no alerts. labels := model.LabelSet{"alertname": "1"} aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, promslog.NewNopLogger()) - aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1 - dispatcher.aggrGroupsPerRoute = aggrGroups + groupsMap.AddGroup(aggrGroup1.fingerprint(), aggrGroup1) // Must run otherwise doMaintenance blocks on aggrGroup1.stop(). 
go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true }) diff --git a/dispatch/group.go b/dispatch/group.go new file mode 100644 index 0000000000..3ecf4f2430 --- /dev/null +++ b/dispatch/group.go @@ -0,0 +1,225 @@ +// Copyright 2018 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatch + +import ( + "context" + "fmt" + "log/slog" + "sort" + "sync" + "time" + + "github.com/prometheus/common/model" + + "github.com/prometheus/alertmanager/notify" + "github.com/prometheus/alertmanager/store" + "github.com/prometheus/alertmanager/types" +) + +// AlertGroup represents how alerts exist within an aggrGroup. +type AlertGroup struct { + Alerts types.AlertSlice + Labels model.LabelSet + Receiver string + GroupKey string + RouteID string +} + +type AlertGroups []*AlertGroup + +func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] } +func (ag AlertGroups) Less(i, j int) bool { + if ag[i].Labels.Equal(ag[j].Labels) { + return ag[i].Receiver < ag[j].Receiver + } + return ag[i].Labels.Before(ag[j].Labels) +} +func (ag AlertGroups) Len() int { return len(ag) } + +// aggrGroup aggregates alert fingerprints into groups to which a +// common set of routing options applies. +// It emits notifications in the specified intervals. +type aggrGroup struct { + labels model.LabelSet + opts *RouteOpts + logger *slog.Logger + routeID string + routeKey string + + alerts *store.Alerts + ctx context.Context + cancel func() + done chan struct{} + next *time.Timer + timeout func(time.Duration) time.Duration + + mtx sync.RWMutex + hasFlushed bool +} + +// newAggrGroup returns a new aggregation group. +func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger *slog.Logger) *aggrGroup { + if to == nil { + to = func(d time.Duration) time.Duration { return d } + } + ag := &aggrGroup{ + labels: labels, + routeID: r.ID(), + routeKey: r.Key(), + opts: &r.RouteOpts, + timeout: to, + alerts: store.NewAlerts(), + done: make(chan struct{}), + } + ag.ctx, ag.cancel = context.WithCancel(ctx) + + ag.logger = logger.With("aggrGroup", ag) + + // Set an initial one-time wait before flushing + // the first batch of notifications. + ag.next = time.NewTimer(ag.opts.GroupWait) + + return ag +} + +func (ag *aggrGroup) fingerprint() model.Fingerprint { + return ag.labels.Fingerprint() +} + +func (ag *aggrGroup) GroupKey() string { + return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels) +} + +func (ag *aggrGroup) String() string { + return ag.GroupKey() +} + +func (ag *aggrGroup) run(nf notifyFunc) { + defer close(ag.done) + defer ag.next.Stop() + + for { + select { + case now := <-ag.next.C: + // Give the notifications time until the next flush to + // finish before terminating them. + ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval)) + + // The now time we retrieve from the ticker is the only reliable + // point of time reference for the subsequent notification pipeline. 
+ // Calculating the current time directly is prone to flaky behavior, + // which usually only becomes apparent in tests. + ctx = notify.WithNow(ctx, now) + + // Populate context with information needed along the pipeline. + ctx = notify.WithGroupKey(ctx, ag.GroupKey()) + ctx = notify.WithGroupLabels(ctx, ag.labels) + ctx = notify.WithReceiverName(ctx, ag.opts.Receiver) + ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval) + ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals) + ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals) + ctx = notify.WithRouteID(ctx, ag.routeID) + + // Wait the configured interval before calling flush again. + ag.mtx.Lock() + ag.next.Reset(ag.opts.GroupInterval) + ag.hasFlushed = true + ag.mtx.Unlock() + + ag.flush(func(alerts ...*types.Alert) bool { + return nf(ctx, alerts...) + }) + + cancel() + + case <-ag.ctx.Done(): + return + } + } +} + +func (ag *aggrGroup) stop() { + // Calling cancel will terminate all in-process notifications + // and the run() loop. + ag.cancel() + <-ag.done +} + +// insert inserts the alert into the aggregation group. +func (ag *aggrGroup) insert(alert *types.Alert) { + if err := ag.alerts.Set(alert); err != nil { + ag.logger.Error("error on set alert", "err", err) + } + + // Immediately trigger a flush if the wait duration for this + // alert is already over. + ag.mtx.Lock() + defer ag.mtx.Unlock() + if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) { + ag.next.Reset(0) + } +} + +func (ag *aggrGroup) empty() bool { + return ag.alerts.Empty() +} + +// flush sends notifications for all new alerts. +func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { + if ag.empty() { + return + } + + var ( + alerts = ag.alerts.List() + alertsSlice = make(types.AlertSlice, 0, len(alerts)) + resolvedSlice = make(types.AlertSlice, 0, len(alerts)) + now = time.Now() + ) + for _, alert := range alerts { + a := *alert + // Ensure that alerts don't resolve as time move forwards. + if a.ResolvedAt(now) { + resolvedSlice = append(resolvedSlice, &a) + } else { + a.EndsAt = time.Time{} + } + alertsSlice = append(alertsSlice, &a) + } + sort.Stable(alertsSlice) + + ag.logger.Debug("flushing", "alerts", fmt.Sprintf("%v", alertsSlice)) + + if notify(alertsSlice...) { + // Delete all resolved alerts as we just sent a notification for them, + // and we don't want to send another one. However, we need to make sure + // that each resolved alert has not fired again during the flush as then + // we would delete an active alert thinking it was resolved. + if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil { + ag.logger.Error("error on delete alerts", "err", err) + } + } +} + +func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet { + groupLabels := model.LabelSet{} + for ln, lv := range alert.Labels { + if _, ok := route.RouteOpts.GroupBy[ln]; ok || route.RouteOpts.GroupByAll { + groupLabels[ln] = lv + } + } + + return groupLabels +} diff --git a/dispatch/group_test.go b/dispatch/group_test.go new file mode 100644 index 0000000000..ac1ea97567 --- /dev/null +++ b/dispatch/group_test.go @@ -0,0 +1,352 @@ +// Copyright Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatch + +import ( + "context" + "reflect" + "sort" + "sync" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/common/promslog" + + "github.com/prometheus/alertmanager/notify" + "github.com/prometheus/alertmanager/types" +) + +func TestAggrGroup(t *testing.T) { + lset := model.LabelSet{ + "a": "v1", + "b": "v2", + } + opts := &RouteOpts{ + Receiver: "n1", + GroupBy: map[model.LabelName]struct{}{ + "a": {}, + "b": {}, + }, + GroupWait: 1 * time.Second, + GroupInterval: 300 * time.Millisecond, + RepeatInterval: 1 * time.Hour, + } + route := &Route{ + RouteOpts: *opts, + } + + var ( + a1 = &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "a": "v1", + "b": "v2", + "c": "v3", + }, + StartsAt: time.Now().Add(time.Minute), + EndsAt: time.Now().Add(time.Hour), + }, + UpdatedAt: time.Now(), + } + a2 = &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "a": "v1", + "b": "v2", + "c": "v4", + }, + StartsAt: time.Now().Add(-time.Hour), + EndsAt: time.Now().Add(2 * time.Hour), + }, + UpdatedAt: time.Now(), + } + a3 = &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "a": "v1", + "b": "v2", + "c": "v5", + }, + StartsAt: time.Now().Add(time.Minute), + EndsAt: time.Now().Add(5 * time.Minute), + }, + UpdatedAt: time.Now(), + } + ) + + var ( + last = time.Now() + current = time.Now() + lastCurMtx = &sync.Mutex{} + alertsCh = make(chan types.AlertSlice) + ) + + ntfy := func(ctx context.Context, alerts ...*types.Alert) bool { + // Validate that the context is properly populated. + if _, ok := notify.Now(ctx); !ok { + t.Errorf("now missing") + } + if _, ok := notify.GroupKey(ctx); !ok { + t.Errorf("group key missing") + } + if lbls, ok := notify.GroupLabels(ctx); !ok || !reflect.DeepEqual(lbls, lset) { + t.Errorf("wrong group labels: %q", lbls) + } + if rcv, ok := notify.ReceiverName(ctx); !ok || rcv != opts.Receiver { + t.Errorf("wrong receiver: %q", rcv) + } + if ri, ok := notify.RepeatInterval(ctx); !ok || ri != opts.RepeatInterval { + t.Errorf("wrong repeat interval: %q", ri) + } + + lastCurMtx.Lock() + last = current + // Subtract a millisecond to allow for races. + current = time.Now().Add(-time.Millisecond) + lastCurMtx.Unlock() + + alertsCh <- types.AlertSlice(alerts) + + return true + } + + removeEndsAt := func(as types.AlertSlice) types.AlertSlice { + for i, a := range as { + ac := *a + ac.EndsAt = time.Time{} + as[i] = &ac + } + return as + } + + // Test regular situation where we wait for group_wait to send out alerts. 
+ ag := newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger()) + go ag.run(ntfy) + + ag.insert(a1) + + select { + case <-time.After(2 * opts.GroupWait): + t.Fatalf("expected initial batch after group_wait") + + case batch := <-alertsCh: + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupWait { + t.Fatalf("received batch too early after %v", s) + } + exp := removeEndsAt(types.AlertSlice{a1}) + sort.Sort(batch) + + if !reflect.DeepEqual(batch, exp) { + t.Fatalf("expected alerts %v but got %v", exp, batch) + } + } + + for i := 0; i < 3; i++ { + // New alert should come in after group interval. + ag.insert(a3) + + select { + case <-time.After(2 * opts.GroupInterval): + t.Fatalf("expected new batch after group interval but received none") + + case batch := <-alertsCh: + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupInterval { + t.Fatalf("received batch too early after %v", s) + } + exp := removeEndsAt(types.AlertSlice{a1, a3}) + sort.Sort(batch) + + if !reflect.DeepEqual(batch, exp) { + t.Fatalf("expected alerts %v but got %v", exp, batch) + } + } + } + + ag.stop() + + // Add an alert that started more than group_interval in the past. We expect + // immediate flushing. + // Finally, set all alerts to be resolved. After successful notify the aggregation group + // should empty itself. + ag = newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger()) + go ag.run(ntfy) + + ag.insert(a1) + ag.insert(a2) + + // a2 lies way in the past so the initial group_wait should be skipped. + select { + case <-time.After(opts.GroupWait / 2): + t.Fatalf("expected immediate alert but received none") + + case batch := <-alertsCh: + exp := removeEndsAt(types.AlertSlice{a1, a2}) + sort.Sort(batch) + + if !reflect.DeepEqual(batch, exp) { + t.Fatalf("expected alerts %v but got %v", exp, batch) + } + } + + for i := 0; i < 3; i++ { + // New alert should come in after group interval. + ag.insert(a3) + + select { + case <-time.After(2 * opts.GroupInterval): + t.Fatalf("expected new batch after group interval but received none") + + case batch := <-alertsCh: + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupInterval { + t.Fatalf("received batch too early after %v", s) + } + exp := removeEndsAt(types.AlertSlice{a1, a2, a3}) + sort.Sort(batch) + + if !reflect.DeepEqual(batch, exp) { + t.Fatalf("expected alerts %v but got %v", exp, batch) + } + } + } + + // Resolve an alert, and it should be removed after the next batch was sent. + a1r := *a1 + a1r.EndsAt = time.Now() + ag.insert(&a1r) + exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...) + + select { + case <-time.After(2 * opts.GroupInterval): + t.Fatalf("expected new batch after group interval but received none") + case batch := <-alertsCh: + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupInterval { + t.Fatalf("received batch too early after %v", s) + } + sort.Sort(batch) + + if !reflect.DeepEqual(batch, exp) { + t.Fatalf("expected alerts %v but got %v", exp, batch) + } + } + + // Resolve all remaining alerts, they should be removed after the next batch was sent. + // Do not add a1r as it should have been deleted following the previous batch. 
+ a2r, a3r := *a2, *a3 + resolved := types.AlertSlice{&a2r, &a3r} + for _, a := range resolved { + a.EndsAt = time.Now() + ag.insert(a) + } + + select { + case <-time.After(2 * opts.GroupInterval): + t.Fatalf("expected new batch after group interval but received none") + + case batch := <-alertsCh: + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupInterval { + t.Fatalf("received batch too early after %v", s) + } + sort.Sort(batch) + + if !reflect.DeepEqual(batch, resolved) { + t.Fatalf("expected alerts %v but got %v", resolved, batch) + } + + if !ag.empty() { + t.Fatalf("Expected aggregation group to be empty after resolving alerts: %v", ag) + } + } + + ag.stop() +} + +func TestGroupLabels(t *testing.T) { + a := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "a": "v1", + "b": "v2", + "c": "v3", + }, + }, + } + + route := &Route{ + RouteOpts: RouteOpts{ + GroupBy: map[model.LabelName]struct{}{ + "a": {}, + "b": {}, + }, + GroupByAll: false, + }, + } + + expLs := model.LabelSet{ + "a": "v1", + "b": "v2", + } + + ls := getGroupLabels(a, route) + + if !reflect.DeepEqual(ls, expLs) { + t.Fatalf("expected labels are %v, but got %v", expLs, ls) + } +} + +func TestGroupByAllLabels(t *testing.T) { + a := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "a": "v1", + "b": "v2", + "c": "v3", + }, + }, + } + + route := &Route{ + RouteOpts: RouteOpts{ + GroupBy: map[model.LabelName]struct{}{}, + GroupByAll: true, + }, + } + + expLs := model.LabelSet{ + "a": "v1", + "b": "v2", + "c": "v3", + } + + ls := getGroupLabels(a, route) + + if !reflect.DeepEqual(ls, expLs) { + t.Fatalf("expected labels are %v, but got %v", expLs, ls) + } +} diff --git a/dispatch/limit.go b/dispatch/limit.go new file mode 100644 index 0000000000..4ce9639383 --- /dev/null +++ b/dispatch/limit.go @@ -0,0 +1,26 @@ +// Copyright Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatch + +// Limits describes limits used by Dispatcher. +type Limits interface { + // MaxNumberOfAggregationGroups returns max number of aggregation groups that dispatcher can have. + // 0 or negative value = unlimited. + // If dispatcher hits this limit, it will not create additional groups, but will log an error instead. + MaxNumberOfAggregationGroups() int +} + +type nilLimits struct{} + +func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 } diff --git a/dispatch/map.go b/dispatch/map.go new file mode 100644 index 0000000000..ffd9b89cc7 --- /dev/null +++ b/dispatch/map.go @@ -0,0 +1,135 @@ +// Copyright Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatch + +import ( + "sync" + + "go.uber.org/atomic" + + "github.com/prometheus/common/model" +) + +// routeGroups is a map of routes to fingerprintGroups. +// It is a nested map implementation which avoids lock contention. +// The outer map is protected by a single mutex. +// The inner maps are protected by dedicated mutexes. +// The shared atomic counter is used to track the total number of aggregation groups. +// Limits are shared between all groups. +// Each branch of the map can hold its own R(W)Lock. +type routeGroups struct { + mu sync.RWMutex + routeGroups map[*Route]*fingerprintGroups + groupsNum *atomic.Int64 + limits Limits +} + +// AddRoute adds a new route to the map, initializing the inner maps if needed. +// If the route already exists, it returns the existing fingerprintGroups. +func (rg *routeGroups) AddRoute(route *Route) *fingerprintGroups { + rg.mu.Lock() + defer rg.mu.Unlock() + if rg.routeGroups == nil { + rg.routeGroups = make(map[*Route]*fingerprintGroups) + } + if rg.routeGroups[route] == nil { + rg.routeGroups[route] = &fingerprintGroups{ + aggrGroups: make(map[model.Fingerprint]*aggrGroup), + groupsNum: rg.groupsNum, + limits: rg.limits, + } + } + return rg.routeGroups[route] +} + +// GetRoute returns the fingerprintGroups for the given route. +func (rg *routeGroups) GetRoute(route *Route) *fingerprintGroups { + rg.mu.RLock() + defer rg.mu.RUnlock() + return rg.routeGroups[route] +} + +// Range iterates over the routeGroups. +func (rg *routeGroups) Range(fn func(*Route, *fingerprintGroups) bool) { + rg.mu.RLock() + defer rg.mu.RUnlock() + for route, groups := range rg.routeGroups { + if !fn(route, groups) { + break + } + } +} + +// fingerprintGroups is a map of fingerprints to aggregation groups. +// It is protected by a dedicated RW mutex. +// It inherits the shared atomic counter from the parent routeGroups to track the total number of groups. +// It inherits the limits from the parent routeGroups. +type fingerprintGroups struct { + mu sync.RWMutex + aggrGroups map[model.Fingerprint]*aggrGroup + groupsNum *atomic.Int64 + limits Limits +} + +// LimitReached checks if the number of groups has reached the limit. +func (fg *fingerprintGroups) LimitReached() bool { + if limit := fg.limits.MaxNumberOfAggregationGroups(); limit > 0 && fg.groupsNum.Load() >= int64(limit) { + return true + } + return false +} + +// AddGroup adds a new aggregation group to the map, initializing the inner map if needed. +// If the group limit has been reached, it returns nil together with the current group count and the limit. +func (fg *fingerprintGroups) AddGroup(fp model.Fingerprint, ag *aggrGroup) (group *aggrGroup, count int64, limit int) { + fg.mu.Lock() + defer fg.mu.Unlock() + + if fg.aggrGroups == nil { + fg.aggrGroups = make(map[model.Fingerprint]*aggrGroup) + } + // Check if we've reached the aggregation group limit before creating a new group.
+ if fg.LimitReached() { + return nil, fg.groupsNum.Load(), fg.limits.MaxNumberOfAggregationGroups() + } + fg.aggrGroups[fp] = ag + fg.groupsNum.Add(1) + return ag, fg.groupsNum.Load(), fg.limits.MaxNumberOfAggregationGroups() +} + +// RemoveGroup removes an aggregation group from the map. +func (fg *fingerprintGroups) RemoveGroup(fp model.Fingerprint) { + fg.mu.Lock() + defer fg.mu.Unlock() + delete(fg.aggrGroups, fp) + fg.groupsNum.Sub(1) +} + +// GetGroup returns an aggregation group by fingerprint. +func (fg *fingerprintGroups) GetGroup(fp model.Fingerprint) *aggrGroup { + fg.mu.RLock() + defer fg.mu.RUnlock() + return fg.aggrGroups[fp] +} + +// Range iterates over the fingerprintGroups. +func (fg *fingerprintGroups) Range(fn func(model.Fingerprint, *aggrGroup) bool) { + fg.mu.RLock() + defer fg.mu.RUnlock() + for fp, ag := range fg.aggrGroups { + if !fn(fp, ag) { + break + } + } +} diff --git a/dispatch/metric.go b/dispatch/metric.go new file mode 100644 index 0000000000..8010e389ab --- /dev/null +++ b/dispatch/metric.go @@ -0,0 +1,58 @@ +// Copyright Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatch + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// DispatcherMetrics represents metrics associated with a dispatcher. +type DispatcherMetrics struct { + aggrGroups prometheus.Gauge + processingDuration prometheus.Summary + aggrGroupLimitReached prometheus.Counter +} + +// NewDispatcherMetrics returns a new registered DispatcherMetrics. +func NewDispatcherMetrics(registerLimitMetrics bool, r prometheus.Registerer) *DispatcherMetrics { + m := DispatcherMetrics{ + aggrGroups: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "alertmanager_dispatcher_aggregation_groups", + Help: "Number of active aggregation groups", + }, + ), + processingDuration: prometheus.NewSummary( + prometheus.SummaryOpts{ + Name: "alertmanager_dispatcher_alert_processing_duration_seconds", + Help: "Summary of latencies for the processing of alerts.", + }, + ), + aggrGroupLimitReached: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "alertmanager_dispatcher_aggregation_group_limit_reached_total", + Help: "Number of times when dispatcher failed to create new aggregation group due to limit.", + }, + ), + } + + if r != nil { + r.MustRegister(m.aggrGroups, m.processingDuration) + if registerLimitMetrics { + r.MustRegister(m.aggrGroupLimitReached) + } + } + + return &m +} diff --git a/dispatch/notify.go b/dispatch/notify.go new file mode 100644 index 0000000000..0ee39b0882 --- /dev/null +++ b/dispatch/notify.go @@ -0,0 +1,25 @@ +// Copyright Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatch + +import ( + "context" + + "github.com/prometheus/alertmanager/types" +) + +// notifyFunc is a function that performs notification for the alert +// with the given fingerprint. It aborts on context cancelation. +// Returns false iff notifying failed. +type notifyFunc func(context.Context, ...*types.Alert) bool diff --git a/dispatch/route.go b/dispatch/route.go index e174672d3f..cbea5ee3bc 100644 --- a/dispatch/route.go +++ b/dispatch/route.go @@ -1,4 +1,4 @@ -// Copyright 2015 Prometheus Team +// Copyright Prometheus Team // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/dispatch/route_test.go b/dispatch/route_test.go index 6a9d7d4588..3be2ecd447 100644 --- a/dispatch/route_test.go +++ b/dispatch/route_test.go @@ -1,4 +1,4 @@ -// Copyright 2015 Prometheus Team +// Copyright Prometheus Team // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at