@@ -29,9 +29,15 @@ var _ Allocator = &perNodeAllocator{}
 const (
 	perNodeStrategyName = "per-node"
 
-	nodeNameLabel model.LabelName = "__meta_kubernetes_pod_node_name"
+	podNodeNameLabel model.LabelName = "__meta_kubernetes_pod_node_name"
 )
 
+// perNodeAllocator makes decisions to distribute work among
+// a number of OpenTelemetry collectors based on the node on which
+// the collector is running. This allocator should be used only when
+// collectors are running as a daemon set (agent) on each node.
+// Users need to call SetTargets when they have new targets in their
+// clusters and call SetCollectors when the collectors have changed.
 type perNodeAllocator struct {
 	// m protects collectors and targetItems for concurrent use.
 	m sync.RWMutex
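
To illustrate what the new doc comment describes, here is a minimal, self-contained sketch of the per-node assignment decision. The `collectorStub` and `targetStub` types and the sample names are stand-ins for illustration only, not the operator's real `Collector` and `target.Item` types.

```go
package main

import "fmt"

// collectorStub and targetStub are simplified stand-ins for the operator's
// Collector and target.Item types, reduced to the fields the per-node
// strategy cares about.
type collectorStub struct {
	name string
	node string // node the collector pod runs on (DaemonSet: one per node)
}

type targetStub struct {
	job  string
	node string // value of the __meta_kubernetes_pod_node_name label on the target
}

func main() {
	collectors := []collectorStub{
		{name: "otel-agent-aaaaa", node: "node-1"},
		{name: "otel-agent-bbbbb", node: "node-2"},
	}
	targets := []targetStub{
		{job: "pods", node: "node-1"},
		{job: "pods", node: "node-2"},
		{job: "pods", node: "node-3"}, // no collector on this node: dropped
	}

	// Per-node strategy: a target goes to the collector running on the same node.
	for _, t := range targets {
		assigned := false
		for _, c := range collectors {
			if c.node == t.node {
				fmt.Printf("job=%s node=%s -> %s\n", t.job, t.node, c.name)
				assigned = true
				break
			}
		}
		if !assigned {
			fmt.Printf("job=%s node=%s -> dropped (no collector on node)\n", t.job, t.node)
		}
	}
}
```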
@@ -48,6 +54,8 @@ type perNodeAllocator struct {
 	filter Filter
 }
 
+// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
+// This method is called when Collectors are added or removed.
 func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collector) {
 	timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", perNodeStrategyName))
 	defer timer.ObserveDuration()
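
The timer pattern at the top of `SetCollectors` (and `SetTargets`) records how long each assignment pass takes, labeled by method and strategy. Below is a self-contained sketch of that client_golang idiom; the `timeToAssign` variable, the metric name, and the `setCollectors` function are stand-ins, since the real `TimeToAssign` metric is defined elsewhere in the package.

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// timeToAssign is a stand-in for the package's TimeToAssign metric;
// the name here is illustrative only.
var timeToAssign = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name: "targetallocator_time_to_assign_seconds",
	Help: "Time spent in an allocation pass.",
}, []string{"method", "strategy"})

func setCollectors() {
	// Observe the duration of this call under ("SetCollectors", "per-node").
	timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors", "per-node"))
	defer timer.ObserveDuration()

	time.Sleep(10 * time.Millisecond) // placeholder for the real work
}

func main() {
	prometheus.MustRegister(timeToAssign)
	setCollectors()
}
```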
@@ -68,6 +76,8 @@ func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collecto
 	}
 }
 
+// handleCollectors receives the new and removed collectors and reconciles the current state.
+// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map.
 func (allocator *perNodeAllocator) handleCollectors(diff diff.Changes[*Collector]) {
 	// Clear removed collectors
 	for _, k := range diff.Removals() {
@@ -82,6 +92,9 @@ func (allocator *perNodeAllocator) handleCollectors(diff diff.Changes[*Collector
 	}
 }
 
+// SetTargets accepts a list of targets that will be used to make
+// load balancing decisions. This method should be called when there are
+// new targets discovered or existing targets are shut down.
 func (allocator *perNodeAllocator) SetTargets(targets map[string]*target.Item) {
 	timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", perNodeStrategyName))
 	defer timer.ObserveDuration()
@@ -138,12 +151,19 @@ func (allocator *perNodeAllocator) SetTargets(targets map[string]*target.Item) {
 		allocator.handleTargets(targetsDiff)
 	}
 }
+
+// handleTargets receives the new and removed targets and reconciles the current state.
+// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector.
+// Any net-new additions are assigned to the collector on the same node as the target.
 func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item]) {
 	// Check for removals
 	for k, item := range allocator.targetItems {
 		// if the current item is in the removals list
 		if _, ok := diff.Removals()[k]; ok {
-			c := allocator.collectors[item.CollectorName]
+			c, ok := allocator.collectors[item.CollectorName]
+			if !ok {
+				continue
+			}
 			c.NumTargets--
 			delete(allocator.targetItems, k)
 			delete(allocator.targetItemsPerJobPerCollector[item.CollectorName][item.JobName], item.Hash())
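
The switch to the comma-ok lookup guards against a target whose collector has already been removed: a bare index on a missing key returns a nil `*Collector`, and decrementing `NumTargets` through it would panic. A minimal sketch of the guarded pattern, with a stand-in collector type:

```go
package main

import "fmt"

type collectorStub struct{ numTargets int } // stand-in for the real Collector

func main() {
	collectors := map[string]*collectorStub{"otel-agent-aaaaa": {numTargets: 3}}

	// A bare index on a missing key yields the zero value (a nil pointer here);
	// decrementing through it would panic. The comma-ok form lets us skip it.
	if c, ok := collectors["otel-agent-gone"]; ok {
		c.numTargets--
	} else {
		fmt.Println("collector already removed; skipping target cleanup")
	}
}
```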
@@ -163,9 +183,15 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item]
 	}
 }
 
+// addTargetToTargetItems assigns a target to the collector and adds it to the allocator's targetItems
+// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
+// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
+// INVARIANT: allocator.collectors must have at least 1 collector set.
+// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target
+// item while it's being encoded by the server JSON handler.
+// Also, any targets that cannot be assigned to a collector due to no matching node name will be dropped.
 func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) {
 	chosenCollector := allocator.findCollector(tg.Labels)
-	// TODO: How to handle this edge case? Can we have items without a collector?
 	if chosenCollector == nil {
 		allocator.log.V(2).Info("Couldn't find a collector for the target item", "item", tg, "collectors", allocator.collectors)
 		return
@@ -177,11 +203,15 @@ func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) {
 	TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets))
 }
 
+// findCollector finds the collector that matches the node of the target, on the basis of the
+// pod node label.
+// This method is called from within SetTargets and SetCollectors, whose caller
+// acquires the needed lock. This method assumes there is at least 1 collector set.
 func (allocator *perNodeAllocator) findCollector(labels model.LabelSet) *Collector {
 	var col *Collector
 	for _, v := range allocator.collectors {
-		if nodeNameLabelValue, ok := labels[nodeNameLabel]; ok {
-			if v.Node == string(nodeNameLabelValue) {
+		if podNodeNameLabelValue, ok := labels[podNodeNameLabel]; ok {
+			if v.Node == string(podNodeNameLabelValue) {
 				col = v
 				break
 			}
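
`findCollector` matches the target's `__meta_kubernetes_pod_node_name` discovery label against each collector's `Node` field. A self-contained sketch of that lookup follows; the standalone `findCollector` signature, the `collectorStub` type, and the sample data are assumptions for illustration, not the operator's actual code.

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

const podNodeNameLabel model.LabelName = "__meta_kubernetes_pod_node_name"

// collectorStub is a stand-in for the operator's Collector type.
type collectorStub struct {
	Name string
	Node string
}

func findCollector(labels model.LabelSet, collectors map[string]*collectorStub) *collectorStub {
	for _, v := range collectors {
		// Only targets that carry the pod node name label can be matched.
		if nodeName, ok := labels[podNodeNameLabel]; ok && v.Node == string(nodeName) {
			return v
		}
	}
	return nil
}

func main() {
	collectors := map[string]*collectorStub{
		"otel-agent-aaaaa": {Name: "otel-agent-aaaaa", Node: "node-1"},
	}
	labels := model.LabelSet{podNodeNameLabel: "node-1"}
	fmt.Println(findCollector(labels, collectors).Name) // otel-agent-aaaaa
}
```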
@@ -191,6 +221,9 @@ func (allocator *perNodeAllocator) findCollector(labels model.LabelSet) *Collect
 	return col
 }
 
+// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets.
+// This allows the allocator to respond to HTTP calls without any extra allocations. The caller of this method
+// has to acquire a lock.
 func (allocator *perNodeAllocator) addCollectorTargetItemMapping(tg *target.Item) {
 	if allocator.targetItemsPerJobPerCollector[tg.CollectorName] == nil {
 		allocator.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool)
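
`addCollectorTargetItemMapping` maintains a nested map keyed collector → job → target hash, so the HTTP handlers can answer per-collector, per-job queries with direct lookups instead of rebuilding state per request. A small stand-alone sketch of that structure and its lazy initialization (all names are illustrative):

```go
package main

import "fmt"

func main() {
	// collector name -> job name -> target hash -> present
	targetItemsPerJobPerCollector := make(map[string]map[string]map[string]bool)

	add := func(collector, job, hash string) {
		// Lazily create the inner maps, mirroring addCollectorTargetItemMapping.
		if targetItemsPerJobPerCollector[collector] == nil {
			targetItemsPerJobPerCollector[collector] = make(map[string]map[string]bool)
		}
		if targetItemsPerJobPerCollector[collector][job] == nil {
			targetItemsPerJobPerCollector[collector][job] = make(map[string]bool)
		}
		targetItemsPerJobPerCollector[collector][job][hash] = true
	}

	add("otel-agent-aaaaa", "pods", "hash-1")
	add("otel-agent-aaaaa", "pods", "hash-2")

	// An HTTP handler can now answer per collector+job with a direct lookup.
	fmt.Println(len(targetItemsPerJobPerCollector["otel-agent-aaaaa"]["pods"])) // 2
}
```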
@@ -201,6 +234,7 @@ func (allocator *perNodeAllocator) addCollectorTargetItemMapping(tg *target.Item
 	allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true
 }
 
+// TargetItems returns a shallow copy of the targetItems map.
 func (allocator *perNodeAllocator) TargetItems() map[string]*target.Item {
 	allocator.m.RLock()
 	defer allocator.m.RUnlock()
@@ -211,6 +245,7 @@ func (allocator *perNodeAllocator) TargetItems() map[string]*target.Item {
 	return targetItemsCopy
 }
 
+// Collectors returns a shallow copy of the collectors map.
 func (allocator *perNodeAllocator) Collectors() map[string]*Collector {
 	allocator.m.RLock()
 	defer allocator.m.RUnlock()
@@ -239,6 +274,7 @@ func (allocator *perNodeAllocator) GetTargetsForCollectorAndJob(collector string
 	return targetItemsCopy
 }
 
+// SetFilter sets the filtering hook to use.
 func (allocator *perNodeAllocator) SetFilter(filter Filter) {
 	allocator.filter = filter
 }