Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add tls support on port 10248 #56

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions control_plane.go
Original file line number Diff line number Diff line change
@@ -19,20 +19,23 @@ import (
"os"
"sync"

"github.com/alibaba/sentinel-golang/util"

"github.com/opensergo/opensergo-control-plane/pkg/controller"
"github.com/opensergo/opensergo-control-plane/pkg/model"
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1"
transport "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc"
"github.com/pkg/errors"
)

type ControlPlane struct {
operator *controller.KubernetesOperator
server *transport.Server
operator *controller.KubernetesOperator
server *transport.Server
secureServer *transport.Server

protoDesc *trpb.ControlPlaneDesc

mux sync.RWMutex
ch chan error
}

func NewControlPlane() (*ControlPlane, error) {
@@ -44,6 +47,8 @@ func NewControlPlane() (*ControlPlane, error) {
}

cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
// On port 10248, it can use tls transport
cp.secureServer = transport.NewSecureServer(uint32(10248), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
cp.operator = operator

hostname, herr := os.Hostname()
@@ -62,20 +67,49 @@ func (c *ControlPlane) Start() error {
if err != nil {
return err
}
// Run the transport server
err = c.server.Run()
if err != nil {
return err
}

return nil
go util.RunWithRecover(func() {
// Run the transport server
log.Println("Starting grpc server on port 10246!")
err = c.server.Run()
if err != nil {
c.ch <- err
log.Fatal("Failed to run the grpc server")
}
})

go util.RunWithRecover(func() {
// Run the secure transport server
log.Println("Starting secure grpc server on port 10248!")
err = c.secureServer.Run()
if err != nil {
c.ch <- err
log.Fatal("Failed to run the secure grpc server")
}
})
err = <-c.ch
return err
}

func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
connections, exists := c.server.ConnectionManager().Get(namespace, app, kind)
var connections []*transport.Connection
var exists bool
scs, exists := c.secureServer.ConnectionManager().Get(namespace, app, kind)
if !exists || connections == nil {
return errors.New("There is no connection for this kind")
log.Printf("There is no secure connection for app %s kind %s in ns %s", app, kind, namespace)
} else {
connections = append(connections, scs...)
}
cs, exists := c.server.ConnectionManager().Get(namespace, app, kind)
if !exists || connections == nil {
log.Printf("There is no connection for app %s kind %s in ns %s", app, kind, namespace)
} else {
connections = append(connections, cs...)
}
return c.innerSendMessage(namespace, app, kind, dataWithVersion, status, respId, connections)
}

func (c *ControlPlane) innerSendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string, connections []*transport.Connection) error {
for _, connection := range connections {
if connection == nil || !connection.IsValid() {
// TODO: log.Debug
@@ -106,22 +140,13 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
})
}

func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
// var labels []model.LabelKV
// if request.Target.Labels != nil {
// for _, label := range request.Target.Labels {
// labels = append(labels, model.LabelKV{
// Key: label.Key,
// Value: label.Value,
// })
// }
// }
func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream, isSecure bool) error {
for _, kind := range request.Target.Kinds {
crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{
Namespace: request.Target.Namespace,
AppName: request.Target.App,
Kind: kind,
})
}, isSecure)
if err != nil {
status := &trpb.Status{
Code: transport.RegisterWatcherError,
@@ -135,8 +160,13 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
}
continue
}
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
// send if the watcher cache is not empty

if isSecure {
_ = c.secureServer.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
} else {
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
}

rules, version := crdWatcher.GetRules(model.NamespacedApp{
Namespace: request.Target.Namespace,
App: request.Target.App,
30 changes: 30 additions & 0 deletions pkg/cert/cert_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cert

import "crypto/tls"

var provider CertProvider

// Use EnvCertProvider by default
func init() {
provider = &EnvCertProvider{
certEnvKey: "OPENSERGO_10248_CERT",
pkEnvKey: "OPENSERGO_10248_KEY",
}
}

func GetCertificate(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
return provider.GetCert(info)
}
58 changes: 58 additions & 0 deletions pkg/cert/cert_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cert

import (
"crypto/tls"
"os"
"strings"
"sync"

"github.com/pkg/errors"
)

var certMU sync.RWMutex

type CertProvider interface {
GetCert(info *tls.ClientHelloInfo) (*tls.Certificate, error)
}

// EnvCertProvider reads cert and secret from ENV
type EnvCertProvider struct {
certEnvKey string
pkEnvKey string
}

func (e *EnvCertProvider) GetCert(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
certMU.Lock()
defer certMU.Unlock()
c, s := os.Getenv(e.certEnvKey), os.Getenv(e.pkEnvKey)
if c == "" || s == "" {
return nil, errors.New("Read empty certificate or secret from env")
}
// In environment variable, the \n is replaced by whitespace character
// So we need to replace the whitespace with \n character in the first and last line of PEM file
// If not, the X509KeyPair func can not recognize the string from environment variable
c, s = e.polishCert(c, s)
keyPair, err := tls.X509KeyPair([]byte(c), []byte(s))
return &keyPair, err
}

func (e *EnvCertProvider) polishCert(c, s string) (string, string) {
c = strings.Replace(c, "----- ", "-----\n", -1)
c = strings.Replace(c, " -----", "\n-----", -1)
s = strings.Replace(s, "----- ", "-----\n", -1)
s = strings.Replace(s, " -----", "\n-----", -1)
return c, s
}
1 change: 1 addition & 0 deletions pkg/controller/crd_watcher.go
Original file line number Diff line number Diff line change
@@ -196,6 +196,7 @@ func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if err != nil {
logger.Error(err, "Failed to send rules", "kind", r.kind)
}

return ctrl.Result{}, nil
}

8 changes: 4 additions & 4 deletions pkg/controller/k8s_operator.go
Original file line number Diff line number Diff line change
@@ -111,8 +111,8 @@ func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*Kubern
return k, nil
}

func (k *KubernetesOperator) RegisterControllersAndStart(info model.SubscribeTarget) error {
_, err := k.RegisterWatcher(info)
func (k *KubernetesOperator) RegisterControllersAndStart(info model.SubscribeTarget, isSecure bool) error {
_, err := k.RegisterWatcher(info, isSecure)
if err != nil {
return err
}
@@ -121,7 +121,7 @@ func (k *KubernetesOperator) RegisterControllersAndStart(info model.SubscribeTar

// RegisterWatcher registers given CRD type and CRD name.
// For each CRD type, it can be registered only once.
func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRDWatcher, error) {
func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget, isSecure bool) (*CRDWatcher, error) {
k.controllerMux.Lock()
defer k.controllerMux.Unlock()

@@ -160,7 +160,7 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD
return k.controllers[target.Kind], nil
}

func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget, isSecure bool) error {
k.controllerMux.Lock()
defer k.controllerMux.Unlock()

1 change: 1 addition & 0 deletions pkg/main/main.go
Original file line number Diff line number Diff line change
@@ -29,4 +29,5 @@ func main() {
if err != nil {
log.Fatal(err)
}

}
2 changes: 1 addition & 1 deletion pkg/model/model.go
Original file line number Diff line number Diff line change
@@ -28,6 +28,6 @@ type ClientIdentifier string

type OpenSergoTransportStream = trpb.OpenSergoUniversalTransportService_SubscribeConfigServer

type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream) error
type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream, bool) error

type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error
30 changes: 27 additions & 3 deletions pkg/transport/grpc/server.go
Original file line number Diff line number Diff line change
@@ -15,16 +15,20 @@
package grpc

import (
"crypto/tls"
"fmt"
"io"
"log"
"net"

"github.com/opensergo/opensergo-control-plane/pkg/cert"

"github.com/opensergo/opensergo-control-plane/pkg/model"
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1"
"github.com/opensergo/opensergo-control-plane/pkg/util"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
@@ -42,10 +46,27 @@ type Server struct {
started *atomic.Bool
}

func NewSecureServer(port uint32, subscribeHandlers []model.SubscribeRequestHandler) *Server {
connectionManager := NewConnectionManager()
cfg := &tls.Config{
GetCertificate: cert.GetCertificate,
ClientAuth: tls.VerifyClientCertIfGiven,
MinVersion: tls.VersionTLS12,
}
tlsCreds := credentials.NewTLS(cfg)
return &Server{
transportServer: newTransportServer(connectionManager, subscribeHandlers, true),
port: port,
grpcServer: grpc.NewServer(grpc.Creds(tlsCreds)),
started: atomic.NewBool(false),
connectionManager: connectionManager,
}
}

func NewServer(port uint32, subscribeHandlers []model.SubscribeRequestHandler) *Server {
connectionManager := NewConnectionManager()
return &Server{
transportServer: newTransportServer(connectionManager, subscribeHandlers),
transportServer: newTransportServer(connectionManager, subscribeHandlers, false),
port: port,
grpcServer: grpc.NewServer(),
started: atomic.NewBool(false),
@@ -84,6 +105,8 @@ type TransportServer struct {
connectionManager *ConnectionManager

subscribeHandlers []model.SubscribeRequestHandler
// whether the server use tls
isSecure bool
}

const (
@@ -144,7 +167,7 @@ func (s *TransportServer) SubscribeConfig(stream trpb.OpenSergoUniversalTranspor
}

for _, handler := range s.subscribeHandlers {
err = handler(clientIdentifier, recvData, stream)
err = handler(clientIdentifier, recvData, stream, s.isSecure)
if err != nil {
// TODO: handle error
log.Printf("Failed to handle SubscribeRequest, err=%s\n", err.Error())
@@ -155,9 +178,10 @@ func (s *TransportServer) SubscribeConfig(stream trpb.OpenSergoUniversalTranspor
}
}

func newTransportServer(connectionManager *ConnectionManager, subscribeHandlers []model.SubscribeRequestHandler) *TransportServer {
func newTransportServer(connectionManager *ConnectionManager, subscribeHandlers []model.SubscribeRequestHandler, isSecure bool) *TransportServer {
return &TransportServer{
connectionManager: connectionManager,
subscribeHandlers: subscribeHandlers,
isSecure: isSecure,
}
}