Skip to content

Commit 500b2a9

Browse files
committed
eventstream: rework replay communication logic
Prior, two variables - one signaling channel and an atomic.Bool - were holding and/or communicating if the eventDispatcher was in its replay phase. Especially the atomic.Bool variable was accessed both in the producer - enterReplayPhase - as well as the consumer - eventDispatcher. After this rework, the whole logic went into the main worker, previously named eventDispatcher. By using a single channel - replayPhaseRequest -, the worker might now switch to the replay phase. This eases and unifies the internal "API" as all communication with the worker takes place over unidirectional channels.
1 parent 111dec4 commit 500b2a9

File tree

2 files changed

+90
-65
lines changed

2 files changed

+90
-65
lines changed

internal/eventstream/client.go

Lines changed: 84 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"net/http"
1616
"net/url"
1717
"os"
18-
"sync/atomic"
1918
"time"
2019
)
2120

@@ -28,7 +27,15 @@ type eventMsg struct {
2827
}
2928

3029
// Client for the Icinga 2 Event Stream API with extended support for other Icinga 2 APIs to gather additional
31-
// information and allow a replay in case of a connection loss.
30+
// information and a replay either when starting up to catch up the Icinga's state or in case of a connection loss.
31+
//
32+
// Within the icinga-notifications scope, one or multiple Client instances can be generated from the configuration by
33+
// calling NewClientsFromConfig.
34+
//
35+
// A Client must be started by calling its Process method, which blocks until Ctx is marked as done. Reconnections and
36+
// the necessary state replaying from the Icinga 2 API will be taken care off. Internally, the Client executes a worker
37+
// within its own goroutine, which dispatches event.Event to the CallbackFn and enforces event.Event order during
38+
// replaying after (re-)connections.
3239
type Client struct {
3340
// ApiHost et al. configure where and how the Icinga 2 API can be reached.
3441
ApiHost string
@@ -52,11 +59,8 @@ type Client struct {
5259
eventDispatcherEventStream chan *eventMsg
5360
// eventDispatcherReplay communicates Events to be processed from the Icinga 2 API replay during replay phase.
5461
eventDispatcherReplay chan *eventMsg
55-
56-
// replayTrigger signals the eventDispatcher method that the replay phase is finished.
57-
replayTrigger chan struct{}
58-
// replayPhase indicates that Events will be cached as the Event Stream Client is in the replay phase.
59-
replayPhase atomic.Bool
62+
// replayPhaseRequest requests the main worker to switch to the replay phase and re-request the Icinga 2 API.
63+
replayPhaseRequest chan struct{}
6064
}
6165

6266
// NewClientsFromConfig returns all Clients defined in the conf.ConfigFile.
@@ -252,11 +256,50 @@ func (client *Client) buildAcknowledgementEvent(ctx context.Context, host, servi
252256
return ev, nil
253257
}
254258

255-
// eventDispatcher receives generated event.Events to be either buffered or directly delivered to the CallbackFn.
259+
// startReplayWorker launches goroutines for replaying the Icinga 2 API state.
256260
//
257-
// When the Client is in the replay phase, events from the Event Stream API will be cached until the replay phase has
258-
// finished, while replayed events will be delivered directly.
259-
func (client *Client) eventDispatcher() {
261+
// When all launched workers have finished - either because all are done or one has failed and the others were
262+
// interrupted -, the returned channel will be closed. Those workers honor the Ctx and would also fail when the main
263+
// context is done.
264+
func (client *Client) startReplayWorker() chan struct{} {
265+
startTime := time.Now()
266+
group, groupCtx := errgroup.WithContext(client.Ctx)
267+
268+
objTypes := []string{"host", "service"}
269+
for _, objType := range objTypes {
270+
objType := objType // https://go.dev/doc/faq#closures_and_goroutines
271+
group.Go(func() error {
272+
err := client.checkMissedChanges(groupCtx, objType)
273+
if err != nil {
274+
client.Logger.Errorw("Replaying API events resulted in errors",
275+
zap.String("object type", objType), zap.Error(err))
276+
}
277+
return err
278+
})
279+
}
280+
281+
finCh := make(chan struct{})
282+
go func() {
283+
err := group.Wait()
284+
if err != nil {
285+
client.Logger.Errorw("Replaying the API resulted in errors", zap.Error(err), zap.Duration("duration", time.Since(startTime)))
286+
} else {
287+
client.Logger.Infow("Replaying the API has finished", zap.Duration("duration", time.Since(startTime)))
288+
}
289+
290+
close(finCh)
291+
}()
292+
293+
return finCh
294+
}
295+
296+
// worker is the Client's main background worker, taking care of event.Event dispatching and mode switching.
297+
//
298+
// When the Client is in the replay phase, requested by replayPhaseRequest, events from the Event Stream API will
299+
// be cached until the replay phase has finished, while replayed events will be delivered directly.
300+
//
301+
// Communication takes place over the channels eventDispatcherEventStream, eventDispatcherReplay and replayPhaseRequest.
302+
func (client *Client) worker() {
260303
// eventCache is a subset of event.Event used for caching those in replayCache below.
261304
type eventCache struct {
262305
SourceId int64
@@ -265,6 +308,11 @@ func (client *Client) eventDispatcher() {
265308
}
266309

267310
var (
311+
// replayFinCh holds a reference when the Client is in the replay phase. It will be closed from the producer,
312+
// startReplayWorker, when replaying is finished, which indicates the select below to switch phases. When this
313+
// variable is nil, the Client is in the normal operating phase.
314+
replayFinCh chan struct{}
315+
268316
// replayBuffer holds Event Stream events to be replayed after the replay phase has finished.
269317
replayBuffer = make([]*event.Event, 0)
270318
// replayCache maps eventCache derived from event.Events to API time to skip replaying outdated events.
@@ -283,10 +331,21 @@ func (client *Client) eventDispatcher() {
283331
for {
284332
select {
285333
case <-client.Ctx.Done():
286-
client.Logger.Warnw("Closing event dispatcher as its context is done", zap.Error(client.Ctx.Err()))
334+
client.Logger.Warnw("Closing down main worker as context is finished", zap.Error(client.Ctx.Err()))
287335
return
288336

289-
case <-client.replayTrigger:
337+
case <-client.replayPhaseRequest:
338+
if replayFinCh != nil {
339+
// There shouldn't be multiple concurrent startReplayWorker calls. However, technically this is possible,
340+
// i.e. when the Icinga 2 API connection is flapping.
341+
client.Logger.Error("Another replay phase request was sent while the Client is already replaying")
342+
break
343+
}
344+
345+
client.Logger.Debug("Dispatcher enters replay phase, starting caching Event Stream events")
346+
replayFinCh = client.startReplayWorker()
347+
348+
case <-replayFinCh:
290349
skipCounter := 0
291350
for _, ev := range replayBuffer {
292351
ts, ok := replayCache[eventCache{ev.SourceId, ev.Name, ev.Type}]
@@ -299,26 +358,26 @@ func (client *Client) eventDispatcher() {
299358

300359
client.CallbackFn(ev)
301360
}
302-
client.Logger.Infow("Finished replay phase, returning to normal operation",
361+
client.Logger.Infow("Dispatcher leaves replay phase, returning to normal operation",
303362
zap.Int("cached events", len(replayBuffer)), zap.Int("skipped events", skipCounter))
304363

364+
replayFinCh = nil
305365
replayBuffer = make([]*event.Event, 0)
306366
replayCache = make(map[eventCache]time.Time)
307-
client.replayPhase.Store(false)
308367

309368
case ev := <-client.eventDispatcherEventStream:
310-
if !client.replayPhase.Load() {
311-
client.CallbackFn(ev.event)
312-
continue
369+
if replayFinCh != nil {
370+
replayBuffer = append(replayBuffer, ev.event)
371+
replayCacheUpdate(ev)
372+
break
313373
}
314374

315-
replayBuffer = append(replayBuffer, ev.event)
316-
replayCacheUpdate(ev)
375+
client.CallbackFn(ev.event)
317376

318377
case ev := <-client.eventDispatcherReplay:
319-
if !client.replayPhase.Load() {
320-
client.Logger.Errorw("Dispatcher received replay event during normal operation", zap.Stringer("event", ev.event))
321-
continue
378+
if replayFinCh == nil {
379+
client.Logger.Errorw("Dispatcher received replay event outside of the replay phase", zap.Stringer("event", ev.event))
380+
break
322381
}
323382

324383
client.CallbackFn(ev.event)
@@ -327,45 +386,6 @@ func (client *Client) eventDispatcher() {
327386
}
328387
}
329388

330-
// enterReplayPhase enters the replay phase for the initial sync and after reconnections.
331-
//
332-
// This method starts multiple goroutines. First, some workers to query the Icinga 2 Objects API will be launched. When
333-
// all of those have finished, the replayTrigger will be used to indicate that the buffered Events should be replayed.
334-
func (client *Client) enterReplayPhase() {
335-
client.Logger.Info("Entering replay phase to replay stored events first")
336-
if !client.replayPhase.CompareAndSwap(false, true) {
337-
client.Logger.Error("The Event Stream Client is already in the replay phase")
338-
return
339-
}
340-
341-
group, groupCtx := errgroup.WithContext(client.Ctx)
342-
objTypes := []string{"host", "service"}
343-
for _, objType := range objTypes {
344-
objType := objType // https://go.dev/doc/faq#closures_and_goroutines
345-
group.Go(func() error {
346-
err := client.checkMissedChanges(groupCtx, objType)
347-
if err != nil {
348-
client.Logger.Errorw("Replaying API events resulted in errors",
349-
zap.String("object type", objType), zap.Error(err))
350-
}
351-
return err
352-
})
353-
}
354-
355-
go func() {
356-
startTime := time.Now()
357-
358-
err := group.Wait()
359-
if err != nil {
360-
client.Logger.Errorw("Replaying the API resulted in errors", zap.Error(err), zap.Duration("duration", time.Since(startTime)))
361-
} else {
362-
client.Logger.Debugw("All replay phase workers have finished", zap.Duration("duration", time.Since(startTime)))
363-
}
364-
365-
client.replayTrigger <- struct{}{}
366-
}()
367-
}
368-
369389
// Process incoming objects and reconnect to the Event Stream with replaying objects if necessary.
370390
//
371391
// This method blocks as long as the Client runs, which, unless its context is cancelled, is forever. While its internal
@@ -374,9 +394,9 @@ func (client *Client) enterReplayPhase() {
374394
func (client *Client) Process() {
375395
client.eventDispatcherEventStream = make(chan *eventMsg)
376396
client.eventDispatcherReplay = make(chan *eventMsg)
377-
client.replayTrigger = make(chan struct{})
397+
client.replayPhaseRequest = make(chan struct{})
378398

379-
go client.eventDispatcher()
399+
go client.worker()
380400

381401
for {
382402
err := client.listenEventStream()

internal/eventstream/client_api.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,12 @@ func (client *Client) listenEventStream() error {
349349
defer cancel()
350350
defer func() { _ = response.Body.Close() }()
351351

352-
client.enterReplayPhase()
352+
select {
353+
case <-client.Ctx.Done():
354+
client.Logger.Warnw("Cannot request starting replay phase as context is finished", zap.Error(client.Ctx.Err()))
355+
return client.Ctx.Err()
356+
case client.replayPhaseRequest <- struct{}{}:
357+
}
353358

354359
client.Logger.Info("Start listening on Icinga 2 Event Stream..")
355360

0 commit comments

Comments
 (0)