Skip to content

Commit a851bb1

Browse files
authored
Delay hashing scrape targets (#3777)
* Fix duplicate targets in benchmark * Add more target counts to the benchmark * Delay hashing scrape targets Instead of hashing scrape targets when they're received from service discovery, wait until the allocator actually needs the hash. This lets us avoid calculating hashes for targets filtered out by relabeling.
1 parent 3a74faf commit a851bb1

15 files changed

+164
-110
lines changed

cmd/otel-allocator/benchmark_test.go

+49-41
Original file line numberDiff line numberDiff line change
@@ -28,65 +28,70 @@ import (
2828
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/internal/target"
2929
)
3030

31+
var targetCounts = []int{1000, 10000, 100000, 800000}
32+
3133
// BenchmarkProcessTargets benchmarks the whole target allocation pipeline. It starts with data the prometheus
3234
// discovery manager would normally output, and pushes it all the way into the allocator. It notably does *not* check
3335
// the HTTP server afterward. Test data is chosen to be reasonably representative of what the Prometheus service discovery
3436
// outputs in the real world.
3537
func BenchmarkProcessTargets(b *testing.B) {
36-
numTargets := 800000
3738
targetsPerGroup := 5
3839
groupsPerJob := 20
39-
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
40-
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
41-
b.Run(strategy, func(b *testing.B) {
42-
targetDiscoverer := createTestDiscoverer(strategy, map[string][]*relabel.Config{})
43-
targetDiscoverer.UpdateTsets(tsets)
44-
b.ResetTimer()
45-
for i := 0; i < b.N; i++ {
46-
targetDiscoverer.Reload()
47-
}
48-
})
40+
for _, numTargets := range targetCounts {
41+
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
42+
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
43+
b.Run(fmt.Sprintf("%s/%d", strategy, numTargets), func(b *testing.B) {
44+
targetDiscoverer := createTestDiscoverer(strategy, map[string][]*relabel.Config{})
45+
targetDiscoverer.UpdateTsets(tsets)
46+
b.ResetTimer()
47+
for i := 0; i < b.N; i++ {
48+
targetDiscoverer.Reload()
49+
}
50+
})
51+
}
4952
}
5053
}
5154

5255
// BenchmarkProcessTargetsWithRelabelConfig is BenchmarkProcessTargets with a relabel config set. The relabel config
5356
// does not actually modify any records, but does force the prehook to perform any necessary conversions along the way.
5457
func BenchmarkProcessTargetsWithRelabelConfig(b *testing.B) {
55-
numTargets := 800000
5658
targetsPerGroup := 5
5759
groupsPerJob := 20
58-
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
59-
prehookConfig := make(map[string][]*relabel.Config, len(tsets))
60-
for jobName := range tsets {
61-
// keep all targets in half the jobs, drop the rest
62-
jobNrStr := strings.Split(jobName, "-")[1]
63-
jobNr, err := strconv.Atoi(jobNrStr)
64-
require.NoError(b, err)
65-
var action relabel.Action
66-
if jobNr%2 == 0 {
67-
action = "keep"
68-
} else {
69-
action = "drop"
60+
for _, numTargets := range targetCounts {
61+
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
62+
prehookConfig := make(map[string][]*relabel.Config, len(tsets))
63+
for jobName := range tsets {
64+
// keep all targets in half the jobs, drop the rest
65+
jobNrStr := strings.Split(jobName, "-")[1]
66+
jobNr, err := strconv.Atoi(jobNrStr)
67+
require.NoError(b, err)
68+
var action relabel.Action
69+
if jobNr%2 == 0 {
70+
action = "keep"
71+
} else {
72+
action = "drop"
73+
}
74+
prehookConfig[jobName] = []*relabel.Config{
75+
{
76+
Action: action,
77+
Regex: relabel.MustNewRegexp(".*"),
78+
SourceLabels: model.LabelNames{"__address__"},
79+
},
80+
}
7081
}
71-
prehookConfig[jobName] = []*relabel.Config{
72-
{
73-
Action: action,
74-
Regex: relabel.MustNewRegexp(".*"),
75-
SourceLabels: model.LabelNames{"__address__"},
76-
},
82+
83+
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
84+
b.Run(fmt.Sprintf("%s/%d", strategy, numTargets), func(b *testing.B) {
85+
targetDiscoverer := createTestDiscoverer(strategy, prehookConfig)
86+
targetDiscoverer.UpdateTsets(tsets)
87+
b.ResetTimer()
88+
for i := 0; i < b.N; i++ {
89+
targetDiscoverer.Reload()
90+
}
91+
})
7792
}
7893
}
7994

80-
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
81-
b.Run(strategy, func(b *testing.B) {
82-
targetDiscoverer := createTestDiscoverer(strategy, prehookConfig)
83-
targetDiscoverer.UpdateTsets(tsets)
84-
b.ResetTimer()
85-
for i := 0; i < b.N; i++ {
86-
targetDiscoverer.Reload()
87-
}
88-
})
89-
}
9095
}
9196

9297
func prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob int) map[string][]*targetgroup.Group {
@@ -140,7 +145,10 @@ func prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob int) map[str
140145
}
141146
targets := []model.LabelSet{}
142147
for i := 0; i < numTargets; i++ {
143-
targets = append(targets, exampleTarget.Clone())
148+
newTarget := exampleTarget.Clone()
149+
// ensure each target has a unique label to avoid deduplication
150+
newTarget["target_id"] = model.LabelValue(strconv.Itoa(i))
151+
targets = append(targets, newTarget)
144152
}
145153
groups := make([]*targetgroup.Group, numGroups)
146154
for i := 0; i < numGroups; i++ {

cmd/otel-allocator/internal/allocation/allocator.go

+33-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ package allocation
55

66
import (
77
"errors"
8+
"runtime"
9+
"slices"
810
"sync"
911

1012
"github.com/go-logr/logr"
@@ -73,20 +75,22 @@ func (a *allocator) SetFallbackStrategy(strategy Strategy) {
7375
// SetTargets accepts a list of targets that will be used to make
7476
// load balancing decisions. This method should be called when there are
7577
// new targets discovered or existing targets are shutdown.
76-
func (a *allocator) SetTargets(targets map[string]*target.Item) {
78+
func (a *allocator) SetTargets(targets []*target.Item) {
7779
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", a.strategy.GetName()))
7880
defer timer.ObserveDuration()
7981

8082
if a.filter != nil {
8183
targets = a.filter.Apply(targets)
8284
}
8385
RecordTargetsKept(targets)
86+
concurrency := runtime.NumCPU() * 2 // determined experimentally
87+
targetMap := buildTargetMap(targets, concurrency)
8488

8589
a.m.Lock()
8690
defer a.m.Unlock()
8791

8892
// Check for target changes
89-
targetsDiff := diff.Maps(a.targetItems, targets)
93+
targetsDiff := diff.Maps(a.targetItems, targetMap)
9094
// If there are any additions or removals
9195
if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 {
9296
a.handleTargets(targetsDiff)
@@ -302,3 +306,30 @@ func (a *allocator) handleCollectors(diff diff.Changes[*Collector]) {
302306
TargetsUnassigned.Set(float64(unassignedTargets))
303307
}
304308
}
309+
310+
const minChunkSize = 100 // for small target counts, it's not worth it to spawn a lot of goroutines
311+
312+
// buildTargetMap builds a map of targets, using their hashes as keys. It does this concurrently, and the concurrency
313+
// is configurable via the concurrency parameter. We do this in parallel because target hashing is surprisingly
314+
// expensive.
315+
func buildTargetMap(targets []*target.Item, concurrency int) map[string]*target.Item {
316+
// technically there may be duplicates, so this may overallocate, but in the majority of cases it will be exact
317+
result := make(map[string]*target.Item, len(targets))
318+
chunkSize := len(targets) / concurrency
319+
chunkSize = max(chunkSize, minChunkSize)
320+
wg := sync.WaitGroup{}
321+
for chunk := range slices.Chunk(targets, chunkSize) {
322+
wg.Add(1)
323+
go func(ch []*target.Item) {
324+
defer wg.Done()
325+
for _, item := range ch {
326+
item.Hash()
327+
}
328+
}(chunk)
329+
}
330+
wg.Wait()
331+
for _, item := range targets {
332+
result[item.Hash()] = item
333+
}
334+
return result
335+
}

cmd/otel-allocator/internal/allocation/allocator_test.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,7 @@ func TestAllocationCollision(t *testing.T) {
174174
firstTarget := target.NewItem("sample-name", "0.0.0.0:8000", firstLabels, "")
175175
secondTarget := target.NewItem("sample-name", "0.0.0.0:8000", secondLabels, "")
176176

177-
targetList := map[string]*target.Item{
178-
firstTarget.Hash(): firstTarget,
179-
secondTarget.Hash(): secondTarget,
180-
}
177+
targetList := []*target.Item{firstTarget, secondTarget}
181178

182179
// test that targets and collectors are added properly
183180
allocator.SetTargets(targetList)

cmd/otel-allocator/internal/allocation/least_weighted_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math"
99
"math/rand"
10+
"slices"
1011
"testing"
1112

1213
"github.com/stretchr/testify/assert"
@@ -128,7 +129,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
128129
for index := range targets {
129130
shouldDelete := rand.Intn(toDelete) //nolint:gosec
130131
if counter < shouldDelete {
131-
delete(targets, index)
132+
targets = slices.Delete(targets, index, index)
132133
}
133134
counter++
134135
}
@@ -144,9 +145,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
144145
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
145146
}
146147
// adding targets at 'random'
147-
for _, item := range MakeNNewTargets(13, 3, 100) {
148-
targets[item.Hash()] = item
149-
}
148+
targets = append(targets, MakeNNewTargets(13, 3, 100)...)
150149
s.SetTargets(targets)
151150

152151
targetItemLen = len(s.TargetItems())

cmd/otel-allocator/internal/allocation/per_node_test.go

+16-16
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ func TestAllocationPerNode(t *testing.T) {
5656
thirdTarget := target.NewItem("sample-name", "0.0.0.0:8000", thirdLabels, "")
5757
fourthTarget := target.NewItem("sample-name", "0.0.0.0:8000", fourthLabels, "")
5858

59-
targetList := map[string]*target.Item{
60-
firstTarget.Hash(): firstTarget,
61-
secondTarget.Hash(): secondTarget,
62-
thirdTarget.Hash(): thirdTarget,
63-
fourthTarget.Hash(): fourthTarget,
59+
targetList := []*target.Item{
60+
firstTarget,
61+
secondTarget,
62+
thirdTarget,
63+
fourthTarget,
6464
}
6565

6666
// test that targets and collectors are added properly
@@ -74,16 +74,16 @@ func TestAllocationPerNode(t *testing.T) {
7474
assert.Len(t, actualItems, expectedTargetLen)
7575

7676
// verify allocation to nodes
77-
for targetHash, item := range targetList {
78-
actualItem, found := actualItems[targetHash]
77+
for _, item := range targetList {
78+
actualItem, found := actualItems[item.Hash()]
7979
// if third target, should be skipped
8080
assert.True(t, found, "target with hash %s not found", item.Hash())
8181

8282
// only the first two targets should be allocated
8383
itemsForCollector := s.GetTargetsForCollectorAndJob(actualItem.CollectorName, actualItem.JobName)
8484

8585
// first two should be assigned one to each collector; if third target, should not be assigned
86-
if targetHash == thirdTarget.Hash() {
86+
if item.Hash() == thirdTarget.Hash() {
8787
assert.Len(t, itemsForCollector, 0)
8888
continue
8989
}
@@ -123,11 +123,11 @@ func TestAllocationPerNodeUsingFallback(t *testing.T) {
123123
thirdTarget := target.NewItem("sample-name", "0.0.0.0:8000", thirdLabels, "")
124124
fourthTarget := target.NewItem("sample-name", "0.0.0.0:8000", fourthLabels, "")
125125

126-
targetList := map[string]*target.Item{
127-
firstTarget.Hash(): firstTarget,
128-
secondTarget.Hash(): secondTarget,
129-
thirdTarget.Hash(): thirdTarget,
130-
fourthTarget.Hash(): fourthTarget,
126+
targetList := []*target.Item{
127+
firstTarget,
128+
secondTarget,
129+
thirdTarget,
130+
fourthTarget,
131131
}
132132

133133
// test that targets and collectors are added properly
@@ -141,8 +141,8 @@ func TestAllocationPerNodeUsingFallback(t *testing.T) {
141141
assert.Len(t, actualItems, expectedTargetLen)
142142

143143
// verify allocation to nodes
144-
for targetHash, item := range targetList {
145-
actualItem, found := actualItems[targetHash]
144+
for _, item := range targetList {
145+
actualItem, found := actualItems[item.Hash()]
146146

147147
assert.True(t, found, "target with hash %s not found", item.Hash())
148148

@@ -151,7 +151,7 @@ func TestAllocationPerNodeUsingFallback(t *testing.T) {
151151
// first two should be assigned one to each collector; if third target, it should be assigned
152152
// according to the fallback strategy which may assign it to the otherwise empty collector or
153153
// one of the others, depending on the strategy and collector loop order
154-
if targetHash == thirdTarget.Hash() {
154+
if item.Hash() == thirdTarget.Hash() {
155155
assert.Empty(t, item.GetNodeName())
156156
assert.NotZero(t, len(itemsForCollector))
157157
continue

cmd/otel-allocator/internal/allocation/strategy.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ var (
5050
type Option func(Allocator)
5151

5252
type Filter interface {
53-
Apply(map[string]*target.Item) map[string]*target.Item
53+
Apply([]*target.Item) []*target.Item
5454
}
5555

5656
func WithFilter(filter Filter) Option {
@@ -69,7 +69,7 @@ func WithFallbackStrategy(fallbackStrategy string) Option {
6969
}
7070
}
7171

72-
func RecordTargetsKept(targets map[string]*target.Item) {
72+
func RecordTargetsKept(targets []*target.Item) {
7373
TargetsRemaining.Set(float64(len(targets)))
7474
}
7575

@@ -90,7 +90,7 @@ func GetRegisteredAllocatorNames() []string {
9090

9191
type Allocator interface {
9292
SetCollectors(collectors map[string]*Collector)
93-
SetTargets(targets map[string]*target.Item)
93+
SetTargets(targets []*target.Item)
9494
TargetItems() map[string]*target.Item
9595
Collectors() map[string]*Collector
9696
GetTargetsForCollectorAndJob(collector string, job string) []*target.Item

cmd/otel-allocator/internal/allocation/testutils.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@ func colIndex(index, numCols int) int {
2424
return index % numCols
2525
}
2626

27-
func MakeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*target.Item {
28-
toReturn := map[string]*target.Item{}
27+
func MakeNNewTargets(n int, numCollectors int, startingIndex int) []*target.Item {
28+
toReturn := []*target.Item{}
2929
for i := startingIndex; i < n+startingIndex; i++ {
3030
collector := fmt.Sprintf("collector-%d", colIndex(i, numCollectors))
3131
label := labels.Labels{
3232
{Name: "i", Value: strconv.Itoa(i)},
3333
{Name: "total", Value: strconv.Itoa(n + startingIndex)},
3434
}
3535
newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), fmt.Sprintf("test-url-%d", i), label, collector)
36-
toReturn[newTarget.Hash()] = newTarget
36+
toReturn = append(toReturn, newTarget)
3737
}
3838
return toReturn
3939
}
@@ -51,16 +51,16 @@ func MakeNCollectors(n int, startingIndex int) map[string]*Collector {
5151
return toReturn
5252
}
5353

54-
func MakeNNewTargetsWithEmptyCollectors(n int, startingIndex int) map[string]*target.Item {
55-
toReturn := map[string]*target.Item{}
54+
func MakeNNewTargetsWithEmptyCollectors(n int, startingIndex int) []*target.Item {
55+
toReturn := []*target.Item{}
5656
for i := startingIndex; i < n+startingIndex; i++ {
5757
label := labels.Labels{
5858
{Name: "i", Value: strconv.Itoa(i)},
5959
{Name: "total", Value: strconv.Itoa(n + startingIndex)},
6060
{Name: "__meta_kubernetes_pod_node_name", Value: "node-0"},
6161
}
6262
newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), fmt.Sprintf("test-url-%d", i), label, "")
63-
toReturn[newTarget.Hash()] = newTarget
63+
toReturn = append(toReturn, newTarget)
6464
}
6565
return toReturn
6666
}

cmd/otel-allocator/internal/prehook/prehook.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
)
1212

1313
type Hook interface {
14-
Apply(map[string]*target.Item) map[string]*target.Item
14+
Apply([]*target.Item) []*target.Item
1515
SetConfig(map[string][]*relabel.Config)
1616
GetConfig() map[string][]*relabel.Config
1717
}

0 commit comments

Comments
 (0)