Skip to content

Commit 14ebe27

Browse files
committed
Some metrics around receiving events
1 parent 351f855 commit 14ebe27

File tree

2 files changed

+55
-11
lines changed

2 files changed

+55
-11
lines changed

internal/cmd/api/api.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,10 @@ func run(ctx context.Context, cfg *Config, log logrus.FieldLogger) error {
205205
}
206206

207207
log.WithField("envs", len(k8sClients)).Info("Start event watcher")
208-
eventWatcher := event.NewWatcher(pool, k8sClients, log)
208+
eventWatcher, err := event.NewWatcher(pool, k8sClients, log)
209+
if err != nil {
210+
return fmt.Errorf("creating event watcher: %w", err)
211+
}
209212
go eventWatcher.Run(ctx)
210213
}
211214

internal/kubernetes/event/watcher.go

+51-10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import (
1515
"github.com/nais/api/internal/leaderelection"
1616
"github.com/sirupsen/logrus"
1717
"github.com/sourcegraph/conc/pool"
18+
"go.opentelemetry.io/otel"
19+
"go.opentelemetry.io/otel/attribute"
20+
"go.opentelemetry.io/otel/metric"
1821
eventv1 "k8s.io/api/events/v1"
1922
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2023
"k8s.io/apimachinery/pkg/watch"
@@ -29,21 +32,37 @@ type Watcher struct {
2932
wg *pool.ContextPool
3033

3134
// State returns true when the watcher should be started/continue running and false when it should stop.
32-
state []chan bool
35+
state []chan bool
36+
eventsCounter metric.Int64Counter
37+
handlersCounter metric.Int64UpDownCounter
3338
}
3439

35-
func NewWatcher(pool *pgxpool.Pool, clients map[string]kubernetes.Interface, log logrus.FieldLogger) *Watcher {
40+
func NewWatcher(pool *pgxpool.Pool, clients map[string]kubernetes.Interface, log logrus.FieldLogger) (*Watcher, error) {
3641
chs := make([]chan bool, 0, len(clients))
3742
for range clients {
3843
chs = append(chs, make(chan bool, 1))
3944
}
40-
return &Watcher{
41-
clients: clients,
42-
events: make(chan eventsql.UpsertParams, 20),
43-
queries: eventsql.New(pool),
44-
log: log,
45-
state: chs,
45+
46+
meter := otel.GetMeterProvider().Meter("nais_api_k8s_events")
47+
eventsCounter, err := meter.Int64Counter("nais_api_k8s_events_total", metric.WithDescription("Number of events processed"))
48+
if err != nil {
49+
return nil, fmt.Errorf("creating events counter: %w", err)
4650
}
51+
52+
handlersCounter, err := meter.Int64UpDownCounter("nais_api_k8s_handlers", metric.WithDescription("number of goroutines handling events"))
53+
if err != nil {
54+
return nil, fmt.Errorf("creating handlers counter: %w", err)
55+
}
56+
57+
return &Watcher{
58+
clients: clients,
59+
events: make(chan eventsql.UpsertParams, 20),
60+
queries: eventsql.New(pool),
61+
log: log,
62+
state: chs,
63+
eventsCounter: eventsCounter,
64+
handlersCounter: handlersCounter,
65+
}, nil
4766
}
4867

4968
func (w *Watcher) Run(ctx context.Context) {
@@ -114,6 +133,9 @@ func (w *Watcher) run(ctx context.Context, env string, client kubernetes.Interfa
114133
}
115134

116135
func (w *Watcher) watch(ctx context.Context, env string, client kubernetes.Interface, state chan bool) error {
136+
w.handlersCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("environment", env)))
137+
defer w.handlersCounter.Add(ctx, -1, metric.WithAttributes(attribute.String("environment", env)))
138+
117139
// Events we want to watch for
118140
// SuccessfulRescale - Check for successful rescale events
119141
// Killing - Check for liveness failures
@@ -125,21 +147,28 @@ func (w *Watcher) watch(ctx context.Context, env string, client kubernetes.Inter
125147

126148
w.log.WithField("len", len(list.Items)).Debug("listed events")
127149

150+
closeAndDrain := func(w watch.Interface) {
151+
w.Stop()
152+
for range w.ResultChan() {
153+
// Drain the channel
154+
}
155+
}
156+
128157
rescale, err := client.EventsV1().Events("").Watch(ctx, metav1.ListOptions{
129158
FieldSelector: "reason=SuccessfulRescale,metadata.namespace!=nais-system",
130159
})
131160
if err != nil {
132161
return fmt.Errorf("failed to watch for rescale events: %w", err)
133162
}
134-
defer rescale.Stop()
163+
defer closeAndDrain(rescale)
135164

136165
killing, err := client.EventsV1().Events("").Watch(ctx, metav1.ListOptions{
137166
FieldSelector: "reason=Killing,metadata.namespace!=nais-system",
138167
})
139168
if err != nil {
140169
return fmt.Errorf("failed to watch for killing events: %w", err)
141170
}
142-
defer killing.Stop()
171+
defer closeAndDrain(killing)
143172

144173
handleEvent := func(event watch.Event, convert func(e *eventv1.Event) (eventsql.UpsertParams, bool)) {
145174
if event.Type != watch.Added && event.Type != watch.Modified {
@@ -174,6 +203,12 @@ func (w *Watcher) watch(ctx context.Context, env string, client kubernetes.Inter
174203
return nil
175204
}
176205
case event := <-rescale.ResultChan():
206+
w.eventsCounter.Add(ctx, 1, metric.WithAttributes(
207+
attribute.String("environment", string(env)),
208+
attribute.String("type", string(event.Type)),
209+
attribute.String("reason", "SuccessfulRescale")),
210+
)
211+
177212
handleEvent(event, func(e *eventv1.Event) (eventsql.UpsertParams, bool) {
178213
if !strings.HasPrefix(e.Note, "New size") {
179214
w.log.WithField("note", e.Note).Debug("ignoring event")
@@ -203,6 +238,12 @@ func (w *Watcher) watch(ctx context.Context, env string, client kubernetes.Inter
203238
return w.toUpsertParams(env, e, data)
204239
})
205240
case event := <-killing.ResultChan():
241+
w.eventsCounter.Add(ctx, 1, metric.WithAttributes(
242+
attribute.String("environment", string(env)),
243+
attribute.String("type", string(event.Type)),
244+
attribute.String("reason", "Killing")),
245+
)
246+
206247
handleEvent(event, func(e *eventv1.Event) (eventsql.UpsertParams, bool) {
207248
if strings.HasSuffix(e.Note, "failed liveness probe, will be restarted") {
208249
// Match `Container some-container-name failed liveness probe, will be restarted`

0 commit comments

Comments (0)