Skip to content

Commit 26a3295

Browse files
committed
eventstream: rework Client's channels
This change started by creating two separate channels for events from the Event Stream API and from the replay phase. During debugging, I came across multiple bugs, all of which are at least somewhat addressed here. - I used buffered channels to pass events from the producers to the single dispatcher consumer, even though I should have known better. When the last replay producer has finished, another channel is used to communicate this state change. Because of my lazy buffered-channel hack, the last events were raced by the finish signal. - After restoring (channel processing) order, the producers need a way to quickly exit when the consumer has finished. Thus, a select statement for both reading and writing - checking the context's Done channel - was introduced in all producers. - Some safeguard checks were introduced, which, e.g., detected the channel race error listed above. - Somehow during a prior refactoring, the Client.fetchServiceGroups method was broken so that it queried the host groups instead of, as its name says, the service groups. - My Docker-based testing environment sends SIGTERM instead of SIGINT, which, for other reasons, did not even reach the binary. Now SIGTERM is honored for the main context as well. - Some documentation, especially regarding error messages, had either typos or grammatical mistakes.
1 parent db8f8b7 commit 26a3295

File tree

3 files changed

+60
-43
lines changed

3 files changed

+60
-43
lines changed

cmd/icinga-notifications-daemon/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"os"
1515
"os/signal"
1616
"runtime"
17+
"syscall"
1718
"time"
1819
)
1920

@@ -75,7 +76,7 @@ func main() {
7576
}
7677
}
7778

78-
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
79+
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
7980
defer cancel()
8081

8182
runtimeConfig := config.NewRuntimeConfig(db, logs)

internal/eventstream/client.go

+40-30
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"crypto/x509"
7+
"errors"
78
"fmt"
89
"github.com/icinga/icinga-notifications/internal/config"
910
"github.com/icinga/icinga-notifications/internal/event"
@@ -21,12 +22,6 @@ import (
2122

2223
// This file contains the main resp. common methods for the Client.
2324

24-
// outgoingEvent is a wrapper around an event.Event and its producer's origin to be sent to the eventDispatcher.
25-
type outgoingEvent struct {
26-
event *event.Event
27-
fromEventStream bool
28-
}
29-
3025
// Client for the Icinga 2 Event Stream API with extended support for other Icinga 2 APIs to gather additional
3126
// information and allow a replay in case of a connection loss.
3227
type Client struct {
@@ -48,8 +43,11 @@ type Client struct {
4843
// Logger to log to.
4944
Logger *logging.Logger
5045

51-
// eventDispatch communicates Events to be processed between producer and consumer.
52-
eventDispatch chan *outgoingEvent
46+
// eventDispatcherEventStream communicates Events to be processed from the Event Stream API.
47+
eventDispatcherEventStream chan *event.Event
48+
// eventDispatcherReplay communicates Events to be processed from the Icinga 2 API replay during replay phase.
49+
eventDispatcherReplay chan *event.Event
50+
5351
// replayTrigger signals the eventDispatcher method that the replay phase is finished.
5452
replayTrigger chan struct{}
5553
// replayPhase indicates that Events will be cached as the Event Stream Client is in the replay phase.
@@ -192,22 +190,22 @@ func (client *Client) buildHostServiceEvent(result CheckResult, state int, host,
192190

193191
if service != "" {
194192
switch state {
195-
case 0:
193+
case 0: // OK
196194
eventSeverity = event.SeverityOK
197-
case 1:
195+
case 1: // WARNING
198196
eventSeverity = event.SeverityWarning
199-
case 2:
197+
case 2: // CRITICAL
200198
eventSeverity = event.SeverityCrit
201-
default:
199+
default: // UNKNOWN or faulty
202200
eventSeverity = event.SeverityErr
203201
}
204202
} else {
205203
switch state {
206-
case 0:
204+
case 0: // UP
207205
eventSeverity = event.SeverityOK
208-
case 1:
206+
case 1: // DOWN
209207
eventSeverity = event.SeverityCrit
210-
default:
208+
default: // faulty
211209
eventSeverity = event.SeverityErr
212210
}
213211
}
@@ -248,7 +246,7 @@ func (client *Client) eventDispatcher() {
248246
for {
249247
select {
250248
case <-client.Ctx.Done():
251-
client.Logger.Warnw("Closing event dispatcher as context is done", zap.Error(client.Ctx.Err()))
249+
client.Logger.Warnw("Closing event dispatcher as its context is done", zap.Error(client.Ctx.Err()))
252250
return
253251

254252
case <-client.replayTrigger:
@@ -259,12 +257,18 @@ func (client *Client) eventDispatcher() {
259257
client.replayPhase.Store(false)
260258
replayBuffer = []*event.Event{}
261259

262-
case ev := <-client.eventDispatch:
263-
if client.replayPhase.Load() && ev.fromEventStream {
264-
replayBuffer = append(replayBuffer, ev.event)
260+
case ev := <-client.eventDispatcherEventStream:
261+
if client.replayPhase.Load() {
262+
replayBuffer = append(replayBuffer, ev)
265263
} else {
266-
client.CallbackFn(ev.event)
264+
client.CallbackFn(ev)
265+
}
266+
267+
case ev := <-client.eventDispatcherReplay:
268+
if !client.replayPhase.Load() {
269+
client.Logger.Errorw("Dispatcher received replay event during normal operation", zap.Stringer("event", ev))
267270
}
271+
client.CallbackFn(ev)
268272
}
269273
}
270274
}
@@ -275,7 +279,10 @@ func (client *Client) eventDispatcher() {
275279
// all of those have finished, the replayTrigger will be used to indicate that the buffered Events should be replayed.
276280
func (client *Client) enterReplayPhase() {
277281
client.Logger.Info("Entering replay phase to replay stored events first")
278-
client.replayPhase.Store(true)
282+
if !client.replayPhase.CompareAndSwap(false, true) {
283+
client.Logger.Error("The Event Stream Client is already in the replay phase")
284+
return
285+
}
279286

280287
queryFns := []func(string){client.checkMissedAcknowledgements, client.checkMissedStateChanges}
281288
objTypes := []string{"host", "service"}
@@ -293,7 +300,9 @@ func (client *Client) enterReplayPhase() {
293300
}
294301

295302
go func() {
303+
startTime := time.Now()
296304
replayWg.Wait()
305+
client.Logger.Debugw("All replay phase workers have finished", zap.Duration("duration", time.Since(startTime)))
297306
client.replayTrigger <- struct{}{}
298307
}()
299308
}
@@ -304,22 +313,23 @@ func (client *Client) enterReplayPhase() {
304313
// loop takes care of reconnections, all those events will be logged while generated Events will be dispatched to the
305314
// callback function.
306315
func (client *Client) Process() {
307-
// These two channels will be used to communicate the Events and are crucial. As there are multiple producers and
308-
// only one consumer, eventDispatcher, there is no ideal closer. However, producers and the consumer will be
309-
// finished by the Client's context. When this happens, the main application should either be stopped or the Client
310-
// is restarted, and we can hope for the GC. To make sure that nothing gets stuck, make the event channel buffered.
311-
client.eventDispatch = make(chan *outgoingEvent, 1024)
316+
client.eventDispatcherEventStream = make(chan *event.Event)
317+
client.eventDispatcherReplay = make(chan *event.Event)
312318
client.replayTrigger = make(chan struct{})
313319

314-
defer client.Logger.Info("Event Stream Client has stopped")
315-
316320
go client.eventDispatcher()
317321

318322
for {
319323
err := client.listenEventStream()
320-
if err != nil {
324+
switch {
325+
case errors.Is(err, context.Canceled):
326+
client.Logger.Warnw("Stopping Event Stream Client as its context is done", zap.Error(err))
327+
return
328+
329+
case err != nil:
321330
client.Logger.Errorw("Event Stream processing failed", zap.Error(err))
322-
} else {
331+
332+
default:
323333
client.Logger.Warn("Event Stream closed stream; maybe Icinga 2 is reloading")
324334
}
325335
}

internal/eventstream/client_api.go

+18-12
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (client *Client) queryObjectsApiQuery(objType string, query map[string]any)
9494
})
9595
}
9696

97-
// fetchHostGroup fetches all Host Groups for this host.
97+
// fetchHostGroups fetches all Host Groups for this host.
9898
func (client *Client) fetchHostGroups(host string) ([]string, error) {
9999
jsonRaw, err := client.queryObjectsApiDirect("host", host)
100100
if err != nil {
@@ -114,7 +114,7 @@ func (client *Client) fetchHostGroups(host string) ([]string, error) {
114114

115115
// fetchServiceGroups fetches all Service Groups for this service on this host.
116116
func (client *Client) fetchServiceGroups(host, service string) ([]string, error) {
117-
jsonRaw, err := client.queryObjectsApiDirect("host", host)
117+
jsonRaw, err := client.queryObjectsApiDirect("service", host+"!"+service)
118118
if err != nil {
119119
return nil, err
120120
}
@@ -231,15 +231,18 @@ func (client *Client) checkMissedChanges(objType, filterExpr string, attrsCallba
231231
// checkMissedStateChanges fetches all objects of the requested type and feeds them into the handler.
232232
func (client *Client) checkMissedStateChanges(objType string) {
233233
client.checkMissedChanges(objType, "", func(attrs HostServiceRuntimeAttributes, host, service string) {
234+
logger := client.Logger.With(zap.String("object type", objType))
235+
234236
ev, err := client.buildHostServiceEvent(attrs.LastCheckResult, attrs.State, host, service)
235237
if err != nil {
236-
client.Logger.Errorw("Failed to construct Event from API", zap.String("object type", objType), zap.Error(err))
238+
logger.Errorw("Failed to construct Event from API", zap.Error(err))
237239
return
238240
}
239241

240-
client.eventDispatch <- &outgoingEvent{
241-
event: ev,
242-
fromEventStream: false,
242+
select {
243+
case <-client.Ctx.Done():
244+
logger.Warnw("Cannot dispatch replayed event as context is finished", zap.Error(client.Ctx.Err()))
245+
case client.eventDispatcherReplay <- ev:
243246
}
244247
})
245248
}
@@ -264,9 +267,10 @@ func (client *Client) checkMissedAcknowledgements(objType string) {
264267
return
265268
}
266269

267-
client.eventDispatch <- &outgoingEvent{
268-
event: ev,
269-
fromEventStream: false,
270+
select {
271+
case <-client.Ctx.Done():
272+
logger.Warnw("Cannot dispatch replayed event as context is finished", zap.Error(client.Ctx.Err()))
273+
case client.eventDispatcherReplay <- ev:
270274
}
271275
})
272276
}
@@ -356,9 +360,11 @@ func (client *Client) listenEventStream() error {
356360
return err
357361
}
358362

359-
client.eventDispatch <- &outgoingEvent{
360-
event: ev,
361-
fromEventStream: true,
363+
select {
364+
case <-client.Ctx.Done():
365+
client.Logger.Warnw("Cannot dispatch Event Stream event as context is finished", zap.Error(client.Ctx.Err()))
366+
return client.Ctx.Err()
367+
case client.eventDispatcherEventStream <- ev:
362368
}
363369
}
364370
return lineScanner.Err()

0 commit comments

Comments
 (0)