Skip to content

Commit

Permalink
[backport] fix: trigger xRoutes to reconcile when gateway/backend sta…
Browse files Browse the repository at this point in the history
…tus changed (#435) (#436)

* fix: trigger xRoutes to reconcile when gateway status changed



* fix: trigger xRoutes to reconcile when backend status changed



---------

Signed-off-by: Lin Yang <[email protected]>
  • Loading branch information
reaver-flomesh authored Nov 13, 2024
1 parent dec12e5 commit 8533c4e
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 0 deletions.
60 changes: 60 additions & 0 deletions pkg/controllers/gateway/v1/grpcroute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func (r *grpcRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := ctrl.NewControllerManagedBy(mgr).
For(&gwv1.GRPCRoute{}).
Watches(&gwv1.Gateway{}, handler.EnqueueRequestsFromMapFunc(r.gatewayToGRPCRoutes)).
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(r.serviceToGRPCRoutes)).
Watches(&gwv1alpha3.BackendTLSPolicy{}, handler.EnqueueRequestsFromMapFunc(r.backendTLSToGRPCRoutes)).
Watches(&gwpav1alpha2.BackendLBPolicy{}, handler.EnqueueRequestsFromMapFunc(r.backendLBToGRPCRoutes)).
Watches(&gwv1beta1.ReferenceGrant{}, handler.EnqueueRequestsFromMapFunc(r.referenceGrantToGRPCRoutes)).
Expand All @@ -147,6 +149,64 @@ func (r *grpcRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
return addGRPCRouteIndexers(context.Background(), mgr)
}

func (r *grpcRouteReconciler) gatewayToGRPCRoutes(ctx context.Context, object client.Object) []reconcile.Request {
gateway, ok := object.(*gwv1.Gateway)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1.GRPCRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.GatewayGRPCRouteIndex, client.ObjectKeyFromObject(gateway).String()),
}); err != nil {
log.Error().Msgf("Failed to list GRPCRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func (r *grpcRouteReconciler) serviceToGRPCRoutes(ctx context.Context, object client.Object) []reconcile.Request {
service, ok := object.(*corev1.Service)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1.GRPCRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.BackendGRPCRouteIndex, client.ObjectKeyFromObject(service).String()),
}); err != nil {
log.Error().Msgf("Failed to list GRPCRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func (r *grpcRouteReconciler) backendTLSToGRPCRoutes(ctx context.Context, object client.Object) []reconcile.Request {
policy, ok := object.(*gwv1alpha3.BackendTLSPolicy)
if !ok {
Expand Down
60 changes: 60 additions & 0 deletions pkg/controllers/gateway/v1/httproute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func (r *httpRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := ctrl.NewControllerManagedBy(mgr).
For(&gwv1.HTTPRoute{}).
Watches(&gwv1.Gateway{}, handler.EnqueueRequestsFromMapFunc(r.gatewayToHTTPRoutes)).
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(r.serviceToHTTPRoutes)).
Watches(&gwv1alpha3.BackendTLSPolicy{}, handler.EnqueueRequestsFromMapFunc(r.backendTLSToHTTPRoutes)).
Watches(&gwpav1alpha2.BackendLBPolicy{}, handler.EnqueueRequestsFromMapFunc(r.backendLBToHTTPRoutes)).
Watches(&gwpav1alpha2.HealthCheckPolicy{}, handler.EnqueueRequestsFromMapFunc(r.healthCheckToHTTPRoutes)).
Expand All @@ -149,6 +151,64 @@ func (r *httpRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
return addHTTPRouteIndexers(context.Background(), mgr)
}

func (r *httpRouteReconciler) gatewayToHTTPRoutes(ctx context.Context, object client.Object) []reconcile.Request {
gateway, ok := object.(*gwv1.Gateway)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1.HTTPRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.GatewayHTTPRouteIndex, client.ObjectKeyFromObject(gateway).String()),
}); err != nil {
log.Error().Msgf("Failed to list HTTPRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func (r *httpRouteReconciler) serviceToHTTPRoutes(ctx context.Context, object client.Object) []reconcile.Request {
service, ok := object.(*corev1.Service)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1.HTTPRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.BackendHTTPRouteIndex, client.ObjectKeyFromObject(service).String()),
}); err != nil {
log.Error().Msgf("Failed to list HTTPRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func (r *httpRouteReconciler) backendTLSToHTTPRoutes(ctx context.Context, object client.Object) []reconcile.Request {
policy, ok := object.(*gwv1alpha3.BackendTLSPolicy)
if !ok {
Expand Down
60 changes: 60 additions & 0 deletions pkg/controllers/gateway/v1alpha2/tcproute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func (r *tcpRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := ctrl.NewControllerManagedBy(mgr).
For(&gwv1alpha2.TCPRoute{}).
Watches(&gwv1.Gateway{}, handler.EnqueueRequestsFromMapFunc(r.gatewayToTCPRoutes)).
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(r.serviceToTCPRoutes)).
Watches(&gwv1alpha3.BackendTLSPolicy{}, handler.EnqueueRequestsFromMapFunc(r.backendTLSToTCPRoutes)).
Watches(&gwv1beta1.ReferenceGrant{}, handler.EnqueueRequestsFromMapFunc(r.referenceGrantToTCPRoutes)).
Complete(r); err != nil {
Expand All @@ -138,6 +140,64 @@ func (r *tcpRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
return addTCPRouteIndexers(context.Background(), mgr)
}

func (r *tcpRouteReconciler) gatewayToTCPRoutes(ctx context.Context, object client.Object) []reconcile.Request {
gateway, ok := object.(*gwv1.Gateway)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1alpha2.TCPRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.GatewayTCPRouteIndex, client.ObjectKeyFromObject(gateway).String()),
}); err != nil {
log.Error().Msgf("Failed to list TCPRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func (r *tcpRouteReconciler) serviceToTCPRoutes(ctx context.Context, object client.Object) []reconcile.Request {
service, ok := object.(*corev1.Service)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1alpha2.TCPRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.BackendTCPRouteIndex, client.ObjectKeyFromObject(service).String()),
}); err != nil {
log.Error().Msgf("Failed to list TCPRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func (r *tcpRouteReconciler) backendTLSToTCPRoutes(ctx context.Context, object client.Object) []reconcile.Request {
policy, ok := object.(*gwv1alpha3.BackendTLSPolicy)
if !ok {
Expand Down
66 changes: 66 additions & 0 deletions pkg/controllers/gateway/v1alpha2/tlsroute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ package v1alpha2
import (
"context"

corev1 "k8s.io/api/core/v1"

"k8s.io/apimachinery/pkg/fields"
"sigs.k8s.io/controller-runtime/pkg/handler"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/flomesh-io/fsm/pkg/gateway/status/routes"

"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -123,13 +129,73 @@ func (r *tlsRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := ctrl.NewControllerManagedBy(mgr).
For(&gwv1alpha2.TLSRoute{}).
Watches(&gwv1.Gateway{}, handler.EnqueueRequestsFromMapFunc(r.gatewayToTLSRoutes)).
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(r.serviceToTLSRoutes)).
Complete(r); err != nil {
return err
}

return addTLSRouteIndexers(context.Background(), mgr)
}

func (r *tlsRouteReconciler) gatewayToTLSRoutes(ctx context.Context, object client.Object) []reconcile.Request {
gateway, ok := object.(*gwv1.Gateway)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1alpha2.TLSRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.GatewayTLSRouteIndex, client.ObjectKeyFromObject(gateway).String()),
}); err != nil {
log.Error().Msgf("Failed to list TLSRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func (r *tlsRouteReconciler) serviceToTLSRoutes(ctx context.Context, object client.Object) []reconcile.Request {
service, ok := object.(*corev1.Service)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1alpha2.TLSRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.BackendTLSRouteIndex, client.ObjectKeyFromObject(service).String()),
}); err != nil {
log.Error().Msgf("Failed to list TLSRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func addTLSRouteIndexers(ctx context.Context, mgr manager.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(ctx, &gwv1alpha2.TLSRoute{}, constants.GatewayTLSRouteIndex, func(obj client.Object) []string {
tlsRoute := obj.(*gwv1alpha2.TLSRoute)
Expand Down
62 changes: 62 additions & 0 deletions pkg/controllers/gateway/v1alpha2/udproute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ package v1alpha2
import (
"context"

corev1 "k8s.io/api/core/v1"

"github.com/flomesh-io/fsm/pkg/gateway/status/routes"

"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -126,6 +128,8 @@ func (r *udpRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := ctrl.NewControllerManagedBy(mgr).
For(&gwv1alpha2.UDPRoute{}).
Watches(&gwv1.Gateway{}, handler.EnqueueRequestsFromMapFunc(r.gatewayToUDPRoutes)).
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(r.serviceToUDPRoutes)).
Watches(&gwv1beta1.ReferenceGrant{}, handler.EnqueueRequestsFromMapFunc(r.referenceGrantToUDPRoutes)).
Complete(r); err != nil {
return err
Expand All @@ -134,6 +138,64 @@ func (r *udpRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
return addUDPRouteIndexers(context.Background(), mgr)
}

func (r *udpRouteReconciler) gatewayToUDPRoutes(ctx context.Context, object client.Object) []reconcile.Request {
gateway, ok := object.(*gwv1.Gateway)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1alpha2.UDPRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.GatewayUDPRouteIndex, client.ObjectKeyFromObject(gateway).String()),
}); err != nil {
log.Error().Msgf("Failed to list UDPRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func (r *udpRouteReconciler) serviceToUDPRoutes(ctx context.Context, object client.Object) []reconcile.Request {
service, ok := object.(*corev1.Service)
if !ok {
log.Error().Msgf("Unexpected type %T", object)
return nil
}

var requests []reconcile.Request

list := &gwv1alpha2.UDPRouteList{}
if err := r.fctx.Manager.GetCache().List(context.Background(), list, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(constants.BackendUDPRouteIndex, client.ObjectKeyFromObject(service).String()),
}); err != nil {
log.Error().Msgf("Failed to list UDPRoutes: %v", err)
return nil
}

for _, route := range list.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: route.Namespace,
Name: route.Name,
},
})
}

return requests
}

func (r *udpRouteReconciler) referenceGrantToUDPRoutes(ctx context.Context, obj client.Object) []reconcile.Request {
refGrant, ok := obj.(*gwv1beta1.ReferenceGrant)
if !ok {
Expand Down

0 comments on commit 8533c4e

Please sign in to comment.