Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 134 additions & 72 deletions cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"time"
Expand All @@ -38,7 +37,10 @@ import (

const (
// GPULabel is the label added to GPU nodes
GPULabel = "k8s.scaleway.com/gpu"
GPULabel = "k8s.scw.cloud/gpu"

// DefaultRefreshInterval is the default refresh interval for the cloud provider
DefaultRefreshInterval = 60 * time.Second
)

type scalewayCloudProvider struct {
Expand All @@ -47,21 +49,30 @@ type scalewayCloudProvider struct {
// ClusterID is the cluster id where the Autoscaler is running.
clusterID string
// nodeGroups is an abstraction around the Pool object returned by the API
nodeGroups []*NodeGroup
// key is the Pool ID
nodeGroups map[string]*NodeGroup
// providerNodeGroups is a pre-converted slice of node groups for NodeGroups() method
providerNodeGroups []cloudprovider.NodeGroup
// refreshInterval is the minimum duration between refreshes
refreshInterval time.Duration
// lastRefresh is the last time the nodes and node groups were refreshed from the API
lastRefresh time.Time
// lastRefreshError stores the error from the last refresh, if any
lastRefreshError error

resourceLimiter *cloudprovider.ResourceLimiter
}

func readConf(config *scalewaygo.Config, configFile io.Reader) error {
body, err := ioutil.ReadAll(configFile)
body, err := io.ReadAll(configFile)
if err != nil {
return err
}
err = json.Unmarshal(body, config)
return err
}

func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl *cloudprovider.ResourceLimiter) *scalewayCloudProvider {
func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl *cloudprovider.ResourceLimiter) (*scalewayCloudProvider, error) {
getenvOr := func(key, defaultValue string) string {
value := os.Getenv(key)
if value != "" {
Expand All @@ -84,6 +95,7 @@ func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl
cfg.SecretKey = getenvOr("SCW_SECRET_KEY", cfg.SecretKey)
cfg.Region = getenvOr("SCW_REGION", cfg.Region)
cfg.ApiUrl = getenvOr("SCW_API_URL", cfg.ApiUrl)
cfg.DefaultCacheControl = DefaultRefreshInterval

cfg.UserAgent = defaultUserAgent

Expand All @@ -92,21 +104,26 @@ func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl
klog.Fatalf("failed to create scaleway cloud provider: %v", err)
}

klog.V(4).Infof("Scaleway Cloud Provider built; ClusterId=%s,SecretKey=%s-***,Region=%s,ApiURL=%s", cfg.ClusterID, client.Token()[:8], client.Region(), client.ApiURL())
klog.V(4).Infof("Scaleway Cloud Provider built; ClusterId=%s,Region=%s,ApiURL=%s", cfg.ClusterID, client.Region(), client.ApiURL())

return &scalewayCloudProvider{
provider := &scalewayCloudProvider{
client: client,
clusterID: cfg.ClusterID,
resourceLimiter: rl,
refreshInterval: DefaultRefreshInterval,
}

// Perform initial refresh to populate node groups cache
if err := provider.Refresh(); err != nil {
klog.Errorf("Failed to perform initial refresh: %v", err)
return nil, err
}

return provider, nil
}

// BuildScaleway returns CloudProvider implementation for Scaleway.
func BuildScaleway(
opts *coreoptions.AutoscalerOptions,
do cloudprovider.NodeGroupDiscoveryOptions,
rl *cloudprovider.ResourceLimiter,
) cloudprovider.CloudProvider {
func BuildScaleway(opts *coreoptions.AutoscalerOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
var configFile io.Reader

if opts.CloudConfig != "" {
Expand All @@ -123,79 +140,63 @@ func BuildScaleway(
}()
}
}
return newScalewayCloudProvider(configFile, opts.UserAgent, rl)

provider, err := newScalewayCloudProvider(configFile, opts.UserAgent, rl)
if err != nil {
klog.Fatalf("Failed to create Scaleway cloud provider: %v", err)
}
return provider
}

// Name returns 'scaleway'
// Name returns name of the cloud provider.
func (*scalewayCloudProvider) Name() string {
return cloudprovider.ScalewayProviderName
}

// NodeGroups returns all node groups configured for this cluster.
// critical endpoint, make it fast
// NodeGroups returns all node groups configured for this cloud provider.
func (scw *scalewayCloudProvider) NodeGroups() []cloudprovider.NodeGroup {

klog.V(4).Info("NodeGroups,ClusterID=", scw.clusterID)

nodeGroups := make([]cloudprovider.NodeGroup, len(scw.nodeGroups))
for i, ng := range scw.nodeGroups {
nodeGroups[i] = ng
}
return nodeGroups
}

func (scw *scalewayCloudProvider) nodeGroupForNode(node *apiv1.Node) (*NodeGroup, error) {
for _, ng := range scw.nodeGroups {
if _, ok := ng.nodes[node.Spec.ProviderID]; ok {
return ng, nil
}
}
return nil, nil
return scw.providerNodeGroups
}

// NodeGroupForNode returns the node group for the given node, nil if the node
// should not be processed by cluster autoscaler, or non-nil error if such
// occurred.
// critical endpoint, make it fast
// occurred. Must be implemented.
func (scw *scalewayCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) {
klog.V(4).Infof("NodeGroupForNode,NodeSpecProviderID=%s", node.Spec.ProviderID)

return scw.nodeGroupForNode(node)
}

// HasInstance returns whether a given node has a corresponding instance in this cloud provider
func (scw *scalewayCloudProvider) HasInstance(node *apiv1.Node) (bool, error) {
return true, cloudprovider.ErrNotImplemented
}

func (scw *scalewayCloudProvider) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) {
ng, err := scw.nodeGroupForNode(node)
if err != nil {
return 0.0, err
for _, ng := range scw.nodeGroups {
if _, ok := ng.nodes[node.Spec.ProviderID]; ok {
return ng, nil
}
}

d := endTime.Sub(startTime)
hours := math.Ceil(d.Hours())

return hours * float64(ng.specs.NodePricePerHour), nil
return nil, nil
}

func (scw *scalewayCloudProvider) PodPrice(pod *apiv1.Pod, startTime time.Time, endTime time.Time) (float64, error) {
return 0.0, nil
// HasInstance returns whether the node has corresponding instance in cloud provider,
// true if the node has an instance, false if it no longer exists
func (scw *scalewayCloudProvider) HasInstance(node *apiv1.Node) (bool, error) {
return node.Spec.ProviderID != "", nil
}

// Pricing return pricing model for scaleway.
// Pricing returns pricing model for this cloud provider or error if not available.
// Implementation optional.
func (scw *scalewayCloudProvider) Pricing() (cloudprovider.PricingModel, ca_errors.AutoscalerError) {
klog.V(4).Info("Pricing,called")
return scw, nil
}

// GetAvailableMachineTypes get all machine types that can be requested from scaleway.
// Not implemented
// GetAvailableMachineTypes get all machine types that can be requested from the cloud provider.
// Implementation optional.
func (scw *scalewayCloudProvider) GetAvailableMachineTypes() ([]string, error) {
return []string{}, nil
}

// NewNodeGroup builds a theoretical node group based on the node definition provided. The node group is not automatically
// created on the cloud provider side. The node group is not returned by NodeGroups() until it is created.
// Implementation optional.
func (scw *scalewayCloudProvider) NewNodeGroup(
machineType string,
labels map[string]string,
Expand All @@ -220,7 +221,6 @@ func (scw *scalewayCloudProvider) GPULabel() string {
}

// GetAvailableGPUTypes return all available GPU types cloud provider supports.
// not yet implemented.
func (scw *scalewayCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
klog.V(4).Info("GetAvailableGPUTypes,called")
return nil
Expand All @@ -244,36 +244,98 @@ func (scw *scalewayCloudProvider) Cleanup() error {
func (scw *scalewayCloudProvider) Refresh() error {
klog.V(4).Info("Refresh,ClusterID=", scw.clusterID)

ctx := context.Background()
resp, err := scw.client.ListPools(ctx, &scalewaygo.ListPoolsRequest{ClusterID: scw.clusterID})
// Only skip refresh if lastRefresh is non-zero and interval has not elapsed
if !scw.lastRefresh.IsZero() && time.Since(scw.lastRefresh) < scw.refreshInterval {
klog.V(4).Infof("Refresh,ClusterID=%s,skipping refresh, last refresh was %s ago", scw.clusterID, time.Since(scw.lastRefresh))
return scw.lastRefreshError
}

cc, pools, err := scw.client.ListPools(context.Background(), scw.clusterID)
if err != nil {
klog.Errorf("Refresh,failed to list pools for cluster %s: %s", scw.clusterID, err)
scw.lastRefresh = time.Now()
scw.lastRefreshError = err
return err
}
// Update refresh interval based on Cache-Control header from listPools response
scw.refreshInterval = cc

var ng []*NodeGroup

for _, p := range resp.Pools {
_, nodes, err := scw.client.ListNodes(context.Background(), scw.clusterID)
if err != nil {
klog.Errorf("Refresh,failed to list nodes for cluster %s: %s", scw.clusterID, err)
scw.lastRefresh = time.Now()
scw.lastRefreshError = err
return err
}

if p.Pool.Autoscaling == false {
// Build NodeGroups
nodeGroups := make(map[string]*NodeGroup)
for _, pool := range pools {
if !pool.Autoscaling {
klog.V(4).Infof("Refresh,ClusterID=%s,skipping pool %s (autoscaling disabled)", scw.clusterID, pool.ID)
continue
}

nodes, err := nodesFromPool(scw.client, p.Pool)
if err != nil {
return fmt.Errorf("Refresh,failed to list nodes for pool %s: %w", p.Pool.ID, err)
}
ng = append(ng, &NodeGroup{
nodeGroup := &NodeGroup{
Client: scw.client,
nodes: nodes,
specs: &p.Specs,
p: p.Pool,
})
nodes: make(map[string]*scalewaygo.Node),
pool: pool,
}

nodeGroups[pool.ID] = nodeGroup
}
klog.V(4).Infof("Refresh,ClusterID=%s,%d pools found", scw.clusterID, len(ng))

scw.nodeGroups = ng
// Assign nodes to NodeGroups
for _, node := range nodes {
_, ok := nodeGroups[node.PoolID]
if !ok {
klog.V(4).Infof("Refresh,ClusterID=%s,node %s found for PoolID=%s which does not exist in nodeGroups, skipping", scw.clusterID, node.ProviderID, node.PoolID)
continue
}

nodeGroups[node.PoolID].nodes[node.ProviderID] = &node
}

scw.nodeGroups = nodeGroups

// Pre-convert nodeGroups map to slice for NodeGroups() method
// This is to avoid converting the map to a slice on every call to NodeGroups()
// which happens quite often
scw.providerNodeGroups = make([]cloudprovider.NodeGroup, 0, len(nodeGroups))
for _, ng := range nodeGroups {
scw.providerNodeGroups = append(scw.providerNodeGroups, ng)
}

klog.V(4).Infof("Refresh,ClusterID=%s,%d pools found", scw.clusterID, len(nodeGroups))

scw.lastRefresh = time.Now()
scw.lastRefreshError = nil

return nil
}

// NodePrice returns a price of running the given node for a given period of time.
// All prices returned by the structure should be in the same currency.
func (scw *scalewayCloudProvider) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) {
var nodeGroup *NodeGroup
for _, ng := range scw.nodeGroups {
if _, ok := ng.nodes[node.Spec.ProviderID]; ok {
nodeGroup = ng
}
}

if nodeGroup == nil {
return 0.0, fmt.Errorf("node group not found for node %s", node.Spec.ProviderID)
}

d := endTime.Sub(startTime)
hours := math.Ceil(d.Hours())

return hours * float64(nodeGroup.pool.NodePricePerHour), nil
}

// PodPrice returns a theoretical minimum price of running a pod for a given
// period of time on a perfectly matching machine.
func (scw *scalewayCloudProvider) PodPrice(pod *apiv1.Pod, startTime time.Time, endTime time.Time) (float64, error) {
return 0.0, nil
}
Loading