Skip to content

Commit 4723c17

Browse files
authored
Introduce simplified parsers (#2972)
1 parent 977a759 commit 4723c17

11 files changed

+1817
-0
lines changed

internal/components/component.go

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package components
16+
17+
import (
18+
"errors"
19+
"regexp"
20+
"strconv"
21+
"strings"
22+
23+
"github.com/go-logr/logr"
24+
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/util/intstr"
26+
)
27+
28+
var (
29+
GrpcProtocol = "grpc"
30+
HttpProtocol = "http"
31+
UnsetPort int32 = 0
32+
PortNotFoundErr = errors.New("port should not be empty")
33+
)
34+
35+
type PortRetriever interface {
36+
GetPortNum() (int32, error)
37+
GetPortNumOrDefault(logr.Logger, int32) int32
38+
}
39+
40+
type PortBuilderOption func(*corev1.ServicePort)
41+
42+
func WithTargetPort(targetPort int32) PortBuilderOption {
43+
return func(servicePort *corev1.ServicePort) {
44+
servicePort.TargetPort = intstr.FromInt32(targetPort)
45+
}
46+
}
47+
48+
func WithAppProtocol(proto *string) PortBuilderOption {
49+
return func(servicePort *corev1.ServicePort) {
50+
servicePort.AppProtocol = proto
51+
}
52+
}
53+
54+
func WithProtocol(proto corev1.Protocol) PortBuilderOption {
55+
return func(servicePort *corev1.ServicePort) {
56+
servicePort.Protocol = proto
57+
}
58+
}
59+
60+
// ComponentType returns the type for a given component name.
61+
// components have a name like:
62+
// - mycomponent/custom
63+
// - mycomponent
64+
// we extract the "mycomponent" part and see if we have a parser for the component.
65+
func ComponentType(name string) string {
66+
if strings.Contains(name, "/") {
67+
return name[:strings.Index(name, "/")]
68+
}
69+
return name
70+
}
71+
72+
func PortFromEndpoint(endpoint string) (int32, error) {
73+
var err error
74+
var port int64
75+
76+
r := regexp.MustCompile(":[0-9]+")
77+
78+
if r.MatchString(endpoint) {
79+
portStr := r.FindString(endpoint)
80+
cleanedPortStr := strings.Replace(portStr, ":", "", -1)
81+
port, err = strconv.ParseInt(cleanedPortStr, 10, 32)
82+
83+
if err != nil {
84+
return 0, err
85+
}
86+
}
87+
88+
if port == 0 {
89+
return 0, PortNotFoundErr
90+
}
91+
92+
return int32(port), err
93+
}
94+
95+
type ComponentPortParser interface {
96+
// Ports returns the service ports parsed based on the exporter's configuration
97+
Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error)
98+
99+
// ParserType returns the name of this parser
100+
ParserType() string
101+
102+
// ParserName is an internal name for the parser
103+
ParserName() string
104+
}
105+
106+
func ConstructServicePort(current *corev1.ServicePort, port int32) corev1.ServicePort {
107+
return corev1.ServicePort{
108+
Name: current.Name,
109+
Port: port,
110+
TargetPort: current.TargetPort,
111+
NodePort: current.NodePort,
112+
AppProtocol: current.AppProtocol,
113+
Protocol: current.Protocol,
114+
}
115+
}

internal/components/component_test.go

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package components_test
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
22+
"github.com/open-telemetry/opentelemetry-operator/internal/components"
23+
)
24+
25+
func TestComponentType(t *testing.T) {
26+
for _, tt := range []struct {
27+
desc string
28+
name string
29+
expected string
30+
}{
31+
{"regular case", "myreceiver", "myreceiver"},
32+
{"named instance", "myreceiver/custom", "myreceiver"},
33+
} {
34+
t.Run(tt.desc, func(t *testing.T) {
35+
// test and verify
36+
assert.Equal(t, tt.expected, components.ComponentType(tt.name))
37+
})
38+
}
39+
}
40+
41+
func TestReceiverParsePortFromEndpoint(t *testing.T) {
42+
for _, tt := range []struct {
43+
desc string
44+
endpoint string
45+
expected int
46+
errorExpected bool
47+
}{
48+
{"regular case", "http://localhost:1234", 1234, false},
49+
{"absolute with path", "http://localhost:1234/server-status?auto", 1234, false},
50+
{"no protocol", "0.0.0.0:1234", 1234, false},
51+
{"just port", ":1234", 1234, false},
52+
{"no port at all", "http://localhost", 0, true},
53+
{"overflow", "0.0.0.0:2147483648", 0, true},
54+
} {
55+
t.Run(tt.desc, func(t *testing.T) {
56+
// test
57+
val, err := components.PortFromEndpoint(tt.endpoint)
58+
if tt.errorExpected {
59+
assert.Error(t, err)
60+
} else {
61+
assert.NoError(t, err)
62+
}
63+
64+
assert.EqualValues(t, tt.expected, val, "wrong port from endpoint %s: %d", tt.endpoint, val)
65+
})
66+
}
67+
}

internal/components/multi_endpoint.go

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package components
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/go-logr/logr"
21+
"github.com/mitchellh/mapstructure"
22+
corev1 "k8s.io/api/core/v1"
23+
24+
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
25+
)
26+
27+
var _ ComponentPortParser = &MultiPortReceiver{}
28+
29+
// MultiProtocolEndpointConfig represents the minimal struct for a given YAML configuration input containing a map to
30+
// a struct with either endpoint or listen_address.
31+
type MultiProtocolEndpointConfig struct {
32+
Protocols map[string]*SingleEndpointConfig `mapstructure:"protocols"`
33+
}
34+
35+
// MultiPortOption allows the setting of options for a MultiPortReceiver.
36+
type MultiPortOption func(parser *MultiPortReceiver)
37+
38+
// MultiPortReceiver is a special parser for components with endpoints for each protocol.
39+
type MultiPortReceiver struct {
40+
name string
41+
42+
portMappings map[string]*corev1.ServicePort
43+
}
44+
45+
func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) {
46+
multiProtoEndpointCfg := &MultiProtocolEndpointConfig{}
47+
if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil {
48+
return nil, err
49+
}
50+
var ports []corev1.ServicePort
51+
for protocol, ec := range multiProtoEndpointCfg.Protocols {
52+
if defaultSvc, ok := m.portMappings[protocol]; ok {
53+
port := defaultSvc.Port
54+
if ec != nil {
55+
port = ec.GetPortNumOrDefault(logger, port)
56+
defaultSvc.Name = naming.PortName(fmt.Sprintf("%s-%s", m.name, protocol), port)
57+
}
58+
ports = append(ports, ConstructServicePort(defaultSvc, port))
59+
} else {
60+
return nil, fmt.Errorf("unknown protocol set: %s", protocol)
61+
}
62+
}
63+
return ports, nil
64+
}
65+
66+
func (m *MultiPortReceiver) ParserType() string {
67+
return ComponentType(m.name)
68+
}
69+
70+
func (m *MultiPortReceiver) ParserName() string {
71+
return fmt.Sprintf("__%s", m.name)
72+
}
73+
74+
func NewMultiPortReceiver(name string, opts ...MultiPortOption) *MultiPortReceiver {
75+
multiReceiver := &MultiPortReceiver{
76+
name: name,
77+
portMappings: map[string]*corev1.ServicePort{},
78+
}
79+
for _, opt := range opts {
80+
opt(multiReceiver)
81+
}
82+
return multiReceiver
83+
}
84+
85+
func WithPortMapping(name string, port int32, opts ...PortBuilderOption) MultiPortOption {
86+
return func(parser *MultiPortReceiver) {
87+
servicePort := &corev1.ServicePort{
88+
Name: naming.PortName(fmt.Sprintf("%s-%s", parser.name, name), port),
89+
Port: port,
90+
}
91+
for _, opt := range opts {
92+
opt(servicePort)
93+
}
94+
parser.portMappings[name] = servicePort
95+
}
96+
}

0 commit comments

Comments
 (0)