Skip to content

Commit cc2b2b1

Browse files
authored
Merge pull request #4463 from shraddhabang/agaegmanagement
[feat aga] Implement endpoint group management with port override conflict resolution
2 parents 0326ad1 + a5bfc10 commit cc2b2b1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+8547
-100
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package eventhandlers
2+
3+
import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	networking "k8s.io/api/networking/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	gwv1 "sigs.k8s.io/gateway-api/apis/v1"
)
18+
19+
// NewEnqueueRequestsForResourceEvent creates a new handler for generic resource events
20+
func NewEnqueueRequestsForResourceEvent(
21+
resourceType aga.ResourceType,
22+
referenceTracker *aga.ReferenceTracker,
23+
logger logr.Logger,
24+
) handler.EventHandler {
25+
return &enqueueRequestsForResourceEvent{
26+
resourceType: resourceType,
27+
referenceTracker: referenceTracker,
28+
logger: logger,
29+
}
30+
}
31+
32+
// enqueueRequestsForResourceEvent handles resource events and enqueues reconcile requests for GlobalAccelerators
33+
// that reference the resource
34+
type enqueueRequestsForResourceEvent struct {
35+
resourceType aga.ResourceType
36+
referenceTracker *aga.ReferenceTracker
37+
logger logr.Logger
38+
}
39+
40+
// The following methods implement the handler.TypedEventHandler interface.
41+
42+
// Create handles Create events with the typed API
43+
func (h *enqueueRequestsForResourceEvent) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
44+
h.handleResource(ctx, evt.Object, "created", queue)
45+
}
46+
47+
// Update handles Update events with the typed API
48+
func (h *enqueueRequestsForResourceEvent) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
49+
h.handleResource(ctx, evt.ObjectNew, "updated", queue)
50+
}
51+
52+
// Delete handles Delete events with the typed API
53+
func (h *enqueueRequestsForResourceEvent) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
54+
h.handleResource(ctx, evt.Object, "deleted", queue)
55+
}
56+
57+
// Generic handles Generic events with the typed API
58+
func (h *enqueueRequestsForResourceEvent) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
59+
h.handleResource(ctx, evt.Object, "generic event", queue)
60+
}
61+
62+
// handleTypedResource handles resource events for the typed interface
63+
func (h *enqueueRequestsForResourceEvent) handleResource(_ context.Context, obj interface{}, eventType string, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
64+
var namespace, name string
65+
66+
// Extract namespace and name based on the object type
67+
switch res := obj.(type) {
68+
case *corev1.Service:
69+
namespace = res.Namespace
70+
name = res.Name
71+
case *networking.Ingress:
72+
namespace = res.Namespace
73+
name = res.Name
74+
case *gwv1.Gateway:
75+
namespace = res.Namespace
76+
name = res.Name
77+
case *unstructured.Unstructured:
78+
namespace = res.GetNamespace()
79+
name = res.GetName()
80+
default:
81+
h.logger.Error(nil, "Unknown resource type", "type", h.resourceType)
82+
return
83+
}
84+
85+
resourceKey := aga.ResourceKey{
86+
Type: h.resourceType,
87+
Name: types.NamespacedName{
88+
Namespace: namespace,
89+
Name: name,
90+
},
91+
}
92+
93+
// If this resource is not referenced by any GA, no need to queue reconciles
94+
if !h.referenceTracker.IsResourceReferenced(resourceKey) {
95+
return
96+
}
97+
98+
// Get all GAs that reference this resource
99+
gaRefs := h.referenceTracker.GetGAsForResource(resourceKey)
100+
101+
// Queue reconcile for affected GAs
102+
for _, gaRef := range gaRefs {
103+
h.logger.V(1).Info("Enqueueing GA for reconcile due to resource event",
104+
"resourceType", h.resourceType,
105+
"resourceName", resourceKey.Name,
106+
"eventType", eventType,
107+
"ga", gaRef)
108+
109+
queue.Add(reconcile.Request{NamespacedName: gaRef})
110+
}
111+
}

controllers/aga/globalaccelerator_controller.go

Lines changed: 148 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@ import (
3535
ctrl "sigs.k8s.io/controller-runtime"
3636
"sigs.k8s.io/controller-runtime/pkg/client"
3737
"sigs.k8s.io/controller-runtime/pkg/controller"
38+
"sigs.k8s.io/controller-runtime/pkg/event"
3839
"sigs.k8s.io/controller-runtime/pkg/reconcile"
40+
"sigs.k8s.io/controller-runtime/pkg/source"
3941

4042
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
43+
"sigs.k8s.io/aws-load-balancer-controller/controllers/aga/eventhandlers"
4144
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
4245
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
4346
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy"
@@ -50,6 +53,7 @@ import (
5053
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
5154
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
5255
agastatus "sigs.k8s.io/aws-load-balancer-controller/pkg/status/aga"
56+
gwclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
5357
)
5458

5559
const (
@@ -83,7 +87,7 @@ const (
8387
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, cloud services.Cloud, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {
8488

8589
// Create tracking provider
86-
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName, tracking.WithRegion(config.AWSConfig.Region))
90+
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName, tracking.WithRegion(cloud.Region()))
8791

8892
// Create model builder
8993
agaModelBuilder := aga.NewDefaultModelBuilder(
@@ -92,7 +96,7 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
9296
trackingProvider,
9397
config.FeatureGates,
9498
config.ClusterName,
95-
config.AWSConfig.Region,
99+
cloud.Region(),
96100
config.DefaultTags,
97101
config.ExternalManagedTags,
98102
logger.WithName("aga-model-builder"),
@@ -108,6 +112,18 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
108112
// Create status updater
109113
statusUpdater := agastatus.NewStatusUpdater(k8sClient, logger)
110114

115+
// Create reference tracker for endpoint tracking
116+
referenceTracker := aga.NewReferenceTracker(logger.WithName("reference-tracker"))
117+
118+
// Create DNS resolver
119+
dnsResolver, err := aga.NewDNSResolver(cloud.ELBV2())
120+
if err != nil {
121+
logger.Error(err, "Failed to create DNS resolver")
122+
}
123+
124+
// Create unified endpoint loader
125+
endpointLoader := aga.NewEndpointLoader(k8sClient, dnsResolver, logger.WithName("endpoint-loader"))
126+
111127
return &globalAcceleratorReconciler{
112128
k8sClient: k8sClient,
113129
eventRecorder: eventRecorder,
@@ -120,6 +136,13 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
120136
metricsCollector: metricsCollector,
121137
reconcileTracker: reconcileCounters.IncrementAGA,
122138

139+
// Components for endpoint reference tracking
140+
referenceTracker: referenceTracker,
141+
dnsResolver: dnsResolver,
142+
143+
// Unified endpoint loader
144+
endpointLoader: endpointLoader,
145+
123146
maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
124147
maxExponentialBackoffDelay: config.GlobalAcceleratorMaxExponentialBackoffDelay,
125148
}
@@ -138,6 +161,21 @@ type globalAcceleratorReconciler struct {
138161
metricsCollector lbcmetrics.MetricCollector
139162
reconcileTracker func(namespaceName ktypes.NamespacedName)
140163

164+
// Components for endpoint reference tracking
165+
referenceTracker *aga.ReferenceTracker
166+
dnsResolver *aga.DNSResolver
167+
168+
// Unified endpoint loader
169+
endpointLoader aga.EndpointLoader
170+
171+
// Resources manager for dedicated endpoint resource watchers
172+
endpointResourcesManager aga.EndpointResourcesManager
173+
174+
// Event channels for dedicated watchers
175+
serviceEventChan chan event.GenericEvent
176+
ingressEventChan chan event.GenericEvent
177+
gatewayEventChan chan event.GenericEvent
178+
141179
maxConcurrentReconciles int
142180
maxExponentialBackoffDelay time.Duration
143181
}
@@ -194,6 +232,13 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Con
194232

195233
func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
196234
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
235+
// Clean up references in the reference tracker
236+
gaKey := k8s.NamespacedName(ga)
237+
r.referenceTracker.RemoveGA(gaKey)
238+
239+
// Clean up resource watches
240+
r.endpointResourcesManager.RemoveGA(gaKey)
241+
197242
// TODO: Implement cleanup logic for AWS Global Accelerator resources (Only cleaning up accelerator for now)
198243
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
199244
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
@@ -224,6 +269,29 @@ func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi
224269

225270
func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
226271
r.logger.Info("Reconciling GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
272+
273+
// Get all endpoints from GA
274+
endpoints := aga.GetAllEndpointsFromGA(ga)
275+
276+
// Track referenced endpoints
277+
r.referenceTracker.UpdateReferencesForGA(ga, endpoints)
278+
279+
// Update resource watches with the endpointResourcesManager
280+
r.endpointResourcesManager.MonitorEndpointResources(ga, endpoints)
281+
282+
// Validate and load endpoint status using the endpoint loader
283+
_, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
284+
if len(fatalErrors) > 0 {
285+
err := fmt.Errorf("failed to load endpoints: %v", fatalErrors[0])
286+
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedEndpointLoad, fmt.Sprintf("Failed to reconcile due to %v", err))
287+
r.logger.Error(err, fmt.Sprintf("fatal error loading endpoints for %v", k8s.NamespacedName(ga)))
288+
// Handle other endpoint loading errors
289+
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.EndpointLoadFailed, err.Error()); statusErr != nil {
290+
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after endpoint load failure")
291+
}
292+
return err
293+
}
294+
227295
var stack core.Stack
228296
var accelerator *agamodel.Accelerator
229297
var err error
@@ -232,6 +300,8 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
232300
}
233301
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageBuildModel, buildModelFn)
234302
if err != nil {
303+
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GatewayEventReasonFailedBuildModel, fmt.Sprintf("Failed to build model: %v", err))
304+
r.logger.Error(err, fmt.Sprintf("Failed to build model for: %v", k8s.NamespacedName(ga)))
235305
// Update status to indicate model building failure
236306
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.ModelBuildFailed, fmt.Sprintf("Failed to build model: %v", err)); statusErr != nil {
237307
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after model build failure")
@@ -246,7 +316,7 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
246316
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageDeployStack, deployStackFn)
247317
if err != nil {
248318
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedDeploy, fmt.Sprintf("Failed to deploy stack due to %v", err))
249-
319+
r.logger.Error(err, fmt.Sprintf("Failed to deploy stack for: %v", k8s.NamespacedName(ga)))
250320
// Update status to indicate deployment failure
251321
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.DeploymentFailed, fmt.Sprintf("Failed to deploy stack: %v", err)); statusErr != nil {
252322
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after deployment failure")
@@ -335,21 +405,91 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
335405
return nil
336406
}
337407

338-
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
408+
// Create event channels for dedicated watchers
409+
r.serviceEventChan = make(chan event.GenericEvent)
410+
r.ingressEventChan = make(chan event.GenericEvent)
411+
r.gatewayEventChan = make(chan event.GenericEvent)
412+
413+
// Initialize Gateway API client using the same config
414+
gwClient, err := gwclientset.NewForConfig(mgr.GetConfig())
415+
if err != nil {
416+
r.logger.Error(err, "Failed to create Gateway API client")
339417
return err
340418
}
341419

342-
// TODO: Add event handlers for Services, Ingresses, and Gateways
343-
// that are referenced by GlobalAccelerator endpoints
420+
// Initialize the endpoint resources manager with clients
421+
r.endpointResourcesManager = aga.NewEndpointResourcesManager(
422+
clientSet,
423+
gwClient,
424+
r.serviceEventChan,
425+
r.ingressEventChan,
426+
r.gatewayEventChan,
427+
r.logger.WithName("endpoint-resources-manager"),
428+
)
429+
430+
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
431+
return err
432+
}
344433

345-
return ctrl.NewControllerManagedBy(mgr).
434+
// Set up the controller builder
435+
ctrl, err := ctrl.NewControllerManagedBy(mgr).
346436
For(&agaapi.GlobalAccelerator{}).
347437
Named(controllerName).
348438
WithOptions(controller.Options{
349439
MaxConcurrentReconciles: r.maxConcurrentReconciles,
350440
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Second, r.maxExponentialBackoffDelay),
351441
}).
352-
Complete(r)
442+
Build(r)
443+
444+
if err != nil {
445+
return err
446+
}
447+
448+
// Setup watches for resource events
449+
if err := r.setupGlobalAcceleratorWatches(ctrl); err != nil {
450+
return err
451+
}
452+
453+
return nil
454+
}
455+
456+
// setupGlobalAcceleratorWatches sets up watches for resources that can trigger reconciliation of GlobalAccelerator objects
457+
func (r *globalAcceleratorReconciler) setupGlobalAcceleratorWatches(c controller.Controller) error {
458+
loggerPrefix := r.logger.WithName("eventHandlers")
459+
460+
// Create handlers for our dedicated watchers
461+
serviceHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
462+
aga.ServiceResourceType,
463+
r.referenceTracker,
464+
loggerPrefix.WithName("service-handler"),
465+
)
466+
467+
ingressHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
468+
aga.IngressResourceType,
469+
r.referenceTracker,
470+
loggerPrefix.WithName("ingress-handler"),
471+
)
472+
473+
gatewayHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
474+
aga.GatewayResourceType,
475+
r.referenceTracker,
476+
loggerPrefix.WithName("gateway-handler"),
477+
)
478+
479+
// Add watches using the channel sources with event handlers
480+
if err := c.Watch(source.Channel(r.serviceEventChan, serviceHandler)); err != nil {
481+
return err
482+
}
483+
484+
if err := c.Watch(source.Channel(r.ingressEventChan, ingressHandler)); err != nil {
485+
return err
486+
}
487+
488+
if err := c.Watch(source.Channel(r.gatewayEventChan, gatewayHandler)); err != nil {
489+
return err
490+
}
491+
492+
return nil
353493
}
354494

355495
func (r *globalAcceleratorReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ require (
2525
github.com/golang/mock v1.6.0
2626
github.com/google/go-cmp v0.7.0
2727
github.com/google/uuid v1.6.0
28+
github.com/hashicorp/golang-lru v1.0.2
2829
github.com/onsi/ginkgo/v2 v2.23.3
2930
github.com/onsi/gomega v1.37.0
3031
github.com/pkg/errors v0.9.1
@@ -148,6 +149,7 @@ require (
148149
github.com/sirupsen/logrus v1.9.3 // indirect
149150
github.com/spf13/cast v1.7.0 // indirect
150151
github.com/spf13/cobra v1.9.1 // indirect
152+
github.com/stretchr/objx v0.5.2 // indirect
151153
github.com/valyala/bytebufferpool v1.0.0 // indirect
152154
github.com/valyala/fasthttp v1.34.0 // indirect
153155
github.com/x448/float16 v0.8.4 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
228228
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
229229
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
230230
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
231+
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
232+
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
231233
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 h1:l2zaLDubNhW4XO3LnliVj0GXO3+/CGNJAg1dcN2Fpfw=
232234
github.com/hashicorp/golang-lru/arc/v2 v2.0.5/go.mod h1:ny6zBSQZi2JxIeYcv7kt2sH2PXJtirBN7RDhRpxPkxU=
233235
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=

0 commit comments

Comments
 (0)