Skip to content

Commit 822a24f

Browse files
authored
Added protocol handling for ServicePort and fixed type/value naming inside generic receiver (#2619)
* Fix handling of type The name is not correctly split before it is checked. Split it first. Signed-off-by: Oldřich Jedlička <[email protected]> * Split parsing of Syslog, TCP Log and UDP Log parsers The generic implementation does not allow to specify different endpoints and endpoint-specific protocols, so split the implementation instead of trying to make the generic implementation a complex catch-all-party. Signed-off-by: Oldřich Jedlička <[email protected]> * Handle port numbers together with protocols It is legal to have the same protocol number and different protocol, so handle it accordingly. Also interpret empty protocol as TCP, like the K8s does. Signed-off-by: Oldřich Jedlička <[email protected]> * Changelog updates Signed-off-by: Oldřich Jedlička <[email protected]> --------- Signed-off-by: Oldřich Jedlička <[email protected]>
1 parent c10fe8a commit 822a24f

11 files changed

+581
-47
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: bug_fix
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
5+
component: operator
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: "Fixed handling of protocol in exposed ports."
9+
10+
# One or more tracking issues related to the change
11+
issues: [2619]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
Make distinction not only on the port number, but also on protocol. This fix allows to have multiple exposed
18+
ServicePorts with the same port number, but different protocols.
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: bug_fix
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
5+
component: operator
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: "Fixed handling of exposed port protocol in syslog, tcplog and udplog receivers."
9+
10+
# One or more tracking issues related to the change
11+
issues: [767,2619]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
Please note that the operator currently exposes just one port (tcp or udp) of syslog receiver due to the current
18+
receiver implementation (patches are welcome).

internal/manifests/collector/parser/receiver/receiver.go

+2-20
Original file line numberDiff line numberDiff line change
@@ -104,30 +104,12 @@ func isScraperReceiver(name string) bool {
104104

105105
func singlePortFromConfigEndpoint(logger logr.Logger, name string, config map[interface{}]interface{}) *v1.ServicePort {
106106
var endpoint interface{}
107+
var receiverType = receiverType(name)
107108
switch {
108-
// syslog receiver contains the endpoint
109-
// that needs to be exposed one level down inside config
110-
// i.e. either in tcp or udp section with field key
111-
// as `listen_address`
112-
case name == "syslog":
113-
var c map[interface{}]interface{}
114-
if udp, isUDP := config["udp"]; isUDP && udp != nil {
115-
c = udp.(map[interface{}]interface{})
116-
endpoint = getAddressFromConfig(logger, name, listenAddressKey, c)
117-
} else if tcp, isTCP := config["tcp"]; isTCP && tcp != nil {
118-
c = tcp.(map[interface{}]interface{})
119-
endpoint = getAddressFromConfig(logger, name, listenAddressKey, c)
120-
}
121-
122-
// tcplog and udplog receivers hold the endpoint
123-
// value in `listen_address` field
124-
case name == "tcplog" || name == "udplog":
125-
endpoint = getAddressFromConfig(logger, name, listenAddressKey, config)
126-
127109
// ignore the receiver as it holds the field key endpoint, and it
128110
// is a scraper, we only expose endpoint through k8s service objects for
129111
// receivers that aren't scrapers.
130-
case isScraperReceiver(name):
112+
case isScraperReceiver(receiverType):
131113
return nil
132114

133115
default:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiver
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/go-logr/logr"
21+
corev1 "k8s.io/api/core/v1"
22+
23+
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
24+
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
25+
)
26+
27+
var _ parser.ComponentPortParser = &SyslogReceiverParser{}
28+
29+
const parserNameSyslog = "__syslog"
30+
31+
// SyslogReceiverParser parses the configuration for TCP log receivers.
32+
type SyslogReceiverParser struct {
33+
config map[interface{}]interface{}
34+
logger logr.Logger
35+
name string
36+
}
37+
38+
// NewSyslogReceiverParser builds a new parser for TCP log receivers.
39+
func NewSyslogReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
40+
return &SyslogReceiverParser{
41+
logger: logger,
42+
name: name,
43+
config: config,
44+
}
45+
}
46+
47+
func (o *SyslogReceiverParser) Ports() ([]corev1.ServicePort, error) {
48+
var endpoint interface{}
49+
var endpointName string
50+
var protocol corev1.Protocol
51+
var c map[interface{}]interface{}
52+
53+
// syslog receiver contains the endpoint
54+
// that needs to be exposed one level down inside config
55+
// i.e. either in tcp or udp section with field key
56+
// as `listen_address`
57+
if tcp, isTCP := o.config["tcp"]; isTCP && tcp != nil {
58+
c = tcp.(map[interface{}]interface{})
59+
endpointName = "tcp"
60+
endpoint = getAddressFromConfig(o.logger, o.name, listenAddressKey, c)
61+
protocol = corev1.ProtocolTCP
62+
} else if udp, isUDP := o.config["udp"]; isUDP && udp != nil {
63+
c = udp.(map[interface{}]interface{})
64+
endpointName = "udp"
65+
endpoint = getAddressFromConfig(o.logger, o.name, listenAddressKey, c)
66+
protocol = corev1.ProtocolUDP
67+
}
68+
69+
switch e := endpoint.(type) {
70+
case nil:
71+
break
72+
case string:
73+
port, err := portFromEndpoint(e)
74+
if err != nil {
75+
o.logger.WithValues(listenAddressKey, e).Error(err, fmt.Sprintf("couldn't parse the %s endpoint's port", endpointName))
76+
return nil, nil
77+
}
78+
79+
return []corev1.ServicePort{{
80+
Port: port,
81+
Name: naming.PortName(o.name, port),
82+
Protocol: protocol,
83+
}}, nil
84+
default:
85+
o.logger.WithValues(listenAddressKey, endpoint).Error(fmt.Errorf("unrecognized type %T of %s endpoint", endpoint, endpointName),
86+
"receiver's endpoint isn't a string")
87+
}
88+
89+
return []corev1.ServicePort{}, nil
90+
}
91+
92+
func (o *SyslogReceiverParser) ParserName() string {
93+
return parserNameSyslog
94+
}
95+
96+
func init() {
97+
Register("syslog", NewSyslogReceiverParser)
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiver
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
corev1 "k8s.io/api/core/v1"
22+
)
23+
24+
func TestSyslogSelfRegisters(t *testing.T) {
25+
// verify
26+
assert.True(t, IsRegistered("syslog"))
27+
}
28+
29+
func TestSyslogIsFoundByName(t *testing.T) {
30+
// test
31+
p, err := For(logger, "syslog", map[interface{}]interface{}{})
32+
assert.NoError(t, err)
33+
34+
// verify
35+
assert.Equal(t, "__syslog", p.ParserName())
36+
}
37+
38+
func TestSyslogConfiguration(t *testing.T) {
39+
for _, tt := range []struct {
40+
desc string
41+
config map[interface{}]interface{}
42+
expected []corev1.ServicePort
43+
}{
44+
{"Empty configuration", map[interface{}]interface{}{}, []corev1.ServicePort{}},
45+
{"UDP port configuration",
46+
map[interface{}]interface{}{"udp": map[interface{}]interface{}{"listen_address": "0.0.0.0:1234"}},
47+
[]corev1.ServicePort{{Name: "syslog", Port: 1234, Protocol: corev1.ProtocolUDP}}},
48+
{"TCP port configuration",
49+
map[interface{}]interface{}{"tcp": map[interface{}]interface{}{"listen_address": "0.0.0.0:1234"}},
50+
[]corev1.ServicePort{{Name: "syslog", Port: 1234, Protocol: corev1.ProtocolTCP}}},
51+
} {
52+
t.Run(tt.desc, func(t *testing.T) {
53+
// prepare
54+
builder := NewSyslogReceiverParser(logger, "syslog", tt.config)
55+
56+
// test
57+
ports, err := builder.Ports()
58+
59+
// verify
60+
assert.NoError(t, err)
61+
assert.Equal(t, ports, tt.expected)
62+
})
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiver
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/go-logr/logr"
21+
corev1 "k8s.io/api/core/v1"
22+
23+
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
24+
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
25+
)
26+
27+
var _ parser.ComponentPortParser = &TcpLogReceiverParser{}
28+
29+
const parserNameTcpLog = "__tcplog"
30+
31+
// TcpLogReceiverParser parses the configuration for TCP log receivers.
32+
type TcpLogReceiverParser struct {
33+
config map[interface{}]interface{}
34+
logger logr.Logger
35+
name string
36+
}
37+
38+
// NewTcpLogReceiverParser builds a new parser for TCP log receivers.
39+
func NewTcpLogReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
40+
return &TcpLogReceiverParser{
41+
logger: logger,
42+
name: name,
43+
config: config,
44+
}
45+
}
46+
47+
func (o *TcpLogReceiverParser) Ports() ([]corev1.ServicePort, error) {
48+
// tcplog receiver hold the endpoint value in `listen_address` field
49+
var endpoint = getAddressFromConfig(o.logger, o.name, listenAddressKey, o.config)
50+
51+
switch e := endpoint.(type) {
52+
case nil:
53+
break
54+
case string:
55+
port, err := portFromEndpoint(e)
56+
if err != nil {
57+
o.logger.WithValues(listenAddressKey, e).Error(err, "couldn't parse the endpoint's port")
58+
return nil, nil
59+
}
60+
61+
return []corev1.ServicePort{{
62+
Port: port,
63+
Name: naming.PortName(o.name, port),
64+
Protocol: corev1.ProtocolTCP,
65+
}}, nil
66+
default:
67+
o.logger.WithValues(listenAddressKey, endpoint).Error(fmt.Errorf("unrecognized type %T", endpoint), "receiver's endpoint isn't a string")
68+
}
69+
70+
return []corev1.ServicePort{}, nil
71+
}
72+
73+
func (o *TcpLogReceiverParser) ParserName() string {
74+
return parserNameTcpLog
75+
}
76+
77+
func init() {
78+
Register("tcplog", NewTcpLogReceiverParser)
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiver
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
corev1 "k8s.io/api/core/v1"
22+
)
23+
24+
func TestTcpLogSelfRegisters(t *testing.T) {
25+
// verify
26+
assert.True(t, IsRegistered("tcplog"))
27+
}
28+
29+
func TestTcpLogIsFoundByName(t *testing.T) {
30+
// test
31+
p, err := For(logger, "tcplog", map[interface{}]interface{}{})
32+
assert.NoError(t, err)
33+
34+
// verify
35+
assert.Equal(t, "__tcplog", p.ParserName())
36+
}
37+
38+
func TestTcpLogConfiguration(t *testing.T) {
39+
for _, tt := range []struct {
40+
desc string
41+
config map[interface{}]interface{}
42+
expected []corev1.ServicePort
43+
}{
44+
{"Empty configuration", map[interface{}]interface{}{}, []corev1.ServicePort{}},
45+
{"TCP port configuration",
46+
map[interface{}]interface{}{"listen_address": "0.0.0.0:1234"},
47+
[]corev1.ServicePort{{Name: "tcplog", Port: 1234, Protocol: corev1.ProtocolTCP}}},
48+
} {
49+
t.Run(tt.desc, func(t *testing.T) {
50+
// prepare
51+
builder := NewTcpLogReceiverParser(logger, "tcplog", tt.config)
52+
53+
// test
54+
ports, err := builder.Ports()
55+
56+
// verify
57+
assert.NoError(t, err)
58+
assert.Equal(t, ports, tt.expected)
59+
})
60+
}
61+
}

0 commit comments

Comments
 (0)