
Commit 02e44fb

[target-allocator] Introduce "per node" allocation strategy to target allocator (#2430)
1 parent 94c8420 · commit 02e44fb

34 files changed (+801, -51 lines)
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
+component: target allocator
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add new "per node" allocation strategy to target allocator. This strategy will allocate targets to the nodes on which the given target resides. It should only be used in conjunction with the daemonset mode.
+
+# One or more tracking issues related to the change
+issues: [1828]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
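
For context on what the "per node" strategy described in this note does, here is a minimal, self-contained sketch of the idea: each target is assigned to the collector whose node matches the target's node, which is why the strategy pairs with the daemonset mode (one collector per node). The Target and Collector types and the allocatePerNode helper below are simplified stand-ins for illustration, not the target allocator's actual implementation.

package main

import "fmt"

// Simplified stand-ins: the real allocator works with richer target and
// collector types; only the node-matching idea is shown here.
type Target struct {
    JobName  string
    NodeName string
}

type Collector struct {
    Name     string
    NodeName string
}

// allocatePerNode assigns every target to the collector running on the same
// node. Targets on nodes without a collector stay unassigned in this sketch.
func allocatePerNode(targets []Target, collectors []Collector) map[string][]Target {
    byNode := map[string]string{} // node name -> collector name
    for _, c := range collectors {
        byNode[c.NodeName] = c.Name
    }
    assignments := map[string][]Target{}
    for _, t := range targets {
        if col, ok := byNode[t.NodeName]; ok {
            assignments[col] = append(assignments[col], t)
        }
    }
    return assignments
}

func main() {
    collectors := []Collector{{Name: "otelcol-abc", NodeName: "node-0"}, {Name: "otelcol-def", NodeName: "node-1"}}
    targets := []Target{{"kubelet", "node-0"}, {"kubelet", "node-1"}, {"kubelet", "node-0"}}
    for col, ts := range allocatePerNode(targets, collectors) {
        fmt.Printf("%s <- %d target(s)\n", col, len(ts))
    }
}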

.github/workflows/e2e.yaml (+1)

@@ -30,6 +30,7 @@ jobs:
           - e2e-autoscale
           - e2e-pdb
           - e2e-opampbridge
+          - e2e-targetallocator
           - e2e-prometheuscr
           - e2e-multi-instrumentation
         include:

Makefile (+5)

@@ -246,6 +246,11 @@ e2e-multi-instrumentation:
 e2e-opampbridge:
 	$(KUTTL) test --config kuttl-test-opampbridge.yaml
 
+# Target allocator end-to-end tests
+.PHONY: e2e-targetallocator
+e2e-targetallocator:
+	$(KUTTL) test --config kuttl-test-targetallocator.yaml
+
 .PHONY: prepare-e2e
 prepare-e2e: kuttl set-image-controller add-image-targetallocator add-image-opampbridge container container-target-allocator container-operator-opamp-bridge start-kind cert-manager install-metrics-server install-targetallocator-prometheus-crds load-image-all deploy

apis/v1alpha1/allocation_strategy.go (+4, -1)

@@ -16,7 +16,7 @@ package v1alpha1
 
 type (
     // OpenTelemetryTargetAllocatorAllocationStrategy represent which strategy to distribute target to each collector
-    // +kubebuilder:validation:Enum=least-weighted;consistent-hashing
+    // +kubebuilder:validation:Enum=least-weighted;consistent-hashing;per-node
     OpenTelemetryTargetAllocatorAllocationStrategy string
 )
 
@@ -26,4 +26,7 @@ const (
 
     // OpenTelemetryTargetAllocatorAllocationStrategyConsistentHashing targets will be consistently added to collectors, which allows a high-availability setup.
     OpenTelemetryTargetAllocatorAllocationStrategyConsistentHashing OpenTelemetryTargetAllocatorAllocationStrategy = "consistent-hashing"
+
+    // OpenTelemetryTargetAllocatorAllocationStrategyPerNode targets will be assigned to the collector on the node they reside on (use only with daemon set).
+    OpenTelemetryTargetAllocatorAllocationStrategyPerNode OpenTelemetryTargetAllocatorAllocationStrategy = "per-node"
 )
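
For orientation, a rough sketch of how an enum value such as "per-node" might be resolved to an allocator implementation. The Allocator interface, registry map, and New function below are hypothetical stand-ins, not the operator's allocation package API.

package main

import "fmt"

// Allocator is a hypothetical stand-in for the interface the target
// allocator would program against.
type Allocator interface {
    Strategy() string
}

type perNodeAllocator struct{}

func (perNodeAllocator) Strategy() string { return "per-node" }

// registry maps the CRD enum values (least-weighted, consistent-hashing,
// per-node) to constructors; only per-node is stubbed out here.
var registry = map[string]func() Allocator{
    "per-node": func() Allocator { return perNodeAllocator{} },
}

// New looks up a strategy by its enum value; the lookup and error text are
// illustrative only.
func New(strategy string) (Allocator, error) {
    if builder, ok := registry[strategy]; ok {
        return builder(), nil
    }
    return nil, fmt.Errorf("unregistered strategy: %s", strategy)
}

func main() {
    a, err := New("per-node")
    if err != nil {
        panic(err)
    }
    fmt.Println(a.Strategy()) // per-node
}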

apis/v1alpha1/collector_webhook.go (+43, -23)

@@ -228,32 +228,14 @@ func (c CollectorWebhook) validate(ctx context.Context, r *OpenTelemetryCollector) (admission.Warnings, error) {
         return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the attribute 'AdditionalContainers'", r.Spec.Mode)
     }
 
-    // validate target allocation
-    if r.Spec.TargetAllocator.Enabled && r.Spec.Mode != ModeStatefulSet {
-        return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the target allocation deployment", r.Spec.Mode)
-    }
-
-    // validate Prometheus config for target allocation
+    // validate target allocator configs
     if r.Spec.TargetAllocator.Enabled {
-        promCfg, err := ta.ConfigToPromConfig(r.Spec.Config)
-        if err != nil {
-            return warnings, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err)
-        }
-        err = ta.ValidatePromConfig(promCfg, r.Spec.TargetAllocator.Enabled, featuregate.EnableTargetAllocatorRewrite.IsEnabled())
-        if err != nil {
-            return warnings, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err)
+        taWarnings, err := c.validateTargetAllocatorConfig(ctx, r)
+        if taWarnings != nil {
+            warnings = append(warnings, taWarnings...)
         }
-        err = ta.ValidateTargetAllocatorConfig(r.Spec.TargetAllocator.PrometheusCR.Enabled, promCfg)
         if err != nil {
-            return warnings, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err)
-        }
-        // if the prometheusCR is enabled, it needs a suite of permissions to function
-        if r.Spec.TargetAllocator.PrometheusCR.Enabled {
-            if subjectAccessReviews, err := c.reviewer.CheckPolicyRules(ctx, r.GetNamespace(), r.Spec.TargetAllocator.ServiceAccount, targetAllocatorCRPolicyRules...); err != nil {
-                return warnings, fmt.Errorf("unable to check rbac rules %w", err)
-            } else if allowed, deniedReviews := rbac.AllSubjectAccessReviewsAllowed(subjectAccessReviews); !allowed {
-                warnings = append(warnings, warningsGroupedByResource(deniedReviews)...)
-            }
+            return warnings, err
         }
     }
 
@@ -365,6 +347,44 @@ func (c CollectorWebhook) validate(ctx context.Context, r *OpenTelemetryCollector) (admission.Warnings, error) {
     return warnings, nil
 }
 
+func (c CollectorWebhook) validateTargetAllocatorConfig(ctx context.Context, r *OpenTelemetryCollector) (admission.Warnings, error) {
+    if r.Spec.Mode != ModeStatefulSet && r.Spec.Mode != ModeDaemonSet {
+        return nil, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the target allocation deployment", r.Spec.Mode)
+    }
+
+    if r.Spec.Mode == ModeDaemonSet && r.Spec.TargetAllocator.AllocationStrategy != OpenTelemetryTargetAllocatorAllocationStrategyPerNode {
+        return nil, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which must be used with target allocation strategy %s ", r.Spec.Mode, OpenTelemetryTargetAllocatorAllocationStrategyPerNode)
+    }
+
+    if r.Spec.TargetAllocator.AllocationStrategy == OpenTelemetryTargetAllocatorAllocationStrategyPerNode && r.Spec.Mode != ModeDaemonSet {
+        return nil, fmt.Errorf("target allocation strategy %s is only supported in OpenTelemetry Collector mode %s", OpenTelemetryTargetAllocatorAllocationStrategyPerNode, ModeDaemonSet)
+    }
+
+    // validate Prometheus config for target allocation
+    promCfg, err := ta.ConfigToPromConfig(r.Spec.Config)
+    if err != nil {
+        return nil, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err)
+    }
+    err = ta.ValidatePromConfig(promCfg, r.Spec.TargetAllocator.Enabled, featuregate.EnableTargetAllocatorRewrite.IsEnabled())
+    if err != nil {
+        return nil, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err)
+    }
+    err = ta.ValidateTargetAllocatorConfig(r.Spec.TargetAllocator.PrometheusCR.Enabled, promCfg)
+    if err != nil {
+        return nil, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err)
+    }
+    // if the prometheusCR is enabled, it needs a suite of permissions to function
+    if r.Spec.TargetAllocator.PrometheusCR.Enabled {
+        if subjectAccessReviews, err := c.reviewer.CheckPolicyRules(ctx, r.GetNamespace(), r.Spec.TargetAllocator.ServiceAccount, targetAllocatorCRPolicyRules...); err != nil {
+            return nil, fmt.Errorf("unable to check rbac rules %w", err)
+        } else if allowed, deniedReviews := rbac.AllSubjectAccessReviewsAllowed(subjectAccessReviews); !allowed {
+            return warningsGroupedByResource(deniedReviews), nil
+        }
+    }
+
+    return nil, nil
+}
+
 func checkAutoscalerSpec(autoscaler *AutoscalerSpec) error {
     if autoscaler.Behavior != nil {
         if autoscaler.Behavior.ScaleDown != nil && autoscaler.Behavior.ScaleDown.StabilizationWindowSeconds != nil &&
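
To make the mode/strategy coupling in validateTargetAllocatorConfig easier to see, here is a minimal, runnable sketch of the same three checks using simplified stand-in types instead of the v1alpha1 API: only statefulset and daemonset may enable the target allocator, daemonset must use per-node, and per-node is only valid with daemonset.

package main

import (
    "errors"
    "fmt"
)

// Simplified stand-ins for the operator's Mode and AllocationStrategy enums.
type Mode string
type Strategy string

const (
    ModeDeployment  Mode = "deployment"
    ModeStatefulSet Mode = "statefulset"
    ModeDaemonSet   Mode = "daemonset"

    StrategyLeastWeighted     Strategy = "least-weighted"
    StrategyConsistentHashing Strategy = "consistent-hashing"
    StrategyPerNode           Strategy = "per-node"
)

// validateTargetAllocator mirrors the shape of the webhook checks in this
// commit; the error texts here are illustrative, not the webhook's messages.
func validateTargetAllocator(mode Mode, strategy Strategy) error {
    if mode != ModeStatefulSet && mode != ModeDaemonSet {
        return fmt.Errorf("mode %s does not support the target allocator", mode)
    }
    if mode == ModeDaemonSet && strategy != StrategyPerNode {
        return errors.New("daemonset mode must use the per-node strategy")
    }
    if strategy == StrategyPerNode && mode != ModeDaemonSet {
        return errors.New("the per-node strategy is only supported in daemonset mode")
    }
    return nil
}

func main() {
    fmt.Println(validateTargetAllocator(ModeDaemonSet, StrategyPerNode))       // <nil>
    fmt.Println(validateTargetAllocator(ModeDaemonSet, StrategyLeastWeighted)) // error
    fmt.Println(validateTargetAllocator(ModeStatefulSet, StrategyPerNode))     // error
    fmt.Println(validateTargetAllocator(ModeDeployment, StrategyPerNode))      // error
}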

apis/v1alpha1/collector_webhook_test.go (+13)

@@ -670,6 +670,19 @@ func TestOTELColValidatingWebhook(t *testing.T) {
             },
             expectedErr: "the OpenTelemetry Spec Prometheus configuration is incorrect",
         },
+        {
+            name: "invalid target allocation strategy",
+            otelcol: OpenTelemetryCollector{
+                Spec: OpenTelemetryCollectorSpec{
+                    Mode: ModeDaemonSet,
+                    TargetAllocator: OpenTelemetryTargetAllocator{
+                        Enabled:            true,
+                        AllocationStrategy: OpenTelemetryTargetAllocatorAllocationStrategyLeastWeighted,
+                    },
+                },
+            },
+            expectedErr: "mode is set to daemonset, which must be used with target allocation strategy per-node",
+        },
         {
             name: "invalid port name",
             otelcol: OpenTelemetryCollector{

apis/v1alpha1/opentelemetrycollector_types.go (+2, -1)

@@ -305,7 +305,8 @@ type OpenTelemetryTargetAllocator struct {
     // +optional
     Resources v1.ResourceRequirements `json:"resources,omitempty"`
     // AllocationStrategy determines which strategy the target allocator should use for allocation.
-    // The current options are least-weighted and consistent-hashing. The default option is consistent-hashing
+    // The current options are least-weighted, consistent-hashing and per-node. The default is
+    // consistent-hashing.
     // +optional
     // +kubebuilder:default:=consistent-hashing
     AllocationStrategy OpenTelemetryTargetAllocatorAllocationStrategy `json:"allocationStrategy,omitempty"`

bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml (+3, -2)

@@ -5067,11 +5067,12 @@ spec:
                 default: consistent-hashing
                 description: AllocationStrategy determines which strategy the
                   target allocator should use for allocation. The current options
-                  are least-weighted and consistent-hashing. The default option
-                  is consistent-hashing
+                  are least-weighted, consistent-hashing and per-node. The default
+                  is consistent-hashing.
                 enum:
                 - least-weighted
                 - consistent-hashing
+                - per-node
                 type: string
               enabled:
                 description: Enabled indicates whether to use a target allocation

cmd/otel-allocator/allocation/allocatortest.go (+4, -2)

@@ -51,6 +51,7 @@ func MakeNCollectors(n int, startingIndex int) map[string]*Collector {
         toReturn[collector] = &Collector{
             Name:       collector,
             NumTargets: 0,
+            NodeName:   fmt.Sprintf("node-%d", i),
         }
     }
     return toReturn
@@ -60,8 +61,9 @@ func MakeNNewTargetsWithEmptyCollectors(n int, startingIndex int) map[string]*target.Item {
     toReturn := map[string]*target.Item{}
     for i := startingIndex; i < n+startingIndex; i++ {
         label := model.LabelSet{
-            "i":     model.LabelValue(strconv.Itoa(i)),
-            "total": model.LabelValue(strconv.Itoa(n + startingIndex)),
+            "i":                               model.LabelValue(strconv.Itoa(i)),
+            "total":                           model.LabelValue(strconv.Itoa(n + startingIndex)),
+            "__meta_kubernetes_pod_node_name": model.LabelValue("node-0"),
         }
         newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), fmt.Sprintf("test-url-%d", i), label, "")
         toReturn[newTarget.Hash()] = newTarget
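
The fixtures above seed targets with the __meta_kubernetes_pod_node_name discovery label, which is the piece of metadata a per-node strategy can key on. A small sketch of that idea using prometheus/common/model types follows; the nodeName and groupByNode helpers are illustrative, not part of the allocator's API.

package main

import (
    "fmt"

    "github.com/prometheus/common/model"
)

// nodeName pulls the Kubernetes node from a scraped target's discovery
// labels; an empty string means the target carries no node information.
func nodeName(labels model.LabelSet) string {
    return string(labels["__meta_kubernetes_pod_node_name"])
}

// groupByNode buckets label sets by node so each bucket could be handed to
// the collector running on that node (illustrative helper, not operator code).
func groupByNode(targets []model.LabelSet) map[string][]model.LabelSet {
    out := map[string][]model.LabelSet{}
    for _, t := range targets {
        out[nodeName(t)] = append(out[nodeName(t)], t)
    }
    return out
}

func main() {
    targets := []model.LabelSet{
        {"job": "kubelet", "__meta_kubernetes_pod_node_name": "node-0"},
        {"job": "kubelet", "__meta_kubernetes_pod_node_name": "node-1"},
    }
    for node, ts := range groupByNode(targets) {
        fmt.Printf("%s -> %d target(s)\n", node, len(ts))
    }
}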

cmd/otel-allocator/allocation/consistent_hashing.go (+1, -1)

@@ -161,7 +161,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collector]) {
     }
     // Insert the new collectors
     for _, i := range diff.Additions() {
-        c.collectors[i.Name] = NewCollector(i.Name)
+        c.collectors[i.Name] = NewCollector(i.Name, i.NodeName)
         c.consistentHasher.Add(c.collectors[i.Name])
     }
 
cmd/otel-allocator/allocation/least_weighted.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col
191191
}
192192
// Insert the new collectors
193193
for _, i := range diff.Additions() {
194-
allocator.collectors[i.Name] = NewCollector(i.Name)
194+
allocator.collectors[i.Name] = NewCollector(i.Name, i.NodeName)
195195
}
196196
if allocateTargets {
197197
for _, item := range allocator.targetItems {
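
Both allocators now construct collectors with NewCollector(i.Name, i.NodeName). Judging from the test fixture earlier in this commit, the Collector type carries Name, NodeName, and NumTargets; the sketch below shows what the updated constructor plausibly looks like, with the struct layout inferred rather than quoted from the commit.

package main

import "fmt"

// Collector mirrors the fields exercised by the test fixtures in this
// commit (Name, NodeName, NumTargets); the real struct may carry more.
type Collector struct {
    Name       string
    NodeName   string
    NumTargets int
}

// NewCollector records which node the collector pod runs on, so a per-node
// strategy can match targets to it by node name (sketch only).
func NewCollector(name, nodeName string) *Collector {
    return &Collector{Name: name, NodeName: nodeName}
}

func main() {
    c := NewCollector("collector-0", "node-0")
    fmt.Printf("%s runs on %s\n", c.Name, c.NodeName)
}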
