Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 233 additions & 3 deletions pkg/profiles/recommender.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package profiles

import (
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -33,7 +34,7 @@ import (
)

const (
// StorageClass param keys
// StorageClass param keys.
workloadTypeKey = "workloadType"
workloadTypeInferenceKey = "inference"
workloadTypeTrainingKey = "training"
Expand All @@ -42,11 +43,11 @@ const (
fuseMemoryAllocatableFactorKey = "fuseMemoryAllocatableFactor"
fuseEphemeralStorageAllocatableFactorKey = "fuseEphemeralStorageAllocatableFactor"

// Node allocatable resource keys
// Node allocatable resource keys.
nvidiaGpuResourceName = corev1.ResourceName("nvidia.com/gpu")
googleTpuResourceName = corev1.ResourceName("google.com/tpu")

// Node types
// Node types.
nodeTypeTPU = "tpu"
nodeTypeGPU = "gpu"
nodeTypeGeneralPurpose = "general_purpose"
Expand All @@ -55,6 +56,17 @@ const (
gkeAppliedNodeLabelsAnnotationKey = "node.gke.io/last-applied-node-labels"
// EphemeralStorageLocalSSDLabelKey is the specific label key we are looking for within the applied labels.
ephemeralStorageLocalSSDLabelKey = "cloud.google.com/gke-ephemeral-storage-local-ssd"

// metadataStatCacheBytesPerObject is the average number of metadata stat cache bytes per object.
metadataStatCacheBytesPerObject int64 = 1500
// metadataTypeCacheBytesPerObject is the average number of metadata type cache bytes per object.
metadataTypeCacheBytesPerObject int64 = 200
// mib represents 1024 * 1024 bytes.
mib int64 = 1024 * 1024

// Mount option names.
metadataStatCacheMaxSizeMiBMountOptionKey = "metadata-cache:stat-cache-max-size-mb"
metadataTypeCacheMaxSizeMiBMountOptionKey = "metadata-cache:type-cache-max-size-mb"
)

// ProfileConfig holds the consolidated configuration for a volume profile,
Expand Down Expand Up @@ -104,6 +116,25 @@ type parsedResourceList struct {
ephemeralStorageBytes int64
}

// cacheRequirements defines the size constraints for various gcsfuse caches.
type cacheRequirements struct {
// metadataStatCacheBytes is the maximum size (in bytes) for the metadata stat cache.
metadataStatCacheBytes int64
// metadataTypeCacheBytes is the maximum size (in bytes) for the metadata type cache.
metadataTypeCacheBytes int64
// fileCache is the maximum size (in bytes) for the file cache.
fileCacheBytes int64
}

// recommendation suggests optimal cache sizes based on certain criteria.
type recommendation struct {
// metadataStatCacheBytes is the recommended size in bytes for the metadata stat cache.
metadataStatCacheBytes int64
// metadataTypeCacheBytes is the recommended size in bytes for the metadata type cache.
metadataTypeCacheBytes int64
// TODO(urielguzman): Implement file cache bytes and medium recommendation.
}

// BuildProfileConfigParams contains the parameters needed to build a profile configuration.
type BuildProfileConfigParams struct {
targetPath string
Expand Down Expand Up @@ -201,6 +232,127 @@ func BuildProfileConfig(params *BuildProfileConfigParams) (*ProfileConfig, error
}, nil
}

// addRecommendationToMountOptions appends a mount option string to the given slice if the recommendationBytes is greater than 0.
// The option is formatted as "mountOptionKey:sizeMiB", where sizeMiB is the recommendationBytes converted to MiB.
func addRecommendationToMountOptions(mountOptions []string, mountOptionKey string, recommendationBytes int64) []string {
result := mountOptions
if recommendationBytes > 0 {
result = mergeMountOptionsIfKeyUnset(result, []string{fmt.Sprintf("%s=%d", mountOptionKey, bytesToMiB(recommendationBytes))})
}
return result
}

// RecommendMountOptions generates a slice of recommended mount options for GCS FUSE based on the provided ProfileConfig.
// It calculates optimal cache configurations and translates them into gcsfuse mount option strings.
func RecommendMountOptions(config *ProfileConfig) ([]string, error) {
recommendation, err := recommendCacheConfigs(config)
// TODO(urielguzman): Log the decision summary into a human readable format via a container log / Pod event.
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to recommend cache configs: %v", err)
}

recommendedMountOptions := config.scDetails.mountOptions

// Map the recommended metadata stat cache size to equivalent mount option.
recommendedMountOptions = addRecommendationToMountOptions(recommendedMountOptions, metadataStatCacheMaxSizeMiBMountOptionKey, recommendation.metadataStatCacheBytes)

// Map the recommended metadata type cache size to equivalent mount option.
recommendedMountOptions = addRecommendationToMountOptions(recommendedMountOptions, metadataTypeCacheMaxSizeMiBMountOptionKey, recommendation.metadataTypeCacheBytes)

// TODO(urielguzman): Map the recommended file cache size & medium to equivalent mount options.
return recommendedMountOptions, nil
}

// buildCacheRequirements constructs a cacheRequirements struct based on the provided pvDetails.
// It calculates the ideal sizes for metadata stat, metadata type, and file caches.
func buildCacheRequirements(pvDetails *pvDetails) *cacheRequirements {
return &cacheRequirements{
metadataStatCacheBytes: pvDetails.numObjects * metadataStatCacheBytesPerObject,
metadataTypeCacheBytes: pvDetails.numObjects * metadataTypeCacheBytesPerObject,
fileCacheBytes: pvDetails.totalSizeBytes,
}
}

// calculateFuseResourceBudget determines the memory or ephemeral storage budget available for GCS FUSE.
// It takes the node's allocatable capacity, a factor for FUSE allocation, and an optional sidecar limit.
func calculateFuseResourceBudget(nodeAllocatable int64, allocatableFactor float64, sidecarLimit int64) int64 {
budget := nodeAllocatable
// If the sidecar has "0" resource limit, then we assume the entire node allocatable is available.
// Otherwise, cap the max fuse resource to the sidecar's memory limit.
if sidecarLimit > 0 && sidecarLimit < nodeAllocatable {
budget = sidecarLimit
}
// Return x% of the available budget, determined by the allocatable factor variable.
return int64(float64(budget) * allocatableFactor)
}

// calculateResourceBudgets computes the memory and ephemeral storage budgets available for GCS FUSE
// based on the node and pod resource configurations in the ProfileConfig.
func calculateResourceBudgets(config *ProfileConfig) (int64, int64) {
// Calculate memory budget
memoryBudget := calculateFuseResourceBudget(
config.nodeDetails.nodeAllocatables.memoryBytes,
config.scDetails.fuseMemoryAllocatableFactor,
config.podDetails.sidecarLimits.memoryBytes,
)

// Calculate ephemeral storage budget
ephemeralStorageBudget := calculateFuseResourceBudget(
config.nodeDetails.nodeAllocatables.ephemeralStorageBytes,
config.scDetails.fuseEphemeralStorageAllocatableFactor,
config.podDetails.sidecarLimits.ephemeralStorageBytes,
)

return memoryBudget, ephemeralStorageBudget
}

// recommendCacheConfigs calculates recommended cache sizes and medium.
func recommendCacheConfigs(config *ProfileConfig) (*recommendation, error) {
// Validate input.
if config.pvDetails == nil {
return nil, errors.New("pvDetails cannot be nil")
}
if config.scDetails == nil {
return nil, errors.New("scDetails cannot be nil")
}
if config.nodeDetails == nil {
return nil, errors.New("nodeDetails cannot be nil")
}
if config.podDetails == nil {
return nil, errors.New("podDetails cannot be nil")
}

recommendation := &recommendation{}

// Calculate metadata and file cache requirements.
cacheRequirements := buildCacheRequirements(config.pvDetails)

// Calculate memory and ephemeral storage budgets.
// TODO(urielguzman): Use ephemeralStorageBudget to calculate file cache size and medium.
memoryBudget, _ := calculateResourceBudgets(config)

// Calculate the recommended metadata cache sizes. The memoryBudget gets decreased after each recommendation.
recommendation.metadataStatCacheBytes, memoryBudget = recommendMetadataCacheSize(config, cacheRequirements.metadataStatCacheBytes, memoryBudget, "stat")
recommendation.metadataTypeCacheBytes, _ = recommendMetadataCacheSize(config, cacheRequirements.metadataTypeCacheBytes, memoryBudget, "type")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is memoryBudget ignored here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not used yet. It will be used in the next PR, where the memoryBudget is used to calculate if the file cache will fit or not in the remaining budget.


// TODO(urielguzman): Calculate the recommended file cache size and medium.
return recommendation, nil
}

// recommendMetadataCacheSize determines the recommended size for a specific metadata cache type
// (e.g., "stat" or "type"), ensuring it does not exceed the available memoryBudget.
// It returns the recommended size and the remaining memory budget.
func recommendMetadataCacheSize(config *ProfileConfig, required, memoryBudget int64, cacheType string) (int64, int64) {
recommended := minInt64(required, memoryBudget)
if recommended < required && required > 0 {
// TODO(urielguzman): Log this in a Kubernetes Pod event warning.
klog.Warningf("For target node %s, required metadata %s size %d bytes capped to available fuse memory budget %d bytes. This can impact perf due to increased GCS metadata API calls", config.nodeDetails.name, cacheType, required, recommended)
}
memoryBudget = maxInt64(0, memoryBudget-recommended)
klog.V(6).Infof("available memory after metadata %s cache: %d bytes", cacheType, memoryBudget)
return recommended, memoryBudget
}

// parseFloatParameterNonNegative extracts a parameter by key from the params map,
// parses it as a float64, and returns an error if the key is missing, the value
// is not a valid float, or the value is negative.
Expand Down Expand Up @@ -554,3 +706,81 @@ func parseResourceList(resourceList corev1.ResourceList) (*parsedResourceList, e
ephemeralStorageBytes: ephemeralStorage,
}, nil
}

// minInt64 returns the smaller of two int64 values.
func minInt64(a, b int64) int64 {
if a < b {
return a
}
return b
}

// maxInt64 returns the larger of two int64 values.
func maxInt64(a, b int64) int64 {
if a > b {
return a
}
return b
}

// bytesToMiB converts a byte count to mebibytes (MiB), rounding up to the nearest whole MiB.
func bytesToMiB(a int64) int64 {
return ceilDiv64(a, mib)
}

// ceilDiv64 performs integer division of 'a' by 'b', rounding the result up.
func ceilDiv64(a, b int64) int64 {
return (a + b - 1) / b
}

// getMountOptionKey extracts the key part of a mount option string.
// Formats supported and precedence:
// 1. key=value (Key is everything before the first '=')
// 2. ...:key:value (Key is everything before the last ':')
// 3. key (Key is the entire string)
func getMountOptionKey(opt string) string {
if strings.Contains(opt, "=") {
parts := strings.SplitN(opt, "=", 2)
return parts[0]
}
if strings.Contains(opt, ":") {
lastColon := strings.LastIndex(opt, ":")
// If lastColon > 0, the key is the part before the last colon.
// Examples: "a:b" -> "a", "a:b:c" -> "a:b"
if lastColon > 0 {
return opt[:lastColon]
}
}
return opt
}

// MergeMountOptionsIfKeyUnset merges mount options from srcOpts into dstOpts.
// An option from srcOpts is added only if its key is not already
// present in the keys of options within dstOpts.
func mergeMountOptionsIfKeyUnset(dstOpts, srcOpts []string) []string {
if len(srcOpts) == 0 {
return dstOpts
}

if len(dstOpts) == 0 {
return srcOpts
}

mountOptions := make([]string, len(dstOpts))
copy(mountOptions, dstOpts)

existingKeys := make(map[string]bool)
for _, opt := range mountOptions {
key := strings.ToLower(getMountOptionKey(opt))
existingKeys[key] = true
}

for _, opt := range srcOpts {
key := strings.ToLower(getMountOptionKey(opt))
if !existingKeys[key] {
mountOptions = append(mountOptions, opt)
existingKeys[key] = true
}
}
return mountOptions
}
Loading