Skip to content

Commit 111dec4

Browse files
committed
eventstream: replay State and ACK Changes together
Instead of querying both state and acknowledgement changes in parallel and risking replaying an ACK before the referenced state change event, acknowledgement events are now being generated only on demand after a state change event was emitted which holds an acknowledgement flag. This allowed some refactoring as now some methods have collided. Furthermore, the replay context - derived from the Client context - is now explicitly passed to all context-bound functions, allowing a strict termination in case of an early replay error. The other code path - Event Stream API - still uses the Client's main context.
1 parent 85f8d12 commit 111dec4

File tree

2 files changed

+84
-105
lines changed

2 files changed

+84
-105
lines changed

internal/eventstream/client.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func NewClientsFromConfig(
139139
// - Username
140140
// - Message
141141
// - ID
142-
func (client *Client) buildCommonEvent(host, service string) (*event.Event, error) {
142+
func (client *Client) buildCommonEvent(ctx context.Context, host, service string) (*event.Event, error) {
143143
var (
144144
eventName string
145145
eventUrl *url.URL
@@ -163,7 +163,7 @@ func (client *Client) buildCommonEvent(host, service string) (*event.Event, erro
163163
"service": service,
164164
}
165165

166-
serviceGroups, err := client.fetchServiceGroups(host, service)
166+
serviceGroups, err := client.fetchHostServiceGroups(ctx, host, service)
167167
if err != nil {
168168
return nil, err
169169
}
@@ -181,7 +181,7 @@ func (client *Client) buildCommonEvent(host, service string) (*event.Event, erro
181181
}
182182
}
183183

184-
hostGroups, err := client.fetchHostGroups(host)
184+
hostGroups, err := client.fetchHostServiceGroups(ctx, host, "")
185185
if err != nil {
186186
return nil, err
187187
}
@@ -201,7 +201,7 @@ func (client *Client) buildCommonEvent(host, service string) (*event.Event, erro
201201

202202
// buildHostServiceEvent constructs an event.Event based on a CheckResult, a Host or Service state, a Host name and an
203203
// optional Service name if the Event should represent a Service object.
204-
func (client *Client) buildHostServiceEvent(result CheckResult, state int, host, service string) (*event.Event, error) {
204+
func (client *Client) buildHostServiceEvent(ctx context.Context, result CheckResult, state int, host, service string) (*event.Event, error) {
205205
var eventSeverity event.Severity
206206

207207
if service != "" {
@@ -226,7 +226,7 @@ func (client *Client) buildHostServiceEvent(result CheckResult, state int, host,
226226
}
227227
}
228228

229-
ev, err := client.buildCommonEvent(host, service)
229+
ev, err := client.buildCommonEvent(ctx, host, service)
230230
if err != nil {
231231
return nil, err
232232
}
@@ -239,8 +239,8 @@ func (client *Client) buildHostServiceEvent(result CheckResult, state int, host,
239239
}
240240

241241
// buildAcknowledgementEvent from the given fields.
242-
func (client *Client) buildAcknowledgementEvent(host, service, author, comment string) (*event.Event, error) {
243-
ev, err := client.buildCommonEvent(host, service)
242+
func (client *Client) buildAcknowledgementEvent(ctx context.Context, host, service, author, comment string) (*event.Event, error) {
243+
ev, err := client.buildCommonEvent(ctx, host, service)
244244
if err != nil {
245245
return nil, err
246246
}
@@ -338,17 +338,18 @@ func (client *Client) enterReplayPhase() {
338338
return
339339
}
340340

341-
queryFns := []func(context.Context, string) error{client.checkMissedAcknowledgements, client.checkMissedStateChanges}
342-
objTypes := []string{"host", "service"}
343-
344341
group, groupCtx := errgroup.WithContext(client.Ctx)
345-
for _, fn := range queryFns {
346-
for _, objType := range objTypes {
347-
fn, objType := fn, objType // https://go.dev/doc/faq#closures_and_goroutines
348-
group.Go(func() error {
349-
return fn(groupCtx, objType)
350-
})
351-
}
342+
objTypes := []string{"host", "service"}
343+
for _, objType := range objTypes {
344+
objType := objType // https://go.dev/doc/faq#closures_and_goroutines
345+
group.Go(func() error {
346+
err := client.checkMissedChanges(groupCtx, objType)
347+
if err != nil {
348+
client.Logger.Errorw("Replaying API events resulted in errors",
349+
zap.String("object type", objType), zap.Error(err))
350+
}
351+
return err
352+
})
352353
}
353354

354355
go func() {

internal/eventstream/client_api.go

Lines changed: 66 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,18 @@ func extractObjectQueriesResult[T Comment | Downtime | HostServiceRuntimeAttribu
3939
}
4040

4141
// queryObjectsApi performs a configurable HTTP request against the Icinga 2 API and returns its raw response.
42-
func (client *Client) queryObjectsApi(urlPaths []string, method string, body io.Reader, headers map[string]string) (io.ReadCloser, error) {
42+
func (client *Client) queryObjectsApi(
43+
ctx context.Context,
44+
urlPaths []string,
45+
method string,
46+
body io.Reader,
47+
headers map[string]string,
48+
) (io.ReadCloser, error) {
4349
apiUrl, err := url.JoinPath(client.ApiHost, urlPaths...)
4450
if err != nil {
4551
return nil, err
4652
}
47-
req, err := http.NewRequestWithContext(client.Ctx, method, apiUrl, body)
53+
req, err := http.NewRequestWithContext(ctx, method, apiUrl, body)
4854
if err != nil {
4955
return nil, err
5056
}
@@ -72,22 +78,24 @@ func (client *Client) queryObjectsApi(urlPaths []string, method string, body io.
7278
}
7379

7480
// queryObjectsApiDirect performs a direct resp. "fast" API query against an object, optionally identified by its name.
75-
func (client *Client) queryObjectsApiDirect(objType, objName string) (io.ReadCloser, error) {
81+
func (client *Client) queryObjectsApiDirect(ctx context.Context, objType, objName string) (io.ReadCloser, error) {
7682
return client.queryObjectsApi(
83+
ctx,
7784
[]string{"/v1/objects/", objType + "s/", rawurlencode(objName)},
7885
http.MethodGet,
7986
nil,
8087
map[string]string{"Accept": "application/json"})
8188
}
8289

8390
// queryObjectsApiQuery sends a query to the Icinga 2 API /v1/objects to receive data of the given objType.
84-
func (client *Client) queryObjectsApiQuery(objType string, query map[string]any) (io.ReadCloser, error) {
91+
func (client *Client) queryObjectsApiQuery(ctx context.Context, objType string, query map[string]any) (io.ReadCloser, error) {
8592
reqBody, err := json.Marshal(query)
8693
if err != nil {
8794
return nil, err
8895
}
8996

9097
return client.queryObjectsApi(
98+
ctx,
9199
[]string{"/v1/objects/", objType + "s"},
92100
http.MethodPost,
93101
bytes.NewReader(reqBody),
@@ -98,27 +106,15 @@ func (client *Client) queryObjectsApiQuery(objType string, query map[string]any)
98106
})
99107
}
100108

101-
// fetchHostGroups fetches all Host Groups for this host.
102-
func (client *Client) fetchHostGroups(host string) ([]string, error) {
103-
jsonRaw, err := client.queryObjectsApiDirect("host", host)
104-
if err != nil {
105-
return nil, err
106-
}
107-
objQueriesResults, err := extractObjectQueriesResult[HostServiceRuntimeAttributes](jsonRaw)
108-
if err != nil {
109-
return nil, err
110-
}
111-
112-
if len(objQueriesResults) != 1 {
113-
return nil, fmt.Errorf("expected exactly one result for host %q instead of %d", host, len(objQueriesResults))
109+
// fetchHostServiceGroups fetches all Host or, if service is not empty, Service groups.
110+
func (client *Client) fetchHostServiceGroups(ctx context.Context, host, service string) ([]string, error) {
111+
objType, objName := "host", host
112+
if service != "" {
113+
objType = "service"
114+
objName += "!" + service
114115
}
115116

116-
return objQueriesResults[0].Attrs.Groups, nil
117-
}
118-
119-
// fetchServiceGroups fetches all Service Groups for this service on this host.
120-
func (client *Client) fetchServiceGroups(host, service string) ([]string, error) {
121-
jsonRaw, err := client.queryObjectsApiDirect("service", host+"!"+service)
117+
jsonRaw, err := client.queryObjectsApiDirect(ctx, objType, objName)
122118
if err != nil {
123119
return nil, err
124120
}
@@ -128,7 +124,8 @@ func (client *Client) fetchServiceGroups(host, service string) ([]string, error)
128124
}
129125

130126
if len(objQueriesResults) != 1 {
131-
return nil, fmt.Errorf("expected exactly one result for service %q instead of %d", host+"!"+service, len(objQueriesResults))
127+
return nil, fmt.Errorf("expected exactly one result for object type %q and %q instead of %d",
128+
objType, objName, len(objQueriesResults))
132129
}
133130

134131
return objQueriesResults[0].Attrs.Groups, nil
@@ -140,15 +137,15 @@ func (client *Client) fetchServiceGroups(host, service string) ([]string, error)
140137
// closest we can do, is query for Comments with the Acknowledgement Service Type and the host/service name. In addition,
141138
// the Host's resp. Service's AcknowledgementLastChange field has NOT the same timestamp as the Comment; there is a
142139
// difference of some milliseconds. As there might be even multiple ACK comments, we have to find the closest one.
143-
func (client *Client) fetchAcknowledgementComment(host, service string, ackTime time.Time) (*Comment, error) {
140+
func (client *Client) fetchAcknowledgementComment(ctx context.Context, host, service string, ackTime time.Time) (*Comment, error) {
144141
filterExpr := "comment.entry_type == 4 && comment.host_name == comment_host_name"
145142
filterVars := map[string]string{"comment_host_name": host}
146143
if service != "" {
147144
filterExpr += " && comment.service_name == comment_service_name"
148145
filterVars["comment_service_name"] = service
149146
}
150147

151-
jsonRaw, err := client.queryObjectsApiQuery("comment", map[string]any{"filter": filterExpr, "filter_vars": filterVars})
148+
jsonRaw, err := client.queryObjectsApiQuery(ctx, "comment", map[string]any{"filter": filterExpr, "filter_vars": filterVars})
152149
if err != nil {
153150
return nil, err
154151
}
@@ -175,39 +172,20 @@ func (client *Client) fetchAcknowledgementComment(host, service string, ackTime
175172

176173
// checkMissedChanges queries for Service or Host objects to handle missed elements.
177174
//
178-
// If a filterExpr is given (non-empty string), it will be used for the query. Otherwise, all objects will be requested.
179-
//
180-
// The callback function will be called f.e. object of the objType (i.e. "host" or "service") being retrieved from the
181-
// Icinga 2 Objects API sequentially. The callback function or a later caller decides if this object should be replayed.
182-
func (client *Client) checkMissedChanges(
183-
objType, filterExpr string,
184-
attrsCallbackFn func(attrs HostServiceRuntimeAttributes, host, service string) error,
185-
) (err error) {
186-
logger := client.Logger.With(zap.String("object type", objType), zap.String("filter expr", filterExpr))
187-
188-
defer func() {
189-
if err != nil {
190-
logger.Errorw("Querying API for replay failed", zap.Error(err))
191-
}
192-
}()
193-
194-
var jsonRaw io.ReadCloser
195-
if filterExpr == "" {
196-
jsonRaw, err = client.queryObjectsApiDirect(objType, "")
197-
} else {
198-
jsonRaw, err = client.queryObjectsApiQuery(objType, map[string]any{"filter": filterExpr})
199-
}
175+
// If the object's acknowledgement field is non-zero, an Acknowledgement Event will be constructed following the Host or
176+
// Service object.
177+
func (client *Client) checkMissedChanges(ctx context.Context, objType string) error {
178+
jsonRaw, err := client.queryObjectsApiDirect(ctx, objType, "")
200179
if err != nil {
201-
return
180+
return err
202181
}
203182

204183
objQueriesResults, err := extractObjectQueriesResult[HostServiceRuntimeAttributes](jsonRaw)
205184
if err != nil {
206-
return
185+
return err
207186
}
208187

209-
logger.Debugw("Querying API resulted in state changes", zap.Int("changes", len(objQueriesResults)))
210-
188+
var stateChangeEvents, acknowledgementEvents int
211189
for _, objQueriesResult := range objQueriesResults {
212190
var hostName, serviceName string
213191
switch objQueriesResult.Type {
@@ -219,58 +197,58 @@ func (client *Client) checkMissedChanges(
219197
serviceName = objQueriesResult.Attrs.Name
220198

221199
default:
222-
err = fmt.Errorf("querying API delivered a wrong object type %q", objQueriesResult.Type)
223-
return
200+
return fmt.Errorf("querying API delivered a wrong object type %q", objQueriesResult.Type)
224201
}
225202

226-
err = attrsCallbackFn(objQueriesResult.Attrs, hostName, serviceName)
203+
// State change event first
204+
ev, err := client.buildHostServiceEvent(
205+
ctx,
206+
objQueriesResult.Attrs.LastCheckResult, objQueriesResult.Attrs.State,
207+
hostName, serviceName)
227208
if err != nil {
228-
return
209+
return fmt.Errorf("failed to construct Event from Host/Service response, %w", err)
229210
}
230-
}
231-
return
232-
}
233-
234-
// checkMissedStateChanges fetches all objects of the requested type and feeds them into the handler.
235-
func (client *Client) checkMissedStateChanges(ctx context.Context, objType string) error {
236-
return client.checkMissedChanges(objType, "", func(attrs HostServiceRuntimeAttributes, host, service string) error {
237-
ev, err := client.buildHostServiceEvent(attrs.LastCheckResult, attrs.State, host, service)
238-
if err != nil {
239-
return fmt.Errorf("failed to construct Event from API, %w", err)
240-
}
241-
242211
select {
243212
case <-ctx.Done():
244213
return ctx.Err()
245-
case client.eventDispatcherReplay <- &eventMsg{ev, attrs.LastStateChange.Time}:
246-
return nil
214+
case client.eventDispatcherReplay <- &eventMsg{ev, objQueriesResult.Attrs.LastStateChange.Time}:
215+
stateChangeEvents++
247216
}
248-
})
249-
}
250217

251-
// checkMissedAcknowledgements fetches all Host or Service Acknowledgements and feeds them into the handler.
252-
//
253-
// Currently only active acknowledgements are being processed.
254-
func (client *Client) checkMissedAcknowledgements(ctx context.Context, objType string) error {
255-
filterExpr := fmt.Sprintf("%s.acknowledgement", objType)
256-
return client.checkMissedChanges(objType, filterExpr, func(attrs HostServiceRuntimeAttributes, host, service string) error {
257-
ackComment, err := client.fetchAcknowledgementComment(host, service, attrs.AcknowledgementLastChange.Time)
258-
if err != nil {
259-
return fmt.Errorf("cannot fetch ACK Comment for Acknowledgement, %w", err)
218+
// Optional acknowledgement event second
219+
if objQueriesResult.Attrs.Acknowledgement == 0 {
220+
continue
260221
}
261222

262-
ev, err := client.buildAcknowledgementEvent(host, service, ackComment.Author, ackComment.Text)
223+
ackComment, err := client.fetchAcknowledgementComment(
224+
ctx,
225+
hostName, serviceName,
226+
objQueriesResult.Attrs.AcknowledgementLastChange.Time)
263227
if err != nil {
264-
return fmt.Errorf("failed to construct Event from Acknowledgement API, %w", err)
228+
return fmt.Errorf("fetching acknowledgement comment for %v failed, %w", ev, err)
265229
}
266230

231+
ev, err = client.buildAcknowledgementEvent(
232+
ctx,
233+
hostName, serviceName,
234+
ackComment.Author, ackComment.Text)
235+
if err != nil {
236+
return fmt.Errorf("failed to construct Event from Acknowledgement response, %w", err)
237+
}
267238
select {
268239
case <-ctx.Done():
269240
return ctx.Err()
270-
case client.eventDispatcherReplay <- &eventMsg{ev, attrs.AcknowledgementLastChange.Time}:
271-
return nil
241+
case client.eventDispatcherReplay <- &eventMsg{ev, objQueriesResult.Attrs.LastStateChange.Time}:
242+
acknowledgementEvents++
272243
}
273-
})
244+
}
245+
246+
client.Logger.Infow("Replaying API emitted state changes",
247+
zap.String("object type", objType),
248+
zap.Int("state changes", stateChangeEvents),
249+
zap.Int("acknowledgements", acknowledgementEvents))
250+
251+
return nil
274252
}
275253

276254
// connectEventStream connects to the EventStream within an infinite loop until a connection was established.
@@ -390,11 +368,11 @@ func (client *Client) listenEventStream() error {
390368
)
391369
switch respT := resp.(type) {
392370
case *StateChange:
393-
ev, err = client.buildHostServiceEvent(respT.CheckResult, respT.State, respT.Host, respT.Service)
371+
ev, err = client.buildHostServiceEvent(client.Ctx, respT.CheckResult, respT.State, respT.Host, respT.Service)
394372
evTime = respT.Timestamp.Time
395373

396374
case *AcknowledgementSet:
397-
ev, err = client.buildAcknowledgementEvent(respT.Host, respT.Service, respT.Author, respT.Comment)
375+
ev, err = client.buildAcknowledgementEvent(client.Ctx, respT.Host, respT.Service, respT.Author, respT.Comment)
398376
evTime = respT.Timestamp.Time
399377

400378
// case *AcknowledgementCleared:

0 commit comments

Comments
 (0)