Commit a139330

Improve node watch handling

Signed-off-by: Markus Blaschke <[email protected]>

Parent: 9905b81

4 files changed: +46 -22 lines

README.md (+10 -7)

@@ -23,15 +23,18 @@ Usage:
   kube-pool-manager [OPTIONS]
 
 Application Options:
-      --debug       debug mode [$DEBUG]
-  -v, --verbose     verbose mode [$VERBOSE]
-      --log.json    Switch log output to json format [$LOG_JSON]
-      --dry-run     Dry run (do not apply to nodes) [$DRY_RUN]
-      --config=     Config path [$CONFIG]
-      --bind=       Server address (default: :8080) [$SERVER_BIND]
+      --debug                      debug mode [$DEBUG]
+  -v, --verbose                    verbose mode [$VERBOSE]
+      --log.json                   Switch log output to json format [$LOG_JSON]
+      --kube.node.labelselector=   Node Label selector which nodes should be checked [$KUBE_NODE_LABELSELECTOR]
+      --kube.watch.timeout=        Timeout & full resync for node watch (time.Duration) (default: 24h)
+                                   [$KUBE_WATCH_TIMEOUT]
+      --dry-run                    Dry run (do not apply to nodes) [$DRY_RUN]
+      --config=                    Config path [$CONFIG]
+      --bind=                      Server address (default: :8080) [$SERVER_BIND]
 
 Help Options:
-  -h, --help        Show this help message
+  -h, --help                       Show this help message
 ```
 
 see [example.yaml](/example.yaml) for configuration file

config/config_test.go (-1)

@@ -94,7 +94,6 @@ func Test_NodeLabelMatcher(t *testing.T) {
 	}
 }
 
-
 func Test_NodeMultipleMatcher(t *testing.T) {
 	node := buildNode()

config/opts.go (+6)

@@ -3,6 +3,7 @@ package config
 import (
 	"encoding/json"
 	log "github.com/sirupsen/logrus"
+	"time"
 )
 
 type (
@@ -14,6 +15,11 @@ type (
 		LogJson bool `long:"log.json" env:"LOG_JSON" description:"Switch log output to json format"`
 	}
 
+	K8s struct {
+		NodeLabelSelector string        `long:"kube.node.labelselector" env:"KUBE_NODE_LABELSELECTOR" description:"Node Label selector which nodes should be checked" default:""`
+		WatchTimeout      time.Duration `long:"kube.watch.timeout" env:"KUBE_WATCH_TIMEOUT" description:"Timeout & full resync for node watch (time.Duration)" default:"24h"`
+	}
+
 	// general options
 	DryRun bool `long:"dry-run" env:"DRY_RUN" description:"Dry run (do not apply to nodes)"`
 	Config string `long:"config" env:"CONFIG" description:"Config path" required:"true"`
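For illustration, a minimal sketch of how the new options would be parsed, assuming the jessevdk/go-flags parser that these struct tags imply; the `k8sOpts` struct and `main` below are hypothetical stand-ins for config/opts.go, not code from the commit:

```go
package main

import (
	"fmt"
	"time"

	flags "github.com/jessevdk/go-flags"
)

// Hypothetical, trimmed stand-in for the new K8s options block.
type k8sOpts struct {
	NodeLabelSelector string        `long:"kube.node.labelselector" env:"KUBE_NODE_LABELSELECTOR" default:""`
	WatchTimeout      time.Duration `long:"kube.watch.timeout" env:"KUBE_WATCH_TIMEOUT" default:"24h"`
}

func main() {
	var opts k8sOpts
	// go-flags converts the value (flag, env var, or default "24h")
	// into a time.Duration via time.ParseDuration.
	if _, err := flags.NewParser(&opts, flags.Default).Parse(); err != nil {
		panic(err)
	}
	fmt.Printf("selector=%q resync=%s\n", opts.NodeLabelSelector, opts.WatchTimeout)
}
```

Running with `--kube.watch.timeout=1h` or `KUBE_WATCH_TIMEOUT=1h` would then override the 24h default.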

manager/manager.go (+30 -14)

@@ -3,18 +3,19 @@ package manager
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"github.com/prometheus/client_golang/prometheus"
 	log "github.com/sirupsen/logrus"
 	"github.com/webdevops/kube-pool-manager/config"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 	"os"
-	"strings"
 )
 
 type (
@@ -27,7 +28,7 @@ type (
 
 		prometheus struct {
 			nodePoolStatus *prometheus.GaugeVec
-			nodeApplied *prometheus.GaugeVec
+			nodeApplied    *prometheus.GaugeVec
 		}
 	}
 )
@@ -83,31 +84,46 @@ func (r *KubePoolManager) initK8s() {
 }
 
 func (m *KubePoolManager) Start() {
-	m.startWatch()
+	go func() {
+		for {
+			log.Info("(re)starting node watch")
+			if err := m.startNodeWatch(); err != nil {
+				log.Errorf("node watcher stopped: %v", err)
+			}
+		}
+	}()
 }
 
-func (m *KubePoolManager) startWatch() {
-	watch, err := m.k8sClient.CoreV1().Nodes().Watch(m.ctx, metav1.ListOptions{Watch: true})
+func (m *KubePoolManager) startNodeWatch() error {
+	timeout := int64(m.Opts.K8s.WatchTimeout.Seconds())
+	watchOpts := metav1.ListOptions{
+		LabelSelector:  m.Opts.K8s.NodeLabelSelector,
+		TimeoutSeconds: &timeout,
+		Watch:          true,
+	}
+	nodeWatcher, err := m.k8sClient.CoreV1().Nodes().Watch(m.ctx, watchOpts)
 	if err != nil {
 		log.Panic(err)
 	}
+	defer nodeWatcher.Stop()
 
-	go func() {
-		for res := range watch.ResultChan() {
-			switch strings.ToLower(string(res.Type)) {
-			case "added":
-				if node, ok := res.Object.(*corev1.Node); ok {
-					m.applyNode(node)
-				}
+	for res := range nodeWatcher.ResultChan() {
+		switch res.Type {
+		case watch.Added:
+			if node, ok := res.Object.(*corev1.Node); ok {
+				m.applyNode(node)
 			}
+		case watch.Error:
+			log.Errorf("go watch error event %v", res.Object)
 		}
-	}()
+	}
+
+	return fmt.Errorf("terminated")
 }
 
 func (m *KubePoolManager) applyNode(node *corev1.Node) {
 	contextLogger := log.WithField("node", node.Name)
 
-
 	for _, poolConfig := range m.Config.Pools {
 		m.prometheus.nodePoolStatus.WithLabelValues(node.Name, poolConfig.Name).Set(0)
 		poolLogger := contextLogger.WithField("pool", poolConfig.Name)
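The diff above implements a self-restarting watch: `TimeoutSeconds` makes the API server close the watch after at most the configured duration, the result channel then drains and closes, `startNodeWatch` returns, and the loop in `Start()` opens a fresh watch, which doubles as a periodic full resync. A self-contained sketch of the same pattern against client-go, assuming an out-of-cluster kubeconfig; names like `watchNodes` and `resyncEvery` are illustrative, not from the commit:

```go
package main

import (
	"context"
	"log"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// watchNodes runs a single node watch until the server closes it (at the
// latest after resyncEvery) and then returns so the caller can restart it.
func watchNodes(ctx context.Context, client kubernetes.Interface, selector string, resyncEvery time.Duration) error {
	timeout := int64(resyncEvery.Seconds())
	w, err := client.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{
		LabelSelector:  selector,
		TimeoutSeconds: &timeout, // server-side limit; forces the periodic restart
		Watch:          true,
	})
	if err != nil {
		return err
	}
	defer w.Stop()

	for ev := range w.ResultChan() {
		switch ev.Type {
		case watch.Added:
			if node, ok := ev.Object.(*corev1.Node); ok {
				log.Printf("node added: %s", node.Name)
			}
		case watch.Error:
			log.Printf("watch error event: %v", ev.Object)
		}
	}
	// Channel closed: timeout elapsed or the connection dropped.
	return nil
}

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Endless restart loop, mirroring the new Start() behaviour.
	for {
		log.Print("(re)starting node watch")
		if err := watchNodes(context.Background(), client, "", 24*time.Hour); err != nil {
			log.Printf("node watcher stopped: %v", err)
		}
	}
}
```

A side effect of the rewrite is that events are now handled on the watcher's own goroutine instead of a fire-and-forget closure, so the restart loop can observe the function returning and log why.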
