@@ -3,7 +3,6 @@ package event
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
- "errors"
7
6
"fmt"
8
7
"regexp"
9
8
"strings"
@@ -29,7 +28,8 @@ type Watcher struct {
29
28
log logrus.FieldLogger
30
29
wg * pool.ContextPool
31
30
32
- cancel context.CancelFunc
31
+ // State returns true when the watcher should be started/continue running and false when it should stop.
32
+ state []chan bool
33
33
}
34
34
35
35
func NewWatcher (pool * pgxpool.Pool , clients map [string ]kubernetes.Interface , log logrus.FieldLogger ) * Watcher {
@@ -42,75 +42,78 @@ func NewWatcher(pool *pgxpool.Pool, clients map[string]kubernetes.Interface, log
42
42
events : make (chan eventsql.UpsertParams , 20 ),
43
43
queries : eventsql .New (pool ),
44
44
log : log ,
45
+ state : chs ,
45
46
}
46
47
}
47
48
48
49
func (w * Watcher ) Run (ctx context.Context ) {
49
- w .wg = pool .New ().WithContext (ctx )
50
+ w .wg = pool .New ().WithErrors (). WithContext (ctx )
50
51
51
52
leaderelection .RegisterOnStartedLeading (w .onStartedLeading )
52
53
leaderelection .RegisterOnStoppedLeading (w .onStoppedLeading )
53
54
if leaderelection .IsLeader () {
54
- w .log .Debug ("Is already leader, force start" )
55
55
w .onStartedLeading (ctx )
56
56
}
57
57
58
58
w .wg .Go (func (ctx context.Context ) error {
59
59
return w .batchInsert (ctx )
60
60
})
61
61
62
+ i := 0
63
+ for env , client := range w .clients {
64
+ ch := w .state [i ]
65
+ i ++
66
+ w .wg .Go (func (ctx context.Context ) error {
67
+ return w .run (ctx , env , client , ch )
68
+ })
69
+ }
70
+
62
71
if err := w .wg .Wait (); err != nil {
63
72
w .log .WithError (err ).Error ("error running events watcher" )
64
73
}
65
74
}
66
75
67
76
func (w * Watcher ) onStoppedLeading () {
68
- w . log . Debug ( "onStoppedLeading..." )
69
- if w . cancel != nil {
70
- w . cancel ()
71
- w . cancel = nil
72
-
73
- w . log . Debug ( "cancelling" )
77
+ for _ , ch := range w . state {
78
+ select {
79
+ case ch <- false :
80
+ default :
81
+ w . log . WithField ( "state" , "stopped" ). Error ( "failed to send state" )
82
+ }
74
83
}
75
84
}
76
85
77
- func (w * Watcher ) onStartedLeading (ctx context.Context ) {
78
- if w .cancel != nil {
79
- w .cancel ()
80
- }
81
-
82
- go func () {
83
- time .Sleep (5 * time .Second )
84
- w .onStoppedLeading ()
85
- }()
86
-
87
- ctx , cancel := context .WithCancel (ctx )
88
- w .cancel = cancel
89
-
90
- for env , client := range w .clients {
91
- w .wg .Go (func (_ context.Context ) error {
92
- w .log .WithField ("env" , env ).Debug ("starting watcher" )
93
- return w .run (ctx , env , client )
94
- })
86
+ func (w * Watcher ) onStartedLeading (_ context.Context ) {
87
+ for _ , ch := range w .state {
88
+ select {
89
+ case ch <- true :
90
+ default :
91
+ w .log .WithField ("state" , "started" ).Error ("failed to send state" )
92
+ }
95
93
}
96
94
}
97
95
98
96
var regHorizontalPodAutoscaler = regexp .MustCompile (`New size: (\d+); reason: (\w+).*(below|above) target` )
99
97
100
- func (w * Watcher ) run (ctx context.Context , env string , client kubernetes.Interface ) error {
98
+ func (w * Watcher ) run (ctx context.Context , env string , client kubernetes.Interface , state chan bool ) error {
101
99
for {
102
- if err := w .watch (ctx , env , client ); err != nil {
103
- if errors .Is (err , context .Canceled ) {
104
- return nil
100
+ select {
101
+ case <- ctx .Done ():
102
+ return nil
103
+ case s := <- state :
104
+ w .log .WithField ("env" , env ).WithField ("state" , s ).Info ("state change" )
105
+ if s {
106
+ if err := w .watch (ctx , env , client , state ); err != nil {
107
+ w .log .WithError (err ).Error ("failed to watch events" )
108
+ }
109
+ w .log .WithField ("env" , env ).Info ("stopped watching" )
110
+
105
111
}
106
- w .log .WithError (err ).Error ("failed to watch events" )
107
112
}
108
- w .log .WithField ("env" , env ).Info ("stopped watching" )
109
- time .Sleep (2 * time .Second )
110
113
}
111
114
}
112
115
113
- func (w * Watcher ) watch (ctx context.Context , env string , client kubernetes.Interface ) error {
116
+ func (w * Watcher ) watch (ctx context.Context , env string , client kubernetes.Interface , state chan bool ) error {
114
117
// Events we want to watch for
115
118
// SuccessfulRescale - Check for successful rescale events
116
119
// Killing - Check for liveness failures
@@ -166,6 +169,10 @@ func (w *Watcher) watch(ctx context.Context, env string, client kubernetes.Inter
166
169
select {
167
170
case <- ctx .Done ():
168
171
return nil
172
+ case s := <- state :
173
+ if ! s {
174
+ return nil
175
+ }
169
176
case event := <- rescale .ResultChan ():
170
177
handleEvent (event , func (e * eventv1.Event ) (eventsql.UpsertParams , bool ) {
171
178
if ! strings .HasPrefix (e .Note , "New size" ) {
0 commit comments