Skip to content

Commit d415e08

Browse files
committed
icinga2: Add created/deleted events & cache event extra tags
Host/Service groups are never supposed to change at runtime, thus we can use these two new events to regularly refresh our cache store. This eliminates the overhead of querying the groups with each ongoing event and should relax the Icinga 2 API a litle bit.
1 parent 6a57adf commit d415e08

File tree

6 files changed

+173
-22
lines changed

6 files changed

+173
-22
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ require (
2929
github.com/go-redis/redis/v8 v8.11.5 // indirect
3030
github.com/go-sql-driver/mysql v1.8.1 // indirect
3131
github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f // indirect
32+
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
3233
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 // indirect
3334
github.com/jessevdk/go-flags v1.5.0 // indirect
3435
github.com/lib/pq v1.10.9 // indirect

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
3939
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
4040
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
4141
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
42+
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
43+
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
4244
github.com/icinga/icingadb v1.1.1-0.20230418113126-7c4b947aad3a h1:NfVdBKa4dhPk7IU8u0MOF6ywi0LDpMkQMGs1j803+3c=
4345
github.com/icinga/icingadb v1.1.1-0.20230418113126-7c4b947aad3a/go.mod h1:zamCKaKn4JJQinctcUyewTSNNXDfpLc0HSbqb+9lTYs=
4446
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 h1:iCHtR9CQyktQ5+f3dMVZfwD2KWJUgm7M0gdL9NGr8KA=

internal/icinga2/api_responses.go

+36
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"fmt"
66
"strconv"
7+
"strings"
78
"time"
89
)
910

@@ -157,6 +158,8 @@ const (
157158
typeDowntimeStarted = "DowntimeStarted"
158159
typeDowntimeTriggered = "DowntimeTriggered"
159160
typeFlapping = "Flapping"
161+
typeObjectCreated = "ObjectCreated"
162+
typeObjectDeleted = "ObjectDeleted"
160163
)
161164

162165
// StateChange represents the Icinga 2 API Event Stream StateChange response for host/service state changes.
@@ -273,6 +276,37 @@ type Flapping struct {
273276
IsFlapping bool `json:"is_flapping"`
274277
}
275278

279+
// CheckableCreatedDeleted represents the Icinga 2 API stream Checkable created/deleted response.
280+
//
281+
// NOTE:
282+
// - The ObjectName field may already contain the composed name of the checkable if the ObjectType is `Service`.
283+
// - The EventType field indicates which event type is currently being streamed and is either
284+
// set to typeObjectCreated or typeObjectDeleted.
285+
type CheckableCreatedDeleted struct {
286+
ObjectName string `json:"object_name"`
287+
ObjectType string `json:"object_type"`
288+
EventType string `json:"type"`
289+
}
290+
291+
// GetHostName returns the host name of this current response result.
292+
func (c *CheckableCreatedDeleted) GetHostName() string {
293+
if c.ObjectType == "Host" {
294+
return c.ObjectName
295+
}
296+
297+
return strings.Split(c.ObjectName, "!")[0]
298+
}
299+
300+
// GetServiceName returns the service of this current response result.
301+
// Returns empty string if the current result type is of type `Host`.
302+
func (c *CheckableCreatedDeleted) GetServiceName() string {
303+
if c.ObjectType == "Service" {
304+
return strings.Split(c.ObjectName, "!")[1]
305+
}
306+
307+
return ""
308+
}
309+
276310
// UnmarshalEventStreamResponse unmarshal a JSON response line from the Icinga 2 API Event Stream.
277311
//
278312
// The function expects an Icinga 2 API Event Stream Response in its JSON form and tries to unmarshal it into one of the
@@ -312,6 +346,8 @@ func UnmarshalEventStreamResponse(bytes []byte) (any, error) {
312346
resp = new(DowntimeTriggered)
313347
case typeFlapping:
314348
resp = new(Flapping)
349+
case typeObjectCreated, typeObjectDeleted:
350+
resp = new(CheckableCreatedDeleted)
315351
default:
316352
return nil, fmt.Errorf("unsupported type %q", responseType)
317353
}

internal/icinga2/api_responses_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,42 @@ func TestApiResponseUnmarshal(t *testing.T) {
608608
},
609609
},
610610
},
611+
{
612+
name: "objectcreated-host",
613+
jsonData: `{"object_name":"event-stream","object_type":"Host","timestamp":1716542256.769028,"type":"ObjectCreated"}`,
614+
expected: &CheckableCreatedDeleted{
615+
ObjectName: "event-stream",
616+
ObjectType: "Host",
617+
EventType: "ObjectCreated",
618+
},
619+
},
620+
{
621+
name: "objectcreated-service",
622+
jsonData: `{"object_name":"event-stream!ssh","object_type":"Service","timestamp":1716542256.783502,"type":"ObjectCreated"}`,
623+
expected: &CheckableCreatedDeleted{
624+
ObjectName: "event-stream!ssh",
625+
ObjectType: "Service",
626+
EventType: "ObjectCreated",
627+
},
628+
},
629+
{
630+
name: "objectdeleted-host",
631+
jsonData: `{"object_name":"event-stream","object_type":"Host","timestamp":1716542070.492318,"type":"ObjectDeleted"}`,
632+
expected: &CheckableCreatedDeleted{
633+
ObjectName: "event-stream",
634+
ObjectType: "Host",
635+
EventType: "ObjectDeleted",
636+
},
637+
},
638+
{
639+
name: "objectdeleted-service",
640+
jsonData: `{"object_name":"event-stream!ssh","object_type":"Service","timestamp":1716542070.492095,"type":"ObjectDeleted"}`,
641+
expected: &CheckableCreatedDeleted{
642+
ObjectName: "event-stream!ssh",
643+
ObjectType: "Service",
644+
EventType: "ObjectDeleted",
645+
},
646+
},
611647
}
612648

613649
for _, test := range tests {

internal/icinga2/client.go

+78-19
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ package icinga2
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"github.com/google/uuid"
8+
lru "github.com/hashicorp/golang-lru/v2"
79
"github.com/icinga/icinga-notifications/internal/event"
810
"go.uber.org/zap"
911
"golang.org/x/sync/errgroup"
12+
"math"
1013
"net/http"
1114
"net/url"
1215
"time"
@@ -64,6 +67,14 @@ type Client struct {
6467
eventDispatcherEventStream chan *eventMsg
6568
// catchupPhaseRequest requests the main worker to switch to the catch-up-phase to query the API for missed events.
6669
catchupPhaseRequest chan struct{}
70+
71+
// eventExtraTags is used to cache Checkable groups once they have been fetched from the Icinga 2 API so that they
72+
// don't have to be fetched over again with each ongoing event. Host/Service groups are never supposed to change at
73+
// runtime, so this cache is being refreshed once in a while when Icinga 2 dispatches an object created/deleted
74+
// event and thus should not overload the Icinga 2 API in a large environment with numerous Checkables.
75+
// The LRU cache size is defined as 2^17, and when the actual cached items reach this size, the oldest values
76+
// will simply be overwritten by the new ones.
77+
eventExtraTagsCache *lru.Cache[string, map[string]string]
6778
}
6879

6980
// buildCommonEvent creates an event.Event based on Host and (optional) Service attributes to be specified later.
@@ -78,10 +89,9 @@ type Client struct {
7889
// - ID
7990
func (client *Client) buildCommonEvent(ctx context.Context, host, service string) (*event.Event, error) {
8091
var (
81-
eventName string
82-
eventUrl *url.URL
83-
eventTags map[string]string
84-
eventExtraTags = make(map[string]string)
92+
eventName string
93+
eventUrl *url.URL
94+
eventTags map[string]string
8595
)
8696

8797
eventUrl, err := url.Parse(client.IcingaWebRoot)
@@ -99,14 +109,6 @@ func (client *Client) buildCommonEvent(ctx context.Context, host, service string
99109
"host": host,
100110
"service": service,
101111
}
102-
103-
serviceGroups, err := client.fetchHostServiceGroups(ctx, host, service)
104-
if err != nil {
105-
return nil, err
106-
}
107-
for _, serviceGroup := range serviceGroups {
108-
eventExtraTags["servicegroup/"+serviceGroup] = ""
109-
}
110112
} else {
111113
eventName = host
112114

@@ -118,12 +120,27 @@ func (client *Client) buildCommonEvent(ctx context.Context, host, service string
118120
}
119121
}
120122

121-
hostGroups, err := client.fetchHostServiceGroups(ctx, host, "")
122-
if err != nil {
123-
return nil, err
124-
}
125-
for _, hostGroup := range hostGroups {
126-
eventExtraTags["hostgroup/"+hostGroup] = ""
123+
extraTags := make(map[string]string)
124+
if existingExtraTags, ok := client.eventExtraTagsCache.Get(eventName); ok {
125+
extraTags = existingExtraTags
126+
} else {
127+
objectType := "Host"
128+
if service != "" {
129+
objectType = "Service"
130+
}
131+
132+
checkableResult := &CheckableCreatedDeleted{
133+
ObjectName: eventName,
134+
ObjectType: objectType,
135+
EventType: typeObjectCreated,
136+
}
137+
138+
if err := client.refreshExtraTagsCache(ctx, checkableResult); err != nil {
139+
return nil, err
140+
}
141+
if existingExtraTags, ok := client.eventExtraTagsCache.Get(eventName); ok {
142+
extraTags = existingExtraTags
143+
}
127144
}
128145

129146
return &event.Event{
@@ -132,10 +149,44 @@ func (client *Client) buildCommonEvent(ctx context.Context, host, service string
132149
Name: eventName,
133150
URL: eventUrl.String(),
134151
Tags: eventTags,
135-
ExtraTags: eventExtraTags,
152+
ExtraTags: extraTags,
136153
}, nil
137154
}
138155

156+
// refreshExtraTagsCache refreshes the client event extra tags cache store based on the given response result.
157+
func (client *Client) refreshExtraTagsCache(ctx context.Context, result *CheckableCreatedDeleted) error {
158+
switch result.EventType {
159+
case typeObjectDeleted:
160+
// The checkable has just been deleted, so delete all existing extra tags from our cache store as well.
161+
client.eventExtraTagsCache.Remove(result.ObjectName)
162+
case typeObjectCreated:
163+
extraTags := make(map[string]string)
164+
hostGroups, err := client.fetchHostServiceGroups(ctx, result.GetHostName(), "")
165+
if err != nil {
166+
return err
167+
}
168+
for _, hostGroup := range hostGroups {
169+
extraTags["hostgroup/"+hostGroup] = ""
170+
}
171+
172+
if result.ObjectType == "Service" {
173+
serviceGroups, err := client.fetchHostServiceGroups(ctx, result.GetHostName(), result.GetServiceName())
174+
if err != nil {
175+
return err
176+
}
177+
for _, serviceGroup := range serviceGroups {
178+
extraTags["servicegroup/"+serviceGroup] = ""
179+
}
180+
}
181+
182+
client.eventExtraTagsCache.Add(result.ObjectName, extraTags)
183+
default:
184+
return fmt.Errorf("cannot refresh object extra tags for event-stream type %s", result.EventType)
185+
}
186+
187+
return nil
188+
}
189+
139190
// buildHostServiceEvent constructs an event.Event based on a CheckResult, a Host or Service state, a Host name and an
140191
// optional Service name if the Event should represent a Service object.
141192
func (client *Client) buildHostServiceEvent(ctx context.Context, result CheckResult, state int, host, service string) (*event.Event, error) {
@@ -441,6 +492,14 @@ func (client *Client) Process() {
441492
client.eventDispatcherEventStream = make(chan *eventMsg)
442493
client.catchupPhaseRequest = make(chan struct{})
443494

495+
cache, err := lru.New[string, map[string]string](int(math.Exp2(17)))
496+
if err != nil {
497+
// Is unlikely to happen, as the only error being returned is triggered by
498+
// specifying negative numbers as the cache size.
499+
client.Logger.Fatalw("Failed to initialise event extra tags cache", zap.Error(err))
500+
}
501+
client.eventExtraTagsCache = cache
502+
444503
go client.worker()
445504

446505
for client.Ctx.Err() == nil {

internal/icinga2/client_api.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,19 @@ func (client *Client) connectEventStream(esTypes []string) (io.ReadCloser, error
309309
queueNameRndBuff := make([]byte, 16)
310310
_, _ = rand.Read(queueNameRndBuff)
311311

312-
reqBody, err := json.Marshal(map[string]any{
312+
reqBody := new(bytes.Buffer)
313+
encoder := json.NewEncoder(reqBody)
314+
// Since our encoded JSON isn't going to be used somewhere in a browser, we've to disable this feature
315+
// so that our filter expressions such as ‘&’ don't get replaced by some UTF-8 code points.
316+
encoder.SetEscapeHTML(false)
317+
318+
err := encoder.Encode(map[string]any{
313319
"queue": fmt.Sprintf("icinga-notifications-%x", queueNameRndBuff),
314320
"types": esTypes,
321+
"filter": fmt.Sprintf(
322+
"(event.type!=%q && event.type!=%q) || event.object_type==%q || event.object_type==%q",
323+
typeObjectCreated, typeObjectDeleted, "Host", "Service",
324+
),
315325
})
316326
if err != nil {
317327
return nil, err
@@ -322,7 +332,7 @@ func (client *Client) connectEventStream(esTypes []string) (io.ReadCloser, error
322332
// When leaving the function without an error, it is being called in connectEventStreamReadCloser.Close().
323333
reqCtx, reqCancel := context.WithCancel(client.Ctx)
324334

325-
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, apiUrl, bytes.NewReader(reqBody))
335+
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, apiUrl, reqBody)
326336
if err != nil {
327337
reqCancel()
328338
return nil, err
@@ -337,7 +347,7 @@ func (client *Client) connectEventStream(esTypes []string) (io.ReadCloser, error
337347
go func() {
338348
defer close(resCh)
339349

340-
client.Logger.Debug("Try to establish an Event Stream API connection")
350+
client.Logger.Debugw("Try to establish an Event Stream API connection", zap.String("request_body", reqBody.String()))
341351
httpClient := &http.Client{Transport: client.ApiHttpTransport}
342352
res, err := httpClient.Do(req)
343353
if err != nil {
@@ -398,6 +408,8 @@ func (client *Client) listenEventStream() error {
398408
typeDowntimeStarted,
399409
typeDowntimeTriggered,
400410
typeFlapping,
411+
typeObjectCreated,
412+
typeObjectDeleted,
401413
})
402414
if err != nil {
403415
return err
@@ -471,6 +483,11 @@ func (client *Client) listenEventStream() error {
471483
case *Flapping:
472484
ev, err = client.buildFlappingEvent(client.Ctx, respT.Host, respT.Service, respT.IsFlapping)
473485
evTime = respT.Timestamp.Time()
486+
case *CheckableCreatedDeleted:
487+
err = client.refreshExtraTagsCache(client.Ctx, respT)
488+
if err == nil {
489+
continue
490+
}
474491
default:
475492
err = fmt.Errorf("unsupported type %T", resp)
476493
}

0 commit comments

Comments
 (0)