From 6defa78f3f3e2442b2a481c8bfd1f0e3add44787 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Tue, 5 Dec 2023 16:30:16 +0100 Subject: [PATCH 01/25] Add node name to Collector struct Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/consistent_hashing.go | 2 +- cmd/otel-allocator/allocation/least_weighted.go | 2 +- cmd/otel-allocator/allocation/strategy.go | 9 +++++++-- cmd/otel-allocator/allocation/strategy_test.go | 10 +++++----- cmd/otel-allocator/collector/collector.go | 4 ++-- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index d98aca72f6..3a22d437db 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -161,7 +161,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect } // Insert the new collectors for _, i := range diff.Additions() { - c.collectors[i.Name] = NewCollector(i.Name) + c.collectors[i.Name] = NewCollector(i.Name, i.Node) c.consistentHasher.Add(c.collectors[i.Name]) } diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 6ae9c5eb2b..486b4ccaf6 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -191,7 +191,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col } // Insert the new collectors for _, i := range diff.Additions() { - allocator.collectors[i.Name] = NewCollector(i.Name) + allocator.collectors[i.Name] = NewCollector(i.Name, i.Node) } if allocateTargets { for _, item := range allocator.targetItems { diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index b994557732..880f6de4ca 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -106,6 +106,7 @@ var _ consistent.Member = Collector{} // This struct can be extended with information like annotations and labels in the future. 
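+// Node is the name of the Kubernetes node the collector pod is scheduled on; node-aware allocation strategies match targets against it.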
type Collector struct { Name string + Node string NumTargets int } @@ -117,8 +118,8 @@ func (c Collector) String() string { return c.Name } -func NewCollector(name string) *Collector { - return &Collector{Name: name} +func NewCollector(name, node string) *Collector { + return &Collector{Name: name, Node: node} } func init() { @@ -130,4 +131,8 @@ func init() { if err != nil { panic(err) } + err = Register(perNodeStrategyName, newPerNodeAllocator) + if err != nil { + panic(err) + } } diff --git a/cmd/otel-allocator/allocation/strategy_test.go b/cmd/otel-allocator/allocation/strategy_test.go index c12529d8d8..10c61f5365 100644 --- a/cmd/otel-allocator/allocation/strategy_test.go +++ b/cmd/otel-allocator/allocation/strategy_test.go @@ -89,11 +89,11 @@ func Benchmark_Setting(b *testing.B) { } func TestCollectorDiff(t *testing.T) { - collector0 := NewCollector("collector-0") - collector1 := NewCollector("collector-1") - collector2 := NewCollector("collector-2") - collector3 := NewCollector("collector-3") - collector4 := NewCollector("collector-4") + collector0 := NewCollector("collector-0", "") + collector1 := NewCollector("collector-1", "") + collector2 := NewCollector("collector-2", "") + collector3 := NewCollector("collector-3", "") + collector4 := NewCollector("collector-4", "") type args struct { current map[string]*Collector new map[string]*Collector diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index eaeee8e16c..d5fdc49bea 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -77,7 +77,7 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func( for i := range pods.Items { pod := pods.Items[i] if pod.GetObjectMeta().GetDeletionTimestamp() == nil { - collectorMap[pod.Name] = allocation.NewCollector(pod.Name) + collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) } } @@ -130,7 +130,7 @@ func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap switch event.Type { //nolint:exhaustive case watch.Added: - collectorMap[pod.Name] = allocation.NewCollector(pod.Name) + collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) case watch.Deleted: delete(collectorMap, pod.Name) } From 318fd6c2204baa63797c95d83f596ca82a136e90 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Tue, 5 Dec 2023 16:31:03 +0100 Subject: [PATCH 02/25] Add per node allocation algorithm Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/per_node.go | 259 ++++++++++++++++++ .../allocation/per_node_test.go | 17 ++ 2 files changed, 276 insertions(+) create mode 100644 cmd/otel-allocator/allocation/per_node.go create mode 100644 cmd/otel-allocator/allocation/per_node_test.go diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go new file mode 100644 index 0000000000..05489a9090 --- /dev/null +++ b/cmd/otel-allocator/allocation/per_node.go @@ -0,0 +1,259 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package allocation + +import ( + "sync" + + "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" +) + +var _ Allocator = &perNodeAllocator{} + +const ( + perNodeStrategyName = "per-node" + + nodeNameLabel model.LabelName = "__meta_kubernetes_pod_node_name" +) + +type perNodeAllocator struct { + // m protects collectors and targetItems for concurrent use. + m sync.RWMutex + // collectors is a map from a Collector's name to a Collector instance + collectors map[string]*Collector + // targetItems is a map from a target item's hash to the target items allocated state + targetItems map[string]*target.Item + + // collectorKey -> job -> target item hash -> true + targetItemsPerJobPerCollector map[string]map[string]map[string]bool + + log logr.Logger + + filter Filter +} + +func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collector) { + timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", perNodeStrategyName)) + defer timer.ObserveDuration() + + CollectorsAllocatable.WithLabelValues(perNodeStrategyName).Set(float64(len(collectors))) + if len(collectors) == 0 { + allocator.log.Info("No collector instances present") + return + } + + allocator.m.Lock() + defer allocator.m.Unlock() + + // Check for collector changes + collectorsDiff := diff.Maps(allocator.collectors, collectors) + if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 { + allocator.handleCollectors(collectorsDiff) + } +} + +func (allocator *perNodeAllocator) handleCollectors(diff diff.Changes[*Collector]) { + // Clear removed collectors + for _, k := range diff.Removals() { + delete(allocator.collectors, k.Name) + delete(allocator.targetItemsPerJobPerCollector, k.Name) + TargetsPerCollector.WithLabelValues(k.Name, perNodeStrategyName).Set(0) + } + + // Insert the new collectors + for _, i := range diff.Additions() { + allocator.collectors[i.Name] = NewCollector(i.Name, i.Node) + } +} + +func (allocator *perNodeAllocator) SetTargets(targets map[string]*target.Item) { + timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", perNodeStrategyName)) + defer timer.ObserveDuration() + + if allocator.filter != nil { + targets = allocator.filter.Apply(targets) + } + RecordTargetsKept(targets) + + allocator.m.Lock() + defer allocator.m.Unlock() + + if len(allocator.collectors) == 0 { + allocator.log.Info("No collector instances present, saving targets to allocate to collector(s)") + // If there were no targets discovered previously, assign this as the new set of target items + if len(allocator.targetItems) == 0 { + allocator.log.Info("Not discovered any targets previously, saving targets found to the targetItems set") + for k, item := range targets { + allocator.targetItems[k] = item + } + } else { + // If there were previously discovered targets, add or remove accordingly + targetsDiffEmptyCollectorSet := diff.Maps(allocator.targetItems, targets) + + // Check for additions + if len(targetsDiffEmptyCollectorSet.Additions()) > 0 { + allocator.log.Info("New targets discovered, adding new targets to the targetItems set") + for k, item := range targetsDiffEmptyCollectorSet.Additions() { + // Do nothing if the item is already there + if _, ok 
:= allocator.targetItems[k]; ok { + continue + } else { + // Add item to item pool + allocator.targetItems[k] = item + } + } + } + + // Check for deletions + if len(targetsDiffEmptyCollectorSet.Removals()) > 0 { + allocator.log.Info("Targets removed, Removing targets from the targetItems set") + for k := range targetsDiffEmptyCollectorSet.Removals() { + // Delete item from target items + delete(allocator.targetItems, k) + } + } + } + return + } + // Check for target changes + targetsDiff := diff.Maps(allocator.targetItems, targets) + // If there are any additions or removals + if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { + allocator.handleTargets(targetsDiff) + } +} +func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item]) { + // Check for removals + for k, item := range allocator.targetItems { + // if the current item is in the removals list + if _, ok := diff.Removals()[k]; ok { + c := allocator.collectors[item.CollectorName] + c.NumTargets-- + delete(allocator.targetItems, k) + delete(allocator.targetItemsPerJobPerCollector[item.CollectorName][item.JobName], item.Hash()) + TargetsPerCollector.WithLabelValues(item.CollectorName, perNodeStrategyName).Set(float64(c.NumTargets)) + } + } + + // Check for additions + for k, item := range diff.Additions() { + // Do nothing if the item is already there + if _, ok := allocator.targetItems[k]; ok { + continue + } else { + // Add item to item pool and assign a collector + allocator.addTargetToTargetItems(item) + } + } +} + +func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) { + chosenCollector := allocator.findCollector(tg.Labels) + // TODO: How to handle this edge case? Can we have items without a collector? + if chosenCollector == nil { + allocator.log.V(2).Info("Couldn't find a collector for the target item", "item", tg, "collectors", allocator.collectors) + return + } + tg.CollectorName = chosenCollector.Name + allocator.targetItems[tg.Hash()] = tg + allocator.addCollectorTargetItemMapping(tg) + chosenCollector.NumTargets++ + TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets)) +} + +func (allocator *perNodeAllocator) findCollector(labels model.LabelSet) *Collector { + var col *Collector + for _, v := range allocator.collectors { + if nodeNameLabelValue, ok := labels[nodeNameLabel]; ok { + if v.Node == string(nodeNameLabelValue) { + col = v + break + } + } + } + + return col +} + +func (allocator *perNodeAllocator) addCollectorTargetItemMapping(tg *target.Item) { + if allocator.targetItemsPerJobPerCollector[tg.CollectorName] == nil { + allocator.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool) + } + if allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] == nil { + allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] = make(map[string]bool) + } + allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true +} + +func (allocator *perNodeAllocator) TargetItems() map[string]*target.Item { + allocator.m.RLock() + defer allocator.m.RUnlock() + targetItemsCopy := make(map[string]*target.Item) + for k, v := range allocator.targetItems { + targetItemsCopy[k] = v + } + return targetItemsCopy +} + +func (allocator *perNodeAllocator) Collectors() map[string]*Collector { + allocator.m.RLock() + defer allocator.m.RUnlock() + collectorsCopy := make(map[string]*Collector) + for k, v := range allocator.collectors { + 
collectorsCopy[k] = v + } + return collectorsCopy +} + +func (allocator *perNodeAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item { + allocator.m.RLock() + defer allocator.m.RUnlock() + if _, ok := allocator.targetItemsPerJobPerCollector[collector]; !ok { + return []*target.Item{} + } + if _, ok := allocator.targetItemsPerJobPerCollector[collector][job]; !ok { + return []*target.Item{} + } + targetItemsCopy := make([]*target.Item, len(allocator.targetItemsPerJobPerCollector[collector][job])) + index := 0 + for targetHash := range allocator.targetItemsPerJobPerCollector[collector][job] { + targetItemsCopy[index] = allocator.targetItems[targetHash] + index++ + } + return targetItemsCopy +} + +func (allocator *perNodeAllocator) SetFilter(filter Filter) { + allocator.filter = filter +} + +func newPerNodeAllocator(log logr.Logger, opts ...AllocationOption) Allocator { + pnAllocator := &perNodeAllocator{ + log: log, + collectors: make(map[string]*Collector), + targetItems: make(map[string]*target.Item), + targetItemsPerJobPerCollector: make(map[string]map[string]map[string]bool), + } + + for _, opt := range opts { + opt(pnAllocator) + } + + return pnAllocator +} diff --git a/cmd/otel-allocator/allocation/per_node_test.go b/cmd/otel-allocator/allocation/per_node_test.go new file mode 100644 index 0000000000..b05793e31c --- /dev/null +++ b/cmd/otel-allocator/allocation/per_node_test.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package allocation + +// TODO: Add tests From d298b69d759faa9508bc4573e47fad59c21d5616 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 7 Dec 2023 16:46:56 +0100 Subject: [PATCH 03/25] More docs Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/per_node.go | 46 ++++++++++++++++++++--- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index 05489a9090..6cda34b0fd 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -29,9 +29,15 @@ var _ Allocator = &perNodeAllocator{} const ( perNodeStrategyName = "per-node" - nodeNameLabel model.LabelName = "__meta_kubernetes_pod_node_name" + podNodeNameLabel model.LabelName = "__meta_kubernetes_pod_node_name" ) +// perNodeAllocator makes decisions to distribute work among +// a number of OpenTelemetry collectors based on the node on which +// the collector is running. This allocator should be used only when +// collectors are running as daemon set (agent) on each node. +// Users need to call SetTargets when they have new targets in their +// clusters and call SetCollectors when the collectors have changed. type perNodeAllocator struct { // m protects collectors and targetItems for concurrent use. m sync.RWMutex @@ -48,6 +54,8 @@ type perNodeAllocator struct { filter Filter } +// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. 
+// This method is called when Collectors are added or removed. func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collector) { timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", perNodeStrategyName)) defer timer.ObserveDuration() @@ -68,6 +76,8 @@ func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collecto } } +// handleCollectors receives the new and removed collectors and reconciles the current state. +// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map. func (allocator *perNodeAllocator) handleCollectors(diff diff.Changes[*Collector]) { // Clear removed collectors for _, k := range diff.Removals() { @@ -82,6 +92,9 @@ func (allocator *perNodeAllocator) handleCollectors(diff diff.Changes[*Collector } } +// SetTargets accepts a list of targets that will be used to make +// load balancing decisions. This method should be called when there are +// new targets discovered or existing targets are shutdown. func (allocator *perNodeAllocator) SetTargets(targets map[string]*target.Item) { timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", perNodeStrategyName)) defer timer.ObserveDuration() @@ -138,12 +151,19 @@ func (allocator *perNodeAllocator) SetTargets(targets map[string]*target.Item) { allocator.handleTargets(targetsDiff) } } + +// handleTargets receives the new and removed targets and reconciles the current state. +// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector. +// Any net-new additions are assigned to the collector on the same node as the target. func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item]) { // Check for removals for k, item := range allocator.targetItems { // if the current item is in the removals list if _, ok := diff.Removals()[k]; ok { - c := allocator.collectors[item.CollectorName] + c, ok := allocator.collectors[item.CollectorName] + if !ok { + continue + } c.NumTargets-- delete(allocator.targetItems, k) delete(allocator.targetItemsPerJobPerCollector[item.CollectorName][item.JobName], item.Hash()) @@ -163,9 +183,15 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] } } +// addTargetToTargetItems assigns a target to the collector and adds it to the allocator's targetItems +// This method is called from within SetTargets and SetCollectors, which acquire the needed lock. +// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap. +// INVARIANT: allocator.collectors must have at least 1 collector set. +// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target +// item while it's being encoded by the server JSON handler. +// Also, any targets that cannot be assigned to a collector due to no matching node name will be dropped. func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) { chosenCollector := allocator.findCollector(tg.Labels) - // TODO: How to handle this edge case? Can we have items without a collector? 
if chosenCollector == nil { allocator.log.V(2).Info("Couldn't find a collector for the target item", "item", tg, "collectors", allocator.collectors) return @@ -177,11 +203,15 @@ func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) { TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets)) } +// findCollector finds the collector that matches the node of the target, on the basis of the +// pod node label. +// This method is called from within SetTargets and SetCollectors, whose caller +// acquires the needed lock. This method assumes there is at least 1 collector set. func (allocator *perNodeAllocator) findCollector(labels model.LabelSet) *Collector { var col *Collector for _, v := range allocator.collectors { - if nodeNameLabelValue, ok := labels[nodeNameLabel]; ok { - if v.Node == string(nodeNameLabelValue) { + if podNodeNameLabelValue, ok := labels[podNodeNameLabel]; ok { + if v.Node == string(podNodeNameLabelValue) { col = v break } @@ -191,6 +221,9 @@ func (allocator *perNodeAllocator) findCollector(labels model.LabelSet) *Collect return col } +// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets. +// This allows the allocator to respond to HTTP calls without any extra allocations. The caller of this method +// has to acquire a lock. func (allocator *perNodeAllocator) addCollectorTargetItemMapping(tg *target.Item) { if allocator.targetItemsPerJobPerCollector[tg.CollectorName] == nil { allocator.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool) @@ -201,6 +234,7 @@ func (allocator *perNodeAllocator) addCollectorTargetItemMapping(tg *target.Item allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true } +// TargetItems returns a shallow copy of the targetItems map. func (allocator *perNodeAllocator) TargetItems() map[string]*target.Item { allocator.m.RLock() defer allocator.m.RUnlock() targetItemsCopy := make(map[string]*target.Item) for k, v := range allocator.targetItems { @@ -211,6 +245,7 @@ func (allocator *perNodeAllocator) TargetItems() map[string]*target.Item { return targetItemsCopy } +// Collectors returns a shallow copy of the collectors map. func (allocator *perNodeAllocator) Collectors() map[string]*Collector { allocator.m.RLock() defer allocator.m.RUnlock() collectorsCopy := make(map[string]*Collector) for k, v := range allocator.collectors { @@ -239,6 +274,7 @@ func (allocator *perNodeAllocator) GetTargetsForCollectorAndJob(collector string return targetItemsCopy } +// SetFilter sets the filtering hook to use. 
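+// The filter is applied to the incoming target set at the start of SetTargets, before any allocation takes place.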
+func (allocator *perNodeAllocator) SetFilter(filter Filter) { allocator.filter = filter } From 886a80cf8489808ddd0cb65b69d61358d4f2c8ee Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 7 Dec 2023 16:47:15 +0100 Subject: [PATCH 04/25] Add tests Signed-off-by: Matej Gera --- .../allocation/allocatortest.go | 1 + .../allocation/per_node_test.go | 67 ++++++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/cmd/otel-allocator/allocation/allocatortest.go b/cmd/otel-allocator/allocation/allocatortest.go index 88312a80a5..dedb8fd739 100644 --- a/cmd/otel-allocator/allocation/allocatortest.go +++ b/cmd/otel-allocator/allocation/allocatortest.go @@ -52,6 +52,7 @@ func MakeNCollectors(n int, startingIndex int) map[string]*Collector { toReturn[collector] = &Collector{ Name: collector, NumTargets: 0, + Node: fmt.Sprintf("node-%d", i), } } return toReturn diff --git a/cmd/otel-allocator/allocation/per_node_test.go b/cmd/otel-allocator/allocation/per_node_test.go index b05793e31c..caec92f6f8 100644 --- a/cmd/otel-allocator/allocation/per_node_test.go +++ b/cmd/otel-allocator/allocation/per_node_test.go @@ -14,4 +14,69 @@ package allocation -// TODO: Add tests +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +var loggerPerNode = logf.Log.WithName("unit-tests") + +// Tests that targets are allocated to the collector running on the target's node, and that targets without a node label are skipped. +func TestAllocationPerNode(t *testing.T) { + // prepare allocator with initial targets and collectors + s, _ := New("per-node", loggerPerNode) + + cols := MakeNCollectors(3, 0) + s.SetCollectors(cols) + firstLabels := model.LabelSet{ + "test": "test1", + podNodeNameLabel: "node-0", + } + secondLabels := model.LabelSet{ + "test": "test2", + podNodeNameLabel: "node-1", + } + // no label, should be skipped + thirdLabels := model.LabelSet{ + "test": "test3", + } + firstTarget := target.NewItem("sample-name", "0.0.0.0:8000", firstLabels, "") + secondTarget := target.NewItem("sample-name", "0.0.0.0:8000", secondLabels, "") + thirdTarget := target.NewItem("sample-name", "0.0.0.0:8000", thirdLabels, "") + + targetList := map[string]*target.Item{ + firstTarget.Hash(): firstTarget, + secondTarget.Hash(): secondTarget, + thirdTarget.Hash(): thirdTarget, + } + + // test that targets and collectors are added properly + s.SetTargets(targetList) + + // verify length + actualItems := s.TargetItems() + + // one target should be skipped + expectedTargetLen := len(targetList) - 1 + assert.Len(t, actualItems, expectedTargetLen) + + // verify allocation to nodes + for targetHash, item := range targetList { + actualItem, found := actualItems[targetHash] + // if third target, should be skipped + if targetHash != thirdTarget.Hash() { + assert.True(t, found, "target with hash %s not found", item.Hash()) + } else { + assert.False(t, found, "target with hash %s should not be found", item.Hash()) + return + } + + itemsForCollector := s.GetTargetsForCollectorAndJob(actualItem.CollectorName, actualItem.JobName) + assert.Len(t, itemsForCollector, 1) + assert.Equal(t, actualItem, itemsForCollector[0]) + } +} From 23089e3653542ff96ef5cdd79b25ad2bb47fa1dc Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 7 Dec 2023 17:01:44 +0100 Subject: [PATCH 05/25] Update APIs and docs with new strategy Signed-off-by: Matej Gera --- apis/v1alpha1/allocation_strategy.go | 5 
++++- apis/v1alpha1/opentelemetrycollector_types.go | 3 ++- .../manifests/opentelemetry.io_opentelemetrycollectors.yaml | 5 +++-- .../crd/bases/opentelemetry.io_opentelemetrycollectors.yaml | 5 +++-- docs/api.md | 4 ++-- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/apis/v1alpha1/allocation_strategy.go b/apis/v1alpha1/allocation_strategy.go index 49c7945171..9b65bd7e27 100644 --- a/apis/v1alpha1/allocation_strategy.go +++ b/apis/v1alpha1/allocation_strategy.go @@ -16,7 +16,7 @@ package v1alpha1 type ( // OpenTelemetryTargetAllocatorAllocationStrategy represent which strategy to distribute target to each collector - // +kubebuilder:validation:Enum=least-weighted;consistent-hashing + // +kubebuilder:validation:Enum=least-weighted;consistent-hashing;per-node OpenTelemetryTargetAllocatorAllocationStrategy string ) @@ -26,4 +26,7 @@ const ( // OpenTelemetryTargetAllocatorAllocationStrategyConsistentHashing targets will be consistently added to collectors, which allows a high-availability setup. OpenTelemetryTargetAllocatorAllocationStrategyConsistentHashing OpenTelemetryTargetAllocatorAllocationStrategy = "consistent-hashing" + + // OpenTelemetryTargetAllocatorAllocationStrategyPerNode targets will be assigned to the collector on the node they reside on (use only with daemon set). + OpenTelemetryTargetAllocatorAllocationStrategyPerNode OpenTelemetryTargetAllocatorAllocationStrategy = "per-node" ) diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 07ae504dde..8dd15bb28b 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -297,7 +297,8 @@ type OpenTelemetryTargetAllocator struct { // +optional Resources v1.ResourceRequirements `json:"resources,omitempty"` // AllocationStrategy determines which strategy the target allocator should use for allocation. - // The current options are least-weighted and consistent-hashing. The default option is least-weighted + // The current options are least-weighted, consistent-hashing and per-node. The default is + // least-weighted. // +optional AllocationStrategy OpenTelemetryTargetAllocatorAllocationStrategy `json:"allocationStrategy,omitempty"` // FilterStrategy determines how to filter targets before allocating them among the collectors. diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index c89d429c48..b0935d1e07 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -4832,11 +4832,12 @@ spec: allocationStrategy: description: AllocationStrategy determines which strategy the target allocator should use for allocation. The current options - are least-weighted and consistent-hashing. The default option - is least-weighted + are least-weighted, consistent-hashing and per-node. The default + is least-weighted. 
enum: - least-weighted - consistent-hashing + - per-node type: string enabled: description: Enabled indicates whether to use a target allocation diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index 6bc2e810ed..cbd4484449 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -4829,11 +4829,12 @@ spec: allocationStrategy: description: AllocationStrategy determines which strategy the target allocator should use for allocation. The current options - are least-weighted and consistent-hashing. The default option - is least-weighted + are least-weighted, consistent-hashing and per-node. The default + is least-weighted. enum: - least-weighted - consistent-hashing + - per-node type: string enabled: description: Enabled indicates whether to use a target allocation diff --git a/docs/api.md b/docs/api.md index 12a06d1a64..d35e40f625 100644 --- a/docs/api.md +++ b/docs/api.md @@ -18027,9 +18027,9 @@ TargetAllocator indicates a value which determines whether to spawn a target all allocationStrategy enum - AllocationStrategy determines which strategy the target allocator should use for allocation. The current options are least-weighted and consistent-hashing. The default option is least-weighted
+ AllocationStrategy determines which strategy the target allocator should use for allocation. The current options are least-weighted, consistent-hashing and per-node. The default is least-weighted.<br/>

- Enum: least-weighted, consistent-hashing
+ Enum: least-weighted, consistent-hashing, per-node
false From 7f82f692290b2c01909dc2066a3e1574f93f9fe3 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 8 Dec 2023 12:41:37 +0100 Subject: [PATCH 06/25] Add changelog Signed-off-by: Matej Gera --- .chloggen/per-node-allocation-strategy.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100755 .chloggen/per-node-allocation-strategy.yaml diff --git a/.chloggen/per-node-allocation-strategy.yaml b/.chloggen/per-node-allocation-strategy.yaml new file mode 100755 index 0000000000..e4a6637bfe --- /dev/null +++ b/.chloggen/per-node-allocation-strategy.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action) +component: target allocator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add new "per node" allocation strategy to target allocator. This strategy will allocate targets to the nodes on which the given targets reside. It should only be used in conjunction with the daemonset mode. + +# One or more tracking issues related to the change +issues: [1828] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: From 959c3c5906fd25778b587ed7541d4d9764b2f5f3 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 8 Dec 2023 14:56:56 +0100 Subject: [PATCH 07/25] Fix lint Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/per_node.go | 3 ++- cmd/otel-allocator/allocation/per_node_test.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index 6cda34b0fd..5c3ad276dd 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -17,9 +17,10 @@ package allocation import ( "sync" - "github.com/go-logr/logr" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" + + "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" ) diff --git a/cmd/otel-allocator/allocation/per_node_test.go b/cmd/otel-allocator/allocation/per_node_test.go index caec92f6f8..6774221257 100644 --- a/cmd/otel-allocator/allocation/per_node_test.go +++ b/cmd/otel-allocator/allocation/per_node_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" logf "sigs.k8s.io/controller-runtime/pkg/log" From 082e678411dc16dfa12c9d9c67a3ea05437586aa Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 8 Dec 2023 16:03:35 +0100 Subject: [PATCH 08/25] Add more labels to match node name Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/per_node.go | 26 ++++++++++++------- .../allocation/per_node_test.go | 8 +++--- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index 5c3ad276dd..8aecbbde06 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -27,11 +27,7 @@ import ( var _ Allocator = 
&perNodeAllocator{} -const ( - perNodeStrategyName = "per-node" - - podNodeNameLabel model.LabelName = "__meta_kubernetes_pod_node_name" -) +const perNodeStrategyName = "per-node" // perNodeAllocator makes decisions to distribute work among // a number of OpenTelemetry collectors based on the node on which @@ -55,6 +51,15 @@ type perNodeAllocator struct { filter Filter } +// nodeLabels are labels that are used to identify the node on which the given +// target is residing. To learn more about these labels, please refer to: +// https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config +var nodeLabels = []model.LabelName{ + "__meta_kubernetes_pod_node_name", + "__meta_kubernetes_node_name", + "__meta_kubernetes_endpoint_node_name", +} + // SetCollectors sets the set of collectors with key=collectorName, value=Collector object. // This method is called when Collectors are added or removed. func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collector) { @@ -211,10 +216,13 @@ func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) { func (allocator *perNodeAllocator) findCollector(labels model.LabelSet) *Collector { var col *Collector for _, v := range allocator.collectors { - if podNodeNameLabelValue, ok := labels[podNodeNameLabel]; ok { - if v.Node == string(podNodeNameLabelValue) { - col = v - break + // Try to match against a node label. + for _, l := range nodeLabels { + if nodeNameLabelValue, ok := labels[l]; ok { + if v.Node == string(nodeNameLabelValue) { + col = v + break + } } } } diff --git a/cmd/otel-allocator/allocation/per_node_test.go b/cmd/otel-allocator/allocation/per_node_test.go index 6774221257..a42c1537dc 100644 --- a/cmd/otel-allocator/allocation/per_node_test.go +++ b/cmd/otel-allocator/allocation/per_node_test.go @@ -34,12 +34,12 @@ func TestAllocationPerNode(t *testing.T) { cols := MakeNCollectors(3, 0) s.SetCollectors(cols) firstLabels := model.LabelSet{ - "test": "test1", - podNodeNameLabel: "node-0", + "test": "test1", + "__meta_kubernetes_pod_node_name": "node-0", } secondLabels := model.LabelSet{ - "test": "test2", - podNodeNameLabel: "node-1", + "test": "test2", + "__meta_kubernetes_node_name": "node-1", } // no label, should be skipped thirdLabels := model.LabelSet{ From 2d29ae601bd0e22691ae9798699dcb02031b5f44 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Mon, 18 Dec 2023 09:34:25 +0100 Subject: [PATCH 09/25] Better handling of unassignable targets / jobs Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/per_node.go | 29 +++++++++++++++---- .../allocation/per_node_test.go | 16 +++++----- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index 8aecbbde06..7abe62d81f 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -178,15 +178,29 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] } // Check for additions + unassignedTargetsForJobs := make(map[string]struct{}) for k, item := range diff.Additions() { // Do nothing if the item is already there if _, ok := allocator.targetItems[k]; ok { continue } else { // Add item to item pool and assign a collector - allocator.addTargetToTargetItems(item) + collectorAssigned := allocator.addTargetToTargetItems(item) + if !collectorAssigned { + unassignedTargetsForJobs[item.JobName] = struct{}{} + } + } + } + // Check for unassigned targets + if 
len(unassignedTargetsForJobs) > 0 { + jobs := make([]string, 0, len(unassignedTargetsForJobs)) + for j := range unassignedTargetsForJobs { + jobs = append(jobs, j) } + + allocator.log.Info("Could not assign targets for the following jobs due to missing node labels", "jobs", jobs) } + } // addTargetToTargetItems assigns a target to the collector and adds it to the allocator's targetItems @@ -195,18 +209,21 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] // INVARIANT: allocator.collectors must have at least 1 collector set. // NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target // item while it's being encoded by the server JSON handler. -// Also, any targets that cannot be assigned to a collector due to no matching node name will be dropped. -func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) { +// Also, any targets that cannot be assigned to a collector, due to no matching node name, will remain unassigned. These +// targets are still "silently" added to the targetItems map, to prevent them from being reported as unassigned on each new +// target items setting. +func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) bool { + allocator.targetItems[tg.Hash()] = tg chosenCollector := allocator.findCollector(tg.Labels) if chosenCollector == nil { allocator.log.V(2).Info("Couldn't find a collector for the target item", "item", tg, "collectors", allocator.collectors) - return + return false } tg.CollectorName = chosenCollector.Name - allocator.targetItems[tg.Hash()] = tg allocator.addCollectorTargetItemMapping(tg) chosenCollector.NumTargets++ - TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets)) + TargetsPerCollector.WithLabelValues(chosenCollector.Name, perNodeStrategyName).Set(float64(chosenCollector.NumTargets)) + return true } // findCollector finds the collector that matches the node of the target, on the basis of the diff --git a/cmd/otel-allocator/allocation/per_node_test.go b/cmd/otel-allocator/allocation/per_node_test.go index a42c1537dc..242f567e8a 100644 --- a/cmd/otel-allocator/allocation/per_node_test.go +++ b/cmd/otel-allocator/allocation/per_node_test.go @@ -62,21 +62,23 @@ func TestAllocationPerNode(t *testing.T) { actualItems := s.TargetItems() // one target should be skipped - expectedTargetLen := len(targetList) - 1 + expectedTargetLen := len(targetList) assert.Len(t, actualItems, expectedTargetLen) // verify allocation to nodes for targetHash, item := range targetList { actualItem, found := actualItems[targetHash] // if third target, should be skipped - if targetHash != thirdTarget.Hash() { - assert.True(t, found, "target with hash %s not found", item.Hash()) - } else { - assert.False(t, found, "target with hash %s should not be found", item.Hash()) - return - } + assert.True(t, found, "target with hash %s not found", item.Hash()) + // only the first two targets should be allocated itemsForCollector := s.GetTargetsForCollectorAndJob(actualItem.CollectorName, actualItem.JobName) + + // first two should be assigned one to each collector; if third target, should not be assigned + if targetHash == thirdTarget.Hash() { + assert.Len(t, itemsForCollector, 0) + continue + } assert.Len(t, itemsForCollector, 1) assert.Equal(t, actualItem, itemsForCollector[0]) } From c08f560c56e1ec7183cf26adfa7e3912d17d94eb Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Mon, 18 Dec 2023 
09:34:57 +0100 Subject: [PATCH 10/25] Adjust webhook validation Signed-off-by: Matej Gera --- apis/v1alpha1/collector_webhook.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apis/v1alpha1/collector_webhook.go b/apis/v1alpha1/collector_webhook.go index 7b0cefddfa..cbae5ae335 100644 --- a/apis/v1alpha1/collector_webhook.go +++ b/apis/v1alpha1/collector_webhook.go @@ -180,10 +180,14 @@ func (c CollectorWebhook) validate(r *OpenTelemetryCollector) (admission.Warning } // validate target allocation - if r.Spec.TargetAllocator.Enabled && r.Spec.Mode != ModeStatefulSet { + if r.Spec.TargetAllocator.Enabled && (r.Spec.Mode != ModeStatefulSet && r.Spec.Mode != ModeDaemonSet) { return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the target allocation deployment", r.Spec.Mode) } + if r.Spec.TargetAllocator.AllocationStrategy == OpenTelemetryTargetAllocatorAllocationStrategyPerNode && r.Spec.Mode != ModeDaemonSet { + return warnings, fmt.Errorf("target allocation strategy %s is only supported in OpenTelemetry Collector mode %s", OpenTelemetryTargetAllocatorAllocationStrategyPerNode, ModeDaemonSet) + } + // validate Prometheus config for target allocation if r.Spec.TargetAllocator.Enabled { promCfg, err := ta.ConfigToPromConfig(r.Spec.Config) From 8c0f9e6f7cd641779f4cd0e029d0b7074cf6286c Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Mon, 18 Dec 2023 10:44:53 +0100 Subject: [PATCH 11/25] Add E2E test; put TA E2E tests into separate target Signed-off-by: Matej Gera --- Makefile | 5 + hack/modify-test-images.sh | 5 +- kuttl-test-targetallocator.yaml | 7 + .../opampbridge/00-install.yaml | 2 +- .../targetallocator-features/00-assert.yaml | 0 .../targetallocator-features/00-install.yaml | 0 .../targetallocator-features/01-assert.yaml | 0 .../targetallocator-features/01-liveness.yaml | 0 .../00-assert.yaml | 52 +++++++ .../00-install.yaml | 133 ++++++++++++++++++ .../01-assert.yaml | 0 .../01-install.yaml | 49 +++++++ .../check-daemonset.sh | 15 ++ .../00-assert.yaml | 0 .../00-install.yaml | 0 .../01-assert.yaml | 20 +++ .../01-install.yaml | 0 17 files changed, 285 insertions(+), 3 deletions(-) create mode 100644 kuttl-test-targetallocator.yaml rename tests/{e2e => e2e-targetallocator}/targetallocator-features/00-assert.yaml (100%) rename tests/{e2e => e2e-targetallocator}/targetallocator-features/00-install.yaml (100%) rename tests/{e2e => e2e-targetallocator}/targetallocator-features/01-assert.yaml (100%) rename tests/{e2e => e2e-targetallocator}/targetallocator-features/01-liveness.yaml (100%) create mode 100644 tests/e2e-targetallocator/targetallocator-kubernetessd/00-assert.yaml create mode 100644 tests/e2e-targetallocator/targetallocator-kubernetessd/00-install.yaml rename tests/{e2e/targetallocator-prometheuscr => e2e-targetallocator/targetallocator-kubernetessd}/01-assert.yaml (100%) create mode 100644 tests/e2e-targetallocator/targetallocator-kubernetessd/01-install.yaml create mode 100755 tests/e2e-targetallocator/targetallocator-kubernetessd/check-daemonset.sh rename tests/{e2e => e2e-targetallocator}/targetallocator-prometheuscr/00-assert.yaml (100%) rename tests/{e2e => e2e-targetallocator}/targetallocator-prometheuscr/00-install.yaml (100%) create mode 100644 tests/e2e-targetallocator/targetallocator-prometheuscr/01-assert.yaml rename tests/{e2e => e2e-targetallocator}/targetallocator-prometheuscr/01-install.yaml (100%) diff --git a/Makefile b/Makefile index d14311a893..9ef3d7dd19 100644 --- a/Makefile +++ b/Makefile 
@@ -233,6 +233,11 @@ e2e-multi-instrumentation: e2e-opampbridge: $(KUTTL) test --config kuttl-test-opampbridge.yaml +# Target allocator end-to-end tests +.PHONY: e2e-targetallocator +e2e-targetallocator: + $(KUTTL) test --config kuttl-test-targetallocator.yaml + .PHONY: prepare-e2e prepare-e2e: kuttl set-image-controller container container-target-allocator container-operator-opamp-bridge start-kind cert-manager install-metrics-server install-targetallocator-prometheus-crds load-image-all deploy TARGETALLOCATOR_IMG=$(TARGETALLOCATOR_IMG) OPERATOROPAMPBRIDGE_IMG=$(OPERATOROPAMPBRIDGE_IMG) OPERATOR_IMG=$(IMG) SED_BIN="$(SED)" ./hack/modify-test-images.sh diff --git a/hack/modify-test-images.sh b/hack/modify-test-images.sh index 5c98e53984..005332b3a7 100755 --- a/hack/modify-test-images.sh +++ b/hack/modify-test-images.sh @@ -7,9 +7,10 @@ DEFAULT_OPERATOROPAMPBRIDGE_IMG=${DEFAULT_OPERATOROPAMPBRIDGE_IMG:-local/opentel DEFAULT_OPERATOR_IMG=${DEFAULT_OPERATOR_IMG:-local/opentelemetry-operator:e2e} ${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e/smoke-targetallocator/*.yaml -${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e/targetallocator-features/00-install.yaml +${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e-targetallocator/targetallocator-features/00-install.yaml ${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e/prometheus-config-validation/*.yaml -${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e/targetallocator-prometheuscr/*.yaml +${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e-targetallocator/targetallocator-prometheuscr/*.yaml +${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e-targetallocator/targetallocator-kubernetessd/*.yaml ${SED_BIN} -i "s#${DEFAULT_OPERATOR_IMG}#${OPERATOR_IMG}#g" tests/e2e-multi-instrumentation/*.yaml diff --git a/kuttl-test-targetallocator.yaml b/kuttl-test-targetallocator.yaml new file mode 100644 index 0000000000..12bf709985 --- /dev/null +++ b/kuttl-test-targetallocator.yaml @@ -0,0 +1,7 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestSuite +artifactsDir: ./tests/_build/artifacts/ +testDirs: + - ./tests/e2e-targetallocator/ +timeout: 150 +parallel: 4 diff --git a/tests/e2e-opampbridge/opampbridge/00-install.yaml b/tests/e2e-opampbridge/opampbridge/00-install.yaml index 3f4f353f15..da833f429a 100644 --- a/tests/e2e-opampbridge/opampbridge/00-install.yaml +++ b/tests/e2e-opampbridge/opampbridge/00-install.yaml @@ -26,7 +26,7 @@ kind: OpAMPBridge metadata: name: test spec: - image: "local/opentelemetry-operator-opamp-bridge:e2e" + image: "ghcr.io/open-telemetry/opentelemetry-operator/operator-opamp-bridge:v0.90.0-14-g082e678" endpoint: ws://opamp-server:4320/v1/opamp capabilities: AcceptsOpAMPConnectionSettings: true diff --git a/tests/e2e/targetallocator-features/00-assert.yaml b/tests/e2e-targetallocator/targetallocator-features/00-assert.yaml similarity index 100% rename from tests/e2e/targetallocator-features/00-assert.yaml rename to tests/e2e-targetallocator/targetallocator-features/00-assert.yaml diff --git a/tests/e2e/targetallocator-features/00-install.yaml b/tests/e2e-targetallocator/targetallocator-features/00-install.yaml similarity index 100% rename from tests/e2e/targetallocator-features/00-install.yaml rename to tests/e2e-targetallocator/targetallocator-features/00-install.yaml diff --git 
a/tests/e2e/targetallocator-features/01-assert.yaml b/tests/e2e-targetallocator/targetallocator-features/01-assert.yaml similarity index 100% rename from tests/e2e/targetallocator-features/01-assert.yaml rename to tests/e2e-targetallocator/targetallocator-features/01-assert.yaml diff --git a/tests/e2e/targetallocator-features/01-liveness.yaml b/tests/e2e-targetallocator/targetallocator-features/01-liveness.yaml similarity index 100% rename from tests/e2e/targetallocator-features/01-liveness.yaml rename to tests/e2e-targetallocator/targetallocator-features/01-liveness.yaml diff --git a/tests/e2e-targetallocator/targetallocator-kubernetessd/00-assert.yaml b/tests/e2e-targetallocator/targetallocator-kubernetessd/00-assert.yaml new file mode 100644 index 0000000000..4874342bf9 --- /dev/null +++ b/tests/e2e-targetallocator/targetallocator-kubernetessd/00-assert.yaml @@ -0,0 +1,52 @@ +# This KUTTL assert uses the check-daemonset.sh script to ensure the number of ready pods in a daemonset matches the desired count, retrying until successful or a timeout occurs. The script is needed as the number of Kubernetes cluster nodes can vary and we cannot statically set desiredNumberScheduled and numberReady in the assert for daemonset status. + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +commands: +- script: ./tests/e2e-targetallocator/targetallocator-kubernetessd/check-daemonset.sh +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: prometheus-kubernetessd-targetallocator +status: + replicas: 1 + readyReplicas: 1 + observedGeneration: 1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: prometheus-kubernetessd-targetallocator +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: prometheus-kubernetessd-collector +data: + collector.yaml: | + exporters: + prometheus: + endpoint: 0.0.0.0:9090 + processors: null + receivers: + prometheus: + config: {} + target_allocator: + collector_id: ${POD_NAME} + endpoint: http://prometheus-kubernetessd-targetallocator:80 + interval: 30s + service: + pipelines: + metrics: + exporters: + - prometheus + processors: [] + receivers: + - prometheus +--- +# Print TA pod logs if test fails +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - selector: app.kubernetes.io/managed-by=opentelemetry-operator diff --git a/tests/e2e-targetallocator/targetallocator-kubernetessd/00-install.yaml b/tests/e2e-targetallocator/targetallocator-kubernetessd/00-install.yaml new file mode 100644 index 0000000000..1a97786c14 --- /dev/null +++ b/tests/e2e-targetallocator/targetallocator-kubernetessd/00-install.yaml @@ -0,0 +1,133 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: ta +automountServiceAccountToken: true +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: collector +automountServiceAccountToken: true +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: ta +rules: +- apiGroups: [""] + resources: + - pods + - nodes + - services + - endpoints + - configmaps + - secrets + - namespaces + verbs: + - get + - watch + - list +- apiGroups: ["apps"] + resources: + - statefulsets + - daemonsets + - services + - endpoints + verbs: + - get + - watch + - list +- apiGroups: ["discovery.k8s.io"] + resources: + - endpointslices + verbs: + - get + - watch + - list +- apiGroups: ["networking.k8s.io"] + resources: + - ingresses + verbs: + - get + - watch + - list +- nonResourceURLs: ["/metrics"] + verbs: ["get"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: collector +rules: +- apiGroups: [""] + resources: 
+ - pods + - nodes + - nodes/metrics + - services + - endpoints + verbs: + - get + - watch + - list +- apiGroups: ["networking.k8s.io"] + resources: + - ingresses + verbs: + - get + - watch + - list +- nonResourceURLs: ["/metrics", "/metrics/cadvisor"] + verbs: ["get"] +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - command: kubectl create clusterrolebinding ta-$NAMESPACE --clusterrole=ta --serviceaccount=$NAMESPACE:ta + - command: kubectl create clusterrolebinding collector-$NAMESPACE --clusterrole=collector --serviceaccount=$NAMESPACE:collector +--- +apiVersion: opentelemetry.io/v1alpha1 +kind: OpenTelemetryCollector +metadata: + name: prometheus-kubernetessd +spec: + mode: daemonset + serviceAccount: collector + targetAllocator: + enabled: true + serviceAccount: ta + image: "local/opentelemetry-operator-targetallocator:e2e" + prometheusCR: + enabled: false + config: | + receivers: + prometheus: + config: + scrape_configs: + - job_name: kubelet + scheme: https + authorization: + credentials_file: /var/run/secrets/kubernetes.io/serviceaccount/token + tls_config: + ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + insecure_skip_verify: true + honor_labels: true + kubernetes_sd_configs: + - role: node + metric_relabel_configs: + - action: keep + regex: "kubelet_running_pods" + source_labels: [__name__] + + processors: + + exporters: + prometheus: + endpoint: 0.0.0.0:9090 + service: + pipelines: + metrics: + receivers: [prometheus] + processors: [] + exporters: [prometheus] diff --git a/tests/e2e/targetallocator-prometheuscr/01-assert.yaml b/tests/e2e-targetallocator/targetallocator-kubernetessd/01-assert.yaml similarity index 100% rename from tests/e2e/targetallocator-prometheuscr/01-assert.yaml rename to tests/e2e-targetallocator/targetallocator-kubernetessd/01-assert.yaml diff --git a/tests/e2e-targetallocator/targetallocator-kubernetessd/01-install.yaml b/tests/e2e-targetallocator/targetallocator-kubernetessd/01-install.yaml new file mode 100644 index 0000000000..5090dd5c89 --- /dev/null +++ b/tests/e2e-targetallocator/targetallocator-kubernetessd/01-install.yaml @@ -0,0 +1,49 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: check-metrics +spec: + template: + spec: + restartPolicy: OnFailure + containers: + - name: check-metrics + image: curlimages/curl + args: + - /bin/sh + - -c + - curl -s http://prometheus-kubernetessd-collector:9090/metrics | grep "kubelet_running_pods{" +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: check-ta-jobs +spec: + template: + spec: + restartPolicy: OnFailure + containers: + - name: check-metrics + image: curlimages/curl + args: + - /bin/sh + - -c + - curl -s http://prometheus-kubernetessd-targetallocator/scrape_configs | grep "kubelet" +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: check-ta-scrape-configs +spec: + template: + spec: + restartPolicy: OnFailure + containers: + - name: check-metrics + image: curlimages/curl + args: + - /bin/sh + - -c + # First get the collector pod name, subsequently check that the targets for the collector include the node name label. 
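+ # The inner curl lists the kubelet job's targets; grep -oE pulls out a collector pod name of the form prometheus-kubernetessd-collector-xxxxx, which the outer request then passes as the collector_id query parameter.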
+ - curl -s http://prometheus-kubernetessd-targetallocator/jobs/kubelet/targets?collector_id=$(curl -s http://prometheus-kubernetessd-targetallocator/jobs/kubelet/targets | grep -oE "prometheus-kubernetessd-collector-.{5}") | grep "__meta_kubernetes_node_name" \ No newline at end of file diff --git a/tests/e2e-targetallocator/targetallocator-kubernetessd/check-daemonset.sh b/tests/e2e-targetallocator/targetallocator-kubernetessd/check-daemonset.sh new file mode 100755 index 0000000000..ae9c64b8d9 --- /dev/null +++ b/tests/e2e-targetallocator/targetallocator-kubernetessd/check-daemonset.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +# Name of the daemonset to check +DAEMONSET_NAME="prometheus-kubernetessd-collector" + +# Get the desired and ready pod counts for the daemonset +read DESIRED READY <<< $(kubectl get daemonset -n $NAMESPACE $DAEMONSET_NAME -o custom-columns=:status.desiredNumberScheduled,:status.numberReady --no-headers) + +# Check if the desired count matches the ready count +if [ "$DESIRED" -eq "$READY" ]; then + echo "Desired count ($DESIRED) matches the ready count ($READY) for $DAEMONSET_NAME." +else + echo "Desired count ($DESIRED) does not match the ready count ($READY) for $DAEMONSET_NAME." + exit 1 +fi diff --git a/tests/e2e/targetallocator-prometheuscr/00-assert.yaml b/tests/e2e-targetallocator/targetallocator-prometheuscr/00-assert.yaml similarity index 100% rename from tests/e2e/targetallocator-prometheuscr/00-assert.yaml rename to tests/e2e-targetallocator/targetallocator-prometheuscr/00-assert.yaml diff --git a/tests/e2e/targetallocator-prometheuscr/00-install.yaml b/tests/e2e-targetallocator/targetallocator-prometheuscr/00-install.yaml similarity index 100% rename from tests/e2e/targetallocator-prometheuscr/00-install.yaml rename to tests/e2e-targetallocator/targetallocator-prometheuscr/00-install.yaml diff --git a/tests/e2e-targetallocator/targetallocator-prometheuscr/01-assert.yaml b/tests/e2e-targetallocator/targetallocator-prometheuscr/01-assert.yaml new file mode 100644 index 0000000000..b3b95bf022 --- /dev/null +++ b/tests/e2e-targetallocator/targetallocator-prometheuscr/01-assert.yaml @@ -0,0 +1,20 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: check-metrics +status: + succeeded: 1 +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: check-ta-jobs +status: + succeeded: 1 +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: check-ta-scrape-configs +status: + succeeded: 1 \ No newline at end of file diff --git a/tests/e2e/targetallocator-prometheuscr/01-install.yaml b/tests/e2e-targetallocator/targetallocator-prometheuscr/01-install.yaml similarity index 100% rename from tests/e2e/targetallocator-prometheuscr/01-install.yaml rename to tests/e2e-targetallocator/targetallocator-prometheuscr/01-install.yaml From acf7644fa95f0f2b08858566c3edb366bb333a15 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Mon, 18 Dec 2023 11:12:06 +0100 Subject: [PATCH 12/25] Format; fix mistakenly changed test image Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/per_node.go | 2 +- tests/e2e-opampbridge/opampbridge/00-install.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index 7abe62d81f..cd9871cd2b 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -191,6 +191,7 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] } } } + // Check for unassigned targets if 
len(unassignedTargetsForJobs) > 0 { jobs := make([]string, 0, len(unassignedTargetsForJobs)) @@ -200,7 +201,6 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] allocator.log.Info("Could not assign targets for the following jobs due to missing node labels", "jobs", jobs) } - } // addTargetToTargetItems assigns a target to the collector and adds it to the allocator's targetItems diff --git a/tests/e2e-opampbridge/opampbridge/00-install.yaml b/tests/e2e-opampbridge/opampbridge/00-install.yaml index da833f429a..3f4f353f15 100644 --- a/tests/e2e-opampbridge/opampbridge/00-install.yaml +++ b/tests/e2e-opampbridge/opampbridge/00-install.yaml @@ -26,7 +26,7 @@ kind: OpAMPBridge metadata: name: test spec: - image: "ghcr.io/open-telemetry/opentelemetry-operator/operator-opamp-bridge:v0.90.0-14-g082e678" + image: "local/opentelemetry-operator-opamp-bridge:e2e" endpoint: ws://opamp-server:4320/v1/opamp capabilities: AcceptsOpAMPConnectionSettings: true From 57d23bdc9b54c6d7ffaada70c56cc6148755c008 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Tue, 19 Dec 2023 13:20:51 +0100 Subject: [PATCH 13/25] Use node name instead of collector name for collector map Signed-off-by: Matej Gera --- .../allocation/allocatortest.go | 2 +- .../allocation/consistent_hashing.go | 2 +- .../allocation/least_weighted.go | 2 +- cmd/otel-allocator/allocation/per_node.go | 43 +++---------------- cmd/otel-allocator/allocation/strategy.go | 4 +- cmd/otel-allocator/target/target.go | 19 ++++++++ 6 files changed, 30 insertions(+), 42 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocatortest.go b/cmd/otel-allocator/allocation/allocatortest.go index dedb8fd739..10617205dc 100644 --- a/cmd/otel-allocator/allocation/allocatortest.go +++ b/cmd/otel-allocator/allocation/allocatortest.go @@ -52,7 +52,7 @@ func MakeNCollectors(n int, startingIndex int) map[string]*Collector { toReturn[collector] = &Collector{ Name: collector, NumTargets: 0, - Node: fmt.Sprintf("node-%d", i), + NodeName: fmt.Sprintf("node-%d", i), } } return toReturn diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index 3a22d437db..f69a2f25d2 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -161,7 +161,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect } // Insert the new collectors for _, i := range diff.Additions() { - c.collectors[i.Name] = NewCollector(i.Name, i.Node) + c.collectors[i.Name] = NewCollector(i.Name, i.NodeName) c.consistentHasher.Add(c.collectors[i.Name]) } diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 486b4ccaf6..729dc85680 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -191,7 +191,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col } // Insert the new collectors for _, i := range diff.Additions() { - allocator.collectors[i.Name] = NewCollector(i.Name, i.Node) + allocator.collectors[i.Name] = NewCollector(i.Name, i.NodeName) } if allocateTargets { for _, item := range allocator.targetItems { diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index cd9871cd2b..ab7012e157 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -22,7 +22,6 @@ import ( 
"github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" ) var _ Allocator = &perNodeAllocator{} @@ -38,7 +37,7 @@ const perNodeStrategyName = "per-node" type perNodeAllocator struct { // m protects collectors and targetItems for concurrent use. m sync.RWMutex - // collectors is a map from a Collector's name to a Collector instance + // collectors is a map from a Collector's node name to a Collector instance collectors map[string]*Collector // targetItems is a map from a target item's hash to the target items allocated state targetItems map[string]*target.Item @@ -51,15 +50,6 @@ type perNodeAllocator struct { filter Filter } -// nodeLabels are labels that are used to identify the node on which the given -// target is residing. To learn more about these labels, please refer to: -// https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config -var nodeLabels = []model.LabelName{ - "__meta_kubernetes_pod_node_name", - "__meta_kubernetes_node_name", - "__meta_kubernetes_endpoint_node_name", -} - // SetCollectors sets the set of collectors with key=collectorName, value=Collector object. // This method is called when Collectors are added or removed. func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collector) { @@ -87,14 +77,14 @@ func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collecto func (allocator *perNodeAllocator) handleCollectors(diff diff.Changes[*Collector]) { // Clear removed collectors for _, k := range diff.Removals() { - delete(allocator.collectors, k.Name) + delete(allocator.collectors, k.NodeName) delete(allocator.targetItemsPerJobPerCollector, k.Name) TargetsPerCollector.WithLabelValues(k.Name, perNodeStrategyName).Set(0) } // Insert the new collectors for _, i := range diff.Additions() { - allocator.collectors[i.Name] = NewCollector(i.Name, i.Node) + allocator.collectors[i.NodeName] = NewCollector(i.Name, i.NodeName) } } @@ -166,7 +156,7 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] for k, item := range allocator.targetItems { // if the current item is in the removals list if _, ok := diff.Removals()[k]; ok { - c, ok := allocator.collectors[item.CollectorName] + c, ok := allocator.collectors[item.GetNodeName()] if !ok { continue } @@ -214,8 +204,8 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] // target items setting. func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) bool { allocator.targetItems[tg.Hash()] = tg - chosenCollector := allocator.findCollector(tg.Labels) - if chosenCollector == nil { + chosenCollector, ok := allocator.collectors[tg.GetNodeName()] + if !ok { allocator.log.V(2).Info("Couldn't find a collector for the target item", "item", tg, "collectors", allocator.collectors) return false } @@ -226,27 +216,6 @@ func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) bool return true } -// findCollector finds the collector that matches the node of the target, on the basis of the -// pod node label. -// This method is called from within SetTargets and SetCollectors, whose caller -// acquires the needed lock. This method assumes there are is at least 1 collector set. -func (allocator *perNodeAllocator) findCollector(labels model.LabelSet) *Collector { - var col *Collector - for _, v := range allocator.collectors { - // Try to match against a node label. 
- for _, l := range nodeLabels { - if nodeNameLabelValue, ok := labels[l]; ok { - if v.Node == string(nodeNameLabelValue) { - col = v - break - } - } - } - } - - return col -} - // addCollectorTargetItemMapping keeps track of which collector has which jobs and targets // this allows the allocator to respond without any extra allocations to http calls. The caller of this method // has to acquire a lock. diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 880f6de4ca..87967a0985 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -106,7 +106,7 @@ var _ consistent.Member = Collector{} // This struct can be extended with information like annotations and labels in the future. type Collector struct { Name string - Node string + NodeName string NumTargets int } @@ -119,7 +119,7 @@ func (c Collector) String() string { } func NewCollector(name, node string) *Collector { - return &Collector{Name: name, Node: node} + return &Collector{Name: name, NodeName: node} } func init() { diff --git a/cmd/otel-allocator/target/target.go b/cmd/otel-allocator/target/target.go index ee5d58cf95..a73e40b6a0 100644 --- a/cmd/otel-allocator/target/target.go +++ b/cmd/otel-allocator/target/target.go @@ -21,6 +21,15 @@ import ( "github.com/prometheus/common/model" ) +// nodeLabels are labels that are used to identify the node on which the given +// target is residing. To learn more about these labels, please refer to: +// https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config +var nodeLabels = []model.LabelName{ + "__meta_kubernetes_pod_node_name", + "__meta_kubernetes_node_name", + "__meta_kubernetes_endpoint_node_name", +} + // LinkJSON This package contains common structs and methods that relate to scrape targets. type LinkJSON struct { Link string `json:"_link"` @@ -39,6 +48,16 @@ func (t *Item) Hash() string { return t.hash } +func (t *Item) GetNodeName() string { + for _, label := range nodeLabels { + if val, ok := t.Labels[label]; ok { + return string(val) + } + } + + return "" +} + // NewItem Creates a new target item. // INVARIANTS: // * Item fields must not be modified after creation. 
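Note (editor's illustration, not part of the patch series): a minimal standalone sketch of the lookup this commit enables. The node name is resolved from a target's service-discovery labels in the order given by nodeLabels, and the per-node allocator can then index its collectors map by that name. The label set and collector names below are made up for illustration; only the label list and the lookup order mirror the patch.

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// Mirrors nodeLabels from cmd/otel-allocator/target/target.go:
// the first label present in the target's label set wins.
var nodeLabels = []model.LabelName{
	"__meta_kubernetes_pod_node_name",
	"__meta_kubernetes_node_name",
	"__meta_kubernetes_endpoint_node_name",
}

// nodeNameOf replicates the logic of target.Item.GetNodeName for a bare label set.
func nodeNameOf(labels model.LabelSet) string {
	for _, l := range nodeLabels {
		if v, ok := labels[l]; ok {
			return string(v)
		}
	}
	return ""
}

func main() {
	// Hypothetical collectors map keyed by node name, as in the per-node allocator.
	collectors := map[string]string{"node-0": "collector-0"}

	tgtLabels := model.LabelSet{"__meta_kubernetes_pod_node_name": "node-0"}
	if c, ok := collectors[nodeNameOf(tgtLabels)]; ok {
		fmt.Println("target assigned to", c)
	} else {
		fmt.Println("target unassigned: no collector on its node")
	}
}

Keying the collectors map by node name turns the per-target scan over all collectors (the findCollector loop removed above) into a single map lookup.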
From 0e09fc8c7edb6eb59645b23a0b20080fbf7993ea Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Tue, 19 Dec 2023 13:21:12 +0100 Subject: [PATCH 14/25] Add TA E2E tests to GH action Signed-off-by: Matej Gera --- .github/workflows/e2e.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 0ab9811ad4..5503f39b89 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -32,6 +32,7 @@ jobs: - e2e-multi-instrumentation - e2e-pdb - e2e-opampbridge + - e2e-targetallocator steps: - name: Check out code into the Go module directory From 9c6c6e6801556fdd4d1182cacd5260e0a2b19776 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Tue, 19 Dec 2023 14:55:58 +0100 Subject: [PATCH 15/25] Bump kuttl timeout Signed-off-by: Matej Gera --- kuttl-test-targetallocator.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kuttl-test-targetallocator.yaml b/kuttl-test-targetallocator.yaml index 12bf709985..0faba9afd7 100644 --- a/kuttl-test-targetallocator.yaml +++ b/kuttl-test-targetallocator.yaml @@ -3,5 +3,4 @@ kind: TestSuite artifactsDir: ./tests/_build/artifacts/ testDirs: - ./tests/e2e-targetallocator/ -timeout: 150 -parallel: 4 +timeout: 300 From 94b336e5de73c9dbac344c3ffeef23e21d03c19f Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 4 Jan 2024 17:12:24 +0100 Subject: [PATCH 16/25] Validate mode and strategy Signed-off-by: Matej Gera --- apis/v1alpha1/collector_webhook.go | 4 ++++ apis/v1alpha1/collector_webhook_test.go | 13 +++++++++++++ 2 files changed, 17 insertions(+) diff --git a/apis/v1alpha1/collector_webhook.go b/apis/v1alpha1/collector_webhook.go index 9fbec3b063..87a171422c 100644 --- a/apis/v1alpha1/collector_webhook.go +++ b/apis/v1alpha1/collector_webhook.go @@ -200,6 +200,10 @@ func (c CollectorWebhook) validate(r *OpenTelemetryCollector) (admission.Warning return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the target allocation deployment", r.Spec.Mode) } + if r.Spec.TargetAllocator.Enabled && (r.Spec.Mode == ModeDaemonSet && r.Spec.TargetAllocator.AllocationStrategy != OpenTelemetryTargetAllocatorAllocationStrategyPerNode) { + return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which must be used with target allocation strategy %s ", r.Spec.Mode, OpenTelemetryTargetAllocatorAllocationStrategyPerNode) + } + if r.Spec.TargetAllocator.AllocationStrategy == OpenTelemetryTargetAllocatorAllocationStrategyPerNode && r.Spec.Mode != ModeDaemonSet { return warnings, fmt.Errorf("target allocation strategy %s is only supported in OpenTelemetry Collector mode %s", OpenTelemetryTargetAllocatorAllocationStrategyPerNode, ModeDaemonSet) } diff --git a/apis/v1alpha1/collector_webhook_test.go b/apis/v1alpha1/collector_webhook_test.go index e947a8c6e5..3525d713cf 100644 --- a/apis/v1alpha1/collector_webhook_test.go +++ b/apis/v1alpha1/collector_webhook_test.go @@ -539,6 +539,19 @@ func TestOTELColValidatingWebhook(t *testing.T) { }, expectedErr: "the OpenTelemetry Spec Prometheus configuration is incorrect", }, + { + name: "invalid target allocation strategy", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + Mode: ModeDaemonSet, + TargetAllocator: OpenTelemetryTargetAllocator{ + Enabled: true, + AllocationStrategy: OpenTelemetryTargetAllocatorAllocationStrategyLeastWeighted, + }, + }, + }, + expectedErr: "does not support the target allocation deployment", + }, { name: "invalid port name", otelcol: 
OpenTelemetryCollector{ From 023ce4b5ec06cd4b54a0a26140a359661894e793 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 4 Jan 2024 17:15:27 +0100 Subject: [PATCH 17/25] Handle cases of missing node name when (re)adding collector Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/per_node.go | 6 ++++++ cmd/otel-allocator/collector/collector.go | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index ab7012e157..376036d1e8 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -86,6 +86,12 @@ func (allocator *perNodeAllocator) handleCollectors(diff diff.Changes[*Collector for _, i := range diff.Additions() { allocator.collectors[i.NodeName] = NewCollector(i.Name, i.NodeName) } + + // For a case where a collector is removed and added back, we need + // to re-allocate any already existing targets. + for _, item := range allocator.targetItems { + allocator.addTargetToTargetItems(item) + } } // SetTargets accepts a list of targets that will be used to make diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index d5fdc49bea..58bbbee866 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -128,6 +128,11 @@ func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap return "" } + if pod.Spec.NodeName == "" { + k.log.Info("Node name is missing from the spec. Restarting watch routine") + return "" + } + switch event.Type { //nolint:exhaustive case watch.Added: collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) From 5976307a51a562f1a2d2e226a0c42fa05601aa91 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Mon, 8 Jan 2024 16:05:50 +0100 Subject: [PATCH 18/25] Adjust test cases after node name fix Signed-off-by: Matej Gera --- apis/v1alpha1/collector_webhook_test.go | 2 +- .../collector/collector_test.go | 24 +++++++++++++------ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/apis/v1alpha1/collector_webhook_test.go b/apis/v1alpha1/collector_webhook_test.go index 02eabb866e..8aa9f8ac60 100644 --- a/apis/v1alpha1/collector_webhook_test.go +++ b/apis/v1alpha1/collector_webhook_test.go @@ -680,7 +680,7 @@ func TestOTELColValidatingWebhook(t *testing.T) { }, }, }, - expectedErr: "does not support the target allocation deployment", + expectedErr: "mode is set to daemonset, which must be used with target allocation strategy per-node", }, { name: "invalid port name", diff --git a/cmd/otel-allocator/collector/collector_test.go b/cmd/otel-allocator/collector/collector_test.go index 5367828e23..eb7eec5070 100644 --- a/cmd/otel-allocator/collector/collector_test.go +++ b/cmd/otel-allocator/collector/collector_test.go @@ -70,6 +70,9 @@ func pod(name string) *v1.Pod { Namespace: "test-ns", Labels: labelSet, }, + Spec: v1.PodSpec{ + NodeName: "test-node", + }, } } @@ -98,13 +101,16 @@ func Test_runWatch(t *testing.T) { }, want: map[string]*allocation.Collector{ "test-pod1": { - Name: "test-pod1", + Name: "test-pod1", + NodeName: "test-node", }, "test-pod2": { - Name: "test-pod2", + Name: "test-pod2", + NodeName: "test-node", }, "test-pod3": { - Name: "test-pod3", + Name: "test-pod3", + NodeName: "test-node", }, }, }, @@ -120,19 +126,23 @@ func Test_runWatch(t *testing.T) { }, collectorMap: map[string]*allocation.Collector{ "test-pod1": { - Name: "test-pod1", + Name: "test-pod1", + NodeName: "test-node", }, 
"test-pod2": { - Name: "test-pod2", + Name: "test-pod2", + NodeName: "test-node", }, "test-pod3": { - Name: "test-pod3", + Name: "test-pod3", + NodeName: "test-node", }, }, }, want: map[string]*allocation.Collector{ "test-pod1": { - Name: "test-pod1", + Name: "test-pod1", + NodeName: "test-node", }, }, }, From 3ac78c7c61b5af52fb3b4a53fe57af1fdcf85768 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Tue, 9 Jan 2024 15:17:41 +0100 Subject: [PATCH 19/25] Adjust E2E tests Signed-off-by: Matej Gera --- kuttl-test-targetallocator.yaml | 2 +- .../targetallocator-kubernetessd/00-install.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kuttl-test-targetallocator.yaml b/kuttl-test-targetallocator.yaml index 0faba9afd7..eca8f5336e 100644 --- a/kuttl-test-targetallocator.yaml +++ b/kuttl-test-targetallocator.yaml @@ -3,4 +3,4 @@ kind: TestSuite artifactsDir: ./tests/_build/artifacts/ testDirs: - ./tests/e2e-targetallocator/ -timeout: 300 +timeout: 600 diff --git a/tests/e2e-targetallocator/targetallocator-kubernetessd/00-install.yaml b/tests/e2e-targetallocator/targetallocator-kubernetessd/00-install.yaml index 1a97786c14..9442b4023f 100644 --- a/tests/e2e-targetallocator/targetallocator-kubernetessd/00-install.yaml +++ b/tests/e2e-targetallocator/targetallocator-kubernetessd/00-install.yaml @@ -96,8 +96,8 @@ spec: serviceAccount: collector targetAllocator: enabled: true + allocationStrategy: "per-node" serviceAccount: ta - image: "local/opentelemetry-operator-targetallocator:e2e" prometheusCR: enabled: false config: | From 0b3363e0f740866784e6a2530f550aee7b99ec96 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Mon, 15 Jan 2024 10:24:12 +0100 Subject: [PATCH 20/25] Put TA config validation into separate method Signed-off-by: Matej Gera --- apis/v1alpha1/collector_webhook.go | 74 +++++++++++++++++------------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/apis/v1alpha1/collector_webhook.go b/apis/v1alpha1/collector_webhook.go index 0eb28433f7..2333b243bf 100644 --- a/apis/v1alpha1/collector_webhook.go +++ b/apis/v1alpha1/collector_webhook.go @@ -228,40 +228,14 @@ func (c CollectorWebhook) validate(ctx context.Context, r *OpenTelemetryCollecto return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the attribute 'AdditionalContainers'", r.Spec.Mode) } - // validate target allocation - if r.Spec.TargetAllocator.Enabled && (r.Spec.Mode != ModeStatefulSet && r.Spec.Mode != ModeDaemonSet) { - return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the target allocation deployment", r.Spec.Mode) - } - - if r.Spec.TargetAllocator.Enabled && (r.Spec.Mode == ModeDaemonSet && r.Spec.TargetAllocator.AllocationStrategy != OpenTelemetryTargetAllocatorAllocationStrategyPerNode) { - return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which must be used with target allocation strategy %s ", r.Spec.Mode, OpenTelemetryTargetAllocatorAllocationStrategyPerNode) - } - - if r.Spec.TargetAllocator.AllocationStrategy == OpenTelemetryTargetAllocatorAllocationStrategyPerNode && r.Spec.Mode != ModeDaemonSet { - return warnings, fmt.Errorf("target allocation strategy %s is only supported in OpenTelemetry Collector mode %s", OpenTelemetryTargetAllocatorAllocationStrategyPerNode, ModeDaemonSet) - } - - // validate Prometheus config for target allocation + // validate target allocator configs if r.Spec.TargetAllocator.Enabled { - promCfg, err := ta.ConfigToPromConfig(r.Spec.Config) - 
if err != nil { - return warnings, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err) - } - err = ta.ValidatePromConfig(promCfg, r.Spec.TargetAllocator.Enabled, featuregate.EnableTargetAllocatorRewrite.IsEnabled()) + taWarnings, err := c.validateTargetAllocatorConfig(ctx, r) if err != nil { - return warnings, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err) + return warnings, err } - err = ta.ValidateTargetAllocatorConfig(r.Spec.TargetAllocator.PrometheusCR.Enabled, promCfg) - if err != nil { - return warnings, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err) - } - // if the prometheusCR is enabled, it needs a suite of permissions to function - if r.Spec.TargetAllocator.PrometheusCR.Enabled { - if subjectAccessReviews, err := c.reviewer.CheckPolicyRules(ctx, r.GetNamespace(), r.Spec.TargetAllocator.ServiceAccount, targetAllocatorCRPolicyRules...); err != nil { - return warnings, fmt.Errorf("unable to check rbac rules %w", err) - } else if allowed, deniedReviews := rbac.AllSubjectAccessReviewsAllowed(subjectAccessReviews); !allowed { - warnings = append(warnings, warningsGroupedByResource(deniedReviews)...) - } + if taWarnings != nil { + warnings = append(warnings, taWarnings...) } } @@ -368,6 +342,44 @@ func (c CollectorWebhook) validate(ctx context.Context, r *OpenTelemetryCollecto return warnings, nil } +func (c CollectorWebhook) validateTargetAllocatorConfig(ctx context.Context, r *OpenTelemetryCollector) (admission.Warnings, error) { + if r.Spec.Mode != ModeStatefulSet && r.Spec.Mode != ModeDaemonSet { + return nil, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the target allocation deployment", r.Spec.Mode) + } + + if r.Spec.Mode == ModeDaemonSet && r.Spec.TargetAllocator.AllocationStrategy != OpenTelemetryTargetAllocatorAllocationStrategyPerNode { + return nil, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which must be used with target allocation strategy %s ", r.Spec.Mode, OpenTelemetryTargetAllocatorAllocationStrategyPerNode) + } + + if r.Spec.TargetAllocator.AllocationStrategy == OpenTelemetryTargetAllocatorAllocationStrategyPerNode && r.Spec.Mode != ModeDaemonSet { + return nil, fmt.Errorf("target allocation strategy %s is only supported in OpenTelemetry Collector mode %s", OpenTelemetryTargetAllocatorAllocationStrategyPerNode, ModeDaemonSet) + } + + // validate Prometheus config for target allocation + promCfg, err := ta.ConfigToPromConfig(r.Spec.Config) + if err != nil { + return nil, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err) + } + err = ta.ValidatePromConfig(promCfg, r.Spec.TargetAllocator.Enabled, featuregate.EnableTargetAllocatorRewrite.IsEnabled()) + if err != nil { + return nil, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err) + } + err = ta.ValidateTargetAllocatorConfig(r.Spec.TargetAllocator.PrometheusCR.Enabled, promCfg) + if err != nil { + return nil, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err) + } + // if the prometheusCR is enabled, it needs a suite of permissions to function + if r.Spec.TargetAllocator.PrometheusCR.Enabled { + if subjectAccessReviews, err := c.reviewer.CheckPolicyRules(ctx, r.GetNamespace(), r.Spec.TargetAllocator.ServiceAccount, targetAllocatorCRPolicyRules...); err != nil { + return nil, fmt.Errorf("unable to check rbac rules %w", err) + } else if allowed, deniedReviews := 
rbac.AllSubjectAccessReviewsAllowed(subjectAccessReviews); !allowed { + return warningsGroupedByResource(deniedReviews), nil + } + } + + return nil, nil +} + func checkAutoscalerSpec(autoscaler *AutoscalerSpec) error { if autoscaler.Behavior != nil { if autoscaler.Behavior.ScaleDown != nil && autoscaler.Behavior.ScaleDown.StabilizationWindowSeconds != nil && From 02ee9211ceea63687a2669668cc72612252a7c4b Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Mon, 15 Jan 2024 11:05:24 +0100 Subject: [PATCH 21/25] Adjust logging and add metric for jobs with unassigned targets Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/per_node.go | 8 ++------ cmd/otel-allocator/allocation/strategy.go | 8 ++++++++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index 376036d1e8..2135a7f5a6 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -190,12 +190,8 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] // Check for unassigned targets if len(unassignedTargetsForJobs) > 0 { - jobs := make([]string, 0, len(unassignedTargetsForJobs)) - for j := range unassignedTargetsForJobs { - jobs = append(jobs, j) - } - - allocator.log.Info("Could not assign targets for the following jobs due to missing node labels", "jobs", jobs) + allocator.log.Info("Could not assign targets for some jobs due to missing node labels", "jobs", unassignedTargetsForJobs) + RecordJobsWithUnassignedTargets(unassignedTargetsForJobs) } } diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 87967a0985..a1bf773c2b 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -49,6 +49,10 @@ var ( Name: "opentelemetry_allocator_targets_remaining", Help: "Number of targets kept after filtering.", }) + jobsUnassigned = promauto.NewCounter(prometheus.CounterOpts{ + Name: "opentelemetry_allocator_jobs_with_unassigned_targets", + Help: "Number of jobs with targets that could not be assigned due to missing node label.", + }) ) type AllocationOption func(Allocator) @@ -67,6 +71,10 @@ func RecordTargetsKept(targets map[string]*target.Item) { targetsRemaining.Add(float64(len(targets))) } +func RecordJobsWithUnassignedTargets(jobs map[string]struct{}) { + jobsUnassigned.Add(float64(len(jobs))) +} + func New(name string, log logr.Logger, opts ...AllocationOption) (Allocator, error) { if p, ok := registry[name]; ok { return p(log.WithValues("allocator", name), opts...), nil From b8befdfaa3dfb427ced5a7ab98957a23867c02a3 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 18 Jan 2024 11:18:37 +0100 Subject: [PATCH 22/25] Address feedback Signed-off-by: Matej Gera --- apis/v1alpha1/collector_webhook.go | 6 +++--- cmd/otel-allocator/allocation/per_node.go | 10 +++++----- cmd/otel-allocator/allocation/strategy.go | 10 +++------- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/apis/v1alpha1/collector_webhook.go b/apis/v1alpha1/collector_webhook.go index 2333b243bf..be384fb3a3 100644 --- a/apis/v1alpha1/collector_webhook.go +++ b/apis/v1alpha1/collector_webhook.go @@ -231,12 +231,12 @@ func (c CollectorWebhook) validate(ctx context.Context, r *OpenTelemetryCollecto // validate target allocator configs if r.Spec.TargetAllocator.Enabled { taWarnings, err := c.validateTargetAllocatorConfig(ctx, r) - if err != nil { - return warnings, err - } if taWarnings != nil { 
warnings = append(warnings, taWarnings...) } + if err != nil { + return warnings, err + } } // validator port config diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index 2135a7f5a6..8e8e27bbd7 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -174,7 +174,7 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] } // Check for additions - unassignedTargetsForJobs := make(map[string]struct{}) + var unassignedTargets int for k, item := range diff.Additions() { // Do nothing if the item is already there if _, ok := allocator.targetItems[k]; ok { @@ -183,15 +183,15 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] // Add item to item pool and assign a collector collectorAssigned := allocator.addTargetToTargetItems(item) if !collectorAssigned { - unassignedTargetsForJobs[item.JobName] = struct{}{} + unassignedTargets++ } } } // Check for unassigned targets - if len(unassignedTargetsForJobs) > 0 { - allocator.log.Info("Could not assign targets for some jobs due to missing node labels", "jobs", unassignedTargetsForJobs) - RecordJobsWithUnassignedTargets(unassignedTargetsForJobs) + if unassignedTargets > 0 { + allocator.log.Info("Could not assign targets for some jobs due to missing node labels", "targets", unassignedTargets) + TargetsUnassigned.Add(float64(unassignedTargets)) } } diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index a1bf773c2b..540c0ebbaa 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -49,9 +49,9 @@ var ( Name: "opentelemetry_allocator_targets_remaining", Help: "Number of targets kept after filtering.", }) - jobsUnassigned = promauto.NewCounter(prometheus.CounterOpts{ - Name: "opentelemetry_allocator_jobs_with_unassigned_targets", - Help: "Number of jobs with targets that could not be assigned due to missing node label.", + TargetsUnassigned = promauto.NewCounter(prometheus.CounterOpts{ + Name: "opentelemetry_allocator_targets_unassigned", + Help: "Number of targets that could not be assigned due to missing node label.", }) ) @@ -71,10 +71,6 @@ func RecordTargetsKept(targets map[string]*target.Item) { targetsRemaining.Add(float64(len(targets))) } -func RecordJobsWithUnassignedTargets(jobs map[string]struct{}) { - jobsUnassigned.Add(float64(len(jobs))) -} - func New(name string, log logr.Logger, opts ...AllocationOption) (Allocator, error) { if p, ok := registry[name]; ok { return p(log.WithValues("allocator", name), opts...), nil From 9075a843fdb16c355176199cbb548e72c48cb18d Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Tue, 6 Feb 2024 11:44:26 +0100 Subject: [PATCH 23/25] Simplify flow according to feedback; correct docs Signed-off-by: Matej Gera --- cmd/otel-allocator/allocation/per_node.go | 86 +++++------------------ cmd/otel-allocator/allocation/strategy.go | 2 +- 2 files changed, 19 insertions(+), 69 deletions(-) diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index 8e8e27bbd7..7820f0093c 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -68,29 +68,20 @@ func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collecto // Check for collector changes collectorsDiff := diff.Maps(allocator.collectors, collectors) if len(collectorsDiff.Additions()) != 0 || 
len(collectorsDiff.Removals()) != 0 { - allocator.handleCollectors(collectorsDiff) - } -} - -// handleCollectors receives the new and removed collectors and reconciles the current state. -// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map. -func (allocator *perNodeAllocator) handleCollectors(diff diff.Changes[*Collector]) { - // Clear removed collectors - for _, k := range diff.Removals() { - delete(allocator.collectors, k.NodeName) - delete(allocator.targetItemsPerJobPerCollector, k.Name) - TargetsPerCollector.WithLabelValues(k.Name, perNodeStrategyName).Set(0) - } + for _, k := range allocator.collectors { + delete(allocator.collectors, k.NodeName) + delete(allocator.targetItemsPerJobPerCollector, k.Name) + TargetsPerCollector.WithLabelValues(k.Name, perNodeStrategyName).Set(0) + } - // Insert the new collectors - for _, i := range diff.Additions() { - allocator.collectors[i.NodeName] = NewCollector(i.Name, i.NodeName) - } + for _, k := range collectors { + allocator.collectors[k.NodeName] = NewCollector(k.Name, k.NodeName) + } - // For a case where a collector is removed and added back, we need - // to re-allocate any already existing targets. - for _, item := range allocator.targetItems { - allocator.addTargetToTargetItems(item) + // Re-allocate any already existing targets. + for _, item := range allocator.targetItems { + allocator.addTargetToTargetItems(item) + } } } @@ -109,43 +100,6 @@ func (allocator *perNodeAllocator) SetTargets(targets map[string]*target.Item) { allocator.m.Lock() defer allocator.m.Unlock() - if len(allocator.collectors) == 0 { - allocator.log.Info("No collector instances present, saving targets to allocate to collector(s)") - // If there were no targets discovered previously, assign this as the new set of target items - if len(allocator.targetItems) == 0 { - allocator.log.Info("Not discovered any targets previously, saving targets found to the targetItems set") - for k, item := range targets { - allocator.targetItems[k] = item - } - } else { - // If there were previously discovered targets, add or remove accordingly - targetsDiffEmptyCollectorSet := diff.Maps(allocator.targetItems, targets) - - // Check for additions - if len(targetsDiffEmptyCollectorSet.Additions()) > 0 { - allocator.log.Info("New targets discovered, adding new targets to the targetItems set") - for k, item := range targetsDiffEmptyCollectorSet.Additions() { - // Do nothing if the item is already there - if _, ok := allocator.targetItems[k]; ok { - continue - } else { - // Add item to item pool - allocator.targetItems[k] = item - } - } - } - - // Check for deletions - if len(targetsDiffEmptyCollectorSet.Removals()) > 0 { - allocator.log.Info("Targets removed, Removing targets from the targetItems set") - for k := range targetsDiffEmptyCollectorSet.Removals() { - // Delete item from target items - delete(allocator.targetItems, k) - } - } - } - return - } // Check for target changes targetsDiff := diff.Maps(allocator.targetItems, targets) // If there are any additions or removals @@ -163,13 +117,12 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item] // if the current item is in the removals list if _, ok := diff.Removals()[k]; ok { c, ok := allocator.collectors[item.GetNodeName()] - if !ok { - continue + if ok { + c.NumTargets-- + TargetsPerCollector.WithLabelValues(item.CollectorName, perNodeStrategyName).Set(float64(c.NumTargets)) } - c.NumTargets-- delete(allocator.targetItems, k) 
 			delete(allocator.targetItemsPerJobPerCollector[item.CollectorName][item.JobName], item.Hash())
-			TargetsPerCollector.WithLabelValues(item.CollectorName, perNodeStrategyName).Set(float64(c.NumTargets))
 		}
 	}
 
@@ -191,19 +144,16 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item]
 	// Check for unassigned targets
 	if unassignedTargets > 0 {
 		allocator.log.Info("Could not assign targets for some jobs due to missing node labels", "targets", unassignedTargets)
-		TargetsUnassigned.Add(float64(unassignedTargets))
+		TargetsUnassigned.Set(float64(unassignedTargets))
 	}
 }
 
 // addTargetToTargetItems assigns a target to the collector and adds it to the allocator's targetItems
 // This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
 // This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
-// INVARIANT: allocator.collectors must have at least 1 collector set.
-// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target
-// item while it's being encoded by the server JSON handler.
 // Also, any targets that cannot be assigned to a collector, due to no matching node name, will remain unassigned. These
-// targets are still "silently" added to the targetItems map, to prevent them from being reported as unassigned on each new
-// target items setting.
+// targets are still "silently" added to the targetItems map, to make sure they exist if a collector for a node is added
+// later and to prevent them from being reported as unassigned on each new target items setting.
 func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) bool {
 	allocator.targetItems[tg.Hash()] = tg
 	chosenCollector, ok := allocator.collectors[tg.GetNodeName()]
diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go
index 540c0ebbaa..b61313bd1f 100644
--- a/cmd/otel-allocator/allocation/strategy.go
+++ b/cmd/otel-allocator/allocation/strategy.go
@@ -49,7 +49,7 @@ var (
 		Name: "opentelemetry_allocator_targets_remaining",
 		Help: "Number of targets kept after filtering.",
 	})
-	TargetsUnassigned = promauto.NewCounter(prometheus.CounterOpts{
+	TargetsUnassigned = promauto.NewGauge(prometheus.GaugeOpts{
 		Name: "opentelemetry_allocator_targets_unassigned",
 		Help: "Number of targets that could not be assigned due to missing node label.",
 	})

From 339ed5cc73c47c016dfd5fd7cf501067261f6ec4 Mon Sep 17 00:00:00 2001
From: Matej Gera
Date: Tue, 6 Feb 2024 11:45:06 +0100
Subject: [PATCH 24/25] Add more unit tests for edge cases

Signed-off-by: Matej Gera

---
 .../allocation/allocatortest.go               |  5 ++-
 .../allocation/per_node_test.go               | 43 +++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/cmd/otel-allocator/allocation/allocatortest.go b/cmd/otel-allocator/allocation/allocatortest.go
index 354678d2b7..c47f5976ce 100644
--- a/cmd/otel-allocator/allocation/allocatortest.go
+++ b/cmd/otel-allocator/allocation/allocatortest.go
@@ -61,8 +61,9 @@ func MakeNNewTargetsWithEmptyCollectors(n int, startingIndex int) map[string]*ta
 	toReturn := map[string]*target.Item{}
 	for i := startingIndex; i < n+startingIndex; i++ {
 		label := model.LabelSet{
-			"i":     model.LabelValue(strconv.Itoa(i)),
-			"total": model.LabelValue(strconv.Itoa(n + startingIndex)),
+			"i":                               model.LabelValue(strconv.Itoa(i)),
+			"total":                           model.LabelValue(strconv.Itoa(n + startingIndex)),
+			"__meta_kubernetes_pod_node_name": model.LabelValue("node-0"),
 		}
 		newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), fmt.Sprintf("test-url-%d", i), label, "")
 		toReturn[newTarget.Hash()] = newTarget
diff --git a/cmd/otel-allocator/allocation/per_node_test.go b/cmd/otel-allocator/allocation/per_node_test.go
index 242f567e8a..a7d695bba7 100644
--- a/cmd/otel-allocator/allocation/per_node_test.go
+++ b/cmd/otel-allocator/allocation/per_node_test.go
@@ -83,3 +83,46 @@ func TestAllocationPerNode(t *testing.T) {
 		assert.Equal(t, actualItem, itemsForCollector[0])
 	}
 }
+
+func TestTargetsWithNoCollectorsPerNode(t *testing.T) {
+	// prepare per-node allocator with no collectors set initially
+	c, _ := New("per-node", loggerPerNode)
+
+	// Adding 10 new targets
+	numItems := 10
+	c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItems, 0))
+	actualTargetItems := c.TargetItems()
+	assert.Len(t, actualTargetItems, numItems)
+
+	// Adding 5 new targets, and removing the old 10 targets
+	numItemsUpdate := 5
+	c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 10))
+	actualTargetItemsUpdated := c.TargetItems()
+	assert.Len(t, actualTargetItemsUpdated, numItemsUpdate)
+
+	// Adding 5 new targets, and one existing target
+	numItemsUpdate = 6
+	c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 14))
+	actualTargetItemsUpdated = c.TargetItems()
+	assert.Len(t, actualTargetItemsUpdated, numItemsUpdate)
+
+	// Adding collectors to test allocation
+	numCols := 2
+	cols := MakeNCollectors(2, 0)
+	c.SetCollectors(cols)
+
+	// Checking to see that there is no change to number of targets
+	actualTargetItems = c.TargetItems()
+	assert.Len(t, actualTargetItems, numItemsUpdate)
+	// Checking to see that collectors are added correctly
+	actualCollectors := c.Collectors()
+	assert.Len(t, actualCollectors, numCols)
+	// Based on the label, all targets should be assigned to node-0
+	for name, ac := range actualCollectors {
+		if name == "node-0" {
+			assert.Equal(t, 6, ac.NumTargets)
+		} else {
+			assert.Equal(t, 0, ac.NumTargets)
+		}
+	}
+}

From 5d31f67f4a967c65900a74e540f37a20fc484e6d Mon Sep 17 00:00:00 2001
From: Matej Gera
Date: Tue, 6 Feb 2024 13:08:23 +0100
Subject: [PATCH 25/25] Fix test case assert file

Signed-off-by: Matej Gera

---
 .../00-assert.yaml                            | 31 +++++++++----------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/tests/e2e-targetallocator/targetallocator-kubernetessd/00-assert.yaml b/tests/e2e-targetallocator/targetallocator-kubernetessd/00-assert.yaml
index 4874342bf9..3875ffb539 100644
--- a/tests/e2e-targetallocator/targetallocator-kubernetessd/00-assert.yaml
+++ b/tests/e2e-targetallocator/targetallocator-kubernetessd/00-assert.yaml
@@ -26,24 +26,23 @@ metadata:
 data:
   collector.yaml: |
     exporters:
-      prometheus:
-        endpoint: 0.0.0.0:9090
-    processors: null
+      prometheus:
+        endpoint: 0.0.0.0:9090
     receivers:
-      prometheus:
-        config: {}
-        target_allocator:
-          collector_id: ${POD_NAME}
-          endpoint: http://prometheus-kubernetessd-targetallocator:80
-          interval: 30s
+      prometheus:
+        config: {}
+        target_allocator:
+          collector_id: ${POD_NAME}
+          endpoint: http://prometheus-kubernetessd-targetallocator:80
+          interval: 30s
     service:
-      pipelines:
-        metrics:
-          exporters:
-          - prometheus
-          processors: []
-          receivers:
-          - prometheus
+      pipelines:
+        metrics:
+          exporters:
+          - prometheus
+          processors: []
+          receivers:
+          - prometheus
 ---
 # Print TA pod logs if test fails
 apiVersion: kuttl.dev/v1beta1
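Note (editor's illustration, not part of the patch series): the mode/strategy validation introduced in patch 16 and factored out into validateTargetAllocatorConfig in patch 20 boils down to a small decision matrix. A simplified sketch of those rules, using plain strings instead of the operator's Mode and AllocationStrategy types:

package main

import "fmt"

// validModeStrategy mirrors the rules enforced by validateTargetAllocatorConfig:
// only statefulset and daemonset support target allocation, daemonset requires
// the per-node strategy, and per-node is only allowed with daemonset.
func validModeStrategy(mode, strategy string) error {
	if mode != "statefulset" && mode != "daemonset" {
		return fmt.Errorf("mode %s does not support the target allocation deployment", mode)
	}
	if mode == "daemonset" && strategy != "per-node" {
		return fmt.Errorf("mode daemonset must be used with target allocation strategy per-node")
	}
	if strategy == "per-node" && mode != "daemonset" {
		return fmt.Errorf("strategy per-node is only supported in mode daemonset")
	}
	return nil
}

func main() {
	fmt.Println(validModeStrategy("daemonset", "per-node"))       // <nil>
	fmt.Println(validModeStrategy("daemonset", "least-weighted")) // rejected
	fmt.Println(validModeStrategy("statefulset", "per-node"))     // rejected
}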