Skip to content

Commit 1fc76d6

Browse files
committed
eventstream: Event dispatcher
The just introduced replay logic felt a bit clumsy. Thus, I introduced a middle layer - the eventDispatcher method within its own goroutine - to receive all Events and decide if buffering should be performed.
1 parent cb377fe commit 1fc76d6

File tree

3 files changed

+53
-50
lines changed

3 files changed

+53
-50
lines changed

Diff for: internal/eventstream/client.go

+50-47
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@ type Client struct {
4141
// Logger to log to.
4242
Logger *logging.Logger
4343

44+
// eventDispatch communicates Events to be processed between producer and consumer.
45+
eventDispatch chan *event.Event
46+
// replayTrigger signals the eventDispatcher method that the reconnection phase is finished.
47+
replayTrigger chan struct{}
4448
// replayPhase indicates that Events will be cached as the Event Stream Client is in the reconnection phase.
4549
replayPhase atomic.Bool
46-
// replayBuffer is the cache being populated during the reconnection phase and its mutex.
47-
replayBuffer []*event.Event
48-
replayBufferMutex sync.Mutex
4950
}
5051

5152
// NewClientsFromConfig returns all Clients defined in the conf.ConfigFile.
@@ -220,22 +221,44 @@ func (client *Client) buildAcknowledgementEvent(host, service, author, comment s
220221
return ev, nil
221222
}
222223

223-
// handleEvent checks and dispatches generated Events.
224-
func (client *Client) handleEvent(ev *event.Event) {
225-
if client.replayPhase.Load() {
226-
client.replayBufferMutex.Lock()
227-
client.replayBuffer = append(client.replayBuffer, ev)
228-
client.replayBufferMutex.Unlock()
229-
return
230-
}
224+
// eventDispatcher receives generated event.Events to be either buffered or directly delivered to the CallbackFn.
225+
//
226+
// When the Client is in the reconnection phase, indicated in the enterReconnectionPhase method, than all received Events
227+
// from the eventDispatch channel will be buffered until the replayTrigger fires.
228+
func (client *Client) eventDispatcher() {
229+
var reconnectionBuffer []*event.Event
230+
231+
for {
232+
select {
233+
case <-client.Ctx.Done():
234+
client.Logger.Warnw("Closing event dispatcher as context is done", zap.Error(client.Ctx.Err()))
235+
return
231236

232-
client.CallbackFn(ev)
237+
case <-client.replayTrigger:
238+
for _, ev := range reconnectionBuffer {
239+
client.CallbackFn(ev)
240+
}
241+
client.Logger.Debugf("Replayed %d events during reconnection phase", len(reconnectionBuffer))
242+
client.replayPhase.Store(false)
243+
reconnectionBuffer = []*event.Event{}
244+
client.Logger.Info("Finished reconnection phase and returning normal operation")
245+
246+
case ev := <-client.eventDispatch:
247+
if client.replayPhase.Load() {
248+
reconnectionBuffer = append(reconnectionBuffer, ev)
249+
} else {
250+
client.CallbackFn(ev)
251+
}
252+
}
253+
}
233254
}
234255

235-
func (client *Client) replayBufferedEvents() {
236-
client.replayBufferMutex.Lock()
237-
client.replayBuffer = make([]*event.Event, 0, 1024)
238-
client.replayBufferMutex.Unlock()
256+
// enterReconnectionPhase enters the reconnection phase.
257+
//
258+
// This method starts multiple goroutines. First, some workers to query the Icinga 2 Objects API will be launched. When
259+
// all of those have finished, the replayTrigger will be used to indicate that the buffered Events should be replayed.
260+
func (client *Client) enterReconnectionPhase() {
261+
client.Logger.Info("Entering reconnection phase to replay events")
239262
client.replayPhase.Store(true)
240263

241264
queryFns := []func(string){client.checkMissedAcknowledgements, client.checkMissedStateChanges}
@@ -253,38 +276,9 @@ func (client *Client) replayBufferedEvents() {
253276
}
254277
}
255278

256-
// Fork off the synchronization in a background goroutine to wait for all producers to finish. As the producers
257-
// check the Client's context, they should finish early and this should not deadlock.
258279
go func() {
259280
replayWg.Wait()
260-
client.Logger.Debug("Querying the Objects API for replaying finished")
261-
262-
if client.Ctx.Err() != nil {
263-
client.Logger.Warn("Aborting Objects API replaying as the context is done")
264-
return
265-
}
266-
267-
for {
268-
// Here is a race between filling the buffer from incoming Event Stream events and processing the buffered
269-
// events. Thus, the buffer will be reset to catch up what happened in between, as otherwise Events would be
270-
// processed out of order. Only when the buffer is empty, the replay mode will be reset.
271-
client.replayBufferMutex.Lock()
272-
tmpReplayBuffer := client.replayBuffer
273-
client.replayBuffer = make([]*event.Event, 0, 1024)
274-
client.replayBufferMutex.Unlock()
275-
276-
if len(tmpReplayBuffer) == 0 {
277-
break
278-
}
279-
280-
for _, ev := range tmpReplayBuffer {
281-
client.CallbackFn(ev)
282-
}
283-
client.Logger.Debugf("Replayed %d events", len(tmpReplayBuffer))
284-
}
285-
286-
client.replayPhase.Store(false)
287-
client.Logger.Debug("Finished replay")
281+
client.replayTrigger <- struct{}{}
288282
}()
289283
}
290284

@@ -294,8 +288,17 @@ func (client *Client) replayBufferedEvents() {
294288
// loop takes care of reconnections, all those events will be logged while generated Events will be dispatched to the
295289
// callback function.
296290
func (client *Client) Process() {
291+
// These two channels will be used to communicate the Events and are crucial. As there are multiple producers and
292+
// only one consumer, eventDispatcher, there is no ideal closer. However, producers and the consumer will be
293+
// finished by the Client's context. When this happens, the main application should either be stopped or the Client
294+
// is restarted, and we can hope for the GC. To make sure that nothing gets stuck, make the event channel buffered.
295+
client.eventDispatch = make(chan *event.Event, 1024)
296+
client.replayTrigger = make(chan struct{})
297+
297298
defer client.Logger.Info("Event Stream Client has stopped")
298299

300+
go client.eventDispatcher()
301+
299302
for {
300303
client.Logger.Info("Start listening on Icinga 2 Event Stream..")
301304
err := client.listenEventStream()
@@ -311,6 +314,6 @@ func (client *Client) Process() {
311314
return
312315
}
313316

314-
client.replayBufferedEvents()
317+
client.enterReconnectionPhase()
315318
}
316319
}

Diff for: internal/eventstream/client_api.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (client *Client) checkMissedStateChanges(objType string) {
232232
return
233233
}
234234

235-
client.handleEvent(ev)
235+
client.eventDispatch <- ev
236236
})
237237
}
238238

@@ -254,7 +254,7 @@ func (client *Client) checkMissedAcknowledgements(objType string) {
254254
return
255255
}
256256

257-
client.handleEvent(ev)
257+
client.eventDispatch <- ev
258258
})
259259
}
260260

Diff for: internal/eventstream/client_es.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (client *Client) listenEventStream() error {
8888
return err
8989
}
9090

91-
client.handleEvent(ev)
91+
client.eventDispatch <- ev
9292
}
9393
return lineScanner.Err()
9494
}

0 commit comments

Comments
 (0)