Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ctrl): implements MeshFederation reconciliation #170

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ define local_tag
$(TAG)$(shell [ "$(USE_LOCAL_IMAGE)" = "true" ] && echo "-local")
endef

.PHONY: e2e
TEST_SUITES ?= remote_ip remote_dns_name spire
.PHONY: e2e
e2e: kind-clusters ## Runs end-to-end tests against KinD clusters
@local_tag=$(call local_tag); \
$(foreach suite, $(TEST_SUITES), \
Expand Down
2 changes: 1 addition & 1 deletion Makefile.func.mk
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ define go-mod-version
$(shell go mod graph | grep $(1) | head -n 1 | cut -d'@' -f 2)
endef

# Using controller-gen to fetch external CRDs and put them in defined folder folder
# Using controller-gen to fetch external CRDs and put them in defined folder.
# They can be used e.g. in testing using EnvTest where controller under test
# requires additional resources to manage.
#
Expand Down
3 changes: 3 additions & 0 deletions api/v1alpha1/meshfederation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type MeshFederationStatus struct {
// Conditions describes the state of the MeshFederation resource.
// +optional
Conditions []metav1.Condition `json:"conditions,omitempty"`

// +optional
ExportedServices []string `json:"exportedServices,omitempty"`
}

type PortConfig struct {
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ spec:
- type
type: object
type: array
exportedServices:
items:
type: string
type: array
type: object
type: object
served: true
Expand Down
10 changes: 5 additions & 5 deletions chart/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@ rules:
verbs: ["get", "list", "create", "update", "patch", "delete"]
- apiGroups: ["security.istio.io"]
resources: ["peerauthentications"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
{{- if (include "remotes.hasOpenshiftRouterPeer" .) }}
- apiGroups: ["networking.istio.io"]
resources: ["destinationrules"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
{{- end }}
{{- if eq .Values.federation.meshPeers.local.ingressType "openshift-router" }}
- apiGroups: ["networking.istio.io"]
resources: ["envoyfilters"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
- apiGroups: ["route.openshift.io"]
resources: ["routes", "routes/custom-host"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
{{- end }}
- apiGroups: ["federation.openshift-service-mesh.io"]
resources: ["meshfederations", "federatedservices"]
verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["federation.openshift-service-mesh.io"]
resources: ["meshfederations/status", "federatedservices/status"]
verbs: ["get"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
2 changes: 1 addition & 1 deletion cmd/federation-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func startFDSClient(ctx context.Context, remote config.Remote, meshConfigPushReq
DiscoveryAddr: discoveryAddr,
Authority: remote.ServiceFQDN(),
Handlers: map[string]adsc.ResponseHandler{
xds.ExportedServiceTypeUrl: fds.NewImportedServiceHandler(importedServiceStore, meshConfigPushRequests),
xds.FederatedServiceTypeUrl: fds.NewImportedServiceHandler(importedServiceStore, meshConfigPushRequests),
},
ReconnectDelay: reconnectDelay,
})
Expand Down
706 changes: 706 additions & 0 deletions docs/arch/diagrams/ctrl-overview.drawio

Large diffs are not rendered by default.

137 changes: 137 additions & 0 deletions internal/controller/meshfederation/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright Red Hat, Inc.
//
// Licensed under the Apache License, Version 2.0 (the License);
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an AS IS BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package meshfederation

import (
"context"
"fmt"
"strings"
"sync"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"

protov1alpha1 "github.com/openshift-service-mesh/federation/internal/api/federation/v1alpha1"
"github.com/openshift-service-mesh/federation/internal/pkg/discovery"
)

// TODO(design): should we have one server per MF or single server broadcasting to all -> that would imply recognizing subscribers somehow
// TODO(design): currently we won't be able to run two meshfederations at once due to port conflict
type serverExporter struct {
server *discovery.Server
handler *exportedServicesBroadcaster
}

type serviceExporterRegistry struct {
exporters sync.Map
}

func (r *serviceExporterRegistry) LoadOrStore(name string, serviceExporter *exportedServicesBroadcaster) *discovery.Server {
actual, exists := r.exporters.LoadOrStore(name, serverExporter{
server: discovery.NewServer(serviceExporter),
handler: serviceExporter,
})

exporter := actual.(serverExporter)
if exists {
// update settings
exporter.handler.selector = serviceExporter.selector
}

return exporter.server
}

var _ discovery.RequestHandler = (*exportedServicesBroadcaster)(nil)

type exportedServicesBroadcaster struct {
client client.Client
typeUrl string
selector labels.Selector
}

func (e exportedServicesBroadcaster) GetTypeUrl() string {
return e.typeUrl
}

func (e exportedServicesBroadcaster) GenerateResponse() ([]*anypb.Any, error) {
services := &corev1.ServiceList{}
// TODO: rework ads(s|c) to get ctx?
// We cannot latch into ctx from owning Reconcile call, as this piece of code can be called from outside reconcile loop on client push request.
if errSvcList := e.client.List(context.TODO(), services, client.MatchingLabelsSelector{Selector: e.selector}); errSvcList != nil {
return []*anypb.Any{}, errSvcList
}

return convert(services.Items)
}

func convert(services []corev1.Service) ([]*anypb.Any, error) {
var federatedServices []*protov1alpha1.FederatedService

for _, svc := range services {
var ports []*protov1alpha1.ServicePort
for _, port := range svc.Spec.Ports {
servicePort := &protov1alpha1.ServicePort{
Name: port.Name,
Number: uint32(port.Port),
}
if port.TargetPort.IntVal != 0 {
servicePort.TargetPort = uint32(port.TargetPort.IntVal)
}
servicePort.Protocol = detectProtocol(port.Name)
ports = append(ports, servicePort)
}
federatedSvc := &protov1alpha1.FederatedService{
Hostname: fmt.Sprintf("%s.%s.svc.cluster.local", svc.Name, svc.Namespace),
Ports: ports,
Labels: svc.Labels,
}
federatedServices = append(federatedServices, federatedSvc)
}

return serialize(federatedServices)
}

// TODO: check appProtocol and reject UDP
func detectProtocol(portName string) string {
if portName == "https" || strings.HasPrefix(portName, "https-") {
return "HTTPS"
} else if portName == "http" || strings.HasPrefix(portName, "http-") {
return "HTTP"
} else if portName == "http2" || strings.HasPrefix(portName, "http2-") {
return "HTTP2"
} else if portName == "grpc" || strings.HasPrefix(portName, "grpc-") {
return "GRPC"
} else if portName == "tls" || strings.HasPrefix(portName, "tls-") {
return "TLS"
} else if portName == "mongo" || strings.HasPrefix(portName, "mongo-") {
return "MONGO"
}
return "TCP"
}

func serialize(exportedServices []*protov1alpha1.FederatedService) ([]*anypb.Any, error) {
var serializedServices []*anypb.Any
for _, exportedService := range exportedServices {
serializedExportedService := &anypb.Any{}
if err := anypb.MarshalFrom(serializedExportedService, exportedService, proto.MarshalOptions{}); err != nil {
return []*anypb.Any{}, fmt.Errorf("failed to serialize ExportedService %s to protobuf message: %w", exportedService.Hostname, err)
}
serializedServices = append(serializedServices, serializedExportedService)
}
return serializedServices, nil
}
Loading
Loading