Skip to content

Commit 172b2ec

Browse files
embikvincepristtts
committed
Add experimental support for pluggable cluster providers to manager
On-behalf-of: SAP [email protected] Co-authored-by: Vince Prignano <[email protected]> Co-authored-by: Dr. Stefan Schimanski <[email protected]> Signed-off-by: Marvin Beckers <[email protected]>
1 parent 36914de commit 172b2ec

File tree

5 files changed

+195
-25
lines changed

5 files changed

+195
-25
lines changed

pkg/config/controller.go

+12
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,16 @@ type Controller struct {
5353
// NeedLeaderElection indicates whether the controller needs to use leader election.
5454
// Defaults to true, which means the controller will use leader election.
5555
NeedLeaderElection *bool
56+
57+
// EngageWithDefaultCluster indicates whether the controller should engage
58+
// with the default cluster. This default to false if a cluster provider
59+
// is configured, and to true otherwise.
60+
//
61+
// This is an experimental feature and is subject to change.
62+
EngageWithDefaultCluster *bool
63+
64+
// EngageWithProvidedClusters indicates whether the controller should engage
65+
// with the provided clusters of the manager. This defaults to true if a
66+
// cluster provider is set, and to false otherwise.
67+
EngageWithProviderClusters *bool
5668
}

pkg/manager/internal.go

+139-13
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ const (
5959
)
6060

6161
var _ Runnable = &controllerManager{}
62+
var _ cluster.Aware = &controllerManager{}
63+
var _ Manager = &controllerManager{}
6264

6365
type controllerManager struct {
6466
sync.Mutex
@@ -68,8 +70,14 @@ type controllerManager struct {
6870
errChan chan error
6971
runnables *runnables
7072

71-
// cluster holds a variety of methods to interact with a cluster. Required.
72-
cluster cluster.Cluster
73+
// defaultCluster holds a variety of methods to interact with a defaultCluster. Required.
74+
defaultCluster cluster.Cluster
75+
defaultClusterOptions cluster.Option
76+
77+
// engagedCluster is a map of engaged clusters. The can come and go as the manager is running.
78+
engagedClustersLock sync.RWMutex
79+
engagedClusters map[string]cluster.Cluster
80+
clusterAwareRunnables []cluster.Aware
7381

7482
// recorderProvider is used to generate event recorders that will be injected into Controllers
7583
// (and EventHandlers, Sources and Predicates).
@@ -161,6 +169,9 @@ type controllerManager struct {
161169
// internalProceduresStop channel is used internally to the manager when coordinating
162170
// the proper shutdown of servers. This channel is also used for dependency injection.
163171
internalProceduresStop chan struct{}
172+
173+
// clusterProvider is used to get clusters by name, beyond the default cluster.
174+
clusterProvider cluster.Provider
164175
}
165176

166177
type hasCache interface {
@@ -176,7 +187,40 @@ func (cm *controllerManager) Add(r Runnable) error {
176187
}
177188

178189
func (cm *controllerManager) add(r Runnable) error {
179-
return cm.runnables.Add(r)
190+
var engaged []cluster.Aware
191+
var errs []error
192+
disengage := func() {
193+
for _, aware := range engaged {
194+
if err := aware.Disengage(cm.internalCtx, cm.defaultCluster); err != nil {
195+
errs = append(errs, err)
196+
}
197+
}
198+
}
199+
200+
// engage with existing clusters (this is reversible)
201+
if aware, ok := r.(cluster.Aware); ok {
202+
cm.engagedClustersLock.RLock()
203+
defer cm.engagedClustersLock.RUnlock()
204+
for _, cl := range cm.engagedClusters {
205+
if err := aware.Engage(cm.internalCtx, cl); err != nil {
206+
errs = append(errs, err)
207+
break
208+
}
209+
engaged = append(engaged, aware)
210+
}
211+
if len(errs) > 0 {
212+
disengage()
213+
return kerrors.NewAggregate(errs)
214+
}
215+
cm.clusterAwareRunnables = append(cm.clusterAwareRunnables, aware)
216+
} else {
217+
if err := cm.runnables.Add(r); err != nil {
218+
disengage()
219+
return err
220+
}
221+
}
222+
223+
return nil
180224
}
181225

182226
// AddMetricsServerExtraHandler adds extra handler served on path to the http server that serves metrics.
@@ -231,40 +275,58 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker)
231275
return nil
232276
}
233277

278+
func (cm *controllerManager) Name() string {
279+
return cm.defaultCluster.Name()
280+
}
281+
282+
func (cm *controllerManager) GetCluster(ctx context.Context, clusterName string) (cluster.Cluster, error) {
283+
if clusterName == "" || clusterName == cm.defaultCluster.Name() {
284+
return cm.defaultCluster, nil
285+
}
286+
287+
if cm.clusterProvider == nil {
288+
return nil, fmt.Errorf("cluster %q not found, cluster provider is not set", clusterName)
289+
}
290+
291+
// intentionally not returning from engaged clusters. This can be used
292+
// without engaging clusters.
293+
return cm.clusterProvider.Get(ctx, clusterName)
294+
}
295+
234296
func (cm *controllerManager) GetHTTPClient() *http.Client {
235-
return cm.cluster.GetHTTPClient()
297+
return cm.defaultCluster.GetHTTPClient()
236298
}
237299

238300
func (cm *controllerManager) GetConfig() *rest.Config {
239-
return cm.cluster.GetConfig()
301+
return cm.defaultCluster.GetConfig()
240302
}
241303

242304
func (cm *controllerManager) GetClient() client.Client {
243-
return cm.cluster.GetClient()
305+
return cm.defaultCluster.GetClient()
244306
}
245307

246308
func (cm *controllerManager) GetScheme() *runtime.Scheme {
247-
return cm.cluster.GetScheme()
309+
return cm.defaultCluster.GetScheme()
248310
}
249311

250312
func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer {
251-
return cm.cluster.GetFieldIndexer()
313+
return cm.defaultCluster.GetFieldIndexer()
252314
}
253315

254316
func (cm *controllerManager) GetCache() cache.Cache {
255-
return cm.cluster.GetCache()
317+
return cm.defaultCluster.GetCache()
256318
}
257319

258320
func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder {
259-
return cm.cluster.GetEventRecorderFor(name)
321+
return cm.defaultCluster.GetEventRecorderFor(name)
260322
}
261323

262324
func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
263-
return cm.cluster.GetRESTMapper()
325+
return cm.defaultCluster.GetRESTMapper()
264326
}
265327

266328
func (cm *controllerManager) GetAPIReader() client.Reader {
267-
return cm.cluster.GetAPIReader()
329+
return cm.defaultCluster.GetAPIReader()
268330
}
269331

270332
func (cm *controllerManager) GetWebhookServer() webhook.Server {
@@ -381,7 +443,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
381443
}()
382444

383445
// Add the cluster runnable.
384-
if err := cm.add(cm.cluster); err != nil {
446+
if err := cm.add(cm.defaultCluster); err != nil {
385447
return fmt.Errorf("failed to add cluster to runnables: %w", err)
386448
}
387449

@@ -614,6 +676,70 @@ func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector,
614676
return leaderElector, nil
615677
}
616678

679+
func (cm *controllerManager) Engage(ctx context.Context, cl cluster.Cluster) error {
680+
cm.Lock()
681+
defer cm.Unlock()
682+
683+
// be reentrant via noop
684+
cm.engagedClustersLock.RLock()
685+
if _, ok := cm.engagedClusters[cl.Name()]; ok {
686+
cm.engagedClustersLock.RUnlock()
687+
return nil
688+
}
689+
cm.engagedClustersLock.RUnlock()
690+
691+
// add early because any engaged runnable could access it
692+
cm.engagedClustersLock.Lock()
693+
cm.engagedClusters[cl.Name()] = cl
694+
cm.engagedClustersLock.Unlock()
695+
696+
// engage known runnables
697+
var errs []error
698+
engaged := []cluster.Aware{}
699+
for _, r := range cm.clusterAwareRunnables {
700+
if err := r.Engage(ctx, cl); err != nil {
701+
errs = append(errs, err)
702+
break
703+
}
704+
engaged = append(engaged, r)
705+
}
706+
707+
// clean-up
708+
if len(errs) > 0 {
709+
for _, aware := range engaged {
710+
if err := aware.Disengage(ctx, cl); err != nil {
711+
errs = append(errs, err)
712+
}
713+
}
714+
715+
cm.engagedClustersLock.Lock()
716+
delete(cm.engagedClusters, cl.Name())
717+
cm.engagedClustersLock.Unlock()
718+
719+
return kerrors.NewAggregate(errs)
720+
}
721+
722+
return nil
723+
}
724+
725+
func (cm *controllerManager) Disengage(ctx context.Context, cl cluster.Cluster) error {
726+
cm.Lock()
727+
defer cm.Unlock()
728+
729+
var errs []error
730+
for _, r := range cm.clusterAwareRunnables {
731+
if err := r.Disengage(ctx, cl); err != nil {
732+
errs = append(errs, err)
733+
}
734+
}
735+
736+
cm.engagedClustersLock.Lock()
737+
delete(cm.engagedClusters, cl.Name())
738+
cm.engagedClustersLock.Unlock()
739+
740+
return kerrors.NewAggregate(errs)
741+
}
742+
617743
func (cm *controllerManager) startLeaderElectionRunnables() error {
618744
return cm.runnables.LeaderElection.Start(cm.internalCtx)
619745
}

pkg/manager/manager.go

+31-8
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ type Manager interface {
5353
// Cluster holds a variety of methods to interact with a cluster.
5454
cluster.Cluster
5555

56+
// Aware is an interface for dynamic cluster addition and removal. The
57+
// Manager will call Engage and Disengage on cluster-aware runnables like
58+
// controllers to e.g. watch multiple clusters.
59+
cluster.Aware
60+
5661
// Add will set requested dependencies on the component, and cause the component to be
5762
// started when Start is called.
5863
// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
@@ -87,6 +92,10 @@ type Manager interface {
8792
// lock was lost.
8893
Start(ctx context.Context) error
8994

95+
// GetCluster retrieves a Cluster from a given identifying cluster name. An
96+
// empty string will return the default cluster of the manager.
97+
GetCluster(ctx context.Context, clusterName string) (cluster.Cluster, error)
98+
9099
// GetWebhookServer returns a webhook.Server
91100
GetWebhookServer() webhook.Server
92101

@@ -281,6 +290,11 @@ type Options struct {
281290
newMetricsServer func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error)
282291
newHealthProbeListener func(addr string) (net.Listener, error)
283292
newPprofListener func(addr string) (net.Listener, error)
293+
294+
// ExperimentalClusterProvider is an EXPERIMENTAL feature that allows the manager to
295+
// operate against many Kubernetes clusters at once. Individual clusters can
296+
// be accessed by calling GetCluster on the Manager.
297+
ExperimentalClusterProvider cluster.Provider
284298
}
285299

286300
// BaseContextFunc is a function used to provide a base Context to Runnables
@@ -325,7 +339,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
325339
// Set default values for options fields
326340
options = setOptionsDefaults(options)
327341

328-
cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
342+
clusterOptions := func(clusterOptions *cluster.Options) {
329343
clusterOptions.Scheme = options.Scheme
330344
clusterOptions.MapperProvider = options.MapperProvider
331345
clusterOptions.Logger = options.Logger
@@ -334,7 +348,9 @@ func New(config *rest.Config, options Options) (Manager, error) {
334348
clusterOptions.Cache = options.Cache
335349
clusterOptions.Client = options.Client
336350
clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
337-
})
351+
}
352+
353+
cl, err := cluster.New(config, clusterOptions)
338354
if err != nil {
339355
return nil, err
340356
}
@@ -347,7 +363,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
347363
// Create the recorder provider to inject event recorders for the components.
348364
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
349365
// to the particular controller that it's being injected into, rather than a generic one like is here.
350-
recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
366+
recorderProvider, err := options.newRecorderProvider(config, cl.GetHTTPClient(), cl.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
351367
if err != nil {
352368
return nil, err
353369
}
@@ -361,7 +377,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
361377
leaderRecorderProvider = recorderProvider
362378
} else {
363379
leaderConfig = rest.CopyConfig(options.LeaderElectionConfig)
364-
scheme := cluster.GetScheme()
380+
scheme := cl.GetScheme()
365381
err := corev1.AddToScheme(scheme)
366382
if err != nil {
367383
return nil, err
@@ -396,7 +412,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
396412
}
397413

398414
// Create the metrics server.
399-
metricsServer, err := options.newMetricsServer(options.Metrics, config, cluster.GetHTTPClient())
415+
metricsServer, err := options.newMetricsServer(options.Metrics, config, cl.GetHTTPClient())
400416
if err != nil {
401417
return nil, err
402418
}
@@ -416,11 +432,13 @@ func New(config *rest.Config, options Options) (Manager, error) {
416432
}
417433

418434
errChan := make(chan error, 1)
419-
runnables := newRunnables(options.BaseContext, errChan)
420435
return &controllerManager{
421436
stopProcedureEngaged: ptr.To(int64(0)),
422-
cluster: cluster,
423-
runnables: runnables,
437+
defaultCluster: cl,
438+
defaultClusterOptions: clusterOptions,
439+
clusterProvider: options.ExperimentalClusterProvider,
440+
engagedClusters: make(map[string]cluster.Cluster),
441+
runnables: newRunnables(options.BaseContext, errChan),
424442
errChan: errChan,
425443
recorderProvider: recorderProvider,
426444
resourceLock: resourceLock,
@@ -551,5 +569,10 @@ func setOptionsDefaults(options Options) Options {
551569
options.WebhookServer = webhook.NewServer(webhook.Options{})
552570
}
553571

572+
if options.Controller.EngageWithDefaultCluster == nil {
573+
options.Controller.EngageWithDefaultCluster = ptr.To(options.ExperimentalClusterProvider == nil)
574+
options.Controller.EngageWithProviderClusters = ptr.To(options.ExperimentalClusterProvider != nil)
575+
}
576+
554577
return options
555578
}

pkg/manager/manager_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1823,31 +1823,31 @@ var _ = Describe("manger.Manager", func() {
18231823
Expect(err).NotTo(HaveOccurred())
18241824
mgr, ok := m.(*controllerManager)
18251825
Expect(ok).To(BeTrue())
1826-
Expect(m.GetConfig()).To(Equal(mgr.cluster.GetConfig()))
1826+
Expect(m.GetConfig()).To(Equal(mgr.defaultCluster.GetConfig()))
18271827
})
18281828

18291829
It("should provide a function to get the Client", func() {
18301830
m, err := New(cfg, Options{})
18311831
Expect(err).NotTo(HaveOccurred())
18321832
mgr, ok := m.(*controllerManager)
18331833
Expect(ok).To(BeTrue())
1834-
Expect(m.GetClient()).To(Equal(mgr.cluster.GetClient()))
1834+
Expect(m.GetClient()).To(Equal(mgr.defaultCluster.GetClient()))
18351835
})
18361836

18371837
It("should provide a function to get the Scheme", func() {
18381838
m, err := New(cfg, Options{})
18391839
Expect(err).NotTo(HaveOccurred())
18401840
mgr, ok := m.(*controllerManager)
18411841
Expect(ok).To(BeTrue())
1842-
Expect(m.GetScheme()).To(Equal(mgr.cluster.GetScheme()))
1842+
Expect(m.GetScheme()).To(Equal(mgr.defaultCluster.GetScheme()))
18431843
})
18441844

18451845
It("should provide a function to get the FieldIndexer", func() {
18461846
m, err := New(cfg, Options{})
18471847
Expect(err).NotTo(HaveOccurred())
18481848
mgr, ok := m.(*controllerManager)
18491849
Expect(ok).To(BeTrue())
1850-
Expect(m.GetFieldIndexer()).To(Equal(mgr.cluster.GetFieldIndexer()))
1850+
Expect(m.GetFieldIndexer()).To(Equal(mgr.defaultCluster.GetFieldIndexer()))
18511851
})
18521852

18531853
It("should provide a function to get the EventRecorder", func() {

0 commit comments

Comments
 (0)