Skip to content

Commit 5677178

Browse files
committed
Fix default internal telemetry endpoint for collector
#3730
1 parent 17e463a commit 5677178

File tree

3 files changed

+103
-20
lines changed

3 files changed

+103
-20
lines changed

apis/v1beta1/collector_webhook_test.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"testing"
1212

1313
"github.com/go-logr/logr"
14+
"github.com/google/go-cmp/cmp"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
1617
"gopkg.in/yaml.v3"
@@ -158,7 +159,7 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
158159
Mode: v1beta1.ModeDeployment,
159160
UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic,
160161
Config: func() v1beta1.Config {
161-
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317"},"http":{"endpoint":"0.0.0.0:4318"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"address":"0.0.0.0:8888"}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
162+
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317"},"http":{"endpoint":"0.0.0.0:4318"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"readers":[{"pull":{"exporter":{"prometheus":{"host":"0.0.0.0","port":8888}}}}]}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
162163
var cfg v1beta1.Config
163164
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
164165
return cfg
@@ -171,7 +172,7 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
171172
otelcol: v1beta1.OpenTelemetryCollector{
172173
Spec: v1beta1.OpenTelemetryCollectorSpec{
173174
Config: func() v1beta1.Config {
174-
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"address":"1.2.3.4:7654"}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
175+
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"readers":[{"pull":{"exporter":{"prometheus":{"host":"localhost","port":9999}}}}]}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
175176
var cfg v1beta1.Config
176177
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
177178
return cfg
@@ -190,7 +191,7 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
190191
Mode: v1beta1.ModeDeployment,
191192
UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic,
192193
Config: func() v1beta1.Config {
193-
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317","headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"address":"1.2.3.4:7654"}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
194+
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317","headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"readers":[{"pull":{"exporter":{"prometheus":{"host":"localhost","port":9999}}}}]}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
194195
var cfg v1beta1.Config
195196
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
196197
return cfg
@@ -547,7 +548,9 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
547548
assert.NoError(t, test.expected.Spec.Config.Service.ApplyDefaults(logr.Discard()), "could not apply defaults")
548549
}
549550
assert.NoError(t, err)
550-
assert.Equal(t, test.expected, test.otelcol)
551+
if diff := cmp.Diff(test.expected, test.otelcol); diff != "" {
552+
t.Errorf("v1beta1.OpenTelemetryCollector mismatch (-want +got):\n%s", diff)
553+
}
551554
})
552555
}
553556
}

apis/v1beta1/config.go

+70-12
Original file line numberDiff line numberDiff line change
@@ -474,23 +474,81 @@ func (s *Service) ApplyDefaults(logger logr.Logger) error {
474474
return err
475475
}
476476

477-
tm := &AnyConfig{
478-
Object: map[string]interface{}{
479-
"metrics": map[string]interface{}{
480-
"address": fmt.Sprintf("%s:%d", telemetryAddr, telemetryPort),
481-
},
482-
},
477+
if s.Telemetry == nil {
478+
s.Telemetry = &AnyConfig{
479+
Object: map[string]any{},
480+
}
483481
}
484482

485-
if s.Telemetry == nil {
486-
s.Telemetry = tm
483+
metrics, ok := s.Telemetry.Object["metrics"]
484+
if !ok {
485+
metrics = map[string]any{}
486+
s.Telemetry.Object["metrics"] = metrics
487+
}
488+
metricsMap, ok := metrics.(map[string]any)
489+
if !ok {
490+
logger.Info("metrics field is not a map")
491+
return nil
492+
}
493+
494+
readers, ok := metricsMap["readers"]
495+
if !ok {
496+
readers = []any{}
497+
metricsMap["readers"] = readers
498+
}
499+
readersSlice, ok := readers.([]any)
500+
if !ok {
501+
logger.Info("readers field is not a slice")
487502
return nil
488503
}
489-
// NOTE: Merge without overwrite. If a telemetry endpoint is specified, the defaulting
490-
// respects the configuration and returns an equal value.
491-
if err := mergo.Merge(s.Telemetry, tm); err != nil {
492-
return fmt.Errorf("telemetry config merge failed: %w", err)
504+
505+
var found bool
506+
for _, reader := range readersSlice {
507+
readerMap, ok := reader.(map[string]any)
508+
if !ok {
509+
logger.Info("reader field is not a map")
510+
return nil
511+
}
512+
513+
pull, ok := readerMap["pull"]
514+
if !ok {
515+
continue
516+
}
517+
pullMap, ok := pull.(map[string]any)
518+
if !ok {
519+
logger.Info("pull field is not a map")
520+
return nil
521+
}
522+
523+
exporter, ok := pullMap["exporter"]
524+
if !ok {
525+
continue
526+
}
527+
exporterMap, ok := exporter.(map[string]any)
528+
if !ok {
529+
logger.Info("exporter field is not a map")
530+
return nil
531+
}
532+
533+
if _, ok := exporterMap["prometheus"]; ok {
534+
found = true
535+
break
536+
}
537+
}
538+
if !found {
539+
readersSlice = append(readersSlice, map[string]any{
540+
"pull": map[string]any{
541+
"exporter": map[string]any{
542+
"prometheus": map[string]any{
543+
"host": telemetryAddr,
544+
"port": int(telemetryPort),
545+
},
546+
},
547+
},
548+
})
549+
metricsMap["readers"] = readersSlice
493550
}
551+
494552
return nil
495553
}
496554

pkg/collector/upgrade/v0_111_0_test.go

+26-4
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,20 @@ func Test0_111_0Upgrade(t *testing.T) {
3434
defaultCollectorWithConfig := defaultCollector.DeepCopy()
3535

3636
defaultCollectorWithConfig.Spec.Config.Service.Telemetry = &v1beta1.AnyConfig{
37-
Object: map[string]interface{}{
38-
"metrics": map[string]interface{}{
39-
"address": "1.2.3.4:8888",
37+
Object: map[string]any{
38+
"metrics": map[string]any{
39+
"readers": []any{
40+
map[string]any{
41+
"pull": map[string]any{
42+
"exporter": map[string]any{
43+
"prometheus": map[string]any{
44+
"host": "1.2.3.4",
45+
"port": 8888,
46+
},
47+
},
48+
},
49+
},
50+
},
4051
},
4152
},
4253
}
@@ -59,7 +70,18 @@ func Test0_111_0Upgrade(t *testing.T) {
5970
col.Spec.Config.Service.Telemetry = &v1beta1.AnyConfig{
6071
Object: map[string]interface{}{
6172
"metrics": map[string]interface{}{
62-
"address": "0.0.0.0:8888",
73+
"readers": []any{
74+
map[string]any{
75+
"pull": map[string]any{
76+
"exporter": map[string]any{
77+
"prometheus": map[string]any{
78+
"host": "0.0.0.0",
79+
"port": 8888,
80+
},
81+
},
82+
},
83+
},
84+
},
6385
},
6486
},
6587
}

0 commit comments

Comments
 (0)