diff --git a/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider.go b/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider.go index d77bf6aa8307..658eb1ef0e24 100644 --- a/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "math" "os" "time" @@ -38,7 +37,10 @@ import ( const ( // GPULabel is the label added to GPU nodes - GPULabel = "k8s.scaleway.com/gpu" + GPULabel = "k8s.scw.cloud/gpu" + + // DefaultRefreshInterval is the default refresh interval for the cloud provider + DefaultRefreshInterval = 60 * time.Second ) type scalewayCloudProvider struct { @@ -47,13 +49,22 @@ type scalewayCloudProvider struct { // ClusterID is the cluster id where the Autoscaler is running. clusterID string // nodeGroups is an abstraction around the Pool object returned by the API - nodeGroups []*NodeGroup + // key is the Pool ID + nodeGroups map[string]*NodeGroup + // providerNodeGroups is a pre-converted slice of node groups for NodeGroups() method + providerNodeGroups []cloudprovider.NodeGroup + // refreshInterval is the minimum duration between refreshes + refreshInterval time.Duration + // lastRefresh is the last time the nodes and node groups were refreshed from the API + lastRefresh time.Time + // lastRefreshError stores the error from the last refresh, if any + lastRefreshError error resourceLimiter *cloudprovider.ResourceLimiter } func readConf(config *scalewaygo.Config, configFile io.Reader) error { - body, err := ioutil.ReadAll(configFile) + body, err := io.ReadAll(configFile) if err != nil { return err } @@ -61,7 +72,7 @@ func readConf(config *scalewaygo.Config, configFile io.Reader) error { return err } -func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl *cloudprovider.ResourceLimiter) *scalewayCloudProvider { +func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl *cloudprovider.ResourceLimiter) (*scalewayCloudProvider, error) { getenvOr := func(key, defaultValue string) string { value := os.Getenv(key) if value != "" { @@ -84,6 +95,7 @@ func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl cfg.SecretKey = getenvOr("SCW_SECRET_KEY", cfg.SecretKey) cfg.Region = getenvOr("SCW_REGION", cfg.Region) cfg.ApiUrl = getenvOr("SCW_API_URL", cfg.ApiUrl) + cfg.DefaultCacheControl = DefaultRefreshInterval cfg.UserAgent = defaultUserAgent @@ -92,21 +104,26 @@ func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl klog.Fatalf("failed to create scaleway cloud provider: %v", err) } - klog.V(4).Infof("Scaleway Cloud Provider built; ClusterId=%s,SecretKey=%s-***,Region=%s,ApiURL=%s", cfg.ClusterID, client.Token()[:8], client.Region(), client.ApiURL()) + klog.V(4).Infof("Scaleway Cloud Provider built; ClusterId=%s,Region=%s,ApiURL=%s", cfg.ClusterID, client.Region(), client.ApiURL()) - return &scalewayCloudProvider{ + provider := &scalewayCloudProvider{ client: client, clusterID: cfg.ClusterID, resourceLimiter: rl, + refreshInterval: DefaultRefreshInterval, + } + + // Perform initial refresh to populate node groups cache + if err := provider.Refresh(); err != nil { + klog.Errorf("Failed to perform initial refresh: %v", err) + return nil, err } + + return provider, nil } // BuildScaleway returns CloudProvider implementation for Scaleway. 
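+// The cloud config file and the SCW_* environment variables populate the same
+// settings, with the environment taking precedence (see getenvOr above). A
+// minimal illustrative invocation, with placeholder values only:
+//
+//	SCW_CLUSTER_ID=<cluster-id> SCW_SECRET_KEY=<secret-key> SCW_REGION=fr-par \
+//	  cluster-autoscaler --cloud-provider=scaleway
+//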
-func BuildScaleway( - opts *coreoptions.AutoscalerOptions, - do cloudprovider.NodeGroupDiscoveryOptions, - rl *cloudprovider.ResourceLimiter, -) cloudprovider.CloudProvider { +func BuildScaleway(opts *coreoptions.AutoscalerOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider { var configFile io.Reader if opts.CloudConfig != "" { @@ -123,79 +140,63 @@ func BuildScaleway( }() } } - return newScalewayCloudProvider(configFile, opts.UserAgent, rl) + + provider, err := newScalewayCloudProvider(configFile, opts.UserAgent, rl) + if err != nil { + klog.Fatalf("Failed to create Scaleway cloud provider: %v", err) + } + return provider } -// Name returns 'scaleway' +// Name returns name of the cloud provider. func (*scalewayCloudProvider) Name() string { return cloudprovider.ScalewayProviderName } -// NodeGroups returns all node groups configured for this cluster. -// critical endpoint, make it fast +// NodeGroups returns all node groups configured for this cloud provider. func (scw *scalewayCloudProvider) NodeGroups() []cloudprovider.NodeGroup { - klog.V(4).Info("NodeGroups,ClusterID=", scw.clusterID) - nodeGroups := make([]cloudprovider.NodeGroup, len(scw.nodeGroups)) - for i, ng := range scw.nodeGroups { - nodeGroups[i] = ng - } - return nodeGroups -} - -func (scw *scalewayCloudProvider) nodeGroupForNode(node *apiv1.Node) (*NodeGroup, error) { - for _, ng := range scw.nodeGroups { - if _, ok := ng.nodes[node.Spec.ProviderID]; ok { - return ng, nil - } - } - return nil, nil + return scw.providerNodeGroups } // NodeGroupForNode returns the node group for the given node, nil if the node // should not be processed by cluster autoscaler, or non-nil error if such -// occurred. -// critical endpoint, make it fast +// occurred. Must be implemented. func (scw *scalewayCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) { klog.V(4).Infof("NodeGroupForNode,NodeSpecProviderID=%s", node.Spec.ProviderID) - return scw.nodeGroupForNode(node) -} - -// HasInstance returns whether a given node has a corresponding instance in this cloud provider -func (scw *scalewayCloudProvider) HasInstance(node *apiv1.Node) (bool, error) { - return true, cloudprovider.ErrNotImplemented -} - -func (scw *scalewayCloudProvider) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) { - ng, err := scw.nodeGroupForNode(node) - if err != nil { - return 0.0, err + for _, ng := range scw.nodeGroups { + if _, ok := ng.nodes[node.Spec.ProviderID]; ok { + return ng, nil + } } - d := endTime.Sub(startTime) - hours := math.Ceil(d.Hours()) - - return hours * float64(ng.specs.NodePricePerHour), nil + return nil, nil } -func (scw *scalewayCloudProvider) PodPrice(pod *apiv1.Pod, startTime time.Time, endTime time.Time) (float64, error) { - return 0.0, nil +// HasInstance returns whether the node has corresponding instance in cloud provider, +// true if the node has an instance, false if it no longer exists +func (scw *scalewayCloudProvider) HasInstance(node *apiv1.Node) (bool, error) { + return node.Spec.ProviderID != "", nil } -// Pricing return pricing model for scaleway. +// Pricing returns pricing model for this cloud provider or error if not available. +// Implementation optional. func (scw *scalewayCloudProvider) Pricing() (cloudprovider.PricingModel, ca_errors.AutoscalerError) { klog.V(4).Info("Pricing,called") return scw, nil } -// GetAvailableMachineTypes get all machine types that can be requested from scaleway. 
-// Not implemented +// GetAvailableMachineTypes get all machine types that can be requested from the cloud provider. +// Implementation optional. func (scw *scalewayCloudProvider) GetAvailableMachineTypes() ([]string, error) { return []string{}, nil } +// NewNodeGroup builds a theoretical node group based on the node definition provided. The node group is not automatically +// created on the cloud provider side. The node group is not returned by NodeGroups() until it is created. +// Implementation optional. func (scw *scalewayCloudProvider) NewNodeGroup( machineType string, labels map[string]string, @@ -220,7 +221,6 @@ func (scw *scalewayCloudProvider) GPULabel() string { } // GetAvailableGPUTypes return all available GPU types cloud provider supports. -// not yet implemented. func (scw *scalewayCloudProvider) GetAvailableGPUTypes() map[string]struct{} { klog.V(4).Info("GetAvailableGPUTypes,called") return nil @@ -244,36 +244,98 @@ func (scw *scalewayCloudProvider) Cleanup() error { func (scw *scalewayCloudProvider) Refresh() error { klog.V(4).Info("Refresh,ClusterID=", scw.clusterID) - ctx := context.Background() - resp, err := scw.client.ListPools(ctx, &scalewaygo.ListPoolsRequest{ClusterID: scw.clusterID}) + // Only skip refresh if lastRefresh is non-zero and interval has not elapsed + if !scw.lastRefresh.IsZero() && time.Since(scw.lastRefresh) < scw.refreshInterval { + klog.V(4).Infof("Refresh,ClusterID=%s,skipping refresh, last refresh was %s ago", scw.clusterID, time.Since(scw.lastRefresh)) + return scw.lastRefreshError + } + cc, pools, err := scw.client.ListPools(context.Background(), scw.clusterID) if err != nil { klog.Errorf("Refresh,failed to list pools for cluster %s: %s", scw.clusterID, err) + scw.lastRefresh = time.Now() + scw.lastRefreshError = err return err } + // Update refresh interval based on Cache-Control header from listPools response + scw.refreshInterval = cc - var ng []*NodeGroup - - for _, p := range resp.Pools { + _, nodes, err := scw.client.ListNodes(context.Background(), scw.clusterID) + if err != nil { + klog.Errorf("Refresh,failed to list nodes for cluster %s: %s", scw.clusterID, err) + scw.lastRefresh = time.Now() + scw.lastRefreshError = err + return err + } - if p.Pool.Autoscaling == false { + // Build NodeGroups + nodeGroups := make(map[string]*NodeGroup) + for _, pool := range pools { + if !pool.Autoscaling { + klog.V(4).Infof("Refresh,ClusterID=%s,skipping pool %s (autoscaling disabled)", scw.clusterID, pool.ID) continue } - nodes, err := nodesFromPool(scw.client, p.Pool) - if err != nil { - return fmt.Errorf("Refresh,failed to list nodes for pool %s: %w", p.Pool.ID, err) - } - ng = append(ng, &NodeGroup{ + nodeGroup := &NodeGroup{ Client: scw.client, - nodes: nodes, - specs: &p.Specs, - p: p.Pool, - }) + nodes: make(map[string]*scalewaygo.Node), + pool: pool, + } + + nodeGroups[pool.ID] = nodeGroup } - klog.V(4).Infof("Refresh,ClusterID=%s,%d pools found", scw.clusterID, len(ng)) - scw.nodeGroups = ng + // Assign nodes to NodeGroups + for _, node := range nodes { + _, ok := nodeGroups[node.PoolID] + if !ok { + klog.V(4).Infof("Refresh,ClusterID=%s,node %s found for PoolID=%s which does not exist in nodeGroups, skipping", scw.clusterID, node.ProviderID, node.PoolID) + continue + } + + nodeGroups[node.PoolID].nodes[node.ProviderID] = &node + } + + scw.nodeGroups = nodeGroups + + // Pre-convert nodeGroups map to slice for NodeGroups() method + // This is to avoid converting the map to a slice on every call to NodeGroups() + // which happens quite often + 
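+	// The interval applied above comes from the duration returned by ListPools,
+	// which is described above as being derived from the response's Cache-Control
+	// header. A hypothetical sketch of that derivation, assuming a max-age
+	// directive; the real parsing lives in the scalewaygo client and is not
+	// part of this patch:
+	//
+	//	if secs, err := strconv.Atoi(strings.TrimPrefix(directive, "max-age=")); err == nil {
+	//		refreshInterval = time.Duration(secs) * time.Second
+	//	}
+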
scw.providerNodeGroups = make([]cloudprovider.NodeGroup, 0, len(nodeGroups)) + for _, ng := range nodeGroups { + scw.providerNodeGroups = append(scw.providerNodeGroups, ng) + } + + klog.V(4).Infof("Refresh,ClusterID=%s,%d pools found", scw.clusterID, len(nodeGroups)) + + scw.lastRefresh = time.Now() + scw.lastRefreshError = nil return nil } + +// NodePrice returns a price of running the given node for a given period of time. +// All prices returned by the structure should be in the same currency. +func (scw *scalewayCloudProvider) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) { + var nodeGroup *NodeGroup + for _, ng := range scw.nodeGroups { + if _, ok := ng.nodes[node.Spec.ProviderID]; ok { + nodeGroup = ng + } + } + + if nodeGroup == nil { + return 0.0, fmt.Errorf("node group not found for node %s", node.Spec.ProviderID) + } + + d := endTime.Sub(startTime) + hours := math.Ceil(d.Hours()) + + return hours * float64(nodeGroup.pool.NodePricePerHour), nil +} + +// PodPrice returns a theoretical minimum price of running a pod for a given +// period of time on a perfectly matching machine. +func (scw *scalewayCloudProvider) PodPrice(pod *apiv1.Pod, startTime time.Time, endTime time.Time) (float64, error) { + return 0.0, nil +} diff --git a/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider_test.go new file mode 100644 index 000000000000..6bf7e5fe6e90 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider_test.go @@ -0,0 +1,578 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scaleway + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo" +) + +// mockClient is a mock implementation of scalewaygo.Client +type mockClient struct { + mock.Mock +} + +func (m *mockClient) ListPools(ctx context.Context, clusterID string) (time.Duration, []scalewaygo.Pool, error) { + args := m.Called(ctx, clusterID) + return args.Get(0).(time.Duration), args.Get(1).([]scalewaygo.Pool), args.Error(2) +} + +func (m *mockClient) UpdatePool(ctx context.Context, poolID string, size int) (scalewaygo.Pool, error) { + args := m.Called(ctx, poolID, size) + return args.Get(0).(scalewaygo.Pool), args.Error(1) +} + +func (m *mockClient) ListNodes(ctx context.Context, clusterID string) (time.Duration, []scalewaygo.Node, error) { + args := m.Called(ctx, clusterID) + return args.Get(0).(time.Duration), args.Get(1).([]scalewaygo.Node), args.Error(2) +} + +func (m *mockClient) DeleteNode(ctx context.Context, nodeID string) (scalewaygo.Node, error) { + args := m.Called(ctx, nodeID) + return args.Get(0).(scalewaygo.Node), args.Error(1) +} + +func createTestPool(id string, autoscaling bool, size, minSize, maxSize int) scalewaygo.Pool { + now := time.Now() + return scalewaygo.Pool{ + ID: id, + ClusterID: "test-cluster", + Name: fmt.Sprintf("pool-%s", id), + Status: scalewaygo.PoolStatusReady, + Version: "1.27.0", + NodeType: "DEV1-M", + Autoscaling: autoscaling, + Size: size, + MinSize: minSize, + MaxSize: maxSize, + Zone: "fr-par-1", + Capacity: map[string]int64{ + "cpu": 2000, + "memory": 4096000000, + "ephemeral-storage": 20000000000, + "pods": 110, + }, + Allocatable: map[string]int64{ + "cpu": 1800, + "memory": 3500000000, + "ephemeral-storage": 18000000000, + "pods": 110, + }, + Labels: map[string]string{ + "kubernetes.io/hostname": "node-1", + }, + Taints: map[string]string{}, + NodePricePerHour: 0.05, + CreatedAt: &now, + UpdatedAt: &now, + } +} + +func createTestNode(nodeID, poolID, providerID string, status scalewaygo.NodeStatus) scalewaygo.Node { + now := time.Now() + return scalewaygo.Node{ + ID: nodeID, + PoolID: poolID, + ClusterID: "test-cluster", + ProviderID: providerID, + Name: fmt.Sprintf("node-%s", nodeID), + Status: status, + CreatedAt: &now, + UpdatedAt: &now, + } +} + +func TestScalewayCloudProvider_Name(t *testing.T) { + client := new(mockClient) + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + } + + assert.Equal(t, cloudprovider.ScalewayProviderName, provider.Name()) +} + +func TestScalewayCloudProvider_Refresh(t *testing.T) { + t.Run("successful refresh", func(t *testing.T) { + client := new(mockClient) + pool1 := createTestPool("pool-1", true, 3, 1, 10) + pool2 := createTestPool("pool-2", true, 2, 1, 5) + pool3 := createTestPool("pool-3", false, 1, 1, 1) // autoscaling disabled + + node1 := createTestNode("node-1", "pool-1", "scaleway://fr-par-1/instance-1", scalewaygo.NodeStatusReady) + node2 := createTestNode("node-2", "pool-1", "scaleway://fr-par-1/instance-2", scalewaygo.NodeStatusReady) + node3 := createTestNode("node-3", "pool-2", "scaleway://fr-par-1/instance-3", scalewaygo.NodeStatusReady) + + client.On("ListPools", mock.Anything, "test-cluster").Return( + 30*time.Second, + []scalewaygo.Pool{pool1, 
pool2, pool3}, + nil, + ) + client.On("ListNodes", mock.Anything, "test-cluster").Return( + 30*time.Second, + []scalewaygo.Node{node1, node2, node3}, + nil, + ) + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + refreshInterval: DefaultRefreshInterval, + nodeGroups: make(map[string]*NodeGroup), + } + + err := provider.Refresh() + require.NoError(t, err) + + // Verify node groups (only autoscaling pools) + assert.Len(t, provider.nodeGroups, 2) + assert.Contains(t, provider.nodeGroups, "pool-1") + assert.Contains(t, provider.nodeGroups, "pool-2") + assert.NotContains(t, provider.nodeGroups, "pool-3") + + // Verify nodes are assigned correctly + assert.Len(t, provider.nodeGroups["pool-1"].nodes, 2) + assert.Len(t, provider.nodeGroups["pool-2"].nodes, 1) + + // Verify refresh interval is updated + assert.Equal(t, 30*time.Second, provider.refreshInterval) + + client.AssertExpectations(t) + }) + + t.Run("caching prevents refresh", func(t *testing.T) { + client := new(mockClient) + pool := createTestPool("pool-1", true, 3, 1, 10) + node := createTestNode("node-1", "pool-1", "scaleway://fr-par-1/instance-1", scalewaygo.NodeStatusReady) + + // First refresh + client.On("ListPools", mock.Anything, "test-cluster").Return( + 5*time.Second, + []scalewaygo.Pool{pool}, + nil, + ).Once() + client.On("ListNodes", mock.Anything, "test-cluster").Return( + 5*time.Second, + []scalewaygo.Node{node}, + nil, + ).Once() + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + refreshInterval: 5 * time.Second, + nodeGroups: make(map[string]*NodeGroup), + } + + err := provider.Refresh() + require.NoError(t, err) + assert.NotZero(t, provider.lastRefresh) + + // Second refresh immediately - should be skipped + err = provider.Refresh() + require.NoError(t, err) + + // Only one call to each method + client.AssertExpectations(t) + }) + + t.Run("refresh after interval elapsed", func(t *testing.T) { + client := new(mockClient) + pool := createTestPool("pool-1", true, 3, 1, 10) + node := createTestNode("node-1", "pool-1", "scaleway://fr-par-1/instance-1", scalewaygo.NodeStatusReady) + + // First refresh + client.On("ListPools", mock.Anything, "test-cluster").Return( + 1*time.Millisecond, + []scalewaygo.Pool{pool}, + nil, + ).Twice() + client.On("ListNodes", mock.Anything, "test-cluster").Return( + 1*time.Millisecond, + []scalewaygo.Node{node}, + nil, + ).Twice() + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + refreshInterval: 1 * time.Millisecond, + nodeGroups: make(map[string]*NodeGroup), + } + + err := provider.Refresh() + require.NoError(t, err) + + // Wait for interval to elapse + time.Sleep(2 * time.Millisecond) + + // Second refresh should execute + err = provider.Refresh() + require.NoError(t, err) + + client.AssertExpectations(t) + }) + + t.Run("error on ListPools", func(t *testing.T) { + client := new(mockClient) + + client.On("ListPools", mock.Anything, "test-cluster").Return( + time.Duration(0), + []scalewaygo.Pool{}, + fmt.Errorf("API error"), + ) + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + refreshInterval: DefaultRefreshInterval, + nodeGroups: make(map[string]*NodeGroup), + } + + err := provider.Refresh() + assert.Error(t, err) + assert.Equal(t, err, provider.lastRefreshError) + + client.AssertExpectations(t) + }) + + t.Run("error on ListNodes", func(t *testing.T) { + client := new(mockClient) + pool := createTestPool("pool-1", true, 3, 1, 10) + + 
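+		// ListPools must succeed so that Refresh reaches the subsequent
+		// ListNodes call, whose failure is the behaviour under test.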
client.On("ListPools", mock.Anything, "test-cluster").Return( + 30*time.Second, + []scalewaygo.Pool{pool}, + nil, + ) + client.On("ListNodes", mock.Anything, "test-cluster").Return( + time.Duration(0), + []scalewaygo.Node{}, + fmt.Errorf("API error"), + ) + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + refreshInterval: DefaultRefreshInterval, + nodeGroups: make(map[string]*NodeGroup), + } + + err := provider.Refresh() + assert.Error(t, err) + assert.Equal(t, err, provider.lastRefreshError) + + client.AssertExpectations(t) + }) + + t.Run("nodes for non-existent pool are skipped", func(t *testing.T) { + client := new(mockClient) + pool1 := createTestPool("pool-1", true, 1, 1, 10) + node1 := createTestNode("node-1", "pool-1", "scaleway://fr-par-1/instance-1", scalewaygo.NodeStatusReady) + node2 := createTestNode("node-2", "pool-nonexistent", "scaleway://fr-par-1/instance-2", scalewaygo.NodeStatusReady) + + client.On("ListPools", mock.Anything, "test-cluster").Return( + 30*time.Second, + []scalewaygo.Pool{pool1}, + nil, + ) + client.On("ListNodes", mock.Anything, "test-cluster").Return( + 30*time.Second, + []scalewaygo.Node{node1, node2}, + nil, + ) + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + refreshInterval: DefaultRefreshInterval, + nodeGroups: make(map[string]*NodeGroup), + } + + err := provider.Refresh() + require.NoError(t, err) + + // Only pool-1 should have nodes + assert.Len(t, provider.nodeGroups["pool-1"].nodes, 1) + + client.AssertExpectations(t) + }) +} + +func TestScalewayCloudProvider_NodeGroups(t *testing.T) { + client := new(mockClient) + pool1 := createTestPool("pool-1", true, 3, 1, 10) + pool2 := createTestPool("pool-2", true, 2, 1, 5) + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + nodeGroups: map[string]*NodeGroup{ + "pool-1": { + Client: client, + pool: pool1, + nodes: make(map[string]*scalewaygo.Node), + }, + "pool-2": { + Client: client, + pool: pool2, + nodes: make(map[string]*scalewaygo.Node), + }, + }, + } + + // Pre-convert for testing + provider.providerNodeGroups = []cloudprovider.NodeGroup{ + provider.nodeGroups["pool-1"], + provider.nodeGroups["pool-2"], + } + + nodeGroups := provider.NodeGroups() + assert.Len(t, nodeGroups, 2) +} + +func TestScalewayCloudProvider_NodeGroupForNode(t *testing.T) { + t.Run("node found in pool", func(t *testing.T) { + client := new(mockClient) + pool := createTestPool("pool-1", true, 3, 1, 10) + node := createTestNode("node-1", "pool-1", "scaleway://fr-par-1/instance-1", scalewaygo.NodeStatusReady) + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + nodeGroups: map[string]*NodeGroup{ + "pool-1": { + Client: client, + pool: pool, + nodes: map[string]*scalewaygo.Node{ + "scaleway://fr-par-1/instance-1": &node, + }, + }, + }, + } + + k8sNode := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: "scaleway://fr-par-1/instance-1", + }, + } + + ng, err := provider.NodeGroupForNode(k8sNode) + require.NoError(t, err) + require.NotNil(t, ng) + assert.Equal(t, "pool-1", ng.Id()) + }) + + t.Run("node not found", func(t *testing.T) { + client := new(mockClient) + pool := createTestPool("pool-1", true, 3, 1, 10) + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + nodeGroups: map[string]*NodeGroup{ + "pool-1": { + Client: client, + pool: pool, + nodes: make(map[string]*scalewaygo.Node), + }, + }, + } + + k8sNode := &apiv1.Node{ + Spec: 
apiv1.NodeSpec{ + ProviderID: "scaleway://fr-par-1/instance-nonexistent", + }, + } + + ng, err := provider.NodeGroupForNode(k8sNode) + require.NoError(t, err) + assert.Nil(t, ng) + }) +} + +func TestScalewayCloudProvider_HasInstance(t *testing.T) { + provider := &scalewayCloudProvider{} + + t.Run("node with provider ID", func(t *testing.T) { + node := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: "scaleway://fr-par-1/instance-1", + }, + } + + hasInstance, err := provider.HasInstance(node) + require.NoError(t, err) + assert.True(t, hasInstance) + }) + + t.Run("node without provider ID", func(t *testing.T) { + node := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: "", + }, + } + + hasInstance, err := provider.HasInstance(node) + require.NoError(t, err) + assert.False(t, hasInstance) + }) +} + +func TestScalewayCloudProvider_Pricing(t *testing.T) { + provider := &scalewayCloudProvider{} + + pricingModel, err := provider.Pricing() + require.NoError(t, err) + assert.NotNil(t, pricingModel) + assert.Equal(t, provider, pricingModel) +} + +func TestScalewayCloudProvider_GetAvailableMachineTypes(t *testing.T) { + provider := &scalewayCloudProvider{} + + machineTypes, err := provider.GetAvailableMachineTypes() + require.NoError(t, err) + assert.Empty(t, machineTypes) +} + +func TestScalewayCloudProvider_NewNodeGroup(t *testing.T) { + provider := &scalewayCloudProvider{} + + ng, err := provider.NewNodeGroup("DEV1-M", nil, nil, nil, nil) + assert.Error(t, err) + assert.Equal(t, cloudprovider.ErrNotImplemented, err) + assert.Nil(t, ng) +} + +func TestScalewayCloudProvider_GetResourceLimiter(t *testing.T) { + limiter := &cloudprovider.ResourceLimiter{} + provider := &scalewayCloudProvider{ + resourceLimiter: limiter, + } + + result, err := provider.GetResourceLimiter() + require.NoError(t, err) + assert.Equal(t, limiter, result) +} + +func TestScalewayCloudProvider_GPULabel(t *testing.T) { + provider := &scalewayCloudProvider{} + + label := provider.GPULabel() + assert.Equal(t, "k8s.scw.cloud/gpu", label) +} + +func TestScalewayCloudProvider_GetAvailableGPUTypes(t *testing.T) { + provider := &scalewayCloudProvider{} + + gpuTypes := provider.GetAvailableGPUTypes() + assert.Nil(t, gpuTypes) +} + +func TestScalewayCloudProvider_NodePrice(t *testing.T) { + t.Run("node price calculation", func(t *testing.T) { + client := new(mockClient) + pool := createTestPool("pool-1", true, 3, 1, 10) + pool.NodePricePerHour = 0.10 + node := createTestNode("node-1", "pool-1", "scaleway://fr-par-1/instance-1", scalewaygo.NodeStatusReady) + + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + nodeGroups: map[string]*NodeGroup{ + "pool-1": { + Client: client, + pool: pool, + nodes: map[string]*scalewaygo.Node{ + "scaleway://fr-par-1/instance-1": &node, + }, + }, + }, + } + + k8sNode := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: "scaleway://fr-par-1/instance-1", + }, + } + + startTime := time.Now() + endTime := startTime.Add(2*time.Hour + 30*time.Minute) + + price, err := provider.NodePrice(k8sNode, startTime, endTime) + require.NoError(t, err) + // 2.5 hours rounds up to 3 hours at $0.10/hour = $0.30 + assert.InDelta(t, 0.30, price, 0.001) + }) + + t.Run("node not found", func(t *testing.T) { + client := new(mockClient) + provider := &scalewayCloudProvider{ + client: client, + clusterID: "test-cluster", + nodeGroups: make(map[string]*NodeGroup), + } + + k8sNode := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: "scaleway://fr-par-1/instance-nonexistent", + }, + } + + 
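+		// The billing window is arbitrary here: NodePrice should fail before any
+		// price is computed, because no pool tracks this provider ID.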
startTime := time.Now() + endTime := startTime.Add(1 * time.Hour) + + price, err := provider.NodePrice(k8sNode, startTime, endTime) + assert.Error(t, err) + assert.Equal(t, 0.0, price) + }) +} + +func TestScalewayCloudProvider_PodPrice(t *testing.T) { + provider := &scalewayCloudProvider{} + + pod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + }, + } + + startTime := time.Now() + endTime := startTime.Add(1 * time.Hour) + + price, err := provider.PodPrice(pod, startTime, endTime) + require.NoError(t, err) + assert.Equal(t, 0.0, price) +} + +func TestScalewayCloudProvider_Cleanup(t *testing.T) { + provider := &scalewayCloudProvider{} + + err := provider.Cleanup() + assert.NoError(t, err) +} diff --git a/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group.go b/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group.go index 0869541d6341..5a5c26dec431 100644 --- a/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group.go +++ b/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group.go @@ -18,7 +18,6 @@ package scaleway import ( "context" - "errors" "fmt" "strings" @@ -29,113 +28,108 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" - "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/klog/v2" ) -// NodeGroup implements cloudprovider.NodeGroup interface. -// it is used to resize a Scaleway Pool which is a group of nodes with the same capacity. +// NodeGroup contains configuration info and functions to control a set +// of nodes that have the same capacity and set of labels. type NodeGroup struct { scalewaygo.Client nodes map[string]*scalewaygo.Node - specs *scalewaygo.GenericNodeSpecs - p *scalewaygo.Pool + pool scalewaygo.Pool } // MaxSize returns maximum size of the node group. func (ng *NodeGroup) MaxSize() int { klog.V(6).Info("MaxSize,called") - - return int(ng.p.MaxSize) + return ng.pool.MaxSize } // MinSize returns minimum size of the node group. func (ng *NodeGroup) MinSize() int { klog.V(6).Info("MinSize,called") - - return int(ng.p.MinSize) + return ng.pool.MinSize } // TargetSize returns the current target size of the node group. It is possible that the // number of nodes in Kubernetes is different at the moment but should be equal // to Size() once everything stabilizes (new nodes finish startup and registration or -// removed nodes are deleted completely). +// removed nodes are deleted completely). Implementation required. func (ng *NodeGroup) TargetSize() (int, error) { klog.V(6).Info("TargetSize,called") - return int(ng.p.Size), nil + return ng.pool.Size, nil } // IncreaseSize increases the size of the node group. To delete a node you need // to explicitly name it and use DeleteNode. This function should wait until -// node group size is updated. +// node group size is updated. Implementation required. func (ng *NodeGroup) IncreaseSize(delta int) error { - - klog.V(4).Infof("IncreaseSize,ClusterID=%s,delta=%d", ng.p.ClusterID, delta) + klog.V(4).Infof("IncreaseSize,ClusterID=%s,delta=%d", ng.pool.ClusterID, delta) if delta <= 0 { return fmt.Errorf("delta must be strictly positive, have: %d", delta) } - targetSize := ng.p.Size + uint32(delta) + targetSize := ng.pool.Size + delta - if targetSize > uint32(ng.MaxSize()) { - return fmt.Errorf("size increase is too large. 
current: %d desired: %d max: %d", - ng.p.Size, targetSize, ng.MaxSize()) + if targetSize < 0 { + return fmt.Errorf("size cannot be negative. current: %d delta: %d", ng.pool.Size, delta) } - ctx := context.Background() - pool, err := ng.UpdatePool(ctx, &scalewaygo.UpdatePoolRequest{ - PoolID: ng.p.ID, - Size: &targetSize, - }) - if err != nil { - return err + if targetSize > ng.MaxSize() { + return fmt.Errorf("size increase is too large. current: %d desired: %d max: %d", ng.pool.Size, targetSize, ng.MaxSize()) } - if pool.Size != targetSize { - return fmt.Errorf("couldn't increase size to %d. Current size is: %d", - targetSize, pool.Size) + updatedPool, err := ng.UpdatePool(context.Background(), ng.pool.ID, targetSize) + if err != nil { + return err } - ng.p.Size = targetSize + ng.pool.Size = updatedPool.Size + ng.pool.Status = updatedPool.Status return nil } -// AtomicIncreaseSize is not implemented. +// AtomicIncreaseSize tries to increase the size of the node group atomically. +// It returns error if requesting the entire delta fails. The method doesn't wait until the new instances appear. +// Implementation is optional. Implementation of this method generally requires external cloud provider support +// for atomically requesting multiple instances. If implemented, CA will take advantage of the method while scaling up +// BestEffortAtomicScaleUp ProvisioningClass, guaranteeing that all instances required for such a +// ProvisioningRequest are provisioned atomically. func (ng *NodeGroup) AtomicIncreaseSize(delta int) error { return cloudprovider.ErrNotImplemented } // DeleteNodes deletes nodes from this node group. Error is returned either on // failure or if the given node doesn't belong to this node group. This function -// should wait until node group size is updated. +// should wait until node group size is updated. Implementation required. func (ng *NodeGroup) DeleteNodes(nodes []*apiv1.Node) error { ctx := context.Background() - klog.V(4).Info("DeleteNodes,", len(nodes), " nodes to reclaim") + klog.V(4).Infof("DeleteNodes: %d nodes to reclaim", len(nodes)) for _, n := range nodes { - node, ok := ng.nodes[n.Spec.ProviderID] if !ok { - klog.Errorf("DeleteNodes,ProviderID=%s,PoolID=%s,node marked for deletion not found in pool", n.Spec.ProviderID, ng.p.ID) + klog.Errorf("DeleteNodes,ProviderID=%s,PoolID=%s,node marked for deletion not found in pool", n.Spec.ProviderID, ng.pool.ID) continue } - updatedNode, err := ng.DeleteNode(ctx, &scalewaygo.DeleteNodeRequest{ - NodeID: node.ID, - }) - if err != nil || updatedNode.Status != scalewaygo.NodeStatusDeleting { + deletedNode, err := ng.DeleteNode(ctx, node.ID) + if err != nil { return err } - ng.p.Size-- - ng.nodes[n.Spec.ProviderID].Status = scalewaygo.NodeStatusDeleting + ng.pool.Size-- + ng.nodes[n.Spec.ProviderID].Status = deletedNode.Status } return nil } -// ForceDeleteNodes deletes nodes from the group regardless of constraints. +// ForceDeleteNodes deletes nodes from this node group, without checking for +// constraints like minimal size validation etc. Error is returned either on +// failure or if the given node doesn't belong to this node group. This function +// should wait until node group size is updated. 
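+// Not implemented for Scaleway: this method always returns
+// cloudprovider.ErrNotImplemented, so regular scale-down goes through
+// DeleteNodes above. An illustrative (hypothetical) caller resolves the
+// owning node group first:
+//
+//	ng, err := provider.NodeGroupForNode(node)
+//	if err == nil && ng != nil {
+//		err = ng.DeleteNodes([]*apiv1.Node{node})
+//	}
+//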
func (ng *NodeGroup) ForceDeleteNodes(nodes []*apiv1.Node) error { return cloudprovider.ErrNotImplemented } @@ -144,56 +138,55 @@ func (ng *NodeGroup) ForceDeleteNodes(nodes []*apiv1.Node) error { // doesn't permit to delete any existing node and can be used only to reduce the // request for new nodes that have not been yet fulfilled. Delta should be negative. // It is assumed that cloud provider will not delete the existing nodes when there -// is an option to just decrease the target. +// is an option to just decrease the target. Implementation required. func (ng *NodeGroup) DecreaseTargetSize(delta int) error { - - klog.V(4).Infof("DecreaseTargetSize,ClusterID=%s,delta=%d", ng.p.ClusterID, delta) + klog.V(4).Infof("DecreaseTargetSize,ClusterID=%s,delta=%d", ng.pool.ClusterID, delta) if delta >= 0 { return fmt.Errorf("delta must be strictly negative, have: %d", delta) } - targetSize := ng.p.Size + uint32(delta) - if int(targetSize) < ng.MinSize() { - return fmt.Errorf("size decrease is too large. current: %d desired: %d min: %d", - ng.p.Size, targetSize, ng.MinSize()) + targetSize := ng.pool.Size + delta + + if targetSize < 0 { + return fmt.Errorf("size cannot be negative. current: %d delta: %d", ng.pool.Size, delta) + } + + if targetSize < ng.MinSize() { + return fmt.Errorf("size decrease is too large. current: %d desired: %d min: %d", ng.pool.Size, targetSize, ng.MinSize()) } ctx := context.Background() - pool, err := ng.UpdatePool(ctx, &scalewaygo.UpdatePoolRequest{ - PoolID: ng.p.ID, - Size: &targetSize, - }) + updatedNode, err := ng.UpdatePool(ctx, ng.pool.ID, targetSize) if err != nil { return err } - if pool.Size != targetSize { - return fmt.Errorf("couldn't decrease size to %d. Current size is: %d", - targetSize, pool.Size) - } + ng.pool.Size = updatedNode.Size + ng.pool.Status = scalewaygo.PoolStatusScaling - ng.p.Size = targetSize return nil } // Id returns an unique identifier of the node group. func (ng *NodeGroup) Id() string { - return ng.p.ID + return ng.pool.ID } // Debug returns a string containing all information regarding this node group. func (ng *NodeGroup) Debug() string { klog.V(4).Info("Debug,called") - return fmt.Sprintf("id:%s,status:%s,version:%s,autoscaling:%t,size:%d,min_size:%d,max_size:%d", ng.Id(), ng.p.Status, ng.p.Version, ng.p.Autoscaling, ng.p.Size, ng.MinSize(), ng.MaxSize()) + return fmt.Sprintf("id:%s,status:%s,version:%s,autoscaling:%t,size:%d,min_size:%d,max_size:%d", ng.Id(), ng.pool.Status, ng.pool.Version, ng.pool.Autoscaling, ng.pool.Size, ng.MinSize(), ng.MaxSize()) } // Nodes returns a list of all nodes that belong to this node group. +// It is required that Instance objects returned by this method have Id field set. +// Other fields are optional. +// This list should include also instances that might have not become a kubernetes node yet. func (ng *NodeGroup) Nodes() ([]cloudprovider.Instance, error) { - var nodes []cloudprovider.Instance - - klog.V(4).Info("Nodes,PoolID=", ng.p.ID) + klog.V(4).Infof("Nodes,PoolID=%s", ng.pool.ID) + nodes := make([]cloudprovider.Instance, 0, len(ng.nodes)) for _, node := range ng.nodes { nodes = append(nodes, cloudprovider.Instance{ Id: node.ProviderID, @@ -209,127 +202,68 @@ func (ng *NodeGroup) Nodes() ([]cloudprovider.Instance, error) { // predict what would a new node look like if a node group was expanded. 
The returned // NodeInfo is expected to have a fully populated Node object, with all of the labels, // capacity and allocatable information as well as all pods that are started on -// the node by default, using manifest (most likely only kube-proxy). +// the node by default, using manifest (most likely only kube-proxy). Implementation optional. func (ng *NodeGroup) TemplateNodeInfo() (*framework.NodeInfo, error) { - klog.V(4).Infof("TemplateNodeInfo,PoolID=%s", ng.p.ID) + klog.V(4).Infof("TemplateNodeInfo,PoolID=%s", ng.pool.ID) node := apiv1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: ng.specs.Labels[apiv1.LabelHostname], - Labels: ng.specs.Labels, + Name: ng.pool.Labels[apiv1.LabelHostname], + Labels: ng.pool.Labels, }, Status: apiv1.NodeStatus{ Capacity: apiv1.ResourceList{}, Allocatable: apiv1.ResourceList{}, }, } - node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(int64(ng.specs.CpuCapacity), resource.DecimalSI) - node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(int64(ng.specs.MemoryCapacity), resource.DecimalSI) - node.Status.Capacity[apiv1.ResourceEphemeralStorage] = *resource.NewQuantity(int64(ng.specs.LocalStorageCapacity), resource.DecimalSI) - node.Status.Capacity[apiv1.ResourcePods] = *resource.NewQuantity(int64(ng.specs.MaxPods), resource.DecimalSI) - - node.Status.Allocatable[apiv1.ResourceCPU] = *resource.NewQuantity(int64(ng.specs.CpuAllocatable), resource.DecimalSI) - node.Status.Allocatable[apiv1.ResourceMemory] = *resource.NewQuantity(int64(ng.specs.MemoryAllocatable), resource.DecimalSI) - node.Status.Allocatable[apiv1.ResourceEphemeralStorage] = *resource.NewQuantity(int64(ng.specs.LocalStorageAllocatable), resource.DecimalSI) - node.Status.Allocatable[apiv1.ResourcePods] = *resource.NewQuantity(int64(ng.specs.MaxPods), resource.DecimalSI) - - if ng.specs.Gpu > 0 { - nbGpu := *resource.NewQuantity(int64(ng.specs.Gpu), resource.DecimalSI) - node.Status.Capacity[gpu.ResourceNvidiaGPU] = nbGpu - node.Status.Allocatable[gpu.ResourceNvidiaGPU] = nbGpu - } - - node.Status.Conditions = cloudprovider.BuildReadyConditions() - node.Spec.Taints = parseTaints(ng.specs.Taints) - nodeInfo := framework.NewNodeInfo(&node, nil, &framework.PodInfo{Pod: cloudprovider.BuildKubeProxy(ng.p.Name)}) - return nodeInfo, nil -} - -func parseTaints(taints map[string]string) []apiv1.Taint { - k8sTaints := make([]apiv1.Taint, 0, len(taints)) + for capacityName, capacityValue := range ng.pool.Capacity { + node.Status.Capacity[apiv1.ResourceName(capacityName)] = *resource.NewQuantity(capacityValue, resource.DecimalSI) + } - for key, valueEffect := range taints { + for allocatableName, allocatableValue := range ng.pool.Allocatable { + node.Status.Allocatable[apiv1.ResourceName(allocatableName)] = *resource.NewQuantity(allocatableValue, resource.DecimalSI) + } - splittedValueEffect := strings.Split(valueEffect, ":") - var taint apiv1.Taint + node.Status.Conditions = cloudprovider.BuildReadyConditions() + node.Spec.Taints = parseTaints(ng.pool.Taints) - switch apiv1.TaintEffect(splittedValueEffect[len(splittedValueEffect)-1]) { - case apiv1.TaintEffectNoExecute: - taint.Effect = apiv1.TaintEffectNoExecute - case apiv1.TaintEffectNoSchedule: - taint.Effect = apiv1.TaintEffectNoSchedule - case apiv1.TaintEffectPreferNoSchedule: - taint.Effect = apiv1.TaintEffectPreferNoSchedule - default: - continue - } - if len(splittedValueEffect) == 2 { - taint.Value = splittedValueEffect[0] - } - taint.Key = key + nodeInfo := framework.NewNodeInfo(&node, nil, &framework.PodInfo{Pod: 
cloudprovider.BuildKubeProxy(ng.pool.Name)}) - k8sTaints = append(k8sTaints, taint) - } - return k8sTaints + return nodeInfo, nil } // Exist checks if the node group really exists on the cloud provider side. Allows to tell the -// theoretical node group from the real one. +// theoretical node group from the real one. Implementation required. func (ng *NodeGroup) Exist() bool { - - klog.V(4).Infof("Exist,PoolID=%s", ng.p.ID) - - _, err := ng.GetPool(context.Background(), &scalewaygo.GetPoolRequest{ - PoolID: ng.p.ID, - }) - if err != nil && errors.Is(err, scalewaygo.ErrClientSide) { - return false - } + klog.V(4).Infof("Exist,PoolID=%s", ng.pool.ID) return true - } -// Pool Autoprovision feature is not supported by Scaleway - -// Create creates the node group on the cloud provider side. +// Create creates the node group on the cloud provider side. Implementation optional. func (ng *NodeGroup) Create() (cloudprovider.NodeGroup, error) { return nil, cloudprovider.ErrNotImplemented } // Delete deletes the node group on the cloud provider side. +// This will be executed only for autoprovisioned node groups, once their size drops to 0. +// Implementation optional. func (ng *NodeGroup) Delete() error { return cloudprovider.ErrNotImplemented } -// Autoprovisioned returns true if the node group is autoprovisioned. +// Autoprovisioned returns true if the node group is autoprovisioned. An autoprovisioned group +// was created by CA and can be deleted when scaled to 0. func (ng *NodeGroup) Autoprovisioned() bool { return false } -// GetOptions returns nil which means 'use defaults options' +// GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular +// NodeGroup. Returning a nil will result in using default options. +// Implementation optional. Callers MUST handle `cloudprovider.ErrNotImplemented`. 
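+// Since this implementation always returns cloudprovider.ErrNotImplemented, a
+// caller is expected to fall back to the defaults it passed in, for example
+// (sketch, not part of this interface):
+//
+//	opts, err := ng.GetOptions(defaults)
+//	if errors.Is(err, cloudprovider.ErrNotImplemented) || (err == nil && opts == nil) {
+//		opts = &defaults
+//	}
+//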
func (ng *NodeGroup) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) { return nil, cloudprovider.ErrNotImplemented } -// nodesFromPool returns the nodes associated to a Scaleway Pool -func nodesFromPool(client scalewaygo.Client, p *scalewaygo.Pool) (map[string]*scalewaygo.Node, error) { - - ctx := context.Background() - resp, err := client.ListNodes(ctx, &scalewaygo.ListNodesRequest{ClusterID: p.ClusterID, PoolID: &p.ID}) - if err != nil { - return nil, err - } - - nodes := make(map[string]*scalewaygo.Node) - for _, node := range resp.Nodes { - nodes[node.ProviderID] = node - } - - klog.V(4).Infof("nodesFromPool,PoolID=%s,%d nodes found", p.ID, len(nodes)) - - return nodes, nil -} - func fromScwStatus(status scalewaygo.NodeStatus) *cloudprovider.InstanceStatus { st := &cloudprovider.InstanceStatus{} switch status { @@ -364,3 +298,30 @@ func fromScwStatus(status scalewaygo.NodeStatus) *cloudprovider.InstanceStatus { return st } + +func parseTaints(taints map[string]string) []apiv1.Taint { + k8sTaints := make([]apiv1.Taint, 0, len(taints)) + for key, valueEffect := range taints { + splittedValueEffect := strings.Split(valueEffect, ":") + var taint apiv1.Taint + + switch apiv1.TaintEffect(splittedValueEffect[len(splittedValueEffect)-1]) { + case apiv1.TaintEffectNoExecute: + taint.Effect = apiv1.TaintEffectNoExecute + case apiv1.TaintEffectNoSchedule: + taint.Effect = apiv1.TaintEffectNoSchedule + case apiv1.TaintEffectPreferNoSchedule: + taint.Effect = apiv1.TaintEffectPreferNoSchedule + default: + continue + } + if len(splittedValueEffect) == 2 { + taint.Value = splittedValueEffect[0] + } + taint.Key = key + + k8sTaints = append(k8sTaints, taint) + } + + return k8sTaints +} diff --git a/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group_test.go b/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group_test.go index b971cbdaff03..78e2f5e39c98 100644 --- a/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group_test.go +++ b/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group_test.go @@ -17,222 +17,735 @@ limitations under the License. 
package scaleway import ( - "context" - "errors" + "fmt" + "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo" - "testing" + "k8s.io/autoscaler/cluster-autoscaler/config" ) -func TestNodeGroup_TargetSize(t *testing.T) { - var nodesNb uint32 = 3 - +func TestNodeGroup_MaxSize(t *testing.T) { ng := &NodeGroup{ - p: &scalewaygo.Pool{ - Size: nodesNb, + pool: scalewaygo.Pool{ + MaxSize: 10, }, } - size, err := ng.TargetSize() - assert.NoError(t, err) - assert.Equal(t, int(nodesNb), size, "target size is wrong") + + assert.Equal(t, 10, ng.MaxSize()) } -func TestNodeGroup_IncreaseSize(t *testing.T) { - ctx := context.Background() - nodesNb := 3 - delta := 2 - client := &clientMock{} +func TestNodeGroup_MinSize(t *testing.T) { ng := &NodeGroup{ - Client: client, - p: &scalewaygo.Pool{ - Size: uint32(nodesNb), - MinSize: 1, - MaxSize: 10, - Autoscaling: true, + pool: scalewaygo.Pool{ + MinSize: 2, }, } - newSize := uint32(nodesNb + delta) - client.On("UpdatePool", - ctx, - &scalewaygo.UpdatePoolRequest{ - PoolID: ng.p.ID, - Size: &newSize, - }).Return( - &scalewaygo.Pool{ - Size: newSize, - }, nil, - ).Once() - err := ng.IncreaseSize(delta) - assert.NoError(t, err) -} - -func TestNodeGroup_IncreaseNegativeDelta(t *testing.T) { - nodesNb := 3 - delta := -2 - client := &clientMock{} + assert.Equal(t, 2, ng.MinSize()) +} + +func TestNodeGroup_TargetSize(t *testing.T) { ng := &NodeGroup{ - Client: client, - p: &scalewaygo.Pool{ - Size: uint32(nodesNb), + pool: scalewaygo.Pool{ + Size: 5, }, } - err := ng.IncreaseSize(delta) - assert.Error(t, err) + size, err := ng.TargetSize() + require.NoError(t, err) + assert.Equal(t, 5, size) } -func TestNodeGroup_IncreaseAboveMaximum(t *testing.T) { - nodesNb := 3 - delta := 10 - client := &clientMock{} - ng := &NodeGroup{ - Client: client, - p: &scalewaygo.Pool{ - Size: uint32(nodesNb), - MaxSize: 10, - }, - } +func TestNodeGroup_IncreaseSize(t *testing.T) { + t.Run("successful increase", func(t *testing.T) { + client := new(mockClient) + pool := scalewaygo.Pool{ + ID: "pool-1", + ClusterID: "test-cluster", + Size: 3, + MinSize: 1, + MaxSize: 10, + Status: scalewaygo.PoolStatusReady, + } + + updatedPool := pool + updatedPool.Size = 5 + updatedPool.Status = scalewaygo.PoolStatusScaling + + client.On("UpdatePool", mock.Anything, "pool-1", 5).Return(updatedPool, nil) + + ng := &NodeGroup{ + Client: client, + pool: pool, + nodes: make(map[string]*scalewaygo.Node), + } + + err := ng.IncreaseSize(2) + require.NoError(t, err) + assert.Equal(t, 5, ng.pool.Size) + assert.Equal(t, scalewaygo.PoolStatusScaling, ng.pool.Status) + + client.AssertExpectations(t) + }) + + t.Run("delta is zero", func(t *testing.T) { + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + Size: 3, + MinSize: 1, + MaxSize: 10, + }, + } + + err := ng.IncreaseSize(0) + assert.Error(t, err) + assert.Contains(t, err.Error(), "delta must be strictly positive") + }) + + t.Run("delta is negative", func(t *testing.T) { + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + Size: 3, + MinSize: 1, + MaxSize: 10, + }, + } + + err := ng.IncreaseSize(-1) + assert.Error(t, err) + assert.Contains(t, err.Error(), "delta must be strictly positive") + }) + + t.Run("exceeds max size", func(t *testing.T) { + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + Size: 8, + MinSize: 1, + MaxSize: 10, + }, + } + + err := 
ng.IncreaseSize(5) + assert.Error(t, err) + assert.Contains(t, err.Error(), "size increase is too large") + }) + + t.Run("negative target size from overflow", func(t *testing.T) { + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + Size: -5, // Simulating corrupted state + MinSize: 0, + MaxSize: 10, + }, + } + + // Starting from negative size (corrupted state) + err := ng.IncreaseSize(2) + assert.Error(t, err) + assert.Contains(t, err.Error(), "size cannot be negative") + }) - err := ng.IncreaseSize(delta) - assert.Error(t, err) + t.Run("API error", func(t *testing.T) { + client := new(mockClient) + pool := scalewaygo.Pool{ + ID: "pool-1", + ClusterID: "test-cluster", + Size: 3, + MinSize: 1, + MaxSize: 10, + } + + client.On("UpdatePool", mock.Anything, "pool-1", 5).Return( + scalewaygo.Pool{}, + fmt.Errorf("API error"), + ) + + ng := &NodeGroup{ + Client: client, + pool: pool, + nodes: make(map[string]*scalewaygo.Node), + } + + err := ng.IncreaseSize(2) + assert.Error(t, err) + + client.AssertExpectations(t) + }) } func TestNodeGroup_DecreaseTargetSize(t *testing.T) { - ctx := context.Background() - nodesNb := 5 - delta := -4 - client := &clientMock{} - ng := &NodeGroup{ - Client: client, - p: &scalewaygo.Pool{ - Size: uint32(nodesNb), - MinSize: 1, - MaxSize: 10, - Autoscaling: true, - }, - } + t.Run("successful decrease", func(t *testing.T) { + client := new(mockClient) + pool := scalewaygo.Pool{ + ID: "pool-1", + ClusterID: "test-cluster", + Size: 5, + MinSize: 1, + MaxSize: 10, + Status: scalewaygo.PoolStatusReady, + } + + updatedPool := pool + updatedPool.Size = 3 + updatedPool.Status = scalewaygo.PoolStatusScaling + + client.On("UpdatePool", mock.Anything, "pool-1", 3).Return(updatedPool, nil) + + ng := &NodeGroup{ + Client: client, + pool: pool, + nodes: make(map[string]*scalewaygo.Node), + } + + err := ng.DecreaseTargetSize(-2) + require.NoError(t, err) + assert.Equal(t, 3, ng.pool.Size) + assert.Equal(t, scalewaygo.PoolStatusScaling, ng.pool.Status) + + client.AssertExpectations(t) + }) + + t.Run("delta is zero", func(t *testing.T) { + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + Size: 3, + MinSize: 1, + MaxSize: 10, + }, + } + + err := ng.DecreaseTargetSize(0) + assert.Error(t, err) + assert.Contains(t, err.Error(), "delta must be strictly negative") + }) + + t.Run("delta is positive", func(t *testing.T) { + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + Size: 3, + MinSize: 1, + MaxSize: 10, + }, + } + + err := ng.DecreaseTargetSize(1) + assert.Error(t, err) + assert.Contains(t, err.Error(), "delta must be strictly negative") + }) + + t.Run("below min size", func(t *testing.T) { + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + Size: 5, + MinSize: 3, + MaxSize: 10, + }, + } + + // 5 + (-3) = 2, which is below min (3) but not negative + err := ng.DecreaseTargetSize(-3) + assert.Error(t, err) + assert.Contains(t, err.Error(), "size decrease is too large") + }) + + t.Run("negative target size", func(t *testing.T) { + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + Size: 2, + MinSize: 0, + MaxSize: 10, + }, + } + + // Attempting to decrease by more than current size would result in negative + // 2 + (-5) = -3 + err := ng.DecreaseTargetSize(-5) + assert.Error(t, err) + assert.Contains(t, err.Error(), "size cannot be negative") + }) + + t.Run("API error", func(t *testing.T) { + client := new(mockClient) + pool := scalewaygo.Pool{ + ID: "pool-1", + ClusterID: "test-cluster", + Size: 5, + MinSize: 1, + MaxSize: 10, + } + + client.On("UpdatePool", mock.Anything, "pool-1", 3).Return( + 
scalewaygo.Pool{}, + fmt.Errorf("API error"), + ) + + ng := &NodeGroup{ + Client: client, + pool: pool, + nodes: make(map[string]*scalewaygo.Node), + } + + err := ng.DecreaseTargetSize(-2) + assert.Error(t, err) + + client.AssertExpectations(t) + }) +} + +func TestNodeGroup_DeleteNodes(t *testing.T) { + t.Run("successful delete", func(t *testing.T) { + client := new(mockClient) + node1 := createTestNode("node-1", "pool-1", "scaleway://fr-par-1/instance-1", scalewaygo.NodeStatusReady) + node2 := createTestNode("node-2", "pool-1", "scaleway://fr-par-1/instance-2", scalewaygo.NodeStatusReady) + + deletedNode1 := node1 + deletedNode1.Status = scalewaygo.NodeStatusDeleting + deletedNode2 := node2 + deletedNode2.Status = scalewaygo.NodeStatusDeleting + + client.On("DeleteNode", mock.Anything, "node-1").Return(deletedNode1, nil) + client.On("DeleteNode", mock.Anything, "node-2").Return(deletedNode2, nil) + + ng := &NodeGroup{ + Client: client, + pool: scalewaygo.Pool{ + ID: "pool-1", + Size: 5, + }, + nodes: map[string]*scalewaygo.Node{ + "scaleway://fr-par-1/instance-1": &node1, + "scaleway://fr-par-1/instance-2": &node2, + }, + } - newSize := uint32(nodesNb + delta) - client.On("UpdatePool", - ctx, - &scalewaygo.UpdatePoolRequest{ - PoolID: ng.p.ID, - Size: &newSize, - }).Return( - &scalewaygo.Pool{ - Size: newSize, - }, nil, - ).Once() - err := ng.DecreaseTargetSize(delta) - assert.NoError(t, err) -} - -func TestNodeGroup_DecreaseTargetSizePositiveDelta(t *testing.T) { - nodesNb := 3 - delta := 2 - client := &clientMock{} + k8sNodes := []*apiv1.Node{ + {Spec: apiv1.NodeSpec{ProviderID: "scaleway://fr-par-1/instance-1"}}, + {Spec: apiv1.NodeSpec{ProviderID: "scaleway://fr-par-1/instance-2"}}, + } + + err := ng.DeleteNodes(k8sNodes) + require.NoError(t, err) + assert.Equal(t, 3, ng.pool.Size) + assert.Equal(t, scalewaygo.NodeStatusDeleting, ng.nodes["scaleway://fr-par-1/instance-1"].Status) + assert.Equal(t, scalewaygo.NodeStatusDeleting, ng.nodes["scaleway://fr-par-1/instance-2"].Status) + + client.AssertExpectations(t) + }) + + t.Run("node not found in pool", func(t *testing.T) { + client := new(mockClient) + + ng := &NodeGroup{ + Client: client, + pool: scalewaygo.Pool{ + ID: "pool-1", + Size: 3, + }, + nodes: make(map[string]*scalewaygo.Node), + } + + k8sNodes := []*apiv1.Node{ + {Spec: apiv1.NodeSpec{ProviderID: "scaleway://fr-par-1/instance-nonexistent"}}, + } + + err := ng.DeleteNodes(k8sNodes) + // Should not error, just log and continue + require.NoError(t, err) + assert.Equal(t, 3, ng.pool.Size) // Size unchanged + + client.AssertExpectations(t) + }) + + t.Run("API error", func(t *testing.T) { + client := new(mockClient) + node1 := createTestNode("node-1", "pool-1", "scaleway://fr-par-1/instance-1", scalewaygo.NodeStatusReady) + + client.On("DeleteNode", mock.Anything, "node-1").Return( + scalewaygo.Node{}, + fmt.Errorf("API error"), + ) + + ng := &NodeGroup{ + Client: client, + pool: scalewaygo.Pool{ + ID: "pool-1", + Size: 3, + }, + nodes: map[string]*scalewaygo.Node{ + "scaleway://fr-par-1/instance-1": &node1, + }, + } + + k8sNodes := []*apiv1.Node{ + {Spec: apiv1.NodeSpec{ProviderID: "scaleway://fr-par-1/instance-1"}}, + } + + err := ng.DeleteNodes(k8sNodes) + assert.Error(t, err) + + client.AssertExpectations(t) + }) +} + +func TestNodeGroup_ForceDeleteNodes(t *testing.T) { + ng := &NodeGroup{} + + err := ng.ForceDeleteNodes([]*apiv1.Node{}) + assert.Equal(t, cloudprovider.ErrNotImplemented, err) +} + +func TestNodeGroup_AtomicIncreaseSize(t *testing.T) { + ng := &NodeGroup{} + + 
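+	// Atomic scale-up is not supported by this provider; any delta should
+	// surface cloudprovider.ErrNotImplemented.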
err := ng.AtomicIncreaseSize(1) + assert.Equal(t, cloudprovider.ErrNotImplemented, err) +} + +func TestNodeGroup_Id(t *testing.T) { ng := &NodeGroup{ - Client: client, - p: &scalewaygo.Pool{ - Size: uint32(nodesNb), + pool: scalewaygo.Pool{ + ID: "pool-123", }, } - err := ng.DecreaseTargetSize(delta) - assert.Error(t, err) + assert.Equal(t, "pool-123", ng.Id()) } -func TestNodeGroup_DecreaseBelowMinimum(t *testing.T) { - nodesNb := 3 - delta := -3 - client := &clientMock{} +func TestNodeGroup_Debug(t *testing.T) { ng := &NodeGroup{ - Client: client, - p: &scalewaygo.Pool{ - Size: uint32(nodesNb), - MinSize: 1, + pool: scalewaygo.Pool{ + ID: "pool-123", + Status: scalewaygo.PoolStatusReady, + Version: "1.27.0", + Autoscaling: true, + Size: 5, + MinSize: 1, + MaxSize: 10, }, } - err := ng.DecreaseTargetSize(delta) - assert.Error(t, err) + debug := ng.Debug() + assert.Contains(t, debug, "pool-123") + assert.Contains(t, debug, "ready") + assert.Contains(t, debug, "1.27.0") + assert.Contains(t, debug, "true") + assert.Contains(t, debug, "size:5") + assert.Contains(t, debug, "min_size:1") + assert.Contains(t, debug, "max_size:10") } -func TestNodeGroup_DeleteNodes(t *testing.T) { - ctx := context.Background() - client := &clientMock{} +func TestNodeGroup_Nodes(t *testing.T) { + t.Run("returns all nodes", func(t *testing.T) { + node1 := createTestNode("node-1", "pool-1", "scaleway://fr-par-1/instance-1", scalewaygo.NodeStatusReady) + node2 := createTestNode("node-2", "pool-1", "scaleway://fr-par-1/instance-2", scalewaygo.NodeStatusCreating) + node3 := createTestNode("node-3", "pool-1", "scaleway://fr-par-1/instance-3", scalewaygo.NodeStatusDeleting) + + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + ID: "pool-1", + }, + nodes: map[string]*scalewaygo.Node{ + "scaleway://fr-par-1/instance-1": &node1, + "scaleway://fr-par-1/instance-2": &node2, + "scaleway://fr-par-1/instance-3": &node3, + }, + } + + instances, err := ng.Nodes() + require.NoError(t, err) + assert.Len(t, instances, 3) + + // Verify instances have correct data + providerIDs := make(map[string]bool) + for _, inst := range instances { + providerIDs[inst.Id] = true + assert.NotNil(t, inst.Status) + } + + assert.True(t, providerIDs["scaleway://fr-par-1/instance-1"]) + assert.True(t, providerIDs["scaleway://fr-par-1/instance-2"]) + assert.True(t, providerIDs["scaleway://fr-par-1/instance-3"]) + }) + + t.Run("empty node group", func(t *testing.T) { + ng := &NodeGroup{ + pool: scalewaygo.Pool{ + ID: "pool-1", + }, + nodes: make(map[string]*scalewaygo.Node), + } + + instances, err := ng.Nodes() + require.NoError(t, err) + assert.Empty(t, instances) + }) +} + +func TestNodeGroup_TemplateNodeInfo(t *testing.T) { ng := &NodeGroup{ - Client: client, - nodes: map[string]*scalewaygo.Node{ - "scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15": {ID: "6852824b-e409-4c77-94df-819629d135b9", ProviderID: "scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15"}, - "scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066": {ID: "84acb1a6-0e14-4j36-8b32-71bf7b328c22", ProviderID: "scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066"}, - "scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529": {ID: "5c4d832a-d964-4c64-9d53-b9295c206cdd", ProviderID: "scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529"}, - }, - p: &scalewaygo.Pool{ - Size: 3, + pool: scalewaygo.Pool{ + ID: "pool-1", + Name: "test-pool", + Capacity: map[string]int64{ + "cpu": 2000, + "memory": 4096000000, + 
"ephemeral-storage": 20000000000, + "pods": 110, + }, + Allocatable: map[string]int64{ + "cpu": 1800, + "memory": 3500000000, + "ephemeral-storage": 18000000000, + "pods": 110, + }, + Labels: map[string]string{ + "kubernetes.io/hostname": "test-node", + "node.kubernetes.io/instance-type": "DEV1-M", + }, + Taints: map[string]string{ + "key1": "value1:NoSchedule", + "key2": "NoExecute", + }, }, } - nodes := []*apiv1.Node{ - {Spec: apiv1.NodeSpec{ProviderID: "scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15"}}, - {Spec: apiv1.NodeSpec{ProviderID: "scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066"}}, - {Spec: apiv1.NodeSpec{ProviderID: "scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529"}}, + nodeInfo, err := ng.TemplateNodeInfo() + require.NoError(t, err) + require.NotNil(t, nodeInfo) + + node := nodeInfo.Node() + require.NotNil(t, node) + + // Verify capacity + cpuCapacity := node.Status.Capacity[apiv1.ResourceCPU] + assert.Equal(t, int64(2000), cpuCapacity.Value()) + memCapacity := node.Status.Capacity[apiv1.ResourceMemory] + assert.Equal(t, int64(4096000000), memCapacity.Value()) + storageCapacity := node.Status.Capacity[apiv1.ResourceEphemeralStorage] + assert.Equal(t, int64(20000000000), storageCapacity.Value()) + podsCapacity := node.Status.Capacity[apiv1.ResourcePods] + assert.Equal(t, int64(110), podsCapacity.Value()) + + // Verify allocatable + cpuAllocatable := node.Status.Allocatable[apiv1.ResourceCPU] + assert.Equal(t, int64(1800), cpuAllocatable.Value()) + memAllocatable := node.Status.Allocatable[apiv1.ResourceMemory] + assert.Equal(t, int64(3500000000), memAllocatable.Value()) + storageAllocatable := node.Status.Allocatable[apiv1.ResourceEphemeralStorage] + assert.Equal(t, int64(18000000000), storageAllocatable.Value()) + podsAllocatable := node.Status.Allocatable[apiv1.ResourcePods] + assert.Equal(t, int64(110), podsAllocatable.Value()) + + // Verify labels + assert.Equal(t, "test-node", node.ObjectMeta.Labels["kubernetes.io/hostname"]) + assert.Equal(t, "DEV1-M", node.ObjectMeta.Labels["node.kubernetes.io/instance-type"]) + + // Verify taints + assert.Len(t, node.Spec.Taints, 2) + taintMap := make(map[string]apiv1.Taint) + for _, taint := range node.Spec.Taints { + taintMap[taint.Key] = taint } - client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: ng.nodes["scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15"].ID}).Return(&scalewaygo.Node{Status: scalewaygo.NodeStatusDeleting}, nil).Once() - client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: ng.nodes["scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066"].ID}).Return(&scalewaygo.Node{Status: scalewaygo.NodeStatusDeleting}, nil).Once() - client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: ng.nodes["scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529"].ID}).Return(&scalewaygo.Node{Status: scalewaygo.NodeStatusDeleting}, nil).Once() + assert.Equal(t, apiv1.TaintEffectNoSchedule, taintMap["key1"].Effect) + assert.Equal(t, "value1", taintMap["key1"].Value) + assert.Equal(t, apiv1.TaintEffectNoExecute, taintMap["key2"].Effect) + assert.Equal(t, "", taintMap["key2"].Value) - err := ng.DeleteNodes(nodes) - assert.NoError(t, err) - assert.Equal(t, uint32(0), ng.p.Size) + // Verify conditions + assert.NotEmpty(t, node.Status.Conditions) } -func TestNodeGroup_DeleteNodesErr(t *testing.T) { - ctx := context.Background() - client := &clientMock{} +func TestNodeGroup_Exist(t *testing.T) { ng := &NodeGroup{ 
- Client: client, - nodes: map[string]*scalewaygo.Node{ - "nonexistent-on-provider-side": {ID: "unknown"}, + pool: scalewaygo.Pool{ + ID: "pool-123", }, } - nodes := []*apiv1.Node{ - {Spec: apiv1.NodeSpec{ProviderID: "nonexistent-on-provider-side"}}, - } - client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: "unknown"}).Return(&scalewaygo.Node{}, errors.New("nonexistent")).Once() - err := ng.DeleteNodes(nodes) - assert.Error(t, err) + // Always returns true in current implementation + assert.True(t, ng.Exist()) } -type clientMock struct { - mock.Mock +func TestNodeGroup_Create(t *testing.T) { + ng := &NodeGroup{} + + newNg, err := ng.Create() + assert.Equal(t, cloudprovider.ErrNotImplemented, err) + assert.Nil(t, newNg) } -func (m *clientMock) GetPool(ctx context.Context, req *scalewaygo.GetPoolRequest) (*scalewaygo.Pool, error) { - args := m.Called(ctx, req) - return args.Get(0).(*scalewaygo.Pool), args.Error(1) +func TestNodeGroup_Delete(t *testing.T) { + ng := &NodeGroup{} + + err := ng.Delete() + assert.Equal(t, cloudprovider.ErrNotImplemented, err) } -func (m *clientMock) ListPools(ctx context.Context, req *scalewaygo.ListPoolsRequest) (*scalewaygo.ListPoolsResponse, error) { - args := m.Called(ctx, req) - return args.Get(0).(*scalewaygo.ListPoolsResponse), args.Error(1) +func TestNodeGroup_Autoprovisioned(t *testing.T) { + ng := &NodeGroup{} + + assert.False(t, ng.Autoprovisioned()) } -func (m *clientMock) UpdatePool(ctx context.Context, req *scalewaygo.UpdatePoolRequest) (*scalewaygo.Pool, error) { - args := m.Called(ctx, req) - return args.Get(0).(*scalewaygo.Pool), args.Error(1) +func TestNodeGroup_GetOptions(t *testing.T) { + ng := &NodeGroup{} + + defaults := config.NodeGroupAutoscalingOptions{} + opts, err := ng.GetOptions(defaults) + assert.Equal(t, cloudprovider.ErrNotImplemented, err) + assert.Nil(t, opts) } -func (m *clientMock) ListNodes(ctx context.Context, req *scalewaygo.ListNodesRequest) (*scalewaygo.ListNodesResponse, error) { - args := m.Called(ctx, req) - return args.Get(0).(*scalewaygo.ListNodesResponse), args.Error(1) +func TestFromScwStatus(t *testing.T) { + tests := []struct { + name string + status scalewaygo.NodeStatus + expectedState cloudprovider.InstanceState + hasErrorInfo bool + errorCode string + }{ + { + name: "ready", + status: scalewaygo.NodeStatusReady, + expectedState: cloudprovider.InstanceRunning, + hasErrorInfo: false, + }, + { + name: "creating", + status: scalewaygo.NodeStatusCreating, + expectedState: cloudprovider.InstanceCreating, + hasErrorInfo: false, + }, + { + name: "starting", + status: scalewaygo.NodeStatusStarting, + expectedState: cloudprovider.InstanceCreating, + hasErrorInfo: false, + }, + { + name: "registering", + status: scalewaygo.NodeStatusRegistering, + expectedState: cloudprovider.InstanceCreating, + hasErrorInfo: false, + }, + { + name: "not_ready", + status: scalewaygo.NodeStatusNotReady, + expectedState: cloudprovider.InstanceCreating, + hasErrorInfo: false, + }, + { + name: "deleting", + status: scalewaygo.NodeStatusDeleting, + expectedState: cloudprovider.InstanceDeleting, + hasErrorInfo: false, + }, + { + name: "deleted", + status: scalewaygo.NodeStatusDeleted, + hasErrorInfo: true, + errorCode: "deleted", + }, + { + name: "creation_error", + status: scalewaygo.NodeStatusCreationError, + hasErrorInfo: true, + errorCode: "creation_error", + }, + { + name: "upgrading", + status: scalewaygo.NodeStatusUpgrading, + expectedState: cloudprovider.InstanceCreating, + hasErrorInfo: false, + }, + { + name: "locked", 
+ status: scalewaygo.NodeStatusLocked, + hasErrorInfo: true, + errorCode: "locked", + }, + { + name: "rebooting", + status: scalewaygo.NodeStatusRebooting, + expectedState: cloudprovider.InstanceCreating, + hasErrorInfo: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + status := fromScwStatus(tt.status) + require.NotNil(t, status) + assert.Equal(t, tt.expectedState, status.State) + if tt.hasErrorInfo { + require.NotNil(t, status.ErrorInfo) + assert.Equal(t, tt.errorCode, status.ErrorInfo.ErrorCode) + } + }) + } } -func (m *clientMock) DeleteNode(ctx context.Context, req *scalewaygo.DeleteNodeRequest) (*scalewaygo.Node, error) { - args := m.Called(ctx, req) - return args.Get(0).(*scalewaygo.Node), args.Error(1) +func TestParseTaints(t *testing.T) { + t.Run("parse various taint formats", func(t *testing.T) { + taints := map[string]string{ + "key1": "value1:NoSchedule", + "key2": "value2:NoExecute", + "key3": "value3:PreferNoSchedule", + "key4": "NoSchedule", + "key5": "invalid:InvalidEffect", + } + + k8sTaints := parseTaints(taints) + assert.Len(t, k8sTaints, 4) // key5 should be skipped + + taintMap := make(map[string]apiv1.Taint) + for _, taint := range k8sTaints { + taintMap[taint.Key] = taint + } + + // Verify key1 + assert.Equal(t, apiv1.TaintEffectNoSchedule, taintMap["key1"].Effect) + assert.Equal(t, "value1", taintMap["key1"].Value) + + // Verify key2 + assert.Equal(t, apiv1.TaintEffectNoExecute, taintMap["key2"].Effect) + assert.Equal(t, "value2", taintMap["key2"].Value) + + // Verify key3 + assert.Equal(t, apiv1.TaintEffectPreferNoSchedule, taintMap["key3"].Effect) + assert.Equal(t, "value3", taintMap["key3"].Value) + + // Verify key4 (no value) + assert.Equal(t, apiv1.TaintEffectNoSchedule, taintMap["key4"].Effect) + assert.Equal(t, "", taintMap["key4"].Value) + + // Verify key5 is not present + _, exists := taintMap["key5"] + assert.False(t, exists) + }) + + t.Run("empty taints", func(t *testing.T) { + taints := map[string]string{} + k8sTaints := parseTaints(taints) + assert.Empty(t, k8sTaints) + }) + + t.Run("taint with multiple colons has no value", func(t *testing.T) { + // parseTaints only extracts value if there are exactly 2 parts (value:Effect) + // With multiple colons, the value is not extracted + taints := map[string]string{ + "key1": "value:with:colons:NoSchedule", + } + + k8sTaints := parseTaints(taints) + assert.Len(t, k8sTaints, 1) + assert.Equal(t, "key1", k8sTaints[0].Key) + assert.Equal(t, "", k8sTaints[0].Value) // No value extracted for multiple colons + assert.Equal(t, apiv1.TaintEffectNoSchedule, k8sTaints[0].Effect) + }) } diff --git a/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api.go b/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api.go index 83e9341303fc..d19ec9dd3975 100644 --- a/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api.go +++ b/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api.go @@ -23,19 +23,22 @@ import ( "errors" "fmt" "io" - "k8s.io/autoscaler/cluster-autoscaler/version" - "k8s.io/klog/v2" "net" "net/http" "net/url" + "strconv" + "strings" "time" + + "k8s.io/autoscaler/cluster-autoscaler/version" + "k8s.io/klog/v2" ) const ( defaultApiURL string = "https://api.scaleway.com" defaultHTTPTimeout = 30 - pageSizeListPools uint32 = 100 - pageSizeListNodes uint32 = 100 + pageSizeListPools int = 100 + pageSizeListNodes int = 100 ) var ( @@ -57,13 +60,22 @@ var ( ErrOther = errors.New("generic error type") ) +// 
Client is used to talk to Scaleway Kapsule API +type Client interface { + ListPools(ctx context.Context, clusterID string) (time.Duration, []Pool, error) + UpdatePool(ctx context.Context, poolID string, size int) (Pool, error) + ListNodes(ctx context.Context, clusterID string) (time.Duration, []Node, error) + DeleteNode(ctx context.Context, nodeID string) (Node, error) +} + // Config is used to deserialize config file passed with flag `cloud-config` type Config struct { - ClusterID string `json:"cluster_id"` - SecretKey string `json:"secret_key"` - Region string `json:"region"` - ApiUrl string `json:"api_url"` - UserAgent string + ClusterID string `json:"cluster_id"` + SecretKey string `json:"secret_key"` + Region string `json:"region"` + ApiUrl string `json:"api_url"` + UserAgent string + DefaultCacheControl time.Duration } // NewClient returns a new Client able to talk to Scaleway API @@ -91,11 +103,12 @@ func NewClient(cfg Config) (*client, error) { } return &client{ - httpClient: hc, - apiURL: cfg.ApiUrl, - token: cfg.SecretKey, - userAgent: fmt.Sprintf("%s/%s cluster-id/%s", cfg.UserAgent, version.ClusterAutoscalerVersion, cfg.ClusterID), - region: cfg.Region, + httpClient: hc, + apiURL: cfg.ApiUrl, + token: cfg.SecretKey, + userAgent: fmt.Sprintf("%s/%s cluster-id/%s", cfg.UserAgent, version.ClusterAutoscalerVersion, cfg.ClusterID), + region: cfg.Region, + defaultCacheControl: cfg.DefaultCacheControl, }, nil } @@ -107,25 +120,14 @@ type scalewayRequest struct { Body io.Reader } -// Listing queries default to `fetch all resources` if no `page` is provided -// as CA needs access to all nodes and pools - -// Client is used to talk to Scaleway Kapsule API -type Client interface { - GetPool(ctx context.Context, req *GetPoolRequest) (*Pool, error) - ListPools(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) - UpdatePool(ctx context.Context, req *UpdatePoolRequest) (*Pool, error) - ListNodes(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) - DeleteNode(ctx context.Context, req *DeleteNodeRequest) (*Node, error) -} - // client contains necessary information to perform API calls type client struct { - httpClient *http.Client - apiURL string - token string - userAgent string - region string + httpClient *http.Client + apiURL string + token string + userAgent string + region string + defaultCacheControl time.Duration } func (req *scalewayRequest) getURL(apiURL string) (*url.URL, error) { @@ -151,21 +153,21 @@ func (c *client) Region() string { } // do perform a single HTTP request based on the generic Request object. 
-func (c *client) do(ctx context.Context, req *scalewayRequest, res interface{}) error { +func (c *client) do(ctx context.Context, req *scalewayRequest, res interface{}) (http.Header, error) { if req == nil { - return errors.New("request must be non-nil") + return nil, errors.New("request must be non-nil") } // build URL completeURL, err := req.getURL(c.apiURL) if err != nil { - return err + return nil, err } // build request httpRequest, err := http.NewRequest(req.Method, completeURL.String(), req.Body) if err != nil { - return fmt.Errorf("could not create request: %w", err) + return nil, fmt.Errorf("could not create request: %w", err) } httpRequest.Header.Set("User-Agent", c.userAgent) @@ -179,7 +181,7 @@ func (c *client) do(ctx context.Context, req *scalewayRequest, res interface{}) // execute request httpResponse, err := c.httpClient.Do(httpRequest) if err != nil { - return fmt.Errorf("error executing request: %w", err) + return nil, fmt.Errorf("error executing request: %w", err) } defer func() { @@ -190,27 +192,26 @@ func (c *client) do(ctx context.Context, req *scalewayRequest, res interface{}) ct := httpResponse.Header.Get("Content-Type") if ct != "application/json" { - return fmt.Errorf("unexpected content-type: %s with status: %s", ct, httpResponse.Status) + return nil, fmt.Errorf("unexpected content-type: %s with status: %s", ct, httpResponse.Status) } err = json.NewDecoder(httpResponse.Body).Decode(&res) if err != nil { - return fmt.Errorf("could not parse %s response body: %w", ct, err) + return nil, fmt.Errorf("could not parse %s response body: %w", ct, err) } switch { case httpResponse.StatusCode >= 200 && httpResponse.StatusCode < 300: - return nil + return httpResponse.Header, nil case httpResponse.StatusCode >= 400 && httpResponse.StatusCode < 500: err = ErrClientSide case httpResponse.StatusCode >= 500 && httpResponse.StatusCode < 600: err = ErrServerSide default: err = ErrOther - } - return fmt.Errorf("%d %v %v: %w", httpResponse.StatusCode, httpRequest.Method, httpRequest.URL, err) + return nil, fmt.Errorf("%d %v %v: %w", httpResponse.StatusCode, httpRequest.Method, httpRequest.URL, err) } // NodeStatus is the state in which a node might be @@ -281,10 +282,6 @@ type Pool struct { ID string `json:"id"` // ClusterID: the cluster ID of the pool ClusterID string `json:"cluster_id"` - // CreatedAt: the date at which the pool was created - CreatedAt *time.Time `json:"created_at"` - // UpdatedAt: the date at which the pool was last updated - UpdatedAt *time.Time `json:"updated_at"` // Name: the name of the pool Name string `json:"name"` // Status: the status of the pool @@ -296,162 +293,81 @@ type Pool struct { // Autoscaling: the enablement of the autoscaling feature for the pool Autoscaling bool `json:"autoscaling"` // Size: the size (number of nodes) of the pool - Size uint32 `json:"size"` + Size int `json:"size"` // MinSize: the minimum size of the pool - MinSize uint32 `json:"min_size"` + MinSize int `json:"min_size"` // MaxSize: the maximum size of the pool - MaxSize uint32 `json:"max_size"` + MaxSize int `json:"max_size"` // Zone: the zone where the nodes will be spawn in Zone string `json:"zone"` -} - -// GetPoolRequest is passed to `GetPool` method -type GetPoolRequest struct { - // PoolID: the ID of the requested pool - PoolID string `json:"-"` -} - -// GetPool is used to request a Pool by its id -func (c *client) GetPool(ctx context.Context, req *GetPoolRequest) (*Pool, error) { - var err error - - klog.V(4).Info("GetPool,PoolID=", req.PoolID) - - if 
fmt.Sprint(req.PoolID) == "" { - return nil, errors.New("field PoolID cannot be empty in request") - } - - scwReq := &scalewayRequest{ - Method: "GET", - Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/pools/" + fmt.Sprint(req.PoolID) + "", - } - - var resp Pool - - err = c.do(ctx, scwReq, &resp) - if err != nil { - return nil, err - } - return &resp, nil -} - -// ListPoolsRequest is passed to `ListPools` method -// it can be used for optional pagination -type ListPoolsRequest struct { - // the ID of the cluster from which the pools will be listed from - ClusterID string `json:"-"` - // Page: the page number for the returned pools - Page *int32 `json:"-"` - // PageSize: the maximum number of pools per page - PageSize *uint32 `json:"-"` -} - -// GenericNodeSpecs represents NodeType specs used for scale-up simulations. -// it is used to select the appropriate pool to scale-up. -type GenericNodeSpecs struct { - NodePricePerHour float32 `json:"node_price_per_hour"` - MaxPods uint32 `json:"max_pods"` - Gpu uint32 `json:"gpu"` - CpuCapacity uint32 `json:"cpu_capacity"` - CpuAllocatable uint32 `json:"cpu_allocatable"` - MemoryCapacity uint64 `json:"memory_capacity"` - MemoryAllocatable uint64 `json:"memory_allocatable"` - LocalStorageCapacity uint64 `json:"local_storage_capacity"` - LocalStorageAllocatable uint64 `json:"local_storage_allocatable"` - Labels map[string]string `json:"labels"` - Taints map[string]string `json:"taints"` -} - -// PoolWithGenericNodeSpecs contains the requested `Pool` with additional `Specs` information -type PoolWithGenericNodeSpecs struct { - Pool *Pool `json:"pool"` - Specs GenericNodeSpecs `json:"specs"` + // NodePricePerHour: the price per hour of a single node of the pool + NodePricePerHour float32 `json:"node_price_per_hour"` + // Capacity: capacity of each resource on a single node of the pool + Capacity map[string]int64 `json:"capacity"` + // Allocatable: allocatable of each resource on a single node of the pool + Allocatable map[string]int64 `json:"allocatable"` + // Labels: labels applied to each node of the pool + Labels map[string]string `json:"labels"` + // Taints: taints applied to each node of the pool + Taints map[string]string `json:"taints"` + // CreatedAt: the date at which the pool was created + CreatedAt *time.Time `json:"created_at"` + // UpdatedAt: the date at which the pool was last updated + UpdatedAt *time.Time `json:"updated_at"` } // ListPoolsResponse is returned from `ListPools` method type ListPoolsResponse struct { - // TotalCount: the total number of pools that exists for the cluster - TotalCount uint32 `json:"total_count"` // Pools: the paginated returned pools - Pools []*PoolWithGenericNodeSpecs `json:"pools"` + Pools []Pool `json:"pools"` + // TotalCount: the total count of pools in the cluster + TotalCount uint64 `json:"total_count"` } -// ListPools returns pools associated to a cluster id, pagination optional -func (c *client) ListPools(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) { - klog.V(4).Info("ListPools,ClusterID=", req.ClusterID) - - if req.Page != nil { - return c.listPoolsPaginated(ctx, req) - } - - listPools := func(page int32) (*ListPoolsResponse, error) { - - return c.listPoolsPaginated(ctx, &ListPoolsRequest{ - ClusterID: req.ClusterID, - Page: &page, - }) - } - - page := int32(1) - resp, err := listPools(page) - if err != nil { - return nil, err - } - - nbPages := (resp.TotalCount + pageSizeListPools - 1) / pageSizeListPools +// ListPools returns pools associated to a cluster id +func (c 
*client) ListPools(ctx context.Context, clusterID string) (time.Duration, []Pool, error) { + klog.V(4).Infof("ListPools,ClusterID=%s", clusterID) - for uint32(page) <= nbPages { - page++ - r, err := listPools(page) + var currentPage = 1 + var pools []Pool + var cacheControl time.Duration + for { + cc, paginatedPools, err := c.listPoolsPaginated(ctx, clusterID, currentPage, pageSizeListPools) if err != nil { - return nil, err + return 0, []Pool{}, err } + pools = append(pools, paginatedPools...) + cacheControl = cc - resp.Pools = append(resp.Pools, r.Pools...) - - if r.TotalCount != resp.TotalCount { - // pools have changed on scaleway side, retrying - resp.TotalCount = r.TotalCount - resp.Pools = []*PoolWithGenericNodeSpecs{} - page = int32(1) - nbPages = (resp.TotalCount + pageSizeListPools - 1) / pageSizeListPools + if len(paginatedPools) < pageSizeListPools || len(paginatedPools) == 0 { + break } + + currentPage++ } - return resp, nil -} -func (c *client) listPoolsPaginated(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) { - var err error + return cacheControl, pools, nil +} - pageSize := pageSizeListPools - if req.PageSize == nil { - req.PageSize = &pageSize +func (c *client) listPoolsPaginated(ctx context.Context, clusterID string, page, pageSize int) (time.Duration, []Pool, error) { + if len(clusterID) == 0 { + return 0, nil, errors.New("clusterID cannot be empty in request") } query := url.Values{} - if req.Page != nil { - query.Set("page", fmt.Sprint(*req.Page)) - } - query.Set("page_size", fmt.Sprint(*req.PageSize)) - - if fmt.Sprint(req.ClusterID) == "" { - return nil, errors.New("field ClusterID cannot be empty in request") - } + query.Set("page", strconv.Itoa(page)) + query.Set("page_size", strconv.Itoa(pageSize)) scwReq := &scalewayRequest{ Method: "GET", - Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/clusters/" + fmt.Sprint(req.ClusterID) + "/pools-autoscaler", + Path: fmt.Sprintf("/k8s/v1/regions/%s/autoscaler/clusters/%s/pools", c.region, clusterID), Query: query, } var resp ListPoolsResponse + header, err := c.do(ctx, scwReq, &resp) - err = c.do(ctx, scwReq, &resp) - if err != nil { - return nil, err - } - - return &resp, nil + return c.cacheControl(header), resp.Pools, err } // UpdatePoolRequest is passed to `UpdatePool` method @@ -459,166 +375,127 @@ type UpdatePoolRequest struct { // PoolID: the ID of the pool to update PoolID string `json:"-"` // Size: the new size for the pool - Size *uint32 `json:"size"` + Size int `json:"size"` } // UpdatePool is used to resize a pool, to decrease pool size `DeleteNode` should be used instead -func (c *client) UpdatePool(ctx context.Context, req *UpdatePoolRequest) (*Pool, error) { - var err error - - klog.V(4).Info("UpdatePool,PoolID=", req.PoolID) +func (c *client) UpdatePool(ctx context.Context, poolID string, size int) (Pool, error) { + klog.V(4).Infof("UpdatePool,PoolID=%s", poolID) - if fmt.Sprint(req.PoolID) == "" { - return nil, errors.New("field PoolID cannot be empty in request") + if len(poolID) == 0 { + return Pool{}, errors.New("field PoolID cannot be empty in request") } scwReq := &scalewayRequest{ Method: "PATCH", - Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/pools/" + fmt.Sprint(req.PoolID) + "", + Path: fmt.Sprintf("/k8s/v1/regions/%s/autoscaler/pools/%s", c.region, poolID), } - buf, err := json.Marshal(req) + buf, err := json.Marshal(UpdatePoolRequest{PoolID: poolID, Size: size}) if err != nil { - return nil, err + return Pool{}, err } scwReq.Body = bytes.NewReader(buf) var 
resp Pool + _, err = c.do(ctx, scwReq, &resp) - err = c.do(ctx, scwReq, &resp) - if err != nil { - return nil, err - } - return &resp, nil -} - -// ListNodesRequest is passed to `ListNodes` method -type ListNodesRequest struct { - // ClusterID: the cluster ID from which the nodes will be listed from - ClusterID string `json:"-"` - // PoolID: the pool ID on which to filter the returned nodes - PoolID *string `json:"-"` - // Page: the page number for the returned nodes - Page *int32 `json:"-"` - // PageSize: the maximum number of nodes per page - PageSize *uint32 `json:"-"` + return resp, err } // ListNodesResponse is returned from `ListNodes` method type ListNodesResponse struct { - // TotalCount: the total number of nodes - TotalCount uint32 `json:"total_count"` // Nodes: the paginated returned nodes - Nodes []*Node `json:"nodes"` + Nodes []Node `json:"nodes"` + // TotalCount: the total count of nodes in the cluster + TotalCount uint64 `json:"total_count"` } -// ListNodes returns the Nodes associated to a Cluster and/or a Pool -func (c *client) ListNodes(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) { - klog.V(4).Info("ListNodes,ClusterID=", req.ClusterID) - - if req.Page != nil { - return c.listNodesPaginated(ctx, req) - } - - listNodes := func(page int32) (*ListNodesResponse, error) { - ctx := context.Background() +// ListNodes returns the Nodes associated to a Cluster +func (c *client) ListNodes(ctx context.Context, clusterID string) (time.Duration, []Node, error) { + klog.V(4).Infof("ListNodes,ClusterID=%s", clusterID) - return c.listNodesPaginated(ctx, &ListNodesRequest{ - ClusterID: req.ClusterID, - PoolID: req.PoolID, - Page: &page, - }) - } - - page := int32(1) - resp, err := listNodes(page) - if err != nil { - return nil, err - } - - nbPages := (resp.TotalCount + pageSizeListNodes - 1) / pageSizeListNodes - - for uint32(page) <= nbPages { - page++ - r, err := listNodes(page) + var currentPage = 1 + var nodes []Node + var cacheControl time.Duration + for { + cc, paginatedNodes, err := c.listNodesPaginated(ctx, clusterID, currentPage, pageSizeListNodes) + if err != nil { - return nil, err + return 0, []Node{}, err + } + nodes = append(nodes, paginatedNodes...) + cacheControl = cc - resp.Nodes = append(resp.Nodes, r.Nodes...) 
- - if r.TotalCount != resp.TotalCount { - // nodes have changed on scaleway side, retrying - resp.TotalCount = r.TotalCount - resp.Nodes = []*Node{} - page = int32(1) - nbPages = (resp.TotalCount + pageSizeListNodes - 1) / pageSizeListNodes + if len(paginatedNodes) < pageSizeListNodes || len(paginatedNodes) == 0 { + break } + + currentPage++ } - return resp, nil -} -func (c *client) listNodesPaginated(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) { - var err error + return cacheControl, nodes, nil +} - pageSize := pageSizeListNodes - if req.PageSize == nil { - req.PageSize = &pageSize +func (c *client) listNodesPaginated(ctx context.Context, clusterID string, page, pageSize int) (time.Duration, []Node, error) { + if len(clusterID) == 0 { + return 0, nil, errors.New("clusterID cannot be empty in request") } query := url.Values{} - if req.PoolID != nil { - query.Set("pool_id", fmt.Sprint(*req.PoolID)) - } - if req.Page != nil { - query.Set("page", fmt.Sprint(*req.Page)) - } - query.Set("page_size", fmt.Sprint(*req.PageSize)) - - if fmt.Sprint(req.ClusterID) == "" { - return nil, errors.New("field ClusterID cannot be empty in request") - } + query.Set("page", strconv.Itoa(page)) + query.Set("page_size", strconv.Itoa(pageSize)) scwReq := &scalewayRequest{ Method: "GET", - Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/clusters/" + fmt.Sprint(req.ClusterID) + "/nodes", + Path: fmt.Sprintf("/k8s/v1/regions/%s/autoscaler/clusters/%s/nodes", c.region, clusterID), Query: query, } var resp ListNodesResponse + header, err := c.do(ctx, scwReq, &resp) - err = c.do(ctx, scwReq, &resp) - if err != nil { - return nil, err - } - return &resp, nil -} - -// DeleteNodeRequest is passed to `DeleteNode` method -type DeleteNodeRequest struct { - NodeID string `json:"-"` + return c.cacheControl(header), resp.Nodes, err } // DeleteNode asynchronously deletes a Node by its id -func (c *client) DeleteNode(ctx context.Context, req *DeleteNodeRequest) (*Node, error) { - var err error - - klog.V(4).Info("DeleteNode,NodeID=", req.NodeID) +func (c *client) DeleteNode(ctx context.Context, nodeID string) (Node, error) { + klog.V(4).Info("DeleteNode,NodeID=", nodeID) - if fmt.Sprint(req.NodeID) == "" { - return nil, errors.New("field NodeID cannot be empty in request") + if len(nodeID) == 0 { + return Node{}, errors.New("field NodeID cannot be empty in request") } scwReq := &scalewayRequest{ Method: "DELETE", - Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/nodes/" + fmt.Sprint(req.NodeID) + "", + Path: fmt.Sprintf("/k8s/v1/regions/%s/autoscaler/nodes/%s", c.region, nodeID), } var resp Node + _, err := c.do(ctx, scwReq, &resp) - err = c.do(ctx, scwReq, &resp) - if err != nil { - return nil, err + return resp, err +} + +// cacheControl extracts the `max-age` from the `Cache-Control` header +func (c *client) cacheControl(header http.Header) time.Duration { + cacheHeader := header.Get("Cache-Control") + + for _, value := range strings.Split(cacheHeader, ",") { + value = strings.Trim(value, " ") + if strings.HasPrefix(value, "max-age") { + duration := strings.Split(value, "max-age=") + if len(duration) != 2 { + return c.defaultCacheControl + } + + durationInt, err := strconv.Atoi(duration[1]) + if err != nil { + return c.defaultCacheControl + } + + return time.Second * time.Duration(durationInt) + } } - return &resp, nil + + return c.defaultCacheControl } diff --git a/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api_test.go 
b/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api_test.go new file mode 100644 index 000000000000..8312020e10eb --- /dev/null +++ b/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api_test.go @@ -0,0 +1,629 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scalewaygo + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testClusterID = "a451feb1-e0bc-48ff-b31e-a09bbdacdb1d" + testSecretKey = "4e713ead-7f76-4a8a-8774-0ac9b8fffb5c" + testRegion = "fr-par" +) + +// createTestPool creates a test Pool with sensible defaults +func createTestPool(overrides ...func(*Pool)) Pool { + now := time.Now() + pool := Pool{ + ID: "pool-1", + ClusterID: testClusterID, + Name: "default", + Size: 3, + MinSize: 1, + MaxSize: 10, + Status: PoolStatusReady, + CreatedAt: &now, + } + for _, override := range overrides { + override(&pool) + } + return pool +} + +// createTestNode creates a test Node with sensible defaults +func createTestNode(overrides ...func(*Node)) Node { + now := time.Now() + node := Node{ + ID: "node-1", + PoolID: "pool-1", + ClusterID: testClusterID, + ProviderID: "instance-1", + Name: "node-1", + Status: NodeStatusReady, + CreatedAt: &now, + } + for _, override := range overrides { + override(&node) + } + return node +} + +func TestNewClient(t *testing.T) { + tests := []struct { + name string + cfg Config + wantErr error + }{ + { + name: "valid config", + cfg: Config{ + ClusterID: testClusterID, + SecretKey: testSecretKey, + Region: testRegion, + }, + wantErr: nil, + }, + { + name: "missing cluster ID", + cfg: Config{ + SecretKey: testSecretKey, + Region: testRegion, + }, + wantErr: ErrMissingClusterID, + }, + { + name: "missing secret key", + cfg: Config{ + ClusterID: testClusterID, + Region: testRegion, + }, + wantErr: ErrMissingSecretKey, + }, + { + name: "missing region", + cfg: Config{ + ClusterID: testClusterID, + SecretKey: testSecretKey, + }, + wantErr: ErrMissingRegion, + }, + { + name: "custom API URL", + cfg: Config{ + ClusterID: testClusterID, + SecretKey: testSecretKey, + Region: testRegion, + ApiUrl: "https://custom.api.com", + }, + wantErr: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewClient(tt.cfg) + + if tt.wantErr != nil { + assert.ErrorIs(t, err, tt.wantErr) + return + } + + require.NoError(t, err) + require.NotNil(t, client) + + // Verify default API URL is set when not provided + if tt.cfg.ApiUrl == "" { + assert.Equal(t, defaultApiURL, client.ApiURL()) + } else { + assert.Equal(t, tt.cfg.ApiUrl, client.ApiURL()) + } + + // Verify token and region are set correctly + assert.Equal(t, tt.cfg.SecretKey, client.Token()) + assert.Equal(t, tt.cfg.Region, client.Region()) + }) + } +} + +func TestScalewayRequest_getURL(t *testing.T) { + tests := []struct { + name string 
+ req *scalewayRequest + apiURL string + want string + wantErr bool + }{ + { + name: "simple path", + req: &scalewayRequest{ + Path: "/k8s/v1/regions/fr-par/clusters", + }, + apiURL: "https://api.scaleway.com", + want: "https://api.scaleway.com/k8s/v1/regions/fr-par/clusters", + wantErr: false, + }, + { + name: "path with query parameters", + req: &scalewayRequest{ + Path: "/k8s/v1/regions/fr-par/clusters", + Query: url.Values{ + "page": []string{"1"}, + "page_size": []string{"100"}, + }, + }, + apiURL: "https://api.scaleway.com", + want: "https://api.scaleway.com/k8s/v1/regions/fr-par/clusters?page=1&page_size=100", + wantErr: false, + }, + { + name: "invalid base URL", + req: &scalewayRequest{ + Path: "/test", + }, + apiURL: "://invalid-url", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.req.getURL(tt.apiURL) + + if tt.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.want, got.String()) + }) + } +} + +func TestClient_ListPools(t *testing.T) { + t.Run("successful single page", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "GET", r.Method) + assert.NotEmpty(t, r.Header.Get("X-Auth-Token")) + + pool := createTestPool() + resp := ListPoolsResponse{ + Pools: []Pool{pool}, + TotalCount: 1, + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Cache-Control", "max-age=30") + json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + cacheControl, pools, err := client.ListPools(context.Background(), testClusterID) + + require.NoError(t, err) + assert.Len(t, pools, 1) + assert.Equal(t, "pool-1", pools[0].ID) + assert.Equal(t, 3, pools[0].Size) + assert.Equal(t, PoolStatusReady, pools[0].Status) + assert.Equal(t, 30*time.Second, cacheControl) + }) + + t.Run("multiple pages", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + page := r.URL.Query().Get("page") + var pools []Pool + + if page == "1" { + // Return full page to trigger next page fetch + for i := range pageSizeListPools { + pools = append(pools, createTestPool(func(p *Pool) { + p.ID = fmt.Sprintf("pool-%d", i) + p.Name = fmt.Sprintf("pool-%d", i) + })) + } + } + + resp := ListPoolsResponse{ + Pools: pools, + TotalCount: uint64(len(pools)), + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + _, pools, err := client.ListPools(context.Background(), testClusterID) + + require.NoError(t, err) + assert.Equal(t, pageSizeListPools, len(pools)) + }) + + t.Run("server error", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(map[string]string{"error": "internal error"}) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + _, _, err := client.ListPools(context.Background(), testClusterID) + + assert.Error(t, err) + assert.True(t, errors.Is(err, ErrServerSide)) + }) +} + +func TestClient_UpdatePool(t *testing.T) { + t.Run("successful update", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "PATCH", 
r.Method) + + var req UpdatePoolRequest + err := json.NewDecoder(r.Body).Decode(&req) + require.NoError(t, err) + assert.Equal(t, 5, req.Size) + + now := time.Now() + resp := createTestPool(func(p *Pool) { + p.Size = 5 + p.Status = PoolStatusScaling + p.UpdatedAt = &now + }) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + pool, err := client.UpdatePool(context.Background(), "pool-1", 5) + + require.NoError(t, err) + assert.Equal(t, "pool-1", pool.ID) + assert.Equal(t, 5, pool.Size) + assert.Equal(t, PoolStatusScaling, pool.Status) + }) + + t.Run("empty pool ID", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("Should not call server with empty pool ID") + })) + defer server.Close() + + client := createTestClient(t, server.URL) + _, err := client.UpdatePool(context.Background(), "", 5) + + assert.Error(t, err) + }) + + t.Run("client error", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]string{"error": "bad request"}) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + _, err := client.UpdatePool(context.Background(), "pool-1", 5) + + assert.Error(t, err) + assert.True(t, errors.Is(err, ErrClientSide)) + }) +} + +func TestClient_ListNodes(t *testing.T) { + t.Run("successful single page", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "GET", r.Method) + + node := createTestNode() + resp := ListNodesResponse{ + Nodes: []Node{node}, + TotalCount: 1, + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Cache-Control", "max-age=15") + json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + cacheControl, nodes, err := client.ListNodes(context.Background(), testClusterID) + + require.NoError(t, err) + assert.Len(t, nodes, 1) + assert.Equal(t, "node-1", nodes[0].ID) + assert.Equal(t, NodeStatusReady, nodes[0].Status) + assert.Equal(t, 15*time.Second, cacheControl) + }) + + t.Run("server error", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(map[string]string{"error": "service unavailable"}) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + _, _, err := client.ListNodes(context.Background(), testClusterID) + + assert.Error(t, err) + assert.True(t, errors.Is(err, ErrServerSide)) + }) +} + +func TestClient_DeleteNode(t *testing.T) { + t.Run("successful delete", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "DELETE", r.Method) + + now := time.Now() + resp := createTestNode(func(n *Node) { + n.Status = NodeStatusDeleting + n.UpdatedAt = &now + }) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + node, err := client.DeleteNode(context.Background(), "node-1") + + require.NoError(t, err) + assert.Equal(t, "node-1", 
node.ID) + assert.Equal(t, NodeStatusDeleting, node.Status) + }) + + t.Run("empty node ID", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("Should not call server with empty node ID") + })) + defer server.Close() + + client := createTestClient(t, server.URL) + _, err := client.DeleteNode(context.Background(), "") + + assert.Error(t, err) + }) + + t.Run("not found", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + json.NewEncoder(w).Encode(map[string]string{"error": "not found"}) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + _, err := client.DeleteNode(context.Background(), "node-nonexistent") + + assert.Error(t, err) + assert.True(t, errors.Is(err, ErrClientSide)) + }) +} + +func TestClient_cacheControl(t *testing.T) { + tests := []struct { + name string + headerValue string + defaultCache time.Duration + expectedDuration time.Duration + }{ + { + name: "valid max-age", + headerValue: "max-age=30", + defaultCache: 10 * time.Second, + expectedDuration: 30 * time.Second, + }, + { + name: "max-age with other directives", + headerValue: "public, max-age=60, must-revalidate", + defaultCache: 10 * time.Second, + expectedDuration: 60 * time.Second, + }, + { + name: "no max-age", + headerValue: "no-cache", + defaultCache: 15 * time.Second, + expectedDuration: 15 * time.Second, + }, + { + name: "empty header", + headerValue: "", + defaultCache: 20 * time.Second, + expectedDuration: 20 * time.Second, + }, + { + name: "invalid max-age format", + headerValue: "max-age=", + defaultCache: 25 * time.Second, + expectedDuration: 25 * time.Second, + }, + { + name: "invalid max-age value", + headerValue: "max-age=invalid", + defaultCache: 30 * time.Second, + expectedDuration: 30 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &client{ + defaultCacheControl: tt.defaultCache, + } + + header := http.Header{} + if tt.headerValue != "" { + header.Set("Cache-Control", tt.headerValue) + } + + got := c.cacheControl(header) + assert.Equal(t, tt.expectedDuration, got) + }) + } +} + +func TestClient_do(t *testing.T) { + t.Run("nil request", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("Should not call server with nil request") + })) + defer server.Close() + + client := createTestClient(t, server.URL) + var resp map[string]interface{} + _, err := client.do(context.Background(), nil, &resp) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "request must be non-nil") + }) + + t.Run("successful request", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + var resp map[string]interface{} + _, err := client.do(context.Background(), &scalewayRequest{ + Method: "GET", + Path: "/test", + }, &resp) + + require.NoError(t, err) + assert.Equal(t, "ok", resp["status"]) + }) + + t.Run("wrong content type", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + 
w.Write([]byte("plain text")) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + var resp map[string]interface{} + _, err := client.do(context.Background(), &scalewayRequest{ + Method: "GET", + Path: "/test", + }, &resp) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "unexpected content-type") + }) + + t.Run("client error 4xx", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]string{"error": "bad request"}) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + var resp map[string]interface{} + _, err := client.do(context.Background(), &scalewayRequest{ + Method: "GET", + Path: "/test", + }, &resp) + + assert.Error(t, err) + assert.True(t, errors.Is(err, ErrClientSide)) + }) + + t.Run("server error 5xx", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(map[string]string{"error": "internal error"}) + })) + defer server.Close() + + client := createTestClient(t, server.URL) + var resp map[string]interface{} + _, err := client.do(context.Background(), &scalewayRequest{ + Method: "GET", + Path: "/test", + }, &resp) + + assert.Error(t, err) + assert.True(t, errors.Is(err, ErrServerSide)) + }) +} + +func TestListPoolsPaginated(t *testing.T) { + t.Run("empty cluster ID", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("Should not call server with empty cluster ID") + })) + defer server.Close() + + client := createTestClient(t, server.URL) + _, _, err := client.listPoolsPaginated(context.Background(), "", 1, 100) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "clusterID cannot be empty") + }) +} + +func TestListNodesPaginated(t *testing.T) { + t.Run("empty cluster ID", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("Should not call server with empty cluster ID") + })) + defer server.Close() + + client := createTestClient(t, server.URL) + _, _, err := client.listNodesPaginated(context.Background(), "", 1, 100) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "clusterID cannot be empty") + }) +} + +// createTestClient is a helper function to create a test client +func createTestClient(t *testing.T, apiURL string) *client { + t.Helper() + + client, err := NewClient(Config{ + ClusterID: testClusterID, + SecretKey: testSecretKey, + Region: testRegion, + ApiUrl: apiURL, + }) + require.NoError(t, err) + return client +}
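Illustrative note (not part of the patch): a minimal sketch of how a consumer of the new scalewaygo.Client interface might honour the Cache-Control-derived duration returned by ListPools before re-querying the API. The poolCache type and its refresh method are hypothetical names introduced here for illustration only; they do not appear in this PR.

package scalewayexample // hypothetical example package, not part of the patch

import (
	"context"
	"fmt"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo"
)

// poolCache caches the pools of a cluster and only refreshes them once the
// max-age advertised by the API (or the configured default) has elapsed.
type poolCache struct {
	client      scalewaygo.Client
	clusterID   string
	pools       []scalewaygo.Pool
	maxAge      time.Duration
	lastRefresh time.Time
}

// refresh re-fetches the pools only when the cached data is considered stale
// according to the duration returned alongside the ListPools response.
func (pc *poolCache) refresh(ctx context.Context) error {
	if time.Since(pc.lastRefresh) < pc.maxAge {
		return nil // cached pools are still fresh
	}
	maxAge, pools, err := pc.client.ListPools(ctx, pc.clusterID)
	if err != nil {
		return fmt.Errorf("listing pools for cluster %s: %w", pc.clusterID, err)
	}
	pc.pools, pc.maxAge, pc.lastRefresh = pools, maxAge, time.Now()
	return nil
}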