Skip to content

Commit 0ede7b0

Browse files
committed
Try to do smarter handling of leader election
1 parent 0b46401 commit 0ede7b0

File tree

3 files changed

+62
-9
lines changed

3 files changed

+62
-9
lines changed

internal/cmd/api/api.go

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"maps"
78
"os"
89
"os/signal"
910
"syscall"
@@ -197,6 +198,7 @@ func run(ctx context.Context, cfg *Config, log logrus.FieldLogger) error {
197198
if err != nil {
198199
return fmt.Errorf("creating k8s clients: %w", err)
199200
}
201+
log.WithField("envs", maps.Keys(k8sClients)).Info("Start event watcher")
200202
eventWatcher := event.NewWatcher(pool, k8sClients, log)
201203
go eventWatcher.Run(ctx)
202204
}

internal/kubernetes/event/watcher.go

+39-9
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
eventsql "github.com/nais/api/internal/kubernetes/event/searchsql"
1515
"github.com/nais/api/internal/leaderelection"
1616
"github.com/sirupsen/logrus"
17-
"golang.org/x/sync/errgroup"
17+
"github.com/sourcegraph/conc/pool"
1818
eventv1 "k8s.io/api/events/v1"
1919
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2020
"k8s.io/apimachinery/pkg/watch"
@@ -26,6 +26,9 @@ type Watcher struct {
2626
clients map[string]kubernetes.Interface
2727
events chan eventsql.UpsertParams
2828
log logrus.FieldLogger
29+
wg *pool.ContextPool
30+
31+
cancel context.CancelFunc
2932
}
3033

3134
func NewWatcher(pool *pgxpool.Pool, clients map[string]kubernetes.Interface, log logrus.FieldLogger) *Watcher {
@@ -38,25 +41,50 @@ func NewWatcher(pool *pgxpool.Pool, clients map[string]kubernetes.Interface, log
3841
}
3942

4043
func (w *Watcher) Run(ctx context.Context) {
41-
wg, ctx := errgroup.WithContext(ctx)
42-
for env, client := range w.clients {
43-
wg.Go(func() error {
44-
return w.run(ctx, env, client)
45-
})
44+
w.wg = pool.New().WithErrors().WithContext(ctx)
45+
46+
leaderelection.RegisterOnStartedLeading(w.onStartedLeading)
47+
leaderelection.RegisterOnStoppedLeading(w.onStoppedLeading)
48+
if leaderelection.IsLeader() {
49+
w.onStartedLeading(ctx)
4650
}
4751

48-
wg.Go(func() error {
52+
w.wg.Go(func(ctx context.Context) error {
4953
return w.batchInsert(ctx)
5054
})
5155

52-
if err := wg.Wait(); err != nil {
56+
if err := w.wg.Wait(); err != nil {
5357
w.log.WithError(err).Error("error running events watcher")
5458
}
5559
}
5660

61+
func (w *Watcher) onStoppedLeading() {
62+
if w.cancel != nil {
63+
w.cancel()
64+
}
65+
}
66+
67+
func (w *Watcher) onStartedLeading(_ context.Context) {
68+
if w.cancel != nil {
69+
w.cancel()
70+
}
71+
72+
cancel := make(chan struct{})
73+
74+
w.cancel = func() {
75+
close(cancel)
76+
}
77+
78+
for env, client := range w.clients {
79+
w.wg.Go(func(ctx context.Context) error {
80+
return w.run(ctx, env, client, cancel)
81+
})
82+
}
83+
}
84+
5785
var regHorizontalPodAutoscaler = regexp.MustCompile(`New size: (\d+); reason: (\w+).*(below|above) target`)
5886

59-
func (w *Watcher) run(ctx context.Context, env string, client kubernetes.Interface) error {
87+
func (w *Watcher) run(ctx context.Context, env string, client kubernetes.Interface, cancel chan struct{}) error {
6088
// Events we want to watch for
6189
// SuccessfulRescale - Check for successful rescale events
6290
// Killing - Check for liveness failures
@@ -103,6 +131,8 @@ func (w *Watcher) run(ctx context.Context, env string, client kubernetes.Interfa
103131
select {
104132
case <-ctx.Done():
105133
return nil
134+
case <-cancel:
135+
return nil
106136
case event := <-rescale.ResultChan():
107137
handleEvent(event, func(e *eventv1.Event) (eventsql.UpsertParams, bool) {
108138
if !strings.HasPrefix(e.Note, "New size") {

internal/leaderelection/leaderelection.go

+21
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,19 @@ import (
1414

1515
var elector *leaderelection.LeaderElector
1616

17+
var callbacks = struct {
18+
onStartedLeading []func(context.Context)
19+
onStoppedLeading []func()
20+
}{}
21+
22+
func RegisterOnStartedLeading(f func(context.Context)) {
23+
callbacks.onStartedLeading = append(callbacks.onStartedLeading, f)
24+
}
25+
26+
func RegisterOnStoppedLeading(f func()) {
27+
callbacks.onStoppedLeading = append(callbacks.onStoppedLeading, f)
28+
}
29+
1730
func Start(ctx context.Context, client kubernetes.Interface, leaseName, namespace string, log logrus.FieldLogger) error {
1831
id, err := os.Hostname()
1932
if err != nil {
@@ -40,9 +53,17 @@ func Start(ctx context.Context, client kubernetes.Interface, leaseName, namespac
4053
Callbacks: leaderelection.LeaderCallbacks{
4154
OnStartedLeading: func(context.Context) {
4255
log.Info("Started leading")
56+
57+
for _, f := range callbacks.onStartedLeading {
58+
f(ctx)
59+
}
4360
},
4461
OnStoppedLeading: func() {
4562
log.Info("Stopped leading")
63+
64+
for _, f := range callbacks.onStoppedLeading {
65+
f()
66+
}
4667
},
4768
OnNewLeader: func(identity string) {
4869
log.Infof("New leader: %s", identity)

0 commit comments

Comments
 (0)