Skip to content

Commit 4e48044

Browse files
committed
feat(nodeScaleDownTimeTracker): add a new metric to track unprocessed nodes during scaleDown
1 parent f8e49af commit 4e48044

File tree

7 files changed

+297
-25
lines changed

7 files changed

+297
-25
lines changed

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,11 @@ type AutoscalingOptions struct {
352352
CapacitybufferControllerEnabled bool
353353
// CapacitybufferPodInjectionEnabled tells if CA should inject fake pods for capacity buffers that are ready for provisioning
354354
CapacitybufferPodInjectionEnabled bool
355+
// LongestNodeScaleDownEvalTimeTrackerEnabled is used to enable/disable the tracking of the longest node ScaleDown evaluation time.
356+
// We want to track all the nodes that were present in currentlyUnneededNodeNames, but were neither processed nor deleted during the ScaleDown.
357+
// If it happened to a node multiple times consecutively, we store only the earliest time it happened.
358+
// The difference between the current time and the earliest time among all unprocessed nodes will give the longest evaluation time
359+
LongestNodeScaleDownEvalTimeTrackerEnabled bool
355360
}
356361

357362
// KubeClientOptions specify options for kube client

cluster-autoscaler/config/flags/flags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ var (
231231
nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive")
232232
capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
233233
capacitybufferPodInjectionEnabled = flag.Bool("capacity-buffer-pod-injection-enabled", false, "Whether to enable pod list processor that processes ready capacity buffers and injects fake pods accordingly")
234+
longestNodeScaleDownEvalTimeTrackerEnabled = flag.Bool("longest-node-scaledown-eval-timetracker-enabled", false, "Whether to enable the tracking of the longest node ScaleDown evaluation time")
234235

235236
// Deprecated flags
236237
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -416,6 +417,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
416417
NodeDeletionCandidateTTL: *nodeDeletionCandidateTTL,
417418
CapacitybufferControllerEnabled: *capacitybufferControllerEnabled,
418419
CapacitybufferPodInjectionEnabled: *capacitybufferPodInjectionEnabled,
420+
LongestNodeScaleDownEvalTimeTrackerEnabled: *longestNodeScaleDownEvalTimeTrackerEnabled,
419421
}
420422
}
421423

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package longestevaluationtracker
18+
19+
import (
20+
"time"
21+
22+
"k8s.io/autoscaler/cluster-autoscaler/metrics"
23+
)
24+
25+
// LongestNodeScaleDownEvalTime tracks, across ScaleDown iterations, how long
// unneeded nodes have been waiting to be evaluated.
type LongestNodeScaleDownEvalTime struct {
	// lastEvalTime is the time passed to the previous Update call (or to the
	// constructor if Update has not been called yet).
	lastEvalTime time.Time
	// NodeNamesWithTimeStamps maps the name of every currently unprocessed node
	// to the time of its last successful evaluation, i.e. the value lastEvalTime
	// had when the node was first reported as unprocessed.
	NodeNamesWithTimeStamps map[string]time.Time
}

// NewLongestNodeScaleDownEvalTime returns a LongestNodeScaleDownEvalTime with
// lastEvalTime set to currentTime and no tracked nodes.
func NewLongestNodeScaleDownEvalTime(currentTime time.Time) *LongestNodeScaleDownEvalTime {
	return &LongestNodeScaleDownEvalTime{
		lastEvalTime: currentTime,
		// Allocate the exported map up front so that it is never nil and can be
		// written to safely before the first Update call.
		NodeNamesWithTimeStamps: make(map[string]time.Time),
	}
}

// get returns the timestamp stored for nodeName, or lastEvalTime when the node
// is not tracked.
func (l *LongestNodeScaleDownEvalTime) get(nodeName string) time.Time {
	if ts, ok := l.NodeNamesWithTimeStamps[nodeName]; ok {
		return ts
	}
	return l.lastEvalTime
}

// getMin returns the earliest timestamp in NodeNamesWithTimeStamps, or
// lastEvalTime when no tracked timestamp is earlier than it.
func (l *LongestNodeScaleDownEvalTime) getMin() time.Time {
	earliest := l.lastEvalTime
	for _, ts := range l.NodeNamesWithTimeStamps {
		if ts.Before(earliest) {
			earliest = ts
		}
	}
	return earliest
}
56+
57+
// Update returns the longest evaluation time for the nodes in NodeNamesWithTimeStamps
58+
// and changes NodeNamesWithTimeStamps for nodeNames.
59+
func (l *LongestNodeScaleDownEvalTime) Update(nodeNames []string, currentTime time.Time) time.Duration {
60+
var longestTime time.Duration
61+
minimumTime := l.getMin()
62+
if len(nodeNames) == 0 {
63+
// if l.minimumTime is lastEvalTime, then in previous iteration we also processed all the nodes, so the longest time is 0
64+
// otherwise -> report the longest time from previous iteration and reset the minimumTime
65+
if minimumTime.Equal(l.lastEvalTime) {
66+
longestTime = 0
67+
} else {
68+
longestTime = currentTime.Sub(minimumTime)
69+
}
70+
l.NodeNamesWithTimeStamps = make(map[string]time.Time)
71+
} else {
72+
newNodes := make(map[string]time.Time, len(nodeNames))
73+
for _, nodeName := range nodeNames {
74+
newNodes[nodeName] = l.get(nodeName)
75+
}
76+
l.NodeNamesWithTimeStamps = newNodes
77+
longestTime = currentTime.Sub(minimumTime)
78+
}
79+
l.lastEvalTime = currentTime
80+
metrics.ObserveLongestUnneededNodeScaleDownEvalDurationSeconds(longestTime)
81+
return longestTime
82+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package longestevaluationtracker
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
"github.com/stretchr/testify/assert"
24+
)
25+
26+
func TestLongestUnprocessedNodeScaleDownTime(t *testing.T) {
27+
type testCase struct {
28+
name string
29+
unprocessedNodes [][]string
30+
wantLongestScaleDownEvalTime []time.Duration
31+
}
32+
start := time.Now()
33+
testCases := []testCase{
34+
{
35+
name: "All nodes processed in the first iteration",
36+
unprocessedNodes: [][]string{nil, {"n1", "n2"}, {"n2", "n3"}, {}, {}},
37+
wantLongestScaleDownEvalTime: []time.Duration{time.Duration(0), time.Duration(1 * time.Second), time.Duration(2 * time.Second),
38+
time.Duration(3 * time.Second), time.Duration(0)},
39+
},
40+
{
41+
name: "Not all nodes processed in the first iteration",
42+
unprocessedNodes: [][]string{{"n1", "n2"}, {"n1", "n2"}, {"n2", "n3"}, {}, {}},
43+
wantLongestScaleDownEvalTime: []time.Duration{time.Duration(1 * time.Second), time.Duration(2 * time.Second), time.Duration(3 * time.Second),
44+
time.Duration(4 * time.Second), time.Duration(0)},
45+
},
46+
{
47+
name: "Different nodes processed in each iteration",
48+
unprocessedNodes: [][]string{{"n1"}, {"n2"}, {"n3"}, {"n4"}, {}, {}},
49+
wantLongestScaleDownEvalTime: []time.Duration{time.Duration(1 * time.Second), time.Duration(2 * time.Second), time.Duration(2 * time.Second),
50+
time.Duration(2 * time.Second), time.Duration(2 * time.Second), time.Duration(0)},
51+
},
52+
}
53+
for _, tc := range testCases {
54+
t.Run(tc.name, func(t *testing.T) {
55+
t.Parallel()
56+
timestamp := start
57+
longestScaleDownEvalT := NewLongestNodeScaleDownEvalTime(start)
58+
for i := 0; i < len(tc.unprocessedNodes); i++ {
59+
timestamp = timestamp.Add(1 * time.Second)
60+
assert.Equal(t, tc.wantLongestScaleDownEvalTime[i], longestScaleDownEvalT.Update(tc.unprocessedNodes[i], timestamp))
61+
assert.Equal(t, len(tc.unprocessedNodes[i]), len(longestScaleDownEvalT.NodeNamesWithTimeStamps))
62+
}
63+
})
64+
}
65+
}

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
2727
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
2828
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
29+
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/longestevaluationtracker"
2930
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
3031
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
3132
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
@@ -63,19 +64,20 @@ type replicasInfo struct {
6364

6465
// Planner is responsible for deciding which nodes should be deleted during scale down.
6566
type Planner struct {
66-
autoscalingCtx *ca_context.AutoscalingContext
67-
unremovableNodes *unremovable.Nodes
68-
unneededNodes *unneeded.Nodes
69-
rs removalSimulator
70-
actuationInjector *scheduling.HintingSimulator
71-
latestUpdate time.Time
72-
minUpdateInterval time.Duration
73-
eligibilityChecker eligibilityChecker
74-
nodeUtilizationMap map[string]utilization.Info
75-
resourceLimitsFinder *resource.LimitsFinder
76-
cc controllerReplicasCalculator
77-
scaleDownSetProcessor nodes.ScaleDownSetProcessor
78-
scaleDownContext *nodes.ScaleDownContext
67+
autoscalingCtx *ca_context.AutoscalingContext
68+
unremovableNodes *unremovable.Nodes
69+
unneededNodes *unneeded.Nodes
70+
rs removalSimulator
71+
actuationInjector *scheduling.HintingSimulator
72+
latestUpdate time.Time
73+
minUpdateInterval time.Duration
74+
eligibilityChecker eligibilityChecker
75+
nodeUtilizationMap map[string]utilization.Info
76+
resourceLimitsFinder *resource.LimitsFinder
77+
cc controllerReplicasCalculator
78+
scaleDownSetProcessor nodes.ScaleDownSetProcessor
79+
scaleDownContext *nodes.ScaleDownContext
80+
longestNodeScaleDownEvalTime *longestevaluationtracker.LongestNodeScaleDownEvalTime
7981
}
8082

8183
// New creates a new Planner object.
@@ -91,19 +93,25 @@ func New(autoscalingCtx *ca_context.AutoscalingContext, processors *processors.A
9193
unneededNodes.LoadFromExistingTaints(autoscalingCtx.ListerRegistry, time.Now(), autoscalingCtx.AutoscalingOptions.NodeDeletionCandidateTTL)
9294
}
9395

96+
var longestNodeScaleDownEvalTime *longestevaluationtracker.LongestNodeScaleDownEvalTime
97+
if autoscalingCtx.AutoscalingOptions.LongestNodeScaleDownEvalTimeTrackerEnabled {
98+
longestNodeScaleDownEvalTime = longestevaluationtracker.NewLongestNodeScaleDownEvalTime(time.Now())
99+
}
100+
94101
return &Planner{
95-
autoscalingCtx: autoscalingCtx,
96-
unremovableNodes: unremovable.NewNodes(),
97-
unneededNodes: unneededNodes,
98-
rs: simulator.NewRemovalSimulator(autoscalingCtx.ListerRegistry, autoscalingCtx.ClusterSnapshot, deleteOptions, drainabilityRules, true),
99-
actuationInjector: scheduling.NewHintingSimulator(),
100-
eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),
101-
nodeUtilizationMap: make(map[string]utilization.Info),
102-
resourceLimitsFinder: resourceLimitsFinder,
103-
cc: newControllerReplicasCalculator(autoscalingCtx.ListerRegistry),
104-
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
105-
scaleDownContext: nodes.NewDefaultScaleDownContext(),
106-
minUpdateInterval: minUpdateInterval,
102+
autoscalingCtx: autoscalingCtx,
103+
unremovableNodes: unremovable.NewNodes(),
104+
unneededNodes: unneededNodes,
105+
rs: simulator.NewRemovalSimulator(autoscalingCtx.ListerRegistry, autoscalingCtx.ClusterSnapshot, deleteOptions, drainabilityRules, true),
106+
actuationInjector: scheduling.NewHintingSimulator(),
107+
eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),
108+
nodeUtilizationMap: make(map[string]utilization.Info),
109+
resourceLimitsFinder: resourceLimitsFinder,
110+
cc: newControllerReplicasCalculator(autoscalingCtx.ListerRegistry),
111+
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
112+
scaleDownContext: nodes.NewDefaultScaleDownContext(),
113+
minUpdateInterval: minUpdateInterval,
114+
longestNodeScaleDownEvalTime: longestNodeScaleDownEvalTime,
107115
}
108116
}
109117

@@ -277,13 +285,16 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
277285
}
278286
p.nodeUtilizationMap = utilizationMap
279287
timer := time.NewTimer(p.autoscalingCtx.ScaleDownSimulationTimeout)
288+
var skippedNodes []string
280289

281290
for i, node := range currentlyUnneededNodeNames {
282291
if timedOut(timer) {
292+
skippedNodes = currentlyUnneededNodeNames[i:]
283293
klog.Warningf("%d out of %d nodes skipped in scale down simulation due to timeout.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames))
284294
break
285295
}
286296
if len(removableList)-atomicScaleDownNodesCount >= p.unneededNodesLimit() {
297+
skippedNodes = currentlyUnneededNodeNames[i:]
287298
klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more. Total atomic scale down nodes: %d", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList), atomicScaleDownNodesCount)
288299
break
289300
}
@@ -306,6 +317,7 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
306317
p.unremovableNodes.AddTimeout(unremovable, unremovableTimeout)
307318
}
308319
}
320+
p.handleUnprocessedNodes(skippedNodes)
309321
p.unneededNodes.Update(removableList, p.latestUpdate)
310322
if unremovableCount > 0 {
311323
klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
@@ -372,6 +384,15 @@ func (p *Planner) unneededNodesLimit() int {
372384
return limit
373385
}
374386

387+
// handleUnprocessedNodes is used to track the longest time it take for a node to be evaluated as removable or not
388+
func (p *Planner) handleUnprocessedNodes(unprocessedNodeNames []string) {
389+
// if p.longestNodeScaleDownEvalTime is not set (flag is disabled) or endedPrematurely is already true (nodes were already reported in this iteration) do not do anything
390+
if p.longestNodeScaleDownEvalTime == nil {
391+
return
392+
}
393+
p.longestNodeScaleDownEvalTime.Update(unprocessedNodeNames, time.Now())
394+
}
395+
375396
// getKnownOwnerRef returns ownerRef that is known by CA and CA knows the logic of how this controller recreates pods.
376397
func getKnownOwnerRef(ownerRefs []metav1.OwnerReference) *metav1.OwnerReference {
377398
for _, ownerRef := range ownerRefs {

cluster-autoscaler/core/scaledown/planner/planner_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,86 @@ func TestNodesToDelete(t *testing.T) {
10351035
}
10361036
}
10371037

1038+
// TestLongestUnprocessedNodeScaleDown checks that, after UpdateClusterState, the
// planner's longest-evaluation-time tracker holds the expected number of
// unprocessed nodes, and that the tracker is nil when the feature flag is disabled.
func TestLongestUnprocessedNodeScaleDown(t *testing.T) {
1039+
type testCase struct {
1040+
name string
1041+
maxParallel int
1042+
isSimulationTimeout bool
1043+
isFlagEnabled bool
1044+
wantUnprocessedNodes int
1045+
}
1046+
nodes := []*apiv1.Node{
1047+
BuildTestNode("n1", 1000, 10),
1048+
BuildTestNode("n2", 1000, 10),
1049+
BuildTestNode("n3", 1000, 10),
1050+
}
1051+
// Only n1 and n2 are reported as eligible by the fake eligibility checker below.
eligible := []string{"n1", "n2"}
1052+
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
1053+
provider := testprovider.NewTestCloudProviderBuilder().Build()
1054+
provider.AddNodeGroup("ng1", 0, 0, 0)
1055+
for _, node := range nodes {
1056+
provider.AddNode("ng1", node)
1057+
}
1058+
testCases := []testCase{
1059+
{
1060+
name: "Unneeded node limit is exceeded",
1061+
maxParallel: 0,
1062+
isSimulationTimeout: false,
1063+
isFlagEnabled: true,
1064+
// maxParallel=0 forces p.unneededNodesLimit() to be 0, so we will break in the second check inside p.categorizeNodes() right away
1065+
wantUnprocessedNodes: 2,
1066+
},
1067+
{
1068+
name: "Simulation timeout is hit",
1069+
maxParallel: 1,
1070+
isSimulationTimeout: true,
1071+
isFlagEnabled: true,
1072+
// first node will be deleted and for the second timeout will be triggered
1073+
wantUnprocessedNodes: 1,
1074+
},
1075+
{
1076+
name: "LongestNodeScaleDownEvalTimeTrackerEnabled flag is disabled",
1077+
maxParallel: 1,
1078+
isSimulationTimeout: false,
1079+
isFlagEnabled: false,
1080+
},
1081+
}
1082+
for _, tc := range testCases {
1083+
// Copy the loop variable so each parallel subtest closes over its own case.
tc := tc
1084+
t.Run(tc.name, func(t *testing.T) {
1085+
t.Parallel()
1086+
autoscalingCtx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
1087+
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
1088+
ScaleDownUnneededTime: 10 * time.Minute,
1089+
},
1090+
ScaleDownSimulationTimeout: 1 * time.Second,
1091+
MaxScaleDownParallelism: tc.maxParallel,
1092+
LongestNodeScaleDownEvalTimeTrackerEnabled: tc.isFlagEnabled,
1093+
}, &fake.Clientset{}, registry, provider, nil, nil)
1094+
assert.NoError(t, err)
1095+
clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingCtx.ClusterSnapshot, nodes, nil)
1096+
deleteOptions := options.NodeDeleteOptions{}
1097+
p := New(&autoscalingCtx, processorstest.NewTestProcessors(&autoscalingCtx), deleteOptions, nil)
1098+
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(eligible)}
1099+
if tc.isSimulationTimeout {
1100+
autoscalingCtx.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
1101+
// NOTE(review): presumably the fake simulator sleeps 2s per simulated removal,
// which is longer than the 1s ScaleDownSimulationTimeout - confirm against fakeRemovalSimulator.
rs := &fakeRemovalSimulator{
1102+
nodes: nodes,
1103+
sleep: 2 * time.Second,
1104+
}
1105+
p.rs = rs
1106+
}
1107+
assert.NoError(t, p.UpdateClusterState(nodes, nodes, &fakeActuationStatus{}, time.Now()))
1108+
if !tc.isFlagEnabled {
1109+
// if flag is disabled p.longestNodeScaleDownEvalTime is not initialized
1110+
assert.Nil(t, p.longestNodeScaleDownEvalTime)
1111+
} else {
1112+
// The tracker keeps one entry per node left unprocessed by the last update.
assert.Equal(t, tc.wantUnprocessedNodes, len(p.longestNodeScaleDownEvalTime.NodeNamesWithTimeStamps))
1113+
}
1114+
})
1115+
}
1116+
}
1117+
10381118
func sizedNodeGroup(id string, size int, atomic bool) cloudprovider.NodeGroup {
10391119
ng := testprovider.NewTestNodeGroup(id, 10000, 0, size, true, false, "n1-standard-2", nil, nil)
10401120
ng.SetOptions(&config.NodeGroupAutoscalingOptions{

0 commit comments

Comments
 (0)