Skip to content

Commit 2080aff

Browse files
committed
operator: expose receivers by default
Signed-off-by: Benedikt Bongartz <[email protected]>
1 parent 91759b0 commit 2080aff

File tree

8 files changed

+137
-25
lines changed

8 files changed

+137
-25
lines changed

apis/v1beta1/collector_webhook.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (c CollectorWebhook) Default(_ context.Context, obj runtime.Object) error {
102102
if len(otelcol.Spec.ManagementState) == 0 {
103103
otelcol.Spec.ManagementState = ManagementStateManaged
104104
}
105-
return nil
105+
return otelcol.Spec.Config.ApplyDefaults(c.logger)
106106
}
107107

108108
func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {

apis/v1beta1/config.go

+38
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,40 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ..
225225
return ports, nil
226226
}
227227

228+
// getPortsForComponentKinds gets the ports for the given ComponentKind(s).
229+
func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error {
230+
enabledComponents := c.GetEnabledComponents()
231+
for _, componentKind := range componentKinds {
232+
var retriever components.ParserRetriever
233+
var cfg AnyConfig
234+
switch componentKind {
235+
case KindReceiver:
236+
retriever = receivers.ReceiverFor
237+
cfg = c.Receivers
238+
case KindExporter:
239+
continue
240+
case KindProcessor:
241+
continue
242+
case KindExtension:
243+
continue
244+
}
245+
for componentName := range enabledComponents[componentKind] {
246+
parser := retriever(componentName)
247+
if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil {
248+
return err
249+
} else {
250+
cc, ok := newCfg.(map[string]interface{})
251+
if !ok {
252+
return fmt.Errorf("could not apply defaults to receiver: %s", componentName)
253+
}
254+
cfg.Object[componentName] = cc
255+
}
256+
}
257+
}
258+
259+
return nil
260+
}
261+
228262
func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
229263
return c.getPortsForComponentKinds(logger, KindReceiver)
230264
}
@@ -241,6 +275,10 @@ func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error
241275
return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor)
242276
}
243277

278+
func (c *Config) ApplyDefaults(logger logr.Logger) error {
279+
return c.applyDefaultForComponentKinds(logger, KindReceiver)
280+
}
281+
244282
// GetLivenessProbe gets the first enabled liveness probe. There should only ever be one extension enabled
245283
// that provides the hinting for the liveness probe.
246284
func (c *Config) GetLivenessProbe(logger logr.Logger) (*corev1.Probe, error) {

internal/components/builder.go

+30-16
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,18 @@ import (
2626
type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType])
2727

2828
type Settings[ComponentConfigType any] struct {
29-
protocol corev1.Protocol
30-
appProtocol *string
31-
targetPort intstr.IntOrString
32-
nodePort int32
33-
name string
34-
port int32
35-
portParser PortParser[ComponentConfigType]
36-
rbacGen RBACRuleGenerator[ComponentConfigType]
37-
livenessGen ProbeGenerator[ComponentConfigType]
38-
readinessGen ProbeGenerator[ComponentConfigType]
29+
protocol corev1.Protocol
30+
appProtocol *string
31+
targetPort intstr.IntOrString
32+
nodePort int32
33+
name string
34+
port int32
35+
defaultRecAddr string
36+
portParser PortParser[ComponentConfigType]
37+
rbacGen RBACRuleGenerator[ComponentConfigType]
38+
livenessGen ProbeGenerator[ComponentConfigType]
39+
readinessGen ProbeGenerator[ComponentConfigType]
40+
defaultsApplier Defaulter[ComponentConfigType]
3941
}
4042

4143
func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] {
@@ -75,6 +77,11 @@ func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Build
7577
o.appProtocol = appProtocol
7678
})
7779
}
80+
func (b Builder[ComponentConfigType]) WithDefaultRecAddress(defaultRecAddr string) Builder[ComponentConfigType] {
81+
return append(b, func(o *Settings[ComponentConfigType]) {
82+
o.defaultRecAddr = defaultRecAddr
83+
})
84+
}
7885
func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] {
7986
return append(b, func(o *Settings[ComponentConfigType]) {
8087
o.targetPort = intstr.FromInt32(targetPort)
@@ -118,19 +125,26 @@ func (b Builder[ComponentConfigType]) WithReadinessGen(readinessGen ProbeGenerat
118125
})
119126
}
120127

128+
func (b Builder[ComponentConfigType]) WithDefaultsApplier(defaultsApplier Defaulter[ComponentConfigType]) Builder[ComponentConfigType] {
129+
return append(b, func(o *Settings[ComponentConfigType]) {
130+
o.defaultsApplier = defaultsApplier
131+
})
132+
}
133+
121134
func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) {
122135
o := NewEmptySettings[ComponentConfigType]()
123136
o.Apply(b...)
124137
if len(o.name) == 0 {
125138
return nil, fmt.Errorf("invalid settings struct, no name specified")
126139
}
127140
return &GenericParser[ComponentConfigType]{
128-
name: o.name,
129-
portParser: o.portParser,
130-
rbacGen: o.rbacGen,
131-
livenessGen: o.livenessGen,
132-
readinessGen: o.readinessGen,
133-
settings: o,
141+
name: o.name,
142+
portParser: o.portParser,
143+
rbacGen: o.rbacGen,
144+
livenessGen: o.livenessGen,
145+
readinessGen: o.readinessGen,
146+
defaultsApplier: o.defaultsApplier,
147+
settings: o,
134148
}, nil
135149
}
136150

internal/components/component.go

+7
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config
4949
// It's expected that type Config is the configuration used by a parser.
5050
type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) (*corev1.Probe, error)
5151

52+
// Defaulter is a function that applies given defaults to the passed Config.
53+
// It's expected that type Config is the configuration used by a parser.
54+
type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, config ComponentConfigType) (ComponentConfigType, error)
55+
5256
// ComponentType returns the type for a given component name.
5357
// components have a name like:
5458
// - mycomponent/custom
@@ -87,6 +91,9 @@ func PortFromEndpoint(endpoint string) (int32, error) {
8791
type ParserRetriever func(string) Parser
8892

8993
type Parser interface {
94+
// GetDefaultConfig .. TODO
95+
GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error)
96+
9097
// Ports returns the service ports parsed based on the component's configuration where name is the component's name
9198
// of the form "name" or "type/name"
9299
Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error)

internal/components/generic_parser.go

+40-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package components
1616

1717
import (
1818
"fmt"
19+
"strings"
1920

2021
"github.com/go-logr/logr"
2122
"github.com/mitchellh/mapstructure"
@@ -27,15 +28,48 @@ var (
2728
_ Parser = &GenericParser[SingleEndpointConfig]{}
2829
)
2930

31+
// AddressDefaulter ...
32+
func AddressDefaulter(logger logr.Logger, defaultRecAddr string, config *SingleEndpointConfig) (*SingleEndpointConfig, error) {
33+
if config.Endpoint == "" {
34+
return config, nil
35+
}
36+
37+
v := strings.Split(config.Endpoint, ":")
38+
if len(v) < 1 {
39+
return config, nil
40+
}
41+
if v[0] == "" {
42+
config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[1])
43+
}
44+
return config, nil
45+
}
46+
3047
// GenericParser serves as scaffolding for custom parsing logic by isolating
3148
// functionality to idempotent functions.
3249
type GenericParser[T any] struct {
33-
name string
34-
settings *Settings[T]
35-
portParser PortParser[T]
36-
rbacGen RBACRuleGenerator[T]
37-
livenessGen ProbeGenerator[T]
38-
readinessGen ProbeGenerator[T]
50+
name string
51+
settings *Settings[T]
52+
portParser PortParser[T]
53+
rbacGen RBACRuleGenerator[T]
54+
livenessGen ProbeGenerator[T]
55+
readinessGen ProbeGenerator[T]
56+
defaultsApplier Defaulter[T]
57+
}
58+
59+
func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
60+
if g.settings == nil {
61+
return config, nil
62+
}
63+
64+
if g.settings.defaultRecAddr == "" {
65+
return config, nil
66+
}
67+
68+
var parsed T
69+
if err := mapstructure.Decode(config, &parsed); err != nil {
70+
return nil, err
71+
}
72+
return g.defaultsApplier(logger, g.settings.defaultRecAddr, parsed)
3973
}
4074

4175
func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {

internal/components/multi_endpoint.go

+17
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,23 @@ func (m *MultiPortReceiver) ParserName() string {
7272
return fmt.Sprintf("__%s", m.name)
7373
}
7474

75+
func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
76+
multiProtoEndpointCfg := &MultiProtocolEndpointConfig{}
77+
if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil {
78+
return nil, err
79+
}
80+
for protocol, ec := range multiProtoEndpointCfg.Protocols {
81+
// TODO: Should we take default receiver address from settngs?
82+
res, err := AddressDefaulter(logger, "0.0.0.0", ec)
83+
if err != nil {
84+
return nil, err
85+
}
86+
multiProtoEndpointCfg.Protocols[protocol].Endpoint = res.Endpoint
87+
}
88+
// Encode and return.
89+
return config, mapstructure.Decode(multiProtoEndpointCfg, config)
90+
91+
}
7592
func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
7693
return nil, nil
7794
}

internal/components/receivers/helpers.go

+2
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,11 @@ var (
5252
components.NewMultiPortReceiverBuilder("otlp").
5353
AddPortMapping(components.NewProtocolBuilder("grpc", 4317).
5454
WithAppProtocol(&components.GrpcProtocol).
55+
WithDefaultRecAddress("0.0.0.0").
5556
WithTargetPort(4317)).
5657
AddPortMapping(components.NewProtocolBuilder("http", 4318).
5758
WithAppProtocol(&components.HttpProtocol).
59+
WithDefaultRecAddress("0.0.0.0").
5860
WithTargetPort(4318)).
5961
MustBuild(),
6062
components.NewMultiPortReceiverBuilder("skywalking").

internal/components/single_endpoint.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ func internalParseSingleEndpoint(logger logr.Logger, name string, failSilently b
8080
}
8181

8282
func NewSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] {
83-
return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint)
83+
return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint).WithDefaultsApplier(AddressDefaulter)
8484
}
8585

8686
func NewSilentSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] {
87-
return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent)
87+
return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent).WithDefaultsApplier(AddressDefaulter)
8888
}

0 commit comments

Comments
 (0)