@@ -29,7 +29,6 @@ import (
 
 	"k8s.io/klog/v2"
 	"k8s.io/utils/strings/slices"
-
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
@@ -72,6 +71,8 @@
 	formatAndMountTimeout        = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount")
 	fallbackRequisiteZonesFlag   = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")
 	enableStoragePoolsFlag       = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
+	enableDataCacheFlag          = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration")
+	nodeName                     = flag.String("node-name", "", "The node this driver is running on")
 
 	multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
 	multiZoneVolumeHandleEnableFlag    = flag.Bool("multi-zone-volume-handle-enable", false, "If set to true, the multi-zone volumeHandle feature will be enabled")
@@ -122,7 +123,7 @@ func handle() {
 	if version == "" {
 		klog.Fatalf("version must be set at compile time")
 	}
-	klog.V(2).Infof("Driver vendor version %v", version)
+	klog.V(4).Infof("Driver vendor version %v", version)
 
 	// Start tracing as soon as possible
 	if *enableOtelTracing {
@@ -209,7 +210,7 @@ func handle() {
 		}
 		initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
 		maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
-		controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, multiZoneVolumeHandleConfig, listVolumesConfig)
+		controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig)
 	} else if *cloudConfigFilePath != "" {
 		klog.Warningf("controller service is disabled but cloud config given - it has no effect")
 	}
@@ -227,10 +228,29 @@ func handle() {
 		if err != nil {
 			klog.Fatalf("Failed to set up metadata service: %v", err.Error())
 		}
-		nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter)
+		isDataCacheEnabledNodePool, err := isDataCacheEnabledNodePool(ctx, *nodeName)
+		if err != nil {
+			klog.Fatalf("Failed to get node info from API server: %v", err.Error())
+		}
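+		// Propagate the Data Cache configuration to the node server.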
+		nsArgs := driver.NodeServerArgs{
+			EnableDataCache:          *enableDataCacheFlag,
+			DataCacheEnabledNodePool: isDataCacheEnabledNodePool,
+		}
+		nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
 		if *maxConcurrentFormatAndMount > 0 {
 			nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
 		}
+		if *enableDataCacheFlag {
+			if nodeName == nil || *nodeName == "" {
+				klog.Errorf("Data Cache enabled, but --node-name not passed")
+			}
+			if nsArgs.DataCacheEnabledNodePool {
+				if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil {
+					klog.Errorf("Data Cache setup failed: %v", err)
+				}
+				go driver.StartWatcher(*nodeName)
+			}
+		}
 	}
 
 	err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)
@@ -311,3 +331,85 @@ func urlFlag(target **url.URL, name string, usage string) {
 		return err
 	})
 }
+
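+// isDataCacheEnabledNodePool reports whether the node this driver runs on
+// belongs to a node pool with Data Cache enabled, i.e. whether its node label
+// requests a non-zero number of Local SSDs for caching.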
+func isDataCacheEnabledNodePool(ctx context.Context, nodeName string) (bool, error) {
+	if !*enableDataCacheFlag {
+		return false, nil
+	}
+	if len(nodeName) > 0 && nodeName != common.TestNode { // disregard logic below when E2E testing.
+		dataCacheLSSDCount, err := driver.GetDataCacheCountFromNodeLabel(ctx, nodeName)
+		return dataCacheLSSDCount != 0, err
+	}
+	return true, nil
+}
+
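+// fetchLssdsForRaiding returns the names of lssdCount Local SSDs that are
+// neither already RAIDed nor mounted for ephemeral storage, erroring out if
+// fewer than lssdCount such disks are available.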
+func fetchLssdsForRaiding(lssdCount int) ([]string, error) {
+	allLssds, err := driver.FetchAllLssds()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing all LSSDs %v", err)
+	}
+
+	raidedLssds, err := driver.FetchRaidedLssds()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing RAIDed LSSDs %v", err)
+	}
+
+	LSSDsWithEmptyMountPoint, err := driver.FetchLSSDsWithEmptyMountPoint()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing LSSDs with empty mountpoint: %v", err)
+	}
+
+	// Disks used for Data Cache must be both unRAIDed and not already mounted for ephemeral storage.
+	availableLssds := slices.Filter(nil, allLssds, func(e string) bool {
+		return slices.Contains(LSSDsWithEmptyMountPoint, e) && !slices.Contains(raidedLssds, e)
+	})
+
+	if len(availableLssds) == 0 {
+		return nil, fmt.Errorf("No LSSDs available to set up caching")
+	}
+
+	if len(availableLssds) < lssdCount {
+		return nil, fmt.Errorf("Not enough LSSDs available to set up caching. Available LSSDs: %v, wanted LSSDs: %v", len(availableLssds), lssdCount)
+	}
+
+	return availableLssds[:lssdCount], nil
+}
+
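+// setupDataCache RAIDs the node's spare Local SSDs and initializes the
+// node's volume group on them so that Data Cache volumes can be provisioned.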
+func setupDataCache(ctx context.Context, nodeName string, nodeId string) error {
+	isAlreadyRaided, err := driver.IsRaided()
+	if err != nil {
+		klog.V(4).Infof("Error scanning for RAIDed Local SSDs: %v; continuing with RAID setup", err)
+	} else if isAlreadyRaided {
+		klog.V(4).Infof("Local SSDs are already RAIDed. Skipping Data Cache setup.")
+		return nil
+	}
+
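+	// E2E test nodes use the fixed default count; any other node reads the count from its node label.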
+	lssdCount := common.LocalSSDCountForDataCache
+	if nodeName != common.TestNode {
+		var err error
+		lssdCount, err = driver.GetDataCacheCountFromNodeLabel(ctx, nodeName)
+		if err != nil {
+			return err
+		}
+		if lssdCount == 0 {
+			klog.V(4).Infof("Data Cache is not enabled on node %v, so skipping caching setup", nodeName)
+			return nil
+		}
+	}
+	lssdNames, err := fetchLssdsForRaiding(lssdCount)
+	if err != nil {
+		klog.Fatalf("Failed to get sufficient SSDs for Data Cache's caching setup: %v", err)
+	}
+	klog.V(4).Infof("RAIDing Local SSDs to set up Data Cache: %v", lssdNames)
+	if err := driver.RaidLocalSsds(lssdNames); err != nil {
+		return fmt.Errorf("Failed to RAID Local SSDs, unable to set up Data Cache, got error %v", err)
+	}
+
+	// Initialize the Data Cache node (volume group checks with the RAIDed LSSDs).
+	if err := driver.InitializeDataCacheNode(nodeId); err != nil {
+		return err
+	}
+
+	klog.V(4).Infof("LSSD caching is set up for the Data Cache enabled node %s", nodeName)
+	return nil
+}
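
For reference, here is a minimal, self-contained sketch of the disk-selection predicate used in `fetchLssdsForRaiding` above, using the same `k8s.io/utils/strings/slices` helpers as the diff. The device names are invented for illustration:

```go
package main

import (
	"fmt"

	"k8s.io/utils/strings/slices"
)

func main() {
	// All Local SSDs visible on the node.
	allLssds := []string{"/dev/nvme0n1", "/dev/nvme0n2", "/dev/nvme0n3"}
	// Disks already part of a RAID array.
	raidedLssds := []string{"/dev/nvme0n1"}
	// Disks with no mountpoint, i.e. not used for ephemeral storage.
	lssdsWithEmptyMountPoint := []string{"/dev/nvme0n2", "/dev/nvme0n3"}

	// Keep only disks that are unmounted and not yet RAIDed,
	// mirroring the filter in fetchLssdsForRaiding.
	available := slices.Filter(nil, allLssds, func(e string) bool {
		return slices.Contains(lssdsWithEmptyMountPoint, e) && !slices.Contains(raidedLssds, e)
	})
	fmt.Println(available) // [/dev/nvme0n2 /dev/nvme0n3]
}
```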