@@ -18,12 +18,9 @@ import (
18
18
"context"
19
19
"fmt"
20
20
"log/slog"
21
- "os"
22
21
"time"
23
22
24
23
"github.com/blang/semver/v4"
25
- "github.com/go-kit/log"
26
- "github.com/go-kit/log/level"
27
24
"github.com/go-logr/logr"
28
25
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
29
26
promv1alpha1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1alpha1"
@@ -53,6 +50,7 @@ const (
53
50
)
54
51
55
52
func NewPrometheusCRWatcher (ctx context.Context , logger logr.Logger , cfg allocatorconfig.Config ) (* PrometheusCRWatcher , error ) {
53
+ slogger := slog .New (logr .ToSlogHandler (logger ))
56
54
var resourceSelector * prometheus.ResourceSelector
57
55
mClient , err := monitoringclient .NewForConfig (cfg .ClusterConfig )
58
56
if err != nil {
@@ -92,9 +90,7 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
92
90
},
93
91
}
94
92
95
- promOperatorLogger := level .NewFilter (log .NewLogfmtLogger (os .Stderr ), level .AllowWarn ())
96
- promOperatorSlogLogger := slog .New (slog .NewTextHandler (os .Stderr , & slog.HandlerOptions {Level : slog .LevelWarn }))
97
- generator , err := prometheus .NewConfigGenerator (promOperatorLogger , prom , true )
93
+ generator , err := prometheus .NewConfigGenerator (slogger , prom , prometheus .WithEndpointSliceSupport ())
98
94
99
95
if err != nil {
100
96
return nil , err
@@ -112,21 +108,21 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
112
108
logger .Error (err , "Retrying namespace informer creation in promOperator CRD watcher" )
113
109
return true
114
110
}, func () error {
115
- nsMonInf , err = getNamespaceInformer (ctx , map [string ]struct {}{v1 .NamespaceAll : {}}, promOperatorLogger , clientset , operatorMetrics )
111
+ nsMonInf , err = getNamespaceInformer (ctx , map [string ]struct {}{v1 .NamespaceAll : {}}, slogger , clientset , operatorMetrics )
116
112
return err
117
113
})
118
114
if getNamespaceInformerErr != nil {
119
115
logger .Error (getNamespaceInformerErr , "Failed to create namespace informer in promOperator CRD watcher" )
120
116
return nil , getNamespaceInformerErr
121
117
}
122
118
123
- resourceSelector , err = prometheus .NewResourceSelector (promOperatorSlogLogger , prom , store , nsMonInf , operatorMetrics , eventRecorder )
119
+ resourceSelector , err = prometheus .NewResourceSelector (slogger , prom , store , nsMonInf , operatorMetrics , eventRecorder )
124
120
if err != nil {
125
121
logger .Error (err , "Failed to create resource selector in promOperator CRD watcher" )
126
122
}
127
123
128
124
return & PrometheusCRWatcher {
129
- logger : logger ,
125
+ logger : slogger ,
130
126
kubeMonitoringClient : mClient ,
131
127
k8sClient : clientset ,
132
128
informers : monitoringInformers ,
@@ -145,7 +141,7 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
145
141
}
146
142
147
143
type PrometheusCRWatcher struct {
148
- logger logr .Logger
144
+ logger * slog .Logger
149
145
kubeMonitoringClient monitoringclient.Interface
150
146
k8sClient kubernetes.Interface
151
147
informers map [string ]* informers.ForResource
@@ -162,7 +158,7 @@ type PrometheusCRWatcher struct {
162
158
store * assets.StoreBuilder
163
159
}
164
160
165
- func getNamespaceInformer (ctx context.Context , allowList map [string ]struct {}, promOperatorLogger log .Logger , clientset kubernetes.Interface , operatorMetrics * operator.Metrics ) (cache.SharedIndexInformer , error ) {
161
+ func getNamespaceInformer (ctx context.Context , allowList map [string ]struct {}, promOperatorLogger * slog .Logger , clientset kubernetes.Interface , operatorMetrics * operator.Metrics ) (cache.SharedIndexInformer , error ) {
166
162
kubernetesVersion , err := clientset .Discovery ().ServerVersion ()
167
163
if err != nil {
168
164
return nil , err
@@ -252,7 +248,7 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch
252
248
} {
253
249
sync , err := k8sutil .LabelSelectionHasChanged (old .Labels , cur .Labels , selector )
254
250
if err != nil {
255
- w .logger .Error (err , "Failed to check label selection between namespaces while handling namespace updates" , "selector" , name )
251
+ w .logger .Error (err . Error () , "Failed to check label selection between namespaces while handling namespace updates" , "selector" , name )
256
252
return
257
253
}
258
254
@@ -273,17 +269,20 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch
273
269
for name , resource := range w .informers {
274
270
resource .Start (w .stopChannel )
275
271
276
- if ok := cache .WaitForNamedCacheSync (name , w .stopChannel , resource .HasSynced ); ! ok {
277
- success = false
272
+ if ok := w .WaitForNamedCacheSync (name , resource .HasSynced ); ! ok {
273
+ w .logger .Info ("skipping informer" , "informer" , name )
274
+ continue
278
275
}
279
276
280
277
// only send an event notification if there isn't one already
281
278
resource .AddEventHandler (cache.ResourceEventHandlerFuncs {
282
279
// these functions only write to the notification channel if it's empty to avoid blocking
283
280
// if scrape config updates are being rate-limited
284
281
AddFunc : func (obj interface {}) {
282
+ w .logger .Info ("added" , "obj" , obj )
285
283
select {
286
284
case notifyEvents <- struct {}{}:
285
+ w .logger .Info ("added" )
287
286
default :
288
287
}
289
288
},
@@ -378,7 +377,7 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
378
377
"" ,
379
378
nil ,
380
379
nil ,
381
- monitoringv1.TSDBSpec {},
380
+ & monitoringv1.TSDBSpec {},
382
381
nil ,
383
382
nil ,
384
383
serviceMonitorInstances ,
@@ -415,3 +414,41 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
415
414
return promCfg , nil
416
415
}
417
416
}
417
+
418
+ // WaitForNamedCacheSync adds a timeout to the informer's wait for the cache to be ready.
419
+ // If the PrometheusCRWatcher is unable to load an informer within 15 seconds, the method is
420
+ // cancelled and returns false. A successful informer load will return true. This method also
421
+ // will be cancelled if the target allocator's stopChannel is called before it returns.
422
+ //
423
+ // This method is inspired by the upstream prometheus-operator implementation, with a shorter timeout
424
+ // and support for the PrometheusCRWatcher's stopChannel.
425
+ // https://github.com/prometheus-operator/prometheus-operator/blob/293c16c854ce69d1da9fdc8f0705de2d67bfdbfa/pkg/operator/operator.go#L433
426
+ func (w * PrometheusCRWatcher ) WaitForNamedCacheSync (controllerName string , inf cache.InformerSynced ) bool {
427
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 15 )
428
+ t := time .NewTicker (time .Second * 5 )
429
+ defer t .Stop ()
430
+
431
+ go func () {
432
+ for {
433
+ select {
434
+ case <- t .C :
435
+ w .logger .Debug ("cache sync not yet completed" )
436
+ case <- ctx .Done ():
437
+ return
438
+ case <- w .stopChannel :
439
+ w .logger .Warn ("stop received, shutting down cache syncing" )
440
+ cancel ()
441
+ return
442
+ }
443
+ }
444
+ }()
445
+
446
+ ok := cache .WaitForNamedCacheSync (controllerName , ctx .Done (), inf )
447
+ if ! ok {
448
+ w .logger .Error ("failed to sync cache" )
449
+ } else {
450
+ w .logger .Debug ("successfully synced cache" )
451
+ }
452
+
453
+ return ok
454
+ }
0 commit comments