Do not assign targets to a collector pod that has not been Ready to accept traffic for longer than a grace period #3807

Open · wants to merge 2 commits into main
16 changes: 16 additions & 0 deletions .chloggen/ta_not_assign_targets_to_bad_collectors.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Do not assign targets to a collector pod that has been non-Running or non-Ready for longer than a grace period

# One or more tracking issues related to the change
issues: [3781]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
40 changes: 31 additions & 9 deletions cmd/otel-allocator/internal/collector/collector.go
@@ -20,7 +20,8 @@ import (
)

const (
defaultMinUpdateInterval = time.Second * 5
defaultGracePeriodBeforeSkipBadCollector = time.Second * 30
)

var (
@@ -31,10 +31,11 @@ var (
)

type Watcher struct {
log logr.Logger
k8sClient kubernetes.Interface
close chan struct{}
minUpdateInterval time.Duration
gracePeriodBeforeSkipBadCollector time.Duration
}

func NewCollectorWatcher(logger logr.Logger, kubeConfig *rest.Config) (*Watcher, error) {
@@ -44,10 +46,11 @@ func NewCollectorWatcher(logger logr.Logger, kubeConfig *rest.Config) (*Watcher,
}

return &Watcher{
log: logger.WithValues("component", "opentelemetry-targetallocator"),
k8sClient: clientset,
close: make(chan struct{}),
minUpdateInterval: defaultMinUpdateInterval,
gracePeriodBeforeSkipBadCollector: defaultGracePeriodBeforeSkipBadCollector,
}, nil
}

@@ -124,6 +127,25 @@ func (k *Watcher) runOnCollectors(store cache.Store, fn func(collectors map[stri
if pod.Spec.NodeName == "" {
continue
}

isPodUnhealthy := false
timeNow := time.Now()
// stop assigning targets to a pod that has not reached the Running phase within the grace period
if pod.Status.Phase != v1.PodRunning &&
pod.Status.StartTime != nil &&
timeNow.Sub(pod.Status.StartTime.Time) > k.gracePeriodBeforeSkipBadCollector {
isPodUnhealthy = true
}
// stop assigning targets to a pod whose Ready condition has been non-true for longer than the grace period
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodReady && condition.Status != v1.ConditionTrue &&
timeNow.Sub(condition.LastTransitionTime.Time) > k.gracePeriodBeforeSkipBadCollector {
isPodUnhealthy = true
}
}
if isPodUnhealthy {
continue
}

collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName)
}
collectorsDiscovered.Set(float64(len(collectorMap)))
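For reference, the filter added above can be read as a single predicate over the pod's status. Below is a minimal, self-contained sketch of that logic; the helper name isCollectorPodStale and the main function are illustrative only, not part of this change:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isCollectorPodStale reports whether a collector pod should stop receiving
// targets: it has been non-Running, or non-Ready, for longer than the grace period.
func isCollectorPodStale(pod *v1.Pod, grace time.Duration, now time.Time) bool {
	// Case 1: the pod is not in the Running phase and the grace period,
	// measured from its StartTime, has elapsed.
	if pod.Status.Phase != v1.PodRunning &&
		pod.Status.StartTime != nil &&
		now.Sub(pod.Status.StartTime.Time) > grace {
		return true
	}
	// Case 2: the Ready condition has been non-true for longer than the
	// grace period, measured from its last transition.
	for _, cond := range pod.Status.Conditions {
		if cond.Type == v1.PodReady && cond.Status != v1.ConditionTrue &&
			now.Sub(cond.LastTransitionTime.Time) > grace {
			return true
		}
	}
	return false
}

func main() {
	now := time.Now()
	pending := &v1.Pod{Status: v1.PodStatus{
		Phase:     v1.PodPending,
		StartTime: &metav1.Time{Time: now.Add(-time.Minute)},
	}}
	// Pending for a minute exceeds a 30s grace period, so the pod is skipped.
	fmt.Println(isCollectorPodStale(pending, 30*time.Second, now)) // true
}

Note that a pod with no Ready condition recorded yet is not filtered by the second check, and a pod with a nil StartTime is not filtered by the first: the watcher errs on the side of keeping a pod until the API server reports enough status to judge it.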
217 changes: 213 additions & 4 deletions cmd/otel-allocator/internal/collector/collector_test.go
@@ -31,10 +31,11 @@ var labelSelector = metav1.LabelSelector{

func getTestPodWatcher() Watcher {
podWatcher := Watcher{
k8sClient: fake.NewSimpleClientset(),
close: make(chan struct{}),
log: logger,
minUpdateInterval: time.Millisecond,
gracePeriodBeforeSkipBadCollector: defaultGracePeriodBeforeSkipBadCollector,
}
return podWatcher
}
@@ -49,6 +50,42 @@ func pod(name string) *v1.Pod {
Spec: v1.PodSpec{
NodeName: "test-node",
},
Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{{
Type: v1.PodReady,
Status: v1.ConditionTrue,
}}},
}
}

func podWithPodPhaseAndStartTime(name string, podPhase v1.PodPhase, startTime time.Time) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "test-ns",
Labels: labelMap,
},
Spec: v1.PodSpec{
NodeName: "test-node",
},
Status: v1.PodStatus{Phase: podPhase, StartTime: &metav1.Time{Time: startTime}},
}
}

func podWithPodReadyConditionStatusAndLastTransitionTime(name string, podConditionStatus v1.ConditionStatus, lastTransitionTime time.Time) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "test-ns",
Labels: labelMap,
},
Spec: v1.PodSpec{
NodeName: "test-node",
},
Status: v1.PodStatus{Conditions: []v1.PodCondition{{
Type: v1.PodReady,
Status: podConditionStatus,
LastTransitionTime: metav1.Time{Time: lastTransitionTime},
}}},
}
}

@@ -157,6 +194,178 @@ func Test_runWatch(t *testing.T) {
}
}

func Test_gracePeriodWithNonRunningPodPhase(t *testing.T) {
namespace := "test-ns"
type args struct {
collectorMap map[string]*allocation.Collector
}
tests := []struct {
name string
args args
want map[string]*allocation.Collector
}{
{
name: "pod add",
args: args{
collectorMap: map[string]*allocation.Collector{
"test-pod-running": {
Name: "test-pod-running",
NodeName: "test-node",
},
"test-pod-unknown-within-grace-period": {
Name: "test-pod-unknown-within-grace-period",
NodeName: "test-node",
},
"test-pod-pending-over-grace-period": {
Name: "test-pod-pending-over-grace-period",
NodeName: "test-node",
},
},
},
want: map[string]*allocation.Collector{
"test-pod-running": {
Name: "test-pod-running",
NodeName: "test-node",
},
"test-pod-unknown-within-grace-period": {
Name: "test-pod-unknown-within-grace-period",
NodeName: "test-node",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
timeNow := time.Now()
podWatcher := getTestPodWatcher()
defer func() {
close(podWatcher.close)
}()
var actual map[string]*allocation.Collector
mapMutex := sync.Mutex{}
for _, k := range tt.args.collectorMap {
var p *v1.Pod
switch k.Name {
case "test-pod-running":
p = podWithPodPhaseAndStartTime(k.Name, v1.PodRunning, timeNow)
case "test-pod-unknown-within-grace-period": // non-Running but still within the grace period
p = podWithPodPhaseAndStartTime(k.Name, v1.PodUnknown,
timeNow.Add(-1*podWatcher.gracePeriodBeforeSkipBadCollector).Add(podWatcher.gracePeriodBeforeSkipBadCollector/2))
case "test-pod-pending-over-grace-period": // non-Running and already over the grace period
p = podWithPodPhaseAndStartTime(k.Name, v1.PodPending,
timeNow.Add(-1*podWatcher.gracePeriodBeforeSkipBadCollector).Add(-podWatcher.gracePeriodBeforeSkipBadCollector/2))
}
_, err := podWatcher.k8sClient.CoreV1().Pods(namespace).Create(context.Background(), p, metav1.CreateOptions{})
assert.NoError(t, err)
}

go func(podWatcher Watcher) {
err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
mapMutex.Lock()
defer mapMutex.Unlock()
actual = colMap
})
require.NoError(t, err)
}(podWatcher)

assert.EventuallyWithT(t, func(collect *assert.CollectT) {
mapMutex.Lock()
defer mapMutex.Unlock()
assert.Len(collect, actual, len(tt.want))
assert.Equal(collect, actual, tt.want)
assert.Equal(collect, testutil.ToFloat64(collectorsDiscovered), float64(len(actual)))
}, time.Second*3, time.Millisecond)
})
}
}

func Test_gracePeriodWithNonReadyPodCondition(t *testing.T) {
namespace := "test-ns"
type args struct {
collectorMap map[string]*allocation.Collector
}
tests := []struct {
name string
args args
want map[string]*allocation.Collector
}{
{
name: "pod add",
args: args{
collectorMap: map[string]*allocation.Collector{
"test-pod-ready": {
Name: "test-pod-ready",
NodeName: "test-node",
},
"test-pod-non-ready-within-grace-period": {
Name: "test-pod-non-ready-within-grace-period",
NodeName: "test-node",
},
"test-pod-non-ready-over-grace-period": {
Name: "test-pod-non-ready-over-grace-period",
NodeName: "test-node",
},
},
},
want: map[string]*allocation.Collector{
"test-pod-ready": {
Name: "test-pod-ready",
NodeName: "test-node",
},
"test-pod-non-ready-within-grace-period": {
Name: "test-pod-non-ready-within-grace-period",
NodeName: "test-node",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
timeNow := time.Now()
podWatcher := getTestPodWatcher()
defer func() {
close(podWatcher.close)
}()
var actual map[string]*allocation.Collector
mapMutex := sync.Mutex{}
for _, k := range tt.args.collectorMap {
var p *v1.Pod
switch k.Name {
case "test-pod-ready":
p = podWithPodReadyConditionStatusAndLastTransitionTime(k.Name, v1.ConditionTrue, timeNow)
case "test-pod-non-ready-within-grace-period": // non-Ready but still within the grace period
p = podWithPodReadyConditionStatusAndLastTransitionTime(k.Name, v1.ConditionFalse,
timeNow.Add(-1*podWatcher.gracePeriodBeforeSkipBadCollector).Add(podWatcher.gracePeriodBeforeSkipBadCollector/2))
case "test-pod-non-ready-over-grace-period": // non-Ready and already over the grace period
p = podWithPodReadyConditionStatusAndLastTransitionTime(k.Name, v1.ConditionFalse,
timeNow.Add(-1*podWatcher.gracePeriodBeforeSkipBadCollector).Add(-podWatcher.gracePeriodBeforeSkipBadCollector/2))
}
_, err := podWatcher.k8sClient.CoreV1().Pods(namespace).Create(context.Background(), p, metav1.CreateOptions{})
assert.NoError(t, err)
}

go func(podWatcher Watcher) {
err := podWatcher.Watch(namespace, &labelSelector, func(colMap map[string]*allocation.Collector) {
mapMutex.Lock()
defer mapMutex.Unlock()
actual = colMap
})
require.NoError(t, err)
}(podWatcher)

assert.EventuallyWithT(t, func(collect *assert.CollectT) {
mapMutex.Lock()
defer mapMutex.Unlock()
assert.Len(collect, actual, len(tt.want))
assert.Equal(collect, actual, tt.want)
assert.Equal(collect, testutil.ToFloat64(collectorsDiscovered), float64(len(actual)))
}, time.Second*3, time.Millisecond)
})
}
}
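As a quick sanity check of the timestamp arithmetic used in both tests above (the 30-second value mirrors defaultGracePeriodBeforeSkipBadCollector; this standalone snippet is illustrative, not part of the change):

package main

import (
	"fmt"
	"time"
)

func main() {
	grace := 30 * time.Second
	now := time.Now()
	// "within grace period": now - grace + grace/2 == now - 15s
	within := now.Add(-1 * grace).Add(grace / 2)
	// "over grace period": now - grace - grace/2 == now - 45s
	over := now.Add(-1 * grace).Add(-grace / 2)
	fmt.Println(now.Sub(within) > grace) // false: pod keeps receiving targets
	fmt.Println(now.Sub(over) > grace)   // true: pod is filtered out
}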

// this tests runWatch in the case of watcher channel closing.
func Test_closeChannel(t *testing.T) {
podWatcher := getTestPodWatcher()