Skip to content

Commit 5f3bf93

Browse files
embikvincepristtts
committed
Add support to builder for cluster-aware controllers
On-behalf-of: SAP [email protected] Co-authored-by: Vince Prignano <[email protected]> Co-authored-by: Dr. Stefan Schimanski <[email protected]> Signed-off-by: Marvin Beckers <[email protected]>
1 parent 172b2ec commit 5f3bf93

File tree

1 file changed

+153
-55
lines changed

1 file changed

+153
-55
lines changed

pkg/builder/controller.go

+153-55
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package builder
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"reflect"
@@ -25,10 +26,12 @@ import (
2526
"github.com/go-logr/logr"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/client-go/util/workqueue"
2830
"k8s.io/klog/v2"
2931

3032
"sigs.k8s.io/controller-runtime/pkg/client"
3133
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
34+
"sigs.k8s.io/controller-runtime/pkg/cluster"
3235
"sigs.k8s.io/controller-runtime/pkg/controller"
3336
"sigs.k8s.io/controller-runtime/pkg/handler"
3437
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -37,6 +40,9 @@ import (
3740
"sigs.k8s.io/controller-runtime/pkg/source"
3841
)
3942

43+
// Supporting mocking out functions for testing.
44+
var getGvk = apiutil.GVKForObject
45+
4046
// project represents other forms that we can use to
4147
// send/receive a given resource (metadata-only, unstructured, etc).
4248
type objectProjection int
@@ -48,23 +54,30 @@ const (
4854
projectAsMetadata
4955
)
5056

57+
// clusterWatcher sets up watches between a cluster and a controller.
58+
type typedClusterWatcher[request comparable] struct {
59+
ctrl controller.TypedController[request]
60+
forInput ForInput
61+
ownsInput []OwnsInput
62+
watchesInput []WatchesInput[request]
63+
globalPredicates []predicate.Predicate
64+
clusterAwareRawSources []source.TypedClusterAwareSource[request]
65+
handlerWrapper func(handler.TypedEventHandler[client.Object, request], cluster.Cluster) handler.TypedEventHandler[client.Object, request]
66+
}
67+
5168
// Builder builds a Controller.
5269
type Builder = TypedBuilder[reconcile.Request]
5370

5471
// TypedBuilder builds a Controller. The request is the request type
5572
// that is passed to the workqueue and then to the Reconciler.
5673
// The workqueue de-duplicates identical requests.
5774
type TypedBuilder[request comparable] struct {
58-
forInput ForInput
59-
ownsInput []OwnsInput
60-
rawSources []source.TypedSource[request]
61-
watchesInput []WatchesInput[request]
62-
mgr manager.Manager
63-
globalPredicates []predicate.Predicate
64-
ctrl controller.TypedController[request]
65-
ctrlOptions controller.TypedOptions[request]
66-
name string
67-
newController func(name string, mgr manager.Manager, options controller.TypedOptions[request]) (controller.TypedController[request], error)
75+
typedClusterWatcher[request]
76+
mgr manager.Manager
77+
ctrlOptions controller.TypedOptions[request]
78+
name string
79+
rawSources []source.TypedSource[request]
80+
newController func(name string, mgr manager.Manager, options controller.TypedOptions[request]) (controller.TypedController[request], error)
6881
}
6982

7083
// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
@@ -216,8 +229,12 @@ func (blder *TypedBuilder[request]) WatchesMetadata(
216229
//
217230
// WatchesRawSource makes it possible to use typed handlers and predicates with `source.Kind` as well as custom source implementations.
218231
func (blder *TypedBuilder[request]) WatchesRawSource(src source.TypedSource[request]) *TypedBuilder[request] {
219-
blder.rawSources = append(blder.rawSources, src)
232+
if src, ok := src.(source.TypedClusterAwareSource[request]); ok {
233+
blder.clusterAwareRawSources = append(blder.clusterAwareRawSources, src)
234+
return blder
235+
}
220236

237+
blder.rawSources = append(blder.rawSources, src)
221238
return blder
222239
}
223240

@@ -232,6 +249,12 @@ func (blder *TypedBuilder[request]) WithEventFilter(p predicate.Predicate) *Type
232249
return blder
233250
}
234251

252+
// WithHandlerWrapper sets an optional wrapper function that is applied to the TypedEventHandler for watches set up via the Watches func.
253+
func (blder *TypedBuilder[request]) WithHandlerWrapper(handlerWrapper func(handler.TypedEventHandler[client.Object, request], cluster.Cluster) handler.TypedEventHandler[client.Object, request]) *TypedBuilder[request] {
254+
blder.handlerWrapper = handlerWrapper
255+
return blder
256+
}
257+
235258
// WithOptions overrides the controller options used in doController. Defaults to empty.
236259
func (blder *TypedBuilder[request]) WithOptions(options controller.TypedOptions[request]) *TypedBuilder[request] {
237260
blder.ctrlOptions = options
@@ -279,35 +302,33 @@ func (blder *TypedBuilder[request]) Build(r reconcile.TypedReconciler[request])
279302
return nil, err
280303
}
281304

305+
if blder.ctrlOptions.EngageWithDefaultCluster == nil {
306+
blder.ctrlOptions.EngageWithDefaultCluster = blder.mgr.GetControllerOptions().EngageWithDefaultCluster
307+
}
308+
309+
if blder.ctrlOptions.EngageWithProviderClusters == nil {
310+
blder.ctrlOptions.EngageWithProviderClusters = blder.mgr.GetControllerOptions().EngageWithProviderClusters
311+
}
312+
282313
// Set the Watch
283314
if err := blder.doWatch(); err != nil {
284315
return nil, err
285316
}
286317

287-
return blder.ctrl, nil
288-
}
289-
290-
func (blder *TypedBuilder[request]) project(obj client.Object, proj objectProjection) (client.Object, error) {
291-
switch proj {
292-
case projectAsNormal:
293-
return obj, nil
294-
case projectAsMetadata:
295-
metaObj := &metav1.PartialObjectMetadata{}
296-
gvk, err := apiutil.GVKForObject(obj, blder.mgr.GetScheme())
297-
if err != nil {
298-
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
318+
if *blder.ctrlOptions.EngageWithProviderClusters {
319+
// wrap as cluster.Aware to be engaged with provider clusters on demand
320+
if err := blder.mgr.Add(controller.NewTypedMultiClusterController(blder.ctrl, &blder.typedClusterWatcher)); err != nil {
321+
return nil, err
299322
}
300-
metaObj.SetGroupVersionKind(gvk)
301-
return metaObj, nil
302-
default:
303-
panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
304323
}
324+
325+
return blder.ctrl, nil
305326
}
306327

307-
func (blder *TypedBuilder[request]) doWatch() error {
328+
func (cc *typedClusterWatcher[request]) Watch(ctx context.Context, cl cluster.Cluster) error {
308329
// Reconcile type
309-
if blder.forInput.object != nil {
310-
obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
330+
if cc.forInput.object != nil {
331+
obj, err := project(cl, cc.forInput.object, cc.forInput.objectProjection)
311332
if err != nil {
312333
return err
313334
}
@@ -318,20 +339,16 @@ func (blder *TypedBuilder[request]) doWatch() error {
318339

319340
var hdler handler.TypedEventHandler[client.Object, request]
320341
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{}))
321-
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
322-
allPredicates = append(allPredicates, blder.forInput.predicates...)
323-
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
324-
if err := blder.ctrl.Watch(src); err != nil {
342+
allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...)
343+
allPredicates = append(allPredicates, cc.forInput.predicates...)
344+
src := &ctxBoundedSyncingSource[request]{ctx: ctx, src: source.TypedKind(cl.GetCache(), obj, hdler, allPredicates...)}
345+
if err := cc.ctrl.Watch(src); err != nil {
325346
return err
326347
}
327348
}
328349

329-
// Watches the managed types
330-
if len(blder.ownsInput) > 0 && blder.forInput.object == nil {
331-
return errors.New("Owns() can only be used together with For()")
332-
}
333-
for _, own := range blder.ownsInput {
334-
obj, err := blder.project(own.object, own.objectProjection)
350+
for _, own := range cc.ownsInput {
351+
obj, err := project(cl, own.object, own.objectProjection)
335352
if err != nil {
336353
return err
337354
}
@@ -342,37 +359,68 @@ func (blder *TypedBuilder[request]) doWatch() error {
342359

343360
var hdler handler.TypedEventHandler[client.Object, request]
344361
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner(
345-
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
346-
blder.forInput.object,
362+
cl.GetScheme(), cl.GetRESTMapper(),
363+
cc.forInput.object,
347364
opts...,
348365
)))
349-
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
366+
allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...)
350367
allPredicates = append(allPredicates, own.predicates...)
351-
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
352-
if err := blder.ctrl.Watch(src); err != nil {
368+
src := &ctxBoundedSyncingSource[request]{ctx: ctx, src: source.TypedKind(cl.GetCache(), obj, hdler, allPredicates...)}
369+
if err := cc.ctrl.Watch(src); err != nil {
353370
return err
354371
}
355372
}
356373

357-
// Do the watch requests
358-
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
359-
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
360-
}
361-
for _, w := range blder.watchesInput {
362-
projected, err := blder.project(w.obj, w.objectProjection)
374+
for _, w := range cc.watchesInput {
375+
projected, err := project(cl, w.obj, w.objectProjection)
363376
if err != nil {
364377
return fmt.Errorf("failed to project for %T: %w", w.obj, err)
365378
}
366-
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
379+
allPredicates := append([]predicate.Predicate(nil), cc.globalPredicates...)
367380
allPredicates = append(allPredicates, w.predicates...)
368-
if err := blder.ctrl.Watch(source.TypedKind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
381+
382+
handler := w.handler
383+
if cc.handlerWrapper != nil {
384+
handler = cc.handlerWrapper(handler, cl)
385+
}
386+
387+
src := &ctxBoundedSyncingSource[request]{ctx: ctx, src: source.TypedKind(cl.GetCache(), projected, handler, allPredicates...)}
388+
if err := cc.ctrl.Watch(src); err != nil {
389+
return err
390+
}
391+
}
392+
393+
for _, src := range cc.clusterAwareRawSources {
394+
if err := cc.ctrl.Watch(src); err != nil {
369395
return err
370396
}
371397
}
372-
for _, src := range blder.rawSources {
373-
if err := blder.ctrl.Watch(src); err != nil {
398+
399+
return nil
400+
}
401+
402+
func (blder *TypedBuilder[request]) doWatch() error {
403+
// Pre-checks for a valid configuration
404+
if len(blder.ownsInput) > 0 && blder.forInput.object == nil {
405+
return errors.New("Owns() can only be used together with For()")
406+
}
407+
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
408+
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
409+
}
410+
if !*blder.ctrlOptions.EngageWithDefaultCluster && len(blder.rawSources) > 0 {
411+
return errors.New("when using a cluster adapter without watching the default cluster, non-cluster-aware custom raw watches are not allowed")
412+
}
413+
414+
if *blder.ctrlOptions.EngageWithDefaultCluster {
415+
if err := blder.Watch(unboundedContext, blder.mgr); err != nil {
374416
return err
375417
}
418+
419+
for _, src := range blder.rawSources {
420+
if err := blder.ctrl.Watch(src); err != nil {
421+
return err
422+
}
423+
}
376424
}
377425
return nil
378426
}
@@ -464,3 +512,53 @@ func (blder *TypedBuilder[request]) doController(r reconcile.TypedReconciler[req
464512
blder.ctrl, err = blder.newController(controllerName, blder.mgr, ctrlOptions)
465513
return err
466514
}
515+
516+
func project(cl cluster.Cluster, obj client.Object, proj objectProjection) (client.Object, error) {
517+
switch proj {
518+
case projectAsNormal:
519+
return obj, nil
520+
case projectAsMetadata:
521+
metaObj := &metav1.PartialObjectMetadata{}
522+
gvk, err := getGvk(obj, cl.GetScheme())
523+
if err != nil {
524+
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
525+
}
526+
metaObj.SetGroupVersionKind(gvk)
527+
return metaObj, nil
528+
default:
529+
panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
530+
}
531+
}
532+
533+
// ctxBoundedSyncingSource implements source.SyncingSource and wraps the ctx
534+
// passed to the methods into the life-cycle of another context, i.e. stop
535+
// whenever one of the contexts is done.
536+
type ctxBoundedSyncingSource[request comparable] struct {
537+
ctx context.Context
538+
src source.TypedSyncingSource[request]
539+
}
540+
541+
var unboundedContext context.Context = nil //nolint:revive // keep nil explicit for clarity.
542+
543+
var _ source.SyncingSource = &ctxBoundedSyncingSource[reconcile.Request]{}
544+
545+
func (s *ctxBoundedSyncingSource[request]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[request]) error {
546+
return s.src.Start(joinContexts(ctx, s.ctx), q)
547+
}
548+
549+
func (s *ctxBoundedSyncingSource[request]) WaitForSync(ctx context.Context) error {
550+
return s.src.WaitForSync(joinContexts(ctx, s.ctx))
551+
}
552+
553+
func joinContexts(ctx, bound context.Context) context.Context {
554+
if bound == unboundedContext {
555+
return ctx
556+
}
557+
558+
ctx, cancel := context.WithCancel(ctx)
559+
go func() {
560+
defer cancel()
561+
<-bound.Done()
562+
}()
563+
return ctx
564+
}

0 commit comments

Comments
 (0)