Skip to content

Commit c5fdee5

Browse files
author
Israel Blancas
authored
Merge branch 'main' into 3370
2 parents dd66758 + 11c1fa9 commit c5fdee5

File tree

91 files changed

+5358
-104
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+5358
-104
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
5+
component: collector
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Inject environment K8S_NODE_NAME environment variable for the Kubelet Stats Receiver.
9+
10+
# One or more tracking issues related to the change
11+
issues: [2779]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: bug_fix
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
5+
component: collector
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: "Fix mutation of deployments, statefulsets, and daemonsets allowing to remove fields on update"
9+
10+
# One or more tracking issues related to the change
11+
issues: [2947]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:

apis/v1beta1/config.go

+40
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,42 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ..
226226
return ports, nil
227227
}
228228

229+
// getEnvironmentVariablesForComponentKinds gets the environment variables for the given ComponentKind(s).
230+
func (c *Config) getEnvironmentVariablesForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) ([]corev1.EnvVar, error) {
231+
var envVars []corev1.EnvVar = []corev1.EnvVar{}
232+
enabledComponents := c.GetEnabledComponents()
233+
for _, componentKind := range componentKinds {
234+
var retriever components.ParserRetriever
235+
var cfg AnyConfig
236+
237+
switch componentKind {
238+
case KindReceiver:
239+
retriever = receivers.ReceiverFor
240+
cfg = c.Receivers
241+
case KindExporter:
242+
continue
243+
case KindProcessor:
244+
continue
245+
case KindExtension:
246+
continue
247+
}
248+
for componentName := range enabledComponents[componentKind] {
249+
parser := retriever(componentName)
250+
if parsedEnvVars, err := parser.GetEnvironmentVariables(logger, cfg.Object[componentName]); err != nil {
251+
return nil, err
252+
} else {
253+
envVars = append(envVars, parsedEnvVars...)
254+
}
255+
}
256+
}
257+
258+
sort.Slice(envVars, func(i, j int) bool {
259+
return envVars[i].Name < envVars[j].Name
260+
})
261+
262+
return envVars, nil
263+
}
264+
229265
// applyDefaultForComponentKinds applies defaults to the endpoints for the given ComponentKind(s).
230266
func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error {
231267
if err := c.Service.ApplyDefaults(); err != nil {
@@ -286,6 +322,10 @@ func (c *Config) GetAllPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
286322
return c.getPortsForComponentKinds(logger, KindReceiver, KindExporter)
287323
}
288324

325+
func (c *Config) GetEnvironmentVariables(logger logr.Logger) ([]corev1.EnvVar, error) {
326+
return c.getEnvironmentVariablesForComponentKinds(logger, KindReceiver)
327+
}
328+
289329
func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error) {
290330
return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor)
291331
}

apis/v1beta1/config_test.go

+60
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,66 @@ func TestConfig_GetEnabledComponents(t *testing.T) {
423423
}
424424
}
425425

426+
func TestConfig_getEnvironmentVariablesForComponentKinds(t *testing.T) {
427+
tests := []struct {
428+
name string
429+
config *Config
430+
componentKinds []ComponentKind
431+
envVarsLen int
432+
}{
433+
{
434+
name: "no env vars",
435+
config: &Config{
436+
Receivers: AnyConfig{
437+
Object: map[string]interface{}{
438+
"myreceiver": map[string]interface{}{
439+
"env": "test",
440+
},
441+
},
442+
},
443+
Service: Service{
444+
Pipelines: map[string]*Pipeline{
445+
"test": {
446+
Receivers: []string{"myreceiver"},
447+
},
448+
},
449+
},
450+
},
451+
componentKinds: []ComponentKind{KindReceiver},
452+
envVarsLen: 0,
453+
},
454+
{
455+
name: "kubeletstats env vars",
456+
config: &Config{
457+
Receivers: AnyConfig{
458+
Object: map[string]interface{}{
459+
"kubeletstats": map[string]interface{}{},
460+
},
461+
},
462+
Service: Service{
463+
Pipelines: map[string]*Pipeline{
464+
"test": {
465+
Receivers: []string{"kubeletstats"},
466+
},
467+
},
468+
},
469+
},
470+
componentKinds: []ComponentKind{KindReceiver},
471+
envVarsLen: 1,
472+
},
473+
}
474+
475+
for _, tt := range tests {
476+
t.Run(tt.name, func(t *testing.T) {
477+
logger := logr.Discard()
478+
envVars, err := tt.config.GetEnvironmentVariables(logger)
479+
480+
assert.NoError(t, err)
481+
assert.Len(t, envVars, tt.envVarsLen)
482+
})
483+
}
484+
}
485+
426486
func TestConfig_GetReceiverPorts(t *testing.T) {
427487
tests := []struct {
428488
name string

cmd/otel-allocator/benchmark_test.go

+175
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"os"
21+
"testing"
22+
23+
gokitlog "github.com/go-kit/log"
24+
"github.com/go-logr/logr"
25+
"github.com/prometheus/client_golang/prometheus"
26+
"github.com/prometheus/common/model"
27+
"github.com/prometheus/prometheus/discovery"
28+
"github.com/prometheus/prometheus/discovery/targetgroup"
29+
"github.com/prometheus/prometheus/model/relabel"
30+
ctrl "sigs.k8s.io/controller-runtime"
31+
"sigs.k8s.io/controller-runtime/pkg/log"
32+
33+
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
34+
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/prehook"
35+
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/server"
36+
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
37+
)
38+
39+
// BenchmarkProcessTargets benchmarks the whole target allocation pipeline. It starts with data the prometheus
40+
// discovery manager would normally output, and pushes it all the way into the allocator. It notably doe *not* check
41+
// the HTTP server afterward. Test data is chosen to be reasonably representative of what the Prometheus service discovery
42+
// outputs in the real world.
43+
func BenchmarkProcessTargets(b *testing.B) {
44+
numTargets := 10000
45+
targetsPerGroup := 5
46+
groupsPerJob := 20
47+
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
48+
49+
b.ResetTimer()
50+
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
51+
b.Run(strategy, func(b *testing.B) {
52+
targetDiscoverer, allocator := createTestDiscoverer(strategy, map[string][]*relabel.Config{})
53+
for i := 0; i < b.N; i++ {
54+
targetDiscoverer.ProcessTargets(tsets, allocator.SetTargets)
55+
}
56+
})
57+
}
58+
}
59+
60+
// BenchmarkProcessTargetsWithRelabelConfig is BenchmarkProcessTargets with a relabel config set. The relabel config
61+
// does not actually modify any records, but does force the prehook to perform any necessary conversions along the way.
62+
func BenchmarkProcessTargetsWithRelabelConfig(b *testing.B) {
63+
numTargets := 10000
64+
targetsPerGroup := 5
65+
groupsPerJob := 20
66+
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
67+
prehookConfig := make(map[string][]*relabel.Config, len(tsets))
68+
for jobName := range tsets {
69+
prehookConfig[jobName] = []*relabel.Config{
70+
{
71+
Action: "keep",
72+
Regex: relabel.MustNewRegexp(".*"),
73+
},
74+
}
75+
}
76+
77+
b.ResetTimer()
78+
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
79+
b.Run(strategy, func(b *testing.B) {
80+
targetDiscoverer, allocator := createTestDiscoverer(strategy, prehookConfig)
81+
for i := 0; i < b.N; i++ {
82+
targetDiscoverer.ProcessTargets(tsets, allocator.SetTargets)
83+
}
84+
})
85+
}
86+
}
87+
88+
func prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob int) map[string][]*targetgroup.Group {
89+
numGroups := numTargets / targetsPerGroup
90+
numJobs := numGroups / groupsPerJob
91+
jobNamePrefix := "test-"
92+
groupLabels := model.LabelSet{
93+
"__meta_kubernetes_pod_controller_name": "example",
94+
"__meta_kubernetes_pod_ip": "10.244.0.251",
95+
"__meta_kubernetes_pod_uid": "676ebee7-14f8-481e-a937-d2affaec4105",
96+
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
97+
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
98+
"__meta_kubernetes_service_annotation_kubectl_kubernetes_io_last_applied_configuration": "{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"labels\":{\"app\":\"example\"},\"name\":\"example-svc\",\"namespace\":\"example\"},\"spec\":{\"clusterIP\":\"None\",\"ports\":[{\"name\":\"http-example\",\"port\":9006,\"targetPort\":9006}],\"selector\":{\"app\":\"example\"},\"type\":\"ClusterIP\"}}\n",
99+
"__meta_kubernetes_endpointslice_labelpresent_app": "true",
100+
"__meta_kubernetes_endpointslice_name": "example-svc-qgwxf",
101+
"__address__": "10.244.0.251:9006",
102+
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
103+
"__meta_kubernetes_pod_labelpresent_pod_template_hash": "true",
104+
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "example-svc",
105+
"__meta_kubernetes_endpointslice_labelpresent_service_kubernetes_io_headless": "true",
106+
"__meta_kubernetes_pod_label_pod_template_hash": "6b549885f8",
107+
"__meta_kubernetes_endpointslice_address_target_name": "example-6b549885f8-7tbcw",
108+
"__meta_kubernetes_pod_labelpresent_app": "true",
109+
"somelabel": "somevalue",
110+
}
111+
exampleTarget := model.LabelSet{
112+
"__meta_kubernetes_endpointslice_port": "9006",
113+
"__meta_kubernetes_service_label_app": "example",
114+
"__meta_kubernetes_endpointslice_port_name": "http-example",
115+
"__meta_kubernetes_pod_ready": "true",
116+
"__meta_kubernetes_endpointslice_address_type": "IPv4",
117+
"__meta_kubernetes_endpointslice_label_endpointslice_kubernetes_io_managed_by": "endpointslice-controller.k8s.io",
118+
"__meta_kubernetes_endpointslice_labelpresent_endpointslice_kubernetes_io_managed_by": "true",
119+
"__meta_kubernetes_endpointslice_label_app": "example",
120+
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
121+
"__meta_kubernetes_pod_phase": "Running",
122+
"__meta_kubernetes_pod_controller_kind": "ReplicaSet",
123+
"__meta_kubernetes_service_annotationpresent_kubectl_kubernetes_io_last_applied_configuration": "true",
124+
"__meta_kubernetes_service_labelpresent_app": "true",
125+
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
126+
"__meta_kubernetes_endpointslice_annotation_endpoints_kubernetes_io_last_change_trigger_time": "2023-09-27T16:01:29Z",
127+
"__meta_kubernetes_pod_name": "example-6b549885f8-7tbcw",
128+
"__meta_kubernetes_service_name": "example-svc",
129+
"__meta_kubernetes_namespace": "example",
130+
"__meta_kubernetes_endpointslice_annotationpresent_endpoints_kubernetes_io_last_change_trigger_time": "true",
131+
"__meta_kubernetes_pod_node_name": "kind-control-plane",
132+
"__meta_kubernetes_endpointslice_address_target_kind": "Pod",
133+
"__meta_kubernetes_pod_host_ip": "172.18.0.2",
134+
"__meta_kubernetes_endpointslice_label_service_kubernetes_io_headless": "",
135+
"__meta_kubernetes_pod_label_app": "example",
136+
}
137+
targets := []model.LabelSet{}
138+
for i := 0; i < numTargets; i++ {
139+
targets = append(targets, exampleTarget.Clone())
140+
}
141+
groups := make([]*targetgroup.Group, numGroups)
142+
for i := 0; i < numGroups; i++ {
143+
groupTargets := targets[(i * targetsPerGroup):(i*targetsPerGroup + targetsPerGroup)]
144+
groups[i] = &targetgroup.Group{
145+
Labels: groupLabels,
146+
Targets: groupTargets,
147+
}
148+
}
149+
tsets := make(map[string][]*targetgroup.Group, numJobs)
150+
for i := 0; i < numJobs; i++ {
151+
jobGroups := groups[(i * groupsPerJob):(i*groupsPerJob + groupsPerJob)]
152+
jobName := fmt.Sprintf("%s%d", jobNamePrefix, i)
153+
tsets[jobName] = jobGroups
154+
}
155+
return tsets
156+
}
157+
158+
func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][]*relabel.Config) (*target.Discoverer, allocation.Allocator) {
159+
ctx := context.Background()
160+
logger := ctrl.Log.WithName(fmt.Sprintf("bench-%s", allocationStrategy))
161+
ctrl.SetLogger(logr.New(log.NullLogSink{}))
162+
allocatorPrehook := prehook.New("relabel-config", logger)
163+
allocatorPrehook.SetConfig(prehookConfig)
164+
allocator, err := allocation.New(allocationStrategy, logger, allocation.WithFilter(allocatorPrehook))
165+
srv := server.NewServer(logger, allocator, "localhost:0")
166+
if err != nil {
167+
setupLog.Error(err, "Unable to initialize allocation strategy")
168+
os.Exit(1)
169+
}
170+
registry := prometheus.NewRegistry()
171+
sdMetrics, _ := discovery.CreateAndRegisterSDMetrics(registry)
172+
discoveryManager := discovery.NewManager(ctx, gokitlog.NewNopLogger(), registry, sdMetrics)
173+
targetDiscoverer := target.NewDiscoverer(logger, discoveryManager, allocatorPrehook, srv)
174+
return targetDiscoverer, allocator
175+
}

cmd/otel-allocator/target/discovery.go

+18-13
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/prometheus/common/model"
2525
promconfig "github.com/prometheus/prometheus/config"
2626
"github.com/prometheus/prometheus/discovery"
27+
"github.com/prometheus/prometheus/discovery/targetgroup"
2728
"github.com/prometheus/prometheus/model/relabel"
2829
"gopkg.in/yaml.v3"
2930

@@ -110,22 +111,26 @@ func (m *Discoverer) Watch(fn func(targets map[string]*Item)) error {
110111
m.log.Info("Service Discovery watch event stopped: discovery manager closed")
111112
return nil
112113
case tsets := <-m.manager.SyncCh():
113-
targets := map[string]*Item{}
114-
115-
for jobName, tgs := range tsets {
116-
var count float64 = 0
117-
for _, tg := range tgs {
118-
for _, t := range tg.Targets {
119-
count++
120-
item := NewItem(jobName, string(t[model.AddressLabel]), t.Merge(tg.Labels), "")
121-
targets[item.Hash()] = item
122-
}
123-
}
124-
targetsDiscovered.WithLabelValues(jobName).Set(count)
114+
m.ProcessTargets(tsets, fn)
115+
}
116+
}
117+
}
118+
119+
func (m *Discoverer) ProcessTargets(tsets map[string][]*targetgroup.Group, fn func(targets map[string]*Item)) {
120+
targets := map[string]*Item{}
121+
122+
for jobName, tgs := range tsets {
123+
var count float64 = 0
124+
for _, tg := range tgs {
125+
for _, t := range tg.Targets {
126+
count++
127+
item := NewItem(jobName, string(t[model.AddressLabel]), t.Merge(tg.Labels), "")
128+
targets[item.Hash()] = item
125129
}
126-
fn(targets)
127130
}
131+
targetsDiscovered.WithLabelValues(jobName).Set(count)
128132
}
133+
fn(targets)
129134
}
130135

131136
func (m *Discoverer) Close() {

controllers/common.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ func reconcileDesiredObjects(ctx context.Context, kubeClient client.Client, logg
160160
op = result
161161
return createOrUpdateErr
162162
})
163-
if crudErr != nil && errors.Is(crudErr, manifests.ImmutableChangeErr) {
163+
if crudErr != nil && errors.As(crudErr, &manifests.ImmutableChangeErr) {
164164
l.Error(crudErr, "detected immutable field change, trying to delete, new object will be created on next reconcile", "existing", existing.GetName())
165165
delErr := kubeClient.Delete(ctx, existing)
166166
if delErr != nil {

0 commit comments

Comments
 (0)