@@ -22,7 +22,7 @@ import (
22
22
"time"
23
23
24
24
"github.com/blang/semver/v4"
25
- "github.com/go-kit/log"
25
+ gokitlog "github.com/go-kit/log"
26
26
"github.com/go-kit/log/level"
27
27
"github.com/go-logr/logr"
28
28
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
@@ -53,6 +53,9 @@ const (
53
53
)
54
54
55
55
func NewPrometheusCRWatcher (ctx context.Context , logger logr.Logger , cfg allocatorconfig.Config ) (* PrometheusCRWatcher , error ) {
56
+ // TODO: Remove this after go 1.23 upgrade
57
+ promLogger := level .NewFilter (gokitlog .NewLogfmtLogger (os .Stderr ), level .AllowWarn ())
58
+ slogger := slog .New (logr .ToSlogHandler (logger ))
56
59
var resourceSelector * prometheus.ResourceSelector
57
60
mClient , err := monitoringclient .NewForConfig (cfg .ClusterConfig )
58
61
if err != nil {
@@ -79,18 +82,20 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
79
82
Spec : monitoringv1.PrometheusSpec {
80
83
CommonPrometheusFields : monitoringv1.CommonPrometheusFields {
81
84
ScrapeInterval : monitoringv1 .Duration (cfg .PrometheusCR .ScrapeInterval .String ()),
82
- ServiceMonitorSelector : cfg .PrometheusCR .ServiceMonitorSelector ,
83
85
PodMonitorSelector : cfg .PrometheusCR .PodMonitorSelector ,
84
- ServiceMonitorNamespaceSelector : cfg .PrometheusCR .ServiceMonitorNamespaceSelector ,
85
86
PodMonitorNamespaceSelector : cfg .PrometheusCR .PodMonitorNamespaceSelector ,
87
+ ServiceMonitorSelector : cfg .PrometheusCR .ServiceMonitorSelector ,
88
+ ServiceMonitorNamespaceSelector : cfg .PrometheusCR .ServiceMonitorNamespaceSelector ,
89
+ ScrapeConfigSelector : cfg .PrometheusCR .ScrapeConfigSelector ,
90
+ ScrapeConfigNamespaceSelector : cfg .PrometheusCR .ScrapeConfigNamespaceSelector ,
91
+ ProbeSelector : cfg .PrometheusCR .ProbeSelector ,
92
+ ProbeNamespaceSelector : cfg .PrometheusCR .ProbeNamespaceSelector ,
86
93
ServiceDiscoveryRole : & serviceDiscoveryRole ,
87
94
},
88
95
},
89
96
}
90
97
91
- promOperatorLogger := level .NewFilter (log .NewLogfmtLogger (os .Stderr ), level .AllowWarn ())
92
- promOperatorSlogLogger := slog .New (slog .NewTextHandler (os .Stderr , & slog.HandlerOptions {Level : slog .LevelWarn }))
93
- generator , err := prometheus .NewConfigGenerator (promOperatorLogger , prom , true )
98
+ generator , err := prometheus .NewConfigGenerator (promLogger , prom , true )
94
99
95
100
if err != nil {
96
101
return nil , err
@@ -108,21 +113,21 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
108
113
logger .Error (err , "Retrying namespace informer creation in promOperator CRD watcher" )
109
114
return true
110
115
}, func () error {
111
- nsMonInf , err = getNamespaceInformer (ctx , map [string ]struct {}{v1 .NamespaceAll : {}}, promOperatorLogger , clientset , operatorMetrics )
116
+ nsMonInf , err = getNamespaceInformer (ctx , map [string ]struct {}{v1 .NamespaceAll : {}}, promLogger , clientset , operatorMetrics )
112
117
return err
113
118
})
114
119
if getNamespaceInformerErr != nil {
115
120
logger .Error (getNamespaceInformerErr , "Failed to create namespace informer in promOperator CRD watcher" )
116
121
return nil , getNamespaceInformerErr
117
122
}
118
123
119
- resourceSelector , err = prometheus .NewResourceSelector (promOperatorSlogLogger , prom , store , nsMonInf , operatorMetrics , eventRecorder )
124
+ resourceSelector , err = prometheus .NewResourceSelector (slogger , prom , store , nsMonInf , operatorMetrics , eventRecorder )
120
125
if err != nil {
121
126
logger .Error (err , "Failed to create resource selector in promOperator CRD watcher" )
122
127
}
123
128
124
129
return & PrometheusCRWatcher {
125
- logger : logger ,
130
+ logger : slogger ,
126
131
kubeMonitoringClient : mClient ,
127
132
k8sClient : clientset ,
128
133
informers : monitoringInformers ,
@@ -133,13 +138,15 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
133
138
kubeConfigPath : cfg .KubeConfigFilePath ,
134
139
podMonitorNamespaceSelector : cfg .PrometheusCR .PodMonitorNamespaceSelector ,
135
140
serviceMonitorNamespaceSelector : cfg .PrometheusCR .ServiceMonitorNamespaceSelector ,
141
+ scrapeConfigNamespaceSelector : cfg .PrometheusCR .ScrapeConfigNamespaceSelector ,
142
+ probeNamespaceSelector : cfg .PrometheusCR .ProbeNamespaceSelector ,
136
143
resourceSelector : resourceSelector ,
137
144
store : store ,
138
145
}, nil
139
146
}
140
147
141
148
type PrometheusCRWatcher struct {
142
- logger logr .Logger
149
+ logger * slog .Logger
143
150
kubeMonitoringClient monitoringclient.Interface
144
151
k8sClient kubernetes.Interface
145
152
informers map [string ]* informers.ForResource
@@ -150,12 +157,13 @@ type PrometheusCRWatcher struct {
150
157
kubeConfigPath string
151
158
podMonitorNamespaceSelector * metav1.LabelSelector
152
159
serviceMonitorNamespaceSelector * metav1.LabelSelector
160
+ scrapeConfigNamespaceSelector * metav1.LabelSelector
161
+ probeNamespaceSelector * metav1.LabelSelector
153
162
resourceSelector * prometheus.ResourceSelector
154
163
store * assets.StoreBuilder
155
164
}
156
165
157
- func getNamespaceInformer (ctx context.Context , allowList map [string ]struct {}, promOperatorLogger log.Logger , clientset kubernetes.Interface , operatorMetrics * operator.Metrics ) (cache.SharedIndexInformer , error ) {
158
-
166
+ func getNamespaceInformer (ctx context.Context , allowList map [string ]struct {}, promOperatorLogger gokitlog.Logger , clientset kubernetes.Interface , operatorMetrics * operator.Metrics ) (cache.SharedIndexInformer , error ) {
159
167
kubernetesVersion , err := clientset .Discovery ().ServerVersion ()
160
168
if err != nil {
161
169
return nil , err
@@ -196,9 +204,21 @@ func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informe
196
204
return nil , err
197
205
}
198
206
207
+ probeInformers , err := informers .NewInformersForResource (factory , monitoringv1 .SchemeGroupVersion .WithResource (monitoringv1 .ProbeName ))
208
+ if err != nil {
209
+ return nil , err
210
+ }
211
+
212
+ scrapeConfigInformers , err := informers .NewInformersForResource (factory , promv1alpha1 .SchemeGroupVersion .WithResource (promv1alpha1 .ScrapeConfigName ))
213
+ if err != nil {
214
+ return nil , err
215
+ }
216
+
199
217
return map [string ]* informers.ForResource {
200
218
monitoringv1 .ServiceMonitorName : serviceMonitorInformers ,
201
219
monitoringv1 .PodMonitorName : podMonitorInformers ,
220
+ monitoringv1 .ProbeName : probeInformers ,
221
+ promv1alpha1 .ScrapeConfigName : scrapeConfigInformers ,
202
222
}, nil
203
223
}
204
224
@@ -210,7 +230,7 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch
210
230
211
231
if w .nsInformer != nil {
212
232
go w .nsInformer .Run (w .stopChannel )
213
- if ok := cache .WaitForNamedCacheSync ("namespace" , w . stopChannel , w .nsInformer .HasSynced ); ! ok {
233
+ if ok := w .WaitForNamedCacheSync ("namespace" , w .nsInformer .HasSynced ); ! ok {
214
234
success = false
215
235
}
216
236
@@ -228,10 +248,12 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch
228
248
for name , selector := range map [string ]* metav1.LabelSelector {
229
249
"PodMonitorNamespaceSelector" : w .podMonitorNamespaceSelector ,
230
250
"ServiceMonitorNamespaceSelector" : w .serviceMonitorNamespaceSelector ,
251
+ "ProbeNamespaceSelector" : w .probeNamespaceSelector ,
252
+ "ScrapeConfigNamespaceSelector" : w .scrapeConfigNamespaceSelector ,
231
253
} {
232
254
sync , err := k8sutil .LabelSelectionHasChanged (old .Labels , cur .Labels , selector )
233
255
if err != nil {
234
- w .logger .Error (err , "Failed to check label selection between namespaces while handling namespace updates" , "selector" , name )
256
+ w .logger .Error ("Failed to check label selection between namespaces while handling namespace updates" , "selector" , name , "error" , err )
235
257
return
236
258
}
237
259
@@ -252,8 +274,9 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch
252
274
for name , resource := range w .informers {
253
275
resource .Start (w .stopChannel )
254
276
255
- if ok := cache .WaitForNamedCacheSync (name , w .stopChannel , resource .HasSynced ); ! ok {
256
- success = false
277
+ if ok := w .WaitForNamedCacheSync (name , resource .HasSynced ); ! ok {
278
+ w .logger .Info ("skipping informer" , "informer" , name )
279
+ continue
257
280
}
258
281
259
282
// only send an event notification if there isn't one already
@@ -342,6 +365,16 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
342
365
return nil , err
343
366
}
344
367
368
+ probeInstances , err := w .resourceSelector .SelectProbes (ctx , w .informers [monitoringv1 .ProbeName ].ListAllByNamespace )
369
+ if err != nil {
370
+ return nil , err
371
+ }
372
+
373
+ scrapeConfigInstances , err := w .resourceSelector .SelectScrapeConfigs (ctx , w .informers [promv1alpha1 .ScrapeConfigName ].ListAllByNamespace )
374
+ if err != nil {
375
+ return nil , err
376
+ }
377
+
345
378
generatedConfig , err := w .configGenerator .GenerateServerConfiguration (
346
379
"30s" ,
347
380
"" ,
@@ -352,8 +385,8 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
352
385
nil ,
353
386
serviceMonitorInstances ,
354
387
podMonitorInstances ,
355
- map [ string ] * monitoringv1. Probe {} ,
356
- map [ string ] * promv1alpha1. ScrapeConfig {} ,
388
+ probeInstances ,
389
+ scrapeConfigInstances ,
357
390
w .store ,
358
391
nil ,
359
392
nil ,
@@ -384,3 +417,41 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
384
417
return promCfg , nil
385
418
}
386
419
}
420
+
421
+ // WaitForNamedCacheSync adds a timeout to the informer's wait for the cache to be ready.
422
+ // If the PrometheusCRWatcher is unable to load an informer within 15 seconds, the method is
423
+ // cancelled and returns false. A successful informer load will return true. This method also
424
+ // will be cancelled if the target allocator's stopChannel is called before it returns.
425
+ //
426
+ // This method is inspired by the upstream prometheus-operator implementation, with a shorter timeout
427
+ // and support for the PrometheusCRWatcher's stopChannel.
428
+ // https://github.com/prometheus-operator/prometheus-operator/blob/293c16c854ce69d1da9fdc8f0705de2d67bfdbfa/pkg/operator/operator.go#L433
429
+ func (w * PrometheusCRWatcher ) WaitForNamedCacheSync (controllerName string , inf cache.InformerSynced ) bool {
430
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 15 )
431
+ t := time .NewTicker (time .Second * 5 )
432
+ defer t .Stop ()
433
+
434
+ go func () {
435
+ for {
436
+ select {
437
+ case <- t .C :
438
+ w .logger .Debug ("cache sync not yet completed" )
439
+ case <- ctx .Done ():
440
+ return
441
+ case <- w .stopChannel :
442
+ w .logger .Warn ("stop received, shutting down cache syncing" )
443
+ cancel ()
444
+ return
445
+ }
446
+ }
447
+ }()
448
+
449
+ ok := cache .WaitForNamedCacheSync (controllerName , ctx .Done (), inf )
450
+ if ! ok {
451
+ w .logger .Error ("failed to sync cache" )
452
+ } else {
453
+ w .logger .Debug ("successfully synced cache" )
454
+ }
455
+
456
+ return ok
457
+ }
0 commit comments