6
6
"github.com/icinga/icinga-notifications/internal/event"
7
7
"go.uber.org/zap"
8
8
"golang.org/x/sync/errgroup"
9
+ "math"
9
10
"net/http"
10
11
"net/url"
11
12
"time"
@@ -21,8 +22,8 @@ type eventMsg struct {
21
22
22
23
// catchupEventMsg propagates either an eventMsg or an error back from the catch-up worker.
23
24
//
24
- // The type must be used as a sum-type like data structure holding either an eventMsg pointer or an error . The error
25
- // should have a higher precedence than the eventMsg.
25
+ // The type must be used as a sum-type like data structure holding either an error or an eventMsg pointer . The error has
26
+ // a higher precedence than the eventMsg.
26
27
type catchupEventMsg struct {
27
28
* eventMsg
28
29
error
@@ -196,7 +197,10 @@ func (client *Client) buildAcknowledgementEvent(ctx context.Context, host, servi
196
197
//
197
198
// Those workers honor a context derived from the Client.Ctx and would either stop when this context is done or when the
198
199
// context.CancelFunc is called.
199
- func (client * Client ) startCatchupWorkers () (chan * catchupEventMsg , context.CancelFunc ) {
200
+ //
201
+ // The startup time might be delayed through the parameter. This lets the goroutines sleep to rate-limit reconnection
202
+ // attempts during network hiccups.
203
+ func (client * Client ) startCatchupWorkers (delay time.Duration ) (chan * catchupEventMsg , context.CancelFunc ) {
200
204
startTime := time .Now ()
201
205
catchupEventCh := make (chan * catchupEventMsg )
202
206
@@ -208,6 +212,12 @@ func (client *Client) startCatchupWorkers() (chan *catchupEventMsg, context.Canc
208
212
for _ , objType := range objTypes {
209
213
objType := objType // https://go.dev/doc/faq#closures_and_goroutines
210
214
group .Go (func () error {
215
+ select {
216
+ case <- ctx .Done ():
217
+ return ctx .Err ()
218
+ case <- time .After (delay ):
219
+ }
220
+
211
221
err := client .checkMissedChanges (groupCtx , objType , catchupEventCh )
212
222
if err != nil {
213
223
client .Logger .Errorw ("Catch-up-phase event worker failed" , zap .String ("object type" , objType ), zap .Error (err ))
@@ -261,16 +271,16 @@ func (client *Client) worker() {
261
271
// catchupCache maps event.Events.Name to API time to skip replaying outdated events.
262
272
catchupCache = make (map [string ]time.Time )
263
273
264
- // catchupErr might hold an error received from catchupEventCh, indicating another catch-up-phase run.
265
- catchupErr error
274
+ // catchupFailCounter indicates how many prior catch-up-phase attempts have failed. It will be used to
275
+ // rate limit catch-up-phase restarts.
276
+ catchupFailCounter int
266
277
)
267
278
268
279
// catchupReset resets all catchup variables to their initial empty state.
269
280
catchupReset := func () {
270
281
catchupEventCh , catchupCancel = nil , nil
271
282
catchupBuffer = make ([]* event.Event , 0 )
272
283
catchupCache = make (map [string ]time.Time )
273
- catchupErr = nil
274
284
}
275
285
276
286
// catchupCacheUpdate updates the catchupCache if this eventMsg seems to be the latest of its kind.
@@ -290,18 +300,13 @@ func (client *Client) worker() {
290
300
case <- client .catchupPhaseRequest :
291
301
if catchupEventCh != nil {
292
302
client .Logger .Warn ("Switching to catch-up-phase was requested while already catching up, restarting phase" )
293
-
294
- // Drain the old catch-up-phase producer channel until it is closed as its context will be canceled.
295
- go func (catchupEventCh chan * catchupEventMsg ) {
296
- for _ , ok := <- catchupEventCh ; ok ; {
297
- }
298
- }(catchupEventCh )
299
303
catchupCancel ()
300
304
}
301
305
302
306
client .Logger .Info ("Worker enters catch-up-phase, start caching up on Event Stream events" )
303
307
catchupReset ()
304
- catchupEventCh , catchupCancel = client .startCatchupWorkers ()
308
+ catchupEventCh , catchupCancel = client .startCatchupWorkers (
309
+ min (3 * time .Minute , time .Duration (math .Exp2 (float64 (catchupFailCounter ))- 1 )* time .Second ))
305
310
306
311
case catchupMsg , ok := <- catchupEventCh :
307
312
// Process an incoming event
@@ -311,9 +316,17 @@ func (client *Client) worker() {
311
316
break
312
317
}
313
318
314
- // Store an incoming error as the catchupErr to be processed below
319
+ // Abort and restart the catch-up-phase when receiving an error.
315
320
if ok && catchupMsg .error != nil {
316
- catchupErr = catchupMsg .error
321
+ client .Logger .Warnw ("Worker leaves catch-up-phase with an error, another attempt will be made" , zap .Error (catchupMsg .error ))
322
+ go func () {
323
+ select {
324
+ case <- client .Ctx .Done ():
325
+ case client .catchupPhaseRequest <- struct {}{}:
326
+ }
327
+ }()
328
+ catchupReset ()
329
+ catchupFailCounter ++
317
330
break
318
331
}
319
332
@@ -336,19 +349,9 @@ func (client *Client) worker() {
336
349
break
337
350
}
338
351
339
- if catchupErr != nil {
340
- client .Logger .Warnw ("Worker leaves catch-up-phase with an error, another attempt will be made" , zap .Error (catchupErr ))
341
- go func () {
342
- select {
343
- case <- client .Ctx .Done ():
344
- case client .catchupPhaseRequest <- struct {}{}:
345
- }
346
- }()
347
- } else {
348
- client .Logger .Info ("Worker leaves catch-up-phase, returning to normal operation" )
349
- }
350
-
352
+ client .Logger .Info ("Worker leaves catch-up-phase, returning to normal operation" )
351
353
catchupReset ()
354
+ catchupFailCounter = 0
352
355
353
356
case ev := <- client .eventDispatcherEventStream :
354
357
// During catch-up-phase, buffer Event Stream events
0 commit comments