Introduce Disk Topology Feature #1983

Open · wants to merge 19 commits into base: master
16 changes: 14 additions & 2 deletions cmd/gce-pd-csi-driver/main.go
@@ -94,6 +94,8 @@ var (

extraTagsStr = flag.String("extra-tags", "", "Extra tags to attach to each Compute Disk, Image, Snapshot created. It is a comma separated list of parent id, key and value like '<parent_id1>/<tag_key1>/<tag_value1>,...,<parent_idN>/<tag_keyN>/<tag_valueN>'. parent_id is the Organization or the Project ID or Project name where the tag key and the tag value resources exist. A maximum of 50 tags bindings is allowed for a resource. See https://cloud.google.com/resource-manager/docs/tags/tags-overview, https://cloud.google.com/resource-manager/docs/tags/tags-creating-and-managing for details")

diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a disk-type.gke.io/[some-disk-type] topology label to the Topologies returned in CreateVolumeResponse.")

version string
)

@@ -225,9 +227,15 @@ func handle() {
if err != nil {
klog.Fatalf("Failed to get cloud provider: %v", err.Error())
}

initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag)
// TODO(2042): Move more of the constructor args into this struct
args := &driver.GCEControllerServerArgs{
EnableDiskTopology: *diskTopology,
}

controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag, args)
} else if *cloudConfigFilePath != "" {
klog.Warningf("controller service is disabled but cloud config given - it has no effect")
}
@@ -239,6 +247,7 @@ func handle() {
if err != nil {
klog.Fatalf("Failed to get safe mounter: %v", err.Error())
}

deviceUtils := deviceutils.NewDeviceUtils()
statter := mountmanager.NewStatter(mounter)
meta, err := metadataservice.NewMetadataService()
@@ -249,13 +258,16 @@ func handle() {
if err != nil {
klog.Fatalf("Failed to get node info from API server: %v", err.Error())
}
nsArgs := driver.NodeServerArgs{

// TODO(2042): Move more of the constructor args into this struct
nsArgs := &driver.NodeServerArgs{
EnableDeviceInUseCheck: *enableDeviceInUseCheck,
DeviceInUseTimeout: *deviceInUseTimeout,
EnableDataCache: *enableDataCacheFlag,
DataCacheEnabledNodePool: isDataCacheEnabledNodePool,
}
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)

if *maxConcurrentFormatAndMount > 0 {
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
}
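
For reference, the wiring pattern introduced here — parse a flag once in main, carry it in an args struct, hand the struct to the constructor — looks roughly like this in isolation. A minimal sketch: only GCEControllerServerArgs, EnableDiskTopology, and the disk-topology flag name come from this PR; the rest is scaffolding.

package main

import "flag"

type GCEControllerServerArgs struct {
	EnableDiskTopology bool
}

var diskTopology = flag.Bool("disk-topology", false, "enable disk-type topology segments")

func main() {
	flag.Parse()
	// The struct keeps future options out of the already long positional
	// parameter list of driver.NewControllerServer (see TODO(2042) above).
	args := &GCEControllerServerArgs{EnableDiskTopology: *diskTopology}
	_ = args // in the real driver, passed as the final argument to NewControllerServer
}
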
4 changes: 4 additions & 0 deletions pkg/common/constants.go
@@ -20,6 +20,10 @@ const (
// Keys for Topology. This key will be shared amongst drivers from GCP
TopologyKeyZone = "topology.gke.io/zone"

// DiskTypeKeyPrefix is the prefix for the disk type label key used as part
// of the Disk Topology feature.
DiskTypeKeyPrefix = "disk-type.gke.io"

// VolumeAttributes for Partition
VolumeAttributePartition = "partition"

4 changes: 4 additions & 0 deletions pkg/common/utils.go
@@ -764,3 +764,7 @@ func MapNumber(num int64) int64 {
}
return 0
}

func DiskTypeLabelKey(diskType string) string {
return fmt.Sprintf("%s/%s", DiskTypeKeyPrefix, diskType)
}
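
A quick sanity check of the new helper (self-contained sketch; pd-ssd is just an example disk type):

package main

import "fmt"

const DiskTypeKeyPrefix = "disk-type.gke.io"

// DiskTypeLabelKey joins the shared prefix with a concrete disk type.
func DiskTypeLabelKey(diskType string) string {
	return fmt.Sprintf("%s/%s", DiskTypeKeyPrefix, diskType)
}

func main() {
	fmt.Println(DiskTypeLabelKey("pd-ssd")) // prints: disk-type.gke.io/pd-ssd
}
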
20 changes: 14 additions & 6 deletions pkg/gce-cloud-provider/metadata/fake.go
@@ -20,13 +20,13 @@ type fakeServiceManager struct{}

var _ MetadataService = &fakeServiceManager{}

const (
FakeZone = "country-region-zone"
FakeProject = "test-project"
var (
FakeMachineType = "n1-standard-1"
FakeZone = "country-region-zone"
FakeProject = "test-project"
FakeName = "test-name"
)

var FakeMachineType = "n1-standard-1"

func NewFakeService() MetadataService {
return &fakeServiceManager{}
}
@@ -40,7 +40,7 @@ func (manager *fakeServiceManager) GetProject() string {
}

func (manager *fakeServiceManager) GetName() string {
return "test-name"
return FakeName
}

func (manager *fakeServiceManager) GetMachineType() string {
@@ -50,3 +50,11 @@ func (manager *fakeServiceManager) GetMachineType() string {
func SetMachineType(s string) {
FakeMachineType = s
}

func SetZone(s string) {
FakeZone = s
}

func SetName(s string) {
FakeName = s
}
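
Because the fake's values are now package-level variables rather than constants, a test that calls the new setters should restore the previous values so the override does not leak into later tests. A hypothetical usage sketch — the test name and import path are assumptions, not part of this PR:

package metadata_test

import (
	"testing"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
)

func TestFakeMetadataOverrides(t *testing.T) {
	// Save and restore the package-level defaults.
	prevZone, prevName := metadata.FakeZone, metadata.FakeName
	defer func() {
		metadata.SetZone(prevZone)
		metadata.SetName(prevName)
	}()

	metadata.SetZone("us-central1-a")
	metadata.SetName("test-node")

	svc := metadata.NewFakeService()
	if got := svc.GetName(); got != "test-node" {
		t.Errorf("GetName() = %q, want %q", got, "test-node")
	}
}
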
48 changes: 31 additions & 17 deletions pkg/gce-pd-csi-driver/controller.go
@@ -120,6 +120,12 @@ type GCEControllerServer struct {
// Embed UnimplementedControllerServer to ensure the driver returns Unimplemented for any
// new RPC methods that might be introduced in future versions of the spec.
csi.UnimplementedControllerServer

EnableDiskTopology bool
}

type GCEControllerServerArgs struct {
EnableDiskTopology bool
}

type MultiZoneVolumeHandleConfig struct {
@@ -320,7 +326,7 @@ func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req
if len(req.GetName()) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
}
if volumeCapabilities == nil || len(volumeCapabilities) == 0 {
if len(volumeCapabilities) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided")
}

@@ -465,9 +471,7 @@ func (gceCS *GCEControllerServer) getMultiZoneProvisioningZones(ctx context.Cont
}

func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool) (*csi.CreateVolumeResponse, error) {
// Determine the zones that are needed.
var err error

// For multi-zone, we either select:
// 1) The zones specified in requisite topology requirements
// 2) All zones in the region that are compatible with the selected disk type
@@ -517,7 +521,8 @@
// Use the first response as a template
volumeId := fmt.Sprintf("projects/%s/zones/%s/disks/%s", gceCS.CloudProvider.GetDefaultProject(), common.MultiZoneValue, req.GetName())
klog.V(4).Infof("CreateVolume succeeded for multi-zone disks in zones %s: %v", zones, multiZoneVolKey)
return generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil

return gceCS.generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil
}

func (gceCS *GCEControllerServer) getZonesWithDiskNameAndType(ctx context.Context, name string, diskType string) ([]string, error) {
@@ -617,13 +622,13 @@ func (gceCS *GCEControllerServer) createSingleDeviceDisk(ctx context.Context, re
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer gceCS.volumeLocks.Release(volumeID)
disk, err := gceCS.createSingleDisk(ctx, req, params, volKey, zones, accessMode)

disk, err := gceCS.createSingleDisk(ctx, req, params, volKey, zones, accessMode)
if err != nil {
return nil, common.LoggedError("CreateVolume failed: %v", err)
}

return generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), err
return gceCS.generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), nil
}

func getAccessMode(req *csi.CreateVolumeRequest, params common.DiskParameters) (string, error) {
@@ -2396,21 +2401,30 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) {
case contextForceAttach:
b, err := common.ConvertStringToBool(val)
if err != nil {
return nil, fmt.Errorf("Bad volume context force attach: %v", err)
return nil, fmt.Errorf("bad volume context force attach: %w", err)
}
info.ForceAttach = b
}
}
return info, nil
}

func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse {
func (gceCS *GCEControllerServer) generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse {
tops := []*csi.Topology{}
for _, zone := range zones {
tops = append(tops, &csi.Topology{
Segments: map[string]string{common.TopologyKeyZone: zone},
})
top := &csi.Topology{
Segments: map[string]string{
common.TopologyKeyZone: zone,
},
}

if gceCS.EnableDiskTopology {
top.Segments[common.DiskTypeLabelKey(params.DiskType)] = "true"
}

tops = append(tops, top)
}

realDiskSizeBytes := common.GbToBytes(disk.GetSizeGb())
createResp := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2468,10 +2482,10 @@ func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []strin
func getResourceId(resourceLink string) (string, error) {
url, err := neturl.Parse(resourceLink)
if err != nil {
return "", fmt.Errorf("Could not parse resource %s: %w", resourceLink, err)
return "", fmt.Errorf("could not parse resource %s: %w", resourceLink, err)
}
if url.Scheme != resourceApiScheme {
return "", fmt.Errorf("Unexpected API scheme for resource %s", resourceLink)
return "", fmt.Errorf("unexpected API scheme for resource %s", resourceLink)
}

// Note that the resource host can basically be anything, if we are running in
@@ -2480,16 +2494,16 @@ func getResourceId(resourceLink string) (string, error) {
// The path should be /compute/VERSION/project/....
elts := strings.Split(url.Path, "/")
if len(elts) < 4 {
return "", fmt.Errorf("Short resource path %s", resourceLink)
return "", fmt.Errorf("short resource path %s", resourceLink)
}
if elts[1] != resourceApiService {
return "", fmt.Errorf("Bad resource service %s in %s", elts[1], resourceLink)
return "", fmt.Errorf("bad resource service %s in %s", elts[1], resourceLink)
}
if _, ok := validResourceApiVersions[elts[2]]; !ok {
return "", fmt.Errorf("Bad version %s in %s", elts[2], resourceLink)
return "", fmt.Errorf("bad version %s in %s", elts[2], resourceLink)
}
if elts[3] != resourceProject {
return "", fmt.Errorf("Expected %v to start with %s in resource %s", elts[3:], resourceProject, resourceLink)
return "", fmt.Errorf("expected %v to start with %s in resource %s", elts[3:], resourceProject, resourceLink)
}
return strings.Join(elts[3:], "/"), nil
}
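
With EnableDiskTopology set, each zone entry in the returned accessible topology carries a disk-type segment alongside the zone segment. Illustratively — a sketch using example values, not actual driver output:

package main

import (
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

func main() {
	// One topology entry as built by the loop in
	// generateCreateVolumeResponseWithVolumeId when EnableDiskTopology is true;
	// the zone and disk type here are example values.
	top := &csi.Topology{
		Segments: map[string]string{
			"topology.gke.io/zone":         "us-central1-a", // common.TopologyKeyZone
			"disk-type.gke.io/pd-balanced": "true",          // common.DiskTypeLabelKey("pd-balanced")
		},
	}
	fmt.Println(top.Segments)
}
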