Skip to content

Commit 2eccedb

Browse files
committed
Delay hashing scrape targets
Instead of hashing scrape targets when they're received from service discovery, wait until the allocator actually needs the hash. This lets us avoid calculating hashes for targets filtered out by relabeling.
1 parent 00f2d89 commit 2eccedb

14 files changed

+84
-62
lines changed

cmd/otel-allocator/internal/allocation/allocator.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -73,20 +73,24 @@ func (a *allocator) SetFallbackStrategy(strategy Strategy) {
7373
// SetTargets accepts a list of targets that will be used to make
7474
// load balancing decisions. This method should be called when there are
7575
// new targets discovered or existing targets are shutdown.
76-
func (a *allocator) SetTargets(targets map[string]*target.Item) {
76+
func (a *allocator) SetTargets(targets []*target.Item) {
7777
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", a.strategy.GetName()))
7878
defer timer.ObserveDuration()
7979

8080
if a.filter != nil {
8181
targets = a.filter.Apply(targets)
8282
}
8383
RecordTargetsKept(targets)
84+
targetMap := make(map[string]*target.Item, len(targets))
85+
for _, tg := range targets {
86+
targetMap[tg.Hash()] = tg
87+
}
8488

8589
a.m.Lock()
8690
defer a.m.Unlock()
8791

8892
// Check for target changes
89-
targetsDiff := diff.Maps(a.targetItems, targets)
93+
targetsDiff := diff.Maps(a.targetItems, targetMap)
9094
// If there are any additions or removals
9195
if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 {
9296
a.handleTargets(targetsDiff)

cmd/otel-allocator/internal/allocation/allocator_test.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,7 @@ func TestAllocationCollision(t *testing.T) {
174174
firstTarget := target.NewItem("sample-name", "0.0.0.0:8000", firstLabels, "")
175175
secondTarget := target.NewItem("sample-name", "0.0.0.0:8000", secondLabels, "")
176176

177-
targetList := map[string]*target.Item{
178-
firstTarget.Hash(): firstTarget,
179-
secondTarget.Hash(): secondTarget,
180-
}
177+
targetList := []*target.Item{firstTarget, secondTarget}
181178

182179
// test that targets and collectors are added properly
183180
allocator.SetTargets(targetList)

cmd/otel-allocator/internal/allocation/least_weighted_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math"
99
"math/rand"
10+
"slices"
1011
"testing"
1112

1213
"github.com/stretchr/testify/assert"
@@ -128,7 +129,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
128129
for index := range targets {
129130
shouldDelete := rand.Intn(toDelete) //nolint:gosec
130131
if counter < shouldDelete {
131-
delete(targets, index)
132+
targets = slices.Delete(targets, index, index)
132133
}
133134
counter++
134135
}
@@ -144,9 +145,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
144145
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
145146
}
146147
// adding targets at 'random'
147-
for _, item := range MakeNNewTargets(13, 3, 100) {
148-
targets[item.Hash()] = item
149-
}
148+
targets = append(targets, MakeNNewTargets(13, 3, 100)...)
150149
s.SetTargets(targets)
151150

152151
targetItemLen = len(s.TargetItems())

cmd/otel-allocator/internal/allocation/per_node_test.go

+16-16
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ func TestAllocationPerNode(t *testing.T) {
5656
thirdTarget := target.NewItem("sample-name", "0.0.0.0:8000", thirdLabels, "")
5757
fourthTarget := target.NewItem("sample-name", "0.0.0.0:8000", fourthLabels, "")
5858

59-
targetList := map[string]*target.Item{
60-
firstTarget.Hash(): firstTarget,
61-
secondTarget.Hash(): secondTarget,
62-
thirdTarget.Hash(): thirdTarget,
63-
fourthTarget.Hash(): fourthTarget,
59+
targetList := []*target.Item{
60+
firstTarget,
61+
secondTarget,
62+
thirdTarget,
63+
fourthTarget,
6464
}
6565

6666
// test that targets and collectors are added properly
@@ -74,16 +74,16 @@ func TestAllocationPerNode(t *testing.T) {
7474
assert.Len(t, actualItems, expectedTargetLen)
7575

7676
// verify allocation to nodes
77-
for targetHash, item := range targetList {
78-
actualItem, found := actualItems[targetHash]
77+
for _, item := range targetList {
78+
actualItem, found := actualItems[item.Hash()]
7979
// if third target, should be skipped
8080
assert.True(t, found, "target with hash %s not found", item.Hash())
8181

8282
// only the first two targets should be allocated
8383
itemsForCollector := s.GetTargetsForCollectorAndJob(actualItem.CollectorName, actualItem.JobName)
8484

8585
// first two should be assigned one to each collector; if third target, should not be assigned
86-
if targetHash == thirdTarget.Hash() {
86+
if item.Hash() == thirdTarget.Hash() {
8787
assert.Len(t, itemsForCollector, 0)
8888
continue
8989
}
@@ -123,11 +123,11 @@ func TestAllocationPerNodeUsingFallback(t *testing.T) {
123123
thirdTarget := target.NewItem("sample-name", "0.0.0.0:8000", thirdLabels, "")
124124
fourthTarget := target.NewItem("sample-name", "0.0.0.0:8000", fourthLabels, "")
125125

126-
targetList := map[string]*target.Item{
127-
firstTarget.Hash(): firstTarget,
128-
secondTarget.Hash(): secondTarget,
129-
thirdTarget.Hash(): thirdTarget,
130-
fourthTarget.Hash(): fourthTarget,
126+
targetList := []*target.Item{
127+
firstTarget,
128+
secondTarget,
129+
thirdTarget,
130+
fourthTarget,
131131
}
132132

133133
// test that targets and collectors are added properly
@@ -141,8 +141,8 @@ func TestAllocationPerNodeUsingFallback(t *testing.T) {
141141
assert.Len(t, actualItems, expectedTargetLen)
142142

143143
// verify allocation to nodes
144-
for targetHash, item := range targetList {
145-
actualItem, found := actualItems[targetHash]
144+
for _, item := range targetList {
145+
actualItem, found := actualItems[item.Hash()]
146146

147147
assert.True(t, found, "target with hash %s not found", item.Hash())
148148

@@ -151,7 +151,7 @@ func TestAllocationPerNodeUsingFallback(t *testing.T) {
151151
// first two should be assigned one to each collector; if third target, it should be assigned
152152
// according to the fallback strategy which may assign it to the otherwise empty collector or
153153
// one of the others, depending on the strategy and collector loop order
154-
if targetHash == thirdTarget.Hash() {
154+
if item.Hash() == thirdTarget.Hash() {
155155
assert.Empty(t, item.GetNodeName())
156156
assert.NotZero(t, len(itemsForCollector))
157157
continue

cmd/otel-allocator/internal/allocation/strategy.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ var (
5050
type Option func(Allocator)
5151

5252
type Filter interface {
53-
Apply(map[string]*target.Item) map[string]*target.Item
53+
Apply([]*target.Item) []*target.Item
5454
}
5555

5656
func WithFilter(filter Filter) Option {
@@ -69,7 +69,7 @@ func WithFallbackStrategy(fallbackStrategy string) Option {
6969
}
7070
}
7171

72-
func RecordTargetsKept(targets map[string]*target.Item) {
72+
func RecordTargetsKept(targets []*target.Item) {
7373
TargetsRemaining.Set(float64(len(targets)))
7474
}
7575

@@ -90,7 +90,7 @@ func GetRegisteredAllocatorNames() []string {
9090

9191
type Allocator interface {
9292
SetCollectors(collectors map[string]*Collector)
93-
SetTargets(targets map[string]*target.Item)
93+
SetTargets(targets []*target.Item)
9494
TargetItems() map[string]*target.Item
9595
Collectors() map[string]*Collector
9696
GetTargetsForCollectorAndJob(collector string, job string) []*target.Item

cmd/otel-allocator/internal/allocation/testutils.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@ func colIndex(index, numCols int) int {
2424
return index % numCols
2525
}
2626

27-
func MakeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*target.Item {
28-
toReturn := map[string]*target.Item{}
27+
func MakeNNewTargets(n int, numCollectors int, startingIndex int) []*target.Item {
28+
toReturn := []*target.Item{}
2929
for i := startingIndex; i < n+startingIndex; i++ {
3030
collector := fmt.Sprintf("collector-%d", colIndex(i, numCollectors))
3131
label := labels.Labels{
3232
{Name: "i", Value: strconv.Itoa(i)},
3333
{Name: "total", Value: strconv.Itoa(n + startingIndex)},
3434
}
3535
newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), fmt.Sprintf("test-url-%d", i), label, collector)
36-
toReturn[newTarget.Hash()] = newTarget
36+
toReturn = append(toReturn, newTarget)
3737
}
3838
return toReturn
3939
}
@@ -51,16 +51,16 @@ func MakeNCollectors(n int, startingIndex int) map[string]*Collector {
5151
return toReturn
5252
}
5353

54-
func MakeNNewTargetsWithEmptyCollectors(n int, startingIndex int) map[string]*target.Item {
55-
toReturn := map[string]*target.Item{}
54+
func MakeNNewTargetsWithEmptyCollectors(n int, startingIndex int) []*target.Item {
55+
toReturn := []*target.Item{}
5656
for i := startingIndex; i < n+startingIndex; i++ {
5757
label := labels.Labels{
5858
{Name: "i", Value: strconv.Itoa(i)},
5959
{Name: "total", Value: strconv.Itoa(n + startingIndex)},
6060
{Name: "__meta_kubernetes_pod_node_name", Value: "node-0"},
6161
}
6262
newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), fmt.Sprintf("test-url-%d", i), label, "")
63-
toReturn[newTarget.Hash()] = newTarget
63+
toReturn = append(toReturn, newTarget)
6464
}
6565
return toReturn
6666
}

cmd/otel-allocator/internal/prehook/prehook.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
)
1212

1313
type Hook interface {
14-
Apply(map[string]*target.Item) map[string]*target.Item
14+
Apply([]*target.Item) []*target.Item
1515
SetConfig(map[string][]*relabel.Config)
1616
GetConfig() map[string][]*relabel.Config
1717
}

cmd/otel-allocator/internal/prehook/relabel.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -26,29 +26,34 @@ func newRelabelConfigTargetFilter(log logr.Logger) Hook {
2626
}
2727
}
2828

29-
func (tf *relabelConfigTargetFilter) Apply(targets map[string]*target.Item) map[string]*target.Item {
29+
func (tf *relabelConfigTargetFilter) Apply(targets []*target.Item) []*target.Item {
3030
numTargets := len(targets)
3131

3232
// need to wait until relabelCfg is set
3333
if len(tf.relabelCfg) == 0 {
3434
return targets
3535
}
3636

37+
var newTargets []*target.Item
38+
3739
// Note: jobNameKey != tItem.JobName (jobNameKey is hashed)
38-
for jobNameKey, tItem := range targets {
39-
var keepTarget bool
40+
for _, tItem := range targets {
41+
keepTarget := true
4042
lset := tItem.Labels
4143
for _, cfg := range tf.relabelCfg[tItem.JobName] {
4244
lset, keepTarget = relabel.Process(lset, cfg)
4345
if !keepTarget {
44-
delete(targets, jobNameKey)
4546
break // inner loop
4647
}
4748
}
49+
50+
if keepTarget {
51+
newTargets = append(newTargets, tItem)
52+
}
4853
}
4954

5055
tf.log.V(2).Info("Filtering complete", "seen", numTargets, "kept", len(targets))
51-
return targets
56+
return newTargets
5257
}
5358

5459
func (tf *relabelConfigTargetFilter) SetConfig(cfgs map[string][]*relabel.Config) {

cmd/otel-allocator/internal/prehook/relabel_test.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ func colIndex(index, numCols int) int {
167167
return index % numCols
168168
}
169169

170-
func makeNNewTargets(rCfgs []relabelConfigObj, n int, numCollectors int, startingIndex int) (map[string]*target.Item, int, map[string]*target.Item, map[string][]*relabel.Config) {
171-
toReturn := map[string]*target.Item{}
172-
expectedMap := make(map[string]*target.Item)
170+
func makeNNewTargets(rCfgs []relabelConfigObj, n int, numCollectors int, startingIndex int) ([]*target.Item, int, []*target.Item, map[string][]*relabel.Config) {
171+
toReturn := []*target.Item{}
172+
expected := []*target.Item{}
173173
numItemsRemaining := n
174174
relabelConfig := make(map[string][]*relabel.Config)
175175
for i := startingIndex; i < n+startingIndex; i++ {
@@ -189,15 +189,14 @@ func makeNNewTargets(rCfgs []relabelConfigObj, n int, numCollectors int, startin
189189

190190
relabelConfig[jobName] = rCfgs[index].cfg
191191

192-
targetKey := newTarget.Hash()
193192
if relabelConfigs[index].isDrop {
194193
numItemsRemaining--
195194
} else {
196-
expectedMap[targetKey] = newTarget
195+
expected = append(expected, newTarget)
197196
}
198-
toReturn[targetKey] = newTarget
197+
toReturn = append(toReturn, newTarget)
199198
}
200-
return toReturn, numItemsRemaining, expectedMap, relabelConfig
199+
return toReturn, numItemsRemaining, expected, relabelConfig
201200
}
202201

203202
func TestApply(t *testing.T) {

cmd/otel-allocator/internal/server/mocks_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type mockAllocator struct {
1717
}
1818

1919
func (m *mockAllocator) SetCollectors(_ map[string]*allocation.Collector) {}
20-
func (m *mockAllocator) SetTargets(_ map[string]*target.Item) {}
20+
func (m *mockAllocator) SetTargets(_ []*target.Item) {}
2121
func (m *mockAllocator) Collectors() map[string]*allocation.Collector { return nil }
2222
func (m *mockAllocator) GetTargetsForCollectorAndJob(_ string, _ string) []*target.Item { return nil }
2323
func (m *mockAllocator) SetFilter(_ allocation.Filter) {}

cmd/otel-allocator/internal/server/server_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,12 @@ func TestServer_TargetsHandler(t *testing.T) {
160160
t.Run(tt.name, func(t *testing.T) {
161161
listenAddr := ":8080"
162162
s := NewServer(logger, tt.args.allocator, listenAddr)
163+
targets := []*target.Item{}
164+
for _, item := range tt.args.cMap {
165+
targets = append(targets, item)
166+
}
163167
tt.args.allocator.SetCollectors(map[string]*allocation.Collector{"test-collector": {Name: "test-collector"}})
164-
tt.args.allocator.SetTargets(tt.args.cMap)
168+
tt.args.allocator.SetTargets(targets)
165169
request := httptest.NewRequest("GET", fmt.Sprintf("/jobs/%s/targets?collector_id=%s", tt.args.job, tt.args.collector), nil)
166170
w := httptest.NewRecorder()
167171

cmd/otel-allocator/internal/target/discovery.go

+22-9
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type Discoverer struct {
5454
scrapeConfigsUpdater scrapeConfigsUpdater
5555
targetSets map[string][]*targetgroup.Group
5656
triggerReload chan struct{}
57-
processTargetsCallBack func(targets map[string]*Item)
57+
processTargetsCallBack func(targets []*Item)
5858
mtxTargets sync.Mutex
5959
}
6060

@@ -66,7 +66,7 @@ type scrapeConfigsUpdater interface {
6666
UpdateScrapeConfigResponse(map[string]*promconfig.ScrapeConfig) error
6767
}
6868

69-
func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater, setTargets func(targets map[string]*Item)) *Discoverer {
69+
func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater, setTargets func(targets []*Item)) *Discoverer {
7070
return &Discoverer{
7171
log: log,
7272
manager: manager,
@@ -163,19 +163,25 @@ func (m *Discoverer) reloader() {
163163
func (m *Discoverer) Reload() {
164164
m.mtxScrape.Lock()
165165
var wg sync.WaitGroup
166-
targets := map[string]*Item{}
167166
timer := prometheus.NewTimer(processTargetsDuration)
168167
defer timer.ObserveDuration()
169168

169+
// count targets and preallocate
170+
targetCount := 0
171+
for _, groups := range m.targetSets {
172+
for _, group := range groups {
173+
targetCount += len(group.Targets)
174+
}
175+
}
176+
targets := make([]*Item, 0, targetCount)
177+
170178
for jobName, groups := range m.targetSets {
171179
wg.Add(1)
172180
// Run the sync in parallel as these take a while and at high load can't catch up.
173181
go func(jobName string, groups []*targetgroup.Group) {
174182
processedTargets := m.processTargetGroups(jobName, groups)
175183
m.mtxTargets.Lock()
176-
for k, v := range processedTargets {
177-
targets[k] = v
178-
}
184+
targets = append(targets, processedTargets...)
179185
m.mtxTargets.Unlock()
180186
wg.Done()
181187
}(jobName, groups)
@@ -186,10 +192,17 @@ func (m *Discoverer) Reload() {
186192
}
187193

188194
// processTargetGroups processes the target groups and returns a map of targets.
189-
func (m *Discoverer) processTargetGroups(jobName string, groups []*targetgroup.Group) map[string]*Item {
195+
func (m *Discoverer) processTargetGroups(jobName string, groups []*targetgroup.Group) []*Item {
190196
builder := labels.NewBuilder(labels.Labels{})
191197
timer := prometheus.NewTimer(processTargetGroupsDuration.WithLabelValues(jobName))
192-
targets := map[string]*Item{}
198+
199+
// count targets and preallocate
200+
targetCount := 0
201+
for _, group := range groups {
202+
targetCount += len(group.Targets)
203+
}
204+
targets := make([]*Item, 0, targetCount)
205+
193206
defer timer.ObserveDuration()
194207
var count float64 = 0
195208
for _, tg := range groups {
@@ -205,7 +218,7 @@ func (m *Discoverer) processTargetGroups(jobName string, groups []*targetgroup.G
205218
builder.Set(string(ln), string(lv))
206219
}
207220
item := NewItem(jobName, string(t[model.AddressLabel]), builder.Labels(), "")
208-
targets[item.Hash()] = item
221+
targets = append(targets, item)
209222
}
210223
}
211224
targetsDiscovered.WithLabelValues(jobName).Set(count)

cmd/otel-allocator/internal/target/discovery_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestDiscovery(t *testing.T) {
6363
require.NoError(t, err)
6464
d := discovery.NewManager(ctx, gokitlog.NewNopLogger(), registry, sdMetrics)
6565
results := make(chan []string)
66-
manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu, func(targets map[string]*Item) {
66+
manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu, func(targets []*Item) {
6767
var result []string
6868
for _, t := range targets {
6969
result = append(result, t.TargetURL)

0 commit comments

Comments
 (0)