From 25266262f97406b24a0e81d7f7ab1b08f57358ca Mon Sep 17 00:00:00 2001 From: Benedikt Bongartz Date: Fri, 4 Oct 2024 19:12:09 +0200 Subject: [PATCH 1/5] operator: expose receivers by default Signed-off-by: Benedikt Bongartz --- .chloggen/add_receiver_defaults.yaml | 16 +++++ apis/v1beta1/collector_webhook.go | 2 +- apis/v1beta1/collector_webhook_test.go | 33 +++++++++ apis/v1beta1/config.go | 71 +++++++++++++++++++ internal/components/builder.go | 46 +++++++----- internal/components/component.go | 7 ++ internal/components/generic_parser.go | 29 ++++++-- internal/components/multi_endpoint.go | 49 ++++++++++++- internal/components/receivers/helpers.go | 2 + internal/components/single_endpoint.go | 27 ++++++- pkg/collector/upgrade/upgrade_test.go | 2 +- .../02-assert.yaml | 6 +- .../01-assert.yaml | 6 +- tests/e2e/managed-reconcile/02-assert.yaml | 8 ++- tests/e2e/multiple-configmaps/00-assert.yaml | 2 +- .../e2e/smoke-targetallocator/00-assert.yaml | 5 +- tests/e2e/statefulset-features/00-assert.yaml | 2 +- tests/e2e/statefulset-features/01-assert.yaml | 2 +- tests/e2e/versioned-configmaps/00-assert.yaml | 4 +- tests/e2e/versioned-configmaps/01-assert.yaml | 6 +- 20 files changed, 281 insertions(+), 44 deletions(-) create mode 100755 .chloggen/add_receiver_defaults.yaml diff --git a/.chloggen/add_receiver_defaults.yaml b/.chloggen/add_receiver_defaults.yaml new file mode 100755 index 0000000000..7ffaefb2d8 --- /dev/null +++ b/.chloggen/add_receiver_defaults.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action) +component: operator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use 0.0.0.0 as otlp receiver default address + +# One or more tracking issues related to the change +issues: [3126] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/apis/v1beta1/collector_webhook.go b/apis/v1beta1/collector_webhook.go index b1f8ddd91e..45744ef3a5 100644 --- a/apis/v1beta1/collector_webhook.go +++ b/apis/v1beta1/collector_webhook.go @@ -102,7 +102,7 @@ func (c CollectorWebhook) Default(_ context.Context, obj runtime.Object) error { if len(otelcol.Spec.ManagementState) == 0 { otelcol.Spec.ManagementState = ManagementStateManaged } - return nil + return otelcol.Spec.Config.ApplyDefaults(c.logger) } func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { diff --git a/apis/v1beta1/collector_webhook_test.go b/apis/v1beta1/collector_webhook_test.go index 9ce9cd3c90..588a6cd3b8 100644 --- a/apis/v1beta1/collector_webhook_test.go +++ b/apis/v1beta1/collector_webhook_test.go @@ -144,6 +144,39 @@ func TestCollectorDefaultingWebhook(t *testing.T) { expected v1beta1.OpenTelemetryCollector shouldFailSar bool }{ + { + name: "update config defaults", + otelcol: v1beta1.OpenTelemetryCollector{ + Spec: v1beta1.OpenTelemetryCollectorSpec{ + Config: func() v1beta1.Config { + const input = `{"receivers":{"otlp":{"protocols":{"grpc":null,"http":null}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}` + var cfg v1beta1.Config + require.NoError(t, yaml.Unmarshal([]byte(input), &cfg)) + return cfg + }(), + }, + }, + expected: v1beta1.OpenTelemetryCollector{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{}, + }, + Spec: v1beta1.OpenTelemetryCollectorSpec{ + OpenTelemetryCommonFields: v1beta1.OpenTelemetryCommonFields{ + Args: map[string]string{"feature-gates": "-component.UseLocalHostAsDefaultHost"}, + ManagementState: v1beta1.ManagementStateManaged, + Replicas: &one, + }, + Mode: v1beta1.ModeDeployment, + UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic, + Config: func() v1beta1.Config { + const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317"},"http":{"endpoint":"0.0.0.0:4318"}}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}` + var cfg v1beta1.Config + require.NoError(t, yaml.Unmarshal([]byte(input), &cfg)) + return cfg + }(), + }, + }, + }, { name: "all fields default", otelcol: v1beta1.OpenTelemetryCollector{}, diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 0eb9af57e9..d8ad760691 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "reflect" + "slices" "sort" "strconv" "strings" @@ -225,6 +226,72 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds .. return ports, nil } +// getPortsForComponentKinds gets the ports for the given ComponentKind(s). +func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error { + enabledComponents := c.GetEnabledComponents() + for _, componentKind := range componentKinds { + var retriever components.ParserRetriever + var cfg AnyConfig + switch componentKind { + case KindReceiver: + retriever = receivers.ReceiverFor + cfg = c.Receivers + case KindExporter: + continue + case KindProcessor: + continue + case KindExtension: + continue + } + for componentName := range enabledComponents[componentKind] { + parser := retriever(componentName) + if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil { + return err + } else { + // NOTE: The internally used parser returns components.SingleEndpointConfig + // as a map value. The following lines normalize this value.. + got, err := yaml.Marshal(newCfg) + if err != nil { + return err + } + out := make(map[string]interface{}, 0) + if err := yaml.Unmarshal(got, out); err != nil { + return err + } + // NOTE: The underlying struct compoents.SingleEndpointConfig adds this listenaddress + // field. It is marshaled due to internal use. To avoid adding invalid fields to the + // collector config, this temporary workaround removes this field. + // TODO: Try to get rid of it or move it into the parser.GetDefaultConfig method. + removeKeysRecursively(out, "listenaddress") + cfg.Object[componentName] = out + } + } + } + + return nil +} + +func removeKeysRecursively(m map[string]interface{}, keysToRemove ...string) { + for k, v := range m { + if slices.Contains(keysToRemove, k) { + delete(m, k) + continue + } + + if nm, ok := v.(map[string]interface{}); ok { + removeKeysRecursively(nm, keysToRemove...) + } + + if ns, ok := v.([]interface{}); ok { + for _, item := range ns { + if nestedMap, ok := item.(map[string]interface{}); ok { + removeKeysRecursively(nestedMap, keysToRemove...) + } + } + } + } +} + func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) { return c.getPortsForComponentKinds(logger, KindReceiver) } @@ -241,6 +308,10 @@ func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor) } +func (c *Config) ApplyDefaults(logger logr.Logger) error { + return c.applyDefaultForComponentKinds(logger, KindReceiver) +} + // GetLivenessProbe gets the first enabled liveness probe. There should only ever be one extension enabled // that provides the hinting for the liveness probe. func (c *Config) GetLivenessProbe(logger logr.Logger) (*corev1.Probe, error) { diff --git a/internal/components/builder.go b/internal/components/builder.go index be459cc513..7abc174703 100644 --- a/internal/components/builder.go +++ b/internal/components/builder.go @@ -26,16 +26,18 @@ import ( type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType]) type Settings[ComponentConfigType any] struct { - protocol corev1.Protocol - appProtocol *string - targetPort intstr.IntOrString - nodePort int32 - name string - port int32 - portParser PortParser[ComponentConfigType] - rbacGen RBACRuleGenerator[ComponentConfigType] - livenessGen ProbeGenerator[ComponentConfigType] - readinessGen ProbeGenerator[ComponentConfigType] + protocol corev1.Protocol + appProtocol *string + targetPort intstr.IntOrString + nodePort int32 + name string + port int32 + defaultRecAddr string + portParser PortParser[ComponentConfigType] + rbacGen RBACRuleGenerator[ComponentConfigType] + livenessGen ProbeGenerator[ComponentConfigType] + readinessGen ProbeGenerator[ComponentConfigType] + defaultsApplier Defaulter[ComponentConfigType] } func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] { @@ -75,6 +77,11 @@ func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Build o.appProtocol = appProtocol }) } +func (b Builder[ComponentConfigType]) WithDefaultRecAddress(defaultRecAddr string) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.defaultRecAddr = defaultRecAddr + }) +} func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] { return append(b, func(o *Settings[ComponentConfigType]) { o.targetPort = intstr.FromInt32(targetPort) @@ -118,6 +125,12 @@ func (b Builder[ComponentConfigType]) WithReadinessGen(readinessGen ProbeGenerat }) } +func (b Builder[ComponentConfigType]) WithDefaultsApplier(defaultsApplier Defaulter[ComponentConfigType]) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.defaultsApplier = defaultsApplier + }) +} + func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) { o := NewEmptySettings[ComponentConfigType]() o.Apply(b...) @@ -125,12 +138,13 @@ func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigTyp return nil, fmt.Errorf("invalid settings struct, no name specified") } return &GenericParser[ComponentConfigType]{ - name: o.name, - portParser: o.portParser, - rbacGen: o.rbacGen, - livenessGen: o.livenessGen, - readinessGen: o.readinessGen, - settings: o, + name: o.name, + portParser: o.portParser, + rbacGen: o.rbacGen, + livenessGen: o.livenessGen, + readinessGen: o.readinessGen, + defaultsApplier: o.defaultsApplier, + settings: o, }, nil } diff --git a/internal/components/component.go b/internal/components/component.go index f97daba497..0e5a1be5bc 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -49,6 +49,10 @@ type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config // It's expected that type Config is the configuration used by a parser. type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) (*corev1.Probe, error) +// Defaulter is a function that applies given defaults to the passed Config. +// It's expected that type Config is the configuration used by a parser. +type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, port int32, config ComponentConfigType) (ComponentConfigType, error) + // ComponentType returns the type for a given component name. // components have a name like: // - mycomponent/custom @@ -87,6 +91,9 @@ func PortFromEndpoint(endpoint string) (int32, error) { type ParserRetriever func(string) Parser type Parser interface { + // GetDefaultConfig returns a config with set default values. + GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) + // Ports returns the service ports parsed based on the component's configuration where name is the component's name // of the form "name" or "type/name" Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) diff --git a/internal/components/generic_parser.go b/internal/components/generic_parser.go index 6bead9442c..58a5c5f9a5 100644 --- a/internal/components/generic_parser.go +++ b/internal/components/generic_parser.go @@ -30,12 +30,29 @@ var ( // GenericParser serves as scaffolding for custom parsing logic by isolating // functionality to idempotent functions. type GenericParser[T any] struct { - name string - settings *Settings[T] - portParser PortParser[T] - rbacGen RBACRuleGenerator[T] - livenessGen ProbeGenerator[T] - readinessGen ProbeGenerator[T] + name string + settings *Settings[T] + portParser PortParser[T] + rbacGen RBACRuleGenerator[T] + livenessGen ProbeGenerator[T] + readinessGen ProbeGenerator[T] + defaultsApplier Defaulter[T] +} + +func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) { + if g.settings == nil || g.defaultsApplier == nil { + return config, nil + } + + if g.settings.defaultRecAddr == "" { + return config, nil + } + + var parsed T + if err := mapstructure.Decode(config, &parsed); err != nil { + return nil, err + } + return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.port, parsed) } func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go index 261700bb17..e0f5fb7507 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -40,6 +40,7 @@ type MultiPortOption func(parser *MultiPortReceiver) type MultiPortReceiver struct { name string + addrMappings map[string]string portMappings map[string]*corev1.ServicePort } @@ -72,6 +73,37 @@ func (m *MultiPortReceiver) ParserName() string { return fmt.Sprintf("__%s", m.name) } +func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) { + multiProtoEndpointCfg := &MultiProtocolEndpointConfig{} + if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil { + return nil, err + } + tmp := make(map[string]*SingleEndpointConfig, len(multiProtoEndpointCfg.Protocols)) + for protocol, ec := range multiProtoEndpointCfg.Protocols { + var port int32 + if defaultSvc, ok := m.portMappings[protocol]; ok { + port = defaultSvc.Port + if ec != nil { + port = ec.GetPortNumOrDefault(logger, port) + } + } + var addr string + if defaultAddr, ok := m.addrMappings[protocol]; ok { + addr = defaultAddr + } + res, err := AddressDefaulter(logger, addr, port, ec) + if err != nil { + return nil, err + } + tmp[protocol] = res + } + + for protocol, ec := range tmp { + multiProtoEndpointCfg.Protocols[protocol] = ec + } + return config, mapstructure.Decode(multiProtoEndpointCfg, &config) + +} func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { return nil, nil } @@ -91,7 +123,7 @@ func NewMultiPortReceiverBuilder(name string) MultiPortBuilder[*MultiProtocolEnd } func NewProtocolBuilder(name string, port int32) Builder[*MultiProtocolEndpointConfig] { - return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port) + return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port).WithDefaultsApplier(MultiAddressDefaulter) } func (mp MultiPortBuilder[ComponentConfigType]) AddPortMapping(builder Builder[ComponentConfigType]) MultiPortBuilder[ComponentConfigType] { @@ -104,6 +136,7 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err } multiReceiver := &MultiPortReceiver{ name: mp[0].MustBuild().name, + addrMappings: map[string]string{}, portMappings: map[string]*corev1.ServicePort{}, } for _, bu := range mp[1:] { @@ -112,6 +145,9 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err return nil, err } multiReceiver.portMappings[built.name] = built.settings.GetServicePort() + if built.settings != nil { + multiReceiver.addrMappings[built.name] = built.settings.defaultRecAddr + } } return multiReceiver, nil } @@ -123,3 +159,14 @@ func (mp MultiPortBuilder[ComponentConfigType]) MustBuild() *MultiPortReceiver { return p } } + +func MultiAddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *MultiProtocolEndpointConfig) (*MultiProtocolEndpointConfig, error) { + for protocol, ec := range config.Protocols { + res, err := AddressDefaulter(logger, defaultRecAddr, port, ec) + if err != nil { + return nil, err + } + config.Protocols[protocol].Endpoint = res.Endpoint + } + return config, nil +} diff --git a/internal/components/receivers/helpers.go b/internal/components/receivers/helpers.go index 89a3cb6fe7..8823101cae 100644 --- a/internal/components/receivers/helpers.go +++ b/internal/components/receivers/helpers.go @@ -52,9 +52,11 @@ var ( components.NewMultiPortReceiverBuilder("otlp"). AddPortMapping(components.NewProtocolBuilder("grpc", 4317). WithAppProtocol(&components.GrpcProtocol). + WithDefaultRecAddress("0.0.0.0"). WithTargetPort(4317)). AddPortMapping(components.NewProtocolBuilder("http", 4318). WithAppProtocol(&components.HttpProtocol). + WithDefaultRecAddress("0.0.0.0"). WithTargetPort(4318)). MustBuild(), components.NewMultiPortReceiverBuilder("skywalking"). diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go index 914136b568..cde447afb3 100644 --- a/internal/components/single_endpoint.go +++ b/internal/components/single_endpoint.go @@ -15,6 +15,9 @@ package components import ( + "fmt" + "strings" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -80,9 +83,29 @@ func internalParseSingleEndpoint(logger logr.Logger, name string, failSilently b } func NewSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] { - return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint) + return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint).WithDefaultsApplier(AddressDefaulter) } func NewSilentSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] { - return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent) + return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent).WithDefaultsApplier(AddressDefaulter) +} + +func AddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *SingleEndpointConfig) (*SingleEndpointConfig, error) { + if config == nil { + config = &SingleEndpointConfig{} + } + if config.Endpoint == "" { + config.Endpoint = fmt.Sprintf("%s:%d", defaultRecAddr, port) + return config, nil + } + + v := strings.Split(config.Endpoint, ":") + if len(v) < 2 { + return config, nil + } + + if v[0] == "" { + config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[1]) + } + return config, nil } diff --git a/pkg/collector/upgrade/upgrade_test.go b/pkg/collector/upgrade/upgrade_test.go index 40e6282f8e..d687e96faf 100644 --- a/pkg/collector/upgrade/upgrade_test.go +++ b/pkg/collector/upgrade/upgrade_test.go @@ -108,7 +108,7 @@ func TestEnvVarUpdates(t *testing.T) { Config: v1beta1.Config{ Receivers: v1beta1.AnyConfig{ Object: map[string]interface{}{ - "prometheus": []interface{}{}, + "prometheus": nil, }, }, Exporters: v1beta1.AnyConfig{ diff --git a/tests/e2e-multi-instrumentation/instrumentation-multi-multicontainer-go/02-assert.yaml b/tests/e2e-multi-instrumentation/instrumentation-multi-multicontainer-go/02-assert.yaml index a29e431199..5e50fb65d0 100644 --- a/tests/e2e-multi-instrumentation/instrumentation-multi-multicontainer-go/02-assert.yaml +++ b/tests/e2e-multi-instrumentation/instrumentation-multi-multicontainer-go/02-assert.yaml @@ -84,8 +84,10 @@ spec: receivers: otlp: protocols: - grpc: null - http: null + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 exporters: debug: null service: diff --git a/tests/e2e-multi-instrumentation/instrumentation-multi-multicontainer/01-assert.yaml b/tests/e2e-multi-instrumentation/instrumentation-multi-multicontainer/01-assert.yaml index 8edeeb6306..616b048e75 100644 --- a/tests/e2e-multi-instrumentation/instrumentation-multi-multicontainer/01-assert.yaml +++ b/tests/e2e-multi-instrumentation/instrumentation-multi-multicontainer/01-assert.yaml @@ -323,8 +323,10 @@ spec: receivers: otlp: protocols: - grpc: null - http: null + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 exporters: debug: null service: diff --git a/tests/e2e/managed-reconcile/02-assert.yaml b/tests/e2e/managed-reconcile/02-assert.yaml index ce624d51e5..0a8f5c29bf 100644 --- a/tests/e2e/managed-reconcile/02-assert.yaml +++ b/tests/e2e/managed-reconcile/02-assert.yaml @@ -52,14 +52,16 @@ spec: apiVersion: v1 kind: ConfigMap metadata: - name: simplest-collector-ea71c537 + name: simplest-collector-a85e451c data: collector.yaml: | receivers: otlp: protocols: - grpc: null - http: null + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 exporters: debug: null service: diff --git a/tests/e2e/multiple-configmaps/00-assert.yaml b/tests/e2e/multiple-configmaps/00-assert.yaml index b5555b7a90..54fca05399 100644 --- a/tests/e2e/multiple-configmaps/00-assert.yaml +++ b/tests/e2e/multiple-configmaps/00-assert.yaml @@ -25,7 +25,7 @@ spec: volumes: - name: otc-internal configMap: - name: simplest-with-configmaps-collector-ea71c537 + name: simplest-with-configmaps-collector-a85e451c items: - key: collector.yaml path: collector.yaml diff --git a/tests/e2e/smoke-targetallocator/00-assert.yaml b/tests/e2e/smoke-targetallocator/00-assert.yaml index 14e41f7002..8b1e1a5645 100644 --- a/tests/e2e/smoke-targetallocator/00-assert.yaml +++ b/tests/e2e/smoke-targetallocator/00-assert.yaml @@ -28,7 +28,8 @@ data: receivers: jaeger: protocols: - grpc: null + grpc: + endpoint: :14250 prometheus: config: global: @@ -51,4 +52,4 @@ data: - jaeger kind: ConfigMap metadata: - name: stateful-collector-eebfc9d2 + name: stateful-collector-a65c7bf4 diff --git a/tests/e2e/statefulset-features/00-assert.yaml b/tests/e2e/statefulset-features/00-assert.yaml index 9b5d865a34..3d6d879895 100644 --- a/tests/e2e/statefulset-features/00-assert.yaml +++ b/tests/e2e/statefulset-features/00-assert.yaml @@ -21,7 +21,7 @@ spec: items: - key: collector.yaml path: collector.yaml - name: stateful-collector-81dcbcb5 + name: stateful-collector-1e082e0e name: otc-internal - emptyDir: {} name: testvolume diff --git a/tests/e2e/statefulset-features/01-assert.yaml b/tests/e2e/statefulset-features/01-assert.yaml index 93855eeb85..72d1eb3013 100644 --- a/tests/e2e/statefulset-features/01-assert.yaml +++ b/tests/e2e/statefulset-features/01-assert.yaml @@ -21,7 +21,7 @@ spec: items: - key: collector.yaml path: collector.yaml - name: stateful-collector-81dcbcb5 + name: stateful-collector-1e082e0e name: otc-internal - emptyDir: {} name: testvolume diff --git a/tests/e2e/versioned-configmaps/00-assert.yaml b/tests/e2e/versioned-configmaps/00-assert.yaml index 09b5d13d9e..a1b499db1f 100644 --- a/tests/e2e/versioned-configmaps/00-assert.yaml +++ b/tests/e2e/versioned-configmaps/00-assert.yaml @@ -9,11 +9,11 @@ spec: volumes: - name: otc-internal configMap: - name: simple-collector-d6f40475 + name: simple-collector-bf36603a status: readyReplicas: 1 --- apiVersion: v1 kind: ConfigMap metadata: - name: simple-collector-d6f40475 + name: simple-collector-bf36603a diff --git a/tests/e2e/versioned-configmaps/01-assert.yaml b/tests/e2e/versioned-configmaps/01-assert.yaml index d480715810..169568e53a 100644 --- a/tests/e2e/versioned-configmaps/01-assert.yaml +++ b/tests/e2e/versioned-configmaps/01-assert.yaml @@ -9,16 +9,16 @@ spec: volumes: - name: otc-internal configMap: - name: simple-collector-8cd615bf + name: simple-collector-024c6417 status: readyReplicas: 1 --- apiVersion: v1 kind: ConfigMap metadata: - name: simple-collector-8cd615bf + name: simple-collector-024c6417 --- apiVersion: v1 kind: ConfigMap metadata: - name: simple-collector-d6f40475 + name: simple-collector-bf36603a From 6df1e406eaf20f4dc273d2789fbf2ce4b6c13149 Mon Sep 17 00:00:00 2001 From: Benedikt Bongartz Date: Sun, 6 Oct 2024 02:22:11 +0200 Subject: [PATCH 2/5] singleEndpointConfig: use omitempty Signed-off-by: Benedikt Bongartz --- apis/v1beta1/config.go | 23 ----------------------- internal/components/generic_parser.go | 2 +- internal/components/single_endpoint.go | 4 ++-- 3 files changed, 3 insertions(+), 26 deletions(-) diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index d8ad760691..6310f166a9 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -20,7 +20,6 @@ import ( "fmt" "net" "reflect" - "slices" "sort" "strconv" "strings" @@ -262,7 +261,6 @@ func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKind // field. It is marshaled due to internal use. To avoid adding invalid fields to the // collector config, this temporary workaround removes this field. // TODO: Try to get rid of it or move it into the parser.GetDefaultConfig method. - removeKeysRecursively(out, "listenaddress") cfg.Object[componentName] = out } } @@ -271,27 +269,6 @@ func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKind return nil } -func removeKeysRecursively(m map[string]interface{}, keysToRemove ...string) { - for k, v := range m { - if slices.Contains(keysToRemove, k) { - delete(m, k) - continue - } - - if nm, ok := v.(map[string]interface{}); ok { - removeKeysRecursively(nm, keysToRemove...) - } - - if ns, ok := v.([]interface{}); ok { - for _, item := range ns { - if nestedMap, ok := item.(map[string]interface{}); ok { - removeKeysRecursively(nestedMap, keysToRemove...) - } - } - } - } -} - func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) { return c.getPortsForComponentKinds(logger, KindReceiver) } diff --git a/internal/components/generic_parser.go b/internal/components/generic_parser.go index 58a5c5f9a5..c27521a9ce 100644 --- a/internal/components/generic_parser.go +++ b/internal/components/generic_parser.go @@ -52,7 +52,7 @@ func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface if err := mapstructure.Decode(config, &parsed); err != nil { return nil, err } - return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.port, parsed) + return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.GetServicePort().Port, parsed) } func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go index cde447afb3..4a6c8d41a2 100644 --- a/internal/components/single_endpoint.go +++ b/internal/components/single_endpoint.go @@ -31,8 +31,8 @@ var ( // SingleEndpointConfig represents the minimal struct for a given YAML configuration input containing either // endpoint or listen_address. type SingleEndpointConfig struct { - Endpoint string `mapstructure:"endpoint,omitempty"` - ListenAddress string `mapstructure:"listen_address,omitempty"` + Endpoint string `mapstructure:"endpoint,omitempty" yaml:"endpoint,omitempty"` + ListenAddress string `mapstructure:"listen_address,omitempty" yaml:"listen_address,omitempty"` } func (g *SingleEndpointConfig) GetPortNumOrDefault(logger logr.Logger, p int32) int32 { From edf855402705d17f554c0fd453a1e58b4f5b84a7 Mon Sep 17 00:00:00 2001 From: Benedikt Bongartz Date: Sun, 6 Oct 2024 02:24:13 +0200 Subject: [PATCH 3/5] components: apply MultiAddressDefaulter Signed-off-by: Benedikt Bongartz --- apis/v1beta1/config.go | 22 ++------ internal/components/component.go | 4 +- internal/components/generic_parser.go | 2 +- internal/components/multi_endpoint.go | 69 ++++++++++++++++++-------- internal/components/single_endpoint.go | 22 ++++---- 5 files changed, 67 insertions(+), 52 deletions(-) diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 6310f166a9..494fb5dc99 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -225,7 +225,7 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds .. return ports, nil } -// getPortsForComponentKinds gets the ports for the given ComponentKind(s). +// applyDefaultForComponentKinds applies defaults to the endpoints for the given ComponentKind(s). func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error { enabledComponents := c.GetEnabledComponents() for _, componentKind := range componentKinds { @@ -244,25 +244,11 @@ func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKind } for componentName := range enabledComponents[componentKind] { parser := retriever(componentName) - if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil { + newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]) + if err != nil { return err - } else { - // NOTE: The internally used parser returns components.SingleEndpointConfig - // as a map value. The following lines normalize this value.. - got, err := yaml.Marshal(newCfg) - if err != nil { - return err - } - out := make(map[string]interface{}, 0) - if err := yaml.Unmarshal(got, out); err != nil { - return err - } - // NOTE: The underlying struct compoents.SingleEndpointConfig adds this listenaddress - // field. It is marshaled due to internal use. To avoid adding invalid fields to the - // collector config, this temporary workaround removes this field. - // TODO: Try to get rid of it or move it into the parser.GetDefaultConfig method. - cfg.Object[componentName] = out } + cfg.Object[componentName] = newCfg } } diff --git a/internal/components/component.go b/internal/components/component.go index 0e5a1be5bc..3b5990c0cd 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -38,6 +38,8 @@ type PortRetriever interface { GetPortNumOrDefault(logr.Logger, int32) int32 } +type AddressProvider = func(name string) (address string, port int32) + // PortParser is a function that returns a list of servicePorts given a config of type Config. type PortParser[ComponentConfigType any] func(logger logr.Logger, name string, defaultPort *corev1.ServicePort, config ComponentConfigType) ([]corev1.ServicePort, error) @@ -51,7 +53,7 @@ type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config Com // Defaulter is a function that applies given defaults to the passed Config. // It's expected that type Config is the configuration used by a parser. -type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, port int32, config ComponentConfigType) (ComponentConfigType, error) +type Defaulter[ComponentConfigType any] func(logger logr.Logger, addrProv AddressProvider, config ComponentConfigType) (map[string]interface{}, error) // ComponentType returns the type for a given component name. // components have a name like: diff --git a/internal/components/generic_parser.go b/internal/components/generic_parser.go index c27521a9ce..aff68c9c07 100644 --- a/internal/components/generic_parser.go +++ b/internal/components/generic_parser.go @@ -52,7 +52,7 @@ func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface if err := mapstructure.Decode(config, &parsed); err != nil { return nil, err } - return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.GetServicePort().Port, parsed) + return g.defaultsApplier(logger, func(string) (string, int32) { return g.settings.defaultRecAddr, g.settings.GetServicePort().Port }, parsed) } func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go index e0f5fb7507..c3503250f7 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -40,8 +40,9 @@ type MultiPortOption func(parser *MultiPortReceiver) type MultiPortReceiver struct { name string - addrMappings map[string]string - portMappings map[string]*corev1.ServicePort + addrMappings map[string]string + portMappings map[string]*corev1.ServicePort + defaultsApplier Defaulter[any] } func (m *MultiPortReceiver) Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) { @@ -78,11 +79,12 @@ func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interfac if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil { return nil, err } - tmp := make(map[string]*SingleEndpointConfig, len(multiProtoEndpointCfg.Protocols)) - for protocol, ec := range multiProtoEndpointCfg.Protocols { + + defaulter := func(protocol string) (string, int32) { var port int32 if defaultSvc, ok := m.portMappings[protocol]; ok { port = defaultSvc.Port + ec := multiProtoEndpointCfg.Protocols[protocol] if ec != nil { port = ec.GetPortNumOrDefault(logger, port) } @@ -91,19 +93,12 @@ func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interfac if defaultAddr, ok := m.addrMappings[protocol]; ok { addr = defaultAddr } - res, err := AddressDefaulter(logger, addr, port, ec) - if err != nil { - return nil, err - } - tmp[protocol] = res - } - - for protocol, ec := range tmp { - multiProtoEndpointCfg.Protocols[protocol] = ec + return addr, port } - return config, mapstructure.Decode(multiProtoEndpointCfg, &config) + return m.defaultsApplier(logger, defaulter, multiProtoEndpointCfg) } + func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { return nil, nil } @@ -134,16 +129,20 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err if len(mp) < 1 { return nil, fmt.Errorf("must provide at least one port mapping") } + + mp0Defaulter := mp[0].MustBuild().defaultsApplier multiReceiver := &MultiPortReceiver{ - name: mp[0].MustBuild().name, - addrMappings: map[string]string{}, - portMappings: map[string]*corev1.ServicePort{}, + name: mp[0].MustBuild().name, + defaultsApplier: createMultiAddressDefaulter(mp0Defaulter), + addrMappings: map[string]string{}, + portMappings: map[string]*corev1.ServicePort{}, } for _, bu := range mp[1:] { built, err := bu.Build() if err != nil { return nil, err } + multiReceiver.defaultsApplier = createMultiAddressDefaulter(built.defaultsApplier) multiReceiver.portMappings[built.name] = built.settings.GetServicePort() if built.settings != nil { multiReceiver.addrMappings[built.name] = built.settings.defaultRecAddr @@ -160,13 +159,41 @@ func (mp MultiPortBuilder[ComponentConfigType]) MustBuild() *MultiPortReceiver { } } -func MultiAddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *MultiProtocolEndpointConfig) (*MultiProtocolEndpointConfig, error) { +func createMultiAddressDefaulter[ComponentConfigType any](defaultsApplier Defaulter[ComponentConfigType]) Defaulter[any] { + return func(logger logr.Logger, addrProv AddressProvider, config any) (map[string]interface{}, error) { + tc, ok := config.(ComponentConfigType) + if !ok { + return nil, fmt.Errorf("invalid config type, expected ComponentConfigType") + } + + result, err := defaultsApplier(logger, addrProv, tc) + if err != nil { + return nil, err + } + + return result, nil + } +} + +func MultiAddressDefaulter(logger logr.Logger, addrProv AddressProvider, config *MultiProtocolEndpointConfig) (map[string]interface{}, error) { + root := make(map[string]interface{}) + if err := mapstructure.Decode(config, &root); err != nil { + return nil, err + } + + proto, ok := root["protocols"].(map[string]interface{}) + if !ok { + proto = make(map[string]interface{}) + root["protocols"] = proto + } + for protocol, ec := range config.Protocols { - res, err := AddressDefaulter(logger, defaultRecAddr, port, ec) + res, err := AddressDefaulter(logger, func(string) (string, int32) { return addrProv(protocol) }, ec) if err != nil { return nil, err } - config.Protocols[protocol].Endpoint = res.Endpoint + proto[protocol] = res } - return config, nil + + return root, nil } diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go index 4a6c8d41a2..5392938809 100644 --- a/internal/components/single_endpoint.go +++ b/internal/components/single_endpoint.go @@ -19,6 +19,7 @@ import ( "strings" "github.com/go-logr/logr" + "github.com/mitchellh/mapstructure" corev1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-operator/internal/naming" @@ -90,22 +91,21 @@ func NewSilentSinglePortParserBuilder(name string, port int32) Builder[*SingleEn return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent).WithDefaultsApplier(AddressDefaulter) } -func AddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *SingleEndpointConfig) (*SingleEndpointConfig, error) { +func AddressDefaulter(logger logr.Logger, addrProv AddressProvider, config *SingleEndpointConfig) (map[string]interface{}, error) { if config == nil { config = &SingleEndpointConfig{} } + defaultRecAddr, port := addrProv("") + if config.Endpoint == "" { config.Endpoint = fmt.Sprintf("%s:%d", defaultRecAddr, port) - return config, nil - } - - v := strings.Split(config.Endpoint, ":") - if len(v) < 2 { - return config, nil + } else { + v := strings.Split(config.Endpoint, ":") + if len(v) < 2 || v[0] == "" { + config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[len(v)-1]) + } } - if v[0] == "" { - config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[1]) - } - return config, nil + res := make(map[string]interface{}) + return res, mapstructure.Decode(config, &res) } From ff6c10505463e247a8ff727744287b3705e2a900 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 7 Oct 2024 10:41:43 -0400 Subject: [PATCH 4/5] simplify some logic, add tests --- apis/v1beta1/collector_webhook_test.go | 33 +++++ apis/v1beta1/config.go | 12 +- internal/components/component.go | 3 +- internal/components/generic_parser.go | 2 +- internal/components/generic_parser_test.go | 93 ++++++++++++ internal/components/multi_endpoint.go | 86 ++++-------- internal/components/multi_endpoint_test.go | 156 +++++++++++++++++++++ internal/components/single_endpoint.go | 6 +- 8 files changed, 322 insertions(+), 69 deletions(-) diff --git a/apis/v1beta1/collector_webhook_test.go b/apis/v1beta1/collector_webhook_test.go index 588a6cd3b8..9407c0f112 100644 --- a/apis/v1beta1/collector_webhook_test.go +++ b/apis/v1beta1/collector_webhook_test.go @@ -177,6 +177,39 @@ func TestCollectorDefaultingWebhook(t *testing.T) { }, }, }, + { + name: "update config defaults, leave other fields alone", + otelcol: v1beta1.OpenTelemetryCollector{ + Spec: v1beta1.OpenTelemetryCollectorSpec{ + Config: func() v1beta1.Config { + const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}` + var cfg v1beta1.Config + require.NoError(t, yaml.Unmarshal([]byte(input), &cfg)) + return cfg + }(), + }, + }, + expected: v1beta1.OpenTelemetryCollector{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{}, + }, + Spec: v1beta1.OpenTelemetryCollectorSpec{ + OpenTelemetryCommonFields: v1beta1.OpenTelemetryCommonFields{ + Args: map[string]string{"feature-gates": "-component.UseLocalHostAsDefaultHost"}, + ManagementState: v1beta1.ManagementStateManaged, + Replicas: &one, + }, + Mode: v1beta1.ModeDeployment, + UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic, + Config: func() v1beta1.Config { + const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317","headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}` + var cfg v1beta1.Config + require.NoError(t, yaml.Unmarshal([]byte(input), &cfg)) + return cfg + }(), + }, + }, + }, { name: "all fields default", otelcol: v1beta1.OpenTelemetryCollector{}, diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 494fb5dc99..96383ed09a 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -24,6 +24,7 @@ import ( "strconv" "strings" + "dario.cat/mergo" "github.com/go-logr/logr" "gopkg.in/yaml.v3" corev1 "k8s.io/api/core/v1" @@ -244,11 +245,18 @@ func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKind } for componentName := range enabledComponents[componentKind] { parser := retriever(componentName) - newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]) + componentConf := cfg.Object[componentName] + newCfg, err := parser.GetDefaultConfig(logger, componentConf) if err != nil { return err } - cfg.Object[componentName] = newCfg + // We need to ensure we don't remove any fields in defaulting. + mappedCfg := newCfg.(map[string]interface{}) + err = mergo.Merge(&mappedCfg, componentConf) + if err != nil { + return err + } + cfg.Object[componentName] = mappedCfg } } diff --git a/internal/components/component.go b/internal/components/component.go index 3b5990c0cd..f713b7636c 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -53,7 +53,7 @@ type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config Com // Defaulter is a function that applies given defaults to the passed Config. // It's expected that type Config is the configuration used by a parser. -type Defaulter[ComponentConfigType any] func(logger logr.Logger, addrProv AddressProvider, config ComponentConfigType) (map[string]interface{}, error) +type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultAddr string, defaultPort int32, config ComponentConfigType) (map[string]interface{}, error) // ComponentType returns the type for a given component name. // components have a name like: @@ -94,6 +94,7 @@ type ParserRetriever func(string) Parser type Parser interface { // GetDefaultConfig returns a config with set default values. + // NOTE: Config merging must be done by the caller if desired. GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) // Ports returns the service ports parsed based on the component's configuration where name is the component's name diff --git a/internal/components/generic_parser.go b/internal/components/generic_parser.go index aff68c9c07..c27521a9ce 100644 --- a/internal/components/generic_parser.go +++ b/internal/components/generic_parser.go @@ -52,7 +52,7 @@ func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface if err := mapstructure.Decode(config, &parsed); err != nil { return nil, err } - return g.defaultsApplier(logger, func(string) (string, int32) { return g.settings.defaultRecAddr, g.settings.GetServicePort().Port }, parsed) + return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.GetServicePort().Port, parsed) } func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { diff --git a/internal/components/generic_parser_test.go b/internal/components/generic_parser_test.go index 6059313454..0c1a2a65b0 100644 --- a/internal/components/generic_parser_test.go +++ b/internal/components/generic_parser_test.go @@ -372,3 +372,96 @@ func TestGenericParser_GetProbe(t *testing.T) { }) } } + +func TestGenericParser_GetDefaultConfig(t *testing.T) { + type args struct { + logger logr.Logger + config interface{} + } + type testCase[T any] struct { + name string + g *components.GenericParser[T] + args args + want interface{} + wantErr assert.ErrorAssertionFunc + } + + tests := []testCase[*components.SingleEndpointConfig]{ + { + name: "no settings or defaultsApplier returns config", + g: &components.GenericParser[*components.SingleEndpointConfig]{}, + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "endpoint": "http://localhost:8080", + }, + }, + want: map[string]interface{}{ + "endpoint": "http://localhost:8080", + }, + wantErr: assert.NoError, + }, + { + name: "empty defaultRecAddr returns config", + g: components.NewSinglePortParserBuilder("test", 0).MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "endpoint": "http://localhost:8080", + }, + }, + want: map[string]interface{}{ + "endpoint": "http://localhost:8080", + }, + wantErr: assert.NoError, + }, + { + name: "valid settings with defaultsApplier", + g: components.NewSinglePortParserBuilder("test", 8080).WithDefaultRecAddress("127.0.0.1").WithDefaultsApplier(components.AddressDefaulter).MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "endpoint": nil, + }, + }, + want: map[string]interface{}{ + "endpoint": "127.0.0.1:8080", + }, + wantErr: assert.NoError, + }, + { + name: "valid settings with defaultsApplier doesnt override", + g: components.NewSinglePortParserBuilder("test", 8080).WithDefaultRecAddress("127.0.0.1").WithDefaultsApplier(components.AddressDefaulter).MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "endpoint": "127.0.0.1:9090", + }, + }, + want: map[string]interface{}{ + "endpoint": "127.0.0.1:9090", + }, + wantErr: assert.NoError, + }, + { + name: "invalid config fails to decode", + g: components.NewSinglePortParserBuilder("test", 8080).WithDefaultRecAddress("127.0.0.1").WithDefaultsApplier(components.AddressDefaulter).MustBuild(), + args: args{ + logger: logr.Discard(), + config: "invalid_config", + }, + want: nil, + wantErr: assert.Error, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.g.GetDefaultConfig(tt.args.logger, tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("GetDefaultConfig(%v, %v)", tt.args.logger, tt.args.config)) { + return + } + assert.Equalf(t, tt.want, got, "GetDefaultConfig(%v, %v)", tt.args.logger, tt.args.config) + }) + } +} diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go index c3503250f7..30162eeaee 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -40,9 +40,8 @@ type MultiPortOption func(parser *MultiPortReceiver) type MultiPortReceiver struct { name string - addrMappings map[string]string - portMappings map[string]*corev1.ServicePort - defaultsApplier Defaulter[any] + addrMappings map[string]string + portMappings map[string]*corev1.ServicePort } func (m *MultiPortReceiver) Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) { @@ -79,24 +78,29 @@ func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interfac if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil { return nil, err } - - defaulter := func(protocol string) (string, int32) { - var port int32 + defaultedConfig := map[string]interface{}{} + for protocol, ec := range multiProtoEndpointCfg.Protocols { if defaultSvc, ok := m.portMappings[protocol]; ok { - port = defaultSvc.Port - ec := multiProtoEndpointCfg.Protocols[protocol] + port := defaultSvc.Port if ec != nil { port = ec.GetPortNumOrDefault(logger, port) } + var addr string + if defaultAddr, ok := m.addrMappings[protocol]; ok { + addr = defaultAddr + } + conf, err := AddressDefaulter(logger, addr, port, ec) + if err != nil { + return nil, err + } + defaultedConfig[protocol] = conf + } else { + return nil, fmt.Errorf("unknown protocol set: %s", protocol) } - var addr string - if defaultAddr, ok := m.addrMappings[protocol]; ok { - addr = defaultAddr - } - return addr, port } - - return m.defaultsApplier(logger, defaulter, multiProtoEndpointCfg) + return map[string]interface{}{ + "protocols": defaultedConfig, + }, nil } func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { @@ -118,7 +122,7 @@ func NewMultiPortReceiverBuilder(name string) MultiPortBuilder[*MultiProtocolEnd } func NewProtocolBuilder(name string, port int32) Builder[*MultiProtocolEndpointConfig] { - return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port).WithDefaultsApplier(MultiAddressDefaulter) + return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port) } func (mp MultiPortBuilder[ComponentConfigType]) AddPortMapping(builder Builder[ComponentConfigType]) MultiPortBuilder[ComponentConfigType] { @@ -130,21 +134,18 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err return nil, fmt.Errorf("must provide at least one port mapping") } - mp0Defaulter := mp[0].MustBuild().defaultsApplier multiReceiver := &MultiPortReceiver{ - name: mp[0].MustBuild().name, - defaultsApplier: createMultiAddressDefaulter(mp0Defaulter), - addrMappings: map[string]string{}, - portMappings: map[string]*corev1.ServicePort{}, + name: mp[0].MustBuild().name, + addrMappings: map[string]string{}, + portMappings: map[string]*corev1.ServicePort{}, } for _, bu := range mp[1:] { built, err := bu.Build() if err != nil { return nil, err } - multiReceiver.defaultsApplier = createMultiAddressDefaulter(built.defaultsApplier) - multiReceiver.portMappings[built.name] = built.settings.GetServicePort() if built.settings != nil { + multiReceiver.portMappings[built.name] = built.settings.GetServicePort() multiReceiver.addrMappings[built.name] = built.settings.defaultRecAddr } } @@ -158,42 +159,3 @@ func (mp MultiPortBuilder[ComponentConfigType]) MustBuild() *MultiPortReceiver { return p } } - -func createMultiAddressDefaulter[ComponentConfigType any](defaultsApplier Defaulter[ComponentConfigType]) Defaulter[any] { - return func(logger logr.Logger, addrProv AddressProvider, config any) (map[string]interface{}, error) { - tc, ok := config.(ComponentConfigType) - if !ok { - return nil, fmt.Errorf("invalid config type, expected ComponentConfigType") - } - - result, err := defaultsApplier(logger, addrProv, tc) - if err != nil { - return nil, err - } - - return result, nil - } -} - -func MultiAddressDefaulter(logger logr.Logger, addrProv AddressProvider, config *MultiProtocolEndpointConfig) (map[string]interface{}, error) { - root := make(map[string]interface{}) - if err := mapstructure.Decode(config, &root); err != nil { - return nil, err - } - - proto, ok := root["protocols"].(map[string]interface{}) - if !ok { - proto = make(map[string]interface{}) - root["protocols"] = proto - } - - for protocol, ec := range config.Protocols { - res, err := AddressDefaulter(logger, func(string) (string, int32) { return addrProv(protocol) }, ec) - if err != nil { - return nil, err - } - proto[protocol] = res - } - - return root, nil -} diff --git a/internal/components/multi_endpoint_test.go b/internal/components/multi_endpoint_test.go index 83baf24d5c..645e2b394c 100644 --- a/internal/components/multi_endpoint_test.go +++ b/internal/components/multi_endpoint_test.go @@ -368,6 +368,162 @@ func TestMultiPortReceiver_Ports(t *testing.T) { } } +func TestMultiPortReceiver_GetDefaultConfig(t *testing.T) { + type args struct { + logger logr.Logger + config interface{} + } + type testCase struct { + name string + m components.Parser + args args + want interface{} + wantErr assert.ErrorAssertionFunc + } + + tests := []testCase{ + { + name: "default config with single protocol settings", + m: components.NewMultiPortReceiverBuilder("receiver1"). + AddPortMapping(components.NewProtocolBuilder("http", 80). + WithDefaultRecAddress("0.0.0.0"). + WithTargetPort(8080)).MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": nil, + }, + }, + }, + want: map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": map[string]interface{}{ + "endpoint": "0.0.0.0:80", + }, + }, + }, + wantErr: assert.NoError, + }, + { + name: "default config with multiple protocol settings", + m: components.NewMultiPortReceiverBuilder("receiver1"). + AddPortMapping(components.NewProtocolBuilder("http", 80). + WithDefaultRecAddress("0.0.0.0"). + WithTargetPort(8080)). + AddPortMapping(components.NewProtocolBuilder("grpc", 90). + WithDefaultRecAddress("0.0.0.0"). + WithTargetPort(8080)).MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": nil, + "grpc": nil, + }, + }, + }, + want: map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": map[string]interface{}{ + "endpoint": "0.0.0.0:80", + }, + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:90", + }, + }, + }, + wantErr: assert.NoError, + }, + { + name: "default config with multiple protocol settings does not override", + m: components.NewMultiPortReceiverBuilder("receiver1"). + AddPortMapping(components.NewProtocolBuilder("http", 80). + WithDefaultRecAddress("0.0.0.0"). + WithTargetPort(8080)). + AddPortMapping(components.NewProtocolBuilder("grpc", 90). + WithDefaultRecAddress("0.0.0.0"). + WithTargetPort(8080)).MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": map[string]interface{}{ + "endpoint": "0.0.0.0:8080", + }, + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:9090", + }, + }, + }, + }, + want: map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": map[string]interface{}{ + "endpoint": "0.0.0.0:8080", + }, + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:9090", + }, + }, + }, + wantErr: assert.NoError, + }, + { + name: "config with unknown protocol", + m: components.NewMultiPortReceiverBuilder("receiver1").MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "unknown": map[string]interface{}{ + "endpoint": "http://localhost", + }, + }, + }, + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "config with missing default service port", + m: components.NewMultiPortReceiverBuilder("receiver1").MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": map[string]interface{}{ + "listen_address": "0.0.0.0:8080", + }, + }, + }, + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "invalid config fails to decode", + m: components.NewMultiPortReceiverBuilder("receiver1").MustBuild(), + args: args{ + logger: logr.Discard(), + config: "invalid_config", + }, + want: nil, + wantErr: assert.Error, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.m.GetDefaultConfig(tt.args.logger, tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("GetDefaultConfig(%v, %v)", tt.args.logger, tt.args.config)) { + return + } + assert.Equalf(t, tt.want, got, "GetDefaultConfig(%v, %v)", tt.args.logger, tt.args.config) + }) + } +} + func TestMultiMustBuildPanics(t *testing.T) { b := components.MultiPortBuilder[*components.MultiProtocolEndpointConfig]{} assert.Panics(t, func() { diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go index 5392938809..51a6124b61 100644 --- a/internal/components/single_endpoint.go +++ b/internal/components/single_endpoint.go @@ -91,11 +91,10 @@ func NewSilentSinglePortParserBuilder(name string, port int32) Builder[*SingleEn return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent).WithDefaultsApplier(AddressDefaulter) } -func AddressDefaulter(logger logr.Logger, addrProv AddressProvider, config *SingleEndpointConfig) (map[string]interface{}, error) { +func AddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *SingleEndpointConfig) (map[string]interface{}, error) { if config == nil { config = &SingleEndpointConfig{} } - defaultRecAddr, port := addrProv("") if config.Endpoint == "" { config.Endpoint = fmt.Sprintf("%s:%d", defaultRecAddr, port) @@ -107,5 +106,6 @@ func AddressDefaulter(logger logr.Logger, addrProv AddressProvider, config *Sing } res := make(map[string]interface{}) - return res, mapstructure.Decode(config, &res) + err := mapstructure.Decode(config, &res) + return res, err } From 54d7c7cc1b28322aea66ee0d3b5978c464f3d5b9 Mon Sep 17 00:00:00 2001 From: Benedikt Bongartz Date: Mon, 7 Oct 2024 17:46:57 +0200 Subject: [PATCH 5/5] config merge: handle nil values Signed-off-by: Benedikt Bongartz --- apis/v1beta1/config.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 96383ed09a..2d88c7617e 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -250,10 +250,18 @@ func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKind if err != nil { return err } + // We need to ensure we don't remove any fields in defaulting. - mappedCfg := newCfg.(map[string]interface{}) - err = mergo.Merge(&mappedCfg, componentConf) - if err != nil { + mappedCfg, ok := newCfg.(map[string]interface{}) + if !ok || mappedCfg == nil { + logger.V(1).Info("returned default configuration invalid", + "warn", "could not apply component defaults", + "component", componentName, + ) + continue + } + + if err := mergo.Merge(&mappedCfg, componentConf); err != nil { return err } cfg.Object[componentName] = mappedCfg