Skip to content

Commit 0f3a039

Browse files
committed
feat: Add tls support on port 10248
1 parent d8c0de1 commit 0f3a039

File tree

8 files changed

+177
-36
lines changed

8 files changed

+177
-36
lines changed

control_plane.go

+48-22
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"os"
2020
"sync"
2121

22+
"github.com/alibaba/sentinel-golang/util"
23+
2224
"github.com/opensergo/opensergo-control-plane/pkg/controller"
2325
"github.com/opensergo/opensergo-control-plane/pkg/model"
2426
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1"
@@ -27,12 +29,14 @@ import (
2729
)
2830

2931
type ControlPlane struct {
30-
operator *controller.KubernetesOperator
31-
server *transport.Server
32+
operator *controller.KubernetesOperator
33+
server *transport.Server
34+
secureServer *transport.Server
3235

3336
protoDesc *trpb.ControlPlaneDesc
3437

3538
mux sync.RWMutex
39+
ch chan error
3640
}
3741

3842
func NewControlPlane() (*ControlPlane, error) {
@@ -44,6 +48,8 @@ func NewControlPlane() (*ControlPlane, error) {
4448
}
4549

4650
cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
51+
// On port 10248, it can use tls transport
52+
cp.secureServer = transport.NewSecureServer(uint32(10248), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
4753
cp.operator = operator
4854

4955
hostname, herr := os.Hostname()
@@ -62,20 +68,45 @@ func (c *ControlPlane) Start() error {
6268
if err != nil {
6369
return err
6470
}
65-
// Run the transport server
66-
err = c.server.Run()
67-
if err != nil {
68-
return err
69-
}
7071

71-
return nil
72+
go util.RunWithRecover(func() {
73+
// Run the transport server
74+
log.Println("Starting grpc server on port 10246!")
75+
err = c.server.Run()
76+
if err != nil {
77+
c.ch <- err
78+
log.Fatal("Failed to run the grpc server")
79+
}
80+
})
81+
82+
go util.RunWithRecover(func() {
83+
// Run the secure transport server
84+
log.Println("Starting secure grpc server on port 10248!")
85+
err = c.secureServer.Run()
86+
if err != nil {
87+
c.ch <- err
88+
log.Fatal("Failed to run the secure grpc server")
89+
}
90+
})
91+
err = <-c.ch
92+
return err
7293
}
7394

74-
func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
75-
connections, exists := c.server.ConnectionManager().Get(namespace, app, kind)
95+
func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string, isSecure bool) error {
96+
var connections []*transport.Connection
97+
var exists bool
98+
if isSecure {
99+
connections, exists = c.secureServer.ConnectionManager().Get(namespace, app, kind)
100+
} else {
101+
connections, exists = c.server.ConnectionManager().Get(namespace, app, kind)
102+
}
76103
if !exists || connections == nil {
77104
return errors.New("There is no connection for this kind")
78105
}
106+
return c.innerSendMessage(namespace, app, kind, dataWithVersion, status, respId, connections)
107+
}
108+
109+
func (c *ControlPlane) innerSendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string, connections []*transport.Connection) error {
79110
for _, connection := range connections {
80111
if connection == nil || !connection.IsValid() {
81112
// TODO: log.Debug
@@ -106,22 +137,13 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
106137
})
107138
}
108139

109-
func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
110-
//var labels []model.LabelKV
111-
//if request.Target.Labels != nil {
112-
// for _, label := range request.Target.Labels {
113-
// labels = append(labels, model.LabelKV{
114-
// Key: label.Key,
115-
// Value: label.Value,
116-
// })
117-
// }
118-
//}
140+
func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream, isSecure bool) error {
119141
for _, kind := range request.Target.Kinds {
120142
crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{
121143
Namespace: request.Target.Namespace,
122144
AppName: request.Target.App,
123145
Kind: kind,
124-
})
146+
}, isSecure)
125147
if err != nil {
126148
status := &trpb.Status{
127149
Code: transport.RegisterWatcherError,
@@ -135,7 +157,11 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
135157
}
136158
continue
137159
}
138-
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
160+
if isSecure {
161+
_ = c.secureServer.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
162+
} else {
163+
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
164+
}
139165
// watcher缓存不空就发送
140166
rules, version := crdWatcher.GetRules(model.NamespacedApp{
141167
Namespace: request.Target.Namespace,

pkg/cert/cert_manager.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
//
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package cert
15+
16+
import "crypto/tls"
17+
18+
var provider CertProvider
19+
20+
// Use EnvCertProvider by default
21+
func init() {
22+
provider = &EnvCertProvider{
23+
certEnvKey: "OPENSERGO_10248_CERT",
24+
pkEnvKey: "OPENSERGO_10248_KEY",
25+
}
26+
}
27+
28+
func GetCertificate(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
29+
return provider.GetCert(info)
30+
}

pkg/cert/cert_provider.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
//
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package cert
15+
16+
import (
17+
"crypto/tls"
18+
"os"
19+
"strings"
20+
"sync"
21+
22+
"github.com/pkg/errors"
23+
)
24+
25+
var certMU sync.RWMutex
26+
27+
type CertProvider interface {
28+
GetCert(info *tls.ClientHelloInfo) (*tls.Certificate, error)
29+
}
30+
31+
// EnvCertProvider reads cert and secret from ENV
32+
type EnvCertProvider struct {
33+
certEnvKey string
34+
pkEnvKey string
35+
}
36+
37+
func (e *EnvCertProvider) GetCert(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
38+
certMU.Lock()
39+
defer certMU.Unlock()
40+
c, s := os.Getenv(e.certEnvKey), os.Getenv(e.pkEnvKey)
41+
if c == "" || s == "" {
42+
return nil, errors.New("Read empty certificate or secret from env")
43+
}
44+
// In environment variable, the \n is replaced by whitespace character
45+
// So we need to replace the whitespace with \n character in the first and last line of PEM file
46+
// If not, the X509KeyPair func can not recognize the string from environment variable
47+
c, s = e.polishCert(c, s)
48+
keyPair, err := tls.X509KeyPair([]byte(c), []byte(s))
49+
return &keyPair, err
50+
}
51+
52+
func (e *EnvCertProvider) polishCert(c, s string) (string, string) {
53+
c = strings.Replace(c, "----- ", "-----\n", -1)
54+
c = strings.Replace(c, " -----", "\n-----", -1)
55+
s = strings.Replace(s, "----- ", "-----\n", -1)
56+
s = strings.Replace(s, " -----", "\n-----", -1)
57+
return c, s
58+
}

pkg/controller/crd_watcher.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,12 @@ func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
193193
Details: nil,
194194
}
195195
dataWithVersion := &trpb.DataWithVersion{Data: rules, Version: version}
196-
err := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, "")
197-
if err != nil {
196+
err := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, "", false)
197+
errSecure := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, "", true)
198+
if errSecure != nil && err != nil {
198199
log.Error(err, "Failed to send rules", "kind", r.kind)
199200
}
201+
200202
return ctrl.Result{}, nil
201203
}
202204

@@ -326,7 +328,7 @@ func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, erro
326328

327329
}
328330

329-
func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler) *CRDWatcher {
331+
func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler, isSecure bool) *CRDWatcher {
330332
return &CRDWatcher{
331333
kind: kind,
332334
Client: crdManager.GetClient(),

pkg/controller/k8s_operator.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*Kubern
107107
return k, nil
108108
}
109109

110-
func (k *KubernetesOperator) RegisterControllersAndStart(info model.SubscribeTarget) error {
111-
_, err := k.RegisterWatcher(info)
110+
func (k *KubernetesOperator) RegisterControllersAndStart(info model.SubscribeTarget, isSecure bool) error {
111+
_, err := k.RegisterWatcher(info, isSecure)
112112
if err != nil {
113113
return err
114114
}
@@ -117,7 +117,7 @@ func (k *KubernetesOperator) RegisterControllersAndStart(info model.SubscribeTar
117117

118118
// RegisterWatcher registers given CRD type and CRD name.
119119
// For each CRD type, it can be registered only once.
120-
func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRDWatcher, error) {
120+
func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget, isSecure bool) (*CRDWatcher, error) {
121121
k.controllerMux.Lock()
122122
defer k.controllerMux.Unlock()
123123

@@ -141,7 +141,7 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD
141141
return nil, errors.New("CRD not supported: " + target.Kind)
142142
}
143143
// This kind of CRD has never been watched.
144-
crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler)
144+
crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, isSecure)
145145
err = crdWatcher.AddSubscribeTarget(target)
146146
if err != nil {
147147
return nil, err
@@ -156,7 +156,7 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD
156156
return k.controllers[target.Kind], nil
157157
}
158158

159-
func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
159+
func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget, isSecure bool) error {
160160
k.controllerMux.Lock()
161161
defer k.controllerMux.Unlock()
162162

@@ -174,7 +174,7 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
174174
if !crdSupports {
175175
return errors.New("CRD not supported: " + target.Kind)
176176
}
177-
crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler)
177+
crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, isSecure)
178178
err = crdWatcher.AddSubscribeTarget(target)
179179
if err != nil {
180180
return err

pkg/main/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ func main() {
2929
if err != nil {
3030
log.Fatal(err)
3131
}
32+
3233
}

pkg/model/model.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ type ClientIdentifier string
2828

2929
type OpenSergoTransportStream = trpb.OpenSergoUniversalTransportService_SubscribeConfigServer
3030

31-
type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream) error
31+
type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream, bool) error
3232

33-
type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error
33+
type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string, isSecure bool) error

pkg/transport/grpc/server.go

+27-3
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@
1515
package grpc
1616

1717
import (
18+
"crypto/tls"
1819
"fmt"
1920
"io"
2021
"log"
2122
"net"
2223

24+
"github.com/opensergo/opensergo-control-plane/pkg/cert"
25+
2326
"github.com/opensergo/opensergo-control-plane/pkg/model"
2427
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1"
2528
"github.com/opensergo/opensergo-control-plane/pkg/util"
2629
"go.uber.org/atomic"
2730
"google.golang.org/grpc"
31+
"google.golang.org/grpc/credentials"
2832
)
2933

3034
const (
@@ -42,10 +46,27 @@ type Server struct {
4246
started *atomic.Bool
4347
}
4448

49+
func NewSecureServer(port uint32, subscribeHandlers []model.SubscribeRequestHandler) *Server {
50+
connectionManager := NewConnectionManager()
51+
cfg := &tls.Config{
52+
GetCertificate: cert.GetCertificate,
53+
ClientAuth: tls.VerifyClientCertIfGiven,
54+
MinVersion: tls.VersionTLS12,
55+
}
56+
tlsCreds := credentials.NewTLS(cfg)
57+
return &Server{
58+
transportServer: newTransportServer(connectionManager, subscribeHandlers, true),
59+
port: port,
60+
grpcServer: grpc.NewServer(grpc.Creds(tlsCreds)),
61+
started: atomic.NewBool(false),
62+
connectionManager: connectionManager,
63+
}
64+
}
65+
4566
func NewServer(port uint32, subscribeHandlers []model.SubscribeRequestHandler) *Server {
4667
connectionManager := NewConnectionManager()
4768
return &Server{
48-
transportServer: newTransportServer(connectionManager, subscribeHandlers),
69+
transportServer: newTransportServer(connectionManager, subscribeHandlers, false),
4970
port: port,
5071
grpcServer: grpc.NewServer(),
5172
started: atomic.NewBool(false),
@@ -84,6 +105,8 @@ type TransportServer struct {
84105
connectionManager *ConnectionManager
85106

86107
subscribeHandlers []model.SubscribeRequestHandler
108+
// whether the server use tls
109+
isSecure bool
87110
}
88111

89112
const (
@@ -144,7 +167,7 @@ func (s *TransportServer) SubscribeConfig(stream trpb.OpenSergoUniversalTranspor
144167
}
145168

146169
for _, handler := range s.subscribeHandlers {
147-
err = handler(clientIdentifier, recvData, stream)
170+
err = handler(clientIdentifier, recvData, stream, s.isSecure)
148171
if err != nil {
149172
// TODO: handle error
150173
log.Printf("Failed to handle SubscribeRequest, err=%s\n", err.Error())
@@ -155,9 +178,10 @@ func (s *TransportServer) SubscribeConfig(stream trpb.OpenSergoUniversalTranspor
155178
}
156179
}
157180

158-
func newTransportServer(connectionManager *ConnectionManager, subscribeHandlers []model.SubscribeRequestHandler) *TransportServer {
181+
func newTransportServer(connectionManager *ConnectionManager, subscribeHandlers []model.SubscribeRequestHandler, isSecure bool) *TransportServer {
159182
return &TransportServer{
160183
connectionManager: connectionManager,
161184
subscribeHandlers: subscribeHandlers,
185+
isSecure: isSecure,
162186
}
163187
}

0 commit comments

Comments
 (0)