Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary config marshalling wherever possible #2735

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Changes by Version
- `operator`: Automate the creation of the permissions needed by the resourcedetection processor (#2393)
- `operator`: Automate the creation of the permissions needed by the k8sattributes processor (#2395)
- `target allocator`: Change default allocation and filtering strategy (#2477)
- `operator`: Introduce common fields to the v1alpha2 types that can be reused for any CRDs. (#901)
- `operator`: Introduce common fields to the v1beta1 types that can be reused for any CRDs. (#901)
- `target allocator`: Use Pod securityContext for target allocator (#2495)
Bring back PodSecurityContext as it was removed in the previous changes.
- `bridge`: Sets pods in the component health map (#2489)
Expand Down
1 change: 0 additions & 1 deletion apis/v1alpha1/collector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ func (c CollectorWebhook) validateTargetAllocatorConfig(ctx context.Context, r *
if r.Spec.TargetAllocator.AllocationStrategy == OpenTelemetryTargetAllocatorAllocationStrategyPerNode && r.Spec.Mode != ModeDaemonSet {
return nil, fmt.Errorf("target allocation strategy %s is only supported in OpenTelemetry Collector mode %s", OpenTelemetryTargetAllocatorAllocationStrategyPerNode, ModeDaemonSet)
}

// validate Prometheus config for target allocation
promCfg, err := ta.ConfigToPromConfig(r.Spec.Config)
if err != nil {
Expand Down
58 changes: 50 additions & 8 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AnyConfig) DeepCopyInto(out *AnyConfig) {
*out = *in
if in.Object != nil {
in, out := &in.Object, &out.Object
func (c *AnyConfig) DeepCopyInto(out *AnyConfig) {
*out = *c
if c.Object != nil {
in, out := &c.Object, &out.Object
*out = make(map[string]interface{}, len(*in))
for key, val := range *in {
(*out)[key] = val
Expand All @@ -42,12 +42,12 @@
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AnyConfig.
func (in *AnyConfig) DeepCopy() *AnyConfig {
if in == nil {
func (c *AnyConfig) DeepCopy() *AnyConfig {
if c == nil {
return nil
}
out := new(AnyConfig)
in.DeepCopyInto(out)
c.DeepCopyInto(out)
return out
}

Expand Down Expand Up @@ -98,14 +98,56 @@
return buf.String(), nil
}

// MetricsConfig comes from the collector

Check failure on line 101 in apis/v1beta1/config.go

View workflow job for this annotation

GitHub Actions / Code standards (linting)

Comment should end in a period (godot)
type MetricsConfig struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be introduced in a separate PR?

// Level is the level of telemetry metrics, the possible values are:
// - "none" indicates that no telemetry data should be collected;
// - "basic" is the recommended and covers the basics of the service telemetry.
// - "normal" adds some other indicators on top of basic.
// - "detailed" adds dimensions and views to the previous levels.
Level string `json:"level,omitempty" yaml:"level,omitempty"`

// Address is the [address]:port that metrics exposition should be bound to.
Address string `json:"address,omitempty" yaml:"address,omitempty"`
}

type Telemetry struct {
Metrics MetricsConfig `json:"metrics,omitempty" yaml:"metrics,omitempty"`

// Resource specifies user-defined attributes to include with all emitted telemetry.
// Note that some attributes are added automatically (e.g. service.version) even
// if they are not specified here. In order to suppress such attributes the
// attribute must be specified in this map with null YAML value (nil string pointer).
Resource map[string]*string `json:"resource,omitempty" yaml:"resource,omitempty"`
}

type Service struct {
Extensions *[]string `json:"extensions,omitempty" yaml:"extensions,omitempty"`
Extensions []string `json:"extensions,omitempty" yaml:"extensions,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC this will enforce marshalling of empty array for extensions. Is this necessary?

// +kubebuilder:pruning:PreserveUnknownFields
Telemetry *AnyConfig `json:"telemetry,omitempty" yaml:"telemetry,omitempty"`
// +kubebuilder:pruning:PreserveUnknownFields
Pipelines AnyConfig `json:"pipelines" yaml:"pipelines"`
}

// GetTelemetry serves as a helper function to access the fields we care about in the underlying telemetry struct.
// This exists to avoid needing to worry extra fields in the telemetry struct.
func (s *Service) GetTelemetry() *Telemetry {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add a test for this? Getting a telemetry from an existing yaml?

if s.Telemetry == nil {
return nil
}
// Convert map to JSON bytes
jsonData, err := json.Marshal(s.Telemetry)
if err != nil {
return nil
}
t := &Telemetry{}
// Unmarshal JSON into the provided struct
if err := json.Unmarshal(jsonData, t); err != nil {
return nil
}
return t
}

// Returns null objects in the config.
func (c Config) nullObjects() []string {
var nullKeys []string
Expand Down
9 changes: 6 additions & 3 deletions apis/v1beta1/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,12 @@ func TestConfigYaml(t *testing.T) {
},
},
Service: Service{
Extensions: &[]string{"addon"},
Extensions: []string{"addon"},
Telemetry: &AnyConfig{
Object: map[string]interface{}{
"insights": "yeah!",
"metrics": map[string]interface{}{
"address": "0.0.0.0:9000",
},
},
},
Pipelines: AnyConfig{
Expand Down Expand Up @@ -171,7 +173,8 @@ service:
extensions:
- addon
telemetry:
insights: yeah!
metrics:
address: 0.0.0.0:9000
pipelines:
exporters:
- otlp/exporter
Expand Down
54 changes: 48 additions & 6 deletions apis/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/go-kit/log v0.2.1
github.com/go-logr/logr v1.4.1
github.com/json-iterator/go v1.1.12
github.com/mitchellh/mapstructure v1.5.0
github.com/oklog/run v1.1.0
github.com/oklog/ulid/v2 v2.1.0
github.com/open-telemetry/opamp-go v0.12.0
Expand Down Expand Up @@ -161,6 +160,7 @@ require (
github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a // indirect
github.com/miekg/dns v1.1.58 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down
115 changes: 28 additions & 87 deletions internal/manifests/collector/adapters/config_to_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
package adapters

import (
"fmt"
"net"
"sort"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
exporterParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/exporter"
receiverParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver"
Expand All @@ -42,94 +41,53 @@ func (c ComponentType) String() string {
return [...]string{"receiver", "exporter", "processor"}[c]
}

// ConfigToComponentPorts converts the incoming configuration object into a set of service ports required by the exporters.
func ConfigToComponentPorts(logger logr.Logger, cType ComponentType, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// now, we gather which ports we might need to open
// for that, we get all the exporters and check their `endpoint` properties,
// extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern:
// ${instance.Name}-${exporter.name}-${exporter.qualifier}
// the exporter-name is typically the node name from the exporters map
// the exporter-qualifier is what comes after the slash in the exporter name, but typically nil
// examples:
// ```yaml
// components:
// componentexample:
// endpoint: 0.0.0.0:12345
// componentexample/settings:
// endpoint: 0.0.0.0:12346
// in this case, we have 2 ports, named: "componentexample" and "componentexample-settings"
componentsProperty, ok := config[fmt.Sprintf("%ss", cType.String())]
if !ok {
return nil, fmt.Errorf("no %ss available as part of the configuration", cType)
}

components, ok := componentsProperty.(map[interface{}]interface{})
if !ok {
return nil, fmt.Errorf("%ss doesn't contain valid components", cType.String())
}

compEnabled := getEnabledComponents(config, cType)
func PortsForExporters(l logr.Logger, c v1beta1.Config) ([]corev1.ServicePort, error) {
compEnabled := getEnabledComponents(c.Service, ComponentTypeExporter)
return componentPorts(l, c.Exporters, exporterParser.For, compEnabled)
}

if compEnabled == nil {
return nil, fmt.Errorf("no enabled %ss available as part of the configuration", cType)
}
func PortsForReceivers(l logr.Logger, c v1beta1.Config) ([]corev1.ServicePort, error) {
compEnabled := getEnabledComponents(c.Service, ComponentTypeReceiver)
return componentPorts(l, c.Receivers, receiverParser.For, compEnabled)
}

ports := []corev1.ServicePort{}
for key, val := range components {
// This check will pass only the enabled components,
// then only the related ports will be opened.
if !compEnabled[key] {
func componentPorts(l logr.Logger, c v1beta1.AnyConfig, p parser.For, enabledComponents map[string]bool) ([]corev1.ServicePort, error) {
var ports []corev1.ServicePort
for cmptName, val := range c.Object {
if !enabledComponents[cmptName] {
continue
}
exporter, ok := val.(map[interface{}]interface{})
component, ok := val.(map[string]interface{})
if !ok {
logger.V(2).Info("component doesn't seem to be a map of properties", cType.String(), key)
exporter = map[interface{}]interface{}{}
}

cmptName := key.(string)
var cmptParser parser.ComponentPortParser
var err error
switch cType {
case ComponentTypeExporter:
cmptParser, err = exporterParser.For(logger, cmptName, exporter)
case ComponentTypeReceiver:
cmptParser, err = receiverParser.For(logger, cmptName, exporter)
case ComponentTypeProcessor:
logger.V(4).Info("processors don't provide a way to enable associated ports", "name", key)
component = map[string]interface{}{}
}

componentParser, err := p(l, cmptName, component)
if err != nil {
logger.V(2).Info("no parser found for '%s'", cmptName)
l.V(2).Info("no parser found for '%s'", cmptName)
continue
}

exprtPorts, err := cmptParser.Ports()
componentPorts, err := componentParser.Ports()
if err != nil {
logger.Error(err, "parser for '%s' has returned an error: %w", cmptName, err)
l.Error(err, "parser for '%s' has returned an error: %w", cmptName, err)
continue
}

if len(exprtPorts) > 0 {
ports = append(ports, exprtPorts...)
}
ports = append(ports, componentPorts...)
}

sort.Slice(ports, func(i, j int) bool {
return ports[i].Name < ports[j].Name
})

return ports, nil
}

func ConfigToPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
ports, err := ConfigToComponentPorts(logger, ComponentTypeReceiver, config)
func ConfigToPorts(logger logr.Logger, config v1beta1.Config) ([]corev1.ServicePort, error) {
ports, err := PortsForReceivers(logger, config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the receivers")
return nil, err
}

exporterPorts, err := ConfigToComponentPorts(logger, ComponentTypeExporter, config)
exporterPorts, err := PortsForExporters(logger, config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the exporters")
return nil, err
Expand All @@ -145,29 +103,12 @@ func ConfigToPorts(logger logr.Logger, config map[interface{}]interface{}) ([]co
}

// ConfigToMetricsPort gets the port number for the metrics endpoint from the collector config if it has been set.
func ConfigToMetricsPort(logger logr.Logger, config map[interface{}]interface{}) (int32, error) {
// we don't need to unmarshal the whole config, just follow the keys down to
// the metrics address.
type metricsCfg struct {
Address string
}
type telemetryCfg struct {
Metrics metricsCfg
}
type serviceCfg struct {
Telemetry telemetryCfg
}
type cfg struct {
Service serviceCfg
}

var cOut cfg
err := mapstructure.Decode(config, &cOut)
if err != nil {
return 0, err
func ConfigToMetricsPort(config v1beta1.Service) (int32, error) {
if config.GetTelemetry() == nil {
// telemetry isn't set, use the default
return 8888, nil
}

_, port, netErr := net.SplitHostPort(cOut.Service.Telemetry.Metrics.Address)
_, port, netErr := net.SplitHostPort(config.GetTelemetry().Metrics.Address)
if netErr != nil && strings.Contains(netErr.Error(), "missing port in address") {
return 8888, nil
} else if netErr != nil {
Expand Down
Loading
Loading