Skip to content

Commit d0e552d

Browse files
authored
Merge branch 'main' into chainsaw
2 parents 6c04e74 + 822a24f commit d0e552d

11 files changed

+581
-47
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: bug_fix
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
5+
component: operator
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: "Fixed handling of protocol in exposed ports."
9+
10+
# One or more tracking issues related to the change
11+
issues: [2619]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
Make distinction not only on the port number, but also on protocol. This fix allows to have multiple exposed
18+
ServicePorts with the same port number, but different protocols.
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: bug_fix
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
5+
component: operator
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: "Fixed handling of exposed port protocol in syslog, tcplog and udplog receivers."
9+
10+
# One or more tracking issues related to the change
11+
issues: [767,2619]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
Please note that the operator currently exposes just one port (tcp or udp) of syslog receiver due to the current
18+
receiver implementation (patches are welcome).

internal/manifests/collector/parser/receiver/receiver.go

+2-20
Original file line numberDiff line numberDiff line change
@@ -104,30 +104,12 @@ func isScraperReceiver(name string) bool {
104104

105105
func singlePortFromConfigEndpoint(logger logr.Logger, name string, config map[interface{}]interface{}) *v1.ServicePort {
106106
var endpoint interface{}
107+
var receiverType = receiverType(name)
107108
switch {
108-
// syslog receiver contains the endpoint
109-
// that needs to be exposed one level down inside config
110-
// i.e. either in tcp or udp section with field key
111-
// as `listen_address`
112-
case name == "syslog":
113-
var c map[interface{}]interface{}
114-
if udp, isUDP := config["udp"]; isUDP && udp != nil {
115-
c = udp.(map[interface{}]interface{})
116-
endpoint = getAddressFromConfig(logger, name, listenAddressKey, c)
117-
} else if tcp, isTCP := config["tcp"]; isTCP && tcp != nil {
118-
c = tcp.(map[interface{}]interface{})
119-
endpoint = getAddressFromConfig(logger, name, listenAddressKey, c)
120-
}
121-
122-
// tcplog and udplog receivers hold the endpoint
123-
// value in `listen_address` field
124-
case name == "tcplog" || name == "udplog":
125-
endpoint = getAddressFromConfig(logger, name, listenAddressKey, config)
126-
127109
// ignore the receiver as it holds the field key endpoint, and it
128110
// is a scraper, we only expose endpoint through k8s service objects for
129111
// receivers that aren't scrapers.
130-
case isScraperReceiver(name):
112+
case isScraperReceiver(receiverType):
131113
return nil
132114

133115
default:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiver
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/go-logr/logr"
21+
corev1 "k8s.io/api/core/v1"
22+
23+
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
24+
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
25+
)
26+
27+
var _ parser.ComponentPortParser = &SyslogReceiverParser{}
28+
29+
const parserNameSyslog = "__syslog"
30+
31+
// SyslogReceiverParser parses the configuration for TCP log receivers.
32+
type SyslogReceiverParser struct {
33+
config map[interface{}]interface{}
34+
logger logr.Logger
35+
name string
36+
}
37+
38+
// NewSyslogReceiverParser builds a new parser for TCP log receivers.
39+
func NewSyslogReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
40+
return &SyslogReceiverParser{
41+
logger: logger,
42+
name: name,
43+
config: config,
44+
}
45+
}
46+
47+
func (o *SyslogReceiverParser) Ports() ([]corev1.ServicePort, error) {
48+
var endpoint interface{}
49+
var endpointName string
50+
var protocol corev1.Protocol
51+
var c map[interface{}]interface{}
52+
53+
// syslog receiver contains the endpoint
54+
// that needs to be exposed one level down inside config
55+
// i.e. either in tcp or udp section with field key
56+
// as `listen_address`
57+
if tcp, isTCP := o.config["tcp"]; isTCP && tcp != nil {
58+
c = tcp.(map[interface{}]interface{})
59+
endpointName = "tcp"
60+
endpoint = getAddressFromConfig(o.logger, o.name, listenAddressKey, c)
61+
protocol = corev1.ProtocolTCP
62+
} else if udp, isUDP := o.config["udp"]; isUDP && udp != nil {
63+
c = udp.(map[interface{}]interface{})
64+
endpointName = "udp"
65+
endpoint = getAddressFromConfig(o.logger, o.name, listenAddressKey, c)
66+
protocol = corev1.ProtocolUDP
67+
}
68+
69+
switch e := endpoint.(type) {
70+
case nil:
71+
break
72+
case string:
73+
port, err := portFromEndpoint(e)
74+
if err != nil {
75+
o.logger.WithValues(listenAddressKey, e).Error(err, fmt.Sprintf("couldn't parse the %s endpoint's port", endpointName))
76+
return nil, nil
77+
}
78+
79+
return []corev1.ServicePort{{
80+
Port: port,
81+
Name: naming.PortName(o.name, port),
82+
Protocol: protocol,
83+
}}, nil
84+
default:
85+
o.logger.WithValues(listenAddressKey, endpoint).Error(fmt.Errorf("unrecognized type %T of %s endpoint", endpoint, endpointName),
86+
"receiver's endpoint isn't a string")
87+
}
88+
89+
return []corev1.ServicePort{}, nil
90+
}
91+
92+
func (o *SyslogReceiverParser) ParserName() string {
93+
return parserNameSyslog
94+
}
95+
96+
func init() {
97+
Register("syslog", NewSyslogReceiverParser)
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiver
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
corev1 "k8s.io/api/core/v1"
22+
)
23+
24+
func TestSyslogSelfRegisters(t *testing.T) {
25+
// verify
26+
assert.True(t, IsRegistered("syslog"))
27+
}
28+
29+
func TestSyslogIsFoundByName(t *testing.T) {
30+
// test
31+
p, err := For(logger, "syslog", map[interface{}]interface{}{})
32+
assert.NoError(t, err)
33+
34+
// verify
35+
assert.Equal(t, "__syslog", p.ParserName())
36+
}
37+
38+
func TestSyslogConfiguration(t *testing.T) {
39+
for _, tt := range []struct {
40+
desc string
41+
config map[interface{}]interface{}
42+
expected []corev1.ServicePort
43+
}{
44+
{"Empty configuration", map[interface{}]interface{}{}, []corev1.ServicePort{}},
45+
{"UDP port configuration",
46+
map[interface{}]interface{}{"udp": map[interface{}]interface{}{"listen_address": "0.0.0.0:1234"}},
47+
[]corev1.ServicePort{{Name: "syslog", Port: 1234, Protocol: corev1.ProtocolUDP}}},
48+
{"TCP port configuration",
49+
map[interface{}]interface{}{"tcp": map[interface{}]interface{}{"listen_address": "0.0.0.0:1234"}},
50+
[]corev1.ServicePort{{Name: "syslog", Port: 1234, Protocol: corev1.ProtocolTCP}}},
51+
} {
52+
t.Run(tt.desc, func(t *testing.T) {
53+
// prepare
54+
builder := NewSyslogReceiverParser(logger, "syslog", tt.config)
55+
56+
// test
57+
ports, err := builder.Ports()
58+
59+
// verify
60+
assert.NoError(t, err)
61+
assert.Equal(t, ports, tt.expected)
62+
})
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiver
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/go-logr/logr"
21+
corev1 "k8s.io/api/core/v1"
22+
23+
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
24+
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
25+
)
26+
27+
var _ parser.ComponentPortParser = &TcpLogReceiverParser{}
28+
29+
const parserNameTcpLog = "__tcplog"
30+
31+
// TcpLogReceiverParser parses the configuration for TCP log receivers.
32+
type TcpLogReceiverParser struct {
33+
config map[interface{}]interface{}
34+
logger logr.Logger
35+
name string
36+
}
37+
38+
// NewTcpLogReceiverParser builds a new parser for TCP log receivers.
39+
func NewTcpLogReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
40+
return &TcpLogReceiverParser{
41+
logger: logger,
42+
name: name,
43+
config: config,
44+
}
45+
}
46+
47+
func (o *TcpLogReceiverParser) Ports() ([]corev1.ServicePort, error) {
48+
// tcplog receiver hold the endpoint value in `listen_address` field
49+
var endpoint = getAddressFromConfig(o.logger, o.name, listenAddressKey, o.config)
50+
51+
switch e := endpoint.(type) {
52+
case nil:
53+
break
54+
case string:
55+
port, err := portFromEndpoint(e)
56+
if err != nil {
57+
o.logger.WithValues(listenAddressKey, e).Error(err, "couldn't parse the endpoint's port")
58+
return nil, nil
59+
}
60+
61+
return []corev1.ServicePort{{
62+
Port: port,
63+
Name: naming.PortName(o.name, port),
64+
Protocol: corev1.ProtocolTCP,
65+
}}, nil
66+
default:
67+
o.logger.WithValues(listenAddressKey, endpoint).Error(fmt.Errorf("unrecognized type %T", endpoint), "receiver's endpoint isn't a string")
68+
}
69+
70+
return []corev1.ServicePort{}, nil
71+
}
72+
73+
func (o *TcpLogReceiverParser) ParserName() string {
74+
return parserNameTcpLog
75+
}
76+
77+
func init() {
78+
Register("tcplog", NewTcpLogReceiverParser)
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package receiver
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
corev1 "k8s.io/api/core/v1"
22+
)
23+
24+
func TestTcpLogSelfRegisters(t *testing.T) {
25+
// verify
26+
assert.True(t, IsRegistered("tcplog"))
27+
}
28+
29+
func TestTcpLogIsFoundByName(t *testing.T) {
30+
// test
31+
p, err := For(logger, "tcplog", map[interface{}]interface{}{})
32+
assert.NoError(t, err)
33+
34+
// verify
35+
assert.Equal(t, "__tcplog", p.ParserName())
36+
}
37+
38+
func TestTcpLogConfiguration(t *testing.T) {
39+
for _, tt := range []struct {
40+
desc string
41+
config map[interface{}]interface{}
42+
expected []corev1.ServicePort
43+
}{
44+
{"Empty configuration", map[interface{}]interface{}{}, []corev1.ServicePort{}},
45+
{"TCP port configuration",
46+
map[interface{}]interface{}{"listen_address": "0.0.0.0:1234"},
47+
[]corev1.ServicePort{{Name: "tcplog", Port: 1234, Protocol: corev1.ProtocolTCP}}},
48+
} {
49+
t.Run(tt.desc, func(t *testing.T) {
50+
// prepare
51+
builder := NewTcpLogReceiverParser(logger, "tcplog", tt.config)
52+
53+
// test
54+
ports, err := builder.Ports()
55+
56+
// verify
57+
assert.NoError(t, err)
58+
assert.Equal(t, ports, tt.expected)
59+
})
60+
}
61+
}

0 commit comments

Comments
 (0)