diff --git a/vertical-pod-autoscaler/pkg/admission-controller/main.go b/vertical-pod-autoscaler/pkg/admission-controller/main.go index efb633bad6a6..b2850e88da03 100644 --- a/vertical-pod-autoscaler/pkg/admission-controller/main.go +++ b/vertical-pod-autoscaler/pkg/admission-controller/main.go @@ -38,6 +38,7 @@ import ( "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/recommendation" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/vpa" vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" + vpa_informers "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/informers/externalversions" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/features" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target" controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher" @@ -99,10 +100,12 @@ func main() { config := common.CreateKubeConfigOrDie(commonFlags.KubeConfig, float32(commonFlags.KubeApiQps), int(commonFlags.KubeApiBurst)) - vpaClient := vpa_clientset.NewForConfigOrDie(config) - vpaLister := vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), commonFlags.VpaObjectNamespace) kubeClient := kube_client.NewForConfigOrDie(config) factory := informers.NewSharedInformerFactory(kubeClient, defaultResyncPeriod) + + vpaClient := vpa_clientset.NewForConfigOrDie(config) + vpaFactory := vpa_informers.NewSharedInformerFactoryWithOptions(vpaClient, 1*time.Hour, vpa_informers.WithNamespace(commonFlags.VpaObjectNamespace)) + targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(config, kubeClient, factory) controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor) podPreprocessor := pod.NewDefaultPreProcessor() @@ -114,11 +117,13 @@ func main() { limitRangeCalculator = limitrange.NewNoopLimitsCalculator() } recommendationProvider := recommendation.NewProvider(limitRangeCalculator, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator)) + vpaLister := vpa_api_util.NewVpasListerFromFactory(vpaFactory) vpaMatcher := vpa.NewMatcher(vpaLister, targetSelectorFetcher, controllerFetcher) stopCh := make(chan struct{}) defer close(stopCh) factory.Start(stopCh) + vpaFactory.Start(stopCh) informerMap := factory.WaitForCacheSync(stopCh) for kind, synced := range informerMap { if !synced { @@ -126,6 +131,13 @@ func main() { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } } + vpaInformerMap := vpaFactory.WaitForCacheSync(stopCh) + for kind, synced := range vpaInformerMap { + if !synced { + klog.ErrorS(nil, fmt.Sprintf("Could not sync VPA cache for the %s informer", kind.String())) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + } hostname, err := os.Hostname() if err != nil { diff --git a/vertical-pod-autoscaler/pkg/recommender/main.go b/vertical-pod-autoscaler/pkg/recommender/main.go index cda2d831da8c..febc9c008943 100644 --- a/vertical-pod-autoscaler/pkg/recommender/main.go +++ b/vertical-pod-autoscaler/pkg/recommender/main.go @@ -41,6 +41,7 @@ import ( "k8s.io/autoscaler/vertical-pod-autoscaler/common" vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" + vpa_informers "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/informers/externalversions" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/features" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/checkpoint" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input" @@ -244,13 +245,22 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm stopCh := make(chan struct{}) defer close(stopCh) config := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst)) - kubeClient := kube_client.NewForConfigOrDie(config) clusterState := model.NewClusterState(aggregateContainerStateGCInterval) + + kubeClient := kube_client.NewForConfigOrDie(config) factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(commonFlag.VpaObjectNamespace)) + + vpaClient := vpa_clientset.NewForConfigOrDie(config) + vpaFactory := vpa_informers.NewSharedInformerFactoryWithOptions(vpaClient, 1*time.Hour, vpa_informers.WithNamespace(commonFlag.VpaObjectNamespace)) + + vpaLister := vpa_api_util.NewVpasListerFromFactory(vpaFactory) + vpacheckpointLister := vpa_api_util.NewVpaCheckpointListerFromFactory(vpaFactory) + controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor) podLister, oomObserver := input.NewPodListerAndOOMObserver(ctx, kubeClient, commonFlag.VpaObjectNamespace, stopCh) factory.Start(stopCh) + vpaFactory.Start(stopCh) informerMap := factory.WaitForCacheSync(stopCh) for kind, synced := range informerMap { if !synced { @@ -258,6 +268,13 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm klog.FlushAndExit(klog.ExitFlushTimeout, 1) } } + vpaInformerMap := vpaFactory.WaitForCacheSync(stopCh) + for kind, synced := range vpaInformerMap { + if !synced { + klog.ErrorS(nil, fmt.Sprintf("Could not sync VPA cache for the %s informer", kind.String())) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + } model.InitializeAggregationsConfig(model.NewAggregationsConfig(*memoryAggregationInterval, *memoryAggregationIntervalCount, *memoryHistogramDecayHalfLife, *cpuHistogramDecayHalfLife, *oomBumpUpRatio, *oomMinBumpUp)) @@ -296,8 +313,8 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm KubeClient: kubeClient, MetricsClient: input_metrics.NewMetricsClient(source, commonFlag.VpaObjectNamespace, "default-metrics-client"), VpaCheckpointClient: vpa_clientset.NewForConfigOrDie(config).AutoscalingV1(), - VpaLister: vpa_api_util.NewVpasLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace), - VpaCheckpointLister: vpa_api_util.NewVpaCheckpointLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace), + VpaLister: vpaLister, + VpaCheckpointLister: vpacheckpointLister, ClusterState: clusterState, SelectorFetcher: target.NewVpaTargetSelectorFetcher(config, kubeClient, factory), MemorySaveMode: *memorySaver, diff --git a/vertical-pod-autoscaler/pkg/updater/logic/updater.go b/vertical-pod-autoscaler/pkg/updater/logic/updater.go index ffe266d7cbd0..17c0de02d236 100644 --- a/vertical-pod-autoscaler/pkg/updater/logic/updater.go +++ b/vertical-pod-autoscaler/pkg/updater/logic/updater.go @@ -18,7 +18,6 @@ package logic import ( "context" - "fmt" "slices" "time" @@ -26,6 +25,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" kube_client "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" corescheme "k8s.io/client-go/kubernetes/scheme" @@ -88,6 +88,8 @@ type updater struct { func NewUpdater( kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, + kubeInformerFactory informers.SharedInformerFactory, + vpaLister vpa_lister.VerticalPodAutoscalerLister, minReplicasForEviction int, evictionRateLimit float64, evictionRateBurst int, @@ -106,18 +108,16 @@ func NewUpdater( evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst) // TODO: Create in-place rate limits for the in-place rate limiter inPlaceRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst) - factory, err := restriction.NewPodsRestrictionFactory( + factory := restriction.NewPodsRestrictionFactory( kubeClient, + kubeInformerFactory, minReplicasForEviction, evictionToleranceFraction, patchCalculators, ) - if err != nil { - return nil, fmt.Errorf("failed to create restriction factory: %v", err) - } return &updater{ - vpaLister: vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), namespace), + vpaLister: vpaLister, podLister: newPodLister(kubeClient, namespace), eventRecorder: newEventRecorder(kubeClient), restrictionFactory: factory, diff --git a/vertical-pod-autoscaler/pkg/updater/main.go b/vertical-pod-autoscaler/pkg/updater/main.go index 8394fd54b29c..88b84874d38e 100644 --- a/vertical-pod-autoscaler/pkg/updater/main.go +++ b/vertical-pod-autoscaler/pkg/updater/main.go @@ -40,6 +40,7 @@ import ( "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/recommendation" vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" + vpa_informers "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/informers/externalversions" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/features" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target" controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher" @@ -178,21 +179,35 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) { config := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst)) kubeClient := kube_client.NewForConfigOrDie(config) vpaClient := vpa_clientset.NewForConfigOrDie(config) - factory := informers.NewSharedInformerFactory(kubeClient, defaultResyncPeriod) - targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(config, kubeClient, factory) - controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor) + + kubeFactory := informers.NewSharedInformerFactory(kubeClient, defaultResyncPeriod) + vpaFactory := vpa_informers.NewSharedInformerFactoryWithOptions(vpaClient, 1*time.Hour, vpa_informers.WithNamespace(commonFlag.VpaObjectNamespace)) + vpaLister := vpa_api_util.NewVpasListerFromFactory(vpaFactory) + + targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(config, kubeClient, kubeFactory) + controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, kubeFactory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor) var limitRangeCalculator limitrange.LimitRangeCalculator - limitRangeCalculator, err := limitrange.NewLimitsRangeCalculator(factory) + limitRangeCalculator, err := limitrange.NewLimitsRangeCalculator(kubeFactory) if err != nil { klog.ErrorS(err, "Failed to create limitRangeCalculator, falling back to not checking limits") limitRangeCalculator = limitrange.NewNoopLimitsCalculator() } - factory.Start(stopCh) - informerMap := factory.WaitForCacheSync(stopCh) - for kind, synced := range informerMap { + kubeFactory.Start(stopCh) + vpaFactory.Start(stopCh) + + kubeInformerMap := kubeFactory.WaitForCacheSync(stopCh) + for kind, synced := range kubeInformerMap { + if !synced { + klog.ErrorS(nil, fmt.Sprintf("Could not sync Kubernetes cache for the %s informer", kind.String())) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + } + + vpaInformerMap := vpaFactory.WaitForCacheSync(stopCh) + for kind, synced := range vpaInformerMap { if !synced { - klog.ErrorS(nil, fmt.Sprintf("Could not sync cache for the %s informer", kind.String())) + klog.ErrorS(nil, fmt.Sprintf("Could not sync VPA cache for the %s informer", kind.String())) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } } @@ -208,10 +223,11 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) { calculators := []patch.Calculator{inplace.NewResourceInPlaceUpdatesCalculator(recommendationProvider), inplace.NewInPlaceUpdatedCalculator()} - // TODO: use SharedInformerFactory in updater updater, err := updater.NewUpdater( kubeClient, vpaClient, + kubeFactory, + vpaLister, *minReplicas, *evictionRateLimit, *evictionRateBurst, diff --git a/vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory.go b/vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory.go index 1defa05664a6..f2d3b86ec7ed 100644 --- a/vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory.go +++ b/vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory.go @@ -23,10 +23,8 @@ import ( appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - appsinformer "k8s.io/client-go/informers/apps/v1" - coreinformer "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/informers" kube_client "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/utils/clock" @@ -34,10 +32,6 @@ import ( vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" ) -const ( - resyncPeriod time.Duration = 1 * time.Minute -) - // ControllerKind is the type of controller that can manage a pod. type controllerKind string @@ -65,10 +59,7 @@ type PodsRestrictionFactory interface { // PodsRestrictionFactoryImpl is the implementation of the PodsRestrictionFactory interface. type PodsRestrictionFactoryImpl struct { client kube_client.Interface - rcInformer cache.SharedIndexInformer // informer for Replication Controllers - ssInformer cache.SharedIndexInformer // informer for Stateful Sets - rsInformer cache.SharedIndexInformer // informer for Replica Sets - dsInformer cache.SharedIndexInformer // informer for Daemon Sets + informerFactory informers.SharedInformerFactory minReplicas int evictionToleranceFraction float64 clock clock.Clock @@ -77,41 +68,22 @@ type PodsRestrictionFactoryImpl struct { } // NewPodsRestrictionFactory creates a new PodsRestrictionFactory. -func NewPodsRestrictionFactory(client kube_client.Interface, minReplicas int, evictionToleranceFraction float64, patchCalculators []patch.Calculator) (PodsRestrictionFactory, error) { - rcInformer, err := setupInformer(client, replicationController) - if err != nil { - return nil, fmt.Errorf("failed to create rcInformer: %v", err) - } - ssInformer, err := setupInformer(client, statefulSet) - if err != nil { - return nil, fmt.Errorf("failed to create ssInformer: %v", err) - } - rsInformer, err := setupInformer(client, replicaSet) - if err != nil { - return nil, fmt.Errorf("failed to create rsInformer: %v", err) - } - dsInformer, err := setupInformer(client, daemonSet) - if err != nil { - return nil, fmt.Errorf("failed to create dsInformer: %v", err) - } +func NewPodsRestrictionFactory(client kube_client.Interface, informerFactory informers.SharedInformerFactory, minReplicas int, evictionToleranceFraction float64, patchCalculators []patch.Calculator) PodsRestrictionFactory { return &PodsRestrictionFactoryImpl{ client: client, - rcInformer: rcInformer, // informer for Replication Controllers - ssInformer: ssInformer, // informer for Stateful Sets - rsInformer: rsInformer, // informer for Replica Sets - dsInformer: dsInformer, // informer for Daemon Sets + informerFactory: informerFactory, minReplicas: minReplicas, evictionToleranceFraction: evictionToleranceFraction, clock: &clock.RealClock{}, lastInPlaceAttemptTimeMap: make(map[string]time.Time), patchCalculators: patchCalculators, - }, nil + } } func (f *PodsRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator) (int, error) { switch creator.Kind { case replicationController: - rcObj, exists, err := f.rcInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name) + rcObj, exists, err := f.informerFactory.Core().V1().ReplicationControllers().Informer().GetStore().GetByKey(creator.Namespace + "/" + creator.Name) if err != nil { return 0, fmt.Errorf("replication controller %s/%s is not available, err: %v", creator.Namespace, creator.Name, err) } @@ -127,7 +99,7 @@ func (f *PodsRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator) } return int(*rc.Spec.Replicas), nil case replicaSet: - rsObj, exists, err := f.rsInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name) + rsObj, exists, err := f.informerFactory.Apps().V1().ReplicaSets().Informer().GetStore().GetByKey(creator.Namespace + "/" + creator.Name) if err != nil { return 0, fmt.Errorf("replica set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err) } @@ -143,7 +115,7 @@ func (f *PodsRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator) } return int(*rs.Spec.Replicas), nil case statefulSet: - ssObj, exists, err := f.ssInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name) + ssObj, exists, err := f.informerFactory.Apps().V1().StatefulSets().Informer().GetStore().GetByKey(creator.Namespace + "/" + creator.Name) if err != nil { return 0, fmt.Errorf("stateful set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err) } @@ -159,7 +131,7 @@ func (f *PodsRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator) } return int(*ss.Spec.Replicas), nil case daemonSet: - dsObj, exists, err := f.dsInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name) + dsObj, exists, err := f.informerFactory.Apps().V1().DaemonSets().Informer().GetStore().GetByKey(creator.Namespace + "/" + creator.Name) if err != nil { return 0, fmt.Errorf("daemon set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err) } @@ -299,33 +271,6 @@ func managingControllerRef(pod *apiv1.Pod) *metav1.OwnerReference { return &managingController } -func setupInformer(kubeClient kube_client.Interface, kind controllerKind) (cache.SharedIndexInformer, error) { - var informer cache.SharedIndexInformer - switch kind { - case replicationController: - informer = coreinformer.NewReplicationControllerInformer(kubeClient, apiv1.NamespaceAll, - resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - case replicaSet: - informer = appsinformer.NewReplicaSetInformer(kubeClient, apiv1.NamespaceAll, - resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - case statefulSet: - informer = appsinformer.NewStatefulSetInformer(kubeClient, apiv1.NamespaceAll, - resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - case daemonSet: - informer = appsinformer.NewDaemonSetInformer(kubeClient, apiv1.NamespaceAll, - resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - default: - return nil, fmt.Errorf("unknown controller kind: %v", kind) - } - stopCh := make(chan struct{}) - go informer.Run(stopCh) - synced := cache.WaitForCacheSync(stopCh, informer.HasSynced) - if !synced { - return nil, fmt.Errorf("failed to sync %v cache", kind) - } - return informer, nil -} - type singleGroupStats struct { configured int pending int diff --git a/vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory_test.go b/vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory_test.go index 9f22d0b43e9a..2e6043451dac 100644 --- a/vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory_test.go +++ b/vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory_test.go @@ -26,10 +26,8 @@ import ( batchv1 "k8s.io/api/batch/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - appsinformer "k8s.io/client-go/informers/apps/v1" - coreinformer "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/cache" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" baseclocktest "k8s.io/utils/clock/testing" @@ -707,45 +705,37 @@ func getRestrictionFactory(rc *apiv1.ReplicationController, rs *appsv1.ReplicaSe ss *appsv1.StatefulSet, ds *appsv1.DaemonSet, minReplicas int, evictionToleranceFraction float64, clock clock.Clock, lipuatm map[string]time.Time, patchCalculators []patch.Calculator) (PodsRestrictionFactory, error) { kubeClient := &fake.Clientset{} - rcInformer := coreinformer.NewReplicationControllerInformer(kubeClient, apiv1.NamespaceAll, - 0*time.Second, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - rsInformer := appsinformer.NewReplicaSetInformer(kubeClient, apiv1.NamespaceAll, - 0*time.Second, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - ssInformer := appsinformer.NewStatefulSetInformer(kubeClient, apiv1.NamespaceAll, - 0*time.Second, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - dsInformer := appsinformer.NewDaemonSetInformer(kubeClient, apiv1.NamespaceAll, - 0*time.Second, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second) + + // Create informers and get their stores to populate directly if rc != nil { - err := rcInformer.GetIndexer().Add(rc) - if err != nil { - return nil, fmt.Errorf("Error adding object to cache: %v", err) + rcInformer := informerFactory.Core().V1().ReplicationControllers().Informer() + if err := rcInformer.GetStore().Add(rc); err != nil { + return nil, fmt.Errorf("Error adding ReplicationController to store: %v", err) } } if rs != nil { - err := rsInformer.GetIndexer().Add(rs) - if err != nil { - return nil, fmt.Errorf("Error adding object to cache: %v", err) + rsInformer := informerFactory.Apps().V1().ReplicaSets().Informer() + if err := rsInformer.GetStore().Add(rs); err != nil { + return nil, fmt.Errorf("Error adding ReplicaSet to store: %v", err) } } if ss != nil { - err := ssInformer.GetIndexer().Add(ss) - if err != nil { - return nil, fmt.Errorf("Error adding object to cache: %v", err) + ssInformer := informerFactory.Apps().V1().StatefulSets().Informer() + if err := ssInformer.GetStore().Add(ss); err != nil { + return nil, fmt.Errorf("Error adding StatefulSet to store: %v", err) } } if ds != nil { - err := dsInformer.GetIndexer().Add(ds) - if err != nil { - return nil, fmt.Errorf("Error adding object to cache: %v", err) + dsInformer := informerFactory.Apps().V1().DaemonSets().Informer() + if err := dsInformer.GetStore().Add(ds); err != nil { + return nil, fmt.Errorf("Error adding DaemonSet to store: %v", err) } } return &PodsRestrictionFactoryImpl{ client: kubeClient, - rcInformer: rcInformer, - ssInformer: ssInformer, - rsInformer: rsInformer, - dsInformer: dsInformer, + informerFactory: informerFactory, minReplicas: minReplicas, evictionToleranceFraction: evictionToleranceFraction, clock: clock, diff --git a/vertical-pod-autoscaler/pkg/utils/vpa/api.go b/vertical-pod-autoscaler/pkg/utils/vpa/api.go index b30f3fc6039d..7cff2d216fa2 100644 --- a/vertical-pod-autoscaler/pkg/utils/vpa/api.go +++ b/vertical-pod-autoscaler/pkg/utils/vpa/api.go @@ -22,20 +22,17 @@ import ( "errors" "fmt" "strings" - "time" core "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" - vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" vpa_api "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/typed/autoscaling.k8s.io/v1" + vpa_informers "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/informers/externalversions" vpa_lister "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/listers/autoscaling.k8s.io/v1" controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher" ) @@ -77,64 +74,18 @@ func UpdateVpaStatusIfNeeded(vpaClient vpa_api.VerticalPodAutoscalerInterface, v return nil, nil } -// NewVpasLister returns VerticalPodAutoscalerLister configured to fetch all VPA objects from namespace, -// set namespace to k8sapiv1.NamespaceAll to select all namespaces. -// The method blocks until vpaLister is initially populated. -func NewVpasLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct{}, namespace string) vpa_lister.VerticalPodAutoscalerLister { - vpaListWatch := cache.NewListWatchFromClient(vpaClient.AutoscalingV1().RESTClient(), "verticalpodautoscalers", namespace, fields.Everything()) - informerOptions := cache.InformerOptions{ - ObjectType: &vpa_types.VerticalPodAutoscaler{}, - ListerWatcher: vpaListWatch, - Handler: &cache.ResourceEventHandlerFuncs{}, - ResyncPeriod: 1 * time.Hour, - Indexers: cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - } - - store, controller := cache.NewInformerWithOptions(informerOptions) - indexer, ok := store.(cache.Indexer) - if !ok { - klog.ErrorS(nil, "Expected Indexer, but got a Store that does not implement Indexer") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - vpaLister := vpa_lister.NewVerticalPodAutoscalerLister(indexer) - go controller.Run(stopChannel) - if !cache.WaitForCacheSync(stopChannel, controller.HasSynced) { - klog.ErrorS(nil, "Failed to sync VPA cache during initialization") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } else { - klog.InfoS("Initial VPA synced successfully") - } - return vpaLister +// NewVpasListerFromFactory returns VerticalPodAutoscalerLister using a shared informer factory. +// This is the preferred method as it allows sharing informers and reduces resource usage. +// The factory must be started by the caller and cache sync should be waited for. +func NewVpasListerFromFactory(factory vpa_informers.SharedInformerFactory) vpa_lister.VerticalPodAutoscalerLister { + return factory.Autoscaling().V1().VerticalPodAutoscalers().Lister() } -// NewVpaCheckpointLister returns VerticalPodAutoscalerCheckpointLister configured to fetch all VPACheckpoint objects from namespace, -// set namespace to k8sapiv1.NamespaceAll to select all namespaces. -// The method blocks until vpaCheckpointLister is initially populated. -func NewVpaCheckpointLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct{}, namespace string) vpa_lister.VerticalPodAutoscalerCheckpointLister { - vpaListWatch := cache.NewListWatchFromClient(vpaClient.AutoscalingV1().RESTClient(), "verticalpodautoscalercheckpoints", namespace, fields.Everything()) - informerOptions := cache.InformerOptions{ - ObjectType: &vpa_types.VerticalPodAutoscalerCheckpoint{}, - ListerWatcher: vpaListWatch, - Handler: &cache.ResourceEventHandlerFuncs{}, - ResyncPeriod: 1 * time.Hour, - Indexers: cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - } - - store, controller := cache.NewInformerWithOptions(informerOptions) - indexer, ok := store.(cache.Indexer) - if !ok { - klog.ErrorS(nil, "Expected Indexer, but got a Store that does not implement Indexer") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - vpaCheckpointLister := vpa_lister.NewVerticalPodAutoscalerCheckpointLister(indexer) - go controller.Run(stopChannel) - if !cache.WaitForCacheSync(stopChannel, controller.HasSynced) { - klog.ErrorS(nil, "Failed to sync VPA checkpoint cache during initialization") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } else { - klog.InfoS("Initial VPA checkpoint synced successfully") - } - return vpaCheckpointLister +// NewVpaCheckpointListerFromFactory returns VerticalPodAutoscalerCheckpointLister using a shared informer factory. +// This is the preferred method as it allows sharing informers and reduces resource usage. +// The factory must be started by the caller and cache sync should be waited for. +func NewVpaCheckpointListerFromFactory(factory vpa_informers.SharedInformerFactory) vpa_lister.VerticalPodAutoscalerCheckpointLister { + return factory.Autoscaling().V1().VerticalPodAutoscalerCheckpoints().Lister() } // PodMatchesVPA returns true iff the vpaWithSelector matches the Pod.