From 9a7460052b6181bc6b6679b7009f4d6306a2d4cf Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Tue, 17 Jun 2025 11:20:02 +0800 Subject: [PATCH 1/4] fix: tsdb SQL schema typing issue --- internal/metrics/tag_parser.go | 13 ++++++++----- internal/metrics/types.go | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/metrics/tag_parser.go b/internal/metrics/tag_parser.go index 0972e25..6e85d14 100644 --- a/internal/metrics/tag_parser.go +++ b/internal/metrics/tag_parser.go @@ -41,18 +41,19 @@ func getInitTableSQL(model schema.Tabler, ttl string) string { var indexClass string var isIndex bool var extraOption string + timePrecision := "ns" // Split by semicolon first - parts := strings.Split(gormTag, ";") - for _, part := range parts { + parts := strings.SplitSeq(gormTag, ";") + for part := range parts { if part == "" { continue } // Split by colon - keyValue := strings.Split(part, ",") + keyValue := strings.SplitSeq(part, ",") - for _, key := range keyValue { + for key := range keyValue { if strings.HasPrefix(key, "column:") { columnName = strings.TrimPrefix(key, "column:") } else if strings.HasPrefix(key, "index:") { @@ -61,6 +62,8 @@ func getInitTableSQL(model schema.Tabler, ttl string) string { indexClass = strings.TrimPrefix(key, "class:") } else if strings.HasPrefix(key, "option:") { extraOption = strings.TrimPrefix(key, "option:") + } else if strings.HasPrefix(key, "precision:") { + timePrecision = strings.TrimPrefix(key, "precision:") } } } @@ -85,7 +88,7 @@ func getInitTableSQL(model schema.Tabler, ttl string) string { default: // Check if it's time.Time if field.Type == reflect.TypeOf(time.Time{}) { - dbType = "Timestamp_ms" + dbType = fmt.Sprintf("Timestamp_%s", timePrecision) isNullable = false } else { // Default to String for unknown types diff --git a/internal/metrics/types.go b/internal/metrics/types.go index 1532a04..5e95681 100644 --- a/internal/metrics/types.go +++ b/internal/metrics/types.go @@ -110,7 +110,7 @@ type TFSystemLog struct { // NOTE: make sure new fields will be migrated in SetupTable function - GreptimeTimestamp time.Time `json:"greptime_timestamp" gorm:"column:greptime_timestamp;index:,class:TIME"` + GreptimeTimestamp time.Time `json:"greptime_timestamp" gorm:"column:greptime_timestamp;index:,class:TIME;precision:ms"` } func (sl TFSystemLog) TableName() string { From c77c2717e518bc43ea3fc980c9be030c19f7f006 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Tue, 17 Jun 2025 17:10:18 +0800 Subject: [PATCH 2/4] feat: refactor to k8s scheduler framework --- cmd/main.go | 18 +- cmd/sched/setup.go | 124 +++++++++++++ go.mod | 80 ++++++++- go.sum | 133 +++++++++++--- internal/constants/constants.go | 3 + internal/scheduler/sched.go | 59 +++++++ internal/scheduler/sched_test.go | 294 +++++++++++++++++++++++++++++++ 7 files changed, 677 insertions(+), 34 deletions(-) create mode 100644 cmd/sched/setup.go create mode 100644 internal/scheduler/sched.go create mode 100644 internal/scheduler/sched_test.go diff --git a/cmd/main.go b/cmd/main.go index 5528120..168ce22 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/kubernetes/cmd/kube-scheduler/app" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -43,12 +44,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" tfv1 "github.com/NexusGPU/tensor-fusion/api/v1" + "github.com/NexusGPU/tensor-fusion/cmd/sched" "github.com/NexusGPU/tensor-fusion/internal/alert" "github.com/NexusGPU/tensor-fusion/internal/config" + "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/controller" "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" "github.com/NexusGPU/tensor-fusion/internal/metrics" "github.com/NexusGPU/tensor-fusion/internal/portallocator" + "github.com/NexusGPU/tensor-fusion/internal/scheduler" "github.com/NexusGPU/tensor-fusion/internal/server" "github.com/NexusGPU/tensor-fusion/internal/server/router" "github.com/NexusGPU/tensor-fusion/internal/utils" @@ -244,14 +248,24 @@ func main() { os.Exit(1) } - // nolint:goconst - if os.Getenv("ENABLE_WEBHOOKS") != "false" { + if os.Getenv(constants.EnableWebhookEnv) != "false" { if err = webhookcorev1.SetupPodWebhookWithManager(mgr, portAllocator); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "Pod") os.Exit(1) } } + if os.Getenv(constants.EnableSchedulerEnv) != "false" { + + pluginOpt := app.WithPlugin(scheduler.Name, scheduler.New) + cc, scheduler, err := sched.SetupScheduler(ctx, pluginOpt) + if err != nil { + setupLog.Error(err, "unable to create tensor fusion scheduler") + os.Exit(1) + } + sched.RunScheduler(ctx, cc, scheduler) + } + if err = (&controller.TensorFusionClusterReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), diff --git a/cmd/sched/setup.go b/cmd/sched/setup.go new file mode 100644 index 0000000..bde10be --- /dev/null +++ b/cmd/sched/setup.go @@ -0,0 +1,124 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package sched + +import ( + "context" + "fmt" + "os" + + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/events" + "k8s.io/component-base/configz" + utilversion "k8s.io/component-base/version" + "k8s.io/klog/v2" + "k8s.io/kubernetes/cmd/kube-scheduler/app" + schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" + "k8s.io/kubernetes/cmd/kube-scheduler/app/options" + "k8s.io/kubernetes/pkg/scheduler" + kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/apis/config/latest" + "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + "k8s.io/kubernetes/pkg/scheduler/profile" +) + +func SetupScheduler(ctx context.Context, outOfTreeRegistryOptions ...app.Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) { + opts := options.NewOptions() + + if cfg, err := latest.Default(); err != nil { + return nil, nil, err + } else { + opts.ComponentConfig = cfg + } + + if errs := opts.Validate(); len(errs) > 0 { + return nil, nil, utilerrors.NewAggregate(errs) + } + + c, err := opts.Config(ctx) + if err != nil { + return nil, nil, err + } + + // Get the completed config + cc := c.Complete() + + outOfTreeRegistry := make(runtime.Registry) + for _, option := range outOfTreeRegistryOptions { + if err := option(outOfTreeRegistry); err != nil { + return nil, nil, err + } + } + + recorderFactory := getRecorderFactory(&cc) + completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0) + // Create the scheduler. + sched, err := scheduler.New(ctx, + cc.Client, + cc.InformerFactory, + cc.DynInformerFactory, + recorderFactory, + scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion), + scheduler.WithKubeConfig(cc.KubeConfig), + scheduler.WithProfiles(cc.ComponentConfig.Profiles...), + scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), + scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), + scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds), + scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds), + scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration), + scheduler.WithExtenders(cc.ComponentConfig.Extenders...), + scheduler.WithParallelism(cc.ComponentConfig.Parallelism), + scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) { + // Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging + completedProfiles = append(completedProfiles, profile) + }), + ) + if err != nil { + return nil, nil, err + } + if err := options.LogOrWriteConfig(klog.FromContext(ctx), opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil { + return nil, nil, err + } + + return &cc, sched, nil +} + +func RunScheduler(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error { + logger := klog.FromContext(ctx) + + // To help debugging, immediately log version + logger.Info("Starting Kubernetes Scheduler", "version", utilversion.Get()) + + logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK")) + + // Configz registration. + if cz, err := configz.New("componentconfig"); err != nil { + return fmt.Errorf("unable to register configz: %s", err) + } else { + cz.Set(cc.ComponentConfig) + } + + // Start events processing pipeline. + cc.EventBroadcaster.StartRecordingToSink(ctx.Done()) + defer cc.EventBroadcaster.Shutdown() + return nil +} + +func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory { + return func(name string) events.EventRecorder { + return cc.EventBroadcaster.NewRecorder(name) + } +} diff --git a/go.mod b/go.mod index 25d9ea8..41131e0 100644 --- a/go.mod +++ b/go.mod @@ -26,16 +26,23 @@ require ( k8s.io/api v0.33.1 k8s.io/apimachinery v0.33.1 k8s.io/client-go v0.33.1 + k8s.io/component-base v0.33.1 k8s.io/component-helpers v0.33.1 + k8s.io/klog/v2 v2.130.1 + k8s.io/kubernetes v1.32.5 k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 - sigs.k8s.io/controller-runtime v0.21.0 + sigs.k8s.io/controller-runtime v0.20.4 + sigs.k8s.io/scheduler-plugins v0.31.8 sigs.k8s.io/yaml v1.4.0 ) require ( - cel.dev/expr v0.19.1 // indirect + cel.dev/expr v0.23.1 // indirect filippo.io/edwards25519 v1.1.0 // indirect + github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect + github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/antlr4-go/antlr/v4 v4.13.1 // indirect + github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect @@ -47,7 +54,10 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.5 // indirect + github.com/coreos/go-semver v0.3.1 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/distribution/reference v0.6.0 // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -69,12 +79,16 @@ require ( github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.3 // indirect - github.com/google/cel-go v0.23.2 // indirect + github.com/google/cel-go v0.22.0 // indirect github.com/google/gnostic-models v0.6.9 // indirect github.com/google/go-cmp v0.7.0 // indirect + github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect @@ -82,13 +96,17 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.1.2 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/mailru/easyjson v0.9.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/moby/term v0.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -104,7 +122,11 @@ require ( github.com/ugorji/go/codec v1.2.12 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.etcd.io/etcd/api/v3 v3.5.21 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.21 // indirect + go.etcd.io/etcd/client/v3 v3.5.21 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect go.opentelemetry.io/otel v1.33.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect @@ -136,12 +158,56 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.33.0 // indirect - k8s.io/apiserver v0.33.0 // indirect - k8s.io/component-base v0.33.0 // indirect - k8s.io/klog/v2 v2.130.1 // indirect - k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect + k8s.io/apiserver v0.32.5 // indirect + k8s.io/cloud-provider v0.33.1 // indirect + k8s.io/controller-manager v0.33.1 // indirect + k8s.io/csi-translation-lib v0.33.1 // indirect + k8s.io/dynamic-resource-allocation v0.33.1 // indirect + k8s.io/kms v0.33.1 // indirect + k8s.io/kube-openapi v0.0.0-20250610211856-8b98d1ed966a // indirect + k8s.io/kube-scheduler v0.31.8 // indirect + k8s.io/kubelet v0.33.1 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect ) + +replace ( + k8s.io/api => k8s.io/api v0.32.5 + k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.32.5 + k8s.io/apimachinery => k8s.io/apimachinery v0.32.5 + k8s.io/apiserver => k8s.io/apiserver v0.32.5 + k8s.io/cli-runtime => k8s.io/cli-runtime v0.32.5 + k8s.io/client-go => k8s.io/client-go v0.32.5 + k8s.io/cloud-provider => k8s.io/cloud-provider v0.32.5 + k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.32.5 + k8s.io/code-generator => k8s.io/code-generator v0.32.5 + k8s.io/component-base => k8s.io/component-base v0.32.5 + k8s.io/component-helpers => k8s.io/component-helpers v0.32.5 + k8s.io/controller-manager => k8s.io/controller-manager v0.32.5 + k8s.io/cri-api => k8s.io/cri-api v0.32.5 + k8s.io/cri-client => k8s.io/cri-client v0.32.5 + k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.32.5 + k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.32.5 + k8s.io/endpointslice => k8s.io/endpointslice v0.32.5 + k8s.io/kms => k8s.io/kms v0.32.5 + k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.32.5 + k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.32.5 + k8s.io/kube-proxy => k8s.io/kube-proxy v0.32.5 + k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.32.5 + k8s.io/kubectl => k8s.io/kubectl v0.32.5 + k8s.io/kubelet => k8s.io/kubelet v0.32.5 + k8s.io/kubernetes => k8s.io/kubernetes v1.32.5 + k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.31.8 + k8s.io/metrics => k8s.io/metrics v0.32.5 + k8s.io/mount-utils => k8s.io/mount-utils v0.32.5 + k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.32.5 + k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.32.5 +) + +replace k8s.io/externaljwt => k8s.io/externaljwt v0.32.5 + +replace k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.32.5 + +replace k8s.io/sample-controller => k8s.io/sample-controller v0.32.5 diff --git a/go.sum b/go.sum index dad7be6..72f583e 100644 --- a/go.sum +++ b/go.sum @@ -1,21 +1,27 @@ al.essio.dev/pkg/shellescape v1.6.0 h1:NxFcEqzFSEVCGN2yq7Huv/9hyCEGVa/TncnOOBBeXHA= al.essio.dev/pkg/shellescape v1.6.0/go.mod h1:6sIqp7X2P6mThCQ7twERpZTuigpr6KbZWtls1U8I890= -cel.dev/expr v0.19.1 h1:NciYrtDRIR0lNCnH1LFJegdjspNx9fI59O7TWcua/W4= -cel.dev/expr v0.19.1/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= +cel.dev/expr v0.23.1 h1:K4KOtPCJQjVggkARsjG9RWXP6O4R73aHeJMa/dmCQQg= +cel.dev/expr v0.23.1/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/NVIDIA/go-nvml v0.12.9-0 h1:e344UK8ZkeMeeLkdQtRhmXRxNf+u532LDZPGMtkdus0= github.com/NVIDIA/go-nvml v0.12.9-0/go.mod h1:+KNA7c7gIBH7SKSJ1ntlwkfN80zdx8ovl4hrK3LmPt4= +github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= +github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/aliyun/alibaba-cloud-sdk-go v1.63.107 h1:qagvUyrgOnBIlVRQWOyCZGVKUIYbMBdGdJ104vBpRFU= github.com/aliyun/alibaba-cloud-sdk-go v1.63.107/go.mod h1:SOSDHfe1kX91v3W5QiBsWSLqeLxImobbMX1mxrFHsVQ= github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= +github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= +github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go-v2 v1.36.4 h1:GySzjhVvx0ERP6eyfAbAuAXLtAda5TEy19E5q5W8I9E= github.com/aws/aws-sdk-go-v2 v1.36.4/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 h1:o1v1VFfPcDVlK3ll1L5xHsaQAFdNtZ5GXnNR7SwueC4= @@ -46,16 +52,26 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= +github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k= -github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= +github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= +github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU= github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= @@ -107,16 +123,21 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= +github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= +github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= -github.com/google/cel-go v0.23.2 h1:UdEe3CvQh3Nv+E/j9r1Y//WO0K0cSyD7/y0bzyLIMI4= -github.com/google/cel-go v0.23.2/go.mod h1:52Pb6QsDbC5kvgxvZhiL9QX1oZEkcUF/ZqaPx1J5Wwo= +github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g= +github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8= +github.com/google/cel-go v0.25.0 h1:jsFw9Fhn+3y2kBbltZR4VEz5xKkcIFRPDnuEzAGv5GY= +github.com/google/cel-go v0.25.0/go.mod h1:hjEb6r5SuOSlhCHmFoLzu8HGCERvIsDAbxDAyNU/MmI= github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw= github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -134,6 +155,14 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= +github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= +github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -153,11 +182,15 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= +github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= +github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.1.2 h1:uAwqOtyrFYggq3pVf3hs1XKkBxrQ8dkgjWz3LCLJsiY= +github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.1.2/go.mod h1:LBzS4n6GX1C69tzSd5EibZ9cGOXFuHP7GxEMDYVe1sM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= @@ -184,6 +217,8 @@ github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4 github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= +github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -196,6 +231,8 @@ github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8= github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y= github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b h1:FfH+VrHHk6Lxt9HdVS0PXzSXFyS2NbZKXv33FYPol0A= github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b/go.mod h1:AC62GU6hc0BrNm+9RK9VSiwa/EUe1bkIeFORAMcHvJU= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= @@ -222,6 +259,10 @@ github.com/samber/lo v1.51.0 h1:kysRYLbHy/MB7kQZf5DSN50JHmMsNEdeY24VzJFu7wI= github.com/samber/lo v1.51.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= +github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -231,7 +272,6 @@ github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8w github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -241,6 +281,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE= +github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= @@ -251,12 +293,32 @@ github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65E github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 h1:S2dVYn90KE98chqDkyE9Z4N61UnQd+KOfgp5Iu53llk= +github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= +go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= +go.etcd.io/etcd/api/v3 v3.5.21 h1:A6O2/JDb3tvHhiIz3xf9nJ7REHvtEFJJ3veW3FbCnS8= +go.etcd.io/etcd/api/v3 v3.5.21/go.mod h1:c3aH5wcvXv/9dqIw2Y810LDXJfhSYdHQ0vxmP3CCHVY= +go.etcd.io/etcd/client/pkg/v3 v3.5.21 h1:lPBu71Y7osQmzlflM9OfeIV2JlmpBjqBNlLtcoBqUTc= +go.etcd.io/etcd/client/pkg/v3 v3.5.21/go.mod h1:BgqT/IXPjK9NkeSDjbzwsHySX3yIle2+ndz28nVsjUs= +go.etcd.io/etcd/client/v2 v2.305.16 h1:kQrn9o5czVNaukf2A2At43cE9ZtWauOtf9vRZuiKXow= +go.etcd.io/etcd/client/v2 v2.305.16/go.mod h1:h9YxWCzcdvZENbfzBTFCnoNumr2ax3F19sKMqHFmXHE= +go.etcd.io/etcd/client/v3 v3.5.21 h1:T6b1Ow6fNjOLOtM0xSoKNQt1ASPCLWrF9XMHcH9pEyY= +go.etcd.io/etcd/client/v3 v3.5.21/go.mod h1:mFYy67IOqmbRf/kRUvsHixzo3iG+1OF2W2+jVIQRAnU= +go.etcd.io/etcd/pkg/v3 v3.5.16 h1:cnavs5WSPWeK4TYwPYfmcr3Joz9BH+TZ6qoUtz6/+mc= +go.etcd.io/etcd/pkg/v3 v3.5.16/go.mod h1:+lutCZHG5MBBFI/U4eYT5yL7sJfnexsoM20Y0t2uNuY= +go.etcd.io/etcd/raft/v3 v3.5.16 h1:zBXA3ZUpYs1AwiLGPafYAKKl/CORn/uaxYDwlNwndAk= +go.etcd.io/etcd/raft/v3 v3.5.16/go.mod h1:P4UP14AxofMJ/54boWilabqqWoW9eLodl6I5GdGzazI= +go.etcd.io/etcd/server/v3 v3.5.16 h1:d0/SAdJ3vVsZvF8IFVb1k8zqMZ+heGcNfft71ul9GWE= +go.etcd.io/etcd/server/v3 v3.5.16/go.mod h1:ynhyZZpdDp1Gq49jkUg5mfkDWZwXnn3eIqCqtJnrD/s= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 h1:PS8wXpbyaDJQ2VDHHncMe9Vct0Zn1fEjpsjrLxGJoSc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0/go.mod h1:HDBUsEjOuRC0EzKZ1bSaRGZWUBAzo+MhAcUUORSr4D0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q= go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= @@ -325,6 +387,7 @@ golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= @@ -355,6 +418,8 @@ gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJ gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= +google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/bQXnr/ClcEMJ968gUXJQ9pwfSynuQ= +google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= google.golang.org/genproto/googleapis/api v0.0.0-20241223144023-3abc09e42ca8 h1:st3LcW/BPi75W4q1jJTEor/QWwbNlPlDG0JTn6XhZu0= google.golang.org/genproto/googleapis/api v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:klhJGKFyG8Tn50enBn7gizg4nXGXJ+jqEREdCWaPcV4= google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A= @@ -387,37 +452,55 @@ gorm.io/driver/mysql v1.6.0 h1:eNbLmNTpPpTOVZi8MMxCi2aaIm0ZpInbORNXDwyLGvg= gorm.io/driver/mysql v1.6.0/go.mod h1:D/oCC2GWK3M/dqoLxnOlaNKmXz8WNTfcS9y5ovaSqKo= gorm.io/gorm v1.30.0 h1:qbT5aPv1UH8gI99OsRlvDToLxW5zR7FzS9acZDOZcgs= gorm.io/gorm v1.30.0/go.mod h1:8Z33v652h4//uMA76KjeDH8mJXPm1QNCYrMeatR0DOE= -k8s.io/api v0.33.1 h1:tA6Cf3bHnLIrUK4IqEgb2v++/GYUtqiu9sRVk3iBXyw= -k8s.io/api v0.33.1/go.mod h1:87esjTn9DRSRTD4fWMXamiXxJhpOIREjWOSjsW1kEHw= -k8s.io/apiextensions-apiserver v0.33.0 h1:d2qpYL7Mngbsc1taA4IjJPRJ9ilnsXIrndH+r9IimOs= -k8s.io/apiextensions-apiserver v0.33.0/go.mod h1:VeJ8u9dEEN+tbETo+lFkwaaZPg6uFKLGj5vyNEwwSzc= -k8s.io/apimachinery v0.33.1 h1:mzqXWV8tW9Rw4VeW9rEkqvnxj59k1ezDUl20tFK/oM4= -k8s.io/apimachinery v0.33.1/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM= -k8s.io/apiserver v0.33.0 h1:QqcM6c+qEEjkOODHppFXRiw/cE2zP85704YrQ9YaBbc= -k8s.io/apiserver v0.33.0/go.mod h1:EixYOit0YTxt8zrO2kBU7ixAtxFce9gKGq367nFmqI8= -k8s.io/client-go v0.33.1 h1:ZZV/Ks2g92cyxWkRRnfUDsnhNn28eFpt26aGc8KbXF4= -k8s.io/client-go v0.33.1/go.mod h1:JAsUrl1ArO7uRVFWfcj6kOomSlCv+JpvIsp6usAGefA= -k8s.io/component-base v0.33.0 h1:Ot4PyJI+0JAD9covDhwLp9UNkUja209OzsJ4FzScBNk= -k8s.io/component-base v0.33.0/go.mod h1:aXYZLbw3kihdkOPMDhWbjGCO6sg+luw554KP51t8qCU= -k8s.io/component-helpers v0.33.1 h1:DdQMww8jOr+sGhIrkz70Lp9Qerq/JzeZDBRd508DHDo= -k8s.io/component-helpers v0.33.1/go.mod h1:LQwxW5L3dH7341Unj+phndJu0Ic5UjxA//7FT8YVP5U= +k8s.io/api v0.32.5 h1:uqjjsYo1kTJr5NIcoIaP9F+TgXgADH7nKQx91FDAhtk= +k8s.io/api v0.32.5/go.mod h1:bXXFU3fGCZ/eFMZvfHZC69PeGbXEL4zzjuPVzOxHF64= +k8s.io/apiextensions-apiserver v0.32.5 h1:o0aKvmzIIs8Uk54pidk32pxET+Pg2ULnh9WI1PuKTwE= +k8s.io/apiextensions-apiserver v0.32.5/go.mod h1:5fpedJa3HJJFBukAZ6ur91DEDye5gYuXISPbOiNLYpU= +k8s.io/apimachinery v0.32.5 h1:6We3aJ6crC0ap8EhsEXcgX3LpI6SEjubpiOMXLROwPM= +k8s.io/apimachinery v0.32.5/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= +k8s.io/apiserver v0.32.5 h1:phmm2EOUVFI+cLiq8Grtuh166fTt/qgvkGPkpgzp5uY= +k8s.io/apiserver v0.32.5/go.mod h1:5bfueS1tgARVWVXRJBMI5mHoCmev0jOvbxebai/kiqc= +k8s.io/client-go v0.32.5 h1:huFmQMzgWu0z4kbWsuZci+Gt4Fo72I4CcrvhToZ/Qp0= +k8s.io/client-go v0.32.5/go.mod h1:Qchw6f9WIVrur7DKojAHpRgGLcANT0RLIvF39Jz58xA= +k8s.io/cloud-provider v0.32.5 h1:KzO0mpXYArWxQH91+a4WLLrhTaO5RGWmQn4lzUXY6ak= +k8s.io/cloud-provider v0.32.5/go.mod h1:nMd0wwOlOIkl98HgOl960Uf+swbp6FWF/240K3/gkWQ= +k8s.io/component-base v0.32.5 h1:2HiX+m3s9Iz5CMqdCVDH2V942UqzQvjuhcXb4W+KCsg= +k8s.io/component-base v0.32.5/go.mod h1:jDsPNFFElv9m27TcYxlpEX7TZ3vdgx2g4PaqMUHpV/Y= +k8s.io/component-helpers v0.32.5 h1:CfUO2BUlAUwddmPw4T7vcqdO992a8WGQW53Ldl5K9vg= +k8s.io/component-helpers v0.32.5/go.mod h1:YRmZMXae6PC7ywPMhrnfPDQUMOaqE8re98CnyTqUcwE= +k8s.io/controller-manager v0.32.5 h1:XeFdbhnpvSMr4WI1xASgYj4Eqt9OTcPh4lmJV88NGAk= +k8s.io/controller-manager v0.32.5/go.mod h1:NDWmzWlHAUBLDwtavRsF5O48ZGuLJezT8m82ehI7s+Y= +k8s.io/csi-translation-lib v0.32.5 h1:LO6yj5HqgEftil7PPq0/YLtyA9x+3WNlp7I0W/UkYbc= +k8s.io/csi-translation-lib v0.32.5/go.mod h1:7YI8bhbV4qphv2s/ANeEgoUzW1qec4b8leTMujAtTVk= +k8s.io/dynamic-resource-allocation v0.32.5 h1:MrlfF5bfOvRhNLhnqUu+6ZQg2z77jfENT72p4sVTQuc= +k8s.io/dynamic-resource-allocation v0.32.5/go.mod h1:kKYIB7UMRvZ4Ykb80+mSxXpPxokKVY5bqlFM8vQ1ayk= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4= -k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8= +k8s.io/kms v0.32.5 h1:wnvdGfSniCyrXa3ukM7hTNlZPO05U8IJHa2scqYRYm4= +k8s.io/kms v0.32.5/go.mod h1:Bk2evz/Yvk0oVrvm4MvZbgq8BD34Ksxs2SRHn4/UiOM= +k8s.io/kube-openapi v0.0.0-20250610211856-8b98d1ed966a h1:ZV3Zr+/7s7aVbjNGICQt+ppKWsF1tehxggNfbM7XnG8= +k8s.io/kube-openapi v0.0.0-20250610211856-8b98d1ed966a/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8= +k8s.io/kube-scheduler v0.32.5 h1:sPlell9o0IAD12PanESrWTUcZeUaz9lQMBzNZzZ66/8= +k8s.io/kube-scheduler v0.32.5/go.mod h1:jTl+IZEujDywfLvcyHC6tOVQVz8vRct/vtoqikqsBms= +k8s.io/kubelet v0.32.5 h1:PkD/XAO3KArZIi/B+cHEUBd6VfjHxHK/KYaThEQ3n8E= +k8s.io/kubelet v0.32.5/go.mod h1:5DlJlmv/PDspCFcMUQ+r5muJtg/jgxFr6Kqd28WXfIA= +k8s.io/kubernetes v1.32.5 h1:PmYLXqBh09l3ez5xA8OqzVTYmjsF12+b8DSQel6phtQ= +k8s.io/kubernetes v1.32.5/go.mod h1:GvhiBeolvSRzBpFlgM0z/Bbu3Oxs9w3P6XfEgYaMi8k= k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 h1:jgJW5IePPXLGB8e/1wvd0Ich9QE97RvvF3a8J3fP/Lg= k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= -sigs.k8s.io/controller-runtime v0.21.0 h1:CYfjpEuicjUecRk+KAeyYh+ouUBn4llGyDYytIGcJS8= -sigs.k8s.io/controller-runtime v0.21.0/go.mod h1:OSg14+F65eWqIu4DceX7k/+QRAbTTvxeQSNSOQpukWM= +sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= +sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/randfill v0.0.0-20250304075658-069ef1bbf016/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= +sigs.k8s.io/scheduler-plugins v0.31.8 h1:Ie2EFRnkE9T2tBjxwypww7hJJyPRIwrXJNZeNxjP6QY= +sigs.k8s.io/scheduler-plugins v0.31.8/go.mod h1:KkcXEbf9CYaoZ5ntbAMSYmquPq9MtSfXVpI31R6mHeM= sigs.k8s.io/structured-merge-diff/v4 v4.6.0 h1:IUA9nvMmnKWcj5jl84xn+T5MnlZKThmUW1TdblaLVAc= sigs.k8s.io/structured-merge-diff/v4 v4.6.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 3d72be1..f7e39b0 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -98,6 +98,9 @@ const ( QoSLevelMedium = "medium" QoSLevelHigh = "high" QoSLevelCritical = "critical" + + EnableWebhookEnv = "ENABLE_WEBHOOKS" + EnableSchedulerEnv = "ENABLE_SCHEDULER" ) const ( diff --git a/internal/scheduler/sched.go b/internal/scheduler/sched.go new file mode 100644 index 0000000..f5e3232 --- /dev/null +++ b/internal/scheduler/sched.go @@ -0,0 +1,59 @@ +package scheduler + +import ( + "context" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" + "k8s.io/kubernetes/pkg/scheduler" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +const Name = "tf-scheduler" + +var _ framework.PreFilterPlugin = &TensorFusionScheduling{} +var _ framework.ReservePlugin = &TensorFusionScheduling{} + +type TensorFusionScheduling struct { + logger *klog.Logger + fh framework.Handle + podLister cache.Indexer + pdbLister cache.Indexer + client *rest.RESTClient +} + +func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + lh := klog.FromContext(ctx).WithValues("plugin", Name) + c := &TensorFusionScheduling{ + logger: &lh, + fh: handle, + } + return c, nil +} + +func (s *TensorFusionScheduling) Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) { + +} + +func (s *TensorFusionScheduling) Name() string { + return Name +} + +func (s *TensorFusionScheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + return nil, nil +} + +func (s *TensorFusionScheduling) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + +func (s *TensorFusionScheduling) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + return nil +} + +func (s *TensorFusionScheduling) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { +} diff --git a/internal/scheduler/sched_test.go b/internal/scheduler/sched_test.go new file mode 100644 index 0000000..15b1328 --- /dev/null +++ b/internal/scheduler/sched_test.go @@ -0,0 +1,294 @@ +package scheduler + +import ( + "context" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" + plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + st "k8s.io/kubernetes/pkg/scheduler/testing" + tf "k8s.io/kubernetes/pkg/scheduler/testing/framework" + imageutils "k8s.io/kubernetes/test/utils/image" + + tfv1 "github.com/NexusGPU/tensor-fusion/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" + testutil "sigs.k8s.io/scheduler-plugins/test/util" +) + +const ResourceGPU v1.ResourceName = "nvidia.com/gpu" + +var ( + midPriority, highPriority = int32(100), int32(1000) +) + +func TestPreFilter(t *testing.T) { + type podInfo struct { + podName string + podNamespace string + memReq int64 + } + + tests := []struct { + name string + podInfos []podInfo + gpus []tfv1.GPU + expected []framework.Code + }{ + { + name: "find gpu", + podInfos: []podInfo{ + {podName: "ns1-p1", podNamespace: "ns1", memReq: 500}, + {podName: "ns1-p2", podNamespace: "ns1", memReq: 1800}, + }, + gpus: []tfv1.GPU{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "gpu-1", + }, + Status: tfv1.GPUStatus{ + Capacity: &tfv1.Resource{ + Tflops: resource.MustParse("10"), + Vram: resource.MustParse("20Gi"), + }, + }, + }, + }, + expected: []framework.Code{ + framework.Success, + framework.Unschedulable, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var registerPlugins []tf.RegisterPluginFunc + registeredPlugins := append( + registerPlugins, + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ) + + fwk, err := tf.NewFramework( + ctx, registeredPlugins, "", + frameworkruntime.WithPodNominator(testutil.NewPodNominator(nil)), + frameworkruntime.WithSnapshotSharedLister(testutil.NewFakeSharedLister(make([]*v1.Pod, 0), make([]*v1.Node, 0))), + ) + + if err != nil { + t.Fatal(err) + } + + // TODO add gpu store + cs := &TensorFusionScheduling{ + fh: fwk, + } + + pods := make([]*v1.Pod, 0) + + // for _, _ := range tt.gpus { + // // TODO make gpu + // } + for _, podInfo := range tt.podInfos { + pod := makePod(podInfo.podName, podInfo.podNamespace, podInfo.memReq, 0, 0, 0, podInfo.podName, "") + pods = append(pods, pod) + } + + state := framework.NewCycleState() + for i := range pods { + if _, got := cs.PreFilter(context.TODO(), state, pods[i]); got.Code() != tt.expected[i] { + t.Errorf("expected %v, got %v : %v", tt.expected[i], got.Code(), got.Message()) + } + } + }) + } +} + +func TestReserve(t *testing.T) { + tests := []struct { + name string + pods []*v1.Pod + expectedCodes []framework.Code + expected []tfv1.GPUStatus + }{ + { + name: "Reserve pods", + pods: []*v1.Pod{ + makePod("t1-p1", "ns1", 50, 0, 0, midPriority, "t1-p1", "node-a"), + makePod("t1-p2", "ns2", 50, 0, 0, midPriority, "t1-p2", "node-a"), + }, + expectedCodes: []framework.Code{ + framework.Success, + framework.Success, + }, + expected: []tfv1.GPUStatus{ + { + Capacity: &tfv1.Resource{ + Tflops: resource.MustParse("1000"), + Vram: resource.MustParse("20Gi"), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var registerPlugins []tf.RegisterPluginFunc + registeredPlugins := append( + registerPlugins, + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ) + + fwk, err := tf.NewFramework( + ctx, registeredPlugins, "", + frameworkruntime.WithPodNominator(testutil.NewPodNominator(nil)), + frameworkruntime.WithSnapshotSharedLister(testutil.NewFakeSharedLister(make([]*v1.Pod, 0), make([]*v1.Node, 0))), + ) + + if err != nil { + t.Fatal(err) + } + + cs := &TensorFusionScheduling{ + fh: fwk, + } + + state := framework.NewCycleState() + for i, pod := range tt.pods { + got := cs.Reserve(context.TODO(), state, pod, "node-a") + if got.Code() != tt.expectedCodes[i] { + t.Errorf("expected %v, got %v : %v", tt.expected[i], got.Code(), got.Message()) + } + // TODO assert + } + }) + } +} + +func TestUnreserve(t *testing.T) { + tests := []struct { + name string + pods []*v1.Pod + expected []tfv1.GPUStatus + }{ + { + name: "Unreserve pods", + pods: []*v1.Pod{ + makePod("t1-p1", "ns1", 50, 0, 0, midPriority, "t1-p1", "node-a"), + makePod("t1-p2", "ns2", 50, 0, 0, midPriority, "t1-p2", "node-a"), + makePod("t1-p3", "ns1", 50, 0, 0, midPriority, "t1-p3", "node-a"), + }, + expected: []tfv1.GPUStatus{ + { + Capacity: &tfv1.Resource{ + Tflops: resource.MustParse("100"), + Vram: resource.MustParse("20Gi"), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var registerPlugins []tf.RegisterPluginFunc + registeredPlugins := append( + registerPlugins, + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ) + + fwk, err := tf.NewFramework( + ctx, registeredPlugins, "", + frameworkruntime.WithPodNominator(testutil.NewPodNominator(nil)), + frameworkruntime.WithSnapshotSharedLister(testutil.NewFakeSharedLister(make([]*v1.Pod, 0), make([]*v1.Node, 0))), + ) + + if err != nil { + t.Fatal(err) + } + + cs := &TensorFusionScheduling{ + fh: fwk, + } + + state := framework.NewCycleState() + for _, pod := range tt.pods { + cs.Unreserve(context.TODO(), state, pod, "node-a") + // TODO asserts + } + }) + } +} + +func makeUnschedulableNodeStatusReader() *framework.NodeToStatus { + nodeStatusReader := framework.NewDefaultNodeToStatus() + nodeStatusReader.Set("node-a", framework.NewStatus(framework.Unschedulable)) + return nodeStatusReader +} + +func makePod(podName string, namespace string, memReq int64, cpuReq int64, gpuReq int64, priority int32, uid string, nodeName string) *v1.Pod { + pause := imageutils.GetPauseImageName() + pod := st.MakePod().Namespace(namespace).Name(podName).Container(pause). + Priority(priority).Node(nodeName).UID(uid).ZeroTerminationGracePeriod().Obj() + pod.Spec.Containers[0].Resources = v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: *resource.NewQuantity(memReq, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(cpuReq, resource.DecimalSI), + ResourceGPU: *resource.NewQuantity(gpuReq, resource.DecimalSI), + }, + } + return pod +} + +func makePodWithStatus(pod *v1.Pod, podPhase v1.PodPhase) *v1.Pod { + pod.Status.Phase = podPhase + return pod +} + +func makeEQ(namespace, name string, max, min v1.ResourceList) *v1alpha1.ElasticQuota { + eq := &v1alpha1.ElasticQuota{ + TypeMeta: metav1.TypeMeta{Kind: "ElasticQuota", APIVersion: "scheduling.sigs.k8s.io/v1alpha1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + eq.Spec.Max = max + eq.Spec.Min = min + return eq +} + +func makeResourceList(cpu, mem int64) v1.ResourceList { + return v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(cpu, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(mem, resource.BinarySI), + } +} + +func makeRegisteredPlugin() []tf.RegisterPluginFunc { + registeredPlugins := []tf.RegisterPluginFunc{ + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + tf.RegisterPluginAsExtensions(noderesources.Name, func(ctx context.Context, plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) { + return noderesources.NewFit(ctx, plArgs, fh, plfeature.Features{}) + }, "Filter", "PreFilter"), + } + return registeredPlugins +} From 81c69e609bcd68aa28f060d28386e3df830da18f Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Wed, 18 Jun 2025 22:21:05 +0800 Subject: [PATCH 3/4] fix: add scheduler framework and basic tests, align log to klog --- .gitignore | 4 +- .vscode/launch.json | 5 +- .vscode/settings.json | 20 +++++ cmd/main.go | 38 ++++++--- cmd/sched/setup.go | 70 ++++++++++++---- config/samples/scheduler-config.yaml | 47 +++++++++++ internal/metrics/connect.go | 3 + .../scheduler/gpuresources/gpuresources.go | 79 +++++++++++++++++++ .../gpuresources_test.go} | 12 +-- .../scheduler/gputopo/gpu_network_topo.go | 65 +++++++++++++++ internal/scheduler/sched.go | 59 -------------- internal/worker/worker.go | 1 + tf-scheduler/scheduler-plugins | 1 + 13 files changed, 311 insertions(+), 93 deletions(-) create mode 100644 config/samples/scheduler-config.yaml create mode 100644 internal/scheduler/gpuresources/gpuresources.go rename internal/scheduler/{sched_test.go => gpuresources/gpuresources_test.go} (97%) create mode 100644 internal/scheduler/gputopo/gpu_network_topo.go delete mode 100644 internal/scheduler/sched.go create mode 160000 tf-scheduler/scheduler-plugins diff --git a/.gitignore b/.gitignore index d178391..dfb2850 100644 --- a/.gitignore +++ b/.gitignore @@ -33,4 +33,6 @@ cmd/*/__debug* prompts/* -tmp* \ No newline at end of file +tmp* + +__debug* \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 5f16f34..1d9a139 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -15,8 +15,9 @@ "program": "${workspaceFolder}/cmd/main.go", "args": [ "--gpu-info-config", "${workspaceFolder}/config/samples/gpu-info-config.yaml", - "--alert-rule-config", "${workspaceFolder}/config/samples/dynamic-config.yaml", - "--enable-alert", "true" + "--dynamic-config", "${workspaceFolder}/config/samples/dynamic-config.yaml", + "--scheduler-config", "${workspaceFolder}/config/samples/scheduler-config.yaml", + // "--enable-alert" ] }, { diff --git a/.vscode/settings.json b/.vscode/settings.json index c2b6c50..95e64a2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -7,6 +7,8 @@ "AMDCDNA", "AMDRDNA", "apimachinery", + "apiruntime", + "apiutil", "automount", "AWSGPU", "batchv", @@ -19,17 +21,22 @@ "cloudnative", "cloudprovider", "clusterissuers", + "componentconfig", + "configz", "controllerutil", "corev", "crds", "CUDA", "cycjimmy", + "datanode", + "defaultbinder", "dylib", "envtest", "essd", "Eventf", "finalizer", "Finalizers", + "frameworkruntime", "FULLTEXT", "goconst", "gocyclo", @@ -44,21 +51,30 @@ "gpunodes", "gpupool", "gpupools", + "gpuresources", "GPUT", + "gputopo", "GPUVRAM", "greptime", "greptimedb", "healthz", "iface", + "imageutils", "karpenter", + "klog", + "Klogr", "kubebuilder", "KUBECONFIG", "Kubelet", + "kubescheduler", + "kubeschedulerconfig", "kustomization", "metav", "metricsserver", + "Milli", "mito", "nindent", + "noderesources", "nolint", "NVML", "omitempty", @@ -69,12 +85,15 @@ "prometheusagents", "prometheuses", "prometheusrules", + "queuesort", "RDNA", "readyz", "replicaset", "runbook", "runpod", "samber", + "sched", + "schedulerserverconfig", "schedulingconfigtemplate", "schedulingconfigtemplates", "schedulingcorev", @@ -88,6 +107,7 @@ "tensorfusionclusters", "tensorfusionworkload", "Tera", + "testutil", "tflops", "timberio", "Tmpl", diff --git a/cmd/main.go b/cmd/main.go index 168ce22..c08478a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -29,6 +29,7 @@ import ( // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -37,7 +38,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" - "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -52,7 +52,8 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" "github.com/NexusGPU/tensor-fusion/internal/metrics" "github.com/NexusGPU/tensor-fusion/internal/portallocator" - "github.com/NexusGPU/tensor-fusion/internal/scheduler" + gpuResourceFitPlugin "github.com/NexusGPU/tensor-fusion/internal/scheduler/gpuresources" + gpuTopoPlugin "github.com/NexusGPU/tensor-fusion/internal/scheduler/gputopo" "github.com/NexusGPU/tensor-fusion/internal/server" "github.com/NexusGPU/tensor-fusion/internal/server/router" "github.com/NexusGPU/tensor-fusion/internal/utils" @@ -87,6 +88,7 @@ var timeSeriesDB *metrics.TimeSeriesDB var dynamicConfigPath string var globalConfig config.GlobalConfig var alertEvaluator *alert.AlertEvaluator +var schedulerConfigPath string func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) @@ -110,6 +112,8 @@ func main() { "/etc/tensor-fusion/gpu-info.yaml", "specify the path to gpuInfoConfig file") flag.StringVar(&dynamicConfigPath, "dynamic-config", "/etc/tensor-fusion/config.yaml", "specify the path to dynamic config file") + flag.StringVar(&schedulerConfigPath, "scheduler-config", "/etc/tensor-fusion/scheduler-config.yaml", + "specify the path to TensorFusion scheduler config file") flag.StringVar(&metricsPath, "metrics-path", "/logs/metrics.log", "specify the path to metrics file") flag.StringVar(&nodeLevelPortRange, "host-port-range", "40000-42000", "specify the port range for assigning ports to pre-scheduled Pods such as vGPU workers") @@ -125,13 +129,9 @@ func main() { "built-in rules if enabled alert, you can configure routers and receivers "+ "in your own alertmanager config, "+ "refer https://prometheus.io/docs/alerting/latest/configuration") - opts := zap.Options{ - Development: true, - } - opts.BindFlags(flag.CommandLine) - flag.Parse() - ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + flag.Parse() + ctrl.SetLogger(klog.NewKlogr()) ctx := context.Background() // print version info @@ -256,14 +256,30 @@ func main() { } if os.Getenv(constants.EnableSchedulerEnv) != "false" { + if schedulerConfigPath == "" { + setupLog.Error(err, "scheduler config path is empty, please and --scheduler-config in command line") + os.Exit(1) + } - pluginOpt := app.WithPlugin(scheduler.Name, scheduler.New) - cc, scheduler, err := sched.SetupScheduler(ctx, pluginOpt) + gpuResourceFitOpt := app.WithPlugin( + gpuResourceFitPlugin.Name, + gpuResourceFitPlugin.NewWithDeps(allocator), + ) + gpuTopoOpt := app.WithPlugin( + gpuTopoPlugin.Name, + gpuTopoPlugin.NewWithDeps(allocator), + ) + + cc, scheduler, err := sched.SetupScheduler(ctx, schedulerConfigPath, gpuResourceFitOpt, gpuTopoOpt) if err != nil { setupLog.Error(err, "unable to create tensor fusion scheduler") os.Exit(1) } - sched.RunScheduler(ctx, cc, scheduler) + + if err := sched.RunScheduler(ctx, cc, scheduler); err != nil { + setupLog.Error(err, "unable to run tensor fusion scheduler") + os.Exit(1) + } } if err = (&controller.TensorFusionClusterReconciler{ diff --git a/cmd/sched/setup.go b/cmd/sched/setup.go index bde10be..c43cb75 100644 --- a/cmd/sched/setup.go +++ b/cmd/sched/setup.go @@ -18,12 +18,10 @@ package sched import ( "context" "fmt" - "os" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/events" "k8s.io/component-base/configz" - utilversion "k8s.io/component-base/version" "k8s.io/klog/v2" "k8s.io/kubernetes/cmd/kube-scheduler/app" schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" @@ -35,8 +33,28 @@ import ( "k8s.io/kubernetes/pkg/scheduler/profile" ) -func SetupScheduler(ctx context.Context, outOfTreeRegistryOptions ...app.Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) { +const ( + schedulerConfigFlagSet = "misc" + schedulerConfigFlag = "config" + configName = "componentconfig" +) + +func SetupScheduler( + ctx context.Context, + schedulerConfigPath string, + outOfTreeRegistryOptions ...app.Option, +) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) { opts := options.NewOptions() + schedulerConfigFlag := opts.Flags.FlagSet(schedulerConfigFlagSet).Lookup(schedulerConfigFlag) + schedulerConfigFlag.Changed = true + err := schedulerConfigFlag.Value.Set(schedulerConfigPath) + if err != nil { + return nil, nil, err + } + err = opts.ComponentGlobalsRegistry.Set() + if err != nil { + return nil, nil, err + } if cfg, err := latest.Default(); err != nil { return nil, nil, err @@ -71,7 +89,7 @@ func SetupScheduler(ctx context.Context, outOfTreeRegistryOptions ...app.Option) cc.InformerFactory, cc.DynInformerFactory, recorderFactory, - scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion), + scheduler.WithComponentConfigVersion(cc.ComponentConfig.APIVersion), scheduler.WithKubeConfig(cc.KubeConfig), scheduler.WithProfiles(cc.ComponentConfig.Profiles...), scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), @@ -82,14 +100,18 @@ func SetupScheduler(ctx context.Context, outOfTreeRegistryOptions ...app.Option) scheduler.WithExtenders(cc.ComponentConfig.Extenders...), scheduler.WithParallelism(cc.ComponentConfig.Parallelism), scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) { - // Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging completedProfiles = append(completedProfiles, profile) }), ) if err != nil { return nil, nil, err } - if err := options.LogOrWriteConfig(klog.FromContext(ctx), opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil { + if err := options.LogOrWriteConfig( + klog.FromContext(ctx), + opts.WriteConfigTo, + &cc.ComponentConfig, + completedProfiles, + ); err != nil { return nil, nil, err } @@ -99,14 +121,9 @@ func SetupScheduler(ctx context.Context, outOfTreeRegistryOptions ...app.Option) func RunScheduler(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error { logger := klog.FromContext(ctx) - // To help debugging, immediately log version - logger.Info("Starting Kubernetes Scheduler", "version", utilversion.Get()) - - logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK")) - - // Configz registration. - if cz, err := configz.New("componentconfig"); err != nil { - return fmt.Errorf("unable to register configz: %s", err) + // Config registration. + if cz, err := configz.New(configName); err != nil { + return fmt.Errorf("unable to register config: %s", err) } else { cz.Set(cc.ComponentConfig) } @@ -114,6 +131,31 @@ func RunScheduler(ctx context.Context, cc *schedulerserverconfig.CompletedConfig // Start events processing pipeline. cc.EventBroadcaster.StartRecordingToSink(ctx.Done()) defer cc.EventBroadcaster.Shutdown() + + startInformersAndWaitForSync := func(ctx context.Context) { + // Start all informers. + cc.InformerFactory.Start(ctx.Done()) + // DynInformerFactory can be nil in tests. + if cc.DynInformerFactory != nil { + cc.DynInformerFactory.Start(ctx.Done()) + } + + // Wait for all caches to sync before scheduling. + cc.InformerFactory.WaitForCacheSync(ctx.Done()) + // DynInformerFactory can be nil in tests. + if cc.DynInformerFactory != nil { + cc.DynInformerFactory.WaitForCacheSync(ctx.Done()) + } + + // Wait for all handlers to sync (all items in the initial list delivered) before scheduling. + if err := sched.WaitForHandlersSync(ctx); err != nil { + logger.Error(err, "waiting for handlers to sync") + } + logger.V(3).Info("Handlers synced") + } + startInformersAndWaitForSync(ctx) + + go sched.Run(ctx) return nil } diff --git a/config/samples/scheduler-config.yaml b/config/samples/scheduler-config.yaml new file mode 100644 index 0000000..3f8633d --- /dev/null +++ b/config/samples/scheduler-config.yaml @@ -0,0 +1,47 @@ +apiVersion: kubescheduler.config.k8s.io/v1 +kind: KubeSchedulerConfiguration +clientConnection: + kubeconfig: /Users/joeyyang/.kube/config +profiles: +# Refer: https://kubernetes.io/docs/reference/scheduling/config/ +- schedulerName: tensor-fusion-scheduler + plugins: + filter: + enabled: + - name: GPUResourceFit + - name: GPUNetworkTopologyAware + score: + enabled: + - name: GPUResourceFit + weight: 5 + - name: GPUNetworkTopologyAware + weight: 7 + pluginConfig: + - name: GPUResourceFit + args: + maxWorkerPerNode: 256 + - name: GPUNetworkTopologyAware + args: + # Avoid the remote TFWorker RX/TX to avoid single node consume too much bandwidth + # Need enable monitor to take effect + totalIntranetBandWidthGBps: 100 + - name: NodeResourcesFit + args: + scoringStrategy: + resources: + - name: cpu + weight: 1 + - name: memory + weight: 1 + requestedToCapacityRatio: + shape: + - utilization: 0 + score: 0 + - utilization: 80 + score: 10 + - utilization: 90 + score: 1 + - utilization: 100 + score: 0 + type: RequestedToCapacityRatio + diff --git a/internal/metrics/connect.go b/internal/metrics/connect.go index e0307f9..9a03d1e 100644 --- a/internal/metrics/connect.go +++ b/internal/metrics/connect.go @@ -109,6 +109,9 @@ func (t *TimeSeriesDB) SetTableTTL(ttl string) error { &HypervisorWorkerUsageMetrics{}, &HypervisorGPUUsageMetrics{}, } + if t == nil || t.DB == nil { + return nil + } for _, table := range tables { if err := t.DB.Exec("ALTER TABLE " + table.TableName() + " SET ttl = '" + ttl + "'").Error; err != nil { return err diff --git a/internal/scheduler/gpuresources/gpuresources.go b/internal/scheduler/gpuresources/gpuresources.go new file mode 100644 index 0000000..e96ec05 --- /dev/null +++ b/internal/scheduler/gpuresources/gpuresources.go @@ -0,0 +1,79 @@ +package gpuresources + +import ( + "context" + "encoding/json" + + "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +const Name = "GPUResourceFit" + +var _ framework.FilterPlugin = &GPUFit{} +var _ framework.ScorePlugin = &GPUFit{} +var _ framework.ReservePlugin = &GPUFit{} + +type GPUFit struct { + logger *klog.Logger + fh framework.Handle + podLister cache.Indexer + pdbLister cache.Indexer + client *rest.RESTClient + allocator *gpuallocator.GpuAllocator + + cfg *GPUFitConfig +} + +type GPUFitConfig struct { + MaxWorkerPerNode int `json:"maxWorkerPerNode"` +} + +type PluginFactoryFunc func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) + +func NewWithDeps(allocator *gpuallocator.GpuAllocator) PluginFactoryFunc { + return func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + target := &GPUFitConfig{} + if unknown, ok := obj.(*runtime.Unknown); ok { + if err := json.Unmarshal(unknown.Raw, target); err != nil { + return nil, err + } + } + lh := klog.FromContext(ctx).WithValues("plugin", Name) + c := &GPUFit{ + logger: &lh, + fh: handle, + cfg: target, + allocator: allocator, + } + return c, nil + } +} + +func (s *GPUFit) Name() string { + return Name +} + +func (s *GPUFit) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + return nil +} + +func (s *GPUFit) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + return 0, nil +} + +func (s *GPUFit) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +func (s *GPUFit) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + return nil +} + +func (s *GPUFit) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { +} diff --git a/internal/scheduler/sched_test.go b/internal/scheduler/gpuresources/gpuresources_test.go similarity index 97% rename from internal/scheduler/sched_test.go rename to internal/scheduler/gpuresources/gpuresources_test.go index 15b1328..1b7b792 100644 --- a/internal/scheduler/sched_test.go +++ b/internal/scheduler/gpuresources/gpuresources_test.go @@ -1,4 +1,4 @@ -package scheduler +package gpuresources import ( "context" @@ -85,12 +85,12 @@ func TestPreFilter(t *testing.T) { frameworkruntime.WithSnapshotSharedLister(testutil.NewFakeSharedLister(make([]*v1.Pod, 0), make([]*v1.Node, 0))), ) - if err != nil { + if err != nil { t.Fatal(err) } // TODO add gpu store - cs := &TensorFusionScheduling{ + cs := &GPUFit{ fh: fwk, } @@ -106,7 +106,7 @@ func TestPreFilter(t *testing.T) { state := framework.NewCycleState() for i := range pods { - if _, got := cs.PreFilter(context.TODO(), state, pods[i]); got.Code() != tt.expected[i] { + if got := cs.Filter(context.TODO(), state, pods[i], nil); got.Code() != tt.expected[i] { t.Errorf("expected %v, got %v : %v", tt.expected[i], got.Code(), got.Message()) } } @@ -163,7 +163,7 @@ func TestReserve(t *testing.T) { t.Fatal(err) } - cs := &TensorFusionScheduling{ + cs := &GPUFit{ fh: fwk, } @@ -224,7 +224,7 @@ func TestUnreserve(t *testing.T) { t.Fatal(err) } - cs := &TensorFusionScheduling{ + cs := &GPUFit{ fh: fwk, } diff --git a/internal/scheduler/gputopo/gpu_network_topo.go b/internal/scheduler/gputopo/gpu_network_topo.go new file mode 100644 index 0000000..d93e716 --- /dev/null +++ b/internal/scheduler/gputopo/gpu_network_topo.go @@ -0,0 +1,65 @@ +package scheduler + +import ( + "context" + + "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +const Name = "GPUNetworkTopologyAware" + +var _ framework.FilterPlugin = &GPUNetworkTopologyAware{} +var _ framework.ScorePlugin = &GPUNetworkTopologyAware{} +var _ framework.ReservePlugin = &GPUNetworkTopologyAware{} + +type GPUNetworkTopologyAware struct { + logger *klog.Logger + fh framework.Handle + allocator *gpuallocator.GpuAllocator + podLister cache.Indexer + pdbLister cache.Indexer + client *rest.RESTClient +} + +type PluginFactoryFunc func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) + +func NewWithDeps(allocator *gpuallocator.GpuAllocator) PluginFactoryFunc { + return func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + lh := klog.FromContext(ctx).WithValues("plugin", Name) + c := &GPUNetworkTopologyAware{ + logger: &lh, + fh: handle, + allocator: allocator, + } + return c, nil + } +} + +func (s *GPUNetworkTopologyAware) Name() string { + return Name +} + +func (s *GPUNetworkTopologyAware) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + return 0, nil +} + +func (s *GPUNetworkTopologyAware) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +func (s *GPUNetworkTopologyAware) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + return nil +} + +func (s *GPUNetworkTopologyAware) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + return nil +} + +func (s *GPUNetworkTopologyAware) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { +} diff --git a/internal/scheduler/sched.go b/internal/scheduler/sched.go deleted file mode 100644 index f5e3232..0000000 --- a/internal/scheduler/sched.go +++ /dev/null @@ -1,59 +0,0 @@ -package scheduler - -import ( - "context" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" - "k8s.io/kubernetes/pkg/scheduler" - "k8s.io/kubernetes/pkg/scheduler/framework" -) - -const Name = "tf-scheduler" - -var _ framework.PreFilterPlugin = &TensorFusionScheduling{} -var _ framework.ReservePlugin = &TensorFusionScheduling{} - -type TensorFusionScheduling struct { - logger *klog.Logger - fh framework.Handle - podLister cache.Indexer - pdbLister cache.Indexer - client *rest.RESTClient -} - -func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - lh := klog.FromContext(ctx).WithValues("plugin", Name) - c := &TensorFusionScheduling{ - logger: &lh, - fh: handle, - } - return c, nil -} - -func (s *TensorFusionScheduling) Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) { - -} - -func (s *TensorFusionScheduling) Name() string { - return Name -} - -func (s *TensorFusionScheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { - return nil, nil -} - -func (s *TensorFusionScheduling) PreFilterExtensions() framework.PreFilterExtensions { - return nil -} - -func (s *TensorFusionScheduling) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { - return nil -} - -func (s *TensorFusionScheduling) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { -} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 98284c9..13e0098 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -79,6 +79,7 @@ func (wg *WorkerGenerator) GenerateWorkerPod( VolumeSource: corev1.VolumeSource{ HostPath: &corev1.HostPathVolumeSource{ Path: constants.TFDataPath, + Type: ptr.To(corev1.HostPathDirectoryOrCreate), }, }, }) diff --git a/tf-scheduler/scheduler-plugins b/tf-scheduler/scheduler-plugins new file mode 160000 index 0000000..93126ea --- /dev/null +++ b/tf-scheduler/scheduler-plugins @@ -0,0 +1 @@ +Subproject commit 93126eabdf526010bf697d5963d849eab7e8e898 From 8562c784d8f91227fd2f215093967ee6446f4ff7 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Wed, 18 Jun 2025 23:03:52 +0800 Subject: [PATCH 4/4] fix: convert scheduler kubeconfig and other args --- cmd/sched/setup.go | 50 ++++++++++++++++++- config/samples/scheduler-config.yaml | 2 +- internal/constants/constants.go | 4 +- internal/gpuallocator/gpuallocator.go | 3 +- .../scheduler/gputopo/gpu_network_topo.go | 13 +++++ internal/webhook/v1/pod_webhook.go | 15 ++++++ 6 files changed, 83 insertions(+), 4 deletions(-) diff --git a/cmd/sched/setup.go b/cmd/sched/setup.go index c43cb75..40268a1 100644 --- a/cmd/sched/setup.go +++ b/cmd/sched/setup.go @@ -18,6 +18,8 @@ package sched import ( "context" "fmt" + "os" + "strings" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/events" @@ -31,12 +33,15 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config/latest" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/profile" + yaml "sigs.k8s.io/yaml" ) const ( schedulerConfigFlagSet = "misc" schedulerConfigFlag = "config" configName = "componentconfig" + clientConnectionCfgKey = "clientConnection" + kubeConfigCfgKey = "kubeconfig" ) func SetupScheduler( @@ -47,7 +52,11 @@ func SetupScheduler( opts := options.NewOptions() schedulerConfigFlag := opts.Flags.FlagSet(schedulerConfigFlagSet).Lookup(schedulerConfigFlag) schedulerConfigFlag.Changed = true - err := schedulerConfigFlag.Value.Set(schedulerConfigPath) + cfgPath, err := preHandleConfig(schedulerConfigPath) + if err != nil { + return nil, nil, err + } + err = schedulerConfigFlag.Value.Set(cfgPath) if err != nil { return nil, nil, err } @@ -164,3 +173,42 @@ func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.Recor return cc.EventBroadcaster.NewRecorder(name) } } + +func preHandleConfig(cfgPath string) (string, error) { + tempDir := os.TempDir() + tempFile, err := os.CreateTemp(tempDir, "kube-scheduler-config") + if err != nil { + return "", err + } + defer tempFile.Close() + cfgBytes, err := os.ReadFile(cfgPath) + if err != nil { + return "", err + } + var cfgRaw map[string]interface{} + err = yaml.Unmarshal(cfgBytes, &cfgRaw) + if err != nil { + return "", err + } + + // Replace $HOME with actual home directory + if cfgRaw[clientConnectionCfgKey].(map[string]interface{})[kubeConfigCfgKey] != "" { + cfgRaw[clientConnectionCfgKey].(map[string]interface{})[kubeConfigCfgKey] = strings.ReplaceAll( + cfgRaw[clientConnectionCfgKey].(map[string]interface{})[kubeConfigCfgKey].(string), + "$HOME", + os.Getenv("HOME"), + ) + } + + // TODO set other fields if needed + + cfgBytes, err = yaml.Marshal(cfgRaw) + if err != nil { + return "", err + } + + if err := os.WriteFile(tempFile.Name(), cfgBytes, 0644); err != nil { + return "", err + } + return tempFile.Name(), nil +} diff --git a/config/samples/scheduler-config.yaml b/config/samples/scheduler-config.yaml index 3f8633d..f8181e6 100644 --- a/config/samples/scheduler-config.yaml +++ b/config/samples/scheduler-config.yaml @@ -1,7 +1,7 @@ apiVersion: kubescheduler.config.k8s.io/v1 kind: KubeSchedulerConfiguration clientConnection: - kubeconfig: /Users/joeyyang/.kube/config + kubeconfig: $HOME/.kube/config profiles: # Refer: https://kubernetes.io/docs/reference/scheduling/config/ - schedulerName: tensor-fusion-scheduler diff --git a/internal/constants/constants.go b/internal/constants/constants.go index f7e39b0..6e9c8fa 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -13,6 +13,8 @@ const ( FinalizerSuffix = "finalizer" Finalizer = Domain + "/" + FinalizerSuffix + SchedulerName = "tensor-fusion-scheduler" + LabelKeyOwner = Domain + "/managed-by" LabelKeyUser = Domain + "/used-by" LabelKeyClusterOwner = Domain + "/cluster" @@ -99,7 +101,7 @@ const ( QoSLevelHigh = "high" QoSLevelCritical = "critical" - EnableWebhookEnv = "ENABLE_WEBHOOKS" + EnableWebhookEnv = "ENABLE_WEBHOOKS" EnableSchedulerEnv = "ENABLE_SCHEDULER" ) diff --git a/internal/gpuallocator/gpuallocator.go b/internal/gpuallocator/gpuallocator.go index a3373f6..5eceb25 100644 --- a/internal/gpuallocator/gpuallocator.go +++ b/internal/gpuallocator/gpuallocator.go @@ -11,6 +11,7 @@ import ( tfv1 "github.com/NexusGPU/tensor-fusion/api/v1" "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/gpuallocator/filter" + "github.com/NexusGPU/tensor-fusion/internal/utils" "github.com/samber/lo" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -446,7 +447,7 @@ func (s *GpuAllocator) syncToK8s(ctx context.Context) { var patch []byte timeValue := time.Now().Format(time.RFC3339) - encodedKey := strings.ReplaceAll(constants.GPULastReportTimeAnnotationKey, "/", "~1") + encodedKey := utils.EscapeJSONPointer(constants.GPULastReportTimeAnnotationKey) // Check if annotations already exist if node.Annotations == nil { diff --git a/internal/scheduler/gputopo/gpu_network_topo.go b/internal/scheduler/gputopo/gpu_network_topo.go index d93e716..3036fbf 100644 --- a/internal/scheduler/gputopo/gpu_network_topo.go +++ b/internal/scheduler/gputopo/gpu_network_topo.go @@ -2,6 +2,7 @@ package scheduler import ( "context" + "encoding/json" "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" v1 "k8s.io/api/core/v1" @@ -25,17 +26,29 @@ type GPUNetworkTopologyAware struct { podLister cache.Indexer pdbLister cache.Indexer client *rest.RESTClient + cfg *GPUNetworkTopologyAwareConfig +} + +type GPUNetworkTopologyAwareConfig struct { + TotalIntranetBandWidthGBps int64 `json:"totalIntranetBandWidthGBps"` } type PluginFactoryFunc func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) func NewWithDeps(allocator *gpuallocator.GpuAllocator) PluginFactoryFunc { return func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + target := &GPUNetworkTopologyAwareConfig{} + if unknown, ok := obj.(*runtime.Unknown); ok { + if err := json.Unmarshal(unknown.Raw, target); err != nil { + return nil, err + } + } lh := klog.FromContext(ctx).WithValues("plugin", Name) c := &GPUNetworkTopologyAware{ logger: &lh, fh: handle, allocator: allocator, + cfg: target, } return c, nil } diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 66a23c8..185f162 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -127,6 +127,7 @@ func (m *TensorFusionPodMutator) Handle(ctx context.Context, req admission.Reque } workerFound := false + // TODO refactor, get rid of this for i := 0; i < 25; i++ { workloadStatus, err := worker.SelectWorker(ctx, m.Client, workload, 1) if err != nil { @@ -167,6 +168,20 @@ func (m *TensorFusionPodMutator) Handle(ctx context.Context, req admission.Reque patches = append(patches, patch) } + // Inject scheduler name + patches = append(patches, jsonpatch.JsonPatchOperation{ + Operation: "add", + Path: "/spec/schedulerName", + Value: constants.SchedulerName, + }) + + // TODO refactor, for localGPU mode, should add worker identifier in this pod, + // so that for scheduler to assign resources + // when it's auto-replicas mode, should create another worker pod and + // set owner to this Pod in pod controller, workload label to TFWorkload CR + // and for non-local-gpu & auto-replicas mode, connection info should be fixed + // and for non-local-gpu & specified-worker-replicas mode, connection info should be dynamic + return admission.Patched("tensor fusion component patched", patches...) }