Skip to content

Commit a23f433

Browse files
committed
Half-made changes
1 parent d7c5a23 commit a23f433

File tree

5 files changed

+84
-120
lines changed

5 files changed

+84
-120
lines changed

cmd/gce-pd-csi-driver/main.go

+23
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"strings"
2828
"time"
2929

30+
"k8s.io/client-go/kubernetes"
31+
"k8s.io/client-go/rest"
3032
"k8s.io/klog/v2"
3133
"k8s.io/utils/strings/slices"
3234
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -239,19 +241,27 @@ func handle() {
239241
if err != nil {
240242
klog.Fatalf("Failed to get safe mounter: %v", err.Error())
241243
}
244+
242245
deviceUtils := deviceutils.NewDeviceUtils()
243246
statter := mountmanager.NewStatter(mounter)
244247
meta, err := metadataservice.NewMetadataService()
245248
if err != nil {
246249
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
247250
}
251+
252+
kubeClient, err := instantiateKubeClient()
253+
if err != nil {
254+
klog.Fatalf("Failed to instantiate Kubernetes client: %v", err)
255+
}
248256
nsArgs := driver.NodeServerArgs{
249257
EnableDeviceInUseCheck: *enableDeviceInUseCheck,
250258
DeviceInUseTimeout: *deviceInUseTimeout,
251259
EnableDataCache: *enableDataCacheFlag,
252260
DataCacheEnabledNodePool: isDataCacheEnabledNodePool(ctx, *nodeName),
261+
KubeClient: kubeClient,
253262
}
254263
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
264+
255265
if *maxConcurrentFormatAndMount > 0 {
256266
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
257267
}
@@ -288,6 +298,19 @@ func handle() {
288298
gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing, metricsManager)
289299
}
290300

301+
func instantiateKubeClient() (*kubernetes.Clientset, error) {
302+
klog.V(2).Infof("Setting up kubeClient")
303+
cfg, err := rest.InClusterConfig()
304+
if err != nil {
305+
return nil, fmt.Errorf("failed to create REST Config for k8s client: %w", err)
306+
}
307+
kubeClient, err := kubernetes.NewForConfig(cfg)
308+
if err != nil {
309+
return nil, fmt.Errorf("failed to create k8s client: %w", err)
310+
}
311+
return kubeClient, nil
312+
}
313+
291314
func notEmpty(v string) bool {
292315
return v != ""
293316
}

pkg/common/constants.go

+2-25
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ limitations under the License.
1717
package common
1818

1919
const (
20+
TopologyKeyPrefix = "topology.gke.io/"
2021
// Keys for Topology. This key will be shared amongst drivers from GCP
21-
TopologyKeyZone = "topology.gke.io/zone"
22+
TopologyKeyZone = TopologyKeyPrefix + "zone"
2223

2324
// VolumeAttributes for Partition
2425
VolumeAttributePartition = "partition"
@@ -32,28 +33,4 @@ const (
3233

3334
// Label that is set on a disk when it is used by a 'multi-zone' VolumeHandle
3435
MultiZoneLabel = "goog-gke-multi-zone"
35-
36-
// GCE Access Modes that are valid for hyperdisks only.
37-
GCEReadOnlyManyAccessMode = "READ_ONLY_MANY"
38-
GCEReadWriteManyAccessMode = "READ_WRITE_MANY"
39-
GCEReadWriteOnceAccessMode = "READ_WRITE_SINGLE"
40-
41-
// Data cache mode
42-
DataCacheModeWriteBack = "writeback"
43-
DataCacheModeWriteThrough = "writethrough"
44-
45-
ContextDataCacheSize = "data-cache-size"
46-
ContextDataCacheMode = "data-cache-mode"
47-
48-
// Keys in the publish context
49-
ContexLocalSsdCacheSize = "local-ssd-cache-size"
50-
// Node name for E2E tests
51-
TestNode = "test-node-csi-e2e"
52-
53-
// Default LSSD count for datacache E2E tests
54-
LocalSSDCountForDataCache = 2
55-
56-
// Node label for Data Cache (only applicable to GKE nodes)
57-
NodeLabelPrefix = "cloud.google.com/%s"
58-
DataCacheLssdCountLabel = "gke-data-cache-disk"
5936
)

pkg/common/utils.go

+7-50
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23-
"hash/fnv"
2423
"net/http"
2524
"regexp"
2625
"slices"
@@ -79,7 +78,8 @@ const (
7978
// Full or partial URL of the zone resource, in the format:
8079
// projects/{project}/zones/{zone}
8180
zoneURIPattern = "projects/[^/]+/zones/([^/]+)$"
82-
alphanums = "bcdfghjklmnpqrstvwxz2456789"
81+
82+
gkeTopologyLabelPrefix = "topology.gke.io/"
8383
)
8484

8585
var (
@@ -103,8 +103,6 @@ var (
103103
http.StatusConflict: codes.FailedPrecondition,
104104
}
105105

106-
validDataCacheMode = []string{DataCacheModeWriteBack, DataCacheModeWriteThrough}
107-
108106
// Regular expressions for validating parent_id, key and value of a resource tag.
109107
regexParent = regexp.MustCompile(`(^[1-9][0-9]{0,31}$)|(^[a-z][a-z0-9-]{4,28}[a-z0-9]$)`)
110108
regexKey = regexp.MustCompile(`^[a-zA-Z0-9]([0-9A-Za-z_.-]{0,61}[a-zA-Z0-9])?$`)
@@ -396,15 +394,6 @@ func ConvertMiStringToInt64(str string) (int64, error) {
396394
return volumehelpers.RoundUpToMiB(quantity)
397395
}
398396

399-
// ConvertGiStringToInt64 converts a GiB string to int64
400-
func ConvertGiStringToInt64(str string) (int64, error) {
401-
quantity, err := resource.ParseQuantity(str)
402-
if err != nil {
403-
return -1, err
404-
}
405-
return volumehelpers.RoundUpToGiB(quantity)
406-
}
407-
408397
// ConvertStringToBool converts a string to a boolean.
409398
func ConvertStringToBool(str string) (bool, error) {
410399
switch strings.ToLower(str) {
@@ -697,29 +686,6 @@ func VolumeIdAsMultiZone(volumeId string) (string, error) {
697686
return strings.Join(splitId, "/"), nil
698687
}
699688

700-
func StringInSlice(s string, list []string) bool {
701-
for _, v := range list {
702-
if v == s {
703-
return true
704-
}
705-
}
706-
return false
707-
}
708-
709-
func ValidateDataCacheMode(s string) error {
710-
if StringInSlice(s, validDataCacheMode) {
711-
return nil
712-
}
713-
return fmt.Errorf("invalid data-cache-mode %s. Only \"writeback\" and \"writethrough\" is a valid input", s)
714-
}
715-
716-
func ValidateNonNegativeInt(n int64) error {
717-
if n <= 0 {
718-
return fmt.Errorf("Input should be set to > 0, got %d", n)
719-
}
720-
return nil
721-
}
722-
723689
// NewLimiter returns a token bucket based request rate limiter after initializing
724690
// the passed values for limit, burst (or token bucket) size. If opted for emptyBucket
725691
// all initial tokens are reserved for the first burst.
@@ -733,19 +699,10 @@ func NewLimiter(limit, burst int, emptyBucket bool) *rate.Limiter {
733699
return limiter
734700
}
735701

736-
func IsHyperdisk(diskType string) bool {
737-
return strings.HasPrefix(diskType, "hyperdisk-")
738-
}
702+
func IsGKETopologyLabel(key string) bool {
703+
// This is the actual code
704+
// return strings.HasPrefix(key, gkeTopologyLabelPrefix)
739705

740-
// shortString is inspired by k8s.io/apimachinery/pkg/util/rand.SafeEncodeString, but takes data from a hash.
741-
func ShortString(s string) string {
742-
hasher := fnv.New128a()
743-
hasher.Write([]byte(s))
744-
sum := hasher.Sum([]byte{})
745-
const sz = 8
746-
short := make([]byte, sz)
747-
for i := 0; i < sz; i++ {
748-
short[i] = alphanums[int(sum[i])%len(alphanums)]
749-
}
750-
return string(short)
706+
// More permissive code for testing
707+
return strings.HasPrefix(key, "topology.gke")
751708
}

pkg/gce-pd-csi-driver/gce-pd-driver.go

+1
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
157157
deviceInUseErrors: newDeviceErrMap(args.DeviceInUseTimeout),
158158
EnableDataCache: args.EnableDataCache,
159159
DataCacheEnabledNodePool: args.DataCacheEnabledNodePool,
160+
KubeClient: args.KubeClient,
160161
}
161162
}
162163

pkg/gce-pd-csi-driver/node.go

+51-45
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import (
3030

3131
csi "github.com/container-storage-interface/spec/lib/go/csi"
3232

33+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
"k8s.io/client-go/kubernetes"
3335
"k8s.io/klog/v2"
3436
"k8s.io/mount-utils"
3537

@@ -41,13 +43,13 @@ import (
4143
)
4244

4345
type GCENodeServer struct {
44-
Driver *GCEDriver
45-
Mounter *mount.SafeFormatAndMount
46-
DeviceUtils deviceutils.DeviceUtils
47-
VolumeStatter mountmanager.Statter
48-
MetadataService metadataservice.MetadataService
49-
EnableDataCache bool
50-
DataCacheEnabledNodePool bool
46+
Driver *GCEDriver
47+
Mounter *mount.SafeFormatAndMount
48+
DeviceUtils deviceutils.DeviceUtils
49+
VolumeStatter mountmanager.Statter
50+
MetadataService metadataservice.MetadataService
51+
52+
kubeClient *kubernetes.Clientset
5153

5254
// A map storing all volumes with ongoing operations so that additional operations
5355
// for that same volume (as defined by VolumeID) return an Aborted error
@@ -80,10 +82,6 @@ type NodeServerArgs struct {
8082
EnableDeviceInUseCheck bool
8183

8284
DeviceInUseTimeout time.Duration
83-
84-
EnableDataCache bool
85-
86-
DataCacheEnabledNodePool bool
8785
}
8886

8987
var _ csi.NodeServer = &GCENodeServer{}
@@ -300,7 +298,6 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
300298
volumeID := req.GetVolumeId()
301299
stagingTargetPath := req.GetStagingTargetPath()
302300
volumeCapability := req.GetVolumeCapability()
303-
nodeId := ns.MetadataService.GetName()
304301
if len(volumeID) == 0 {
305302
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume ID must be provided")
306303
}
@@ -334,32 +331,12 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
334331
partition = part
335332
}
336333
devicePath, err := getDevicePath(ns, volumeID, partition)
334+
337335
if err != nil {
338336
return nil, status.Error(codes.Internal, fmt.Sprintf("Error when getting device path: %v", err.Error()))
339337
}
340338

341-
klog.Infof("Successfully found attached GCE PD %q at device path %s.", volumeKey.Name, devicePath)
342-
343-
if ns.EnableDataCache && (req.GetPublishContext()[common.ContextDataCacheSize] != "" || req.GetPublishContext()[common.ContextDataCacheMode] != "") {
344-
if len(nodeId) == 0 {
345-
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Node ID must be provided")
346-
}
347-
devFsPath, err := filepath.EvalSymlinks(devicePath)
348-
if err != nil {
349-
klog.Errorf("filepath.EvalSymlinks(%q) failed when trying to create volume group: %v", devicePath, err)
350-
}
351-
configError := ValidateDataCacheConfig(req.GetPublishContext()[common.ContextDataCacheMode], req.GetPublishContext()[common.ContextDataCacheSize], ctx)
352-
if configError != nil {
353-
if ns.DataCacheEnabledNodePool {
354-
return nil, status.Error(codes.DataLoss, fmt.Sprintf("Error validate configuration for Data Cache: %v", configError.Error()))
355-
}
356-
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("The Data Cache PVC is scheduled on an incompatible node pool. Please select a node pool with data cache configured: %v", configError.Error()))
357-
}
358-
devicePath, err = setupCaching(devFsPath, req, nodeId)
359-
if err != nil {
360-
return nil, status.Error(codes.DataLoss, fmt.Sprintf("Error setting up cache: %v", err.Error()))
361-
}
362-
}
339+
klog.V(4).Infof("Successfully found attached GCE PD %q at device path %s.", volumeKey.Name, devicePath)
363340

364341
// Part 2: Check if mount already exists at stagingTargetPath
365342
if ns.isVolumePathMounted(stagingTargetPath) {
@@ -513,15 +490,6 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
513490
ns.deviceInUseErrors.deleteDevice(volumeID)
514491
}
515492

516-
// The NodeUnstageVolume does not have any volume or publish context, we need to get the info from LVM locally
517-
// Check if cache group cache-{volumeID} exist in LVM
518-
if ns.EnableDataCache {
519-
nodeId := ns.MetadataService.GetName()
520-
err := cleanupCache(volumeID, nodeId)
521-
if err != nil {
522-
return nil, status.Errorf(codes.DataLoss, "Failed to cleanup cache for volume %s: %v", volumeID, err)
523-
}
524-
}
525493
klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath)
526494
return &csi.NodeUnstageVolumeResponse{}, nil
527495
}
@@ -556,22 +524,60 @@ func (ns *GCENodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeG
556524
}
557525

558526
func (ns *GCENodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
527+
labels, err := ns.gkeTopologyLabels(ctx, ns.MetadataService.GetName())
528+
if err != nil {
529+
// Perhaps we don't want to fail here. We are introducing a new
530+
// dependency and we might be better off allowing this failure to
531+
// happen and moving on to retrieve the zone from GCE MDS.
532+
return nil, err
533+
}
534+
535+
labels[common.TopologyKeyZone] = ns.MetadataService.GetZone()
536+
537+
// Each "Topology" struct will later be translated into an individual
538+
// 'matchExpressions' block in the PV's NodeAffinity. Because we always
539+
// need to match on both the zone AND the disk type, both the zone and the
540+
// supported disks belong as segments on a single Topology.
559541
top := &csi.Topology{
560-
Segments: map[string]string{common.TopologyKeyZone: ns.MetadataService.GetZone()},
542+
Segments: labels,
561543
}
562544

563545
nodeID := common.CreateNodeID(ns.MetadataService.GetProject(), ns.MetadataService.GetZone(), ns.MetadataService.GetName())
564-
565546
volumeLimits, err := ns.GetVolumeLimits()
566547

567548
resp := &csi.NodeGetInfoResponse{
568549
NodeId: nodeID,
569550
MaxVolumesPerNode: volumeLimits,
570551
AccessibleTopology: top,
571552
}
553+
554+
klog.V(2).Infof("Returning NodeGetInfoResponse: %+v", resp)
555+
572556
return resp, err
573557
}
574558

559+
// gkeTopologyLabels retrieves the node labels with the prefix
560+
// `topology.gke.io/`.
561+
func (ns *GCENodeServer) gkeTopologyLabels(ctx context.Context, nodeName string) (map[string]string, error) {
562+
klog.V(2).Infof("Retrieving node topology labels for node %q", nodeName)
563+
564+
node, err := ns.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
565+
if err != nil {
566+
// We should retry instead. Need to figure out how much wrong-ness can be tolerated and how often CSINode gets refreshed.
567+
return nil, err
568+
}
569+
570+
topology := make(map[string]string)
571+
for k, v := range node.GetLabels() {
572+
if common.IsGKETopologyLabel(k) {
573+
klog.V(2).Infof("Including node topology label %q=%q", k, v)
574+
topology[k] = v
575+
}
576+
}
577+
578+
return topology, nil
579+
}
580+
575581
func (ns *GCENodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
576582
if len(req.VolumeId) == 0 {
577583
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")

0 commit comments

Comments (0)