Skip to content

Commit f25bda4

Browse files
embikvincepristtts
committed
Add support to builder for cluster-aware controllers
On-behalf-of: SAP [email protected] Co-authored-by: Vince Prignano <[email protected]> Co-authored-by: Dr. Stefan Schimanski <[email protected]> Signed-off-by: Marvin Beckers <[email protected]>
1 parent 172b2ec commit f25bda4

File tree

2 files changed

+156
-55
lines changed

2 files changed

+156
-55
lines changed

pkg/builder/controller.go

+146-55
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package builder
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"reflect"
@@ -25,10 +26,12 @@ import (
2526
"github.com/go-logr/logr"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/client-go/util/workqueue"
2830
"k8s.io/klog/v2"
2931

3032
"sigs.k8s.io/controller-runtime/pkg/client"
3133
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
34+
"sigs.k8s.io/controller-runtime/pkg/cluster"
3235
"sigs.k8s.io/controller-runtime/pkg/controller"
3336
"sigs.k8s.io/controller-runtime/pkg/handler"
3437
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -37,6 +40,9 @@ import (
3740
"sigs.k8s.io/controller-runtime/pkg/source"
3841
)
3942

43+
// Supporting mocking out functions for testing.
44+
var getGvk = apiutil.GVKForObject
45+
4046
// project represents other forms that we can use to
4147
// send/receive a given resource (metadata-only, unstructured, etc).
4248
type objectProjection int
@@ -48,23 +54,29 @@ const (
4854
projectAsMetadata
4955
)
5056

57+
// clusterWatcher sets up watches between a cluster and a controller.
58+
type typedClusterWatcher[request comparable] struct {
59+
ctrl controller.TypedController[request]
60+
forInput ForInput
61+
ownsInput []OwnsInput
62+
watchesInput []WatchesInput[request]
63+
globalPredicates []predicate.Predicate
64+
clusterAwareRawSources []source.TypedClusterAwareSource[request]
65+
}
66+
5167
// Builder builds a Controller.
5268
type Builder = TypedBuilder[reconcile.Request]
5369

5470
// TypedBuilder builds a Controller. The request is the request type
5571
// that is passed to the workqueue and then to the Reconciler.
5672
// The workqueue de-duplicates identical requests.
5773
type TypedBuilder[request comparable] struct {
58-
forInput ForInput
59-
ownsInput []OwnsInput
60-
rawSources []source.TypedSource[request]
61-
watchesInput []WatchesInput[request]
62-
mgr manager.Manager
63-
globalPredicates []predicate.Predicate
64-
ctrl controller.TypedController[request]
65-
ctrlOptions controller.TypedOptions[request]
66-
name string
67-
newController func(name string, mgr manager.Manager, options controller.TypedOptions[request]) (controller.TypedController[request], error)
74+
typedClusterWatcher[request]
75+
mgr manager.Manager
76+
ctrlOptions controller.TypedOptions[request]
77+
name string
78+
rawSources []source.TypedSource[request]
79+
newController func(name string, mgr manager.Manager, options controller.TypedOptions[request]) (controller.TypedController[request], error)
6880
}
6981

7082
// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
@@ -216,8 +228,12 @@ func (blder *TypedBuilder[request]) WatchesMetadata(
216228
//
217229
// WatchesRawSource makes it possible to use typed handlers and predicates with `source.Kind` as well as custom source implementations.
218230
func (blder *TypedBuilder[request]) WatchesRawSource(src source.TypedSource[request]) *TypedBuilder[request] {
219-
blder.rawSources = append(blder.rawSources, src)
231+
if src, ok := src.(source.TypedClusterAwareSource[request]); ok {
232+
blder.clusterAwareRawSources = append(blder.clusterAwareRawSources, src)
233+
return blder
234+
}
220235

236+
blder.rawSources = append(blder.rawSources, src)
221237
return blder
222238
}
223239

@@ -279,35 +295,33 @@ func (blder *TypedBuilder[request]) Build(r reconcile.TypedReconciler[request])
279295
return nil, err
280296
}
281297

298+
if blder.ctrlOptions.EngageWithDefaultCluster == nil {
299+
blder.ctrlOptions.EngageWithDefaultCluster = blder.mgr.GetControllerOptions().EngageWithDefaultCluster
300+
}
301+
302+
if blder.ctrlOptions.EngageWithProviderClusters == nil {
303+
blder.ctrlOptions.EngageWithProviderClusters = blder.mgr.GetControllerOptions().EngageWithProviderClusters
304+
}
305+
282306
// Set the Watch
283307
if err := blder.doWatch(); err != nil {
284308
return nil, err
285309
}
286310

287-
return blder.ctrl, nil
288-
}
289-
290-
func (blder *TypedBuilder[request]) project(obj client.Object, proj objectProjection) (client.Object, error) {
291-
switch proj {
292-
case projectAsNormal:
293-
return obj, nil
294-
case projectAsMetadata:
295-
metaObj := &metav1.PartialObjectMetadata{}
296-
gvk, err := apiutil.GVKForObject(obj, blder.mgr.GetScheme())
297-
if err != nil {
298-
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
311+
if *blder.ctrlOptions.EngageWithProviderClusters {
312+
// wrap as cluster.Aware to be engaged with provider clusters on demand
313+
if err := blder.mgr.Add(controller.NewTypedMultiClusterController(blder.ctrl, &blder.typedClusterWatcher)); err != nil {
314+
return nil, err
299315
}
300-
metaObj.SetGroupVersionKind(gvk)
301-
return metaObj, nil
302-
default:
303-
panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
304316
}
317+
318+
return blder.ctrl, nil
305319
}
306320

307-
func (blder *TypedBuilder[request]) doWatch() error {
321+
func (cc *typedClusterWatcher[request]) Watch(ctx context.Context, cl cluster.Cluster) error {
308322
// Reconcile type
309-
if blder.forInput.object != nil {
310-
obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
323+
if cc.forInput.object != nil {
324+
obj, err := project(cl, cc.forInput.object, cc.forInput.objectProjection)
311325
if err != nil {
312326
return err
313327
}
@@ -318,20 +332,16 @@ func (blder *TypedBuilder[request]) doWatch() error {
318332

319333
var hdler handler.TypedEventHandler[client.Object, request]
320334
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{}))
321-
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
322-
allPredicates = append(allPredicates, blder.forInput.predicates...)
323-
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
324-
if err := blder.ctrl.Watch(src); err != nil {
335+
allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...)
336+
allPredicates = append(allPredicates, cc.forInput.predicates...)
337+
src := &ctxBoundedSyncingSource[request]{ctx: ctx, src: source.TypedKind(cl.GetCache(), obj, hdler, allPredicates...)}
338+
if err := cc.ctrl.Watch(src); err != nil {
325339
return err
326340
}
327341
}
328342

329-
// Watches the managed types
330-
if len(blder.ownsInput) > 0 && blder.forInput.object == nil {
331-
return errors.New("Owns() can only be used together with For()")
332-
}
333-
for _, own := range blder.ownsInput {
334-
obj, err := blder.project(own.object, own.objectProjection)
343+
for _, own := range cc.ownsInput {
344+
obj, err := project(cl, own.object, own.objectProjection)
335345
if err != nil {
336346
return err
337347
}
@@ -342,38 +352,69 @@ func (blder *TypedBuilder[request]) doWatch() error {
342352

343353
var hdler handler.TypedEventHandler[client.Object, request]
344354
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner(
345-
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
346-
blder.forInput.object,
355+
cl.GetScheme(), cl.GetRESTMapper(),
356+
cc.forInput.object,
347357
opts...,
348358
)))
349-
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
359+
allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...)
350360
allPredicates = append(allPredicates, own.predicates...)
351-
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
352-
if err := blder.ctrl.Watch(src); err != nil {
361+
src := &ctxBoundedSyncingSource[request]{ctx: ctx, src: source.TypedKind(cl.GetCache(), obj, hdler, allPredicates...)}
362+
if err := cc.ctrl.Watch(src); err != nil {
353363
return err
354364
}
355365
}
356366

357-
// Do the watch requests
358-
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
359-
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
360-
}
361-
for _, w := range blder.watchesInput {
362-
projected, err := blder.project(w.obj, w.objectProjection)
367+
for _, w := range cc.watchesInput {
368+
projected, err := project(cl, w.obj, w.objectProjection)
363369
if err != nil {
364370
return fmt.Errorf("failed to project for %T: %w", w.obj, err)
365371
}
366-
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
372+
allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...)
367373
allPredicates = append(allPredicates, w.predicates...)
368-
if err := blder.ctrl.Watch(source.TypedKind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
374+
375+
h := w.handler
376+
if deepCopyableHandler, ok := h.(handler.TypedDeepCopyableEventHandler[client.Object, request]); ok {
377+
h = deepCopyableHandler.DeepCopyFor(cl)
378+
}
379+
380+
src := &ctxBoundedSyncingSource[request]{ctx: ctx, src: source.TypedKind(cl.GetCache(), projected, h, allPredicates...)}
381+
if err := cc.ctrl.Watch(src); err != nil {
369382
return err
370383
}
371384
}
372-
for _, src := range blder.rawSources {
373-
if err := blder.ctrl.Watch(src); err != nil {
385+
386+
for _, src := range cc.clusterAwareRawSources {
387+
if err := cc.ctrl.Watch(src); err != nil {
374388
return err
375389
}
376390
}
391+
392+
return nil
393+
}
394+
395+
func (blder *TypedBuilder[request]) doWatch() error {
396+
// Pre-checks for a valid configuration
397+
if len(blder.ownsInput) > 0 && blder.forInput.object == nil {
398+
return errors.New("Owns() can only be used together with For()")
399+
}
400+
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
401+
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
402+
}
403+
if !*blder.ctrlOptions.EngageWithDefaultCluster && len(blder.rawSources) > 0 {
404+
return errors.New("when using a cluster adapter without watching the default cluster, non-cluster-aware custom raw watches are not allowed")
405+
}
406+
407+
if *blder.ctrlOptions.EngageWithDefaultCluster {
408+
if err := blder.Watch(unboundedContext, blder.mgr); err != nil {
409+
return err
410+
}
411+
412+
for _, src := range blder.rawSources {
413+
if err := blder.ctrl.Watch(src); err != nil {
414+
return err
415+
}
416+
}
417+
}
377418
return nil
378419
}
379420

@@ -464,3 +505,53 @@ func (blder *TypedBuilder[request]) doController(r reconcile.TypedReconciler[req
464505
blder.ctrl, err = blder.newController(controllerName, blder.mgr, ctrlOptions)
465506
return err
466507
}
508+
509+
func project(cl cluster.Cluster, obj client.Object, proj objectProjection) (client.Object, error) {
510+
switch proj {
511+
case projectAsNormal:
512+
return obj, nil
513+
case projectAsMetadata:
514+
metaObj := &metav1.PartialObjectMetadata{}
515+
gvk, err := getGvk(obj, cl.GetScheme())
516+
if err != nil {
517+
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
518+
}
519+
metaObj.SetGroupVersionKind(gvk)
520+
return metaObj, nil
521+
default:
522+
panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
523+
}
524+
}
525+
526+
// ctxBoundedSyncingSource implements source.SyncingSource and wraps the ctx
527+
// passed to the methods into the life-cycle of another context, i.e. stop
528+
// whenever one of the contexts is done.
529+
type ctxBoundedSyncingSource[request comparable] struct {
530+
ctx context.Context
531+
src source.TypedSyncingSource[request]
532+
}
533+
534+
var unboundedContext context.Context = nil //nolint:revive // keep nil explicit for clarity.
535+
536+
var _ source.SyncingSource = &ctxBoundedSyncingSource[reconcile.Request]{}
537+
538+
func (s *ctxBoundedSyncingSource[request]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[request]) error {
539+
return s.src.Start(joinContexts(ctx, s.ctx), q)
540+
}
541+
542+
func (s *ctxBoundedSyncingSource[request]) WaitForSync(ctx context.Context) error {
543+
return s.src.WaitForSync(joinContexts(ctx, s.ctx))
544+
}
545+
546+
func joinContexts(ctx, bound context.Context) context.Context {
547+
if bound == unboundedContext {
548+
return ctx
549+
}
550+
551+
ctx, cancel := context.WithCancel(ctx)
552+
go func() {
553+
defer cancel()
554+
<-bound.Done()
555+
}()
556+
return ctx
557+
}

pkg/handler/eventhandler.go

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"k8s.io/client-go/util/workqueue"
2323
"sigs.k8s.io/controller-runtime/pkg/client"
24+
"sigs.k8s.io/controller-runtime/pkg/cluster"
2425
"sigs.k8s.io/controller-runtime/pkg/event"
2526
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2627
)
@@ -80,6 +81,15 @@ type TypedEventHandler[object any, request comparable] interface {
8081
Generic(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
8182
}
8283

84+
// TypedDeepCopyableEventHandler embeds a TypedEventHandler, but supports being deep copied for a
85+
// specific cluster.Cluster object. In multi-cluster scenarios, any event handler that stores information
86+
// about the cluster in its own state (versus extracting this information from the object passed via the event)
87+
// should support this interface to create a new instance of an event handler when new clusters get engaged.
88+
type TypedDeepCopyableEventHandler[object any, request comparable] interface {
89+
TypedEventHandler[object, request]
90+
DeepCopyFor(c cluster.Cluster) TypedDeepCopyableEventHandler[object, request]
91+
}
92+
8393
var _ EventHandler = Funcs{}
8494

8595
// Funcs implements eventhandler.

0 commit comments

Comments
 (0)