Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Part two of config marshalling changes #2857

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/2603-part-two.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: collector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Changes the internal behavior of port generation to use the new config structs

# One or more tracking issues related to the change
issues: [2603]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/go-kit/log v0.2.1
github.com/go-logr/logr v1.4.1
github.com/json-iterator/go v1.1.12
github.com/mitchellh/mapstructure v1.5.0
github.com/oklog/run v1.1.0
github.com/oklog/ulid/v2 v2.1.0
github.com/open-telemetry/opamp-go v0.14.0
Expand Down Expand Up @@ -161,6 +160,7 @@ require (
github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a // indirect
github.com/miekg/dns v1.1.58 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down
86 changes: 23 additions & 63 deletions internal/manifests/collector/adapters/config_to_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
package adapters

import (
"fmt"
"sort"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
exporterParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/exporter"
receiverParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver"
Expand All @@ -38,77 +38,37 @@ func (c ComponentType) String() string {
return [...]string{"receiver", "exporter", "processor"}[c]
}

// ConfigToComponentPorts converts the incoming configuration object into a set of service ports required by the exporters.
func ConfigToComponentPorts(logger logr.Logger, cType ComponentType, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// now, we gather which ports we might need to open
// for that, we get all the exporters and check their `endpoint` properties,
// extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern:
// ${instance.Name}-${exporter.name}-${exporter.qualifier}
// the exporter-name is typically the node name from the exporters map
// the exporter-qualifier is what comes after the slash in the exporter name, but typically nil
// examples:
// ```yaml
// components:
// componentexample:
// endpoint: 0.0.0.0:12345
// componentexample/settings:
// endpoint: 0.0.0.0:12346
// in this case, we have 2 ports, named: "componentexample" and "componentexample-settings"
componentsProperty, ok := config[fmt.Sprintf("%ss", cType.String())]
if !ok {
return nil, fmt.Errorf("no %ss available as part of the configuration", cType)
}

components, ok := componentsProperty.(map[interface{}]interface{})
if !ok {
return nil, fmt.Errorf("%ss doesn't contain valid components", cType.String())
}

compEnabled := getEnabledComponents(config, cType)
func PortsForExporters(l logr.Logger, c v1beta1.Config) ([]corev1.ServicePort, error) {
compEnabled := getEnabledComponents(c.Service, ComponentTypeExporter)
return componentPorts(l, c.Exporters, exporterParser.For, compEnabled)
}

if compEnabled == nil {
return nil, fmt.Errorf("no enabled %ss available as part of the configuration", cType)
}
func PortsForReceivers(l logr.Logger, c v1beta1.Config) ([]corev1.ServicePort, error) {
compEnabled := getEnabledComponents(c.Service, ComponentTypeReceiver)
return componentPorts(l, c.Receivers, receiverParser.For, compEnabled)
}

ports := []corev1.ServicePort{}
for key, val := range components {
// This check will pass only the enabled components,
// then only the related ports will be opened.
if !compEnabled[key] {
func componentPorts(l logr.Logger, c v1beta1.AnyConfig, p parser.For, enabledComponents map[string]bool) ([]corev1.ServicePort, error) {
var ports []corev1.ServicePort
for cmptName, val := range c.Object {
if !enabledComponents[cmptName] {
continue
}
exporter, ok := val.(map[interface{}]interface{})
component, ok := val.(map[string]interface{})
if !ok {
logger.V(2).Info("component doesn't seem to be a map of properties", cType.String(), key)
exporter = map[interface{}]interface{}{}
component = map[string]interface{}{}
}

cmptName := key.(string)
var cmptParser parser.ComponentPortParser
var err error
switch cType {
case ComponentTypeExporter:
cmptParser, err = exporterParser.For(logger, cmptName, exporter)
case ComponentTypeReceiver:
cmptParser, err = receiverParser.For(logger, cmptName, exporter)
case ComponentTypeProcessor:
logger.V(4).Info("processors don't provide a way to enable associated ports", "name", key)
}

componentParser, err := p(l, cmptName, component)
if err != nil {
logger.V(2).Info("no parser found for", "component", cmptName)
l.Error(err, "failed to retrieve parser for '%s', has returned an error: %w", cmptName, err)
continue
}

exprtPorts, err := cmptParser.Ports()
componentPorts, err := componentParser.Ports()
if err != nil {
logger.Error(err, "parser for '%s' has returned an error: %w", cmptName, err)
l.Error(err, "parser for '%s' has returned an error: %w", cmptName, err)
continue
}

if len(exprtPorts) > 0 {
ports = append(ports, exprtPorts...)
}
ports = append(ports, componentPorts...)
}

sort.Slice(ports, func(i, j int) bool {
Expand All @@ -118,14 +78,14 @@ func ConfigToComponentPorts(logger logr.Logger, cType ComponentType, config map[
return ports, nil
}

func ConfigToPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
ports, err := ConfigToComponentPorts(logger, ComponentTypeReceiver, config)
func ConfigToPorts(logger logr.Logger, config v1beta1.Config) ([]corev1.ServicePort, error) {
ports, err := PortsForReceivers(logger, config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the receivers")
return nil, err
}

exporterPorts, err := ConfigToComponentPorts(logger, ComponentTypeExporter, config)
exporterPorts, err := PortsForExporters(logger, config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the exporters")
return nil, err
Expand Down
74 changes: 24 additions & 50 deletions internal/manifests/collector/adapters/config_to_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -79,12 +79,13 @@ service:

func TestExtractPortsFromConfig(t *testing.T) {
// prepare
config, err := adapters.ConfigFromString(portConfigStr)
require.NoError(t, err)
require.NotEmpty(t, config)
cfg := v1beta1.Config{}
if err := yaml.Unmarshal([]byte(portConfigStr), &cfg); err != nil {
t.Fatal(err)
}

// test
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)
ports, err := adapters.PortsForReceivers(logger, cfg)
assert.NoError(t, err)
assert.Len(t, ports, 10)

Expand All @@ -110,38 +111,6 @@ func TestExtractPortsFromConfig(t *testing.T) {
assert.ElementsMatch(t, expectedPorts, ports)
}

func TestNoPortsParsed(t *testing.T) {
for _, tt := range []struct {
expected error
desc string
configStr string
}{
{
expected: errors.New("no receivers available as part of the configuration"),
desc: "empty",
configStr: "",
},
{
expected: errors.New("receivers doesn't contain valid components"),
desc: "not a map",
configStr: "receivers: some-string",
},
} {
t.Run(tt.desc, func(t *testing.T) {
// prepare
config, err := adapters.ConfigFromString(tt.configStr)
require.NoError(t, err)

// test
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)

// verify
assert.Nil(t, ports)
assert.Equal(t, tt.expected, err)
})
}
}

func TestInvalidReceivers(t *testing.T) {
for _, tt := range []struct {
desc string
Expand All @@ -158,11 +127,12 @@ func TestInvalidReceivers(t *testing.T) {
} {
t.Run(tt.desc, func(t *testing.T) {
// prepare
config, err := adapters.ConfigFromString(tt.configStr)
require.NoError(t, err)

cfg := v1beta1.Config{}
if err := yaml.Unmarshal([]byte(tt.configStr), &cfg); err != nil {
t.Fatal(err)
}
// test
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)
ports, err := adapters.PortsForReceivers(logger, cfg)

// verify
assert.NoError(t, err)
Expand All @@ -180,25 +150,29 @@ func TestParserFailed(t *testing.T) {
return nil, errors.New("mocked error")
},
}
receiver.Register("mock", func(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
receiver.Register("mock", func(logger logr.Logger, name string, config map[string]interface{}) parser.ComponentPortParser {
return mockParser
})

config := map[interface{}]interface{}{
"receivers": map[interface{}]interface{}{
"mock": map[string]interface{}{},
cfg := v1beta1.Config{
Receivers: v1beta1.AnyConfig{
Object: map[string]interface{}{
"mock": map[string]interface{}{},
},
},
"service": map[interface{}]interface{}{
"pipelines": map[interface{}]interface{}{
"metrics": map[interface{}]interface{}{
"receivers": []interface{}{"mock"},
Service: v1beta1.Service{
Pipelines: v1beta1.AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"receivers": []interface{}{"mock"},
},
},
},
},
}

// test
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)
ports, err := adapters.PortsForReceivers(logger, cfg)

// verify
assert.Len(t, ports, 0)
Expand Down
27 changes: 9 additions & 18 deletions internal/manifests/collector/adapters/config_to_rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,29 @@ import (
"github.com/go-logr/logr"
rbacv1 "k8s.io/api/rbac/v1"

"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/processor"
)

// ConfigToRBAC parses the OpenTelemetry Collector configuration and checks what RBAC resources are needed to be created.
func ConfigToRBAC(logger logr.Logger, config map[interface{}]interface{}) []rbacv1.PolicyRule {
func ConfigToRBAC(logger logr.Logger, config v1beta1.Config) []rbacv1.PolicyRule {
var policyRules []rbacv1.PolicyRule
processorsRaw, ok := config["processors"]
if !ok {
logger.V(2).Info("no processors available as part of the configuration")
if config.Processors == nil {
return policyRules
}
enabledProcessors := getEnabledComponents(config.Service, ComponentTypeProcessor)

processors, ok := processorsRaw.(map[interface{}]interface{})
if !ok {
logger.V(2).Info("processors doesn't contain valid components")
return policyRules
}

enabledProcessors := getEnabledComponents(config, ComponentTypeProcessor)

for key, val := range processors {
if !enabledProcessors[key] {
for processorName, val := range config.Processors.Object {
if !enabledProcessors[processorName] {
continue
}

processorCfg, ok := val.(map[interface{}]interface{})
processorCfg, ok := val.(map[string]interface{})
if !ok {
logger.V(2).Info("processor doesn't seem to be a map of properties", "processor", key)
processorCfg = map[interface{}]interface{}{}
logger.V(2).Info("processor doesn't seem to be a map of properties", "processor", processorName)
processorCfg = map[string]interface{}{}
}

processorName := key.(string)
processorParser, err := processor.For(logger, processorName, processorCfg)
if err != nil {
logger.V(2).Info("no parser found for", "processor", processorName)
Expand Down
11 changes: 7 additions & 4 deletions internal/manifests/collector/adapters/config_to_rbac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
rbacv1 "k8s.io/api/rbac/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
)

func TestConfigRBAC(t *testing.T) {
Expand Down Expand Up @@ -87,12 +90,12 @@ service:

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
config, err := ConfigFromString(tt.config)
require.NoError(t, err, tt.desc)
require.NotEmpty(t, config, tt.desc)
cfg := v1beta1.Config{}
err := yaml.Unmarshal([]byte(tt.config), &cfg)
require.NoError(t, err)

// test
rules := ConfigToRBAC(logger, config)
rules := ConfigToRBAC(logger, cfg)
assert.NoError(t, err)
assert.Equal(t, tt.expectedRules, rules, tt.desc)
})
Expand Down
Loading
Loading